File Coverage

blib/lib/DBD/Multi.pm
Criterion Covered Total %
statement 215 226 95.1
branch 53 72 73.6
condition 11 12 91.6
subroutine 41 44 93.1
pod 0 1 0.0
total 320 355 90.1


line stmt bran cond sub pod time code
1             package DBD::Multi;
2 12     12   357370 use strict;
  12         29  
  12         364  
3              
4 12     12   8440 use DBI;
  12         111719  
  12         788  
5             DBI->setup_driver('DBD::Multi');
6              
7 12     12   97 use vars qw[$VERSION $err $errstr $sqlstate $drh];
  12         27  
  12         2199  
8              
9             $VERSION = '1.00';
10              
11             $err = 0; # DBI::err
12             $errstr = ""; # DBI::errstr
13             $sqlstate = ""; # DBI::state
14             $drh = undef;
15              
16             sub driver {
17 10 50   10 0 208792 return $drh if $drh;
18 10         33 my($class, $attr) = @_;
19 10         33 $class .= '::dr';
20              
21 10         96 $drh = DBI::_new_drh($class, {
22             Name => 'Multi',
23             Version => $VERSION,
24             Err => \$DBD::Multi::err,
25             Errstr => \$DBD::Multi::errstr,
26             State => \$DBD::Multi::sqlstate,
27             Attribution => 'DBD::Multi, pair Networks Inc.',
28             });
29 10         426 DBD::Multi::db->install_method('multi_do_all');
30 10         513 return $drh;
31             }
32              
33             #######################################################################
34             package DBD::Multi::dr;
35 12     12   80 use strict;
  12         27  
  12         371  
36              
37             $DBD::Multi::dr::imp_data_size = 0;
38 12     12   5225 use DBD::File;
  12         500430  
  12         2962  
39              
40 0     0   0 sub DESTROY { shift->STORE(Active => 0) }
41              
42             sub connect {
43 113     113   16766 my($drh, $dbname, $user, $auth, $attr) = @_;
44 113         567 my $dbh = DBI::_new_dbh(
45             $drh => {
46             Name => $dbname,
47             USER => $user,
48             CURRENT_USER => $user,
49             },
50             );
51             my @dsns = $attr->{dsns} && ref($attr->{dsns}) eq 'ARRAY'
52 113 100 66     3322 ? @{$attr->{dsns}}
  112         289  
53             : ();
54              
55 113 50       274 if ( $dbname =~ /dsn=(.*)/ ) {
56 0         0 push @dsns, ( -1, [$1, $user, $auth] );
57             }
58              
59 113         425 my $handler = DBD::Multi::Handler->new({
60             dsources => [ @dsns ],
61             });
62             $handler->failed_max($attr->{failed_max})
63 113 50       307 if exists $attr->{failed_max};
64             $handler->failed_expire($attr->{failed_expire})
65 113 100       210 if exists $attr->{failed_expire};
66              
67 113         405 $dbh->STORE(_handler => $handler);
68 113         262 $dbh->STORE(handler => $handler); # temporary
69 113         469 $drh->{_handler} = $handler;
70 113         261 $dbh->STORE(Active => 1);
71 113         332 return $dbh;
72             }
73              
74 6     6   4538 sub data_sources { shift->FETCH('_handler')->all_sources }
75              
76             #######################################################################
77             package DBD::Multi::db;
78 12     12   111 use strict;
  12         25  
  12         4481  
