File Coverage

blib/lib/IO/Async/Loop/Epoll.pm
Criterion Covered Total %
statement 135 188 71.8
branch 26 106 24.5
condition 14 29 48.2
subroutine 22 23 95.6
pod 7 7 100.0
total 204 353 57.7


line stmt bran cond sub pod time code
1             # You may distribute under the terms of either the GNU General Public License
2             # or the Artistic License (the same terms as Perl itself)
3             #
4             # (C) Paul Evans, 2008-2018 -- leonerd@leonerd.org.uk
5              
6             package IO::Async::Loop::Epoll;
7              
8 11     11   623661 use strict;
  11         50  
  11         302  
9 11     11   54 use warnings;
  11         21  
  11         391  
10              
11             our $VERSION = '0.20';
12 11     11   47 use constant API_VERSION => '0.49';
  11         21  
  11         702  
13              
14             # Only Linux is known always to be able to report EOF conditions on
15             # filehandles using EPOLLHUP
16             # This is probably redundant as epoll is probably Linux-only also, but it
17             # doesn't harm anything to test specially.
18 11     11   59 use constant _CAN_ON_HANGUP => ( $^O eq "linux" );
  11         17  
  11         482  
19              
20 11     11   55 use base qw( IO::Async::Loop );
  11         17  
  11         7045  
21              
22 11     11   177972 use Carp;
  11         23  
  11         621  
23              
24 11     11   5594 use Linux::Epoll 0.005;
  11         15539  
  11         536  
25              
26 11     11   76 use POSIX qw( EINTR EPERM SIG_BLOCK SIG_UNBLOCK sigprocmask sigaction ceil );
  11         19  
  11         62  
27 11     11   1000 use Scalar::Util qw( refaddr );
  11         16  
  11         393  
28              
29 11     11   52 use constant _CAN_WATCHDOG => 1;
  11         26  
  11         584  
30 11     11   55 use constant WATCHDOG_ENABLE => IO::Async::Loop->WATCHDOG_ENABLE;
  11         17  
  11         462  
31              
32 11     11   4269 use Struct::Dumb;
  11         20329  
  11         39  
33              
34             struct SignalWatch => [qw( code pending orig )];
35              
36             =head1 NAME
37              
38             C<IO::Async::Loop::Epoll> - use C<IO::Async> with C<epoll> on Linux
39              
40             =head1 SYNOPSIS
41              
42             use IO::Async::Loop::Epoll;
43              
44             use IO::Async::Stream;
45             use IO::Async::Signal;
46              
47             my $loop = IO::Async::Loop::Epoll->new();
48              
49             $loop->add( IO::Async::Stream->new(
50             read_handle => \*STDIN,
51             on_read => sub {
52             my ( $self, $buffref ) = @_;
53             while( $$buffref =~ s/^(.*)\r?\n// ) {
54             print "You said: $1\n";
55             }
56             },
57             ) );
58              
59             $loop->add( IO::Async::Signal->new(
60             name => 'INT',
61             on_receipt => sub {
62             print "SIGINT, will now quit\n";
63             $loop->loop_stop;
64             },
65             ) );
66              
67             $loop->loop_forever();
68              
69             =head1 DESCRIPTION
70              
71             This subclass of L<IO::Async::Loop> uses C<epoll(7)> on Linux to perform
72             read-ready and write-ready tests so that the OZ<>(1) high-performance
73             multiplexing of Linux's C<epoll_pwait(2)> syscall can be used.
74              
75             The C<epoll> Linux subsystem uses a persistent registration system, meaning
76             that better performance can be achieved in programs using a large number of
77             filehandles. Each C<epoll_pwait(2)> syscall only has an overhead proportional
78             to the number of ready filehandles, rather than the total number being
79             watched. For more detail, see the C<epoll(7)> manpage.
80              
81             This class uses the C<epoll_pwait(2)> system call, which atomically switches
82             the process's signal mask, performs a wait exactly as C<epoll_wait(2)> would,
83             then switches it back. This allows a process to block the signals it cares
84             about, but switch in an empty signal mask during the poll, allowing it to
85             handle file IO and signals concurrently.
86              
87             =cut
88              
89             =head1 CONSTRUCTOR
90              
91             =cut
92              
93             =head2 new
94              
95             $loop = IO::Async::Loop::Epoll->new()
96              
97             This function returns a new instance of a C<IO::Async::Loop::Epoll> object.
98              
99             =cut
100              
101             sub new
102             {
103 10     10 1 398 my $class = shift;
104 10         24 my ( %args ) = @_;
105              
106 10         301 my $epoll = Linux::Epoll->new;
107 10 50       49 defined $epoll or croak "Cannot create epoll handle - $!";
108              
109 10         84 my $self = $class->SUPER::__new( %args );
110              
111 10         373 $self->{epoll} = $epoll;
112 10         83 $self->{sigmask} = POSIX::SigSet->new();
113 10         19 $self->{maxevents} = 8;
114              
115 10         19 $self->{fakeevents} = {};
116              
117 10         20 $self->{signals} = {}; # {$name} => SignalWatch
118 10         24 $self->{masks} = {};
119              
120 10         29 $self->{pid} = $$;
121              
122             # epoll gets very upset if applications close() filehandles without telling
123             # it, and then try to add that mask a second time. We can attempt to detect
124             # this by storing the mapping from fileno to refaddr($fh)
125 10         20 $self->{refaddr_for_fileno} = {};
126              
127 10         25 return $self;
128             }
129              
130             # Some bits to keep track of in {masks}
131             use constant {
132 11         18731 WATCH_READ => 0x01,
133             WATCH_WRITE => 0x02,
134             WATCH_HUP => 0x04,
135 11     11   2090 };
  11         119  
