File Coverage

blib/lib/Forks/Queue/File.pm
Criterion Covered Total %
statement 673 852 78.9
branch 220 392 56.1
condition 95 204 46.5
subroutine 76 81 93.8
pod 15 19 78.9
total 1079 1548 69.7


line stmt bran cond sub pod time code
1             package Forks::Queue::File;
2 100     100   1154465 use strict;
  100         241  
  100         2985  
3 100     100   512 use warnings;
  100         214  
  100         2739  
4 100     100   496 use Carp;
  100         175  
  100         5456  
5 100     100   61042 use JSON;
  100         788342  
  100         539  
6 100     100   13018 use Time::HiRes;
  100         2589  
  100         482  
7 100     100   6804 use base 'Forks::Queue';
  100         229  
  100         31152  
8 100     100   1729 use 5.010; # sorry, v5.08. I love the // //= operators too much
  100         349  
9              
10             our $VERSION = '0.14';
11             our $DEBUG;
12             *DEBUG = \$Forks::Queue::DEBUG;
13              
14             $SIG{IO} = sub { } if $Forks::Queue::NOTIFY_OK;
15              
16              
17             # prefer functional JSON calls because we still want to use JSON
18             # during global destruction, and a JSON object might not be available
19             # then
20             sub jsonize {
21 4050     4050 0 16820 JSON::to_json($_[0], { allow_nonref=>1, ascii=>1 } );
22             }
23              
24             sub dejsonize {
25 17444     17444 0 78177 JSON::from_json($_[0], { allow_nonref => 1, ascii => 1 } );
26             }
27              
28             # if we exercise firm control over line endings,
29             # we won't have any DOS vs Unix vs Mac fights.
30 100     100   917 use constant EOL => "\x{0a}";
  100         188  
  100         78161  
