File Coverage

blib/lib/IO/Pipely.pm
Criterion Covered Total %
statement 73 179 40.7
branch 25 118 21.1
condition 2 19 10.5
subroutine 12 13 92.3
pod 2 2 100.0
total 114 331 34.4


line stmt bran cond sub pod time code
1             package IO::Pipely;
2             {
3             $IO::Pipely::VERSION = '0.005';
4             }
5              
6 2     2   49387 use warnings;
  2         4  
  2         75  
7 2     2   12 use strict;
  2         39  
  2         83  
8              
9 2     2   1813 use Symbol qw(gensym);
  2         1934  
  2         165  
10 2         11 use IO::Socket qw(
11             AF_UNIX
12             PF_INET
13             PF_UNSPEC
14             SOCK_STREAM
15             SOL_SOCKET
16             SOMAXCONN
17             SO_ERROR
18             SO_REUSEADDR
19             inet_aton
20             pack_sockaddr_in
21             unpack_sockaddr_in
22 2     2   1641 );
  2         46575  
23 2     2   562 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
  2         3  
  2         109  
24 2     2   9 use Errno qw(EINPROGRESS EWOULDBLOCK);
  2         4  
  2         369  
25              
26             our @EXPORT_OK = qw(pipely socketpairly);
27 2     2   11 use base qw(Exporter);
  2         3  
  2         662  
