File Coverage

blib/lib/Thread/Queue/MaxSize.pm
Criterion Covered Total %
statement 91 111 81.9
branch 31 46 67.3
condition 24 41 58.5
subroutine 10 10 100.0
pod 3 3 100.0
total 159 211 75.3


line stmt bran cond sub pod time code
1             package Thread::Queue::MaxSize;
2              
3 8     8   150747 use strict;
  8         19  
  8         352  
4 8     8   43 use warnings;
  8         15  
  8         546  
5              
6             our $VERSION = '1.02';
7             $VERSION = eval $VERSION;
8              
9 8     8   7290 use parent qw(Thread::Queue);
  8         2761  
  8         43  
10              
11 8     8   878530 use threads::shared 1.21;
  8         179  
  8         57  
12 8     8   691 use Scalar::Util 1.10 qw(looks_like_number);
  8         139  
  8         11497  
13              
14             sub new {
15 17     17 1 11442 my ($class, $config) = @_;
16 17         214 my $self = $class->SUPER::new();
17              
18 17 50 33     1198 if ($config && (!ref($config) || ref($config) ne "HASH")) {
      33        
19 0         0 require Carp;
20 0         0 Carp::croak("invalid first argument to constructor -- must be a hashref with any configuration options");
21             }
22              
23             # make sure that maxsize is actually a number
24 17 50       98 my $maxsize = ($config) ? $config->{'maxsize'} : undef;
25 17         59 $self->{'MAXSIZE'} = $self->_validate_maxsize($maxsize);
26              
27             # determine what type of action we'll take on exceeding our max size
28             # 1. raise an exception (die)
29             # 2. warn and reject entire addition/insertion
30             # 3. silently reject entire addition/insertion
31             # 4. warn, process addition/insertion and then truncate to max size
32             # 5. silently process addition/insertion and then truncate to max size
33 17 50       74 my $on_maxsize = ($config) ? $config->{'on_maxsize'} : undef;
34 17   100     79 $self->{'ON_MAXSIZE'} = $self->_validate_on_maxsize($on_maxsize || 'silent_truncate');
35              
36 17         78 return $self;
37             }
38              
39             # add items to the tail of a queue
40             sub enqueue {
41 84     84 1 24302 my $self = shift;
42 84         119 lock(%$self);
43              
44 84 50       258 if ($self->{'ENDED'}) {
45 0         0 require Carp;
46 0         0 Carp::croak("'enqueue' method called on queue that has been 'end'ed");
47             }
48              
49 84         112 my $queue = $self->{'queue'};
50              
51             # queue can't be too big so shift the oldest things off if necessary
52 84 100 66     2119 if (defined($self->{'MAXSIZE'}) && $self->{'MAXSIZE'} > 0) {
53 81 100 100     781 if ((scalar(@{$queue}) + scalar(@_)) > $self->{'MAXSIZE'} &&
  81         983  
54             $self->{'ON_MAXSIZE'} =~ /^(die|warn_and_reject|silent_reject|warn_and_truncate)$/ix) {
55 35 100       173 if ($1 =~ /^warn_and_truncate$/ix) {
    100          
    100          
    50          
56 11         66 warn "queue exceeding its maximum size: truncating\n";
57             } elsif ($1 =~ /^silent_reject$/ix) {
58 11         21 return;
59             } elsif ($1 =~ /^warn_and_reject$/ix) {
60 11         1018 warn "not enqueuing new items: queue would exceed its maximum size\n";
61 11         58 return;
62             } elsif ($1 =~ /^die$/ix) {
63 2         17 die "not enqueuing new items: queue would exceed its maximum size\n";
64             }
65             }
66              
67             # remove things already on the queue
68 57   100     70 while (scalar(@{$queue}) && (scalar(@{$queue}) + scalar(@_)) > $self->{'MAXSIZE'}) {
  79         180  
  64         213  
69 22         23 shift(@{$queue});
  22         29  
70             }
71              
72             # if we've already removed everything off of the queue and we're still
73             # over maxsize then take things off of the list of new items
74 57   33     1225 while (scalar(@_) && (scalar(@_)) > $self->{'MAXSIZE'}) {
75 0         0 shift(@_);
76             }
77             }
78              
79 60 50       69 push(@{$queue}, map { shared_clone($_) } @_) and cond_signal(%$self);
  60         224  
  179         1077  
80             }
81              
82             # insert items anywhere into a queue
83             sub insert {
84 9     9 1 12244 my $self = shift;
85 9         19 lock(%$self);
86              
87 9 50       36 if ($self->{'ENDED'}) {
88 0         0 require Carp;
89 0         0 Carp::croak("'insert' method called on queue that has been 'end'ed");
90             }
91              
92 9         18 my $queue = $self->{'queue'};
93              
94 9         55 my $index = $self->_validate_index(shift);
95              
96             # make sure we have something to insert
97 5 50       56 return unless @_;
98              
99             # support negative indices
100 5 50       19 if ($index < 0) {
101 0         0 $index += @{$queue};
  0         0  
102 0 0       0 $index = 0 if ($index < 0);
103             }
104              
105             # dequeue items from $index onward
106 5         24 my @tmp = ();
107 5         64 while (@{$queue} > $index) {
  30         245  
108 25         31 unshift(@tmp, pop(@{$queue}))
  25         43  
109             }
110              
111             # queue can't be too big so shift the oldest things off if necessary
112 5 50 33     41 if (defined($self->{'MAXSIZE'}) && $self->{'MAXSIZE'} > 0) {
113 5 100 66     8 if ((scalar(@{$queue}) + scalar(@_) + scalar(@tmp)) > $self->{'MAXSIZE'} &&
  5         58  
114             $self->{'ON_MAXSIZE'} =~ /^(die|warn_and_reject|silent_reject|warn_and_truncate)$/ix) {
115 4 100       36 if ($1 =~ /^warn_and_truncate$/ix) {
    100          
    100          
    50          
116 1         7 warn "queue exceeding its maximum size: truncating\n";
117             } elsif ($1 =~ /^silent_reject$/ix) {
118             # restore queue before returning
119 1         1 push(@{$queue}, @tmp);
  1         2  
120 1         2 return;
121             } elsif ($1 =~ /^warn_and_reject$/ix) {
122             # restore queue before returning
123 1         2 push(@{$queue}, @tmp);
  1         3  
124 1         232 warn "not inserting new items: queue would exceed its maximum size\n";
125 1         5 return;
126             } elsif ($1 =~ /^die$/ix) {
127             # reset queue before dying
128 1         1 push(@{$queue}, @tmp);
  1         3  
129 1         9 die "not inserting new items: queue would exceed its maximum size\n";
130             }
131             }
132              
133             # remove things already on the queue
134 2   66     3 while (scalar(@{$queue}) && (scalar(@{$queue}) + scalar(@_) + scalar(@tmp)) > $self->{'MAXSIZE'}) {
  8         19  
  6         23  
135 6         4 shift(@{$queue});
  6         10  
136             }
137              
138             # if we've already removed everything off of the queue and we're still
139             # over maxsize then take things off of the list of new items
140 2   66     15 while (scalar(@_) && (scalar(@_) + scalar(@tmp)) > $self->{'MAXSIZE'}) {
141 2         10 shift(@_);
142             }
143             }
144              
145             # add new items to the queue
146 2         2 push(@{$queue}, map { shared_clone($_) } @_);
  2         4  
  10         55  
147              
148             # add previous items back onto the queue
149 2         13 push(@{$queue}, @tmp);
  2         20  
150              
151             # soup's up
152 2         49 cond_signal(%$self);
153             }
154              
155             sub _validate_maxsize {
156 17     17   31 my ($self, $maxsize) = @_;
157              
158 17 50 33     231 if (defined($maxsize) && (!looks_like_number($maxsize) || (int($maxsize) != $maxsize) || ($maxsize < 1))) {
      66        
159 0         0 require Carp;
160 0         0 my ($method) = (caller(1))[3];
161 0         0 my $class_name = ref($self);
162 0         0 $method =~ s/${class_name}:://;
163 0         0 Carp::croak("Invalid 'maxsize' argument ($maxsize) to '$method' method");
164             }
165              
166 17         81 return $maxsize;
167             }
168              
169             sub _validate_on_maxsize {
170 17     17   34 my ($self, $on_maxsize) = @_;
171              
172 17 50 33     254 if (defined($on_maxsize) && ($on_maxsize !~ /^(?:die|warn_and_reject|silent_reject|warn_and_truncate|silent_truncate)$/ix)) {
173 0         0 require Carp;
174 0         0 my ($method) = (caller(1))[3];
175 0         0 my $class_name = ref($self);
176 0         0 $method =~ s/${class_name}:://;
177 0         0 Carp::croak("Invalid 'on_maxsize' argument ($on_maxsize) to '$method' method");
178             }
179              
180 17         49 return $on_maxsize;
181             }
182              
183             1;
184              
185             =head1 NAME
186              
187             Thread::Queue::MaxSize - Thread-safe queues with an upper bound
188              
189             =head1 VERSION
190              
191             This document describes Thread::Queue::MaxSize version 1.02
192              
193             =head1 SYNOPSIS
194              
195             use strict;
196             use warnings;
197              
198             use threads;
199             use Thread::Queue::MaxSize;
200              
201             # create a new empty queue with no max limit
202             my $q = Thread::Queue::MaxSize->new();
203              
204             # create a new empty queue that will only ever store 1000 items
205             my $q = Thread::Queue::MaxSize->new({ maxsize => 1000 });
206              
207             # create a queue that will die when too many items are enqueued
208             my $q = Thread::Queue::MaxSize->new({ maxsize => 1000, on_maxsize => 'die' });
209              
210             =head1 DESCRIPTION
211              
212             This is a subclass of L<Thread::Queue> that will enforce an upper bound on the
213             number of items in a queue. This can be used to prevent memory use from
214             exploding on a queue that might never empty.
215              
216             =head1 QUEUE CREATION
217              
218             =over
219              
220             =item ->new()
221              
222             Creates a new empty queue. This queue will have no items to start.
223              
224             =item ->new(OPTIONS)
225              
226             Creates a new empty queue with some options. The two configurable options are:
227              
228             =over
229              
230             =item maxsize
231              
232             Defines the maximum size that the queue can ever be.
233              
234             =item on_maxsize
235              
236             Defines the action that will be taken when a queue reaches its maximum size.
237             There are five actions that can be taken when the list of items to enqueue or
238             insert would cause the queue to go over its maximum size. In all cases either
239             all items are enqueued/inserted or none of the items are enqueued/inserted.
240              
241             =over
242              
243             =item die
244              
245             No items will be enqueued/inserted and the queue will throw an exception.
246              
247             =item warn_and_reject
248              
249             No items will be enqueued/inserted and the queue will issue a warning.
250              
251             =item silent_reject
252              
253             No items will be enqueued/inserted and no indication will be given as to why.
254              
255             =item warn_and_truncate
256              
257             All items will be enqueued/inserted, the oldest items on the list will be
258             truncated off the end, and the queue will issue a warning.
259              
260             =item silent_truncate
261              
262             All items will be enqueued/inserted, the oldest items on the list will be
263             truncated off the end, and no indication will be given as to why. This is the
264             default action.
265              
266             =back
267              
268             =head1 SEE ALSO
269              
270             L<Thread::Queue>, L<threads>, L<threads::shared>
271              
272             =head1 MAINTAINER
273              
274             Paul Lockaby S<E<lt>plockaby AT cpan DOT orgE<gt>>
275              
276             =head1 CREDIT
277              
278             Significant portions of this module are directly from L<Thread::Queue> which is
279             maintained by Jerry D. Hedden.
280              
281             =head1 LICENSE
282              
283             This program is free software; you can redistribute it and/or modify it under
284             the same terms as Perl itself.
285              
286             =cut