File Coverage

blib/lib/Mojo/IOLoop.pm
Criterion Covered Total %
statement 134 138 97.1
branch 39 44 88.6
condition 32 44 72.7
subroutine 40 42 95.2
pod 16 16 100.0
total 261 284 91.9


line stmt bran cond sub pod time code
1             package Mojo::IOLoop;
2 63     63   351761 use Mojo::Base 'Mojo::EventEmitter';
  63         150  
  63         392  
3              
4             # "Professor: Amy, technology isn't intrinsically good or evil. It's how it's
5             # used. Like the death ray."
6 63     63   436 use Carp qw(croak);
  63         147  
  63         3128  
7 63     63   28398 use Mojo::IOLoop::Client;
  63         241  
  63         447  
8 63     63   28070 use Mojo::IOLoop::Server;
  63         203  
  63         958  
9 63     63   29087 use Mojo::IOLoop::Stream;
  63         192  
  63         694  
10 63     63   28698 use Mojo::IOLoop::Subprocess;
  63         224  
  63         646  
11 63     63   28417 use Mojo::Reactor::Poll;
  63         213  
  63         726  
12 63     63   431 use Mojo::Util qw(md5_sum steady_time);
  63         161  
  63         3257  
13 63     63   388 use Scalar::Util qw(blessed weaken);
  63         136  
  63         3837  
14              
15 63   50 63   438 use constant DEBUG => $ENV{MOJO_IOLOOP_DEBUG} || 0;
  63         142  
  63         171251  