28              
29             # The order of pipe primitives depends on the platform.
30              
31             # It's not always safe to assume that a function can be used if it's
32             # present.
33              
34             my (@oneway_pipe_types, @twoway_pipe_types);
35             if ($^O eq "MSWin32" or $^O eq "MacOS") {
36             @oneway_pipe_types = qw(inet socketpair pipe);
37             @twoway_pipe_types = qw(inet socketpair pipe);
38             }
39             elsif ($^O eq "cygwin") {
40             @oneway_pipe_types = qw(pipe inet socketpair);
41             @twoway_pipe_types = qw(inet pipe socketpair);
42             }
43             else {
44             @oneway_pipe_types = qw(pipe socketpair inet);
45             @twoway_pipe_types = qw(socketpair inet pipe);
46             }
47              
48             # Provide dummy constants so things at least compile. These constants
49             # aren't used if we're RUNNING_IN_HELL, but Perl needs to see them.
50              
51             BEGIN {
52             # older perls than 5.10 needs a kick in the arse to AUTOLOAD the constant...
53 2 50   2   11 eval "F_GETFL" if $] < 5.010;
54              
55 2 50       3597 if ( ! defined &Fcntl::F_GETFL ) {
56 0 0       0 if ( ! defined prototype "F_GETFL" ) {
57 0         0 *F_GETFL = sub { 0 };
  0         0  
58 0         0 *F_SETFL = sub { 0 };
  0         0  
59             } else {
60 0         0 *F_GETFL = sub () { 0 };
61 0         0 *F_SETFL = sub () { 0 };
62             }
63             }
64             }
65              
66             # Make a socket. This is a homebrew socketpair() for systems that
67             # don't support it. The things I must do to make Windows happy.
68              
69             sub _make_socket {
70              
71             ### Server side.
72              
73 0     0   0 my $acceptor = gensym();
74 0         0 my $accepted = gensym();
75              
76 0 0       0 my $tcp = getprotobyname('tcp') or die "getprotobyname: $!";
77 0 0       0 socket( $acceptor, PF_INET, SOCK_STREAM, $tcp ) or die "socket: $!";
78              
79 0 0       0 setsockopt( $acceptor, SOL_SOCKET, SO_REUSEADDR, 1) or die "reuse: $!";
80              
81 0 0       0 my $server_addr = inet_aton('127.0.0.1') or die "inet_aton: $!";
82 0 0       0 $server_addr = pack_sockaddr_in(0, $server_addr)
83             or die "sockaddr_in: $!";
84              
85 0 0       0 bind( $acceptor, $server_addr ) or die "bind: $!";
86              
87 0         0 $acceptor->blocking(0);
88              
89 0         0 $server_addr = getsockname($acceptor);
90              
91 0 0       0 listen( $acceptor, SOMAXCONN ) or die "listen: $!";
92              
93             ### Client side.
94              
95 0         0 my $connector = gensym();
96              
97 0 0       0 socket( $connector, PF_INET, SOCK_STREAM, $tcp ) or die "socket: $!";
98              
99 0         0 $connector->blocking(0);
100              
101 0 0       0 unless (connect( $connector, $server_addr )) {
102 0 0 0     0 die "connect: $!" if $! and ($! != EINPROGRESS) and ($! != EWOULDBLOCK);
      0        
103             }
104              
105 0         0 my $connector_address = getsockname($connector);
106 0         0 my ($connector_port, $connector_addr) =
107             unpack_sockaddr_in($connector_address);
108              
109             ### Loop around 'til it's all done. I thought I was done writing
110             ### select loops. Damnit.
111              
112 0         0 my $in_read = '';
113 0         0 my $in_write = '';
114              
115 0         0 vec( $in_read, fileno($acceptor), 1 ) = 1;
116 0         0 vec( $in_write, fileno($connector), 1 ) = 1;
117              
118 0         0 my $done = 0;
119 0         0 while ($done != 0x11) {
120 0         0 my $hits = select( my $out_read = $in_read,
121             my $out_write = $in_write,
122             undef,
123             5
124             );
125 0 0       0 unless ($hits) {
126 0 0 0     0 next if ($! and ($! == EINPROGRESS) or ($! == EWOULDBLOCK));
      0        
127 0 0       0 die "select: $!" unless $hits;
128             }
129              
130             # Accept happened.
131 0 0       0 if (vec($out_read, fileno($acceptor), 1)) {
132 0         0 my $peer = accept($accepted, $acceptor);
133 0         0 my ($peer_port, $peer_addr) = unpack_sockaddr_in($peer);
134              
135 0 0 0     0 if ( $peer_port == $connector_port and
136             $peer_addr eq $connector_addr
137             ) {
138 0         0 vec($in_read, fileno($acceptor), 1) = 0;
139 0         0 $done |= 0x10;
140             }
141             }
142              
143             # Connect happened.
144 0 0       0 if (vec($out_write, fileno($connector), 1)) {
145 0         0 $! = unpack('i', getsockopt($connector, SOL_SOCKET, SO_ERROR));
146 0 0       0 die "connect: $!" if $!;
147              
148 0         0 vec($in_write, fileno($connector), 1) = 0;
149 0         0 $done |= 0x01;
150             }
151             }
152              
153             # Turn blocking back on, damnit.
154 0         0 $accepted->blocking(1);
155 0         0 $connector->blocking(1);
156              
157 0         0 return ($accepted, $connector);
158             }
159              
160             sub pipely {
161 2     2 1 726 my %arg = @_;
162              
163 2         5 my $conduit_type = delete($arg{type});
164 2   50     13 my $debug = delete($arg{debug}) || 0;
165              
166             # Generate symbols to be used as filehandles for the pipe's ends.
167             #
168             # Filehandle autovivification isn't used for portability with older
169             # versions of Perl.
170              
171 2         10 my ($a_read, $b_write) = (gensym(), gensym());
172              
173             # Try the specified conduit type only. No fallback.
174              
175 2 50       39 if (defined $conduit_type) {
176 2 50       8 return ($a_read, $b_write) if _try_oneway_type(
177             $conduit_type, $debug, \$a_read, \$b_write
178             );
179             }
180              
181             # Otherwise try all available conduit types until one works.
182             # Conduit types that fail are discarded for speed.
183              
184 0         0 while (my $try_type = $oneway_pipe_types[0]) {
185 0 0       0 return ($a_read, $b_write) if _try_oneway_type(
186             $try_type, $debug, \$a_read, \$b_write
187             );
188 0         0 shift @oneway_pipe_types;
189             }
190              
191             # There's no conduit type left. Bummer!
192              
193 0 0       0 $debug and warn "nothing worked";
194 0         0 return;
195             }
196              
197             sub socketpairly {
198 2     2 1 1303 my %arg = @_;
199              
200 2         6 my $conduit_type = delete($arg{type});
201 2   50     12 my $debug = delete($arg{debug}) || 0;
202              
203             # Generate symbols to be used as filehandles for the pipe's ends.
204             #
205             # Filehandle autovivification isn't used for portability with older
206             # versions of Perl.
207              
208 2         9 my ($a_read, $a_write) = (gensym(), gensym());
209 2         43 my ($b_read, $b_write) = (gensym(), gensym());
210              
211 2 50       36 if (defined $conduit_type) {
212 2 50       7 return ($a_read, $a_write, $b_read, $b_write) if _try_twoway_type(
213             $conduit_type, $debug,
214             \$a_read, \$a_write,
215             \$b_read, \$b_write
216             );
217             }
218              
219 0         0 while (my $try_type = $twoway_pipe_types[0]) {
220 0 0       0 return ($a_read, $a_write, $b_read, $b_write) if _try_twoway_type(
221             $try_type, $debug,
222             \$a_read, \$a_write,
223             \$b_read, \$b_write
224             );
225 0         0 shift @oneway_pipe_types;
226             }
227              
228             # There's no conduit type left. Bummer!
229              
230 0 0       0 $debug and warn "nothing worked";
231 0         0 return;
232             }
233              
234             # Try a pipe by type.
235              
236             sub _try_oneway_type {
237 2     2   3 my ($type, $debug, $a_read, $b_write) = @_;
238              
239             # Try a pipe().
240 2 100       7 if ($type eq "pipe") {
241 1         2 eval {
242 1 50       30 pipe($$a_read, $$b_write) or die "pipe failed: $!";
243             };
244              
245             # Pipe failed.
246 1 50       4 if (length $@) {
247 0 0       0 warn "pipe failed: $@" if $debug;
248 0         0 return;
249             }
250              
251 1 50       3 $debug and do {
252 0         0 warn "using a pipe";
253 0         0 warn "ar($$a_read) bw($$b_write)\n";
254             };
255              
256             # Turn off buffering. POE::Kernel does this for us, but
257             # someone might want to use the pipe class elsewhere.
258 1         7 select((select($$b_write), $| = 1)[0]);
259 1         6 return 1;
260             }
261              
262             # Try a UNIX-domain socketpair.
263 1 50       6 if ($type eq "socketpair") {
264 1         2 eval {
265 1 50       66 socketpair($$a_read, $$b_write, AF_UNIX, SOCK_STREAM, PF_UNSPEC)
266             or die "socketpair failed: $!";
267             };
268              
269 1 50       4 if (length $@) {
270 0 0       0 warn "socketpair failed: $@" if $debug;
271 0         0 return;
272             }
273              
274 1 50       4 $debug and do {
275 0         0 warn "using a UNIX domain socketpair";
276 0         0 warn "ar($$a_read) bw($$b_write)\n";
277             };
278              
279             # It's one-way, so shut down the unused directions.
280 1         10 shutdown($$a_read, 1);
281 1         4 shutdown($$b_write, 0);
282              
283             # Turn off buffering. POE::Kernel does this for us, but someone
284             # might want to use the pipe class elsewhere.
285 1         6 select((select($$b_write), $| = 1)[0]);
286 1         7 return 1;
287             }
288              
289             # Try a pair of plain INET sockets.
290 0 0       0 if ($type eq "inet") {
291 0         0 eval {
292 0         0 ($$a_read, $$b_write) = _make_socket();
293             };
294              
295 0 0       0 if (length $@) {
296 0 0       0 warn "make_socket failed: $@" if $debug;
297 0         0 return;
298             }
299              
300 0 0       0 $debug and do {
301 0         0 warn "using a plain INET socket";
302 0         0 warn "ar($$a_read) bw($$b_write)\n";
303             };
304              
305             # It's one-way, so shut down the unused directions.
306 0         0 shutdown($$a_read, 1);
307 0         0 shutdown($$b_write, 0);
308              
309             # Turn off buffering. POE::Kernel does this for us, but someone
310             # might want to use the pipe class elsewhere.
311 0         0 select((select($$b_write), $| = 1)[0]);
312 0         0 return 1;
313             }
314              
315             # There's nothing left to try.
316 0 0       0 $debug and warn "unknown pipely() socket type ``$type''";
317 0         0 return;
318             }
319              
320             # Try a pipe by type.
321              
322             sub _try_twoway_type {
323 2     2   6 my ($type, $debug, $a_read, $a_write, $b_read, $b_write) = @_;
324              
325             # Try a socketpair().
326 2 100       6 if ($type eq "socketpair") {
327 1         3 eval {
328 1 50       73 socketpair($$a_read, $$b_read, AF_UNIX, SOCK_STREAM, PF_UNSPEC)
329             or die "socketpair 1 failed: $!";
330             };
331              
332             # Socketpair failed.
333 1 50       5 if (length $@) {
334 0 0       0 warn "socketpair failed: $@" if $debug;
335 0         0 return;
336             }
337              
338 1 50       5 $debug and do {
339 0         0 warn "using UNIX domain socketpairs";
340 0         0 warn "ar($$a_read) aw($$a_write) br($$b_read) bw($$b_write)\n";
341             };
342              
343             # It's two-way, so each reader is also a writer.
344 1         2 $$a_write = $$a_read;
345 1         2 $$b_write = $$b_read;
346              
347             # Turn off buffering. POE::Kernel does this for us, but someone
348             # might want to use the pipe class elsewhere.
349 1         6 select((select($$a_write), $| = 1)[0]);
350 1         5 select((select($$b_write), $| = 1)[0]);
351 1         6 return 1;
352             }
353              
354             # Try a couple pipe() calls.
355 1 50       4 if ($type eq "pipe") {
356 1         2 eval {
357 1 50       33 pipe($$a_read, $$b_write) or die "pipe 1 failed: $!";
358 1 50       19 pipe($$b_read, $$a_write) or die "pipe 2 failed: $!";
359             };
360              
361             # Pipe failed.
362 1 50       5 if (length $@) {
363 0 0       0 warn "pipe failed: $@" if $debug;
364 0         0 return;
365             }
366              
367 1 50       4 $debug and do {
368 0         0 warn "using a pipe";
369 0         0 warn "ar($$a_read) aw($$a_write) br($$b_read) bw($$b_write)\n";
370             };
371              
372             # Turn off buffering. POE::Kernel does this for us, but someone
373             # might want to use the pipe class elsewhere.
374 1         9 select((select($$a_write), $| = 1)[0]);
375 1         4 select((select($$b_write), $| = 1)[0]);
376 1         7 return 1;
377             }
378              
379             # Try a pair of plain INET sockets.
380 0 0         if ($type eq "inet") {
381 0           eval {
382 0           ($$a_read, $$b_read) = _make_socket();
383             };
384              
385             # Sockets failed.
386 0 0         if (length $@) {
387 0 0         warn "make_socket failed: $@" if $debug;
388 0           return;
389             }
390              
391 0 0         $debug and do {
392 0           warn "using a plain INET socket";
393 0           warn "ar($$a_read) aw($$a_write) br($$b_read) bw($$b_write)\n";
394             };
395              
396 0           $$a_write = $$a_read;
397 0           $$b_write = $$b_read;
398              
399             # Turn off buffering. POE::Kernel does this for us, but someone
400             # might want to use the pipe class elsewhere.
401 0           select((select($$a_write), $| = 1)[0]);
402 0           select((select($$b_write), $| = 1)[0]);
403 0           return 1;
404             }
405              
406 0 0         $debug and warn "unknown pipely(2) socket type ``$type''";
407 0           return;
408             }
409              
410             1;
411              
412             __END__