79              
80             $DBD::Multi::db::imp_data_size = 0;
81              
82             sub prepare {
83 415     415   3067095 my ($dbh, $statement, @attribs) = @_;
84              
85             # create a 'blank' sth
86 415         1602 my ($outer, $sth) = DBI::_new_sth($dbh, { Statement => $statement });
87              
88 415         12193 my $handler = $dbh->FETCH('_handler');
89 415         1545 $sth->STORE(_handler => $handler);
90              
91 415         1074 my $_dbh = $handler->dbh;
92 414         631 my $_sth;
93 414         857 until ( $_sth ) {
94 436         2182 $_sth = $_dbh->prepare($statement, @attribs);
95 436 100       50011 unless ( $_sth ) {
96 24         71 $handler->dbh_failed;
97 24         119 $_dbh = $handler->dbh;
98             }
99             }
100              
101 412         3404 $sth->STORE(NUM_OF_PARAMS => $_sth->FETCH('NUM_OF_PARAMS'));
102 412         1325 $sth->STORE(_dbh => $_dbh);
103 412         1206 $sth->STORE(_sth => $_sth);
104              
105 412         1506 return $outer;
106             }
107              
108             sub multi_do_all {
109 9     9   65567 my $dbh = shift;
110 9         107 my $handler = $dbh->FETCH('_handler');
111 9         46 return $handler->multi_do_all( @_ );
112             }
113              
114             sub disconnect {
115 113     113   2559 my ($dbh) = @_;
116 113         357 $dbh->STORE(Active => 0);
117 113         2104 1;
118             }
119              
120             sub commit {
121 0     0   0 my ($dbh) = @_;
122 0 0       0 if ( $dbh->FETCH('Active') ) {
123 0 0       0 return $dbh->FETCH('_dbh')->commit if $dbh->FETCH('_dbh');
124             }
125 0         0 return;
126             }
127              
128             sub rollback {
129 0     0   0 my ($dbh) = @_;
130 0 0       0 if ( $dbh->FETCH('Active') ) {
131 0 0       0 return $dbh->FETCH('_dbh')->rollback if $dbh->FETCH('_dbh');
132             }
133 0         0 return;
134             }
135              
136             sub get_info {
137 5     5   1929 my($dbh, $info_type) = @_;
138              
139             # return info from current connection
140 5         21 my $handler = $dbh->FETCH('_handler');
141 5         12 my $_dbh = $handler->dbh;
142 5         17 return $_dbh->get_info($info_type);
143             }
144              
145             sub STORE {
146 1017     1017   6992 my ($self, $attr, $val) = @_;
147 1017         2544 $self->{$attr} = $val;
148             }
149              
150 110     110   10565 sub DESTROY { shift->disconnect }
151              
152             #######################################################################
153             package DBD::Multi::st;
154 12     12   84 use strict;
  12         27  
  12         348  
155              
156             $DBD::Multi::st::imp_data_size = 0;
157              
158 12     12   81 use vars qw[@METHODS @FIELDS];
  12         33  
  12         3668  
159             @METHODS = qw[
160             bind_param
161             bind_param_inout
162             bind_param_array
163             execute_array
164             execute_for_fetch
165             fetch
166             fetchrow_arrayref
167             fetchrow_array
168             fetchrow_hashref
169             fetchall_arrayref
170             fetchall_hashref
171             bind_col
172             bind_columns
173             dump_results
174             ];
175              
176             @FIELDS = qw[
177             NUM_OF_FIELDS
178             CursorName
179             ParamValues
180             RowsInCache
181             ];
182              
183             sub execute {
184 410     410   5139 my $sth = shift;
185 410         1158 my $_sth = $sth->FETCH('_sth');
186             my $params = @_
187             ? $sth->{f_params} = [ @_ ]
188 410 100       1204 : $sth->{f_params};
189              
190 410 100       1166 $sth->finish if $sth->FETCH('Active');
191 410         961 $sth->{Active} = 1;
192 410         599 my $rc = $_sth->execute(@{$params});
  410         65835  
193              
194 410         1514 for my $field ( @FIELDS ) {
195 1640         5326 my $value = $_sth->FETCH($field);
196 1640 100 100     4778 $sth->STORE($field => $value)
197             unless ! defined $value
198             || defined $sth->FETCH($field);
199             }
200              
201 410         1450 return $rc;
202             }
203              
204             sub FETCH {
205 2041     2041   3960 my ($sth, $attrib) = @_;
206 2041 100       17241 $sth->{'_sth'}->FETCH($attrib) || $sth->{$attrib};
207             }
208              
209             sub STORE {
210 2056     2056   4174 my ($self, $attr, $val) = @_;
211 2056         5398 $self->{$attr} = $val;
212             }
213              
214 3     3   40 sub rows { shift->FETCH('_sth')->rows }
215              
216             sub finish {
217 402     402   2776 my ($sth) = @_;
218 402         1250 $sth->STORE(Active => 0);
219 402         1134 return $sth->FETCH('_sth')->finish;
220             }
221              
222             foreach my $method ( @METHODS ) {
223 12     12   77 no strict;
  12         29  
  12         905  
224 407     407   2861 *{$method} = sub { shift->FETCH('_sth')->$method(@_) };
225             }
226              
227             #######################################################################
228             package DBD::Multi::Handler;
229 12     12   108 use strict;
  12         27  
  12         292  
230              
231 12     12   77 use base qw[Class::Accessor::Fast];
  12         21  
  12         4391  
232 12     12   28223 use Sys::SigAction qw(timeout_call);
  12         115600  
  12         771  
233 12     12   98 use List::Util qw(shuffle);
  12         19  
  12         12741  
