File Coverage

blib/lib/MojoX/IOLoop/Throttle.pm
Criterion Covered Total %
statement 71 76 93.4
branch 28 40 70.0
condition 15 23 65.2
subroutine 11 12 91.6
pod 5 5 100.0
total 130 156 83.3


line stmt bran cond sub pod time code
1             package MojoX::IOLoop::Throttle;
2 2     2   52356 use Mojo::Base 'Mojo::EventEmitter';
  2         23454  
  2         14  
3              
4             our $VERSION = '0.01_26';
5             $VERSION = eval $VERSION;
6              
7              
8 2     2   7327 use Mojo::IOLoop;
  2         452678  
  2         15  
9 2     2   90 use Carp qw/croak carp/;
  2         10  
  2         150  
10 2     2   13 use Scalar::Util 'weaken';
  2         4  
  2         2768  
11              
12             my $DEBUG = $ENV{MOJO_THROTTLE_DEBUG};
13              
14             has iowatcher => sub { Mojo::IOLoop->singleton->reactor };
15              
16             has [qw/is_running /];
17              
18             has [qw/ period limit_period limit_run limit/] => 0;
19              
20             has delay => '0.0001';
21              
22             has autostop => 1;
23              
24              
25             sub run {
26 10     10 1 211109 my ($self, %args) = @_;
27              
28 10 50       40 my $cb = delete $args{cb} or croak 'Usage $thr->run(cb => sub {})';
29              
30             # croak unless $cb;
31             # Инициализируем стандартными значениями
32 10         334 $self->period;
33 10         474 $self->limit_period;
34 10         353 $self->limit_run;
35 10         410 $self->limit;
36              
37              
38             # Check if we are already running
39 10 100       165 if ($self->{is_running}++) {
40 1         434 carp "I am already running. Just return";
41 1         170 return;
42             }
43 9 50       25 warn "Starting new $self\n" if $DEBUG;
44              
45              
46             # defaults
47 9         440 my $iowatcher = $self->iowatcher;
48 9   50     461 $self->{count_period} ||= 0;
49 9   50     45 $self->{count_run} ||= 0;
50 9   50     33 $self->{count_total} ||= 0;
51              
52 9         28 weaken $self;
53              
54 9 100 66     375 if ($self->period && !$self->{period_timer_id}) {
55             $self->{period_timer_id} =
56             $iowatcher->recurring($self->period =>
57 1     2   214 sub { $self->{count_period} = 0; $self->emit('period'); });
  2         75  
  2         8  
58             }
59              
60             $self->{cb_timer_id} = $iowatcher->recurring(
61             $self->delay => sub {
62              
63              
64 10603 100 100 10603   265630 unless (!$self->{limit} || $self->{count_total} < $self->{limit}) {
65 1729 50       3239 warn
66             "The limit $self->{limit} is exhausted and autodrop not setted. Emitting drain\n"
67             if $DEBUG;
68 1729         4134 $self->emit('drain');
69              
70              
71 1729         12073 return;
72             }
73              
74             # Проверяем душить или нет. Нужно чтобы был задан хотя бы один из параметров limit or limit_run, без этого будет drain
75 8874   66     48375 my $cond = (
76              
77             # Не вышли за пределы лимита или лимит не задан, но задан хотя бы limit_run
78             ( $self->{count_period} < $self->{limit_period}
79             || (!$self->{limit_period})
80             )
81              
82             # Не вышли за пределы работающих параллельно задач или если такого лимита нет, то задан хотя бы limit
83             && (($self->{count_run} < $self->{limit_run})
84             || (!$self->{limit_run}))
85             );
86              
87 8874 100       22190 if ($cond) {
    100          
88              
89             #$self->{count_period}++;
90             #$self->{count_run}++;
91             #$self->{count_total}++;
92              
93 1752         3880 $cb->($self, %args);
94              
95 1752         6151 return;
96              
97             }
98              
99             # Если вышли за пределы лимита в периоде и нет запущенных коллбэков
100             elsif (!$self->{count_run}) {
101 1730         4290 $self->emit('drain');
102 1730         19236 return;
103             }
104              
105             # Если здесь, значит у нас лимиты
106 5392         9766 return;
107             }
108 9         514 );
109 9         500 return $self;
110              
111             }
112              
113             # Сообщить что мы начали
114             sub begin {
115 20     20 1 77 my $self = shift;
116 20         31 $self->{count_period}++;
117 20         28 $self->{count_run}++;
118 20         24 $self->{count_total}++;
119 20         35 return;
120             }
121              
122             # Сообщить что мы закончили
123             sub end {
124 12     12 1 357 my ($self) = @_;
125 12         18 $self->{count_run}--;
126              
127             # Вызвали end не оттуда и произошел рассинхрон, что критично
128 12 0       31 if ($DEBUG) { warn "Not running but ended" unless $self->is_running }
  0 50       0  
129 12 100       560 return unless $self->is_running;
130              
131             # Если исчерпан лимит
132 9 100 100     86 if ( (!$self->{count_run})
      66        
133             and $self->{limit}
134             and $self->{count_total} >= $self->{limit})
135             {
136 1 50       5 warn "finish event\n" if $DEBUG;
137 1         5 $self->emit('finish');
138              
139             }
140 9         31 return;
141             }
142              
143              
144             sub DESTROY {
145 10     10   6326 my ($self) = @_;
146 10 50       22 warn "Destroing $self\n" if $DEBUG;
147              
148 10         25 $self->drop();
149 10 50       75 $self->SUPER::DESTROY() if $self->can("SUPER::DESTROY");
150 10         166 return;
151             }
152              
153             # Сразу останавливает и все обнуляет
154             sub drop {
155 13     13 1 80 my ($self) = @_;
156              
157 13 50       33 warn "Dropping $self ...\n" if $DEBUG;
158              
159             # Clear my timers
160 13 50       456 if (my $iowatcher = $self->iowatcher) {
161 13 50       202 warn "Stopping(Dropping timers)\n" if $DEBUG;
162 13 100       57 $iowatcher->remove($self->{cb_timer_id}) if $self->{cb_timer_id};
163 13 100       239 $iowatcher->remove($self->{period_timer_id}) if $self->{period_timer_id};
164             }
165              
166              
167 13         93 foreach (keys %$self) {
168 111         173 delete $self->{$_};
169             }
170              
171 13         30 return $self;
172             }
173              
174              
175             # Увеличивает общий лимит на 1 или $count раз, если $count указан в качестве второго аргумента
176             # запускает (если еще не запущен, пытается в общем) сделать $self->run
177             sub add_limit {
178 0     0 1   my ($self, $count) = @_;
179 0   0       $count ||= 1;
180 0           $self->{limit} += $count;
181 0           return $self;
182             }
183              
184              
185             1;
186              
187             =head1 NAME
188              
189             MojoX::IOLoop::Throttle - throttle Mojo events
190              
191             =head1 VERSION
192              
193             Version 0.01_25. (DEV)
194              
195             =cut
196              
197              
198             =head1 SYNOPSIS
199            
200             #!/usr/bin/env perl
201             use Mojo::Base -strict;
202             use MojoX::IOLoop::Throttle;
203             $| = 1;
204              
205             # New throttle object
206             my $throttle = MojoX::IOLoop::Throttle->new(
207              
208             # Allow not more than [limit_run] running (parallel,incomplete) jobs
209             limit_run => 3,
210              
211             # do not start more than [limit_period] jobs per [period] seconds
212             period => 2,
213             limit_period => 4,
214              
215             # Simulate a little latency
216             delay => 0.05
217             );
218              
219             my $count;
220              
221             # Subscribe to finish event
222             $throttle->on(finish =>
223             sub { say "I've processed $count jobs! Bye-bye"; Mojo::IOLoop->stop; });
224              
225             # Throttle 20 jobs!
226             $throttle->limit(20);
227              
228              
229             # CallBack to throttle
230             $throttle->run(
231             cb => sub {
232             my ($thr, %args) = @_;
233              
234             # get an option passed to us
235             my $test = delete $args{test};
236              
237             # We a beginning one job
238             $thr->begin;
239              
240             my $rand_time = rand() / 5;
241             say "Job $rand_time started: $test";
242              
243             $thr->iowatcher->timer(
244             $rand_time => sub {
245             say "job $rand_time ended";
246             $count++;
247              
248             # Say that we end (to decrease limit_run count and let other job to start)
249             $thr->end();
250             }
251             );
252             },
253              
254             # Also we can pass arguments to code
255             test => 'hello'
256             );
257              
258             # Let's start
259             Mojo::IOLoop->start();
260              
261              
262              
263              
264              
265              
266              
267             =head1 DESCRIPTION
268              
269             AHTUNG!!!
270              
271             This is a very first development release. Be patient. Documentation is in progress.
272             You can find some working real-life examples in 'example' dir.
273            
274             If your are going to use this module now, use subclassing, because all method and options are experimental
275              
276              
277             =head1 OBJECT METHODS
278              
279             =head2 C
280              
281             Say that the job was started
282              
283             =head2 C
284              
285             Say that the job was ended
286              
287              
288             =head2 C
289              
290             Drop timers, counters and events
291              
292             =head2 C
293            
294             $thr->add_limit($n);
295            
296             Increase a limit attr. If agrument is omitter, increase on 1
297              
298             =head2 C
299              
300             $thr->run(cb => sub {...}, @other_params);
301            
302             Starts doing job
303              
304             =head1 ATTRIBUTES
305              
306             =head2 C
307              
308             total limit of shots
309              
310             =head2 C
311              
312             max. number of jobs running in parallell
313              
314             =head2 C
315              
316             limit number of shots per some period
317              
318             =head2 C
319              
320             time for limit_period
321              
322             =head2 C
323              
324             simulate a lattency (timer resolution)
325              
326              
327             =head1 AUTHOR
328              
329             Alex, C<< >>
330              
331             =head1 BUGS
332              
333             Please report any bugs or feature requests to C, or through
334             the web interface at L. I will be notified, and then you'll
335             automatically be notified of progress on your bug as I make changes.
336              
337              
338              
339              
340             =head1 SUPPORT
341              
342             You can find documentation for this module with the perldoc command.
343              
344             perldoc MojoX::IOLoop::Throttle
345              
346              
347             You can also look for information at:
348              
349             =over 4
350              
351             =item * RT: CPAN's request tracker (report bugs here)
352              
353             L
354              
355             =item * AnnoCPAN: Annotated CPAN documentation
356              
357             L
358              
359             =item * CPAN Ratings
360              
361             L
362              
363             =item * Search CPAN
364              
365             L
366              
367             =back
368              
369              
370             =head1 ACKNOWLEDGEMENTS
371              
372              
373             =head1 LICENSE AND COPYRIGHT
374              
375             Copyright 2012 Alex.
376              
377             This program is free software; you can redistribute it and/or modify it
378             under the terms of either: the GNU General Public License as published
379             by the Free Software Foundation; or the Artistic License.
380              
381             See http://dev.perl.org/licenses/ for more information.
382              
383              
384             =cut
385              
386             1; # End of MojoX::IOLoop::Throttle