File Coverage

blib/lib/Sub/Throttler.pm
Criterion Covered Total %
statement 293 308 95.1
branch 85 96 88.5
condition 19 33 57.5
subroutine 44 45 97.7
pod 10 10 100.0
total 451 492 91.6


line stmt bran cond sub pod time code
1             package Sub::Throttler;
2 13     13   128516 use 5.010001;
  13         31  
  13         365  
3 13     13   44 use warnings;
  13         13  
  13         271  
4 13     13   44 use strict;
  13         24  
  13         269  
5 13     13   1947 use utf8;
  13         37  
  13         47  
6 13     13   233 use Carp;
  13         14  
  13         844  
7              
8             our $VERSION = 'v0.2.2';
9              
10 13     13   5075 use Perl6::Export::Attrs;
  13         81940  
  13         59  
11 13     13   575 use Scalar::Util qw( weaken refaddr blessed );
  13         19  
  13         899  
12              
13              
14 13     13   55 use constant CALLER_SUBROUTINE => 3;
  13         13  
  13         3473  
15              
16             my (@Throttles, @Tasks, @AsapTasks, %Running);
17             my $IN_flush = 0;
18             my $IN_flush_recursion = 0;
19             my $IN_flush_ignore_recursion = 0;
20             my $IN_flush_pending = 0;
21              
22              
23             sub done_cb :Export {
24 41     41 1 87 my ($done, $cb_or_obj_or_class, @p) = @_;
25 41 100 33     141 if (ref $cb_or_obj_or_class eq 'CODE') {
    100          
    50          
26 21         25 my $cb = $cb_or_obj_or_class;
27 21     21   140 return sub { $done->(); $cb->(@p, @_) };
  21         35566  
  21         54  
28             }
29             elsif (blessed($cb_or_obj_or_class)) {
30 14         18 my $obj = $cb_or_obj_or_class;
31 14         27 weaken($obj);
32 14         21 my $method = shift @p;
33 14 50 66     64 croak 'second param must be $method'
      33        
34             if !$method || (ref $method && ref $method ne 'CODE');
35 14 100   14   108 return sub { $done->(); $obj && $obj->$method(@p, @_) };
  14         58277  
  14         88  
36             }
37             elsif (defined $cb_or_obj_or_class && !ref $cb_or_obj_or_class) {
38 6         5 my $class = $cb_or_obj_or_class;
39 6         8 my $method = shift @p;
40 6 50 33     19 croak 'second param must be $method'
      33        
41             if !$method || (ref $method && ref $method ne 'CODE');
42 6     6   45 return sub { $done->(); $class->$method(@p, @_) };
  6         20538  
  6         23  
43             }
44             else {
45 0         0 croak 'first param must be $cb or $obj or $class';
46             }
47 13     13   60 }
  13         15  
  13         63  
48              
49             sub throttle_add :Export {
50 105     105 1 2580 my ($throttle, $target) = @_;
51 105 100       230 croak 'require 2 params' if 2 != @_;
52 102 100       196 croak 'throttle must be an object' if !ref $throttle;
53 99 100       275 croak 'target must be CODE' if ref $target ne 'CODE';
54 84         128 push @Throttles, [$throttle, $target];
55 84         126 return $throttle;
56 13     13   2977 }
  13         21  
  13         42  
57              
58             sub throttle_del :Export {
59 59     59 1 9958 my ($throttle) = @_;
60 59 100       103 @Throttles = grep { $throttle && $_->[0] != $throttle } @Throttles;
  32         241  
61 59         100 throttle_flush();
62 59         75 return;
63 13     13   2615 }
  13         17  
  13         44  
