File Coverage

blib/lib/HPC/Runner/Command/submit_jobs/Utils/Scheduler.pm
Criterion Covered Total %
statement 51 153 33.3
branch 0 38 0.0
condition n/a
subroutine 17 30 56.6
pod 12 13 92.3
total 80 234 34.1


line stmt bran cond sub pod time code
1             package HPC::Runner::Command::submit_jobs::Utils::Scheduler;
2              
3 1     1   822 use MooseX::App::Role;
  1         2  
  1         8  
4 1     1   7900 use namespace::autoclean;
  1         3  
  1         9  
5              
6 1     1   99 use File::Path qw(make_path);
  1         3  
  1         63  
7 1     1   8 use File::Temp qw/ tempfile /;
  1         3  
  1         63  
8 1     1   8 use IO::Select;
  1         3  
  1         43  
9 1     1   7 use Cwd;
  1         3  
  1         79  
10 1     1   9 use IPC::Open3;
  1         3  
  1         60  
11 1     1   8 use Symbol;
  1         4  
  1         74  
12 1     1   485 use Template;
  1         21445  
  1         41  
13              
14 1     1   341 use DBM::Deep;
  1         5653  
  1         4  
15 1     1   30 use Storable qw(dclone);
  1         3  
  1         43  
16              
17 1     1   6 use Text::ASCIITable;
  1         3  
  1         28  
18 1     1   398 use Memoize;
  1         2162  
  1         48  
19 1     1   7 use List::MoreUtils qw(first_index);
  1         2  
  1         10  
20              
21 1     1   853 use BioSAILs::Utils::Traits qw(ArrayRefOfStrs);
  1         25840  
  1         7  
22              
23             with 'HPC::Runner::Command::submit_jobs::Utils::Scheduler::ParseInput';
24             with 'HPC::Runner::Command::submit_jobs::Utils::Scheduler::ResolveDeps';
25             with 'HPC::Runner::Command::submit_jobs::Utils::Scheduler::Files';
26             with 'HPC::Runner::Command::submit_jobs::Utils::Scheduler::Directives';
27             with 'HPC::Runner::Command::submit_jobs::Utils::Scheduler::Submit';
28             with 'HPC::Runner::Command::submit_jobs::Utils::Log';
29             with
30             'HPC::Runner::Command::submit_jobs::Utils::Scheduler::ResolveDeps::AssignTaskDeps';
31              
32 1     1   1889 use HPC::Runner::Command::submit_jobs::Utils::Scheduler::JobStats;
  1         4  
  1         42  
33 1     1   448 use HPC::Runner::Command::submit_jobs::Utils::Scheduler::Job;
  1         4  
  1         1513  
