File Coverage

blib/lib/Danga/Socket/AnyEvent.pm
Criterion Covered Total %
statement 69 404 17.0
branch 2 164 1.2
condition 0 56 0.0
subroutine 24 74 32.4
pod 0 39 0.0
total 95 737 12.8


line stmt bran cond sub pod time code
1             ###########################################################################
2              
3             =head1 NAME
4              
5             Danga::Socket::AnyEvent - Danga::Socket reimplemented in terms of AnyEvent
6              
7             =head1 SYNOPSIS
8              
9             # This will clobber the Danga::Socket namespace
10             # with the new implementation.
11             use Danga::Socket::AnyEvent;
12            
13             # Then just use Danga::Socket as normal.
14              
15             =head1 DESCRIPTION
16              
17             This is an alternative implementation of L that is
18             implemented in terms of L, an abstraction layer for
19             event loops. This allows Danga::Socket applications to run
20             in any event loop supported by AnyEvent, and allows Danga::Socket
21             applications to make use of AnyEvent-based libraries.
22              
23             Loading this module will install a workalike set of functions
24             into the Danga::Socket package. It must therefore be loaded before
25             anything loads the real L. If you try to load
26             this module after Danga::Socket has been loaded then it will
27             die.
28              
29             =head1 DIFFERENCES FROM Danga::Socket
30              
31             Although this module aims to be a faithful recreation of the
32             features and interface of Danga::Socket, there are some known
33             differences:
34              
35             =over
36              
37             =item * The C feature will only work if the caller
38             runs the event loop via CEventLoop>; if a caller
39             runs the AnyEvent event loop directly, or if some other library
40             runs it, then the timeout will not take effect.
41              
42             =item * The C feature behaves in a slightly
43             different way than in the stock Danga::Socket. It's currently
44             implemented via an AnyEvent idlewatcher that runs whenever
45             the event loop goes idle after running a Danga::Socket event.
46             This means that it will probably run at different times than
47             it would have in Danga::Socket's own event loops.
48              
49             =item * The C method will always return true, regardless
50             of what backend is actually implementing the event loop. Make
51             sure to use AnyEvent's L backend if you would like to use
52             Epoll/KQueue/etc rather than other, less efficient mechanisms.
53              
54             =back
55              
56             =cut
57              
58             ###########################################################################
59              
60             package Danga::Socket::AnyEvent;
61              
62 1     1   483 use vars qw{$VERSION};
  1         2  
  1         45  
63             $VERSION = "0.01_01";
64 1     1   4 use Carp;
  1         0  
  1         70  
65              
66             BEGIN {
67             # Detect if someone's already loaded Danga::Socket and bail out.
68 1 50   1   27 if ($INC{"Danga/Socket.pm"}) {
69 0         0 Carp::croak("Can't load Danga::Socket::AnyEvent: the real Danga::Socket was already loaded from ".$INC{"Danga/Socket.pm"});
70             }
71             }
72              
73             package # hidden from PAUSE
74             Danga::Socket;
75              
76             our $VERSION = "1.61";
77              
78 1     1   8 use strict;
  1         1  
  1         15  
79 1     1   440 use bytes;
  1         7  
  1         3  
80 1     1   403 use POSIX ();
  1         4253  
  1         21  
81 1     1   425 use Time::HiRes ();
  1         954  
  1         20  
82 1     1   783 use AnyEvent;
  1         3700  
  1         34  
83              
84 1     1   221 my $opt_bsd_resource = eval "use BSD::Resource; 1;";
  0         0  
  0         0  
85              
86 1     1   4 use warnings;
  1         1  
  1         29  
87 1     1   3 no warnings qw(deprecated);
  1         1  
  1         30  
88              
89 1     1   396 use Sys::Syscall qw(:epoll);
  1         2312  
  1         159  
90              
91 1         4 use fields ('sock', # underlying socket
92             'fd', # numeric file descriptor
93             'write_buf', # arrayref of scalars, scalarrefs, or coderefs to write
94             'write_buf_offset', # offset into first array of write_buf to start writing at
95             'write_buf_size', # total length of data in all write_buf items
96             'write_set_watch', # bool: true if we internally set watch_write rather than by a subclass
97             'read_push_back', # arrayref of "pushed-back" read data the application didn't want
98             'closed', # bool: socket is closed
99             'corked', # bool: socket is corked
100             'event_watch', # bitmask of events the client is interested in (POLLIN,OUT,etc.)
101             'peer_v6', # bool: cached; if peer is an IPv6 address
102             'peer_ip', # cached stringified IP address of $sock
103             'peer_port', # cached port number of $sock
104             'local_ip', # cached stringified IP address of local end of $sock
105             'local_port', # cached port number of local end of $sock
106             'writer_func', # subref which does writing. must return bytes written (or undef) and set $! on errors
107 1     1   452 );
  1         1052  
108              
109 1         97 use Errno qw(EINPROGRESS EWOULDBLOCK EISCONN ENOTSOCK
110 1     1   110 EPIPE EAGAIN EBADF ECONNRESET ENOPROTOOPT);
  1         1  
111 1     1   477 use Socket qw(IPPROTO_TCP);
  1         2527  
  1         125  
112 1     1   5 use Carp qw(croak confess);
  1         1  
  1         44  
113              
114 1 50   1   4 use constant TCP_CORK => ($^O eq "linux" ? 3 : 0); # FIXME: not hard-coded (Linux-specific too)
  1         0  
  1         49  
