File Coverage

blib/lib/Schedule/LongSteps.pm
Criterion Covered Total %
statement 83 86 96.5
branch 19 22 86.3
condition 6 8 75.0
subroutine 12 13 92.3
pod 6 6 100.0
total 126 135 93.3


line stmt bran cond sub pod time code
1             package Schedule::LongSteps;
2             $Schedule::LongSteps::VERSION = '0.021';
3             # ABSTRACT: Manage long term processes over arbitrary large spans of time.
4              
5 13     13   2693695 use Moose;
  13         4841420  
  13         94  
6              
7             =head1 NAME
8              
9             Schedule::LongSteps - Manage long term processes over arbitrary large spans of time.
10              
11              
12             =head1 ABSTRACT
13              
14             =for html <a href="https://travis-ci.org/skinnyjeans/Schedule-LongSteps"><img src="https://travis-ci.org/skinnyjeans/Schedule-LongSteps.svg?branch=dynamo"></a>
15              
16             This attempts to solve the problem of defining and running a set of potentially conditional steps accross an arbitrary long timespan.
17              
18             An example of such a process would be: "After an order has been started, if more than one hour, send an email reminder every 2 days until the order is finished. Give up after a month"". You get the idea.
19              
20             Such a process is usually a pain to implement and this is an attempt to provide a framework so it would make writing and testing such a process as easy as writing and testing a good old Class.
21              
22             =head1 INCOMPATIBLE CHANGES
23              
24             From 0.013
25              
26             The API of Storage has changed between 0.012 and 0.013. If you've written your own storage, you
27             will want to manage that. If not, then don't worry about it.
28              
29             =head1 CONCEPTS
30              
31             =head2 Process
32              
33             A Process represents a set of logically linked steps that need to run over a long span of times (hours, months, even years..). It persists in a Storage.
34              
35             At the logical level, the persistant Process has the following attributes (See L<Schedule::LongSteps::Storage::DBIxClass> for a comprehensive list):
36              
37             - what. Which step should it run next.
38              
39             - run_at. A L<DateTime> at which this next step should be run. This allows running a step far in the future.
40              
41             - status. Is the step running, or paused or is the process terminated.
42              
43             - state. The persistant state of your application. This should be a pure Perl hash (JSONable).
44              
45             Users (you) implement their business process as a subclass of L<Schedule::LongSteps::Process>. Such subclasses can have contextual properties
46             as Moose properties that will have to be supplied by the L<Schedule::LongSteps> management methods.
47              
48             =head2 Steps
49              
50             A step is simply a subroutine in a process class that runs some business code. It always returns either a new step to be run
51             or a final step marker.
52              
53             =head2 Storage
54              
55             A storage provides the backend to persist processes. Build a Schedule::LongSteps with a storage instance.
56              
57             See section PERSISTANCE for a list of available storage classes.
58              
59             =head2 Manager: Schedule::LongSteps
60              
61             A L<Schedule::LongSteps> provides an entry point to all thing related to Schedule::LongSteps process management.
62             You should keep once instance of this in your application (well, one instance per process) as this is what you
63             are going to use to launch and manage processes.
64              
65             =head1 QUICK START AND SYNOPSIS
66              
67             First write a class to represent your long running set of steps
68              
69             package My::Application::MyLongProcess;
70              
71             use Moose;
72             extends qw/Schedule::LongSteps::Process/;
73              
74             # Some contextual things.
75             has 'thing' => ( is => 'ro', required => 1); # Some mandatory context provided by your application at each regular run.
76              
77             # The first step should be executed after the process is installed on the target.
78             sub build_first_step{
79             my ($self) = @_;
80             return $self->new_step({ what => 'do_stuff1', run_at => DateTime->now() });
81             }
82              
83             sub do_stuff1{
84             my ($self) = @_;
85              
86             # The starting state
87             my $state = $self->state();
88              
89             my $thing = $self->thing();
90              
91             .. Do some stuff and return the next step to execute ..
92              
93             return $self->new_step({ what => 'do_stuff2', run_at => DateTime->... , state => { some => 'jsonable', hash => 'ref' ] });
94             }
95              
96             sub do_stuff2{
97             my ($self, $step) = @_;
98              
99             $self->wait_for_steps('do_stuff1', 'do_stuff2' );
100              
101             .. Do some stuff and terminate the process or goto do_stuff1 ..
102              
103             if( ... ){
104             return Schedule::LongSteps::Step->new({ what => 'do_stuff1', run_at => DateTime->... , state => { some jsonable structure } });
105             }
106             return $self->final_step({ state => { the => final, state => 1 } }) ;
107             }
108              
109             __PACKAGE__->meta->make_immutable();
110              
111             Then in you main application do this once per 'target':
112              
113             my $dbic_storage = Schedule::LongSteps::Storage::DBIxClass->new(...);
114             # Keep only ONE Instance of this in your application.
115             my $longsteps = Schedule::LongSteps->new({ storage => $dbic_storage });
116             ...
117              
118             $longsteps->instantiate_process('My::Application::MyProcess', { thing => 'whatever' }, { the => 'init', state => 1 });
119              
120             Then regularly (in a cron, or a recurring callback):
121              
122             my $dbic_storage = Schedule::LongSteps::Storage::DBIxClass->new(...);
123             # Keep only ONE instance of this in your application.
124             my $longsteps = Schedule::LongSteps->new({ storage => $dbic_storage });
125             ...
126              
127             $long_steps->run_due_steps({ thing => 'whatever' });
128              
129             =head1 EXAMPLE
130              
131             Look at L<https://github.com/jeteve/Schedule-LongSteps/blob/master/t/fullblown.t> for a full blown working
132             example.
133              
134             =head1 PERSISTANCE
135              
136             The persistance of processes is managed by a subclass of L<Schedule::LongSteps::Storage> that you should instantiate
137             and given to the constructor of L<Schedule::LongSteps>
138              
139             Example:
140              
141             my $dbic_storage = Schedule::LongSteps::Storage::DBIxClass->new(...);
142             my $longsteps = Schedule::LongSteps->new({ storage => $dbic_storage });
143             ...
144              
145             Out of the box, the following storage classes are available:
146              
147             =over
148              
149             =item L<Schedule::LongSteps::Storage::Memory>
150              
151             Persist processes in memory. Not very useful, except for testing. This is the storage of choice to unit test your processes.
152              
153             =item L<Schedule::LongSteps::Storage::AutoDBIx>
154              
155             Persist processes in a relational DB (a $dbh from L<DBI>). This is the easiest thing to use if you want to persist processes in a database, without having
156             to worry about creating a DBIx::Class model yourself.
157              
158             =item L<Schedule::LongSteps::Storage::DBIxClass>
159              
160             Persist processes in an existing L<DBIx::Class> schema. Nice if you want to have only one instance of Schema in your application and if
161             don't mind writing your own resultset.
162              
163             =item L<Schedule::LongSteps::Storage::DynamoDB>
164              
165             Persist processes in a DynamoDB table in AWS. Please consider this Alpha. Give it a go and report any issue!
166              
167             =back
168              
169             =head1 COOKBOOK
170              
171             =head2 WRITING A NEW PROCESS
172              
173             See 'QUICK START AND SYNOPSIS'
174              
175             =head2 INSTANTIATING A NEW PROCESS
176              
177             See 'QUICK START AND SYNOPSIS'
178              
179             =head2 RUNNING PROCESS STEPS
180              
181             See 'QUICK START AND SYNOPSIS
182              
183             =head2 BEING NOTIFIED OF ANY OF YOUR PROCESS ERROR
184              
185             Use the property 'on_error' on the Schedule::LongStep manager:
186              
187              
188             my $longsteps = Schedule::LongStep->new({ storage => ..,
189             on_error => sub{
190             my ( $stored_process , $error ) = @_;
191             .. do stuff with: ..
192             $error, # The original error. Not trimmed, and can be an object raised by
193             # the process.
194             $stored_process->error(), # The stored error. A string that might be trimmed.
195             $stored_process->process_class(),
196             $stored_process->state(), etc...
197             }
198             });
199              
200             Note that an error in your error handler itself will result in the output of
201             a pure Perl warning and an emmission of a 'critical' level L<Log::Any> log event.
202              
203             =head2 INJECTING PARAMETERS IN YOUR PROCESSES
204              
205             Of course each instance of your process will most probably need to
206             act on different pieces of application data. The one and only way to
207             give 'parameters' to your processes is to specify an initial state when
208             you instantiate a process:
209              
210             $longsteps->instantiate_process('My::App', { app => $app } , { work => 'on' , this => 'user_id' });
211              
212             =head2 INJECTING CONTEXT IN YOUR PROCESSES
213              
214             Let's say you hold an instance of your application object:
215              
216             my $app = ...;
217              
218             And you want to use it in your processes:
219              
220             package MyProcess;
221             ...
222             has 'app' => (is => 'ro', isa => 'My::App', required => 1);
223              
224             You can inject your $app instance in your processes at instantiation time:
225              
226             $longsteps->instantiate_process('My::App', { app => $app });
227              
228             And also when running the due steps:
229              
230             $longsteps->run_due_steps({ app => $app });
231              
232             The injected context should be stable over time. Do NOT use this to inject parameters. (See INJECTING PARAMETERS).
233              
234              
235             =head2 PROCESS WRITING
236              
237             This package should be expressive enough for you to implement business processes
238             as complex as those given as an example on this page: L<https://en.wikipedia.org/wiki/XPDL>
239              
240             Proper support for XPDL is not implemented yet, but here is a list of recipes to implement
241             the most common process patterns:
242              
243             =head3 MOVING TO A FINAL STATE
244              
245             Simply do in your step 'do_last_stuff' implementation:
246              
247             sub do_last_stuff{
248             my ($self) = @_;
249             # Return final_step with the final state.
250             return $self->final_step({ state => { the => 'final' , state => 1 } });
251             }
252              
253             =head3 DO SOMETHING ELSE IN X AMOUNT OF TIME
254              
255             sub do_stuff{
256             ...
257             # Do the things that have to be done NOW
258             ...
259             # And in two days, to this
260             return $self->new_step({ what => 'do_stuff_later', run_at => DateTime->now()->add( days => 2 ) , state => { some => 'new one' }});
261             }
262              
263              
264             =head3 DO SOMETHING CONDITIONALLY
265              
266             sub do_choose{
267             if( ... ){
268             return $self->new_step({ what => 'do_choice1', run_at => DateTime->now() });
269             }
270             return $self->new_step({ what => 'do_choice2', run_at => DateTime->now() });
271             }
272              
273             sub do_choice1{...}
274             sub do_choice2{...}
275              
276             =head3 FORKING AND WAITING FOR PROCESSES
277              
278              
279             sub do_fork{
280             ...
281             my $p1 = $self->longsteps->instantiate_process('AnotherProcessClass', \%build_args , \%initial_state );
282             my $p2 = $self->longsteps->instantiate_process('YetAnotherProcessClass', \%build_args2 , \%initial_state2 );
283             ...
284             return $self->new_step({ what => 'do_join', run_at => DateTime->now() , { processes => [ $p1->id(), p2->id() ] } });
285             }
286              
287             sub do_join{
288             return $self->wait_processes( $self->state()->{processes}, sub{
289             my ( @terminated_processes ) = @_;
290             my $state1 = $terminated_processes[0]->state();
291             my $state2 = $terminated_processes[1]->state();
292             ...
293             # And as usual:
294             return $self->...
295             });
296             }
297              
298             =head1 ATTRIBUTES
299              
300             =over
301              
302             =item storage
303              
304             An instance of a subclass of L<Schedule::LongSteps::Storage>. See SYNOPSIS.
305              
306             =item on_error
307              
308             A callback called like $on_error->( $stored_process , $error ). See COOKBOOK for an example
309              
310             =item error_limit
311              
312             Maximum size of error message to log and store. Defaults to 2000 characters.
313              
314             =back
315              
316             =head1 METHODS
317              
318             =head2 uuid
319              
320             Returns a L<Data::UUID> from the storage.
321              
322             =head2 run_due_processes
323              
324             Runs all the due processes steps according to now(). All processes
325             are given the context to be built.
326              
327             Usage:
328              
329             # No context given:
330             $this->run_due_processes();
331              
332             # With 'thing' as context:
333             $this->run_due_processes({ thing => ... });
334              
335             Returns the number of processes run
336              
337             =head2 instantiate_process
338              
339             Instantiate a stored process from the given process class returns a new process that will have an ID.
340              
341             Usage:
342              
343             $this->instantiate_process( 'MyProcessClass', { process_attribute1 => .. } , { initial => 'state' });
344              
345             =head2 find_process
346              
347             Shortcut to $self->storage->find_process( $pid );
348              
349             =head2 load_process
350              
351             Returns a loaded process for a given pid, or undef if there is no process
352             associated with the PID.
353              
354             An optional hash ref context can also be passed in
355             and will be used to load the process, a blank context is used if not provided.
356              
357             if( my $loaded_process = $self->load_process( $pid , $context ) ){
358             ...
359             }
360              
361             =head2 revive
362              
363             Revive a longstep process to a given step within a Longstep process.
364              
365             A context is required when the reviving process contains required attributes
366             and when setting a step to reviving step. If no step is given then the process
367             will revive on the failed process step, when setting a step that doesn't
368             require a context, use an empty hashref '{}'.
369              
370             If you need to modify the state before reviving the longstep process, it is
371             recommended to have a revive step ("revive_do_broken_step") which modifies
372             the state as needed and returns a next_step to continue the process.
373              
374             This method will confess on any issues.
375              
376             eval {
377             $self->revive( $pid, $context, $method_to_revive_to );
378             };
379              
380             =head1 SEE ALSO
381              
382             L<BPM::Engine> A business Process engine based on XPDL, in Alpha version since 2012 (at this time of writing)
383              
384             =head1 Copyright and Acknowledgement
385              
386             This code is released under the Perl5 Terms by Jerome Eteve (JETEVE), with the support of Broadbean Technologies Ltd.
387              
388             See L<perlartistic>
389              
390             =for HTML <a href="https://travis-ci.org/jeteve/Schedule-LongSteps"><img src="https://travis-ci.org/jeteve/Schedule-LongSteps.svg?branch=master"></a>
391              
392             =cut
393              
394 13     13   82646 use Class::Load;
  13         32  
  13         616  
