File Coverage

blib/lib/Qudo/Manager.pm
Criterion Covered Total %
statement 18 145 12.4
branch 0 32 0.0
condition 0 13 0.0
subroutine 6 34 17.6
pod 9 26 34.6
total 33 250 13.2


line stmt bran cond sub pod time code
1             package Qudo::Manager;
2 33     33   253 use strict;
  33         65  
  33         1145  
3 33     33   234 use warnings;
  33         52  
  33         797  
4 33     33   15549 use Qudo::Job;
  33         72  
  33         883  
5 33     33   189 use Carp ();
  33         56  
  33         610  
6 33     33   31573 use UNIVERSAL::require;
  33         71452  
  33         323  
7 33     33   1038 use Scalar::Util qw/weaken/;
  33         61  
  33         75241  
8              
9             sub new {
10 0     0 1   my $class = shift;
11              
12 0           my $self = bless {
13             driver_for => '',
14             shuffled_databases => '',
15             find_job_limit_size => '',
16             retry_seconds => '',
17             fanc_map => +{},
18             default_hooks => [],
19             default_plugins => [],
20             hooks => +{},
21             plugin => +{},
22             abilities => [],
23             @_
24             }, $class;
25 0           weaken($self->{qudo});
26              
27 0           $self->register_hooks(@{$self->{default_hooks}});
  0            
28 0           $self->register_plugins(@{$self->{default_plugins}});
  0            
29 0           $self->register_abilities(@{$self->{abilities}});
  0            
30              
31 0           $self;
32             }
33              
34 0     0 1   sub driver_for { $_[0]->{qudo}->driver_for($_[1]) }
35 0     0 1   sub shuffled_databases { $_[0]->{qudo}->shuffled_databases() }
36              
37 0     0 1   sub plugin { $_[0]->{plugin} }
38 0     0 0   sub hooks { $_[0]->{hooks} }
39              
40             sub can_do {
41 0     0 0   my ($self, $funcname) = @_;
42 0           $self->{func_map}->{$funcname} = 1;
43             }
44              
45             sub register_abilities {
46 0     0 1   my ($self, @abilities) = @_;
47              
48 0           for my $ability (@abilities) {
49 0           $self->can_do($ability);
50             }
51             }
52              
53             sub has_abilities {
54 0     0 0   keys %{$_[0]->{func_map}};
  0            
55             }
56              
57             sub register_plugins {
58 0     0 1   my ($self, @plugins) = @_;
59              
60 0           for my $plugin (@plugins) {
61              
62 0           my ($plugin_name, $code);
63              
64 0 0         if (ref $plugin eq 'HASH') {
    0          
65 0           my $klass = $plugin->{name};
66 0 0         $klass->require or Carp::croak $@;
67 0           ($plugin_name, $code) = $klass->load($plugin->{option});
68             } elsif (not ref $plugin) {
69 0 0         $plugin->require or Carp::croak $@;
70 0           ($plugin_name, $code) = $plugin->load();
71             } else {
72 0           Carp::croak 'register_plugins require HASH or SCALAR.';
73             }
74              
75 0           $self->{plugin}->{$plugin_name} = $code;
76             }
77             }
78              
79             sub call_hook {
80 0     0 0   my ($self, $hook_point, $args) = @_;
81              
82 0           for my $module (keys %{$self->hooks->{$hook_point}}) {
  0            
83 0           my $code = $self->hooks->{$hook_point}->{$module};
84 0           $code->($args);
85             }
86             }
87              
88             sub register_hooks {
89 0     0 0   my ($self, @hook_modules) = @_;
90              
91 0           for my $module (@hook_modules) {
92 0 0         $module->require or Carp::croak $@;
93 0           $module->load($self);
94             }
95             }
96              
97             sub global_register_hooks {
98 0     0 1   warn q{global_register_hooks method is deprecated. Use 'register_hooks' instead.};
99 0           shift->register_hooks(@_);
100             }
101              
102             sub unregister_hooks {
103 0     0 0   my ($self, @hook_modules) = @_;
104              
105 0           for my $module (@hook_modules) {
106 0           $module->unload($self);
107             }
108             }
109              
110             sub global_unregister_hooks {
111 0     0 1   warn q{global_unregister_hooks method is deprecated. Use 'unregister_hooks' instead.};
112 0           shift->unregister_hooks(@_);
113             }
114              
115             sub funcname_to_id {
116 0     0 0   my ($self, $funcname, $db) = @_;
117              
118 0 0         $self->{_func_cache}->{$db}->{funcname2id}->{$funcname} or do {
119 0           my $func = $self->driver_for($db)->func_from_name($funcname);
120 0           $self->{_func_cache}->{$db}->{funcname2id}->{$funcname} = $func->{id};
121 0           $self->{_func_cache}->{$db}->{funcid2name}->{$func->{id}} = $func->{name};
122              
123             };
124 0           $self->{_func_cache}->{$db}->{funcname2id}->{$funcname};
125             }
126              
127             sub funcnames_to_ids {
128 0     0 0   my ($self, $db) = @_;
129             [
130 0           map {
131 0           $self->funcname_to_id($_, $db)
132 0           } keys %{$self->{func_map}}
133             ];
134             }
135              
136             sub funcid_to_name {
137 0     0 0   my ($self, $funcid, $db) = @_;
138              
139 0 0         $self->{_func_cache}->{$db}->{funcid2name}->{$funcid} or do {
140 0           my $func = $self->driver_for($db)->func_from_id($funcid);
141 0           $self->{_func_cache}->{$db}->{funcname2id}->{$func->name} = $func->{id};
142 0           $self->{_func_cache}->{$db}->{funcidename}->{$funcid} = $func->{name};
143             };
144 0           $self->{_func_cache}->{$db}->{funcid2name}->{$funcid};
145             }
146              
147             sub enqueue {
148 0     0 1   my ($self, $funcname, $arg, $db) = @_;
149              
150 0   0       $db ||= $self->shuffled_databases;
151 0           my $func_id = $self->funcname_to_id($funcname, $db);
152              
153 0   0       my $args = +{
      0        
154             func_id => $func_id,
155             arg => $arg->{arg},
156             uniqkey => $arg->{uniqkey},
157             run_after => $arg->{run_after}||0,
158             priority => $arg->{priority} ||0,
159             };
160              
161 0           $self->call_hook('pre_enqueue', $args);
162 0           $self->call_hook('serialize', $args);
163              
164 0           my $job_id = $self->driver_for($db)->enqueue($args);
165 0 0         return if $arg->{suppress_job};
166              
167 0           my $job = $self->lookup_job($job_id, $db);
168              
169 0           $self->call_hook('post_enqueue', $job);
170              
171 0           return $job;
172             }
173              
174             sub reenqueue {
175 0     0 0   my ($self, $job, $args) = @_;
176              
177 0           my $db = $self->shuffled_databases;
178 0           $self->driver_for($db)->reenqueue($job->id, $args);
179              
180 0           return $self->lookup_job($job->id, $db);
181             }
182              
183             sub dequeue {
184 0     0 0   my ($self, $job) = @_;
185 0           $self->driver_for($job->db)->dequeue({id => $job->id});
186             }
187              
188             sub work_once {
189 0     0 0   my $self = shift;
190              
191 0           my $job = $self->find_job;
192 0 0         return unless $job;
193              
194 0           my $worker_class = $job->funcname;
195 0 0         return unless $worker_class;
196              
197 0           $self->call_hook('deserialize', $job);
198 0           $self->call_hook('pre_work', $job);
199              
200 0           $worker_class->work_safely($job);
201              
202 0           $self->call_hook('post_work', $job);
203              
204 0           return 1;
205             }
206              
207             sub lookup_job {
208 0     0 0   my ($self, $job_id, $db) = @_;
209              
210 0           my $callback = $self->driver_for($db)->lookup_job($job_id);
211 0           my $job_data = $callback->();
212 0 0         return $job_data ? $self->_data2job($job_data, $db) : undef;
213             }
214              
215             sub find_job {
216 0     0 0   my $self = shift;
217              
218 0           for my $db ($self->shuffled_databases) {
219 0 0         return unless keys %{$self->{func_map}};
  0            
220 0           my $func_ids = $self->funcnames_to_ids($db);
221 0           my $callback = $self->driver_for($db)->find_job($self->{find_job_limit_size}, $func_ids);
222              
223 0           return $self->_grab_a_job($callback, $db);
224             }
225             }
226              
227             sub _data2job {
228 0     0     my ($self, $job_data, $db) = @_;
229              
230 0           Qudo::Job->new(
231             manager => $self,
232             job_data => $job_data,
233             db => $db,
234             );
235             }
236              
237             sub _grab_a_job {
238 0     0     my ($self, $callback, $db) = @_;
239              
240 0           while (1) {
241 0           my $job_data = $callback->();
242 0 0         last unless $job_data;
243              
244 0           my $old_grabbed_until = $job_data->{job_grabbed_until};
245 0 0         my $server_time = $self->driver_for($db)->get_server_time
246             or die "expected a server time";
247              
248 0           my $worker_class = $self->funcid_to_name($job_data->{func_id}, $db);
249 0           my $grab_job = $self->driver_for($db)->grab_a_job(
250             grabbed_until => ($server_time + $worker_class->grab_for),
251             job_id => $job_data->{job_id},
252             old_grabbed_until => $old_grabbed_until,
253             );
254 0 0         next if $grab_job < 1;
255              
256 0           return $self->_data2job($job_data, $db);
257             }
258 0           return;
259             }
260              
261             sub job_failed {
262 0     0 0   my ($self, $job, $message) = @_;
263              
264 0   0       $self->driver_for($job->db)->logging_exception(
265             {
266             func_id => $job->func_id,
267             message => $message,
268             uniqkey => $job->uniqkey,
269             arg => $job->arg_origin || $job->arg,
270             }
271             );
272             }
273              
274             sub set_job_status {
275 0     0 0   my ($self, $job, $status) = @_;
276              
277 0   0       $self->driver_for($job->db)->set_job_status(
278             {
279             func_id => $job->func_id,
280             arg => $job->arg_origin || $job->arg,
281             uniqkey => $job->uniqkey,
282             status => $status,
283             job_start_time => $job->job_start_time,
284             job_end_time => time(),
285             }
286             );
287             }
288              
289             sub enqueue_from_failed_job {
290 0     0 0   my ($self, $exception_log, $db) = @_;
291              
292 0 0         if ( $exception_log->{retried} ) {
293 0           Carp::carp('this exception is already retried');
294 0           return;
295             }
296 0           my $args = +{
297             func_id => $exception_log->{func_id},
298             arg => $exception_log->{arg},
299             uniqkey => $exception_log->{uniqkey},
300             };
301              
302 0           my $job_id = $self->driver_for($db)->enqueue($args);
303              
304 0           $self->driver_for($db)->retry_from_exception_log($exception_log->{id});
305              
306 0           $self->lookup_job($job_id, $db);
307             }
308              
309             1;
310              
311             =head1 NAME
312              
313             Qudo::Manager - qudo manager class.
314              
315             =head1 DESCRIPTION
316              
317             Qudo::Manager is job managiment base class.
318             It the job enqueue, dequeue, lookup and more.
319              
320             =head1 METHODS
321              
322             =head2 new
323              
324             get Qudo::Manager instance.
325             It is called from L usually.
326              
327             =head2 driver_for
328              
329             get database driver from some databases.
330              
331             =head2 shuffled_databases
332              
333             get shuffled databases.
334              
335             =head2 plugin
336              
337             get plugin instances.
338              
339             =head2 register_abilities
340              
341             The function that this manager is processing is registered.
342              
343             =head2 register_plugins
344              
345             Plugin is set for manager.
346              
347             =head2 global_register_hooks
348              
349             Hooks are set for manager.
350              
351             =head2 global_unregister_hooks
352              
353             Hooks are unset from manager.
354              
355             =head2 Cenqueue( $funcname, $args )>
356              
357             enqueue the job.
358              
359             =over 4
360              
361             =item * C<$funcname>
362              
363             =item * C<$args>
364              
365             =over 4
366              
367             =item * C
368              
369             job argments.
370              
371             =item * C
372              
373             job unique key.
374              
375             =item * C
376              
377             the value you want to check <= against on the run_after column
378             require int value.
379              
380             =item * C
381              
382             job priority.
383             require int value.
384              
385             =back
386              
387             =back
388              
389             =cut
390