File Coverage

blib/lib/Mojo/Reactor/POE.pm
Criterion Covered Total %
statement 158 159 99.3
branch 44 56 78.5
condition 12 28 42.8
subroutine 33 33 100.0
pod 8 8 100.0
total 255 284 89.7


line stmt bran cond sub pod time code
1             package Mojo::Reactor::POE;
2              
3 1     1   284666 use POE; # Loaded early to avoid event loop confusion
  1         18358  
  1         6  
4 1     1   64803 BEGIN { POE::Kernel->run } # silence run() warning
5              
6 1     1   1947 use Mojo::Base 'Mojo::Reactor::Poll';
  1         3  
  1         9  
7              
8             $ENV{MOJO_REACTOR} ||= 'Mojo::Reactor::POE';
9              
10 1     1   43437 use Carp 'croak';
  1         3  
  1         49  
11 1     1   6 use Mojo::Util qw(md5_sum steady_time);
  1         3  
  1         43  
12 1     1   6 use Scalar::Util 'weaken';
  1         2  
  1         42  
13              
14 1     1   7 use constant { POE_IO_READ => 0, POE_IO_WRITE => 1 };
  1         2  
  1         90  
15 1   50 1   6 use constant DEBUG => $ENV{MOJO_REACTOR_POE_DEBUG} || 0;
  1         2  
  1         2899  
16              
17             our $VERSION = '1.001';
18              
19             my $POE;
20              
21             # We have to fall back to Mojo::Reactor::Poll, since POE::Kernel is unique
22 6 100   6 1 103159 sub new { $POE++ ? Mojo::Reactor::Poll->new : shift->SUPER::new }
23              
24             sub DESTROY {
25 2     2   1118 my $self = shift;
26 2         8 $self->reset; # Close session
27 2         19 undef $POE;
28             }
29              
30             sub again {
31 9     9 1 339 my ($self, $id, $after) = @_;
32 9 100       269 croak 'Timer not active' unless my $timer = $self->{timers}{$id};
33 8 100       21 $timer->{after} = $after if defined $after;
34 8         29 $timer->{time} = steady_time + $timer->{after};
35             # If session doesn't exist, the time will be set when it starts
36 8 50       83 $self->_session_call(mojo_adjust_timer => $id) if $self->_session_exists;
37             }
38              
39             sub one_tick {
40 6008     6008 1 514197 my $self = shift;
41 6008         14189 $self->_init_session;
42            
43             # Stop automatically if there is nothing to watch
44 6008 100 100     7874 return $self->stop unless keys %{$self->{timers}} || keys %{$self->{io}};
  6008         14661  
  6         37  
45            
46             # Just one tick
47 6003 100       11732 local $self->{running} = 1 unless $self->{running};
48            
49 6003         14330 POE::Kernel->run_one_timeslice;
50             }
51              
52 4     4 1 649 sub recurring { shift->_timer(1, @_) }
53              
54             sub remove {
55 11     11 1 2989 my ($self, $remove) = @_;
56 11 50       38 return unless defined $remove;
57 11 100       50 if (ref $remove) {
58 5   33     27 my $fileno = fileno($remove) // croak 'Handle is closed';
59 5 100       21 if (exists $self->{io}{$fileno}) {
60 3         4 warn "-- Removed IO watcher for $fileno\n" if DEBUG;
61             # If session doesn't exist, the watcher won't be re-added
62 3 50       12 $self->_session_call(mojo_clear_io => $fileno) if $self->_session_exists;
63             }
64 5         42 return !!delete $self->{io}{$fileno};
65             } else {
66 6 100       18 if (exists $self->{timers}{$remove}) {
67 5         7 warn "-- Removed timer $remove\n" if DEBUG;
68             # If session doesn't exist, the timer won't be re-added
69 5 50       12 $self->_session_call(mojo_clear_timer => $remove) if $self->_session_exists;
70             }
71 6         38 return !!delete $self->{timers}{$remove};
72             }
73             }
74              
75             sub reset {
76 6     6 1 27487 my $self = shift;
77             # If session doesn't exist, watchers won't be re-added
78 6 100       18 if ($self->_session_exists) {
79 3         10 $self->_session_call('mojo_clear_timers');
80 3         6 $self->_session_call(mojo_clear_io => $_) for keys %{$self->{io}};
  3         30  
81             }
82 6         30 $self->SUPER::reset;
83             }
84              
85 32     32 1 35948 sub timer { shift->_timer(0, @_) }
86              
87             sub watch {
88 25     25 1 5972 my ($self, $handle, $read, $write) = @_;
89              
90 25         49 my $fileno = fileno $handle;
91 25 100       173 croak 'I/O watcher not active' unless my $io = $self->{io}{$fileno};
92 24         44 $io->{handle} = $handle;
93 24         44 $io->{read} = $read;
94 24         73 $io->{write} = $write;
95            
96 24         28 warn "-- Set IO watcher for $fileno\n" if DEBUG;
97            
98 24         57 $self->_init_session->_session_call(mojo_set_io => $fileno);
99            
100 24         83 return $self;
101             }
102              
103             sub _id {
104 36     36   69 my $self = shift;
105 36         52 my $id;
106 36         52 do { $id = md5_sum 't' . steady_time . rand } while $self->{timers}{$id};
  36         129  
107 36         850 return $id;
108             }
109              
110             sub _timer {
111 36     36   105 my ($self, $recurring, $after, $cb) = @_;
112            
113 36         109 my $id = $self->_id;
114 36         91 my $timer = $self->{timers}{$id} = {
115             cb => $cb,
116             after => $after,
117             recurring => $recurring,
118             time => steady_time + $after,
119             };
120            
121 36         321 if (DEBUG) {
122             my $is_recurring = $recurring ? ' (recurring)' : '';
123             warn "-- Set timer $id after $after seconds$is_recurring\n";
124             }
125            
126 36         91 $self->_init_session->_session_call(mojo_set_timer => $id);
127            
128 36         109 return $id;
129             }
130              
131             sub _try {
132 6632     6632   12154 my ($self, $what, $cb) = (shift, shift, shift);
133 6632 100       9315 eval { $self->$cb(@_); 1 } or $self->emit(error => "$what failed: $@");
  6632         15926  
  6631         30248  
134             }
135              
136             sub _init_session {
137 6419     6419   9101 my $self = shift;
138 6419 100       11146 unless ($self->_session_exists) {
139 9         142 my $session = POE::Session->create(
140             inline_states => {
141             _start => \&_event_start,
142             _stop => \&_event_stop,
143             mojo_set_timer => \&_event_set_timer,
144             mojo_clear_timer => \&_event_clear_timer,
145             mojo_adjust_timer => \&_event_adjust_timer,
146             mojo_clear_timers => \&_event_clear_timers,
147             mojo_set_io => \&_event_set_io,
148             mojo_clear_io => \&_event_clear_io,
149             mojo_timer => \&_event_timer,
150             mojo_io => \&_event_io,
151             },
152             heap => { mojo_reactor => $self },
153             );
154 9         1186 weaken $session->get_heap()->{mojo_reactor};
155 9         59 $self->{session_id} = $session->ID;
156             }
157 6419         10381 return $self;
158             }
159              
160             sub _session_exists {
161 6441     6441   8844 my $self = shift;
162 6441         16644 return defined $self->{session_id};
163             }
164              
165             sub _session_call {
166 433     433   557 my $self = shift;
167 433 50       783 croak 'Session call on nonexistent session' unless defined $self->{session_id};
168 433         1401 POE::Kernel->call($self->{session_id}, @_);
169 433         3046 return $self;
170             }
171              
172             sub _event_start {
173 9     9   3209 my $self = $_[HEAP]{mojo_reactor};
174 9         17 my $session = $_[SESSION];
175            
176 9         30 warn "-- POE session started\n" if DEBUG;
177             }
178              
179             sub _event_stop {
180 8     8   1065 my $self = $_[HEAP]{mojo_reactor};
181            
182 8         11 warn "-- POE session stopped\n" if DEBUG;
183            
184 8 50       38 delete $self->{session_id} if $self;
185             }
186              
187             sub _event_set_timer {
188 387     387   13098 my $self = $_[HEAP]{mojo_reactor};
189 387         506 my $id = $_[ARG0];
190             return unless exists $self->{timers}{$id}
191 387 50 33     1359 and defined $self->{timers}{$id}{time};
192 387         540 my $timer = $self->{timers}{$id};
193 387         662 my $delay_time = $timer->{time} - steady_time;
194 387         1991 my $poe_id = POE::Kernel->delay_set(mojo_timer => $delay_time, $id);
195 387         21752 $timer->{poe_id} = $poe_id;
196            
197 387         756 warn "-- Set POE timer $poe_id in $delay_time seconds\n" if DEBUG;
198             }
199              
200             sub _event_clear_timer {
201 5     5   226 my $self = $_[HEAP]{mojo_reactor};
202 5         8 my $id = $_[ARG0];
203             return unless exists $self->{timers}{$id}
204 5 50 33     31 and defined $self->{timers}{$id}{poe_id};
205 5         6 my $timer = $self->{timers}{$id};
206 5         26 POE::Kernel->alarm_remove($timer->{poe_id});
207            
208 5         442 warn "-- Cleared POE timer $timer->{poe_id}\n" if DEBUG;
209             }
210              
211             sub _event_adjust_timer {
212 8     8   349 my $self = $_[HEAP]{mojo_reactor};
213 8         13 my $id = $_[ARG0];
214             return unless exists $self->{timers}{$id}
215             and defined $self->{timers}{$id}{time}
216 8 50 33     58 and defined $self->{timers}{$id}{poe_id};
      33        
217 8         16 my $timer = $self->{timers}{$id};
218 8         21 my $new_delay = $timer->{time} - steady_time;
219 8         59 POE::Kernel->delay_adjust($timer->{poe_id}, $new_delay);
220            
221 8         870 warn "-- Adjusted POE timer $timer->{poe_id} to $new_delay seconds\n"
222             if DEBUG;
223             }
224              
225             sub _event_clear_timers {
226 3   50 3   196 my $self = $_[HEAP]{mojo_reactor} // return;
227 3         12 POE::Kernel->alarm_remove_all;
228            
229 3         279 warn "-- Cleared all POE timers\n" if DEBUG;
230             }
231              
232             sub _event_set_io {
233 24     24   1225 my $self = $_[HEAP]{mojo_reactor};
234 24         39 my $fd = $_[ARG0];
235             return unless exists $self->{io}{$fd}
236 24 50 33     124 and defined $self->{io}{$fd}{handle};
237 24         44 my $io = $self->{io}{$fd};
238 24 100       55 if ($io->{read}) {
239 19         55 POE::Kernel->select_read($io->{handle}, 'mojo_io');
240             } else {
241 5         15 POE::Kernel->select_read($io->{handle});
242             }
243 24 100       2028 if ($io->{write}) {
244 17         51 POE::Kernel->select_write($io->{handle}, 'mojo_io');
245             } else {
246 7         22 POE::Kernel->select_write($io->{handle});
247             }
248            
249 24         1328 warn "-- Set POE IO watcher for $fd " .
250             "with read: $io->{read}, write: $io->{write}\n" if DEBUG;
251             }
252              
253             sub _event_clear_io {
254 6     6   392 my $self = $_[HEAP]{mojo_reactor};
255 6         14 my $fd = $_[ARG0];
256             return unless exists $self->{io}{$fd}
257 6 50 33     35 and defined $self->{io}{$fd}{handle};
258 6         13 my $io = $self->{io}{$fd};
259 6         27 POE::Kernel->select_read($io->{handle});
260 6         456 POE::Kernel->select_write($io->{handle});
261 6         604 delete $io->{handle};
262            
263 6         19 warn "-- Cleared POE IO watcher for $fd\n" if DEBUG;
264             }
265              
266             sub _event_timer {
267 377     377   32937 my $self = $_[HEAP]{mojo_reactor};
268 377         542 my $id = $_[ARG0];
269            
270 377         570 my $timer = $self->{timers}{$id};
271 377         406 warn "-- Event fired for timer $id\n" if DEBUG;
272 377 100       659 if ($timer->{recurring}) {
273 351         778 $timer->{time} = steady_time + $timer->{after};
274 351         2126 $self->_init_session->_session_call(mojo_set_timer => $id);
275             } else {
276 26         75 delete $self->{timers}{$id};
277             }
278            
279 377         732 $self->_try('Timer', $timer->{cb});
280             }
281              
282             sub _event_io {
283 6255     6255   673754 my $self = $_[HEAP]{mojo_reactor};
284 6255         12430 my ($handle, $mode) = @_[ARG0, ARG1];
285            
286 6255         16085 my $io = $self->{io}{fileno $handle};
287             #warn "-- Event fired for IO watcher ".fileno($handle)."\n" if DEBUG;
288 6255 100       11791 if ($mode == POE_IO_READ) {
    50          
289 5644         13361 $self->_try('I/O watcher', $io->{cb}, 0);
290             } elsif ($mode == POE_IO_WRITE) {
291 611         1037 $self->_try('I/O watcher', $io->{cb}, 1);
292             } else {
293 0           die "Unknown POE I/O mode $mode";
294             }
295             }
296              
297             1;
298              
299             =head1 NAME
300              
301             Mojo::Reactor::POE - POE backend for Mojo::Reactor
302              
303             =head1 SYNOPSIS
304              
305             use Mojo::Reactor::POE;
306              
307             # Watch if handle becomes readable or writable
308             my $reactor = Mojo::Reactor::POE->new;
309             $reactor->io($first => sub {
310             my ($reactor, $writable) = @_;
311             say $writable ? 'First handle is writable' : 'First handle is readable';
312             });
313              
314             # Change to watching only if handle becomes writable
315             $reactor->watch($first, 0, 1);
316              
317             # Turn file descriptor into handle and watch if it becomes readable
318             my $second = IO::Handle->new_from_fd($fd, 'r');
319             $reactor->io($second => sub {
320             my ($reactor, $writable) = @_;
321             say $writable ? 'Second handle is writable' : 'Second handle is readable';
322             })->watch($second, 1, 0);
323              
324             # Add a timer
325             $reactor->timer(15 => sub {
326             my $reactor = shift;
327             $reactor->remove($first);
328             $reactor->remove($second);
329             say 'Timeout!';
330             });
331              
332             # Start reactor if necessary
333             $reactor->start unless $reactor->is_running;
334              
335             # Or in an application using Mojo::IOLoop
336             use POE qw(Loop::IO_Poll);
337             use Mojo::Reactor::POE;
338             use Mojo::IOLoop;
339            
340             # Or in a Mojolicious application
341             $ MOJO_REACTOR=Mojo::Reactor::POE POE_EVENT_LOOP=POE::Loop::IO_Poll hypnotoad script/myapp
342              
343             =head1 DESCRIPTION
344              
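345             L<Mojo::Reactor::POE> is an event reactor for L<Mojo::IOLoop> that uses L<POE>.
346             The usage is exactly the same as other L<Mojo::Reactor> implementations such as
347             L<Mojo::Reactor::Poll>. L<Mojo::Reactor::POE> will be used as the default
348             backend for L<Mojo::IOLoop> if it is loaded before L<Mojo::IOLoop> or any
349             module using the loop. However, when invoking a L<Mojolicious> application
350             through L<morbo> or L<hypnotoad>, the reactor must be set as the default by
351             setting the C<MOJO_REACTOR> environment variable to C<Mojo::Reactor::POE>.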
352              
353             Note that if L<POE> detects multiple potential event loops it will fail. This
354             includes L<IO::Poll> and L<Mojo::Reactor::EV> (loaded by L<Mojolicious>) if the
355             appropriate L<POE::Loop> modules are installed. To avoid this, load L<POE>
356             before any L<Mojolicious> module, or specify the L<POE> event loop explicitly.
357             This means that for L<Mojolicious> applications invoked through L<morbo> or
358             L<hypnotoad>, the L<POE> event loop may also need to be set in the environment.
359             See L<POE::Kernel/"Using POE with Other Event Loops">.
360              
361             =head1 EVENTS
362              
363             L<Mojo::Reactor::POE> inherits all events from L<Mojo::Reactor::Poll>.
364              
365             =head1 METHODS
366              
367             L<Mojo::Reactor::POE> inherits all methods from L<Mojo::Reactor::Poll> and
368             implements the following new ones.
369              
370             =head2 new
371              
372             my $reactor = Mojo::Reactor::POE->new;
373              
374             Construct a new L<Mojo::Reactor::POE> object.
375              
376             =head2 again
377              
378             $reactor->again($id);
379             $reactor->again($id, 0.5);
380              
381             Restart timer and optionally change the invocation time. Note that this method
382             requires an active timer.
383              
384             =head2 one_tick
385              
386             $reactor->one_tick;
387              
388             Run reactor until an event occurs or no events are being watched anymore. Note
389             that this method can recurse back into the reactor, so you need to be careful.
390              
391             # Don't block longer than 0.5 seconds
392             my $id = $reactor->timer(0.5 => sub {});
393             $reactor->one_tick;
394             $reactor->remove($id);
395              
396             =head2 recurring
397              
398             my $id = $reactor->recurring(0.25 => sub {...});
399              
400             Create a new recurring timer, invoking the callback repeatedly after a given
401             amount of time in seconds.
402              
403             =head2 remove
404              
405             my $bool = $reactor->remove($handle);
406             my $bool = $reactor->remove($id);
407              
408             Remove handle or timer.
409              
410             =head2 reset
411              
412             $reactor->reset;
413              
414             Remove all handles and timers.
415              
416             =head2 timer
417              
418             my $id = $reactor->timer(0.5 => sub {...});
419              
420             Create a new timer, invoking the callback after a given amount of time in
421             seconds.
422              
423             =head2 watch
424              
425             $reactor = $reactor->watch($handle, $readable, $writable);
426              
427             Change I/O events to watch handle for with true and false values. Note that
428             this method requires an active I/O watcher.
429              
430             # Watch only for readable events
431             $reactor->watch($handle, 1, 0);
432              
433             # Watch only for writable events
434             $reactor->watch($handle, 0, 1);
435              
436             # Watch for readable and writable events
437             $reactor->watch($handle, 1, 1);
438              
439             # Pause watching for events
440             $reactor->watch($handle, 0, 0);
441              
442             =head1 CAVEATS
443              
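444             When using L<Mojo::IOLoop> with L<POE>, the event loop must be controlled by
445             L<Mojo::IOLoop> or L<Mojo::Reactor::POE>, such as with the methods
446             L<Mojo::IOLoop/"start">, L<Mojo::IOLoop/"stop">, and
447             L</"one_tick">. Starting or stopping the event loop through L<POE> will not
448             provide required functionality to L<Mojo::IOLoop> applications.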
449              
450             Externally-added sessions will not keep the L<Mojo::IOLoop> running if
451             L<Mojo::Reactor::POE> has nothing left to watch. This can be worked around by
452             adding a recurring timer for the reactor to watch.
453              
454             =head1 BUGS
455              
456             L<POE> has a complex session system which may lead to bugs when used in this
457             manner. Report any issues on the public bugtracker.
458              
459             =head1 AUTHOR
460              
461             Dan Book, C
462              
463             =head1 COPYRIGHT AND LICENSE
464              
465             Copyright 2015, Dan Book.
466              
467             This library is free software; you may redistribute it and/or modify it under
468             the terms of the Artistic License version 2.0.
469              
470             =head1 SEE ALSO
471              
472             L<Mojolicious>, L<Mojo::IOLoop>, L<POE>