File Coverage

blib/lib/Mojo/Pg/Che.pm
Criterion Covered Total %
statement 10 12 83.3
branch n/a
condition n/a
subroutine 4 4 100.0
pod n/a
total 14 16 87.5


line stmt bran cond sub pod time code
1             package Mojo::Pg::Che;
2              
3 1     1   14399 use Mojo::Base 'Mojo::EventEmitter';#'Mojo::Pg';
  1         8039  
  1         7  
4              
5             =pod
6              
7             =encoding utf-8
8              
9             Доброго всем
10              
11             =head1 Mojo::Pg::Che
12              
13             ¡ ¡ ¡ ALL GLORY TO GLORIA ! ! !
14              
15             =head1 NAME
16              
17             Mojo::Pg::Che - mix of parent Mojo::Pg and DBI.pm
18              
19             =head1 DESCRIPTION
20              
21             See L
22              
23             =head1 VERSION
24              
25             Version 0.800
26              
27             =cut
28              
29             our $VERSION = '0.800';
30              
31              
32             =head1 SYNOPSIS
33              
34             use Mojo::Pg::Che;
35              
36             my $pg = Mojo::Pg::Che->connect("dbname=test;", "postgres", 'pg-pwd', \%attrs);
37             # or
38             my $pg = Mojo::Pg::Che->new
39             ->dsn("DBI:Pg:dbname=test;")
40             ->username("postgres")
41             ->password('pg--pw')
42             ->options(\%attrs);
43            
44             # or
45             my $pg = Mojo::Pg->new('pg://postgres@/test');
46              
47             # Bloking query
48             my $result = $pg->query('select ...', undef, @bind);
49            
50             # Non-blocking query
51             my $result = $pg->query('select ...', {Async => 1, ...}, @bind);
52            
53             # Cached query
54             my $result = $pg->query('select ...', {Cached => 1, ...}, @bind);
55            
56             # prepare sth
57             my $sth = $pg->prepare('select ...');
58            
59             # cached sth
60             my $sth = $pg->prepare_cached('select ...');
61            
62             # Non-blocking query sth
63             my $r = $pg->query($sth, undef, @bind, sub {my ($db, $err, $result) = @_; ...});
64             Mojo::IOLoop->start unless Mojo::IOLoop->is_running;
65            
66             # Result non-blocking query sth
67             my $result = $pg->query($sth, {Async => 1,}, @bind,);
68             # Mojo::Pg::Results style
69             $result->hash->{...}
70             # DBI style
71             $result->fetchrow_hashref->{...};
72            
73             # Mojo::Pg style
74             my $now = $pg->db->query('select now() as now')->hash->{now};
75             $pg->db->query('select pg_sleep(?::int), now() as now', undef, 2, $cb);
76            
77             # DBI style
78             my $now = $pg->selectrow_hashref('select now() as now')->{now};
79             my $now = $pg->db->selectrow_hashref('select now() as now')->{now};
80            
81             my $now = $pg->selectrow_array('select now() as now');
82              
83             =head2 Transaction syntax
84              
85             eval {
86             my $tx = $pg->begin;
87             $tx->query('insert into foo (name) values (?)', 'bar');
88             $tx->do('insert into foo (name) values (?)', 'baz');
89             $tx->commit;
90             };
91             die $@ if $@;
92            
93             my $db = $pg->db;
94             $db->begin;
95             $db->do('insert into foo (name) values (?)', 'bazzzz');
96             $db->rollback;
97             $db->begin;
98             $db->query('insert into foo (name) values (?)', 'barrr');
99             $db->commit;
100              
101             =head1 Non-blocking query cases
102              
103             Depends on $attr->{Async} and callback:
104              
105             1. $attr->{Async} set to 1. None $cb. Callback will create and Mojo::IOLoop will auto start. Method C<< ->query() >> will return result object. Methods C<<->select...()>> will return there perl structures.
106              
107             2. $attr->{Async} not set. $cb defined. All ->query() and ->select...() methods will return reactor object and results pass to $cb. You need start Mojo::IOLoop:
108              
109             my @results;
110             my $cb = sub {
111             my ($db, $err, $results) = @_;
112             die $err if $err;
113             push @results, $results;
114             };
115             $pg->query('select ?::date as d, pg_sleep(?::int)', undef, ("2016-06-$_", 1), $cb)
116             for 17..23;
117             Mojo::IOLoop->start unless Mojo::IOLoop->is_running;
118             like($_->hash->{d}, qr/2016-06-\d+/, 'correct async query')
119             for @results;
120              
121              
122             3. $attr->{Async} set to 1. $cb defined. Mojo::IOLoop will auto start. Results pass to $cb.
123              
124              
125             =head1 METHODS
126              
127             All methods from parent module L are inherits and implements the following new ones.
128              
129             =head2 connect
130              
131             DBI-style of new object instance. See L
132              
133             =head2 db
134              
135             Overriden method of L. Because can first input param - DBI database handler (when prepared statement used).
136              
137             =head2 prepare
138              
139             Prepare and return DBI statement handler for query string.
140              
141             =head2 prepare_cached
142              
143             Prepare and return DBI cached statement handler for query string.
144              
145             =head2 query
146              
147             Like L but input params - L
148              
149             Blocking query without attr B or callback.
150              
151             Non-blocking query with attr B or callback.
152              
153             =head2 selectrow_array
154              
155             DBI style quering. See L. Blocking | non-blocking. Input params - L.
156              
157             =head2 selectrow_arrayref
158              
159             DBI style quering. See L. Blocking | non-blocking. Input params - L.
160              
161             =head2 selectrow_hashref
162              
163             DBI style quering. See L. Blocking | non-blocking. Input params - L.
164              
165             =head2 selectall_arrayref
166              
167             DBI style quering. See L. Blocking | non-blocking. Input params - L.
168              
169             =head2 selectall_hashref
170              
171             DBI style quering. See L. Blocking | non-blocking. Input params - L.
172              
173             =head2 selectcol_arrayref
174              
175             DBI style quering. See L. Blocking | non-blocking. Input params - L.
176              
177             =head2 do
178              
179             DBI style quering. See L. Blocking | non-blocking. Input params - L.
180              
181             =head2 begin
182              
183             Start transaction and return new L object which attr C< {tx} > is a L object. Sinonyms are: C<< ->tx >> and C<< ->begin_work >>.
184              
185             =head1 Params for quering methods
186              
187             The methods C, C, C has next ordered input params:
188              
189             =over 4
190              
191             =item * String query | statement handler object
192              
193             =item * Hashref attrs (optional)
194              
195             =item * Array of bind values (optional)
196              
197             =item * Last param - callback/coderef for non-blocking (optional)
198              
199             =back
200              
201             =head1 SEE ALSO
202              
203             L
204              
205             L
206              
207             =head1 AUTHOR
208              
209             Михаил Че (Mikhail Che), C<< >>
210              
211             =head1 BUGS / CONTRIBUTING
212              
213             Please report any bugs or feature requests at L. Pull requests also welcome.
214              
215             =head1 COPYRIGHT
216              
217             Copyright 2016 Mikhail Che.
218              
219             This library is free software; you can redistribute it and/or modify
220             it under the same terms as Perl itself.
221              
222             =cut
223              
224 1     1   2925 use DBI;
  1         12833  
  1         58  
