File Coverage

blib/lib/Mojo/Pg/Database.pm
Criterion Covered Total %
statement 7 9 77.7
branch n/a
condition n/a
subroutine 3 3 100.0
pod n/a
total 10 12 83.3


line stmt bran cond sub pod time code
1             package Mojo::Pg::Database;
2 6     6   41 use Mojo::Base 'Mojo::EventEmitter';
  6         15  
  6         51  
3              
4 6     6   1399 use Carp qw(croak shortmess);
  6         14  
  6         374  
5 6     6   5290 use DBD::Pg ':async';
  0            
  0            
6             use Mojo::IOLoop;
7             use Mojo::JSON 'to_json';
8             use Mojo::Pg::Results;
9             use Mojo::Pg::Transaction;
10             use Mojo::Promise;
11             use Mojo::Util 'monkey_patch';
12             use Scalar::Util 'weaken';
13              
14             has [qw(dbh pg)];
15             has results_class => 'Mojo::Pg::Results';
16              
17             for my $name (qw(delete insert select update)) {
18             monkey_patch __PACKAGE__, $name, sub {
19             my ($self, @cb) = (shift, ref $_[-1] eq 'CODE' ? pop : ());
20             return $self->query($self->pg->abstract->$name(@_), @cb);
21             };
22             monkey_patch __PACKAGE__, "${name}_p", sub {
23             my $self = shift;
24             return $self->query_p($self->pg->abstract->$name(@_));
25             };
26             }
27              
28             sub DESTROY {
29             my $self = shift;
30              
31             my $waiting = $self->{waiting};
32             $waiting->{cb}($self, 'Premature connection close', undef) if $waiting->{cb};
33              
34             return unless (my $pg = $self->pg) && (my $dbh = $self->dbh);
35             $pg->_enqueue($dbh) unless $dbh->{private_mojo_no_reuse};
36             }
37              
38             sub begin {
39             my $self = shift;
40             my $tx = Mojo::Pg::Transaction->new(db => $self);
41             weaken $tx->{db};
42             return $tx;
43             }
44              
45             sub disconnect {
46             my $self = shift;
47             $self->_unwatch;
48             $self->dbh->disconnect;
49             }
50              
51             sub dollar_only { ++$_[0]{dollar_only} and return $_[0] }
52              
53             sub is_listening { !!keys %{shift->{listen} || {}} }
54              
55             sub listen {
56             my ($self, $name) = @_;
57              
58             my $dbh = $self->dbh;
59             $dbh->do('listen ' . $dbh->quote_identifier($name))
60             unless $self->{listen}{$name}++;
61             $self->_watch;
62              
63             return $self;
64             }
65              
66             sub notify {
67             my ($self, $name, $payload) = @_;
68              
69             my $dbh = $self->dbh;
70             my $notify = 'notify ' . $dbh->quote_identifier($name);
71             $notify .= ', ' . $dbh->quote($payload) if defined $payload;
72             $dbh->do($notify);
73             $self->_notifications;
74              
75             return $self;
76             }
77              
78             sub pid { shift->dbh->{pg_pid} }
79              
80             sub ping { shift->dbh->ping }
81              
82             sub query {
83             my ($self, $query) = (shift, shift);
84             my $cb = ref $_[-1] eq 'CODE' ? pop : undef;
85              
86             croak 'Non-blocking query already in progress' if $self->{waiting};
87              
88             my %attrs;
89             $attrs{pg_placeholder_dollaronly} = 1 if delete $self->{dollar_only};
90             $attrs{pg_async} = PG_ASYNC if $cb;
91             my $sth = $self->dbh->prepare_cached($query, \%attrs, 3);
92             local $sth->{HandleError} = sub { $_[0] = shortmess $_[0]; 0 };
93              
94             for (my $i = 0; $#_ >= $i; $i++) {
95             my ($param, $attrs) = ($_[$i], {});
96             if (ref $param eq 'HASH') {
97             if (exists $param->{json}) { $param = to_json $param->{json} }
98             elsif (exists $param->{type} && exists $param->{value}) {
99             ($attrs->{pg_type}, $param) = @{$param}{qw(type value)};
100             }
101             }
102             $sth->bind_param($i + 1, $param, $attrs);
103             }
104             $sth->execute;
105              
106             # Blocking
107             unless ($cb) {
108             $self->_notifications;
109             return $self->results_class->new(sth => $sth);
110             }
111              
112             # Non-blocking
113             $self->{waiting} = {cb => $cb, sth => $sth};
114             $self->_watch;
115             }
116              
117             sub query_p {
118             my $self = shift;
119             my $promise = Mojo::Promise->new;
120             $self->query(
121             @_ => sub { $_[1] ? $promise->reject($_[1]) : $promise->resolve($_[2]) });
122             return $promise;
123             }
124              
125             sub tables {
126             my @tables = shift->dbh->tables('', '', '', '');
127             return [grep { $_ !~ /^(?:pg_catalog|information_schema)\./ } @tables];
128             }
129              
130             sub unlisten {
131             my ($self, $name) = @_;
132              
133             my $dbh = $self->dbh;
134             $dbh->do('unlisten ' . $dbh->quote_identifier($name));
135             $name eq '*' ? delete $self->{listen} : delete $self->{listen}{$name};
136             $self->_unwatch unless $self->{waiting} || $self->is_listening;
137              
138             return $self;
139             }
140              
141             sub _notifications {
142             my $self = shift;
143             my $dbh = $self->dbh;
144             while (my $n = $dbh->pg_notifies) { $self->emit(notification => @$n) }
145             }
146              
147             sub _unwatch {
148             my $self = shift;
149             return unless delete $self->{watching};
150             Mojo::IOLoop->singleton->reactor->remove($self->{handle});
151             $self->emit('close') if $self->is_listening;
152             }
153              
154             sub _watch {
155             my $self = shift;
156              
157             return if $self->{watching} || $self->{watching}++;
158              
159             my $dbh = $self->dbh;
160             unless ($self->{handle}) {
161             open $self->{handle}, '<&', $dbh->{pg_socket} or die "Can't dup: $!";
162             }
163             Mojo::IOLoop->singleton->reactor->io(
164             $self->{handle} => sub {
165             my $reactor = shift;
166              
167             $self->_unwatch if !eval { $self->_notifications; 1 };
168             return unless $self->{waiting} && $dbh->pg_ready;
169             my ($sth, $cb) = @{delete $self->{waiting}}{qw(sth cb)};
170              
171             # Do not raise exceptions inside the event loop
172             my $result = do { local $dbh->{RaiseError} = 0; $dbh->pg_result };
173             my $err = defined $result ? undef : $dbh->errstr;
174              
175             $self->$cb($err, $self->results_class->new(sth => $sth));
176             $self->_unwatch unless $self->{waiting} || $self->is_listening;
177             }
178             )->watch($self->{handle}, 1, 0);
179             }
180              
181             1;
182              
183             =encoding utf8
184              
185             =head1 NAME
186              
187             Mojo::Pg::Database - Database
188              
189             =head1 SYNOPSIS
190              
191             use Mojo::Pg::Database;
192              
193             my $db = Mojo::Pg::Database->new(pg => $pg, dbh => $dbh);
194             $db->query('select * from foo')
195             ->hashes->map(sub { $_->{bar} })->join("\n")->say;
196              
197             =head1 DESCRIPTION
198              
199             L is a container for L database handles used by
200             L.
201              
202             =head1 EVENTS
203              
204             L inherits all events from L and can
205             emit the following new ones.
206              
207             =head2 close
208              
209             $db->on(close => sub {
210             my $db = shift;
211             ...
212             });
213              
214             Emitted when the database connection gets closed while waiting for
215             notifications.
216              
217             =head2 notification
218              
219             $db->on(notification => sub {
220             my ($db, $name, $pid, $payload) = @_;
221             ...
222             });
223              
224             Emitted when a notification has been received.
225              
226             =head1 ATTRIBUTES
227              
228             L implements the following attributes.
229              
230             =head2 dbh
231              
232             my $dbh = $db->dbh;
233             $db = $db->dbh($dbh);
234              
235             L database handle used for all queries.
236              
237             # Use DBI utility methods
238             my $quoted = $db->dbh->quote_identifier('foo.bar');
239              
240             =head2 pg
241              
242             my $pg = $db->pg;
243             $db = $db->pg(Mojo::Pg->new);
244              
245             L object this database belongs to.
246              
247             =head2 results_class
248              
249             my $class = $db->results_class;
250             $db = $db->results_class('MyApp::Results');
251              
252             Class to be used by L, defaults to L. Note that
253             this class needs to have already been loaded before L is called.
254              
255             =head1 METHODS
256              
257             L inherits all methods from L and
258             implements the following new ones.
259              
260             =head2 begin
261              
262             my $tx = $db->begin;
263              
264             Begin transaction and return L object, which will
265             automatically roll back the transaction unless
266             L has been called before it is destroyed.
267              
268             # Insert rows in a transaction
269             eval {
270             my $tx = $db->begin;
271             $db->insert('frameworks', {name => 'Catalyst'});
272             $db->insert('frameworks', {name => 'Mojolicious'});
273             $tx->commit;
274             };
275             say $@ if $@;
276              
277             =head2 delete
278              
279             my $results = $db->delete($table, \%where, \%options);
280              
281             Generate a C statement with L (usually an
282             L object) and execute it with L. You can also append a
283             callback to perform operations non-blocking.
284              
285             $db->delete(some_table => sub {
286             my ($db, $err, $results) = @_;
287             ...
288             });
289             Mojo::IOLoop->start unless Mojo::IOLoop->is_running;
290              
291             Use all the same argument variations you would pass to the C method of
292             L.
293              
294             # "delete from some_table"
295             $db->delete('some_table');
296              
297             # "delete from some_table where foo = 'bar'"
298             $db->delete('some_table', {foo => 'bar'});
299              
300             # "delete from some_table where foo like '%test%'"
301             $db->delete('some_table', {foo => {-like => '%test%'}});
302              
303             # "delete from some_table where foo = 'bar' returning id"
304             $db->delete('some_table', {foo => 'bar'}, {returning => 'id'});
305              
306             =head2 delete_p
307              
308             my $promise = $db->delete_p($table, \%where, \%options);
309              
310             Same as L, but performs all operations non-blocking and returns a
311             L object to be used as a promise instead of accepting a callback.
312              
313             $db->delete_p('some_table')->then(sub {
314             my $results = shift;
315             ...
316             })->catch(sub {
317             my $err = shift;
318             ...
319             })->wait;
320              
321             =head2 disconnect
322              
323             $db->disconnect;
324              
325             Disconnect L and prevent it from getting reused.
326              
327             =head2 dollar_only
328              
329             $db = $db->dollar_only;
330              
331             Activate C for next L call and allow C
332             to be used as an operator.
333              
334             # Check for a key in a JSON document
335             $db->dollar_only->query('select * from foo where bar ? $1', 'baz')
336             ->expand->hashes->map(sub { $_->{bar}{baz} })->join("\n")->say;
337              
338             =head2 insert
339              
340             my $results = $db->insert($table, \@values || \%fieldvals, \%options);
341              
342             Generate an C statement with L (usually an
343             L object) and execute it with L. You can also append a
344             callback to perform operations non-blocking.
345              
346             $db->insert(some_table => {foo => 'bar'} => sub {
347             my ($db, $err, $results) = @_;
348             ...
349             });
350             Mojo::IOLoop->start unless Mojo::IOLoop->is_running;
351              
352             Use all the same argument variations you would pass to the C method of
353             L.
354              
355             # "insert into some_table (foo, baz) values ('bar', 'yada')"
356             $db->insert('some_table', {foo => 'bar', baz => 'yada'});
357              
358             # "insert into some_table (foo) values ({1,2,3})"
359             $db->insert('some_table', {foo => [1, 2, 3]});
360              
361             # "insert into some_table (foo) values ('bar') returning id"
362             $db->insert('some_table', {foo => 'bar'}, {returning => 'id'});
363              
364             # "insert into some_table (foo) values ('bar') returning id, foo"
365             $db->insert('some_table', {foo => 'bar'}, {returning => ['id', 'foo']});
366              
367             =head2 insert_p
368              
369             my $promise = $db->insert_p($table, \@values || \%fieldvals, \%options);
370              
371             Same as L, but performs all operations non-blocking and returns a
372             L object to be used as a promise instead of accepting a callback.
373              
374             $db->insert_p(some_table => {foo => 'bar'})->then(sub {
375             my $results = shift;
376             ...
377             })->catch(sub {
378             my $err = shift;
379             ...
380             })->wait;
381              
382             =head2 is_listening
383              
384             my $bool = $db->is_listening;
385              
386             Check if L is listening for notifications.
387              
388             =head2 listen
389              
390             $db = $db->listen('foo');
391              
392             Subscribe to a channel and receive L events when the
393             L event loop is running.
394              
395             =head2 notify
396              
397             $db = $db->notify('foo');
398             $db = $db->notify(foo => 'bar');
399              
400             Notify a channel.
401              
402             =head2 pid
403              
404             my $pid = $db->pid;
405              
406             Return the process id of the backend server process.
407              
408             =head2 ping
409              
410             my $bool = $db->ping;
411              
412             Check database connection.
413              
414             =head2 query
415              
416             my $results = $db->query('select * from foo');
417             my $results = $db->query('insert into foo values (?, ?, ?)', @values);
418             my $results = $db->query('select ?::json as foo', {json => {bar => 'baz'}});
419              
420             Execute a blocking L
421             statement and return a results object based on L (which is
422             usually L) with the query results. The L statement
423             handle will be automatically reused when it is not active anymore, to increase
424             the performance of future queries. You can also append a callback to perform
425             operations non-blocking.
426              
427             $db->query('insert into foo values (?, ?, ?)' => @values => sub {
428             my ($db, $err, $results) = @_;
429             ...
430             });
431             Mojo::IOLoop->start unless Mojo::IOLoop->is_running;
432              
433             Hash reference arguments containing a value named C, will be encoded to
434             JSON text with L. To accomplish the reverse, you can use
435             the method L, which automatically decodes all fields
436             of the types C and C with L to Perl values.
437              
438             # "I ♥ Mojolicious!"
439             $db->query('select ?::jsonb as foo', {json => {bar => 'I ♥ Mojolicious!'}})
440             ->expand->hash->{foo}{bar};
441              
442             Hash reference arguments containing values named C and C, can be
443             used to bind specific L data types to placeholders.
444              
445             # Insert binary data
446             use DBD::Pg ':pg_types';
447             $db->query('insert into bar values (?)', {type => PG_BYTEA, value => $bytes});
448              
449             =head2 query_p
450              
451             my $promise = $db->query_p('select * from foo');
452              
453             Same as L, but performs all operations non-blocking and returns a
454             L object to be used as a promise instead of accepting a callback.
455              
456             $db->query_p('insert into foo values (?, ?, ?)' => @values)->then(sub {
457             my $results = shift;
458             ...
459             })->catch(sub {
460             my $err = shift;
461             ...
462             })->wait;
463              
464             =head2 select
465              
466             my $results = $db->select($source, $fields, $where, $order);
467              
468             Generate a C
469             L object) and execute it with L. You can also append a
470             callback to perform operations non-blocking.
471              
472             $db->select(some_table => ['foo'] => {bar => 'yada'} => sub {
473             my ($db, $err, $results) = @_;
474             ...
475             });
476             Mojo::IOLoop->start unless Mojo::IOLoop->is_running;
477              
478             Use all the same argument variations you would pass to the C
479             L.
480              
481             # "select * from some_table"
482             $db->select('some_table');
483              
484             # "select id, foo from some_table"
485             $db->select('some_table', ['id', 'foo']);
486              
487             # "select * from some_table where foo = 'bar'"
488             $db->select('some_table', undef, {foo => 'bar'});
489              
490             # "select * from some_table where foo = 'bar' order by id desc"
491             $db->select('some_table', undef, {foo => 'bar'}, {-desc => 'id'});
492              
493             # "select * from some_table where foo like '%test%'"
494             $db->select('some_table', undef, {foo => {-like => '%test%'}});
495              
496             =head2 select_p
497              
498             my $promise = $db->select_p($source, $fields, $where, $order);
499              
500             Same as L, but performs all operations non-blocking and returns a
501             L object to be used as a promise instead of accepting a callback.
502              
503             $db->select_p(some_table => ['foo'] => {bar => 'yada'})->then(sub {
504             my $results = shift;
505             ...
506             })->catch(sub {
507             my $err = shift;
508             ...
509             })->wait;
510              
511             =head2 tables
512              
513             my $tables = $db->tables;
514              
515             Return table and view names for this database, that are visible to the current
516             user and not internal, as an array reference.
517              
518             # Names of all tables
519             say for @{$db->tables};
520              
521             =head2 unlisten
522              
523             $db = $db->unlisten('foo');
524             $db = $db->unlisten('*');
525              
526             Unsubscribe from a channel, C<*> can be used to unsubscribe from all channels.
527              
528             =head2 update
529              
530             my $results = $db->update($table, \%fieldvals, \%where, \%options);
531              
532             Generate an C statement with L (usually an
533             L object) and execute it with L. You can also append a
534             callback to perform operations non-blocking.
535              
536             $db->update(some_table => {foo => 'baz'} => {foo => 'bar'} => sub {
537             my ($db, $err, $results) = @_;
538             ...
539             });
540             Mojo::IOLoop->start unless Mojo::IOLoop->is_running;
541              
542             Use all the same argument variations you would pass to the C method of
543             L.
544              
545             # "update some_table set foo = 'bar' where id = 23"
546             $db->update('some_table', {foo => 'bar'}, {id => 23});
547              
548             # "update some_table set foo = {1,2,3} where id = 23"
549             $db->update('some_table', {foo => [1, 2, 3]}, {id => 23});
550              
551             # "update some_table set foo = 'bar' where foo like '%test%'"
552             $db->update('some_table', {foo => 'bar'}, {foo => {-like => '%test%'}});
553              
554             # "update some_table set foo = 'bar' where id = 23 returning id"
555             $db->update('some_table', {foo => 'bar'}, {id => 23}, {returning => 'id'});
556              
557             =head2 update_p
558              
559             my $promise = $db->update_p($table, \%fieldvals, \%where, \%options);
560              
561             Same as L, but performs all operations non-blocking and returns a
562             L object to be used as a promise instead of accepting a
563             callback.
564              
565             $db->update_p(some_table => {foo => 'baz'} => {foo => 'bar'})->then(sub {
566             my $results = shift;
567             ...
568             })->catch(sub {
569             my $err = shift;
570             ...
571             })->wait;
572              
573             =head1 SEE ALSO
574              
575             L, L, L.
576              
577             =cut