File Coverage

blib/lib/Sub/Throttler/Rate/AnyEvent.pm
Criterion Covered Total %
statement 188 280 67.1
branch 43 100 43.0
condition 10 33 30.3
subroutine 32 37 86.4
pod 9 9 100.0
total 282 459 61.4


line stmt bran cond sub pod time code
1             package Sub::Throttler::Rate::AnyEvent;
2 3     3   25628 use 5.010001;
  3         7  
  3         82  
3 3     3   9 use warnings;
  3         4  
  3         55  
4 3     3   9 use strict;
  3         3  
  3         55  
5 3     3   8 use utf8;
  3         3  
  3         13  
6 3     3   41 use Carp;
  3         3  
  3         249  
7             our @CARP_NOT = qw( Sub::Throttler );
8              
9             our $VERSION = 'v0.2.2';
10              
11 3     3   369 use parent qw( Sub::Throttler::algo );
  3         219  
  3         9  
12 3     3   179 use Sub::Throttler qw( throttle_flush );
  3         4  
  3         10  
13 3     3   205 use Time::HiRes qw( clock_gettime CLOCK_MONOTONIC time sleep );
  3         5  
  3         18  
14 3     3   416 use List::Util qw( min );
  3         3  
  3         127  
15 3     3   12 use Scalar::Util qw( weaken );
  3         3  
  3         95  
16 3     3   1583 use Storable qw( dclone );
  3         6455  
  3         156  
17 3     3   2643 use AnyEvent;
  3         11899  
  3         121  
