File Coverage

blib/lib/DBIx/BatchChunker.pm
Criterion Covered Total %
statement 320 358 89.3
branch 143 186 76.8
condition 43 76 56.5
subroutine 45 47 95.7
pod 3 4 75.0
total 554 671 82.5


line stmt bran cond sub pod time code
1             package DBIx::BatchChunker;
2              
3             our $AUTHORITY = 'cpan:GSG';
4             # ABSTRACT: Run large database changes safely
5 6     6   1215054 use version;
  6         10020  
  6         32  
6             our $VERSION = 'v0.941.0'; # VERSION
7              
8 6     6   3139 use Moo;
  6         60019  
  6         31  
9 6     6   9924 use MooX::StrictConstructor;
  6         72534  
  6         26  
10              
11 6     6   128123 use CLDR::Number;
  6         301962  
  6         233  
12              
13 6     6   3169 use Types::Standard qw( Str Bool Undef ArrayRef HashRef CodeRef InstanceOf Tuple Maybe Optional slurpy );
  6         413689  
  6         71  
14 6     6   13933 use Types::Numbers qw( NumRange UnsignedInt PerlSafeInt PositiveInt PositiveOrZeroNum );
  6         771079  
  6         82  
15 6     6   9152 use Type::Utils;
  6         25130  
  6         54  
16              
17 6     6   7735 use List::Util 1.33 (qw( min max sum any first )); # has any/all/etc.
  6         114  
  6         447  
18 6     6   34 use Math::BigInt upgrade => 'Math::BigFloat';
  6         11  
  6         60  
19 6     6   1893 use Math::BigFloat;
  6         11  
  6         36  
20 6     6   3476 use POSIX qw( ceil );
  6         13  
  6         50  
21 6     6   362 use Scalar::Util qw( blessed weaken );
  6         12  
  6         238  
22 6     6   2835 use Term::ProgressBar 2.14; # with silent option
  6         339780  
  6         282  
23 6     6   781 use Time::HiRes qw( time sleep );
  6         1277  
  6         56  
24              
25 6     6   3852 use DBIx::BatchChunker::LoopState;
  6         19  
  6         307  
26              
27             # Don't export the above, but don't conflict with StrictConstructor, either
28 6     6   48 use namespace::clean -except => [qw< new meta >];
  6         12  
  6         50  
