File Coverage

blib/lib/Sub/Throttler.pm
Criterion Covered Total %
statement 292 307 95.1
branch 85 96 88.5
condition 19 33 57.5
subroutine 44 45 97.7
pod 10 10 100.0
total 450 491 91.6


line stmt bran cond sub pod time code
1             package Sub::Throttler;
2 13     13   1052330 use 5.010001;
  13         39  
3 13     13   72 use warnings;
  13         20  
  13         406  
4 13     13   51 use strict;
  13         21  
  13         239  
5 13     13   2014 use utf8;
  13         43  
  13         66  
6 13     13   278 use Carp;
  13         18  
  13         1004  
7              
8             our $VERSION = 'v0.2.10';
9              
10 13     13   3537 use Export::Attrs;
  13         54828  
  13         78  
11 13     13   912 use Scalar::Util qw( weaken refaddr blessed );
  13         18  
  13         905  
12              
13              
14 13     13   55 use constant CALLER_SUBROUTINE => 3;
  13         19  
  13         3883  
15              
16             my (@Throttles, @Tasks, @AsapTasks, %Running);
17             my $IN_flush = 0;
18             my $IN_flush_recursion = 0;
19             my $IN_flush_ignore_recursion = 0;
20             my $IN_flush_pending = 0;
21              
22              
23             sub done_cb :Export {
24 41     41 1 141 my ($done, $cb_or_obj_or_class, @p) = @_;
25 41 100 33     582 if (ref $cb_or_obj_or_class eq 'CODE') {
    100          
    50          
26 21         28 my $cb = $cb_or_obj_or_class;
27 21     21   171 return sub { $done->(); $cb->(@p, @_) };
  21         236801  
  21         59  
28             }
29             elsif (blessed($cb_or_obj_or_class)) {
30 14         22 my $obj = $cb_or_obj_or_class;
31 14         36 weaken($obj);
32 14         22 my $method = shift @p;
33 14 50 66     88 croak 'second param must be $method'
      33        
34             if !$method || (ref $method && ref $method ne 'CODE');
35 14 100   14   123 return sub { $done->(); $obj && $obj->$method(@p, @_) };
  14         590861  
  14         135  
36             }
37             elsif (defined $cb_or_obj_or_class && !ref $cb_or_obj_or_class) {
38 6         13 my $class = $cb_or_obj_or_class;
39 6         9 my $method = shift @p;
40 6 50 33     34 croak 'second param must be $method'
      33        
41             if !$method || (ref $method && ref $method ne 'CODE');
42 6     6   57 return sub { $done->(); $class->$method(@p, @_) };
  6         245891  
  6         44  
43             }
44             else {
45 0         0 croak 'first param must be $cb or $obj or $class';
46             }
47 13     13   66 }
  13         15  
  13         99  
48              
49             sub throttle_add :Export {
50 111     111 1 2670 my ($throttle, $target) = @_;
51 111 100       329 croak 'require 2 params' if 2 != @_;
52 107 100       229 croak 'throttle must be an object' if !ref $throttle;
53 104 100       354 croak 'target must be CODE' if ref $target ne 'CODE';
54 84         145 push @Throttles, [$throttle, $target];
55 84         138 return $throttle;
56 13     13   3872 }
  13         19  
  13         44  
57              
58             sub throttle_del :Export {
59 59     59 1 62867 my ($throttle) = @_;
60 59 100       105 @Throttles = grep { $throttle && $_->[0] != $throttle } @Throttles;
  32         271  
61 59         133 throttle_flush();
62 59         77 return;
63 13     13   2968 }
  13         17  
  13         44  
