File Coverage

blib/lib/DB/Evented.pm
Criterion Covered Total %
statement 57 73 78.0
branch 4 12 33.3
condition 2 6 33.3
subroutine 11 15 73.3
pod 5 5 100.0
total 79 111 71.1


line stmt bran cond sub pod time code
1             package DB::Evented;
2              
3 1     1   55947 use 5.006;
  1         5  
  1         41  
4 1     1   760 use strictures;
  1         842  
  1         5  
5 1     1   863 use AnyEvent::DBI;
  1         58642  
  1         836  
6              
7             =head1 NAME
8              
9             DB::Evented - A pragmatic, DBI-like evented module.
10              
11             =cut
12              
13             our $VERSION = '0.06';
14             our $handlers = [];
15              
16             =head1 SYNOPSIS
17              
18             Executing selects synchronously, one after another, is not always the most efficient way to interact with the
19             database.
20              
21             use DB::Evented;
22              
23             my $evented = DB::Evented->new("DBI:SQLite2:dbname=$dname", "","");
24              
25             my $results;
26             $evented->selectcol_arrayref(
27             q{
28             select
29             test1,
30             test2
31             from
32             test
33             },
34             {
35             Columns => [1,2],
36             response => sub {
37             $results->{result1} = shift;
38             }
39             }
40             );
41            
42             $evented->selectrow_hashref(
43             q{
44             select
45             test1,
46             test2
47             from
48             test
49             },
50             {
51             response => sub {
52             $results->{result2} = shift;
53             }
54             }
55             );
56              
57             $evented->execute_in_parallel;
58              
59             =head1 STATIC METHODS
60              
61             =head2 new ($connection_str, $username, $pass, %dbi_args )
62              
63             In order to initialize a DB::Evented object a connection_str is most likely required.
64             See AnyEvent::DBI for more information.
65              
66             =cut
67              
68             sub new {
69 1     1 1 685 my $class = shift;
70 1   33     4 $class ||= ref $class;
71 1         6 my ($connection_str, $username, $pass, %dbi_args) = @_;
72 1         11 return bless {
73             connection_str => $connection_str,
74             username => $username,
75             pass => $pass,
76             dbi_args => \%dbi_args,
77             _queue => [],
78             }, $class;
79             }
80              
81             =head1 INSTANCE METHODS
82              
83             =head2 any_event_handler
84              
85             This will return an AnyEvent::DBI handler. The key difference between this handler and DBI is that it's using AnyEvent
86             under the hood. What does this mean? It means that if you use an AnyEvent::DBI method it will run asynchronously.
87              
88             =cut
89             sub any_event_handler {
90 5     5 1 16 my $self = shift;
91 5         317 return AnyEvent::DBI->new($self->{connection_str}, $self->{username}, $self->{pass}, %{$self->{dbi_args}}, on_error => sub {
92 0     0   0 $self->clear_queue;
93 0         0 warn "DBI Error: $@ at $_[1]:$_[2]\n";
94 5         28 });
95             }
96              
97             =head2 clear_handlers
98              
99             Clears all handlers
100              
101             =cut
102              
103             sub clear_handlers {
104 0     0 1 0 $handlers = [];
105             }
106              
107             =head2 clear_queue
108              
109             Clears the queue of any db todos
110              
111             =cut
112              
113             sub clear_queue {
114 1     1 1 9 $_[0]->{_queue} = undef;
115             }
116              
117             =head2 execute_in_parallel
118              
119             Will execute all of the queued statements in parallel. This will create a pool of handlers and cache them if necessary.
120              
121             =cut
122              
123             sub execute_in_parallel {
124 1     1 1 1001 my $self = shift;
125 1 50       7 if ( scalar @{$self->{_queue}} ) {
  1         9  
126             # Setup a pool of handlers
127             # TODO: Make this more intelligent to shrink
128 1 50 33     8 if ( ! scalar @{$handlers} || ( scalar @{$handlers} < scalar @{$self->{_queue}} )) {
  1         20  
  0         0  
  0         0  
129 1         2 while ( scalar @{$handlers} < scalar @{$self->{_queue}} ) {
  5         15139  
  5         115  
130 4         27 push @{$handlers}, $self->any_event_handler;
  4         48  
131             }
132             }
133 1         262 $self->{cv} = AnyEvent->condvar;
134 1         35 my $count = 0;
135 1         8 for my $item ( @{$self->{_queue}} ) {
  1         30  
136 4         31 my $cb = pop @$item;
137             my $callback_wrapper = sub {
138 4     4   1299293 my ($dbh, $result) = @_;
139 4         49 $cb->($result, $dbh);
140 4         102 $self->{cv}->end;
141 4         28 };
142 4         25 my $req_method = pop @$item;
143 4         10 my $line = pop @$item;
144 4         18 my $file = pop @$item;
145 4         67 $self->{cv}->begin;
146 4         35 $handlers->[$count]->_req($callback_wrapper, $line, $file, $req_method, @$item);
147 4         271 $count++;
148             }
149 1         12 $self->{cv}->recv;
150 1         109 delete $self->{cv};
151 1         18 $self->clear_queue;
152             }
153 1         52 return;
154             }
155              
156             sub _add_to_queue {
157 4     4   35 my ( $self, $sql, $attr, $key_field, @args) = @_;
158              
159 4         16 my $cb = delete $attr->{response};
160 4         29 my $item = [$sql, $attr, $key_field, @args, __PACKAGE__ . '::_req_dispatch', $cb];
161              
162 4         11 push @{$self->{_queue}}, $item;
  4         25  
163             }
164              
165             sub _req_dispatch {
166 0     0   0 my (undef, $st, $attr, $key_field, @args) = @{+shift};
  0         0  
167 0         0 my $method_name = pop @args;
168 0 0       0 my $result = $AnyEvent::DBI::DBH->$method_name($key_field ? ($st, $key_field, $attr, @args) : ($st, $attr, @args) );
169 0 0       0 [1, $result ? $result : undef];
170             }
171              
172             =head2 selectall_arrayref ($sql, \%attr, @binds )
173              
174             This method functions in the same way as DBI::selectall_arrayref. The key difference
175             is that it delays execution until execute_in_parallel has been called. The results
176             can be accessed in the callback supplied via the response attribute.
177              
178             =cut
179              
180             =head2 selectall_hashref ($sql, $key_field, \%attr, @binds )
181              
182             This method functions in the same way as DBI::selectall_hashref. The key difference
183             is that it delays execution until execute_in_parallel has been called. The results
184             can be accessed in the callback supplied via the response attribute.
185              
186             =cut
187              
188             =head2 selectcol_arrayref ($sql, \%attr, @binds )
189              
190             This method functions in the same way as DBI::selectcol_arrayref. The key difference
191             is that it delays execution until execute_in_parallel has been called. The results
192             can be accessed in the callback supplied via the response attribute.
193              
194             =cut
195              
196             =head2 selectrow_hashref ($sql, \%attr, @binds )
197              
198             This method functions in the same way as DBI::selectrow_hashref. The key difference
199             is that it delays execution until execute_in_parallel has been called. The results
200             can be accessed in the callback supplied via the response attribute.
201              
202             =cut
203              
204             for my $method_name ( qw(selectrow_hashref selectcol_arrayref selectall_hashref selectall_arrayref) ) {
205 1     1   10 no strict 'refs';
  1         1  
  1         190  
206             *{$method_name} = sub {
207 4     4   1328168 my $self = shift;
208 4 100       41 my ($sql, $key_field, $attr, @args) = (shift, ($method_name eq 'selectall_hashref' ? (shift) : (undef)), shift, @_);
209 4         80 $self->_add_to_queue($sql, $attr, $key_field, @args, $method_name, (caller)[1,2]);
210             };
211             }
212              
213             # TODO: Investigate if this is the best way to handle this.
214             # The child processes are technically held by AnyEvent::DBI
215             # by clearing the known handlers these children *should* be reaped
216             sub DESTROY {
217 0     0     my $error = do {
218 0           local $@;
219 0           eval {
220 0           DB::Evented->clear_handlers;
221             };
222 0           $@;
223             };
224 0 0         $? = 0 unless $error;
225             }
226              
227             =head1 AUTHOR
228              
229             Logan Bell, C<< >>
230              
231             =head1 SUPPORT
232              
233             You can find documentation for this module with the perldoc command.
234              
235             perldoc DB::Evented
236              
237             You can also look for information at:
238              
239             =head1 ACKNOWLEDGEMENTS
240              
241             Aaron Cohen and Belden Lyman.
242              
243             =head1 LICENSE
244              
245             Copyright (c) 2013 Logan Bell and Shutterstock Inc (http://shutterstock.com). All rights reserved. This program is free software; you can redistribute it and/or modify it under the same terms as Perl itself.
246              
247             =cut
248              
249             1; # End of DB::Evented