File Coverage

blib/lib/Mojo/IOLoop.pm
Criterion Covered Total %
statement 134 138 97.1
branch 40 44 90.9
condition 32 44 72.7
subroutine 40 42 95.2
pod 16 16 100.0
total 262 284 92.2


line stmt bran cond sub pod time code
1             package Mojo::IOLoop;
2 63     63   364131 use Mojo::Base 'Mojo::EventEmitter';
  63         144  
  63         446  
3              
4             # "Professor: Amy, technology isn't intrinsically good or evil. It's how it's
5             # used. Like the death ray."
6 63     63   487 use Carp qw(croak);
  63         155  
  63         3171  
7 63     63   29341 use Mojo::IOLoop::Client;
  63         243  
  63         461  
8 63     63   29831 use Mojo::IOLoop::Server;
  63         236  
  63         1098  
9 63     63   29822 use Mojo::IOLoop::Stream;
  63         246  
  63         692  
10 63     63   29469 use Mojo::IOLoop::Subprocess;
  63         218  
  63         660  
11 63     63   29483 use Mojo::Reactor::Poll;
  63         234  
  63         813  
12 63     63   451 use Mojo::Util qw(md5_sum steady_time);
  63         183  
  63         3224  
13 63     63   409 use Scalar::Util qw(blessed weaken);
  63         159  
  63         3962  
14              
15 63   50 63   444 use constant DEBUG => $ENV{MOJO_IOLOOP_DEBUG} || 0;
  63         146  
  63         183369  
