File Coverage

blib/lib/Data/Consumer.pm
Criterion Covered Total %
statement 115 174 66.0
branch 31 84 36.9
condition 15 34 44.1
subroutine 18 28 64.2
pod 21 21 100.0
total 200 341 58.6


line stmt bran cond sub pod time code
1             package Data::Consumer;
2              
3 11     11   57125 use warnings;
  11         28  
  11         355  
4 11     11   60 use strict;
  11         21  
  11         256  
5 11     11   53 use Carp qw(confess cluck);
  11         19  
  11         555  
6 11     11   52 use vars qw/$Debug $VERSION $Fail $Cmd/;
  11         22  
  11         9644  
7              
8             # This code was formatted with the following perltidy options:
9             # -ple -ce -bbb -bbc -bbs -nolq -l=100 -noll -nola -nwls='=' -isbc -nolc -otr -kis
10             # If you patch it please use the same options for your patch.
11              
12             =head1 NAME
13              
14             Data::Consumer - Repeatedly consume a data resource in a robust way
15              
16             =head1 VERSION
17              
18             Version 0.17
19              
20             =cut
21              
22             $VERSION= '0.17';
23              
24             =head1 SYNOPSIS
25              
26             use Data::Consumer;
27              
28             my $consumer = Data::Consumer->new(
29             type => $consumer_name,
30             unprocessed => $unprocessed,
31             working => $working,
32             processed => $processed,
33             failed => $failed,
34             max_passes => $num_or_undef,
35             max_process => $num_or_undef,
36             max_elapsed => $seconds_or_undef,
37             );
38              
39             $consumer->consume( sub {
40             my $id = shift;
41             print "processed $id\n";
42             } );
43              
44             =head1 DESCRIPTION
45              
46             It is a common requirement to need to process a feed of items of some
47             sort in a robust manner. Such a feed might be records that are inserted
48             into a table, or files dropped in a delivery directory.
49             Writing a script that handles all the edge cases, like getting "stuck"
50             on a failed item, and manages things like locking so that the script
51             can be parallelized can be tricky and is certainly repetitive.
52              
53             The aim of L is to provide a framework to allow writing
54             such consumer type scripts as easy as writing a callback that processes
55             each item. The framework handles the rest.
56              
57             The basic idea is that one need only use, or in the case of a feed type
58             not already supported, define a L subclass
59             which implements a few reasonably well defined primitive methods which
60             handle the required tasks, and then the L methods use
61             those to provide a DWIMily consistent interface to the end consumer.
62              
63             Currently L is distributed with two subclasses, (well
64             three actually, but L is deprecated in favour
65             of L) L for handling
66             records in a MySQL db (using the MySQL C function), and
67             L for handling a drop directory scenario (like
68             for FTP or a mail directory).
69              
70             Once a resource type has been defined as a L subclass
71             the use pattern is to construct the subclass with the appropriate
72             arguments, and then call consume with a callback.
73              
74             =head2 The Consumer Pattern
75              
76             The consumer pattern is where code wants to consume an 'atomic' resource
77             piece by piece. The consuming code doesn't really want to worry much
78             about how they got the piece, a task that should be handled by the framework.
79             The consumer subclasses assume that the resource can be modeled as a
80             queue (that there is some ordering principle by which they can be processed
81             in a predictable sequence). The consume pattern in full glory is something
82             very close to the following following pseudo code. The items marked with
83             asterisks are where user callbacks may be invoked:
84              
85             DO
86             RESET TO THE BEGINNING OF THE QUEUE
87             WHILE QUEUE NOT EMPTY AND CAN *PROCEED*
88             ACQUIRE NEXT ITEM TO PROCESS FROM QUEUE
89             MARK AS 'WORKING'
90             *PROCESS* ITEM
91             IF PROCESSING FAILED
92             MARK AS 'FAILED'
93             OTHERWISE
94             MARK AS 'PROCESSED'
95             SWEEP UP ABANDONDED 'WORKING' ITEMS AND MARK THEM AS 'FAILED'
96             UNTIL WE CANNOT *PROCEED* OR NOTHING WAS PROCESSED
97             RELEASE ANY LOCKS STILL HELD
98              
99             This implies that each item potentially has four states: C,
100             C, C and C. In a database these might be
101             values in a field, in a drop directory scenario these would be different
102             directories, but with all of them they would normally be supplied as
103             values to the L subclass being created.
104              
105             =head2 Subclassing Data::Consumer
106              
107             L can be used with any resource type that can be modeled
108             as a queue, supports some form of advisory locking mechanism, and
109             provides a way to discriminate between at least the C and
110             C state.
111              
112             The routines that must be defined for a new consumer type are C,
113             C, C, C, and C<_mark_as()>,
114             C<_do_callback()>.
115              
116             =over 4
117              
118             =item new
119              
120             It is almost for sure that a subclass will need to override the default
121             constructor. All L objects are blessed hashes, and in
122             fact you should always call the parents classes constructor first with:
123              
124             my $self= $class->SUPER::new();
125              
126             =item reset
127              
128             This routine is used to reset the objects internal state so the next call to acquire
129             will return the first available item in the queue.
130              
131             =item acquire
132              
133             This routine is to find and in some way lock the next item in the queue. It should ensure
134             that it call is_ignored() on each item to verify the item has not been requested to be
135             ignored.
136              
137             =item release
138              
139             This routine is to release any held locks in the object.
140              
141             =item _mark_as
142              
143             This routine is called to "mark" an item as a particular state. It
144             should be able to handle user supplied values. For instance
145             L implements this as an update statement that
146             maps user supplied values to the consumer state names.
147              
148             Possible states are: C, C, C,
149             C.
150              
151             =item _do_callback
152              
153             This routine is used to call the user supplied callback with the correct
154             arguments. What arguments are appropriate for the callback are context
155             dependent on the type of class. For instance in L
156             calls the callback with the arguments C<($consumer, $id, $dbh)> whereas
157             L calls the callback with the arguments
158             C<($consumer, $filespec, $filehandle, $filename)>. The point is that the
159             end user should be passed the arguments that make sense, not necessarily
160             the same thing for each consumer type.
161              
162             =back
163              
164             Every well-behaved L subclass should include the
165             functional equivalent of the following code in its .pm file:
166              
167             use base 'Data::Consumer';
168             __PACKAGE__->register();
169              
170             This will ensure that it can be properly loaded by
171             C<< Data::Consumer->new(type=>$shortname) >>.
172              
173             It is also normal for a L subclass to provide special
174             methods as needed. For instance C<< Data::Consumer::Dir->fh() >> and
175             C<< Data::Consumer::MySQL->dbh() >>.
176              
177              
178              
179             =head1 METHODS
180              
181             =head2 CLASS->new(%opts)
182              
183             Constructor. Normally L's constructor is not called
184             directly, instead the constructor of a subclass is used. However to
185             make it easier to have a data driven load process L
186             accepts the C argument which should specify the the short name of
187             the subclass (the part after C) or the full name of
188             the subclass.
189              
190             Thus
191              
192             Data::Consumer->new(type=>'MySQL',%args);
193              
194             is exactly equivalent to calling
195              
196             Data::Consumer::MySQL->new(%args);
197              
198             except that the former will automatically require or use the appropriate module
199             and the latter necessitates that you do so yourself.
200              
201             Every L subclass constructor supports the following
202             arguments on top of any that are subclass specific. Additionally some
203             arguments are universally used, but have different meaning depending on
204             the subclass.
205              
206             =over 4
207              
208             =item unprocessed
209              
210             How to tell if the item is unprocessed.
211              
212             How this argument is interpreted depends on the L
213             subclass involved.
214              
215             =item working
216              
217             How to tell if the item is currently being worked on.
218              
219             How this argument is interpreted depends on the L
220             subclass involved.
221              
222             =item processed
223              
224             How to tell if the item has already been worked on.
225              
226             How this argument is interpreted depends on the L
227             subclass involved.
228              
229             =item failed
230              
231             How to tell if processing failed while handling the item.
232              
233             How this argument is interpreted depends on the L
234             subclass involved.
235              
236             =item max_passes => $num_or_undef
237              
238             Normally C will loop through the data set until it is
239             exhausted. By setting this parameter you can control the maximum number
240             of iterations, for instance setting it to C<1> will result in a single
241             pass through the data per invocation. If C<0> (or any other false value)
242             is treated as meaning "loop until exhausted".
243              
244             =item max_processed => $num_or_undef
245              
246             Maximum number of items to process per invocation.
247              
248             If set to a false value there is no limit.
249              
250             =item max_failed => $num_or_undef
251              
252             Maximum number of failed process attempts that may occur before consume will stop.
253             If set to a false value there is no limit. Setting this to 1 will cause processing
254             to stop after the first failure.
255              
256             =item max_elapsed => $seconds_or_undef
257              
258             Maximum amount of time that may have elapsed when starting a new
259             process. If more than this value has elapsed then no further processing
260             occurs. If C<0> (or any false value) then there is no time limit.
261              
262             =item proceed => $code_ref
263              
264             This is a callback that may be used to control the looping process in
265             consume via the C method. See the documentation of
266             C and C
267              
268             =item sweep => $bool
269              
270             *** NOTE CURRENTLY THIS OPTION IS DISABLED ***
271              
272             If this parameter is true, and there are four modes defined
273             (C, C, C, C) then consume will
274             perform a "sweep up" after every pass, which is responsible for moving
275             "abandonded" files from the working directory (such as from a previous
276             process that segfaulted during processing). Generally this should
277             not be necessary.
278              
279             =back
280              
281              
282             =head2 CLASS->register(@alias)
283              
284             Used by subclasses to register themselves as a L
285             subclass and register any additional aliases that the class may be
286             identified as.
287              
288             Will throw an exception if any of the aliases are already associated to
289             a different class.
290              
291             When called on a subclass in list context returns a list of the
292             subclasses registered aliases,
293              
294             If called on L in list context returns a list of all
295             alias class mappings.
296              
297             =cut
298              
299              
300              
301             =head2 $class_or_object->debug_warn_hook()
302              
303             Specify a callback to use to capture diagnostics data produced
304             by a Data::Consumer object.
305              
306             If called as a class method, sets the default object for all
307             Data::Consumer objects that have not explicitly set a hook.
308              
309             If called as an object method, sets the hook to use for that
310             object alone.
311              
312             Returns the current effective hook. Defaults to use
313             the C method for the object. Thus
314             it can be overridden by a subclass if necessary.
315              
316             The hook will be called with the arguments
317              
318             ($consumer,$level,@lines)
319              
320             and is not expected to return anything.
321              
322             =cut
323              
324             my $debug_warn_hook;
325             sub debug_warn_hook {
326 96     96 1 176 my $self= shift;
327 96 50       503 if (@_) {
328 0 0       0 if (ref $self) {
329 0         0 $self->{debug_warn_hook}= shift;
330             } else {
331 0         0 $debug_warn_hook= shift;
332             }
333             }
334 96 50 66     678 if (ref $self and defined $self->{debug_warn_hook}) {
335 0         0 return $self->{debug_warn_hook};
336             }
337 96   33     1007 return $debug_warn_hook || $self->can('default_debug_warn');
338             }
339              
340             =head2 $class_or_object->default_debug_warn($level,$debug);
341              
342             Use warn to output diagnostics. Message includes the process id
343             and the class name.
344              
345             =cut
346              
347             sub default_debug_warn {
348 96     96 1 251 my $self= shift;
349 96         299 my $level= shift;
350 96 50       520 cluck($level) if $level=~/\D/;
351 96         377 my $debug_level= $self->debug_level;
352 96 50       380 if ( $debug_level > $level ) {
353 0   0     0 warn ref($self) || $self, "\t$$\t>>> $_\n" for @_;
354             }
355             }
356              
357             =head2 $class_or_object->debug_level($level,@debug_lines)
358              
359             Set the minimum debug level.
360              
361             When called as an object method sets the value of that object
362             alone. undef is distinct from 0 in that undef results in
363             the global debug level being used for that object.
364              
365             When called as a class method sets the value for all objects
366             which do not have a defined debug level.
367              
368             Returns the current effective debug level for the object or
369             class.
370              
371             =cut
372              
373              
374             sub debug_level {
375 96     96 1 189 my $self= shift;
376 96 50       278 if (@_) {
377 0 0       0 if (ref $self) {
378 0         0 $self->{debug_level}= shift;
379             } else {
380 0         0 $Debug= shift;
381             }
382             }
383 96 50 66     518 if (ref $self and defined $self->{debug_level}) {
384 0         0 return $self->{debug_level};
385             }
386 96   50     476 return $Debug || 0;
387              
388             }
389              
390             =head2 $class_or_object->debug_warn($level,@debug_lines)
391              
392             If the current debugging level is above C<$level> then call
393             the current debug_warn_hook() to output a set of diagnostic
394             messages.
395              
396             =cut
397              
398              
399             sub debug_warn {
400 96     96 1 271 my $self=shift;
401 96         255 my $level=shift;
402 96         323 my $hook=$self->debug_warn_hook;
403 96 100 50     587 my $pfx= ref $self ? $self->{debug_pfx} || '' : '';
404 96         342 $hook->($self,$level,map { $pfx.$_ } @_);
  96         532  
405             }
406              
407 0         0 BEGIN {
408 11     11   48 my %alias2class;
409             my %class2alias;
410 11 50 33     2172 $Debug and $Debug >= 5 and warn "\n";
411              
412             sub register {
413 10     10 1 39 my $class= shift;
414              
415 10 50       37 ref $class
416             and confess "register() is a class method and cannot be called on an object\n";
417 10         21 my $pack= __PACKAGE__;
418              
419 10 50       39 if ( $class eq $pack ) {
420 0 0       0 return wantarray ? %alias2class : 0 + keys %alias2class;
421             }
422              
423 10         178 ( my $std_name= $class ) =~ s/^\Q$pack\E:://;
424 10         33 $std_name =~ s/::/-/g;
425              
426 10         15 my @failed;
427 10         26 for my $name ( $class, $std_name, @_ ) {
428 20 50 33     64 if ( $alias2class{$name} and $alias2class{$name} ne $class ) {
429 0         0 push @failed, $name;
430 0         0 next;
431             }
432 20         88 __PACKAGE__->debug_warn( 5, "registered '$name' as an alias of '$class'" );
433 20         45 $alias2class{$name}= $class;
434 20         51 $class2alias{$class}{$name}= $class;
435             }
436             @failed
437             and confess "Failed to register aliases for '$class' as they are already used\n",
438 10 50       29 join( "\n", map { "\t'$_' is already assigned to '$alias2class{$_}'" } @failed ),
  0         0  
439             "\n";
440 10 50       33 return wantarray ? %{ $class2alias{$class} } : 0 + keys %{ $class2alias{$class} };
  0         0  
  10         8229  
441             }
442              
443             sub new {
444 5     5 1 47 my ( $class, %opts )= @_;
445 5 50       68 ref $class
446             and confess "new() is a class method and cannot be called on an object\n";
447              
448 5 50       45 if ( $class eq __PACKAGE__ ) {
449             my $type= $opts{type}
450 0 0       0 or confess "'type' is a mandatory named parameter for $class->new()\n";
451 0         0 my $full = $type;
452 0 0       0 if (!$alias2class{$full}) {
453 0 0       0 if ($full!~/::/) {
454 0         0 $full=~s/-/::/g;
455 0         0 $full=join '::',$class,$full;
456             }
457 0 0       0 eval "require $full; 1"
458             or confess "'type' parameter '$type' could not be loaded properly: $@\n";
459             }
460 0 0       0 $class= $alias2class{$full}
461             or confess "'type' parameter '$type' mapped to '$full' which does not seem to exist\n";
462             }
463 5         70 my $object= bless {}, $class;
464 5         192 $class->debug_warn( 5, "created new object '$object'" );
465 5         23 return $object;
466             }
467             }
468              
469             =head2 $object->last_id()
470              
471             Returns the identifier for the last item acquired.
472              
473             Returns undef if acquire has never been called or if the last
474             attempt to acquire data failed because none was available.
475              
476             =cut
477              
478             sub last_id {
479 150     150 1 325 my $self= shift;
480 150         637 return $self->{last_id};
481             }
482              
483             # Until i figure out to make gedit handle begin/end directives this has to
484             # stay commented out
485             #=begin dev
486             #
487             #=head2 $object->_mark_as($type,$id)
488             #
489             #** Must be overriden **
490             #
491             #Mark an item as a particular type if the object defines that type.
492             #
493             #This is wrapped by mark_as() for error checking, so you are guaranteed
494             #that $type will be one of
495             #
496             # 'unprocessed', 'working', 'processed', 'failed'
497             #
498             #and that $object->{$type} will be true value, and that $id will be from
499             #the currently acquired item.
500             #
501             #=end dev
502              
503             =head2 $object->mark_as($type)
504              
505             Mark an item as a particular type if the object defines that type.
506              
507             Allowed types are C, C, C, C
508              
509             =cut
510              
511 0     0   0 sub _mark_as { confess "must be overriden" }
512              
513 0         0 BEGIN {
514 11     11   45 my ( %valid, @valid );
515 11         29 @valid= qw ( unprocessed working processed failed );
516 11         9326 @valid{@valid}= ( 1 .. @valid );
517              
518             sub mark_as {
519 100     100 1 375 my $self= shift @_;
520 100         233 my $key= shift @_;
521              
522             $valid{$key}
523             or confess "Unknown type in mark_as(), valid options are ",
524 100 50       400 join( ", ", map { "'$_'" } @valid ),
  0         0  
525             "\n";
526              
527 100 50       494 my $id= @_ ? shift @_ : $self->last_id;
528 100 50       370 defined $id
529             or confess "Nothing acquired to be marked as '$key' in mark_as.\n";
530              
531 100 50       388 return unless defined $self->{$key};
532 100         463 return $self->_mark_as( $key, $id );
533             }
534             }
535              
536             =head2 $object->process($callback)
537              
538             Marks the current item as C and processes it using the
539             C<$callback>. If the C<$callback> dies then the item is marked as
540             C, otherwise the item is marked as C once the
541             C<$callback> returns. The return value of the C<$callback> is ignored.
542              
543             C<$callback> will be called with at least two arguments, the first being
544             the $consumer object itself, and the second being an identifier for the
545             current record. Normally additional, likely to be useful, arguments are
546             provided as well, on a per subclass basis. For example
547             L will pass in the consumer object, the id of the to
548             be processed record, and a copy of the consumers database handle as well for
549             convenience. On the other hand L will pass in the
550             consumer object, followed by a filespecification for the file to be
551             processed, an open filehandle to the file, and the filename itself (with
552             no path).
553              
554             The callback may call the methods 'leave', 'ignore', 'fail', and 'halt' on
555             the consumer object before returning, typically by doing something like
556              
557             return $consumer->ignore;
558              
559             this allows the callback to send specific signals to consume, specifically
560              
561             leave : return the item to the unprocessed state after the callback returns.
562             ignore : return the item to the unprocessed state after the callback returns
563             and never attempt to process it again with this consumer object.
564             fail : same result as dieing in a callback, except without throwing an exception
565             in the situation where there might be $SIG{__DIE__} hooks to worry about.
566             halt : stop the consume() process after this has been executed
567              
568             For further details always consult the relevant subclasses documentation for
569             C
570              
571             =cut
572              
573             sub process {
574 50     50 1 119 my $self= shift;
575 50         105 my $callback= shift;
576 50         117 delete $self->{fail};
577 50         168 my $id= $self->last_id;
578 50 50       172 defined $id
579             or $self->error("Undefined last_id. Nothing acquired yet?");
580 50         248 $self->mark_as('working');
581 50         124 local $Cmd;
582 50         105 delete $self->{defer_leave};
583 50         207 my $error= $self->_do_callback($callback);
584 50   33     676 $error ||= $self->{fail};
585 50 50       201 if ( $error ) {
586 0         0 $self->mark_as('failed');
587 0         0 $self->error($error);
588             } else {
589 50 50       311 if ($self->{defer_leave}) {
590 0         0 $self->mark_as('unprocessed');
591             } else {
592 50         389 $self->mark_as('processed');
593             }
594             }
595 50         257 return 1;
596             }
597              
598              
599             =head2 $consumer->leave()
600              
601             Sometimes its useful to defer processing. This method when called
602             from within a consume/process callback will result in the
603             item being marked as 'unprocessed' after the callback returns
604             (so long as it does not die).
605              
606             Typically this is invoked as
607              
608             return $consumer->leave;
609              
610             from withing a consume/process callback.
611              
612             Returns $consumer. Will die if not 'unprocessed' state is defined.
613              
614             =cut
615              
616             sub leave {
617 0     0 1 0 my $self= shift;
618 0 0       0 confess("Can't leave as 'unprocessed' is undefined!") if not defined $self->{unprocessed};
619 0         0 $self->{defer_leave}++;
620 0         0 return $self;
621             }
622              
623             =head2 $consumer->ignore(@list)
624              
625             This can used to cause acquire to ignore each item in @list.
626              
627             If @list is empty then it is assumed it is being called from
628             within consume/process and marks the currently acquired item
629             as ignored and calls C<< $consumer->leave() >>.
630              
631             Returns $consumer. Will die if no 'unprocessed' state is defined.
632              
633             =cut
634              
635              
636             sub ignore {
637 0     0 1 0 my $self= shift;
638 0 0       0 if (@_) {
639 0         0 for my $id (@_) {
640 0         0 $self->{ignore}{$id}++;
641             }
642             } else {
643 0         0 my $id= $self->last_id;
644 0         0 $self->{ignore}{$id}++;
645 0         0 $self->leave;
646             }
647 0         0 return $self;
648             }
649              
650             =head2 $consumer->fail($message)
651              
652             Same as doing C from within a consume/process callback except
653             that no exception is thrown (no C<$SIG{__DIE__}> callbacks are invoked) and
654             the error is deferred until the callback actually returns.
655              
656             Typically used as
657              
658             return $consumer->fail;
659              
660             from within a consumer() callback.
661              
662             Returns the $consumer object.
663              
664             =cut
665              
666             sub fail {
667 0     0 1 0 my $self= shift;
668 0         0 $self->{fail}= shift;
669 0         0 return $self;
670             }
671              
672             =head2 $consumer->halt()
673              
674             Causes consume() to halt processing and exit once
675             the callback returns. Typically invoked like
676              
677             return $consumer->halt;
678              
679             or
680              
681             return $consumer->fail->halt;
682              
683             Returns the consumer object.
684              
685             =cut
686              
687              
688             sub halt {
689 0     0 1 0 my $self= shift;
690 0         0 $self->{halt}++;
691 0         0 return $self;
692             }
693              
694              
695              
696             =head2 $object->is_ignored($id)
697              
698             Returns true if an item has been set to be ignored. If $id is omitted
699             defaults to last_id
700              
701             =cut
702              
703             sub is_ignored {
704 240     240 1 467 my $self= shift;
705 240 50       663 my $id= @_ ? shift @_ : $self->last_id;
706 240 50       612 return if !defined $id;
707 240 50       999 return $self->{ignore}{$id} ? 1 : 0
708             }
709              
710             =head2 $object->reset()
711              
712             Reset the state of the object.
713              
714             =head2 $object->acquire()
715              
716             Acquire an item to be processed.
717              
718             Returns an identifier to be used to identify the item acquired.
719              
720             =head2 $object->release()
721              
722             Release any locks on the currently held item.
723              
724             Normally there is no need to call this directly.
725              
726             =cut
727              
728 0     0 1 0 sub reset { confess "abstract method must be overriden by subclass\n"; }
729 0     0 1 0 sub acquire { confess "abstract method must be overriden by subclass\n"; }
730 0     0 1 0 sub release { confess "abstract method must be overriden by subclass\n"; }
731              
732             =head2 $object->error()
733              
734             Calls the C callback if the user has provided one, otherwise
735             calls C. Probably not all that useful for an end user.
736              
737             =cut
738              
739             sub error {
740 0     0 1 0 my $self= shift;
741 0 0       0 if ( $self->{error} ) {
742 0         0 $self->{error}->(@_);
743             } else {
744 0         0 confess @_;
745             }
746             }
747              
748             =head2 $object->consume($callback)
749              
750             Consumes a data resource until it is exhausted using C,
751             C, and C as appropriate. Normally this is the main
752             method used by external processes.
753              
754             Before each attempt to acquire a new resource, and once at the end of
755             each pass consume will call C to determine if it can do so.
756             The user may hook into this by specifying a callback in the constructor.
757             This callback will be executed with no args when it is in the inner loop
758             (per item), and with the number of passes at the end of each pass
759             (starting with 1).
760              
761             =head2 $object->proceed($passes)
762              
763             Returns C if the conditions specified at construction time are
764             satisfied and processing may proceed. Returns C otherwise.
765              
766             If the user has specified a C callback in the constructor then
767             this will be executed before any other rules are applied, with a
768             reference to the current C<$object>, a reference to the runstats, and if
769             being called at the end of pass with the number of passes.
770              
771             If this callback returns C then the other rules will be applied,
772             and only if all other conditions from the constructor are satisfied
773             will C itself return C.
774              
775             =head2 $object->runstats()
776              
777             Returns a reference to a hash of statistics about the last (or currently running)
778             execution of consume. Example:
779              
780             {
781             'passes' => 2,
782             'processed_this_pass' => 0,
783             'processed' => 3,
784             'start_time' => 1209750962,
785             'failed' => 0,
786             'elapsed' => 0,
787             'end_time' => 1209750962,
788             'failed_this_pass' => 0
789             }
790              
791             Note that start_time and end_time are unix timestamps.
792              
793             =cut
794              
795 0     0 1 0 sub runstats { $_[0]->{runstats} }
796              
797             sub proceed {
798 70     70 1 245 my $self= shift;
799 70         183 my $runstats= $self->{runstats};
800 70         201 $runstats->{end_time}= time;
801 70         233 $runstats->{elapsed}= $runstats->{end_time} - $runstats->{start_time};
802              
803 70 50       542 if ( my $cb= $self->{proceed} ) {
804 0 0       0 $cb->( $self, $self->{runstats}, @_ ) # pass on the $passes argument if its there
805             or return;
806             }
807 70         335 for my $key (qw(elapsed passes processed failed)) {
808 280         619 my $max= "max_$key";
809 280 50 33     1195 return if $self->{$max} && $runstats->{$key} >= $self->{$max};
810             }
811 70 50       240 return if $self->{halt};
812 70         483 return 1;
813             }
814              
815             sub consume {
816 5     5 1 188 my $self= shift;
817 5         10 my $callback= shift;
818              
819 5         12 my $passes= 0;
820              
821 5 50       14 unless ($self->{runstats}) {
822 5         17 $self->{runstats}= {};
823             $self->{runstats}{$_}= 0
824 5         40 for qw(passes processed failed processed_this_pass failed_this_pass);
825             }
826              
827 5         11 my $runstats= $self->{runstats};
828 5         25 $runstats->{start_time}= time;
829              
830 5         28 $self->reset();
831             do {
832 10         28 ++$runstats->{passes};
833 10         24 $runstats->{processed_this_pass}= $runstats->{failed_this_pass}= 0;
834 10   66     68 while ( $self->proceed && defined( my $item= $self->acquire ) ) {
835             eval {
836 50         283 $self->process($callback);
837 50         250 $runstats->{processed_this_pass}++;
838 50         139 $runstats->{processed}++;
839 50         598 1;
840             }
841 50 50       120 or do {
842 0         0 $runstats->{failed_this_pass}++;
843 0         0 $runstats->{failed}++;
844              
845             # quotes force string copy
846 0         0 $self->debug_warn(5, "Failed during \$self->process(\$callback): $@");
847             }
848             }
849             } while $self->proceed( $runstats->{passes} )
850 5   66     10 && $runstats->{processed_this_pass};
851              
852             # if we still hold a lock let it go.
853 5         17 delete $self->{halt};
854 5         30 $self->release;
855 5         21 return $runstats;
856             }
857              
858              
859             =head1 AUTHOR
860              
861             Yves Orton, C<< >>
862              
863             =head1 BUGS
864              
865             Please report any bugs or feature requests to
866             C, or through the web interface at
867             L.
868              
869             I will be notified, and then you'll automatically be notified of progress on
870             your bug as I make changes.
871              
872             =head1 SUPPORT
873              
874             You can find documentation for this module with the perldoc command.
875              
876             perldoc Data::Consumer
877              
878              
879             You can also look for information at:
880              
881             =over 4
882              
883             =item * RT: CPAN's request tracker
884              
885             L
886              
887             =item * AnnoCPAN: Annotated CPAN documentation
888              
889             L
890              
891             =item * CPAN Ratings
892              
893             L
894              
895             =item * Search CPAN
896              
897             L
898              
899             =back
900              
901              
902             =head1 ACKNOWLEDGEMENTS
903              
904             Igor Sutton for ideas, testing and support
905              
906             =head1 COPYRIGHT & LICENSE
907              
908             Copyright 2008, 2010, 2011 Yves Orton, all rights reserved.
909              
910             This program is free software; you can redistribute it and/or modify it
911             under the same terms as Perl itself.
912              
913             =cut
914              
915             1; # End of Data::Consumer
916