File Coverage

lib/Sub/Slice.pm
Criterion Covered Total %
statement 194 205 94.6
branch 80 94 85.1
condition 27 35 77.1
subroutine 36 37 97.3
pod 20 22 90.9
total 357 393 90.8


line stmt bran cond sub pod time code
1             #############################################################################
2             ## Name: Sub::Slice
3             ## Purpose: Split long-running tasks into manageable chunks
4             ## Author: Simon Flack
5             ## Modified by: $Author: colinr $ on $Date: 2005/11/23 14:31:51 $
6             ## Created: 23/01/2003
7             ## RCS-ID: $Id: Slice.pm,v 1.48 2005/11/23 14:31:51 colinr Exp $
8             #############################################################################
9             package Sub::Slice;
10              
11 1     1   1520 use strict;
  1         3  
  1         42  
12 1     1   5 use vars qw/ $VERSION /;
  1         2  
  1         42  
13 1     1   4 use Carp;
  1         2  
  1         2012  
14              
15             $VERSION = sprintf"%d.%03d", q$Revision: 1.48 $ =~ /: (\d+)\.(\d+)/;
16              
17             sub new {
18 26     26 1 4306 my $class = shift;
19 26 100       263 confess "args: expecting flattened hash (even-numbered list)" if @_%2;
20 25         98 my %args = @_;
21              
22             #Load backend
23 25   100     167 my $backend_name = $args{backend} || 'Filesystem';
24 25 100       208 croak("Invalid Sub::Slice::Backend - $backend_name") if($backend_name =~ m|[^\w:]|);
25 24 100       84 $backend_name = "Sub::Slice::Backend::$backend_name" unless($backend_name =~ /::/); #If no namespace, assume in the Sub::Slice::Backend namespace
26 24         1275 eval "require $backend_name";
27 24 100       103 die("Unable to load $backend_name - $@") if($@);
28 23         132 my $backend = $backend_name->new($args{storage_options});
29              
30             #Create job
31 23         30 my $job;
32 23 100       49 if ($args{token}) {
33 18         23 my $token = $args{token};
34              
35             # Some serializers (XML::Simple?) refuse to serialize
36             # a blessed object. We can now cope with an unblessed
37             # token (though if it is blessed, we leave it as it is)
38 18 100 100     404 confess "illegal scalar token" unless(ref $token && UNIVERSAL::isa($token, 'HASH'));
39 16         68 $token = Sub::Slice::Token->rebless($token);
40 16         50 $job = $backend->load_job( $token->{id} );
41 15         1421 TRACE($job->{token}->{pin}, $token->{pin});
42 15 100       66 die("Signature from client doesn't match the value on the server") unless($job->{token}->{pin} == $token->{pin});
43 14         37 $token->clean(); # remove cruft from last run
44 14         18 $job->{'token'} = $token;
45 14 50       55 $job->{'iterations'} = $token->{iterations} if(defined $token->{iterations});
46             } else {
47 5         20 my $id = $backend->new_id();
48 5         20 $job = _create_job($id, \%args);
49 5         11 bless $job, $class;
50             }
51              
52             #Attach backend only for transient lifetime
53 19         52 $job->{'backend'} = $backend;
54 19         26 $job->{'this_iteration'} = 0;
55 19         61 return $job;
56             }
57              
58             sub DESTROY {
59 20     20   1753 my $self = shift;
60 20         71 TRACE("Sub::Slice destructor for $self");
61              
62             #Things not to persist
63 20         40 delete $self->{'return_value'};
64 20         39 my $backend = delete $self->{'backend'};
65              
66 20         23 my $new_eval_error;
67             {
68             # If DESTROY is being called as a result of something else dying, protect $@
69 20         54 local $@;
  20         21  
70              
71 20         39 my $job_id = $self->id;
72 20 100 66     44 if($self->is_done || $self->abort) {
73 2         8 TRACE("finished job: $job_id - deleting...");
74 2         8 $backend->delete_job($job_id);
75             } else {
76 18         23 eval { $backend->save_job($self) };
  18         56  
77 18 100       3554 if ($new_eval_error = $@) {
78 1         4 TRACE($new_eval_error);
79 1         2 DUMP($self)
80             }
81 18         34 TRACE('DONE');
82             }
83             }
84 20 100       240 $@ = $new_eval_error if $new_eval_error;
85             }
86              
87             sub token {
88 193     193 1 467 my $self = shift;
89 193 100       370 croak("cannot set token") if @_;
90 192         639 return $self->{'token'};
91             }
92              
93             sub id {
94 73     73 1 87 my $self = shift;
95 73 100       331 croak("cannot set id") if @_;
96 72         110 return $self->token->{id};
97             }
98              
99             sub set_estimate {
100 5     5 1 19 my $self = shift;
101 5 50       22 defined $_[0] and $self->token()->{estimate} = shift;
102             }
103              
104             sub estimate {
105 2     2 1 222 my $self = shift;
106 2 100       82 croak("use set_estimate() to change estimate") if @_;
107 1         3 return $self->token()->{estimate};
108             }
109              
110             sub count {
111 2     2 1 219 my $self = shift;
112 2 100       85 croak("cannot set count") if @_;
113 1         4 return $self->token()->{count};
114             }
115              
116             sub is_done {
117 51     51 1 266 my $self = shift;
118 51 100       152 croak("use done() to mark job as completed") if @_;
119 50         72 return $self->token()->{done};
120             }
121              
122             sub done {
123 3     3 1 229 my $self = shift;
124 3 100       86 croak("done() doesn't take arguments") if @_;
125 2         5 $self->token()->{done} = 1;
126             }
127              
128             sub abort {
129 18     18 1 20 my $self = shift;
130 18 50       38 if (defined $_[0]) {
131 0         0 $self->token()->{error} = shift;
132 0         0 $self->token()->{abort} = 1;
133             }
134 18         33 return $self->token()->{'abort'};
135             }
136              
137             sub status {
138 6     6 1 28 my $self = shift;
139 6 50       20 defined $_[0] and $self->token()->{status} = shift;
140 6         14 return $self->token()->{status};
141             }
142              
143             sub store {
144 29     29 1 386 my $self = shift;
145 29         59 while (@_) {
146 29         34 my ($key, $value) = (shift, shift);
147 29 100       117 croak("Usage: store( KEY, [VALUE] )") unless ($key);
148 28 100       122 croak("Error: invalid key: $key") if ref $key;
149 27 100 66     120 if(ref $value || !$self->{blob_threshold} || length($value) <= $self->{blob_threshold}) {
      100        
150 26         160 $self->{'backend'}->store($self, $key, $value);
151             } else {
152 1         4 $self->store_blob($key, $value);
153             }
154             }
155             }
156              
157             sub fetch {
158 44     44 1 395 my $self = shift;
159 44         44 my $key = shift;
160 44 100       157 croak("Usage: fetch( KEY )") unless ($key);
161 43 100       131 croak("Error: invalid key") if ref $key;
162 42         112 my $d = $self->{'backend'}->fetch($self, $key);
163 42 100       121 return defined $d ? $d : $self->fetch_blob($key);
164             }
165              
166             sub store_blob {
167 3     3 1 11 my $self = shift;
168 3         5 my ($key, $data) = @_;
169 3         13 $self->{'backend'}->store_blob($self, $key, $data);
170             }
171              
172             sub fetch_blob {
173 5     5 1 7 my $self = shift;
174 5         7 my ($key) = @_;
175 5         18 return $self->{'backend'}->fetch_blob($self, $key);
176             }
177              
178             sub stage {
179 52     52 1 312 my $self = shift;
180 52 100       160 croak("use next_stage() to set stage") if @_;
181 51         145 return $self->{'stage'};
182             }
183              
184             sub return_value {
185 20     20 1 50 my $self = shift;
186 20 100       41 if (@_) {
187 16         36 $self->{return_value} = [@_];
188 16         29 return @_;
189             }
190 4         7 my $rv = $self->{return_value};
191 4 50       8 if ($rv) {
192 4 50       24 return (@$rv)[0..$#$rv] if $#$rv >= 0;
193             } else {
194             return ()
195 0         0 }
196             }
197              
198             sub next_stage {
199 8     8 1 356 my $self = shift;
200 8         12 my $stage = shift;
201 8 100 100     188 croak("Error: invalid stage") if (ref $stage || !defined $stage);
202 6         8 $self->{'stage'} = $stage;
203 6         17 $self->{'token'}->{'stage'} = $stage;
204             }
205              
206             sub at_start ($&) {
207 7     7 1 2877 my $self = shift;
208 7         12 my ($code) = @_;
209 7 100 66     33 return if $self->{'initialised'} || $self->stage();
210 2         5 TRACE(sprintf 'Running at_start for job %s', $self->id);
211 2         5 $self->{'initialised'}++;
212              
213 2         42 eval {$self->return_value($code->($self))};
  2         8  
214 2 50       7 if (my $error = $@) {
215 0         0 $self->abort($error);
216 0         0 die $error;
217             }
218              
219 2         4 return $self->return_value;
220             }
221              
222             sub at_stage ($$&) {
223 28     28 1 194 my $self = shift;
224 28         31 my ($this_stage, $code) = @_;
225              
226 28 50       117 croak("undefined stage") unless defined $this_stage;
227              
228 28 50       48 return unless $self->{'initialised'};
229 28 100       46 if (my $stage = $self->stage()) {
230 26 100 66     115 return unless defined $this_stage && $this_stage eq $stage;
231             } else {
232 2         6 $self->next_stage($this_stage);
233             }
234              
235 11         24 TRACE(sprintf'Running stage:%s for job %s', $this_stage, $self->id);
236              
237 11         12 while (1) {
238 23 100 100     39 last if ( $self->is_done() || $this_stage ne $self->stage() );
239 17         43 my $iterate = $self->{'iterations'};
240 17 100 100     50 last if $iterate && $self->{'this_iteration'} >= $iterate;
241 12         14 $self->{'this_iteration'}++;
242 12         21 $self->token()->{count}++;
243              
244 12         14 eval {$self->return_value($code->($self))}; #Trap any exceptions
  12         28  
245 12 50       29 if (my $error = $@) {
246 0         0 $self->abort($error); #Record error message
247 0         0 die $error; #Re-throw exception
248             }
249             }
250              
251 11         25 return 1;
252             }
253              
254             sub at_end ($&) {
255 7     7 1 50 my $self = shift;
256 7         8 my ($code) = @_;
257 7 50       17 return unless $self->{'initialised'};
258 7 100       13 return unless $self->is_done;
259 2         5 TRACE(sprintf 'Running at_end for job %s', $self->id);
260              
261 2         3 eval {$self->return_value($code->($self))};
  2         6  
262 2 50       6 if (my $error = $@) {
263 0         0 $self->abort($error);
264 0         0 die $error;
265             }
266              
267 2         4 return $self->return_value;
268             }
269              
270              
271             #############################################################################
272             # Private Functions
273              
274             sub _create_job {
275 5     5   8 my ($id, $options) = @_;
276              
277 5         8 my $iterations = $options->{iterations};
278 5 50       20 $iterations = 1 unless defined $iterations;
279              
280 5         6 my ($fh, $token, %job);
281 5         15 DUMP('Sub::Slice storage options', $options->{storage_options});
282 5         16 $job{'storage_options'} = $options->{storage_options};
283 5   33     16 $job{'storage_options'}->{path} ||= File::Spec::Functions::tmpdir();
284 5         7 $job{'iterations'} = $iterations;
285 5         36 $job{'token'} = Sub::Slice::Token->new($id, $options->{pin_length});
286 5         13 $job{'blob_threshold'} = $options->{auto_blob_threshold};
287              
288 5         10 return \%job;
289             }
290              
291             # Log::Trace stubs
292 71     71 0 85 sub TRACE {}
293 6     6 0 8 sub DUMP {}
294              
295             #############################################################################
296             # Sub::Slice::Token
297             #############################################################################
298             package Sub::Slice::Token;
299              
300 1     1   7 use constant DEFAULT_PIN_LENGTH => 1e9;
  1         2  
  1         67  
301 1     1   18 use vars '$AUTOLOAD';
  1         2  
  1         41  
302 1     1   5 use Carp;
  1         2  
  1         52  
303 1     1   1128 use POSIX qw(log10 ceil);
  1         6935  
  1         6  
304              
305             sub rebless {
306 16     16   20 my ($class, $token) = @_;
307 16   33     46 $class = ref $class || $class;
308 16 100       45 bless $token, $class if (ref $token eq "HASH");
309 16         24 return $token;
310             }
311              
312             sub new {
313 5     5   10 my $class = shift;
314 5         13 my ($id, $pin_length) = @_;
315 5 100       10 $pin_length = DEFAULT_PIN_LENGTH unless($pin_length);
316              
317 5         24 my $self = bless {
318             id => $id,
319             estimate => 0,
320             count => 0,
321             done => 0,
322             abort => "",
323             error => "",
324             stage => "",
325             pin => $class->random_pin($pin_length),
326             }, $class;
327 5         13 return $self;
328             }
329              
330             sub random_pin {
331 5     5   8 my ($self, $pin_length) = @_;
332 5         51 my $figs = ceil(log10($pin_length));
333 5         117 return sprintf("%0${figs}d", int(rand($pin_length))); #Fixed length random number padded with zeros
334             }
335              
336             sub clean {
337 14     14   17 my $self = shift;
338 14         64 $self->{$_} = "" for qw( abort error status );
339 14         20 return $self;
340             }
341              
342 0     0   0 sub DESTROY {};
343              
344             sub AUTOLOAD {
345 20     20   169 my $self = shift;
346 20         90 (my $name = $AUTOLOAD) =~ s/.*://;
347 20 50       47 if (!exists $self->{$name}) {
348 0         0 croak("undefined method: $name");
349             } else {
350 20         91 return $self->{$name}
351             }
352             }
353              
354             1;
355              
356              
357             =head1 NAME
358              
359             Sub::Slice - split long-running tasks into manageable chunks
360              
361             =head1 SYNOPSIS
362              
363             # Client
364             # Assume methods in the Server:: package are magically remoted
365             my $token = Server::create_token();
366             for(1 .. MAX_ITERATIONS) {
367             Server::do_work($token);
368             last if $token->{done};
369             }
370              
371             # Server
372             # Imagine this is on a remote machine
373             package Server;
374             use Sub::Slice;
375              
376             sub create_token {
377             # create a new job:
378             my $job = new Sub::Slice(
379             backend => 'Filesystem',
380             storage_options => {
381             path => '/var/tmp/myproject/',
382             }
383             );
384             return $job->token;
385             }
386              
387             sub do_work {
388             # loading an existing job:
389             my $job = new Sub::Slice(
390             token => $token
391             backend => 'Filesystem',
392             storage_options => {
393             path => '/var/tmp/myproject/',
394             }
395             );
396              
397             at_start $job
398             sub {
399             $job->store('foo', '1');
400             $job->store('bar', { abc => 'def' });
401             # store data, initialise
402             $job->set_estimate(10); # estimate number of steps
403             return ( $job->fetch('foo') );
404             };
405              
406             my $foo = $job->fetch('foo');
407              
408             at_stage $job "stage_one",
409             sub {
410             my $bar = $job->fetch('bar');
411             # do stuff
412             $job->next_stage('stage_two') if $some_condition;
413             };
414              
415             at_stage $job "stage_two",
416             sub {
417             # ...do more stuff...
418             # mark job as ready to be deleted
419             $job->done() if $job->count() == $job->estimate();
420             };
421              
422             return $job->return_value(); #Pass back any return value from coderefs
423             }
424              
425             =head1 DESCRIPTION
426              
427             Sub::Slice breaks up a long process into smaller chunks that can be executed
428             one at a time over a stateless protocol such as HTTP/SOAP so that progress may
429             be reported. This means that the client can display progress or cancel the
430             operation part-way through.
431              
432             It works by the client requesting a token from the server, and passing the
433             token back to the server on each iteration. The token passed to the client
434             contains status information which the client can use to determine if the job
435             has completed/failed and to display status/error messages.
436              
437             Within the routine called on each iteration, the server defines a set of
438             coderefs, one of which will be called for a given iteration. In addition the
439             server may define coderefs to be called at the start and end of the job. The
440             server may provide the client with an estimate of the number of iterations the
441             job is likely to take.
442              
443             It is possible to balance performance/usability by modifying the number of
444             iterations that will be executed before returning progress to the client.
445              
446             =head1 METHODS
447              
448             =over 4
449              
450             =item new( %options )
451              
452             Create a new job object. Valid options are:
453              
454             =over 4
455              
456             =item token
457              
458             A token for an existing job (optional)
459              
460             =item iterations
461              
462             The number of chunks to execute before saving the state and returning. Defaults to '1'.
463             This value may be overridden later on by setting the value in the token.
464             Set to 0 for unlimited.
465              
466             =item backend
467              
468             The storage backend.
469             This should either be a fully qualified package name or if no namespace is included it's assumed to be in the
470             Sub::Slice::Backend namespace (e.g. Database would be interpreted as Sub::Slice::Backend::Database).
471             Defaults to Sub::Slice::Backend::Filesystem.
472              
473             =item pin_length
474              
475             The size of the random PIN used to sign the token. Default is 1e9.
476              
477             =item random_pin ($l)
478              
479             Generates a random PIN of length $l. We do this using rand().
480             You might want to override this method if you require
481             cryptographic-quality randomness for your environment.
482              
483             =item auto_blob_threshold
484              
485             If this is set, any strings longer than this number of bytes will be stored as BLOBs automatically
486             (possibly taking advantage of a more efficient BLOB storage mechanism offered by the backend).
487             Note that this does not apply when you store references, only to strings of characters/bytes.
488              
489             =item storage_options
490              
491             A hash of configuration options for the backend storage.
492             See the POD of the backend module (default is L<Sub::Slice::Backend::Filesystem>).
493              
494             =back
495              
496             Returns an existing job object with session data for C<$token>
497              
498             =back
499              
500             =head2 METHODS DEFINING STAGES OF ITERATION
501              
502             =over 4
503              
504             =item at_start $job \&coderef
505              
506             Code to initialise the job. This isn't counted as an iteration and will only
507             run once per job.
508              
509             =item at_stage $job $stage_name, \&coderef
510              
511             Executes C<\&coderef> up to C<iterations> times, B<if> C<$stage_name> is the
512             current stage B<and> if the number of executions in the current session is not
513             greater than C<iterations>. It is currently required that you have at least one
514             C<at_stage> defined.
515              
516             If the current stage hasn't been set with C<next_stage>, it will implicitly be set to the first
517             C<at_stage> block that is seen.
518              
519             =item at_end $job \&coderef
520              
521             Code to run after the last iteration (unless the job is aborted before then). This isn't counted as an iteration and will only
522             run once per job. It's typically used as a "commit" stage.
523              
524             =back
525              
526             If a job dies in one of these blocks, Sub::Slice sets $job->abort($@) and rethrows the exception.
527             Note that C<at_end> may not be run if a job is aborted during one of the earlier stages.
528             See L<Sub::Slice::Manual> for an example of defensive coding to prevent resources allocated in C<at_start>
529             leaking if the job is aborted part-way through.
530              
531             =head2 ACCESSOR METHODS
532              
533             =over 4
534              
535             =item $job->token()
536              
537             Returns the token object for this job. The token object will be updated
538             automatically as stages of the sub execute. The token has the following
539             properties which the client can make use of:
540              
541             =over 4
542              
543             =item done
544              
545             Read/write boolean value. Is the job done?
546             Setting this to 1 on the client will cause iterations on the server to cease,
547             and any C<at_end> cleanup to be done.
548              
549             =item abort
550              
551             Read-only boolean value. Was the job aborted on the server?
552              
553             =item error
554              
555             Read-only. Error message if the job was aborted.
556              
557             =item count
558              
559             Read-only. Number of iterations performed so far.
560              
561             =item estimate
562              
563             Read-only. An estimate of the total number of iterations that will be performed. This may
564             not be totally accurate, depending if new work is "discovered" as the
565             iterations proceed.
566              
567             =item status
568              
569             Read-only. Status message.
570              
571             =item stage
572              
573             Read-only. The next stage that the job will run.
574              
575             =item iterations
576              
577             A write-only property the client can use to
578             control the number of iterations run on the server in the next call. This overrides the
579             default number of iterations set in the Sub::Slice constructor.
580              
581             =back
582              
583             =item $job->id()
584              
585             Returns the ID of the job (issued by the C<new_id> function in the backend).
586             This is mainly of interest if you are writing a backend and need to get the ID from a job.
587              
588             =item $job->count()
589              
590             Returns the total number of iterations that have been executed.
591              
592             =item $job->estimate()
593              
594             Returns an estimate of how many iterations are required for the job.
595              
596             =item $job->is_done()
597              
598             Returns a boolean value. Is the job done?
599              
600             =item $job->stage()
601              
602             Returns the name of the executing code block, as set by C<next_stage()>.
603              
604             =item $job->fetch( $key )
605              
606             Returns the user data stored under C<$key>.
607             If no data is found against C<$key>, it automatically tries C<fetch_blob> to see if data was stored as a blob.
608              
609             =item $job->fetch_blob($key)
610              
611             Returns a lump of data stored using C<store_blob> - see the MUTATOR METHODS.
612              
613             =item $job->return_value()
614              
615             C<return_value()> returns the return value of the stage. This C<return_value()>
616             method will help you avoid mistakes like this:
617              
618             sub do_work {
619             my $job = new Sub::Slice(token => shift());
620             at_stage $job 'mystage', sub {
621             # do stuff
622             return 'abc' #only returns 1 level up
623             };
624             #nowt returned from do_work
625             }
626              
627             The caller of do_work() will not receive the return value inside the 'mystage' sub {}
628             This might be better written as :
629              
630             sub do_work {
631             my $job = new Sub::Slice(token => shift());
632             at_stage $job 'mystage', sub {
633             # do stuff
634             return 'abc' #only returns 1 level up
635             };
636             return $job->return_value(); # 'abc'
637             }
638              
639             =back
640              
641             =head2 MUTATOR METHODS THAT SET VALUES IN THE TOKEN
642              
643             =over 4
644              
645             =item $job->set_estimate( $int )
646              
647             Populates the C<estimate> field in the token with an estimate of how many
648             iterations are required for this job to complete.
649              
650             =item $job->done()
651              
652             Mark the job as completed successfully. This sets the done flag in the token.
653             Serialised object data will be removed when the object is destroyed.
654              
655             =item $job->abort( $reason )
656              
657             Mark the job as aborted. This sets the abort flag in the token.
658             The optional $reason message will be stored in the token's error string.
659             Serialised object data will be removed when the object is destroyed.
660              
661             =item $job->status( $status_text )
662              
663             Set the status field in the token. This might be useful to inform users about
664             what is about to happen in the next iteration of the job.
665              
666             =back
667              
668             =head2 OTHER MUTATOR METHODS
669              
670             =over 4
671              
672             =item $job->next_stage( $stage_name )
673              
674             Tell the C<$job> object that the next time the routine is called, it should
675             execute the block named C<$stage_name>. Unless C<next_stage> is set, the first
676             at_stage block will be executed.
677              
678             =item $job->store( $key => $value, $key2 => $value2, ... )
679              
680             Store some user data in the object. C<$value> can be a scalar containing any
681             perl data type (such as hash/array references) - it will be automatically
682             serialised.
683              
684             Note that some objects may not be suited to serialisation. For example if
685             an object is blessed into a package that is C<require>d at runtime, when it is
686             deserialised, the required package may not actually be loaded.
687              
688             There may also be issues serialising some objects like DBI database handles and
689             XML::Parser objects, although this is potentially backend-specific (Filesystem
690             uses Storable, and some objects may provide serialisation hooks).
691              
692             C<$value> is optional (if not specified, C<$value> will be set to undef).
693              
694             =item $job->store_blob($key => $blob)
695              
696             Allows large lumps of data to be stored efficiently by the back end.
697              
698             =back
699              
700             =head1 VERSION
701              
702             $Revision: 1.48 $ on $Date: 2005/11/23 14:31:51 $ by $Author: colinr $
703              
704             =head1 AUTHOR
705              
706             Simon Flack and John Alden with additions by Tim Sweetman
707              
708             =head1 COPYRIGHT
709              
710             (c) BBC 2005. This program is free software; you can redistribute it and/or modify it under the GNU GPL.
711              
712             See the file COPYING in this distribution, or http://www.gnu.org/licenses/gpl.txt
713              
714             =cut