File Coverage

blib/lib/Mojo/IOLoop.pm
Criterion Covered Total %
statement 134 138 97.1
branch 40 44 90.9
condition 30 44 68.1
subroutine 40 42 95.2
pod 16 16 100.0
total 260 284 91.5


line stmt bran cond sub pod time code
1             package Mojo::IOLoop;
2 64     64   366807 use Mojo::Base 'Mojo::EventEmitter';
  64         142  
  64         464  
3              
4             # "Professor: Amy, technology isn't intrinsically good or evil. It's how it's
5             # used. Like the death ray."
6 64     64   484 use Carp qw(croak);
  64         158  
  64         3270  
7 64     64   30175 use Mojo::IOLoop::Client;
  64         282  
  64         497  
8 64     64   31401 use Mojo::IOLoop::Server;
  64         237  
  64         1037  
9 64     64   30841 use Mojo::IOLoop::Stream;
  64         236  
  64         776  
10 64     64   30640 use Mojo::IOLoop::Subprocess;
  64         239  
  64         677  
11 64     64   30492 use Mojo::Reactor::Poll;
  64         213  
  64         830  
12 64     64   480 use Mojo::Util qw(md5_sum steady_time);
  64         169  
  64         3466  
13 64     64   423 use Scalar::Util qw(blessed weaken);
  64         142  
  64         3872  
14              
15 64   50 64   428 use constant DEBUG => $ENV{MOJO_IOLOOP_DEBUG} || 0;
  64         174  
  64         188914  