64              
65             sub throttle_flush :Export {
66 361 100   361 1 1997 if ($IN_flush) {
67 76 100       150 if (!$IN_flush_ignore_recursion) {
68 46         43 $IN_flush_recursion = 1;
69             }
70 76         102 return;
71             }
72 285         231 $IN_flush = 1;
73 285         254 $IN_flush_recursion = 0;
74 285         199 $IN_flush_ignore_recursion = 0;
75 285         202 $IN_flush_pending = 0;
76              
77 285         443 for my $tasks (\@AsapTasks, \@Tasks) {
78 570         375 my @tasks = @{$tasks};
  570         690  
79 570         444 @{$tasks} = ();
  570         519  
80 570         373 my @delayed;
81             TASK:
82 570         530 for my $task (@tasks) {
83 197         153 my ($done, $name, $this, $code, @params) = @{$task};
  197         326  
84 197         303 my $id = refaddr $done;
85 197 100       304 if (!defined $this) {
86 2         3 $done->(); # release $done
87 2         2 next;
88             }
89 195         144 my %acquired;
90 195         211 for (@Throttles) {
91 205         145 my ($throttle, $target) = @{$_};
  205         217  
92 205         381 my $resources = $target->($this, $name, @params);
93 205 100       729 next if !defined $resources;
94 196 100       379 die "Sub::Throttler: target returns not a HASHREF: $resources\n"
95             if ref $resources ne 'HASH';
96 191 100       118 next if !keys %{$resources};
  191         383  
97 190         159 my $acquired = 0;
98 190         163 while (my ($key, $quantity) = each %{$resources}) {
  285         556  
99 198 100       277 die "Sub::Throttler: target returns bad quantity for '$key': $quantity\n"
100             if ref $quantity;
101 194 100       398 if ($throttle->try_acquire($id, $key, $quantity)) {
102 95         137 $acquired++;
103             }
104             else {
105 92         98 last;
106             }
107             }
108 179 100       215 if ($acquired == keys %{$resources}) {
  179         249  
109 87         295 $acquired{$throttle} = $throttle;
110             }
111             else {
112 92         70 $IN_flush_ignore_recursion = 1;
113 92 100       152 if ($acquired) {
114 2         5 $throttle->release_unused($id);
115             }
116 92         154 for (values %acquired) {
117 13         25 $_->release_unused($id);
118             }
119 92         75 $IN_flush_ignore_recursion = 0;
120 92         73 push @delayed, $task;
121 92         228 next TASK;
122             }
123             }
124 87         168 $Running{$id} = [values %acquired];
125 87         154 _run_task($this, $code, $done, @params);
126             }
127 554         473 @{$tasks} = (@delayed, @{$tasks}); # while _run_task() new tasks may be added
  554         993  
  554         480  
128             }
129              
130 269         269 $IN_flush = 0;
131 269 100       439 goto &throttle_flush if $IN_flush_recursion;
132 236         246 return;
133 13     13   5953 }
  13         20  
  13         42  
134              
135             sub throttle_it :Export {
136 4     4 1 34 return _it(0, 0, @_);
137 13     13   2196 }
  13         17  
  13         39  
138              
139             sub throttle_it_asap :Export {
140 1     1 1 16 return _it(0, 1, @_);
141 13     13   2176 }
  13         16  
  13         47  
142              
143             sub throttle_it_sync :Export {
144 0     0 1 0 return _it(1, 0, @_);
145 13     13   1995 }
  13         24  
  13         38  
146              
147             sub throttle_me :Export {
148 175     175 1 9429 return _me(\@Tasks, \@_);
149 13     13   2119 }
  13         15  
  13         37  
150              
151             sub throttle_me_asap :Export {
152 19     19 1 49 return _me(\@AsapTasks, \@_);
153 13     13   2095 }
  13         25  
  13         29  
