File Coverage

blib/lib/Forks/Queue/File.pm
Criterion Covered Total %
statement 665 840 79.1
branch 214 382 56.0
condition 93 195 47.6
subroutine 75 80 93.7
pod 14 18 77.7
total 1061 1515 70.0


line stmt bran cond sub pod time code
1              
2             package Forks::Queue::File;
3 100     100   979971 use strict;
  100         209  
  100         2622  
4 100     100   443 use warnings;
  100         170  
  100         2442  
5 100     100   431 use Carp;
  100         165  
  100         5090  
6 100     100   53679 use JSON;
  100         700419  
  100         477  
7 100     100   12232 use Time::HiRes;
  100         2285  
  100         495  
8 100     100   6050 use base 'Forks::Queue';
  100         175  
  100         30436  
9 100     100   1479 use 5.010; # sorry, v5.08. I love the // //= operators too much
  100         267  
10              
11             our $VERSION = '0.13';
12             our $DEBUG;
13             *DEBUG = \$Forks::Queue::DEBUG;
14              
15             $SIG{IO} = sub { } if $Forks::Queue::NOTIFY_OK;
16              
17              
18             # prefer functional JSON calls because we still want to use JSON
19             # during global destruction, and a JSON object might not be available
20             # then
21             sub jsonize {
22 4055     4055 0 16298 JSON::to_json($_[0], { allow_nonref=>1, ascii=>1 } );
23             }
24              
25             sub dejsonize {
26 15427     15427 0 70763 JSON::from_json($_[0], { allow_nonref => 1, ascii => 1 } );
27             }
28              
29             # if we exercise firm control over line endings,
30             # we won't have any DOS vs Unix vs Mac fights.
31 100     100   550 use constant EOL => "\x{0a}";
  100         189  
  100         72596  