395 13     13   5764 use DateTime;
  13         3124241  
  13         551  
396 13     13   4762 use Log::Any qw/$log/;
  13         70989  
  13         65  
397              
398 13     13   21999 use Schedule::LongSteps::Storage::Memory;
  13         4735  
  13         9876  
399              
400             has 'storage' => ( is => 'ro', isa => 'Schedule::LongSteps::Storage', lazy_build => 1);
401              
402             has 'on_error' => ( is => 'ro', isa => 'CodeRef', default => sub{ return sub{}; } );
403              
404             has 'error_limit' => ( is => 'ro', isa => 'Int' , default => 2000 );
405              
406             sub _build_storage{
407 12     12   35 my ($self) = @_;
408 12         91 $log->warn("No storage specified. Will use Memory storage");
409 12         577 return Schedule::LongSteps::Storage::Memory->new();
410             }
411              
412              
413             sub uuid{
414 0     0 1 0 my ($self) = @_;
415 0         0 return $self->storage()->uuid();
416             }
417              
418             sub run_due_processes{
419 37     37 1 6797 my ($self, $context) = @_;
420 37   100     318 $context ||= {};
421              
422 37         1162 my @stored_processes = $self->storage->prepare_due_processes();
423 37         83 my $process_count = 0;
424 37         89 foreach my $stored_process ( @stored_processes ){
425 32         1023 my $process_method = $stored_process->what();
426              
427 32         62 $process_count++;
428              
429 32         59 my $new_step_properties = eval{
430 32         952 $log->info( "Will do $process_method on process ID=".$stored_process->id() );
431 32         228 my $process = $self->_load_stored_process($stored_process,$context);
432 31         22701 $process->$process_method();
433             };
434 32 100       3767 if( my $original_err = $@ ){
435              
436             # Stringify the error, just in case its an object.
437 7         30 my $err = $original_err.'';
438 7 100       204 if( length( $err ) > $self->error_limit() ){
439 2         44 $log->warn("Error too long. Trimming to ".$self->error_limit());
440 2         108 $err = substr( $err , 0 , $self->error_limit() );
441             }
442 7         201 $log->error("Error running process ".$stored_process->process_class().':'.$stored_process->id().' :'.$err);
443 7         113 $stored_process->update({
444             status => 'terminated',
445             error => $err,
446             run_at => undef,
447             run_id => undef,
448             });
449              
450 7         20 eval{ $self->on_error()->( $stored_process , $original_err ); };
  7         191  
451 7 100       33 if( my $on_error_error = $@ ){
452 1         91 warn("Error handler triggered an error: $on_error_error");
453 1         16 $log->critical("Error handler triggered an error: $on_error_error");
454             }
455              
456 7         56 next;
457             }
458              
459             $stored_process->update({
460             status => 'paused',
461             run_at => undef,
462             run_id => undef,
463 25         63 %{$new_step_properties}
  25         162  
464             });
465             }
466 37         212 return $process_count;
467             }
468              
469             sub instantiate_process{
470 16     16 1 853 my ($self, $process_class, $build_args, $init_state ) = @_;
471              
472 16 100       64 defined( $build_args ) or ( $build_args = {} );
473 16 100       69 defined( $init_state ) or ( $init_state = {} );
474              
475 16         165 Class::Load::load_class($process_class);
476 16 50       844 unless( $process_class->isa('Schedule::LongSteps::Process') ){
477 0         0 confess("Class '$process_class' is not an instance of 'Schedule::LongSteps::Process'");
478             }
479 16         42 my $process = $process_class->new( { longsteps => $self, %{ $build_args } } );
  16         559  
480 16         13479 my $step_props = $process->build_first_step();
481              
482             my $stored_process = $self->storage->create_process({
483             process_class => $process_class,
484             status => 'pending',
485             state => $init_state,
486 16         657 %{$step_props}
  16         155  
487             });
488 16         485 return $stored_process;
489             }
490              
491             sub find_process{
492 18     18 1 44 my ($self, $pid) = @_;
493 18         439 return $self->storage()->find_process($pid);
494             }
495              
496             sub load_process {
497 7     7 1 23 my ( $self, $pid, $context ) = @_;
498 7   50     65 $context ||= {};
499              
500 7         24 my $stored_process = $self->find_process($pid);
501 7 100       23 return unless $stored_process;
502 6         22 return $self->_load_stored_process( $stored_process, $context );
503             }
504              
505             sub revive {
506 7     7 1 21 my ( $self, $process_id, $context, $revive_to ) = @_;
507 7   100     35 $context ||= {};
508              
509 7         24 my $stored_process = $self->find_process($process_id);
510 7 50       31 confess "There is no $process_id to revive" unless $stored_process;
511              
512 7 50       234 confess("$process_id does not have a status of 'terminated'") if ( $stored_process->status() ne "terminated" );
513              
514             # load the process and check if process have the method to revive_to
515             # if revive $revive_to was not passed, used the function we failed on.
516             # and check that also, just in case we attempt to revive on a method
517             # that was previously removed.
518 7         25 my $loaded_process = $self->_load_stored_process($stored_process, $context);
519              
520 6 100       4870 $revive_to = $stored_process->what() unless $revive_to;
521              
522             # check to see if we able to revive
523 6 100       374 confess "Unable revive $process_id to $revive_to" unless $loaded_process->can($revive_to);
524              
525             # Set the process up to be revived.
526 4         27 my $now = DateTime->now();
527 4         1510 $stored_process->what($revive_to);
528 4         135 $stored_process->error(undef);
529 4         133 $stored_process->status("paused");
530 4         129 $stored_process->run_at( $now );
531 4         30 $stored_process->update({
532             what => $revive_to,
533             error => undef,
534             status => "paused",
535             run_at => $now
536             });
537              
538 4         130 return 1;
539             }
540              
541             # load_class may croak when trying to load a module you that is not in the INC
542             # so to be safe make sure you put this in an eval, and handle the errors
543             # appropriately
544             sub _load_stored_process {
545 45     45   111 my ( $self, $stored_process, $context ) = @_;
546 45   50     99 $context ||= {};
547              
548 45         1402 Class::Load::load_class( $stored_process->process_class() );
549             return $stored_process->process_class()->new(
550             {
551             longsteps => $self,
552             stored_process => $stored_process,
553 43         2728 %{$context}
  43         1262  
554             }
555             );
556             }
557              
558              
559              
560             __PACKAGE__->meta->make_immutable();