File Coverage

blib/lib/DBIx/Class/Storage/DBI/Replicated/Pool.pm
Criterion Covered Total %
statement 1 3 33.3
branch n/a
condition n/a
subroutine 1 1 100.0
pod n/a
total 2 4 50.0


line stmt bran cond sub pod time code
1             package DBIx::Class::Storage::DBI::Replicated::Pool;
2              
3 3     3   7309 use Moose;
  0            
  0            
4             use DBIx::Class::Storage::DBI::Replicated::Replicant;
5             use List::Util 'sum';
6             use Scalar::Util 'reftype';
7             use DBI ();
8             use MooseX::Types::Moose qw/Num Int ClassName HashRef/;
9             use DBIx::Class::Storage::DBI::Replicated::Types 'DBICStorageDBI';
10             use Try::Tiny;
11              
12             use namespace::clean -except => 'meta';
13              
14             =head1 NAME
15              
16             DBIx::Class::Storage::DBI::Replicated::Pool - Manage a pool of replicants
17              
18             =head1 SYNOPSIS
19              
20             This class is used internally by L<DBIx::Class::Storage::DBI::Replicated>. You
21             shouldn't need to create instances of this class.
22              
23             =head1 DESCRIPTION
24              
25             In a replicated storage type, there is at least one replicant to handle the
26             read-only traffic. The Pool class manages this replicant, or list of
27             replicants, and gives some methods for querying information about their status.
28              
29             =head1 ATTRIBUTES
30              
31             This class defines the following attributes.
32              
33             =head2 maximum_lag ($num)
34              
35             This is a number which defines the maximum allowed lag returned by the
36             L<DBIx::Class::Storage::DBI/lag_behind_master> method. The default is 0. In
37             general, this should return a larger number when the replicant is lagging
38             behind its master, however the implementation of this is database specific, so
39             don't count on this number having a fixed meaning. For example, MySQL will
40             return a number of seconds that the replicating database is lagging.
41              
42             =cut
43              
# Largest lag value (as reported by a replicant's lag_behind_master) that
# validate_replicants still accepts when marking a replicant active.
# Read/write number, built lazily, defaulting to 0.
has maximum_lag => (
  is       => 'rw',
  isa      => Num,
  lazy     => 1,
  required => 1,
  default  => 0,
);
51              
52             =head2 last_validated
53              
54             This is an integer representing a time since the last time the replicants were
55             validated. It's nothing fancy, just an integer provided via the perl L<time|perlfunc/time>
56             built-in.
57              
58             =cut
59              
# Value of time() recorded when validate_replicants last finished; 0 until
# then. Read through last_validated, written through the private
# _last_validated writer.
has last_validated => (
  is      => 'rw',
  isa     => Int,
  lazy    => 1,
  default => 0,
  reader  => 'last_validated',
  writer  => '_last_validated',
);
68              
69             =head2 replicant_type ($classname)
70              
71             Base class used to instantiate replicants that are in the pool. Unless you
72             need to subclass L<DBIx::Class::Storage::DBI::Replicated::Replicant> you should
73             just leave this alone.
74              
75             =cut
76              
# Class used to build each replicant storage. The delegated create_replicant
# method simply calls ->new on this class.
has replicant_type => (
  is       => 'ro',
  isa      => ClassName,
  required => 1,
  default  => 'DBIx::Class::Storage::DBI',
  handles  => {
    create_replicant => 'new',
  },
);
86              
87             =head2 replicants
88              
89             A hashref of replicant, with the key being the dsn and the value returning the
90             actual replicant storage. For example, if the $dsn element is something like:
91              
92             "dbi:SQLite:dbname=dbfile"
93              
94             You could access the specific replicant via:
95              
96             $schema->storage->replicants->{'dbname=dbfile'}
97              
98             This attribute also supports the following helper methods:
99              
100             =over 4
101              
102             =item set_replicant($key=>$storage)
103              
104             Pushes a replicant onto the HashRef under $key
105              
106             =item get_replicant($key)
107              
108             Retrieves the named replicant
109              
110             =item has_replicants
111              
112             Returns true if the Pool defines replicants.
113              
114             =item num_replicants
115              
116             The number of replicants in the pool
117              
118             =item delete_replicant ($key)
119              
120             Removes the replicant under $key from the pool
121              
122             =back
123              
124             =cut
125              
# Hashref mapping a replicant key (see connect_replicants) to its storage
# object. The native Hash trait supplies the delegated helper methods below.
# Note that has_replicants is delegated to is_empty here; it is expected to be
# inverted by a method modifier so that it reads as "pool is not empty".
has replicants => (
  is      => 'rw',
  isa     => HashRef['Object'],
  traits  => ['Hash'],
  default => sub { +{} },
  handles => {
    set_replicant          => 'set',
    get_replicant          => 'get',
    delete_replicant       => 'delete',
    num_replicants         => 'count',
    has_replicants         => 'is_empty',
    all_replicant_storages => 'values',
  },
);
140              
# Invert the wrapped method's answer, so has_replicants returns true when the
# original (delegated) method returns false and vice versa.
around has_replicants => sub {
  my $wrapped = shift;
  my $self    = shift;

  return !$self->$wrapped;
};
145              
# Counter used to build "UNKNOWN_<n>" keys for replicants whose DSN cannot be
# determined. Starts at 1; inc_unknown_replicant_id advances it.
has next_unknown_replicant_id => (
  is      => 'rw',
  isa     => Int,
  traits  => ['Counter'],
  default => 1,
  handles => {
    inc_unknown_replicant_id => 'inc',
  },
);
155              
156             =head2 master
157              
158             Reference to the master Storage.
159              
160             =cut
161              
# The master storage, held as a weak reference. connect_replicant copies it
# onto every replicant it creates.
has master => (
  is       => 'rw',
  isa      => DBICStorageDBI,
  weak_ref => 1,
);
163              
164             =head1 METHODS
165              
166             This class defines the following methods.
167              
168             =head2 connect_replicants ($schema, Array[$connect_info])
169              
170             Given an array of $dsn or connect_info structures suitable for connecting to a
171             database, create an L<DBIx::Class::Storage::DBI::Replicated::Replicant> object
172             and store it in the L</replicants> attribute.
173              
174             =cut
175              
# Create, register and return one replicant storage per connect_info entry.
#
# Arguments: the schema (handed on to connect_replicant), followed by any
# number of connect_info entries. An entry that is not already an arrayref
# (a DSN string, a hashref or a coderef) is wrapped in one. The arguments are
# copied first, so the caller's variables are never rewritten and read-only
# literals are accepted.
#
# Each replicant is stored with set_replicant under a key taken from the part
# of its DSN that follows "dbi:<driver>:". When no DSN can be determined, or
# the DSN does not have that shape, the key is "UNKNOWN_<n>", where <n> comes
# from the next_unknown_replicant_id counter.
#
# Returns the list of newly created replicants.
sub connect_replicants {
  my ($self, $schema, @connect_infos) = @_;

  my @newly_created;
  foreach my $connect_info (@connect_infos) {
    # reftype() yields undef for non-references, hence the "|| ''".
    $connect_info = [ $connect_info ]
      if (reftype($connect_info) || '') ne 'ARRAY';

    my $first_type = reftype($connect_info->[0]) || '';
    my $connect_coderef =
      $first_type eq 'CODE' ? $connect_info->[0]
        : $first_type eq 'HASH' && $connect_info->[0]->{dbh_maker};

    my $dsn;
    my $replicant = do {
      # yes this is evil, but it only usually happens once (for coderefs)
      # this will fail if the coderef does not actually DBI::connect
      no warnings 'redefine';
      my $connect = \&DBI::connect;
      local *DBI::connect = sub {
        $dsn = $_[1];    # remember the DSN the connection was made with
        goto $connect;
      };
      $self->connect_replicant($schema, $connect_info);
    };

    # No connection was intercepted: take the DSN from the connect_info
    # itself, unless the connection is produced by a coderef.
    if (!$dsn && !$connect_coderef) {
      $dsn = $connect_info->[0];
      $dsn = $dsn->{dsn} if (reftype($dsn) || '') eq 'HASH';
    }

    my $key;
    if ($dsn) {
      $replicant->dsn($dsn);
      ($key) = ($dsn =~ m/^dbi\:.+\:(.+)$/i);
    }

    # All attempts to derive a key from a DSN failed (no DSN at all, or one
    # that does not look like "dbi:driver:rest"); never register under undef.
    unless (defined $key) {
      $key = "UNKNOWN_" . $self->next_unknown_replicant_id;
      $self->inc_unknown_replicant_id;
    }

    $replicant->id($key);
    $self->set_replicant($key => $replicant);

    push @newly_created, $replicant;
  }

  return @newly_created;
}
229              
230             =head2 connect_replicant ($schema, $connect_info)
231              
232             Given a schema object and an arrayref of $connect_info, connect the replicant
233             and return it.
234              
235             =cut
236              
# Build a single replicant storage for $schema from $connect_info, apply the
# Replicant role to it, link it to the pool's master and return it.
sub connect_replicant {
  my $self         = shift;
  my $schema       = shift;
  my $connect_info = shift;

  my $replicant = $self->create_replicant($schema);
  $replicant->connect_info($connect_info);

  ## It is undesirable for catalyst to connect at ->connect_replicants time, as
  ## connections should only happen on the first request that uses the database.
  ## So we try to set the driver without connecting, however this doesn't always
  ## work, as a driver may need to connect to determine the DB version, and this
  ## may fail.
  ##
  ## Why this is necessary at all, is that we need to have the final storage
  ## class to apply the Replicant role.
  my $determine_driver = sub { $replicant->_determine_driver };
  $self->_safely($replicant, '->_determine_driver', $determine_driver);

  # Read the class only now, after the driver-determination attempt above.
  my $storage_class = ref $replicant;
  Moose::Meta::Class->initialize($storage_class);

  DBIx::Class::Storage::DBI::Replicated::Replicant->meta->apply($replicant);

  # link back to master
  my $master = $self->master;
  $replicant->master($master);

  return $replicant;
}
264              
265             =head2 _safely_ensure_connected ($replicant)
266              
267             The standard ensure_connected method will throw an exception should it fail to
268             connect. For the master database this is desirable, but since replicants are
269             allowed to fail, this behavior is not desirable. This method wraps the call
270             to ensure_connected in an eval in order to catch any generated errors. That
271             way a slave can go completely offline (e.g. the box itself can die) without
272             bringing down your entire pool of databases.
273              
274             =cut
275              
# Call ensure_connected(@args) on $replicant through _safely, so that a
# failure is reported by _safely instead of propagating as an exception.
# Returns whatever _safely returns.
sub _safely_ensure_connected {
  my $self         = shift;
  my $replicant    = shift;
  my @connect_args = @_;

  my $attempt = sub { $replicant->ensure_connected(@connect_args) };

  return $self->_safely($replicant, '->ensure_connected', $attempt);
}
283              
284             =head2 _safely ($replicant, $name, $code)
285              
286             Execute C<$code> for operation C<$name> catching any exceptions and printing an
287             error message to the C<< $replicant->debugobj >>.
288              
289             Returns 1 on success and undef on failure.
290              
291             =cut
292              
# Run $code, trapping any exception it throws. On failure a message naming
# the operation ($name), the first element of the replicant's
# _dbi_connect_info and the error is printed to the replicant's debugobj.
# Returns 1 on success and undef on failure.
sub _safely {
  my ($self, $replicant, $name, $code) = @_;

  my $outcome = try {
    $code->();
    1;
  }
  catch {
    my $error   = $_;
    my $message = sprintf(
      "Exception trying to $name for replicant %s, error is %s",
      $replicant->_dbi_connect_info->[0], $error
    );
    $replicant->debugobj->print($message);
    undef;
  };

  return $outcome;
}
307              
308             =head2 connected_replicants
309              
310             Returns true if there are connected replicants. Actually is overloaded to
311             return the number of replicants. So you can do stuff like:
312              
313             if( my $num_connected = $pool->connected_replicants ) {
314             print "I have $num_connected connected replicants";
315             } else {
316             print "Sorry, no replicants.";
317             }
318              
319             This method will actually test that each replicant in the L</replicants> hashref
320             is actually connected, try not to hit this 10 times a second.
321              
322             =cut
323              
# Count the replicants in the pool whose ->connected check is true.
#
# Every replicant returned by all_replicants is asked in turn. Always returns
# a defined number: 0 when nothing is connected or when the pool is empty.
sub connected_replicants {
  my $self = shift @_;

  # grep in scalar context yields the number of matches, and 0 for an empty
  # list (summing an empty list would yield undef instead).
  my $connected = grep { $_->connected } $self->all_replicants;

  return $connected;
}
330              
331             =head2 active_replicants
332              
333             This is an array of replicants that are considered to be active in the pool.
334             This does not check to see if they are connected, but if they are not, DBIC
335             should automatically reconnect them for us when we hit them with a query.
336              
337             =cut
338              
# Return the replicants from all_replicants whose ->active flag is true.
# No connection check is made here.
sub active_replicants {
  my ($self) = @_;

  my @in_service;
  for my $storage ($self->all_replicants) {
    push @in_service, $storage if $storage->active;
  }

  return @in_service;
}
345              
346             =head2 all_replicants
347              
348             Just a simple array of all the replicant storages. No particular order to the
349             array is given, nor should any meaning be derived.
350              
351             =cut
352              
# Return every storage held in the replicants hashref, in no particular order.
sub all_replicants {
  my ($self) = @_;

  my $storage_for = $self->replicants;

  return values %$storage_for;
}
357              
358             =head2 validate_replicants
359              
360             This does a check to see if 1) each replicant is connected (or reconnectable),
361             2) that it is ->is_replicating, and 3) that it is not exceeding the lag amount
362             defined by L</maximum_lag>. Replicants that fail any of these tests are set to
363             inactive, and thus removed from the replication pool.
364              
365             This tests L</all_replicants>, since a replicant that has been previously marked
366             as inactive can be reactivated should it start to pass the validation tests again.
367              
368             See L<DBIx::Class::Storage::DBI> for more about checking if a replicating
369             connection is not following a master or is lagging.
370              
371             Calling this method will generate queries on the replicant databases so it is
372             not recommended that you run them very often.
373              
374             This method requires that your underlying storage engine supports some sort of
375             native replication mechanism. Currently only MySQL native replication is
376             supported. Your patches to make other replication types work are welcomed.
377              
378             =cut
379              
# Re-evaluate the active flag of every replicant in the pool.
#
# For each replicant returned by all_replicants:
#   * it is set inactive when _safely_ensure_connected fails;
#   * it is left untouched, with a note printed to its debugobj, when
#     is_replicating or lag_behind_master returns undef (the storage driver
#     does not implement the check);
#   * it is set inactive when is_replicating is false;
#   * otherwise it is set active only if its lag does not exceed maximum_lag.
#
# Finally the current time is stored through _last_validated; the result of
# that call is the return value.
sub validate_replicants {
  my $self = shift @_;

  REPLICANT:
  foreach my $replicant ($self->all_replicants) {
    unless ($self->_safely_ensure_connected($replicant)) {
      $replicant->active(0);
      next REPLICANT;
    }

    my $is_replicating = $replicant->is_replicating;
    unless (defined $is_replicating) {
      # Name the replicant's own class: that is the storage driver lacking
      # the method, not the pool.
      $replicant->debugobj->print("Storage Driver ".ref($replicant)." Does not support the 'is_replicating' method. Assuming you are manually managing.\n");
      next REPLICANT;
    }

    unless ($is_replicating) {
      $replicant->active(0);
      next REPLICANT;
    }

    my $lag_behind_master = $replicant->lag_behind_master;
    unless (defined $lag_behind_master) {
      $replicant->debugobj->print("Storage Driver ".ref($replicant)." Does not support the 'lag_behind_master' method. Assuming you are manually managing.\n");
      next REPLICANT;
    }

    $replicant->active($lag_behind_master <= $self->maximum_lag ? 1 : 0);
  }

  ## Mark that we completed this validation.
  return $self->_last_validated(time);
}
412              
413             =head1 FURTHER QUESTIONS?
414              
415             Check the list of L<additional DBIC resources|DBIx::Class/GETTING HELP/SUPPORT>.
416              
417             =head1 COPYRIGHT AND LICENSE
418              
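419             This module is free software L<copyright|DBIx::Class/COPYRIGHT AND LICENSE>
420             by the L<DBIx::Class (DBIC) authors|DBIx::Class/AUTHORS>. You can
421             redistribute it and/or modify it under the same terms as the
422             L<DBIx::Class library|DBIx::Class/COPYRIGHT AND LICENSE>.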
423              
424             =cut
425              
# Finalise the Moose metaclass now that all attributes and methods above have
# been declared.
__PACKAGE__->meta->make_immutable();

# A module file must end by returning a true value.
1;