File Coverage

blib/lib/Mojo/Pg/Che/Database.pm
Criterion Covered Total %
statement 7 9 77.7
branch n/a
condition n/a
subroutine 3 3 100.0
pod n/a
total 10 12 83.3


line stmt bran cond sub pod time code
1             package Mojo::Pg::Che::Database;
2              
3 1     1   4 use Mojo::Base 'Mojo::EventEmitter'; #'Mojo::Pg::Database';
  1         0  
  1         6  
4 1     1   165 use Carp qw(croak shortmess);
  1         2  
  1         64  
5 1     1   338 use DBD::Pg ':async';
  0            
  0            
6             use Mojo::IOLoop;
7             use Mojo::Pg::Che::Results;
8             use Mojo::Pg::Transaction;
9             use Scalar::Util 'weaken';
10             #~ use Mojo::JSON 'to_json';
11              
12             my $handler_err = sub {$_[0] = shortmess $_[0]; 0;};
13             has handler_err => sub {$handler_err};
14              
15             has [qw(dbh pg)];
16              
17             has results_class => 'Mojo::Pg::Che::Results';
18              
19             my $PKG = __PACKAGE__;
20              
21             sub disconnect {# copy/paste Mojo::Pg::Database
22             my $self = shift;
23             $self->_unwatch;
24             $self->dbh->disconnect;
25             }
26              
27             sub is_listening { !!keys %{shift->{listen} || {}} }# copy/paste Mojo::Pg::Database
28              
29             sub listen {# copy/paste Mojo::Pg::Database
30             my ($self, $name) = @_;
31              
32             my $dbh = $self->dbh;
33             $dbh->do('listen ' . $dbh->quote_identifier($name))
34             unless $self->{listen}{$name}++;
35             $self->_watch;
36              
37             return $self;
38             }
39              
40             sub unlisten {# copy/paste Mojo::Pg::Database
41             my ($self, $name) = @_;
42              
43             my $dbh = $self->dbh;
44             $dbh->do('unlisten ' . $dbh->quote_identifier($name));
45             $name eq '*' ? delete $self->{listen} : delete $self->{listen}{$name};
46             $self->_unwatch unless $self->{waiting} || $self->is_listening;
47              
48             return $self;
49             }
50              
51             sub _notifications {# copy/paste Mojo::Pg::Database
52             my $self = shift;
53             my $dbh = $self->dbh;
54             while (my $n = $dbh->pg_notifies) { $self->emit(notification => @$n) }
55             }
56              
57             sub notify {# copy/paste Mojo::Pg::Database
58             my ($self, $name, $payload) = @_;
59              
60             my $dbh = $self->dbh;
61             my $notify = 'notify ' . $dbh->quote_identifier($name);
62             $notify .= ', ' . $dbh->quote($payload) if defined $payload;
63             $dbh->do($notify);
64             $self->_notifications;
65              
66             return $self;
67             }
68              
69             sub pid { shift->dbh->{pg_pid} } # copy/paste Mojo::Pg::Database
70              
71             sub ping { shift->dbh->ping } # copy/paste Mojo::Pg::Database
72              
73             sub query { shift->select(@_) }
74              
75             sub execute_sth {
76             my ($self, $sth,) = map shift, 1..2;
77            
78             my $cb = ref $_[-1] eq 'CODE' ? pop : undef;
79            
80             #~ croak 'Previous async query has not finished'
81             #~ if $self->dbh->{pg_async_status} == 1;
82            
83             croak 'Non-blocking query already in progress'
84             if $self->{waiting};
85            
86             local $sth->{HandleError} = $self->handler_err;
87             $sth->{pg_async} = $cb ? PG_ASYNC : 0; # $self->dbh->{pg_async_status} == 1 ? PG_OLDQUERY_WAIT : PG_ASYNC #
88            
89             #~ $sth->execute(map { _json($_) ? to_json $_->{json} : $_ } @_);
90             eval {$sth->execute(@_)}#binds
91             or die "Bad statement: ", $@, $sth->{Statement};
92            
93             # Blocking
94             unless ($cb) {
95             $self->_notifications;
96             return $self->results_class->new(sth => $sth);
97             }
98            
99             # Non-blocking
100             $self->{waiting} = {cb => $cb, sth => $sth};
101             $self->_watch;
102             }
103              
104             sub execute_string {
105             my ($self, $query, $attrs,) = map shift, 1..3;
106            
107             my $dbh = $self->dbh;
108            
109             my $sth = $self->prepare($query, $attrs, 3);
110            
111             return $self->execute_sth($sth, @_);
112            
113             }
114              
115             sub prepare {
116             my ($self, $query, $attrs, $flag,) = @_;
117            
118             my $dbh = $self->dbh;
119            
120             return $dbh->prepare_cached($query, $attrs, $flag)
121             if delete $attrs->{Cached};
122            
123             return $dbh->prepare($query, $attrs,);
124            
125             }
126              
127             sub prepare_cached { shift->dbh->prepare_cached(@_); }
128              
129             sub tx {shift->begin}
130             sub begin {
131             my $self = shift;
132             return $self->{tx}
133             if $self->{tx};
134            
135             my $tx = $self->{tx} = Mojo::Pg::Transaction->new(db => $self);
136             weaken $tx->{db};
137             return $tx;
138              
139             }
140              
141             sub commit {
142             my $self = shift;
143             my $tx = delete $self->{tx}
144             or return;
145             $tx->commit;
146             }
147              
148             sub rollback {
149             my $self = shift;
150             my $tx = delete $self->{tx}
151             or return;
152             #~ warn "TX destroy";
153             $tx = undef;# DESTROY
154            
155             }
156              
157             my @DBH_METHODS = qw(
158             select
159             selectrow_array
160             selectrow_arrayref
161             selectrow_hashref
162             selectall_arrayref
163             selectall_array
164             selectall_hashref
165             selectcol_arrayref
166             do
167             );
168              
169             for my $method (@DBH_METHODS) {
170             no strict 'refs';
171             no warnings 'redefine';
172             *{"${PKG}::$method"} = sub { shift->_DBH_METHOD($method, @_) };
173            
174             }
175              
176             sub _DBH_METHOD {
177             my ($self, $method) = (shift, shift);
178             my ($sth, $query) = ref $_[0] ? (shift, undef) : (undef, shift);
179            
180             my @to_fetch = ();
181            
182             push @to_fetch, shift # $key_field
183             if $method eq 'selectall_hashref' && ! ref $_[0];
184            
185             my $attrs = shift;
186            
187            
188             $to_fetch[0] = delete $attrs->{KeyField}
189             if exists $attrs->{KeyField};
190            
191             #~ if ($method eq 'selectall_arrayref') {
192             for (qw(Slice MaxRows)) {
193             push @to_fetch, delete $attrs->{$_}
194             if exists $attrs->{$_};
195             }
196             $to_fetch[0] = delete $attrs->{Columns}
197             if exists $attrs->{Columns};
198             #~ }
199            
200             my $cb = ref $_[-1] eq 'CODE' ? pop : undef;
201            
202             my $async = delete $attrs->{Async} || delete $attrs->{pg_async};
203              
204             $sth ||= $self->prepare($query, $attrs, 3,);
205             my $result;
206             $cb ||= sub {
207             my ($db, $err) = map shift, 1..2;
208             croak "Error on non-blocking $method: ",$err
209             if $err;
210             $result = shift;
211            
212             } if $async;
213            
214             my @bind = @_;
215            
216             $result = $self->execute_sth($sth, @bind, $cb ? ($cb) : ());
217            
218             Mojo::IOLoop->start
219             if $async && ! Mojo::IOLoop->is_running;
220            
221             (my $fetch_method = $method) =~ s/select/fetch/;
222            
223             return $result->$fetch_method(@to_fetch)
224             if ref $result eq $self->results_class && $result->can($fetch_method);
225            
226             return $result;
227            
228             }
229              
230             #Patch parent meth for $self->results_class
231             sub _watch {
232             my $self = shift;
233              
234             return if $self->{watching} || $self->{watching}++;
235              
236             my $dbh = $self->dbh;
237             unless ($self->{handle}) {
238             open $self->{handle}, '<&', $dbh->{pg_socket} or die "Can't dup: $!";
239             }
240             Mojo::IOLoop->singleton->reactor->io(
241             $self->{handle} => sub {
242             #~ die 146;
243             my $reactor = shift;
244              
245             $self->_unwatch if !eval { $self->_notifications; 1 };
246             #~ warn '_Watch', $self->{waiting};
247             return unless $self->{waiting} && $dbh->pg_ready;
248             my ($sth, $cb) = @{delete $self->{waiting}}{qw(sth cb)};
249              
250             # Do not raise exceptions inside the event loop
251             my $result = do { local $dbh->{RaiseError} = 0; $dbh->pg_result };
252             my $err = defined $result ? undef : $dbh->errstr;
253              
254             eval { $self->$cb($err, $self->results_class->new(sth => $sth)); };
255             warn "Non-blocking callback result error: ", $@
256             and $reactor->{cb_error} = $@
257             if $@;
258            
259             $self->_unwatch unless $self->{waiting} || $self->is_listening;
260             }
261             )->watch($self->{handle}, 1, 0);
262             }
263              
264             sub _unwatch {# copy/paste Mojo::Pg::Database
265             my $self = shift;
266             return unless delete $self->{watching};
267             Mojo::IOLoop->singleton->reactor->remove($self->{handle});
268             $self->emit('close') if $self->is_listening;
269             }
270              
271             sub DESTROY {# copy/paste Mojo::Pg::Database + rollback
272             my $self = shift;
273            
274             $self->rollback;
275            
276             my $waiting = $self->{waiting};
277             $waiting->{cb}($self, 'Premature connection close', undef) if $waiting->{cb};
278              
279             return unless (my $pg = $self->pg) && (my $dbh = $self->dbh);
280             $pg->_enqueue($dbh);
281            
282             #~ $self->SUPER::DESTROY;
283             }
284              
285             1;