File Coverage

blib/lib/Verby/Dispatcher.pm
Criterion Covered Total %
statement 1 3 33.3
branch n/a
condition n/a
subroutine 1 1 100.0
pod n/a
total 2 4 50.0


line stmt bran cond sub pod time code
1             #!/usr/bin/perl
2              
3             package Verby::Dispatcher;
4 1     1   9711 use Moose;
  0            
  0            
5              
6             our $VERSION = "0.05";
7              
8             use Set::Object;
9             use Verby::Context;
10             use Carp qw/croak/;
11             use Tie::RefHash;
12              
13             use POE;
14              
15             require overload;
16              
17             has step_set => (    # all steps known to the dispatcher, dependencies included
18                 is      => 'ro',
19                 isa     => 'Set::Object',
20                 default => sub { return Set::Object->new() },
21             );
22             # Steps whose child POE session has already gone away.
23             has satisfied_set => (
24                 is      => 'ro',
25                 isa     => 'Set::Object',
26                 default => sub { return Set::Object->new() },
27             );
28             # step => its own Verby::Context; tied to Tie::RefHash so step objects work as keys.
29             has cxt_of_step => (
30                 is      => 'ro',
31                 isa     => 'HashRef',
32                 default => sub {
33                     tie my %context_by_step, 'Tie::RefHash';
34                     return \%context_by_step;
35                 },
36             );
37             # step => array ref of the contexts that may be derived from for it (tied the same way).
38             has derivable_cxts => (
39                 is      => 'ro',
40                 isa     => 'HashRef',
41                 default => sub {
42                     tie my %derivable_by_step, 'Tie::RefHash';
43                     return \%derivable_by_step;
44                 },
45             );
46             # Source of configuration; any object is accepted, a fresh Verby::Config::Data by default.
47             has config_hub => (
48                 is      => 'rw',
49                 isa     => 'Object',
50                 default => sub {
51                     require Verby::Config::Data;
52                     return Verby::Config::Data->new();
53                 },
54             );
55             # Built on first use by deriving a Verby::Context from whatever config_hub holds then.
56             has global_context => (
57                 is      => 'ro',
58                 isa     => 'Object',
59                 lazy    => 1,
60                 default => sub { my ($self) = @_; return $self->config_hub->derive('Verby::Context') },
61             );
62             # Optional; has_resource_pool reports whether a pool has been set.
63             has resource_pool => (
64                 is        => 'ro',
65                 isa       => 'POE::Component::ResourcePool',
66                 predicate => 'has_resource_pool',
67             );
68              
69             sub add_step {    # add_step(@steps): register steps together with everything they depend on
70                 my ( $self, @steps ) = @_;
71                 # Steps already present in step_set are skipped, so repeated calls are harmless.
72                 my $registered = $self->step_set;
73                 # NOTE(review): a step is inserted only after its dependencies, so a dependency cycle never terminates.
74                 foreach my $step (@steps) {
75                     next if $registered->includes($step);
76                     # Depth first: the whole dependency chain goes in before $step itself.
77                     $self->add_step( $step->depends );
78              
79                     $self->global_context->logger->debug("adding step $step");
80                     $registered->insert($step);
81                 }
82             }
83              
84             sub add_steps {    # plural alias: hands all of its arguments to add_step
85                 my ( $self, @steps ) = @_;
86                 return $self->add_step(@steps);
87             }
88              
89             sub get_cxt {
90                 my ( $self, $step ) = @_;
91                 my $cxt_for = $self->cxt_of_step;    # hash keyed by the step object itself
92                 # Build the step's context on first request, then keep handing back the same one.
93                 return $cxt_for->{$step} ||= Verby::Context->new( $self->get_derivable_cxts($step) );
94             }
95              
96             sub get_derivable_cxts {
97                 my ( $self, $step ) = @_;
98                 my $cache = $self->derivable_cxts;    # computed once per step, reused afterwards
99                 if ( !$cache->{$step} ) {
100                     my $opens_cxt = $step->provides_cxt;    # true: wrap the parents in one new context
101                     my @parents   = $self->get_parent_cxts($step);
102                     $cache->{$step} = $opens_cxt ? [ Verby::Context->new(@parents) ] : [@parents];
103                 }
104                 return @{ $cache->{$step} };    # the list of contexts, or their count in scalar context
105             }
106              
107             sub get_parent_cxts {
108                 my ( $self, $step ) = @_;
109                 # Each dependency contributes its list of derivable contexts; map flattens them.
110                 my @inherited = map { $self->get_derivable_cxts($_) } $step->depends;
111              
112                 # With nothing to inherit from dependencies, fall back to the global context.
113                 return $self->global_context unless @inherited;
114              
115                 return @inherited;
116             }
117              
118             sub create_poe_sessions {
119             my ( $self ) = @_;
120             # Creates the parent POE session; its _start handler spawns one child session per unsatisfied step.
121             my $g_cxt = $self->global_context;
122             $g_cxt->logger->debug("Creating parent POE session");
123              
124             POE::Session->create(
125             inline_states => {
126             _start => sub {
127             my ( $kernel, $heap ) = @_[KERNEL, HEAP];
128             my $self = $heap->{verby_dispatcher};
129              
130             # FIXME
131             # handle sigint
132              
133             my $g_cxt = $self->global_context;
134              
135             my $all_steps = $self->step_set;
136             my $satisfied = $self->satisfied_set;
137             # Steps already in the satisfied set get no session of their own.
138             my $pending = $all_steps->difference( $satisfied );
139              
140             foreach my $step ( $pending->members ) {
141             $g_cxt->logger->debug("Creating POE session for step $step");
142              
143             POE::Session->create(
144             inline_states => {
145             _start => sub {
146             my ( $kernel, $session) = @_[KERNEL, SESSION];
147             # Listen for the parent's "a step finished" signal; the extra reference count is released in try_executing_step.
148             $kernel->sig("VERBY_STEP_FINISHED" => "step_finished");
149             $kernel->refcount_increment( $session->ID, "unresolved_dependencies" );
150              
151             $kernel->yield("try_executing_step");
152             },
153             step_finished => sub {
154             my ( $kernel, $heap, $done ) = @_[KERNEL, HEAP, ARG1];
155             # $done (ARG1) is the step the parent passed along with the signal.
156             my $deps = $heap->{dependencies};
157              
158             if ( $deps->includes($done) ) {
159             $deps->remove( $done );
160             $kernel->yield("try_executing_step") unless $deps->size;
161             }
162             },
163             try_executing_step => sub {
164             my ( $kernel, $session, $heap ) = @_[KERNEL, SESSION, HEAP];
165              
166             return if $heap->{dependencies}->size; # don't run if we're waiting
167             return if $heap->{ran}++; # don't run twice
168              
169             $heap->{g_cxt}->logger->debug("All dependencies of '$step' have finished, starting");
170              
171             $kernel->sig("VERBY_STEP_FINISHED"); # we're no longer waiting for other steps to finish
172             $kernel->refcount_decrement( $session->ID, "unresolved_dependencies" );
173             # NOTE(review): presumably the pool fires "execute_step" once the request is granted - confirm. Without a pool or resources it is called directly.
174             if ( my $pool = $heap->{resource_pool} and my @req = $heap->{step}->resources ) {
175             $heap->{resource_request} = $pool->request(
176             params => { @req },
177             event => "execute_step",
178             );
179             } else {
180             $kernel->call( $session, "execute_step" );
181             }
182             },
183             execute_step => sub {
184             my ( $kernel, $session, $heap ) = @_[KERNEL, SESSION, HEAP];
185              
186             # this may create child sessions. If it doesn't this session will go away
187             $heap->{verby_dispatcher}->start_step( $heap->{step}, \@_ );
188             },
189             _stop => sub {
190             my ( $kernel, $heap ) = @_[KERNEL, HEAP];
191             my $step = $heap->{step};
192             # Give back a resource request if try_executing_step made one.
193             if ( my $request = delete $heap->{resource_request} ) {
194             $request->dismiss;
195             }
196              
197             $heap->{g_cxt}->logger->info("step $step has finished.");
198             # post_hooks starts out as an empty list in the heap set up below.
199             $_->() for @{ $heap->{post_hooks} };
200             # Returned so that the parent's _child handler can read the step from ARG2.
201             return $step;
202             },
203             DIE => sub { $_[HEAP]{g_cxt}->logger->warn("cought exception: @_") },
204             _child => sub { $_[HEAP]{g_cxt}->logger->debug("Step $_[HEAP]{step} _child event: $_[ARG0]") },
205             },
206             heap => {
207             %{ $heap }, # start from a copy of the parent's heap entries (dispatcher, g_cxt, satisfied, optional resource_pool)
208             step => $step,
209             dependencies => Set::Object->new( $step->depends )->difference($satisfied), # only the dependencies still outstanding
210             ran => 0,
211             post_hooks => [],
212             },
213             );
214             }
215             },
216             _child => sub {
217             my ( $kernel, $session, $heap, $type, $step ) = @_[KERNEL, SESSION, HEAP, ARG0, ARG2];
218             # "lose" means a child session ended: record its step as satisfied and signal the finished step.
219             if ( $type eq "lose" ) {
220             $heap->{satisfied}->insert($step);
221             $kernel->signal( $session, "VERBY_STEP_FINISHED", $step );
222             }
223             },
224             DIE => sub { $_[HEAP]{g_cxt}->logger->warn("cought exception: @_") },
225             _stop => sub { $_[HEAP]{g_cxt}->logger->debug("parent POE session closing") },
226             },
227             heap => {
228             verby_dispatcher => $self,
229             g_cxt => $g_cxt, # convenience
230             satisfied => $self->satisfied_set,
231             ( $self->has_resource_pool ? ( resource_pool => $self->resource_pool ) : () ), # key is present only when a pool was configured
232             }
233             );
234             }
235              
236             sub do_all {    # create the sessions, then hand control to the POE event loop
237                 my ($self) = @_;
238                 $self->create_poe_sessions();
239                 $self->global_context->logger->debug("Starting POE main loop");
240                 return $poe_kernel->run();
241             }
242              
243             sub start_step {
244                 my ( $self, $step, $poe ) = @_;
245                 # When called from the execute_step handler, $poe is a reference to that handler's @_.
246                 my $global   = $self->global_context;
247                 my $step_cxt = $self->get_cxt($step);
248              
249                 unless ( $step->is_satisfied( $step_cxt, $poe ) ) {
250                     $global->logger->debug("starting step $step");
251                     return $step->do( $step_cxt, $poe );
252                 }
253                 # The step reports itself satisfied, so only log that and return nothing.
254                 $global->logger->debug("step $step has already been satisfied, running isn't necessary.");
255                 return;
256             }
257              
258             sub _set_members_query {    # list context: the set's members; otherwise: its size
259                 my ( $self, $set ) = @_;
260                 return $set->members if wantarray;
261                 return $set->size;
262             }
263              
264             sub steps {    # registered steps in list context, their count otherwise
265                 my ($self) = @_;
266                 return $self->_set_members_query( $self->step_set );
267             }
268              
269             sub is_satisfied {
270                 my ( $self, $step ) = @_;
271                 # Asking about a step that was never added is reported via croak.
272                 if ( !$self->step_set->contains($step) ) {
273                     croak "$step is not registered at all";
274                 }
275              
276                 return $self->satisfied_set->contains($step);
277             }
278              
279             __PACKAGE__
280              
281             __END__
282              
283             =pod
284              
285             =head1 NAME
286              
287             Verby::Dispatcher - Takes steps and executes them. Sort of like what make(1) is to a
288             Makefile.
289              
290             =head1 SYNOPSIS
291              
292             use Verby::Dispatcher;
293             use Verby::Config::Data; # or something equiv
294              
295             my $c = Verby::Config::Data->new(); # ... needs the "logger" field set
296              
297             my $d = Verby::Dispatcher->new;
298             $d->config_hub($c);
299              
300             $d->add_steps(@steps);
301              
302             $d->do_all;
303              
304             =head1 DESCRIPTION
305              
306             =head1 ATTRIBUTES
307              
308             =item B<resource_pool>
309              
310             If provided with a L<POE::Component::ResourcePool> instance, that resource pool
311             will be used to handle resource allocation.
312              
313             The L<Verby::Step/resources> method is used to declare the required resources
314             for each step.
315              
316             =item B<step_set>
317              
318             Returns the L<Set::Object> that is used for internal bookkeeping of the steps
319             involved.
320              
321             =item B<satisfied_set>
322              
323             Returns the L<Set::Object> that is used to track which steps are satisfied.
324              
325             =item B<config_hub>
326              
327             The configuration hub that all contexts inherit from.
328              
329             Defaults to an empty parameter set.
330              
331             =item B<global_context>
332              
333             The global context object.
334              
335             Defaults to a derivation of B<config_hub>.
336              
337             =head1 METHODS
338              
339             =over 4
340              
341             =item B<new>
342              
343             Returns a new L<Verby::Dispatcher>. Duh!
344              
345             =item B<add_steps *@steps>
346              
347             =item B<add_step *@steps>
348              
349             Add a number of steps into the dispatcher pool.
350              
351             Anything returned from L<Verby::Step/depends> is aggregated recursively here, and
352             added into the batch too.
353              
354             =item B<do_all>
355              
356             Calculate all the dependencies, and then dispatch in order.
357              
358             =back
359              
360             =begin private
361              
362             =over 4
363              
364             =item B<is_satisfied $step>
365              
366             Whether $step no longer needs to be executed (because it was already
367             executed or because it didn't need to be in the first place).
368              
369             =item B<get_cxt $step>
370              
371             Returns the context associated with $step. This is where $step will write its
372             data.
373              
374             =item B<get_derivable_cxts $step>
375              
376             Returns the contexts to derive from, when creating a context for $step.
377              
378             If $step starts a new context (L<Verby::Step/provides_cxt> is true) then a new context
379             is created here, derived from get_parent_cxts($step). Otherwise it simply
380             returns get_parent_cxts($step).
381              
382             Note that when a step 'provides a context' this really means that a new context
383             is created, and this context is derived for the step, and any step that depends
384             on it.
385              
386             =item B<get_parent_cxts $step>
387              
388             If $step depends on any other steps, take their contexts. Otherwise, returns
389             the global context.
390              
391             =item B<start_step $step>
392              
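393             Starts the given step: nothing is run if the step reports that it is already satisfied, otherwise its C<do> method is called with the step's context.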
394              
395             =item B<steps>
396              
397             Returns a list of steps that the dispatcher cares about.
398              
399             =back
400              
401             =end private
402              
403             =head1 BUGS
404              
405             None that we are aware of. Of course, if you find a bug, let us know, and we
406             will be sure to fix it.
407              
408             =head1 CODE COVERAGE
409              
410             We use B<Devel::Cover> to test the code coverage of the tests, please refer to
411             COVERAGE section of the L<Verby> module for more information.
412              
413             =head1 SEE ALSO
414              
415             =head1 AUTHOR
416              
417             Yuval Kogman, E<lt>nothingmuch@woobling.orgE<gt>
418             stevan little, E<lt>stevan@iinteractive.comE<gt>
419              
420             =head1 COPYRIGHT AND LICENSE
421              
422             Copyright 2005-2008 by Infinity Interactive, Inc.
423              
424             L<http://www.iinteractive.com>
425              
426             This library is free software; you can redistribute it and/or modify
427             it under the same terms as Perl itself.
428              
429             =cut