File Coverage

blib/lib/Mojo/Reactor/IOAsync.pm
Criterion Covered Total %
statement 112 112 100.0
branch 30 34 88.2
condition 8 14 57.1
subroutine 25 25 100.0
pod 10 10 100.0
total 185 195 94.8


line stmt bran cond sub pod time code
1             package Mojo::Reactor::IOAsync;
2 1     1   285034 use Mojo::Base 'Mojo::Reactor::Poll';
  1         13  
  1         9  
3              
4             $ENV{MOJO_REACTOR} ||= 'Mojo::Reactor::IOAsync';
5              
6 1     1   44079 use Carp 'croak';
  1         3  
  1         46  
7 1     1   852 use IO::Async::Loop;
  1         31946  
  1         36  
8 1     1   606 use IO::Async::Handle;
  1         22437  
  1         41  
9 1     1   580 use IO::Async::Timer::Countdown;
  1         1800  
  1         34  
10 1     1   8 use Mojo::Util qw(md5_sum steady_time);
  1         4  
  1         57  
11 1     1   7 use Scalar::Util 'weaken';
  1         2  
  1         51  
12              
13 1   50 1   7 use constant DEBUG => $ENV{MOJO_REACTOR_IOASYNC_DEBUG} || 0;
  1         1  
  1         1931  
14              
15             our $VERSION = '1.002';
16              
17             my $IOAsync;
18              
19             # Use IO::Async::Loop singleton for the first instance only
20             sub new {
21 7     7 1 107923 my $self = shift->SUPER::new;
22 7 100       66 if ($IOAsync++) {
23 4         18 $self->{loop} = IO::Async::Loop->really_new;
24             } else {
25 3         27 $self->{loop} = IO::Async::Loop->new;
26 3         3964 $self->{loop_singleton} = 1;
27             }
28 7         463 return $self;
29             }
30              
31             sub DESTROY {
32 6     6   3260 my $self = shift;
33 6         31 $self->reset;
34 6 100       87 undef $IOAsync if $self->{loop_singleton};
35             }
36              
37             sub again {
38 9     9 1 517 my ($self, $id, $after) = @_;
39 9 100       284 croak 'Timer not active' unless my $timer = $self->{timers}{$id};
40 8         19 my $w = $timer->{watcher};
41 8         36 $w->stop;
42 8 100       869 $w->configure(delay => $after) if defined $after;
43 8         107 $w->start;
44             }
45              
46             sub io {
47 10     10 1 6556 my ($self, $handle, $cb) = @_;
48 10   33     48 my $fd = fileno($handle) // croak 'Handle is closed';
49 10         40 $self->{io}{$fd}{cb} = $cb;
50 10         17 warn "-- Set IO watcher for $fd\n" if DEBUG;
51 10         35 return $self->watch($handle, 1, 1);
52             }
53              
54             sub one_tick {
55 6847     6847 1 318333 my $self = shift;
56            
57             # Stop automatically if there is nothing to watch
58 6847 100 100     9927 return $self->stop unless keys %{$self->{timers}} || keys %{$self->{io}} || $self->{loop}->notifiers;
  6847   66     17964  
  7         57  
59            
60             # Just one tick
61 6841 100       13374 local $self->{running} = 1 unless $self->{running};
62            
63 6841         16669 $self->{loop}->loop_once;
64             }
65              
66 5     5 1 664 sub recurring { shift->_timer(1, @_) }
67              
68             sub remove {
69 11     11 1 3390 my ($self, $remove) = @_;
70 11 50       41 return !!0 unless defined $remove;
71 11 100       42 if (ref $remove) {
72 5   33     25 my $fd = fileno($remove) // croak 'Handle is closed';
73 5         21 my $io = delete $self->{io}{$fd};
74 5 100       19 if ($io) {
75 3         5 warn "-- Removed IO watcher for $fd\n" if DEBUG;
76 3 50       34 $io->{watcher}->remove_from_parent if $io->{watcher};
77             }
78 5         813 return !!$io;
79             } else {
80 6         24 my $timer = delete $self->{timers}{$remove};
81 6 100       19 if ($timer) {
82 5         6 warn "-- Removed timer $remove\n" if DEBUG;
83 5 50       28 $timer->{watcher}->remove_from_parent if $timer->{watcher};
84             }
85 6         907 return !!$timer;
86             }
87             }
88              
89             sub reset {
90 10     10 1 1816 my $self = shift;
91 10         17 $_->remove_from_parent for
92 7 50       84 map { $_->{watcher} ? ($_->{watcher}) : () }
93 10         35 values %{$self->{io}}, values %{$self->{timers}};
  10         35  
94 10         1243 $self->SUPER::reset;
95             }
96              
97             sub stop {
98 26     26 1 273 my $self = shift;
99 26         70 delete $self->{running};
100 26         92 $self->{loop}->loop_stop;
101             }
102              
103 35     35 1 13987 sub timer { shift->_timer(0, @_) }
104              
105             sub watch {
106 25     25 1 531 my ($self, $handle, $read, $write) = @_;
107            
108 25         63 my $fd = fileno $handle;
109 25 100       216 croak 'I/O watcher not active' unless my $io = $self->{io}{$fd};
110 24 100       76 if (my $w = $io->{watcher}) {
111 15         60 $w->want_readready($read);
112 15         797 $w->want_writeready($write);
113             } else {
114 9         29 weaken $self;
115 9     6421   33 my $on_read = sub { $self->_try('I/O watcher', $self->{io}{$fd}{cb}, 0) };
  6421         718504  
116 9     689   28 my $on_write = sub { $self->_try('I/O watcher', $self->{io}{$fd}{cb}, 1) };
  689         22405  
117 9         64 my $w = $io->{watcher} = IO::Async::Handle->new(
118             handle => $handle,
119             on_read_ready => $on_read,
120             on_write_ready => $on_write,
121             want_readready => $read,
122             want_writeready => $write,
123             );
124 9         1878 $self->{loop}->add($w);
125             }
126            
127 24         3983 return $self;
128             }
129              
130             sub _id {
131 40     40   66 my $self = shift;
132 40         60 my $id;
133 40         73 do { $id = md5_sum 't' . steady_time . rand } while $self->{timers}{$id};
  40         184  
134 40         1082 return $id;
135             }
136              
137             sub _timer {
138 40     40   115 my ($self, $recurring, $after, $cb) = @_;
139            
140 40         123 my $id = $self->_id;
141 40         140 weaken $self;
142             my $on_expire = sub {
143 471     471   308091 my $w = shift;
144 471 100       965 if ($recurring) {
145 442         1163 $w->start;
146             } else {
147 29         112 $w->remove_from_parent;
148 29         2796 delete $self->{timers}{$id};
149             }
150             #warn "-- Event fired for timer $id\n" if DEBUG;
151 471         54647 $self->_try('Timer', $cb);
152 40         152 };
153 40         215 my $w = $self->{timers}{$id}{watcher} = IO::Async::Timer::Countdown->new(
154             delay => $after,
155             on_expire => $on_expire,
156             )->start;
157 40         3596 $self->{loop}->add($w);
158            
159 40         14519 if (DEBUG) {
160             my $is_recurring = $recurring ? ' (recurring)' : '';
161             warn "-- Set timer $id after $after seconds$is_recurring\n";
162             }
163            
164 40         135 return $id;
165             }
166              
167             sub _try {
168 7581     7581   16804 my ($self, $what, $cb) = (shift, shift, shift);
169 7581 100       11622 eval { $self->$cb(@_); 1 } or $self->emit(error => "$what failed: $@");
  7581         20755  
  7580         34604  
170             }
171              
172             =head1 NAME
173              
174             Mojo::Reactor::IOAsync - IO::Async backend for Mojo::Reactor
175              
176             =head1 SYNOPSIS
177              
178             use Mojo::Reactor::IOAsync;
179              
180             # Watch if handle becomes readable or writable
181             my $reactor = Mojo::Reactor::IOAsync->new;
182             $reactor->io($first => sub {
183             my ($reactor, $writable) = @_;
184             say $writable ? 'First handle is writable' : 'First handle is readable';
185             });
186              
187             # Change to watching only if handle becomes writable
188             $reactor->watch($first, 0, 1);
189              
190             # Turn file descriptor into handle and watch if it becomes readable
191             my $second = IO::Handle->new_from_fd($fd, 'r');
192             $reactor->io($second => sub {
193             my ($reactor, $writable) = @_;
194             say $writable ? 'Second handle is writable' : 'Second handle is readable';
195             })->watch($second, 1, 0);
196              
197             # Add a timer
198             $reactor->timer(15 => sub {
199             my $reactor = shift;
200             $reactor->remove($first);
201             $reactor->remove($second);
202             say 'Timeout!';
203             });
204              
205             # Start reactor if necessary
206             $reactor->start unless $reactor->is_running;
207              
208             # Or in an application using Mojo::IOLoop
209             use Mojo::Reactor::IOAsync;
210             use Mojo::IOLoop;
211            
212             # Or in a Mojolicious application
213             $ MOJO_REACTOR=Mojo::Reactor::IOAsync hypnotoad script/myapp
214              
215             =head1 DESCRIPTION
216              
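217             L<Mojo::Reactor::IOAsync> is an event reactor for L<Mojo::IOLoop> that uses
218             L<IO::Async>. The usage is exactly the same as other L<Mojo::Reactor>
219             implementations such as L<Mojo::Reactor::Poll>. L<Mojo::Reactor::IOAsync> will
220             be used as the default backend for L<Mojo::IOLoop> if it is loaded before
221             L<Mojo::IOLoop> or any module using the loop. However, when invoking a
222             L<Mojolicious> application through L<morbo> or L<hypnotoad>, the reactor must
223             be set as the default by setting the C<MOJO_REACTOR> environment variable to
224             C<Mojo::Reactor::IOAsync>.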
225              
226             =head1 EVENTS
227              
228             L<Mojo::Reactor::IOAsync> inherits all events from L<Mojo::Reactor::Poll>.
229              
230             =head1 METHODS
231              
232             L<Mojo::Reactor::IOAsync> inherits all methods from L<Mojo::Reactor::Poll> and
233             implements the following new ones.
234              
235             =head2 new
236              
237             my $reactor = Mojo::Reactor::IOAsync->new;
238              
239             Construct a new L<Mojo::Reactor::IOAsync> object.
240              
241             =head2 again
242              
243             $reactor->again($id);
244             $reactor->again($id, 0.5);
245              
246             Restart timer and optionally change the invocation time. Note that this method
247             requires an active timer.
248              
249             =head2 io
250              
251             $reactor = $reactor->io($handle => sub {...});
252              
253             Watch handle for I/O events, invoking the callback whenever handle becomes
254             readable or writable.
255              
256             # Callback will be invoked twice if handle becomes readable and writable
257             $reactor->io($handle => sub {
258             my ($reactor, $writable) = @_;
259             say $writable ? 'Handle is writable' : 'Handle is readable';
260             });
261              
262             =head2 one_tick
263              
264             $reactor->one_tick;
265              
266             Run reactor until an event occurs or no events are being watched anymore. See
267             L</"CAVEATS">.
268              
269             # Don't block longer than 0.5 seconds
270             my $id = $reactor->timer(0.5 => sub {});
271             $reactor->one_tick;
272             $reactor->remove($id);
273              
274             =head2 recurring
275              
276             my $id = $reactor->recurring(0.25 => sub {...});
277              
278             Create a new recurring timer, invoking the callback repeatedly after a given
279             amount of time in seconds.
280              
281             =head2 remove
282              
283             my $bool = $reactor->remove($handle);
284             my $bool = $reactor->remove($id);
285              
286             Remove handle or timer.
287              
288             =head2 reset
289              
290             $reactor->reset;
291              
292             Remove all handles and timers.
293              
294             =head2 stop
295              
296             $reactor->stop;
297              
298             Stop watching for I/O and timer events.
299              
300             =head2 timer
301              
302             my $id = $reactor->timer(0.5 => sub {...});
303              
304             Create a new timer, invoking the callback after a given amount of time in
305             seconds.
306              
307             =head2 watch
308              
309             $reactor = $reactor->watch($handle, $readable, $writable);
310              
311             Change I/O events to watch handle for with true and false values. Note that
312             this method requires an active I/O watcher.
313              
314             # Watch only for readable events
315             $reactor->watch($handle, 1, 0);
316              
317             # Watch only for writable events
318             $reactor->watch($handle, 0, 1);
319              
320             # Watch for readable and writable events
321             $reactor->watch($handle, 1, 1);
322              
323             # Pause watching for events
324             $reactor->watch($handle, 0, 0);
325              
326             =head1 CAVEATS
327              
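328             When using L<Mojo::IOLoop> with L<IO::Async>, the event loop must be controlled
329             by L<Mojo::IOLoop> or L<Mojo::Reactor::IOAsync>, such as with the methods
330             L<Mojo::IOLoop/"start">, L<Mojo::IOLoop/"stop">, and L<Mojo::IOLoop/"one_tick">. Starting or
331             stopping the event loop through L<IO::Async> will not provide required
332             functionality to L<Mojo::IOLoop> applications.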
333              
334             Externally-added L<IO::Async::Notifier> notifiers will keep the L<Mojo::IOLoop> loop
335             running if they are added to the event loop as a notifier, see
336             L<IO::Async::Loop/"add">.
337              
338             =head1 BUGS
339              
340             Report any issues on the public bugtracker.
341              
342             =head1 AUTHOR
343              
344             Dan Book, C
345              
346             =head1 COPYRIGHT AND LICENSE
347              
348             Copyright 2015, Dan Book.
349              
350             This library is free software; you may redistribute it and/or modify it under
351             the terms of the Artistic License version 2.0.
352              
353             =head1 SEE ALSO
354              
355             L<Mojolicious>, L<Mojo::IOLoop>, L<IO::Async>
356              
357             =cut
358              
359             1;