File Coverage

blib/lib/Database/Async/Pool.pm
Criterion Covered Total %
statement 66 74 89.1
branch 7 14 50.0
condition 3 8 37.5
subroutine 19 21 90.4
pod 3 13 23.0
total 98 130 75.3


line stmt bran cond sub pod time code
1             package Database::Async::Pool;
2              
3 2     2   14 use strict;
  2         6  
  2         65  
4 2     2   12 use warnings;
  2         3  
  2         103  
5              
6             our $VERSION = '0.015'; # VERSION
7              
8             =head1 NAME
9              
10             Database::Async::Pool - connection manager for L<Database::Async>
11              
12             =head1 DESCRIPTION
13              
14             =cut
15              
16 2     2   919 use Database::Async::Backoff;
  2         6  
  2         62  
17              
18 2     2   13 use Future;
  2         5  
  2         43  
19 2     2   10 use Future::AsyncAwait;
  2         4  
  2         17  
20 2     2   99 use Scalar::Util qw(blessed);
  2         4  
  2         136  
21 2     2   13 use Log::Any qw($log);
  2         4  
  2         20  
22              
23             sub new {
24 4     4 0 1363 my ($class, %args) = @_;
25 4         12 my $backoff = delete $args{backoff};
26 4 50       22 unless(blessed $backoff) {
27 4         11 my $type = 'exponential';
28 4 50 33     27 $type = $backoff if $backoff and not ref $backoff;
29 4 50 33     38 $backoff = Database::Async::Backoff->new(
30             type => $type,
31             initial => 0.010,
32             max => 30,
33             ($backoff && ref($backoff) ? %$backoff : ())
34             )
35             }
36             bless {
37 4         70 pending_count => 0,
38             count => 0,
39             min => 0,
40             max => 1,
41             ordering => 'serial',
42             backoff => $backoff,
43             waiting => [],
44             ready => [],
45             %args
46             }, $class
47             }
48              
49 4     4 0 23 sub min { shift->{min} }
50 6     6 0 28 sub max { shift->{max} }
51 7     7 0 51 sub count { shift->{count} }
52 2     2 0 6 sub pending_count { shift->{pending_count} }
53 3     3 0 18 sub backoff { shift->{backoff} }
54              
55             sub register_engine {
56 1     1 0 4 my ($self, $engine) = @_;
57 1         2 --$self->{pending_count};
58 1         3 ++$self->{count};
59 1         2 $self
60             }
61              
62             sub unregister_engine {
63 0     0 0 0 my ($self, $engine) = @_;
64 0         0 --$self->{count};
65 0         0 $self
66             }
67              
68             =head2 queue_ready_engine
69              
70             Called when there's a spare engine we can put back in the pool.
71              
72             =cut
73              
74             sub queue_ready_engine {
75 1     1 1 4 my ($self, $engine) = @_;
76 1         3 $log->tracef('Engine is now ready, with %d in the queue', 0 + @{$self->{waiting}});
  1         6  
77 1 50       5 return $self->notify_engine($engine) if @{$self->{waiting}};
  1         6  
78 1         2 push @{$self->{ready}}, $engine;
  1         3  
79 1         3 $self
80             }
81              
82             =head2 notify_engine
83              
84             We call this internally to hand an engine over to the next
85             waiting request.
86              
87             =cut
88              
89             sub notify_engine {
90 0     0 1 0 my ($self, $engine) = @_;
91             die 'unable to notify, we have no pending requests'
92 0 0       0 unless my $f = shift @{$self->{waiting}};
  0         0  
93 0         0 $f->done($engine);
94 0         0 return $self;
95             }
96              
97             =head2 next_engine
98              
99             Resolves to an engine. May need to wait if there are none available.
100              
101             =cut
102              
103 2     2 1 1428 async sub next_engine {
104 2         5 my ($self) = @_;
105 2         3 $log->tracef('Have %d ready engines to use', 0 + @{$self->{ready}});
  2         10  
106 2 100       8 if(my $engine = shift @{$self->{ready}}) {
  2         13  
107 1         15 return $engine;
108             }
109 1         7 push @{$self->{waiting}}, my $f = $self->new_future;
  1         4  
110 1         21 my $total = $self->count + $self->pending_count;
111 1         3 $log->tracef('Might request, current count is %d/%d (%d pending, %d active)', $total, $self->max, $self->pending_count, $self->count);
112 1 50       5 await $self->request_engine unless $total >= $self->max;
113 1         92 return await $f;
114             }
115              
116             sub new_future {
117 1     1 0 4 my ($self, $label) = @_;
118             (
119             $self->{new_future} //= sub {
120 1     1   4 Future->new->set_label($_[1])
121             }
122 1   50     12 )->($label)
123             }
124              
125 1     1 0 3 async sub request_engine {
126 1         3 my ($self) = @_;
127 1         4 $log->tracef('Pool requesting new engine');
128 1         4 ++$self->{pending_count};
129 1         3 await $self->{request_engine}->()
130             }
131              
132             1;
133              
134             =head1 AUTHOR
135              
136             Tom Molesworth C<< <TEAM@cpan.org> >>
137              
138             =head1 LICENSE
139              
140             Copyright Tom Molesworth 2011-2021. Licensed under the same terms as Perl itself.
141