16              
17             has max_accepts => 0;
18             has max_connections => 1000;
19             has reactor => sub {
20             my $class = Mojo::Reactor::Poll->detect;
21             warn "-- Reactor initialized ($class)\n" if DEBUG;
22             return $class->new->catch(sub { warn "@{[blessed $_[0]]}: $_[1]" });
23             };
24              
25             # Ignore PIPE signal
26             $SIG{PIPE} = 'IGNORE';
27              
28             # Initialize singleton reactor early
29             __PACKAGE__->singleton->reactor;
30              
31             sub acceptor {
32 309     309 1 5585 my ($self, $acceptor) = (_instance(shift), @_);
33              
34             # Find acceptor for id
35 309 100       1614 return $self->{acceptors}{$acceptor} unless ref $acceptor;
36              
37             # Connect acceptor with reactor
38 149         609 $self->{acceptors}{my $id = $self->_id} = $acceptor->reactor($self->reactor);
39              
40             # Allow new acceptor to get picked up
41 149         578 $self->_not_accepting->_maybe_accepting;
42              
43 149         654 return $id;
44             }
45              
46             sub client {
47 197     197 1 1237 my ($self, $cb) = (_instance(shift), pop);
48              
49 197         647 my $id = $self->_id;
50 197         964 my $client = $self->{out}{$id}{client} = Mojo::IOLoop::Client->new(reactor => $self->reactor);
51              
52 197         746 weaken $self;
53             $client->on(
54             connect => sub {
55 192     192   662 delete $self->{out}{$id}{client};
56 192         1599 my $stream = Mojo::IOLoop::Stream->new(pop);
57 192         999 $self->_stream($stream => $id);
58 192         1086 $self->$cb(undef, $stream);
59             }
60 197         1640 );
61 197     3   1342 $client->on(error => sub { $self->_remove($id); $self->$cb(pop, undef) });
  3         23  
  3         12  
62 197         1214 $client->connect(@_);
63              
64 197         1219 return $id;
65             }
66              
67 3291     3291 1 6701 sub is_running { _instance(shift)->reactor->is_running }
68              
69             sub next_tick {
70 1827     1827 1 11225 my ($self, $cb) = (_instance(shift), @_);
71 1827         5741 weaken $self;
72 1827     1827   4199 return $self->reactor->next_tick(sub { $self->$cb });
  1827         4861  
73             }
74              
75             sub one_tick {
76 1269     1269 1 4076 my $self = _instance(shift);
77 1269 100       3020 croak 'Mojo::IOLoop already running' if $self->is_running;
78 1268         3645 $self->reactor->one_tick;
79             }
80              
81 7     7 1 471 sub recurring { shift->_timer(recurring => @_) }
82              
83             sub remove {
84 446     446 1 1288 my ($self, $id) = (_instance(shift), @_);
85 446   100     2238 my $c = $self->{in}{$id} || $self->{out}{$id};
86 446 100 100     2039 if ($c && (my $stream = $c->{stream})) { return $stream->close_gracefully }
  286         1148  
87 160         573 $self->_remove($id);
88             }
89              
90             sub reset {
91 4   100 4 1 37 my ($self, $options) = (_instance(shift), shift // {});
92              
93 4         15 $self->emit('reset')->stop;
94 4 100       15 if ($options->{freeze}) {
95 1         7 state @frozen;
96 1         40 push @frozen, {%$self};
97 1         7 delete $self->{reactor};
98             }
99 3         8 else { $self->reactor->reset }
100              
101 4         34 delete @$self{qw(accepting acceptors events in out stop)};
102             }
103              
104             sub server {
105 149     149 1 5853 my ($self, $cb) = (_instance(shift), pop);
106              
107 149         1468 my $server = Mojo::IOLoop::Server->new;
108 149         737 weaken $self;
109             $server->on(
110             accept => sub {
111 193     193   1630 my $stream = Mojo::IOLoop::Stream->new(pop);
112 193         877 $self->$cb($stream, $self->_stream($stream, $self->_id, 1));
113              
114             # Enforce connection limit (randomize to improve load balancing)
115 193 100       942 if (my $max = $self->max_accepts) {
116 1   33     13 $self->{accepts} //= $max - int rand $max / 2;
117 1 50       7 $self->stop_gracefully if ($self->{accepts} -= 1) <= 0;
118             }
119              
120             # Stop accepting if connection limit has been reached
121 193 100       712 $self->_not_accepting if $self->_limit;
122             }
123 149         1448 );
124 149         905 $server->listen(@_);
125              
126 148         718 return $self->acceptor($server);
127             }
128              
129 1343     1343 1 22323 sub singleton { state $loop = shift->new }
130              
131             sub start {
132 1013     1013 1 7820 my $self = _instance(shift);
133 1013 100       2710 croak 'Mojo::IOLoop already running' if $self->is_running;
134 1010         3044 $self->reactor->start;
135             }
136              
137 1008     1008 1 3119 sub stop { _instance(shift)->reactor->stop }
138              
139             sub stop_gracefully {
140 3     3 1 22 my $self = _instance(shift)->_not_accepting;
141 3 100 66     23 ++$self->{stop} and !$self->emit('finish')->_in and $self->stop;
142             }
143              
144             sub stream {
145 7209     7209 1 16629 my ($self, $stream) = (_instance(shift), @_);
146 7209 100       15938 return $self->_stream($stream => $self->_id) if ref $stream;
147 7208   100     29339 my $c = $self->{in}{$stream} || $self->{out}{$stream} // {};
      100        
148 7208         37333 return $c->{stream};
149             }
150              
151             sub subprocess {
152 0     0 1 0 my $subprocess = Mojo::IOLoop::Subprocess->new(ioloop => _instance(shift));
153 0 0       0 return @_ ? $subprocess->run(@_) : $subprocess;
154             }
155              
156 53     53 1 463 sub timer { shift->_timer(timer => @_) }
157              
158             sub _id {
159 540     540   1011 my $self = shift;
160 540         868 my $id;
161 540   33     882 do { $id = md5_sum 'c' . steady_time . rand } while $self->{in}{$id} || $self->{out}{$id} || $self->{acceptors}{$id};
  540   33     1793  
162 540         14224 return $id;
163             }
164              
165 459   100 459   761 sub _in { scalar keys %{shift->{in} // {}} }
  459         2447  
166              
167 16785 100   16785   50557 sub _instance { ref $_[0] ? $_[0] : $_[0]->singleton }
168              
169 455 100   455   2035 sub _limit { $_[0]{stop} ? 1 : $_[0]->_in >= $_[0]->max_connections }
170              
171             sub _maybe_accepting {
172 601     601   1096 my $self = shift;
173 601 100 66     2246 return if $self->{accepting} || $self->_limit;
174 262   50     652 $_->start for values %{$self->{acceptors} // {}};
  262         1588  
175 262         1265 $self->{accepting} = 1;
176             }
177              
178             sub _not_accepting {
179 267     267   511 my $self = shift;
180 267 100       1041 return $self unless delete $self->{accepting};
181 203   50     465 $_->stop for values %{$self->{acceptors} // {}};
  203         1420  
182 203         652 return $self;
183             }
184              
185 0   0 0   0 sub _out { scalar keys %{shift->{out} // {}} }
  0         0  
186              
187             sub _remove {
188 502     502   1158 my ($self, $id) = @_;
189              
190             # Timer
191 502 50       1404 return undef unless my $reactor = $self->reactor;
192 502 100       1614 return undef if $reactor->remove($id);
193              
194             # Acceptor
195 477 100       2226 return $self->_not_accepting->_maybe_accepting if delete $self->{acceptors}{$id};
196              
197             # Connection
198 365 100 100     2551 return undef unless delete $self->{in}{$id} || delete $self->{out}{$id};
199 342 100 66     1327 return $self->stop if $self->{stop} && !$self->_in;
200 340         1529 $self->_maybe_accepting;
201 340         900 warn "-- $id <<< $$ (@{[$self->_in]}:@{[$self->_out]})\n" if DEBUG;
202             }
203              
204             sub _stream {
205 386     386   1210 my ($self, $stream, $id, $server) = @_;
206              
207             # Connect stream with reactor
208 386 100       1189 $self->{$server ? 'in' : 'out'}{$id}{stream} = $stream->reactor($self->reactor);
209 386         746 warn "-- $id >>> $$ (@{[$self->_in]}:@{[$self->_out]})\n" if DEBUG;
210 386         1089 weaken $self;
211 386 100   343   2481 $stream->on(close => sub { $self && $self->_remove($id) });
  343         2085  
212 386         1683 $stream->start;
213              
214 386         1649 return $id;
215             }
216              
217             sub _timer {
218 60     60   179 my ($self, $method, $after, $cb) = (_instance(shift), @_);
219 60         220 weaken $self;
220 60     24359   213 return $self->reactor->$method($after => sub { $self->$cb });
  24359         40114  
221             }
222              
223             1;
224              
225             =encoding utf8
226              
227             =head1 NAME
228              
229             Mojo::IOLoop - Minimalistic event loop
230              
231             =head1 SYNOPSIS
232              
233             use Mojo::IOLoop;
234              
235             # Listen on port 3000
236             Mojo::IOLoop->server({port => 3000} => sub ($loop, $stream, $id) {
237             $stream->on(read => sub ($stream, $bytes) {
238             # Process input chunk
239             say $bytes;
240              
241             # Write response
242             $stream->write('HTTP/1.1 200 OK');
243             });
244             });
245              
246             # Connect to port 3000
247             my $id = Mojo::IOLoop->client({port => 3000} => sub ($loop, $err, $stream) {
248             $stream->on(read => sub ($stream, $bytes) {
249             # Process input
250             say "Input: $bytes";
251             });
252              
253             # Write request
254             $stream->write("GET / HTTP/1.1\x0d\x0a\x0d\x0a");
255             });
256              
257             # Add a timer
258             Mojo::IOLoop->timer(5 => sub ($loop) { $loop->remove($id) });
259              
260             # Start event loop if necessary
261             Mojo::IOLoop->start unless Mojo::IOLoop->is_running;
262              
263             =head1 DESCRIPTION
264              
265             L<Mojo::IOLoop> is a very minimalistic event loop based on L<Mojo::Reactor>, it has been reduced to the absolute
266             minimal feature set required to build solid and scalable non-blocking clients and servers.
267              
268             Depending on operating system, the default per-process and system-wide file descriptor limits are often very low and
269             need to be tuned for better scalability. The C<LIBEV_FLAGS> environment variable should also be used to select the best
270             possible L<EV> backend, which usually defaults to the not very scalable C<select>.
271              
272             LIBEV_FLAGS=1 # select
273             LIBEV_FLAGS=2 # poll
274             LIBEV_FLAGS=4 # epoll (Linux)
275             LIBEV_FLAGS=8 # kqueue (*BSD, OS X)
276             LIBEV_FLAGS=64 # Linux AIO
277              
278             The event loop will be resilient to time jumps if a monotonic clock is available through L<Time::HiRes>. A TLS
279             certificate and key are also built right in, to make writing test servers as easy as possible. Also note that for
280             convenience the C<PIPE> signal will be set to C<IGNORE> when L<Mojo::IOLoop> is loaded.
281              
282             For better scalability (epoll, kqueue) and to provide non-blocking name resolution, SOCKS5 as well as TLS support, the
283             optional modules L<EV> (4.32+), L<Net::DNS::Native> (0.15+), L<IO::Socket::Socks> (0.64+) and L<IO::Socket::SSL>
284             (2.009+) will be used automatically if possible. Individual features can also be disabled with the C<MOJO_NO_NNR>,
285             C<MOJO_NO_SOCKS> and C<MOJO_NO_TLS> environment variables.
286              
287             See L<Mojolicious::Guides::Cookbook/"REAL-TIME WEB"> for more.
288              
289             =head1 EVENTS
290              
291             L<Mojo::IOLoop> inherits all events from L<Mojo::EventEmitter> and can emit the following new ones.
292              
293             =head2 finish
294              
295             $loop->on(finish => sub ($loop) {...});
296              
297             Emitted when the event loop wants to shut down gracefully and is just waiting for all existing connections to be
298             closed.
299              
300             =head2 reset
301              
302             $loop->on(reset => sub ($loop) {...});
303              
304             Emitted when the event loop is reset, this usually happens after the process is forked to clean up resources that
305             cannot be shared.
306              
307             =head1 ATTRIBUTES
308              
309             L<Mojo::IOLoop> implements the following attributes.
310              
311             =head2 max_accepts
312              
313             my $max = $loop->max_accepts;
314             $loop = $loop->max_accepts(1000);
315              
316             The maximum number of connections this event loop is allowed to accept, before shutting down gracefully without
317             interrupting existing connections, defaults to C<0>. Setting the value to C<0> will allow this event loop to accept new
318             connections indefinitely. Note that up to half of this value can be subtracted randomly to improve load balancing
319             between multiple server processes, and to make sure that not all of them restart at the same time.
320              
321             =head2 max_connections
322              
323             my $max = $loop->max_connections;
324             $loop = $loop->max_connections(100);
325              
326             The maximum number of accepted connections this event loop is allowed to handle concurrently, before stopping to accept
327             new incoming connections, defaults to C<1000>.
328              
329             =head2 reactor
330              
331             my $reactor = $loop->reactor;
332             $loop = $loop->reactor(Mojo::Reactor->new);
333              
334             Low-level event reactor, usually a L<Mojo::Reactor::Poll> or L<Mojo::Reactor::EV> object with a default subscriber to
335             the event L<Mojo::Reactor/"error">.
336              
337             # Watch if handle becomes readable or writable
338             Mojo::IOLoop->singleton->reactor->io($handle => sub ($reactor, $writable) {
339             say $writable ? 'Handle is writable' : 'Handle is readable';
340             });
341              
342             # Change to watching only if handle becomes writable
343             Mojo::IOLoop->singleton->reactor->watch($handle, 0, 1);
344              
345             # Remove handle again
346             Mojo::IOLoop->singleton->reactor->remove($handle);
347              
348             =head1 METHODS
349              
350             L<Mojo::IOLoop> inherits all methods from L<Mojo::EventEmitter> and implements the following new ones.
351              
352             =head2 acceptor
353              
354             my $server = Mojo::IOLoop->acceptor($id);
355             my $server = $loop->acceptor($id);
356             my $id = $loop->acceptor(Mojo::IOLoop::Server->new);
357              
358             Get L<Mojo::IOLoop::Server> object for id or turn object into an acceptor.
359              
360             =head2 client
361              
362             my $id = Mojo::IOLoop->client(address => '127.0.0.1', port => 3000, sub {...});
363             my $id = $loop->client(address => '127.0.0.1', port => 3000, sub {...});
364             my $id = $loop->client({address => '127.0.0.1', port => 3000} => sub {...});
365              
366             Open a TCP/IP or UNIX domain socket connection with L<Mojo::IOLoop::Client> and create a stream object (usually
367             L<Mojo::IOLoop::Stream>), takes the same arguments as L<Mojo::IOLoop::Client/"connect">.
368              
369             =head2 is_running
370              
371             my $bool = Mojo::IOLoop->is_running;
372             my $bool = $loop->is_running;
373              
374             Check if event loop is running.
375              
376             =head2 next_tick
377              
378             my $undef = Mojo::IOLoop->next_tick(sub ($loop) {...});
379             my $undef = $loop->next_tick(sub ($loop) {...});
380              
381             Execute callback as soon as possible, but not before returning or other callbacks that have been registered with this
382             method, always returns C<undef>.
383              
384             # Perform operation on next reactor tick
385             Mojo::IOLoop->next_tick(sub ($loop) {...});
386              
387             =head2 one_tick
388              
389             Mojo::IOLoop->one_tick;
390             $loop->one_tick;
391              
392             Run event loop until an event occurs.
393              
394             # Don't block longer than 0.5 seconds
395             my $id = Mojo::IOLoop->timer(0.5 => sub ($loop) {});
396             Mojo::IOLoop->one_tick;
397             Mojo::IOLoop->remove($id);
398              
399             =head2 recurring
400              
401             my $id = Mojo::IOLoop->recurring(3 => sub ($loop) {...});
402             my $id = $loop->recurring(0 => sub ($loop) {...});
403             my $id = $loop->recurring(0.25 => sub ($loop) {...});
404              
405             Create a new recurring timer, invoking the callback repeatedly after a given amount of time in seconds.
406              
407             # Perform operation every 5 seconds
408             Mojo::IOLoop->recurring(5 => sub ($loop) {...});
409              
410             =head2 remove
411              
412             Mojo::IOLoop->remove($id);
413             $loop->remove($id);
414              
415             Remove anything with an id, connections will be dropped gracefully by allowing them to finish writing all data in their
416             write buffers.
417              
418             =head2 reset
419              
420             Mojo::IOLoop->reset;
421             $loop->reset;
422             $loop->reset({freeze => 1});
423              
424             Remove everything and stop the event loop.
425              
426             These options are currently available:
427              
428             =over 2
429              
430             =item freeze
431              
432             freeze => 1
433              
434             Freeze the current state of the event loop in time before resetting it. This will prevent active connections from
435             getting closed immediately, which can help with many unintended side effects when processes are forked. Note that this
436             option is B<EXPERIMENTAL> and might change without warning!
437              
438             =back
439              
440             =head2 server
441              
442             my $id = Mojo::IOLoop->server(port => 3000, sub {...});
443             my $id = $loop->server(port => 3000, sub {...});
444             my $id = $loop->server({port => 3000} => sub {...});
445              
446             Accept TCP/IP and UNIX domain socket connections with L<Mojo::IOLoop::Server> and create stream objects (usually
447             L<Mojo::IOLoop::Stream>), takes the same arguments as L<Mojo::IOLoop::Server/"listen">.
448              
449             # Listen on random port
450             my $id = Mojo::IOLoop->server({address => '127.0.0.1'} => sub ($loop, $stream, $id) {...});
451             my $port = Mojo::IOLoop->acceptor($id)->port;
452              
453             =head2 singleton
454              
455             my $loop = Mojo::IOLoop->singleton;
456              
457             The global L<Mojo::IOLoop> singleton, used to access a single shared event loop object from everywhere inside the
458             process.
459              
460             # Many methods also allow you to take shortcuts
461             Mojo::IOLoop->timer(2 => sub { Mojo::IOLoop->stop });
462             Mojo::IOLoop->start;
463              
464             # Restart active timer
465             my $id = Mojo::IOLoop->timer(3 => sub { say 'Timeout!' });
466             Mojo::IOLoop->singleton->reactor->again($id);
467              
468             # Turn file descriptor into handle and watch if it becomes readable
469             my $handle = IO::Handle->new_from_fd($fd, 'r');
470             Mojo::IOLoop->singleton->reactor->io($handle => sub ($reactor, $writable) {
471             say $writable ? 'Handle is writable' : 'Handle is readable';
472             })->watch($handle, 1, 0);
473              
474             =head2 start
475              
476             Mojo::IOLoop->start;
477             $loop->start;
478              
479             Start the event loop, this will block until L</"stop"> is called. Note that some reactors stop automatically if there
480             are no events being watched anymore.
481              
482             # Start event loop only if it is not running already
483             Mojo::IOLoop->start unless Mojo::IOLoop->is_running;
484              
485             =head2 stop
486              
487             Mojo::IOLoop->stop;
488             $loop->stop;
489              
490             Stop the event loop, this will not interrupt any existing connections and the event loop can be restarted by running
491             L</"start"> again.
492              
493             =head2 stop_gracefully
494              
495             Mojo::IOLoop->stop_gracefully;
496             $loop->stop_gracefully;
497              
498             Stop accepting new connections and wait for already accepted connections to be closed, before stopping the event loop.
499              
500             =head2 stream
501              
502             my $stream = Mojo::IOLoop->stream($id);
503             my $stream = $loop->stream($id);
504             my $id = $loop->stream(Mojo::IOLoop::Stream->new);
505              
506             Get L<Mojo::IOLoop::Stream> object for id or turn object into a connection.
507              
508             # Increase inactivity timeout for connection to 300 seconds
509             Mojo::IOLoop->stream($id)->timeout(300);
510              
511             =head2 subprocess
512              
513             my $subprocess = Mojo::IOLoop->subprocess;
514             my $subprocess = $loop->subprocess;
515             my $subprocess = $loop->subprocess(sub ($subprocess) {...}, sub ($subprocess, $err, @results) {...});
516              
517             Build L<Mojo::IOLoop::Subprocess> object to perform computationally expensive operations in subprocesses, without
518             blocking the event loop. Callbacks will be passed along to L<Mojo::IOLoop::Subprocess/"run">.
519              
520             # Operation that would block the event loop for 5 seconds
521             Mojo::IOLoop->subprocess->run_p(sub {
522             sleep 5;
523             return '♥', 'Mojolicious';
524             })->then(sub (@results) {
525             say "I $results[0] $results[1]!";
526             })->catch(sub ($err) {
527             say "Subprocess error: $err";
528             });
529              
530             =head2 timer
531              
532             my $id = Mojo::IOLoop->timer(3 => sub ($loop) {...});
533             my $id = $loop->timer(0 => sub ($loop) {...});
534             my $id = $loop->timer(0.25 => sub ($loop) {...});
535              
536             Create a new timer, invoking the callback after a given amount of time in seconds.
537              
538             # Perform operation in 5 seconds
539             Mojo::IOLoop->timer(5 => sub ($loop) {...});
540              
541             =head1 DEBUGGING
542              
543             You can set the C<MOJO_IOLOOP_DEBUG> environment variable to get some advanced diagnostics information printed to
544             C<STDERR>.
545              
546             MOJO_IOLOOP_DEBUG=1
547              
548             =head1 SEE ALSO
549              
550             L<Mojolicious>, L<Mojolicious::Guides>, L<https://mojolicious.org>.
551              
552             =cut