File Coverage

blib/lib/Schedule/LongSteps.pm
Criterion Covered Total %
statement 83 86 96.5
branch 19 22 86.3
condition 6 8 75.0
subroutine 12 13 92.3
pod 6 6 100.0
total 126 135 93.3


line stmt bran cond sub pod time code
1             package Schedule::LongSteps;
2             $Schedule::LongSteps::VERSION = '0.022';
3             # ABSTRACT: Manage long term processes over arbitrary large spans of time.
4              
5 13     13   2893523 use Moose;
  13         5220332  
  13         95  
6              
7             =head1 NAME
8              
9             Schedule::LongSteps - Manage long term processes over arbitrary large spans of time.
10              
11              
12             =head1 ABSTRACT
13              
14             =for html <a href="https://travis-ci.org/skinnyjeans/Schedule-LongSteps"><img src="https://travis-ci.org/skinnyjeans/Schedule-LongSteps.svg?branch=dynamo"></a>
15              
16             This attempts to solve the problem of defining and running a set of potentially conditional steps across an arbitrarily long timespan.
17              
18             An example of such a process would be: "After an order has been started, if more than one hour has passed, send an email reminder every 2 days until the order is finished. Give up after a month". You get the idea.
19              
20             Such a process is usually a pain to implement and this is an attempt to provide a framework so it would make writing and testing such a process as easy as writing and testing a good old Class.
21              
22             =head1 INCOMPATIBLE CHANGES
23              
24             From 0.013
25              
26             The API of Storage has changed between 0.012 and 0.013. If you've written your own storage, you
27             will want to manage that. If not, then don't worry about it.
28              
29             =head1 CONCEPTS
30              
31             =head2 Process
32              
33             A Process represents a set of logically linked steps that need to run over a long span of times (hours, months, even years..). It persists in a Storage.
34              
35             At the logical level, the persistent Process has the following attributes (See L<Schedule::LongSteps::Storage::DBIxClass> for a comprehensive list):
36              
37             - what. Which step should it run next.
38              
39             - run_at. A L<DateTime> at which this next step should be run. This allows running a step far in the future.
40              
41             - status. Is the step running, or paused or is the process terminated.
42              
43             - state. The persistent state of your application. This should be a pure Perl hash (JSONable).
44              
45             Users (you) implement their business process as a subclass of L<Schedule::LongSteps::Process>. Such subclasses can have contextual properties
46             as Moose properties that will have to be supplied by the L<Schedule::LongSteps> management methods.
47              
48             =head2 Steps
49              
50             A step is simply a subroutine in a process class that runs some business code. It always returns either a new step to be run
51             or a final step marker.
52              
53             =head2 Storage
54              
55             A storage provides the backend to persist processes. Build a Schedule::LongSteps with a storage instance.
56              
57             See section PERSISTANCE for a list of available storage classes.
58              
59             =head2 Manager: Schedule::LongSteps
60              
61             A L<Schedule::LongSteps> provides an entry point to all things related to Schedule::LongSteps process management.
62             You should keep one instance of this in your application (well, one instance per process) as this is what you
63             are going to use to launch and manage processes.
64              
65             =head1 QUICK START AND SYNOPSIS
66              
67             First write a class to represent your long running set of steps
68              
69             package My::Application::MyLongProcess;
70              
71             use Moose;
72             extends qw/Schedule::LongSteps::Process/;
73              
74             # Some contextual things.
75             has 'thing' => ( is => 'ro', required => 1); # Some mandatory context provided by your application at each regular run.
76              
77             # The first step should be executed after the process is installed on the target.
78             sub build_first_step{
79             my ($self) = @_;
80             return $self->new_step({ what => 'do_stuff1', run_at => DateTime->now() });
81             }
82              
83             sub do_stuff1{
84             my ($self) = @_;
85              
86             # The starting state
87             my $state = $self->state();
88              
89             my $thing = $self->thing();
90              
91             .. Do some stuff and return the next step to execute ..
92              
93             return $self->new_step({ what => 'do_stuff2', run_at => DateTime->... , state => { some => 'jsonable', hash => 'ref' } });
94             }
95              
96             sub do_stuff2{
97             my ($self, $step) = @_;
98              
99             $self->wait_for_steps('do_stuff1', 'do_stuff2' );
100              
101             .. Do some stuff and terminate the process or goto do_stuff1 ..
102              
103             if( ... ){
104             return Schedule::LongSteps::Step->new({ what => 'do_stuff1', run_at => DateTime->... , state => { some jsonable structure } });
105             }
106             return $self->final_step({ state => { the => 'final', state => 1 } }) ;
107             }
108              
109             __PACKAGE__->meta->make_immutable();
110              
111             Then in your main application do this once per 'target':
112              
113             my $dbic_storage = Schedule::LongSteps::Storage::DBIxClass->new(...);
114             # Keep only ONE Instance of this in your application.
115             my $longsteps = Schedule::LongSteps->new({ storage => $dbic_storage });
116             ...
117              
118             $longsteps->instantiate_process('My::Application::MyLongProcess', { thing => 'whatever' }, { the => 'init', state => 1 });
119              
120             Then regularly (in a cron, or a recurring callback):
121              
122             my $dbic_storage = Schedule::LongSteps::Storage::DBIxClass->new(...);
123             # Keep only ONE instance of this in your application.
124             my $longsteps = Schedule::LongSteps->new({ storage => $dbic_storage });
125             ...
126              
127             $longsteps->run_due_processes({ thing => 'whatever' });
128              
129             =head1 EXAMPLE
130              
131             Look at L<https://github.com/jeteve/Schedule-LongSteps/blob/master/t/fullblown.t> for a full blown working
132             example.
133              
134             =head1 PERSISTANCE
135              
136             The persistence of processes is managed by a subclass of L<Schedule::LongSteps::Storage> that you should instantiate
137             and give to the constructor of L<Schedule::LongSteps>
138              
139             Example:
140              
141             my $dbic_storage = Schedule::LongSteps::Storage::DBIxClass->new(...);
142             my $longsteps = Schedule::LongSteps->new({ storage => $dbic_storage });
143             ...
144              
145             Out of the box, the following storage classes are available:
146              
147             =over
148              
149             =item L<Schedule::LongSteps::Storage::Memory>
150              
151             Persist processes in memory. Not very useful, except for testing. This is the storage of choice to unit test your processes.
152              
153             =item L<Schedule::LongSteps::Storage::AutoDBIx>
154              
155             Persist processes in a relational DB (a $dbh from L<DBI>). This is the easiest thing to use if you want to persist processes in a database, without having
156             to worry about creating a DBIx::Class model yourself. We recommend you give this storage its own dbh connection, segregated from the rest
157             of your application.
158              
159             =item L<Schedule::LongSteps::Storage::DBIxClass>
160              
161             Persist processes in an existing L<DBIx::Class> schema. Note that although this makes a reasonable attempt not to interfere
162             with your own transactions, we recommend that you build an instance of your schema with a dedicated and segregated L<DBI> connection
163             just for this LongSteps purpose.
164              
165             =item L<Schedule::LongSteps::Storage::DynamoDB>
166              
167             Persist processes in a DynamoDB table in AWS. Please consider this Alpha. Give it a go and report any issue!
168              
169             =back
170              
171             =head1 COOKBOOK
172              
173             =head2 WRITING A NEW PROCESS
174              
175             See 'QUICK START AND SYNOPSIS'
176              
177             =head2 INSTANTIATING A NEW PROCESS
178              
179             See 'QUICK START AND SYNOPSIS'
180              
181             =head2 RUNNING PROCESS STEPS
182              
183             See 'QUICK START AND SYNOPSIS'
184              
185             =head2 BEING NOTIFIED OF ANY OF YOUR PROCESS ERROR
186              
187             Use the property 'on_error' on the Schedule::LongSteps manager:
188              
189              
190             my $longsteps = Schedule::LongSteps->new({ storage => ..,
191             on_error => sub{
192             my ( $stored_process , $error ) = @_;
193             .. do stuff with: ..
194             $error, # The original error. Not trimmed, and can be an object raised by
195             # the process.
196             $stored_process->error(), # The stored error. A string that might be trimmed.
197             $stored_process->process_class(),
198             $stored_process->state(), etc...
199             }
200             });
201              
202             Note that an error in your error handler itself will result in the output of
203             a pure Perl warning and an emission of a 'critical' level L<Log::Any> log event.
204              
205             =head2 INJECTING PARAMETERS IN YOUR PROCESSES
206              
207             Of course each instance of your process will most probably need to
208             act on different pieces of application data. The one and only way to
209             give 'parameters' to your processes is to specify an initial state when
210             you instantiate a process:
211              
212             $longsteps->instantiate_process('MyProcess', { app => $app } , { work => 'on' , this => 'user_id' });
213              
214             =head2 INJECTING CONTEXT IN YOUR PROCESSES
215              
216             Let's say you hold an instance of your application object:
217              
218             my $app = ...;
219              
220             And you want to use it in your processes:
221              
222             package MyProcess;
223             ...
224             has 'app' => (is => 'ro', isa => 'My::App', required => 1);
225              
226             You can inject your $app instance in your processes at instantiation time:
227              
228             $longsteps->instantiate_process('MyProcess', { app => $app });
229              
230             And also when running the due steps:
231              
232             $longsteps->run_due_processes({ app => $app });
233              
234             The injected context should be stable over time. Do NOT use this to inject parameters. (See INJECTING PARAMETERS).
235              
236              
237             =head2 PROCESS WRITING
238              
239             This package should be expressive enough for you to implement business processes
240             as complex as those given as an example on this page: L<https://en.wikipedia.org/wiki/XPDL>
241              
242             Proper support for XPDL is not implemented yet, but here is a list of recipes to implement
243             the most common process patterns:
244              
245             =head3 MOVING TO A FINAL STATE
246              
247             Simply do in your step 'do_last_stuff' implementation:
248              
249             sub do_last_stuff{
250             my ($self) = @_;
251             # Return final_step with the final state.
252             return $self->final_step({ state => { the => 'final' , state => 1 } });
253             }
254              
255             =head3 DO SOMETHING ELSE IN X AMOUNT OF TIME
256              
257             sub do_stuff{
258             ...
259             # Do the things that have to be done NOW
260             ...
261             # And in two days, to this
262             return $self->new_step({ what => 'do_stuff_later', run_at => DateTime->now()->add( days => 2 ) , state => { some => 'new one' }});
263             }
264              
265              
266             =head3 DO SOMETHING CONDITIONALLY
267              
268             sub do_choose{
269             if( ... ){
270             return $self->new_step({ what => 'do_choice1', run_at => DateTime->now() });
271             }
272             return $self->new_step({ what => 'do_choice2', run_at => DateTime->now() });
273             }
274              
275             sub do_choice1{...}
276             sub do_choice2{...}
277              
278             =head3 FORKING AND WAITING FOR PROCESSES
279              
280              
281             sub do_fork{
282             ...
283             my $p1 = $self->longsteps->instantiate_process('AnotherProcessClass', \%build_args , \%initial_state );
284             my $p2 = $self->longsteps->instantiate_process('YetAnotherProcessClass', \%build_args2 , \%initial_state2 );
285             ...
286             return $self->new_step({ what => 'do_join', run_at => DateTime->now() , state => { processes => [ $p1->id(), $p2->id() ] } });
287             }
288              
289             sub do_join{
290             return $self->wait_processes( $self->state()->{processes}, sub{
291             my ( @terminated_processes ) = @_;
292             my $state1 = $terminated_processes[0]->state();
293             my $state2 = $terminated_processes[1]->state();
294             ...
295             # And as usual:
296             return $self->...
297             });
298             }
299              
300             =head1 ATTRIBUTES
301              
302             =over
303              
304             =item storage
305              
306             An instance of a subclass of L<Schedule::LongSteps::Storage>. See SYNOPSIS.
307              
308             =item on_error
309              
310             A callback called like $on_error->( $stored_process , $error ). See COOKBOOK for an example
311              
312             =item error_limit
313              
314             Maximum size of error message to log and store. Defaults to 2000 characters.
315              
316             =back
317              
318             =head1 METHODS
319              
320             =head2 uuid
321              
322             Returns a L<Data::UUID> from the storage.
323              
324             =head2 run_due_processes
325              
326             Runs all the due processes steps according to now(). All processes
327             are given the context to be built.
328              
329             Usage:
330              
331             # No context given:
332             $this->run_due_processes();
333              
334             # With 'thing' as context:
335             $this->run_due_processes({ thing => ... });
336              
337             Returns the number of processes run
338              
339             =head2 instantiate_process
340              
341             Instantiate a stored process from the given process class and returns a new process that will have an ID.
342              
343             Usage:
344              
345             $this->instantiate_process( 'MyProcessClass', { process_attribute1 => .. } , { initial => 'state' });
346              
347             =head2 find_process
348              
349             Shortcut to $self->storage->find_process( $pid );
350              
351             =head2 load_process
352              
353             Returns a loaded process for a given pid, or undef if there is no process
354             associated with the PID.
355              
356             An optional hash ref context can also be passed in
357             and will be used to load the process, a blank context is used if not provided.
358              
359             if( my $loaded_process = $self->load_process( $pid , $context ) ){
360             ...
361             }
362              
363             =head2 revive
364              
365             Revive a longstep process to a given step within a Longstep process.
366              
367             A context is required when the process being revived has required attributes,
368             and must also be passed whenever a step to revive to is given. If no step is given, the process
369             is revived on the step it failed on. When giving a step for a process that doesn't
370             require a context, pass an empty hashref '{}' as the context.
371              
372             If you need to modify the state before reviving the longstep process, it is
373             recommended to have a revive step ("revive_do_broken_step") which modifies
374             the state as needed and returns a next_step to continue the process.
375              
376             This method will confess on any issues.
377              
378             eval {
379             $self->revive( $pid, $context, $method_to_revive_to );
380             };
381              
382             =head1 SEE ALSO
383              
384             L<BPM::Engine> A business Process engine based on XPDL, in Alpha version since 2012 (at the time of writing)
385              
386             =head1 Copyright and Acknowledgement
387              
388             This code is released under the Perl5 Terms by Jerome Eteve (JETEVE), with the support of Broadbean Technologies Ltd.
389              
390             See L<perlartistic>
391              
392             =for HTML <a href="https://travis-ci.org/jeteve/Schedule-LongSteps"><img src="https://travis-ci.org/jeteve/Schedule-LongSteps.svg?branch=master"></a>
393              
394             =cut
395              
396 13     13   89214 use Class::Load;
  13         33  
  13         640  