154              
155             sub throttle_me_sync :Export {
156 64     64 1 8135 my ($done, $failed);
157              
158 0         0 my ($this, @params);
159 64         154 my $func = (caller 1)[CALLER_SUBROUTINE];
160 64 50       1567 croak 'impossible to throttle anonymous function' if !defined &{$func};
  64         209  
161 64         374 my ($pkg, $name) = $func =~ /\A(.*)::(.*)\z/ms;
162 64         78 my $is_method = eval { local $SIG{__DIE__}; $_[0]->isa($pkg) };
  64         159  
  64         430  
163 64 100       102 if ($is_method) {
164 14         24 ($this, @params) = @_;
165 14         62 $done = Sub::Throttler::__done->new($this.q{->}.$name);
166             }
167             else {
168 50         97 ($this, @params) = (q{}, @_);
169 50         52 $name = $func;
170 50         114 $done = Sub::Throttler::__done->new($func);
171             }
172              
173 64         75 my @old = ($IN_flush, $IN_flush_ignore_recursion);
174 64         70 ($IN_flush, $IN_flush_ignore_recursion) = (1, 1);
175 64         83 my $id = refaddr $done;
176 75         62 ACQUIRE_ALL:
177             {
178 64         51 my %acquired;
179 75         118 for (@Throttles) {
180 86         89 my ($throttle, $target) = @{$_};
  86         103  
181 86         196 my $resources = $target->($this, $name, @params);
182 86 100       367 next if !defined $resources;
183 73 100       163 die "Sub::Throttler: target returns not a HASHREF: $resources\n"
184             if ref $resources ne 'HASH';
185 68 100       49 next if !keys %{$resources};
  68         146  
186 67         53 while (my ($key, $quantity) = each %{$resources}) {
  113         303  
187 76 100       137 die "Sub::Throttler: target returns bad quantity for '$key': $quantity\n"
188             if ref $quantity;
189 72 100       157 if ($throttle->try_acquire($id, $key, $quantity)) {
190 46         130 $acquired{$throttle} = $throttle;
191             }
192             else {
193 19         31 eval { ## no critic (RequireCheckingReturnValueOfEval)
194 19         47 local $SIG{__DIE__};
195 19         61 $throttle->acquire($id, $key, $quantity);
196 11         115 $acquired{$throttle} = $throttle;
197             };
198 19         4068 $failed = $@;
199 19         61 for (values %acquired) {
200 15         68 $_->release_unused($id);
201             }
202 19 100       54 if ($failed) {
203 8         16 last ACQUIRE_ALL;
204             } else {
205 11         62 redo ACQUIRE_ALL;
206             }
207             }
208             }
209             }
210 40         114 $Running{$id} = [values %acquired];
211             }
212 48         63 ($IN_flush, $IN_flush_ignore_recursion) = @old;
213             # while waiting for resources needed for this sync call some resources
214             # needed for queued async calls may be released, but in this case
215             # throttle_flush() wasn't called because it was blocked, so let's
216             # ensure it will be called no late than this sync call will $done->()
217 48         50 $IN_flush_pending = 1;
218              
219 48 100       68 if ($failed) {
220 8         50 croak $failed;
221             } else {
222 40         374 return $done;
223             }
224 13     13   6315 }
  13         22  
  13         39  
