File Coverage

blib/lib/Forks/Queue/SQLite.pm
Criterion Covered Total %
statement 428 554 77.2
branch 160 246 65.0
condition 85 147 57.8
subroutine 50 54 92.5
pod 14 17 82.3
total 737 1018 72.4


line stmt bran cond sub pod time code
1             package Forks::Queue::SQLite;
2 61     61   1988458 use strict;
  61         136  
  61         1593  
3 61     61   281 use warnings;
  61         155  
  61         1553  
4 61     61   279 use Carp;
  61         105  
  61         2800  
5 61     61   10751 use JSON;
  61         141623  
  61         619  
6 61     61   6556 use DBI;
  61         129  
  61         2151  
7 61     61   302 use DBD::SQLite;
  61         108  
  61         1350  
8 61     61   264 use Time::HiRes 'time';
  61         115  
  61         403  
9 61     61   8143 use base 'Forks::Queue';
  61         124  
  61         12016  
10 61     61   993 use 5.010; # implementation contains // //= operators
  61         186  
11              
12             our $VERSION = '0.13';
13             our ($DEBUG,$XDEBUG);
14             *DEBUG = \$Forks::Queue::DEBUG;
15             *XDEBUG = \$Forks::Queue::XDEBUG;
16              
17             $SIG{IO} = sub { } if $Forks::Queue::NOTIFY_OK;
18              
19             our $jsonizer = JSON->new->allow_nonref(1)->ascii(1);
20              
21             sub new {
22 38     38 1 4549 my $class = shift;
23 38         346 my %opts = (%Forks::Queue::OPTS, @_);
24              
25 38 50 66     343 if ($opts{join} && !$opts{db_file}) {
26 0         0 croak "Forks::Queue::SQLite: db_file opt required with join";
27             }
28 38 50 66     277 if ($opts{file} && !$opts{db_file}) {
29 0         0 carp "file => passed to Forks::Queue::SQLite constructor! ",
30             "You probably meant db_file => ... !";
31             }
32 38   66     549 $opts{db_file} //= _impute_file(); # $opts{file} = $opts{db_file};
33 38   50     199 $opts{limit} //= -1;
34 38   50     157 $opts{on_limit} //= 'fail';
35 38   50     634 $opts{style} //= 'fifo';
36 38         127 my $list = delete $opts{list};
37              
38 38 100 100     1486 if (!$opts{join} && -f $opts{db_file}) {
39 2         958 carp "Forks::Queue: sqlite db file $opts{db_file} already exists!";
40             }
41              
42 38   66     213 my $exists = $opts{join} && -f $opts{db_file};
43 38         262 $opts{_pid} = [ $$, TID() ];
44             # process id is tied to database handle. If process id doesn't match
45             # $self->{_pid}, we must open a new process id.
46              
47 38         421 my $self = bless { %opts }, $class;
48              
49 38 100       187 if (!$exists) {
50             my $dbh = DBI->connect("dbi:SQLite:dbname=" . $opts{db_file},
51 36         907 "", "");
52 36         104258 $self->{_dbh} = $opts{_dbh} = $dbh;
53 36 100       90 if (!eval { $self->_init }) {
  36         191  
54 2         346 carp __PACKAGE__, ": db initialization failed";
55 2         34 return;
56             }
57             } else {
58 2         49 $self->_dbh;
59             }
60 36 100       170 if (defined($list)) {
61 5 50       25 if (ref($list) eq 'ARRAY') {
62 5         62 $self->push( @$list );
63             } else {
64 0         0 carp "Forks::Queue::new: 'list' option must be an array ref";
65             }
66             }
67              
68 36         550 return $self;
69             }
70              
71             # wrapper for database operations I expect to succeed, but may fail with
72             # intermittent synchronization issues ("attempt to write to a readonly
73             # database...") on perl v5.10 and v5.12. Pausing and retrying the operation
74             # generally fixes these issues.
75             sub _try {
76 29462     29462   52905 my ($count, $code) = @_;
77 29462 50       56773 $count = 1 if $] >= 5.014;
78 29462         41208 my $z = $code->();
79 29462         116772 my ($f0,$f1) = (1,1);
80 29462         62640 while (!$z) {
81 0 0       0 last if --$count <= 0;
82 0         0 ($f0,$f1)=($f1,$f0+$f1);
83 0         0 my (undef,undef,$lcaller) = caller(0);
84 0 0       0 $DEBUG && print STDERR "retry after ${f0}s: $lcaller\a\n";
85 0         0 sleep $f0;
86 0         0 $z = $code->();
87             }
88 29462         57606 return $z;
89             }
90              
91             sub _init {
92 36     36   122 my $self = shift;
93 36         106 my $dbh = $self->{_dbh};
94              
95 36         547 my $z1 = $dbh->do("CREATE TABLE the_queue (
96             timestamp decimal(27,15), batchid mediumint,
97             item text)");
98 36 100       1541518 if (!$z1) {
99 2         416 carp __PACKAGE__, ": error creating init table";
100 2         24 return;
101             }
102              
103 34         622 my $z2 = $dbh->do("CREATE TABLE pids (pid mediumint,tid mediumint)");
104 34 50       1830570 if (!$z2) {
105 0         0 carp __PACKAGE__, ": error creating init table";
106 0         0 return;
107             }
108              
109 34         2023 my $sth = $dbh->prepare("INSERT INTO pids VALUES (?,?)");
110 34         7588 my $z3 = $sth->execute(@{$self->{_pid}});
  34         924278  
111 34 50       513 if (!$z3) {
112 0         0 carp __PACKAGE__, ": error adding process id to tracker";
113 0         0 return;
114             }
115              
116 34         731 my $z4 = $dbh->do("CREATE TABLE status(key text,value text)");
117 34 50       633614 if (!$z4) {
118 0         0 carp __PACKAGE__, ": error creating init table";
119 0         0 return;
120             }
121              
122 34         571 $self->_status("db_file", $self->{db_file});
123 34         119 $self->_status("owner", "@{$self->{_pid}}");
  34         540  
124 34         374 $self->_status("style", $self->{style});
125 34         290 $self->_status("limit", $self->{limit});
126 34         259 $self->_status("on_limit", $self->{on_limit});
127 34         505 return 1;
128             }
129              
130 33686 50   33686 0 74320 sub TID { $INC{'threads.pm'} ? threads->tid : 0 }
131              
132             sub _dbh {
133 33487     33487   42165 my $self = shift;
134 33487         50563 my $tid = TID();
135 33487 100 100     179099 if ($self->{_dbh} && $$ == $self->{_pid}[0] && $tid == $self->{_pid}[1]) {
      66        
136 33473         160440 return $self->{_dbh};
137             }
138              
139 14         404 $self->{_pid} = [$$,$tid];
140             $self->{_dbh} =
141 14         572 DBI->connect("dbi:SQLite:dbname=".$self->{db_file},"","");
142 14         18059 $self->{_dbh}{AutoCommit} = 1;
143 14 100       156 if (!$self->{_DESTROY}) {
144 12         434 $self->{_dbh}->begin_work;
145 12         551 $self->{_dbh}->do("DELETE FROM pids WHERE pid=$$ AND tid=$tid");
146 12         1667598 $self->{_dbh}->do("INSERT INTO pids VALUES ($$,$tid)");
147 12         265496 $self->{_dbh}->commit;
148 12         332 $self->{style} = $self->_status("style");
149 12         66 $self->{limit} = $self->_status("limit");
150 12         69 $self->{on_limit} = $self->_status("on_limit");
151             }
152 14         126 return $self->{_dbh};
153             }
154              
155             sub DESTROY {
156 35     35   7562139 my $self = shift;
157 35         231 $self->{_DESTROY}++;
158 35   33     422 my $_DEBUG = $self->{debug} // $DEBUG;
159 35         307 my $dbh = $self->_dbh;
160 35 50       181 my $tid = $self->{_pid} ? $self->{_pid}[1] : TID();
161 35         269 my $t = [[-1]];
162 35   33     312 my $pid_rm = $dbh && eval {
163             $dbh->{PrintWarn} = # suppress "attempt to write ..."
164             $dbh->{PrintError} = 0; # warnings, particularly on 5.010, 5.012
165             $dbh->begin_work;
166              
167             my $z1 = _try(3, sub {
168 35     35   510 $dbh->do("DELETE FROM pids WHERE pid=$$ AND tid=$tid") } );
169              
170             if ($z1) {
171             my $sth = $dbh->prepare("SELECT COUNT(*) FROM pids");
172             my $z2 = $sth->execute;
173             $t = $sth->fetchall_arrayref;
174             } else {
175             $_DEBUG && print STDERR "$$ DESTROY: DELETE FROM pids failed\n";
176             $t = [[-2]];
177             }
178             $dbh->commit;
179             $_DEBUG and print STDERR "$$ DESTROY npids=$t->[0][0]\n";
180             1;
181             };
182 35 50       199 $dbh && eval { $dbh->disconnect };
  35         3583  
183 35 100 33     3438 if ($t && $t->[0] && $t->[0][0] == 0) {
      66        
184 22 50       110 $_DEBUG and print STDERR "$$ Unlinking files from here\n";
185 22 100       498 if (!$self->{persist}) {
186 20         20002434 sleep 1;
187 20         117928 unlink $self->{db_file};
188             }
189             } else {
190             }
191             }
192              
193             sub _status {
194             # if transactions are desired, they must be provided by the caller
195 27498     27498   35294 my $self = shift;
196 27498         55899 my $dbh = $self->_dbh;
197 27498 0 33     52185 return if !$dbh && $self->{_DESTROY};
198 27498 100       45277 if (@_ == 1) {
    50          
199 27300         100500 my $sth = $dbh->prepare("SELECT value FROM status WHERE key=?");
200 27300 0 33     1338256 if (!$sth && $self->{_DESTROY}) {
201 0         0 warn "prepare failed in global destruction: $$";
202 0         0 return;
203             }
204              
205 27300         40903 my $key = $_[0];
206 27300     27300   109492 my $z = _try( 3, sub { $sth->execute($key) } );
  27300         1273075  
207              
208 27300 50       83692 if (!$z) {
209 0         0 carp __PACKAGE__, ": lookup on status key '$_[0]' failed";
210 0         0 return;
211             }
212 27300         117913 my $t = $sth->fetchall_arrayref;
213 27300 100       57084 if (@$t == 0) {
214 27232         330664 return; # no value
215             }
216 68         1508 return $t->[0][0];
217             } elsif (@_ == 2) {
218 198         835 my ($key,$value) = @_;
219 198         1821 my $sth1 = $dbh->prepare("DELETE FROM status WHERE key=?");
220 198         19119 my $sth2 = $dbh->prepare("INSERT INTO status VALUES(?,?)");
221              
222 198     198   25293 my $z1 = _try( 3, sub { $sth1->execute($key) } );
  198         20236  
223 198   33 198   3790 my $z2 = $z1 && _try( 5, sub { $sth2->execute($key,$value) } );
  198         3093448  
224              
225 198   33     6660 return $z1 && $z2;
226             } else {
227 0         0 croak "Forks::Queue::SQLite: wrong number of args to _status call";
228             }
229 0         0 return;
230             }
231              
232             sub end {
233 8     8 1 10165524 my $self = shift;
234 8         61 my $dbh = $self->_dbh;
235              
236 8         64 my $end = $self->_end;
237 8 50       47 if ($end) {
238 0         0 carp "Forks::Queue: end() called from $$, ",
239             "previously called from $end";
240             }
241              
242 8 50       48 if (!$end) {
243 8         69 $dbh->begin_work;
244 8         247 $self->_status("end",$$);
245 8         205692 $dbh->commit;
246             }
247 8         124 $self->_notify;
248 8         36 return;
249             }
250              
251             sub _end {
252 27263     27263   41512 my $self = shift;
253 27263   100     78448 return $self->{_end} ||= $self->_status("end");
254             # XXX - can end condition be cleared? Not yet, but when it can,
255             # this code will have to change
256             }
257              
258              
259             # MagicLimit: a tie class to allow $q->limit to work as an lvalue
260              
261             sub Forks::Queue::SQLite::MagicLimit::TIESCALAR {
262 3     3   18 my ($pkg,$obj) = @_;
263 3         24 return bless \$obj,$pkg;
264             }
265              
266             sub Forks::Queue::SQLite::MagicLimit::FETCH {
267 75 50   75   356 $XDEBUG && print STDERR "MagicLimit::FETCH => ",${$_[0]}->{limit},"\n";
  0         0  
268 75         99 return ${$_[0]}->{limit};
  75         619  
269             }
270              
271             sub Forks::Queue::SQLite::MagicLimit::STORE {
272 4     4   31 my ($tie,$val) = @_;
273 4 50       30 $XDEBUG && print STDERR "MagicLimit::STORE => $val\n";
274 4         15 my $queue = $$tie;
275 4         23 my $oldval = $queue->{limit};
276 4         11 $queue->{limit} = $val;
277              
278 4         21 my $dbh = $queue->_dbh;
279 4         43 $dbh->begin_work;
280 4         94 $queue->_status("limit",$val);
281 4         72159 $dbh->commit;
282 4         55 return $oldval;
283             }
284              
285             sub limit :lvalue {
286 33     33 1 1000378 my $self = shift;
287 33 100       318 if (!$self->{_limit_magic}) {
288 3         57 tie $self->{_limit_magic}, 'Forks::Queue::SQLite::MagicLimit', $self;
289 3 50       15 $XDEBUG && print STDERR "tied \$self->\{_limit_magic\}\n";
290             }
291 33 100       125 if (@_) {
292 10         42 $self->_dbh->begin_work;
293 10 50       209 $XDEBUG && print STDERR "setting _limit_magic to $_[0]\n";
294 10         46 $self->_status("limit", shift);
295 10 100       33 if (@_) {
296 6 50       18 $XDEBUG && print STDERR "setting on_limit to $_[0]\n";
297 6         21 $self->_status("on_limit", $self->{on_limit} = $_[0]);
298             }
299 10         50 $self->_dbh->commit;
300             } else {
301 23         122 $self->{limit} = $self->_status("limit");
302 23 50       73 $XDEBUG && print STDERR "updating {limit} to $self->{limit}\n";
303             }
304 33         210 return $self->{_limit_magic};
305             }
306              
307             sub status {
308 161     161 1 20005979 my $self = shift;
309 161         762 my $dbh = $self->_dbh;
310 161         467 my $status = {};
311 161         1936 my $sth = $dbh->prepare("SELECT key,value FROM status");
312 161         444782 my $z = $sth->execute;
313 161         4750 my $tt = $sth->fetchall_arrayref;
314 161         1734 foreach my $t (@$tt) {
315 818         2430 $status->{$t->[0]} = $t->[1];
316             }
317 161         712 $status->{avail} = $self->_avail; # update {count}, {avail}
318 161   100     1233 $status->{end} //= 0;
319 161         1836 return $status;
320             }
321              
322             sub _avail {
323             # if transactions are needed, set them up in the caller
324 4007     4007   8063 my ($self,$dbh) = @_;
325 4007   33     13045 $dbh ||= $self->_dbh;
326 4007 50       6950 return unless $dbh;
327 4007         16907 my $sth = $dbh->prepare("SELECT COUNT(*) FROM the_queue");
328 4007 50       197316 return unless $sth;
329 4007         133857 my $z = $sth->execute;
330 4007         50791 my $tt = $sth->fetchall_arrayref;
331 4007         51322 return $self->{avail} = $tt->[0][0];
332             }
333              
334             sub _maintain {
335 0     0   0 my ($self) = @_;
336 0         0 return;
337             }
338              
339             sub push {
340 142     142 1 876 my ($self,@items) = @_;
341 142         920 $self->_push(+1,@items);
342             }
343              
344             sub unshift {
345 0     0 1 0 my ($self,@items) = @_;
346 0         0 $self->_push(-1,@items);
347             }
348              
349             sub _add {
350             # do not use transactions here!
351             # if they are needed, call begin_work/commit from the caller
352 1308     1308   3602 my ($self,$item,$timestamp,$id) = @_;
353 1308         6393 my $jitem = $jsonizer->encode($item);
354 1308         2764 my $dbh = $self->_dbh;
355 1308         4352 my $sth = $dbh->prepare("INSERT INTO the_queue VALUES(?,?,?)");
356              
357 1308     1308   56917 my $z = _try(3, sub { $sth->execute($timestamp, $id, $jitem) } );
  1308         40829  
358            
359 1308         11646 return $z;
360             }
361              
362             sub _push {
363 150     150   741 my ($self,$tfactor,@items) = @_;
364              
365 150         359 my (@deferred_items,$failed_items);
366 150         360 my $pushed = 0;
367 150   33     1273 my $_DEBUG = $self->{debug} // $DEBUG;
368              
369 150 100       767 if ($self->_end) {
370             carp "Forks::Queue: put call from process $$ ",
371 2         557 "after end call from process " . $self->{_end};
372 2         1523 return 0;
373             }
374              
375 148         512 my $limit = $self->{limit};
376 148 100       738 $limit = 9E9 if $self->{limit} <= 0;
377              
378 148         492 my $dbh = $self->_dbh;
379            
380              
381 148         1608 $dbh->begin_work;
382 148         6037 my $stamp = Time::HiRes::time;
383 148         790 my $id = $self->_batch_id($stamp,$dbh);
384 148   100     1860 while (@items && $self->_avail < $limit) {
385 1284         2510 my $item = shift @items;
386 1284         3326 $self->_add($item, $stamp, $id++);
387 1284         4157 $pushed++;
388             }
389 148         3166408 $dbh->commit;
390 148 100       2047 if (@items > 0) {
391 21         88 @deferred_items = @items;
392 21         48 $failed_items = @deferred_items;
393             }
394 148 100       1642 $self->_notify if $pushed;
395              
396 148 100       467 if ($failed_items) {
397 21 100       86 if ($self->{on_limit} eq 'fail') {
398 13         4249 carp "Forks::Queue: queue buffer is full ",
399             "and $failed_items items were not added";
400             } else {
401 8 50       102 $_DEBUG && print STDERR "$$ $failed_items on put. ",
402             "Waiting for capacity\n";
403 8         92 $self->_wait_for_capacity;
404 8 50       38 $_DEBUG && print STDERR "$$ got some capacity\n";
405 8         136 $pushed += $self->_push($tfactor,@deferred_items);
406             }
407             }
408 148         5367 return $pushed;
409             }
410              
411             sub _wait_for_item {
412 7     7   19 my $self = shift;
413 7         15 my $ready = 0;
414 7         19 do {
415 2479   100     9094 $ready = $self->_avail || $self->_end || $self->_expired;
416 2479 100 50     14482316 sleep($Forks::Queue::SLEEP_INTERVAL || 1) if !$ready;
417             } while !$ready;
418 7         26 return $self->{avail};
419             }
420              
421             sub _wait_for_capacity {
422 8     8   42 my $self = shift;
423 8 50       46 if ($self->{limit} <= 0) {
424 0         0 return 9E9;
425             }
426 8         33 my $ready = 0;
427 8         71 while (!$ready) {
428 16 100       133 last if $self->_avail < $self->{limit};
429 8 50       74 last if $self->_end;
430 8   50     16001274 sleep($Forks::Queue::SLEEP_INTERVAL || 1);
431             }
432 8         46 return $self->{avail} < $self->{limit};
433             }
434              
435             sub _batch_id {
436 148     148   1669 my ($self,$stamp,$dbh) = @_;
437 148   33     590 $dbh ||= $self->_dbh;
438 148         697 my $sth = $dbh->prepare("SELECT MAX(batchid) FROM the_queue WHERE timestamp=?");
439 148         260696 my $z = $sth->execute($stamp);
440 148         4294 my $tt = $sth->fetchall_arrayref;
441 148 50       699 if (@$tt == 0) {
442 0         0 return 0;
443             } else {
444 148         2151 return $tt->[0][0];
445             }
446             }
447              
448             sub dequeue {
449 8     8 1 3595 my $self = shift;
450 8 50       38 Forks::Queue::_validate_input($_[0], 'count', 1) if @_;
451 3 50       21 if ($self->{style} ne 'lifo') {
452 3 50       51 return @_ ? $self->_retrieve(-1,1,2,0,$_[0])
453             : $self->_retrieve(-1,1,2,0);
454             } else {
455 0 0       0 return @_ ? $self->_retrieve(+1,1,2,0,$_[0])
456             : $self->_retrieve(+1,1,2,0);
457             }
458             }
459              
460             sub shift :method {
461 37     37 1 1000239 my $self = shift;
462             # purge, block
463 37 100       446 return @_ ? $self->_retrieve(-1,1,1,0,$_[0]) : $self->_retrieve(-1,1,1,0);
464             }
465              
466             sub pop {
467 9     9 1 4445 my $self = shift;
468 9 100       36 Forks::Queue::_validate_input($_[0], 'index', 1) if @_;
469             # purge, block
470 9   100     53 my @popped = $self->_retrieve(+1,1,1,0,$_[0] // 1);
471 9 100       87 return @_ ? reverse(@popped) : $popped[0];
472             }
473              
474             sub shift_nb {
475 1     1 1 6 my $self = shift;
476             # purge, no block
477 1 50       4 return @_ ? $self->_retrieve(-1,1,0,0,$_[0]) : $self->_retrieve(-1,1,0,0);
478             }
479              
480             sub pop_nb {
481 2     2 1 17 my $self = shift;
482             # purge, no block
483 2 50       26 my @popped = @_
484             ? $self->_retrieve(+1,1,0,0,$_[0]) : $self->_retrieve(+1,1,0,0);
485 2 50       11 return @_ ? @popped : $popped[0];
486 0         0 return @popped;
487             }
488              
489             sub extract {
490 26     26 1 10964 my $self = shift;
491 26 100       137 Forks::Queue::_validate_input( $_[0], 'index' ) if @_;
492 23   100     226 my $index = shift || 0;
493 23 100       83 Forks::Queue::_validate_input( $_[0], 'count', 1) if @_;
494 18   100     73 my $count = $_[0] // 1;
495 18         31 my $reverse = 0;
496              
497 18         29 my $tfactor = -1;
498 18 100       45 if ($self->{style} eq 'lifo') {
499 9         14 $tfactor = 1;
500 9         15 $reverse = 1;
501             }
502 18 50       44 if ($count <= 0) {
503 0         0 carp "Forks::Queue::extract: count must be positive";
504 0         0 return;
505             }
506 18 100       45 if ($index < 0) {
507 8 50       31 if ($index + $count > 0) {
508 0         0 $count = -$index;
509             }
510 8         18 $index = -$index - 1;
511 8         16 $index -= $count - 1;
512              
513 8         14 $tfactor *= -1;
514 8         14 $reverse = !$reverse;
515             }
516             # purge, no block
517 18         66 my @items = $self->_retrieve( $tfactor, 1, 0, $index, $index+$count);
518 18 100       65 if ($reverse) {
519 9         25 @items = reverse(@items);
520             }
521 18 100 66     184 return @_ ? @items : $items[0] // ();
522             }
523              
524             sub insert {
525 14     14 1 2712 my ($self, $pos, @items) = @_;
526 14         55 Forks::Queue::_validate_input($pos,'index');
527 10         19 my (@deferred_items);
528 10   33     52 my $_DEBUG = $self->{debug} // $DEBUG;
529 10         16 my $inserted = 0;
530 10 50       37 if ($self->_end) {
531             carp "Forks::Queue: insert call from process $$ ",
532 0         0 "after end call from process " . $self->{_end} . "\n";
533 0         0 return 0;
534             }
535              
536 10         28 my $limit = $self->{limit};
537 10 50       29 $limit = 9E9 if $self->{limit} <= 0;
538              
539 10 100       34 if ($pos >= $self->_avail) {
540 2         23 return $self->put(@items);
541             }
542 8 100       27 if ($pos <= -$self->_avail) {
543             #return $self->unshift(@items);
544 2         6 $pos = 0;
545             }
546 8 100       29 if ($pos < 0) {
547 2         6 $pos += $self->_avail;
548             }
549              
550             # find timestamps for items $pos and $pos+1
551             # choose 0+@items intermediate timestamps
552             # if $pos+1 is undef, use current time as timestamp
553             # as in the _push function, add items
554 8         24 my $dbh = $self->_dbh;
555 8         33 my $sths = $dbh->prepare(
556             "SELECT timestamp,batchid FROM the_queue ORDER BY timestamp,batchid LIMIT ?");
557 8         521 $dbh->begin_work;
558 8         770 my $z = $sths->execute($pos+1);
559 8         118 my $tt = $sths->fetchall_arrayref;
560 8         42 $DB::single = 1;
561 8         22 my ($t1,$t2,$b1,$b2);
562 8 50       30 if (@$tt > 0) {
563 8         18 $t2 = $tt->[-1][0];
564 8         15 $b2 = $tt->[-1][1];
565             } else {
566 0         0 $t2 = Time::HiRes::time();
567 0         0 $b2 = 0;
568             }
569 8 50       31 if (@$tt == $pos) {
    100          
570 0         0 $t1 = $t2;
571 0         0 $b1 = $b2;
572 0         0 $b2 = 0;
573 0 0       0 if ($t2 < 0) {
574 0         0 $t2 = -Time::HiRes::time();
575             } else {
576 0         0 $t2 = Time::HiRes::time();
577             }
578             } elsif ($pos == 0) {
579 2         8 $t1 = $t2 - 100000;
580 2         6 $b1 = 0;
581             } else {
582 6         11 $t1 = $tt->[-2][0];
583 6         13 $b1 = $tt->[-2][1];
584             }
585              
586 8         15 my ($t3,$b3);
587 8 100       25 if ($t1 == $t2) {
588 6         30 my $sthr = $dbh->prepare("UPDATE the_queue SET batchid=batchid+?
589             WHERE timestamp=? AND batchid>=?");
590 6         1564 $sthr->execute(0+@items,$t1,$b2);
591 6         25 $t3 = $t1;
592 6         70 $b3 = $b1+1;
593             } else {
594 2         5 $t3 = ($t1 + $t2) / 2;
595 2         5 $b3 = 0;
596 2 50       7 if ($t3 == $t1) {
597 0         0 $b3 = $b1+1;
598             }
599             }
600 8   100     42 while (@items && $self->_avail < $limit) {
601 24         49 my $item = shift @items;
602 24     24   128 _try(3, sub { $self->_add($item,$t3,$b3) });
  24         66  
603 24         54 $inserted++;
604 24         71 $b3++;
605             }
606 8         86443 $dbh->commit;
607 8 100       119 if (@items > 0) {
608 2         15 @deferred_items = @items;
609             }
610 8 100       28 if (@deferred_items) {
611 2 50       18 if ($self->{on_limit} eq 'fail') {
612 2         679 carp "Forks::Queue: queue buffer is full and ",
613             0+@deferred_items," items were not inserted";
614             } else {
615 0 0       0 $_DEBUG && print STDERR "$$ ",0+@deferred_items, " on insert. ",
616             "Waiting for capacity\n";
617 0         0 $self->_wait_for_capacity;
618 0 0       0 $_DEBUG && print STDERR "$$ got some capacity\n";
619 0         0 $inserted += $self->insert($pos+$inserted,@deferred_items);
620             }
621             }
622 8 50       2653 $self->_notify if $inserted;
623 8         140 return $inserted;
624             }
625              
626             sub _retrieve {
627 111     111   221 my $self = shift;
628 111         218 my $tfactor = shift;
629             # tfactor = -1: select newest items first
630             # tfactor = +1: select oldest items first
631 111         257 my $purge = shift;
632             # purge = 0: do not delete items that we retrieve
633             # purge = 1: delete items that we retrieve
634 111         184 my $block = shift;
635             # block = 0: no block if queue is empty
636             # block = 1: block only if queue is empty
637             # block = 2: block if full request can not be fulfilled
638 111         169 my $lo = shift;
639 111 100       318 my $hi = @_ ? $_[0] : $lo+1;
640 111 50       281 return if $hi <= $lo;
641              
642             # attempt to retrieve items $lo .. $hi and return them
643             # retrieved items are removed from the queue if $purge is set
644             # get newest items first if $tfactor > 0, oldest first if $tfactor < 0
645             # only block while
646             # $block is set
647             # zero items have been found
648              
649 111 50 66     421 if ($lo > 0 && $block) {
650 0         0 carp __PACKAGE__, ": _retrieve() didn't expect block=$block and lo=$lo";
651 0         0 $block = 0;
652             }
653              
654 111 100       423 my $order = $tfactor > 0
655             ? "timestamp DESC,batchid DESC" : "timestamp,batchid";
656 111         359 my $dbh = $self->_dbh;
657 111         1261 my $sths = $dbh->prepare(
658             "SELECT item,batchid,timestamp FROM the_queue
659             ORDER BY $order LIMIT ?");
660 111   66     11149 my $sthd = $purge && $dbh->prepare(
661             "DELETE FROM the_queue WHERE item=? AND timestamp=? AND batchid=?");
662 111         6539 my @return;
663 111 50       354 if (!$sths) {
664 0         0 warn "prepare queue SELECT statement failed: $dbh->errstr";
665             }
666              
667 111         371 while (@return <= 0) {
668 24678 50       52050 my $limit = $hi - @return + ($lo < 0 ? $lo : 0);
669 24678         79771 $dbh->begin_work;
670 24678   33     1656870 my $z = $sths && $sths->execute($limit);
671 24678   33     319600 my $tt = $sths && $sths->fetchall_arrayref;
672 24678 50 33     58713 if ($lo < 0 && -$lo > @$tt) {
673 0         0 $hi += (@$tt - $lo);
674 0         0 $lo += (@$tt - $lo);
675             }
676 24678 100 66     133990 if (!$tt || @$tt == 0) {
    100 66        
    100 100        
677 14         460 $dbh->rollback;
678 14 100       63 if ($block) {
679 7         60 $self->_wait_for_item;
680 7         25 next;
681             } else {
682 7         186 return;
683             }
684             } elsif ($block > 1 && $lo == 0 && @$tt < $hi) {
685             # not enough items on queue to satisfy request
686 24560         486895 $dbh->rollback;
687 24560         101960 next;
688             } elsif (@$tt <= $lo) {
689             # not enough items on queue to satisfy request
690 6         192 $dbh->rollback;
691 6         133 return;
692             }
693 98 100       259 $hi = @$tt if $hi > @$tt;
694              
695 98         410 foreach my $itt ($lo .. $hi-1) {
696 437 50       992 if (!defined($tt->[$itt])) {
697 0         0 warn "\nResult $itt from $lo .. $hi-1 is undefined!";
698             }
699 437         550 my ($item,$bid,$timestamp) = @{$tt->[$itt]};
  437         1006  
700 437         2311 CORE::push @return, $jsonizer->decode($item);
701 437 100       838 if ($purge) {
702              
703 399     399   1855 my $zd = _try(4, sub { $sthd->execute($item,$timestamp,$bid)} );
  399         31956  
704 399 50       1342 if (!$zd) {
705 0         0 warn "Forks::Queue::SQLite: ",
706             "purge failed: $item,$timestamp,$bid";
707             }
708             }
709             }
710 98         1542368 $dbh->commit;
711             } continue {
712 24665 100       46442 if ($block) {
713 24612 100 100     57146 if ($self->_end || $self->_expired) {
714 29         151 $block = 0;
715             }
716             }
717             }
718 98 100 33     2593 return @_ ? @return : $return[0] // ();
719             }
720              
721              
722              
723             sub _pop {
724 0     0   0 my $self = shift;
725 0         0 my $tfactor = shift;
726 0         0 my $purge = shift;
727 0         0 my $block = shift;
728 0         0 my $wantarray = shift;
729 0         0 my ($count) = @_;
730 0   0     0 $count ||= 1;
731              
732 0         0 my $order = "timestamp,batchid";
733 0 0       0 if ($tfactor > 0) {
734 0         0 $order = "timestamp DESC,batchid DESC";
735             }
736 0         0 my $dbh = $self->_dbh;
737 0         0 my $sths = $dbh->prepare(
738             "SELECT item,timestamp,pid FROM the_queue ORDER BY $order LIMIT ?");
739 0         0 my $sthd = $dbh->prepare(
740             "DELETE FROM the_queue WHERE item=? AND timestamp=? AND pid=?");
741 0         0 my @return = ();
742 0         0 while (@return == 0) {
743 0         0 my $limit = $count - @return;
744 0         0 my $z = $sths->execute($limit);
745 0         0 my $tt = $sths->fetchall_arrayref;
746 0 0       0 if (@$tt == 0) {
747 0 0 0     0 if ($block && $self->_wait_for_item) {
748 0         0 next;
749             } else {
750 0         0 last;
751             }
752             }
753 0         0 foreach my $t (@$tt) {
754 0         0 my ($item,$bid,$timestamp) = @$t;
755 0         0 CORE::push @return, $jsonizer->decode($item);
756 0 0       0 if ($purge) {
757 0         0 $dbh->begin_work;
758 0         0 my $zd = $sthd->execute($item,$timestamp,$bid);
759 0 0       0 if (!$zd) {
760 0         0 carp "purge failed: $item,$timestamp,$bid\n";
761             }
762 0         0 $dbh->commit;
763             }
764             }
765             }
766 0 0       0 return $wantarray ? @return : $return[0];
767             }
768              
769             sub clear {
770 16     16 1 6141 my $self = shift;
771 16         52 my $dbh = $self->_dbh;
772 16         130 $dbh->begin_work;
773 16         724 $dbh->do("DELETE FROM the_queue");
774 16         296648 $dbh->commit;
775             }
776              
777             sub peek_front {
778 39     39 0 102 my $self = shift;
779 39         69 my ($index) = @_;
780 39   100     135 $index ||= 0;
781 39 100       80 if ($index < 0) {
782 17         48 return $self->peek_back(-$index - 1);
783             }
784             # no purge, no block, always retrieve a single item
785 22         65 return $self->_retrieve(-1,0,0,$index);
786             }
787              
788             sub peek_back {
789 19     19 0 34 my $self = shift;
790 19         29 my ($index) = @_;
791 19   100     51 $index ||= 0;
792 19 50       40 if ($index < 0) {
793 0         0 return $self->peek_front(-$index - 1);
794             }
795             # no purge, no block, always retrieve a single item
796 19         44 return $self->_retrieve(+1,0,0,$index);
797             }
798              
799             sub _notify {
800 161 50   161   770 return unless $Forks::Queue::NOTIFY_OK;
801              
802 161         428 my $self = shift;
803 161         835 my $dbh = $self->_dbh;
804 161         2614 my $sth = $dbh->prepare("SELECT pid,tid FROM pids");
805 161         34833 my $z = $sth->execute;
806 161         4335 my $pt = $sth->fetchall_arrayref;
807 161         1465 my @pids = map { $_->[0] } grep { $_->[0] != $$ } @$pt;
  79         348  
  240         1138  
808 161 100       617 if (@pids) {
809 68 50 33     666 ($self->{debug} // $DEBUG) && print STDERR "$$ notify: @pids\n";
810 68         14239 kill 'IO', @pids;
811             }
812 161 100       573 my @tids = map { $_->[1] } grep { $_->[0] == $$ && $_->[1] != TID() } @$pt;
  0         0  
  240         1393  
813 161 50       2935 if (@tids) {
814 0         0 foreach my $tid (@tids) {
815 0         0 my $thr = threads->object($tid);
816 0 0       0 $thr && $thr->kill('IO');
817             }
818             }
819             }
820              
821             my $id = 0;
822             sub _impute_file {
823 17     17   110 my $base = $0;
824 17         190 $base =~ s{.*[/\\](.)}{$1};
825 17         107 $base =~ s{[/\\]$}{};
826 17         49 $id++;
827 17         52 my @candidates;
828 17 50       228 if ($^O eq 'MSWin32') {
829 0         0 @candidates = (qw(C:/Temp C:/Windows/Temp));
830             } else {
831 17         100 @candidates = qw(/tmp /var/tmp);
832             }
833 17         173 for my $candidate ($ENV{FORKS_QUEUE_DIR},
834             $ENV{TMPDIR}, $ENV{TEMP},
835             $ENV{TMP}, @candidates,
836             $ENV{HOME}, ".") {
837 85 50 66     815 if (defined($candidate) && $candidate ne '' &&
      66        
      33        
      33        
838             -d $candidate && -w _ && -x _) {
839 17         192 return $candidate . "/fq-$$-$id-$base.sql3";
840             }
841             }
842 0           my $file = "./fq-$$-$id-$base.sql3";
843 0           carp __PACKAGE__, ": queue db file $file might not be a good location!";
844 0           return $file;
845             }
846              
847             sub _DUMP {
848 0     0     my ($self,$fh_dump) = @_;
849 0           my $dbh = $self->_dbh;
850 0   0       $fh_dump ||= *STDERR;
851              
852 0           my $sth = $dbh->prepare("SELECT * FROM pids");
853 0           my $z = $sth->execute;
854 0           print {$fh_dump} "\n\n=== pids ===\n------------\n";
  0            
855 0           foreach my $r (@{$sth->fetchall_arrayref}) {
  0            
856 0           print {$fh_dump} join("\t",@$r),"\n";
  0            
857             }
858              
859 0           $sth = $dbh->prepare("SELECT * FROM status");
860 0           $z = $sth->execute;
861 0           print {$fh_dump} "\n\n=== status ===\n--------------\n";
  0            
862 0           foreach my $r (@{$sth->fetchall_arrayref}) {
  0            
863 0           print {$fh_dump} join("\t",@$r),"\n";
  0            
864             }
865              
866 0           $sth = $dbh->prepare("SELECT * FROM the_queue");
867 0           $z = $sth->execute;
868 0           print {$fh_dump} "\n\n=== queue ===\n-------------\n";
  0            
869 0           foreach my $r (@{$sth->fetchall_arrayref}) {
  0            
870 0           print {$fh_dump} join("\t",@$r),"\n";
  0            
871             }
872 0           print {$fh_dump} "\n\n";
  0            
873             }
874              
875             1;
876              
877             =head1 NAME
878              
879             Forks::Queue::SQLite - SQLite-based implementation of Forks::Queue
880              
881             =head1 VERSION
882              
883             0.13
884              
885             =head1 SYNOPSIS
886              
887             my $q = Forks::Queue->new( impl => 'SQLite', db_file => "queue-file" );
888             $q->put( "job1" );
889             $q->put( { name => "job2", task => "do something", data => [42,19] } );
890             ...
891             $q->end;
892             for my $w (1 .. $num_workers) {
893             if (fork() == 0) {
894             my $task;
895             while (defined($task = $q->get)) {
896             ... perform task in child ...
897             }
898             exit;
899             }
900             }
901              
902             =head1 DESCRIPTION
903              
904             SQLite-based implementation of L.
905             It requires the C libraries and the L
906             Perl module.
907              
908             =head1 METHODS
909              
910             See L for an overview of the methods supported by
911             this C implementation.
912              
913             =head2 new
914              
915             =head2 $queue = Forks::Queue::SQLite->new( %opts )
916              
917             =head2 $queue = Forks::Queue->new( impl => 'SQLite', %opts )
918              
919             The C constructor recognized the following
920             configuration options.
921              
922             =over 4
923              
924             =item * db_file
925              
926             The name of the file to use to store queue data and metadata.
927             If omitted, a temporary filename is chosen.
928              
929             =item * style
930              
931             =item * limit
932              
933             =item * on_limit
934              
935             =item * join
936              
937             =item * persist
938              
939             See L for descriptions of these options.
940              
941             =back
942              
943             =head1 LICENSE AND COPYRIGHT
944              
945             Copyright (c) 2017-2019, Marty O'Brien.
946              
947             This library is free software; you can redistribute it and/or modify
948             it under the same terms as Perl itself, either Perl version 5.10.1 or,
949             at your option, any later version of Perl 5 you may have available.
950              
951             See http://dev.perl.org/licenses/ for more information.
952              
953             =cut