File Coverage

blib/lib/IO/Async/Loop/Epoll.pm
Criterion Covered Total %
statement 185 192 96.3
branch 73 106 68.8
condition 26 29 89.6
subroutine 24 24 100.0
pod 7 8 87.5
total 315 359 87.7


line stmt bran cond sub pod time code
1             # You may distribute under the terms of either the GNU General Public License
2             # or the Artistic License (the same terms as Perl itself)
3             #
4             # (C) Paul Evans, 2008-2020 -- leonerd@leonerd.org.uk
5              
6             package IO::Async::Loop::Epoll;
7              
8 13     13   283709 use strict;
  13         58  
  13         307  
9 13     13   53 use warnings;
  13         14  
  13         429  
10              
11             our $VERSION = '0.21';
12 13     13   60 use constant API_VERSION => '0.76';
  13         25  
  13         813  
13              
14             # Only Linux is known always to be able to report EOF conditions on
15             # filehandles using EPOLLHUP
16             # This is probably redundant as epoll is probably Linux-only also, but it
17             # doesn't harm anything to test specially.
18 13     13   64 use constant _CAN_ON_HANGUP => ( $^O eq "linux" );
  13         15  
  13         556  
19              
20 13     13   63 use base qw( IO::Async::Loop );
  13         21  
  13         8566  
21              
22 13     13   171457 use Carp;
  13         30  
  13         720  
23              
24 13     13   5268 use Linux::Epoll 0.005;
  13         40916  
  13         635  
25              
26 13     13   83 use POSIX qw( EINTR EPERM SIG_BLOCK SIG_UNBLOCK sigprocmask sigaction ceil );
  13         22  
  13         78  
27 13     13   1194 use Scalar::Util qw( refaddr );
  13         49  
  13         462  
28              
29 13     13   61 use constant _CAN_WATCHDOG => 1;
  13         24  
  13         674  
30 13     13   65 use constant WATCHDOG_ENABLE => IO::Async::Loop->WATCHDOG_ENABLE;
  13         14  
  13         605  
31              
32 13     13   5002 use Struct::Dumb;
  13         22025  
  13         41  
33              
34             struct SignalWatch => [qw( code pending orig )];
35              
36             =head1 NAME
37              
38             C<IO::Async::Loop::Epoll> - use C<IO::Async> with C<epoll> on Linux
39              
40             =head1 SYNOPSIS
41              
42             use IO::Async::Loop::Epoll;
43              
44             use IO::Async::Stream;
45             use IO::Async::Signal;
46              
47             my $loop = IO::Async::Loop::Epoll->new;
48              
49             $loop->add( IO::Async::Stream->new(
50             read_handle => \*STDIN,
51             on_read => sub {
52             my ( $self, $buffref ) = @_;
53             while( $$buffref =~ s/^(.*)\r?\n// ) {
54             print "You said: $1\n";
55             }
56             },
57             ) );
58              
59             $loop->add( IO::Async::Signal->new(
60             name => 'INT',
61             on_receipt => sub {
62             print "SIGINT, will now quit\n";
63             $loop->stop;
64             },
65             ) );
66              
67             $loop->run;
68              
69             =head1 DESCRIPTION
70              
71             This subclass of L<IO::Async::Loop> uses C<epoll(7)> on Linux to perform
72             read-ready and write-ready tests so that the OZ<>(1) high-performance
73             multiplexing of Linux's C<epoll_pwait(2)> syscall can be used.
74              
75             The C<epoll> Linux subsystem uses a persistent registration system, meaning
76             that better performance can be achieved in programs using a large number of
77             filehandles. Each C<epoll_pwait(2)> syscall only has an overhead proportional
78             to the number of ready filehandles, rather than the total number being
79             watched. For more detail, see the C<epoll(7)> manpage.
80              
81             This class uses the C<epoll_pwait(2)> system call, which atomically switches
82             the process's signal mask, performs a wait exactly as C<epoll_wait(2)> would,
83             then switches it back. This allows a process to block the signals it cares
84             about, but switch in an empty signal mask during the poll, allowing it to
85             handle file IO and signals concurrently.
86              
87             =cut
88              
89             =head1 CONSTRUCTOR
90              
91             =cut
92              
93             =head2 new
94              
95             $loop = IO::Async::Loop::Epoll->new()
96              
97             This function returns a new instance of a C<IO::Async::Loop::Epoll> object.
98              
99             =cut
100              
101             sub new
102             {
103 12     12 1 429 my $class = shift;
104 12         28 my ( %args ) = @_;
105              
106 12         356 my $epoll = Linux::Epoll->new;
107 12 50       53 defined $epoll or croak "Cannot create epoll handle - $!";
108              
109 12         180 my $self = $class->SUPER::__new( %args );
110              
111 12         519 $self->{epoll} = $epoll;
112 12         110 $self->{sigmask} = POSIX::SigSet->new();
113 12         24 $self->{maxevents} = 8;
114              
115 12         23 $self->{fakeevents} = {};
116              
117 12         22 $self->{signals} = {}; # {$name} => SignalWatch
118 12         44 $self->{masks} = {};
119              
120 12         123 $self->{pid} = $$;
121              
122             # epoll gets very upset if applications close() filehandles without telling
123             # it, and then try to add that mask a second time. We can attempt to detect
124             # this by storing the mapping from fileno to refaddr($fh)
125 12         31 $self->{refaddr_for_fileno} = {};
126              
127 12         42 return $self;
128             }
129              
130             # Some bits to keep track of in {masks}
131             use constant {
132 13         20500 WATCH_READ => 0x01,
133             WATCH_WRITE => 0x02,
134             WATCH_HUP => 0x04,
135 13     13   2456 };
  13         162  
