File Coverage

blib/lib/POE/Component/ResourcePool.pm
Criterion Covered Total %
statement 1 3 33.3
branch n/a
condition n/a
subroutine 1 1 100.0
pod n/a
total 2 4 50.0


line stmt bran cond sub pod time code
1             #!/usr/bin/perl
2              
3             package POE::Component::ResourcePool;
4 2     2   6314 use MooseX::POE;
  0            
  0            
5              
6             use Carp::Clan qr/^(?:POE::Component::ResourcePool|Moose|Class::MOP)/;
7              
8             use Tie::RefHash;
9             use Tie::RefHash::Weak;
10              
11             #use MooseX::Types::Set::Object;
12              
13             our $VERSION = "0.04";
14              
15             # nested pools?
16             # with qw(POE::Component::ResourcePool::Resource);
17              
18             use POE::Component::ResourcePool::Resource; # load type constraint
19              
20             use POE::Component::ResourcePool::Request;
21              
22             with qw(MooseX::POE::Aliased);
23              
24             sub spawn { shift->new(@_) }
25              
26             has resources => (
27             isa => "HashRef[POE::Component::ResourcePool::Resource]",
28             is => "ro",
29             required => 1,
30             );
31              
32             has weak_queue => (
33             isa => "Bool",
34             is => "ro",
35             default => 0,
36             );
37              
38             has refcount_pending => (
39             isa => "Bool",
40             is => "ro",
41             default => 1,
42             );
43              
44             has refcount_allocated => (
45             isa => "Bool",
46             is => "ro",
47             default => 0,
48             );
49              
50             sub BUILD {
51             my $self = shift;
52              
53             $self->MooseX::POE::Aliased::BUILD(@_);
54              
55             foreach my $resource ( values %{ $self->resources } ) {
56             $resource->register_pool($self);
57             }
58             }
59              
60             sub DEMOLISH {
61             my $self = shift;
62              
63             # the extra checks are because in global destruction these values are
64             # sometimes already gone
65             foreach my $resource ( grep { defined } values %{ $self->resources || return } ) {
66             $resource->unregister_pool($self);
67             }
68             }
69              
70             has request_class => (
71             isa => "ClassName",
72             is => "rw",
73             default => "POE::Component::ResourcePool::Request",
74             );
75              
76             has params => (
77             isa => "HashRef[HashRef]",
78             is => "ro",
79             init_arg => undef,
80             default => sub { Tie::RefHash::Weak::fieldhash my %h },
81             );
82              
83             sub request {
84             my ( $self, @args ) = @_;
85              
86             my $request = @args == 1 ? $args[0] : $self->create_request( @args );
87              
88             $self->queue($request);
89              
90             return $request;
91             }
92              
93             sub create_request {
94             my ( $self, @args ) = @_;
95              
96             $self->construct_request( pool => $self, @args );
97             }
98              
99             sub construct_request {
100             my ( $self, @args ) = @_;
101              
102             $self->request_class->new( @args );
103             }
104              
105             sub dismiss {
106             my ( $self, $request ) = @_;
107              
108             $request->_dismissed(1);
109              
110             $self->_remove_from_queue($request);
111              
112             $self->_free_allocations($request);
113             }
114              
115             sub queue {
116             my ( $self, $request ) = @_;
117              
118             $self->_queue_request($request);
119              
120             $poe_kernel->refcount_increment( $request->session_id, __PACKAGE__ . "::pending_requests" ) if $self->refcount_pending;
121              
122             $self->yield( new_request => $request );
123             }
124              
125             sub resource_updated {
126             my ( $self, $resource, @requests ) = @_;
127              
128             unless ( @requests ) {
129             @requests = $self->_requests_by_resource->{$resource}->members;
130             }
131              
132             my @ready = $self->_unblock_resource( $resource, @requests );
133              
134             $self->call( requests_ready => @ready );
135             }
136              
137             sub pending_requests {
138             my ( $self, $resource ) = @_;
139              
140             if ( $resource ) {
141             $resource = $self->resources->{$resource} unless ref $resource;
142             return $self->_requests_for_resource($resource)->members;
143             } else {
144             return keys %{ $self->_resources_by_request };
145             }
146             }
147              
148             sub allocated_requests {
149             my ( $self, $resource ) = @_;
150              
151             if ( $resource ) {
152             my $resources = $self->resources;
153              
154             my $resource_name = ref $resource
155             ? (grep { $resources->{$_} == $resource } keys %$resources )[0]
156             : $resource;
157              
158             my $allocations = $self->_allocations;
159              
160             return grep { exists $allocations->{$_}{$resource_name} } keys %$allocations;
161             } else {
162             return keys %{ $self->_allocations };
163             }
164             }
165              
166             sub all_requests {
167             my ( $self, @args ) = @_;
168              
169             return (
170             $self->pending_requests(@args),
171             $self->allocated_requests(@args),
172             )
173             }
174              
175             sub shutdown { shift->clear_alias }
176              
177             # keyed by request
178             has _allocations => (
179             isa => "HashRef[HashRef[ArrayRef]]",
180             is => "ro",
181             init_arg => undef,
182             default => sub { Tie::RefHash::Weak::fieldhash my %h },
183             );
184              
185              
186             # these attributes and methods implement the qeueue
187             has _requests_by_resource => (
188             #isa => "HashRef[Set::Object[POE::Component::ResourcePool::Request]]",
189             is => "ro",
190             init_arg => undef,
191             default => sub { tie my %h, 'Tie::RefHash'; \%h },
192             );
193              
194             has _resources_by_request => (
195             #isa => "HashRef[HashRef[Set::Object[POE::Component::ResourcePool::Resource]]]",
196             is => "ro",
197             init_arg => undef,
198             lazy_build => 1,
199             );
200              
201             sub _build__resources_by_request {
202             my $self = shift;
203              
204             tie my %h, $self->weak_queue ? "Tie::RefHash:Weak" : "Tie::RefHash";
205              
206             return \%h;
207             }
208              
209             sub _queue_request {
210             my ( $self, $request ) = @_;
211              
212             $self->_validate_request_params($request);
213              
214             my $resources = $self->resources;
215              
216             my %resources = map { $_ => $resources->{$_} } keys %{ $request->params };
217              
218             foreach my $set ( @{ $self->_requests_by_resource }{ values %resources } ) {
219             $set ||= Set::Object::Weak->new;
220             $set->insert($request);
221             }
222              
223             $self->_resources_by_request->{$request} = {
224             blocked => Set::Object->new(),
225             ready => Set::Object->new( values %resources ),
226             };
227              
228             foreach my $resource_name ( keys %resources ) {
229             $resources{$resource_name}->register_request( $self, $request, $request->params->{$resource_name} );
230             }
231             }
232              
233             sub _remove_from_queue {
234             my ( $self, $request ) = @_;
235              
236             return unless exists $self->_resources_by_request->{$request};
237              
238             $poe_kernel->refcount_decrement( $request->session_id, __PACKAGE__ . "::pending_requests" ) if $self->refcount_pending;
239              
240             my @resources = $self->_all_resources_for_request($request);
241              
242             $_->forget_request($self, $request) for @resources;
243              
244             foreach my $set ( @{ $self->_requests_by_resource }{ @resources } ) {
245             $set->remove($request);
246             }
247              
248             delete $self->_resources_by_request->{$request};
249             }
250              
251             sub _requests_for_resource {
252             my ( $self, $resource ) = @_;
253              
254             $self->_requests_by_resource->{$resource};
255             }
256              
257             sub _resource_sets_for_request {
258             my ( $self, $request ) = @_;
259              
260             @{ $self->_resources_by_request->{$request} || return }{qw(ready blocked)}
261             }
262              
263             sub _blocked_resources_for_request {
264             my ( $self, $request ) = @_;
265              
266             $self->_resources_by_request->{$request}{blocked};
267             }
268              
269             sub _all_resources_for_request {
270             my ( $self, $request ) = @_;
271              
272             map { $_->members } grep { defined } $self->_resource_sets_for_request($request);
273             }
274              
275             sub _unblock_resource {
276             my ( $self, $resource, @requests ) = @_;
277              
278             my @ret;
279              
280             foreach my $request ( @requests ) {
281             my ( $ready, $blocked ) = $self->_resource_sets_for_request($request);
282              
283             if ( $blocked->remove($resource) ) {
284             $ready->insert($resource);
285             push @ret, $request if $blocked->is_null;
286             }
287             }
288              
289             return @ret;
290             }
291              
292             sub _block_resource {
293             my ( $self, $resource, @requests ) = @_;
294              
295             foreach my $request ( @requests ) {
296             my ( $ready, $blocked ) = $self->_resource_sets_for_request($request);
297              
298             $ready->remove($resource) and $blocked->insert($resource);
299             }
300             }
301              
302             # end of queue methods
303              
304              
305              
306             sub _validate_request_params {
307             my ( $self, $request ) = @_;
308              
309             my $params = $request->params;
310             my $resources = $self->resources;
311              
312             if ( my @missing = grep { not exists $resources->{$_} } keys %$params ) {
313             croak "request $request has parameters for which no resource can be found: " . join ", ", @missing;
314             }
315              
316             my @failed;
317             foreach my $name ( keys %$params ) {
318             my $resource = $resources->{$name};
319              
320             unless ( $resource->could_allocate( $self, $request, $params->{$name} ) ) {
321             push @failed, $name;
322             }
323             }
324              
325             if ( @failed ) {
326             croak "The following resources rejected $request: " . join ", ", @failed;
327             }
328             }
329              
330             event new_request => sub {
331             my ( $self, $request ) = @_[OBJECT, ARG0 .. $#_];
332              
333             $self->_try_allocating($request);
334             };
335              
336             event requests_ready => sub {
337             my ( $self, @requests ) = @_[OBJECT, ARG0 .. $#_];
338              
339             foreach my $req ( @requests ) {
340             $self->_try_allocating($req);
341             }
342             };
343              
344              
345             sub _free_allocations {
346             my ( $self, $request ) = @_;
347              
348             $poe_kernel->refcount_decrement( $request->session_id, __PACKAGE__ . "::allocated_requests" ) if $self->refcount_allocated;
349              
350             my $allocations = delete $self->_allocations->{$request} || return;
351              
352             my $resources = $self->resources;
353              
354             foreach my $name ( keys %$allocations ) {
355             $resources->{$name}->free_allocation( $self, $request, @{ $allocations->{$name} } );
356             }
357             }
358              
359             sub _try_allocating {
360             my ( $self, $request ) = @_;
361              
362             return if $request->fulfilled;
363              
364             my $blocked = $self->_blocked_resources_for_request($request);
365              
366             return unless $blocked->is_null; # can't allocate if there are blocking resources
367              
368             my $resources = $self->resources;
369              
370             my $params = $request->params;
371              
372             my %allocations;
373              
374             # attempt to allocate the value from each resource
375             foreach my $resource_name ( keys %$params ) {
376             my $res_params = $params->{$resource_name};
377              
378             my $resource = $resources->{$resource_name};
379              
380             my @allocation = $resource->try_allocating( $self, $request, $params->{$resource_name} );
381              
382             if ( @allocation ) {
383             $allocations{$resource_name} = \@allocation;
384             } else {
385             $self->_block_resource($resource, $request);
386             }
387             }
388              
389             # if no allocations failed then the blocked set is still empty
390             return unless $blocked->is_null;
391              
392             $poe_kernel->refcount_increment( $request->session_id, __PACKAGE__ . "::finalizing_allocation" );
393              
394             $poe_kernel->refcount_increment( $request->session_id, __PACKAGE__ . "::allocated_requests" ) if $self->refcount_allocated;
395              
396             # the item can now be removed from the queue, and dispatched
397             $self->_remove_from_queue($request);
398              
399             $request->_fulfilled(1);
400              
401             $self->_allocations->{$request} = \%allocations;
402              
403             my %output_params;
404              
405             foreach my $resource_name ( keys %$params ) {
406             my $resource = $resources->{$resource_name};
407             $output_params{$resource_name} = $resource->finalize_allocation( $self, $request, @{ $allocations{$resource_name} } );
408             }
409              
410             $request->_results(\%output_params);
411              
412             $request->invoke_callback( pool => $self, %output_params );
413              
414             $poe_kernel->refcount_decrement( $request->session_id, __PACKAGE__ . "::finalizing_allocation" );
415              
416             return $request;
417             }
418              
419             no MooseX::POE;
420              
421             __PACKAGE__
422              
423             __END__
424              
425             =pod
426              
427             =head1 NAME
428              
429             POE::Component::ResourcePool - Asynchronous generic resource management for POE
430             based apps.
431              
432             =head1 SYNOPSIS
433              
434             my $resource = POE::Component::ResourcePool::Resource::Blah->new( ... );
435              
436             my $pool = POE::Component::ResourcePool->new(
437             resources => {
438             moose => $resource,
439             elk => ...,
440             cow => ...,
441             },
442             );
443              
444             # ... in some session somewhere:
445              
446             $pool->request(
447             params => {
448             moose => ..., # arbitrary params for Blah type resources
449             elk => ...,
450             },
451             event => "got_it", # dispatched when both moose and elk can be allocated at the same time
452             );
453              
454             =head1 DESCRIPTION
455              
456             This resource pool object provides very flexible resource management for POE
457             based apps.
458              
459             A pool consists of any number of named, abstract resources to be shared amongst
460             several tasks.
461              
462             Requests for resources can contain arbitrary parameters and are fulfilled with
463             arbitrary values.
464              
465             The pool will manage resources, sharing them between requests as they become
466             available.
467              
468             Using a simple interface one can easily write arbitrary resource abstractions,
469             which are potentially affected by outside mechanisms (for example the token
470             bucket resource allows time based throttling).
471              
472             =head1 QUEUE ALGORITHM
473              
474             The request queue works by maintaining a set of ready and blocked resources for
475             each request.
476              
477             Whenever all the resources for a given request are ready the pool will attempt
478             to allocate the request.
479              
480             If any resource failed to allocate the parameter specified for it in the
481             request, it is marked as blocked for that request.
482              
483             If all resources succeeded the allocations are finalized, the request callback
484             is invoked and the request is removed from the queue.
485              
486             Whenever a resource signals that it's been updated (for example if its
487             allocation has been freed, or if some other POE event changed it) it will be
488             marked as ready for allocation in the queue again.
489              
490             =head1 REFERENCE MANAGEMENT
491              
492             Based on the values of C<refcount_allocated> (defaults to false) and
493             C<refcount_pending> (defaults to true) the resource pool will increment the
494             reference count of sessions that have created requests and decrement it when
495             the resource is fulfilled.
496              
497             This is because typically a session is not doing anything, and as such has no
498             resources/events associated with it while it waits for a resource.
499              
500             Once the resource is allocated the session will probably have at least one more
501             event (depending on the callback), and will continue working until it's done,
502             at which point the kernel will garbage collect it.
503              
504             This default behavior allows you to simply keep your requests on the heap so
505             that when the session closes automatically fulfilled requests will be freed.
506              
507             Setting C<refcount_allocated> will cause the session to remain alive until the
508             resource is dismissed (whether manually or due to C<DESTROY>). Note that if
509             C<refcount_allocated> is true and the resource is kept on the heap a circular
510             reference is caused and the session will leak unless the resource is explicitly
511             dismissed.
512              
513             Setting C<refcount_pending> to a false value may cause sessions to disappear
514             prematurely. The resource pool will not check that the session still exists
515             when issuing the callback so this may cause problems.
516              
517             =head1 METHODS
518              
519             =over 4
520              
521             =item new %args
522              
523             =item spawn %args
524              
525             Construct a new resource pool.
526              
527             See L</ATTRIBUTES> for parameters.
528              
529             C<spawn> is provided as an alias due to L<POE::Component> convensions.
530              
531             =item request %args
532              
533             =item request $req
534              
535             Queues a new request, optionally creating a request object based on the
536             C<request_class> attribute.
537              
538             See L<POE::Component::ResourcePool::Request>.
539              
540             =item create_request %args
541              
542             Used by C<request> to create a request object with some default arguments in
543             addition to the supplied ones.
544              
545             Delegates to C<construct_request>.
546              
547             =item construct_request @args
548              
549             Calls C<new> on the class returned by C<request_class> with the provided arguments.
550              
551             =item queue $request
552              
553             Inserts the request into the queue.
554              
555             =item dismiss $request
556              
557             Deallocates the request if it has been fulfilled, or cancels it otherwise.
558              
559             =item shutdown
560              
561             Remove the alias for the pool, causing its session to close.
562              
563             =item resource_updated $resource, [ @requests ]
564              
565             Called by resources to signal that a resource has been updated.
566              
567             C<@requests> can be specified in order to only recheck certain requests
568             (instead of all the requests associated with the resource).
569              
570             =item pending_requests
571              
572             Returns a list of the currently pending requests.
573              
574             If a resource is specified as the first argument then only returns the requests
575             for that resource.
576              
577             =item allocated_requests
578              
579             Returns a list of the currently allocated requests.
580              
581             If a resource is specified as the first argument then only returns the requests
582             for that resource.
583              
584             =item all_requests
585              
586             Returns all the requests active in the pool (pending and allocated).
587              
588             If a resource is specified as the first argument then only returns the requests
589             for that resource.
590              
591             =back
592              
593             =head1 ATTRIBUTES
594              
595             =over 4
596              
597             =item resources
598              
599             The hash of resources to manage.
600              
601             Resources may be shared by several pools.
602              
603             Modifying this hash is not supported yet but might be in the future using a
604             method API.
605              
606             =item alias
607              
608             Comes from L<MooseX::POE::Aliased>.
609              
610             Note that the alias is not currently useful for anything, since the only events
611             the resource pool currently responds to are internal.
612              
613             =item request_class
614              
615             The class to use when constructing new request objects.
616              
617             =item weak_queue
618              
619             Normally strong references are made to requests in the queue, to prevent their
620             destruction.
621              
622             When requests leave the queue all references to them maintained by the pool are
623             weak, so that if the request gets garbage collected its allocations may be
624             returned to the resources.
625              
626             If this parameter is set then unfulfilled requests will also be weak, so that
627             requests which are no longer referenced elsewhere are canceled.
628              
629             =item refcount_pending
630              
631             Whether or not to maintain POE reference counts for sessions that have pending
632             requests.
633              
634             Defaults to true.
635              
636             See L</REFERENCE MANAGEMENT>.
637              
638             =item refcount_allocated
639              
640             Whether or not to maintain POE reference counts for sessions that have
641             allocated requests.
642              
643             Defaults to false.
644              
645             See L</REFERENCE MANAGEMENT>.
646              
647             =back
648              
649             =head1 TODO
650              
651             =head2 Prioritization
652              
653             Resource contention is a problem, so a pluggable scheduler should be available,
654             with the default one being a FIFO (the current order is based on
655             L<Set::Object>'s internal hasing).
656              
657             The module should ship with a priority based FIFO queue that supports priority
658             inheritence as well, in order to provide decent prioritization facilities out
659             of the box.
660              
661             =head2 Nestability
662              
663             Allow pools to also behave as resources in other pools.
664              
665             This should be fairly easy.
666              
667             =head2 Allow weak lifetime without an alias
668              
669             Try to find a way for L<POE> to keep the pool alive as long as other sessions
670             may use it, just like when it's got an alias, but without needing to set one.
671              
672             This is very annoying for resources that need their own sessions, as it rarely
673             akes sense for them to also have aliases.
674              
675             =head1 VERSION CONTROL
676              
677             This module is maintained using Darcs. You can get the latest version from
678             L<http://nothingmuch.woobling.org/code>, and use C<darcs send> to commit
679             changes.
680              
681             =head1 AUTHOR
682              
683             Yuval Kogman E<lt>nothingmuch@woobling.orgE<gt>
684              
685             =head1 COPYRIGHT
686              
687             Copyright (c) 2008 Yuval Kogman. All rights reserved
688             This program is free software; you can redistribute
689             it and/or modify it under the same terms as Perl itself.
690              
691             =cut