File Coverage

blib/lib/Data/Consumer/MySQL2.pm
Criterion Covered Total %
statement 22 124 17.7
branch 0 60 0.0
condition 0 13 0.0
subroutine 8 16 50.0
pod 5 5 100.0
total 35 218 16.0


line stmt bran cond sub pod time code
1             package Data::Consumer::MySQL2;
2              
3 5     5   95281 use warnings;
  5         7  
  5         128  
4 5     5   16 use strict;
  5         5  
  5         95  
5 5     5   16 use DBI;
  5         6  
  5         166  
6 5     5   18 use Carp qw(confess);
  5         5  
  5         182  
7 5     5   17 use warnings FATAL => 'all';
  5         5  
  5         166  
8 5     5   17 use base 'Data::Consumer';
  5         5  
  5         353  
9 5     5   18 use vars qw/$Debug $VERSION $Cmd $Fail/;
  5         6  
  5         377  
10              
11             # This code was formatted with the following perltidy options:
12             # -ple -ce -bbb -bbc -bbs -nolq -l=100 -noll -nola -nwls='=' -isbc -nolc -otr -kis
13             # If you patch it please use the same options for your patch.
14              
15             *Debug= *Data::Consumer::Debug;
16             *Cmd= *Data::Consumer::Cmd;
17             *Fail= *Data::Consumer::Fail;
18              
19             BEGIN {
20 5     5   34 __PACKAGE__->register();
21             }
22              
23             =head1 NAME
24              
25             Data::Consumer::MySQL2 - Data::Consumer implementation for a mysql database table resource
26              
27             =head1 VERSION
28              
29             Version 0.15
30              
31             =cut
32              
33             $VERSION= '0.15';
34              
35             =head1 SYNOPSIS
36              
37             use Data::Consumer::MySQL2;
38              
39             my $consumer = Data::Consumer::MySQL2->new(
40             dbh => $dbh,
41             table => 'T',
42             id_field => 'id',
43             flag_field => 'done',
44             lock_prefix => $worker_name,
45             unprocessed => 0,
46             working => 1,
47             processed => 2,
48             failed => 3,
49             );
50              
51             $consumer->consume( sub {
52             my $id = shift;
53             print "processed $id\n";
54             } );
55              
56              
57             =head1 FUNCTIONS
58              
59             =head2 CLASS->new(%opts)
60              
61             Constructor for a L<Data::Consumer::MySQL2> instance.
62              
63             Options are as follows:
64              
65             =over 4
66              
67             =item connect => \@connect_args
68              
69             Will use C<@connect_args> to connect to the database using
70             C<< DBI->connect() >>. This argument is mandatory if the C<dbh> argument is
71             not provided.
72              
73             =item dbh => $dbh
74              
75             Use C<$dbh> as the database connection object. If this argument is
76             provided then connect will be ignored.
77              
78             =item table => 'some_table_name'
79              
80             Process records in the specified table.
81              
82             =item id_field => 'id'
83              
84             The column name of the primary key of the table being processed
85              
86             =item flag_field => 'process_state'
87              
88             The column name in the table being processed which shows whether
89             an object is processed or not.
90              
91             =item lock_prefix => 'my-lock-name'
92              
93             The prefix to use for the mysql locks. Defaults to C<$0-$table>.
94              
95             It is B<strongly> recommended that end-users of this module explicitly
96             specify a lock_prefix in production environments. A multi-process
97             system relying on mutual exclusion B<will> run into problems when
98             consuming from the same source if $0 and $table are not identical
99             between workers. Generally, using the name of the consuming module
100             should suffice (e.g. Your::Data::Consumer::Worker).
101              
102             =item unprocessed => 0
103              
104             The value of the C<flag_field> which indicates that an item is not
105             processed. If not provided defaults to C<0>.
106              
107             =item working => 1
108              
109             The value of the C<flag_field> which indicates that an item is currently
110             being processed. If not provided defaults to C<1>.
111              
112             =item processed => 2
113              
114             The value of the C<flag_field> which indicates that an item has been
115             successfully processed. If not provided defaults to C<2>.
116              
117             =item failed => 3
118              
119             The value of the C<flag_field> which indicates that processing of an
120             item has failed. If not provided defaults to C<3>.
121              
122             =item init_id => 0
123              
124             The value which the first acquired record's C<id_field> must be greater
125             than. Should be smaller than any legal id in the table. Defaults to C<0>.
126              
127             =item select_sql
128              
129             =item select_args
130              
131             These arguments are optional, and will be synthesized from the other values if not provided.
132              
133             SQL select query which can be executed to acquire an item to be processed. Should
134             return a single record with a single column containing the id to be processed, at the
135             same time it should ensure that a lock on the id is created.
136              
137             The query will be executed with the id of the last processed item, followed by the arguments
138             provided by the C<select_args> property.
139              
140             =item check_sql
141              
142             =item check_args
143              
144             These arguments are optional, unless you specify C<select_sql> yourself, in which case
145             it is required you also specify C<check_sql> as well.
146              
147             SQL select query which can be executed to verify that the item to be processed still has
148             the expected flag fields set appropriately.
149              
150             There is a very annoying and subtle race condition (possibly only in modern MySQL's) which
151             means that it is possible that the query used for C<select_sql> might return an id for a record
152             which has already been processed. This query is used to avoid that race condition.
153              
154             The query should validate any flag fields or constraints specified in C<select_sql> are
155             true, it should return only the id of the record to be processed.
156              
157             The query will be executed with the id of the item to process, followed by the arguments
158             provided by the C<check_args> property.
159              
160             =item update_sql
161              
162             =item update_args
163              
164             These arguments are optional, and will be synthesized from the other values if not provided.
165              
166             SQL update query which can be used to change the status of the record being processed.
167              
168             Will be executed with the arguments provided in update_args followed by the new status,
169             and the id.
170              
171             =item release_sql
172              
173             =item release_args
174              
175             These arguments are optional, and will be synthesized from the other values if not provided.
176              
177             SQL select query which can be used to clear the currently held lock.
178              
179             Will be called with the arguments provided in release_args, plus the id.
180              
181             =back
182              
183             =cut
184              
185             sub new {
186 0     0 1   my ( $class, %opts )= @_;
187 0           my $self= $class->SUPER::new(); # let Data::Consumer bless the hash
188              
189 0           my @bad;
190 0           foreach my $opt (qw(unprocessed processed working failed lock_prefix)) {
191 0 0         if (ref $opts{$opt}) {
    0          
192 0           push @bad, "option '$opt' is not allowed to be a ref in DC::MySQL2";
193             } elsif (!defined $opts{$opt}) {
194 0           push @bad, "option '$opt' is not allowed to be missing or undefined in DC::MySQL2";
195             }
196             }
197 0 0         if (@bad) {
198 0           confess "Bad option in $class->new(): " . join "\n", @bad;
199             }
200              
201 0 0 0       if ( !$opts{dbh} and $opts{connect} ) {
202 0 0         $opts{dbh}= DBI->connect( @{ $opts{connect} } )
  0            
203             or confess
204             "Could not connect to database '$opts{connect}[0]' as '$opts{user}[1]': $DBI::errstr\n";
205             }
206             $opts{dbh}
207 0 0         or confess "Must have a database handle!";
208 0 0         $opts{dbh}->isa('DBI::db')
209             or die "First argument must be a DBI handle! $opts{dbh}\n";
210              
211 0           $self->{dbh}= $opts{dbh};
212              
213 0   0       $opts{id_field} ||= 'id';
214 0   0       $opts{flag_field} ||= 'process_state';
215 0 0         $opts{init_id}= 0 unless exists $opts{init_id};
216              
217 0 0 0       if (!$opts{check_sql} and $opts{select_sql}) {
218 0           confess "In $class if you specify 'select_sql' you MUST provide 'check_sql' as well!";
219             }
220              
221 0 0         unless ( $opts{select_sql} ) {
222 0           $opts{select_sql}= do {
223 0           local $_= '
224             SELECT
225             $id_field
226             FROM $table
227             WHERE
228             $id_field > ?
229             AND $flag_field = ?
230             AND GET_LOCK( CONCAT_WS("=", ?, $id_field ), 0) != 0
231             LIMIT 1
232             ';
233 0           s/^\s+//mg;
234 0 0         s/\$(\w+)/$opts{$1} || confess "Option $1 is mandatory"/ge;
  0            
235 0           $_;
236             };
237 0           $opts{select_args}= [ $opts{unprocessed}, $opts{lock_prefix} ];
238              
239 0           $opts{check_sql}= do {
240 0           local $_= '
241             SELECT
242             $id_field
243             FROM $table
244             WHERE
245             $id_field = ?
246             AND $flag_field = ?
247             ';
248 0           s/^\s+//mg;
249 0 0         s/\$(\w+)/$opts{$1} || confess "Option $1 is mandatory"/ge;
  0            
250 0           $_;
251             };
252 0           $opts{check_args}= [ $opts{unprocessed} ];
253             }
254              
255 0   0       $opts{update_sql} ||= do {
256 0           local $_= '
257             UPDATE $table
258             SET $flag_field = ?
259             WHERE
260             $id_field = ?
261             ';
262 0           s/^\s+//mg;
263 0 0         s/\$(\w+)/$opts{$1} || confess "Option $1 is mandatory"/ge;
  0            
264 0           $_;
265             };
266 0 0         if ( !$opts{release_sql} ) {
267 0           $opts{release_sql}= do {
268 0           local $_= '
269             SELECT RELEASE_LOCK( CONCAT_WS("=", ?, ? ) )
270             ';
271 0           s/^\s+//mg;
272 0 0         s/\$(\w+)/$opts{$1} || confess "Option $1 is mandatory"/ge;
  0            
273 0           $_;
274             };
275 0           $opts{release_args}= [ $opts{lock_prefix} ];
276             }
277 0           %$self= %opts;
278              
279 0           return $self;
280             }
281              
282             =head2 $object->reset()
283              
284             Reset the state of the object.
285              
286             =head2 $object->acquire()
287              
288             Acquire an item to be processed.
289              
290             Returns an identifier to be used to identify the item acquired.
291              
292             =head2 $object->release()
293              
294             Release any locks on the currently held item.
295              
296             Normally there is no need to call this directly.
297              
298             =cut
299              
300             sub reset {
301 0     0 1   my $self= shift;
302 0           $self->debug_warn( 5, "reset" );
303 0           $self->release();
304 0           $self->{last_id}= $self->{init_id};
305 0           return $self;
306             }
307              
308             sub _do_callback {
309 0     0     my ( $self, $callback )= @_;
310 0 0         if ( eval { $callback->( $self, @{$self}{qw(last_id dbh)} ); 1; } ) {
  0            
  0            
  0            
311 0           return;
312             } else {
313 0           return "Callback failed: $@";
314             }
315             }
316              
317             sub acquire {
318 0     0 1   my $self= shift;
319 0           my $dbh= $self->{dbh};
320              
321 0 0         $self->reset if !defined $self->{last_id};
322 0           while (1) {
323 0           $self->debug_warn( 5, "last_id was $self->{last_id}");
324 0 0         my ($id)= $dbh->selectrow_array( $self->{select_sql}, undef, $self->{last_id}, @{ $self->{select_args} || [] } );
  0            
325 0 0         if ( defined $id ) {
326 0 0         next if $self->is_ignored($id);
327 0 0         my ($got_id) = $dbh->selectrow_array( $self->{check_sql}, undef, $id, @{ $self->{check_args} || [] } );
  0            
328 0 0         if ( not defined $got_id) {
329 0           $self->debug_warn(5, "race condition avoided for '$id', check_sql and select_sql did not line up!");
330 0           next;
331             }
332 0           $self->{last_lock}= $id;
333 0           $self->debug_warn( 5, "acquired '$id'" );
334             } else {
335 0           $self->debug_warn( 5, "acquire failed -- resource has been exhausted" );
336             }
337 0           $self->{last_id}= $id;
338 0           last;
339             }
340 0           return $self->{last_id};
341             }
342              
343              
344             sub release {
345 0     0 1   my $self= shift;
346              
347 0 0         return 0 unless exists $self->{last_lock};
348              
349 0 0         my $res=
350             $self->{dbh}
351 0           ->do( $self->{release_sql}, undef, @{ $self->{release_args} || [] }, $self->{last_lock} );
352 0 0         defined $res
353             or $self->error( "Failed to execute '$self->{release_sql}' with args '$self->{last_lock}': "
354             . $self->{dbh}->errstr() );
355              
356 0           $self->debug_warn( 5, "release lock '$self->{last_lock}' status: $res" ); # XXX
357 0           delete $self->{last_lock};
358 0           return 1;
359             }
360              
361             sub _mark_as {
362 0     0     my ( $self, $key, $id )= @_;
363 0           $self->debug_warn(5, "$key => $id");
364 0 0         if ( defined $self->{$key} ) {
365 0           $self->debug_warn( 5, "marking '$id' as '$key' ($self->{$key})" );
366 0 0         my $res= $self->{dbh}->do( $self->{update_sql}, undef, @{ $self->{update_args} || [] }, $self->{$key}, $id )
  0 0          
367             or
368             $self->error( "Failed to execute '$self->{update_sql}' with args '$self->{$key}','$id': "
369             . $self->{dbh}->errstr() );
370 0 0         0 + $res or $self->error("Update resulted in 0 records changing!");
371 0           $self->debug_warn( 5, "result: $res");
372              
373             }
374             }
375              
376             =head2 $object->dbh()
377              
378             returns the database handle the object is using to communicate to the db with.
379              
380             =cut
381              
382 0     0 1   sub dbh { $_[0]->{dbh} }
383              
384             sub DESTROY {
385 0     0     my $self= shift;
386 0 0         $self->release() if $self;
387             }
388              
389             =head1 AUTHOR
390              
391             Yves Orton, C<< >>
392              
393             =head1 BUGS
394              
395             Please report any bugs or feature requests to
396             C<bug-data-consumer at rt.cpan.org>, or through the web interface at
397             L<http://rt.cpan.org/NoAuth/ReportBug.html?Queue=Data-Consumer>.
398              
399             I will be notified, and then you'll automatically be notified of progress on
400             your bug as I make changes.
401              
402             =head1 ACKNOWLEDGEMENTS
403              
404             Igor Sutton for ideas, testing and support.
405              
406             =head1 COPYRIGHT & LICENSE
407              
408             Copyright 2008, 2010, 2011 Yves Orton, all rights reserved.
409              
410             This program is free software; you can redistribute it and/or modify it
411             under the same terms as Perl itself.
412              
413             =cut
414              
415             1; # End of Data::Consumer::MySQL2
416