File Coverage

blib/lib/DBD/Gofer/Transport/pipeone.pm
Criterion Covered Total %
statement 98 115 85.2
branch 22 46 47.8
condition 4 15 26.6
subroutine 17 21 80.9
pod 0 7 0.0
total 141 204 69.1


line stmt bran cond sub pod time code
1             package DBD::Gofer::Transport::pipeone;
2              
3             # $Id: pipeone.pm 10087 2007-10-16 12:42:37Z Tim $
4             #
5             # Copyright (c) 2007, Tim Bunce, Ireland
6             #
7             # You may distribute under the terms of either the GNU General Public
8             # License or the Artistic License, as specified in the Perl README file.
9              
10 8     8   54 use strict;
  8         17  
  8         264  
11 8     8   44 use warnings;
  8         17  
  8         259  
12              
13 8     8   43 use Carp;
  8         19  
  8         585  
14 8     8   51 use Fcntl;
  8         20  
  8         1834  
15 8     8   2415 use IO::Select;
  8         12122  
  8         370  
16 8     8   2623 use IPC::Open3 qw(open3);
  8         20101  
  8         508  
17 8     8   78 use Symbol qw(gensym);
  8         19  
  8         353  
18              
19 8     8   49 use base qw(DBD::Gofer::Transport::Base);
  8         21  
  8         10554  
