File Coverage

blib/lib/Database/Async/Pool.pm
Criterion Covered Total %
statement 85 94 90.4
branch 9 16 56.2
condition 5 8 62.5
subroutine 24 27 88.8
pod 4 14 28.5
total 127 159 79.8


line stmt bran cond sub pod time code
1             package Database::Async::Pool;
2              
3 2     2   15 use strict;
  2         5  
  2         64  
4 2     2   10 use warnings;
  2         4  
  2         58  
5              
6 2     2   10 use parent qw(IO::Async::Notifier);
  2         3  
  2         11  
7              
8             our $VERSION = '0.017'; # VERSION
9              
10             =head1 NAME
11              
12             Database::Async::Pool - connection manager for L<Database::Async>
13              
14             =head1 DESCRIPTION
15              
16             =cut
17              
18 2     2   986 use Database::Async::Backoff::Exponential;
  2         5  
  2         73  
19 2     2   843 use Database::Async::Backoff::None;
  2         5  
  2         71  
20              
21 2     2   17 use Future;
  2         4  
  2         39  
22 2     2   11 use Future::AsyncAwait;
  2         3  
  2         12  
23 2     2   89 use Syntax::Keyword::Try;
  2         4  
  2         17  
24 2     2   202 use Scalar::Util qw(blessed refaddr);
  2         4  
  2         113  
25 2     2   1073 use List::UtilsBy qw(extract_by);
  2         4269  
  2         157  
26 2     2   14 use Log::Any qw($log);
  2         4  
  2         29  
27              
28             sub new {
29 4     4 1 1272 my ($class, %args) = @_;
30 4         9 my $backoff = delete $args{backoff};
31 4 50       18 unless(blessed $backoff) {
32 4         9 my $type = 'exponential';
33 4 100 66     15 $type = $backoff if $backoff and not ref $backoff;
34 4 50 66     43 $backoff = Database::Async::Backoff->instantiate(
35             type => $type,
36             initial_delay => 0.010,
37             max_delay => 30,
38             ($backoff && ref($backoff) ? %$backoff : ())
39             )
40             }
41             bless {
42 4         64 pending_count => 0,
43             count => 0,
44             min => 0,
45             max => 1,
46             attempts => undef,
47             ordering => 'serial',
48             backoff => $backoff,
49             waiting => [],
50             ready => [],
51             %args
52             }, $class
53             }
54              
55 4     4 0 22 sub min { shift->{min} }
56 6     6 0 25 sub max { shift->{max} }
57 7     7 0 33 sub count { shift->{count} }
58 2     2 0 4 sub pending_count { shift->{pending_count} }
59 5     5 0 25 sub backoff { shift->{backoff} }
60              
61             sub register_engine {
62 1     1 0 3 my ($self, $engine) = @_;
63 1         3 --$self->{pending_count};
64 1         1 ++$self->{count};
65 1         2 $self
66             }
67              
68             sub unregister_engine {
69 0     0 0 0 my ($self, $engine) = @_;
70             try {
71             $log->tracef('Engine is removed from the pool, with %d in the queue', 0 + @{$self->{waiting}});
72             my $addr = refaddr($engine);
73             # This engine may have been actively processing a request, and not in the pool:
74             # that's fine, we only remove if we had it.
75 0     0   0 my $count = () = extract_by { refaddr($_) == $addr } @{$self->{ready}};
76             $log->tracef('Removed %d engine instances from the ready pool', $count);
77             # Any engine that wasn't in the ready queue (`count`) was out on assignment
78             # and thus included in `pending_count`
79             --$self->{$count ? 'count' : 'pending_count'};
80             $log->infof('After cleanup we have %d count, %d pending, %d waiting', $self->{count}, $self->{pending_count}, 0 + @{$self->{waiting}});
81             $self->process_pending->retain if @{$self->{waiting}};
82 0         0 } catch ($e) {
83             $log->errorf('Failed %s', $e);
84             }
85 0         0 $self
86             }
87              
88             =head2 queue_ready_engine
89              
90             Called when there's a spare engine we can put back in the pool.
91              
92             =cut
93              
94             sub queue_ready_engine {
95 1     1 1 3 my ($self, $engine) = @_;
96 1         2 $log->tracef('Engine is now ready, with %d in the queue', 0 + @{$self->{waiting}});
  1         8  
97 1 50       5 return $self->notify_engine($engine) if @{$self->{waiting}};
  1         5  
98 1         2 push @{$self->{ready}}, $engine;
  1         3  
99 1         3 $self
100             }
101              
102             =head2 notify_engine
103              
104             We call this internally to hand an engine over to the next
105             waiting request.
106              
107             =cut
108              
109             sub notify_engine {
110 0     0 1 0 my ($self, $engine) = @_;
111             die 'unable to notify, we have no pending requests'
112 0 0       0 unless my $f = shift @{$self->{waiting}};
  0         0  
113 0         0 $f->done($engine);
114 0         0 return $self;
115             }
116              
117             =head2 next_engine
118              
119             Resolves to an engine. May need to wait if there are none available.
120              
121             =cut
122              
123 2     2 1 1371 async sub next_engine {
124 2         6 my ($self) = @_;
125 2         35 $log->tracef('Have %d ready engines to use', 0 + @{$self->{ready}});
  2         11  
126 2 100       9 if(my $engine = shift @{$self->{ready}}) {
  2         15  
127 1         16 return $engine;
128             }
129 1         3 push @{$self->{waiting}}, my $f = $self->new_future;
  1         5  
130 1         22 await $self->process_pending;
131 1         29 return await $f;
132             }
133              
134 1     1 0 3 async sub process_pending {
135 1         3 my ($self) = @_;
136 1         3 my $total = $self->count + $self->pending_count;
137 1         4 $log->tracef('Might request, current count is %d/%d (%d pending, %d active)', $total, $self->max, $self->pending_count, $self->count);
138 1 50       5 await $self->request_engine unless $total >= $self->max;
139 1         48 return;
140             }
141              
142             sub new_future {
143 1     1 0 3 my ($self, $label) = @_;
144             (
145             $self->{new_future} //= sub {
146 1     1   5 Future->new->set_label($_[1])
147             }
148 1   50     17 )->($label)
149             }
150              
151 1     1 0 2 async sub request_engine {
152 1         3 my ($self) = @_;
153 1         3 $log->tracef('Pool requesting new engine');
154 1         4 ++$self->{pending_count};
155 1         3 my $delay = $self->backoff->next;
156 1 50       4 await $self->loop->delay_future(
157             after => $delay
158             ) if $delay;
159 1         3 await $self->{request_engine}->();
160 1         54 $self->backoff->reset;
161             }
162              
163             1;
164              
165             =head1 AUTHOR
166              
167             Tom Molesworth C<< <TEAM@cpan.org> >>
168              
169             =head1 LICENSE
170              
171             Copyright Tom Molesworth 2011-2021. Licensed under the same terms as Perl itself.
172