File Coverage

blib/lib/Danga/Socket.pm
Criterion Covered Total %
statement 323 527 61.2
branch 125 292 42.8
condition 38 119 31.9
subroutine 46 69 66.6
pod 36 43 83.7
total 568 1050 54.1


line stmt bran cond sub pod time code
1             ###########################################################################
2              
3             =head1 NAME
4              
5             Danga::Socket - Event loop and event-driven async socket base class
6              
7             =head1 SYNOPSIS
8              
9             package My::Socket
10             use Danga::Socket;
11             use base ('Danga::Socket');
12             use fields ('my_attribute');
13              
14             sub new {
15             my My::Socket $self = shift;
16             $self = fields::new($self) unless ref $self;
17             $self->SUPER::new( @_ );
18              
19             $self->{my_attribute} = 1234;
20             return $self;
21             }
22              
23             sub event_err { ... }
24             sub event_hup { ... }
25             sub event_write { ... }
26             sub event_read { ... }
27             sub close { ... }
28              
29             $my_sock->tcp_cork($bool);
30              
31             # write returns 1 if all writes have gone through, or 0 if there
32             # are writes in queue
33             $my_sock->write($scalar);
34             $my_sock->write($scalarref);
35             $my_sock->write(sub { ... }); # run when previous data written
36             $my_sock->write(undef); # kick-starts
37              
38             # read max $bytecount bytes, or undef on connection closed
39             $scalar_ref = $my_sock->read($bytecount);
40              
41             # watch for writability. not needed with ->write(). write()
42             # will automatically turn on watch_write when you wrote too much
43             # and turn it off when done
44             $my_sock->watch_write($bool);
45              
46             # watch for readability
47             $my_sock->watch_read($bool);
48              
49             # if you read too much and want to push some back on
50             # readable queue. (not incredibly well-tested)
51             $my_sock->push_back_read($buf); # scalar or scalar ref
52              
53             Danga::Socket->AddOtherFds(..);
54             Danga::Socket->SetLoopTimeout($millisecs);
55             Danga::Socket->DescriptorMap();
56             Danga::Socket->WatchedSockets(); # count of DescriptorMap keys
57             Danga::Socket->SetPostLoopCallback($code);
58             Danga::Socket->EventLoop();
59              
60             =head1 DESCRIPTION
61              
62             This is an abstract base class for objects backed by a socket which
63             provides the basic framework for event-driven asynchronous IO,
64             designed to be fast. Danga::Socket is both a base class for objects,
65             and an event loop.
66              
67             Callers subclass Danga::Socket. Danga::Socket's constructor registers
68             itself with the Danga::Socket event loop, and invokes callbacks on the
69             object for readability, writability, errors, and other conditions.
70              
71             Because Danga::Socket uses the "fields" module, your subclasses must
72             too.
73              
74             =head1 MORE INFO
75              
76             For now, see servers using Danga::Socket for guidance. For example:
77             perlbal, mogilefsd, or ddlockd.
78              
79             =head1 API
80              
81             Note where "C" is used below, normally you would call these methods as:
82              
83             Danga::Socket->method(...);
84              
85             However using a subclass works too.
86              
87             The CLASS methods are all methods for the event loop part of Danga::Socket,
88             whereas the object methods are all used on your subclasses.
89              
90             =cut
91              
92             ###########################################################################
93              
94             package Danga::Socket;
95 4     4   146862 use strict;
  4         30  
  4         85  
96 4     4   1794 use bytes;
  4         44  
  4         16  
97 4     4   1408 use POSIX ();
  4         18582  
  4         75  
98 4     4   1541 use Time::HiRes ();
  4         3953  
  4         110  
99              
100 4     4   528 my $opt_bsd_resource = eval "use BSD::Resource; 1;";
  0         0  
  0         0  
101              
102 4     4   19 use vars qw{$VERSION};
  4         5  
  4         177  
103             $VERSION = "1.62-TRIAL";
104              
105 4     4   18 use warnings;
  4         5  
  4         80  
106 4     4   14 no warnings qw(deprecated);
  4         4  
  4         117  
107              
108 4     4   1385 use Sys::Syscall qw(:epoll);
  4         10013  
  4         575  
109              
110 4         13 use fields ('sock', # underlying socket
111             'fd', # numeric file descriptor
112             'write_buf', # arrayref of scalars, scalarrefs, or coderefs to write
113             'write_buf_offset', # offset into first array of write_buf to start writing at
114             'write_buf_size', # total length of data in all write_buf items
115             'write_set_watch', # bool: true if we internally set watch_write rather than by a subclass
116             'read_push_back', # arrayref of "pushed-back" read data the application didn't want
117             'closed', # bool: socket is closed
118             'corked', # bool: socket is corked
119             'event_watch', # bitmask of events the client is interested in (POLLIN,OUT,etc.)
120             'peer_v6', # bool: cached; if peer is an IPv6 address
121             'peer_ip', # cached stringified IP address of $sock
122             'peer_port', # cached port number of $sock
123             'local_ip', # cached stringified IP address of local end of $sock
124             'local_port', # cached port number of local end of $sock
125             'writer_func', # subref which does writing. must return bytes written (or undef) and set $! on errors
126 4     4   1377 );
  4         4448  
127              
128 4         355 use Errno qw(EINPROGRESS EWOULDBLOCK EISCONN ENOTSOCK
129 4     4   1453 EPIPE EAGAIN EBADF ECONNRESET ENOPROTOOPT);
  4         2870  
130 4     4   1634 use Socket qw(IPPROTO_TCP);
  4         10404  
  4         448  
131 4     4   22 use Carp qw(croak confess);
  4         6  
  4         162  
132              
133 4 50   4   17 use constant TCP_CORK => ($^O eq "linux" ? 3 : 0); # FIXME: not hard-coded (Linux-specific too)
  4         5  
  4         178  
134 4     4   15 use constant DebugLevel => 0;
  4         5  
  4         120  
135              
136 4     4   15 use constant POLLIN => 1;
  4         7  
  4         109  
137 4     4   14 use constant POLLOUT => 4;
  4         13  
  4         128  
138 4     4   16 use constant POLLERR => 8;
  4         4  
  4         111  
139 4     4   14 use constant POLLHUP => 16;
  4         7  
  4         132  
140 4     4   15 use constant POLLNVAL => 32;
  4         5  
  4         11929  
