File Coverage

blib/lib/DBD/Multi.pm
Criterion Covered Total %
statement 215 226 95.1
branch 53 72 73.6
condition 11 12 91.6
subroutine 41 44 93.1
pod 0 1 0.0
total 320 355 90.1


line stmt bran cond sub pod time code
1             package DBD::Multi;
2             our $VERSION = '1.01';
3             package DBD::Multi::dr;
4             our $VERSION = '1.01';
5             package DBD::Multi::db;
6             our $VERSION = '1.01';
7             package DBD::Multi::st;
8             our $VERSION = '1.01';
9             package DBD::Multi::Handler;
10             our $VERSION = '1.01';
11              
12             package DBD::Multi;
13 12     12   310702 use strict;
  12         26  
  12         297  
14              
15 12     12   7198 use DBI;
  12         87582  
  12         644  
16             DBI->setup_driver('DBD::Multi');
17              
18 12     12   78 use vars qw[$err $errstr $sqlstate $drh];
  12         23  
  12         1845  
19              
20             $err = 0; # DBI::err
21             $errstr = ""; # DBI::errstr
22             $sqlstate = ""; # DBI::state
23             $drh = undef;
24              
25             sub driver {
26 10 50   10 0 112363 return $drh if $drh;
27 10         33 my($class, $attr) = @_;
28 10         59 $class .= '::dr';
29              
30 10         211 $drh = DBI::_new_drh($class, {
31             Name => 'Multi',
32             Version => $VERSION,
33             Err => \$DBD::Multi::err,
34             Errstr => \$DBD::Multi::errstr,
35             State => \$DBD::Multi::sqlstate,
36             Attribution => 'DBD::Multi, pair Networks Inc.',
37             });
38 10         427 DBD::Multi::db->install_method('multi_do_all');
39 10         508 return $drh;
40             }
41              
42             #######################################################################
43             package DBD::Multi::dr;
44 12     12   74 use strict;
  12         19  
  12         307  
45              
46             $DBD::Multi::dr::imp_data_size = 0;
47 12     12   4441 use DBD::File;
  12         433533  
  12         2742  
48              
49 0     0   0 sub DESTROY { shift->STORE(Active => 0) }
50              
51             sub connect {
52 113     113   16747 my($drh, $dbname, $user, $auth, $attr) = @_;
53 113         415 my $dbh = DBI::_new_dbh(
54             $drh => {
55             Name => $dbname,
56             USER => $user,
57             CURRENT_USER => $user,
58             },
59             );
60             my @dsns = $attr->{dsns} && ref($attr->{dsns}) eq 'ARRAY'
61 113 100 66     3222 ? @{$attr->{dsns}}
  112         271  
62             : ();
63              
64 113 50       252 if ( $dbname =~ /dsn=(.*)/ ) {
65 0         0 push @dsns, ( -1, [$1, $user, $auth] );
66             }
67              
68 113         398 my $handler = DBD::Multi::Handler->new({
69             dsources => [ @dsns ],
70             });
71             $handler->failed_max($attr->{failed_max})
72 113 50       319 if exists $attr->{failed_max};
73             $handler->failed_expire($attr->{failed_expire})
74 113 100       200 if exists $attr->{failed_expire};
75              
76 113         384 $dbh->STORE(_handler => $handler);
77 113         259 $dbh->STORE(handler => $handler); # temporary
78 113         467 $drh->{_handler} = $handler;
79 113         314 $dbh->STORE(Active => 1);
80 113         305 return $dbh;
81             }
82              
83 6     6   3091 sub data_sources { shift->FETCH('_handler')->all_sources }
84              
85             #######################################################################
86             package DBD::Multi::db;
87 12     12   96 use strict;
  12         24  
  12         4171  
