File Coverage

blib/lib/Forks/Queue.pm
Criterion Covered Total %
statement 100 153 65.3
branch 39 64 60.9
condition 19 19 100.0
subroutine 27 46 58.7
pod 23 26 88.4
total 208 308 67.5


line stmt bran cond sub pod time code
1             package Forks::Queue;
2 122     122   2903631 use strict;
  122         669  
  122         3478  
3 122     122   583 use warnings;
  122         249  
  122         3697  
4 122     122   709 use Scalar::Util 'looks_like_number';
  122         211  
  122         5916  
5 122     122   688 use Carp;
  122         256  
  122         6217  
6 122     122   24188 use Time::HiRes;
  122         61079  
  122         702  
7 122     122   11314 use Config;
  122         401  
  122         21983  
8              
9             our $VERSION = '0.14';
10             our $DEBUG = $ENV{FORKS_QUEUE_DEBUG} || 0;
11              
12             our $NOTIFY_OK = $ENV{FORKS_QUEUE_NOTIFY} // do {
13             $Config::Config{sig_name} =~ /\bIO\b/;
14             };
15              
16             our $SLEEP_INTERVAL = $NOTIFY_OK ? 2 : 1;
17             our $SLEEP_INTERVALX = 2;
18              
19             # default values to apply to all new Forks::Queue implementations
20             our %OPTS = (limit => -1, on_limit => 'fail', style => 'fifo',
21             impl => $ENV{FORKS_QUEUE_IMPL} || 'File' );
22              
23             sub new {
24 171     171 1 28700027 my $class = shift;
25 122     122   789 no warnings 'once';
  122         901  
  122         190106  
26 171         5440 my %opts = (%OPTS, @_);
27              
28 171 100       1736 if ($opts{impl}) {
29 170         1006 my $pkg = delete $opts{impl};
30 170 50       2272 if ($pkg =~ /[^\w:]/) {
31 0         0 croak "Forks::Queue cannot be instantiated. Invalid 'impl' $pkg";
32             }
33 170         24156 my $e1 = eval "require Forks::Queue::$pkg; 1";
34 170 50       1282 if ($e1) {
35 170         820 $pkg = "Forks::Queue::" . $pkg;
36 170         2456 return $pkg->new(%opts);
37             }
38 0         0 my $err1 = $@;
39 0 0       0 if (eval "require $pkg; 1") {
40 0         0 return $pkg->new(%opts);
41             } else {
42 0 0       0 warn $err1 if $err1;
43 0         0 croak "Forks::Queue cannot be instantiated. ",
44             "Did not recognize 'impl' option '$opts{impl}'";
45             }
46             }
47 1         183 croak "Forks::Queue cannot be instantiated. ",
48             "Use an implementation or pass the 'impl' option to the constructor";
49             }
50              
51             sub import {
52 246     246   3307 my ($class, @args) = @_;
53 246         6045 for (my $i=0; $i<@args; $i++) {
54 0         0 foreach my $key (qw(limit on_limit impl style)) {
55 0 0       0 if ($args[$i] eq $key) {
56 0         0 $OPTS{$args[$i]} = $args[$i+1];
57 0         0 $i++;
58 0         0 next;
59             }
60             }
61             }
62             }
63              
64             sub put {
65 524     524 1 36145754 my $self = shift;
66 524         5443 return $self->push(@_);
67             }
68              
69             sub enqueue {
70 0     0 1 0 my $self = shift;
71 0         0 return $self->push(@_);
72             }
73              
74             sub get {
75 161     161 1 30061675 my $self = shift;
76 161 100       1289 _validate_input($_[0], 'count', 1) if @_;
77 161 100       967 if ($self->{style} eq 'fifo') {
78 133 100       1644 return @_ ? $self->shift(@_) : $self->shift;
79             } else {
80 28 100       180 my @gotten = @_ ? reverse($self->pop(@_)) : $self->pop;
81 28 100       166 return @_ ? @gotten : $gotten[0];
82             }
83             }
84              
85 0     0 1 0 sub dequeue { goto &get; }
86              
87             sub dequeue_timed {
88 18     18 1 46 my $self = shift;
89 18         34 my $timeout = shift;
90 18         84 _validate_input($timeout, 'timeout', 0, 1);
91 18         70 local $self->{_expire} = Time::HiRes::time + $timeout;
92 18         43 local $SLEEP_INTERVAL = $Forks::Queue::SLEEP_INTERVAL;
93 18         98 $SLEEP_INTERVAL /= 2 while $SLEEP_INTERVAL > 0.25 * $timeout;
94 18 50       143 return @_ ? $self->dequeue(@_) : $self->dequeue;
95             }
96              
97             sub get_timed {
98 12     12 1 916 my $self = shift;
99 12         27 my $timeout = shift;
100 12         120 _validate_input($timeout, 'timeout', 0, 1);
101 12         97 local $self->{_expire} = Time::HiRes::time + $timeout;
102 12         89 local $SLEEP_INTERVAL = $Forks::Queue::SLEEP_INTERVAL;
103 12         107 $SLEEP_INTERVAL /= 2 while $SLEEP_INTERVAL > 0.25 * $timeout;
104 12 100       316 return @_ ? $self->get(@_) : $self->get;
105             }
106              
107             sub shift_timed {
108 0     0 1 0 my $self = shift;
109 0         0 my $timeout = shift;
110 0         0 _validate_input($timeout, 'timeout', 0, 1);
111 0         0 local $self->{_expire} = Time::HiRes::time + $timeout;
112 0         0 local $SLEEP_INTERVAL = $Forks::Queue::SLEEP_INTERVAL;
113 0         0 $SLEEP_INTERVAL /= 2 while $SLEEP_INTERVAL > 0.25 * $timeout;
114 0 0       0 return @_ ? $self->shift(@_) : $self->shift;
115             }
116              
117             sub pop_timed {
118 0     0 1 0 my $self = shift;
119 0         0 my $timeout = shift;
120 0         0 _validate_input($timeout, 'timeout', 0, 1);
121 0         0 local $self->{_expire} = Time::HiRes::time + $timeout;
122 0         0 local $SLEEP_INTERVAL = $Forks::Queue::SLEEP_INTERVAL;
123 0         0 $SLEEP_INTERVAL /= 2 while $SLEEP_INTERVAL > 0.25 * $timeout;
124 0 0       0 return @_ ? $self->pop(@_) : $self->pop;
125             }
126              
127             sub _expired {
128 36299     36299   84553 my ($self) = @_;
129 36299 100       241006 $self->{_expire} && Time::HiRes::time >= $self->{_expire};
130             }
131              
132             sub get_nb {
133 18     18 1 100 my $self = shift;
134 18 100       127 _validate_input($_[0], 'count', 1) if @_;
135 3 50       51 if ($self->{style} eq 'fifo') {
136 0 0       0 return @_ ? $self->shift_nb(@_) : $self->shift_nb;
137             } else {
138 3 50       129 return @_ ? reverse($self->pop_nb(@_)) : $self->pop_nb;
139             }
140             }
141 15     15 1 9512 sub dequeue_nb { goto &get_nb; }
142              
143             sub peek {
144 129     129 1 12353 my ($self,$index) = @_;
145 129 100       485 _validate_input($index, 'index') if @_ > 1;
146 120 100       310 if ($self->{style} eq 'lifo') {
147 48   100     222 return $self->peek_back($index || 0);
148             } else {
149 72   100     301 return $self->peek_front($index || 0);
150             }
151             }
152              
153             sub pending {
154 267     267 1 102228 my $self = shift;
155 267         1348 my $s = $self->status;
156 267 50       3155 return $s->{avail} ? $s->{avail} : $s->{end} ? undef : 0;
    100          
157             }
158              
159             sub _unimpl {
160 0     0   0 my $func = shift;
161 0         0 croak "Forks::Queue: $func not implemented in abstract base class";
162             }
163              
164             sub limit :lvalue {
165 0     0 1 0 my $self = shift;
166 0 0       0 if (@_) {
167 0         0 $self->{limit} = shift @_;
168 0 0       0 if (@_) {
169 0         0 $self->{on_limit} = shift @_;
170             }
171             }
172 0         0 $self->{limit};
173             }
174              
175             sub _validate_input {
176 430     430   1476 my ($value,$name,$ge,$float_ok) = @_;
177 430 100 100     11975 croak "Invalid '$name'"
      100        
      100        
      100        
      100        
178             if !defined($value) ||
179             !looks_like_number($value) ||
180             (!$float_ok && $value != int($value)) ||
181             (defined($ge) && $value < $ge);
182 355         725 return $value;
183             }
184              
185 0     0 1 0 sub push { _unimpl("push/put") }
186 0     0 0 0 sub peek_front { _unimpl("peek") }
187 0     0 0 0 sub peek_back { _unimpl("peek") }
188 0     0 1 0 sub shift :method { _unimpl("shift/get") }
189 0     0 1 0 sub unshift { _unimpl("unshift") }
190 0     0 1 0 sub pop { _unimpl("pop/get") }
191 0     0 1 0 sub shift_nb { _unimpl("shift/get") }
192 0     0 1 0 sub pop_nb { _unimpl("pop/get") }
193 0     0 1 0 sub status { _unimpl("pending/status") }
194 0     0 1 0 sub clear { _unimpl("clear") }
195 0     0 1 0 sub end { _unimpl("end") }
196 0     0 0 0 sub lock :method { _unimpl("lock") }
197              
198              
199              
200              
201              
202             sub Forks::Queue::Util::__is_nfs {
203 198     198   28949 my ($dir) = @_;
204 198 50       1191 if ($^O ne 'linux') {
205 0         0 return;
206             }
207 198         182175 my $pid = fork();
208 198 100       25857 if ($pid == 0) {
209 48         3859 close STDOUT;
210 48         1007 close STDERR;
211             # http://superuser.com/q/422061
212 48         0 exec("df",$dir,"-t","nfs");
213 0         0 die;
214             }
215 150         8134 local $?;
216 150         139130169 waitpid $pid,0;
217 150         13333 return $? == 0;
218             }
219              
220             sub Forks::Queue::Util::QID {
221 122     122   963 no warnings 'uninitialized';
  122         234  
  122         14701  
222 136 100   136   942 if ($Forks::Queue::Util::PID != $$) {
223 86         368 $Forks::Queue::Util::PID = $$;
224 86         312 $Forks::Queue::Util::QID = 0;
225             }
226 136         393 $Forks::Queue::Util::QID++;
227 136         1634 join("-", $Forks::Queue::Util::PID, $Forks::Queue::Util::QID);
228             }
229              
230              
231             # manage global destruction phase
232             BEGIN {
233 122 50   122   833 if (defined(${^GLOBAL_PHASE})) {
234 122 50   15214   10987 eval 'sub __inGD(){%{^GLOBAL_PHASE} eq q{DESTRUCT} && __END()};1'
  15214         76605  
235             } else {
236 0         0 require B;
237 0         0 eval 'sub __inGD(){${B::main_cv()}==0 && __END()};1'
238             }
239             }
240 74     74   1078819 END { &__END }
241             sub __END {
242 122     122   918 no warnings 'redefine';
  122         287  
  122         23919  
243 74     0   2172 *DB::DB = sub {};
        74      
244 74         7168 *__inGD = sub () { 1 };
245             }
246              
247              
248              
249             1;
250              
251             =head1 NAME
252              
253             Forks::Queue - queue that can be shared safely across processes
254              
255             =head1 VERSION
256              
257             0.14
258              
259             =head1 SYNOPSIS
260              
261             use Forks::Queue;
262             $q = Forks::Queue->new( impl => ..., style => 'lifo' );
263              
264             # put items on queue
265             $q->put("a scalar item");
266             $q->put(["an","arrayref","item"]);
267             $q->put({"a"=>"hash","reference"=>"item"});
268             $q->put("list","of","multiple",["items"]);
269             $q->end; # no more jobs will be added to queue
270              
271             # retrieve items from queue, possibly after a fork
272             $item = $q->get;
273             $item = $q->peek; # get item without removing it
274             @up_to_10_items = $q->get(10);
275             $remaining_items = $q->pending;
276              
277             =head1 DESCRIPTION
278              
279             Interface for a queue object that can be shared across processes
280             and threads.
281             Available implementations are L<Forks::Queue::File>,
282             L<Forks::Queue::Shmem>,
283             L<Forks::Queue::SQLite>.
284              
285             =head1 METHODS
286              
287             Many of these methods pass or return "items". For this distribution,
288             an "item" is any scalar or reference that can be serialized and
289             shared across processes.
290              
291             This will include scalars and most unblessed references
292              
293             "42"
294             [1,2,3,"forty-two"]
295             { name=>"a job", timestamp=>time, input=>{foo=>[19],bar=>\%bardata} }
296              
297             but will generally preclude data with blessed references and code references
298              
299             { name => "bad job", callback => \&my_callback_routine }
300             [ 46, $url13, File::Temp->new ]
301              
302             Many of the methods of C<Forks::Queue> have analogues in the
303             L<Thread::Queue> class, and many scripts using L<Thread::Queue>
304             can be easily transformed to use C<Forks::Queue>.
305              
306             =head2 new
307              
308             $queue = Forks::Queue->new( %opts )
309              
310             Instantiates a new queue object with the given configuration.
311              
312             If one of the options is C<impl>, the constructor from that
313             C<Forks::Queue> subclass will be invoked.
314              
315             Other options that should be supported on all implementations include
316              
317             =over 4
318              
319             =item * style
320              
321             =item * C<< style => 'fifo' | 'lifo' >>
322              
323             Indicates whether the L<"get"> method will return items in
324             first-in-first-out order or last-in-first-out order (and which
325             end of the queue the L<"peek"> method will examine)
326              
327             =item * limit
328              
329             =item * C<< limit => int >>
330              
331             A maximum size for the queue. Set to a non-positive value to
332             specify an unlimited size queue.
333              
334             =item * on_limit
335              
336             =item * C<< on_limit => 'block' | 'fail' | 'tq-compat' >>
337              
338             Dictates what the queue should do when an attempt is made to
339             add items beyond the queue's limit. If C<block>, the queue
340             will block and wait until items are removed from the queue.
341             If C<fail>, the queue will warn and return immediately without
342             changing the queue.
343              
344             The setting C<tq-compat> is similar to C<block>, but has the
345             additional effect where the L<"insert"> method operates without
346             regard to the queue limit. This behavior is compatible with
347             the way queue limits and the insert method work in the
348             L<Thread::Queue> package.
349              
350             See the L<"put">, L<"enqueue">, L<"push">, L<"unshift">,
351             and L<"insert"> methods, which increase the length
352             of the queue and may be affected by this setting.
353              
354             =item * join
355              
356             =item * C<< join => bool >>
357              
358             If true, expects that the queue referred to by this constructor
359             has already been created in another process, and that the current
360             process should access the existing queue. This allows a queue to
361             be shared across unrelated processes (i.e., processes that do not
362             have a parent-child relationship).
363              
364             # my_daemon.pl - may run "all the time" in the background
365             $q = Forks::Queue::File->new(file=>'/var/spool/foo/q17');
366             # creates new queue object
367             ...
368              
369             # worker.pl - may run periodically for a short time, launched from
370             # cron or from command line, but not from the daemon
371             $q = Forks::Queue->new( impl => 'File', join => 1,
372             file => '/var/spool/foo/q17',
373             # the new queue attaches to existing file at /var/spool/foo/q17
374             ...
375              
376             C<join> is not necessary for child processes forked from a process with
377             an existing queue
378              
379             $q = Forks::Queue->new(...)
380             ...
381             if (fork() == 0) {
382             # $q already exists and the child process can begin using it,
383             # no need for a Forks::Queue constructor with join
384             ...
385             }
386              
387             =item * persist
388              
389             =item * C<< persist => bool >>
390              
391             Active C<Forks::Queue> objects affect your system, writing to disk or
392             writing to memory, and in general they clean themselves up when they
393             detect that no more processes are using the queue. The C<persist> option,
394             if set to true, instructs the queue object to leave its state intact
395             after destruction.
396              
397             An obvious use case for this option is debugging, to examine the
398             state of the queue after abnormal termination of your program.
399              
400             A second use case is to create persistent queues -- queues that are
401             shared not only among different processes, but among different
402             processes that are running at different times. The persistent queue
403             can be used by supplying both the C<persist> and the C<join> options
404             to the C<Forks::Queue> constructor.
405              
406             $queue_file = "/tmp/persistent.job.queue";
407             $join = -f $queue_file;
408             $q = Forks::Queue->new( impl => 'File', file => $queue_file,
409             join => $join, persist => 1 );
410             ... work with the queue ...
411             # the queue remains intact if this program exits or aborts
412              
413             =item * C<< list => ARRAYREF >>
414              
415             Initializes the contents of the queue with the argument to the
416             C<list> option. The argument must be an array reference.
417              
418             If the C<join> option is specified, the contents of the list
419             could be added to an already existing queue.
420              
421             =back
422              
423             See the global L<"%OPTS"> variable for information about
424             default values for many of these settings.
425              
426             =head2 put
427              
428             =head2 enqueue
429              
430             $count = $queue->put(@items);
431             $count = $queue->enqueue(@items)
432              
433             Place one or more "items" on the queue, and returns the number of
434             items successfully added to the queue.
435              
436             Adding items to the queue will fail if the L<"end"> method of
437             the queue had previously been called from any process.
438              
439             See the L<"limit"> method to see how the C<put> method behaves
440             when adding items would cause the queue to exceed its maximum size.
441              
442             The C<enqueue> method name is provided for compatibility with
443             L<Thread::Queue>.
444              
445              
446              
447             =head2 push
448              
449             $count = $queue->push(@items)
450              
451             Equivalent to L<"put">, adding items to the end of the queue and
452             returning the number of items successfully added. The most recent
453             items appended to the queue by C<put> or C<push> will be the first
454             items taken from the queue by L<"pop"> or by L<"get"> with LIFO
455             style queues, and the last items removed by L<"shift"> or L<"get">
456             with FIFO style queues.
457              
458             If the items added to the queue would cause the queue to exceed
459             its queue size limit (as determined by the L<"limit"> attribute),
460             this method will either block until queue capacity is available,
461             or issue a warning about the uninserted items and return the
462             number of items added, depending on the queue's setting for
463             L<"on_limit"|Forks::Queue/"new">.
464              
465              
466             =head2 unshift
467              
468             $count = $queue->unshift(@items)
469              
470             Equivalent to C<insert(0,@items)>, adding items to the front
471             of the queue, and returning the number of items successfully
472             added. In FIFO queues, items added to the queue with C<unshift>
473             will be the last items taken from the queue by L<"get">,
474             and in LIFO queues, they will be the first items taken from the
475             queue by L<"get">.
476              
477             This method is inefficient for some queue implementations.
478              
479             =head2 end
480              
481             $queue->end
482              
483             Indicates that no more items are to be put on the queue,
484             so that when a process tries to retrieve an item from an empty queue,
485             it will not block and wait for new items to be added. Causes any
486             processes blocking on a L<"get">/L<"dequeue">/L<"shift">/L<"pop">
487             call to become unblocked and return C<undef>.
488             This method may be called from any process that has access to the queue.
489              
490             Calling C<end> on a queue more than once will generate a warning
491             message, even if the caller is not the same process/thread that
492             made the original C<end> call.
493              
494              
495             =head2 get
496              
497             =head2 dequeue
498              
499             $item = $queue->get;
500             $item = $queue->dequeue;
501              
502             @items = $queue->get($count);
503             @items = $queue->dequeue($count);
504              
505             Attempt to retrieve one or more "items" on the queue. If the
506             queue is empty, and if L<"end"> has not been called on the queue,
507             this call blocks until an item is available or until the L<"end">
508             method has been called from some other process. If the queue is
509             empty and L<"end"> has been called, this method returns an
510             empty list in list context or C<undef> in scalar context.
511              
512             If a C<$count> argument is supplied, returns up to C<$count> items
513             or however many items are currently available on the queue, whichever
514             is fewer. But the call still blocks if L<"end"> has not been called
515             until there is at least one item available. See L<"get_nb"> for a
516             non-blocking version of this method. The return value of this
517             function when a C<$count> argument is supplied is always a list,
518             so if you evaluate it in scalar context you will get the number of items
519             retrieved from the queue, not the items themselves.
520              
521             $job = $q->get; # $job is an item from the queue
522             $job = $q->get(1); # returns # of items retrieved, not an actual item!
523             ($job) = $q->get(1); # $job is an item from the queue
524              
525             The only important difference between C<get> and C<dequeue> is what
526             happens when there is a C<$count> argument, and the queue currently has
527             more than zero but fewer than C<$count> items available. In this case,
528             the C<get> call will return all of the available items. The C<dequeue>
529             method will block until at least C<$count> items are available on the
530             queue, or until the L<"end"> method has been called on the queue.
531             This C<dequeue> behavior is consistent with the behavior of the
532             L<"dequeue" method in Thread::Queue|Thread::Queue/"dequeue">.
533              
534              
535             =head2 pop
536              
537             $item = $queue->pop
538             @items = $queue->pop($count)
539              
540             Retrieves one or more items from the "back" of the queue.
541             For LIFO style queues, the L<"get"> method is equivalent to this method.
542             Like C<"get">, this method blocks while the queue is empty and the
543             L<"end"> method has not been called on the queue.
544              
545             If a C<$count> argument is supplied, returns up to C<$count> items or however
546             many items are currently available on the queue, whichever is fewer.
547             (Like the L<"get"> call, this method blocks when waiting for input. See
548             L<"pop_nb"> for a non-blocking version of the method. Also like
549             L<"get">, you should be wary of using this method in scalar context
550             if you provide a C<$count> argument).
551              
552             =head2 shift
553              
554             $item = $queue->shift
555             @items = $queue->shift($count)
556              
557             Retrieves one or more items from the "front" of the queue.
558             For FIFO style queues, the L<"get"> method is equivalent to this method.
559             Like C<"get">, this method blocks while the queue is empty and the
560             L<"end"> method has not been called on the queue.
561              
562             If a C<$count> argument is supplied, returns up to C<$count> items or however
563             many items are currently available on the queue, whichever is fewer. (Like the
564             L<"get"> call, this method blocks when waiting for input. See
565             L<"shift_nb"> for a non-blocking version of the method. Also like
566             L<"get">, you should be wary of using this method in scalar context
567             if you provide a C<$count> argument).
568              
569              
570             =head2 get_nb
571              
572             =head2 dequeue_nb
573              
574             =head2 pop_nb
575              
576             =head2 shift_nb
577              
578             $item = $queue->XXX_nb
579             @items = $queue->XXX_nb($count)
580              
581             Non-blocking versions of the L<"get">, L<"dequeue">, L<"pop">,
582             and L<"shift"> methods. These functions return immediately if
583             there are no items in the queue to retrieve, returning C<undef>
584             in the case with no arguments and an empty list when a
585             C<$count> argument is supplied.
586              
587             =head2 get_timed
588              
589             =head2 dequeue_timed
590              
591             =head2 shift_timed
592              
593             =head2 pop_timed
594              
595             $item = $queue->XXX_timed($timeout)
596             @item = $queue->XXX_timed($timeout,$count)
597              
598             Timed versions of L<"get">, L<"dequeue">, L<"shift">, and L<"pop">
599             that take a C<$timeout> argument and will stop blocking after
600             C<$timeout> seconds have elapsed.
601              
602             If a C<$count> argument is supplied to C<dequeue_timed>, the function
603             will wait up to C<$timeout> seconds for at least C<$count> items to
604             be available on the queue. After C<$timeout> seconds have passed,
605             the function will return up to C<$count> available items.
606              
607             For other timed methods, supplying a C<$count> argument for a
608             queue with more than zero but fewer than C<$count> items available
609             will return all available items without blocking.
610              
611              
612             =head2 peek
613              
614             $item = $queue->peek
615             $item = $queue->peek($index)
616             $item = $queue->peek_front
617             $item = $queue->peek_back
618              
619             Returns an item from the queue without removing it. The C<peek_front>
620             and C<peek_back> methods inspect the item at the front and the back of
621             the queue, respectively. The generic C<peek> method is equivalent to
622             C<peek_front> for FIFO style queues and C<peek_back> for LIFO style
623             queues. If an index is specified, returns the item at that position
624             in the queue (where position 0 is the head of the queue). Negative
625             indices are supported, so a call to C<< $queue->peek(-2) >>,
626             for example, would return the second to last item in the queue.
627              
628             If the queue is empty or if the specified index is larger than the
629             number of elements currently in the queue, these methods will
630             return C<undef> without blocking.
631              
632             Note that unlike the
633             L<< "peek" method in C<Thread::Queue>|Thread::Queue/"peek" >>,
634             C<Forks::Queue::peek> returns a copy of the item on the queue,
635             so manipulating a reference returned from C<peek> will B<not>
636             affect the item on the queue.
637              
638             =head2 extract
639              
640             $item = $queue->extract
641             $item = $queue->extract($index)
642             @items = $queue->extract($index,$count)
643              
644             Removes and returns the specified number of items from the queue
645             at the specified index position, to provide random access to the
646             queue. The method is non-blocking and may return fewer than the
647             number of items requested (or zero items) if there are not enough
648             items in the queue to satisfy the request.
649              
650             If the C<$count> argument is not provided, the method will return
651             (if available) a single item. If the C<$index> argument is also
652             not provided, it will return the first item on the queue exactly
653             like the L<"get_nb"> method with no arguments.
654              
655             Negative C<$index> values are supported, in which case this
656             method will extract the corresponding items at the back of the
657             queue.
658              
659             Like C<get()> vs. C<get($count)>, the return value is always a
660             scalar when no C<$count> argument is provided, and always a list
661             when it is.
662              
663              
664             =head2 insert
665              
666             $count = $queue->insert($index, @list)
667              
668             Provides random access to the queue, inserting the items specified
669             in C<@list> into the queue after index position C<$index>.
670             Negative C<$index> values are supported, which indicate that the
671             items should be inserted after that position relative to the
672             back of the queue. Returns the number of items that were
673             inserted into the queue.
674              
675             If the queue has a L<"limit"> set, and inserting all the items on
676             the list would cause the queue size to exceed the limit, the setting
677             of the queue's C<"on_limit">
678             parameter will govern how this method will behave.
679             See the L<"on_limit"|Forks::Queue/"new"> setting for details.
680              
681             This method is inefficient for some queue implementations.
682              
683              
684             =head2 pending
685              
686             $num_items_avail = $queue->pending
687              
688             Returns the total number of items available on the queue. There is no
689             guarantee that the number of available items will not change between a
690             call to C<pending> and a subsequent call to L<"get">.
691              
692             =head2 clear
693              
694             $queue->clear
695              
696             Removes all items from the queue.
697              
698              
699             =head2 status
700              
701             $status = $queue->status
702              
703             Returns a hash reference with meta information about the queue.
704             The information should at least include the number of items remaining in
705             the queue. Other implementations may provide additional information
706             in this return value.
707              
708              
709             =head2 limit
710              
711             $max = $queue->limit
712             $queue->limit($new_limit)
713             $queue->limit($new_limit,$on_limit)
714             $queue->limit = $new_limit # limit as lvalue requires Perl >=v5.14
715              
716             Returns or updates the maximum size of the queue. With no args, returns
717             the existing maximum queue size, with a non-positive value indicating
718             that the queue does not have a maximum size.
719              
720             The return value also acts as an lvalue through which the maximum
721             queue size can be set, and allows the C<limit> method to be used
722             in the same way as
723             L<< the C<limit> method in Thread::Queue|Thread::Queue/"limit" >>.
724             I<< Note: lvalue feature requires Perl v5.14 or better. >>
725              
726             If arguments are provided, the first argument is used to set the
727             maximum queue size. A non-positive queue size can be specified to
728             indicate that the queue does not have a maximum size.
729            
730             The second argument, if provided, updates the behavior of the queue
731             when an attempt is made to add items beyond the maximum size.
732             See L<"on_limit"|Forks::Queue/"new"> for the recognized values
733             of this argument and how they affect the behavior of the
734             L<"put">/L<"push">/L<"enqueue">, L<"unshift">,
735             L<"insert">, and L<"dequeue"> methods.
736              
737              
738             =head1 VARIABLES
739              
740             =head2 %OPTS
741              
742             Global hash containing the set of default options for all
743             C<Forks::Queue> constructors. Initially this hash contains the
744             key-value pairs
745              
746             impl "File"
747             style "fifo"
748             limit -1
749             on_limit "fail"
750              
751             but they may be changed at any time to affect all subsequently
752             constructed C<Forks::Queue> objects. The global options can also
753             be set at import time with additional arguments for the C<use>
754             statement.
755              
756             use Forks::Queue impl => 'SQLite'; # use SQLite queues by default
757             $Forks::Queue::OPTS{impl} = 'SQLite'; # equivalent run-time call
758            
759             use Forks::Queue
760             on_limit => 'block', limit => 10; # finite, blocking queues by default
761             $Forks::Queue::OPTS{limit} = 10;
762             $Forks::Queue::OPTS{on_limit} = 'block'; # equivalent run-time calls
763              
764             =head1 ENVIRONMENT
765              
766             Some environment variable settings that can affect this module:
767              
768             =over 4
769              
770             =item * FORKS_QUEUE_IMPL
771              
772             Specifies a default implementation to use, overriding the initial setting
773             of C<$Forks::Queue::OPTS{"impl"}>, in cases where the C<Forks::Queue>
774             constructor is invoked without passing an C<impl> option.
775              
776             =item * FORKS_QUEUE_DEBUG
777              
778             If set to a true value, outputs information about the activity of
779             the queues to standard error.
780              
781             =item * FORKS_QUEUE_NOTIFY
782              
783             If set to a false value, disables use of signals on POSIX-y platforms
784             that may help improve queue performance.
785              
786             =item * FORKS_QUEUE_DIR
787              
788             Specifies a directory to use for temporary queue files in the
789             L<File|Forks::Queue::File> and
790             L<SQLite|Forks::Queue::SQLite> implementations.
791             If this directory is not specified, the implementations will try to make
792             a reasonable choice based on your platform and other environment settings.
793              
794             =back
795              
796             =head1 DEPENDENCIES
797              
798             The C<Forks::Queue> module and all its current implementations require
799             the L<JSON> module.
800              
801             =head1 SEE ALSO
802              
803             L<Thread::Queue>, L<File::Queue>,
804             L<Queue::Q>, L<MCE::Queue>,
805             L<Queue::DBI>, L<Directory::Queue>.
806              
807             =head1 SUPPORT
808              
809             You can find documentation for this module with the perldoc command.
810              
811             perldoc Forks::Queue
812              
813              
814             You can also look for information at:
815              
816             =over 4
817              
818             =item * RT: CPAN's request tracker (report bugs here)
819              
820             L<http://rt.cpan.org/NoAuth/Bugs.html?Dist=Forks-Queue>
821              
822             =item * CPAN Ratings
823              
824             L<http://cpanratings.perl.org/d/Forks-Queue>
825              
826             =item * Search CPAN
827              
828             L<http://search.cpan.org/dist/Forks-Queue/>
829              
830             =back
831              
832              
833             =head1 LICENSE AND COPYRIGHT
834              
835             Copyright (c) 2017-2019, Marty O'Brien.
836              
837             This library is free software; you can redistribute it and/or modify
838             it under the same terms as Perl itself, either Perl version 5.10.1 or,
839             at your option, any later version of Perl 5 you may have available.
840              
841             See http://dev.perl.org/licenses/ for more information.
842              
843             =cut
844              
845             # TODO:
846             #
847             # priorities
848             # Directory implementation (see Queue::Dir)
849             # Distinguish enqueue and put. enqueue should behave like
850             # Thread::Queue and only check the limit once