package Data::Consumer;

use warnings;
use strict;
use Carp qw(confess cluck);
use vars qw/$Debug $VERSION $Fail $Cmd/;

# This code was formatted with the following perltidy options:
# -ple -ce -bbb -bbc -bbs -nolq -l=100 -noll -nola -nwls='=' -isbc -nolc -otr -kis
# If you patch it please use the same options for your patch.

=head1 NAME

Data::Consumer - Repeatedly consume a data resource in a robust way

=head1 VERSION

Version 0.17

=cut

$VERSION= '0.17';

=head1 SYNOPSIS

    use Data::Consumer;

    my $consumer = Data::Consumer->new(
        type          => $consumer_name,
        unprocessed   => $unprocessed,
        working       => $working,
        processed     => $processed,
        failed        => $failed,
        max_passes    => $num_or_undef,
        max_processed => $num_or_undef,
        max_elapsed   => $seconds_or_undef,
    );

    $consumer->consume( sub {
        my ( $consumer, $id )= @_;
        print "processed $id\n";
    } );

=head1 DESCRIPTION

A common requirement is to process a feed of items of some sort in a
robust manner. Such a feed might be records that are inserted into a
table, or files dropped in a delivery directory. Writing a script that
handles all the edge cases, such as getting "stuck" on a failed item, and
that manages locking so the script can be parallelized, can be tricky and
is certainly repetitive.

The aim of L is to provide a framework that makes writing
such consumer scripts as easy as writing a callback that processes
each item. The framework handles the rest.

The basic idea is that one need only use (or, for a feed type
not already supported, define) a L subclass
which implements a few reasonably well defined primitive methods that
handle the required tasks. The L methods then use
those to provide a DWIMily consistent interface to the end consumer.

Currently L is distributed with two subclasses (well,
three actually, but L is deprecated in favour
of L): L for handling
records in a MySQL db (using the MySQL C function), and
L for handling a drop directory scenario (like
for FTP or a mail directory).

Once a resource type has been defined as a L subclass,
the usage pattern is to construct the subclass with the appropriate
arguments and then call consume with a callback.

=head2 The Consumer Pattern

The consumer pattern is where code wants to consume an 'atomic' resource
piece by piece. The consuming code doesn't really want to worry much
about how it got the piece; that task should be handled by the framework.
The consumer subclasses assume that the resource can be modeled as a
queue (that is, there is some ordering principle by which the items can be
processed in a predictable sequence). The consume pattern in full glory is
something very close to the following pseudo code. The items marked with
asterisks are where user callbacks may be invoked:

    DO
        RESET TO THE BEGINNING OF THE QUEUE
        WHILE QUEUE NOT EMPTY AND CAN *PROCEED*
            ACQUIRE NEXT ITEM TO PROCESS FROM QUEUE
            MARK AS 'WORKING'
            *PROCESS* ITEM
            IF PROCESSING FAILED
                MARK AS 'FAILED'
            OTHERWISE
                MARK AS 'PROCESSED'
        SWEEP UP ABANDONED 'WORKING' ITEMS AND MARK THEM AS 'FAILED'
    UNTIL WE CANNOT *PROCEED* OR NOTHING WAS PROCESSED
    RELEASE ANY LOCKS STILL HELD

This implies that each item potentially has four states: C,
C, C and C. In a database these might be
values in a field; in a drop directory scenario they would be different
directories. In all cases they would normally be supplied as
values to the L subclass being created.
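
The pseudo code above can be sketched in plain Perl for a queue held in memory. This is a minimal illustration under stated assumptions, not how Data::Consumer is implemented. The queue is a hypothetical hash of item id to state, and there is no locking, sweeping or *PROCEED* callback. The callback receives only the id, and one that dies marks the item as failed.

```perl

    use strict;
    use warnings;

    # Hypothetical in-memory resource: item id => state.
    my %state= map { $_ => 'unprocessed' } 1 .. 5;

    sub consume_all {
        my ( $state, $callback )= @_;
        my $processed_this_pass;
        do {
            $processed_this_pass= 0;
            # RESET: walk the queue from the beginning on every pass.
            for my $id ( sort { $a <=> $b } keys %$state ) {
                # ACQUIRE: only unprocessed items are eligible.
                next unless $state->{$id} eq 'unprocessed';
                $state->{$id}= 'working';
                # PROCESS: a callback that dies marks the item as failed.
                if ( eval { $callback->($id); 1 } ) {
                    $state->{$id}= 'processed';
                } else {
                    $state->{$id}= 'failed';
                }
                $processed_this_pass++;
            }
        } until !$processed_this_pass;    # UNTIL nothing was processed
        return $state;
    }

    consume_all( \%state, sub {
        my $id= shift;
        die "odd ids fail\n" if $id % 2;
    } );
    print "$_: $state{$_}\n" for sort { $a <=> $b } keys %state;

```

With this callback the odd ids end up C<failed> and the even ids C<processed>. A second pass then finds nothing unprocessed, so the loop stops.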

=head2 Subclassing Data::Consumer

L can be used with any resource type that can be modeled
as a queue, supports some form of advisory locking mechanism, and
provides a way to discriminate between at least the C and
C states.

The routines that must be defined for a new consumer type are C,
C, C, C, C<_mark_as()> and C<_do_callback()>.

=over 4

=item new

A subclass will almost certainly need to override the default
constructor. All L objects are blessed hashes, and you
should always call the parent class's constructor first:

    my $self= $class->SUPER::new();

=item reset

This routine resets the object's internal state so that the next call to
acquire will return the first available item in the queue.

=item acquire

This routine finds the next item in the queue and locks it in some way.
It should call is_ignored() on each candidate item to verify that the item
has not been requested to be ignored.

=item release

This routine releases any locks held by the object.

=item _mark_as

This routine is called to "mark" an item as a particular state. It
should be able to handle user-supplied values. For instance
L implements this as an update statement that
maps user-supplied values to the consumer state names.

Possible states are: C, C, C,
C.

=item _do_callback

This routine calls the user-supplied callback with the correct
arguments. Which arguments are appropriate depends on the type of
class. For instance L
calls the callback with the arguments C<($consumer, $id, $dbh)> whereas
L calls the callback with the arguments
C<($consumer, $filespec, $filehandle, $filename)>. The point is that the
end user should be passed the arguments that make sense, not necessarily
the same thing for each consumer type. process() treats a true return
value from this routine as the error raised by the callback.

=back

Every well-behaved L subclass should include the
functional equivalent of the following code in its .pm file:

    use base 'Data::Consumer';
    __PACKAGE__->register();

This ensures that it can be properly loaded by
C<< Data::Consumer->new(type=>$shortname) >>.

It is also normal for a L subclass to provide special
methods as needed, for instance C<< Data::Consumer::Dir->fh() >> and
C<< Data::Consumer::MySQL->dbh() >>.
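
The sketch below makes the contract above concrete by implementing the six routines for a queue held in an in-memory array. It is illustrative only. So that it runs on its own, C<Toy::Consumer> is a hypothetical, cut-down stand-in for Data::Consumer: it provides only new(), last_id() and a mark_as() that validates the state before delegating to _mark_as(), mirroring this module. C<Toy::Consumer::Array> and its C<items> argument are also made up. A real subclass would C<use base 'Data::Consumer'>, call C<register()>, and have acquire() consult is_ignored(), which this sketch omits.

```perl

    use strict;
    use warnings;

    package Toy::Consumer;    # hypothetical cut-down stand-in for Data::Consumer
    use Carp qw(confess);

    my %valid= map { $_ => 1 } qw(unprocessed working processed failed);

    sub new { my ( $class, %opts )= @_; return bless +{%opts}, $class }
    sub last_id { return $_[0]{last_id} }

    # Validates the state name, then delegates to the subclass's _mark_as().
    sub mark_as {
        my ( $self, $key, $id )= @_;
        $valid{$key} or confess "Unknown type '$key' in mark_as()";
        $id= $self->last_id unless defined $id;
        return unless defined $self->{$key};    # state not configured: no-op
        return $self->_mark_as( $key, $id );
    }

    package Toy::Consumer::Array;    # hypothetical subclass over an array of ids
    our @ISA= ('Toy::Consumer');

    sub new {
        my ( $class, %opts )= @_;
        my $self= $class->SUPER::new(%opts);
        # Every item starts out in the user-supplied 'unprocessed' state.
        $self->{state}= { map { $_ => $self->{unprocessed} } @{ $self->{items} } };
        return $self->reset;
    }

    sub reset {
        my $self= shift;
        $self->{pos}= 0;
        delete $self->{last_id};
        return $self;
    }

    sub acquire {
        my $self= shift;
        my $items= $self->{items};
        while ( $self->{pos} < @$items ) {
            my $id= $items->[ $self->{pos}++ ];
            next unless $self->{state}{$id} eq $self->{unprocessed};
            # "Locking" is just flipping the item to the working state here.
            $self->_mark_as( working => $id );
            return $self->{last_id}= $id;
        }
        delete $self->{last_id};
        return;
    }

    sub release { return $_[0] }    # nothing to unlock for an in-memory array

    sub _mark_as {
        my ( $self, $key, $id )= @_;
        # Store the user-supplied value for this consumer state name.
        $self->{state}{$id}= $self->{$key};
        return 1;
    }

    sub _do_callback {
        my ( $self, $callback )= @_;
        # Return the callback's error (if any), which is what process() expects.
        return eval { $callback->( $self, $self->last_id ); 1 } ? undef : ( $@ || 'unknown error' );
    }

    package main;

    my $c= Toy::Consumer::Array->new(
        items       => [qw(a b c)],
        unprocessed => 'new',
        working     => 'busy',
        processed   => 'done',
        failed      => 'error',
    );
    while ( defined $c->acquire ) {
        my $err= $c->_do_callback( sub { die "bad\n" if $_[1] eq 'b' } );
        $c->mark_as( $err ? 'failed' : 'processed' );
    }
    $c->release;
    print "$_ => $c->{state}{$_}\n" for qw(a b c);

```

Because the state values are user-supplied, the items end up as C<done>, C<error> and C<done> rather than under the consumer's own state names.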



=head1 METHODS

=head2 CLASS->new(%opts)

Constructor. Normally L's constructor is not called
directly; the constructor of a subclass is used instead. However, to
make a data-driven load process easier, L
accepts the C argument, which should specify either the short name of
the subclass (the part after C) or the full name of
the subclass.

Thus

    Data::Consumer->new(type=>'MySQL',%args);

is exactly equivalent to calling

    Data::Consumer::MySQL->new(%args);

except that the former automatically requires the appropriate module,
whereas the latter requires you to load it yourself.
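
When C<type> is not already a registered alias, new() turns it into a class name before requiring it. That step can be sketched as the stand-alone function below. C<type_to_class> is a hypothetical name, and unlike new() it neither loads the module nor consults the alias table.

```perl

    use strict;
    use warnings;

    # Hypothetical helper mirroring the fallback name resolution in new().
    # A name without '::' has each '-' turned into '::' and is placed under
    # the base class. A name that already contains '::' is used unchanged.
    sub type_to_class {
        my ( $base, $type )= @_;
        my $full= $type;
        if ( $full !~ /::/ ) {
            $full =~ s/-/::/g;
            $full= join '::', $base, $full;
        }
        return $full;
    }

    print type_to_class( 'Data::Consumer', $_ ), "\n"
        for 'MySQL', 'Foo-Bar', 'My::Custom::Consumer';

```

So C<< type => 'Foo-Bar' >> would load C<Data::Consumer::Foo::Bar>. This is the inverse of the short name (C<Foo-Bar>) that register() derives for that class.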

Every L subclass constructor supports the following
arguments in addition to any that are subclass-specific. Some of these
arguments are universal, but their meaning differs depending on the
subclass.

=over 4

=item unprocessed

How to tell if the item is unprocessed.

How this argument is interpreted depends on the L
subclass involved.

=item working

How to tell if the item is currently being worked on.

How this argument is interpreted depends on the L
subclass involved.

=item processed

How to tell if the item has already been worked on.

How this argument is interpreted depends on the L
subclass involved.

=item failed

How to tell if processing failed while handling the item.

How this argument is interpreted depends on the L
subclass involved.

=item max_passes => $num_or_undef

Normally C will loop through the data set until it is
exhausted. This parameter sets the maximum number of passes; for
instance, setting it to C<1> results in a single pass through the data
per invocation. C<0> (or any other false value) is treated as meaning
"loop until exhausted".

=item max_processed => $num_or_undef

Maximum number of items to process per invocation.

If set to a false value there is no limit.

=item max_failed => $num_or_undef

Maximum number of failed process attempts that may occur before consume will stop.

If set to a false value there is no limit. Setting this to 1 will cause processing
to stop after the first failure.

=item max_elapsed => $seconds_or_undef

Maximum number of seconds that may have elapsed when starting to process a
new item. Once more than this has elapsed, no further processing
occurs. If C<0> (or any false value), there is no time limit.

=item proceed => $code_ref

This is a callback that may be used to control the looping process in
consume via the C method. See the documentation of
C and C.

=item sweep => $bool

*** NOTE: CURRENTLY THIS OPTION IS DISABLED ***

If this parameter is true and all four states are defined
(C, C, C, C), then consume will
perform a "sweep up" after every pass. The sweep moves "abandoned" files
from the working directory (such as those left behind by a previous
process that segfaulted during processing). Generally this should
not be necessary.

=back
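
As a rough illustration of how the C<max_*> limits combine, the hypothetical helper below decides whether another item may be processed, treating a false limit as "no limit". It is a sketch under stated assumptions, not the consume() implementation. The counter names are made up, and consume() may apply each check at a different point in its loop.

```perl

    use strict;
    use warnings;

    # Hypothetical helper: may another item be processed?
    # $opts holds the constructor limits and $stats holds running counters,
    # where 'passes' counts passes already completed.
    sub can_proceed {
        my ( $opts, $stats )= @_;
        # A false limit means "no limit".
        my $reached= sub {
            my ( $limit, $count )= @_;
            return $limit && $count >= $limit;
        };
        return 0 if $reached->( $opts->{max_passes},    $stats->{passes} );
        return 0 if $reached->( $opts->{max_processed}, $stats->{processed} );
        return 0 if $reached->( $opts->{max_failed},    $stats->{failed} );
        return 0
            if $opts->{max_elapsed} and time - $stats->{start} > $opts->{max_elapsed};
        return 1;
    }

    my %opts= ( max_processed => 2, max_failed => 1 );
    my %stats= ( passes => 0, processed => 2, failed => 0, start => time );
    print can_proceed( \%opts, \%stats ) ? "continue\n" : "stop\n";    # stop: 2 items done

```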


=head2 CLASS->register(@alias)

Used by subclasses to register themselves as a L
subclass and to register any additional aliases that the class may be
identified as. The full class name and a standard short name are always
registered as aliases. The short name is the class name with the leading
C<Data::Consumer::> removed and any remaining C<::> replaced by C<->.

Will throw an exception if any of the aliases are already associated with
a different class.

When called on a subclass in list context, returns the subclass's registered
aliases as a list of alias/class pairs. In scalar context it returns the
number of aliases.

If called on L in list context, returns a list of all
alias/class mappings. In scalar context it returns the number of aliases.
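
The alias bookkeeping done by register() can be sketched in isolation as below. C<register_aliases> and the file-level C<%alias2class> hash are hypothetical stand-ins for the closure variables in this module. The standard short name is derived the same way register() derives it.

```perl

    use strict;
    use warnings;
    use Carp qw(croak);

    my %alias2class;    # alias => class, as kept by register()

    # Hypothetical stand-alone version of register()'s bookkeeping.
    sub register_aliases {
        my ( $class, @extra )= @_;
        my $pack= 'Data::Consumer';
        # Standard short name: strip the base package, then '::' => '-'.
        ( my $std_name= $class ) =~ s/^\Q$pack\E:://;
        $std_name =~ s/::/-/g;
        my @failed;
        for my $name ( $class, $std_name, @extra ) {
            if ( $alias2class{$name} and $alias2class{$name} ne $class ) {
                push @failed, $name;    # clashes with another class
                next;
            }
            $alias2class{$name}= $class;
        }
        croak "aliases already used: @failed" if @failed;
        return $std_name;
    }

    print register_aliases( 'Data::Consumer::Foo::Bar', 'foo' ), "\n";    # Foo-Bar

```

As in register(), aliases that do not clash are still recorded before the exception is thrown.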

=cut



=head2 $class_or_object->debug_warn_hook()

Specify a callback (code reference) to use to capture diagnostic data
produced by a Data::Consumer object.

If called as a class method, sets the default hook for all
Data::Consumer objects that have not explicitly set a hook.

If called as an object method, sets the hook to use for that
object alone.

Returns the current effective hook. Defaults to the C method
for the object, so it can be overridden by a subclass if necessary.

The hook will be called with the arguments

    ($consumer,$level,@lines)

and is not expected to return anything. It is called for every debug_warn()
call regardless of the current debug level, so a custom hook must do its own
level filtering if it wants any. C<$consumer> is a class name rather than an
object when debug_warn() is called as a class method.
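
Because a custom hook receives every message, it is responsible for any level filtering. The self-contained sketch below shows this. It copies the hook selection logic of debug_warn_hook() and debug_warn() into a hypothetical C<Toy::Debug> class, with level handling reduced to a single object field, so that it runs without Data::Consumer installed. With the real module you would call C<< $consumer->debug_warn_hook($code_ref) >> instead.

```perl

    use strict;
    use warnings;

    package Toy::Debug;    # hypothetical stand-in mirroring this module's hook logic
    my $global_hook;

    sub new { my ( $class, $level )= @_; return bless { debug_level => $level }, $class }
    sub debug_level { return $_[0]{debug_level} || 0 }

    sub default_debug_warn {
        my ( $self, $level, @lines )= @_;
        return unless $self->debug_level > $level;
        warn ref($self) || $self, "\t$$\t>>> $_\n" for @lines;
    }

    sub debug_warn_hook {
        my $self= shift;
        if (@_) {
            if ( ref $self ) { $self->{debug_warn_hook}= shift }
            else             { $global_hook= shift }
        }
        return $self->{debug_warn_hook}
            if ref $self and defined $self->{debug_warn_hook};
        return $global_hook || $self->can('default_debug_warn');
    }

    sub debug_warn {
        my ( $self, $level, @lines )= @_;
        $self->debug_warn_hook->( $self, $level, @lines );
    }

    package main;

    my @captured;
    my $obj= Toy::Debug->new(2);
    # Per-object hook that applies the same level test as the default hook.
    $obj->debug_warn_hook( sub {
        my ( $consumer, $level, @lines )= @_;
        push @captured, map { "[$level] $_" } @lines if $consumer->debug_level > $level;
    } );
    $obj->debug_warn( 1, 'acquired item 42' );    # kept: 2 > 1
    $obj->debug_warn( 5, 'verbose detail' );      # dropped: 2 > 5 is false
    print "$_\n" for @captured;

```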

=cut

my $debug_warn_hook;
sub debug_warn_hook {
    my $self= shift;
    if (@_) {
        if (ref $self) {
            $self->{debug_warn_hook}= shift;
        } else {
            $debug_warn_hook= shift;
        }
    }
    if (ref $self and defined $self->{debug_warn_hook}) {
        return $self->{debug_warn_hook};
    }
    return $debug_warn_hook || $self->can('default_debug_warn');
}

=head2 $class_or_object->default_debug_warn($level,@lines)

The default debug hook. Uses warn to output each of C<@lines>, prefixed with
the class name and the process id, but only if the effective debug level is
greater than C<$level>. If C<$level> contains a non-digit character, it is
first reported with a stack trace via cluck.

=cut

sub default_debug_warn {
    my $self= shift;
    my $level= shift;
    cluck($level) if $level=~/\D/;
    my $debug_level= $self->debug_level;
    if ( $debug_level > $level ) {
        warn ref($self) || $self, "\t$$\t>>> $_\n" for @_;
    }
}

=head2 $class_or_object->debug_level($level)

Get or set the debug level. The default hook only outputs a message passed
to debug_warn() if the debug level is greater than that message's C<$level>.

When called as an object method, sets the value for that object
alone. undef is distinct from 0 in that undef results in
the global debug level being used for that object.

When called as a class method, sets the value for all objects
which do not have a defined debug level.

Returns the current effective debug level for the object or
class.
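
The precedence rules above can be expressed as the small function below. A defined per-object level wins, even C<0>. Otherwise the class-wide level applies, and an unset class-wide level counts as C<0>. C<effective_debug_level> is a hypothetical name; it only mirrors the return logic of debug_level().

```perl

    use strict;
    use warnings;

    # Hypothetical helper mirroring the return logic of debug_level().
    # A defined per-object level (even 0) wins; otherwise the global
    # level is used, with an unset global level counting as 0.
    sub effective_debug_level {
        my ( $object_level, $global_level )= @_;
        return $object_level if defined $object_level;
        return $global_level || 0;
    }

    print join( ', ',
        effective_debug_level( undef, 3 ),        # 3: falls back to global
        effective_debug_level( 0, 3 ),            # 0: defined object level wins
        effective_debug_level( undef, undef ),    # 0: nothing set
    ), "\n";

```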

=cut


sub debug_level {
    my $self= shift;
    if (@_) {
        if (ref $self) {
            $self->{debug_level}= shift;
        } else {
            $Debug= shift;
        }
    }
    if (ref $self and defined $self->{debug_level}) {
        return $self->{debug_level};
    }
    return $Debug || 0;
}

=head2 $class_or_object->debug_warn($level,@debug_lines)

Pass C<$level> and C<@debug_lines> to the current debug_warn_hook(). When
called on an object, each line is first prefixed with the object's
C<debug_pfx> value, if one is set. Whether anything is output is up to the
hook: the default hook, default_debug_warn(), only outputs the lines when
the effective debug level is greater than C<$level>.

=cut


sub debug_warn {
    my $self=shift;
    my $level=shift;
    my $hook=$self->debug_warn_hook;
    my $pfx= ref $self ? $self->{debug_pfx} || '' : '';
    $hook->($self,$level,map { $pfx.$_ } @_);
}

BEGIN {
    my %alias2class;
    my %class2alias;
    $Debug and $Debug >= 5 and warn "\n";

    sub register {
        my $class= shift;

        ref $class
            and confess "register() is a class method and cannot be called on an object\n";
        my $pack= __PACKAGE__;

        if ( $class eq $pack ) {
            return wantarray ? %alias2class : 0 + keys %alias2class;
        }

        ( my $std_name= $class ) =~ s/^\Q$pack\E:://;
        $std_name =~ s/::/-/g;

        my @failed;
        for my $name ( $class, $std_name, @_ ) {
            if ( $alias2class{$name} and $alias2class{$name} ne $class ) {
                push @failed, $name;
                next;
            }
            __PACKAGE__->debug_warn( 5, "registered '$name' as an alias of '$class'" );
            $alias2class{$name}= $class;
            $class2alias{$class}{$name}= $class;
        }
        @failed
            and confess "Failed to register aliases for '$class' as they are already used\n",
            join( "\n", map { "\t'$_' is already assigned to '$alias2class{$_}'" } @failed ),
            "\n";
        return wantarray ? %{ $class2alias{$class} } : 0 + keys %{ $class2alias{$class} };
    }

    sub new {
        my ( $class, %opts )= @_;
        ref $class
            and confess "new() is a class method and cannot be called on an object\n";

        if ( $class eq __PACKAGE__ ) {
            my $type= $opts{type}
                or confess "'type' is a mandatory named parameter for $class->new()\n";
            my $full= $type;
            if ( !$alias2class{$full} ) {
                if ( $full !~ /::/ ) {
                    $full =~ s/-/::/g;
                    $full= join '::', $class, $full;
                }
                eval "require $full; 1"
                    or confess "'type' parameter '$type' could not be loaded properly: $@\n";
            }
            my $subclass= $alias2class{$full}
                or confess "'type' parameter '$type' mapped to '$full' which does not seem to exist\n";

            # Hand the remaining arguments to the subclass constructor so that
            # Data::Consumer->new(type=>...) really is equivalent to calling it directly.
            delete $opts{type};
            return $subclass->new(%opts);
        }
        my $object= bless {}, $class;
        $class->debug_warn( 5, "created new object '$object'" );
        return $object;
    }
}

=head2 $object->last_id()

Returns the identifier for the last item acquired.

Returns undef if acquire has never been called or if the last
attempt to acquire data failed because none was available.

=cut

sub last_id {
    my $self= shift;
    return $self->{last_id};
}

# Until I figure out how to make gedit handle begin/end directives this has to
# stay commented out
#=begin dev
#
#=head2 $object->_mark_as($type,$id)
#
#** Must be overridden **
#
#Mark an item as a particular type if the object defines that type.
#
#This is wrapped by mark_as() for error checking, so you are guaranteed
#that $type will be one of
#
#    'unprocessed', 'working', 'processed', 'failed'
#
#and that $object->{$type} will be defined, and that $id will be from
#the currently acquired item.
#
#=end dev

=head2 $object->mark_as($type)

Mark an item as a particular type if the object defines that type. If an
id is passed as an additional argument, that item is marked; otherwise the
item returned by last_id() is marked. Dies if C<$type> is not one of the
allowed types or if there is no item to mark, and does nothing if the object
does not define C<$type>.

Allowed types are C, C, C, C.

=cut

sub _mark_as { confess "must be overridden" }

BEGIN {
    my ( %valid, @valid );
    @valid= qw( unprocessed working processed failed );
    @valid{@valid}= ( 1 .. @valid );

    sub mark_as {
        my $self= shift @_;
        my $key= shift @_;

        $valid{$key}
            or confess "Unknown type in mark_as(), valid options are ",
            join( ", ", map { "'$_'" } @valid ),
            "\n";

        my $id= @_ ? shift @_ : $self->last_id;
        defined $id
            or confess "Nothing acquired to be marked as '$key' in mark_as.\n";

        return unless defined $self->{$key};
        return $self->_mark_as( $key, $id );
    }
}

=head2 $object->process($callback)

Marks the current item as C and processes it using the
C<$callback>. If the C<$callback> dies then the item is marked as
C; otherwise the item is marked as C once the
C<$callback> returns. The return value of the C<$callback> is ignored.

C<$callback> will be called with at least two arguments: the first is
the $consumer object itself, and the second is an identifier for the
current record. Normally additional arguments that are likely to be useful
are provided as well, on a per-subclass basis. For example
L will pass in the consumer object, the id of the record to
be processed, and, for convenience, a copy of the consumer's database handle.
On the other hand L will pass in the
consumer object, followed by a file specification for the file to be
processed, an open filehandle to the file, and the filename itself (with
no path).

The callback may call the methods 'leave', 'ignore', 'fail', and 'halt' on
the consumer object before returning, typically by doing something like

    return $consumer->ignore;

This allows the callback to send specific signals to consume:

    leave  : return the item to the unprocessed state after the callback returns.
    ignore : return the item to the unprocessed state after the callback returns
             and never attempt to process it again with this consumer object.
    fail   : same result as dying in a callback, except without throwing an exception,
             for situations where there might be $SIG{__DIE__} hooks to worry about.
    halt   : stop the consume() process after this has been executed.

For further details always consult the relevant subclass's documentation for
C.
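
Leaving aside locking and the call to error(), the final state of the item depends only on the outcome of the callback. The hypothetical helper below mirrors the branches at the end of process(); C<final_state> is not part of this module. An error returned by the callback, or a message recorded with fail(), takes priority. Next comes a leave() request, which ignore() also makes. Otherwise the item is marked as processed.

```perl

    use strict;
    use warnings;

    # Hypothetical helper mirroring the branches at the end of process():
    # returns the state the current item ends up in.
    sub final_state {
        my (%args)= @_;    # error => ..., fail => ..., defer_leave => ...
        my $error= $args{error};
        $error ||= $args{fail};    # a message recorded via fail()
        return 'failed'      if $error;
        return 'unprocessed' if $args{defer_leave};    # leave() or ignore()
        return 'processed';
    }

    # Assumption: _do_callback() traps the exception and returns its text.
    my $error= eval { die "boom\n"; 1 } ? undef : $@;
    print final_state( error => $error ), "\n";
    print final_state( defer_leave => 1 ), "\n";
    print final_state(), "\n";

```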

=cut

sub process {
    my $self= shift;
    my $callback= shift;
    delete $self->{fail};
    my $id= $self->last_id;
    defined $id
        or $self->error("Undefined last_id. Nothing acquired yet?");
    $self->mark_as('working');
    local $Cmd;
    delete $self->{defer_leave};
    my $error= $self->_do_callback($callback);
    $error ||= $self->{fail};
    if ( $error ) {
        $self->mark_as('failed');
        $self->error($error);
    } else {
        if ($self->{defer_leave}) {
            $self->mark_as('unprocessed');
        } else {
            $self->mark_as('processed');
        }
    }
    return 1;
}


=head2 $consumer->leave()

Sometimes it's useful to defer processing. When called from within a
consume/process callback, this method causes the item to be marked as
'unprocessed' after the callback returns (so long as the callback does
not die).

Typically this is invoked as

    return $consumer->leave;

from within a consume/process callback.

Returns $consumer. Will die if no 'unprocessed' state is defined.

=cut

sub leave {
    my $self= shift;
    confess("Can't leave as 'unprocessed' is undefined!") if not defined $self->{unprocessed};
    $self->{defer_leave}++;
    return $self;
}

=head2 $consumer->ignore(@list)

This can be used to cause acquire to ignore each item in @list.

If @list is empty, it is assumed to be called from within
consume/process: the currently acquired item is marked as ignored and
C<< $consumer->leave() >> is called.

Returns $consumer. When called with an empty @list, will die (via leave())
if no 'unprocessed' state is defined.
|
|
|
|
|
|
|
|
633
|
|
|
|
|
|
|
=cut |
|
634
|
|
|
|
|
|
|
|
|
635
|
|
|
|
|
|
|
|
|
636
|
|
|
|
|
|
|
sub ignore { |
|
637
|
0
|
|
|
0
|
1
|
0
|
my $self= shift; |
|
638
|
0
|
0
|
|
|
|
0
|
if (@_) { |
|
639
|
0
|
|
|
|
|
0
|
for my $id (@_) { |
|
640
|
0
|
|
|
|
|
0
|
$self->{ignore}{$id}++; |
|
641
|
|
|
|
|
|
|
} |
|
642
|
|
|
|
|
|
|
} else { |
|
643
|
0
|
|
|
|
|
0
|
my $id= $self->last_id; |
|
644
|
0
|
|
|
|
|
0
|
$self->{ignore}{$id}++; |
|
645
|
0
|
|
|
|
|
0
|
$self->leave; |
|
646
|
|
|
|
|
|
|
} |
|
647
|
0
|
|
|
|
|
0
|
return $self; |
|
648
|
|
|
|
|
|
|
} |
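Here is a rough sketch of the bookkeeping behind ignore(), assuming a simple in-memory queue. `Sketch::Ignoring` and its `queue` field are hypothetical. The sketch counts ignored ids in a hash, as ignore() does, and has its own acquire() skip them. Unlike the real method, the no-argument form here does not call leave().

```perl
use strict;
use warnings;

package Sketch::Ignoring;    # hypothetical; models only the ignore bookkeeping

sub new {
    my ( $class, @ids )= @_;
    return bless { queue => [@ids], ignore => {}, last_id => undef }, $class;
}

sub last_id { $_[0]{last_id} }

sub ignore {
    my $self= shift;
    if (@_) {
        $self->{ignore}{$_}++ for @_;           # explicit list: remember each id
    } else {
        $self->{ignore}{ $self->last_id }++;    # no list: ignore the current item
    }
    return $self;
}

# Hand out the next queued id that has not been ignored.
sub acquire {
    my $self= shift;
    while ( @{ $self->{queue} } ) {
        my $id= shift @{ $self->{queue} };
        next if $self->{ignore}{$id};
        return $self->{last_id}= $id;
    }
    return;
}

package main;

my $c= Sketch::Ignoring->new(qw(a b c d));
$c->ignore(qw(b d));
my @seen;
while ( defined( my $id= $c->acquire ) ) { push @seen, $id }
print "@seen\n";    # a c
```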
|
649
|
|
|
|
|
|
|
|
|
650
|
|
|
|
|
|
|
=head2 $consumer->fail($message) |
|
651
|
|
|
|
|
|
|
|
|
652
|
|
|
|
|
|
|
Same as doing C<die($message)> from within a consume/process callback, except |
|
653
|
|
|
|
|
|
|
that no exception is thrown (no C<$SIG{__DIE__}> callbacks are invoked) and |
|
654
|
|
|
|
|
|
|
the error is deferred until the callback actually returns. |
|
655
|
|
|
|
|
|
|
|
|
656
|
|
|
|
|
|
|
Typically used as |
|
657
|
|
|
|
|
|
|
|
|
658
|
|
|
|
|
|
|
return $consumer->fail; |
|
659
|
|
|
|
|
|
|
|
|
660
|
|
|
|
|
|
|
from within a consume/process callback. |
|
661
|
|
|
|
|
|
|
|
|
662
|
|
|
|
|
|
|
Returns the $consumer object. |
|
663
|
|
|
|
|
|
|
|
|
664
|
|
|
|
|
|
|
=cut |
|
665
|
|
|
|
|
|
|
|
|
666
|
|
|
|
|
|
|
sub fail { |
|
667
|
0
|
|
|
0
|
1
|
0
|
my $self= shift; |
|
668
|
0
|
|
|
|
|
0
|
$self->{fail}= shift; |
|
669
|
0
|
|
|
|
|
0
|
return $self; |
|
670
|
|
|
|
|
|
|
} |
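A `$SIG{__DIE__}` counter shows the difference from die(), because that hook fires even for a die() caught by eval. `Sketch::Failing` is a hypothetical stand-in whose process() treats a pending fail() as a failure. The default message used for a bare fail() is an assumption of the sketch, not behaviour of the real module.

```perl
use strict;
use warnings;

package Sketch::Failing;    # hypothetical; models only fail() and the failure check

sub new { return bless {}, shift }

sub fail {
    my $self= shift;
    # assumption: substitute a default so a bare fail() is still detectable
    $self->{fail}= @_ ? shift : 'failed';
    return $self;
}

# Run one callback and report whether the item would be marked failed.
sub process {
    my ( $self, $callback )= @_;
    local $self->{fail};    # each item starts without a pending failure
    my $ok= eval { $callback->($self); 1 };
    return ( !$ok || defined $self->{fail} ) ? 'failed' : 'processed';
}

package main;

my $die_hooks= 0;
$SIG{__DIE__}= sub { $die_hooks++ };    # counts every die(), even inside eval

my $c= Sketch::Failing->new;
my $via_die= $c->process( sub { die "bad record\n" } );
my $via_fail= $c->process( sub { return $_[0]->fail("bad record") } );
print "$via_die $via_fail hooks=$die_hooks\n";    # failed failed hooks=1
```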
|
671
|
|
|
|
|
|
|
|
|
672
|
|
|
|
|
|
|
=head2 $consumer->halt() |
|
673
|
|
|
|
|
|
|
|
|
674
|
|
|
|
|
|
|
Causes consume() to halt processing and exit once |
|
675
|
|
|
|
|
|
|
the callback returns. Typically invoked like |
|
676
|
|
|
|
|
|
|
|
|
677
|
|
|
|
|
|
|
return $consumer->halt; |
|
678
|
|
|
|
|
|
|
|
|
679
|
|
|
|
|
|
|
or |
|
680
|
|
|
|
|
|
|
|
|
681
|
|
|
|
|
|
|
return $consumer->fail->halt; |
|
682
|
|
|
|
|
|
|
|
|
683
|
|
|
|
|
|
|
Returns the consumer object. |
|
684
|
|
|
|
|
|
|
|
|
685
|
|
|
|
|
|
|
=cut |
|
686
|
|
|
|
|
|
|
|
|
687
|
|
|
|
|
|
|
|
|
688
|
|
|
|
|
|
|
sub halt { |
|
689
|
0
|
|
|
0
|
1
|
0
|
my $self= shift; |
|
690
|
0
|
|
|
|
|
0
|
$self->{halt}++; |
|
691
|
0
|
|
|
|
|
0
|
return $self; |
|
692
|
|
|
|
|
|
|
} |
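Here is a sketch of the halt flag in isolation, assuming a plain list of items instead of a real data source. The hypothetical `Sketch::Halting` checks the flag before taking each new item. The item whose callback calls halt() therefore still completes. Afterwards the flag is cleared, much as the real consume() does with `delete $self->{halt}`.

```perl
use strict;
use warnings;

package Sketch::Halting;    # hypothetical; only the halt flag is modelled

sub new { return bless {}, shift }
sub halt { my $self= shift; $self->{halt}++; return $self }

sub consume {
    my ( $self, $callback, @items )= @_;
    my @done;
    for my $item (@items) {
        last if $self->{halt};    # checked before each new item
        $callback->( $item, $self );
        push @done, $item;        # the item that called halt() still completes
    }
    delete $self->{halt};         # clear the flag so the object can be reused
    return \@done;
}

package main;

my $c= Sketch::Halting->new;
my $done= $c->consume(
    sub {
        my ( $item, $consumer )= @_;
        return $consumer->halt if $item == 3;
    },
    1 .. 10
);
print "@$done\n";    # 1 2 3
```

halt() returns the object, which is what lets it be chained, as in `return $consumer->fail->halt;`.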
|
693
|
|
|
|
|
|
|
|
|
694
|
|
|
|
|
|
|
|
|
695
|
|
|
|
|
|
|
|
|
696
|
|
|
|
|
|
|
=head2 $object->is_ignored($id) |
|
697
|
|
|
|
|
|
|
|
|
698
|
|
|
|
|
|
|
Returns true if an item has been set to be ignored. If $id is omitted |
|
699
|
|
|
|
|
|
|
it defaults to C<last_id>. Returns false if no id is available. |
|
700
|
|
|
|
|
|
|
|
|
701
|
|
|
|
|
|
|
=cut |
|
702
|
|
|
|
|
|
|
|
|
703
|
|
|
|
|
|
|
sub is_ignored { |
|
704
|
240
|
|
|
240
|
1
|
467
|
my $self= shift; |
|
705
|
240
|
50
|
|
|
|
663
|
my $id= @_ ? shift @_ : $self->last_id; |
|
706
|
240
|
50
|
|
|
|
612
|
return if !defined $id; |
|
707
|
240
|
50
|
|
|
|
999
|
return $self->{ignore}{$id} ? 1 : 0 |
|
708
|
|
|
|
|
|
|
} |
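is_ignored() uses `@_ ? shift @_ : $self->last_id` rather than a defined-or fallback. As a result, only a missing argument falls back to last_id; an explicit undef returns false. The bare `return` also yields an empty list in list context. The hypothetical `Sketch::Lookup` class below, whose last_id is fixed to 'cur', shows both points.

```perl
use strict;
use warnings;

package Sketch::Lookup;    # hypothetical; last_id is hard-wired for the demo

sub new {
    my ( $class, %ignore )= @_;
    return bless { ignore => {%ignore}, last_id => 'cur' }, $class;
}

sub last_id { $_[0]{last_id} }

sub is_ignored {
    my $self= shift;
    my $id= @_ ? shift @_ : $self->last_id;    # only a *missing* argument falls back
    return if !defined $id;
    return $self->{ignore}{$id} ? 1 : 0;
}

package main;

my $c= Sketch::Lookup->new( cur => 1, other => 0 );
my $default= $c->is_ignored;               # falls back to last_id 'cur'
my $explicit= $c->is_ignored('other');
my $undef= $c->is_ignored(undef);          # explicit undef does not fall back
my @in_list= ( $c->is_ignored(undef) );    # bare return gives an empty list here
printf "%d %d %s %d\n", $default, $explicit, defined $undef ? $undef : 'undef', scalar @in_list;
# 1 0 undef 0
```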
|
709
|
|
|
|
|
|
|
|
|
710
|
|
|
|
|
|
|
=head2 $object->reset() |
|
711
|
|
|
|
|
|
|
|
|
712
|
|
|
|
|
|
|
Reset the state of the object. |
|
713
|
|
|
|
|
|
|
|
|
714
|
|
|
|
|
|
|
=head2 $object->acquire() |
|
715
|
|
|
|
|
|
|
|
|
716
|
|
|
|
|
|
|
Acquire an item to be processed. |
|
717
|
|
|
|
|
|
|
|
|
718
|
|
|
|
|
|
|
Returns an identifier to be used to identify the item acquired. |
|
719
|
|
|
|
|
|
|
|
|
720
|
|
|
|
|
|
|
=head2 $object->release() |
|
721
|
|
|
|
|
|
|
|
|
722
|
|
|
|
|
|
|
Release any locks on the currently held item. |
|
723
|
|
|
|
|
|
|
|
|
724
|
|
|
|
|
|
|
Normally there is no need to call this directly. |
|
725
|
|
|
|
|
|
|
|
|
726
|
|
|
|
|
|
|
=cut |
|
727
|
|
|
|
|
|
|
|
|
728
|
0
|
|
|
0
|
1
|
0
|
sub reset { confess "abstract method must be overridden by subclass\n"; } |
|
729
|
0
|
|
|
0
|
1
|
0
|
sub acquire { confess "abstract method must be overridden by subclass\n"; } |
|
730
|
0
|
|
|
0
|
1
|
0
|
sub release { confess "abstract method must be overridden by subclass\n"; } |
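reset(), acquire() and release() are abstract: the base class confesses, and a subclass must supply them. The sketch below shows that pattern with two hypothetical classes, `Sketch::Base` and an array-backed `Sketch::Array`. Here the "lock" is just the held item; it stands in for whatever a real subclass would lock.

```perl
use strict;
use warnings;

package Sketch::Base;    # hypothetical stand-in for the abstract base class
use Carp qw(confess);

sub new { my $class= shift; return bless {@_}, $class }
sub reset   { confess "abstract method must be overridden by subclass\n" }
sub acquire { confess "abstract method must be overridden by subclass\n" }
sub release { confess "abstract method must be overridden by subclass\n" }

package Sketch::Array;    # hypothetical in-memory subclass
our @ISA= ('Sketch::Base');

sub reset { my $self= shift; $self->{pos}= 0; return $self }

sub acquire {
    my $self= shift;
    $self->release;    # drop the previous "lock" before taking a new item
    return if $self->{pos} >= @{ $self->{items} };
    return $self->{held}= $self->{items}[ $self->{pos}++ ];
}

sub release { my $self= shift; delete $self->{held}; return $self }

package main;

my $q= Sketch::Array->new( items => [qw(x y)] );
$q->reset;
my @got;
while ( defined( my $id= $q->acquire ) ) { push @got, $id }
$q->release;
print "@got\n";    # x y
```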
|
731
|
|
|
|
|
|
|
|
|
732
|
|
|
|
|
|
|
=head2 $object->error() |
|
733
|
|
|
|
|
|
|
|
|
734
|
|
|
|
|
|
|
Calls the C<error> callback if the user has provided one, otherwise |
|
735
|
|
|
|
|
|
|
calls C<confess>. Probably not all that useful for an end user. |
|
736
|
|
|
|
|
|
|
|
|
737
|
|
|
|
|
|
|
=cut |
|
738
|
|
|
|
|
|
|
|
|
739
|
|
|
|
|
|
|
sub error { |
|
740
|
0
|
|
|
0
|
1
|
0
|
my $self= shift; |
|
741
|
0
|
0
|
|
|
|
0
|
if ( $self->{error} ) { |
|
742
|
0
|
|
|
|
|
0
|
$self->{error}->(@_); |
|
743
|
|
|
|
|
|
|
} else { |
|
744
|
0
|
|
|
|
|
0
|
confess @_; |
|
745
|
|
|
|
|
|
|
} |
|
746
|
|
|
|
|
|
|
} |
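Here is a sketch of the dispatch in error(), using a hypothetical `Sketch::Errors` class that stores an optional `error` callback in `$self->{error}`, where the method above looks for it. With a callback, the message is handed over and nothing dies. Without one, confess() dies with a stack trace.

```perl
use strict;
use warnings;

package Sketch::Errors;    # hypothetical; mirrors the error() dispatch only
use Carp qw(confess);

sub new { my ( $class, %args )= @_; return bless {%args}, $class }

sub error {
    my $self= shift;
    if ( $self->{error} ) {
        $self->{error}->(@_);    # user-supplied handler decides what happens
    } else {
        confess @_;              # default: die with a stack trace
    }
}

package main;

my @logged;
my $quiet= Sketch::Errors->new( error => sub { push @logged, "@_" } );
$quiet->error("could not move item 42");

my $loud= Sketch::Errors->new;
my $died= !eval { $loud->error("could not move item 42"); 1 };
print scalar(@logged), " logged, died=", ( $died ? 1 : 0 ), "\n";    # 1 logged, died=1
```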
|
747
|
|
|
|
|
|
|
|
|
748
|
|
|
|
|
|
|
=head2 $object->consume($callback) |
|
749
|
|
|
|
|
|
|
|
|
750
|
|
|
|
|
|
|
Consumes a data resource until it is exhausted using C<acquire()>, |
|
751
|
|
|
|
|
|
|
C<process()>, and C<release()> as appropriate. Normally this is the main |
|
752
|
|
|
|
|
|
|
method used by external processes. |
|
753
|
|
|
|
|
|
|
|
|
754
|
|
|
|
|
|
|
Before each attempt to acquire a new resource, and once at the end of |
|
755
|
|
|
|
|
|
|
each pass, consume() will call C<proceed()> to determine whether it can do so. |
|
756
|
|
|
|
|
|
|
The user may hook into this by specifying a C<proceed> callback in the constructor. |
|
757
|
|
|
|
|
|
|
C<proceed()> is called with no arguments when it is in the inner loop |
|
758
|
|
|
|
|
|
|
(per item), and with the number of passes at the end of each pass |
|
759
|
|
|
|
|
|
|
(starting with 1). |
|
760
|
|
|
|
|
|
|
|
|
761
|
|
|
|
|
|
|
=head2 $object->proceed($passes) |
|
762
|
|
|
|
|
|
|
|
|
763
|
|
|
|
|
|
|
Returns true if the conditions specified at construction time are |
|
764
|
|
|
|
|
|
|
satisfied and processing may proceed. Returns false otherwise. |
|
765
|
|
|
|
|
|
|
|
|
766
|
|
|
|
|
|
|
If the user has specified a C<proceed> callback in the constructor, then |
|
767
|
|
|
|
|
|
|
this will be executed before any other rules are applied, with a |
|
768
|
|
|
|
|
|
|
reference to the current C<$object>, a reference to the runstats, and if |
|
769
|
|
|
|
|
|
|
being called at the end of a pass, the number of passes. |
|
770
|
|
|
|
|
|
|
|
|
771
|
|
|
|
|
|
|
If this callback returns true, then the other rules will be applied, |
|
772
|
|
|
|
|
|
|
and only if all other conditions from the constructor are satisfied |
|
773
|
|
|
|
|
|
|
will C<proceed()> itself return true. |
|
774
|
|
|
|
|
|
|
|
|
775
|
|
|
|
|
|
|
=head2 $object->runstats() |
|
776
|
|
|
|
|
|
|
|
|
777
|
|
|
|
|
|
|
Returns a reference to a hash of statistics about the last (or currently running) |
|
778
|
|
|
|
|
|
|
execution of consume(). Example: |
|
779
|
|
|
|
|
|
|
|
|
780
|
|
|
|
|
|
|
{ |
|
781
|
|
|
|
|
|
|
'passes' => 2, |
|
782
|
|
|
|
|
|
|
'processed_this_pass' => 0, |
|
783
|
|
|
|
|
|
|
'processed' => 3, |
|
784
|
|
|
|
|
|
|
'start_time' => 1209750962, |
|
785
|
|
|
|
|
|
|
'failed' => 0, |
|
786
|
|
|
|
|
|
|
'elapsed' => 0, |
|
787
|
|
|
|
|
|
|
'end_time' => 1209750962, |
|
788
|
|
|
|
|
|
|
'failed_this_pass' => 0 |
|
789
|
|
|
|
|
|
|
} |
|
790
|
|
|
|
|
|
|
|
|
791
|
|
|
|
|
|
|
Note that start_time and end_time are Unix timestamps, and elapsed is in seconds. |
|
792
|
|
|
|
|
|
|
|
|
793
|
|
|
|
|
|
|
=cut |
|
794
|
|
|
|
|
|
|
|
|
795
|
0
|
|
|
0
|
1
|
0
|
sub runstats { $_[0]->{runstats} } |
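Because runstats() returns a plain hash reference, it is easy to log after (or during) a run. The minimal sketch below assumes a hypothetical helper, `describe_runstats`, and feeds it the example hash from the documentation above. POSIX::strftime converts the Unix start_time to UTC.

```perl
use strict;
use warnings;
use POSIX qw(strftime);

# Hypothetical helper: turn a runstats hash into a one-line log message.
sub describe_runstats {
    my ($rs)= @_;
    my $started= strftime( '%Y-%m-%dT%H:%M:%SZ', gmtime $rs->{start_time} );
    return sprintf 'started %s, %d pass(es), %d processed, %d failed, %ds elapsed',
        $started, @{$rs}{qw(passes processed failed elapsed)};
}

# The example hash from the documentation.
my %runstats= (
    passes     => 2,          processed_this_pass => 0, processed => 3,
    start_time => 1209750962, failed              => 0, elapsed   => 0,
    end_time   => 1209750962, failed_this_pass    => 0,
);
my $line= describe_runstats( \%runstats );
print "$line\n";
# started 2008-05-02T17:56:02Z, 2 pass(es), 3 processed, 0 failed, 0s elapsed
```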
|
796
|
|
|
|
|
|
|
|
|
797
|
|
|
|
|
|
|
sub proceed { |
|
798
|
70
|
|
|
70
|
1
|
245
|
my $self= shift; |
|
799
|
70
|
|
|
|
|
183
|
my $runstats= $self->{runstats}; |
|
800
|
70
|
|
|
|
|
201
|
$runstats->{end_time}= time; |
|
801
|
70
|
|
|
|
|
233
|
$runstats->{elapsed}= $runstats->{end_time} - $runstats->{start_time}; |
|
802
|
|
|
|
|
|
|
|
|
803
|
70
|
50
|
|
|
|
542
|
if ( my $cb= $self->{proceed} ) { |
|
804
|
0
|
0
|
|
|
|
0
|
$cb->( $self, $self->{runstats}, @_ ) # pass on the $passes argument if it's there |
|
805
|
|
|
|
|
|
|
or return; |
|
806
|
|
|
|
|
|
|
} |
|
807
|
70
|
|
|
|
|
335
|
for my $key (qw(elapsed passes processed failed)) { |
|
808
|
280
|
|
|
|
|
619
|
my $max= "max_$key"; |
|
809
|
280
|
50
|
33
|
|
|
1195
|
return if $self->{$max} && $runstats->{$key} >= $self->{$max}; |
|
810
|
|
|
|
|
|
|
} |
|
811
|
70
|
50
|
|
|
|
240
|
return if $self->{halt}; |
|
812
|
70
|
|
|
|
|
483
|
return 1; |
|
813
|
|
|
|
|
|
|
} |
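The rule order in proceed() can be exercised in isolation: the user callback first, then the max_* limits, then a pending halt. `Sketch::Proceed` is a hypothetical copy of that logic with pre-initialised runstats. It also shows that the callback receives `$passes` only for the end-of-pass call.

```perl
use strict;
use warnings;

package Sketch::Proceed;    # hypothetical; follows the rule order of proceed()

sub new {
    my ( $class, %args )= @_;
    return bless {
        %args,
        runstats => { start_time => time, passes => 0, processed => 0, failed => 0 },
    }, $class;
}

sub proceed {
    my $self= shift;
    my $rs= $self->{runstats};
    $rs->{end_time}= time;
    $rs->{elapsed}= $rs->{end_time} - $rs->{start_time};
    if ( my $cb= $self->{proceed} ) {
        $cb->( $self, $rs, @_ ) or return;    # 1. the user callback can veto first
    }
    for my $key (qw(elapsed passes processed failed)) {    # 2. then the max_* limits
        my $max= "max_$key";
        return if $self->{$max} && $rs->{$key} >= $self->{$max};
    }
    return if $self->{halt};                  # 3. finally a pending halt()
    return 1;
}

package main;

my @seen_passes;
my $c= Sketch::Proceed->new(
    max_processed => 2,
    proceed       => sub { my ( $obj, $rs, $passes )= @_; push @seen_passes, $passes; 1 },
);
$c->{runstats}{processed}= 1;
my $before= $c->proceed() ? 1 : 0;    # inner-loop style call: no $passes
$c->{runstats}{processed}= 2;
my $after= $c->proceed(1) ? 1 : 0;    # end of pass 1: the limit is now reached
print "before=$before after=$after\n";    # before=1 after=0
```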
|
814
|
|
|
|
|
|
|
|
|
815
|
|
|
|
|
|
|
sub consume { |
|
816
|
5
|
|
|
5
|
1
|
188
|
my $self= shift; |
|
817
|
5
|
|
|
|
|
10
|
my $callback= shift; |
|
818
|
|
|
|
|
|
|
|
|
819
|
5
|
|
|
|
|
12
|
my $passes= 0; |
|
820
|
|
|
|
|
|
|
|
|
821
|
5
|
50
|
|
|
|
14
|
unless ($self->{runstats}) { |
|
822
|
5
|
|
|
|
|
17
|
$self->{runstats}= {}; |
|
823
|
|
|
|
|
|
|
$self->{runstats}{$_}= 0 |
|
824
|
5
|
|
|
|
|
40
|
for qw(passes processed failed processed_this_pass failed_this_pass); |
|
825
|
|
|
|
|
|
|
} |
|
826
|
|
|
|
|
|
|
|
|
827
|
5
|
|
|
|
|
11
|
my $runstats= $self->{runstats}; |
|
828
|
5
|
|
|
|
|
25
|
$runstats->{start_time}= time; |
|
829
|
|
|
|
|
|
|
|
|
830
|
5
|
|
|
|
|
28
|
$self->reset(); |
|
831
|
|
|
|
|
|
|
do { |
|
832
|
10
|
|
|
|
|
28
|
++$runstats->{passes}; |
|
833
|
10
|
|
|
|
|
24
|
$runstats->{processed_this_pass}= $runstats->{failed_this_pass}= 0; |
|
834
|
10
|
|
66
|
|
|
68
|
while ( $self->proceed && defined( my $item= $self->acquire ) ) { |
|
835
|
|
|
|
|
|
|
eval { |
|
836
|
50
|
|
|
|
|
283
|
$self->process($callback); |
|
837
|
50
|
|
|
|
|
250
|
$runstats->{processed_this_pass}++; |
|
838
|
50
|
|
|
|
|
139
|
$runstats->{processed}++; |
|
839
|
50
|
|
|
|
|
598
|
1; |
|
840
|
|
|
|
|
|
|
} |
|
841
|
50
|
50
|
|
|
|
120
|
or do { |
|
842
|
0
|
|
|
|
|
0
|
$runstats->{failed_this_pass}++; |
|
843
|
0
|
|
|
|
|
0
|
$runstats->{failed}++; |
|
844
|
|
|
|
|
|
|
|
|
845
|
|
|
|
|
|
|
# quotes force string copy |
|
846
|
0
|
|
|
|
|
0
|
$self->debug_warn(5, "Failed during \$self->process(\$callback): $@"); |
|
847
|
|
|
|
|
|
|
} |
|
848
|
|
|
|
|
|
|
} |
|
849
|
|
|
|
|
|
|
} while $self->proceed( $runstats->{passes} ) |
|
850
|
5
|
|
66
|
|
|
10
|
&& $runstats->{processed_this_pass}; |
|
851
|
|
|
|
|
|
|
|
|
852
|
|
|
|
|
|
|
# if we still hold a lock let it go. |
|
853
|
5
|
|
|
|
|
17
|
delete $self->{halt}; |
|
854
|
5
|
|
|
|
|
30
|
$self->release; |
|
855
|
5
|
|
|
|
|
21
|
return $runstats; |
|
856
|
|
|
|
|
|
|
} |
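Here is a stripped-down sketch of the pass loop in consume(), assuming an in-memory queue in place of a real data source. The hypothetical `Sketch::Loop` keeps the same counters, and a callback that dies is counted in failed. A pass that processes nothing ends the run. That is why a run over a finite queue reports two passes with processed_this_pass at 0, as in the runstats example. Marking items as processed or failed is left out.

```perl
use strict;
use warnings;

package Sketch::Loop;    # hypothetical; reproduces only the pass/item loop of consume()

sub new {
    my ( $class, @items )= @_;
    return bless { queue => [@items], runstats => {} }, $class;
}
sub acquire { my $self= shift; return shift @{ $self->{queue} } }
sub proceed { return !$_[0]{halt} }    # no max_* limits in this sketch

sub consume {
    my ( $self, $callback )= @_;
    my $rs= $self->{runstats};
    $rs->{$_}= 0 for qw(passes processed failed processed_this_pass failed_this_pass);
    do {
        ++$rs->{passes};
        $rs->{processed_this_pass}= $rs->{failed_this_pass}= 0;
        while ( $self->proceed && defined( my $item= $self->acquire ) ) {
            eval {
                $callback->( $item, $self );
                $rs->{processed_this_pass}++;
                $rs->{processed}++;
                1;
            } or do {
                $rs->{failed_this_pass}++;
                $rs->{failed}++;
            };
        }
    } while $self->proceed( $rs->{passes} ) && $rs->{processed_this_pass};
    return $rs;
}

package main;

my $rs= Sketch::Loop->new( 1 .. 5 )->consume( sub { die "bad item\n" if $_[0] == 3 } );
print "passes=$rs->{passes} processed=$rs->{processed} failed=$rs->{failed}\n";
# passes=2 processed=4 failed=1
```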
|
857
|
|
|
|
|
|
|
|
|
858
|
|
|
|
|
|
|
|
|
859
|
|
|
|
|
|
|
=head1 AUTHOR |
|
860
|
|
|
|
|
|
|
|
|
861
|
|
|
|
|
|
|
Yves Orton, C<< >> |
|
862
|
|
|
|
|
|
|
|
|
863
|
|
|
|
|
|
|
=head1 BUGS |
|
864
|
|
|
|
|
|
|
|
|
865
|
|
|
|
|
|
|
Please report any bugs or feature requests to |
|
866
|
|
|
|
|
|
|
C, or through the web interface at |
|
867
|
|
|
|
|
|
|
L. |
|
868
|
|
|
|
|
|
|
|
|
869
|
|
|
|
|
|
|
I will be notified, and then you'll automatically be notified of progress on |
|
870
|
|
|
|
|
|
|
your bug as I make changes. |
|
871
|
|
|
|
|
|
|
|
|
872
|
|
|
|
|
|
|
=head1 SUPPORT |
|
873
|
|
|
|
|
|
|
|
|
874
|
|
|
|
|
|
|
You can find documentation for this module with the perldoc command. |
|
875
|
|
|
|
|
|
|
|
|
876
|
|
|
|
|
|
|
perldoc Data::Consumer |
|
877
|
|
|
|
|
|
|
|
|
878
|
|
|
|
|
|
|
|
|
879
|
|
|
|
|
|
|
You can also look for information at: |
|
880
|
|
|
|
|
|
|
|
|
881
|
|
|
|
|
|
|
=over 4 |
|
882
|
|
|
|
|
|
|
|
|
883
|
|
|
|
|
|
|
=item * RT: CPAN's request tracker |
|
884
|
|
|
|
|
|
|
|
|
885
|
|
|
|
|
|
|
L |
|
886
|
|
|
|
|
|
|
|
|
887
|
|
|
|
|
|
|
=item * AnnoCPAN: Annotated CPAN documentation |
|
888
|
|
|
|
|
|
|
|
|
889
|
|
|
|
|
|
|
L |
|
890
|
|
|
|
|
|
|
|
|
891
|
|
|
|
|
|
|
=item * CPAN Ratings |
|
892
|
|
|
|
|
|
|
|
|
893
|
|
|
|
|
|
|
L |
|
894
|
|
|
|
|
|
|
|
|
895
|
|
|
|
|
|
|
=item * Search CPAN |
|
896
|
|
|
|
|
|
|
|
|
897
|
|
|
|
|
|
|
L |
|
898
|
|
|
|
|
|
|
|
|
899
|
|
|
|
|
|
|
=back |
|
900
|
|
|
|
|
|
|
|
|
901
|
|
|
|
|
|
|
|
|
902
|
|
|
|
|
|
|
=head1 ACKNOWLEDGEMENTS |
|
903
|
|
|
|
|
|
|
|
|
904
|
|
|
|
|
|
|
Igor Sutton for ideas, testing and support |
|
905
|
|
|
|
|
|
|
|
|
906
|
|
|
|
|
|
|
=head1 COPYRIGHT & LICENSE |
|
907
|
|
|
|
|
|
|
|
|
908
|
|
|
|
|
|
|
Copyright 2008, 2010, 2011 Yves Orton, all rights reserved. |
|
909
|
|
|
|
|
|
|
|
|
910
|
|
|
|
|
|
|
This program is free software; you can redistribute it and/or modify it |
|
911
|
|
|
|
|
|
|
under the same terms as Perl itself. |
|
912
|
|
|
|
|
|
|
|
|
913
|
|
|
|
|
|
|
=cut |
|
914
|
|
|
|
|
|
|
|
|
915
|
|
|
|
|
|
|
1; # End of Data::Consumer |
|
916
|
|
|
|
|
|
|
|