File Coverage

blib/lib/Mastodon/Listener.pm
Criterion Covered Total %
statement 30 77 38.9
branch 0 8 0.0
condition n/a
subroutine 10 16 62.5
pod 0 4 0.0
total 40 105 38.1


line stmt bran cond sub pod time code
1             package Mastodon::Listener;
2              
3 3     3   16 use strict;
  3         8  
  3         73  
4 3     3   14 use warnings;
  3         6  
  3         110  
5              
6             our $VERSION = '0.012';
7              
8 3     3   15 use Moo;
  3         6  
  3         29  
9             extends 'AnyEvent::Emitter';
10              
11 3     3   1028 use Carp;
  3         7  
  3         155  
12 3     3   17 use Types::Standard qw( Int Str Bool );
  3         6  
  3         31  
13 3     3   2953 use Mastodon::Types qw( Instance to_Status to_Notification );
  3         10  
  3         22  
14 3     3   3607 use AnyEvent::HTTP;
  3         77384  
  3         236  
15 3     3   42 use Try::Tiny;
  3         6  
  3         143  
16 3     3   1495 use JSON::MaybeXS qw( decode_json );
  3         14099  
  3         145  
17              
18 3     3   21 use Log::Any;
  3         6  
  3         27  
19             my $log = Log::Any->get_logger(category => 'Mastodon');
20              
21             has instance => (
22             is => 'ro',
23             isa => Instance,
24             coerce => 1,
25             default => 'mastodon.cloud',
26             );
27              
28             has api_version => (
29             is => 'ro',
30             isa => Int,
31             default => 1,
32             );
33              
34             has url => (
35             is => 'ro',
36             lazy => 1,
37             default => sub {
38             $_[0]->instance
39             . '/api/v' . $_[0]->api_version
40             . '/streaming/' . $_[0]->stream;
41             },
42             );
43              
44             has stream => (
45             is => 'ro',
46             lazy => 1,
47             default => 'public',
48             );
49              
50             has access_token => (
51             is => 'ro',
52             required => 1,
53             );
54              
55             has connection_guard => (
56             is => 'rw',
57             init_arg => undef,
58             );
59              
60             has cv => (
61             is => 'rw',
62             init_arg => undef,
63             lazy => 1,
64             default => sub { AnyEvent->condvar },
65             );
66              
67             has coerce_entities => (
68             is => 'rw',
69             isa => Bool,
70             lazy => 1,
71             default => 0,
72             );
73              
74             sub BUILD {
75 0     0 0   my ($self, $arg) = @_;
76 0           $self->reset;
77             }
78              
79             sub start {
80 0     0 0   return $_[0]->cv->recv;
81             }
82              
83             sub stop {
84 0     0 0   return shift->cv->send(@_);
85             }
86              
87             sub reset {
88 0     0 0   $_[0]->connection_guard($_[0]->_set_connection);
89 0           return $_[0];
90             }
91              
92             around emit => sub {
93             my $orig = shift;
94             my $self = shift;
95              
96             my ($event, $data, @rest) = @_;
97             if ($event =~ /(update|notification)/ and $self->coerce_entities) {
98             $data = to_Notification($data) if $event eq 'notification';
99             $data = to_Status($data) if $event eq 'update';
100             }
101              
102             $self->$orig($event, $data, @rest);
103             };
104              
105             sub _set_connection {
106 0     0     my $self = shift;
107             my $x = http_request GET => $self->url,
108             headers => { Authorization => 'Bearer ' . $self->access_token },
109             handle_params => {
110             max_read_size => 8168,
111             },
112             want_body_handle => 1,
113             sub {
114 0     0     my ($handle, $headers) = @_;
115              
116 0 0         if ($headers->{Status} !~ /^2/) {
117             $self->emit( error => $handle, 1,
118             'Could not connect to ' . $self->url . ': ' . $headers->{Reason}
119 0           );
120 0           $self->stop;
121 0           undef $handle;
122 0           return;
123             }
124              
125 0 0         unless ($handle) {
126 0           $self->emit( error => $handle, 1,
127             'Could not connect to ' . $self->url
128             );
129 0           $self->stop;
130 0           return;
131             }
132              
133 0           my $event_pattern = qr{\s*(:thump|event: (\w+)).*?data:\s*}s;
134 0           my $skip_pattern = qr{\s*}s;
135              
136 0           my $parse_event;
137             $parse_event = sub {
138 0           shift;
139 0           my $chunk = shift;
140 0           my $event = $2;
141              
142 0 0         if (!defined $event) {
    0          
143             # Heartbeats have no data
144 0           $self->emit( 'heartbeat' );
145 0           $handle->push_read( regex =>
146             $event_pattern, undef, $skip_pattern, $parse_event );
147             }
148             elsif ($event eq 'delete') {
149             # The payload for delete is a single integer
150             $handle->push_read( line => sub {
151 0           shift;
152 0           my $line = shift;
153 0           $self->emit( delete => $line );
154 0           $handle->push_read( regex =>
155             $event_pattern, undef, $skip_pattern, $parse_event );
156 0           });
157             }
158             else {
159             # Other events have JSON arrays or objects
160             $handle->push_read( json => sub {
161 0           shift;
162 0           my $json = shift;
163 0           $self->emit( $event => $json );
164 0           $handle->push_read( regex =>
165             $event_pattern, undef, $skip_pattern, $parse_event );
166 0           });
167             }
168 0           };
169              
170             # Push initial reader: look for event name
171             $handle->on_read(sub {
172 0           $handle->push_read( regex => $event_pattern, $parse_event );
173 0           });
174              
175             $handle->on_error(sub {
176 0           undef $handle;
177 0           $self->emit( error => @_ );
178 0           });
179              
180             $handle->on_eof(sub {
181 0           undef $handle;
182 0           $self->emit( eof => @_ );
183 0           });
184              
185 0           };
186 0           return $x;
187             }
188              
189             1;
190              
191             __END__
192              
193             =encoding utf8
194              
195             =head1 NAME
196              
197             Mastodon::Listener - Access the streaming API of a Mastodon server
198              
199             =head1 SYNOPSIS
200              
201             # From Mastodon::Client
202             my $listener = $client->stream( 'public' );
203              
204             # Or use it directly
205             my $listener = Mastodon::Listener->new(
206             url => 'https://mastodon.cloud/api/v1/streaming/public',
207             access_token => $token,
208             coerce_entities => 1,
209             );
210              
211             $listener->on( update => sub {
212             my ($listener, $status) = @_;
213             printf "%s said: %s\n",
214             $status->account->display_name,
215             $status->content;
216             });
217              
218             $listener->start;
219              
220             =head1 DESCRIPTION
221              
222             A Mastodon::Listener object is created by calling the B<stream> method from a
223             L<Mastodon::Client>, and it exists for the sole purpose of parsing a stream of
224             events from a Mastodon server.
225              
226             Mastodon::Listener objects inherit from L<AnyEvent::Emitter>. Please refer to
227             its documentation for details on how to register callbacks for the different
228             events.
229              
230             Once callbacks have been registered, the listener can be set in motion by
231             calling its B<start> method, which takes no arguments and blocks until the
232             B<stop> method is called. The B<stop> method can be called from within
233             callbacks to disconnect from the stream.
234              
235             =head1 ATTRIBUTES
236              
237             =over 4
238              
239             =item B<access_token>
240              
241             The OAuth2 access token of your application. A server may not need it for
242             public timelines, but this attribute is required when creating a listener.
243              
244             =item B<api_version>
245              
246             The API version to use. Defaults to C<1>.
247              
248             =item B<coerce_entities>
249              
250             Whether JSON responses should be coerced into Mastodon::Entity objects.
251             Currently defaults to false (but this may change in a future release).
252              
253             =item B<instance>
254              
255             The instance to use, as a L<Mastodon::Entity::Instance> object. Will be coerced
256             from a URL, and defaults to C<mastodon.cloud>.
257              
258             =item B<stream>
259              
260             The stream to use. Current valid streams are C<public>, C<user>, and tag
261             timelines. To access a tag timeline, the argument to this value should begin
262             with a hash character (C<#>).
263              
264             =item B<url>
265              
266             The full streaming URL to use. By default, it is constructed from the values in
267             the B<instance>, B<api_version>, and B<stream> attributes.
268              
269             =back
270              
271             =head1 EVENTS
272              
273             =over 4
274              
275             =item B<update>
276              
277             A new status has appeared. Callback will be called with the listener and
278             the new status.
279              
280             =item B<notification>
281              
282             A new notification has appeared. Callback will be called with the listener
283             and the new notification.
284              
285             =item B<delete>
286              
287             A status has been deleted. Callback will be called with the listener and the
288             ID of the deleted status.
289              
290             =item B<heartbeat>
291              
292             A new C<:thump> has been received from the server. This is mostly for
293             debugging purposes.
294              
295             =item B<error>
296              
297             Inherited from L<AnyEvent::Emitter>, will be emitted when an error was found.
298             The callback will be called with the same arguments as the B<on_error> callback
299             for L<AnyEvent::Handle>: the handle of the current connection, a fatal flag,
300             and an error message.
301              
302             =back
303              
304             =head1 AUTHOR
305              
306             =over 4
307              
308             =item *
309              
310             José Joaquín Atria <jjatria@cpan.org>
311              
312             =back
313              
314             =head1 COPYRIGHT AND LICENSE
315              
316             This software is copyright (c) 2017 by José Joaquín Atria.
317              
318             This is free software; you can redistribute it and/or modify it under
319             the same terms as the Perl 5 programming language system itself.
320              
321             =cut