File Coverage

blib/lib/Net/Server/NonBlocking.pm
Criterion Covered Total %
statement 33 283 11.6
branch 0 132 0.0
condition 0 17 0.0
subroutine 11 34 32.3
pod 13 18 72.2
total 57 484 11.7


line stmt bran cond sub pod time code
1             package Net::Server::NonBlocking;
2              
3 1     1   13042 use 5.000503;
  1         3  
  1         32  
4              
5 1     1   4 use strict;
  1         2  
  1         47  
6 1     1   5 use warnings;
  1         6  
  1         29  
7 1     1   1017 use POSIX;
  1         6428  
  1         7  
8 1     1   4310 use IO::Socket;
  1         29022  
  1         6  
9 1     1   1787 use IO::Select;
  1         2001  
  1         68  
10 1     1   70 use Socket;
  1         2  
  1         1255  
11 1     1   8 use Fcntl;
  1         9  
  1         486  
12 1     1   1029 use Tie::RefHash;
  1         6864  
  1         34  
13 1     1   11 use vars qw($VERSION);
  1         1  
  1         40  
14 1     1   333138 use Data::Dumper;
  1         17573  
  1         4068  
15              
16             $VERSION = '0.048';
17              
18             my @now=localtime(time);
19             my $cronCounter=$now[0]+60*$now[1]+3600*$now[2]+3600*24*$now[3];
20             my %buff;
21              
22             # begin with empty buffers
23             my %inbuffer = ();
24             my %outbuffer = ();
25             my %ready = ();
26              
27             my %turn_timeout;
28             my %turn_timeout_trigger;
29             my $select = IO::Select->new();
30             my %idle;
31             my %timer;
32             my %map_all;
33             my %map_specific;
34             my %map_server;
35             my %map_client;
36             my %alive;
37              
38             tie %ready, 'Tie::RefHash';
39              
40             sub new{
41 0     0 1   my($proto,@arg)=@_;
42 0   0       my $class=ref($proto) || $proto;
43 0           my $hash=$arg[0];
44              
45 0           my $self={};
46 0 0         $self->{pidfile}=exists $hash->{pidfile} ? $hash->{pidfile} : '/tmp/anonymous_server';
47              
48 0           bless $self,$class;
49             }
50              
51             sub add {
52 0     0 1   my $self=shift;
53 0           my $hash=shift;
54              
55 0 0         die("server_name is required") if not exists $hash->{server_name};
56            
57 0 0         if (not exists $hash->{local_port}) {
58 0 0         $self->{listen}->{$hash->{server_name}}->{delimiter}=
59             exists $hash->{delimiter} ? $hash->{delimiter} : "\0";
60 0 0         $self->{listen}->{$hash->{server_name}}->{string_format}=
61             exists $hash->{string_format} ? $hash->{string_format} : '.*?';
62 0 0         $self->{listen}->{$hash->{server_name}}->{timeout}=
63             exists $hash->{timeout} ? $hash->{timeout} : 300;
64             $self->{listen}->{$hash->{server_name}}->{on_disconnected}=
65 0 0   0     exists $hash->{on_disconnected} ? $hash->{on_disconnected} : sub {};
  0            
66             $self->{listen}->{$hash->{server_name}}->{on_recv_msg}=
67 0 0   0     exists $hash->{on_recv_msg} ? $hash->{on_recv_msg} : sub {};
  0            
68 0 0         $self->{listen}->{$hash->{server_name}}->{read_buffer} =
69             exists $hash->{read_buffer} ? $hash->{read_buffer} : \&read_buffer;
70 0 0         $self->{listen}->{$hash->{server_name}}->{on_disconnected_param}=
71             exists $hash->{on_disconnected_param} ? $hash->{on_disconnected_param} : [];
72 0 0         $self->{listen}->{$hash->{server_name}}->{on_recv_msg_param}=
73             exists $hash->{on_recv_msg_param} ? $hash->{on_recv_msg_param} : [];
74              
75 0           return undef;
76             } else {
77 0           my $server;
78              
79 0 0         if (exists $hash->{local_address}) {
80 0 0         $server = IO::Socket::INET->new(
81             LocalAddr => $hash->{local_address},
82             LocalPort => $hash->{local_port},
83             Listen => 50,
84             Proto => 'tcp',
85             Reuse => 1,
86             Blocking => 0)
87             or die "Can't make server socket -- $@\n";
88             } else {
89 0 0         $server = IO::Socket::INET->new(
90             LocalPort => $hash->{local_port},
91             Listen => 50,
92             Proto => 'tcp',
93             Reuse => 1,
94             Blocking => 0)
95             or die "Can't make server socket -- $@\n";
96             }
97 0           $self->nonblock($server);
98              
99 0           $self->{listen}->{$hash->{server_name}}->{socket}=$server;
100 0   0       $self->{listen}->{$hash->{server_name}}->{local_address}=$hash->{local_address} || "localhost";
101 0           $self->{listen}->{$hash->{server_name}}->{local_port}=$hash->{local_port};
102 0 0         $self->{listen}->{$hash->{server_name}}->{delimiter}=
103             exists $hash->{delimiter} ? $hash->{delimiter} : "\0";
104 0 0         $self->{listen}->{$hash->{server_name}}->{string_format}=
105             exists $hash->{string_format} ? $hash->{string_format} : '.*?';
106 0 0         $self->{listen}->{$hash->{server_name}}->{timeout}=
107             exists $hash->{timeout} ? $hash->{timeout} : 300;
108             $self->{listen}->{$hash->{server_name}}->{on_connected}=
109 0 0   0     exists $hash->{on_connected} ? $hash->{on_connected} : sub {};
  0            
110             $self->{listen}->{$hash->{server_name}}->{on_disconnected}=
111 0 0   0     exists $hash->{on_disconnected} ? $hash->{on_disconnected} : sub {};
  0            
112             $self->{listen}->{$hash->{server_name}}->{on_recv_msg}=
113 0 0   0     exists $hash->{on_recv_msg} ? $hash->{on_recv_msg} : sub {};
  0            
114 0 0         $self->{listen}->{$hash->{server_name}}->{read_buffer} =
115             exists $hash->{read_buffer} ? $hash->{read_buffer} : \&read_buffer;
116 0 0         $self->{listen}->{$hash->{server_name}}->{on_connected_param}=
117             exists $hash->{on_connected_param} ? $hash->{on_connected_param} : [];
118 0 0         $self->{listen}->{$hash->{server_name}}->{on_disconnected_param}=
119             exists $hash->{on_disconnected_param} ? $hash->{on_disconnected_param} : [];
120 0 0         $self->{listen}->{$hash->{server_name}}->{on_recv_msg_param}=
121             exists $hash->{on_recv_msg_param} ? $hash->{on_recv_msg_param} : [];
122              
123 0 0         if (exists $hash->{local_address}) {
124 0           $map_specific{"$hash->{local_address}:$hash->{local_port}"}=
125             $hash->{server_name};
126             } else {
127 0           $map_all{$hash->{local_port}}=
128             $hash->{server_name};
129             }
130              
131 0           $map_server{$server} = $hash->{server_name};
132              
133 0           return $server;
134             }
135             }
136              
137             sub bind {
138 0     0 1   my $self=shift;
139 0           my $server_name=shift;
140 0           my $client=shift;
141              
142 0           $select->add($client);
143 0           $self->nonblock($client);
144              
145 0           $alive{$client}=1;
146 0           $idle{$client}=time;
147 0           $turn_timeout{$client}=-1;
148              
149 0           $map_client{$client}=$server_name;
150             }
151              
152             sub nonblock {
153 0     0 0   my $self=shift;
154 0           my $socket=shift;
155 0           my $flags;
156              
157 0 0         $flags = fcntl($socket, F_GETFL, 0)
158             or die "Can't get flags for socket: $!\n";
159 0 0         fcntl($socket, F_SETFL, $flags | O_NONBLOCK)
160             or die "Can't make socket nonblocking: $!\n";
161             }
162              
163             sub handle {
164 0     0 0   my $self=shift;
165 0           my $server_name=shift;
166 0           my $client = shift;
167 0           my $request;
168              
169             # requests are in $ready{$client}
170             # send output to $outbuffer{$client}
171              
172 0           foreach $request (@{$ready{$client}}) {
  0            
173             # $request is the text of the request
174             # put text of reply into $outbuffer{$client}
175              
176 0           $self->{listen}->{$server_name}->{on_recv_msg}->($self,$client,$request,@{$self->{listen}->{$server_name}->{on_recv_msg_param}});
  0            
177             }
178              
179 0           delete $ready{$client};
180             }
181              
182             sub get_server_socket {
183 0     0 1   my $self=shift;
184 0           my $server_name = shift;
185              
186 0           $self->{listen}->{$server_name}->{socket};
187             }
188              
189             sub get_server_name {
190 0     0 1   my $self=shift;
191 0           my $client=shift;
192             #my @caller=caller();
193              
194 0 0         return $map_server{$client} if exists $map_server{$client};
195 0 0         return $map_client{$client} if exists $map_client{$client};
196            
197 0 0         if (exists $map_specific{$client->sockhost().":".$client->sockport()}) {
198 0           return $map_specific{$client->sockhost().":".$client->sockport()};
199             } else {
200 0           return $map_all{$client->sockport()};
201             }
202             }
203              
204             sub start_turn {
205 0     0 1   my $self=shift;
206 0           my $client=shift;
207 0           my $time=shift;
208              
209 0           $turn_timeout{$client}=$time;
210 0           $turn_timeout_trigger{$client}=$_[0];
211             }
212              
213             sub reset_turn {
214 0     0 1   my $self=shift;
215 0           my $client=shift;
216              
217 0           $turn_timeout{$client}=-1;
218 0           delete($turn_timeout_trigger{$client});
219             }
220              
221             #sub flush_input {
222             # my $self=shift;
223             # my $client=shift;
224             # my $server_name=$self->get_server_name($client);
225             #
226             # my $rin='';
227             # vec($rin,fileno($client),1)=1;
228             #
229             # select(my $rout=$rin,undef,undef,0);
230             #
231             # if (vec($rout,fileno($client),1)) {
232             # my $data = '';
233             # my $rv = $client->recv($data, POSIX::BUFSIZ, 0);
234             #
235             # unless (defined($rv) && length $data) {
236             # # This would be the end of file, so close the client
237             # $self->erase_client($server_name,$client);
238             # next;
239             # }
240             #
241             # $inbuffer{$client} .= $data;
242             # $self->{listen}->{$server_name}->{read_buffer}->($self,\$inbuffer{$client},\$ready{$client},$server_name);
243             #
244             # $idle{$client}=time;
245             # }
246             #
247             # $self->handle($server_name,$client);
248             #}
249              
250             sub flush_output {
251 0     0 1   my $self=shift;
252 0           my $client=shift;
253 0           my $server_name=$self->get_server_name($client);
254              
255 0 0         return unless length $outbuffer{$client};
256              
257 0           my $rin='';
258 0           vec($rin,fileno($client),1)=1;
259              
260 0           select(undef,my $rout=$rin,undef,0);
261              
262 0 0         if (vec($rout,fileno($client),1)) {
263 0           while ($outbuffer{$client}) {
264 0           my $rv;
265              
266 0           eval{
267 0           $rv = $client->send($outbuffer{$client}, 0);
268             };
269 0 0         return if $@; #the $client is disconnected
270              
271 0 0         unless (defined $rv) {
272             # Whine, but move on.
273            
274 0           warn "I was told I could write, but I can't.\n";
275 0           next;
276             }
277              
278 0 0 0       if ( $rv == length $outbuffer{$client} || $! == POSIX::EWOULDBLOCK) {
279 0           substr($outbuffer{$client}, 0, $rv) = '';
280 0 0         delete $outbuffer{$client} unless length $outbuffer{$client};
281             } else {
282             # Couldn't write all the data
283              
284 0 0         substr($outbuffer{$client}, 0,$rv,'') if defined $rv;
285 0 0         delete $outbuffer{$client} unless length $outbuffer{$client};
286             }
287             }
288             }
289             }
290              
291             sub close_client {
292 0     0 1   my $self=shift;
293 0           my $client=shift;
294              
295             #print "Idle delete close_client $client\n";
296              
297 0           delete $alive{$client};
298 0           delete $turn_timeout{$client};
299 0           delete $turn_timeout_trigger{$client};
300 0           delete $idle{$client};
301 0           delete $inbuffer{$client};
302 0           delete $outbuffer{$client};
303 0           delete $ready{$client};
304 0 0         delete $map_client{$client} if exists $map_client{$client};
305              
306 0           $select->remove($client);
307 0 0         close $client if $client;
308             }
309              
310             sub erase_client {
311 0     0 1   my $self=shift;
312 0           my $server_name=shift;
313 0           my $client=shift;
314              
315 0           delete $alive{$client};
316 0           delete $turn_timeout{$client};
317 0           delete $turn_timeout_trigger{$client};
318 0           delete $idle{$client};
319 0           delete $inbuffer{$client};
320 0           delete $outbuffer{$client};
321 0           delete $ready{$client};
322 0 0         delete $map_client{$client} if exists $map_client{$client};
323              
324 0           $self->{listen}->{$server_name}->{on_disconnected}->($self,$client,@{$self->{listen}->{$server_name}->{on_disconnected_param}});
  0            
325              
326 0           $select->remove($client);
327 0 0         close $client if $client;
328             }
329              
330             sub enqueue {
331 0     0 1   my $self=shift;
332 0           my $client=shift;
333 0           my $data=shift;
334              
335 0 0 0       return unless $client and $data;
336              
337 0           $outbuffer{$client}.=$data;
338             }
339              
340             sub read_buffer {
341 0     0 0   my $self=shift;
342 0           my $raw_input=shift;
343 0           my $cooked_input=shift;
344 0           my $server_name=shift;
345              
346 0           my $dm=$self->{listen}->{$server_name}->{delimiter};
347 0           my $sf=$self->{listen}->{$server_name}->{string_format};
348              
349 0           while ($$raw_input =~ s/($sf)$dm//s) {
350 0           push( @{$$cooked_input}, $1 );
  0            
351             }
352             }
353              
354             sub start{
355 0     0 1   my $self=shift;
356 0           my $current_time=time;
357              
358 0           foreach (keys %{$self->{listen}}) {
  0            
359 0 0         next unless $self->{listen}->{$_}->{local_port};
360 0           warn "Listen on ".$self->{listen}->{$_}->{local_address}.":".
361             $self->{listen}->{$_}->{local_port}."\n";
362 0           $select->add($self->{listen}->{$_}->{socket});
363             }
364              
365 0 0         open(FILE,">".$self->{pidfile}) or die "Cannot write PID file: $!\n";
366 0           print FILE $$;
367 0           close(FILE);
368              
369 0           while (1) {
370 0           my $client;
371             my $rv;
372 0           my $data;
373              
374             # cron
375 0           my $this_time=time;
376 0 0         if ($current_time != $this_time) {
377 0           foreach $client($select->handles) {
378 0 0         next if exists $map_server{$client};
379 0 0         next unless exists $alive{$client};
380              
381 0 0         if ($turn_timeout{$client} != -1) {
382 0 0         if ($turn_timeout{$client} <= 0) {
383 0           &{$turn_timeout_trigger{$client}}($self,$client);
  0            
384 0           delete $turn_timeout_trigger{$client};
385 0           $turn_timeout{$client} = -1;
386             } else {
387 0           --$turn_timeout{$client};
388             }
389             }
390             }
391              
392 0           $self->onSheddo;
393 0           $current_time=$this_time;
394             }
395              
396             #timeout the Idles
397              
398 0           foreach $client ($select->handles) {
399 0 0         next if exists $map_server{$client};
400 0 0         next unless exists $alive{$client};
401              
402 0           my $server_name=$self->get_server_name($client);
403              
404 0           my $this_time=time;
405 0 0         if( $this_time - $idle{$client} >= $self->{listen}->{$server_name}->{timeout} ){
406 0           $self->erase_client($server_name,$client);
407 0           next;
408             }
409             }
410             # check for new information on the connections we have
411              
412             # anything to read or accept?
413 0           foreach $client ($select->can_read(1)) {
414 0 0         if (exists $map_server{$client}) {
415 0           my $server_name=$self->get_server_name($client);
416             # accept a new connection
417 0           $client = $self->{listen}->{$server_name}->{socket}->accept();
418 0 0         unless ($client) {
419 0           warn "Accepting new socket error: $!\n";
420 0           next;
421             }
422              
423 0           $select->add($client);
424 0           $self->nonblock($client);
425              
426 0           $alive{$client}=1;
427 0           $self->{listen}->{$server_name}->{on_connected}->($self,$client,@{$self->{listen}->{$server_name}->{on_connected_param}});
  0            
428 0           $idle{$client}=time;
429 0           $turn_timeout{$client}=-1;
430             } else {
431 0 0         next unless exists $alive{$client};
432 0           my $server_name=$self->get_server_name($client);
433             # read data
434              
435 0           $data = '';
436 0           $rv = $client->recv($data, POSIX::BUFSIZ, 0);
437            
438 0 0 0       unless (defined($rv) && length $data) {
439             # This would be the end of file, so close the client
440 0           $self->erase_client($server_name,$client);
441 0           next;
442             }
443              
444 0           $inbuffer{$client} .= $data;
445 0           $self->{listen}->{$server_name}->{read_buffer}->($self,\$inbuffer{$client},\$ready{$client},$server_name);
446              
447 0           $idle{$client}=time;
448             }
449             }
450              
451             # Any complete requests to process?
452 0           foreach $client (keys %ready) {
453 0           my $server_name=$self->get_server_name($client);
454 0           $self->handle($server_name,$client);
455             }
456              
457 0           my @bad_client;
458              
459             # Buffers to flush?
460 0           foreach $client ($select->can_write(1)) {
461 0 0         next unless exists $alive{$client};
462              
463 0           my $server_name=$self->get_server_name($client);
464            
465             # Skip this client if we have nothing to say
466 0 0         next unless exists $outbuffer{$client};
467            
468 0           eval{
469 0           $rv = $client->send($outbuffer{$client}, 0);
470             };
471 0 0         push(@bad_client,$client),next if $@;
472            
473 0 0         unless (defined $rv) {
474             # Whine, but move on.
475            
476 0           warn "I was told I could write, but I can't.\n";
477 0           next;
478             }
479              
480 0 0 0       if ( $rv == length $outbuffer{$client} || $! == POSIX::EWOULDBLOCK) {
481 0           substr($outbuffer{$client}, 0, $rv) = '';
482 0 0         delete $outbuffer{$client} unless length $outbuffer{$client};
483             } else {
484             # Couldn't write all the data
485              
486 0 0         substr($outbuffer{$client}, 0,$rv,'') if defined $rv;
487 0 0         delete $outbuffer{$client} unless length $outbuffer{$client};
488 0           next;
489             }
490             }
491              
492 0           foreach $client (@bad_client){
493 0           my $server_name=$self->get_server_name($client);
494 0           $self->erase_client($server_name,$client);
495             }
496              
497             # Out of band data?
498 0           foreach $client ($select->has_exception(0)) {
499             # arg is timeout
500             # Deal with out-of-band data here, if you want to.
501             }
502             }
503              
504             }
505              
506             sub onSheddo{
507 0     0 0   my $self=shift;
508              
509 0           foreach (sort {$a <=> $b} keys %timer) {
  0            
510 0 0         unless ($cronCounter % $_) {
511 0           my $count=@{$timer{$_}};
  0            
512 0           &{$timer{$_}->[0]}($self,@{$timer{$_}}[1..($count-1)]);
  0            
  0            
513             }
514             }
515              
516 0           ++$cronCounter;
517             }
518              
519             sub cron {
520 0     0 1   my $self=shift;
521 0           my $sec=shift;
522 0           my $sub=shift;
523              
524 0           $timer{$sec}=[$sub,@_];
525             }
526              
527             sub select {
528 0     0 0   my $self=shift;
529              
530 0           $select;
531             }
532              
533             1;
534              
535             __END__