File Coverage

blib/lib/Util/Task/Multi.pm
Criterion Covered Total %
statement 12 90 13.3
branch 0 20 0.0
condition 0 5 0.0
subroutine 4 11 36.3
pod 2 5 40.0
total 18 131 13.7


line stmt bran cond sub pod time code
1              
2              
3             =head1 NAME
4              
5             Util::Task::Multi - A special task that actually runs multiple tasks
6              
7             =head1 SYNOPSIS
8              
9             my $task = Util::Task::Multi->new();
10             $task->add_subtask(task1 => Util::Task::Something->new());
11             $task->add_subtask(task2 => Util::Task::SomethingElse->new());
12             $task->execute();
13              
14             =head1 DESCRIPTION
15              
16             This task subclass allows multiple atomic tasks to be run as a single task, coalescing them
17             in the most efficient way possible.
18              
19             =cut
20              
21             package Util::Task::Multi;
22              
23 1     1   1436 use strict;
  1         2  
  1         27  
24 1     1   5 use warnings;
  1         1  
  1         24  
25 1     1   4 use base qw(Util::Task);
  1         2  
  1         82  
26 1     1   5 use Carp;
  1         1  
  1         911  
27              
28             sub new {
29 0     0 0   my ($class) = @_;
30              
31 0           return bless {
32             tasks => {},
33             }, $class;
34             }
35              
36             sub add_subtask {
37 0     0 0   my ($self, $k, $task) = @_;
38              
39 0           $self->{tasks}{$k} = $task;
40             }
41              
42             # This is provided to allow developers to analyse the batching behavior of a particular multi-task,
43             # to find out how the task will be executed and analyse how well the batcher is performing.
44             # (In other words, this is our equivalent of EXPLAIN.)
45             sub batches_for_debugging {
46 0     0 0   my ($self) = @_;
47              
48 0           my $batches = {};
49 0           my $tasks_by_id = {};
50 0           my $task_ids_by_key = {};
51 0           my $progressions_by_id = {};
52 0           my $id_ref_by_id = {};
53 0           my $idx = 1;
54              
55             # Recursively fill the above data structures, flattening out nested Multi-tasks.
56 0           $self->_make_batches($self->{tasks}, $batches, $tasks_by_id, $task_ids_by_key, $progressions_by_id, $id_ref_by_id, \$idx);
57              
58 0           return [ values(%$batches) ];
59             }
60              
61             sub execute {
62 0     0 1   my ($self) = @_;
63              
64 0           my $batches = {};
65 0           my $tasks_by_id = {};
66 0           my $task_ids_by_key = {};
67 0           my $progressions_by_id = {};
68 0           my $id_ref_by_id = {};
69 0           my $idx = 1;
70              
71 0           my $tasks = $self->{tasks};
72              
73             # Recursively fill the above data structures, flattening out nested Multi-tasks.
74 0           my $task_ids_by_k = $self->_make_batches($tasks, $batches, $tasks_by_id, $task_ids_by_key, $progressions_by_id, $id_ref_by_id, \$idx);
75              
76 0           my $results = {};
77              
78 0           while (%$batches) {
79              
80             # By now we've got everything nicely batched up in $batches, so let's execute the batch jobs.
81 0           foreach my $global_batch_key (keys %$batches) {
82 0           my $batch = $batches->{$global_batch_key};
83              
84 0           my ($class, $batch_key, $tasks) = @{$batch};
  0            
85 0           $class->execute_multi($batch_key, $tasks, $results);
86             }
87              
88 0 0         if (%$progressions_by_id) {
89             # If there are any progressions, then we need to run another phase.
90 0           my $next_tasks = {};
91 0           foreach my $task_id (keys %$progressions_by_id) {
92 0           my $progression = $progressions_by_id->{$task_id};
93 0           my $intermediate_result = $results->{$task_id};
94 0           $results->{$task_id} = undef;
95 0           my $next_task = $progression->($intermediate_result);
96              
97 0 0         if (defined($next_task)) {
98 0 0         $next_tasks->{$task_id} = $next_task if defined($next_task);
99             }
100             else {
101             # Leave the result as undef and carry on.
102             }
103             }
104              
105             # Reset and calculate the batches for the next phase.
106 0           $batches = {};
107 0           $tasks_by_id = {};
108 0           $progressions_by_id = {};
109             # We intentionally don't reset $id_ref_by_id because we're going to use it
110             # to update the input-keys-to-task-ids mapping in a moment.
111             # We also leave task_ids_by_key so that we won't re-run coalescable tasks
112             # that we've already run.
113 0           my $task_ids_by_original_task_id = $self->_make_batches($next_tasks, $batches, $tasks_by_id, $task_ids_by_key, $progressions_by_id, $id_ref_by_id, \$idx);
114              
115             # Update $task_ids_by_k to point at the new task ids rather than the old,
116             # so that when we're done we use the final result.
117 0           foreach my $old_task_id (%$task_ids_by_original_task_id) {
118 0 0         if (my $id_ref = $id_ref_by_id->{$old_task_id}) {
119 0           $$id_ref = $task_ids_by_original_task_id->{$old_task_id};
120             }
121             }
122             }
123             else {
124             # We're done!
125 0           last;
126             }
127              
128             }
129              
130             # To avoid copying, we prepare the return value inside the $task_ids_by_k hash, since
131             # we don't need it anymore.
132 0           my $ret = $task_ids_by_k;
133              
134             # This uses the ids it finds in $ret to find the corresponding results and then overwrites the ids
135             # with the actual results. By the time this returns, $ret is full of actual results rather than ids.
136 0           $self->_prepare_response($results, $ret);
137              
138 0           return $ret;
139             }
140              
141             sub _make_batches {
142 0     0     my ($self, $tasks, $batches, $tasks_by_id, $task_ids_by_key, $progressions_by_id, $id_ref_by_id, $idx_ref) = @_;
143              
144 0           my $task_ids_by_k = {};
145              
146 0           foreach my $k (keys(%$tasks)) {
147 0           my $task = $tasks->{$k};
148              
149             # If the caller passed in an $idx_ref then they want us to assign ids. Otherwise, the ids
150             # are already assigned in $k.
151 0           my $task_id = $$idx_ref++;
152              
153 0 0         if ($task->isa('Util::Task::Sequence')) {
154             # If we have a sequence, then we make a note that it's a sequence and then
155             # treat it as if it were its base step for the purposes of batching.
156 0           $progressions_by_id->{$task_id} = $task->progression_function;
157 0           $task = $task->base_task;
158             }
159              
160 0 0         if ($task->isa('Util::Task::Multi')) {
161             # If we have nested Multi-tasks, flatten it all out so that we can
162             # batch the sub-tasks too.
163 0           $task_ids_by_k->{$k} = $task->_make_batches($task->{tasks}, $batches, $tasks_by_id, $task_ids_by_key, $progressions_by_id, $id_ref_by_id, $idx_ref);
164             }
165             else {
166 0           my ($class, $batch_key, $task_key) = $task->batching_keys;
167 0           my $global_batch_key = join("\t", $class, $batch_key);
168 0 0         my $global_task_key = defined($task_key) ? join("\t", $global_batch_key, $task_key) : undef;
169 0   0       $batches->{$global_batch_key} ||= [$class, $batch_key, {}];
170              
171             # Unless we've already encountered another instance of this task
172 0 0 0       unless ($global_task_key && $task_ids_by_key->{$global_task_key}) {
173 0 0         $task_ids_by_key->{$global_task_key} = $task_id if $global_task_key;
174 0           $tasks_by_id->{$task_id} = $task;
175 0           $batches->{$global_batch_key}[2]{$task_id} = $task;
176             }
177              
178 0           $task_ids_by_k->{$k} = $task_id;
179 0           $id_ref_by_id->{$task_id} = \$task_ids_by_k->{$k};
180             }
181             }
182              
183 0           return $task_ids_by_k;
184             }
185              
186             sub _prepare_response {
187 0     0     my ($self, $results, $response) = @_;
188              
189 0           foreach my $k (keys %$response) {
190 0           my $id = $response->{$k};
191              
192 0 0         if (ref $id eq 'HASH') {
193             # This was a nested multi-task, so let's reconstruct the tree.
194 0           my $sub_response = $id;
195 0           $self->_prepare_response($results, $sub_response);
196             }
197             else {
198 0           $response->{$k} = $results->{$id};
199             }
200             }
201             }
202              
203             # If this ever gets called directly then someone's doing something wrong.
204             # execute_multi is only intended to be used by this class's execute implementation.
205             sub execute_multi {
206 0     0 1   Carp::croak("Shouldn't call Util::Task::Multi->execute_multi directly");
207             }
208              
209             1;