File Coverage

blib/lib/HPC/Runner/Command/submit_jobs/Utils/Scheduler/ResolveDeps.pm
Criterion Covered Total %
statement 44 207 21.2
branch 0 36 0.0
condition 0 3 0.0
subroutine 15 28 53.5
pod 6 11 54.5
total 65 285 22.8


line stmt bran cond sub pod time code
1             package HPC::Runner::Command::submit_jobs::Utils::Scheduler::ResolveDeps;
2 1     1   780 use 5.010;
  1         4  
3              
4 1     1   6 use Moose::Role;
  1         3  
  1         8  
5 1     1   7673 use List::MoreUtils qw(natatime);
  1         3  
  1         13  
6 1     1   944 use List::Util qw(first);
  1         3  
  1         108  
7 1     1   10 use Storable qw(dclone);
  1         4  
  1         160  
8 1     1   10 use Data::Dumper;
  1         3  
  1         80  
9 1     1   9 use Algorithm::Dependency::Source::HoA;
  1         4  
  1         39  
10 1     1   466 use Algorithm::Dependency::Ordered;
  1         732  
  1         43  
11 1     1   11 use HPC::Runner::Command::submit_jobs::Utils::Scheduler::Batch;
  1         4  
  1         38  
12 1     1   8 use POSIX;
  1         4  
  1         10  
13 1     1   4248 use String::Approx qw(amatch);
  1         5327  
  1         76  
14 1     1   10 use Text::ASCIITable;
  1         3  
  1         41  
15 1     1   5 use Try::Tiny;
  1         3  
  1         48  
16 1     1   8 use Memoize;
  1         4  
  1         50  
17 1     1   462 use Array::Compare;
  1         118218  
  1         2648  
18              
19             with 'HPC::Runner::Command::submit_jobs::Utils::Scheduler::ResolveDeps::BuildTaskDeps';
20              
21             =head1 HPC::Runner::Command::submit_jobs::Utils::Scheduler::ResolveDeps
22              
23             Once the input file has been parsed, resolve the job dependency schedule and split each job_type into job batches.
24              
25             =head2 Attributes
26              
27             =cut
28              
29             =head3 schedule
30              
31             Schedule our jobs
32              
33             =cut
34              
# Ordered list of job names, filled in by schedule_jobs and walked by
# chunk_commands_jobs via all_schedules.
has 'schedule' => (
    is      => 'rw',
    isa     => 'ArrayRef',
    traits  => ['Array'],
    default => sub { return [] },
    handles => {
        add_schedule     => 'push',
        all_schedules    => 'elements',
        clear_schedule   => 'clear',
        has_no_schedules => 'is_empty',
        has_schedules    => 'count',
    },
);

