File Coverage

blib/lib/WebService/Async/CustomerIO/RateLimiter.pm
Criterion Covered Total %
statement 50 50 100.0
branch 8 10 80.0
condition 3 5 60.0
subroutine 13 13 100.0
pod 3 3 100.0
total 77 81 95.0


line stmt bran cond sub pod time code
1             package WebService::Async::CustomerIO::RateLimiter;
2              
3 4     4   121161 use strict;
  4         13  
  4         118  
4 4     4   20 use warnings;
  4         19  
  4         1397  
5              
6 4     4   23 use Carp qw();
  4         1135  
  4         92  
7 4     4   615 use Future;
  4         12119  
  4         95  
8 4     4   1251 use mro;
  4         12  
  4         31  
9              
10 4     4   520 use parent qw(IO::Async::Notifier);
  4         357  
  4         24  
11              
12             our $VERSION = '0.001'; ## VERSION
13              
14             =head1 NAME
15             WebService::Async::CustomerIO::RateLimitter - This class provide possibility to limit amount
16             of request in time interval
17              
18             =head1 SYNOPSIS
19              
20              
21             =head1 DESCRIPTION
22              
23             =cut
24              
25             sub _init {
26 24     24   33059 my ($self, $args) = @_;
27 24         56 for my $k (qw(limit interval)) {
28 45 100       158 die "Missing required argument: $k" unless exists $args->{$k};
29 43 100       136 die "Invalid value for $k: $args->{$k}" unless int($args->{$k}) > 0;
30 39 50       144 $self->{$k} = delete $args->{$k} if exists $args->{$k};
31             }
32              
33 18         41 $self->{queue} = [];
34 18         35 $self->{counter} = 0;
35              
36 18         91 return $self->next::method($args);
37             }
38              
39             =head2 interval
40             =cut
41              
42 2     2 1 32 sub interval { return shift->{interval} }
43              
44             =head2 limit
45             =cut
46              
47 32     32 1 108 sub limit { return shift->{limit} }
48              
49             =head2 acquire
50              
51             Method checks availability for free slot.
52             It returns future, when slot will be available, then future will be resolved.
53              
54             =cut
55              
56             sub acquire {
57 4     4 1 1916 my ($self) = @_;
58              
59 4         24 $self->_start_timer;
60 4 100       163 return Future->done if ++$self->{counter} <= $self->limit;
61              
62 2         8 my $current = $self->_current_queue;
63 2         13 $current->{counter}++;
64 2         9 return $current->{future};
65             }
66              
67             sub _current_queue {
68 14     14   1149 my ($self) = @_;
69              
70             # +1 for getting correct position for edge cases like: limit 2, counter 4, should be 0
71 14         30 my $pos = int(($self->{counter} - ($self->limit + 1)) / $self->limit);
72              
73 14   50     75 $self->{queue}[$pos] //= {
74             future => $self->loop->new_future,
75             counter => 0
76             };
77              
78 14         1750 return $self->{queue}[$pos];
79             }
80              
81             sub _start_timer {
82 3     3   10 my ($self) = @_;
83              
84             $self->{timer} //= $self->loop->delay_future(
85             after => $self->interval,
86             )->on_ready(
87             sub {
88 1     1   1001291 $self->{counter} = 0;
89 1         4 delete $self->{timer};
90              
91 1 50       5 return unless @{$self->{queue}};
  1         16  
92              
93 1         13 $self->_start_timer;
94              
95 1         3 my $current = shift @{$self->{queue}};
  1         6  
96 1         6 $self->{counter} = $current->{counter};
97 1         14 $current->{future}->done;
98 3   66     27 });
99              
100 3         6319 return $self->{timer};
101             }
102              
103             1;