File Coverage

blib/lib/Danga/Socket.pm
Criterion Covered Total %
statement 323 527 61.2
branch 125 292 42.8
condition 38 119 31.9
subroutine 46 69 66.6
pod 36 43 83.7
total 568 1050 54.1


line stmt bran cond sub pod time code
1             ###########################################################################
2              
3             =head1 NAME
4              
5             Danga::Socket - Event loop and event-driven async socket base class
6              
7             =head1 SYNOPSIS
8              
9             package My::Socket;
10             use Danga::Socket;
11             use base ('Danga::Socket');
12             use fields ('my_attribute');
13              
14             sub new {
15             my My::Socket $self = shift;
16             $self = fields::new($self) unless ref $self;
17             $self->SUPER::new( @_ );
18              
19             $self->{my_attribute} = 1234;
20             return $self;
21             }
22              
23             sub event_err { ... }
24             sub event_hup { ... }
25             sub event_write { ... }
26             sub event_read { ... }
27             sub close { ... }
28              
29             $my_sock->tcp_cork($bool);
30              
31             # write returns 1 if all writes have gone through, or 0 if there
32             # are writes in queue
33             $my_sock->write($scalar);
34             $my_sock->write($scalarref);
35             $my_sock->write(sub { ... }); # run when previous data written
36             $my_sock->write(undef); # kick-starts
37              
38             # read max $bytecount bytes, or undef on connection closed
39             $scalar_ref = $my_sock->read($bytecount);
40              
41             # watch for writability. not needed with ->write(). write()
42             # will automatically turn on watch_write when you wrote too much
43             # and turn it off when done
44             $my_sock->watch_write($bool);
45              
46             # watch for readability
47             $my_sock->watch_read($bool);
48              
49             # if you read too much and want to push some back on
50             # readable queue. (not incredibly well-tested)
51             $my_sock->push_back_read($buf); # scalar or scalar ref
52              
53             Danga::Socket->AddOtherFds(..);
54             Danga::Socket->SetLoopTimeout($millisecs);
55             Danga::Socket->DescriptorMap();
56             Danga::Socket->WatchedSockets(); # count of DescriptorMap keys
57             Danga::Socket->SetPostLoopCallback($code);
58             Danga::Socket->EventLoop();
59              
60             =head1 DESCRIPTION
61              
62             This is an abstract base class for objects backed by a socket which
63             provides the basic framework for event-driven asynchronous IO,
64             designed to be fast. Danga::Socket is both a base class for objects,
65             and an event loop.
66              
67             Callers subclass Danga::Socket. Danga::Socket's constructor registers
68             itself with the Danga::Socket event loop, and invokes callbacks on the
69             object for readability, writability, errors, and other conditions.
70              
71             Because Danga::Socket uses the "fields" module, your subclasses must
72             too.
73              
74             =head1 MORE INFO
75              
76             For now, see servers using Danga::Socket for guidance. For example:
77             perlbal, mogilefsd, or ddlockd.
78              
79             =head1 API
80              
81             Note where "C<CLASS>" is used below, normally you would call these methods as:
82              
83             Danga::Socket->method(...);
84              
85             However using a subclass works too.
86              
87             The CLASS methods are all methods for the event loop part of Danga::Socket,
88             whereas the object methods are all used on your subclasses.
89              
90             =cut
91              
92             ###########################################################################
93              
94             package Danga::Socket;
95 4     4   88187 use strict;
  4         11  
  4         178  
96 4     4   3628 use bytes;
  4         34  
  4         27  
97 4     4   12647 use POSIX ();
  4         43506  
  4         103  
98 4     4   4561 use Time::HiRes ();
  4         19996  
  4         200  
99              
100 4     4   2002 my $opt_bsd_resource = eval "use BSD::Resource; 1;";
  0         0  
  0         0  
101              
102 4     4   40 use vars qw{$VERSION};
  4         9  
  4         4186  
103             $VERSION = "1.61";
104              
105 4     4   30 use warnings;
  4         8  
  4         380  
106 4     4   23 no warnings qw(deprecated);
  4         8  
  4         161  
107              
108 4     4   5957 use Sys::Syscall qw(:epoll);
  4         22454  
  4         1666  
109              
110 4         35 use fields ('sock', # underlying socket
111             'fd', # numeric file descriptor
112             'write_buf', # arrayref of scalars, scalarrefs, or coderefs to write
113             'write_buf_offset', # offset into first array of write_buf to start writing at
114             'write_buf_size', # total length of data in all write_buf items
115             'write_set_watch', # bool: true if we internally set watch_write rather than by a subclass
116             'read_push_back', # arrayref of "pushed-back" read data the application didn't want
117             'closed', # bool: socket is closed
118             'corked', # bool: socket is corked
119             'event_watch', # bitmask of events the client is interested in (POLLIN,OUT,etc.)
120             'peer_v6', # bool: cached; if peer is an IPv6 address
121             'peer_ip', # cached stringified IP address of $sock
122             'peer_port', # cached port number of $sock
123             'local_ip', # cached stringified IP address of local end of $sock
124             'local_port', # cached port number of local end of $sock
125             'writer_func', # subref which does writing. must return bytes written (or undef) and set $! on errors
126 4     4   4940 );
  4         7856  
127              
128 4         812 use Errno qw(EINPROGRESS EWOULDBLOCK EISCONN ENOTSOCK
129 4     4   4912 EPIPE EAGAIN EBADF ECONNRESET ENOPROTOOPT);
  4         6697  
130 4     4   4791 use Socket qw(IPPROTO_TCP);
  4         19972  
  4         1118  
131 4     4   48 use Carp qw(croak confess);
  4         6  
  4         264  
132              
133 4 50   4   22 use constant TCP_CORK => ($^O eq "linux" ? 3 : 0); # FIXME: not hard-coded (Linux-specific too)
  4         8  
  4         259  
134 4     4   19 use constant DebugLevel => 0;
  4         7  
  4         148  
135              
136 4     4   19 use constant POLLIN => 1;
  4         6  
  4         144  
137 4     4   20 use constant POLLOUT => 4;
  4         8  
  4         288  
138 4     4   20 use constant POLLERR => 8;
  4         6  
  4         162  
139 4     4   20 use constant POLLHUP => 16;
  4         9  
  4         147  
140 4     4   19 use constant POLLNVAL => 32;
  4         6  
  4         61941  
