File Coverage

blib/lib/DBD/Gofer/Transport/pipeone.pm
Criterion Covered Total %
statement 98 115 85.2
branch 22 46 47.8
condition 4 15 26.6
subroutine 17 21 80.9
pod 0 7 0.0
total 141 204 69.1


line stmt bran cond sub pod time code
1             package DBD::Gofer::Transport::pipeone;
2              
3             # $Id: pipeone.pm 10087 2007-10-16 12:42:37Z Tim $
4             #
5             # Copyright (c) 2007, Tim Bunce, Ireland
6             #
7             # You may distribute under the terms of either the GNU General Public
8             # License or the Artistic License, as specified in the Perl README file.
9              
10 8     8   36 use strict;
  8         12  
  8         319  
11 8     8   33 use warnings;
  8         9  
  8         252  
12              
13 8     8   31 use Carp;
  8         12  
  8         514  
14 8     8   33 use Fcntl;
  8         12  
  8         1629  
15 8     8   5107 use IO::Select;
  8         10382  
  8         362  
16 8     8   4019 use IPC::Open3 qw(open3);
  8         18411  
  8         487  
17 8     8   44 use Symbol qw(gensym);
  8         11  
  8         310  
18              
19 8     8   32 use base qw(DBD::Gofer::Transport::Base);
  8         16  
  8         12425  
