File Coverage

blib/lib/DBIx/Poggy.pm
Criterion Covered Total %
statement 12 14 85.7
branch n/a
condition n/a
subroutine 5 5 100.0
pod n/a
total 17 19 89.4


line stmt bran cond sub pod time code
1 1     1   429 use strict;
  1         2  
  1         25  
2 1     1   4 use warnings;
  1         2  
  1         24  
3 1     1   9 use v5.14;
  1         8  
4              
5             package DBIx::Poggy;
6             our $VERSION = '0.08';
7              
8 1     1   9 use Scalar::Util qw(weaken refaddr);
  1         2  
  1         108  
9              
10             =head1 NAME
11              
12             DBIx::Poggy - async Pg with AnyEvent and Promises
13              
14             =head1 SYNOPSIS
15              
16             use strict;
17             use warnings;
18              
19             use DBIx::Poggy;
20             my $pool = DBIx::Poggy->new( pool_size => 5 );
21             $pool->connect('dbi:Pg:db=test', 'root', 'password');
22              
23             use AnyEvent;
24             my $cv = AnyEvent->condvar;
25              
26             my $res;
27             $pool->take->selectrow_arrayref(
28             'SELECT * FROM users WHERE name = ?', {}, 'ruz'
29             )
30             ->then(sub {
31             my $user = $res->{user} = shift;
32              
33             return $pool->take->selectall_arrayref(
34             'SELECT * FROM friends WHERE user_id = ?', undef, $user->{id}
35             );
36             })
37             ->then(sub {
38             my $friends = $res->{friends} = shift;
39             ...
40             })
41             ->catch(sub {
42             my $error = shift;
43             die $error;
44             })
45             ->finally(sub {
46             $cv->send( $res );
47             });
48              
49             $cv->recv;
50              
51             =head1 DESCRIPTION
52              
53             "Async" postgres as much as L allows with L instead of callbacks.
54              
55             You get DBI interface you used to that returns promises, connections pool, queries
56             queuing and support of transactions.
57              
58             =head2 Why pool?
59              
60             DBD::Pg is not async, it's non blocking. Every connection can execute only one query
61             at a moment, so to execute several queries in parallel you need several connections.
62             What you get is you can do something in Perl side while postgres crunches data for
63             you.
64              
65             =head2 Queue
66              
67             Usually if you attempt to run two queries on the same connection then DBI throws an
68             error about active query. Poggy takes care of that by queuing up queries you run on
69             one connection. Handy for transactions and pool doesn't grow too much.
70              
71             =head2 What is async here then?
72              
73             Only a queries on multiple connections, so if you need to execute many parallel
74             queries then you need many connections. pg_bouncer and haproxy are your friends.
75              
76             =head2 Pool management
77              
78             In auto mode (default) you just "loose" reference to database handle and it gets
79             released back into the pool after all queries are done:
80              
81             {
82             my $cv = AnyEvent->condvar;
83             $pool->take->do(...)->finally($cv);
84             $cv->recv;
85             }
86             # released
87              
88             Or:
89             {
90             my $cv = AnyEvent->condvar;
91             my $dbh = $pool->take;
92             $dbh->do(...)
93             ->then(sub { $dbh->do(...) })
94             ->then(sub { ... })
95             ->finally($cv);
96             $cv->recv;
97             }
98             # $dbh goes out of scope and all queries are done (cuz of condvar)
99             # released
100              
101             =cut
102              
103 1     1   324 use DBIx::Poggy::DBI;
  0            
  0            
104             use DBIx::Poggy::Error;
105              
106             =head1 METHODS
107              
108             =head2 new
109              
110             Named arguments:
111              
112             =over 4
113              
114             =item pool_size
115              
116             number of connections to create, creates one more in case all are busy
117              
118             =back
119              
120             Returns a new pool object.
121              
122             =cut
123              
124             sub new {
125             my $proto = shift;
126             my $self = bless { @_ }, ref($proto) || $proto;
127             return $self->init;
128             }
129              
130             sub init {
131             my $self = shift;
132             $self->{pool_size} ||= 10;
133             $self->{ping_on_take} ||= 30;
134             return $self;
135             }
136              
137             =head2 connect
138              
139             Takes the same arguments as L, opens "pool_size" connections.
140             Saves connection settings for reuse when pool is exhausted.
141              
142             =cut
143              
144             sub connect {
145             my $self = shift;
146             my ($dsn, $user, $password, $opts) = @_;
147              
148             $opts ||= {};
149             $opts->{RaiseError} //= 1;
150              
151             $self->{free} ||= [];
152              
153             $self->{connection_settings} = [ $dsn, $user, $password, $opts ];
154              
155             $self->_connect for 1 .. $self->{pool_size};
156             return $self;
157             }
158              
159             sub _connect {
160             my $self = shift;
161              
162             my $dbh = DBIx::Poggy::DBI->connect(
163             @{ $self->{connection_settings} }
164             ) or die DBIx::Poggy::Error->new( 'DBIx::Poggy::DBI' );
165             push @{$self->{free}}, $dbh;
166             $self->{last_used}{ refaddr $dbh } = time;
167              
168             return;
169             }
170              
171             =head2 take
172              
173             Gives one connection from the pool. Takes arguments:
174              
175             =over 4
176              
177             =item auto
178              
179             Connection will be released to the pool once C goes out of
180             scope (gets "DESTROYED"). True by default.
181              
182             =back
183              
184             Returns L handle. When "auto" is turned off
185             then in list context returns also guard object that will L
186             handle to the pool on destruction.
187              
188             =cut
189              
190             sub take {
191             my $self = shift;
192             my (%args) = (auto => 1, @_);
193             unless ( $self->{free} ) {
194             die DBIx::Poggy::Error->new(
195             err => 666,
196             errstr => 'Attempt to take a connection from not initialized pool',
197             );
198             }
199             my $dbh;
200             while (1) {
201             unless ( @{ $self->{free} } ) {
202             warn "DB pool exhausted, creating a new connection";
203             $self->_connect;
204             $dbh = shift @{ $self->{free} };
205             delete $self->{last_used}{ refaddr $dbh };
206             last;
207             }
208              
209             $dbh = shift @{ $self->{free} };
210             my $used = delete $self->{last_used}{ refaddr $dbh };
211             if ( (time - $used) > $self->{ping_on_take} ) {
212             unless ( $dbh->ping ) {
213             warn "connection is not alive, dropping";
214             next;
215             }
216             }
217             last;
218             }
219              
220             if ( $args{auto} ) {
221             $dbh->{private_poggy_state}{release_to} = $self;
222             weaken $dbh->{private_poggy_state}{release_to};
223             return $dbh;
224             }
225             return $dbh unless wantarray;
226             return ( $dbh, guard { $self->release($dbh) } );
227             }
228              
229             =head2 release
230              
231             Takes a handle as argument and puts it back into the pool. At the moment,
232             no protection against double putting or active queries on the handle.
233              
234             =cut
235              
236             sub release {
237             my $self = shift;
238             my $dbh = shift;
239             delete $dbh->{private_poggy_state}{release_to};
240              
241             if ( $dbh->err && !$dbh->ping ) {
242             warn "handle is in error state and not ping'able, not releasing to the pool";
243             return $self;
244             }
245              
246             push @{ $self->{free} }, $dbh;
247             $self->{last_used}{ refaddr $dbh } = time;
248             return $self;
249             }
250              
251             =head2 AUTHOR
252              
253             Ruslan U. Zakirov ERuslan.Zakirov@gmail.comE
254              
255             =head2 LICENSE
256              
257             Under the same terms as perl itself.
258              
259             =cut
260              
261             1;