File Coverage

blib/lib/Broker/Async/Worker.pm
Criterion Covered Total %
statement 34 35 97.1
branch 4 6 66.6
condition 1 3 33.3
subroutine 9 9 100.0
pod 1 3 33.3
total 49 56 87.5


line stmt bran cond sub pod time code
1             package Broker::Async::Worker;
2 7     7   449 use strict;
  7         8  
  7         222  
3 7     7   25 use warnings;
  7         10  
  7         152  
4 7     7   23 use Carp;
  7         6  
  7         580  
5 7     7   31 use Scalar::Util qw( blessed weaken );
  7         16  
  7         1161  
6              
7             =head1 NAME
8              
9             Broker::Async::Worker
10              
11             =head1 DESCRIPTION
12              
13             Used by L for tracking the state of asynchronous work.
14              
15             =cut
16              
17             our $VERSION = "0.0.6"; # __VERSION__
18              
19             =head1 ATTRIBUTES
20              
21             =head2 code
22              
23             The code reference used to start the work.
24             This will be invoked with the arguments passed to C.
25              
26             Must return a L subclass.
27              
28             =head2 concurrency
29              
30             The number of concurrent tasks a worker can execute.
31             Do'ing more tasks than this limit is a fatal error.
32              
33             Defaults to 1.
34              
35             =cut
36              
37             use Class::Tiny qw( code ), {
38 17         614 concurrency => sub { 1 },
39 19         231 futures => sub { +{} },
40 19         390 available => sub { shift->concurrency },
41 7     7   3909 };
  7         20728  
  7         84  
42              
43             =head1 METHODS
44              
45             =head2 new
46              
47             my $worker = Broker::Async::Worker->new(
48             code => sub { ... },
49             concurrency => $max,
50             );
51              
52             =head2 available
53              
54             Indicates whether the worker is available to C tasks.
55             It is a fatal error to invoke C when this is false.
56              
57             =head2 do
58              
59             my $future = $worker->do($task);
60              
61             Invokes the code attribute with the given arguments.
62             Returns a future that will be resolved when the work is done.
63              
64             =cut
65              
66             sub active {
67 2015     2015 0 36896 my ($self) = @_;
68 2015         11625 return values %{ $self->futures };
  2015         137597  
69             }
70              
71             sub BUILD {
72 26     26 0 7557 my ($self) = @_;
73 26         40 for my $name (qw( code )) {
74 26 50       532 croak "$name attribute required" unless defined $self->$name;
75             }
76             }
77              
78             sub do {
79 136     136 1 5486 weaken(my $self = shift);
80 136         4474 my (@args) = @_;
81 136 100       45805 if (not( $self->available )) {
82 1         165 croak "worker $self is not available for work";
83             }
84              
85 135         54633 my $f = $self->code->(@args);
86 135 50 33     31133 if (not( blessed($f) and $f->isa('Future') )) {
87 0         0 croak "code for worker $self did not return a Future: returned $f";
88             }
89 135         45986 $self->available( $self->available - 1 );
90              
91             return $self->futures->{$f} = $f->on_ready(sub{
92 130     130   388149 delete $self->futures->{$f};
93 130         55058 $self->available( $self->available + 1 );
94 135         23044 });
95             }
96              
97             =head1 AUTHOR
98              
99             Mark Flickinger Emaf@cpan.orgE
100              
101             =head1 LICENSE
102              
103             This software is licensed under the same terms as Perl itself.
104              
105             =cut
106              
107              
108             1;