File Coverage

blib/lib/Job/Machine/DB.pm
Criterion Covered Total %
statement 21 237 8.8
branch 0 48 0.0
condition 0 70 0.0
subroutine 7 42 16.6
pod 27 34 79.4
total 55 431 12.7


line stmt bran cond sub pod time code
1             package Job::Machine::DB;
2             $Job::Machine::DB::VERSION = '0.25';
3 1     1   3 use strict;
  1         2  
  1         23  
4 1     1   4 use warnings;
  1         1  
  1         23  
5 1     1   4 use Carp qw/croak confess/;
  1         1  
  1         44  
6 1     1   2720 use DBI;
  1         13745  
  1         57  
7 1     1   2014 use Data::Serializer;
  1         2282  
  1         27  
8              
9 1     1   8 use constant QUEUE_PREFIX => 'jm:';
  1         1  
  1         81  
10 1     1   4 use constant RESPONSE_PREFIX => 'jmr:';
  1         1  
  1         2410  
11              
12              
13             sub new {
14 0     0 1   my ($class, %args) = @_;
15 0 0 0       croak "No connect information" unless $args{dbh} or $args{dsn};
16 0 0 0       croak "invalid queue" if ref $args{queue} and ref $args{queue} ne 'ARRAY';
17              
18 0 0         $args{dbh_inherited} = 1 if $args{dbh};
19 0   0       $args{user} ||= undef;
20 0   0       $args{password} ||= undef;
21 0   0       $args{db_attr} ||= undef;
22 0   0       $args{dbh} ||= DBI->connect($args{dsn},$args{user},$args{password},$args{db_attr});
23 0   0       $args{database_schema} ||= 'jobmachine';
24 0           return bless \%args, $class;
25             }
26              
27              
28             sub serializer {
29 0     0 1   my ($self) = @_;
30 0   0       my $args = $self->{serializer_args} || {};
31 0   0       $args->{serializer} ||= $self->{serializer} || 'Sereal';
      0        
32 0   0       return $self->{serialize} ||= Data::Serializer->new(%$args);
33             }
34              
35              
36             sub listen {
37 0     0 1   my ($self, %args) = @_;
38 0   0       my $queue = $args{queue} || return undef;
39              
40 0 0         my $prefix = $args{reply} ? RESPONSE_PREFIX : QUEUE_PREFIX;
41 0 0         for my $q (ref $queue ? @$queue : ($queue)) {
42 0           $self->{dbh}->do(qq{listen "$prefix$q";});
43             }
44             }
45              
46              
47             sub unlisten {
48 0     0 1   my ($self, %args) = @_;
49 0   0       my $queue = $args{queue} || return undef;
50              
51 0 0         my $prefix = $args{reply} ? RESPONSE_PREFIX : QUEUE_PREFIX;
52 0 0         for my $q (ref $queue ? @$queue : ($queue)) {
53 0           $self->{dbh}->do(qq{unlisten "$prefix$q";});
54             }
55             }
56              
57              
58             sub notify {
59 0     0 1   my ($self, %args) = @_;
60 0   0       my $queue = $args{queue} || return undef;
61 0           my $payload = $args{payload};
62 0 0         my $prefix = $args{reply} ? RESPONSE_PREFIX : QUEUE_PREFIX;
63 0           $queue = $prefix . $queue;
64 0           my $sql = qq{SELECT pg_notify(?,?)};
65 0           my $task = $self->select_first(
66             sql => $sql,
67             data => [ $queue, $payload],
68             );
69             }
70              
71              
72             sub get_notification {
73 0     0 1   my ($self,$timeout) = @_;
74 0           my $dbh = $self->dbh;
75 0           my $notifies = $dbh->func('pg_notifies');
76 0           return $notifies;
77             }
78              
79              
80             sub set_listen {
81 0     0 1   my ($self,$timeout) = @_;
82 0           my $dbh = $self->dbh;
83 0           my $notifies = $dbh->func('pg_notifies');
84 0 0         if (!$notifies) {
85 0           my $fd = $dbh->{pg_socket};
86 0           vec(my $rfds='',$fd,1) = 1;
87 0           my $n = select($rfds, undef, undef, $timeout);
88 0           $notifies = $dbh->func('pg_notifies');
89             }
90 0   0       return $notifies || [0,0];
91             }
92              
93              
94             sub fetch_work_task {
95 0     0 1   my $self = shift;
96 0 0         my $queue = ref $self->{queue} ? $self->{queue} : [$self->{queue}];
97 0           $self->{current_table} = 'task';
98 0           my $elems = join(',', ('?') x @$queue);
99 0           my $sql = qq{
100             UPDATE "$self->{database_schema}".$self->{current_table} t
101             SET status=100,
102             modified=default
103             FROM "jobmachine".class cx
104             WHERE t.class_id = cx.class_id
105             AND task_id = (
106             SELECT min(task_id)
107             FROM "$self->{database_schema}".$self->{current_table} t
108             JOIN "jobmachine".class c USING (class_id)
109             WHERE t.status=0
110             AND c.name IN ($elems)
111             AND (t.run_after IS NULL
112             OR t.run_after > now())
113             )
114             AND t.status=0
115             RETURNING *
116             ;
117             };
118 0   0       my $task = $self->select_first(
119             sql => $sql,
120             data => $queue
121             ) || return;
122              
123 0           $self->{task_id} = $task->{task_id};
124 0           $task->{data} = $self->serializer->deserialize(delete $task->{parameters});
125 0           return $task;
126             }
127              
128              
129             sub insert_task {
130 0     0 1   my ($self,$data,$queue) = @_;
131 0           my $class = $self->fetch_class($queue);
132 0           $self->{current_table} = 'task';
133 0           my $frozen = $self->serializer->serialize($data);
134 0           my $sql = qq{
135             INSERT INTO "$self->{database_schema}".$self->{current_table}
136             (class_id,parameters,status)
137             VALUES (?,?,?)
138             RETURNING task_id
139             };
140 0           $self->insert(sql => $sql,data => [$class->{class_id},$frozen,0]);
141             }
142              
143              
144             sub set_task_status {
145 0     0 1   my ($self,$status) = @_;
146 0           my $id = $self->task_id;
147 0           $self->{current_table} = 'task';
148 0           my $sql = qq{
149             UPDATE "$self->{database_schema}".$self->{current_table}
150             SET status=?
151             WHERE task_id=?
152             };
153 0           $self->update(sql => $sql,data => [$status,$id]);
154             }
155              
156              
157             sub fetch_class {
158 0     0 1   my ($self,$queue) = @_;
159 0           $self->{current_table} = 'class';
160 0           my $sql = qq{
161             SELECT *
162             FROM "$self->{database_schema}".$self->{current_table}
163             WHERE name=?
164             };
165 0   0       return $self->select_first(sql => $sql,data => [$queue]) || $self->insert_class($queue);
166             }
167              
168              
169             sub fetch_task {
170 0     0 1   my ($self,$id) = @_;
171 0           $self->{current_table} = 'task';
172 0           my $sql = qq{
173             SELECT *
174             FROM "$self->{database_schema}".$self->{current_table}
175             JOIN "$self->{database_schema}".class USING (class_id)
176             WHERE task_id=?
177             };
178 0 0         my $task = $self->select_first(sql => $sql,data => [$id]) or return;
179              
180 0           $task->{frozen} = $task->{parameters};
181 0           $task->{parameters} = $self->serializer->deserialize($task->{parameters});
182 0           return $task;
183             }
184              
185              
186             sub insert_class {
187 0     0 1   my ($self,$queue) = @_;
188 0           my $sql = qq{
189             INSERT INTO "$self->{database_schema}".$self->{current_table}
190             (name)
191             VALUES (?)
192             RETURNING class_id
193             };
194 0           $self->select_first(sql => $sql,data => [$queue]);
195             }
196              
197              
198             sub insert_result {
199 0     0 1   my ($self,$data) = @_;
200 0           $self->{current_table} = 'result';
201 0           my @columns = qw/task_id result/;
202 0           my @values = ($self->{task_id});
203 0 0         if (ref $data eq 'HASH') {
204 0           push @columns, 'resulttype';
205 0           my $type = delete $data->{type};
206 0           push @values, $self->serializer->serialize($data), $type;
207             } else {
208 0           push @values, $self->serializer->serialize($data);
209             }
210 0           my $columns = join ', ', @columns;
211 0           my $qs = join(',', ('?') x @columns);
212 0           my $sql = qq{
213             INSERT INTO "$self->{database_schema}".$self->{current_table}
214             ($columns)
215             VALUES ($qs)
216             RETURNING result_id
217             };
218 0           $self->insert(sql => $sql,data => \@values);
219             }
220              
221              
222             sub fetch_result {
223 0     0 1   my ($self,$result_id) = @_;
224 0           $self->{current_table} = 'result';
225 0           my $sql = qq{
226             SELECT *
227             FROM "$self->{database_schema}".$self->{current_table}
228             WHERE result_id=?
229             };
230 0   0       my $result = $self->select_first(sql => $sql,data => [$result_id]) || return;
231              
232 0           my $r = $self->serializer->deserialize($result->{result});
233 0           $result->{result} = $r;
234 0           return $result;
235             }
236              
237              
238             sub fetch_first_result {
239 0     0 1   my ($self,$task_id) = @_;
240 0           $self->{current_table} = 'result';
241 0           my $sql = qq{
242             SELECT *
243             FROM "$self->{database_schema}".$self->{current_table}
244             WHERE task_id=?
245             ORDER BY result_id DESC
246             };
247 0   0       my $result = $self->select_first(sql => $sql,data => [$task_id]) || return;
248              
249 0           return $self->serializer->deserialize($result->{result});
250             }
251              
252              
253             sub fetch_results {
254 0     0 1   my ($self,$id) = @_;
255 0           $self->{current_table} = 'result';
256 0           my $sql = qq{
257             SELECT *
258             FROM "$self->{database_schema}".$self->{current_table}
259             WHERE task_id=?
260             ORDER BY result_id DESC
261             };
262 0   0       my $results = $self->select_all(sql => $sql,data => [$id]) || return;
263              
264 0           return [map { {id => $_->{result_id}, type => $_->{resulttype}, result => $self->serializer->deserialize($_->{result}) } } @{ $results } ];
  0            
  0            
265             }
266              
267              
268             sub get_statuses {
269 0     0 1   my ($self) = @_;
270 0           $self->{current_table} = 'task';
271 0           my $sql = qq{
272             SELECT status
273             FROM "$self->{database_schema}".$self->{current_table}
274             GROUP BY status
275             };
276 0   0       my $stats = $self->select_all(sql => $sql) || return;
277 0           return $stats;
278             }
279              
280              
281             sub get_classes {
282 0     0 1   my ($self) = @_;
283 0           $self->{current_table} = 'class';
284 0           my $sql = qq{
285             SELECT *
286             FROM "$self->{database_schema}".$self->{current_table}
287             };
288 0   0       my $stats = $self->select_all(sql => $sql) || return;
289 0           return $stats;
290             }
291              
292              
293             sub get_tasks {
294 0     0 1   my ($self,%args) = @_;
295 0           $self->{current_table} = 'task';
296 0           my ($where_clause, @where_args) = $self->where_clause($args{where});
297 0           my $order_by = $self->order_by($args{order_by});
298 0           my $sql = qq{
299             SELECT *
300             FROM "$self->{database_schema}".$self->{current_table} t
301             JOIN "$self->{database_schema}".class c USING (class_id)
302             $where_clause
303             $order_by
304             };
305 0   0       my $tasks = $self->select_all(sql => $sql,data => \@where_args) || return;
306 0           return $tasks;
307             }
308              
309              
310             sub revive_tasks {
311 0     0 1   my ($self,$max) = @_;
312 0           $self->{current_table} = 'task';
313 0           my $status = 100;
314 0           my $sql = qq{
315             UPDATE "$self->{database_schema}".$self->{current_table}
316             SET status=0
317             WHERE status=?
318             AND modified < now() - INTERVAL '$max seconds'
319             };
320 0           my $result = $self->do(sql => $sql,data => [$status]);
321 0           return $result;
322             }
323              
324              
325             sub fail_tasks {
326 0     0 1   my ($self,$retries) = @_;
327 0           $self->{current_table} = 'result';
328 0           my $limit = 100;
329 0           my $sql = qq{
330             SELECT task_id
331             FROM "$self->{database_schema}".$self->{current_table}
332             GROUP BY task_id
333             HAVING count(*)>?
334             LIMIT ?
335             };
336 0   0       my $result = $self->select_all(sql => $sql,data => [$retries,$limit]) || return 0;
337 0 0         return 0 unless @$result;
338              
339 0           my $task_ids = join ',',map {$_->{task_id}} @$result;
  0            
340 0           $self->{current_table} = 'task';
341 0           my $status = 900;
342 0           $sql = qq{
343             UPDATE "$self->{database_schema}".$self->{current_table}
344             SET status=?
345             WHERE task_id IN ($task_ids)
346             };
347 0           $self->do(sql => $sql,data => [$status]);
348 0           return scalar @$result;
349             }
350              
351              
352             sub remove_tasks {
353 0     0 1   my ($self,$after) = @_;
354 0 0         return 0 unless $after;
355              
356 0           $self->{current_table} = 'task';
357 0           my $limit = 100;
358 0           my $sql = qq{
359             DELETE FROM "$self->{database_schema}".$self->{current_table}
360             WHERE modified < now() - INTERVAL '$after days'
361             };
362 0           my $result = $self->do(sql => $sql,data => []);
363 0           return $result;
364             }
365              
366              
367             sub select_first {
368 0     0 1   my ($self, %args) = @_;
369 0   0       my $sth = $self->dbh->prepare($args{sql}) || return 0;
370              
371 0 0         unless($sth->execute(@{$args{data}})) {
  0            
372 0           my @c = caller;
373 0           print STDERR "File: $c[1] line $c[2]\n";
374 0 0         print STDERR $args{sql}."\n" if($args{sql});
375 0           return 0;
376             }
377 0           my $r = $sth->fetchrow_hashref();
378 0           $sth->finish();
379 0           return ( $r );
380             }
381              
382              
383             sub select_all {
384 0     0 1   my ($self, %args) = @_;
385 0   0       my $sth = $self->dbh->prepare($args{sql}) || return 0;
386 0   0       $self->set_bind_type($sth,$args{data} || []);
387 0 0         unless($sth->execute(@{$args{data}})) {
  0            
388 0           my @c = caller;
389 0           print STDERR "File: $c[1] line $c[2]\n";
390 0 0         print STDERR $args{sql}."\n" if($args{sql});
391 0           return 0;
392             }
393 0           my @result;
394 0           while( my $r = $sth->fetchrow_hashref) {
395 0           push(@result,$r);
396             }
397 0           $sth->finish();
398 0           return ( \@result );
399             }
400              
401             sub set_bind_type {
402 0     0 0   my ($self,$sth,$data) = @_;
403 0           for my $i (0..scalar(@$data)-1) {
404 0 0         next unless(ref($data->[$i]));
405              
406 0           $sth->bind_param($i+1, undef, $data->[$i]->[1]);
407 0           $data->[$i] = $data->[$i]->[0];
408             }
409 0           return;
410             }
411              
412             sub do {
413 0     0 0   my ($self, %args) = @_;
414 0   0       my $sth = $self->dbh->prepare($args{sql}) || return 0;
415              
416 0           $sth->execute(@{$args{data}});
  0            
417 0           my $rows = $sth->rows;
418 0           $sth->finish();
419 0           return $rows;
420             }
421              
422             sub insert {
423 0     0 0   my ($self, %args) = @_;
424 0   0       my $sth = $self->dbh->prepare($args{sql}) || return 0;
425              
426 0           $sth->execute(@{$args{data}});
  0            
427 0           my $retval = $sth->fetch()->[0];
428 0           $sth->finish();
429 0           return $retval;
430             }
431              
432             sub update {
433 0     0 0   my $self = shift;
434 0           $self->do(@_);
435 0           return;
436             }
437              
438             sub dbh {
439 0   0 0 0   return $_[0]->{dbh} || confess "No database handle";
440             }
441              
442             sub task_id {
443 0   0 0 0   return $_[0]->{task_id} || confess "No task id";
444             }
445              
446             sub disconnect {
447 0 0   0 0   return $_[0]->{dbh}->disconnect if $_[0]->{dbh};
448             }
449              
450             sub DESTROY {
451 0     0     my $self = shift;
452 0 0         $self->disconnect() unless $self->{dbh_inherited};
453 0           return;
454             }
455              
456              
457             sub where_clause {
458 0     0 1   my ($self, $where) = @_;
459 0           my $where_clause = join(' AND ', ("$_ = ?") x keys %$where);
460 0 0         $where_clause = "WHERE $where_clause" if $where_clause;
461 0           return $where_clause, values %$where;
462             }
463              
464              
465             sub order_by {
466 0     0 1   my ($self, $order) = @_;
467 0 0         return unless ref $order eq 'HASH';
468              
469 0           my $order_by = join(',', ("$_") x keys %$order);
470 0 0         $order_by = "ORDER BY $order_by" if $order_by;
471 0           return $order_by;
472             }
473              
474             1;
475              
476             __END__