20              
21             our $VERSION = "0.010088";
22              
23             __PACKAGE__->mk_accessors(qw(
24             connection_info
25             go_perl
26             ));
27              
28              
29             sub new {
30 336     336 0 844 my ($self, $args) = @_;
31 336   33     1329 $args->{go_perl} ||= do {
32 0 0       0 ($INC{"blib.pm"}) ? [ $^X, '-Mblib' ] : [ $^X ];
33             };
34 336 50       1183 if (not ref $args->{go_perl}) {
35             # user can override the perl to be used, either with an array ref
36             # containing the command name and args to use, or with a string
37             # (ie via the DSN) in which case, to enable args to be passed,
38             # we split on two or more consecutive spaces (otherwise the path
39             # to perl couldn't contain a space itself).
40 336         3112 $args->{go_perl} = [ split /\s{2,}/, $args->{go_perl} ];
41             }
42 336         1951 return $self->SUPER::new($args);
43             }
44              
45              
46             # nonblock($fh) puts filehandle into nonblocking mode
47             sub nonblock {
48 776     776 0 1352 my $fh = shift;
49 776 50       3677 my $flags = fcntl($fh, F_GETFL, 0)
50             or croak "Can't get flags for filehandle $fh: $!";
51 776 50       3731 fcntl($fh, F_SETFL, $flags | O_NONBLOCK)
52             or croak "Can't make filehandle $fh nonblocking: $!";
53             }
54              
55              
56             sub start_pipe_command {
57 388     388 0 812 my ($self, $cmd) = @_;
58 388 50       1561 $cmd = [ $cmd ] unless ref $cmd eq 'ARRAY';
59              
60             # if it's important that the subprocess uses the same
61             # (versions of) modules as us then the caller should
62             # set PERL5LIB itself.
63              
64             # limit various forms of insanity, for now
65 388         2072 local $ENV{DBI_TRACE}; # use DBI_GOFER_TRACE instead
66 388         1845 local $ENV{DBI_AUTOPROXY};
67 388         1267 local $ENV{DBI_PROFILE};
68              
69 388         2726 my ($wfh, $rfh, $efh) = (gensym, gensym, gensym);
70 388 50       15167 my $pid = open3($wfh, $rfh, $efh, @$cmd)
71             or die "error starting @$cmd: $!\n";
72 388 50       1249848 if ($self->trace) {
73 0         0 $self->trace_msg(sprintf("Started pid $pid: @$cmd {fd: w%d r%d e%d, ppid=$$}\n", fileno $wfh, fileno $rfh, fileno $efh),0);
74             }
75 388         2275 nonblock($rfh);
76 388         1276 nonblock($efh);
77 388         6009 my $ios = IO::Select->new($rfh, $efh);
78              
79             return {
80 388         44905 cmd=>$cmd,
81             pid=>$pid,
82             wfh=>$wfh, rfh=>$rfh, efh=>$efh,
83             ios=>$ios,
84             };
85             }
86              
87              
88             sub cmd_as_string {
89 0     0 0 0 my $self = shift;
90             # XXX meant to return a properly shell-escaped string suitable for system
91             # but it's only for debugging so that can wait
92 0         0 my $connection_info = $self->connection_info;
93 0 0       0 return join " ", map { (m/^[-:\w]*$/) ? $_ : "'$_'" } @{$connection_info->{cmd}};
  0         0  
  0         0  
94             }
95              
96              
97             sub transmit_request_by_transport {
98 380     380 0 894 my ($self, $request) = @_;
99              
100 380         1907 my $frozen_request = $self->freeze_request($request);
101              
102 380         768 my $cmd = [ @{$self->go_perl}, qw(-MDBI::Gofer::Transport::pipeone -e run_one_stdio)];
  380         1429  
103 380         1614 my $info = $self->start_pipe_command($cmd);
104              
105 380         1391 my $wfh = delete $info->{wfh};
106             # send frozen request
107 380         2509 local $\;
108 380 50       5853 print $wfh $frozen_request
109             or warn "error writing to @$cmd: $!\n";
110             # indicate that there's no more request data by closing the write handle
111 380 50       3013 close $wfh
112             or die "error closing pipe to @$cmd: $!\n";
113              
114 380         2473 $self->connection_info( $info );
115 380         16260 return;
116             }
117              
118              
119             sub read_response_from_fh {
120 760     760 0 1511 my ($self, $fh_actions) = @_;
121 760         2468 my $trace = $self->trace;
122              
123 760   50     2307 my $info = $self->connection_info || die;
124 760         1466 my ($ios) = @{$info}{qw(ios)};
  760         1815  
125 760         1380 my $errors = 0;
126 760         1152 my $complete;
127              
128 760 50       3838 die "No handles to read response from" unless $ios->count;
129              
130 760         5867 while ($ios->count) {
131 1196         9682 my @readable = $ios->can_read();
132 1196         57224040 for my $fh (@readable) {
133 1520         2829 local $_;
134 1520   50     8787 my $actions = $fh_actions->{$fh} || die "panic: no action for $fh";
135 1520         17623 my $rv = sysread($fh, $_='', 1024*31); # to fit in 32KB slab
136 1520 100       4353 unless ($rv) { # error (undef) or end of file (0)
137 760         1086 my $action;
138 760 50       2126 unless (defined $rv) { # was an error
139 0 0       0 $self->trace_msg("error on handle $fh: $!\n") if $trace >= 4;
140 0   0     0 $action = $actions->{error} || $actions->{eof};
141 0         0 ++$errors;
142             # XXX an error may be a permanent condition of the handle
143             # if so we'll loop here - not good
144             }
145             else {
146 760         1956 $action = $actions->{eof};
147 760 50       2261 $self->trace_msg("eof on handle $fh\n") if $trace >= 4;
148             }
149 760 50       2760 if ($action->($fh)) {
150 760 50       1911 $self->trace_msg("removing $fh from handle set\n") if $trace >= 4;
151 760         3568 $ios->remove($fh);
152             }
153 760         33911 next;
154             }
155             # action returns true if the response is now complete
156             # (we finish all handles readable in this pass before leaving the loop)
157 760 100       3790 $actions->{read}->($fh) && ++$complete;
158             }
159 1196 100       7050 last if $complete;
160             }
161 760         3434 return $errors;
162             }
163              
164              
165             sub receive_response_by_transport {
166 380     380 0 806 my $self = shift;
167              
168 380   50     1304 my $info = $self->connection_info || die;
169 380         963 my ($pid, $rfh, $efh, $ios, $cmd) = @{$info}{qw(pid rfh efh ios cmd)};
  380         1454  
170              
171 380         560 my $frozen_response;
172             my $stderr_msg;
173              
174             $self->read_response_from_fh( {
175             $efh => {
176 0     0   0 error => sub { warn "error reading response stderr: $!"; 1 },
  0         0  
177 380     380   540 eof => sub { warn "eof on stderr" if 0; 1 },
  380         1222  
178 0     0   0 read => sub { $stderr_msg .= $_; 0 },
  0         0  
179             },
180             $rfh => {
181 0     0   0 error => sub { warn "error reading response: $!"; 1 },
  0         0  
182 380     380   603 eof => sub { warn "eof on stdout" if 0; 1 },
  380         1431  
183 380     380   1373 read => sub { $frozen_response .= $_; 0 },
  380         2817  
184             },
185 380         11647 });
186              
187 380 50       25371 waitpid $info->{pid}, 0
188             or warn "waitpid: $!"; # XXX do something more useful?
189              
190 380 50       1357 die ref($self)." command (@$cmd) failed: $stderr_msg"
191             if not $frozen_response; # no output on stdout at all
192              
193             # XXX need to be able to detect and deal with corruption
194 380         4701 my $response = $self->thaw_response($frozen_response);
195              
196 380 50       1386 if ($stderr_msg) {
197             # add stderr messages as warnings (for PrintWarn)
198 0 0 0     0 $response->add_err(0, $stderr_msg, undef, $self->trace)
199             # but ignore warning from old version of blib
200             unless $stderr_msg =~ /^Using .*blib/ && "@$cmd" =~ /-Mblib/;
201             }
202              
203 380         2424 return $response;
204             }
205              
206              
207             1;
208              
209             __END__