File Coverage

blib/lib/IO/Multiplex.pm
Criterion Covered Total %
statement 185 290 63.7
branch 43 142 30.2
condition 16 101 15.8
subroutine 34 45 75.5
pod 17 20 85.0
total 295 598 49.3


line stmt bran cond sub pod time code
1             package IO::Multiplex;
2              
3 5     5   92753 use strict;
  5         11  
  5         138  
4 5     5   18 use warnings;
  5         4  
  5         396  
5              
6             our $VERSION = '1.14';
7              
8             =head1 NAME
9              
10             IO::Multiplex - Manage IO on many file handles
11              
12             =head1 SYNOPSIS
13              
14             use IO::Multiplex;
15              
16             my $mux = new IO::Multiplex;
17             $mux->add($fh1);
18             $mux->add(\*FH2);
19             $mux->set_callback_object(...);
20             $mux->listen($server_socket);
21             $mux->loop;
22              
23             sub mux_input { ... }
24              
25             C is designed to take the effort out of managing
26             multiple file handles. It is essentially a really fancy front end to
27             the C
28             loop, it buffers all input and output to/from the file handles. It
29             can also accept incoming connections on one or more listen sockets.
30              
31             =head1 DESCRIPTION
32              
33             It is object oriented in design, and will notify you of significant events
34             by calling methods on an object that you supply. If you are not using
35             objects, you can simply supply C<__PACKAGE__> instead of an object reference.
36              
37             You may have one callback object registered for each file handle, or
38             one global one. Possibly both -- the per-file handle callback object
39             will be used instead of the global one.
40              
41             Each file handle may also have a timer associated with it. A callback
42             function is called when the timer expires.
43              
44             =head2 Handling input on descriptors
45              
46             When input arrives on a file handle, the C method is called
47             on the appropriate callback object. This method is passed three
48             arguments (in addition to the object reference itself of course):
49              
50             =over 4
51              
52             =item 1
53              
54             a reference to the mux,
55              
56             =item 2
57              
58             A reference to the file handle, and
59              
60             =item 3
61              
62             a reference to the input buffer for the file handle.
63              
64             =back
65              
66             The method should remove the data that it has consumed from the
67             reference supplied. It may leave unconsumed data in the input buffer.
68              
69             =head2 Handling output to descriptors
70              
71             If C did not handle output to the file handles as well
72             as input from them, then there is a chance that the program could
73             block while attempting to write. If you let the multiplexer buffer
74             the output, it will write the data only when the file handle is
75             capable of receiveing it.
76              
77             The basic method for handing output to the multiplexer is the C
78             method, which simply takes a file descriptor and the data to be
79             written, like this:
80              
81             $mux->write($fh, "Some data");
82              
83             For convenience, when the file handle is Ced to the multiplexer, it
84             is tied to a special class which intercepts all attempts to write to the
85             file handle. Thus, you can use print and printf to send output to the
86             handle in a normal manner:
87              
88             printf $fh "%s%d%X", $foo, $bar, $baz
89              
90             Unfortunately, Perl support for tied file handles is incomplete, and
91             functions such as C cannot be supported.
92              
93             Also, file handle object methods such as the C method of
94             C cannot be intercepted.
95              
96             =head1 EXAMPLES
97              
98             =head2 Simple Example
99              
100             This is a simple telnet-like program, which demonstrates the concepts
101             covered so far. It does not really work too well against a telnet
102             server, but it does OK against the sample server presented further down.
103              
104             use IO::Socket;
105             use IO::Multiplex;
106              
107             # Create a multiplex object
108             my $mux = new IO::Multiplex;
109             # Connect to the host/port specified on the command line,
110             # or localhost:23
111             my $sock = new IO::Socket::INET(Proto => 'tcp',
112             PeerAddr => shift || 'localhost',
113             PeerPort => shift || 23)
114             or die "socket: $@";
115              
116             # add the relevant file handles to the mux
117             $mux->add($sock);
118             $mux->add(\*STDIN);
119             # We want to buffer output to the terminal. This prevents the program
120             # from blocking if the user hits CTRL-S for example.
121             $mux->add(\*STDOUT);
122              
123             # We're not object oriented, so just request callbacks to the
124             # current package
125             $mux->set_callback_object(__PACKAGE__);
126              
127             # Enter the main mux loop.
128             $mux->loop;
129              
130             # mux_input is called when input is available on one of
131             # the descriptors.
132             sub mux_input {
133             my $package = shift;
134             my $mux = shift;
135             my $fh = shift;
136             my $input = shift;
137              
138             # Figure out whence the input came, and send it on to the
139             # other place.
140             if ($fh == $sock) {
141             print STDOUT $$input;
142             } else {
143             print $sock $$input;
144             }
145             # Remove the input from the input buffer.
146             $$input = '';
147             }
148              
149             # This gets called if the other end closes the connection.
150             sub mux_close {
151             print STDERR "Connection Closed\n";
152             exit;
153             }
154              
155             =head2 A server example
156              
157             Servers are just as simple to write. We just register a listen socket
158             with the multiplex object C method. It will automatically
159             accept connections on it and add them to its list of active file handles.
160              
161             This example is a simple chat server.
162              
163             use IO::Socket;
164             use IO::Multiplex;
165              
166             my $mux = new IO::Multiplex;
167              
168             # Create a listening socket
169             my $sock = new IO::Socket::INET(Proto => 'tcp',
170             LocalPort => shift || 2300,
171             Listen => 4)
172             or die "socket: $@";
173              
174             # We use the listen method instead of the add method.
175             $mux->listen($sock);
176              
177             $mux->set_callback_object(__PACKAGE__);
178             $mux->loop;
179              
180             sub mux_input {
181             my $package = shift;
182             my $mux = shift;
183             my $fh = shift;
184             my $input = shift;
185              
186             # The handles method returns a list of references to handles which
187             # we have registered, except for listen sockets.
188             foreach $c ($mux->handles) {
189             print $c $$input;
190             }
191             $$input = '';
192             }
193              
194             =head2 A more complex server example
195              
196             Let us take a look at the beginnings of a multi-user game server. We will
197             have a Player object for each player.
198              
199             # Paste the above example in here, up to but not including the
200             # mux_input subroutine.
201              
202             # mux_connection is called when a new connection is accepted.
203             sub mux_connection {
204             my $package = shift;
205             my $mux = shift;
206             my $fh = shift;
207              
208             # Construct a new player object
209             Player->new($mux, $fh);
210             }
211              
212             package Player;
213              
214             my %players = ();
215              
216             sub new {
217             my $package = shift;
218             my $self = bless { mux => shift,
219             fh => shift } => $package;
220              
221             # Register the new player object as the callback specifically for
222             # this file handle.
223              
224             $self->{mux}->set_callback_object($self, $self->{fh});
225             print $self->{fh}
226             "Greetings, Professor. Would you like to play a game?\n";
227              
228             # Register this player object in the main list of players
229             $players{$self} = $self;
230             $mux->set_timeout($self->{fh}, 1);
231             }
232              
233             sub players { return values %players; }
234              
235             sub mux_input {
236             my $self = shift;
237             shift; shift; # These two args are boring
238             my $input = shift; # Scalar reference to the input
239              
240             # Process each line in the input, leaving partial lines
241             # in the input buffer
242             while ($$input =~ s/^(.*?)\n//) {
243             $self->process_command($1);
244             }
245             }
246              
247             sub mux_close {
248             my $self = shift;
249              
250             # Player disconnected;
251             # [Notify other players or something...]
252             delete $players{$self};
253             }
254             # This gets called every second to update player info, etc...
255             sub mux_timeout {
256             my $self = shift;
257             my $mux = shift;
258              
259             $self->heartbeat;
260             $mux->set_timeout($self->{fh}, 1);
261             }
262              
263             =head1 METHODS
264              
265             =cut
266              
267 5     5   2269 use POSIX qw(errno_h BUFSIZ);
  5         25789  
  5         24  