141              
142             our $HAVE_KQUEUE = eval { require IO::KQueue; 1 };
143              
144             our (
145             $HaveEpoll, # Flag -- is epoll available? initially undefined.
146             $HaveKQueue,
147             %DescriptorMap, # fd (num) -> Danga::Socket object
148             %PushBackSet, # fd (num) -> Danga::Socket (fds with pushed back read data)
149             $Epoll, # Global epoll fd (for epoll mode only)
150             $KQueue, # Global kqueue fd (for kqueue mode only)
151             @ToClose, # sockets to close when event loop is done
152             %OtherFds, # A hash of "other" (non-Danga::Socket) file
153             # descriptors for the event loop to track.
154              
155             $PostLoopCallback, # subref to call at the end of each loop, if defined (global)
156             %PLCMap, # fd (num) -> PostLoopCallback (per-object)
157              
158             $LoopTimeout, # timeout of event loop in milliseconds
159             $DoProfile, # if on, enable profiling
160             %Profiling, # what => [ utime, stime, calls ]
161             $DoneInit, # if we've done the one-time module init yet
162             @Timers, # timers
163             );
164              
165             Reset();
166              
167             #####################################################################
168             ### C L A S S M E T H O D S
169             #####################################################################
170              
171             =head2 C<< CLASS->Reset() >>
172              
173             Reset all state
174              
175             =cut
176             sub Reset {
177 6     6 1 3157 %DescriptorMap = ();
178 6         17 %PushBackSet = ();
179 6         15 @ToClose = ();
180 6         14 %OtherFds = ();
181 6         15 $LoopTimeout = -1; # no timeout by default
182 6         9 $DoProfile = 0;
183 6         44 %Profiling = ();
184 6         14 @Timers = ();
185              
186 6         15 $PostLoopCallback = undef;
187 6         26 %PLCMap = ();
188 6         13 $DoneInit = 0;
189              
190 6 100 66     91 POSIX::close($Epoll) if defined $Epoll && $Epoll >= 0;
191 6 50 33     35 POSIX::close($KQueue) if defined $KQueue && $KQueue >= 0;
192            
193 6         43 *EventLoop = *FirstTimeEventLoop;
194             }
195              
196             =head2 C<< CLASS->HaveEpoll() >>
197              
198             Returns a true value if this class will use epoll (via Sys::Syscall) for async IO.
199              
200             =cut
201             sub HaveEpoll {
202 1     1 1 45 _InitPoller();
203 1         8 return $HaveEpoll;
204             }
205              
206             =head2 C<< CLASS->WatchedSockets() >>
207              
208             Returns the number of file descriptors which are registered with the global
209             poll object.
210              
211             =cut
212             sub WatchedSockets {
213 4     4 1 4071 return scalar keys %DescriptorMap;
214             }
215             *watched_sockets = *WatchedSockets;
216              
217             =head2 C<< CLASS->EnableProfiling() >>
218              
219             Turns profiling on, clearing current profiling data.
220              
221             =cut
222             sub EnableProfiling {
223 0 0   0 1 0 if ($opt_bsd_resource) {
224 0         0 %Profiling = ();
225 0         0 $DoProfile = 1;
226 0         0 return 1;
227             }
228 0         0 return 0;
229             }
230              
231             =head2 C<< CLASS->DisableProfiling() >>
232              
233             Turns off profiling, but retains data up to this point
234              
235             =cut
236             sub DisableProfiling {
237 0     0 1 0 $DoProfile = 0;
238             }
239              
240             =head2 C<< CLASS->ProfilingData() >>
241              
242             Returns reference to a hash of data in format:
243              
244             ITEM => [ utime, stime, #calls ]
245              
246             =cut
247             sub ProfilingData {
248 0     0 1 0 return \%Profiling;
249             }
250              
251             =head2 C<< CLASS->ToClose() >>
252              
253             Return the list of sockets that are awaiting close() at the end of the
254             current event loop.
255              
256             =cut
257 0     0 1 0 sub ToClose { return @ToClose; }
258              
259             =head2 C<< CLASS->OtherFds( [%fdmap] ) >>
260              
261             Get/set the hash of file descriptors that need processing in parallel with
262             the registered Danga::Socket objects.
263              
264             =cut
265             sub OtherFds {
266 0     0 1 0 my $class = shift;
267 0 0       0 if ( @_ ) { %OtherFds = @_ }
  0         0  
268 0 0       0 return wantarray ? %OtherFds : \%OtherFds;
269             }
270              
271             =head2 C<< CLASS->AddOtherFds( [%fdmap] ) >>
272              
273             Add fds to the OtherFds hash for processing.
274              
275             =cut
276             sub AddOtherFds {
277 0     0 1 0 my $class = shift;
278 0         0 %OtherFds = ( %OtherFds, @_ ); # FIXME investigate what happens on dupe fds
279 0 0       0 return wantarray ? %OtherFds : \%OtherFds;
280             }
281              
282             =head2 C<< CLASS->SetLoopTimeout( $timeout ) >>
283              
284             Set the loop timeout for the event loop to some value in milliseconds.
285              
286             A timeout of -1 (the default) means wait for events indefinitely. A timeout of
287             0 (zero) means poll and return immediately.
288              
289             =cut
290             sub SetLoopTimeout {
291 3     3 1 31 return $LoopTimeout = $_[1] + 0;
292             }
293              
294             =head2 C<< CLASS->DebugMsg( $format, @args ) >>
295              
296             Print the debugging message specified by the C<printf>-style I<format> and
297             I<args>
298              
299             =cut
300             sub DebugMsg {
301 0     0 1 0 my ( $class, $fmt, @args ) = @_;
302 0         0 chomp $fmt;
303 0         0 printf STDERR ">>> $fmt\n", @args;
304             }
305              
306             =head2 C<< CLASS->AddTimer( $seconds, $coderef ) >>
307              
308             Add a timer to occur $seconds from now. $seconds may be fractional, but timers
309             are not guaranteed to fire at the exact time you ask for.
310              
311             Returns a timer object which you can call C<< $timer->cancel >> on if you need to.
312              
313             =cut
314             sub AddTimer {
315 6     6 1 1723 my $class = shift;
316 6         11 my ($secs, $coderef) = @_;
317              
318 6         19 my $fire_time = Time::HiRes::time() + $secs;
319              
320 6         20 my $timer = bless [$fire_time, $coderef], "Danga::Socket::Timer";
321              
322 6 100 100     41 if (!@Timers || $fire_time >= $Timers[-1][0]) {
323 2         4 push @Timers, $timer;
324 2         6 return $timer;
325             }
326              
327             # Now, where do we insert? (NOTE: this appears slow, algorithm-wise,
328             # but it was compared against calendar queues, heaps, naive push/sort,
329             # and a bunch of other versions, and found to be fastest with a large
330             # variety of datasets.)
331 4         12 for (my $i = 0; $i < @Timers; $i++) {
332 11 100       30 if ($Timers[$i][0] > $fire_time) {
333 4         7 splice(@Timers, $i, 0, $timer);
334 4         11 return $timer;
335             }
336             }
337              
338 0         0 die "Shouldn't get here.";
339             }
340              
341             =head2 C<< CLASS->DescriptorMap() >>
342              
343             Get the hash of Danga::Socket objects keyed by the file descriptor (fileno) they
344             are wrapping.
345              
346             Returns a hash in list context or a hashref in scalar context.
347              
348             =cut
349             sub DescriptorMap {
350 2 50   2 1 958 return wantarray ? %DescriptorMap : \%DescriptorMap;
351             }
352             *descriptor_map = *DescriptorMap;
353             *get_sock_ref = *DescriptorMap;
354              
355             sub _InitPoller
356             {
357 12 100   12   110 return if $DoneInit;
358 4         15 $DoneInit = 1;
359              
360 4 50       37 if ($HAVE_KQUEUE) {
    50          
361 0         0 $KQueue = IO::KQueue->new();
362 0         0 $HaveKQueue = $KQueue >= 0;
363 0 0       0 if ($HaveKQueue) {
364 0         0 *EventLoop = *KQueueEventLoop;
365             }
366             }
367             elsif (Sys::Syscall::epoll_defined()) {
368 4         30 $Epoll = eval { epoll_create(1024); };
  4         21  
369 4   33     141 $HaveEpoll = defined $Epoll && $Epoll >= 0;
370 4 50       13 if ($HaveEpoll) {
371 4         111 *EventLoop = *EpollEventLoop;
372             }
373             }
374              
375 4 50 33     36 if (!$HaveEpoll && !$HaveKQueue) {
376 0         0 require IO::Poll;
377 0         0 *EventLoop = *PollEventLoop;
378             }
379             }
380              
381             =head2 C<< CLASS->EventLoop() >>
382              
383             Start processing IO events. In most daemon programs this never exits. See
384             C<SetPostLoopCallback> below for how to exit the loop.
385              
386             =cut
387             sub FirstTimeEventLoop {
388 1     1 0 7 my $class = shift;
389              
390 1         6 _InitPoller();
391              
392 1 50       3 if ($HaveEpoll) {
    0          
393 1         4 EpollEventLoop($class);
394             } elsif ($HaveKQueue) {
395 0         0 KQueueEventLoop($class);
396             } else {
397 0         0 PollEventLoop($class);
398             }
399             }
400              
401             ## profiling-related data/functions
402             our ($Prof_utime0, $Prof_stime0);
403             sub _pre_profile {
404 0     0   0 ($Prof_utime0, $Prof_stime0) = getrusage();
405             }
406              
407             sub _post_profile {
408             # get post information
409 0     0   0 my ($autime, $astime) = getrusage();
410              
411             # calculate differences
412 0         0 my $utime = $autime - $Prof_utime0;
413 0         0 my $stime = $astime - $Prof_stime0;
414              
415 0         0 foreach my $k (@_) {
416 0   0     0 $Profiling{$k} ||= [ 0.0, 0.0, 0 ];
417 0         0 $Profiling{$k}->[0] += $utime;
418 0         0 $Profiling{$k}->[1] += $stime;
419 0         0 $Profiling{$k}->[2]++;
420             }
421             }
422              
423             # runs timers and returns milliseconds for next one, or next event loop
424             sub RunTimers {
425 49 100   49 0 585 return $LoopTimeout unless @Timers;
426              
427 21         120 my $now = Time::HiRes::time();
428              
429             # Run expired timers
430 21   100     218 while (@Timers && $Timers[0][0] <= $now) {
431 6         12 my $to_run = shift(@Timers);
432 6 50       40 $to_run->[1]->($now) if $to_run->[1];
433             }
434              
435 21 100       11425 return $LoopTimeout unless @Timers;
436              
437             # convert time to a whole number of milliseconds, adding 1
438             # extra, otherwise floating point fun can occur and we'll
439             # call RunTimers like 20-30 times, each returning a timeout
440             # of 0.0000212 seconds
441 20         107 my $timeout = int(($Timers[0][0] - $now) * 1000) + 1;
442              
443             # -1 is an infinite timeout, so prefer a real timeout
444 20 50       57 return $timeout if $LoopTimeout == -1;
445              
446             # otherwise pick the lower of our regular timeout and time until
447             # the next timer
448 20 100       79 return $LoopTimeout if $LoopTimeout < $timeout;
449 5         17 return $timeout;
450             }
451              
452             ### The epoll-based event loop. Gets installed as EventLoop if epoll is
453             ### available (see _InitPoller).
454             sub EpollEventLoop {
455 5     5 0 46 my $class = shift;
456              
457 5         21 foreach my $fd ( keys %OtherFds ) {
458 0 0       0 if (epoll_ctl($Epoll, EPOLL_CTL_ADD, $fd, EPOLLIN) == -1) {
459 0         0 warn "epoll_ctl(): failure adding fd=$fd; $! (", $!+0, ")\n";
460             }
461             }
462              
463 5         10 while (1) {
464 43         80 my @events;
465             my $i;
466 43         139 my $timeout = RunTimers();
467              
468             # get up to 1000 events
469 43         211 my $evcount = epoll_wait($Epoll, 1000, $timeout, \@events);
470             EVENT:
471 43         6941361 for ($i=0; $i<$evcount; $i++) {
472 16         25 my $ev = $events[$i];
473              
474             # it's possible epoll_wait returned many events, and that handling the
475             # ones at the front unregistered interest in some at the end. if we
476             # can't find the %DescriptorMap entry, it's because we're no longer
477             # interested in that event.
478 16         41 my Danga::Socket $pob = $DescriptorMap{$ev->[0]};
479 16         25 my $code;
480 16         21 my $state = $ev->[1];
481              
482             # if we didn't find a Danga::Socket object for that fd, try other
483             # pseudo-registered (above) fds.
484 16 50       35 if (! $pob) {
485 0 0       0 if (my $code = $OtherFds{$ev->[0]}) {
486 0         0 $code->($state);
487             } else {
488 0         0 my $fd = $ev->[0];
489 0         0 warn "epoll() returned fd $fd w/ state $state for which we have no mapping. removing.\n";
490 0         0 POSIX::close($fd);
491 0         0 epoll_ctl($Epoll, EPOLL_CTL_DEL, $fd, 0);
492             }
493 0         0 next;
494             }
495              
496 16         15 DebugLevel >= 1 && $class->DebugMsg("Event: fd=%d (%s), state=%d \@ %s\n",
497             $ev->[0], ref($pob), $ev->[1], time);
498              
499 16 50       36 if ($DoProfile) {
500 0         0 my $class = ref $pob;
501              
502             # call profiling action on things that need to be done
503 0 0 0     0 if ($state & EPOLLIN && ! $pob->{closed}) {
504 0         0 _pre_profile();
505 0         0 $pob->event_read;
506 0         0 _post_profile("$class-read");
507             }
508              
509 0 0 0     0 if ($state & EPOLLOUT && ! $pob->{closed}) {
510 0         0 _pre_profile();
511 0         0 $pob->event_write;
512 0         0 _post_profile("$class-write");
513             }
514              
515 0 0       0 if ($state & (EPOLLERR|EPOLLHUP)) {
516 0 0 0     0 if ($state & EPOLLERR && ! $pob->{closed}) {
517 0         0 _pre_profile();
518 0         0 $pob->event_err;
519 0         0 _post_profile("$class-err");
520             }
521 0 0 0     0 if ($state & EPOLLHUP && ! $pob->{closed}) {
522 0         0 _pre_profile();
523 0         0 $pob->event_hup;
524 0         0 _post_profile("$class-hup");
525             }
526             }
527              
528 0         0 next;
529             }
530              
531             # standard non-profiling codepath
532 16 100 66     67 $pob->event_read if $state & EPOLLIN && ! $pob->{closed};
533 16 100 100     1593 $pob->event_write if $state & EPOLLOUT && ! $pob->{closed};
534 15 50       67 if ($state & (EPOLLERR|EPOLLHUP)) {
535 0 0 0     0 $pob->event_err if $state & EPOLLERR && ! $pob->{closed};
536 0 0 0     0 $pob->event_hup if $state & EPOLLHUP && ! $pob->{closed};
537             }
538             }
539 42 100       319 return unless PostEventLoop();
540             }
541 0         0 exit 0;
542             }
543              
544             ### The fallback IO::Poll-based event loop. Gets installed as EventLoop if
545             ### neither epoll nor kqueue is available.
546             sub PollEventLoop {
547 1     1 0 1507 my $class = shift;
548              
549 1         3 my Danga::Socket $pob;
550              
551 1         1 while (1) {
552 6         36 my $timeout = RunTimers();
553              
554             # the following sets up @poll as a series of ($fd, $event_mask)
555             # pairs, then uses IO::Poll::_poll, implemented in XS, which
556             # modifies the array in place, replacing each $event_mask with
557             # the mask of events that occurred on that fd.
558 6         13 my @poll;
559 6         111 foreach my $fd ( keys %OtherFds ) {
560 0         0 push @poll, $fd, POLLIN;
561             }
562 6         36 while ( my ($fd, $sock) = each %DescriptorMap ) {
563 10         53 push @poll, $fd, $sock->{event_watch};
564             }
565              
566             # if nothing to poll, either end immediately (if no timeout)
567             # or just keep calling the callback
568 6 100       20 unless (@poll) {
569 1         150410 select undef, undef, undef, ($timeout / 1000);
570 1 50       23 return unless PostEventLoop();
571 1         6 next;
572             }
573              
574 5         308270 my $count = IO::Poll::_poll($timeout, @poll);
575 5 100       31 unless ($count) {
576 2 50       17 return unless PostEventLoop();
577 2         9 next;
578             }
579              
580             # Fetch handles with read events
581 3         11 while (@poll) {
582 8         20 my ($fd, $state) = splice(@poll, 0, 2);
583 8 100       22 next unless $state;
584              
585 4         12 $pob = $DescriptorMap{$fd};
586              
587 4 50       16 if (!$pob) {
588 0 0       0 if (my $code = $OtherFds{$fd}) {
589 0         0 $code->($state);
590             }
591 0         0 next;
592             }
593              
594 4 100 66     36 $pob->event_read if $state & POLLIN && ! $pob->{closed};
595 4 100 66     2761 $pob->event_write if $state & POLLOUT && ! $pob->{closed};
596 4 50 33     14 $pob->event_err if $state & POLLERR && ! $pob->{closed};
597 4 50 33     23 $pob->event_hup if $state & POLLHUP && ! $pob->{closed};
598             }
599              
600 3 100       10 return unless PostEventLoop();
601             }
602              
603 0         0 exit 0;
604             }
605              
606             ### The kqueue-based event loop. Gets installed as EventLoop if IO::KQueue works
607             ### okay.
608             sub KQueueEventLoop {
609 0     0 0 0 my $class = shift;
610              
611 0         0 foreach my $fd (keys %OtherFds) {
612 0         0 $KQueue->EV_SET($fd, IO::KQueue::EVFILT_READ(), IO::KQueue::EV_ADD());
613             }
614              
615 0         0 while (1) {
616 0         0 my $timeout = RunTimers();
617 0         0 my @ret = $KQueue->kevent($timeout);
618              
619 0         0 foreach my $kev (@ret) {
620 0         0 my ($fd, $filter, $flags, $fflags) = @$kev;
621 0         0 my Danga::Socket $pob = $DescriptorMap{$fd};
622 0 0       0 if (!$pob) {
623 0 0       0 if (my $code = $OtherFds{$fd}) {
624 0         0 $code->($filter);
625             } else {
626 0         0 warn "kevent() returned fd $fd for which we have no mapping. removing.\n";
627 0         0 POSIX::close($fd); # close deletes the kevent entry
628             }
629 0         0 next;
630             }
631              
632 0         0 DebugLevel >= 1 && $class->DebugMsg("Event: fd=%d (%s), flags=%d \@ %s\n",
633             $fd, ref($pob), $flags, time);
634              
635 0 0 0     0 $pob->event_read if $filter == IO::KQueue::EVFILT_READ() && !$pob->{closed};
636 0 0 0     0 $pob->event_write if $filter == IO::KQueue::EVFILT_WRITE() && !$pob->{closed};
637 0 0 0     0 if ($flags == IO::KQueue::EV_EOF() && !$pob->{closed}) {
638 0 0       0 if ($fflags) {
639 0         0 $pob->event_err;
640             } else {
641 0         0 $pob->event_hup;
642             }
643             }
644             }
645 0 0       0 return unless PostEventLoop();
646             }
647              
648 0         0 exit(0);
649             }
650              
651             =head2 C<< CLASS->SetPostLoopCallback( CODEREF ) >>
652              
653             Sets post loop callback function. Pass a subref and it will be
654             called every time the event loop finishes.
655              
656             Return 1 (or any true value) from the sub to make the loop continue, 0 or false
657             and it will exit.
658              
659             The callback function will be passed two parameters: \%DescriptorMap, \%OtherFds.
660              
661             =cut
662             sub SetPostLoopCallback {
663 6     6 1 505 my ($class, $ref) = @_;
664              
665 6 100       24 if (ref $class) {
666             # per-object callback
667 2         4 my Danga::Socket $self = $class;
668 2 50 33     17 if (defined $ref && ref $ref eq 'CODE') {
669 2         11 $PLCMap{$self->{fd}} = $ref;
670             } else {
671 0         0 delete $PLCMap{$self->{fd}};
672             }
673             } else {
674             # global callback
675 4 50 33     39 $PostLoopCallback = (defined $ref && ref $ref eq 'CODE') ? $ref : undef;
676             }
677             }
678              
679             # Internal function: run the post-event callback, send read events
680             # for pushed-back data, and close pending connections. returns 1
681             # if event loop should continue, or 0 to shut it all down.
682             sub PostEventLoop {
683             # fire read events for objects with pushed-back read data
684 48     48 0 123 my $loop = 1;
685 48         288 while ($loop) {
686 54         2075 $loop = 0;
687 54         490 foreach my $fd (keys %PushBackSet) {
688 6         11 my Danga::Socket $pob = $PushBackSet{$fd};
689              
690             # a previous event_read invocation could've closed a
691             # connection that we already evaluated in "keys
692             # %PushBackSet", so skip ones that seem to have
693             # disappeared. this is expected.
694 6 50       18 next unless $pob;
695              
696 6 50       8 die "ASSERT: the $pob socket has no read_push_back" unless @{$pob->{read_push_back}};
  6         19  
697 6 50 33     146 next unless (! $pob->{closed} &&
698             $pob->{event_watch} & POLLIN);
699 6         8 $loop = 1;
700 6         19 $pob->event_read;
701             }
702             }
703              
704             # now we can close sockets that wanted to close during our event processing.
705             # (we didn't want to close them during the loop, as we didn't want fd numbers
706             # being reused and confused during the event loop)
707 48         358 while (my $sock = shift @ToClose) {
708 1         3 my $fd = fileno($sock);
709              
710             # close the socket. (not a Danga::Socket close)
711 1         14 $sock->close;
712              
713             # and now we can finally remove the fd from the map. see
714             # comment above in _cleanup.
715 1         119 delete $DescriptorMap{$fd};
716             }
717              
718              
719             # by default we keep running, unless a postloop callback (either per-object
720             # or global) cancels it
721 48         93 my $keep_running = 1;
722              
723             # per-object post-loop-callbacks
724 48         300 for my $plc (values %PLCMap) {
725 2   66     947 $keep_running &&= $plc->(\%DescriptorMap, \%OtherFds);
726             }
727              
728             # now we're at the very end, call callback if defined
729 48 100       733 if (defined $PostLoopCallback) {
730 38   100     1133 $keep_running &&= $PostLoopCallback->(\%DescriptorMap, \%OtherFds);
731             }
732              
733 48         4293 return $keep_running;
734             }
735              
736             #####################################################################
737             ### Danga::Socket-the-object code
738             #####################################################################
739              
740             =head2 OBJECT METHODS
741              
742             =head2 C<< CLASS->new( $socket ) >>
743              
744             Create a new Danga::Socket subclass object for the given I<socket> which will
745             react to events on it during the C<EventLoop>.
746              
747             This is normally (always?) called from your subclass via:
748              
749             $class->SUPER::new($socket);
750              
751             =cut
752             sub new {
753 10     10 1 19462 my Danga::Socket $self = shift;
754 10 100       75 $self = fields::new($self) unless ref $self;
755              
756 10         11671 my $sock = shift;
757              
758 10         29 $self->{sock} = $sock;
759 10         32 my $fd = fileno($sock);
760              
761 10 50 0     85 Carp::cluck("undef sock and/or fd in Danga::Socket->new. sock=" . ($sock || "") . ", fd=" . ($fd || ""))
      0        
      33        
762             unless $sock && $fd;
763              
764 10         33 $self->{fd} = $fd;
765 10         30 $self->{write_buf} = [];
766 10         23 $self->{write_buf_offset} = 0;
767 10         21 $self->{write_buf_size} = 0;
768 10         24 $self->{closed} = 0;
769 10         20 $self->{corked} = 0;
770 10         24 $self->{read_push_back} = [];
771              
772 10         20 $self->{event_watch} = POLLERR|POLLHUP|POLLNVAL;
773              
774 10         46 _InitPoller();
775              
776 10 50       60 if ($HaveEpoll) {
    0          
777 10 50       65 epoll_ctl($Epoll, EPOLL_CTL_ADD, $fd, $self->{event_watch})
778             and die "couldn't add epoll watch for $fd\n";
779             }
780             elsif ($HaveKQueue) {
781             # Add them to the queue but disabled for now
782 0         0 $KQueue->EV_SET($fd, IO::KQueue::EVFILT_READ(),
783             IO::KQueue::EV_ADD() | IO::KQueue::EV_DISABLE());
784 0         0 $KQueue->EV_SET($fd, IO::KQueue::EVFILT_WRITE(),
785             IO::KQueue::EV_ADD() | IO::KQueue::EV_DISABLE());
786             }
787              
788 10 50       308 Carp::cluck("Danga::Socket::new blowing away existing descriptor map for fd=$fd ($DescriptorMap{$fd})")
789             if $DescriptorMap{$fd};
790              
791 10         28 $DescriptorMap{$fd} = $self;
792 10         35 return $self;
793             }
794              
795              
796             #####################################################################
797             ### I N S T A N C E M E T H O D S
798             #####################################################################
799              
=head2 C<< $obj->tcp_cork( $boolean ) >>

Turn TCP_CORK on or off depending on the value of I<boolean>.

Does nothing when the object has no socket or when the requested state equals
the recorded one.  On platforms where the TCP_CORK constant is false the
request is simply recorded as successful.

=cut
sub tcp_cork {
    my Danga::Socket $self = shift;
    my $val = shift;

    # nothing to do without a socket, or when the state would not change
    return unless $self->{sock};
    return if $val == $self->{corked};

    # FIXME: implement freebsd *PUSH sockopts (the no-TCP_CORK case just
    # pretends the call succeeded)
    my $rv = TCP_CORK
        ? setsockopt($self->{sock}, IPPROTO_TCP, TCP_CORK,
                     pack("l", $val ? 1 : 0))
        : 1;

    if ($rv) {
        # success: remember the new state
        $self->{corked} = $val;
    }
    elsif ($! == EBADF || $! == ENOTSOCK) {
        # internal state is probably corrupted; warn and then close if
        # we're not closed already
        warn "setsockopt: $!";
        $self->close('tcp_cork_failed');
    }
    elsif ($! == ENOPROTOOPT || $!{ENOTSOCK} || $!{EOPNOTSUPP}) {
        # TCP implementation doesn't support corking, so just ignore it
        # or we're trying to tcp-cork a non-socket (like a socketpair pipe
        # which is acting like a socket, which Perlbal does for child
        # processes acting like inetd-like web servers)
    }
    else {
        # some other error; we should never hit here, but if we do, die
        die "setsockopt: $!";
    }
}
842              
=head2 C<< $obj->steal_socket() >>

Basically returns our socket and makes it so that we don't try to close it,
but we do remove it from epoll handlers.  THIS CLOSES $self.  It is the same
thing as calling close, except it gives you the socket to use.

Returns nothing if the object is already closed.

=cut
sub steal_socket {
    my Danga::Socket $self = shift;
    return if $self->{closed};

    # _cleanup does most of the work of closing this object
    $self->_cleanup();

    # hand the handle to the caller and forget it ourselves, so nothing
    # here will ever use or close it
    my $stolen = $self->{sock};
    $self->{sock} = undef;
    return $stolen;
}
862              
=head2 C<< $obj->close( [$reason] ) >>

Close the socket.  The I<reason> argument will be used in debugging messages.

Returns nothing if the object was already closed, otherwise 0.  The
underlying handle is not closed immediately; it is queued and closed once the
current round of event processing has finished.

=cut
sub close {
    my Danga::Socket $self = shift;
    my $why = shift;
    return if $self->{closed};

    # print out debugging info for this close
    if (DebugLevel) {
        my ($pkg, $filename, $line) = caller;
        my $reason = $why || "";
        warn "Closing \#$self->{fd} due to $pkg/$filename/$line ($reason)\n";
    }

    # this does most of the work of closing us
    $self->_cleanup();

    # defer closing the actual socket until the event loop is done
    # processing this round of events.  (otherwise we might reuse fds)
    my $sock = $self->{sock};
    if ($sock) {
        push @ToClose, $sock;
        $self->{sock} = undef;
    }

    return 0;
}
891              
### METHOD: _cleanup()
### Called by our closers so we can clean internal data structures.
### Marks the object closed, drops the queued writes, uncorks, unregisters
### the descriptor from epoll (when epoll is in use), removes it from the
### push-back and post-loop-callback maps, and finally forgets the fd.
### The sock handle itself is left for the caller to deal with.
sub _cleanup {
    my Danga::Socket $self = $_[0];

    # we're effectively closed; we have no fd and sock when we leave here
    $self->{closed} = 1;

    # we need to flush our write buffer, as there may
    # be self-referential closures (sub { $client->close })
    # preventing the object from being destroyed
    $self->{write_buf} = [];

    # uncork so any final data gets sent.  only matters if the person closing
    # us forgot to do it, but we do it to be safe.
    $self->tcp_cork(0);

    # if we're using epoll, we have to remove this from our epoll fd so we stop getting
    # notifications about it.  descriptor 0 is valid, hence the definedness
    # test rather than a truth test.
    if ($HaveEpoll && defined $self->{fd}) {
        if (epoll_ctl($Epoll, EPOLL_CTL_DEL, $self->{fd}, $self->{event_watch}) != 0) {
            # dump_error prints a backtrace so we can try to figure out why this happened
            $self->dump_error("epoll_ctl(): failure deleting fd=$self->{fd} during _cleanup(); $! (" . ($!+0) . ")");
        }
    }

    # now delete from mappings.  this fd no longer belongs to us, so we don't want
    # to get alerts for it if it becomes writable/readable/etc.
    delete $PushBackSet{$self->{fd}};
    delete $PLCMap{$self->{fd}};

    # we explicitly don't delete from DescriptorMap here until we
    # actually close the socket, as we might be in the middle of
    # processing an epoll_wait/etc that returned hundreds of fds, one
    # of which is not yet processed and is what we're closing.  if we
    # keep it in DescriptorMap, then the event harnesses can just
    # look at $pob->{closed} and ignore it.  but if it's an
    # un-accounted for fd, then they (understandably) freak out a bit
    # and emit warnings, thinking their state got off.

    # and finally get rid of our fd so we can't use it anywhere else
    $self->{fd} = undef;
}
935              
=head2 C<< $obj->sock() >>

Returns the underlying IO::Handle for the object (undef once the object has
been closed or its socket stolen).

=cut
sub sock {
    my Danga::Socket $self = $_[0];
    return $self->{sock};
}
945              
=head2 C<< $obj->set_writer_func( CODEREF ) >>

Sets a function to use instead of C<syswrite()> when writing data to the
socket.  The function is called with the buffer reference, the number of
bytes to write and the offset into the buffer.  Passing undef restores the
default behaviour; anything else that is not a code reference croaks.

=cut
sub set_writer_func {
    my Danga::Socket $self = shift;
    my $wtr = shift;

    # undef is allowed (it clears the writer); otherwise insist on a coderef
    if (defined $wtr && !UNIVERSAL::isa($wtr, "CODE")) {
        Carp::croak("Not a subref");
    }

    $self->{writer_func} = $wtr;
}
957              
=head2 C<< $obj->write( $data ) >>

Write the specified data to the underlying handle.  I<data> may be scalar,
scalar ref, code ref (to run when there), or undef just to kick-start.
Returns 1 if writes all went through, or 0 if there are writes in queue.  (If
it returns 1, caller should stop waiting for 'writable' events.)

=cut
sub write {
    my Danga::Socket $self = $_[0];
    my $data = $_[1];

    # nobody should be writing to closed sockets, but caller code can
    # do two writes within an event, have the first fail and
    # disconnect the other side (whose destructor then closes the
    # calling object, but it's still in a method), and then the
    # now-dead object does its second write.  that is this case.  we
    # just lie and say it worked.  it'll be dead soon and won't be
    # hurt by this lie.
    return 1 if $self->{closed};

    my $bref;          # reference to the item currently being written
    my $need_queue;    # true while $bref has bypassed the queue

    if (defined $data) {
        $bref = ref $data ? $data : \$data;

        # just queue data if there's already a wait.  scalars count for
        # their length, anything else (code refs) counts as 1.
        if ($self->{write_buf_size}) {
            push @{ $self->{write_buf} }, $bref;
            $self->{write_buf_size} += ref $bref eq "SCALAR" ? length($$bref) : 1;
            return 0;
        }

        # this flag says we're bypassing the queue system, knowing we're the
        # only outstanding write, and hoping we don't ever need to use it.
        # if so later, though, we'll need to queue
        $need_queue = 1;
    }

  WRITE:
    while (1) {
        # nothing in hand and nothing queued: everything has been written
        return 1 unless $bref ||= $self->{write_buf}[0];

        my $len;
        eval {
            $len = length($$bref); # this will die if $bref is a code ref, caught below
        };
        if ($@) {
            die "Write error: $@ <$bref>"
                unless UNIVERSAL::isa($bref, "CODE");

            # a queued callback: take it off the queue before running it
            unless ($need_queue) {
                $self->{write_buf_size}--; # code refs are worth 1
                shift @{ $self->{write_buf} };
            }
            $bref->();

            # code refs are just run and never get reenqueued
            # (they're one-shot), so turn off the flag indicating the
            # outstanding data needs queueing.
            $need_queue = 0;

            undef $bref;
            next WRITE;
        }

        my $to_write = $len - $self->{write_buf_offset};
        my $wtr      = $self->{writer_func};
        my $written  = $wtr
            ? $wtr->($bref, $to_write, $self->{write_buf_offset})
            : syswrite($self->{sock}, $$bref, $to_write, $self->{write_buf_offset});

        # --- the write failed outright --------------------------------
        if (! defined $written) {
            return $self->close("EPIPE") if $! == EPIPE;

            if ($! == EAGAIN) {
                # since connection has stuff to write, it should now be
                # interested in pending writes:
                if ($need_queue) {
                    push @{ $self->{write_buf} }, $bref;
                    $self->{write_buf_size} += $len;
                }
                $self->{write_set_watch} = 1 unless $self->{event_watch} & POLLOUT;
                $self->watch_write(1);
                return 0;
            }

            return $self->close("ECONNRESET") if $! == ECONNRESET;

            DebugLevel >= 1 && $self->debugmsg("Closing connection ($self) due to write error: $!\n");

            return $self->close("write_error");
        }

        # --- only part of the buffer went out -------------------------
        if ($written != $to_write) {
            DebugLevel >= 2 && $self->debugmsg("Wrote PARTIAL %d bytes to %d",
                                               $written, $self->{fd});
            if ($need_queue) {
                push @{ $self->{write_buf} }, $bref;
                $self->{write_buf_size} += $len;
            }
            # since connection has stuff to write, it should now be
            # interested in pending writes:
            $self->{write_buf_offset} += $written;
            $self->{write_buf_size}   -= $written;
            $self->on_incomplete_write;
            return 0;
        }

        # --- the whole remainder of this buffer went out --------------
        DebugLevel >= 2 && $self->debugmsg("Wrote ALL %d bytes to %d (nq=%d)",
                                           $written, $self->{fd}, $need_queue);
        $self->{write_buf_offset} = 0;

        # if we were the ones who turned on write watching, turn it off
        if ($self->{write_set_watch}) {
            $self->watch_write(0);
            $self->{write_set_watch} = 0;
        }

        # this was our only write, so we can return immediately
        # since we avoided incrementing the buffer size or
        # putting it in the buffer.  we also know there
        # can't be anything else to write.
        return 1 if $need_queue;

        # otherwise retire the queue head and go around for the next item
        $self->{write_buf_size} -= $written;
        shift @{ $self->{write_buf} };
        undef $bref;
        next WRITE;
    }
}
1090              
# Called by write() after a partial write.  Starts watching for writability,
# and records that we (rather than a subclass) asked for it when POLLOUT was
# not already being watched.
sub on_incomplete_write {
    my Danga::Socket $self = shift;

    if (!($self->{event_watch} & POLLOUT)) {
        $self->{write_set_watch} = 1;
    }
    $self->watch_write(1);
}
1096              
=head2 C<< $obj->push_back_read( $buf ) >>

Push back I<buf> (a scalar or scalarref) into the read stream.  Useful if you
read more than you need to and want to return this data on the next "read".

=cut
sub push_back_read {
    my Danga::Socket $self = shift;
    my $buf = shift;

    # always store a reference, wrapping plain scalars
    my $chunk = ref $buf ? $buf : \$buf;
    push @{ $self->{read_push_back} }, $chunk;

    # remember that this descriptor has pushed-back data pending
    $PushBackSet{$self->{fd}} = $self;
}
1109              
=head2 C<< $obj->read( $bytecount ) >>

Read at most I<bytecount> bytes from the underlying handle; returns scalar
ref on read, or undef on connection closed.

Pushed-back data (see C<push_back_read>) is returned before the handle is
touched; a pushed-back chunk larger than I<bytecount> is split and the
remainder kept for the next call.  A single call never asks the handle for
more than 1MB.  When the read would block, a reference to the (unfilled)
buffer is returned.  Returns nothing if the object is already closed.

=cut
sub read {
    my Danga::Socket $self = shift;
    return if $self->{closed};
    my $bytes = shift;
    my $buf;
    my $sock = $self->{sock};

    if (@{$self->{read_push_back}}) {
        $buf = shift @{$self->{read_push_back}};
        my $len = length($$buf);

        if ($len <= $bytes) {
            delete $PushBackSet{$self->{fd}} unless @{$self->{read_push_back}};
            return $buf;
        } else {
            # if the pushed back read is too big, we have to split it
            my $overflow = substr($$buf, $bytes);
            $buf = substr($$buf, 0, $bytes);
            unshift @{$self->{read_push_back}}, \$overflow;
            return \$buf;
        }
    }

    # if this is too high, perl quits(!!).  reports on mailing lists
    # don't seem to point to a universal answer.  5MB worked for some,
    # crashed for others.  1MB works for more people.  let's go with 1MB
    # for now.  :/
    my $req_bytes = $bytes > 1048576 ? 1048576 : $bytes;

    my $res = sysread($sock, $buf, $req_bytes, 0);
    # capture errno right away, before anything else can disturb it
    my $errno = $! + 0;
    DebugLevel >= 2 && $self->debugmsg("sysread = %d; \$! = %d", $res, $errno);

    # sysread returns undef on error (errno is meaningful) and 0 at end of
    # file (errno is NOT set, so it may still hold a stale EWOULDBLOCK from
    # some earlier call).  Only consult errno in the error case; a 0 return
    # always means the connection is closed.
    my $finished = defined $res ? !$res : $errno != EWOULDBLOCK;
    if ($finished) {
        # catches 0=conn closed or undef=error
        DebugLevel >= 2 && $self->debugmsg("Fd \#%d read hit the end of the road.", $self->{fd});
        return undef;
    }

    return \$buf;
}
1156              
=head2 (VIRTUAL) C<< $obj->event_read() >>

Readable event handler.  Concrete derivatives of Danga::Socket should
provide an implementation of this.  The default implementation will die if
called.

=cut
sub event_read {
    my $self = $_[0];
    die "Base class event_read called for $self\n";
}
1165              
=head2 (VIRTUAL) C<< $obj->event_err() >>

Error event handler.  Concrete derivatives of Danga::Socket should
provide an implementation of this.  The default implementation will die if
called.

=cut
sub event_err {
    my $self = $_[0];
    die "Base class event_err called for $self\n";
}
1174              
=head2 (VIRTUAL) C<< $obj->event_hup() >>

'Hangup' event handler.  Concrete derivatives of Danga::Socket should
provide an implementation of this.  The default implementation will die if
called.

=cut
sub event_hup {
    my $self = $_[0];
    die "Base class event_hup called for $self\n";
}
1183              
=head2 C<< $obj->event_write() >>

Writable event handler.  Concrete derivatives of Danga::Socket may wish to
provide an implementation of this.  The default implementation calls
C<write> with an C<undef>, which flushes whatever is waiting in the write
queue.

=cut
sub event_write {
    my ($self) = @_;
    return $self->write(undef);
}
1195              
=head2 C<< $obj->watch_read( $boolean ) >>

Turn 'readable' event notification on or off.  Does nothing on a closed
object or one without a socket, and only talks to the poller when the
requested state differs from the current one.

=cut
sub watch_read {
    my Danga::Socket $self = shift;
    return if $self->{closed} || !$self->{sock};

    my $val   = shift;
    my $event = $self->{event_watch};

    # set or clear the POLLIN bit according to the request
    if ($val) {
        $event |= POLLIN;
    }
    else {
        $event &= ~POLLIN;
    }

    # If it changed, set it
    if ($event != $self->{event_watch}) {
        if ($HaveKQueue) {
            my $action = $val ? IO::KQueue::EV_ENABLE() : IO::KQueue::EV_DISABLE();
            $KQueue->EV_SET($self->{fd}, IO::KQueue::EVFILT_READ(), $action);
        }
        elsif ($HaveEpoll) {
            # epoll_ctl returns non-zero on failure
            epoll_ctl($Epoll, EPOLL_CTL_MOD, $self->{fd}, $event)
                and $self->dump_error("couldn't modify epoll settings for $self->{fd} " .
                                      "from $self->{event_watch} -> $event: $! (" . ($!+0) . ")");
        }
        $self->{event_watch} = $event;
    }
}
1225              
=head2 C<< $obj->watch_write( $boolean ) >>

Turn 'writable' event notification on or off.  Does nothing on a closed
object or one without a socket.  When a caller outside this package turns
the watch on, the internal "we turned it on ourselves" flag is cleared, so
that C<write> will not switch the watch off again on that caller's behalf.

=cut
sub watch_write {
    my Danga::Socket $self = shift;
    return if $self->{closed} || !$self->{sock};

    my $val   = shift;
    my $event = $self->{event_watch};

    if ($val) {
        $event |= POLLOUT;

        # A subclass registered interest, it's now responsible for this.
        $self->{write_set_watch} = 0 if caller ne __PACKAGE__;
    }
    else {
        $event &= ~POLLOUT;
    }

    # If it changed, set it
    if ($event != $self->{event_watch}) {
        if ($HaveKQueue) {
            my $action = $val ? IO::KQueue::EV_ENABLE() : IO::KQueue::EV_DISABLE();
            $KQueue->EV_SET($self->{fd}, IO::KQueue::EVFILT_WRITE(), $action);
        }
        elsif ($HaveEpoll) {
            # epoll_ctl returns non-zero on failure
            epoll_ctl($Epoll, EPOLL_CTL_MOD, $self->{fd}, $event)
                and $self->dump_error("couldn't modify epoll settings for $self->{fd} " .
                                      "from $self->{event_watch} -> $event: $! (" . ($!+0) . ")");
        }
        $self->{event_watch} = $event;
    }
}
1260              
=head2 C<< $obj->dump_error( $message ) >>

Prints to STDERR a backtrace with information about this socket and what led
up to the dump_error call.  The report is emitted as a single warning made
of the message, the object with its C<as_string> description, and one line
per calling frame.

=cut
sub dump_error {
    # walk up the call stack, one line per frame, until caller() runs dry
    my @frames;
    my $depth = 0;
    while (my ($file, $line, $sub) = (caller($depth++))[1..3]) {
        push @frames, "\t$file:$line called $sub\n";
    }

    my $header = "ERROR: $_[1]\n";
    my $object = "\t$_[0] = " . $_[0]->as_string . "\n";

    warn $header . $object . join('', @frames);
}
1278              
=head2 C<< $obj->debugmsg( $format, @args ) >>

Print the debugging message specified by the C<sprintf>-style I<format> and
I<args>.  The message goes to STDERR prefixed with ">>> "; a trailing newline
on the format is optional.  Confesses if not called on an object.

=cut
sub debugmsg {
    my ( $self, $fmt, @args ) = @_;
    ref $self
        or confess "Not an object";

    # strip any newline the caller supplied; exactly one is added back below
    chomp $fmt;
    printf STDERR ">>> $fmt\n", @args;
}
1292              
1293              
=head2 C<< $obj->peer_ip_string() >>

Returns the string describing the peer's IP.  The answer is cached on the
object, and the peer's port is recorded as a side effect.  Returns undef when
there is no socket or C<getpeername> fails, and a bracketed "[Unknown
peername ...]" marker when the address cannot be unpacked.

