File Coverage

blib/lib/HPC/Runner/Command/execute_job/Utils/MCE.pm
Criterion Covered Total %
statement 24 129 18.6
branch 0 42 0.0
condition 0 6 0.0
subroutine 8 16 50.0
pod 3 8 37.5
total 35 201 17.4


line stmt bran cond sub pod time code
1             package HPC::Runner::Command::execute_job::Utils::MCE;
2              
3 1     1   601 use MooseX::App::Role;
  1         3  
  1         10  
4 1     1   6483 use namespace::autoclean;
  1         2  
  1         10  
5              
6 1     1   72 use MooseX::Types::Path::Tiny qw/Path Paths AbsPath AbsFile/;
  1         2  
  1         12  
7              
8             with 'HPC::Runner::Command::execute_job::Base';
9              
10 1     1   3574 use MCE;
  1         17125  
  1         5  
11 1     1   593 use MCE::Queue;
  1         7783  
  1         4  
12 1     1   182 use DateTime;
  1         2  
  1         19  
13 1     1   5 use DateTime::Format::Duration;
  1         2  
  1         35  
14 1     1   5 use Memoize;
  1         2  
  1         1302  
15              
16             =head1 HPC::Runner::App::MCE
17              
18             Execute the job.
19              
20             =cut
21              
22             =head2 Command Line Options
23              
24             =cut
25              
26             option 'commands' => (
27             is => 'rw',
28             isa => 'Num',
29             required => 0,
30             default => 1,
31             );
32              
33             =head2 read_command
34              
35             Commands in the command file are 0 indexed
36             The first command is 0
37              
38             =cut
39              
40             has 'read_command' => (
41             is => 'rw',
42             isa => 'Num|Undef',
43             required => 0,
44             predicate => 'has_read_command',
45             lazy => 1,
46             default => sub {
47             my $self = shift;
48             if ( $self->can('task_id') && $self->can('batch_index_start') ) {
49             return $self->task_id - $self->batch_index_start;
50             }
51             elsif ( $self->can('batch_index_start') ) {
52             return $self->batch_index_start - 1;
53             }
54             else {
55             $self->log->fatal(
56             'Not able to determine job execution type. Exiting.');
57             exit 1;
58             }
59             }
60             );
61              
62             =head3 single_node
63              
64             Enable this option if you want to run from a single node or workstation
65              
66             =cut
67              
68             option 'single_node' => (
69             traits => ['Bool'],
70             is => 'rw',
71             isa => 'Bool',
72             default => 0,
73             handles => {
74             'not_single_node' => 'not',
75             },
76             documentation => 'Run a job from a single node or workstation.',
77             );
78              
79             =head3 jobname
80              
81             Specify a job name, and jobs will be 001_jobname, 002_jobname, 003_jobname
82              
83             =cut
84              
85             option 'jobname' => (
86             is => 'rw',
87             isa => 'Str',
88             required => 0,
89             traits => ['String'],
90             default => q{job},
91             default => sub {
92             return
93             $ENV{SLURM_JOB_NAME}
94             || $ENV{SBATCH_JOB_NAME}
95             || $ENV{PBS_JOBNAME}
96             || $ENV{JOB_NAME}
97             || 'job';
98             },
99             predicate => 'has_jobname',
100             handles => {
101             add_jobname => 'append',
102             clear_jobname => 'clear',
103             },
104             documentation =>
105             q{Specify a job name, each job will be appended with its batch order},
106             );
107              
108             =head2 Attributes
109              
110             =cut
111              
112             has 'queue' => (
113             is => 'rw',
114             lazy => 0, ## must be 0 to ensure the queue is created prior to spawning
115             default => sub {
116             my $self = shift;
117             return MCE::Queue->new();
118             }
119             );
120              
121             has 'mce' => (
122             is => 'rw',
123             lazy => 1,
124             clearer => '_clear_mce',
125             default => sub {
126             my $self = shift;
127             return MCE->new(
128             max_workers => $self->procs,
129             use_threads => 0,
130             user_func => sub {
131             my $mce = shift;
132             while (1) {
133             my ( $counter, $cmd ) = $self->queue->dequeue(2);
134             last unless defined $counter;
135             $self->counter($counter);
136             $self->cmd($cmd);
137             $self->run_command_mce();
138             }
139             }
140             );
141             }
142             );
143              
144             has 'using_mce' => (
145             is => 'rw',
146             isa => 'Bool',
147             default => 1,
148             required => 1,
149             );
150              
151             =head2 Subroutines
152              
153             =cut
154              
155             =head3 go
156              
157             Initialize MCE queues
158              
159             =cut
160              
161             sub run_mce {
162 0     0 0   my $self = shift;
163              
164 0           my $dt1 = DateTime->now( time_zone => 'local' );
165              
166 0           $self->prepend_logfile("MAIN_");
167 0           $self->append_logfile(".log");
168 0           $self->log( $self->init_log );
169              
170 0           $self->mce->spawn;
171              
172             #MCE specific
173 0           $self->parse_file_mce;
174              
175             #$DB::single = 2;
176              
177             # MCE workers dequeue 2 elements at a time. Thus the reason for * 2.
178 0           $self->queue->enqueue( (undef) x ( $self->procs * 2 ) );
179              
180             # MCE will automatically shutdown after running for 1 or no args.
181 0           $self->mce->run(1);
182              
183             #End MCE specific
184              
185 0           my $dt2 = DateTime->now( time_zone => 'local' );
186 0           my $duration = $dt2 - $dt1;
187 0           my $format =
188             DateTime::Format::Duration->new(
189             pattern => '%e days, %H hours, %M minutes, %S seconds' );
190              
191 0           $self->log_main_messages( 'info',
192             "Total execution time " . $format->format_duration($duration) );
193 0           return;
194             }
195              
196             =head3 parse_file_mce
197              
198             The default method of parsing the file.
199              
200             #starts a comment
201             wait - says wait until all other processes/threads exitcode
202              
203             #this is a one line command
204             echo "starting"
205              
206             #This is a multiline command
207             echo "starting line 1" \
208             echo "starting line 2" \
209             echo "finishing
210              
211             =cut
212              
213             sub parse_file_mce {
214 0     0 1   my $self = shift;
215              
216 0           $self->process_table;
217              
218 0 0         my $fh = IO::File->new( $self->infile, q{<} )
219             or $self->log_main_messages( "fatal",
220             "Error opening file " . $self->infile . " " . $! );
221 0 0         die print "The infile does not exist!\n" unless $fh;
222              
223 0 0         if ( $self->single_node ) {
    0          
224 0           $self->log_main_messages( 'info', 'Running in single node mode' );
225 0           while (<$fh>) {
226 0           my $line = $_;
227 0           $self->process_lines($line);
228             }
229             }
230             elsif ( defined $self->read_command ) {
231 0           $self->log_main_messages( 'info',
232             'Executing Command # ' . $self->read_command );
233 0           my $cmds = $self->parse_cmd_file($fh);
234              
235 0           foreach my $cmd (@$cmds) {
236 0           map { $self->process_lines( $_ . "\n" ) } split( "\n", $cmd );
  0            
237 0           $self->wait(0);
238             }
239             }
240             else {
241 0           $self->log_main_messages( 'fatal', 'No running mode found. Exiting' );
242 0           exit 1;
243             }
244             }
245              
246             =head3 parse_cmd_file
247              
248             Parse the command file for the read_command
249             Commands are 0 indexed
250              
251             =cut
252              
253             sub parse_cmd_file {
254 0     0 1   my $self = shift;
255 0           my $fh = shift;
256              
257 0           my $x = 0;
258 0           my $add_cmds = 0;
259 0           my $cmd_count = 0;
260              
261 0           my @cmds = ();
262 0           my $cmd = '';
263              
264 0           while (<$fh>) {
265 0           my $line = $_;
266 0 0         next unless $line;
267 0 0         next unless $line =~ m/\S/;
268              
269 0           $cmd .= $line;
270 0 0         next if $line =~ m/\\$/;
271 0 0         next if $line =~ m/^#/;
272 0 0 0       if ( $x == $self->read_command && $cmd_count < $self->commands ) {
273 0           $add_cmds = 1;
274             }
275 0 0         if ($add_cmds) {
276 0           push( @cmds, $cmd );
277 0           $cmd_count++;
278             }
279 0           $x++;
280              
281 0 0 0       if ( $x >= $self->read_command && $cmd_count >= $self->commands ) {
282 0           last;
283             }
284 0           $cmd = '';
285             }
286              
287 0           close($fh);
288 0           return \@cmds;
289             }
290              
291             ##TODO separate out single node mode
292             sub process_lines {
293 0     0 0   my $self = shift;
294 0           my $line = shift;
295              
296 0 0         if ( $line =~ m/^#TASK/ ) {
297 0           $self->add_cmd($line);
298             }
299              
300 0 0         $self->check_single_node($line) if $self->single_node;
301              
302 0 0         return if $line =~ m/^#/;
303 0           $self->add_cmd($line);
304              
305             ##Bash style we continue to the next lime if the current line ends in \
306 0 0         return if $line =~ m/\\$/;
307 0 0         if ( $self->match_cmd(qr/^wait$/) ) {
308 0           $self->hold_pool;
309             }
310             else {
311 0           $self->add_pool;
312             }
313             }
314              
315             sub check_single_node {
316 0     0 0   my $self = shift;
317 0           my $line = shift;
318              
319 0 0         if ( $line =~ m/^#HPC jobname=/ ) {
320 0           $self->hold_pool;
321 0           $self->_clear_mce;
322 0           my ( $t1, $t2 ) = parse_meta($line);
323 0           $self->jobname($t2);
324             ##Trigger outdir
325 0           $self->logname($t2);
326 0           $self->logfile( $self->set_logfile );
327 0           $self->logdir( $self->set_logdir );
328             }
329 0 0         if ( $line =~ m/^#HPC procs=/ ) {
330 0           $self->hold_pool;
331 0           $self->_clear_mce;
332 0           my ( $t1, $t2 ) = parse_meta($line);
333 0           $self->procs($t2);
334 0           $self->hold_pool;
335             }
336             }
337              
338             sub add_pool {
339 0     0 0   my $self = shift;
340 0           $self->log_main_messages( 'debug', "Enqueuing command:\n\t" . $self->cmd );
341              
342             ##Task ID is the counter for the array
343 0 0         $self->task_id( $self->counter ) if $self->can('task_id');
344              
345 0           $self->queue->enqueue( $self->counter, $self->cmd );
346 0           $self->clear_cmd;
347 0           $self->inc_counter;
348             }
349              
350             sub hold_pool {
351 0     0 0   my $self = shift;
352              
353 0 0         $self->log_main_messages( 'debug', "Beginning command:\n\t" . $self->cmd )
354             if $self->has_cmd;
355 0 0         $self->log_main_messages( 'debug',
356             'Waiting for all threads to complete...' )
357             if $self->has_cmd;
358              
359 0           $self->wait(1);
360 0           push( @{ $self->jobref }, [] );
  0            
361 0           $self->queue->enqueue( (undef) x ( $self->procs * 2 ) );
362 0           $self->mce->run(0); # 0 indicates do not shutdown after running
363              
364 0           $self->log_main_messages( 'debug',
365             'All children have completed processing!' );
366 0           $self->clear_cmd;
367             }
368              
369             memoize('parse_meta');
370              
371             sub parse_meta {
372             my $line = shift;
373             my ( @match, $t1, $t2 );
374              
375             @match = $line =~ m/ (\w+)=(.+)$/;
376             ( $t1, $t2 ) = ( $match[0], $match[1] );
377              
378             return ( $t1, $2 );
379             }
380              
381             =head3 run_command_mce
382              
383             MCE knows which subcommand to use from Runner/MCE - object mce
384              
385             =cut
386              
387             sub run_command_mce {
388 0     0 1   my $self = shift;
389              
390 0           my $pid = $$;
391              
392             #$DB::single = 2;
393              
394 0           push( @{ $self->jobref->[-1] }, $pid );
  0            
395 0           $self->_log_commands($pid);
396              
397 0           return;
398             }
399              
400             =head1 AUTHOR
401              
402             Jillian Rowe E<lt>jillian.e.rowe@gmail.comE<gt>
403             Mario Roy E<lt>marioeroy@gmail.comE<gt>
404              
405             =cut
406              
407             1;