File Coverage

blib/lib/XAS/Lib/Process.pm
Criterion Covered Total %
statement 20 258 7.7
branch 1 76 1.3
condition 0 16 0.0
subroutine 7 33 21.2
pod 8 8 100.0
total 36 391 9.2


line stmt bran cond sub pod time code
1             package XAS::Lib::Process;
2              
3             our $VERSION = '0.02';
4              
5             my $mixin;
6              
7             BEGIN {
8 1     1   1382 $mixin = 'XAS::Lib::Process::Unix';
9 1 50       22 $mixin = 'XAS::Lib::Process::Win32' if ($^O eq 'MSWin32');
10             }
11              
12 1     1   428 use Set::Light;
  1         1172  
  1         23  
13 1     1   5 use Hash::Merge;
  1         1  
  1         32  
14 1     1   4 use Badger::Filesystem 'Cwd Dir File';
  1         1  
  1         14  
15 1     1   111 use XAS::Constants ':process CODEREF';
  1         1  
  1         4  
16 1     1   463 use POE qw(Wheel Driver::SysRW Filter::Line);
  1         1  
  1         4  
17              
18             use XAS::Class
19             debug => 0,
20             version => $VERSION,
21             base => 'XAS::Lib::POE::Service',
22             mixin => "XAS::Lib::Mixins::Process $mixin",
23             utils => ':validation dotid trim',
24             mutators => 'input_handle output_handle status retries',
25             accessors => 'pid exit_code exit_signal process ID merger',
26             vars => {
27             PARAMS => {
28             -command => 1,
29             -auto_start => { optional => 1, default => 1 },
30             -auto_restart => { optional => 1, default => 1 },
31             -environment => { optional => 1, default => {} },
32             -exit_codes => { optional => 1, default => '0,1' },
33             -exit_retries => { optional => 1, default => 5 },
34             -group => { optional => 1, default => 'nobody' },
35             -priority => { optional => 1, default => 0 },
36             -pty => { optional => 1, default => 0 },
37             -umask => { optional => 1, default => '0022' },
38             -user => { optional => 1, default => 'nobody' },
39             -redirect => { optional => 1, default => 0 },
40             -retry_delay => { optional => 1, default => 0 },
41             -input_driver => { optional => 1, default => POE::Driver::SysRW->new() },
42             -output_driver => { optional => 1, default => POE::Driver::SysRW->new() },
43             -input_filter => { optional => 1, default => POE::Filter::Line->new(Literal => "\n") },
44             -output_filter => { optional => 1, default => POE::Filter::Line->new(Literal => "\n") },
45             -directory => { optional => 1, default => Cwd, isa => 'Badger::Filesystem::Directory' },
46             -output_handler => { optional => 1, type => CODEREF, default => sub {
47 0           my $output = shift;
48 0           printf("%s\n", trim($output));
49             }
50             },
51             }
52             }
53 1     1   522 ;
  1         2  
  1         17  
