File Coverage

blib/lib/Database/Async/Query.pm
Criterion Covered Total %
statement 30 129 23.2
branch 0 22 0.0
condition 0 35 0.0
subroutine 10 42 23.8
pod 7 22 31.8
total 47 250 18.8


line stmt bran cond sub pod time code
1             package Database::Async::Query;
2              
3 2     2   14 use strict;
  2         4  
  2         148  
4 2     2   11 use warnings;
  2         5  
  2         132  
5              
6             our $VERSION = '0.015'; # VERSION
7              
8             =head1 NAME
9              
10             Database::Async::Query - represents a single database query
11              
12             =head1 SYNOPSIS
13              
14             my $query = Database::Async::Query->new(
15             db => Database::Async->new(...),
16             );
17              
18             =head1 DESCRIPTION
19              
20             A query:
21              
22             =over 4
23              
24             =item * has zero or more parameters
25              
26             =item * accepts zero or more input rows
27              
28             =item * returns zero or more output rows
29              
30             =item * can be prepared or direct
31              
32             =back
33              
34             =head2 Creating queries
35              
36             Queries are initiated from the L<Database::Async> instance; users
37             are not expected to instantiate L<Database::Async::Query> objects
38             directly.
39              
40             my $query = $db->query('select 1');
41              
42             =head2 Passing parameters
43              
44             Normally additional parameters for placeholders (C<?>) in the SQL
45             are passed when creating the query instance:
46              
47             my $query_with_parameters = $db->query('select name from users where id = ?', $id);
48              
49             For prepared statements, query parameters can be bound and passed for each
50             execution, see L</Prepared queries> for more details.
51              
52             =head2 Retrieving data
53              
54             Methods prefixed with C<row_> each provide a L<Ryu::Source> which
55             emits events for each row returned from the query:
56              
57             $db->query(q{select * from a_table})
58             ->row_hashrefs
59             ->each(sub {
60             say "Had ID $_->{id} with name $_->{name}"
61             })->await;
62              
63             =head2 Direct queries
64              
65             A direct query is of the form:
66              
67             $db->query(q{select 1})
68              
69             or
70              
71             $db->query(q{select * from some_table where id = ?}, $id)
72              
73             and has all the information required to start the query.
74              
75             =head2 Prepared queries
76              
77             When the same query needs to be executed multiple times with different
78             parameters, it may be worth using a prepared query. This involves sending
79             the SQL for the query to the server so that it can parse the text and
80             prepare a plan. Once complete, you can then send a set of parameters
81             and have the server execute and return any results.
82              
83             A prepared query involves two steps.
84              
85             First, the query is created:
86              
87             my $query = $db->query(q{select * from some_table where id = ?});
88              
89             Next, it will need to be prepared. This will fail if the query was
90             provided any parameters when initially constructed:
91              
92             $query->prepare(
93             statement => 'xyz'
94             );
95              
96             Some engines allow a C<statement> parameter, others will ignore it.
97              
98             After a call to L</prepare>, the query is marked as prepared and will
99             support the L</bind> and L</execute> methods. Once the query is prepared,
100             it is traditional to bind some variables to it:
101              
102             $query->bind(
103             $var1, $var2, ...
104             );
105              
106             after which it can be executed:
107              
108             $query->execute(
109             portal => 'abc'
110             );
111              
112             and any results can be extracted by the usual methods such as L</row_hashrefs>.
113              
114             Again, some engines support named portals, others will ignore the parameter.
115              
116             Since passing parameters is so common, you can combine the L</bind> and
117             L</execute> steps by passing an arrayref to L</execute>:
118              
119             $query->execute([ $first_value, ... ], portal => '');
120              
121             Streaming of values via L<Ryu::Source> is also supported:
122              
123             $query->execute($src, portal => '');
124              
125             Note that prepared queries will continue to emit values from the C<< row_* >>
126             source(s) until the query itself is discarded. The caller is expected to
127             keep track of any required mapping from input parameters to output rows.
128              
129             A full example might look something like this:
130              
131             async sub name_by_user_id {
132             my ($self, $id) = @_;
133             my $q = await $self->{name_by_user_id} //= do {
134             # Prepare the query on first call
135             my $q = $self->db->query(q{select name from "site"."user" where id = ?});
136             $q->prepare(
137             statement => 'name_by_user_id'
138             )
139             };
140             my ($name) = await $q->execute([ $id ])->single;
141             return $name;
142             }
143              
144             =head2 Custom engine features
145              
146             Different engines support additional features or events.
147              
148             Once a query is scheduled onto an engine, it will resolve the L</engine> L<Future>
149             instance:
150              
151             my $query = $db->query('select * from some_table');
152             my $engine = await $query->engine;
153             $engine->source('notification')
154             ->map('payload')
155             ->say;
156              
157             =head2 Cancelling queries
158              
159             In cases where you want to terminate a query early, use the L</cancel> method.
160             This will ask the engine to stop query execution if already scheduled. For a query
161             which has not yet been assigned to an engine, the L</cancel> method will cancel
162             the schedule request.
163              
164             =head2 Cursors
165              
166             Cursors are handled as normal SQL queries.
167              
168             $db->txn(async sub {
169             my ($txn) = @_;
170             await $txn->query(q{declare c cursor for select id from xyz})->void;
171             say while await $txn->query(q{fetch next from c})->single;
172             await $txn->query(q{close c})->void;
173             });
174              
175             =cut
176              
177 2     2   12 no indirect;
  2         4  
  2         17  
