File Coverage

blib/lib/DBIx/Poggy/DBI.pm
Criterion Covered Total %
statement 16 18 88.8
branch n/a
condition n/a
subroutine 6 6 100.0
pod n/a
total 22 24 91.6


line stmt bran cond sub pod time code
1 1     1   5 use strict;
  1         1  
  1         21  
2 1     1   4 use warnings;
  1         2  
  1         28  
3              
4             package DBIx::Poggy::DBI;
5 1     1   4 use base 'DBI';
  1         2  
  1         1197  
6              
7             =head1 NAME
8              
9             DBIx::Poggy::DBI - DBI subclass
10              
11             =head2 DESCRIPTION
12              
13             Overrides several methods in L. All queries are marked as async. See list of
14             supported methods below:
15              
16             =cut
17              
18             package DBIx::Poggy::DBI::db;
19 1     1   12534 use base 'DBI::db';
  1         2  
  1         350  
20              
21 1     1   828 use AnyEvent;
  1         3819  
  1         29  
22 1     1   206 use DBD::Pg qw(:async);
  0            
  0            
23             use Promises qw(collect deferred);
24             use Scalar::Util qw(weaken blessed);
25             use Guard qw(guard);
26             use Devel::GlobalDestruction;
27              
28             sub connected {
29             my $self= shift;
30             $self->{private_poggy_state} = {active => 0, queue => []};
31             return;
32             }
33              
34             =head2 METHODS
35              
36             =head3 supported
37              
38             These are supported: L, L, L,
39             L, L and L.
40              
41             For example:
42              
43             $pool->take->selectrow_array(
44             "SELECT * FROM test LIMIT 1",
45             )->then(sub {
46             my @row = @_;
47             ...
48             });
49              
50             See L to learn about L, L and L.
51              
52             =head3 not supported
53              
54             These are not supported, but will be when I need them or somebody will write a patch:
55             L
56              
57             You don't use C, C, C or C. I have some ideas of making
58             these work, but don't think there is urgent need to pursue.
59              
60             =cut
61              
62             my %map = (
63             selectrow_array => ['fetchrow_array'],
64             selectrow_arrayref => ['fetchrow_arrayref'],
65             selectrow_hashref => ['fetchrow_hashref'],
66             selectall_arrayref => ['fetchall_arrayref', sub {
67             my $in = shift;
68             my ($query, $attrs) = splice @$in, 0, 2;
69             my @fetch_args;
70             @fetch_args = delete @{$attrs}{'Slice', 'MaxRows'} if $attrs;
71             return (\@fetch_args, $query, $attrs, $in);
72             } ],
73             selectall_hashref => ['fetchall_hashref', sub {
74             my $in = shift;
75             my ($query, $key_field, $attrs) = splice @$in, 0, 3;
76             my @fetch_args = $key_field;
77             return (\@fetch_args, $query, $attrs, $in);
78             } ],
79             do => [''],
80             );
81             while ( my ($method, $fetch_method) = each %map ) {
82             no strict 'refs';
83             *{$method} = sub {
84             my $self = shift;
85              
86             my $d = deferred;
87              
88             my @args = $fetch_method->[1]?
89             ($d, $fetch_method->[0], $fetch_method->[1]->(\@_))
90             : ($d, $fetch_method->[0], [], shift, shift, \@_)
91             ;
92              
93             my $state = $self->{private_poggy_state};
94             if ( $state->{active} ) {
95             push @{$state->{queue}}, \@args;
96             return $d->promise;
97             }
98             $self->_do_async( @args );
99             return $d->promise;
100             }
101             }
102              
103             sub _do_async {
104             my $self = shift;
105             my ($d, $fetch_method, $fetch_args, $query, $args, $binds) = @_;
106              
107             my $sth;
108              
109             my $done = sub {
110             my $method = shift;
111             my @res = @_;
112             if ( $method eq 'reject' ) {
113             my $err = $self->errobj;
114             $err->{errstr} ||= $res[0] if @res;
115             unshift @res, $err;
116             }
117             if ( $sth ) {
118             $sth->finish unless $method eq 'reject';
119             $sth = undef;
120             }
121              
122             $d->$method( @res );
123              
124             return;
125             };
126              
127             $sth = eval { $self->prepare($query, $args) }
128             or return $done->( 'reject', $@ );
129             eval { $sth->execute( @$binds ) }
130             or return $done->( 'reject', Carp::longmess($@) );
131              
132             my $guard;
133             my $watcher = sub {
134             my $ready;
135             local $@;
136             eval { $ready = $self->pg_ready; 1 } or do {
137             return $done->('reject', $@);
138             };
139             return unless $ready;
140              
141             $guard = undef;
142             my $res = eval { $self->pg_result } or return $done->( 'reject', $@ );
143             return $done->(resolve => $res) unless $fetch_method;
144             my @res;
145             eval { @res = $sth->$fetch_method( @$fetch_args ); 1 } or return $done->('reject', $@);
146             return $done->( resolve => @res );
147             };
148             $guard = AnyEvent->io( fh => $self->{pg_socket}, poll => 'r', cb => $watcher );
149             return;
150             }
151              
152             sub prepare {
153             my $self = shift;
154             my $args = ($_[1]||={});
155             $args->{pg_async} ||= 0;
156             $args->{pg_async} |= PG_ASYNC;
157              
158             my $sth = $self->SUPER::prepare( @_ );
159             return $sth unless $sth;
160              
161             my $state = $self->{private_poggy_state};
162              
163             $state->{active}++;
164              
165             my $wself = $self;
166             weaken $wself;
167             $sth->{private_poggy_guard} = guard {
168             --$state->{active};
169             return unless @{ $state->{queue} };
170              
171             unless ($wself) {
172             warn "still have pending sql queries, but dbh has gone away";
173             return;
174             }
175             $wself->_do_async( @{ shift @{$state->{queue}} } );
176             };
177             return $sth;
178             }
179              
180             =head3 Transactions
181              
182             This module wraps L, L and L methods to
183             help handle transactions.
184              
185             B that behaviour is not yet defined when commiting or rolling back
186             a transaction with active query. I just havn't decided what to do in this
187             case. Now it's your job to make sure commit/rollback happens after all
188             queries on the handle.
189              
190             =head4 begin_work
191              
192             Returns a Promise that will be resolved once transaction is committed or
193             rejected on rollback or failed attempt to start the transaction.
194              
195             =cut
196              
197             sub begin_work {
198             my $self = shift;
199             my $d = deferred;
200             $self->SUPER::begin_work(@_)
201             or return $d->reject( $self->errobj )->promise;
202             $self->{private_poggy_state}{txn} = $d;
203             my $wself = $self;
204             if ( my $pool = $self->{private_poggy_state}{release_to} ) {
205             $d->finally(sub { $pool->release( $wself ) });
206             }
207             weaken $wself;
208             return $d->promise;
209             }
210              
211             =head4 commit
212              
213             Takes resolution value of the transaction, commits and resolves the promise returned
214             by L.
215              
216             =cut
217              
218             sub commit {
219             my $self = shift;
220             my $d = delete $self->{private_poggy_state}{txn} or die "No transaction in progress";
221             my $rv = $self->SUPER::commit();
222             unless ( $rv ) {
223             $d->reject($self->errobj);
224             return $rv;
225             }
226             $d->resolve(@_);
227             return $rv;
228             }
229              
230             =head4 rollback
231              
232             Takes rollback value of the transaction, commits and rejects the promise returned
233             by L.
234              
235             =cut
236              
237             sub rollback {
238             my $self = shift;
239             my $d = delete $self->{private_poggy_state}{txn} or die "No transaction in progress";
240             my $rv = $self->SUPER::rollback();
241             unless ( $rv ) {
242             $d->reject($self->errobj);
243             return $rv;
244             }
245             $d->reject(@_);
246             return $rv;
247             }
248              
249             sub errobj {
250             my $self = shift;
251             return DBIx::Poggy::Error->new( $self );
252             }
253              
254             my $orig;
255             BEGIN { $orig = __PACKAGE__->can('DESTROY') }
256             sub DESTROY {
257             my $self = shift;
258             unless (in_global_destruction) {
259             # ressurect DBH, bu pushing it back into the pool
260             # I know it's hackish, but I couldn't find good way to implement
261             # auto release that works transparently
262             my $state = $self->{private_poggy_state} || {};
263             if ( $state->{release_to} ) {
264             $self->SUPER::rollback() if delete $state->{txn};
265             return $state->{release_to}->release($self);
266             }
267             }
268             return $orig->($self, @_) if $orig;
269             return;
270             }
271              
272             package DBIx::Poggy::DBI::st;
273             use base 'DBI::st';
274              
275             sub errobj {
276             my $self = shift;
277             return DBIx::Poggy::Error->new( $self );
278             }
279              
280             1;