File Coverage

blib/lib/Forks/Queue.pm
Criterion Covered Total %
statement 100 146 68.4
branch 39 62 62.9
condition 18 19 94.7
subroutine 28 44 63.6
pod 22 24 91.6
total 207 295 70.1


line stmt bran cond sub pod time code
1             package Forks::Queue;
2 121     121   2658967 use strict;
  121         560  
  121         3293  
3 121     121   516 use warnings;
  121         465  
  121         3099  
4 121     121   555 use Scalar::Util 'looks_like_number';
  121         206  
  121         5264  
5 121     121   575 use Carp;
  121         178  
  121         5234  
6 121     121   20080 use Time::HiRes;
  121         49928  
  121         497  
7 121     121   8607 use Config;
  121         374  
  121         16391  
8              
9             our $VERSION = '0.13';
10             our $DEBUG = $ENV{FORKS_QUEUE_DEBUG} || 0;
11              
12             our $NOTIFY_OK = $ENV{FORKS_QUEUE_NOTIFY} // do {
13             $Config::Config{sig_name} =~ /\bIO\b/;
14             };
15              
16             our $SLEEP_INTERVAL = $NOTIFY_OK ? 2 : 1;
17             our $SLEEP_INTERVALX = 2;
18              
19             # default values to apply to all new Forks::Queue implementations
20             our %OPTS = (limit => -1, on_limit => 'fail', style => 'fifo',
21             impl => $ENV{FORKS_QUEUE_IMPL} || 'File' );
22              
23             sub new {
24 171     171 1 28962763 my $class = shift;
25 121     121   667 no warnings 'once';
  121         672  
  121         161471  
26 171         4789 my %opts = (%OPTS, @_);
27              
28 171 100       1490 if ($opts{impl}) {
29 170         899 my $pkg = delete $opts{impl};
30 170 50       1921 if ($pkg =~ /[^\w:]/) {
31 0         0 croak "Forks::Queue cannot be instantiated. Invalid 'impl' $pkg";
32             }
33 170 50       21384 if (eval "require Forks::Queue::$pkg; 1") {
    0          
34 170         967 $pkg = "Forks::Queue::" . $pkg;
35 170         2205 return $pkg->new(%opts);
36             } elsif (eval "require $pkg; 1") {
37 0         0 return $pkg->new(%opts);
38             } else {
39 0         0 croak "Forks::Queue cannot be instantiated. ",
40             "Did not recognize 'impl' option '$opts{impl}'";
41             }
42             }
43 1         166 croak "Forks::Queue cannot be instantiated. ",
44             "Use an implementation or pass the 'impl' option to the constructor";
45             }
46              
47             sub import {
48 245     245   3107 my ($class, @args) = @_;
49 245         5037 for (my $i=0; $i<@args; $i++) {
50 0         0 foreach my $key (qw(limit on_limit impl style)) {
51 0 0       0 if ($args[$i] eq $key) {
52 0         0 $OPTS{$args[$i]} = $args[$i+1];
53 0         0 $i++;
54 0         0 next;
55             }
56             }
57             }
58             }
59              
60             sub put {
61 530     530 1 36159933 my $self = shift;
62 530         5610 return $self->push(@_);
63             }
64              
65 6     6 1 4116 sub enqueue { goto &put; }
66              
67             sub get {
68 161     161 1 30137339 my $self = shift;
69 161 100       1043 _validate_input($_[0], 'count', 1) if @_;
70 161 100       802 if ($self->{style} eq 'fifo') {
71 133 100       1521 return @_ ? $self->shift(@_) : $self->shift;
72             } else {
73 28 100       239 my @gotten = @_ ? reverse($self->pop(@_)) : $self->pop;
74 28 100       153 return @_ ? @gotten : $gotten[0];
75             }
76             }
77              
78 0     0 1 0 sub dequeue { goto &get; }
79              
80             sub dequeue_timed {
81 18     18 1 43 my $self = shift;
82 18         38 my $timeout = shift;
83 18         65 _validate_input($timeout, 'timeout', 0, 1);
84 18         68 local $self->{_expire} = Time::HiRes::time + $timeout;
85 18         32 local $SLEEP_INTERVAL = $Forks::Queue::SLEEP_INTERVAL;
86 18         88 $SLEEP_INTERVAL /= 2 while $SLEEP_INTERVAL > 0.25 * $timeout;
87 18 50       104 return @_ ? $self->dequeue(@_) : $self->dequeue;
88             }
89              
90             sub get_timed {
91 12     12 1 24 my $self = shift;
92 12         25 my $timeout = shift;
93 12         71 _validate_input($timeout, 'timeout', 0, 1);
94 12         106 local $self->{_expire} = Time::HiRes::time + $timeout;
95 12         75 local $SLEEP_INTERVAL = $Forks::Queue::SLEEP_INTERVAL;
96 12         64 $SLEEP_INTERVAL /= 2 while $SLEEP_INTERVAL > 0.25 * $timeout;
97 12 100       188 return @_ ? $self->get(@_) : $self->get;
98             }
99              
100             sub shift_timed {
101 0     0 1 0 my $self = shift;
102 0         0 my $timeout = shift;
103 0         0 _validate_input($timeout, 'timeout', 0, 1);
104 0         0 local $self->{_expire} = Time::HiRes::time + $timeout;
105 0         0 local $SLEEP_INTERVAL = $Forks::Queue::SLEEP_INTERVAL;
106 0         0 $SLEEP_INTERVAL /= 2 while $SLEEP_INTERVAL > 0.25 * $timeout;
107 0 0       0 return @_ ? $self->shift(@_) : $self->shift;
108             }
109              
110             sub pop_timed {
111 0     0 1 0 my $self = shift;
112 0         0 my $timeout = shift;
113 0         0 _validate_input($timeout, 'timeout', 0, 1);
114 0         0 local $self->{_expire} = Time::HiRes::time + $timeout;
115 0         0 local $SLEEP_INTERVAL = $Forks::Queue::SLEEP_INTERVAL;
116 0         0 $SLEEP_INTERVAL /= 2 while $SLEEP_INTERVAL > 0.25 * $timeout;
117 0 0       0 return @_ ? $self->pop(@_) : $self->pop;
118             }
119              
120             sub _expired {
121 39014     39014   74984 my ($self) = @_;
122 39014 100       219846 $self->{_expire} && Time::HiRes::time >= $self->{_expire};
123             }
124              
125             sub get_nb {
126 18     18 1 61 my $self = shift;
127 18 100       93 _validate_input($_[0], 'count', 1) if @_;
128 3 50       66 if ($self->{style} eq 'fifo') {
129 0 0       0 return @_ ? $self->shift_nb(@_) : $self->shift_nb;
130             } else {
131 3 50       108 return @_ ? @ := reverse($self->pop_nb(@_)) : $self->pop_nb;
132             }
133             }
134 15     15 1 9244 sub dequeue_nb { goto &get_nb; }
135              
136             sub peek {
137 86     86 1 10727 my ($self,$index) = @_;
138 86 100       323 _validate_input($index, 'index') if @_ > 1;
139 77 100       183 if ($self->{style} eq 'lifo') {
140 8   50     68 return $self->peek_back($index || 0);
141             } else {
142 69   100     263 return $self->peek_front($index || 0);
143             }
144             }
145              
146             sub pending {
147 267     267 1 131063 my $self = shift;
148 267         1286 my $s = $self->status;
149 267 50       3438 return $s->{avail} ? $s->{avail} : $s->{end} ? undef : 0;
    100          
150             }
151              
152             sub _unimpl {
153 0     0   0 my $func = shift;
154 0         0 croak "Forks::Queue: $func not implemented in abstract base class";
155             }
156              
157             sub limit :lvalue {
158 0     0 1 0 my $self = shift;
159 0 0       0 if (@_) {
160 0         0 $self->{limit} = shift @_;
161 0 0       0 if (@_) {
162 0         0 $self->{on_limit} = shift @_;
163             }
164             }
165 0         0 $self->{limit};
166             }
167              
168             sub _validate_input {
169 394     394   1305 my ($value,$name,$ge,$float_ok) = @_;
170 394 100 100     10506 croak "Invalid '$name'"
      100        
      100        
      100        
      100        
171             if !defined($value) ||
172             !looks_like_number($value) ||
173             (!$float_ok && $value != int($value)) ||
174             (defined($ge) && $value < $ge);
175 319         625 return $value;
176             }
177              
178 0     0 1 0 sub push { _unimpl("push/put") }
179 0     0 0 0 sub peek_front { _unimpl("peek") }
180 0     0 0 0 sub peek_back { _unimpl("peek") }
181 0     0 1 0 sub shift :method { _unimpl("shift/get") }
182 0     0 1 0 sub unshift { _unimpl("unshift") }
183 0     0 1 0 sub pop { _unimpl("pop/get") }
184 0     0 1 0 sub shift_nb { _unimpl("shift/get") }
185 0     0 1 0 sub pop_nb { _unimpl("pop/get") }
186 0     0 1 0 sub status { _unimpl("pending/status") }
187 0     0 1 0 sub clear { _unimpl("clear") }
188              
189              
190              
191              
192              
193              
194             sub Forks::Queue::Util::__is_nfs {
195 198     198   33059 my ($dir) = @_;
196 198 50       1673 if ($^O ne 'linux') {
197 0         0 return;
198             }
199 198         256718 my $pid = fork();
200 198 100       13853 if ($pid == 0) {
201 48         4810 close STDOUT;
202 48         1031 close STDERR;
203             # http://superuser.com/q/422061
204 48         0 exec("df",$dir,"-t","nfs");
205 0         0 die;
206             }
207 150         12767 local $?;
208 150         120419944 waitpid $pid,0;
209 150         12032 return $? == 0;
210             }
211              
212             sub Forks::Queue::Util::QID {
213 121     121   800 no warnings 'uninitialized';
  121         229  
  121         12725  
214 136 100   136   919 if ($Forks::Queue::Util::PID != $$) {
215 86         296 $Forks::Queue::Util::PID = $$;
216 86         232 $Forks::Queue::Util::QID = 0;
217             }
218 136         342 $Forks::Queue::Util::QID++;
219 136         1901 join("-", $Forks::Queue::Util::PID, $Forks::Queue::Util::QID);
220             }
221              
222              
223             # manage global destruction phase
224             BEGIN {
225 121 50   121   740 if (defined(${^GLOBAL_PHASE})) {
226 121 50   13202   9990 eval 'sub __inGD(){%{^GLOBAL_PHASE} eq q{DESTRUCT} && __END()};1'
  13202         65206  
227             } else {
228 0         0 require B;
229 0         0 eval 'sub __inGD(){${B::main_cv()}==0 && __END()};1'
230             }
231             }
232 73     73   1078843 END { &__END }
233             sub __END {
234 121     121   665 no warnings 'redefine';
  121         220  
  121         20097  
235 73     0   1780 *DB::DB = sub {};
        73      
236 73         6008 *__inGD = sub () { 1 };
237             }
238              
239              
240              
241             1;
242              
243             =head1 NAME
244              
245             Forks::Queue - queue that can be shared across processes
246              
247             =head1 VERSION
248              
249             0.13
250              
251             =head1 SYNOPSIS
252              
253             use Forks::Queue;
254             $q = Forks::Queue->new( impl => ..., style => 'lifo' );
255              
256             # put items on queue
257             $q->put("a scalar item");
258             $q->put(["an","arrayref","item"]);
259             $q->put({"a"=>"hash","reference"=>"item"});
260             $q->put("list","of","multiple",["items"]);
261             $q->end; # no more jobs will be added to queue
262              
263             # retrieve items from queue, possibly after a fork
264             $item = $q->get;
265             $item = $q->peek; # get item without removing it
266             @up_to_10_items = $q->get(10);
267             $remaining_items = $q->pending;
268              
269             =head1 DESCRIPTION
270              
271             Interface for a queue object that can be shared across processes
272             and threads.
273             Available implementations are L,
274             L,
275             L.
276              
277             =head1 METHODS
278              
279             Many of these methods pass or return "items". For this distribution,
280             an "item" is any scalar or reference that can be serialized and
281             shared across processes.
282              
283             This will include scalars and most unblessed references
284              
285             "42"
286             [1,2,3,"forty-two"]
287             { name=>"a job", timestamp=>time, input=>{foo=>[19],bar=>\%bardata} }
288              
289             but will generally preclude data with blessed references and code references
290              
291             { name => "bad job", callback => \&my_callback_routine }
292             [ 46, $url13, File::Temp->new ]
293              
294             =head2 new
295              
296             $queue = Forks::Queue->new( %opts )
297              
298             Instantiates a new queue object with the given configuration.
299              
300             If one of the options is C, the constructor from that
301             C subclass will be invoked.
302              
303             Other options that should be supported on all implementations include
304              
305             =over 4
306              
307             =item * style
308              
309             =item * C<< style => 'fifo' | 'lifo' >>
310              
311             Indicates whether the L<"get"> method will return items in
312             first-in-first-out order or last-in-first-out order (and which
313             end of the queue the L<"peek"> method will examine)
314              
315             =item * limit
316              
317             =item * C<< limit => int >>
318              
319             A maximum size for the queue. Set to a non-positive value to
320             specify an unlimited size queue.
321              
322             =item * on_limit
323              
324             =item * C<< on_limit => 'block' | 'fail' >>
325              
326             Dictates what the queue should do when an attempt is made to
327             add items beyond the queue's limit. If C, the queue
328             will block and wait until items are removed from the queue.
329             If C, the queue will warn and return immediately without
330             changing the queue.
331              
332             See the L<"enqueue">, L<"put">, L<"push">, L<"unshift">,
333             and L<"insert"> methods, which are used to increase the length
334             of the queue and may be affected by this setting.
335              
336             =item * join
337              
338             =item * C<< join => bool >>
339              
340             If true, expects that the queue referred to by this constructor
341             has already been created in another process, and that the current
342             process should access the existing queue. This allows a queue to
343             be shared across unrelated processes (i.e., processes that do not
344             have a parent-child relationship).
345              
346             # my_daemon.pl - may run "all the time" in the background
347             $q = Forks::Queue::File->new(file=>'/var/spool/foo/q17');
348             # creates new queue object
349             ...
350              
351             # worker.pl - may run periodically for a short time, launched from
352             # cron or from command line, but not from the daemon
353             $q = Forks::Queue->new( impl => 'File', join => 1,
354             file => '/var/spool/foo/q17',
355             # the new queue attaches to existing file at /var/spool/foo/q17
356             ...
357              
358             C is not necessary for child processes forked from a process with
359             an existing queue
360              
361             $q = Forks::Queue->new(...)
362             ...
363             if (fork() == 0) {
364             # $q already exists and the child process can begin using it,
365             # no need for a Forks::Queue constructor with join
366             ...
367             }
368              
369             =item * persist
370              
371             =item * C<< persist => bool >>
372              
373             Active C objects affect your system, writing to disk or
374             writing to memory, and in general they clean themselves up when they
375             detect that no more processes are using the queue. The C option,
376             if set to true, instructs the queue object to leave its state intact
377             after destruction.
378              
379             An obvious use case for this option is debugging, to examine the
380             state of the queue after abnormal termination of your program.
381              
382             A second use case is to create persistent queues -- queues that are
383             shared not only among different processes, but among different
384             processes that are running at different times. The persistent queue
385             can be used by supplying both the C and the C options
386             to the C constructor.
387              
388             $queue_file = "/tmp/persistent.job.queue";
389             $join = -f $queue_file;
390             $q = Forks::Queue->new( impl => 'File', file => $queue_file,
391             join => $join, persist => 1 );
392             ... work with the queue ...
393             # the queue remains intact if this program exits or aborts
394              
395             =item * C<< list => ARRAYREF >>
396              
397             Initializes the contents of the queue with the argument to the
398             C option. The argument must be an array reference.
399              
400             If the C option is specified, the contents of the list
401             could be added to an already existing queue.
402              
403             =back
404              
405             See the global L<"%OPTS"> variable for information about
406             default values for many of these settings.
407              
408             =head2 put
409              
410             =head2 enqueue
411              
412             $count = $queue->put(@items);
413             $count = $queue->enqueue(@items)
414              
415             Place one or more "items" on the queue, and returns the number of
416             items successfully added to the queue.
417              
418             Adding items to the queue will fail if the L<"end"> method of
419             the queue had previously been called from any process.
420              
421             The C method name is provided for compatibility with
422             L.
423              
424             See the L<"limit"> method to see how the C method behaves
425             when adding items would cause the queue to exceed its maximum size.
426              
427              
428             =head2 push
429              
430             $count = $queue->push(@items)
431              
432             Equivalent to L<"put">, adding items to the end of the queue and
433             returning the number of items successfully added. The most recent
434             items appended to the queue by C or C will be the first
435             items taken from the queue by L<"pop"> or by L<"get"> with LIFO
436             style queues, and the last items removed by L<"shift"> or L<"get">
437             with FIFO style queues.
438              
439             If the items added to the queue would cause the queue to exceed
440             its queue size limit (as determined by the L<"limit"> attribute),
441             this method will either block until queue capacity is available,
442             or issue a warning about the uninserted items and return the
443             number of items added, depending on the queue's setting for
444             L<"on_limit"|Forks::Queue/"new">.
445              
446              
447             =head2 unshift
448              
449             $count = $queue->unshift(@items)
450              
451             Equivalent to C, adding items to the front
452             of the queue, and returning the number of items successfully
453             added. In FIFO queues, items added to the queue with C
454             will be the last items taken from the queue by L<"get">,
455             and in LIFO queues, they will be the first items taken from the
456             queue by L<"get">.
457              
458             This method is inefficient for some queue implementations.
459              
460             =head2 end
461              
462             $queue->end
463              
464             Indicates that no more items are to be put on the queue,
465             so that when a process tries to retrieve an item from an empty queue,
466             it will not block and wait for new items to be added. Causes any
467             processes blocking on a L<"get">/L<"dequeue">/L<"shift">/L<"pop">
468             call to become unblocked and return C.
469             This method may be called from any process that has access to the queue.
470              
471             Calling C on a queue more than once will generate a warning
472             message, even if the caller is not the same process/thread that
473             made the original C call.
474              
475              
476             =head2 get
477              
478             =head2 dequeue
479              
480             $item = $queue->get;
481             $item = $queue->dequeue;
482              
483             @items = $queue->get($count);
484             @items = $queue->dequeue($count);
485              
486             Attempt to retrieve one or more "items" on the queue. If the
487             queue is empty, and if L<"end"> has not been called on the queue,
488             this call blocks until an item is available or until the L<"end">
489             method has been called from some other process. If the queue is
490             empty and L<"end"> has been called, this method returns an
491             empty list in list context or C in scalar context.
492              
493             If a C<$count> argument is supplied, returns up to C<$count> items
494             or however many items are currently availble on the queue, whichever
495             is fewer. But the call still blocks if L<"end"> has not been called
496             until there is at least one item available. See L<"get_nb"> for a
497             non-blocking version of this method. The return value of this
498             function when a C<$count> argument is supplied is always a list,
499             so if you evaluate it in scalar context you will get the number of items
500             retrieved from the queue, not the items themselves.
501              
502             $job = $q->get; # $job is an item from the queue
503             $job = $q->get(1); # returns # of items retrieved, not an actual item!
504             ($job) = $q->get(1); # $job is an item from the queue
505              
506             The only important difference between C and C is what
507             happens when there is a C<$count> argument, and the queue currently has
508             more than zero but fewer than C<$count> items available. In this case,
509             the C call will return all of the available items. The C
510             method will block until at least C<$count> items are available on the
511             queue, or until the L<"end"> method has been called on the queue.
512             This C behavior is consistent with the behavior of the
513             L<"dequeue" method in Thread::Queue|Thread::Queue/"dequeue">.
514              
515              
516             =head2 pop
517              
518             $item = $queue->pop
519             @items = $queue->pop($count)
520              
521             Retrieves one or more items from the "back" of the queue.
522             For LIFO style queues, the L<"get"> method is equivalent to this method.
523             Like C<"get">, this method blocks while the queue is empty and the
524             L<"end"> method has not been called on the queue.
525              
526             If a C<$count> argument is supplied, returns up to C<$count> items or however
527             many items are currently available on the queue, whichever is fewer.
528             (Like the L<"get"> call, this method blocks when waiting for input. See
529             L<"pop_nb"> for a non-blocking version of the method. Also like
530             L<"get">, you should be wary of using this method in scalar context
531             if you provide a C<$count> argument).
532              
533             =head2 shift
534              
535             $item = $queue->shift
536             @items = $queue->shift($count)
537              
538             Retrieves one or more items from the "front" of the queue.
539             For FIFO style queues, the L<"get"> method is equivalent to this method.
540             Like C<"get">, this method blocks while the queue is empty and the
541             L<"end"> method has not been called on the queue.
542              
543             If a C<$count> argument is supplied, returns up to C<$count> items or however
544             many items are currently available on the queue, whichever is fewer. (Like the
545             L<"get"> call, this method blocks when waiting for input. See
546             L<"shift_nb"> for a non-blocking version of the method. Also like
547             L<"get">, you should be wary of using this method in scalar context
548             if you provide a C<$count> argument).
549              
550              
551             =head2 get_nb
552              
553             =head2 dequeue_nb
554              
555             =head2 pop_nb
556              
557             =head2 shift_nb
558              
559             $item = $queue->XXX_nb
560             @items = $queue->XXX_nb($count)
561              
562             Non-blocking versions of the L<"get">, L<"dequeue">, L<"pop">,
563             and L<"shift"> methods. These functions return immediately if
564             there are no items in the queue to retrieve, returning C
565             in the case with no arguments and an empty list when a
566             C<$count> argument is supplied.
567              
568             =head2 get_timed
569              
570             =head2 dequeue_timed
571              
572             =head2 shift_timed
573              
574             =head2 pop_timed
575              
576             $item = $queue->XXX_timed($timeout)
577             @item = $queue->XXX_timed($timeout,$count)
578              
579             Timed versions of L<"get">, L<"dequeue">, L<"shift">, and L<"pop">
580             that take a C<$timeout> argument and will stop blocking after
581             C<$timeout> seconds have elapsed.
582              
583             If a C<$count> argument is supplied to C, the function
584             will wait up to C<$timeout> seconds for at least C<$count> items to
585             be available on the queue. After C<$timeout> seconds have passed,
586             the function will return up to C<$count> available items.
587              
588             For other timed methods, supplying a C<$count> argument for a
589             queue with more than zero but fewer than C<$count> items available
590             will return all available items without blocking.
591              
592              
593             =head2 peek
594              
595             $item = $queue->peek
596             $item = $queue->peek($index)
597             $item = $queue->peek_front
598             $item = $queue->peek_back
599              
600             Returns an item from the queue without removing it. The C
601             and C methods inspect the item at the front and the back of
602             the queue, respectively. The generic C method is equivalent to
603             C for FIFO style queues and C for LIFO style
604             queues. If an index is specified, returns the item at that position
605             in the queue (where position 0 is the head of the queue). Negative
606             indices are supported, so a call to C<< $queue->peek(-2) >>,
607             for example, would return the second to last item in the queue.
608              
609             If the queue is empty or if the specified index is larger than the
610             number of elements currently in the queue, these methods will
611             return C without blocking.
612              
613             Note that unlike the
614             L<< "peek" method in C|Thread::Queue/"peek" >>,
615             C returns a copy of the item on the queue,
616             so manipulating a reference returned from C while B
617             affect the item on the queue.
618              
619             =head2 extract
620              
621             $item = $queue->extract
622             $item = $queue->extract($index)
623             @items = $queue->extract($index,$count)
624              
625             Removes and returns the specified number of items from the queue
626             at the specified index position, to provide random access to the
627             queue. The method is non-blocking and may return fewer than the
628             number of items requested (or zero items) if there are not enough
629             items in the queue to satisfy the request.
630              
631             If the C<$count> argument is not provided, the method will return
632             (if available) a single item. If the C<$index> argument is also
633             not provided, it will return the first item on the queue exactly
634             like the L<"get_nb"> method with no arguments.
635              
636             Negative C<$index> values are supported, in which case this
637             method will extract the corresponding items at the back of the
638             queue.
639              
640             Like C vs. C, the return value is always a
641             scalar when no C<$count> argument is provided, and always a list
642             when it is.
643              
644              
645             =head2 insert
646              
647             $count = $queue->insert($index, @list)
648              
649             Provides random access to the queue, inserting the items specified
650             in C<@list> into the queue after index position C<$index>.
651             Negative C<$index> values are supported, which indicate that the
652             items should be inserted after that position relative to the
653             back of the queue.
654              
655             Returns the number of items that were inserted into the queue.
656             If the queue has a L<"limit"> set, and inserting all the items on
657             the list would cause the queue size to exceed the limit, this
658             method will either block until capacity to insert the whole list
659             becomes available, or it will insert items up to the queue size
660             limit and issue a warning about the uninserted items, depending
661             on the queue's L<"on_limit"|Forks::Queue/"new"> setting.
662              
663             This method is inefficient for some queue implementations.
664              
665              
666             =head2 pending
667              
668             $num_items_avail = $queue->pending
669              
670             Returns the total number of items available on the queue. There is no
671             guarantee that the number of available items will not change between a
672             call to C and a subsequent call to L<"get">
673              
674             =head2 clear
675              
676             $queue->clear
677              
678             Removes all items from the queue.
679              
680              
681             =head2 status
682              
683             $status = $queue->status
684              
685             Returns a hash reference with meta information about the queue.
686             The information should at least include the number of items remaining in
687             the queue. Other implementations may provide additional information
688             in this return value.
689              
690              
691             =head2 limit
692              
693             $max = $queue->limit
694             $queue->limit($new_limit)
695             $queue->limit($new_limit,$on_limit)
696             $queue->limit = $new_limit # may not work on some configurations
697              
698             Returns or updates the maximum size of the queue. With no args, returns
699             the existing maximum queue size, with a non-positive value indicating
700             that the queue does not have a maximum size.
701              
702             The return value also acts as an lvalue through which the maximum
703             queue size can be set, and allows the C method to be used
704             in the same way as
705             L<< the C method in Thread::Queue|Thread::Queue/"limit" >>.
706             I<< Note: lvalue feature may not work with Perl v<5.14. >>
707              
708             If arguments are provided, the first argument is used to set the
709             maximum queue size. A non-positive queue size can be specified to
710             indicate that the queue does not have a maximum size.
711             The second argument, if provided, updates the behavior of the queue
712             when an attempt is made to add items beyond the maximum size.
713             The acceptable values for the second argument are C, which causes
714             an insertion operation to block until there is capacity on the queue,
715             or C, which returns immediately from an insertion operation with
716             a warning about items that were not added to the queue.
717              
718              
719             =head1 VARIABLES
720              
721             =head2 %OPTS
722              
723             Global hash containing the set of default options for all
724             C constructors. Initially this hash contains the
725             key-value pairs
726              
727             impl "File"
728             style "fifo"
729             limit -1
730             on_limit "fail"
731              
732             but they may be changed at any time to affect all subsequently
733             constructed C objects. The global options can also
734             be set at import time with additional arguments for the C
735             statement.
736              
737             use Forks::Queue impl => 'SQLite'; # use SQLite queues by default
738             $Forks::Queue::OPTS{impl} = 'SQLite'; # equivalent run-time call
739            
740             use Forks::Queue
741             on_limit => 'block', limit => 10; # finite, blocking queues by default
742             $Forks::Queue::OPTS{limit} = 10;
743             $Forks::Queue::OPTS{on_limit} = 'block'; # equivalent run-time calls
744              
745             =head1 ENVIRONMENT
746              
747             Some environment variable settings that can affect this module:
748              
749             =over 4
750              
751             =item * FORKS_QUEUE_IMPL
752              
753             Specifies a default implementation to use, overriding the initial setting
754             of C<$Forks::Queue::OPTS{"impl"}>, in cases where the C
755             constructor is invoked without passing an C option.
756              
757             =item * FORKS_QUEUE_DEBUG
758              
759             If set to a true value, outputs information about the activity of
760             the queues to standard error.
761              
762             =item * FORKS_QUEUE_NOTIFY
763              
764             If set to a false value, disables use of signals on POSIX-y platforms
765             that may help improve queue performance
766              
767             =item * FORKS_QUEUE_DIR
768              
769             Specifies a directory to use for temporary queue files in the
770             L and
771             L implementations.
772             If this directory is not specified, the implementations will try to make
773             a reasonable choice based on your platform and other environment settings.
774              
775             =back
776              
777             =head1 DEPENDENCIES
778              
779             The C module and all its current implementations require
780             the L module.
781              
782             =head1 SEE ALSO
783              
784             L, L,
785             L, L,
786             L, L.
787              
788             =head1 SUPPORT
789              
790             You can find documentation for this module with the perldoc command.
791              
792             perldoc Forks::Queue
793              
794              
795             You can also look for information at:
796              
797             =over 4
798              
799             =item * RT: CPAN's request tracker (report bugs here)
800              
801             L
802              
803             =item * CPAN Ratings
804              
805             L
806              
807             =item * Search CPAN
808              
809             L
810              
811             =back
812              
813              
814             =head1 LICENSE AND COPYRIGHT
815              
816             Copyright (c) 2017-2019, Marty O'Brien.
817              
818             This library is free software; you can redistribute it and/or modify
819             it under the same terms as Perl itself, either Perl version 5.10.1 or,
820             at your option, any later version of Perl 5 you may have available.
821              
822             See http://dev.perl.org/licenses/ for more information.
823              
824             =cut
825              
826             # TODO:
827             #
828             # priorities
829             # Directory implementation (see Queue::Dir)
830             #