File Coverage

blib/lib/XAS/Lib/Process.pm
Criterion Covered Total %
statement 20 258 7.7
branch 1 76 1.3
condition 0 16 0.0
subroutine 7 33 21.2
pod 8 8 100.0
total 36 391 9.2


line stmt bran cond sub pod time code
1             package XAS::Lib::Process;
2              
3             our $VERSION = '0.02';
4              
5             my $mixin;
6              
7             BEGIN {
8 1     1   2209 $mixin = 'XAS::Lib::Process::Unix';
9 1 50       25 $mixin = 'XAS::Lib::Process::Win32' if ($^O eq 'MSWin32');
10             }
11              
12 1     1   414 use Set::Light;
  1         1279  
  1         23  
13 1     1   5 use Hash::Merge;
  1         1  
  1         32  
14 1     1   4 use Badger::Filesystem 'Cwd Dir File';
  1         1  
  1         19  
15 1     1   130 use XAS::Constants ':process CODEREF';
  1         1  
  1         6  
16 1     1   187 use POE qw(Wheel Driver::SysRW Filter::Line);
  1         2  
  1         5  
17              
18             use XAS::Class
19             debug => 0,
20             version => $VERSION,
21             base => 'XAS::Lib::POE::Service',
22             mixin => "XAS::Lib::Mixins::Process $mixin",
23             utils => ':validation dotid trim',
24             mutators => 'input_handle output_handle status retries',
25             accessors => 'pid exit_code exit_signal process ID merger',
26             vars => {
27             PARAMS => {
28             -command => 1,
29             -auto_start => { optional => 1, default => 1 },
30             -auto_restart => { optional => 1, default => 1 },
31             -environment => { optional => 1, default => {} },
32             -exit_codes => { optional => 1, default => '0,1' },
33             -exit_retries => { optional => 1, default => 5 },
34             -group => { optional => 1, default => 'nobody' },
35             -priority => { optional => 1, default => 0 },
36             -pty => { optional => 1, default => 0 },
37             -umask => { optional => 1, default => '0022' },
38             -user => { optional => 1, default => 'nobody' },
39             -redirect => { optional => 1, default => 0 },
40             -retry_delay => { optional => 1, default => 0 },
41             -input_driver => { optional => 1, default => POE::Driver::SysRW->new() },
42             -output_driver => { optional => 1, default => POE::Driver::SysRW->new() },
43             -input_filter => { optional => 1, default => POE::Filter::Line->new(Literal => "\n") },
44             -output_filter => { optional => 1, default => POE::Filter::Line->new(Literal => "\n") },
45             -directory => { optional => 1, default => Cwd, isa => 'Badger::Filesystem::Directory' },
46             -output_handler => { optional => 1, type => CODEREF, default => sub {
47 0           my $output = shift;
48 0           printf("%s\n", trim($output));
49             }
50             },
51             }
52             }
53 1     1   562 ;
  1         2  
  1         21  
