File Coverage

blib/lib/Database/Async/Pool.pm
Criterion Covered Total %
statement 76 85 89.4
branch 7 14 50.0
condition 3 8 37.5
subroutine 22 25 88.0
pod 3 14 21.4
total 111 146 76.0


line stmt bran cond sub pod time code
1             package Database::Async::Pool;
2              
3 2     2   14 use strict;
  2         4  
  2         64  
4 2     2   10 use warnings;
  2         2  
  2         86  
5              
6             our $VERSION = '0.016'; # VERSION
7              
8             =head1 NAME
9              
10             Database::Async::Pool - connection manager for L<Database::Async>
11              
12             =head1 DESCRIPTION
13              
14             =cut
15              
16 2     2   849 use Database::Async::Backoff;
  2         5  
  2         61  
17              
18 2     2   14 use Future;
  2         3  
  2         45  
19 2     2   10 use Future::AsyncAwait;
  2         3  
  2         17  
20 2     2   84 use Syntax::Keyword::Try;
  2         3  
  2         19  
21 2     2   156 use Scalar::Util qw(blessed refaddr);
  2         5  
  2         107  
22 2     2   1053 use List::UtilsBy qw(extract_by);
  2         4251  
  2         144  
23 2     2   14 use Log::Any qw($log);
  2         6  
  2         17  
24              
25             sub new {
26 4     4 0 1262 my ($class, %args) = @_;
27 4         11 my $backoff = delete $args{backoff};
28 4 50       19 unless(blessed $backoff) {
29 4         10 my $type = 'exponential';
30 4 50 33     24 $type = $backoff if $backoff and not ref $backoff;
31 4 50 33     32 $backoff = Database::Async::Backoff->new(
32             type => $type,
33             initial => 0.010,
34             max => 30,
35             ($backoff && ref($backoff) ? %$backoff : ())
36             )
37             }
38             bless {
39 4         56 pending_count => 0,
40             count => 0,
41             min => 0,
42             max => 1,
43             ordering => 'serial',
44             backoff => $backoff,
45             waiting => [],
46             ready => [],
47             %args
48             }, $class
49             }
50              
51 4     4 0 20 sub min { shift->{min} }
52 6     6 0 27 sub max { shift->{max} }
53 7     7 0 33 sub count { shift->{count} }
54 2     2 0 5 sub pending_count { shift->{pending_count} }
55 3     3 0 13 sub backoff { shift->{backoff} }
56              
57             sub register_engine {
58 1     1 0 4 my ($self, $engine) = @_;
59 1         3 --$self->{pending_count};
60 1         3 ++$self->{count};
61 1         2 $self
62             }
63              
64             sub unregister_engine {
65 0     0 0 0 my ($self, $engine) = @_;
66             try {
67             $log->tracef('Engine is removed from the pool, with %d in the queue', 0 + @{$self->{waiting}});
68             my $addr = refaddr($engine);
69             # This engine may have been actively processing a request, and not in the pool:
70             # that's fine, we only remove if we had it.
71 0     0   0 my $count = () = extract_by { refaddr($_) == $addr } @{$self->{ready}};
72             $log->tracef('Removed %d engine instances from the ready pool', $count);
73             # Any engine that wasn't in the ready queue (`count`) was out on assignment
74             # and thus included in `pending_count`
75             --$self->{$count ? 'count' : 'pending_count'};
76             $log->infof('After cleanup we have %d count, %d pending, %d waiting', $self->{count}, $self->{pending_count}, 0 + @{$self->{waiting}});
77             $self->process_pending->retain if @{$self->{waiting}};
78 0         0 } catch ($e) {
79             $log->errorf('Failed %s', $e);
80             }
81 0         0 $self
82             }
83              
84             =head2 queue_ready_engine
85              
86             Called when there's a spare engine we can put back in the pool.
87              
88             =cut
89              
90             sub queue_ready_engine {
91 1     1 1 4 my ($self, $engine) = @_;
92 1         3 $log->tracef('Engine is now ready, with %d in the queue', 0 + @{$self->{waiting}});
  1         8  
93 1 50       4 return $self->notify_engine($engine) if @{$self->{waiting}};
  1         5  
94 1         2 push @{$self->{ready}}, $engine;
  1         5  
95 1         2 $self
96             }
97              
98             =head2 notify_engine
99              
100             We call this internally to hand an engine over to the next
101             waiting request.
102              
103             =cut
104              
105             sub notify_engine {
106 0     0 1 0 my ($self, $engine) = @_;
107             die 'unable to notify, we have no pending requests'
108 0 0       0 unless my $f = shift @{$self->{waiting}};
  0         0  
109 0         0 $f->done($engine);
110 0         0 return $self;
111             }
112              
113             =head2 next_engine
114              
115             Resolves to an engine. May need to wait if there are none available.
116              
117             =cut
118              
119 2     2 1 1424 async sub next_engine {
120 2         5 my ($self) = @_;
121 2         5 $log->tracef('Have %d ready engines to use', 0 + @{$self->{ready}});
  2         9  
122 2 100       7 if(my $engine = shift @{$self->{ready}}) {
  2         14  
123 1         17 return $engine;
124             }
125 1         3 push @{$self->{waiting}}, my $f = $self->new_future;
  1         5  
126 1         23 await $self->process_pending;
127 1         30 return await $f;
128             }
129              
130 1     1 0 2 async sub process_pending {
131 1         3 my ($self) = @_;
132 1         3 my $total = $self->count + $self->pending_count;
133 1         3 $log->tracef('Might request, current count is %d/%d (%d pending, %d active)', $total, $self->max, $self->pending_count, $self->count);
134 1 50       4 await $self->request_engine unless $total >= $self->max;
135 1         86 return;
136             }
137              
138             sub new_future {
139 1     1 0 3 my ($self, $label) = @_;
140             (
141             $self->{new_future} //= sub {
142 1     1   4 Future->new->set_label($_[1])
143             }
144 1   50     13 )->($label)
145             }
146              
147 1     1 0 2 async sub request_engine {
148 1         2 my ($self) = @_;
149 1         4 $log->tracef('Pool requesting new engine');
150 1         4 ++$self->{pending_count};
151 1         3 await $self->{request_engine}->()
152             }
153              
154             1;
155              
156             =head1 AUTHOR
157              
158             Tom Molesworth C<< <TEAM@cpan.org> >>
159              
160             =head1 LICENSE
161              
162             Copyright Tom Molesworth 2011-2021. Licensed under the same terms as Perl itself.
163