File Coverage

blib/lib/Queue/DBI.pm
Criterion Covered Total %
statement 189 195 96.9
branch 99 144 68.7
condition 55 79 69.6
subroutine 28 32 87.5
pod 24 24 100.0
total 395 474 83.3


line stmt bran cond sub pod time code
1             package Queue::DBI;
2              
3 38     38   837189 use warnings;
  38         60  
  38         1402  
4 38     38   170 use strict;
  38         50  
  38         880  
5              
6 38     38   14466 use Data::Dumper;
  38         175299  
  38         2395  
7 38     38   11006 use Data::Validate::Type;
  38         136787  
  38         1637  
8 38     38   636 use Carp;
  38         49  
  38         1795  
9 38     38   24258 use Storable qw();
  38         101873  
  38         930  
10 38     38   18380 use MIME::Base64 qw();
  38         18913  
  38         1045  
11              
12 38     38   17039 use Queue::DBI::Element;
  38         76  
  38         96237  
13              
14              
15             =head1 NAME
16              
17             Queue::DBI - A queueing module with an emphasis on safety, using DBI as a storage system for queued data.
18              
19              
20             =head1 VERSION
21              
22             Version 2.7.0
23              
24             =cut
25              
26             our $VERSION = '2.7.0';
27              
28             our $DEFAULT_QUEUES_TABLE_NAME = 'queues';
29              
30             our $DEFAULT_QUEUE_ELEMENTS_TABLE_NAME = 'queue_elements';
31              
32             our $MAX_VALUE_SIZE = 65535;
33              
34              
35             =head1 SYNOPSIS
36              
37             This module allows you to safely use a queueing system by preventing
38             backtracking, infinite loops and data loss.
39              
40             An emphasis of this distribution is to provide an extremely reliable dequeueing
41             mechanism without having to use transactions.
42              
43             use Queue::DBI;
44             my $queue = Queue::DBI->new(
45             'queue_name' => $queue_name,
46             'database_handle' => $dbh,
47             'cleanup_timeout' => 3600,
48             'verbose' => 1,
49             );
50              
51             # Store a complex data structure.
52             $queue->enqueue(
53             {
54             values => [ 1, 2, 3 ],
55             data => { key1 => 1, key2 => 2 },
56             }
57             );
58              
59             # Store a scalar, which must be passed by reference.
60             $queue->enqueue( \"Lorem ipsum dolor sit amet" );
61              
62             # Process the queued elements one by one.
63             while ( my $queue_element = $queue->next() )
64             {
65             # Skip elements that cannot be locked.
66             next
67             unless $queue_element->lock();
68              
69             eval {
70             # Do some work
71             process( $queue_element->{'email'} );
72             };
73             if ( $@ )
74             {
75             # Something failed, we clear the lock but don't delete the record in the
76             # queue so that we can try again next time
77             $queue_element->requeue();
78             }
79             else
80             {
81             # All good, remove definitively the element
82             $queue_element->success();
83             }
84             }
85              
86             # Requeue items that have been locked for more than 6 hours
87             $queue->cleanup( 6 * 3600 );
88              
89              
90             =head1 SUPPORTED DATABASES
91              
92             This distribution currently supports:
93              
94             =over 4
95              
96             =item * SQLite
97              
98             =item * MySQL
99              
100             =item * PostgreSQL
101              
102             =back
103              
104             Please contact me if you need support for another database type, I'm always
105             glad to add extensions if you can help me with testing.
106              
107              
108             =head1 METHODS
109              
110             =head2 new()
111              
112             Create a new Queue::DBI object.
113              
114             my $queue = Queue::DBI->new(
115             'queue_name' => $queue_name,
116             'database_handle' => $dbh,
117             'cleanup_timeout' => 3600,
118             'verbose' => 1,
119             'max_requeue_count' => 5,
120             );
121              
122             # Custom table names (optional).
123             my $queue = Queue::DBI->new(
124             'queue_name' => $queue_name,
125             'database_handle' => $dbh,
126             'cleanup_timeout' => 3600,
127             'verbose' => 1,
128             'max_requeue_count' => 5,
129             'queues_table_name' => $custom_queues_table_name,
130             'queue_elements_table_name' => $custom_queue_elements_table_name,
131             );
132              
133             Parameters:
134              
135             =over 4
136              
137             =item * 'queue_name'
138              
139             Mandatory, the name of the queue elements will be added to / removed from.
140              
141             =item * 'database handle'
142              
143             Mandatory, a DBI object.
144              
145             =item * 'cleanup_timeout'
146              
147             Optional, if set to an integer representing a time in seconds, the module will
148             automatically make available again elements that have been locked longuer than
149             that time.
150              
151             =item * 'verbose'
152              
153             Optional, control the verbosity of the warnings in the code. 0 will not display
154             any warning; 1 will only give one line warnings about the current operation;
155             2 will also usually output the SQL queries performed.
156              
157             =item * 'max_requeue_count'
158              
159             By default, Queue:::DBI will retrieve again the queue elements that were
160             requeued without limit to the number of times they have been requeued. Use this
161             option to specify how many times an element can be requeued before it is
162             ignored when retrieving elements.
163              
164             =item * 'queues_table_name'
165              
166             By default, Queue::DBI uses a table named 'queues' to store the queue
167             definitions. This allows using your own name, if you want to support separate
168             queuing systems or legacy systems.
169              
170             =item * 'queue_elements_table_name'
171              
172             By default, Queue::DBI uses a table named 'queue_elements' to store the queued
173             data. This allows using your own name, if you want to support separate queuing
174             systems or legacy systems.
175              
176             =item * 'lifetime'
177              
178             By default, Queue:::DBI will fetch elements regardless of how old they are. Use
179             this option to specify how old (in seconds) an element can be and still be
180             retrieved for processing.
181              
182             =back
183              
184             =cut
185              
186             sub new
187             {
188 50     50 1 249733 my ( $class, %args ) = @_;
189              
190             # Check parameters.
191 50         136 foreach my $arg ( qw( queue_name database_handle ) )
192             {
193             croak "Argument '$arg' is needed to create the Queue::DBI object"
194 99 100 66     708 if !defined( $args{$arg} ) || ( $args{$arg} eq '' );
195             }
196             croak 'Argument "cleanup_timeout" must be an integer representing seconds'
197 48 100 100     487 if defined( $args{'cleanup_timeout'} ) && ( $args{'cleanup_timeout'} !~ m/^\d+$/ );
198             croak 'Argument "lifetime" must be an integer representing seconds'
199 47 100 100     221 if defined( $args{'lifetime'} ) && ( $args{'lifetime'} !~ m/^\d+$/ );
200             croak 'Argument "serializer_freeze" must be a code reference'
201 46 100 100     183 if defined( $args{'serializer_freeze'} ) && !Data::Validate::Type::is_coderef( $args{'serializer_freeze'} );
202             croak 'Argument "serializer_thaw" must be a code reference'
203 45 100 100     198 if defined( $args{'serializer_thaw'} ) && !Data::Validate::Type::is_coderef( $args{'serializer_thaw'} );
204             croak 'Arguments "serializer_freeze" and "serializer_thaw" must be defined together'
205 44 100 100     311 if defined( $args{'serializer_freeze'} ) xor defined( $args{'serializer_thaw'} );
206              
207             # Create the object.
208 42         79 my $dbh = $args{'database_handle'};
209             my $self = bless(
210             {
211             'dbh' => $dbh,
212             'queue_name' => $args{'queue_name'},
213             'table_names' =>
214             {
215             'queues' => $args{'queues_table_name'},
216             'queue_elements' => $args{'queue_elements_table_name'},
217             },
218             'serializer' =>
219             {
220             'freeze' => $args{'serializer_freeze'},
221 42         386 'thaw' => $args{'serializer_thaw'},
222             }
223             },
224             $class
225             );
226              
227             # Find the queue id.
228 42         70 my $queue_id;
229             {
230 42         69 local $dbh->{'RaiseError'} = 1;
  42         553  
231             my $data = $dbh->selectrow_arrayref(
232             sprintf(
233             q|
234             SELECT queue_id
235             FROM %s
236             WHERE name = ?
237             |,
238             $dbh->quote_identifier( $self->get_queues_table_name() ),
239             ),
240             {},
241 42         892 $args{'queue_name'},
242             );
243              
244 42 100 66     18017 $queue_id = defined( $data ) && scalar( @$data ) != 0
245             ? $data->[0]
246             : undef;
247             }
248              
249 42 100 66     595 croak "The queue >$args{'queue_name'}< doesn't exist in the lookup table."
250             unless defined( $queue_id ) && ( $queue_id =~ m/^\d+$/ );
251 36         108 $self->{'queue_id'} = $queue_id;
252              
253             # Set optional parameters.
254 36         155 $self->set_verbose( $args{'verbose'} );
255 36         159 $self->set_max_requeue_count( $args{'max_requeue_count'} );
256 36         187 $self->set_lifetime( $args{'lifetime'} );
257              
258             # Perform queue cleanup if a timeout is specified.
259             $self->cleanup( $args{'cleanup_timeout'} )
260 36 100       185 if defined( $args{'cleanup_timeout'} );
261              
262 36         242 return $self;
263             }
264              
265              
266             =head2 get_queue_id()
267              
268             Returns the queue ID corresponding to the current queue object.
269              
270             my $queue_id = $queue->get_queue_id();
271              
272             =cut
273              
274             sub get_queue_id
275             {
276 115     115 1 3536 my ( $self ) = @_;
277              
278 115         1288 return $self->{'queue_id'};
279             }
280              
281              
282             =head2 count()
283              
284             Returns the number of elements in the queue.
285              
286             my $elements_count = $queue->count();
287              
288             Optional parameter:
289              
290             =over 4
291              
292             =item * exclude_locked_elements
293              
294             Exclude locked elements from the count. Default 0.
295              
296             =back
297              
298             my $unlocked_elements_count = $queue->count(
299             exclude_locked_elements => 1
300             );
301              
302             =cut
303              
304             sub count
305             {
306 11     11 1 9358 my ( $self, %args ) = @_;
307 11   100     54 my $exclude_locked_elements = delete( $args{'exclude_locked_elements'} ) || 0;
308              
309 11         29 my $verbose = $self->get_verbose();
310 11         24 my $dbh = $self->get_dbh();
311 11 50       29 carp "Entering count()." if $verbose;
312              
313             # Prepare optional additional clause to exclude locked elements.
314 11 100       28 my $exclude_locked_elements_sql = $exclude_locked_elements
315             ? 'AND lock_time IS NULL'
316             : '';
317              
318             # Count elements.
319 11         14 my $element_count;
320             {
321 11         13 local $dbh->{'RaiseError'} = 1;
  11         403  
322 11         229 my $data = $dbh->selectrow_arrayref(
323             sprintf(
324             q|
325             SELECT COUNT(*)
326             FROM %s
327             WHERE queue_id = ?
328             %s
329             |,
330             $dbh->quote_identifier( $self->get_queue_elements_table_name() ),
331             $exclude_locked_elements_sql,
332             ),
333             {},
334             $self->get_queue_id(),
335             );
336 11 50 33     2266 $element_count = defined( $data ) && scalar( @$data ) != 0 && defined( $data->[0] )
337             ? $data->[0]
338             : 0;
339             }
340              
341 11 50       33 carp "Found $element_count elements, leaving count()." if $verbose;
342              
343 11         74 return $element_count;
344             }
345              
346              
347             =head2 enqueue()
348              
349             Adds a new element at the end of the current queue.
350              
351             # Store a scalar by passing its reference.
352             my $queue_element_id = $queue->enqueue( \$string );
353             my $queue_element_id = $queue->enqueue( \"string" );
354              
355             # Store an array reference.
356             my $queue_element_id = $queue->enqueue( [ 1, 2, 3 ] );
357              
358             # Store a hash reference.
359             my $queue_element_id = $queue->enqueue( { key => 123 } );
360              
361             # Store a complex datastructure.
362             my $queue_element_id = $queue->enqueue(
363             {
364             values => [ 1, 2, 3 ],
365             data => { key1 => 1, key2 => 2 },
366             }
367             );
368              
369             The data passed should be a reference to a scalar or a reference to a complex
370             data structure, but you cannot pass a scalar directly. There is otherwise no
371             limitation on the type of data that can be stored as it is serialized for
372             storage in the database.
373              
374             =cut
375              
376             sub enqueue
377             {
378 16     16 1 11172 my ( $self, $data ) = @_;
379 16         58 my $verbose = $self->get_verbose();
380 16         50 my $dbh = $self->get_dbh();
381 16 50       57 carp "Entering enqueue()." if $verbose;
382 16 50       93 carp "Data is: " . Dumper( $data ) if $verbose > 1;
383              
384             # Make sure the data passed is a reference. We don't support scalars, as
385             # trying to store both scalars and references results in a mess documented in
386             # GH-3.
387 16 100       82 croak 'The data passed must be a reference, not a scalar'
388             if !ref( $data );
389              
390 15         55 my $encoded_data = $self->freeze( $data );
391 15 50       1585 croak 'The size of the data to store exceeds the maximum internal storage size available.'
392             if length( $encoded_data ) > $MAX_VALUE_SIZE;
393              
394 15 50       108 $dbh->do(
395             sprintf(
396             q|
397             INSERT INTO %s( queue_id, data, created )
398             VALUES ( ?, ?, ? )
399             |,
400             $dbh->quote_identifier( $self->get_queue_elements_table_name() ),
401             ),
402             {},
403             $self->get_queue_id(),
404             $encoded_data,
405             time(),
406             ) || croak 'Cannot execute SQL: ' . $dbh->errstr();
407              
408             # We need to reset the internal cached value preventing infinite loops, other-
409             # wise this new element will not be taken into account by the current queue
410             # object.
411 15         2691005 $self->{'max_id'} = undef;
412              
413 15 50       80 carp "Element inserted, leaving enqueue()." if $verbose;
414              
415 15         100 return $dbh->last_insert_id(
416             undef,
417             undef,
418             $self->get_queue_elements_table_name(),
419             'queue_element_id',
420             );
421             }
422              
423              
424             =head2 next()
425              
426             Retrieves the next element from the queue and returns it in the form of a
427             Queue::DBI::Element object.
428              
429             my $queue_element = $queue->next();
430              
431             while ( my $queue_element = $queue->next() )
432             {
433             # [...]
434             }
435              
436             Additionally, for testing purposes, a list of IDs to use when trying to retrieve
437             elements can be specified using 'search_in_ids':
438              
439             my $queue_item = $queue->next( 'search_in_ids' => [ 123, 124, 125 ] );
440              
441             =cut
442              
443             sub next ## no critic (Subroutines::ProhibitBuiltinHomonyms)
444             {
445 32     32 1 25839 my ( $self, %args ) = @_;
446 32         126 my $verbose = $self->get_verbose();
447 32 50       111 carp "Entering next()." if $verbose;
448              
449             my $elements = $self->retrieve_batch(
450             1,
451             'search_in_ids' => defined( $args{'search_in_ids'} )
452 32 50       222 ? $args{'search_in_ids'}
453             : undef,
454             );
455              
456 32 100 100     237 my $return = defined( $elements ) && ( scalar( @$elements ) != 0 )
457             ? $elements->[0]
458             : undef;
459              
460 32 50       89 carp "Leaving next()." if $verbose;
461 32         178 return $return;
462             }
463              
464              
465             =head2 retrieve_batch()
466              
467             Retrieves a batch of elements from the queue and returns them in an arrayref.
468              
469             This method requires an integer to be passed as parameter to indicate the
470             maximum size of the batch to be retrieved.
471              
472             my $queue_elements = $queue->retrieve_batch( 500 );
473              
474             foreach ( @$queue_elements )
475             {
476             # [...]
477             }
478              
479             Additionally, for testing purposes, a list of IDs to use when trying to retrieve
480             elements can be specified using 'search_in_ids':
481              
482             my $queue_items = $queue->retrieve_batch(
483             10,
484             'search_in_ids' => [ 123, 124, 125 ],
485             );
486              
487             =cut
488              
489             sub retrieve_batch
490             {
491 32     32 1 121 my ( $self, $number_of_elements_to_retrieve, %args ) = @_;
492 32         88 my $verbose = $self->get_verbose();
493 32         101 my $dbh = $self->get_dbh();
494 32 50       119 carp "Entering retrieve_batch()." if $verbose;
495              
496             # Check parameters
497 32 50 33     421 croak 'The number of elements to retrieve from the queue is not properly formatted'
498             unless defined( $number_of_elements_to_retrieve ) && ( $number_of_elements_to_retrieve =~ m/^\d+$/ );
499              
500             # Prevent infinite loops
501 32 100       123 unless ( defined( $self->{'max_id'} ) )
502             {
503 21         35 my $max_id;
504             {
505 21         34 local $dbh->{'RaiseError'} = 1;
  21         299  
506 21         1276 my $data = $dbh->selectrow_arrayref(
507             sprintf(
508             q|
509             SELECT MAX(queue_element_id)
510             FROM %s
511             WHERE queue_id = ?
512             |,
513             $dbh->quote_identifier( $self->get_queue_elements_table_name() ),
514             ),
515             {},
516             $self->get_queue_id(),
517             );
518              
519 21 50 33     4433 $max_id = defined( $data ) && scalar( @$data ) != 0
520             ? $data->[0]
521             : undef;
522             }
523              
524 21 100       83 if ( defined( $max_id ) )
525             {
526 19         63 $self->{'max_id'} = $max_id;
527             }
528             else
529             {
530             # Empty queue
531 2 50       7 carp "Detected empty queue, leaving." if $verbose;
532 2         7 return;
533             }
534             }
535              
536             # Prevent backtracking in case elements are requeued
537             $self->{'last_id'} = -1
538 30 100       132 unless defined( $self->{'last_id'} );
539              
540             # Detect end of queue quicker
541 30 100       119 if ( $self->{'last_id'} == $self->{'max_id'} )
542             {
543 3 50       12 carp "Finished processing queue, leaving." if $verbose;
544 3         16 return [];
545             }
546              
547             # Make sure we don't use requeued elements more times than specified.
548 27         115 my $max_requeue_count = $self->get_max_requeue_count();
549 27 100       170 my $sql_max_requeue_count = defined( $max_requeue_count )
550             ? 'AND requeue_count <= ' . $dbh->quote( $max_requeue_count )
551             : '';
552              
553             # Make sure we don't use elements that exceed the specified lifetime.
554 27         249 my $lifetime = $self->get_lifetime();
555 27 100       149 my $sql_lifetime = defined( $lifetime )
556             ? 'AND created >= ' . ( time() - $lifetime )
557             : '';
558              
559             # If specified, retrieve only those IDs.
560             my $ids = defined( $args{'search_in_ids'} )
561 27 50       90 ? 'AND queue_element_id IN (' . join( ',', map { $dbh->quote( $_ ) } @{ $args{'search_in_ids' } } ) . ')'
  0         0  
  0         0  
562             : '';
563              
564             # Retrieve the first available elements from the queue.
565 27 50       90 carp "Retrieving data." if $verbose;
566 27 50       87 carp "Parameters:\n\tLast ID: $self->{'last_id'}\n\tMax ID: $self->{'max_id'}\n" if $verbose > 1;
567             my $data = $dbh->selectall_arrayref(
568             sprintf(
569             q|
570             SELECT queue_element_id, data, requeue_count, created
571             FROM %s
572             WHERE queue_id = ?
573             AND lock_time IS NULL
574             AND queue_element_id >= ?
575             AND queue_element_id <= ?
576             %s
577             %s
578             %s
579             ORDER BY queue_element_id ASC
580             LIMIT ?
581             |,
582             $dbh->quote_identifier( $self->get_queue_elements_table_name() ),
583             $ids,
584             $sql_max_requeue_count,
585             $sql_lifetime,
586             ),
587             {},
588             $self->get_queue_id(),
589             $self->{'last_id'} + 1,
590 27         90 $self->{'max_id'},
591             $number_of_elements_to_retrieve,
592             );
593 27 50       6793 croak 'Cannot execute SQL: ' . $dbh->errstr() if defined( $dbh->errstr() );
594              
595             # All the remaining elements are locked
596 27 100 66     247 return []
597             if !defined( $data ) || ( scalar( @$data) == 0 );
598              
599             # Create objects
600 25 50       74 carp "Creating new Queue::DBI::Element objects." if $verbose;
601 25         64 my @return = ();
602 25         76 foreach my $row ( @$data )
603             {
604 25         133 push(
605             @return,
606             Queue::DBI::Element->new(
607             'queue' => $self,
608             'data' => $self->thaw( $row->[1] ),
609             'id' => $row->[0],
610             'requeue_count' => $row->[2],
611             'created' => $row->[3],
612             )
613             );
614             }
615              
616             # Prevent backtracking in case elements are requeued
617 25         127 $self->{'last_id'} = $return[-1]->id();
618              
619 25 50       89 carp "Leaving retrieve_batch()." if $verbose;
620 25         204 return \@return;
621             }
622              
623              
624             =head2 get_element_by_id()
625              
626             Retrieves a queue element using a queue element ID, ignoring any lock placed on
627             that element.
628              
629             This method is mostly useful when doing a lock on an element and then calling
630             success/requeue asynchroneously.
631              
632             This method requires a queue element ID to be passed as parameter.
633              
634             my $queue_element = $queue->get_element_by_id( 123456 );
635              
636             =cut
637              
638             sub get_element_by_id
639             {
640 2     2 1 5 my ( $self, $queue_element_id ) = @_;
641 2         11 my $verbose = $self->get_verbose();
642 2         7 my $dbh = $self->get_dbh();
643 2 50       7 carp "Entering get_element_by_id()." if $verbose;
644              
645             # Check parameters.
646 2 50       6 croak 'A queue element ID is required by this method'
647             unless defined( $queue_element_id );
648              
649             # Retrieve the specified element from the queue.
650 2 50       4 carp "Retrieving data." if $verbose;
651 2         8 my $data = $dbh->selectrow_hashref(
652             sprintf(
653             q|
654             SELECT *
655             FROM %s
656             WHERE queue_id = ?
657             AND queue_element_id = ?
658             |,
659             $dbh->quote_identifier( $self->get_queue_elements_table_name() ),
660             ),
661             {},
662             $self->get_queue_id(),
663             $queue_element_id,
664             );
665 2 50       545 croak 'Cannot execute SQL: ' . $dbh->errstr() if defined( $dbh->errstr() );
666              
667             # Queue element ID doesn't exist or belongs to another queue.
668 2 50       6 return unless defined( $data );
669              
670             # Create the Queue::DBI::Element object.
671 2 50       7 carp "Creating a new Queue::DBI::Element object." if $verbose;
672              
673             my $queue_element = Queue::DBI::Element->new(
674             'queue' => $self,
675             'data' => $self->thaw( $data->{'data'} ),
676             'id' => $data->{'queue_element_id'},
677             'requeue_count' => $data->{'requeue_count'},
678 2         10 'created' => $data->{'created'},
679             );
680              
681 2 50       7 carp "Leaving get_element_by_id()." if $verbose;
682 2         14 return $queue_element;
683             }
684              
685              
686             =head2 cleanup()
687              
688             Requeue items that have been locked for more than the time in seconds specified
689             as parameter.
690              
691             Returns the items requeued so that a specific action can be taken on them.
692              
693             my $elements = $queue->cleanup( $time_in_seconds );
694             foreach my $element ( @$elements )
695             {
696             # $element is a Queue::DBI::Element object
697             }
698              
699             =cut
700              
701             sub cleanup
702             {
703 30     30 1 12963 my ( $self, $time_in_seconds ) = @_;
704 30         95 my $verbose = $self->get_verbose();
705 30         106 my $dbh = $self->get_dbh();
706 30 50       85 carp "Entering cleanup()." if $verbose;
707              
708 30   50     79 $time_in_seconds ||= '';
709 30 50       162 croak 'Time in seconds is not correctly formatted'
710             unless $time_in_seconds =~ m/^\d+$/;
711              
712             # Find all the orphans
713 30 50       84 carp "Retrieving data." if $verbose;
714 30         139 my $rows = $dbh->selectall_arrayref(
715             sprintf(
716             q|
717             SELECT queue_element_id, data, requeue_count, created
718             FROM %s
719             WHERE queue_id = ?
720             AND lock_time < ?
721             |,
722             $dbh->quote_identifier( $self->get_queue_elements_table_name() ),
723             ),
724             {},
725             $self->get_queue_id(),
726             time() - $time_in_seconds,
727             );
728 30 50       4899 croak 'Cannot execute SQL: ' . $dbh->errstr() if defined( $dbh->errstr() );
729 30 50       101 return []
730             unless defined( $rows );
731              
732             # Create objects and requeue them
733 30 50       84 carp "Creating new Queue::DBI::Element objects." if $verbose;
734 30         51 my $queue_elements = [];
735 30         93 foreach my $row ( @$rows )
736             {
737 1         7 my $queue_element = Queue::DBI::Element->new(
738             'queue' => $self,
739             'data' => $self->thaw( $row->[1] ),
740             'id' => $row->[0],
741             'requeue_count' => $row->[2],
742             'created' => $row->[3],
743             );
744             # If this item was requeued by another process since its
745             # being SELECTed a moment ago, requeue() will return failure
746             # and this process will ignore it.
747 1 50       6 push( @$queue_elements, $queue_element )
748             if $queue_element->requeue();
749             }
750 30 50       79 carp "Found " . scalar( @$queue_elements ) . " orphaned element(s)." if $verbose;
751              
752 30 50       74 carp "Leaving cleanup()." if $verbose;
753 30         92 return $queue_elements;
754             }
755              
756              
757             =head2 purge()
758              
759             Remove (permanently, caveat emptor!) queue elements based on how many times
760             they've been requeued or how old they are, and return the number of elements
761             deleted.
762              
763             # Remove permanently elements that have been requeued more than 10 times.
764             my $deleted_elements_count = $queue->purge( max_requeue_count => 10 );
765              
766             # Remove permanently elements that were created over an hour ago.
767             my $deleted_elements_count = $queue->purge( lifetime => 3600 );
768              
769             Important: locked elements are not purged even if they match the criteria, as
770             they are presumed to be currently in process and purging them would create
771             unexpected failures in the application processing them.
772              
773             Also note that I and I cannot be combined.
774              
775             =cut
776              
777             sub purge
778             {
779 7     7 1 1769 my ( $self, %args ) = @_;
780 7         21 my $verbose = $self->get_verbose();
781 7         22 my $dbh = $self->get_dbh();
782 7 50       22 carp "Entering cleanup()." if $verbose;
783              
784 7         13 my $max_requeue_count = $args{'max_requeue_count'};
785 7         37 my $lifetime = $args{'lifetime'};
786              
787             # Check parameters.
788 7 50 66     53 croak '"max_requeue_count" must be an integer'
789             if defined( $max_requeue_count ) && ( $max_requeue_count !~ m/^\d+$/ );
790 7 50 66     50 croak '"lifetime" must be an integer representing seconds'
791             if defined( $lifetime ) && ( $lifetime !~ m/^\d+$/ );
792 7 100 100     42 croak '"max_requeue_count" and "lifetime" cannot be combined, specify one OR the other'
793             if defined( $lifetime ) && defined( $max_requeue_count );
794 6 100 66     50 croak '"max_requeue_count" or "lifetime" must be specified'
795             if !defined( $lifetime ) && !defined( $max_requeue_count );
796              
797             # Prepare query clauses.
798 5 100       24 my $sql_lifetime = defined( $lifetime )
799             ? 'AND created < ' . ( time() - $lifetime )
800             : '';
801 5 100       39 my $sql_max_requeue_count = defined( $max_requeue_count )
802             ? 'AND requeue_count > ' . $dbh->quote( $max_requeue_count )
803             : '';
804              
805             # Purge the queue.
806 5   33     54 my $rows_deleted = $dbh->do(
807             sprintf(
808             q|
809             DELETE
810             FROM %s
811             WHERE queue_id = ?
812             AND lock_time IS NULL
813             %s
814             %s
815             |,
816             $dbh->quote_identifier( $self->get_queue_elements_table_name() ),
817             $sql_lifetime,
818             $sql_max_requeue_count,
819             ),
820             {},
821             $self->get_queue_id(),
822             ) || croak 'Cannot execute SQL: ' . $dbh->errstr();
823              
824 5 50       56202 carp "Leaving cleanup()." if $verbose;
825             # Account for '0E0' which means no rows affected, and translates into no
826             # rows deleted in our case.
827 5 100       67 return $rows_deleted eq '0E0'
828             ? 0
829             : $rows_deleted;
830             }
831              
832              
833             =head1 ACCESSORS
834              
835             =head2 get_max_requeue_count()
836              
837             Return how many times an element can be requeued before it is ignored when
838             retrieving elements.
839              
840             my $max_requeue_count = $queue->get_max_requeue_count();
841              
842             =cut
843              
844             sub get_max_requeue_count
845             {
846 28     28 1 746 my ( $self ) = @_;
847              
848 28         84 return $self->{'max_requeue_count'};
849             }
850              
851              
852             =head2 set_max_requeue_count()
853              
854             Set the number of time an element can be requeued before it is ignored when
855             retrieving elements. Set it to C to disable the limit.
856              
857             # Don't keep pulling the element if it has been requeued more than 5 times.
858             $queue->set_max_requeue_count( 5 );+
859              
860             # Retry without limit.
861             $queue->set_max_requeue_count( undef );
862              
863             =cut
864              
865             sub set_max_requeue_count
866             {
867 36     36 1 91 my ( $self, $max_requeue_count ) = @_;
868              
869 36 50 66     224 croak 'max_requeue_count must be an integer or undef'
870             if defined( $max_requeue_count ) && ( $max_requeue_count !~ /^\d+$/ );
871              
872 36         75 $self->{'max_requeue_count'} = $max_requeue_count;
873              
874 36         58 return;
875             }
876              
877              
878             =head2 get_lifetime()
879              
880             Return how old an element can be before it is ignored when retrieving elements.
881              
882             # Find how old an element can be before the queue will stop retrieving it.
883             my $lifetime = $queue->get_lifetime();
884              
885             =cut
886              
887             sub get_lifetime
888             {
889 33     33 1 719 my ( $self ) = @_;
890              
891 33         305 return $self->{'lifetime'};
892             }
893              
894              
895             =head2 set_lifetime()
896              
897             Set how old an element can be before it is ignored when retrieving elements.
898              
899             Set it to C to reset Queue::DBI back to its default behavior of
900             retrieving elements without time limit.
901              
902             # Don't pull queue elements that are more than an hour old.
903             $queue->set_lifetime( 3600 );
904              
905             # Pull elements without time limit.
906             $queue->set_lifetime( undef );
907              
908             =cut
909              
910             sub set_lifetime
911             {
912 38     38 1 127 my ( $self, $lifetime ) = @_;
913              
914 38 50 66     172 croak 'lifetime must be an integer or undef'
915             if defined( $lifetime ) && ( $lifetime !~ /^\d+$/ );
916              
917 38         116 $self->{'lifetime'} = $lifetime;
918              
919 38         53 return;
920             }
921              
922              
923             =head2 get_verbose()
924              
925             Return the verbosity level, which is used in the module to determine when and
926             what type of debugging statements / information should be warned out.
927              
928             See C for the possible values this function can return.
929              
930             warn 'Verbose' if $queue->get_verbose();
931              
932             warn 'Very verbose' if $queue->get_verbose() > 1;
933              
934             =cut
935              
936             sub get_verbose
937             {
938 169     169 1 233 my ( $self ) = @_;
939              
940 169         458 return $self->{'verbose'};
941             }
942              
943              
944             =head2 set_verbose()
945              
946             Control the verbosity of the warnings in the code:
947              
948             =over 4
949              
950             =item * 0 will not display any warning;
951              
952             =item * 1 will only give one line warnings about the current operation;
953              
954             =item * 2 will also usually output the SQL queries performed.
955              
956             =back
957              
958             $queue->set_verbose(1); # turn on verbose information
959              
960             $queue->set_verbose(2); # be extra verbose
961              
962             $queue->set_verbose(0); # quiet now!
963              
964             =cut
965              
966             sub set_verbose
967             {
968 36     36 1 76 my ( $self, $verbose ) = @_;
969              
970 36   50     238 $self->{'verbose'} = ( $verbose || 0 );
971              
972 36         73 return;
973             }
974              
975              
976             =head1 INTERNAL METHODS
977              
978             =head2 freeze()
979              
980             Serialize an element to store it in a SQL "text" column.
981              
982             my $frozen_data = $queue->freeze( $data );
983              
984             =cut
985              
986             sub freeze
987             {
988 16     16 1 819 my ( $self, $data ) = @_;
989              
990             return defined( $self->{'serializer'} ) && defined( $self->{'serializer'}->{'freeze'} )
991 16 100 66     196 ? $self->{'serializer'}->{'freeze'}($data)
992             : MIME::Base64::encode_base64( Storable::freeze( $data ) );
993             }
994              
995             =head2 thaw()
996              
997             Deserialize an element which was stored a SQL "text" column.
998              
999             my $thawed_data = $queue->thaw( $frozen_data );
1000              
1001             =cut
1002              
1003             sub thaw
1004             {
1005 29     29 1 861 my ( $self, $data ) = @_;
1006              
1007             return defined( $self->{'serializer'} ) && defined( $self->{'serializer'}->{'thaw'} )
1008 29 100 66     675 ? $self->{'serializer'}->{'thaw'}($data)
1009             : Storable::thaw( MIME::Base64::decode_base64( $data ) );
1010             }
1011              
1012              
1013             =head1 DEPRECATED METHODS
1014              
1015             =head2 create_tables()
1016              
1017             Please use C in L instead.
1018              
1019             Here is an example that shows how to refactor your call to this deprecated
1020             function:
1021              
1022             # Load the admin module.
1023             use Queue::DBI::Admin;
1024              
1025             # Create the object which will allow managing the queues.
1026             my $queues_admin = Queue::DBI::Admin->new(
1027             database_handle => $dbh,
1028             );
1029              
1030             # Create the tables required by Queue::DBI to store the queues and data.
1031             $queues_admin->create_tables(
1032             drop_if_exist => $boolean,
1033             );
1034              
1035             =cut
1036              
1037             sub create_tables
1038             {
1039 0     0 1 0 croak 'create_tables() in Queue::DBI has been deprecated, please use create_tables() in Queue::DBI::Admin instead.';
1040             }
1041              
1042              
1043             =head2 lifetime()
1044              
1045             Please use C and C instead.
1046              
1047             =cut
1048              
1049             sub lifetime
1050             {
1051 0     0 1 0 croak 'lifetime() has been deprecated, please use get_lifetime() / set_lifetime() instead.';
1052             }
1053              
1054              
1055             =head2 verbose()
1056              
1057             Please use C and C instead.
1058              
1059             =cut
1060              
1061             sub verbose
1062             {
1063 0     0 1 0 croak 'verbose() has been deprecated, please use get_verbose() / set_verbose() instead.';
1064             }
1065              
1066              
1067             =head2 max_requeue_count()
1068              
1069             Please use C and C instead.
1070              
1071             =cut
1072              
1073             sub max_requeue_count
1074             {
1075 0     0 1 0 croak 'max_requeue_count() has been deprecated, please use get_max_requeue_count() / set_max_requeue_count() instead.';
1076             }
1077              
1078              
1079             =head1 INTERNAL METHODS
1080              
1081             =head2 get_dbh()
1082              
1083             Returns the database handle used for this queue.
1084              
1085             my $dbh = $queue->get_dbh();
1086              
1087             =cut
1088              
1089             sub get_dbh
1090             {
1091 138     138 1 599 my ( $self ) = @_;
1092              
1093 138         275 return $self->{'dbh'};
1094             }
1095              
1096              
1097             =head2 get_queues_table_name()
1098              
1099             Returns the name of the table used to store queue definitions.
1100              
1101             my $queues_table_name = $queue->get_queues_table_name();
1102              
1103             =cut
1104              
1105             sub get_queues_table_name
1106             {
1107 42     42 1 74 my ( $self ) = @_;
1108              
1109             return defined( $self->{'table_names'}->{'queues'} ) && ( $self->{'table_names'}->{'queues'} ne '' )
1110 42 100 66     882 ? $self->{'table_names'}->{'queues'}
1111             : $DEFAULT_QUEUES_TABLE_NAME;
1112             }
1113              
1114              
1115             =head2 get_queue_elements_table_name()
1116              
1117             Returns the name of the table used to store queue definitions.
1118              
1119             my $queue_elements_table_name = $queue->get_queue_elements_table_name();
1120              
1121             =cut
1122              
1123             sub get_queue_elements_table_name
1124             {
1125 165     165 1 296 my ( $self ) = @_;
1126              
1127             return defined( $self->{'table_names'}->{'queue_elements'} ) && ( $self->{'table_names'}->{'queue_elements'} ne '' )
1128 165 50 33     3088 ? $self->{'table_names'}->{'queue_elements'}
1129             : $DEFAULT_QUEUE_ELEMENTS_TABLE_NAME;
1130             }
1131              
1132              
1133             =head1 BUGS
1134              
1135             Please report any bugs or feature requests through the web interface at
1136             L.
1137             I will be notified, and then you'll automatically be notified of progress on
1138             your bug as I make changes.
1139              
1140              
1141             =head1 SUPPORT
1142              
1143             You can find documentation for this module with the perldoc command.
1144              
1145             perldoc Queue::DBI
1146              
1147              
1148             You can also look for information at:
1149              
1150             =over 4
1151              
1152             =item * GitHub's request tracker
1153              
1154             L
1155              
1156             =item * AnnoCPAN: Annotated CPAN documentation
1157              
1158             L
1159              
1160             =item * CPAN Ratings
1161              
1162             L
1163              
1164             =item * MetaCPAN
1165              
1166             L
1167              
1168             =back
1169              
1170              
1171             =head1 AUTHOR
1172              
1173             L,
1174             C<< >>.
1175              
1176              
1177             =head1 ACKNOWLEDGEMENTS
1178              
1179             I originally developed this project for ThinkGeek
1180             (L). Thanks for allowing me to open-source it!
1181              
1182              
1183             =head1 COPYRIGHT & LICENSE
1184              
1185             Copyright 2009-2017 Guillaume Aubert.
1186              
1187             This code is free software; you can redistribute it and/or modify it under the
1188             same terms as Perl 5 itself.
1189              
1190             This program is distributed in the hope that it will be useful, but WITHOUT ANY
1191             WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
1192             PARTICULAR PURPOSE. See the LICENSE file for more details.
1193              
1194             =cut
1195              
1196             1;