File Coverage

blib/lib/Qudo/Driver/DBI.pm
Criterion Covered Total %
statement 29 262 11.0
branch 0 58 0.0
condition 0 15 0.0
subroutine 7 29 24.1
pod 0 17 0.0
total 36 381 9.4


line stmt bran cond sub pod time code
1             package Qudo::Driver::DBI;
2              
3 1     1   397 use strict;
  1         1  
  1         24  
4 1     1   2 use warnings;
  1         1  
  1         30  
5 1     1   1208 use DBI;
  1         11972  
  1         54  
6 1     1   7 use Carp qw/croak/;
  1         1  
  1         44  
7              
8 1     1   370 use Qudo::Driver::DBI::DBD;
  1         2  
  1         1805  
9              
10             sub init_driver {
11 0     0 0 0 my ($class, $master) = @_;
12              
13 0         0 for my $database (@{$master->{databases}}) {
  0         0  
14 0         0 my $connection = bless {
15             database => $database,
16             dbh => '',
17             dbd => '',
18             }, $class;
19 0         0 $connection->_connect();
20              
21 0         0 my $dbd_type = $connection->{dbh}->{Driver}->{Name};
22 0         0 $connection->{dbd} = Qudo::Driver::DBI::DBD->new($dbd_type);
23              
24 0         0 $master->set_connection($database->{dsn}, $connection);
25             }
26             }
27              
28             sub _connect {
29 0     0   0 my $self = shift;
30            
31             $self->{dbh} = DBI->connect(
32             $self->{database}->{dsn},
33             $self->{database}->{username},
34             $self->{database}->{password},
35 0 0       0 { RaiseError => 1, PrintError => 0, AutoCommit => 1, %{ $self->{database}->{connect_options} || {} } }
  0         0  
36             );
37             }
38              
39             sub job_status_list {
40 0     0 0 0 my ($self, $args) = @_;
41              
42 0         0 my $sql = q{
43             SELECT
44             job_status.id,
45             job_status.func_id,
46             job_status.arg,
47             job_status.uniqkey,
48             job_status.status,
49             job_status.job_start_time,
50             job_status.job_end_time
51             FROM
52             job_status
53             };
54              
55 0         0 my @funcs;
56 0 0       0 if( $args->{funcs} ){
57 0 0       0 if( ref($args->{funcs}) eq 'ARRAY' ){
58 0         0 @funcs = @{$args->{funcs}};
  0         0  
59             }
60             else{
61 0         0 push @funcs , $args->{funcs};
62             }
63              
64             $sql .= sprintf( q{
65             INNER JOIN
66             func
67             ON
68             job_status.func_id = func.id
69 0         0 WHERE (func.name IN (%s) )}, join(',',map{'?'} @funcs) );
  0         0  
70             }
71              
72 0         0 $sql .= q{ LIMIT ? OFFSET ? };
73              
74             my $sth = $self->_execute(
75             $sql,
76 0         0 [ @funcs , $args->{limit} ,$args->{offset} ]
77             );
78              
79 0         0 my @job_status_list;
80 0         0 while (my $row = $sth->fetchrow_hashref) {
81 0         0 push @job_status_list, $row;
82             }
83 0         0 return \@job_status_list;
84             }
85              
86             sub job_count {
87 0     0 0 0 my ($self , $funcs) = @_;
88              
89 0         0 my $sql = q{
90             SELECT
91             COUNT(job.id) AS count
92             FROM
93             job, func
94             WHERE
95             job.func_id = func.id
96             };
97 0 0       0 if( $funcs ){
98 0         0 $sql .= q{ AND }. $self->_join_func_name($funcs);
99             }
100              
101 0         0 my $sth = $self->{dbh}->prepare( $sql );
102              
103 0         0 eval{
104 0         0 $sth->execute( @{$funcs} );
  0         0  
105             };
106 0 0       0 if( my $e = $@ ){
107 0         0 croak 'job_count ERROR'.$e;
108             }
109 0         0 my $ret = $sth->fetchrow_hashref();
110 0         0 return $ret->{count};
111             }
112              
113             sub job_list {
114 0     0 0 0 my ($self, $limit, $funcs) = @_;
115              
116 0         0 my $sql = $self->_search_job_sql();
117 0         0 $sql .= q{
118             WHERE
119             job.grabbed_until <= ?
120             AND
121             job.run_after <= ?
122             };
123 0         0 my @bind = $self->get_server_time;
124 0         0 push @bind, $self->get_server_time;
125              
126             # func.name
127 0 0       0 if( $funcs ){
128 0         0 $sql .= q{ AND }. $self->_join_func_name($funcs);
129 0         0 push @bind , @{$funcs};
  0         0  
130             }
131              
132             # limit
133 0         0 $sql .= q{LIMIT ?};
134 0         0 push @bind , $limit;
135              
136 0         0 my $sth = $self->{dbh}->prepare( $sql );
137              
138 0         0 eval{
139 0         0 $sth->execute( @bind );
140             };
141 0 0       0 if( my $e = $@ ){
142 0         0 croak 'job_list ERROR'.$e;
143             }
144              
145 0         0 my $code = $self->_get_job_data( $sth );
146              
147 0         0 my @jobs;
148 0         0 while (1) {
149 0         0 my $row = $code->();
150 0 0       0 last unless $row;
151 0         0 push @jobs, $row;
152             }
153 0         0 return \@jobs;
154             }
155              
156             sub exception_list {
157 0     0 0 0 my ($self, $args) = @_;
158              
159 0         0 my @bind = ();
160 0         0 my $limit = $args->{limit};
161 0         0 my $offset = $args->{offset};
162 0   0     0 my $funcs = $args->{funcs} || '';
163 0         0 my $sql = q{
164             SELECT
165             exception_log.id,
166             exception_log.func_id,
167             exception_log.exception_time,
168             exception_log.message,
169             exception_log.uniqkey,
170             exception_log.arg,
171             exception_log.retried
172             FROM
173             exception_log
174             };
175              
176             # funcs
177 0 0       0 if ($funcs) {
178 0         0 $sql .= q{
179             INNER JOIN
180             func
181             ON
182             exception_log.func_id = func.id
183             WHERE
184             };
185 0         0 $sql .= $self->_join_func_name($funcs);
186 0         0 push @bind , @{$funcs};
  0         0  
187             }
188              
189             # limit
190 0 0       0 if( $limit ){
191 0         0 $sql .= q{ LIMIT ? };
192 0         0 push @bind , $limit;
193             }
194              
195             #offset
196 0 0       0 if( $offset ){
197 0         0 $sql .= q{OFFSET ?};
198 0         0 push @bind , $offset;
199             }
200              
201 0         0 my $sth = $self->{dbh}->prepare( $sql );
202 0         0 eval{
203 0         0 $sth->execute( @bind );
204             };
205 0 0       0 if( my $e = $@ ){
206 0         0 croak 'exception_list ERROR'.$e;
207             }
208 0         0 my @exception_list;
209 0         0 while (my $row = $sth->fetchrow_hashref) {
210 0         0 push @exception_list, $row;
211             }
212 0         0 return \@exception_list;
213             }
214              
215              
216             sub lookup_job {
217 0     0 0 0 my ($self, $job_id) = @_;
218              
219 0         0 my $sql = $self->_search_job_sql();
220              
221 0         0 my @bind;
222             # func.name
223 0 0       0 if( $job_id ){
224 0         0 $sql .= q{ WHERE job.id = ?};
225 0         0 push @bind , $job_id;
226             }
227              
228             # limit
229 0         0 $sql .= q{LIMIT 1};
230              
231 0         0 my $sth = $self->{dbh}->prepare( $sql );
232              
233 0         0 eval{
234 0         0 $sth->execute( @bind );
235             };
236 0 0       0 if( my $e = $@ ){
237 0         0 croak 'lookup_job ERROR'.$e;
238             }
239              
240 0         0 return $self->_get_job_data( $sth );
241             }
242              
243             sub find_job {
244 0     0 0 0 my ($self, $limit, $func_map) = @_;
245              
246 0         0 my $sql = $self->_search_job_sql();
247 0         0 $sql .= q{
248             WHERE
249             job.grabbed_until <= ?
250             AND
251             job.run_after <= ?
252             };
253 0         0 my @bind = $self->get_server_time;
254 0         0 push @bind, $self->get_server_time;
255              
256             # func.name
257 0 0       0 if( $func_map ){
258 0         0 my $keys = [keys %$func_map];
259 0         0 $sql .= q{ AND }. $self->_join_func_name($keys);
260 0         0 push @bind , @{$keys};
  0         0  
261             }
262              
263             # priority
264 0         0 $sql .= q{ ORDER BY job.priority DESC };
265              
266             # limit
267 0         0 $sql .= q{ LIMIT ? };
268 0         0 push @bind , $limit;
269              
270 0         0 my $sth = $self->{dbh}->prepare( $sql );
271              
272 0         0 eval{
273 0         0 $sth->execute( @bind );
274             };
275 0 0       0 if( my $e = $@ ){
276 0         0 croak 'find_job ERROR'.$e;
277             }
278              
279 0         0 return $self->_get_job_data( $sth );
280             }
281              
282             sub _search_job_sql {
283 0     0   0 q{
284             SELECT
285             job.id AS id,
286             job.arg AS arg,
287             job.uniqkey AS uniqkey,
288             job.func_id AS func_id,
289             job.grabbed_until AS grabbed_until,
290             job.retry_cnt AS retry_cnt,
291             job.priority AS priority,
292             func.name AS funcname
293             FROM job
294             INNER JOIN
295             func ON job.func_id = func.id
296             };
297             }
298              
299             sub _get_job_data {
300 0     0   0 my ($self, $sth) = @_;
301             sub{
302 0     0   0 while (my $row = $sth->fetchrow_hashref) {
303             return +{
304             job_id => $row->{id},
305             job_arg => $row->{arg},
306             job_uniqkey => $row->{uniqkey},
307             job_grabbed_until => $row->{grabbed_until},
308             job_retry_cnt => $row->{retry_cnt},
309             job_priority => $row->{priority},
310             func_id => $row->{func_id},
311             func_name => $row->{funcname},
312 0         0 };
313             }
314 0         0 return;
315 0         0 };
316             }
317              
318             sub grab_a_job {
319 0     0 0 0 my ($self, %args) = @_;
320              
321             my $sth = $self->{dbh}->prepare(
322 0         0 q{
323             UPDATE
324             job
325             SET
326             grabbed_until = ?
327             WHERE
328             id = ?
329             AND
330             grabbed_until = ?
331             }
332             );
333              
334 0         0 my $rows;
335 0         0 eval{
336             $rows = $sth->execute(
337             $args{grabbed_until},
338             $args{job_id},
339             $args{old_grabbed_until},
340 0         0 );
341             };
342 0 0       0 if( my $e = $@ ){
343 0         0 croak 'grab_a_job ERROR'.$e;
344 0         0 return;
345             }
346              
347 0         0 return $rows;
348             }
349              
350             sub logging_exception {
351 0     0 0 0 my ($self, $args) = @_;
352              
353             my $sth = $self->{dbh}->prepare(
354 0         0 q{
355             INSERT INTO exception_log
356             ( func_id , message , uniqkey, arg, exception_time, retried)
357             VALUES
358             ( ? , ? , ?, ?, ?, ?)
359             }
360             );
361              
362 0         0 eval{
363             $sth->execute(
364             $args->{func_id} ,
365             $args->{message} ,
366             $args->{uniqkey} ,
367             $args->{arg} ,
368 0         0 time(),
369             0,
370             );
371             };
372 0 0       0 if( my $e = $@ ){
373 0         0 croak 'logging_exception ERROR'.$e;
374             }
375 0         0 return;
376             }
377              
378             sub set_job_status{
379 0     0 0 0 my ($self, $args) = @_;
380              
381 0         0 my @column = keys %{$args};
  0         0  
382 0         0 my $sql = $self->_build_insert_sql(
383             'job_status',
384             \@column
385             );
386              
387 0         0 my @bind = map {$args->{$_}} @column;
  0         0  
388              
389 0         0 $self->_execute(
390             $sql,
391             \@bind
392             );
393              
394 0         0 return;
395             }
396              
397             sub get_server_time {
398 0     0 0 0 my $self = shift;
399              
400 0         0 my $unixtime_sql = $self->{dbd}->sql_for_unixtime;
401 0         0 my $time;
402 0         0 eval {
403 0         0 $time = $self->{dbh}->selectrow_array("SELECT $unixtime_sql");
404             };
405 0 0       0 if ($@) { $time = time }
  0         0  
406 0         0 return $time;
407             }
408              
409             sub enqueue {
410 0     0 0 0 my ($self, $args) = @_;
411              
412 0   0     0 $args->{enqueue_time} ||= time;
413 0   0     0 $args->{grabbed_until} ||= 0;
414 0   0     0 $args->{retry_cnt} ||= 0;
415 0   0     0 $args->{run_after} = time + ($args->{run_after}||0);
416 0   0     0 $args->{priority} ||= 0;
417              
418 0         0 my @column = keys %{$args};
  0         0  
419 0         0 my $sql = $self->_build_insert_sql(
420             'job',
421             \@column
422             );
423              
424 0         0 my $sth_ins = $self->{dbh}->prepare( $sql );
425 0         0 my @bind = map {$args->{$_}} @column;
  0         0  
426 0         0 eval{
427 0         0 $sth_ins->execute( @bind );
428             };
429 0 0       0 if( $@ ){
430 0         0 croak 'enqueue ERROR'.$@;
431             }
432              
433 0         0 my $id = $self->{dbd}->last_insert_id($self->{dbh}, $sth_ins);
434             my $sth_sel = $self->{dbh}->prepare(
435 0         0 q{SELECT * FROM job WHERE id = ?}
436             );
437              
438 0         0 $sth_sel->execute( $id );
439 0         0 my $ret_sel = $sth_sel->fetchrow_hashref();
440 0 0       0 return $ret_sel ? $ret_sel->{id} : undef;
441             }
442              
443             sub reenqueue {
444 0     0 0 0 my ($self, $job_id, $args) = @_;
445              
446             my $sth = $self->{dbh}->prepare(
447 0         0 q{
448             UPDATE
449             job
450             SET
451             enqueue_time = ?,
452             run_after = ?,
453             retry_cnt = ?
454             WHERE
455             id = ?
456             }
457             );
458              
459 0         0 my $row;
460 0         0 eval{
461             $row = $sth->execute(
462             time,
463             (time + ($args->{retry_delay}||0) ),
464             $args->{retry_cnt},
465 0   0     0 $job_id,
466             );
467             };
468 0 0       0 if( my $e = $@ ){
469 0         0 croak 'reenqueue ERROR'.$e;
470 0         0 return;
471             }
472              
473 0         0 return $row;
474             }
475              
476              
477             sub dequeue {
478 0     0 0 0 my ($self, $args) = @_;
479             my $sth = $self->{dbh}->prepare(
480 0         0 q{DELETE FROM job WHERE id = ?}
481             );
482              
483 0         0 my $row;
484 0         0 eval{
485 0         0 $row = $sth->execute( $args->{id} );
486             };
487 0 0       0 if( my $e = $@ ){
488 0         0 croak 'dequeue ERROR'.$e;
489             }
490              
491 0         0 return $row;
492             }
493              
494              
495             sub get_func_id {
496 0     0 0 0 my ($self, $funcname) = @_;
497            
498             my $sth_sel = $self->{dbh}->prepare(
499 0         0 q{SELECT * FROM func WHERE name = ?}
500             );
501              
502 0         0 $sth_sel->execute( $funcname );
503 0         0 my $func_id;
504 0         0 my $ret_hashref = $sth_sel->fetchrow_hashref();
505 0 0       0 if ( $ret_hashref ){
506 0         0 $func_id = $ret_hashref->{id};
507             }
508             else{
509             my $sth_ins = $self->{dbh}->prepare(
510 0         0 q{INSERT INTO func ( name ) VALUES ( ? )}
511             );
512 0         0 eval{
513 0         0 $sth_ins->execute( $funcname );
514             };
515 0 0       0 if( my $e = $@ ){
516 0         0 croak $e;
517             }
518 0         0 $sth_sel->execute( $funcname );
519 0         0 my $ret_hashref = $sth_sel->fetchrow_hashref();
520 0 0       0 if ( $ret_hashref ){
521 0         0 $func_id = $ret_hashref->{id};
522             }
523             }
524              
525 0         0 return $func_id;
526             }
527              
528             sub get_func_name {
529 0     0 0 0 my ($self, $funcid) = @_;
530              
531 0         0 my $sth;
532 0         0 eval {
533             $sth = $self->{dbh}->prepare(
534 0         0 q{SELECT * FROM func WHERE id = ?}
535             );
536 0         0 $sth->execute( $funcid );
537             };
538 0 0       0 if ($@) { croak $@ }
  0         0  
539              
540 0         0 my $ret_hashref = $sth->fetchrow_hashref();
541 0 0       0 return $ret_hashref ? $ret_hashref->{name} : undef;
542             }
543              
544             sub retry_from_exception_log {
545 0     0 0 0 my ($self, $exception_log_id) = @_;
546              
547 0         0 $self->_execute(
548             q{UPDATE exception_log SET retried = 1 WHERE id = ?},
549             [$exception_log_id]
550             );
551             }
552              
553             sub _execute {
554 0     0   0 my ($self, $sql, $bind) = @_;
555              
556 0         0 my $sth;
557 0         0 eval {
558 0         0 $sth = $self->{dbh}->prepare($sql);
559 0         0 $sth->execute(@{$bind});
  0         0  
560             };
561 0 0       0 if ($@) { croak $@ }
  0         0  
562 0         0 $sth;
563             }
564              
565             sub _join_func_name{
566 3     3   23 my ($self , $funcs ) = @_;
567              
568             my $func_name = sprintf(
569             q{ func.name IN (%s) } ,
570 3         5 join(',', map { '?' } @{$funcs} )
  4         11  
  3         11  
571             );
572              
573 3         13 return $func_name;
574             }
575              
576             sub _build_insert_sql{
577 2     2   22 my( $self , $table , $column_ary_ref ) = @_;
578              
579 2         6 my $sql = qq{ INSERT INTO $table ( };
580 2         3 $sql .= join ' , ' , @{$column_ary_ref};
  2         6  
581 2         3 $sql .= ' ) VALUES ( ';
582 2         3 $sql .= join( ' , ', ('?') x @{$column_ary_ref} );
  2         5  
583 2         2 $sql .= ' )';
584              
585 2         5 return $sql;
586             }
587              
588             1;
589              
590