File Coverage

blib/lib/Qudo/Driver/DBI.pm
Criterion Covered Total %
statement 29 240 12.0
branch 0 40 0.0
condition 0 15 0.0
subroutine 7 33 21.2
pod 0 20 0.0
total 36 348 10.3


line stmt bran cond sub pod time code
1             package Qudo::Driver::DBI;
2              
3 3     3   1445 use strict;
  3         6  
  3         106  
4 3     3   15 use warnings;
  3         5  
  3         144  
5              
6             our $VERSION = '0.03';
7              
8 3     3   4524 use DBI;
  3         40814  
  3         207  
9 3     3   31 use Carp qw/croak/;
  3         7  
  3         206  
10              
11 3     3   1814 use Qudo::Driver::DBI::DBD;
  3         7  
  3         7722  
12              
13             sub init_driver {
14 0     0 0 0 my ($class, $master) = @_;
15              
16 0         0 for my $database (@{$master->{databases}}) {
  0         0  
17 0         0 my $connection = bless {
18             database => $database,
19             dbh => '',
20             dbd => '',
21             }, $class;
22 0         0 $connection->_connect();
23              
24 0         0 my $dbd_type = $connection->{dbh}->{Driver}->{Name};
25 0         0 $connection->{dbd} = Qudo::Driver::DBI::DBD->new($dbd_type);
26              
27 0         0 $master->set_connection($database->{dsn}, $connection);
28             }
29             }
30              
31             sub _connect {
32 0     0   0 my $self = shift;
33            
34 0 0       0 $self->{dbh} = DBI->connect(
35             $self->{database}->{dsn},
36             $self->{database}->{username},
37             $self->{database}->{password},
38 0         0 { RaiseError => 1, PrintError => 0, AutoCommit => 1, %{ $self->{database}->{connect_options} || {} } }
39             );
40             }
41              
42             sub dbh{
43 0     0 0 0 my $self = shift;
44              
45 0         0 return $self->{dbh};
46             }
47              
48             sub job_status_list {
49 0     0 0 0 my ($self, $args) = @_;
50              
51 0         0 my $sql = q{
52             SELECT
53             job_status.id,
54             job_status.func_id,
55             job_status.arg,
56             job_status.uniqkey,
57             job_status.status,
58             job_status.job_start_time,
59             job_status.job_end_time
60             FROM
61             job_status
62             };
63              
64 0         0 my @funcs;
65 0 0       0 if( $args->{funcs} ){
66 0 0       0 if( ref($args->{funcs}) eq 'ARRAY' ){
67 0         0 @funcs = @{$args->{funcs}};
  0         0  
68             }
69             else{
70 0         0 push @funcs , $args->{funcs};
71             }
72              
73 0         0 $sql .= sprintf( q{
74             INNER JOIN
75             func
76             ON
77             job_status.func_id = func.id
78 0         0 WHERE (func.name IN (%s) )}, join(',',map{'?'} @funcs) );
79             }
80              
81 0         0 $sql .= q{ LIMIT ? OFFSET ? };
82              
83 0         0 my $sth = $self->_execute(
84             $sql,
85             [ @funcs , $args->{limit} ,$args->{offset} ]
86             );
87              
88 0         0 my @job_status_list;
89 0         0 while (my $row = $sth->fetchrow_hashref) {
90 0         0 push @job_status_list, $row;
91             }
92 0         0 return \@job_status_list;
93             }
94              
95             sub job_count {
96 0     0 0 0 my ($self , $funcs) = @_;
97              
98 0         0 my @bind;
99 0 0       0 if( ref $funcs eq 'ARRAY' ){
    0          
100 0         0 @bind = @{$funcs};
  0         0  
101             }
102             elsif( defined $funcs ){
103 0         0 push @bind , $funcs;
104             }
105              
106 0         0 my $sql = q{
107             SELECT
108             COUNT(job.id) AS count
109             FROM
110             job
111             };
112              
113 0 0       0 if( scalar @bind ){
114 0         0 $sql .= q{
115             INNER JOIN
116             func ON job.func_id = func.id
117             WHERE
118             };
119 0         0 $sql .= $self->_join_func_name( \@bind );
120             }
121              
122 0         0 my $sth = $self->_execute(
123             $sql,
124             \@bind
125             );
126              
127 0         0 my $ret = $sth->fetchrow_hashref();
128 0         0 return $ret->{count};
129             }
130              
131             sub job_list {
132 0     0 0 0 my ($self, $limit, $funcs) = @_;
133              
134 0         0 my $sql = $self->_search_job_sql();
135 0         0 $sql .= q{
136             WHERE
137             job.grabbed_until <= ?
138             AND
139             job.run_after <= ?
140             };
141 0         0 my @bind = $self->get_server_time;
142 0         0 push @bind, $self->get_server_time;
143              
144             # func.name
145 0 0       0 if( $funcs ){
146 0         0 $sql .= q{ AND }. $self->_join_func_name($funcs);
147 0         0 push @bind , @{$funcs};
  0         0  
148             }
149              
150             # limit
151 0         0 $sql .= q{LIMIT ?};
152 0         0 push @bind , $limit;
153              
154 0         0 my $sth = $self->_execute(
155             $sql,
156             \@bind
157             );
158              
159 0         0 my $code = $self->_get_job_data( $sth );
160              
161 0         0 my @jobs;
162 0         0 while (1) {
163 0         0 my $row = $code->();
164 0 0       0 last unless $row;
165 0         0 push @jobs, $row;
166             }
167 0         0 return \@jobs;
168             }
169              
170             sub exception_list {
171 0     0 0 0 my ($self, $args) = @_;
172              
173 0         0 my @bind = ();
174 0         0 my $limit = $args->{limit};
175 0         0 my $offset = $args->{offset};
176 0   0     0 my $funcs = $args->{funcs} || '';
177 0         0 my $sql = q{
178             SELECT
179             exception_log.id,
180             exception_log.func_id,
181             exception_log.exception_time,
182             exception_log.message,
183             exception_log.uniqkey,
184             exception_log.arg,
185             exception_log.retried
186             FROM
187             exception_log
188             };
189              
190             # funcs
191 0 0       0 if ($funcs) {
192 0         0 $sql .= q{
193             INNER JOIN
194             func
195             ON
196             exception_log.func_id = func.id
197             WHERE
198             };
199 0         0 $sql .= $self->_join_func_name($funcs);
200 0         0 push @bind , @{$funcs};
  0         0  
201             }
202              
203             # limit
204 0 0       0 if( $limit ){
205 0         0 $sql .= q{ LIMIT ? };
206 0         0 push @bind , $limit;
207             }
208              
209             #offset
210 0 0       0 if( $offset ){
211 0         0 $sql .= q{OFFSET ?};
212 0         0 push @bind , $offset;
213             }
214              
215 0         0 my $sth = $self->_execute(
216             $sql,
217             \@bind
218             );
219              
220 0         0 my @exception_list;
221 0         0 while (my $row = $sth->fetchrow_hashref) {
222 0         0 push @exception_list, $row;
223             }
224 0         0 return \@exception_list;
225             }
226              
227              
228             sub lookup_job {
229 0     0 0 0 my ($self, $job_id) = @_;
230              
231 0         0 my $sql = $self->_search_job_sql();
232              
233 0         0 my @bind;
234             # func.name
235 0 0       0 if( $job_id ){
236 0         0 $sql .= q{ WHERE job.id = ?};
237 0         0 push @bind , $job_id;
238             }
239              
240             # limit
241 0         0 $sql .= q{ LIMIT 1};
242              
243 0         0 my $sth = $self->_execute(
244             $sql,
245             \@bind
246             );
247              
248 0         0 return $self->_get_job_data( $sth );
249             }
250              
251             sub find_job {
252 0     0 0 0 my ($self, $limit, $func_ids) = @_;
253              
254 0         0 my $sql = $self->_search_job_sql();
255 0         0 $sql .= q{
256             WHERE
257             job.grabbed_until <= ?
258             AND
259             job.run_after <= ?
260             };
261 0         0 my @bind = $self->get_server_time;
262 0         0 push @bind, $self->get_server_time;
263              
264             # func.name
265 0 0       0 if( $func_ids ){
266 0         0 $sql .= q{ AND }. $self->_join_func_ids($func_ids);
267 0         0 push @bind , @{$func_ids};
  0         0  
268             }
269              
270             # priority
271 0         0 $sql .= q{ ORDER BY job.priority DESC };
272              
273             # limit
274 0         0 $sql .= q{ LIMIT ? };
275 0         0 push @bind , $limit;
276              
277 0         0 my $sth = $self->_execute(
278             $sql,
279             \@bind
280             );
281              
282 0         0 return $self->_get_job_data( $sth );
283             }
284              
285             sub _search_job_sql {
286 0     0   0 q{
287             SELECT
288             job.id AS id,
289             job.arg AS arg,
290             job.uniqkey AS uniqkey,
291             job.func_id AS func_id,
292             job.grabbed_until AS grabbed_until,
293             job.retry_cnt AS retry_cnt,
294             job.priority AS priority
295             FROM job
296             };
297             }
298              
299             sub _get_job_data {
300 0     0   0 my ($self, $sth) = @_;
301             sub{
302 0     0   0 while (my $row = $sth->fetchrow_hashref) {
303             return +{
304 0         0 job_id => $row->{id},
305             job_arg => $row->{arg},
306             job_uniqkey => $row->{uniqkey},
307             job_grabbed_until => $row->{grabbed_until},
308             job_retry_cnt => $row->{retry_cnt},
309             job_priority => $row->{priority},
310             func_id => $row->{func_id},
311             };
312             }
313 0         0 return;
314 0         0 };
315             }
316              
317             sub grab_a_job {
318 0     0 0 0 my ($self, %args) = @_;
319              
320 0         0 my $sql = q{
321             UPDATE
322             job
323             SET
324             grabbed_until = ?
325             WHERE
326             id = ?
327             AND
328             grabbed_until = ?
329             };
330              
331 0         0 my @bind = (
332             $args{grabbed_until},
333             $args{job_id},
334             $args{old_grabbed_until},
335             );
336              
337 0         0 my $sth = $self->_execute(
338             $sql,
339             \@bind
340             );
341              
342 0         0 return $sth->rows;
343             }
344              
345             sub logging_exception {
346 0     0 0 0 my ($self, $args) = @_;
347              
348 0         0 my $sql = q{
349             INSERT INTO exception_log
350             ( func_id , message , uniqkey, arg, exception_time, retried)
351             VALUES
352             ( ? , ? , ?, ?, ?, ?)
353             };
354 0         0 my @bind = (
355             $args->{func_id} ,
356             $args->{message} ,
357             $args->{uniqkey} ,
358             $args->{arg} ,
359             time(),
360             0,
361             );
362              
363 0         0 my $sth = $self->_execute(
364             $sql,
365             \@bind
366             );
367              
368 0         0 return;
369             }
370              
371             sub set_job_status{
372 0     0 0 0 my ($self, $args) = @_;
373              
374 0         0 my @column = keys %{$args};
  0         0  
375 0         0 my $sql = $self->_build_insert_sql(
376             'job_status',
377             \@column
378             );
379              
380 0         0 my @bind = map {$args->{$_}} @column;
  0         0  
381              
382 0         0 $self->_execute(
383             $sql,
384             \@bind
385             );
386              
387 0         0 return;
388             }
389              
390             sub get_server_time {
391 0     0 0 0 my $self = shift;
392              
393 0         0 my $unixtime_sql = $self->{dbd}->sql_for_unixtime;
394 0         0 my $time;
395 0         0 eval {
396 0         0 $time = $self->{dbh}->selectrow_array("SELECT $unixtime_sql");
397             };
398 0 0       0 if ($@) { $time = time }
  0         0  
399 0         0 return $time;
400             }
401              
402             sub enqueue {
403 0     0 0 0 my ($self, $args) = @_;
404              
405 0   0     0 $args->{enqueue_time} ||= time;
406 0   0     0 $args->{grabbed_until} ||= 0;
407 0   0     0 $args->{retry_cnt} ||= 0;
408 0   0     0 $args->{run_after} = time + ($args->{run_after}||0);
409 0   0     0 $args->{priority} ||= 0;
410              
411 0         0 my @column = keys %{$args};
  0         0  
412 0         0 my $sql = $self->_build_insert_sql(
413             'job',
414             \@column
415             );
416 0         0 my @bind = map {$args->{$_}} @column;
  0         0  
417              
418 0         0 my $sth_ins = $self->_execute(
419             $sql,
420             \@bind
421             );
422              
423              
424 0         0 my $id = $self->{dbd}->last_insert_id($self->{dbh}, $sth_ins);
425 0         0 my $sth_sel = $self->_execute(
426             q{SELECT * FROM job WHERE id = ?} ,
427             [ $id ]
428             );
429              
430 0         0 my $ret_sel = $sth_sel->fetchrow_hashref();
431 0 0       0 return $ret_sel ? $ret_sel->{id} : undef;
432             }
433              
434             sub reenqueue {
435 0     0 0 0 my ($self, $job_id, $args) = @_;
436              
437 0         0 my $sql = q{
438             UPDATE
439             job
440             SET
441             enqueue_time = ?,
442             run_after = ?,
443             retry_cnt = ?,
444             grabbed_until = ?
445             WHERE
446             id = ?
447             };
448              
449 0   0     0 my @bind = (
450             time,
451             (time + ($args->{retry_delay}||0) ),
452             $args->{retry_cnt},
453             $args->{grabbed_until},
454             $job_id,
455             );
456              
457 0         0 my $sth = $self->_execute(
458             $sql,
459             \@bind
460             );
461              
462 0         0 return $sth->rows;
463             }
464              
465              
466             sub dequeue {
467 0     0 0 0 my ($self, $args) = @_;
468              
469 0         0 my $sth = $self->_execute(
470             q{DELETE FROM job WHERE id = ?} ,
471             [ $args->{id} ]
472             );
473              
474 0         0 return $sth->rows;
475             }
476              
477              
478             sub get_func_id {
479 0     0 0 0 my ($self, $funcname) = @_;
480            
481 0         0 my $sth_sel = $self->_execute(
482             q{SELECT * FROM func WHERE name = ?} ,
483             [ $funcname ]
484             );
485              
486 0         0 my $func_id;
487 0         0 my $ret_hashref = $sth_sel->fetchrow_hashref();
488 0 0       0 if ( $ret_hashref ){
489 0         0 $func_id = $ret_hashref->{id};
490             }
491             else{
492 0         0 my $sth_ins = $self->_execute(
493             q{INSERT INTO func ( name ) VALUES ( ? )} ,
494             [ $funcname ]
495             );
496              
497 0         0 $sth_sel->execute( $funcname );
498 0         0 my $ret_hashref = $sth_sel->fetchrow_hashref();
499 0 0       0 if ( $ret_hashref ){
500 0         0 $func_id = $ret_hashref->{id};
501             }
502             }
503              
504 0         0 return $func_id;
505             }
506              
507             sub get_func_name {
508 0     0 0 0 my ($self, $funcid) = @_;
509              
510 0         0 my $sth = $self->_execute(
511             q{SELECT * FROM func WHERE id = ?} ,
512             [ $funcid ]
513             );
514              
515 0         0 my $ret_hashref = $sth->fetchrow_hashref();
516 0 0       0 return $ret_hashref ? $ret_hashref->{name} : undef;
517             }
518              
519             sub retry_from_exception_log {
520 0     0 0 0 my ($self, $exception_log_id) = @_;
521              
522 0         0 $self->_execute(
523             q{UPDATE exception_log SET retried = 1 WHERE id = ?},
524             [$exception_log_id]
525             );
526             }
527              
528             sub _execute {
529 0     0   0 my ($self, $sql, $bind) = @_;
530              
531 0         0 my $sth;
532 0         0 eval {
533 0         0 $sth = $self->{dbh}->prepare($sql);
534 0         0 $sth->execute(@{$bind});
  0         0  
535             };
536 0 0       0 if ($@) { croak $@ }
  0         0  
537 0         0 $sth;
538             }
539              
540             sub _join_func_ids{
541 0     0   0 my ($self , $func_ids ) = @_;
542              
543 0         0 my $func_in_sql = sprintf(
544             q{ job.func_id IN (%s) } ,
545 0         0 join(',', map { '?' } @{$func_ids} )
  0         0  
546             );
547              
548 0         0 return $func_in_sql;
549             }
550              
551             sub _join_func_name{
552 3     3   14 my ($self , $funcs ) = @_;
553              
554 4         11 my $func_name = sprintf(
555             q{ func.name IN (%s) } ,
556 3         5 join(',', map { '?' } @{$funcs} )
  3         9  
557             );
558              
559 3         15 return $func_name;
560             }
561              
562             sub _build_insert_sql{
563 2     2   788 my( $self , $table , $column_ary_ref ) = @_;
564              
565 2         5 my $sql = qq{ INSERT INTO $table ( };
566 2         3 $sql .= join ' , ' , @{$column_ary_ref};
  2         5  
567 2         4 $sql .= ' ) VALUES ( ';
568 2         3 $sql .= join( ' , ', ('?') x @{$column_ary_ref} );
  2         5  
569 2         3 $sql .= ' )';
570              
571 2         5 return $sql;
572             }
573              
574             sub func_from_name {
575 0     0 0   my ($self, $funcname) = @_;
576              
577 0           my $result;
578 0           while ( ! $result ) {
579 0           my $sth = $self->_execute(q{
580             SELECT id, name FROM func WHERE name = ? LIMIT 1
581             }, [$funcname]);
582 0           my $couner = 0;
583 0           while ( my $row = $sth->fetchrow_hashref() ) {
584 0           $result = $row;
585             }
586 0 0         if ( ! $result ) {
587 0           $self->_execute(q{
588             INSERT INTO func (name) VALUES (?)
589             }, [ $funcname ]);
590             }
591             }
592 0           return $result;
593             }
594              
595             sub func_from_id {
596 0     0 0   my ($self, $funcid) = @_;
597 0           my $result;
598 0           my $sth = $self->_execute(q{
599             SELECT id, name FROM func WHERE id = ? LIMIT 1
600             }, [$funcid]);
601 0           my $couner = 0;
602 0           while ( my $row = $sth->fetchrow_hashref() ) {
603 0           $result = $row;
604             }
605 0           return $result;
606             }
607              
608             1;
609              
610             __END__