=cut
sub peer_ip_string {
    my Danga::Socket $self = shift;
    return _undef("peer_ip_string undef: no sock") unless $self->{sock};
    return $self->{peer_ip} if defined $self->{peer_ip};

    my $pn = getpeername($self->{sock});
    return _undef("peer_ip_string undef: getpeername") unless $pn;

    # a packed address of 28 bytes or more is taken to be a sockaddr_in6
    my ($port, $iaddr) = eval {
        length($pn) >= 28
            ? Socket6::unpack_sockaddr_in6($pn)
            : Socket::sockaddr_in($pn);
    };

    if ($@) {
        $self->{peer_port} = "[Unknown peerport '$@']";
        return "[Unknown peername '$@']";
    }

    $self->{peer_port} = $port;

    # 4-byte addresses are IPv4; anything else is formatted as IPv6
    return $self->{peer_ip} = Socket::inet_ntoa($iaddr)
        if length($iaddr) == 4;

    $self->{peer_v6} = 1;
    return $self->{peer_ip} = Socket6::inet_ntop(Socket6::AF_INET6(),
                                                 $iaddr);
}
1330              
=head2 C<< $obj->peer_addr_string() >>

Returns the string describing the peer for the socket which underlies this
object in form "ip:port" ("[ip]:port" for an IPv6 peer), or undef when the
peer's IP cannot be determined.

