File Coverage

blib/lib/POE/Component/Client/Bayeux.pm
Criterion Covered Total %
statement 42 181 23.2
branch 0 62 0.0
condition 0 53 0.0
subroutine 14 31 45.1
pod 7 17 41.1
total 63 344 18.3


line stmt bran cond sub pod time code
1             package POE::Component::Client::Bayeux;
2              
3             =head1 NAME
4              
5             POE::Component::Client::Bayeux - Bayeux/cometd client implementation in POE
6              
7             =head1 SYNOPSIS
8              
9             use POE qw(Component::Client::Bayeux);
10              
11             POE::Component::Client::Bayeux->spawn(
12             Host => '127.0.0.1',
13             Alias => 'comet',
14             );
15              
16             POE::Session->create(
17             inline_states => {
18             _start => sub {
19             my ($kernel, $heap) = @_[KERNEL, HEAP];
20             $kernel->alias_set('my_client');
21              
22             $kernel->post('comet', 'init');
23             $kernel->post('comet', 'subscribe', '/chat/demo', 'events');
24             $kernel->post('comet', 'publish', '/chat/demo', {
25             user => "POE",
26             chat => "POE has joined",
27             join => JSON::XS::true,
28             });
29             },
30             events => sub {
31             my ($kernel, $heap, $message) = @_[KERNEL, HEAP, ARG0];
32              
33             print STDERR "Client got subscribed message:\n" . Dumper($message);
34             },
35             },
36             );
37              
38             $poe_kernel->run();
39              
40             =head1 DESCRIPTION
41              
42             This module implements the Bayeux Protocol (1.0draft1) from the Dojo Foundation.
43             Also called cometd, Bayeux is a low-latency routing protocol for JSON encoded
44             events between clients and servers in a publish-subscribe model.
45              
46             This is the client implementation. It is not feature complete, but works at the
47             moment for testing a Bayeux server.
48              
49             =cut
50              
51 3     3   7310 use strict;
  3         8  
  3         111  
52 3     3   19 use warnings;
  3         7  
  3         130  
53 3     3   20 use POE qw(Component::Client::HTTP Component::Client::Bayeux::Transport);
  3         8  
  3         20  
54 3     3   179 use Params::Validate;
  3         6  
  3         189  
55 3     3   18 use Data::Dumper;
  3         4  
  3         136  
56 3     3   19 use JSON::Any;
  3         5  
  3         17  
57 3     3   388 use Data::UUID;
  3         52  
  3         214  
58 3     3   27 use HTTP::Request::Common;
  3         5  
  3         214  
59 3     3   4390 use Log::Log4perl qw(get_logger :levels);
  3         209241  
  3         24  
60 3     3   387 use Log::Log4perl::Appender;
  3         8  
  3         61  
61 3     3   19 use Log::Log4perl::Layout;
  3         7  
  3         78  
62              
63 3     3   18 use POE::Component::Client::Bayeux::Utilities qw(decode_json_response);
  3         8  
  3         218  
64 3     3   2380 use POE::Component::Server::Bayeux::Utilities qw(channel_match);
  3         10  
  3         200  
65              
66 3     3   20 use base qw(Class::Accessor Exporter);
  3         7  
  3         8876  
