File Coverage

blib/lib/DBIx/Poggy/DBI.pm
Criterion Covered Total %
statement 16 18 88.8
branch n/a
condition n/a
subroutine 6 6 100.0
pod n/a
total 22 24 91.6


line stmt bran cond sub pod time code
1 1     1   3 use strict;
  1         1  
  1         21  
2 1     1   2 use warnings;
  1         1  
  1         26  
3              
4             package DBIx::Poggy::DBI;
5 1     1   3 use base 'DBI';
  1         1  
  1         1357  
6              
7             =head1 NAME
8              
9             DBIx::Poggy::DBI - DBI subclass
10              
11             =head2 DESCRIPTION
12              
13             Overrides several methods in L. All queries are marked as async. See list of
14             supported methods below:
15              
16             =cut
17              
18             package DBIx::Poggy::DBI::db;
19 1     1   11759 use base 'DBI::db';
  1         1  
  1         337  
20              
21 1     1   946 use AnyEvent;
  1         3703  
  1         26  
22 1     1   212 use DBD::Pg qw(:async);
  0            
  0            
23             use Promises qw(collect deferred);
24             use Scalar::Util qw(weaken blessed);
25             use Guard qw(guard);
26             use Devel::GlobalDestruction;
27              
28             sub connected {
29             my $self= shift;
30             $self->{private_poggy_state} = {active => 0, queue => []};
31             return;
32             }
33              
34             =head2 METHODS
35              
36             =head3 supported
37              
38             These are supported: L, L, L,
39             L, L and L.
40              
41             For example:
42              
43             $pool->take->selectrow_array(
44             "SELECT * FROM test LIMIT 1",
45             )->then(sub {
46             my @row = @_;
47             ...
48             });
49              
50             See L to learn about L, L and L.
51              
52             =head3 not supported
53              
54             These are not supported, but will be when I need them or somebody will write a patch:
55             L
56              
57             You don't use C, C, C or C. I have some ideas of making
58             these work, but don't think there is urgent need to pursue.
59              
60             =cut
61              
62             my %map = (
63             selectrow_array => ['fetchrow_array'],
64             selectrow_arrayref => ['fetchrow_arrayref'],
65             selectrow_hashref => ['fetchrow_hashref'],
66             selectall_arrayref => ['fetchall_arrayref', sub {
67             my $in = shift;
68             my ($query, $attrs) = splice @$in, 0, 2;
69             my @fetch_args;
70             @fetch_args = delete @{$attrs}{'Slice', 'MaxRows'} if $attrs;
71             return (\@fetch_args, $query, $attrs, $in);
72             } ],
73             selectall_hashref => ['fetchall_hashref', sub {
74             my $in = shift;
75             my ($query, $key_field, $attrs) = splice @$in, 0, 3;
76             my @fetch_args = $key_field;
77             return (\@fetch_args, $query, $attrs, $in);
78             } ],
79             do => [''],
80             );
81             while ( my ($method, $fetch_method) = each %map ) {
82             no strict 'refs';
83             *{$method} = sub {
84             my $self = shift;
85              
86             my $d = deferred;
87              
88             my @args = $fetch_method->[1]?
89             ($d, $fetch_method->[0], $fetch_method->[1]->(\@_))
90             : ($d, $fetch_method->[0], [], shift, shift, \@_)
91             ;
92              
93             my $state = $self->{private_poggy_state};
94             if ( $state->{active} ) {
95             push @{$state->{queue}}, \@args;
96             return $d->promise;
97             }
98             $self->_do_async( @args );
99             return $d->promise;
100             }
101             }
102              
103             sub _do_async {
104             my $self = shift;
105             my ($d, $fetch_method, $fetch_args, $query, $args, $binds) = @_;
106              
107             my $sth;
108              
109             my $done = sub {
110             my $method = shift;
111             my @res = @_;
112             if ( $method eq 'reject' ) {
113             unshift @res, $self->errobj;
114             }
115             if ( $sth ) {
116             $sth->finish;
117             $sth = undef;
118             }
119              
120             $d->$method( @res );
121              
122             return;
123             };
124              
125             $sth = $self->prepare($query, $args)
126             or return $done->( 'reject' );
127             $sth->execute( @$binds )
128             or return $done->( 'reject' );
129              
130             my $guard;
131             my $watcher = sub {
132             return unless $self->pg_ready;
133              
134             $guard = undef;
135             my $res = $self->pg_result or return $done->( 'reject' );
136             return $done->(resolve => $res) unless $fetch_method;
137             return $done->(resolve => $sth->$fetch_method( @$fetch_args ) );
138             };
139             $guard = AnyEvent->io( fh => $self->{pg_socket}, poll => 'r', cb => $watcher );
140             return;
141             }
142              
143             sub prepare {
144             my $self = shift;
145             my $args = ($_[1]||={});
146             $args->{pg_async} ||= 0;
147             $args->{pg_async} |= PG_ASYNC;
148              
149             my $sth = $self->SUPER::prepare( @_ );
150             return $sth unless $sth;
151              
152             my $state = $self->{private_poggy_state};
153              
154             $state->{active}++;
155              
156             my $wself = $self;
157             weaken $wself;
158             $sth->{private_poggy_guard} = guard {
159             --$state->{active};
160             return unless @{ $state->{queue} };
161              
162             unless ($wself) {
163             warn "still have pending sql queries, but dbh has gone away";
164             return;
165             }
166             $wself->_do_async( @{ shift @{$state->{queue}} } );
167             };
168             return $sth;
169             }
170              
171             =head3 Transactions
172              
173             This module wraps L, L and L methods to
174             help handle transactions.
175              
176             B that behaviour is not yet defined when commiting or rolling back
177             a transaction with active query. I just havn't decided what to do in this
178             case. Now it's your job to make sure commit/rollback happens after all
179             queries on the handle.
180              
181             =head4 begin_work
182              
183             Returns a Promise that will be resolved once transaction is committed or
184             rejected on rollback or failed attempt to start the transaction.
185              
186             =cut
187              
188             sub begin_work {
189             my $self = shift;
190             my $d = deferred;
191             $self->SUPER::begin_work(@_)
192             or return $d->reject( $self->errobj )->promise;
193             $self->{private_poggy_state}{txn} = $d;
194             if ( my $pool = $self->{private_poggy_state}{release_to} ) {
195             $d->finally(sub { $pool->release( $self ) });
196             }
197             return $d->promise;
198             }
199              
200             =head4 commit
201              
202             Takes resolution value of the transaction, commits and resolves the promise returned
203             by L.
204              
205             =cut
206              
207             sub commit {
208             my $self = shift;
209             my $d = delete $self->{private_poggy_state}{txn} or die "No transaction in progress";
210             my $rv = $self->SUPER::commit();
211             unless ( $rv ) {
212             $d->reject($self->errobj);
213             return $rv;
214             }
215             $d->resolve(@_);
216             return $rv;
217             }
218              
219             =head4 rollback
220              
221             Takes rollback value of the transaction, commits and rejects the promise returned
222             by L.
223              
224             =cut
225              
226             sub rollback {
227             my $self = shift;
228             my $d = delete $self->{private_poggy_state}{txn} or die "No transaction in progress";
229             my $rv = $self->SUPER::rollback();
230             unless ( $rv ) {
231             $d->reject($self->errobj);
232             return $rv;
233             }
234             $d->reject(@_);
235             return $rv;
236             }
237              
238             sub errobj {
239             my $self = shift;
240             return DBIx::Poggy::Error->new( $self );
241             }
242              
243             my $orig;
244             BEGIN { $orig = __PACKAGE__->can('DESTROY') }
245             sub DESTROY {
246             my $self = shift;
247             unless (in_global_destruction) {
248             # ressurect DBH, bu pushing it back into the pool
249             # I know it's hackish, but I couldn't find good way to implement
250             # auto release that works transparently
251             my $state = $self->{private_poggy_state} || {};
252             return $state->{release_to}->release($self)
253             if $state->{release_to} && !$state->{txn};
254             }
255             return $orig->($self, @_) if $orig;
256             return;
257             }
258              
259             package DBIx::Poggy::DBI::st;
260             use base 'DBI::st';
261              
262             1;