=cut
sub peer_addr_string {
    my Danga::Socket $self = shift;

    # peer_ip_string also fills in peer_port and peer_v6
    my $ip = $self->peer_ip_string;
    return undef unless $ip;

    if ($self->{peer_v6}) {
        return "[$ip]:$self->{peer_port}";
    }
    return "$ip:$self->{peer_port}";
}
1345              
=head2 C<< $obj->local_ip_string() >>

Returns the string describing the local IP.  The answer is cached on the
object, and the local port is recorded as a side effect.  Returns undef when
there is no socket or C<getsockname> fails.

Like C<peer_ip_string>, this copes with IPv6 sockets (the address is returned
in its plain, unbracketed text form) and, instead of dying on an address it
cannot unpack, returns a bracketed "[Unknown localname ...]" marker.

=cut
sub local_ip_string {
    my Danga::Socket $self = shift;
    return _undef("local_ip_string undef: no sock") unless $self->{sock};
    return $self->{local_ip} if defined $self->{local_ip};

    my $sn = getsockname($self->{sock});
    return _undef("local_ip_string undef: getsockname") unless $sn;

    # Socket::sockaddr_in croaks on anything that is not an AF_INET
    # sockaddr, so unpack under eval and pick the unpacker by length,
    # exactly as peer_ip_string does.
    my ($port, $iaddr) = eval {
        length($sn) >= 28
            ? Socket6::unpack_sockaddr_in6($sn)
            : Socket::sockaddr_in($sn);
    };

    if ($@) {
        # not cached in local_ip, so a later call will try again
        $self->{local_port} = "[Unknown localport '$@']";
        return "[Unknown localname '$@']";
    }

    $self->{local_port} = $port;

    # 4-byte addresses are IPv4; anything else is formatted as IPv6
    return $self->{local_ip} = Socket::inet_ntoa($iaddr)
        if length($iaddr) == 4;

    return $self->{local_ip} = Socket6::inet_ntop(Socket6::AF_INET6(),
                                                  $iaddr);
}
1364              
=head2 C<< $obj->local_addr_string() >>