54              
55             #use Data::Dumper;
56              
57             # ----------------------------------------------------------------------
58             # Public Methods
59             # ----------------------------------------------------------------------
60              
61             sub session_initialize {
62 0     0 1   my $self = shift;
63              
64 0           my $alias = $self->alias;
65              
66 0           $self->log->debug("$alias: entering session_initialize()");
67              
68 0           $poe_kernel->state('get_event', $self, '_get_event');
69 0           $poe_kernel->state('flush_event', $self, '_flush_event');
70 0           $poe_kernel->state('error_event', $self, '_error_event');
71 0           $poe_kernel->state('close_event', $self, '_close_event');
72 0           $poe_kernel->state('check_status', $self, '_check_status');
73 0           $poe_kernel->state('poll_child', $self, '_poll_child');
74 0           $poe_kernel->state('child_exit', $self, '_child_exit');
75              
76 0           $poe_kernel->state('start_process', $self, '_start_process');
77 0           $poe_kernel->state('stop_process', $self, '_stop_process');
78 0           $poe_kernel->state('pause_process', $self, '_pause_process');
79 0           $poe_kernel->state('resume_process', $self, '_resume_process');
80 0           $poe_kernel->state('kill_process', $self, '_kill_process');
81              
82              
83             # walk the chain
84              
85 0           $self->SUPER::session_initialize();
86              
87 0           $poe_kernel->post($alias, 'session_startup');
88              
89 0           $self->log->debug("$alias: leaving session_initialize()");
90              
91             }
92              
93             sub session_startup {
94 0     0 1   my $self = shift;
95              
96 0           my $alias = $self->alias;
97              
98 0           $self->log->debug("$alias: entering session_startup()");
99              
100 0 0         if ($self->auto_start) {
101              
102 0           $poe_kernel->call($alias, 'start_process');
103            
104             }
105              
106             # walk the chain
107              
108 0           $self->SUPER::session_startup();
109              
110 0           $self->log->debug("$alias: leaving session_startup()");
111              
112             }
113              
114             sub session_pause {
115 0     0 1   my $self = shift;
116              
117 0           my $alias = $self->alias;
118              
119 0           $self->log->debug("$alias: entering session_pause()");
120              
121 0           $poe_kernel->call($alias, 'pause_process');
122            
123             # walk the chain
124              
125 0           $self->SUPER::session_pause();
126              
127 0           $self->log->debug("$alias: leaving session_pause()");
128              
129             }
130              
131             sub session_resume {
132 0     0 1   my $self = shift;
133              
134 0           my $alias = $self->alias;
135              
136 0           $self->log->debug("$alias: entering session_resume()");
137              
138 0           $poe_kernel->call($alias, 'resume_process');
139              
140             # walk the chain
141              
142 0           $self->SUPER::session_resume();
143              
144 0           $self->log->debug("$alias: leaving session_resume()");
145              
146             }
147              
148             sub session_stop {
149 0     0 1   my $self = shift;
150              
151 0           my $alias = $self->alias;
152              
153 0           $self->log->debug("$alias: entering session_stop()");
154              
155 0           $self->kill_process();
156 0           $poe_kernel->sig_handled();
157              
158             # walk the chain
159              
160 0           $self->SUPER::session_stop();
161              
162 0           $self->log->debug("$alias: leaving session_stop()");
163              
164             }
165              
166             sub session_shutdown {
167 0     0 1   my $self = shift;
168              
169 0           my $alias = $self->alias;
170              
171 0           $self->log->debug("$alias: entering session_shutdown()");
172              
173 0           $self->status(PROC_SHUTDOWN);
174              
175 0           $poe_kernel->call($alias, 'stop_process');
176 0           $poe_kernel->sig_handled();
177            
178             # walk the chain
179              
180 0           $self->SUPER::session_shutdown();
181              
182 0           $self->log->debug("$alias: leaving session_shutdown()");
183              
184             }
185              
186             sub put {
187 0     0 1   my $self = shift;
188 0           my ($chunk) = validate_params(\@_, [1]);
189              
190 0           my @chunks;
191 0           my $driver = $self->input_driver;
192 0           my $filter = $self->input_filter;
193              
194             # Avoid big bada boom if someone put()s on a dead wheel.
195              
196 0 0         unless ($self->input_handle) {
197              
198 0           $self->throw_msg(
199             dotid($self->class) . '.put_input.writerr',
200             'process_writerr',
201             'called put() on a wheel without an open INPUT handle'
202             );
203              
204             }
205            
206 0           push(@chunks, $chunk);
207              
208 0 0         if ($self->{'buffer'} = $driver->put($filter->put(\@chunks))) {
209              
210 0           $poe_kernel->select_resume_write($self->input_handle);
211              
212             }
213              
214 0           return 0;
215              
216             }
217              
218             sub DESTROY {
219 0     0     my $self = shift;
220              
221 0 0         if ($self->input_handle) {
222              
223 0           $poe_kernel->select_write($self->input_handle);
224 0           $self->input_handle(undef);
225              
226             }
227              
228 0 0         if ($self->output_handle) {
229              
230 0           $poe_kernel->select_read($self->output_handle);
231 0           $self->output_handle(undef);
232              
233             }
234              
235 0           $self->destroy();
236              
237 0           POE::Wheel::free_wheel_id($self->ID);
238              
239             }
240              
241             # ----------------------------------------------------------------------
242             # Public Events
243             # ----------------------------------------------------------------------
244              
245             # ----------------------------------------------------------------------
246             # Private Events
247             # ----------------------------------------------------------------------
248              
249             sub _start_process {
250 0     0     my $self = $_[OBJECT];
251              
252 0           my $count = 1;
253 0           my $alias = $self->alias;
254              
255 0 0         if ($self->status == PROC_STOPPED) {
256              
257 0           $self->start_process();
258 0           $poe_kernel->post($alias, 'check_status', $count);
259              
260             }
261              
262             }
263              
264             sub _resume_process {
265 0     0     my $self = $_[OBJECT];
266              
267 0           my $count = 1;
268 0           my $alias = $self->alias;
269              
270 0           $self->resume_process();
271 0           $poe_kernel->post($alias, 'check_status', $count);
272              
273             }
274              
275             sub _pause_process {
276 0     0     my $self = $_[OBJECT];
277              
278 0           my $count = 1;
279 0           my $alias = $self->alias;
280              
281 0           $self->pause_process();
282 0           $poe_kernel->post($alias, 'check_status', $count);
283              
284             }
285              
286             sub _stop_process {
287 0     0     my $self = $_[OBJECT];
288              
289 0           my $count = 1;
290 0           my $alias = $self->alias;
291              
292 0           $self->stop_process();
293 0           $poe_kernel->post($alias, 'check_status', $count);
294              
295             }
296              
297             sub _kill_process {
298 0     0     my $self = $_[OBJECT];
299              
300 0           my $count = 1;
301 0           my $alias = $self->alias;
302              
303 0           $self->kill_process();
304 0           $poe_kernel->post($alias, 'check_status', $count);
305              
306             }
307              
308             sub _get_event {
309 0     0     my ($self, $output, $wheel) = @_[OBJECT,ARG0,ARG1];
310              
311 0           $self->output_handler->($output);
312              
313             }
314              
315             sub _check_status {
316 0     0     my ($self, $count) = @_[OBJECT, ARG0];
317              
318 0           my $alias = $self->alias;
319 0           my $stat = $self->stat_process();
320              
321 0           $self->log->debug(sprintf('%s: check_status: process: %s, status: %s, count %s', $alias, $stat, $self->status, $count));
322              
323 0           $count++;
324              
325 0 0         if ($self->status == PROC_STARTED) {
    0          
    0          
    0          
    0          
326              
327 0 0 0       if (($stat == 3) || ($stat == 2)) {
328              
329 0           $self->status(PROC_RUNNING);
330 0           $self->log->info_msg('process_started', $alias, $self->pid);
331              
332             } else {
333              
334 0           $poe_kernel->delay('check_status', 5, $count);
335              
336             }
337              
338             } elsif ($self->status == PROC_RUNNING) {
339              
340 0 0 0       if (($stat != 3) || ($stat != 2)) {
341              
342 0           $self->resume_process();
343 0           $poe_kernel->delay('check_status', 5, $count);
344              
345             }
346              
347             } elsif ($self->status == PROC_PAUSED) {
348              
349 0 0         if ($stat != 6) {
350              
351 0           $self->pause_process();
352 0           $poe_kernel->delay('check_status', 5, $count);
353              
354             }
355              
356             } elsif ($self->status == PROC_STOPPED) {
357              
358 0 0         if ($stat != 0) {
359              
360 0           $self->stop_process();
361 0           $poe_kernel->delay('check_status', 5, $count);
362              
363             }
364              
365             } elsif($self->status == PROC_KILLED) {
366              
367 0 0         if ($stat != 0) {
368              
369 0           $self->kill_process();
370 0           $poe_kernel->delay('check_status', 5, $count);
371              
372             }
373              
374             }
375              
376             }
377              
378             sub _flush_event {
379 0     0     my ($self, $wheel) = @_[OBJECT,ARG0];
380              
381 0           my $alias = $self->alias;
382              
383 0           $self->log->debug("$alias: flush_event");
384              
385             }
386              
387             sub _error_event {
388 0     0     my ($self, $operation, $errno, $errstr, $wheel, $type) = @_[OBJECT,ARG0..ARG4];
389              
390 0           my $alias = $self->alias;
391              
392 0           $self->log->debug(
393             sprintf('%s: error_event - ops: %s, errno: %s, errstr: %s',
394             $alias, $operation, $errno, $errstr)
395             );
396              
397             }
398              
399             sub _close_event {
400 0     0     my ($self, $wheel) = @_[OBJECT,ARG0];
401              
402 0           my $alias = $self->alias;
403              
404 0           $self->log->debug("$alias: close_event");
405              
406 0           $poe_kernel->select_write($self->input_handle);
407 0           $self->input_handle(undef);
408              
409 0           $poe_kernel->select_read($self->output_handle);
410 0           $self->output_handle(undef);
411              
412             }
413              
414             sub _child_exit {
415 0     0     my ($self, $signal, $pid, $exitcode) = @_[OBJECT,ARG0...ARG2];
416              
417 0           my $alias = $self->alias;
418 0           my $status = $self->status;
419 0           my $retries = $self->retries;
420              
421 0           $self->{'pid'} = undef;
422 0           $self->{'exit_code'} = $exitcode >> 8;
423 0           $self->{'exit_signal'} = $exitcode & 127;
424              
425 0           $self->log->warn_msg('process_exited', $alias, $pid, $self->exit_code, $self->exit_signal);
426              
427 0 0         if ($status == PROC_STOPPED) {
428              
429 0 0         if ($self->auto_restart) {
430              
431 0 0 0       if (($retries < $self->exit_retries) || ($self->exit_retries < 0)) {
432              
433 0           $retries += 1;
434 0           $self->retries($retries);
435              
436 0 0         if ($self->exit_codes->has($self->exit_code)) {
437              
438 0 0         if ($self->retry_delay) {
439              
440 0           $poe_kernel->delay('start_process', $self->retry_delay);
441              
442             } else {
443              
444 0           $poe_kernel->call($alias, 'start_process');
445              
446             }
447              
448             } else {
449              
450 0   0       $self->log->warn_msg(
      0        
451             'process_unknown_exitcode',
452             $alias,
453             $self->exit_code || '',
454             $self->exit_signal || '',
455             );
456              
457             }
458              
459             } else {
460              
461 0           $self->log->warn_msg('process_nomore_retries', $alias, $retries);
462              
463             }
464              
465             } else {
466              
467 0           $self->log->warn_msg('process_no_autorestart', $alias);
468              
469             }
470              
471             }
472              
473             }
474              
475             # ----------------------------------------------------------------------
476             # Private Methods
477             # ----------------------------------------------------------------------
478              
479             # stolen from POE::Wheel::Run - more or less
480              
481             sub _process_output {
482 0     0     my $self = shift;
483              
484 0           my $id = $self->ID;
485 0           my $driver = $self->output_driver;
486 0           my $filter = $self->output_filter;
487 0           my $output = $self->output_handle;
488 0           my $state = ref($self) . "($id) -> select output";
489              
490 0 0 0       if ($filter->can('get_one') and $filter->can('get_one_start')) {
491              
492             $poe_kernel->state(
493             $state,
494             sub {
495 0     0     my ($k, $me, $handle) = @_[KERNEL, SESSION, ARG0];
496 0 0         if (defined(my $raw = $driver->get($handle))) {
497 0           $filter->get_one_start($raw);
498 0           while (1) {
499 0           my $next_rec = $filter->get_one();
500 0 0         last unless @$next_rec;
501 0           foreach my $cooked (@$next_rec) {
502 0           $k->call($me, 'get_event', $cooked, $id);
503             }
504             }
505             } else {
506 0           $k->call($me, 'error_event', 'read', ($!+0), $!, $id, 'OUTPUT');
507 0           $k->call($me, 'close_event', $id);
508 0           $k->select_read($handle);
509             }
510             }
511 0           );
512              
513             } else {
514              
515             $poe_kernel->state(
516             $state,
517             sub {
518 0     0     my ($k, $me, $handle) = @_[KERNEL, SESSION, ARG0];
519 0 0         if (defined(my $raw = $driver->get($handle))) {
520 0           foreach my $cooked (@{$filter->get($raw)}) {
  0            
521 0           $k->call($me, 'get_event', $cooked, $id);
522             }
523             } else {
524 0           $k->call($me, 'error_event', 'read', ($!+0), $!, $id, 'OUTPUT');
525 0           $k->call($me, 'close_event', $id);
526 0           $k->select_read($handle);
527             }
528             }
529 0           );
530              
531             }
532              
533 0           $poe_kernel->select_read($output, $state);
534              
535             }
536              
537             sub _process_input {
538 0     0     my $self = shift;
539              
540 0           my $id = $self->ID;
541 0           my $driver = $self->input_driver;
542 0           my $filter = $self->input_filter;
543 0           my $input = $self->input_handle;
544 0           my $buffer = \$self->{'buffer'};
545 0           my $state = ref($self) . "($id) -> select input";
546              
547             $poe_kernel->state(
548             $state,
549             sub {
550 0     0     my ($k, $me, $handle) = @_[KERNEL,SESSION,ARG0];
551 0           $$buffer = $driver->flush($handle);
552             # When you can't write, nothing else matters.
553 0 0         if ($!) {
554 0           $k->call($me, 'error_event', 'write', ($!+0), $!, $id, 'INPUT');
555 0           $k->select_write($handle);
556             } else {
557             # Could write, or perhaps couldn't but only because the
558             # filehandle's buffer is choked.
559             # All chunks written; fire off a "flushed" event.
560 0 0         unless ($$buffer) {
561 0           $k->select_pause_write($handle);
562 0           $k->call($me, 'flush_event', $id);
563             }
564             }
565             }
566 0           );
567              
568 0           $poe_kernel->select_write($input, $state);
569              
570             # Pause the write select immediately, unless output is pending.
571              
572 0 0         $poe_kernel->select_pause_write($input) unless ($buffer);
573              
574             }
575              
576             # Stolen from Proc::Background - more or less
577              
578             sub _resolve_path {
579 0     0     my $self = shift;
580 0           my $command = shift;
581 0           my $extensions = shift;
582 0           my $xpaths = shift;
583              
584             # Make the path to the progam absolute if it isn't already. If the
585             # path is not absolute and if the path contains a directory element
586             # separator, then only prepend the current working to it. If the
587             # path is not absolute, then look through the PATH environment to
588             # find the executable.
589              
590 0           my $alias = $self->alias;
591 0           my $path = File($command);
592              
593 0 0         if ($path->is_absolute) {
    0          
594              
595 0 0         if ($path->exists) {
596              
597 0           return $path->absolute;
598              
599             }
600              
601             } elsif ($path->is_relative) {
602              
603 0 0         if ($path->name eq $path) {
604              
605 0           foreach my $xpath (@$xpaths) {
606              
607 0 0         next if ($xpath eq '');
608              
609 0 0         if ($path->extension) {
610              
611 0           my $p = File($xpath, $path->name);
612              
613 0 0         if ($p->exists) {
614              
615 0           return $p->absolute;
616              
617             }
618              
619             } else {
620              
621 0           foreach my $ext (@$extensions) {
622              
623 0           my $p = File($xpath, $path->basename . $ext);
624              
625 0 0         if ($p->exists) {
626              
627 0           return $p->absolute;
628              
629             }
630              
631             }
632              
633             }
634              
635             }
636              
637             } else {
638              
639 0           my $p = File($path->absoulte);
640              
641 0 0         if ($p->exists) {
642              
643 0           return $p->absolute;
644              
645             }
646              
647             }
648              
649             }
650              
651             $self->throw_msg(
652 0           dotid($self->class) . '.resolve_path.path',
653             'location',
654             $alias, $command
655             );
656              
657              
658             }
659              
660             sub init {
661 0     0 1   my $class = shift;
662              
663 0           my $self = $class->SUPER::init(@_);
664              
665 0           my @exit_codes = split(',', $self->exit_codes);
666              
667 0           $self->{'exit_codes'} = Set::Light->new(@exit_codes);
668 0           $self->{'ID'} = POE::Wheel::allocate_wheel_id();
669 0           $self->{'merger'} = Hash::Merge->new('RIGHT_PRECEDENT');
670              
671 0           $self->retries(1);
672 0           $self->init_process();
673 0           $self->status(PROC_STOPPED);
674              
675 0           return $self;
676              
677             }
678              
679             1;
680              
681             __END__
682              
683             =head1 NAME
684              
685             XAS::Lib::Process - A class for managing processes within the XAS environment
686              
687             =head1 SYNOPSIS
688              
689             use XAS::Lib::Process;
690              
691             my $process = XAS::Lib::Process->new(
692             -command => 'perl test.pl'
693             );
694            
695             $process->run();
696              
697             =head1 DESCRIPTION
698              
699             This class manages a sub process in a platform independent way. Mixins
700             are loaded to handle the differences between Unix/Linux and Windows.
701             This module inherits from L<XAS::Lib::POE::Service|XAS::Lib::POE::Service>.
702             Please refer to that module for additional help.
703              
704             =head1 METHODS
705              
706             =head2 new
707              
708             This method initialized the module and takes the following parameters:
709              
710             =over 4
711              
712             =item B<-auto_start>
713              
714             This indicates wither to auto start the process. The default is true.
715              
716             =item B<-auto_restart>
717              
718             This indicates wither to auto restart the process if it exits. The default
719             is true.
720              
721             =item B<-command>
722              
723             The command to run.
724              
725             =item B<-directory>
726              
727             The optional directory to start the process in. Defaults to the current
728             directory of the parent process.
729              
730             =item B<-environment>
731              
732             Optional, additional environment variables to provide to the process.
733             The default is none.
734              
735             =item B<-exit_codes>
736              
737             Optional exit codes to check for the process. They default to '0,1'.
738             If the exit code matches, then the process is auto restarted. This should
739             be a comma delimited list of values.
740              
741             =item B<-exit_retries>
742              
743             The optional number of retries for restarting the process. The default
744             is 5.
745              
746             =item B<-group>
747              
748             The group to run the process under. Defaults to 'nobody'. This group
749             may not be defined on your system. This option is not implemented on Windows.
750              
751             =item B<-priority>
752              
753             The optional priority to run the process at. Defaults to 0. This option
754             is not implemented on Windows.
755              
756             =item B<-umask>
757              
758             The optional protection mask for the process. Defaults to '0022'. This
759             option is not implemented on Windows.
760              
761             =item B<-user>
762              
763             The optional user to run the process under. Defaults to 'nobody'. This user
764             may not be defined on your system. This option is not implemented on Windows.
765              
766             =item B<-redirect>
767              
768             This option is used to indicate wither to redirect stdout and stderr
769             from the child process to the parent and stdin from the parent to the
770             child process. The redirection combines stderr with stdout. Redirection
771             is implemented using sockets. This may cause buffering problems with the
772             child process.
773              
774             The default is no.
775              
776             =item B<-retry_delay>
777              
778             The optional number of seconds to delay a retry on an auto restart process.
779             The default is 0, or no delay in restarting the process.
780              
781             =item B<-input_driver>
782              
783             The optional input driver to use. Defaults to POE::Driver::SysRW.
784              
785             =item B<-output_driver>
786              
787             The optional output driver to use. Defaults to POE::Driver::SysRW.
788              
789             =item B<-input_filter>
790              
791             The optional filter to use for input. Defaults to POE::Filter::Line.
792              
793             =item B<-output_filter>
794              
795             The optional output filter to use. Defaults to POE::Filter::Line.
796              
797             =item B<-output_handler>
798              
799             This is an optional coderef to handle output from the process. The coderef
800             takes one parameter, the output from the process.
801              
802             =back
803              
804             =head1 METHODS
805              
806             =head2 put($data)
807              
808             This method will write a buffer to stdin.
809              
810             =head1 SEE ALSO
811              
812             =over 4
813              
814             =item L<XAS::Lib::Process::Unix|XAS::Lib::Process::Unix>
815              
816             =item L<XAS::Lib::Process::Win32|XAS::Lib::Process::Win32>
817              
818             =item L<XAS|XAS>
819              
820             =back
821              
822             =head1 AUTHOR
823              
824             Kevin L. Esteb, E<lt>kevin@kesteb.usE<gt>
825              
826             =head1 COPYRIGHT AND LICENSE
827              
828             Copyright (c) 2012-2015 Kevin L. Esteb
829              
830             This is free software; you can redistribute it and/or modify it under
831             the terms of the Artistic License 2.0. For details, see the full text
832             of the license at http://www.perlfoundation.org/artistic_license_2_0.
833              
834             =cut