File Coverage

blib/lib/Queue/DBI.pm
Criterion Covered Total %
statement 188 194 96.9
branch 97 142 68.3
condition 55 79 69.6
subroutine 28 32 87.5
pod 24 24 100.0
total 392 471 83.2


line stmt bran cond sub pod time code
1             package Queue::DBI;
2              
3 38     38   840522 use warnings;
  38         64  
  38         1262  
4 38     38   179 use strict;
  38         55  
  38         947  
5              
6 38     38   14002 use Data::Dumper;
  38         160573  
  38         1984  
7 38     38   12166 use Data::Validate::Type;
  38         143167  
  38         1415  
8 38     38   212 use Carp;
  38         47  
  38         1767  
9 38     38   24920 use Storable qw();
  38         113141  
  38         1355  
10 38     38   20331 use MIME::Base64 qw();
  38         22783  
  38         1259  
11              
12 38     38   15446 use Queue::DBI::Element;
  38         77  
  38         95058  
13              
14              
15             =head1 NAME
16              
17             Queue::DBI - A queueing module with an emphasis on safety, using DBI as a storage system for queued data.
18              
19              
20             =head1 VERSION
21              
22             Version 2.6.1
23              
24             =cut
25              
26             our $VERSION = '2.6.1';
27              
28             our $DEFAULT_QUEUES_TABLE_NAME = 'queues';
29              
30             our $DEFAULT_QUEUE_ELEMENTS_TABLE_NAME = 'queue_elements';
31              
32             our $MAX_VALUE_SIZE = 65535;
33              
34              
35             =head1 SYNOPSIS
36              
37             This module allows you to safely use a queueing system by preventing
38             backtracking, infinite loops and data loss.
39              
40             An emphasis of this distribution is to provide an extremely reliable dequeueing
41             mechanism without having to use transactions.
42              
43             use Queue::DBI;
44             my $queue = Queue::DBI->new(
45             'queue_name' => $queue_name,
46             'database_handle' => $dbh,
47             'cleanup_timeout' => 3600,
48             'verbose' => 1,
49             );
50              
51             $queue->enqueue( $data );
52              
53             while ( my $queue_element = $queue->next() )
54             {
55             next
56             unless $queue_element->lock();
57              
58             eval {
59             # Do some work
60             process( $queue_element->{'email'} );
61             };
62             if ( $@ )
63             {
64             # Something failed, we clear the lock but don't delete the record in the
65             # queue so that we can try again next time
66             $queue_element->requeue();
67             }
68             else
69             {
70             # All good, remove definitively the element
71             $queue_element->success();
72             }
73             }
74              
75             # Requeue items that have been locked for more than 6 hours
76             $queue->cleanup( 6 * 3600 );
77              
78              
79             =head1 SUPPORTED DATABASES
80              
81             This distribution currently supports:
82              
83             =over 4
84              
85             =item * SQLite
86              
87             =item * MySQL
88              
89             =item * PostgreSQL
90              
91             =back
92              
93             Please contact me if you need support for another database type, I'm always
94             glad to add extensions if you can help me with testing.
95              
96              
97             =head1 METHODS
98              
99             =head2 new()
100              
101             Create a new Queue::DBI object.
102              
103             my $queue = Queue::DBI->new(
104             'queue_name' => $queue_name,
105             'database_handle' => $dbh,
106             'cleanup_timeout' => 3600,
107             'verbose' => 1,
108             'max_requeue_count' => 5,
109             );
110              
111             # Custom table names (optional).
112             my $queue = Queue::DBI->new(
113             'queue_name' => $queue_name,
114             'database_handle' => $dbh,
115             'cleanup_timeout' => 3600,
116             'verbose' => 1,
117             'max_requeue_count' => 5,
118             'queues_table_name' => $custom_queues_table_name,
119             'queue_elements_table_name' => $custom_queue_elements_table_name,
120             );
121              
122             Parameters:
123              
124             =over 4
125              
126             =item * 'queue_name'
127              
128             Mandatory, the name of the queue elements will be added to / removed from.
129              
130             =item * 'database handle'
131              
132             Mandatory, a DBI object.
133              
134             =item * 'cleanup_timeout'
135              
136             Optional, if set to an integer representing a time in seconds, the module will
137             automatically make available again elements that have been locked longuer than
138             that time.
139              
140             =item * 'verbose'
141              
142             Optional, control the verbosity of the warnings in the code. 0 will not display
143             any warning; 1 will only give one line warnings about the current operation;
144             2 will also usually output the SQL queries performed.
145              
146             =item * 'max_requeue_count'
147              
148             By default, Queue:::DBI will retrieve again the queue elements that were
149             requeued without limit to the number of times they have been requeued. Use this
150             option to specify how many times an element can be requeued before it is
151             ignored when retrieving elements.
152              
153             =item * 'queues_table_name'
154              
155             By default, Queue::DBI uses a table named 'queues' to store the queue
156             definitions. This allows using your own name, if you want to support separate
157             queuing systems or legacy systems.
158              
159             =item * 'queue_elements_table_name'
160              
161             By default, Queue::DBI uses a table named 'queue_elements' to store the queued
162             data. This allows using your own name, if you want to support separate queuing
163             systems or legacy systems.
164              
165             =item * 'lifetime'
166              
167             By default, Queue:::DBI will fetch elements regardless of how old they are. Use
168             this option to specify how old (in seconds) an element can be and still be
169             retrieved for processing.
170              
171             =back
172              
173             =cut
174              
175             sub new
176             {
177 50     50 1 201980 my ( $class, %args ) = @_;
178              
179             # Check parameters.
180 50         133 foreach my $arg ( qw( queue_name database_handle ) )
181             {
182 99 100 66     698 croak "Argument '$arg' is needed to create the Queue::DBI object"
183             if !defined( $args{$arg} ) || ( $args{$arg} eq '' );
184             }
185 48 100 100     464 croak 'Argument "cleanup_timeout" must be an integer representing seconds'
186             if defined( $args{'cleanup_timeout'} ) && ( $args{'cleanup_timeout'} !~ m/^\d+$/ );
187 47 100 100     200 croak 'Argument "lifetime" must be an integer representing seconds'
188             if defined( $args{'lifetime'} ) && ( $args{'lifetime'} !~ m/^\d+$/ );
189 46 100 100     180 croak 'Argument "serializer_freeze" must be a code reference'
190             if defined( $args{'serializer_freeze'} ) && !Data::Validate::Type::is_coderef( $args{'serializer_freeze'} );
191 45 100 100     201 croak 'Argument "serializer_thaw" must be a code reference'
192             if defined( $args{'serializer_thaw'} ) && !Data::Validate::Type::is_coderef( $args{'serializer_thaw'} );
193 44 100 100     283 croak 'Arguments "serializer_freeze" and "serializer_thaw" must be defined together'
194             if defined( $args{'serializer_freeze'} ) xor defined( $args{'serializer_thaw'} );
195              
196             # Create the object.
197 42         71 my $dbh = $args{'database_handle'};
198 42         375 my $self = bless(
199             {
200             'dbh' => $dbh,
201             'queue_name' => $args{'queue_name'},
202             'table_names' =>
203             {
204             'queues' => $args{'queues_table_name'},
205             'queue_elements' => $args{'queue_elements_table_name'},
206             },
207             'serializer' =>
208             {
209             'freeze' => $args{'serializer_freeze'},
210             'thaw' => $args{'serializer_thaw'},
211             }
212             },
213             $class
214             );
215              
216             # Find the queue id.
217 42         65 my $queue_id;
218             {
219 42         70 local $dbh->{'RaiseError'} = 1;
  42         674  
220 42         1062 my $data = $dbh->selectrow_arrayref(
221             sprintf(
222             q|
223             SELECT queue_id
224             FROM %s
225             WHERE name = ?
226             |,
227             $dbh->quote_identifier( $self->get_queues_table_name() ),
228             ),
229             {},
230             $args{'queue_name'},
231             );
232              
233 42 100 66     14603 $queue_id = defined( $data ) && scalar( @$data ) != 0
234             ? $data->[0]
235             : undef;
236             }
237              
238 42 100 66     541 croak "The queue >$args{'queue_name'}< doesn't exist in the lookup table."
239             unless defined( $queue_id ) && ( $queue_id =~ m/^\d+$/ );
240 36         98 $self->{'queue_id'} = $queue_id;
241              
242             # Set optional parameters.
243 36         162 $self->set_verbose( $args{'verbose'} );
244 36         140 $self->set_max_requeue_count( $args{'max_requeue_count'} );
245 36         142 $self->set_lifetime( $args{'lifetime'} );
246              
247             # Perform queue cleanup if a timeout is specified.
248 36 100       239 $self->cleanup( $args{'cleanup_timeout'} )
249             if defined( $args{'cleanup_timeout'} );
250              
251 36         247 return $self;
252             }
253              
254              
255             =head2 get_queue_id()
256              
257             Returns the queue ID corresponding to the current queue object.
258              
259             my $queue_id = $queue->get_queue_id();
260              
261             =cut
262              
263             sub get_queue_id
264             {
265 115     115 1 3422 my ( $self ) = @_;
266              
267 115         1202 return $self->{'queue_id'};
268             }
269              
270              
271             =head2 count()
272              
273             Returns the number of elements in the queue.
274              
275             my $elements_count = $queue->count();
276              
277             Optional parameter:
278              
279             =over 4
280              
281             =item * exclude_locked_elements
282              
283             Exclude locked elements from the count. Default 0.
284              
285             =back
286              
287             my $unlocked_elements_count = $queue->count(
288             exclude_locked_elements => 1
289             );
290              
291             =cut
292              
293             sub count
294             {
295 11     11 1 7767 my ( $self, %args ) = @_;
296 11   100     54 my $exclude_locked_elements = delete( $args{'exclude_locked_elements'} ) || 0;
297              
298 11         28 my $verbose = $self->get_verbose();
299 11         24 my $dbh = $self->get_dbh();
300 11 50       26 carp "Entering count()." if $verbose;
301              
302             # Prepare optional additional clause to exclude locked elements.
303 11 100       23 my $exclude_locked_elements_sql = $exclude_locked_elements
304             ? 'AND lock_time IS NULL'
305             : '';
306              
307             # Count elements.
308 11         11 my $element_count;
309             {
310 11         11 local $dbh->{'RaiseError'} = 1;
  11         121  
311 11         189 my $data = $dbh->selectrow_arrayref(
312             sprintf(
313             q|
314             SELECT COUNT(*)
315             FROM %s
316             WHERE queue_id = ?
317             %s
318             |,
319             $dbh->quote_identifier( $self->get_queue_elements_table_name() ),
320             $exclude_locked_elements_sql,
321             ),
322             {},
323             $self->get_queue_id(),
324             );
325 11 50 33     1919 $element_count = defined( $data ) && scalar( @$data ) != 0 && defined( $data->[0] )
326             ? $data->[0]
327             : 0;
328             }
329              
330 11 50       33 carp "Found $element_count elements, leaving count()." if $verbose;
331              
332 11         67 return $element_count;
333             }
334              
335              
336             =head2 enqueue()
337              
338             Adds a new element at the end of the current queue.
339              
340             my $queue_element_id = $queue->enqueue( $data );
341              
342             The data passed can be a scalar or a reference to a complex data
343             structure. There is no limitation on the type of data that can be stored
344             as it is serialized for storage in the database.
345              
346             =cut
347              
348             sub enqueue
349             {
350 15     15 1 9451 my ( $self, $data ) = @_;
351 15         49 my $verbose = $self->get_verbose();
352 15         48 my $dbh = $self->get_dbh();
353 15 50       54 carp "Entering enqueue()." if $verbose;
354 15 50       85 carp "Data is: " . Dumper( $data ) if $verbose > 1;
355              
356 15         65 my $encoded_data = $self->freeze( $data );
357 15 50       1160 croak 'The size of the data to store exceeds the maximum internal storage size available.'
358             if length( $encoded_data ) > $MAX_VALUE_SIZE;
359              
360 15 50       48 $dbh->do(
361             sprintf(
362             q|
363             INSERT INTO %s( queue_id, data, created )
364             VALUES ( ?, ?, ? )
365             |,
366             $dbh->quote_identifier( $self->get_queue_elements_table_name() ),
367             ),
368             {},
369             $self->get_queue_id(),
370             $encoded_data,
371             time(),
372             ) || croak 'Cannot execute SQL: ' . $dbh->errstr();
373              
374             # We need to reset the internal cached value preventing infinite loops, other-
375             # wise this new element will not be taken into account by the current queue
376             # object.
377 15         723344 $self->{'max_id'} = undef;
378              
379 15 50       84 carp "Element inserted, leaving enqueue()." if $verbose;
380              
381 15         100 return $dbh->last_insert_id(
382             undef,
383             undef,
384             $self->get_queue_elements_table_name(),
385             'queue_element_id',
386             );
387             }
388              
389              
390             =head2 next()
391              
392             Retrieves the next element from the queue and returns it in the form of a
393             Queue::DBI::Element object.
394              
395             my $queue_element = $queue->next();
396              
397             while ( my $queue_element = $queue->next() )
398             {
399             # [...]
400             }
401              
402             Additionally, for testing purposes, a list of IDs to use when trying to retrieve
403             elements can be specified using 'search_in_ids':
404              
405             my $queue_item = $queue->next( 'search_in_ids' => [ 123, 124, 125 ] );
406              
407             =cut
408              
409             sub next ## no critic (Subroutines::ProhibitBuiltinHomonyms)
410             {
411 32     32 1 21114 my ( $self, %args ) = @_;
412 32         198 my $verbose = $self->get_verbose();
413 32 50       116 carp "Entering next()." if $verbose;
414              
415 32 50       226 my $elements = $self->retrieve_batch(
416             1,
417             'search_in_ids' => defined( $args{'search_in_ids'} )
418             ? $args{'search_in_ids'}
419             : undef,
420             );
421              
422 32 100 100     361 my $return = defined( $elements ) && ( scalar( @$elements ) != 0 )
423             ? $elements->[0]
424             : undef;
425              
426 32 50       86 carp "Leaving next()." if $verbose;
427 32         175 return $return;
428             }
429              
430              
431             =head2 retrieve_batch()
432              
433             Retrieves a batch of elements from the queue and returns them in an arrayref.
434              
435             This method requires an integer to be passed as parameter to indicate the
436             maximum size of the batch to be retrieved.
437              
438             my $queue_elements = $queue->retrieve_batch( 500 );
439              
440             foreach ( @$queue_elements )
441             {
442             # [...]
443             }
444              
445             Additionally, for testing purposes, a list of IDs to use when trying to retrieve
446             elements can be specified using 'search_in_ids':
447              
448             my $queue_items = $queue->retrieve_batch(
449             10,
450             'search_in_ids' => [ 123, 124, 125 ],
451             );
452              
453             =cut
454              
455             sub retrieve_batch
456             {
457 32     32 1 102 my ( $self, $number_of_elements_to_retrieve, %args ) = @_;
458 32         83 my $verbose = $self->get_verbose();
459 32         90 my $dbh = $self->get_dbh();
460 32 50       106 carp "Entering retrieve_batch()." if $verbose;
461              
462             # Check parameters
463 32 50 33     414 croak 'The number of elements to retrieve from the queue is not properly formatted'
464             unless defined( $number_of_elements_to_retrieve ) && ( $number_of_elements_to_retrieve =~ m/^\d+$/ );
465              
466             # Prevent infinite loops
467 32 100       116 unless ( defined( $self->{'max_id'} ) )
468             {
469 21         35 my $max_id;
470             {
471 21         31 local $dbh->{'RaiseError'} = 1;
  21         329  
472 21         537 my $data = $dbh->selectrow_arrayref(
473             sprintf(
474             q|
475             SELECT MAX(queue_element_id)
476             FROM %s
477             WHERE queue_id = ?
478             |,
479             $dbh->quote_identifier( $self->get_queue_elements_table_name() ),
480             ),
481             {},
482             $self->get_queue_id(),
483             );
484              
485 21 50 33     4176 $max_id = defined( $data ) && scalar( @$data ) != 0
486             ? $data->[0]
487             : undef;
488             }
489              
490 21 100       88 if ( defined( $max_id ) )
491             {
492 19         62 $self->{'max_id'} = $max_id;
493             }
494             else
495             {
496             # Empty queue
497 2 50       5 carp "Detected empty queue, leaving." if $verbose;
498 2         7 return;
499             }
500             }
501              
502             # Prevent backtracking in case elements are requeued
503 30 100       120 $self->{'last_id'} = -1
504             unless defined( $self->{'last_id'} );
505              
506             # Detect end of queue quicker
507 30 100       108 if ( $self->{'last_id'} == $self->{'max_id'} )
508             {
509 3 50       9 carp "Finished processing queue, leaving." if $verbose;
510 3         14 return [];
511             }
512              
513             # Make sure we don't use requeued elements more times than specified.
514 27         93 my $max_requeue_count = $self->get_max_requeue_count();
515 27 100       144 my $sql_max_requeue_count = defined( $max_requeue_count )
516             ? 'AND requeue_count <= ' . $dbh->quote( $max_requeue_count )
517             : '';
518              
519             # Make sure we don't use elements that exceed the specified lifetime.
520 27         252 my $lifetime = $self->get_lifetime();
521 27 100       79 my $sql_lifetime = defined( $lifetime )
522             ? 'AND created >= ' . ( time() - $lifetime )
523             : '';
524              
525             # If specified, retrieve only those IDs.
526 0         0 my $ids = defined( $args{'search_in_ids'} )
527 27 50       84 ? 'AND queue_element_id IN (' . join( ',', map { $dbh->quote( $_ ) } @{ $args{'search_in_ids' } } ) . ')'
  0         0  
528             : '';
529              
530             # Retrieve the first available elements from the queue.
531 27 50       70 carp "Retrieving data." if $verbose;
532 27 50       160 carp "Parameters:\n\tLast ID: $self->{'last_id'}\n\tMax ID: $self->{'max_id'}\n" if $verbose > 1;
533 27         83 my $data = $dbh->selectall_arrayref(
534             sprintf(
535             q|
536             SELECT queue_element_id, data, requeue_count, created
537             FROM %s
538             WHERE queue_id = ?
539             AND lock_time IS NULL
540             AND queue_element_id >= ?
541             AND queue_element_id <= ?
542             %s
543             %s
544             %s
545             ORDER BY queue_element_id ASC
546             LIMIT ?
547             |,
548             $dbh->quote_identifier( $self->get_queue_elements_table_name() ),
549             $ids,
550             $sql_max_requeue_count,
551             $sql_lifetime,
552             ),
553             {},
554             $self->get_queue_id(),
555             $self->{'last_id'} + 1,
556             $self->{'max_id'},
557             $number_of_elements_to_retrieve,
558             );
559 27 50       5602 croak 'Cannot execute SQL: ' . $dbh->errstr() if defined( $dbh->errstr() );
560              
561             # All the remaining elements are locked
562 27 100 66     241 return []
563             if !defined( $data ) || ( scalar( @$data) == 0 );
564              
565             # Create objects
566 25 50       75 carp "Creating new Queue::DBI::Element objects." if $verbose;
567 25         54 my @return = ();
568 25         73 foreach my $row ( @$data )
569             {
570 25         115 push(
571             @return,
572             Queue::DBI::Element->new(
573             'queue' => $self,
574             'data' => $self->thaw( $row->[1] ),
575             'id' => $row->[0],
576             'requeue_count' => $row->[2],
577             'created' => $row->[3],
578             )
579             );
580             }
581              
582             # Prevent backtracking in case elements are requeued
583 25         123 $self->{'last_id'} = $return[-1]->id();
584              
585 25 50       72 carp "Leaving retrieve_batch()." if $verbose;
586 25         133 return \@return;
587             }
588              
589              
590             =head2 get_element_by_id()
591              
592             Retrieves a queue element using a queue element ID, ignoring any lock placed on
593             that element.
594              
595             This method is mostly useful when doing a lock on an element and then calling
596             success/requeue asynchroneously.
597              
598             This method requires a queue element ID to be passed as parameter.
599              
600             my $queue_element = $queue->get_element_by_id( 123456 );
601              
602             =cut
603              
604             sub get_element_by_id
605             {
606 2     2 1 5 my ( $self, $queue_element_id ) = @_;
607 2         12 my $verbose = $self->get_verbose();
608 2         9 my $dbh = $self->get_dbh();
609 2 50       7 carp "Entering get_element_by_id()." if $verbose;
610              
611             # Check parameters.
612 2 50       6 croak 'A queue element ID is required by this method'
613             unless defined( $queue_element_id );
614              
615             # Retrieve the specified element from the queue.
616 2 50       5 carp "Retrieving data." if $verbose;
617 2         7 my $data = $dbh->selectrow_hashref(
618             sprintf(
619             q|
620             SELECT *
621             FROM %s
622             WHERE queue_id = ?
623             AND queue_element_id = ?
624             |,
625             $dbh->quote_identifier( $self->get_queue_elements_table_name() ),
626             ),
627             {},
628             $self->get_queue_id(),
629             $queue_element_id,
630             );
631 2 50       659 croak 'Cannot execute SQL: ' . $dbh->errstr() if defined( $dbh->errstr() );
632              
633             # Queue element ID doesn't exist or belongs to another queue.
634 2 50       6 return unless defined( $data );
635              
636             # Create the Queue::DBI::Element object.
637 2 50       9 carp "Creating a new Queue::DBI::Element object." if $verbose;
638              
639 2         9 my $queue_element = Queue::DBI::Element->new(
640             'queue' => $self,
641             'data' => $self->thaw( $data->{'data'} ),
642             'id' => $data->{'queue_element_id'},
643             'requeue_count' => $data->{'requeue_count'},
644             'created' => $data->{'created'},
645             );
646              
647 2 50       6 carp "Leaving get_element_by_id()." if $verbose;
648 2         12 return $queue_element;
649             }
650              
651              
652             =head2 cleanup()
653              
654             Requeue items that have been locked for more than the time in seconds specified
655             as parameter.
656              
657             Returns the items requeued so that a specific action can be taken on them.
658              
659             my $elements = $queue->cleanup( $time_in_seconds );
660             foreach my $element ( @$elements )
661             {
662             # $element is a Queue::DBI::Element object
663             }
664              
665             =cut
666              
667             sub cleanup
668             {
669 30     30 1 68902 my ( $self, $time_in_seconds ) = @_;
670 30         93 my $verbose = $self->get_verbose();
671 30         106 my $dbh = $self->get_dbh();
672 30 50       92 carp "Entering cleanup()." if $verbose;
673              
674 30   50     95 $time_in_seconds ||= '';
675 30 50       158 croak 'Time in seconds is not correctly formatted'
676             unless $time_in_seconds =~ m/^\d+$/;
677              
678             # Find all the orphans
679 30 50       89 carp "Retrieving data." if $verbose;
680 30         127 my $rows = $dbh->selectall_arrayref(
681             sprintf(
682             q|
683             SELECT queue_element_id, data, requeue_count, created
684             FROM %s
685             WHERE queue_id = ?
686             AND lock_time < ?
687             |,
688             $dbh->quote_identifier( $self->get_queue_elements_table_name() ),
689             ),
690             {},
691             $self->get_queue_id(),
692             time() - $time_in_seconds,
693             );
694 30 50       5150 croak 'Cannot execute SQL: ' . $dbh->errstr() if defined( $dbh->errstr() );
695 30 50       106 return []
696             unless defined( $rows );
697              
698             # Create objects and requeue them
699 30 50       78 carp "Creating new Queue::DBI::Element objects." if $verbose;
700 30         55 my $queue_elements = [];
701 30         96 foreach my $row ( @$rows )
702             {
703 1         7 my $queue_element = Queue::DBI::Element->new(
704             'queue' => $self,
705             'data' => $self->thaw( $row->[1] ),
706             'id' => $row->[0],
707             'requeue_count' => $row->[2],
708             'created' => $row->[3],
709             );
710             # If this item was requeued by another process since its
711             # being SELECTed a moment ago, requeue() will return failure
712             # and this process will ignore it.
713 1 50       5 push( @$queue_elements, $queue_element )
714             if $queue_element->requeue();
715             }
716 30 50       76 carp "Found " . scalar( @$queue_elements ) . " orphaned element(s)." if $verbose;
717              
718 30 50       79 carp "Leaving cleanup()." if $verbose;
719 30         178 return $queue_elements;
720             }
721              
722              
723             =head2 purge()
724              
725             Remove (permanently, caveat emptor!) queue elements based on how many times
726             they've been requeued or how old they are, and return the number of elements
727             deleted.
728              
729             # Remove permanently elements that have been requeued more than 10 times.
730             my $deleted_elements_count = $queue->purge( max_requeue_count => 10 );
731              
732             # Remove permanently elements that were created over an hour ago.
733             my $deleted_elements_count = $queue->purge( lifetime => 3600 );
734              
735             Important: locked elements are not purged even if they match the criteria, as
736             they are presumed to be currently in process and purging them would create
737             unexpected failures in the application processing them.
738              
739             Also note that I and I cannot be combined.
740              
741             =cut
742              
743             sub purge
744             {
745 7     7 1 1374 my ( $self, %args ) = @_;
746 7         22 my $verbose = $self->get_verbose();
747 7         18 my $dbh = $self->get_dbh();
748 7 50       22 carp "Entering cleanup()." if $verbose;
749              
750 7         12 my $max_requeue_count = $args{'max_requeue_count'};
751 7         12 my $lifetime = $args{'lifetime'};
752              
753             # Check parameters.
754 7 50 66     48 croak '"max_requeue_count" must be an integer'
755             if defined( $max_requeue_count ) && ( $max_requeue_count !~ m/^\d+$/ );
756 7 50 66     47 croak '"lifetime" must be an integer representing seconds'
757             if defined( $lifetime ) && ( $lifetime !~ m/^\d+$/ );
758 7 100 100     39 croak '"max_requeue_count" and "lifetime" cannot be combined, specify one OR the other'
759             if defined( $lifetime ) && defined( $max_requeue_count );
760 6 100 66     40 croak '"max_requeue_count" or "lifetime" must be specified'
761             if !defined( $lifetime ) && !defined( $max_requeue_count );
762              
763             # Prepare query clauses.
764 5 100       25 my $sql_lifetime = defined( $lifetime )
765             ? 'AND created < ' . ( time() - $lifetime )
766             : '';
767 5 100       43 my $sql_max_requeue_count = defined( $max_requeue_count )
768             ? 'AND requeue_count > ' . $dbh->quote( $max_requeue_count )
769             : '';
770              
771             # Purge the queue.
772 5   33     54 my $rows_deleted = $dbh->do(
773             sprintf(
774             q|
775             DELETE
776             FROM %s
777             WHERE queue_id = ?
778             AND lock_time IS NULL
779             %s
780             %s
781             |,
782             $dbh->quote_identifier( $self->get_queue_elements_table_name() ),
783             $sql_lifetime,
784             $sql_max_requeue_count,
785             ),
786             {},
787             $self->get_queue_id(),
788             ) || croak 'Cannot execute SQL: ' . $dbh->errstr();
789              
790 5 50       91518 carp "Leaving cleanup()." if $verbose;
791             # Account for '0E0' which means no rows affected, and translates into no
792             # rows deleted in our case.
793 5 100       67 return $rows_deleted eq '0E0'
794             ? 0
795             : $rows_deleted;
796             }
797              
798              
799             =head1 ACCESSORS
800              
801             =head2 get_max_requeue_count()
802              
803             Return how many times an element can be requeued before it is ignored when
804             retrieving elements.
805              
806             my $max_requeue_count = $queue->get_max_requeue_count();
807              
808             =cut
809              
810             sub get_max_requeue_count
811             {
812 28     28 1 865 my ( $self ) = @_;
813              
814 28         89 return $self->{'max_requeue_count'};
815             }
816              
817              
818             =head2 set_max_requeue_count()
819              
820             Set the number of time an element can be requeued before it is ignored when
821             retrieving elements. Set it to C to disable the limit.
822              
823             # Don't keep pulling the element if it has been requeued more than 5 times.
824             $queue->set_max_requeue_count( 5 );+
825              
826             # Retry without limit.
827             $queue->set_max_requeue_count( undef );
828              
829             =cut
830              
831             sub set_max_requeue_count
832             {
833 36     36 1 74 my ( $self, $max_requeue_count ) = @_;
834              
835 36 50 66     221 croak 'max_requeue_count must be an integer or undef'
836             if defined( $max_requeue_count ) && ( $max_requeue_count !~ /^\d+$/ );
837              
838 36         93 $self->{'max_requeue_count'} = $max_requeue_count;
839              
840 36         54 return;
841             }
842              
843              
844             =head2 get_lifetime()
845              
846             Return how old an element can be before it is ignored when retrieving elements.
847              
848             # Find how old an element can be before the queue will stop retrieving it.
849             my $lifetime = $queue->get_lifetime();
850              
851             =cut
852              
853             sub get_lifetime
854             {
855 33     33 1 741 my ( $self ) = @_;
856              
857 33         95 return $self->{'lifetime'};
858             }
859              
860              
861             =head2 set_lifetime()
862              
863             Set how old an element can be before it is ignored when retrieving elements.
864              
865             Set it to C to reset Queue::DBI back to its default behavior of
866             retrieving elements without time limit.
867              
868             # Don't pull queue elements that are more than an hour old.
869             $queue->set_lifetime( 3600 );
870              
871             # Pull elements without time limit.
872             $queue->set_lifetime( undef );
873              
874             =cut
875              
876             sub set_lifetime
877             {
878 38     38 1 119 my ( $self, $lifetime ) = @_;
879              
880 38 50 66     150 croak 'lifetime must be an integer or undef'
881             if defined( $lifetime ) && ( $lifetime !~ /^\d+$/ );
882              
883 38         107 $self->{'lifetime'} = $lifetime;
884              
885 38         56 return;
886             }
887              
888              
889             =head2 get_verbose()
890              
891             Return the verbosity level, which is used in the module to determine when and
892             what type of debugging statements / information should be warned out.
893              
894             See C for the possible values this function can return.
895              
896             warn 'Verbose' if $queue->get_verbose();
897              
898             warn 'Very verbose' if $queue->get_verbose() > 1;
899              
900             =cut
901              
902             sub get_verbose
903             {
904 168     168 1 298 my ( $self ) = @_;
905              
906 168         440 return $self->{'verbose'};
907             }
908              
909              
910             =head2 set_verbose()
911              
912             Control the verbosity of the warnings in the code:
913              
914             =over 4
915              
916             =item * 0 will not display any warning;
917              
918             =item * 1 will only give one line warnings about the current operation;
919              
920             =item * 2 will also usually output the SQL queries performed.
921              
922             =back
923              
924             $queue->set_verbose(1); # turn on verbose information
925              
926             $queue->set_verbose(2); # be extra verbose
927              
928             $queue->set_verbose(0); # quiet now!
929              
930             =cut
931              
932             sub set_verbose
933             {
934 36     36 1 67 my ( $self, $verbose ) = @_;
935              
936 36   50     186 $self->{'verbose'} = ( $verbose || 0 );
937              
938 36         66 return;
939             }
940              
941              
942             =head1 INTERNAL METHODS
943              
944             =head2 freeze()
945              
946             Serialize an element to store it in a SQL "text" column.
947              
948             my $frozen_data = $queue->freeze( $data );
949              
950             =cut
951              
952             sub freeze
953             {
954 16     16 1 844 my ( $self, $data ) = @_;
955              
956 16 100 66     194 return defined( $self->{'serializer'} ) && defined( $self->{'serializer'}->{'freeze'} )
957             ? $self->{'serializer'}->{'freeze'}($data)
958             : MIME::Base64::encode_base64( Storable::freeze( $data ) );
959             }
960              
961             =head2 thaw()
962              
963             Deserialize an element which was stored a SQL "text" column.
964              
965             my $thawed_data = $queue->thaw( $frozen_data );
966              
967             =cut
968              
969             sub thaw
970             {
971 29     29 1 1004 my ( $self, $data ) = @_;
972              
973 29 100 66     409 return defined( $self->{'serializer'} ) && defined( $self->{'serializer'}->{'thaw'} )
974             ? $self->{'serializer'}->{'thaw'}($data)
975             : Storable::thaw( MIME::Base64::decode_base64( $data ) );
976             }
977              
978              
979             =head1 DEPRECATED METHODS
980              
981             =head2 create_tables()
982              
983             Please use C in L instead.
984              
985             Here is an example that shows how to refactor your call to this deprecated
986             function:
987              
988             # Load the admin module.
989             use Queue::DBI::Admin;
990              
991             # Create the object which will allow managing the queues.
992             my $queues_admin = Queue::DBI::Admin->new(
993             database_handle => $dbh,
994             );
995              
996             # Create the tables required by Queue::DBI to store the queues and data.
997             $queues_admin->create_tables(
998             drop_if_exist => $boolean,
999             );
1000              
1001             =cut
1002              
1003             sub create_tables
1004             {
1005 0     0 1 0 croak 'create_tables() in Queue::DBI has been deprecated, please use create_tables() in Queue::DBI::Admin instead.';
1006             }
1007              
1008              
1009             =head2 lifetime()
1010              
1011             Please use C and C instead.
1012              
1013             =cut
1014              
1015             sub lifetime
1016             {
1017 0     0 1 0 croak 'lifetime() has been deprecated, please use get_lifetime() / set_lifetime() instead.';
1018             }
1019              
1020              
1021             =head2 verbose()
1022              
1023             Please use C and C instead.
1024              
1025             =cut
1026              
1027             sub verbose
1028             {
1029 0     0 1 0 croak 'verbose() has been deprecated, please use get_verbose() / set_verbose() instead.';
1030             }
1031              
1032              
1033             =head2 max_requeue_count()
1034              
1035             Please use C and C instead.
1036              
1037             =cut
1038              
1039             sub max_requeue_count
1040             {
1041 0     0 1 0 croak 'max_requeue_count() has been deprecated, please use get_max_requeue_count() / set_max_requeue_count() instead.';
1042             }
1043              
1044              
1045             =head1 INTERNAL METHODS
1046              
1047             =head2 get_dbh()
1048              
1049             Returns the database handle used for this queue.
1050              
1051             my $dbh = $queue->get_dbh();
1052              
1053             =cut
1054              
1055             sub get_dbh
1056             {
1057 137     137 1 458 my ( $self ) = @_;
1058              
1059 137         335 return $self->{'dbh'};
1060             }
1061              
1062              
1063             =head2 get_queues_table_name()
1064              
1065             Returns the name of the table used to store queue definitions.
1066              
1067             my $queues_table_name = $queue->get_queues_table_name();
1068              
1069             =cut
1070              
1071             sub get_queues_table_name
1072             {
1073 42     42 1 70 my ( $self ) = @_;
1074              
1075 42 100 66     854 return defined( $self->{'table_names'}->{'queues'} ) && ( $self->{'table_names'}->{'queues'} ne '' )
1076             ? $self->{'table_names'}->{'queues'}
1077             : $DEFAULT_QUEUES_TABLE_NAME;
1078             }
1079              
1080              
1081             =head2 get_queue_elements_table_name()
1082              
1083             Returns the name of the table used to store queue definitions.
1084              
1085             my $queue_elements_table_name = $queue->get_queue_elements_table_name();
1086              
1087             =cut
1088              
1089             sub get_queue_elements_table_name
1090             {
1091 165     165 1 301 my ( $self ) = @_;
1092              
1093 165 50 33     2071 return defined( $self->{'table_names'}->{'queue_elements'} ) && ( $self->{'table_names'}->{'queue_elements'} ne '' )
1094             ? $self->{'table_names'}->{'queue_elements'}
1095             : $DEFAULT_QUEUE_ELEMENTS_TABLE_NAME;
1096             }
1097              
1098              
1099             =head1 BUGS
1100              
1101             Please report any bugs or feature requests through the web interface at
1102             L.
1103             I will be notified, and then you'll automatically be notified of progress on
1104             your bug as I make changes.
1105              
1106              
1107             =head1 SUPPORT
1108              
1109             You can find documentation for this module with the perldoc command.
1110              
1111             perldoc Queue::DBI
1112              
1113              
1114             You can also look for information at:
1115              
1116             =over 4
1117              
1118             =item * GitHub's request tracker
1119              
1120             L
1121              
1122             =item * AnnoCPAN: Annotated CPAN documentation
1123              
1124             L
1125              
1126             =item * CPAN Ratings
1127              
1128             L
1129              
1130             =item * MetaCPAN
1131              
1132             L
1133              
1134             =back
1135              
1136              
1137             =head1 AUTHOR
1138              
1139             L,
1140             C<< >>.
1141              
1142              
1143             =head1 ACKNOWLEDGEMENTS
1144              
1145             I originally developed this project for ThinkGeek
1146             (L). Thanks for allowing me to open-source it!
1147              
1148              
1149             =head1 COPYRIGHT & LICENSE
1150              
1151             Copyright 2009-2014 Guillaume Aubert.
1152              
1153             This program is free software: you can redistribute it and/or modify it under
1154             the terms of the GNU General Public License version 3 as published by the Free
1155             Software Foundation.
1156              
1157             This program is distributed in the hope that it will be useful, but WITHOUT ANY
1158             WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
1159             PARTICULAR PURPOSE. See the GNU General Public License for more details.
1160              
1161             You should have received a copy of the GNU General Public License along with
1162             this program. If not, see http://www.gnu.org/licenses/
1163              
1164             =cut
1165              
1166             1;