64              
65             sub throttle_flush :Export {
66 360 100   360 1 101827 if ($IN_flush) {
67 75 100       137 if (!$IN_flush_ignore_recursion) {
68 46         52 $IN_flush_recursion = 1;
69             }
70 75         121 return;
71             }
72 285         254 $IN_flush = 1;
73 285         324 $IN_flush_recursion = 0;
74 285         229 $IN_flush_ignore_recursion = 0;
75 285         234 $IN_flush_pending = 0;
76              
77 285         486 for my $tasks (\@AsapTasks, \@Tasks) {
78 570         562 my @tasks = @{$tasks};
  570         800  
79 570         505 @{$tasks} = ();
  570         704  
80 570         447 my @delayed;
81             TASK:
82 570         685 for my $task (@tasks) {
83 197         189 my ($done, $name, $this, $code, @params) = @{$task};
  197         553  
84 197         544 my $id = refaddr $done;
85 197 100       359 if (!defined $this) {
86 2         6 $done->(); # release $done
87 2         5 next;
88             }
89 195         189 my %acquired;
90 195         295 for (@Throttles) {
91 205         160 my ($throttle, $target) = @{$_};
  205         380  
92 205         480 my $resources = $target->($this, $name, @params);
93 205 100       896 next if !defined $resources;
94 196 100       438 die "Sub::Throttler: target returns not a HASHREF: $resources\n"
95             if ref $resources ne 'HASH';
96 191 100       135 next if !keys %{$resources};
  191         498  
97 190         178 my $acquired = 0;
98 190         188 while (my ($key, $quantity) = each %{$resources}) {
  286         793  
99 199 100       319 die "Sub::Throttler: target returns bad quantity for '$key': $quantity\n"
100             if ref $quantity;
101 195 100       519 if ($throttle->try_acquire($id, $key, $quantity)) {
102 96         165 $acquired++;
103             }
104             else {
105 92         177 last;
106             }
107             }
108 179 100       157 if ($acquired == keys %{$resources}) {
  179         351  
109 87         410 $acquired{$throttle} = $throttle;
110             }
111             else {
112 92         86 $IN_flush_ignore_recursion = 1;
113 92 100       139 if ($acquired) {
114 2         6 $throttle->release_unused($id);
115             }
116 92         169 for (values %acquired) {
117 13         35 $_->release_unused($id);
118             }
119 92         72 $IN_flush_ignore_recursion = 0;
120 92         94 push @delayed, $task;
121 92         312 next TASK;
122             }
123             }
124 87         228 $Running{$id} = [values %acquired];
125 87         206 _run_task($this, $code, $done, @params);
126             }
127 554         486 @{$tasks} = (@delayed, @{$tasks}); # while _run_task() new tasks may be added
  554         1013  
  554         563  
128             }
129              
130 269         274 $IN_flush = 0;
131 269 100       607 goto &throttle_flush if $IN_flush_recursion;
132 236         329 return;
133 13     13   6804 }
  13         19  
  13         54  
134              
135             sub throttle_it :Export {
136 4     4 1 36 return _it(0, 0, @_);
137 13     13   2520 }
  13         18  
  13         45  
138              
139             sub throttle_it_asap :Export {
140 1     1 1 18 return _it(0, 1, @_);
141 13     13   2390 }
  13         19  
  13         61  
142              
143             sub throttle_it_sync :Export {
144 0     0 1 0 return _it(1, 0, @_);
145 13     13   2387 }
  13         18  
  13         48  
146              
147             sub throttle_me :Export {
148 175     175 1 64160 return _me(\@Tasks, \@_);
149 13     13   2369 }
  13         17  
  13         1096  
150              
151             sub throttle_me_asap :Export {
152 19     19 1 53 return _me(\@AsapTasks, \@_);
153 13     13   2213 }
  13         19  
  13         37  
