File Coverage

blib/lib/Sub/Throttler/Rate/AnyEvent.pm
Criterion Covered Total %
statement 248 285 87.0
branch 69 100 69.0
condition 11 33 33.3
subroutine 36 37 97.3
pod 9 9 100.0
total 373 464 80.3


line stmt bran cond sub pod time code
1             package Sub::Throttler::Rate::AnyEvent;
2 3     3   441250 use 5.010001;
  3         8  
3 3     3   9 use warnings;
  3         3  
  3         57  
4 3     3   9 use strict;
  3         3  
  3         61  
5 3     3   12 use utf8;
  3         3  
  3         15  
6 3     3   50 use Carp;
  3         3  
  3         241  
7             our @CARP_NOT = qw( Sub::Throttler );
8              
9             our $VERSION = 'v0.2.10';
10              
11 3     3   11 use parent qw( Sub::Throttler::algo );
  3         3  
  3         15  
12 3     3   153 use Sub::Throttler qw( throttle_flush );
  3         2  
  3         13  
13 3     3   206 use Time::HiRes qw( clock_gettime CLOCK_MONOTONIC time sleep );
  3         4  
  3         19  
14 3     3   361 use List::Util qw( min );
  3         4  
  3         138  
15 3     3   11 use Scalar::Util qw( weaken );
  3         3  
  3         111  
16 3     3   11 use Storable qw( dclone );
  3         4  
  3         162  
17 3     3   2814 use AnyEvent;
  3         10852  
  3         112  