20              
21             our $VERSION = "0.010088";
22              
23             __PACKAGE__->mk_accessors(qw(
24             connection_info
25             go_perl
26             ));
27              
28              
29             sub new {
30 336     336 0 1064 my ($self, $args) = @_;
31 336   33     1242 $args->{go_perl} ||= do {
32 0 0       0 ($INC{"blib.pm"}) ? [ $^X, '-Mblib' ] : [ $^X ];
33             };
34 336 50       1084 if (not ref $args->{go_perl}) {
35             # user can override the perl to be used, either with an array ref
36             # containing the command name and args to use, or with a string
37             # (ie via the DSN) in which case, to enable args to be passed,
38             # we split on two or more consecutive spaces (otherwise the path
39             # to perl couldn't contain a space itself).
40 336         2795 $args->{go_perl} = [ split /\s{2,}/, $args->{go_perl} ];
41             }
42 336         1971 return $self->SUPER::new($args);
43             }
44              
45              
46             # nonblock($fh) puts filehandle into nonblocking mode
47             sub nonblock {
48 776     776 0 1867 my $fh = shift;
49 776 50       4141 my $flags = fcntl($fh, F_GETFL, 0)
50             or croak "Can't get flags for filehandle $fh: $!";
51 776 50       4504 fcntl($fh, F_SETFL, $flags | O_NONBLOCK)
52             or croak "Can't make filehandle $fh nonblocking: $!";
53             }
54              
55              
56             sub start_pipe_command {
57 388     388 0 1161 my ($self, $cmd) = @_;
58 388 50       1518 $cmd = [ $cmd ] unless ref $cmd eq 'ARRAY';
59              
60             # if it's important that the subprocess uses the same
61             # (versions of) modules as us then the caller should
62             # set PERL5LIB itself.
63              
64             # limit various forms of insanity, for now
65 388         2107 local $ENV{DBI_TRACE}; # use DBI_GOFER_TRACE instead
66 388         2072 local $ENV{DBI_AUTOPROXY};
67 388         1476 local $ENV{DBI_PROFILE};
68              
69 388         2967 my ($wfh, $rfh, $efh) = (gensym, gensym, gensym);
70 388 50       18963 my $pid = open3($wfh, $rfh, $efh, @$cmd)
71             or die "error starting @$cmd: $!\n";
72 388 50       1286917 if ($self->trace) {
73 0         0 $self->trace_msg(sprintf("Started pid $pid: @$cmd {fd: w%d r%d e%d, ppid=$$}\n", fileno $wfh, fileno $rfh, fileno $efh),0);
74             }
75 388         2332 nonblock($rfh);
76 388         1628 nonblock($efh);
77 388         6462 my $ios = IO::Select->new($rfh, $efh);
78              
79             return {
80 388         53749 cmd=>$cmd,
81             pid=>$pid,
82             wfh=>$wfh, rfh=>$rfh, efh=>$efh,
83             ios=>$ios,
84             };
85             }
86              
87              
88             sub cmd_as_string {
89 0     0 0 0 my $self = shift;
90             # XXX meant to return a properly shell-escaped string suitable for system
91             # but it's only for debugging so that can wait
92 0         0 my $connection_info = $self->connection_info;
93 0 0       0 return join " ", map { (m/^[-:\w]*$/) ? $_ : "'$_'" } @{$connection_info->{cmd}};
  0         0  
  0         0  
94             }
95              
96              
97             sub transmit_request_by_transport {
98 380     380 0 1237 my ($self, $request) = @_;
99              
100 380         2090 my $frozen_request = $self->freeze_request($request);
101              
102 380         968 my $cmd = [ @{$self->go_perl}, qw(-MDBI::Gofer::Transport::pipeone -e run_one_stdio)];
  380         1649  
103 380         1712 my $info = $self->start_pipe_command($cmd);
104              
105 380         1503 my $wfh = delete $info->{wfh};
106             # send frozen request
107 380         2437 local $\;
108 380 50       6153 print $wfh $frozen_request
109             or warn "error writing to @$cmd: $!\n";
110             # close the write handle to indicate that there's no more request data
111 380 50       3112 close $wfh
112             or die "error closing pipe to @$cmd: $!\n";
113              
114 380         3126 $self->connection_info( $info );
115 380         10895 return;
116             }
117              
118              
119             sub read_response_from_fh {
120 760     760 0 2111 my ($self, $fh_actions) = @_;
121 760         2349 my $trace = $self->trace;
122              
123 760   50     2344 my $info = $self->connection_info || die;
124 760         1674 my ($ios) = @{$info}{qw(ios)};
  760         1872  
125 760         1550 my $errors = 0;
126 760         1319 my $complete;
127              
128 760 50       3663 die "No handles to read response from" unless $ios->count;
129              
130 760         5667 while ($ios->count) {
131 1140         10113 my @readable = $ios->can_read();
132 1140         50099634 for my $fh (@readable) {
133 1520         3690 local $_;
134 1520   50     8996 my $actions = $fh_actions->{$fh} || die "panic: no action for $fh";
135 1520         20000 my $rv = sysread($fh, $_='', 1024*31); # to fit in 32KB slab
136 1520 100       4923 unless ($rv) { # error (undef) or end of file (0)
137 760         1350 my $action;
138 760 50       2167 unless (defined $rv) { # was an error
139 0 0       0 $self->trace_msg("error on handle $fh: $!\n") if $trace >= 4;
140 0   0     0 $action = $actions->{error} || $actions->{eof};
141 0         0 ++$errors;
142             # XXX an error may be a permanent condition of the handle
143             # if so we'll loop here - not good
144             }
145             else {
146 760         2203 $action = $actions->{eof};
147 760 50       2232 $self->trace_msg("eof on handle $fh\n") if $trace >= 4;
148             }
149 760 50       2807 if ($action->($fh)) {
150 760 50       2029 $self->trace_msg("removing $fh from handle set\n") if $trace >= 4;
151 760         4182 $ios->remove($fh);
152             }
153 760         43107 next;
154             }
155             # action returns true if the response is now complete
156             # (we still finish the other readable handles in this pass before leaving the loop)
157 760 100       3893 $actions->{read}->($fh) && ++$complete;
158             }
159 1140 100       6822 last if $complete;
160             }
161 760         3876 return $errors;
162             }
163              
164              
165             sub receive_response_by_transport {
166 380     380 0 1120 my $self = shift;
167              
168 380   50     1450 my $info = $self->connection_info || die;
169 380         1130 my ($pid, $rfh, $efh, $ios, $cmd) = @{$info}{qw(pid rfh efh ios cmd)};
  380         1676  
170              
171 380         1057 my $frozen_response;
172             my $stderr_msg;
173              
174             $self->read_response_from_fh( {
175             $efh => {
176 0     0   0 error => sub { warn "error reading response stderr: $!"; 1 },
  0         0  
177 380     380   777 eof => sub { warn "eof on stderr" if 0; 1 },
  380         1106  
178 0     0   0 read => sub { $stderr_msg .= $_; 0 },
  0         0  
179             },
180             $rfh => {
181 0     0   0 error => sub { warn "error reading response: $!"; 1 },
  0         0  
182 380     380   785 eof => sub { warn "eof on stdout" if 0; 1 },
  380         1379  
183 380     380   2209 read => sub { $frozen_response .= $_; 0 },
  380         2790  
184             },
185 380         11753 });
186              
187 380 50       19720 waitpid $info->{pid}, 0
188             or warn "waitpid: $!"; # XXX do something more useful?
189              
190 380 50       1775 die ref($self)." command (@$cmd) failed: $stderr_msg"
191             if not $frozen_response; # no output on stdout at all
192              
193             # XXX need to be able to detect and deal with corruption
194 380         3969 my $response = $self->thaw_response($frozen_response);
195              
196 380 50       1377 if ($stderr_msg) {
197             # add stderr messages as warnings (for PrintWarn)
198 0 0 0     0 $response->add_err(0, $stderr_msg, undef, $self->trace)
199             # but ignore warning from old version of blib
200             unless $stderr_msg =~ /^Using .*blib/ && "@$cmd" =~ /-Mblib/;
201             }
202              
203 380         2310 return $response;
204             }
205              
206              
207             1;
208              
209             __END__