29              
30             # This is now an unused, dummy variable
31             our $DB_MAX_ID = ~0; # bitwise NOT of zero: every bit set, i.e. the largest native unsigned integer
32              
33             #pod =encoding utf8
34             #pod
35             #pod =head1 SYNOPSIS
36             #pod
37             #pod use DBIx::BatchChunker;
38             #pod
39             #pod my $account_rs = $schema->resultset('Account')->search({
40             #pod account_type => 'deprecated',
41             #pod });
42             #pod
43             #pod my %params = (
44             #pod chunk_size => 5000,
45             #pod target_time => 5,
46             #pod
47             #pod rs => $account_rs,
48             #pod id_name => 'account_id',
49             #pod
50             #pod coderef => sub { $_[1]->delete },
51             #pod sleep => 1,
52             #pod debug => 1,
53             #pod
54             #pod progress_name => 'Deleting deprecated accounts',
55             #pod process_past_max => 1,
56             #pod );
57             #pod
58             #pod # EITHER:
59             #pod # 1) Automatically construct and execute the changes:
60             #pod
61             #pod DBIx::BatchChunker->construct_and_execute(%params);
62             #pod
63             #pod # OR
64             #pod # 2) Manually construct and execute the changes:
65             #pod
66             #pod my $batch_chunker = DBIx::BatchChunker->new(%params);
67             #pod
68             #pod $batch_chunker->calculate_ranges;
69             #pod $batch_chunker->execute;
70             #pod
71             #pod =head1 DESCRIPTION
72             #pod
73             #pod This utility class is for running a large batch of DB changes in a manner that doesn't
74             #pod cause huge locks, outages, and missed transactions. It's highly flexible to allow for
75             #pod many different kinds of change operations, and dynamically adjusts chunks to its
76             #pod workload.
77             #pod
78             #pod It works by splitting up DB operations into smaller chunks within a loop. These chunks
79             #pod are transactionalized, either naturally as single-operation bulk work or by the loop
80             #pod itself. The full range is calculated beforehand to get the right start/end points.
81             #pod A L<progress bar|/Progress Bar Attributes> will be created to let the deployer know the
82             #pod processing status.
83             #pod
84             #pod There are two ways to use this class: call the automatic constructor and executor
85             #pod (L</construct_and_execute>) or manually construct the object and call its methods. See
86             #pod L</SYNOPSIS> for examples of both.
87             #pod
88             #pod B<DISCLAIMER:> You should not rely on this class to magically fix any and all locking
89             #pod problems the DB might experience just because it's being used. Thorough testing and
90             #pod best practices are still required.
91             #pod
92             #pod =head2 Processing Modes
93             #pod
94             #pod This class has several different modes of operation, depending on what was passed to
95             #pod the constructor:
96             #pod
97             #pod =head3 DBIC Processing
98             #pod
99             #pod If both L</rs> and L</coderef> are passed, a chunk ResultSet is built from the base
100             #pod ResultSet, to add in a C<BETWEEN> clause, and the new ResultSet is passed into the
101             #pod coderef. The coderef should run some sort of active ResultSet operation from there.
102             #pod
103             #pod An L</id_name> should be provided, but if it is missing it will be looked up based on
104             #pod the primary key of the ResultSource.
105             #pod
106             #pod If L</single_rows> is also enabled, then each chunk is wrapped in a transaction and the
107             #pod coderef is called for each row in the chunk. In this case, the coderef is passed a
108             #pod Result object instead of the chunk ResultSet.
109             #pod
110             #pod Note that whether L</single_rows> is enabled or not, the coderef execution is encapsulated
111             #pod in DBIC's retry logic, so any failures will re-connect and retry the coderef. Because of
112             #pod this, any changes you make within the coderef should be idempotent, or should at least be
113             #pod able to skip over any already-processed rows.
114             #pod
115             #pod =head3 Active DBI Processing
116             #pod
117             #pod If an L</stmt> (DBI statement handle args) is passed without a L</coderef>, the statement
118             #pod handle is merely executed on each iteration with the start and end IDs. It is assumed
119             #pod that the SQL for the statement handle contains exactly two placeholders for a C<BETWEEN>
120             #pod clause. For example:
121             #pod
122             #pod my $update_stmt = q{
123             #pod UPDATE
124             #pod accounts a
125             #pod JOIN account_updates au USING (account_id)
126             #pod SET
127             #pod a.time_stamp = au.time_stamp
128             #pod WHERE
129             #pod a.account_id BETWEEN ? AND ? AND
130             #pod a.time_stamp != au.time_stamp
131             #pod });
132             #pod
133             #pod The C<BETWEEN> clause should, of course, match the IDs being used in the loop.
134             #pod
135             #pod The statement is ran with L for retry protection. Therefore, the
136             #pod statement should also be idempotent.
137             #pod
138             #pod =head3 Query DBI Processing
139             #pod
140             #pod If both a L</stmt> and a L</coderef> are passed, the statement handle is prepared and
141             #pod executed. Like the L</Active DBI Processing> mode, the SQL for the statement should
142             #pod contain exactly two placeholders for a C<BETWEEN> clause. Then the C<$sth> is passed to
143             #pod the coderef. It's up to the coderef to extract data from the executed statement handle,
144             #pod and do something with it.
145             #pod
146             #pod If C<single_rows> is enabled, each chunk is wrapped in a transaction and the coderef is
147             #pod called for each row in the chunk. In this case, the coderef is passed a hashref of the
148             #pod row instead of the executed C<$sth>, with lowercase alias names used as keys.
149             #pod
150             #pod Note that in both cases, the coderef execution is encapsulated in a L
151             #pod call to either C or C (using L), so any failures will
152             #pod re-connect and retry the coderef. Because of this, any changes you make within the
153             #pod coderef should be idempotent, or should at least be able to skip over any
154             #pod already-processed rows.
155             #pod
156             #pod =head3 DIY Processing
157             #pod
158             #pod If a L</coderef> is passed but neither a C<stmt> nor a C<rs> are passed, then the
159             #pod multiplier loop does not touch the database. The coderef is merely passed the start and
160             #pod end IDs for each chunk. It is expected that the coderef will run through all database
161             #pod operations using those start and end points.
162             #pod
163             #pod It's still valid to include L, L, and/or L in the
164             #pod constructor to enable features like L or
165             #pod L.
166             #pod
167             #pod If you're not going to include any min/max statements for L</calculate_ranges>, you will
168             #pod need to set L</min_id> and L</max_id> yourself, either in the constructor or before the
169             #pod L</execute> call. Using L</construct_and_execute> is also not an option in this case, as
170             #pod this tries to call L</calculate_ranges> without a way to do so.
171             #pod
172             #pod =head3 TL;DR Version
173             #pod
174             #pod $stmt = Active DBI Processing
175             #pod $stmt + $coderef = Query DBI Processing | $bc->$coderef($executed_sth)
176             #pod $stmt + $coderef + single_rows=>1 = Query DBI Processing | $bc->$coderef($row_hashref)
177             #pod $rs + $coderef = DBIC Processing | $bc->$coderef($chunk_rs)
178             #pod $rs + $coderef + single_rows=>1 = DBIC Processing | $bc->$coderef($result)
179             #pod $coderef = DIY Processing | $bc->$coderef($start, $end)
180             #pod
181             #pod =head1 ATTRIBUTES
182             #pod
183             #pod See the L section for more in-depth descriptions of these attributes and their
184             #pod usage.
185             #pod
186             #pod =head2 DBIC Processing Attributes
187             #pod
188             #pod =head3 rs
189             #pod
190             #pod A L<DBIx::Class::ResultSet>. This is used by all methods as the base ResultSet onto which
191             #pod the DB changes will be applied. Required for DBIC processing.
192             #pod
193             #pod =cut
194              
195             has rs => (
                # Base ResultSet for DBIC processing; read-only once the object is built.
196             is => 'ro',
197             isa => InstanceOf['DBIx::Class::ResultSet'],
198             required => 0, # optional at the constructor level
199             );
200              
201             #pod =head3 rsc
202             #pod
203             #pod A L<DBIx::Class::ResultSetColumn>. This is only used to override L</rs> for min/max
204             #pod calculations. Optional.
205             #pod
206             #pod =cut
207              
208             has rsc => (
                # Optional ResultSetColumn; read-only once the object is built.
209             is => 'ro',
210             isa => InstanceOf['DBIx::Class::ResultSetColumn'],
211             required => 0,
212             );
213              
214             #pod =head3 dbic_retry_opts
215             #pod
216             #pod A hashref of DBIC retry options. These options control how retry protection works within
217             #pod DBIC. So far, there are two supported options:
218             #pod
219             #pod max_attempts = Number of times to retry
220             #pod retry_handler = Coderef that returns true to continue to retry or false to re-throw
221             #pod the last exception
222             #pod
223             #pod The default is to let the DBIC storage engine handle its own protection, which will retry
224             #pod once if the DB connection was disconnected. If you specify any options, even a blank
225             #pod hashref, BatchChunker will fill in a default C<max_attempts> of 10, and an always-true
226             #pod C<retry_handler>. This is similar to L<DBIx::Connector::Retry>'s defaults.
227             #pod
228             #pod Under the hood, these are options that are passed to the as-yet-undocumented
229             #pod L<DBIx::Class::Storage::BlockRunner>. The C<retry_handler> has access to the same
230             #pod BlockRunner object (passed as its only argument) and its methods/accessors, such as C<storage>,
231             #pod C<failed_attempt_count>, and C<last_exception>.
232             #pod
233             #pod =cut
234              
235             has dbic_retry_opts => (
                # Optional hashref of retry options, merged into the BlockRunner constructor
                # arguments by _dbic_block_runner.
236             is => 'ro',
237             isa => HashRef,
238             required => 0,
                # The predicate reports whether the attribute was supplied at all, so an empty
                # hashref still counts as "retry options requested".
239             predicate => '_has_dbic_retry_opts',
240             );
241              
242             sub _dbic_block_runner {
                # Runs $coderef through the DBIC storage object, either directly or through a
                # DBIx::Class::Storage::BlockRunner configured with dbic_retry_opts.
                #
                # Arguments:
                #   $method  - only compared against the string 'txn'; any other value selects
                #              the non-transactional path.
                #   $coderef - the code to execute.
                #
                # Returns whatever the underlying txn_do / dbh_do / run call returns.
243 675     675   1704 my ($self, $method, $coderef) = @_;
244              
245 675         1433 my $storage = $self->dbic_storage;
246              
247             # Block running disabled
                # (dbic_retry_opts was never supplied, so call the storage object's own methods)
248 675 100       2276 unless ($self->_has_dbic_retry_opts) {
249 510 100       1369 return $storage->txn_do($coderef) if $method eq 'txn';
250 434         2472 return $storage->dbh_do($coderef);
251             }
252              
253             # A very light wrapper around BlockRunner. No need to load BlockRunner, since DBIC
254             # loads it in before us if we're using this method.
                # The argument list is ordered deliberately: defaults first, then the caller's
                # options, then storage/wrap_txn last, so that later entries take precedence
                # over earlier ones with the same key.
255             DBIx::Class::Storage::BlockRunner->new(
256             # in case they are not defined with a custom dbic_retry_opts
257             max_attempts => 10,
258 102     102   62639 retry_handler => sub { 1 },
259              
260             # never overrides the important ones below
261 165 100       526 %{ $self->dbic_retry_opts },
  165         3513  
262              
263             storage => $storage,
264             wrap_txn => ($method eq 'txn' ? 1 : 0),
265             )->run($coderef);
266             }
267              
268             #pod =head2 DBI Processing Attributes
269             #pod
270             #pod =head3 dbi_connector
271             #pod
272             #pod A L<DBIx::Connector::Retry> object. Instead of L<DBI> statement handles, this is the
273             #pod recommended way for BatchChunker to interface with the DBI, as it handles retries on
274             #pod failures. The connection mode used is whatever default is set within the object.
275             #pod
276             #pod Required for DBI Processing, unless L</dbic_storage> is specified.
277             #pod
278             #pod =cut
279              
280             has dbi_connector => (
                # Optional DBIx::Connector::Retry object; read-only once the object is built.
281             is => 'ro',
282             isa => InstanceOf['DBIx::Connector::Retry'],
283             required => 0,
284             );
285              
286             #pod =head3 dbic_storage
287             #pod
288             #pod A DBIC storage object, as an alternative for L</dbi_connector>. There may be times when
289             #pod you want to run plain DBI statements, but are still using DBIC. In these cases, you
290             #pod don't have to create a L<DBIx::Connector::Retry> object to run those statements.
291             #pod
292             #pod This uses a BlockRunner object for retry protection, so the options in
293             #pod L</dbic_retry_opts> would apply here.
294             #pod
295             #pod Required for DBI Processing, unless L is specified.
296             #pod
297             #pod =cut
298              
299             has dbic_storage => (
                # Optional DBIC storage object; _dbic_block_runner calls txn_do/dbh_do on it or
                # hands it to a BlockRunner.
300             is => 'ro',
301             isa => InstanceOf['DBIx::Class::Storage::DBI'],
302             required => 0,
303             );
304              
305             #pod =head3 min_stmt
306             #pod
307             #pod =head3 max_stmt
308             #pod
309             #pod SQL statement strings or an arrayref of parameters for L.
310             #pod
311             #pod When executed, these statements should each return a single value, either the minimum or
312             #pod maximum ID that will be affected by the DB changes. These are used by
313             #pod L</calculate_ranges>. Required if using either type of DBI Processing.
314             #pod
315             #pod =cut
316              
317             my $SQLStringOrSTHArgs_type = Type::Utils::declare(
                # Shared type constraint for the min_stmt, max_stmt, stmt and count_stmt attributes.
318             name => 'SQLStringOrSTHArgs',
319             # Allow an SQL string, an optional hashref/undef, and any number of strings/undefs
320             parent => Tuple->parameterize(Str, Optional[Maybe[HashRef]], slurpy ArrayRef[Maybe[Str]]),
                # Coercion: a plain string is wrapped into a one-element arrayref; any other
                # value is passed through unchanged.
321             coercion => sub { $_ = [ $_ ] if Str->check($_); $_ },
                # Single-quoted on purpose: '$sth' is literal text in the message, not a variable.
322             message => sub { 'Must be either an SQL string or an arrayref of parameters for $sth creation (SQL + hashref/undef + binds)' },
323             );
324              
325             has min_stmt => (
                # Optional statement for the minimum ID lookup.
326             is => 'ro',
327             isa => $SQLStringOrSTHArgs_type,
328             required => 0,
329             coerce => 1, # a bare SQL string is coerced into a one-element arrayref
330             );
331              
332             has max_stmt => (
                # Optional statement for the maximum ID lookup.
333             is => 'ro',
334             isa => $SQLStringOrSTHArgs_type,
335             required => 0,
336             coerce => 1, # a bare SQL string is coerced into a one-element arrayref
337             );
338              
339             #pod =head3 stmt
340             #pod
341             #pod A SQL statement string or an arrayref of parameters for L + binds.
342             #pod
343             #pod If using L (no coderef), this is a L statement
344             #pod (usually DML like C). If using L (with
345             #pod coderef), this is a passive DQL (C
346             #pod
347             #pod In either case, the statement should contain C placeholders, which will be
348             #pod executed with the start/end ID points. If there are already bind placeholders in the
349             #pod arrayref, then make sure the C bind points are last on the list.
350             #pod
351             #pod Required for DBI Processing.
352             #pod
353             #pod =cut
354              
355             has stmt => (
                # Optional main statement for the DBI processing modes.
356             is => 'ro',
357             isa => $SQLStringOrSTHArgs_type,
358             required => 0,
359             coerce => 1, # a bare SQL string is coerced into a one-element arrayref
360             );
361              
362             #pod =head3 count_stmt
363             #pod
364             #pod A C
365             #pod L.
366             #pod
367             #pod Like L, it should contain C placeholders. In fact, the SQL should look
368             #pod exactly like the L query, except with C instead of the column list.
369             #pod
370             #pod Used only for L. Optional, but recommended for
371             #pod L.
372             #pod
373             #pod =cut
374              
375             has count_stmt => (
                # Optional row-count statement; same type and coercion as stmt.
376             is => 'ro',
377             isa => $SQLStringOrSTHArgs_type,
378             required => 0,
379             coerce => 1, # a bare SQL string is coerced into a one-element arrayref
380             );
381              
382             #pod =head2 Progress Bar Attributes
383             #pod
384             #pod =head3 progress_bar
385             #pod
386             #pod The progress bar used for all methods. This can be specified right before the method
387             #pod call to override the default used for that method. Unlike most attributes, this one
388             #pod is read-write, so it can be switched on-the-fly.
389             #pod
390             #pod Don't forget to remove or switch to a different progress bar if you want to use a
391             #pod different one for another method:
392             #pod
393             #pod $batch_chunker->progress_bar( $calc_pb );
394             #pod $batch_chunker->calculate_ranges;
395             #pod $batch_chunker->progress_bar( $loop_pb );
396             #pod $batch_chunker->execute;
397             #pod
398             #pod All of this is optional. If the progress bar isn't specified, the method will create
399             #pod a default one. If the terminal isn't interactive, the default L<Term::ProgressBar> will
400             #pod be set to C<silent> to naturally skip the output.
401             #pod
402             #pod =cut
403              
404             has progress_bar => (
                # Read-write so callers can swap progress bars between method calls.
                # No default is declared here.
405             is => 'rw',
406             isa => InstanceOf['Term::ProgressBar'],
407             );
408              
409             #pod =head3 progress_name
410             #pod
411             #pod A string used by L</execute> to assist in creating a progress bar. Ignored if
412             #pod L</progress_bar> is already specified.
413             #pod
414             #pod This is the preferred way of customizing the progress bar without having to create one
415             #pod from scratch.
416             #pod
417             #pod =cut
418              
419             has progress_name => (
420             is => 'rw',
421             isa => Str,
422             required => 0,
                # Lazy: the default is computed on first read, not at construction time.
423             lazy => 1,
424             default => sub {
                # Yields 'Processing', followed by a space and the ResultSource name when an
                # rs is defined.
425             my $rs = shift->rs;
426             'Processing'.(defined $rs ? ' '.$rs->result_source->name : '');
427             },
428             );
429              
430             #pod =head3 cldr
431             #pod
432             #pod A L<CLDR::Number> object. English speakers that use a typical C<1,234.56> format would
433             #pod probably want to leave it at the default. Otherwise, you should provide your own.
434             #pod
435             #pod =cut
436              
437             has cldr => (
438             is => 'rw',
439             isa => InstanceOf['CLDR::Number'],
440             required => 0,
                # Lazy: the formatter object is only constructed on first access.
441             lazy => 1,
442             default => sub { CLDR::Number->new(locale => 'en') }, # English-locale number formatting unless overridden
443             );
444              
445             #pod =head3 debug
446             #pod
447             #pod Boolean. If turned on, displays timing stats on each chunk, as well as total numbers.
448             #pod
449             #pod =cut
450              
451             has debug => (
                # Read-write boolean flag; off unless explicitly enabled.
452             is => 'rw',
453             isa => Bool,
454             required => 0,
455             default => 0,
456             );
457              
458             #pod =head2 Common Attributes
459             #pod
460             #pod =head3 id_name
461             #pod
462             #pod The column name used as the iterator in the processing loops. This should be a primary
463             #pod key or integer-based (indexed) key, tied to the L</rs>.
464             #pod
465             #pod Optional. Used mainly in DBIC processing. If not specified, it will look up
466             #pod the first primary key column from L</rs> and use that.
467             #pod
468             #pod This can still be specified for other processing modes to use in progress bars.
469             #pod
470             #pod =cut
471              
472             has id_name => (
473             is => 'rw',
474             isa => Str,
475             required => 0,
                # The trigger runs whenever a value is set and may rewrite it to include the
                # ResultSet's source alias (see _fix_id_name).
476             trigger => \&_fix_id_name,
477             );
478              
479             sub _fix_id_name {
                # Trigger for id_name: qualifies a bare column name with the ResultSet's current
                # source alias ("alias.column").
                #
                # Does nothing when the name is false (empty/undefined), when it already contains
                # a dot, or when no rs is set. Re-setting id_name below fires this trigger again;
                # the dot check is what stops the recursion on that second pass.
480 59     59   5879 my ($self, $id_name) = @_;
481 59 100 100     613 return if !$id_name || $id_name =~ /\./ || !defined $self->rs; # prevent an infinite trigger loop
      100        
482 11         125 $self->id_name( $self->rs->current_source_alias.".$id_name" );
483             }
484              
485             #pod =head3 coderef
486             #pod
487             #pod The coderef that will be called either on each chunk or each row, depending on how
488             #pod L</single_rows> is set. The first input is always the BatchChunker object. The rest
489             #pod vary depending on the processing mode:
490             #pod
491             #pod $stmt + $coderef = Query DBI Processing | $bc->$coderef($executed_sth)
492             #pod $stmt + $coderef + single_rows=>1 = Query DBI Processing | $bc->$coderef($row_hashref)
493             #pod $rs + $coderef = DBIC Processing | $bc->$coderef($chunk_rs)
494             #pod $rs + $coderef + single_rows=>1 = DBIC Processing | $bc->$coderef($result)
495             #pod $coderef = DIY Processing | $bc->$coderef($start, $end)
496             #pod
497             #pod The loop does not monitor the return values from the coderef.
498             #pod
499             #pod Required for all processing modes except L</Active DBI Processing>.
500             #pod
501             #pod =cut
502              
503             has coderef => (
                # Optional callback; read-only once the object is built.
504             is => 'ro',
505             isa => CodeRef,
506             required => 0,
507             );
508              
509             #pod =head3 chunk_size
510             #pod
511             #pod The amount of rows to be processed in each loop.
512             #pod
513             #pod Default is 1000 rows. This figure should be sized to keep per-chunk processing time
514             #pod at around 5 seconds. If this is too large, rows may lock for too long. If it's too
515             #pod small, processing may be unnecessarily slow.
516             #pod
517             #pod =cut
518              
519             has chunk_size => (
                # Read-write: the value can be changed after construction.
520             is => 'rw',
521             isa => PositiveInt, # zero and negative sizes are rejected by the type
522             required => 0,
523             default => 1000,
524             );
525              
526             #pod =head3 target_time
527             #pod
528             #pod The target runtime (in seconds) that chunk processing should strive to achieve, not
529             #pod including L</sleep>. If the chunk processing times are too high or too low, this will
530             #pod dynamically adjust L</chunk_size> to try to match the target.
531             #pod
532             #pod B<Turning this on does not mean you should ignore C<chunk_size>!> If the starting chunk
533             #pod size is grossly inaccurate to the workload, you could end up with several chunks in the
534             #pod beginning causing long-lasting locks before the runtime targeting reduces them down to a
535             #pod reasonable size.
536             #pod
537             #pod Default is 5 seconds. Set this to zero to turn off runtime targeting. (This was
538             #pod previously defaulted to off prior to v0.92, and set to 15 in v0.92.)
539             #pod
540             #pod =cut
541              
542             has target_time => (
                # Seconds; the type permits zero and fractional values.
543             is => 'ro',
544             isa => PositiveOrZeroNum,
545             required => 0,
546             default => 5,
547             );
548              
549             #pod =head3 sleep
550             #pod
551             #pod The number of seconds to sleep after each chunk. It uses L<Time::HiRes>'s version, so
552             #pod fractional numbers are allowed.
553             #pod
554             #pod Default is 0, which is fine for most operations. But, it is highly recommended to turn
555             #pod this on (say, 1 to 5 seconds) for really long one-off DB operations, especially if a lot
556             #pod of disk I/O is involved. Without this, there's a chance that the slaves will have a hard
557             #pod time keeping up, and/or the master won't have enough processing power to keep up with
558             #pod standard load.
559             #pod
560             #pod This will increase the overall processing time of the loop, so try to find a balance
561             #pod between the two.
562             #pod
563             #pod =cut
564              
565             has 'sleep' => (
                # Attribute name is quoted because it matches the name of the sleep() function.
                # Seconds; the type permits zero and fractional values.
566             is => 'ro',
567             isa => PositiveOrZeroNum,
568             required => 0,
569             default => 0,
570             );
571              
572             #pod =head3 process_past_max
573             #pod
574             #pod Boolean that controls whether to check past the L</max_id> during the loop. If the loop
575             #pod hits the end point, it will run another maximum ID check in the DB, and adjust C<max_id>
576             #pod accordingly. If it somehow cannot run a DB check (no L</rs> or L</max_stmt> available,
577             #pod for example), the last chunk will just be one at the end of C<< max_id + chunk_size >>.
578             #pod
579             #pod This is useful if the entire table is expected to be processed, and you don't want to
580             #pod miss any new rows that come up between L</calculate_ranges> and the end of the loop.
581             #pod
582             #pod Turned off by default.
583             #pod
584             #pod =cut
585              
586             has process_past_max => (
                # Read-only boolean flag; off unless explicitly enabled.
587             is => 'ro',
588             isa => Bool,
589             required => 0,
590             default => 0,
591             );
592              
593             #pod =head3 single_rows
594             #pod
595             #pod Boolean that controls whether single rows are passed to the L</coderef> or the chunk's
596             #pod ResultSets/statement handle is passed.
597             #pod
598             #pod Since running single-row operations in a DB is painfully slow (compared to bulk
599             #pod operations), this also controls whether the entire set of coderefs are encapsulated into
600             #pod a DB transaction. Transactionalizing the entire chunk brings the speed, and atomicity,
601             #pod back to what a bulk operation would be. (Bulk operations are still faster, but you can't
602             #pod do anything you want in a single DML statement.)
603             #pod
604             #pod Used only by L</DBIC Processing> and L</Query DBI Processing>.
605             #pod
606             #pod =cut
607              
608             has single_rows => (
                # Read-only boolean flag; off unless explicitly enabled.
609             is => 'ro',
610             isa => Bool,
611             required => 0,
612             default => 0,
613             );
614              
615             #pod =head3 min_chunk_percent
616             #pod
617             #pod The minimum row count, as a percentage of L</chunk_size>. This value is actually
618             #pod expressed in decimal form, i.e.: between 0 and 1.
619             #pod
620             #pod This value will be used to determine when to process, skip, or expand a block, based on
621             #pod a count query. The default is C<0.5> or 50%, which means that it will try to expand the
622             #pod block to a larger size if the row count is less than 50% of the chunk size. Zero-sized
623             #pod blocks will be skipped entirely.
624             #pod
625             #pod This "chunk resizing" is useful for large regions of the table that have been deleted, or
626             #pod when the incrementing ID has large gaps in it for other reasons. Wasting time on
627             #pod numerical gaps that span millions can slow down the processing considerably, especially
628             #pod if L is enabled.
629             #pod
630             #pod If this needs to be disabled, set this to 0. The maximum chunk percentage does not have
631             #pod a setting and is hard-coded at C<< 100% + min_chunk_percent >>.
632             #pod
633             #pod If DBIC processing isn't used, L</count_stmt> is also required to enable chunk resizing.
634             #pod
635             #pod =cut
636              
637             has min_chunk_percent => (
638             is => 'ro',
                # Inline type: a number within the 0..1 range, with a friendlier error message
                # than the parent type's default.
639             isa => Type::Utils::declare(
640             name => 'PositiveZeroToOneNum',
641             parent => NumRange->parameterize(0, 1),
642             message => sub { 'Must be a number between 0 and 1' },
643             ),
644             required => 0,
645             default => 0.5, # i.e. 50% of chunk_size
646             );
647              
648             #pod =head3 min_id
649             #pod
650             #pod =head3 max_id
651             #pod
652             #pod Used by L</execute> to figure out the main start and end points. Calculated by
653             #pod L</calculate_ranges>.
654             #pod
655             #pod Manually setting this is not recommended, as each database is different and the
656             #pod information may have changed between the DB change development and deployment. Instead,
657             #pod use L</calculate_ranges> to fill in these values right before running the loop.
658             #pod
659             #pod =cut
660              
# Both range endpoints share the same definition: read-write unsigned integers that are
# normally filled in by calculate_ranges.
has [qw< min_id max_id >] => (
    is  => 'rw',
    isa => UnsignedInt,
);
670              
671             # Big number handling
# Big number handling: when this flag is set to a true value, the trigger converts the
# attributes listed below into Math::BigInt objects.
has _use_bignums => (
    is      => 'rw',
    isa     => Bool,
    trigger => \&_upgrade_attrs_to_bigint,
    default => 0,
);

