File Coverage

blib/lib/Mojo/Reactor/IOAsync.pm
Criterion Covered Total %
statement 112 112 100.0
branch 30 34 88.2
condition 8 14 57.1
subroutine 25 25 100.0
pod 10 10 100.0
total 185 195 94.8


line stmt bran cond sub pod time code
1             package Mojo::Reactor::IOAsync;
2 1     1   238111 use Mojo::Base 'Mojo::Reactor::Poll';
  1         10  
  1         7  
3              
4             $ENV{MOJO_REACTOR} ||= 'Mojo::Reactor::IOAsync';
5              
6 1     1   36564 use Carp 'croak';
  1         2  
  1         42  
7 1     1   861 use IO::Async::Loop;
  1         20461  
  1         41  
8 1     1   642 use IO::Async::Handle;
  1         17758  
  1         43  
9 1     1   561 use IO::Async::Timer::Countdown;
  1         1492  
  1         29  
10 1     1   7 use Mojo::Util qw(md5_sum steady_time);
  1         2  
  1         62  
11 1     1   7 use Scalar::Util 'weaken';
  1         2  
  1         55  
12              
13 1   50 1   5 use constant DEBUG => $ENV{MOJO_REACTOR_IOASYNC_DEBUG} || 0;
  1         2  
  1         1385  