268 5     5   5945 use Socket;
  5         2601  
  5         2112  
269 5     5   2271 use FileHandle qw(autoflush);
  5         19274  
  5         19  
270 5     5   1447 use IO::Handle;
  5         7  
  5         132  
271 5     5   16 use Fcntl;
  5         5  
  5         1412  
272 5     5   23 use Carp qw(carp);
  5         10  
  5         231  
273 5     5   19 use constant IsWin => ($^O eq 'MSWin32');
  5         7  
  5         421  
274              
275              
276             BEGIN {
277 5     5   38 eval {
278             # Can optionally use Hi Res timers if available
279 5         2587 require Time::HiRes;
280 5         6085 Time::HiRes->import('time');
281             };
282             }
283              
284             # This is what you want. Trust me.
285             $SIG{PIPE} = 'IGNORE';
286              
287 5     5   760 { no warnings;
  5         7  
  5         12410  
288             if(IsWin) { *EWOULDBLOCK = sub() {10035} }
289             }
290              
291             =head2 new
292              
293             Construct a new C object.
294              
295             $mux = new IO::Multiplex;
296              
297             =cut
298              
299             sub new
300             {
301 4     4 1 5456 my $package = shift;
302 4         71 my $self = bless { _readers => '',
303             _writers => '',
304             _fhs => {},
305             _handles => {},
306             _timerkeys => {},
307             _timers => [],
308             _listen => {} } => $package;
309 4         18 return $self;
310             }
311              
312             =head2 listen
313              
314             Add a socket to be listened on. The socket should have had the
315             C and C system calls already applied to it. The C
316             module will do this for you.
317              
318             $socket = new IO::Socket::INET(Listen => ..., LocalAddr => ...);
319             $mux->listen($socket);
320              
321             Connections will be automatically accepted and Ced to the multiplex
322             object. C callback method will also be called.
323              
324             =cut
325              
326             sub listen
327             {
328 2     2 1 937 my $self = shift;
329 2         4 my $fh = shift;
330              
331 2         7 $self->add($fh);
332 2         5 $self->{_fhs}{"$fh"}{listen} = 1;
333 2         5 $fh;
334             }
335              
336             =head2 add
337              
338             Add a file handle to the multiplexer.
339              
340             $mux->add($fh);
341              
342             As a side effect, this sets non-blocking mode on the handle, and disables
343             STDIO buffering. It also ties it to intercept output to the handle.
344              
345             =cut
346              
347             sub add
348             {
349 7     7 1 26 my $self = shift;
350 7         72 my $fh = shift;
351              
352 7 50       58 return if $self->{_fhs}{"$fh"};
353              
354 7         45 nonblock($fh);
355 7         37 autoflush($fh, 1);
356 7         300 fd_set($self->{_readers}, $fh, 1);
357              
358 7         53 my $sockopt = getsockopt $fh, SOL_SOCKET, SO_TYPE;
359 7 100 66     118 $self->{_fhs}{"$fh"}{udp_true} = 1
360             if defined $sockopt && SOCK_DGRAM == unpack "i", $sockopt;
361              
362 7         25 $self->{_fhs}{"$fh"}{inbuffer} = '';
363 7         21 $self->{_fhs}{"$fh"}{outbuffer} = '';
364 7         23 $self->{_fhs}{"$fh"}{fileno} = fileno($fh);
365 7         12 $self->{_handles}{"$fh"} = $fh;
366 7         48 tie *$fh, "IO::Multiplex::Handle", $self, $fh;
367 7         12 return $fh;
368             }
369              
370             =head2 remove
371              
372             Removes a file handle from the multiplexer. This also unties the
373             handle. It does not currently turn STDIO buffering back on, or turn
374             off non-blocking mode.
375              
376             $mux->remove($fh);
377              
378             =cut
379              
380             sub remove
381             {
382 0     0 1 0 my $self = shift;
383 0         0 my $fh = shift;
384 0         0 fd_set($self->{_writers}, $fh, 0);
385 0         0 fd_set($self->{_readers}, $fh, 0);
386 0         0 delete $self->{_fhs}{"$fh"};
387 0         0 delete $self->{_handles}{"$fh"};
388 0         0 $self->_removeTimer($fh);
389 0 0       0 if (my $x = tied $fh) {
390 0         0 undef $x;
391 0         0 untie *$fh;
392             }
393 0         0 return 1;
394             }
395              
396             =head2 set_callback_object
397              
398             Set the object on which callbacks are made. If you are not using objects,
399             you can specify the name of the package into which the method calls are
400             to be made.
401              
402             If a file handle is supplied, the callback object is specific for that
403             handle:
404              
405             $mux->set_callback_object($object, $fh);
406              
407             Otherwise, it is considered a default callback object, and is used when
408             events occur on a file handle that does not have its own callback object.
409              
410             $mux->set_callback_object(__PACKAGE__);
411              
412             The previously registered object (if any) is returned.
413              
414             See also the CALLBACK INTERFACE section.
415              
416             =cut
417              
418             sub set_callback_object
419             {
420 4     4 1 43 my $self = shift;
421 4         7 my $obj = shift;
422 4         9 my $fh = shift;
423 4 50 33     18 return if $fh && !exists($self->{_fhs}{"$fh"});
424              
425 4 50       20 my $old = $fh ? $self->{_fhs}{"$fh"}{object} : $self->{_object};
426              
427 4 50       21 $fh ? $self->{_fhs}{"$fh"}{object} : $self->{_object} = $obj;
428 4         15 return $old;
429             }
430              
431             =head2 kill_output
432              
433             Remove any pending output on a file descriptor.
434              
435             $mux->kill_output($fh);
436              
437             =cut
438              
439             sub kill_output
440             {
441 0     0 1 0 my $self = shift;
442 0         0 my $fh = shift;
443 0 0 0     0 return unless $fh && exists($self->{_fhs}{"$fh"});
444              
445 0         0 $self->{_fhs}{"$fh"}{outbuffer} = '';
446 0         0 fd_set($self->{_writers}, $fh, 0);
447             }
448              
449             =head2 outbuffer
450              
451             Return or set the output buffer for a descriptor
452              
453             $output = $mux->outbuffer($fh);
454             $mux->outbuffer($fh, $output);
455              
456             =cut
457              
458             sub outbuffer
459             {
460 0     0 1 0 my $self = shift;
461 0         0 my $fh = shift;
462 0 0 0     0 return unless $fh && exists($self->{_fhs}{"$fh"});
463              
464 0 0       0 if (@_) {
465 0 0       0 $self->{_fhs}{"$fh"}{outbuffer} = $_[0] if @_;
466 0 0       0 fd_set($self->{_writers}, $fh, 0) if !$_[0];
467             }
468              
469 0         0 $self->{_fhs}{"$fh"}{outbuffer};
470             }
471              
472             =head2 inbuffer
473              
474             Return or set the input buffer for a descriptor
475              
476             $input = $mux->inbuffer($fh);
477             $mux->inbuffer($fh, $input);
478              
479             =cut
480              
481             sub inbuffer
482             {
483 0     0 1 0 my $self = shift;
484 0         0 my $fh = shift;
485 0 0 0     0 return unless $fh && exists($self->{_fhs}{"$fh"});
486              
487 0 0       0 if (@_) {
488 0 0       0 $self->{_fhs}{"$fh"}{inbuffer} = $_[0] if @_;
489             }
490              
491 0         0 return $self->{_fhs}{"$fh"}{inbuffer};
492             }
493              
494             =head2 set_timeout
495              
496             Set the timer for a file handle. The timeout value is a certain number of
497             seconds in the future, after which the C callback is called.
498              
499             If the C module is installed, the timers may be specified in
500             fractions of a second.
501              
502             Timers are not reset automatically.
503              
504             $mux->set_timeout($fh, 23.6);
505              
506             Use C<$mux-Eset_timeout($fh, undef)> to cancel a timer.
507              
508             =cut
509              
510             sub set_timeout
511             {
512 4     4 1 140 my $self = shift;
513 4         5 my $fh = shift;
514 4         8 my $timeout = shift;
515 4 50 33     28 return unless $fh && exists($self->{_fhs}{"$fh"});
516              
517 4 50       13 if (defined $timeout) {
518 4         28 $self->_addTimer($fh, $timeout + time);
519             } else {
520 0         0 $self->_removeTimer($fh);
521             }
522             }
523              
524             =head2 handles
525              
526             Returns a list of handles that the C object knows about,
527             excluding listen sockets.
528              
529             @handles = $mux->handles;
530              
531             =cut
532              
533             sub handles
534             {
535 0     0 1 0 my $self = shift;
536              
537 0         0 return grep(!$self->{_fhs}{"$_"}{listen}, values %{$self->{_handles}});
  0         0  
538             }
539              
540             sub _addTimer {
541 4     4   4 my $self = shift;
542 4         5 my $fh = shift;
543 4         5 my $time = shift;
544              
545             # Set a key so that we can quickly tell if a given $fh has
546             # a timer set
547 4         9 $self->{_timerkeys}{"$fh"} = 1;
548              
549             # Store the timeout in an array, and resort it
550 4         6 @{$self->{_timers}} = sort { $a->[1] <=> $b->[1] } (@{$self->{_timers}}, [ $fh, $time ] );
  4         12  
  0         0  
  4         15  
551             }
552              
553             sub _removeTimer {
554 4     4   14 my $self = shift;
555 4         5 my $fh = shift;
556              
557             # Return quickly if no timer is set
558 4 50       16 return unless exists $self->{_timerkeys}{"$fh"};
559              
560             # Remove the timeout from the sorted array
561 4         6 @{$self->{_timers}} = grep { $_->[0] ne $fh } @{$self->{_timers}};
  4         10  
  4         22  
  4         11  
562              
563             # Get rid of the key
564 4         17 delete $self->{_timerkeys}{"$fh"};
565             }
566              
567              
568             =head2 loop
569              
570             Enter the main loop and start processing IO events.
571              
572             $mux->loop;
573              
574             =cut
575              
576             sub loop
577             {
578 4     4 1 28 my $self = shift;
579 4         7 my $heartbeat = shift;
580 4         17 $self->{_endloop} = 0;
581              
582 4   66     26 while (!$self->{_endloop} && keys %{$self->{_fhs}}) {
  14         2821  
583 14         27 my $rv;
584             my $data;
585 14         25 my $rdready = "";
586 14         28 my $wrready = "";
587 14         14 my $timeout = undef;
588              
589 14         12 foreach my $fh (values %{$self->{_handles}}) {
  14         37  
590 21 0 33     88 fd_set($rdready, $fh, 1) if
      33        
591             ref($fh) =~ /SSL/ &&
592             $fh->can("pending") &&
593             $fh->pending;
594             }
595              
596 14 50       36 if (!length $rdready) {
597 14 100       11 if (@{$self->{_timers}}) {
  14         31  
598 4         13 $timeout = $self->{_timers}[0][1] - time;
599             }
600              
601 14         20023072 my $numready = select($rdready=$self->{_readers},
602             $wrready=$self->{_writers},
603             undef,
604             $timeout);
605              
606 14 50       83 unless(defined($numready)) {
607 0 0 0     0 if ($! == EINTR || $! == EAGAIN) {
608 0         0 next;
609             } else {
610 0         0 last;
611             }
612             }
613             }
614              
615 14 50       36 &{ $heartbeat } ($rdready, $wrready) if $heartbeat;
  0         0  
616              
617 14         62 foreach my $k (keys %{$self->{_handles}}) {
  14         62  
618 19 50       43 my $fh = $self->{_handles}->{$k} or next;
619              
620             # Avoid creating a permanent empty hash ref for "$fh"
621             # by attempting to access its {object} element
622             # if it has already been closed.
623 19 50       99 next unless exists $self->{_fhs}{"$fh"};
624              
625             # It is not easy to replace $self->{_fhs}{"$fh"} with a
626             # variable, because some mux_* routines may remove it as
627             # side-effect.
628              
629             # Get the callback object.
630 19   33     151 my $obj = $self->{_fhs}{"$fh"}{object} ||
631             $self->{_object};
632              
633             # Is this descriptor ready for reading?
634 19 100       43 if (fd_isset($rdready, $fh))
635             {
636 10 100       27 if ($self->{_fhs}{"$fh"}{listen}) {
637             # It's a server socket, so a new connection is
638             # waiting to be accepted
639 2         14 my $client = $fh->accept;
640 2 50       169 next unless ($client);
641 2         7 $self->add($client);
642 2 50 33     18 $obj->mux_connection($self, $client)
643             if $obj && $obj->can("mux_connection");
644             } else {
645 8 100       22 if ($self->is_udp($fh)) {
646 6         56 $rv = recv($fh, $data, BUFSIZ, 0);
647 6 50       12 if (defined $rv) {
648             # Remember where the last UDP packet came from
649 6         15 $self->{_fhs}{"$fh"}{udp_peer} = $rv;
650             }
651             } else {
652 2         4 $rv = &POSIX::read(fileno($fh), $data, BUFSIZ);
653             }
654              
655 8 50 33     36 if (defined($rv) && length($data)) {
656             # Append the data to the client's receive buffer,
657             # and call process_input to see if anything needs to
658             # be done.
659 8         18 $self->{_fhs}{"$fh"}{inbuffer} .= $data;
660 8 50 33     108 $obj->mux_input($self, $fh,
661             \$self->{_fhs}{"$fh"}{inbuffer})
662             if $obj && $obj->can("mux_input");
663             } else {
664 0 0       0 unless (defined $rv) {
665             next if
666 0 0 0     0 $! == EINTR ||
      0        
667             $! == EAGAIN ||
668             $! == EWOULDBLOCK;
669 0 0       0 warn "IO::Multiplex read error: $!"
670             if $! != ECONNRESET;
671             }
672             # There's an error, or we received EOF. If
673             # there's pending data to be written, we leave
674             # the connection open so it can be sent. If
675             # the other end is closed for writing, the
676             # send will error and we close down there.
677             # Either way, we remove it from _readers as
678             # we're no longer interested in reading from
679             # it.
680 0         0 fd_set($self->{_readers}, $fh, 0);
681 0 0 0     0 $obj->mux_eof($self, $fh,
682             \$self->{_fhs}{"$fh"}{inbuffer})
683             if $obj && $obj->can("mux_eof");
684              
685 0 0       0 if (exists $self->{_fhs}{"$fh"}) {
686 0         0 $self->{_fhs}{"$fh"}{inbuffer} = '';
687             # The mux_eof handler could have responded
688             # with a shutdown for writing.
689 0 0 0     0 $self->close($fh)
690             unless exists $self->{_fhs}{"$fh"}
691             && length $self->{_fhs}{"$fh"}{outbuffer};
692             }
693 0         0 next;
694             }
695             }
696             } # end if readable
697 17 50       1820 next unless exists $self->{_fhs}{"$fh"};
698              
699 17 50       27 if (fd_isset($wrready, $fh)) {
700 0 0       0 unless (length $self->{_fhs}{"$fh"}{outbuffer}) {
701 0         0 fd_set($self->{_writers}, $fh, 0);
702 0 0 0     0 $obj->mux_outbuffer_empty($self, $fh)
703             if ($obj && $obj->can("mux_outbuffer_empty"));
704 0         0 next;
705             }
706 0         0 $rv = &POSIX::write(fileno($fh),
707             $self->{_fhs}{"$fh"}{outbuffer},
708             length($self->{_fhs}{"$fh"}{outbuffer}));
709 0 0       0 unless (defined($rv)) {
710             # We got an error writing to it. If it's
711             # EWOULDBLOCK (shouldn't happen if select told us
712             # we can write) or EAGAIN, or EINTR we don't worry
713             # about it. otherwise, close it down.
714 0 0 0     0 unless ($! == EWOULDBLOCK ||
      0        
715             $! == EINTR ||
716             $! == EAGAIN) {
717 0 0       0 if ($! == EPIPE) {
718 0 0 0     0 $obj->mux_epipe($self, $fh)
719             if $obj && $obj->can("mux_epipe");
720             } else {
721 0         0 warn "IO::Multiplex: write error: $!\n";
722             }
723 0         0 $self->close($fh);
724             }
725 0         0 next;
726             }
727 0         0 substr($self->{_fhs}{"$fh"}{outbuffer}, 0, $rv) = '';
728 0 0       0 unless (length $self->{_fhs}{"$fh"}{outbuffer}) {
729             # Mark us as not writable if there's nothing more to
730             # write
731 0         0 fd_set($self->{_writers}, $fh, 0);
732 0 0 0     0 $obj->mux_outbuffer_empty($self, $fh)
733             if ($obj && $obj->can("mux_outbuffer_empty"));
734              
735 0 0 0     0 if ( $self->{_fhs}{"$fh"}
736             && $self->{_fhs}{"$fh"}{shutdown}) {
737             # If we've been marked for shutdown after write
738             # do it.
739 0         0 shutdown($fh, 1);
740 0         0 $self->{_fhs}{"$fh"}{outbuffer} = '';
741 0 0       0 unless (length $self->{_fhs}{"$fh"}{inbuffer}) {
742             # We'd previously been shutdown for reading
743             # also, so close out completely
744 0         0 $self->close($fh);
745 0         0 next;
746             }
747             }
748             }
749             } # End if writeable
750              
751 17 50       68 next unless exists $self->{_fhs}{"$fh"};
752              
753             } # End foreach $fh (...)
754              
755 12 100       14 $self->_checkTimeouts() if @{$self->{_timers}};
  12         50  
756              
757             } # End while(loop)
758             }
759              
760             sub _checkTimeouts {
761 5     5   6 my $self = shift;
762              
763             # Get the current time
764 5         17 my $time = time;
765              
766             # Copy all of the timers that should go off into
767             # a temporary array. This allows us to modify the
768             # real array as we process the timers, without
769             # interfering with the loop.
770              
771 5         9 my @timers = ();
772 5         9 foreach my $timer (@{$self->{_timers}}) {
  5         9  
773             # If the timer is in the future, we can stop
774 5 100       20 last if $timer->[1] > $time;
775 4         9 push @timers, $timer;
776             }
777              
778 5         6 foreach my $timer (@timers) {
779 4         7 my $fh = $timer->[0];
780 4         17 $self->_removeTimer($fh);
781              
782 4 50       12 next unless exists $self->{_fhs}{"$fh"};
783              
784 4   33     24 my $obj = $self->{_fhs}{"$fh"}{object} || $self->{_object};
785 4 50 33     108 $obj->mux_timeout($self, $fh) if $obj && $obj->can("mux_timeout");
786             }
787             }
788              
789              
790             =head2 endloop
791              
792             Prematurly terminate the loop. The loop will automatically terminate
793             when there are no remaining descriptors to be watched.
794              
795             $mux->endloop;
796              
797             =cut
798              
799             sub endloop
800             {
801 2     2 1 134 my $self = shift;
802 2         5 $self->{_endloop} = 1;
803             }
804              
805             =head2 udp_peer
806              
807             Get peer endpoint of where the last udp packet originated.
808              
809             $saddr = $mux->udp_peer($fh);
810              
811             =cut
812              
813             sub udp_peer {
814 6     6 1 11 my $self = shift;
815 6         7 my $fh = shift;
816 6         19 return $self->{_fhs}{"$fh"}{udp_peer};
817             }
818              
819             =head2 is_udp
820              
821             Sometimes UDP packets require special attention.
822             This method will tell if a file handle is of type UDP.
823              
824             $is_udp = $mux->is_udp($fh);
825              
826             =cut
827              
828             sub is_udp {
829 14     14 1 21 my $self = shift;
830 14         12 my $fh = shift;
831 14         37 return $self->{_fhs}{"$fh"}{udp_true};
832             }
833              
834             =head2 write
835              
836             Send output to a file handle.
837              
838             $mux->write($fh, "'ere I am, JH!\n");
839              
840             =cut
841              
842             sub write
843             {
844 6     6 1 5 my $self = shift;
845 6         6 my $fh = shift;
846 6         10 my $data = shift;
847 6 50 33     31 return unless $fh && exists($self->{_fhs}{"$fh"});
848              
849 6 50       20 if ($self->{_fhs}{"$fh"}{shutdown}) {
850 0         0 $! = EPIPE;
851 0         0 return undef;
852             }
853 6 50       10 if ($self->is_udp($fh)) {
854 6 100       11 if (my $udp_peer = $self->udp_peer($fh)) {
855             # Send the packet back to the last peer that said something
856 4         83 return send($fh, $data, 0, $udp_peer);
857             } else {
858             # No udp_peer yet?
859             # This better be a connect()ed UDP socket
860             # or else this will fail with ENOTCONN
861 2         72 return send($fh, $data, 0);
862             }
863             }
864 0         0 $self->{_fhs}{"$fh"}{outbuffer} .= $data;
865 0         0 fd_set($self->{_writers}, $fh, 1);
866 0         0 return length($data);
867             }
868              
869             =head2 shutdown
870              
871             Shut down a socket for reading or writing or both. See the C
872             Perl documentation for further details.
873              
874             If the shutdown is for reading, it happens immediately. However,
875             shutdowns for writing are delayed until any pending output has been
876             successfully written to the socket.
877              
878             $mux->shutdown($socket, 1);
879              
880             =cut
881              
882             sub shutdown
883             {
884 0     0 1 0 my $self = shift;
885 0         0 my $fh = shift;
886 0         0 my $which = shift;
887 0 0 0     0 return unless $fh && exists($self->{_fhs}{"$fh"});
888              
889 0 0 0     0 if ($which == 0 || $which == 2) {
890             # Shutdown for reading. We can do this now.
891 0         0 shutdown($fh, 0);
892             # The mux_eof hook must be run from the main loop to consume
893             # the rest of the inbuffer if there is anything left.
894             # It will also remove $fh from _readers.
895             }
896              
897 0 0 0     0 if ($which == 1 || $which == 2) {
898             # Shutdown for writing. Only do this now if there is no pending
899             # data.
900 0 0       0 if(length $self->{_fhs}{"$fh"}{outbuffer}) {
901 0         0 $self->{_fhs}{"$fh"}{shutdown} = 1;
902             } else {
903 0         0 shutdown($fh, 1);
904 0         0 $self->{_fhs}{"$fh"}{outbuffer} = '';
905             }
906             }
907             # Delete the descriptor if it's totally gone.
908 0 0 0     0 unless (length $self->{_fhs}{"$fh"}{inbuffer} ||
909             length $self->{_fhs}{"$fh"}{outbuffer}) {
910 0         0 $self->close($fh);
911             }
912             }
913              
914             =head2 close
915              
916             Close a handle. Always use this method to close a handle that is being
917             watched by the multiplexer.
918              
919             $mux->close($fh);
920              
921             =cut
922              
923             sub close
924             {
925 0     0 1 0 my $self = shift;
926 0         0 my $fh = shift;
927 0 0       0 return unless exists $self->{_fhs}{"$fh"};
928              
929 0   0     0 my $obj = $self->{_fhs}{"$fh"}{object} || $self->{_object};
930 0 0       0 warn "closing with read buffer" if length $self->{_fhs}{"$fh"}{inbuffer};
931 0 0       0 warn "closing with write buffer" if length $self->{_fhs}{"$fh"}{outbuffer};
932              
933 0         0 fd_set($self->{_readers}, $fh, 0);
934 0         0 fd_set($self->{_writers}, $fh, 0);
935              
936 0         0 delete $self->{_fhs}{"$fh"};
937 0         0 delete $self->{_handles}{"$fh"};
938 0         0 untie *$fh;
939 0         0 close $fh;
940 0 0 0     0 $obj->mux_close($self, $fh) if $obj && $obj->can("mux_close");
941             }
942              
943             # We set non-blocking mode on all descriptors. If we don't, then send
944             # might block if the data is larger than the kernel can accept all at once,
945             # even though select told us we can write. With non-blocking mode, we
946             # get a partial write in those circumstances, which is what we want.
947              
948             sub nonblock
949 7     7 0 161 { my $fh = shift;
950              
951 7         13 if(IsWin)
952             { ioctl($fh, 0x8004667e, pack("L!", 1));
953             }
954             else
955 7 50       42 { my $flags = fcntl($fh, F_GETFL, 0)
956             or die "fcntl F_GETFL: $!\n";
957 7 50       46 fcntl($fh, F_SETFL, $flags | O_NONBLOCK)
958             or die "fcntl F_SETFL $!\n";
959             }
960             }
961              
962             sub fd_set
963             {
964 7     7 0 36 vec($_[0], fileno($_[1]), 1) = $_[2];
965             }
966              
967             sub fd_isset
968             {
969 36     36 0 105 return vec($_[0], fileno($_[1]), 1);
970             }
971              
972             # We tie handles into this package to handle write buffering.
973              
974             package IO::Multiplex::Handle;
975              
976 5     5   34 use strict;
  5         2  
  5         134  
