File Coverage

blib/lib/Mojo/Reactor/POE.pm
Criterion Covered Total %
statement 158 159 99.3
branch 44 56 78.5
condition 12 28 42.8
subroutine 33 33 100.0
pod 8 8 100.0
total 255 284 89.7


line stmt bran cond sub pod time code
1             package Mojo::Reactor::POE;
2              
3 1     1   282792 use POE; # Loaded early to avoid event loop confusion
  1         18451  
  1         8  
4 1     1   63993 BEGIN { POE::Kernel->run } # silence run() warning
5              
6 1     1   2123 use Mojo::Base 'Mojo::Reactor::Poll';
  1         2  
  1         11  
7              
8             $ENV{MOJO_REACTOR} ||= 'Mojo::Reactor::POE';
9              
10 1     1   41964 use Carp 'croak';
  1         2  
  1         51  
11 1     1   9 use Mojo::Util qw(md5_sum steady_time);
  1         2  
  1         94  
12 1     1   9 use Scalar::Util 'weaken';
  1         2  
  1         52  
13              
14 1     1   7 use constant { POE_IO_READ => 0, POE_IO_WRITE => 1 };
  1         3  
  1         110  
15 1   50 1   8 use constant DEBUG => $ENV{MOJO_REACTOR_POE_DEBUG} || 0;
  1         2  
  1         2709  
