File Coverage

blib/lib/Mojo/Pg/Che.pm
Criterion Covered Total %
statement 3 3 100.0
branch n/a
condition n/a
subroutine 1 1 100.0
pod n/a
total 4 4 100.0


line stmt bran cond sub pod time code
1             package Mojo::Pg::Che;
2              
3 1     1   13737 use Mojo::Base 'Mojo::Pg';
  1         7025  
  1         5  
4              
5             =pod
6              
7             =encoding utf-8
8              
9             Доброго всем
10              
11             =head1 Mojo::Pg::Che
12              
13             ¡ ¡ ¡ ALL GLORY TO GLORIA ! ! !
14              
15             =head1 NAME
16              
17             Mojo::Pg::Che - mix of parent Mojo::Pg and DBI.pm
18              
19             =head1 VERSION
20              
21             Version 0.070
22              
23             =cut
24              
25             our $VERSION = '0.070';
26              
27              
28             =head1 SYNOPSIS
29              
30              
31              
32             use Mojo::Pg::Che;
33              
34             my $pg = Mojo::Pg::Che->connect("dbname=test;", "postgres", 'pg-pwd', \%attrs);
35             # or
36             my $pg = Mojo::Pg::Che->new
37             ->dsn("DBI:Pg:dbname=test;")
38             ->username("postgres")
39             ->password('pg--pw')
40             ->options(\%attrs);
41              
42             # Bloking query
43             my $result = $pg->query('select ...', undef, @bind);
44            
45             # Non-blocking query
46             my $result = $pg->query('select ...', {Async => 1, ...}, @bind);
47            
48             # Cached query
49             my $result = $pg->query('select ...', {Cached => 1, ...}, @bind);
50            
51             # prepare sth
52             my $sth = $pg->prepare('select ...');
53            
54             # cached sth
55             my $sth = $pg->prepare_cached('select ...');
56            
57             # Non-blocking query sth
58             my $r = $pg->query($sth, undef, @bind, sub {my ($db, $err, $result) = @_; ...});
59             Mojo::IOLoop->start unless Mojo::IOLoop->is_running;
60            
61             # Result non-blocking query sth
62             my $result = $pg->query($sth, {Async => 1,}, @bind,);
63            
64             # Mojo::Pg style
65             my $now = $pg->db->query('select now() as now')->hash->{now};
66            
67             # DBI style
68             my $now = $pg->selectrow_hashref('select now() as now')->{now};
69             my $now = $pg->db->selectrow_hashref('select now() as now')->{now};
70            
71             my $now = $pg->selectrow_array('select now() as now');
72              
73             =head2 Transaction syntax
74              
75             eval {
76             my $tx = $pg->begin;
77             $tx->query('insert into foo (name) values (?)', 'bar');
78             $tx->do('insert into foo (name) values (?)', 'baz');
79             $tx->commit;
80             };
81             die $@ if $@;
82            
83             my $db = $pg->db;
84             $db->begin;
85             $db->do('insert into foo (name) values (?)', 'bazzzz');
86             $db->rollback;
87             $db->begin;
88             $db->query('insert into foo (name) values (?)', 'barrr');
89             $db->commit;
90              
91             =head1 Non-blocking query cases
92              
93             Depends on $attr->{Async} and callback:
94              
95             1. $attr->{Async} set to 1. None $cb. Callback will create and Mojo::IOLoop will auto start. Method C<< ->query() >> will return result object. Methods C<<->select...()>> will return there perl structures.
96              
97             2. $attr->{Async} not set. $cb defined. All ->query() and ->select...() methods will return reactor object and results pass to $cb. You need start Mojo::IOLoop:
98              
99             my @results;
100             my $cb = sub {
101             my ($db, $err, $results) = @_;
102             die $err if $err;
103             push @results, $results;
104             };
105             $pg->query('select ?::date as d, pg_sleep(?::int)', undef, ("2016-06-$_", 1), $cb)
106             for 17..23;
107             Mojo::IOLoop->start unless Mojo::IOLoop->is_running;
108             like($_->hash->{d}, qr/2016-06-\d+/, 'correct async query')
109             for @results;
110              
111              
112             3. $attr->{Async} set to 1. $cb defined. Mojo::IOLoop will auto start. Results pass to $cb.
113              
114              
115             =head1 METHODS
116              
117             All methods from parent module L are inherits and implements the following new ones.
118              
119             =head2 connect
120              
121             DBI-style of new object instance. See L
122              
123             =head2 db
124              
125             Overriden method of L. Because can first input param - DBI database handler (when prepared statement used).
126              
127             =head2 prepare
128              
129             Prepare and return DBI statement handler for query string.
130              
131             =head2 prepare_cached
132              
133             Prepare and return DBI cached statement handler for query string.
134              
135             =head2 query
136              
137             Like L but input params - L
138              
139             Blocking query without attr B or callback.
140              
141             Non-blocking query with attr B or callback.
142              
143             =head2 selectrow_array
144              
145             DBI style quering. See L. Blocking | non-blocking. Input params - L.
146              
147             =head2 selectrow_arrayref
148              
149             DBI style quering. See L. Blocking | non-blocking. Input params - L.
150              
151             =head2 selectrow_hashref
152              
153             DBI style quering. See L. Blocking | non-blocking. Input params - L.
154              
155             =head2 selectall_arrayref
156              
157             DBI style quering. See L. Blocking | non-blocking. Input params - L.
158              
159             =head2 selectall_hashref
160              
161             DBI style quering. See L. Blocking | non-blocking. Input params - L.
162              
163             =head2 selectcol_arrayref
164              
165             DBI style quering. See L. Blocking | non-blocking. Input params - L.
166              
167             =head2 do
168              
169             DBI style quering. See L. Blocking | non-blocking. Input params - L.
170              
171             =head2 begin
172              
173             Start transaction and return new L object which attr C< {tx} > is a L object. Sinonyms are: C<< ->tx >> and C<< ->begin_work >>.
174              
175             =head1 Params for quering methods
176              
177             The methods C, C, C has next ordered input params:
178              
179             =over 4
180              
181             =item * String query | statement handler object
182              
183             =item * Hashref attrs (optional)
184              
185             =item * Array of bind values (optional)
186              
187             =item * Last param - callback/coderef for non-blocking (optional)
188              
189             =back
190              
191             =head1 SEE ALSO
192              
193             L
194              
195             L
196              
197             =head1 AUTHOR
198              
199             Михаил Че (Mikhail Che), C<< >>
200              
201             =head1 BUGS / CONTRIBUTING
202              
203             Please report any bugs or feature requests at L. Pull requests also welcome.
204              
205             =head1 COPYRIGHT
206              
207             Copyright 2016 Mikhail Che.
208              
209             This library is free software; you can redistribute it and/or modify
210             it under the same terms as Perl itself.
211              
212             =cut
213              
214             use Carp qw(croak);
215              
216             has database_class => sub {
217             require Mojo::Pg::Che::Database;
218             'Mojo::Pg::Che::Database';
219             };
220              
221             has options => sub {
222             {AutoCommit => 1, AutoInactiveDestroy => 1, PrintError => 0, RaiseError => 1, ShowErrorStatement => 1, pg_enable_utf8 => 1,};
223             };
224              
225             has [qw(debug)];
226              
227             sub connect {
228             my $self = shift->SUPER::new;
229             map $self->$_(shift), qw(dsn username password);
230             if (my $attrs = shift) {
231             my $options = $self->options;
232             @$options{ keys %$attrs } = values %$attrs;
233             }
234             $self->dsn('DBI:Pg:'.$self->dsn)
235             unless $self->dsn =~ /^DBI:Pg:/;
236             return $self;
237             }
238              
239             sub query {
240             my $self = shift;
241             #~ my ($query, $attrs, @bind) = @_;
242             my ($sth, $query) = ref $_[0] ? (shift, undef) : (undef, shift);
243            
244             my $attrs = shift;
245             my $async = delete $attrs->{Async} || delete $attrs->{pg_async};
246            
247             my $cb = ref $_[-1] eq 'CODE' ? pop : undef;
248             my $result;
249             $cb ||= sub {
250             my ($db, $err) = map shift, 1..2;
251             croak "Error on non-blocking query: ",$err
252             if $err;
253             $result = shift;
254            
255             } if $async;
256            
257             my @bind = @_;
258            
259             #~ $sth ||= $self->prepare($query, $attrs, 3); ?????
260            
261             if ($sth) {$result = $self->db($sth->{Database})->execute_sth($sth, @bind, $cb ? ($cb) : ());}
262             else {$result = $self->db->execute_string($query, $attrs, @bind, $cb ? ($cb) : (),);}
263            
264             Mojo::IOLoop->start if $async && not(Mojo::IOLoop->is_running);
265              
266             return $result;
267            
268             }
269              
270             sub db {
271             my ($self, $dbh) = (shift, shift);
272              
273             # Fork-safety
274             delete @$self{qw(pid queue)} unless ($self->{pid} //= $$) eq $$;
275            
276             $dbh ||= $self->_dequeue;
277              
278             return $self->database_class->new(dbh => $dbh, pg => $self);
279             }
280              
281             sub prepare { shift->db->prepare(@_); }
282             sub prepare_cached { shift->db->prepare_cached(@_); }
283              
284             sub _db_sth {shift->db(ref $_[0] && $_[0]->{Database})}
285              
286             sub selectrow_array { shift->_db_sth(@_)->selectrow_array(@_) }
287             sub selectrow_arrayref { shift->_db_sth(@_)->selectrow_arrayref(@_) }
288             sub selectrow_hashref { shift->_db_sth(@_)->selectrow_hashref(@_) }
289             sub selectall_arrayref { shift->_db_sth(@_)->selectall_arrayref(@_) }
290             sub selectall_hashref { shift->_db_sth(@_)->selectall_hashref(@_) }
291             sub selectcol_arrayref { shift->_db_sth(@_)->selectcol_arrayref(@_) }
292             sub do { shift->_db_sth(@_)->do(@_) }
293              
294             #~ sub begin_work {croak 'Use $pg->db->tx | $pg->db->begin';}
295             sub tx {shift->begin}
296             sub begin_work {shift->begin}
297             sub begin {
298             my $self = shift;
299             my $db = $self->db;
300             $db->begin;
301             return $db;
302             }
303              
304             sub commit {croak 'Use: $tx = $pg->begin; $tx->do(...); $tx->commit;';}
305             sub rollback {croak 'Use: $tx = $pg->begin; $tx->do(...); $tx->rollback;';}
306              
307             # Patch parent Mojo::Pg::_dequeue
308             sub _dequeue {
309             my $self = shift;
310              
311             #~ while (my $dbh = shift @{$self->{queue} || []}) { return $dbh if $dbh->ping }
312            
313             my $queue = $self->{queue} ||= [];
314             for my $i (0..$#$queue) {
315            
316             my $dbh = $queue->[$i];
317              
318             next
319             if $dbh->{pg_async_status} && $dbh->{pg_async_status} > 0;
320            
321             splice(@$queue, $i, 1); #~ delete $queue->[$i]
322            
323             #~ warn "queue-- $dbh", scalar @$queue and
324             return $dbh
325             if $dbh->ping;
326            
327             }
328            
329             my $dbh = DBI->connect(map { $self->$_ } qw(dsn username password options));
330             #~ say STDERR "НОвое [$dbh] соединение";
331            
332              
333             #~ if (my $path = $self->search_path) {
334             #~ my $search_path = join ', ', map { $dbh->quote_identifier($_) } @$path;
335             #~ $dbh->do("set search_path to $search_path");
336             #~ }
337            
338             #~ ++$self->{migrated} and $self->migrations->migrate
339             #~ if !$self->{migrated} && $self->auto_migrate;
340             $self->emit(connection => $dbh);
341              
342             return $dbh;
343             }
344              
345             sub _enqueue {
346             my ($self, $dbh) = @_;
347             my $queue = $self->{queue} ||= [];
348             #~ warn "queue++ $dbh:", scalar @$queue and
349             push @$queue, $dbh
350             if $dbh->{Active} && @$queue < $self->max_connections;
351             #~ shift @$queue while @$queue > $self->max_connections;
352             }
353              
354             1;
355              
356