154              
155             sub throttle_me_sync :Export {
156 64     64 1 6674 my ($done, $failed);
157              
158 0         0 my ($this, @params);
159 64         158 my $func = (caller 1)[CALLER_SUBROUTINE];
160 64 50       1305 croak 'impossible to throttle anonymous function' if !defined &{$func};
  64         208  
161 64         315 my ($pkg, $name) = $func =~ /\A(.*)::(.*)\z/ms;
162 64         70 my $is_method = eval { local $SIG{__DIE__}; $_[0]->isa($pkg) };
  64         176  
  64         447  
163 64 100       106 if ($is_method) {
164 14         28 ($this, @params) = @_;
165 14         60 $done = Sub::Throttler::__done->new($this.q{->}.$name);
166             }
167             else {
168 50         99 ($this, @params) = (q{}, @_);
169 50         52 $name = $func;
170 50         100 $done = Sub::Throttler::__done->new($func);
171             }
172              
173 64         85 my @old = ($IN_flush, $IN_flush_ignore_recursion);
174 64         61 ($IN_flush, $IN_flush_ignore_recursion) = (1, 1);
175 64         87 my $id = refaddr $done;
176             ACQUIRE_ALL:
177             {
178 64         47 my %acquired;
  75         64  
179 75         111 for (@Throttles) {
180 86         58 my ($throttle, $target) = @{$_};
  86         125  
181 86         163 my $resources = $target->($this, $name, @params);
182 86 100       627 next if !defined $resources;
183 73 100       172 die "Sub::Throttler: target returns not a HASHREF: $resources\n"
184             if ref $resources ne 'HASH';
185 68 100       44 next if !keys %{$resources};
  68         164  
186 67         68 while (my ($key, $quantity) = each %{$resources}) {
  112         290  
187 75 100       124 die "Sub::Throttler: target returns bad quantity for '$key': $quantity\n"
188             if ref $quantity;
189 71 100       190 if ($throttle->try_acquire($id, $key, $quantity)) {
190 45         116 $acquired{$throttle} = $throttle;
191             }
192             else {
193 19         21 eval { ## no critic (RequireCheckingReturnValueOfEval)
194 19         44 local $SIG{__DIE__};
195 19         48 $throttle->acquire($id, $key, $quantity);
196 11         44 $acquired{$throttle} = $throttle;
197             };
198 19         3098 $failed = $@;
199 19         42 for (values %acquired) {
200 14         49 $_->release_unused($id);
201             }
202 19 100       259 if ($failed) {
203 8         19 last ACQUIRE_ALL;
204             } else {
205 11         35 redo ACQUIRE_ALL;
206             }
207             }
208             }
209             }
210 40         96 $Running{$id} = [values %acquired];
211             }
212 48         61 ($IN_flush, $IN_flush_ignore_recursion) = @old;
213             # while waiting for resources needed for this sync call some resources
214             # needed for queued async calls may be released, but in this case
215             # throttle_flush() wasn't called because it was blocked, so let's
216             # ensure it will be called no late than this sync call will $done->()
217 48         38 $IN_flush_pending = 1;
218              
219 48 100       65 if ($failed) {
220 8         53 croak $failed;
221             } else {
222 40         294 return $done;
223             }
224 13     13   6967 }
  13         24  
  13         45  