34              
35             =head1 HPC::Runner::Command::submit_jobs::Utils::Scheduler
36              
37              
38             =head2 Command Line Options
39              
40             #TODO Move this over to docs
41              
42             =head3 config
43              
44             Config file to pass to command line as --config /path/to/file. It should be a yaml or other config supplied by L<Config::Any>
45             This is optional. Parameters can be passed straight to the command line
46              
47             =head3 example.yml
48              
49             ---
50             infile: "/path/to/commands/testcommand.in"
51             outdir: "path/to/testdir"
52             module:
53             - "R2"
54             - "shared"
55              
56             =cut
57              
58             =head3 infile
59              
60             infile of commands separated by newline. The usual bash convention of escaping a newline is also supported.
61              
62             =head4 example.in
63              
64             cmd1
65             #Multiline command
66             cmd2 --input --input \
67             --someotherinput
68             wait
69             #Wait tells slurm to make sure previous commands have exited with exit status 0.
70             cmd3 ##very heavy job
71             newnode
72             #cmd3 is a very heavy job so lets start the next job on a new node
73              
74             =cut
75              
76             =head3 jobname
77              
78             Specify a job name, and jobs will be 001_jobname, 002_jobname, 003_jobname
79              
80             Separating this out from Base - submit_jobs and execute_job have different ways of dealing with this
81              
82             =cut
83              
84             option 'jobname' => (
85             is => 'rw',
86             isa => 'Str',
87             required => 0,
88             default => 'hpcjob_001',
89             traits => ['String'],
90             predicate => 'has_jobname',
91             handles => {
92             add_jobname => 'append',
93             clear_jobname => 'clear',
94             replace_jobname => 'replace',
95             prepend_jobname => 'prepend',
96             match_jobname => 'match',
97             },
98             trigger => sub {
99             my $self = shift;
100             $self->check_add_to_jobs();
101             },
102             documentation =>
103             q{Specify a job name, each job will be appended with its batch order},
104             );
105              
106             =head3 max_array_size
107              
108             =cut
109              
110             option 'max_array_size' => (
111             is => 'rw',
112             isa => 'Int',
113             default => 200,
114             );
115              
116             =head3 use_batches
117              
118             The default is to submit using job arrays.
119              
120             If specified it will submit each job individually.
121              
122             Example:
123              
124             #HPC jobname=gzip
125             #HPC commands_per_node=1
126             gzip 1
127             gzip 2
128             gzip 3
129              
130             Batches:
131             sbatch 001_gzip.sh
132             sbatch 002_gzip.sh
133             sbatch 003_gzip.sh
134              
135             Arrays:
136              
137             sbatch --array=1-3 gzip.sh
138              
139             =cut
140              
141             option 'use_batches' => (
142             is => 'rw',
143             isa => 'Bool',
144             default => 0,
145             required => 0,
146             documentation =>
147             q{Switch to use batches. The default is to use job arrays. Warning! This was the default way of submitting before 3.0, but is not well supported.},
148             );
149              
150             =head3 afterok
151              
152             The afterok switch in slurm. --afterok 123 will tell slurm to start this job after job 123 has completed successfully.
153              
154             =cut
155              
156             option 'afterok' => (
157             traits => ['Array'],
158             is => 'rw',
159             required => 0,
160             isa => ArrayRefOfStrs,
161             documentation =>
162             'afterok switch in slurm. --afterok 123,134 will tell slurm to start this job after 123,134 have exited successfully',
163             default => sub { [] },
164             cmd_split => qr/,/,
165             handles => {
166             all_afterok => 'elements',
167             has_afterok => 'count',
168             clear_afterok => 'clear',
169             join_afterok => 'join',
170             },
171             );
172              
173             =head3 no_submit_to_slurm
174              
175             Bool value whether or not to submit to slurm. If you are looking to debug your files, or this script you will want to set this to zero.
176             Don't submit to slurm with --no_submit_to_slurm from the command line or
177             $self->no_submit_to_slurm(0); within your code
178              
179             DEPRECATED - use --dry_run instead
180             =cut
181              
182             # option 'no_submit_to_slurm' => (
183             # is => 'rw',
184             # isa => 'Bool',
185             # default => 0,
186             # required => 0,
187             # documentation =>
188             # q{Bool value whether or not to submit to slurm. If you are looking to debug your files, or this script you will want to set this to zero.},
189             # );
190              
191             =head3 serial
192              
193             Option to run all jobs serially, one after the other, no parallelism
194             The default is to use 4 procs
195              
196             =cut
197              
198             option 'serial' => (
199             is => 'rw',
200             isa => 'Bool',
201             default => 0,
202             documentation =>
203             q{Use this if you wish to run each job run one after another, with each job starting only after the previous has completed successfully},
204             predicate => 'has_serial',
205             clearer => 'clear_serial'
206             );
207              
208             =head3 use_custom
209              
210             Supply your own command instead of mcerunner/threadsrunner/etc
211              
212             =cut
213              
214             option 'custom_command' => (
215             is => 'rw',
216             isa => 'Str',
217             predicate => 'has_custom_command',
218             clearer => 'clear_custom_command',
219             required => 0
220             );
221              
222             =head2 Internal Attributes
223              
224             =head3 scheduler_ids
225              
226             Our current scheduler job dependencies
227              
228             =cut
229              
230             #Keep this or afterok?
231              
232             has 'scheduler_ids' => (
233             traits => ['Array'],
234             is => 'rw',
235             isa => 'ArrayRef[Str|Num]',
236             default => sub { [] },
237             handles => {
238             all_scheduler_ids => 'elements',
239             add_scheduler_id => 'push',
240             join_scheduler_ids => 'join',
241             count_scheduler_ids => 'count',
242             has_scheduler_ids => 'count',
243             clear_scheduler_ids => 'clear',
244             },
245             );
246              
247             has 'array_deps' => (
248             traits => ['Hash'],
249             isa => 'HashRef',
250             is => 'rw',
251             default => sub { return {} },
252             handles => {
253             has_array_deps => 'count',
254             array_dep_pairs => 'kv',
255             set_array_dep => 'set',
256             get_array_dep => 'get',
257             exists_array_dep => 'exists',
258             },
259             );
260              
261             =head3 job_stats
262              
263             Object describing the number of jobs, number of batches per job, etc
264              
265             =cut
266              
267             has 'job_stats' => (
268             is => 'rw',
269             isa => 'HPC::Runner::Command::submit_jobs::Utils::Scheduler::JobStats',
270             default => sub {
271             return HPC::Runner::Command::submit_jobs::Utils::Scheduler::JobStats
272             ->new();
273             }
274             );
275              
276             =head3 deps
277              
278             Call as
279              
280             #HPC deps=job01,job02
281              
282             =cut
283              
284             has 'deps' => (
285             traits => ['Array'],
286             is => 'rw',
287             isa => ArrayRefOfStrs,
288             coerce => 1,
289             predicate => 'has_deps',
290             clearer => 'clear_deps',
291             required => 0,
292             trigger => sub {
293             my $self = shift;
294              
295             $self->graph_job_deps->{ $self->jobname } = $self->deps;
296             $self->jobs->{ $self->jobname }->{deps} = $self->deps;
297              
298             }
299             );
300              
301             =head3 current_job
302              
303             Keep track of our currently running job
304              
305             =cut
306              
307             has 'current_job' => (
308             is => 'rw',
309             isa => 'Str',
310             default => '',
311             required => 0,
312             predicate => 'has_current_job',
313             );
314              
315             =head3 current_batch
316              
317             Keep track of our current batch
318              
319             =cut
320              
321             has 'current_batch' => (
322             is => 'rw',
323             required => 0,
324             );
325              
326             =head3 template
327              
328             template object for writing slurm batch submission script
329              
330             =cut
331              
332             has 'template' => (
333             is => 'rw',
334             required => 0,
335             default => sub {
336             return Template->new(
337             ABSOLUTE => 1,
338             PRE_CHOMP => 1,
339             TRIM => 1,
340             EVAL_PERL => 1,
341             );
342             },
343             );
344              
345             =head3 cmd_counter
346              
347             keep track of the number of commands - when we get to more than commands_per_node restart so we get submit to a new node.
348             This is the number of commands within a batch. Each new batch resets it.
349              
350             =cut
351              
352             has 'cmd_counter' => (
353             traits => ['Counter'],
354             is => 'rw',
355             isa => 'Num',
356             required => 1,
357             default => 0,
358             handles => {
359             inc_cmd_counter => 'inc',
360             reset_cmd_counter => 'reset',
361             },
362             );
363              
364             =head3 total_cmd_counter
365              
366             =cut
367              
368             has 'total_cmd_counter' => (
369             traits => ['Counter'],
370             is => 'rw',
371             isa => 'Num',
372             required => 1,
373             default => 0,
374             handles => {
375             inc_total_cmd_counter => 'inc',
376             reset_total_cmd_counter => 'reset',
377             },
378             );
379              
380             =head2 batch_counter
381              
382             Keep track of how many batches we have submitted to slurm
383              
384             =cut
385              
386             has 'batch_counter' => (
387             traits => ['Counter'],
388             is => 'rw',
389             isa => 'Num',
390             required => 1,
391             default => 1,
392             handles => {
393             inc_batch_counter => 'inc',
394             reset_batch_counter => 'reset',
395             },
396             );
397              
398             =head2 job_counter
399              
400             Keep track of how many jobs we have submitted to slurm
401              
402             =cut
403              
404             has 'job_counter' => (
405             traits => ['Counter'],
406             is => 'rw',
407             isa => 'Num',
408             required => 1,
409             default => 1,
410             handles => {
411             inc_job_counter => 'inc',
412             reset_job_counter => 'reset',
413             },
414             );
415              
416             =head3 batch
417              
418             List of commands to submit to slurm
419              
420             =cut
421              
422             has 'batch' => (
423             traits => ['String'],
424             is => 'rw',
425             isa => 'Str',
426             default => q{},
427             required => 0,
428             handles => { add_batch => 'append', },
429             clearer => 'clear_batch',
430             predicate => 'has_batch',
431             );
432              
433             =head3 jobs
434              
435             Contains all of our info for jobs
436              
437             {
438             job03 => {
439             deps => ['job01', 'job02'],
440             schedulerIds => ['123.hpc.inst.edu'],
441             submitted => 1/0,
442             batch => 'String of whole commands',
443             cmds => [
444             'cmd1',
445             'cmd2',
446             ]
447             },
448             schedule => ['job01', 'job02', 'job03']
449             }
450              
451             =cut
452              
453             has 'jobs' => (
454             is => 'rw',
455             isa => 'DBM::Deep',
456             lazy => 1,
457             default => sub {
458             my $self = shift;
459             my $fh = tempfile();
460             my $db = DBM::Deep->new( fh => $fh, num_txns => 2 );
461             return $db;
462              
463             # return {};
464             },
465             );
466              
467             has 'jobs_current_job' => ( is => 'rw', );
468              
469             =head3 graph_job_deps
470              
471             Hashref of jobdeps to pass to Algorithm::Dependency
472              
473             Job03 depends on job01 and job02
474              
475             { 'job03' => ['job01', 'job02'] }
476              
477             =cut
478              
479             has 'graph_job_deps' => (
480             traits => ['Hash'],
481             is => 'rw',
482             isa => 'HashRef',
483             lazy => 1,
484             handles => {
485             set_graph_job_deps => 'set',
486             get_graph_job_deps => 'get',
487             exists_graph_job_deps => 'exists',
488             has_no_graph_job_deps => 'is_empty',
489             num_graph_job_depss => 'count',
490             delete_graph_job_deps => 'delete',
491             graph_job_deps_pairs => 'kv',
492             },
493             default => sub { my $self = shift; return { $self->jobname => [] } },
494             );
495              
496             =head2 Subroutines
497              
498             =head3 Workflow
499              
500             There are a lot of things happening here
501              
502             parse_file_slurm #we also resolve the dependency tree and write out the batch files in here
503             schedule_jobs
504             iterate_schedule
505              
506             for $job (@scheduled_jobs)
507             (set current_job)
508             process_jobs
509             if !use_batches
510             submit_job #submit the whole job if using job arrays - which is the default
511             pre_process_batch
512             (current_job, current_batch)
513             scheduler_ids_by_batch
514             if use_batches
515             submit_job
516             else
517             run scontrol to update our jobs by job array id
518              
519             =cut
520              
521             =head3 run
522              
523             =cut
524              
525             sub run {
526 0     0 1   my $self = shift;
527              
528 0           $self->logname('slurm_logs');
529 0           $self->check_add_to_jobs;
530              
531             #TODO add back in support for serial workflows
532 0 0         if ( $self->serial ) {
533 0           $self->procs(1);
534             }
535              
536 0           $self->check_files;
537 0           $self->check_jobname;
538              
539 0           $self->parse_file_slurm;
540 0           $self->iterate_schedule;
541             }
542              
543             =head3 check_jobname
544              
545             Check to see if the user has chosen the default jobname, 'job'
546              
547             =cut
548              
549             sub check_jobname {
550 0     0 1   my $self = shift;
551              
552 0 0         $self->increase_jobname if $self->match_jobname(qr/^hpcjob_/);
553             }
554              
555             =head3 check_add_to_jobs
556              
557             Make sure each jobname has an entry. We set the defaults as the global configuration.
558              
559             =cut
560              
561             #Apply hpcjob_001 hpc_meta as globals
562              
563             sub check_add_to_jobs {
564 0     0 1   my $self = shift;
565              
566 0 0         if ( !exists $self->jobs->{ $self->jobname } ) {
567 0           $self->jobs->{ $self->jobname } =
568             HPC::Runner::Command::submit_jobs::Utils::Scheduler::Job->new(
569             mem => $self->mem,
570             walltime => $self->walltime,
571             cpus_per_task => $self->cpus_per_task,
572             nodes_count => $self->nodes_count,
573             ntasks_per_nodes => $self->ntasks_per_node,
574             template_file => $self->template_file,
575             );
576 0 0         $self->jobs->{ $self->jobname }->partition( $self->partition )
577             if $self->has_partition;
578 0 0         $self->jobs->{ $self->jobname }->account( $self->account )
579             if $self->has_account;
580             }
581 0           $self->graph_job_deps->{ $self->jobname } = [];
582 0 0         if ( !exists $self->job_files->{ $self->jobname } ) {
583 0           $self->job_files->{ $self->jobname } =
584             File::Temp->new( UNLINK => 0, SUFFIX => '.dat' );
585             }
586 0 0         if ( !exists $self->batch_tags->{ $self->jobname } ) {
587 0           $self->batch_tags->{ $self->jobname } = [];
588             }
589             }
590              
591             =head3 increase_jobname
592              
593             Increase jobname. job_001, job_002. Used for graph_job_deps
594              
595             =cut
596              
597             sub increase_jobname {
598 0     0 1   my $self = shift;
599              
600 0           $self->inc_job_counter;
601 0           my $counter = $self->job_counter;
602              
603             #TODO Change this to 4
604 0           $counter = sprintf( "%03d", $counter );
605              
606 0           $self->jobname( "hpcjob_" . $counter );
607             }
608              
609             =head3 check_files
610              
611             Check to make sure the outdir exists.
612             If it doesn't exist the entire path will be created
613              
614             =cut
615              
616             sub check_files {
617 0     0 1   my ($self) = @_;
618              
619 0 0         make_path( $self->outdir ) if !-d $self->outdir;
620             }
621              
622             =head3 iterate_schedule
623              
624             Iterate over the schedule generated by schedule_jobs
625              
626             =cut
627              
628             sub iterate_schedule {
629 0     0 1   my $self = shift;
630              
631             ##No batch_tags here
632 0 0         return if $self->has_no_schedules;
633 0           $self->reset_job_counter;
634 0           $self->reset_batch_counter;
635              
636 0           $self->clear_scheduler_ids;
637 0           $self->app_log->info('Beginning to submit jobs to the scheduler');
638              
639             $self->app_log->info(
640 0           'Schedule is ' . join( ", ", @{ $self->schedule } ) . "\n" );
  0            
641              
642 0           foreach my $job ( $self->all_schedules ) {
643              
644 0           $self->app_log->info( 'Submitting all ' . $job . ' job types' );
645              
646 0           $self->reset_batch_counter;
647 0           $self->current_job($job);
648              
649 0           $self->reset_cmd_counter;
650 0 0         next unless $self->iterate_job_deps;
651              
652 0           $self->log_job_info;
653 0           $self->process_jobs;
654             }
655              
656 0           $self->update_job_scheduler_deps_by_task;
657              
658 0           $self->summarize_jobs;
659              
660             # $self->write_job_project_table;
661             # $self->write_task_project_table;
662             }
663              
664             =head3 iterate_job_deps
665              
666             Check to see if we are actually submitting
667              
668             Make sure each dep has already been submitted
669              
670             Return job schedulerIds
671              
672             =cut
673              
674             sub iterate_job_deps {
675 0     0 1   my $self = shift;
676              
677 0           $self->clear_scheduler_ids;
678 0           my $deps = $self->graph_job_deps->{ $self->current_job };
679              
680 0 0         return unless $deps;
681              
682 0           my $submit_ok = 1;
683 0           foreach my $dep ( @{$deps} ) {
  0            
684              
685 0 0         if ( $self->jobs->{$dep}->submission_failure ) {
686 0           $self->jobs->{ $self->current_job }->submission_failure(1);
687 0           $self->app_log->warn( 'Trying to submit job '
688             . $self->current_job
689             . ' which depends upon '
690             . $dep );
691 0           $self->app_log->warn(
692             'Job ' . $dep . ' failed, so we are skipping this submission' );
693 0           $submit_ok = 0;
694 0           $self->clear_scheduler_ids;
695             }
696             else {
697 0           map { $self->add_scheduler_id($_) }
698 0           $self->jobs->{$dep}->all_scheduler_ids;
699             }
700             }
701              
702 0           return $submit_ok;
703             }
704              
705             =head3 process_jobs
706              
707             =cut
708              
709             sub process_jobs {
710 0     0 1   my $self = shift;
711              
712 0           my $jobref = $self->jobs->{ $self->current_job };
713              
714 0 0         return if $self->jobs->{ $self->current_job }->submission_failure;
715 0 0         return if $jobref->submitted;
716              
717 0           $self->prepare_batch_files_array;
718              
719 0           $self->work;
720             }
721              
722             =head3 pre_process_batch
723              
724             Log info for the job to the screen
725              
726             =cut
727              
728             sub log_job_info {
729 0     0 0   my $self = shift;
730              
731             $self->app_log->info( 'There are '
732 0           . $self->jobs->{ $self->current_job }->count_batches . ' '
733             . $self->desc
734             . ' for job type '
735             . $self->current_job );
736              
737             $self->app_log->info( 'Submitted in '
738             . $self->jobs->{ $self->current_job }->{num_job_arrays}
739 0 0         . ' job arrays.'
740             . "\n" )
741             unless $self->use_batches;
742             }
743              
744             =head3 work
745              
746             Process the batch
747             Submit to the scheduler slurm/pbs/etc
748             Take care of the counters
749              
750             =cut
751              
752             sub work {
753 0     0 1   my $self = shift;
754              
755 0           $self->process_batch;
756 0           $self->clear_batch;
757              
758 0           $self->reset_cmd_counter;
759             }
760              
761             =head3 process_batch
762              
763             Create the slurm submission script from the slurm template
764             Write out template, submission job, and infile for parallel runner
765              
766             =cut
767              
768             #TODO think of more informative sub name
769             #TODO split this into process_arrays and process_batches
770              
771             sub process_batch {
772 0     0 1   my $self = shift;
773              
774 0           my $ok;
775              
776             #TODO Rework this so we only get the arrayids we need the first time around
777 0 0         $ok = $self->join_scheduler_ids(':') if $self->has_scheduler_ids;
778              
779 0           my $count_by = $self->prepare_batch_indexes;
780              
781 0           for ( my $x = 0 ; $x <= scalar @{$count_by} ; $x++ ) {
  0            
782 0           my $batch_indexes = $count_by->[$x];
783 0 0         next unless $batch_indexes;
784              
785 0           my $counter = $self->gen_counter_str;
786              
787             ##Create file per submission
788 0           $self->prepare_files;
789              
790             # $DB::single = 2;
791 0           my $array_str = '';
792 0 0         $array_str = $self->gen_array_str($batch_indexes)
793             if $self->can('gen_array_str');
794              
795 0           my $command = $self->process_submit_command($counter);
796              
797 0           $self->process_template( $counter, $command, $ok, $array_str );
798              
799 0           $self->post_process_jobs;
800              
801 0           $self->post_process_batch_indexes($batch_indexes);
802 0           $self->inc_batch_counter;
803             }
804             }
805              
806             =head3 post_process_batch_indexes
807              
808             Put the scheduler_id in each batch
809              
810             =cut
811              
812             sub post_process_batch_indexes {
813 0     0 1   my $self = shift;
814 0           my $batch_indexes = shift;
815              
816 0           my $scheduler_id = $self->jobs->{ $self->current_job }->scheduler_ids->[-1];
817              
818 0           for (
819             my $x = $batch_indexes->{batch_index_start} - 1 ;
820             $x <= $batch_indexes->{batch_index_end} - 1 ;
821             $x++
822             )
823             {
824 0           my $batch = $self->jobs->{ $self->current_job }->batches->[$x];
825 0 0         next unless $batch;
826 0           $batch->{scheduler_id} = $scheduler_id;
827             }
828              
829             }
830              
831             =head3 post_process_jobs
832              
833             =cut
834              
835             sub post_process_jobs {
836 0     0 1   my $self = shift;
837              
838 0           $self->jobs->{ $self->current_job }->submitted(1);
839              
840 0           $self->inc_job_counter;
841             }
842              
843             1;