File Coverage

blib/lib/Forks/Queue/SQLite.pm
Criterion Covered Total %
statement 456 602 75.7
branch 171 272 62.8
condition 89 158 56.3
subroutine 51 55 92.7
pod 15 18 83.3
total 782 1105 70.7


line stmt bran cond sub pod time code
1             package Forks::Queue::SQLite;
2 61     61   2136175 use strict;
  61         157  
  61         1976  
3 61     61   359 use warnings;
  61         119  
  61         1673  
4 61     61   322 use Carp;
  61         126  
  61         3008  
5 61     61   12061 use JSON;
  61         154300  
  61         416  
6 61     61   7204 use DBI;
  61         134  
  61         2125  
7 61     61   409 use DBD::SQLite;
  61         155  
  61         2051  
8 61     61   326 use Time::HiRes 'time';
  61         121  
  61         425  
9 61     61   9238 use base 'Forks::Queue';
  61         151  
  61         13257  
10 61     61   1113 use 5.010; # implementation contains // //= operators
  61         229  
11              
12             our $VERSION = '0.14';
13             our ($DEBUG,$XDEBUG);
14             *DEBUG = \$Forks::Queue::DEBUG;
15             *XDEBUG = \$Forks::Queue::XDEBUG;
16              
17             $SIG{IO} = sub { } if $Forks::Queue::NOTIFY_OK;
18              
19             our $jsonizer = JSON->new->allow_nonref(1)->ascii(1);
20              
21             sub new {
22 38     38 1 3191 my $class = shift;
23 38         381 my %opts = (%Forks::Queue::OPTS, @_);
24              
25 38 50 66     428 if ($opts{join} && !$opts{db_file}) {
26 0         0 croak "Forks::Queue::SQLite: db_file opt required with join";
27             }
28 38 50 66     339 if ($opts{file} && !$opts{db_file}) {
29 0         0 carp "file => passed to Forks::Queue::SQLite constructor! ",
30             "You probably meant db_file => ... !";
31             }
32 38   66     632 $opts{db_file} //= _impute_file(); # $opts{file} = $opts{db_file};
33 38   50     300 $opts{limit} //= -1;
34 38   50     225 $opts{on_limit} //= 'fail';
35 38   50     394 $opts{style} //= 'fifo';
36 38         175 my $list = delete $opts{list};
37              
38 38 100 100     1300 if (!$opts{join} && -f $opts{db_file}) {
39 2         1002 carp "Forks::Queue: sqlite db file $opts{db_file} already exists!";
40             }
41              
42 38   66     298 my $exists = $opts{join} && -f $opts{db_file};
43 38         337 $opts{_pid} = [ $$, TID() ];
44             # process id is tied to database handle. If process id doesn't match
45             # $self->{_pid}, we must open a new process id.
46              
47 38         442 my $self = bless { %opts }, $class;
48              
49 38 100       209 if (!$exists) {
50             my $dbh = DBI->connect("dbi:SQLite:dbname=" . $opts{db_file},
51 36         1051 "", "");
52 36         84080 $self->{_dbh} = $opts{_dbh} = $dbh;
53 36 100       138 if (!eval { $self->_init }) {
  36         194  
54 2         338 carp __PACKAGE__, ": db initialization failed";
55 2         28 return;
56             }
57             } else {
58 2         64 $self->_dbh;
59             }
60 36 100       238 if (defined($list)) {
61 5 50       35 if (ref($list) eq 'ARRAY') {
62 5         47 $self->push( @$list );
63             } else {
64 0         0 carp "Forks::Queue::new: 'list' option must be an array ref";
65             }
66             }
67              
68 36         553 return $self;
69             }
70              
71             # wrapper for database operations I expect to succeed, but may fail with
72             # intermittent synchronization issues ("attempt to write to a readonly
73             # database...") on perl v5.10 and v5.12. Pausing and retrying the operation
74             # generally fixes these issues.
75             sub _try {
76 24751     24751   54120 my ($count, $code) = @_;
77 24751 50       58602 $count = 1 if $] >= 5.014;
78 24751         40638 my $z = $code->();
79 24751         104026 my ($f0,$f1) = (1,1);
80 24751         62104 while (!$z) {
81 0 0       0 last if --$count <= 0;
82 0         0 ($f0,$f1)=($f1,$f0+$f1);
83 0         0 my (undef,undef,$lcaller) = caller(0);
84 0 0       0 $DEBUG && print STDERR "retry after ${f0}s: $lcaller\a\n";
85 0         0 sleep $f0;
86 0         0 $z = $code->();
87             }
88 24751         58582 return $z;
89             }
90              
91             sub _init {
92 36     36   110 my $self = shift;
93 36         104 my $dbh = $self->{_dbh};
94              
95 36         603 my $z1 = $dbh->do("CREATE TABLE the_queue (
96             timestamp decimal(27,15), batchid mediumint,
97             item text)");
98 36 100       411813 if (!$z1) {
99 2         404 carp __PACKAGE__, ": error creating init table";
100 2         30 return;
101             }
102              
103 34         551 my $z2 = $dbh->do("CREATE TABLE pids (pid mediumint,tid mediumint)");
104 34 50       387724 if (!$z2) {
105 0         0 carp __PACKAGE__, ": error creating init table";
106 0         0 return;
107             }
108              
109 34         620 my $sth = $dbh->prepare("INSERT INTO pids VALUES (?,?)");
110 34         3913 my $z3 = $sth->execute(@{$self->{_pid}});
  34         367617  
111 34 50       430 if (!$z3) {
112 0         0 carp __PACKAGE__, ": error adding process id to tracker";
113 0         0 return;
114             }
115              
116 34         556 my $z4 = $dbh->do("CREATE TABLE status(key text,value text)");
117 34 50       387277 if (!$z4) {
118 0         0 carp __PACKAGE__, ": error creating init table";
119 0         0 return;
120             }
121              
122 34         650 $self->_status("db_file", $self->{db_file});
123 34         196 $self->_status("owner", "@{$self->{_pid}}");
  34         750  
124 34         366 $self->_status("style", $self->{style});
125 34         373 $self->_status("limit", $self->{limit});
126 34         332 $self->_status("on_limit", $self->{on_limit});
127 34         661 return 1;
128             }
129              
130 28724 50   28724 0 71660 sub TID { $INC{'threads.pm'} ? threads->tid : 0 }
131              
132             sub _dbh {
133 28525     28525   39260 my $self = shift;
134 28525         50219 my $tid = TID();
135 28525 100 100     175915 if ($self->{_dbh} && $$ == $self->{_pid}[0] && $tid == $self->{_pid}[1]) {
      66        
136 28511         160021 return $self->{_dbh};
137             }
138              
139 14         574 $self->{_pid} = [$$,$tid];
140             $self->{_dbh} =
141 14         679 DBI->connect("dbi:SQLite:dbname=".$self->{db_file},"","");
142 14         20254 $self->{_dbh}{AutoCommit} = 1;
143 14 100       263 if (!$self->{_DESTROY}) {
144 12         578 $self->{_dbh}->begin_work;
145 12         702 $self->{_dbh}->do("DELETE FROM pids WHERE pid=$$ AND tid=$tid");
146 12         734590 $self->{_dbh}->do("INSERT INTO pids VALUES ($$,$tid)");
147 12         135032 $self->{_dbh}->commit;
148 12         539 $self->{style} = $self->_status("style");
149 12         165 $self->{limit} = $self->_status("limit");
150 12         145 $self->{on_limit} = $self->_status("on_limit");
151             }
152 14         181 return $self->{_dbh};
153             }
154              
155             sub DESTROY {
156 34     34   8066271 my $self = shift;
157 34         273 $self->{_DESTROY}++;
158 34   33     451 my $_DEBUG = $self->{debug} // $DEBUG;
159 34         200 my $dbh = $self->_dbh;
160 34 50       297 my $tid = $self->{_pid} ? $self->{_pid}[1] : TID();
161 34         141 my $t = [[-1]];
162 34   33     365 my $pid_rm = $dbh && eval {
163             $dbh->{PrintWarn} = # suppress "attempt to write ..."
164             $dbh->{PrintError} = 0; # warnings, particularly on 5.010, 5.012
165             $dbh->begin_work;
166              
167             my $z1 = _try(3, sub {
168 34     34   581 $dbh->do("DELETE FROM pids WHERE pid=$$ AND tid=$tid") } );
169              
170             if ($z1) {
171             my $sth = $dbh->prepare("SELECT COUNT(*) FROM pids");
172             my $z2 = $sth->execute;
173             $t = $sth->fetchall_arrayref;
174             } else {
175             $_DEBUG && print STDERR "$$ DESTROY: DELETE FROM pids failed\n";
176             $t = [[-2]];
177             }
178             $dbh->commit;
179             $_DEBUG and print STDERR "$$ DESTROY npids=$t->[0][0]\n";
180             1;
181             };
182 34 50       268 $dbh && eval { $dbh->disconnect };
  34         3007  
183 34 100 33     3521 if ($t && $t->[0] && $t->[0][0] == 0) {
      66        
184 22 50       119 $_DEBUG and print STDERR "$$ Unlinking files from here\n";
185 22 100       590 if (!$self->{persist}) {
186 20         20003145 sleep 1;
187 20         6805 unlink $self->{db_file};
188             }
189             } else {
190             }
191             }
192              
193             sub _status {
194             # if transactions are desired, they must be provided by the caller
195 22788     22788   34645 my $self = shift;
196 22788         45801 my $dbh = $self->_dbh;
197 22788 0 33     47914 return if !$dbh && $self->{_DESTROY};
198 22788 100       43349 if (@_ == 1) {
    50          
199 22590         108271 my $sth = $dbh->prepare("SELECT value FROM status WHERE key=?");
200 22590 0 33     1346471 if (!$sth && $self->{_DESTROY}) {
201 0         0 warn "prepare failed in global destruction: $$";
202 0         0 return;
203             }
204              
205 22590         39535 my $key = $_[0];
206 22590     22590   112448 my $z = _try( 3, sub { $sth->execute($key) } );
  22590         1272040  
207              
208 22590 50       81970 if (!$z) {
209 0         0 carp __PACKAGE__, ": lookup on status key '$_[0]' failed";
210 0         0 return;
211             }
212 22590         110700 my $t = $sth->fetchall_arrayref;
213 22590 100       55642 if (@$t == 0) {
214 22519         324875 return; # no value
215             }
216 71         1655 return $t->[0][0];
217             } elsif (@_ == 2) {
218 198         1457 my ($key,$value) = @_;
219 198         1712 my $sth1 = $dbh->prepare("DELETE FROM status WHERE key=?");
220 198         19493 my $sth2 = $dbh->prepare("INSERT INTO status VALUES(?,?)");
221              
222 198     198   13767 my $z1 = _try( 3, sub { $sth1->execute($key) } );
  198         23013  
223 198   33 198   2734 my $z2 = $z1 && _try( 5, sub { $sth2->execute($key,$value) } );
  198         1869634  
224              
225 198   33     7216 return $z1 && $z2;
226             } else {
227 0         0 croak "Forks::Queue::SQLite: wrong number of args to _status call";
228             }
229 0         0 return;
230             }
231              
232             sub end {
233 8     8 1 10472549 my $self = shift;
234 8         90 my $dbh = $self->_dbh;
235              
236 8         43 my $end = $self->_end;
237 8 50       49 if ($end) {
238 0         0 carp "Forks::Queue: end() called from $$, ",
239             "previously called from $end";
240             }
241              
242 8 50       32 if (!$end) {
243 8         66 $dbh->begin_work;
244 8         190 $self->_status("end",$$);
245 8         104927 $dbh->commit;
246             }
247 8         146 $self->_notify;
248 8         38 return;
249             }
250              
251             sub _end {
252 22550     22550   42621 my $self = shift;
253 22550   100     80311 return $self->{_end} ||= $self->_status("end");
254             # XXX - can end condition be cleared? Not yet, but when it can,
255             # this code will have to change
256             }
257              
258              
259             # MagicLimit: a tie class to allow $q->limit to work as an lvalue
260              
261             sub Forks::Queue::SQLite::MagicLimit::TIESCALAR {
262 4     4   59 my ($pkg,$obj) = @_;
263 4         46 return bless \$obj,$pkg;
264             }
265              
266             sub Forks::Queue::SQLite::MagicLimit::FETCH {
267 82 50   82   442 $XDEBUG && print STDERR "MagicLimit::FETCH => ",${$_[0]}->{limit},"\n";
  0         0  
268 82         140 return ${$_[0]}->{limit};
  82         742  
269             }
270              
271             sub Forks::Queue::SQLite::MagicLimit::STORE {
272 4     4   18 my ($tie,$val) = @_;
273 4 50       37 $XDEBUG && print STDERR "MagicLimit::STORE => $val\n";
274 4         16 my $queue = $$tie;
275 4         35 my $oldval = $queue->{limit};
276 4         14 $queue->{limit} = $val;
277              
278 4         15 my $dbh = $queue->_dbh;
279 4         45 $dbh->begin_work;
280 4         73 $queue->_status("limit",$val);
281 4         39472 $dbh->commit;
282 4         56 return $oldval;
283             }
284              
285             sub limit :lvalue {
286 36     36 1 1000502 my $self = shift;
287 36 100       412 if (!$self->{_limit_magic}) {
288 4         110 tie $self->{_limit_magic}, 'Forks::Queue::SQLite::MagicLimit', $self;
289 4 50       24 $XDEBUG && print STDERR "tied \$self->\{_limit_magic\}\n";
290             }
291 36 100       149 if (@_) {
292 10         53 $self->_dbh->begin_work;
293 10 50       179 $XDEBUG && print STDERR "setting _limit_magic to $_[0]\n";
294 10         57 $self->_status("limit", shift);
295 10 100       45 if (@_) {
296 6 50       27 $XDEBUG && print STDERR "setting on_limit to $_[0]\n";
297 6         75 $self->_status("on_limit", $self->{on_limit} = $_[0]);
298             }
299 10         39 $self->_dbh->commit;
300             } else {
301 26         182 $self->{limit} = $self->_status("limit");
302 26 50       98 $XDEBUG && print STDERR "updating {limit} to $self->{limit}\n";
303             }
304 36         233 return $self->{_limit_magic};
305             }
306              
307             sub status {
308 164     164 1 20008260 my $self = shift;
309 164         700 my $dbh = $self->_dbh;
310 164         478 my $status = {};
311 164         1685 my $sth = $dbh->prepare("SELECT key,value FROM status");
312 164         101695 my $z = $sth->execute;
313 164         5140 my $tt = $sth->fetchall_arrayref;
314 164         806 foreach my $t (@$tt) {
315 833         2806 $status->{$t->[0]} = $t->[1];
316             }
317 164         924 $status->{avail} = $self->_avail; # update {count}, {avail}
318 164   100     1290 $status->{end} //= 0;
319 164         2065 return $status;
320             }
321              
322             sub _avail {
323             # if transactions are needed, set them up in the caller
324 3747     3747   8383 my ($self,$dbh) = @_;
325 3747   33     12768 $dbh ||= $self->_dbh;
326 3747 50       7131 return unless $dbh;
327 3747         18350 my $sth = $dbh->prepare("SELECT COUNT(*) FROM the_queue");
328 3747 50       195394 return unless $sth;
329 3747         125194 my $z = $sth->execute;
330 3747         47413 my $tt = $sth->fetchall_arrayref;
331 3747         51952 return $self->{avail} = $tt->[0][0];
332             }
333              
334             sub _maintain {
335 0     0   0 my ($self) = @_;
336 0         0 return;
337             }
338              
339             sub push {
340 140     140 1 1204 my ($self,@items) = @_;
341 140         1260 $self->_push(+1,@items);
342             }
343              
344             sub enqueue {
345 2     2 1 1596 my ($self,@items) = @_;
346 2         5 my $tfactor = +1;
347 2         6 my (@deferred_items,$failed_items);
348 2         4 my $pushed = 0;
349 2   33     34 my $_DEBUG = $self->{debug} // $DEBUG;
350              
351 2 50       29 if ($self->_end) {
352             carp "Forks::Queue: put call from process $$ ",
353 0         0 "after end call from process " . $self->{_end};
354 0         0 return 0;
355             }
356              
357 2         9 my $limit = $self->{limit};
358 2 50       10 $limit = 9E9 if $self->{limit} <= 0;
359 2         9 my $dbh = $self->_dbh;
360              
361 2         53 $dbh->begin_work;
362 2         65 my $stamp = Time::HiRes::time;
363 2         27 my $id = $self->_batch_id($stamp,$dbh);
364             # For Thread::queue compatibility, enqueue puts all items on
365             # the queue without blocking if there is even one free space,
366 2 50 33     40 if (@items && $self->_avail < $limit) {
367 2         20 foreach my $item (@items) {
368 8         38 $self->_add($item, $stamp, $id++);
369 8         22 $pushed++;
370             }
371 2         10 @items = ();
372             }
373 2         16138 $dbh->commit;
374 2 50       26 if (@items > 0) {
375 0         0 @deferred_items = @items;
376 0         0 $failed_items = @deferred_items;
377             }
378 2 50       23 $self->_notify if $pushed;
379              
380 2 50       12 if ($failed_items) {
381 0 0       0 if ($self->{on_limit} eq 'fail') {
382 0         0 carp "Forks::Queue: queue buffer is full ",
383             "and $failed_items items were not added";
384             } else {
385 0 0       0 $_DEBUG && print STDERR "$$ $failed_items on enqueue. ",
386             "Waiting for capacity\n";
387 0         0 $self->_wait_for_capacity;
388 0 0       0 $_DEBUG && print STDERR "$$ got some capacity\n";
389 0         0 $pushed += $self->enqueue(@deferred_items);
390             }
391             }
392 2         14 return $pushed;
393             }
394              
395             sub unshift {
396 0     0 1 0 my ($self,@items) = @_;
397 0         0 $self->_push(-1,@items);
398             }
399              
400             sub _add {
401             # do not use transactions here!
402             # if they are needed, call begin_work/commit from the caller
403 1308     1308   2857 my ($self,$item,$timestamp,$id) = @_;
404 1308         7019 my $jitem = $jsonizer->encode($item);
405 1308         2925 my $dbh = $self->_dbh;
406 1308         4815 my $sth = $dbh->prepare("INSERT INTO the_queue VALUES(?,?,?)");
407 1308     1308   59505 my $z = _try(3, sub { $sth->execute($timestamp, $id, $jitem) } );
  1308         42146  
408 1308         13390 return $z;
409             }
410              
411             sub _push {
412 148     148   864 my ($self,$tfactor,@items) = @_;
413              
414 148         446 my (@deferred_items,$failed_items);
415 148         417 my $pushed = 0;
416 148   33     1387 my $_DEBUG = $self->{debug} // $DEBUG;
417              
418 148 100       1889 if ($self->_end) {
419             carp "Forks::Queue: put call from process $$ ",
420 2         458 "after end call from process " . $self->{_end};
421 2         197 return 0;
422             }
423              
424 146         562 my $limit = $self->{limit};
425 146 100       534 $limit = 9E9 if $self->{limit} <= 0;
426              
427 146         431 my $dbh = $self->_dbh;
428            
429              
430 146         1745 $dbh->begin_work;
431 146         3466 my $stamp = Time::HiRes::time;
432 146         732 my $id = $self->_batch_id($stamp,$dbh);
433 146   100     1454 while (@items && $self->_avail < $limit) {
434 1276         2790 my $item = shift @items;
435 1276         3788 $self->_add($item, $stamp, $id++);
436 1276         4771 $pushed++;
437             }
438 146         1522370 $dbh->commit;
439 146 100       1509 if (@items > 0) {
440 21         87 @deferred_items = @items;
441 21         54 $failed_items = @deferred_items;
442             }
443 146 100       1440 $self->_notify if $pushed;
444              
445 146 100       648 if ($failed_items) {
446 21 100       88 if ($self->{on_limit} eq 'fail') {
447 13         4164 carp "Forks::Queue: queue buffer is full ",
448             "and $failed_items items were not added";
449             } else {
450 8 50       166 $_DEBUG && print STDERR "$$ $failed_items on put. ",
451             "Waiting for capacity\n";
452 8         126 $self->_wait_for_capacity;
453 8 50       60 $_DEBUG && print STDERR "$$ got some capacity\n";
454 8         179 $pushed += $self->_push($tfactor,@deferred_items);
455             }
456             }
457 146         3120 return $pushed;
458             }
459              
460             sub _wait_for_item {
461 7     7   17 my $self = shift;
462 7         17 my $ready = 0;
463 7         16 do {
464 2222   100     9224 $ready = $self->_avail || $self->_end || $self->_expired;
465 2222 100 50     14347939 sleep($Forks::Queue::SLEEP_INTERVAL || 1) if !$ready;
466             } while !$ready;
467 7         27 return $self->{avail};
468             }
469              
470             sub _wait_for_capacity {
471 8     8   66 my $self = shift;
472 8 50       66 if ($self->{limit} <= 0) {
473 0         0 return 9E9;
474             }
475 8         45 my $ready = 0;
476 8 50       33 my $count = @_ ? shift : 1;
477 8         51 while (!$ready) {
478 16 100       230 last if $self->_avail + $count <= $self->{limit};
479 8 50       35 last if $self->_end;
480 8   50     16001230 sleep($Forks::Queue::SLEEP_INTERVAL || 1);
481             }
482 8         127 return $self->{avail} + $count <= $self->{limit};
483             }
484              
485             sub _batch_id {
486 148     148   488 my ($self,$stamp,$dbh) = @_;
487 148   33     459 $dbh ||= $self->_dbh;
488 148         943 my $sth = $dbh->prepare("SELECT MAX(batchid) FROM the_queue WHERE timestamp=?");
489 148         362554 my $z = $sth->execute($stamp);
490 148         2175 my $tt = $sth->fetchall_arrayref;
491 148 50       720 if (@$tt == 0) {
492 0         0 return 0;
493             } else {
494 148         2185 return $tt->[0][0];
495             }
496             }
497              
498             sub dequeue {
499 8     8 1 3328 my $self = shift;
500 8 50       54 Forks::Queue::_validate_input($_[0], 'count', 1) if @_;
501 3   50     13 my $count = $_[0] || 1;
502 3 50 33     13 if ($self->limit > 0 && $count > $self->limit) {
503 0         0 croak "dequeue: exceeds queue size limit";
504             }
505 3 50       17 if ($self->{style} ne 'lifo') {
506 3 50       23 return @_ ? $self->_retrieve(-1,1,2,0,$_[0])
507             : $self->_retrieve(-1,1,2,0);
508             } else {
509 0 0       0 return @_ ? $self->_retrieve(+1,1,2,0,$_[0])
510             : $self->_retrieve(+1,1,2,0);
511             }
512             }
513              
514             sub shift :method {
515 37     37 1 1000344 my $self = shift;
516             # purge, block
517 37 100       493 return @_ ? $self->_retrieve(-1,1,1,0,$_[0]) : $self->_retrieve(-1,1,1,0);
518             }
519              
520             sub pop {
521 9     9 1 2482 my $self = shift;
522 9 100       33 Forks::Queue::_validate_input($_[0], 'index', 1) if @_;
523             # purge, block
524 9   100     81 my @popped = $self->_retrieve(+1,1,1,0,$_[0] // 1);
525 9 100       80 return @_ ? reverse(@popped) : $popped[0];
526             }
527              
528             sub shift_nb {
529 1     1 1 6 my $self = shift;
530             # purge, no block
531 1 50       5 return @_ ? $self->_retrieve(-1,1,0,0,$_[0]) : $self->_retrieve(-1,1,0,0);
532             }
533              
534             sub pop_nb {
535 2     2 1 16 my $self = shift;
536             # purge, no block
537 2 50       32 my @popped = @_
538             ? $self->_retrieve(+1,1,0,0,$_[0]) : $self->_retrieve(+1,1,0,0);
539 2 50       13 return @_ ? @popped : $popped[0];
540 0         0 return @popped;
541             }
542              
543             sub extract {
544 26     26 1 12171 my $self = shift;
545 26 100       145 Forks::Queue::_validate_input( $_[0], 'index' ) if @_;
546 23   100     77 my $index = shift || 0;
547 23 100       77 Forks::Queue::_validate_input( $_[0], 'count', 1) if @_;
548 18   100     59 my $count = $_[0] // 1;
549 18         43 my $reverse = 0;
550              
551 18         26 my $tfactor = -1;
552 18 100       67 if ($self->{style} eq 'lifo') {
553 9         11 $tfactor = 1;
554 9         14 $reverse = 1;
555             }
556 18 50       41 if ($count <= 0) {
557 0         0 carp "Forks::Queue::extract: count must be positive";
558 0         0 return;
559             }
560 18 100       56 if ($index < 0) {
561 8 50       31 if ($index + $count > 0) {
562 0         0 $count = -$index;
563             }
564 8         19 $index = -$index - 1;
565 8         13 $index -= $count - 1;
566              
567 8         18 $tfactor *= -1;
568 8         15 $reverse = !$reverse;
569             }
570             # purge, no block
571 18         72 my @items = $self->_retrieve( $tfactor, 1, 0, $index, $index+$count);
572 18 100       63 if ($reverse) {
573 9         24 @items = reverse(@items);
574             }
575 18 100 66     146 return @_ ? @items : $items[0] // ();
576             }
577              
578             sub insert {
579 14     14 1 2645 my ($self, $pos, @items) = @_;
580 14         59 Forks::Queue::_validate_input($pos,'index');
581 10         16 my (@deferred_items);
582 10   33     47 my $_DEBUG = $self->{debug} // $DEBUG;
583 10         19 my $inserted = 0;
584 10 50       30 if ($self->_end) {
585             carp "Forks::Queue: insert call from process $$ ",
586 0         0 "after end call from process " . $self->{_end} . "\n";
587 0         0 return 0;
588             }
589              
590 10         25 my $limit = $self->{limit};
591 10 50       28 $limit = 9E9 if $self->{limit} <= 0;
592              
593 10 100       31 if ($pos >= $self->_avail) {
594 2 50       14 if ($self->{on_limit} eq 'tq-compat') {
595 0         0 my $limit = $self->{limit};
596 0         0 $self->{limit} = 0;
597 0         0 my $enq = $self->enqueue(@items);
598 0         0 $self->{limit} = $limit;
599 0         0 return $enq;
600             } else {
601 2         44 return $self->put(@items);
602             }
603             }
604 8 100       28 if ($pos <= -$self->_avail) {
605             #return $self->unshift(@items);
606 2         9 $pos = 0;
607             }
608 8 100       29 if ($pos < 0) {
609 2         15 $pos += $self->_avail;
610             }
611              
612             # find timestamps for items $pos and $pos+1
613             # choose 0+@items intermediate timestamps
614             # if $pos+1 is undef, use current time as timestamp
615             # as in the _push function, add items
616 8         22 my $dbh = $self->_dbh;
617 8         42 my $sths = $dbh->prepare(
618             "SELECT timestamp,batchid FROM the_queue ORDER BY timestamp,batchid LIMIT ?");
619 8         587 $dbh->begin_work;
620 8         947 my $z = $sths->execute($pos+1);
621 8         143 my $tt = $sths->fetchall_arrayref;
622 8         42 $DB::single = 1;
623 8         21 my ($t1,$t2,$b1,$b2);
624 8 50       28 if (@$tt > 0) {
625 8         22 $t2 = $tt->[-1][0];
626 8         15 $b2 = $tt->[-1][1];
627             } else {
628 0         0 $t2 = Time::HiRes::time();
629 0         0 $b2 = 0;
630             }
631 8 50       30 if (@$tt == $pos) {
    100          
632 0         0 $t1 = $t2;
633 0         0 $b1 = $b2;
634 0         0 $b2 = 0;
635 0 0       0 if ($t2 < 0) {
636 0         0 $t2 = -Time::HiRes::time();
637             } else {
638 0         0 $t2 = Time::HiRes::time();
639             }
640             } elsif ($pos == 0) {
641 2         9 $t1 = $t2 - 100000;
642 2         7 $b1 = 0;
643             } else {
644 6         15 $t1 = $tt->[-2][0];
645 6         10 $b1 = $tt->[-2][1];
646             }
647              
648 8         18 my ($t3,$b3);
649 8 100       21 if ($t1 == $t2) {
650 6         34 my $sthr = $dbh->prepare("UPDATE the_queue SET batchid=batchid+?
651             WHERE timestamp=? AND batchid>=?");
652 6         1275 $sthr->execute(0+@items,$t1,$b2);
653 6         30 $t3 = $t1;
654 6         79 $b3 = $b1+1;
655             } else {
656 2         11 $t3 = ($t1 + $t2) / 2;
657 2         5 $b3 = 0;
658 2 50       11 if ($t3 == $t1) {
659 0         0 $b3 = $b1+1;
660             }
661             }
662 8 50       44 if ($self->{on_limit} eq "tq-compat") {
663 0         0 for my $item (@items) {
664 0         0 $self->_add($item,$t3,$b3);
665 0         0 $inserted++;
666 0         0 $b3++;
667             }
668 0         0 @items = ();
669             } else {
670 8   100     60 while (@items && $self->_avail < $limit) {
671 24         105 my $item = shift @items;
672 24     24   139 _try(3, sub { $self->_add($item,$t3,$b3) });
  24         64  
673 24         66 $inserted++;
674 24         94 $b3++;
675             }
676             }
677 8         63439 $dbh->commit;
678 8 100       73 if (@items > 0) {
679 2         14 @deferred_items = @items;
680             }
681 8 100       34 if (@deferred_items) {
682 2 50       14 if ($self->{on_limit} eq 'fail') {
683 2         919 carp "Forks::Queue: queue buffer is full and ",
684             0+@deferred_items," items were not inserted";
685             } else {
686 0 0       0 $_DEBUG && print STDERR "$$ ",0+@deferred_items, " on insert. ",
687             "Waiting for capacity\n";
688 0         0 $self->_wait_for_capacity;
689 0 0       0 $_DEBUG && print STDERR "$$ got some capacity\n";
690 0         0 $inserted += $self->insert($pos+$inserted,@deferred_items);
691             }
692             }
693 8 50       358 $self->_notify if $inserted;
694 8         102 return $inserted;
695             }
696              
697             sub _retrieve {
698 117     117   229 my $self = shift;
699 117         224 my $tfactor = shift;
700             # tfactor = -1: select newest items first
701             # tfactor = +1: select oldest items first
702 117         209 my $purge = shift;
703             # purge = 0: do not delete items that we retrieve
704             # purge = 1: delete items that we retrieve
705 117         185 my $block = shift;
706             # block = 0: no block if queue is empty
707             # block = 1: block only if queue is empty
708             # block = 2: block if full request can not be fulfilled
709 117         202 my $lo = shift;
710 117 100       436 my $hi = @_ ? $_[0] : $lo+1;
711 117 50       324 return if $hi <= $lo;
712              
713             # attempt to retrieve items $lo .. $hi and return them
714             # retrieved items are removed from the queue if $purge is set
715             # get newest items first if $tfactor > 0, oldest first if $tfactor < 0
716             # only block while
717             # $block is set
718             # zero items have been found
719              
720 117 50 66     466 if ($lo > 0 && $block) {
721 0         0 carp __PACKAGE__, ": _retrieve() didn't expect block=$block and lo=$lo";
722 0         0 $block = 0;
723             }
724              
725 117 100       462 my $order = $tfactor > 0
726             ? "timestamp DESC,batchid DESC" : "timestamp,batchid";
727 117         378 my $dbh = $self->_dbh;
728 117         1131 my $sths = $dbh->prepare(
729             "SELECT item,batchid,timestamp FROM the_queue
730             ORDER BY $order LIMIT ?");
731 117   66     10530 my $sthd = $purge && $dbh->prepare(
732             "DELETE FROM the_queue WHERE item=? AND timestamp=? AND batchid=?");
733 117         4610 my @return;
734 117 50       342 if (!$sths) {
735 0         0 warn "prepare queue SELECT statement failed: $dbh->errstr";
736             }
737              
738 117         387 while (@return <= 0) {
739 20228 50       54630 my $limit = $hi - @return + ($lo < 0 ? $lo : 0);
740 20228         84982 $dbh->begin_work;
741 20228   33     1650691 my $z = $sths && $sths->execute($limit);
742 20228   33     326891 my $tt = $sths && $sths->fetchall_arrayref;
743 20228 50 33     59302 if ($lo < 0 && -$lo > @$tt) {
744 0         0 $hi += (@$tt - $lo);
745 0         0 $lo += (@$tt - $lo);
746             }
747 20228 100 66     135694 if (!$tt || @$tt == 0) {
    100 66        
    100 100        
748 14         453 $dbh->rollback;
749 14 100       78 if ($block) {
750 7         59 $self->_wait_for_item;
751 7         34 next;
752             } else {
753 7         160 return;
754             }
755             } elsif ($block > 1 && $lo == 0 && @$tt < $hi) {
756             # not enough items on queue to satisfy request
757 20104         475422 $dbh->rollback;
758 20104         93956 next;
759             } elsif (@$tt <= $lo) {
760             # not enough items on queue to satisfy request
761 6         178 $dbh->rollback;
762 6         154 return;
763             }
764 104 100       279 $hi = @$tt if $hi > @$tt;
765              
766 104         563 foreach my $itt ($lo .. $hi-1) {
767 443 50       1073 if (!defined($tt->[$itt])) {
768 0         0 warn "\nResult $itt from $lo .. $hi-1 is undefined!";
769             }
770 443         601 my ($item,$bid,$timestamp) = @{$tt->[$itt]};
  443         1042  
771 443         2437 CORE::push @return, $jsonizer->decode($item);
772 443 100       972 if ($purge) {
773              
774 399     399   1820 my $zd = _try(4, sub { $sthd->execute($item,$timestamp,$bid)} );
  399         16127  
775 399 50       1534 if (!$zd) {
776 0         0 warn "Forks::Queue::SQLite: ",
777             "purge failed: $item,$timestamp,$bid";
778             }
779             }
780             }
781 104         659687 $dbh->commit;
782             } continue {
783 20215 100       48131 if ($block) {
784 20156 100 100     56543 if ($self->_end || $self->_expired) {
785 29         159 $block = 0;
786             }
787             }
788             }
789 104 100 33     2633 return @_ ? @return : $return[0] // ();
790             }
791              
792              
793              
794             sub _pop {
                # Fetch up to $count items from the_queue, JSON-decode them and
                # optionally delete them from the table.
                #
                # Positional arguments (after the invocant):
                #   $tfactor   - when > 0 rows are read in descending
                #                timestamp/batchid order, otherwise ascending
                #   $purge     - when true, each fetched row is also deleted
                #   $block     - when true, an empty result waits via
                #                _wait_for_item and retries instead of giving up
                #   $wantarray - selects the return shape (see below)
                #   $count     - number of rows requested; a false value means 1
                #
                # Returns the decoded items as a list when $wantarray is true,
                # otherwise only the first decoded item (undef if none).
                #
                # The coverage columns on this listing are all 0: this sub was
                # never called during the measured run.
                #
                # NOTE(review): the SELECT below yields (item, timestamp, pid),
                # but each row is unpacked as ($item, $bid, $timestamp). $bid
                # therefore holds the timestamp and $timestamp holds the pid, and
                # the DELETE binds them as timestamp=<pid> and pid=<timestamp>.
                # The purge looks like it cannot match the row that was just
                # read - confirm before reviving this code.
                # NOTE(review): the _retrieve sub above queries a batchid column
                # where this sub uses pid; verify that the_queue has a pid column.
795 0     0   0 my $self = shift;
796 0         0 my $tfactor = shift;
797 0         0 my $purge = shift;
798 0         0 my $block = shift;
799 0         0 my $wantarray = shift;
800 0         0 my ($count) = @_;
801 0   0     0 $count ||= 1;
802              
803 0         0 my $order = "timestamp,batchid";
804 0 0       0 if ($tfactor > 0) {
805 0         0 $order = "timestamp DESC,batchid DESC";
806             }
807 0         0 my $dbh = $self->_dbh;
                # Neither prepare() result is checked before use.
808 0         0 my $sths = $dbh->prepare(
809             "SELECT item,timestamp,pid FROM the_queue ORDER BY $order LIMIT ?");
810 0         0 my $sthd = $dbh->prepare(
811             "DELETE FROM the_queue WHERE item=? AND timestamp=? AND pid=?");
812 0         0 my @return = ();
                # Loop until at least one item has been collected, or until the
                # empty-result branch below decides to stop.
813 0         0 while (@return == 0) {
                # @return is always empty at this point (loop condition), so
                # $limit is effectively $count.
814 0         0 my $limit = $count - @return;
                # The SELECT runs outside any explicit transaction.
815 0         0 my $z = $sths->execute($limit);
816 0         0 my $tt = $sths->fetchall_arrayref;
817 0 0       0 if (@$tt == 0) {
                # Nothing available: retry only when blocking was requested and
                # _wait_for_item returned a true value.
818 0 0 0     0 if ($block && $self->_wait_for_item) {
819 0         0 next;
820             } else {
821 0         0 last;
822             }
823             }
824 0         0 foreach my $t (@$tt) {
825 0         0 my ($item,$bid,$timestamp) = @$t;
826 0         0 CORE::push @return, $jsonizer->decode($item);
827 0 0       0 if ($purge) {
                # Each delete is wrapped in its own begin_work/commit pair.
828 0         0 $dbh->begin_work;
829 0         0 my $zd = $sthd->execute($item,$timestamp,$bid);
830 0 0       0 if (!$zd) {
831 0         0 carp "purge failed: $item,$timestamp,$bid\n";
832             }
833 0         0 $dbh->commit;
834             }
835             }
836             }
837 0 0       0 return $wantarray ? @return : $return[0];
838             }
839              
840             sub clear {
                # Remove every row from the_queue.
                # The DELETE runs between begin_work and commit on the handle
                # returned by _dbh. The results of do() and commit are not
                # checked; the sub's value is that of the final commit call.
841 16     16 1 4197 my $self = shift;
842 16         62 my $dbh = $self->_dbh;
843 16         127 $dbh->begin_work;
844 16         363 $dbh->do("DELETE FROM the_queue");
845 16         151849 $dbh->commit;
846             }
847              
848             sub peek_front {
                # Return the item at position $index counted from the front of
                # the queue. A missing or false $index means 0.
                # A negative $index is counted from the other end instead:
                # peek_front(-1) is delegated to peek_back(0), peek_front(-2) to
                # peek_back(1), and so on.
849 35     35 0 624 my $self = shift;
850 35         64 my ($index) = @_;
851 35   100     135 $index ||= 0;
852 35 100       76 if ($index < 0) {
853 10         30 return $self->peek_back(-$index - 1);
854             }
                # _retrieve's parameter list is outside this view; the leading -1
                # is presumably its $tfactor (oldest first) - TODO confirm.
855             # no purge, no block, always retrieve a single item
856 25         83 return $self->_retrieve(-1,0,0,$index);
857             }
858              
859             sub peek_back {
                # Return the item at position $index counted from the back of
                # the queue. A missing or false $index means 0.
                # A negative $index is counted from the other end instead:
                # peek_back(-1) is delegated to peek_front(0), peek_back(-2) to
                # peek_front(1), and so on.
860 27     27 0 56 my $self = shift;
861 27         50 my ($index) = @_;
862 27   100     103 $index ||= 0;
863 27 100       66 if ($index < 0) {
864 5         14 return $self->peek_front(-$index - 1);
865             }
                # _retrieve's parameter list is outside this view; the leading +1
                # is presumably its $tfactor (newest first) - TODO confirm.
866             # no purge, no block, always retrieve a single item
867 22         54 return $self->_retrieve(+1,0,0,$index);
868             }
869              
870             sub _notify {
                # Send an IO signal to every other process and thread listed in
                # the pids table. Does nothing unless $Forks::Queue::NOTIFY_OK is
                # true; that guard runs before the invocant is even shifted off.
                # NOTE(review): presumably this wakes consumers that are waiting
                # for items - confirm against _wait_for_item.
871 161 50   161   684 return unless $Forks::Queue::NOTIFY_OK;
872              
873 161         405 my $self = shift;
874 161         710 my $dbh = $self->_dbh;
875 161         1662 my $sth = $dbh->prepare("SELECT pid,tid FROM pids");
876 161         26572 my $z = $sth->execute;
877 161         4270 my $pt = $sth->fetchall_arrayref;
                # Rows belonging to other processes: collect their pid column.
                # A pid is listed once per matching row, so it can repeat.
878 161         914 my @pids = map { $_->[0] } grep { $_->[0] != $$ } @$pt;
  91         396  
  252         1342  
879 161 100       686 if (@pids) {
                # The per-object debug flag takes precedence over the package
                # $DEBUG whenever it is defined.
880 76 50 33     751 ($self->{debug} // $DEBUG) && print STDERR "$$ notify: @pids\n";
881 76         4105 kill 'IO', @pids;
882             }
                # Rows belonging to this process but to a thread other than the
                # current one (TID() is defined outside this view).
883 161 100       583 my @tids = map { $_->[1] } grep { $_->[0] == $$ && $_->[1] != TID() } @$pt;
  0         0  
  252         1627  
884 161 50       3322 if (@tids) {
885 0         0 foreach my $tid (@tids) {
                # threads->object may return nothing for a tid; such entries
                # are skipped silently.
886 0         0 my $thr = threads->object($tid);
887 0 0       0 $thr && $thr->kill('IO');
888             }
889             }
890             }
891              
892             my $id = 0;
                # $id above is a file-scoped counter, incremented on every call to
                # _impute_file and embedded in the generated file name.
893             sub _impute_file {
                # Choose a default path for the queue's database file.
                # Takes no arguments. Returns "<dir>/fq-<pid>-<id>-<script>.sql3",
                # where <dir> is the first usable directory from the candidate
                # list below and <script> is the base name of $0.
894 17     17   154 my $base = $0;
                # Drop everything up to the last path separator that is followed
                # by at least one character, then any trailing separator.
895 17         172 $base =~ s{.*[/\\](.)}{$1};
896 17         147 $base =~ s{[/\\]$}{};
897 17         64 $id++;
898 17         43 my @candidates;
899 17 50       252 if ($^O eq 'MSWin32') {
900 0         0 @candidates = (qw(C:/Temp C:/Windows/Temp));
901             } else {
902 17         137 @candidates = qw(/tmp /var/tmp);
903             }
                # Search order: FORKS_QUEUE_DIR, TMPDIR, TEMP, TMP, the
                # platform temp directories, HOME, then the current directory.
904 17         204 for my $candidate ($ENV{FORKS_QUEUE_DIR},
905             $ENV{TMPDIR}, $ENV{TEMP},
906             $ENV{TMP}, @candidates,
907             $ENV{HOME}, ".") {
                # Accept the first entry that is defined, non-empty, an existing
                # directory, writable and executable. "_" reuses the stat result
                # of the preceding -d test.
908 85 50 66     999 if (defined($candidate) && $candidate ne '' &&
      66        
      33        
      33        
909             -d $candidate && -w _ && -x _) {
910 17         235 return $candidate . "/fq-$$-$id-$base.sql3";
911             }
912             }
                # No candidate qualified, not even ".": fall back to a path in
                # the current directory and warn the caller.
913 0           my $file = "./fq-$$-$id-$base.sql3";
914 0           carp __PACKAGE__, ": queue db file $file might not be a good location!";
915 0           return $file;
916             }
917              
918             sub _DUMP {
                # Debugging aid: print the full contents of the pids, status and
                # the_queue tables, in that order, to $fh_dump. STDERR is used
                # when no handle is supplied.
                # Each table gets a banner followed by one line per row, columns
                # joined with tabs. None of the prepare/execute results are
                # checked.
                # The coverage columns are all 0: this sub was never called
                # during the measured run.
919 0     0     my ($self,$fh_dump) = @_;
920 0           my $dbh = $self->_dbh;
921 0   0       $fh_dump ||= *STDERR;
922              
923 0           my $sth = $dbh->prepare("SELECT * FROM pids");
924 0           my $z = $sth->execute;
925 0           print {$fh_dump} "\n\n=== pids ===\n------------\n";
  0            
926 0           foreach my $r (@{$sth->fetchall_arrayref}) {
  0            
927 0           print {$fh_dump} join("\t",@$r),"\n";
  0            
928             }
929              
930 0           $sth = $dbh->prepare("SELECT * FROM status");
931 0           $z = $sth->execute;
932 0           print {$fh_dump} "\n\n=== status ===\n--------------\n";
  0            
933 0           foreach my $r (@{$sth->fetchall_arrayref}) {
  0            
934 0           print {$fh_dump} join("\t",@$r),"\n";
  0            
935             }
936              
937 0           $sth = $dbh->prepare("SELECT * FROM the_queue");
938 0           $z = $sth->execute;
939 0           print {$fh_dump} "\n\n=== queue ===\n-------------\n";
  0            
940 0           foreach my $r (@{$sth->fetchall_arrayref}) {
  0            
941 0           print {$fh_dump} join("\t",@$r),"\n";
  0            
942             }
943 0           print {$fh_dump} "\n\n";
  0            
944             }
945              
946             1;
947              
948             =head1 NAME
949              
950             Forks::Queue::SQLite - SQLite-based implementation of Forks::Queue
951              
952             =head1 VERSION
953              
954             0.14
955              
956             =head1 SYNOPSIS
957              
958             my $q = Forks::Queue->new( impl => 'SQLite', db_file => "queue-file" );
959             $q->put( "job1" );
960             $q->put( { name => "job2", task => "do something", data => [42,19] } );
961             ...
962             $q->end;
963             for my $w (1 .. $num_workers) {
964             if (fork() == 0) {
965             my $task;
966             while (defined($task = $q->get)) {
967             ... perform task in child ...
968             }
969             exit;
970             }
971             }
972              
973             =head1 DESCRIPTION
974              
975             SQLite-based implementation of L<Forks::Queue>.
976             It requires the C<sqlite> libraries and the L<DBD::SQLite>
977             Perl module.
978              
979             =head1 METHODS
980              
981             See L<Forks::Queue> for an overview of the methods supported by
982             this C<Forks::Queue> implementation.
983              
984             =head2 new
985              
986             =head2 $queue = Forks::Queue::SQLite->new( %opts )
987              
988             =head2 $queue = Forks::Queue->new( impl => 'SQLite', %opts )
989              
990             The C<Forks::Queue::SQLite> constructor recognizes the following
991             configuration options.
992              
993             =over 4
994              
995             =item * db_file
996              
997             The name of the file to use to store queue data and metadata.
998             If omitted, a temporary filename is chosen.
999              
1000             =item * style
1001              
1002             =item * limit
1003              
1004             =item * on_limit
1005              
1006             =item * join
1007              
1008             =item * persist
1009              
1010             See L<Forks::Queue> for descriptions of these options.
1011              
1012             =back
1013              
1014             =head1 LICENSE AND COPYRIGHT
1015              
1016             Copyright (c) 2017-2019, Marty O'Brien.
1017              
1018             This library is free software; you can redistribute it and/or modify
1019             it under the same terms as Perl itself, either Perl version 5.10.1 or,
1020             at your option, any later version of Perl 5 you may have available.
1021              
1022             See http://dev.perl.org/licenses/ for more information.
1023              
1024             =cut