18              
19              
20             sub new {
21 3     3   16 use warnings FATAL => qw( misc );
  3         3  
  3         3964  
22 40     40 1 192817 my ($class, %opt) = @_;
23             my $self = bless {
24             limit => delete $opt{limit} // 1,
25 39   100     612 period => delete $opt{period} // 1,
      100        
      33        
26             acquired=> {}, # { $id => { $key => [$time, $quantity], … }, … }
27             used => {}, # { $key => { next => $idx, data => [ $time, … ] }, … }
28             _cb => undef, # callback for timer
29             _t => undef, # undef or AE::timer
30             }, ref $class || $class;
31 39 100       310 croak 'limit must be an unsigned integer' if $self->{limit} !~ /\A\d+\z/ms;
32 37 50       109 croak 'period must be a positive number' if $self->{period} <= 0;
33 37 50       144 croak 'period is too large' if $self->{period} >= -Sub::Throttler::Rate::rr::EMPTY();
34 37 100       115 croak 'bad param: '.(keys %opt)[0] if keys %opt;
35 35         107 weaken(my $this = $self);
36 35 50   13   128 $self->{_cb} = sub { $this && $this->_tick() };
  13         819605  
37 35         109 return $self;
38             }
39              
40             sub acquire {
41 11     11 1 26 my ($self, $id, $key, $quantity) = @_;
42 11 50       16 if (!$self->try_acquire($id, $key, $quantity)) {
43 11 50       31 if ($quantity <= $self->{limit}) {
44 11         20 my $now = clock_gettime(CLOCK_MONOTONIC);
45 11         126 my $delay = $self->{used}{$key}->get($quantity) + $self->{period} - $now;
46 11         35 $delay = sprintf '%.6f', $delay; # work around floating point precision issues
47             # resource may expire between try_acquire() and clock_gettime()
48 11 50       26 if ($delay > 0) {
49 11         23 sleep $delay;
50             }
51             }
52 11 50       110 if (!$self->try_acquire($id, $key, $quantity)) {
53 0         0 croak "$self: unable to acquire $quantity of resource '$key'";
54             }
55             }
56 11         18 return $self;
57             }
58              
59             sub limit {
60 11     11 1 972 my ($self, $limit) = @_;
61 11 100       31 if (1 == @_) {
62 2         11 return $self->{limit};
63             }
64 9 50       53 croak 'limit must be an unsigned integer' if $limit !~ /\A\d+\z/ms;
65             # OPTIMIZATION call throttle_flush() only if amount of available
66             # resources increased (i.e. limit was increased)
67 9         14 my $resources_increases = $self->{limit} < $limit;
68 9         13 $self->{limit} = $limit;
69 9         7 for my $rr (values %{ $self->{used} }) {
  9         29  
70 7         18 $rr->resize($self->{limit});
71             }
72 9 100       18 if ($resources_increases) {
73 2         5 throttle_flush();
74             }
75 9         17 return $self;
76             }
77              
78             sub load {
79 5     5 1 72 my ($class, $state) = @_;
80 5 100       19 croak 'bad state: wrong algorithm' if $state->{algo} ne __PACKAGE__;
81 4         25 my $v = version->parse($state->{version});
82 4 100       26 if ($v > $VERSION) {
83 1         11 carp 'restoring state saved by future version';
84             }
85 4         184 my $self = $class->new(limit=>$state->{limit}, period=>$state->{period});
86 4         80 $self->{used} = dclone($state->{used});
87 4         15 my ($time, $now) = (time, clock_gettime(CLOCK_MONOTONIC));
88             # time jump backward, no matter how much, handled like we still is in
89             # current period, to be safe
90 4 100       91 if ($state->{at} > $time) {
91 2         3 $time = $state->{at};
92             }
93 4         5 my $diff = $time - $now;
94 4         4 for my $data (map {$_->{data}} values %{ $self->{used} }) {
  4         8  
  4         11  
95 4         4 for (@{ $data }) {
  4         5  
96 12 50       28 if ($_ != Sub::Throttler::Rate::rr::EMPTY()) {
97 12         12 $_ -= $diff;
98 12         36 $_ = sprintf '%.6f', $_; # work around floating point precision issues
99             }
100             }
101             }
102 4         5 for (values %{ $self->{used} }) {
  4         7  
103 4         8 bless $_, 'Sub::Throttler::Rate::rr';
104             }
105 4         9 $self->{_t} = AE::timer 0, 0, $self->{_cb};
106 4         132 return $self;
107             }
108              
109             sub period {
110 8     8 1 50683 my ($self, $period) = @_;
111 8 100       34 if (1 == @_) {
112 5         36 return $self->{period};
113             }
114 3 50       15 croak 'period must be a positive number' if $period <= 0;
115 3 50       17 croak 'period is too large' if $self->{period} >= -Sub::Throttler::Rate::rr::EMPTY();
116             # OPTIMIZATION call throttle_flush() only if amount of available
117             # resources increased (i.e. period was decreased)
118 3         9 my $resources_increases = $self->{period} > $period;
119 3         6 $self->{period} = $period;
120 3 100       11 if ($resources_increases) {
121 1 50       7 if ($self->{_t}) {
122 1         2 $self->{_t} = undef;
123 1         3 $self->_tick();
124             }
125             else {
126 0         0 throttle_flush();
127             }
128             }
129 3         47 return $self;
130             }
131              
132             sub release {
133 28     28 1 54960 my ($self, $id) = @_;
134 28 100       156 croak sprintf '%s not acquired anything', $id if !$self->{acquired}{$id};
135 25         60 delete $self->{acquired}{$id};
136 25         53 return $self;
137             }
138              
139             sub release_unused {
140 26     26 1 104434 my ($self, $id) = @_;
141 26 100       115 croak sprintf '%s not acquired anything', $id if !$self->{acquired}{$id};
142              
143 23         61 my $now = clock_gettime(CLOCK_MONOTONIC);
144 23         349 for my $key (grep {$self->{used}{$_}} keys %{ $self->{acquired}{$id} }) {
  25         64  
  23         82  
145 22         19 my ($time, $quantity) = @{ $self->{acquired}{$id}{$key} };
  22         39  
146 22         54 $self->{used}{$key}->del($time, $quantity);
147             # clean up (avoid memory leak in long run with unique keys)
148 22 100       51 if ($self->{used}{$key}->get($self->{limit}) + $self->{period} <= $now) {
149 16         51 delete $self->{used}{$key};
150             }
151             }
152 23         61 delete $self->{acquired}{$id};
153 23         55 throttle_flush();
154 23 100       37 if (!keys %{ $self->{used} }) {
  23         77  
155 14         18 $self->{_t} = undef;
156             }
157 23         63 return $self;
158             }
159              
160             sub save {
161 1     1 1 1 my ($self) = @_;
162 1         5 my ($time, $now) = (time, clock_gettime(CLOCK_MONOTONIC));
163 1         21 my $diff = $time - $now;
164             my $state = {
165             algo => __PACKAGE__,
166             version => version->declare($VERSION)->numify,
167             limit => $self->{limit},
168             period => $self->{period},
169 1         145 used => dclone($self->{used}),
170             at => $time,
171             };
172 1         3 for my $data (map {$_->{data}} values %{ $state->{used} }) {
  1         4  
  1         3  
173 1         1 for (@{ $data }) {
  1         2  
174 3 50       11 if ($_ != Sub::Throttler::Rate::rr::EMPTY()) {
175 3         4 $_ += $diff;
176 3         13 $_ = sprintf '%.6f', $_; # work around floating point precision issues
177             }
178             }
179             }
180 1         2 for (values %{ $state->{used} }) {
  1         3  
181 1         2 $_ = {%{ $_ }}; # unbless
  1         10  
182             }
183 1         2 return $state;
184             }
185              
186             sub try_acquire {
187 166     166 1 607640 my ($self, $id, $key, $quantity) = @_;
188             croak sprintf '%s already acquired %s', $id, $key
189 166 100 66     563 if $self->{acquired}{$id} && exists $self->{acquired}{$id}{$key};
190 164 100       298 croak 'quantity must be positive' if $quantity <= 0;
191              
192 162         389 my $now = clock_gettime(CLOCK_MONOTONIC);
193              
194 162   66     2867 $self->{used}{$key} ||= Sub::Throttler::Rate::rr->new($self->{limit});
195 162 100       373 if (!$self->{used}{$key}->add($self->{period}, $now, $quantity)) {
196 68         217 return;
197             }
198              
199 94         237 $self->{acquired}{$id}{$key} = [$now, $quantity];
200 94 100       158 if (!$self->{_t}) {
201 45         203 $self->{_t} = AE::timer $self->{period}, 0, $self->{_cb};
202             }
203 94         5893 return 1;
204             }
205              
206             sub _tick {
207 14     14   47 my $self = shift;
208 14         88 my $now = clock_gettime(CLOCK_MONOTONIC);
209 14         345 my $when = 0;
210 14         32 for my $key (keys %{ $self->{used} }) {
  14         125  
211 20         109 my $after = $self->{used}{$key}->after($now - $self->{period});
212 20 100 33     67 if (!$after) {
    50          
213 17         98 delete $self->{used}{$key};
214             }
215             elsif (!$when || $when > $after) {
216 3         15 $when = $after;
217             }
218             }
219 14 100       75 $self->{_t} = !$when ? undef : AE::timer $when + $self->{period} - $now, 0, $self->{_cb};
220 14         220 throttle_flush();
221 14         111 return;
222             }
223              
224              
225             package Sub::Throttler::Rate::rr; ## no critic (ProhibitMultiplePackages)
226 3     3   65 use 5.010001;
  3         9  