88              
89             $DBD::Multi::db::imp_data_size = 0;
90              
91             sub prepare {
92 415     415   3048284 my ($dbh, $statement, @attribs) = @_;
93              
94             # create a 'blank' sth
95 415         1401 my ($outer, $sth) = DBI::_new_sth($dbh, { Statement => $statement });
96              
97 415         10458 my $handler = $dbh->FETCH('_handler');
98 415         1279 $sth->STORE(_handler => $handler);
99              
100 415         879 my $_dbh = $handler->dbh;
101 414         587 my $_sth;
102 414         718 until ( $_sth ) {
103 436         1866 $_sth = $_dbh->prepare($statement, @attribs);
104 436 100       41940 unless ( $_sth ) {
105 24         79 $handler->dbh_failed;
106 24         102 $_dbh = $handler->dbh;
107             }
108             }
109              
110 412         2825 $sth->STORE(NUM_OF_PARAMS => $_sth->FETCH('NUM_OF_PARAMS'));
111 412         1073 $sth->STORE(_dbh => $_dbh);
112 412         936 $sth->STORE(_sth => $_sth);
113              
114 412         1234 return $outer;
115             }
116              
117             sub multi_do_all {
118 9     9   40426 my $dbh = shift;
119 9         72 my $handler = $dbh->FETCH('_handler');
120 9         36 return $handler->multi_do_all( @_ );
121             }
122              
123             sub disconnect {
124 113     113   2374 my ($dbh) = @_;
125 113         277 $dbh->STORE(Active => 0);
126 113         1978 1;
127             }
128              
129             sub commit {
130 0     0   0 my ($dbh) = @_;
131 0 0       0 if ( $dbh->FETCH('Active') ) {
132 0 0       0 return $dbh->FETCH('_dbh')->commit if $dbh->FETCH('_dbh');
133             }
134 0         0 return;
135             }
136              
137             sub rollback {
138 0     0   0 my ($dbh) = @_;
139 0 0       0 if ( $dbh->FETCH('Active') ) {
140 0 0       0 return $dbh->FETCH('_dbh')->rollback if $dbh->FETCH('_dbh');
141             }
142 0         0 return;
143             }
144              
145             sub get_info {
146 5     5   1908 my($dbh, $info_type) = @_;
147              
148             # return info from current connection
149 5         21 my $handler = $dbh->FETCH('_handler');
150 5         12 my $_dbh = $handler->dbh;
151 5         17 return $_dbh->get_info($info_type);
152             }
153              
154             sub STORE {
155 1017     1017   6794 my ($self, $attr, $val) = @_;
156 1017         2409 $self->{$attr} = $val;
157             }
158              
159 110     110   6768 sub DESTROY { shift->disconnect }
160              
161             #######################################################################
162             package DBD::Multi::st;
163 12     12   76 use strict;
  12         29  
  12         345  
164              
165             $DBD::Multi::st::imp_data_size = 0;
166              
167 12     12   56 use vars qw[@METHODS @FIELDS];
  12         22  
  12         3480  
168             @METHODS = qw[
169             bind_param
170             bind_param_inout
171             bind_param_array
172             execute_array
173             execute_for_fetch
174             fetch
175             fetchrow_arrayref
176             fetchrow_array
177             fetchrow_hashref
178             fetchall_arrayref
179             fetchall_hashref
180             bind_col
181             bind_columns
182             dump_results
183             ];
184              
185             @FIELDS = qw[
186             NUM_OF_FIELDS
187             CursorName
188             ParamValues
189             RowsInCache
190             ];
191              
192             sub execute {
193 410     410   4870 my $sth = shift;
194 410         942 my $_sth = $sth->FETCH('_sth');
195             my $params = @_
196             ? $sth->{f_params} = [ @_ ]
197 410 100       1063 : $sth->{f_params};
198              
199 410 100       916 $sth->finish if $sth->FETCH('Active');
200 410         756 $sth->{Active} = 1;
201 410         526 my $rc = $_sth->execute(@{$params});
  410         50330  
202              
203 410         1317 for my $field ( @FIELDS ) {
204 1640         4268 my $value = $_sth->FETCH($field);
205 1640 100 100     4011 $sth->STORE($field => $value)
206             unless ! defined $value
207             || defined $sth->FETCH($field);
208             }
209              
210 410         1218 return $rc;
211             }
212              
213             sub FETCH {
214 2041     2041   3347 my ($sth, $attrib) = @_;
215 2041 100       14284 $sth->{'_sth'}->FETCH($attrib) || $sth->{$attrib};
216             }
217              
218             sub STORE {
219 2056     2056   3489 my ($self, $attr, $val) = @_;
220 2056         4597 $self->{$attr} = $val;
221             }
222              
223 3     3   36 sub rows { shift->FETCH('_sth')->rows }
224              
225             sub finish {
226 402     402   2315 my ($sth) = @_;
227 402         970 $sth->STORE(Active => 0);
228 402         976 return $sth->FETCH('_sth')->finish;
229             }
230              
231             foreach my $method ( @METHODS ) {
232 12     12   75 no strict;
  12         20  
  12         686  
233 407     407   2431 *{$method} = sub { shift->FETCH('_sth')->$method(@_) };
234             }
235              
236             #######################################################################
237             package DBD::Multi::Handler;
238 12     12   89 use strict;
  12         31  
  12         266  
239              
240 12     12   55 use base qw[Class::Accessor::Fast];
  12         20  
  12         3424  
