File Coverage

blib/lib/DBIx/Poggy/DBI.pm
Criterion Covered Total %
statement 16 18 88.8
branch n/a
condition n/a
subroutine 6 6 100.0
pod n/a
total 22 24 91.6


line stmt bran cond sub pod time code
1 1     1   7 use strict;
  1         1  
  1         24  
2 1     1   4 use warnings;
  1         2  
  1         30  
3              
4             package DBIx::Poggy::DBI;
5 1     1   4 use base 'DBI';
  1         2  
  1         1279  
6              
7             =head1 NAME
8              
9             DBIx::Poggy::DBI - DBI subclass
10              
11             =head2 DESCRIPTION
12              
13             Overrides several methods in L. All queries are marked as async. See list of
14             supported methods below:
15              
16             =cut
17              
18             package DBIx::Poggy::DBI::db;
19 1     1   14697 use base 'DBI::db';
  1         3  
  1         374  
20              
21 1     1   929 use AnyEvent;
  1         5211  
  1         49  
22 1     1   375 use DBD::Pg qw(:async);
  0            
  0            
23             use Promises qw(collect deferred);
24             use Scalar::Util qw(weaken blessed);
25             use Guard qw(guard);
26             use Devel::GlobalDestruction;
27              
28             sub connected {
29             my $self= shift;
30             $self->{private_poggy_state} = {active => 0, queue => []};
31             return;
32             }
33              
34             =head2 METHODS
35              
36             =head3 supported
37              
38             These are supported: L, L, L,
39             L, L and L.
40              
41             For example:
42              
43             $pool->take->selectrow_array(
44             "SELECT * FROM test LIMIT 1",
45             )->then(sub {
46             my @row = @_;
47             ...
48             });
49              
50             See L to learn about L, L and L.
51              
52             =head3 not supported
53              
54             These are not supported, but will be when I need them or somebody will write a patch:
55             L
56              
57             You don't use C, C, C or C. I have some ideas of making
58             these work, but don't think there is urgent need to pursue.
59              
60             =cut
61              
62             my %map = (
63             selectrow_array => ['fetchrow_array'],
64             selectrow_arrayref => ['fetchrow_arrayref'],
65             selectrow_hashref => ['fetchrow_hashref'],
66             selectall_arrayref => ['fetchall_arrayref', sub {
67             my $in = shift;
68             my ($query, $attrs) = splice @$in, 0, 2;
69             my @fetch_args;
70             @fetch_args = delete @{$attrs}{'Slice', 'MaxRows'} if $attrs;
71             return (\@fetch_args, $query, $attrs, $in);
72             } ],
73             selectall_hashref => ['fetchall_hashref', sub {
74             my $in = shift;
75             my ($query, $key_field, $attrs) = splice @$in, 0, 3;
76             my @fetch_args = $key_field;
77             return (\@fetch_args, $query, $attrs, $in);
78             } ],
79             do => [''],
80             );
81             while ( my ($method, $fetch_method) = each %map ) {
82             no strict 'refs';
83             *{$method} = sub {
84             my $self = shift;
85              
86             my $d = deferred;
87              
88             my @args = $fetch_method->[1]?
89             ($d, $fetch_method->[0], $fetch_method->[1]->(\@_))
90             : ($d, $fetch_method->[0], [], shift, shift, \@_)
91             ;
92              
93             my $state = $self->{private_poggy_state};
94             if ( $state->{active} ) {
95             push @{$state->{queue}}, \@args;
96             return $d->promise;
97             }
98             $self->_do_async( @args );
99             return $d->promise;
100             }
101             }
102              
103             sub _do_async {
104             my $self = shift;
105             my ($d, $fetch_method, $fetch_args, $query, $args, $binds) = @_;
106              
107             my $sth;
108              
109             my $done = sub {
110             my $method = shift;
111             my @res = @_;
112             if ( $method eq 'reject' ) {
113             my $err = $self->errobj;
114             $err->{errstr} ||= $res[0] if @res;
115             unshift @res, $err;
116             }
117             if ( $sth ) {
118             $sth->finish unless $method eq 'reject';
119             $sth = undef;
120             }
121              
122             $d->$method( @res );
123              
124             return;
125             };
126              
127             $sth = eval { $self->prepare($query, $args) }
128             or return $done->( 'reject', $@ );
129             eval { $sth->execute( @$binds ) }
130             or return $done->( 'reject', Carp::longmess($@) );
131              
132             my $guard;
133             my $watcher = sub {
134             my $ready;
135             local $@;
136             eval { $ready = $self->pg_ready; 1 } or do {
137             $guard = undef;
138             return $done->('reject', $@);
139             };
140             return unless $ready;
141              
142             $guard = undef;
143             my $res = eval { $self->pg_result } or return $done->( 'reject', $@ );
144             return $done->(resolve => $res) unless $fetch_method;
145             my @res;
146             eval { @res = $sth->$fetch_method( @$fetch_args ); 1 } or return $done->('reject', $@);
147             return $done->( resolve => @res );
148             };
149             $guard = AnyEvent->io( fh => $self->{pg_socket}, poll => 'r', cb => $watcher );
150             return;
151             }
152              
153             sub prepare {
154             my $self = shift;
155             my $args = ($_[1]||={});
156             $args->{pg_async} ||= 0;
157             $args->{pg_async} |= PG_ASYNC;
158              
159             my $sth = $self->SUPER::prepare( @_ );
160             return $sth unless $sth;
161              
162             my $state = $self->{private_poggy_state};
163              
164             $state->{active}++;
165              
166             my $wself = $self;
167             weaken $wself;
168             $sth->{private_poggy_guard} = guard {
169             --$state->{active};
170             return unless @{ $state->{queue} };
171              
172             unless ($wself) {
173             warn "still have pending sql queries, but dbh has gone away";
174             return;
175             }
176             $wself->_do_async( @{ shift @{$state->{queue}} } );
177             };
178             return $sth;
179             }
180              
181             =head3 Transactions
182              
183             This module wraps L, L and L methods to
184             help handle transactions.
185              
186             B that flow is similar to sync DBI: begin, query, query, ..., commit or
187             rollback, so it's your job to make sure commit or rollback is called after
188             all queries on the handle are finished otherwise code dies.
189              
190             =head4 begin_work
191              
192             Returns a Promise that will be resolved once transaction is committed or
193             rejected if the transaction is rolled back or failed attempt to start
194             the transaction.
195              
196             Value of the promise would be whatever is passed to commit or rollback.
197              
198             =cut
199              
200             sub begin_work {
201             my $self = shift;
202             my $d = deferred;
203             $self->SUPER::begin_work(@_)
204             or return $d->reject( $self->errobj )->promise;
205             $self->{private_poggy_state}{txn} = $d;
206             return $d->promise;
207             }
208              
209             =head4 commit
210              
211             Takes resolution value of the transaction, commits and resolves the promise returned
212             by L with the value.
213              
214             Dies if you call commit without transaction or while queries are active.
215              
216             =cut
217              
218             sub commit {
219             my $self = shift;
220             my $state = $self->{private_poggy_state};
221             die "Can not commit when you have active queries"
222             if $state->{active} || @{$state->{queue}};
223             my $d = delete $state->{txn} or die "No transaction in progress";
224             my $rv = $self->SUPER::commit();
225             unless ( $rv ) {
226             $d->reject($self->errobj);
227             return $rv;
228             }
229             $d->resolve(@_);
230             return $rv;
231             }
232              
233             =head4 rollback
234              
235             Takes rollback value of the transaction, commits and rejects the promise returned
236             by L with the value.
237              
238             Dies if you call rollback without transaction or while queries are active.
239              
240             =cut
241              
242             sub rollback {
243             my $self = shift;
244             my $state = $self->{private_poggy_state};
245             die "Can not commit when you have active queries"
246             if $state->{active} || @{$state->{queue}};
247             my $d = delete $state->{txn} or die "No transaction in progress";
248             my $rv = $self->SUPER::rollback();
249             unless ( $rv ) {
250             $d->reject($self->errobj);
251             return $rv;
252             }
253             $d->reject(@_);
254             return $rv;
255             }
256              
257             sub errobj {
258             my $self = shift;
259             return DBIx::Poggy::Error->new( $self );
260             }
261              
262             my $orig;
263             BEGIN { $orig = __PACKAGE__->can('DESTROY') }
264             sub DESTROY {
265             my $self = shift;
266             unless (in_global_destruction) {
267             # ressurect DBH, bu pushing it back into the pool
268             # I know it's hackish, but I couldn't find good way to implement
269             # auto release that works transparently
270             my $state = $self->{private_poggy_state} || {};
271             if ( $state->{release_to} ) {
272             $self->SUPER::rollback() if delete $state->{txn};
273             return $state->{release_to}->release($self);
274             }
275             }
276             return $orig->($self, @_) if $orig;
277             return;
278             }
279              
280             package DBIx::Poggy::DBI::st;
281             use base 'DBI::st';
282              
283             sub errobj {
284             my $self = shift;
285             return DBIx::Poggy::Error->new( $self );
286             }
287              
288             1;