File Coverage

blib/lib/Data/Consumer/MySQL2.pm
Criterion Covered Total %
statement 22 126 17.4
branch 0 60 0.0
condition 0 13 0.0
subroutine 8 16 50.0
pod 5 5 100.0
total 35 220 15.9


line stmt bran cond sub pod time code
1             package Data::Consumer::MySQL2;
2              
3 5     5   134236 use warnings;
  5         15  
  5         235  
4 5     5   33 use strict;
  5         13  
  5         114  
5 5     5   24 use DBI;
  5         9  
  5         209  
6 5     5   29 use Carp qw(confess);
  5         11  
  5         269  
7 5     5   34 use warnings FATAL => 'all';
  5         11  
  5         217  
8 5     5   31 use base 'Data::Consumer';
  5         12  
  5         798  
9 5     5   36 use vars qw/$Debug $VERSION $Cmd $Fail/;
  5         9  
  5         427  
10              
11             # This code was formatted with the following perltidy options:
12             # -ple -ce -bbb -bbc -bbs -nolq -l=100 -noll -nola -nwls='=' -isbc -nolc -otr -kis
13             # If you patch it please use the same options for your patch.
14              
15             *Debug= *Data::Consumer::Debug;
16             *Cmd= *Data::Consumer::Cmd;
17             *Fail= *Data::Consumer::Fail;
18              
19             BEGIN {
20 5     5   41 __PACKAGE__->register();
21             }
22              
23             =head1 NAME
24              
25             Data::Consumer::MySQL2 - Data::Consumer implementation for a mysql database table resource
26              
27             =head1 VERSION
28              
29             Version 0.17
30              
31             =cut
32              
33             $VERSION= '0.17';
34              
35             =head1 SYNOPSIS
36              
37             use Data::Consumer::MySQL2;
38              
39             my $consumer = Data::Consumer::MySQL2->new(
40             dbh => $dbh,
41             table => 'T',
42             id_field= > 'id',
43             flag_field => 'done',
44             lock_prefix => $worker_name,
45             unprocessed => 0,
46             working => 1,
47             processed => 2,
48             failed => 3,
49             );
50              
51             $consumer->consume( sub {
52             my $id = shift;
53             print "processed $id\n";
54             } );
55              
56              
57             =head1 FUNCTIONS
58              
59             =head2 CLASS->new(%opts)
60              
61             Constructor for a L instance.
62              
63             Options are as follows:
64              
65             =over 4
66              
67             =item connect => \@connect_args
68              
69             Will use C<@connect_args> to connect to the database using
70             Cconnect()>. This argument is mandatory if the C argument is
71             not provided.
72              
73             =item dbh => $dbh
74              
75             Use C<$dbh> as the database connection object. If this argument is
76             provided then connect will be ignored.
77              
78             =item table => 'some_table_name'
79              
80             Process records in the specified table.
81              
82             =item id_field => 'id'
83              
84             The column name of the primary key of the table being processed
85              
86             =item flag_field => 'process_state'
87              
88             The column name in the table being processed which shows whether
89             an object is processed or not.
90              
91             =item lock_prefix => 'my-lock-name'
92              
93             The prefix to use for the mysql locks. Defaults to C<$0-$table>.
94              
95             It is B recommended that end-users of this module explicitly
96             specify a lock_prefix in production environments. A multi-process
97             system relying on mutual exclusion B run into problems when
98             consuming from the same source if $0 and $table are not identical
99             between workers. Generally, using the name of the consuming module
100             should suffice (e.g. Your::Data::Consumer::Worker).
101              
102             =item unprocessed => 0
103              
104             The value of the C which indicates that an item is not
105             processed. If not provided defaults to C<0>.
106              
107             =item working => 1
108              
109             The value of the C which indicates that an item is currently
110             being processed. If not provided defaults to C<1>.
111              
112             =item processed => 2
113              
114             The value of the C which indicates that an item has been
115             successfully processed. If not provided defaults to C<2>.
116              
117             =item failed => 3
118              
119             The value of the C which indicates that processing of an
120             item has failed. If not provided defaults to C<3>.
121              
122             =item init_id => 0
123              
124             The value which the first acquired record's C must be greater
125             than. Should be smaller than any legal id in the table. Defaults to C<0>.
126              
127             =item select_sql
128              
129             =item select_args
130              
131             These arguments are optional, and will be synthesized from the other values if not provided.
132              
133             SQL select query which can be executed to acquire an item to be processed. Should
134             return a single record with a single column contain the id to be processed, at the
135             same time it should ensure that a lock on the id is created.
136              
137             The query will be executed with the id of the last processed item, followed by the arguments
138             provided by the C property.
139              
140             =item check_sql
141              
142             =item check_args
143              
144             These arguments are optional, unless you specify C yourself, in which case
145             it is required you also specify C as well.
146              
147             SQL select query which can be executed to verify that the item to be processed still has
148             the expected flag fields set appropriately.
149              
150             There is a very annoying and sublte race condition (possibly only in modern MySQL's) which
151             means that is possible that the query used for C might return an id for a record
152             which has already been processed. This query is used to avoid that race condition.
153              
154             The query should validate any flag fields or constraints specified in C are
155             true, it should return only the id of the record to be processed.
156              
157             The query will be executed with the id of the item to process, followed by the arguments
158             provided by the C property.
159              
160             =item update_sql
161              
162             =item update_args
163              
164             These arguments are optional, and will be synthesized from the other values if not provided.
165              
166             SQL update query which can be used to change the status the record being processed.
167              
168             Will be executed with the arguments provided in update_args followed the new status,
169             and the id.
170              
171             =item release_sql
172              
173             =item release_args
174              
175             These arguments are optional, and will be synthesized from the other values if not provided.
176              
177             SQL select query which can be used to clear the currently held lock.
178              
179             Will be called with the arguments provided in release_args, plust the id.
180              
181             =back
182              
183             =cut
184              
185             sub new {
186 0     0 1   my ( $class, %opts )= @_;
187 0           my $self= $class->SUPER::new(); # let Data::Consumer bless the hash
188              
189 0           my @bad;
190 0           foreach my $opt (qw(unprocessed processed working failed lock_prefix)) {
191 0 0         if (ref $opts{$opt}) {
    0          
192 0           push @bad, "option '$opt' is not allowed to be a ref in DC::MySQL2";
193             } elsif (!defined $opts{$opt}) {
194 0           push @bad, "option '$opt' is not allowed to be missing or undefined in DC::MySQL2";
195             }
196             }
197 0 0         if (@bad) {
198 0           confess "Bad option in $class->new(): " . join "\n", @bad;
199             }
200              
201 0 0 0       if ( !$opts{dbh} and $opts{connect} ) {
202 0 0         $opts{dbh}= DBI->connect( @{ $opts{connect} } )
  0            
203             or confess
204             "Could not connect to database '$opts{connect}[0]' as '$opts{user}[1]': $DBI::errstr\n";
205             }
206             $opts{dbh}
207 0 0         or confess "Must have a database handle!";
208 0 0         $opts{dbh}->isa('DBI::db')
209             or die "First argument must be a DBI handle! $opts{dbh}\n";
210              
211 0           $self->{dbh}= $opts{dbh};
212              
213 0   0       $opts{id_field} ||= 'id';
214 0   0       $opts{flag_field} ||= 'process_state';
215 0 0         $opts{init_id}= 0 unless exists $opts{init_id};
216              
217 0 0 0       if (!$opts{check_sql} and $opts{select_sql}) {
218 0           confess "In $class if you specify 'select_sql' you MUST provide 'check_sql' as well!";
219             }
220              
221 0 0         unless ( $opts{select_sql} ) {
222 0           $opts{select_sql}= do {
223 0           local $_= '
224             SELECT
225             $id_field
226             FROM $table
227             WHERE
228             $id_field > ?
229             AND $flag_field = ?
230             AND GET_LOCK( CONCAT_WS("=", ?, $id_field ), 0) != 0
231             LIMIT 1
232             ';
233 0           s/^\s+//mg;
234 0 0         s/\$(\w+)/$opts{$1} || confess "Option $1 is mandatory"/ge;
  0            
235 0           $_;
236             };
237 0           $opts{select_args}= [ $opts{unprocessed}, $opts{lock_prefix} ];
238              
239 0           $opts{check_sql}= do {
240 0           local $_= '
241             SELECT
242             $id_field
243             FROM $table
244             WHERE
245             $id_field = ?
246             AND $flag_field = ?
247             ';
248 0           s/^\s+//mg;
249 0 0         s/\$(\w+)/$opts{$1} || confess "Option $1 is mandatory"/ge;
  0            
250 0           $_;
251             };
252 0           $opts{check_args}= [ $opts{unprocessed} ];
253             }
254              
255 0   0       $opts{update_sql} ||= do {
256 0           local $_= '
257             UPDATE $table
258             SET $flag_field = ?
259             WHERE
260             $id_field = ?
261             ';
262 0           s/^\s+//mg;
263 0 0         s/\$(\w+)/$opts{$1} || confess "Option $1 is mandatory"/ge;
  0            
264 0           $_;
265             };
266 0 0         if ( !$opts{release_sql} ) {
267 0           $opts{release_sql}= do {
268 0           local $_= '
269             SELECT RELEASE_LOCK( CONCAT_WS("=", ?, ? ) )
270             ';
271 0           s/^\s+//mg;
272 0 0         s/\$(\w+)/$opts{$1} || confess "Option $1 is mandatory"/ge;
  0            
273 0           $_;
274             };
275 0           $opts{release_args}= [ $opts{lock_prefix} ];
276             }
277 0           %$self= %opts;
278              
279 0           return $self;
280             }
281              
282             =head2 $object->reset()
283              
284             Reset the state of the object.
285              
286             =head2 $object->acquire()
287              
288             Aquire an item to be processed.
289              
290             Returns an identifier to be used to identify the item acquired.
291              
292             =head2 $object->release()
293              
294             Release any locks on the currently held item.
295              
296             Normally there is no need to call this directly.
297              
298             =cut
299              
300             sub reset {
301 0     0 1   my $self= shift;
302 0           $self->debug_warn( 5, "reset" );
303 0           $self->release();
304 0           $self->{last_id}= $self->{init_id};
305 0           return $self;
306             }
307              
308             sub _do_callback {
309 0     0     my ( $self, $callback )= @_;
310 0 0         if ( eval { $callback->( $self, @{$self}{qw(last_id dbh)} ); 1; } ) {
  0            
  0            
  0            
311 0           return;
312             } else {
313 0           return "Callback failed: $@";
314             }
315             }
316              
317             sub acquire {
318 0     0 1   my $self= shift;
319 0           my $dbh= $self->{dbh};
320              
321 0 0         $self->reset if !defined $self->{last_id};
322 0           while (1) {
323 0           $self->debug_warn( 5, "last_id was $self->{last_id}");
324 0 0         my ($id)= $dbh->selectrow_array( $self->{select_sql}, undef, $self->{last_id}, @{ $self->{select_args} || [] } );
  0            
325 0 0         if ( defined $id ) {
326 0 0         if ( $self->is_ignored($id) ) {
327 0           $self->{last_id}= $id;
328 0           next;
329             }
330 0 0         my ($got_id) = $dbh->selectrow_array( $self->{check_sql}, undef, $id, @{ $self->{check_args} || [] } );
  0            
331 0 0         if ( not defined $got_id) {
332 0           $self->debug_warn(5, "race condition avoided for '$id', check_sql and select_sql did not line up!");
333 0           next;
334             }
335 0           $self->{last_lock}= $id;
336 0           $self->debug_warn( 5, "acquired '$id'" );
337             } else {
338 0           $self->debug_warn( 5, "acquire failed -- resource has been exhausted" );
339             }
340 0           $self->{last_id}= $id;
341 0           last;
342             }
343 0           return $self->{last_id};
344             }
345              
346              
347             sub release {
348 0     0 1   my $self= shift;
349              
350 0 0         return 0 unless exists $self->{last_lock};
351              
352             my $res=
353             $self->{dbh}
354 0 0         ->do( $self->{release_sql}, undef, @{ $self->{release_args} || [] }, $self->{last_lock} );
  0            
355             defined $res
356             or $self->error( "Failed to execute '$self->{release_sql}' with args '$self->{last_lock}': "
357 0 0         . $self->{dbh}->errstr() );
358              
359 0           $self->debug_warn( 5, "release lock '$self->{last_lock}' status: $res" ); # XXX
360 0           delete $self->{last_lock};
361 0           return 1;
362             }
363              
364             sub _mark_as {
365 0     0     my ( $self, $key, $id )= @_;
366 0           $self->debug_warn(5, "$key => $id");
367 0 0         if ( defined $self->{$key} ) {
368 0           $self->debug_warn( 5, "marking '$id' as '$key' ($self->{$key})" );
369 0 0         my $res= $self->{dbh}->do( $self->{update_sql}, undef, @{ $self->{update_args} || [] }, $self->{$key}, $id )
370             or
371             $self->error( "Failed to execute '$self->{update_sql}' with args '$self->{$key}','$id': "
372 0 0         . $self->{dbh}->errstr() );
373 0 0         0 + $res or $self->error("Update resulted in 0 records changing!");
374 0           $self->debug_warn( 5, "result: $res");
375              
376             }
377             }
378              
379             =head2 $object->dbh()
380              
381             returns the database handle the object is using to communicate to the db with.
382              
383             =cut
384              
385 0     0 1   sub dbh { $_[0]->{dbh} }
386              
387             sub DESTROY {
388 0     0     my $self= shift;
389 0 0         $self->release() if $self;
390             }
391              
392             =head1 AUTHOR
393              
394             Yves Orton, C<< >>
395              
396             =head1 BUGS
397              
398             Please report any bugs or feature requests to
399             C, or through the web interface at
400             L.
401              
402             I will be notified, and then you'll automatically be notified of progress on
403             your bug as I make changes.
404              
405             =head1 ACKNOWLEDGEMENTS
406              
407             Igor Sutton for ideas, testing and support.
408              
409             =head1 COPYRIGHT & LICENSE
410              
411             Copyright 2008, 2010, 2011 Yves Orton, all rights reserved.
412              
413             This program is free software; you can redistribute it and/or modify it
414             under the same terms as Perl itself.
415              
416             =cut
417              
418             1; # End of Data::Consumer::MySQL2
419