141              
142             our $HAVE_KQUEUE = eval { require IO::KQueue; 1 };
143              
144             our (
145             $HaveEpoll, # Flag -- is epoll available? initially undefined.
146             $HaveKQueue,
147             %DescriptorMap, # fd (num) -> Danga::Socket object
148             %PushBackSet, # fd (num) -> Danga::Socket (fds with pushed back read data)
149             $Epoll, # Global epoll fd (for epoll mode only)
150             $KQueue, # Global kqueue fd (for kqueue mode only)
151             @ToClose, # sockets to close when event loop is done
152             %OtherFds, # A hash of "other" (non-Danga::Socket) file
153             # descriptors for the event loop to track.
154              
155             $PostLoopCallback, # subref to call at the end of each loop, if defined (global)
156             %PLCMap, # fd (num) -> PostLoopCallback (per-object)
157              
158             $LoopTimeout, # timeout of event loop in milliseconds
159             $DoProfile, # if on, enable profiling
160             %Profiling, # what => [ utime, stime, calls ]
161             $DoneInit, # if we've done the one-time module init yet
162             @Timers, # timers
163             );
164              
165             Reset();
166              
167             #####################################################################
168             ### C L A S S M E T H O D S
169             #####################################################################
170              
171             =head2 C<< CLASS->Reset() >>
172              
173             Reset all state
174              
175             =cut
176             sub Reset {
177 6     6 1 1468 %DescriptorMap = ();
178 6         49 %PushBackSet = ();
179 6         10 @ToClose = ();
180 6         8 %OtherFds = ();
181 6         11 $LoopTimeout = -1; # no timeout by default
182 6         7 $DoProfile = 0;
183 6         9 %Profiling = ();
184 6         8 @Timers = ();
185              
186 6         17 $PostLoopCallback = undef;
187 6         7 %PLCMap = ();
188 6         9 $DoneInit = 0;
189              
190 6 100 66     52 POSIX::close($Epoll) if defined $Epoll && $Epoll >= 0;
191 6 50 33     19 POSIX::close($KQueue) if defined $KQueue && $KQueue >= 0;
192            
193 6         25 *EventLoop = *FirstTimeEventLoop;
194             }
195              
196             =head2 C<< CLASS->HaveEpoll() >>
197              
198             Returns a true value if this class will use IO::Epoll for async IO.
199              
200             =cut
201             sub HaveEpoll {
202 1     1 1 79 _InitPoller();
203 1         4 return $HaveEpoll;
204             }
205              
206             =head2 C<< CLASS->WatchedSockets() >>
207              
208             Returns the number of file descriptors which are registered with the global
209             poll object.
210              
211             =cut
212             sub WatchedSockets {
213 4     4 1 1590 return scalar keys %DescriptorMap;
214             }
215             *watched_sockets = *WatchedSockets;
216              
217             =head2 C<< CLASS->EnableProfiling() >>
218              
219             Turns profiling on, clearing current profiling data.
220              
221             =cut
222             sub EnableProfiling {
223 0 0   0 1 0 if ($opt_bsd_resource) {
224 0         0 %Profiling = ();
225 0         0 $DoProfile = 1;
226 0         0 return 1;
227             }
228 0         0 return 0;
229             }
230              
231             =head2 C<< CLASS->DisableProfiling() >>
232              
233             Turns off profiling, but retains data up to this point
234              
235             =cut
236             sub DisableProfiling {
237 0     0 1 0 $DoProfile = 0;
238             }
239              
240             =head2 C<< CLASS->ProfilingData() >>
241              
242             Returns reference to a hash of data in format:
243              
244             ITEM => [ utime, stime, #calls ]
245              
246             =cut
247             sub ProfilingData {
248 0     0 1 0 return \%Profiling;
249             }
250              
251             =head2 C<< CLASS->ToClose() >>
252              
253             Return the list of sockets that are awaiting close() at the end of the
254             current event loop.
255              
256             =cut
257 0     0 1 0 sub ToClose { return @ToClose; }
258              
259             =head2 C<< CLASS->OtherFds( [%fdmap] ) >>
260              
261             Get/set the hash of file descriptors that need processing in parallel with
262             the registered Danga::Socket objects.
263              
264             =cut
265             sub OtherFds {
266 0     0 1 0 my $class = shift;
267 0 0       0 if ( @_ ) { %OtherFds = @_ }
  0         0  
268 0 0       0 return wantarray ? %OtherFds : \%OtherFds;
269             }
270              
271             =head2 C<< CLASS->AddOtherFds( [%fdmap] ) >>
272              
273             Add fds to the OtherFds hash for processing.
274              
275             =cut
276             sub AddOtherFds {
277 0     0 1 0 my $class = shift;
278 0         0 %OtherFds = ( %OtherFds, @_ ); # FIXME investigate what happens on dupe fds
279 0 0       0 return wantarray ? %OtherFds : \%OtherFds;
280             }
281              
282             =head2 C<< CLASS->SetLoopTimeout( $timeout ) >>
283              
284             Set the loop timeout for the event loop to some value in milliseconds.
285              
286             A timeout of 0 (zero) means poll forever. A timeout of -1 means poll and return
287             immediately.
288              
289             =cut
290             sub SetLoopTimeout {
291 3     3 1 72 return $LoopTimeout = $_[1] + 0;
292             }
293              
294             =head2 C<< CLASS->DebugMsg( $format, @args ) >>
295              
296             Print the debugging message specified by the C-style I and
297             I
298              
299             =cut
300             sub DebugMsg {
301 0     0 1 0 my ( $class, $fmt, @args ) = @_;
302 0         0 chomp $fmt;
303 0         0 printf STDERR ">>> $fmt\n", @args;
304             }
305              
306             =head2 C<< CLASS->AddTimer( $seconds, $coderef ) >>
307              
308             Add a timer to occur $seconds from now. $seconds may be fractional, but timers
309             are not guaranteed to fire at the exact time you ask for.
310              
311             Returns a timer object which you can call C<< $timer->cancel >> on if you need to.
312              
313             =cut
314             sub AddTimer {
315 6     6 1 1705 my $class = shift;
316 6         14 my ($secs, $coderef) = @_;
317              
318 6         26 my $fire_time = Time::HiRes::time() + $secs;
319              
320 6         24 my $timer = bless [$fire_time, $coderef], "Danga::Socket::Timer";
321              
322 6 100 100     44 if (!@Timers || $fire_time >= $Timers[-1][0]) {
323 3         9 push @Timers, $timer;
324 3         14 return $timer;
325             }
326              
327             # Now, where do we insert? (NOTE: this appears slow, algorithm-wise,
328             # but it was compared against calendar queues, heaps, naive push/sort,
329             # and a bunch of other versions, and found to be fastest with a large
330             # variety of datasets.)
331 3         14 for (my $i = 0; $i < @Timers; $i++) {
332 7 100       25 if ($Timers[$i][0] > $fire_time) {
333 3         11 splice(@Timers, $i, 0, $timer);
334 3         10 return $timer;
335             }
336             }
337              
338 0         0 die "Shouldn't get here.";
339             }
340              
341             =head2 C<< CLASS->DescriptorMap() >>
342              
343             Get the hash of Danga::Socket objects keyed by the file descriptor (fileno) they
344             are wrapping.
345              
346             Returns a hash in list context or a hashref in scalar context.
347              
348             =cut
349             sub DescriptorMap {
350 2 50   2 1 422 return wantarray ? %DescriptorMap : \%DescriptorMap;
351             }
352             *descriptor_map = *DescriptorMap;
353             *get_sock_ref = *DescriptorMap;
354              
355             sub _InitPoller
356             {
357 12 100   12   38 return if $DoneInit;
358 4         7 $DoneInit = 1;
359              
360 4 50       34 if ($HAVE_KQUEUE) {
    50          
361 0         0 $KQueue = IO::KQueue->new();
362 0         0 $HaveKQueue = $KQueue >= 0;
363 0 0       0 if ($HaveKQueue) {
364 0         0 *EventLoop = *KQueueEventLoop;
365             }
366             }
367             elsif (Sys::Syscall::epoll_defined()) {
368 4         23 $Epoll = eval { epoll_create(1024); };
  4         15  
369 4   33     124 $HaveEpoll = defined $Epoll && $Epoll >= 0;
370 4 50       13 if ($HaveEpoll) {
371 4         30 *EventLoop = *EpollEventLoop;
372             }
373             }
374              
375 4 50 33     19 if (!$HaveEpoll && !$HaveKQueue) {
376 0         0 require IO::Poll;
377 0         0 *EventLoop = *PollEventLoop;
378             }
379             }
380              
381             =head2 C<< CLASS->EventLoop() >>
382              
383             Start processing IO events. In most daemon programs this never exits. See
384             C below for how to exit the loop.
385              
386             =cut
387             sub FirstTimeEventLoop {
388 1     1 0 6 my $class = shift;
389              
390 1         3 _InitPoller();
391              
392 1 50       3 if ($HaveEpoll) {
    0          
393 1         3 EpollEventLoop($class);
394             } elsif ($HaveKQueue) {
395 0         0 KQueueEventLoop($class);
396             } else {
397 0         0 PollEventLoop($class);
398             }
399             }
400              
401             ## profiling-related data/functions
402             our ($Prof_utime0, $Prof_stime0);
403             sub _pre_profile {
404 0     0   0 ($Prof_utime0, $Prof_stime0) = getrusage();
405             }
406              
407             sub _post_profile {
408             # get post information
409 0     0   0 my ($autime, $astime) = getrusage();
410              
411             # calculate differences
412 0         0 my $utime = $autime - $Prof_utime0;
413 0         0 my $stime = $astime - $Prof_stime0;
414              
415 0         0 foreach my $k (@_) {
416 0   0     0 $Profiling{$k} ||= [ 0.0, 0.0, 0 ];
417 0         0 $Profiling{$k}->[0] += $utime;
418 0         0 $Profiling{$k}->[1] += $stime;
419 0         0 $Profiling{$k}->[2]++;
420             }
421             }
422              
423             # runs timers and returns milliseconds for next one, or next event loop
424             sub RunTimers {
425 49 100   49 0 152 return $LoopTimeout unless @Timers;
426              
427 21         126 my $now = Time::HiRes::time();
428              
429             # Run expired timers
430 21   100     211 while (@Timers && $Timers[0][0] <= $now) {
431 6         17 my $to_run = shift(@Timers);
432 6 50       36 $to_run->[1]->($now) if $to_run->[1];
433             }
434              
435 21 100       4464 return $LoopTimeout unless @Timers;
436              
437             # convert time to an even number of milliseconds, adding 1
438             # extra, otherwise floating point fun can occur and we'll
439             # call RunTimers like 20-30 times, each returning a timeout
440             # of 0.0000212 seconds
441 20         93 my $timeout = int(($Timers[0][0] - $now) * 1000) + 1;
442              
443             # -1 is an infinite timeout, so prefer a real timeout
444 20 50       64 return $timeout if $LoopTimeout == -1;
445              
446             # otherwise pick the lower of our regular timeout and time until
447             # the next timer
448 20 100       87 return $LoopTimeout if $LoopTimeout < $timeout;
449 5         14 return $timeout;
450             }
451              
452             ### The epoll-based event loop. Gets installed as EventLoop if IO::Epoll loads
453             ### okay.
454             sub EpollEventLoop {
455 5     5 0 34 my $class = shift;
456              
457 5         18 foreach my $fd ( keys %OtherFds ) {
458 0 0       0 if (epoll_ctl($Epoll, EPOLL_CTL_ADD, $fd, EPOLLIN) == -1) {
459 0         0 warn "epoll_ctl(): failure adding fd=$fd; $! (", $!+0, ")\n";
460             }
461             }
462              
463 5         8 while (1) {
464 43         99 my @events;
465             my $i;
466 43         131 my $timeout = RunTimers();
467              
468             # get up to 1000 events
469 43         180 my $evcount = epoll_wait($Epoll, 1000, $timeout, \@events);
470             EVENT:
471 43         6947736 for ($i=0; $i<$evcount; $i++) {
472 16         48 my $ev = $events[$i];
473              
474             # it's possible epoll_wait returned many events, including some at the end
475             # that ones in the front triggered unregister-interest actions. if we
476             # can't find the %sock entry, it's because we're no longer interested
477             # in that event.
478 16         50 my Danga::Socket $pob = $DescriptorMap{$ev->[0]};
479 16         19 my $code;
480 16         22 my $state = $ev->[1];
481              
482             # if we didn't find a Perlbal::Socket subclass for that fd, try other
483             # pseudo-registered (above) fds.
484 16 50       32 if (! $pob) {
485 0 0       0 if (my $code = $OtherFds{$ev->[0]}) {
486 0         0 $code->($state);
487             } else {
488 0         0 my $fd = $ev->[0];
489 0         0 warn "epoll() returned fd $fd w/ state $state for which we have no mapping. removing.\n";
490 0         0 POSIX::close($fd);
491 0         0 epoll_ctl($Epoll, EPOLL_CTL_DEL, $fd, 0);
492             }
493 0         0 next;
494             }
495              
496 16         18 DebugLevel >= 1 && $class->DebugMsg("Event: fd=%d (%s), state=%d \@ %s\n",
497             $ev->[0], ref($pob), $ev->[1], time);
498              
499 16 50       22 if ($DoProfile) {
500 0         0 my $class = ref $pob;
501              
502             # call profiling action on things that need to be done
503 0 0 0     0 if ($state & EPOLLIN && ! $pob->{closed}) {
504 0         0 _pre_profile();
505 0         0 $pob->event_read;
506 0         0 _post_profile("$class-read");
507             }
508              
509 0 0 0     0 if ($state & EPOLLOUT && ! $pob->{closed}) {
510 0         0 _pre_profile();
511 0         0 $pob->event_write;
512 0         0 _post_profile("$class-write");
513             }
514              
515 0 0       0 if ($state & (EPOLLERR|EPOLLHUP)) {
516 0 0 0     0 if ($state & EPOLLERR && ! $pob->{closed}) {
517 0         0 _pre_profile();
518 0         0 $pob->event_err;
519 0         0 _post_profile("$class-err");
520             }
521 0 0 0     0 if ($state & EPOLLHUP && ! $pob->{closed}) {
522 0         0 _pre_profile();
523 0         0 $pob->event_hup;
524 0         0 _post_profile("$class-hup");
525             }
526             }
527              
528 0         0 next;
529             }
530              
531             # standard non-profiling codepat
532 16 100 66     44 $pob->event_read if $state & EPOLLIN && ! $pob->{closed};
533 16 100 100     727 $pob->event_write if $state & EPOLLOUT && ! $pob->{closed};
534 15 50       38 if ($state & (EPOLLERR|EPOLLHUP)) {
535 0 0 0     0 $pob->event_err if $state & EPOLLERR && ! $pob->{closed};
536 0 0 0     0 $pob->event_hup if $state & EPOLLHUP && ! $pob->{closed};
537             }
538             }
539 42 100       189 return unless PostEventLoop();
540             }
541 0         0 exit 0;
542             }
543              
544             ### The fallback IO::Poll-based event loop. Gets installed as EventLoop if
545             ### IO::Epoll fails to load.
546             sub PollEventLoop {
547 1     1 0 697 my $class = shift;
548              
549 1         2 my Danga::Socket $pob;
550              
551 1         1 while (1) {
552 6         16 my $timeout = RunTimers();
553              
554             # the following sets up @poll as a series of ($poll,$event_mask)
555             # items, then uses IO::Poll::_poll, implemented in XS, which
556             # modifies the array in place with the even elements being
557             # replaced with the event masks that occured.
558 6         9 my @poll;
559 6         18 foreach my $fd ( keys %OtherFds ) {
560 0         0 push @poll, $fd, POLLIN;
561             }
562 6         27 while ( my ($fd, $sock) = each %DescriptorMap ) {
563 10         33 push @poll, $fd, $sock->{event_watch};
564             }
565              
566             # if nothing to poll, either end immediately (if no timeout)
567             # or just keep calling the callback
568 6 100       16 unless (@poll) {
569 1         150232 select undef, undef, undef, ($timeout / 1000);
570 1 50       14 return unless PostEventLoop();
571 1         5 next;
572             }
573              
574 5         300498 my $count = IO::Poll::_poll($timeout, @poll);
575 5 100       32 unless ($count) {
576 2 50       11 return unless PostEventLoop();
577 2         8 next;
578             }
579              
580             # Fetch handles with read events
581 3         8 while (@poll) {
582 8         14 my ($fd, $state) = splice(@poll, 0, 2);
583 8 100       17 next unless $state;
584              
585 4         7 $pob = $DescriptorMap{$fd};
586              
587 4 50       7 if (!$pob) {
588 0 0       0 if (my $code = $OtherFds{$fd}) {
589 0         0 $code->($state);
590             }
591 0         0 next;
592             }
593              
594 4 100 66     22 $pob->event_read if $state & POLLIN && ! $pob->{closed};
595 4 100 66     543 $pob->event_write if $state & POLLOUT && ! $pob->{closed};
596 4 50 33     13 $pob->event_err if $state & POLLERR && ! $pob->{closed};
597 4 50 33     11 $pob->event_hup if $state & POLLHUP && ! $pob->{closed};
598             }
599              
600 3 100       6 return unless PostEventLoop();
601             }
602              
603 0         0 exit 0;
604             }
605              
606             ### The kqueue-based event loop. Gets installed as EventLoop if IO::KQueue works
607             ### okay.
608             sub KQueueEventLoop {
609 0     0 0 0 my $class = shift;
610              
611 0         0 foreach my $fd (keys %OtherFds) {
612 0         0 $KQueue->EV_SET($fd, IO::KQueue::EVFILT_READ(), IO::KQueue::EV_ADD());
613             }
614              
615 0         0 while (1) {
616 0         0 my $timeout = RunTimers();
617 0         0 my @ret = $KQueue->kevent($timeout);
618              
619 0         0 foreach my $kev (@ret) {
620 0         0 my ($fd, $filter, $flags, $fflags) = @$kev;
621 0         0 my Danga::Socket $pob = $DescriptorMap{$fd};
622 0 0       0 if (!$pob) {
623 0 0       0 if (my $code = $OtherFds{$fd}) {
624 0         0 $code->($filter);
625             } else {
626 0         0 warn "kevent() returned fd $fd for which we have no mapping. removing.\n";
627 0         0 POSIX::close($fd); # close deletes the kevent entry
628             }
629 0         0 next;
630             }
631              
632 0         0 DebugLevel >= 1 && $class->DebugMsg("Event: fd=%d (%s), flags=%d \@ %s\n",
633             $fd, ref($pob), $flags, time);
634              
635 0 0 0     0 $pob->event_read if $filter == IO::KQueue::EVFILT_READ() && !$pob->{closed};
636 0 0 0     0 $pob->event_write if $filter == IO::KQueue::EVFILT_WRITE() && !$pob->{closed};
637 0 0 0     0 if ($flags == IO::KQueue::EV_EOF() && !$pob->{closed}) {
638 0 0       0 if ($fflags) {
639 0         0 $pob->event_err;
640             } else {
641 0         0 $pob->event_hup;
642             }
643             }
644             }
645 0 0       0 return unless PostEventLoop();
646             }
647              
648 0         0 exit(0);
649             }
650              
651             =head2 C<< CLASS->SetPostLoopCallback( CODEREF ) >>
652              
653             Sets post loop callback function. Pass a subref and it will be
654             called every time the event loop finishes.
655              
656             Return 1 (or any true value) from the sub to make the loop continue, 0 or false
657             and it will exit.
658              
659             The callback function will be passed two parameters: \%DescriptorMap, \%OtherFds.
660              
661             =cut
662             sub SetPostLoopCallback {
663 6     6 1 79 my ($class, $ref) = @_;
664              
665 6 100       17 if (ref $class) {
666             # per-object callback
667 2         3 my Danga::Socket $self = $class;
668 2 50 33     9 if (defined $ref && ref $ref eq 'CODE') {
669 2         7 $PLCMap{$self->{fd}} = $ref;
670             } else {
671 0         0 delete $PLCMap{$self->{fd}};
672             }
673             } else {
674             # global callback
675 4 50 33     33 $PostLoopCallback = (defined $ref && ref $ref eq 'CODE') ? $ref : undef;
676             }
677             }
678              
679             # Internal function: run the post-event callback, send read events
680             # for pushed-back data, and close pending connections. returns 1
681             # if event loop should continue, or 0 to shut it all down.
682             sub PostEventLoop {
683             # fire read events for objects with pushed-back read data
684 48     48 0 100 my $loop = 1;
685 48         133 while ($loop) {
686 54         634 $loop = 0;
687 54         195 foreach my $fd (keys %PushBackSet) {
688 6         8 my Danga::Socket $pob = $PushBackSet{$fd};
689              
690             # a previous event_read invocation could've closed a
691             # connection that we already evaluated in "keys
692             # %PushBackSet", so skip ones that seem to have
693             # disappeared. this is expected.
694 6 50       14 next unless $pob;
695              
696 6 50       6 die "ASSERT: the $pob socket has no read_push_back" unless @{$pob->{read_push_back}};
  6         12  
697             next unless (! $pob->{closed} &&
698 6 50 33     22 $pob->{event_watch} & POLLIN);
699 6         7 $loop = 1;
700 6         15 $pob->event_read;
701             }
702             }
703              
704             # now we can close sockets that wanted to close during our event processing.
705             # (we didn't want to close them during the loop, as we didn't want fd numbers
706             # being reused and confused during the event loop)
707 48         185 while (my $sock = shift @ToClose) {
708 1         2 my $fd = fileno($sock);
709              
710             # close the socket. (not a Danga::Socket close)
711 1         8 $sock->close;
712              
713             # and now we can finally remove the fd from the map. see
714             # comment above in _cleanup.
715 1         63 delete $DescriptorMap{$fd};
716             }
717              
718              
719             # by default we keep running, unless a postloop callback (either per-object
720             # or global) cancels it
721 48         76 my $keep_running = 1;
722              
723             # per-object post-loop-callbacks
724 48         110 for my $plc (values %PLCMap) {
725 2   66     523 $keep_running &&= $plc->(\%DescriptorMap, \%OtherFds);
726             }
727              
728             # now we're at the very end, call callback if defined
729 48 100       336 if (defined $PostLoopCallback) {
730 38   100     354 $keep_running &&= $PostLoopCallback->(\%DescriptorMap, \%OtherFds);
731             }
732              
733 48         2458 return $keep_running;
734             }
735              
736             #####################################################################
737             ### Danga::Socket-the-object code
738             #####################################################################
739              
740             =head2 OBJECT METHODS
741              
742             =head2 C<< CLASS->new( $socket ) >>
743              
744             Create a new Danga::Socket subclass object for the given I which will
745             react to events on it during the C.
746              
747             This is normally (always?) called from your subclass via:
748              
749             $class->SUPER::new($socket);
750              
751             =cut
752             sub new {
753 10     10 1 8707 my Danga::Socket $self = shift;
754 10 100       50 $self = fields::new($self) unless ref $self;
755              
756 10         6755 my $sock = shift;
757              
758 10         23 $self->{sock} = $sock;
759 10         26 my $fd = fileno($sock);
760              
761 10 50 0     52 Carp::cluck("undef sock and/or fd in Danga::Socket->new. sock=" . ($sock || "") . ", fd=" . ($fd || ""))
      0        
      33        
762             unless $sock && $fd;
763              
764 10         23 $self->{fd} = $fd;
765 10         25 $self->{write_buf} = [];
766 10         17 $self->{write_buf_offset} = 0;
767 10         31 $self->{write_buf_size} = 0;
768 10         16 $self->{closed} = 0;
769 10         17 $self->{corked} = 0;
770 10         19 $self->{read_push_back} = [];
771              
772 10         18 $self->{event_watch} = POLLERR|POLLHUP|POLLNVAL;
773              
774 10         36 _InitPoller();
775              
776 10 50       25 if ($HaveEpoll) {
    0          
777             epoll_ctl($Epoll, EPOLL_CTL_ADD, $fd, $self->{event_watch})
778 10 50       41 and die "couldn't add epoll watch for $fd\n";
779             }
780             elsif ($HaveKQueue) {
781             # Add them to the queue but disabled for now
782 0         0 $KQueue->EV_SET($fd, IO::KQueue::EVFILT_READ(),
783             IO::KQueue::EV_ADD() | IO::KQueue::EV_DISABLE());
784 0         0 $KQueue->EV_SET($fd, IO::KQueue::EVFILT_WRITE(),
785             IO::KQueue::EV_ADD() | IO::KQueue::EV_DISABLE());
786             }
787              
788             Carp::cluck("Danga::Socket::new blowing away existing descriptor map for fd=$fd ($DescriptorMap{$fd})")
789 10 50       258 if $DescriptorMap{$fd};
790              
791 10         27 $DescriptorMap{$fd} = $self;
792 10         24 return $self;
793             }
794              
795              
796             #####################################################################
797             ### I N S T A N C E M E T H O D S
798             #####################################################################
799              
800             =head2 C<< $obj->tcp_cork( $boolean ) >>
801              
802             Turn TCP_CORK on or off depending on the value of I.
803              
804             =cut
805             sub tcp_cork {
806 1     1 1 1 my Danga::Socket $self = $_[0];
807 1         2 my $val = $_[1];
808              
809             # make sure we have a socket
810 1 50       3 return unless $self->{sock};
811 1 50       8 return if $val == $self->{corked};
812              
813 0         0 my $rv;
814 0         0 if (TCP_CORK) {
815 0 0       0 $rv = setsockopt($self->{sock}, IPPROTO_TCP, TCP_CORK,
816             pack("l", $val ? 1 : 0));
817             } else {
818             # FIXME: implement freebsd *PUSH sockopts
819             $rv = 1;
820             }
821              
822             # if we failed, close (if we're not already) and warn about the error
823 0 0       0 if ($rv) {
824 0         0 $self->{corked} = $val;
825             } else {
826 0 0 0     0 if ($! == EBADF || $! == ENOTSOCK) {
    0 0        
      0        
827             # internal state is probably corrupted; warn and then close if
828             # we're not closed already
829 0         0 warn "setsockopt: $!";
830 0         0 $self->close('tcp_cork_failed');
831             } elsif ($! == ENOPROTOOPT || $!{ENOTSOCK} || $!{EOPNOTSUPP}) {
832             # TCP implementation doesn't support corking, so just ignore it
833             # or we're trying to tcp-cork a non-socket (like a socketpair pipe
834             # which is acting like a socket, which Perlbal does for child
835             # processes acting like inetd-like web servers)
836             } else {
837             # some other error; we should never hit here, but if we do, die
838 0         0 die "setsockopt: $!";
839             }
840             }
841             }
842              
843             =head2 C<< $obj->steal_socket() >>
844              
845             Basically returns our socket and makes it so that we don't try to close it,
846             but we do remove it from epoll handlers. THIS CLOSES $self. It is the same
847             thing as calling close, except it gives you the socket to use.
848              
849             =cut
850             sub steal_socket {
851 0     0 1 0 my Danga::Socket $self = $_[0];
852 0 0       0 return if $self->{closed};
853              
854             # cleanup does most of the work of closing this socket
855 0         0 $self->_cleanup();
856              
857             # now undef our internal sock and fd structures so we don't use them
858 0         0 my $sock = $self->{sock};
859 0         0 $self->{sock} = undef;
860 0         0 return $sock;
861             }
862              
863             =head2 C<< $obj->close( [$reason] ) >>
864              
865             Close the socket. The I argument will be used in debugging messages.
866              
867             =cut
868             sub close {
869 10     10 1 101 my Danga::Socket $self = $_[0];
870 10 100       17 return if $self->{closed};
871              
872             # print out debugging info for this close
873 1         1 if (DebugLevel) {
874             my ($pkg, $filename, $line) = caller;
875             my $reason = $_[1] || "";
876             warn "Closing \#$self->{fd} due to $pkg/$filename/$line ($reason)\n";
877             }
878              
879             # this does most of the work of closing us
880 1         5 $self->_cleanup();
881              
882             # defer closing the actual socket until the event loop is done
883             # processing this round of events. (otherwise we might reuse fds)
884 1 50       9 if ($self->{sock}) {
885 1         4 push @ToClose, $self->{sock};
886 1         2 $self->{sock} = undef;
887             }
888              
889 1         2 return 0;
890             }
891              
892             ### METHOD: _cleanup()
893             ### Called by our closers so we can clean internal data structures.
894             sub _cleanup {
895 1     1   2 my Danga::Socket $self = $_[0];
896              
897             # we're effectively closed; we have no fd and sock when we leave here
898 1         2 $self->{closed} = 1;
899              
900             # we need to flush our write buffer, as there may
901             # be self-referential closures (sub { $client->close })
902             # preventing the object from being destroyed
903 1         2 $self->{write_buf} = [];
904              
905             # uncork so any final data gets sent. only matters if the person closing
906             # us forgot to do it, but we do it to be safe.
907 1         5 $self->tcp_cork(0);
908              
909             # if we're using epoll, we have to remove this from our epoll fd so we stop getting
910             # notifications about it
911 1 50 33     6 if ($HaveEpoll && $self->{fd}) {
912 1 50       4 if (epoll_ctl($Epoll, EPOLL_CTL_DEL, $self->{fd}, $self->{event_watch}) != 0) {
913             # dump_error prints a backtrace so we can try to figure out why this happened
914 0         0 $self->dump_error("epoll_ctl(): failure deleting fd=$self->{fd} during _cleanup(); $! (" . ($!+0) . ")");
915             }
916             }
917              
918             # now delete from mappings. this fd no longer belongs to us, so we don't want
919             # to get alerts for it if it becomes writable/readable/etc.
920 1         42 delete $PushBackSet{$self->{fd}};
921 1         3 delete $PLCMap{$self->{fd}};
922              
923             # we explicitly don't delete from DescriptorMap here until we
924             # actually close the socket, as we might be in the middle of
925             # processing an epoll_wait/etc that returned hundreds of fds, one
926             # of which is not yet processed and is what we're closing. if we
927             # keep it in DescriptorMap, then the event harnesses can just
928             # looked at $pob->{closed} and ignore it. but if it's an
929             # un-accounted for fd, then it (understandably) freak out a bit
930             # and emit warnings, thinking their state got off.
931              
932             # and finally get rid of our fd so we can't use it anywhere else
933 1         3 $self->{fd} = undef;
934             }
935              
936             =head2 C<< $obj->sock() >>
937              
938             Returns the underlying IO::Handle for the object.
939              
940             =cut
941             sub sock {
942 0     0 1 0 my Danga::Socket $self = shift;
943 0         0 return $self->{sock};
944             }
945              
946             =head2 C<< $obj->set_writer_func( CODEREF ) >>
947              
948             Sets a function to use instead of C when writing data to the socket.
949              
950             =cut
951             sub set_writer_func {
952 0     0 1 0 my Danga::Socket $self = shift;
953 0         0 my $wtr = shift;
954 0 0 0     0 Carp::croak("Not a subref") unless !defined $wtr || UNIVERSAL::isa($wtr, "CODE");
955 0         0 $self->{writer_func} = $wtr;
956             }
957              
958             =head2 C<< $obj->write( $data ) >>
959              
960             Write the specified data to the underlying handle. I may be scalar,
961             scalar ref, code ref (to run when there), or undef just to kick-start.
962             Returns 1 if writes all went through, or 0 if there are writes in queue. If
963             it returns 1, caller should stop waiting for 'writable' events)
964              
965             =cut
966             sub write {
967 2     2 1 602 my Danga::Socket $self;
968             my $data;
969 2         6 ($self, $data) = @_;
970              
971             # nobody should be writing to closed sockets, but caller code can
972             # do two writes within an event, have the first fail and
973             # disconnect the other side (whose destructor then closes the
974             # calling object, but it's still in a method), and then the
975             # now-dead object does its second write. that is this case. we
976             # just lie and say it worked. it'll be dead soon and won't be
977             # hurt by this lie.
978 2 50       10 return 1 if $self->{closed};
979              
980 2         5 my $bref;
981              
982             # just queue data if there's already a wait
983             my $need_queue;
984              
985 2 50       4 if (defined $data) {
986 2 50       8 $bref = ref $data ? $data : \$data;
987 2 50       5 if ($self->{write_buf_size}) {
988 0         0 push @{$self->{write_buf}}, $bref;
  0         0  
989 0 0       0 $self->{write_buf_size} += ref $bref eq "SCALAR" ? length($$bref) : 1;
990 0         0 return 0;
991             }
992              
993             # this flag says we're bypassing the queue system, knowing we're the
994             # only outstanding write, and hoping we don't ever need to use it.
995             # if so later, though, we'll need to queue
996 2         4 $need_queue = 1;
997             }
998              
999             WRITE:
1000 2         8 while (1) {
1001 2 50 33     10 return 1 unless $bref ||= $self->{write_buf}[0];
1002              
1003 2         3 my $len;
1004 2         4 eval {
1005 2         4 $len = length($$bref); # this will die if $bref is a code ref, caught below
1006             };
1007 2 50       6 if ($@) {
1008 0 0       0 if (UNIVERSAL::isa($bref, "CODE")) {
1009 0 0       0 unless ($need_queue) {
1010 0         0 $self->{write_buf_size}--; # code refs are worth 1
1011 0         0 shift @{$self->{write_buf}};
  0         0  
1012             }
1013 0         0 $bref->();
1014              
1015             # code refs are just run and never get reenqueued
1016             # (they're one-shot), so turn off the flag indicating the
1017             # outstanding data needs queueing.
1018 0         0 $need_queue = 0;
1019              
1020 0         0 undef $bref;
1021 0         0 next WRITE;
1022             }
1023 0         0 die "Write error: $@ <$bref>";
1024             }
1025              
1026 2         5 my $to_write = $len - $self->{write_buf_offset};
1027 2         3 my $written;
1028 2 50       5 if (my $wtr = $self->{writer_func}) {
1029 0         0 $written = $wtr->($bref, $to_write, $self->{write_buf_offset});
1030             } else {
1031 2         84 $written = syswrite($self->{sock}, $$bref, $to_write, $self->{write_buf_offset});
1032             }
1033              
1034 2 50       15 if (! defined $written) {
    50          
    50          
1035 0 0       0 if ($! == EPIPE) {
    0          
    0          
1036 0         0 return $self->close("EPIPE");
1037             } elsif ($! == EAGAIN) {
1038             # since connection has stuff to write, it should now be
1039             # interested in pending writes:
1040 0 0       0 if ($need_queue) {
1041 0         0 push @{$self->{write_buf}}, $bref;
  0         0  
1042 0         0 $self->{write_buf_size} += $len;
1043             }
1044 0 0       0 $self->{write_set_watch} = 1 unless $self->{event_watch} & POLLOUT;
1045 0         0 $self->watch_write(1);
1046 0         0 return 0;
1047             } elsif ($! == ECONNRESET) {
1048 0         0 return $self->close("ECONNRESET");
1049             }
1050              
1051 0         0 DebugLevel >= 1 && $self->debugmsg("Closing connection ($self) due to write error: $!\n");
1052              
1053 0         0 return $self->close("write_error");
1054             } elsif ($written != $to_write) {
1055             DebugLevel >= 2 && $self->debugmsg("Wrote PARTIAL %d bytes to %d",
1056 0         0 $written, $self->{fd});
1057 0 0       0 if ($need_queue) {
1058 0         0 push @{$self->{write_buf}}, $bref;
  0         0  
1059 0         0 $self->{write_buf_size} += $len;
1060             }
1061             # since connection has stuff to write, it should now be
1062             # interested in pending writes:
1063 0         0 $self->{write_buf_offset} += $written;
1064 0         0 $self->{write_buf_size} -= $written;
1065 0         0 $self->on_incomplete_write;
1066 0         0 return 0;
1067             } elsif ($written == $to_write) {
1068             DebugLevel >= 2 && $self->debugmsg("Wrote ALL %d bytes to %d (nq=%d)",
1069 2         4 $written, $self->{fd}, $need_queue);
1070 2         5 $self->{write_buf_offset} = 0;
1071              
1072 2 50       5 if ($self->{write_set_watch}) {
1073 0         0 $self->watch_write(0);
1074 0         0 $self->{write_set_watch} = 0;
1075             }
1076              
1077             # this was our only write, so we can return immediately
1078             # since we avoided incrementing the buffer size or
1079             # putting it in the buffer. we also know there
1080             # can't be anything else to write.
1081 2 50       9 return 1 if $need_queue;
1082              
1083 0         0 $self->{write_buf_size} -= $written;
1084 0         0 shift @{$self->{write_buf}};
  0         0  
1085 0         0 undef $bref;
1086 0         0 next WRITE;
1087             }
1088             }
1089             }
1090              
1091             sub on_incomplete_write {
1092 0     0 0 0 my Danga::Socket $self = shift;
1093 0 0       0 $self->{write_set_watch} = 1 unless $self->{event_watch} & POLLOUT;
1094 0         0 $self->watch_write(1);
1095             }
1096              
1097             =head2 C<< $obj->push_back_read( $buf ) >>
1098              
1099             Push back I (a scalar or scalarref) into the read stream. Useful if you read
1100             more than you need to and want to return this data on the next "read".
1101              
1102             =cut
1103             sub push_back_read {
1104 4     4 1 868 my Danga::Socket $self = shift;
1105 4         6 my $buf = shift;
1106 4 50       6 push @{$self->{read_push_back}}, ref $buf ? $buf : \$buf;
  4         11  
1107 4         13 $PushBackSet{$self->{fd}} = $self;
1108             }
1109              
1110             =head2 C<< $obj->read( $bytecount ) >>
1111              
1112             Read at most I bytes from the underlying handle; returns scalar ref
1113             on read, or undef on connection closed. If you call read more than once and no
1114             more data available after the first call, a scalar ref to an empty string is
1115             returned.
1116              
1117             =cut
1118             sub read {
1119 10     10 1 100 my Danga::Socket $self = shift;
1120 10 50       18 return if $self->{closed};
1121 10         12 my $bytes = shift;
1122 10         8 my $buf;
1123 10         14 my $sock = $self->{sock};
1124              
1125 10 100       9 if (@{$self->{read_push_back}}) {
  10         22  
1126 6         6 $buf = shift @{$self->{read_push_back}};
  6         8  
1127 6         8 my $len = length($$buf);
1128              
1129 6 100       13 if ($len <= $bytes) {
1130 4 50       5 delete $PushBackSet{$self->{fd}} unless @{$self->{read_push_back}};
  4         12  
1131 4         7 return $buf;
1132             } else {
1133             # if the pushed back read is too big, we have to split it
1134 2         4 my $overflow = substr($$buf, $bytes);
1135 2         4 $buf = substr($$buf, 0, $bytes);
1136 2         4 unshift @{$self->{read_push_back}}, \$overflow;
  2         5  
1137 2         5 return \$buf;
1138             }
1139             }
1140              
1141             # if this is too high, perl quits(!!). reports on mailing lists
1142             # don't seem to point to a universal answer. 5MB worked for some,
1143             # crashed for others. 1MB works for more people. let's go with 1MB
1144             # for now. :/
1145 4 50       8 my $req_bytes = $bytes > 1048576 ? 1048576 : $bytes;
1146              
1147 4         42 my $res = sysread($sock, $buf, $req_bytes, 0);
1148 4         7 DebugLevel >= 2 && $self->debugmsg("sysread = %d; \$! = %d", $res, $!);
1149              
1150 4 50 33     9 if (! $res && $! != EWOULDBLOCK) {
1151             # catches 0=conn closed or undef=error
1152 0         0 DebugLevel >= 2 && $self->debugmsg("Fd \#%d read hit the end of the road.", $self->{fd});
1153 0         0 return undef;
1154             }
1155              
1156 4         11 return \$buf;
1157             }
1158              
1159             =head2 (VIRTUAL) C<< $obj->event_read() >>
1160              
1161             Readable event handler. Concrete deriviatives of Danga::Socket should
1162             provide an implementation of this. The default implementation will die if
1163             called.
1164              
1165             =cut
1166 0     0 1 0 sub event_read { die "Base class event_read called for $_[0]\n"; }
1167              
1168             =head2 (VIRTUAL) C<< $obj->event_err() >>
1169              
1170             Error event handler. Concrete deriviatives of Danga::Socket should
1171             provide an implementation of this. The default implementation will die if
1172             called.
1173              
1174             =cut
1175 0     0 1 0 sub event_err { die "Base class event_err called for $_[0]\n"; }
1176              
1177             =head2 (VIRTUAL) C<< $obj->event_hup() >>
1178              
1179             'Hangup' event handler. Concrete deriviatives of Danga::Socket should
1180             provide an implementation of this. The default implementation will die if
1181             called.
1182              
1183             =cut
1184 0     0 1 0 sub event_hup { die "Base class event_hup called for $_[0]\n"; }
1185              
1186             =head2 C<< $obj->event_write() >>
1187              
1188             Writable event handler. Concrete deriviatives of Danga::Socket may wish to
1189             provide an implementation of this. The default implementation calls
1190             C with an C.
1191              
1192             =cut
1193             sub event_write {
1194 0     0 1 0 my $self = shift;
1195 0         0 $self->write(undef);
1196             }
1197              
1198             =head2 C<< $obj->watch_read( $boolean ) >>
1199              
1200             Turn 'readable' event notification on or off.
1201              
1202             =cut
1203             sub watch_read {
1204 8     8 1 464 my Danga::Socket $self = shift;
1205 8 50 33     45 return if $self->{closed} || !$self->{sock};
1206              
1207 8         13 my $val = shift;
1208 8         16 my $event = $self->{event_watch};
1209              
1210 8 100       18 $event &= ~POLLIN if ! $val;
1211 8 100       21 $event |= POLLIN if $val;
1212              
1213             # If it changed, set it
1214 8 50       21 if ($event != $self->{event_watch}) {
1215 8 50       24 if ($HaveKQueue) {
    50          
1216 0 0       0 $KQueue->EV_SET($self->{fd}, IO::KQueue::EVFILT_READ(),
1217             $val ? IO::KQueue::EV_ENABLE() : IO::KQueue::EV_DISABLE());
1218             }
1219             elsif ($HaveEpoll) {
1220 8 50       21 epoll_ctl($Epoll, EPOLL_CTL_MOD, $self->{fd}, $event)
1221             and $self->dump_error("couldn't modify epoll settings for $self->{fd} " .
1222             "from $self->{event_watch} -> $event: $! (" . ($!+0) . ")");
1223             }
1224 8         109 $self->{event_watch} = $event;
1225             }
1226             }
1227              
1228             =head2 C<< $obj->watch_write( $boolean ) >>
1229              
1230             Turn 'writable' event notification on or off.
1231              
1232             =cut
1233             sub watch_write {
1234 6     6 1 41 my Danga::Socket $self = shift;
1235 6 50 33     32 return if $self->{closed} || !$self->{sock};
1236              
1237 6         8 my $val = shift;
1238 6         11 my $event = $self->{event_watch};
1239              
1240 6 100       14 $event &= ~POLLOUT if ! $val;
1241 6 100       19 $event |= POLLOUT if $val;
1242              
1243 6 100 66     33 if ($val && caller ne __PACKAGE__) {
1244             # A subclass registered interest, it's now responsible for this.
1245 4         10 $self->{write_set_watch} = 0;
1246             }
1247              
1248             # If it changed, set it
1249 6 50       26 if ($event != $self->{event_watch}) {
1250 6 50       21 if ($HaveKQueue) {
    50          
1251 0 0       0 $KQueue->EV_SET($self->{fd}, IO::KQueue::EVFILT_WRITE(),
1252             $val ? IO::KQueue::EV_ENABLE() : IO::KQueue::EV_DISABLE());
1253             }
1254             elsif ($HaveEpoll) {
1255 6 50       19 epoll_ctl($Epoll, EPOLL_CTL_MOD, $self->{fd}, $event)
1256             and $self->dump_error("couldn't modify epoll settings for $self->{fd} " .
1257             "from $self->{event_watch} -> $event: $! (" . ($!+0) . ")");
1258             }
1259 6         90 $self->{event_watch} = $event;
1260             }
1261             }
1262              
1263             =head2 C<< $obj->dump_error( $message ) >>
1264              
1265             Prints to STDERR a backtrace with information about this socket and what lead
1266             up to the dump_error call.
1267              
1268             =cut
1269             sub dump_error {
1270 0     0 1 0 my $i = 0;
1271 0         0 my @list;
1272 0         0 while (my ($file, $line, $sub) = (caller($i++))[1..3]) {
1273 0         0 push @list, "\t$file:$line called $sub\n";
1274             }
1275              
1276 0         0 warn "ERROR: $_[1]\n" .
1277             "\t$_[0] = " . $_[0]->as_string . "\n" .
1278             join('', @list);
1279             }
1280              
1281             =head2 C<< $obj->debugmsg( $format, @args ) >>
1282              
1283             Print the debugging message specified by the C-style I and
1284             I.
1285              
1286             =cut
1287             sub debugmsg {
1288 0     0 1 0 my ( $self, $fmt, @args ) = @_;
1289 0 0       0 confess "Not an object" unless ref $self;
1290              
1291 0         0 chomp $fmt;
1292 0         0 printf STDERR ">>> $fmt\n", @args;
1293             }
1294              
1295              
1296             =head2 C<< $obj->peer_ip_string() >>
1297              
1298             Returns the string describing the peer's IP
1299              
1300             =cut
1301             sub peer_ip_string {
1302 2     2 1 4 my Danga::Socket $self = shift;
1303 2 50       6 return _undef("peer_ip_string undef: no sock") unless $self->{sock};
1304 2 50       6 return $self->{peer_ip} if defined $self->{peer_ip};
1305              
1306 2         20 my $pn = getpeername($self->{sock});
1307 2 50       7 return _undef("peer_ip_string undef: getpeername") unless $pn;
1308              
1309 2         4 my ($port, $iaddr) = eval {
1310 2 50       4 if (length($pn) >= 28) {
1311 0         0 return Socket6::unpack_sockaddr_in6($pn);
1312             } else {
1313 2         5 return Socket::sockaddr_in($pn);
1314             }
1315             };
1316              
1317 2 50       20 if ($@) {
1318 0         0 $self->{peer_port} = "[Unknown peerport '$@']";
1319 0         0 return "[Unknown peername '$@']";
1320             }
1321              
1322 2         11 $self->{peer_port} = $port;
1323              
1324 2 50       8 if (length($iaddr) == 4) {
1325 2         22 return $self->{peer_ip} = Socket::inet_ntoa($iaddr);
1326             } else {
1327 0         0 $self->{peer_v6} = 1;
1328 0         0 return $self->{peer_ip} = Socket6::inet_ntop(Socket6::AF_INET6(),
1329             $iaddr);
1330             }
1331             }
1332              
1333             =head2 C<< $obj->peer_addr_string() >>
1334              
1335             Returns the string describing the peer for the socket which underlies this
1336             object in form "ip:port"
1337              
1338             =cut
1339             sub peer_addr_string {
1340 2     2 1 15 my Danga::Socket $self = shift;
1341 2 50       9 my $ip = $self->peer_ip_string
1342             or return undef;
1343             return $self->{peer_v6} ?
1344 2 50       13 "[$ip]:$self->{peer_port}" :
1345             "$ip:$self->{peer_port}";
1346             }
1347              
1348             =head2 C<< $obj->local_ip_string() >>
1349              
1350             Returns the string describing the local IP
1351              
1352             =cut
1353             sub local_ip_string {
1354 2     2 1 4 my Danga::Socket $self = shift;
1355 2 50       5 return _undef("local_ip_string undef: no sock") unless $self->{sock};
1356 2 50       5 return $self->{local_ip} if defined $self->{local_ip};
1357              
1358 2         17 my $pn = getsockname($self->{sock});
1359 2 50       6 return _undef("local_ip_string undef: getsockname") unless $pn;
1360              
1361 2         4 my ($port, $iaddr) = Socket::sockaddr_in($pn);
1362 2         15 $self->{local_port} = $port;
1363              
1364 2         8 return $self->{local_ip} = Socket::inet_ntoa($iaddr);
1365             }
1366              
1367             =head2 C<< $obj->local_addr_string() >>
1368              
1369             Returns the string describing the local end of the socket which underlies this
1370             object in form "ip:port"
1371              
1372             =cut
1373             sub local_addr_string {
1374 2     2 1 13 my Danga::Socket $self = shift;
1375 2         8 my $ip = $self->local_ip_string;
1376 2 50       10 return $ip ? "$ip:$self->{local_port}" : undef;
1377             }
1378              
1379              
1380             =head2 C<< $obj->as_string() >>
1381              
1382             Returns a string describing this socket.
1383              
1384             =cut
1385             sub as_string {
1386 0     0 1   my Danga::Socket $self = shift;
1387             my $rw = "(" . ($self->{event_watch} & POLLIN ? 'R' : '') .
1388 0 0         ($self->{event_watch} & POLLOUT ? 'W' : '') . ")";
    0          
1389 0 0         my $ret = ref($self) . "$rw: " . ($self->{closed} ? "closed" : "open");
1390 0           my $peer = $self->peer_addr_string;
1391 0 0         if ($peer) {
1392 0           $ret .= " to " . $self->peer_addr_string;
1393             }
1394 0           return $ret;
1395             }
1396              
1397             sub _undef {
1398 0 0   0     return undef unless $ENV{DS_DEBUG};
1399 0   0       my $msg = shift || "";
1400 0           warn "Danga::Socket: $msg\n";
1401 0           return undef;
1402             }
1403              
1404             package Danga::Socket::Timer;
1405             # [$abs_float_firetime, $coderef];
1406             sub cancel {
1407 0     0     $_[0][1] = undef;
1408             }
1409              
1410             =head1 AUTHORS
1411              
1412             Brad Fitzpatrick - author
1413              
1414             Michael Granger - docs, testing
1415              
1416             Mark Smith - contributor, heavy user, testing
1417              
1418             Matt Sergeant - kqueue support, docs, timers, other bits
1419              
1420             =head1 BUGS
1421              
1422             Not documented enough (but isn't that true of every project?).
1423              
1424             tcp_cork only works on Linux for now. No BSD push/nopush support.
1425              
1426             =head1 LICENSE
1427              
1428             License is granted to use and distribute this module under the same
1429             terms as Perl itself.
1430              
1431             =cut
1432              
1433             1;
1434              
1435             # Local Variables:
1436             # mode: perl
1437             # c-basic-indent: 4
1438             # indent-tabs-mode: nil
1439             # End: