File Coverage

blib/lib/Sub/Throttler.pm
Criterion Covered Total %
statement 293 308 95.1
branch 85 96 88.5
condition 19 33 57.5
subroutine 44 45 97.7
pod 10 10 100.0
total 451 492 91.6


line stmt bran cond sub pod time code
1             package Sub::Throttler;
2 13     13   193109 use 5.010001;
  13         36  
  13         436  
3 13     13   54 use warnings;
  13         19  
  13         317  
4 13     13   89 use strict;
  13         38  
  13         523  
5 13     13   3071 use utf8;
  13         54  
  13         63  
6 13     13   388 use Carp;
  13         21  
  13         1177  
7              
8             our $VERSION = 'v0.2.3';
9              
10 13     13   10176 use Perl6::Export::Attrs;
  13         114203  
  13         73  
11 13     13   713 use Scalar::Util qw( weaken refaddr blessed );
  13         22  
  13         1088  
12              
13              
14 13     13   66 use constant CALLER_SUBROUTINE => 3;
  13         18  
  13         4391  
15              
16             my (@Throttles, @Tasks, @AsapTasks, %Running);
17             my $IN_flush = 0;
18             my $IN_flush_recursion = 0;
19             my $IN_flush_ignore_recursion = 0;
20             my $IN_flush_pending = 0;
21              
22              
23             sub done_cb :Export {
24 41     41 1 97 my ($done, $cb_or_obj_or_class, @p) = @_;
25 41 100 33     162 if (ref $cb_or_obj_or_class eq 'CODE') {
    100          
    50          
26 21         18 my $cb = $cb_or_obj_or_class;
27 21     21   165 return sub { $done->(); $cb->(@p, @_) };
  21         33399  
  21         51  
28             }
29             elsif (blessed($cb_or_obj_or_class)) {
30 14         17 my $obj = $cb_or_obj_or_class;
31 14         29 weaken($obj);
32 14         16 my $method = shift @p;
33 14 50 66     68 croak 'second param must be $method'
      33        
34             if !$method || (ref $method && ref $method ne 'CODE');
35 14 100   14   119 return sub { $done->(); $obj && $obj->$method(@p, @_) };
  14         57289  
  14         88  
36             }
37             elsif (defined $cb_or_obj_or_class && !ref $cb_or_obj_or_class) {
38 6         8 my $class = $cb_or_obj_or_class;
39 6         7 my $method = shift @p;
40 6 50 33     21 croak 'second param must be $method'
      33        
41             if !$method || (ref $method && ref $method ne 'CODE');
42 6     6   55 return sub { $done->(); $class->$method(@p, @_) };
  6         20487  
  6         26  
43             }
44             else {
45 0         0 croak 'first param must be $cb or $obj or $class';
46             }
47 13     13   68 }
  13         20  
  13         71  
48              
49             sub throttle_add :Export {
50 105     105 1 2630 my ($throttle, $target) = @_;
51 105 100       267 croak 'require 2 params' if 2 != @_;
52 102 100       214 croak 'throttle must be an object' if !ref $throttle;
53 99 100       308 croak 'target must be CODE' if ref $target ne 'CODE';
54 84         144 push @Throttles, [$throttle, $target];
55 84         166 return $throttle;
56 13     13   3569 }
  13         24  
  13         50  
57              
58             sub throttle_del :Export {
59 59     59 1 10791 my ($throttle) = @_;
60 59 100       104 @Throttles = grep { $throttle && $_->[0] != $throttle } @Throttles;
  32         254  
61 59         141 throttle_flush();
62 59         90 return;
63 13     13   2916 }
  13         18  
  13         52  