234              
235             =begin ImplementationNotes
236              
237             dsources - This thing changes from an arrayref to a hashref during construction. :(
238              
239             Initially, when data is passed in during construction, it's an arrayref
240             containing the 'dsns' param from the user's connect() call.
241              
242             Later, when _configure_dsources gets called, it turns into a multi-dimension
243             hashref:
244              
245             $dsources->{$pri}->{$dsource_id} = 1;
246              
247             The first key is the priority number, the second key is the data source index
248             number. The value is always just a true value.
249              
250             nextid - A counter. Stores the index number of the next data source to be added.
251              
252             all_dsources - A hashref. Maps index number to the connect data.
253              
254             current_dsource - The most recent chosen datasource index number.
255              
256             used - A hashref. Keys are index numbers. Values are true when the datasource
257             has been previously assigned and we want to prefer other datasources of the
258             same priority (for round-robin load distribution).
259              
260             failed - A hashref. Keys are index numbers. Values are counters indicating
261             how many times the data source has failed.
262              
263             failed_last - A hashref. Keys are index number. Values are unix timestamp
264             indicating the most recent time a data source failed.
265              
266             failed_max - A scalar value. Number of times a datasource may fail before we
267             stop trying it.
268              
269             failed_expire - A scalar value. Number of seconds since we stopped trying a
270             datasource before we'll try it again.
271              
272             timeout - A scalar value. Number of seconds we try to connect to a datasource
273             before giving up.
274              
275             =end ImplementationNotes
276              
277             =cut
278              
279             __PACKAGE__->mk_accessors(qw[
280             dsources
281             nextid
282             all_dsources
283             current_dsource
284             used
285             failed
286             failed_last
287             failed_max
288             failed_expire
289             timeout
290             ]);
291              
292             sub new {
293 113     113   193 my ($class, $args) = @_;
294 113         349 my $self = $class->SUPER::new($args);
295 113 50       1048 $self->nextid(0) unless defined $self->nextid;
296 113         1111 $self->all_dsources({});
297 113         543 $self->used({});
298 113         532 $self->failed({});
299 113         550 $self->failed_last({});
300 113 50       490 $self->failed_max(3) unless defined $self->failed_max;
301 113 50       838 $self->failed_expire(60*5) unless defined $self->failed_expire;
302 113 50       924 $self->timeout( 5 ) unless defined $self->timeout;
303 113         834 $self->_configure_dsources;
304 113         177 return $self;
305             }
306              
307             sub all_sources {
308 6     6   16 my ($self) = @_;
309 6         12 return values %{$self->all_dsources};
  6         24  
310             }
311              
312             sub add_to_pri {
313 234     234   323 my ($self, $pri, $dsource) = @_;
314 234         374 my $dsource_id = $self->nextid;
315 234         752 my $dsources = $self->dsources;
316 234         721 my $all = $self->all_dsources;
317              
318 234         1022 $all->{$dsource_id} = $dsource;
319 234         420 $dsources->{$pri}->{$dsource_id} = 1;
320              
321 234         422 $self->nextid($dsource_id + 1);
322             }
323              
324             sub dbh {
325 450     450   705 my $self = shift;
326 450         1003 my $dbh = $self->_connect_dsource;
327 447 100       1616 return $dbh if $dbh;
328 6         17 $self->dbh_failed;
329 6         43 $self->dbh;
330             }
331              
332             sub dbh_failed {
333 30     30   56 my ($self) = @_;
334              
335 30         67 my $current_dsource = $self->current_dsource;
336 30         174 $self->failed->{$current_dsource}++;
337 30         161 $self->failed_last->{$current_dsource} = time;
338             }
339              
340             sub _purge_old_failures {
341 450     450   712 my ($self) = @_;
342 450         720 my $now = time;
343 450         592 my @all = keys %{$self->all_dsources};
  450         1107  
344            
345 450         2910 foreach my $dsource ( @all ) {
346 966 100       3566 next unless $self->failed->{$dsource};
347 175 100       1091 if ( ($now - $self->failed_last->{$dsource}) > $self->failed_expire ) {
348 1         23 delete $self->failed->{$dsource};
349 1         11 delete $self->failed_last->{$dsource};
350             }
351             }
352             }
353              
354             sub _pick_dsource {
355 450     450   750 my ($self) = @_;
356 450         1022 $self->_purge_old_failures;
357 450         3138 my $dsources = $self->dsources;
358 450         1764 my @pri = sort { $a <=> $b } keys %{$dsources};
  266         1181  
  450         1719  
359              
360 450         947 foreach my $pri ( @pri ) {
361 566         1269 my $dsource = $self->_pick_pri_dsource($dsources->{$pri});
362 566 100       1374 if ( defined $dsource ) {
363 448         1191 $self->current_dsource($dsource);
364 448         2717 return;
365             }
366             }
367              
368 2         8 $self->used({});
369             return $self->_pick_dsource
370 2 50       11 if (grep {$self->failed->{$_} >= $self->failed_max} keys(%{$self->failed})) < keys(%{$self->all_dsources});
  8         49  
  2         6  
  2         18  
371 2         49 die("All data sources failed!");
372             }
373              
374             ### _pick_pri_dsource
375             # Given a list of data sources, all of the same priority, choose one to use.
376             # Passed in a structure like this:
377             # _pick_pri_dsource( $self, { 3 => 1, 4 => 1, 5 => 1 } )
378             # Where the keys 3,4,5 are dsource id's, and the values are always true.
379             sub _pick_pri_dsource {
380 566     566   1081 my ($self, $dsources) = @_;
381 566         806 my @dsources = sort { $a <=> $b } keys %{$dsources}; # Indexes of data sources to try.
  245         786  
  566         1693  
382 566         1173 my @used = grep { exists $self->used->{$_} } @dsources; # List of data sources already used.
  811         2507  
383 566 100       3250 my @failed = grep { exists($self->failed->{$_}) && $self->failed->{$_} >= $self->failed_max } @dsources; # List of data sources that won't be tried.
  811         2489  
384              
385             # We've used them all and they all failed. Escallate.
386 566 100 100     5540 return if @used == @dsources && @failed == @dsources;
387            
388             # We've used them all but some are good. Purge and reuse.
389 448 100       1116 delete @{$self->used}{@dsources} if @used == @dsources;
  271         1156  
390              
391 448         2447 foreach my $dsource ( shuffle @dsources ) {
392             next if $self->failed->{$dsource}
393 475 100 100     1076 && $self->failed->{$dsource} >= $self->failed_max;
394 473 100       2690 next if $self->used->{$dsource};
395              
396 448         2055 $self->used->{$dsource} = 1;
397 448         2271 return $dsource;
398             }
399 0         0 return;
400             }
401              
402             sub _configure_dsources {
403 113     113   175 my ($self) = @_;
404 113         199 my $dsources = $self->dsources;
405 113         453 $self->dsources({});
406              
407 113         424 while ( my $pri = shift @{$dsources} ) {
  347         1460  
408 234 50       244 my $dsource = shift @{$dsources} or last;
  234         431  
409 234         392 $self->add_to_pri($pri => $dsource);
410             }
411             }
412              
413             sub _connect_dsource {
414 534     534   1084 my ($self, $dsource) = @_;
415 534 100       1394 unless ( $dsource ) {
416 450         1058 $self->_pick_dsource;
417 448         918 $dsource = $self->all_dsources->{$self->current_dsource};
418             }
419              
420             # Support ready-made handles
421 532 100       4249 return $dsource if UNIVERSAL::isa($dsource, 'DBI::db');
422              
423             # Support code-refs which return handles
424 477 100       1225 if (ref $dsource eq 'CODE') {
425 213         1055 my $handle = $dsource->();
426 213 100       83676 return $handle if UNIVERSAL::isa($handle, 'DBI::db');
427 6         15 return undef; # Connect by coderef failed.
428             }
429              
430 264         372 my $dbh;
431 264         953 local $ENV{DBI_AUTOPROXY};
432 264 50   264   717 if (timeout_call( $self->timeout, sub { $dbh = DBI->connect_cached(@{$dsource}) } )) {
  264         33566  
  264         963  
433             #warn "Timeout[", $self->current_dsource, "] at ", time, "\n";
434             }
435 263         77347 return $dbh;
436             }
437              
438             sub connect_dsource {
439 84     84   413 my ($self, $dsource) = @_;
440 84         382 $self->_connect_dsource($dsource);
441             }
442              
443             sub multi_do_all {
444 21     21   74 my ($self, $code) = @_;
445              
446 21         49 my @all = values %{$self->all_dsources};
  21         112  
447              
448 21         232 foreach my $source ( @all ) {
449 84         1033433 my $dbh = $self->connect_dsource($source);
450 84 50       304 next unless $dbh;
451 84 100       1289 if ( $dbh->{handler} ) {
452 12         104 $dbh->{handler}->multi_do_all($code, $source);
453 12         161466 next;
454             }
455 72         355 $code->($dbh);
456             }
457             }
458              
459             1;
460             __END__