File Coverage

blib/lib/Qudo.pm
Criterion Covered Total %
statement 18 75 24.0
branch 0 12 0.0
condition 0 21 0.0
subroutine 6 19 31.5
pod 7 12 58.3
total 31 139 22.3


line stmt bran cond sub pod time code
1             package Qudo;
2 33     33   159 use strict;
  33         46  
  33         1786  
3 33     33   226 use warnings;
  33         47  
  33         1365  
4              
5             our $VERSION = '0.0213';
6              
7 33     33   14430 use Qudo::Manager;
  33         80  
  33         436  
8 33     33   939 use Carp ();
  33         64  
  33         632  
9 33     33   8417 use UNIVERSAL::require;
  33         63  
  33         1593  
10 33     33   814 use List::Util qw/shuffle/;
  33         87  
  33         40454  
11              
12             our $RETRY_SECONDS = 30;
13             our $FIND_JOB_LIMIT_SIZE = 30;
14             our $DEFAULT_DRIVER = 'Skinny';
15             our $EXCEPTION_LIMIT_SIZE = 10;
16             our $EXCEPTION_OFFSET_SIZE = 0;
17             our $JOB_STATUS_LIMIT_SIZE = 10;
18             our $JOB_STATUS_OFFSET_SIZE = 0;
19             our $WORK_DELAY = 5;
20              
21             sub new {
22 0     0 1   my $class = shift;
23              
24 0           my $self = bless {
25             retry_seconds => $RETRY_SECONDS,
26             find_job_limit_size => $FIND_JOB_LIMIT_SIZE,
27             driver_class => $DEFAULT_DRIVER,
28             default_hooks => [],
29             default_plugins => [],
30             manager => '',
31             manager_abilities => [],
32             databases => [],
33             connections => +{},
34             work_delay => $WORK_DELAY,
35             @_,
36             }, $class;
37              
38 0           $self->_setup_driver;
39              
40 0           $self;
41             }
42              
43             sub _setup_driver {
44 0     0     my $self = shift;
45              
46 0           my $driver = 'Qudo::Driver::' . $self->{driver_class};
47 0 0         $driver->use or Carp::croak $@;
48 0           $driver->init_driver($self);
49             }
50              
51             sub set_connection {
52 0     0 0   my ($self, $dsn, $connection) = @_;
53 0           $self->{connections}->{$dsn} = $connection;
54             }
55             sub get_connection {
56 0     0 0   my ($self, $dsn) = @_;
57 0           $self->{connections}->{$dsn};
58             }
59              
60             sub shuffled_databases {
61 0     0 0   my $self = shift;
62 0           my @dsns = keys %{$self->{connections}};
  0            
63 0           return shuffle(@dsns);
64             }
65              
66             sub driver {
67 0     0 0   my ($self, $dsn) = @_;
68 0   0       $dsn ||= $self->shuffled_databases;
69 0           $self->driver_for($dsn);
70             }
71              
72             sub driver_for {
73 0     0 0   my ($self, $dsn) = @_;
74 0           $self->get_connection($dsn);
75             }
76              
77             sub manager {
78 0     0 1   my $self = shift;
79              
80 0   0       $self->{manager} ||= Qudo::Manager->new(
81             qudo => $self,
82             find_job_limit_size => $self->{find_job_limit_size},
83             retry_seconds => $self->{retry_seconds},
84             default_hooks => $self->{default_hooks},
85             default_plugins => $self->{default_plugins},
86             abilities => $self->{manager_abilities},
87             );
88             }
89              
90             sub enqueue {
91 0     0 1   my $self = shift;
92 0           $self->manager->enqueue(@_);
93             }
94              
95             sub work {
96 0     0 1   my ($self, $work_delay) = @_;
97 0   0       $work_delay ||= $self->{work_delay};
98              
99 0           my $manager = $self->manager;
100 0 0         unless ($manager->has_abilities) {
101 0           Carp::croak 'manager dose not have abilities.';
102             }
103              
104 0           while (1) {
105 0 0         sleep $work_delay unless $manager->work_once;
106             }
107             }
108              
109             sub job_count {
110 0     0 1   my ($self, $funcs, $dsn) = @_;
111              
112 0 0         if ($dsn) {
113 0           return $self->driver_for($dsn)->job_count($funcs);
114             }
115              
116 0           my %job_count;
117 0           for my $db ($self->shuffled_databases) {
118 0           $job_count{$db} = $self->driver_for($db)->job_count($funcs);
119             }
120 0           return \%job_count;
121             }
122              
123             sub exception_list {
124 0     0 1   my ($self, $args, $dsn) = @_;
125              
126 0   0       $args->{limit} ||= $EXCEPTION_LIMIT_SIZE;
127 0   0       $args->{offset} ||= $EXCEPTION_OFFSET_SIZE;
128              
129 0 0         if ($dsn) {
130 0           return $self->driver_for($dsn)->exception_list($args);
131             }
132              
133 0           my %exception_list;
134 0           for my $db ($self->shuffled_databases) {
135 0           $exception_list{$db} = $self->driver_for($db)->exception_list($args);
136             }
137 0           return \%exception_list;
138             }
139              
140             sub job_status_list {
141 0     0 1   my ($self, $args, $dsn) = @_;
142              
143 0   0       $args->{limit} ||= $JOB_STATUS_LIMIT_SIZE;
144 0   0       $args->{offset} ||= $JOB_STATUS_OFFSET_SIZE;
145              
146 0 0         if ($dsn) {
147 0           return $self->driver_for($dsn)->job_status_list($args);
148             }
149              
150 0           my %job_status_list;
151 0           for my $db ($self->shuffled_databases) {
152 0           $job_status_list{$db} = $self->driver_for($db)->job_status_list($args);
153             }
154 0           return \%job_status_list;
155             }
156              
157             =head1 NAME
158              
159             Qudo - simple and extensible job queue manager
160              
161             =head1 SYNOPSIS
162              
163             # enqueue job:
164             use Qudo;
165             my $qudo = Qudo->new(
166             driver_class => 'Skinny', # optional.
167             databases => [+{
168             dsn => 'dbi:SQLite:/tmp/qudo.db',
169             username => '',
170             password => '',
171             }],
172             );
173             $qudo->enqueue("Worker::Test", { arg => 'arg', uniqkey => 'uniqkey'});
174            
175             # do work:
176             use Qudo;
177             my $qudo2 = Qudo->new(
178             driver_class => 'Skinny', # optional.
179             databases => [+{
180             dsn => 'dbi:SQLite:/tmp/qudo.db',
181             username => '',
182             password => '',
183             }],
184             manager_abilities => [qw/Worker::Test/],
185             );
186             $qudo2->work(); # boot manager
187             # work work work!
188              
189             =head1 DESCRIPTION
190              
191             Qudo is simple and extensible job queue manager system.
192              
193             Your application can insert job into DB ,that is managed by Qudo.
194             And Your application can get & execute job by Qudo worker.
195             Qudo corresponds to deal with DB as MySQL and SQLite.
196              
197             If you add Hook Point around job's working method ,
198             you can add it easily and many point of work milestone.
199             Qudo is consided about adding Hook Point Flexibility.
200              
201             =head1 USEAGE
202              
203             =head2 Cnew( %args )>
204              
205             Optional members of C<%args> are:
206              
207             =over 4
208              
209             =item * C
210              
211             set Qudo::Driver::(Skinny|DBI).
212             default driver_class is Skinny.
213              
214             =back
215              
216             =over 4
217              
218             =item * C
219              
220             An arrayref of database information. Qudo can use multiple databases,
221             such that if any of them are unavailable,
222             the worker will search for appropriate jobs in the other databases automatically.
223              
224             Each member of the C value should be a hashref containing either:
225              
226             =over 4
227              
228             =item * C
229              
230             The database DSN for this database.
231              
232             =item * C
233              
234             The username to use when connecting to this database.
235              
236             =item * C
237              
238             The password to use when connecting to this database.
239              
240             =back
241              
242             =item * C
243              
244             An arrayref of worker class name.
245             please specify it when moving it by the usage of worker.
246             it is not necessary to specify it for the usage of enqueue client.
247              
248             =item * C
249              
250             The maximum number in which it looks for job by one processing.
251             Qudo default limit 30.
252             please specify it when moving it by the usage of worker.
253             it is not necessary to specify it for the usage of enqueue client.
254              
255             =item * C
256              
257             The number of seconds after which to try reconnecting to apparently dead databases.
258             If not given, Qudo will retry connecting to databases after 30 seconds.
259              
260             =item * C
261              
262             An arrayref of hook class name.
263              
264             =item * C
265              
266             An arrayref of plugin class name.
267              
268             =back
269              
270             =head2 Cmanager>
271              
272             get Qudo::Manager instance.
273             see L
274              
275             =head2 Cenqueue( %args )>
276              
277             see L enqueue method.
278              
279             =head2 Cwork( %args )>
280              
281             Find and perform any jobs $manager can do, forever.
282              
283             When no job is available, the working process will sleep for $delay seconds (or 5, if not specified) before looking again.
284              
285             =head2 Cjob_count( $funcname, $dsn )>
286              
287             Returns a job count infomations.
288             The required arguments :
289              
290             =over 4
291              
292             =item * C
293              
294             the name of the function or a reference to an array of functions.
295              
296             =item * C
297              
298             The database DSN for job count target database.
299              
300             =back
301              
302             =head2 Cexception_list( $args, $dsn )>
303              
304             Returns a job exception infomations.
305             Optional members of C<$args> are:
306              
307             =over 4
308              
309             =item * args
310              
311             =over 4
312              
313             =item * limit
314              
315             get exception log limit size.
316             default by 10.
317              
318             =item * offset
319              
320             get exception log offset size.
321             default by 0.
322              
323             =back
324              
325             =back
326              
327             =over 4
328              
329             =item * C
330              
331             The database DSN for job count target database.
332              
333             =back
334              
335             =head2 Cjob_status_list( $args, $dsn )>
336              
337             Returns a job exception infomations.
338             Optional members of C<$args> are:
339              
340             =over 4
341              
342             =item * args
343              
344             =over 4
345              
346             =item * limit
347              
348             get job_status log limit size.
349             default by 10.
350              
351             =item * offset
352              
353             get job_status log offset size.
354             default by 0.
355              
356             =back
357              
358             =back
359              
360             =over 4
361              
362             =item * C
363              
364             The database DSN for job count target database.
365              
366             =back
367              
368             =head1 REPOS
369              
370             http://github.com/nekokak/qudo/tree/master
371              
372             =head1 AUTHOR
373              
374             Atsushi Kobayashi
375              
376             Masaru Hoshino
377              
378             =head1 COPYRIGHT
379              
380             This program is free software; you can redistribute
381             it and/or modify it under the same terms as Perl itself.
382              
383             The full text of the license can be found in the
384             LICENSE file included with this module.
385              
386             =cut
387              
388             1;
389