64              
65             sub throttle_flush :Export {
66 360 100   360 1 2110 if ($IN_flush) {
67 75 100       145 if (!$IN_flush_ignore_recursion) {
68 46         50 $IN_flush_recursion = 1;
69             }
70 75         110 return;
71             }
72 285         242 $IN_flush = 1;
73 285         264 $IN_flush_recursion = 0;
74 285         213 $IN_flush_ignore_recursion = 0;
75 285         200 $IN_flush_pending = 0;
76              
77 285         460 for my $tasks (\@AsapTasks, \@Tasks) {
78 570         472 my @tasks = @{$tasks};
  570         692  
79 570         482 @{$tasks} = ();
  570         544  
80 570         398 my @delayed;
81             TASK:
82 570         730 for my $task (@tasks) {
83 197         142 my ($done, $name, $this, $code, @params) = @{$task};
  197         315  
84 197         320 my $id = refaddr $done;
85 197 100       324 if (!defined $this) {
86 2         4 $done->(); # release $done
87 2         3 next;
88             }
89 195         157 my %acquired;
90 195         197 for (@Throttles) {
91 205         147 my ($throttle, $target) = @{$_};
  205         274  
92 205         369 my $resources = $target->($this, $name, @params);
93 205 100       704 next if !defined $resources;
94 196 100       385 die "Sub::Throttler: target returns not a HASHREF: $resources\n"
95             if ref $resources ne 'HASH';
96 191 100       121 next if !keys %{$resources};
  191         365  
97 190         168 my $acquired = 0;
98 190         130 while (my ($key, $quantity) = each %{$resources}) {
  287         641  
99 200 100       296 die "Sub::Throttler: target returns bad quantity for '$key': $quantity\n"
100             if ref $quantity;
101 196 100       444 if ($throttle->try_acquire($id, $key, $quantity)) {
102 97         200 $acquired++;
103             }
104             else {
105 92         91 last;
106             }
107             }
108 179 100       153 if ($acquired == keys %{$resources}) {
  179         268  
109 87         288 $acquired{$throttle} = $throttle;
110             }
111             else {
112 92         73 $IN_flush_ignore_recursion = 1;
113 92 100       124 if ($acquired) {
114 2         6 $throttle->release_unused($id);
115             }
116 92         145 for (values %acquired) {
117 13         31 $_->release_unused($id);
118             }
119 92         84 $IN_flush_ignore_recursion = 0;
120 92         85 push @delayed, $task;
121 92         256 next TASK;
122             }
123             }
124 87         183 $Running{$id} = [values %acquired];
125 87         149 _run_task($this, $code, $done, @params);
126             }
127 554         473 @{$tasks} = (@delayed, @{$tasks}); # while _run_task() new tasks may be added
  554         945  
  554         506  
128             }
129              
130 269         294 $IN_flush = 0;
131 269 100       505 goto &throttle_flush if $IN_flush_recursion;
132 236         312 return;
133 13     13   7790 }
  13         25  
  13         65  
134              
135             sub throttle_it :Export {
136 4     4 1 36 return _it(0, 0, @_);
137 13     13   2539 }
  13         21  
  13         45  
138              
139             sub throttle_it_asap :Export {
140 1     1 1 16 return _it(0, 1, @_);
141 13     13   2494 }
  13         21  
  13         54  
142              
143             sub throttle_it_sync :Export {
144 0     0 1 0 return _it(1, 0, @_);
145 13     13   6907 }
  13         15  
  13         48  
146              
147             sub throttle_me :Export {
148 175     175 1 9254 return _me(\@Tasks, \@_);
149 13     13   2426 }
  13         19  
  13         45  
150              
151             sub throttle_me_asap :Export {
152 19     19 1 47 return _me(\@AsapTasks, \@_);
153 13     13   2287 }
  13         25  
  13         41  
154              
155             sub throttle_me_sync :Export {
156 64     64 1 13064 my ($done, $failed);
157              
158 0         0 my ($this, @params);
159 64         196 my $func = (caller 1)[CALLER_SUBROUTINE];
160 64 50       1910 croak 'impossible to throttle anonymous function' if !defined &{$func};
  64         695  
161 64         443 my ($pkg, $name) = $func =~ /\A(.*)::(.*)\z/ms;
162 64         92 my $is_method = eval { local $SIG{__DIE__}; $_[0]->isa($pkg) };
  64         198  
  64         537  
163 64 100       146 if ($is_method) {
164 14         28 ($this, @params) = @_;
165 14         77 $done = Sub::Throttler::__done->new($this.q{->}.$name);
166             }
167             else {
168 50         123 ($this, @params) = (q{}, @_);
169 50         65 $name = $func;
170 50         197 $done = Sub::Throttler::__done->new($func);
171             }
172              
173 64         109 my @old = ($IN_flush, $IN_flush_ignore_recursion);
174 64         80 ($IN_flush, $IN_flush_ignore_recursion) = (1, 1);
175 64         114 my $id = refaddr $done;
176 75         78 ACQUIRE_ALL:
177             {
178 64         63 my %acquired;
179 75         139 for (@Throttles) {
180 86         104 my ($throttle, $target) = @{$_};
  86         126  
181 86         232 my $resources = $target->($this, $name, @params);
182 86 100       447 next if !defined $resources;
183 73 100       228 die "Sub::Throttler: target returns not a HASHREF: $resources\n"
184             if ref $resources ne 'HASH';
185 68 100       53 next if !keys %{$resources};
  68         181  
186 67         75 while (my ($key, $quantity) = each %{$resources}) {
  110         348  
187 73 100       194 die "Sub::Throttler: target returns bad quantity for '$key': $quantity\n"
188             if ref $quantity;
189 69 100       226 if ($throttle->try_acquire($id, $key, $quantity)) {
190 43         138 $acquired{$throttle} = $throttle;
191             }
192             else {
193 19         34 eval { ## no critic (RequireCheckingReturnValueOfEval)
194 19         57 local $SIG{__DIE__};
195 19         66 $throttle->acquire($id, $key, $quantity);
196 11         101 $acquired{$throttle} = $throttle;
197             };
198 19         5174 $failed = $@;
199 19         61 for (values %acquired) {
200 14         55 $_->release_unused($id);
201             }
202 19 100       46 if ($failed) {
203 8         23 last ACQUIRE_ALL;
204             } else {
205 11         64 redo ACQUIRE_ALL;
206             }
207             }
208             }
209             }
210 40         128 $Running{$id} = [values %acquired];
211             }
212 48         77 ($IN_flush, $IN_flush_ignore_recursion) = @old;
213             # while waiting for resources needed for this sync call some resources
214             # needed for queued async calls may be released, but in this case
215             # throttle_flush() wasn't called because it was blocked, so let's
216             # ensure it will be called no late than this sync call will $done->()
217 48         50 $IN_flush_pending = 1;
218              
219 48 100       84 if ($failed) {
220 8         67 croak $failed;
221             } else {
222 40         370 return $done;
223             }
224 13     13   12375 }
  13         27  
  13         60  
