File Coverage

blib/lib/Thread/Queue/Priority.pm
Criterion Covered Total %
statement 96 159 60.3
branch 30 60 50.0
condition 21 42 50.0
subroutine 14 17 82.3
pod 8 8 100.0
total 169 286 59.0


line stmt bran cond sub pod time code
1             package Thread::Queue::Priority;
2              
3 4     4   152913 use strict;
  4         8  
  4         183  
4 4     4   20 use warnings;
  4         5  
  4         393  
5              
6             our $VERSION = '1.03';
7             $VERSION = eval $VERSION;
8              
9 4     4   10468 use threads::shared 1.21;
  4         6843  
  4         26  
10 4     4   354 use Scalar::Util qw(looks_like_number);
  4         8  
  4         678  
11              
12             # Carp errors from threads::shared calls should complain about caller
13             our @CARP_NOT = ("threads::shared");
14              
15             sub new {
16 3     3 1 809 my $class = shift;
17 4     4   3441 my %queue :shared = ();
  4         6621  
  4         7694  
  3         27  
18 3         125 my %self :shared = (
19             '_queue' => \%queue,
20             '_count' => 0,
21             '_ended' => 0,
22             );
23 3         76 return bless(\%self, $class);
24             }
25              
26             # add items to the tail of a queue
27             sub enqueue {
28 22     22 1 1117 my ($self, $item, $priority) = @_;
29 22         30 lock(%{$self});
  22         53  
30              
31             # if the queue has "ended" then we can't enqueue anything
32 22 50       72 if ($self->{'_ended'}) {
33 0         0 require Carp;
34 0         0 Carp::croak("'enqueue' method called on queue that has been 'end'ed");
35             }
36              
37 22         37 my $queue = $self->{'_queue'};
38 22 100       98 $priority = defined($priority) ? $self->_validate_priority($priority) : 50;
39              
40             # if the priority group hasn't been created then create it
41 22         87 my @group :shared = ();
42 22 100       601 $queue->{$priority} = \@group unless exists($queue->{$priority});
43              
44             # increase our global count
45 22         40 ++$self->{'_count'};
46              
47             # add the new item to the priority list and signal that we're done
48 22 50       25 push(@{$self->{'_queue'}->{$priority}}, shared_clone($item)) and cond_signal(%{$self});
  22         8327  
  22         92  
49             }
50              
51             # return a count of the number of items on a queue
52             sub pending {
53 6     6 1 1162 my $self = shift;
54 6         10 lock(%{$self});
  6         14  
55              
56             # return undef if the queue has ended and is empty
57 6 50 33     27 return if $self->{'_ended'} && !$self->{'_count'};
58 6         33 return $self->{'_count'};
59             }
60              
61             # indicate that no more data will enter the queue
62             sub end {
63 0     0 1 0 my $self = shift;
64 0         0 lock(%{$self});
  0         0  
65              
66             # no more data is coming
67 0         0 $self->{'_ended'} = 1;
68              
69             # try to release at least one blocked thread
70 0         0 cond_signal(%{$self});
  0         0  
71             }
72              
73             # return 1 or more items from the head of a queue, blocking if needed
74             sub dequeue {
75 12     12 1 21165 my $self = shift;
76 12         21 lock(%{$self});
  12         31  
77              
78 12         30 my $queue = $self->{'_queue'};
79 12 100       47 my $count = scalar(@_) ? $self->_validate_count(shift(@_)) : 1;
80              
81             # wait for requisite number of items
82 7   33     34 cond_wait(%{$self}) while (($self->{'_count'} < $count) && ! $self->{'_ended'});
  0         0  
83 7 100 66     38 cond_signal(%{$self}) if (($self->{'_count'} > $count) || $self->{'_ended'});
  6         225  
84              
85             # if no longer blocking, try getting whatever is left on the queue
86 7 50       33 return $self->dequeue_nb($count) if ($self->{'_ended'});
87              
88             # return single item
89 7 50       21 if ($count == 1) {
90 7         12 for my $priority (sort keys %{$queue}) {
  7         40  
91 7 50       10 if (scalar(@{$queue->{$priority}})) {
  7         24  
92 7         8 --$self->{'_count'};
93 7         8 return shift(@{$queue->{$priority}});
  7         31  
94             }
95             }
96 0         0 return;
97             }
98              
99             # return multiple items
100 0         0 my @items = ();
101 0         0 for (1 .. $count) {
102 0         0 for my $priority (sort keys %{$queue}) {
  0         0  
103 0 0       0 if (scalar(@{$queue->{$priority}})) {
  0         0  
104 0         0 --$self->{'_count'};
105 0         0 push(@items, shift(@{$queue->{$priority}}));
  0         0  
106             }
107             }
108             }
109 0         0 return @items;
110             }
111              
112             # return items from the head of a queue with no blocking
113             sub dequeue_nb {
114 6     6 1 2464 my $self = shift;
115 6         8 lock(%{$self});
  6         13  
116              
117 6         15 my $queue = $self->{'_queue'};
118 6 100       24 my $count = scalar(@_) ? $self->_validate_count(shift(@_)) : 1;
119              
120             # return single item
121 1 50       5 if ($count == 1) {
122 1         2 for my $priority (sort keys %{$queue}) {
  1         7  
123 1 50       7 if (scalar(@{$queue->{$priority}})) {
  1         8  
124 0         0 --$self->{'_count'};
125 0         0 return shift(@{$queue->{$priority}});
  0         0  
126             }
127             }
128 1         6 return;
129             }
130              
131             # return multiple items
132 0         0 my @items = ();
133 0         0 for (1 .. $count) {
134 0         0 for my $priority (sort keys %{$queue}) {
  0         0  
135 0 0       0 if (scalar(@{$queue->{$priority}})) {
  0         0  
136 0         0 --$self->{'_count'};
137 0         0 push(@items, shift(@{$queue->{$priority}}));
  0         0  
138             }
139             }
140             }
141              
142 0         0 return @items;
143             }
144              
145             # return items from the head of a queue, blocking if needed up to a timeout
146             sub dequeue_timed {
147 0     0 1 0 my $self = shift;
148 0         0 lock(%{$self});
  0         0  
149              
150 0         0 my $queue = $self->{'_queue'};
151 0 0       0 my $timeout = scalar(@_) ? $self->_validate_timeout(shift(@_)) : -1;
152 0 0       0 my $count = scalar(@_) ? $self->_validate_count(shift(@_)) : 1;
153              
154             # timeout may be relative or absolute
155             # convert to an absolute time for use with cond_timedwait()
156             # so if the timeout is less than a year then we assume it's relative
157 0 0       0 $timeout += time() if ($timeout < 322000000); # more than one year
158              
159             # wait for requisite number of items, or until timeout
160 0   0     0 while ($self->{'_count'} < $count && !$self->{'_ended'}) {
161 0 0       0 last unless cond_timedwait(%{$self}, $timeout);
  0         0  
162             }
163 0 0 0     0 cond_signal(%{$self}) if (($self->{'_count'} > $count) || $self->{'_ended'});
  0         0  
164              
165             # get whatever we need off the queue if available
166 0         0 return $self->dequeue_nb($count);
167             }
168              
169             # return an item without removing it from a queue
170             sub peek {
171 18     18 1 1539 my $self = shift;
172 18         24 lock(%{$self});
  18         35  
173              
174 18         38 my $queue = $self->{'_queue'};
175 18 100       697 my $index = scalar(@_) ? $self->_validate_index(shift(@_)) : 0;
176              
177 15         20 for my $priority (sort keys %{$queue}) {
  15         79  
178 28         27 my $size = scalar(@{$queue->{$priority}});
  28         59  
179 28 100       75 if ($index < $size) {
180 15         98 return $queue->{$priority}->[$index];
181             } else {
182 13         60 $index = ($index - $size);
183             }
184             }
185              
186 0         0 return;
187             }
188              
189             ### internal functions ###
190              
191             # check value of the requested index
192             sub _validate_index {
193 16     16   30 my ($self, $index) = @_;
194              
195 16 100 100     170 if (!defined($index) || !looks_like_number($index) || (int($index) != $index)) {
      100        
196 3         17 require Carp;
197 3         18 my ($method) = (caller(1))[3];
198 3         8 my $class_name = ref($self);
199 3         30 $method =~ s/${class_name}:://;
200 3 100       9 $index = 'undef' unless defined($index);
201 3         420 Carp::croak("Invalid 'index' argument (${index}) to '${method}' method");
202             }
203              
204 13         25 return $index;
205             }
206              
207             # check value of the requested count
208             sub _validate_count {
209 10     10   15 my ($self, $count) = @_;
210              
211 10 50 100     119 if (!defined($count) || !looks_like_number($count) || (int($count) != $count) || ($count < 1)) {
      100        
      66        
212 10         66 require Carp;
213 10         55 my ($method) = (caller(1))[3];
214 10         24 my $class_name = ref($self);
215 10         71 $method =~ s/${class_name}:://;
216 10 100       28 $count = 'undef' unless defined($count);
217 10         1496 Carp::croak("Invalid 'count' argument (${count}) to '${method}' method");
218             }
219              
220 0         0 return $count;
221             }
222              
223             # check value of the requested timeout
224             sub _validate_timeout {
225 0     0   0 my ($self, $timeout) = @_;
226              
227 0 0 0     0 if (!defined($timeout) || !looks_like_number($timeout)) {
228 0         0 require Carp;
229 0         0 my ($method) = (caller(1))[3];
230 0         0 my $class_name = ref($self);
231 0         0 $method =~ s/${class_name}:://;
232 0 0       0 $timeout = 'undef' unless defined($timeout);
233 0         0 Carp::croak("Invalid 'timeout' argument (${timeout}) to '${method}' method");
234             }
235              
236 0         0 return $timeout;
237             }
238              
239             # check value of the requested timeout
240             sub _validate_priority {
241 4     4   8 my ($self, $priority) = @_;
242              
243 4 50 33     58 if (!defined($priority) || !looks_like_number($priority) || (int($priority) != $priority) || ($priority < 0)) {
      33        
      33        
244 0         0 require Carp;
245 0         0 my ($method) = (caller(1))[3];
246 0         0 my $class_name = ref($self);
247 0         0 $method =~ s/${class_name}:://;
248 0 0       0 $priority = 'undef' unless defined($priority);
249 0         0 Carp::croak("Invalid 'priority' argument (${priority}) to '${method}' method");
250             }
251              
252 4         9 return $priority;
253             }
254              
255             1;
256              
257             =head1 NAME
258              
259             Thread::Queue::Priority - Thread-safe queues with priorities
260              
261             =head1 VERSION
262              
263             This document describes Thread::Queue::Priority version 1.03
264              
265             =head1 SYNOPSIS
266              
267             use strict;
268             use warnings;
269              
270             use threads;
271             use Thread::Queue::Priority;
272              
273             # create a new empty queue
274             my $q = Thread::Queue::Priority->new();
275              
276             # add a new element with default priority 50
277             $q->enqueue("foo");
278              
279             # add a new element with priority 1
280             $q->enqueue("foo", 1);
281              
282             # dequeue the highest priority on the queue
283             my $value = $q->dequeue();
284              
285             =head1 DESCRIPTION
286              
287             This is a variation on L that will dequeue items based on their
288             priority. This module is NOT a drop-in replacement for L as it
289             does not implement all of its methods as they don't all make sense. However,
290             for the methods implemented and described below, consider the functionality to
291             be the same as that of L.
292              
293             =head1 QUEUE CREATION
294              
295             =over
296              
297             =item ->new()
298              
299             Creates a new empty queue. A list cannot be created with items already on it.
300              
301             =item ->enqueue(ITEM, PRIORITY)
302              
303             Adds an item onto the queue with the givern priority. Only one item may be
304             added at a time. If no priority is given, it is given a default value of 50.
305             There are no constraints on the priority number with the exception that it must
306             be greater than zero and it must be a number. The smaller the number, the
307             greater the priority.
308              
309             =item ->dequeue()
310              
311             =item ->dequeue(COUNT)
312              
313             Removes and returns the requested number of items (default is 1) in priority
314             order where smaller numbers indicate greater priority. If the queue contains
315             fewer than the requested number of items, then the thread will be blocked until
316             the requisite number of items are available (i.e., until other threads
317             more items).
318              
319             =item ->dequeue_nb()
320              
321             =item ->dequeue_nb(COUNT)
322              
323             This functions the same as C but it will not block if the queue is
324             empty or the queue does not have COUNT items. Instead it will return whatever
325             is on the queue up to COUNT, or C if the queue is empty. Again, items
326             will come off the queue in priority order where smaller numbers have a higher
327             priority.
328              
329             =item ->dequeue_timed(TIMEOUT)
330              
331             =item ->dequeue_timed(TIMEOUT, COUNT)
332              
333             This functions the same as C but will only block for the length of the
334             given timeout. If the timeout is reached, it returns whatever items there are
335             on the queue, or C if the queue is empty. Again, items will come off the
336             queue in priority order where smaller numbers have a higher priority.
337              
338             The timeout may be a number of seconds relative to the current time (e.g., 5
339             seconds from when the call is made), or may be an absolute timeout in I
340             seconds the same as would be used with
341             L.
342             Fractional seconds (e.g., 2.5 seconds) are also supported (to the extent of
343             the underlying implementation).
344              
345             If C is missing, C, or less than or equal to 0, then this call
346             behaves the same as C.
347              
348             =item ->pending()
349              
350             Returns the number of items still in the queue. Returns C if the queue
351             has been ended (see below), and there are no more items in the queue.
352              
353             =item ->end()
354              
355             Declares that no more items will be added to the queue.
356              
357             All threads blocking on C calls will be unblocked with any
358             remaining items in the queue and/or C being returned. Any subsequent
359             calls to C will behave like C.
360              
361             Once ended, no more items may be placed in the queue.
362              
363             =item ->peek(INDEX)
364              
365             Returns n item from the queue without dequeuing anything. Defaults to the
366             the head of queue (at index position 0) if no index is specified. Negative
367             index values are supported as with L (i.e., -1
368             is the end of the queue, -2 is next to last, and so on).
369              
370             If no items exists at the specified index (i.e., the queue is empty, or the
371             index is beyond the number of items on the queue), then C is returned.
372              
373             Remember, the returned item is not removed from the queue, so manipulating a
374             Ced at reference affects the item on the queue.
375              
376             =back
377              
378             =head1 SEE ALSO
379              
380             L, L, L
381              
382             =head1 MAINTAINER
383              
384             Paul Lockaby Splockaby AT cpan DOT orgE>
385              
386             =head1 CREDIT
387              
388             Significant portions of this module are directly from L which is
389             maintained by Jerry D. Hedden, .
390              
391             =head1 LICENSE
392              
393             This program is free software; you can redistribute it and/or modify it under
394             the same terms as Perl itself.
395              
396             =cut