File Coverage

blib/lib/DBIx/Aurora/Cluster.pm
Criterion Covered Total %
statement 22 105 20.9
branch 0 42 0.0
condition 0 12 0.0
subroutine 7 18 38.8
pod 0 8 0.0
total 29 185 15.6


line stmt bran cond sub pod time code
1             package DBIx::Aurora::Cluster;
2 1     1   3 use strict;
  1         1  
  1         22  
3 1     1   2 use warnings;
  1         1  
  1         17  
4 1     1   3 use Carp;
  1         2  
  1         39  
5 1     1   454 use Time::HiRes;
  1         979  
  1         4  
6 1     1   78 use List::Util 'shuffle';
  1         1  
  1         82  
7 1     1   317 use DBIx::Aurora::Instance;
  1         2  
  1         37  
8             use constant {
9 1         1 map { ($_ => $_) } qw(
  2         747  
10             ERROR_TYPE_CONNECT_WRITER
11             ERROR_TYPE_CONNECT_READER
12             )
13 1     1   4 };
  1         2  
14              
15             sub new {
16 0     0 0   my ($class, $instances, $opts) = @_;
17             my $self = bless {
18             force_reader_only => 0,
19             reconnect_interval => 5,
20       0     logger => sub {},
21 0           %$opts,
22             instances => { },
23             }, $class;
24              
25 0           for (my $i = 0; $i < @$instances; $i++) {
26 0           my $instance = $instances->[$i];
27 0           my $aurora_instance = DBIx::Aurora::Instance->new(@$instance);
28 0           $self->{instances}{$i} = {
29             instance => $aurora_instance,
30             };
31             }
32              
33 0           $self;
34             }
35              
36             sub log {
37 0     0 0   my ($self, $error_type, $message, $exception) = @_;
38 0           $self->{logger}->($error_type, $message, $exception);
39             }
40              
41 0     0 0   sub aurora_instances { map { $_->{instance} } values %{$_[0]->{instances}} }
  0            
  0            
42              
43             sub _update_connectivity {
44 0     0     my ($self, $instance) = @_;
45              
46 0 0         if ($instance->connected_at + $self->{reconnect_interval} < Time::HiRes::time) {
47 0           $instance->disconnect;
48 0           $instance->handler;
49             }
50             }
51              
52             sub writer {
53 0     0 0   my ($self, $callback) = @_;
54 0           my $wantarray = wantarray;
55              
56 0           my @maybe_readers = grep { $_->maybe_reader } $self->aurora_instances;
  0            
57 0           my @maybe_writers = grep { $_->maybe_writer } $self->aurora_instances;
  0            
58              
59 0           my $instance;
60 0           for my $i (shuffle(@maybe_writers), shuffle(@maybe_readers)) {
61 0 0         next if $i->is_unreachable;
62              
63 0           eval { $self->_update_connectivity($i) };
  0            
64 0 0         if (my $e = $@) {
65 0           $self->log(ERROR_TYPE_CONNECT_WRITER, "Aurora Writer connection error", $e);
66              
67 0 0 0       if (ref $e && $e->isa('DBIx::Aurora::Instance::Exception::Connectivity')) {
68 0           next; # connectivity issue, try next reader
69             } else {
70 0           Carp::croak($e);
71             }
72             } else {
73 0 0         if ($i->is_writer) {
74 0           $instance = $i;
75 0           last;
76             } else {
77 0           next;
78             }
79             }
80             }
81              
82 0 0         unless ($instance) {
83 0           Carp::croak "No writer found";
84             }
85              
86 0           my @ret = eval {
87 0 0         $wantarray
88             ? $instance->handler->txn($callback)
89             : scalar $instance->handler->txn($callback);
90             };
91 0 0         if (my $e = $@) {
92 0           Carp::croak($e);
93             }
94              
95 0 0         return $wantarray ? @ret : $ret[0];
96             }
97              
98             sub reader {
99 0     0 0   my ($self, $callback) = @_;
100 0           my $wantarray = wantarray;
101              
102 0           my @is_reader = grep { $_->is_reader } $self->aurora_instances;
  0            
103 0           my @maybe_readers = grep { $_->maybe_reader } $self->aurora_instances;
  0            
104 0           my @maybe_writers = grep { $_->maybe_writer } $self->aurora_instances;
  0            
105              
106 0           my $instance;
107 0           for my $i (shuffle(@is_reader), shuffle(@maybe_readers), shuffle(@maybe_writers)) {
108 0 0         next if $i->is_unreachable;
109              
110 0           eval { $self->_update_connectivity($i) };
  0            
111 0 0         if (my $e = $@) {
    0          
112 0           $self->log(ERROR_TYPE_CONNECT_READER, "Aurora Reader connection error", $e);
113 0 0 0       if (ref $e && $e->isa('DBIx::Aurora::Instance::Exception::Connectivity')) {
114 0           next; # connectivity issue, try next reader
115             } else {
116 0           Carp::croak($e);
117             }
118             } elsif ($i->is_reader) {
119 0           $instance = $i;
120 0           last;
121             } else {
122 0           next; # it's writer
123             }
124             }
125              
126 0 0         if (not $instance) {
127 0 0         if ($self->{force_reader_only}) {
128 0           Carp::croak "No reader found";
129             } else {
130 0           return $self->writer($callback); # fallback to writer
131             }
132             }
133              
134 0           my @ret = eval {
135 0 0         $wantarray
136             ? $instance->handler->txn($callback)
137             : scalar $instance->handler->txn($callback);
138             };
139 0 0         if (my $e = $@) {
140 0           Carp::croak($e);
141             }
142              
143 0 0         return $wantarray ? @ret : $ret[0];
144             }
145              
146             sub disconnect_all {
147 0     0 0   my $self = shift;
148 0           for my $instance ($self->aurora_instances) {
149 0 0         $instance && $instance->disconnect;
150             }
151             }
152              
153             sub disconnect_writer {
154 0     0 0   my $self = shift;
155 0           for my $instance ($self->aurora_instances) {
156 0 0 0       $instance && $instance->maybe_writer && $instance->disconnect;
157             }
158             }
159              
160             sub disconnect_reader {
161 0     0 0   my $self = shift;
162 0           for my $instance ($self->aurora_instances) {
163 0 0 0       $instance && $instance->maybe_reader && $instance->disconnect;
164             }
165             }
166              
167 0     0     sub DESTROY { $_[0]->disconnect_all }
168              
169             1;
170             __END__