32             # Anything that can't be a valid JSON substring is ok to use here
33              
34             sub _lock {
35             # a file based queue generally lends itself to file based
36             # advisory locking, though it doesn't work on Solaris with threads.
37             # The generic _lock and _unlock functions can support other
38             # schemes.
39              
40            
41 14435     14435   19783 my $self = shift;
42 14435 50       27369 if ($self->{_locked}) {
43 0         0 Carp::cluck "$$ acquiring lock but already have lock";
44 0         0 return;
45             }
46 14435   33     35764 my $_DEBUG = $self->{debug} // $DEBUG;
47              
48 14435         74012 local $! = 0;
49 14435 100       39848 if ($self->{_lockdir}) {
    50          
50 236         877 my $z = Dir::Flock::lock($self->{_lockdir});
51 236 50       621 $_DEBUG && print STDERR ">> flock_dir lock by ".
52             _PID() . " z=$z \!$=$!\n";
53 236 50       562 if (!$z) {
54 0         0 carp "Forks::Queue: lock queue by flock_dir failed: $!";
55             }
56             } elsif ($self->{lock}) {
57             # file-based advisory file locking with flock
58             # Doesn't work across threads in Solaris, since fcntl implementation
59             # passes the process id but not the thread id to the locking
60             # functions.
61            
62 14199         493397 open my $lockfh, ">>", $self->{lock};
63 14199         266515 my $z = flock $lockfh, 2;
64 14199   66     47147 while (!$z && $Forks::Queue::NOTIFY_OK && $!{EINTR}) {
      33        
65             # SIGIO can interrupt flock
66 69         39493 $z = flock $lockfh, 2;
67             }
68 14199         27310 $self->{lockfh} = $lockfh;
69 14199 50       33693 $_DEBUG && print STDERR ">> flock lock by " . _PID() . "\n";
70             }
71 14435         47980 $self->{_locked} = 1;
72             }
73              
74             sub _unlock {
75 14435     14435   19693 my $self = shift;
76 14435         20367 $self->{_locked} = 0;
77 14435   33     38767 my $_DEBUG = $self->{debug} // $DEBUG;
78 14435         107431 local $! = 0;
79 14435 100       39243 if ($self->{_lockdir}) {
    50          
80 236         845 my $z = Dir::Flock::unlock($self->{_lockdir});
81 236 50       642 $_DEBUG && print STDERR "<< flock_dir unlock by " . _PID() . " z=$z\n";
82             } elsif ($self->{lockfh}) {
83 14199         203432 my $z = close delete $self->{lockfh};
84 14199 50       60085 $_DEBUG && print STDERR "<< flock unlock by " . _PID() . " z=$z\n";
85             }
86 14435         39965 return;
87             }
88              
89              
90              
91              
92             # execute a block of code in a way where only one
93             # thread/process can be executing code for this queue
94             sub _SYNC (&$) {
95 14399     14399   27189 my ($block,$self) = @_;
96 14399 50       374008 return if Forks::Queue::__inGD();
97 14399   33     52293 my $_DEBUG = $self->{debug} // $DEBUG;
98              
99 14399         47618 $self->_lock;
100 14399         34259 my $result = $block->($self);
101 14399         39214 $self->_unlock;
102 14399         34136 return $result;
103             }
104              
105             sub _SYNCWA (&$) { # wantarray version of _SYNC
106 36     36   65 my ($block,$self) = @_;
107 36   33     128 my $_DEBUG = $self->{debug} // $DEBUG;
108              
109 36         80 $self->_lock;
110 36         107 my @result = $block->($self);
111 36         124 $self->_unlock;
112 36         86 return @result;
113             }
114              
115             sub _PID {
116 1917 50   1917   20469 $INC{'threads.pm'} ? join("-", $$, threads->tid) : $$
117             }
118              
119             sub new {
120 104     104 1 28528 my $class = shift;
121 104         1476 my %opts = (%Forks::Queue::OPTS, @_);
122              
123 104         328 ${^_nfs} = 0;
124 104   100     804 $opts{file} //= _impute_file();
125 93         1111 $opts{lock} = $opts{file} . ".lock";
126 93         498 my $list = delete $opts{list};
127              
128 93         241 my $fh;
129              
130 93   50     2131 $opts{_header_size} //= 2048;
131 93         416 $opts{_end} = 0; # whether "end" has been called for this obj
132 93         364 $opts{_pos} = 0; # "cursor", index of next item to shift out
133 93         393 $opts{_tell} = $opts{_header_size}; # file position of cursor
134              
135 93         670 $opts{_count} = 0; # index of next item to be appended
136 93         829 $opts{_pids} = { _PID() => 'P' };
137 93         1245 $opts{_version} = $Forks::Queue::VERSION;
138 93         2256 $opts{_qid} = Forks::Queue::Util::QID();
139              
140             # how often to refactor the queue file. use small values to keep file
141             # sizes small and large values to improve performance
142 93   50     1286 $opts{_maintenance_freq} //= 128;
143              
144              
145            
146              
147 93 50       17615 open $fh, '>>', $opts{lock} or die;
148 93 50       1604 close $fh or die;
149              
150 93         3508 my $self = bless { %opts }, $class;
151              
152             # Normal flock can not be used with multi-threaded solaris,
153             # and may be flaky with files on NFS directories.
154 93 50       2148 if ($^O eq 'solaris') {
    50          
    50          
155 0   0     0 $opts{dflock} //= 1;
156             } elsif (${^_nfs}) {
157 0   0     0 $opts{dflock} //= 1;
158             } elsif (Forks::Queue::Util::__is_nfs( $opts{file} )) {
159 0   0     0 $opts{dflock} //= 1;
160             }
161              
162 67 100       1583 if ($opts{dflock}) {
163             # Dir::Flock (included in this distribution) provides a safer
164             # (if more cumbersome) advisory locking method to synchronize
165             # the queue.
166 100     100   658 no warnings 'numeric';
  100         181  
  100         59414  
167 11         6105 require Dir::Flock;
168 11         161 $self->{_lockdir} = Dir::Flock::getDir( $opts{lock} );
169 11         48 $Dir::Flock::HEARTBEAT_CHECK = 5;
170 11         27 $Dir::Flock::PAUSE_LENGTH = 0.01;
171             }
172              
173              
174 67 100 66     1637 if ($opts{join} && -f $opts{file}) {
175 2         69 $DB::single = 1;
176 2 50       102 open $fh, '+<', $opts{file} or die;
177 2         63 $self->{_fh} = *$fh;
178 2         44 my $fhx = select $fh; $| = 1; select $fhx;
  2         25  
  2         30  
179 2     2   70 _SYNC { $self->_read_header } $self;
  2         32  
180             } else {
181 65 100       1895 if (-f $opts{file}) {
182 4         3160 carp "Forks::Queue: Queue file $opts{file} already exists. ",
183             "Expect trouble if another process created this file.";
184             }
185 65 50       5540 open $fh, '>', $opts{file} or die;
186 65 50       1000 close $fh or die;
187              
188 65 50       2216 open $fh, '+<', $opts{file} or die;
189 65         1171 my $fhx = select $fh; $| = 1; select $fhx;
  65         693  
  65         733  
190 65         4500 $self->{_fh} = *$fh;
191 65         675 seek $fh, 0, 0;
192              
193 65         2751 $self->{_locked}++;
194 65         1503 $self->_write_header;
195 65         154 $self->{_locked}--;
196 65 50       578 if (tell($fh) < $self->{_header_size}) {
197 65         1406 print $fh "\0" x ($self->{_header_size} - tell($fh));
198             }
199             }
200 67 100       453 if (defined($list)) {
201 6 50       41 if (ref($list) eq 'ARRAY') {
202 6         301 $self->push( @$list );
203             } else {
204 0         0 carp "Forks::Queue::new: 'list' option must be an array ref";
205             }
206             }
207              
208 67         6728 return $self;
209             }
210              
211              
212             sub DESTROY {
213 90     90   13270296 my $self = shift;
214 90         523 my $pid = _PID();
215 90   33     926 my $_DEBUG = $self->{debug} // $DEBUG;
216 90 50       593 $_DEBUG && print STDERR "$pid DESTROY called\n";
217 90         691 $self->{_DESTROY}++;
218 90 50       3127 if (Forks::Queue::__inGD()) {
219 0         0 $self->{_locked} = -1;
220 0 0       0 if (my $h = $self->_read_header) {
221 0 0       0 $_DEBUG && print STDERR "$pid DESTROY header at GD: $h\n";
222 0         0 my $role = delete $self->{_pids}{$pid};
223 0 0 0     0 if ($role && $role eq 'P') {
224 0         0 $self->{_pids} = {};
225 0 0       0 $_DEBUG && print STDERR "$pid DESTROY role=P\n";
226 0         0 $self->_write_header;
227             }
228             }
229 0         0 delete $self->{_locked};
230             } else {
231 90         231 eval {
232             _SYNC {
233 90 50   90   344 if ($self->_read_header) {
234             $_DEBUG and print STDERR
235             "$pid DESTROY: pids at destruction: ",
236 90 50       289 join(" ",keys %{$self->{_pids}}),"\n";
  0         0  
237 90         296 delete $self->{_pids}{$pid};
238 90         373 $self->_write_header;
239 90 50       541 $_DEBUG and print STDERR "$pid DESTROY header updated.\n";
240             } else {
241 0 0       0 $_DEBUG and print STDERR
242             "$$ DESTROY: header not available\n";
243             }
244 90         1805 } $self;
245 90 50       817 $_DEBUG && print STDERR
246             "$pid DESTROY final header read complete\n";
247             };
248 90 50       288 if ($@) {
249 0 0       0 if ($@ !~ /malformed JSON ...* at character offset 0/) {
    0          
250 100     100   2256 use Data::Dumper;
  100         28842  
  100         35450  
251 0         0 print STDERR Dumper($@,$self);
252             } elsif ($_DEBUG) {
253 0         0 print STDERR "$pid DESTROY error reading header: $@";
254             }
255             }
256             }
257 90 50       1403 $self->{_fh} && close $self->{_fh};
258             $_DEBUG and print STDERR "$pid DESTROY: remaining pids: ",
259 90 50       456 join(" ",keys %{$self->{_pids}}),"\n";
  0         0  
260 90 100 66     1242 if ($self->{_pids} && 0 == keys %{$self->{_pids}}) {
  90         5761  
261 56 50       185 $_DEBUG and print STDERR "$$ Unlinking files from here\n";
262 56         111 my $u2 = -1;
263 56         13819 my $u1 = unlink $self->{lock};
264 56 100       1792 $u2 = unlink $self->{file} unless $self->{persist};
265 56 50       263 $_DEBUG and print STDERR
266             "$$ DESTROY unlink results $u1/$u2 $self->{lock} $self->{file}\n";
267 56 50       2166 $_DEBUG and print STDERR
268             "$$ DESTROY: unlink time " . Time::HiRes::time . "\n";
269             }
270             }
271              
272             # the key to a shared file acting as a queue is the header,
273             # which holds the queue metadata like the file position of
274             # the current front and back of the queue, and the identifiers
275             # of processes that are using the queue.
276             #
277             # this function should only be called from inside a _SYNC block.
278              
279             sub _read_header {
280 14377     14377   28757 my ($self) = @_;
281 14377 50       27228 Carp::cluck "unsafe _read_header" unless $self->{_locked};
282 14377         47374 local $/ = EOL;
283 14377         30838 my $_DEBUG = $self->_debug;
284 14377         22912 my $h = "";
285 14377 100       24739 if ($self->{_DESTROY}) {
286 100     100   641 no warnings 'closed';
  100         169  
  100         39861  
287 90         1030 seek $self->{_fh}, 0, 0;
288 90   50     1545 $h = readline($self->{_fh}) // "";
289 90 50       409 $_DEBUG && print STDERR
290             "$$ Read ",length($h)," bytes from header during DESTROY\n";
291             } else {
292 14287         53384 local $! = 0;
293 14287 50       160371 if (seek $self->{_fh}, 0, 0) {
294 14287         176788 $h = readline($self->{_fh});
295             } else {
296 0         0 Carp::cluck "_read_header: invalid seek $!";
297 0         0 return;
298             }
299             }
300 14377 50       36573 if (!$h) {
301 0 0       0 if ($self->{_DESTROY}) {
302 0 0       0 $_DEBUG && print STDERR "$$ in DESTROY and header not found\n";
303 0         0 return;
304             }
305 0         0 Carp::cluck "_read_header: header not found";
306             }
307 14377         21825 chomp($h);
308 14377         24700 $h = dejsonize($h);
309 14377         451027 $self->{_pos} = $h->{index};
310 14377         21499 $self->{_end} = $h->{end};
311 14377         25044 $self->{_tell} = $h->{tell};
312 14377         19830 $self->{_count} = $h->{count};
313 14377         23593 $self->{_header_size} = $h->{headerSize};
314 14377         18295 $self->{_maintenance_freq} = $h->{maintFreq};
315 14377         19021 $self->{_version} = $h->{version};
316 14377         23652 $self->{_pids} = $h->{pids};
317 14377   100     41461 $self->{_lockdir} = $h->{lockdir} || undef;
318 14377 50       25804 $self->{limit} = $h->{limit} if $h->{limit};
319              
320 14377 50       23906 $_DEBUG && print STDERR "$$ read header\n";
321              
322 14377         28689 $h->{avail} = $self->{_avail} = $h->{count} - $h->{index}; # not written
323 14377         44524 return $h;
324             }
325              
326             sub _write_header {
327 1010     1010   2192 my ($self) = @_;
328 1010 50       3893 Carp::cluck "unsafe _write_header" unless $self->{_locked};
329 1010   33     5321 my $_DEBUG = $self->{debug} // $DEBUG;
330             my $header = { index => $self->{_pos}, end => $self->{_end},
331             tell => $self->{_tell}, count => $self->{_count},
332             limit => $self->{limit},
333             pids => $self->{_pids},
334             qid => $self->{_qid},
335             headerSize => $self->{_header_size},
336             maintFreq => $self->{_maintenance_freq},
337             ($self->{_lockdir} ? (lockdir => $self->{_lockdir}) : ()),
338 1010 100       14590 version => $self->{_version} };
339              
340 1010         3296 my $headerstr = jsonize($header);
341 1010         38657 while (length($headerstr) >= $self->{_header_size}) {
342 0         0 $self->_increase_header_size(length($headerstr) + 32);
343 0         0 $header->{tell} = $self->{_tell};
344 0         0 $headerstr = jsonize($header);
345             }
346              
347 1010         2714 eval {
348 100     100   647 no warnings;
  100         215  
  100         571500  
349 1010         8236 seek $self->{_fh}, 0, 0;
350 1010         1926 print {$self->{_fh}} $headerstr,EOL;
  1010         16924  
351 1010 50       7068 $_DEBUG && print STDERR "$$ updated header $headerstr\n";
352             };
353             }
354              
355             sub _notify {
356 660 50   660   1687 return unless $Forks::Queue::NOTIFY_OK;
357              
358 660         1111 my $self = shift;
359 660   33     2977 my $_DEBUG = $self->{debug} // $DEBUG;
360 660     660   6344 _SYNC { $self->_read_header } $self;
  660         2195  
361 660         2143 my @ids = keys %{$self->{_pids}};
  660         2865  
362 660         1279 my (@pids,@tids);
363 660         1378 my $me = _PID();
364 660 50       1813 $_DEBUG && print STDERR "$$ _notify \$me=$me \@ids=@ids\n";
365 660         8207 foreach my $id (@ids) {
366 1302         4561 my ($p,$t) = split /-/,$id;
367 1302 50       2457 if (!$p) {
368 0         0 ($p,$t) = (-$t,0);
369             }
370 1302 100 33     4849 if ($p != $$) {
    50          
371 642         1508 push @pids, $p;
372             } elsif (defined($t) && $id ne $me) {
373 0         0 push @tids, $t;
374             }
375             }
376 660 50       1690 if (@tids) {
377 0 0       0 $_DEBUG && print STDERR "$$ notify: tid @tids\n";
378 0         0 foreach my $tid (@tids) {
379 0         0 my $thr = threads->object($tid);
380 0 0       0 if ($thr) {
    0          
381 0         0 my $z7;
382 0 0 0     0 $thr && ($z7 = $thr->kill('IO')) &&
      0        
383             $_DEBUG && print STDERR
384             "$$ _notify to tid $$-$tid \$z7=$z7\n";
385 0 0       0 if ($tid ne $tids[-1]) {
386             #Time::HiRes::sleep 0.25;
387             }
388              
389             # $thr->kill is not reliable?
390            
391             } elsif ($tid == 0) {
392 0 0       0 $_DEBUG && print STDERR "$$ _notify SIGIO to tid main\n";
393 0         0 kill 'IO', $$;
394             } else {
395 0 0       0 $_DEBUG && print STDERR
396             "$$ _notify failed to SIGIO tid $tid\n";
397             }
398             }
399             }
400 660 100       1620 if (@pids) {
401 426 50       941 $_DEBUG && print STDERR "$$ _notify to pids @pids\n";
402 426         57440 kill 'IO', @pids;
403             }
404             }
405              
406             sub clear {
407 62     62 1 17836 my $self = shift;
408 62 50       110 if (! eval { $self->_check_pid; 1 } ) {
  62         242  
  62         179  
409 0         0 carp("File::Queue::clear operation failed: $@");
410 0         0 return;
411             }
412             _SYNC {
413 62     62   210 $self->_read_header;
414 62         139 $self->{_pos} = 0;
415 62         111 $self->{_tell} = $self->{_header_size};
416 62         110 $self->{_count} = 0;
417 62         1633 truncate $self->{_fh}, $self->{_tell};
418 62         376 $self->_write_header;
419 62         564 } $self;
420             }
421              
422             sub end {
423 19     19 1 31386011 my ($self) = @_;
424 19 50       62 if (! eval { $self->_check_pid; 1 } ) {
  19         197  
  19         76  
425 0         0 carp "Forks::Queue::end operation failed: $@";
426 0         0 return;
427             }
428             _SYNC {
429 19     19   95 $self->_read_header;
430 19 50       101 if ($self->{_end}) {
431 0         0 carp "Forks::Queue: end() called from $$, ",
432             "previously called from $self->{_end}";
433             } else {
434 19         73 $self->{_end} = _PID();
435             }
436 19         99 $self->_write_header;
437 19         393 } $self;
438 19         209 $self->_notify;
439 19         81 return;
440             }
441              
442             sub status {
443 371     371 1 60040402 my ($self) = @_;
444 371     371   3379 my $status = _SYNC { $self->_read_header } $self;
  371         1203  
445 371         1896 $status->{file} = $self->{file};
446 371         3777 $status->{filesize} = -s $self->{_fh};
447 371         951 $status->{end} = $self->{_end};
448 371         1167 return $status;
449             }
450              
451             sub _check_pid {
452 956     956   2381 my ($self) = @_;
453 956   33     5910 my $_DEBUG = $self->{debug} // $DEBUG;
454 956 100       4417 if (!defined $self->{_pids}{_PID()}) {
455 28 50       1100 if ($Forks::Queue::NOTIFY_OK) {
456 28 50       590 if (_PID() =~ /.-[1-9]/) {
457             # SIGIO can't be reliably passed to threads, so can't
458             # rely on long sleep command being interrupted
459 0         0 $Forks::Queue::SLEEP_INTERVAL = 1;
460             }
461 28     220   1728 $SIG{IO} = sub { };
462             }
463 28         3901 my $ostatus = open $self->{_fh}, '+<', $self->{file};
464 28         677 for (1..5) {
465 28 50       385 last if $ostatus;
466 0         0 sleep int(sqrt($_));
467 0         0 $ostatus = open $self->{_fh}, '+<', $self->{file};
468             }
469 28 50       290 if (!$ostatus) {
470 0         0 Carp::confess("Forks::Queue::check_pid: ",
471             "Could not open $self->{file} after 5 tries: $!");
472 0         0 return;
473             }
474 28 50       466 if ($self->{_locked}) {
475 0 0       0 $_DEBUG && print STDERR
476             "Forks::Queue: $$ new pid update header\n";
477 0         0 $self->{_pids}{_PID()} = 'C';
478 0         0 $self->_write_header;
479 0         0 return;
480             } else {
481 28 50       233 $_DEBUG and print STDERR "Forks::Queue: $$ new pid sync\n";
482             _SYNC {
483 28     28   560 $self->_read_header;
484 28         199 $self->{_pids}{_PID()} = 'C';
485 28         291 $self->_write_header;
486 28         736 } $self;
487 28         304 return;
488             }
489             }
490 928         2374 return;
491             }
492              
493             sub _increase_header_size {
494 0     0   0 my ($self,$min_size) = @_;
495             # assumes $self has been updated by $self->_read_header recently
496 0 0       0 return if $min_size <= $self->{_header_size};
497              
498 0         0 local $/ = EOL;
499 0         0 my $delta = $min_size - $self->{_header_size};
500 0         0 seek $self->{_fh}, $self->{_header_size}, 0;
501 0         0 my @data = readline($self->{_fh});
502 0         0 seek $self->{_fh}, 0, 0;
503 0         0 print {$self->{_fh}} "\0" x $min_size;
  0         0  
504 0         0 print {$self->{_fh}} @data;
  0         0  
505 0         0 $self->{_header_size} = $min_size;
506 0         0 $self->{_tell} += $delta;
507 0         0 return;
508             }
509              
510             sub _maintain {
511 5     5   12 my ($self) = @_;
512             # assumes $self has been updated by $self->_read_header recently
513              
514 5         10 my $delta = $self->{_tell} - $self->{_header_size};
515 5 50       11 return if $delta == 0;
516 5         18 local $/ = EOL;
517 5         51 seek $self->{_fh}, $self->{_tell}, 0;
518 5         127 my @data = readline($self->{_fh});
519 5         38 seek $self->{_fh}, $self->{_header_size}, 0;
520 5         11 print {$self->{_fh}} @data;
  5         56  
521 5         83 truncate $self->{_fh}, tell($self->{_fh});
522              
523 5         17 $self->{_avail} = $self->{_count} = @data;
524 5         9 $self->{_pos} = 0;
525 5         7 $self->{_tell} = $self->{_header_size};
526 5         33 return;
527             }
528              
529             sub push {
530 475     475 1 2553 my ($self,@items) = @_;
531 475 50       1333 if (! eval { $self->_check_pid; 1 } ) {
  475         2909  
  475         1379  
532 0         0 carp "Forks::Queue::put call from process $$ failed: $@";
533 0         0 return;
534             }
535              
536 475         3303 my (@deferred_items,$failed_items);
537 475         914 my $pushed = 0;
538             _SYNC {
539 475     355   2192 $self->_read_header;
540 475 100       1300 if ($self->{_end}) {
541             carp "Forks::Queue: put call from process $$ ",
542 5         1631 "after end call from process ", $self->{_end}, "!";
543 5         1396 return 0;
544             }
545              
546             # TODO: check if queue limit is reached, consult on_limit
547 470 100       1199 if ($self->{limit} > 0) {
548 159         378 $failed_items = $self->{_avail} + @items - $self->{limit};
549 159 100       371 if ($failed_items > 0) {
550 120         429 @deferred_items = splice @items, -$failed_items;
551 120 100       324 if (@items == 0) {
552 4         12 return;
553             }
554             } else {
555 39         380 $failed_items = 0;
556             }
557             }
558              
559 466         4904 seek $self->{_fh}, 0, 2;
560 466 50       2093 if (tell($self->{_fh}) < $self->{_tell}) {
561 0         0 Carp::cluck "funny seek";
562 0         0 seek $self->{_fh}, $self->{_tell}, 0;
563             }
564 466         1127 foreach my $item (@items) {
565 2981         5211 my $json = jsonize($item);
566 2981         55106 print {$self->{_fh}} $json,EOL;
  2981         29718  
567 2981         9238 $self->{_count}++;
568 2981         4073 $self->{_avail}++;
569 2981         3186 $pushed++;
570 2981 50       6380 $self->_debug && print STDERR
571             "$$ put item [$json] $pushed/",0+@items,"\n";
572             }
573 466         1459 $self->_write_header;
574 475         9507 } $self;
575 475 50 66     6375 if ($pushed && $self->_debug) {
576 0         0 print STDERR "_notify from push(\$pushed=$pushed)\n";
577             }
578 475 100       2238 $self->_notify if $pushed;
579              
580 475 100       6215 if ($failed_items) {
581 120 100       375 if ($self->{on_limit} eq 'fail') {
582 49         11718 carp "Forks::Queue: queue buffer is full ",
583             "and $failed_items items were not added";
584             } else {
585 71 50       279 $self->_debug && print STDERR
586             "$$ $failed_items on put. Waiting for capacity\n";
587 71         651 $self->_wait_for_capacity;
588 71 50       207 $self->_debug && print STDERR "$$ got some capacity\n";
589 71         685 $pushed += $self->push(@deferred_items); }
590             }
591 475         15564 return $pushed;
592             }
593              
594             sub unshift {
595 0     0 1 0 my ($self,@items) = @_;
596 0         0 return $self->insert(0, @items);
597             }
598              
599             sub _SLEEP {
600 12003     12003   14403 my $self = shift;
601             # my $tid = threads->self;
602 12003   50     111976094 my $n = sleep($Forks::Queue::SLEEP_INTERVAL || 1);
603             #Carp::cluck("LONG SLEEP \$n=$n") if $n > 10;
604 12003         53124 return $n;
605             }
606              
607             sub _wait_for_item {
608 99     99   219 my ($self) = @_;
609 99         185 my $ready = 0;
610 99         204 do {
611 11980     11980   64131 _SYNC { $self->_read_header } $self;
  11980         26638  
612 11980   100     74086 $ready = $self->{_avail} || $self->{_end} || $self->_expired;
613 11980 100       21866 if (!$ready) {
614 11881         18522 _SLEEP($self); #sleep($Forks::Queue::SLEEP_INTERVAL||1)
615             }
616             } while !$ready;
617 99         358 return $self->{_avail};
618             }
619              
620             sub _wait_for_capacity {
621 71     71   182 my ($self) = @_;
622 71         159 my $ready = 0;
623 71         95 do {
624 137 50       544 if ($self->{limit} <= 0) {
625 0         0 $ready = 1;
626             } else {
627 137     137   1154 _SYNC { $self->_read_header } $self;
  137         532  
628 137   66     892 $ready = $self->{_avail} < $self->{limit} && !$self->{_end};
629 137 100       417 if (!$ready) {
630 66         173 _SLEEP($self); #sleep($Forks::Queue::SLEEP_INTERVAL || 1) if !$ready;
631             }
632             }
633             } while !$ready;
634 71         219 return $self->{_avail} < $self->{limit};
635             }
636              
637             sub dequeue {
638 25     25 1 27935 my $self = shift;
639 25 50       122 Forks::Queue::_validate_input($_[0],'count',1) if @_;
640 15 50       50 if ($self->{style} ne 'lifo') {
641 15 50       102 return @_ ? $self->_dequeue_front(@_) : $self->_dequeue_front;
642             } else {
643 0 0       0 return @_ ? $self->_dequeue_back(@_) : $self->_dequeue_back;
644             }
645             }
646              
647             sub _dequeue_back {
648 0     0   0 my $self = shift;
649 0 0 0     0 my $count = @_ ? $_[0] // 1 : 1;
650 0 0       0 if (! eval { $self->_check_pid; 1 } ) {
  0         0  
  0         0  
651 0         0 carp "Forks::Queue::pop operation failed: $@";
652 0         0 return;
653             }
654 0         0 my @return;
655 0         0 local $/ = EOL;
656 0         0 while (@return == 0) {
657             _SYNC {
658 0 0 0 0   0 return if $self->{_avail} < $count && !$self->{_end};
659 0         0 seek $self->{_fh}, $self->{_tell}, 0;
660 0         0 my $avail = $self->{_avail};
661 0         0 while ($avail > $count) {
662 0         0 scalar readline($self->{_fh});
663 0         0 $avail--;
664             }
665 0         0 my $spot = tell $self->{_fh};
666 0         0 @return = map dejsonize($_), readline($self->{_fh});
667 0         0 truncate $self->{_fh}, $spot;
668 0         0 $self->{_count} -= @return;
669 0         0 $self->_write_header;
670 0         0 } $self;
671 0 0 0     0 last if @return || $self->{_end} || $self->_expired;
      0        
672 0         0 _SLEEP($self); #sleep($Forks::Queue::SLEEP_INTERVAL || 1);
673             }
674 0 0       0 $self->_notify if @return;
675 0 0 0     0 if ($self->_expired && @return == 0) {
676 0 0       0 return @_ ? $self->pop_nb(@_) : $self->pop_nb;
677             }
678 0 0 0     0 return @_ ? @return : $return[0] // ();
679             }
680              
681             sub _dequeue_front {
682 15     15   28 my $self = shift;
683 15 50 50     53 my $count = @_ ? $_[0] // 1 : 1;
684 15 50       33 if (! eval { $self->_check_pid; 1 } ) {
  15         45  
  15         33  
685 0         0 carp "Forks::Queue::shift operation failed: $@";
686 0         0 return;
687             }
688 15         24 my @return;
689 15         95 local $/ = EOL;
690 15         65 while (@return == 0) {
691             _SYNC {
692 65     65   407 $self->_read_header;
693 65 100 66     455 return if $self->{_avail} < $count && !$self->{_end};
694 10         151 seek $self->{_fh}, $self->{_tell}, 0;
695 10   66     82 while (@return < $count && $self->{_avail} > 0) {
696 35         131 my $item = readline($self->{_fh});
697 35 50       77 if (!defined($item)) {
698 0         0 $self->_write_header;
699 0         0 return;
700             }
701 35         54 chomp($item);
702 35         48 eval {
703 35         63 CORE::push @return, dejsonize($item);
704             };
705 35 50       614 if ($@) {
706 0         0 $self->_write_header;
707 0         0 die "JSON was \"$item\", error was $@";
708             }
709 35         50 $self->{_pos}++;
710 35         53 $self->{_tell} = tell $self->{_fh};
711 35         96 $self->{_avail}--;
712             }
713 10 50 33     91 if ($self->{_maintenance_freq} &&
714             $self->{_pos} >= $self->{_maintenance_freq}) {
715              
716 0         0 $self->_maintain;
717             }
718 10         41 $self->_write_header;
719 65         1275 } $self;
720 65 100 66     1156 last if @return || $self->{_end} || $self->_expired;
      66        
721 50         346 _SLEEP($self); #sleep($Forks::Queue::SLEEP_INTERVAL || 1);
722             }
723 15 100       78 $self->_notify if @return;
724 15 100 66     93 if ($self->_expired && @return == 0) {
725 5 50       89 return @_ ? $self->shift_nb(@_) : $self->shift_nb;
726             }
727 10 50 0     113 return @_ ? @return : $return[0] // ();
728             }
729              
730             sub shift :method {
731 99     99 1 2000571 my ($self,$count) = @_;
732 99   100     653 $count ||= 1;
733 99 50       218 if (! eval { $self->_check_pid; 1 } ) {
  99         574  
  99         273  
734 0         0 carp "Forks::Queue::shift method failed: $@";
735 0         0 return;
736             }
737              
738 99         190 my @return;
739 99         424 while (@return == 0) {
740 99         173 my $h;
741 99 100       604 return if !$self->_wait_for_item;
742 90         328 local $/ = EOL;
743             _SYNC {
744 90     90   370 $self->_read_header;
745              
746 90         955 seek $self->{_fh}, $self->{_tell}, 0;
747 90   100     740 while (@return < $count && $self->{_avail} > 0) {
748 728         1928 my $item = readline($self->{_fh});
749 728 50       3656 if (defined($item)) {
750 728         1011 chomp($item);
751 728         1019 eval {
752 728         1092 CORE::push @return, dejsonize($item);
753             };
754 728 50       15501 if ($@) {
755 0         0 $self->_write_header;
756 0         0 die "JSON was \"$item\", error was $@";
757             }
758 728         922 $self->{_pos}++;
759 728         1205 $self->{_tell} = tell $self->{_fh};
760 728         2014 $self->{_avail}--;
761             }
762             }
763 90 100 66     520 if ($self->{_maintenance_freq} &&
764             $self->{_pos} >= $self->{_maintenance_freq}) {
765              
766 5         42 $self->_maintain;
767             }
768 90         316 $self->_write_header;
769 90         943 } $self;
770             }
771 90 50       507 $self->_notify if @return;
772 90 100 100     651 if (!wantarray && @_ < 2) {
773 48   33     310 return $return[0] // ();
774             } else {
775 42         287 return @return;
776             }
777             }
778              
779             sub shift_nb {
780 7     7 1 66 my ($self,$count) = @_;
781 7   100     35 $count ||= 1;
782 7 50       13 if (! eval { $self->_check_pid; 1 } ) {
  7         44  
  7         23  
783 0         0 carp "Forks::Queue::shift operation failed: $@";
784 0         0 return;
785             }
786              
787 7         17 my @return;
788             my $h;
789             #return if !$self->_wait_for_item;
790 7         178 local $/ = EOL;
791             _SYNC {
792 7     7   26 $self->_read_header;
793              
794 7         78 seek $self->{_fh}, $self->{_tell}, 0;
795 7   66     109 while (@return < $count && $self->{_avail} > 0) {
796 20         77 my $item = readline($self->{_fh});
797 20 50       41 if (!defined($item)) {
798 0         0 $self->_write_header;
799 0         0 return;
800             }
801 20         30 chomp($item);
802 20         28 eval {
803 20         44 CORE::push @return, dejsonize($item);
804             };
805 20 50       389 if ($@) {
806 0         0 die "JSON was \"$item\", error was $@";
807             }
808 20         25 $self->{_pos}++;
809 20         37 $self->{_tell} = tell $self->{_fh};
810 20         55 $self->{_avail}--;
811             }
812 7 50 33     38 if ($self->{_maintenance_freq} &&
813             $self->{_pos} >= $self->{_maintenance_freq}) {
814              
815 0         0 $self->_maintain;
816             }
817 7         33 $self->_write_header;
818 7         12 return;
819 7         143 } $self;
820 7 100       81 $self->_notify if @return;
821 7 100 66     43 if (!wantarray && @_ < 2) {
822 2   33     19 return $return[0] // ();
823             } else {
824 5         55 return @return;
825             }
826             }
827              
828             sub peek_front {
829 78     78 0 170 my ($self, $index) = @_;
830 78   100     178 $index ||= 0;
831 78 100       132 if ($index < 0) {
832 34         99 return $self->peek_back(-$index - 1);
833             }
834 44 50       62 if (! eval { $self->_check_pid; 1 } ) {
  44         100  
  44         82  
835 0         0 carp "Forks::Queue::peek operation failed: $@";
836 0         0 return;
837             }
838 44         63 my @return;
839 44         118 local $/ = EOL;
840              
841 44         63 my $h;
842 44     44   194 _SYNC { $self->_read_header } $self;
  44         109  
843 44 100       167 return if $self->{_avail} <= $index;
844              
845             _SYNC {
846 42     42   108 $self->_read_header;
847              
848 42         651 seek $self->{_fh}, $self->{_tell}, 0;
849 42         86 my $item;
850 42         99 while ($index-- >= 0) {
851 344         995 $item = readline($self->{_fh});
852 344 50       603 if (!defined($item)) {
853 0         0 return;
854             }
855             }
856 42         69 chomp($item);
857              
858 42         81 CORE::push @return, dejsonize($item);
859 42         235 } $self;
860 42         332 return $return[0];
861             }
862              
863             sub peek_back {
864 40     40 0 83 my ($self, $index) = @_;
865 40   100     113 $index ||= 0;
866 40 50       81 if ($index < 0) {
867 0         0 return $self->peek_front(-$index - 1);
868             }
869 40 50       54 if (! eval { $self->_check_pid; 1 } ) {
  40         94  
  40         79  
870 0         0 carp "Forks::Queue::peek operation failed: $@";
871 0         0 return;
872             }
873 40         73 my $count = $index + 1;
874 40         108 local $/ = EOL;
875 40         74 my @return;
876              
877             my $h;
878             _SYNC {
879 40     40   105 $self->_read_header;
880 40 100       101 return if $self->{_avail} <= $index;
881              
882 35         324 seek $self->{_fh}, $self->{_tell}, 0;
883 35         81 my $pos = $self->{_pos};
884 35         116 while ($pos + $count < $self->{_count}) {
885 504         1324 scalar readline($self->{_fh});
886 504         4081 $pos++;
887             }
888 35         67 my $item = readline($self->{_fh});
889 35         52 chomp($item);
890 35         155 @return = dejsonize($item);
891 40         686 } $self;
892 40         418 return $return[0];
893             }
894              
895             sub extract {
896 52     52 1 37983 my $self = shift;
897 52 100       253 Forks::Queue::_validate_input( $_[0], 'index' ) if @_;
898 46   100     147 my $index = shift || 0;
899 46 100       153 Forks::Queue::_validate_input( $_[0], 'count', 1) if @_;
900            
901 36   100     104 my $count = $_[0] // 1;
902             # my $count = @_ ? shift : 1;
903 36 100       102 if ($self->{style} eq 'lifo') {
904 18         21 $index = -1 - $index;
905 18         24 $index -= $count - 1;
906             }
907 36         145 local $/ = EOL;
908 36         45 my @return;
909             _SYNCWA {
910 36     36   89 $self->_read_header;
911 36         53 my $n = $self->{_avail};
912 36 50       125 if ($count <= 0) {
913 0         0 carp "Forks::Queue::extract: count must be positive";
914 0         0 return;
915             }
916 36 100       66 if ($index < 0) {
917 18         21 $index = $index + $n;
918 18 100       37 if ($index < 0) {
919 8         38 $count += $index;
920 8         14 $index = 0;
921             }
922             }
923 36 100 100     109 if ($count <= 0 || $index >= $n) {
924 8         16 return;
925             }
926 28 100       63 if ($index + $count >= $n) {
927 6         41 $count = $n - $index;
928             }
929              
930 28         311 seek $self->{_fh}, $self->{_tell}, 0;
931 28         475 scalar readline($self->{_fh}) for 0..$index-1; # skip
932 28         69 my $save = tell $self->{_fh};
933             @return = map {
934 28         64 my $item = readline($self->{_fh});
  84         1723  
935 84         169 chomp($item);
936 84         135 $self->{_avail}--;
937 84         114 $self->{_count}--;
938 84         146 dejsonize($item);
939             } 1..$count;
940 28         1042 my @buffer = readline($self->{_fh});
941 28         217 seek $self->{_fh}, $save, 0;
942 28         82 print {$self->{_fh}} @buffer;
  28         322  
943 28         492 truncate $self->{_fh}, tell $self->{_fh};
944 28         89 $self->_write_header;
945 36         233 } $self;
946 36 100       383 $self->_notify if @return;
947 36 100 66     244 return @_ ? @return : $return[0] // ();
948             }
949              
950             sub insert {
951 28     28 1 9606 my ($self, $pos, @items) = @_;
952 28         107 Forks::Queue::_validate_input( $pos, 'index' );
953 20 50       33 if (! eval { $self->_check_pid; 1 } ) {
  20         56  
  20         42  
954 0         0 carp "Forks::Queue::insert operation failed: $@";
955 0         0 return;
956             }
957 20         64 local $/ = EOL;
958 20         28 my $nitems = @items;
959 20         28 my (@deferred_items, $failed_items);
960 20         24 my $inserted = 0;
961             _SYNC {
962 20     20   51 $self->_read_header;
963 20 50       45 if ($self->{_end}) {
964             carp "Forks::Queue::insert call from process $$ ",
965 0         0 "after end call from process ", $self->{_end}, "!";
966 0         0 return 0;
967             }
968 20 50       44 if ($self->{limit} > 0) {
969 20         42 my $failed_items = $self->{_avail} + @items - $self->{limit};
970 20 100       47 if ($failed_items > 0) {
971 4         12 @deferred_items = splice @items, -$failed_items;
972 4 50       10 if (@items == 0) {
973 0         0 return;
974             }
975             } else {
976 16         24 $failed_items = 0;
977             }
978             }
979              
980 20 100       36 if ($pos < 0) {
981 8         12 $pos += $self->{_avail};
982             }
983 20 100       45 if ($pos >= $self->{_avail}) {
984             # insert at end of queue (append)
985 4         40 seek $self->{_fh}, 0, 2;
986 4 50       95 if (tell($self->{_fh}) < $self->{_tell}) {
987 0         0 Carp::cluck("funny seek");
988 0         0 seek $self->{_fh}, $self->{_tell}, 0;
989             }
990 4         9 foreach my $item (@items) {
991 16         19 print {$self->{_fh}} jsonize($item),EOL;
  16         79  
992 16         418 $self->{_count}++;
993 16         22 $self->{_avail}++;
994 16         20 $inserted++;
995 16 50       31 $self->_debug && print STDERR
996             "$$ insert item $inserted/",0+@items,"\n";
997             }
998 4         14 $self->_write_header;
999 4         11 return;
1000             }
1001 16 100       26 if ($pos < 0) {
1002 4         6 $pos = 0;
1003             }
1004 16         149 seek $self->{_fh}, $self->{_tell}, 0;
1005 16         48 while ($pos > 0) {
1006 64         147 scalar readline($self->{_fh});
1007 64         100 $pos--;
1008             }
1009 16         33 my $save = tell($self->{_fh});
1010 16         164 my @buffer = readline($self->{_fh});
1011 16         110 seek $self->{_fh}, $save, 0;
1012 16         38 foreach my $item (@items) {
1013 48         54 print {$self->{_fh}} jsonize($item),EOL;
  48         148  
1014 48         1222 $self->{_count}++;
1015 48         60 $self->{_avail}++;
1016 48         80 $inserted++;
1017 48 50       94 $self->_debug && print STDERR
1018             "$$ insert item $inserted/",0+@items,"\n";
1019             }
1020 16         60 print {$self->{_fh}} @buffer;
  16         437  
1021 16         65 $self->_write_header;
1022 20         182 } $self;
1023 20 50       8327 if ($failed_items) {
1024 0 0       0 if ($self->{on_limit} eq 'fail') {
1025 0         0 carp "Forks::Queue: queue buffer is full ",
1026             "and $failed_items items were not inserted";
1027             } else {
1028 0 0       0 $self->_debug && print STDERR
1029             "$$ $failed_items on insert. Waiting for capacity\n";
1030 0         0 $self->_wait_for_capacity;
1031 0 0       0 $self->_debug && print STDERR "$$ got some capacity\n";
1032 0         0 $inserted += $self->insert($pos+$inserted, @deferred_items);
1033             }
1034             }
1035 20 50       72 $self->_notify if $inserted;
1036 20         128 return $inserted;
1037             }
1038              
1039             sub pop {
1040 22     22 1 5340 my ($self,$count) = @_;
1041 22   100     221 $count ||= 1;
1042 22 50       46 if (! eval { $self->_check_pid; 1 } ) {
  22         75  
  22         74  
1043 0         0 carp "Forks::Queue::pop operation failed: $@";
1044 0         0 return;
1045             }
1046 22         145 local $/ = EOL;
1047 22         42 my @return;
1048 22         53 while (@return == 0) {
1049 22         31 my $h;
1050             do {
1051 28     28   234 _SYNC { $self->_read_header } $self;
  28         230  
1052             } while (!$self->{_avail} && !$self->{_end} &&
1053 22   66     32 1 + _SLEEP($self)); #sleep($Forks::Queue::SLEEP_INTERVAL || 1));
      66        
1054              
1055 22 50 66     116 return if $self->{_end} && !$self->{_avail};
1056              
1057             _SYNC {
1058 22     22   70 $self->_read_header;
1059 22         289 seek $self->{_fh}, $self->{_tell}, 0;
1060 22 100       92 if ($self->{_avail} <= $count) {
1061 5         101 my @items = readline($self->{_fh});
1062 5         319 chomp(@items);
1063 5         18 @return = map dejsonize($_), @items;
1064 5         194 truncate $self->{_fh}, $self->{_tell};
1065 5         27 $self->{_count} -= @items;
1066             } else {
1067 17         77 my $pos = $self->{_pos};
1068 17         62 while ($pos + $count < $self->{_count}) {
1069 136         350 scalar readline($self->{_fh});
1070 136         528 $pos++;
1071             }
1072 17         38 my $eof = tell $self->{_fh};
1073 17         151 my @items = readline($self->{_fh});
1074 17         425 truncate $self->{_fh}, $eof;
1075 17         51 $self->{_count} -= @items;
1076 17         37 chomp(@items);
1077 17         57 @return = map dejsonize($_), @items;
1078             }
1079 22         461 $self->_write_header;
1080 22         174 } $self;
1081             }
1082 22 50       152 $self->_notify if @return;
1083 22 100 66     95 if (!wantarray && @_ < 2) {
1084 2         14 return $return[0];
1085             } else {
1086 20         96 return @return;
1087             }
1088             }
1089              
1090             sub pop_nb {
1091 4     4 1 32 my ($self,$count) = @_;
1092 4   50     53 $count ||= 1;
1093 4 50       6 if (! eval { $self->_check_pid; 1 } ) {
  4         60  
  4         14  
1094 0         0 carp "Forks::Queue::pop operation failed: $@";
1095 0         0 return;
1096             }
1097 4         46 local $/ = EOL;
1098 4         8 my @return;
1099             my $h;
1100 4     4   68 _SYNC { $self->_read_header } $self;
  4         32  
1101 4 50 33     32 return if $self->{_end} && !$self->{_avail};
1102              
1103             _SYNC {
1104 4     4   10 $self->_read_header;
1105              
1106 4         40 seek $self->{_fh}, $self->{_tell}, 0;
1107 4 50       59 if ($self->{_avail} <= $count) {
1108 4         36 my @items = readline($self->{_fh});
1109 4         12 chomp(@items);
1110 4         7 @return = map dejsonize($_), @items;
1111 4         68 truncate $self->{_fh}, $self->{_tell};
1112 4         11 $self->{_count} -= @items;
1113 4         29 $self->_write_header;
1114 4         9 return;
1115             }
1116              
1117 0         0 my $pos = $self->{_pos};
1118 0         0 while ($pos + $count < $self->{_count}) {
1119 0         0 scalar readline($self->{_fh});
1120 0         0 $pos++;
1121             }
1122 0         0 my $eof = tell $self->{_fh};
1123 0         0 my @items = readline($self->{_fh});
1124 0         0 truncate $self->{_fh}, $eof;
1125 0         0 $self->{_count} -= @items;
1126 0         0 chomp(@items);
1127 0         0 @return = map dejsonize($_), @items;
1128 0         0 $self->_write_header;
1129 0         0 return;
1130 4         51 } $self;
1131 4 50       33 $self->_notify if @return;
1132 4 50 33     35 if (!wantarray && @_ < 2) {
1133 4         21 return $return[0];
1134             } else {
1135 0         0 return @return;
1136             }
1137             }
1138              
1139             # MagicLimit: a tie class to allow $q->limit to work as an lvalue
1140              
1141             sub Forks::Queue::File::MagicLimit::TIESCALAR {
1142 15     15   345 my ($pkg,$obj) = @_;
1143 15         204 return bless \$obj,$pkg;
1144             }
1145              
1146             sub Forks::Queue::File::MagicLimit::FETCH {
1147 344     344   1192 return ${$_[0]}->{limit};
  344         1944  
1148             }
1149              
1150             sub Forks::Queue::File::MagicLimit::STORE {
1151 15     15   104 my ($tie,$val) = @_;
1152 15         40 my $queue = $$tie;
1153 15         38 my $oldval = $queue->{limit};
1154 15         29 $queue->{limit} = $val;
1155 15     15   170 _SYNC { $queue->_write_header } $queue;
  15         43  
1156 15         73 return $oldval;
1157             }
1158              
1159             sub limit :lvalue {
1160 149     149 1 9010941 my $self = shift;
1161 149 50       548 if (! eval { $self->_check_pid; 1 } ) {
  149         978  
  149         372  
1162 0         0 carp "Forke::Queue::limit operation failed: $@";
1163 0         0 return;
1164             }
1165 149 100       771 if (!$self->{_limit_magic}) {
1166 15         330 tie $self->{_limit_magic},'Forks::Queue::File::MagicLimit', $self;
1167             }
1168 149     149   1525 _SYNC { $self->_read_header } $self;
  149         586  
1169 149 100       531 if (@_) {
1170 43         95 $self->{limit} = shift @_;
1171 43 100       109 if (@_) {
1172 28         64 $self->{on_limit} = shift @_;
1173             }
1174 43     43   409 _SYNC { $self->_write_header } $self;
  43         118  
1175             }
1176 149         735 return $self->{_limit_magic};
1177             }
1178              
1179             sub _debug {
1180 18030   33 18030   54689 shift->{debug} // $Forks::Queue::DEBUG;
1181             }
1182              
1183             sub _DUMP {
1184 0     0   0 my ($self,$fh_dump) = @_;
1185 0   0     0 $fh_dump ||= *STDERR;
1186 0         0 open my $fh_qdata, '<', $self->{file};
1187 0         0 print {$fh_dump} <$fh_qdata>;
  0         0  
1188 0         0 close $fh_qdata;
1189             }
1190              
1191             my $id = 0;
1192             sub _impute_file {
1193 51     51   221 my $base = $0;
1194 51         424 $base =~ s{.*[/\\](.)}{$1};
1195 51         217 $base =~ s{[/\\]$}{};
1196 51         107 $id++;
1197 51         114 my $file;
1198             my @candidates;
1199 51 50       279 if ($^O eq 'MSWin32') {
1200 0         0 @candidates = (qw(C:/Temp C:/Windows/Temp));
1201             } else {
1202 51         186 @candidates = qw(/tmp /var/tmp);
1203             }
1204              
1205             # try hard to avoid using an NFS drive
1206 51         449 for my $candidate ($ENV{FORKS_QUEUE_DIR},
1207             $ENV{TMPDIR}, $ENV{TEMP},
1208             $ENV{TMP}, @candidates,
1209             $ENV{HOME}, ".") {
1210 255 100       528 next if !defined($candidate);
1211 51 50 33     1694 if (-d $candidate && -w _ && -x _) {
      33        
1212 51   33     727 $file //= "$candidate/.fq-$$-$id-base";
1213 51 50       241 next if Forks::Queue::Util::__is_nfs($candidate);
1214 40         395 ${^_nfs} = 0;
1215 40         2760 return "$candidate/.fq-$$-$id-$base";
1216             }
1217             }
1218              
1219 0           ${^_nfs} = 1;
1220 0           carp "Forks::Queue::File: queue file $file might be on an NFS filesystem!";
1221 0           return $file;
1222             }
1223              
1224             1;
1225              
1226             =head1 NAME
1227              
1228             Forks::Queue::File - file-based implementation of Forks::Queue
1229              
1230             =head1 VERSION
1231              
1232             0.13
1233              
1234             =head1 SYNOPSIS
1235              
1236             my $q = Forks::Queue::File->new( file => "queue-file" );
1237             $q->put( "job1" );
1238             $q->put( { name => "job2", task => "do something", data => [42,19] } );
1239             ...
1240             $q->end;
1241             for my $w (1 .. $num_workers) {
1242             if (fork() == 0) {
1243             my $task;
1244             while (defined($task = $q->get)) {
1245             ... perform task in child ...
1246             }
1247             exit;
1248             }
1249             }
1250              
1251             =head1 METHODS
1252              
1253             See L for an overview of the methods supported by
1254             this C implementation.
1255              
1256             =head2 new
1257              
1258             =head2 $queue = Forks::Queue::File->new( %opts )
1259              
1260             =head2 $queue = Forks::Queue->new( impl => 'File', %opts )
1261              
1262             The C constructor recognized the following configuration
1263             options.
1264              
1265             =over 4
1266              
1267             =item * file
1268              
1269             The name of the file to use to score queue data and metadata.
1270             If omitted, a temporary filename is chosen.
1271              
1272             It is strongly recommended not to use a file that would reside on an
1273             NFS filesystem, since these filesystems have notorious difficulty
1274             with synchronizing files across processes.
1275              
1276             =item * style
1277              
1278             =item * limit
1279              
1280             =item * on_limit
1281              
1282             =item * join
1283              
1284             =item * persist
1285              
1286             See L for descriptions of these options.
1287              
1288             =item * debug
1289              
1290             Boolean value to enable or disable debugging on this queue,
1291             overriding the value in C<$Forks::Queue::DEBUG>.
1292              
1293             =item * dflock
1294              
1295             Boolean value to enable directory-based alternative to flock
1296             for synchronization of the queue across processeses. The module
1297             will generally be able to guess whether this flag should be
1298             set by default.
1299              
1300             =back
1301              
1302             =head1 BUGS AND LIMITATIONS
1303              
1304             As with anything that requires C, you should avoid allowing the
1305             queue file to reside on an NFS drive.
1306              
1307             =head1 LICENSE AND COPYRIGHT
1308              
1309             Copyright (c) 2017-2019, Marty O'Brien.
1310              
1311             This library is free software; you can redistribute it and/or modify
1312             it under the same terms as Perl itself, either Perl version 5.10.1 or,
1313             at your option, any later version of Perl 5 you may have available.
1314              
1315             See http://dev.perl.org/licenses/ for more information.
1316              
1317             =cut