File Coverage

blib/lib/DBIx/Async.pm
Criterion Covered Total %
statement 27 91 29.6
branch 0 12 0.0
condition 0 4 0.0
subroutine 9 30 30.0
pod 16 16 100.0
total 52 153 33.9


line stmt bran cond sub pod time code
1             package DBIx::Async;
2             # ABSTRACT: database support for IO::Async via DBI
3 1     1   17577 use strict;
  1         2  
  1         27  
4 1     1   3 use warnings;
  1         1  
  1         24  
5              
6 1     1   373 use parent qw(IO::Async::Notifier);
  1         216  
  1         4  
7              
8             our $VERSION = '0.003';
9              
10             =head1 NAME
11              
12             DBIx::Async - use L<DBI> with L<IO::Async>
13              
14             =head1 VERSION
15              
16             version 0.003
17              
18             =head1 SYNOPSIS
19              
20             #!/usr/bin/env perl
21             use strict;
22             use warnings;
23             use feature qw(say);
24             use IO::Async::Loop;
25             use DBIx::Async;
26             my $loop = IO::Async::Loop->new;
27             say 'Connecting to db';
28             $loop->add(my $dbh = DBIx::Async->connect(
29             'dbi:SQLite:dbname=:memory:',
30             '',
31             '', {
32             AutoCommit => 1,
33             RaiseError => 1,
34             }
35             ));
36             $dbh->do(q{CREATE TABLE tmp(id integer primary key autoincrement, content text)})
37             # ... put some values in it
38             ->then(sub { $dbh->do(q{INSERT INTO tmp(content) VALUES ('some text'), ('other text') , ('more data')}) })
39             # ... and then read them back
40             ->then(sub {
41             # obviously you'd never really use * in a query like this...
42             my $sth = $dbh->prepare(q{select * from tmp});
43             $sth->execute;
44             # the while($row = fetchrow_hashref) construct isn't a good fit
45             # but we attempt to provide something reasonably close using the
46             # ->iterate helper
47             $sth->iterate(
48             fetchrow_hashref => sub {
49             my $row = shift;
50             say "Row: " . join(',', %$row);
51             }
52             );
53             })->on_done(sub {
54             say "Query complete";
55             })->on_fail(sub { warn "Failure: @_\n" })->get;
56              
57             =head1 DESCRIPTION
58              
59             Wrapper for L<DBI>, for running queries much slower than usual but without blocking.
60              
61             C<NOTE>: This is an early release, please get in contact via email (see L</AUTHOR>
62             section) or RT before relying on it for anything.
63              
64             =head2 PERFORMANCE
65              
66             Greatly lacking. See C, in one sample run the results looked
67             like this:
68              
69             Rate DBIx::Async DBD::SQLite
70             DBIx::Async 1.57/s -- -89%
71             DBD::SQLite 13.8/s 776% --
72              
73             If you're doing anything more than occasional light queries, you'd probably be better
74             off with blocking DBI-based code running in a fork.
75              
76             =head1 METHODS
77              
78             Where possible, L<DBI> method signatures are used.
79              
80             =cut
81              
82 1     1   12423 use IO::Async::Channel;
  1         52589  
  1         29  
83 1     1   482 use IO::Async::Routine;
  1         4264  
  1         23  
84 1     1   5 use Future;
  1         1  
  1         12  
85 1     1   446 use Module::Load qw();
  1         786  
  1         19  
86              
87 1     1   377 use Variable::Disposition qw(retain_future);
  1         338  
  1         45  
88              
89 1     1   353 use DBIx::Async::Handle;
  1         2  
  1         832  