54              
55             #use Data::Dumper;
56              
57             # ----------------------------------------------------------------------
58             # Public Methods
59             # ----------------------------------------------------------------------
60              
61             sub session_initialize {
62 0     0 1   my $self = shift;
63              
64 0           my $alias = $self->alias;
65              
66 0           $self->log->debug("$alias: entering session_initialize()");
67              
68 0           $poe_kernel->state('get_event', $self, '_get_event');
69 0           $poe_kernel->state('flush_event', $self, '_flush_event');
70 0           $poe_kernel->state('error_event', $self, '_error_event');
71 0           $poe_kernel->state('close_event', $self, '_close_event');
72 0           $poe_kernel->state('check_status', $self, '_check_status');
73 0           $poe_kernel->state('poll_child', $self, '_poll_child');
74 0           $poe_kernel->state('child_exit', $self, '_child_exit');
75              
76 0           $poe_kernel->state('start_process', $self, '_start_process');
77 0           $poe_kernel->state('stop_process', $self, '_stop_process');
78 0           $poe_kernel->state('pause_process', $self, '_pause_process');
79 0           $poe_kernel->state('resume_process', $self, '_resume_process');
80 0           $poe_kernel->state('kill_process', $self, '_kill_process');
81              
82              
83             # walk the chain
84              
85 0           $self->SUPER::session_initialize();
86              
87 0           $poe_kernel->post($alias, 'session_startup');
88              
89 0           $self->log->debug("$alias: leaving session_initialize()");
90              
91             }
92              
93             sub session_startup {
94 0     0 1   my $self = shift;
95              
96 0           my $alias = $self->alias;
97              
98 0           $self->log->debug("$alias: entering session_startup()");
99              
100 0 0         if ($self->auto_start) {
101              
102 0           $poe_kernel->call($alias, 'start_process');
103            
104             }
105              
106             # walk the chain
107              
108 0           $self->SUPER::session_startup();
109              
110 0           $self->log->debug("$alias: leaving session_startup()");
111              
112             }
113              
114             sub session_pause {
115 0     0 1   my $self = shift;
116              
117 0           my $alias = $self->alias;
118              
119 0           $self->log->debug("$alias: entering session_pause()");
120              
121 0           $poe_kernel->call($alias, 'pause_process');
122            
123             # walk the chain
124              
125 0           $self->SUPER::session_pause();
126              
127 0           $self->log->debug("$alias: leaving session_pause()");
128              
129             }
130              
131             sub session_resume {
132 0     0 1   my $self = shift;
133              
134 0           my $alias = $self->alias;
135              
136 0           $self->log->debug("$alias: entering session_resume()");
137              
138 0           $poe_kernel->call($alias, 'resume_process');
139              
140             # walk the chain
141              
142 0           $self->SUPER::session_resume();
143              
144 0           $self->log->debug("$alias: leaving session_resume()");
145              
146             }
147              
148             sub session_stop {
149 0     0 1   my $self = shift;
150              
151 0           my $alias = $self->alias;
152              
153 0           $self->log->debug("$alias: entering session_stop()");
154              
155 0           $self->kill_process();
156 0           $poe_kernel->sig_handled();
157              
158             # walk the chain
159              
160 0           $self->SUPER::session_stop();
161              
162 0           $self->log->debug("$alias: leaving session_stop()");
163              
164             }
165              
166             sub session_shutdown {
167 0     0 1   my $self = shift;
168              
169 0           my $alias = $self->alias;
170              
171 0           $self->log->debug("$alias: entering session_shutdown()");
172              
173 0           $self->status(PROC_SHUTDOWN);
174              
175 0           $poe_kernel->call($alias, 'stop_process');
176 0           $poe_kernel->sig_handled();
177            
178             # walk the chain
179              
180 0           $self->SUPER::session_shutdown();
181              
182 0           $self->log->debug("$alias: leaving session_shutdown()");
183              
184             }
185              
186             sub put {
187 0     0 1   my $self = shift;
188 0           my ($chunk) = validate_params(\@_, [1]);
189              
190 0           my @chunks;
191 0           my $driver = $self->input_driver;
192 0           my $filter = $self->input_filter;
193              
194             # Avoid big bada boom if someone put()s on a dead wheel.
195              
196 0 0         unless ($self->input_handle) {
197              
198 0           $self->throw_msg(
199             dotid($self->class) . '.put_input.writerr',
200             'process_writerr',
201             'called put() on a wheel without an open INPUT handle'
202             );
203              
204             }
205            
206 0           push(@chunks, $chunk);
207              
208 0 0         if ($self->{'buffer'} = $driver->put($filter->put(\@chunks))) {
209              
210 0           $poe_kernel->select_resume_write($self->input_handle);
211              
212             }
213              
214 0           return 0;
215              
216             }
217              
218             sub DESTROY {
219 0     0     my $self = shift;
220              
221 0 0         if ($self->input_handle) {
222              
223 0           $poe_kernel->select_write($self->input_handle);
224 0           $self->input_handle(undef);
225              
226             }
227              
228 0 0         if ($self->output_handle) {
229              
230 0           $poe_kernel->select_read($self->output_handle);
231 0           $self->output_handle(undef);
232              
233             }
234              
235 0           $self->destroy();
236              
237 0           POE::Wheel::free_wheel_id($self->ID);
238              
239             }
240              
241             # ----------------------------------------------------------------------
242             # Public Events
243             # ----------------------------------------------------------------------
244              
245             # ----------------------------------------------------------------------
246             # Private Events
247             # ----------------------------------------------------------------------
248              
249             sub _start_process {
250 0     0     my $self = $_[OBJECT];
251              
252 0           my $count = 1;
253 0           my $alias = $self->alias;
254              
255 0 0         if ($self->status == PROC_STOPPED) {
256              
257 0           $self->start_process();
258 0           $poe_kernel->post($alias, 'check_status', $count);
259              
260             }
261              
262             }
263              
264             sub _resume_process {
265 0     0     my $self = $_[OBJECT];
266              
267 0           my $count = 1;
268 0           my $alias = $self->alias;
269              
270 0           $self->resume_process();
271 0           $poe_kernel->post($alias, 'check_status', $count);
272              
273             }
274              
275             sub _pause_process {
276 0     0     my $self = $_[OBJECT];
277              
278 0           my $count = 1;
279 0           my $alias = $self->alias;
280              
281 0           $self->pause_process();
282 0           $poe_kernel->post($alias, 'check_status', $count);
283              
284             }
285              
286             sub _stop_process {
287 0     0     my $self = $_[OBJECT];
288              
289 0           my $count = 1;
290 0           my $alias = $self->alias;
291              
292 0           $self->stop_process();
293 0           $poe_kernel->post($alias, 'check_status', $count);
294              
295             }
296              
297             sub _kill_process {
298 0     0     my $self = $_[OBJECT];
299              
300 0           my $count = 1;
301 0           my $alias = $self->alias;
302              
303 0           $self->kill_process();
304 0           $poe_kernel->post($alias, 'check_status', $count);
305              
306             }
307              
308             sub _get_event {
309 0     0     my ($self, $output, $wheel) = @_[OBJECT,ARG0,ARG1];
310              
311 0           $self->output_handler->($output);
312              
313             }
314              
315             sub _check_status {
316 0     0     my ($self, $count) = @_[OBJECT, ARG0];
317              
318 0           my $alias = $self->alias;
319 0           my $stat = $self->stat_process();
320              
321 0           $self->log->debug(sprintf('%s: check_status: process: %s, status: %s, count %s', $alias, $stat, $self->status, $count));
322              
323 0           $count++;
324              
325 0 0         if ($self->status == PROC_STARTED) {
    0          
    0          
    0          
    0          
326              
327 0 0 0       if (($stat == 3) || ($stat == 2)) {
328              
329 0           $self->status(PROC_RUNNING);
330 0           $self->log->info_msg('process_started', $alias, $self->pid);
331              
332             } else {
333              
334 0           $poe_kernel->delay('check_status', 5, $count);
335              
336             }
337              
338             } elsif ($self->status == PROC_RUNNING) {
339              
340 0 0 0       if (($stat != 3) || ($stat != 2)) {
341              
342 0           $self->resume_process();
343 0           $poe_kernel->delay('check_status', 5, $count);
344              
345             }
346              
347             } elsif ($self->status == PROC_PAUSED) {
348              
349 0 0         if ($stat != 6) {
350              
351 0           $self->pause_process();
352 0           $poe_kernel->delay('check_status', 5, $count);
353              
354             }
355              
356             } elsif ($self->status == PROC_STOPPED) {
357              
358 0 0         if ($stat != 0) {
359              
360 0           $self->stop_process();
361 0           $poe_kernel->delay('check_status', 5, $count);
362              
363             }
364              
365             } elsif($self->status == PROC_KILLED) {
366              
367 0 0         if ($stat != 0) {
368              
369 0           $self->kill_process();
370 0           $poe_kernel->delay('check_status', 5, $count);
371              
372             }
373              
374             }
375              
376             }
377              
378             sub _flush_event {
379 0     0     my ($self, $wheel) = @_[OBJECT,ARG0];
380              
381 0           my $alias = $self->alias;
382              
383 0           $self->log->debug("$alias: flush_event");
384              
385             }
386              
387             sub _error_event {
388 0     0     my ($self, $operation, $errno, $errstr, $wheel, $type) = @_[OBJECT,ARG0..ARG4];
389              
390 0           my $alias = $self->alias;
391              
392 0           $self->log->debug(
393             sprintf('%s: error_event - ops: %s, errno: %s, errstr: %s',
394             $alias, $operation, $errno, $errstr)
395             );
396              
397             }
398              
399             sub _close_event {
400 0     0     my ($self, $wheel) = @_[OBJECT,ARG0];
401              
402 0           my $alias = $self->alias;
403              
404 0           $self->log->debug("$alias: close_event");
405              
406 0           $poe_kernel->select_write($self->input_handle);
407 0           $self->input_handle(undef);
408              
409 0           $poe_kernel->select_read($self->output_handle);
410 0           $self->output_handle(undef);
411              
412             }
413              
414             sub _child_exit {
415 0     0     my ($self, $signal, $pid, $exitcode) = @_[OBJECT,ARG0...ARG2];
416              
417 0           my $alias = $self->alias;
418 0           my $status = $self->status;
419 0           my $retries = $self->retries;
420              
421 0           $self->{'pid'} = undef;
422 0           $self->{'exit_code'} = $exitcode >> 8;
423 0           $self->{'exit_signal'} = $exitcode & 127;
424              
425 0           $self->log->warn_msg('process_exited', $alias, $pid, $self->exit_code, $self->exit_signal);
426              
427 0 0         if ($status == PROC_STOPPED) {
428              
429 0 0         if ($self->auto_restart) {
430              
431 0 0 0       if (($retries < $self->exit_retries) || ($self->exit_retries < 0)) {
432              
433 0           $retries += 1;
434 0           $self->retries($retries);
435              
436 0 0         if ($self->exit_codes->has($self->exit_code)) {
437              
438 0 0         if ($self->retry_delay) {
439              
440 0           $poe_kernel->delay('start_process', $self->retry_delay);
441              
442             } else {
443              
444 0           $poe_kernel->call($alias, 'start_process');
445              
446             }
447              
448             } else {
449              
450 0   0       $self->log->warn_msg(
      0        
451             'process_unknown_exitcode',
452             $alias,
453             $self->exit_code || '',
454             $self->exit_signal || '',
455             );
456              
457             }
458              
459             } else {
460              
461 0           $self->log->warn_msg('process_nomore_retries', $alias, $retries);
462              
463             }
464              
465             } else {
466              
467 0           $self->log->warn_msg('process_no_autorestart', $alias);
468              
469             }
470              
471             }
472              
473             }
474              
475             # ----------------------------------------------------------------------
476             # Private Methods
477             # ----------------------------------------------------------------------
478              
479             # stolen from POE::Wheel::Run - more or less
480              
481             sub _process_output {
482 0     0     my $self = shift;
483              
484 0           my $id = $self->ID;
485 0           my $driver = $self->output_driver;
486 0           my $filter = $self->output_filter;
487 0           my $output = $self->output_handle;
488 0           my $state = ref($self) . "($id) -> select output";
489              
490 0 0 0       if ($filter->can('get_one') and $filter->can('get_one_start')) {
491              
492             $poe_kernel->state(
493             $state,
494             sub {
495 0     0     my ($k, $me, $handle) = @_[KERNEL, SESSION, ARG0];
496 0 0         if (defined(my $raw = $driver->get($handle))) {
497 0           $filter->get_one_start($raw);
498 0           while (1) {
499 0           my $next_rec = $filter->get_one();
500 0 0         last unless @$next_rec;
501 0           foreach my $cooked (@$next_rec) {
502 0           $k->call($me, 'get_event', $cooked, $id);
503             }
504             }
505             } else {
506 0           $k->call($me, 'error_event', 'read', ($!+0), $!, $id, 'OUTPUT');
507 0           $k->call($me, 'close_event', $id);
508 0           $k->select_read($handle);
509             }
510             }
511 0           );
512              
513             } else {
514              
515             $poe_kernel->state(
516             $state,
517             sub {
518 0     0     my ($k, $me, $handle) = @_[KERNEL, SESSION, ARG0];
519 0 0         if (defined(my $raw = $driver->get($handle))) {
520 0           foreach my $cooked (@{$filter->get($raw)}) {
  0            
521 0           $k->call($me, 'get_event', $cooked, $id);
522             }
523             } else {
524 0           $k->call($me, 'error_event', 'read', ($!+0), $!, $id, 'OUTPUT');
525 0           $k->call($me, 'close_event', $id);
526 0           $k->select_read($handle);
527             }
528             }
529 0           );
530              
531             }
532              
533 0           $poe_kernel->select_read($output, $state);
534              
535             }
536              
537             sub _process_input {
538 0     0     my $self = shift;
539              
540 0           my $id = $self->ID;
541 0           my $driver = $self->input_driver;
542 0           my $filter = $self->input_filter;
543 0           my $input = $self->input_handle;
544 0           my $buffer = \$self->{'buffer'};
545 0           my $state = ref($self) . "($id) -> select input";
546              
547             $poe_kernel->state(
548             $state,
549             sub {
550 0     0     my ($k, $me, $handle) = @_[KERNEL,SESSION,ARG0];
551 0           $$buffer = $driver->flush($handle);
552             # When you can't write, nothing else matters.
553 0 0         if ($!) {
554 0           $k->call($me, 'error_event', 'write', ($!+0), $!, $id, 'INPUT');
555 0           $k->select_write($handle);
556             } else {
557             # Could write, or perhaps couldn't but only because the
558             # filehandle's buffer is choked.
559             # All chunks written; fire off a "flushed" event.
560 0 0         unless ($$buffer) {
561 0           $k->select_pause_write($handle);
562 0           $k->call($me, 'flush_event', $id);
563             }
564             }
565             }
566 0           );
567              
568 0           $poe_kernel->select_write($input, $state);
569              
570             # Pause the write select immediately, unless output is pending.
571              
572 0 0         $poe_kernel->select_pause_write($input) unless ($buffer);
573              
574             }
575              
576             # Stolen from Proc::Background - more or less
577              
578             sub _resolve_path {
579 0     0     my $self = shift;
580 0           my $command = shift;
581 0           my $extensions = shift;
582 0           my $xpaths = shift;
583              
584             # Make the path to the progam absolute if it isn't already. If the
585             # path is not absolute and if the path contains a directory element
586             # separator, then only prepend the current working to it. If the
587             # path is not absolute, then look through the PATH environment to
588             # find the executable.
589              
590 0           my $alias = $self->alias;
591 0           my $path = File($command);
592              
593 0 0         if ($path->is_absolute) {
    0          
594              
595 0 0         if ($path->exists) {
596              
597 0           return $path->absolute;
598              
599             }
600              
601             } elsif ($path->is_relative) {
602              
603 0 0         if ($path->name eq $path) {
604              
605 0           foreach my $xpath (@$xpaths) {
606              
607 0 0         next if ($xpath eq '');
608              
609 0 0         if ($path->extension) {
610              
611 0           my $p = File($xpath, $path->name);
612              
613 0 0         if ($p->exists) {
614              
615 0           return $p->absolute;
616              
617             }
618              
619             } else {
620              
621 0           foreach my $ext (@$extensions) {
622              
623 0           my $p = File($xpath, $path->basename . $ext);
624              
625 0 0         if ($p->exists) {
626              
627 0           return $p->absolute;
628              
629             }
630              
631             }
632              
633             }
634              
635             }
636              
637             } else {
638              
639 0           my $p = File($path->absoulte);
640              
641 0 0         if ($p->exists) {
642              
643 0           return $p->absolute;
644              
645             }
646              
647             }
648              
649             }
650              
651             $self->throw_msg(
652 0           dotid($self->class) . '.resolve_path.path',
653             'location',
654             $alias, $command
655             );
656              
657              
658             }
659              
660             sub init {
661 0     0 1   my $class = shift;
662              
663 0           my $self = $class->SUPER::init(@_);
664              
665 0           my @exit_codes = split(',', $self->exit_codes);
666              
667 0           $self->{'exit_codes'} = Set::Light->new(@exit_codes);
668 0           $self->{'ID'} = POE::Wheel::allocate_wheel_id();
669 0           $self->{'merger'} = Hash::Merge->new('RIGHT_PRECEDENT');
670              
671 0           $self->retries(1);
672 0           $self->init_process();
673 0           $self->status(PROC_STOPPED);
674              
675 0           return $self;
676              
677             }
678              
679             1;
680              
681             __END__