File Coverage

blib/lib/HPC/Runner/Command/submit_jobs/Utils/Scheduler/ResolveDeps.pm
Criterion Covered Total %
statement 44 212 20.7
branch 0 38 0.0
condition 0 3 0.0
subroutine 15 28 53.5
pod 6 11 54.5
total 65 292 22.2


line stmt bran cond sub pod time code
1             package HPC::Runner::Command::submit_jobs::Utils::Scheduler::ResolveDeps;
2 1     1   432 use 5.010;
  1         4  
3              
4 1     1   5 use Moose::Role;
  1         2  
  1         6  
5 1     1   4121 use List::MoreUtils qw(natatime);
  1         2  
  1         5  
6 1     1   542 use List::Util qw(first);
  1         2  
  1         56  
7 1     1   6 use Storable qw(dclone);
  1         2  
  1         32  
8 1     1   4 use Data::Dumper;
  1         2  
  1         37  
9 1     1   5 use Algorithm::Dependency::Source::HoA;
  1         2  
  1         26  
10 1     1   304 use Algorithm::Dependency::Ordered;
  1         376  
  1         25  
11 1     1   6 use HPC::Runner::Command::submit_jobs::Utils::Scheduler::Batch;
  1         2  
  1         19  
12 1     1   5 use POSIX;
  1         2  
  1         7  
13 1     1   2307 use String::Approx qw(amatch);
  1         2858  
  1         55  
14 1     1   8 use Text::ASCIITable;
  1         2  
  1         30  
15 1     1   4 use Try::Tiny;
  1         2  
  1         42  
16 1     1   5 use Memoize;
  1         2  
  1         32  
17 1     1   306 use Array::Compare;
  1         122969  
  1         1563  
