File Coverage

blib/lib/Mojo/Reactor/Epoll.pm
Criterion Covered Total %
statement 115 116 99.1
branch 41 48 85.4
condition 25 44 56.8
subroutine 20 20 100.0
pod 7 7 100.0
total 208 235 88.5


line stmt bran cond sub pod time code
1             package Mojo::Reactor::Epoll;
2 1     1   234264 use Mojo::Base 'Mojo::Reactor::Poll';
  1         14  
  1         7  
3              
4             $ENV{MOJO_REACTOR} ||= 'Mojo::Reactor::Epoll';
5              
6 1     1   35547 use Carp 'croak';
  1         2  
  1         37  
7 1     1   530 use Linux::Epoll;
  1         533  
  1         41  
8 1     1   8 use List::Util 'min';
  1         2  
  1         58  
9 1     1   6 use Mojo::Util qw(md5_sum steady_time);
  1         2  
  1         40  
10 1     1   5 use Scalar::Util 'weaken';
  1         2  
  1         36  
11 1     1   5 use Time::HiRes 'usleep';
  1         1  
  1         6  
12              
13 1   50 1   84 use constant DEBUG => $ENV{MOJO_REACTOR_EPOLL_DEBUG} || 0;
  1         2  
  1         1540  
14              
15             our $VERSION = '0.011';
16              
17             sub io {
18 10     10 1 63963 my ($self, $handle, $cb) = @_;
19 10   33     41 my $fd = fileno($handle) // croak 'Handle is closed';
20 10         35 $self->{io}{$fd}{cb} = $cb;
21 10         15 warn "-- Set IO watcher for $fd\n" if DEBUG;
22 10         30 return $self->watch($handle, 1, 1);
23             }
24              
25             sub one_tick {
26 18592     18592 1 36826 my $self = shift;
27            
28             # Just one tick
29 18592 100       26232 local $self->{running} = 1 unless $self->{running};
30            
31             # Wait for one event
32 18592         18804 my $i;
33 18592   66     44214 until ($i || !$self->{running}) {
34             # Stop automatically if there is nothing to watch
35 18592 100 100     19294 return $self->stop unless keys %{$self->{timers}} || keys %{$self->{io}};
  18592         31467  
  7         38  
36            
37             # Calculate ideal timeout based on timers
38 18586         21002 my $min = min map { $_->{time} } values %{$self->{timers}};
  21416         43691  
  18586         27268  
39 18586 100       37395 my $timeout = defined $min ? ($min - steady_time) : 0.5;
40 18586 100       81052 $timeout = 0 if $timeout < 0;
41            
42             # I/O
43 18586 100       18860 if (my $watched = keys %{$self->{io}}) {
  18586 100       29867  
44             # Set max events to half the number of descriptors, to a minimum of 10
45 16529         24527 my $maxevents = int $watched/2;
46 16529 50       23399 $maxevents = 10 if $maxevents < 10;
47            
48 16529   66     26598 my $epoll = $self->{epoll} // $self->_create_epoll;
49              
50 16529         187921 my $count = $epoll->wait($maxevents, $timeout);
51 16529 50       37190 $i += $count if defined $count;
52             }
53            
54             # Wait for timeout if epoll can't be used
55 7         178240 elsif ($timeout) { usleep $timeout * 1000000 }
56            
57             # Timers (time should not change in between timers)
58 18586         35234 my $now = steady_time;
59 18586         82467 for my $id (keys %{$self->{timers}}) {
  18586         37089  
60 21417 50       37480 next unless my $t = $self->{timers}{$id};
61 21417 100       64406 next unless $t->{time} <= $now;
62            
63             # Recurring timer
64 2855 100       3816 if ($t->{recurring}) { $t->{time} = $now + $t->{after} }
  2826         3620  
65            
66             # Normal timer
67 29         97 else { $self->remove($id) }
68            
69 2855 50 50     6270 ++$i and $self->_try('Timer', $t->{cb}) if $t->{cb};
70             }
71             }
72             }
73              
74 5     5 1 524 sub recurring { shift->_timer(1, @_) }
75              
76             sub remove {
77 40     40 1 3136 my ($self, $remove) = @_;
78 40 100       92 if (ref $remove) {
79 5   33     26 my $fd = fileno($remove) // croak 'Handle is closed';
80 5 50 66     30 if (exists $self->{io}{$fd} and exists $self->{io}{$fd}{epoll_cb}) {
81 3         54 $self->{epoll}->delete($remove);
82             }
83 5         9 warn "-- Removed IO watcher for $fd\n" if DEBUG;
84 5         60 return !!delete $self->{io}{$fd};
85             } else {
86 35         47 warn "-- Removed timer $remove\n" if DEBUG;
87 35         191 return !!delete $self->{timers}{$remove};
88             }
89             }
90              
91             sub reset {
92 4     4 1 1458 my $self = shift;
93 4         7 delete @{$self}{qw(epoll pending_watch)};
  4         40  
94 0         23 $self->SUPER::reset;
95             }
96              
97 35     35 1 264825 sub timer { shift->_timer(0, @_) }
98              
99             sub watch {
100 34     34 1 714 my ($self, $handle, $read, $write) = @_;
101            
102 34         65 my $fd = fileno $handle;
103 34 100       294 croak 'I/O watcher not active' unless my $io = $self->{io}{$fd};
104              
105 33         45 my $epoll = $self->{epoll};
106 33 100       71 unless (defined $epoll) {
107 9         11 push @{$self->{pending_watch}}, [$handle, $read, $write];
  9         20  
108 9         26 return $self;
109             }
110            
111 24         28 my @events;
112 24 100       67 push @events, 'in', 'prio' if $read;
113 24 100       50 push @events, 'out' if $write;
114            
115 24 100       49 my $op = exists $io->{epoll_cb} ? 'modify' : 'add';
116            
117 24         63 weaken $self;
118             my $cb = $io->{epoll_cb} // sub {
119 16530     16530   36538 my ($events) = @_;
120 16530 50 66     31790 if ($events->{in} or $events->{prio} or $events->{hup} or $events->{err}) {
      33        
      33        
121 16186 50       28358 return unless exists $self->{io}{$fd};
122 16186         36293 $self->_try('I/O watcher', $self->{io}{$fd}{cb}, 0);
123             }
124 16530 100 66     59971 if ($events->{out} or $events->{hup} or $events->{err}) {
      33        
125 2040 100       3196 return unless exists $self->{io}{$fd};
126 2039         3280 $self->_try('I/O watcher', $self->{io}{$fd}{cb}, 1);
127             }
128 24   100     103 };
129 24         376 $epoll->$op($handle, \@events, $cb);
130            
131             # Cache callback for future modify operations, after successfully added to epoll
132 24   66     104 $io->{epoll_cb} //= $cb;
133            
134 24         77 return $self;
135             }
136              
137             sub _create_epoll {
138 3     3   5 my $self = shift;
139 3         255 $self->{epoll} = Linux::Epoll->new;
140 3   50     11 $self->watch(@$_) for @{delete $self->{pending_watch} // []};
  3         20  
141 3         25 return $self->{epoll};
142             }
143              
144             sub _id {
145 40     40   63 my $self = shift;
146 40         53 my $id;
147 40         55 do { $id = md5_sum 't' . steady_time . rand } while $self->{timers}{$id};
  40         95  
148 40         796 return $id;
149             }
150              
151             sub _timer {
152 40     40   120 my ($self, $recurring, $after, $cb) = @_;
153            
154 40         99 my $id = $self->_id;
155 40         105 my $timer = $self->{timers}{$id} = {
156             cb => $cb,
157             after => $after,
158             recurring => $recurring,
159             time => steady_time + $after,
160             };
161            
162 40         307 if (DEBUG) {
163             my $is_recurring = $recurring ? ' (recurring)' : '';
164             warn "-- Set timer $id after $after seconds$is_recurring\n";
165             }
166            
167 40         115 return $id;
168             }
169              
170             sub _try {
171 21080     21080   31340 my ($self, $what, $cb) = (shift, shift, shift);
172 21080 100       25714 eval { $self->$cb(@_); 1 } or $self->emit(error => "$what failed: $@");
  21080         52437  
  21079         68692  
173             }
174              
175             1;
176              
177             =head1 NAME
178              
179             Mojo::Reactor::Epoll - epoll backend for Mojo::Reactor
180              
181             =head1 SYNOPSIS
182              
183             use Mojo::Reactor::Epoll;
184              
185             # Watch if handle becomes readable or writable
186             my $reactor = Mojo::Reactor::Epoll->new;
187             $reactor->io($first => sub {
188             my ($reactor, $writable) = @_;
189             say $writable ? 'First handle is writable' : 'First handle is readable';
190             });
191              
192             # Change to watching only if handle becomes writable
193             $reactor->watch($first, 0, 1);
194              
195             # Turn file descriptor into handle and watch if it becomes readable
196             my $second = IO::Handle->new_from_fd($fd, 'r');
197             $reactor->io($second => sub {
198             my ($reactor, $writable) = @_;
199             say $writable ? 'Second handle is writable' : 'Second handle is readable';
200             })->watch($second, 1, 0);
201              
202             # Add a timer
203             $reactor->timer(15 => sub {
204             my $reactor = shift;
205             $reactor->remove($first);
206             $reactor->remove($second);
207             say 'Timeout!';
208             });
209              
210             # Start reactor if necessary
211             $reactor->start unless $reactor->is_running;
212              
213             # Or in an application using Mojo::IOLoop
214             use Mojo::Reactor::Epoll;
215             use Mojo::IOLoop;
216            
217             # Or in a Mojolicious application
218             $ MOJO_REACTOR=Mojo::Reactor::Epoll hypnotoad script/myapp
219              
220             =head1 DESCRIPTION
221              
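222             L<Mojo::Reactor::Epoll> is an event reactor for L<Mojo::IOLoop> that uses the
223             L<epoll(7)> Linux subsystem. The usage is exactly the same as other
224             L<Mojo::Reactor> implementations such as L<Mojo::Reactor::Poll>.
225             L<Mojo::Reactor::Epoll> will be used as the default backend for L<Mojo::IOLoop>
226             if it is loaded before L<Mojo::IOLoop> or any module using the loop. However,
227             when invoking a L<Mojolicious> application through L<morbo> or L<hypnotoad>,
228             the reactor must be set as the default by setting the C<MOJO_REACTOR>
229             environment variable to C<Mojo::Reactor::Epoll>.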
230              
231             =head1 EVENTS
232              
233             L<Mojo::Reactor::Epoll> inherits all events from L<Mojo::Reactor::Poll>.
234              
235             =head1 METHODS
236              
237             L<Mojo::Reactor::Epoll> inherits all methods from L<Mojo::Reactor::Poll> and
238             implements the following new ones.
239              
240             =head2 io
241              
242             $reactor = $reactor->io($handle => sub {...});
243              
244             Watch handle for I/O events, invoking the callback whenever handle becomes
245             readable or writable.
246              
247             # Callback will be invoked twice if handle becomes readable and writable
248             $reactor->io($handle => sub {
249             my ($reactor, $writable) = @_;
250             say $writable ? 'Handle is writable' : 'Handle is readable';
251             });
252              
253             =head2 one_tick
254              
255             $reactor->one_tick;
256              
257             Run reactor until an event occurs or no events are being watched anymore. Note
258             that this method can recurse back into the reactor, so you need to be careful.
259              
260             # Don't block longer than 0.5 seconds
261             my $id = $reactor->timer(0.5 => sub {});
262             $reactor->one_tick;
263             $reactor->remove($id);
264              
265             =head2 recurring
266              
267             my $id = $reactor->recurring(0.25 => sub {...});
268              
269             Create a new recurring timer, invoking the callback repeatedly after a given
270             amount of time in seconds.
271              
272             =head2 remove
273              
274             my $bool = $reactor->remove($handle);
275             my $bool = $reactor->remove($id);
276              
277             Remove handle or timer.
278              
279             =head2 reset
280              
281             $reactor->reset;
282              
283             Remove all handles and timers.
284              
285             =head2 timer
286              
287             my $id = $reactor->timer(0.5 => sub {...});
288              
289             Create a new timer, invoking the callback after a given amount of time in
290             seconds.
291              
292             =head2 watch
293              
294             $reactor = $reactor->watch($handle, $readable, $writable);
295              
296             Change I/O events to watch handle for with true and false values. Note that
297             this method requires an active I/O watcher.
298              
299             # Watch only for readable events
300             $reactor->watch($handle, 1, 0);
301              
302             # Watch only for writable events
303             $reactor->watch($handle, 0, 1);
304              
305             # Watch for readable and writable events
306             $reactor->watch($handle, 1, 1);
307              
308             # Pause watching for events
309             $reactor->watch($handle, 0, 0);
310              
311             =head1 CAVEATS
312              
313             The epoll notification facility is exclusive to Linux systems.
314              
315             The epoll handle is not usable across forks, and this is not currently managed
316             for you, though it is not created until the loop is started to allow for
317             preforking deployments such as L<hypnotoad>.
318              
319             =head1 BUGS
320              
321             Report any issues on the public bugtracker.
322              
323             =head1 AUTHOR
324              
325             Dan Book, C<dbook@cpan.org>
326              
327             =head1 COPYRIGHT AND LICENSE
328              
329             Copyright 2015, Dan Book.
330              
331             This library is free software; you may redistribute it and/or modify it under
332             the terms of the Artistic License version 2.0.
333              
334             =head1 SEE ALSO
335              
336             L<Mojolicious>, L<Mojo::IOLoop>, L<Linux::Epoll>