File Coverage

blib/lib/Mojo/AsyncList.pm
Criterion Covered Total %
statement 44 44 100.0
branch 15 18 83.3
condition 3 5 60.0
subroutine 10 10 100.0
pod 4 4 100.0
total 76 81 93.8


line stmt bran cond sub pod time code
1             package Mojo::AsyncList;
2 1     1   161824 use Mojo::Base 'Mojo::EventEmitter';
  1         9  
  1         5  
3              
4 1     1   1687 use Mojo::IOLoop;
  1         118236  
  1         6  
5 1     1   42 use Time::HiRes ();
  1         2  
  1         551  
6              
7             our $VERSION = '0.02';
8              
9             has concurrent => 0;
10             has ioloop => sub { Mojo::IOLoop->singleton };
11             has offset => 1;
12              
13             sub new {
14 2     2 1 105 my $class = shift;
15 2 50       10 my $item_cb = ref $_[0] eq 'CODE' ? shift : undef;
16 2 100       7 my $finish_cb = ref $_[0] eq 'CODE' ? shift : undef;
17 2         14 my $self = $class->SUPER::new(@_);
18              
19 2 50       28 $self->on(item => $item_cb) if $item_cb;
20 2 100       25 $self->on(finish => $finish_cb) if $finish_cb;
21              
22 2         11 return $self;
23             }
24              
25             sub process {
26 2     2 1 25 my ($self, $items) = @_;
27 2         4 my $remaining = int @$items;
28 2         7 my ($gather_cb, $item_pos, $pos, @res) = (undef, 0, 0);
29              
30             my $stats = $self->{stats}
31 2         15 = {done => 0, remaining => int(@$items), t0 => [Time::HiRes::gettimeofday]};
32              
33             $gather_cb = sub {
34 4     4   84 my $res_pos = $pos++;
35              
36             return sub {
37 4         634561 shift for 1 .. $self->offset;
38 4         92 $stats->{done}++;
39 4         15 $stats->{remaining}--;
40 4         22 $res[$res_pos] = [@_];
41 4         35 $self->emit(result => @_);
42 4 100       112 return $self->emit(finish => @res) unless $stats->{remaining};
43 2 100       19 return $self->emit(item => $items->[$item_pos++], $gather_cb->())
44             if $item_pos < @$items;
45 4         50 };
46 2         9 };
47              
48             $self->ioloop->next_tick(sub {
49 2     2   195 my $n = $self->concurrent;
50 2 100 66     22 $n = @$items if !$n or $n > @$items;
51 2         10 $self->emit(item => $items->[$item_pos++], $gather_cb->()) for 1 .. $n;
52 2         9 });
53              
54 2         213 return $self;
55             }
56              
57             sub stats {
58 4     4 1 729 my ($self, $key) = @_;
59 4 100 50     33 return $key ? $self->{stats}{$key} // 0 : $self->{stats};
60             }
61              
62             sub wait {
63 2     2 1 5 my $self = shift;
64 2 50       11 return if (my $loop = $self->ioloop)->is_running;
65 2         39 my $done;
66 2     2   11 $self->on(finish => sub { $done++; $loop->stop });
  2         36  
  2         10  
67 2         17 $loop->start until $done;
68             }
69              
70             1;
71              
72             =head1 NAME
73              
74             Mojo::AsyncList - Process a list with callbacks
75              
76             =head1 SYNOPSIS
77              
78             use Mojo::AsyncList;
79             use Mojo::mysql;
80              
81             my $mysql = Mojo::mysql->new;
82             my $db = $mysql->db;
83              
84             my $async_list = Mojo::AsyncList->new(
85             sub { # Specify a "item" event handler
86             my ($async_list, $username, $gather_cb) = @_;
87             $db->select("users", {username => $username}, $gather_cb);
88             },
89             sub { # Specify a "finish" event handler
90             my $async_list = shift;
91             warn $_->[0]{user_id} for @_; # @_ = ([$db_res_supergirl], [$db_res_superman], ...)
92             },
93             );
94              
95             my @users = qw(supergirl superman batman);
96             $async_list->concurrent(2);
97             $async_list->process(\@users);
98             $async_list->wait;
99              
100             =head1 DESCRIPTION
101              
102             L is a module that can asynchronously process a list of items
103             with callback.
104              
105             =head1 EVENTS
106              
107             =head2 finish
108              
109             $async_list->on(finish => sub { my ($async_list, @all_res) = @_; });
110              
111             Emitted when L is done with all the C<$items>. C<@all_res> is a list
112             of array-refs, where each item is C<@res> passed on to L.
113              
114             =head2 item
115              
116             $async_list->on(item => sub { my ($async_list, $item, $gather_cb) = @_; });
117              
118             Used to process the next C<$item> in C<$items> passed on to L.
119              
120             =head2 result
121              
122             $async_list->on(result => sub { my ($async_list, @res) = @_; });
123              
124             Emitted when a new result is ready, C<@res> contains the data passed on to
125             C<$gather_cb>.
126              
127             =head1 ATTRIBUTES
128              
129             =head2 concurrent
130              
131             $int = $async_list->concurrent;
132             $async_list = $async_list->concurrent(0);
133              
134             Used to set the number of concurrent items to process. Default value is zero,
135             which means "process all items" at once.
136              
137             Used to see how many items that is processing right now.
138              
139             =head2 offset
140              
141             $int = $async_list->offset;
142             $async_list = $async_list->offset(1);
143              
144             Will remove the number of arguments passed on to <$gather_cb>, used in the
145             L event. Default to "1", meaning it will remove the invocant.
146              
147             =head1 METHODS
148              
149             =head2 new
150              
151             $async_list = Mojo::AsyncList->new;
152             $async_list = Mojo::AsyncList->new(@attrs);
153             $async_list = Mojo::AsyncList->new(\%attrs);
154             $async_list = Mojo::AsyncList->new($item_cb, $finish_cb);
155             $async_list = Mojo::AsyncList->new($item_cb, $finish_cb, \%attrs);
156              
157             Used to create a new L object. L and L event
158             callbacks can be provided when constructing the object.
159              
160             =head2 process
161              
162             $async_list = $async_list->process(@items);
163             $async_list = $async_list->process([@items]);
164              
165             Process C<$items> and emit L while doing so.
166              
167             =head2 stats
168              
169             $int = $async_list->stats("done");
170             $int = $async_list->stats("remaining");
171             $gettimeofday = $async_list->stats("t0");
172             $hash_ref = $async_list->stats;
173              
174             Used to extract stats while items are processing. This can be useful inside the
175             L, or within a recurring timer:
176              
177             Mojo::IOLoop->recurring(1 => sub {
178             warn sprintf "[%s] done: %s\n", time, $async_list->stats("done");
179             });
180              
181             Changing the C<$hash_ref> will have fatal consequences.
182              
183             =head2 wait
184              
185             $async_list->concurrent(2)->process(\@items)->wait;
186             $async_list->wait;
187              
188             Used to block and wait until L is done with the C<$items>
189             passed on to L.
190              
191             =head1 AUTHOR
192              
193             Jan Henning Thorsen
194              
195             =cut