67             __PACKAGE__->mk_accessors(qw(session clientId logger));
68              
69             our @EXPORT_OK = qw(decode_json_response);
70              
71             my $protocol_version = '1.0';
72             our $VERSION = '0.03';
73              
74             =head1 USAGE
75              
76             =head2 spawn (...)
77              
78             =over 4
79              
80             Create a new Bayeux client. Arguments to this method:
81              
82             =over 4
83              
84             =item I (required)
85              
86             Connect to this host.
87              
88             =item I (default: 80)
89              
90             Connect to this port.
91              
92             =item I (default: 0)
93              
94             Use SSL on connection
95              
96             =item I (default: 'bayeux_client')
97              
98             The POE session alias for local sessions to interact with.
99              
100             =item I (default: 0)
101              
102             Either 0 or 1, indicates level of logging.
103              
104             =item I (default: undef)
105              
106             Logfile to write output to.
107              
108             =item I (default: 1)
109              
110             If false, no logger output to STDOUT.
111              
112             =item I (not implemented)
113              
114             Enables cross domain protocol of messaging.
115              
116             =item I (default: none)
117              
118             Provide a coderef that will receive a message hashref of any failed messages (erorrs in protocol, or simply unhandled messages).
119              
120             =back
121              
122             Returns a class object with methods of interest:
123              
124             =over 4
125              
126             =item I
127              
128             The L object returned from an internal create() call.
129              
130             =back
131              
132             =back
133              
134             =cut
135              
136             sub spawn {
137 0     0 1   my $class = shift;
138 0           my %args = validate(@_, {
139             Host => 1,
140             Port => { default => 80 },
141             Path => { default => '/cometd' },
142             SSL => { default => 0 },
143             Alias => { default => 'bayeux_client' },
144             CrossDomain => { default => 0 },
145             Debug => { default => 0 },
146             ErrorCallback => 0,
147             LogFile => 0,
148             LogStdout => { default => 1 },
149             });
150              
151 0 0         if ($args{CrossDomain}) {
152             # TODO
153 0           die __PACKAGE__ . " doesn't yet support cross domain protocol.\n";
154             }
155              
156 0           my $ua_alias = $args{Alias} . '_ua';
157 0 0         my $cometd_url = sprintf 'http%s://%s:%s%s',
158             ($args{SSL} ? 's' : ''), $args{Host}, $args{Port}, $args{Path};
159              
160 0           POE::Component::Client::HTTP->spawn(
161             Alias => $ua_alias,
162             );
163              
164 0           my $self = bless { %args }, $class;
165              
166 0 0         my $session = POE::Session->create(
167             inline_states => {
168             _start => \&client_start,
169             _stop => \&client_stop,
170             shutdown => \&client_shutdown,
171              
172             # Public methods
173             init => \&init,
174             publish => \&publish,
175             subscribe => \&subscribe,
176             unsubscribe => \&unsubscribe,
177             disconnect => \&disconnect,
178             reconnect => \&reconnect,
179              
180             # Internal
181             handshake => \&handshake,
182             handshake_response => \&handshake_response,
183             send_message => \&send_message,
184             ua_response => \&ua_response,
185             deliver => \&deliver,
186             flush_queue => \&flush_queue,
187             send_transport => \&send_transport,
188             },
189             heap => {
190             args => \%args,
191             ua => $ua_alias,
192             remote_url => $cometd_url,
193             json => JSON::Any->new(),
194             uuid => Data::UUID->new(),
195             subscriptions => {},
196             client => $self,
197             },
198             ($ENV{POE_DEBUG} ? (
199             options => { trace => 1, debug => 1 },
200             ) : ()),
201             );
202              
203             # Setup logger
204 0           my $logger = Log::Log4perl->get_logger('bayeux_client');
205             {
206 0           my $logger_layout = Log::Log4perl::Layout::PatternLayout->new("[\%d] \%p: \%m\%n");
  0            
207 0 0         $logger->level($args{Debug} ? $DEBUG : $INFO);
208              
209 0 0         if ($args{LogFile}) {
210 0           my $file_appender = Log::Log4perl::Appender->new(
211             'Log::Log4perl::Appender::File',
212             name => 'filelog',
213             filename => $args{LogFile},
214             );
215 0           $file_appender->layout( $logger_layout );
216 0           $logger->add_appender($file_appender);
217             }
218 0 0         if ($args{LogStdout}) {
219 0           my $stdout_appender = Log::Log4perl::Appender->new(
220             'Log::Log4perl::Appender::Screen',
221             name => 'screenlog',
222             stderr => 0,
223             );
224 0           $stdout_appender->layout($logger_layout);
225 0           $logger->add_appender($stdout_appender);
226             }
227             }
228              
229 0           $self->{logger} = $logger;
230 0           $self->{session} = $session->ID;
231 0           return $self;
232             }
233              
234             sub client_start {
235 0     0 0   my ($kernel, $heap) = @_[KERNEL, HEAP];
236              
237 0           $kernel->alias_set( $heap->{args}{Alias} );
238              
239 0 0         if ($ENV{POE_DEBUG}) {
240 0           $kernel->alias_resolve($heap->{ua})->option( trace => 1, debug => 1 );
241             }
242             }
243              
244             sub client_stop {
245 0     0 0   my ($kernel, $heap) = @_[KERNEL, HEAP];
246             }
247              
248             sub client_shutdown {
249 0     0 0   my ($kernel, $heap) = @_[KERNEL, HEAP];
250              
251 0           $heap->{_shutdown} = 1;
252              
253 0           $kernel->call( $heap->{ua}, 'shutdown' );
254              
255 0 0         if ($heap->{transport}) {
256 0           $kernel->call( $heap->{transport}, 'shutdown' );
257             }
258              
259 0           $kernel->alias_remove( $heap->{args}{Alias} );
260             }
261              
262             ## Public States ###
263              
264             =head1 POE STATES
265              
266             The following are states you can post to to interact with the client.
267              
268             =head2 init ()
269              
270             =over 4
271              
272             Initializes the client, connecting to the server, and sets up long polling.
273              
274             =back
275              
276             =cut
277              
278             sub init {
279 0     0 1   my ($kernel, $heap) = @_[KERNEL, HEAP];
280              
281 0           $kernel->yield('handshake');
282             }
283              
284              
285             sub handshake {
286 0     0 0   my ($kernel, $heap, %ext) = @_[KERNEL, HEAP, ARG0 .. $#_];
287              
288 0           my %handshake = (
289             channel => '/meta/handshake',
290             version => $protocol_version,
291             minimumVersion => $protocol_version,
292             supportedConnectionTypes => [ 'long-polling' ],
293             ext => {
294             'json-comment-filtered' => 1,
295             %ext,
296             }
297             );
298              
299 0           $kernel->yield('send_message', 'handshake_response', \%handshake);
300              
301             # Unsubscribe from all TODO
302              
303 0           $heap->{_initialized} = 1;
304 0           $heap->{_connected} = 0;
305             }
306              
307             =head2 publish ($channel, $message)
308              
309             =over 4
310              
311             Publishes arbitrary message to the channel given. Message will have 'clientId'
312             and 'id' fields auto-populated.
313              
314             =back
315              
316             =cut
317              
318             sub publish {
319 0     0 1   my ($kernel, $heap, $channel, $message) = @_[KERNEL, HEAP, ARG0, ARG1];
320              
321 0           $kernel->call($_[SESSION], 'send_transport', {
322             channel => $channel,
323             data => $message,
324             });
325             }
326              
327             =head2 subscribe ($channel, $callback)
328              
329             =over 4
330              
331             Subscribes client to the channel given. Callback can either be a coderef or
332             the name of a state in the calling session. Callback will get one arg, the
333             message that was posted to the channel subscribed to.
334              
335             =back
336              
337             =cut
338              
339             sub subscribe {
340 0     0 1   my ($kernel, $heap, $channel, $callback) = @_[KERNEL, HEAP, ARG0, ARG1];
341              
342 0 0 0       return if $heap->{subscriptions}{$channel}
      0        
343             && $heap->{subscriptions}{$channel}{callback} eq $callback
344             && $heap->{subscriptions}{$channel}{session} eq $_[SENDER];
345              
346 0           $heap->{subscriptions}{$channel} = {
347             callback => $callback,
348             session => $_[SENDER],
349             };
350              
351 0           $kernel->call($_[SESSION], 'send_transport', {
352             channel => '/meta/subscribe',
353             subscription => $channel,
354             });
355             }
356              
357             =head2 unsubscribe ($channel)
358              
359             =over 4
360              
361             Unsubscribes from channel.
362              
363             =back
364              
365             =cut
366              
367             sub unsubscribe {
368 0     0 1   my ($kernel, $heap, $channel) = @_[KERNEL, HEAP, ARG0];
369              
370 0           delete $heap->{subscriptions}{$channel};
371              
372 0           $kernel->call($_[SESSION], 'send_transport', {
373             channel => '/meta/unsubscribe',
374             subscription => $channel,
375             });
376             }
377              
378             =head2 disconnect ()
379              
380             =over 4
381              
382             Sends a disconnect request.
383              
384             =back
385              
386             =cut
387              
388             sub disconnect {
389 0     0 1   my ($kernel, $heap) = @_[KERNEL, HEAP];
390              
391 0           $kernel->call($_[SESSION], 'send_transport', {
392             channel => '/meta/disconnect',
393             });
394 0           $heap->{_disconnect} = 1;
395             }
396              
397             =head2 reconnect ()
398              
399             =over 4
400              
401             Disconnect and reconnect
402              
403             =back
404              
405             =cut
406              
407             sub reconnect {
408 0     0 1   my ($kernel, $heap) = @_[KERNEL, HEAP];
409              
410 0           $kernel->call($_[SESSION], 'send_transport', {
411             channel => '/meta/disconnect',
412             });
413 0           $heap->{_reconnect} = 1;
414             }
415              
416             ## Internal Main States ###
417              
418             sub handshake_response {
419 0     0 0   my ($kernel, $heap, $session, $response) = @_[KERNEL, HEAP, SESSION, ARG0];
420              
421 0 0 0       if (! $response || ! ref $response || ! ref $response eq 'HASH') {
      0        
422 0           die "Invalid response from handshake\n";
423             }
424              
425 0 0 0       if ($response->{version} && $protocol_version < $response->{version}) {
426 0           die "Can't connect to server: version $$response{version} is > my supported version $protocol_version\n";
427             }
428              
429 0 0         if (! $response->{successful}) {
430 0           die "Unsuccessful handshake.\n" . Dumper($response);
431             }
432              
433             # Store client id for all future requests
434 0           $heap->{clientId} = $response->{clientId};
435 0           $heap->{client}->clientId( $heap->{clientId} );
436              
437             # Store advice
438 0   0       $heap->{advice} = $response->{advice} || {};
439              
440             # Choose a transport, build it, and ask it to connect
441             # TODO: make sure it's one of the returned supportedConnectionTypes
442              
443 0           $heap->{transport} = POE::Component::Client::Bayeux::Transport->spawn(
444             type => 'long-polling',
445             parent => $session,
446             parent_heap => $heap,
447             );
448              
449 0           $kernel->post($heap->{transport}, 'tunnelInit');
450             }
451              
452             sub deliver {
453 0     0 0   my ($kernel, $heap, $message) = @_[KERNEL, HEAP, ARG0];
454              
455 0 0 0       if (! $message || ! ref $message || ! ref $message eq 'HASH' || ! $message->{channel}) {
      0        
      0        
456 0           die "deliver(): Invalid message\n";
457             }
458              
459             # If the message has an id, see if I have a record of the instigating request
460 0           my $request;
461 0 0         if ($message->{id}) {
462 0           $request = delete $heap->{messages}{ $message->{id} };
463             }
464              
465             # Handle /meta/ channel responses
466 0 0         if (my ($meta_channel) = $message->{channel} =~ m{^/meta/(.+)$}) {
467 0 0         if ($meta_channel eq 'connect') {
468 0 0 0       if ($message->{successful} && ! $heap->{_connected}) {
    0          
469 0           $heap->{_connected} = 1;
470             }
471             elsif (! $heap->{_initialized}) {
472 0           $heap->{_connected} = 0;
473             }
474 0           $kernel->yield('flush_queue');
475 0           return;
476             }
477             }
478              
479             # Publishes to a non-private channel MAY yield a simple successful message. Ignore those.
480 0 0 0       if ($request && $request->{caller_state} eq 'publish'
      0        
      0        
481             && $message->{successful} && $message->{channel} !~ m{^/service/}) {
482 0           return;
483             }
484              
485             # Check if I have a subscription for the channel
486 0           my $matching_subscription;
487 0           foreach my $subscription (keys %{ $heap->{subscriptions} }) {
  0            
488 0 0         next unless channel_match($message->{channel}, $subscription);
489 0           $matching_subscription = $subscription;
490 0           last;
491             }
492              
493             # Call the callback if so for each subscription
494 0 0         if ($matching_subscription) {
495 0           my $sub_details = $heap->{subscriptions}{$matching_subscription};
496 0 0         if ($sub_details->{callback}) {
497 0 0         if (ref $sub_details->{callback}) {
    0          
498 0           $sub_details->{callback}($message, $heap);
499 0           return;
500             }
501             elsif ($_[SESSION] ne $sub_details->{session}) {
502 0           $kernel->post( $sub_details->{session}, $sub_details->{callback}, $message, $heap );
503 0           return;
504             }
505             }
506             }
507              
508             # Call generic callback for all non-successful messages
509 0 0 0       if (defined $message->{successful} && ! $message->{successful} && $heap->{args}{ErrorCallback}) {
      0        
510 0           $heap->{args}{ErrorCallback}($message);
511             }
512              
513 0           $heap->{client}->logger->debug("deliver() couldn't handle message:\n" . Dumper($message));
514             }
515              
516             ## Utilities ###
517              
518             sub send_message {
519 0     0 0   my ($kernel, $heap, $callback_state, @args) = @_[KERNEL, HEAP, ARG0 .. $#_];
520              
521 0           $heap->{client}->logger->debug(" >>> Pre-transport >>>\n" . Dumper(\@args));
522              
523             # Create an HTTP POST request, encoding the args into JSON
524 0           my $request = POST $heap->{remote_url}, [ message => $heap->{json}->encode(\@args) ];
525              
526             # Create a UUID so I can collect meta info about this request
527 0           my $uuid = $heap->{uuid}->create_str();
528 0           $heap->{_ua_requests}{$uuid} = { json_callback => $callback_state };
529              
530             # Send the request to the user agent
531 0           $kernel->post( $heap->{ua}, 'request', 'ua_response', $request, $uuid );
532             }
533              
534             sub ua_response {
535 0     0 0   my ($kernel, $heap, $request_packet, $response_packet) = @_[KERNEL, HEAP, ARG0, ARG1];
536              
537 0           my $request_object = $request_packet->[0];
538 0           my $request_tag = $request_packet->[1]; # from the 'request' post
539 0           my $response_object = $response_packet->[0];
540              
541 0           my $meta = delete $heap->{_ua_requests}{$request_tag};
542 0 0 0       if ($meta && $meta->{json_callback}) {
543 0           my $json;
544 0           eval {
545 0           $json = decode_json_response($response_object);
546             };
547 0 0         if ($@) {
548             # Ignore errors if shutting down
549 0 0         return if $heap->{_shutdown};
550 0           die $@;
551             }
552 0           $heap->{client}->logger->debug("<<< Pre-transport <<<\n" . Dumper($json));
553 0           $kernel->yield( $meta->{json_callback}, @$json );
554             }
555             }
556              
557             sub send_transport {
558 0     0 0   my ($kernel, $heap, $message) = @_[KERNEL, HEAP, ARG0];
559              
560             # Add unique ID to each message
561 0           my $msg_id = ++$heap->{message_id};
562 0           $message->{id} = $msg_id;
563              
564             # Store a copy of this message
565 0           $heap->{messages}{$msg_id} = {
566             %$message,
567             caller_session => $_[SENDER],
568             caller_state => $_[CALLER_STATE],
569             };
570              
571 0 0         if ($heap->{transport}) {
572 0           $kernel->post( $heap->{transport}, 'sendMessages', [ $message ]);
573             }
574             else {
575 0           $heap->{client}->logger->debug("Queueing message ".Dumper($message)." as no active transport");
576 0           push @{ $heap->{message_queue} }, $message;
  0            
577             }
578              
579 0           return $msg_id;
580             }
581              
582             sub flush_queue {
583 0     0 0   my ($kernel, $heap) = @_[KERNEL, HEAP];
584              
585 0 0 0       return unless $heap->{message_queue} && ref $heap->{message_queue} && int @{ $heap->{message_queue} };
  0   0        
586 0 0         return unless $heap->{transport};
587              
588 0           $heap->{client}->logger->debug("Flushing queue to transport");
589              
590 0           $kernel->post($heap->{transport}, 'sendMessages', [ @{ $heap->{message_queue} } ]);
  0            
591              
592 0           $heap->{message_queue} = [];
593             }
594              
595             =head1 TODO
596              
597             Lots of stuff.
598              
599             The code currently implements only the long-polling transport and doesn't yet
600             strictly follow all the directives in the protocol document http://svn.xantus.org/shortbus/trunk/bayeux/bayeux.html
601              
602             =head1 KNOWN BUGS
603              
604             No known bugs, but I'm sure you can find some.
605              
606             =head1 SEE ALSO
607              
608             L, L, L
609              
610             =head1 COPYRIGHT
611              
612             Copyright (c) 2008 Eric Waters and XMission LLC (http://www.xmission.com/).
613             All rights reserved. This program is free software; you can redistribute it
614             and/or modify it under the same terms as Perl itself.
615              
616             The full text of the license can be found in the LICENSE file included with
617             this module.
618              
619             =head1 AUTHOR
620              
621             Eric Waters
622              
623             =cut
624              
625             1;