File Coverage

blib/lib/Event/ExecFlow/Job/Command.pm
Criterion Covered Total %
statement 104 146 71.2
branch 21 40 52.5
condition 1 3 33.3
subroutine 31 38 81.5
pod 0 33 0.0
total 157 260 60.3


line stmt bran cond sub pod time code
1             package Event::ExecFlow::Job::Command;
2              
3 2     2   10 use base qw( Event::ExecFlow::Job );
  2         3  
  2         1104  
4              
5 2     2   26 use Locale::TextDomain $Event::ExecFlow::locale_textdomain;
  2         5  
  2         16  
6              
7 2     2   457 use strict;
  2         4  
  2         67  
8 2     2   4582 use AnyEvent;
  2         18496  
  2         5571  
9              
10             # prevent warnings from AnyEvent
11             { package AnyEvent::Impl::Event::CondVar;
12             package AnyEvent::Impl::Event::Glib; }
13              
14 105     105 0 544 sub get_type { "command" }
15 0     0 0 0 sub get_exec_type { "async" }
16              
17             #------------------------------------------------------------------------
18              
19 10     10 0 31 sub get_command { shift->{command} }
20 36     36 0 169 sub get_fetch_output { shift->{fetch_output} }
21 10     10 0 41 sub get_node { shift->{node} }
22 0     0 0 0 sub get_output { shift->{output} }
23 36     36 0 184 sub get_progress_parser { shift->{progress_parser} }
24 10     10 0 47 sub get_got_exec_ok { shift->{got_exec_ok} }
25 10     10 0 32 sub get_configure_callback { shift->{configure_callback} }
26              
27 10     10 0 20 sub set_command { shift->{command} = $_[1] }
28 10     10 0 17 sub set_fetch_output { shift->{fetch_output} = $_[1] }
29 10     10 0 25 sub set_node { shift->{node} = $_[1] }
30 10     10 0 70 sub set_output { shift->{output} = $_[1] }
31 10     10 0 16 sub set_progress_parser { shift->{progress_parser} = $_[1] }
32 10     10 0 62 sub set_got_exec_ok { shift->{got_exec_ok} = $_[1] }
33 10     10 0 15 sub set_configure_callback { shift->{configure_callback} = $_[1] }
34              
35             #------------------------------------------------------------------------
36              
37 20     20 0 114 sub get_pids { shift->{pids} }
38 56     56 0 419 sub get_fh { shift->{fh} }
39 0     0 0 0 sub get_watcher { shift->{watcher} }
40 0     0 0 0 sub get_executed_command { shift->{executed_command} }
41              
42 20     20 0 119 sub set_pids { shift->{pids} = $_[1] }
43 30     30 0 118 sub set_fh { shift->{fh} = $_[1] }
44 30     30 0 77 sub set_watcher { shift->{watcher} = $_[1] }
45 10     10 0 53 sub set_executed_command { shift->{executed_command} = $_[1] }
46              
47             #------------------------------------------------------------------------
48              
49             sub new {
50 10     10 0 151 my $class = shift;
51 10         51 my %par = @_;
52 10         24 my ($command, $fetch_output, $node, $progress_parser) =
53             @par{'command','fetch_output','node','progress_parser'};
54 10         17 my ($configure_callback) =
55             $par{'configure_callback'};
56              
57 10         44 my $self = $class->SUPER::new(@_);
58              
59 10         22 $self->set_command($command);
60 10         20 $self->set_fetch_output($fetch_output);
61 10         21 $self->set_node($node);
62 10         23 $self->set_progress_parser($progress_parser);
63 10         20 $self->set_configure_callback($configure_callback);
64              
65 10         142 return $self;
66             }
67              
68             sub init {
69 10     10 0 19 my $self = shift;
70            
71 10         101 $self->SUPER::init();
72            
73 10         54 $self->set_pids([]);
74 10         34 $self->set_fh();
75 10         22 $self->set_watcher();
76 10         31 $self->set_output("");
77              
78 10         27 1;
79             }
80              
81             sub execute {
82 10     10 0 17 my $self = shift;
83              
84 10         31 $self->open_pipe;
85              
86 10         104 1;
87             }
88              
89             sub open_pipe {
90 10     10 0 17 my $self = shift;
91              
92 10         49 my $command = $self->get_command;
93              
94 10 50       36 if ( ref $command eq 'CODE' ) {
95 0         0 $Event::ExecFlow::JOB = $self;
96 0         0 $command = $command->($self);
97 0         0 $Event::ExecFlow::JOB = undef;
98             }
99              
100 10 50       62 if ( $self->get_configure_callback ) {
101 0         0 my $cb = $self->get_configure_callback;
102 0         0 $command = &$cb($command);
103             }
104              
105 10 50       39 if ( $self->get_node ) {
106 0         0 $command = $self->get_node->prepare_command($command, $self);
107             }
108              
109 10         576 $command =~ s/\s+$//;
110              
111 10 50       45 my $execflow = $command =~ /execflow/ ? "" : "execflow ";
112 10         34 $command = $execflow.$command;
113 10 50       52 $command .= " && echo EXECFLOW_OK" if $command !~ /EXECFLOW_OK/;
114              
115 10         157 $self->log (__x("Executing command: {command}", command => $command));
116 10 50       38 $Event::ExecFlow::DEBUG && print "Command(".$self->get_info."): command=$command\n";
117              
118 10         75 $self->set_executed_command($command);
119              
120 10         78 local $ENV{LC_ALL} = "C";
121 10         60 local $ENV{LANG} = "C";
122              
123 10 50       88815 my $pid = open (my $fh, "( $command ) 2>&1 |")
124             or die "can't fork '$command'";
125              
126             my $watcher = AnyEvent->io ( fh => $fh, poll => 'r', cb => sub {
127 46     46   4013752 $self->command_progress;
128 10         1098 });
129              
130 10         691 push @{$self->get_pids}, $pid;
  10         420  
131 10         218 $self->set_fh($fh);
132 10         106 $self->set_watcher($watcher);
133              
134 10         568 return $fh;
135             }
136              
137             sub close_pipe {
138 10     10 0 21 my $self = shift;
139            
140 10         47 $self->set_watcher(undef);
141            
142 10         110 close($self->get_fh);
143 10         44 $self->set_fh(undef);
144 10         58 $self->set_pids([]);
145              
146 10 50 33     68 if ( !$self->get_error_message && !$self->get_got_exec_ok ) {
147 0         0 $self->set_error_message(
148             "Command exits with failure code:\n".
149             "Command: ".$self->get_executed_command."\n\n".
150             "Output: ".$self->get_output
151             );
152             }
153            
154 10         22 1;
155             }
156              
157             sub command_progress {
158 46     46 0 121 my $self = shift;
159            
160 46         176 my $fh = $self->get_fh;
161              
162             #-- read and check for eof
163 46         80 my $buffer;
164 46 100       756 if ( !sysread($fh, $buffer, 4096) ) {
165 10         42 $self->close_pipe;
166 10         69 $self->execution_finished;
167 10         311 return;
168             }
169            
170             #-- get job's PID
171 36         210 my ($pid) = ( $buffer =~ /EXEC_FLOW_JOB_PID=(\d+)/ );
172 36 100       128 if ( defined $pid ) {
173 10         14 push @{$self->get_pids}, $pid;
  10         32  
174 10         75 $buffer =~ s/EXEC_FLOW_JOB_PID=(\d+)\n//;
175             }
176              
177             #-- succesfully executed?
178 36 100       208 if ( $buffer =~ s/EXECFLOW_OK\n// ) {
179 10         41 $self->set_got_exec_ok(1);
180             }
181              
182             #-- store output
183 36 50       179 if ( $self->get_fetch_output ) {
184 0         0 $self->{output} .= $buffer;
185             } else {
186 36         211 $self->{output} = substr($self->{output}.$buffer,-16384);
187             }
188              
189             #-- parse output & report progress
190 36         130 my $progress_parser = $self->get_progress_parser;
191 36 50       243 if ( ref $progress_parser eq 'CODE' ) {
    50          
192 0         0 $progress_parser->($self, $buffer);
193             }
194             elsif ( ref $progress_parser eq 'Regexp' ) {
195 36 100       697 if ( $buffer =~ $progress_parser ) {
196 20         154 $self->set_progress_cnt($1);
197             }
198             }
199              
200 36 100       245 $self->get_frontend->report_job_progress($self)
201             if $self->progress_has_changed;
202              
203 36         7851 1;
204             }
205              
206             sub cancel {
207 0     0 0   my $self = shift;
208              
209 0           $self->set_cancelled(1);
210              
211 0           my $pids = $self->get_pids;
212 0 0         return unless @{$pids};
  0            
213              
214 0           kill 9, @{$pids};
  0            
215              
216 0           $self->log(__x("Sending signal 9 to PID(s)")." ".join(", ", @{$pids}));
  0            
217              
218 0           1;
219             }
220              
221             sub pause_job {
222 0     0 0   my $self = shift;
223              
224 0           my $signal;
225 0 0         if ( $self->get_paused ) {
226 0           $signal = "STOP";
227             }
228             else {
229 0           $signal = "CONT";
230             }
231              
232 0           my $pids = $self->get_pids;
233 0 0         kill $signal, @{$pids} if @{$pids};
  0            
  0            
234            
235 0           1;
236             }
237              
238             sub backup_state {
239 0     0 0   my $self = shift;
240            
241 0           my $data_href = $self->SUPER::backup_state();
242            
243 0           delete $data_href->{configure_callback};
244 0           delete $data_href->{progress_parser};
245 0           delete $data_href->{node};
246 0           delete $data_href->{watcher};
247 0           delete $data_href->{fh};
248 0 0         delete $data_href->{command}
249             if ref $data_href->{command} eq 'CODE';
250              
251 0           return $data_href;
252             }
253              
254             1;
255              
256             __END__