File Coverage

blib/lib/Sub/Throttler/Rate/AnyEvent.pm
Criterion Covered Total %
statement 188 280 67.1
branch 43 100 43.0
condition 10 33 30.3
subroutine 32 37 86.4
pod 9 9 100.0
total 282 459 61.4


line stmt bran cond sub pod time code
1             package Sub::Throttler::Rate::AnyEvent;
2 3     3   35558 use 5.010001;
  3         7  
  3         95  
3 3     3   11 use warnings;
  3         3  
  3         61  
4 3     3   11 use strict;
  3         8  
  3         56  
5 3     3   8 use utf8;
  3         2  
  3         13  
6 3     3   38 use Carp;
  3         3  
  3         226  
7             our @CARP_NOT = qw( Sub::Throttler );
8              
9             our $VERSION = 'v0.2.3';
10              
11 3     3   352 use parent qw( Sub::Throttler::algo );
  3         279  
  3         11  
12 3     3   168 use Sub::Throttler qw( throttle_flush );
  3         4  
  3         16  
13 3     3   221 use Time::HiRes qw( clock_gettime CLOCK_MONOTONIC time sleep );
  3         4  
  3         18  
14 3     3   446 use List::Util qw( min );
  3         4  
  3         137  
15 3     3   10 use Scalar::Util qw( weaken );
  3         3  
  3         89  
16 3     3   1855 use Storable qw( dclone );
  3         7445  
  3         176  
17 3     3   2573 use AnyEvent;
  3         11420  
  3         119  
