File Coverage

blib/lib/Mojo/Reactor/IOAsync.pm
Criterion Covered Total %
statement 109 109 100.0
branch 28 32 87.5
condition 6 8 75.0
subroutine 25 25 100.0
pod 10 10 100.0
total 178 184 96.7


line stmt bran cond sub pod time code
1             package Mojo::Reactor::IOAsync;
2 1     1   76881 use Mojo::Base 'Mojo::Reactor::Poll';
  1         2  
  1         8  
3              
4             $ENV{MOJO_REACTOR} ||= 'Mojo::Reactor::IOAsync';
5              
6 1     1   142436 use Carp 'croak';
  1         2  
  1         38  
7 1     1   547 use IO::Async::Loop;
  1         21314  
  1         32  
8 1     1   308 use IO::Async::Handle;
  1         13250  
  1         36  
9 1     1   276 use IO::Async::Timer::Countdown;
  1         1182  
  1         26  
10 1     1   5 use Mojo::Util 'md5_sum';
  1         4  
  1         44  
11 1     1   5 use Scalar::Util 'weaken';
  1         2  
  1         47  
12              
13 1   50 1   6 use constant DEBUG => $ENV{MOJO_REACTOR_IOASYNC_DEBUG} || 0;
  1         1  
  1         1181  
14              
15             our $VERSION = '1.000';
16              
17             my $IOAsync;
18              
19             # Use IO::Async::Loop singleton for the first instance only
20             sub new {
21 7     7 1 66731 my $self = shift->SUPER::new;
22 7 100       78 if ($IOAsync++) {
23 4         26 $self->{loop} = IO::Async::Loop->really_new;
24             } else {
25 3         28 $self->{loop} = IO::Async::Loop->new;
26 3         2300 $self->{loop_singleton} = 1;
27             }
28 7         1660 return $self;
29             }
30              
31             sub DESTROY {
32 6     6   4171 my $self = shift;
33 6         19 $self->reset;
34 6 100       248 undef $IOAsync if $self->{loop_singleton};
35             }
36              
37             sub again {
38 7     7 1 378 my ($self, $id) = @_;
39 7 100       294 croak 'Timer not active' unless my $timer = $self->{timers}{$id};
40 6         29 $timer->{watcher}->reset;
41             }
42              
43             sub io {
44 10     10 1 7724 my ($self, $handle, $cb) = @_;
45 10         39 my $fd = fileno $handle;
46 10         42 $self->{io}{$fd}{cb} = $cb;
47 10         22 warn "-- Set IO watcher for $fd\n" if DEBUG;
48 10         41 return $self->watch($handle, 1, 1);
49             }
50              
51             sub one_tick {
52 9133     9133 1 334649 my $self = shift;
53            
54             # Just one tick
55 9133 100       19145 local $self->{running} = 1 unless $self->{running};
56            
57             # Stop automatically if there is nothing to watch
58 9133 100 100     12857 return $self->stop unless keys %{$self->{timers}} || keys %{$self->{io}} || $self->{loop}->notifiers;
  9133   66     22486  
  7         56  
59            
60 9127         23151 $self->{loop}->loop_once;
61             }
62              
63 4     4 1 983 sub recurring { shift->_timer(1, @_) }
64              
65             sub remove {
66 10     10 1 3161 my ($self, $remove) = @_;
67 10 50       40 return !!0 unless defined $remove;
68 10 100       36 if (ref $remove) {
69 5         17 my $fd = fileno $remove;
70 5         19 my $io = delete $self->{io}{$fd};
71 5 100       18 if ($io) {
72 3         7 warn "-- Removed IO watcher for $fd\n" if DEBUG;
73 3 50       29 $io->{watcher}->remove_from_parent if $io->{watcher};
74             }
75 5         846 return !!$io;
76             } else {
77 5         21 my $timer = delete $self->{timers}{$remove};
78 5 100       17 if ($timer) {
79 4         8 warn "-- Removed timer $remove\n" if DEBUG;
80 4 50       27 $timer->{watcher}->remove_from_parent if $timer->{watcher};
81             }
82 5         545 return !!$timer;
83             }
84             }
85              
86             sub reset {
87 8     8 1 32 my $self = shift;
88 8         14 $_->remove_from_parent for
89 6 50       44 map { $_->{watcher} ? ($_->{watcher}) : () }
90 8         26 values %{$self->{io}}, values %{$self->{timers}};
  8         28  
91 8         1321 $self->SUPER::reset;
92             }
93              
94             sub stop {
95 26     26 1 335 my $self = shift;
96 26         67 delete $self->{running};
97 26         120 $self->{loop}->loop_stop;
98             }
99              
100 33     33 1 15134 sub timer { shift->_timer(0, @_) }
101              
102             sub watch {
103 24     24 1 724 my ($self, $handle, $read, $write) = @_;
104            
105 24         69 my $fd = fileno $handle;
106 24 100       253 croak 'I/O watcher not active' unless my $io = $self->{io}{$fd};
107 23 100       76 if (my $w = $io->{watcher}) {
108 14         66 $w->want_readready($read);
109 14         888 $w->want_writeready($write);
110             } else {
111 9         36 weaken $self;
112 9     8367   43 my $on_read = sub { $self->_try('I/O watcher', $self->{io}{$fd}{cb}, 0) };
  8367         674781  
113 9     922   34 my $on_write = sub { $self->_try('I/O watcher', $self->{io}{$fd}{cb}, 1) };
  922         23881  
114 9         72 my $w = $io->{watcher} = IO::Async::Handle->new(
115             handle => $handle,
116             on_read_ready => $on_read,
117             on_write_ready => $on_write,
118             want_readready => $read,
119             want_writeready => $write,
120             );
121 9         2331 $self->{loop}->add($w);
122             }
123            
124 23         4218 return $self;
125             }
126              
127             sub _id {
128 37     37   81 my $self = shift;
129 37         74 my $id;
130 37         74 do { $id = md5_sum 't' . $self->{loop}->time . rand 999 } while $self->{timers}{$id};
  37         210  
131 37         1083 return $id;
132             }
133              
134             sub _timer {
135 37     37   146 my ($self, $recurring, $after, $cb) = @_;
136            
137 37         145 my $id = $self->_id;
138 37         156 weaken $self;
139             my $on_expire = sub {
140 927     927   138748 my $w = shift;
141 927 100       2070 if ($recurring) {
142 900         2288 $w->start;
143             } else {
144 27         124 $w->remove_from_parent;
145 27         2859 delete $self->{timers}{$id};
146             }
147             #warn "-- Event fired for timer $id\n" if DEBUG;
148 927         67430 $self->_try('Timer', $cb);
149 37         213 };
150 37         238 my $w = $self->{timers}{$id}{watcher} = IO::Async::Timer::Countdown->new(
151             delay => $after,
152             on_expire => $on_expire,
153             )->start;
154 37         4345 $self->{loop}->add($w);
155            
156 37         9518 if (DEBUG) {
157             my $is_recurring = $recurring ? ' (recurring)' : '';
158             warn "-- Set timer $id after $after seconds$is_recurring\n";
159             }
160            
161 37         148 return $id;
162             }
163              
164             sub _try {
165 10216     10216   22622 my ($self, $what, $cb) = (shift, shift, shift);
166 10216 100       16546 eval { $self->$cb(@_); 1 } or $self->emit(error => "$what failed: $@");
  10216         26050  
  10215         49454  
167             }
168              
169             =head1 NAME
170              
171             Mojo::Reactor::IOAsync - IO::Async backend for Mojo::Reactor
172              
173             =head1 SYNOPSIS
174              
175             use Mojo::Reactor::IOAsync;
176              
177             # Watch if handle becomes readable or writable
178             my $reactor = Mojo::Reactor::IOAsync->new;
179             $reactor->io($first => sub {
180             my ($reactor, $writable) = @_;
181             say $writable ? 'First handle is writable' : 'First handle is readable';
182             });
183              
184             # Change to watching only if handle becomes writable
185             $reactor->watch($first, 0, 1);
186              
187             # Turn file descriptor into handle and watch if it becomes readable
188             my $second = IO::Handle->new_from_fd($fd, 'r');
189             $reactor->io($second => sub {
190             my ($reactor, $writable) = @_;
191             say $writable ? 'Second handle is writable' : 'Second handle is readable';
192             })->watch($second, 1, 0);
193              
194             # Add a timer
195             $reactor->timer(15 => sub {
196             my $reactor = shift;
197             $reactor->remove($first);
198             $reactor->remove($second);
199             say 'Timeout!';
200             });
201              
202             # Start reactor if necessary
203             $reactor->start unless $reactor->is_running;
204              
205             # Or in an application using Mojo::IOLoop
206             use Mojo::Reactor::IOAsync;
207             use Mojo::IOLoop;
208            
209             # Or in a Mojolicious application
210             $ MOJO_REACTOR=Mojo::Reactor::IOAsync hypnotoad script/myapp
211              
212             =head1 DESCRIPTION
213              
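214             L<Mojo::Reactor::IOAsync> is an event reactor for L<Mojo::IOLoop> that uses
215             L<IO::Async>. The usage is exactly the same as other L<Mojo::Reactor>
216             implementations such as L<Mojo::Reactor::Poll>. L<Mojo::Reactor::IOAsync> will
217             be used as the default backend for L<Mojo::IOLoop> if it is loaded before
218             L<Mojo::IOLoop> or any module using the loop. However, when invoking a
219             L<Mojolicious> application through L<morbo> or L<hypnotoad>, the reactor must
220             be set as the default by setting the C<MOJO_REACTOR> environment variable to
221             C<Mojo::Reactor::IOAsync>.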
222              
223             =head1 EVENTS
224              
225             L<Mojo::Reactor::IOAsync> inherits all events from L<Mojo::Reactor::Poll>.
226              
227             =head1 METHODS
228              
229             L<Mojo::Reactor::IOAsync> inherits all methods from L<Mojo::Reactor::Poll> and
230             implements the following new ones.
231              
232             =head2 new
233              
234             my $reactor = Mojo::Reactor::IOAsync->new;
235              
236             Construct a new L<Mojo::Reactor::IOAsync> object.
237              
238             =head2 again
239              
240             $reactor->again($id);
241              
242             Restart timer. Note that this method requires an active timer.
243              
244             =head2 io
245              
246             $reactor = $reactor->io($handle => sub {...});
247              
248             Watch handle for I/O events, invoking the callback whenever handle becomes
249             readable or writable.
250              
251             # Callback will be invoked twice if handle becomes readable and writable
252             $reactor->io($handle => sub {
253             my ($reactor, $writable) = @_;
254             say $writable ? 'Handle is writable' : 'Handle is readable';
255             });
256              
257             =head2 one_tick
258              
259             $reactor->one_tick;
260              
261             Run reactor until an event occurs or no events are being watched anymore. See
262             L</"CAVEATS">.
263              
264             # Don't block longer than 0.5 seconds
265             my $id = $reactor->timer(0.5 => sub {});
266             $reactor->one_tick;
267             $reactor->remove($id);
268              
269             =head2 recurring
270              
271             my $id = $reactor->recurring(0.25 => sub {...});
272              
273             Create a new recurring timer, invoking the callback repeatedly after a given
274             amount of time in seconds.
275              
276             =head2 remove
277              
278             my $bool = $reactor->remove($handle);
279             my $bool = $reactor->remove($id);
280              
281             Remove handle or timer.
282              
283             =head2 reset
284              
285             $reactor->reset;
286              
287             Remove all handles and timers.
288              
289             =head2 stop
290              
291             $reactor->stop;
292              
293             Stop watching for I/O and timer events.
294              
295             =head2 timer
296              
297             my $id = $reactor->timer(0.5 => sub {...});
298              
299             Create a new timer, invoking the callback after a given amount of time in
300             seconds.
301              
302             =head2 watch
303              
304             $reactor = $reactor->watch($handle, $readable, $writable);
305              
306             Change I/O events to watch handle for with true and false values. Note that
307             this method requires an active I/O watcher.
308              
309             # Watch only for readable events
310             $reactor->watch($handle, 1, 0);
311              
312             # Watch only for writable events
313             $reactor->watch($handle, 0, 1);
314              
315             # Watch for readable and writable events
316             $reactor->watch($handle, 1, 1);
317              
318             # Pause watching for events
319             $reactor->watch($handle, 0, 0);
320              
321             =head1 CAVEATS
322              
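323             When using L<Mojo::IOLoop> with L<IO::Async>, the event loop must be controlled
324             by L<Mojo::IOLoop> or L<Mojo::Reactor::IOAsync>, such as with the methods
325             L<Mojo::IOLoop/"start">, L<Mojo::IOLoop/"stop">, and L</"one_tick">. Starting or
326             stopping the event loop through L<IO::Async> will not provide required
327             functionality to L<Mojo::IOLoop> applications.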
328              
329             Externally-added L<IO::Async> notifiers will keep the L<Mojo::IOLoop> loop
330             running if they are added to the event loop as a notifier, see
331             L<IO::Async::Loop/"NOTIFIER MANAGEMENT">.
332              
333             =head1 BUGS
334              
335             Report any issues on the public bugtracker.
336              
337             =head1 AUTHOR
338              
339             Dan Book, C
340              
341             =head1 COPYRIGHT AND LICENSE
342              
343             Copyright 2015, Dan Book.
344              
345             This library is free software; you may redistribute it and/or modify it under
346             the terms of the Artistic License version 2.0.
347              
348             =head1 SEE ALSO
349              
350             L<Mojolicious>, L<Mojo::IOLoop>, L<IO::Async>
351              
352             =cut
353              
354             1;