File Coverage

blib/lib/Mastodon/Listener.pm
Criterion Covered Total %
statement 30 67 44.7
branch 0 14 0.0
condition n/a
subroutine 10 17 58.8
pod 0 4 0.0
total 40 102 39.2


line stmt bran cond sub pod time code
1             package Mastodon::Listener;
2              
3 5     5   30 use strict;
  5         12  
  5         129  
4 5     5   24 use warnings;
  5         9  
  5         187  
5              
6             our $VERSION = '0.016';
7              
8 5     5   24 use Moo;
  5         9  
  5         36  
9             with 'Role::EventEmitter';
10              
11 5     5   1666 use Types::Standard qw( Int Str Bool );
  5         11  
  5         50  
12 5     5   3533 use Mastodon::Types qw( Instance to_Status to_Notification );
  5         8  
  5         27  
13 5     5   6004 use IO::Async::Loop;
  5         162019  
  5         168  
14 5     5   2942 use Net::Async::HTTP;
  5         313210  
  5         199  
15 5     5   40 use Try::Tiny;
  5         8  
  5         350  
16 5     5   2287 use JSON::MaybeXS qw( decode_json );
  5         25238  
  5         240  
17              
18 5     5   66 use Log::Any;
  5         10  
  5         47  
19             my $log = Log::Any->get_logger(category => 'Mastodon');
20              
21             has instance => (
               # Server to stream from. Read-only; the value is coerced to the
               # Instance type imported from Mastodon::Types.
22             is => 'ro',
23             isa => Instance,
24             coerce => 1,
25             default => 'mastodon.cloud',
26             );
27              
28             has api_version => (
               # Integer API version, interpolated into the default url.
29             is => 'ro',
30             isa => Int,
31             default => 1,
32             );
33              
34             has url => (
               # Full streaming endpoint. Built lazily on first access as
               # <instance>/api/v<api_version>/streaming/<stream> unless a
               # value was passed to the constructor.
               # NOTE(review): relies on the instance value stringifying to a
               # base URL -- confirm against Mastodon::Types.
35             is => 'ro',
36             lazy => 1,
37             default => sub {
38             $_[0]->instance
39             . '/api/v' . $_[0]->api_version
40             . '/streaming/' . $_[0]->stream;
41             },
42             );
43              
44             has stream => (
               # Name of the stream, used as the last path segment of the
               # default url.
45             is => 'ro',
46             lazy => 1,
47             default => 'public',
48             );
49              
50             has access_token => (
               # Token sent as "Authorization: Bearer <token>" by start().
               # Mandatory at construction time (required => 1).
51             is => 'ro',
52             required => 1,
53             );
54              
55             has _ua => (
               # Private HTTP user agent. init_arg => undef means it cannot be
               # supplied through the constructor.
56             is => 'rw',
57             init_arg => undef,
58             default => sub { Net::Async::HTTP->new },
59             );
60              
61             has _future => (
               # Private future that start() waits on and stop() completes.
               # start() overwrites it with the future returned by do_request.
               # NOTE(review): Future is not loaded explicitly in this file;
               # presumably it is pulled in by IO::Async -- confirm.
62             is => 'rw',
63             init_arg => undef,
64             lazy => 1,
65             default => sub { Future->new },
66             );
67              
68             has coerce_entities => (
               # When true, the emit wrapper converts 'update' and
               # 'notification' payloads with to_Status / to_Notification.
69             is => 'rw',
70             isa => Bool,
71             lazy => 1,
72             default => 0,
73             );
74              
75             sub BUILD {
               # Moo construction hook: adds the private user agent (_ua) to an
               # IO::Async::Loop so that it can perform requests.
               # $arg (the constructor arguments) is unpacked but not used.
76 0     0 0   my ($self, $arg) = @_;
77 0           IO::Async::Loop->new->add($self->_ua);
78             }
79              
80             sub start {
               # Opens the streaming request against $self->url, parses the
               # incoming body chunk by chunk, and emits one event per parsed
               # message ('heartbeat', 'delete', 'error', or the event name sent
               # by the server). Takes no arguments. Ends by calling get() on
               # the request future, so it waits for that future to be ready.
81 0     0 0   my $self = shift;
82              
               # Parser state shared by every invocation of the chunk callback:
               # the event name seen last, and the lines not yet consumed.
83 0           my $current_event;
84             my @buffer;
85              
               # Emits 'error' with the first two arguments passed through and
               # all remaining arguments wrapped in a single array reference.
86 0     0     my $on_error = sub { $self->emit( error => shift, shift, \@_ ) };
  0            
87              
               # Store the request future, replacing the lazy default in _future.
88             $self->_future(
89             $self->_ua->do_request(
90             uri => $self->url,
91             headers => {
92             Authorization => 'Bearer ' . $self->access_token,
93             },
               # Request-level failure: reported with the fatal flag set to 1.
               # NOTE(review): the remaining arguments are wrapped in an array
               # reference here and again inside $on_error, so listeners
               # receive a nested array reference -- confirm this is intended.
94 0     0     on_error => sub { $on_error->( 1, shift, \@_ ) },
95             on_header => sub {
96 0     0     my $response = shift;
               # NOTE(review): a non-success response is reported as an error,
               # but execution continues and the body parser below is still
               # returned -- confirm whether an early return is wanted.
97 0 0         $on_error->( 1, $response->message, $response )
98             unless $response->is_success;
99              
               # Callback invoked with each chunk of the response body.
100             return sub {
101 0           my $chunk = shift;
               # NOTE(review): chunks are split on newlines without keeping a
               # trailing partial line, so a line cut in two by a chunk
               # boundary is treated as two separate lines.
102 0           push @buffer, split /\n/, $chunk;
103              
               # NOTE(review): the loop condition tests truthiness, so an empty
               # line (or a line that is just "0") stops the loop and leaves
               # the rest of @buffer for the next chunk.
104 0           while (my $line = shift @buffer) {
               # Either a ":thump" heartbeat or an "event: <name>" header;
               # $2 is only defined for the latter.
105 0 0         if ($line =~ /^(:thump|event: (\w+))$/) {
106 0           my $event = $2;
107              
108 0 0         if (!defined $event) {
109             # Heartbeats have no data
110 0           $self->emit( 'heartbeat' );
111 0           next;
112             }
113             else {
114 0           $current_event = $event;
115             }
116             }
117              
               # Wait for more input if no event header has been seen yet, or
               # if its data line has not arrived.
               # NOTE(review): when the header and its data line arrive in
               # different chunks, the data line is consumed as $line on the
               # next call and the line after it (if any) is used as the
               # payload, so the real payload appears to be dropped -- confirm.
118 0 0         return unless $current_event;
119 0 0         return unless @buffer;
120              
               # The line after the event header carries the payload; strip
               # its "data:" prefix (at least one whitespace character is
               # required after the colon for the prefix to be removed).
121 0           my $data = shift @buffer;
122 0           $data =~ s/^data:\s+//;
123              
124 0 0         if ($current_event eq 'delete') {
125             # The payload for delete is a single integer
126 0           $self->emit( delete => $data );
127             }
128             else {
129             # Other events have JSON arrays or objects
130             try {
131 0           my $payload = decode_json $data;
132 0           $self->emit( $current_event => $payload );
133             }
134             catch {
               # Decoding problems are reported with the fatal flag set to 0
               # and the raw data attached.
135 0           $self->emit( error => 0,
136             "Error decoding JSON payload: $_", $data
137             );
138 0           };
139             }
140              
               # Message fully handled; the next one needs its own header.
141 0           $current_event = undef;
142             }
143             }
144 0           },
145             )
146 0           );
147              
               # Wait for the request future (completed by stop()).
148 0           $self->_future->get;
149             }
150              
151             sub stop {
               # Marks the stored future as done, passing along any arguments,
               # unless it is already ready. Returns the listener itself so
               # that calls can be chained.
152 0     0 0   my $self = shift;
153 0 0         $self->_future->done(@_) unless $self->_future->is_ready;
154 0           return $self;
155             }
156              
157             sub reset {
               # Restarts the listener: stop() completes the current future and
               # start() issues a new request on the same object.
158 0     0 0   my $self = shift;
159 0           $self->stop->start;
160             }
161              
162             around emit => sub {
               # Wraps the emit method provided by the consumed role. When
               # coerce_entities is true, the payload of 'notification' and
               # 'update' events is converted with to_Notification / to_Status
               # before the original emit is called. Everything else is passed
               # through unchanged.
163             my $orig = shift;
164             my $self = shift;
165              
166             my ($event, $data, @rest) = @_;
               # The regex is an unanchored pre-filter; the exact event name is
               # checked with eq on the two lines below.
167             if ($event =~ /(update|notification)/ and $self->coerce_entities) {
168             $data = to_Notification($data) if $event eq 'notification';
169             $data = to_Status($data) if $event eq 'update';
170             }
171              
172             $self->$orig($event, $data, @rest);
173             };
174              
175             1;
176              
177             __END__
178              
179             =encoding utf8
180              
181             =head1 NAME
182              
183             Mastodon::Listener - Access the streaming API of a Mastodon server
184              
185             =head1 SYNOPSIS
186              
187             # From Mastodon::Client
188             my $listener = $client->stream( 'public' );
189              
190             # Or use it directly
191             my $listener = Mastodon::Listener->new(
192             url => 'https://mastodon.cloud/api/v1/streaming/public',
193             access_token => $token,
194             coerce_entities => 1,
195             );
196              
197             $listener->on( update => sub {
198             my ($listener, $status) = @_;
199             printf "%s said: %s\n",
200             $status->account->display_name,
201             $status->content;
202             });
203              
204             $listener->start;
205              
206             =head1 DESCRIPTION
207              
208             A Mastodon::Listener object is created by calling the B<stream> method from a
209             L<Mastodon::Client>, and it exists for the sole purpose of parsing a stream of
210             events from a Mastodon server.
211              
212             Mastodon::Listener objects consume the L<Role::EventEmitter> role. Please
213             refer to its documentation for details on how to register callbacks for the
214             different events.
215              
216             Once callbacks have been registered, the listener can be set in motion by
217             calling its B<start> method, which takes no arguments and blocks while the
218             stream is being read. The B<stop> method can be called from within callbacks
219             to disconnect from the stream, and B<reset> stops and restarts the listener.
220              
221             =head1 ATTRIBUTES
222              
223             =over 4
224              
225             =item B<access_token>
226              
227             The OAuth2 access token of your application. This attribute is required, and
228             is sent as a Bearer token in the C<Authorization> header of the request.
229              
230             =item B<api_version>
231              
232             The API version to use. Defaults to C<1>.
233              
234             =item B<coerce_entities>
235              
236             Whether JSON responses should be coerced into Mastodon::Entity objects.
237             Currently defaults to false (but this will likely change in a future release).
238              
239             =item B<instance>
240              
241             The instance to use, as a L<Mastodon::Entity::Instance> object. Will be coerced
242             from a URL, and defaults to C<mastodon.cloud>.
243              
244             =item B<stream>
245              
246             The stream to use. Current valid streams are C<public>, C<user>, and tag
247             timelines. To access a tag timeline, the argument to this value should begin
248             with a hash character (C<#>).
249              
250             =item B<url>
251              
252             The full streaming URL to use. By default, it is constructed from the values in
253             the B<instance>, B<api_version>, and B<stream> attributes.
254              
255             =back
256              
257             =head1 EVENTS
258              
259             =over 4
260              
261             =item B<update>
262              
263             A new status has appeared. Callback will be called with the listener and
264             the new status.
265              
266             =item B<notification>
267              
268             A new notification has appeared. Callback will be called with the listener
269             and the new notification.
270              
271             =item B<delete>
272              
273             A status has been deleted. Callback will be called with the listener and the
274             ID of the deleted status.
275              
276             =item B<heartbeat>
277              
278             A new C<:thump> has been received from the server. This is mostly for
279             debugging purposes.
280              
281             =item B<error>
282              
283             Inherited from L<Role::EventEmitter>, will be emitted when an error is found.
284             The callback will be called with a fatal flag, an error message, and any
285             relevant data as a single third argument.
286              
287             If the error event is triggered in response to a 4xx or 5xx error, the data
288             payload will be an array reference with the response and request objects
289             as received from L<Net::Async::HTTP>.
290              
291             =back
292              
293             =head1 AUTHOR
294              
295             =over 4
296              
297             =item *
298              
299             José Joaquín Atria <jjatria@cpan.org>
300              
301             =back
302              
303             =head1 COPYRIGHT AND LICENSE
304              
305             This software is copyright (c) 2017 by José Joaquín Atria.
306              
307             This is free software; you can redistribute it and/or modify it under
308             the same terms as the Perl 5 programming language system itself.
309              
310             =cut