File Coverage

blib/lib/Thread/Queue.pm
Criterion Covered Total %
statement 78 146 53.4
branch 22 64 34.3
condition 18 36 50.0
subroutine 15 19 78.9
pod 11 11 100.0
total 144 276 52.1


line stmt bran cond sub pod time code
1             package Thread::Queue;
2              
3 3     3   13865 use strict;
  3         3  
  3         70  
4 3     3   8 use warnings;
  3         4  
  3         127  
5              
6             our $VERSION = '3.11';
7             $VERSION = eval $VERSION;
8              
9 3     3   1351 use threads::shared 1.21;
  3         2550  
  3         11  
10 3     3   160 use Scalar::Util 1.10 qw(looks_like_number blessed reftype refaddr);
  3         39  
  3         208  
11              
12             # Carp errors from threads::shared calls should complain about caller
13             our @CARP_NOT = ("threads::shared");
14              
15             # Create a new queue possibly pre-populated with items
16             sub new
17             {
18 2     2 1 745 my $class = shift;
19 3     3   1473 my @queue :shared = map { shared_clone($_) } @_;
  3         2637  
  3         3153  
  2         4  
  12         74  
20 2         73 my %self :shared = ( 'queue' => \@queue );
21 2         32 return bless(\%self, $class);
22             }
23              
24             # Add items to the tail of a queue
25             sub enqueue
26             {
27 2     2 1 3 my $self = shift;
28 2         3 lock(%$self);
29              
30 2 50       6 if ($$self{'ENDED'}) {
31 0         0 require Carp;
32 0         0 Carp::croak("'enqueue' method called on queue that has been 'end'ed");
33             }
34              
35             # Block if queue size exceeds any specified limit
36 2         3 my $queue = $$self{'queue'};
37 2   33     6 cond_wait(%$self) while ($$self{'LIMIT'} && (@$queue >= $$self{'LIMIT'}));
38              
39             # Add items to queue, and then signal other threads
40 2 50       4 push(@$queue, map { shared_clone($_) } @_)
  5         76  
41             and cond_signal(%$self);
42             }
43              
44             # Set or return the max. size for a queue
45             sub limit : lvalue
46             {
47 0     0 1 0 my $self = shift;
48 0         0 lock(%$self);
49 0         0 $$self{'LIMIT'};
50             }
51              
52             # Return a count of the number of items on a queue
53             sub pending
54             {
55 6     6 1 902 my $self = shift;
56 6         6 lock(%$self);
57 6 50 33     15 return if ($$self{'ENDED'} && ! @{$$self{'queue'}});
  0         0  
58 6         6 return scalar(@{$$self{'queue'}});
  6         17  
59             }
60              
61             # Indicate that no more data will enter the queue
62             sub end
63             {
64 0     0 1 0 my $self = shift;
65 0         0 lock(%$self);
66             # No more data is coming
67 0         0 $$self{'ENDED'} = 1;
68             # Try to release at least one blocked thread
69 0         0 cond_signal(%$self);
70             }
71              
72             # Return 1 or more items from the head of a queue, blocking if needed
73             sub dequeue
74             {
75 12     12 1 11018 my $self = shift;
76 12         16 lock(%$self);
77 12         14 my $queue = $$self{'queue'};
78              
79 12 100       46 my $count = @_ ? $self->_validate_count(shift) : 1;
80              
81             # Wait for requisite number of items
82 7   33     16 cond_wait(%$self) while ((@$queue < $count) && ! $$self{'ENDED'});
83 7 50 33     169 cond_signal(%$self) if ((@$queue >= $count) || $$self{'ENDED'});
84              
85             # If no longer blocking, try getting whatever is left on the queue
86 7 50       31 return $self->dequeue_nb($count) if ($$self{'ENDED'});
87              
88             # Return single item
89 7 50       19 return shift(@$queue) if ($count == 1);
90              
91             # Return multiple items
92 0         0 my @items;
93 0         0 push(@items, shift(@$queue)) for (1..$count);
94 0         0 return @items;
95             }
96              
97             # Return items from the head of a queue with no blocking
98             sub dequeue_nb
99             {
100 6     6 1 2208 my $self = shift;
101 6         8 lock(%$self);
102 6         6 my $queue = $$self{'queue'};
103              
104 6 100       30 my $count = @_ ? $self->_validate_count(shift) : 1;
105              
106             # Return single item
107 1 50       4 return shift(@$queue) if ($count == 1);
108              
109             # Return multiple items
110 0         0 my @items;
111 0         0 for (1..$count) {
112 0 0       0 last if (! @$queue);
113 0         0 push(@items, shift(@$queue));
114             }
115 0         0 return @items;
116             }
117              
118             # Return items from the head of a queue, blocking if needed up to a timeout
119             sub dequeue_timed
120             {
121 0     0 1 0 my $self = shift;
122 0         0 lock(%$self);
123 0         0 my $queue = $$self{'queue'};
124              
125             # Timeout may be relative or absolute
126 0 0       0 my $timeout = @_ ? $self->_validate_timeout(shift) : -1;
127             # Convert to an absolute time for use with cond_timedwait()
128 0 0       0 if ($timeout < 32000000) { # More than one year
129 0         0 $timeout += time();
130             }
131              
132 0 0       0 my $count = @_ ? $self->_validate_count(shift) : 1;
133              
134             # Wait for requisite number of items, or until timeout
135 0   0     0 while ((@$queue < $count) && ! $$self{'ENDED'}) {
136 0 0       0 last if (! cond_timedwait(%$self, $timeout));
137             }
138 0 0 0     0 cond_signal(%$self) if ((@$queue >= $count) || $$self{'ENDED'});
139              
140             # Get whatever we need off the queue if available
141 0         0 return $self->dequeue_nb($count);
142             }
143              
144             # Return an item without removing it from a queue
145             sub peek
146             {
147 4     4 1 1238 my $self = shift;
148 4         5 lock(%$self);
149 4 50       16 my $index = @_ ? $self->_validate_index(shift) : 0;
150 1         1 return $$self{'queue'}[$index];
151             }
152              
153             # Insert items anywhere into a queue
154             sub insert
155             {
156 4     4 1 2130 my $self = shift;
157 4         6 lock(%$self);
158              
159 4 50       10 if ($$self{'ENDED'}) {
160 0         0 require Carp;
161 0         0 Carp::croak("'insert' method called on queue that has been 'end'ed");
162             }
163              
164 4         4 my $queue = $$self{'queue'};
165              
166 4         20 my $index = $self->_validate_index(shift);
167              
168 0 0       0 return if (! @_); # Nothing to insert
169              
170             # Support negative indices
171 0 0       0 if ($index < 0) {
172 0         0 $index += @$queue;
173 0 0       0 if ($index < 0) {
174 0         0 $index = 0;
175             }
176             }
177              
178             # Dequeue items from $index onward
179 0         0 my @tmp;
180 0         0 while (@$queue > $index) {
181 0         0 unshift(@tmp, pop(@$queue))
182             }
183              
184             # Add new items to the queue
185 0         0 push(@$queue, map { shared_clone($_) } @_);
  0         0  
186              
187             # Add previous items back onto the queue
188 0         0 push(@$queue, @tmp);
189              
190             # Soup's up
191 0         0 cond_signal(%$self);
192             }
193              
194             # Remove items from anywhere in a queue
195             sub extract
196             {
197 8     8 1 3642 my $self = shift;
198 8         10 lock(%$self);
199 8         9 my $queue = $$self{'queue'};
200              
201 8 50       19 my $index = @_ ? $self->_validate_index(shift) : 0;
202 5 50       12 my $count = @_ ? $self->_validate_count(shift) : 1;
203              
204             # Support negative indices
205 0 0       0 if ($index < 0) {
206 0         0 $index += @$queue;
207 0 0       0 if ($index < 0) {
208 0         0 $count += $index;
209 0 0       0 return if ($count <= 0); # Beyond the head of the queue
210 0         0 return $self->dequeue_nb($count); # Extract from the head
211             }
212             }
213              
214             # Dequeue items from $index+$count onward
215 0         0 my @tmp;
216 0         0 while (@$queue > ($index+$count)) {
217 0         0 unshift(@tmp, pop(@$queue))
218             }
219              
220             # Extract desired items
221 0         0 my @items;
222 0         0 unshift(@items, pop(@$queue)) while (@$queue > $index);
223              
224             # Add back any removed items
225 0         0 push(@$queue, @tmp);
226              
227             # Return single item
228 0 0       0 return $items[0] if ($count == 1);
229              
230             # Return multiple items
231 0         0 return @items;
232             }
233              
234             ### Internal Methods ###
235              
236             # Check value of the requested index
237             sub _validate_index
238             {
239 16     16   14 my $self = shift;
240 16         11 my $index = shift;
241              
242 16 100 100     106 if (! defined($index) ||
      100        
243             ! looks_like_number($index) ||
244             (int($index) != $index))
245             {
246 10         44 require Carp;
247 10         53 my ($method) = (caller(1))[3];
248 10         14 my $class_name = ref($self);
249 10         45 $method =~ s/$class_name\:://;
250 10 100       20 $index = 'undef' if (! defined($index));
251 10         907 Carp::croak("Invalid 'index' argument ($index) to '$method' method");
252             }
253              
254 6         8 return $index;
255             };
256              
257             # Check value of the requested count
258             sub _validate_count
259             {
260 15     15   16 my $self = shift;
261 15         11 my $count = shift;
262              
263 15 50 100     104 if (! defined($count) ||
      100        
      66        
264             ! looks_like_number($count) ||
265             (int($count) != $count) ||
266             ($count < 1))
267             {
268 15         65 require Carp;
269 15         70 my ($method) = (caller(1))[3];
270 15         22 my $class_name = ref($self);
271 15         59 $method =~ s/$class_name\:://;
272 15 100       28 $count = 'undef' if (! defined($count));
273 15         1266 Carp::croak("Invalid 'count' argument ($count) to '$method' method");
274             }
275              
276 0           return $count;
277             };
278              
279             # Check value of the requested timeout
280             sub _validate_timeout
281             {
282 0     0     my $self = shift;
283 0           my $timeout = shift;
284              
285 0 0 0       if (! defined($timeout) ||
286             ! looks_like_number($timeout))
287             {
288 0           require Carp;
289 0           my ($method) = (caller(1))[3];
290 0           my $class_name = ref($self);
291 0           $method =~ s/$class_name\:://;
292 0 0         $timeout = 'undef' if (! defined($timeout));
293 0           Carp::croak("Invalid 'timeout' argument ($timeout) to '$method' method");
294             }
295              
296 0           return $timeout;
297             };
298              
299             1;
300              
301             =head1 NAME
302              
303             Thread::Queue - Thread-safe queues
304              
305             =head1 VERSION
306              
307             This document describes Thread::Queue version 3.11
308              
309             =head1 SYNOPSIS
310              
311             use strict;
312             use warnings;
313              
314             use threads;
315             use Thread::Queue;
316              
317             my $q = Thread::Queue->new(); # A new empty queue
318              
319             # Worker thread
320             my $thr = threads->create(
321             sub {
322             # Thread will loop until no more work
323             while (defined(my $item = $q->dequeue())) {
324             # Do work on $item
325             ...
326             }
327             }
328             );
329              
330             # Send work to the thread
331             $q->enqueue($item1, ...);
332             # Signal that there is no more work to be sent
333             $q->end();
334             # Join up with the thread when it finishes
335             $thr->join();
336              
337             ...
338              
339             # Count of items in the queue
340             my $left = $q->pending();
341              
342             # Non-blocking dequeue
343             if (defined(my $item = $q->dequeue_nb())) {
344             # Work on $item
345             }
346              
347             # Blocking dequeue with 5-second timeout
348             if (defined(my $item = $q->dequeue_timed(5))) {
349             # Work on $item
350             }
351              
352             # Set a size for a queue
353             $q->limit = 5;
354              
355             # Get the second item in the queue without dequeuing anything
356             my $item = $q->peek(1);
357              
358             # Insert two items into the queue just behind the head
359             $q->insert(1, $item1, $item2);
360              
361             # Extract the last two items on the queue
362             my ($item1, $item2) = $q->extract(-2, 2);
363              
364             =head1 DESCRIPTION
365              
366             This module provides thread-safe FIFO queues that can be accessed safely by
367             any number of threads.
368              
369             Any data types supported by L can be passed via queues:
370              
371             =over
372              
373             =item Ordinary scalars
374              
375             =item Array refs
376              
377             =item Hash refs
378              
379             =item Scalar refs
380              
381             =item Objects based on the above
382              
383             =back
384              
385             Ordinary scalars are added to queues as they are.
386              
387             If not already thread-shared, the other complex data types will be cloned
388             (recursively, if needed, and including any Cings and read-only
389             settings) into thread-shared structures before being placed onto a queue.
390              
391             For example, the following would cause L to create a empty,
392             shared array reference via C<&shared([])>, copy the elements 'foo', 'bar'
393             and 'baz' from C<@ary> into it, and then place that shared reference onto
394             the queue:
395              
396             my @ary = qw/foo bar baz/;
397             $q->enqueue(\@ary);
398              
399             However, for the following, the items are already shared, so their references
400             are added directly to the queue, and no cloning takes place:
401              
402             my @ary :shared = qw/foo bar baz/;
403             $q->enqueue(\@ary);
404              
405             my $obj = &shared({});
406             $$obj{'foo'} = 'bar';
407             $$obj{'qux'} = 99;
408             bless($obj, 'My::Class');
409             $q->enqueue($obj);
410              
411             See L for caveats related to passing objects via queues.
412              
413             =head1 QUEUE CREATION
414              
415             =over
416              
417             =item ->new()
418              
419             Creates a new empty queue.
420              
421             =item ->new(LIST)
422              
423             Creates a new queue pre-populated with the provided list of items.
424              
425             =back
426              
427             =head1 BASIC METHODS
428              
429             The following methods deal with queues on a FIFO basis.
430              
431             =over
432              
433             =item ->enqueue(LIST)
434              
435             Adds a list of items onto the end of the queue.
436              
437             =item ->dequeue()
438              
439             =item ->dequeue(COUNT)
440              
441             Removes the requested number of items (default is 1) from the head of the
442             queue, and returns them. If the queue contains fewer than the requested
443             number of items, then the thread will be blocked until the requisite number
444             of items are available (i.e., until other threads C more items).
445              
446             =item ->dequeue_nb()
447              
448             =item ->dequeue_nb(COUNT)
449              
450             Removes the requested number of items (default is 1) from the head of the
451             queue, and returns them. If the queue contains fewer than the requested
452             number of items, then it immediately (i.e., non-blocking) returns whatever
453             items there are on the queue. If the queue is empty, then C is
454             returned.
455              
456             =item ->dequeue_timed(TIMEOUT)
457              
458             =item ->dequeue_timed(TIMEOUT, COUNT)
459              
460             Removes the requested number of items (default is 1) from the head of the
461             queue, and returns them. If the queue contains fewer than the requested
462             number of items, then the thread will be blocked until the requisite number of
463             items are available, or until the timeout is reached. If the timeout is
464             reached, it returns whatever items there are on the queue, or C if the
465             queue is empty.
466              
467             The timeout may be a number of seconds relative to the current time (e.g., 5
468             seconds from when the call is made), or may be an absolute timeout in I
469             seconds the same as would be used with
470             L.
471             Fractional seconds (e.g., 2.5 seconds) are also supported (to the extent of
472             the underlying implementation).
473              
474             If C is missing, C, or less than or equal to 0, then this call
475             behaves the same as C.
476              
477             =item ->pending()
478              
479             Returns the number of items still in the queue. Returns C if the queue
480             has been ended (see below), and there are no more items in the queue.
481              
482             =item ->limit
483              
484             Sets the size of the queue. If set, calls to C will block until
485             the number of pending items in the queue drops below the C. The
486             C does not prevent enqueuing items beyond that count:
487              
488             my $q = Thread::Queue->new(1, 2);
489             $q->limit = 4;
490             $q->enqueue(3, 4, 5); # Does not block
491             $q->enqueue(6); # Blocks until at least 2 items are
492             # dequeued
493             my $size = $q->limit; # Returns the current limit (may return
494             # 'undef')
495             $q->limit = 0; # Queue size is now unlimited
496              
497             =item ->end()
498              
499             Declares that no more items will be added to the queue.
500              
501             All threads blocking on C calls will be unblocked with any
502             remaining items in the queue and/or C being returned. Any subsequent
503             calls to C will behave like C.
504              
505             Once ended, no more items may be placed in the queue.
506              
507             =back
508              
509             =head1 ADVANCED METHODS
510              
511             The following methods can be used to manipulate items anywhere in a queue.
512              
513             To prevent the contents of a queue from being modified by another thread
514             while it is being examined and/or changed, L
515             VARIABLE"> the queue inside a local block:
516              
517             {
518             lock($q); # Keep other threads from changing the queue's contents
519             my $item = $q->peek();
520             if ($item ...) {
521             ...
522             }
523             }
524             # Queue is now unlocked
525              
526             =over
527              
528             =item ->peek()
529              
530             =item ->peek(INDEX)
531              
532             Returns an item from the queue without dequeuing anything. Defaults to the
533             the head of queue (at index position 0) if no index is specified. Negative
534             index values are supported as with L (i.e., -1
535             is the end of the queue, -2 is next to last, and so on).
536              
537             If no items exists at the specified index (i.e., the queue is empty, or the
538             index is beyond the number of items on the queue), then C is returned.
539              
540             Remember, the returned item is not removed from the queue, so manipulating a
541             Ced at reference affects the item on the queue.
542              
543             =item ->insert(INDEX, LIST)
544              
545             Adds the list of items to the queue at the specified index position (0
546             is the head of the list). Any existing items at and beyond that position are
547             pushed back past the newly added items:
548              
549             $q->enqueue(1, 2, 3, 4);
550             $q->insert(1, qw/foo bar/);
551             # Queue now contains: 1, foo, bar, 2, 3, 4
552              
553             Specifying an index position greater than the number of items in the queue
554             just adds the list to the end.
555              
556             Negative index positions are supported:
557              
558             $q->enqueue(1, 2, 3, 4);
559             $q->insert(-2, qw/foo bar/);
560             # Queue now contains: 1, 2, foo, bar, 3, 4
561              
562             Specifying a negative index position greater than the number of items in the
563             queue adds the list to the head of the queue.
564              
565             =item ->extract()
566              
567             =item ->extract(INDEX)
568              
569             =item ->extract(INDEX, COUNT)
570              
571             Removes and returns the specified number of items (defaults to 1) from the
572             specified index position in the queue (0 is the head of the queue). When
573             called with no arguments, C operates the same as C.
574              
575             This method is non-blocking, and will return only as many items as are
576             available to fulfill the request:
577              
578             $q->enqueue(1, 2, 3, 4);
579             my $item = $q->extract(2) # Returns 3
580             # Queue now contains: 1, 2, 4
581             my @items = $q->extract(1, 3) # Returns (2, 4)
582             # Queue now contains: 1
583              
584             Specifying an index position greater than the number of items in the
585             queue results in C or an empty list being returned.
586              
587             $q->enqueue('foo');
588             my $nada = $q->extract(3) # Returns undef
589             my @nada = $q->extract(1, 3) # Returns ()
590              
591             Negative index positions are supported. Specifying a negative index position
592             greater than the number of items in the queue may return items from the head
593             of the queue (similar to C) if the count overlaps the head of the
594             queue from the specified position (i.e. if queue size + index + count is
595             greater than zero):
596              
597             $q->enqueue(qw/foo bar baz/);
598             my @nada = $q->extract(-6, 2); # Returns () - (3+(-6)+2) <= 0
599             my @some = $q->extract(-6, 4); # Returns (foo) - (3+(-6)+4) > 0
600             # Queue now contains: bar, baz
601             my @rest = $q->extract(-3, 4); # Returns (bar, baz) -
602             # (2+(-3)+4) > 0
603              
604             =back
605              
606             =head1 NOTES
607              
608             Queues created by L can be used in both threaded and
609             non-threaded applications.
610              
611             =head1 LIMITATIONS
612              
613             Passing objects on queues may not work if the objects' classes do not support
614             sharing. See L for more.
615              
616             Passing array/hash refs that contain objects may not work for Perl prior to
617             5.10.0.
618              
619             =head1 SEE ALSO
620              
621             Thread::Queue on MetaCPAN:
622             L
623              
624             Code repository for CPAN distribution:
625             L
626              
627             L, L
628              
629             Sample code in the I directory of this distribution on CPAN.
630              
631             =head1 MAINTAINER
632              
633             Jerry D. Hedden, Sjdhedden AT cpan DOT orgE>
634              
635             =head1 LICENSE
636              
637             This program is free software; you can redistribute it and/or modify it under
638             the same terms as Perl itself.
639              
640             =cut