File Coverage

blib/lib/DBIx/Class/Storage/DBI/Replicated.pm
Criterion Covered Total %
statement 4 4 100.0
branch 1 2 50.0
condition n/a
subroutine 2 2 100.0
pod n/a
total 7 8 87.5


line stmt bran cond sub pod time code
1             package DBIx::Class::Storage::DBI::Replicated;
2              
3             BEGIN {
4 3     3   37499 use DBIx::Class;
  3         7  
  3         255  
5 3 50   3   113 die('The following modules are required for Replication ' . DBIx::Class::Optional::Dependencies->req_missing_for ('replicated') . "\n" )
6             unless DBIx::Class::Optional::Dependencies->req_ok_for ('replicated');
7             }
8              
9             use Moose;
10             use DBIx::Class::Storage::DBI;
11             use DBIx::Class::Storage::DBI::Replicated::Pool;
12             use DBIx::Class::Storage::DBI::Replicated::Balancer;
13             use DBIx::Class::Storage::DBI::Replicated::Types qw/BalancerClassNamePart DBICSchema DBICStorageDBI/;
14             use MooseX::Types::Moose qw/ClassName HashRef Object/;
15             use Scalar::Util 'reftype';
16             use Hash::Merge;
17             use List::Util qw/min max reduce/;
18             use Context::Preserve 'preserve_context';
19             use Try::Tiny;
20              
21             use namespace::clean -except => 'meta';
22              
23             =head1 NAME
24              
25             DBIx::Class::Storage::DBI::Replicated - BETA Replicated database support
26              
27             =head1 SYNOPSIS
28              
29             The Following example shows how to change an existing $schema to a replicated
30             storage type, add some replicated (read-only) databases, and perform reporting
31             tasks.
32              
33             You should set the 'storage_type attribute to a replicated type. You should
34             also define your arguments, such as which balancer you want and any arguments
35             that the Pool object should get.
36              
37             my $schema = Schema::Class->clone;
38             $schema->storage_type(['::DBI::Replicated', { balancer_type => '::Random' }]);
39             $schema->connection(...);
40              
41             Next, you need to add in the Replicants. Basically this is an array of
42             arrayrefs, where each arrayref is database connect information. Think of these
43             arguments as what you'd pass to the 'normal' $schema->connect method.
44              
45             $schema->storage->connect_replicants(
46             [$dsn1, $user, $pass, \%opts],
47             [$dsn2, $user, $pass, \%opts],
48             [$dsn3, $user, $pass, \%opts],
49             );
50              
51             Now, just use the $schema as you normally would. Automatically all reads will
52             be delegated to the replicants, while writes to the master.
53              
54             $schema->resultset('Source')->search({name=>'etc'});
55              
56             You can force a given query to use a particular storage using the search
57             attribute 'force_pool'. For example:
58              
59             my $rs = $schema->resultset('Source')->search(undef, {force_pool=>'master'});
60              
61             Now $rs will force everything (both reads and writes) to use whatever was setup
62             as the master storage. 'master' is hardcoded to always point to the Master,
63             but you can also use any Replicant name. Please see:
64             L and the replicants attribute for more.
65              
66             Also see transactions and L for alternative ways to
67             force read traffic to the master. In general, you should wrap your statements
68             in a transaction when you are reading and writing to the same tables at the
69             same time, since your replicants will often lag a bit behind the master.
70              
71             If you have a multi-statement read only transaction you can force it to select
72             a random server in the pool by:
73              
74             my $rs = $schema->resultset('Source')->search( undef,
75             { force_pool => $db->storage->read_handler->next_storage }
76             );
77              
78             =head1 DESCRIPTION
79              
80             Warning: This class is marked BETA. This has been running a production
81             website using MySQL native replication as its backend and we have some decent
82             test coverage but the code hasn't yet been stressed by a variety of databases.
83             Individual DBs may have quirks we are not aware of. Please use this in first
84             development and pass along your experiences/bug fixes.
85              
86             This class implements replicated data store for DBI. Currently you can define
87             one master and numerous slave database connections. All write-type queries
88             (INSERT, UPDATE, DELETE and even LAST_INSERT_ID) are routed to master
89             database, all read-type queries (SELECTs) go to the slave database.
90              
91             Basically, any method request that L would normally
92             handle gets delegated to one of the two attributes: L or to
93             L. Additionally, some methods need to be distributed
94             to all existing storages. This way our storage class is a drop in replacement
95             for L.
96              
97             Read traffic is spread across the replicants (slaves) occurring to a user
98             selected algorithm. The default algorithm is random weighted.
99              
100             =head1 NOTES
101              
102             The consistency between master and replicants is database specific. The Pool
103             gives you a method to validate its replicants, removing and replacing them
104             when they fail/pass predefined criteria. Please make careful use of the ways
105             to force a query to run against Master when needed.
106              
107             =head1 REQUIREMENTS
108              
109             Replicated Storage has additional requirements not currently part of
110             L. See L for more details.
111              
112             =head1 ATTRIBUTES
113              
114             This class defines the following attributes.
115              
116             =head2 schema
117              
118             The underlying L object this storage is attaching
119              
120             =cut
121              
122             has 'schema' => (
123             is=>'rw',
124             isa=>DBICSchema,
125             weak_ref=>1,
126             required=>1,
127             );
128              
129             =head2 pool_type
130              
131             Contains the classname which will instantiate the L object. Defaults
132             to: L.
133              
134             =cut
135              
136             has 'pool_type' => (
137             is=>'rw',
138             isa=>ClassName,
139             default=>'DBIx::Class::Storage::DBI::Replicated::Pool',
140             handles=>{
141             'create_pool' => 'new',
142             },
143             );
144              
145             =head2 pool_args
146              
147             Contains a hashref of initialized information to pass to the Balancer object.
148             See L for available arguments.
149              
150             =cut
151              
152             has 'pool_args' => (
153             is=>'rw',
154             isa=>HashRef,
155             lazy=>1,
156             default=>sub { {} },
157             );
158              
159              
160             =head2 balancer_type
161              
162             The replication pool requires a balance class to provider the methods for
163             choose how to spread the query load across each replicant in the pool.
164              
165             =cut
166              
167             has 'balancer_type' => (
168             is=>'rw',
169             isa=>BalancerClassNamePart,
170             coerce=>1,
171             required=>1,
172             default=> 'DBIx::Class::Storage::DBI::Replicated::Balancer::First',
173             handles=>{
174             'create_balancer' => 'new',
175             },
176             );
177              
178             =head2 balancer_args
179              
180             Contains a hashref of initialized information to pass to the Balancer object.
181             See L for available arguments.
182              
183             =cut
184              
185             has 'balancer_args' => (
186             is=>'rw',
187             isa=>HashRef,
188             lazy=>1,
189             required=>1,
190             default=>sub { {} },
191             );
192              
193             =head2 pool
194              
195             Is a L or derived class. This is a
196             container class for one or more replicated databases.
197              
198             =cut
199              
200             has 'pool' => (
201             is=>'ro',
202             isa=>'DBIx::Class::Storage::DBI::Replicated::Pool',
203             lazy_build=>1,
204             handles=>[qw/
205             connect_replicants
206             replicants
207             has_replicants
208             /],
209             );
210              
211             =head2 balancer
212              
213             Is a L or derived class. This
214             is a class that takes a pool (L)
215              
216             =cut
217              
218             has 'balancer' => (
219             is=>'rw',
220             isa=>'DBIx::Class::Storage::DBI::Replicated::Balancer',
221             lazy_build=>1,
222             handles=>[qw/auto_validate_every/],
223             );
224              
225             =head2 master
226              
227             The master defines the canonical state for a pool of connected databases. All
228             the replicants are expected to match this databases state. Thus, in a classic
229             Master / Slaves distributed system, all the slaves are expected to replicate
230             the Master's state as quick as possible. This is the only database in the
231             pool of databases that is allowed to handle write traffic.
232              
233             =cut
234              
235             has 'master' => (
236             is=> 'ro',
237             isa=>DBICStorageDBI,
238             lazy_build=>1,
239             );
240              
241             =head1 ATTRIBUTES IMPLEMENTING THE DBIx::Storage::DBI INTERFACE
242              
243             The following methods are delegated all the methods required for the
244             L interface.
245              
246             =cut
247              
248             my $method_dispatch = {
249             writer => [qw/
250             on_connect_do
251             on_disconnect_do
252             on_connect_call
253             on_disconnect_call
254             connect_info
255             _connect_info
256             throw_exception
257             sql_maker
258             sqlt_type
259             create_ddl_dir
260             deployment_statements
261             datetime_parser
262             datetime_parser_type
263             build_datetime_parser
264             last_insert_id
265             insert
266             update
267             delete
268             dbh
269             txn_begin
270             txn_do
271             txn_commit
272             txn_rollback
273             txn_scope_guard
274             _exec_txn_rollback
275             _exec_txn_begin
276             _exec_txn_commit
277             deploy
278             with_deferred_fk_checks
279             dbh_do
280             _prep_for_execute
281             is_datatype_numeric
282             _count_select
283             svp_rollback
284             svp_begin
285             svp_release
286             relname_to_table_alias
287             _dbh_last_insert_id
288             _default_dbi_connect_attributes
289             _dbi_connect_info
290             _dbic_connect_attributes
291             auto_savepoint
292             _query_start
293             _query_end
294             _format_for_trace
295             _dbi_attrs_for_bind
296             bind_attribute_by_data_type
297             transaction_depth
298             _dbh
299             _select_args
300             _dbh_execute_for_fetch
301             _sql_maker
302             _dbh_execute_inserts_with_no_binds
303             _select_args_to_query
304             _gen_sql_bind
305             _svp_generate_name
306             _normalize_connect_info
307             _parse_connect_do
308             savepoints
309             _sql_maker_opts
310             _use_multicolumn_in
311             _conn_pid
312             _dbh_autocommit
313             _native_data_type
314             _get_dbh
315             sql_maker_class
316             insert_bulk
317             _insert_bulk
318             _execute
319             _do_query
320             _dbh_execute
321             /, Class::MOP::Class->initialize('DBIx::Class::Storage::DBIHacks')->get_method_list ],
322             reader => [qw/
323             select
324             select_single
325             columns_info_for
326             _dbh_columns_info_for
327             _select
328             /],
329             unimplemented => [qw/
330             _arm_global_destructor
331             _verify_pid
332              
333             get_use_dbms_capability
334             set_use_dbms_capability
335             get_dbms_capability
336             set_dbms_capability
337             _dbh_details
338             _dbh_get_info
339              
340             _determine_connector_driver
341             _extract_driver_from_connect_info
342             _describe_connection
343             _warn_undetermined_driver
344              
345             sql_limit_dialect
346             sql_quote_char
347             sql_name_sep
348              
349             _prefetch_autovalues
350             _perform_autoinc_retrieval
351             _autoinc_supplied_for_op
352              
353             _resolve_bindattrs
354              
355             _max_column_bytesize
356             _is_lob_type
357             _is_binary_lob_type
358             _is_binary_type
359             _is_text_lob_type
360              
361             _prepare_sth
362             _bind_sth_params
363             /,(
364             # the capability framework
365             # not sure if CMOP->initialize does evil things to DBIC::S::DBI, fix if a problem
366             grep
367             { $_ =~ /^ _ (?: use | supports | determine_supports ) _ /x and $_ ne '_use_multicolumn_in' }
368             ( Class::MOP::Class->initialize('DBIx::Class::Storage::DBI')->get_all_method_names )
369             )],
370             };
371              
372             if (DBIx::Class::_ENV_::DBICTEST) {
373              
374             my $seen;
375             for my $type (keys %$method_dispatch) {
376             for (@{$method_dispatch->{$type}}) {
377             push @{$seen->{$_}}, $type;
378             }
379             }
380              
381             if (my @dupes = grep { @{$seen->{$_}} > 1 } keys %$seen) {
382             die(join "\n", '',
383             'The following methods show up multiple times in ::Storage::DBI::Replicated handlers:',
384             (map { "$_: " . (join ', ', @{$seen->{$_}}) } sort @dupes),
385             '',
386             );
387             }
388              
389             if (my @cant = grep { ! DBIx::Class::Storage::DBI->can($_) } keys %$seen) {
390             die(join "\n", '',
391             '::Storage::DBI::Replicated specifies handling of the following *NON EXISTING* ::Storage::DBI methods:',
392             @cant,
393             '',
394             );
395             }
396             }
397              
398             for my $method (@{$method_dispatch->{unimplemented}}) {
399             __PACKAGE__->meta->add_method($method, sub {
400             my $self = shift;
401             $self->throw_exception("$method() must not be called on ".(blessed $self).' objects');
402             });
403             }
404              
405             =head2 read_handler
406              
407             Defines an object that implements the read side of L.
408              
409             =cut
410              
411             has 'read_handler' => (
412             is=>'rw',
413             isa=>Object,
414             lazy_build=>1,
415             handles=>$method_dispatch->{reader},
416             );
417              
418             =head2 write_handler
419              
420             Defines an object that implements the write side of L,
421             as well as methods that don't write or read that can be called on only one
422             storage, methods that return a C<$dbh>, and any methods that don't make sense to
423             run on a replicant.
424              
425             =cut
426              
427             has 'write_handler' => (
428             is=>'ro',
429             isa=>Object,
430             lazy_build=>1,
431             handles=>$method_dispatch->{writer},
432             );
433              
434              
435              
436             has _master_connect_info_opts =>
437             (is => 'rw', isa => HashRef, default => sub { {} });
438              
439             =head2 around: connect_info
440              
441             Preserves master's C options (for merging with replicants.)
442             Also sets any Replicated-related options from connect_info, such as
443             C, C, C and C.
444              
445             =cut
446              
447             around connect_info => sub {
448             my ($next, $self, $info, @extra) = @_;
449              
450             $self->throw_exception(
451             'connect_info can not be retrieved from a replicated storage - '
452             . 'accessor must be called on a specific pool instance'
453             ) unless defined $info;
454              
455             my $merge = Hash::Merge->new('LEFT_PRECEDENT');
456              
457             my %opts;
458             for my $arg (@$info) {
459             next unless (reftype($arg)||'') eq 'HASH';
460             %opts = %{ $merge->merge($arg, \%opts) };
461             }
462             delete $opts{dsn};
463              
464             if (@opts{qw/pool_type pool_args/}) {
465             $self->pool_type(delete $opts{pool_type})
466             if $opts{pool_type};
467              
468             $self->pool_args(
469             $merge->merge((delete $opts{pool_args} || {}), $self->pool_args)
470             );
471              
472             ## Since we possibly changed the pool_args, we need to clear the current
473             ## pool object so that next time it is used it will be rebuilt.
474             $self->clear_pool;
475             }
476              
477             if (@opts{qw/balancer_type balancer_args/}) {
478             $self->balancer_type(delete $opts{balancer_type})
479             if $opts{balancer_type};
480              
481             $self->balancer_args(
482             $merge->merge((delete $opts{balancer_args} || {}), $self->balancer_args)
483             );
484              
485             $self->balancer($self->_build_balancer)
486             if $self->balancer;
487             }
488              
489             $self->_master_connect_info_opts(\%opts);
490              
491             return preserve_context {
492             $self->$next($info, @extra);
493             } after => sub {
494             # Make sure master is blessed into the correct class and apply role to it.
495             my $master = $self->master;
496             $master->_determine_driver;
497             Moose::Meta::Class->initialize(ref $master);
498              
499             DBIx::Class::Storage::DBI::Replicated::WithDSN->meta->apply($master);
500              
501             # link pool back to master
502             $self->pool->master($master);
503             };
504             };
505              
506             =head1 METHODS
507              
508             This class defines the following methods.
509              
510             =head2 BUILDARGS
511              
512             L when instantiating its storage passed itself as the
513             first argument. So we need to massage the arguments a bit so that all the
514             bits get put into the correct places.
515              
516             =cut
517              
518             sub BUILDARGS {
519             my ($class, $schema, $storage_type_args, @args) = @_;
520              
521             return {
522             schema=>$schema,
523             %$storage_type_args,
524             @args
525             }
526             }
527              
528             =head2 _build_master
529              
530             Lazy builder for the L attribute.
531              
532             =cut
533              
534             sub _build_master {
535             my $self = shift @_;
536             my $master = DBIx::Class::Storage::DBI->new($self->schema);
537             $master
538             }
539              
540             =head2 _build_pool
541              
542             Lazy builder for the L attribute.
543              
544             =cut
545              
546             sub _build_pool {
547             my $self = shift @_;
548             $self->create_pool(%{$self->pool_args});
549             }
550              
551             =head2 _build_balancer
552              
553             Lazy builder for the L attribute. This takes a Pool object so that
554             the balancer knows which pool it's balancing.
555              
556             =cut
557              
558             sub _build_balancer {
559             my $self = shift @_;
560             $self->create_balancer(
561             pool=>$self->pool,
562             master=>$self->master,
563             %{$self->balancer_args},
564             );
565             }
566              
567             =head2 _build_write_handler
568              
569             Lazy builder for the L attribute. The default is to set this to
570             the L.
571              
572             =cut
573              
574             sub _build_write_handler {
575             return shift->master;
576             }
577              
578             =head2 _build_read_handler
579              
580             Lazy builder for the L attribute. The default is to set this to
581             the L.
582              
583             =cut
584              
585             sub _build_read_handler {
586             return shift->balancer;
587             }
588              
589             =head2 around: connect_replicants
590              
591             All calls to connect_replicants needs to have an existing $schema tacked onto
592             top of the args, since L needs it, and any
593             L
594             options merged with the master, with replicant opts having higher priority.
595              
596             =cut
597              
598             around connect_replicants => sub {
599             my ($next, $self, @args) = @_;
600              
601             for my $r (@args) {
602             $r = [ $r ] unless reftype $r eq 'ARRAY';
603              
604             $self->throw_exception('coderef replicant connect_info not supported')
605             if ref $r->[0] && reftype $r->[0] eq 'CODE';
606              
607             # any connect_info options?
608             my $i = 0;
609             $i++ while $i < @$r && (reftype($r->[$i])||'') ne 'HASH';
610              
611             # make one if none
612             $r->[$i] = {} unless $r->[$i];
613              
614             # merge if two hashes
615             my @hashes = @$r[$i .. $#{$r}];
616              
617             $self->throw_exception('invalid connect_info options')
618             if (grep { reftype($_) eq 'HASH' } @hashes) != @hashes;
619              
620             $self->throw_exception('too many hashrefs in connect_info')
621             if @hashes > 2;
622              
623             my $merge = Hash::Merge->new('LEFT_PRECEDENT');
624             my %opts = %{ $merge->merge(reverse @hashes) };
625              
626             # delete them
627             splice @$r, $i+1, ($#{$r} - $i), ();
628              
629             # make sure master/replicants opts don't clash
630             my %master_opts = %{ $self->_master_connect_info_opts };
631             if (exists $opts{dbh_maker}) {
632             delete @master_opts{qw/dsn user password/};
633             }
634             delete $master_opts{dbh_maker};
635              
636             # merge with master
637             %opts = %{ $merge->merge(\%opts, \%master_opts) };
638              
639             # update
640             $r->[$i] = \%opts;
641             }
642              
643             $self->$next($self->schema, @args);
644             };
645              
646             =head2 all_storages
647              
648             Returns an array of all the connected storage backends. The first element
649             in the returned array is the master, and the rest are each of the
650             replicants.
651              
652             =cut
653              
654             sub all_storages {
655             my $self = shift @_;
656             return grep {defined $_ && blessed $_} (
657             $self->master,
658             values %{ $self->replicants },
659             );
660             }
661              
662             =head2 execute_reliably ($coderef, ?@args)
663              
664             Given a coderef, saves the current state of the L, forces it to
665             use reliable storage (e.g. sets it to the master), executes a coderef and then
666             restores the original state.
667              
668             Example:
669              
670             my $reliably = sub {
671             my $name = shift @_;
672             $schema->resultset('User')->create({name=>$name});
673             my $user_rs = $schema->resultset('User')->find({name=>$name});
674             return $user_rs;
675             };
676              
677             my $user_rs = $schema->storage->execute_reliably($reliably, 'John');
678              
679             Use this when you must be certain of your database state, such as when you just
680             inserted something and need to get a resultset including it, etc.
681              
682             =cut
683              
684             sub execute_reliably {
685             my $self = shift;
686             my $coderef = shift;
687              
688             $self->throw_exception('Second argument must be a coderef')
689             unless( ref $coderef eq 'CODE');
690              
691             ## replace the current read handler for the remainder of the scope
692             local $self->{read_handler} = $self->master;
693              
694             &$coderef;
695             }
696              
697             =head2 set_reliable_storage
698              
699             Sets the current $schema to be 'reliable', that is all queries, both read and
700             write are sent to the master
701              
702             =cut
703              
704             sub set_reliable_storage {
705             my $self = shift @_;
706             my $schema = $self->schema;
707             my $write_handler = $self->schema->storage->write_handler;
708              
709             $schema->storage->read_handler($write_handler);
710             }
711              
712             =head2 set_balanced_storage
713              
714             Sets the current $schema to be use the for all reads, while all
715             writes are sent to the master only
716              
717             =cut
718              
719             sub set_balanced_storage {
720             my $self = shift @_;
721             my $schema = $self->schema;
722             my $balanced_handler = $self->schema->storage->balancer;
723              
724             $schema->storage->read_handler($balanced_handler);
725             }
726              
727             =head2 connected
728              
729             Check that the master and at least one of the replicants is connected.
730              
731             =cut
732              
733             sub connected {
734             my $self = shift @_;
735             return
736             $self->master->connected &&
737             $self->pool->connected_replicants;
738             }
739              
740             =head2 ensure_connected
741              
742             Make sure all the storages are connected.
743              
744             =cut
745              
746             sub ensure_connected {
747             my $self = shift @_;
748             foreach my $source ($self->all_storages) {
749             $source->ensure_connected(@_);
750             }
751             }
752              
753             =head2 limit_dialect
754              
755             Set the limit_dialect for all existing storages
756              
757             =cut
758              
759             sub limit_dialect {
760             my $self = shift @_;
761             foreach my $source ($self->all_storages) {
762             $source->limit_dialect(@_);
763             }
764             return $self->master->limit_dialect;
765             }
766              
767             =head2 quote_char
768              
769             Set the quote_char for all existing storages
770              
771             =cut
772              
773             sub quote_char {
774             my $self = shift @_;
775             foreach my $source ($self->all_storages) {
776             $source->quote_char(@_);
777             }
778             return $self->master->quote_char;
779             }
780              
781             =head2 name_sep
782              
783             Set the name_sep for all existing storages
784              
785             =cut
786              
787             sub name_sep {
788             my $self = shift @_;
789             foreach my $source ($self->all_storages) {
790             $source->name_sep(@_);
791             }
792             return $self->master->name_sep;
793             }
794              
795             =head2 set_schema
796              
797             Set the schema object for all existing storages
798              
799             =cut
800              
801             sub set_schema {
802             my $self = shift @_;
803             foreach my $source ($self->all_storages) {
804             $source->set_schema(@_);
805             }
806             }
807              
808             =head2 debug
809              
810             set a debug flag across all storages
811              
812             =cut
813              
814             sub debug {
815             my $self = shift @_;
816             if(@_) {
817             foreach my $source ($self->all_storages) {
818             $source->debug(@_);
819             }
820             }
821             return $self->master->debug;
822             }
823              
824             =head2 debugobj
825              
826             set a debug object
827              
828             =cut
829              
830             sub debugobj {
831             my $self = shift @_;
832             return $self->master->debugobj(@_);
833             }
834              
835             =head2 debugfh
836              
837             set a debugfh object
838              
839             =cut
840              
841             sub debugfh {
842             my $self = shift @_;
843             return $self->master->debugfh(@_);
844             }
845              
846             =head2 debugcb
847              
848             set a debug callback
849              
850             =cut
851              
852             sub debugcb {
853             my $self = shift @_;
854             return $self->master->debugcb(@_);
855             }
856              
857             =head2 disconnect
858              
859             disconnect everything
860              
861             =cut
862              
863             sub disconnect {
864             my $self = shift @_;
865             foreach my $source ($self->all_storages) {
866             $source->disconnect(@_);
867             }
868             }
869              
870             =head2 cursor_class
871              
872             set cursor class on all storages, or return master's
873              
874             =cut
875              
876             sub cursor_class {
877             my ($self, $cursor_class) = @_;
878              
879             if ($cursor_class) {
880             $_->cursor_class($cursor_class) for $self->all_storages;
881             }
882             $self->master->cursor_class;
883             }
884              
885             =head2 cursor
886              
887             set cursor class on all storages, or return master's, alias for L
888             above.
889              
890             =cut
891              
892             sub cursor {
893             my ($self, $cursor_class) = @_;
894              
895             if ($cursor_class) {
896             $_->cursor($cursor_class) for $self->all_storages;
897             }
898             $self->master->cursor;
899             }
900              
901             =head2 unsafe
902              
903             sets the L option on all storages or returns
904             master's current setting
905              
906             =cut
907              
908             sub unsafe {
909             my $self = shift;
910              
911             if (@_) {
912             $_->unsafe(@_) for $self->all_storages;
913             }
914              
915             return $self->master->unsafe;
916             }
917              
918             =head2 disable_sth_caching
919              
920             sets the L option on all storages
921             or returns master's current setting
922              
923             =cut
924              
925             sub disable_sth_caching {
926             my $self = shift;
927              
928             if (@_) {
929             $_->disable_sth_caching(@_) for $self->all_storages;
930             }
931              
932             return $self->master->disable_sth_caching;
933             }
934              
935             =head2 lag_behind_master
936              
937             returns the highest Replicant L
938             setting
939              
940             =cut
941              
942             sub lag_behind_master {
943             my $self = shift;
944              
945             return max map $_->lag_behind_master, $self->replicants;
946             }
947              
948             =head2 is_replicating
949              
950             returns true if all replicants return true for
951             L
952              
953             =cut
954              
955             sub is_replicating {
956             my $self = shift;
957              
958             return (grep $_->is_replicating, $self->replicants) == ($self->replicants);
959             }
960              
961             =head2 connect_call_datetime_setup
962              
963             calls L for all storages
964              
965             =cut
966              
967             sub connect_call_datetime_setup {
968             my $self = shift;
969             $_->connect_call_datetime_setup for $self->all_storages;
970             }
971              
972             sub _populate_dbh {
973             my $self = shift;
974             $_->_populate_dbh for $self->all_storages;
975             }
976              
977             sub _connect {
978             my $self = shift;
979             $_->_connect for $self->all_storages;
980             }
981              
982             sub _rebless {
983             my $self = shift;
984             $_->_rebless for $self->all_storages;
985             }
986              
987             sub _determine_driver {
988             my $self = shift;
989             $_->_determine_driver for $self->all_storages;
990             }
991              
992             sub _driver_determined {
993             my $self = shift;
994              
995             if (@_) {
996             $_->_driver_determined(@_) for $self->all_storages;
997             }
998              
999             return $self->master->_driver_determined;
1000             }
1001              
1002             sub _init {
1003             my $self = shift;
1004              
1005             $_->_init for $self->all_storages;
1006             }
1007              
1008             sub _run_connection_actions {
1009             my $self = shift;
1010              
1011             $_->_run_connection_actions for $self->all_storages;
1012             }
1013              
1014             sub _do_connection_actions {
1015             my $self = shift;
1016              
1017             if (@_) {
1018             $_->_do_connection_actions(@_) for $self->all_storages;
1019             }
1020             }
1021              
1022             sub connect_call_do_sql {
1023             my $self = shift;
1024             $_->connect_call_do_sql(@_) for $self->all_storages;
1025             }
1026              
1027             sub disconnect_call_do_sql {
1028             my $self = shift;
1029             $_->disconnect_call_do_sql(@_) for $self->all_storages;
1030             }
1031              
1032             sub _seems_connected {
1033             my $self = shift;
1034              
1035             return min map $_->_seems_connected, $self->all_storages;
1036             }
1037              
1038             sub _ping {
1039             my $self = shift;
1040              
1041             return min map $_->_ping, $self->all_storages;
1042             }
1043              
1044             # not using the normalized_version, because we want to preserve
1045             # version numbers much longer than the conventional xxx.yyyzzz
1046             my $numify_ver = sub {
1047             my $ver = shift;
1048             my @numparts = split /\D+/, $ver;
1049             my $format = '%d.' . (join '', ('%06d') x (@numparts - 1));
1050              
1051             return sprintf $format, @numparts;
1052             };
1053             sub _server_info {
1054             my $self = shift;
1055              
1056             if (not $self->_dbh_details->{info}) {
1057             $self->_dbh_details->{info} = (
1058             reduce { $a->[0] < $b->[0] ? $a : $b }
1059             map [ $numify_ver->($_->{dbms_version}), $_ ],
1060             map $_->_server_info, $self->all_storages
1061             )->[1];
1062             }
1063              
1064             return $self->next::method;
1065             }
1066              
1067             sub _get_server_version {
1068             my $self = shift;
1069              
1070             return $self->_server_info->{dbms_version};
1071             }
1072              
1073             =head1 GOTCHAS
1074              
1075             Due to the fact that replicants can lag behind a master, you must take care to
1076             make sure you use one of the methods to force read queries to a master should
1077             you need realtime data integrity. For example, if you insert a row, and then
1078             immediately re-read it from the database (say, by doing
1079             L<< $result->discard_changes|DBIx::Class::Row/discard_changes >>)
1080             or you insert a row and then immediately build a query that expects that row
1081             to be an item, you should force the master to handle reads. Otherwise, due to
1082             the lag, there is no certainty your data will be in the expected state.
1083              
1084             For data integrity, all transactions automatically use the master storage for
1085             all read and write queries. Using a transaction is the preferred and recommended
1086             method to force the master to handle all read queries.
1087              
1088             Otherwise, you can force a single query to use the master with the 'force_pool'
1089             attribute:
1090              
1091             my $result = $resultset->search(undef, {force_pool=>'master'})->find($pk);
1092              
1093             This attribute will safely be ignored by non replicated storages, so you can use
1094             the same code for both types of systems.
1095              
1096             Lastly, you can use the L method, which works very much like
1097             a transaction.
1098              
1099             For debugging, you can turn replication on/off with the methods L
1100             and L, however this operates at a global level and is not
1101             suitable if you have a shared Schema object being used by multiple processes,
1102             such as on a web application server. You can get around this limitation by
1103             using the Schema clone method.
1104              
1105             my $new_schema = $schema->clone;
1106             $new_schema->set_reliable_storage;
1107              
1108             ## $new_schema will use only the Master storage for all reads/writes while
1109             ## the $schema object will use replicated storage.
1110              
1111             =head1 FURTHER QUESTIONS?
1112              
1113             Check the list of L.
1114              
1115             =head1 COPYRIGHT AND LICENSE
1116              
1117             This module is free software L
1118             by the L. You can
1119             redistribute it and/or modify it under the same terms as the
1120             L.
1121              
1122             =cut
1123              
1124             __PACKAGE__->meta->make_immutable;
1125              
1126             1;