File Coverage

blib/lib/Broker/Async.pm
Criterion Covered Total %
statement 61 61 100.0
branch 11 14 78.5
condition 1 3 33.3
subroutine 13 13 100.0
pod 1 6 16.6
total 87 97 89.6


line stmt bran cond sub pod time code
1             package Broker::Async;
2 6     6   2339 use strict;
  6         9  
  6         133  
3 6     6   18 use warnings;
  6         6  
  6         111  
4 6     6   2002 use Broker::Async::Worker;
  6         10  
  6         117  
5 6     6   25 use Carp;
  6         25  
  6         266  
6 6     6   20 use Scalar::Util qw( blessed weaken );
  6         5  
  6         362  
7              
8             =head1 NAME
9              
10             Broker::Async - broker tasks for multiple workers
11              
12             =for html
13              
14             =head1 SYNOPSIS
15              
16             my @workers;
17             for my $uri (@uris) {
18             my $client = SomeClient->new($uri);
19             push @workers, sub { $client->request(@_) };
20             }
21              
22             my $broker = Broker::Async->new(workers => \@workers);
23             for my $future (map $broker->do($_), @requests) {
24             my $result = $future->get;
25             ...
26             }
27              
28             =head1 DESCRIPTION
29              
30             This module brokers tasks for multiple asynchronous workers. A worker can be any code reference that returns a L<Future>, representing work awaiting completion.
31              
32             Some common use cases include throttling asynchronous requests to a server, or delegating tasks to a limited number of processes.
33              
34             =cut
35              
36             our $VERSION = "0.0.1"; # __VERSION__
37              
38             =head1 ATTRIBUTES
39              
40             =head2 workers
41              
42             An array ref of workers used for handling tasks.
43             Each worker can be a code reference, a hash ref of L<Broker::Async::Worker> arguments, or a L<Broker::Async::Worker> object.
44             Every invocation of a worker must return a L<Future> object.
45              
46             Under the hood, code and hash references are simply used to instantiate a L<Broker::Async::Worker> object.
47             See L<Broker::Async::Worker> for more documentation about how these parameters are used.
48              
49             =cut
50              
51             use Class::Tiny qw( workers ), {
52 5         32 queue => sub { [] },
53 6     6   19 };
  6         5  
  6         28  
54              
55             =head1 METHODS
56              
57             =head2 new
58              
59             my $broker = Broker::Async->new(
60             workers => [ sub { ... }, ... ],
61             );
62              
63             =cut
64              
65             sub active {
66 130     130 0 3728 my ($self) = @_;
67 130         3747 return grep { $_->active } @{ $self->workers };
  1045         28917  
  130         37809  
68             }
69              
70             sub available {
71 408     408 0 12356 my ($self) = @_;
72 408         5085 return grep { $_->available } @{ $self->workers };
  3740         146600  
  408         53531  
73             }
74              
75             sub BUILD {
76 10     10 0 53819 my ($self) = @_;
77 10         18 for my $name (qw( workers )) {
78 10 50       224 croak "$name attribute required" unless defined $self->$name;
79             }
80              
81 10         183 my $workers = $self->workers;
82 10 50       41 croak "workers attribute must be an array ref: received $workers"
83             unless ref($workers) eq 'ARRAY';
84              
85 10         27 for (my $i = 0; $i < @$workers; $i++) {
86 21         105 my $worker = $workers->[$i];
87              
88 21         21 my $type = ref($worker);
89 21 100       39 if ($type eq 'CODE') {
    100          
90 17         51 $workers->[$i] = Broker::Async::Worker->new({code => $worker});
91             } elsif ($type eq 'HASH') {
92 3         11 $workers->[$i] = Broker::Async::Worker->new($worker);
93             }
94             }
95             }
96              
97             =head2 do
98              
99             my $future = $broker->do(@args);
100              
101             Queue the invocation of a worker with @args.
102             @args can be any data structure, and is passed as is to a worker code ref.
103             Returns a L<Future> object that resolves when the work is done.
104              
105             There is no guarantee when a worker will be called; that depends on when a worker becomes available.
106             However, calls are guaranteed to be invoked in the order they are seen by $broker->do.
107              
108             =cut
109              
110              
111             sub do {
112 130     130 1 29142 my ($self, @args) = @_;
113              
114             # enforces consistent order of task execution
115             # makes sure current task is only started if nothing else is queued
116 130         3785 $self->process_queue;
117              
118 130         4019 my $future;
119 130 100       3755 if (my @active_futures = map $_->active, $self->active) {
    50          
120             # generate future from an existing future
121             # see Future::_new_convergent
122 119         12718 my $_future = $active_futures[0];
123 119   33     3250 ref($_) eq "Future" or $_future = $_, last for @active_futures;
124              
125 119         2621 $future = $_future->new;
126 119         10177 push @{ $self->queue }, {args => \@args, future => $future};
  119         26225  
127             } elsif (my ($available_worker) = $self->available) {
128             # should only be here if there's nothing active and nothing queued
129             # so start the task and return it's future
130 11         6136 $future = $self->do_worker($available_worker, @args);
131             }
132              
133             # start any recently queued tasks, if there are available workers
134 130         75243 $self->process_queue;
135 130         96115 return $future;
136             }
137              
138             sub do_worker {
139 130     130 0 3826 weaken(my $self = shift);
140 130         3789 my ($worker, @args) = @_;
141              
142             return $worker->do(@args)->on_ready(sub{
143             # queue next task
144 126     126   56154 $self->process_queue;
145 130         3889 });
146             }
147              
148             sub process_queue {
149 386     386 0 11441 weaken(my $self = shift);
150 386         113856 my $queue = $self->queue;
151              
152 386         46547 while (@$queue) {
153 397 100       7901 my ($worker) = $self->available or last;
154 119         12934 my $task = shift @$queue;
155              
156 119         5050 $self->do_worker($worker, @{$task->{args}})
157 119         2517 ->on_ready($task->{future});
158             }
159             }
160              
161             =head1 AUTHOR
162              
163             Mark Flickinger E<lt>maf@cpan.orgE<gt>
164              
165             =head1 LICENSE
166              
167             This software is licensed under the same terms as Perl itself.
168              
169             =cut
170              
171             1;