90              
91             # temporary pending next release of curry
92             our $_curry_weak = sub {
93             my ($invocant, $code) = splice @_, 0, 2;
94             Scalar::Util::weaken($invocant) if Scalar::Util::blessed($invocant);
95             my @args = @_;
96             sub {
97             return unless $invocant;
98             $invocant->$code(@args => @_)
99             }
100             };
101              
102             =head2 connect
103              
104             Constructor. Sets up our instance with parameters that will be used when we attempt to
105             connect to the given DSN.
106              
107             Takes the following options:
108              
109             =over 4
110              
111             =item * $dsn - the data source name, should be something like 'dbi:SQLite:dbname=:memory:'
112              
113             =item * $user - username to connect as
114              
115             =item * $pass - password to connect with
116              
117             =item * $opt - any options
118              
119             =back
120              
121             Options consist of:
122              
123             =over 4
124              
125             =item * RaiseError - set this to 1
126              
127             =item * AutoCommit - whether to run in AutoCommit mode by default, probably works better
128             if this is set to 1 as well
129              
130             =back
131              
132             C<NOTE>: Despite the name, this method does not initiate a connection. This may change in
133             a future version, but if this behaviour does change this method will still return C<$self>.
134              
135             Returns $self.
136              
137             =cut
138              
139             sub connect {
140 0     0 1   my $class = shift;
141 0           my ($dsn, $user, $pass, $opt) = @_;
142 0 0         my $self = bless {
143             options => {
144             RaiseError => 1,
145             PrintError => 0,
146             AutoCommit => 1,
147 0           %{ $opt || {} },
148             },
149             pass => $pass,
150             user => $user,
151             dsn => $dsn,
152             }, $class;
153 0           $self
154             }
155              
156             =head2 dsn
157              
158             Returns the DSN used in the L</connect> request.
159              
160             =cut
161              
162 0     0 1   sub dsn { shift->{dsn} }
163              
164             =head2 user
165              
166             Returns the username used in the L</connect> request.
167              
168             =cut
169              
170 0     0 1   sub user { shift->{user} }
171              
172             =head2 pass
173              
174             Returns the password used in the L</connect> request.
175              
176             =cut
177              
178 0     0 1   sub pass { shift->{pass} }
179              
180             =head2 options
181              
182             Returns any options that were set in the L</connect> request.
183              
184             =cut
185              
186 0     0 1   sub options { shift->{options} }
187              
188             =head2 do
189              
190             Runs a query with optional bind parameters. Takes the following parameters:
191              
192             =over 4
193              
194             =item * $sql - the query to run
195              
196             =item * $options - any options to apply (can be undef)
197              
198             =item * @params - the parameters to bind (can be empty)
199              
200             =back
201              
202             Returns a L<Future> which will resolve when this query completes.
203              
204             =cut
205              
206             sub do : method {
207 0     0 1   my ($self, $sql, $options, @params) = @_;
208 0           $self->queue({
209             op => 'do',
210             sql => $sql,
211             options => $options,
212             params => \@params,
213             });
214             }
215              
216             =head2 begin_work
217              
218             Starts a transaction.
219              
220             Returns a L<Future> which will resolve when this transaction has started.
221              
222             =cut
223              
224             sub begin_work {
225 0     0 1   my $self = shift;
226 0           $self->queue({ op => 'begin_work' });
227             }
228              
229             =head2 commit
230              
231             Commit the current transaction.
232              
233             Returns a L<Future> which will resolve when this transaction has been committed.
234              
235             =cut
236              
237             sub commit {
238 0     0 1   my $self = shift;
239 0           $self->queue({ op => 'commit' });
240             }
241              
242             =head2 savepoint
243              
244             Marks a savepoint. Takes a single parameter: the name to use for the savepoint.
245              
246             $dbh->savepoint('here');
247              
248             Returns a L<Future> which will resolve once the savepoint has been created.
249              
250             =cut
251              
252             sub savepoint {
253 0     0 1   my $self = shift;
254 0           my $savepoint = shift;
255 0           $self->queue({ op => 'savepoint', savepoint => $savepoint });
256             }
257              
258              
259             =head2 release
260              
261             Releases a savepoint. Takes a single parameter: the name to use for the savepoint.
262              
263             $dbh->release('here');
264              
265             This is similar to L</commit> for the work which has been completed since
266             the savepoint, although the database state is not updated until the transaction
267             itself is committed.
268              
269             Returns a L<Future> which will resolve once the savepoint has been released.
270              
271             =cut
272              
273             sub release {
274 0     0 1   my $self = shift;
275 0           my $savepoint = shift;
276 0           $self->queue({ op => 'release', savepoint => $savepoint });
277             }
278              
279             =head2 rollback
280              
281             Rolls back this transaction. Takes an optional savepoint which
282             can be used to roll back to the savepoint rather than cancelling
283             the entire transaction.
284              
285             Returns a L<Future> which will resolve once the transaction has been
286             rolled back.
287              
288             =cut
289              
290             sub rollback {
291 0     0 1   my $self = shift;
292 0           my $savepoint = shift;
293 0           $self->queue({ op => 'rollback', savepoint => $savepoint });
294             }
295              
296             =head2 prepare
297              
298             Attempt to prepare a query.
299              
300             Returns the statement handle as a L<DBIx::Async::Handle> instance.
301              
302             =cut
303              
304             sub prepare {
305 0     0 1   my $self = shift;
306 0           my $sql = shift;
307 0           DBIx::Async::Handle->new(
308             dbh => $self,
309             prepare => $self->queue({ op => 'prepare', sql => $sql }),
310             );
311             }
312              
313             =head1 INTERNAL METHODS
314              
315             These are unlikely to be of much use in application code.
316              
317             =cut
318              
319             =head2 queue
320              
321             Queue a request. Used internally.
322              
323             Returns a L<Future>.
324              
325             =cut
326              
327             sub queue {
328 0     0 1   my $self = shift;
329 0           my ($req, $code) = @_;
330 0           my $f = $self->loop->new_future;
331 0   0       $self->debug_printf("Sending request [%s]", join ',', map { $_ . '=' . ($req->{$_} // '') } sort keys %$req);
  0            
332              
333 0           $self->sth_ch->send($req);
334             $self->ret_ch->recv(
335             on_recv => sub {
336 0     0     my ($ch, $rslt) = @_;
337 0 0         if($rslt->{status} eq 'ok') {
338 0           $f->done($rslt);
339             } else {
340 0   0       $f->fail($rslt->{message} // 'unknown exception');
341             }
342             }
343 0           );
344 0           retain_future $f;
345             }
346              
347             =head2 worker_class_from_dsn
348              
349             Attempts to locate a suitable worker subclass, given a DSN.
350              
351             For example:
352              
353             $dbh->worker_class_from_dsn('dbi:SQLite:memory')
354              
355             should return 'DBIx::Async::Worker::SQLite'.
356              
357             Note that this method will load the class if it is not already
358             present.
359              
360             Returns the class as a string.
361              
362             =cut
363              
364             sub worker_class_from_dsn {
365 0     0 1   my $self = shift;
366 0           my $dsn = shift;
367 0           my ($dbd) = $dsn =~ /^dbi:([^:]+)(?::|$)/;
368 0 0         die "Invalid DBD class: $dbd" unless $dbd =~ /^[a-zA-Z0-9]+$/;
369              
370 0           my $loaded;
371             my $class;
372 0           for my $subclass ($dbd, 'Default') {
373 0 0         last if $loaded;
374 0           $class = 'DBIx::Async::Worker::' . $subclass;
375             eval {
376 0           Module::Load::load($class);
377 0           $loaded = 1
378 0 0         } or do {
379             # TODO we need proper error handling here,
380             # but some DSNs won't have a related worker
381             # class, default to something perhaps?
382 0           warn "Failed to load: $@"
383             };
384             }
385 0 0         die "Could not find suitable class for $dbd" unless $loaded;
386 0           $class;
387             }
388              
389             =head2 sth_ch
390              
391             The channel used for prepared statements.
392              
393             =cut
394              
395 0     0 1   sub sth_ch { shift->{sth_ch} }
396              
397             =head2 ret_ch
398              
399             The channel which returns values.
400              
401             =cut
402              
403 0     0 1   sub ret_ch { shift->{ret_ch} }
404              
405             =head2 _add_to_loop
406              
407             Sets things up when we are added to a loop.
408              
409             =cut
410              
411             sub _add_to_loop {
412 0     0     my ($self, $loop) = @_;
413              
414 0           my $worker = $self->worker_class_from_dsn($self->dsn)->new(
415             parent => $self,
416             sth_ch => ($self->{sth_ch} = IO::Async::Channel->new),
417             ret_ch => ($self->{ret_ch} = IO::Async::Channel->new),
418             );
419             my $routine = IO::Async::Routine->new(
420             channels_in => [ $self->{sth_ch} ],
421             channels_out => [ $self->{ret_ch} ],
422 0     0     code => sub { $worker->run },
423             on_finish => $self->$_curry_weak(sub {
424 0     0     my $self = shift;
425 0           $self->debug_printf("The routine aborted early with [%s]", $_[-1]);
426 0           }),
427             );
428 0           $self->add_child($routine);
429             }
430              
431             =head2 _remove_from_loop
432              
433             Doesn't do anything yet.
434              
435             =cut
436              
437             sub _remove_from_loop {
438 0     0     my $self = shift;
439 0           my ($loop) = @_;
440             }
441              
442             1;
443              
444             __END__