File Coverage

blib/lib/Qudo/Manager.pm
Criterion Covered Total %
statement 18 125 14.4
branch 0 20 0.0
condition 0 19 0.0
subroutine 6 31 19.3
pod 0 23 0.0
total 24 218 11.0


line stmt bran cond sub pod time code
1             package Qudo::Manager;
2 30     30   114 use strict;
  30         33  
  30         719  
3 30     30   98 use warnings;
  30         33  
  30         692  
4 30     30   7402 use Qudo::Job;
  30         53  
  30         714  
5 30     30   135 use Carp;
  30         30  
  30         1746  
6 30     30   13361 use UNIVERSAL::require;
  30         31655  
  30         252  
7 30     30   8924 use Qudo::HookLoader;
  30         57  
  30         240  
8              
9             sub new {
10 0     0 0   my $class = shift;
11              
12 0           my $self = bless {
13             driver_for => '',
14             shuffled_databases => '',
15             find_job_limit_size => '',
16             retry_seconds => '',
17             fanc_map => +{},
18             default_hooks => [],
19             default_plugins => [],
20             hooks => +{},
21             plugin => +{},
22             abilities => [],
23             @_
24             }, $class;
25              
26 0           $self->global_register_hooks(@{$self->{default_hooks}});
  0            
27 0           $self->register_plugins(@{$self->{default_plugins}});
  0            
28 0           $self->register_abilities(@{$self->{abilities}});
  0            
29              
30 0           return $self;
31             }
32              
33 0     0 0   sub driver_for { $_[0]->{driver_for}->($_[1]) }
34 0     0 0   sub shuffled_databases { $_[0]->{shuffled_databases}->() }
35 0     0 0   sub plugin { $_[0]->{plugin} }
36              
37             sub register_abilities {
38 0     0 0   my ($self, @abilities) = @_;
39              
40 0           for my $ability (@abilities) {
41 0           $self->can_do($ability);
42             }
43             }
44              
45             sub has_abilities {
46 0     0 0   keys %{$_[0]->{func_map}};
  0            
47             }
48              
49             sub register_plugins {
50 0     0 0   my ($self, @plugins) = @_;
51              
52 0           for my $plugin (@plugins) {
53 0 0         $plugin->require or Carp::croak $@;
54 0           my ($plugin_name, $code) = $plugin->load();
55 0           $self->{plugin}->{$plugin_name} = $code;
56             }
57             }
58              
59             sub call_hook {
60 0     0 0   my ($self, $hook_point, $worker_class, $args) = @_;
61              
62 0           for my $module (keys %{$worker_class->hooks->{$hook_point}}) {
  0            
63 0           my $code = $worker_class->hooks->{$hook_point}->{$module};
64 0           $code->($args);
65             }
66              
67 0           for my $module (keys %{$self->hooks->{$hook_point}}) {
  0            
68 0           my $code = $self->hooks->{$hook_point}->{$module};
69 0           $code->($args);
70             }
71             }
72              
73 0     0 0   sub hooks { $_[0]->{hooks} }
74              
75             sub global_register_hooks {
76 0     0 0   my ($self, @hook_modules) = @_;
77              
78 0           Qudo::HookLoader->register_hooks($self, \@hook_modules);
79             }
80              
81             sub global_unregister_hooks {
82 0     0 0   my ($self, @hook_modules) = @_;
83              
84 0           Qudo::HookLoader->unregister_hooks($self, \@hook_modules);
85             }
86              
87             sub can_do {
88 0     0 0   my ($self, $funcname) = @_;
89              
90 0           $funcname->use;
91 0           $self->{func_map}->{$funcname} = 1;
92             }
93              
94             sub funcname_to_id {
95 0     0 0   my ($self, $funcname, $db) = @_;
96 0   0       $self->{_func_cache}->{$db}->{funcname2id}->{$funcname} ||= $self->driver_for($db)->get_func_id( $funcname );
97             }
98              
99             sub funcid_to_name {
100 0     0 0   my ($self, $funcid, $db) = @_;
101 0   0       $self->{_func_cache}->{$db}->{funcid2name}->{$funcid} ||= $self->driver_for($db)->get_func_name( $funcid );
102             }
103              
104             sub enqueue {
105 0     0 0   my ($self, $funcname, $arg) = @_;
106              
107 0           my $db = $self->shuffled_databases;
108 0           my $func_id = $self->funcname_to_id($funcname, $db);
109              
110 0 0         unless ($func_id) {
111 0           croak "$funcname can't get";
112             }
113              
114             my $args = +{
115             func_id => $func_id,
116             arg => $arg->{arg},
117             uniqkey => $arg->{uniqkey},
118             run_after => $arg->{run_after}||0,
119 0   0       priority => $arg->{priority} ||0,
      0        
120             };
121              
122 0           $self->call_hook('pre_enqueue', $funcname, $args);
123 0           $self->call_hook('serialize', $funcname, $args);
124              
125 0           my $job_id = $self->driver_for($db)->enqueue($args);
126 0           my $job = $self->lookup_job($job_id, $db);
127              
128 0           $self->call_hook('post_enqueue', $funcname, $job);
129              
130 0           return $job;
131             }
132              
133             sub reenqueue {
134 0     0 0   my ($self, $job, $args) = @_;
135              
136 0           my $db = $self->shuffled_databases;
137 0           $self->driver_for($db)->reenqueue($job->id, $args);
138              
139 0           return $self->lookup_job($job->id);
140             }
141              
142             sub dequeue {
143 0     0 0   my ($self, $job) = @_;
144 0           $self->driver_for($job->db)->dequeue({id => $job->id});
145             }
146              
147             sub work_once {
148 0     0 0   my $self = shift;
149              
150 0           my $job = $self->find_job;
151 0 0         return unless $job;
152              
153 0           my $worker_class = $job->funcname;
154 0 0         return unless $worker_class;
155              
156 0           $self->call_hook('deserialize', $worker_class, $job);
157 0           $self->call_hook('pre_work', $worker_class, $job);
158              
159 0           my $res = $worker_class->work_safely($self, $job);
160              
161 0           $self->call_hook('post_work', $worker_class, $job);
162              
163 0           return $res;
164             }
165              
166             sub lookup_job {
167 0     0 0   my ($self, $job_id, $db) = @_;
168              
169 0   0       $db ||= $self->shuffled_databases;
170              
171 0           my $callback = $self->driver_for($db)->lookup_job($job_id);
172 0           my $job_data = $callback->();
173 0 0         return $job_data ? $self->_data2job($job_data, $db) : undef;
174             }
175              
176             sub find_job {
177 0     0 0   my $self = shift;
178              
179 0           for my $db ($self->shuffled_databases) {
180 0 0         return unless keys %{$self->{func_map}};
  0            
181 0           my $callback = $self->driver_for($db)->find_job($self->{find_job_limit_size}, $self->{func_map});
182              
183 0           return $self->_grab_a_job($callback, $db);
184             }
185             }
186              
187             sub _data2job {
188 0     0     my ($self, $job_data, $db) = @_;
189              
190 0           Qudo::Job->new(
191             manager => $self,
192             job_data => $job_data,
193             db => $db,
194             );
195             }
196              
197             sub _grab_a_job {
198 0     0     my ($self, $callback, $db) = @_;
199              
200 0           while (1) {
201 0           my $job_data = $callback->();
202 0 0         last unless $job_data;
203              
204 0           my $old_grabbed_until = $job_data->{job_grabbed_until};
205 0 0         my $server_time = $self->driver_for($db)->get_server_time
206             or die "expected a server time";
207              
208 0           my $worker_class = $job_data->{func_name};
209             my $grab_job = $self->driver_for($db)->grab_a_job(
210             grabbed_until => ($server_time + $worker_class->grab_for),
211             job_id => $job_data->{job_id},
212 0           old_grabbed_until => $old_grabbed_until,
213             );
214 0 0         next if $grab_job < 1;
215              
216 0           return $self->_data2job($job_data, $db);
217             }
218 0           return;
219             }
220              
221             sub job_failed {
222 0     0 0   my ($self, $job, $message) = @_;
223              
224 0   0       $self->driver_for($job->db)->logging_exception(
225             {
226             func_id => $job->func_id,
227             message => $message,
228             uniqkey => $job->uniqkey,
229             arg => $job->arg_origin || $job->arg,
230             }
231             );
232             }
233              
234             sub set_job_status {
235 0     0 0   my ($self, $job, $status) = @_;
236              
237 0   0       $self->driver_for($job->db)->set_job_status(
238             {
239             func_id => $job->func_id,
240             arg => $job->arg_origin || $job->arg,
241             uniqkey => $job->uniqkey,
242             status => $status,
243             job_start_time => $job->job_start_time,
244             job_end_time => time(),
245             }
246             );
247             }
248              
249             sub enqueue_from_failed_job {
250 0     0 0   my ($self, $exception_log, $db) = @_;
251              
252 0 0         if ( $exception_log->{retried} ) {
253 0           Carp::carp('this exception is already retried');
254 0           return;
255             }
256             my $args = +{
257             func_id => $exception_log->{func_id},
258             arg => $exception_log->{arg},
259             uniqkey => $exception_log->{uniqkey},
260 0           };
261              
262 0           my $job_id = $self->driver_for($db)->enqueue($args);
263              
264 0           $self->driver_for($db)->retry_from_exception_log($exception_log->{id});
265              
266 0           $self->lookup_job($job_id, $db);
267             }
268              
269             1;
270