136              
137             =head1 METHODS
138              
139             As this is a subclass of L<IO::Async::Loop>, all of its methods are inherited.
140             Except where noted below, all of the class's methods behave identically to
141             C<IO::Async::Loop>.
142              
143             =cut
144              
145             sub DESTROY
146             {
147 6     6   26810 my $self = shift;
148              
149 6         54 foreach my $signal ( keys %{ $self->{signals} } ) {
  6         205  
150 4         99 $self->unwatch_signal( $signal );
151             }
152             }
153              
154             =head2 loop_once
155              
156             $count = $loop->loop_once( $timeout )
157              
158             This method calls C<epoll_pwait(2)>, and processes the results of that call.
159             It returns the total number of C<IO::Async::Notifier> callbacks invoked, or
160             C<undef> if the underlying C<epoll_pwait()> method returned an error. If the
161             C<epoll_pwait()> was interrupted by a signal, then 0 is returned instead.
162              
163             =cut
164              
165             sub loop_once
166             {
167 26     26 1 3062415 my $self = shift;
168 26         67 my ( $timeout ) = @_;
169              
170 26 50       248 $self->post_fork if $self->{pid} != $$;
171              
172 26         214 $self->_adjust_timeout( \$timeout );
173              
174             # Round up to next millisecond to avoid zero timeouts
175 26 50       1185 my $msec = defined $timeout ? ceil( $timeout * 1000 ) : -1;
176              
177 26 50       73 $timeout = 0 if keys %{ $self->{fakeevents} };
  26         131  
178              
179 26         3788938 my $ret = $self->{epoll}->wait( $self->{maxevents}, $msec / 1000, $self->{sigmask} );
180              
181 26 50 66     508 return undef if !defined $ret and $! != EINTR;
182              
183 26   100     159 my $count = $ret || 0;
184              
185 26         66 if( WATCHDOG_ENABLE and !$self->{alarmed} ) {
186             alarm( IO::Async::Loop->WATCHDOG_INTERVAL );
187             $self->{alarmed}++;
188             }
189              
190 26         115 my $iowatches = $self->{iowatches};
191              
192 26         59 my $fakeevents = $self->{fakeevents};
193 26         93 my @fakeevents = map { [ $_ => $fakeevents->{$_} ] } keys %$fakeevents;
  0         0  
194              
195 26         115 foreach my $ev ( @fakeevents ) {
196 0         0 my ( $fd, $bits ) = @$ev;
197              
198 0         0 my $watch = $iowatches->{$fd};
199              
200 0 0       0 if( $bits & WATCH_READ ) {
201 0 0       0 $watch->[1]->() if $watch->[1];
202 0         0 $count++;
203             }
204              
205 0 0       0 if( $bits & WATCH_WRITE ) {
206 0 0       0 $watch->[2]->() if $watch->[2];
207 0         0 $count++;
208             }
209              
210 0 0       0 if( $bits & WATCH_HUP ) {
211 0 0       0 $watch->[3]->() if $watch->[3];
212 0         0 $count++;
213             }
214             }
215              
216 26         91 my $signals = $self->{signals};
217 26         98 foreach my $sigslot ( values %$signals ) {
218 16 100       104 if( $sigslot->pending ) {
219 15         178 $sigslot->pending = 0;
220 15         105 $sigslot->code->();
221 15         913 $count++;
222             }
223             }
224              
225 26         308 $count += $self->_manage_queues;
226              
227             # If we entirely filled the event buffer this time, we may have missed some
228             # Lets get a bigger buffer next time
229 26 50 66     1815 $self->{maxevents} *= 2 if defined $ret and $ret == $self->{maxevents};
230              
231 26         48 alarm( 0 ), undef $self->{alarmed} if WATCHDOG_ENABLE;
232              
233 26         104 return $count;
234             }
235              
236             sub watch_io
237             {
238 3     3 1 65641 my $self = shift;
239 3         147 my %params = @_;
240              
241 3 50       246 $self->post_fork if $self->{pid} != $$;
242              
243 3         9 my $epoll = $self->{epoll};
244              
245 3         162 $self->__watch_io( %params );
246              
247 3         469 my $handle = $params{handle};
248 3         28 my $fd = $handle->fileno;
249              
250 3         21 my $watch = $self->{iowatches}->{$fd};
251              
252 3         23 my $alarmed = \$self->{alarmed};
253              
254 3   50     64 my $curmask = $self->{masks}->{$fd} || 0;
255             my $cb = $self->{callbacks}->{$fd} ||= sub {
256 3     3   19 my ( $events ) = @_;
257              
258 3         6 if( WATCHDOG_ENABLE and !$$alarmed ) {
259             alarm( IO::Async::Loop->WATCHDOG_INTERVAL );
260             $$alarmed = 1;
261             }
262              
263 3 0 33     33 if( $events->{in} or $events->{hup} or $events->{err} ) {
      0        
264 3 50       25 $watch->[1]->() if $watch->[1];
265             }
266              
267 3 50 33     53 if( $events->{out} or $events->{hup} or $events->{err} ) {
      33        
268 0 0       0 $watch->[2]->() if $watch->[2];
269             }
270              
271 3 50 33     22 if( $events->{hup} or $events->{err} ) {
272 0 0       0 $watch->[3]->() if $watch->[3];
273             }
274 3   50     104 };
275              
276 3         33 my $mask = $curmask;
277 3 50       11 $params{on_read_ready} and $mask |= WATCH_READ;
278 3 50       43 $params{on_write_ready} and $mask |= WATCH_WRITE;
279 3 50       14 $params{on_hangup} and $mask |= WATCH_HUP;
280              
281 3         5 my @bits;
282 3 50       12 push @bits, 'in' if $mask & WATCH_READ;
283 3 50       9 push @bits, 'out' if $mask & WATCH_WRITE;
284 3 50       20 push @bits, 'hup' if $mask & WATCH_HUP;
285              
286 3         16 my $fakeevents = $self->{fakeevents};
287              
288 3 50       53 if( !$curmask ) {
    0          
289 3 50       11 defined $self->{refaddr_for_fileno}->{$fd} and
290             croak "Epoll has already seen this filehandle; cannot add it a second time";
291 3         51 $self->{refaddr_for_fileno}->{$fd} = refaddr $handle;
292              
293 3 50       53 if( defined $epoll->add( $handle, \@bits, $cb ) ) {
    0          
294             # All OK
295             }
296             elsif( $! == EPERM ) {
297             # The filehandle isn't epoll'able. This means kernel thinks it should
298             # always be ready.
299 0         0 $fakeevents->{$fd} = $mask;
300             }
301             else {
302 0         0 croak "Cannot EPOLL_CTL_ADD($fd,$mask) - $!";
303             }
304              
305 3         15 $self->{masks}->{$fd} = $mask;
306             }
307             elsif( $mask != $curmask ) {
308 0 0       0 $self->{refaddr_for_fileno}->{$fd} == refaddr $handle or
309             croak "Epoll cannot cope with fd $fd changing handle under it";
310              
311 0 0       0 if( exists $fakeevents->{$fd} ) {
312 0         0 $fakeevents->{$fd} = $mask;
313             }
314             else {
315 0 0       0 defined $epoll->modify( $handle, \@bits, $cb )
316             or croak "Cannot EPOLL_CTL_MOD($fd,$mask) - $!";
317             }
318              
319 0         0 $self->{masks}->{$fd} = $mask;
320             }
321             }
322              
323             sub unwatch_io
324             {
325 0     0 1 0 my $self = shift;
326 0         0 my %params = @_;
327              
328 0 0       0 $self->post_fork if $self->{pid} != $$;
329              
330 0         0 $self->__unwatch_io( %params );
331              
332 0         0 my $epoll = $self->{epoll};
333              
334 0         0 my $handle = $params{handle};
335 0         0 my $fd = $handle->fileno;
336              
337 0 0       0 my $curmask = $self->{masks}->{$fd} or return;
338 0 0       0 my $cb = $self->{callbacks}->{$fd} or return;
339              
340 0         0 my $mask = $curmask;
341 0 0       0 $params{on_read_ready} and $mask &= ~WATCH_READ;
342 0 0       0 $params{on_write_ready} and $mask &= ~WATCH_WRITE;
343 0 0       0 $params{on_hangup} and $mask &= ~WATCH_HUP;
344              
345 0         0 my $fakeevents = $self->{fakeevents};
346              
347 0 0       0 $self->{refaddr_for_fileno}->{$fd} == refaddr $handle or
348             croak "Epoll cannot cope with fd $fd changing handle under it";
349              
350 0 0       0 if( $mask ) {
351 0 0       0 if( exists $fakeevents->{$fd} ) {
352 0         0 $fakeevents->{$fd} = $mask;
353             }
354             else {
355 0         0 my @bits;
356 0 0       0 push @bits, 'in' if $mask & WATCH_READ;
357 0 0       0 push @bits, 'out' if $mask & WATCH_WRITE;
358 0 0       0 push @bits, 'hup' if $mask & WATCH_HUP;
359              
360 0 0       0 defined $epoll->modify( $handle, \@bits, $cb )
361             or croak "Cannot EPOLL_CTL_MOD($fd,$mask) - $!";
362             }
363              
364 0         0 $self->{masks}->{$fd} = $mask;
365             }
366             else {
367 0 0       0 if( exists $fakeevents->{$fd} ) {
368 0         0 delete $fakeevents->{$fd};
369             }
370             else {
371 0 0       0 defined $epoll->delete( $handle )
372             or croak "Cannot EPOLL_CTL_DEL($fd) - $!";
373             }
374              
375 0         0 delete $self->{masks}->{$fd};
376 0         0 delete $self->{callbacks}->{$fd};
377              
378 0         0 delete $self->{refaddr_for_fileno}->{$fd};
379             }
380             }
381              
382             sub watch_signal
383             {
384 7     7 1 10425 my $self = shift;
385 7         45 my ( $signal, $code ) = @_;
386              
387 7 100       261 exists $SIG{$signal} or croak "Unrecognised signal name $signal";
388              
389             # We cannot simply set $SIG{$signal} = $code here, because of perl bug
390             # http://rt.perl.org/rt3/Ticket/Display.html?id=82040
391             # Instead, we'll store a tiny piece of code that just sets a flag, and
392             # check the flags on return from the epoll_pwait call.
393              
394 6         110 $self->{signals}{$signal} = SignalWatch( $code, 0, $SIG{$signal} );
395 6         300 my $pending = \$self->{signals}{$signal}->pending;
396              
397 6         207 my $signum = $self->signame2num( $signal );
398 6         2707 sigprocmask( SIG_BLOCK, POSIX::SigSet->new( $signum ) );
399              
400             # Note this is an unsafe signal handler, and as such it should do as little
401             # as possible.
402 6     15   234 my $sigaction = POSIX::SigAction->new( sub { $$pending = 1 } );
  15         246  
403 6 50       391 sigaction( $signum, $sigaction ) or croak "Unable to sigaction - $!";
404             }
405              
406             sub unwatch_signal
407             {
408 6     6 1 1471 my $self = shift;
409 6         37 my ( $signal ) = @_;
410              
411 6 50       68 exists $SIG{$signal} or croak "Unrecognised signal name $signal";
412              
413             # When we saved the original value, we might have got an undef. But %SIG
414             # doesn't like having undef assigned back in, so we need to translate
415 6   100     310 $SIG{$signal} = ( $self->{signals}{$signal} && $self->{signals}{$signal}->orig ) || 'DEFAULT';
416              
417 6         605 delete $self->{signals}{$signal};
418            
419 6         81 my $signum = $self->signame2num( $signal );
420              
421 6         1359 sigprocmask( SIG_UNBLOCK, POSIX::SigSet->new( $signum ) );
422             }
423              
424             sub post_fork
425             {
426 3     3 1 28 my $self = shift;
427              
428 3         7.72054685392951e-316 $self->{epoll} = Linux::Epoll->new;
429 3         60 $self->{pid} = $$;
430              
431 3 50       56 my $watches = $self->{iowatches} or return;
432              
433 3         47 foreach my $watch ( values %$watches ) {
434 0           my ( $handle, $on_read_ready, $on_write_ready, $on_hangup ) = @$watch;
435 0           $self->watch_io(
436             handle => $handle,
437             on_read_ready => $on_read_ready,
438             on_write_ready => $on_write_ready,
439             on_hangup => $on_hangup,
440             );
441             }
442             }
443              
444             =head1 SEE ALSO
445              
446             =over 4
447              
448             =item *
449              
450             L<Linux::Epoll> - O(1) multiplexing for Linux
451              
452             =item *
453              
454             L<IO::Async::Loop::Poll> - use IO::Async with poll(2)
455              
456             =back
457              
458             =head1 AUTHOR
459              
460             Paul Evans
461              
462             =cut
463              
464             0x55AA;