Returns the string describing the local end of the socket which underlies this
object in form "ip:port", or undef when the local IP cannot be determined.

=cut
sub local_addr_string {
    my Danga::Socket $self = shift;

    # local_ip_string also fills in local_port
    my $ip = $self->local_ip_string;
    return undef unless $ip;

    return "$ip:$self->{local_port}";
}
1376              
1377              
=head2 C<< $obj->as_string() >>

Returns a string describing this socket: the object's class, which of
readable (R) and writable (W) events are being watched, whether it is open or
closed, and the peer address when one can be determined.

=cut
sub as_string {
    my Danga::Socket $self = shift;

    my $watch = $self->{event_watch};
    my $rw = "(" . (($watch & POLLIN)  ? 'R' : '') .
                   (($watch & POLLOUT) ? 'W' : '') . ")";

    my $ret = ref($self) . "$rw: " . ($self->{closed} ? "closed" : "open");

    # look the peer up once and reuse the answer
    my $peer = $self->peer_addr_string;
    if ($peer) {
        $ret .= " to " . $peer;
    }
    return $ret;
}
1394              
# Helper for "return undef, but say why when debugging".  Always returns
# undef; when the DS_DEBUG environment variable is true it first warns with
# the given message (an empty message if none was supplied).
sub _undef {
    if ($ENV{DS_DEBUG}) {
        my $msg = shift || "";
        warn "Danga::Socket: $msg\n";
    }
    return undef;
}
1401              
package Danga::Socket::Timer;
# A timer is an array ref: [$abs_float_firetime, $coderef];

# Cancel the timer by clearing its code reference (slot 1), leaving the fire
# time in place.
sub cancel {
    my $timer = $_[0];
    $timer->[1] = undef;
}
1407              
1408             =head1 AUTHORS
1409              
1410             Brad Fitzpatrick - author
1411              
1412             Michael Granger - docs, testing
1413              
1414             Mark Smith - contributor, heavy user, testing
1415              
1416             Matt Sergeant - kqueue support, docs, timers, other bits
1417              
1418             =head1 BUGS
1419              
1420             Not documented enough (but isn't that true of every project?).
1421              
1422             tcp_cork only works on Linux for now. No BSD push/nopush support.
1423              
1424             =head1 LICENSE
1425              
1426             License is granted to use and distribute this module under the same
1427             terms as Perl itself.
1428              
1429             =cut
1430              
1431             1;
1432              
1433             # Local Variables:
1434             # mode: perl
1435             # c-basic-indent: 4
1436             # indent-tabs-mode: nil
1437             # End: