File Coverage

blib/lib/DBD/Gofer/Transport/stream.pm
Criterion Covered Total %
statement 69 110 62.7
branch 20 56 35.7
condition 8 25 32.0
subroutine 11 18 61.1
pod 0 3 0.0
total 108 212 50.9


line stmt bran cond sub pod time code
1             package DBD::Gofer::Transport::stream;
2              
3             # $Id: stream.pm 14598 2010-12-21 22:53:25Z Tim $
4             #
5             # Copyright (c) 2007, Tim Bunce, Ireland
6             #
7             # You may distribute under the terms of either the GNU General Public
8             # License or the Artistic License, as specified in the Perl README file.
9              
10 8     8   93 use strict;
  8         38  
  8         317  
11 8     8   66 use warnings;
  8         29  
  8         370  
12              
13 8     8   59 use Carp;
  8         28  
  8         815  
14              
15 8     8   68 use base qw(DBD::Gofer::Transport::pipeone);
  8         30  
  8         12031  
16              
17             our $VERSION = "0.014599";
18              
19             __PACKAGE__->mk_accessors(qw(
20             go_persist
21             ));
22              
23             my $persist_all = 5;
24             my %persist;
25              
26              
27             sub _connection_key {
28 148     148   327 my ($self) = @_;
29 148 50 50     495 return join "~", $self->go_url||"", @{ $self->go_perl || [] };
  148         417  
30             }
31              
32              
33             sub _connection_get {
34 148     148   339 my ($self) = @_;
35              
36 148         471 my $persist = $self->go_persist; # = 0 can force non-caching
37 148 50       452 $persist = $persist_all if not defined $persist;
38 148 50       619 my $key = ($persist) ? $self->_connection_key : '';
39 148 100 66     847 if ($persist{$key} && $self->_connection_check($persist{$key})) {
40 140 50       390 $self->trace_msg("reusing persistent connection $key\n",0) if $self->trace >= 1;
41 140         424 return $persist{$key};
42             }
43              
44 8         39 my $connection = $self->_make_connection;
45              
46 8 50       78 if ($key) {
47 8 50       71 %persist = () if keys %persist > $persist_all; # XXX quick hack to limit subprocesses
48 8         52 $persist{$key} = $connection;
49             }
50              
51 8         57 return $connection;
52             }
53              
54              
55             sub _connection_check {
56 140     140   361 my ($self, $connection) = @_;
57 140   33     363 $connection ||= $self->connection_info;
58 140         298 my $pid = $connection->{pid};
59 140         1293 my $ok = (kill 0, $pid);
60 140 50       401 $self->trace_msg("_connection_check: $ok (pid $$)\n",0) if $self->trace;
61 140         520 return $ok;
62             }
63              
64              
65             sub _connection_kill {
66 0     0   0 my ($self) = @_;
67 0         0 my $connection = $self->connection_info;
68 0         0 my ($pid, $wfh, $rfh, $efh) = @{$connection}{qw(pid wfh rfh efh)};
  0         0  
69 0 0       0 $self->trace_msg("_connection_kill: closing write handle\n",0) if $self->trace;
70             # closing the write file handle should be enough, generally
71 0         0 close $wfh;
72             # in future we may want to be more aggressive
73             #close $rfh; close $efh; kill 15, $pid
74             # but deleting from the persist cache...
75 0         0 delete $persist{ $self->_connection_key };
76             # ... and removing the connection_info should suffice
77 0         0 $self->connection_info( undef );
78 0         0 return;
79             }
80              
81              
82             sub _make_connection {
83 8     8   22 my ($self) = @_;
84              
85 8         29 my $go_perl = $self->go_perl;
86 8         36 my $cmd = [ @$go_perl, qw(-MDBI::Gofer::Transport::stream -e run_stdio_hex)];
87              
88             #push @$cmd, "DBI_TRACE=2=/tmp/goferstream.log", "sh", "-c";
89 8 50       31 if (my $url = $self->go_url) {
90 0 0       0 die "Only 'ssh:user\@host' style url supported by this transport"
91             unless $url =~ s/^ssh://;
92 0         0 my $ssh = $url;
93 0         0 my $setup_env = join "||", map { "source $_ 2>/dev/null" } qw(.bash_profile .bash_login .profile);
  0         0  
94 0         0 my $setup = $setup_env.q{; exec "$@"};
95             # don't use $^X on remote system by default as it's possibly wrong
96 0 0       0 $cmd->[0] = 'perl' if "@$go_perl" eq $^X;
97             # -x not only 'Disables X11 forwarding' but also makes connections *much* faster
98 0         0 unshift @$cmd, qw(ssh -xq), split(' ', $ssh), qw(bash -c), $setup;
99             }
100              
101 8 50       41 $self->trace_msg("new connection: @$cmd\n",0) if $self->trace;
102              
103             # XXX add a handshake - some message from DBI::Gofer::Transport::stream that's
104             # sent as soon as it starts that we can wait for to report success - and soak up
105             # and report useful warnings etc from ssh before we get it? Increases latency though.
106 8         68 my $connection = $self->start_pipe_command($cmd);
107 8         103 return $connection;
108             }
109              
110              
111             sub transmit_request_by_transport {
112 380     380 0 1009 my ($self, $request) = @_;
113 380         1041 my $trace = $self->trace;
114              
115 380   66     1183 my $connection = $self->connection_info || do {
116             my $con = $self->_connection_get;
117             $self->connection_info( $con );
118             $con;
119             };
120              
121 380         1570 my $encoded_request = unpack("H*", $self->freeze_request($request));
122 380         1194 $encoded_request .= "\015\012";
123              
124 380         889 my $wfh = $connection->{wfh};
125 380 50       1014 $self->trace_msg(sprintf("transmit_request_by_transport: to fh %s fd%d\n", $wfh, fileno($wfh)),0)
126             if $trace >= 4;
127              
128             # send frozen request
129 380         1488 local $\;
130             $wfh->print($encoded_request) # autoflush enabled
131 380 50       1711 or do {
132 0         0 my $err = $!;
133             # XXX could/should make new connection and retry
134 0         0 $self->_connection_kill;
135 0         0 die "Error sending request: $err";
136             };
137 380 50       13910 $self->trace_msg("Request sent: $encoded_request\n",0) if $trace >= 4;
138              
139 380         4766 return undef; # indicate no response yet (so caller calls receive_response_by_transport)
140             }
141              
142              
143             sub receive_response_by_transport {
144 380     380 0 759 my $self = shift;
145 380         1015 my $trace = $self->trace;
146              
147 380 50       1059 $self->trace_msg("receive_response_by_transport: awaiting response\n",0) if $trace >= 4;
148 380   50     1042 my $connection = $self->connection_info || die;
149 380         829 my ($pid, $rfh, $efh, $cmd) = @{$connection}{qw(pid rfh efh cmd)};
  380         1266  
150              
151 380         744 my $errno = 0;
152 380         753 my $encoded_response;
153             my $stderr_msg;
154              
155             $self->read_response_from_fh( {
156             $efh => {
157 0   0 0   0 error => sub { warn "error reading response stderr: $!"; $errno||=$!; 1 },
  0         0  
  0         0  
158 0 0   0   0 eof => sub { warn "eof reading efh" if $trace >= 4; 1 },
  0         0  
159 0     0   0 read => sub { $stderr_msg .= $_; 0 },
  0         0  
160             },
161             $rfh => {
162 0   0 0   0 error => sub { warn "error reading response: $!"; $errno||=$!; 1 },
  0         0  
  0         0  
163 0 0   0   0 eof => sub { warn "eof reading rfh" if $trace >= 4; 1 },
  0         0  
164 380 50   380   1279 read => sub { $encoded_response .= $_; ($encoded_response=~s/\015\012$//) ? 1 : 0 },
  380         4967  
165             },
166 380         6542 });
167              
168             # if we got no output on stdout at all then the command has
169             # probably exited, possibly with an error to stderr.
170             # Turn this situation into a reasonably useful DBI error.
171 380 50       4789 if (not $encoded_response) {
172 0         0 my @msg;
173 0 0       0 push @msg, "error while reading response: $errno" if $errno;
174 0 0       0 if ($stderr_msg) {
175 0         0 chomp $stderr_msg;
176 0 0       0 push @msg, sprintf "error reported by \"%s\" (pid %d%s): %s",
177             $self->cmd_as_string,
178             $pid, ((kill 0, $pid) ? "" : ", exited"),
179             $stderr_msg;
180             }
181 0         0 die join(", ", "No response received", @msg)."\n";
182             }
183              
184 380 50       1146 $self->trace_msg("Response received: $encoded_response\n",0)
185             if $trace >= 4;
186              
187 380 50 33     1050 $self->trace_msg("Gofer stream stderr message: $stderr_msg\n",0)
188             if $stderr_msg && $trace;
189              
190 380         6324 my $frozen_response = pack("H*", $encoded_response);
191              
192             # XXX need to be able to detect and deal with corruption
193 380         1747 my $response = $self->thaw_response($frozen_response);
194              
195 380 50       1016 if ($stderr_msg) {
196             # add stderr messages as warnings (for PrintWarn)
197 0 0 0     0 $response->add_err(0, $stderr_msg, undef, $trace)
198             # but ignore warning from old version of blib
199             unless $stderr_msg =~ /^Using .*blib/ && "@$cmd" =~ /-Mblib/;
200             }
201              
202 380         1428 return $response;
203             }
204              
205             sub transport_timedout {
206 0     0 0   my $self = shift;
207 0           $self->_connection_kill;
208 0           return $self->SUPER::transport_timedout(@_);
209             }
210              
211             1;
212              
213             __END__