File Coverage

blib/lib/Mojo/Reactor/UV.pm
Criterion Covered Total %
statement 111 113 98.2
branch 42 50 84.0
condition 7 11 63.6
subroutine 28 29 96.5
pod 13 13 100.0
total 201 216 93.0


line stmt bran cond sub pod time code
1             package Mojo::Reactor::UV;
2 1     1   837 use Mojo::Base 'Mojo::Reactor';
  1         1  
  1         13  
3              
4             $ENV{MOJO_REACTOR} ||= 'Mojo::Reactor::UV';
5              
6 1     1   177 use Carp 'croak';
  1         1  
  1         56  
7 1     1   14 use Mojo::Reactor::Poll;
  1         2  
  1         9  
8 1     1   21 use Mojo::Util 'md5_sum';
  1         1  
  1         40  
9 1     1   4 use Scalar::Util 'weaken';
  1         2  
  1         37  
10 1     1   477 use UV;
  1         1097  
  1         65  
11              
12 1   50 1   50 use constant DEBUG => $ENV{MOJO_REACTOR_UV_DEBUG} || 0;
  1         2  
  1         1333  
13              
14             our $VERSION = '0.001';
15              
16             my $UV;
17              
18 2     2   532 sub DESTROY { undef $UV }
19              
20             sub again {
21 7     7 1 304 my $self = shift;
22 7 100       340 croak 'Timer not active' unless my $timer = $self->{timers}{shift()};
23 6 50       34 $self->_error if UV::timer_again($timer->{watcher}) < 0;
24             }
25              
26             sub io {
27 10     10 1 3549 my ($self, $handle, $cb) = @_;
28 10         24 my $fd = fileno $handle;
29 10         36 $self->{io}{$fd}{cb} = $cb;
30 10         14 warn "-- Set IO watcher for $fd\n" if DEBUG;
31 10         31 return $self->watch($handle, 1, 1);
32             }
33              
34 10     10 1 3336 sub is_running { !!(shift->{running}) }
35              
36             # We have to fall back to Mojo::Reactor::Poll, since UV is unique
37 6 100   6 1 77639 sub new { $UV++ ? Mojo::Reactor::Poll->new : shift->SUPER::new }
38              
39             sub next_tick {
40 15     15 1 872 my ($self, $cb) = @_;
41 15         14 push @{$self->{next_tick}}, $cb;
  15         26  
42 15   66     41 $self->{next_timer} //= $self->timer(0 => \&_next);
43 15         25 return undef;
44             }
45              
46             sub one_tick {
47 55860     55860 1 46106 my $self = shift;
48             # Just one tick
49 55860 100       83466 local $self->{running} = 1 unless $self->{running};
50 55860 100       203327 UV::run(UV::RUN_ONCE) or $self->stop;
51             }
52              
53 3     3 1 630 sub recurring { shift->_timer(1, @_) }
54              
55             sub remove {
56 34     34 1 3381 my ($self, $remove) = @_;
57 34 50       90 return unless defined $remove;
58 34 100       89 if (ref $remove) {
59 5         16 my $fd = fileno $remove;
60 5 100       24 if (exists $self->{io}{$fd}) {
61 3         4 warn "-- Removed IO watcher for $fd\n" if DEBUG;
62 3         12 my $w = delete $self->{io}{$fd}{watcher};
63 3 50       52 UV::close($w) if $w;
64             }
65 5         45 return !!delete $self->{io}{$fd};
66             } else {
67 29 100       110 if (exists $self->{timers}{$remove}) {
68 28         23 warn "-- Removed timer $remove\n" if DEBUG;
69 28         98 my $w = delete $self->{timers}{$remove}{watcher};
70 28 50       201 UV::close($w) if $w;
71             }
72 29         133 return !!delete $self->{timers}{$remove};
73             }
74             }
75              
76             sub reset {
77 2     2 1 4 my $self = shift;
78 2     5   15 UV::walk(sub { UV::close($_[0]) });
  5         14  
79 2         7 delete @{$self}{qw(io next_tick next_timer timers)};
  2         12  
80             }
81              
82             sub start {
83 22     22 1 90 my $self = shift;
84 22         50 $self->{running}++;
85 22         86 $self->one_tick while $self->{running};
86             }
87              
88 25     25 1 223 sub stop { delete shift->{running} }
89              
90 30     30 1 62597 sub timer { shift->_timer(0, @_) }
91              
92             sub watch {
93 24     24 1 243 my ($self, $handle, $read, $write) = @_;
94            
95 24         38 my $fd = fileno $handle;
96 24 100       284 croak 'I/O watcher not active' unless my $io = $self->{io}{$fd};
97            
98 23         26 my $mode = 0;
99 23 100       44 $mode |= UV::READABLE if $read;
100 23 100       45 $mode |= UV::WRITABLE if $write;
101            
102 23         23 my $w;
103 23 100       56 unless ($w = $io->{watcher}) { $w = $io->{watcher} = UV::poll_init($fd); }
  9         68  
104            
105 23 50       48 if ($mode == 0) { $self->_error if UV::poll_stop($w) < 0; }
  2 100       13  
106             else {
107 21         47 weaken $self;
108             my $cb = sub {
109 55807     55807   48909 my ($status, $events) = @_;
110 55807 50       79862 return $self->_error if $status < 0;
111 55807 100       131173 $self->_try('I/O watcher', $self->{io}{$fd}{cb}, 0)
112             if UV::READABLE & $events;
113 55807 100 66     246391 $self->_try('I/O watcher', $self->{io}{$fd}{cb}, 1)
114             if UV::WRITABLE & $events && $self->{io}{$fd};
115 21         92 };
116 21 50       138 $self->_error if UV::poll_start($w, $mode, $cb) < 0;
117             }
118            
119 23         63 return $self;
120             }
121              
122             sub _error {
123 0     0   0 my $self = shift;
124 0         0 $self->emit(error => sprintf "UV error: %s", UV::strerror(UV::last_error()));
125             }
126              
127             sub _id {
128 33     33   43 my $self = shift;
129 33         34 my $id;
130 33         37 do { $id = md5_sum 't' . UV::now() . rand 999 } while $self->{timers}{$id};
  33         655  
131 33         340 return $id;
132             }
133              
134             sub _next {
135 5     5   17 my $self = shift;
136 5         9 delete $self->{next_timer};
137 5         7 while (my $cb = shift @{$self->{next_tick}}) { $self->$cb }
  19         85  
  14         27  
138             }
139              
140             sub _timer {
141 33     33   64 my ($self, $recurring, $after, $cb) = @_;
142 33         61 $after *= 1000; # Intervals in milliseconds
143 33         40 my $recur_after = $after;
144             # Timer will not repeat with (integer) interval of 0
145 33 100 66     99 $recur_after = 1 if $recurring and $after < 1;
146            
147 33         76 my $id = $self->_id;
148 33         85 weaken $self;
149             my $wrapper = sub {
150 146 100   146   351 $self->remove($id) unless $recurring;
151 146         382 $self->_try('Timer', $cb);
152 33         136 };
153 33         268 my $w = $self->{timers}{$id}{watcher} = UV::timer_init();
154 33 50       144 $self->_error if UV::timer_start($w, $after, $recur_after, $wrapper) < 0;
155            
156 33         40 if (DEBUG) {
157             my $is_recurring = $recurring ? ' (recurring)' : '';
158             my $seconds = $after / 1000;
159             warn "-- Set timer $id after $seconds seconds$is_recurring\n";
160             }
161            
162 33         94 return $id;
163             }
164              
165             sub _try {
166 60507     60507   61753 my ($self, $what, $cb) = (shift, shift, shift);
167 60507 100       49654 eval { $self->$cb(@_); 1 } or $self->emit(error => "$what failed: $@");
  60507         77723  
  60506         255959  
168             }
169              
170             1;
171              
172             =encoding utf8
173              
174             =head1 NAME
175              
176             Mojo::Reactor::UV - UV backend for Mojo::Reactor
177              
178             =head1 SYNOPSIS
179              
180             use Mojo::Reactor::UV;
181              
182             # Watch if handle becomes readable or writable
183             my $reactor = Mojo::Reactor::UV->new;
184             $reactor->io($first => sub {
185             my ($reactor, $writable) = @_;
186             say $writable ? 'First handle is writable' : 'First handle is readable';
187             });
188              
189             # Change to watching only if handle becomes writable
190             $reactor->watch($first, 0, 1);
191              
192             # Turn file descriptor into handle and watch if it becomes readable
193             my $second = IO::Handle->new_from_fd($fd, 'r');
194             $reactor->io($second => sub {
195             my ($reactor, $writable) = @_;
196             say $writable ? 'Second handle is writable' : 'Second handle is readable';
197             })->watch($second, 1, 0);
198              
199             # Add a timer
200             $reactor->timer(15 => sub {
201             my $reactor = shift;
202             $reactor->remove($first);
203             $reactor->remove($second);
204             say 'Timeout!';
205             });
206              
207             # Start reactor if necessary
208             $reactor->start unless $reactor->is_running;
209              
210             # Or in an application using Mojo::IOLoop
211             use Mojo::Reactor::UV;
212             use Mojo::IOLoop;
213              
214             # Or in a Mojolicious application
215             $ MOJO_REACTOR=Mojo::Reactor::UV hypnotoad script/myapp
216              
217             =head1 DESCRIPTION
218              
219             L<Mojo::Reactor::UV> is an event reactor for L<Mojo::IOLoop> that uses
220             C<libuv>. The usage is exactly the same as other L<Mojo::Reactor>
221             implementations such as L<Mojo::Reactor::Poll>. L<Mojo::Reactor::UV> will be
222             used as the default backend for L<Mojo::IOLoop> if it is loaded before
223             L<Mojo::IOLoop> or any module using the loop. However, when invoking a
224             L<Mojolicious> application through L<morbo> or L<hypnotoad>, the reactor must
225             be set as the default by setting the C<MOJO_REACTOR> environment variable to
226             C<Mojo::Reactor::UV>.
227              
228             =head1 EVENTS
229              
230             L<Mojo::Reactor::UV> inherits all events from L<Mojo::Reactor>.
231              
232             =head1 METHODS
233              
234             L<Mojo::Reactor::UV> inherits all methods from L<Mojo::Reactor> and implements
235             the following new ones.
236              
237             =head2 again
238              
239             $reactor->again($id);
240              
241             Restart timer. Note that this method requires an active timer.
242              
243             =head2 io
244              
245             $reactor = $reactor->io($handle => sub {...});
246              
247             Watch handle for I/O events, invoking the callback whenever handle becomes
248             readable or writable.
249              
250             # Callback will be invoked twice if handle becomes readable and writable
251             $reactor->io($handle => sub {
252             my ($reactor, $writable) = @_;
253             say $writable ? 'Handle is writable' : 'Handle is readable';
254             });
255              
256             =head2 is_running
257              
258             my $bool = $reactor->is_running;
259              
260             Check if reactor is running.
261              
262             =head2 new
263              
264             my $reactor = Mojo::Reactor::UV->new;
265              
266             Construct a new L<Mojo::Reactor::UV> object.
267              
268             =head2 next_tick
269              
270             my $undef = $reactor->next_tick(sub {...});
271              
272             Invoke callback as soon as possible, but not before returning or other
273             callbacks that have been registered with this method, always returns C<undef>.
274              
275             =head2 one_tick
276              
277             $reactor->one_tick;
278              
279             Run reactor until an event occurs or no events are being watched anymore. Note
280             that this method can recurse back into the reactor, so you need to be careful.
281              
282             # Don't block longer than 0.5 seconds
283             my $id = $reactor->timer(0.5 => sub {});
284             $reactor->one_tick;
285             $reactor->remove($id);
286              
287             =head2 recurring
288              
289             my $id = $reactor->recurring(0.25 => sub {...});
290              
291             Create a new recurring timer, invoking the callback repeatedly after a given
292             amount of time in seconds.
293              
294             =head2 remove
295              
296             my $bool = $reactor->remove($handle);
297             my $bool = $reactor->remove($id);
298              
299             Remove handle or timer.
300              
301             =head2 reset
302              
303             $reactor->reset;
304              
305             Remove all handles and timers.
306              
307             =head2 start
308              
309             $reactor->start;
310              
311             Start watching for I/O and timer events, this will block until L</"stop"> is
312             called or no events are being watched anymore.
313              
314             # Start reactor only if it is not running already
315             $reactor->start unless $reactor->is_running;
316              
317             =head2 stop
318              
319             $reactor->stop;
320              
321             Stop watching for I/O and timer events.
322              
323             =head2 timer
324              
325             my $id = $reactor->timer(0.5 => sub {...});
326              
327             Create a new timer, invoking the callback after a given amount of time in
328             seconds.
329              
330             =head2 watch
331              
332             $reactor = $reactor->watch($handle, $readable, $writable);
333              
334             Change I/O events to watch handle for with true and false values. Note that
335             this method requires an active I/O watcher.
336              
337             # Watch only for readable events
338             $reactor->watch($handle, 1, 0);
339              
340             # Watch only for writable events
341             $reactor->watch($handle, 0, 1);
342              
343             # Watch for readable and writable events
344             $reactor->watch($handle, 1, 1);
345              
346             # Pause watching for events
347             $reactor->watch($handle, 0, 0);
348              
349             =head1 CAVEATS
350              
351             When using L<Mojo::IOLoop> with L<UV>, the event loop must be controlled by
352             L<Mojo::IOLoop> or L<Mojo::Reactor::UV>, such as with the methods L</"start">,
353             L</"stop">, and L</"one_tick">. Starting or stopping the event loop through
354             L<UV> will not provide required functionality to L<Mojo::IOLoop> applications.
355              
356             Care should be taken that file descriptors are not closed while being watched
357             by the reactor. They can be safely closed after calling L</"watch"> with
358             C<readable> and C<writable> set to 0, or after removing the handle with
359             L</"remove"> or L</"reset">.
360              
361             On Windows, C<libuv> can only watch sockets, not regular filehandles.
362              
363             =head1 BUGS
364              
365             Report any issues on the public bugtracker.
366              
367             =head1 AUTHOR
368              
369             Dan Book, C
370              
371             =head1 COPYRIGHT AND LICENSE
372              
373             Copyright 2015, Dan Book.
374              
375             This library is free software; you may redistribute it and/or modify it under
376             the terms of the Artistic License version 2.0.
377              
378             =head1 SEE ALSO
379              
380             L<Mojolicious>, L<Mojo::IOLoop>, L<UV>