178              
179 2     2   1039 use Database::Async::Row;
  2         6  
  2         98  
180              
181 2     2   13 use Future;
  2         6  
  2         66  
182 2     2   10 use Syntax::Keyword::Try;
  2         6  
  2         19  
183 2     2   1323 use Ryu::Async;
  2         320036  
  2         109  
184 2     2   26 use Scalar::Util qw(blessed);
  2         7  
  2         155  
185              
186 2     2   16 use Log::Any qw($log);
  2         4  
  2         14  
187              
188             use overload
189 0     0     '""' => sub { my ($self) = @_; sprintf '%s[%s]', ref($self), $self->sql },
  0            
190 0     0     bool => sub { 1 },
191 2     2   646 fallback => 1;
  2         7  
  2         30  
192              
193             sub new {
194 0     0 0   my ($class, %args) = @_;
195 0           Scalar::Util::weaken($args{db});
196 0           bless \%args, $class;
197             }
198              
199             =head2 in
200              
201             This is a L<Ryu::Sink> used for queries which stream data to the server.
202              
203             It's buffered internally.
204              
205             =cut
206              
207             sub in {
208 0     0 1   my ($self) = @_;
209 0   0       $self->{in} //= do {
210 0           my $sink = $self->db->new_sink;
211 0 0         die 'already have streaming input but no original ->{in} sink' if $self->{streaming_input};
212 0     0     $sink->source->completed->on_ready(sub { $log->debugf('Sink for %s completed with %s', $self, shift->state) });
  0            
213 0           $self->{streaming_input} = $sink->source->buffer->pause;
214             $self->ready_to_stream->on_done(sub {
215 0     0     $log->debugf('Ready to stream, resuming streaming input');
216 0           $self->streaming_input->resume;
217 0           });
218 0           $sink
219             }
220             }
221              
222             #sub {
223             # my ($self) = @_;
224             # my $engine = $self->{engine} or die 'needs a valid ::Engine instance';
225             # my $sink = $self->in or die 'had no valid sink for streaming input';
226             #
227             # my $src = $sink->buffer;
228             # $src->pause;
229             # $engine->stream_from($src);
230             #
231             # $self->ready_to_stream
232             # ->on_done(sub {
233             # $log->tracef('Ready to stream for %s', $sink);
234             # $src->resume;
235             # })->on_fail(sub {
236             # $src->completed->fail(@_) unless $sink->completed->is_ready;
237             # })->on_cancel(sub {
238             # $src->completed->cancel unless $sink->completed->is_ready;
239             # });
240             #}
241              
242 0   0 0 0   sub streaming_input { shift->{streaming_input} // die '->in has not yet been called' }
243              
244             sub finish {
245 0     0 0   my ($self) = @_;
246 0 0         if($self->{in}) {
247 0           $self->input_stream->done;
248             } else {
249 0           $self->input_stream->cancel;
250             }
251             }
252              
253             =head2 db
254              
255             Accessor for the L<Database::Async> instance.
256              
257             =cut
258              
259 0     0 1   sub db { shift->{db} }
260              
261             =head2 sql
262              
263             The SQL string that this query would be running.
264              
265             =cut
266              
267 0     0 1   sub sql { shift->{sql} }
268              
269             =head2 bind
270              
271             A list of bind parameters for this query, can be empty.
272              
273             =cut
274              
275 0     0 1   sub bind { @{shift->{bind}} }
  0            
276              
277             sub row_description {
278 0     0 0   my ($self, $desc) = @_;
279 0           $log->tracef('Have row description %s', $desc);
280 0           my @names = map { $_->{name} } $desc->@*;
  0            
281 0           $self->{field_names} = \@names;
282             $self->{field_by_name} = {
283             # First column wins by default if we have multiple hits
284 0           map { $names[$_] => $_ } reverse 0..$#names
  0            
285             };
286 0           $self
287             }
288              
289             sub row {
290 0     0 0   my ($self, $row) = @_;
291 0           $log->tracef('Have row %s', $row);
292 0           $self->row_data->emit($row);
293             }
294              
295             sub row_hashrefs {
296 0     0 0   my ($self) = @_;
297             $self->{row_hashrefs} //= $self->row_data
298             ->map(sub {
299 0     0     my ($row) = @_;
300             +{
301 0           map {;
302 0           $self->{field_names}[$_] => $row->[$_]
303             } 0..$#$row
304             }
305 0   0       });
306             }
307              
308             sub row_arrayrefs {
309 0     0 0   my ($self) = @_;
310 0   0       $self->{row_arrayrefs} //= $self->row_data;
311             }
312              
313             =head2 start
314              
315             Schedules this query for execution.
316              
317             =cut
318              
319             sub start {
320 0     0 1   my ($self) = @_;
321 0   0       $self->{queued} //= $self->db->queue_query($self)->retain;
322             }
323              
324             sub run_on {
325 0     0 0   my ($self, $engine) = @_;
326 0           $log->tracef('Running query %s on %s', $self, $engine);
327 0           $self->{engine} = $engine;
328 0           $engine->query(
329             $self->sql,
330             $self->bind
331             );
332             }
333              
334             =head2 rows
335              
336             Returns a L<Ryu::Source> which will emit the rows from this query.
337              
338             Each row is a L<Database::Async::Row> instance.
339              
340             Will call L</start> if required.
341              
342             =cut
343              
344             sub row_data {
345 0     0 0   my ($self) = @_;
346 0   0       $self->{row_data} //= do {
347 0           my $row_data = $self->db->new_source;
348             $self->completed->on_ready(sub {
349 0     0     my ($f) = @_;
350 0 0         return if $row_data->is_ready;
351 0 0         return $row_data->finish if $f->is_done;
352 0 0         return if $f->is_cancelled;
353 0           return $row_data->fail($f->failure);
354 0           });
355             $row_data->completed->on_ready(sub {
356 0     0     my $f = $self->completed;
357 0 0         shift->on_ready($f) unless $f->is_ready;
358 0           });
359 0           $self->start;
360 0           $row_data;
361             };
362             }
363              
364             sub completed {
365 0     0 0   my ($self) = @_;
366 0   0       $self->{completed} //= do {
367 0           my $f = $self->db->new_future;
368             $self->start->on_fail(sub {
369 0 0   0     $f->fail(@_) unless $f->is_ready;
370 0           });
371 0           $f
372             }
373             }
374              
375             sub void {
376 0     0 0   my ($self) = @_;
377 0           $self->start;
378 0           return $self->completed;
379             }
380              
381             sub ready_to_stream {
382 0     0 0   my ($self) = @_;
383 0   0       $self->{ready_to_stream} //= $self->db->new_future;
384             }
385              
386             sub input_stream {
387 0     0 0   my ($self) = @_;
388 0   0       $self->{input_stream} //= $self->db->new_future;
389             }
390              
391             sub done {
392 0     0 0   my ($self) = @_;
393 0           my $f = $self->completed;
394 0 0         if($f->is_ready) {
395 0           $log->warnf('Calling ->done but we think our status is already %s', $f->state);
396 0           return $f;
397             }
398             # $self->in->completed->done unless $self->in->completed->is_ready;
399 0           $self->completed->done;
400             }
401              
402             sub from {
403 0     0 0   my ($self, $src) = @_;
404 0 0         if(ref($src) eq 'ARRAY') {
405 0           $src = Ryu::Source->from($src);
406             }
407 0 0 0       die 'Invalid source' unless blessed($src) and $src->isa('Ryu::Source');
408              
409 0           $self->in->from($src);
410 0           $src->prepare_await;
411 0           $self;
412             }
413              
414             sub rows {
415 0     0 1   my ($self) = @_;
416             $self->{rows} //= $self->row_data
417             ->map(sub {
418 0     0     my ($row) = @_;
419             Database::Async::Row->new(
420 0           index_by_name => +{ map { $row->[$_]->{description}->name => $_ } 0..$#$row },
  0            
421             data => $row
422             )
423             })
424 0   0       }
425              
426             =head2 single
427              
428             Used to retrieve data for a query that's always going to return a single row.
429              
430             Defaults to all columns, provide a list of indices to select a subset:
431              
432             # should yield "a", "b" and "c" as the three results
433             print for await $db->query(q{select 'a', 'b', 'c'})->single->as_list;
434              
435             # should yield just the ID column from the first row
436             print for await $db->query(q{select id, * from some_table})->single('id')->as_list;
437              
438             Returns a L<Future> which will resolve to the list of items.
439              
440             =cut
441              
442             sub single {
443 0     0 1   my ($self, @id) = @_;
444             $self->{single} //= $self->row_data
445             ->first
446             ->flat_map(sub {
447 0 0   0     [ @id ? @{$_}{@id} : @$_ ]
  0            
448 0   0       })->as_list;
449             }
450              
451             1;
452              
453             __END__