225              
226             sub _done { ## no critic (ProhibitUnusedPrivateSubroutines)
227 127     127   204 my ($id, $is_used) = @_;
228 127   100     451 $is_used ||= 1 == @_;
229 127   100     116 for my $throttle (@{ delete $Running{$id} // [] }) {
  127         445  
230 105 100       176 if ($is_used) {
231 90         348 $throttle->release($id);
232             } else {
233 15         35 $throttle->release_unused($id);
234             }
235             }
236 127 100       258 if ($IN_flush_pending) {
237 24         51 throttle_flush();
238             }
239 127         425 return;
240             }
241              
242             sub _it {
243 5     5   8 my ($is_sync, $is_asap, $func) = @_;
244 5 50 33     28 croak 'require function name' if !$func || ref $func;
245 5 100       19 if ($func !~ /::/ms) {
246 3         13 $func = caller(1) . q{::} . $func;
247             }
248 5 50       50 croak 'no such function: '.$func if !defined &{$func};
  5         20  
249 5         4 my $orig = \&{$func};
  5         13  
250             ## no critic (ProhibitNoWarnings ProhibitStringyEval RequireCheckingReturnValueOfEval ProhibitImplicitNewlines RequireCarping)
251 13     13   4621 no warnings 'redefine';
  13         20  
  13         6145  
252 5 100 66 46   718 eval 'sub '.$func.' {
  46 50 100 10   5830  
  32 100 33     65  
  15 50 100     20  
  15 50       40  
  15 0       388  
  0         0  
  0         0  
  0         0  
  0         0  
  14         33  
  10         25  
  10         208  
  10         34  
  10         82  
  10         17  
  5         8  
  5         10  
  5         118  
  0            
  0            
  0            
  0            
  0            
  0            
  0            
  0            
253             if (!'.$is_sync.' && @_ && ref $_[-1] eq "CODE") {
254             my $done = &throttle_me'.($is_asap ? '_asap' : q{}).' || return;
255             my $cb = pop;
256             $orig->(@_, done_cb($done, $cb));
257             return;
258             } elsif (wantarray) {
259             my $done = &throttle_me_sync;
260             my @res = &$orig;
261             $done->();
262             return @res;
263             } else {
264             my $done = &throttle_me_sync;
265             my $res = &$orig;
266             $done->();
267             return $res;
268             }
269             }; 1' or die $@;
270 5         11 return $orig;
271             }
272              
273             sub _me {
274 194     194   227 my ($queue, $args) = @_;
275 194         259 for (0, 1) {
276 332 100       681 if (ref $args->[$_] eq 'Sub::Throttler::__done') {
277 87         74 return splice @{$args}, $_, 1;
  87         624  
278             }
279             }
280 107         271 my $func = (caller 2)[CALLER_SUBROUTINE];
281 107 100       2216 croak 'impossible to throttle anonymous function' if !defined &{$func};
  107         341  
282 105         101 my $code = \&{$func};
  105         203  
283 105         580 my ($pkg, $name) = $func =~ /\A(.*)::(.*)\z/ms;
284 105         116 my $is_method = eval { local $SIG{__DIE__}; $args->[0]->isa($pkg) };
  105         270  
  105         647  
285 105 100       190 if ($is_method) {
286 34         30 my $self = shift @{$args};
  34         53  
287 34         146 my $done = Sub::Throttler::__done->new($self.q{->}.$name);
288 34         32 push @{$queue}, [$done, $name, $self, $code, @{$args}];
  34         38  
  34         60  
289 34 100       74 if (ref $self) {
290 28         93 weaken $queue->[-1][2];
291             }
292             }
293             else {
294 71         165 my $done = Sub::Throttler::__done->new($func);
295 71         59 push @{$queue}, [$done, $func, q{}, $code, @{$args}];
  71         96  
  71         147  
296             }
297 105         175 throttle_flush();
298 89         635 return;
299             }
300              
301             # should be used only from tests
302             sub _reset { ## no critic (ProhibitUnusedPrivateSubroutines)
303 45     45   9967 $IN_flush = 0;
304 45         56 $IN_flush_recursion = 0;
305 45         49 $IN_flush_ignore_recursion = 0;
306 45         502 @Throttles = @Tasks = @AsapTasks = %Running = ();
307 45         72 return;
308             }
309              
310             sub _run_task {
311 87     87   130 my ($this, $code, $done, @params) = @_;
312 13     13   68 no strict 'refs';
  13         16  
  13         909  
313 87 100       114 if ($this) {
314 31         275 $this->$code($done, @params);
315             } else {
316 56         415 $code->($done, @params);
317             }
318 87         361 return;
319             }
320              
321              
322             package Sub::Throttler::__done; ## no critic (ProhibitMultiplePackages)
323 13     13   61 use Carp;
  13         15  
  13         898  
324              
325 13     13   62 use Scalar::Util qw( refaddr );
  13         25  
  13         3088  
326              
327             my (%Check, %Name);
328              
329             sub new {
330 169     169   255 my (undef, $name) = @_;
331 169         124 my $id;
332             my $done = bless sub {
333 129 100   129   29719 if ($Check{$id}) {
334 2         37 croak "Sub::Throttler: $name: \$done->() already called";
335             }
336 127         205 $Check{$id}=1;
337 127         300 Sub::Throttler::_done($id, @_); ## no critic(ProtectPrivateSubs)
338 169         808 }, __PACKAGE__;
339 169         388 $id = refaddr $done;
340 169         352 $Name{$id} = $name;
341 169         262 return $done;
342             }
343              
344             sub DESTROY {
345 148     148   22662 my $done = shift;
346 148         311 my $id = refaddr $done;
347 148         258 my $name = delete $Name{$id};
348 148 100       440 if (!delete $Check{$id}) {
349 42         437 carp "Sub::Throttler: $name: \$done->() was not called";
350             }
351 148         31543 return;
352             }
353              
354              
355             1; # Magic true value required at end of module
356             __END__