136              
137             =head1 METHODS
138              
139             As this is a subclass of L<IO::Async::Loop>, all of its methods are inherited.
140             Except where noted below, all of the class's methods behave identically to
141             C<IO::Async::Loop>.
142              
143             =cut
144              
145             sub DESTROY
146             {
147 8     8   45951 my $self = shift;
148              
149 8         77 foreach my $signal ( keys %{ $self->{signals} } ) {
  8         517  
150 5         331 $self->unwatch_signal( $signal );
151             }
152             }
153              
154             sub is_running
155             {
156 1     1 0 92 my $self = shift;
157 1         8 return $self->{running};
158             }
159              
160             =head2 loop_once
161              
162             $count = $loop->loop_once( $timeout )
163              
164             This method calls C<epoll_pwait(2)>, and processes the results of that call.
165             It returns the total number of C<IO::Async::Notifier> callbacks invoked, or
166             C<undef> if the underlying C<epoll_pwait()> method returned an error. If the
167             C<epoll_pwait()> was interrupted by a signal, then 0 is returned instead.
168              
169             =cut
170              
171             sub loop_once
172             {
173 41     41 1 3114266 my $self = shift;
174 41         188 my ( $timeout ) = @_;
175              
176 41 50       245 $self->post_fork if $self->{pid} != $$;
177              
178 41         1033 $self->_adjust_timeout( \$timeout );
179              
180             # Round up to next millisecond to avoid zero timeouts
181 41 50       5519 my $msec = defined $timeout ? ceil( $timeout * 1000 ) : -1;
182              
183 41 100       160 $timeout = 0 if keys %{ $self->{fakeevents} };
  41         154  
184              
185 41         287 $self->pre_wait;
186 41         5055814 my $ret = $self->{epoll}->wait( $self->{maxevents}, $msec / 1000, $self->{sigmask} );
187 41         549 $self->post_wait;
188              
189 41 50 66     2089 return undef if !defined $ret and $! != EINTR;
190              
191 41   100     172 my $count = $ret || 0;
192              
193 41         53 if( WATCHDOG_ENABLE and !$self->{alarmed} ) {
194             alarm( IO::Async::Loop->WATCHDOG_INTERVAL );
195             $self->{alarmed}++;
196             }
197              
198 41         88 my $iowatches = $self->{iowatches};
199              
200 41         97 my $fakeevents = $self->{fakeevents};
201 41         131 my @fakeevents = map { [ $_ => $fakeevents->{$_} ] } keys %$fakeevents;
  1         9  
202              
203 41         172 foreach my $ev ( @fakeevents ) {
204 1         4 my ( $fd, $bits ) = @$ev;
205              
206 1         3 my $watch = $iowatches->{$fd};
207              
208 1 50       5 if( $bits & WATCH_READ ) {
209 1 50       18 $watch->[1]->() if $watch->[1];
210 1         4 $count++;
211             }
212              
213 1 50       5 if( $bits & WATCH_WRITE ) {
214 1 50       7 $watch->[2]->() if $watch->[2];
215 1         3 $count++;
216             }
217              
218 1 50       5 if( $bits & WATCH_HUP ) {
219 0 0       0 $watch->[3]->() if $watch->[3];
220 0         0 $count++;
221             }
222             }
223              
224 41         399 my $signals = $self->{signals};
225 41         176 foreach my $sigslot ( values %$signals ) {
226 17 50       151 if( $sigslot->pending ) {
227 17         178 $sigslot->pending = 0;
228 17         122 $sigslot->code->();
229 17         2056 $count++;
230             }
231             }
232              
233 41         224 $count += $self->_manage_queues;
234              
235             # If we entirely filled the event buffer this time, we may have missed some
236             # events. Let's get a bigger buffer next time
237 41 50 66     2531 $self->{maxevents} *= 2 if defined $ret and $ret == $self->{maxevents};
238              
239 41         234 alarm( 0 ), undef $self->{alarmed} if WATCHDOG_ENABLE;
240              
241 41         423 return $count;
242             }
243              
244             sub watch_io
245             {
246 14     14 1 49189 my $self = shift;
247 14         135 my %params = @_;
248              
249 14 100       317 $self->post_fork if $self->{pid} != $$;
250              
251 14         27 my $epoll = $self->{epoll};
252              
253 14         463 $self->__watch_io( %params );
254              
255 14         1036 my $handle = $params{handle};
256 14         56 my $fd = $handle->fileno;
257              
258 14         62 my $watch = $self->{iowatches}->{$fd};
259              
260 14         44 my $alarmed = \$self->{alarmed};
261              
262 14   100     127 my $curmask = $self->{masks}->{$fd} || 0;
263             my $cb = $self->{callbacks}->{$fd} ||= sub {
264 14     14   46 my ( $events ) = @_;
265              
266 14         21 if( WATCHDOG_ENABLE and !$$alarmed ) {
267             alarm( IO::Async::Loop->WATCHDOG_INTERVAL );
268             $$alarmed = 1;
269             }
270              
271 14 100 100     64 if( $events->{in} or $events->{hup} or $events->{err} ) {
      100        
272 11 100       49 $watch->[1]->() if $watch->[1];
273             }
274              
275 14 100 100     132 if( $events->{out} or $events->{hup} or $events->{err} ) {
      100        
276 9 100       21 $watch->[2]->() if $watch->[2];
277             }
278              
279 14 100 100     67 if( $events->{hup} or $events->{err} ) {
280 6 100       21 $watch->[3]->() if $watch->[3];
281             }
282 14   100     177 };
283              
284 14         25 my $mask = $curmask;
285 14 100       71 $params{on_read_ready} and $mask |= WATCH_READ;
286 14 100       243 $params{on_write_ready} and $mask |= WATCH_WRITE;
287 14 100       58 $params{on_hangup} and $mask |= WATCH_HUP;
288              
289 14         32 my @bits;
290 14 100       47 push @bits, 'in' if $mask & WATCH_READ;
291 14 100       48 push @bits, 'out' if $mask & WATCH_WRITE;
292 14 100       32 push @bits, 'hup' if $mask & WATCH_HUP;
293              
294 14         26 my $fakeevents = $self->{fakeevents};
295              
296 14 100       52 if( !$curmask ) {
    50          
297 13 50       32 defined $self->{refaddr_for_fileno}->{$fd} and
298             croak "Epoll has already seen this filehandle; cannot add it a second time";
299 13         583 $self->{refaddr_for_fileno}->{$fd} = refaddr $handle;
300              
301 13 100       211 if( defined $epoll->add( $handle, \@bits, $cb ) ) {
    50          
302             # All OK
303             }
304             elsif( $! == EPERM ) {
305             # The filehandle isn't epoll'able. This means kernel thinks it should
306             # always be ready.
307 1         3 $fakeevents->{$fd} = $mask;
308             }
309             else {
310 0         0 croak "Cannot EPOLL_CTL_ADD($fd,$mask) - $!";
311             }
312              
313 13         83 $self->{masks}->{$fd} = $mask;
314             }
315             elsif( $mask != $curmask ) {
316 1 50       6 $self->{refaddr_for_fileno}->{$fd} == refaddr $handle or
317             croak "Epoll cannot cope with fd $fd changing handle under it";
318              
319 1 50       4 if( exists $fakeevents->{$fd} ) {
320 0         0 $fakeevents->{$fd} = $mask;
321             }
322             else {
323 1 50       15 defined $epoll->modify( $handle, \@bits, $cb )
324             or croak "Cannot EPOLL_CTL_MOD($fd,$mask) - $!";
325             }
326              
327 1         5 $self->{masks}->{$fd} = $mask;
328             }
329             }
330              
331             sub unwatch_io
332             {
333 11     11 1 4979 my $self = shift;
334 11         33 my %params = @_;
335              
336 11 50       38 $self->post_fork if $self->{pid} != $$;
337              
338 11         52 $self->__unwatch_io( %params );
339              
340 11         332 my $epoll = $self->{epoll};
341              
342 11         19 my $handle = $params{handle};
343 11         23 my $fd = $handle->fileno;
344              
345 11 50       58 my $curmask = $self->{masks}->{$fd} or return;
346 11 50       27 my $cb = $self->{callbacks}->{$fd} or return;
347              
348 11         16 my $mask = $curmask;
349 11 100       24 $params{on_read_ready} and $mask &= ~WATCH_READ;
350 11 100       24 $params{on_write_ready} and $mask &= ~WATCH_WRITE;
351 11 100       23 $params{on_hangup} and $mask &= ~WATCH_HUP;
352              
353 11         15 my $fakeevents = $self->{fakeevents};
354              
355 11 50       38 $self->{refaddr_for_fileno}->{$fd} == refaddr $handle or
356             croak "Epoll cannot cope with fd $fd changing handle under it";
357              
358 11 100       22 if( $mask ) {
359 1 50       3 if( exists $fakeevents->{$fd} ) {
360 0         0 $fakeevents->{$fd} = $mask;
361             }
362             else {
363 1         2 my @bits;
364 1 50       4 push @bits, 'in' if $mask & WATCH_READ;
365 1 50       3 push @bits, 'out' if $mask & WATCH_WRITE;
366 1 50       4 push @bits, 'hup' if $mask & WATCH_HUP;
367              
368 1 50       14 defined $epoll->modify( $handle, \@bits, $cb )
369             or croak "Cannot EPOLL_CTL_MOD($fd,$mask) - $!";
370             }
371              
372 1         5 $self->{masks}->{$fd} = $mask;
373             }
374             else {
375 10 100       27 if( exists $fakeevents->{$fd} ) {
376 1         2 delete $fakeevents->{$fd};
377             }
378             else {
379 9 50       103 defined $epoll->delete( $handle )
380             or croak "Cannot EPOLL_CTL_DEL($fd) - $!";
381             }
382              
383 10         24 delete $self->{masks}->{$fd};
384 10         16 delete $self->{callbacks}->{$fd};
385              
386 10         207 delete $self->{refaddr_for_fileno}->{$fd};
387             }
388             }
389              
390             sub watch_signal
391             {
392 5     5 1 12300 my $self = shift;
393 5         60 my ( $signal, $code ) = @_;
394              
395 5 50       115 exists $SIG{$signal} or croak "Unrecognised signal name $signal";
396              
397             # We cannot simply set $SIG{$signal} = $code here, because of perl bug
398             # http://rt.perl.org/rt3/Ticket/Display.html?id=82040
399             # Instead, we'll store a tiny piece of code that just sets a flag, and
400             # check the flags on return from the epoll_pwait call.
401              
402 5         130 $self->{signals}{$signal} = SignalWatch( $code, 0, $SIG{$signal} );
403 5         335 my $pending = \$self->{signals}{$signal}->pending;
404              
405 5         205 my $signum = $self->signame2num( $signal );
406 5         3070 sigprocmask( SIG_BLOCK, POSIX::SigSet->new( $signum ) );
407              
408             # Note this is an unsafe signal handler, and as such it should do as little
409             # as possible.
410 5     17   430 my $sigaction = POSIX::SigAction->new( sub { $$pending = 1 } );
  17         248  
411 5 50       375 sigaction( $signum, $sigaction ) or croak "Unable to sigaction - $!";
412             }
413              
414             sub unwatch_signal
415             {
416 5     5 1 17 my $self = shift;
417 5         59 my ( $signal ) = @_;
418              
419 5 50       67 exists $SIG{$signal} or croak "Unrecognised signal name $signal";
420              
421             # When we saved the original value, we might have got an undef. But %SIG
422             # doesn't like having undef assigned back in, so we need to translate
423 5   50     427 $SIG{$signal} = ( $self->{signals}{$signal} && $self->{signals}{$signal}->orig ) || 'DEFAULT';
424              
425 5         975 delete $self->{signals}{$signal};
426            
427 5         313 my $signum = $self->signame2num( $signal );
428              
429 5         1.36791599636801e-315 sigprocmask( SIG_UNBLOCK, POSIX::SigSet->new( $signum ) );
430             }
431              
432             sub post_fork
433             {
434 3     3 1 193 my $self = shift;
435              
436 3         7.61130182508871e-316 $self->{epoll} = Linux::Epoll->new;
437 3         55 $self->{pid} = $$;
438              
439 3 50       55 my $watches = $self->{iowatches} or return;
440              
441 3         47 foreach my $watch ( values %$watches ) {
442 0           my ( $handle, $on_read_ready, $on_write_ready, $on_hangup ) = @$watch;
443 0           $self->watch_io(
444             handle => $handle,
445             on_read_ready => $on_read_ready,
446             on_write_ready => $on_write_ready,
447             on_hangup => $on_hangup,
448             );
449             }
450             }
451              
452             =head1 SEE ALSO
453              
454             =over 4
455              
456             =item *
457              
458             L<Linux::Epoll> - O(1) multiplexing for Linux
459              
460             =item *
461              
462             L<IO::Async::Loop::Poll> - use IO::Async with poll(2)
463              
464             =back
465              
466             =head1 AUTHOR
467              
468             Paul Evans <leonerd@leonerd.org.uk>
469              
470             =cut
471              
472             0x55AA;