18              
19              
20             sub new {
21 3     3   16 use warnings FATAL => qw( misc );
  3         2  
  3         3579  
22 9     9 1 103052 my ($class, %opt) = @_;
23 9   100     135 my $self = bless {
      100        
      33        
24             limit => delete $opt{limit} // 1,
25             period => delete $opt{period} // 1,
26             acquired=> {}, # { $id => { $key => [$time, $quantity], … }, … }
27             used => {}, # { $key => { next => $idx, data => [ $time, … ] }, … }
28             _cb => undef, # callback for timer
29             _t => undef, # undef or AE::timer
30             }, ref $class || $class;
31 9 50       92 croak 'limit must be an unsigned integer' if $self->{limit} !~ /\A\d+\z/ms;
32 9 50       37 croak 'period must be a positive number' if $self->{period} <= 0;
33 9 50       39 croak 'period is too large' if $self->{period} >= -Sub::Throttler::Rate::rr::EMPTY();
34 9 50       26 croak 'bad param: '.(keys %opt)[0] if keys %opt;
35 9         34 weaken(my $this = $self);
36 9 0   0   40 $self->{_cb} = sub { $this && $this->_tick() };
  0         0  
37 9         59 return $self;
38             }
39              
40             sub acquire {
41 11     11 1 16 my ($self, $id, $key, $quantity) = @_;
42 11 50       22 if (!$self->try_acquire($id, $key, $quantity)) {
43 11 50       36 if ($quantity <= $self->{limit}) {
44 11         29 my $now = clock_gettime(CLOCK_MONOTONIC);
45 11         53 my $delay = $self->{used}{$key}->get($quantity) + $self->{period} - $now;
46             # resource may expire between try_acquire() and clock_gettime()
47 11 50       28 if ($delay > 0) {
48 11         1178429 sleep $delay;
49             }
50             }
51 11 50       100 if (!$self->try_acquire($id, $key, $quantity)) {
52 0         0 croak "$self: unable to acquire $quantity of resource '$key'";
53             }
54             }
55 11         40 return $self;
56             }
57              
58             sub limit {
59 1     1 1 34 my ($self, $limit) = @_;
60 1 50       4 if (1 == @_) {
61 1         6 return $self->{limit};
62             }
63 0 0       0 croak 'limit must be an unsigned integer' if $limit !~ /\A\d+\z/ms;
64             # OPTIMIZATION call throttle_flush() only if amount of available
65             # resources increased (i.e. limit was increased)
66 0         0 my $resources_increases = $self->{limit} < $limit;
67 0         0 $self->{limit} = $limit;
68 0         0 for my $rr (values %{ $self->{used} }) {
  0         0  
69 0         0 $rr->resize($self->{limit});
70             }
71 0 0       0 if ($resources_increases) {
72 0         0 throttle_flush();
73             }
74 0         0 return $self;
75             }
76              
77             sub load {
78 5     5 1 122 my ($class, $state) = @_;
79 5 100       23 croak 'bad state: wrong algorithm' if $state->{algo} ne __PACKAGE__;
80 4         44 my $v = version->parse($state->{version});
81 4 100       143 if ($v > $VERSION) {
82 1         15 carp 'restoring state saved by future version';
83             }
84 4         259 my $self = $class->new(limit=>$state->{limit}, period=>$state->{period});
85 4         96 $self->{used} = dclone($state->{used});
86 4         21 my ($time, $now) = (time, clock_gettime(CLOCK_MONOTONIC));
87             # time jump backward, no matter how much, handled like we still is in
88             # current period, to be safe
89 4 100       32 if ($state->{at} > $time) {
90 2         3 $time = $state->{at};
91             }
92 4         8 my $diff = $time - $now;
93 4         6 for my $data (map {$_->{data}} values %{ $self->{used} }) {
  4         13  
  4         10  
94 4         6 for (@{ $data }) {
  4         7  
95 12 50       24 if ($_ != Sub::Throttler::Rate::rr::EMPTY()) {
96 12         17 $_ -= $diff;
97             }
98             }
99             }
100 4         3 for (values %{ $self->{used} }) {
  4         7  
101 4         11 bless $_, 'Sub::Throttler::Rate::rr';
102             }
103 4         19 $self->{_t} = AE::timer 0, 0, $self->{_cb};
104 4         15 return $self;
105             }
106              
107             sub period {
108 1     1 1 2 my ($self, $period) = @_;
109 1 50       6 if (1 == @_) {
110 1         4 return $self->{period};
111             }
112 0 0       0 croak 'period must be a positive number' if $period <= 0;
113 0 0       0 croak 'period is too large' if $self->{period} >= -Sub::Throttler::Rate::rr::EMPTY();
114             # OPTIMIZATION call throttle_flush() only if amount of available
115             # resources increased (i.e. period was decreased)
116 0         0 my $resources_increases = $self->{period} > $period;
117 0         0 $self->{period} = $period;
118 0 0       0 if ($resources_increases) {
119 0 0       0 if ($self->{_t}) {
120 0         0 $self->{_t} = undef;
121 0         0 $self->_tick();
122             }
123             else {
124 0         0 throttle_flush();
125             }
126             }
127 0         0 return $self;
128             }
129              
130             sub release {
131 18     18 1 25 my ($self, $id) = @_;
132 18 50       51 croak sprintf '%s not acquired anything', $id if !$self->{acquired}{$id};
133 18         36 delete $self->{acquired}{$id};
134 18         44 return $self;
135             }
136              
137             sub release_unused {
138 13     13 1 21 my ($self, $id) = @_;
139 13 50       50 croak sprintf '%s not acquired anything', $id if !$self->{acquired}{$id};
140              
141 13         30 my $now = clock_gettime(CLOCK_MONOTONIC);
142 13         47 for my $key (grep {$self->{used}{$_}} keys %{ $self->{acquired}{$id} }) {
  13         57  
  13         54  
143 13         17 my ($time, $quantity) = @{ $self->{acquired}{$id}{$key} };
  13         30  
144 13         49 $self->{used}{$key}->del($time, $quantity);
145             # clean up (avoid memory leak in long run with unique keys)
146 13 100       54 if ($self->{used}{$key}->get($self->{limit}) + $self->{period} <= $now) {
147 11         64 delete $self->{used}{$key};
148             }
149             }
150 13         33 delete $self->{acquired}{$id};
151 13         44 throttle_flush();
152 13 100       14 if (!keys %{ $self->{used} }) {
  13         38  
153 11         21 $self->{_t} = undef;
154             }
155 13         94 return $self;
156             }
157              
158             sub save {
159 1     1 1 2 my ($self) = @_;
160 1         7 my ($time, $now) = (time, clock_gettime(CLOCK_MONOTONIC));
161 1         28 my $diff = $time - $now;
162 1         194 my $state = {
163             algo => __PACKAGE__,
164             version => version->declare($VERSION)->numify,
165             limit => $self->{limit},
166             period => $self->{period},
167             used => dclone($self->{used}),
168             at => $time,
169             };
170 1         9 for my $data (map {$_->{data}} values %{ $state->{used} }) {
  1         4  
  1         4  
171 1         2 for (@{ $data }) {
  1         3  
172 3 50       8 if ($_ != Sub::Throttler::Rate::rr::EMPTY()) {
173 3         8 $_ += $diff;
174             }
175             }
176             }
177 1         2 for (values %{ $state->{used} }) {
  1         4  
178 1         1 $_ = {%{ $_ }}; # unbless
  1         15  
179             }
180 1         3 return $state;
181             }
182              
183             sub try_acquire {
184 62     62 1 218 my ($self, $id, $key, $quantity) = @_;
185 62 100 66     247 croak sprintf '%s already acquired %s', $id, $key
186             if $self->{acquired}{$id} && exists $self->{acquired}{$id}{$key};
187 61 50       117 croak 'quantity must be positive' if $quantity <= 0;
188              
189 61         192 my $now = clock_gettime(CLOCK_MONOTONIC);
190              
191 61   66     533 $self->{used}{$key} ||= Sub::Throttler::Rate::rr->new($self->{limit});
192 61 100       202 if (!$self->{used}{$key}->add($self->{period}, $now, $quantity)) {
193 26         71 return;
194             }
195              
196 35         139 $self->{acquired}{$id}{$key} = [$now, $quantity];
197 35 100       121 if (!$self->{_t}) {
198 16         181 $self->{_t} = AE::timer $self->{period}, 0, $self->{_cb};
199             }
200 35         4373 return 1;
201             }
202              
203             sub _tick {
204 0     0   0 my $self = shift;
205 0         0 my $now = clock_gettime(CLOCK_MONOTONIC);
206 0         0 my $when = 0;
207 0         0 for my $key (keys %{ $self->{used} }) {
  0         0  
208 0         0 my $after = $self->{used}{$key}->after($now - $self->{period});
209 0 0 0     0 if (!$after) {
    0          
210 0         0 delete $self->{used}{$key};
211             }
212             elsif (!$when || $when > $after) {
213 0         0 $when = $after;
214             }
215             }
216 0 0       0 $self->{_t} = !$when ? undef : AE::timer $when + $self->{period} - $now, 0, $self->{_cb};
217 0         0 throttle_flush();
218 0         0 return;
219             }
220              
221              
222             package Sub::Throttler::Rate::rr; ## no critic (ProhibitMultiplePackages)
223 3     3   50 use 5.010001;
  3         7  
  3         78  