397 13     13   6234 use DateTime;
  13         3342346  
  13         597  
398 13     13   5478 use Log::Any qw/$log/;
  13         75697  
  13         67  
399              
400 13     13   22861 use Schedule::LongSteps::Storage::Memory;
  13         5688  
  13         11581  
401              
402             has 'storage' => ( is => 'ro', isa => 'Schedule::LongSteps::Storage', lazy_build => 1);
403              
404             has 'on_error' => ( is => 'ro', isa => 'CodeRef', default => sub{ return sub{}; } );
405              
406             has 'error_limit' => ( is => 'ro', isa => 'Int' , default => 2000 );
407              
408             sub _build_storage{
409 12     12   32 my ($self) = @_;
410 12         87 $log->warn("No storage specified. Will use Memory storage");
411 12         657 return Schedule::LongSteps::Storage::Memory->new();
412             }
413              
414              
415             sub uuid{
416 0     0 1 0 my ($self) = @_;
417 0         0 return $self->storage()->uuid();
418             }
419              
420             sub run_due_processes{
421 37     37 1 6874 my ($self, $context) = @_;
422 37   100     197 $context ||= {};
423              
424 37         1221 my @stored_processes = $self->storage->prepare_due_processes();
425 37         86 my $process_count = 0;
426 37         77 foreach my $stored_process ( @stored_processes ){
427 32         1019 my $process_method = $stored_process->what();
428              
429 32         64 $process_count++;
430              
431 32         90 my $new_step_properties = eval{
432 32         967 $log->info( "Will do $process_method on process ID=".$stored_process->id() );
433 32         239 my $process = $self->_load_stored_process($stored_process,$context);
434 31         22145 $process->$process_method();
435             };
436 32 100       4116 if( my $original_err = $@ ){
437              
438             # Stringify the error, just in case its an object.
439 7         46 my $err = $original_err.'';
440 7 100       212 if( length( $err ) > $self->error_limit() ){
441 2         46 $log->warn("Error too long. Trimming to ".$self->error_limit());
442 2         110 $err = substr( $err , 0 , $self->error_limit() );
443             }
444 7         225 $log->error("Error running process ".$stored_process->process_class().':'.$stored_process->id().' :'.$err);
445 7         264 $self->storage()->update_process( $stored_process, {
446             status => 'terminated',
447             error => $err,
448             run_at => undef,
449             run_id => undef,
450             });
451              
452 7         20 eval{ $self->on_error()->( $stored_process , $original_err ); };
  7         176  
453 7 100       36 if( my $on_error_error = $@ ){
454 1         36 warn("Error handler triggered an error: $on_error_error");
455 1         12 $log->critical("Error handler triggered an error: $on_error_error");
456             }
457              
458 7         55 next;
459             }
460              
461             $self->storage()->update_process( $stored_process, {
462             status => 'paused',
463             run_at => undef,
464             run_id => undef,
465 25         683 %{$new_step_properties}
  25         202  
466             });
467             }
468 37         201 return $process_count;
469             }
470              
471             sub instantiate_process{
472 16     16 1 890 my ($self, $process_class, $build_args, $init_state ) = @_;
473              
474 16 100       64 defined( $build_args ) or ( $build_args = {} );
475 16 100       99 defined( $init_state ) or ( $init_state = {} );
476              
477 16         170 Class::Load::load_class($process_class);
478 16 50       849 unless( $process_class->isa('Schedule::LongSteps::Process') ){
479 0         0 confess("Class '$process_class' is not an instance of 'Schedule::LongSteps::Process'");
480             }
481 16         43 my $process = $process_class->new( { longsteps => $self, %{ $build_args } } );
  16         621  
482 16         14688 my $step_props = $process->build_first_step();
483              
484             my $stored_process = $self->storage->create_process({
485             process_class => $process_class,
486             status => 'pending',
487             state => $init_state,
488 16         733 %{$step_props}
  16         163  
489             });
490 16         527 return $stored_process;
491             }
492              
493             sub find_process{
494 18     18 1 45 my ($self, $pid) = @_;
495 18         449 return $self->storage()->find_process($pid);
496             }
497              
498             sub load_process {
499 7     7 1 22 my ( $self, $pid, $context ) = @_;
500 7   50     39 $context ||= {};
501              
502 7         27 my $stored_process = $self->find_process($pid);
503 7 100       24 return unless $stored_process;
504 6         21 return $self->_load_stored_process( $stored_process, $context );
505             }
506              
507             sub revive {
508 7     7 1 19 my ( $self, $process_id, $context, $revive_to ) = @_;
509 7   100     33 $context ||= {};
510              
511 7         25 my $stored_process = $self->find_process($process_id);
512 7 50       19 confess "There is no $process_id to revive" unless $stored_process;
513              
514 7 50       203 confess("$process_id does not have a status of 'terminated'") if ( $stored_process->status() ne "terminated" );
515              
516             # load the process and check if process have the method to revive_to
517             # if revive $revive_to was not passed, used the function we failed on.
518             # and check that also, just in case we attempt to revive on a method
519             # that was previously removed.
520 7         22 my $loaded_process = $self->_load_stored_process($stored_process, $context);
521              
522 6 100       4215 $revive_to = $stored_process->what() unless $revive_to;
523              
524             # check to see if we able to revive
525 6 100       349 confess "Unable revive $process_id to $revive_to" unless $loaded_process->can($revive_to);
526              
527             # Set the process up to be revived.
528 4         28 my $now = DateTime->now();
529 4         1376 $stored_process->what($revive_to);
530 4         117 $stored_process->error(undef);
531 4         111 $stored_process->status("paused");
532 4         114 $stored_process->run_at( $now );
533 4         95 $self->storage()->update_process( $stored_process, {
534             what => $revive_to,
535             error => undef,
536             status => "paused",
537             run_at => $now
538             });
539              
540 4         110 return 1;
541             }
542              
543             # load_class may croak when trying to load a module that is not in @INC
544             # so to be safe make sure you put this in an eval, and handle the errors
545             # appropriately
546             sub _load_stored_process {
547 45     45   96 my ( $self, $stored_process, $context ) = @_;
548 45   50     113 $context ||= {};
549              
550 45         1447 Class::Load::load_class( $stored_process->process_class() );
551             return $stored_process->process_class()->new(
552             {
553             longsteps => $self,
554             stored_process => $stored_process,
555 43         2700 %{$context}
  43         1216  
556             }
557             );
558             }
559              
560              
561              
562             __PACKAGE__->meta->make_immutable();