File Coverage

blib/lib/Net/MessageBus/Server.pm
Criterion Covered Total %
statement 58 195 29.7
branch 8 50 16.0
condition 9 32 28.1
subroutine 14 23 60.8
pod 11 11 100.0
total 100 311 32.1


line stmt bran cond sub pod time code
1             package Net::MessageBus::Server;
2              
3 3     3   33357 use 5.006;
  3         14  
  3         140  
4 3     3   18 use strict;
  3         7  
  3         106  
5 3     3   19 use warnings;
  3         7  
  3         193  
6              
7             =head1 NAME
8              
9             Net::MessageBus::Server - Pure Perl message bus server
10              
11             =head1 VERSION
12              
13             Version 0.08
14              
15             =cut
16              
17             our $VERSION = '0.08';
18              
19 3     3   18 use base qw(Net::MessageBus::Base);
  3         6  
  3         858  
20              
21 3     3   1301 use JSON;
  3         20763  
  3         21  
22 3     3   1580 use IO::Select;
  3         1609  
  3         160  
23 3     3   1123 use IO::Socket::INET;
  3         29731  
  3         34  
24              
25 3     3   3162 use Net::MessageBus::Message;
  3         10  
  3         29  
26              
27             #handle gracefully the death of child ssh processes
28 3     3   3265 use POSIX ":sys_wait_h";
  3         46683  
  3         25  
