File Coverage

blib/lib/Mojo/AsyncList.pm
Criterion Covered Total %
statement 42 42 100.0
branch 11 16 68.7
condition 2 5 40.0
subroutine 10 10 100.0
pod 4 4 100.0
total 69 77 89.6


line stmt bran cond sub pod time code
1             package Mojo::AsyncList;
2 1     1   196787 use Mojo::Base 'Mojo::EventEmitter';
  1         12  
  1         6  
3              
4 1     1   2048 use Mojo::IOLoop;
  1         146125  
  1         9  
5 1     1   59 use Time::HiRes ();
  1         2  
  1         633  
6              
7             our $VERSION = '0.01';
8              
9             has concurrent => 0;
10             has ioloop => sub { Mojo::IOLoop->singleton };
11             has offset => 1;
12              
13             sub new {
14 1     1 1 133 my $class = shift;
15 1 50       6 my $item_cb = ref $_[0] eq 'CODE' ? shift : undef;
16 1 50       6 my $finish_cb = ref $_[0] eq 'CODE' ? shift : undef;
17 1         14 my $self = $class->SUPER::new(@_);
18              
19 1 50       20 $self->on(item => $item_cb) if $item_cb;
20 1 50       21 $self->on(finish => $finish_cb) if $finish_cb;
21              
22 1         7 return $self;
23             }
24              
25             sub process {
26 1     1 1 23 my ($self, $items) = @_;
27 1         4 my $remaining = int @$items;
28 1         5 my ($gather_cb, $item_pos, $pos, @res) = (undef, 0, 0);
29              
30             my $stats = $self->{stats}
31 1         9 = {done => 0, remaining => int(@$items), t0 => [Time::HiRes::gettimeofday]};
32              
33             $gather_cb = sub {
34 3     3   114 my $res_pos = $pos++;
35              
36             return sub {
37 3         164284 shift for 1 .. $self->offset;
38 3         44 $stats->{done}++;
39 3         8 $stats->{remaining}--;
40 3         18 $res[$res_pos] = [@_];
41 3         29 $self->emit(result => @_);
42 3 100       79 return $self->emit(finish => @res) unless $stats->{remaining};
43 2 100       17 return $self->emit(item => $items->[$item_pos++], $gather_cb->())
44             if $item_pos < @$items;
45 3         42 };
46 1         6 };
47              
48             $self->ioloop->next_tick(sub {
49             $self->emit(item => $items->[$item_pos++], $gather_cb->())
50 1   33 1   120 for 1 .. ($self->concurrent || @$items);
51 1         8 });
52              
53 1         142 return $self;
54             }
55              
56             sub stats {
57 4     4 1 780 my ($self, $key) = @_;
58 4 100 50     36 return $key ? $self->{stats}{$key} // 0 : $self->{stats};
59             }
60              
61             sub wait {
62 1     1 1 4 my $self = shift;
63 1 50       18 return if (my $loop = $self->ioloop)->is_running;
64 1         28 my $done;
65 1     1   7 $self->on(finish => sub { $done++; $loop->stop });
  1         26  
  1         8  
66 1         11 $loop->start until $done;
67             }
68              
69             1;
70              
71             =head1 NAME
72              
73             Mojo::AsyncList - Process a list with callbacks
74              
75             =head1 SYNOPSIS
76              
77             use Mojo::AsyncList;
78             use Mojo::mysql;
79              
80             my $mysql = Mojo::mysql->new;
81             my $db = $mysql->db;
82              
83             my $async_list = Mojo::AsyncList->new(
84             sub { # Specify a "item" event handler
85             my ($async_list, $username, $gather_cb) = @_;
86             $db->select("users", {username => $username}, $gather_cb);
87             },
88             sub { # Specify a "finish" event handler
89             my $async_list = shift;
90             warn $_->[0]{user_id} for @_; # @_ = ([$db_res_supergirl], [$db_res_superman], ...)
91             },
92             );
93              
94             my @users = qw(supergirl superman batman);
95             $async_list->concurrent(2);
96             $async_list->process(\@users);
97             $async_list->wait;
98              
99             =head1 DESCRIPTION
100              
101             L is a module that can asynchronously process a list of items
102             with callback.
103              
104             =head1 EVENTS
105              
106             =head2 finish
107              
108             $async_list->on(finish => sub { my ($async_list, @all_res) = @_; });
109              
110             Emitted when L is done with all the C<$items>. C<@all_res> is a list
111             of array-refs, where each item is C<@res> passed on to L.
112              
113             =head2 item
114              
115             $async_list->on(item => sub { my ($async_list, $item, $gather_cb) = @_; });
116              
117             Used to process the next C<$item> in C<$items> passed on to L.
118              
119             =head2 result
120              
121             $async_list->on(result => sub { my ($async_list, @res) = @_; });
122              
123             Emitted when a new result is ready, C<@res> contains the data passed on to
124             C<$gather_cb>.
125              
126             =head1 ATTRIBUTES
127              
128             =head2 concurrent
129              
130             $int = $async_list->concurrent;
131             $async_list = $async_list->concurrent(0);
132              
133             Used to set the number of concurrent items to process. Default value is zero,
134             which means "process all items" at once.
135              
136             Used to see how many items that is processing right now.
137              
138             =head2 offset
139              
140             $int = $async_list->offset;
141             $async_list = $async_list->offset(1);
142              
143             Will remove the number of arguments passed on to <$gather_cb>, used in the
144             L event. Default to "1", meaning it will remove the invocant.
145              
146             =head1 METHODS
147              
148             =head2 new
149              
150             $async_list = Mojo::AsyncList->new;
151             $async_list = Mojo::AsyncList->new(@attrs);
152             $async_list = Mojo::AsyncList->new(\%attrs);
153             $async_list = Mojo::AsyncList->new($item_cb, $finish_cb);
154             $async_list = Mojo::AsyncList->new($item_cb, $finish_cb, \%attrs);
155              
156             Used to create a new L object. L and L event
157             callbacks can be provided when constructing the object.
158              
159             =head2 process
160              
161             $async_list = $async_list->process(@items);
162             $async_list = $async_list->process([@items]);
163              
164             Process C<$items> and emit L while doing so.
165              
166             =head2 stats
167              
168             $int = $async_list->stats("done");
169             $int = $async_list->stats("remaining");
170             $gettimeofday = $async_list->stats("t0");
171             $hash_ref = $async_list->stats;
172              
173             Used to extract stats while items are processing. This can be useful inside the
174             L, or within a recurring timer:
175              
176             Mojo::IOLoop->recurring(1 => sub {
177             warn sprintf "[%s] done: %s\n", time, $async_list->stats("done");
178             });
179              
180             Changing the C<$hash_ref> will have fatal consequences.
181              
182             =head2 wait
183              
184             $async_list->concurrent(2)->process(\@items)->wait;
185             $async_list->wait;
186              
187             Used to block and wait until L is done with the C<$items>
188             passed on to L.
189              
190             =head1 AUTHOR
191              
192             Jan Henning Thorsen
193              
194             =cut