227 3     3   11 use warnings;
  3         5  
  3         75  
228 3     3   11 use strict;
  3         3  
  3         51  
229 3     3   10 use utf8;
  3         3  
  3         15  
230 3     3   46 use Carp;
  3         28  
  3         189  
231              
232 3     3   11 use constant EMPTY => -1_000_000_000;
  3         3  
  3         2561  
233              
234              
235             sub new {
236 63     63   95 my ($class, $len) = @_;
237 63   33     446 my $self = bless {
238             next => 0,
239             data => [ (EMPTY) x $len ],
240             }, ref $class || $class;
241 63         171 return $self;
242             }
243              
244             sub add {
245 162     162   188 my ($self, $period, $time, $quantity) = @_;
246 162         649 $time = sprintf '%.6f', $time; # work around floating point precision issues
247 162         138 my $len = @{ $self->{data} };
  162         218  
248             # try_acquire() guarantee $quantity > 0, so we continue only if $len > 0
249             # (thus avoid division by zero on % $len) and there is a chance to add
250             # $quantity elements
251 162 100       284 if ($quantity > $len) {
252 3         10 return;
253             }
254 159         258 my $required = ($self->{next} + $quantity - 1) % $len;
255             # {data} is sorted, last added element ($self->{next}-1) is guaranteed
256             # to be largest of all elements, so all elements between (inclusive)
257             # $self->{next} and $required are guaranteed to be either EMPTY
258             # or <= $self->{next}-1 element, and $required element is largest of them
259 159         643 my $since = sprintf '%.6f', $time - $period; # work around floating point precision issues
260 159 100       409 if ($self->{data}[$required] > $since) {
261 65         138 return;
262             }
263 94         191 for (1 .. $quantity) {
264 188         216 $self->{data}[ $self->{next} ] = $time;
265 188         233 ($self->{next} += 1) %= $len;
266             }
267 94         216 return 1;
268             }
269              
270             # Return time of acquiring first resource after $time or nothing.
271             sub after {
272 20     20   30 my ($self, $time) = @_;
273 20         111 $time = sprintf '%.6f', $time; # work around floating point precision issues
274             # _tick() guarantee $time > EMPTY
275 20         26 my $len = @{ $self->{data} };
  20         52  
276 20         212 for (1 .. $len) {
277 46         107 $_ = ($self->{next} + $_ - 1) % $len;
278 46 100       147 return $self->{data}[ $_ ] if $self->{data}[ $_ ] > $time;
279             }
280 17         44 return;
281             }
282              
283             sub del {
284 22     22   26 my ($self, $time, $quantity) = @_;
285 22         93 $time = sprintf '%.6f', $time; # work around floating point precision issues
286             # try_acquire() guarantee $quantity > 0
287             # even if $time is already outdated, these elements should be removed
288             # anyway in case {period} will be increased later
289 22         18 my $len = @{ $self->{data} };
  22         36  
290 22 50       44 if (!$len) {
291 0         0 return;
292             }
293 22 50       41 if ($quantity > $len) {
294 0         0 $quantity = $len;
295             }
296             # OPTIMIZATION not in {data}
297 22 100       93 if ($self->{data}[ $self->{next} ] > $time) {
    100          
    50          
298 3         4 return;
299             }
300             # OPTIMIZATION oldest
301             elsif ($self->{data}[ $self->{next} ] == $time) {
302 16         32 for (map { ($self->{next} + $_ - 1) % $len } 1 .. $quantity) {
  26         58  
303             # part of $quantity may be not in {data} (if {limit} was decreased)
304 26 50       44 return if $self->{data}[ $_ ] != $time;
305 26         39 $self->{data}[ $_ ] = EMPTY;
306             }
307             }
308             # OPTIMIZATION newest
309             elsif ($self->{data}[ $self->{next} - 1 ] == $time) {
310 3         8 for (map { $self->{next} - $_ } 1 .. $quantity) {
  6         12  
311 6 50       15 croak 'assert: newest: no time' if $self->{data}[ $_ ] != $time;
312 6         7 $self->{data}[ $_ ] = EMPTY;
313             }
314 3         7 $self->{next} = ($self->{next} - $quantity) % $len;
315             }
316             # middle (actually it support any case, not just middle)
317             else {
318             my $i = _binsearch($time, $self->{data}, $self->{next}, $len - 1)
319 0   0     0 // _binsearch($time, $self->{data}, 0, $self->{next} - 1);
320 0 0       0 croak 'assert: middle: not found' if !defined $i;
321 0         0 for (map { ($i + $_ - 1) % $len } 1 .. $quantity) {
  0         0  
322 0 0       0 croak 'assert: middle: no time' if $self->{data}[ $_ ] != $time;
323 0         0 $self->{data}[ $_ ] = EMPTY;
324             }
325             # OPTIMIZATION move minimum amount of elements
326 0         0 my $count_rew = ($self->{next} - $i) % $len;
327 0         0 my $count_fwd = ($i + $quantity - $self->{next}) % $len;
328             # move oldest elements forward
329 0 0       0 if ($count_fwd <= $count_rew) {
330 0         0 @{ $self->{data} }[ map { ($self->{next}+$_-1) % $len } 1 .. $count_fwd ] =
  0         0  
331 0         0 @{ $self->{data} }[ map { ($self->{next}+$_-1) % $len } $count_fwd-$quantity+1 .. $count_fwd, 1 .. $count_fwd-$quantity ];
  0         0  
  0         0  
332             }
333             # move newest elements backward
334             else {
335 0         0 @{ $self->{data} }[ map { ($i+$_-1) % $len } 1 .. $count_rew ] =
  0         0  
336 0         0 @{ $self->{data} }[ map { ($i+$_-1) % $len } 1+$quantity .. $count_rew, 1 .. $quantity];
  0         0  
  0         0  
337 0         0 $self->{next} = ($self->{next} - $quantity) % $len;
338             }
339             }
340 19         26 return;
341             }
342              
343             sub get {
344 33     33   42 my ($self, $id) = @_;
345             # $id is number of required element, counting from oldest one ($id = 1)
346 33         20 my $len = @{ $self->{data} };
  33         153  
347             # acquire() guarantee 0 < $id <= $len
348 33         43 my $i = ($self->{next} + $id - 1) % $len;
349 33         94 return $self->{data}[$i];
350             }
351              
352             sub resize {
353 7     7   11 my ($self, $newlen) = @_;
354             # limit() guarantee $newlen >= 0
355 7         6 my $len = @{ $self->{data} };
  7         12  
356 7         10 my $d = $self->{data};
357 7         14 $self->{data} = [ @{$d}[ $self->{next} .. $#{$d} ], @{$d}[ 0 .. $self->{next} - 1 ] ];
  7         22  
  7         12  
  7         23  
358 7 100       20 if ($newlen < $len) {
359 6         7 $self->{next} = 0;
360 6         6 splice @{ $self->{data} }, 0, $len - $newlen;
  6         19  
361             } else {
362 1         2 $self->{next} = $len % $newlen;
363 1         1 push @{ $self->{data} }, (EMPTY) x ($newlen - $len);
  1         4  
364             }
365 7         21 return $self;
366             }
367              
368             # From List::BinarySearch::PP version 0.23.
369             # Modified to support slices and work with array of numbers, without callback.
370             sub _binsearch {
371 0     0     my ( $target, $aref, $min, $max ) = @_;
372 0   0       $min //= 0;
373 0   0       $max //= $#{$aref};
  0            
374 0 0 0       croak 'bad slice' if $min < 0 || $#{$aref} < $max || $min > $max;
  0   0        
375 0           while ( $max > $min ) {
376 0           my $mid = int( ( $min + $max ) / 2 );
377 0 0         if ( $target > $aref->[$mid] ) {
378 0           $min = $mid + 1;
379             } else {
380 0           $max = $mid;
381             }
382             }
383 0 0         return $min if $target == $aref->[$min];
384 0           return;
385             }
386              
387              
388             1; # Magic true value required at end of module
389             __END__