File Coverage

blib/lib/HPC/Runner/Command/submit_jobs/Utils/Scheduler.pm
Criterion Covered Total %
statement 51 153 33.3
branch 0 38 0.0
condition n/a
subroutine 17 30 56.6
pod 12 13 92.3
total 80 234 34.1


line stmt bran cond sub pod time code
1             package HPC::Runner::Command::submit_jobs::Utils::Scheduler;
2              
3 1     1   1209 use MooseX::App::Role;
  1         5  
  1         14  
4 1     1   7223 use namespace::autoclean;
  1         3  
  1         9  
5              
6 1     1   72 use File::Path qw(make_path);
  1         3  
  1         97  
7 1     1   11 use File::Temp qw/ tempfile /;
  1         4  
  1         58  
8 1     1   6 use IO::Select;
  1         7  
  1         31  
9 1     1   5 use Cwd;
  1         2  
  1         47  
10 1     1   6 use IPC::Open3;
  1         1  
  1         43  
11 1     1   5 use Symbol;
  1         2  
  1         41  
12 1     1   399 use Template;
  1         21601  
  1         45  
13              
14 1     1   508 use DBM::Deep;
  1         9586  
  1         7  
15 1     1   59 use Storable qw(dclone);
  1         3  
  1         78  
16              
17 1     1   10 use Text::ASCIITable;
  1         4  
  1         47  
18 1     1   686 use Memoize;
  1         2898  
  1         78  
19 1     1   11 use List::MoreUtils qw(first_index);
  1         4  
  1         15  
20              
21 1     1   1439 use BioSAILs::Utils::Traits qw(ArrayRefOfStrs);
  1         33453  
  1         9  
22              
23             with 'HPC::Runner::Command::submit_jobs::Utils::Scheduler::ParseInput';
24             with 'HPC::Runner::Command::submit_jobs::Utils::Scheduler::ResolveDeps';
25             with 'HPC::Runner::Command::submit_jobs::Utils::Scheduler::Files';
26             with 'HPC::Runner::Command::submit_jobs::Utils::Scheduler::Directives';
27             with 'HPC::Runner::Command::submit_jobs::Utils::Scheduler::Submit';
28             with 'HPC::Runner::Command::submit_jobs::Utils::Log';
29             with
30             'HPC::Runner::Command::submit_jobs::Utils::Scheduler::ResolveDeps::AssignTaskDeps';
31              
32 1     1   2219 use HPC::Runner::Command::submit_jobs::Utils::Scheduler::JobStats;
  1         6  
  1         58  
33 1     1   579 use HPC::Runner::Command::submit_jobs::Utils::Scheduler::Job;
  1         6  
  1         2939  
34              
35             =head1 HPC::Runner::Command::submit_jobs::Utils::Scheduler
36              
37              
38             =head2 Command Line Options
39              
40             #TODO Move this over to docs
41              
42             =head3 config
43              
44             Config file to pass to command line as --config /path/to/file. It should be a yaml or other config supplied by L<Config::Any>
45             This is optional. Parameters can be passed directly on the command line.
46              
47             =head3 example.yml
48              
49             ---
50             infile: "/path/to/commands/testcommand.in"
51             outdir: "path/to/testdir"
52             module:
53             - "R2"
54             - "shared"
55              
56             =cut
57              
58             =head3 infile
59              
60             infile of commands separated by newline. The usual bash convention of escaping a newline is also supported.
61              
62             =head4 example.in
63              
64             cmd1
65             #Multiline command
66             cmd2 --input --input \
67             --someotherinput
68             wait
69             #Wait tells slurm to make sure previous commands have exited with exit status 0.
70             cmd3 ##very heavy job
71             newnode
72             #cmd3 is a very heavy job so lets start the next job on a new node
73              
74             =cut
75              
76             =head3 jobname
77              
78             Specify a job name, and jobs will be 001_jobname, 002_jobname, 003_jobname
79              
80             Separating this out from Base - submit_jobs and execute_job have different ways of dealing with this
81              
82             =cut
83              
84             option 'jobname' => (
85             is => 'rw',
86             isa => 'Str',
87             required => 0,
88             default => 'hpcjob_001',
89             traits => ['String'],
90             predicate => 'has_jobname',
91             handles => {
92             add_jobname => 'append',
93             clear_jobname => 'clear',
94             replace_jobname => 'replace',
95             prepend_jobname => 'prepend',
96             match_jobname => 'match',
97             },
98             trigger => sub {
99             my $self = shift;
100             $self->check_add_to_jobs();
101             },
102             documentation =>
103             q{Specify a job name, each job will be appended with its batch order},
104             );
105              
106             =head3 max_array_size
107              
108             =cut
109              
110             option 'max_array_size' => (
111             is => 'rw',
112             isa => 'Int',
113             default => 200,
114             );
115              
116             =head3 use_batches
117              
118             The default is to submit using job arrays.
119              
120             If specified it will submit each job individually.
121              
122             Example:
123              
124             #HPC jobname=gzip
125             #HPC commands_per_node=1
126             gzip 1
127             gzip 2
128             gzip 3
129              
130             Batches:
131             sbatch 001_gzip.sh
132             sbatch 002_gzip.sh
133             sbatch 003_gzip.sh
134              
135             Arrays:
136              
137             sbatch --array=1-3 gzip.sh
138              
139             =cut
140              
141             option 'use_batches' => (
142             is => 'rw',
143             isa => 'Bool',
144             default => 0,
145             required => 0,
146             documentation =>
147             q{Switch to use batches. The default is to use job arrays. Warning! This was the default way of submitting before 3.0, but is not well supported.},
148             );
149              
150             =head3 afterok
151              
152             The afterok switch in slurm. --afterok 123 will tell slurm to start this job after job 123 has completed successfully.
153              
154             =cut
155              
156             option 'afterok' => (
157             traits => ['Array'],
158             is => 'rw',
159             required => 0,
160             isa => ArrayRefOfStrs,
161             documentation =>
162             'afterok switch in slurm. --afterok 123,134 will tell slurm to start this job after 123,134 have exited successfully',
163             default => sub { [] },
164             cmd_split => qr/,/,
165             handles => {
166             all_afterok => 'elements',
167             has_afterok => 'count',
168             clear_afterok => 'clear',
169             join_afterok => 'join',
170             },
171             );
172              
173             =head3 no_submit_to_slurm
174              
175             Bool value whether or not to submit to slurm. If you are looking to debug your files, or this script you will want to set this to zero.
176             Don't submit to slurm with --no_submit_to_slurm from the command line or
177             $self->no_submit_to_slurm(0); within your code
178              
179             DEPRECATED - use --dry_run instead
180             =cut
181              
182             # option 'no_submit_to_slurm' => (
183             # is => 'rw',
184             # isa => 'Bool',
185             # default => 0,
186             # required => 0,
187             # documentation =>
188             # q{Bool value whether or not to submit to slurm. If you are looking to debug your files, or this script you will want to set this to zero.},
189             # );
190              
191              
192             =head3 serial
193              
194             Option to run all jobs serially, one after the other, no parallelism
195             The default is to use 4 procs
196              
197             =cut
198              
199             option 'serial' => (
200             is => 'rw',
201             isa => 'Bool',
202             default => 0,
203             documentation =>
204             q{Use this if you wish to run each job run one after another, with each job starting only after the previous has completed successfully},
205             predicate => 'has_serial',
206             clearer => 'clear_serial'
207             );
208              
209             =head3 custom_command
210              
211             Supply your own command instead of mcerunner/threadsrunner/etc
212              
213             =cut
214              
215             option 'custom_command' => (
216             is => 'rw',
217             isa => 'Str',
218             predicate => 'has_custom_command',
219             clearer => 'clear_custom_command',
220             required => 0
221             );
222              
223             =head2 Internal Attributes
224              
225             =head3 scheduler_ids
226              
227             Our current scheduler job dependencies
228              
229             =cut
230              
231             #Keep this or afterok?
232              
233             has 'scheduler_ids' => (
234             traits => ['Array'],
235             is => 'rw',
236             isa => 'ArrayRef[Str|Num]',
237             default => sub { [] },
238             handles => {
239             all_scheduler_ids => 'elements',
240             add_scheduler_id => 'push',
241             join_scheduler_ids => 'join',
242             count_scheduler_ids => 'count',
243             has_scheduler_ids => 'count',
244             clear_scheduler_ids => 'clear',
245             },
246             );
247              
248             has 'array_deps' => (
249             traits => ['Hash'],
250             isa => 'HashRef',
251             is => 'rw',
252             default => sub { return {} },
253             handles => {
254             has_array_deps => 'count',
255             array_dep_pairs => 'kv',
256             set_array_dep => 'set',
257             get_array_dep => 'get',
258             exists_array_dep => 'exists',
259             },
260             );
261              
262             =head3 job_stats
263              
264             Object describing the number of jobs, number of batches per job, etc
265              
266             =cut
267              
268             has 'job_stats' => (
269             is => 'rw',
270             isa => 'HPC::Runner::Command::submit_jobs::Utils::Scheduler::JobStats',
271             default => sub {
272             return HPC::Runner::Command::submit_jobs::Utils::Scheduler::JobStats
273             ->new();
274             }
275             );
276              
277             =head3 deps
278              
279             Call as
280              
281             #HPC deps=job01,job02
282              
283             =cut
284              
285             has 'deps' => (
286             traits => ['Array'],
287             is => 'rw',
288             isa => ArrayRefOfStrs,
289             coerce => 1,
290             predicate => 'has_deps',
291             clearer => 'clear_deps',
292             required => 0,
293             trigger => sub {
294             my $self = shift;
295              
296             $self->graph_job_deps->{ $self->jobname } = $self->deps;
297             $self->jobs->{ $self->jobname }->{deps} = $self->deps;
298              
299             }
300             );
301              
302             =head3 current_job
303              
304             Keep track of our currently running job
305              
306             =cut
307              
308             has 'current_job' => (
309             is => 'rw',
310             isa => 'Str',
311             default => '',
312             required => 0,
313             predicate => 'has_current_job',
314             );
315              
316             =head3 current_batch
317              
318             Keep track of our current batch
319              
320             =cut
321              
322             has 'current_batch' => (
323             is => 'rw',
324             required => 0,
325             );
326              
327             =head3 template
328              
329             template object for writing slurm batch submission script
330              
331             =cut
332              
333             has 'template' => (
334             is => 'rw',
335             required => 0,
336             default => sub {
337             return Template->new( ABSOLUTE => 1, PRE_CHOMP => 1, TRIM => 1, EVAL_PERL => 1, );
338             },
339             );
340              
341             =head3 cmd_counter
342              
343             Keep track of the number of commands - when we get to more than commands_per_node, restart so that we submit to a new node.
344             This is the number of commands within a batch. Each new batch resets it.
345              
346             =cut
347              
348             has 'cmd_counter' => (
349             traits => ['Counter'],
350             is => 'rw',
351             isa => 'Num',
352             required => 1,
353             default => 0,
354             handles => {
355             inc_cmd_counter => 'inc',
356             reset_cmd_counter => 'reset',
357             },
358             );
359              
360             =head3 total_cmd_counter
361              
362             =cut
363              
364             has 'total_cmd_counter' => (
365             traits => ['Counter'],
366             is => 'rw',
367             isa => 'Num',
368             required => 1,
369             default => 0,
370             handles => {
371             inc_total_cmd_counter => 'inc',
372             reset_total_cmd_counter => 'reset',
373             },
374             );
375              
376             =head2 batch_counter
377              
378             Keep track of how many batches we have submitted to slurm
379              
380             =cut
381              
382             has 'batch_counter' => (
383             traits => ['Counter'],
384             is => 'rw',
385             isa => 'Num',
386             required => 1,
387             default => 1,
388             handles => {
389             inc_batch_counter => 'inc',
390             reset_batch_counter => 'reset',
391             },
392             );
393              
394             =head2 job_counter
395              
396             Keep track of how many jobs we have submitted to slurm
397              
398             =cut
399              
400             has 'job_counter' => (
401             traits => ['Counter'],
402             is => 'rw',
403             isa => 'Num',
404             required => 1,
405             default => 1,
406             handles => {
407             inc_job_counter => 'inc',
408             reset_job_counter => 'reset',
409             },
410             );
411              
412             =head3 batch
413              
414             List of commands to submit to slurm
415              
416             =cut
417              
418             has 'batch' => (
419             traits => ['String'],
420             is => 'rw',
421             isa => 'Str',
422             default => q{},
423             required => 0,
424             handles => { add_batch => 'append', },
425             clearer => 'clear_batch',
426             predicate => 'has_batch',
427             );
428              
429             =head3 jobs
430              
431             Contains all of our info for jobs
432              
433             {
434             job03 => {
435             deps => ['job01', 'job02'],
436             schedulerIds => ['123.hpc.inst.edu'],
437             submitted => 1/0,
438             batch => 'String of whole commands',
439             cmds => [
440             'cmd1',
441             'cmd2',
442             ]
443             },
444             schedule => ['job01', 'job02', 'job03']
445             }
446              
447             =cut
448              
449             has 'jobs' => (
450             is => 'rw',
451             isa => 'DBM::Deep',
452             lazy => 1,
453             default => sub {
454             my $self = shift;
455             my $fh = tempfile();
456             my $db = DBM::Deep->new( fh => $fh, num_txns => 2 );
457             return $db;
458              
459             # return {};
460             },
461             );
462              
463             has 'jobs_current_job' => ( is => 'rw', );
464              
465             =head3 graph_job_deps
466              
467             Hashref of jobdeps to pass to Algorithm::Dependency
468              
469             Job03 depends on job01 and job02
470              
471             { 'job03' => ['job01', 'job02'] }
472              
473             =cut
474              
475             has 'graph_job_deps' => (
476             traits => ['Hash'],
477             is => 'rw',
478             isa => 'HashRef',
479             lazy => 1,
480             handles => {
481             set_graph_job_deps => 'set',
482             get_graph_job_deps => 'get',
483             exists_graph_job_deps => 'exists',
484             has_no_graph_job_deps => 'is_empty',
485             num_graph_job_depss => 'count',
486             delete_graph_job_deps => 'delete',
487             graph_job_deps_pairs => 'kv',
488             },
489             default => sub { my $self = shift; return { $self->jobname => [] } },
490             );
491              
492             =head2 Subroutines
493              
494             =head3 Workflow
495              
496             There are a lot of things happening here
497              
498             parse_file_slurm #we also resolve the dependency tree and write out the batch files in here
499             schedule_jobs
500             iterate_schedule
501              
502             for $job (@scheduled_jobs)
503             (set current_job)
504             process_jobs
505             if !use_batches
506             submit_job #submit the whole job if using job arrays - which is the default
507             pre_process_batch
508             (current_job, current_batch)
509             scheduler_ids_by_batch
510             if use_batches
511             submit_job
512             else
513             run scontrol to update our jobs by job array id
514              
515             =cut
516              
517             =head3 run
518              
519             =cut
520              
521             sub run {
522 0     0 1   my $self = shift;
523              
524 0           $self->logname('slurm_logs');
525 0           $self->check_add_to_jobs;
526              
527             #TODO add back in support for serial workflows
528 0 0         if ( $self->serial ) {
529 0           $self->procs(1);
530             }
531              
532 0           $self->check_files;
533 0           $self->check_jobname;
534              
535 0           $self->parse_file_slurm;
536 0           $self->iterate_schedule;
537             }
538              
539             =head3 check_jobname
540              
541             Check to see if the user has kept a default jobname (one beginning with 'hpcjob_'). If so, increase it.
542              
543             =cut
544              
545             sub check_jobname {
546 0     0 1   my $self = shift;
547              
548 0 0         $self->increase_jobname if $self->match_jobname(qr/^hpcjob_/);
549             }
550              
551             =head3 check_add_to_jobs
552              
553             Make sure each jobname has an entry. We set the defaults as the global configuration.
554              
555             =cut
556              
557             #Apply hpcjob_001 hpc_meta as globals
558              
559             sub check_add_to_jobs {
560 0     0 1   my $self = shift;
561              
562 0 0         if ( !exists $self->jobs->{ $self->jobname } ) {
563 0           $self->jobs->{ $self->jobname } =
564             HPC::Runner::Command::submit_jobs::Utils::Scheduler::Job->new(
565             mem => $self->mem,
566             walltime => $self->walltime,
567             cpus_per_task => $self->cpus_per_task,
568             nodes_count => $self->nodes_count,
569             ntasks_per_nodes => $self->ntasks_per_node,
570             );
571 0 0         $self->jobs->{ $self->jobname }->partition( $self->partition )
572             if $self->has_partition;
573 0 0         $self->jobs->{ $self->jobname }->account( $self->account )
574             if $self->has_account;
575             }
576 0           $self->graph_job_deps->{ $self->jobname } = [];
577 0 0         if ( !exists $self->job_files->{ $self->jobname } ) {
578 0           $self->job_files->{ $self->jobname } =
579             File::Temp->new( UNLINK => 0, SUFFIX => '.dat' );
580             }
581 0 0         if ( !exists $self->batch_tags->{ $self->jobname } ) {
582 0           $self->batch_tags->{ $self->jobname } = [];
583             }
584             }
585              
586             =head3 increase_jobname
587              
588             Increase jobname: hpcjob_001, hpcjob_002. Used for graph_job_deps
589              
590             =cut
591              
592             sub increase_jobname {
593 0     0 1   my $self = shift;
594              
595 0           $self->inc_job_counter;
596 0           my $counter = $self->job_counter;
597              
598             #TODO Change this to 4
599 0           $counter = sprintf( "%03d", $counter );
600              
601 0           $self->jobname( "hpcjob_" . $counter );
602             }
603              
604             =head3 check_files
605              
606             Check to make sure the outdir exists.
607             If it doesn't exist the entire path will be created
608              
609             =cut
610              
611             sub check_files {
612 0     0 1   my ($self) = @_;
613              
614 0 0         make_path( $self->outdir ) if !-d $self->outdir;
615             }
616              
617             =head3 iterate_schedule
618              
619             Iterate over the schedule generated by schedule_jobs
620              
621             =cut
622              
623             sub iterate_schedule {
624 0     0 1   my $self = shift;
625              
626             ##No batch_tags here
627 0 0         return if $self->has_no_schedules;
628 0           $self->reset_job_counter;
629 0           $self->reset_batch_counter;
630              
631 0           $self->clear_scheduler_ids;
632 0           $self->app_log->info('Beginning to submit jobs to the scheduler');
633              
634             $self->app_log->info(
635 0           'Schedule is ' . join( ", ", @{ $self->schedule } ) . "\n" );
  0            
636              
637 0           foreach my $job ( $self->all_schedules ) {
638              
639 0           $self->app_log->info( 'Submitting all ' . $job . ' job types' );
640              
641 0           $self->reset_batch_counter;
642 0           $self->current_job($job);
643              
644 0           $self->reset_cmd_counter;
645 0 0         next unless $self->iterate_job_deps;
646              
647 0           $self->log_job_info;
648 0           $self->process_jobs;
649             }
650              
651 0           $self->update_job_scheduler_deps_by_task;
652              
653 0           $self->summarize_jobs;
654              
655             # $self->write_job_project_table;
656             # $self->write_task_project_table;
657             }
658              
659             =head3 iterate_job_deps
660              
661             Check to see if we are actually submitting
662              
663             Make sure each dep has already been submitted
664              
665             Return job schedulerIds
666              
667             =cut
668              
669             sub iterate_job_deps {
670 0     0 1   my $self = shift;
671              
672 0           $self->clear_scheduler_ids;
673 0           my $deps = $self->graph_job_deps->{ $self->current_job };
674              
675 0 0         return unless $deps;
676              
677 0           my $submit_ok = 1;
678 0           foreach my $dep ( @{$deps} ) {
  0            
679              
680 0 0         if ( $self->jobs->{$dep}->submission_failure ) {
681 0           $self->jobs->{ $self->current_job }->submission_failure(1);
682 0           $self->app_log->warn( 'Trying to submit job '
683             . $self->current_job
684             . ' which depends upon '
685             . $dep );
686 0           $self->app_log->warn(
687             'Job ' . $dep . ' failed, so we are skipping this submission' );
688 0           $submit_ok = 0;
689 0           $self->clear_scheduler_ids;
690             }
691             else {
692 0           map { $self->add_scheduler_id($_) }
693 0           $self->jobs->{$dep}->all_scheduler_ids;
694             }
695             }
696              
697 0           return $submit_ok;
698             }
699              
700             =head3 process_jobs
701              
702             =cut
703              
704             sub process_jobs {
705 0     0 1   my $self = shift;
706              
707 0           my $jobref = $self->jobs->{ $self->current_job };
708              
709 0 0         return if $self->jobs->{ $self->current_job }->submission_failure;
710 0 0         return if $jobref->submitted;
711              
712 0           $self->prepare_batch_files_array;
713              
714 0           $self->work;
715             }
716              
717             =head3 log_job_info
718              
719             Log info for the job to the screen
720              
721             =cut
722              
723             sub log_job_info {
724 0     0 0   my $self = shift;
725              
726             $self->app_log->info( 'There are '
727 0           . $self->jobs->{ $self->current_job }->count_batches . ' '
728             . $self->desc
729             . ' for job type '
730             . $self->current_job );
731              
732             $self->app_log->info( 'Submitted in '
733             . $self->jobs->{ $self->current_job }->{num_job_arrays}
734 0 0         . ' job arrays.'
735             . "\n" )
736             unless $self->use_batches;
737             }
738              
739             =head3 work
740              
741             Process the batch
742             Submit to the scheduler slurm/pbs/etc
743             Take care of the counters
744              
745             =cut
746              
747             sub work {
748 0     0 1   my $self = shift;
749              
750 0           $self->process_batch;
751 0           $self->clear_batch;
752              
753 0           $self->reset_cmd_counter;
754             }
755              
756             =head3 process_batch
757              
758             Create the slurm submission script from the slurm template
759             Write out template, submission job, and infile for parallel runner
760              
761             =cut
762              
763             #TODO think of more informative sub name
764             #TODO split this into process_arrays and process_batches
765              
766             sub process_batch {
767 0     0 1   my $self = shift;
768              
769 0           my $ok;
770             #TODO Rework this so we only get the arrayids we need the first time around
771 0 0         $ok = $self->join_scheduler_ids(':') if $self->has_scheduler_ids;
772              
773 0           my $count_by = $self->prepare_batch_indexes;
774              
775 0           for ( my $x = 0 ; $x <= scalar @{$count_by} ; $x++ ) {
  0            
776 0           my $batch_indexes = $count_by->[$x];
777 0 0         next unless $batch_indexes;
778              
779 0           my $counter = $self->gen_counter_str;
780              
781             ##Create file per submission
782 0           $self->prepare_files;
783              
784             # $DB::single = 2;
785 0           my $array_str = '';
786 0 0         $array_str = $self->gen_array_str($batch_indexes)
787             if $self->can('gen_array_str');
788              
789 0           my $command = $self->process_submit_command($counter);
790              
791 0           $self->process_template( $counter, $command, $ok, $array_str );
792              
793 0           $self->post_process_jobs;
794              
795 0           $self->post_process_batch_indexes($batch_indexes);
796 0           $self->inc_batch_counter;
797             }
798             }
799              
800             =head3 post_process_batch_indexes
801              
802             Put the scheduler_id in each batch
803              
804             =cut
805              
806             sub post_process_batch_indexes {
807 0     0 1   my $self = shift;
808 0           my $batch_indexes = shift;
809              
810 0           my $scheduler_id = $self->jobs->{ $self->current_job }->scheduler_ids->[-1];
811              
812 0           for (
813             my $x = $batch_indexes->{batch_index_start} - 1 ;
814             $x <= $batch_indexes->{batch_index_end} - 1 ;
815             $x++
816             )
817             {
818 0           my $batch = $self->jobs->{ $self->current_job }->batches->[$x];
819 0 0         next unless $batch;
820 0           $batch->{scheduler_id} = $scheduler_id;
821             }
822              
823             }
824              
825             =head3 post_process_jobs
826              
827             =cut
828              
829             sub post_process_jobs {
830 0     0 1   my $self = shift;
831              
832 0           $self->jobs->{ $self->current_job }->submitted(1);
833              
834 0           $self->inc_job_counter;
835             }
836              
837             1;