File Coverage

blib/lib/Job/Machine/DB.pm
Criterion Covered Total %
statement 21 189 11.1
branch 0 38 0.0
condition 0 62 0.0
subroutine 7 35 20.0
pod 5 27 18.5
total 33 351 9.4


line stmt bran cond sub pod time code
1             package Job::Machine::DB;
2             $Job::Machine::DB::VERSION = '0.22';
3 1     1   3 use strict;
  1         1  
  1         22  
4 1     1   3 use warnings;
  1         1  
  1         19  
5 1     1   2 use Carp qw/croak confess/;
  1         1  
  1         35  
6 1     1   1201 use DBI;
  1         10786  
  1         51  
7 1     1   503 use Data::Serializer;
  1         1777  
  1         22  
8              
9 1     1   4 use constant QUEUE_PREFIX => 'jm:';
  1         1  
  1         55  
10 1     1   4 use constant RESPONSE_PREFIX => 'jmr:';
  1         1  
  1         1479  
11              
12             sub new {
13 0     0 1   my ($class, %args) = @_;
14 0 0 0       croak "No connect information" unless $args{dbh} or $args{dsn};
15 0 0 0       croak "invalid queue" if ref $args{queue} and ref $args{queue} ne 'ARRAY';
16              
17 0 0         $args{dbh_inherited} = 1 if $args{dbh};
18 0   0       $args{user} ||= undef;
19 0   0       $args{password} ||= undef;
20 0   0       $args{db_attr} ||= undef;
21 0   0       $args{dbh} ||= DBI->connect($args{dsn},$args{user},$args{password},$args{db_attr});
22 0   0       $args{database_schema} ||= 'jobmachine';
23 0           return bless \%args, $class;
24             }
25              
26             sub serializer {
27 0     0 0   my ($self) = @_;
28 0   0       my $args = $self->{serializer_args} || {};
29 0   0       $args->{serializer} ||= $self->{serializer} || 'Sereal';
      0        
30 0   0       return $self->{serialize} ||= Data::Serializer->new(%$args);
31             }
32              
33             sub listen {
34 0     0 0   my ($self, %args) = @_;
35 0   0       my $queue = $args{queue} || return undef;
36              
37 0 0         my $prefix = $args{reply} ? RESPONSE_PREFIX : QUEUE_PREFIX;
38 0 0         for my $q (ref $queue ? @$queue : ($queue)) {
39 0           $self->{dbh}->do(qq{listen "$prefix$q";});
40             }
41             }
42              
43             sub unlisten {
44 0     0 1   my ($self, %args) = @_;
45 0   0       my $queue = $args{queue} || return undef;
46              
47 0 0         my $prefix = $args{reply} ? RESPONSE_PREFIX : QUEUE_PREFIX;
48 0 0         for my $q (ref $queue ? @$queue : ($queue)) {
49 0           $self->{dbh}->do(qq{unlisten "$prefix$q";});
50             }
51             }
52              
53             sub notify {
54 0     0 1   my ($self, %args) = @_;
55 0   0       my $queue = $args{queue} || return undef;
56 0           my $payload = $args{payload};
57 0 0         my $prefix = $args{reply} ? RESPONSE_PREFIX : QUEUE_PREFIX;
58 0           $queue = $prefix . $queue;
59 0           my $sql = qq{SELECT pg_notify(?,?)};
60 0           my $task = $self->select_first(
61             sql => $sql,
62             data => [ $queue, $payload],
63             );
64             }
65              
66             sub get_notification {
67 0     0 1   my ($self,$timeout) = @_;
68 0           my $dbh = $self->dbh;
69 0           my $notifies = $dbh->func('pg_notifies');
70 0           return $notifies;
71             }
72              
73             sub set_listen {
74 0     0 1   my ($self,$timeout) = @_;
75 0           my $dbh = $self->dbh;
76 0           my $notifies = $dbh->func('pg_notifies');
77 0 0         if (!$notifies) {
78 0           my $fd = $dbh->{pg_socket};
79 0           vec(my $rfds='',$fd,1) = 1;
80 0           my $n = select($rfds, undef, undef, $timeout);
81 0           $notifies = $dbh->func('pg_notifies');
82             }
83 0   0       return $notifies || [0,0];
84             }
85              
86             sub fetch_work_task {
87 0     0 0   my ($self,$pid) = @_;
88 0 0         my $queue = ref $self->{queue} ? $self->{queue} : [$self->{queue}];
89 0           $self->{current_table} = 'task';
90 0           my $elems = join(',', ('?') x @$queue);
91 0           my $sql = qq{
92             UPDATE "$self->{database_schema}".$self->{current_table} t
93             SET status=100,
94             modified=default
95             FROM "jobmachine".class cx
96             WHERE t.class_id = cx.class_id
97             AND task_id = (
98             SELECT min(task_id)
99             FROM "$self->{database_schema}".$self->{current_table} t
100             JOIN "jobmachine".class c USING (class_id)
101             WHERE t.status=0
102             AND c.name IN ($elems)
103             AND (t.run_after IS NULL
104             OR t.run_after > now())
105             )
106             AND t.status=0
107             RETURNING *
108             ;
109             };
110 0   0       my $task = $self->select_first(
111             sql => $sql,
112             data => $queue
113             ) || return;
114              
115 0           $self->{task_id} = $task->{task_id};
116 0           $task->{data} = $self->serializer->deserialize(delete $task->{parameters});
117 0           return $task;
118             }
119              
120             sub insert_task {
121 0     0 0   my ($self,$data,$queue) = @_;
122 0           my $class = $self->fetch_class($queue);
123 0           $self->{current_table} = 'task';
124 0           my $frozen = $self->serializer->serialize($data);
125 0           my $sql = qq{
126             INSERT INTO "$self->{database_schema}".$self->{current_table}
127             (class_id,parameters,status)
128             VALUES (?,?,?)
129             RETURNING task_id
130             };
131 0           $self->insert(sql => $sql,data => [$class->{class_id},$frozen,0]);
132             }
133              
134             sub set_task_status {
135 0     0 0   my ($self,$status) = @_;
136 0           my $id = $self->task_id;
137 0           $self->{current_table} = 'task';
138 0           my $sql = qq{
139             UPDATE "$self->{database_schema}".$self->{current_table}
140             SET status=?
141             WHERE task_id=?
142             };
143 0           $self->update(sql => $sql,data => [$status,$id]);
144             }
145              
146             sub fetch_class {
147 0     0 0   my ($self,$queue) = @_;
148 0           $self->{current_table} = 'class';
149 0           my $sql = qq{
150             SELECT *
151             FROM "$self->{database_schema}".$self->{current_table}
152             WHERE name=?
153             };
154 0   0       return $self->select_first(sql => $sql,data => [$queue]) || $self->insert_class($queue);
155             }
156              
157             sub insert_class {
158 0     0 0   my ($self,$queue) = @_;
159 0           my $sql = qq{
160             INSERT INTO "$self->{database_schema}".$self->{current_table}
161             (name)
162             VALUES (?)
163             RETURNING class_id
164             };
165 0           $self->select_first(sql => $sql,data => [$queue]);
166             }
167              
168             sub insert_result {
169 0     0 0   my ($self,$data,$queue) = @_;
170 0           $self->{current_table} = 'result';
171 0           my $frozen = $self->serializer->serialize($data);
172 0           my $sql = qq{
173             INSERT INTO "$self->{database_schema}".$self->{current_table}
174             (task_id,result)
175             VALUES (?,?)
176             RETURNING result_id
177             };
178 0           $self->insert(sql => $sql,data => [$self->{task_id},$frozen]);
179             }
180              
181             sub fetch_result {
182 0     0 0   my ($self,$id) = @_;
183 0           $self->{current_table} = 'result';
184 0           my $sql = qq{
185             SELECT *
186             FROM "$self->{database_schema}".$self->{current_table}
187             WHERE task_id=?
188             ORDER BY result_id DESC
189             };
190 0   0       my $result = $self->select_first(sql => $sql,data => [$id]) || return;
191              
192 0           return $self->serializer->deserialize($result->{result});
193             }
194              
195             sub fetch_results {
196 0     0 0   my ($self,$id) = @_;
197 0           $self->{current_table} = 'result';
198 0           my $sql = qq{
199             SELECT *
200             FROM "$self->{database_schema}".$self->{current_table}
201             WHERE task_id=?
202             ORDER BY result_id DESC
203             };
204 0   0       my $results = $self->select_all(sql => $sql,data => [$id]) || return;
205              
206 0           return [map { $self->serializer->deserialize($_->{result}) } @{ $results } ];
  0            
  0            
207             }
208              
209             # 1. Find started tasks that have passed the time limit, most probably because
210             # of a dead worker. (status 100, modified < now - max_runtime)
211             # 2. Trim status so task can be tried again
212              
213             sub revive_tasks {
214 0     0 0   my ($self,$max) = @_;
215 0           $self->{current_table} = 'task';
216 0           my $status = 100;
217 0           my $sql = qq{
218             UPDATE "$self->{database_schema}".$self->{current_table}
219             SET status=0
220             WHERE status=?
221             AND modified < now() - INTERVAL '$max seconds'
222             };
223 0           my $result = $self->do(sql => $sql,data => [$status]);
224 0           return $result;
225             }
226              
227             # 1. Find tasks that have failed too many times (# of result rows > $self->retries
228             # 2. fail them (Set status 900)
229             # There's a hard limit (100) for how many tasks can be failed at one time for
230             # performance resons
231              
232             sub fail_tasks {
233 0     0 0   my ($self,$retries) = @_;
234 0           $self->{current_table} = 'result';
235 0           my $limit = 100;
236 0           my $sql = qq{
237             SELECT task_id
238             FROM "$self->{database_schema}".$self->{current_table}
239             GROUP BY task_id
240             HAVING count(*)>?
241             LIMIT ?
242             };
243 0   0       my $result = $self->select_all(sql => $sql,data => [$retries,$limit]) || return 0;
244 0 0         return 0 unless @$result;
245              
246 0           my $task_ids = join ',',map {$_->{task_id}} @$result;
  0            
247 0           $self->{current_table} = 'task';
248 0           my $status = 900;
249 0           $sql = qq{
250             UPDATE "$self->{database_schema}".$self->{current_table}
251             SET status=?
252             WHERE task_id IN ($task_ids)
253             };
254 0           $self->do(sql => $sql,data => [$status]);
255 0           return scalar @$result;
256             }
257              
258             # 3. Find tasks that should be removed (remove_task < now)
259             # - delete them
260             # - log
261             sub remove_tasks {
262 0     0 0   my ($self,$after) = @_;
263 0 0         return 0 unless $after;
264              
265 0           $self->{current_table} = 'task';
266 0           my $limit = 100;
267 0           my $sql = qq{
268             DELETE FROM "$self->{database_schema}".$self->{current_table}
269             WHERE modified < now() - INTERVAL '$after days'
270             };
271 0           my $result = $self->do(sql => $sql,data => []);
272 0           return $result;
273             }
274              
275             sub select_first {
276 0     0 0   my ($self, %args) = @_;
277 0   0       my $sth = $self->dbh->prepare($args{sql}) || return 0;
278              
279 0 0         unless($sth->execute(@{$args{data}})) {
  0            
280 0           my @c = caller;
281 0           print STDERR "File: $c[1] line $c[2]\n";
282 0 0         print STDERR $args{sql}."\n" if($args{sql});
283 0           return 0;
284             }
285 0           my $r = $sth->fetchrow_hashref();
286 0           $sth->finish();
287 0           return ( $r );
288             }
289              
290             sub select_all {
291 0     0 0   my ($self, %args) = @_;
292 0   0       my $sth = $self->dbh->prepare($args{sql}) || return 0;
293              
294 0   0       $self->set_bind_type($sth,$args{data} || []);
295 0 0         unless($sth->execute(@{$args{data}})) {
  0            
296 0           my @c = caller;
297 0           print STDERR "File: $c[1] line $c[2]\n";
298 0 0         print STDERR $args{sql}."\n" if($args{sql});
299 0           return 0;
300             }
301 0           my @result;
302 0           while( my $r = $sth->fetchrow_hashref) {
303 0           push(@result,$r);
304             }
305 0           $sth->finish();
306 0           return ( \@result );
307             }
308              
309             sub set_bind_type {
310 0     0 0   my ($self,$sth,$data) = @_;
311 0           for my $i (0..scalar(@$data)-1) {
312 0 0         next unless(ref($data->[$i]));
313              
314 0           $sth->bind_param($i+1, undef, $data->[$i]->[1]);
315 0           $data->[$i] = $data->[$i]->[0];
316             }
317 0           return;
318             }
319              
320             sub do {
321 0     0 0   my ($self, %args) = @_;
322 0   0       my $sth = $self->dbh->prepare($args{sql}) || return 0;
323              
324 0           $sth->execute(@{$args{data}});
  0            
325 0           my $rows = $sth->rows;
326 0           $sth->finish();
327 0           return $rows;
328             }
329              
330             sub insert {
331 0     0 0   my ($self, %args) = @_;
332 0   0       my $sth = $self->dbh->prepare($args{sql}) || return 0;
333              
334 0           $sth->execute(@{$args{data}});
  0            
335 0           my $retval = $sth->fetch()->[0];
336 0           $sth->finish();
337 0           return $retval;
338             }
339              
340             sub update {
341 0     0 0   my $self = shift;
342 0           $self->do(@_);
343 0           return;
344             }
345              
346             sub dbh {
347 0   0 0 0   return $_[0]->{dbh} || confess "No database handle";
348             }
349              
350             sub task_id {
351 0   0 0 0   return $_[0]->{task_id} || confess "No task id";
352             }
353              
354             sub disconnect {
355 0 0   0 0   return $_[0]->{dbh}->disconnect if $_[0]->{dbh};
356             }
357              
358             sub DESTROY {
359 0     0     my $self = shift;
360 0 0         $self->disconnect() unless $self->{dbh_inherited};
361 0           return;
362             }
363              
364             1;
365              
366             __END__