File Coverage

blib/lib/Mojo/Pg/Database.pm
Criterion Covered Total %
statement 7 9 77.7
branch n/a
condition n/a
subroutine 3 3 100.0
pod n/a
total 10 12 83.3


line stmt bran cond sub pod time code
1             package Mojo::Pg::Database;
2 6     6   42 use Mojo::Base 'Mojo::EventEmitter';
  6         15  
  6         45  
3              
4 6     6   1137 use Carp qw(croak shortmess);
  6         13  
  6         311  
5 6     6   4923 use DBD::Pg ':async';
  0            
  0            
6             use Mojo::IOLoop;
7             use Mojo::JSON 'to_json';
8             use Mojo::Pg::Results;
9             use Mojo::Pg::Transaction;
10             use Mojo::Util 'monkey_patch';
11             use Scalar::Util 'weaken';
12              
13             has [qw(dbh pg)];
14             has results_class => 'Mojo::Pg::Results';
15              
16             for my $name (qw(delete insert select update)) {
17             monkey_patch __PACKAGE__, $name, sub {
18             my ($self, @cb) = (shift, ref $_[-1] eq 'CODE' ? pop : ());
19             return $self->query($self->pg->abstract->$name(@_), @cb);
20             };
21             monkey_patch __PACKAGE__, "${name}_p", sub {
22             my $self = shift;
23             return $self->query_p($self->pg->abstract->$name(@_));
24             };
25             }
26              
27             sub DESTROY {
28             my $self = shift;
29              
30             my $waiting = $self->{waiting};
31             $waiting->{cb}($self, 'Premature connection close', undef) if $waiting->{cb};
32              
33             return unless (my $pg = $self->pg) && (my $dbh = $self->dbh);
34             $pg->_enqueue($dbh) unless $dbh->{private_mojo_no_reuse};
35             }
36              
37             sub begin {
38             my $self = shift;
39             my $tx = Mojo::Pg::Transaction->new(db => $self);
40             weaken $tx->{db};
41             return $tx;
42             }
43              
44             sub disconnect {
45             my $self = shift;
46             $self->_unwatch;
47             $self->dbh->disconnect;
48             }
49              
50             sub dollar_only { ++$_[0]{dollar_only} and return $_[0] }
51              
52             sub is_listening { !!keys %{shift->{listen} || {}} }
53              
54             sub listen {
55             my ($self, $name) = @_;
56              
57             my $dbh = $self->dbh;
58             $dbh->do('listen ' . $dbh->quote_identifier($name))
59             unless $self->{listen}{$name}++;
60             $self->_watch;
61              
62             return $self;
63             }
64              
65             sub notify {
66             my ($self, $name, $payload) = @_;
67              
68             my $dbh = $self->dbh;
69             my $notify = 'notify ' . $dbh->quote_identifier($name);
70             $notify .= ', ' . $dbh->quote($payload) if defined $payload;
71             $dbh->do($notify);
72             $self->_notifications;
73              
74             return $self;
75             }
76              
77             sub pid { shift->dbh->{pg_pid} }
78              
79             sub ping { shift->dbh->ping }
80              
81             sub query {
82             my ($self, $query) = (shift, shift);
83             my $cb = ref $_[-1] eq 'CODE' ? pop : undef;
84              
85             croak 'Non-blocking query already in progress' if $self->{waiting};
86              
87             my %attrs;
88             $attrs{pg_placeholder_dollaronly} = 1 if delete $self->{dollar_only};
89             $attrs{pg_async} = PG_ASYNC if $cb;
90             my $sth = $self->dbh->prepare_cached($query, \%attrs, 3);
91             local $sth->{HandleError} = sub { $_[0] = shortmess $_[0]; 0 };
92              
93             for (my $i = 0; $#_ >= $i; $i++) {
94             my ($param, $attrs) = ($_[$i], {});
95             if (ref $param eq 'HASH') {
96             if (exists $param->{json}) { $param = to_json $param->{json} }
97             elsif (exists $param->{type} && exists $param->{value}) {
98             ($attrs->{pg_type}, $param) = @{$param}{qw(type value)};
99             }
100             }
101             $sth->bind_param($i + 1, $param, $attrs);
102             }
103             $sth->execute;
104              
105             # Blocking
106             unless ($cb) {
107             $self->_notifications;
108             return $self->results_class->new(sth => $sth);
109             }
110              
111             # Non-blocking
112             $self->{waiting} = {cb => $cb, sth => $sth};
113             $self->_watch;
114             }
115              
116             sub query_p {
117             my $self = shift;
118             my $promise = Mojo::IOLoop->delay;
119             $self->query(
120             @_ => sub { $_[1] ? $promise->reject($_[1]) : $promise->resolve($_[2]) });
121             return $promise;
122             }
123              
124             sub tables {
125             my @tables = shift->dbh->tables('', '', '', '');
126             return [grep { $_ !~ /^(?:pg_catalog|information_schema)\./ } @tables];
127             }
128              
129             sub unlisten {
130             my ($self, $name) = @_;
131              
132             my $dbh = $self->dbh;
133             $dbh->do('unlisten ' . $dbh->quote_identifier($name));
134             $name eq '*' ? delete $self->{listen} : delete $self->{listen}{$name};
135             $self->_unwatch unless $self->{waiting} || $self->is_listening;
136              
137             return $self;
138             }
139              
140             sub _notifications {
141             my $self = shift;
142             my $dbh = $self->dbh;
143             while (my $n = $dbh->pg_notifies) { $self->emit(notification => @$n) }
144             }
145              
146             sub _unwatch {
147             my $self = shift;
148             return unless delete $self->{watching};
149             Mojo::IOLoop->singleton->reactor->remove($self->{handle});
150             $self->emit('close') if $self->is_listening;
151             }
152              
153             sub _watch {
154             my $self = shift;
155              
156             return if $self->{watching} || $self->{watching}++;
157              
158             my $dbh = $self->dbh;
159             unless ($self->{handle}) {
160             open $self->{handle}, '<&', $dbh->{pg_socket} or die "Can't dup: $!";
161             }
162             Mojo::IOLoop->singleton->reactor->io(
163             $self->{handle} => sub {
164             my $reactor = shift;
165              
166             $self->_unwatch if !eval { $self->_notifications; 1 };
167             return unless $self->{waiting} && $dbh->pg_ready;
168             my ($sth, $cb) = @{delete $self->{waiting}}{qw(sth cb)};
169              
170             # Do not raise exceptions inside the event loop
171             my $result = do { local $dbh->{RaiseError} = 0; $dbh->pg_result };
172             my $err = defined $result ? undef : $dbh->errstr;
173              
174             $self->$cb($err, $self->results_class->new(sth => $sth));
175             $self->_unwatch unless $self->{waiting} || $self->is_listening;
176             }
177             )->watch($self->{handle}, 1, 0);
178             }
179              
180             1;
181              
182             =encoding utf8
183              
184             =head1 NAME
185              
186             Mojo::Pg::Database - Database
187              
188             =head1 SYNOPSIS
189              
190             use Mojo::Pg::Database;
191              
192             my $db = Mojo::Pg::Database->new(pg => $pg, dbh => $dbh);
193             $db->query('select * from foo')
194             ->hashes->map(sub { $_->{bar} })->join("\n")->say;
195              
196             =head1 DESCRIPTION
197              
198             L<Mojo::Pg::Database> is a container for L<DBD::Pg> database handles used by
199             L<Mojo::Pg>.
200              
201             =head1 EVENTS
202              
203             L<Mojo::Pg::Database> inherits all events from L<Mojo::EventEmitter> and can
204             emit the following new ones.
205              
206             =head2 close
207              
208             $db->on(close => sub {
209             my $db = shift;
210             ...
211             });
212              
213             Emitted when the database connection gets closed while waiting for
214             notifications.
215              
216             =head2 notification
217              
218             $db->on(notification => sub {
219             my ($db, $name, $pid, $payload) = @_;
220             ...
221             });
222              
223             Emitted when a notification has been received.
224              
225             =head1 ATTRIBUTES
226              
227             L<Mojo::Pg::Database> implements the following attributes.
228              
229             =head2 dbh
230              
231             my $dbh = $db->dbh;
232             $db = $db->dbh($dbh);
233              
234             L<DBD::Pg> database handle used for all queries.
235              
236             # Use DBI utility methods
237             my $quoted = $db->dbh->quote_identifier('foo.bar');
238              
239             =head2 pg
240              
241             my $pg = $db->pg;
242             $db = $db->pg(Mojo::Pg->new);
243              
244             L<Mojo::Pg> object this database belongs to.
245              
246             =head2 results_class
247              
248             my $class = $db->results_class;
249             $db = $db->results_class('MyApp::Results');
250              
251             Class to be used by L</"query">, defaults to L<Mojo::Pg::Results>. Note that
252             this class needs to have already been loaded before L</"query"> is called.
253              
254             =head1 METHODS
255              
256             L<Mojo::Pg::Database> inherits all methods from L<Mojo::EventEmitter> and
257             implements the following new ones.
258              
259             =head2 begin
260              
261             my $tx = $db->begin;
262              
263             Begin transaction and return L<Mojo::Pg::Transaction> object, which will
264             automatically roll back the transaction unless
265             L<Mojo::Pg::Transaction/"commit"> has been called before it is destroyed.
266              
267             # Insert rows in a transaction
268             eval {
269             my $tx = $db->begin;
270             $db->insert('frameworks', {name => 'Catalyst'});
271             $db->insert('frameworks', {name => 'Mojolicious'});
272             $tx->commit;
273             };
274             say $@ if $@;
275              
276             =head2 delete
277              
278             my $results = $db->delete($table, \%where, \%options);
279              
280             Generate a C<DELETE> statement with L<Mojo::Pg/"abstract"> (usually an
281             L<SQL::Abstract> object) and execute it with L</"query">. You can also append a
282             callback to perform operations non-blocking.
283              
284             $db->delete(some_table => sub {
285             my ($db, $err, $results) = @_;
286             ...
287             });
288             Mojo::IOLoop->start unless Mojo::IOLoop->is_running;
289              
290             Use all the same argument variations you would pass to the C<delete> method of
291             L<SQL::Abstract>.
292              
293             # "delete from some_table"
294             $db->delete('some_table');
295              
296             # "delete from some_table where foo = 'bar'"
297             $db->delete('some_table', {foo => 'bar'});
298              
299             # "delete from some_table where foo like '%test%'"
300             $db->delete('some_table', {foo => {-like => '%test%'}});
301              
302             # "delete from some_table where foo = 'bar' returning id"
303             $db->delete('some_table', {foo => 'bar'}, {returning => 'id'});
304              
305             =head2 delete_p
306              
307             my $promise = $db->delete_p($table, \%where, \%options);
308              
309             Same as L</"delete">, but performs all operations non-blocking and returns a
310             L<Mojo::IOLoop::Delay> object to be used as a promise instead of accepting a
311             callback.
312              
313             $db->delete_p('some_table')->then(sub {
314             my $results = shift;
315             ...
316             })->catch(sub {
317             my $err = shift;
318             ...
319             })->wait;
320              
321             =head2 disconnect
322              
323             $db->disconnect;
324              
325             Disconnect L</"dbh"> and prevent it from getting reused.
326              
327             =head2 dollar_only
328              
329             $db = $db->dollar_only;
330              
331             Activate C<pg_placeholder_dollaronly> for next L</"query"> call and allow C<?>
332             to be used as an operator.
333              
334             # Check for a key in a JSON document
335             $db->dollar_only->query('select * from foo where bar ? $1', 'baz')
336             ->expand->hashes->map(sub { $_->{bar}{baz} })->join("\n")->say;
337              
338             =head2 insert
339              
340             my $results = $db->insert($table, \@values || \%fieldvals, \%options);
341              
342             Generate an C<INSERT> statement with L<Mojo::Pg/"abstract"> (usually an
343             L<SQL::Abstract> object) and execute it with L</"query">. You can also append a
344             callback to perform operations non-blocking.
345              
346             $db->insert(some_table => {foo => 'bar'} => sub {
347             my ($db, $err, $results) = @_;
348             ...
349             });
350             Mojo::IOLoop->start unless Mojo::IOLoop->is_running;
351              
352             Use all the same argument variations you would pass to the C<insert> method of
353             L<SQL::Abstract>.
354              
355             # "insert into some_table (foo, baz) values ('bar', 'yada')"
356             $db->insert('some_table', {foo => 'bar', baz => 'yada'});
357              
358             # "insert into some_table (foo) values ({1,2,3})"
359             $db->insert('some_table', {foo => [1, 2, 3]});
360              
361             # "insert into some_table (foo) values ('bar') returning id"
362             $db->insert('some_table', {foo => 'bar'}, {returning => 'id'});
363              
364             # "insert into some_table (foo) values ('bar') returning id, foo"
365             $db->insert('some_table', {foo => 'bar'}, {returning => ['id', 'foo']});
366              
367             =head2 insert_p
368              
369             my $promise = $db->insert_p($table, \@values || \%fieldvals, \%options);
370              
371             Same as L</"insert">, but performs all operations non-blocking and returns a
372             L<Mojo::IOLoop::Delay> object to be used as a promise instead of accepting a
373             callback.
374              
375             $db->insert_p(some_table => {foo => 'bar'})->then(sub {
376             my $results = shift;
377             ...
378             })->catch(sub {
379             my $err = shift;
380             ...
381             })->wait;
382              
383             =head2 is_listening
384              
385             my $bool = $db->is_listening;
386              
387             Check if L</"dbh"> is listening for notifications.
388              
389             =head2 listen
390              
391             $db = $db->listen('foo');
392              
393             Subscribe to a channel and receive L</"notification"> events when the
394             L<Mojo::IOLoop> event loop is running.
395              
396             =head2 notify
397              
398             $db = $db->notify('foo');
399             $db = $db->notify(foo => 'bar');
400              
401             Notify a channel.
402              
403             =head2 pid
404              
405             my $pid = $db->pid;
406              
407             Return the process id of the backend server process.
408              
409             =head2 ping
410              
411             my $bool = $db->ping;
412              
413             Check database connection.
414              
415             =head2 query
416              
417             my $results = $db->query('select * from foo');
418             my $results = $db->query('insert into foo values (?, ?, ?)', @values);
419             my $results = $db->query('select ?::json as foo', {json => {bar => 'baz'}});
420              
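421             Execute a blocking L<SQL|http://www.postgresql.org/docs/current/static/sql.html>
422             statement and return a results object based on L</"results_class"> (which is
423             usually L<Mojo::Pg::Results>) with the query results. The L<DBD::Pg> statement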
424             handle will be automatically reused when it is not active anymore, to increase
425             the performance of future queries. You can also append a callback to perform
426             operations non-blocking.
427              
428             $db->query('insert into foo values (?, ?, ?)' => @values => sub {
429             my ($db, $err, $results) = @_;
430             ...
431             });
432             Mojo::IOLoop->start unless Mojo::IOLoop->is_running;
433              
434             Hash reference arguments containing a value named C<json>, will be encoded to
435             JSON text with L<Mojo::JSON/"to_json">. To accomplish the reverse, you can use
436             the method L<Mojo::Pg::Results/"expand">, which automatically decodes all fields
437             of the types C<json> and C<jsonb> with L<Mojo::JSON/"from_json"> to Perl values.
438              
439             # "I ♥ Mojolicious!"
440             $db->query('select ?::jsonb as foo', {json => {bar => 'I ♥ Mojolicious!'}})
441             ->expand->hash->{foo}{bar};
442              
443             Hash reference arguments containing values named C<type> and C<value>, can be
444             used to bind specific L<DBD::Pg> data types to placeholders.
445              
446             # Insert binary data
447             use DBD::Pg ':pg_types';
448             $db->query('insert into bar values (?)', {type => PG_BYTEA, value => $bytes});
449              
450             =head2 query_p
451              
452             my $promise = $db->query_p('select * from foo');
453              
454             Same as L</"query">, but performs all operations non-blocking and returns a
455             L<Mojo::IOLoop::Delay> object to be used as a promise instead of accepting a
456             callback.
457              
458             $db->query_p('insert into foo values (?, ?, ?)' => @values)->then(sub {
459             my $results = shift;
460             ...
461             })->catch(sub {
462             my $err = shift;
463             ...
464             })->wait;
465              
466             =head2 select
467              
468             my $results = $db->select($source, $fields, $where, $order);
469              
470             Generate a C<SELECT> statement with L<Mojo::Pg/"abstract"> (usually an
471             L<SQL::Abstract> object) and execute it with L</"query">. You can also append a
472             callback to perform operations non-blocking.
473              
474             $db->select(some_table => ['foo'] => {bar => 'yada'} => sub {
475             my ($db, $err, $results) = @_;
476             ...
477             });
478             Mojo::IOLoop->start unless Mojo::IOLoop->is_running;
479              
480             Use all the same argument variations you would pass to the C<select> method of
481             L<SQL::Abstract>.
482              
483             # "select * from some_table"
484             $db->select('some_table');
485              
486             # "select id, foo from some_table"
487             $db->select('some_table', ['id', 'foo']);
488              
489             # "select * from some_table where foo = 'bar'"
490             $db->select('some_table', undef, {foo => 'bar'});
491              
492             # "select * from some_table where foo = 'bar' order by id desc"
493             $db->select('some_table', undef, {foo => 'bar'}, {-desc => 'id'});
494              
495             # "select * from some_table where foo like '%test%'"
496             $db->select('some_table', undef, {foo => {-like => '%test%'}});
497              
498             =head2 select_p
499              
500             my $promise = $db->select_p($source, $fields, $where, $order);
501              
502             Same as L</"select">, but performs all operations non-blocking and returns a
503             L<Mojo::IOLoop::Delay> object to be used as a promise instead of accepting a
504             callback.
505              
506             $db->select_p(some_table => ['foo'] => {bar => 'yada'})->then(sub {
507             my $results = shift;
508             ...
509             })->catch(sub {
510             my $err = shift;
511             ...
512             })->wait;
513              
514             =head2 tables
515              
516             my $tables = $db->tables;
517              
518             Return table and view names for this database, that are visible to the current
519             user and not internal, as an array reference.
520              
521             # Names of all tables
522             say for @{$db->tables};
523              
524             =head2 unlisten
525              
526             $db = $db->unlisten('foo');
527             $db = $db->unlisten('*');
528              
529             Unsubscribe from a channel, C<*> can be used to unsubscribe from all channels.
530              
531             =head2 update
532              
533             my $results = $db->update($table, \%fieldvals, \%where, \%options);
534              
535             Generate an C<UPDATE> statement with L<Mojo::Pg/"abstract"> (usually an
536             L<SQL::Abstract> object) and execute it with L</"query">. You can also append a
537             callback to perform operations non-blocking.
538              
539             $db->update(some_table => {foo => 'baz'} => {foo => 'bar'} => sub {
540             my ($db, $err, $results) = @_;
541             ...
542             });
543             Mojo::IOLoop->start unless Mojo::IOLoop->is_running;
544              
545             Use all the same argument variations you would pass to the C<update> method of
546             L<SQL::Abstract>.
547              
548             # "update some_table set foo = 'bar' where id = 23"
549             $db->update('some_table', {foo => 'bar'}, {id => 23});
550              
551             # "update some_table set foo = {1,2,3} where id = 23"
552             $db->update('some_table', {foo => [1, 2, 3]}, {id => 23});
553              
554             # "update some_table set foo = 'bar' where foo like '%test%'"
555             $db->update('some_table', {foo => 'bar'}, {foo => {-like => '%test%'}});
556              
557             # "update some_table set foo = 'bar' where id = 23 returning id"
558             $db->update('some_table', {foo => 'bar'}, {id => 23}, {returning => 'id'});
559              
560             =head2 update_p
561              
562             my $promise = $db->update_p($table, \%fieldvals, \%where, \%options);
563              
564             Same as L</"update">, but performs all operations non-blocking and returns a
565             L<Mojo::IOLoop::Delay> object to be used as a promise instead of accepting a
566             callback.
567              
568             $db->update_p(some_table => {foo => 'baz'} => {foo => 'bar'})->then(sub {
569             my $results = shift;
570             ...
571             })->catch(sub {
572             my $err = shift;
573             ...
574             })->wait;
575              
576             =head1 SEE ALSO
577              
578             L<Mojo::Pg>, L<Mojolicious::Guides>, L<https://mojolicious.org>.
579              
580             =cut