18              
19             with
20             'HPC::Runner::Command::submit_jobs::Utils::Scheduler::ResolveDeps::BuildTaskDeps';
21              
22             =head1 HPC::Runner::Command::submit_jobs::Utils::Scheduler::ResolveDeps;
23              
24             Once we have parsed the input file, parse each job_type for job_batches.
25              
26             =head2 Attributes
27              
28             =cut
29              
30             =head3 schedule
31              
32             Schedule our jobs
33              
34             =cut
35              
36             has 'schedule' => (
37             traits => ['Array'],
38             is => 'rw',
39             isa => 'ArrayRef',
40             default => sub { [] },
41             handles => {
42             all_schedules => 'elements',
43             add_schedule => 'push',
44             has_schedules => 'count',
45             clear_schedule => 'clear',
46             has_no_schedules => 'is_empty',
47             },
48             );
49              
50             has 'batch_tags' => (
51             is => 'rw',
52             isa => 'HashRef',
53             default => sub {
54             return {};
55             }
56             );
57              
58             =head2 Subroutines
59              
60             =cut
61              
62             #Just putting this here
63             #scontrol update job=9314_2 Dependency=afterok:9320_1
64              
65             =head3 schedule_jobs
66              
67             Use Algorithm::Dependency to schedule the jobs
68              
69             Catch any scheduling errors not caught by the sanity check
70              
71             =cut
72              
73             sub schedule_jobs {
74 0     0 1   my $self = shift;
75              
76 0           my $source =
77             Algorithm::Dependency::Source::HoA->new( $self->graph_job_deps );
78              
79 0           my $dep = Algorithm::Dependency::Ordered->new(
80             source => $source,
81             selected => []
82             );
83              
84             try {
85 0     0     $self->schedule( $dep->schedule_all );
86             }
87             catch {
88             # $DB::single=2;
89 0     0     $self->app_log->fatal( 'There was a problem creating your schedule.'
90             . ' Please ensure there are no cyclic dependencies. '
91             . 'Aborting mission!' );
92 0           $self->app_log->fatal($@);
93 0           exit 1;
94             }
95              
96 0           }
97              
98             =head3 sanity_check_schedule
99              
100             Run a sanity check on the schedule. All the job deps should have existing job names
101              
102             =cut
103              
104             sub sanity_check_schedule {
105 0     0 1   my $self = shift;
106              
107 0           $DB::single = 2;
108              
109 0           my @jobnames = keys %{ $self->graph_job_deps };
  0            
110 0           @jobnames = sort(@jobnames);
111 0           my $search = 1;
112 0           my $t = Text::ASCIITable->new();
113              
114 0           my $x = 0;
115              
116 0           my @rows = ();
117              
118             #Search the dependencies for matching jobs
119 0           foreach my $job (@jobnames) {
120 0           $DB::single = 2;
121 0           my $row = [];
122 0           my $ref = $self->graph_job_deps->{$job};
123 0           push( @$row, $job );
124              
125 0           my $y = 0;
126 0           my $depstring;
127              
128             #TODO This should be a proper error
129 0           foreach my $r (@$ref) {
130 0           $DB::single = 2;
131              
132 0 0         if ( !exists $self->graph_job_deps->{$r} ) {
    0          
133 0           $ref->[$y] = "**$r**";
134              
135 0           $self->app_log->fatal("Job dep $r is not in joblist.");
136 0           $search = 0;
137              
138 0           my @matches = amatch( $r, @jobnames );
139 0 0         if (@matches) {
140 0           push( @$row, join( ", ", @matches ) );
141 0           $self->app_log->warn( "Did you mean ( "
142             . join( ", ", @matches )
143             . " ) instead of $r?" );
144             }
145             else {
146 0           $self->app_log->fatal(
147             "No potential matches were found for dependency $r");
148             }
149             }
150             elsif ( "$r" eq "$job" ) {
151 0           $self->app_log->fatal(
152             "Job dep $r deps upon itself. This schedule is not possible."
153             );
154             }
155              
156 0           $y++;
157             }
158              
159 0           $depstring = join( ", ", @{$ref} );
  0            
160 0           push( @$row, $depstring );
161              
162 0           my $count_cmd = $self->jobs->{$job}->cmd_counter;
163 0           push( @$row, $count_cmd );
164              
165 0           push( @rows, $row );
166 0           $x++;
167             }
168              
169             #IF there are no broken dependencies - return
170 0 0         return $search if $search;
171              
172 0           $t->setCols( [ "JobName", "Deps", "Suggested" ] );
173 0           map { $t->addRow($_) } @rows;
  0            
174 0           $self->app_log->fatal(
175             'There were one or more problems with your job schedule.');
176 0           $self->app_log->warn(
177             "Here is your tabular dependency list in alphabetical order");
178 0           $self->app_log->info( "\n\n" . $t );
179              
180 0           return $search;
181             }
182              
183             =head3 chunk_commands
184              
185             Chunk commands per job into batches
186              
187             #TODO Clean this up
188              
189             =cut
190              
191             sub chunk_commands {
192 0     0 1   my $self = shift;
193              
194 0           $self->reset_cmd_counter;
195 0           $self->reset_batch_counter;
196              
197 0 0         return if $self->has_no_schedules;
198              
199 0           $self->clear_scheduler_ids();
200              
201 0           $self->chunk_commands_jobs;
202              
203 0           $self->reset_job_counter;
204 0           $self->reset_cmd_counter;
205 0           $self->reset_batch_counter;
206              
207             }
208              
209             sub chunk_commands_jobs {
210 0     0 0   my $self = shift;
211              
212 0           foreach my $job ( $self->all_schedules ) {
213              
214 0           $self->current_job($job);
215              
216 0 0         next unless $self->jobs->{ $self->current_job };
217              
218 0           $self->reset_cmd_counter;
219 0           $self->reset_batch_counter;
220              
221             # $self->jobs->{ $self->current_job }->{batch_index_start} =
222             # $self->batch_counter;
223              
224 0           $self->chunk_commands_jobs_check;
225 0 0         next unless $self->jobs->{ $self->current_job }->cmd_counter;
226              
227             my $commands_per_node =
228 0           $self->jobs->{ $self->current_job }->commands_per_node;
229              
230 0           my @cmds = @{ $self->parse_cmd_file };
  0            
231              
232 0           my $iter = natatime $commands_per_node, @cmds;
233              
234 0           $self->assign_batches($iter);
235 0           $self->assign_batch_stats( scalar @cmds );
236              
237 0           $self->inc_job_counter;
238              
239             my $batch_index_start =
240 0           $self->jobs->{ $self->current_job }->{batch_index_start};
241              
242             my $batch_index_end =
243             $self->jobs->{ $self->current_job }->{batch_index_end}
244 0   0       || $self->jobs->{ $self->current_job }->{batch_index_start};
245              
246             my $number_of_batches =
247 0           $self->jobs->{ $self->current_job }->{num_job_arrays};
248              
249 0           $self->return_ranges( $batch_index_start, $batch_index_end,
250             $number_of_batches );
251             }
252             }
253              
254             sub chunk_commands_jobs_check {
255 0     0 0   my $self = shift;
256              
257 0 0         if ( !$self->jobs->{ $self->current_job }->can('count_cmds') ) {
258 0           warn
259             "You seem to be mixing and matching job dependency declaration types! Here there be dragons! We are dying now.\n";
260 0           exit 1;
261             }
262             }
263              
264             sub assign_num_max_array {
265 0     0 0   my $self = shift;
266 0           my $job = shift;
267              
268 0           my $commands_per_node = $self->jobs->{$job}->commands_per_node;
269              
270 0 0         $self->max_array_size(1) if $self->use_batches;
271              
272             my $number_of_batches =
273             resolve_max_array_size( $self->max_array_size, $commands_per_node,
274 0           $self->jobs->{$job}->cmd_counter );
275              
276 0           $self->jobs->{$job}->{num_job_arrays} = $number_of_batches;
277             }
278              
279             sub parse_cmd_file {
280 0     0 0   my $self = shift;
281              
282 0           my $fh = $self->job_files->{ $self->current_job };
283 0           seek( $fh, 0, 0 );
284              
285 0           my @cmds = ();
286 0           my $cmd = '';
287 0           while (<$fh>) {
288 0           my $line = $_;
289              
290 0 0         next unless $line;
291              
292 0           $cmd .= $line;
293 0 0         next if $line =~ m/\\$/;
294 0 0         next if $line =~ m/^#/;
295 0           push( @cmds, $cmd );
296 0           $cmd = '';
297             }
298 0           return \@cmds;
299             }
300              
301             #TODO put arrays in one place, batches in another
302              
303             =head3 resolve_max_array_size
304              
305             Arrays should not be greater than the max_array_size variable
306              
307             If they are, they need to be chunked up into several arrays
308              
309             Each array becomes its own 'batch'
310              
311             =cut
312              
313             memoize('resolve_max_array_size');
314              
315             sub resolve_max_array_size {
316             my $max_array_size = shift;
317             my $commands_per_node = shift;
318             my $cmd_size = shift;
319              
320             my $num_arrays = $cmd_size / $max_array_size;
321             $num_arrays = $num_arrays / $commands_per_node;
322              
323             my $ceil = POSIX::ceil($num_arrays);
324              
325             return POSIX::ceil($num_arrays);
326             }
327              
328             #TODO get rid of this
329             sub return_ranges {
330 0     0 0   my $self = shift;
331              
332 0           my $batch_start = shift;
333 0           my $batch_end = shift;
334 0           my $num_arrays = shift;
335              
336 0           my $new_array;
337 0 0         if ( $num_arrays == 1 ) {
338 0           $new_array = {
339             'batch_index_start' => $batch_start,
340             'batch_index_end' => $batch_end
341             };
342 0           $self->jobs->{ $self->current_job }->add_batch_indexes($new_array);
343 0           return;
344             }
345              
346 0           my $x = $batch_start;
347              
348 0           my $array_ref = [];
349 0           while ( $x <= $batch_end ) {
350 0           my $t_batch_end = $x + $self->max_array_size - 1;
351 0 0         if ( $t_batch_end < $batch_end ) {
352 0           $new_array = {
353             'batch_index_start' => $x,
354             'batch_index_end' => $t_batch_end,
355             };
356             }
357             else {
358 0           $new_array = {
359             'batch_index_start' => $x,
360             'batch_index_end' => $batch_end,
361             };
362             }
363 0           $x += $self->max_array_size;
364 0           $self->jobs->{ $self->current_job }->add_batch_indexes($new_array);
365             }
366              
367 0           return;
368             }
369              
370             =head3 assign_batch_stats
371              
372             Iterate through the batches to assign stats (number of batches per job, number of tasks per command, etc)
373              
374             =cut
375              
376             sub assign_batch_stats {
377 0     0 1   my $self = shift;
378 0           my $cmd_count = shift;
379              
380 0           $self->jobs->{ $self->current_job }->{cmd_start} = $self->total_cmd_counter;
381 0           foreach my $batch ( @{ $self->jobs->{ $self->current_job }->batches } ) {
  0            
382 0           $self->current_batch($batch);
383              
384             #Counter per job
385             $self->inc_cmd_counter(
386 0           $self->jobs->{ $self->current_job }->commands_per_node );
387              
388             #Total counter
389             $self->inc_total_cmd_counter(
390 0           $self->jobs->{ $self->current_job }->commands_per_node );
391              
392 0           $self->job_stats->collect_stats( $self->batch_counter,
393             $self->cmd_counter, $self->current_job );
394              
395 0           $self->inc_batch_counter;
396 0           $self->reset_cmd_counter;
397             }
398             }
399              
400             =head3 assign_batches
401              
402             Each jobtype has one or more batches
403             Iterate over the batches to collect their data and assign them to the current job
404              
405             For batches - each HPC::Runner::Command::submit_jobs::Utils::Scheduler::Batch
406              
407             is an element in the array
408             Each element can hold up to commands_per_node tasks
409              
410             =cut
411              
412             sub assign_batches {
413 0     0 1   my $self = shift;
414 0           my $iter = shift;
415              
416 0           my $x = 0;
417 0           my $y = 1;
418 0           $self->jobs->{ $self->current_job }->{batch_index_start} = 1;
419 0           while ( my @vals = $iter->() ) {
420              
421 0           my $batch_cmds = \@vals;
422 0           my $batch_tags = $self->assign_batch_tags($batch_cmds);
423             push(
424 0           @{ $self->batch_tags->{ $self->current_job } },
  0            
425             dclone($batch_tags)
426             );
427              
428             ##TODO Possibly get rid of this in next release?
429 0           my $batch_ref =
430             HPC::Runner::Command::submit_jobs::Utils::Scheduler::Batch->new(
431             batch_tags => $batch_tags,
432             job => $self->current_job,
433             cmd_count => scalar @vals,
434             cmd_start => $y,
435             );
436              
437 0           $self->jobs->{ $self->current_job }->add_batches($batch_ref);
438             $self->jobs->{ $self->current_job }->submit_by_tags(1)
439 0 0         if @{$batch_tags};
  0            
440              
441 0           $x++;
442 0           $y = $y + $self->jobs->{ $self->current_job }->commands_per_node;
443             }
444              
445 0           $self->jobs->{ $self->current_job }->{batch_index_end} = $x;
446             }
447              
448             =head3 assign_batch_tags
449              
450             Parse the #TASK lines to get batch_tags
451             #TODO We should do this while we are reading in the file
452              
453             =cut
454              
455             sub assign_batch_tags {
456 0     0 1   my $self = shift;
457 0           my $batch_cmds = shift;
458              
459 0           my @batch_tags = ();
460              
461 0           foreach my $lines ( @{$batch_cmds} ) {
  0            
462              
463 0           my @lines = split( "\n", $lines );
464              
465 0           foreach my $line (@lines) {
466              
467             #TODO Change this to TASK
468 0 0         next unless $line =~ m/^#TASK/;
469 0           chomp($line);
470 0           my ( $t1, $t2 ) = parse_meta($line);
471              
472 0 0         next unless $t2;
473 0           my @tags = split( ",", $t2 );
474              
475 0 0         if ( $t1 eq 'tags' ) {
476 0 0         map { push( @batch_tags, $_ ) if $_ } @tags;
  0            
477             }
478             else {
479 0           $self->app_log->warn(
480             'You are using an unknown directive with #TASK '
481             . "\n$line\nDirectives should be one of 'tags' or 'deps'"
482             );
483             }
484             }
485             }
486              
487 0           return \@batch_tags;
488             }
489              
490             memoize('parse_meta');
491              
492             sub parse_meta {
493             my $line = shift;
494             my ( @match, $t1, $t2 );
495              
496             @match = $line =~ m/ (\w+)=(.+)$/;
497             ( $t1, $t2 ) = ( $match[0], $match[1] );
498              
499             return ( $t1, $2 );
500             }
501              
502             1;