# Per-job lists of batch tag arrays, keyed by job name (assign_batches
# pushes one entry per batch).
has 'batch_tags' => (
    isa     => 'HashRef',
    is      => 'rw',
    default => sub { return {} },
);
56              
57             =head2 Subroutines
58              
59             =cut
60              
61             #Just putting this here
62             #scontrol update job=9314_2 Dependency=afterok:9320_1
63              
64             =head3 schedule_jobs
65              
66             Use Algorithm::Dependency to schedule the jobs
67              
68             Catch any scheduling errors not caught by the sanity check
69              
70             =cut
71              
# Order the jobs with Algorithm::Dependency so that every job comes after
# the jobs it depends on, and store the result in $self->schedule.
#
# graph_job_deps maps each job name to an array ref of the job names it
# depends on. On any scheduling failure the error is logged and the
# process exits with status 1.
sub schedule_jobs {
    my $self = shift;

    my $source =
      Algorithm::Dependency::Source::HoA->new( $self->graph_job_deps );

    my $dep = Algorithm::Dependency::Ordered->new(
        source   => $source,
        selected => []
    );

    try {
        my $ordered = $dep->schedule_all;

        # schedule_all reports failure (e.g. circular dependencies) by
        # returning undef rather than dying; make that an explicit error.
        die "Unable to resolve the job dependency graph\n"
          unless defined $ordered;

        $self->schedule($ordered);
    }
    catch {
        # Keep the underlying reason instead of silently discarding it.
        my $error = $_;
        $self->app_log->fatal(
            'There was a problem creating your schedule. Aborting mission!');
        $self->app_log->fatal("Scheduling error: $error") if defined $error;
        exit 1;
    };
}
93              
94             =head3 sanity_check_schedule
95              
96             Run a sanity check on the schedule. All the job deps should have existing job names
97              
98             =cut
99              
# Check that every dependency listed in graph_job_deps names an existing job.
#
# Each missing dependency is logged as fatal, with approximate-match
# suggestions from String::Approx::amatch when any exist. If anything is
# missing, a table of all jobs (alphabetical) is logged with their
# dependencies (missing ones shown as **name**), suggestions and command
# count. graph_job_deps itself is not modified.
#
# Returns 1 when every dependency resolves, 0 otherwise.
sub sanity_check_schedule {
    my $self = shift;

    my $graph    = $self->graph_job_deps;
    my @jobnames = sort keys %{$graph};
    my $search   = 1;
    my @rows     = ();

    # Search the dependencies for matching jobs
    foreach my $job (@jobnames) {
        my $deps         = $graph->{$job} // [];
        my @display_deps = ();
        my @suggested    = ();

        foreach my $r ( @{$deps} ) {
            if ( exists $graph->{$r} ) {
                push @display_deps, $r;
                next;
            }

            # Highlight the broken dependency in the report only; the
            # dependency graph is left untouched.
            push @display_deps, "**$r**";

            $self->app_log->fatal("Job dep $r is not in joblist.");
            $search = 0;

            my @matches = amatch( $r, @jobnames );
            if (@matches) {
                push @suggested, join( ", ", @matches );
                $self->app_log->warn( "Did you mean ( "
                      . join( ", ", @matches )
                      . " ) instead of $r?" );
            }
            else {
                $self->app_log->fatal(
                    "No potential matches were found for dependency $r");
            }
        }

        my $count_cmd = $self->jobs->{$job}->cmd_counter;

        # One cell per column, in the same order as the headers below.
        push @rows,
          [
            $job,
            join( ", ", @display_deps ),
            join( ", ", @suggested ),
            $count_cmd,
          ];
    }

    # If there are no broken dependencies - return
    return $search if $search;

    my $t = Text::ASCIITable->new();
    $t->setCols( [ "JobName", "Deps", "Suggested", "NumCmds" ] );
    $t->addRow($_) for @rows;

    $self->app_log->fatal(
        'There were one or more problems with your job schedule.');
    $self->app_log->warn(
        "Here is your tabular dependency list in alphabetical order");
    $self->app_log->info( "\n\n" . $t );

    return $search;
}
171              
172              
173             =head3 chunk_commands
174              
175             Chunk commands per job into batches
176              
177             #TODO Clean this up
178              
179             =cut
180              
# Split every scheduled job's commands into batches.
#
# The per-job command and batch counters are zeroed first. When nothing
# is scheduled this returns early. Otherwise the scheduler ids are
# cleared, each job is chunked via chunk_commands_jobs, and the job,
# command and batch counters are all reset afterwards.
sub chunk_commands {
    my ($self) = @_;

    $self->reset_cmd_counter;
    $self->reset_batch_counter;

    if ( $self->has_no_schedules ) {
        return;
    }

    $self->clear_scheduler_ids();
    $self->chunk_commands_jobs;

    # Leave every counter zeroed for whatever runs next.
    for my $reset (qw(reset_job_counter reset_cmd_counter)) {
        $self->$reset;
    }
    return $self->reset_batch_counter;
}
198              
# For each scheduled job name: make it the current job, split its command
# file into batches of commands_per_node commands, record batch stats, and
# register the batch index ranges via return_ranges.
#
# Jobs missing from $self->jobs, or whose cmd_counter is false after the
# counters are reset, are skipped.
sub chunk_commands_jobs {
    my $self = shift;

    # Look the current job object up afresh on every use, as the helpers
    # called below operate on it through $self.
    my $current = sub { return $self->jobs->{ $self->current_job } };

    for my $job_name ( $self->all_schedules ) {
        $self->current_job($job_name);
        next if !$current->();

        $self->reset_cmd_counter;
        $self->reset_batch_counter;

        # Exits the program if the job object has no count_cmds method.
        $self->chunk_commands_jobs_check;
        next if !$current->()->cmd_counter;

        my $per_node = $current->()->commands_per_node;
        my @cmds     = @{ $self->parse_cmd_file };

        $self->assign_batches( natatime( $per_node, @cmds ) );
        $self->assign_batch_stats( scalar @cmds );
        $self->inc_job_counter;

        my $job         = $current->();
        my $first_index = $job->{batch_index_start};
        my $last_index  = $job->{batch_index_end} || $job->{batch_index_start};
        my $array_count = $job->{num_job_arrays};

        $self->return_ranges( $first_index, $last_index, $array_count );
    }
}
243              
# Guard: the current job object must support count_cmds. If it does not,
# warn and exit the process with status 1. Returns a false value when the
# check passes.
sub chunk_commands_jobs_check {
    my ($self) = @_;

    my $cannot_count =
      !$self->jobs->{ $self->current_job }->can('count_cmds');

    if ($cannot_count) {
        warn
"You seem to be mixing and matching job dependency declaration types! Here there be dragons! We are dying now.\n";
        exit 1;
    }

    return $cannot_count;
}
253              
# Compute how many job arrays $job needs and store it in the job's
# num_job_arrays slot (also the return value).
#
# When use_batches is set, max_array_size is first forced to 1.
sub assign_num_max_array {
    my ( $self, $job ) = @_;

    my $per_node = $self->jobs->{$job}->commands_per_node;

    if ( $self->use_batches ) {
        $self->max_array_size(1);
    }

    return $self->jobs->{$job}->{num_job_arrays} =
      resolve_max_array_size( $self->max_array_size, $per_node,
        $self->jobs->{$job}->cmd_counter );
}
268              
# Read the current job's command file and return an array ref of commands.
#
# The handle from job_files is rewound to the start first. Lines ending in
# a backslash (continuations) and lines starting with '#' (comments, e.g.
# #TASK directives) are accumulated and prepended to the next command, so
# each returned element is one command plus its leading comment lines.
# Any trailing continuation/comment text with no command after it is
# dropped.
#
# Uses a lexical loop variable so the caller's $_ is not clobbered (the
# implicit `while (<$fh>)` form assigns to the unlocalized global $_).
sub parse_cmd_file {
    my ($self) = @_;

    my $fh = $self->job_files->{ $self->current_job };
    seek( $fh, 0, 0 );

    my @cmds   = ();
    my $buffer = '';

  LINE:
    while ( defined( my $line = <$fh> ) ) {
        next LINE unless $line;

        $buffer .= $line;
        next LINE if $line =~ m/\\$/;
        next LINE if $line =~ m/^#/;

        push @cmds, $buffer;
        $buffer = '';
    }

    return \@cmds;
}
290              
291             #TODO put arrays in one place, batches in another
292              
293             =head3 resolve_max_array_size
294              
295             Arrays should not be greater than the max_array_size variable
296              
297             If it is, the job must be split into several arrays.
298              
299             Each array becomes its own 'batch'
300              
301             =cut
302              
memoize('resolve_max_array_size');

