File Coverage

blib/lib/DBIx/Async.pm
Criterion Covered Total %
statement 9 9 100.0
branch n/a
condition n/a
subroutine 3 3 100.0
pod n/a
total 12 12 100.0


line stmt bran cond sub pod time code
1             package DBIx::Async;
2             # ABSTRACT: database support for IO::Async via DBI
3 1     1   37042 use strict;
  1         2  
  1         41  
4 1     1   5 use warnings;
  1         3  
  1         33  
5 1     1   1047 use parent qw(IO::Async::Notifier);
  1         377  
  1         7  
6              
7             our $VERSION = '0.002';
8              
9             =head1 NAME
10              
11             DBIx::Async - use L with L
12              
13             =head1 VERSION
14              
15             version 0.002
16              
17             =head1 SYNOPSIS
18              
19             #!/usr/bin/env perl
20             use strict;
21             use warnings;
22             use feature qw(say);
23             use IO::Async::Loop;
24             use DBIx::Async;
25             my $loop = IO::Async::Loop->new;
26             say 'Connecting to db';
27             $loop->add(my $dbh = DBIx::Async->connect(
28             'dbi:SQLite:dbname=test.sqlite3',
29             '',
30             '', {
31             AutoCommit => 1,
32             RaiseError => 1,
33             }
34             ));
35             $dbh->do(q{CREATE TABLE tmp(id integer primary key autoincrement, content text)})
36             # ... put some values in it
37             ->then(sub { $dbh->do(q{INSERT INTO tmp(content) VALUES ('some text'), ('other text') , ('more data')}) })
38             # ... and then read them back
39             ->then(sub {
40             # obviously you'd never really use * in a query like this...
41             my $sth = $dbh->prepare(q{select * from tmp});
42             $sth->execute;
43             # the while($row = fetchrow_hashref) construct isn't a good fit
44             # but we attempt to provide something reasonably close using the
45             # ->iterate helper
46             $sth->iterate(
47             fetchrow_hashref => sub {
48             my $row = shift;
49             say "Row: " . join(',', %$row);
50             }
51             );
52             })->on_done(sub {
53             say "Query complete";
54             $loop->stop;
55             })->on_fail(sub { warn "Failure: @_\n" });
56             $loop->run;
57              
58             =head1 DESCRIPTION
59              
60             Wrapper for L, for running queries much slower than usual but without blocking.
61              
62             C: This is an early release, please get in contact via email (see L
63             section) or RT before relying on it for anything.
64              
65             =head2 PERFORMANCE
66              
67             Greatly lacking. See C, in one sample run the results looked
68             like this:
69              
70             Rate DBIx::Async DBD::SQLite
71             DBIx::Async 1.57/s -- -89%
72             DBD::SQLite 13.8/s 776% --
73              
74             If you're doing anything more than occasional light queries, you'd probably be better
75             off with blocking DBI-based code running in a fork.
76              
77             =head1 METHODS
78              
79             Where possible, L method signatures are used.
80              
81             =cut
82              
83             use IO::Async::Channel;
84             use IO::Async::Routine;
85             use Future;
86             use Module::Load qw();
87             use Try::Tiny;
88              
89             use DBIx::Async::Handle;
90              
91             use constant DEBUG => 0;
92              
93             =head2 connect
94              
95             Constuctor. Sets up our instance with parameters that will be used when we attempt to
96             connect to the given DSN.
97              
98             Takes the following options:
99              
100             =over 4
101              
102             =item * $dsn - the data source name, should be something like 'dbi:SQLite:dbname=:memory:'
103              
104             =item * $user - username to connect as
105              
106             =item * $pass - password to connect with
107              
108             =item * $opt - any options
109              
110             =back
111              
112             Options consist of:
113              
114             =over 4
115              
116             =item * RaiseError - set this to 1
117              
118             =item * AutoCommit - whether to run in AutoCommit mode by default, probably works better
119             if this is set to 1 as well
120              
121             =back
122              
123             C: Despite the name, this method does not initiate a connection. This may change in
124             a future version, but if this behaviour does change this method will still return C<$self>.
125              
126             Returns $self.
127              
128             =cut
129              
130             sub connect {
131             my $class = shift;
132             my ($dsn, $user, $pass, $opt) = @_;
133             my $self = bless {
134             options => {
135             RaiseError => 1,
136             PrintError => 0,
137             AutoCommit => 1,
138             %{ $opt || {} },
139             },
140             pass => $pass,
141             user => $user,
142             dsn => $dsn,
143             }, $class;
144             $self
145             }
146              
147             =head2 dsn
148              
149             Returns the DSN used in the L request.
150              
151             =cut
152              
153             sub dsn { shift->{dsn} }
154              
155             =head2 user
156              
157             Returns the username used in the L request.
158              
159             =cut
160              
161             sub user { shift->{user} }
162              
163             =head2 pass
164              
165             Returns the password used in the L request.
166              
167             =cut
168              
169             sub pass { shift->{pass} }
170              
171             =head2 options
172              
173             Returns any options that were set in the L request.
174              
175             =cut
176              
177             sub options { shift->{options} }
178              
179             =head2 do
180              
181             Runs a query with optional bind parameters. Takes the following parameters:
182              
183             =over 4
184              
185             =item * $sql - the query to run
186              
187             =item * $options - any options to apply (can be undef)
188              
189             =item * @params - the parameters to bind (can be empty)
190              
191             =back
192              
193             Returns a L which will resolve when this query completes.
194              
195             =cut
196              
197             sub do : method {
198             my ($self, $sql, $options, @params) = @_;
199             $self->queue({
200             op => 'do',
201             sql => $sql,
202             options => $options,
203             params => \@params,
204             });
205             }
206              
207             =head2 begin_work
208              
209             Starts a transaction.
210              
211             Returns a L which will resolve when this transaction has started.
212              
213             =cut
214              
215             sub begin_work {
216             my $self = shift;
217             $self->queue({ op => 'begin_work' });
218             }
219              
220             =head2 commit
221              
222             Commit the current transaction.
223              
224             Returns a L which will resolve when this transaction has been committed.
225              
226             =cut
227              
228             sub commit {
229             my $self = shift;
230             $self->queue({ op => 'commit' });
231             }
232              
233             =head2 savepoint
234              
235             Marks a savepoint. Takes a single parameter: the name to use for the savepoint.
236              
237             $dbh->savepoint('here');
238              
239             Returns a L which will resolve once the savepoint has been created.
240              
241             =cut
242              
243             sub savepoint {
244             my $self = shift;
245             my $savepoint = shift;
246             $self->queue({ op => 'savepoint', savepoint => $savepoint });
247             }
248              
249              
250             =head2 release
251              
252             Releases a savepoint. Takes a single parameter: the name to use for the savepoint.
253              
254             $dbh->release('here');
255              
256             This is similar to L for the work which has been completed since
257             the savepoint, although the database state is not updated until the transaction
258             itself is committed.
259              
260             Returns a L which will resolve once the savepoint has been released.
261              
262             =cut
263              
264             sub release {
265             my $self = shift;
266             my $savepoint = shift;
267             $self->queue({ op => 'release', savepoint => $savepoint });
268             }
269              
270             =head2 rollback
271              
272             Rolls back this transaction. Takes an optional savepoint which
273             can be used to roll back to the savepoint rather than cancelling
274             the entire transaction.
275              
276             Returns a L which will resolve once the transaction has been
277             rolled back.
278              
279             =cut
280              
281             sub rollback {
282             my $self = shift;
283             my $savepoint = shift;
284             $self->queue({ op => 'rollback', savepoint => $savepoint });
285             }
286              
287             =head2 prepare
288              
289             Attempt to prepare a query.
290              
291             Returns the statement handle as a L instance.
292              
293             =cut
294              
295             sub prepare {
296             my $self = shift;
297             my $sql = shift;
298             DBIx::Async::Handle->new(
299             dbh => $self,
300             prepare => $self->queue({ op => 'prepare', sql => $sql }),
301             );
302             }
303              
304             =head1 INTERNAL METHODS
305              
306             These are unlikely to be of much use in application code.
307              
308             =cut
309              
310             =head2 queue
311              
312             Queue a request. Used internally.
313              
314             Returns a L.
315              
316             =cut
317              
318             sub queue {
319             my $self = shift;
320             my ($req, $code) = @_;
321             my $f = $self->loop->new_future;
322             if(DEBUG) {
323             require Data::Dumper;
324             warn "Sending req " . Data::Dumper::Dumper($req);
325             }
326             $self->sth_ch->send($req);
327             $self->ret_ch->recv(
328             on_recv => sub {
329             my ( $ch, $rslt ) = @_;
330             if($rslt->{status} eq 'ok') {
331             $f->done($rslt);
332             } else {
333             $f->fail($rslt->{message});
334             }
335             }
336             );
337             $f
338             }
339              
340             =head2 worker_class_from_dsn
341              
342             Returns $self.
343              
344             =cut
345              
346             sub worker_class_from_dsn {
347             my $self = shift;
348             my $dsn = shift;
349             my ($dbd) = $dsn =~ /^dbi:([^:]+)(?::|$)/;
350             die "Invalid DBD class: $dbd" unless $dbd =~ /^[a-zA-Z0-9]+$/;
351             my $loaded;
352             my $class;
353             for my $subclass ($dbd, 'Default') {
354             last if $loaded;
355             $class = 'DBIx::Async::Worker::' . $subclass;
356             try {
357             Module::Load::load($class);
358             $loaded = 1
359             } catch {
360             warn "class load: $_\n" if DEBUG;
361             };
362             }
363             die "Could not find suitable class for $dbd" unless $loaded;
364             $class;
365             }
366              
367             =head2 sth_ch
368              
369             The channel used for prepared statements.
370              
371             =cut
372              
373             sub sth_ch { shift->{sth_ch} }
374              
375             =head2 ret_ch
376              
377             The channel which returns values.
378              
379             =cut
380              
381             sub ret_ch { shift->{ret_ch} }
382              
383             =head2 _add_to_loop
384              
385             Sets things up when we are added to a loop.
386              
387             =cut
388              
389             sub _add_to_loop {
390             my ($self, $loop) = @_;
391              
392             my $worker = $self->worker_class_from_dsn($self->dsn)->new(
393             parent => $self,
394             sth_ch => ($self->{sth_ch} = IO::Async::Channel->new),
395             ret_ch => ($self->{ret_ch} = IO::Async::Channel->new),
396             );
397             my $routine = IO::Async::Routine->new(
398             channels_in => [ $self->{sth_ch} ],
399             channels_out => [ $self->{ret_ch} ],
400             code => sub { $worker->run },
401             on_finish => sub {
402             print "The routine aborted early - $_[-1]\n";
403             $self->loop->stop;
404             },
405             );
406             $loop->add($routine);
407             }
408              
409             =head2 _remove_from_loop
410              
411             Doesn't do anything.
412              
413             =cut
414              
415             sub _remove_from_loop {
416             my $self = shift;
417             my ($loop) = @_;
418             warn "Removed from loop\n" if DEBUG;
419             }
420              
421             1;
422              
423             __END__