14              
15             our $VERSION = '1.001';
16              
17             my $IOAsync;
18              
19             # Use IO::Async::Loop singleton for the first instance only
20             sub new {
21 7     7 1 76603 my $self = shift->SUPER::new;
22 7 100       62 if ($IOAsync++) {
23 4         17 $self->{loop} = IO::Async::Loop->really_new;
24             } else {
25 3         24 $self->{loop} = IO::Async::Loop->new;
26 3         2563 $self->{loop_singleton} = 1;
27             }
28 7         381 return $self;
29             }
30              
31             sub DESTROY {
32 6     6   13566 my $self = shift;
33 6         34 $self->reset;
34 6 100       111 undef $IOAsync if $self->{loop_singleton};
35             }
36              
37             sub again {
38 9     9 1 452 my ($self, $id, $after) = @_;
39 9 100       238 croak 'Timer not active' unless my $timer = $self->{timers}{$id};
40 8         14 my $w = $timer->{watcher};
41 8         23 $w->stop;
42 8 100       720 $w->configure(delay => $after) if defined $after;
43 8         92 $w->start;
44             }
45              
46             sub io {
47 10     10 1 5268 my ($self, $handle, $cb) = @_;
48 10   33     53 my $fd = fileno($handle) // croak 'Handle is closed';
49 10         32 $self->{io}{$fd}{cb} = $cb;
50 10         17 warn "-- Set IO watcher for $fd\n" if DEBUG;
51 10         30 return $self->watch($handle, 1, 1);
52             }
53              
54             sub one_tick {
55 9396     9396 1 354247 my $self = shift;
56            
57             # Stop automatically if there is nothing to watch
58 9396 100 100     10249 return $self->stop unless keys %{$self->{timers}} || keys %{$self->{io}} || $self->{loop}->notifiers;
  9396   66     19575  
  7         45  
59            
60             # Just one tick
61 9390 100       15641 local $self->{running} = 1 unless $self->{running};
62            
63 9390         15686 $self->{loop}->loop_once;
64             }
65              
66 5     5 1 525 sub recurring { shift->_timer(1, @_) }
67              
68             sub remove {
69 11     11 1 2796 my ($self, $remove) = @_;
70 11 50       46 return !!0 unless defined $remove;
71 11 100       33 if (ref $remove) {
72 5   33     21 my $fd = fileno($remove) // croak 'Handle is closed';
73 5         30 my $io = delete $self->{io}{$fd};
74 5 100       16 if ($io) {
75 3         6 warn "-- Removed IO watcher for $fd\n" if DEBUG;
76 3 50       28 $io->{watcher}->remove_from_parent if $io->{watcher};
77             }
78 5         661 return !!$io;
79             } else {
80 6         19 my $timer = delete $self->{timers}{$remove};
81 6 100       15 if ($timer) {
82 5         8 warn "-- Removed timer $remove\n" if DEBUG;
83 5 50       23 $timer->{watcher}->remove_from_parent if $timer->{watcher};
84             }
85 6         754 return !!$timer;
86             }
87             }
88              
89             sub reset {
90 10     10 1 1475 my $self = shift;
91 10         19 $_->remove_from_parent for
92 7 50       37 map { $_->{watcher} ? ($_->{watcher}) : () }
93 10         33 values %{$self->{io}}, values %{$self->{timers}};
  10         29  
94 10         985 $self->SUPER::reset;
95             }
96              
97             sub stop {
98 26     26 1 244 my $self = shift;
99 26         47 delete $self->{running};
100 26         83 $self->{loop}->loop_stop;
101             }
102              
103 35     35 1 12068 sub timer { shift->_timer(0, @_) }
104              
105             sub watch {
106 25     25 1 446 my ($self, $handle, $read, $write) = @_;
107            
108 25         47 my $fd = fileno $handle;
109 25 100       166 croak 'I/O watcher not active' unless my $io = $self->{io}{$fd};
110 24 100       58 if (my $w = $io->{watcher}) {
111 15         52 $w->want_readready($read);
112 15         649 $w->want_writeready($write);
113             } else {
114 9         28 weaken $self;
115 9     8830   25 my $on_read = sub { $self->_try('I/O watcher', $self->{io}{$fd}{cb}, 0) };
  8830         672232  
116 9     939   25 my $on_write = sub { $self->_try('I/O watcher', $self->{io}{$fd}{cb}, 1) };
  939         21809  
117 9         60 my $w = $io->{watcher} = IO::Async::Handle->new(
118             handle => $handle,
119             on_read_ready => $on_read,
120             on_write_ready => $on_write,
121             want_readready => $read,
122             want_writeready => $write,
123             );
124 9         1547 $self->{loop}->add($w);
125             }
126            
127 24         3217 return $self;
128             }
129              
130             sub _id {
131 40     40   62 my $self = shift;
132 40         54 my $id;
133 40         63 do { $id = md5_sum 't' . steady_time . rand } while $self->{timers}{$id};
  40         167  
134 40         912 return $id;
135             }
136              
137             sub _timer {
138 40     40   103 my ($self, $recurring, $after, $cb) = @_;
139            
140 40         113 my $id = $self->_id;
141 40         119 weaken $self;
142             my $on_expire = sub {
143 607     607   314617 my $w = shift;
144 607 100       967 if ($recurring) {
145 578         1160 $w->start;
146             } else {
147 29         113 $w->remove_from_parent;
148 29         2431 delete $self->{timers}{$id};
149             }
150             #warn "-- Event fired for timer $id\n" if DEBUG;
151 607         57158 $self->_try('Timer', $cb);
152 40         151 };
153 40         187 my $w = $self->{timers}{$id}{watcher} = IO::Async::Timer::Countdown->new(
154             delay => $after,
155             on_expire => $on_expire,
156             )->start;
157 40         3384 $self->{loop}->add($w);
158            
159 40         11572 if (DEBUG) {
160             my $is_recurring = $recurring ? ' (recurring)' : '';
161             warn "-- Set timer $id after $after seconds$is_recurring\n";
162             }
163            
164 40         109 return $id;
165             }
166              
167             sub _try {
168 10376     10376   17827 my ($self, $what, $cb) = (shift, shift, shift);
169 10376 100       12270 eval { $self->$cb(@_); 1 } or $self->emit(error => "$what failed: $@");
  10376         23151  
  10375         37983  
170             }
171              
172             =head1 NAME
173              
174             Mojo::Reactor::IOAsync - IO::Async backend for Mojo::Reactor
175              
176             =head1 SYNOPSIS
177              
178             use Mojo::Reactor::IOAsync;
179              
180             # Watch if handle becomes readable or writable
181             my $reactor = Mojo::Reactor::IOAsync->new;
182             $reactor->io($first => sub {
183             my ($reactor, $writable) = @_;
184             say $writable ? 'First handle is writable' : 'First handle is readable';
185             });
186              
187             # Change to watching only if handle becomes writable
188             $reactor->watch($first, 0, 1);
189              
190             # Turn file descriptor into handle and watch if it becomes readable
191             my $second = IO::Handle->new_from_fd($fd, 'r');
192             $reactor->io($second => sub {
193             my ($reactor, $writable) = @_;
194             say $writable ? 'Second handle is writable' : 'Second handle is readable';
195             })->watch($second, 1, 0);
196              
197             # Add a timer
198             $reactor->timer(15 => sub {
199             my $reactor = shift;
200             $reactor->remove($first);
201             $reactor->remove($second);
202             say 'Timeout!';
203             });
204              
205             # Start reactor if necessary
206             $reactor->start unless $reactor->is_running;
207              
208             # Or in an application using Mojo::IOLoop
209             use Mojo::Reactor::IOAsync;
210             use Mojo::IOLoop;
211            
212             # Or in a Mojolicious application
213             $ MOJO_REACTOR=Mojo::Reactor::IOAsync hypnotoad script/myapp
214              
215             =head1 DESCRIPTION
216              
217             L is an event reactor for L that uses
218             L. The usage is exactly the same as other L
219             implementations such as L. L will
220             be used as the default backend for L if it is loaded before
221             L or any module using the loop. However, when invoking a
222             L application through L or L, the reactor must
223             be set as the default by setting the C environment variable to
224             C.
225              
226             =head1 EVENTS
227              
228             L inherits all events from L.
229              
230             =head1 METHODS
231              
232             L inherits all methods from L and
233             implements the following new ones.
234              
235             =head2 new
236              
237             my $reactor = Mojo::Reactor::IOAsync->new;
238              
239             Construct a new L object.
240              
241             =head2 again
242              
243             $reactor->again($id);
244             $reactor->again($id, 0.5);
245              
246             Restart timer and optionally change the invocation time. Note that this method
247             requires an active timer.
248              
249             =head2 io
250              
251             $reactor = $reactor->io($handle => sub {...});
252              
253             Watch handle for I/O events, invoking the callback whenever handle becomes
254             readable or writable.
255              
256             # Callback will be invoked twice if handle becomes readable and writable
257             $reactor->io($handle => sub {
258             my ($reactor, $writable) = @_;
259             say $writable ? 'Handle is writable' : 'Handle is readable';
260             });
261              
262             =head2 one_tick
263              
264             $reactor->one_tick;
265              
266             Run reactor until an event occurs or no events are being watched anymore. See
267             L.
268              
269             # Don't block longer than 0.5 seconds
270             my $id = $reactor->timer(0.5 => sub {});
271             $reactor->one_tick;
272             $reactor->remove($id);
273              
274             =head2 recurring
275              
276             my $id = $reactor->recurring(0.25 => sub {...});
277              
278             Create a new recurring timer, invoking the callback repeatedly after a given
279             amount of time in seconds.
280              
281             =head2 remove
282              
283             my $bool = $reactor->remove($handle);
284             my $bool = $reactor->remove($id);
285              
286             Remove handle or timer.
287              
288             =head2 reset
289              
290             $reactor->reset;
291              
292             Remove all handles and timers.
293              
294             =head2 stop
295              
296             $reactor->stop;
297              
298             Stop watching for I/O and timer events.
299              
300             =head2 timer
301              
302             my $id = $reactor->timer(0.5 => sub {...});
303              
304             Create a new timer, invoking the callback after a given amount of time in
305             seconds.
306              
307             =head2 watch
308              
309             $reactor = $reactor->watch($handle, $readable, $writable);
310              
311             Change I/O events to watch handle for with true and false values. Note that
312             this method requires an active I/O watcher.
313              
314             # Watch only for readable events
315             $reactor->watch($handle, 1, 0);
316              
317             # Watch only for writable events
318             $reactor->watch($handle, 0, 1);
319              
320             # Watch for readable and writable events
321             $reactor->watch($handle, 1, 1);
322              
323             # Pause watching for events
324             $reactor->watch($handle, 0, 0);
325              
326             =head1 CAVEATS
327              
328             When using L with L, the event loop must be controlled
329             by L or L, such as with the methods
330             L, L, and L. Starting or
331             stopping the event loop through L will not provide required
332             functionality to L applications.
333              
334             Externally-added L notifiers will keep the L loop
335             running if they are added to the event loop as a notifier, see
336             L.
337              
338             =head1 BUGS
339              
340             Report any issues on the public bugtracker.
341              
342             =head1 AUTHOR
343              
344             Dan Book, C
345              
346             =head1 COPYRIGHT AND LICENSE
347              
348             Copyright 2015, Dan Book.
349              
350             This library is free software; you may redistribute it and/or modify it under
351             the terms of the Artistic License version 2.0.
352              
353             =head1 SEE ALSO
354              
355             L, L, L
356              
357             =cut
358              
359             1;