16              
17             has max_accepts => 0;
18             has max_connections => 1000;
19             has reactor => sub {
20             my $class = Mojo::Reactor::Poll->detect;
21             warn "-- Reactor initialized ($class)\n" if DEBUG;
22             return $class->new->catch(sub { warn "@{[blessed $_[0]]}: $_[1]" });
23             };
24              
25             # Ignore PIPE signal
26             $SIG{PIPE} = 'IGNORE';
27              
28             # Initialize singleton reactor early
29             __PACKAGE__->singleton->reactor;
30              
31             sub acceptor {
32 305     305 1 4797 my ($self, $acceptor) = (_instance(shift), @_);
33              
34             # Find acceptor for id
35 305 100       1468 return $self->{acceptors}{$acceptor} unless ref $acceptor;
36              
37             # Connect acceptor with reactor
38 147         552 $self->{acceptors}{my $id = $self->_id} = $acceptor->reactor($self->reactor);
39              
40             # Allow new acceptor to get picked up
41 147         545 $self->_not_accepting->_maybe_accepting;
42              
43 147         613 return $id;
44             }
45              
46             sub client {
47 196     196 1 1174 my ($self, $cb) = (_instance(shift), pop);
48              
49 196         635 my $id = $self->_id;
50 196         952 my $client = $self->{out}{$id}{client} = Mojo::IOLoop::Client->new(reactor => $self->reactor);
51              
52 196         694 weaken $self;
53             $client->on(
54             connect => sub {
55 191     191   580 delete $self->{out}{$id}{client};
56 191         1356 my $stream = Mojo::IOLoop::Stream->new(pop);
57 191         899 $self->_stream($stream => $id);
58 191         780 $self->$cb(undef, $stream);
59             }
60 196         1489 );
61 196     3   1343 $client->on(error => sub { $self->_remove($id); $self->$cb(pop, undef) });
  3         11  
  3         10  
62 196         1123 $client->connect(@_);
63              
64 196         1191 return $id;
65             }
66              
67 3255     3255 1 6633 sub is_running { _instance(shift)->reactor->is_running }
68              
69             sub next_tick {
70 1797     1797 1 10403 my ($self, $cb) = (_instance(shift), @_);
71 1797         5382 weaken $self;
72 1797     1797   4503 return $self->reactor->next_tick(sub { $self->$cb });
  1797         4522  
73             }
74              
75             sub one_tick {
76 1257     1257 1 3918 my $self = _instance(shift);
77 1257 100       3121 croak 'Mojo::IOLoop already running' if $self->is_running;
78 1256         3541 $self->reactor->one_tick;
79             }
80              
81 7     7 1 510 sub recurring { shift->_timer(recurring => @_) }
82              
83             sub remove {
84 442     442 1 1177 my ($self, $id) = (_instance(shift), @_);
85 442   100     1959 my $c = $self->{in}{$id} || $self->{out}{$id};
86 442 100 100     1835 if ($c && (my $stream = $c->{stream})) { return $stream->close_gracefully }
  283         1061  
87 159         525 $self->_remove($id);
88             }
89              
90             sub reset {
91 4   100 4 1 23 my ($self, $options) = (_instance(shift), shift // {});
92              
93 4         19 $self->emit('reset')->stop;
94 4 100       15 if ($options->{freeze}) {
95 1         2 state @frozen;
96 1         8 push @frozen, {%$self};
97 1         5 delete $self->{reactor};
98             }
99 3         8 else { $self->reactor->reset }
100              
101 4         25 delete @$self{qw(accepting acceptors events in out stop)};
102             }
103              
104             sub server {
105 147     147 1 4853 my ($self, $cb) = (_instance(shift), pop);
106              
107 147         1300 my $server = Mojo::IOLoop::Server->new;
108 147         596 weaken $self;
109             $server->on(
110             accept => sub {
111 194     194   1424 my $stream = Mojo::IOLoop::Stream->new(pop);
112 194         754 $self->$cb($stream, $self->_stream($stream, $self->_id, 1));
113              
114             # Enforce connection limit (randomize to improve load balancing)
115 194 100       839 if (my $max = $self->max_accepts) {
116 1   33     14 $self->{accepts} //= $max - int rand $max / 2;
117 1 50       4 $self->stop_gracefully if ($self->{accepts} -= 1) <= 0;
118             }
119              
120             # Stop accepting if connection limit has been reached
121 194 100       609 $self->_not_accepting if $self->_limit;
122             }
123 147         1253 );
124 147         823 $server->listen(@_);
125              
126 146         630 return $self->acceptor($server);
127             }
128              
129 1336     1336 1 17139 sub singleton { state $loop = shift->new }
130              
131             sub start {
132 1001     1001 1 7588 my $self = _instance(shift);
133 1001 100       2649 croak 'Mojo::IOLoop already running' if $self->is_running;
134 998         2831 $self->reactor->start;
135             }
136              
137 995     995 1 2780 sub stop { _instance(shift)->reactor->stop }
138              
139             sub stop_gracefully {
140 3     3 1 20 my $self = _instance(shift)->_not_accepting;
141 3 100 66     18 ++$self->{stop} and !$self->emit('finish')->_in and $self->stop;
142             }
143              
144             sub stream {
145 7124     7124 1 15912 my ($self, $stream) = (_instance(shift), @_);
146 7124 100       16172 return $self->_stream($stream => $self->_id) if ref $stream;
147 7123   100     28427 my $c = $self->{in}{$stream} || $self->{out}{$stream} // {};
      100        
148 7123         34627 return $c->{stream};
149             }
150              
151             sub subprocess {
152 0     0 1 0 my $subprocess = Mojo::IOLoop::Subprocess->new(ioloop => _instance(shift));
153 0 0       0 return @_ ? $subprocess->run(@_) : $subprocess;
154             }
155              
156 53     53 1 391 sub timer { shift->_timer(timer => @_) }
157              
158             sub _id {
159 538     538   951 my $self = shift;
160 538         816 my $id;
161 538   33     842 do { $id = md5_sum 'c' . steady_time . rand } while $self->{in}{$id} || $self->{out}{$id} || $self->{acceptors}{$id};
  538   33     1787  
162 538         12996 return $id;
163             }
164              
165 458   100 458   713 sub _in { scalar keys %{shift->{in} // {}} }
  458         2330  
166              
167 16586 100   16586   48128 sub _instance { ref $_[0] ? $_[0] : $_[0]->singleton }
168              
169 454 100   454   1687 sub _limit { $_[0]{stop} ? 1 : $_[0]->_in >= $_[0]->max_connections }
170              
171             sub _maybe_accepting {
172 596     596   969 my $self = shift;
173 596 100 100     2096 return if $self->{accepting} || $self->_limit;
174 258   50     563 $_->start for values %{$self->{acceptors} // {}};
  258         1484  
175 258         1096 $self->{accepting} = 1;
176             }
177              
178             sub _not_accepting {
179 263     263   491 my $self = shift;
180 263 100       1053 return $self unless delete $self->{accepting};
181 200   50     413 $_->stop for values %{$self->{acceptors} // {}};
  200         1345  
182 200         631 return $self;
183             }
184              
185 0   0 0   0 sub _out { scalar keys %{shift->{out} // {}} }
  0         0  
186              
187             sub _remove {
188 500     500   1059 my ($self, $id) = @_;
189              
190             # Timer
191 500 50       4787 return undef unless my $reactor = $self->reactor;
192 500 100       1839 return undef if $reactor->remove($id);
193              
194             # Acceptor
195 475 100       2360 return $self->_not_accepting->_maybe_accepting if delete $self->{acceptors}{$id};
196              
197             # Connection
198 365 100 100     1863 return undef unless delete $self->{in}{$id} || delete $self->{out}{$id};
199 341 100 100     1247 return $self->stop if $self->{stop} && !$self->_in;
200 339         1028 $self->_maybe_accepting;
201 339         853 warn "-- $id <<< $$ (@{[$self->_in]}:@{[$self->_out]})\n" if DEBUG;
202             }
203              
204             sub _stream {
205 386     386   1195 my ($self, $stream, $id, $server) = @_;
206              
207             # Connect stream with reactor
208 386 100       1126 $self->{$server ? 'in' : 'out'}{$id}{stream} = $stream->reactor($self->reactor);
209 386         665 warn "-- $id >>> $$ (@{[$self->_in]}:@{[$self->_out]})\n" if DEBUG;
210 386         1066 weaken $self;
211 386 50   338   2260 $stream->on(close => sub { $self && $self->_remove($id) });
  338         1744  
212 386         1589 $stream->start;
213              
214 386         1379 return $id;
215             }
216              
217             sub _timer {
218 60     60   178 my ($self, $method, $after, $cb) = (_instance(shift), @_);
219 60         217 weaken $self;
220 60     24254   201 return $self->reactor->$method($after => sub { $self->$cb });
  24254         40515  
221             }
222              
223             1;
224              
225             =encoding utf8
226              
227             =head1 NAME
228              
229             Mojo::IOLoop - Minimalistic event loop
230              
231             =head1 SYNOPSIS
232              
233             use Mojo::IOLoop;
234              
235             # Listen on port 3000
236             Mojo::IOLoop->server({port => 3000} => sub ($loop, $stream, $id) {
237             $stream->on(read => sub ($stream, $bytes) {
238             # Process input chunk
239             say $bytes;
240              
241             # Write response
242             $stream->write('HTTP/1.1 200 OK');
243             });
244             });
245              
246             # Connect to port 3000
247             my $id = Mojo::IOLoop->client({port => 3000} => sub ($loop, $err, $stream) {
248             $stream->on(read => sub ($stream, $bytes) {
249             # Process input
250             say "Input: $bytes";
251             });
252              
253             # Write request
254             $stream->write("GET / HTTP/1.1\x0d\x0a\x0d\x0a");
255             });
256              
257             # Add a timer
258             Mojo::IOLoop->timer(5 => sub ($loop) { $loop->remove($id) });
259              
260             # Start event loop if necessary
261             Mojo::IOLoop->start unless Mojo::IOLoop->is_running;
262              
263             =head1 DESCRIPTION
264              
265             L<Mojo::IOLoop> is a very minimalistic event loop based on L<Mojo::Reactor>, it has been reduced to the absolute
266             minimal feature set required to build solid and scalable non-blocking clients and servers.
267              
268             Depending on operating system, the default per-process and system-wide file descriptor limits are often very low and
269             need to be tuned for better scalability. The C<LIBEV_FLAGS> environment variable should also be used to select the best
270             possible L<EV> backend, which usually defaults to the not very scalable C<select>.
271              
272             LIBEV_FLAGS=1 # select
273             LIBEV_FLAGS=2 # poll
274             LIBEV_FLAGS=4 # epoll (Linux)
275             LIBEV_FLAGS=8 # kqueue (*BSD, OS X)
276             LIBEV_FLAGS=64 # Linux AIO
277              
278             The event loop will be resilient to time jumps if a monotonic clock is available through L<Time::HiRes>. A TLS
279             certificate and key are also built right in, to make writing test servers as easy as possible. Also note that for
280             convenience the C<PIPE> signal will be set to C<IGNORE> when L<Mojo::IOLoop> is loaded.
281              
282             For better scalability (epoll, kqueue) and to provide non-blocking name resolution, SOCKS5 as well as TLS support, the
283             optional modules L<EV> (4.32+), L<Net::DNS::Native> (0.15+), L<IO::Socket::Socks> (0.64+) and L<IO::Socket::SSL>
284             (2.009+) will be used automatically if possible. Individual features can also be disabled with the C<MOJO_NO_NNR>,
285             C<MOJO_NO_SOCKS> and C<MOJO_NO_TLS> environment variables.
286              
287             See L<Mojolicious::Guides::Cookbook/"REAL-TIME WEB"> for more.
288              
289             =head1 EVENTS
290              
291             L<Mojo::IOLoop> inherits all events from L<Mojo::EventEmitter> and can emit the following new ones.
292              
293             =head2 finish
294              
295             $loop->on(finish => sub ($loop) {...});
296              
297             Emitted when the event loop wants to shut down gracefully and is just waiting for all existing connections to be
298             closed.
299              
300             =head2 reset
301              
302             $loop->on(reset => sub ($loop) {...});
303              
304             Emitted when the event loop is reset, this usually happens after the process is forked to clean up resources that
305             cannot be shared.
306              
307             =head1 ATTRIBUTES
308              
309             L<Mojo::IOLoop> implements the following attributes.
310              
311             =head2 max_accepts
312              
313             my $max = $loop->max_accepts;
314             $loop = $loop->max_accepts(1000);
315              
316             The maximum number of connections this event loop is allowed to accept, before shutting down gracefully without
317             interrupting existing connections, defaults to C<0>. Setting the value to C<0> will allow this event loop to accept new
318             connections indefinitely. Note that up to half of this value can be subtracted randomly to improve load balancing
319             between multiple server processes, and to make sure that not all of them restart at the same time.
320              
321             =head2 max_connections
322              
323             my $max = $loop->max_connections;
324             $loop = $loop->max_connections(100);
325              
326             The maximum number of accepted connections this event loop is allowed to handle concurrently, before stopping to accept
327             new incoming connections, defaults to C<1000>.
328              
329             =head2 reactor
330              
331             my $reactor = $loop->reactor;
332             $loop = $loop->reactor(Mojo::Reactor->new);
333              
334             Low-level event reactor, usually a L<Mojo::Reactor::Poll> or L<Mojo::Reactor::EV> object with a default subscriber to
335             the event L<Mojo::Reactor/"error">.
336              
337             # Watch if handle becomes readable or writable
338             Mojo::IOLoop->singleton->reactor->io($handle => sub ($reactor, $writable) {
339             say $writable ? 'Handle is writable' : 'Handle is readable';
340             });
341              
342             # Change to watching only if handle becomes writable
343             Mojo::IOLoop->singleton->reactor->watch($handle, 0, 1);
344              
345             # Remove handle again
346             Mojo::IOLoop->singleton->reactor->remove($handle);
347              
348             =head1 METHODS
349              
350             L<Mojo::IOLoop> inherits all methods from L<Mojo::EventEmitter> and implements the following new ones.
351              
352             =head2 acceptor
353              
354             my $server = Mojo::IOLoop->acceptor($id);
355             my $server = $loop->acceptor($id);
356             my $id = $loop->acceptor(Mojo::IOLoop::Server->new);
357              
358             Get L<Mojo::IOLoop::Server> object for id or turn object into an acceptor.
359              
360             =head2 client
361              
362             my $id = Mojo::IOLoop->client(address => '127.0.0.1', port => 3000, sub {...});
363             my $id = $loop->client(address => '127.0.0.1', port => 3000, sub {...});
364             my $id = $loop->client({address => '127.0.0.1', port => 3000} => sub {...});
365              
366             Open a TCP/IP or UNIX domain socket connection with L<Mojo::IOLoop::Client> and create a stream object (usually
367             L<Mojo::IOLoop::Stream>), takes the same arguments as L<Mojo::IOLoop::Client/"connect">.
368              
369             =head2 is_running
370              
371             my $bool = Mojo::IOLoop->is_running;
372             my $bool = $loop->is_running;
373              
374             Check if event loop is running.
375              
376             =head2 next_tick
377              
378             my $undef = Mojo::IOLoop->next_tick(sub ($loop) {...});
379             my $undef = $loop->next_tick(sub ($loop) {...});
380              
381             Execute callback as soon as possible, but not before returning or other callbacks that have been registered with this
382             method, always returns C<undef>.
383              
384             # Perform operation on next reactor tick
385             Mojo::IOLoop->next_tick(sub ($loop) {...});
386              
387             =head2 one_tick
388              
389             Mojo::IOLoop->one_tick;
390             $loop->one_tick;
391              
392             Run event loop until an event occurs.
393              
394             # Don't block longer than 0.5 seconds
395             my $id = Mojo::IOLoop->timer(0.5 => sub ($loop) {});
396             Mojo::IOLoop->one_tick;
397             Mojo::IOLoop->remove($id);
398              
399             =head2 recurring
400              
401             my $id = Mojo::IOLoop->recurring(3 => sub ($loop) {...});
402             my $id = $loop->recurring(0 => sub ($loop) {...});
403             my $id = $loop->recurring(0.25 => sub ($loop) {...});
404              
405             Create a new recurring timer, invoking the callback repeatedly after a given amount of time in seconds.
406              
407             # Perform operation every 5 seconds
408             Mojo::IOLoop->recurring(5 => sub ($loop) {...});
409              
410             =head2 remove
411              
412             Mojo::IOLoop->remove($id);
413             $loop->remove($id);
414              
415             Remove anything with an id, connections will be dropped gracefully by allowing them to finish writing all data in their
416             write buffers.
417              
418             =head2 reset
419              
420             Mojo::IOLoop->reset;
421             $loop->reset;
422             $loop->reset({freeze => 1});
423              
424             Remove everything and stop the event loop.
425              
426             These options are currently available:
427              
428             =over 2
429              
430             =item freeze
431              
432             freeze => 1
433              
434             Freeze the current state of the event loop in time before resetting it. This will prevent active connections from
435             getting closed immediately, which can help with many unintended side effects when processes are forked. Note that this
436             option is B<EXPERIMENTAL> and might change without warning!
437              
438             =back
439              
440             =head2 server
441              
442             my $id = Mojo::IOLoop->server(port => 3000, sub {...});
443             my $id = $loop->server(port => 3000, sub {...});
444             my $id = $loop->server({port => 3000} => sub {...});
445              
446             Accept TCP/IP and UNIX domain socket connections with L<Mojo::IOLoop::Server> and create stream objects (usually
447             L<Mojo::IOLoop::Stream>), takes the same arguments as L<Mojo::IOLoop::Server/"listen">.
448              
449             # Listen on random port
450             my $id = Mojo::IOLoop->server({address => '127.0.0.1'} => sub ($loop, $stream, $id) {...});
451             my $port = Mojo::IOLoop->acceptor($id)->port;
452              
453             =head2 singleton
454              
455             my $loop = Mojo::IOLoop->singleton;
456              
457             The global L<Mojo::IOLoop> singleton, used to access a single shared event loop object from everywhere inside the
458             process.
459              
460             # Many methods also allow you to take shortcuts
461             Mojo::IOLoop->timer(2 => sub { Mojo::IOLoop->stop });
462             Mojo::IOLoop->start;
463              
464             # Restart active timer
465             my $id = Mojo::IOLoop->timer(3 => sub { say 'Timeout!' });
466             Mojo::IOLoop->singleton->reactor->again($id);
467              
468             # Turn file descriptor into handle and watch if it becomes readable
469             my $handle = IO::Handle->new_from_fd($fd, 'r');
470             Mojo::IOLoop->singleton->reactor->io($handle => sub ($reactor, $writable) {
471             say $writable ? 'Handle is writable' : 'Handle is readable';
472             })->watch($handle, 1, 0);
473              
474             =head2 start
475              
476             Mojo::IOLoop->start;
477             $loop->start;
478              
479             Start the event loop, this will block until L</"stop"> is called. Note that some reactors stop automatically if there
480             are no events being watched anymore.
481              
482             # Start event loop only if it is not running already
483             Mojo::IOLoop->start unless Mojo::IOLoop->is_running;
484              
485             =head2 stop
486              
487             Mojo::IOLoop->stop;
488             $loop->stop;
489              
490             Stop the event loop, this will not interrupt any existing connections and the event loop can be restarted by running
491             L</"start"> again.
492              
493             =head2 stop_gracefully
494              
495             Mojo::IOLoop->stop_gracefully;
496             $loop->stop_gracefully;
497              
498             Stop accepting new connections and wait for already accepted connections to be closed, before stopping the event loop.
499              
500             =head2 stream
501              
502             my $stream = Mojo::IOLoop->stream($id);
503             my $stream = $loop->stream($id);
504             my $id = $loop->stream(Mojo::IOLoop::Stream->new);
505              
506             Get L<Mojo::IOLoop::Stream> object for id or turn object into a connection.
507              
508             # Increase inactivity timeout for connection to 300 seconds
509             Mojo::IOLoop->stream($id)->timeout(300);
510              
511             =head2 subprocess
512              
513             my $subprocess = Mojo::IOLoop->subprocess;
514             my $subprocess = $loop->subprocess;
515             my $subprocess = $loop->subprocess(sub ($subprocess) {...}, sub ($subprocess, $err, @results) {...});
516              
517             Build L<Mojo::IOLoop::Subprocess> object to perform computationally expensive operations in subprocesses, without
518             blocking the event loop. Callbacks will be passed along to L<Mojo::IOLoop::Subprocess/"run">.
519              
520             # Operation that would block the event loop for 5 seconds
521             Mojo::IOLoop->subprocess->run_p(sub {
522             sleep 5;
523             return '♥', 'Mojolicious';
524             })->then(sub (@results) {
525             say "I $results[0] $results[1]!";
526             })->catch(sub ($err) {
527             say "Subprocess error: $err";
528             });
529              
530             =head2 timer
531              
532             my $id = Mojo::IOLoop->timer(3 => sub ($loop) {...});
533             my $id = $loop->timer(0 => sub ($loop) {...});
534             my $id = $loop->timer(0.25 => sub ($loop) {...});
535              
536             Create a new timer, invoking the callback after a given amount of time in seconds.
537              
538             # Perform operation in 5 seconds
539             Mojo::IOLoop->timer(5 => sub ($loop) {...});
540              
541             =head1 DEBUGGING
542              
543             You can set the C<MOJO_IOLOOP_DEBUG> environment variable to get some advanced diagnostics information printed to
544             C<STDERR>.
545              
546             MOJO_IOLOOP_DEBUG=1
547              
548             =head1 SEE ALSO
549              
550             L<Mojolicious>, L<Mojolicious::Guides>, L<https://mojolicious.org>.
551              
552             =cut