241 12     12   26288 use Sys::SigAction qw(timeout_call);
  12         101662  
  12         703  
242 12     12   124 use List::Util qw(shuffle);
  12         24  
  12         12300  
243              
244             =begin ImplementationNotes
245              
246             dsources - This thing changes from an arrayref to a hashref during construction. :(
247              
248             Initially, when data is passed in during construction, it's an arrayref
249             containing the 'dsns' param from the user's connect() call.
250              
251             Later, when _configure_dsources gets called, it turns into a multi-dimension
252             hashref:
253              
254             $dsources->{$pri}->{$dsource_id} = 1;
255              
256             The first key is the priority number, the second key is the data source index
257             number. The value is always just a true value.
258              
259             nextid - A counter. Stores the index number of the next data source to be added.
260              
261             all_dsources - A hashref. Maps index number to the connect data.
262              
263             current_dsource - The most recent chosen datasource index number.
264              
265             used - A hashref. Keys are index numbers. Values are true when the datasource
266             has been previously assigned and we want to prefer other datasources of the
267             same priority (for round-robin load distribution).
268              
269             failed - A hashref. Keys are index numbers. Values are counters indicating
270             how many times the data source has failed.
271              
272             failed_last - A hashref. Keys are index number. Values are unix timestamp
273             indicating the most recent time a data source failed.
274              
275             failed_max - A scalar value. Number of times a datasource may fail before we
276             stop trying it.
277              
278             failed_expire - A scalar value. Number of seconds since we stopped trying a
279             datasource before we'll try it again.
280              
281             timeout - A scalar value. Number of seconds we try to connect to a datasource
282             before giving up.
283              
284             =end ImplementationNotes
285              
286             =cut
287              
288             __PACKAGE__->mk_accessors(qw[
289             dsources
290             nextid
291             all_dsources
292             current_dsource
293             used
294             failed
295             failed_last
296             failed_max
297             failed_expire
298             timeout
299             ]);
300              
301             sub new {
302 113     113   204 my ($class, $args) = @_;
303 113         393 my $self = $class->SUPER::new($args);
304 113 50       1072 $self->nextid(0) unless defined $self->nextid;
305 113         1094 $self->all_dsources({});
306 113         524 $self->used({});
307 113         519 $self->failed({});
308 113         511 $self->failed_last({});
309 113 50       468 $self->failed_max(3) unless defined $self->failed_max;
310 113 50       825 $self->failed_expire(60*5) unless defined $self->failed_expire;
311 113 50       901 $self->timeout( 5 ) unless defined $self->timeout;
312 113         825 $self->_configure_dsources;
313 113         158 return $self;
314             }
315              
316             sub all_sources {
317 6     6   13 my ($self) = @_;
318 6         13 return values %{$self->all_dsources};
  6         15  
319             }
320              
321             sub add_to_pri {
322 234     234   310 my ($self, $pri, $dsource) = @_;
323 234         348 my $dsource_id = $self->nextid;
324 234         721 my $dsources = $self->dsources;
325 234         672 my $all = $self->all_dsources;
326              
327 234         720 $all->{$dsource_id} = $dsource;
328 234         393 $dsources->{$pri}->{$dsource_id} = 1;
329              
330 234         395 $self->nextid($dsource_id + 1);
331             }
332              
333             sub dbh {
334 450     450   609 my $self = shift;
335 450         815 my $dbh = $self->_connect_dsource;
336 447 100       1365 return $dbh if $dbh;
337 6         22 $self->dbh_failed;
338 6         50 $self->dbh;
339             }
340              
341             sub dbh_failed {
342 30     30   90 my ($self) = @_;
343              
344 30         80 my $current_dsource = $self->current_dsource;
345 30         150 $self->failed->{$current_dsource}++;
346 30         134 $self->failed_last->{$current_dsource} = time;
347             }
348              
349             sub _purge_old_failures {
350 450     450   638 my ($self) = @_;
351 450         593 my $now = time;
352 450         518 my @all = keys %{$self->all_dsources};
  450         971  
353            
354 450         2468 foreach my $dsource ( @all ) {
355 966 100       3077 next unless $self->failed->{$dsource};
356 175 100       748 if ( ($now - $self->failed_last->{$dsource}) > $self->failed_expire ) {
357 1         34 delete $self->failed->{$dsource};
358 1         7 delete $self->failed_last->{$dsource};
359             }
360             }
361             }
362              
363             sub _pick_dsource {
364 450     450   595 my ($self) = @_;
365 450         915 $self->_purge_old_failures;
366 450         2433 my $dsources = $self->dsources;
367 450         1401 my @pri = sort { $a <=> $b } keys %{$dsources};
  266         937  
  450         1390  
368              
369 450         807 foreach my $pri ( @pri ) {
370 566         1001 my $dsource = $self->_pick_pri_dsource($dsources->{$pri});
371 566 100       1132 if ( defined $dsource ) {
372 448         975 $self->current_dsource($dsource);
373 448         2295 return;
374             }
375             }
376              
377 2         8 $self->used({});
378             return $self->_pick_dsource
379 2 50       11 if (grep {$self->failed->{$_} >= $self->failed_max} keys(%{$self->failed})) < keys(%{$self->all_dsources});
  8         38  
  2         4  
  2         12  
380 2         37 die("All data sources failed!");
381             }
382              
383             ### _pick_pri_dsource
384             # Given a list of data sources, all of the same priority, choose one to use.
385             # Passed in a structure like this:
386             # _pick_pri_dsource( $self, { 3 => 1, 4 => 1, 5 => 1 } )
387             # Where the keys 3,4,5 are dsource id's, and the values are always true.
388             sub _pick_pri_dsource {
389 566     566   907 my ($self, $dsources) = @_;
390 566         701 my @dsources = sort { $a <=> $b } keys %{$dsources}; # Indexes of data sources to try.
  245         727  
  566         1410  
391 566         989 my @used = grep { exists $self->used->{$_} } @dsources; # List of data sources already used.
  811         2098  
392 566 100       2544 my @failed = grep { exists($self->failed->{$_}) && $self->failed->{$_} >= $self->failed_max } @dsources; # List of data sources that won't be tried.
  811         2111  
393              
394             # We've used them all and they all failed. Escallate.
395 566 100 100     4468 return if @used == @dsources && @failed == @dsources;
396            
397             # We've used them all but some are good. Purge and reuse.
398 448 100       1012 delete @{$self->used}{@dsources} if @used == @dsources;
  271         512  
399              
400 448         2040 foreach my $dsource ( shuffle @dsources ) {
401             next if $self->failed->{$dsource}
402 471 100 100     830 && $self->failed->{$dsource} >= $self->failed_max;
403 470 100       2169 next if $self->used->{$dsource};
404              
405 448         1690 $self->used->{$dsource} = 1;
406 448         1877 return $dsource;
407             }
408 0         0 return;
409             }
410              
411             sub _configure_dsources {
412 113     113   183 my ($self) = @_;
413 113         198 my $dsources = $self->dsources;
414 113         427 $self->dsources({});
415              
416 113         419 while ( my $pri = shift @{$dsources} ) {
  347         1368  
417 234 50       243 my $dsource = shift @{$dsources} or last;
  234         393  
418 234         356 $self->add_to_pri($pri => $dsource);
419             }
420             }
421              
422             sub _connect_dsource {
423 534     534   873 my ($self, $dsource) = @_;
424 534 100       1025 unless ( $dsource ) {
425 450         969 $self->_pick_dsource;
426 448         794 $dsource = $self->all_dsources->{$self->current_dsource};
427             }
428              
429             # Support ready-made handles
430 532 100       3273 return $dsource if UNIVERSAL::isa($dsource, 'DBI::db');
431              
432             # Support code-refs which return handles
433 478 100       1055 if (ref $dsource eq 'CODE') {
434 213         460 my $handle = $dsource->();
435 213 100       61341 return $handle if UNIVERSAL::isa($handle, 'DBI::db');
436 6         14 return undef; # Connect by coderef failed.
437             }
438              
439 265         300 my $dbh;
440 265         784 local $ENV{DBI_AUTOPROXY};
441 265 50   265   578 if (timeout_call( $self->timeout, sub { $dbh = DBI->connect_cached(@{$dsource}) } )) {
  265         29046  
  265         816  
442             #warn "Timeout[", $self->current_dsource, "] at ", time, "\n";
443             }
444 264         75452 return $dbh;
445             }
446              
447             sub connect_dsource {
448 84     84   194 my ($self, $dsource) = @_;
449 84         239 $self->_connect_dsource($dsource);
450             }
451              
452             sub multi_do_all {
453 21     21   57 my ($self, $code) = @_;
454              
455 21         33 my @all = values %{$self->all_dsources};
  21         85  
456              
457 21         156 foreach my $source ( @all ) {
458 84         567703 my $dbh = $self->connect_dsource($source);
459 84 50       224 next unless $dbh;
460 84 100       856 if ( $dbh->{handler} ) {
461 12         73 $dbh->{handler}->multi_do_all($code, $source);
462 12         100395 next;
463             }
464 72         265 $code->($dbh);
465             }
466             }
467              
468             1;
469             __END__