File Coverage

blib/lib/Net/MessageBus.pm
Criterion Covered Total %
statement 110 122 90.1
branch 29 40 72.5
condition 9 19 47.3
subroutine 22 22 100.0
pod 14 14 100.0
total 184 217 84.7


line stmt bran cond sub pod time code
1             package Net::MessageBus;
2              
3 2     2   30052 use 5.006;
  2         10  
  2         98  
4 2     2   13 use strict;
  2         6  
  2         75  
5 2     2   12 use warnings;
  2         4  
  2         152  
6              
7             =head1 NAME
8              
9             Net::MessageBus - Pure Perl simple message bus
10              
11             =head1 VERSION
12              
13             Version 0.08
14              
15             =cut
16              
17             our $VERSION = '0.08';
18              
19 2     2   21 use base 'Net::MessageBus::Base';
  2         5  
  2         737  
20              
21 2     2   1363 use Net::MessageBus::Message;
  2         6  
  2         16  
22              
23 2     2   5055 use IO::Socket::INET;
  2         68529  
  2         17  
24 2     2   4395 use IO::Select;
  2         4500  
  2         113  
25 2     2   2391 use JSON;
  2         34392  
  2         13  
26              
27             $| = 1;
28              
29             =head1 SYNOPSIS
30              
31             This module implements the client side of the Message Bus.
32              
33             use Net::MessageBus;
34             my $MessageBus = Net::MessageBus->new(
35             server => '127.0.0.1',
36             group => 'backend',
37             sender => 'machine1',
38             username => 'user',
39             password => 'password',
40             logger => $logger_object,
41             blocking => 0,
42             timeout => 0.01
43             );
44              
45             On initialization the client authenticates with the Net::MessageBus::Server
46             after which it can start pushing messages to the bus.
47              
48             In order to receive any messages from the bus the client must subscribe to :
49              
50             =over 4
51              
52             =item * one or more groups
53              
54             =item * one or more senders
55              
56             =item * one or more message types
57              
58             =item * all messages
59              
60             =back
61            
62             #each can be called multiple times
63             $MessageBus->subscribe(group => 'test');
64             $MessageBus->subscribe(sender => 'test_process_1');
65             $MessageBus->subscribe(type => 'test_message_type');
66            
67             or
68            
69             $MessageBus->subscribe_to_all();
70            
71             The client can unsubscribe at any time by calling the C method
72              
73             $MessageBus->unsubscribe();
74            
75            
76             To retrive the messages received from the bus, the client can call one of this
77             methods :
78              
79             my @messages = $MessageBus->pending_messages();
80            
81             or
82            
83             my $message = $MessageBus->next_message();
84            
85              
86             =head1 EXAMPLE
87              
88             use Net::MessageBus;
89              
90             my $MessageBus = Net::MessageBus->new(server => '127.0.0.1',
91             group => 'backend',
92             sender => 'machine1');
93            
94             $MessageBus->subscribe_to_all();
95             or
96             $MessageBus->subscribe(group => 'test');
97             $MessageBus->subscribe(sender => 'test_process_1');
98             ...
99            
100             my @messages = $MessageBus->pending_messages();
101             or
102             while (my $message = $MessageBus->next_message()) {
103             print $message->type();
104             }
105              
106             =head1 SUBROUTINES/METHODS
107              
108             =head2 new
109              
110             Creates a new New::MessageBus object
111            
112             B
113              
114             =over 10
115              
116             =item * server = The ip address of the server
117              
118             =item * port = The port on which the server is listening for connections
119              
120             =item * group = The group to which this client belogs to
121              
122             =item * sender = A name for the current client
123              
124             =item * username = User name that will be sent to the server for authentication
125              
126             =item * password = The password that will be sent to the server for authentication
127              
128             =item * logger = A object on which we can call the fallowing methods C
129              
130             =item * block = if we don't have any unread messages from the server we will
131             block until the server sends something. If I is true I will be ignored.
132              
133             =item * timeout = the maximum ammount of time we should wait for a message from the server
134             before returning C for C or an empty list for C
135              
136             =back
137              
138             B
139              
140             my $MessageBus = Net::MessageBus->new(
141             server => '127.0.0.1',
142             group => 'backend',
143             sender => 'machine1',
144             username => 'user',
145             password => 'password',
146             logger => $logger_object,
147             );
148              
149             =cut
150             sub new {
151 20     20 1 24626 my $class = shift;
152            
153 20         63 my %params;
154 20 50 50     167 if ((ref($_[0]) || '') eq "HASH") {
155 0         0 %params = %{$_[0]};
  0         0  
156             }
157             else {
158 20         176 %params = @_;
159             }
160            
161 20 100 50     320 my $self = {
    100 50        
    100 33        
162             server_address => $params{server} || '127.0.0.1',
163             server_port => $params{port} || '4500',
164             logger => $params{logger} || Net::MessageBus::Base::create_default_logger(),
165             group => $params{group},
166             sender => $params{sender},
167             username => $params{username},
168             password => $params{password},
169             timeout => defined($params{timeout}) ? $params{timeout} : 0.01,
170             blocking => defined($params{blocking}) ? $params{blocking} :
171             defined($params{timeout}) ? 0 : 1,
172             msgqueue => [],
173             buffer => '',
174             };
175            
176 20         309 bless $self, $class;
177            
178 20         95 $self->connect_to_server();
179            
180 20         148 return $self;
181             }
182              
183              
184             =head2 subscribe
185              
186             Subscribes the current Net::MessageBus client to the messages from the
187             specified category. It can be called multiple times
188              
189             B :
190              
191             $MessageBus->subscribe(group => 'test');
192             or
193             $MessageBus->subscribe(sender => 'test_process_1');
194            
195             =cut
196             sub subscribe {
197 8     8 1 44 my $self = shift;
198            
199 8         51 return $self->send_to_server('subscribe',{ @_ } );
200             }
201              
202              
203             =head2 subscribe_all
204              
205             Subscribes the current Net::MessageBus client to all the messages
206             the server receives
207              
208             B :
209              
210             $MessageBus->subscribe_all;
211            
212             =cut
213             sub subscribe_all {
214 2     2 1 15 my $self = shift;
215            
216 2         30 return $self->send_to_server('subscribe',{ all => 1 } );
217             }
218              
219              
220             =head2 unsubscribe
221              
222             Unsubscribes current Net::MessageBus client from all the messages it
223             previously subscribed to
224              
225             B :
226              
227             $MessageBus->unsubscribe();
228            
229             =cut
230             sub unsubscribe {
231 1     1 1 4 my $self = shift;
232            
233 1         7 return $self->send_to_server('subscribe',{ unsubscribe => 1 } );
234             }
235              
236             =head2 send
237              
238             Send a new messge to the message queue.
239             It has two forms in which it can be called :
240              
241             =over 4
242              
243             =item 1. With a Net::MessageBus::Message object as argument
244              
245             =item 2. With a hash ref containing the fallowing two keys :
246              
247             =back
248              
249             =over 8
250            
251             =item * type = The message type
252              
253             =item * payload = The actual information we want to send with the message.
254             It can be a scalar, array ref or hash ref and it cannot
255             contain any objects
256              
257             =back
258              
259             B :
260              
261             $MessageBus->send( $message ); #message must be a Net::MessageBus::Message object
262             or
263             $MessageBus->send( type => 'alert', payload => { a => 1, b => 2 } );
264              
265             =cut
266              
267             sub send {
268 207     207 1 116494 my $self = shift;
269            
270 207         271 my $message;
271            
272 207 50       716 if (ref($_[0]) eq "Net::MessageBus::Message") {
    50          
273 0         0 $message = $_[0];
274             }
275             elsif (ref($_[0]) eq "HASH") {
276 0         0 $message = Net::MessageBus::Message->new({ sender => $self->{sender},
277             group => $self->{group},
278 0         0 %{$_[0]},
279             });
280             }
281             else {
282 207         2549 $message = Net::MessageBus::Message->new({ sender => $self->{sender},
283             group => $self->{group},
284             @_,
285             });
286             }
287            
288 207         1323 return $self->send_to_server(message => $message);
289             }
290              
291              
292             =head2 next_message
293              
294             Returns the next message from the queue of messages we received from the
295             server. The message is a Net::MessageBus::Message object.
296            
297             =cut
298             sub next_message {
299 206     206 1 1192352 my $self = shift;
300 206         398 my %params = @_;
301            
302 206 100       335 if (! scalar(@{$self->{msgqueue}})) {
  206         659  
303 8         27 $self->read_server_messages();
304             }
305            
306 206         262 return shift @{$self->{msgqueue}};
  206         664  
307             }
308              
309              
310             =head2 pending_messages
311              
312             Returns all the messages received until now from the server. Each message is
313             a Net::MessageBus::Message object.
314              
315             Argumens :
316              
317             =over 4
318              
319             =item * force_read_queue = forces a read of everyting the server might have sent
320             and we have't processed yet
321            
322             Note: Forcing a read of the message queue when I mode is on will block the
323             call until we received something from the server
324              
325             =back
326            
327             =cut
328             sub pending_messages {
329 3     3 1 6000667 my $self = shift;
330 3         14 my %params = @_;
331            
332 3 50 33     6 if (! scalar(@{$self->{msgqueue}}) || $params{force_read_queue} ) {
  3         26  
333 3         12 $self->read_server_messages();
334             }
335            
336 3         10 my @messages = @{$self->{msgqueue}};
  3         35  
337 3         10 $self->{msgqueue} = [];
338            
339 3         45 return @messages;
340             }
341              
342              
343              
344             =head2 blocking
345              
346             Getter/Setter for the I setting of the client. If set to true, when waiting for server
347             messages, the client will block until it receives something
348              
349             Examples :
350             my $blocking = $MessageBus->blocking();
351             or
352             $MessageBus->blocking(1);
353            
354             =cut
355             sub blocking {
356 3     3 1 8 my $self = shift;
357            
358 3 100       12 if (defined $_[0]) {
359 1         5 $self->{blocking} = !!$_[0];
360             }
361 3         15 return $self->{blocking};
362             }
363              
364             =head2 timeout
365              
366             Getter/Setter for the timeout when waiting for server messages.
367             It can have subunitary value (eg. 0.01).
368              
369             I : When I is set to a true value, the timeout is ignored
370             I : When I is set to 0 the effect is the same as setting I to a true value.
371              
372             Example :
373             my $timeout = $MessageBus->timeout();
374             or
375             $MessageBus->timeout(0.01);
376            
377             =cut
378             sub timeout {
379 3     3 1 11 my $self = shift;
380            
381 3 100       13 if (defined $_[0]) {
382 1 50       10 die "Invalid timeout specified" unless $_[0] =~ /^\d+(?:\.\d+)?$/;
383 1         4 $self->{timeout} = $_[0];
384             }
385 3         20 return $self->{timeout};
386             }
387              
388              
389              
390             =head1 Private methods
391              
392             B
393              
394             =head2 connect_to_server
395              
396             Creates a connection to the Net::MessageBus server and authenticates the user
397              
398             =cut
399              
400             sub connect_to_server {
401 20     20 1 40 my $self = shift;
402            
403 20   50     339 $self->{server_socket} = IO::Socket::INET->new(
404             PeerHost => $self->{server_address},
405             PeerPort => $self->{server_port},
406             Proto => 'tcp',
407             Timeout => 1,
408             ReuseAddr => 1,
409             Blocking => 1,
410             ) || die "Cannot connect to Net::MessageBus server";
411            
412 20         28772 $self->{server_sel} = IO::Select->new($self->{server_socket});
413            
414 20 50       1440 $self->authenticate() || die "Authentication failed";
415             }
416              
417              
418              
419             =head2 send_to_server
420              
421             Handles the actual comunication with the server
422              
423             =cut
424             sub send_to_server {
425 238     238 1 521 my $self = shift;
426 238         575 my ($type,$object) = @_;
427            
428 238 100       1451 if (ref($object) eq "Net::MessageBus::Message") {
429 207         627 $object = $object->serialize();
430             }
431            
432 238         8289 local $\ = "\n";
433 238         1008 local $/ = "\n";
434            
435 238         601 my $socket = $self->{server_socket};
436            
437 238         361 eval {
438 238         1351 print $socket to_json( {type => $type, payload => $object} );
439             };
440            
441 238 50       154558 if ($@) {
442 0         0 $self->logger->error("Message could not be sent! : $@");
443 0         0 return 0;
444             }
445            
446 238         759 my $response = $self->get_response();
447            
448 238 50       3415 if (! $response->{status}) {
449 0         0 $self->logger->error('Error received from server: '.$response->{status_message});
450 0         0 return 0;
451             }
452            
453 238         10357 return 1;
454             }
455              
456              
457             =head2 authenticate
458              
459             Sends a authenication request to the server and waits for the response
460            
461             =cut
462             sub authenticate {
463 20     20 1 40 my $self = shift;
464            
465 20         121 return $self->send_to_server('authenticate',
466             {
467             username => $self->{username},
468             password => $self->{password},
469             }
470             );
471             }
472              
473             =head2 get_response
474              
475             Returns the response received from the server for the last request
476              
477             =cut
478             sub get_response {
479 238     238 1 626 my $self = shift;
480            
481 238         966 while (! defined $self->{response}) {
482 238         582 $self->read_server_messages();
483             }
484            
485 238         811 return delete $self->{response};
486             }
487              
488              
489             =head2 read_server_messages
490              
491             Reads all the messages received from the server and adds the to the internal
492             message queue
493              
494             =cut
495             sub read_server_messages {
496 249     249 1 336 my $self = shift;
497            
498 249         1151 local $/ = "\n";
499 249         845 local $\ = "\n";
500            
501 249 100 50     910 my $timeout = $self->{blocking} ? undef : ($self->{timeout} || 0.01);
502            
503 249         391 while (1) {
504              
505 500         1986 my @ready = $self->{server_sel}->can_read( $timeout );
506 500 100       4234179 last unless (scalar(@ready));
507            
508 251         346 my $buffer;
509            
510 251 50       3743 if ( sysread($ready[0],$buffer,8192) ) {
511            
512 251         1221 $self->{buffer} .= $buffer;
513            
514 251         3516 while ( $self->{buffer} =~ s/(.*?)\n+// ) {
515            
516 643         8011 my $text = $1;
517            
518 643         2036 my $data = from_json($text);
519            
520 643 100 66     14792 if (defined $data->{type} && $data->{type} eq 'message') {
521 405         518 push @{$self->{msgqueue}}, Net::MessageBus::Message->new($data->{payload});
  405         1729  
522 405         1368 $self->logger->debug('Received : '.$text);
523             }
524             else {
525 238         1307 $self->{response} = $data;
526             }
527             }
528             }
529             else {
530 0 0       0 if ($self->{auto_reconnect}) {
531 0         0 $self->connect_to_server();
532             }
533             else {
534 0         0 die "Net::MessageBus server closed the connection";
535             }
536             }
537            
538 251         727 $timeout = 0;
539             }
540             }
541              
542              
543             =head1 SEE ALSO
544              
545             Check out L which implements the server of the MessageBus and
546             L which is the OO inteface for the messages passwed between the client and the server
547              
548              
549             =head1 AUTHOR
550              
551             Horea Gligan, C<< >>
552              
553             =head1 BUGS
554              
555             Please report any bugs or feature requests to C, or through
556             the web interface at L. I will be notified, and then you'll
557             automatically be notified of progress on your bug as I make changes.
558              
559              
560              
561              
562             =head1 SUPPORT
563              
564             You can find documentation for this module with the perldoc command.
565              
566             perldoc Net::MessageBus
567              
568              
569             You can also look for information at:
570              
571             =over 4
572              
573             =item * RT: CPAN's request tracker (report bugs here)
574              
575             L
576              
577             =item * AnnoCPAN: Annotated CPAN documentation
578              
579             L
580              
581             =item * CPAN Ratings
582              
583             L
584              
585             =item * Search CPAN
586              
587             L
588              
589             =back
590              
591              
592             =head1 ACKNOWLEDGEMENTS
593              
594             Thanks to Manol Roujinov for helping to improve this module
595              
596             =head1 LICENSE AND COPYRIGHT
597              
598             Copyright 2012 Horea Gligan.
599              
600             This program is free software; you can redistribute it and/or modify it
601             under the terms of either: the GNU General Public License as published
602             by the Free Software Foundation; or the Artistic License.
603              
604             See http://dev.perl.org/licenses/ for more information.
605              
606              
607             =cut
608              
609             1; # End of Net::MessageBus