File Coverage

blib/lib/Mojo/Reactor/Epoll.pm
Criterion Covered Total %
statement 133 134 99.2
branch 44 52 84.6
condition 25 41 60.9
subroutine 26 26 100.0
pod 12 12 100.0
total 240 265 90.5


line stmt bran cond sub pod time code
1             package Mojo::Reactor::Epoll;
2 1     1   217776 use Mojo::Base 'Mojo::Reactor';
  1         10  
  1         6  
3              
4             $ENV{MOJO_REACTOR} ||= 'Mojo::Reactor::Epoll';
5              
6 1     1   30342 use Carp 'croak';
  1         1  
  1         38  
7 1     1   395 use Linux::Epoll;
  1         433  
  1         40  
8 1     1   5 use List::Util 'min';
  1         35  
  1         61  
9 1     1   5 use Mojo::Util qw(md5_sum steady_time);
  1         2  
  1         36  
10 1     1   5 use Scalar::Util 'weaken';
  1         2  
  1         30  
11 1     1   5 use Time::HiRes 'usleep';
  1         2  
  1         15  
12              
13 1   50 1   187 use constant DEBUG => $ENV{MOJO_REACTOR_EPOLL_DEBUG} || 0;
  1         2  
  1         1761  
14              
15             our $VERSION = '0.009';
16              
17             sub again {
18 6     6 1 223 my ($self, $id) = @_;
19 6 100       252 croak 'Timer not active' unless my $timer = $self->{timers}{$id};
20 5         10 $timer->{time} = steady_time + $timer->{after};
21             }
22              
23             sub io {
24 10     10 1 3559 my ($self, $handle, $cb) = @_;
25 10         26 my $fd = fileno $handle;
26 10         31 $self->{io}{$fd}{cb} = $cb;
27 10         14 warn "-- Set IO watcher for $fd\n" if DEBUG;
28 10         27 return $self->watch($handle, 1, 1);
29             }
30              
31 10     10 1 3488 sub is_running { !!shift->{running} }
32              
33             sub next_tick {
34 15     15 1 78325 my ($self, $cb) = @_;
35 15         17 push @{$self->{next_tick}}, $cb;
  15         36  
36 15   66     45 $self->{next_timer} //= $self->timer(0 => \&_next);
37 15         23 return undef;
38             }
39              
40             sub one_tick {
41 17822     17822 1 23225 my $self = shift;
42            
43             # Just one tick
44 17822 100       26033 local $self->{running} = 1 unless $self->{running};
45            
46             # Wait for one event
47 17822         21739 my $i;
48 17822   66     45309 until ($i || !$self->{running}) {
49             # Stop automatically if there is nothing to watch
50 17822 100 100     19638 return $self->stop unless keys %{$self->{timers}} || keys %{$self->{io}};
  17822         32866  
  7         21  
51            
52             # Calculate ideal timeout based on timers
53 17816         20744 my $min = min map { $_->{time} } values %{$self->{timers}};
  21580         45646  
  17816         29432  
54 17816 100       37467 my $timeout = defined $min ? ($min - steady_time) : 0.5;
55 17816 100       86820 $timeout = 0 if $timeout < 0;
56            
57             # I/O
58 17816 100       19753 if (my $watched = keys %{$self->{io}}) {
  17816 50       30953  
59             # Set max events to half the number of descriptors, to a minimum of 10
60 15481         23246 my $maxevents = int $watched/2;
61 15481 50       23170 $maxevents = 10 if $maxevents < 10;
62            
63 15481   66     27219 my $epoll = $self->{epoll} // $self->_create_epoll;
64              
65 15481         197179 my $count = $epoll->wait($maxevents, $timeout);
66 15481 50       37455 $i += $count if defined $count;
67             }
68            
69             # Wait for timeout if epoll can't be used
70 0         0 elsif ($timeout) { usleep $timeout * 1000000 }
71            
72             # Timers (time should not change in between timers)
73 17816         41098 my $now = steady_time;
74 17816         83313 for my $id (keys %{$self->{timers}}) {
  17816         38399  
75 21581 50       38725 next unless my $t = $self->{timers}{$id};
76 21581 100       75428 next unless $t->{time} <= $now;
77            
78             # Recurring timer
79 3060 100       4439 if (exists $t->{recurring}) { $t->{time} = $now + $t->{recurring} }
  3033         4559  
80            
81             # Normal timer
82 27         56 else { $self->remove($id) }
83            
84 3060 50 50     7543 ++$i and $self->_try('Timer', $t->{cb}) if $t->{cb};
85             }
86             }
87             }
88              
89 4     4 1 641 sub recurring { shift->_timer(1, @_) }
90              
91             sub remove {
92 37     37 1 3869 my ($self, $remove) = @_;
93 37 100       74 if (ref $remove) {
94 5         23 my $fd = fileno $remove;
95 5 50 66     26 if (exists $self->{io}{$fd} and exists $self->{io}{$fd}{epoll_cb}) {
96 3         50 $self->{epoll}->delete($remove);
97             }
98 5         9 warn "-- Removed IO watcher for $fd\n" if DEBUG;
99 5         52 return !!delete $self->{io}{$fd};
100             } else {
101 32         34 warn "-- Removed timer $remove\n" if DEBUG;
102 32         142 return !!delete $self->{timers}{$remove};
103             }
104             }
105              
106             sub reset {
107 2     2 1 6 my $self = shift;
108 2         4 delete @{$self}{qw(epoll io next_tick next_timer pending_watch timers)};
  2         41  
109             }
110              
111             sub start {
112 24     24 1 126 my $self = shift;
113 24         42 $self->{running}++;
114 24         72 $self->one_tick while $self->{running};
115             }
116              
117 26     26 1 201 sub stop { delete shift->{running} }
118              
119 33     33 1 12174 sub timer { shift->_timer(0, @_) }
120              
121             sub watch {
122 34     34 1 271 my ($self, $handle, $read, $write) = @_;
123            
124 34         57 my $fd = fileno $handle;
125 34 100       227 croak 'I/O watcher not active' unless my $io = $self->{io}{$fd};
126              
127 33         44 my $epoll = $self->{epoll};
128 33 100       56 unless (defined $epoll) {
129 9         12 push @{$self->{pending_watch}}, [$handle, $read, $write];
  9         22  
130 9         24 return $self;
131             }
132            
133 24         41 my @events;
134 24 100       52 push @events, 'in', 'prio' if $read;
135 24 100       46 push @events, 'out' if $write;
136            
137 24 100       66 my $op = exists $io->{epoll_cb} ? 'modify' : 'add';
138            
139 24         66 weaken $self;
140             my $cb = $io->{epoll_cb} // sub {
141 15482     15482   35702 my ($events) = @_;
142 15482 50 66     30956 if ($events->{in} or $events->{prio} or $events->{hup} or $events->{err}) {
      33        
      33        
143 15139 50       28191 return unless exists $self->{io}{$fd};
144 15139         30312 $self->_try('I/O watcher', $self->{io}{$fd}{cb}, 0);
145             }
146 15482 100 66     60358 if ($events->{out} or $events->{hup} or $events->{err}) {
      33        
147 1809 100       3353 return unless exists $self->{io}{$fd};
148 1808         2900 $self->_try('I/O watcher', $self->{io}{$fd}{cb}, 1);
149             }
150 24   100     74 };
151 24         306 $epoll->$op($handle, \@events, $cb);
152            
153             # Cache callback for future modify operations, after successfully added to epoll
154 24   66     84 $io->{epoll_cb} //= $cb;
155            
156 24         84 return $self;
157             }
158              
159             sub _create_epoll {
160 3     3   5 my $self = shift;
161 3         109 $self->{epoll} = Linux::Epoll->new;
162 3   50     8 $self->watch(@$_) for @{delete $self->{pending_watch} // []};
  3         16  
163 3         22 return $self->{epoll};
164             }
165              
166             sub _id {
167 37     37   53 my $self = shift;
168 37         116 my $id;
169 37         95 do { $id = md5_sum 't' . steady_time . rand 999 } while $self->{timers}{$id};
  37         103  
170 37         853 return $id;
171             }
172              
173             sub _next {
174 5     5   7 my $self = shift;
175 5         10 delete $self->{next_timer};
176 5         5 while (my $cb = shift @{$self->{next_tick}}) { $self->$cb }
  14         25  
  19         78  
177             }
178              
179             sub _timer {
180 37     37   69 my ($self, $recurring, $after, $cb) = @_;
181            
182 37         303 my $id = $self->_id;
183 37         128 my $timer = $self->{timers}{$id}
184             = {cb => $cb, after => $after, time => steady_time + $after};
185 37 100       349 $timer->{recurring} = $after if $recurring;
186            
187 37         44 if (DEBUG) {
188             my $is_recurring = $recurring ? ' (recurring)' : '';
189             warn "-- Set timer $id after $after seconds$is_recurring\n";
190             }
191            
192 37         94 return $id;
193             }
194              
195             sub _try {
196 20007     20007   32183 my ($self, $what, $cb) = (shift, shift, shift);
197 20007 100       28711 eval { $self->$cb(@_); 1 } or $self->emit(error => "$what failed: $@");
  20007         58690  
  20006         75468  
198             }
199              
200             1;
201              
202             =head1 NAME
203              
204             Mojo::Reactor::Epoll - epoll backend for Mojo::Reactor
205              
206             =head1 SYNOPSIS
207              
208             use Mojo::Reactor::Epoll;
209              
210             # Watch if handle becomes readable or writable
211             my $reactor = Mojo::Reactor::Epoll->new;
212             $reactor->io($first => sub {
213             my ($reactor, $writable) = @_;
214             say $writable ? 'First handle is writable' : 'First handle is readable';
215             });
216              
217             # Change to watching only if handle becomes writable
218             $reactor->watch($first, 0, 1);
219              
220             # Turn file descriptor into handle and watch if it becomes readable
221             my $second = IO::Handle->new_from_fd($fd, 'r');
222             $reactor->io($second => sub {
223             my ($reactor, $writable) = @_;
224             say $writable ? 'Second handle is writable' : 'Second handle is readable';
225             })->watch($second, 1, 0);
226              
227             # Add a timer
228             $reactor->timer(15 => sub {
229             my $reactor = shift;
230             $reactor->remove($first);
231             $reactor->remove($second);
232             say 'Timeout!';
233             });
234              
235             # Start reactor if necessary
236             $reactor->start unless $reactor->is_running;
237              
238             # Or in an application using Mojo::IOLoop
239             use Mojo::Reactor::Epoll;
240             use Mojo::IOLoop;
241            
242             # Or in a Mojolicious application
243             $ MOJO_REACTOR=Mojo::Reactor::Epoll hypnotoad script/myapp
244              
245             =head1 DESCRIPTION
246              
247             L is an event reactor for L that uses the
248             L Linux subsystem. The usage is exactly the same as other
249             L implementations such as L.
250             L will be used as the default backend for L
251             if it is loaded before L or any module using the loop. However,
252             when invoking a L application through L or L,
253             the reactor must be set as the default by setting the C
254             environment variable to C.
255              
256             =head1 EVENTS
257              
258             L inherits all events from L.
259              
260             =head1 METHODS
261              
262             L inherits all methods from L and
263             implements the following new ones.
264              
265             =head2 again
266              
267             $reactor->again($id);
268              
269             Restart timer. Note that this method requires an active timer.
270              
271             =head2 io
272              
273             $reactor = $reactor->io($handle => sub {...});
274              
275             Watch handle for I/O events, invoking the callback whenever handle becomes
276             readable or writable.
277              
278             # Callback will be invoked twice if handle becomes readable and writable
279             $reactor->io($handle => sub {
280             my ($reactor, $writable) = @_;
281             say $writable ? 'Handle is writable' : 'Handle is readable';
282             });
283              
284             =head2 is_running
285              
286             my $bool = $reactor->is_running;
287              
288             Check if reactor is running.
289              
290             =head2 next_tick
291              
292             my $undef = $reactor->next_tick(sub {...});
293              
294             Invoke callback as soon as possible, but not before returning or other
295             callbacks that have been registered with this method, always returns C.
296              
297             =head2 one_tick
298              
299             $reactor->one_tick;
300              
301             Run reactor until an event occurs or no events are being watched anymore. Note
302             that this method can recurse back into the reactor, so you need to be careful.
303              
304             # Don't block longer than 0.5 seconds
305             my $id = $reactor->timer(0.5 => sub {});
306             $reactor->one_tick;
307             $reactor->remove($id);
308              
309             =head2 recurring
310              
311             my $id = $reactor->recurring(0.25 => sub {...});
312              
313             Create a new recurring timer, invoking the callback repeatedly after a given
314             amount of time in seconds.
315              
316             =head2 remove
317              
318             my $bool = $reactor->remove($handle);
319             my $bool = $reactor->remove($id);
320              
321             Remove handle or timer.
322              
323             =head2 reset
324              
325             $reactor->reset;
326              
327             Remove all handles and timers.
328              
329             =head2 start
330              
331             $reactor->start;
332              
333             Start watching for I/O and timer events, this will block until L is
334             called or no events are being watched anymore.
335              
336             # Start reactor only if it is not running already
337             $reactor->start unless $reactor->is_running;
338              
339             =head2 stop
340              
341             $reactor->stop;
342              
343             Stop watching for I/O and timer events.
344              
345             =head2 timer
346              
347             my $id = $reactor->timer(0.5 => sub {...});
348              
349             Create a new timer, invoking the callback after a given amount of time in
350             seconds.
351              
352             =head2 watch
353              
354             $reactor = $reactor->watch($handle, $readable, $writable);
355              
356             Change I/O events to watch handle for with true and false values. Note that
357             this method requires an active I/O watcher.
358              
359             # Watch only for readable events
360             $reactor->watch($handle, 1, 0);
361              
362             # Watch only for writable events
363             $reactor->watch($handle, 0, 1);
364              
365             # Watch for readable and writable events
366             $reactor->watch($handle, 1, 1);
367              
368             # Pause watching for events
369             $reactor->watch($handle, 0, 0);
370              
371             =head1 CAVEATS
372              
373             The epoll notification facility is exclusive to Linux systems.
374              
375             The epoll handle is not usable across forks, and this is not currently managed
376             for you, though it is not created until the loop is started to allow for
377             preforking deployments such as L.
378              
379             =head1 BUGS
380              
381             Report any issues on the public bugtracker.
382              
383             =head1 AUTHOR
384              
385             Dan Book, C
386              
387             =head1 COPYRIGHT AND LICENSE
388              
389             Copyright 2015, Dan Book.
390              
391             This library is free software; you may redistribute it and/or modify it under
392             the terms of the Artistic License version 2.0.
393              
394             =head1 SEE ALSO
395              
396             L, L, L