File Coverage

blib/lib/Mojo/Pg/Database.pm
Criterion Covered Total %
statement 7 9 77.7
branch n/a
condition n/a
subroutine 3 3 100.0
pod n/a
total 10 12 83.3


line stmt bran cond sub pod time code
1             package Mojo::Pg::Database;
2 6     6   49 use Mojo::Base 'Mojo::EventEmitter';
  6         16  
  6         61  
3              
4 6     6   1333 use Carp qw(croak shortmess);
  6         16  
  6         360  
5 6     6   6792 use DBD::Pg ':async';
  0            
  0            
6             use Mojo::IOLoop;
7             use Mojo::JSON 'to_json';
8             use Mojo::Pg::Results;
9             use Mojo::Pg::Transaction;
10             use Mojo::Util 'monkey_patch';
11             use Scalar::Util 'weaken';
12              
13             has [qw(dbh pg)];
14             has results_class => 'Mojo::Pg::Results';
15              
16             for my $name (qw(delete insert select update)) {
17             monkey_patch __PACKAGE__, $name, sub {
18             my ($self, @cb) = (shift, ref $_[-1] eq 'CODE' ? pop : ());
19             return $self->query($self->pg->abstract->$name(@_), @cb);
20             };
21             }
22              
23             sub DESTROY {
24             my $self = shift;
25              
26             my $waiting = $self->{waiting};
27             $waiting->{cb}($self, 'Premature connection close', undef) if $waiting->{cb};
28              
29             return unless (my $pg = $self->pg) && (my $dbh = $self->dbh);
30             $pg->_enqueue($dbh) unless $dbh->{private_mojo_no_reuse};
31             }
32              
33             sub begin {
34             my $self = shift;
35             my $tx = Mojo::Pg::Transaction->new(db => $self);
36             weaken $tx->{db};
37             return $tx;
38             }
39              
40             sub disconnect {
41             my $self = shift;
42             $self->_unwatch;
43             $self->dbh->disconnect;
44             }
45              
46             sub dollar_only { ++$_[0]{dollar_only} and return $_[0] }
47              
48             sub is_listening { !!keys %{shift->{listen} || {}} }
49              
50             sub listen {
51             my ($self, $name) = @_;
52              
53             my $dbh = $self->dbh;
54             $dbh->do('listen ' . $dbh->quote_identifier($name))
55             unless $self->{listen}{$name}++;
56             $self->_watch;
57              
58             return $self;
59             }
60              
61             sub notify {
62             my ($self, $name, $payload) = @_;
63              
64             my $dbh = $self->dbh;
65             my $notify = 'notify ' . $dbh->quote_identifier($name);
66             $notify .= ', ' . $dbh->quote($payload) if defined $payload;
67             $dbh->do($notify);
68             $self->_notifications;
69              
70             return $self;
71             }
72              
73             sub pid { shift->dbh->{pg_pid} }
74              
75             sub ping { shift->dbh->ping }
76              
77             sub query {
78             my ($self, $query) = (shift, shift);
79             my $cb = ref $_[-1] eq 'CODE' ? pop : undef;
80              
81             croak 'Non-blocking query already in progress' if $self->{waiting};
82              
83             my %attrs;
84             $attrs{pg_placeholder_dollaronly} = 1 if delete $self->{dollar_only};
85             $attrs{pg_async} = PG_ASYNC if $cb;
86             my $sth = $self->dbh->prepare_cached($query, \%attrs, 3);
87             local $sth->{HandleError} = sub { $_[0] = shortmess $_[0]; 0 };
88              
89             for (my $i = 0; $#_ >= $i; $i++) {
90             my ($param, $attrs) = ($_[$i], {});
91             if (ref $param eq 'HASH') {
92             if (exists $param->{json}) { $param = to_json $param->{json} }
93             elsif (exists $param->{type} && exists $param->{value}) {
94             ($attrs->{pg_type}, $param) = @{$param}{qw(type value)};
95             }
96             }
97             $sth->bind_param($i + 1, $param, $attrs);
98             }
99             $sth->execute;
100              
101             # Blocking
102             unless ($cb) {
103             $self->_notifications;
104             return $self->results_class->new(sth => $sth);
105             }
106              
107             # Non-blocking
108             $self->{waiting} = {cb => $cb, sth => $sth};
109             $self->_watch;
110             }
111              
112             sub tables {
113             my @tables = shift->dbh->tables('', '', '', '');
114             return [grep { $_ !~ /^(?:pg_catalog|information_schema)\./ } @tables];
115             }
116              
117             sub unlisten {
118             my ($self, $name) = @_;
119              
120             my $dbh = $self->dbh;
121             $dbh->do('unlisten ' . $dbh->quote_identifier($name));
122             $name eq '*' ? delete $self->{listen} : delete $self->{listen}{$name};
123             $self->_unwatch unless $self->{waiting} || $self->is_listening;
124              
125             return $self;
126             }
127              
128             sub _notifications {
129             my $self = shift;
130             my $dbh = $self->dbh;
131             while (my $n = $dbh->pg_notifies) { $self->emit(notification => @$n) }
132             }
133              
134             sub _unwatch {
135             my $self = shift;
136             return unless delete $self->{watching};
137             Mojo::IOLoop->singleton->reactor->remove($self->{handle});
138             $self->emit('close') if $self->is_listening;
139             }
140              
141             sub _watch {
142             my $self = shift;
143              
144             return if $self->{watching} || $self->{watching}++;
145              
146             my $dbh = $self->dbh;
147             unless ($self->{handle}) {
148             open $self->{handle}, '<&', $dbh->{pg_socket} or die "Can't dup: $!";
149             }
150             Mojo::IOLoop->singleton->reactor->io(
151             $self->{handle} => sub {
152             my $reactor = shift;
153              
154             $self->_unwatch if !eval { $self->_notifications; 1 };
155             return unless $self->{waiting} && $dbh->pg_ready;
156             my ($sth, $cb) = @{delete $self->{waiting}}{qw(sth cb)};
157              
158             # Do not raise exceptions inside the event loop
159             my $result = do { local $dbh->{RaiseError} = 0; $dbh->pg_result };
160             my $err = defined $result ? undef : $dbh->errstr;
161              
162             $self->$cb($err, $self->results_class->new(sth => $sth));
163             $self->_unwatch unless $self->{waiting} || $self->is_listening;
164             }
165             )->watch($self->{handle}, 1, 0);
166             }
167              
168             1;
169              
170             =encoding utf8
171              
172             =head1 NAME
173              
174             Mojo::Pg::Database - Database
175              
176             =head1 SYNOPSIS
177              
178             use Mojo::Pg::Database;
179              
180             my $db = Mojo::Pg::Database->new(pg => $pg, dbh => $dbh);
181             $db->query('select * from foo')
182             ->hashes->map(sub { $_->{bar} })->join("\n")->say;
183              
184             =head1 DESCRIPTION
185              
186             L is a container for L database handles used by
187             L.
188              
189             =head1 EVENTS
190              
191             L inherits all events from L and can
192             emit the following new ones.
193              
194             =head2 close
195              
196             $db->on(close => sub {
197             my $db = shift;
198             ...
199             });
200              
201             Emitted when the database connection gets closed while waiting for
202             notifications.
203              
204             =head2 notification
205              
206             $db->on(notification => sub {
207             my ($db, $name, $pid, $payload) = @_;
208             ...
209             });
210              
211             Emitted when a notification has been received.
212              
213             =head1 ATTRIBUTES
214              
215             L implements the following attributes.
216              
217             =head2 dbh
218              
219             my $dbh = $db->dbh;
220             $db = $db->dbh($dbh);
221              
222             L database handle used for all queries.
223              
224             # Use DBI utility methods
225             my $quoted = $db->dbh->quote_identifier('foo.bar');
226              
227             =head2 pg
228              
229             my $pg = $db->pg;
230             $db = $db->pg(Mojo::Pg->new);
231              
232             L object this database belongs to.
233              
234             =head2 results_class
235              
236             my $class = $db->results_class;
237             $db = $db->results_class('MyApp::Results');
238              
239             Class to be used by L, defaults to L. Note that
240             this class needs to have already been loaded before L is called.
241              
242             =head1 METHODS
243              
244             L inherits all methods from L and
245             implements the following new ones.
246              
247             =head2 begin
248              
249             my $tx = $db->begin;
250              
251             Begin transaction and return L object, which will
252             automatically roll back the transaction unless
253             L has been called before it is destroyed.
254              
255             # Insert rows in a transaction
256             eval {
257             my $tx = $db->begin;
258             $db->insert('frameworks', {name => 'Catalyst'});
259             $db->insert('frameworks', {name => 'Mojolicious'});
260             $tx->commit;
261             };
262             say $@ if $@;
263              
264             =head2 delete
265              
266             my $results = $db->delete($table, \%where, \%options);
267              
268             Generate a C statement with L (usually an
269             L object) and execute it with L. You can also append a
270             callback to perform operations non-blocking.
271              
272             $db->delete(some_table => sub {
273             my ($db, $err, $results) = @_;
274             ...
275             });
276             Mojo::IOLoop->start unless Mojo::IOLoop->is_running;
277              
278             Use all the same argument variations you would pass to the C method of
279             L.
280              
281             # "delete from some_table"
282             $db->delete('some_table');
283              
284             # "delete from some_table where foo = 'bar'"
285             $db->delete('some_table', {foo => 'bar'});
286              
287             # "delete from some_table where foo like '%test%'"
288             $db->delete('some_table', {foo => {-like => '%test%'}});
289              
290             # "delete from some_table where foo = 'bar' returning id"
291             $db->delete('some_table', {foo => 'bar'}, {returning => 'id'});
292              
293             =head2 disconnect
294              
295             $db->disconnect;
296              
297             Disconnect L and prevent it from getting reused.
298              
299             =head2 dollar_only
300              
301             $db = $db->dollar_only;
302              
303             Activate C for next L call and allow C
304             to be used as an operator.
305              
306             # Check for a key in a JSON document
307             $db->dollar_only->query('select * from foo where bar ? $1', 'baz')
308             ->expand->hashes->map(sub { $_->{bar}{baz} })->join("\n")->say;
309              
310             =head2 insert
311              
312             my $results = $db->insert($table, \@values || \%fieldvals, \%options);
313              
314             Generate an C statement with L (usually an
315             L object) and execute it with L. You can also append a
316             callback to perform operations non-blocking.
317              
318             $db->insert(some_table => {foo => 'bar'} => sub {
319             my ($db, $err, $results) = @_;
320             ...
321             });
322             Mojo::IOLoop->start unless Mojo::IOLoop->is_running;
323              
324             Use all the same argument variations you would pass to the C method of
325             L.
326              
327             # "insert into some_table (foo, baz) values ('bar', 'yada')"
328             $db->insert('some_table', {foo => 'bar', baz => 'yada'});
329              
330             # "insert into some_table (foo) values ({1,2,3})"
331             $db->insert('some_table', {foo => [1, 2, 3]});
332              
333             # "insert into some_table (foo) values ('bar') returning id"
334             $db->insert('some_table', {foo => 'bar'}, {returning => 'id'});
335              
336             # "insert into some_table (foo) values ('bar') returning id, foo"
337             $db->insert('some_table', {foo => 'bar'}, {returning => ['id', 'foo']});
338              
339             =head2 is_listening
340              
341             my $bool = $db->is_listening;
342              
343             Check if L is listening for notifications.
344              
345             =head2 listen
346              
347             $db = $db->listen('foo');
348              
349             Subscribe to a channel and receive L events when the
350             L event loop is running.
351              
352             =head2 notify
353              
354             $db = $db->notify('foo');
355             $db = $db->notify(foo => 'bar');
356              
357             Notify a channel.
358              
359             =head2 pid
360              
361             my $pid = $db->pid;
362              
363             Return the process id of the backend server process.
364              
365             =head2 ping
366              
367             my $bool = $db->ping;
368              
369             Check database connection.
370              
371             =head2 query
372              
373             my $results = $db->query('select * from foo');
374             my $results = $db->query('insert into foo values (?, ?, ?)', @values);
375             my $results = $db->query('select ?::json as foo', {json => {bar => 'baz'}});
376              
377             Execute a blocking L
378             statement and return a results object based on L (which is
379             usually L) with the query results. The L statement
380             handle will be automatically reused when it is not active anymore, to increase
381             the performance of future queries. You can also append a callback to perform
382             operations non-blocking.
383              
384             $db->query('insert into foo values (?, ?, ?)' => @values => sub {
385             my ($db, $err, $results) = @_;
386             ...
387             });
388             Mojo::IOLoop->start unless Mojo::IOLoop->is_running;
389              
390             Hash reference arguments containing a value named C, will be encoded to
391             JSON text with L. To accomplish the reverse, you can use
392             the method L, which automatically decodes all fields
393             of the types C and C with L to Perl values.
394              
395             # "I ♥ Mojolicious!"
396             $db->query('select ?::jsonb as foo', {json => {bar => 'I ♥ Mojolicious!'}})
397             ->expand->hash->{foo}{bar};
398              
399             Hash reference arguments containing values named C and C, can be
400             used to bind specific L data types to placeholders.
401              
402             # Insert binary data
403             use DBD::Pg ':pg_types';
404             $db->query('insert into bar values (?)', {type => PG_BYTEA, value => $bytes});
405              
406             =head2 select
407              
408             my $results = $db->select($source, $fields, $where, $order);
409              
410             Generate a C
411             L object) and execute it with L. You can also append a
412             callback to perform operations non-blocking.
413              
414             $db->select(some_table => ['foo'] => {bar => 'yada'} => sub {
415             my ($db, $err, $results) = @_;
416             ...
417             });
418             Mojo::IOLoop->start unless Mojo::IOLoop->is_running;
419              
420             Use all the same argument variations you would pass to the C
421             L.
422              
423             # "select * from some_table"
424             $db->select('some_table');
425              
426             # "select id, foo from some_table"
427             $db->select('some_table', ['id', 'foo']);
428              
429             # "select * from some_table where foo = 'bar'"
430             $db->select('some_table', undef, {foo => 'bar'});
431              
432             # "select * from some_table where foo = 'bar' order by id desc"
433             $db->select('some_table', undef, {foo => 'bar'}, {-desc => 'id'});
434              
435             # "select * from some_table where foo like '%test%'"
436             $db->select('some_table', undef, {foo => {-like => '%test%'}});
437              
438             =head2 tables
439              
440             my $tables = $db->tables;
441              
442             Return table and view names for this database, that are visible to the current
443             user and not internal, as an array reference.
444              
445             # Names of all tables
446             say for @{$db->tables};
447              
448             =head2 unlisten
449              
450             $db = $db->unlisten('foo');
451             $db = $db->unlisten('*');
452              
453             Unsubscribe from a channel, C<*> can be used to unsubscribe from all channels.
454              
455             =head2 update
456              
457             my $results = $db->update($table, \%fieldvals, \%where, \%options);
458              
459             Generate an C statement with L (usually an
460             L object) and execute it with L. You can also append a
461             callback to perform operations non-blocking.
462              
463             $db->update(some_table => {foo => 'baz'} => {foo => 'bar'} => sub {
464             my ($db, $err, $results) = @_;
465             ...
466             });
467             Mojo::IOLoop->start unless Mojo::IOLoop->is_running;
468              
469             Use all the same argument variations you would pass to the C method of
470             L.
471              
472             # "update some_table set foo = 'bar' where id = 23"
473             $db->update('some_table', {foo => 'bar'}, {id => 23});
474              
475             # "update some_table set foo = {1,2,3} where id = 23"
476             $db->update('some_table', {foo => [1, 2, 3]}, {id => 23});
477              
478             # "update some_table set foo = 'bar' where foo like '%test%'"
479             $db->update('some_table', {foo => 'bar'}, {foo => {-like => '%test%'}});
480              
481             # "update some_table set foo = 'bar' where id = 23 returning id"
482             $db->update('some_table', {foo => 'bar'}, {id => 23}, {returning => 'id'});
483              
484             =head1 SEE ALSO
485              
486             L, L, L.
487              
488             =cut