18              
19              
20             sub new {
21 3     3   14 use warnings FATAL => qw( misc );
  3         3  
  3         3719  
22 9     9 1 102682 my ($class, %opt) = @_;
23 9   100     143 my $self = bless {
      100        
      33        
24             limit => delete $opt{limit} // 1,
25             period => delete $opt{period} // 1,
26             acquired=> {}, # { $id => { $key => [$time, $quantity], … }, … }
27             used => {}, # { $key => { next => $idx, data => [ $time, … ] }, … }
28             _cb => undef, # callback for timer
29             _t => undef, # undef or AE::timer
30             }, ref $class || $class;
31 9 50       84 croak 'limit must be an unsigned integer' if $self->{limit} !~ /\A\d+\z/ms;
32 9 50       33 croak 'period must be a positive number' if $self->{period} <= 0;
33 9 50       38 croak 'period is too large' if $self->{period} >= -Sub::Throttler::Rate::rr::EMPTY();
34 9 50       23 croak 'bad param: '.(keys %opt)[0] if keys %opt;
35 9         33 weaken(my $this = $self);
36 9 0   0   37 $self->{_cb} = sub { $this && $this->_tick() };
  0         0  
37 9         44 return $self;
38             }
39              
40             sub acquire {
41 11     11 1 16 my ($self, $id, $key, $quantity) = @_;
42 11 50       25 if (!$self->try_acquire($id, $key, $quantity)) {
43 11 50       33 if ($quantity <= $self->{limit}) {
44 11         28 my $now = clock_gettime(CLOCK_MONOTONIC);
45 11         45 my $delay = $self->{used}{$key}->get($quantity) + $self->{period} - $now;
46             # resource may expire between try_acquire() and clock_gettime()
47 11 50       26 if ($delay > 0) {
48 11         1182103 sleep $delay;
49             }
50             }
51 11 50       110 if (!$self->try_acquire($id, $key, $quantity)) {
52 0         0 croak "$self: unable to acquire $quantity of resource '$key'";
53             }
54             }
55 11         46 return $self;
56             }
57              
58             sub limit {
59 1     1 1 22 my ($self, $limit) = @_;
60 1 50       4 if (1 == @_) {
61 1         5 return $self->{limit};
62             }
63 0 0       0 croak 'limit must be an unsigned integer' if $limit !~ /\A\d+\z/ms;
64             # OPTIMIZATION call throttle_flush() only if amount of available
65             # resources increased (i.e. limit was increased)
66 0         0 my $resources_increases = $self->{limit} < $limit;
67 0         0 $self->{limit} = $limit;
68 0         0 for my $rr (values %{ $self->{used} }) {
  0         0  
69 0         0 $rr->resize($self->{limit});
70             }
71 0 0       0 if ($resources_increases) {
72 0         0 throttle_flush();
73             }
74 0         0 return $self;
75             }
76              
77             sub load {
78 5     5 1 152 my ($class, $state) = @_;
79 5 100       25 croak 'bad state: wrong algorithm' if $state->{algo} ne __PACKAGE__;
80 4         48 my $v = version->parse($state->{version});
81 4 100       138 if ($v > $VERSION) {
82 1         15 carp 'restoring state saved by future version';
83             }
84 4         284 my $self = $class->new(limit=>$state->{limit}, period=>$state->{period});
85 4         101 $self->{used} = dclone($state->{used});
86 4         19 my ($time, $now) = (time, clock_gettime(CLOCK_MONOTONIC));
87             # time jump backward, no matter how much, handled like we still is in
88             # current period, to be safe
89 4 100       22 if ($state->{at} > $time) {
90 2         3 $time = $state->{at};
91             }
92 4         5 my $diff = $time - $now;
93 4         4 for my $data (map {$_->{data}} values %{ $self->{used} }) {
  4         10  
  4         10  
94 4         6 for (@{ $data }) {
  4         4  
95 12 50       19 if ($_ != Sub::Throttler::Rate::rr::EMPTY()) {
96 12         16 $_ -= $diff;
97             }
98             }
99             }
100 4         6 for (values %{ $self->{used} }) {
  4         9  
101 4         9 bless $_, 'Sub::Throttler::Rate::rr';
102             }
103 4         20 $self->{_t} = AE::timer 0, 0, $self->{_cb};
104 4         10 return $self;
105             }
106              
107             sub period {
108 1     1 1 2 my ($self, $period) = @_;
109 1 50       3 if (1 == @_) {
110 1         3 return $self->{period};
111             }
112 0 0       0 croak 'period must be a positive number' if $period <= 0;
113 0 0       0 croak 'period is too large' if $self->{period} >= -Sub::Throttler::Rate::rr::EMPTY();
114             # OPTIMIZATION call throttle_flush() only if amount of available
115             # resources increased (i.e. period was decreased)
116 0         0 my $resources_increases = $self->{period} > $period;
117 0         0 $self->{period} = $period;
118 0 0       0 if ($resources_increases) {
119 0 0       0 if ($self->{_t}) {
120 0         0 $self->{_t} = undef;
121 0         0 $self->_tick();
122             }
123             else {
124 0         0 throttle_flush();
125             }
126             }
127 0         0 return $self;
128             }
129              
130             sub release {
131 18     18 1 26 my ($self, $id) = @_;
132 18 50       64 croak sprintf '%s not acquired anything', $id if !$self->{acquired}{$id};
133 18         37 delete $self->{acquired}{$id};
134 18         36 return $self;
135             }
136              
137             sub release_unused {
138 13     13 1 22 my ($self, $id) = @_;
139 13 50       54 croak sprintf '%s not acquired anything', $id if !$self->{acquired}{$id};
140              
141 13         30 my $now = clock_gettime(CLOCK_MONOTONIC);
142 13         50 for my $key (grep {$self->{used}{$_}} keys %{ $self->{acquired}{$id} }) {
  13         55  
  13         58  
143 13         14 my ($time, $quantity) = @{ $self->{acquired}{$id}{$key} };
  13         37  
144 13         62 $self->{used}{$key}->del($time, $quantity);
145             # clean up (avoid memory leak in long run with unique keys)
146 13 100       63 if ($self->{used}{$key}->get($self->{limit}) + $self->{period} <= $now) {
147 11         63 delete $self->{used}{$key};
148             }
149             }
150 13         39 delete $self->{acquired}{$id};
151 13         52 throttle_flush();
152 13 100       21 if (!keys %{ $self->{used} }) {
  13         41  
153 11         60 $self->{_t} = undef;
154             }
155 13         115 return $self;
156             }
157              
158             sub save {
159 1     1 1 3 my ($self) = @_;
160 1         6 my ($time, $now) = (time, clock_gettime(CLOCK_MONOTONIC));
161 1         26 my $diff = $time - $now;
162 1         140 my $state = {
163             algo => __PACKAGE__,
164             version => version->declare($VERSION)->numify,
165             limit => $self->{limit},
166             period => $self->{period},
167             used => dclone($self->{used}),
168             at => $time,
169             };
170 1         5 for my $data (map {$_->{data}} values %{ $state->{used} }) {
  1         3  
  1         3  
171 1         1 for (@{ $data }) {
  1         2  
172 3 50       7 if ($_ != Sub::Throttler::Rate::rr::EMPTY()) {
173 3         5 $_ += $diff;
174             }
175             }
176             }
177 1         6 for (values %{ $state->{used} }) {
  1         2  
178 1         2 $_ = {%{ $_ }}; # unbless
  1         9  
179             }
180 1         2 return $state;
181             }
182              
183             sub try_acquire {
184 62     62 1 208 my ($self, $id, $key, $quantity) = @_;
185 62 100 66     227 croak sprintf '%s already acquired %s', $id, $key
186             if $self->{acquired}{$id} && exists $self->{acquired}{$id}{$key};
187 61 50       132 croak 'quantity must be positive' if $quantity <= 0;
188              
189 61         208 my $now = clock_gettime(CLOCK_MONOTONIC);
190              
191 61   66     493 $self->{used}{$key} ||= Sub::Throttler::Rate::rr->new($self->{limit});
192 61 100       189 if (!$self->{used}{$key}->add($self->{period}, $now, $quantity)) {
193 26         69 return;
194             }
195              
196 35         159 $self->{acquired}{$id}{$key} = [$now, $quantity];
197 35 100       112 if (!$self->{_t}) {
198 16         155 $self->{_t} = AE::timer $self->{period}, 0, $self->{_cb};
199             }
200 35         4042 return 1;
201             }
202              
203             sub _tick {
204 0     0   0 my $self = shift;
205 0         0 my $now = clock_gettime(CLOCK_MONOTONIC);
206 0         0 my $when = 0;
207 0         0 for my $key (keys %{ $self->{used} }) {
  0         0  
208 0         0 my $after = $self->{used}{$key}->after($now - $self->{period});
209 0 0 0     0 if (!$after) {
    0          
210 0         0 delete $self->{used}{$key};
211             }
212             elsif (!$when || $when > $after) {
213 0         0 $when = $after;
214             }
215             }
216 0 0       0 $self->{_t} = !$when ? undef : AE::timer $when + $self->{period} - $now, 0, $self->{_cb};
217 0         0 throttle_flush();
218 0         0 return;
219             }
220              
221              
222             package Sub::Throttler::Rate::rr; ## no critic (ProhibitMultiplePackages)
223 3     3   55 use 5.010001;
  3         6  
  3         85  