225              
226             sub _done { ## no critic (ProhibitUnusedPrivateSubroutines)
227 127     127   165 my ($id, $is_used) = @_;
228 127   100     423 $is_used ||= 1 == @_;
229 127   100     117 for my $throttle (@{ delete $Running{$id} // [] }) {
  127         433  
230 105 100       154 if ($is_used) {
231 90         285 $throttle->release($id);
232             } else {
233 15         33 $throttle->release_unused($id);
234             }
235             }
236 127 100       244 if ($IN_flush_pending) {
237 24         44 throttle_flush();
238             }
239 127         342 return;
240             }
241              
242             sub _it {
243 5     5   31 my ($is_sync, $is_asap, $func) = @_;
244 5 50 33     30 croak 'require function name' if !$func || ref $func;
245 5 100       21 if ($func !~ /::/ms) {
246 3         11 $func = caller(1) . q{::} . $func;
247             }
248 5 50       54 croak 'no such function: '.$func if !defined &{$func};
  5         23  
249 5         6 my $orig = \&{$func};
  5         13  
250             ## no critic (ProhibitNoWarnings ProhibitStringyEval RequireCheckingReturnValueOfEval ProhibitImplicitNewlines RequireCarping)
251 13     13   3702 no warnings 'redefine';
  13         17  
  13         4568  
252 5 100 66 46   839 eval 'sub '.$func.' {
  46 50 100 10   5094  
  32 100 33     65  
  15 50 100     22  
  15 50       44  
  15 0       383  
  0         0  
  0         0  
  0         0  
  0         0  
  14         34  
  10         28  
  10         210  
  10         30  
  10         89  
  10         19  
  5         8  
  5         15  
  5         108  
  0            
  0            
  0            
  0            
  0            
  0            
  0            
  0            
253             if (!'.$is_sync.' && @_ && ref $_[-1] eq "CODE") {
254             my $done = &throttle_me'.($is_asap ? '_asap' : q{}).' || return;
255             my $cb = pop;
256             $orig->(@_, done_cb($done, $cb));
257             return;
258             } elsif (wantarray) {
259             my $done = &throttle_me_sync;
260             my @res = &$orig;
261             $done->();
262             return @res;
263             } else {
264             my $done = &throttle_me_sync;
265             my $res = &$orig;
266             $done->();
267             return $res;
268             }
269             }; 1' or die $@;
270 5         14 return $orig;
271             }
272              
273             sub _me {
274 194     194   190 my ($queue, $args) = @_;
275 194         238 for (0, 1) {
276 332 100       686 if (ref $args->[$_] eq 'Sub::Throttler::__done') {
277 87         67 return splice @{$args}, $_, 1;
  87         597  
278             }
279             }
280 107         259 my $func = (caller 2)[CALLER_SUBROUTINE];
281 107 100       2122 croak 'impossible to throttle anonymous function' if !defined &{$func};
  107         360  
282 105         100 my $code = \&{$func};
  105         232  
283 105         556 my ($pkg, $name) = $func =~ /\A(.*)::(.*)\z/ms;
284 105         116 my $is_method = eval { local $SIG{__DIE__}; $args->[0]->isa($pkg) };
  105         257  
  105         618  
285 105 100       175 if ($is_method) {
286 34         34 my $self = shift @{$args};
  34         48  
287 34         161 my $done = Sub::Throttler::__done->new($self.q{->}.$name);
288 34         33 push @{$queue}, [$done, $name, $self, $code, @{$args}];
  34         39  
  34         53  
289 34 100       65 if (ref $self) {
290 28         77 weaken $queue->[-1][2];
291             }
292             }
293             else {
294 71         178 my $done = Sub::Throttler::__done->new($func);
295 71         63 push @{$queue}, [$done, $func, q{}, $code, @{$args}];
  71         89  
  71         152  
296             }
297 105         154 throttle_flush();
298 89         590 return;
299             }
300              
301             # should be used only from tests
302             sub _reset { ## no critic (ProhibitUnusedPrivateSubroutines)
303 45     45   7501 $IN_flush = 0;
304 45         46 $IN_flush_recursion = 0;
305 45         35 $IN_flush_ignore_recursion = 0;
306 45         371 @Throttles = @Tasks = @AsapTasks = %Running = ();
307 45         54 return;
308             }
309              
310             sub _run_task {
311 87     87   124 my ($this, $code, $done, @params) = @_;
312 13     13   58 no strict 'refs';
  13         12  
  13         731  
313 87 100       122 if ($this) {
314 31         275 $this->$code($done, @params);
315             } else {
316 56         404 $code->($done, @params);
317             }
318 87         359 return;
319             }
320              
321              
322             package Sub::Throttler::__done; ## no critic (ProhibitMultiplePackages)
323 13     13   48 use Carp;
  13         16  
  13         693  
324              
325 13     13   53 use Scalar::Util qw( refaddr );
  13         18  
  13         2385  
326              
327             my (%Check, %Name);
328              
329             sub new {
330 169     169   217 my (undef, $name) = @_;
331 169         136 my $id;
332             my $done = bless sub {
333 129 100   129   29739 if ($Check{$id}) {
334 2         56 croak "Sub::Throttler: $name: \$done->() already called";
335             }
336 127         187 $Check{$id}=1;
337 127         282 Sub::Throttler::_done($id, @_); ## no critic(ProtectPrivateSubs)
338 169         713 }, __PACKAGE__;
339 169         367 $id = refaddr $done;
340 169         324 $Name{$id} = $name;
341 169         240 return $done;
342             }
343              
344             sub DESTROY {
345 148     148   19966 my $done = shift;
346 148         272 my $id = refaddr $done;
347 148         248 my $name = delete $Name{$id};
348 148 100       307 if (!delete $Check{$id}) {
349 42         370 carp "Sub::Throttler: $name: \$done->() was not called";
350             }
351 148         24333 return;
352             }
353              
354              
355             1; # Magic true value required at end of module
356             __END__