16              
17             has max_accepts => 0;
18             has max_connections => 1000;
19             has reactor => sub {
20             my $class = Mojo::Reactor::Poll->detect;
21             warn "-- Reactor initialized ($class)\n" if DEBUG;
22             return $class->new->catch(sub { warn "@{[blessed $_[0]]}: $_[1]" });
23             };
24              
25             # Ignore PIPE signal
26             $SIG{PIPE} = 'IGNORE';
27              
28             # Initialize singleton reactor early
29             __PACKAGE__->singleton->reactor;
30              
31             sub acceptor {
32 305     305 1 5184 my ($self, $acceptor) = (_instance(shift), @_);
33              
34             # Find acceptor for id
35 305 100       1486 return $self->{acceptors}{$acceptor} unless ref $acceptor;
36              
37             # Connect acceptor with reactor
38 147         592 $self->{acceptors}{my $id = $self->_id} = $acceptor->reactor($self->reactor);
39              
40             # Allow new acceptor to get picked up
41 147         562 $self->_not_accepting->_maybe_accepting;
42              
43 147         639 return $id;
44             }
45              
46             sub client {
47 196     196 1 1347 my ($self, $cb) = (_instance(shift), pop);
48              
49 196         715 my $id = $self->_id;
50 196         1075 my $client = $self->{out}{$id}{client} = Mojo::IOLoop::Client->new(reactor => $self->reactor);
51              
52 196         729 weaken $self;
53             $client->on(
54             connect => sub {
55 191     191   693 delete $self->{out}{$id}{client};
56 191         1537 my $stream = Mojo::IOLoop::Stream->new(pop);
57 191         1032 $self->_stream($stream => $id);
58 191         952 $self->$cb(undef, $stream);
59             }
60 196         1638 );
61 196     3   1391 $client->on(error => sub { $self->_remove($id); $self->$cb(pop, undef) });
  3         11  
  3         21  
62 196         1287 $client->connect(@_);
63              
64 196         1246 return $id;
65             }
66              
67 3253     3253 1 6690 sub is_running { _instance(shift)->reactor->is_running }
68              
69             sub next_tick {
70 1797     1797 1 11649 my ($self, $cb) = (_instance(shift), @_);
71 1797         5699 weaken $self;
72 1797     1797   4208 return $self->reactor->next_tick(sub { $self->$cb });
  1797         4932  
73             }
74              
75             sub one_tick {
76 1255     1255 1 4945 my $self = _instance(shift);
77 1255 100       2917 croak 'Mojo::IOLoop already running' if $self->is_running;
78 1254         3627 $self->reactor->one_tick;
79             }
80              
81 7     7 1 627 sub recurring { shift->_timer(recurring => @_) }
82              
83             sub remove {
84 442     442 1 1298 my ($self, $id) = (_instance(shift), @_);
85 442   100     2149 my $c = $self->{in}{$id} || $self->{out}{$id};
86 442 100 100     1980 if ($c && (my $stream = $c->{stream})) { return $stream->close_gracefully }
  284         1117  
87 158         565 $self->_remove($id);
88             }
89              
90             sub reset {
91 4   100 4 1 35 my ($self, $options) = (_instance(shift), shift // {});
92              
93 4         18 $self->emit('reset')->stop;
94 4 100       17 if ($options->{freeze}) {
95 1         3 state @frozen;
96 1         12 push @frozen, {%$self};
97 1         4 delete $self->{reactor};
98             }
99 3         11 else { $self->reactor->reset }
100              
101 4         26 delete @$self{qw(accepting acceptors events in out stop)};
102             }
103              
104             sub server {
105 147     147 1 5727 my ($self, $cb) = (_instance(shift), pop);
106              
107 147         1370 my $server = Mojo::IOLoop::Server->new;
108 147         634 weaken $self;
109             $server->on(
110             accept => sub {
111 194     194   1583 my $stream = Mojo::IOLoop::Stream->new(pop);
112 194         851 $self->$cb($stream, $self->_stream($stream, $self->_id, 1));
113              
114             # Enforce connection limit (randomize to improve load balancing)
115 194 100       1002 if (my $max = $self->max_accepts) {
116 1   33     17 $self->{accepts} //= $max - int rand $max / 2;
117 1 50       6 $self->stop_gracefully if ($self->{accepts} -= 1) <= 0;
118             }
119              
120             # Stop accepting if connection limit has been reached
121 194 100       696 $self->_not_accepting if $self->_limit;
122             }
123 147         1242 );
124 147         829 $server->listen(@_);
125              
126 146         697 return $self->acceptor($server);
127             }
128              
129 1334     1334 1 19906 sub singleton { state $loop = shift->new }
130              
131             sub start {
132 1001     1001 1 8046 my $self = _instance(shift);
133 1001 100       2628 croak 'Mojo::IOLoop already running' if $self->is_running;
134 998         3130 $self->reactor->start;
135             }
136              
137 995     995 1 3044 sub stop { _instance(shift)->reactor->stop }
138              
139             sub stop_gracefully {
140 3     3 1 29 my $self = _instance(shift)->_not_accepting;
141 3 100 66     21 ++$self->{stop} and !$self->emit('finish')->_in and $self->stop;
142             }
143              
144             sub stream {
145 7125     7125 1 16573 my ($self, $stream) = (_instance(shift), @_);
146 7125 100       15735 return $self->_stream($stream => $self->_id) if ref $stream;
147 7124   100     29287 my $c = $self->{in}{$stream} || $self->{out}{$stream} // {};
      100        
148 7124         37738 return $c->{stream};
149             }
150              
151             sub subprocess {
152 0     0 1 0 my $subprocess = Mojo::IOLoop::Subprocess->new(ioloop => _instance(shift));
153 0 0       0 return @_ ? $subprocess->run(@_) : $subprocess;
154             }
155              
156 53     53 1 482 sub timer { shift->_timer(timer => @_) }
157              
158             sub _id {
159 538     538   1004 my $self = shift;
160 538         946 my $id;
161 538   33     876 do { $id = md5_sum 'c' . steady_time . rand } while $self->{in}{$id} || $self->{out}{$id} || $self->{acceptors}{$id};
  538   33     1865  
162 538         14712 return $id;
163             }
164              
165 457   100 457   785 sub _in { scalar keys %{shift->{in} // {}} }
  457         2466  
166              
167 16583 100   16583   49967 sub _instance { ref $_[0] ? $_[0] : $_[0]->singleton }
168              
169 453 100   453   1865 sub _limit { $_[0]{stop} ? 1 : $_[0]->_in >= $_[0]->max_connections }
170              
171             sub _maybe_accepting {
172 596     596   1128 my $self = shift;
173 596 100 100     2290 return if $self->{accepting} || $self->_limit;
174 258   50     629 $_->start for values %{$self->{acceptors} // {}};
  258         1520  
175 258         1234 $self->{accepting} = 1;
176             }
177              
178             sub _not_accepting {
179 263     263   489 my $self = shift;
180 263 100       1023 return $self unless delete $self->{accepting};
181 200   50     450 $_->stop for values %{$self->{acceptors} // {}};
  200         1481  
182 200         665 return $self;
183             }
184              
185 0   0 0   0 sub _out { scalar keys %{shift->{out} // {}} }
  0         0  
186              
187             sub _remove {
188 499     499   4866 my ($self, $id) = @_;
189              
190             # Timer
191 499 50       1369 return undef unless my $reactor = $self->reactor;
192 499 100       2036 return undef if $reactor->remove($id);
193              
194             # Acceptor
195 474 100       2044 return $self->_not_accepting->_maybe_accepting if delete $self->{acceptors}{$id};
196              
197             # Connection
198 364 100 100     2188 return undef unless delete $self->{in}{$id} || delete $self->{out}{$id};
199 341 100 100     1295 return $self->stop if $self->{stop} && !$self->_in;
200 339         1105 $self->_maybe_accepting;
201 339         893 warn "-- $id <<< $$ (@{[$self->_in]}:@{[$self->_out]})\n" if DEBUG;
202             }
203              
204             sub _stream {
205 386     386   1181 my ($self, $stream, $id, $server) = @_;
206              
207             # Connect stream with reactor
208 386 100       1178 $self->{$server ? 'in' : 'out'}{$id}{stream} = $stream->reactor($self->reactor);
209 386         748 warn "-- $id >>> $$ (@{[$self->_in]}:@{[$self->_out]})\n" if DEBUG;
210 386         1179 weaken $self;
211 386 100   340   2585 $stream->on(close => sub { $self && $self->_remove($id) });
  340         1981  
212 386         1763 $stream->start;
213              
214 386         1724 return $id;
215             }
216              
217             sub _timer {
218 60     60   213 my ($self, $method, $after, $cb) = (_instance(shift), @_);
219 60         236 weaken $self;
220 60     23832   184 return $self->reactor->$method($after => sub { $self->$cb });
  23832         40300  
221             }
222              
223             1;
224              
225             =encoding utf8
226              
227             =head1 NAME
228              
229             Mojo::IOLoop - Minimalistic event loop
230              
231             =head1 SYNOPSIS
232              
233             use Mojo::IOLoop;
234              
235             # Listen on port 3000
236             Mojo::IOLoop->server({port => 3000} => sub ($loop, $stream, $id) {
237             $stream->on(read => sub ($stream, $bytes) {
238             # Process input chunk
239             say $bytes;
240              
241             # Write response
242             $stream->write('HTTP/1.1 200 OK');
243             });
244             });
245              
246             # Connect to port 3000
247             my $id = Mojo::IOLoop->client({port => 3000} => sub ($loop, $err, $stream) {
248             $stream->on(read => sub ($stream, $bytes) {
249             # Process input
250             say "Input: $bytes";
251             });
252              
253             # Write request
254             $stream->write("GET / HTTP/1.1\x0d\x0a\x0d\x0a");
255             });
256              
257             # Add a timer
258             Mojo::IOLoop->timer(5 => sub ($loop) { $loop->remove($id) });
259              
260             # Start event loop if necessary
261             Mojo::IOLoop->start unless Mojo::IOLoop->is_running;
262              
263             =head1 DESCRIPTION
264              
265             L<Mojo::IOLoop> is a very minimalistic event loop based on L<Mojo::Reactor>, it has been reduced to the absolute
266             minimal feature set required to build solid and scalable non-blocking clients and servers.
267              
268             Depending on operating system, the default per-process and system-wide file descriptor limits are often very low and
269             need to be tuned for better scalability. The C<LIBEV_FLAGS> environment variable should also be used to select the best
270             possible L<EV> backend, which usually defaults to the not very scalable C<select>.
271              
272             LIBEV_FLAGS=1 # select
273             LIBEV_FLAGS=2 # poll
274             LIBEV_FLAGS=4 # epoll (Linux)
275             LIBEV_FLAGS=8 # kqueue (*BSD, OS X)
276             LIBEV_FLAGS=64 # Linux AIO
277              
278             The event loop will be resilient to time jumps if a monotonic clock is available through L<Time::HiRes>. A TLS
279             certificate and key are also built right in, to make writing test servers as easy as possible. Also note that for
280             convenience the C<PIPE> signal will be set to C<IGNORE> when L<Mojo::IOLoop> is loaded.
281              
282             For better scalability (epoll, kqueue) and to provide non-blocking name resolution, SOCKS5 as well as TLS support, the
283             optional modules L<EV> (4.32+), L<Net::DNS::Native> (0.15+), L<IO::Socket::Socks> (0.64+) and L<IO::Socket::SSL>
284             (2.009+) will be used automatically if possible. Individual features can also be disabled with the C<MOJO_NO_NNR>,
285             C<MOJO_NO_SOCKS> and C<MOJO_NO_TLS> environment variables.
286              
287             See L<Mojolicious::Guides::Cookbook/"REAL-TIME WEB"> for more.
288              
289             =head1 EVENTS
290              
291             L<Mojo::IOLoop> inherits all events from L<Mojo::EventEmitter> and can emit the following new ones.
292              
293             =head2 finish
294              
295             $loop->on(finish => sub ($loop) {...});
296              
297             Emitted when the event loop wants to shut down gracefully and is just waiting for all existing connections to be
298             closed.
299              
300             =head2 reset
301              
302             $loop->on(reset => sub ($loop) {...});
303              
304             Emitted when the event loop is reset, this usually happens after the process is forked to clean up resources that
305             cannot be shared.
306              
307             =head1 ATTRIBUTES
308              
309             L<Mojo::IOLoop> implements the following attributes.
310              
311             =head2 max_accepts
312              
313             my $max = $loop->max_accepts;
314             $loop = $loop->max_accepts(1000);
315              
316             The maximum number of connections this event loop is allowed to accept, before shutting down gracefully without
317             interrupting existing connections, defaults to C<0>. Setting the value to C<0> will allow this event loop to accept new
318             connections indefinitely. Note that up to half of this value can be subtracted randomly to improve load balancing
319             between multiple server processes, and to make sure that not all of them restart at the same time.
320              
321             =head2 max_connections
322              
323             my $max = $loop->max_connections;
324             $loop = $loop->max_connections(100);
325              
326             The maximum number of accepted connections this event loop is allowed to handle concurrently, before stopping to accept
327             new incoming connections, defaults to C<1000>.
328              
329             =head2 reactor
330              
331             my $reactor = $loop->reactor;
332             $loop = $loop->reactor(Mojo::Reactor->new);
333              
334             Low-level event reactor, usually a L<Mojo::Reactor::Poll> or L<Mojo::Reactor::EV> object with a default subscriber to
335             the event L<Mojo::Reactor/"error">.
336              
337             # Watch if handle becomes readable or writable
338             Mojo::IOLoop->singleton->reactor->io($handle => sub ($reactor, $writable) {
339             say $writable ? 'Handle is writable' : 'Handle is readable';
340             });
341              
342             # Change to watching only if handle becomes writable
343             Mojo::IOLoop->singleton->reactor->watch($handle, 0, 1);
344              
345             # Remove handle again
346             Mojo::IOLoop->singleton->reactor->remove($handle);
347              
348             =head1 METHODS
349              
350             L<Mojo::IOLoop> inherits all methods from L<Mojo::EventEmitter> and implements the following new ones.
351              
352             =head2 acceptor
353              
354             my $server = Mojo::IOLoop->acceptor($id);
355             my $server = $loop->acceptor($id);
356             my $id = $loop->acceptor(Mojo::IOLoop::Server->new);
357              
358             Get L<Mojo::IOLoop::Server> object for id or turn object into an acceptor.
359              
360             =head2 client
361              
362             my $id = Mojo::IOLoop->client(address => '127.0.0.1', port => 3000, sub {...});
363             my $id = $loop->client(address => '127.0.0.1', port => 3000, sub {...});
364             my $id = $loop->client({address => '127.0.0.1', port => 3000} => sub {...});
365              
366             Open a TCP/IP or UNIX domain socket connection with L<Mojo::IOLoop::Client> and create a stream object (usually
367             L<Mojo::IOLoop::Stream>), takes the same arguments as L<Mojo::IOLoop::Client/"connect">.
368              
369             =head2 is_running
370              
371             my $bool = Mojo::IOLoop->is_running;
372             my $bool = $loop->is_running;
373              
374             Check if event loop is running.
375              
376             =head2 next_tick
377              
378             my $undef = Mojo::IOLoop->next_tick(sub ($loop) {...});
379             my $undef = $loop->next_tick(sub ($loop) {...});
380              
381             Execute callback as soon as possible, but not before returning or other callbacks that have been registered with this
382             method, always returns C<undef>.
383              
384             # Perform operation on next reactor tick
385             Mojo::IOLoop->next_tick(sub ($loop) {...});
386              
387             =head2 one_tick
388              
389             Mojo::IOLoop->one_tick;
390             $loop->one_tick;
391              
392             Run event loop until an event occurs.
393              
394             # Don't block longer than 0.5 seconds
395             my $id = Mojo::IOLoop->timer(0.5 => sub ($loop) {});
396             Mojo::IOLoop->one_tick;
397             Mojo::IOLoop->remove($id);
398              
399             =head2 recurring
400              
401             my $id = Mojo::IOLoop->recurring(3 => sub ($loop) {...});
402             my $id = $loop->recurring(0 => sub ($loop) {...});
403             my $id = $loop->recurring(0.25 => sub ($loop) {...});
404              
405             Create a new recurring timer, invoking the callback repeatedly after a given amount of time in seconds.
406              
407             # Perform operation every 5 seconds
408             Mojo::IOLoop->recurring(5 => sub ($loop) {...});
409              
410             =head2 remove
411              
412             Mojo::IOLoop->remove($id);
413             $loop->remove($id);
414              
415             Remove anything with an id, connections will be dropped gracefully by allowing them to finish writing all data in their
416             write buffers.
417              
418             =head2 reset
419              
420             Mojo::IOLoop->reset;
421             $loop->reset;
422             $loop->reset({freeze => 1});
423              
424             Remove everything and stop the event loop.
425              
426             These options are currently available:
427              
428             =over 2
429              
430             =item freeze
431              
432             freeze => 1
433              
434             Freeze the current state of the event loop in time before resetting it. This will prevent active connections from
435             getting closed immediately, which can help with many unintended side effects when processes are forked. Note that this
436             option is B<EXPERIMENTAL> and might change without warning!
437              
438             =back
439              
440             =head2 server
441              
442             my $id = Mojo::IOLoop->server(port => 3000, sub {...});
443             my $id = $loop->server(port => 3000, sub {...});
444             my $id = $loop->server({port => 3000} => sub {...});
445              
446             Accept TCP/IP and UNIX domain socket connections with L<Mojo::IOLoop::Server> and create stream objects (usually
447             L<Mojo::IOLoop::Stream>), takes the same arguments as L<Mojo::IOLoop::Server/"listen">.
448              
449             # Listen on random port
450             my $id = Mojo::IOLoop->server({address => '127.0.0.1'} => sub ($loop, $stream, $id) {...});
451             my $port = Mojo::IOLoop->acceptor($id)->port;
452              
453             =head2 singleton
454              
455             my $loop = Mojo::IOLoop->singleton;
456              
457             The global L<Mojo::IOLoop> singleton, used to access a single shared event loop object from everywhere inside the
458             process.
459              
460             # Many methods also allow you to take shortcuts
461             Mojo::IOLoop->timer(2 => sub { Mojo::IOLoop->stop });
462             Mojo::IOLoop->start;
463              
464             # Restart active timer
465             my $id = Mojo::IOLoop->timer(3 => sub { say 'Timeout!' });
466             Mojo::IOLoop->singleton->reactor->again($id);
467              
468             # Turn file descriptor into handle and watch if it becomes readable
469             my $handle = IO::Handle->new_from_fd($fd, 'r');
470             Mojo::IOLoop->singleton->reactor->io($handle => sub ($reactor, $writable) {
471             say $writable ? 'Handle is writable' : 'Handle is readable';
472             })->watch($handle, 1, 0);
473              
474             =head2 start
475              
476             Mojo::IOLoop->start;
477             $loop->start;
478              
479             Start the event loop, this will block until L</"stop"> is called. Note that some reactors stop automatically if there
480             are no events being watched anymore.
481              
482             # Start event loop only if it is not running already
483             Mojo::IOLoop->start unless Mojo::IOLoop->is_running;
484              
485             =head2 stop
486              
487             Mojo::IOLoop->stop;
488             $loop->stop;
489              
490             Stop the event loop, this will not interrupt any existing connections and the event loop can be restarted by running
491             L</"start"> again.
492              
493             =head2 stop_gracefully
494              
495             Mojo::IOLoop->stop_gracefully;
496             $loop->stop_gracefully;
497              
498             Stop accepting new connections and wait for already accepted connections to be closed, before stopping the event loop.
499              
500             =head2 stream
501              
502             my $stream = Mojo::IOLoop->stream($id);
503             my $stream = $loop->stream($id);
504             my $id = $loop->stream(Mojo::IOLoop::Stream->new);
505              
506             Get L<Mojo::IOLoop::Stream> object for id or turn object into a connection.
507              
508             # Increase inactivity timeout for connection to 300 seconds
509             Mojo::IOLoop->stream($id)->timeout(300);
510              
511             =head2 subprocess
512              
513             my $subprocess = Mojo::IOLoop->subprocess;
514             my $subprocess = $loop->subprocess;
515             my $subprocess = $loop->subprocess(sub ($subprocess) {...}, sub ($subprocess, $err, @results) {...});
516              
517             Build L<Mojo::IOLoop::Subprocess> object to perform computationally expensive operations in subprocesses, without
518             blocking the event loop. Callbacks will be passed along to L<Mojo::IOLoop::Subprocess/"run">.
519              
520             # Operation that would block the event loop for 5 seconds
521             Mojo::IOLoop->subprocess->run_p(sub {
522             sleep 5;
523             return '♥', 'Mojolicious';
524             })->then(sub (@results) {
525             say "I $results[0] $results[1]!";
526             })->catch(sub ($err) {
527             say "Subprocess error: $err";
528             });
529              
530             =head2 timer
531              
532             my $id = Mojo::IOLoop->timer(3 => sub ($loop) {...});
533             my $id = $loop->timer(0 => sub ($loop) {...});
534             my $id = $loop->timer(0.25 => sub ($loop) {...});
535              
536             Create a new timer, invoking the callback after a given amount of time in seconds.
537              
538             # Perform operation in 5 seconds
539             Mojo::IOLoop->timer(5 => sub ($loop) {...});
540              
541             =head1 DEBUGGING
542              
543             You can set the C<MOJO_IOLOOP_DEBUG> environment variable to get some advanced diagnostics information printed to
544             C<STDERR>.
545              
546             MOJO_IOLOOP_DEBUG=1
547              
548             =head1 SEE ALSO
549              
550             L<Mojolicious>, L<Mojolicious::Guides>, L<https://mojolicious.org>.
551              
552             =cut