224 3     3   9 use warnings;
  3         6  
  3         59  
225 3     3   13 use strict;
  3         3  
  3         79  
226 3     3   11 use utf8;
  3         2  
  3         15  
227 3     3   43 use Carp;
  3         33  
  3         171  
228              
229 3     3   12 use constant EMPTY => -1_000_000_000;
  3         4  
  3         2480  
230              
231              
232             sub new {
233 16     16   31 my ($class, $len) = @_;
234 16   33     188 my $self = bless {
235             next => 0,
236             data => [ (EMPTY) x $len ],
237             }, ref $class || $class;
238 16         62 return $self;
239             }
240              
241             sub add {
242 61     61   72 my ($self, $period, $time, $quantity) = @_;
243 61         56 my $len = @{ $self->{data} };
  61         93  
244             # try_acquire() guarantee $quantity > 0, so we continue only if $len > 0
245             # (thus avoid division by zero on % $len) and there is a chance to add
246             # $quantity elements
247 61 100       106 if ($quantity > $len) {
248 1         2 return;
249             }
250 60         127 my $required = ($self->{next} + $quantity - 1) % $len;
251             # {data} is sorted, last added element ($self->{next}-1) is guaranteed
252             # to be largest of all elements, so all elements between (inclusive)
253             # $self->{next} and $required are guaranteed to be either EMPTY
254             # or <= $self->{next}-1 element, and $required element is largest of them
255 60 100       171 if ($self->{data}[$required] > $time - $period) {
256 25         51 return;
257             }
258 35         78 for (1 .. $quantity) {
259 38         58 $self->{data}[ $self->{next} ] = $time;
260 38         88 ($self->{next} += 1) %= $len;
261             }
262 35         97 return 1;
263             }
264              
265             # Return time of acquiring first resource after $time or nothing.
266             sub after {
267 0     0   0 my ($self, $time) = @_;
268             # _tick() guarantee $time > EMPTY
269 0         0 my $len = @{ $self->{data} };
  0         0  
270 0         0 for (1 .. $len) {
271 0         0 $_ = ($self->{next} + $_ - 1) % $len;
272 0 0       0 return $self->{data}[ $_ ] if $self->{data}[ $_ ] > $time;
273             }
274 0         0 return;
275             }
276              
277             sub del {
278 13     13   21 my ($self, $time, $quantity) = @_;
279             # try_acquire() guarantee $quantity > 0
280             # even if $time is already outdated, these elements should be removed
281             # anyway in case {period} will be increased later
282 13         11 my $len = @{ $self->{data} };
  13         30  
283 13 50       29 if (!$len) {
284 0         0 return;
285             }
286 13 50       39 if ($quantity > $len) {
287 0         0 $quantity = $len;
288             }
289             # OPTIMIZATION not in {data}
290 13 50       75 if ($self->{data}[ $self->{next} ] > $time) {
    100          
    50          
291 0         0 return;
292             }
293             # OPTIMIZATION oldest
294             elsif ($self->{data}[ $self->{next} ] == $time) {
295 11         30 for (map { ($self->{next} + $_ - 1) % $len } 1 .. $quantity) {
  11         43  
296             # part of $quantity may be not in {data} (if {limit} was decreased)
297 11 50       91 return if $self->{data}[ $_ ] != $time;
298 11         43 $self->{data}[ $_ ] = EMPTY;
299             }
300             }
301             # OPTIMIZATION newest
302             elsif ($self->{data}[ $self->{next} - 1 ] == $time) {
303 2         7 for (map { $self->{next} - $_ } 1 .. $quantity) {
  2         6  
304 2 50       15 croak 'assert: newest: no time' if $self->{data}[ $_ ] != $time;
305 2         5 $self->{data}[ $_ ] = EMPTY;
306             }
307 2         5 $self->{next} = ($self->{next} - $quantity) % $len;
308             }
309             # middle (actually it support any case, not just middle)
310             else {
311 0   0     0 my $i = _binsearch($time, $self->{data}, $self->{next}, $len - 1)
312             // _binsearch($time, $self->{data}, 0, $self->{next} - 1);
313 0 0       0 croak 'assert: middle: not found' if !defined $i;
314 0         0 for (map { ($i + $_ - 1) % $len } 1 .. $quantity) {
  0         0  
315 0 0       0 croak 'assert: middle: no time' if $self->{data}[ $_ ] != $time;
316 0         0 $self->{data}[ $_ ] = EMPTY;
317             }
318             # OPTIMIZATION move minimum amount of elements
319 0         0 my $count_rew = ($self->{next} - $i) % $len;
320 0         0 my $count_fwd = ($i + $quantity - $self->{next}) % $len;
321             # move oldest elements forward
322 0 0       0 if ($count_fwd <= $count_rew) {
323 0         0 @{ $self->{data} }[ map { ($self->{next}+$_-1) % $len } 1 .. $count_fwd ] =
  0         0  
  0         0  
324 0         0 @{ $self->{data} }[ map { ($self->{next}+$_-1) % $len } $count_fwd-$quantity+1 .. $count_fwd, 1 .. $count_fwd-$quantity ];
  0         0  
325             }
326             # move newest elements backward
327             else {
328 0         0 @{ $self->{data} }[ map { ($i+$_-1) % $len } 1 .. $count_rew ] =
  0         0  
  0         0  
329 0         0 @{ $self->{data} }[ map { ($i+$_-1) % $len } 1+$quantity .. $count_rew, 1 .. $quantity];
  0         0  
330 0         0 $self->{next} = ($self->{next} - $quantity) % $len;
331             }
332             }
333 13         24 return;
334             }
335              
336             sub get {
337 24     24   35 my ($self, $id) = @_;
338             # $id is number of required element, counting from oldest one ($id = 1)
339 24         31 my $len = @{ $self->{data} };
  24         37  
340             # acquire() guarantee 0 < $id <= $len
341 24         43 my $i = ($self->{next} + $id - 1) % $len;
342 24         91 return $self->{data}[$i];
343             }
344              
345             sub resize {
346 0     0     my ($self, $newlen) = @_;
347             # limit() guarantee $newlen >= 0
348 0           my $len = @{ $self->{data} };
  0            
349 0           my $d = $self->{data};
350 0           $self->{data} = [ @{$d}[ $self->{next} .. $#{$d} ], @{$d}[ 0 .. $self->{next} - 1 ] ];
  0            
  0            
  0            
351 0 0         if ($newlen < $len) {
352 0           $self->{next} = 0;
353 0           splice @{ $self->{data} }, 0, $len - $newlen;
  0            
354             } else {
355 0           $self->{next} = $len % $newlen;
356 0           push @{ $self->{data} }, (EMPTY) x ($newlen - $len);
  0            
357             }
358 0           return $self;
359             }
360              
361             # From List::BinarySearch::PP version 0.23.
362             # Modified to support slices and work with array of numbers, without callback.
363             sub _binsearch {
364 0     0     my ( $target, $aref, $min, $max ) = @_;
365 0   0       $min //= 0;
366 0   0       $max //= $#{$aref};
  0            
367 0 0 0       croak 'bad slice' if $min < 0 || $#{$aref} < $max || $min > $max;
  0   0        
368 0           while ( $max > $min ) {
369 0           my $mid = int( ( $min + $max ) / 2 );
370 0 0         if ( $target > $aref->[$mid] ) {
371 0           $min = $mid + 1;
372             } else {
373 0           $max = $mid;
374             }
375             }
376 0 0         return $min if $target == $aref->[$min];
377 0           return;
378             }
379              
380              
381             1; # Magic true value required at end of module
382             __END__