File Coverage

blib/lib/DBD/Gofer/Transport/Base.pm
Criterion Covered Total %
statement 120 136 88.2
branch 59 92 64.1
condition 10 24 41.6
subroutine 14 16 87.5
pod 1 8 12.5
total 204 276 73.9


line stmt bran cond sub pod time code
1             package DBD::Gofer::Transport::Base;
2              
3             # $Id: Base.pm 14120 2010-06-07 19:52:19Z H.Merijn $
4             #
5             # Copyright (c) 2007, Tim Bunce, Ireland
6             #
7             # You may distribute under the terms of either the GNU General Public
8             # License or the Artistic License, as specified in the Perl README file.
9              
10 56     56   415 use strict;
  56         106  
  56         1460  
11 56     56   266 use warnings;
  56         1084  
  56         1503  
12              
13 56     56   1165 use base qw(DBI::Gofer::Transport::Base);
  56         107  
  56         23237  
14              
15             our $VERSION = "0.014121";
16              
17             __PACKAGE__->mk_accessors(qw(
18             trace
19             go_dsn
20             go_url
21             go_policy
22             go_timeout
23             go_retry_hook
24             go_retry_limit
25             go_cache
26             cache_hit
27             cache_miss
28             cache_store
29             ));
30             __PACKAGE__->mk_accessors_using(make_accessor_autoviv_hashref => qw(
31             meta
32             ));
33              
34              
35             sub new {
36 715     715 0 3100 my ($class, $args) = @_;
37 715         4200 $args->{$_} = 0 for (qw(cache_hit cache_miss cache_store));
38 715 100 50     3014 $args->{keep_meta_frozen} ||= 1 if $args->{go_cache};
39             #warn "args @{[ %$args ]}\n";
40 715         4113 return $class->SUPER::new($args);
41             }
42              
43              
44 715 50   715   6219 sub _init_trace { $ENV{DBD_GOFER_TRACE} || 0 }
45              
46              
47             sub new_response {
48 0     0 0 0 my $self = shift;
49 0         0 return DBI::Gofer::Response->new(@_);
50             }
51              
52              
53             sub transmit_request {
54 7138     7138 0 15664 my ($self, $request) = @_;
55 7138         17265 my $trace = $self->trace;
56 7138         11867 my $response;
57              
58 7138         12310 my ($go_cache, $request_cache_key);
59 7138 100       18144 if ($go_cache = $self->{go_cache}) {
60             $request_cache_key
61             = $request->{meta}{request_cache_key}
62 18         79 = $self->get_cache_key_for_request($request);
63 18 50       58 if ($request_cache_key) {
64 18         39 my $frozen_response = eval { $go_cache->get($request_cache_key) };
  18         97  
65 18 100       54 if ($frozen_response) {
66 4 50       16 $self->_dump("cached response found for ".ref($request), $request)
67             if $trace;
68 4         19 $response = $self->thaw_response($frozen_response);
69 4 50       14 $self->trace_msg("transmit_request is returning a response from cache $go_cache\n")
70             if $trace;
71 4         11 ++$self->{cache_hit};
72 4         18 return $response;
73             }
74 14 50       55 warn $@ if $@;
75 14         36 ++$self->{cache_miss};
76 14 50       41 $self->trace_msg("transmit_request cache miss\n")
77             if $trace;
78             }
79             }
80              
81 7134         17566 my $to = $self->go_timeout;
82             my $transmit_sub = sub {
83 7214 50   7214   14824 $self->trace_msg("transmit_request\n") if $trace;
84 7214 100       28483 local $SIG{ALRM} = sub { die "TIMEOUT\n" } if $to;
  0         0  
85              
86 7214         13716 my $response = eval {
87             local $SIG{PIPE} = sub {
88 0 0       0 my $extra = ($! eq "Broken pipe") ? "" : " ($!)";
89 0         0 die "Unable to send request: Broken pipe$extra\n";
90 7214         133594 };
91 7214 100       33661 alarm($to) if $to;
92 7214         28818 $self->transmit_request_by_transport($request);
93             };
94 7214 100       32362 alarm(0) if $to;
95              
96 7214 50       16879 if ($@) {
97 0 0       0 return $self->transport_timedout("transmit_request", $to)
98             if $@ eq "TIMEOUT\n";
99 0         0 return $self->new_response({ err => 1, errstr => $@ });
100             }
101              
102 7214         44603 return $response;
103 7134         37628 };
104              
105 7134         21446 $response = $self->_transmit_request_with_retries($request, $transmit_sub);
106              
107 7134 50       16238 if ($response) {
108 0         0 my $frozen_response = delete $response->{meta}{frozen};
109 0 0       0 $self->_store_response_in_cache($frozen_response, $request_cache_key)
110             if $request_cache_key;
111             }
112              
113 7134 50 33     21015 $self->trace_msg("transmit_request is returning a response itself\n")
114             if $trace && $response;
115              
116 7134 50       17781 return $response unless wantarray;
117 7134         31890 return ($response, $transmit_sub);
118             }
119              
120              
121             sub _transmit_request_with_retries {
122 7214     7214   15298 my ($self, $request, $transmit_sub) = @_;
123 7214         10357 my $response;
124 7214   33     10219 do {
125 7214         12616 $response = $transmit_sub->();
126             } while ( $response && $self->response_needs_retransmit($request, $response) );
127 7214         18371 return $response;
128             }
129              
130              
131             sub receive_response {
132 7134     7134 0 17017 my ($self, $request, $retransmit_sub) = @_;
133 7134         21919 my $to = $self->go_timeout;
134              
135             my $receive_sub = sub {
136 7217     7217   28870 $self->trace_msg("receive_response\n");
137 7217 100       33312 local $SIG{ALRM} = sub { die "TIMEOUT\n" } if $to;
  0         0  
138              
139 7217         14223 my $response = eval {
140 7217 100       19443 alarm($to) if $to;
141 7217         24776 $self->receive_response_by_transport($request);
142             };
143 7217 100       22893 alarm(0) if $to;
144              
145 7217 50       16920 if ($@) {
146 0 0       0 return $self->transport_timedout("receive_response", $to)
147             if $@ eq "TIMEOUT\n";
148 0         0 return $self->new_response({ err => 1, errstr => $@ });
149             }
150 7217         28182 return $response;
151 7134         42099 };
152              
153 7134         13072 my $response;
154 7134         11765 do {
155 7137         14168 $response = $receive_sub->();
156 7137 100       23587 if ($self->response_needs_retransmit($request, $response)) {
157 80         170 $response = $self->_transmit_request_with_retries($request, $retransmit_sub);
158 80   33     267 $response ||= $receive_sub->();
159             }
160             } while ( $self->response_needs_retransmit($request, $response) );
161              
162 7134 50       16922 if ($response) {
163 7134         16292 my $frozen_response = delete $response->{meta}{frozen};
164 7134         17749 my $request_cache_key = $request->{meta}{request_cache_key};
165             $self->_store_response_in_cache($frozen_response, $request_cache_key)
166 7134 50 66     16934 if $request_cache_key && $self->{go_cache};
167             }
168              
169 7134         59738 return $response;
170             }
171              
172              
173             sub response_retry_preference {
174 3121     3121 1 4863 my ($self, $request, $response) = @_;
175              
176             # give the user a chance to express a preference (or undef for default)
177 3121 100       5825 if (my $go_retry_hook = $self->go_retry_hook) {
178 204         474 my $retry = $go_retry_hook->($request, $response, $self);
179 204 50       1181 $self->trace_msg(sprintf "go_retry_hook returned %s\n",
180             (defined $retry) ? $retry : 'undef');
181 204 50       606 return $retry if defined $retry;
182             }
183              
184             # This is the main decision point. We don't retry requests that got
185             # as far as executing because the error is probably from the database
186             # (not transport) so retrying is unlikely to help. But note that any
187             # severe transport error occurring after execute is likely to return
188             # a new response object that doesn't have the execute flag set. Beware!
189 2917 100       6505 return 0 if $response->executed_flag_set;
190              
191 2857 100 50     5334 return 1 if ($response->errstr || '') =~ m/induced by DBI_GOFER_RANDOM/;
192              
193 196 100       859 return 1 if $request->is_idempotent; # i.e. is SELECT or ReadOnly was set
194              
195 112         245 return undef; # we couldn't make up our mind
196             }
197              
198              
199             sub response_needs_retransmit {
200 14274     14274 0 27723 my ($self, $request, $response) = @_;
201              
202 14274 100       36770 my $err = $response->err
203             or return 0; # nothing went wrong
204              
205 3121         6690 my $retry = $self->response_retry_preference($request, $response);
206              
207 3121 100       6019 if (!$retry) { # false or undef
208 286         848 $self->trace_msg("response_needs_retransmit: response not suitable for retry\n");
209 286         977 return 0;
210             }
211              
212             # we'd like to retry but have we retried too much already?
213              
214 2835         5265 my $retry_limit = $self->go_retry_limit;
215 2835 100       4998 if (!$retry_limit) {
216 2720         6224 $self->trace_msg("response_needs_retransmit: retries disabled (retry_limit not set)\n");
217 2720         7381 return 0;
218             }
219              
220 115         275 my $request_meta = $request->meta;
221 115   100     315 my $retry_count = $request_meta->{retry_count} || 0;
222 115 100       250 if ($retry_count >= $retry_limit) {
223 32         91 $self->trace_msg("response_needs_retransmit: $retry_count is too many retries\n");
224             # XXX should be possible to disable altering the err
225 32         73 $response->errstr(sprintf "%s (after %d retries by gofer)", $response->errstr, $retry_count);
226 32         81 return 0;
227             }
228              
229             # will retry now, do the admin
230 83         102 ++$retry_count;
231 83         280 $self->trace_msg("response_needs_retransmit: retry $retry_count\n");
232              
233             # hook so response_retry_preference can defer some code execution
234             # until we've checked retry_count and retry_limit.
235 83 50       185 if (ref $retry eq 'CODE') {
236 0 0       0 $retry->($retry_count, $retry_limit)
237             and warn "should return false"; # protect future use
238             }
239              
240 83         147 ++$request_meta->{retry_count}; # update count for this request object
241 83         182 ++$self->meta->{request_retry_count}; # update cumulative transport stats
242              
243 83         192 return 1;
244             }
245              
246              
247             sub transport_timedout {
248 0     0 0 0 my ($self, $method, $timeout) = @_;
249 0   0     0 $timeout ||= $self->go_timeout;
250 0         0 return $self->new_response({ err => 1, errstr => "DBD::Gofer $method timed-out after $timeout seconds" });
251             }
252              
253              
254             # return undef if we don't want to cache this request
255             # subclasses may use more specialized rules
256             sub get_cache_key_for_request {
257 18     18 0 47 my ($self, $request) = @_;
258              
259             # we only want to cache idempotent requests
260             # is_idempotent() is true if GOf_REQUEST_IDEMPOTENT or GOf_REQUEST_READONLY set
261 18 50       84 return undef if not $request->is_idempotent;
262              
263             # XXX would be nice to avoid the extra freeze here
264 18         117 my $key = $self->freeze_request($request, undef, 1);
265              
266             #use Digest::MD5; warn "get_cache_key_for_request: ".Digest::MD5::md5_base64($key)."\n";
267              
268 18         65 return $key;
269             }
270              
271              
272             sub _store_response_in_cache {
273 14     14   51 my ($self, $frozen_response, $request_cache_key) = @_;
274             my $go_cache = $self->{go_cache}
275 14 50       369 or return;
276              
277             # new() ensures that enabling go_cache also enables keep_meta_frozen
278 14 50       42 warn "No meta frozen in response" if !$frozen_response;
279 14 50       44 warn "No request_cache_key" if !$request_cache_key;
280              
281 14 50 33     77 if ($frozen_response && $request_cache_key) {
282 14         112 $self->trace_msg("receive_response added response to cache $go_cache\n");
283 14         36 eval { $go_cache->set($request_cache_key, $frozen_response) };
  14         88  
284 14 50       47 warn $@ if $@;
285 14         49 ++$self->{cache_store};
286             }
287             }
288              
289             1;
290              
291             __END__