File Coverage

blib/lib/DBIx/Poggy.pm
Criterion Covered Total %
statement 12 14 85.7
branch n/a
condition n/a
subroutine 5 5 100.0
pod n/a
total 17 19 89.4


line stmt bran cond sub pod time code
1 1     1   408 use strict;
  1         2  
  1         25  
2 1     1   5 use warnings;
  1         2  
  1         24  
3 1     1   31 use v5.14;
  1         9  
4              
5             package DBIx::Poggy;
6             our $VERSION = '0.07';
7              
8 1     1   5 use Scalar::Util qw(weaken refaddr);
  1         2  
  1         83  
9              
10             =head1 NAME
11              
12             DBIx::Poggy - async Pg with AnyEvent and Promises
13              
14             =head1 SYNOPSIS
15              
16             use strict;
17             use warnings;
18              
19             use DBIx::Poggy;
20             my $pool = DBIx::Poggy->new( pool_size => 5 );
21             $pool->connect('dbi:Pg:db=test', 'root', 'password');
22              
23             use AnyEvent;
24             my $cv = AnyEvent->condvar;
25              
26             my $res;
27             $pool->take->selectrow_arrayref(
28             'SELECT * FROM users WHERE name = ?', {}, 'ruz'
29             )
30             ->then(sub {
31             my $user = $res->{user} = shift;
32              
33             return $pool->take->selectall_arrayref(
34             'SELECT * FROM friends WHERE user_id = ?', undef, $user->{id}
35             );
36             })
37             ->then(sub {
38             my $friends = $res->{friends} = shift;
39             ...
40             })
41             ->catch(sub {
42             my $error = shift;
43             die $error;
44             })
45             ->finally(sub {
46             $cv->send( $res );
47             });
48              
49             $cv->recv;
50              
51             =head1 DESCRIPTION
52              
53             "Async" postgres as much as L allows with L instead of callbacks.
54              
55             You get DBI interface you used to that returns promises, connections pool, queries
56             queuing and support of transactions.
57              
58             =head2 Why pool?
59              
60             DBD::Pg is not async, it's non blocking. Every connection can execute only one query
61             at a moment, so to execute several queries in parallel you need several connections.
62             What you get is you can do something in Perl side while postgres crunches data for
63             you.
64              
65             =head2 Queue
66              
67             Usually if you attempt to run two queries on the same connection then DBI throws an
68             error about active query. Poggy takes care of that by queuing up queries you run on
69             one connection. Handy for transactions and pool doesn't grow too much.
70              
71             =head2 What is async here then?
72              
73             Only a queries on multiple connections, so if you need to execute many parallel
74             queries then you need many connections. pg_bouncer and haproxy are your friends.
75              
76             =cut
77              
78 1     1   281 use DBIx::Poggy::DBI;
  0            
  0            
79             use DBIx::Poggy::Error;
80              
81             =head1 METHODS
82              
83             =head2 new
84              
85             Named arguments:
86              
87             =over 4
88              
89             =item pool_size
90              
91             number of connections to create, creates one more in case all are busy
92              
93             =back
94              
95             Returns a new pool object.
96              
97             =cut
98              
99             sub new {
100             my $proto = shift;
101             my $self = bless { @_ }, ref($proto) || $proto;
102             return $self->init;
103             }
104              
105             sub init {
106             my $self = shift;
107             $self->{pool_size} ||= 10;
108             $self->{ping_on_take} ||= 30;
109             return $self;
110             }
111              
112             =head2 connect
113              
114             Takes the same arguments as L, opens "pool_size" connections.
115              
116             =cut
117              
118             sub connect {
119             my $self = shift;
120             my ($dsn, $user, $password, $opts) = @_;
121              
122             $opts ||= {};
123             $opts->{RaiseError} //= 1;
124              
125             $self->{free} ||= [];
126              
127             $self->{connection_settings} = [ $dsn, $user, $password, $opts ];
128              
129             $self->_connect for 1 .. $self->{pool_size};
130             return $self;
131             }
132              
133             sub _connect {
134             my $self = shift;
135              
136             my $dbh = DBIx::Poggy::DBI->connect(
137             @{ $self->{connection_settings} }
138             ) or die DBIx::Poggy::Error->new( 'DBIx::Poggy::DBI' );
139             push @{$self->{free}}, $dbh;
140             $self->{last_used}{ refaddr $dbh } = time;
141              
142             return;
143             }
144              
145             =head2 take
146              
147             Gives one connection from the pool. Takes arguments:
148              
149             =over 4
150              
151             =item auto
152              
153             Connection will be released to the pool after transaction or
154             as soon as query queue becomes empty. True by default.
155              
156             =back
157              
158             Returns L handle. When "auto" is turned off
159             then in list context returns also guard object that will L
160             handle to the pool on destruction.
161              
162             =cut
163              
164             sub take {
165             my $self = shift;
166             my (%args) = (auto => 1, @_);
167             unless ( $self->{free} ) {
168             die DBIx::Poggy::Error->new(
169             err => 666,
170             errstr => 'Attempt to take a connection from not initialized pool',
171             );
172             }
173             my $dbh;
174             while (1) {
175             unless ( @{ $self->{free} } ) {
176             warn "DB pool exhausted, creating a new connection";
177             $self->_connect;
178             $dbh = shift @{ $self->{free} };
179             delete $self->{last_used}{ refaddr $dbh };
180             last;
181             }
182              
183             $dbh = shift @{ $self->{free} };
184             my $used = delete $self->{last_used}{ refaddr $dbh };
185             if ( (time - $used) > $self->{ping_on_take} ) {
186             unless ( $dbh->ping ) {
187             warn "connection is not alive, dropping";
188             next;
189             }
190             }
191             last;
192             }
193              
194             if ( $args{auto} ) {
195             $dbh->{private_poggy_state}{release_to} = $self;
196             weaken $dbh->{private_poggy_state}{release_to};
197             return $dbh;
198             }
199             return $dbh unless wantarray;
200             return ( $dbh, guard { $self->release($dbh) } );
201             }
202              
203             =head2 release
204              
205             Takes a handle as argument and puts it back into the pool. At the moment,
206             no protection against double putting or active queries on the handle.
207              
208             =cut
209              
210             sub release {
211             my $self = shift;
212             my $dbh = shift;
213             delete $dbh->{private_poggy_state}{release_to};
214              
215             if ( $dbh->err && !$dbh->ping ) {
216             warn "handle is in error state and not ping'able, not releasing to the pool";
217             return $self;
218             }
219              
220             push @{ $self->{free} }, $dbh;
221             $self->{last_used}{ refaddr $dbh } = time;
222             return $self;
223             }
224              
225             =head2 AUTHOR
226              
227             Ruslan U. Zakirov ERuslan.Zakirov@gmail.comE
228              
229             =head2 LICENSE
230              
231             Under the same terms as perl itself.
232              
233             =cut
234              
235             1;