File Coverage

blib/lib/Mojo/Reactor/Poll.pm
Criterion Covered Total %
statement 100 100 100.0
branch 43 46 93.4
condition 14 23 60.8
subroutine 22 22 100.0
pod 12 12 100.0
total 191 203 94.0


line stmt bran cond sub pod time code
1             package Mojo::Reactor::Poll;
2 64     64   146097 use Mojo::Base 'Mojo::Reactor';
  64         155  
  64         509  
3              
4 64     64   557 use Carp qw(croak);
  64         185  
  64         3403  
5 64     64   426 use IO::Poll qw(POLLERR POLLHUP POLLIN POLLNVAL POLLOUT POLLPRI);
  64         145  
  64         4347  
6 64     64   453 use List::Util qw(min);
  64         162  
  64         6244  
7 64     64   403 use Mojo::Util qw(md5_sum steady_time);
  64         146  
  64         3501  
8 64     64   446 use Time::HiRes qw(usleep);
  64         208  
  64         882  
9              
10             sub again {
11 12384     12384 1 26169 my ($self, $id, $after) = @_;
12 12384 100       34391 croak 'Timer not active' unless my $timer = $self->{timers}{$id};
13 12383 100       25884 $timer->{after} = $after if defined $after;
14 12383         31786 $timer->{time} = steady_time + $timer->{after};
15             }
16              
17             sub io {
18 984     984 1 3917 my ($self, $handle, $cb) = @_;
19 984   33     5538 $self->{io}{fileno($handle) // croak 'Handle is closed'} = {cb => $cb};
20 984         2775 return $self->watch($handle, 1, 1);
21             }
22              
23 3292     3292 1 16172 sub is_running { !!shift->{running} }
24              
25             sub next_tick {
26 2037     2037 1 6613 my ($self, $cb) = @_;
27 2037         3153 push @{$self->{next_tick}}, $cb;
  2037         4566  
28 2037   66     8524 $self->{next_timer} //= $self->timer(0 => \&_next);
29 2037         8176 return undef;
30             }
31              
32             sub one_tick {
33 50052     50052 1 73863 my $self = shift;
34              
35             # Just one tick
36 50052 100       91163 local $self->{running} = 1 unless $self->{running};
37              
38             # Wait for one event
39 50052         63617 my $i;
40 50052   66     148355 until ($i || !$self->{running}) {
41              
42             # Stop automatically if there is nothing to watch
43 50052 100 100     62964 return $self->stop unless keys %{$self->{timers}} || keys %{$self->{io}};
  50052         111869  
  11         71  
44              
45             # Calculate ideal timeout based on timers and round up to next millisecond
46 50042         68393 my $min = min map { $_->{time} } values %{$self->{timers}};
  97984         196170  
  50042         91798  
47 50042 100       123936 my $timeout = defined $min ? $min - steady_time : 0.5;
48 50042 100       203746 $timeout = $timeout <= 0 ? 0 : int($timeout * 1000) + 1;
49              
50             # I/O
51 50042 100       62687 if (keys %{$self->{io}}) {
  50042 100       108557  
52 23957         31922 my @poll = map { $_ => $self->{io}{$_}{mode} } keys %{$self->{io}};
  59989         123363  
  23957         47557  
53              
54             # This may break in the future, but is worth it for performance
55 23957 100       6070251 if (IO::Poll::_poll($timeout, @poll) > 0) {
56 22914         92289 while (my ($fd, $mode) = splice @poll, 0, 2) {
57              
58 55820 100       106622 if ($mode & (POLLIN | POLLPRI | POLLNVAL | POLLHUP | POLLERR)) {
59 18928 100       51660 next unless my $io = $self->{io}{$fd};
60 18926 50       55760 ++$i and $self->_try('I/O watcher', $io->{cb}, 0);
61             }
62 55818 100 100     201594 next unless $mode & POLLOUT && (my $io = $self->{io}{$fd});
63 9629 50       31613 ++$i and $self->_try('I/O watcher', $io->{cb}, 1);
64             }
65             }
66             }
67              
68             # Wait for timeout if poll can't be used
69 18         852587 elsif ($timeout) { usleep($timeout * 1000) }
70              
71             # Timers (time should not change in between timers)
72 50042         117316 my $now = steady_time;
73 50042         162313 for my $id (keys %{$self->{timers}}) {
  50042         115686  
74 98036 100       192505 next unless my $t = $self->{timers}{$id};
75 98035 100       312866 next unless $t->{time} <= $now;
76              
77             # Recurring timer
78 28206 100       45513 if ($t->{recurring}) { $t->{time} = $now + $t->{after} }
  26869         39656  
79              
80             # Normal timer
81 1337         4261 else { $self->remove($id) }
82              
83 28206 50 33     77979 ++$i and $self->_try('Timer', $t->{cb}) if $t->{cb};
84             }
85             }
86             }
87              
88 12     12 1 695 sub recurring { shift->_timer(1, @_) }
89              
90             sub remove {
91 3292     3292 1 11705 my ($self, $remove) = @_;
92 3292 100       13786 return !!delete $self->{timers}{$remove} unless ref $remove;
93 911   33     18190 return !!delete $self->{io}{fileno($remove) // croak 'Handle is closed'};
94             }
95              
96 7     7 1 665 sub reset { delete @{shift()}{qw(events io next_tick next_timer running timers)} }
  7         70  
97              
98             sub start {
99 1033     1033 1 1982 my $self = shift;
100 1033   50     5128 local $self->{running} = ($self->{running} || 0) + 1;
101 1033         3572 $self->one_tick while $self->{running};
102             }
103              
104 1038     1038 1 3502 sub stop { delete shift->{running} }
105              
106 1950     1950 1 13999 sub timer { shift->_timer(0, @_) }
107              
108             sub watch {
109 11356     11356 1 24308 my ($self, $handle, $read, $write) = @_;
110              
111 11356 100       40454 croak 'I/O watcher not active' unless my $io = $self->{io}{fileno $handle};
112 11355         20477 $io->{mode} = 0;
113 11355 100       25439 $io->{mode} |= POLLIN | POLLPRI if $read;
114 11355 100       23117 $io->{mode} |= POLLOUT if $write;
115              
116 11355         24769 return $self;
117             }
118              
119             sub _id {
120 1962     1962   3189 my $self = shift;
121 1962         2899 my $id;
122 1962         2902 do { $id = md5_sum 't' . steady_time . rand } while $self->{timers}{$id};
  1962         5342  
123 1962         36145 return $id;
124             }
125              
126             sub _next {
127 1277     1277   2172 my $self = shift;
128 1277         2905 delete $self->{next_timer};
129 1277         2224 while (my $cb = shift @{$self->{next_tick}}) { $self->$cb }
  2036         5959  
  3313         9637  
130             }
131              
132             sub _timer {
133 1962     1962   4424 my ($self, $recurring, $after, $cb) = @_;
134 1962         4521 my $id = $self->_id;
135 1962         5160 $self->{timers}{$id} = {cb => $cb, after => $after, recurring => $recurring, time => steady_time + $after};
136 1962         20817 return $id;
137             }
138              
139             sub _try {
140 56761     56761   107523 my ($self, $what, $cb) = (shift, shift, shift);
141 56761 100       78396 eval { $self->$cb(@_); 1 } or $self->emit(error => "$what failed: $@");
  56761         150809  
  56758         255221  
142             }
143              
144             1;
145              
146             =encoding utf8
147              
148             =head1 NAME
149              
150             Mojo::Reactor::Poll - Low-level event reactor with poll support
151              
152             =head1 SYNOPSIS
153              
154             use Mojo::Reactor::Poll;
155              
156             # Watch if handle becomes readable or writable
157             my $reactor = Mojo::Reactor::Poll->new;
158             $reactor->io($first => sub ($reactor, $writable) {
159             say $writable ? 'First handle is writable' : 'First handle is readable';
160             });
161              
162             # Change to watching only if handle becomes writable
163             $reactor->watch($first, 0, 1);
164              
165             # Turn file descriptor into handle and watch if it becomes readable
166             my $second = IO::Handle->new_from_fd($fd, 'r');
167             $reactor->io($second => sub ($reactor, $writable) {
168             say $writable ? 'Second handle is writable' : 'Second handle is readable';
169             })->watch($second, 1, 0);
170              
171             # Add a timer
172             $reactor->timer(15 => sub ($reactor) {
173             $reactor->remove($first);
174             $reactor->remove($second);
175             say 'Timeout!';
176             });
177              
178             # Start reactor if necessary
179             $reactor->start unless $reactor->is_running;
180              
181             =head1 DESCRIPTION
182              
183             L<Mojo::Reactor::Poll> is a low-level event reactor based on L<IO::Poll>.
184              
185             =head1 EVENTS
186              
187             L<Mojo::Reactor::Poll> inherits all events from L<Mojo::Reactor>.
188              
189             =head1 METHODS
190              
191             L<Mojo::Reactor::Poll> inherits all methods from L<Mojo::Reactor> and implements the following new ones.
192              
193             =head2 again
194              
195             $reactor->again($id);
196             $reactor->again($id, 0.5);
197              
198             Restart timer and optionally change the invocation time. Note that this method requires an active timer.
199              
200             =head2 io
201              
202             $reactor = $reactor->io($handle => sub {...});
203              
204             Watch handle for I/O events, invoking the callback whenever handle becomes readable or writable.
205              
206             # Callback will be executed twice if handle becomes readable and writable
207             $reactor->io($handle => sub ($reactor, $writable) {
208             say $writable ? 'Handle is writable' : 'Handle is readable';
209             });
210              
211             =head2 is_running
212              
213             my $bool = $reactor->is_running;
214              
215             Check if reactor is running.
216              
217             =head2 next_tick
218              
219             my $undef = $reactor->next_tick(sub {...});
220              
221             Execute callback as soon as possible, but not before returning or other callbacks that have been registered with this
222             method, always returns C<undef>.
223              
224             =head2 one_tick
225              
226             $reactor->one_tick;
227              
228             Run reactor until an event occurs or no events are being watched anymore.
229              
230             # Don't block longer than 0.5 seconds
231             my $id = $reactor->timer(0.5 => sub {});
232             $reactor->one_tick;
233             $reactor->remove($id);
234              
235             =head2 recurring
236              
237             my $id = $reactor->recurring(0.25 => sub {...});
238              
239             Create a new recurring timer, invoking the callback repeatedly after a given amount of time in seconds.
240              
241             =head2 remove
242              
243             my $bool = $reactor->remove($handle);
244             my $bool = $reactor->remove($id);
245              
246             Remove handle or timer.
247              
248             =head2 reset
249              
250             $reactor->reset;
251              
252             Remove all handles and timers.
253              
254             =head2 start
255              
256             $reactor->start;
257              
258             Start watching for I/O and timer events, this will block until L</"stop"> is called or no events are being watched
259             anymore.
260              
261             # Start reactor only if it is not running already
262             $reactor->start unless $reactor->is_running;
263              
264             =head2 stop
265              
266             $reactor->stop;
267              
268             Stop watching for I/O and timer events.
269              
270             =head2 timer
271              
272             my $id = $reactor->timer(0.5 => sub {...});
273              
274             Create a new timer, invoking the callback after a given amount of time in seconds.
275              
276             =head2 watch
277              
278             $reactor = $reactor->watch($handle, $readable, $writable);
279              
280             Change I/O events to watch handle for with true and false values. Note that this method requires an active I/O watcher.
281              
282             # Watch only for readable events
283             $reactor->watch($handle, 1, 0);
284              
285             # Watch only for writable events
286             $reactor->watch($handle, 0, 1);
287              
288             # Watch for readable and writable events
289             $reactor->watch($handle, 1, 1);
290              
291             # Pause watching for events
292             $reactor->watch($handle, 0, 0);
293              
294             =head1 SEE ALSO
295              
296             L<Mojolicious>, L<Mojolicious::Guides>, L<https://mojolicious.org>.
297              
298             =cut