File Coverage

blib/lib/Queue/DBI/Element.pm
Criterion Covered Total %
statement 69 81 85.1
branch 18 48 37.5
condition 6 21 28.5
subroutine 14 14 100.0
pod 10 10 100.0
total 117 174 67.2


line stmt bran cond sub pod time code
1             package Queue::DBI::Element;
2              
3 39     39   14277 use warnings;
  39         37  
  39         1073  
4 39     39   128 use strict;
  39         40  
  39         2076  
5              
6 39     39   999 use Data::Dumper;
  39         6099  
  39         1474  
7 39     39   144 use Carp;
  39         30  
  39         27049  
8              
9              
10             =head1 NAME
11              
12             Queue::DBI::Element - An object representing an element pulled from the queue.
13              
14              
15             =head1 VERSION
16              
17             Version 2.6.2
18              
19             =cut
20              
21             our $VERSION = '2.6.2';
22              
23              
24             =head1 SYNOPSIS
25              
26             Please refer to the documentation for Queue::DBI.
27              
28             =head1 METHODS
29              
30             =head2 new()
31              
32             Create a new Queue::DBI::Element object.
33              
34             my $element = Queue::DBI::Element->new(
35             'queue' => $queue,
36             'data' => $data,
37             'id' => $id,
38             'requeue_count' => $requeue_count,
39             'created' => $created,
40             );
41              
42             All parameters are mandatory and correspond respectively to the Queue::DBI object
43             used to pull the element's data, the data, the ID of the element in the database,
44             the number of times it has been requeued before, and its creation unixtime.
45              
46             It is not recommended for direct use. You should be using the following to get
47             Queue::DBI::Element objects:
48              
49             my $element = $queue->next();
50              
51             =cut
52              
sub new
{
    # Constructor.
    #
    # Parameters (all mandatory, passed as a hash):
    #   queue         - a Queue::DBI object (or an instance of a subclass).
    #   data          - the payload of the element.
    #   id            - the ID of the element.
    #   requeue_count - how many times the element has been requeued so far.
    #   created       - the creation time of the element.
    #
    # Returns a new blessed Queue::DBI::Element object.
    # Croaks if any argument is missing, empty, or if 'queue' is not a
    # Queue::DBI object.
    my ( $class, %args ) = @_;

    # Check the scalar parameters: they must be defined and non-empty. Note
    # that 0 is a legitimate value (e.g. for requeue_count), which is why the
    # test is not a plain truthiness check.
    foreach my $arg ( qw( data id requeue_count created ) )
    {
        croak "Argument '$arg' is needed to create the Queue::DBI::Element object"
            if !defined( $args{$arg} ) || ( $args{$arg} eq '' );
    }

    # Only call ->isa() on something that is actually an object. Without the
    # blessed() guard, an unblessed reference dies with an unhelpful "Can't
    # call method" error, and a bare class name string would be accepted even
    # though it cannot act as a queue.
    require Scalar::Util;
    croak 'Pass a Queue::DBI object to create an Queue::DBI::Element object'
        unless Scalar::Util::blessed( $args{'queue'} )
            && $args{'queue'}->isa( 'Queue::DBI' );

    # Create the object. Only the known attributes are copied, so stray
    # arguments never end up in the object.
    my $self = bless(
        {
            'queue'         => $args{'queue'},
            'data'          => $args{'data'},
            'id'            => $args{'id'},
            'requeue_count' => $args{'requeue_count'},
            'created'       => $args{'created'},
        },
        $class
    );

    return $self;
}
80              
81              
82             =head2 lock()
83              
84             Locks the element so that another process acting on the queue cannot get a hold
85             of it.
86              
87             if ( $element->lock() )
88             {
89             print "Element successfully locked.\n";
90             }
91             else
92             {
93             print "The element has already been removed or locked.\n";
94             }
95              
96             =cut
97              
sub lock ## no critic (Subroutines::ProhibitBuiltinHomonyms)
{
    # Mark the element as locked in the database by setting its lock_time to
    # the current time, provided it is not locked already.
    #
    # Returns 1 if exactly one row was updated, 0 otherwise (the element is
    # already locked or no longer exists).
    # Croaks if the database handle reports a failure.
    my ( $self ) = @_;

    my $queue = $self->get_queue();
    my $verbose = $queue->get_verbose();
    my $dbh = $queue->get_dbh();

    if ( $verbose )
    {
        carp "Entering lock().";
    }

    # The "lock_time IS NULL" condition makes the update a no-op when the
    # element is already locked.
    my $statement = sprintf(
        q|
            UPDATE %s
            SET lock_time = ?
            WHERE queue_element_id = ?
                AND lock_time IS NULL
        |,
        $dbh->quote_identifier( $queue->get_queue_elements_table_name() ),
    );
    my $affected = $dbh->do(
        $statement,
        {},
        time(),
        $self->id(),
    );

    # A false return value is treated as a failure. "No rows affected" comes
    # back from DBI as the true value "0E0", so it does not end up here.
    croak 'Cannot lock element: ' . $dbh->errstr
        unless $affected;

    my $locked = 0;
    if ( defined( $affected ) && ( $affected == 1 ) )
    {
        $locked = 1;
    }

    if ( $verbose )
    {
        carp "Element locked: " . ( $locked ? 'success' : 'already locked or gone' ) . ".";
        carp "Leaving lock().";
    }

    return $locked;
}
127              
128              
129             =head2 requeue()
130              
131             Requeues the element in case its processing has failed, by unlocking it and incrementing its requeue count.
132              
133             if ( $element->requeue() )
134             {
135             print "Element successfully requeued.\n";
136             }
137             else
138             {
139             print "The element has already been removed or been requeued.\n";
140             }
141              
142             =cut
143              
sub requeue
{
    # Put a locked element back in the queue: clear its lock_time and
    # increment its requeue_count, both in the database and on this object.
    #
    # Returns 1 if exactly one row was updated, 0 otherwise. A database error
    # is reported with carp() and also yields 0; this method does not croak.
    my ( $self ) = @_;

    my $queue = $self->get_queue();
    my $verbose = $queue->get_verbose();
    my $dbh = $queue->get_dbh();

    if ( $verbose )
    {
        carp "Entering requeue().";
    }

    my $statement = sprintf(
        q|
            UPDATE %s
            SET
                lock_time = NULL,
                requeue_count = requeue_count + 1
            WHERE queue_element_id = ?
                AND lock_time IS NOT NULL
        |,
        $dbh->quote_identifier( $queue->get_queue_elements_table_name() ),
    );
    my $affected = $dbh->do(
        $statement,
        {},
        $self->id(),
    );

    # The SELECT that retrieved this element and the UPDATE above are not
    # enclosed together, so another process may have requeued the element
    # (or even requeued, relocked and removed it) in the meantime. The
    # UPDATE then affects no rows and do() returns "0E0", perl's "0 but
    # true" value, which is not an error. Only undef or -1 indicate that
    # DBI ran into a problem.
    unless ( defined( $affected ) && $affected != -1 )
    {
        # Always carp here: the failure most likely originates outside of
        # this module, so it is reported regardless of the verbose setting.
        my $error = $dbh->errstr();
        my $details = defined( $error ) ? $error : 'no error returned by DBI';
        carp 'Cannot requeue element: ' . $details;
        return 0;
    }

    my $requeued = 0;
    if ( $affected == 1 )
    {
        $requeued = 1;
    }

    if ( $verbose )
    {
        carp "Element requeued: " . ( $requeued ? 'done' : 'already requeued or gone' ) . ".";
    }

    # Keep the in-memory counter in sync with the database, but only when
    # the database row was actually updated.
    if ( $requeued )
    {
        $self->{'requeue_count'}++;
    }

    if ( $verbose )
    {
        carp "Leaving requeue().";
    }

    return $requeued;
}
195              
196              
197             =head2 success()
198              
199             Removes the element from the queue after its processing has successfully been
200             completed.
201              
202             if ( $element->success() )
203             {
204             print "Element successfully removed from queue.\n";
205             }
206             else
207             {
208             print "The element has already been removed.\n";
209             }
210              
211             =cut
212              
sub success
{
    # Remove the element from the queue once it has been processed.
    #
    # The element is first deleted on the condition that it is still locked.
    # If no row matched, a second, unconditional delete is attempted in case
    # the element was unlocked in the meantime.
    #
    # Returns 1 if a row was deleted by either statement, 0 if the element
    # was not found at all.
    # Croaks if either delete statement fails at the database level.
    my ( $self ) = @_;

    my $queue = $self->get_queue();
    my $verbose = $queue->get_verbose();
    my $dbh = $queue->get_dbh();

    if ( $verbose )
    {
        carp "Entering success().";
    }

    # Possible improvement:
    # Add $self->{'lock_time'} in lock() and insist that it matches that value
    # when trying to delete the element here.

    # First attempt: only delete the element if it is still LOCKED.
    my $locked_rows = $dbh->do(
        sprintf(
            q|
                DELETE
                FROM %s
                WHERE queue_element_id = ?
                    AND lock_time IS NOT NULL
            |,
            $dbh->quote_identifier( $queue->get_queue_elements_table_name() ),
        ),
        {},
        $self->id(),
    );

    # undef and -1 are failures; "0E0" (no rows) is a valid answer.
    croak 'Cannot remove element: ' . $dbh->errstr()
        unless defined( $locked_rows ) && $locked_rows != -1;

    my $removed;
    if ( $locked_rows == 1 )
    {
        # A LOCKED element was found and deleted, this is a success.
        carp "Found a LOCKED element and deleted it. Element successfully processed." if $verbose;
        $removed = 1;
    }
    else
    {
        # Second attempt: no LOCKED element matched, so look for an UNLOCKED
        # one in case it got requeued by a parallel process.
        my $unlocked_rows = $dbh->do(
            sprintf(
                q|
                    DELETE
                    FROM %s
                    WHERE queue_element_id = ?
                |,
                $dbh->quote_identifier( $queue->get_queue_elements_table_name() ),
            ),
            {},
            $self->id(),
        );

        croak 'Cannot remove element: ' . $dbh->errstr
            unless defined( $unlocked_rows ) && $unlocked_rows != -1;

        $removed = ( $unlocked_rows == 1 ) ? 1 : 0;

        if ( $removed )
        {
            # An UNLOCKED element was found and deleted. It probably means
            # that another process is still working on that element as well
            # (possibly because this element's lock timed out, got cleaned
            # up and was picked up by another process).
            # This warning is always emitted: deleting the element is the
            # correct step since it was processed successfully, but the user
            # should still be told about it.
            carp 'Another process is probably working on the same element, as it was found UNLOCKED when we deleted it. '
                . 'Check parallelization issues in your code!';
        }
        elsif ( $verbose )
        {
            # No element found at all. It probably means that another
            # process had been working on that element, completed its run
            # successfully and deleted it.
            carp 'Another process has probably worked on this element and already deleted it after completing its operations. '
                . 'Check parallelization issues in your code!';
        }
    }

    if ( $verbose )
    {
        carp "Leaving success().";
    }

    return $removed;
}
301              
302              
303             =head2 data()
304              
305             Returns the data initially queued.
306              
307             my $data = $element->data();
308              
309             =cut
310              
sub data
{
    # Accessor: returns the value stored under the 'data' key of the object.
    my $self = shift;

    my $data = $self->{'data'};
    return $data;
}
317              
318              
319             =head2 requeue_count()
320              
321             Returns the number of times that the current element has been requeued.
322              
323             my $requeue_count = $element->requeue_count();
324              
325             =cut
326              
sub requeue_count
{
    # Accessor: returns the value stored under the 'requeue_count' key of the
    # object.
    my $self = shift;

    my $count = $self->{'requeue_count'};
    return $count;
}
333              
334              
335             =head2 id()
336              
337             Returns the ID of the current element
338              
339             my $id = $element->id();
340              
341             =cut
342              
sub id
{
    # Accessor: returns the value stored under the 'id' key of the object.
    my $self = shift;

    my $id = $self->{'id'};
    return $id;
}
349              
350              
351             =head2 get_created_time()
352              
353             Returns the unixtime at which the element was originally created.
354              
355             my $created = $element->get_created_time();
356              
357             =cut
358              
sub get_created_time
{
    # Accessor: returns the value stored under the 'created' key of the
    # object.
    my $self = shift;

    my $created = $self->{'created'};
    return $created;
}
365              
366              
367             =head2 is_over_lifetime()
368              
369             Returns a boolean indicating whether the current element is over the lifetime
370             specified when instantiating the queue. This is especially helpful if you
371             retrieve a large batch of elements and do long processing operations on each
372             of them.
373              
374             my $is_over_lifetime = $element->is_over_lifetime();
375              
376             =cut
377              
sub is_over_lifetime
{
    # Tell whether the element is older than the lifetime of its queue.
    #
    # Returns 0 when the queue has no lifetime defined; otherwise returns the
    # result of comparing the element's age, in seconds, with that lifetime.
    my ( $self ) = @_;

    my $lifetime = $self->get_queue()->get_lifetime();

    # If the queue doesn't have a lifetime, an element will never "expire".
    unless ( defined( $lifetime ) )
    {
        return 0;
    }

    # The age is measured from the time the element was created.
    my $age = time() - $self->get_created_time();

    return $age > $lifetime;
}
391              
392              
393             =head1 INTERNAL METHODS
394              
395             =head2 get_queue()
396              
397             Returns the Queue::DBI object used to pull the current element.
398              
399             my $queue = $element->get_queue();
400              
401             =cut
402              
sub get_queue
{
    # Accessor: returns the value stored under the 'queue' key of the object.
    my $self = shift;

    my $queue = $self->{'queue'};
    return $queue;
}
409              
410              
411             =head1 BUGS
412              
413             Please report any bugs or feature requests through the web interface at
414             L.
415             I will be notified, and then you'll automatically be notified of progress on
416             your bug as I make changes.
417              
418              
419             =head1 SUPPORT
420              
421             You can find documentation for this module with the perldoc command.
422              
423             perldoc Queue::DBI::Element
424              
425              
426             You can also look for information at:
427              
428             =over 4
429              
430             =item * GitHub's request tracker
431              
432             L
433              
434             =item * AnnoCPAN: Annotated CPAN documentation
435              
436             L
437              
438             =item * CPAN Ratings
439              
440             L
441              
442             =item * MetaCPAN
443              
444             L
445              
446             =back
447              
448              
449             =head1 AUTHOR
450              
451             Guillaume Aubert.
452              
453              
454              
455             =head1 ACKNOWLEDGEMENTS
456              
457             I originally developed this project for ThinkGeek
458             (L). Thanks for allowing me to open-source it!
459              
460              
461             =head1 COPYRIGHT & LICENSE
462              
463             Copyright 2009-2015 Guillaume Aubert.
464              
465             This program is free software: you can redistribute it and/or modify it under
466             the terms of the GNU General Public License version 3 as published by the Free
467             Software Foundation.
468              
469             This program is distributed in the hope that it will be useful, but WITHOUT ANY
470             WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
471             PARTICULAR PURPOSE. See the GNU General Public License for more details.
472              
473             You should have received a copy of the GNU General Public License along with
474             this program. If not, see http://www.gnu.org/licenses/
475              
476             =cut
477              
478             1;