File Coverage

blib/lib/Schedule/LongSteps/Storage/DBIxClass.pm
Criterion Covered Total %
statement 12 16 75.0
branch n/a
condition n/a
subroutine 4 6 66.6
pod 1 1 100.0
total 17 23 73.9


line stmt bran cond sub pod time code
1             package Schedule::LongSteps::Storage::DBIxClass;
2             $Schedule::LongSteps::Storage::DBIxClass::VERSION = '0.021';
3 2     2   75824 use Moose;
  2         631093  
  2         17  
4             extends qw/Schedule::LongSteps::Storage/;
5              
6 2     2   19410 use DateTime;
  2         1018954  
  2         129  
7 2     2   501 use Log::Any qw/$log/;
  2         11158  
  2         18  
8 2     2   3814 use Scope::Guard;
  2         1010  
  2         1261  
9              
10             has 'schema' => ( is => 'ro', isa => 'DBIx::Class::Schema', required => 1);
11             has 'resultset_name' => ( is => 'ro', isa => 'Str', required => 1);
12              
13             has 'limit_per_tick' => ( is => 'ro', isa => 'Int', default => 50 );
14              
15             sub _get_resultset{
16 0     0     my ($self) = @_;
17 0           return $self->schema()->resultset($self->resultset_name());
18             }
19              
20             around [ 'prepare_due_processes', 'create_process' ] => sub{
21             my ($orig, $self, @rest ) = @_;
22              
23             # Transfer the current autocommit nature of the DBH
24             # as a transation might have been created on this DBH outside
25             # of this schema. A transaction on DBI sets AutoCommit to false
26             # on the DBH. transaction_depth is just a boolean on the storage.
27              
28             # First restore transaction depth as it was.
29             my $pre_transaction_depth = $self->schema()->storage()->transaction_depth();
30             my $guard = Scope::Guard->new(
31             sub{
32             $log->trace("Restoring transaction_depth = $pre_transaction_depth");
33             $self->schema()->storage()->transaction_depth( $pre_transaction_depth );
34             });
35              
36             my $current_transaction_depth = $self->schema()->storage()->dbh()->{AutoCommit} ? 0 : 1;
37             $log->trace("Setting transaction_depth as NOT dbh AutoCommit = ".$current_transaction_depth);
38             $self->schema()->storage()->transaction_depth( $current_transaction_depth );
39             return $self->$orig( @rest );
40             };
41              
42              
43             =head1 NAME
44              
45             Schedule::LongSteps::Storage::DBIxClass - DBIx::Class based storage.
46              
47             =head1 SYNOPSIS
48              
49             First instantiate a storage with your L<DBIx::Class::Schema> and the name
50             of the resultset that represent the stored process:
51              
52             my $storage = Schedule::LongSteps::Storage::DBIxClass->new({
53             schema => $dbic_schema,
54             resultset_name => 'LongstepsProcess'
55             });
56              
57             Then build and use a L<Schedule::LongSteps> object:
58              
59             my $long_steps = Schedule::LongSteps->new({ storage => $storage });
60              
61             ...
62              
63             =head1 ATTRIBUTES
64              
65             =over
66              
67             =item schema
68              
69             You DBIx::Class::Schema. Mandatory.
70              
71             =item resultset_name
72              
73             The name of the resultset holding the processes in your Schema. See section 'RESULTSET REQUIREMENTS'. Mandatory.
74              
75             =item limit_per_tick
76              
77             The maximum number of processes that will actually run each time you
78             call $longsteps->run_due_processes(). Use that to control how long it takes to run
79             a single call to $longsteps->run_due_processes().
80              
81             Note that you can have an arbitrary number of processes all doing $longsteps->run_due_processes() AT THE SAME TIME.
82              
83             This will ensure that no process step is run more than one time.
84              
85             Default to 50.
86              
87             =back
88              
89             =head1 RESULTSET REQUIREMENTS
90              
91             The resultset to use with this storage MUST contain the following columns, constraints and indices:
92              
93             =over
94              
95             =item id
96              
97             A unique primary key auto incrementable identifier
98              
99             =item process_class
100              
101             A VARCHAR long enough to hold your L<Schedule::LongSteps::Process> class names. NOT NULL.
102              
103             =item what
104              
105             A VARCHAR long enough to hold the name of one of your steps. Can be NULL.
106              
107             =item status
108              
109             A VARCHAR(50) NOT NULL, defaults to 'pending'
110              
111             =item run_at
112              
113             A Datetime (or timestamp with timezone in PgSQL). Will hold a UTC Timezoned date of the next run. Default to NULL.
114              
115             Please index this so it is fast to select a range.
116              
117             =item run_id
118              
119             A CHAR or VARCHAR (at least 36). Default to NULL.
120              
121             Please index this so it is fast to select rows with a matching run_id
122              
123             =item state
124              
125             A Reasonably long TEXT field (or JSON field in supporting databases) capable of holding
126             a JSON dump of pure Perl data. NOT NULL.
127              
128             You HAVE to implement inflating and deflating yourself. See L<DBIx::Class::InflateColumn::Serializer::JSON>
129             or similar techniques.
130              
131             See t/fullblown.t for a full blown working example.
132              
133             =item error
134              
135             A reasonably long TEXT field capable of holding a full stack trace in case something goes wrong. Defaults to NULL.
136              
137             =back
138              
139             =cut
140              
141             =head2 prepare_due_processes
142              
143             See L<Schedule::LongSteps::Storage::DBIxClass>
144              
145             =cut
146              
147             sub prepare_due_processes{
148             my ($self) = @_;
149              
150             my $now = DateTime->now();
151             my $rs = $self->_get_resultset();
152             my $dtf = $self->schema()->storage()->datetime_parser();
153              
154             my $uuid = $self->uuid()->create_str();
155             $log->info("Creating batch ID $uuid");
156              
157              
158             # Note that we do not use the SELECT FOR UPDATE technique here.
159             # Instead this generates a single UPDATE statement like this one:
160             # UPDATE longsteps_process SET run_id = ?, status = ? WHERE ( id IN ( SELECT me.id FROM longsteps_process me WHERE ( ( run_at <= ? AND run_id IS NULL ) ) LIMIT ? ) )
161             my $stuff = sub{
162             $rs->search({
163             run_at => { '<=' => $dtf->format_datetime( $now ) },
164             run_id => undef,
165             }, {
166             rows => $self->limit_per_tick(),
167             } )
168             ->update({
169             run_id => $uuid,
170             status => 'running'
171             });
172             };
173             $stuff->();
174              
175             # And return them as individual results.
176             return $rs->search({
177             run_id => $uuid,
178             })->all();
179             }
180              
181             =head2 create_process
182              
183             See L<Schedule::LongSteps::Storage>
184              
185             =cut
186              
187             sub create_process{
188             my ($self, $process_properties) = @_;
189             return $self->_get_resultset()->create($process_properties);
190             }
191              
192             =head2 find_process
193              
194             See L<Schedule::LongSteps::Storage>
195              
196             =cut
197              
198             sub find_process{
199 0     0 1   my ($self, $process_id) = @_;
200 0           return $self->_get_resultset()->find({ id => $process_id });
201             }
202              
203             __PACKAGE__->meta->make_immutable();