115 1     1   3 use constant DebugLevel => 0;
  1         1  
  1         33  
116              
117 1     1   3 use constant POLLIN => 1;
  1         1  
  1         31  
118 1     1   2 use constant POLLOUT => 4;
  1         1  
  1         30  
119 1     1   3 use constant POLLERR => 8;
  1         1  
  1         29  
120 1     1   3 use constant POLLHUP => 16;
  1         1  
  1         28  
121 1     1   3 use constant POLLNVAL => 32;
  1         1  
  1         3118  
122              
123             our (
124             %Timers, # timers
125             %FdWatchers, # fd (num) -> [ AnyEvent read watcher, AnyEvent write watcher ]
126             %DescriptorMap, # fd (num) -> Danga::Socket object that owns it
127             %OtherFds, # fd (num) -> sub to run when that fd is ready to read or write
128             %PushBackSet, # fd (num) -> Danga::Socket (fds with pushed back read data)
129             $PostLoopCallback, # subref to call at the end of each loop, if defined (global)
130             %PLCMap, # fd (num) -> PostLoopCallback (per-object)
131             $IdleWatcher, # an AnyEvent idle watcher that'll run PostEventLoop and then delete itself.
132             $MainLoopCondVar, # When EventLoop is running this contains the AnyEvent condvar that
133             # will cause the main loop to exit if you call ->send() on it.
134             @ToClose, # sockets to close when event loop is done
135             $DoProfile, # if on, enable profiling
136             %Profiling, # what => [ utime, stime, calls ]
137             $DoneInit, # if we've done the one-time module init yet
138             $LoopTimeout, # timeout of event loop in milliseconds
139              
140             );
141              
142             Reset();
143              
144             =head1 METHODS
145              
146             =cut
147              
148             #####################################################################
149             ### C L A S S M E T H O D S
150             #####################################################################
151              
152             =head2 C<< CLASS->Reset() >>
153              
154             Reset all state
155              
156             =cut
157             sub Reset {
158 1     1 0 2 %Timers = ();
159 1         13 %FdWatchers = ();
160 1         1 %DescriptorMap = ();
161 1         2 %OtherFds = ();
162             }
163              
164             =head2 C<< CLASS->HaveEpoll() >>
165              
166             Returns a true value if this class will use IO::Epoll for async IO.
167              
168             =cut
169             sub HaveEpoll {
170 0     0 0   return 1;
171             }
172              
173             =head2 C<< CLASS->WatchedSockets() >>
174              
175             Returns the number of file descriptors for which we have watchers installed.
176              
177             =cut
178             sub WatchedSockets {
179 0     0 0   return scalar(keys(%FdWatchers));
180             }
181             *watched_sockets = *WatchedSockets;
182              
183             =head2 C<< CLASS->EnableProfiling() >>
184              
185             Turns profiling on, clearing current profiling data.
186              
187             =cut
188             sub EnableProfiling {
189 0 0   0 0   if ($opt_bsd_resource) {
190 0           %Profiling = ();
191 0           $DoProfile = 1;
192 0           return 1;
193             }
194 0           return 0;
195             }
196              
197             =head2 C<< CLASS->DisableProfiling() >>
198              
199             Turns off profiling, but retains data up to this point
200              
201             =cut
202             sub DisableProfiling {
203 0     0 0   $DoProfile = 0;
204             }
205              
206             =head2 C<< CLASS->ProfilingData() >>
207              
208             Returns reference to a hash of data in format:
209              
210             ITEM => [ utime, stime, #calls ]
211              
212             =cut
213             sub ProfilingData {
214 0     0 0   return \%Profiling;
215             }
216              
217             =head2 C<< CLASS->ToClose() >>
218              
219             Return the list of sockets that are awaiting close() at the end of the
220             current event loop.
221              
222             =cut
223 0     0 0   sub ToClose { return @ToClose; }
224              
225             =head2 C<< CLASS->OtherFds( [%fdmap] ) >>
226              
227             Get/set the hash of file descriptors that need processing in parallel with
228             the registered Danga::Socket objects.
229              
230             Callers must not modify the returned hash.
231              
232             =cut
233             sub OtherFds {
234 0     0 0   my $class = shift;
235 0 0         if ( @_ ) {
236             # Clean up any watchers that we no longer need.
237 0           foreach my $fd (keys %OtherFds) {
238 0           delete $FdWatchers{$fd};
239             }
240 0           %OtherFds = ();
241 0           $class->AddOtherFds(@_);
242             }
243 0 0         return wantarray ? %OtherFds : \%OtherFds;
244             }
245              
246             =head2 C<< CLASS->AddOtherFds( [%fdmap] ) >>
247              
248             Add fds to the OtherFds hash for processing.
249              
250             =cut
251             sub AddOtherFds {
252 0     0 0   my ($class, %fdmap) = @_;
253              
254 0           foreach my $fd (keys %fdmap) {
255 0           my $coderef = $fdmap{$fd};
256 0           $OtherFds{$fd} = $coderef;
257              
258             # The OtherFds interface uses the same callback for both read and write events,
259             # so create two AnyEvent watchers that differ only in their mode.
260             $FdWatchers{$fd} = [ map {
261 0           my $mode = $_;
  0            
262 0           AnyEvent->io(
263             fh => $fd,
264             poll => $mode,
265             cb => _wrap_watcher_cb($coderef),
266             )
267             } qw(r w) ];
268             }
269             }
270              
271             =head2 C<< CLASS->SetLoopTimeout( $timeout ) >>
272              
273             Set the loop timeout for the event loop to some value in milliseconds.
274              
275             A timeout of 0 (zero) means poll forever. A timeout of -1 means poll and return
276             immediately.
277              
278             =cut
279             sub SetLoopTimeout {
280 0     0 0   return $LoopTimeout = $_[1] + 0;
281             }
282              
283             =head2 C<< CLASS->DebugMsg( $format, @args ) >>
284              
285             Print the debugging message specified by the C-style I and
286             I
287              
288             =cut
289             sub DebugMsg {
290 0     0 0   my ( $class, $fmt, @args ) = @_;
291 0           chomp $fmt;
292 0           printf STDERR ">>> $fmt\n", @args;
293             }
294              
295             =head2 C<< CLASS->AddTimer( $seconds, $coderef ) >>
296              
297             Add a timer to occur $seconds from now. $seconds may be fractional, but timers
298             are not guaranteed to fire at the exact time you ask for.
299              
300             Returns a timer object which you can call C<< $timer->cancel >> on if you need to.
301              
302             =cut
303             sub AddTimer {
304 0     0 0   my $class = shift;
305 0           my ($secs, $coderef) = @_;
306              
307 0           my $timer = [ undef ];
308              
309 0           my $key = "$timer"; # Just stringify the timer array to get our hash key
310              
311             my $cancel = sub {
312 0     0     delete $Timers{$key};
313 0           };
314              
315             my $cb = sub {
316 0     0     $coderef->();
317 0           $cancel->();
318 0           };
319              
320 0           $timer->[0] = $cancel;
321              
322             # We save the watcher in $Timers to keep it alive until it runs,
323             # or until $cancel above overwrites it with undef to cause it to
324             # get collected.
325 0           $Timers{$key} = AnyEvent->timer(
326             after => $secs,
327             cb => _wrap_watcher_cb($cb),
328             );
329              
330 0           return bless $timer, 'Danga::Socket::Timer';
331             }
332              
333             =head2 C<< CLASS->DescriptorMap() >>
334              
335             Get the hash of Danga::Socket objects keyed by the file descriptor (fileno) they
336             are wrapping.
337              
338             Returns a hash in list context or a hashref in scalar context.
339              
340             =cut
341             sub DescriptorMap {
342 0 0   0 0   return wantarray ? %DescriptorMap : \%DescriptorMap;
343             }
344             *descriptor_map = *DescriptorMap;
345             *get_sock_ref = *DescriptorMap;
346              
347             =head2 C<< CLASS->EventLoop() >>
348              
349             Start processing IO events. In most daemon programs this never exits. See
350             C below for how to exit the loop.
351              
352             =cut
353              
354             sub EventLoop {
355 0     0 0   my $class = shift;
356              
357 0           my $timeout_watcher;
358 0 0 0       if ($LoopTimeout && $LoopTimeout != -1) {
359             # Return after the given amount of milliseconds (which we must of, of course, convert to seconds)
360 0           my $timeout = $LoopTimeout * 0.001;
361             $timeout_watcher = AnyEvent->timer(
362 0     0     cb => sub { PostEventLoop() },
363 0           after => $timeout,
364             interval => $timeout,
365             );
366             }
367              
368 0           $MainLoopCondVar = AnyEvent->condvar;
369 0           $MainLoopCondVar->recv(); # Blocks until $MainLoopCondVar->send is called
370              
371             # Always run PostLoopCallback before we return, even if we timed out before we completed an event.
372 0           PostEventLoop();
373              
374 0           $MainLoopCondVar = undef;
375             }
376              
377             ## profiling-related data/functions
378             our ($Prof_utime0, $Prof_stime0);
379             sub _pre_profile {
380 0     0     ($Prof_utime0, $Prof_stime0) = getrusage();
381             }
382              
383             sub _post_profile {
384             # get post information
385 0     0     my ($autime, $astime) = getrusage();
386              
387             # calculate differences
388 0           my $utime = $autime - $Prof_utime0;
389 0           my $stime = $astime - $Prof_stime0;
390              
391 0           foreach my $k (@_) {
392 0   0       $Profiling{$k} ||= [ 0.0, 0.0, 0 ];
393 0           $Profiling{$k}->[0] += $utime;
394 0           $Profiling{$k}->[1] += $stime;
395 0           $Profiling{$k}->[2]++;
396             }
397             }
398              
399             =head2 C<< CLASS->SetPostLoopCallback( CODEREF ) >>
400              
401             Sets post loop callback function. Pass a subref and it will be
402             called every time the event loop finishes.
403              
404             Return 1 (or any true value) from the sub to make the loop continue, 0 or false
405             and it will exit.
406              
407             The callback function will be passed two parameters: \%DescriptorMap, \%OtherFds.
408              
409             =cut
410             sub SetPostLoopCallback {
411 0     0 0   my ($class, $ref) = @_;
412              
413 0 0         if (ref $class) {
414             # per-object callback
415 0           my Danga::Socket $self = $class;
416 0 0 0       if (defined $ref && ref $ref eq 'CODE') {
417 0           $PLCMap{$self->{fd}} = $ref;
418             } else {
419 0           delete $PLCMap{$self->{fd}};
420             }
421             } else {
422             # global callback
423 0 0 0       $PostLoopCallback = (defined $ref && ref $ref eq 'CODE') ? $ref : undef;
424             }
425             }
426              
427             # Internal function: run the post-event callback, send read events
428             # for pushed-back data, and close pending connections. returns 1
429             # if event loop should continue, or 0 to shut it all down.
430             sub PostEventLoop {
431             # fire read events for objects with pushed-back read data
432 0     0 0   my $loop = 1;
433 0           while ($loop) {
434 0           $loop = 0;
435 0           foreach my $fd (keys %PushBackSet) {
436 0           my Danga::Socket $pob = $PushBackSet{$fd};
437              
438             # a previous event_read invocation could've closed a
439             # connection that we already evaluated in "keys
440             # %PushBackSet", so skip ones that seem to have
441             # disappeared. this is expected.
442 0 0         next unless $pob;
443              
444 0 0         die "ASSERT: the $pob socket has no read_push_back" unless @{$pob->{read_push_back}};
  0            
445             next unless (! $pob->{closed} &&
446 0 0 0       $pob->{event_watch} & POLLIN);
447 0           $loop = 1;
448 0           $pob->event_read;
449             }
450             }
451              
452             # now we can close sockets that wanted to close during our event processing.
453             # (we didn't want to close them during the loop, as we didn't want fd numbers
454             # being reused and confused during the event loop)
455 0           while (my $sock = shift @ToClose) {
456 0           my $fd = fileno($sock);
457              
458             # close the socket. (not a Danga::Socket close)
459 0           $sock->close;
460              
461             # and now we can finally remove the fd from the map. see
462             # comment above in _cleanup.
463 0           delete $DescriptorMap{$fd};
464             }
465              
466              
467             # by default we keep running, unless a postloop callback (either per-object
468             # or global) cancels it
469 0           my $keep_running = 1;
470              
471             # per-object post-loop-callbacks
472 0           for my $plc (values %PLCMap) {
473 0   0       $keep_running &&= $plc->(\%DescriptorMap, \%OtherFds);
474             }
475              
476             # now we're at the very end, call callback if defined
477 0 0         if (defined $PostLoopCallback) {
478 0   0       $keep_running &&= $PostLoopCallback->(\%DescriptorMap, \%OtherFds);
479             }
480              
481 0           return $keep_running;
482             }
483              
484             # Internal method to decorate a watcher callback with extra code to install
485             # the IdleWatcher necessary to run PostEventLoop.
486             sub _wrap_watcher_cb {
487 0     0     my ($cb) = @_;
488              
489             return sub {
490 0     0     my $ret = $cb->(@_);
491             $IdleWatcher = AnyEvent->idle(
492             cb => sub {
493 0           my $keep_running = PostEventLoop();
494 0           $IdleWatcher = undef; # Free this watcher
495 0 0         $MainLoopCondVar->send unless $keep_running;
496             },
497 0           );
498 0           return $ret;
499 0           };
500             }
501              
502             #####################################################################
503             ### Danga::Socket-the-object code
504             #####################################################################
505              
506             =head2 OBJECT METHODS
507              
508             =head2 C<< CLASS->new( $socket ) >>
509              
510             Create a new Danga::Socket subclass object for the given I which will
511             react to events on it during the C.
512              
513             This is normally (always?) called from your subclass via:
514              
515             $class->SUPER::new($socket);
516              
517             =cut
518             sub new {
519 0     0 0   my Danga::Socket $self = shift;
520 0 0         $self = fields::new($self) unless ref $self;
521              
522 0           my $sock = shift;
523              
524 0           $self->{sock} = $sock;
525 0           my $fd = fileno($sock);
526              
527 0 0 0       Carp::cluck("undef sock and/or fd in Danga::Socket->new. sock=" . ($sock || "") . ", fd=" . ($fd || ""))
      0        
      0        
528             unless $sock && $fd;
529              
530 0           $self->{fd} = $fd;
531 0           $self->{write_buf} = [];
532 0           $self->{write_buf_offset} = 0;
533 0           $self->{write_buf_size} = 0;
534 0           $self->{closed} = 0;
535 0           $self->{corked} = 0;
536 0           $self->{read_push_back} = [];
537              
538 0           $self->{event_watch} = POLLERR|POLLHUP|POLLNVAL;
539              
540             # Create the slots where the watchers will go if the caller
541             # decides to watch_read or watch_write.
542 0           $FdWatchers{$fd} = [ undef, undef ];
543              
544             Carp::cluck("Danga::Socket::new blowing away existing descriptor map for fd=$fd ($DescriptorMap{$fd})")
545 0 0         if $DescriptorMap{$fd};
546              
547 0           $DescriptorMap{$fd} = $self;
548 0           return $self;
549             }
550              
551              
552             #####################################################################
553             ### I N S T A N C E M E T H O D S
554             #####################################################################
555              
556             =head2 C<< $obj->tcp_cork( $boolean ) >>
557              
558             Turn TCP_CORK on or off depending on the value of I.
559              
560             =cut
561             sub tcp_cork {
562 0     0 0   my Danga::Socket $self = $_[0];
563 0           my $val = $_[1];
564              
565             # make sure we have a socket
566 0 0         return unless $self->{sock};
567 0 0         return if $val == $self->{corked};
568              
569 0           my $rv;
570 0           if (TCP_CORK) {
571 0 0         $rv = setsockopt($self->{sock}, IPPROTO_TCP, TCP_CORK,
572             pack("l", $val ? 1 : 0));
573             } else {
574             # FIXME: implement freebsd *PUSH sockopts
575             $rv = 1;
576             }
577              
578             # if we failed, close (if we're not already) and warn about the error
579 0 0         if ($rv) {
580 0           $self->{corked} = $val;
581             } else {
582 0 0 0       if ($! == EBADF || $! == ENOTSOCK) {
    0 0        
      0        
583             # internal state is probably corrupted; warn and then close if
584             # we're not closed already
585 0           warn "setsockopt: $!";
586 0           $self->close('tcp_cork_failed');
587             } elsif ($! == ENOPROTOOPT || $!{ENOTSOCK} || $!{EOPNOTSUPP}) {
588             # TCP implementation doesn't support corking, so just ignore it
589             # or we're trying to tcp-cork a non-socket (like a socketpair pipe
590             # which is acting like a socket, which Perlbal does for child
591             # processes acting like inetd-like web servers)
592             } else {
593             # some other error; we should never hit here, but if we do, die
594 0           die "setsockopt: $!";
595             }
596             }
597             }
598              
599             =head2 C<< $obj->steal_socket() >>
600              
601             Basically returns our socket and makes it so that we don't try to close it,
602             but we do remove it from epoll handlers. THIS CLOSES $self. It is the same
603             thing as calling close, except it gives you the socket to use.
604              
605             =cut
606             sub steal_socket {
607 0     0 0   my Danga::Socket $self = $_[0];
608 0 0         return if $self->{closed};
609              
610             # cleanup does most of the work of closing this socket
611 0           $self->_cleanup();
612              
613             # now undef our internal sock and fd structures so we don't use them
614 0           my $sock = $self->{sock};
615 0           $self->{sock} = undef;
616 0           return $sock;
617             }
618              
619             =head2 C<< $obj->close( [$reason] ) >>
620              
621             Close the socket. The I argument will be used in debugging messages.
622              
623             =cut
624             sub close {
625 0     0 0   my Danga::Socket $self = $_[0];
626 0 0         return if $self->{closed};
627              
628             # print out debugging info for this close
629 0           if (DebugLevel) {
630             my ($pkg, $filename, $line) = caller;
631             my $reason = $_[1] || "";
632             warn "Closing \#$self->{fd} due to $pkg/$filename/$line ($reason)\n";
633             }
634              
635             # this does most of the work of closing us
636 0           $self->_cleanup();
637              
638             # defer closing the actual socket until the event loop is done
639             # processing this round of events. (otherwise we might reuse fds)
640 0 0         if ($self->{sock}) {
641 0           push @ToClose, $self->{sock};
642 0           $self->{sock} = undef;
643             }
644              
645 0           return 0;
646             }
647              
648             ### METHOD: _cleanup()
649             ### Called by our closers so we can clean internal data structures.
650             sub _cleanup {
651 0     0     my Danga::Socket $self = $_[0];
652              
653             # we're effectively closed; we have no fd and sock when we leave here
654 0           $self->{closed} = 1;
655              
656             # we need to flush our write buffer, as there may
657             # be self-referential closures (sub { $client->close })
658             # preventing the object from being destroyed
659 0           $self->{write_buf} = [];
660              
661             # uncork so any final data gets sent. only matters if the person closing
662             # us forgot to do it, but we do it to be safe.
663 0           $self->tcp_cork(0);
664              
665             # now delete from mappings. this fd no longer belongs to us, so we don't want
666             # to get alerts for it if it becomes writable/readable/etc.
667 0           delete $PushBackSet{$self->{fd}};
668 0           delete $PLCMap{$self->{fd}};
669 0           delete $FdWatchers{$self->{fd}};
670              
671             # we explicitly don't delete from DescriptorMap here until we
672             # actually close the socket, as we might be in the middle of
673             # processing an epoll_wait/etc that returned hundreds of fds, one
674             # of which is not yet processed and is what we're closing. if we
675             # keep it in DescriptorMap, then the event harnesses can just
676             # looked at $pob->{closed} and ignore it. but if it's an
677             # un-accounted for fd, then it (understandably) freak out a bit
678             # and emit warnings, thinking their state got off.
679              
680             # and finally get rid of our fd so we can't use it anywhere else
681 0           $self->{fd} = undef;
682             }
683              
684             =head2 C<< $obj->sock() >>
685              
686             Returns the underlying IO::Handle for the object.
687              
688             =cut
689             sub sock {
690 0     0 0   my Danga::Socket $self = shift;
691 0           return $self->{sock};
692             }
693              
694             =head2 C<< $obj->set_writer_func( CODEREF ) >>
695              
696             Sets a function to use instead of C when writing data to the socket.
697              
698             =cut
699             sub set_writer_func {
700 0     0 0   my Danga::Socket $self = shift;
701 0           my $wtr = shift;
702 0 0 0       Carp::croak("Not a subref") unless !defined $wtr || UNIVERSAL::isa($wtr, "CODE");
703 0           $self->{writer_func} = $wtr;
704             }
705              
706             =head2 C<< $obj->write( $data ) >>
707              
708             Write the specified data to the underlying handle. I may be scalar,
709             scalar ref, code ref (to run when there), or undef just to kick-start.
710             Returns 1 if writes all went through, or 0 if there are writes in queue. If
711             it returns 1, caller should stop waiting for 'writable' events)
712              
713             =cut
714             sub write {
715 0     0 0   my Danga::Socket $self;
716             my $data;
717 0           ($self, $data) = @_;
718              
719             # nobody should be writing to closed sockets, but caller code can
720             # do two writes within an event, have the first fail and
721             # disconnect the other side (whose destructor then closes the
722             # calling object, but it's still in a method), and then the
723             # now-dead object does its second write. that is this case. we
724             # just lie and say it worked. it'll be dead soon and won't be
725             # hurt by this lie.
726 0 0         return 1 if $self->{closed};
727              
728 0           my $bref;
729              
730             # just queue data if there's already a wait
731             my $need_queue;
732              
733 0 0         if (defined $data) {
734 0 0         $bref = ref $data ? $data : \$data;
735 0 0         if ($self->{write_buf_size}) {
736 0           push @{$self->{write_buf}}, $bref;
  0            
737 0 0         $self->{write_buf_size} += ref $bref eq "SCALAR" ? length($$bref) : 1;
738 0           return 0;
739             }
740              
741             # this flag says we're bypassing the queue system, knowing we're the
742             # only outstanding write, and hoping we don't ever need to use it.
743             # if so later, though, we'll need to queue
744 0           $need_queue = 1;
745             }
746              
747             WRITE:
748 0           while (1) {
749 0 0 0       return 1 unless $bref ||= $self->{write_buf}[0];
750              
751 0           my $len;
752 0           eval {
753 0           $len = length($$bref); # this will die if $bref is a code ref, caught below
754             };
755 0 0         if ($@) {
756 0 0         if (UNIVERSAL::isa($bref, "CODE")) {
757 0 0         unless ($need_queue) {
758 0           $self->{write_buf_size}--; # code refs are worth 1
759 0           shift @{$self->{write_buf}};
  0            
760             }
761 0           $bref->();
762              
763             # code refs are just run and never get reenqueued
764             # (they're one-shot), so turn off the flag indicating the
765             # outstanding data needs queueing.
766 0           $need_queue = 0;
767              
768 0           undef $bref;
769 0           next WRITE;
770             }
771 0           die "Write error: $@ <$bref>";
772             }
773              
774 0           my $to_write = $len - $self->{write_buf_offset};
775 0           my $written;
776 0 0         if (my $wtr = $self->{writer_func}) {
777 0           $written = $wtr->($bref, $to_write, $self->{write_buf_offset});
778             } else {
779 0           $written = syswrite($self->{sock}, $$bref, $to_write, $self->{write_buf_offset});
780             }
781              
782 0 0         if (! defined $written) {
    0          
    0          
783 0 0         if ($! == EPIPE) {
    0          
    0          
784 0           return $self->close("EPIPE");
785             } elsif ($! == EAGAIN) {
786             # since connection has stuff to write, it should now be
787             # interested in pending writes:
788 0 0         if ($need_queue) {
789 0           push @{$self->{write_buf}}, $bref;
  0            
790 0           $self->{write_buf_size} += $len;
791             }
792 0 0         $self->{write_set_watch} = 1 unless $self->{event_watch} & POLLOUT;
793 0           $self->watch_write(1);
794 0           return 0;
795             } elsif ($! == ECONNRESET) {
796 0           return $self->close("ECONNRESET");
797             }
798              
799 0           DebugLevel >= 1 && $self->debugmsg("Closing connection ($self) due to write error: $!\n");
800              
801 0           return $self->close("write_error");
802             } elsif ($written != $to_write) {
803             DebugLevel >= 2 && $self->debugmsg("Wrote PARTIAL %d bytes to %d",
804 0           $written, $self->{fd});
805 0 0         if ($need_queue) {
806 0           push @{$self->{write_buf}}, $bref;
  0            
807 0           $self->{write_buf_size} += $len;
808             }
809             # since connection has stuff to write, it should now be
810             # interested in pending writes:
811 0           $self->{write_buf_offset} += $written;
812 0           $self->{write_buf_size} -= $written;
813 0           $self->on_incomplete_write;
814 0           return 0;
815             } elsif ($written == $to_write) {
816             DebugLevel >= 2 && $self->debugmsg("Wrote ALL %d bytes to %d (nq=%d)",
817 0           $written, $self->{fd}, $need_queue);
818 0           $self->{write_buf_offset} = 0;
819              
820 0 0         if ($self->{write_set_watch}) {
821 0           $self->watch_write(0);
822 0           $self->{write_set_watch} = 0;
823             }
824              
825             # this was our only write, so we can return immediately
826             # since we avoided incrementing the buffer size or
827             # putting it in the buffer. we also know there
828             # can't be anything else to write.
829 0 0         return 1 if $need_queue;
830              
831 0           $self->{write_buf_size} -= $written;
832 0           shift @{$self->{write_buf}};
  0            
833 0           undef $bref;
834 0           next WRITE;
835             }
836             }
837             }
838              
839             sub on_incomplete_write {
840 0     0 0   my Danga::Socket $self = shift;
841 0 0         $self->{write_set_watch} = 1 unless $self->{event_watch} & POLLOUT;
842 0           $self->watch_write(1);
843             }
844              
845             =head2 C<< $obj->push_back_read( $buf ) >>
846              
847             Push back I (a scalar or scalarref) into the read stream. Useful if you read
848             more than you need to and want to return this data on the next "read".
849              
850             =cut
851             sub push_back_read {
852 0     0 0   my Danga::Socket $self = shift;
853 0           my $buf = shift;
854 0 0         push @{$self->{read_push_back}}, ref $buf ? $buf : \$buf;
  0            
855 0           $PushBackSet{$self->{fd}} = $self;
856             }
857              
858             =head2 C<< $obj->read( $bytecount ) >>
859              
860             Read at most I bytes from the underlying handle; returns scalar
861             ref on read, or undef on connection closed.
862              
863             =cut
864             sub read {
865 0     0 0   my Danga::Socket $self = shift;
866 0 0         return if $self->{closed};
867 0           my $bytes = shift;
868 0           my $buf;
869 0           my $sock = $self->{sock};
870              
871 0 0         if (@{$self->{read_push_back}}) {
  0            
872 0           $buf = shift @{$self->{read_push_back}};
  0            
873 0           my $len = length($$buf);
874              
875 0 0         if ($len <= $bytes) {
876 0 0         delete $PushBackSet{$self->{fd}} unless @{$self->{read_push_back}};
  0            
877 0           return $buf;
878             } else {
879             # if the pushed back read is too big, we have to split it
880 0           my $overflow = substr($$buf, $bytes);
881 0           $buf = substr($$buf, 0, $bytes);
882 0           unshift @{$self->{read_push_back}}, \$overflow;
  0            
883 0           return \$buf;
884             }
885             }
886              
887             # if this is too high, perl quits(!!). reports on mailing lists
888             # don't seem to point to a universal answer. 5MB worked for some,
889             # crashed for others. 1MB works for more people. let's go with 1MB
890             # for now. :/
891 0 0         my $req_bytes = $bytes > 1048576 ? 1048576 : $bytes;
892              
893 0           my $res = sysread($sock, $buf, $req_bytes, 0);
894 0           DebugLevel >= 2 && $self->debugmsg("sysread = %d; \$! = %d", $res, $!);
895              
896 0 0 0       if (! $res && $! != EWOULDBLOCK) {
897             # catches 0=conn closed or undef=error
898 0           DebugLevel >= 2 && $self->debugmsg("Fd \#%d read hit the end of the road.", $self->{fd});
899 0           return undef;
900             }
901              
902 0           return \$buf;
903             }
904              
905             =head2 (VIRTUAL) C<< $obj->event_read() >>
906              
907             Readable event handler. Concrete deriviatives of Danga::Socket should
908             provide an implementation of this. The default implementation will die if
909             called.
910              
911             =cut
912 0     0 0   sub event_read { die "Base class event_read called for $_[0]\n"; }
913              
914             =head2 (VIRTUAL) C<< $obj->event_err() >>
915              
916             Error event handler. Concrete deriviatives of Danga::Socket should
917             provide an implementation of this. The default implementation will die if
918             called.
919              
920             =cut
921 0     0 0   sub event_err { die "Base class event_err called for $_[0]\n"; }
922              
923             =head2 (VIRTUAL) C<< $obj->event_hup() >>
924              
925             'Hangup' event handler. Concrete deriviatives of Danga::Socket should
926             provide an implementation of this. The default implementation will die if
927             called.
928              
929             =cut
930 0     0 0   sub event_hup { die "Base class event_hup called for $_[0]\n"; }
931              
932             =head2 C<< $obj->event_write() >>
933              
934             Writable event handler. Concrete deriviatives of Danga::Socket may wish to
935             provide an implementation of this. The default implementation calls
936             C with an C.
937              
938             =cut
939             sub event_write {
940 0     0 0   my $self = shift;
941 0           $self->write(undef);
942             }
943              
944             =head2 C<< $obj->watch_read( $boolean ) >>
945              
946             Turn 'readable' event notification on or off.
947              
948             =cut
949             sub watch_read {
950 0     0 0   my Danga::Socket $self = shift;
951 0 0 0       return if $self->{closed} || !$self->{sock};
952              
953 0           my $val = shift;
954 0           my $fd = fileno($self->{sock});
955 0           my $watchers = $FdWatchers{$fd};
956 0 0         if ($val) {
957             $watchers->[0] = AnyEvent->io(
958             fh => $fd,
959             poll => 'r',
960             cb => _wrap_watcher_cb(sub {
961 0 0   0     $self->event_read() unless $self->{closed};
962 0           }),
963             );
964             }
965             else {
966 0           $watchers->[0] = undef;
967             }
968             }
969              
970             =head2 C<< $obj->watch_write( $boolean ) >>
971              
972             Turn 'writable' event notification on or off.
973              
974             =cut
975             sub watch_write {
976 0     0 0   my Danga::Socket $self = shift;
977 0 0 0       return if $self->{closed} || !$self->{sock};
978              
979 0           my $val = shift;
980 0           my $fd = fileno($self->{sock});
981              
982 0 0 0       if ($val && caller ne __PACKAGE__) {
983             # A subclass registered interest, it's now responsible for this.
984 0           $self->{write_set_watch} = 0;
985             }
986              
987 0           my $watchers = $FdWatchers{$fd};
988 0 0         if ($val) {
989             $watchers->[1] = AnyEvent->io(
990             fh => $fd,
991             poll => 'w',
992             cb => _wrap_watcher_cb(sub {
993 0 0   0     $self->event_write() unless $self->{closed};
994 0           }),
995             );
996             }
997             else {
998 0           $watchers->[1] = undef;
999             }
1000             }
1001              
1002             =head2 C<< $obj->dump_error( $message ) >>
1003              
1004             Prints to STDERR a backtrace with information about this socket and what lead
1005             up to the dump_error call.
1006              
1007             =cut
1008             sub dump_error {
1009 0     0 0   my $i = 0;
1010 0           my @list;
1011 0           while (my ($file, $line, $sub) = (caller($i++))[1..3]) {
1012 0           push @list, "\t$file:$line called $sub\n";
1013             }
1014              
1015 0           warn "ERROR: $_[1]\n" .
1016             "\t$_[0] = " . $_[0]->as_string . "\n" .
1017             join('', @list);
1018             }
1019              
1020             =head2 C<< $obj->debugmsg( $format, @args ) >>
1021              
1022             Print the debugging message specified by the C-style I and
1023             I.
1024              
1025             =cut
1026             sub debugmsg {
1027 0     0 0   my ( $self, $fmt, @args ) = @_;
1028 0 0         confess "Not an object" unless ref $self;
1029              
1030 0           chomp $fmt;
1031 0           printf STDERR ">>> $fmt\n", @args;
1032             }
1033              
1034              
1035             =head2 C<< $obj->peer_ip_string() >>
1036              
1037             Returns the string describing the peer's IP
1038              
1039             =cut
1040             sub peer_ip_string {
1041 0     0 0   my Danga::Socket $self = shift;
1042 0 0         return _undef("peer_ip_string undef: no sock") unless $self->{sock};
1043 0 0         return $self->{peer_ip} if defined $self->{peer_ip};
1044              
1045 0           my $pn = getpeername($self->{sock});
1046 0 0         return _undef("peer_ip_string undef: getpeername") unless $pn;
1047              
1048 0           my ($port, $iaddr) = eval {
1049 0 0         if (length($pn) >= 28) {
1050 0           return Socket6::unpack_sockaddr_in6($pn);
1051             } else {
1052 0           return Socket::sockaddr_in($pn);
1053             }
1054             };
1055              
1056 0 0         if ($@) {
1057 0           $self->{peer_port} = "[Unknown peerport '$@']";
1058 0           return "[Unknown peername '$@']";
1059             }
1060              
1061 0           $self->{peer_port} = $port;
1062              
1063 0 0         if (length($iaddr) == 4) {
1064 0           return $self->{peer_ip} = Socket::inet_ntoa($iaddr);
1065             } else {
1066 0           $self->{peer_v6} = 1;
1067 0           return $self->{peer_ip} = Socket6::inet_ntop(Socket6::AF_INET6(),
1068             $iaddr);
1069             }
1070             }
1071              
1072             =head2 C<< $obj->peer_addr_string() >>
1073              
1074             Returns the string describing the peer for the socket which underlies this
1075             object in form "ip:port"
1076              
1077             =cut
1078             sub peer_addr_string {
1079 0     0 0   my Danga::Socket $self = shift;
1080 0 0         my $ip = $self->peer_ip_string
1081             or return undef;
1082             return $self->{peer_v6} ?
1083 0 0         "[$ip]:$self->{peer_port}" :
1084             "$ip:$self->{peer_port}";
1085             }
1086              
1087             =head2 C<< $obj->local_ip_string() >>
1088              
1089             Returns the string describing the local IP
1090              
1091             =cut
1092             sub local_ip_string {
1093 0     0 0   my Danga::Socket $self = shift;
1094 0 0         return _undef("local_ip_string undef: no sock") unless $self->{sock};
1095 0 0         return $self->{local_ip} if defined $self->{local_ip};
1096              
1097 0           my $pn = getsockname($self->{sock});
1098 0 0         return _undef("local_ip_string undef: getsockname") unless $pn;
1099              
1100 0           my ($port, $iaddr) = Socket::sockaddr_in($pn);
1101 0           $self->{local_port} = $port;
1102              
1103 0           return $self->{local_ip} = Socket::inet_ntoa($iaddr);
1104             }
1105              
1106             =head2 C<< $obj->local_addr_string() >>
1107              
1108             Returns the string describing the local end of the socket which underlies this
1109             object in form "ip:port"
1110              
1111             =cut
1112             sub local_addr_string {
1113 0     0 0   my Danga::Socket $self = shift;
1114 0           my $ip = $self->local_ip_string;
1115 0 0         return $ip ? "$ip:$self->{local_port}" : undef;
1116             }
1117              
1118              
1119             =head2 C<< $obj->as_string() >>
1120              
1121             Returns a string describing this socket.
1122              
1123             =cut
1124             sub as_string {
1125 0     0 0   my Danga::Socket $self = shift;
1126             my $rw = "(" . ($self->{event_watch} & POLLIN ? 'R' : '') .
1127 0 0         ($self->{event_watch} & POLLOUT ? 'W' : '') . ")";
    0          
1128 0 0         my $ret = ref($self) . "$rw: " . ($self->{closed} ? "closed" : "open");
1129 0           my $peer = $self->peer_addr_string;
1130 0 0         if ($peer) {
1131 0           $ret .= " to " . $self->peer_addr_string;
1132             }
1133 0           return $ret;
1134             }
1135              
1136             sub _undef {
1137 0 0   0     return undef unless $ENV{DS_DEBUG};
1138 0   0       my $msg = shift || "";
1139 0           warn "Danga::Socket: $msg\n";
1140 0           return undef;
1141             }
1142              
1143             package # Hide from PAUSE
1144             Danga::Socket::Timer;
1145             # [$cancel_coderef];
1146             sub cancel {
1147 0     0     $_[0][0]->();
1148             }
1149              
1150             =head1 AUTHORS
1151              
1152             Martin Atkins
1153              
1154             Based on L by Brad Fitzpatrick and others.
1155              
1156             =head1 LICENSE
1157              
1158             License is granted to use and distribute this module under the same
1159             terms as Perl itself.
1160              
1161             =cut
1162              
1163             # Pretend that we loaded Danga::Socket so that
1164             # later "use Danga::Socket" calls don't conflict.
1165             $INC{"Danga/Socket.pm"} = __FILE__;
1166              
1167             1;
1168