File Coverage

blib/lib/DBIx/BatchChunker.pm
Criterion Covered Total %
statement 340 363 93.6
branch 154 190 81.0
condition 46 76 60.5
subroutine 45 47 95.7
pod 3 4 75.0
total 588 680 86.4


line stmt bran cond sub pod time code
1             package DBIx::BatchChunker;
2              
3             our $AUTHORITY = 'cpan:GSG';
4             # ABSTRACT: Run large database changes safely
5 7     7   1276384 use version;
  7         12395  
  7         43  
6             our $VERSION = 'v1.0.0'; # VERSION
7              
8 7     7   3880 use Moo;
  7         66952  
  7         29  
9 7     7   11318 use MooX::StrictConstructor;
  7         81290  
  7         36  
10              
11 7     7   139077 use CLDR::Number;
  7         346359  
  7         261  
12              
13 7     7   3652 use Types::Standard qw( Str Bool Undef ArrayRef HashRef CodeRef InstanceOf Tuple Maybe Optional slurpy );
  7         470305  
  7         84  
14 7     7   15944 use Types::Numbers qw( NumRange UnsignedInt PerlSafeInt PositiveInt PositiveOrZeroNum );
  7         892365  
  7         95  
15 7     7   11256 use Type::Utils;
  7         28919  
  7         69  
16              
17 7     7   9156 use List::Util 1.33 (qw( min max sum any first )); # has any/all/etc.
  7         151  
  7         526  
18 7     7   48 use Math::BigInt upgrade => 'Math::BigFloat';
  7         20  
  7         59  
19 7     7   2162 use Math::BigFloat;
  7         16  
  7         39  
20 7     7   3502 use POSIX qw( ceil );
  7         14  
  7         54  
21 7     7   435 use Scalar::Util qw( blessed weaken );
  7         25  
  7         279  
22 7     7   3331 use Term::ProgressBar 2.14; # with silent option
  7         384866  
  7         300  
23 7     7   65 use Time::HiRes qw( time sleep );
  7         17  
  7         69  
24              
25 7     7   4103 use DBIx::BatchChunker::LoopState;
  7         32  
  7         307  
26              
27             # Don't export the above, but don't conflict with StrictConstructor, either
28 7     7   50 use namespace::clean -except => [qw< new meta >];
  7         14  
  7         72  
