| line | stmt | bran | cond | sub | pod | time | code | 
| 1 |  |  |  |  |  |  | package DBD::Gofer::Transport::pipeone; | 
| 2 |  |  |  |  |  |  |  | 
| 3 |  |  |  |  |  |  | #   $Id: pipeone.pm 10087 2007-10-16 12:42:37Z Tim $ | 
| 4 |  |  |  |  |  |  | # | 
| 5 |  |  |  |  |  |  | #   Copyright (c) 2007, Tim Bunce, Ireland | 
| 6 |  |  |  |  |  |  | # | 
| 7 |  |  |  |  |  |  | #   You may distribute under the terms of either the GNU General Public | 
| 8 |  |  |  |  |  |  | #   License or the Artistic License, as specified in the Perl README file. | 
| 9 |  |  |  |  |  |  |  | 
| 10 | 8 |  |  | 8 |  | 54 | use strict; | 
|  | 8 |  |  |  |  | 17 |  | 
|  | 8 |  |  |  |  | 264 |  | 
| 11 | 8 |  |  | 8 |  | 44 | use warnings; | 
|  | 8 |  |  |  |  | 17 |  | 
|  | 8 |  |  |  |  | 259 |  | 
| 12 |  |  |  |  |  |  |  | 
| 13 | 8 |  |  | 8 |  | 43 | use Carp; | 
|  | 8 |  |  |  |  | 19 |  | 
|  | 8 |  |  |  |  | 585 |  | 
| 14 | 8 |  |  | 8 |  | 51 | use Fcntl; | 
|  | 8 |  |  |  |  | 20 |  | 
|  | 8 |  |  |  |  | 1834 |  | 
| 15 | 8 |  |  | 8 |  | 2415 | use IO::Select; | 
|  | 8 |  |  |  |  | 12122 |  | 
|  | 8 |  |  |  |  | 370 |  | 
| 16 | 8 |  |  | 8 |  | 2623 | use IPC::Open3 qw(open3); | 
|  | 8 |  |  |  |  | 20101 |  | 
|  | 8 |  |  |  |  | 508 |  | 
| 17 | 8 |  |  | 8 |  | 78 | use Symbol qw(gensym); | 
|  | 8 |  |  |  |  | 19 |  | 
|  | 8 |  |  |  |  | 353 |  | 
| 18 |  |  |  |  |  |  |  | 
| 19 | 8 |  |  | 8 |  | 49 | use base qw(DBD::Gofer::Transport::Base); | 
|  | 8 |  |  |  |  | 21 |  | 
|  | 8 |  |  |  |  | 10554 |  | 
| 20 |  |  |  |  |  |  |  | 
| 21 |  |  |  |  |  |  | our $VERSION = "0.010088"; | 
| 22 |  |  |  |  |  |  |  | 
| 23 |  |  |  |  |  |  | __PACKAGE__->mk_accessors(qw( | 
| 24 |  |  |  |  |  |  | connection_info | 
| 25 |  |  |  |  |  |  | go_perl | 
| 26 |  |  |  |  |  |  | )); | 
| 27 |  |  |  |  |  |  |  | 
| 28 |  |  |  |  |  |  |  | 
| 29 |  |  |  |  |  |  | sub new { | 
| 30 | 336 |  |  | 336 | 0 | 1064 | my ($self, $args) = @_; | 
| 31 | 336 |  | 33 |  |  | 1242 | $args->{go_perl} ||= do { | 
| 32 | 0 | 0 |  |  |  | 0 | ($INC{"blib.pm"}) ? [ $^X, '-Mblib' ] : [ $^X ]; | 
| 33 |  |  |  |  |  |  | }; | 
| 34 | 336 | 50 |  |  |  | 1084 | if (not ref $args->{go_perl}) { | 
| 35 |  |  |  |  |  |  | # user can override the perl to be used, either with an array ref | 
| 36 |  |  |  |  |  |  | # containing the command name and args to use, or with a string | 
| 37 |  |  |  |  |  |  | # (ie via the DSN) in which case, to enable args to be passed, | 
| 38 |  |  |  |  |  |  | # we split on two or more consecutive spaces (otherwise the path | 
| 39 |  |  |  |  |  |  | # to perl couldn't contain a space itself). | 
| 40 | 336 |  |  |  |  | 2795 | $args->{go_perl} = [ split /\s{2,}/, $args->{go_perl} ]; | 
| 41 |  |  |  |  |  |  | } | 
| 42 | 336 |  |  |  |  | 1971 | return $self->SUPER::new($args); | 
| 43 |  |  |  |  |  |  | } | 
| 44 |  |  |  |  |  |  |  | 
| 45 |  |  |  |  |  |  |  | 
| 46 |  |  |  |  |  |  | # nonblock($fh) puts filehandle into nonblocking mode | 
| 47 |  |  |  |  |  |  | sub nonblock { | 
| 48 | 776 |  |  | 776 | 0 | 1867 | my $fh = shift; | 
| 49 | 776 | 50 |  |  |  | 4141 | my $flags = fcntl($fh, F_GETFL, 0) | 
| 50 |  |  |  |  |  |  | or croak "Can't get flags for filehandle $fh: $!"; | 
| 51 | 776 | 50 |  |  |  | 4504 | fcntl($fh, F_SETFL, $flags | O_NONBLOCK) | 
| 52 |  |  |  |  |  |  | or croak "Can't make filehandle $fh nonblocking: $!"; | 
| 53 |  |  |  |  |  |  | } | 
| 54 |  |  |  |  |  |  |  | 
| 55 |  |  |  |  |  |  |  | 
| 56 |  |  |  |  |  |  | sub start_pipe_command { | 
| 57 | 388 |  |  | 388 | 0 | 1161 | my ($self, $cmd) = @_; | 
| 58 | 388 | 50 |  |  |  | 1518 | $cmd = [ $cmd ] unless ref $cmd eq 'ARRAY'; | 
| 59 |  |  |  |  |  |  |  | 
| 60 |  |  |  |  |  |  | # if it's important that the subprocess uses the same | 
| 61 |  |  |  |  |  |  | # (versions of) modules as us then the caller should | 
| 62 |  |  |  |  |  |  | # set PERL5LIB itself. | 
| 63 |  |  |  |  |  |  |  | 
| 64 |  |  |  |  |  |  | # limit various forms of insanity, for now | 
| 65 | 388 |  |  |  |  | 2107 | local $ENV{DBI_TRACE}; # use DBI_GOFER_TRACE instead | 
| 66 | 388 |  |  |  |  | 2072 | local $ENV{DBI_AUTOPROXY}; | 
| 67 | 388 |  |  |  |  | 1476 | local $ENV{DBI_PROFILE}; | 
| 68 |  |  |  |  |  |  |  | 
| 69 | 388 |  |  |  |  | 2967 | my ($wfh, $rfh, $efh) = (gensym, gensym, gensym); | 
| 70 | 388 | 50 |  |  |  | 18963 | my $pid = open3($wfh, $rfh, $efh, @$cmd) | 
| 71 |  |  |  |  |  |  | or die "error starting @$cmd: $!\n"; | 
| 72 | 388 | 50 |  |  |  | 1286917 | if ($self->trace) { | 
| 73 | 0 |  |  |  |  | 0 | $self->trace_msg(sprintf("Started pid $pid: @$cmd {fd: w%d r%d e%d, ppid=$$}\n", fileno $wfh, fileno $rfh, fileno $efh),0); | 
| 74 |  |  |  |  |  |  | } | 
| 75 | 388 |  |  |  |  | 2332 | nonblock($rfh); | 
| 76 | 388 |  |  |  |  | 1628 | nonblock($efh); | 
| 77 | 388 |  |  |  |  | 6462 | my $ios = IO::Select->new($rfh, $efh); | 
| 78 |  |  |  |  |  |  |  | 
| 79 |  |  |  |  |  |  | return { | 
| 80 | 388 |  |  |  |  | 53749 | cmd=>$cmd, | 
| 81 |  |  |  |  |  |  | pid=>$pid, | 
| 82 |  |  |  |  |  |  | wfh=>$wfh, rfh=>$rfh, efh=>$efh, | 
| 83 |  |  |  |  |  |  | ios=>$ios, | 
| 84 |  |  |  |  |  |  | }; | 
| 85 |  |  |  |  |  |  | } | 
| 86 |  |  |  |  |  |  |  | 
| 87 |  |  |  |  |  |  |  | 
| 88 |  |  |  |  |  |  | sub cmd_as_string { | 
| 89 | 0 |  |  | 0 | 0 | 0 | my $self = shift; | 
| 90 |  |  |  |  |  |  | # XXX meant to return a properly shell-escaped string suitable for system | 
| 91 |  |  |  |  |  |  | # but it's only for debugging so that can wait | 
| 92 | 0 |  |  |  |  | 0 | my $connection_info = $self->connection_info; | 
| 93 | 0 | 0 |  |  |  | 0 | return join " ", map { (m/^[-:\w]*$/) ? $_ : "'$_'" } @{$connection_info->{cmd}}; | 
|  | 0 |  |  |  |  | 0 |  | 
|  | 0 |  |  |  |  | 0 |  | 
| 94 |  |  |  |  |  |  | } | 
| 95 |  |  |  |  |  |  |  | 
| 96 |  |  |  |  |  |  |  | 
| 97 |  |  |  |  |  |  | sub transmit_request_by_transport { | 
| 98 | 380 |  |  | 380 | 0 | 1237 | my ($self, $request) = @_; | 
| 99 |  |  |  |  |  |  |  | 
| 100 | 380 |  |  |  |  | 2090 | my $frozen_request = $self->freeze_request($request); | 
| 101 |  |  |  |  |  |  |  | 
| 102 | 380 |  |  |  |  | 968 | my $cmd = [ @{$self->go_perl}, qw(-MDBI::Gofer::Transport::pipeone -e run_one_stdio)]; | 
|  | 380 |  |  |  |  | 1649 |  | 
| 103 | 380 |  |  |  |  | 1712 | my $info = $self->start_pipe_command($cmd); | 
| 104 |  |  |  |  |  |  |  | 
| 105 | 380 |  |  |  |  | 1503 | my $wfh = delete $info->{wfh}; | 
| 106 |  |  |  |  |  |  | # send frozen request | 
| 107 | 380 |  |  |  |  | 2437 | local $\; | 
| 108 | 380 | 50 |  |  |  | 6153 | print $wfh $frozen_request | 
| 109 |  |  |  |  |  |  | or warn "error writing to @$cmd: $!\n"; | 
| 110 |  |  |  |  |  |  | # indicate that there's no more | 
| 111 | 380 | 50 |  |  |  | 3112 | close $wfh | 
| 112 |  |  |  |  |  |  | or die "error closing pipe to @$cmd: $!\n"; | 
| 113 |  |  |  |  |  |  |  | 
| 114 | 380 |  |  |  |  | 3126 | $self->connection_info( $info ); | 
| 115 | 380 |  |  |  |  | 10895 | return; | 
| 116 |  |  |  |  |  |  | } | 
| 117 |  |  |  |  |  |  |  | 
| 118 |  |  |  |  |  |  |  | 
| 119 |  |  |  |  |  |  | sub read_response_from_fh { | 
| 120 | 760 |  |  | 760 | 0 | 2111 | my ($self, $fh_actions) = @_; | 
| 121 | 760 |  |  |  |  | 2349 | my $trace = $self->trace; | 
| 122 |  |  |  |  |  |  |  | 
| 123 | 760 |  | 50 |  |  | 2344 | my $info = $self->connection_info || die; | 
| 124 | 760 |  |  |  |  | 1674 | my ($ios) = @{$info}{qw(ios)}; | 
|  | 760 |  |  |  |  | 1872 |  | 
| 125 | 760 |  |  |  |  | 1550 | my $errors = 0; | 
| 126 | 760 |  |  |  |  | 1319 | my $complete; | 
| 127 |  |  |  |  |  |  |  | 
| 128 | 760 | 50 |  |  |  | 3663 | die "No handles to read response from" unless $ios->count; | 
| 129 |  |  |  |  |  |  |  | 
| 130 | 760 |  |  |  |  | 5667 | while ($ios->count) { | 
| 131 | 1140 |  |  |  |  | 10113 | my @readable = $ios->can_read(); | 
| 132 | 1140 |  |  |  |  | 50099634 | for my $fh (@readable) { | 
| 133 | 1520 |  |  |  |  | 3690 | local $_; | 
| 134 | 1520 |  | 50 |  |  | 8996 | my $actions = $fh_actions->{$fh} || die "panic: no action for $fh"; | 
| 135 | 1520 |  |  |  |  | 20000 | my $rv = sysread($fh, $_='', 1024*31);  # to fit in 32KB slab | 
| 136 | 1520 | 100 |  |  |  | 4923 | unless ($rv) {              # error (undef) or end of file (0) | 
| 137 | 760 |  |  |  |  | 1350 | my $action; | 
| 138 | 760 | 50 |  |  |  | 2167 | unless (defined $rv) {  # was an error | 
| 139 | 0 | 0 |  |  |  | 0 | $self->trace_msg("error on handle $fh: $!\n") if $trace >= 4; | 
| 140 | 0 |  | 0 |  |  | 0 | $action = $actions->{error} || $actions->{eof}; | 
| 141 | 0 |  |  |  |  | 0 | ++$errors; | 
| 142 |  |  |  |  |  |  | # XXX an error may be a permanent condition of the handle | 
| 143 |  |  |  |  |  |  | # if so we'll loop here - not good | 
| 144 |  |  |  |  |  |  | } | 
| 145 |  |  |  |  |  |  | else { | 
| 146 | 760 |  |  |  |  | 2203 | $action = $actions->{eof}; | 
| 147 | 760 | 50 |  |  |  | 2232 | $self->trace_msg("eof on handle $fh\n") if $trace >= 4; | 
| 148 |  |  |  |  |  |  | } | 
| 149 | 760 | 50 |  |  |  | 2807 | if ($action->($fh)) { | 
| 150 | 760 | 50 |  |  |  | 2029 | $self->trace_msg("removing $fh from handle set\n") if $trace >= 4; | 
| 151 | 760 |  |  |  |  | 4182 | $ios->remove($fh); | 
| 152 |  |  |  |  |  |  | } | 
| 153 | 760 |  |  |  |  | 43107 | next; | 
| 154 |  |  |  |  |  |  | } | 
| 155 |  |  |  |  |  |  | # action returns true if the response is now complete | 
| 156 |  |  |  |  |  |  | # (we finish all currently readable handles before leaving the loop) | 
| 157 | 760 | 100 |  |  |  | 3893 | $actions->{read}->($fh) && ++$complete; | 
| 158 |  |  |  |  |  |  | } | 
| 159 | 1140 | 100 |  |  |  | 6822 | last if $complete; | 
| 160 |  |  |  |  |  |  | } | 
| 161 | 760 |  |  |  |  | 3876 | return $errors; | 
| 162 |  |  |  |  |  |  | } | 
| 163 |  |  |  |  |  |  |  | 
| 164 |  |  |  |  |  |  |  | 
| 165 |  |  |  |  |  |  | sub receive_response_by_transport { | 
| 166 | 380 |  |  | 380 | 0 | 1120 | my $self = shift; | 
| 167 |  |  |  |  |  |  |  | 
| 168 | 380 |  | 50 |  |  | 1450 | my $info = $self->connection_info || die; | 
| 169 | 380 |  |  |  |  | 1130 | my ($pid, $rfh, $efh, $ios, $cmd) = @{$info}{qw(pid rfh efh ios cmd)}; | 
|  | 380 |  |  |  |  | 1676 |  | 
| 170 |  |  |  |  |  |  |  | 
| 171 | 380 |  |  |  |  | 1057 | my $frozen_response; | 
| 172 |  |  |  |  |  |  | my $stderr_msg; | 
| 173 |  |  |  |  |  |  |  | 
| 174 |  |  |  |  |  |  | $self->read_response_from_fh( { | 
| 175 |  |  |  |  |  |  | $efh => { | 
| 176 | 0 |  |  | 0 |  | 0 | error => sub { warn "error reading response stderr: $!"; 1 }, | 
|  | 0 |  |  |  |  | 0 |  | 
| 177 | 380 |  |  | 380 |  | 777 | eof   => sub { warn "eof on stderr" if 0; 1 }, | 
|  | 380 |  |  |  |  | 1106 |  | 
| 178 | 0 |  |  | 0 |  | 0 | read  => sub { $stderr_msg .= $_; 0 }, | 
|  | 0 |  |  |  |  | 0 |  | 
| 179 |  |  |  |  |  |  | }, | 
| 180 |  |  |  |  |  |  | $rfh => { | 
| 181 | 0 |  |  | 0 |  | 0 | error => sub { warn "error reading response: $!"; 1 }, | 
|  | 0 |  |  |  |  | 0 |  | 
| 182 | 380 |  |  | 380 |  | 785 | eof   => sub { warn "eof on stdout" if 0; 1 }, | 
|  | 380 |  |  |  |  | 1379 |  | 
| 183 | 380 |  |  | 380 |  | 2209 | read  => sub { $frozen_response .= $_; 0 }, | 
|  | 380 |  |  |  |  | 2790 |  | 
| 184 |  |  |  |  |  |  | }, | 
| 185 | 380 |  |  |  |  | 11753 | }); | 
| 186 |  |  |  |  |  |  |  | 
| 187 | 380 | 50 |  |  |  | 19720 | waitpid $info->{pid}, 0 | 
| 188 |  |  |  |  |  |  | or warn "waitpid: $!"; # XXX do something more useful? | 
| 189 |  |  |  |  |  |  |  | 
| 190 | 380 | 50 |  |  |  | 1775 | die ref($self)." command (@$cmd) failed: $stderr_msg" | 
| 191 |  |  |  |  |  |  | if not $frozen_response; # no output on stdout at all | 
| 192 |  |  |  |  |  |  |  | 
| 193 |  |  |  |  |  |  | # XXX need to be able to detect and deal with corruption | 
| 194 | 380 |  |  |  |  | 3969 | my $response = $self->thaw_response($frozen_response); | 
| 195 |  |  |  |  |  |  |  | 
| 196 | 380 | 50 |  |  |  | 1377 | if ($stderr_msg) { | 
| 197 |  |  |  |  |  |  | # add stderr messages as warnings (for PrintWarn) | 
| 198 | 0 | 0 | 0 |  |  | 0 | $response->add_err(0, $stderr_msg, undef, $self->trace) | 
| 199 |  |  |  |  |  |  | # but ignore warning from old version of blib | 
| 200 |  |  |  |  |  |  | unless $stderr_msg =~ /^Using .*blib/ && "@$cmd" =~ /-Mblib/; | 
| 201 |  |  |  |  |  |  | } | 
| 202 |  |  |  |  |  |  |  | 
| 203 | 380 |  |  |  |  | 2310 | return $response; | 
| 204 |  |  |  |  |  |  | } | 
| 205 |  |  |  |  |  |  |  | 
| 206 |  |  |  |  |  |  |  | 
| 207 |  |  |  |  |  |  | 1; | 
| 208 |  |  |  |  |  |  |  | 
| 209 |  |  |  |  |  |  | __END__ |