File Coverage

blib/lib/DBIx/BatchChunker.pm
Criterion Covered Total %
statement 325 363 89.5
branch 147 190 77.3
condition 43 76 56.5
subroutine 45 47 95.7
pod 3 4 75.0
total 563 680 82.7


line stmt bran cond sub pod time code
1             package DBIx::BatchChunker;
2              
3             our $AUTHORITY = 'cpan:GSG';
4             # ABSTRACT: Run large database changes safely
5 7     7   1255169 use version;
  7         11031  
  7         33  
6             our $VERSION = 'v0.941.1'; # VERSION
7              
8 7     7   3610 use Moo;
  7         65479  
  7         38  
9 7     7   10820 use MooX::StrictConstructor;
  7         79571  
  7         30  
10              
11 7     7   135249 use CLDR::Number;
  7         335221  
  7         262  
12              
13 7     7   3407 use Types::Standard qw( Str Bool Undef ArrayRef HashRef CodeRef InstanceOf Tuple Maybe Optional slurpy );
  7         462504  
  7         63  
14 7     7   14869 use Types::Numbers qw( NumRange UnsignedInt PerlSafeInt PositiveInt PositiveOrZeroNum );
  7         873552  
  7         82  
15 7     7   10462 use Type::Utils;
  7         28591  
  7         64  
16              
17 7     7   8956 use List::Util 1.33 (qw( min max sum any first )); # has any/all/etc.
  7         142  
  7         497  
18 7     7   41 use Math::BigInt upgrade => 'Math::BigFloat';
  7         14  
  7         52  
19 7     7   2099 use Math::BigFloat;
  7         27  
  7         35  
20 7     7   3443 use POSIX qw( ceil );
  7         15  
  7         46  
21 7     7   409 use Scalar::Util qw( blessed weaken );
  7         20  
  7         266  
22 7     7   3185 use Term::ProgressBar 2.14; # with silent option
  7         379493  
  7         286  
23 7     7   53 use Time::HiRes qw( time sleep );
  7         16  
  7         67  
24              
25 7     7   3858 use DBIx::BatchChunker::LoopState;
  7         21  
  7         283  
26              
27             # Don't export the above, but don't conflict with StrictConstructor, either
28 7     7   45 use namespace::clean -except => [qw< new meta >];
  7         15  
  7         46  
29              
30             # This is now an unused, dummy variable
31             our $DB_MAX_ID = ~0;
32              
33             #pod =encoding utf8
34             #pod
35             #pod =head1 SYNOPSIS
36             #pod
37             #pod use DBIx::BatchChunker;
38             #pod
39             #pod my $account_rs = $schema->resultset('Account')->search({
40             #pod account_type => 'deprecated',
41             #pod });
42             #pod
43             #pod my %params = (
44             #pod chunk_size => 5000,
45             #pod target_time => 5,
46             #pod
47             #pod rs => $account_rs,
48             #pod id_name => 'account_id',
49             #pod
50             #pod coderef => sub { $_[1]->delete },
51             #pod sleep => 1,
52             #pod debug => 1,
53             #pod
54             #pod progress_name => 'Deleting deprecated accounts',
55             #pod process_past_max => 1,
56             #pod );
57             #pod
58             #pod # EITHER:
59             #pod # 1) Automatically construct and execute the changes:
60             #pod
61             #pod DBIx::BatchChunker->construct_and_execute(%params);
62             #pod
63             #pod # OR
64             #pod # 2) Manually construct and execute the changes:
65             #pod
66             #pod my $batch_chunker = DBIx::BatchChunker->new(%params);
67             #pod
68             #pod $batch_chunker->calculate_ranges;
69             #pod $batch_chunker->execute;
70             #pod
71             #pod =head1 DESCRIPTION
72             #pod
73             #pod This utility class is for running a large batch of DB changes in a manner that doesn't
74             #pod cause huge locks, outages, and missed transactions. It's highly flexible to allow for
75             #pod many different kinds of change operations, and dynamically adjusts chunks to its
76             #pod workload.
77             #pod
78             #pod It works by splitting up DB operations into smaller chunks within a loop. These chunks
79             #pod are transactionalized, either naturally as single-operation bulk work or by the loop
80             #pod itself. The full range is calculated beforehand to get the right start/end points.
81             #pod A L<progress bar|/Progress Bar Attributes> will be created to let the deployer know the
82             #pod processing status.
83             #pod
84             #pod There are two ways to use this class: call the automatic constructor and executor
85             #pod (L</construct_and_execute>) or manually construct the object and call its methods. See
86             #pod L</SYNOPSIS> for examples of both.
87             #pod
88             #pod B<DISCLAIMER:> You should not rely on this class to magically fix any and all locking
89             #pod problems the DB might experience just because it's being used. Thorough testing and
90             #pod best practices are still required.
91             #pod
92             #pod =head2 Processing Modes
93             #pod
94             #pod This class has several different modes of operation, depending on what was passed to
95             #pod the constructor:
96             #pod
97             #pod =head3 DBIC Processing
98             #pod
99             #pod If both L</rs> and L</coderef> are passed, a chunk ResultSet is built from the base
100             #pod ResultSet, to add in a C<BETWEEN> clause, and the new ResultSet is passed into the
101             #pod coderef. The coderef should run some sort of active ResultSet operation from there.
102             #pod
103             #pod An L</id_name> should be provided, but if it is missing it will be looked up based on
104             #pod the primary key of the ResultSource.
105             #pod
106             #pod If L</single_rows> is also enabled, then each chunk is wrapped in a transaction and the
107             #pod coderef is called for each row in the chunk. In this case, the coderef is passed a
108             #pod Result object instead of the chunk ResultSet.
109             #pod
110             #pod Note that whether L</single_rows> is enabled or not, the coderef execution is encapsulated
111             #pod in DBIC's retry logic, so any failures will re-connect and retry the coderef. Because of
112             #pod this, any changes you make within the coderef should be idempotent, or should at least be
113             #pod able to skip over any already-processed rows.
114             #pod
115             #pod =head3 Active DBI Processing
116             #pod
117             #pod If an L</stmt> (DBI statement handle args) is passed without a L</coderef>, the statement
118             #pod handle is merely executed on each iteration with the start and end IDs. It is assumed
119             #pod that the SQL for the statement handle contains exactly two placeholders for a C<BETWEEN>
120             #pod clause. For example:
121             #pod
122             #pod my $update_stmt = q{
123             #pod UPDATE
124             #pod accounts a
125             #pod JOIN account_updates au USING (account_id)
126             #pod SET
127             #pod a.time_stamp = au.time_stamp
128             #pod WHERE
129             #pod a.account_id BETWEEN ? AND ? AND
130             #pod a.time_stamp != au.time_stamp
131             #pod });
132             #pod
133             #pod The C<BETWEEN> clause should, of course, match the IDs being used in the loop.
134             #pod
135             #pod The statement is run with L</dbi_connector> for retry protection. Therefore, the
136             #pod statement should also be idempotent.
137             #pod
138             #pod =head3 Query DBI Processing
139             #pod
140             #pod If both a L</stmt> and a L</coderef> are passed, the statement handle is prepared and
141             #pod executed. Like the L</Active DBI Processing> mode, the SQL for the statement should
142             #pod contain exactly two placeholders for a C<BETWEEN> clause. Then the C<$sth> is passed to
143             #pod the coderef. It's up to the coderef to extract data from the executed statement handle,
144             #pod and do something with it.
145             #pod
146             #pod If C<single_rows> is enabled, each chunk is wrapped in a transaction and the coderef is
147             #pod called for each row in the chunk. In this case, the coderef is passed a hashref of the
148             #pod row instead of the executed C<$sth>, with lowercase alias names used as keys.
149             #pod
150             #pod Note that in both cases, the coderef execution is encapsulated in a L<DBIx::Connector::Retry>
151             #pod call to either C<run> or C<txn> (using L</dbi_connector>), so any failures will
152             #pod re-connect and retry the coderef. Because of this, any changes you make within the
153             #pod coderef should be idempotent, or should at least be able to skip over any
154             #pod already-processed rows.
155             #pod
156             #pod =head3 DIY Processing
157             #pod
158             #pod If a L</coderef> is passed but neither a C<stmt> nor a C<rs> are passed, then the
159             #pod multiplier loop does not touch the database. The coderef is merely passed the start and
160             #pod end IDs for each chunk. It is expected that the coderef will run through all database
161             #pod operations using those start and end points.
162             #pod
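163             #pod It's still valid to include L</min_stmt>, L</max_stmt>, and/or L</count_stmt> in the
164             #pod constructor to enable features like L<max ID recalculation|/process_past_max> or
165             #pod L<chunk resizing|/min_chunk_percent>.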
166             #pod
167             #pod If you're not going to include any min/max statements for L</calculate_ranges>, you will
168             #pod need to set L</min_id> and L</max_id> yourself, either in the constructor or before the
169             #pod L</execute> call. Using L</construct_and_execute> is also not an option in this case, as
170             #pod this tries to call L</calculate_ranges> without a way to do so.
171             #pod
172             #pod =head3 TL;DR Version
173             #pod
174             #pod $stmt = Active DBI Processing
175             #pod $stmt + $coderef = Query DBI Processing | $bc->$coderef($executed_sth)
176             #pod $stmt + $coderef + single_rows=>1 = Query DBI Processing | $bc->$coderef($row_hashref)
177             #pod $rs + $coderef = DBIC Processing | $bc->$coderef($chunk_rs)
178             #pod $rs + $coderef + single_rows=>1 = DBIC Processing | $bc->$coderef($result)
179             #pod $coderef = DIY Processing | $bc->$coderef($start, $end)
180             #pod
181             #pod =head1 ATTRIBUTES
182             #pod
183             #pod See the L</METHODS> section for more in-depth descriptions of these attributes and their
184             #pod usage.
185             #pod
186             #pod =head2 DBIC Processing Attributes
187             #pod
188             #pod =head3 rs
189             #pod
190             #pod A L<DBIx::Class::ResultSet>. This is used by all methods as the base ResultSet onto which
191             #pod the DB changes will be applied. Required for DBIC processing.
192             #pod
193             #pod =cut
194              
195             has rs => (
196             is => 'ro',
197             isa => InstanceOf['DBIx::Class::ResultSet'],
198             required => 0,
199             );
200              
201             #pod =head3 rsc
202             #pod
203             #pod A L<DBIx::Class::ResultSetColumn>. This is only used to override L</rs> for min/max
204             #pod calculations. Optional.
205             #pod
206             #pod =cut
207              
208             has rsc => (
209             is => 'ro',
210             isa => InstanceOf['DBIx::Class::ResultSetColumn'],
211             required => 0,
212             );
213              
214             #pod =head3 dbic_retry_opts
215             #pod
216             #pod A hashref of DBIC retry options. These options control how retry protection works within
217             #pod DBIC. So far, there are two supported options:
218             #pod
219             #pod max_attempts = Number of times to retry
220             #pod retry_handler = Coderef that returns true to continue to retry or false to re-throw
221             #pod the last exception
222             #pod
223             #pod The default is to let the DBIC storage engine handle its own protection, which will retry
224             #pod once if the DB connection was disconnected. If you specify any options, even a blank
225             #pod hashref, BatchChunker will fill in a default C<max_attempts> of 10, and an always-true
226             #pod C<retry_handler>. This is similar to L<DBIx::Connector::Retry>'s defaults.
227             #pod
228             #pod Under the hood, these are options that are passed to the as-yet-undocumented
229             #pod L<DBIx::Class::Storage::BlockRunner>. The C<retry_handler> has access to the same
230             #pod BlockRunner object (passed as its only argument) and its methods/accessors, such as C<storage>,
231             #pod C<failed_attempt_count>, and C<last_exception>.
232             #pod
233             #pod =cut
234              
235             has dbic_retry_opts => (
236             is => 'ro',
237             isa => HashRef,
238             required => 0,
239             predicate => '_has_dbic_retry_opts',
240             );
241              
242             sub _dbic_block_runner {
243 686     686   1724 my ($self, $method, $coderef) = @_;
244              
245 686         1693 my $storage = $self->dbic_storage;
246              
247             # Block running disabled
248 686 100       2384 unless ($self->_has_dbic_retry_opts) {
249 521 100       1630 return $storage->txn_do($coderef) if $method eq 'txn';
250 445         1972 return $storage->dbh_do($coderef);
251             }
252              
253             # A very light wrapper around BlockRunner. No need to load BlockRunner, since DBIC
254             # loads it in before us if we're using this method.
255             DBIx::Class::Storage::BlockRunner->new(
256             # in case they are not defined with a custom dbic_retry_opts
257             max_attempts => 10,
258 102     102   57831 retry_handler => sub { 1 },
259              
260             # never overrides the important ones below
261 165 100       510 %{ $self->dbic_retry_opts },
  165         3427  
262              
263             storage => $storage,
264             wrap_txn => ($method eq 'txn' ? 1 : 0),
265             )->run($coderef);
266             }
267              
268             #pod =head2 DBI Processing Attributes
269             #pod
270             #pod =head3 dbi_connector
271             #pod
272             #pod A L<DBIx::Connector::Retry> object. Instead of L<DBI> statement handles, this is the
273             #pod recommended way for BatchChunker to interface with the DBI, as it handles retries on
274             #pod failures. The connection mode used is whatever default is set within the object.
275             #pod
276             #pod Required for DBI Processing, unless L</dbic_storage> is specified.
277             #pod
278             #pod =cut
279              
280             has dbi_connector => (
281             is => 'ro',
282             isa => InstanceOf['DBIx::Connector::Retry'],
283             required => 0,
284             );
285              
286             #pod =head3 dbic_storage
287             #pod
288             #pod A DBIC storage object, as an alternative for L</dbi_connector>. There may be times when
289             #pod you want to run plain DBI statements, but are still using DBIC. In these cases, you
290             #pod don't have to create a L<DBIx::Connector::Retry> object to run those statements.
291             #pod
292             #pod This uses a BlockRunner object for retry protection, so the options in
293             #pod L</dbic_retry_opts> would apply here.
294             #pod
295             #pod Required for DBI Processing, unless L</dbi_connector> is specified.
296             #pod
297             #pod =cut
298              
299             has dbic_storage => (
300             is => 'ro',
301             isa => InstanceOf['DBIx::Class::Storage::DBI'],
302             required => 0,
303             );
304              
305             #pod =head3 min_stmt
306             #pod
307             #pod =head3 max_stmt
308             #pod
309             #pod SQL statement strings or an arrayref of parameters for L<DBI/selectrow_array>.
310             #pod
311             #pod When executed, these statements should each return a single value, either the minimum or
312             #pod maximum ID that will be affected by the DB changes. These are used by
313             #pod L</calculate_ranges>. Required if using either type of DBI Processing.
314             #pod
315             #pod =cut
316              
317             my $SQLStringOrSTHArgs_type = Type::Utils::declare(
318             name => 'SQLStringOrSTHArgs',
319             # Allow an SQL string, an optional hashref/undef, and any number of strings/undefs
320             parent => Tuple->parameterize(Str, Optional[Maybe[HashRef]], slurpy ArrayRef[Maybe[Str]]),
321             coercion => sub { $_ = [ $_ ] if Str->check($_); $_ },
322             message => sub { 'Must be either an SQL string or an arrayref of parameters for $sth creation (SQL + hashref/undef + binds)' },
323             );
324              
325             has min_stmt => (
326             is => 'ro',
327             isa => $SQLStringOrSTHArgs_type,
328             required => 0,
329             coerce => 1,
330             );
331              
332             has max_stmt => (
333             is => 'ro',
334             isa => $SQLStringOrSTHArgs_type,
335             required => 0,
336             coerce => 1,
337             );
338              
339             #pod =head3 stmt
340             #pod
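341             #pod A SQL statement string or an arrayref of parameters for L<DBI/prepare> + binds.
342             #pod
343             #pod If using L</Active DBI Processing> (no coderef), this is a L<do-able|DBI/do> statement
344             #pod (usually DML like C<INSERT/UPDATE/DELETE>). If using L</Query DBI Processing> (with
345             #pod coderef), this is a passive DQL (C<SELECT>) statement.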
346             #pod
347             #pod In either case, the statement should contain C<BETWEEN> placeholders, which will be
348             #pod executed with the start/end ID points. If there are already bind placeholders in the
349             #pod arrayref, then make sure the C<BETWEEN> bind points are last on the list.
350             #pod
351             #pod Required for DBI Processing.
352             #pod
353             #pod =cut
354              
355             has stmt => (
356             is => 'ro',
357             isa => $SQLStringOrSTHArgs_type,
358             required => 0,
359             coerce => 1,
360             );
361              
362             #pod =head3 count_stmt
363             #pod
364             #pod A C<SELECT COUNT> SQL statement string or an arrayref of parameters for
365             #pod L<DBI/selectrow_array>.
366             #pod
367             #pod Like L</stmt>, it should contain C<BETWEEN> placeholders. In fact, the SQL should look
368             #pod exactly like the L</stmt> query, except with C<COUNT(*)> instead of the column list.
369             #pod
370             #pod Used only for L</Query DBI Processing>. Optional, but recommended for
371             #pod L<chunk resizing|/min_chunk_percent>.
372             #pod
373             #pod =cut
374              
375             has count_stmt => (
376             is => 'ro',
377             isa => $SQLStringOrSTHArgs_type,
378             required => 0,
379             coerce => 1,
380             );
381              
382             #pod =head2 Progress Bar Attributes
383             #pod
384             #pod =head3 progress_bar
385             #pod
386             #pod The progress bar used for all methods. This can be specified right before the method
387             #pod call to override the default used for that method. Unlike most attributes, this one
388             #pod is read-write, so it can be switched on-the-fly.
389             #pod
390             #pod Don't forget to remove or switch to a different progress bar if you want to use a
391             #pod different one for another method:
392             #pod
393             #pod $batch_chunker->progress_bar( $calc_pb );
394             #pod $batch_chunker->calculate_ranges;
395             #pod $batch_chunker->progress_bar( $loop_pb );
396             #pod $batch_chunker->execute;
397             #pod
398             #pod All of this is optional. If the progress bar isn't specified, the method will create
399             #pod a default one. If the terminal isn't interactive, the default L<Term::ProgressBar> will
400             #pod be set to C<silent> to naturally skip the output.
401             #pod
402             #pod =cut
403              
404             has progress_bar => (
405             is => 'rw',
406             isa => InstanceOf['Term::ProgressBar'],
407             );
408              
409             #pod =head3 progress_name
410             #pod
411             #pod A string used by L</execute> to assist in creating a progress bar. Ignored if
412             #pod L</progress_bar> is already specified.
413             #pod
414             #pod This is the preferred way of customizing the progress bar without having to create one
415             #pod from scratch.
416             #pod
417             #pod =cut
418              
419             has progress_name => (
420             is => 'rw',
421             isa => Str,
422             required => 0,
423             lazy => 1,
424             default => sub {
425             my $rs = shift->rs;
426             'Processing'.(defined $rs ? ' '.$rs->result_source->name : '');
427             },
428             );
429              
430             #pod =head3 cldr
431             #pod
432             #pod A L<CLDR::Number> object. English speakers that use a typical C<1,234.56> format would
433             #pod probably want to leave it at the default. Otherwise, you should provide your own.
434             #pod
435             #pod =cut
436              
437             has cldr => (
438             is => 'rw',
439             isa => InstanceOf['CLDR::Number'],
440             required => 0,
441             lazy => 1,
442             default => sub { CLDR::Number->new(locale => 'en') },
443             );
444              
445             #pod =head3 debug
446             #pod
447             #pod Boolean. If turned on, displays timing stats on each chunk, as well as total numbers.
448             #pod
449             #pod =cut
450              
451             has debug => (
452             is => 'rw',
453             isa => Bool,
454             required => 0,
455             default => 0,
456             );
457              
458             #pod =head2 Common Attributes
459             #pod
460             #pod =head3 id_name
461             #pod
462             #pod The column name used as the iterator in the processing loops. This should be a primary
463             #pod key or integer-based (indexed) key, tied to the L</rs>.
464             #pod
465             #pod Optional. Used mainly in DBIC processing. If not specified, it will look up
466             #pod the first primary key column from L</rs> and use that.
467             #pod
468             #pod This can still be specified for other processing modes to use in progress bars.
469             #pod
470             #pod =cut
471              
472             has id_name => (
473             is => 'rw',
474             isa => Str,
475             required => 0,
476             trigger => \&_fix_id_name,
477             );
478              
479             sub _fix_id_name {
480 62     62   5889 my ($self, $id_name) = @_;
481 62 100 100     681 return if !$id_name || $id_name =~ /\./ || !defined $self->rs; # prevent an infinite trigger loop
      100        
482 12         48 $self->id_name( $self->rs->current_source_alias.".$id_name" );
483             }
484              
485             #pod =head3 coderef
486             #pod
487             #pod The coderef that will be called either on each chunk or each row, depending on how
488             #pod L</single_rows> is set. The first input is always the BatchChunker object. The rest
489             #pod vary depending on the processing mode:
490             #pod
491             #pod $stmt + $coderef = Query DBI Processing | $bc->$coderef($executed_sth)
492             #pod $stmt + $coderef + single_rows=>1 = Query DBI Processing | $bc->$coderef($row_hashref)
493             #pod $rs + $coderef = DBIC Processing | $bc->$coderef($chunk_rs)
494             #pod $rs + $coderef + single_rows=>1 = DBIC Processing | $bc->$coderef($result)
495             #pod $coderef = DIY Processing | $bc->$coderef($start, $end)
496             #pod
497             #pod The loop does not monitor the return values from the coderef.
498             #pod
499             #pod Required for all processing modes except L</Active DBI Processing>.
500             #pod
501             #pod =cut
502              
503             has coderef => (
504             is => 'ro',
505             isa => CodeRef,
506             required => 0,
507             );
508              
509             #pod =head3 chunk_size
510             #pod
511             #pod The amount of rows to be processed in each loop.
512             #pod
513             #pod Default is 1000 rows. This figure should be sized to keep per-chunk processing time
514             #pod at around 5 seconds. If this is too large, rows may lock for too long. If it's too
515             #pod small, processing may be unnecessarily slow.
516             #pod
517             #pod =cut
518              
519             has chunk_size => (
520             is => 'rw',
521             isa => PositiveInt,
522             required => 0,
523             default => 1000,
524             );
525              
526             #pod =head3 target_time
527             #pod
528             #pod The target runtime (in seconds) that chunk processing should strive to achieve, not
529             #pod including L</sleep>. If the chunk processing times are too high or too low, this will
530             #pod dynamically adjust L</chunk_size> to try to match the target.
531             #pod
532             #pod B<Turning this on does not mean you should ignore C<chunk_size>!> If the starting chunk
533             #pod size is grossly inaccurate to the workload, you could end up with several chunks in the
534             #pod beginning causing long-lasting locks before the runtime targeting reduces them down to a
535             #pod reasonable size.
536             #pod
537             #pod Default is 5 seconds. Set this to zero to turn off runtime targeting. (This was
538             #pod previously defaulted to off prior to v0.92, and set to 15 in v0.92.)
539             #pod
540             #pod =cut
541              
542             has target_time => (
543             is => 'ro',
544             isa => PositiveOrZeroNum,
545             required => 0,
546             default => 5,
547             );
548              
549             #pod =head3 sleep
550             #pod
551             #pod The number of seconds to sleep after each chunk. It uses L<Time::HiRes>'s version, so
552             #pod fractional numbers are allowed.
553             #pod
554             #pod Default is 0, which is fine for most operations. But, it is highly recommended to turn
555             #pod this on (say, 1 to 5 seconds) for really long one-off DB operations, especially if a lot
556             #pod of disk I/O is involved. Without this, there's a chance that the slaves will have a hard
557             #pod time keeping up, and/or the master won't have enough processing power to keep up with
558             #pod standard load.
559             #pod
560             #pod This will increase the overall processing time of the loop, so try to find a balance
561             #pod between the two.
562             #pod
563             #pod =cut
564              
565             has 'sleep' => (
566             is => 'ro',
567             isa => PositiveOrZeroNum,
568             required => 0,
569             default => 0,
570             );
571              
572             #pod =head3 process_past_max
573             #pod
574             #pod Boolean that controls whether to check past the L</max_id> during the loop. If the loop
575             #pod hits the end point, it will run another maximum ID check in the DB, and adjust C<max_id>
576             #pod accordingly. If it somehow cannot run a DB check (no L</rs> or L</max_stmt> available,
577             #pod for example), the last chunk will just be one at the end of C<< max_id + chunk_size >>.
578             #pod
579             #pod This is useful if the entire table is expected to be processed, and you don't want to
580             #pod miss any new rows that come up between L</calculate_ranges> and the end of the loop.
581             #pod
582             #pod Turned off by default.
583             #pod
584             #pod =cut
585              
586             has process_past_max => (
587             is => 'ro',
588             isa => Bool,
589             required => 0,
590             default => 0,
591             );
592              
593             #pod =head3 single_rows
594             #pod
595             #pod Boolean that controls whether single rows are passed to the L</coderef> or the chunk's
596             #pod ResultSets/statement handle is passed.
597             #pod
598             #pod Since running single-row operations in a DB is painfully slow (compared to bulk
599             #pod operations), this also controls whether the entire set of coderefs are encapsulated into
600             #pod a DB transaction. Transactionalizing the entire chunk brings the speed, and atomicity,
601             #pod back to what a bulk operation would be. (Bulk operations are still faster, but you can't
602             #pod do anything you want in a single DML statement.)
603             #pod
604             #pod Used only by L</DBIC Processing> and L</Query DBI Processing>.
605             #pod
606             #pod =cut
607              
608             has single_rows => (
609             is => 'ro',
610             isa => Bool,
611             required => 0,
612             default => 0,
613             );
614              
615             #pod =head3 min_chunk_percent
616             #pod
617             #pod The minimum row count, as a percentage of L</chunk_size>. This value is actually
618             #pod expressed in decimal form, i.e.: between 0 and 1.
619             #pod
620             #pod This value will be used to determine when to process, skip, or expand a block, based on
621             #pod a count query. The default is C<0.5> or 50%, which means that it will try to expand the
622             #pod block to a larger size if the row count is less than 50% of the chunk size. Zero-sized
623             #pod blocks will be skipped entirely.
624             #pod
625             #pod This "chunk resizing" is useful for large regions of the table that have been deleted, or
626             #pod when the incrementing ID has large gaps in it for other reasons. Wasting time on
627             #pod numerical gaps that span millions can slow down the processing considerably, especially
628             #pod if L</sleep> is enabled.
629             #pod
630             #pod If this needs to be disabled, set this to 0. The maximum chunk percentage does not have
631             #pod a setting and is hard-coded at C<< 100% + min_chunk_percent >>.
632             #pod
633             #pod If DBIC processing isn't used, L</count_stmt> is also required to enable chunk resizing.
634             #pod
635             #pod =cut
636              
637             has min_chunk_percent => (
638             is => 'ro',
639             isa => Type::Utils::declare(
640             name => 'PositiveZeroToOneNum',
641             parent => NumRange->parameterize(0, 1),
642             message => sub { 'Must be a number between 0 and 1' },
643             ),
644             required => 0,
645             default => 0.5,
646             );
647              
648             #pod =head3 min_id
649             #pod
650             #pod =head3 max_id
651             #pod
652             #pod Used by L</execute> to figure out the main start and end points. Calculated by
653             #pod L</calculate_ranges>.
654             #pod
655             #pod Manually setting this is not recommended, as each database is different and the
656             #pod information may have changed between the DB change development and deployment. Instead,
657             #pod use L</calculate_ranges> to fill in these values right before running the loop.
658             #pod
659             #pod =cut
660              
# Both range endpoints share one definition: read/write, validated as UnsignedInt.
has [qw< min_id max_id >] => (
    is  => 'rw',
    isa => UnsignedInt,
);
670              
# Big number handling: setting this flag runs _upgrade_attrs_to_bigint via the trigger.
has _use_bignums => (
    is      => 'rw',
    isa     => Bool,
    trigger => \&_upgrade_attrs_to_bigint,
    default => 0,
);

# Numeric attributes on this object (BC) and on the loop state (LS) that are inspected
# by _check_bignums and converted by _upgrade_attrs_to_bigint.
my @BIGNUM_BC_ATTRS = qw( chunk_size min_id max_id );
my @BIGNUM_LS_ATTRS = qw(
    start end prev_end
    multiplier_range multiplier_step
    chunk_size chunk_count
);
681              
# Decides whether big-number mode is needed and switches it on if so.
#
# Any extra arguments are inspected together with the numeric attributes of this object
# and, when a loop is in progress, of the loop state. A defined value counts as "big"
# when it is already an object or fails the PerlSafeInt check.
#
# Returns 1 right away if big-number mode was already on; otherwise returns 1 or 0
# depending on whether this scan found a big value.
sub _check_bignums {
    my $self = shift;

    # Already checked these
    return 1 if $self->_use_bignums;

    # Gather everything worth looking at: caller-supplied values first, then our own
    # attributes, then the loop state's. Every accessor is read; nothing short-circuits.
    my @candidates = @_;
    push @candidates, map { my $attr = $_; scalar $self->$attr() } @BIGNUM_BC_ATTRS;

    my $ls = $self->loop_state;
    if ($ls) {
        push @candidates, map { my $attr = $_; scalar $ls->$attr() } @BIGNUM_LS_ATTRS;
    }

    my $set_bignums = 0;
    for my $num (@candidates) {
        next unless defined $num;
        $set_bignums = 1 if blessed($num) || !PerlSafeInt->check($num);
    }

    # Auto-set _use_bignums if we detected that we need it
    $self->_use_bignums(1) if $set_bignums;
    return $set_bignums;
}
714              
# Trigger for _use_bignums. When the flag is turned on, every defined, not-yet-blessed
# numeric attribute on this object, and then on the loop state (if one exists), is
# replaced with a Math::BigInt copy of its value. Does nothing when the flag is off.
sub _upgrade_attrs_to_bigint {
    my ($self, $is_on) = @_;
    return unless $is_on;

    my $promote = sub {
        my ($obj, @attr_names) = @_;
        for my $name (@attr_names) {
            my $current = $obj->$name();

            # Nothing to upgrade, or already upgraded
            next if !defined $current || blessed($current);

            $obj->$name( Math::BigInt->new($current) );
        }
    };

    # Fix BatchChunker attributes
    $promote->($self, @BIGNUM_BC_ATTRS);

    # Fix LoopState attributes (looked up only after our own attributes are done)
    my $ls = $self->loop_state;
    return unless $ls;
    $promote->($ls, @BIGNUM_LS_ATTRS);

    return;
}
737              
738             #pod =head3 loop_state
739             #pod
740             #pod A L<DBIx::BatchChunker::LoopState> object designed to hold variables during the
741             #pod processing loop. The object will be cleared out after use. Most of the complexity is
742             #pod needed for chunk resizing.
743             #pod
744             #pod =cut
745              
# Holds the per-run DBIx::BatchChunker::LoopState object. It cannot be passed to the
# constructor (init_arg is undef) and is removed again with clear_loop_state.
has loop_state => (
    is       => 'rw',
    isa      => InstanceOf['DBIx::BatchChunker::LoopState'],
    init_arg => undef,
    required => 0,
    clearer  => 'clear_loop_state',
);

# Backwards-compatibility: keep the old private accessor name as an alias
*_loop_state = \&loop_state;
756              
# Normalizes constructor arguments before Moo sees them:
#   * derives id_name / rsc / dbic_storage from whichever of rsc and rs was given
#   * converts legacy sth/*_sth arguments into stmt/*_stmt (plus a stand-in connector)
#   * dies early when the argument set cannot support range calculation or execution
around BUILDARGS => sub {
    my ($orig, $class, @raw) = @_;

    # Accept either a single hashref or a flat key/value list
    my %args = @raw == 1 ? %{ $raw[0] } : @raw;

    # Auto-building of rsc and id_name can be a weird dependency dance, so it's better to
    # handle it here.
    my ($rsc, $rs, $id_name) = @args{qw< rsc rs id_name >};
    if (defined $rsc) {
        $args{id_name} = $rsc->{_as} unless $id_name;
    }
    elsif (defined $rs) {
        # No rsc yet: pick the first primary column if no name was given, then build it
        $args{id_name} = ($rs->result_source->primary_columns)[0] unless $id_name;
        $args{rsc}     = $rs->get_column( $args{id_name} );
    }
    $rsc = $args{rsc};

    # Auto-add dbic_storage, if available
    if (!defined $args{dbic_storage}) {
        if (defined $rs) {
            $args{dbic_storage} = $rs->result_source->storage;
        }
        elsif (defined $rsc) {
            $args{dbic_storage} = $rsc->_resultset->result_source->storage;
        }
    }

    # Legacy name => replacement name (sth => stmt, min_sth => min_stmt, ...)
    my @old_attrs = qw< sth min_sth max_sth count_sth >;
    my %new_name_for;
    for my $old_attr (@old_attrs) {
        my $new_attr = $old_attr;
        $new_attr =~ s/sth$/stmt/;
        $new_name_for{$old_attr} = $new_attr;
    }
    my @new_attrs = @new_name_for{@old_attrs};

    # Find something to use as a dbi_connector, if it doesn't already exist
    my $example_key = first { $args{$_} } @old_attrs;
    if ($example_key && !defined $args{dbi_connector}) {
        warn join "\n",
            'The sth/*_sth options are now considered legacy usage in DBIx::BatchChunker. Because there is no',
            'way to re-acquire the password, any attempt to reconnect will fail. Please use dbi_connector and',
            'stmt/*_stmt instead for reconnection support.',
            ''
        ;

        # NOTE: There was a way to monkey-patch _connect to use $dbh->clone, but I've considered it
        # too intrusive of a solution to use. Better to demand that the user switch to the new
        # attributes, but have something that still works in most cases.

        # Attempt to build some sort of Connector object
        require DBIx::Connector::Retry;
        my $dbh = $args{$example_key}->{Database};
        my $dsn = join ':', 'dbi', $dbh->{Driver}{Name}, $dbh->{Name};

        my $conn = DBIx::Connector::Retry->new(
            connect_info => [
                $dsn,
                $dbh->{Username},
                '',  # XXX: Can't acquire the password
                # Sane %attr defaults on the off-chance that it actually re-connects
                { AutoCommit => 1, RaiseError => 1 },
            ],

            # Do not disconnect on DESTROY. The $dbh might still be used post-run.
            disconnect_on_destroy => 0,
        );

        # Pretend $conn->_connect was called and store our pre-existing $dbh
        $conn->{_pid} = $$;
        $conn->{_tid} = threads->tid if $INC{'threads.pm'};
        $conn->{_dbh} = $dbh;
        $conn->driver;

        $args{dbi_connector} = $conn;
    }

    # Handle legacy options for sth/*_sth: an explicit new-style argument wins
    for my $old_attr (grep { $args{$_} } @old_attrs) {
        my $sth = delete $args{$old_attr};
        $args{ $new_name_for{$old_attr} } ||= [ $sth->{Statement} ];
    }

    # Now check to make sure dbi_connector is available for DBI processing
    my $has_connection = defined $args{dbi_connector} || defined $args{dbic_storage};
    if (!$has_connection && defined( first { $args{$_} } @new_attrs )) {
        die 'DBI processing requires a dbi_connector or dbic_storage attribute!';
    }

    # Other sanity checks
    my $has_min_max = defined $args{min_stmt} && defined $args{max_stmt};
    my $is_diy_mode = !$has_connection && defined $args{coderef};  # DIY mode is exempt
    unless (defined $args{rsc} || $has_min_max || $is_diy_mode) {
        die 'Range calculations require one of these attr sets: rsc, rs, or dbi_connector|dbic_storage + min_stmt + max_stmt';
    }

    # "rs + coderef" is just a special case of "coderef", so two terms cover all sets
    unless ($args{stmt} || $args{coderef}) {
        die 'Block execution requires one of these attr sets: dbi_connector|dbic_storage + stmt, rs + coderef, or coderef';
    }

    return $class->$orig(%args);
};
855              
# Post-construction hook: normalizes id_name through _fix_id_name, then runs the
# big-number detection on the freshly built attributes.
sub BUILD {
    my ($self) = @_;

    # Make sure id_name gets fixed at the right time
    my $id_name = $self->id_name;
    $self->_fix_id_name($id_name);

    $self->_check_bignums;
}
862              
863             #pod =head1 CONSTRUCTORS
864             #pod
865             #pod See L</ATTRIBUTES> for information on what can be passed into these constructors.
866             #pod
867             #pod =head2 new
868             #pod
869             #pod my $batch_chunker = DBIx::BatchChunker->new(...);
870             #pod
871             #pod A standard object constructor. If you use this constructor, you will need to
872             #pod manually call L</calculate_ranges> and L</execute> to execute the DB changes.
873             #pod
874             #pod =head2 construct_and_execute
875             #pod
876             #pod my $batch_chunker = DBIx::BatchChunker->construct_and_execute(...);
877             #pod
878             #pod Constructs a DBIx::BatchChunker object and automatically calls
879             #pod L</calculate_ranges> and L</execute> on it. Anything passed to this method will be passed
880             #pod through to the constructor.
881             #pod
882             #pod Returns the constructed object, post-execution. This is typically only useful if you want
883             #pod to inspect the attributes after the process has finished. Otherwise, it's safe to just
884             #pod ignore the return and throw away the object immediately.
885             #pod
886             #pod =cut
887              
# Builds a new object from the given constructor arguments, calculates the ID range,
# runs the processing loop, and hands back the finished object.
sub construct_and_execute {
    my ($class, @ctor_args) = @_;

    my $chunker = $class->new(@ctor_args);
    $chunker->calculate_ranges;
    $chunker->execute;

    return $chunker;
}
897              
898             #pod =head1 METHODS
899             #pod
900             #pod =head2 calculate_ranges
901             #pod
902             #pod my $batch_chunker = DBIx::BatchChunker->new(
903             #pod rsc => $account_rsc, # a ResultSetColumn
904             #pod ### OR ###
905             #pod rs => $account_rs, # a ResultSet
906             #pod id_name => 'account_id', # can be looked up if not provided
907             #pod ### OR ###
908             #pod dbi_connector => $conn, # DBIx::Connector::Retry object
909             #pod min_stmt => $min_stmt, # a SQL statement or DBI $sth args
910             #pod max_stmt => $max_stmt, # ditto
911             #pod
912             #pod ### Optional but recommended ###
913             #pod id_name => 'account_id', # will also be added into the progress bar title
914             #pod chunk_size => 20_000, # default is 1000
915             #pod
916             #pod ### Optional ###
917             #pod progress_bar => $progress, # defaults to a 2-count 'Calculating ranges' bar
918             #pod
919             #pod # ...other attributes for execute...
920             #pod );
921             #pod
922             #pod my $has_data_to_process = $batch_chunker->calculate_ranges;
923             #pod
924             #pod Given a L<DBIx::Class::ResultSetColumn>, L<DBIx::Class::ResultSet>, or L<DBI> statement
925             #pod argument set, this method calculates the min/max IDs of those objects. It fills in the
926             #pod L</min_id> and L</max_id> attributes, based on the ID data, and then returns 1.
927             #pod
928             #pod If either of the min/max statements don't return any ID data, this method will return 0.
929             #pod
930             #pod =cut
931              
# Looks up the lowest and highest ID and stores them in min_id / max_id.
#
# The lookup goes through the ResultSetColumn when rsc is set, otherwise through
# min_stmt / max_stmt on either the DBIC storage handle or the dbi_connector handle.
# Returns 0 (leaving min_id / max_id untouched) when either lookup yields undef, and 1
# once both attributes have been set.
sub calculate_ranges {
    my ($self) = @_;

    # Drop any "table." qualifier; the bare column name is only used for the bar title
    my $column_name = $self->id_name || '';
    $column_name =~ s/^\w+\.//;

    my $progress = $self->progress_bar;
    unless ($progress) {
        my $title = 'Calculating ranges';
        $title .= " for $column_name" if $column_name;

        $progress = Term::ProgressBar->new({
            name   => $title,
            count  => 2,
            ETA    => 'linear',
            silent => !(-t *STDERR && -t *STDIN),  # STDERR is what {fh} is set to use
        });
    }

    my ($min_id, $max_id);

    # Shared by both raw-DBI paths; they only differ in where the $dbh comes from
    my $query_bounds = sub {
        my ($dbh) = @_;

        # In case the sub is retried
        $progress->update(0);

        ($min_id) = $dbh->selectrow_array(@{ $self->min_stmt });
        $progress->update(1);

        ($max_id) = $dbh->selectrow_array(@{ $self->max_stmt });
        $progress->update(2);
    };

    # Actually run the statements
    if ($self->rsc) {
        $self->_dbic_block_runner( run => sub {
            # In case the sub is retried
            $progress->update(0);

            $min_id = $self->rsc->min;
            $progress->update(1);

            $max_id = $self->rsc->max;
            $progress->update(2);
        });
    }
    elsif ($self->dbic_storage) {
        $self->_dbic_block_runner( run => sub {
            $query_bounds->( $self->dbic_storage->dbh );
        });
    }
    else {
        $self->dbi_connector->run(sub {
            $query_bounds->($_);
        });
    }

    # No ID data, no ranges
    return 0 unless defined $min_id && defined $max_id;

    # This would be the primary spot where we notice we need to upgrade, so check the values before
    # we attempt to mangle them.
    if ($self->_check_bignums($min_id, $max_id)) {
        ($min_id, $max_id) = map { Math::BigFloat->new($_)->as_int } ($min_id, $max_id);
    }
    else {
        ($min_id, $max_id) = map { int($_) } ($min_id, $max_id);
    }

    $self->min_id($min_id);
    $self->max_id($max_id);

    return 1;
}
1007              
1008             #pod =head2 execute
1009             #pod
1010             #pod my $batch_chunker = DBIx::BatchChunker->new(
1011             #pod # ...other attributes for calculate_ranges...
1012             #pod
1013             #pod dbi_connector => $conn, # DBIx::Connector::Retry object
1014             #pod stmt => $do_stmt, # INSERT/UPDATE/DELETE $stmt with BETWEEN placeholders
1015             #pod ### OR ###
1016             #pod dbi_connector => $conn, # DBIx::Connector::Retry object
1017             #pod stmt => $select_stmt, # SELECT $stmt with BETWEEN placeholders
1018             #pod count_stmt => $count_stmt, # SELECT COUNT $stmt to be used for min_chunk_percent; optional
1019             #pod coderef => $coderef, # called code that does the actual work
1020             #pod ### OR ###
1021             #pod rs => $account_rs, # base ResultSet, which gets filtered with -between later on
1022             #pod id_name => 'account_id', # can be looked up if not provided
1023             #pod coderef => $coderef, # called code that does the actual work
1024             #pod ### OR ###
1025             #pod coderef => $coderef, # DIY database work; just pass the $start/$end IDs
1026             #pod
1027             #pod ### Optional but recommended ###
1028             #pod sleep => 0.25, # number of seconds to sleep each chunk; defaults to 0
1029             #pod process_past_max => 1, # use this if processing the whole table
1030             #pod single_rows => 1, # does $coderef get a single $row or the whole $chunk_rs / $stmt
1031             #pod min_chunk_percent => 0.25, # minimum row count of chunk size percentage; defaults to 0.5 (or 50%)
1032             #pod target_time => 5, # target runtime for dynamic chunk size scaling; default is 5 seconds
1033             #pod
1034             #pod progress_name => 'Updating Accounts', # easier than creating your own progress_bar
1035             #pod
1036             #pod ### Optional ###
1037             #pod progress_bar => $progress, # defaults to "Processing $source_name" bar
1038             #pod debug => 1, # displays timing stats on each chunk
1039             #pod );
1040             #pod
1041             #pod $batch_chunker->execute if $batch_chunker->calculate_ranges;
1042             #pod
1043             #pod Applies the configured DB changes in chunks. Runs through the loop, processing a
1044             #pod statement handle, ResultSet, and/or coderef as it goes. Each loop iteration processes a
1045             #pod chunk of work, determined by L</chunk_size>.
1046             #pod
1047             #pod The L</calculate_ranges> method should be run first to fill in L</min_id> and L</max_id>.
1048             #pod If either of these are missing, the function will assume L</calculate_ranges> couldn't
1049             #pod find them and warn about it.
1050             #pod
1051             #pod More details can be found in the L and L sections.
1052             #pod
1053             #pod =cut
1054              
# Runs the main processing loop over the ID range from min_id to max_id.
#
# Each iteration sets the chunk boundaries in the loop state, lets
# _process_past_max_checker and _process_block decide whether the chunk is handled now,
# and then records the runtime, sleeps if configured, and advances the progress bar.
# When min_id/max_id are missing (or the range is empty), a message is printed and the
# method returns without doing any work.
sub execute {
    my ($self) = @_;
    $self->_check_bignums;

    # Total number of IDs in the range; stays undef if the range was never calculated
    my $count = (defined $self->min_id && defined $self->max_id)
        ? $self->max_id - $self->min_id + 1
        : undef;

    # Fire up the progress bar
    my $progress = $self->progress_bar;
    unless ($progress) {
        $progress = Term::ProgressBar->new({
            name   => $self->progress_name,
            count  => $count || 1,
            ETA    => 'linear',
            silent => !(-t *STDERR && -t *STDIN),  # STDERR is what {fh} is set to use
        });
    }

    if (!$count) {
        $progress->message('No chunks; nothing to process...');
        return;
    }

    if ($self->debug) {
        my $total_chunks = ceil($count / $self->chunk_size);
        my @formatted    = map { $self->cldr->decimal_formatter->format($_) } ($total_chunks, $count);
        $progress->message( sprintf "(%s total chunks; %s total rows)", @formatted );
    }

    # Loop state setup
    $self->clear_loop_state;
    my $ls = $self->loop_state(
        DBIx::BatchChunker::LoopState->new({
            batch_chunker => $self,
            progress_bar  => $progress,
        })
    );

    # Da loop
    while ($ls->prev_end < $self->max_id || $ls->start) {
        $ls->multiplier_range( $ls->multiplier_range + $ls->multiplier_step );

        # start could be already set because of early 'next' calls
        $ls->start( $ls->prev_end + 1 ) unless defined $ls->start;

        # ceil, because multiplier_* could be fractional
        my $span       = ceil($ls->multiplier_range * $ls->chunk_size);
        my $wanted_end = $ls->start + $span - 1;
        $ls->end( min($wanted_end, $self->max_id) );  # ensure we never exceed max_id

        $ls->chunk_count(undef);

        next unless $self->_process_past_max_checker;

        # The actual DB processing
        next unless $self->_process_block;

        # Record the time quickly
        $ls->prev_runtime(time - $ls->timer);

        # Give the DB a little bit of breathing room
        sleep $self->sleep if $self->sleep;

        $self->_print_debug_status('processed');
        $self->_increment_progress;
        $self->_runtime_checker;

        # End-of-loop activities (skipped by early next)
        $ls->_reset_chunk_state;
    }
    $self->clear_loop_state;

    # Keep the finished time from the progress bar, in case there are other loops or output
    unless ($progress->silent) {
        $progress->update( $progress->target );
        print "\n";
    }
}
1129              
1130             #pod =head1 PRIVATE METHODS
1131             #pod
1132             #pod =head2 _process_block
1133             #pod
1134             #pod Runs the DB work and passes it to the coderef. Its return value determines whether the
1135             #pod block should be processed or not.
1136             #pod
1137             #pod =cut
1138              
# Counts the rows in the current chunk (when a count source exists) and then runs the
# configured work for it.
#
# Reads the chunk boundaries (start/end) from the loop state and stores the row count
# in the loop state's chunk_count; the count stays undef when there is neither a
# count_stmt nor an rs to count with. _chunk_count_checker then decides whether the
# chunk is processed now.
#
# Returns 1 after the work has run, or an empty/false value when _chunk_count_checker
# rejected the chunk.
sub _process_block {
    my ($self) = @_;

    my $ls      = $self->loop_state;
    my $conn    = $self->dbi_connector;
    my $coderef = $self->coderef;
    my $rs      = $self->rs;

    # Figure out if the row count is worth the work
    my $chunk_rs;
    my $count_stmt = $self->count_stmt;
    my $chunk_count;
    if ($count_stmt && defined $self->dbic_storage) {
        $self->_dbic_block_runner( run => sub {
            $chunk_count = $self->dbic_storage->dbh->selectrow_array(
                @$count_stmt,
                # SQL-only statement set: pad the \%attr slot, so that the boundaries
                # land in the bind value positions
                (@$count_stmt == 1 ? undef : ()),
                $ls->start, $ls->end,
            );
        });
    }
    elsif ($count_stmt) {
        $chunk_count = $conn->run(sub {
            $_->selectrow_array(
                @$count_stmt,
                (@$count_stmt == 1 ? undef : ()),  # same \%attr padding as above
                $ls->start, $ls->end,
            );
        });
    }
    elsif (defined $rs) {
        $chunk_rs = $rs->search({
            $self->id_name => { -between => [$ls->start, $ls->end] },
        });

        $self->_dbic_block_runner( run => sub {
            $chunk_count = $chunk_rs->count;
        });
    }

    # Always run the bignum check, but only convert a real count. An undef count means
    # "no count available"; Math::BigInt->new(undef) would turn that into a defined
    # (zero) object, and _chunk_count_checker would then treat the chunk as having no
    # rows instead of treating chunk sizing as disabled.
    my $use_bignums = $self->_check_bignums($chunk_count);
    $chunk_count = Math::BigInt->new($chunk_count) if $use_bignums && defined $chunk_count;
    $ls->chunk_count($chunk_count);

    return unless $self->_chunk_count_checker;

    # NOTE: Try to minimize the amount of closures by using $self as much as possible
    # inside coderefs.

    # Do the work
    if (my $stmt = $self->stmt) {
        ### Statement handle

        # $stmt is laid out as (SQL, \%attr, @bind): the first two go to prepare, the
        # rest (plus the chunk boundaries) to execute
        my @prepare_args = @$stmt > 2 ? @$stmt[0..1] : @$stmt;
        my @execute_args = (
            (@$stmt > 2 ? @$stmt[2..$#$stmt] : ()),
            $ls->start, $ls->end,
        );

        if ($self->single_rows && $coderef) {
            # Transactional work: the coderef is called once per fetched row
            if ($self->dbic_storage) {
                $self->_dbic_block_runner( txn => sub {
                    $self->loop_state->_mark_timer;  # reset timer on retries

                    my $sth = $self->dbic_storage->dbh->prepare(@prepare_args);
                    $sth->execute(@execute_args);

                    while (my $row = $sth->fetchrow_hashref('NAME_lc')) { $self->coderef->($self, $row) }
                });
            }
            else {
                $conn->txn(sub {
                    $self->loop_state->_mark_timer;  # reset timer on retries

                    my $sth = $_->prepare(@prepare_args);
                    $sth->execute(@execute_args);

                    while (my $row = $sth->fetchrow_hashref('NAME_lc')) { $self->coderef->($self, $row) }
                });
            }
        }
        else {
            # Bulk work (or DML): the coderef, if any, gets the executed handle
            if ($self->dbic_storage) {
                $self->_dbic_block_runner( run => sub {
                    $self->loop_state->_mark_timer;  # reset timer on retries

                    my $sth = $self->dbic_storage->dbh->prepare(@prepare_args);
                    $sth->execute(@execute_args);

                    $self->coderef->($self, $sth) if $self->coderef;
                });
            }
            else {
                $conn->run(sub {
                    $self->loop_state->_mark_timer;  # reset timer on retries

                    my $sth = $_->prepare(@prepare_args);
                    $sth->execute(@execute_args);

                    $self->coderef->($self, $sth) if $self->coderef;
                });
            }
        }
    }
    elsif (defined $rs && $coderef) {
        ### ResultSet with coderef

        if ($self->single_rows) {
            # Transactional work
            $self->_dbic_block_runner( txn => sub {
                # reset timer/$rs on retries
                $self->loop_state->_mark_timer;
                $chunk_rs->reset;

                while (my $row = $chunk_rs->next) { $self->coderef->($self, $row) }
            });
        }
        else {
            # Bulk work
            $self->_dbic_block_runner( run => sub {
                # reset timer/$rs on retries
                $self->loop_state->_mark_timer;
                $chunk_rs->reset;

                $self->coderef->($self, $chunk_rs);
            });
        }
    }
    else {
        ### Something a bit more free-form: just hand over the boundaries

        $self->$coderef($ls->start, $ls->end);
    }

    return 1;
}
1275              
1276             #pod =head2 _process_past_max_checker
1277             #pod
1278             #pod Checks to make sure the current endpoint is actually the end, by checking the database.
1279             #pod Its return value determines whether the block should be processed or not.
1280             #pod
1281             #pod See L</process_past_max>.
1282             #pod
1283             #pod =cut
1284              
# Once the current chunk reaches max_id (and process_past_max is on), re-queries the
# maximum ID and raises max_id, plus the progress bar target, if the data has grown.
#
# Returns 1 when the chunk should go ahead, or 0 when the re-check found no max ID at
# all. Without an rsc or max_stmt to ask, the chunk end is simply pushed one chunk_size
# past max_id.
sub _process_past_max_checker {
    my ($self) = @_;
    my $ls       = $self->loop_state;
    my $progress = $ls->progress_bar;

    # Nothing to do unless the feature is on and this chunk touches the known max
    return 1 if !$self->process_past_max;
    return 1 if !($ls->end >= $self->max_id);

    # No checks for DIY, if they didn't include a max_stmt
    if (!defined $self->rsc && !$self->max_stmt) {
        # There's no way to size this, so add one more chunk
        $ls->end( $self->max_id + $ls->chunk_size );
        return 1;
    }

    # Run another MAX check
    if ($self->debug) {
        $progress->message('Reached end; re-checking max ID');
    }

    my $new_max_id;
    my $rsc = $self->rsc;
    if (defined $rsc) {
        $self->_dbic_block_runner( run => sub {
            $new_max_id = $rsc->max;
        });
    }
    elsif ($self->dbic_storage) {
        $self->_dbic_block_runner( run => sub {
            ($new_max_id) = $self->dbic_storage->dbh->selectrow_array(@{ $self->max_stmt });
        });
    }
    else {
        ($new_max_id) = $self->dbi_connector->run(sub {
            $_->selectrow_array(@{ $self->max_stmt });
        });
    }
    $ls->_mark_timer;  # the above query shouldn't impact runtimes

    # Convert $new_max_id if necessary
    if ($self->_check_bignums($new_max_id)) {
        $new_max_id = Math::BigInt->new($new_max_id);
    }

    # No max: No affected rows to change
    if (!$new_max_id || $new_max_id eq '0E0') {
        if ($self->debug) {
            $progress->message('No max ID found; nothing left to process...');
        }
        $ls->end($self->max_id);

        $ls->prev_check('no max');
        return 0;
    }

    if ($new_max_id > $self->max_id) {
        # New max ID: report it first, since the message needs the old value
        if ($self->debug) {
            $progress->message( sprintf 'New max ID set from %s to %s', $self->max_id, $new_max_id );
        }
        $self->max_id($new_max_id);
        $progress->target( $new_max_id - $self->min_id + 1 );
        $progress->update( $progress->last_update );
    }
    elsif ($new_max_id == $self->max_id) {
        # Same max ID
        if ($self->debug) {
            $progress->message( sprintf 'Found max ID %s; same as end', $new_max_id );
        }
    }
    else {
        # Max too low
        if ($self->debug) {
            $progress->message( sprintf 'Found max ID %s; ignoring...', $new_max_id );
        }
    }

    # Run another boundary check with the new max_id value
    $ls->end( min($ls->end, $self->max_id) );

    return 1;
}
1352              
1353             #pod =head2 _chunk_count_checker
1354             #pod
1355             #pod Checks the chunk count to make sure it's properly sized. If not, it will try to shrink
1356             #pod or expand the current chunk (in C<chunk_size> increments) as necessary. Its return value
1357             #pod determines whether the block should be processed or not.
1358             #pod
1359             #pod See L</min_chunk_percent>.
1360             #pod
1361             #pod This is not to be confused with the L</_runtime_checker>, which adjusts C<chunk_size>
1362             #pod after processing, based on previous run times.
1363             #pod
1364             #pod =cut
1365              
# Decides whether the chunk currently described by the loop state should be
# processed, by comparing its row count against the loop state's chunk_size
# and this object's min_chunk_percent.
#
# Returns 1 when the caller should process the chunk, and 0 when the chunk
# was skipped or its range was resized and needs to be checked again.  The
# reason for the decision is always recorded via $ls->prev_check.
sub _chunk_count_checker {
    my ($self) = @_;
    my $state = $self->loop_state;
    my $bar   = $state->progress_bar;

    # Chunk sizing is effectively switched off: always process
    if ($self->min_chunk_percent <= 0 || !defined $state->chunk_count) {
        $state->prev_check('disabled');
        return 1;
    }

    my $fill_ratio = $state->chunk_count / $state->chunk_size;
    $state->checked_count( $state->checked_count + 1 );

    # Empty block: skip it outright and double the stepping
    if ($state->chunk_count == 0 && $self->min_chunk_percent > 0) {
        $self->_print_debug_status('skipped');

        $self->_increment_progress;

        # Grab the step before the chunk state is reset, then double it
        my $old_step = $state->multiplier_step;
        $state->_reset_chunk_state;
        $state->multiplier_step( $old_step * 2 );

        $state->prev_check('skipped rows');
        return 0;
    }

    # The range is a single ID: it cannot be narrowed further, so it must be
    # processed.  This check also keeps the bisecting below from looping
    # forever.
    if ($state->end - $state->start <= 0) {
        $state->prev_check('at a single ID');

        # Warn, since one ID carrying many rows defeats the chunk sizing
        if ($state->chunk_count > 1) {
            $bar->message($_) for (
                'WARNING: Processing a single ID with many rows attached because resizing cannot proceed any further.',
                'Consider flipping the relationship so that IDs and row counts are 1:1.',
            );
        }

        return 1;
    }

    # Overfull block: shrink the range and retry
    if ($fill_ratio > 1 + $self->min_chunk_percent) {
        $self->_print_debug_status('shrunk');

        $state->_mark_timer;

        # With a known min/max range, bisect it.  Otherwise, step back to the
        # previous range and halve the stepping.
        my $range = $state->last_range;
        if (!defined $range->{max} || $state->multiplier_range < $range->{max}) {
            $range->{max} = $state->multiplier_range;
        }

        my $new_range = $range->{min} || ($state->multiplier_range - $state->multiplier_step);
        $state->multiplier_range($new_range);

        my $new_step = defined $range->{min}
            ? ($range->{max} - $range->{min}) / 2
            : $state->multiplier_step / 2;
        $state->multiplier_step($new_step);

        $state->prev_check('too many rows');
        return 0;
    }

    # Everything above outranks the "stop checking" rules below: too few rows
    # is preferable to too many.

    # Already re-checked this chunk more than ten times: give up and process
    if ($state->checked_count > 10) {
        $state->prev_check('too many checks');
        return 1;
    }

    # The range already reaches max_id: nothing left to expand into
    if ($state->end >= $self->max_id) {
        $state->prev_check('at max_id');
        return 1;
    }

    # Underfull block: keep the start ID and push the end point further out
    if ($fill_ratio < $self->min_chunk_percent) {
        $self->_print_debug_status('expanded');

        $state->_mark_timer;

        # With a known min/max range, bisect it.  Otherwise, keep doubling
        # the stepping.
        my $range = $state->last_range;
        if (!defined $range->{min} || $state->multiplier_range > $range->{min}) {
            $range->{min} = $state->multiplier_range;
        }

        my $new_step = defined $range->{max}
            ? ($range->{max} - $range->{min}) / 2
            : $state->multiplier_step * 2;
        $state->multiplier_step($new_step);

        $state->prev_check('too few rows');
        return 0;
    }

    $state->prev_check('nothing wrong');
    return 1;
}
1460              
1461             #pod =head2 _runtime_checker
1462             #pod
1463             #pod Stores the previously processed chunk's runtime, and then adjusts C<chunk_size> as
1464             #pod necessary.
1465             #pod
1466             #pod See L</target_time>.
1467             #pod
1468             #pod =cut
1469              
# Records the runtime of the chunk that was just processed and, if a
# target_time is set, rescales the loop state's chunk_size so that the average
# "rowtime" of the last few chunks moves toward that target.
#
# Returns 1 when chunk_size was changed.  Returns nothing (bare return) when
# no target_time is set, when there is not enough data to act on, or when the
# computed size is the same as the current one.
sub _runtime_checker {
    my ($self) = @_;
    my $ls = $self->loop_state;
    return unless $self->target_time;
    return unless $ls->chunk_size && $ls->prev_runtime;  # prevent DIV/0

    my $timings = $ls->last_timings;

    # If no chunk count is available, treat the chunk as completely full
    my $new_timing = {
        runtime     => $ls->prev_runtime,
        chunk_count => $ls->chunk_count || $ls->chunk_size,
    };
    $new_timing->{chunk_per} = $new_timing->{chunk_count} / $ls->chunk_size;

    # Rowtime: a measure of how much of the chunk_size actually impacted the runtime
    $new_timing->{rowtime} = $new_timing->{runtime} / $new_timing->{chunk_per};

    # Store the last five processing times
    push @$timings, $new_timing;
    shift @$timings if @$timings > 5;

    # Figure out the averages and adjustment factor
    my $ttl           = scalar @$timings;
    my $avg_rowtime   = sum(map { $_->{rowtime} } @$timings) / $ttl;
    my $adjust_factor = $self->target_time / $avg_rowtime;

    my $new_target_chunk_size = $ls->chunk_size;
    my $adjective;
    if ($adjust_factor > 1.05) {
        # Too fast: Raise the chunk size

        return unless $ttl >= 5;  # must have a full set of timings
        return if any { $_->{runtime} >= $self->target_time } @$timings;  # must ALL have low runtimes

        $new_target_chunk_size *= min(2, $adjust_factor);  # never more than double
        $adjective = 'fast';
    }
    elsif ($adjust_factor < 0.95) {
        # Too slow: Lower the chunk size

        return unless $ls->prev_runtime > $self->target_time;  # last runtime must actually be too high

        $new_target_chunk_size *=
            ($ls->prev_runtime < $self->target_time * 3) ?
            max(0.5, $adjust_factor) :  # never less than half...
            $adjust_factor              # ...unless the last runtime was waaaay off
        ;
        $new_target_chunk_size = 1 if $new_target_chunk_size < 1;
        $adjective = 'slow';
    }

    # Within 5% of the target, the size is untouched and the equality check
    # below bounces out before $adjective is ever read.
    $new_target_chunk_size = int $new_target_chunk_size;
    return if $new_target_chunk_size == $ls->chunk_size;  # either nothing changed or it's too miniscule
    return if $new_target_chunk_size < 1;

    # Print out a debug line, if enabled
    if ($self->debug) {
        # CLDR number formatters
        my $integer = $self->cldr->decimal_formatter;
        my $percent = $self->cldr->percent_formatter;

        # Go through the accessor, like the rest of the class does, rather
        # than reaching into the loop state's internal hash.
        $ls->progress_bar->message( sprintf(
            "Processing too %s, avg %4s of target time, adjusting chunk size from %s to %s",
            $adjective,
            $percent->format( 1 / $adjust_factor ),
            $integer->format( $ls->chunk_size ),
            $integer->format( $new_target_chunk_size ),
        ) );
    }

    # Change it!
    $ls->chunk_size($new_target_chunk_size);
    $ls->_reset_last_timings if $adjective eq 'fast';  # never snowball too quickly
    return 1;
}
1545              
1546             #pod =head2 _increment_progress
1547             #pod
1548             #pod Increments the progress bar.
1549             #pod
1550             #pod =cut
1551              
# Moves the progress bar forward to match the end of the current chunk.
#
# Progress is measured as the number of IDs between min_id and the loop
# state's current end, inclusive.  The result of the bar's update call is
# returned.
sub _increment_progress {
    my ($self) = @_;
    my $state = $self->loop_state;
    my $bar   = $state->progress_bar;

    my $ids_done = $state->end - $self->min_id + 1;

    # If the chunk ran past max_id, move the target to just beyond the
    # current position
    if ($state->end > $self->max_id) {
        $bar->target($ids_done + 1);
    }

    return $bar->update($ids_done);
}
1561              
1562             #pod =head2 _print_debug_status
1563             #pod
1564             #pod Prints out a standard debug status line, if debug is enabled. What it prints is
1565             #pod generally uniform, but it depends on the processing action. Most of the data is
1566             #pod pulled from L</loop_state>.
1567             #pod
1568             #pod =cut
1569              
# Sends a one-line debug status for the current chunk to the progress bar.
# Does nothing (bare return) unless debug is enabled.
#
# $action is a short word describing what happened to the chunk; it is printed
# as-is, and 'processed' additionally appends the previous runtime (plus the
# configured sleep, if any).
#
# Returns the result of the progress bar's message call.
sub _print_debug_status {
    my ($self, $action) = @_;
    return unless $self->debug;

    my $ls    = $self->loop_state;
    my $sleep = $self->sleep || 0;

    # CLDR number formatters
    my $integer = $self->cldr->decimal_formatter;
    my $percent = $self->cldr->percent_formatter;
    my $decimal = $self->cldr->decimal_formatter(
        minimum_fraction_digits => 2,
        maximum_fraction_digits => 2,
    );

    # Decide once whether the fixed-width layout applies, so that the format
    # choice and the whitespace squeeze below can never disagree (they used
    # to differ when an ID was exactly 1_000_000_000).
    my $fits_padded = $ls->start < 1_000_000_000 && $ls->end < 1_000_000_000;

    my $message;
    if ($fits_padded) {
        $message = sprintf(
            'IDs %6u to %6u %9s, %9s rows found',
            $ls->start, $ls->end, $action,
            $integer->format( $ls->chunk_count ),
        );
    }
    else {
        # Plain %s: these IDs are too wide for the padded numeric layout
        $message = sprintf(
            'IDs %s to %s %s, %s rows found',
            $ls->start, $ls->end, $action,
            $ls->chunk_count,
        );
    }

    # Skipped when the count is zero or undefined
    $message .= sprintf(
        ' (%4s of chunk size)',
        $percent->format( $ls->chunk_count / $ls->chunk_size ),
    ) if $ls->chunk_count;

    if ($action eq 'processed') {
        $message .= $sleep ?
            sprintf(
                ', %5s+%s sec runtime+sleep',
                $decimal->format( $ls->prev_runtime ),
                $decimal->format( $sleep )
            ) :
            sprintf(
                ', %5s sec runtime',
                $decimal->format( $ls->prev_runtime ),
            )
        ;
    }

    # Reduce spacing if the numbers are too large
    unless ($fits_padded) {
        $message =~ s/\s+/ /g;
        $message =~ s/\(\s+/\(/g;
    }

    return $ls->progress_bar->message($message);
}
1628              
1629             #pod =head1 CAVEATS
1630             #pod
1631             #pod =head2 Big Number Support
1632             #pod
1633             #pod If the module detects that the ID numbers are no longer safe for standard Perl NV
1634             #pod storage, it will automatically switch to using L<Math::BigInt> and L<Math::BigFloat> for
1635             #pod big number support. If any blessed numbers are already being used to define the
1636             #pod attributes, this will also switch on the support.
1637             #pod
1638             #pod =head2 String-based IDs
1639             #pod
1640             #pod If you're working with C<VARCHAR> types or other string-based IDs to represent integers,
1641             #pod these may be subject to whatever string-based comparison rules your RDBMS uses when
1642             #pod calculating with C<MIN>/C<MAX> or using C<BETWEEN>. Row counting and chunk size scaling
1643             #pod will try to compensate, but will be mixing string-based comparisons from the RDBMS and
1644             #pod Perl-based integer math.
1645             #pod
1646             #pod Using the C<CAST> function may help, but it may also cause critical indexes to be
1647             #pod ignored, especially if the function is used on the left-hand side against the column.
1648             #pod Strings with the exact same length may be safe from comparison weirdness, but YMMV.
1649             #pod
1650             #pod Non-integer inputs from ID columns, such as GUIDs or other alphanumeric strings, are not
1651             #pod currently supported. They would have to be converted to integers via SQL, and doing so
1652             #pod may run into a similar risk of having your RDBMS ignore indexes.
1653             #pod
1654             #pod =head1 SEE ALSO
1655             #pod
1656             #pod L, L, L
1657             #pod
1658             #pod =cut
1659              
1660             1;
1661              
1662             __END__