# Numeric attributes that are inspected/upgraded on the BatchChunker object (BC) and on
# its LoopState object (LS).
my @BIGNUM_BC_ATTRS = qw(
    chunk_size min_id max_id
);
my @BIGNUM_LS_ATTRS = qw(
    start end prev_end
    multiplier_range multiplier_step
    chunk_size chunk_count
);
681              
# Decides whether big-number mode is required and switches it on if so.
#
# Any extra arguments are treated as additional values to inspect, alongside the numeric
# attributes of this object and (if present) its LoopState. A value requires big-number
# mode if it is already an object or fails the PerlSafeInt check. Undefined values are
# ignored.
#
# Returns 1 if big-number mode is (or just became) active, 0 otherwise.
sub _check_bignums {
    my ($self, @extra_vals) = @_;
    return 1 if $self->_use_bignums;  # already checked these

    # Gather every candidate value first: caller-supplied ones, then our own attributes,
    # then the LoopState attributes (only while a loop is in progress).
    my @candidates = @extra_vals;
    push @candidates, map { scalar $self->$_() } @BIGNUM_BC_ATTRS;
    if (my $ls = $self->loop_state) {
        push @candidates, map { scalar $ls->$_() } @BIGNUM_LS_ATTRS;
    }

    my $needs_bignums = any { blessed($_) || !PerlSafeInt->check($_) } grep { defined $_ } @candidates;
    my $set_bignums   = $needs_bignums ? 1 : 0;

    # Auto-set _use_bignums if we detected that we need it
    $self->_use_bignums(1) if $set_bignums;
    return $set_bignums;
}
714              
# Trigger for _use_bignums. When the flag is switched on, every defined, non-object value
# in the tracked attributes of this object and of its LoopState (if one exists) is replaced
# with a Math::BigInt built from that value. Does nothing when the flag is set to false.
sub _upgrade_attrs_to_bigint {
    my ($self, $is_on) = @_;
    return unless $is_on;

    my $upgrade = sub {
        my ($obj, @attrs) = @_;
        foreach my $attr (@attrs) {
            my $val = $obj->$attr();
            next if !defined $val;  # nothing to upgrade
            next if blessed $val;   # already upgraded
            $obj->$attr( Math::BigInt->new($val) );
        }
        return;
    };

    # Fix BatchChunker attributes
    $upgrade->($self, @BIGNUM_BC_ATTRS);

    # Fix LoopState attributes, which only exist while a loop is running
    my $ls = $self->loop_state;
    $upgrade->($ls, @BIGNUM_LS_ATTRS) if $ls;

    return;
}
737              
738             #pod =head3 loop_state
739             #pod
740             #pod A L<DBIx::BatchChunker::LoopState> object designed to hold variables during the
741             #pod processing loop. The object will be cleared out after use. Most of the complexity is
742             #pod needed for chunk resizing.
743             #pod
744             #pod =cut
745              
# Holds the LoopState object for the processing loop. It cannot be passed to the
# constructor (init_arg is undef) and can be removed again with clear_loop_state.
has loop_state => (
    is       => 'rw',
    isa      => InstanceOf['DBIx::BatchChunker::LoopState'],
    init_arg => undef,
    required => 0,
    clearer  => 'clear_loop_state',
);

