File Coverage

blib/lib/HPC/Runner/Command/submit_jobs/Utils/Scheduler/ResolveDeps/AssignTaskDeps.pm
Criterion Covered Total %
statement 12 95 12.6
branch 0 14 0.0
condition n/a
subroutine 4 14 28.5
pod 4 10 40.0
total 20 133 15.0


line stmt bran cond sub pod time code
1             package HPC::Runner::Command::submit_jobs::Utils::Scheduler::ResolveDeps::AssignTaskDeps;
2              
3 1     1   428 use Moose::Role;
  1         3  
  1         5  
4              
5 1     1   4158 use Memoize;
  1         2  
  1         53  
6 1     1   6 use List::MoreUtils qw(first_index indexes uniq);
  1         2  
  1         11  
7 1     1   723 use List::Util qw(first);
  1         2  
  1         909  
8              
9             =head3 update_job_scheduler_ids_by_task
10              
11             #TODO do this after the all batches for a single job have been passed
12              
13             #DEPRACATED job_scheduler_ids_by_array
14              
15             for job at jobs
16             for batch at batches
17             for task at tasks
18              
19             =cut
20              
21             sub update_job_scheduler_deps_by_task {
22 0     0 0   my $self = shift;
23              
24 0           $self->app_log->info(
25             'Updating task dependencies. This may take some time...');
26              
27 0           foreach my $job ( $self->all_schedules ) {
28 0 0         next if $self->jobs->{$job}->submission_failure;
29 0           $self->current_job($job);
30 0           $self->batch_scheduler_ids_by_task;
31             }
32              
33             ##TODO consider changing this to each schedule
34 0           $self->update_job_deps;
35             }
36              
37             sub batch_scheduler_ids_by_task {
38 0     0 0   my $self = shift;
39              
40 0 0         return unless $self->jobs->{ $self->current_job }->has_deps;
41              
42             $self->batch_counter(
43 0           $self->jobs->{ $self->current_job }->{batch_index_start} );
44              
45 0           my $scheduler_index = $self->process_all_batch_deps;
46              
47 0           while ( my ( $dep_job, $v ) = each %{$scheduler_index} ) {
  0            
48 0           my @dep_jobs = @{$v};
  0            
49 0           my $dep_indices = $scheduler_index->{$dep_job};
50 0           $self->dep_scheduler_ids_by_task( $dep_job, $dep_indices );
51             }
52              
53             }
54              
55             has 'dep_scheduler_ids_by_task_cache' => (
56             is => 'rw',
57             isa => 'HashRef',
58             default => sub { {} },
59             clearer => 'clear_dep_scheduler_ids_by_task_cache',
60             );
61              
62             sub dep_scheduler_ids_by_task {
63 0     0 0   my $self = shift;
64 0           my $dep_job = shift;
65 0           my $dep_indices = shift;
66              
67 0           for ( my $y = 0 ; $y < scalar @{$dep_indices} ; $y++ ) {
  0            
68             ##This is the current_batch_index
69              
70 0           my $batch_ref =
71             $self->check_find_dep_indexes_cache( $self->current_job, $y );
72              
73 0           for ( my $z = 0 ; $z < scalar @{ $dep_indices->[$y] } ; $z++ ) {
  0            
74             #This is the dependency_batch_index
75              
76 0           my $dep_index = $dep_indices->[$y]->[$z];
77 0           my $dep_ref =
78             $self->check_find_dep_indexes_cache( $dep_job, $dep_index );
79              
80 0           my $array_dep = $self->build_task_deps(
81             $batch_ref->[0], $dep_ref->[0],
82             $batch_ref->[1], $dep_ref->[1],
83             );
84              
85 0           $self->push_array_deps($array_dep);
86             }
87             }
88              
89 0           $self->clean_array_deps;
90             }
91              
92             =head3 assign_scheduler_deps
93              
94             Jobs should only depend upon all jobs they need - not all jobs from the previous dep
95              
96             =cut
97              
98             sub assign_scheduler_deps {
99 0     0 1   my $self = shift;
100 0           my $batch_scheduler_id = shift;
101 0           my $dep_scheduler_id = shift;
102             # my $batch_task_index = shift;
103             # my $dep_task_index = shift;
104              
105 0           my $array_dep = [ $batch_scheduler_id, $dep_scheduler_id, ];
106              
107 0           return $array_dep;
108             }
109              
110             sub check_find_dep_indexes_cache {
111 0     0 0   my $self = shift;
112 0           my $job = shift;
113 0           my $index = shift;
114              
115 0 0         if ( exists $self->dep_scheduler_ids_by_task_cache->{$job}->{$index} ) {
116 0           return $self->dep_scheduler_ids_by_task_cache->{$job}->{$index};
117             }
118             else {
119             my $scheduler_id =
120 0           $self->jobs->{$job}->{batches}->[$index]->{scheduler_id};
121              
122             my $task_index =
123             $self->jobs->{$job}->batches->[$index]->cmd_start +
124 0           $self->jobs->{$job}->{cmd_start};
125              
126 0           $self->dep_scheduler_ids_by_task_cache->{$job}->{$index} =
127             [ $scheduler_id, $task_index ];
128              
129 0           return $self->dep_scheduler_ids_by_task_cache->{$job}->{$index};
130             }
131             }
132              
133             sub push_array_deps {
134 0     0 0   my $self = shift;
135 0           my $array_dep = shift;
136              
137 0 0         if ( $self->exists_array_dep( $array_dep->[0] ) ) {
138 0           push( @{ $self->array_deps->{ $array_dep->[0] } }, $array_dep->[1] );
  0            
139             }
140             else {
141 0           $self->array_deps->{ $array_dep->[0] } = [ $array_dep->[1] ];
142             }
143             }
144              
145             sub clean_array_deps {
146 0     0 0   my $self = shift;
147              
148 0           while ( my ( $k, $v ) = each %{ $self->array_deps } ) {
  0            
149 0           my @uniq = uniq( @{$v} );
  0            
150 0           @uniq = sort @uniq;
151 0           $self->array_deps->{$k} = \@uniq;
152             }
153             }
154              
155             =head3 update_scheduler_ids_by_array
156              
157             Update the scheduler ids by the task/batch
158              
159             #TODO There must be a better way to do this
160              
161             =cut
162              
163             sub update_scheduler_ids_by_array {
164 0     0 1   my $self = shift;
165              
166 0           my $current_batch_index = $self->batch_counter - 1;
167              
168 0           my $index_in_batch =
169             $self->index_in_batch( $self->current_job, $current_batch_index );
170              
171 0 0         if ( !defined $index_in_batch ) {
172 0           $self->app_log->warn( "Job "
173             . $self->current_job
174             . " does not have an appropriate index. If you think are reaching this in error please report the issue to github.\n"
175             );
176 0           return;
177             }
178              
179             my $batch_scheduler_id =
180 0           $self->jobs->{ $self->current_job }->scheduler_ids->[$index_in_batch];
181              
182             ##IF there is no batch id, that means something went wrong with submission
183 0 0         $self->current_batch->scheduler_id($batch_scheduler_id)
184             if $batch_scheduler_id;
185             }
186              
187             =head3 index_in_batch
188              
189             Using job arrays each job is divided into one or batches of size self->max_array_size
190              
191             max_array_size = 10
192             001_job.sh --array=1-10
193             002_job.sh --array=10-11
194              
195             self->jobs->{a_job}->all_batch_indexes
196              
197             job001 => [
198             {batch_index_start => 1, batch_index_end => 10 },
199             {batch_index_start => 11, batch_index_end => 20}
200             ]
201              
202             The index argument is zero indexed, and our counters (job_counter, batch_counter) are 1 indexed
203              
204             =cut
205              
206             sub index_in_batch {
207 0     0 1   my $self = shift;
208 0           my $job = shift;
209 0           my $index = shift;
210              
211 0           $index++;
212              
213 0           my $batches = $self->jobs->{$job}->batch_indexes;
214 0           return check_batch_index( $batches, $index );
215             }
216              
217             memoize('check_batch_index');
218              
219             sub check_batch_index {
220             my $batches = shift;
221             my $search_index = shift;
222              
223             my $x = first_index {
224             search_index( $_, $search_index );
225             }
226             @{$batches};
227              
228             return $x if defined $x;
229             return undef;
230             }
231              
232             memoize('search_index');
233              
234             sub search_index {
235             my $batch_index = shift;
236             my $search_index = shift;
237             my $batch_start = $batch_index->{batch_index_start};
238             my $batch_end = $batch_index->{batch_index_end};
239              
240             if ( $search_index >= $batch_start && $search_index <= $batch_end ) {
241             return 1;
242             }
243             return undef;
244             }
245              
246             =head3 scheduler_ids_by_batch
247              
248             ##DEPRACATED
249              
250             =cut
251              
252             sub scheduler_ids_by_batch {
253 0     0 1   my $self = shift;
254              
255 0           my $scheduler_index = $self->process_batch_deps( $self->current_batch );
256              
257 0           my @jobs = keys %{$scheduler_index};
  0            
258              
259 0           my @scheduler_ids = ();
260              
261 0           foreach my $job (@jobs) {
262 0           my $batch_index = $scheduler_index->{$job};
263 0           my $dep_scheduler_ids = $self->jobs->{$job}->scheduler_ids;
264              
265 0           foreach my $index ( @{$batch_index} ) {
  0            
266 0           push( @scheduler_ids, $dep_scheduler_ids->[$index] );
267             }
268             }
269              
270 0 0         $self->scheduler_ids( \@scheduler_ids ) if @scheduler_ids;
271             }
272              
273             1;