File Coverage

blib/lib/DBD/Gofer/Transport/Base.pm
Criterion Covered Total %
statement 120 136 88.2
branch 59 92 64.1
condition 10 24 41.6
subroutine 14 16 87.5
pod 1 8 12.5
total 204 276 73.9


line stmt bran cond sub pod time code
1             package DBD::Gofer::Transport::Base;
2              
3             # $Id: Base.pm 14120 2010-06-07 19:52:19Z H.Merijn $
4             #
5             # Copyright (c) 2007, Tim Bunce, Ireland
6             #
7             # You may distribute under the terms of either the GNU General Public
8             # License or the Artistic License, as specified in the Perl README file.
9              
10 56     56   410 use strict;
  56         99  
  56         1367  
11 56     56   256 use warnings;
  56         122  
  56         1444  
12              
13 56     56   260 use base qw(DBI::Gofer::Transport::Base);
  56         93  
  56         15858  
14              
15             our $VERSION = "0.014121";
16              
17             __PACKAGE__->mk_accessors(qw(
18             trace
19             go_dsn
20             go_url
21             go_policy
22             go_timeout
23             go_retry_hook
24             go_retry_limit
25             go_cache
26             cache_hit
27             cache_miss
28             cache_store
29             ));
30             __PACKAGE__->mk_accessors_using(make_accessor_autoviv_hashref => qw(
31             meta
32             ));
33              
34              
35             sub new {
36 715     715 0 2867 my ($class, $args) = @_;
37 715         3703 $args->{$_} = 0 for (qw(cache_hit cache_miss cache_store));
38 715 100 50     2147 $args->{keep_meta_frozen} ||= 1 if $args->{go_cache};
39             #warn "args @{[ %$args ]}\n";
40 715         3340 return $class->SUPER::new($args);
41             }
42              
43              
44 715 50   715   4521 sub _init_trace { $ENV{DBD_GOFER_TRACE} || 0 }
45              
46              
47             sub new_response {
48 0     0 0 0 my $self = shift;
49 0         0 return DBI::Gofer::Response->new(@_);
50             }
51              
52              
53             sub transmit_request {
54 7138     7138 0 13531 my ($self, $request) = @_;
55 7138         15757 my $trace = $self->trace;
56 7138         10292 my $response;
57              
58 7138         10380 my ($go_cache, $request_cache_key);
59 7138 100       15183 if ($go_cache = $self->{go_cache}) {
60             $request_cache_key
61             = $request->{meta}{request_cache_key}
62 18         47 = $self->get_cache_key_for_request($request);
63 18 50       37 if ($request_cache_key) {
64 18         27 my $frozen_response = eval { $go_cache->get($request_cache_key) };
  18         65  
65 18 100       37 if ($frozen_response) {
66 4 50       11 $self->_dump("cached response found for ".ref($request), $request)
67             if $trace;
68 4         10 $response = $self->thaw_response($frozen_response);
69 4 50       14 $self->trace_msg("transmit_request is returning a response from cache $go_cache\n")
70             if $trace;
71 4         8 ++$self->{cache_hit};
72 4         15 return $response;
73             }
74 14 50       29 warn $@ if $@;
75 14         23 ++$self->{cache_miss};
76 14 50       36 $self->trace_msg("transmit_request cache miss\n")
77             if $trace;
78             }
79             }
80              
81 7134         14491 my $to = $self->go_timeout;
82             my $transmit_sub = sub {
83 7214 50   7214   13661 $self->trace_msg("transmit_request\n") if $trace;
84 7214 100       20884 local $SIG{ALRM} = sub { die "TIMEOUT\n" } if $to;
  0         0  
85              
86 7214         11909 my $response = eval {
87             local $SIG{PIPE} = sub {
88 0 0       0 my $extra = ($! eq "Broken pipe") ? "" : " ($!)";
89 0         0 die "Unable to send request: Broken pipe$extra\n";
90 7214         66708 };
91 7214 100       23844 alarm($to) if $to;
92 7214         19672 $self->transmit_request_by_transport($request);
93             };
94 7214 100       21127 alarm(0) if $to;
95              
96 7214 50       14471 if ($@) {
97 0 0       0 return $self->transport_timedout("transmit_request", $to)
98             if $@ eq "TIMEOUT\n";
99 0         0 return $self->new_response({ err => 1, errstr => $@ });
100             }
101              
102 7214         28223 return $response;
103 7134         34499 };
104              
105 7134         16617 $response = $self->_transmit_request_with_retries($request, $transmit_sub);
106              
107 7134 50       14403 if ($response) {
108 0         0 my $frozen_response = delete $response->{meta}{frozen};
109 0 0       0 $self->_store_response_in_cache($frozen_response, $request_cache_key)
110             if $request_cache_key;
111             }
112              
113 7134 50 33     17258 $self->trace_msg("transmit_request is returning a response itself\n")
114             if $trace && $response;
115              
116 7134 50       14548 return $response unless wantarray;
117 7134         24103 return ($response, $transmit_sub);
118             }
119              
120              
121             sub _transmit_request_with_retries {
122 7214     7214   12282 my ($self, $request, $transmit_sub) = @_;
123 7214         9160 my $response;
124 7214   33     9493 do {
125 7214         12006 $response = $transmit_sub->();
126             } while ( $response && $self->response_needs_retransmit($request, $response) );
127 7214         14507 return $response;
128             }
129              
130              
131             sub receive_response {
132 7134     7134 0 13740 my ($self, $request, $retransmit_sub) = @_;
133 7134         17518 my $to = $self->go_timeout;
134              
135             my $receive_sub = sub {
136 7217     7217   22723 $self->trace_msg("receive_response\n");
137 7217 100       22910 local $SIG{ALRM} = sub { die "TIMEOUT\n" } if $to;
  0         0  
138              
139 7217         11016 my $response = eval {
140 7217 100       14903 alarm($to) if $to;
141 7217         18742 $self->receive_response_by_transport($request);
142             };
143 7217 100       17211 alarm(0) if $to;
144              
145 7217 50       13487 if ($@) {
146 0 0       0 return $self->transport_timedout("receive_response", $to)
147             if $@ eq "TIMEOUT\n";
148 0         0 return $self->new_response({ err => 1, errstr => $@ });
149             }
150 7217         19825 return $response;
151 7134         36115 };
152              
153 7134         11431 my $response;
154 7134         9822 do {
155 7137         12470 $response = $receive_sub->();
156 7137 100       17839 if ($self->response_needs_retransmit($request, $response)) {
157 80         152 $response = $self->_transmit_request_with_retries($request, $retransmit_sub);
158 80   33     231 $response ||= $receive_sub->();
159             }
160             } while ( $self->response_needs_retransmit($request, $response) );
161              
162 7134 50       15004 if ($response) {
163 7134         13813 my $frozen_response = delete $response->{meta}{frozen};
164 7134         13888 my $request_cache_key = $request->{meta}{request_cache_key};
165             $self->_store_response_in_cache($frozen_response, $request_cache_key)
166 7134 50 66     14647 if $request_cache_key && $self->{go_cache};
167             }
168              
169 7134         52120 return $response;
170             }
171              
172              
173             sub response_retry_preference {
174 3121     3121 1 4383 my ($self, $request, $response) = @_;
175              
176             # give the user a chance to express a preference (or undef for default)
177 3121 100       5166 if (my $go_retry_hook = $self->go_retry_hook) {
178 204         447 my $retry = $go_retry_hook->($request, $response, $self);
179 204 50       1187 $self->trace_msg(sprintf "go_retry_hook returned %s\n",
180             (defined $retry) ? $retry : 'undef');
181 204 50       649 return $retry if defined $retry;
182             }
183              
184             # This is the main decision point. We don't retry requests that got
185             # as far as executing because the error is probably from the database
186             # (not transport) so retrying is unlikely to help. But note that any
187             # severe transport error occurring after execute is likely to return
188             # a new response object that doesn't have the execute flag set. Beware!
189 2917 100       6806 return 0 if $response->executed_flag_set;
190              
191 2857 100 50     5122 return 1 if ($response->errstr || '') =~ m/induced by DBI_GOFER_RANDOM/;
192              
193 196 100       655 return 1 if $request->is_idempotent; # i.e. is SELECT or ReadOnly was set
194              
195 112         254 return undef; # we couldn't make up our mind
196             }
197              
198              
199             sub response_needs_retransmit {
200 14274     14274 0 24891 my ($self, $request, $response) = @_;
201              
202 14274 100       29924 my $err = $response->err
203             or return 0; # nothing went wrong
204              
205 3121         6489 my $retry = $self->response_retry_preference($request, $response);
206              
207 3121 100       5541 if (!$retry) { # false or undef
208 286         826 $self->trace_msg("response_needs_retransmit: response not suitable for retry\n");
209 286         949 return 0;
210             }
211              
212             # we'd like to retry but have we retried too much already?
213              
214 2835         4790 my $retry_limit = $self->go_retry_limit;
215 2835 100       4545 if (!$retry_limit) {
216 2720         5538 $self->trace_msg("response_needs_retransmit: retries disabled (retry_limit not set)\n");
217 2720         7217 return 0;
218             }
219              
220 115         258 my $request_meta = $request->meta;
221 115   100     334 my $retry_count = $request_meta->{retry_count} || 0;
222 115 100       247 if ($retry_count >= $retry_limit) {
223 32         102 $self->trace_msg("response_needs_retransmit: $retry_count is too many retries\n");
224             # XXX should be possible to disable altering the err
225 32         88 $response->errstr(sprintf "%s (after %d retries by gofer)", $response->errstr, $retry_count);
226 32         92 return 0;
227             }
228              
229             # will retry now, do the admin
230 83         118 ++$retry_count;
231 83         283 $self->trace_msg("response_needs_retransmit: retry $retry_count\n");
232              
233             # hook so response_retry_preference can defer some code execution
234             # until we've checked retry_count and retry_limit.
235 83 50       249 if (ref $retry eq 'CODE') {
236 0 0       0 $retry->($retry_count, $retry_limit)
237             and warn "should return false"; # protect future use
238             }
239              
240 83         158 ++$request_meta->{retry_count}; # update count for this request object
241 83         192 ++$self->meta->{request_retry_count}; # update cumulative transport stats
242              
243 83         202 return 1;
244             }
245              
246              
247             sub transport_timedout {
248 0     0 0 0 my ($self, $method, $timeout) = @_;
249 0   0     0 $timeout ||= $self->go_timeout;
250 0         0 return $self->new_response({ err => 1, errstr => "DBD::Gofer $method timed-out after $timeout seconds" });
251             }
252              
253              
254             # return undef if we don't want to cache this request
255             # subclasses may use more specialized rules
256             sub get_cache_key_for_request {
257 18     18 0 35 my ($self, $request) = @_;
258              
259             # we only want to cache idempotent requests
260             # is_idempotent() is true if GOf_REQUEST_IDEMPOTENT or GOf_REQUEST_READONLY set
261 18 50       47 return undef if not $request->is_idempotent;
262              
263             # XXX would be nice to avoid the extra freeze here
264 18         66 my $key = $self->freeze_request($request, undef, 1);
265              
266             #use Digest::MD5; warn "get_cache_key_for_request: ".Digest::MD5::md5_base64($key)."\n";
267              
268 18         46 return $key;
269             }
270              
271              
272             sub _store_response_in_cache {
273 14     14   29 my ($self, $frozen_response, $request_cache_key) = @_;
274             my $go_cache = $self->{go_cache}
275 14 50       38 or return;
276              
277             # new() ensures that enabling go_cache also enables keep_meta_frozen
278 14 50       32 warn "No meta frozen in response" if !$frozen_response;
279 14 50       29 warn "No request_cache_key" if !$request_cache_key;
280              
281 14 50 33     49 if ($frozen_response && $request_cache_key) {
282 14         73 $self->trace_msg("receive_response added response to cache $go_cache\n");
283 14         28 eval { $go_cache->set($request_cache_key, $frozen_response) };
  14         55  
284 14 50       34 warn $@ if $@;
285 14         38 ++$self->{cache_store};
286             }
287             }
288              
289             1;
290              
291             __END__