# Plain function (no $self). Returns how many job arrays are needed for
# $cmd_size commands when each array element runs $commands_per_node
# commands and an array may hold at most $max_array_size elements:
#     ceil( $cmd_size / $max_array_size / $commands_per_node )
# Results are memoized by argument list.
sub resolve_max_array_size {
    my ( $max_array_size, $commands_per_node, $cmd_size ) = @_;

    # Same left-to-right division order as before, so results are identical.
    my $num_arrays = $cmd_size / $max_array_size / $commands_per_node;

    return POSIX::ceil($num_arrays);
}
317              
318             #TODO get rid of this
# Register batch index ranges on the current job via add_batch_indexes.
#
# Params: $batch_start, $batch_end (inclusive batch indexes) and
# $num_arrays. With a single array, the whole range is added as one entry.
# Otherwise the range is cut into consecutive chunks of at most
# max_array_size indexes. A max_array_size below 1 would stop the chunking
# loop from advancing (an infinite loop), so it falls back to one range.
sub return_ranges {
    my $self = shift;

    my $batch_start = shift;
    my $batch_end   = shift;
    my $num_arrays  = shift;

    my $job      = $self->jobs->{ $self->current_job };
    my $max_size = $self->max_array_size;

    if ( $num_arrays == 1 || !$max_size || $max_size < 1 ) {
        $job->add_batch_indexes(
            {
                'batch_index_start' => $batch_start,
                'batch_index_end'   => $batch_end,
            }
        );
        return;
    }

    my $x = $batch_start;
    while ( $x <= $batch_end ) {
        my $chunk_end = $x + $max_size - 1;
        $chunk_end = $batch_end if $chunk_end > $batch_end;

        $job->add_batch_indexes(
            {
                'batch_index_start' => $x,
                'batch_index_end'   => $chunk_end,
            }
        );
        $x += $max_size;
    }

    return;
}
359              
360             =head3 assign_batch_stats
361              
362             Iterate through the batches to assign stats (number of batches per job, number of tasks per command, etc)
363              
364             =cut
365              
# Walk the current job's batches and accumulate statistics.
#
# The job's cmd_start is set to the running total_cmd_counter. Then, per
# batch: the batch becomes current_batch, both the per-job and total
# command counters grow by commands_per_node, job_stats->collect_stats is
# called with (batch_counter, cmd_counter, current_job), the batch counter
# is incremented and the per-job command counter reset.
#
# $cmd_count is accepted for interface compatibility but not used.
sub assign_batch_stats {
    my ( $self, $cmd_count ) = @_;

    my $job_for = sub { return $self->jobs->{ $self->current_job } };

    $job_for->()->{cmd_start} = $self->total_cmd_counter;

    for my $batch ( @{ $job_for->()->batches } ) {
        $self->current_batch($batch);

        my $per_node = $job_for->()->commands_per_node;
        $self->inc_cmd_counter($per_node);          # per-job counter
        $self->inc_total_cmd_counter($per_node);    # across all jobs

        $self->job_stats->collect_stats( $self->batch_counter,
            $self->cmd_counter, $self->current_job );

        $self->inc_batch_counter;
        $self->reset_cmd_counter;
    }
}
390              
391             =head3 assign_batches
392              
393             Each jobtype has one or more batches
394             Iterate over the command chunks, building one batch per chunk with its tags and assigning it to the job.
395              
396             For batches - each HPC::Runner::Command::submit_jobs::Utils::Scheduler::Batch
397              
398             is an element in the array
399             Each element holds up to commands_per_node tasks.
400              
401             =cut
402              
# Build the current job's batches from $iter, a natatime-style iterator
# that returns the next chunk of commands (empty list when exhausted).
#
# For each chunk: batch tags are extracted with assign_batch_tags, and a
# deep copy is appended to batch_tags->{current_job}. A Batch object is
# added to the job, and submit_by_tags(1) is set on the job whenever the
# chunk has tags. The job's batch_index_start is 1 and batch_index_end is
# the number of batches built.
sub assign_batches {
    my ( $self, $iter ) = @_;

    my $job_for = sub { return $self->jobs->{ $self->current_job } };

    $job_for->()->{batch_index_start} = 1;

    my $batch_count = 0;
    my $first_cmd   = 1;    # 1-based index of this batch's first command

    while ( my @chunk = $iter->() ) {
        my $tags = $self->assign_batch_tags( \@chunk );
        push @{ $self->batch_tags->{ $self->current_job } }, dclone($tags);

        ##TODO Possibly get rid of this in next release?
        my $batch =
          HPC::Runner::Command::submit_jobs::Utils::Scheduler::Batch->new(
            batch_tags => $tags,
            job        => $self->current_job,
            cmd_count  => scalar @chunk,
            cmd_start  => $first_cmd,
          );

        $job_for->()->add_batches($batch);
        $job_for->()->submit_by_tags(1) if @{$tags};

        $batch_count++;
        $first_cmd += $job_for->()->commands_per_node;
    }

    $job_for->()->{batch_index_end} = $batch_count;
}
438              
439             =head3 assign_batch_tags
440              
441             Parse the #TASK lines to get batch_tags
442             #TODO We should do this while are reading in the file
443              
444             =cut
445              
# Collect batch tags from the "#TASK tags=a,b" lines of a batch.
#
# $batch_cmds is an array ref of command strings, each possibly spanning
# several lines. Every "#TASK" line is split by parse_meta into
# (directive, value). 'tags' values are split on commas, and the non-empty
# (true) tags are collected. 'deps' is accepted without complaint, since
# the warning text itself names it as a valid directive (presumably
# consumed by the task-dependency code — confirm). Any other directive
# logs a warning.
#
# Returns an array ref of tags in the order they were found.
sub assign_batch_tags {
    my $self       = shift;
    my $batch_cmds = shift;

    my @batch_tags = ();

    foreach my $cmd ( @{$batch_cmds} ) {
        foreach my $line ( split( "\n", $cmd ) ) {
            next unless $line =~ m/^#TASK/;
            chomp($line);

            my ( $directive, $value ) = parse_meta($line);
            next unless $value && defined $directive;

            if ( $directive eq 'tags' ) {
                push @batch_tags, grep { $_ } split( ",", $value );
            }
            elsif ( $directive eq 'deps' ) {
                next;
            }
            else {
                $self->app_log->warn(
                    'You are using an unknown directive with #TASK '
                      . "\n$line\nDirectives should be one of 'tags' or 'deps'"
                );
            }
        }
    }

    return \@batch_tags;
}
480              
memoize('parse_meta');

# Split a "#TASK <directive>=<value>" line into ($directive, $value).
# The pattern requires a space before the word, e.g. "#TASK tags=a,b"
# gives ('tags', 'a,b'). Returns (undef, undef) when there is no match.
# Results are memoized by line.
sub parse_meta {
    my $line = shift;

    # Use the lexical captures: $2 is not reset by a failed match and
    # would leak a stale value from an earlier successful match.
    my ( $directive, $value ) = $line =~ m/ (\w+)=(.+)$/;

    return ( $directive, $value );
}
492              
493              
494             1;