File Coverage

blib/lib/Net/Server/ZMQ.pm
Criterion Covered Total %
statement 19 21 90.4
branch n/a
condition n/a
subroutine 7 7 100.0
pod n/a
total 26 28 92.8


line stmt bran cond sub pod time code
1             package Net::Server::ZMQ;
2              
3             # ABSTRACT: Preforking ZeroMQ job server
4              
5 1     1   15080 use warnings;
  1         2  
  1         31  
6 1     1   4 use strict;
  1         1  
  1         30  
7 1     1   4 use base 'Net::Server::PreFork';
  1         4  
  1         548  
8              
9 1     1   52516 use Carp;
  1         2  
  1         77  
10 1     1   4 use POSIX qw/WNOHANG/;
  1         1  
  1         8  
11 1     1   54 use Net::Server::SIG qw/register_sig check_sigs/;
  1         2  
  1         36  
12 1     1   243 use ZMQ::FFI;
  0            
  0            
13             use ZMQ::FFI::Constants qw/ZMQ_ROUTER ZMQ_DEALER/;
14              
15             our $VERSION = "0.001001";
16             $VERSION = eval $VERSION;
17              
18             =head1 NAME
19              
20             Net::Server::ZMQ - Preforking ZeroMQ job server
21              
22             =head1 SYNOPSIS
23              
24             use Net::Server::ZMQ;
25              
26             Net::Server::ZMQ->run(
27             port => [6660, 6661], # [frontend port, backend port]
28             min_servers => 5,
29             max_servers => 10,
30             app => sub { # this is your worker code
31             my $payload = shift;
32              
33             return uc($payload);
34             }
35             );
36              
37             =head1 DESCRIPTION
38              
39             C is a L personality based on L,
40             providing an easy way of creating a preforking ZeroMQ job server. It uses L
41             for ZeroMQ integration, independent of the installed C version. You will need
42             to have C installed.
43              
44             Currently, this personality implements the load balancing "simple pirate" pattern
45             described in the L. The server creates
46             a C-to-C broker in the parent process, and one or more child processes
47             as C workers. Multiple C clients can send requests to those workers through
48             the broker, which operates in a non-blocking way and balances requests across the workers.
49              
50             The created topology looks like this:
51              
52             +--------+ +--------+ +--------+
53             | CLIENT | | CLIENT | | CLIENT |
54             +--------+ +--------+ +--------+
55             | REQ | | REQ | | REQ |
56             +---+----+ +---+----+ +---+----+
57             | | |
58             |______________|______________|
59             |
60             |
61             +---+----+
62             | ROUTER |
63             +--------+
64             | BROKER |
65             +--------+
66             | ROUTER |
67             +---+----|
68             |
69             _____________|_____________
70             | | |
71             | | |
72             +----+---+ +----+---+ +----+---+
73             | DEALER | | DEALER | | DEALER |
74             +--------+ +--------+ +--------+
75             | WORKER | | WORKER | | WORKER |
76             +--------+ +--------+ +--------+
77              
78             You get the full benefits of C, including the ability to increase
79             or decrease the number of workers at real-time by sending the C and C signals
80             to the server, respectively.
81              
82             This is an early release, do not rely on it on production systems without thoroughly testing
83             it beforehand.
84              
85             I plan to implement better reliability as described in the ZeroMQ guide in future versions,
86             and also add support for different patterns such as publish-subscribe.
87              
88             The ZMQ server does not care about the format of messages passed between clients and workers,
89             this kind of logic is left to the applications. You can easily implement a JSON-based job broker,
90             for example, either by taking care of encoding/decoding in the worker code, or by extending this
91             class and overriding C.
92              
93             Note that configuration of a ZMQ server requires two ports, one for the frontend (the port to
94             which clients connect), and one for the backend (the port to which workers connect).
95              
96             =head2 INTERNAL NOTES
97              
98             ZeroMQ has some different concepts regarding sockets, and as such this class overrides
99             the bindings done by C so they do nothing (C, C and
100             C are emptied). Also, since ZeroMQ never exposes client information to request
101             handlers, it is possible for C to provide workers with data such as the
102             IP address of the client, and the C method is empties as well. Supplying client
103             information should therefore be done applicatively. The C method is also
104             overridden to always return true, for the same reason, though I'm not so certain yet
105             whether a better solution can be implemented.
106              
107             Unfortunately, I did have to override quite a few methods I really didn't want to, such
108             as C, C, C and C, mostly to get
109             rid of any traditional socket communication between the child and parent processes and
110             replace it was ZeroMQ communication.
111              
112             =head2 CLIENT IMPLEMENTATION
113              
114             Clients should be implemented according to the L
115             in the ZeroMQ guide. Clients I define a unique identity on their sockets when communicating
116             with the broker, otherwise the broker will not be able to direct responses from the workers back
117             to the correct client.
118              
119             A client implementation, L, is provided with this distribution to get up and running as
120             quickly as possible.
121              
122             =head1 OVERRIDDEN METHODS
123              
124             =head2 pre_bind()
125              
126             =head2 bind()
127              
128             =head2 post_bind()
129              
130             Emptied out
131              
132             =cut
133              
134             sub pre_bind { }
135              
136             sub bind { }
137              
138             sub post_bind { }
139              
140             =head2 options()
141              
142             Adds the custom C option to C. It takes the subroutine reference
143             that handles requests, i.e. the worker subroutine.
144              
145             =cut
146              
147             sub options {
148             my $self = shift;
149             my $ref = $self->SUPER::options(@_);
150             my $prop = $self->{server};
151              
152             $ref->{app} = \$prop->{app};
153              
154             return $ref;
155             }
156              
157             =head2 post_configure()
158              
159             Validates the C option and provides a useless default (a worker
160             subroutine that simply echos back what the client sends). Validates
161             the C option, and sets default values for C and C.
162              
163             =cut
164              
165             sub post_configure {
166             my $self = shift;
167             my $prop = $self->{server};
168              
169             $self->SUPER::post_configure;
170              
171             $prop->{app} = sub { $_[0] }
172             unless defined $prop->{app};
173              
174             $prop->{user} ||= $>;
175             $prop->{group} ||= $);
176              
177             confess "app must be a subroutine reference"
178             unless ref $prop->{app} && ref $prop->{app} eq 'CODE';
179              
180             confess "port must contain a frontend port and a backend port"
181             unless ref $prop->{port} && ref $prop->{port} eq 'ARRAY' && scalar @{$prop->{port}} >= 2;
182             }
183              
184             =head2 loop()
185              
186             Overrides the main loop subroutine to remove pipe creation.
187              
188             =cut
189              
190             sub loop {
191             my $self = shift;
192             my $prop = $self->{server};
193              
194             # get ready for children
195             $prop->{children} = {};
196             $prop->{reaped_children} = {};
197             if ($ENV{HUP_CHILDREN}) {
198             foreach my $line (split /\n/, $ENV{HUP_CHILDREN}) {
199             my ($pid, $status) = ($line =~ /^(\d+)\t(\w+)$/) ? ($1, $2) : next;
200             $prop->{children}->{$pid} = { status => $status, hup => 1 };
201             }
202             }
203              
204             $prop->{tally} = {
205             time => time(),
206             waiting => scalar(grep { $_->{status} eq 'waiting' } values %{$prop->{children}}),
207             processing => scalar(grep { $_->{status} eq 'processing' } values %{$prop->{children}}),
208             dequeue => scalar(grep { $_->{status} eq 'dequeue' } values %{$prop->{children}})
209             };
210              
211             $self->log(3, "Beginning prefork ($prop->{min_servers} processes)");
212             $self->run_n_children($prop->{min_servers});
213             $self->run_parent;
214             }
215              
216             =head2 run_parent()
217              
218             Creates the broker process, binding a C on the frontend port
219             (facing clients), and C on the backend port (facing workers).
220              
221             It then starts polling on both sockets for events and passes messages
222             between clients and workers.
223              
224             The parent process will receive the proctitle "zmq broker -",
225             where " is the frontend port and "" is the backend port.
226              
227             =cut
228              
229             sub run_parent {
230             my $self = shift;
231             my $prop = $self->{server};
232            
233             @{ $prop }{qw(last_checked_for_dead last_checked_for_waiting last_checked_for_dequeue last_process last_kill)} = (time) x 5;
234              
235             register_sig(
236             PIPE => 'IGNORE',
237             INT => sub { $self->server_close },
238             TERM => sub { $self->server_close },
239             HUP => sub { $self->sig_hup },
240             CHLD => sub {
241             while (defined(my $chld = waitpid(-1, WNOHANG))) {
242             last unless $chld > 0;
243             $self->{reaped_children}->{$chld} = 1;
244             }
245             },
246             QUIT => sub { $self->{server}->{kind_quit} = 1; $self->server_close() },
247             TTIN => sub { $self->{server}->{$_}++ for qw(min_servers max_servers); $self->log(3, "Increasing server count ($self->{server}->{max_servers})") },
248             TTOU => sub { $self->{server}->{$_}-- for qw(min_servers max_servers); $self->log(3, "Decreasing server count ($self->{server}->{max_servers})") },
249             );
250              
251             $self->register_sig_pass;
252              
253             if ($ENV{HUP_CHILDREN}) {
254             while (defined(my $chld = waitpid(-1, WNOHANG))) {
255             last unless $chld > 0;
256             $self->{reaped_children}->{$chld} = 1;
257             }
258             }
259              
260             my $fport = $prop->{port}->[0];
261             my $bport = $prop->{port}->[1];
262              
263             $0 = "zmq broker $fport-$bport";
264              
265             my $ctx = ZMQ::FFI->new;
266              
267             my $f = $ctx->socket(ZMQ_ROUTER);
268             $f->set_linger(0);
269             $f->bind('tcp://*:'.$fport);
270              
271             my $b = $ctx->socket(ZMQ_ROUTER);
272             $b->set_linger(0);
273             $b->bind('tcp://*:'.$bport);
274              
275             my (@workers, $w_addr, $delim, $c_addr, $data);
276             while (1) {
277             check_sigs();
278              
279             $self->idle_loop_hook;
280              
281             # poll on the frontend or the backend, but only poll
282             # on the frontend if there are workers
283             if (scalar @workers && $f->has_pollin) {
284             my @msg = $f->recv_multipart;
285             $b->send_multipart([ pop(@workers), '', $msg[0], '', $msg[2] ]);
286             } elsif ($b->has_pollin) {
287             my @msg = $b->recv_multipart;
288              
289             $w_addr = $msg[0];
290             $c_addr = $msg[2];
291              
292             next unless defined $c_addr;
293              
294             if ($c_addr =~ m/^(?:waiting|processing|dequeue|exiting)$/) {
295             my ($pid) = ($w_addr =~ m/^child_(\d+)$/);
296             my $status = $c_addr;
297              
298             last if $self->parent_read_hook($c_addr);
299              
300             push(@workers, $w_addr)
301             if $status eq 'waiting';
302              
303             $self->log(3, "$w_addr status $status");
304              
305             if (my $child = $prop->{children}->{$pid}) {
306             if ($status eq 'exiting') {
307             $self->delete_child($pid);
308             } else {
309             # Decrement tally of state pid was in (plus sanity check)
310             my $old_status = $child->{status}
311             || $self->log(2, "No status for $pid when changing to $status");
312              
313             --$prop->{tally}->{$old_status} >= 0
314             || $self->log(2, "Tally for $status < 0 changing pid $pid from $old_status to $status");
315              
316             $child->{status} = $status;
317             ++$prop->{tally}->{$status};
318              
319             $prop->{last_process} = time()
320             if $status eq 'processing';
321             }
322             }
323             } else {
324             last if $self->parent_read_hook($msg[4]);
325              
326             $self->log(4, "$w_addr sending to $c_addr: $msg[4]");
327             $f->send_multipart([ $c_addr, '', $msg[4] ]);
328             }
329              
330             $self->coordinate_children();
331             }
332             }
333             }
334              
335             =head2 run_n_children( $n )
336              
337             The same as in C, with all socket communication
338             code removed.
339              
340             =cut
341              
342             sub run_n_children {
343             my ($self, $n) = @_;
344             my $prop = $self->{server};
345              
346             return unless $n > 0;
347              
348             $self->run_n_children_hook($n);
349              
350             $self->log(3, "Starting \"$n\" children");
351             $prop->{last_start} = time();
352              
353             for (1 .. $n) {
354             $self->pre_fork_hook;
355              
356             local $!;
357              
358             my $pid = fork;
359             if (!defined $pid) {
360             $self->fatal("Bad fork [$!]");
361             }
362              
363             if ($pid) { # parent
364             $prop->{children}->{$pid}->{status} = 'waiting';
365             $prop->{tally}->{waiting}++;
366             } else { # child
367             $self->run_child;
368             }
369             }
370             }
371              
372             =head2 run_child()
373              
374             Creates a C socket between workers and server. Every child
375             process with get a proctitle of "zmq worker ", where ""
376             is the backend port.
377              
378             The child then signals the server that it is ready, and waits for requests.
379              
380             =cut
381              
382             sub run_child {
383             my $self = shift;
384             my $prop = $self->{server};
385              
386             $SIG{'INT'} = $SIG{'TERM'} = $SIG{'QUIT'} = sub {
387             $self->child_finish_hook;
388             exit;
389             };
390             $SIG{'PIPE'} = 'IGNORE';
391             $SIG{'CHLD'} = 'DEFAULT';
392             $SIG{'HUP'} = sub {
393             if (! $prop->{'connected'}) {
394             $self->child_finish_hook;
395             exit;
396             }
397             $prop->{'SigHUPed'} = 1;
398             };
399              
400             $self->log(4, "Child Preforked ($$)");
401              
402             delete @{ $prop }{qw(children tally last_start last_process)};
403              
404             $self->child_init_hook;
405              
406             my $port = $prop->{port}->[1];
407              
408             $0 = "zmq worker $port";
409              
410             my $ctx = ZMQ::FFI->new;
411             my $s = $ctx->socket(ZMQ_DEALER);
412             $s->set_identity("child_$$");
413             $s->set_linger(0);
414             $s->connect("tcp://localhost:$port");
415              
416             $prop->{sock} = [$s];
417             $prop->{context} = $ctx;
418              
419             $s->send_multipart([ '', 'waiting' ]);
420              
421             while ($self->accept) {
422             $prop->{connected} = 1;
423              
424             $s->send_multipart([ '', 'processing' ]);
425              
426             my $ok = eval { $self->run_client_connection; 1 };
427             if (! $ok) {
428             $s->send_multipart([ '', 'exiting' ]);
429             die $@;
430             }
431              
432             last if $self->done;
433              
434             $prop->{connected} = 0;
435              
436             $s->send_multipart([ '', 'waiting' ]);
437             }
438              
439             $self->child_finish_hook;
440              
441             $s->send_multipart([ '', 'exiting' ]);
442             exit;
443             }
444              
445             =head2 accept()
446              
447             Waits for new messages from clients. When a message is received, it
448             is stored as the "payload" attribute, with the socket stored as the
449             "client" attribute.
450              
451             =cut
452              
453             sub accept {
454             my $self = shift;
455             my $prop = $self->{server};
456              
457             my $sock = $prop->{sock}->[0];
458              
459             $self->fatal("Received a bad sock!")
460             unless defined $sock;
461              
462             while (1) {
463             next unless $sock->has_pollin;
464              
465             my @msg = $sock->recv_multipart;
466              
467             $self->log(4, $sock->get_identity." got: $msg[3]");
468              
469             $prop->{client} = $sock;
470             $prop->{peername} = $msg[1];
471             $prop->{payload} = $msg[3];
472              
473             return 1;
474             }
475             }
476              
477             =head2 post_accept()
478              
479             =head2 get_client_info()
480              
481             Emptied out
482              
483             =cut
484              
485             sub post_accept { }
486              
487             sub get_client_info { }
488              
489             =head2 allow_deny()
490              
491             Simply returns a true value
492              
493             =cut
494              
495             sub allow_deny { 1 }
496              
497             =head2 process_request()
498              
499             Calls the C (i.e. worker subroutine) with the payload from the
500             client, and sends the result back to the client.
501              
502             =cut
503              
504             sub process_request {
505             my $self = shift;
506             my $prop = $self->{server};
507              
508             $prop->{client}->send_multipart([
509             '',
510             $prop->{peername},
511             '',
512             $prop->{app}->($prop->{payload})
513             ]);
514             }
515              
516             =head2 post_process_request()
517              
518             Removes the C attribute (holding the C socket) at the end
519             of the request.
520              
521             =cut
522              
523             sub post_process_request { delete $_[0]->{server}->{client} }
524              
525             =head2 sig_hup()
526              
527             Overridden to simply send C to the children (to restart them),
528             and that's it
529              
530             =cut
531              
532             sub sig_hup {
533             my $self = shift;
534             $self->log(2, "Received a SIG HUP");
535             $self->hup_children;
536             }
537              
538             =head2 shutdown_sockets()
539              
540             Closes the ZeroMQ sockets
541              
542             =cut
543              
544             sub shutdown_sockets {
545             my $self = shift;
546             my $prop = $self->{server};
547              
548             foreach (@{$prop->{sock}}) {
549             $_->close;
550             }
551              
552             $prop->{sock} = [];
553             }
554              
555             =head2 child_finish_hook()
556              
557             Closes the children's socket and destroys the context (this is
558             necessary, otherwise we'll have zombies).
559              
560             =cut
561              
562             sub child_finish_hook {
563             my $self = shift;
564             my $prop = $self->{server};
565              
566             eval {
567             $prop->{sock}->[0]->close;
568             $prop->{context}->destroy;
569             };
570             }
571              
572             =head2 delete_child( $pid )
573              
574             Overridden to remove dealing with sockets.
575              
576             =cut
577              
578             sub delete_child {
579             my ($self, $pid) = @_;
580             my $prop = $self->{server};
581              
582             my $child = $prop->{children}->{$pid};
583             if (! $child) {
584             $self->log(2, "Attempt to delete already deleted child $pid");
585             return;
586             }
587              
588             return if ! exists $prop->{children}->{$pid}; # Already gone?
589              
590             my $status = $child->{'status'} || $self->log(2, "No status for $pid when deleting child");
591             --$prop->{'tally'}->{$status} >= 0 || $self->log(2, "Tally for $status < 0 deleting pid $pid");
592             $prop->{'tally'}->{'time'} = 0 if $child->{'hup'};
593              
594             delete $prop->{'children'}->{$pid};
595             }
596              
597             =head1 CONFIGURATION AND ENVIRONMENT
598            
599             Read L for more information about configuration.
600              
601             =head1 DEPENDENCIES
602              
603             C depends on the following CPAN modules:
604              
605             =over
606              
607             =item * L
608              
609             =item * L
610              
611             =item * L
612              
613             =item * L
614              
615             =item * L
616              
617             =back
618              
619             =head1 BUGS AND LIMITATIONS
620              
621             Please report any bugs or feature requests to
622             C, or through the web interface at
623             L.
624              
625             =head1 SUPPORT
626              
627             You can find documentation for this module with the perldoc command.
628              
629             perldoc Net::Server::ZMQ
630              
631             You can also look for information at:
632              
633             =over 4
634            
635             =item * RT: CPAN's request tracker
636            
637             L
638            
639             =item * AnnoCPAN: Annotated CPAN documentation
640            
641             L
642            
643             =item * CPAN Ratings
644            
645             L
646            
647             =item * Search CPAN
648            
649             L
650            
651             =back
652            
653             =head1 AUTHOR
654            
655             Ido Perlmuter
656              
657             =head1 ACKNOWLEDGMENTS
658              
659             In writing this module I relied heavily on L by Tatsuhiko Miyagawa, and
660             on code and information from the official L.
661            
662             =head1 LICENSE AND COPYRIGHT
663            
664             Copyright (c) 2015, Ido Perlmuter C<< ido@ido50.net >>.
665              
666             This module is free software; you can redistribute it and/or
667             modify it under the same terms as Perl itself, either version
668             5.8.1 or any later version. See L
669             and L.
670            
671             The full text of the license can be found in the
672             LICENSE file included with this module.
673            
674             =head1 DISCLAIMER OF WARRANTY
675            
676             BECAUSE THIS SOFTWARE IS LICENSED FREE OF CHARGE, THERE IS NO WARRANTY
677             FOR THE SOFTWARE, TO THE EXTENT PERMITTED BY APPLICABLE LAW. EXCEPT WHEN
678             OTHERWISE STATED IN WRITING THE COPYRIGHT HOLDERS AND/OR OTHER PARTIES
679             PROVIDE THE SOFTWARE "AS IS" WITHOUT WARRANTY OF ANY KIND, EITHER
680             EXPRESSED OR IMPLIED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
681             WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE
682             ENTIRE RISK AS TO THE QUALITY AND PERFORMANCE OF THE SOFTWARE IS WITH
683             YOU. SHOULD THE SOFTWARE PROVE DEFECTIVE, YOU ASSUME THE COST OF ALL
684             NECESSARY SERVICING, REPAIR, OR CORRECTION.
685            
686             IN NO EVENT UNLESS REQUIRED BY APPLICABLE LAW OR AGREED TO IN WRITING
687             WILL ANY COPYRIGHT HOLDER, OR ANY OTHER PARTY WHO MAY MODIFY AND/OR
688             REDISTRIBUTE THE SOFTWARE AS PERMITTED BY THE ABOVE LICENCE, BE
689             LIABLE TO YOU FOR DAMAGES, INCLUDING ANY GENERAL, SPECIAL, INCIDENTAL,
690             OR CONSEQUENTIAL DAMAGES ARISING OUT OF THE USE OR INABILITY TO USE
691             THE SOFTWARE (INCLUDING BUT NOT LIMITED TO LOSS OF DATA OR DATA BEING
692             RENDERED INACCURATE OR LOSSES SUSTAINED BY YOU OR THIRD PARTIES OR A
693             FAILURE OF THE SOFTWARE TO OPERATE WITH ANY OTHER SOFTWARE), EVEN IF
694             SUCH HOLDER OR OTHER PARTY HAS BEEN ADVISED OF THE POSSIBILITY OF
695             SUCH DAMAGES.
696              
697             =cut
698              
699             1;
700             __END__