977 5     5   2370 use Tie::Handle;
  5         7105  
  5         86  
978 5     5   29 use Carp;
  5         5  
  5         204  
979 5     5   18 use vars qw(@ISA);
  5         6  
  5         1022  
980             @ISA = qw(Tie::Handle);
981              
982             sub FILENO
983             {
984 54     54   70 my $self = shift;
985 54         246 return ($self->{_mux}->{_fhs}->{"$self->{_fh}"}->{fileno});
986             }
987              
988              
989             sub TIEHANDLE
990             {
991 7     7   12 my $package = shift;
992 7         9 my $mux = shift;
993 7         6 my $fh = shift;
994              
995 7         34 my $self = bless { _mux => $mux,
996             _fh => $fh } => $package;
997 7         19 return $self;
998             }
999              
1000             sub WRITE
1001             {
1002 6     6   933 my $self = shift;
1003 6         9 my ($msg, $len, $offset) = @_;
1004 6   50     18 $offset ||= 0;
1005 6         24 return $self->{_mux}->write($self->{_fh}, substr($msg, $offset, $len));
1006             }
1007              
1008             sub CLOSE
1009             {
1010 0     0     my $self = shift;
1011 0           return $self->{_mux}->shutdown($self->{_fh}, 2);
1012             }
1013              
1014             sub READ
1015             {
1016 0     0     carp "Do not read from a muxed file handle";
1017             }
1018              
1019             sub READLINE
1020             {
1021 0     0     carp "Do not read from a muxed file handle";
1022             }
1023              
1024             sub FETCH
1025             {
1026 0     0     return "Fnord";
1027             }
1028              
1029             1;
1030              
1031             __END__