# Refuse to run against an IO::Async::Notifier older than 0.63.
IO::Async::Notifier->VERSION( '0.63' );

# Module version. The string is re-evaluated as Perl code straight after
# being declared (the customary idiom for normalising version strings).
our $VERSION = '0.18';
$VERSION = eval $VERSION;

# Record type holding what is known about one user.
# NOTE(review): 'struct' is presumably imported from Struct::Dumb - confirm
# against the import block, which is not visible here.
struct User => [ qw( user_id displayname presence last_active ) ];

# Default value for the 'path_prefix' setting; it is prepended to every
# request path built by _uri_for_path.
use constant PATH_PREFIX => "/_matrix/client/api/v1";
# Initialisation hook, called with the constructor parameter hash.
#
# Ensures a user agent exists in $params->{ua} (building a default
# Net::Async::HTTP and adding it as a child when none was supplied), picks
# the 'make_delay' callback, and sets up the per-instance state.
sub _init
{
    my ( $self, $params ) = @_;

    $self->SUPER::_init( $params );

    unless( $params->{ua} ) {
        Net::Async::HTTP->VERSION( '0.36' );

        my $http = Net::Async::HTTP->new(
            fail_on_error            => 1,
            max_connections_per_host => 3,
            user_agent               => __PACKAGE__,
            pipeline                 => 0,
        );
        $self->add_child( $http );

        $params->{ua} = $http;
    }

    # A caller-supplied make_delay wins; otherwise fall back to a delay
    # future from our own loop. The fallback only receives $self through
    # _capture_weakself.
    my $make_delay = delete $params->{make_delay};
    $make_delay ||= $self->_capture_weakself( sub {
        my ( $self, $secs ) = @_;
        $self->loop->delay_future( after => $secs );
    } );
    $self->{make_delay} = $make_delay;

    $self->{msgid_next}  = 0;
    $self->{users_by_id} = {};
    $self->{rooms_by_id} = {};

    $self->{path_prefix}      = PATH_PREFIX;
    $self->{longpoll_timeout} = LONGPOLL_TIMEOUT;
}
# Applies named configuration parameters.
#
# Keys this class understands are moved into $self. Any key beginning with
# "SSL_" is passed on to the user agent's ->configure. Whatever remains is
# handed to the superclass.
sub configure
{
    my ( $self, %params ) = @_;

    my @own_keys = qw( server path_prefix ua SSL enable_events longpoll_timeout
                       on_log on_unknown_event on_presence on_room_new on_room_del on_invite
                       on_room_member on_room_message );

    foreach my $key ( @own_keys ) {
        next unless exists $params{$key};
        $self->{$key} = delete $params{$key};
    }

    # Read the agent only after the loop above, since 'ua' itself may have
    # just been configured.
    my $ua = $self->{ua};

    foreach my $ssl_key ( grep { m/^SSL_/ } keys %params ) {
        $ua->configure( $ssl_key => delete $params{$ssl_key} );
    }

    $self->SUPER::configure( %params );
}
# Passes $message to the 'on_log' callback, if one has been configured;
# otherwise does nothing.
sub log
{
    my ( $self, $message ) = @_;

    my $callback = $self->{on_log};
    $callback->( $message ) if $callback;
}
# Builds the URI object for an API request.
#
#   $path   - request path; a leading "/" is added when missing
#   %params - query parameters
#
# The scheme depends on the 'SSL' setting, the authority is 'server', and
# the path is 'path_prefix' followed by $path. When an access token is
# stored it is placed into the query parameters as 'access_token'.
sub _uri_for_path
{
    my ( $self, $path, %params ) = @_;

    $path =~ m{^/} or $path = "/$path";

    my $uri = URI->new;
    $uri->scheme( $self->{SSL} ? "https" : "http" );
    $uri->authority( $self->{server} );
    $uri->path( $self->{path_prefix} . $path );

    if( defined $self->{access_token} ) {
        $params{access_token} = $self->{access_token};
    }
    $uri->query_form( %params );

    return $uri;
}
# Performs a GET on $path with %params as query parameters.
#
# Returns a future. It fails (category "matrix") unless the response
# content type is application/json; otherwise it completes with the decoded
# JSON body followed by the response object.
sub _do_GET_json
{
    my ( $self, $path, %params ) = @_;

    my $request_f = $self->{ua}->GET( $self->_uri_for_path( $path, %params ) );

    return $request_f->then( sub {
        my ( $response ) = @_;

        return Future->fail( "Expected application/json response", matrix => )
            unless $response->content_type eq "application/json";

        return Future->done( decode_json( $response->content ), $response );
    });
}
# Sends $content, JSON-encoded, to $path using HTTP method $method.
#
# Returns a future (registered via adopt_future). It fails unless the
# response is application/json or if the body cannot be parsed. It
# completes with ( $decoded, $response ), where $decoded is undef when the
# body is empty or is exactly the two characters "".
sub _do_send_json
{
    my ( $self, $method, $path, $content ) = @_;

    my $req = HTTP::Request->new( $method, $self->_uri_for_path( $path ) );
    $req->content( encode_json( $content ) );
    $req->header( Content_length => length $req->content );
    $req->header( Content_type   => "application/json" );

    my $f = $self->{ua}->do_request( request => $req )->then( sub {
        my ( $response ) = @_;

        $response->content_type eq "application/json"
            or return Future->fail( "Expected application/json response", matrix => );

        my $body = $response->content;

        # Nothing useful to decode
        unless( length $body and $body ne q("") ) {
            return Future->done( undef, $response );
        }

        my $decoded;
        eval { $decoded = decode_json( $body ); 1 }
            or return Future->fail( "Unable to parse JSON response $body" );

        return Future->done( $decoded, $response );
    });

    return $self->adopt_future( $f );
}
# Convenience wrapper: _do_send_json with the method fixed to PUT.
sub _do_PUT_json
{
    my $self = shift;
    return $self->_do_send_json( PUT => @_ );
}
# Convenience wrapper: _do_send_json with the method fixed to POST.
sub _do_POST_json
{
    my $self = shift;
    return $self->_do_send_json( POST => @_ );
}
# Issues a DELETE request for $path with %params as query parameters and
# returns whatever the user agent's do_request returns.
sub _do_DELETE
{
    my ( $self, $path, %params ) = @_;

    return $self->{ua}->do_request(
        method => "DELETE",
        uri    => $self->_uri_for_path( $path, %params ),
    );
}
# POSTs a body to "/_matrix" . $path (bypassing the configured path prefix).
#
# Recognised %params:
#   content_type   - value for the Content-Type header
#   content        - body as an in-memory string; or
#   file           - name of a file to read the body from; or
#   fh             - an already-open filehandle to read the body from
#   content_length - optional explicit length when streaming from file/fh;
#                    defaults to the size reported by stat on the handle
#
# Returns a future. It fails immediately (category "open") if 'file' cannot
# be opened, fails (category "matrix") unless the response is
# application/json, and fails if the body cannot be parsed. It completes
# with ( $decoded, $response ), $decoded being undef for an empty or ""
# body.
sub _do_POST_file
{
    my $self = shift;
    my ( $path, %params ) = @_;

    my $uri = $self->_uri_for_path( "" );
    $uri->path( "/_matrix" . $path );

    my $req = HTTP::Request->new( "POST", $uri );
    $req->header( Content_type => $params{content_type} );

    # Set only when streaming from a handle
    my $body;

    if( defined $params{content} ) {
        $req->content( $params{content} );
        $req->header( Content_length => length $req->content );
    }
    elsif( defined $params{file} or defined $params{fh} ) {
        my $fh = $params{fh};
        unless( $fh ) {
            # Open as raw bytes: Content-Length below comes from the on-disk
            # size, so no translating I/O layer may alter the data read.
            open $fh, "<:raw", $params{file}
                or return Future->fail( "Cannot read $params{file} - $!", open => );
        }

        # Body generator: one chunk per call, undef once reading yields
        # nothing more.
        $body = sub {
            $fh->read( my $buffer, 65536 ) or return undef;
            return $buffer;
        };

        $req->header( Content_length => $params{content_length} // ( stat $fh )->size );
    }

    my $f = $self->{ua}->do_request(
        request      => $req,
        request_body => $body,
    )->then( sub {
        my ( $response ) = @_;

        $response->content_type eq "application/json"
            or return Future->fail( "Expected application/json response", matrix => );

        my $content = $response->content;

        if( length $content and $content ne q("") ) {
            # On failure $content still holds the raw text, so it can be
            # reported in the message (matching _do_send_json).
            eval {
                $content = decode_json( $content );
                1;
            } or return Future->fail( "Unable to parse JSON response $content" );

            return Future->done( $content, $response );
        }
        else {
            return Future->done( undef, $response );
        }
    });

    return $self->adopt_future( $f );
}
# Logs in to the server.
#
# When both 'user_id' and 'access_token' are supplied they are stored, the
# notifier is named after the user, ->start is called, and an already
# completed future yielding $self is returned.
#
# Otherwise the server's login flows are fetched and each "m.login.*" flow
# is tried in order through a matching _login_with_$type method; the first
# one to return a future wins. If none does, the returned future fails with
# a list of the flow types the server offered.
sub login
{
    my ( $self, %params ) = @_;

    if( defined $params{user_id} and defined $params{access_token} ) {
        @{$self}{qw( user_id access_token )} = @params{qw( user_id access_token )};

        $self->configure( notifier_name => "uid=$params{user_id}" );
        $self->start;

        return Future->done( $self );
    }

    return $self->_do_GET_json( "/login" )->then( sub {
        my ( $response ) = @_;
        my $flows = $response->{flows};

        my @supported;
        foreach my $flow ( @$flows ) {
            my ( $type ) = $flow->{type} =~ m/^m\.login\.(.*)$/
                or next;
            push @supported, $type;

            # Skip types we have no implementation for, and implementations
            # that decline (return false) given these %params.
            my $code = $self->can( "_login_with_$type" ) or next;
            my $f    = $code->( $self, %params )          or next;

            return $f;
        }

        return Future->fail( "Unsure how to log in (server supports @supported)", matrix => );
    });
}
# Login flow implementation for "m.login.password".
#
# Returns nothing unless both 'user_id' and 'password' are given. Otherwise
# posts the credentials to /login and, once the server answers with an
# 'access_token', continues through ->login with the response fields; a
# response lacking one fails the future.
sub _login_with_password
{
    my ( $self, %params ) = @_;

    return unless defined $params{user_id} and defined $params{password};

    my %request = (
        type     => "m.login.password",
        user     => $params{user_id},
        password => $params{password},
    );

    return $self->_do_POST_json( "/login", \%request )->then( sub {
        my ( $resp ) = @_;

        defined $resp->{access_token}
            or return Future->fail( "Expected server to respond with 'access_token'", matrix => );

        return $self->login( %$resp );
    });
}
# Registers a new account.
#
# Fetches the server's registration flows and picks the first flow in which
# every stage is an "m.login.*" type that has a _register_with_$type method
# (dots in the type become underscores) willing to return a code reference
# for these %params. The stage code references are chained one after
# another; the final response must carry an 'access_token', which is then
# handed to ->login. If no flow is usable the future fails, listing what
# the server offered.
sub register
{
    my ( $self, %params ) = @_;

    return $self->_do_GET_json( "/register" )->then( sub {
        my ( $response ) = @_;
        my $flows = $response->{flows};

        my @supported;

        FLOW: foreach my $flow ( @$flows ) {
            # A flow either lists its stages or is a single-stage 'type'
            my @stages = $flow->{stages} ? @{ $flow->{stages} } : ( $flow->{type} );

            push @supported, join ",", @stages;

            my @flowcode;
            foreach my $stage ( @stages ) {
                my ( $type ) = $stage =~ m/^m\.login\.(.*)$/
                    or next FLOW;
                $type =~ s/\./_/g;

                my $method = $self->can( "_register_with_$type" ) or next FLOW;
                my $code   = $method->( $self, %params )           or next FLOW;

                push @flowcode, $code;
            }

            # Build the whole chain on a pending future, then release it.
            my $start = Future->new;
            my $tail  = $start;
            foreach my $step ( @flowcode ) {
                $tail = $tail->then( $step );
            }
            $start->done();

            return $tail->then( sub {
                my ( $resp ) = @_;

                defined $resp->{access_token}
                    or return Future->fail( "Expected server to respond with 'access_token'", matrix => );

                return $self->login( %$resp );
            });
        }

        return Future->fail( "Unsure how to register (server supports @supported)", matrix => );
    });
}
# Registration stage implementation for "m.login.password".
#
# Returns nothing unless a 'password' is given. Otherwise returns a code
# reference which, given the previous stage's response, posts the password
# stage to /register (carrying over that response's 'session').
sub _register_with_password
{
    my ( $self, %params ) = @_;

    my $password = $params{password};
    return unless defined $password;

    return sub {
        my ( $resp ) = @_;

        my %stage = (
            type     => "m.login.password",
            session  => $resp->{session},
            user     => $params{user_id},
            password => $password,
        );

        $self->_do_POST_json( "/register", \%stage );
    };
}
# Registration stage implementation for "m.login.recaptcha", using a shared
# secret to bypass the captcha.
#
# Returns nothing unless both 'captcha_bypass_secret' and 'user_id' are
# given, and warns then returns nothing when HAVE_DIGEST_HMAC_SHA1 is
# false. Otherwise returns a code reference which posts the recaptcha stage
# to /register, carrying the HMAC-SHA1 hex digest of the user ID keyed with
# the secret.
sub _register_with_recaptcha
{
    my ( $self, %params ) = @_;

    my $secret = $params{captcha_bypass_secret};
    return unless defined $secret and defined $params{user_id};

    unless( HAVE_DIGEST_HMAC_SHA1 ) {
        warn "Cannot use captcha_bypass_secret to bypass m.register.recaptcha without Digest::HMAC_SHA1\n";
        return;
    }

    my $digest = Digest::HMAC_SHA1::hmac_sha1_hex( $params{user_id}, $secret );

    return sub {
        my ( $resp ) = @_;

        my %stage = (
            type                => "m.login.recaptcha",
            session             => $resp->{session},
            user                => $params{user_id},
            captcha_bypass_hmac => $digest,
        );

        $self->_do_POST_json( "/register", \%stage );
    };
}
# Returns the stored 'synced_future', creating it from the loop on first
# use (or after it has been reset to undef).
sub await_synced
{
    my ( $self ) = @_;

    $self->{synced_future} //= $self->loop->new_future;
    return $self->{synced_future};
}
# Starts the initial sync (and then, unless disabled, the event longpoll).
#
# Croaks without an access token. Returns the stored start future if one is
# already set. Otherwise resets the sync state, requests /initialSync and
# processes the result:
#   - joined rooms not yet synced get their state events applied, are
#     marked synced, and have 'on_synced_state' invoked
#   - invites invoke 'on_invite'
#   - any other membership is only logged
#   - presence entries are fed through _incoming_event
# after which our own synced future is completed and, when 'enable_events'
# is true or unset, the longpoll is started from the sync's 'end' token.
# A failed request clears the stored start future.
sub start
{
    my ( $self ) = @_;

    croak "Cannot ->start without an access token"
        unless defined $self->{access_token};

    return $self->{start_f} if $self->{start_f};

    undef $self->{synced_future};

    foreach my $known_room ( values %{ $self->{rooms_by_id} } ) {
        $known_room->_reset_for_sync;
    }

    my $sync_f = $self->_do_GET_json( "/initialSync", limit => 0 )
        ->on_fail( sub { undef $self->{start_f} } )
        ->then( sub {
            my ( $sync ) = @_;

            foreach my $room_data ( @{ $sync->{rooms} } ) {
                my $room_id    = $room_data->{room_id};
                my $membership = $room_data->{membership};

                if( $membership eq "join" ) {
                    my $state = $room_data->{state};

                    my $room = $self->_get_or_make_room( $room_id );
                    next if $room->await_synced->is_ready;

                    foreach my $state_event ( @$state ) {
                        $room->_handle_event_initial( $state_event );
                    }

                    $room->await_synced->done;
                    $room->maybe_invoke_event( on_synced_state => );
                }
                elsif( $membership eq "invite" ) {
                    $self->maybe_invoke_event( on_invite => $room_data );
                }
                else {
                    $self->log( "TODO: imsync returned a room in membership state $membership" );
                }
            }

            foreach my $presence_event ( @{ $sync->{presence} } ) {
                $self->_incoming_event( $presence_event );
            }

            $self->await_synced->done;

            $self->start_longpoll( start => $sync->{end} )
                if $self->{enable_events} // 1;

            return Future->done;
        });

    return $self->{start_f} = $self->adopt_future( $sync_f );
}
# Cancels (and forgets) the stored start future, if there is one, then
# stops the longpoll.
sub stop
{
    my ( $self ) = @_;

    if( $self->{start_f} ) {
        my $start_f = delete $self->{start_f};
        $start_f->cancel;
    }

    $self->stop_longpoll;
}
# (Re)starts the event longpoll loop.
#
#   %args: start - token to poll from; defaults to "END"
#
# Any running longpoll is stopped first. Each iteration races a delay of
# 'longpoll_timeout' + 5 seconds against a GET of /events; received events
# go through _incoming_event and the 'end' token is remembered for the next
# iteration. A failed iteration is warned about and followed by a 3 second
# delay. The loop repeats for as long as the previous iteration did not
# fail. Failure of the overall loop future is reported via invoke_error.
sub start_longpoll
{
    my ( $self, %args ) = @_;

    $self->stop_longpoll;
    $self->{longpoll_last_token} = $args{start} // "END";

    my $f = $self->{longpoll_f} = repeat {
        my $last_token = $self->{longpoll_last_token};

        # NOTE(review): else_fail only rewrites a *failure* of the delay
        # future, so a delay that simply elapses completes the wait_any
        # successfully. If the intent was to fail with "Longpoll timed out"
        # when the delay elapses, then_fail may have been meant - confirm.
        my $timeout_f = $self->{make_delay}->( $self->{longpoll_timeout} + 5 )
            ->else_fail( "Longpoll timed out" );

        my $events_f = $self->_do_GET_json( "/events",
            ( $last_token ? ( from => $last_token ) : () ),
            timeout => $self->{longpoll_timeout} * 1000,
        )->then( sub {
            my ( $data ) = @_;

            foreach my $event ( @{ $data->{chunk} } ) {
                $self->_incoming_event( $event );
            }
            $self->{longpoll_last_token} = $data->{end};

            Future->done();
        });

        Future->wait_any( $timeout_f, $events_f )->else( sub {
            my ( $failure ) = @_;
            warn "Longpoll failed - $failure\n";

            # Back off briefly before the next attempt
            $self->{make_delay}->( 3 )
        });
    } while => sub {
        my ( $previous ) = @_;
        !$previous->failure
    };

    $f->on_fail( $self->_capture_weakself( sub {
        my $self = shift;
        $self->invoke_error( @_ );
    }));
}
# Cancels and forgets the running longpoll future, if there is one.
sub stop_longpoll
{
    my ( $self ) = @_;

    $self->{longpoll_f} and ( delete $self->{longpoll_f} )->cancel;
}
# Returns the User record stored for $user_id, first creating one (with
# only the user_id field set) if nothing is stored yet.
sub _get_or_make_user
{
    my ( $self, $user_id ) = @_;

    unless( $self->{users_by_id}{$user_id} ) {
        $self->{users_by_id}{$user_id} = User( $user_id, undef, undef, undef );
    }

    return $self->{users_by_id}{$user_id};
}
# Creates, stores and returns a new room object for $room_id.
#
# Croaks if a room with that ID is already stored. The configured
# 'on_room_message' / 'on_room_member' callbacks are passed to the room as
# 'on_message' / 'on_member'. The room is added as a child and
# 'on_room_new' is invoked with it.
sub _make_room
{
    my ( $self, $room_id ) = @_;

    croak "Already have a room with ID '$room_id'"
        if $self->{rooms_by_id}{$room_id};

    my @args = map {
        my $handler = $self->{"on_room_$_"};
        $handler ? ( "on_$_" => $handler ) : ();
    } qw( message member );

    my $room = $self->make_room(
        matrix  => $self,
        room_id => $room_id,
        @args,
    );
    $self->{rooms_by_id}{$room_id} = $room;

    $self->add_child( $room );
    $self->maybe_invoke_event( on_room_new => $room );

    return $room;
}
# Constructs a Net::Async::Matrix::Room from the given arguments. Kept as a
# separate method, so it can be overridden to build a different room class.
sub make_room
{
    my ( $self, @room_args ) = @_;

    return Net::Async::Matrix::Room->new( @room_args );
}
# Returns the stored room for $room_id, or creates one via _make_room when
# none is defined.
sub _get_or_make_room
{
    my ( $self, $room_id ) = @_;

    my $existing = $self->{rooms_by_id}{$room_id};
    return $existing if defined $existing;

    return $self->_make_room( $room_id );
}
# Returns the User record for the logged-in user (creating it if needed).
sub myself
{
    my ( $self ) = @_;

    my $own_id = $self->{user_id};
    return $self->_get_or_make_user( $own_id );
}
# Looks up the User record stored for $user_id; yields undef when the user
# is not known. Never creates a record.
sub user
{
    my ( $self, $user_id ) = @_;

    my $known = $self->{users_by_id}{$user_id};
    return $known;
}
# Dispatches one incoming event by its dotted 'type'.
#
# The most specific _handle_event_* method is looked for first (all type
# parts joined by "_"); on each miss the last part is dropped and becomes a
# leading extra argument. The first handler found is called with
# ( @dropped_parts, $event ). When nothing matches, 'on_unknown_event' is
# invoked, and if that returns false the event is logged instead.
sub _incoming_event
{
    my ( $self, $event ) = @_;

    my @type_parts = split m/\./, $event->{type};
    my @subtype_args;

    while( @type_parts ) {
        my $method_name = "_handle_event_" . join( "_", @type_parts );

        if( my $handler = $self->can( $method_name ) ) {
            $handler->( $self, @subtype_args, $event );
            return;
        }

        # Move the least-significant part over to the handler arguments
        unshift @subtype_args, pop @type_parts;
    }

    $self->maybe_invoke_event( on_unknown_event => $event )
        or $self->log( "    incoming event=".pp( $event ) );
}
# Handles our own departure from $room: invokes 'on_room_del' with the
# room, then removes it from the room store.
sub _on_self_leave
{
    my ( $self, $room ) = @_;

    $self->maybe_invoke_event( on_room_del => $room );

    return delete $self->{rooms_by_id}{ $room->room_id };
}
# Fetches the display name of $user_id (defaulting to the logged-in user).
# Returns a future yielding the 'displayname' field of the response.
sub get_displayname
{
    my ( $self, $user_id ) = @_;

    $user_id = $self->{user_id} unless defined $user_id;

    return $self->_do_GET_json( "/profile/$user_id/displayname" )->then( sub {
        my ( $content ) = @_;
        return Future->done( $content->{displayname} );
    });
}
# Sets the logged-in user's display name to $name. Returns the future from
# the underlying PUT request.
sub set_displayname
{
    my ( $self, $name ) = @_;

    my %body = ( displayname => $name );

    return $self->_do_PUT_json( "/profile/$self->{user_id}/displayname", \%body );
}
# Fetches the logged-in user's presence status. Returns a future yielding
# the 'presence' and 'status_msg' fields of the response, in that order.
sub get_presence
{
    my ( $self ) = @_;

    my $path = "/presence/$self->{user_id}/status";

    return $self->_do_GET_json( $path )->then( sub {
        my ( $status ) = @_;
        return Future->done( $status->{presence}, $status->{status_msg} );
    });
}
# Sets the logged-in user's presence to $presence, with an optional status
# message $msg (sent only when defined). Returns the future from the
# underlying PUT request.
sub set_presence
{
    my ( $self, $presence, $msg ) = @_;

    my %status = ( presence => $presence );
    $status{status_msg} = $msg if defined $msg;

    return $self->_do_PUT_json( "/presence/$self->{user_id}/status", \%status );
}
# Fetches the logged-in user's presence list.
#
# Each entry of the response updates (creating if needed) the matching User
# record's 'presence' and 'displayname' fields, where the entry defines
# them. Returns a future yielding the list of User records, in response
# order.
sub get_presence_list
{
    my ( $self ) = @_;

    return $self->_do_GET_json( "/presence_list/$self->{user_id}" )->then( sub {
        my ( $events ) = @_;

        my @users;
        foreach my $event ( @$events ) {
            my $user = $self->_get_or_make_user( $event->{user_id} );

            foreach my $field ( qw( presence displayname ) ) {
                next unless defined $event->{$field};
                $user->$field = $event->{$field};
            }

            push @users, $user;
        }

        return Future->done( @users );
    });
}
# Posts an 'invite' for $remote to the logged-in user's presence list.
# Returns the future from the underlying POST request.
sub invite_presence
{
    my ( $self, $remote ) = @_;

    my %body = ( invite => [ $remote ] );

    return $self->_do_POST_json( "/presence_list/$self->{user_id}", \%body );
}
# Posts a 'drop' for $remote to the logged-in user's presence list.
# Returns the future from the underlying POST request.
sub drop_presence
{
    my ( $self, $remote ) = @_;

    my %body = ( drop => [ $remote ] );

    return $self->_do_POST_json( "/presence_list/$self->{user_id}", \%body );
}
# Creates a new room, optionally requesting the alias name $room_alias.
#
# Returns a future which, once the new room has finished its initial sync,
# yields the room object and the 'room_alias' field of the server response.
sub create_room
{
    my ( $self, $room_alias ) = @_;

    my %body = defined $room_alias ? ( room_alias_name => $room_alias ) : ();

    return $self->_do_POST_json( "/createRoom", \%body )->then( sub {
        my ( $content ) = @_;

        my $room = $self->_get_or_make_room( $content->{room_id} );

        return $room->initial_sync->then_done( $room, $content->{room_alias} );
    });
}
# Joins the room named by $room_alias.
#
# Returns a future yielding the room object: the stored one when the room
# ID in the response is already known, otherwise a newly made room once its
# initial sync has finished.
sub join_room
{
    my ( $self, $room_alias ) = @_;

    return $self->_do_POST_json( "/join/$room_alias", {} )->then( sub {
        my ( $content ) = @_;
        my $room_id = $content->{room_id};

        my $known = $self->{rooms_by_id}{$room_id};
        return Future->done( $known ) if $known;

        my $room = $self->_make_room( $room_id );
        return $room->initial_sync->then_done( $room );
    });
}
# Fetches the logged-in user's room list. Returns a future yielding the
# response as formatted by pp() rather than as a data structure.
sub room_list
{
    my ( $self ) = @_;

    my $path = "/users/$self->{user_id}/rooms/list";

    return $self->_do_GET_json( $path )->then( sub {
        my ( $response ) = @_;
        return Future->done( pp( $response ) );
    });
}
# Points the directory alias $alias at $room_id. Returns a future that
# completes with no values.
sub add_alias
{
    my ( $self, $alias, $room_id ) = @_;

    my %body = ( room_id => $room_id );

    return $self->_do_PUT_json( "/directory/room/$alias", \%body )->then_done();
}
# Removes the directory alias $alias. Returns a future that completes with
# no values.
sub delete_alias
{
    my ( $self, $alias ) = @_;

    my $request_f = $self->_do_DELETE( "/directory/room/$alias" );
    return $request_f->then_done();
}
# Uploads media content.
#
# %params must include 'content_type' and at least one of 'content',
# 'file' or 'fh'; croaks otherwise. All parameters are passed through to
# _do_POST_file. Returns a future yielding the 'content_uri' field of the
# server response.
sub upload
{
    my ( $self, %params ) = @_;

    croak "Require 'content_type'"
        unless defined $params{content_type};

    my $source_count = grep { defined $params{$_} } qw( content file fh );
    croak "Require 'content', 'file' or 'fh'"
        unless $source_count;

    return $self->_do_POST_file( "/media/v1/upload", %params )->then( sub {
        my ( $content ) = @_;
        return Future->done( $content->{content_uri} );
    });
}
# Handles an "m.presence" event.
#
# Updates the User record named by the event content and collects the
# changes as  field => [ $old, $new ]:
#   - 'presence' and 'displayname' change when the content defines a value
#     that differs from the stored one
#   - 'last_active' is recomputed whenever 'last_active_ago' is present
#     (divided by 1000 and subtracted from the current time)
# 'on_presence' is then invoked with the user and the changes, and every
# stored room is given the same pair via its _handle_event_m_presence.
sub _handle_event_m_presence
{
    my ( $self, $event ) = @_;
    my $content = $event->{content};

    my $user = $self->_get_or_make_user( $content->{user_id} );

    my %changes;
    foreach my $field ( qw( presence displayname ) ) {
        my $new = $content->{$field};
        next unless defined $new;

        my $old = $user->$field;
        next if defined $old and $new eq $old;

        $changes{$field} = [ $old, $new ];
        $user->$field = $new;
    }

    if( defined $content->{last_active_ago} ) {
        my $new_last_active = time() - ( $content->{last_active_ago} / 1000 );

        $changes{last_active} = [ $user->last_active, $new_last_active ];
        $user->last_active = $new_last_active;
    }

    $self->maybe_invoke_event( on_presence => $user, %changes );

    foreach my $room ( values %{ $self->{rooms_by_id} } ) {
        $room->_handle_event_m_presence( $user, %changes );
    }
}
# Handles any "m.room.*" event.
#
# Called with the remaining event type parts followed by the event itself.
# The event is routed to the stored room object for its 'room_id'; failing
# that, to $self when the event's 'state_key' is our own user ID; otherwise
# it is logged and dropped.
#
# On the chosen handler, the most specific _handle_roomevent_*_forward
# method is looked for (dropping trailing type parts on each miss, which
# become leading arguments). The first one found is scheduled to run once
# the handler's await_synced future is done. If none matches the event is
# logged as unhandled.
sub _handle_event_m_room
{
    my $self = shift;
    my $event = pop;
    my @type_parts = @_;

    my $handler;
    if( $handler = $self->{rooms_by_id}{ $event->{room_id} } ) {
        # A known room handles its own events
    }
    # Only state events carry a state_key; guard the comparison so other
    # events for unknown rooms do not raise an "uninitialized value" warning.
    elsif( defined $event->{state_key} and $event->{state_key} eq $self->{user_id} ) {
        $handler = $self;
    }
    else {
        $self->log( "TODO: Room event on unknown room ID $event->{room_id} not about myself" );
        return;
    }

    my @subtype_parts;
    while( @type_parts ) {
        if( my $code = $handler->can( "_handle_roomevent_" . join "_", @type_parts, "forward" ) ) {
            $handler->await_synced->on_done( sub {
                $code->( $handler, @subtype_parts, $event );
            });
            return;
        }

        unshift @subtype_parts, pop @type_parts;
    }

    $self->log( "Unhandled room event " . join( "_", @subtype_parts ) . "\n" . pp( $event ) );
}
# Handles an "m.typing" event by forwarding it to the stored room object
# for its 'room_id'. Events for rooms that are not stored are ignored.
sub _handle_event_m_typing
{
    my ( $self, @args ) = @_;
    my $event = $args[-1];

    my $room = $self->{rooms_by_id}{ $event->{room_id} };
    $room->_handle_event_m_typing( $event ) if $room;
}
# Handles a room membership event routed to $self, i.e. for a room that is
# not yet stored.
#
# A "join" gets (or makes) the room and adopts its initial sync future; an
# "invite" invokes 'on_invite' with the event; any other membership is only
# logged.
sub _handle_roomevent_member_forward
{
    my ( $self, @args ) = @_;
    my $event = $args[-1];

    my $content    = $event->{content};
    my $membership = $content->{membership};

    if( $membership eq "join" ) {
        my $room = $self->_get_or_make_room( $event->{room_id} );
        $self->adopt_future( $room->initial_sync );
    }
    elsif( $membership eq "invite" ) {
        $self->maybe_invoke_event( on_invite => $event );
    }
    else {
        $self->log( "Unhandled selfroom event member membership=$membership" );
    }
}
# Final statement of the file: a true value, as Perl's require expects a
# loaded module file to return.
0x55AA;