31             # Anything that can't be a valid JSON substring is ok to use here
32              
33             sub _lock {
34             # a file based queue generally lends itself to file based
35             # advisory locking, though it doesn't work on Solaris with threads.
36             # The generic _lock and _unlock functions can support other
37             # schemes.
38              
39            
40 16437     16437   24370 my $self = shift;
41 16437 50       31158 if ($self->{_locked}) {
42 0         0 Carp::cluck "$$ acquiring lock but already have lock";
43 0         0 return;
44             }
45 16437   33     40446 my $_DEBUG = $self->{debug} // $DEBUG;
46              
47 16437         84051 local $! = 0;
48 16437 100       43974 if ($self->{_lockdir}) {
    50          
49 243         1032 my $z = Dir::Flock::lock($self->{_lockdir});
50 243 50       676 $_DEBUG && print STDERR ">> flock_dir lock by ".
51             _PID() . " z=$z \!$=$!\n";
52 243 50       687 if (!$z) {
53 0         0 carp "Forks::Queue: lock queue by flock_dir failed: $!";
54             }
55             } elsif ($self->{lock}) {
56             # file-based advisory file locking with flock
57             # Doesn't work across threads in Solaris, since fcntl implementation
58             # passes the process id but not the thread id to the locking
59             # functions.
60            
61 16194         563105 open my $lockfh, ">>", $self->{lock};
62 16194         323455 my $z = flock $lockfh, 2;
63 16194   66     70472 while (!$z && $Forks::Queue::NOTIFY_OK && $!{EINTR}) {
      33        
64             # SIGIO can interrupt flock
65 139         31948 $z = flock $lockfh, 2;
66             }
67 16194         34336 $self->{lockfh} = $lockfh;
68 16194 50       40596 $_DEBUG && print STDERR ">> flock lock by " . _PID() . "\n";
69             }
70 16437         84123 $self->{_locked} = 1;
71             }
72              
73             sub _unlock {
74 16437     16437   25563 my $self = shift;
75 16437         23678 $self->{_locked} = 0;
76 16437   33     42359 my $_DEBUG = $self->{debug} // $DEBUG;
77 16437         73805 local $! = 0;
78 16437 100       47670 if ($self->{_lockdir}) {
    50          
79 243         757 my $z = Dir::Flock::unlock($self->{_lockdir});
80 243 50       681 $_DEBUG && print STDERR "<< flock_dir unlock by " . _PID() . " z=$z\n";
81             } elsif ($self->{lockfh}) {
82 16194         204900 my $z = close delete $self->{lockfh};
83 16194 50       71686 $_DEBUG && print STDERR "<< flock unlock by " . _PID() . " z=$z\n";
84             }
85 16437         49366 return;
86             }
87              
88              
89              
90              
91             # execute a block of code in a way where only one
92             # thread/process can be executing code for this queue
93             sub _SYNC (&$) {
94 16401     16401   31604 my ($block,$self) = @_;
95 16401 50       428255 return if Forks::Queue::__inGD();
96 16401   33     62707 my $_DEBUG = $self->{debug} // $DEBUG;
97              
98 16401         45912 $self->_lock;
99 16401         35960 my $result = $block->($self);
100 16401         46550 $self->_unlock;
101 16401         40892 return $result;
102             }
103              
104             sub _SYNCWA (&$) { # wantarray version of _SYNC
105 36     36   71 my ($block,$self) = @_;
106 36   33     124 my $_DEBUG = $self->{debug} // $DEBUG;
107              
108 36         137 $self->_lock;
109 36         91 my @result = $block->($self);
110 36         117 $self->_unlock;
111 36         80 return @result;
112             }
113              
114             sub _PID {
115 1942 50   1942   15410 $INC{'threads.pm'} ? join("-", $$, threads->tid) : $$
116             }
117              
118             sub new {
119 104     104 1 9359 my $class = shift;
120 104         940 my %opts = (%Forks::Queue::OPTS, @_);
121              
122 104         399 ${^_nfs} = 0;
123 104   100     906 $opts{file} //= _impute_file();
124 93         1421 $opts{lock} = $opts{file} . ".lock";
125 93         628 my $list = delete $opts{list};
126              
127             # my $fh;
128              
129 93   50     2152 $opts{_header_size} //= 2048;
130 93         546 $opts{_end} = 0; # whether "end" has been called for this obj
131 93         437 $opts{_pos} = 0; # "cursor", index of next item to shift out
132 93         558 $opts{_tell} = $opts{_header_size}; # file position of cursor
133              
134 93         347 $opts{_count} = 0; # index of next item to be appended
135 93         1330 $opts{_pids} = { _PID() => 'P' };
136 93         942 $opts{_version} = $Forks::Queue::VERSION;
137 93         1725 $opts{_qid} = Forks::Queue::Util::QID();
138              
139             # how often to refactor the queue file. use small values to keep file
140             # sizes small and large values to improve performance
141 93   50     1173 $opts{_maintenance_freq} //= 128;
142              
143              
144            
145              
146 93 50       18471 open my $fh1, '>>', $opts{lock} or die;
147 93 50       2053 close $fh1 or die;
148              
149 93         3687 my $self = bless { %opts }, $class;
150              
151             # Normal flock can not be used with multi-threaded solaris,
152             # and may be flaky with files on NFS directories.
153 93 50       2684 if ($^O eq 'solaris') {
    50          
    50          
154 0   0     0 $opts{dflock} //= 1;
155             } elsif (${^_nfs}) {
156 0   0     0 $opts{dflock} //= 1;
157             } elsif (Forks::Queue::Util::__is_nfs( $opts{file} )) {
158 0   0     0 $opts{dflock} //= 1;
159             }
160              
161 67 100       1843 if ($opts{dflock}) {
162             # Dir::Flock (included in this distribution) provides a safer
163             # (if more cumbersome) advisory locking method to synchronize
164             # the queue.
165 100     100   743 no warnings 'numeric';
  100         207  
  100         70777  
166 11         8050 require Dir::Flock;
167 11         196 $self->{_lockdir} = Dir::Flock::getDir( $opts{lock} );
168 11         64 $Dir::Flock::HEARTBEAT_CHECK = 5;
169 11         38 $Dir::Flock::PAUSE_LENGTH = 0.01;
170             }
171              
172              
173 67 100 66     1615 if ($opts{join} && -f $opts{file}) {
174 2         61 $DB::single = 1;
175 2 50       205 open my $fh2, '+<', $opts{file} or die;
176 2         44 $self->{_fh} = *$fh2;
177 2         49 my $fhx = select $fh2; $| = 1; select $fhx;
  2         37  
  2         32  
178 2     2   77 _SYNC { $self->_read_header } $self;
  2         58  
179             } else {
180 65 100       2168 if (-f $opts{file}) {
181 4         2392 carp "Forks::Queue: Queue file $opts{file} already exists. ",
182             "Expect trouble if another process created this file.";
183 4         240 unlink $opts{file};
184             }
185 65 50       8964 open my $fh3, '>', $opts{file} or die;
186 65 50       1428 close $fh3 or die;
187              
188 65 50       3356 open my $fh4, '+<', $opts{file} or die;
189 65         1469 my $fhx = select $fh4; $| = 1; select $fhx;
  65         863  
  65         995  
190 65         3323 $self->{_fh} = *$fh4;
191 65         1088 seek $fh4, 0, 0;
192              
193 65         659 $self->{_locked}++;
194 65         1924 $self->_write_header;
195 65         240 $self->{_locked}--;
196 65 50       730 if (tell($fh4) < $self->{_header_size}) {
197 65         1953 print $fh4 "\0" x ($self->{_header_size} - tell($fh4));
198             }
199             }
200 67 100       496 if (defined($list)) {
201 6 50       38 if (ref($list) eq 'ARRAY') {
202 6         322 $self->push( @$list );
203             } else {
204 0         0 carp "Forks::Queue::new: 'list' option must be an array ref";
205             }
206             }
207              
208 67         6404 return $self;
209             }
210              
211              
212             sub DESTROY {
213 85     85   15385797 my $self = shift;
214 85         758 my $pid = _PID();
215 85   33     1140 my $_DEBUG = $self->{debug} // $DEBUG;
216 85 50       521 $_DEBUG && print STDERR "$pid DESTROY called\n";
217 85         599 $self->{_DESTROY}++;
218 85 50       3235 if (Forks::Queue::__inGD()) {
219 0         0 $self->{_locked} = -1;
220 0 0       0 if (my $h = $self->_read_header) {
221 0 0       0 $_DEBUG && print STDERR "$pid DESTROY header at GD: $h\n";
222 0         0 my $role = delete $self->{_pids}{$pid};
223 0 0 0     0 if ($role && $role eq 'P') {
224 0         0 $self->{_pids} = {};
225 0 0       0 $_DEBUG && print STDERR "$pid DESTROY role=P\n";
226 0         0 $self->_write_header;
227             }
228             }
229 0         0 delete $self->{_locked};
230             } else {
231 85         212 eval {
232             _SYNC {
233 85 50   85   421 if ($self->_read_header) {
234             $_DEBUG and print STDERR
235             "$pid DESTROY: pids at destruction: ",
236 85 50       359 join(" ",keys %{$self->{_pids}}),"\n";
  0         0  
237 85         351 delete $self->{_pids}{$pid};
238 85         447 $self->_write_header;
239 85 50       573 $_DEBUG and print STDERR "$pid DESTROY header updated.\n";
240             } else {
241 0 0       0 $_DEBUG and print STDERR
242             "$$ DESTROY: header not available\n";
243             }
244 85         1691 } $self;
245 85 50       744 $_DEBUG && print STDERR
246             "$pid DESTROY final header read complete\n";
247             };
248 85 50       311 if ($@) {
249 0 0       0 if ($@ !~ /malformed JSON ...* at character offset 0/) {
    0          
250 100     100   3252 use Data::Dumper;
  100         20485  
  100         38488  
251 0         0 print STDERR Dumper($@,$self);
252             } elsif ($_DEBUG) {
253 0         0 print STDERR "$pid DESTROY error reading header: $@";
254             }
255             }
256             }
257 85 50       1248 $self->{_fh} && close $self->{_fh};
258             $_DEBUG and print STDERR "$pid DESTROY: remaining pids: ",
259 85 50       476 join(" ",keys %{$self->{_pids}}),"\n";
  0         0  
260 85 100 66     948 if ($self->{_pids} && 0 == keys %{$self->{_pids}}) {
  85         7072  
261 54 50       180 $_DEBUG and print STDERR "$$ Unlinking files from here\n";
262 54         128 my $u2 = -1;
263 54         2673 my $u1 = unlink $self->{lock};
264 54 100       1937 $u2 = unlink $self->{file} unless $self->{persist};
265 54 50       291 $_DEBUG and print STDERR
266             "$$ DESTROY unlink results $u1/$u2 $self->{lock} $self->{file}\n";
267 54 50       1923 $_DEBUG and print STDERR
268             "$$ DESTROY: unlink time " . Time::HiRes::time . "\n";
269             }
270             }
271              
272             # the key to a shared file acting as a queue is the header,
273             # which holds the queue metadata like the file position of
274             # the current front and back of the queue, and the identifiers
275             # of processes that are using the queue.
276             #
277             # this function should only be called from inside a _SYNC block.
278              
279             sub _read_header {
280 16379     16379   31184 my ($self) = @_;
281 16379 50       32092 Carp::cluck "unsafe _read_header" unless $self->{_locked};
282 16379         76706 local $/ = EOL;
283 16379         33344 my $_DEBUG = $self->_debug;
284 16379         25027 my $h = "";
285 16379 100       30944 if ($self->{_DESTROY}) {
286 100     100   782 no warnings 'closed';
  100         208  
  100         46825  
287 85         1124 seek $self->{_fh}, 0, 0;
288 85   50     1874 $h = readline($self->{_fh}) // "";
289 85 50       434 $_DEBUG && print STDERR
290             "$$ Read ",length($h)," bytes from header during DESTROY\n";
291             } else {
292 16294         68200 local $! = 0;
293 16294 50       195799 if (seek $self->{_fh}, 0, 0) {
294 16294         217446 $h = readline($self->{_fh});
295             } else {
296 0         0 Carp::cluck "_read_header: invalid seek $!";
297 0         0 return;
298             }
299             }
300 16379 50       49301 if (!$h) {
301 0 0       0 if ($self->{_DESTROY}) {
302 0 0       0 $_DEBUG && print STDERR "$$ in DESTROY and header not found\n";
303 0         0 return;
304             }
305 0         0 Carp::cluck "_read_header: header not found";
306             }
307 16379         28997 chomp($h);
308 16379         33873 $h = dejsonize($h);
309 16379         524553 $self->{_pos} = $h->{index};
310 16379         25946 $self->{_end} = $h->{end};
311 16379         22014 $self->{_tell} = $h->{tell};
312 16379         21145 $self->{_count} = $h->{count};
313 16379         23291 $self->{_header_size} = $h->{headerSize};
314 16379         23897 $self->{_maintenance_freq} = $h->{maintFreq};
315 16379         24027 $self->{_version} = $h->{version};
316 16379         28350 $self->{_pids} = $h->{pids};
317 16379   100     52895 $self->{_lockdir} = $h->{lockdir} || undef;
318 16379 50       33077 $self->{limit} = $h->{limit} if $h->{limit};
319              
320 16379 50       28717 $_DEBUG && print STDERR "$$ read header\n";
321              
322 16379         31051 $h->{avail} = $self->{_avail} = $h->{count} - $h->{index}; # not written
323 16379         54230 return $h;
324             }
325              
326             sub _write_header {
327 1005     1005   2573 my ($self) = @_;
328 1005 50       2922 Carp::cluck "unsafe _write_header" unless $self->{_locked};
329 1005   33     5277 my $_DEBUG = $self->{debug} // $DEBUG;
330             my $header = { index => $self->{_pos}, end => $self->{_end},
331             tell => $self->{_tell}, count => $self->{_count},
332             limit => $self->{limit},
333             pids => $self->{_pids},
334             qid => $self->{_qid},
335             headerSize => $self->{_header_size},
336             maintFreq => $self->{_maintenance_freq},
337             ($self->{_lockdir} ? (lockdir => $self->{_lockdir}) : ()),
338 1005 100       12545 version => $self->{_version} };
339              
340 1005         3633 my $headerstr = jsonize($header);
341 1005         40636 while (length($headerstr) >= $self->{_header_size}) {
342 0         0 $self->_increase_header_size(length($headerstr) + 32);
343 0         0 $header->{tell} = $self->{_tell};
344 0         0 $headerstr = jsonize($header);
345             }
346              
347 1005         2211 eval {
348 100     100   733 no warnings;
  100         251  
  100         652885  
349 1005         9596 seek $self->{_fh}, 0, 0;
350 1005         2834 print {$self->{_fh}} $headerstr,EOL;
  1005         57499  
351 1005 50       8210 $_DEBUG && print STDERR "$$ updated header $headerstr\n";
352             };
353             }
354              
355             sub _notify {
356 660 50   660   1920 return unless $Forks::Queue::NOTIFY_OK;
357              
358 660         1215 my $self = shift;
359 660   33     3143 my $_DEBUG = $self->{debug} // $DEBUG;
360 660     660   4624 _SYNC { $self->_read_header } $self;
  660         2315  
361 660         2353 my @ids = keys %{$self->{_pids}};
  660         3311  
362 660         1454 my (@pids,@tids);
363 660         1540 my $me = _PID();
364 660 50       1760 $_DEBUG && print STDERR "$$ _notify \$me=$me \@ids=@ids\n";
365 660         2126 foreach my $id (@ids) {
366 1336         4423 my ($p,$t) = split /-/,$id;
367 1336 50       2782 if (!$p) {
368 0         0 ($p,$t) = (-$t,0);
369             }
370 1336 100 33     5598 if ($p != $$) {
    50          
371 676         1891 push @pids, $p;
372             } elsif (defined($t) && $id ne $me) {
373 0         0 push @tids, $t;
374             }
375             }
376 660 50       1763 if (@tids) {
377 0 0       0 $_DEBUG && print STDERR "$$ notify: tid @tids\n";
378 0         0 foreach my $tid (@tids) {
379 0         0 my $thr = threads->object($tid);
380 0 0       0 if ($thr) {
    0          
381 0         0 my $z7;
382 0 0 0     0 $thr && ($z7 = $thr->kill('IO')) &&
      0        
383             $_DEBUG && print STDERR
384             "$$ _notify to tid $$-$tid \$z7=$z7\n";
385 0 0       0 if ($tid ne $tids[-1]) {
386             #Time::HiRes::sleep 0.25;
387             }
388              
389             # $thr->kill is not reliable?
390            
391             } elsif ($tid == 0) {
392 0 0       0 $_DEBUG && print STDERR "$$ _notify SIGIO to tid main\n";
393 0         0 kill 'IO', $$;
394             } else {
395 0 0       0 $_DEBUG && print STDERR
396             "$$ _notify failed to SIGIO tid $tid\n";
397             }
398             }
399             }
400 660 100       1792 if (@pids) {
401 438 50       1099 $_DEBUG && print STDERR "$$ _notify to pids @pids\n";
402 438         23399 kill 'IO', @pids;
403             }
404             }
405              
406             sub clear {
407 62     62 1 18657 my $self = shift;
408 62 50       142 if (! eval { $self->_check_pid; 1 } ) {
  62         211  
  62         173  
409 0         0 carp("File::Queue::clear operation failed: $@");
410 0         0 return;
411             }
412             _SYNC {
413 62     62   355 $self->_read_header;
414 62         172 $self->{_pos} = 0;
415 62         176 $self->{_tell} = $self->{_header_size};
416 62         103 $self->{_count} = 0;
417 62         1869 truncate $self->{_fh}, $self->{_tell};
418 62         320 $self->_write_header;
419 62         575 } $self;
420             }
421              
422             sub end {
423 19     19 1 30781822 my ($self) = @_;
424 19 50       83 if (! eval { $self->_check_pid; 1 } ) {
  19         113  
  19         105  
425 0         0 carp "Forks::Queue::end operation failed: $@";
426 0         0 return;
427             }
428             _SYNC {
429 19     19   137 $self->_read_header;
430 19 50       137 if ($self->{_end}) {
431 0         0 carp "Forks::Queue: end() called from $$, ",
432             "previously called from $self->{_end}";
433             } else {
434 19         80 $self->{_end} = _PID();
435             }
436 19         89 $self->_write_header;
437 19         595 } $self;
438 19         191 $self->_notify;
439 19         82 return;
440             }
441              
442             sub status {
443 338     338 1 60023870 my ($self) = @_;
444 338     338   3755 my $status = _SYNC { $self->_read_header } $self;
  338         1488  
445 338         2408 $status->{file} = $self->{file};
446 338         4249 $status->{filesize} = -s $self->{_fh};
447 338         964 $status->{end} = $self->{_end};
448 338         1316 return $status;
449             }
450              
451             sub _check_pid {
452 986     986   2669 my ($self) = @_;
453 986   33     7307 my $_DEBUG = $self->{debug} // $DEBUG;
454 986 100       4766 if (!defined $self->{_pids}{_PID()}) {
455 28 50       1341 if ($Forks::Queue::NOTIFY_OK) {
456 28 50       473 if (_PID() =~ /.-[1-9]/) {
457             # SIGIO can't be reliably passed to threads, so can't
458             # rely on long sleep command being interrupted
459 0         0 $Forks::Queue::SLEEP_INTERVAL = 1;
460             }
461 28     270   2150 $SIG{IO} = sub { };
462             }
463 28         4205 my $ostatus = open $self->{_fh}, '+<', $self->{file};
464 28         539 for (1..5) {
465 28 50       493 last if $ostatus;
466 0         0 sleep int(sqrt($_));
467 0         0 $ostatus = open $self->{_fh}, '+<', $self->{file};
468             }
469 28 50       269 if (!$ostatus) {
470 0         0 Carp::confess("Forks::Queue::check_pid: ",
471             "Could not open $self->{file} after 5 tries: $!");
472 0         0 return;
473             }
474 28 50       328 if ($self->{_locked}) {
475 0 0       0 $_DEBUG && print STDERR
476             "Forks::Queue: $$ new pid update header\n";
477 0         0 $self->{_pids}{_PID()} = 'C';
478 0         0 $self->_write_header;
479 0         0 return;
480             } else {
481 28 50       338 $_DEBUG and print STDERR "Forks::Queue: $$ new pid sync\n";
482             _SYNC {
483 28     28   652 $self->_read_header;
484 28         240 $self->{_pids}{_PID()} = 'C';
485 28         388 $self->_write_header;
486 28         843 } $self;
487 28         206 return;
488             }
489             }
490 958         3009 return;
491             }
492              
493             sub _increase_header_size {
494 0     0   0 my ($self,$min_size) = @_;
495             # assumes $self has been updated by $self->_read_header recently
496 0 0       0 return if $min_size <= $self->{_header_size};
497              
498 0         0 local $/ = EOL;
499 0         0 my $delta = $min_size - $self->{_header_size};
500 0         0 seek $self->{_fh}, $self->{_header_size}, 0;
501 0         0 my @data = readline($self->{_fh});
502 0         0 seek $self->{_fh}, 0, 0;
503 0         0 print {$self->{_fh}} "\0" x $min_size;
  0         0  
504 0         0 print {$self->{_fh}} @data;
  0         0  
505 0         0 $self->{_header_size} = $min_size;
506 0         0 $self->{_tell} += $delta;
507 0         0 return;
508             }
509              
510             sub _maintain {
511 5     5   9 my ($self) = @_;
512             # assumes $self has been updated by $self->_read_header recently
513              
514 5         15 my $delta = $self->{_tell} - $self->{_header_size};
515 5 50       11 return if $delta == 0;
516 5         17 local $/ = EOL;
517 5         46 seek $self->{_fh}, $self->{_tell}, 0;
518 5         117 my @data = readline($self->{_fh});
519 5         34 seek $self->{_fh}, $self->{_header_size}, 0;
520 5         15 print {$self->{_fh}} @data;
  5         53  
521 5         84 truncate $self->{_fh}, tell($self->{_fh});
522              
523 5         14 $self->{_avail} = $self->{_count} = @data;
524 5         18 $self->{_pos} = 0;
525 5         8 $self->{_tell} = $self->{_header_size};
526 5         24 return;
527             }
528              
529             sub push {
530 475     475 1 2738 my ($self,@items) = @_;
531 475 50       1416 if (! eval { $self->_check_pid; 1 } ) {
  475         2987  
  475         1734  
532 0         0 carp "Forks::Queue::put call from process $$ failed: $@";
533 0         0 return;
534             }
535              
536 475         1186 my (@deferred_items,$failed_items);
537 475         879 my $pushed = 0;
538             _SYNC {
539 475     355   2458 $self->_read_header;
540 475 100       1541 if ($self->{_end}) {
541             carp "Forks::Queue: put call from process $$ ",
542 5         1608 "after end call from process ", $self->{_end}, "!";
543 5         600 return 0;
544             }
545              
546             # put: add whatever items there is room for
547             # enqueue: add all items if there is room for one item
548 470 100       1691 if ($self->{limit} > 0) {
549 159 100       396 if ($Forks::Queue::File::_ENQUEUE) {
550 4 50       13 if ($self->{_avail} >= $self->{limit}) {
551 0         0 $failed_items = @deferred_items = @items;
552 0         0 @items = ();
553             }
554             } else {
555 155         486 $failed_items = $self->{_avail} + @items - $self->{limit};
556 155 100       465 if ($failed_items > 0) {
557 120         513 @deferred_items = splice @items, -$failed_items;
558 120 100       403 if (@items == 0) {
559 4         12 return;
560             }
561             } else {
562 35         97 $failed_items = 0;
563             }
564             }
565             }
566              
567 466 50       1488 if (@items > 0) {
568 466         5768 seek $self->{_fh}, 0, 2;
569 466 50       2287 if (tell($self->{_fh}) < $self->{_tell}) {
570 0         0 Carp::cluck "funny seek";
571 0         0 seek $self->{_fh}, $self->{_tell}, 0;
572             }
573 466         1200 foreach my $item (@items) {
574 2981         5648 my $json = jsonize($item);
575 2981         58211 print {$self->{_fh}} $json,EOL;
  2981         32240  
576 2981         10004 $self->{_count}++;
577 2981         3968 $self->{_avail}++;
578 2981         4254 $pushed++;
579 2981 50       6248 $self->_debug && print STDERR
580             "$$ put item [$json] $pushed/",0+@items,"\n";
581             }
582             }
583 466         1667 $self->_write_header;
584 475         7076 } $self;
585 475 50 66     7286 if ($pushed && $self->_debug) {
586 0         0 print STDERR "_notify from push(\$pushed=$pushed)\n";
587             }
588 475 100       2397 $self->_notify if $pushed;
589              
590 475 100       1818 if ($failed_items) {
591 120 100       423 if ($self->{on_limit} eq 'fail') {
592 49         13885 carp "Forks::Queue: queue buffer is full ",
593             "and $failed_items items were not added";
594             } else {
595 71 50       298 $self->_debug && print STDERR
596             "$$ $failed_items on put. Waiting for capacity\n";
597 71         531 $self->_wait_for_capacity;
598 71 50       238 $self->_debug && print STDERR "$$ got some capacity\n";
599 71         1365 return $pushed + $self->push(@deferred_items);
600             }
601             }
602 404         9773 return $pushed;
603             }
604              
605             sub enqueue {
606 4     4 1 3691 undef $Forks::Queue::File::_ENQUEUE;
607 4         28 local $Forks::Queue::File::_ENQUEUE = 1;
608 4         36 return Forks::Queue::File::push(@_);
609             }
610              
611             sub unshift {
612 0     0 1 0 my ($self,@items) = @_;
613 0         0 return $self->insert(0, @items);
614             }
615              
616             sub _SLEEP {
617 14006     14006   18479 my $self = shift;
618             # my $tid = threads->self;
619 14006   50     111119291 my $n = sleep($Forks::Queue::SLEEP_INTERVAL || 1);
620             #Carp::cluck("LONG SLEEP \$n=$n") if $n > 10;
621 14006         64385 return $n;
622             }
623              
624             sub _wait_for_item {
625 99     99   266 my ($self) = @_;
626 99         197 my $ready = 0;
627 99         198 do {
628 13983     13983   77106 _SYNC { $self->_read_header } $self;
  13983         34231  
629 13983   100     88919 $ready = $self->{_avail} || $self->{_end} || $self->_expired;
630 13983 100       27778 if (!$ready) {
631 13884         26132 _SLEEP($self); #sleep($Forks::Queue::SLEEP_INTERVAL||1)
632             }
633             } while !$ready;
634 99         439 return $self->{_avail};
635             }
636              
637             sub _wait_for_capacity {
638 71     71   288 my ($self) = @_;
639 71         152 my $ready = 0;
640 71         129 do {
641 142 50       712 if ($self->{limit} <= 0) {
642 0         0 $ready = 1;
643             } else {
644 142     142   1586 _SYNC { $self->_read_header } $self;
  142         604  
645 142   66     1422 $ready = $self->{_avail} < $self->{limit} && !$self->{_end};
646 142 100       540 if (!$ready) {
647 71         337 _SLEEP($self); #sleep($Forks::Queue::SLEEP_INTERVAL || 1) if !$ready;
648             }
649             }
650             } while !$ready;
651 71         259 return $self->{_avail} < $self->{limit};
652             }
653              
654             sub dequeue {
655 25     25 1 6916 my $self = shift;
656 25 50       157 Forks::Queue::_validate_input($_[0],'count',1) if @_;
657 15 50       55 if ($self->{style} ne 'lifo') {
658 15 50       146 return @_ ? $self->_dequeue_front(@_) : $self->_dequeue_front;
659             } else {
660 0 0       0 return @_ ? $self->_dequeue_back(@_) : $self->_dequeue_back;
661             }
662             }
663              
664             sub _dequeue_back {
665 0     0   0 my $self = shift;
666 0 0 0     0 my $count = @_ ? $_[0] // 1 : 1;
667 0 0       0 if (! eval { $self->_check_pid; 1 } ) {
  0         0  
  0         0  
668 0         0 carp "Forks::Queue::pop operation failed: $@";
669 0         0 return;
670             }
671 0 0 0     0 if ($self->limit > 0 && $count > $self->limit) {
672             # error message compatible with Thread::Queue
673 0         0 croak "dequeue: 'count' argument ($count) exceeds queue size limit (",
674             $self->limit, ")";
675             }
676 0         0 my @return;
677 0         0 local $/ = EOL;
678 0         0 while (@return == 0) {
679             _SYNC {
680 0 0 0 0   0 return if $self->{_avail} < $count && !$self->{_end};
681 0         0 seek $self->{_fh}, $self->{_tell}, 0;
682 0         0 my $avail = $self->{_avail};
683 0         0 while ($avail > $count) {
684 0         0 scalar readline($self->{_fh});
685 0         0 $avail--;
686             }
687 0         0 my $spot = tell $self->{_fh};
688 0         0 @return = map dejsonize($_), readline($self->{_fh});
689 0         0 truncate $self->{_fh}, $spot;
690 0         0 $self->{_count} -= @return;
691 0         0 $self->_write_header;
692 0         0 } $self;
693 0 0 0     0 last if @return || $self->{_end} || $self->_expired;
      0        
694 0         0 _SLEEP($self); #sleep($Forks::Queue::SLEEP_INTERVAL || 1);
695             }
696 0 0       0 $self->_notify if @return;
697 0 0 0     0 if ($self->_expired && @return == 0) {
698 0 0       0 return @_ ? $self->pop_nb(@_) : $self->pop_nb;
699             }
700 0 0 0     0 return @_ ? @return : $return[0] // ();
701             }
702              
703             sub _dequeue_front {
704 15     15   37 my $self = shift;
705 15 50 50     65 my $count = @_ ? $_[0] // 1 : 1;
706 15 50       24 if (! eval { $self->_check_pid; 1 } ) {
  15         55  
  15         44  
707 0         0 carp "Forks::Queue::shift operation failed: $@";
708 0         0 return;
709             }
710 15 50 33     69 if ($self->limit > 0 && $count > $self->limit) {
711             # error message compatible with Thread::Queue
712 0         0 croak "dequeue: 'count' argument ($count) exceeds queue size limit (",
713             $self->limit, ")";
714             }
715 15         35 my @return;
716 15         54 local $/ = EOL;
717 15         42 while (@return == 0) {
718             _SYNC {
719 60     60   464 $self->_read_header;
720 60 100 66     583 return if $self->{_avail} < $count && !$self->{_end};
721 10         131 seek $self->{_fh}, $self->{_tell}, 0;
722 10   66     197 while (@return < $count && $self->{_avail} > 0) {
723 35         157 my $item = readline($self->{_fh});
724 35 50       95 if (!defined($item)) {
725 0         0 $self->_write_header;
726 0         0 return;
727             }
728 35         89 chomp($item);
729 35         51 eval {
730 35         72 CORE::push @return, dejsonize($item);
731             };
732 35 50       684 if ($@) {
733 0         0 $self->_write_header;
734 0         0 die "JSON was \"$item\", error was $@";
735             }
736 35         67 $self->{_pos}++;
737 35         74 $self->{_tell} = tell $self->{_fh};
738 35         116 $self->{_avail}--;
739             }
740 10 50 33     66 if ($self->{_maintenance_freq} &&
741             $self->{_pos} >= $self->{_maintenance_freq}) {
742              
743 0         0 $self->_maintain;
744             }
745 10         49 $self->_write_header;
746 60         1350 } $self;
747 60 100 66     1194 last if @return || $self->{_end} || $self->_expired;
      66        
748 45         171 _SLEEP($self); #sleep($Forks::Queue::SLEEP_INTERVAL || 1);
749             }
750 15 100       94 $self->_notify if @return;
751 15 100 66     104 if ($self->_expired && @return == 0) {
752 5 50       110 return @_ ? $self->shift_nb(@_) : $self->shift_nb;
753             }
754 10 50 0     138 return @_ ? @return : $return[0] // ();
755             }
756              
757             sub shift :method {
758 99     99 1 2000827 my ($self,$count) = @_;
759 99   100     844 $count ||= 1;
760 99 50       283 if (! eval { $self->_check_pid; 1 } ) {
  99         740  
  99         359  
761 0         0 carp "Forks::Queue::shift method failed: $@";
762 0         0 return;
763             }
764              
765 99         234 my @return;
766 99         499 while (@return == 0) {
767 99         203 my $h;
768 99 100       655 return if !$self->_wait_for_item;
769 90         361 local $/ = EOL;
770             _SYNC {
771 90     90   386 $self->_read_header;
772              
773 90         1183 seek $self->{_fh}, $self->{_tell}, 0;
774 90   100     980 while (@return < $count && $self->{_avail} > 0) {
775 728         2294 my $item = readline($self->{_fh});
776 728 50       1333 if (defined($item)) {
777 728         1012 chomp($item);
778 728         914 eval {
779 728         1248 CORE::push @return, dejsonize($item);
780             };
781 728 50       13286 if ($@) {
782 0         0 $self->_write_header;
783 0         0 die "JSON was \"$item\", error was $@";
784             }
785 728         1066 $self->{_pos}++;
786 728         1228 $self->{_tell} = tell $self->{_fh};
787 728         2347 $self->{_avail}--;
788             }
789             }
790 90 100 66     521 if ($self->{_maintenance_freq} &&
791             $self->{_pos} >= $self->{_maintenance_freq}) {
792              
793 5         30 $self->_maintain;
794             }
795 90         356 $self->_write_header;
796 90         1112 } $self;
797             }
798 90 50       582 $self->_notify if @return;
799 90 100 100     750 if (!wantarray && @_ < 2) {
800 48   33     443 return $return[0] // ();
801             } else {
802 42         323 return @return;
803             }
804             }
805              
806             sub shift_nb {
807 7     7 1 73 my ($self,$count) = @_;
808 7   100     44 $count ||= 1;
809 7 50       19 if (! eval { $self->_check_pid; 1 } ) {
  7         32  
  7         41  
810 0         0 carp "Forks::Queue::shift operation failed: $@";
811 0         0 return;
812             }
813              
814 7         18 my @return;
815             my $h;
816             #return if !$self->_wait_for_item;
817 7         34 local $/ = EOL;
818             _SYNC {
819 7     7   27 $self->_read_header;
820              
821 7         87 seek $self->{_fh}, $self->{_tell}, 0;
822 7   66     146 while (@return < $count && $self->{_avail} > 0) {
823 20         85 my $item = readline($self->{_fh});
824 20 50       54 if (!defined($item)) {
825 0         0 $self->_write_header;
826 0         0 return;
827             }
828 20         41 chomp($item);
829 20         25 eval {
830 20         77 CORE::push @return, dejsonize($item);
831             };
832 20 50       485 if ($@) {
833 0         0 die "JSON was \"$item\", error was $@";
834             }
835 20         27 $self->{_pos}++;
836 20         48 $self->{_tell} = tell $self->{_fh};
837 20         98 $self->{_avail}--;
838             }
839 7 50 33     61 if ($self->{_maintenance_freq} &&
840             $self->{_pos} >= $self->{_maintenance_freq}) {
841              
842 0         0 $self->_maintain;
843             }
844 7         33 $self->_write_header;
845 7         20 return;
846 7         166 } $self;
847 7 100       103 $self->_notify if @return;
848 7 100 66     60 if (!wantarray && @_ < 2) {
849 2   33     16 return $return[0] // ();
850             } else {
851 5         69 return @return;
852             }
853             }
854              
855             sub peek_front {
856 71     71 0 1398 my ($self, $index) = @_;
857 71   100     219 $index ||= 0;
858 71 100       198 if ($index < 0) {
859 20         66 return $self->peek_back(-$index - 1);
860             }
861 51 50       85 if (! eval { $self->_check_pid; 1 } ) {
  51         143  
  51         129  
862 0         0 carp "Forks::Queue::peek operation failed: $@";
863 0         0 return;
864             }
865 51         85 my @return;
866 51         160 local $/ = EOL;
867              
868 51         85 my $h;
869 51     51   255 _SYNC { $self->_read_header } $self;
  51         139  
870 51 100       281 return if $self->{_avail} <= $index;
871              
872             _SYNC {
873 49     49   150 $self->_read_header;
874              
875 49         563 seek $self->{_fh}, $self->{_tell}, 0;
876 49         115 my $item;
877 49         147 while ($index-- >= 0) {
878 351         829 $item = readline($self->{_fh});
879 351 50       723 if (!defined($item)) {
880 0         0 return;
881             }
882             }
883 49         103 chomp($item);
884              
885 49         115 CORE::push @return, dejsonize($item);
886 49         312 } $self;
887 49         438 return $return[0];
888             }
889              
890             sub peek_back {
891 58     58 0 133 my ($self, $index) = @_;
892 58   100     192 $index ||= 0;
893 58 100       120 if ($index < 0) {
894 10         28 return $self->peek_front(-$index - 1);
895             }
896 48 50       70 if (! eval { $self->_check_pid; 1 } ) {
  48         135  
  48         106  
897 0         0 carp "Forks::Queue::peek operation failed: $@";
898 0         0 return;
899             }
900 48         82 my $count = $index + 1;
901 48         159 local $/ = EOL;
902 48         75 my @return;
903              
904             my $h;
905             _SYNC {
906 48     48   136 $self->_read_header;
907 48 100       135 return if $self->{_avail} <= $index;
908              
909 43         484 seek $self->{_fh}, $self->{_tell}, 0;
910 43         113 my $pos = $self->{_pos};
911 43         128 while ($pos + $count < $self->{_count}) {
912 528         965 scalar readline($self->{_fh});
913 528         939 $pos++;
914             }
915 43         109 my $item = readline($self->{_fh});
916 43         72 chomp($item);
917 43         188 @return = dejsonize($item);
918 48         305 } $self;
919 48         569 return $return[0];
920             }
921              
922             sub extract {
923 52     52 1 21799 my $self = shift;
924 52 100       255 Forks::Queue::_validate_input( $_[0], 'index' ) if @_;
925 46   100     136 my $index = shift || 0;
926 46 100       139 Forks::Queue::_validate_input( $_[0], 'count', 1) if @_;
927            
928 36   100     97 my $count = $_[0] // 1;
929             # my $count = @_ ? shift : 1;
930 36 100       93 if ($self->{style} eq 'lifo') {
931 18         34 $index = -1 - $index;
932 18         27 $index -= $count - 1;
933             }
934 36         142 local $/ = EOL;
935 36         57 my @return;
936             _SYNCWA {
937 36     36   109 $self->_read_header;
938 36         72 my $n = $self->{_avail};
939 36 50       81 if ($count <= 0) {
940 0         0 carp "Forks::Queue::extract: count must be positive";
941 0         0 return;
942             }
943 36 100       73 if ($index < 0) {
944 18         30 $index = $index + $n;
945 18 100       38 if ($index < 0) {
946 8         14 $count += $index;
947 8         14 $index = 0;
948             }
949             }
950 36 100 100     134 if ($count <= 0 || $index >= $n) {
951 8         20 return;
952             }
953 28 100       63 if ($index + $count >= $n) {
954 6         10 $count = $n - $index;
955             }
956              
957 28         322 seek $self->{_fh}, $self->{_tell}, 0;
958 28         516 scalar readline($self->{_fh}) for 0..$index-1; # skip
959 28         80 my $save = tell $self->{_fh};
960             @return = map {
961 28         73 my $item = readline($self->{_fh});
  84         1151  
962 84         171 chomp($item);
963 84         113 $self->{_avail}--;
964 84         108 $self->{_count}--;
965 84         178 dejsonize($item);
966             } 1..$count;
967 28         1041 my @buffer = readline($self->{_fh});
968 28         226 seek $self->{_fh}, $save, 0;
969 28         61 print {$self->{_fh}} @buffer;
  28         333  
970 28         572 truncate $self->{_fh}, tell $self->{_fh};
971 28         109 $self->_write_header;
972 36         262 } $self;
973 36 100       393 $self->_notify if @return;
974 36 100 66     257 return @_ ? @return : $return[0] // ();
975             }
976              
977             sub insert {
978 28     28 1 5054 my ($self, $pos, @items) = @_;
979 28         132 Forks::Queue::_validate_input( $pos, 'index' );
980 20 50       32 if (! eval { $self->_check_pid; 1 } ) {
  20         76  
  20         44  
981 0         0 carp "Forks::Queue::insert operation failed: $@";
982 0         0 return;
983             }
984 20         74 local $/ = EOL;
985 20         42 my $nitems = @items;
986 20         41 my (@deferred_items, $failed_items);
987 20         31 my $inserted = 0;
988             _SYNC {
989 20     20   62 $self->_read_header;
990 20 50       55 if ($self->{_end}) {
991             carp "Forks::Queue::insert call from process $$ ",
992 0         0 "after end call from process ", $self->{_end}, "!";
993 0         0 return 0;
994             }
995 20 50 33     93 if ($self->{on_limit} ne "tq-compat" && $self->{limit} > 0) {
996 20         48 my $failed_items = $self->{_avail} + @items - $self->{limit};
997 20 100       46 if ($failed_items > 0) {
998 4         18 @deferred_items = splice @items, -$failed_items;
999 4 50       12 if (@items == 0) {
1000 0         0 return;
1001             }
1002             } else {
1003 16         24 $failed_items = 0;
1004             }
1005             }
1006              
1007 20 100       44 if ($pos < 0) {
1008 8         14 $pos += $self->{_avail};
1009             }
1010 20 100       42 if ($pos >= $self->{_avail}) {
1011             # insert at end of queue (append)
1012 4         47 seek $self->{_fh}, 0, 2;
1013 4 50       36 if (tell($self->{_fh}) < $self->{_tell}) {
1014 0         0 Carp::cluck("funny seek");
1015 0         0 seek $self->{_fh}, $self->{_tell}, 0;
1016             }
1017 4         10 foreach my $item (@items) {
1018 16         24 print {$self->{_fh}} jsonize($item),EOL;
  16         55  
1019 16         490 $self->{_count}++;
1020 16         25 $self->{_avail}++;
1021 16         23 $inserted++;
1022 16 50       38 $self->_debug && print STDERR
1023             "$$ insert item $inserted/",0+@items,"\n";
1024             }
1025 4         15 $self->_write_header;
1026 4         10 return;
1027             }
1028 16 100       33 if ($pos < 0) {
1029 4         7 $pos = 0;
1030             }
1031 16         183 seek $self->{_fh}, $self->{_tell}, 0;
1032 16         54 while ($pos > 0) {
1033 64         184 scalar readline($self->{_fh});
1034 64         112 $pos--;
1035             }
1036 16         40 my $save = tell($self->{_fh});
1037 16         188 my @buffer = readline($self->{_fh});
1038 16         124 seek $self->{_fh}, $save, 0;
1039 16         45 foreach my $item (@items) {
1040 48         70 print {$self->{_fh}} jsonize($item),EOL;
  48         189  
1041 48         1532 $self->{_count}++;
1042 48         80 $self->{_avail}++;
1043 48         64 $inserted++;
1044 48 50       103 $self->_debug && print STDERR
1045             "$$ insert item $inserted/",0+@items,"\n";
1046             }
1047 16         26 print {$self->{_fh}} @buffer;
  16         154  
1048 16         69 $self->_write_header;
1049 20         165 } $self;
1050 20 50       279 if ($failed_items) {
1051 0 0       0 if ($self->{on_limit} eq 'fail') {
1052 0         0 carp "Forks::Queue: queue buffer is full ",
1053             "and $failed_items items were not inserted";
1054             } else {
1055 0 0       0 $self->_debug && print STDERR
1056             "$$ $failed_items on insert. Waiting for capacity\n";
1057 0         0 $self->_wait_for_capacity;
1058 0 0       0 $self->_debug && print STDERR "$$ got some capacity\n";
1059 0         0 return $inserted + $self->insert($pos+$inserted, @deferred_items);
1060             }
1061             }
1062 20 50       79 $self->_notify if $inserted;
1063 20         151 return $inserted;
1064             }
1065              
1066             sub pop {
1067 22     22 1 5434 my ($self,$count) = @_;
1068 22   100     121 $count ||= 1;
1069 22 50       36 if (! eval { $self->_check_pid; 1 } ) {
  22         88  
  22         60  
1070 0         0 carp "Forks::Queue::pop operation failed: $@";
1071 0         0 return;
1072             }
1073 22         90 local $/ = EOL;
1074 22         64 my @return;
1075 22         99 while (@return == 0) {
1076 22         34 my $h;
1077             do {
1078 28     28   258 _SYNC { $self->_read_header } $self;
  28         117  
1079             } while (!$self->{_avail} && !$self->{_end} &&
1080 22   66     32 1 + _SLEEP($self)); #sleep($Forks::Queue::SLEEP_INTERVAL || 1));
      66        
1081              
1082 22 50 66     139 return if $self->{_end} && !$self->{_avail};
1083              
1084             _SYNC {
1085 22     22   99 $self->_read_header;
1086 22         270 seek $self->{_fh}, $self->{_tell}, 0;
1087 22 100       99 if ($self->{_avail} <= $count) {
1088 5         91 my @items = readline($self->{_fh});
1089 5         22 chomp(@items);
1090 5         21 @return = map dejsonize($_), @items;
1091 5         207 truncate $self->{_fh}, $self->{_tell};
1092 5         31 $self->{_count} -= @items;
1093             } else {
1094 17         44 my $pos = $self->{_pos};
1095 17         83 while ($pos + $count < $self->{_count}) {
1096 136         311 scalar readline($self->{_fh});
1097 136         605 $pos++;
1098             }
1099 17         43 my $eof = tell $self->{_fh};
1100 17         163 my @items = readline($self->{_fh});
1101 17         469 truncate $self->{_fh}, $eof;
1102 17         64 $self->{_count} -= @items;
1103 17         42 chomp(@items);
1104 17         116 @return = map dejsonize($_), @items;
1105             }
1106 22         505 $self->_write_header;
1107 22         208 } $self;
1108             }
1109 22 50       120 $self->_notify if @return;
1110 22 100 66     147 if (!wantarray && @_ < 2) {
1111 2         20 return $return[0];
1112             } else {
1113 20         113 return @return;
1114             }
1115             }
1116              
1117             sub pop_nb {
1118 4     4 1 18 my ($self,$count) = @_;
1119 4   50     46 $count ||= 1;
1120 4 50       7 if (! eval { $self->_check_pid; 1 } ) {
  4         80  
  4         11  
1121 0         0 carp "Forks::Queue::pop operation failed: $@";
1122 0         0 return;
1123             }
1124 4         64 local $/ = EOL;
1125 4         19 my @return;
1126             my $h;
1127 4     4   81 _SYNC { $self->_read_header } $self;
  4         24  
1128 4 50 33     80 return if $self->{_end} && !$self->{_avail};
1129              
1130             _SYNC {
1131 4     4   15 $self->_read_header;
1132              
1133 4         51 seek $self->{_fh}, $self->{_tell}, 0;
1134 4 50       17 if ($self->{_avail} <= $count) {
1135 4         37 my @items = readline($self->{_fh});
1136 4         11 chomp(@items);
1137 4         9 @return = map dejsonize($_), @items;
1138 4         85 truncate $self->{_fh}, $self->{_tell};
1139 4         15 $self->{_count} -= @items;
1140 4         33 $self->_write_header;
1141 4         10 return;
1142             }
1143              
1144 0         0 my $pos = $self->{_pos};
1145 0         0 while ($pos + $count < $self->{_count}) {
1146 0         0 scalar readline($self->{_fh});
1147 0         0 $pos++;
1148             }
1149 0         0 my $eof = tell $self->{_fh};
1150 0         0 my @items = readline($self->{_fh});
1151 0         0 truncate $self->{_fh}, $eof;
1152 0         0 $self->{_count} -= @items;
1153 0         0 chomp(@items);
1154 0         0 @return = map dejsonize($_), @items;
1155 0         0 $self->_write_header;
1156 0         0 return;
1157 4         49 } $self;
1158 4 50       49 $self->_notify if @return;
1159 4 50 33     44 if (!wantarray && @_ < 2) {
1160 4         25 return $return[0];
1161             } else {
1162 0         0 return @return;
1163             }
1164             }
1165              
1166             # MagicLimit: a tie class to allow $q->limit to work as an lvalue
1167              
1168             sub Forks::Queue::File::MagicLimit::TIESCALAR {
1169 20     20   189 my ($pkg,$obj) = @_;
1170 20         210 return bless \$obj,$pkg;
1171             }
1172              
1173             sub Forks::Queue::File::MagicLimit::FETCH {
1174 379     379   1535 return ${$_[0]}->{limit};
  379         2774  
1175             }
1176              
1177             sub Forks::Queue::File::MagicLimit::STORE {
1178 15     15   108 my ($tie,$val) = @_;
1179 15         57 my $queue = $$tie;
1180 15         41 my $oldval = $queue->{limit};
1181 15         37 $queue->{limit} = $val;
1182 15     15   196 _SYNC { $queue->_write_header } $queue;
  15         53  
1183 15         97 return $oldval;
1184             }
1185              
1186             sub limit :lvalue {
1187 164     164 1 9002869 my $self = shift;
1188 164 50       828 if (! eval { $self->_check_pid; 1 } ) {
  164         1723  
  164         484  
1189 0         0 carp "Forke::Queue::limit operation failed: $@";
1190 0         0 return;
1191             }
1192 164 100       989 if (!$self->{_limit_magic}) {
1193 20         615 tie $self->{_limit_magic},'Forks::Queue::File::MagicLimit', $self;
1194             }
1195 164     164   1809 _SYNC { $self->_read_header } $self;
  164         856  
1196 164 100       815 if (@_) {
1197 43         126 $self->{limit} = shift @_;
1198 43 100       135 if (@_) {
1199 28         233 $self->{on_limit} = shift @_;
1200             }
1201 43     43   415 _SYNC { $self->_write_header } $self;
  43         154  
1202             }
1203 164         1005 return $self->{_limit_magic};
1204             }
1205              
1206             sub _debug {
1207 20032   33 20032   63585 shift->{debug} // $Forks::Queue::DEBUG;
1208             }
1209              
1210             sub _DUMP {
1211 0     0   0 my ($self,$fh_dump) = @_;
1212 0   0     0 $fh_dump ||= *STDERR;
1213 0         0 open my $fh_qdata, '<', $self->{file};
1214 0         0 print {$fh_dump} <$fh_qdata>;
  0         0  
1215 0         0 close $fh_qdata;
1216             }
1217              
1218             my $id = 0;
1219             sub _impute_file {
1220 51     51   210 my $base = $0;
1221 51         440 $base =~ s{.*[/\\](.)}{$1};
1222 51         217 $base =~ s{[/\\]$}{};
1223 51         141 $id++;
1224 51         106 my $file;
1225             my @candidates;
1226 51 50       348 if ($^O eq 'MSWin32') {
1227 0         0 @candidates = (qw(C:/Temp C:/Windows/Temp));
1228             } else {
1229 51         221 @candidates = qw(/tmp /var/tmp);
1230             }
1231              
1232             # try hard to avoid using an NFS drive
1233 51         571 for my $candidate ($ENV{FORKS_QUEUE_DIR},
1234             $ENV{TMPDIR}, $ENV{TEMP},
1235             $ENV{TMP}, @candidates,
1236             $ENV{HOME}, ".") {
1237 255 100       579 next if !defined($candidate);
1238 51 50 33     1905 if (-d $candidate && -w _ && -x _) {
      33        
1239 51   33     870 $file //= "$candidate/.fq-$$-$id-base";
1240 51 50       290 next if Forks::Queue::Util::__is_nfs($candidate);
1241 40         582 ${^_nfs} = 0;
1242 40         3288 return "$candidate/.fq-$$-$id-$base";
1243             }
1244             }
1245              
1246 0           ${^_nfs} = 1;
1247 0           carp "Forks::Queue::File: queue file $file might be on an NFS filesystem!";
1248 0           return $file;
1249             }
1250              
1251             1;
1252              
1253             =head1 NAME
1254              
1255             Forks::Queue::File - file-based implementation of Forks::Queue
1256              
1257             =head1 VERSION
1258              
1259             0.14
1260              
1261             =head1 SYNOPSIS
1262              
1263             my $q = Forks::Queue::File->new( file => "queue-file" );
1264             $q->put( "job1" );
1265             $q->put( { name => "job2", task => "do something", data => [42,19] } );
1266             ...
1267             $q->end;
1268             for my $w (1 .. $num_workers) {
1269             if (fork() == 0) {
1270             my $task;
1271             while (defined($task = $q->get)) {
1272             ... perform task in child ...
1273             }
1274             exit;
1275             }
1276             }
1277              
1278             =head1 METHODS
1279              
1280             See L for an overview of the methods supported by
1281             this C implementation.
1282              
1283             =head2 new
1284              
1285             =head2 $queue = Forks::Queue::File->new( %opts )
1286              
1287             =head2 $queue = Forks::Queue->new( impl => 'File', %opts )
1288              
1289             The C constructor recognized the following configuration
1290             options.
1291              
1292             =over 4
1293              
1294             =item * file
1295              
1296             The name of the file to use to score queue data and metadata.
1297             If omitted, a temporary filename is chosen.
1298              
1299             It is strongly recommended not to use a file that would reside on an
1300             NFS filesystem, since these filesystems have notorious difficulty
1301             with synchronizing files across processes.
1302              
1303             =item * style
1304              
1305             =item * limit
1306              
1307             =item * on_limit
1308              
1309             =item * join
1310              
1311             =item * persist
1312              
1313             See L for descriptions of these options.
1314              
1315             =item * debug
1316              
1317             Boolean value to enable or disable debugging on this queue,
1318             overriding the value in C<$Forks::Queue::DEBUG>.
1319              
1320             =item * dflock
1321              
1322             Boolean value to enable directory-based alternative to flock
1323             for synchronization of the queue across processeses. The module
1324             will often be able to guess whether this flag should be
1325             set by default, but it should be used explicitly in some cases
1326             such as sharing a queue over processes on different hosts
1327             accessing a shared, networked filesystem.
1328              
1329             =back
1330              
1331             =head1 BUGS AND LIMITATIONS
1332              
1333             As with anything that requires C, you should avoid allowing the
1334             queue file to reside on an NFS drive.
1335              
1336             =head1 LICENSE AND COPYRIGHT
1337              
1338             Copyright (c) 2017-2019, Marty O'Brien.
1339              
1340             This library is free software; you can redistribute it and/or modify
1341             it under the same terms as Perl itself, either Perl version 5.10.1 or,
1342             at your option, any later version of Perl 5 you may have available.
1343              
1344             See http://dev.perl.org/licenses/ for more information.
1345              
1346             =cut