# Backwards-compatibility: keep the old private accessor name working
*_loop_state = \&loop_state;
756              
# Normalizes constructor arguments before Moo sees them:
#
#   * derives id_name / rsc from each other or from rs, where possible
#   * fills in dbic_storage from rs or rsc
#   * converts the legacy sth/*_sth options into stmt/*_stmt, wrapping the existing $dbh
#     in a DBIx::Connector::Retry object if no dbi_connector was given
#   * dies early if the remaining argument set cannot support range calculation or
#     block execution
around BUILDARGS => sub {
    my ($orig, $class, @ctor_args) = @_;

    # Accept either a single hashref or a flat key/value list
    my %args = @ctor_args == 1 ? %{ $ctor_args[0] } : @ctor_args;

    # Auto-building of rsc and id_name can be a weird dependency dance, so it's better to
    # handle it here.
    my ($rsc, $rs, $id_name) = @args{qw< rsc rs id_name >};
    if (defined $rsc) {
        $args{id_name} = $rsc->{_as} unless $id_name;
    }
    elsif (defined $rs) {
        $args{id_name} = ($rs->result_source->primary_columns)[0] unless $id_name;
        $args{rsc}     = $rs->get_column( $args{id_name} );
    }
    $rsc = $args{rsc};

    # Auto-add dbic_storage, if available
    if (!defined $args{dbic_storage} && (defined $rs || defined $rsc)) {
        my $result_source = defined $rs ? $rs->result_source : $rsc->_resultset->result_source;
        $args{dbic_storage} = $result_source->storage;
    }

    # Legacy option names and their modern replacements (sth => stmt, min_sth => min_stmt, ...)
    my @old_attrs     = qw< sth min_sth max_sth count_sth >;
    my %stmt_name_for = map { my $renamed = $_; $renamed =~ s/sth$/stmt/; ($_ => $renamed) } @old_attrs;
    my @new_attrs     = @stmt_name_for{@old_attrs};
    my @legacy_in_use = grep { $args{$_} } @old_attrs;

    # Find something to use as a dbi_connector, if it doesn't already exist
    if (@legacy_in_use && !defined $args{dbi_connector}) {
        warn join "\n",
            'The sth/*_sth options are now considered legacy usage in DBIx::BatchChunker.  Because there is no',
            'way to re-acquire the password, any attempt to reconnect will fail.  Please use dbi_connector and',
            'stmt/*_stmt instead for reconnection support.',
            ''
        ;

        # NOTE: There was a way to monkey-patch _connect to use $dbh->clone, but I've considered it
        # too intrusive of a solution to use.  Better to demand that the user switch to the new
        # attributes, but have something that still works in most cases.

        # Attempt to build some sort of Connector object from the first legacy handle found
        require DBIx::Connector::Retry;
        my $dbh = $args{ $legacy_in_use[0] }->{Database};

        my @connect_info = (
            join(':', 'dbi', $dbh->{Driver}{Name}, $dbh->{Name}),
            $dbh->{Username},
            '',  # XXX: Can't acquire the password
            # Sane %attr defaults on the off-chance that it actually re-connects
            { AutoCommit => 1, RaiseError => 1 },
        );

        my $conn = DBIx::Connector::Retry->new(
            connect_info => \@connect_info,

            # Do not disconnect on DESTROY.  The $dbh might still be used post-run.
            disconnect_on_destroy => 0,
        );

        # Pretend $conn->_connect was called and store our pre-existing $dbh
        $conn->{_pid} = $$;
        $conn->{_tid} = threads->tid if $INC{'threads.pm'};
        $conn->{_dbh} = $dbh;
        $conn->driver;

        $args{dbi_connector} = $conn;
    }

    # Handle legacy options for sth/*_sth: move each one to its new name, unless the new
    # name was already supplied
    foreach my $old_attr (@legacy_in_use) {
        my $sth = delete $args{$old_attr};
        $args{ $stmt_name_for{$old_attr} } ||= [ $sth->{Statement} ];
    }

    # Now check to make sure dbi_connector is available for DBI processing
    my $has_db_link = defined $args{dbi_connector} || defined $args{dbic_storage};
    if (!$has_db_link && any { $args{$_} } @new_attrs) {
        die 'DBI processing requires a dbi_connector or dbic_storage attribute!';
    }

    # Other sanity checks
    my $has_range_stmts = defined $args{min_stmt} && defined $args{max_stmt};
    my $is_diy_mode     = !$has_db_link && defined $args{coderef};  # DIY mode is exempt
    unless (defined $args{rsc} || $has_range_stmts || $is_diy_mode) {
        die 'Range calculations require one of these attr sets: rsc, rs, or dbi_connector|dbic_storage + min_stmt + max_stmt';
    }

    # The "rs + coderef" set is already covered by the plain coderef test
    if (!$args{stmt} && !$args{coderef}) {
        die 'Block execution requires one of these attr sets: dbi_connector|dbic_storage + stmt, rs + coderef, or coderef';
    }

    return $class->$orig( %args );
};
855              
# Post-construction hook: passes the current id_name through _fix_id_name, then runs the
# big-number detection against the freshly built object.
sub BUILD {
    my ($self) = @_;

    # Make sure id_name gets fixed at the right time
    my $id_name = $self->id_name;
    $self->_fix_id_name($id_name);

    $self->_check_bignums;
}
862              
863             #pod =head1 CONSTRUCTORS
864             #pod
865             #pod See L</ATTRIBUTES> for information on what can be passed into these constructors.
866             #pod
867             #pod =head2 new
868             #pod
869             #pod my $batch_chunker = DBIx::BatchChunker->new(...);
870             #pod
871             #pod A standard object constructor. If you use this constructor, you will need to
872             #pod manually call L</calculate_ranges> and L</execute> to execute the DB changes.
873             #pod
874             #pod =head2 construct_and_execute
875             #pod
876             #pod my $batch_chunker = DBIx::BatchChunker->construct_and_execute(...);
877             #pod
878             #pod Constructs a DBIx::BatchChunker object and automatically calls
879             #pod L</calculate_ranges> and L</execute> on it. Anything passed to this method will be passed
880             #pod through to the constructor.
881             #pod
882             #pod Returns the constructed object, post-execution. This is typically only useful if you want
883             #pod to inspect the attributes after the process has finished. Otherwise, it's safe to just
884             #pod ignore the return and throw away the object immediately.
885             #pod
886             #pod =cut
887              
# Class method: builds a new object from the given constructor arguments, runs
# calculate_ranges and then execute on it, and hands the object back to the caller.
sub construct_and_execute {
    my ($class, @ctor_args) = @_;

    my $batch_chunker = $class->new(@ctor_args);
    $batch_chunker->$_() for qw< calculate_ranges execute >;

    return $batch_chunker;
}
897              
898             #pod =head1 METHODS
899             #pod
900             #pod =head2 calculate_ranges
901             #pod
902             #pod my $batch_chunker = DBIx::BatchChunker->new(
903             #pod rsc => $account_rsc, # a ResultSetColumn
904             #pod ### OR ###
905             #pod rs => $account_rs, # a ResultSet
906             #pod id_name => 'account_id', # can be looked up if not provided
907             #pod ### OR ###
908             #pod dbi_connector => $conn, # DBIx::Connector::Retry object
909             #pod min_stmt => $min_stmt, # a SQL statement or DBI $sth args
910             #pod max_stmt => $max_stmt, # ditto
911             #pod
912             #pod ### Optional but recommended ###
913             #pod id_name => 'account_id', # will also be added into the progress bar title
914             #pod chunk_size => 20_000, # default is 1000
915             #pod
916             #pod ### Optional ###
917             #pod progress_bar => $progress, # defaults to a 2-count 'Calculating ranges' bar
918             #pod
919             #pod # ...other attributes for execute...
920             #pod );
921             #pod
922             #pod my $has_data_to_process = $batch_chunker->calculate_ranges;
923             #pod
924             #pod Given a L<DBIx::Class::ResultSetColumn>, L<DBIx::Class::ResultSet>, or L<DBI> statement
925             #pod argument set, this method calculates the min/max IDs of those objects. It fills in the
926             #pod L</min_id> and L</max_id> attributes, based on the ID data, and then returns 1.
927             #pod
928             #pod If either of the min/max statements don't return any ID data, this method will return 0.
929             #pod
930             #pod =cut
931              
# Looks up the minimum and maximum IDs and stores them in min_id / max_id.
#
# The lookup uses, in order of preference: the ResultSetColumn (rsc), the DBIC storage's
# $dbh with min_stmt/max_stmt, or the dbi_connector with min_stmt/max_stmt. A two-step
# progress bar is updated along the way.
#
# Returns 1 when both IDs were found and stored, or 0 (leaving the attributes untouched)
# when either lookup came back undefined.
sub calculate_ranges {
    my ($self) = @_;

    # Drop a leading "word." qualifier so the progress bar title only shows the column
    my $column_name = $self->id_name || '';
    $column_name =~ s/^\w+\.//;

    my $bar_title = 'Calculating ranges'.($column_name ? " for $column_name" : '');
    my $progress  = $self->progress_bar || Term::ProgressBar->new({
        name   => $bar_title,
        count  => 2,
        ETA    => 'linear',
        silent => !(-t *STDERR && -t *STDIN),  # STDERR is what {fh} is set to use
    });

    my ($min_id, $max_id);

    # Both raw-DBI paths run the same two statements; only the source of the $dbh differs
    my $fetch_via_dbh = sub {
        my ($dbh) = @_;

        # In case the sub is retried
        $progress->update(0);

        ($min_id) = $dbh->selectrow_array(@{ $self->min_stmt });
        $progress->update(1);

        ($max_id) = $dbh->selectrow_array(@{ $self->max_stmt });
        $progress->update(2);
    };

    # Actually run the statements
    if ($self->rsc) {
        $self->_dbic_block_runner( run => sub {
            # In case the sub is retried
            $progress->update(0);

            $min_id = $self->rsc->min;
            $progress->update(1);

            $max_id = $self->rsc->max;
            $progress->update(2);
        });
    }
    elsif ($self->dbic_storage) {
        $self->_dbic_block_runner( run => sub { $fetch_via_dbh->( $self->dbic_storage->dbh ) } );
    }
    else {
        $self->dbi_connector->run(sub { $fetch_via_dbh->($_) });
    }

    # No data, no ranges
    return 0 unless defined $min_id && defined $max_id;

    # This would be the primary spot where we notice we need to upgrade, so check the values
    # before we attempt to mangle them.
    if ($self->_check_bignums($min_id, $max_id)) {
        ($min_id, $max_id) = map { Math::BigFloat->new($_)->as_int } ($min_id, $max_id);
    }
    else {
        ($min_id, $max_id) = map { int $_ } ($min_id, $max_id);
    }

    $self->min_id($min_id);
    $self->max_id($max_id);

    return 1;
}
1007              
1008             #pod =head2 execute
1009             #pod
1010             #pod my $batch_chunker = DBIx::BatchChunker->new(
1011             #pod # ...other attributes for calculate_ranges...
1012             #pod
1013             #pod dbi_connector => $conn, # DBIx::Connector::Retry object
1014             #pod stmt => $do_stmt, # INSERT/UPDATE/DELETE $stmt with BETWEEN placeholders
1015             #pod ### OR ###
1016             #pod dbi_connector => $conn, # DBIx::Connector::Retry object
1017             #pod stmt => $select_stmt, # SELECT $stmt with BETWEEN placeholders
1018             #pod count_stmt => $count_stmt, # SELECT COUNT $stmt to be used for min_chunk_percent; optional
1019             #pod coderef => $coderef, # called code that does the actual work
1020             #pod ### OR ###
1021             #pod rs => $account_rs, # base ResultSet, which gets filtered with -between later on
1022             #pod id_name => 'account_id', # can be looked up if not provided
1023             #pod coderef => $coderef, # called code that does the actual work
1024             #pod ### OR ###
1025             #pod coderef => $coderef, # DIY database work; just pass the $start/$end IDs
1026             #pod
1027             #pod ### Optional but recommended ###
1028             #pod sleep => 0.25, # number of seconds to sleep each chunk; defaults to 0
1029             #pod process_past_max => 1, # use this if processing the whole table
1030             #pod single_rows => 1, # does $coderef get a single $row or the whole $chunk_rs / $stmt
1031             #pod min_chunk_percent => 0.25, # minimum row count of chunk size percentage; defaults to 0.5 (or 50%)
1032             #pod target_time => 5, # target runtime for dynamic chunk size scaling; default is 5 seconds
1033             #pod
1034             #pod progress_name => 'Updating Accounts', # easier than creating your own progress_bar
1035             #pod
1036             #pod ### Optional ###
1037             #pod progress_bar => $progress, # defaults to "Processing $source_name" bar
1038             #pod debug => 1, # displays timing stats on each chunk
1039             #pod );
1040             #pod
1041             #pod $batch_chunker->execute if $batch_chunker->calculate_ranges;
1042             #pod
1043             #pod Applies the configured DB changes in chunks. Runs through the loop, processing a
1044             #pod statement handle, ResultSet, and/or coderef as it goes. Each loop iteration processes a
1045             #pod chunk of work, determined by L</chunk_size>.
1046             #pod
1047             #pod The L</calculate_ranges> method should be run first to fill in L</min_id> and L</max_id>.
1048             #pod If either of these are missing, the function will assume L</calculate_ranges> couldn't
1049             #pod find them and warn about it.
1050             #pod
1051             #pod More details can be found in the L</Processing Modes> and L</ATTRIBUTES> sections.
1052             #pod
1053             #pod =cut
1054              
# Runs the main processing loop over the min_id..max_id range.
#
# A LoopState object is created for the duration of the loop and cleared afterwards. Each
# iteration sizes the next chunk, lets _process_past_max_checker and _process_block decide
# whether it should be processed, and then records timing, sleeps, and updates progress.
# If min_id or max_id is missing, a message is sent to the progress bar and nothing is run.
sub execute {
    my ($self) = @_;
    $self->_check_bignums;

    # Total number of IDs in the range; stays undefined without both endpoints
    my $total_ids;
    $total_ids = $self->max_id - $self->min_id + 1
        if defined $self->min_id && defined $self->max_id;

    # Fire up the progress bar
    my $progress = $self->progress_bar || Term::ProgressBar->new({
        name   => $self->progress_name,
        count  => $total_ids || 1,
        ETA    => 'linear',
        silent => !(-t *STDERR && -t *STDIN),  # STDERR is what {fh} is set to use
    });

    if (!$total_ids) {
        $progress->message('No chunks; nothing to process...');
        return;
    }

    if ($self->debug) {
        my @totals = ( ceil($total_ids / $self->chunk_size), $total_ids );
        $progress->message(
            sprintf "(%s total chunks; %s total rows)",
                map { $self->cldr->decimal_formatter->format($_) } @totals
        );
    }

    # Loop state setup
    $self->clear_loop_state;
    my $ls = $self->loop_state( DBIx::BatchChunker::LoopState->new({
        batch_chunker => $self,
        progress_bar  => $progress,
    }) );

    # Da loop
    while ($ls->prev_end < $self->max_id || $ls->start) {
        $ls->multiplier_range( $ls->multiplier_range + $ls->multiplier_step );

        # The start could be already set because of early 'next' calls
        if (!defined $ls->start) {
            $ls->start( $ls->prev_end + 1 );
        }

        # ceil, because multiplier_* could be fractional; min, so we never exceed max_id
        my $proposed_end = $ls->start + ceil($ls->multiplier_range * $ls->chunk_size) - 1;
        $ls->end( min($proposed_end, $self->max_id) );
        $ls->chunk_count(undef);

        # Either checker can veto the block: first the max ID re-check, then the actual
        # DB processing
        next unless $self->_process_past_max_checker && $self->_process_block;

        # Record the time quickly
        $ls->prev_runtime(time - $ls->timer);

        # Give the DB a little bit of breathing room
        if (my $pause = $self->sleep) {
            sleep $pause;
        }

        $self->_print_debug_status('processed');
        $self->_increment_progress;
        $self->_runtime_checker;

        # End-of-loop activities (skipped by early next)
        $ls->_reset_chunk_state;
    }
    $self->clear_loop_state;

    # Keep the finished time from the progress bar, in case there are other loops or output
    unless ($progress->silent) {
        $progress->update( $progress->target );
        print "\n";
    }
}
1129              
1130             #pod =head1 PRIVATE METHODS
1131             #pod
1132             #pod =head2 _process_block
1133             #pod
1134             #pod Runs the DB work and passes it to the coderef. Its return value determines whether the
1135             #pod block should be processed or not.
1136             #pod
1137             #pod =cut
1138              
# Counts the rows in the current chunk (when a count source is available), lets
# _chunk_count_checker decide whether the chunk is worth processing, and then runs the
# actual work for the range $ls->start .. $ls->end.
#
# The work is dispatched on the configured attributes:
#
#   * stmt (+ optional coderef): prepare/execute the statement with the range appended to
#     any bind values; with single_rows the coderef gets each row hashref inside a
#     transaction, otherwise it gets the executed $sth (if a coderef exists at all)
#   * rs + coderef: the coderef gets each row (single_rows, in a transaction) or the whole
#     chunk ResultSet
#   * coderef only: called as a method with the start and end IDs
#
# Returns 1 if the block was processed, or an empty/undef value if the chunk count
# checker rejected it.
sub _process_block {
    my ($self) = @_;

    my $ls      = $self->loop_state;
    my $conn    = $self->dbi_connector;
    my $coderef = $self->coderef;
    my $rs      = $self->rs;

    # Figure out if the row count is worth the work
    my $chunk_rs;
    my $count_stmt = $self->count_stmt;
    my $chunk_count;
    if ($count_stmt && defined $self->dbic_storage) {
        $self->_dbic_block_runner( run => sub {
            $chunk_count = $self->dbic_storage->dbh->selectrow_array(
                @$count_stmt,
                (@$count_stmt == 1 ? undef : ()),  # fill in the \%attr slot if only SQL was given
                $ls->start, $ls->end,
            );
        });
    }
    elsif ($count_stmt) {
        $chunk_count = $conn->run(sub {
            $_->selectrow_array(
                @$count_stmt,
                (@$count_stmt == 1 ? undef : ()),  # fill in the \%attr slot if only SQL was given
                $ls->start, $ls->end,
            );
        });
    }
    elsif (defined $rs) {
        $chunk_rs = $rs->search({
            $self->id_name => { -between => [$ls->start, $ls->end] },
        });

        $self->_dbic_block_runner( run => sub {
            $chunk_count = $chunk_rs->count;
        });
    }

    $chunk_count = Math::BigInt->new($chunk_count) if $self->_check_bignums($chunk_count);
    $ls->chunk_count($chunk_count);

    return unless $self->_chunk_count_checker;

    # NOTE: Try to minimize the amount of closures by using $self as much as possible
    # inside coderefs.

    # Do the work
    if (my $stmt = $self->stmt) {
        ### Statement handle

        # $stmt is [ $sql, \%attr, @bind ]; the first two go to prepare, the rest (plus the
        # chunk range) go to execute
        my @prepare_args = @$stmt > 2 ? @$stmt[0..1] : @$stmt;
        my @execute_args = (
            (@$stmt > 2 ? @$stmt[2..$#$stmt] : ()),
            $ls->start, $ls->end,
        );

        if ($self->single_rows && $coderef) {
            # Transactional work
            if ($self->dbic_storage) {
                $self->_dbic_block_runner( txn => sub {
                    $self->loop_state->_mark_timer;  # reset timer on retries

                    my $sth = $self->dbic_storage->dbh->prepare(@prepare_args);
                    $sth->execute(@execute_args);

                    while (my $row = $sth->fetchrow_hashref('NAME_lc')) { $self->coderef->($self, $row) }
                });
            }
            else {
                $conn->txn(sub {
                    $self->loop_state->_mark_timer;  # reset timer on retries

                    my $sth = $_->prepare(@prepare_args);
                    $sth->execute(@execute_args);

                    while (my $row = $sth->fetchrow_hashref('NAME_lc')) { $self->coderef->($self, $row) }
                });
            }
        }
        else {
            # Bulk work (or DML)
            if ($self->dbic_storage) {
                $self->_dbic_block_runner( run => sub {
                    $self->loop_state->_mark_timer;  # reset timer on retries

                    my $sth = $self->dbic_storage->dbh->prepare(@prepare_args);
                    $sth->execute(@execute_args);

                    $self->coderef->($self, $sth) if $self->coderef;
                });
            }
            else {
                $conn->run(sub {
                    $self->loop_state->_mark_timer;  # reset timer on retries

                    my $sth = $_->prepare(@prepare_args);
                    $sth->execute(@execute_args);

                    $self->coderef->($self, $sth) if $self->coderef;
                });
            }
        }
    }
    elsif (defined $rs && $coderef) {
        ### ResultSet with coderef

        # A count_stmt takes precedence over the ResultSet count above, in which case the
        # chunk ResultSet was never built.  Build it here, so that the code below never
        # calls a method on an undefined value.
        unless (defined $chunk_rs) {
            $chunk_rs = $rs->search({
                $self->id_name => { -between => [$ls->start, $ls->end] },
            });
        }

        if ($self->single_rows) {
            # Transactional work
            $self->_dbic_block_runner( txn => sub {
                # reset timer/$rs on retries
                $self->loop_state->_mark_timer;
                $chunk_rs->reset;

                while (my $row = $chunk_rs->next) { $self->coderef->($self, $row) }
            });
        }
        else {
            # Bulk work
            $self->_dbic_block_runner( run => sub {
                # reset timer/$rs on retries
                $self->loop_state->_mark_timer;
                $chunk_rs->reset;

                $self->coderef->($self, $chunk_rs);
            });
        }
    }
    else {
        ### Something a bit more free-form

        $self->$coderef($ls->start, $ls->end);
    }

    return 1;
}
1275              
1276             #pod =head2 _process_past_max_checker
1277             #pod
1278             #pod Checks to make sure the current endpoint is actually the end, by checking the database.
1279             #pod Its return value determines whether the block should be processed or not.
1280             #pod
1281             #pod See L</process_past_max>.
1282             #pod
1283             #pod =cut
1284              
# When process_past_max is enabled and the current chunk reaches max_id, re-queries the
# maximum ID and extends max_id (and the progress bar target) if the table has grown.
#
# Returns 1 if the current block should be processed, or 0 if the re-check found no
# maximum ID at all.
sub _process_past_max_checker {
    my ($self) = @_;
    my $ls  = $self->loop_state;
    my $bar = $ls->progress_bar;

    # Only relevant when enabled, and only once the chunk touches the known max ID
    return 1 unless $self->process_past_max;
    return 1 unless $ls->end >= $self->max_id;

    my $rsc = $self->rsc;

    # No checks for DIY, if they didn't include a max_stmt
    unless (defined $rsc || $self->max_stmt) {
        # There's no way to size this, so add one more chunk
        $ls->end( $self->max_id + $ls->chunk_size );
        return 1;
    }

    # Run another MAX check
    $bar->message('Reached end; re-checking max ID') if $self->debug;
    my $new_max_id;
    if (defined $rsc) {
        $self->_dbic_block_runner( run => sub {
            $new_max_id = $rsc->max;
        });
    }
    elsif ($self->dbic_storage) {
        $self->_dbic_block_runner( run => sub {
            ($new_max_id) = $self->dbic_storage->dbh->selectrow_array(@{ $self->max_stmt });
        });
    }
    else {
        ($new_max_id) = $self->dbi_connector->run(sub {
            $_->selectrow_array(@{ $self->max_stmt });
        });
    }
    $ls->_mark_timer;  # the above query shouldn't impact runtimes

    # Convert $new_max_id if necessary
    if ($self->_check_bignums($new_max_id)) {
        $new_max_id = Math::BigInt->new($new_max_id);
    }

    # No max: No affected rows to change
    if (!$new_max_id || $new_max_id eq '0E0') {
        $bar->message('No max ID found; nothing left to process...') if $self->debug;
        $ls->end( $self->max_id );

        $ls->prev_check('no max');
        return 0;
    }

    my $debug = $self->debug;
    if ($new_max_id > $self->max_id) {
        # New max ID: move the goalposts and rescale the progress bar
        $bar->message( sprintf 'New max ID set from %s to %s', $self->max_id, $new_max_id ) if $debug;
        $self->max_id($new_max_id);
        $bar->target( $new_max_id - $self->min_id + 1 );
        $bar->update( $bar->last_update );
    }
    elsif ($new_max_id == $self->max_id) {
        # Same max ID
        $bar->message( sprintf 'Found max ID %s; same as end', $new_max_id ) if $debug;
    }
    else {
        # Max too low
        $bar->message( sprintf 'Found max ID %s; ignoring...', $new_max_id ) if $debug;
    }

    # Run another boundary check with the new max_id value
    my $bounded_end = min($ls->end, $self->max_id);
    $ls->end($bounded_end);

    return 1;
}
1352              
1353             #pod =head2 _chunk_count_checker
1354             #pod
1355             #pod Checks the chunk count to make sure it's properly sized. If not, it will try to shrink
1356             #pod or expand the current chunk (in C<chunk_size> increments) as necessary. Its return value
1357             #pod determines whether the block should be processed or not.
1358             #pod
1359             #pod See L</min_chunk_percent>.
1360             #pod
1361             #pod This is not to be confused with the L</_runtime_checker>, which adjusts C<chunk_size>
1362             #pod after processing, based on previous run times.
1363             #pod
1364             #pod =cut
1365              
1366             sub _chunk_count_checker {
1367 598     598   1280 my ($self) = @_;
1368 598         8591 my $ls = $self->loop_state;
1369 598         10285 my $progress = $ls->progress_bar;
1370              
1371             # Chunk sizing is essentially disabled, so bounce out of here
1372 598 100 100     8390 if ($self->min_chunk_percent <= 0 || !defined $ls->chunk_count) {
1373 429         7442 $ls->prev_check('disabled');
1374 429         12870 return 1;
1375             }
1376              
1377 169         3013 my $chunk_percent = $ls->chunk_count / $ls->chunk_size;
1378 169         37013 $ls->checked_count( $ls->checked_count + 1 );
1379              
1380 169 100 66     6723 if ($ls->chunk_count == 0 && $self->min_chunk_percent > 0) {
    100          
    50          
    100          
    100          
1381             # No rows: Skip the block entirely, and accelerate the stepping
1382 3         41 $self->_print_debug_status('skipped');
1383              
1384 3         24 $self->_increment_progress;
1385              
1386 3         160 my $step = $ls->multiplier_step;
1387 3         26 $ls->_reset_chunk_state;
1388 3         60 $ls->multiplier_step( $step * 2 );
1389              
1390 3         134 $ls->prev_check('skipped rows');
1391 3         126 return 0;
1392             }
1393             elsif ($chunk_percent > 1 + $self->min_chunk_percent) {
1394             # Too many rows: Backtrack to the previous range and try to bisect
1395 21         8497 $self->_print_debug_status('shrunk');
1396              
1397 21         231 $ls->_mark_timer;
1398              
1399             # If we have a min/max range, bisect down the middle. If not, walk back
1400             # to the previous range and decelerate the stepping, which should bring
1401             # it to a halfway point from this range and last.
1402 21         1221 my $lr = $ls->last_range;
1403 21 50 66     454 $lr->{max} = $ls->multiplier_range if !defined $lr->{max} || $ls->multiplier_range < $lr->{max};
1404 21   66     1167 $ls->multiplier_range( $lr->{min} || ($ls->multiplier_range - $ls->multiplier_step) );
1405             $ls->multiplier_step(
1406 21 100       12453 defined $lr->{min} ? ($lr->{max} - $lr->{min}) / 2 : $ls->multiplier_step / 2
1407             );
1408              
1409 21         31674 $ls->prev_check('too many rows');
1410 21         1330 return 0;
1411             }
1412              
1413             # The above two are more important than skipping the count checks. Better to
1414             # have too few rows than too many.
1415              
1416             elsif ($ls->checked_count > 10) {
1417             # Checked too many times: Just process it
1418 0         0 $ls->prev_check('too many checks');
1419 0         0 return 1;
1420             }
1421             elsif ($ls->end >= $self->max_id) {
1422             # At the end: Just process it
1423 11         1448 $ls->prev_check('at max_id');
1424 11         295 return 1;
1425             }
1426             elsif ($chunk_percent < $self->min_chunk_percent) {
1427             # Too few rows: Keep the start ID and accelerate towards a better endpoint
1428 75         5544 $self->_print_debug_status('expanded');
1429              
1430 75         634 $ls->_mark_timer;
1431              
1432             # If we have a min/max range, bisect down the middle. If not, keep
1433             # accelerating the stepping.
1434 75         4225 my $lr = $ls->last_range;
1435 75 50 66     1466 $lr->{min} = $ls->multiplier_range if !defined $lr->{min} || $ls->multiplier_range > $lr->{min};
1436             $ls->multiplier_step(
1437 75 50       1716 defined $lr->{max} ? ($lr->{max} - $lr->{min}) / 2 : $ls->multiplier_step * 2
1438             );
1439              
1440 75         4500 $ls->prev_check('too few rows');
1441 75         3337 return 0;
1442             }
1443              
1444 59         16903 $ls->prev_check('nothing wrong');
1445 59         1554 return 1;
1446             }
1447              
1448             #pod =head2 _runtime_checker
1449             #pod
1450             #pod Stores the previously processed chunk's runtime, and then adjusts C<chunk_size> as
1451             #pod necessary.
1452             #pod
1453             #pod See L</target_time>.
1454             #pod
1455             #pod =cut
1456              
1457             sub _runtime_checker {
1458 499     499   1057 my ($self) = @_;
1459 499         7814 my $ls = $self->loop_state;
1460 499 100       4021 return unless $self->target_time;
1461 110 50 33     1612 return unless $ls->chunk_size && $ls->prev_runtime; # prevent DIV/0
1462              
1463 110         5076 my $timings = $ls->last_timings;
1464              
1465 110   66     2262 my $new_timing = {
1466             runtime => $ls->prev_runtime,
1467             chunk_count => $ls->chunk_count || $ls->chunk_size,
1468             };
1469 110         14081 $new_timing->{chunk_per} = $new_timing->{chunk_count} / $ls->chunk_size;
1470              
1471             # Rowtime: a measure of how much of the chunk_size actually impacted the runtime
1472 110         25258 $new_timing->{rowtime} = $new_timing->{runtime} / $new_timing->{chunk_per};
1473              
1474             # Store the last five processing times
1475 110         21682 push @$timings, $new_timing;
1476 110 100       453 shift @$timings if @$timings > 5;
1477              
1478             # Figure out the averages and adjustment factor
1479 110         511 my $ttl = scalar @$timings;
1480 110         318 my $avg_rowtime = sum(map { $_->{rowtime} } @$timings) / $ttl;
  476         1447  
1481 110         32435 my $adjust_factor = $self->target_time / $avg_rowtime;
1482              
1483 110         19158 my $new_target_chunk_size = $ls->chunk_size;
1484 110         634 my $adjective;
1485 110 100       550 if ($adjust_factor > 1.05) {
    50          
1486             # Too fast: Raise the chunk size
1487              
1488 24 100       79 return unless $ttl >= 5; # must have a full set of timings
1489 4 50   20   26 return if any { $_->{runtime} >= $self->target_time } @$timings; # must ALL have low runtimes
  20         41  
1490              
1491 4         21 $new_target_chunk_size *= min(2, $adjust_factor); # never more than double
1492 4         9 $adjective = 'fast';
1493             }
1494             elsif ($adjust_factor < 0.95) {
1495             # Too slow: Lower the chunk size
1496              
1497 86 100       14749 return unless $ls->prev_runtime > $self->target_time; # last runtime must actually be too high
1498              
1499 65 50       1272 $new_target_chunk_size *=
1500             ($ls->prev_runtime < $self->target_time * 3) ?
1501             max(0.5, $adjust_factor) : # never less than half...
1502             $adjust_factor # ...unless the last runtime was waaaay off
1503             ;
1504 65 50       677 $new_target_chunk_size = 1 if $new_target_chunk_size < 1;
1505 65         136 $adjective = 'slow';
1506             }
1507              
1508 69         187 $new_target_chunk_size = int $new_target_chunk_size;
1509 69 100       2135 return if $new_target_chunk_size == $ls->chunk_size; # either nothing changed or it's too miniscule
1510 5 50       49 return if $new_target_chunk_size < 1;
1511              
1512             # Print out a debug line, if enabled
1513 5 50       67 if ($self->debug) {
1514             # CLDR number formatters
1515 0         0 my $integer = $self->cldr->decimal_formatter;
1516 0         0 my $percent = $self->cldr->percent_formatter;
1517              
1518 0         0 $ls->{progress_bar}->message( sprintf(
1519             "Processing too %s, avg %4s of target time, adjusting chunk size from %s to %s",
1520             $adjective,
1521             $percent->format( 1 / $adjust_factor ),
1522             $integer->format( $ls->chunk_size ),
1523             $integer->format( $new_target_chunk_size ),
1524             ) );
1525             }
1526              
1527             # Change it!
1528 5         87 $ls->chunk_size($new_target_chunk_size);
1529 5 100       206 $ls->_reset_last_timings if $adjective eq 'fast'; # never snowball too quickly
1530 5         131 return 1;
1531             }
1532              
1533             #pod =head2 _increment_progress
1534             #pod
1535             #pod Increments the progress bar.
1536             #pod
1537             #pod =cut
1538              
1539             sub _increment_progress {
1540 502     502   1133 my ($self) = @_;
1541 502         7318 my $ls = $self->loop_state;
1542 502         9886 my $progress = $ls->progress_bar;
1543              
1544 502         9697 my $so_far = $ls->end - $self->min_id + 1;
1545 502 50       37477 $progress->target($so_far+1) if $ls->end > $self->max_id;
1546 502         16486 $progress->update($so_far);
1547             }
1548              
1549             #pod =head2 _print_debug_status
1550             #pod
1551             #pod Prints out a standard debug status line, if debug is enabled. What it prints is
1552             #pod generally uniform, but it depends on the processing action. Most of the data is
1553             #pod pulled from L</loop_state>.
1554             #pod
1555             #pod =cut
1556              
1557             sub _print_debug_status {
1558 598     598   1904 my ($self, $action) = @_;
1559 598 50       13163 return unless $self->debug;
1560              
1561 0           my $ls = $self->loop_state;
1562 0   0       my $sleep = $self->sleep || 0;
1563              
1564             # CLDR number formatters
1565 0           my $integer = $self->cldr->decimal_formatter;
1566 0           my $percent = $self->cldr->percent_formatter;
1567 0           my $decimal = $self->cldr->decimal_formatter(
1568             minimum_fraction_digits => 2,
1569             maximum_fraction_digits => 2,
1570             );
1571              
1572 0           my $message;
1573 0 0 0       if ($ls->start < 1_000_000_000 && $ls->end < 1_000_000_000) {
1574 0           $message = sprintf(
1575             'IDs %6u to %6u %9s, %9s rows found',
1576             $ls->start, $ls->end, $action,
1577             $integer->format( $ls->chunk_count ),
1578             );
1579             }
1580             else {
1581 0           $message = sprintf(
1582             'IDs %s to %s %s, %s rows found',
1583             $ls->start, $ls->end, $action,
1584             $ls->chunk_count,
1585             );
1586             }
1587              
1588 0 0         $message .= sprintf(
1589             ' (%4s of chunk size)',
1590             $percent->format( $ls->chunk_count / $ls->chunk_size ),
1591             ) if $ls->chunk_count;
1592              
1593 0 0         if ($action eq 'processed') {
1594 0 0         $message .= $sleep ?
1595             sprintf(
1596             ', %5s+%s sec runtime+sleep',
1597             $decimal->format( $ls->prev_runtime ),
1598             $decimal->format( $sleep )
1599             ) :
1600             sprintf(
1601             ', %5s sec runtime',
1602             $decimal->format( $ls->prev_runtime ),
1603             )
1604             ;
1605             }
1606              
1607             # Reduce spacing if the numbers are too large
1608 0 0 0       if ($ls->start > 1_000_000_000 || $ls->end > 1_000_000_000) {
1609 0           $message =~ s/\s+/ /g;
1610 0           $message =~ s/\(\s+/\(/g;
1611             }
1612              
1613 0           return $ls->progress_bar->message($message);
1614             }
1615              
1616             #pod =head1 CAVEATS
1617             #pod
1618             #pod =head2 Big Number Support
1619             #pod
1620             #pod If the module detects that the ID numbers are no longer safe for standard Perl NV
1621             #pod storage, it will automatically switch to using L<Math::BigInt> and L<Math::BigFloat> for
1622             #pod big number support. If any blessed numbers are already being used to define the
1623             #pod attributes, this will also switch on the support.
1624             #pod
1625             #pod =head2 String-based IDs
1626             #pod
1627             #pod If you're working with C<VARCHAR> types or other string-based IDs to represent integers,
1628             #pod these may be subject to whatever string-based comparison rules your RDBMS uses when
1629             #pod calculating with C<MIN>/C<MAX> or using C<BETWEEN>. Row counting and chunk size scaling
1630             #pod will try to compensate, but will be mixing string-based comparisons from the RDBMS and
1631             #pod Perl-based integer math.
1632             #pod
1633             #pod Using the C<CAST> function may help, but it may also cause critical indexes to be
1634             #pod ignored, especially if the function is used on the left-hand side against the column.
1635             #pod Strings with the exact same length may be safe from comparison weirdness, but YMMV.
1636             #pod
1637             #pod Non-integer inputs from ID columns, such as GUIDs or other alphanumeric strings, are not
1638             #pod currently supported. They would have to be converted to integers via SQL, and doing so
1639             #pod may run into a similar risk of having your RDBMS ignore indexes.
1640             #pod
1641             #pod =head1 SEE ALSO
1642             #pod
1643             #pod L<DBIx::BulkLoader::Mysql>, L<DBIx::Class::BatchUpdate>, L<DBIx::BulkUtil>
1644             #pod
1645             #pod =cut
1646              
1647             1;
1648              
1649             __END__