29              
30             # This is now an unused, dummy variable
31             our $DB_MAX_ID = ~0;
32              
33             #pod =encoding utf8
34             #pod
35             #pod =head1 SYNOPSIS
36             #pod
37             #pod use DBIx::BatchChunker;
38             #pod
39             #pod my $account_rs = $schema->resultset('Account')->search({
40             #pod account_type => 'deprecated',
41             #pod });
42             #pod
43             #pod my %params = (
44             #pod chunk_size => 5000,
45             #pod target_time => 5,
46             #pod
47             #pod rs => $account_rs,
48             #pod id_name => 'account_id',
49             #pod
50             #pod coderef => sub { $_[1]->delete },
51             #pod sleep => 1,
52             #pod verbose => 1,
53             #pod
54             #pod progress_name => 'Deleting deprecated accounts',
55             #pod process_past_max => 1,
56             #pod );
57             #pod
58             #pod # EITHER:
59             #pod # 1) Automatically construct and execute the changes:
60             #pod
61             #pod DBIx::BatchChunker->construct_and_execute(%params);
62             #pod
63             #pod # OR
64             #pod # 2) Manually construct and execute the changes:
65             #pod
66             #pod my $batch_chunker = DBIx::BatchChunker->new(%params);
67             #pod
68             #pod $batch_chunker->calculate_ranges;
69             #pod $batch_chunker->execute;
70             #pod
71             #pod =head1 DESCRIPTION
72             #pod
73             #pod This utility class is for running a large batch of DB changes in a manner that doesn't
74             #pod cause huge locks, outages, and missed transactions. It's highly flexible to allow for
75             #pod many different kinds of change operations, and dynamically adjusts chunks to its
76             #pod workload.
77             #pod
78             #pod It works by splitting up DB operations into smaller chunks within a loop. These chunks
79             #pod are transactionalized, either naturally as single-operation bulk work or by the loop
80             #pod itself. The full range is calculated beforehand to get the right start/end points.
81             #pod A L will be created to let the deployer know the
82             #pod processing status.
83             #pod
84             #pod There are two ways to use this class: call the automatic constructor and executor
85             #pod (L) or manually construct the object and call its methods. See
86             #pod L for examples of both.
87             #pod
88             #pod B You should not rely on this class to magically fix any and all locking
89             #pod problems the DB might experience just because it's being used. Thorough testing and
90             #pod best practices are still required.
91             #pod
92             #pod =head2 Processing Modes
93             #pod
94             #pod This class has several different modes of operation, depending on what was passed to
95             #pod the constructor:
96             #pod
97             #pod =head3 DBIC Processing
98             #pod
99             #pod If both L and L are passed, a chunk ResultSet is built from the base
100             #pod ResultSet, to add in a C clause, and the new ResultSet is passed into the
101             #pod coderef. The coderef should run some sort of active ResultSet operation from there.
102             #pod
103             #pod An L should be provided, but if it is missing it will be looked up based on
104             #pod the primary key of the ResultSource.
105             #pod
106             #pod If L is also enabled, then each chunk is wrapped in a transaction and the
107             #pod coderef is called for each row in the chunk. In this case, the coderef is passed a
108             #pod Result object instead of the chunk ResultSet.
109             #pod
110             #pod Note that whether L is enabled or not, the coderef execution is encapsulated
111             #pod in DBIC's retry logic, so any failures will re-connect and retry the coderef. Because of
112             #pod this, any changes you make within the coderef should be idempotent, or should at least be
113             #pod able to skip over any already-processed rows.
114             #pod
115             #pod =head3 Active DBI Processing
116             #pod
117             #pod If an L (DBI statement handle args) is passed without a L, the statement
118             #pod handle is merely executed on each iteration with the start and end IDs. It is assumed
119             #pod that the SQL for the statement handle contains exactly two placeholders for a C
120             #pod clause. For example:
121             #pod
122             #pod my $update_stmt = q{
123             #pod UPDATE
124             #pod accounts a
125             #pod JOIN account_updates au USING (account_id)
126             #pod SET
127             #pod a.time_stamp = au.time_stamp
128             #pod WHERE
129             #pod a.account_id BETWEEN ? AND ? AND
130             #pod a.time_stamp != au.time_stamp
131             #pod };
132             #pod
133             #pod The C clause should, of course, match the IDs being used in the loop.
134             #pod
135             #pod The statement is run with L for retry protection. Therefore, the
136             #pod statement should also be idempotent.
137             #pod
138             #pod =head3 Query DBI Processing
139             #pod
140             #pod If both a L and a L are passed, the statement handle is prepared and
141             #pod executed. Like the L mode, the SQL for the statement should
142             #pod contain exactly two placeholders for a C clause. Then the C<$sth> is passed to
143             #pod the coderef. It's up to the coderef to extract data from the executed statement handle,
144             #pod and do something with it.
145             #pod
146             #pod If C is enabled, each chunk is wrapped in a transaction and the coderef is
147             #pod called for each row in the chunk. In this case, the coderef is passed a hashref of the
148             #pod row instead of the executed C<$sth>, with lowercase alias names used as keys.
149             #pod
150             #pod Note that in both cases, the coderef execution is encapsulated in a L
151             #pod call to either C or C (using L), so any failures will
152             #pod re-connect and retry the coderef. Because of this, any changes you make within the
153             #pod coderef should be idempotent, or should at least be able to skip over any
154             #pod already-processed rows.
155             #pod
156             #pod =head3 DIY Processing
157             #pod
158             #pod If a L is passed but neither a C nor a C are passed, then the
159             #pod multiplier loop does not touch the database. The coderef is merely passed the start and
160             #pod end IDs for each chunk. It is expected that the coderef will run through all database
161             #pod operations using those start and end points.
162             #pod
163             #pod It's still valid to include L, L, and/or L in the
164             #pod constructor to enable features like L or
165             #pod L.
166             #pod
167             #pod If you're not going to include any min/max statements for L, you will
168             #pod need to set L and L yourself, either in the constructor or before the
169             #pod L call. Using L is also not an option in this case, as
170             #pod this tries to call L without a way to do so.
171             #pod
172             #pod =head3 TL;DR Version
173             #pod
174             #pod $stmt = Active DBI Processing
175             #pod $stmt + $coderef = Query DBI Processing | $bc->$coderef($executed_sth)
176             #pod $stmt + $coderef + single_rows=>1 = Query DBI Processing | $bc->$coderef($row_hashref)
177             #pod $rs + $coderef = DBIC Processing | $bc->$coderef($chunk_rs)
178             #pod $rs + $coderef + single_rows=>1 = DBIC Processing | $bc->$coderef($result)
179             #pod $coderef = DIY Processing | $bc->$coderef($start, $end)
180             #pod
181             #pod =head1 ATTRIBUTES
182             #pod
183             #pod See the L section for more in-depth descriptions of these attributes and their
184             #pod usage.
185             #pod
186             #pod =head2 DBIC Processing Attributes
187             #pod
188             #pod =head3 rs
189             #pod
190             #pod A L. This is used by all methods as the base ResultSet onto which
191             #pod the DB changes will be applied. Required for DBIC processing.
192             #pod
193             #pod =cut
194              
195             has rs => (
196             is => 'ro',
197             isa => InstanceOf['DBIx::Class::ResultSet'],
198             required => 0,
199             );
200              
201             #pod =head3 rsc
202             #pod
203             #pod A L. This is only used to override L for min/max
204             #pod calculations. Optional.
205             #pod
206             #pod =cut
207              
208             has rsc => (
209             is => 'ro',
210             isa => InstanceOf['DBIx::Class::ResultSetColumn'],
211             required => 0,
212             );
213              
214             #pod =head3 dbic_retry_opts
215             #pod
216             #pod A hashref of DBIC retry options. These options control how retry protection works within
217             #pod DBIC. So far, there are two supported options:
218             #pod
219             #pod max_attempts = Number of times to retry
220             #pod retry_handler = Coderef that returns true to continue to retry or false to re-throw
221             #pod the last exception
222             #pod
223             #pod The default is to let the DBIC storage engine handle its own protection, which will retry
224             #pod once if the DB connection was disconnected. If you specify any options, even a blank
225             #pod hashref, BatchChunker will fill in a default C of 10, and an always-true
226             #pod C. This is similar to L's defaults.
227             #pod
228             #pod Under the hood, these are options that are passed to the as-yet-undocumented
229             #pod L. The C has access to the same
230             #pod BlockRunner object (passed as its only argument) and its methods/accessors, such as C,
231             #pod C, and C.
232             #pod
233             #pod =cut
234              
235             has dbic_retry_opts => (
236             is => 'ro',
237             isa => HashRef,
238             required => 0,
239             predicate => '_has_dbic_retry_opts',
240             );
241              
242             sub _dbic_block_runner {
243 686     686   2396 my ($self, $method, $coderef) = @_;
244              
245 686         2090 my $storage = $self->dbic_storage;
246              
247             # Block running disabled
248 686 100       2956 unless ($self->_has_dbic_retry_opts) {
249 521 100       1816 return $storage->txn_do($coderef) if $method eq 'txn';
250 445         2641 return $storage->dbh_do($coderef);
251             }
252              
253             # A very light wrapper around BlockRunner. No need to load BlockRunner, since DBIC
254             # loads it in before us if we're using this method.
255             DBIx::Class::Storage::BlockRunner->new(
256             # in case they are not defined with a custom dbic_retry_opts
257             max_attempts => 10,
258 102     102   69332 retry_handler => sub { 1 },
259              
260             # never overrides the important ones below
261 165 100       699 %{ $self->dbic_retry_opts },
  165         4214  
262              
263             storage => $storage,
264             wrap_txn => ($method eq 'txn' ? 1 : 0),
265             )->run($coderef);
266             }
267              
268             #pod =head2 DBI Processing Attributes
269             #pod
270             #pod =head3 dbi_connector
271             #pod
272             #pod A L object. Instead of L statement handles, this is the
273             #pod recommended way for BatchChunker to interface with the DBI, as it handles retries on
274             #pod failures. The connection mode used is whatever default is set within the object.
275             #pod
276             #pod Required for DBI Processing, unless L is specified.
277             #pod
278             #pod =cut
279              
280             has dbi_connector => (
281             is => 'ro',
282             isa => InstanceOf['DBIx::Connector::Retry'],
283             required => 0,
284             );
285              
286             #pod =head3 dbic_storage
287             #pod
288             #pod A DBIC storage object, as an alternative for L. There may be times when
289             #pod you want to run plain DBI statements, but are still using DBIC. In these cases, you
290             #pod don't have to create a L object to run those statements.
291             #pod
292             #pod This uses a BlockRunner object for retry protection, so the options in
293             #pod L would apply here.
294             #pod
295             #pod Required for DBI Processing, unless L is specified.
296             #pod
297             #pod =cut
298              
299             has dbic_storage => (
300             is => 'ro',
301             isa => InstanceOf['DBIx::Class::Storage::DBI'],
302             required => 0,
303             );
304              
305             #pod =head3 min_stmt
306             #pod
307             #pod =head3 max_stmt
308             #pod
309             #pod SQL statement strings or an arrayref of parameters for L.
310             #pod
311             #pod When executed, these statements should each return a single value, either the minimum or
312             #pod maximum ID that will be affected by the DB changes. These are used by
313             #pod L. Required if using either type of DBI Processing.
314             #pod
315             #pod =cut
316              
317             my $SQLStringOrSTHArgs_type = Type::Utils::declare(
318             name => 'SQLStringOrSTHArgs',
319             # Allow an SQL string, an optional hashref/undef, and any number of strings/undefs
320             parent => Tuple->parameterize(Str, Optional[Maybe[HashRef]], slurpy ArrayRef[Maybe[Str]]),
321             coercion => sub { $_ = [ $_ ] if Str->check($_); $_ },
322             message => sub { 'Must be either an SQL string or an arrayref of parameters for $sth creation (SQL + hashref/undef + binds)' },
323             );
324              
325             has min_stmt => (
326             is => 'ro',
327             isa => $SQLStringOrSTHArgs_type,
328             required => 0,
329             coerce => 1,
330             );
331              
332             has max_stmt => (
333             is => 'ro',
334             isa => $SQLStringOrSTHArgs_type,
335             required => 0,
336             coerce => 1,
337             );
338              
339             #pod =head3 stmt
340             #pod
341             #pod A SQL statement string or an arrayref of parameters for L + binds.
342             #pod
343             #pod If using L (no coderef), this is a L statement
344             #pod (usually DML like C). If using L (with
345             #pod coderef), this is a passive DQL (C
346             #pod
347             #pod In either case, the statement should contain C placeholders, which will be
348             #pod executed with the start/end ID points. If there are already bind placeholders in the
349             #pod arrayref, then make sure the C bind points are last on the list.
350             #pod
351             #pod Required for DBI Processing.
352             #pod
353             #pod =cut
354              
355             has stmt => (
356             is => 'ro',
357             isa => $SQLStringOrSTHArgs_type,
358             required => 0,
359             coerce => 1,
360             );
361              
362             #pod =head3 count_stmt
363             #pod
364             #pod A C
365             #pod L.
366             #pod
367             #pod Like L, it should contain C placeholders. In fact, the SQL should look
368             #pod exactly like the L query, except with C instead of the column list.
369             #pod
370             #pod Used only for L. Optional, but recommended for
371             #pod L.
372             #pod
373             #pod =cut
374              
375             has count_stmt => (
376             is => 'ro',
377             isa => $SQLStringOrSTHArgs_type,
378             required => 0,
379             coerce => 1,
380             );
381              
382             #pod =head2 Progress Bar Attributes
383             #pod
384             #pod =head3 progress_bar
385             #pod
386             #pod The progress bar used for all methods. This can be specified right before the method
387             #pod call to override the default used for that method. Unlike most attributes, this one
388             #pod is read-write, so it can be switched on-the-fly.
389             #pod
390             #pod Don't forget to remove or switch to a different progress bar if you want to use a
391             #pod different one for another method:
392             #pod
393             #pod $batch_chunker->progress_bar( $calc_pb );
394             #pod $batch_chunker->calculate_ranges;
395             #pod $batch_chunker->progress_bar( $loop_pb );
396             #pod $batch_chunker->execute;
397             #pod
398             #pod All of this is optional. If the progress bar isn't specified, the method will create
399             #pod a default one. If the terminal isn't interactive, the default L will
400             #pod be set to C to naturally skip the output.
401             #pod
402             #pod =cut
403              
404             has progress_bar => (
405             is => 'rw',
406             isa => InstanceOf['Term::ProgressBar'],
407             );
408              
409             #pod =head3 progress_name
410             #pod
411             #pod A string used by L to assist in creating a progress bar. Ignored if
412             #pod L is already specified.
413             #pod
414             #pod This is the preferred way of customizing the progress bar without having to create one
415             #pod from scratch.
416             #pod
417             #pod =cut
418              
419             has progress_name => (
420             is => 'rw',
421             isa => Str,
422             required => 0,
423             lazy => 1,
424             default => sub {
425             my $rs = shift->rs;
426             'Processing'.(defined $rs ? ' '.$rs->result_source->name : '');
427             },
428             );
429              
430             #pod =head3 cldr
431             #pod
432             #pod A L object. English speakers that use a typical C<1,234.56> format would
433             #pod probably want to leave it at the default. Otherwise, you should provide your own.
434             #pod
435             #pod =cut
436              
437             has cldr => (
438             is => 'rw',
439             isa => InstanceOf['CLDR::Number'],
440             required => 0,
441             lazy => 1,
442             default => sub { CLDR::Number->new(locale => 'en') },
443             );
444              
445             #pod =head3 verbose
446             #pod
447             #pod Boolean. By default, this is on, which displays timing stats on each chunk, as well as
448             #pod total numbers. This is still subject to non-interactivity checks from L.
449             #pod
450             #pod (This was previously defaulted to off, and called C, prior to v1.0.0.)
451             #pod
452             #pod =cut
453              
454             has verbose => (
455             is => 'rw',
456             isa => Bool,
457             required => 0,
458             default => 1,
459             );
460              
461             # Backwards-compatibility
462             *debug = \&verbose;
463              
464             #pod =head2 Common Attributes
465             #pod
466             #pod =head3 id_name
467             #pod
468             #pod The column name used as the iterator in the processing loops. This should be a primary
469             #pod key or integer-based (indexed) key, tied to the L.
470             #pod
471             #pod Optional. Used mainly in DBIC processing. If not specified, it will look up
472             #pod the first primary key column from L and use that.
473             #pod
474             #pod This can still be specified for other processing modes to use in progress bars.
475             #pod
476             #pod =cut
477              
478             has id_name => (
479             is => 'rw',
480             isa => Str,
481             required => 0,
482             trigger => \&_fix_id_name,
483             );
484              
485             sub _fix_id_name {
486 62     62   6187 my ($self, $id_name) = @_;
487 62 100 100     710 return if !$id_name || $id_name =~ /\./ || !defined $self->rs; # prevent an infinite trigger loop
      100        
488 12         56 $self->id_name( $self->rs->current_source_alias.".$id_name" );
489             }
490              
491             #pod =head3 coderef
492             #pod
493             #pod The coderef that will be called either on each chunk or each row, depending on how
494             #pod L is set. The first input is always the BatchChunker object. The rest
495             #pod vary depending on the processing mode:
496             #pod
497             #pod $stmt + $coderef = Query DBI Processing | $bc->$coderef($executed_sth)
498             #pod $stmt + $coderef + single_rows=>1 = Query DBI Processing | $bc->$coderef($row_hashref)
499             #pod $rs + $coderef = DBIC Processing | $bc->$coderef($chunk_rs)
500             #pod $rs + $coderef + single_rows=>1 = DBIC Processing | $bc->$coderef($result)
501             #pod $coderef = DIY Processing | $bc->$coderef($start, $end)
502             #pod
503             #pod The loop does not monitor the return values from the coderef.
504             #pod
505             #pod Required for all processing modes except L.
506             #pod
507             #pod =cut
508              
509             has coderef => (
510             is => 'ro',
511             isa => CodeRef,
512             required => 0,
513             );
514              
515             #pod =head3 chunk_size
516             #pod
517             #pod The amount of rows to be processed in each loop.
518             #pod
519             #pod This figure should be sized to keep per-chunk processing time at around 5 seconds. If
520             #pod this is too large, rows may lock for too long. If it's too small, processing may be
521             #pod unnecessarily slow.
522             #pod
523             #pod Default is 1 row, which is only appropriate if L (on by default) is
524             #pod enabled. This will cause the processing to slowly ramp up to the target time as
525             #pod BatchChunker gathers more data.
526             #pod
527             #pod Otherwise, if you are using static chunk sizes with C turned off, figure out
528             #pod the right chunk size with a few test runs and set it here.
529             #pod
530             #pod (This was previously defaulted to 1000 rows, prior to v1.0.0.)
531             #pod
532             #pod =cut
533              
534             has chunk_size => (
535             is => 'rw',
536             isa => PositiveInt,
537             required => 0,
538             default => 1,
539             );
540              
541             #pod =head3 target_time
542             #pod
543             #pod The target runtime (in seconds) that chunk processing should strive to achieve, not
544             #pod including L. If the chunk processing times are too high or too low, this will
545             #pod dynamically adjust L to try to match the target.
546             #pod
547             #pod BatchChunker will still use the initial C, and it will need at least one
548             #pod chunk processed, before it makes adjustments. If the starting chunk size is grossly
549             #pod inaccurate to the workload, you could end up with several chunks in the beginning causing
550             #pod long-lasting locks before the runtime targeting reduces them down to a reasonable size.
551             #pod
552             #pod (Chunk size reductions are prioritized before increases, so it should re-size as soon as
553             #pod it finds the problem. But, one bad chunk could be all it takes to cause an outage.)
554             #pod
555             #pod Default is 5 seconds. Set this to zero to turn off runtime targeting. (This was
556             #pod previously defaulted to off prior to v0.92, and set to 15 in v0.92.)
557             #pod
558             #pod =cut
559              
560             has target_time => (
561             is => 'ro',
562             isa => PositiveOrZeroNum,
563             required => 0,
564             default => 5,
565             );
566              
567             #pod =head3 sleep
568             #pod
569             #pod The number of seconds to sleep after each chunk. It uses L's version, so
570             #pod fractional numbers are allowed.
571             #pod
572             #pod Default is 0.5 seconds, which is fine for most operations. You can likely get away with
573             #pod zero for smaller operations, but test it out first. If processing is going to take up a
574             #pod lot of disk I/O, you may want to consider a higher setting. If the database server
575             #pod spends too much time on processing, the replicas may have a hard time keeping up with
576             #pod standard load.
577             #pod
578             #pod This will increase the overall processing time of the loop, so try to find a balance
579             #pod between the two.
580             #pod
581             #pod (This was previously defaulted to 0 seconds, prior to v1.0.0.)
582             #pod
583             #pod =cut
584              
585             has 'sleep' => (
586             is => 'ro',
587             isa => PositiveOrZeroNum,
588             required => 0,
589             default => 0.5,
590             );
591              
592             #pod =head3 process_past_max
593             #pod
594             #pod Boolean that controls whether to check past the L during the loop. If the loop
595             #pod hits the end point, it will run another maximum ID check in the DB, and adjust C
596             #pod accordingly. If it somehow cannot run a DB check (no L or L available,
597             #pod for example), the last chunk will just be one at the end of C<< max_id + chunk_size >>.
598             #pod
599             #pod This is useful if the entire table is expected to be processed, and you don't want to
600             #pod miss any new rows that come up between L and the end of the loop.
601             #pod
602             #pod Turned off by default.
603             #pod
604             #pod =cut
605              
606             has process_past_max => (
607             is => 'ro',
608             isa => Bool,
609             required => 0,
610             default => 0,
611             );
612              
613             #pod =head3 single_rows
614             #pod
615             #pod Boolean that controls whether single rows are passed to the L or the chunk's
616             #pod ResultSet or statement handle is passed.
617             #pod
618             #pod Since running single-row operations in a DB is painfully slow (compared to bulk
619             #pod operations), this also controls whether the entire set of coderefs are encapsulated into
620             #pod a DB transaction. Transactionalizing the entire chunk brings the speed, and atomicity,
621             #pod back to what a bulk operation would be. (Bulk operations are still faster, but you can't
622             #pod do anything you want in a single DML statement.)
623             #pod
624             #pod Used only by L and L.
625             #pod
626             #pod =cut
627              
628             has single_rows => (
629             is => 'ro',
630             isa => Bool,
631             required => 0,
632             default => 0,
633             );
634              
635             #pod =head3 min_chunk_percent
636             #pod
637             #pod The minimum row count, as a percentage of L. This value is actually
638             #pod expressed in decimal form, i.e.: between 0 and 1.
639             #pod
640             #pod This value will be used to determine when to process, skip, or expand a block, based on
641             #pod a count query. The default is C<0.5> or 50%, which means that it will try to expand the
642             #pod block to a larger size if the row count is less than 50% of the chunk size. Zero-sized
643             #pod blocks will be skipped entirely.
644             #pod
645             #pod This "chunk resizing" is useful for large regions of the table that have been deleted, or
646             #pod when the incrementing ID has large gaps in it for other reasons. Wasting time on
647             #pod numerical gaps that span millions can slow down the processing considerably, especially
648             #pod if L is enabled.
649             #pod
650             #pod If this needs to be disabled, set this to 0. The maximum chunk percentage does not have
651             #pod a setting and is hard-coded at C<< 100% + min_chunk_percent >>.
652             #pod
653             #pod If DBIC processing isn't used, L is also required to enable chunk resizing.
654             #pod
655             #pod =cut
656              
657             has min_chunk_percent => (
658             is => 'ro',
659             isa => Type::Utils::declare(
660             name => 'PositiveZeroToOneNum',
661             parent => NumRange->parameterize(0, 1),
662             message => sub { 'Must be a number between 0 and 1' },
663             ),
664             required => 0,
665             default => 0.5,
666             );
667              
668             #pod =head3 min_id
669             #pod
670             #pod =head3 max_id
671             #pod
672             #pod Used by L to figure out the main start and end points. Calculated by
673             #pod L.
674             #pod
675             #pod Manually setting this is not recommended, as each database is different and the
676             #pod information may have changed between the DB change development and deployment. Instead,
677             #pod use L to fill in these values right before running the loop.
678             #pod
679             #pod =cut
680              
681             has min_id => (
682             is => 'rw',
683             isa => UnsignedInt,
684             );
685              
686             has max_id => (
687             is => 'rw',
688             isa => UnsignedInt,
689             );
690              
691             # Big number handling
692             has _use_bignums => (
693             is => 'rw',
694             isa => Bool,
695             default => 0,
696             trigger => \&_upgrade_attrs_to_bigint,
697             );
698              
699             my @BIGNUM_BC_ATTRS = (qw< chunk_size min_id max_id >);
700             my @BIGNUM_LS_ATTRS = (qw< start end prev_end multiplier_range multiplier_step chunk_size chunk_count >);
701              
702             sub _check_bignums {
703 704     704   2131 my ($self) = shift;
704 704 100       15194 return 1 if $self->_use_bignums; # already checked these
705              
706             # Auto-set _use_bignums if we detect that we need it
707 624         5388 my $set_bignums = 0;
708              
709             # If other values are passed, check those, too
710 624         2300 foreach my $val (@_) {
711 589 100       2762 next unless defined $val;
712 428 50 33     3494 $set_bignums = 1 if blessed $val || !PerlSafeInt->check($val);
713             }
714              
715             # Check BatchChunker attributes
716 624         13235 foreach my $attr (@BIGNUM_BC_ATTRS) {
717 1872         48933 my $val = $self->$attr();
718 1872 100       13773 next unless defined $val;
719 1750 100 66     6472 $set_bignums = 1 if blessed $val || !PerlSafeInt->check($val);
720             }
721              
722             # Check LoopState attributes
723 624 100       15948 if (my $ls = $self->loop_state) {
724 535         4934 foreach my $attr (@BIGNUM_LS_ATTRS) {
725 3745         92851 my $val = $ls->$attr();
726 3745 100       25542 next unless defined $val;
727 3210 100 66     8773 $set_bignums = 1 if blessed $val || !PerlSafeInt->check($val);
728             }
729             }
730              
731 624 100       2297 $self->_use_bignums(1) if $set_bignums;
732 624         2125 return $set_bignums;
733             }
734              
735             sub _upgrade_attrs_to_bigint {
736 4     4   115 my ($self, $is_on) = @_;
737 4 50       13 return unless $is_on;
738              
739             # Fix BatchChunker attributes
740 4         10 foreach my $attr (@BIGNUM_BC_ATTRS) {
741 12         2833 my $val = $self->$attr();
742 12 100       79 next unless defined $val; # nothing to upgrade
743 8 100       28 next if blessed $val; # already upgraded
744 6         28 $self->$attr( Math::BigInt->new($val) );
745             }
746              
747             # Fix LoopState attributes
748 4         1302 my $ls = $self->loop_state;
749 4 100       29 return unless $ls;
750 2         6 foreach my $attr (@BIGNUM_LS_ATTRS) {
751 14         8985 my $val = $ls->$attr();
752 14 100       92 next unless defined $val; # nothing to upgrade
753 12 50       35 next if blessed $val; # already upgraded
754 12         31 $ls->$attr( Math::BigInt->new($val) );
755             }
756             }
757              
758             #pod =head3 loop_state
759             #pod
760             #pod A L object designed to hold variables during the
761             #pod processing loop. The object will be cleared out after use. Most of the complexity is
762             #pod needed for chunk resizing.
763             #pod
764             #pod =cut
765              
766             has loop_state => (
767             is => 'rw',
768             isa => InstanceOf['DBIx::BatchChunker::LoopState'],
769             required => 0,
770             init_arg => undef,
771             clearer => 'clear_loop_state',
772             );
773              
774             # Backwards-compatibility
775             *_loop_state = \&loop_state;
776              
777             around BUILDARGS => sub {
778             my $next = shift;
779             my $class = shift;
780              
781             my %args = @_ == 1 ? %{ $_[0] } : @_;
782              
783             # debug -> verbose
784             $args{verbose} //= delete $args{debug} if exists $args{debug};
785              
786             # Auto-building of rsc and id_name can be a weird dependency dance, so it's better to
787             # handle it here.
788             my ($rsc, $rs, $id_name) = @args{qw< rsc rs id_name >};
789             if (defined $rsc && !$id_name) {
790             $args{id_name} = $rsc->{_as};
791             }
792             elsif (!defined $rsc && $id_name && defined $rs) {
793             $args{rsc} = $rs->get_column( $args{id_name} );
794             }
795             elsif (!defined $rsc && !$id_name && defined $rs) {
796             $args{id_name} = ($rs->result_source->primary_columns)[0];
797             $args{rsc} = $rs->get_column( $args{id_name} );
798             }
799             $rsc = $args{rsc};
800              
801             # Auto-add dbic_storage, if available
802             if (!defined $args{dbic_storage} && (defined $rs || defined $rsc)) {
803             $args{dbic_storage} = defined $rs ? $rs->result_source->storage : $rsc->_resultset->result_source->storage;
804             }
805              
806             # Find something to use as a dbi_connector, if it doesn't already exist
807             my @old_attrs = qw< sth min_sth max_sth count_sth >;
808             my @new_attrs = map { my $k = $_; $k =~ s/sth$/stmt/; $k } @old_attrs;
809             my $example_key = first { $args{$_} } @old_attrs;
810             if ($example_key && !defined $args{dbi_connector}) {
811             warn join "\n",
812             'The sth/*_sth options are now considered legacy usage in DBIx::BatchChunker. Because there is no',
813             'way to re-acquire the password, any attempt to reconnect will fail. Please use dbi_connector and',
814             'stmt/*_stmt instead for reconnection support.',
815             ''
816             ;
817              
818             # NOTE: There was a way to monkey-patch _connect to use $dbh->clone, but I've considered it
819             # too intrusive of a solution to use. Better to demand that the user switch to the new
820             # attributes, but have something that still works in most cases.
821              
822             # Attempt to build some sort of Connector object
823             require DBIx::Connector::Retry;
824             my $dbh = $args{$example_key}->{Database};
825              
826             my $conn = DBIx::Connector::Retry->new(
827             connect_info => [
828             join(':', 'dbi', $dbh->{Driver}{Name}, $dbh->{Name}),
829             $dbh->{Username},
830             '', # XXX: Can't acquire the password
831             # Sane %attr defaults on the off-chance that it actually re-connects
832             { AutoCommit => 1, RaiseError => 1 },
833             ],
834              
835             # Do not disconnect on DESTROY. The $dbh might still be used post-run.
836             disconnect_on_destroy => 0,
837             );
838              
839             # Pretend $conn->_connect was called and store our pre-existing $dbh
840             $conn->{_pid} = $$;
841             $conn->{_tid} = threads->tid if $INC{'threads.pm'};
842             $conn->{_dbh} = $dbh;
843             $conn->driver;
844              
845             $args{dbi_connector} = $conn;
846             }
847              
848             # Handle legacy options for sth/*_sth
849             foreach my $old_attr (grep { $args{$_} } @old_attrs) {
850             my $new_attr = $old_attr;
851             $new_attr =~ s/sth$/stmt/;
852              
853             my $sth = delete $args{$old_attr};
854             $args{$new_attr} ||= [ $sth->{Statement} ];
855             }
856              
857             # Now check to make sure dbi_connector is available for DBI processing
858             die 'DBI processing requires a dbi_connector or dbic_storage attribute!' if (
859             !(defined $args{dbi_connector} || defined $args{dbic_storage}) &&
860             (defined first { $args{$_} } @new_attrs)
861             );
862              
863             # Other sanity checks
864             die 'Range calculations require one of these attr sets: rsc, rs, or dbi_connector|dbic_storage + min_stmt + max_stmt' unless (
865             defined $args{rsc} ||
866             (defined $args{min_stmt} && defined $args{max_stmt}) ||
867             (!defined $args{dbi_connector} && !defined $args{dbic_storage} && defined $args{coderef}) # DIY mode is exempt
868             );
869              
870             die 'Block execution requires one of these attr sets: dbi_connector|dbic_storage + stmt, rs + coderef, or coderef' unless (
871             $args{stmt} ||
872             (defined $args{rs} && $args{coderef}) ||
873             $args{coderef}
874             );
875              
876             if (exists $args{target_time} && $args{target_time} == 0 && !$args{chunk_size}) {
877             warn join "\n",
878             'Dynamic chunk resizing is turned off and the chunk_size is still set to its default of 1.',
879             'This is probably not desirable, and you should find an appropriate static chunk size for',
880             'your workload.',
881             ''
882             ;
883             }
884              
885             $class->$next( %args );
886             };
887              
888             sub BUILD {
889 33     33 0 11412 my $self = shift;
890             # Make sure id_name gets fixed at the right time
891 33         554 $self->_fix_id_name( $self->id_name );
892 33         134 $self->_check_bignums;
893             }
894              
895             #pod =head1 CONSTRUCTORS
896             #pod
897             #pod See L for information on what can be passed into these constructors.
898             #pod
899             #pod =head2 new
900             #pod
901             #pod my $batch_chunker = DBIx::BatchChunker->new(...);
902             #pod
903             #pod A standard object constructor. If you use this constructor, you will need to
904             #pod manually call L and L to execute the DB changes.
905             #pod
906             #pod =head2 construct_and_execute
907             #pod
908             #pod my $batch_chunker = DBIx::BatchChunker->construct_and_execute(...);
909             #pod
910             #pod Constructs a DBIx::BatchChunker object and automatically calls
911             #pod L and L on it. Anything passed to this method will be passed
912             #pod through to the constructor.
913             #pod
914             #pod Returns the constructed object, post-execution. This is typically only useful if you want
915             #pod to inspect the attributes after the process has finished. Otherwise, it's safe to just
916             #pod ignore the return and throw away the object immediately.
917             #pod
918             #pod =cut
919              
920             sub construct_and_execute {
921 4     4 1 21111 my $class = shift;
922 4         99 my $db_change = $class->new(@_);
923              
924 3         13 $db_change->calculate_ranges;
925 3         14 $db_change->execute;
926              
927 3         132 return $db_change;
928             }
929              
930             #pod =head1 METHODS
931             #pod
932             #pod =head2 calculate_ranges
933             #pod
934             #pod my $batch_chunker = DBIx::BatchChunker->new(
935             #pod rsc => $account_rsc, # a ResultSetColumn
936             #pod ### OR ###
937             #pod rs => $account_rs, # a ResultSet
938             #pod id_name => 'account_id', # can be looked up if not provided
939             #pod ### OR ###
940             #pod dbi_connector => $conn, # DBIx::Connector::Retry object
941             #pod min_stmt => $min_stmt, # a SQL statement or DBI $sth args
942             #pod max_stmt => $max_stmt, # ditto
943             #pod
944             #pod ### Optional but recommended ###
945             #pod id_name => 'account_id', # will also be added into the progress bar title
946             #pod chunk_size => 20_000, # default is 1000
947             #pod
948             #pod ### Optional ###
949             #pod progress_bar => $progress, # defaults to a 2-count 'Calculating ranges' bar
950             #pod
951             #pod # ...other attributes for execute...
952             #pod );
953             #pod
954             #pod my $has_data_to_process = $batch_chunker->calculate_ranges;
955             #pod
956             #pod Given a L, L, or L statement
957             #pod argument set, this method calculates the min/max IDs of those objects. It fills in the
958             #pod L and L attributes, based on the ID data, and then returns 1.
959             #pod
960             #pod If either of the min/max statements don't return any ID data, this method will return 0.
961             #pod
962             #pod =cut
963              
964             sub calculate_ranges {
965 29     29 1 9556 my $self = shift;
966              
967 29   100     515 my $column_name = $self->id_name || '';
968 29         335 $column_name =~ s/^\w+\.//;
969              
970 29   33     491 my $progress = $self->progress_bar || Term::ProgressBar->new({
971             name => 'Calculating ranges'.($column_name ? " for $column_name" : ''),
972             count => 2,
973             ETA => 'linear',
974             silent => !(-t *STDERR && -t *STDIN), # STDERR is what {fh} is set to use
975             });
976              
977             # Actually run the statements
978 29         52864 my ($min_id, $max_id);
979 29 100       207 if ($self->rsc) {
    100          
980             $self->_dbic_block_runner( run => sub {
981             # In case the sub is retried
982 15     15   4610 $progress->update(0);
983              
984 15         764 $min_id = $self->rsc->min;
985 15         47219 $progress->update(1);
986              
987 15         875 $max_id = $self->rsc->max;
988 15         30828 $progress->update(2);
989 15         139 });
990             }
991             elsif ($self->dbic_storage) {
992             $self->_dbic_block_runner( run => sub {
993 5     5   1279 my $dbh = $self->dbic_storage->dbh;
994              
995             # In case the sub is retried
996 5         1541 $progress->update(0);
997              
998 5         210 ($min_id) = $dbh->selectrow_array(@{ $self->min_stmt });
  5         75  
999 5         912 $progress->update(1);
1000              
1001 5         222 ($max_id) = $dbh->selectrow_array(@{ $self->max_stmt });
  5         50  
1002 5         630 $progress->update(2);
1003 5         44 });
1004             }
1005             else {
1006             $self->dbi_connector->run(sub {
1007 9     9   1651 my $dbh = $_;
1008              
1009             # In case the sub is retried
1010 9         38 $progress->update(0);
1011              
1012 9         366 ($min_id) = $dbh->selectrow_array(@{ $self->min_stmt });
  9         124  
1013 9         2383 $progress->update(1);
1014              
1015 9         379 ($max_id) = $dbh->selectrow_array(@{ $self->max_stmt });
  9         83  
1016 9         975 $progress->update(2);
1017 9         251 });
1018             }
1019              
1020             # Set the ranges and return
1021 29 50 33     3140 return 0 unless defined $min_id && defined $max_id;
1022              
1023             # This would be the primary spot where we notice we need to upgrade, so check the values before
1024             # we attempt to mangle them.
1025 29 100       130 if ($self->_check_bignums($min_id, $max_id)) {
1026 2         39 $min_id = Math::BigFloat->new($min_id)->as_int;
1027 2         660 $max_id = Math::BigFloat->new($max_id)->as_int;
1028             }
1029             else {
1030 27         69 $min_id = int $min_id;
1031 27         60 $max_id = int $max_id;
1032             }
1033              
1034 29         1342 $self->min_id($min_id);
1035 29         2946 $self->max_id($max_id);
1036              
1037 29         2677 return 1;
1038             }
1039              
1040             #pod =head2 execute
1041             #pod
1042             #pod my $batch_chunker = DBIx::BatchChunker->new(
1043             #pod # ...other attributes for calculate_ranges...
1044             #pod
1045             #pod dbi_connector => $conn, # DBIx::Connector::Retry object
1046             #pod stmt => $do_stmt, # INSERT/UPDATE/DELETE $stmt with BETWEEN placeholders
1047             #pod ### OR ###
1048             #pod dbi_connector => $conn, # DBIx::Connector::Retry object
1049             #pod stmt => $select_stmt, # SELECT $stmt with BETWEEN placeholders
1050             #pod count_stmt => $count_stmt, # SELECT COUNT $stmt to be used for min_chunk_percent; optional
1051             #pod coderef => $coderef, # called code that does the actual work
1052             #pod ### OR ###
1053             #pod rs => $account_rs, # base ResultSet, which gets filtered with -between later on
1054             #pod id_name => 'account_id', # can be looked up if not provided
1055             #pod coderef => $coderef, # called code that does the actual work
1056             #pod ### OR ###
1057             #pod coderef => $coderef, # DIY database work; just pass the $start/$end IDs
1058             #pod
1059             #pod ### Optional but recommended ###
1060             #pod sleep => 0.25, # number of seconds to sleep each chunk; defaults to 0
1061             #pod process_past_max => 1, # use this if processing the whole table
1062             #pod single_rows => 1, # does $coderef get a single $row or the whole $chunk_rs / $stmt
1063             #pod min_chunk_percent => 0.25, # minimum row count of chunk size percentage; defaults to 0.5 (or 50%)
1064             #pod target_time => 5, # target runtime for dynamic chunk size scaling; default is 5 seconds
1065             #pod
1066             #pod progress_name => 'Updating Accounts', # easier than creating your own progress_bar
1067             #pod
1068             #pod ### Optional ###
1069             #pod progress_bar => $progress, # defaults to "Processing $source_name" bar
1070             #pod verbose => 1, # displays timing stats on each chunk
1071             #pod );
1072             #pod
1073             #pod $batch_chunker->execute if $batch_chunker->calculate_ranges;
1074             #pod
1075             #pod Applies the configured DB changes in chunks. Runs through the loop, processing a
1076             #pod statement handle, ResultSet, and/or coderef as it goes. Each loop iteration processes a
1077             #pod chunk of work, determined by L.
1078             #pod
1079             #pod The L method should be run first to fill in L and L.
1080             #pod If either of these are missing, the function will assume L couldn't
1081             #pod find them and warn about it.
1082             #pod
1083             #pod More details can be found in the L and L sections.
1084             #pod
1085             #pod =cut
1086              
1087             sub execute {
1088 31     31 1 23264 my $self = shift;
1089 31         103 $self->_check_bignums;
1090              
1091 31         79 my $count;
1092 31 100 66     437 if (defined $self->min_id && defined $self->max_id) {
1093 30         1110 $count = $self->max_id - $self->min_id + 1;
1094             }
1095              
1096             # Fire up the progress bar
1097 31   33     1676 my $progress = $self->progress_bar || Term::ProgressBar->new({
1098             name => $self->progress_name,
1099             count => $count || 1,
1100             ETA => 'linear',
1101             silent => !(-t *STDERR && -t *STDIN), # STDERR is what {fh} is set to use
1102             });
1103              
1104 31 100       57833 unless ($count) {
1105 1         6 $progress->message('No chunks; nothing to process...');
1106 1         34 return;
1107             }
1108              
1109 30 100       612 if ($self->verbose) {
1110             $progress->message(
1111             sprintf "(%s total chunks; %s total rows)",
1112 1         19 map { $self->cldr->decimal_formatter->format($_) } ( ceil($count / $self->chunk_size), $count)
  2         31046  
1113             );
1114             }
1115              
1116             # Loop state setup
1117 30         1967 $self->clear_loop_state;
1118 30         694 my $ls = $self->loop_state( DBIx::BatchChunker::LoopState->new({
1119             batch_chunker => $self,
1120             progress_bar => $progress,
1121             }) );
1122              
1123             # Da loop
1124 30   66     3537 while ($ls->prev_end < $self->max_id || $ls->start) {
1125 605         86152 $ls->multiplier_range($ls->multiplier_range + $ls->multiplier_step);
1126 605 100       99993 $ls->start ($ls->prev_end + 1) unless defined $ls->start; # this could be already set because of early 'next' calls
1127 605         91592 $ls->end(
1128             min(
1129             $ls->start + ceil($ls->multiplier_range * $ls->chunk_size) - 1, # ceil, because multiplier_* could be fractional
1130             $self->max_id, # ensure we never exceed max_id
1131             )
1132             );
1133 605         193519 $ls->chunk_count (undef);
1134              
1135 605 50       19842 next unless $self->_process_past_max_checker;
1136              
1137             # The actual DB processing
1138 605 100       4963 next unless $self->_process_block;
1139              
1140             # Record the time quickly
1141 502         28836 $ls->prev_runtime(time - $ls->timer);
1142              
1143             # Give the DB a little bit of breathing room
1144 502 50       217919029 sleep $self->sleep if $self->sleep;
1145              
1146 502         12666 $self->_print_chunk_status('processed');
1147 502         11630 $self->_increment_progress;
1148 502         43811 $self->_runtime_checker;
1149              
1150             # End-of-loop activities (skipped by early next)
1151 502         4575 $ls->_reset_chunk_state;
1152             }
1153 30         5914 $self->clear_loop_state;
1154              
1155             # Keep the finished time from the progress bar, in case there are other loops or output
1156 30 50       1181 unless ($progress->silent) {
1157 0         0 $progress->update( $progress->target );
1158 0         0 print "\n";
1159             }
1160             }
1161              
1162             #pod =head1 PRIVATE METHODS
1163             #pod
1164             #pod =head2 _process_block
1165             #pod
1166             #pod Runs the DB work and passes it to the coderef. Its return value determines whether the
1167             #pod block should be processed or not.
1168             #pod
1169             #pod =cut
1170              
1171             sub _process_block {
1172 605     605   1457 my ($self) = @_;
1173              
1174 605         8838 my $ls = $self->loop_state;
1175 605         4947 my $conn = $self->dbi_connector;
1176 605         2155 my $coderef = $self->coderef;
1177 605         2224 my $rs = $self->rs;
1178              
1179             # Figure out if the row count is worth the work
1180 605         980 my $chunk_rs;
1181 605         1976 my $count_stmt = $self->count_stmt;
1182 605         1451 my $chunk_count;
1183 605 100 100     4556 if ($count_stmt && defined $self->dbic_storage) {
    100          
    100          
1184             $self->_dbic_block_runner( run => sub {
1185 93 100   93   25788 $chunk_count = $self->dbic_storage->dbh->selectrow_array(
1186             @$count_stmt,
1187             (@$count_stmt == 1 ? undef : ()),
1188             $ls->start, $ls->end,
1189             );
1190 93         928 });
1191             }
1192             elsif ($count_stmt) {
1193             $chunk_count = $conn->run(sub {
1194 109 100   109   24786 $_->selectrow_array(
1195             @$count_stmt,
1196             (@$count_stmt == 1 ? undef : ()),
1197             $ls->start, $ls->end,
1198             );
1199 109         3093 });
1200             }
1201             elsif (defined $rs) {
1202 242         4592 $chunk_rs = $rs->search({
1203             $self->id_name => { -between => [$ls->start, $ls->end] },
1204             });
1205              
1206             $self->_dbic_block_runner( run => sub {
1207 242     242   76089 $chunk_count = $chunk_rs->count;
1208 242         149294 });
1209             }
1210              
1211 605 100       1174601 $chunk_count = Math::BigInt->new($chunk_count) if $self->_check_bignums($chunk_count);
1212 605         13414 $ls->chunk_count($chunk_count);
1213              
1214 605 100       68409 return unless $self->_chunk_count_checker;
1215              
1216             # NOTE: Try to minimize the amount of closures by using $self as much as possible
1217             # inside coderefs.
1218              
1219             # Do the work
1220 502 100 66     3522 if (my $stmt = $self->stmt) {
    100          
1221             ### Statement handle
1222 211 100       1017 my @prepare_args = @$stmt > 2 ? @$stmt[0..1] : @$stmt;
1223 211 100       3766 my @execute_args = (
1224             (@$stmt > 2 ? @$stmt[2..$#$stmt] : ()),
1225             $ls->start, $ls->end,
1226             );
1227              
1228 211 100 66     5943 if ($self->single_rows && $coderef) {
1229             # Transactional work
1230 67 100       311 if ($self->dbic_storage) {
1231             $self->_dbic_block_runner( txn => sub {
1232 49     49   17732 $self->loop_state->_mark_timer; # reset timer on retries
1233              
1234 49         2853 my $sth = $self->dbic_storage->dbh->prepare(@prepare_args);
1235 49         17166 $sth->execute(@execute_args);
1236              
1237 49         1911 while (my $row = $sth->fetchrow_hashref('NAME_lc')) { $self->coderef->($self, $row) }
  73         41073  
1238 44         380 });
1239             }
1240             else {
1241             $conn->txn(sub {
1242 84     84   34515 $self->loop_state->_mark_timer; # reset timer on retries
1243              
1244 84         4154 my $sth = $_->prepare(@prepare_args);
1245 84         8141 $sth->execute(@execute_args);
1246              
1247 28         3666 while (my $row = $sth->fetchrow_hashref('NAME_lc')) { $self->coderef->($self, $row) }
  22         173  
1248 23         615 });
1249             }
1250             }
1251             else {
1252             # Bulk work (or DML)
1253 144 100       667 if ($self->dbic_storage) {
1254             $self->_dbic_block_runner( run => sub {
1255 97     97   29934 $self->loop_state->_mark_timer; # reset timer on retries
1256              
1257 97         4351 my $sth = $self->dbic_storage->dbh->prepare(@prepare_args);
1258 97         37934 $sth->execute(@execute_args);
1259              
1260 51 100       1580 $self->coderef->($self, $sth) if $self->coderef;
1261 51         627 });
1262             }
1263             else {
1264             $conn->run(sub {
1265 139     139   33163 $self->loop_state->_mark_timer; # reset timer on retries
1266              
1267 139         6682 my $sth = $_->prepare(@prepare_args);
1268 139         12469 $sth->execute(@execute_args);
1269              
1270 93 100       4464 $self->coderef->($self, $sth) if $self->coderef;
1271 93         2585 });
1272             }
1273             }
1274             }
1275             elsif (defined $rs && $coderef) {
1276             ### ResultSet with coderef
1277              
1278 230 100       1046 if ($self->single_rows) {
1279             # Transactional work
1280             $self->_dbic_block_runner( txn => sub {
1281             # reset timer/$rs on retries
1282 83     83   27272 $self->loop_state->_mark_timer;
1283 83         4366 $chunk_rs->reset;
1284              
1285 83         15792 while (my $row = $chunk_rs->next) { $self->coderef->($self, $row) }
  76         122565  
1286 78         593 });
1287             }
1288             else {
1289             # Bulk work
1290             $self->_dbic_block_runner( run => sub {
1291             # reset timer/$rs on retries
1292 198     198   49328 $self->loop_state->_mark_timer;
1293 198         10231 $chunk_rs->reset;
1294              
1295 198         33307 $self->coderef->($self, $chunk_rs);
1296 152         1179 });
1297             }
1298             }
1299             else {
1300             ### Something a bit more free-form
1301              
1302 61         951 $self->$coderef($ls->start, $ls->end);
1303             }
1304              
1305 502         17462617 return 1;
1306             }
1307              
1308             #pod =head2 _process_past_max_checker
1309             #pod
1310             #pod Checks to make sure the current endpoint is actually the end, by checking the database.
1311             #pod Its return value determines whether the block should be processed or not.
1312             #pod
1313             #pod See L.
1314             #pod
1315             #pod =cut
1316              
# Once the current chunk's end has reached max_id (and process_past_max is
# on), asks the database for a fresh max ID and adjusts max_id / the chunk
# endpoint accordingly.
#
# Returns 1 when the chunk should be processed, or 0 when the database reports
# no max ID at all (nothing left to do).
sub _process_past_max_checker {
    my $self = shift;

    my $ls       = $self->loop_state;
    my $progress = $ls->progress_bar;

    # Nothing to re-check unless the feature is on and we've hit the known end
    return 1 if !$self->process_past_max;
    return 1 if !($ls->end >= $self->max_id);

    my $rsc = $self->rsc;

    # DIY coderefs without a max_stmt cannot be sized; just tack on one more chunk
    if (!defined $rsc && !$self->max_stmt) {
        $ls->end( $self->max_id + $ls->chunk_size );
        return 1;
    }

    $progress->message('Reached end; re-checking max ID') if $self->verbose;

    my $new_max_id;
    if (defined $rsc) {
        $self->_dbic_block_runner( run => sub {
            $new_max_id = $rsc->max;
        });
    }
    elsif ($self->dbic_storage) {
        $self->_dbic_block_runner( run => sub {
            ($new_max_id) = $self->dbic_storage->dbh->selectrow_array( @{ $self->max_stmt } );
        });
    }
    else {
        ($new_max_id) = $self->dbi_connector->run(sub {
            $_->selectrow_array( @{ $self->max_stmt } );
        });
    }

    # Exclude the extra MAX query from the chunk's runtime
    $ls->_mark_timer;

    # Upgrade to a bignum when the value is too large for native numbers
    $new_max_id = Math::BigInt->new($new_max_id) if $self->_check_bignums($new_max_id);

    # Empty table / no matching rows: stop here
    if (!$new_max_id || $new_max_id eq '0E0') {
        $progress->message('No max ID found; nothing left to process...') if $self->verbose;
        $ls->end( $self->max_id );
        $ls->prev_check('no max');
        return 0;
    }

    my $known_max = $self->max_id;
    if ($new_max_id > $known_max) {
        # Rows were added past the old end: widen the range and the progress bar
        $progress->message( sprintf 'New max ID set from %s to %s', $known_max, $new_max_id ) if $self->verbose;
        $self->max_id($new_max_id);
        $progress->target( $new_max_id - $self->min_id + 1 );
        $progress->update( $progress->last_update );
    }
    elsif ($new_max_id == $known_max) {
        $progress->message( sprintf 'Found max ID %s; same as end', $new_max_id ) if $self->verbose;
    }
    elsif ($self->verbose) {
        # A lower max than before is ignored
        $progress->message( sprintf 'Found max ID %s; ignoring...', $new_max_id );
    }

    # Clamp the endpoint against the (possibly updated) max_id
    $ls->end( min($ls->end, $self->max_id) );

    return 1;
}
1384              
1385             #pod =head2 _chunk_count_checker
1386             #pod
1387             #pod Checks the chunk count to make sure it's properly sized. If not, it will try to shrink
1388             #pod or expand the current chunk (in C increments) as necessary. Its return value
1389             #pod determines whether the block should be processed or not.
1390             #pod
1391             #pod See L.
1392             #pod
#pod This is not to be confused with the L</_runtime_checker>, which adjusts C<chunk_size>
1394             #pod after processing, based on previous run times.
1395             #pod
1396             #pod =cut
1397              
# Compares the current chunk's row count against chunk_size and decides
# whether to process it as-is (returns 1) or to resize the chunk and check
# again (returns 0). Resizing is done by adjusting the loop state's
# multiplier_range / multiplier_step, bisecting within $ls->last_range when
# bounds are known. The decision is recorded via $ls->prev_check.
sub _chunk_count_checker {
    my ($self) = @_;
    my $ls       = $self->loop_state;
    my $progress = $ls->progress_bar;

    # Chunk sizing is essentially disabled, so bounce out of here
    if ($self->min_chunk_percent <= 0 || !defined $ls->chunk_count) {
        $ls->prev_check('disabled');
        return 1;
    }

    # Fraction of chunk_size that the row count represents
    my $chunk_percent = $ls->chunk_count / $ls->chunk_size;
    $ls->checked_count( $ls->checked_count + 1 );

    if ($ls->chunk_count == 0 && $self->min_chunk_percent > 0) {
        # No rows: Skip the block entirely, and accelerate the stepping
        $self->_print_chunk_status('skipped');

        $self->_increment_progress;

        my $step = $ls->multiplier_step;
        $ls->_reset_chunk_state;
        $ls->multiplier_step( $step * 2 );

        $ls->prev_check('skipped rows');
        return 0;
    }
    elsif ($ls->end - $ls->start <= 0) {
        # Down to a single ID: We _have_ to process it
        $ls->prev_check('at a single ID');

        # Complain, because this can be dangerous with a wild enough Row:ID ratio
        if ($ls->chunk_count > 1) {
            $progress->message('WARNING: Processing a single ID with many rows attached because resizing cannot proceed any further.');
            $progress->message('Consider flipping the relationship so that IDs and row counts are 1:1.');
        }

        return 1;
    }
    elsif ($chunk_percent > 1 + $self->min_chunk_percent) {
        # Too many rows: Backtrack to the previous range and try to bisect
        $self->_print_chunk_status('shrunk');

        $ls->_mark_timer;

        # If we have a min/max range, bisect down the middle. If not, walk back
        # to the previous range and decelerate the stepping, which should bring
        # it to a halfway point from this range and last.
        my $lr = $ls->last_range;
        $lr->{max} = $ls->multiplier_range if !defined $lr->{max} || $ls->multiplier_range < $lr->{max};

        # A lower bound of 0 is still a real bound, so test definedness rather
        # than truth. This keeps the new range consistent with the step
        # calculation below; a plain "||" would walk backwards whenever min is 0.
        $ls->multiplier_range(
            defined $lr->{min} ? $lr->{min} : $ls->multiplier_range - $ls->multiplier_step
        );
        $ls->multiplier_step(
            defined $lr->{min} ? ($lr->{max} - $lr->{min}) / 2 : $ls->multiplier_step / 2
        );

        $ls->prev_check('too many rows');
        return 0;
    }

    # The above three are more important than skipping the count checks. Better to
    # have too few rows than too many. The single ID check prevents infinite loops
    # from bisecting, though.

    elsif ($ls->checked_count > 10) {
        # Checked too many times: Just process it
        $ls->prev_check('too many checks');
        return 1;
    }
    elsif ($ls->end >= $self->max_id) {
        # At the end: Just process it
        $ls->prev_check('at max_id');
        return 1;
    }
    elsif ($chunk_percent < $self->min_chunk_percent) {
        # Too few rows: Keep the start ID and accelerate towards a better endpoint
        $self->_print_chunk_status('expanded');

        $ls->_mark_timer;

        # If we have a min/max range, bisect down the middle. If not, keep
        # accelerating the stepping.
        my $lr = $ls->last_range;
        $lr->{min} = $ls->multiplier_range if !defined $lr->{min} || $ls->multiplier_range > $lr->{min};
        $ls->multiplier_step(
            defined $lr->{max} ? ($lr->{max} - $lr->{min}) / 2 : $ls->multiplier_step * 2
        );

        $ls->prev_check('too few rows');
        return 0;
    }

    $ls->prev_check('nothing wrong');
    return 1;
}
1492              
1493             #pod =head2 _runtime_checker
1494             #pod
1495             #pod Stores the previously processed chunk's runtime, and then adjusts C as
#pod Stores the previously processed chunk's runtime, and then adjusts C<chunk_size> as
1497             #pod
1498             #pod See L.
1499             #pod
1500             #pod =cut
1501              
# Records the runtime of the chunk that was just processed (keeping the last
# five in $ls->last_timings). When the averaged per-row runtime is more than
# 5% away from target_time, it rescales $ls->chunk_size.
#
# Returns 1 when chunk_size was changed; otherwise a bare return (false).
sub _runtime_checker {
    my ($self) = @_;
    my $ls = $self->loop_state;
    return unless $self->target_time;
    return unless $ls->chunk_size && $ls->prev_runtime;  # prevent DIV/0

    my $timings = $ls->last_timings;

    my $new_timing = {
        runtime     => $ls->prev_runtime,
        chunk_count => $ls->chunk_count || $ls->chunk_size,
    };
    $new_timing->{chunk_per} = $new_timing->{chunk_count} / $ls->chunk_size;

    # Rowtime: a measure of how much of the chunk_size actually impacted the runtime
    $new_timing->{rowtime} = $new_timing->{runtime} / $new_timing->{chunk_per};

    # Store the last five processing times
    push @$timings, $new_timing;
    shift @$timings if @$timings > 5;

    # Figure out the averages and adjustment factor
    my $ttl           = scalar @$timings;
    my $avg_rowtime   = sum(map { $_->{rowtime} } @$timings) / $ttl;
    my $adjust_factor = $self->target_time / $avg_rowtime;

    my $new_target_chunk_size = $ls->chunk_size;
    my $adjective;
    if ($adjust_factor > 1.05) {
        # Too fast: Raise the chunk size

        return unless $ttl >= 5;                                           # must have a full set of timings
        return if any { $_->{runtime} >= $self->target_time } @$timings;  # must ALL have low runtimes

        $new_target_chunk_size *= min(2, $adjust_factor);  # never more than double
        $adjective = 'fast';
    }
    elsif ($adjust_factor < 0.95) {
        # Too slow: Lower the chunk size

        return unless $ls->prev_runtime > $self->target_time;  # last runtime must actually be too high

        $new_target_chunk_size *=
            ($ls->prev_runtime < $self->target_time * 3) ?
                max(0.5, $adjust_factor) :  # never less than half...
                $adjust_factor              # ...unless the last runtime was waaaay off
        ;
        $new_target_chunk_size = 1 if $new_target_chunk_size < 1;
        $adjective = 'slow';
    }
    else {
        # Within 5% of the target time: nothing to adjust. Returning here also
        # guarantees $adjective is defined for the code below.
        return;
    }

    $new_target_chunk_size = int $new_target_chunk_size;
    return if $new_target_chunk_size == $ls->chunk_size;  # either nothing changed or it's too miniscule
    return if $new_target_chunk_size < 1;

    # Print out a processing line, if enabled
    if ($self->verbose) {
        # CLDR number formatters
        my $integer = $self->cldr->decimal_formatter;
        my $percent = $self->cldr->percent_formatter;

        # Use the accessor (as the other checkers do) instead of the raw hash slot
        $ls->progress_bar->message( sprintf(
            "Processing too %s, avg %4s of target time, adjusting chunk size from %s to %s",
            $adjective,
            $percent->format( 1 / $adjust_factor ),
            $integer->format( $ls->chunk_size ),
            $integer->format( $new_target_chunk_size ),
        ) );
    }

    # Change it!
    $ls->chunk_size($new_target_chunk_size);
    $ls->_reset_last_timings if $adjective eq 'fast';  # never snowball too quickly
    return 1;
}
1577              
1578             #pod =head2 _increment_progress
1579             #pod
1580             #pod Increments the progress bar.
1581             #pod
1582             #pod =cut
1583              
# Advances the progress bar to the current chunk endpoint. When the endpoint
# has run past max_id, the bar's target is pushed one past the current count
# so the bar never reports completion early.
# Returns whatever the progress bar's update() returns.
sub _increment_progress {
    my $self = shift;
    my $ls   = $self->loop_state;
    my $bar  = $ls->progress_bar;

    # IDs covered so far, counting min_id itself
    my $ids_done = $ls->end - $self->min_id + 1;

    if ($ls->end > $self->max_id) {
        $bar->target( $ids_done + 1 );
    }

    return $bar->update($ids_done);
}
1593              
1594             #pod =head2 _print_chunk_status
1595             #pod
#pod Prints out a standard chunk status line, if L</verbose> is enabled. What it prints is
#pod generally uniform, but it depends on the processing action. Most of the data is
#pod pulled from L</loop_state>.
1599             #pod
1600             #pod =cut
1601              
# Prints a one-line status for the current chunk (IDs, action, rows found,
# percent of chunk_size, and runtime for 'processed') via the progress bar.
# Does nothing unless verbose is on.
#
# $action - short verb describing what happened to the chunk (e.g. 'processed',
#           'skipped', 'shrunk', 'expanded').
sub _print_chunk_status {
    my ($self, $action) = @_;
    return unless $self->verbose;

    my $ls    = $self->loop_state;
    my $sleep = $self->sleep || 0;

    # CLDR number formatters
    my $integer = $self->cldr->decimal_formatter;
    my $percent = $self->cldr->percent_formatter;
    my $decimal = $self->cldr->decimal_formatter(
        minimum_fraction_digits => 2,
        maximum_fraction_digits => 2,
    );

    # One threshold drives both the column layout and the spacing cleanup.
    # Previously the layout used "< 1e9" while the cleanup used "> 1e9", so an
    # ID of exactly 1_000_000_000 got neither padded columns nor collapsed spacing.
    my $is_large = $ls->start >= 1_000_000_000 || $ls->end >= 1_000_000_000;

    my $message;
    if ($is_large) {
        $message = sprintf(
            'IDs %s to %s %s, %s rows found',
            $ls->start, $ls->end, $action,
            $ls->chunk_count,
        );
    }
    else {
        $message = sprintf(
            'IDs %6u to %6u %9s, %9s rows found',
            $ls->start, $ls->end, $action,
            $integer->format( $ls->chunk_count ),
        );
    }

    $message .= sprintf(
        ' (%4s of chunk size)',
        $percent->format( $ls->chunk_count / $ls->chunk_size ),
    ) if $ls->chunk_count;

    if ($action eq 'processed') {
        $message .= $sleep ?
            sprintf(
                ', %5s+%s sec runtime+sleep',
                $decimal->format( $ls->prev_runtime ),
                $decimal->format( $sleep )
            ) :
            sprintf(
                ', %5s sec runtime',
                $decimal->format( $ls->prev_runtime ),
            )
        ;
    }

    # Reduce spacing if the numbers are too large
    if ($is_large) {
        $message =~ s/\s+/ /g;
        $message =~ s/\(\s+/\(/g;
    }

    return $ls->progress_bar->message($message);
}
1660              
1661             #pod =head1 CAVEATS
1662             #pod
1663             #pod =head2 Big Number Support
1664             #pod
1665             #pod If the module detects that the ID numbers are no longer safe for standard Perl NV
#pod storage, it will automatically switch to using L<Math::BigInt> and L<Math::BigFloat> for
1667             #pod big number support. If any blessed numbers are already being used to define the
1668             #pod attributes, this will also switch on the support.
1669             #pod
1670             #pod =head2 String-based IDs
1671             #pod
1672             #pod If you're working with C types or other string-based IDs to represent integers,
1673             #pod these may be subject to whatever string-based comparison rules your RDBMS uses when
1674             #pod calculating with C/C or using C. Row counting and chunk size scaling
1675             #pod will try to compensate, but will be mixing string-based comparisons from the RDBMS and
1676             #pod Perl-based integer math.
1677             #pod
1678             #pod Using the C function may help, but it may also cause critical indexes to be
1679             #pod ignored, especially if the function is used on the left-hand side against the column.
1680             #pod Strings with the exact same length may be safe from comparison weirdness, but YMMV.
1681             #pod
1682             #pod Non-integer inputs from ID columns, such as GUIDs or other alphanumeric strings, are not
1683             #pod currently supported. They would have to be converted to integers via SQL, and doing so
1684             #pod may run into a similar risk of having your RDBMS ignore indexes.
1685             #pod
1686             #pod =head1 SEE ALSO
1687             #pod
1688             #pod L, L, L
1689             #pod
1690             #pod =cut
1691              
# A module file must end by evaluating to a true value so that `use`/`require` succeeds.
1;

__END__