File Coverage

blib/lib/WebService/Async/CustomerIO/RateLimiter.pm
Criterion Covered Total %
statement 50 50 100.0
branch 8 10 80.0
condition 3 5 60.0
subroutine 13 13 100.0
pod 3 3 100.0
total 77 81 95.0


line stmt bran cond sub pod time code
1             package WebService::Async::CustomerIO::RateLimiter;
2              
3 4     4   145779 use strict;
  4         15  
  4         114  
4 4     4   22 use warnings;
  4         16  
  4         109  
5              
6 4     4   33 use Carp qw();
  4         16  
  4         118  
7 4     4   622 use Future;
  4         12175  
  4         137  
8 4     4   25 use mro;
  4         8  
  4         51  
9              
10 4     4   507 use parent qw(IO::Async::Notifier);
  4         350  
  4         27  
11              
12             our $VERSION = '0.002'; ## VERSION
13              
14             =head1 NAME
15             WebService::Async::CustomerIO::RateLimitter - This class provide possibility to limit amount
16             of request in time interval
17              
18             =head1 SYNOPSIS
19              
20              
21             =head1 DESCRIPTION
22              
23             =cut
24              
25             sub _init {
26 25     25   38740 my ($self, $args) = @_;
27 25         58 for my $k (qw(limit interval)) {
28 47 100       159 die "Missing required argument: $k" unless exists $args->{$k};
29 45 100       147 die "Invalid value for $k: $args->{$k}" unless int($args->{$k}) > 0;
30 41 50       144 $self->{$k} = delete $args->{$k} if exists $args->{$k};
31             }
32              
33 19         84 $self->{queue} = [];
34 19         37 $self->{counter} = 0;
35              
36 19         73 return $self->next::method($args);
37             }
38              
39             =head2 interval
40             =cut
41              
42 2     2 1 31 sub interval { return shift->{interval} }
43              
44             =head2 limit
45             =cut
46              
47 32     32 1 110 sub limit { return shift->{limit} }
48              
49             =head2 acquire
50              
51             Method checks availability for free slot.
52             It returns future, when slot will be available, then future will be resolved.
53              
54             =cut
55              
56             sub acquire {
57 4     4 1 2177 my ($self) = @_;
58              
59 4         41 $self->_start_timer;
60 4 100       172 return Future->done if ++$self->{counter} <= $self->limit;
61              
62 2         9 my $current = $self->_current_queue;
63 2         22 $current->{counter}++;
64 2         12 return $current->{future};
65             }
66              
67             sub _current_queue {
68 14     14   1060 my ($self) = @_;
69              
70             # +1 for getting correct position for edge cases like: limit 2, counter 4, should be 0
71 14         33 my $pos = int(($self->{counter} - ($self->limit + 1)) / $self->limit);
72              
73 14   50     70 $self->{queue}[$pos] //= {
74             future => $self->loop->new_future,
75             counter => 0
76             };
77              
78 14         1720 return $self->{queue}[$pos];
79             }
80              
81             sub _start_timer {
82 3     3   12 my ($self) = @_;
83              
84             $self->{timer} //= $self->loop->delay_future(
85             after => $self->interval,
86             )->on_ready(
87             sub {
88 1     1   1001354 $self->{counter} = 0;
89 1         7 delete $self->{timer};
90              
91 1 50       6 return unless @{$self->{queue}};
  1         20  
92              
93 1         16 $self->_start_timer;
94              
95 1         5 my $current = shift @{$self->{queue}};
  1         7  
96 1         7 $self->{counter} = $current->{counter};
97 1         12 $current->{future}->done;
98 3   66     32 });
99              
100 3         6140 return $self->{timer};
101             }
102              
103             1;