225 1     1   7 use Carp qw(croak);
  1         5  
  1         46  
226 1     1   557 use Mojo::Pg::Che::Database;
  0            
  0            
227             use Mojo::URL;
228             use Scalar::Util 'weaken';
229              
230             has database_class => 'Mojo::Pg::Che::Database';
231             has dsn => 'dbi:Pg:';
232             has max_connections => 5;
233             has [qw(password username)] => '';
234             has pubsub => sub {
235             require Mojo::Pg::PubSub;
236             my $pubsub = Mojo::Pg::PubSub->new(pg => shift);
237             #~ weaken $pubsub->{pg};#???
238             #Mojo::Reactor::EV: Timer failed: Can't call method "db" on an undefined value at t/06-pubsub.t line 21.
239             #EV: error in callback (ignoring): Can't call method "db" on an undefined value at Mojo/Pg/PubSub.pm line 44.
240              
241             return $pubsub;
242             };
243              
244             has options => sub {
245             {AutoCommit => 1, AutoInactiveDestroy => 1, PrintError => 0, RaiseError => 1, ShowErrorStatement => 1, pg_enable_utf8 => 1,};
246             };
247              
248             has debug => $ENV{DEBUG_Mojo_Pg_Che} || 0;
249             my $PKG = __PACKAGE__;
250              
251             sub from_string {# copy/paste Mojo::Pg
252             my ($self, $str) = @_;
253              
254             # Protocol
255             return $self unless $str;
256             my $url = Mojo::URL->new($str);
257             croak qq{Invalid PostgreSQL connection string "$str"}
258             unless $url->protocol =~ /^(?:pg|postgres(?:ql)?)$/;
259              
260             # Connection information
261             my $db = $url->path->parts->[0];
262             my $dsn = defined $db ? "dbi:Pg:dbname=$db" : 'dbi:Pg:';
263             if (my $host = $url->host) { $dsn .= ";host=$host" }
264             if (my $port = $url->port) { $dsn .= ";port=$port" }
265             if (defined(my $username = $url->username)) { $self->username($username) }
266             if (defined(my $password = $url->password)) { $self->password($password) }
267              
268             # Service
269             my $hash = $url->query->to_hash;
270             if (my $service = delete $hash->{service}) { $dsn .= "service=$service" }
271              
272             # Options
273             @{$self->options}{keys %$hash} = values %$hash;
274              
275             return $self->dsn($dsn);
276             }
277              
278             sub new { @_ > 1 ? shift->SUPER::new->from_string(@_) : shift->SUPER::new }# copy/paste Mojo::Pg
279              
280             sub connect {
281             my $self = shift->SUPER::new;
282             map $self->$_(shift), qw(dsn username password);
283             if (my $attrs = shift) {
284             my $options = $self->options;
285             @$options{ keys %$attrs } = values %$attrs;
286             }
287             $self->dsn('DBI:Pg:'.$self->dsn)
288             unless $self->dsn =~ /^DBI:Pg:/;
289             say STDERR sprintf("[$PKG->connect] prepare connection data for [%s]", $self->dsn, )
290             if $self->debug;
291             return $self;
292             }
293              
294             sub db {
295             my ($self, $dbh) = (shift, shift);
296              
297             # Fork-safety
298             delete @$self{qw(pid queue)} unless ($self->{pid} //= $$) eq $$;
299            
300             $dbh ||= $self->_dequeue;
301              
302             return $self->database_class->new(dbh => $dbh, pg => $self);
303             }
304              
305             sub prepare { shift->db->prepare(@_); }
306             sub prepare_cached { shift->db->prepare_cached(@_); }
307              
308             sub _db_sth {shift->db(ref $_[0] && $_[0]->{Database})}
309              
310             sub query { shift->_db_sth(@_)->select(@_) }
311             sub select { shift->_db_sth(@_)->select(@_) }
312             sub selectrow_array { shift->_db_sth(@_)->selectrow_array(@_) }
313             sub selectrow_arrayref { shift->_db_sth(@_)->selectrow_arrayref(@_) }
314             sub selectrow_hashref { shift->_db_sth(@_)->selectrow_hashref(@_) }
315             sub selectall_arrayref { shift->_db_sth(@_)->selectall_arrayref(@_) }
316             sub selectall_hashref { shift->_db_sth(@_)->selectall_hashref(@_) }
317             sub selectcol_arrayref { shift->_db_sth(@_)->selectcol_arrayref(@_) }
318             sub do { shift->_db_sth(@_)->do(@_) }
319              
320             #~ sub begin_work {croak 'Use $pg->db->tx | $pg->db->begin';}
321             sub tx {shift->begin}
322             sub begin_work {shift->begin}
323             sub begin {
324             my $self = shift;
325             my $db = $self->db;
326             $db->begin;
327             return $db;
328             }
329              
330             sub commit {croak 'Use: $tx = $pg->begin; $tx->do(...); $tx->commit;';}
331             sub rollback {croak 'Use: $tx = $pg->begin; $tx->do(...); $tx->rollback;';}
332              
333             # Patch parent Mojo::Pg::_dequeue
334             sub _dequeue {
335             my $self = shift;
336              
337             #~ while (my $dbh = shift @{$self->{queue} || []}) { return $dbh if $dbh->ping }
338            
339             my $queue = $self->{queue} ||= [];
340             for my $i (0..$#$queue) {
341            
342             my $dbh = $queue->[$i];
343              
344             next
345             if $dbh->{pg_async_status} && $dbh->{pg_async_status} > 0;
346            
347             splice(@$queue, $i, 1); #~ delete $queue->[$i]
348            
349             ($self->debug
350             && (say STDERR sprintf("[$PKG->_dequeue] [$dbh] does dequeued, pool count:[%s]", scalar @$queue))
351             && 0)
352             or return $dbh
353             if $dbh->ping;
354            
355             }
356            
357             my $dbh = DBI->connect(map { $self->$_ } qw(dsn username password options));
358             $self->debug
359             && say STDERR sprintf("[$PKG->_dequeue] new DBI connection [$dbh]", );
360             #~ say STDERR "НОвое [$dbh] соединение";
361            
362              
363             $self->emit(connection => $dbh);
364              
365             return $dbh;
366             }
367              
368             sub _enqueue {
369             my ($self, $dbh) = @_;
370             my $queue = $self->{queue} ||= [];
371             #~ warn "queue++ $dbh:", scalar @$queue and
372            
373             if ($dbh->{Active} && ($dbh->{pg_async_status} && $dbh->{pg_async_status} > 0) || @$queue < $self->max_connections) {
374             unshift @$queue, $dbh;
375             $self->debug
376             && say STDERR sprintf("[$PKG->_enqueue] [$dbh] does enqueued, pool count:[%s], pg_async_status=[%s]", scalar @$queue, $dbh->{pg_async_status});
377             return;
378             }
379             #~ shift @$queue while @$queue > $self->max_connections;
380             $self->debug
381             && say STDERR sprintf("[$PKG->_enqueue] [$dbh] does not enqueued, pool count:[%s]", scalar @$queue);
382             }
383              
384             1;
385              
386