File Coverage

blib/lib/Mojo/Reactor/Poll.pm
Criterion Covered Total %
statement 100 100 100.0
branch 42 46 91.3
condition 14 23 60.8
subroutine 22 22 100.0
pod 12 12 100.0
total 190 203 93.6


line stmt bran cond sub pod time code
1             package Mojo::Reactor::Poll;
2 63     63   139180 use Mojo::Base 'Mojo::Reactor';
  63         157  
  63         415  
3              
4 63     63   455 use Carp qw(croak);
  63         153  
  63         3047  
5 63     63   506 use IO::Poll qw(POLLERR POLLHUP POLLIN POLLNVAL POLLOUT POLLPRI);
  63         141  
  63         4093  
6 63     63   414 use List::Util qw(min);
  63         151  
  63         3920  
7 63     63   389 use Mojo::Util qw(md5_sum steady_time);
  63         142  
  63         2814  
8 63     63   397 use Time::HiRes qw(usleep);
  63         140  
  63         2521  
9              
10             sub again {
11 12432     12432 1 25697 my ($self, $id, $after) = @_;
12 12432 100       33442 croak 'Timer not active' unless my $timer = $self->{timers}{$id};
13 12431 100       26619 $timer->{after} = $after if defined $after;
14 12431         31433 $timer->{time} = steady_time + $timer->{after};
15             }
16              
17             sub io {
18 979     979 1 3756 my ($self, $handle, $cb) = @_;
19 979   33     5191 $self->{io}{fileno($handle) // croak 'Handle is closed'} = {cb => $cb};
20 979         2755 return $self->watch($handle, 1, 1);
21             }
22              
23 3256     3256 1 16431 sub is_running { !!shift->{running} }
24              
25             sub next_tick {
26 2006     2006 1 6564 my ($self, $cb) = @_;
27 2006         3016 push @{$self->{next_tick}}, $cb;
  2006         4420  
28 2006   66     8302 $self->{next_timer} //= $self->timer(0 => \&_next);
29 2006         7705 return undef;
30             }
31              
32             sub one_tick {
33 51171     51171 1 72696 my $self = shift;
34              
35             # Just one tick
36 51171 100       88204 local $self->{running} = 1 unless $self->{running};
37              
38             # Wait for one event
39 51171         63565 my $i;
40 51171   66     148757 until ($i || !$self->{running}) {
41              
42             # Stop automatically if there is nothing to watch
43 51171 100 100     66182 return $self->stop unless keys %{$self->{timers}} || keys %{$self->{io}};
  51171         110401  
  11         69  
44              
45             # Calculate ideal timeout based on timers and round up to next millisecond
46 51161         69954 my $min = min map { $_->{time} } values %{$self->{timers}};
  99223         195780  
  51161         90127  
47 51161 100       126912 my $timeout = defined $min ? $min - steady_time : 0.5;
48 51161 100       202470 $timeout = $timeout <= 0 ? 0 : int($timeout * 1000) + 1;
49              
50             # I/O
51 51161 100       64432 if (keys %{$self->{io}}) {
  51161 100       106615  
52 25134         33017 my @poll = map { $_ => $self->{io}{$_}{mode} } keys %{$self->{io}};
  61450         126063  
  25134         47939  
53              
54             # This may break in the future, but is worth it for performance
55 25134 100       6068833 if (IO::Poll::_poll($timeout, @poll) > 0) {
56 24104         86911 while (my ($fd, $mode) = splice @poll, 0, 2) {
57              
58 57332 100       104613 if ($mode & (POLLIN | POLLPRI | POLLNVAL | POLLHUP | POLLERR)) {
59 20085 50       52186 next unless my $io = $self->{io}{$fd};
60 20085 50       57186 ++$i and $self->_try('I/O watcher', $io->{cb}, 0);
61             }
62 57332 100 100     197262 next unless $mode & POLLOUT && (my $io = $self->{io}{$fd});
63 9815 50       31066 ++$i and $self->_try('I/O watcher', $io->{cb}, 1);
64             }
65             }
66             }
67              
68             # Wait for timeout if poll can't be used
69 18         851728 elsif ($timeout) { usleep($timeout * 1000) }
70              
71             # Timers (time should not change in between timers)
72 51161         114344 my $now = steady_time;
73 51161         162297 for my $id (keys %{$self->{timers}}) {
  51161         117057  
74 99275 100       189645 next unless my $t = $self->{timers}{$id};
75 99270 100       307159 next unless $t->{time} <= $now;
76              
77             # Recurring timer
78 28177 100       43646 if ($t->{recurring}) { $t->{time} = $now + $t->{after} }
  26854         41145  
79              
80             # Normal timer
81 1323         3737 else { $self->remove($id) }
82              
83 28177 50 33     76876 ++$i and $self->_try('Timer', $t->{cb}) if $t->{cb};
84             }
85             }
86             }
87              
88 12     12 1 671 sub recurring { shift->_timer(1, @_) }
89              
90             sub remove {
91 3260     3260 1 12039 my ($self, $remove) = @_;
92 3260 100       12926 return !!delete $self->{timers}{$remove} unless ref $remove;
93 901   33     15897 return !!delete $self->{io}{fileno($remove) // croak 'Handle is closed'};
94             }
95              
96 7     7 1 626 sub reset { delete @{shift()}{qw(events io next_tick next_timer running timers)} }
  7         73  
97              
98             sub start {
99 1021     1021 1 2019 my $self = shift;
100 1021   50     4750 local $self->{running} = ($self->{running} || 0) + 1;
101 1021         3496 $self->one_tick while $self->{running};
102             }
103              
104 1025     1025 1 3356 sub stop { delete shift->{running} }
105              
106 1935     1935 1 11926 sub timer { shift->_timer(0, @_) }
107              
108             sub watch {
109 11400     11400 1 24169 my ($self, $handle, $read, $write) = @_;
110              
111 11400 100       38726 croak 'I/O watcher not active' unless my $io = $self->{io}{fileno $handle};
112 11399         20412 $io->{mode} = 0;
113 11399 100       25281 $io->{mode} |= POLLIN | POLLPRI if $read;
114 11399 100       22770 $io->{mode} |= POLLOUT if $write;
115              
116 11399         23764 return $self;
117             }
118              
119             sub _id {
120 1947     1947   3101 my $self = shift;
121 1947         2924 my $id;
122 1947         2855 do { $id = md5_sum 't' . steady_time . rand } while $self->{timers}{$id};
  1947         5258  
123 1947         36428 return $id;
124             }
125              
126             sub _next {
127 1263     1263   2124 my $self = shift;
128 1263         2709 delete $self->{next_timer};
129 1263         2220 while (my $cb = shift @{$self->{next_tick}}) { $self->$cb }
  2005         5715  
  3268         9607  
130             }
131              
132             sub _timer {
133 1947     1947   4371 my ($self, $recurring, $after, $cb) = @_;
134 1947         4484 my $id = $self->_id;
135 1947         5191 $self->{timers}{$id} = {cb => $cb, after => $after, recurring => $recurring, time => steady_time + $after};
136 1947         19608 return $id;
137             }
138              
139             sub _try {
140 58077     58077   110752 my ($self, $what, $cb) = (shift, shift, shift);
141 58077 100       79967 eval { $self->$cb(@_); 1 } or $self->emit(error => "$what failed: $@");
  58077         147487  
  58074         257436  
142             }
143              
144             1;
145              
146             =encoding utf8
147              
148             =head1 NAME
149              
150             Mojo::Reactor::Poll - Low-level event reactor with poll support
151              
152             =head1 SYNOPSIS
153              
154             use Mojo::Reactor::Poll;
155              
156             # Watch if handle becomes readable or writable
157             my $reactor = Mojo::Reactor::Poll->new;
158             $reactor->io($first => sub ($reactor, $writable) {
159             say $writable ? 'First handle is writable' : 'First handle is readable';
160             });
161              
162             # Change to watching only if handle becomes writable
163             $reactor->watch($first, 0, 1);
164              
165             # Turn file descriptor into handle and watch if it becomes readable
166             my $second = IO::Handle->new_from_fd($fd, 'r');
167             $reactor->io($second => sub ($reactor, $writable) {
168             say $writable ? 'Second handle is writable' : 'Second handle is readable';
169             })->watch($second, 1, 0);
170              
171             # Add a timer
172             $reactor->timer(15 => sub ($reactor) {
173             $reactor->remove($first);
174             $reactor->remove($second);
175             say 'Timeout!';
176             });
177              
178             # Start reactor if necessary
179             $reactor->start unless $reactor->is_running;
180              
181             =head1 DESCRIPTION
182              
183             L<Mojo::Reactor::Poll> is a low-level event reactor based on L<IO::Poll>.
184              
185             =head1 EVENTS
186              
187             L<Mojo::Reactor::Poll> inherits all events from L<Mojo::Reactor>.
188              
189             =head1 METHODS
190              
191             L<Mojo::Reactor::Poll> inherits all methods from L<Mojo::Reactor> and implements the following new ones.
192              
193             =head2 again
194              
195             $reactor->again($id);
196             $reactor->again($id, 0.5);
197              
198             Restart timer and optionally change the invocation time. Note that this method requires an active timer.
199              
200             =head2 io
201              
202             $reactor = $reactor->io($handle => sub {...});
203              
204             Watch handle for I/O events, invoking the callback whenever handle becomes readable or writable.
205              
206             # Callback will be executed twice if handle becomes readable and writable
207             $reactor->io($handle => sub ($reactor, $writable) {
208             say $writable ? 'Handle is writable' : 'Handle is readable';
209             });
210              
211             =head2 is_running
212              
213             my $bool = $reactor->is_running;
214              
215             Check if reactor is running.
216              
217             =head2 next_tick
218              
219             my $undef = $reactor->next_tick(sub {...});
220              
221             Execute callback as soon as possible, but not before returning or other callbacks that have been registered with this
222             method, always returns C<undef>.
223              
224             =head2 one_tick
225              
226             $reactor->one_tick;
227              
228             Run reactor until an event occurs or no events are being watched anymore.
229              
230             # Don't block longer than 0.5 seconds
231             my $id = $reactor->timer(0.5 => sub {});
232             $reactor->one_tick;
233             $reactor->remove($id);
234              
235             =head2 recurring
236              
237             my $id = $reactor->recurring(0.25 => sub {...});
238              
239             Create a new recurring timer, invoking the callback repeatedly after a given amount of time in seconds.
240              
241             =head2 remove
242              
243             my $bool = $reactor->remove($handle);
244             my $bool = $reactor->remove($id);
245              
246             Remove handle or timer.
247              
248             =head2 reset
249              
250             $reactor->reset;
251              
252             Remove all handles and timers.
253              
254             =head2 start
255              
256             $reactor->start;
257              
258             Start watching for I/O and timer events, this will block until L</"stop"> is called or no events are being watched
259             anymore.
260              
261             # Start reactor only if it is not running already
262             $reactor->start unless $reactor->is_running;
263              
264             =head2 stop
265              
266             $reactor->stop;
267              
268             Stop watching for I/O and timer events.
269              
270             =head2 timer
271              
272             my $id = $reactor->timer(0.5 => sub {...});
273              
274             Create a new timer, invoking the callback after a given amount of time in seconds.
275              
276             =head2 watch
277              
278             $reactor = $reactor->watch($handle, $readable, $writable);
279              
280             Change I/O events to watch handle for with true and false values. Note that this method requires an active I/O watcher.
281              
282             # Watch only for readable events
283             $reactor->watch($handle, 1, 0);
284              
285             # Watch only for writable events
286             $reactor->watch($handle, 0, 1);
287              
288             # Watch for readable and writable events
289             $reactor->watch($handle, 1, 1);
290              
291             # Pause watching for events
292             $reactor->watch($handle, 0, 0);
293              
294             =head1 SEE ALSO
295              
296             L<Mojolicious>, L<Mojolicious::Guides>, L<https://mojolicious.org>.
297              
298             =cut