225              
226             sub _done { ## no critic (ProhibitUnusedPrivateSubroutines)
227 127     127   168 my ($id, $is_used) = @_;
228 127   100     687 $is_used ||= 1 == @_;
229 127   100     139 for my $throttle (@{ delete $Running{$id} // [] }) {
  127         512  
230 105 100       190 if ($is_used) {
231 90         346 $throttle->release($id);
232             } else {
233 15         44 $throttle->release_unused($id);
234             }
235             }
236 127 100       307 if ($IN_flush_pending) {
237 24         45 throttle_flush();
238             }
239 127         350 return;
240             }
241              
242             sub _it {
243 5     5   10 my ($is_sync, $is_asap, $func) = @_;
244 5 50 33     36 croak 'require function name' if !$func || ref $func;
245 5 100       21 if ($func !~ /::/ms) {
246 3         12 $func = caller(1) . q{::} . $func;
247             }
248 5 50       63 croak 'no such function: '.$func if !defined &{$func};
  5         21  
249 5         6 my $orig = \&{$func};
  5         12  
250             ## no critic (ProhibitNoWarnings ProhibitStringyEval RequireCheckingReturnValueOfEval ProhibitImplicitNewlines RequireCarping)
251 13     13   4192 no warnings 'redefine';
  13         19  
  13         5145  
252 5 100 66 46   885 eval 'sub '.$func.' {
  46 50 100 10   5917  
  32 100 33     85  
  15 50 100     25  
  15 50       52  
  15 0       835  
  0         0  
  0         0  
  0         0  
  0         0  
  14         37  
  10         25  
  10         202  
  10         27  
  10         102  
  10         28  
  5         8  
  5         17  
  5         160  
  0            
  0            
  0            
  0            
  0            
  0            
  0            
  0            
253             if (!'.$is_sync.' && @_ && ref $_[-1] eq "CODE") {
254             my $done = &throttle_me'.($is_asap ? '_asap' : q{}).' || return;
255             my $cb = pop;
256             $orig->(@_, done_cb($done, $cb));
257             return;
258             } elsif (wantarray) {
259             my $done = &throttle_me_sync;
260             my @res = &$orig;
261             $done->();
262             return @res;
263             } else {
264             my $done = &throttle_me_sync;
265             my $res = &$orig;
266             $done->();
267             return $res;
268             }
269             }; 1' or die $@;
270 5         11 return $orig;
271             }
272              
273             sub _me {
274 194     194   253 my ($queue, $args) = @_;
275 194         376 for (0, 1) {
276 332 100       823 if (ref $args->[$_] eq 'Sub::Throttler::__done') {
277 87         95 return splice @{$args}, $_, 1;
  87         821  
278             }
279             }
280 107         336 my $func = (caller 2)[CALLER_SUBROUTINE];
281 107 100       2426 croak 'impossible to throttle anonymous function' if !defined &{$func};
  107         409  
282 105         106 my $code = \&{$func};
  105         248  
283 105         708 my ($pkg, $name) = $func =~ /\A(.*)::(.*)\z/ms;
284 105         152 my $is_method = eval { local $SIG{__DIE__}; $args->[0]->isa($pkg) };
  105         330  
  105         871  
285 105 100       214 if ($is_method) {
286 34         38 my $self = shift @{$args};
  34         54  
287 34         194 my $done = Sub::Throttler::__done->new($self.q{->}.$name);
288 34         41 push @{$queue}, [$done, $name, $self, $code, @{$args}];
  34         50  
  34         80  
289 34 100       87 if (ref $self) {
290 28         82 weaken $queue->[-1][2];
291             }
292             }
293             else {
294 71         226 my $done = Sub::Throttler::__done->new($func);
295 71         68 push @{$queue}, [$done, $func, q{}, $code, @{$args}];
  71         101  
  71         196  
296             }
297 105         211 throttle_flush();
298 89         762 return;
299             }
300              
301             # should be used only from tests
302             sub _reset { ## no critic (ProhibitUnusedPrivateSubroutines)
303 45     45   58889 $IN_flush = 0;
304 45         42 $IN_flush_recursion = 0;
305 45         46 $IN_flush_ignore_recursion = 0;
306 45         240 @Throttles = @Tasks = @AsapTasks = %Running = ();
307 45         56 return;
308             }
309              
310             sub _run_task {
311 87     87   321 my ($this, $code, $done, @params) = @_;
312 13     13   64 no strict 'refs';
  13         14  
  13         897  
313 87 100       150 if ($this) {
314 31         404 $this->$code($done, @params);
315             } else {
316 56         499 $code->($done, @params);
317             }
318 87         1784 return;
319             }
320              
321              
322             package Sub::Throttler::__done; ## no critic (ProhibitMultiplePackages)
323 13     13   54 use Carp;
  13         19  
  13         734  
324              
325 13     13   49 use Scalar::Util qw( refaddr );
  13         14  
  13         2692  
326              
327             my (%Check, %Name);
328              
329             sub new {
330 169     169   253 my (undef, $name) = @_;
331 169         146 my $id;
332             my $done = bless sub {
333 129 100   129   175153 if ($Check{$id}) {
334 2         67 croak "Sub::Throttler: $name: \$done->() already called";
335             }
336 127         233 $Check{$id}=1;
337 127         326 Sub::Throttler::_done($id, @_); ## no critic(ProtectPrivateSubs)
338 169         767 }, __PACKAGE__;
339 169         425 $id = refaddr $done;
340 169         381 $Name{$id} = $name;
341 169         287 return $done;
342             }
343              
344             sub DESTROY {
345 154     154   19511 my $done = shift;
346 154         292 my $id = refaddr $done;
347 154         284 my $name = delete $Name{$id};
348 154 100       352 if (!delete $Check{$id}) {
349 42         396 carp "Sub::Throttler: $name: \$done->() was not called";
350             }
351 154         17678 return;
352             }
353              
354              
355             1; # Magic true value required at end of module
356             __END__