File Coverage

blib/lib/IO/NonBlocking.pm
Criterion Covered Total %
statement 27 196 13.7
branch 0 52 0.0
condition 0 13 0.0
subroutine 9 31 29.0
pod 19 22 86.3
total 55 314 17.5


line stmt bran cond sub pod time code
1             package IO::NonBlocking;
2              
3 1     1   542352 use strict;
  1         2  
  1         44  
4 1     1   6 use warnings;
  1         3  
  1         39  
5 1     1   857 use POSIX;
  1         9071  
  1         8  
6 1     1   11438 use IO::Socket;
  1         30970  
  1         5  
7 1     1   340663 use IO::Select;
  1         2177  
  1         72  
8 1     1   9 use Socket;
  1         3  
  1         1188  
9 1     1   7 use Fcntl;
  1         2  
  1         501  
10 1     1   1064 use Tie::RefHash;
  1         6344  
  1         32  
11 1     1   8 use vars qw($VERSION);
  1         3  
  1         2130  
12              
13             $VERSION = '1.035';
14              
15             my @now=localtime(time);
16             my $cronCounter=$now[0]+60*$now[1]+3600*$now[2]+3600*24*$now[3];
17             my %buff;
18              
19             # begin with empty buffers
20             my %inbuffer = ();
21             my %outbuffer = ();
22             my %ready = ();
23              
24             my %turn_timeout;
25             my %turn_timeout_trigger;
26             my $select;
27             my %idle;
28             my %timer;
29              
30             tie %ready, 'Tie::RefHash';
31              
32             my $daily=0;
33              
34             sub new{
35 0     0 1   my($proto,@arg)=@_;
36 0   0       my $class=ref($proto) || $proto;
37 0           my %hash=%{$arg[0]};
  0            
38              
39 0           my $self={};
40              
41 0   0       $self->{serverName}=$hash{'server_name'} || die("server_name is required");
42 0   0       $self->{port}=$hash{'port'} || die("port is required");
43 0 0         $self->{delimiter}=defined($hash{delimiter}) ? $hash{delimiter} : "\0";
44 0 0         $self->{string_format}=defined($hash{string_format}) ? $hash{string_format} : '.*?';
45 0 0         $self->{timeout}=defined($hash{timeout}) ? $hash{timeout} : 300;
46 0 0         $self->{piddir}=defined($hash{piddir}) ? $hash{piddir} : '/tmp';
47              
48 0           bless $self,$class;
49             }
50              
51             sub start_turn{
52 0     0 1   my $self=shift;
53 0           my $client=shift;
54 0           my $time=shift;
55              
56 0           $turn_timeout{$client}=$time;
57 0           $turn_timeout_trigger{$client}=$_[0];
58             }
59              
60             sub stop_time {
61 0     0 1   my $self=shift;
62 0           my $client=shift;
63              
64 0           $turn_timeout{$client}=-1;
65 0           delete($turn_timeout_trigger{$client});
66             }
67              
68             sub close_client {
69 0     0 1   my $self=shift;
70 0           my $client=shift;
71              
72 0           delete $turn_timeout{$client};
73 0           delete $turn_timeout_trigger{$client};
74 0           delete $idle{$client};
75 0           delete $inbuffer{$client};
76 0           delete $outbuffer{$client};
77 0           delete $ready{$client};
78              
79 0           $select->remove($client);
80 0 0         close $client if $client;
81             }
82              
83             sub disconnect_client {
84 0     0 1   my $self=shift;
85 0           my $client=shift;
86              
87 0           delete $turn_timeout{$client};
88 0           delete $turn_timeout_trigger{$client};
89 0           delete $idle{$client};
90 0           delete $inbuffer{$client};
91 0           delete $outbuffer{$client};
92 0           delete $ready{$client};
93              
94 0           $self->onClientDisconnected($client);
95              
96 0           $select->remove($client);
97 0 0         close $client if $client;
98             }
99              
100             sub start{
101 0     0 1   my $self=shift;
102 0           my $current_time=time;
103              
104 0           print "Listening on port ".$self->port."\n";
105              
106 0 0         my $server = IO::Socket::INET->new(
107             LocalPort => $self->port,
108             Listen => 50,
109             Proto => 'tcp',
110             Reuse => 1)
111             or die "Can't make server socket: $@\n";
112              
113 0           $self->nonblock($server);
114 0           $select = IO::Select->new($server);
115              
116 0 0         open(FILE,">".$self->piddir."/".$self->serverName) or die "Cannot write PID file: $!\n";
117 0           print FILE $$;
118 0           close(FILE);
119              
120 0           $self->onServerInit;
121              
122 0           while (1) {
123 0           my $client;
124             my $rv;
125 0           my $data;
126              
127             # cron
128 0           my $this_time=time;
129 0 0         if ($current_time != $this_time) {
130 0           foreach $client($select->handles) {
131 0 0         next if $server == $client;
132              
133 0 0         if ($turn_timeout{$client} != -1) {
134 0 0         if ($turn_timeout{$client} <= 0) {
135 0           &{$turn_timeout_trigger{$client}}($self);
  0            
136 0           delete $turn_timeout_trigger{$client};
137 0           $turn_timeout{$client} = -1;
138             } else {
139 0           --$turn_timeout{$client};
140             }
141             }
142             }
143              
144 0           $self->onSheddo;
145 0           $current_time=$this_time;
146             }
147              
148             #timeout the Idles
149              
150 0 0         if($cronCounter % ($self->{timeout}+30) == 0){
151 0           foreach $client ($select->handles) {
152 0 0         next if $server == $client;
153              
154 0           my $this_time=time;
155              
156 0 0         if( $this_time - $idle{$client} >= $self->{timeout} ){
157 0           $self->disconnect_client($client);
158 0           next;
159             }
160             }
161             }
162              
163             # check for new information on the connections we have
164              
165             # anything to read or accept?
166 0           foreach $client ($select->can_read(1)) {
167              
168 0 0         if ($client == $server) {
169             # accept a new connection
170              
171 0           $client = $server->accept();
172 0           $select->add($client);
173 0           $self->nonblock($client);
174              
175 0           $self->onClientConnected($client);
176 0           $idle{$client}=time;
177 0           $turn_timeout{$client}=-1;
178             } else {
179             # read data
180              
181 0           $data = '';
182 0           $rv = $client->recv($data, POSIX::BUFSIZ, 0);
183              
184 0 0 0       unless (defined($rv) && length $data) {
185             # This would be the end of file, so close the client
186 0           $self->disconnect_client($client);
187 0           next;
188             }
189              
190 0           $inbuffer{$client} .= $data;
191              
192             # test whether the data in the buffer or the data we
193             # just read means there is a complete request waiting
194             # to be fulfilled. If there is, set $ready{$client}
195             # to the requests waiting to be fulfilled.
196 0           my $dm=$self->{delimiter};
197 0           my $sf=$self->{string_format};
198              
199 0           while ($inbuffer{$client} =~ s/($sf)$dm//s) {
200 0           push( @{$ready{$client}}, $1 );
  0            
201             }
202              
203 0           $idle{$client}=time;
204             }
205             }
206              
207             # Any complete requests to process?
208 0           foreach $client (keys %ready) {
209 0           $self->handle($client);
210             }
211              
212 0           my @bad_client;
213              
214             # Buffers to flush?
215 0           foreach $client ($select->can_write(1)) {
216             # Skip this client if we have nothing to say
217 0 0         next unless exists $outbuffer{$client};
218              
219 0           eval{
220 0           $rv = $client->send($outbuffer{$client}, 0);
221             };
222 0 0         push(@bad_client,$client),next if $@;
223              
224 0 0         unless (defined $rv) {
225             # Whine, but move on.
226              
227 0           warn "I was told I could write, but I can't.\n";
228 0           next;
229             }
230              
231 0 0 0       if ( $rv == length $outbuffer{$client} || $! == POSIX::EWOULDBLOCK) {
232 0           substr($outbuffer{$client}, 0, $rv) = '';
233 0 0         delete $outbuffer{$client} unless length $outbuffer{$client};
234             } else {
235             # Couldn't write all the data, and it wasn't because
236             # it would have blocked. Shutdown and move on.
237              
238 0           $self->disconnect_client($client);
239 0           next;
240             }
241             }
242              
243 0           foreach (@bad_client){
244 0           $self->disconnect_client($client);
245             }
246              
247             # Out of band data?
248 0           foreach $client ($select->has_exception(0)) {
249             # arg is timeout
250             # Deal with out-of-band data here, if you want to.
251             }
252             }
253              
254             }
255              
256             sub nonblock {
257 0     0 1   my $self=shift;
258 0           my $socket=shift;
259 0           my $flags;
260            
261 0 0         $flags = fcntl($socket, F_GETFL, 0)
262             or die "Can't get flags for socket: $!\n";
263 0 0         fcntl($socket, F_SETFL, $flags | O_NONBLOCK)
264             or die "Can't make socket nonblocking: $!\n";
265             }
266              
267             sub handle {
268 0     0 0   my $self=shift;
269 0           my $client = shift;
270 0           my $request;
271              
272             # requests are in $ready{$client}
273             # send output to $outbuffer{$client}
274              
275 0           foreach $request (@{$ready{$client}}) {
  0            
276             # $request is the text of the request
277             # put text of reply into $outbuffer{$client}
278              
279 0           $self->onReceiveMessage($client,$request);
280             }
281              
282 0           delete $ready{$client};
283             }
284              
285             #============= facility functions ============
286              
287             sub getip {
288 0     0 1   my $self=shift;
289 0           $_[0]->sockhost();
290             }
291              
292             sub getport {
293 0     0 1   my $self=shift;
294 0           $_[0]->sockport();
295             }
296              
297             sub piddir{
298 0     0 1   my $self=shift;
299              
300 0           return $self->{piddir};
301             }
302              
303             sub serverName{
304 0     0 1   my $self=shift;
305              
306 0           return $self->{serverName};
307             }
308              
309             sub port{
310 0     0 1   my $self=shift;
311              
312 0           return $self->{port};
313             }
314              
315             sub sendmsg{
316 0     0 1   my $self=shift;
317 0           my $client=shift;
318 0           my $msg=shift;
319              
320 0           $outbuffer{$client}.=$msg.$self->{delimiter};
321             }
322              
323 0     0 0   sub onServerInit {
324             }
325              
326             sub onClientConnected{
327 0     0 1   my $self=shift;
328 0           my $client=shift;
329             }
330              
331             sub onClientDisconnected{
332 0     0 1   my $self=shift;
333 0           my $client=shift;
334             }
335              
336             sub onReceiveMessage{
337 0     0 1   my $self=shift;
338 0           my $client=shift;
339 0           my $request=shift;
340             }
341              
342             sub onSheddo{
343 0     0 0   my $self=shift;
344              
345 0           foreach (sort {$a <=> $b} keys %timer) {
  0            
346 0 0         unless ($cronCounter % $_) {
347 0           &{$timer{$_}}($self);
  0            
348             }
349             }
350              
351 0 0         if ($cronCounter % 4527 == 0) {
352             #Sync time
353              
354 0           my @now=localtime(time);
355 0           $cronCounter=$now[0]+60*$now[1]+3600*$now[2]+3600*24*$now[3];
356 0           return;
357             }
358              
359 0           ++$cronCounter;
360             }
361              
362             sub cron {
363 0     0 1   my $self=shift;
364              
365 0           $timer{$_[0]}=$_[1];
366             }
367              
368             sub add_socket {
369 0     0 1   my $self=shift;
370 0           my $sock=shift;
371              
372 0           $select->add($sock);
373 0           $self->nonblock($sock);
374              
375 0           $idle{$sock}=time;
376 0           $turn_timeout{$sock}=-1;
377             }
378              
379             sub select {
380 0     0 1   my $self=shift;
381              
382 0           $select;
383             }
384              
385             1;
386             __END__