File Coverage

blib/lib/POE/Component/PreforkDispatch.pm
Criterion Covered Total %
statement 107 219 48.8
branch 22 58 37.9
condition 9 33 27.2
subroutine 17 35 48.5
pod 1 16 6.2
total 156 361 43.2


line stmt bran cond sub pod time code
1             package POE::Component::PreforkDispatch;
2              
3             our $VERSION = 0.101;
4              
5             =cut
6              
7             =head1 NAME
8              
9             POE::Component::PreforkDispatch - Preforking task dispatcher
10              
11             =head1 DESCRIPTION
12              
13             Applications that require lots of asynchronous tasks going at once may suffer a performance hit from repeating the fork/die process over and over again with each enqueued job. Similar to how Apache forks, this dispatcher will maintain a pool of available forks and a queue of pending tasks. Each task (request) will be handled in turn, and will return to the callback when done.
14              
15             =head1 SYNOPSIS
16              
17             use POE qw(Component::PreforkDispatch);
18              
19             POE::Session->create(
20             inline_states => {
21             _start => \&start,
22             do_slow_task => \&task,
23             do_slow_task_cb => \&task_cb,
24             },
25             );
26              
27             $poe_kernel->run();
28              
29             sub start {
30             POE::Component::PreforkDispatch->create(
31             max_forks => 4,
32             pre_fork => 2,
33             );
34             foreach (1..5) {
35             print "Enqueued request $_\n";
36             $poe_kernel->post(PreforkDispatch => 'new_request', {
37             method => 'do_slow_task',
38             upon_result => 'do_slow_task_cb',
39             params => [ 'a value', $_, ],
40             });
41             }
42             }
43              
44             sub task {
45             my ($kernel, $heap, $from, $param1, $param2) = @_[KERNEL, HEAP, ARG0 .. $#_];
46              
47             # ... do something slow
48             print STDERR "Task running with '$param1', '$param2'\n";
49             sleep 10;
50              
51             # Return hashref or arrayref
52             return { success => 1 };
53             }
54              
55             sub task_cb {
56             my ($kernel, $heap, $request, $result) = @_[KERNEL, HEAP, ARG0, ARG1];
57              
58             print STDERR "Task with param ".$request->{params}[1]." returned "
59             .($result->{success} ? 'successful' : 'failure')."\n";
60             }
61              
62             =cut
63              
64 1     1   186229 use strict;
  1         3  
  1         33  
65 1     1   4 use warnings;
  1         2  
  1         28  
66 1     1   5 use POE qw/Wheel::Run Filter::Reference/;
  1         7  
  1         8  
67 1     1   43052 use IO::Capture::Stdout;
  1         3315  
  1         29  
68 1     1   872 use Error qw(:try);
  1         6935  
  1         6  
69 1     1   3884 use Data::Dumper;
  1         7621  
  1         82  
70 1     1   1447 use Params::Validate;
  1         24728  
  1         4285  
71              
72              
73             ### Class Methods ##
74              
75             =cut
76              
77             =head1 USAGE
78              
79             =head2 Methods
80              
81             =head3 Constructor
82              
83             Call ->create() as you would for any other POE component, passing a list of named parameters.
84              
85             =over 4
86              
87             =item * methods => \%methods
88              
89             =item * classes => \@classes
90              
91             =item * xmlrpc_server_parent => $session_name
92              
93             Provide an optional means of finding a method to dispatch a request to. If none are provided, the request itself needs to indicate its method.
94              
95             methods => {
96             'do_something' => \&do_something,
97             'do_else' => 'do_else_state',
98             },
99              
100             Methods will be searched for by name and will call either the state or the subroutine. See below for how either is called.
101              
102             classes => [ 'My::Class' ],
103              
104             Methods will be attempted in each namespace provided, and called as subroutines.
105              
106             xmlrpc_server_parent => 'XMLRPC_Session_Alias',
107              
108             Requests will be wrapped in a pseudo-transaction object that is passed on to the XMLRPC server session named by this parameter for handling.
109              
110             =item * upon_result => $subref || $state_name
111              
112             If provided, used as a fallback result function to send completed requests to.
113              
114             =item * max_forks => $num
115              
116             Number of forks, max, to spawn off to handle requests.
117              
118             =item * pre_fork => $num
119              
120             How many forks to start out with. The rest are spawned as needed; a new fork is not created within 5 seconds of the previous on-demand fork, and the queue is re-checked every 2 seconds while waiting.
121              
122             =item * max_requests => $num
123              
124             How many requests each fork can handle before it exits and is respawned (if necessary).
125              
126             =item * verbose => $num (defaults 0)
127              
128             =item * talkback => sub { }
129              
130             The dispatcher logs certain events, and can be verbose about it. The talkback function will be passed a single arg of a log line. This defaults to printing to STDOUT.
131              
132             =item * fork_name => $name
133              
134             In process lists on a POSIX system, you can change the name of the forked children so you can at a glance know that they're dispatcher forks and not the parent process. Will be renamed to "$name child".
135              
136             =item * alias => $session_name
137              
138             Provide a session name. Defaults to 'PreforkDispatch'.
139              
140             =back
141              
142             =cut
143              
144             sub create {
145 1     1 0 349 my $class = shift;
146              
147             my %param = validate(@_, {
148             methods => 0,
149             classes => 0,
150             xmlrpc_server_parent => 0,
151              
152             max_forks => 0,
153             pre_fork => 0,
154             max_requests => 0,
155 0     0   0 talkback => { default => sub { print $_[0] . "\n" } },
156 1         78 fork_name => 0,
157             alias => { default => 'PreforkDispatch' },
158             upon_result => { default => 'dispatch_result' },
159             verbose => 0,
160             });
161              
162 1         31 my $session = POE::Session->create(
163             package_states => [
164             $class => [
165             qw/_start _stop init kill new_request return_result process_queue child_exited/,
166             # Callbacks from forked wheels
167             qw/fork_input fork_debug fork_closed fork_error/,
168             ],
169             ],
170             heap => {
171             %param,
172             request_queue => [],
173             # takes form: [
174             # {
175             # method_name => '...',
176             # from => '...',
177             # params => [ { ... } | ..., ... ],
178             # },
179             # ]
180              
181             forks => [],
182             # takes form: [
183             # {
184             # id => $wheel_id,
185             # wheel => POE::Wheel::Run->new(),
186             # status => 'idle|waiting_response',
187             # active_request => { ... },
188             # started_request => time,
189             # },
190             # ]
191             },
192             );
193 1         748 return $session;
194             }
195              
196             sub _start {
197 1     1   348 my ($kernel, $heap) = @_[KERNEL, HEAP];
198              
199 1 50       10 $kernel->alias_set( $heap->{alias} ? $heap->{alias} : 'RPCDispatch' );
200              
201 1         44 $heap->{talkback}("Started prefork dispatcher");
202              
203             # Register signal for rpc forks exiting
204 1         7 $kernel->sig(CHLD => "child_exited");
205              
206             # Do preforking if requested
207 1         145 my $prefork = $heap->{pre_fork};
208 1   50     13 $prefork ||= 0;
209              
210 1         6 for (my $i = 1; $i <= $prefork; $i++) {
211 2         17 my $fork = fork_new($heap);
212 2         25 $heap->{talkback}("Pre spawning fork " . $fork->{id});
213             }
214             }
215              
216             sub _stop {
217 0     0   0 my $kernel = $_[KERNEL];
218 0         0 $kernel->alias_remove();
219             }
220              
221             sub init {
222 0     0 0 0 my ($kernel, $heap) = @_[KERNEL, HEAP];
223             }
224              
225             sub kill {
226 0     0 0 0 my ($kernel, $heap) = @_[KERNEL, HEAP];
227              
228 0         0 foreach my $fork (@{ $heap->{forks} }) {
  0         0  
229 0         0 $fork->{wheel}->kill(9);
230             }
231             }
232              
233             =cut
234              
235             =head2 Session States
236              
237             =head3 new_request (\%param)
238              
239             The primary interface to enqueueing requests. Takes the following arguments in a hashref.
240              
241             $poe_kernel->post( PreforkDispatch => 'new_request', {
242             method_name => 'do_something',
243             params => [ 'Computer 3' ],
244             });
245              
246             =over 4
247              
248             =item * method_name
249              
250             Provide a method name for searching for an appropriate method to dispatch to. Most akin to XMLRPC's method_name.
251              
252             =item * method => $subref || $session_state
253              
254             Instead of using the method_name, you can provide the method session state or subref to use as a request handler.
255              
256             =item * upon_result => $subref || $session_state
257              
258             Instead of using the global upon_result, provide a per-request callback.
259              
260             =item * params => $arrayref
261              
262             An arrayref, this is where you put your payload of the request.
263              
264             =item * from
265              
266             An XMLRPC value, this is not used typically for a single-host application.
267              
268             =back
269              
270             =cut
271              
272             sub new_request {
273 5     5 1 3195 my ($kernel, $heap, $session, $sender, $request) = @_[KERNEL, HEAP, SESSION, SENDER, ARG0];
274              
275 5   50     59 $request->{from} ||= 'local';
276 5   50     20 $request->{params} ||= [];
277 5 50 33     86 $request->{method_name} ||= $request->{method} && ! ref($request->{method}) ? $request->{method} : 'anonymous method';
      33        
278              
279             # Record where it came from
280 5         366 $request->{session} = $sender->ID;
281              
282             # Handle asynchronously
283 5 50       45 if ($heap->{max_forks}) {
284             # Enqueue the request and yield to fork queue checker
285 5         11 push @{ $heap->{request_queue} }, $request;
  5         11  
286 5         19 $kernel->yield('process_queue');
287              
288 5         400 return;
289             }
290              
291 0 0       0 $heap->{talkback}("Handling request $$request{method_name} synchronously") if $heap->{verbose};
292 0         0 $request->{result} = handle_request_wrapper($heap, $request);
293 0         0 $kernel->call($session, 'return_result', $request);
294             }
295              
296             =cut
297              
298             =head2 Request to response
299              
300             After a new_request() is issued, the dispatcher will process it in a FIFO queue, using forks if available, or handling it synchronously otherwise. Handling a request is done by searching for a valid method, either picking the $request->{method}, or if not available, searching the dispatcher methods, classes and finally the xmlrpc_server_parent for something to handle $request->{method_name}.
301              
302             If the method given is a subref, it will be passed ($from, @args). If a POE session state name, the calling session will have this state posted to with the same args ($from, @args):
303              
304             my ($from, @args) = @_;
305              
306             or
307              
308             my ($kernel, $heap, $from, @args) = @_[KERNEL, HEAP, ARG0 .. $#_];
309              
310             Once the request is handled, successfully or not, a response is sent to either the request's 'upon_result' or the dispatcher's. If 'upon_result' is a subref, it will be handed ($request, $response); a session state receives the same two arguments. The request will be the same as passed, but with the additional key/value of 'elapsed' containing the seconds the request took to process. The response will be the response value of the method that handled the request, or in the case of an error, a hashref with the key 'error'.
311              
312             =head2 Special methods
313              
314             There are some methods that are special and can be used to control child fork behavior
315              
316             =over 4
317              
318             =item * _precall
319              
320             =item * _postcall
321              
322             Not sure if these are useful, but will be called before and after the named method. Can be used as universal constructor/destructors for method calls. Passed the main method params.
323              
324             =item * _fork_preinit
325              
326             =item * _fork_postinit
327              
328             Code to be called before and after actually forking (in the parent process).
329              
330             =item * _fork_init
331              
332             Not passed anything, this permits the fork to do something that's better done after forking (opening handles and such).
333              
334             =back
335              
336             =cut
337              
338             sub return_result {
339 5     5 0 4220 my ($kernel, $heap, $request) = @_[KERNEL, HEAP, ARG0];
340              
341 5         10 my $result = delete $request->{result};
342              
343 5 50       16 if (! defined $result) {
344             # The method was not found
345 0         0 $result = { error => "Could not handle method '".$request->{method_name}."'; no means found" };
346             }
347              
348 5   33     18 my $return_to = $request->{upon_result} || $heap->{upon_result};
349 5 50       12 if (ref($return_to)) {
350 0         0 $return_to->($request, $result);
351             }
352             else {
353 5         23 $kernel->post(delete $request->{session}, $return_to, $request, $result);
354             }
355             }
356              
357             sub handle_request_wrapper {
358 0     0 0 0 my ($heap, $request) = @_;
359              
360 0         0 my $result;
361             try {
362 0     0   0 $result = handle_request($heap, $request);
363             }
364             otherwise {
365 0     0   0 my ($ex) = @_;
366 0 0 0     0 $result = {
367             error =>
368             "Method '$$request{method_name}' threw exception: " .
369             ( ref($ex) && $ex->can('stringify') ? $ex->stringify() : $ex )
370             };
371 0         0 };
372 0         0 return $result;
373             }
374              
375             sub handle_request {
376 0     0 0 0 my ($heap, $request) = (shift, shift);
377              
378 0         0 my $method_name = $request->{method_name};
379 0         0 my @args = ($request->{from}, @{ $request->{params} });
  0         0  
380              
381 0         0 call_method($heap, '_precall', \@args);
382 0         0 my $result = call_method($heap, $method_name, \@args, $request);
383 0         0 call_method($heap, '_postcall', \@args);
384              
385 0         0 return $result;
386             }
387              
388             sub call_method {
389 6     6 0 15 my ($heap, $method_name, $args, $request) = @_;
390              
391             # Find a method to handle this
392              
393 6 50       21 my $method = $heap->{methods} ? $heap->{methods}{$method_name} : undef;
394 6   33     43 $method ||= $request->{method};
395              
396 6 50       32 if ($method) {
    50          
    50          
397 0 0       0 if (ref($method)) {
398 0         0 return $method->(@$args);
399             }
400             else {
401 0         0 return $poe_kernel->call( $request->{session}, $method, @$args );
402             }
403             }
404             elsif ($heap->{classes}) {
405 0         0 foreach my $class (@{ $heap->{classes} }) {
  0         0  
406             # TODO - see if class has function $method_name
407             }
408 0         0 return { error => "Class-based method calls not yet implemented" };
409             }
410             elsif ($heap->{xmlrpc_server_parent}) {
411 0         0 my $from = shift @$args;
412 0         0 my $transaction = POE::Component::PreforkDispatch::PseudoXMLRPCTransaction->new(@$args);
413 0         0 $poe_kernel->call( $heap->{xmlrpc_server_parent}, $method_name, $transaction );
414 0 0       0 return { error => "Couldn't call XMLRPC method $method_name on session ".$heap->{xmlrpc_server_parent} } if $!;
415 0         0 return $transaction->result();
416             }
417             else {
418 6         36 return { error => "Unknown XMLRPC method $method_name" };
419             }
420             }
421              
422              
423             ## RPC methods
424              
425             sub process_queue {
426 10     10 0 2891 my ($kernel, $heap) = @_[KERNEL, HEAP];
427              
428             # Do nothing if queue is empty
429 10 100       15 return if $#{ $heap->{request_queue} } < 0;
  10         42  
430              
431             # Find a fork to use
432              
433             # Check for available, not busy existing forks.
434             # Choose the fork that's been waiting the longest
435 1         7 my @avail_forks =
436 18         174 sort { $a->{finished_request} <=> $b->{finished_request} }
437 7         14 grep { $_->{status} eq 'idle' }
438 7         11 @{ $heap->{forks} };
439              
440 7 100       24 my $use_fork = $avail_forks[0] ? $avail_forks[0] : undef;
441              
442             # If no fork found, create a new one if possible. Otherwise, wait.
443 7 100       23 if (! $use_fork) {
444 3 50       5 if (int @{ $heap->{forks} } == $heap->{max_forks}) {
  3         12  
445             # Already forked the max number; have to wait for one to return
446 0 0       0 $heap->{talkback}("All forks are busy; will wait to handle request after a fork returns") if $heap->{verbose};
447 0         0 return;
448             }
449              
450             # Don't forkbomb; delay before spawning another fork
451 3 100 66     28 if ($heap->{last_fork_created} && time - $heap->{last_fork_created} < 5) {
452 2 50       16 $heap->{talkback}("Delaying 2 sec on creating another fork") if $heap->{verbose};
453 2         18 $kernel->delay('process_queue', 2);
454 2         445 return;
455             }
456 1         3 $use_fork = fork_new($heap);
457 1         27 $heap->{talkback}("Creating new fork " . $use_fork->{id});
458 1         17 $heap->{last_fork_created} = time;
459             }
460              
461             ## With a fork found, hand off the first request in queue to this fork
462              
463 5         41 my $request = shift @{ $heap->{request_queue} };
  5         23  
464              
465 5 50       24 $heap->{talkback}("Handling request " . $request->{method_name} . " with fork " . $use_fork->{id}) if $heap->{verbose};
466              
467 5         10 $use_fork->{active_request} = $request;
468 5         22 $use_fork->{status} = 'waiting_response';
469 5         8 $use_fork->{started_request} = time;
470              
471 5         53 $use_fork->{wheel}->put( $request );
472             }
473              
474             sub fork_new {
475 3     3 0 6 my ($heap) = @_;
476              
477 3         9 call_method($heap, '_fork_preinit');
478              
479             # Create a new fork
480             my $wheel = POE::Wheel::Run->new(
481 0     0   0 Program => sub { fork_main($heap) },
482 3         95 StdinFilter => POE::Filter::Reference->new(),
483             StdoutFilter => POE::Filter::Reference->new(),
484             StdoutEvent => 'fork_input',
485             StderrEvent => 'fork_debug',
486             CloseEvent => 'fork_closed',
487             ErrorEvent => 'fork_error',
488             );
489 3         25929 my $fork = {
490             status => 'idle',
491             wheel => $wheel,
492             id => $wheel->ID,
493              
494             active_request => undef,
495             started_request => 0,
496             finished_request => 0,
497             };
498 3         105 push @{ $heap->{forks} }, $fork;
  3         18  
499 3         48 call_method($heap, '_fork_postinit', $fork);
500 3         45 return $fork;
501             }
502              
503             sub fork_main {
504 0     0 0 0 my ($heap) = @_;
505 0         0 my $raw;
506 0         0 my $size = 4096;
507 0         0 my $filter = POE::Filter::Reference->new();
508 0         0 my $request_counter = 0;
509 0   0     0 my $request_max = $heap->{max_requests} || 0;
510              
511             # Set my `ps aux` name if desired
512 0 0       0 $0 = "$$heap{fork_name} child" if $heap->{fork_name};
513              
514             # Do init (if needed)
515 0         0 call_method($heap, '_fork_init');
516              
517             READ:
518 0         0 while (sysread( STDIN, $raw, $size )) {
519 0         0 my $requests = $filter->get( [$raw] );
520 0         0 foreach my $request (@$requests) {
521             # Need to capture STDOUT so that handle_request doesn't accidentally write
522             # to STDOUT (reserved for communication with the controlling process)
523 0         0 my $capture = IO::Capture::Stdout->new();
524 0         0 $capture->start();
525              
526 0         0 my $result = handle_request_wrapper($heap, $request);
527              
528             # Stop the STDOUT capture. If it said anything, spit it out via
529             # STDERR to display to the controlling terminal (or piped log)
530 0         0 $capture->stop();
531 0         0 while (my $line = $capture->read) {
532 0         0 chomp $line;
533 0         0 print STDERR "$line\n";
534             }
535              
536             # Re-freeze the data structure and spit it out
537              
538 0         0 print STDOUT @{ $filter->put( [ $result ] ) };
  0         0  
539              
540 0         0 $request_counter++;
541             }
542 0 0 0     0 if ($request_max && $request_counter >= $request_max) {
543 0         0 print STDERR "Closing fork as requests $request_counter >= max $request_max\n";
544 0         0 return 1;
545             }
546             }
547             }
548              
549             sub child_exited {
550 0     0 0 0 my ($kernel, $heap, $sig_name, $child_id) = @_[KERNEL,HEAP,ARG0,ARG1];
551              
552 0         0 $heap->{talkback}("Child exited with signal $sig_name ($child_id)");
553             }
554              
555             sub fork_input {
556 5     5 0 5090 my ($kernel, $heap, $input, $wheel_id) = @_[KERNEL, HEAP, ARG0, ARG1];
557              
558 5         18 my ($fork) = grep { $wheel_id == $_->{id} } @{ $heap->{forks} };
  15         185  
  5         13  
559 5 50       18 die "Got fork input from an unknown wheel id" if ! $fork;
560              
561 5 50       19 if ($fork->{status} eq 'idle') {
562 0         0 $heap->{talkback}("Got input from an idle fork");
563 0         0 print STDERR Dumper($input, $fork);
564 0         0 $kernel->yield('fork_closed', $wheel_id);
565 0         0 return;
566             }
567              
568 5         15 $fork->{status} = 'idle';
569 5         11 $fork->{finished_request} = time;
570              
571             # Reply to the original request
572 5         11 my $request = delete $fork->{active_request};
573 5         17 $request->{result} = $input;
574 5         36 $request->{elapsed} = $fork->{finished_request} - $fork->{started_request};
575 5         21 $kernel->yield('return_result', $request);
576              
577             # Continue more requests if any
578 5         455 $kernel->yield('process_queue');
579             }
580              
581             sub fork_debug {
582 0     0 0   my ($kernel, $heap, $input, $wheel_id) = @_[KERNEL, HEAP, ARG0, ARG1];
583              
584 0           my ($fork) = grep { $wheel_id == $_->{id} } @{ $heap->{forks} };
  0            
  0            
585 0 0         die "Got fork error from an unknown wheel id" if ! $fork;
586              
587 0           my $method_name = $fork->{active_request}{method_name};
588 0   0       $method_name ||= "(unknown)";
589              
590 0           $heap->{talkback}("STDERR:$wheel_id $method_name: $input");
591             }
592              
593             sub fork_error {
594 0     0 0   my ($kernel, $heap, $syscall, $errno, $errstr, $wheel_id, $handle) =
595             @_[KERNEL, HEAP, ARG0 .. $#_];
596              
597 0           my ($fork) = grep { $wheel_id == $_->{id} } @{ $heap->{forks} };
  0            
  0            
598 0 0         die "Got fork error from an unknown wheel id" if ! $fork;
599              
600 0           my $method_name = $fork->{active_request}{method_name};
601 0   0       $method_name ||= "(unknown)";
602              
603 0           $heap->{talkback}("$handle:$wheel_id $method_name: Generated $syscall error $errno: $errstr");
604             }
605              
606             sub fork_closed {
607 0     0 0   my ($kernel, $heap, $wheel_id) = @_[KERNEL, HEAP, ARG0];
608              
609 0           my ($fork) = grep { $wheel_id == $_->{id} } @{ $heap->{forks} };
  0            
  0            
610 0 0         die "Got fork closed from an unknown wheel id" if ! $fork;
611              
612 0           $heap->{talkback}("Fork $wheel_id closed");
613              
614             # Forks shouldn't close unless there's an error. May also want to clean up after
615             # zombies somehow... and reprocess requests that died...
616              
617 0 0         if ($fork->{active_request}{method_name}) {
618 0           unshift @{ $heap->{request_queue} }, $fork->{active_request};
  0            
619             }
620              
621 0           $fork->{wheel}->kill(9);
622              
623             # Remove the fork from the list
624 0           my @new_list = grep { $wheel_id != $_->{id} } @{ $heap->{forks} };
  0            
  0            
625 0           $heap->{forks} = \@new_list;
626              
627             # Continue more requests if any
628 0           $kernel->yield('process_queue');
629             }
630              
631             # hide from CPAN
632             package
633             POE::Component::PreforkDispatch::PseudoXMLRPCTransaction;
634              
635 1     1   10 use strict;
  1         2  
  1         36  
636 1     1   6 use warnings;
  1         2  
  1         266  
637              
638             sub new {
639 0     0     my ($class, @params) = @_;
640 0           my %self = ( params => \@params );
641 0           return bless \%self, $class;
642             }
643              
644             sub params {
645 0     0     my $self = shift;
646 0           return $self->{params};
647             }
648              
649             sub return {
650 0     0     my ($self, $result) = @_;
651 0           $self->{result} = $result;
652             }
653              
654             sub result {
655 0     0     my $self = shift;
656 0           return $self->{result};
657             }
658              
659             =head1 SEE ALSO
660              
661             L, L, L
662              
663             =head1 TODO
664              
665             =over 4
666              
667             =item * Class-based method discovery
668              
669             =item * More tests
670              
671             =back
672              
673             =head1 AUTHOR
674              
675             Eric Waters
676              
677             =head1 COPYRIGHT
678              
679             Copyright (c) 2007 Eric Waters and XMission LLC (http://www.xmission.com/). All rights reserved. This program is free software; you can redistribute it and/or modify it under the same terms as Perl itself.
680              
681             The full text of the license can be found in the LICENSE file included with this module.
682              
683             =cut
684              
685             1;