29              
30             $| = 1;
31              
32             =head1 SYNOPSIS
33              
34             This module creates a new Net::MessageBus server running on the specified address/port
35              
36             Usage :
37              
38             use Net::MessageBus::Server;
39              
40             my $MBServer = Net::MessageBus::Server->new(
41             address => '127.0.0.1',
42             port => '15000',
43             logger => $logger,
44             authenticate => \&authenticate_method,
45             );
46            
47             $MBServer->start();
48            
49             or
50            
51             $MBServer->daemon() || die "Fork to start Net::MessageBus::Server in background failed!"
52             ...
53             if ( $MBServer->is_running() ) {
54             print "Server is alive";
55             }
56             ...
57             $MBServer->stop(); #if started as a daemon.
58            
59              
60             =head1 SUBROUTINES/METHODS
61              
62             =head2 new
63              
64             Creates a new server object.
65             It does not automatically start the server, you have to start it using the
66             start() method.
67            
68             Arguments :
69            
70             =over 4
71              
72             =item * address =
73             The address on which the server should bind , 127.0.0.1 by dafault
74              
75             =item * port =
76             The port on which the server should listen , 4500 by default
77            
78             =item * logger
79             Any object that supports the fallowing methods : debug, info, warn,error
80            
81             =item * authenticate =
82             A code ref to a method that returns true if the authentication is
83             successfull and false otherwise
84              
85             =back
86              
87             B
88            
89             my $MBServer = Net::MessageBus::Server->new(
90             address => '127.0.0.1',
91             port => '15000',
92             logger => $logger,
93             authenticate => \&authenticate_method,
94             );
95            
96              
97             B :
98              
99             sub authenticate_method {
100             my ($username, $password, $client_ip) = @_;
101            
102             return 1 if ($username eq "john" && $password eq "1234");
103             return 0;
104             }
105              
106              
107             =cut
108             sub new {
109 2     2 1 48 my $class = shift;
110            
111 2         6 my %params;
112 2 50 50     25 if ((ref($_[0]) || '') eq "HASH") {
113 0         0 %params = %{$_[0]};
  0         0  
114             }
115             else {
116 2         6 %params = @_;
117             }
118            
119 0     0   0 my $self = {
120             address => $params{address} || '127.0.0.1',
121             port => $params{port} || '4500',
122             logger => $params{logger} || Net::MessageBus::Base::create_default_logger(),
123             authenticate => $params{autenticate} || sub {return 1},
124 2   50     43 };
      50        
      33        
      50        
125            
126 2         15 $self->{subscriptions} = {
127             all => [],
128             groups => {},
129             senders => {},
130             };
131            
132 2         8 $self->{authenticated} = {};
133            
134 2         9 bless $self, $class;
135            
136 2         8 return $self;
137             }
138              
139              
140             =head2 start
141              
142             Starts the server
143              
144             =cut
145             sub start {
146 0     0 1 0 my $self = shift;
147            
148 0         0 $self->{server_socket} = $self->create_server_socket();
149            
150 0         0 my $server_sel = IO::Select->new($self->{server_socket});
151            
152 0         0 $self->{run} = 1;
153            
154 0         0 while ($self->{run} == 1) {
155              
156 0         0 my @exceptions = $server_sel->has_exception(0);
157 0         0 foreach my $broken_socket (@exceptions) {
158 0         0 eval {
159 0         0 $server_sel->remove($broken_socket);
160 0         0 close($broken_socket);
161             };
162             }
163            
164 0         0 my @ready = $server_sel->can_read();
165            
166 0 0       0 next unless scalar(@ready);
167            
168 0         0 foreach my $fh (@ready) {
169            
170 0 0       0 if( $fh == $self->{server_socket} ) {
171             # Accept the incoming socket.
172 0         0 my $new = $fh->accept;
173            
174 0 0       0 next unless $new; #in case the ssl connection failed
175            
176 0         0 my $straddr = $self->get_peer_address($new);
177            
178 0         0 $self->logger->info("Accepted from : $straddr\n");
179            
180 0         0 $server_sel->add($new);
181            
182             } else {
183             # Process socket
184 0         0 local $\ = "\n";
185 0         0 local $/ = "\n";
186            
187 0         0 my $text = readline($fh);
188            
189 0         0 my $straddr = $self->get_peer_address($fh);
190              
191 0 0       0 if ($text) {
192            
193 0         0 chomp($text);
194            
195 0         0 $self->{client_socket} = $fh;
196              
197 0         0 $self->logger->debug("Request from $straddr : '$text'");
198            
199 0         0 my $request;
200 0         0 eval {
201 0         0 $request = from_json($text);
202             };
203            
204 0 0       0 if ($@) {
    0          
    0          
    0          
205 0         0 print $fh to_json({status => 0, status_message => $@ });
206             }
207             elsif ($request->{type} eq "message") {
208            
209 0         0 print $fh to_json({status => 1});
210            
211 0         0 my $message = Net::MessageBus::Message->new($request->{payload});
212            
213 0         0 $self->send_message($message);
214             }
215             elsif ($request->{type} eq "authenticate") {
216            
217 0         0 my %data = %{$request->{payload}};
  0         0  
218            
219 0         0 my $auth = $self->{authenticate}->(
220             @data{qw/username password/},
221             $self->get_peer_address($fh)
222             );
223            
224 0         0 $self->{authenticated}->{$fh} = $auth;
225            
226 0         0 print $fh to_json({status => $auth});
227             }
228             elsif ($request->{type} eq "subscribe") {
229            
230 0         0 $self->subscribe_client($request->{payload});
231              
232 0         0 print $fh to_json({status => 1});
233             }
234             else {
235 0         0 print $fh to_json({status => 0, status_message => 'Invalid request!'});
236             }
237            
238            
239             }
240             else {
241 0         0 $self->logger->info("Peear $straddr closed connection\n");
242            
243 0         0 $self->unsubscribe_client($fh);
244 0         0 delete $self->{authenticated}->{$fh};
245            
246 0         0 $server_sel->remove($fh);
247 0         0 close ($fh);
248             }
249             }
250             }
251             }
252             }
253              
254             =head2 daemon
255              
256             Starts the server in background
257              
258             =cut
259             sub daemon {
260 2     2 1 836 my $self = shift;
261            
262 2 50 33     25 if ( defined $self->{pid} && kill(0,$self->{pid}) ) {
263 0         0 $self->logger->error('An instance of the server is already running!');
264             }
265            
266             $SIG{CHLD} = sub {
267            
268             # don't change $! and $? outside handler
269 2     2   44 local ( $!, $? );
270            
271 2         92 while ( my $pid = waitpid( -1, WNOHANG ) > 0 ) {
272             #Wait for the child processes to exit
273             }
274 2         28 return 1;
275 2         57 };
276            
277 2         5 my $pid;
278            
279 2 50       2681 if ( $pid = fork() ) {
280 2         107 $self->{pid} = $pid;
281             }
282             else {
283             $SIG{INT} = $SIG{HUP} = sub {
284 0     0   0 $self->{run} = 0;
285 0         0 $self->{server_socket}->close();
286 0         0 };
287 0         0 $self->start();
288 0         0 exit(0);
289             }
290            
291 2         240 return 1;
292             }
293              
294             =head2 stop
295              
296             Stops a previously started daemon
297            
298             =cut
299             sub stop {
300 2     2 1 3080 my $self = shift;
301            
302 2 50 33     84 if (! defined $self->{pid} || ! kill(0,$self->{pid}) ) {
303 0         0 $self->logger->error('No Net::MessageBus::Server is running (pid : '.$self->{pid}.')!');
304 0         0 return 0;
305             }
306            
307 2 50       24 if ($^O =~ /Win/i ) {
308             #signal 15 not delivered while in IO wait on Windows so we have to take drastic measures
309 0         0 kill 9, $self->{pid};
310             }
311             else {
312 2         1998 kill 15, $self->{pid};
313             }
314            
315 2         1002503 sleep 1;
316            
317 2 50       55 if ( kill(0,$self->{pid}) ) {
318 0         0 $self->logger->error('Failed to stop the Net::MessageBus::Server (pid : '.$self->{pid}.')! ');
319 0         0 return 0;
320             }
321            
322 2         9 delete $self->{pid};
323            
324 2         14 return 1;
325             }
326              
327              
328             =head2 is_running
329              
330             Returns true if the server process is running
331            
332             =cut
333             sub is_running {
334 4     4 1 1000218 my $self = shift;
335            
336 4 100 66     147 if (! defined $self->{pid} || ! kill(0,$self->{pid}) ) {
337 2         20 return 0;
338             }
339            
340 2         67 return 1;
341             }
342              
343             =head1 Private methods
344              
345             =head2 create_server_socket
346              
347             Starts the TCP socket that to which the clients will connect
348              
349             =cut
350              
351             sub create_server_socket {
352 0     0 1   my $self = shift;
353            
354 0   0       my $server_sock= IO::Socket::INET->new(
355             LocalHost => $self->{address},
356             LocalPort => $self->{port},
357             Proto => 'tcp',
358             Listen => 10,
359             ReuseAddr => 1,
360             Blocking => 1,
361             ) || die "Cannot listen on ".$self->{address}.
362             ":".$self->{port}.", Error: $!";
363            
364 0           $self->logger->info("$0 server v$VERSION - Listening on ".
365             $self->{address}.":".$self->{port} );
366            
367 0           return $server_sock;
368            
369             }
370              
371              
372             =head2 get_peer_address
373              
374             Returns the ip address for the given connection
375            
376             =cut
377             sub get_peer_address {
378 0     0 1   my ($self, $fh) = @_;
379              
380 0           my $straddr = 'unknown';
381            
382 0           eval {
383 0           my $sockaddr = getpeername($fh);
384            
385 0           my ($port, $iaddr) = sockaddr_in($sockaddr);
386 0           $straddr = inet_ntoa($iaddr);
387             };
388            
389 0           return $straddr;
390             }
391              
392             =head2 subscribe_client
393              
394             Adds the client to the subscription list which he specified
395            
396             =cut
397             sub subscribe_client {
398 0     0 1   my $self = shift;
399 0           my $data = shift;
400            
401 0 0         if (defined $data->{all}) {
    0          
    0          
    0          
    0          
402 0   0       $self->{subscriptions}->{all} ||= [];
403 0           push @{$self->{subscriptions}->{all}}, $self->{client_socket};
  0            
404             }
405             elsif (defined $data->{group}) {
406 0   0       $self->{subscriptions}->{groups}->{$data->{group}} ||= [];
407 0           push @{$self->{subscriptions}->{groups}->{$data->{group}}}, $self->{client_socket};
  0            
408             }
409             elsif (defined $data->{sender}) {
410 0   0       $self->{subscriptions}->{senders}->{$data->{sender}} ||= [];
411 0           push @{$self->{subscriptions}->{senders}->{$data->{sender}}}, $self->{client_socket};
  0            
412             }
413             elsif (defined $data->{type}) {
414 0   0       $self->{subscriptions}->{types}->{$data->{type}} ||= [];
415 0           push @{$self->{subscriptions}->{types}->{$data->{type}}}, $self->{client_socket};
  0            
416             }
417             elsif (defined $data->{unsubscribe}) {
418 0           $self->unsubscribe_client($self->{client_socket});
419             }
420             else {
421 0           return 0;
422             }
423            
424 0           return 1;
425             }
426              
427              
428             =head2 unsubscribe_client
429              
430             Removes the given socket from all subscription lists
431            
432             =cut
433             sub unsubscribe_client {
434 0     0 1   my $self = shift;
435 0           my $fh = shift;
436            
437 0           $self->{subscriptions}->{all} = [ grep { $_ != $fh } @{$self->{subscriptions}->{all}} ];
  0            
  0            
438            
439 0           foreach my $group (keys %{$self->{subscriptions}->{groups}}) {
  0            
440 0           $self->{subscriptions}->{groups}->{$group} = [ grep { $_ != $fh } @{$self->{subscriptions}->{groups}->{$group}} ];
  0            
  0            
441             }
442 0           foreach my $sender (keys %{$self->{subscriptions}->{senders}}) {
  0            
443 0           $self->{subscriptions}->{senders}->{$sender} = [ grep { $_ != $fh } @{$self->{subscriptions}->{senders}->{$sender}} ];
  0            
  0            
444             }
445 0           foreach my $type (keys %{$self->{subscriptions}->{types}}) {
  0            
446 0           $self->{subscriptions}->{types}->{$type} = [ grep { $_ != $fh } @{$self->{subscriptions}->{types}->{$type}} ];
  0            
  0            
447             }
448             }
449              
450             =head2 clients_registered_for_message
451              
452             Returns a list containing all the file handles registered to receive the given message
453              
454             =cut
455             sub clients_registered_for_message {
456 0     0 1   my $self = shift;
457 0           my $message = shift;
458            
459 0           my @handles = ();
460 0 0         push @handles, @{ $self->{subscriptions}->{all} || [] };
  0            
461 0 0         push @handles, @{ $self->{subscriptions}->{groups}->{$message->group()} || [] };
  0            
462 0 0         push @handles, @{ $self->{subscriptions}->{senders}->{$message->sender()} || [] };
  0            
463 0 0 0       push @handles, @{ $self->{subscriptions}->{types}->{$message->type() || ''} || [] };
  0            
464            
465 0           my %seen = ();
466 0           @handles = grep { $_ != $self->{client_socket} }
  0            
467 0           grep { $self->{authenticated}->{$_} }
468 0           grep { ! $seen{$_} ++ } @handles;
469            
470 0           return @handles;
471             }
472              
473             =head2 send_message
474              
475             Sends the given message to the clients that subscribed to the group or sender of the messages
476              
477             =cut
478             sub send_message {
479 0     0 1   my $self = shift;
480 0           my $message = shift;
481            
482 0           my @recipients = $self->clients_registered_for_message($message);
483            
484 0           local $\ = "\n";
485            
486 0           foreach my $client ( @recipients ) {
487 0           eval {
488 0           print $client to_json({ type => 'message' , payload => $message->serialize() });
489             };
490 0 0         if ($@) {
491 0           $self->logger->error('Error sending message to client!');
492             }
493             }
494             }
495              
496             =head1 AUTHOR
497              
498             Horea Gligan, C<< >>
499              
500             =head1 BUGS
501              
502             Please report any bugs or feature requests to C, or through
503             the web interface at L. I will be notified, and then you'll
504             automatically be notified of progress on your bug as I make changes.
505              
506              
507              
508              
509             =head1 SUPPORT
510              
511             You can find documentation for this module with the perldoc command.
512              
513             perldoc Net::MessageBus::Server
514              
515              
516             You can also look for information at:
517              
518             =over 4
519              
520             =item * RT: CPAN's request tracker (report bugs here)
521              
522             L
523              
524             =item * AnnoCPAN: Annotated CPAN documentation
525              
526             L
527              
528             =item * CPAN Ratings
529              
530             L
531              
532             =item * Search CPAN
533              
534             L
535              
536             =back
537              
538              
539             =head1 ACKNOWLEDGEMENTS
540              
541              
542             =head1 LICENSE AND COPYRIGHT
543              
544             Copyright 2012 Horea Gligan.
545              
546             This program is free software; you can redistribute it and/or modify it
547             under the terms of either: the GNU General Public License as published
548             by the Free Software Foundation; or the Artistic License.
549              
550             See http://dev.perl.org/licenses/ for more information.
551              
552              
553             =cut
554              
555             1; # End of Net::MessageBus::Server