224 3     3   9 use warnings;
  3         6  
  3         57  
225 3     3   14 use strict;
  3         4  
  3         89  
226 3     3   16 use utf8;
  3         3  
  3         18  
227 3     3   51 use Carp;
  3         29  
  3         199  
228              
229 3     3   12 use constant EMPTY => -1_000_000_000;
  3         3  
  3         2542  
230              
231              
232             sub new {
233 16     16   26 my ($class, $len) = @_;
234 16   33     165 my $self = bless {
235             next => 0,
236             data => [ (EMPTY) x $len ],
237             }, ref $class || $class;
238 16         63 return $self;
239             }
240              
241             sub add {
242 61     61   103 my ($self, $period, $time, $quantity) = @_;
243 61         54 my $len = @{ $self->{data} };
  61         109  
244             # try_acquire() guarantee $quantity > 0, so we continue only if $len > 0
245             # (thus avoid division by zero on % $len) and there is a chance to add
246             # $quantity elements
247 61 100       118 if ($quantity > $len) {
248 1         4 return;
249             }
250 60         130 my $required = ($self->{next} + $quantity - 1) % $len;
251             # {data} is sorted, last added element ($self->{next}-1) is guaranteed
252             # to be largest of all elements, so all elements between (inclusive)
253             # $self->{next} and $required are guaranteed to be either EMPTY
254             # or <= $self->{next}-1 element, and $required element is largest of them
255 60 100       164 if ($self->{data}[$required] > $time - $period) {
256 25         64 return;
257             }
258 35         84 for (1 .. $quantity) {
259 38         60 $self->{data}[ $self->{next} ] = $time;
260 38         104 ($self->{next} += 1) %= $len;
261             }
262 35         106 return 1;
263             }
264              
265             # Return time of acquiring first resource after $time or nothing.
266             sub after {
267 0     0   0 my ($self, $time) = @_;
268             # _tick() guarantee $time > EMPTY
269 0         0 my $len = @{ $self->{data} };
  0         0  
270 0         0 for (1 .. $len) {
271 0         0 $_ = ($self->{next} + $_ - 1) % $len;
272 0 0       0 return $self->{data}[ $_ ] if $self->{data}[ $_ ] > $time;
273             }
274 0         0 return;
275             }
276              
277             sub del {
278 13     13   19 my ($self, $time, $quantity) = @_;
279             # try_acquire() guarantee $quantity > 0
280             # even if $time is already outdated, these elements should be removed
281             # anyway in case {period} will be increased later
282 13         18 my $len = @{ $self->{data} };
  13         21  
283 13 50       48 if (!$len) {
284 0         0 return;
285             }
286 13 50       33 if ($quantity > $len) {
287 0         0 $quantity = $len;
288             }
289             # OPTIMIZATION not in {data}
290 13 50       75 if ($self->{data}[ $self->{next} ] > $time) {
    100          
    50          
291 0         0 return;
292             }
293             # OPTIMIZATION oldest
294             elsif ($self->{data}[ $self->{next} ] == $time) {
295 11         27 for (map { ($self->{next} + $_ - 1) % $len } 1 .. $quantity) {
  11         50  
296             # part of $quantity may be not in {data} (if {limit} was decreased)
297 11 50       37 return if $self->{data}[ $_ ] != $time;
298 11         30 $self->{data}[ $_ ] = EMPTY;
299             }
300             }
301             # OPTIMIZATION newest
302             elsif ($self->{data}[ $self->{next} - 1 ] == $time) {
303 2         6 for (map { $self->{next} - $_ } 1 .. $quantity) {
  2         6  
304 2 50       17 croak 'assert: newest: no time' if $self->{data}[ $_ ] != $time;
305 2         6 $self->{data}[ $_ ] = EMPTY;
306             }
307 2         8 $self->{next} = ($self->{next} - $quantity) % $len;
308             }
309             # middle (actually it support any case, not just middle)
310             else {
311 0   0     0 my $i = _binsearch($time, $self->{data}, $self->{next}, $len - 1)
312             // _binsearch($time, $self->{data}, 0, $self->{next} - 1);
313 0 0       0 croak 'assert: middle: not found' if !defined $i;
314 0         0 for (map { ($i + $_ - 1) % $len } 1 .. $quantity) {
  0         0  
315 0 0       0 croak 'assert: middle: no time' if $self->{data}[ $_ ] != $time;
316 0         0 $self->{data}[ $_ ] = EMPTY;
317             }
318             # OPTIMIZATION move minimum amount of elements
319 0         0 my $count_rew = ($self->{next} - $i) % $len;
320 0         0 my $count_fwd = ($i + $quantity - $self->{next}) % $len;
321             # move oldest elements forward
322 0 0       0 if ($count_fwd <= $count_rew) {
323 0         0 @{ $self->{data} }[ map { ($self->{next}+$_-1) % $len } 1 .. $count_fwd ] =
  0         0  
  0         0  
324 0         0 @{ $self->{data} }[ map { ($self->{next}+$_-1) % $len } $count_fwd-$quantity+1 .. $count_fwd, 1 .. $count_fwd-$quantity ];
  0         0  
325             }
326             # move newest elements backward
327             else {
328 0         0 @{ $self->{data} }[ map { ($i+$_-1) % $len } 1 .. $count_rew ] =
  0         0  
  0         0  
329 0         0 @{ $self->{data} }[ map { ($i+$_-1) % $len } 1+$quantity .. $count_rew, 1 .. $quantity];
  0         0  
330 0         0 $self->{next} = ($self->{next} - $quantity) % $len;
331             }
332             }
333 13         26 return;
334             }
335              
336             sub get {
337 24     24   35 my ($self, $id) = @_;
338             # $id is number of required element, counting from oldest one ($id = 1)
339 24         28 my $len = @{ $self->{data} };
  24         48  
340             # acquire() guarantee 0 < $id <= $len
341 24         34 my $i = ($self->{next} + $id - 1) % $len;
342 24         81 return $self->{data}[$i];
343             }
344              
345             sub resize {
346 0     0     my ($self, $newlen) = @_;
347             # limit() guarantee $newlen >= 0
348 0           my $len = @{ $self->{data} };
  0            
349 0           my $d = $self->{data};
350 0           $self->{data} = [ @{$d}[ $self->{next} .. $#{$d} ], @{$d}[ 0 .. $self->{next} - 1 ] ];
  0            
  0            
  0            
351 0 0         if ($newlen < $len) {
352 0           $self->{next} = 0;
353 0           splice @{ $self->{data} }, 0, $len - $newlen;
  0            
354             } else {
355 0           $self->{next} = $len % $newlen;
356 0           push @{ $self->{data} }, (EMPTY) x ($newlen - $len);
  0            
357             }
358 0           return $self;
359             }
360              
361             # From List::BinarySearch::PP version 0.23.
362             # Modified to support slices and work with array of numbers, without callback.
363             sub _binsearch {
364 0     0     my ( $target, $aref, $min, $max ) = @_;
365 0   0       $min //= 0;
366 0   0       $max //= $#{$aref};
  0            
367 0 0 0       croak 'bad slice' if $min < 0 || $#{$aref} < $max || $min > $max;
  0   0        
368 0           while ( $max > $min ) {
369 0           my $mid = int( ( $min + $max ) / 2 );
370 0 0         if ( $target > $aref->[$mid] ) {
371 0           $min = $mid + 1;
372             } else {
373 0           $max = $mid;
374             }
375             }
376 0 0         return $min if $target == $aref->[$min];
377 0           return;
378             }
379              
380              
381             1; # Magic true value required at end of module
382             __END__