16              
17             our $VERSION = '1.000';
18              
19             my $POE;
20              
21             # We have to fall back to Mojo::Reactor::Poll, since POE::Kernel is unique
22 6 100   6 1 89100 sub new { $POE++ ? Mojo::Reactor::Poll->new : shift->SUPER::new }
23              
24             sub DESTROY {
25 2     2   1125 my $self = shift;
26 2         9 $self->reset; # Close session
27 2         20 undef $POE;
28             }
29              
30             sub again {
31 9     9 1 400 my ($self, $id, $after) = @_;
32 9 100       322 croak 'Timer not active' unless my $timer = $self->{timers}{$id};
33 8 100       24 $timer->{after} = $after if defined $after;
34 8         65 $timer->{time} = steady_time + $timer->{after};
35             # If session doesn't exist, the time will be set when it starts
36 8 50       106 $self->_session_call(mojo_adjust_timer => $id) if $self->_session_exists;
37             }
38              
39             sub one_tick {
40 5768     5768 1 516968 my $self = shift;
41 5768         13528 $self->_init_session;
42            
43             # Stop automatically if there is nothing to watch
44 5768 100 100     8080 return $self->stop unless keys %{$self->{timers}} || keys %{$self->{io}};
  5768         14991  
  6         44  
45            
46             # Just one tick
47 5763 100       11556 local $self->{running} = 1 unless $self->{running};
48            
49 5763         13406 POE::Kernel->run_one_timeslice;
50             }
51              
52 4     4 1 793 sub recurring { shift->_timer(1, @_) }
53              
54             sub remove {
55 11     11 1 3418 my ($self, $remove) = @_;
56 11 50       38 return unless defined $remove;
57 11 100       38 if (ref $remove) {
58 5   33     23 my $fileno = fileno($remove) // croak 'Handle is closed';
59 5 100       20 if (exists $self->{io}{$fileno}) {
60 3         9 warn "-- Removed IO watcher for $fileno\n" if DEBUG;
61             # If session doesn't exist, the watcher won't be re-added
62 3 50       12 $self->_session_call(mojo_clear_io => $fileno) if $self->_session_exists;
63             }
64 5         46 return !!delete $self->{io}{$fileno};
65             } else {
66 6 100       18 if (exists $self->{timers}{$remove}) {
67 5         8 warn "-- Removed timer $remove\n" if DEBUG;
68             # If session doesn't exist, the timer won't be re-added
69 5 50       15 $self->_session_call(mojo_clear_timer => $remove) if $self->_session_exists;
70             }
71 6         45 return !!delete $self->{timers}{$remove};
72             }
73             }
74              
75             sub reset {
76 6     6 1 27608 my $self = shift;
77             # If session doesn't exist, watchers won't be re-added
78 6 100       22 if ($self->_session_exists) {
79 3         11 $self->_session_call('mojo_clear_timers');
80 3         8 $self->_session_call(mojo_clear_io => $_) for keys %{$self->{io}};
  3         15  
81             }
82 6         43 $self->SUPER::reset;
83             }
84              
85 32     32 1 37448 sub timer { shift->_timer(0, @_) }
86              
87             sub watch {
88 25     25 1 6897 my ($self, $handle, $read, $write) = @_;
89              
90 25         59 my $fileno = fileno $handle;
91 25 100       199 croak 'I/O watcher not active' unless my $io = $self->{io}{$fileno};
92 24         59 $io->{handle} = $handle;
93 24         54 $io->{read} = $read;
94 24         42 $io->{write} = $write;
95            
96 24         38 warn "-- Set IO watcher for $fileno\n" if DEBUG;
97            
98 24         71 $self->_init_session->_session_call(mojo_set_io => $fileno);
99            
100 24         85 return $self;
101             }
102              
103             sub _id {
104 36     36   64 my $self = shift;
105 36         51 my $id;
106 36         56 do { $id = md5_sum 't' . steady_time . rand } while $self->{timers}{$id};
  36         125  
107 36         902 return $id;
108             }
109              
110             sub _timer {
111 36     36   107 my ($self, $recurring, $after, $cb) = @_;
112            
113 36         102 my $id = $self->_id;
114 36         110 my $timer = $self->{timers}{$id} = {
115             cb => $cb,
116             after => $after,
117             recurring => $recurring,
118             time => steady_time + $after,
119             };
120            
121 36         329 if (DEBUG) {
122             my $is_recurring = $recurring ? ' (recurring)' : '';
123             warn "-- Set timer $id after $after seconds$is_recurring\n";
124             }
125            
126 36         96 $self->_init_session->_session_call(mojo_set_timer => $id);
127            
128 36         117 return $id;
129             }
130              
131             sub _try {
132 6266     6266   11816 my ($self, $what, $cb) = (shift, shift, shift);
133 6266 100       8287 eval { $self->$cb(@_); 1 } or $self->emit(error => "$what failed: $@");
  6266         16018  
  6265         31366  
134             }
135              
136             sub _init_session {
137 6114     6114   8343 my $self = shift;
138 6114 100       10531 unless ($self->_session_exists) {
139 9         157 my $session = POE::Session->create(
140             inline_states => {
141             _start => \&_event_start,
142             _stop => \&_event_stop,
143             mojo_set_timer => \&_event_set_timer,
144             mojo_clear_timer => \&_event_clear_timer,
145             mojo_adjust_timer => \&_event_adjust_timer,
146             mojo_clear_timers => \&_event_clear_timers,
147             mojo_set_io => \&_event_set_io,
148             mojo_clear_io => \&_event_clear_io,
149             mojo_timer => \&_event_timer,
150             mojo_io => \&_event_io,
151             },
152             heap => { mojo_reactor => $self },
153             );
154 9         1393 weaken $session->get_heap()->{mojo_reactor};
155 9         66 $self->{session_id} = $session->ID;
156             }
157 6114         10662 return $self;
158             }
159              
160             sub _session_exists {
161 6136     6136   8818 my $self = shift;
162 6136         17223 return defined $self->{session_id};
163             }
164              
165             sub _session_call {
166 368     368   534 my $self = shift;
167 368 50       750 croak 'Session call on nonexistent session' unless defined $self->{session_id};
168 368         1357 POE::Kernel->call($self->{session_id}, @_);
169 368         3090 return $self;
170             }
171              
172             sub _event_start {
173 9     9   3515 my $self = $_[HEAP]{mojo_reactor};
174 9         17 my $session = $_[SESSION];
175            
176 9         29 warn "-- POE session started\n" if DEBUG;
177             }
178              
179             sub _event_stop {
180 8     8   1157 my $self = $_[HEAP]{mojo_reactor};
181            
182 8         12 warn "-- POE session stopped\n" if DEBUG;
183            
184 8 50       48 delete $self->{session_id} if $self;
185             }
186              
187             sub _event_set_timer {
188 322     322   13717 my $self = $_[HEAP]{mojo_reactor};
189 322         480 my $id = $_[ARG0];
190             return unless exists $self->{timers}{$id}
191 322 50 33     1324 and defined $self->{timers}{$id}{time};
192 322         514 my $timer = $self->{timers}{$id};
193 322         676 my $delay_time = $timer->{time} - steady_time;
194 322         2097 my $poe_id = POE::Kernel->delay_set(mojo_timer => $delay_time, $id);
195 322         22413 $timer->{poe_id} = $poe_id;
196            
197 322         739 warn "-- Set POE timer $poe_id in $delay_time seconds\n" if DEBUG;
198             }
199              
200             sub _event_clear_timer {
201 5     5   326 my $self = $_[HEAP]{mojo_reactor};
202 5         20 my $id = $_[ARG0];
203             return unless exists $self->{timers}{$id}
204 5 50 33     39 and defined $self->{timers}{$id}{poe_id};
205 5         13 my $timer = $self->{timers}{$id};
206 5         25 POE::Kernel->alarm_remove($timer->{poe_id});
207            
208 5         529 warn "-- Cleared POE timer $timer->{poe_id}\n" if DEBUG;
209             }
210              
211             sub _event_adjust_timer {
212 8     8   377 my $self = $_[HEAP]{mojo_reactor};
213 8         14 my $id = $_[ARG0];
214             return unless exists $self->{timers}{$id}
215             and defined $self->{timers}{$id}{time}
216 8 50 33     61 and defined $self->{timers}{$id}{poe_id};
      33        
217 8         17 my $timer = $self->{timers}{$id};
218 8         20 my $new_delay = $timer->{time} - steady_time;
219 8         63 POE::Kernel->delay_adjust($timer->{poe_id}, $new_delay);
220            
221 8         955 warn "-- Adjusted POE timer $timer->{poe_id} to $new_delay seconds\n"
222             if DEBUG;
223             }
224              
225             sub _event_clear_timers {
226 3   50 3   183 my $self = $_[HEAP]{mojo_reactor} // return;
227 3         13 POE::Kernel->alarm_remove_all;
228            
229 3         318 warn "-- Cleared all POE timers\n" if DEBUG;
230             }
231              
232             sub _event_set_io {
233 24     24   1448 my $self = $_[HEAP]{mojo_reactor};
234 24         40 my $fd = $_[ARG0];
235             return unless exists $self->{io}{$fd}
236 24 50 33     123 and defined $self->{io}{$fd}{handle};
237 24         48 my $io = $self->{io}{$fd};
238 24 100       104 if ($io->{read}) {
239 19         67 POE::Kernel->select_read($io->{handle}, 'mojo_io');
240             } else {
241 5         24 POE::Kernel->select_read($io->{handle});
242             }
243 24 100       2323 if ($io->{write}) {
244 17         55 POE::Kernel->select_write($io->{handle}, 'mojo_io');
245             } else {
246 7         22 POE::Kernel->select_write($io->{handle});
247             }
248            
249 24         1680 warn "-- Set POE IO watcher for $fd " .
250             "with read: $io->{read}, write: $io->{write}\n" if DEBUG;
251             }
252              
253             sub _event_clear_io {
254 6     6   403 my $self = $_[HEAP]{mojo_reactor};
255 6         14 my $fd = $_[ARG0];
256             return unless exists $self->{io}{$fd}
257 6 50 33     40 and defined $self->{io}{$fd}{handle};
258 6         13 my $io = $self->{io}{$fd};
259 6         25 POE::Kernel->select_read($io->{handle});
260 6         508 POE::Kernel->select_write($io->{handle});
261 6         677 delete $io->{handle};
262            
263 6         20 warn "-- Cleared POE IO watcher for $fd\n" if DEBUG;
264             }
265              
266             sub _event_timer {
267 312     312   32724 my $self = $_[HEAP]{mojo_reactor};
268 312         526 my $id = $_[ARG0];
269            
270 312         541 my $timer = $self->{timers}{$id};
271 312         426 warn "-- Event fired for timer $id\n" if DEBUG;
272 312 100       611 if ($timer->{recurring}) {
273 286         778 $timer->{time} = steady_time + $timer->{after};
274 286         2229 $self->_init_session->_session_call(mojo_set_timer => $id);
275             } else {
276 26         82 delete $self->{timers}{$id};
277             }
278            
279 312         809 $self->_try('Timer', $timer->{cb});
280             }
281              
282             sub _event_io {
283 5954     5954   676705 my $self = $_[HEAP]{mojo_reactor};
284 5954         12570 my ($handle, $mode) = @_[ARG0, ARG1];
285            
286 5954         14675 my $io = $self->{io}{fileno $handle};
287             #warn "-- Event fired for IO watcher ".fileno($handle)."\n" if DEBUG;
288 5954 100       11055 if ($mode == POE_IO_READ) {
    50          
289 5439         12331 $self->_try('I/O watcher', $io->{cb}, 0);
290             } elsif ($mode == POE_IO_WRITE) {
291 515         1067 $self->_try('I/O watcher', $io->{cb}, 1);
292             } else {
293 0           die "Unknown POE I/O mode $mode";
294             }
295             }
296              
297             1;
298              
299             =head1 NAME
300              
301             Mojo::Reactor::POE - POE backend for Mojo::Reactor
302              
303             =head1 SYNOPSIS
304              
305             use Mojo::Reactor::POE;
306              
307             # Watch if handle becomes readable or writable
308             my $reactor = Mojo::Reactor::POE->new;
309             $reactor->io($first => sub {
310             my ($reactor, $writable) = @_;
311             say $writable ? 'First handle is writable' : 'First handle is readable';
312             });
313              
314             # Change to watching only if handle becomes writable
315             $reactor->watch($first, 0, 1);
316              
317             # Turn file descriptor into handle and watch if it becomes readable
318             my $second = IO::Handle->new_from_fd($fd, 'r');
319             $reactor->io($second => sub {
320             my ($reactor, $writable) = @_;
321             say $writable ? 'Second handle is writable' : 'Second handle is readable';
322             })->watch($second, 1, 0);
323              
324             # Add a timer
325             $reactor->timer(15 => sub {
326             my $reactor = shift;
327             $reactor->remove($first);
328             $reactor->remove($second);
329             say 'Timeout!';
330             });
331              
332             # Start reactor if necessary
333             $reactor->start unless $reactor->is_running;
334              
335             # Or in an application using Mojo::IOLoop
336             use POE qw(Loop::IO_Poll);
337             use Mojo::Reactor::POE;
338             use Mojo::IOLoop;
339            
340             # Or in a Mojolicious application
341             $ MOJO_REACTOR=Mojo::Reactor::POE POE_EVENT_LOOP=POE::Loop::IO_Poll hypnotoad script/myapp
342              
343             =head1 DESCRIPTION
344              
345             L is an event reactor for L that uses L.
346             The usage is exactly the same as other L implementations such as
347             L. L will be used as the default
348             backend for L if it is loaded before L or any
349             module using the loop. However, when invoking a L application
350             through L or L, the reactor must be set as the default by
351             setting the C environment variable to C.
352              
353             Note that if L detects multiple potential event loops it will fail. This
354             includes L and L (loaded by L) if the
355             appropriate L modules are installed. To avoid this, load L
356             before any L module, or specify the L event loop explicitly.
357             This means that for L applications invoked through L or
358             L, the L event loop may also need to be set in the environment.
359             See L.
360              
361             =head1 EVENTS
362              
363             L inherits all events from L.
364              
365             =head1 METHODS
366              
367             L inherits all methods from L and
368             implements the following new ones.
369              
370             =head2 new
371              
372             my $reactor = Mojo::Reactor::POE->new;
373              
374             Construct a new L object.
375              
376             =head2 again
377              
378             $reactor->again($id);
379             $reactor->again($id, 0.5);
380              
381             Restart timer and optionally change the invocation time. Note that this method
382             requires an active timer.
383              
384             =head2 one_tick
385              
386             $reactor->one_tick;
387              
388             Run reactor until an event occurs or no events are being watched anymore. Note
389             that this method can recurse back into the reactor, so you need to be careful.
390              
391             # Don't block longer than 0.5 seconds
392             my $id = $reactor->timer(0.5 => sub {});
393             $reactor->one_tick;
394             $reactor->remove($id);
395              
396             =head2 recurring
397              
398             my $id = $reactor->recurring(0.25 => sub {...});
399              
400             Create a new recurring timer, invoking the callback repeatedly after a given
401             amount of time in seconds.
402              
403             =head2 remove
404              
405             my $bool = $reactor->remove($handle);
406             my $bool = $reactor->remove($id);
407              
408             Remove handle or timer.
409              
410             =head2 reset
411              
412             $reactor->reset;
413              
414             Remove all handles and timers.
415              
416             =head2 timer
417              
418             my $id = $reactor->timer(0.5 => sub {...});
419              
420             Create a new timer, invoking the callback after a given amount of time in
421             seconds.
422              
423             =head2 watch
424              
425             $reactor = $reactor->watch($handle, $readable, $writable);
426              
427             Change I/O events to watch handle for with true and false values. Note that
428             this method requires an active I/O watcher.
429              
430             # Watch only for readable events
431             $reactor->watch($handle, 1, 0);
432              
433             # Watch only for writable events
434             $reactor->watch($handle, 0, 1);
435              
436             # Watch for readable and writable events
437             $reactor->watch($handle, 1, 1);
438              
439             # Pause watching for events
440             $reactor->watch($handle, 0, 0);
441              
442             =head1 CAVEATS
443              
444             When using L with L, the event loop must be controlled by
445             L or L, such as with the methods
446             L, L, and
447             L. Starting or stopping the event loop through L will not
448             provide required functionality to L applications.
449              
450             Externally-added sessions will not keep the L running if
451             L has nothing left to watch. This can be worked around by
452             adding a recurring timer for the reactor to watch.
453              
454             =head1 BUGS
455              
456             L has a complex session system which may lead to bugs when used in this
457             manner. Report any issues on the public bugtracker.
458              
459             =head1 AUTHOR
460              
461             Dan Book, C
462              
463             =head1 COPYRIGHT AND LICENSE
464              
465             Copyright 2015, Dan Book.
466              
467             This library is free software; you may redistribute it and/or modify it under
468             the terms of the Artistic License version 2.0.
469              
470             =head1 SEE ALSO
471              
472             L, L, L