File Coverage

blib/lib/Starlet/Server.pm
Criterion Covered Total %
statement 311 353 88.1
branch 113 180 62.7
condition 46 81 56.7
subroutine 41 44 93.1
pod 0 9 0.0
total 511 667 76.6


line stmt bran cond sub pod time code
1             package Starlet::Server;
2 114     114   926 use strict;
  114         245  
  114         3980  
3 114     114   597 use warnings;
  114         172  
  114         3031  
4              
5 114     114   526 use Carp ();
  114         234  
  114         2138  
6 114     114   60931 use Plack;
  114         15283  
  114         3962  
7 114     114   57426 use Plack::HTTPParser qw( parse_http_request );
  114         307948  
  114         7746  
8 114     114   785 use IO::Socket::INET;
  114         212  
  114         2659  
9 114     114   125180 use HTTP::Date;
  114         361442  
  114         9055  
10 114     114   15047 use HTTP::Status;
  114         93686  
  114         43146  
11 114     114   760 use List::Util qw(max sum);
  114         165  
  114         8414  
12 114     114   608 use Plack::Util;
  114         276  
  114         2763  
13 114     114   55735 use Plack::TempBuffer;
  114         695814  
  114         3365  
14 114     114   748 use POSIX qw(EINTR EAGAIN EWOULDBLOCK);
  114         203  
  114         960  
15 114     114   11420 use Socket qw(IPPROTO_TCP TCP_NODELAY);
  114         178  
  114         9520  
16 114     114   2215 use File::Temp qw(tempfile);
  114         17723  
  114         15563  
17 114     114   553 use Fcntl qw(:flock);
  114         193  
  114         17382  
18              
19 114     114   737 use Try::Tiny;
  114         168  
  114         6578  
20 114     114   508 use Time::HiRes qw(time);
  114         132  
  114         1866  
21              
22 114     114   30601 use constant MAX_REQUEST_SIZE => 131072;
  114         167  
  114         9034  
23 114     114   560 use constant CHUNKSIZE => 64 * 1024;
  114         141  
  114         6565  
24 114     114   823 use constant MSWin32 => $^O eq 'MSWin32';
  114         142  
  114         192195  
25              
26 114     114   680 my $null_io = do { open my $io, "<", \""; $io };
  114         164  
  114         3202  
27              
28             sub new {
29 114     114 0 404 my($class, %args) = @_;
30              
31             my $self = bless {
32             listens => $args{listens} || [],
33             host => $args{host} || 0,
34             port => $args{port} || $args{socket} || 8080,
35             timeout => $args{timeout} || 300,
36             keepalive_timeout => $args{keepalive_timeout} || 2,
37             max_keepalive_reqs => $args{max_keepalive_reqs} || 1,
38             server_software => $args{server_software} || $class,
39       110     server_ready => $args{server_ready} || sub {},
40             min_reqs_per_child => (
41             defined $args{min_reqs_per_child}
42             ? $args{min_reqs_per_child} : undef,
43             ),
44             max_reqs_per_child => (
45             $args{max_reqs_per_child} || $args{max_requests} || 100,
46             ),
47             spawn_interval => $args{spawn_interval} || 0,
48             err_respawn_interval => (
49             defined $args{err_respawn_interval}
50             ? $args{err_respawn_interval} : undef,
51             ),
52             is_multiprocess => Plack::Util::FALSE,
53       83     child_exit => $args{child_exit} || sub {},
54 114 100 100     82041 _using_defer_accept => undef,
    50 100        
      100        
      50        
      50        
      50        
      33        
      100        
      50        
      50        
      100        
55             }, $class;
56              
57 114 50 33     563 if ($args{max_workers} && $args{max_workers} > 1) {
58 0         0 Carp::carp(
59             "Preforking in $class is deprecated. Falling back to the non-forking mode. ",
60             "If you need preforking, use Starman or Starlet instead and run like `plackup -s Starlet`",
61             );
62             }
63              
64 114         639 $self;
65             }
66              
67             sub run {
68 0     0 0 0 my($self, $app) = @_;
69 0         0 $self->setup_listener();
70 0         0 $self->accept_loop($app);
71             }
72              
73             sub setup_listener {
74 113     113 0 263 my $self = shift;
75 113 100       234 if (scalar(grep {defined $_} @{$self->{listens}}) == 0) {
  72         87  
  113         563  
76 104         203 my $sock;
77 104 100       875 if ($self->{port} =~ /^[0-9]+$/s) {
78             $sock = IO::Socket::INET->new(
79             Listen => SOMAXCONN,
80             LocalPort => $self->{port},
81             LocalAddr => $self->{host},
82 98 50       1621 Proto => 'tcp',
83             ReuseAddr => 1,
84             ) or die "failed to listen to port $self->{port}:$!";
85             } else {
86             $sock = IO::Socket::UNIX->new(
87             Listen => SOMAXCONN,
88             Local => $self->{port},
89 6 50       108 ) or die "failed to listen to socket $self->{port}:$!";
90             }
91             $self->{listens}[fileno($sock)] = {
92             host => $self->{host},
93             port => $self->{port},
94 104         47034 sock => $sock,
95             };
96             }
97              
98 113         239 my @listens = grep {defined $_} @{$self->{listens}};
  500         661  
  113         373  
99 113         311 for my $listen (@listens) {
100 131         1518 my $family = Socket::sockaddr_family(getsockname($listen->{sock}));
101 131         529 $listen->{_is_tcp} = $family != AF_UNIX;
102              
103             # set defer accept
104 131 100 33     974 if ($^O eq 'linux' && $listen->{_is_tcp}) {
105             setsockopt($listen->{sock}, IPPROTO_TCP, 9, 1)
106 107 50       712 and $listen->{_using_defer_accept} = 1;
107             }
108             }
109              
110 113 100       400 if (scalar(@listens) > 1) {
111 3   33     12 $self->{lock_path} ||= do {
112 3         45 my ($fh, $lock_path) = tempfile(UNLINK => 1);
113             # closing the file handle explicitly for two reasons
114             # 1) tempfile retains the handle when UNLINK is set
115             # 2) tempfile implicitely locks the file on OS X
116 3         2046 close $fh;
117 3         15 $lock_path;
118             };
119             }
120              
121 113         350 $self->{server_ready}->($self);
122             }
123              
124             sub accept_loop {
125             # TODO handle $max_reqs_per_child
126 100     100 0 1035 my($self, $app, $max_reqs_per_child) = @_;
127 100         603 my $proc_req_count = 0;
128 100         628 my $is_keepalive = 0;
129              
130             local $SIG{TERM} = sub {
131 84     84   27376395 $self->{term_received} = 1;
132 100         5888 };
133 100         1439 local $SIG{PIPE} = 'IGNORE';
134              
135 100         1960 my $acceptor = $self->_get_acceptor;
136              
137 100   33     2403 while (! defined $max_reqs_per_child || $proc_req_count < $max_reqs_per_child) {
138             # accept (or exit on SIGTERM)
139 271 100       12707 if ($self->{term_received}) {
140 84         1242 $self->{child_exit}->($self, $app);
141 84         27899 exit 0;
142             }
143 187         955 my ($conn, $peer, $listen) = $acceptor->();
144 187 100       1977 next unless $conn;
145              
146 103         812 $self->{_is_deferred_accept} = $listen->{_using_defer_accept};
147 103 50       1921 defined($conn->blocking(0))
148             or die "failed to set socket to nonblocking mode:$!";
149 103         4274 my ($peerport, $peerhost, $peeraddr) = (0, undef, undef);
150 103 100       689 if ($listen->{_is_tcp}) {
151 97 50       1347 $conn->setsockopt(IPPROTO_TCP, TCP_NODELAY, 1)
152             or die "setsockopt(TCP_NODELAY) failed:$!";
153 97         2421 ($peerport, $peerhost) = unpack_sockaddr_in $peer;
154 97         1347 $peeraddr = inet_ntoa($peerhost);
155             }
156 103         426 my $req_count = 0;
157 103         471 my $pipelined_buf = '';
158              
159 103         238 while (1) {
160 103         198 ++$req_count;
161 103         200 ++$proc_req_count;
162             my $env = {
163             SERVER_PORT => $listen->{port} || 0,
164             SERVER_NAME => $listen->{host} || 0,
165             SCRIPT_NAME => '',
166             REMOTE_ADDR => $peeraddr,
167             REMOTE_PORT => $peerport,
168             'psgi.version' => [ 1, 1 ],
169             'psgi.errors' => *STDERR,
170             'psgi.url_scheme' => 'http',
171             'psgi.run_once' => Plack::Util::FALSE,
172             'psgi.multithread' => Plack::Util::FALSE,
173             'psgi.multiprocess' => $self->{is_multiprocess},
174             'psgi.streaming' => Plack::Util::TRUE,
175             'psgi.nonblocking' => Plack::Util::FALSE,
176             'psgix.input.buffered' => Plack::Util::TRUE,
177             'psgix.io' => $conn,
178             'psgix.harakiri' => 1,
179             'psgix.informational' => sub {
180 0     0   0 $self->_informational($conn, @_);
181             },
182 103   50     6248 };
      100        
183              
184 103         508 my $may_keepalive = $req_count < $self->{max_keepalive_reqs};
185 103 0 33     641 if ($may_keepalive && $max_reqs_per_child && $proc_req_count >= $max_reqs_per_child) {
      33        
186 0         0 $may_keepalive = undef;
187             }
188 103 50       591 $may_keepalive = 1 if length $pipelined_buf;
189 103         178 my $keepalive;
190 103         1811 ($keepalive, $pipelined_buf) = $self->handle_connection($env, $conn, $app,
191             $may_keepalive, $req_count != 1, $pipelined_buf);
192              
193 102 100       592 if ($env->{'psgix.harakiri.commit'}) {
194 15         258 $conn->close;
195 15         7501 return;
196             }
197 87 50       1132 last unless $keepalive;
198             # TODO add special cases for clients with broken keep-alive support, as well as disabling keep-alive for HTTP/1.0 proxies
199             }
200 87         778 $conn->close;
201             }
202             }
203              
204             sub _get_acceptor {
205 100     100   345 my $self = shift;
206 100         544 my @listens = grep {defined $_} @{$self->{listens}};
  436         1699  
  100         1410  
207              
208 100 100       1088 if (scalar(@listens) == 1) {
209 98         613 my $listen = $listens[0];
210             return sub {
211 178 100   178   6046 if (my ($conn, $peer) = $listen->{sock}->accept) {
212 96         10198491 return ($conn, $peer, $listen);
213             }
214 82         362 return +();
215 98         2212 };
216             }
217             else {
218             # wait for multiple sockets with select(2)
219 2         7 my @fds;
220 2         7 my $rin = '';
221 2         12 for my $listen (@listens) {
222 14 50       191 defined($listen->{sock}->blocking(0))
223             or die "failed to set listening socket to non-blocking mode:$!";
224 14         255 my $fd = fileno($listen->{sock});
225 14         19 push @fds, $fd;
226 14         56 vec($rin, $fd, 1) = 1;
227             }
228              
229             open(my $lock_fh, '>', $self->{lock_path})
230 2 50       232 or die "failed to open lock file:@{[$self->{lock_path}]}:$!";
  0         0  
231              
232             return sub {
233 9 50   9   962324 if (! flock($lock_fh, LOCK_EX)) {
234 0 0       0 die "failed to lock file:@{[$self->{lock_path}]}:$!"
  0         0  
235             if $! != EINTR;
236 0         0 return +();
237             }
238 9         962633 my $nfound = select(my $rout = $rin, undef, undef, undef);
239 9         137 for (my $i = 0; $nfound > 0; ++$i) {
240 28         29 my $fd = $fds[$i];
241 28 100       154 next unless vec($rout, $fd, 1);
242 7         36 --$nfound;
243 7         20 my $listen = $self->{listens}[$fd];
244 7 50       96 if (my ($conn, $peer) = $listen->{sock}->accept) {
245 7         1400 flock($lock_fh, LOCK_UN);
246 7         28 return ($conn, $peer, $listen);
247             }
248             }
249 2         45 flock($lock_fh, LOCK_UN);
250 2         7 return +();
251 2         72 };
252             }
253             }
254              
255             my $bad_response = [ 400, [ 'Content-Type' => 'text/plain', 'Connection' => 'close' ], [ 'Bad Request' ] ];
256             sub handle_connection {
257 103     103 0 396 my($self, $env, $conn, $app, $use_keepalive, $is_keepalive, $prebuf) = @_;
258            
259 103         524 my $buf = '';
260 103         438 my $pipelined_buf='';
261 103         265 my $res = $bad_response;
262            
263 103         227 while (1) {
264 103         198 my $rlen;
265 103 50       780 if ( $rlen = length $prebuf ) {
266 0         0 $buf = $prebuf;
267 0         0 undef $prebuf;
268             }
269             else {
270             $rlen = $self->read_timeout(
271             $conn, \$buf, MAX_REQUEST_SIZE - length($buf), length($buf),
272             $is_keepalive ? $self->{keepalive_timeout} : $self->{timeout},
273 103 50       1833 ) or return;
    100          
274             }
275 93         1548 my $reqlen = parse_http_request($buf, $env);
276 93 100       26601 if ($reqlen >= 0) {
277             # handle request
278 92         305 my $protocol = $env->{SERVER_PROTOCOL};
279 92 50       354 if ($use_keepalive) {
280 0 0       0 if ($self->{term_received}) {
    0          
281 0         0 $use_keepalive = undef;
282             } elsif ( $protocol eq 'HTTP/1.1' ) {
283 0 0       0 if (my $c = $env->{HTTP_CONNECTION}) {
284 0 0       0 $use_keepalive = undef
285             if $c =~ /^\s*close\s*/i;
286             }
287             } else {
288 0 0       0 if (my $c = $env->{HTTP_CONNECTION}) {
289 0 0       0 $use_keepalive = undef
290             unless $c =~ /^\s*keep-alive\s*/i;
291             } else {
292 0         0 $use_keepalive = undef;
293             }
294             }
295             }
296 92         434 $buf = substr $buf, $reqlen;
297 114     114   756 my $chunked = do { no warnings; lc delete $env->{HTTP_TRANSFER_ENCODING} eq 'chunked' };
  114         197  
  114         225859  
  92         135  
  92         426  
298              
299 92 50       544 if ( $env->{HTTP_EXPECT} ) {
300 0 0       0 if ( lc $env->{HTTP_EXPECT} eq '100-continue' ) {
301 0 0       0 $self->write_all($conn, "HTTP/1.1 100 Continue\015\012\015\012")
302             or return;
303             } else {
304 0         0 $res = [417,[ 'Content-Type' => 'text/plain', 'Connection' => 'close' ], [ 'Expectation Failed' ] ];
305 0         0 last;
306             }
307             }
308              
309 92 100       540 if (my $cl = $env->{CONTENT_LENGTH}) {
    100          
310 5         135 my $buffer = Plack::TempBuffer->new($cl);
311 5         440 while ($cl > 0) {
312 5         10 my $chunk;
313 5 100       16 if (length $buf) {
314 4         8 $chunk = $buf;
315 4         12 $buf = '';
316             } else {
317             $self->read_timeout(
318             $conn, \$chunk, $cl, 0, $self->{timeout})
319 1 50       5 or return;
320             }
321 4         17 $buffer->print($chunk);
322 4         221 $cl -= length $chunk;
323             }
324 4         19 $env->{'psgi.input'} = $buffer->rewind;
325             }
326             elsif ($chunked) {
327 1         50 my $buffer = Plack::TempBuffer->new;
328 1         199 my $chunk_buffer = '';
329 1         2 my $length;
330 1         2 DECHUNK: while(1) {
331 3         5 my $chunk;
332 3 100       9 if ( length $buf ) {
333 1         2 $chunk = $buf;
334 1         68 $buf = '';
335             }
336             else {
337             $self->read_timeout($conn, \$chunk, CHUNKSIZE, 0, $self->{timeout})
338 2 50       10 or return;
339             }
340              
341 3         81 $chunk_buffer .= $chunk;
342 3         151 while ( $chunk_buffer =~ s/^(([0-9a-fA-F]+).*\015\012)// ) {
343 79         183 my $trailer = $1;
344 79         130 my $chunk_len = hex $2;
345 79 100       252 if ($chunk_len == 0) {
    50          
346 1         6 last DECHUNK;
347             } elsif (length $chunk_buffer < $chunk_len + 2) {
348 0         0 $chunk_buffer = $trailer . $chunk_buffer;
349 0         0 last;
350             }
351 78         366 $buffer->print(substr $chunk_buffer, 0, $chunk_len, '');
352 78         1584 $chunk_buffer =~ s/^\015\012//;
353 78         568 $length += $chunk_len;
354             }
355             }
356 1         12 $env->{CONTENT_LENGTH} = $length;
357 1         9 $env->{'psgi.input'} = $buffer->rewind;
358             } else {
359 86 50       386 if ( $buf =~ m!^(?:GET|HEAD)! ) { #pipeline
360 0         0 $pipelined_buf = $buf;
361 0         0 $use_keepalive = 1; #force keepalive
362             } # else clear buffer
363 86         474 $env->{'psgi.input'} = $null_io;
364             }
365              
366 91         1761 $res = Plack::Util::run_app $app, $env;
367 90         14232 last;
368             }
369 1 50       4 if ($reqlen == -2) {
    50          
370             # request is incomplete, do nothing
371             } elsif ($reqlen == -1) {
372             # error, close conn
373 1         4 last;
374             }
375             }
376              
377 91 100       568 if (ref $res eq 'ARRAY') {
    50          
378 88         1004 $self->_handle_response($env->{SERVER_PROTOCOL}, $res, $conn, \$use_keepalive);
379             } elsif (ref $res eq 'CODE') {
380             $res->(sub {
381 3     3   173 $self->_handle_response($env->{SERVER_PROTOCOL}, $_[0], $conn, \$use_keepalive);
382 3         30 });
383             } else {
384 0         0 die "Bad response $res";
385             }
386            
387 91         633 return ($use_keepalive, $pipelined_buf);
388             }
389              
390             sub _informational {
391 0     0   0 my ($self, $conn, $status_code, $headers) = @_;
392              
393 0         0 my @lines = "HTTP/1.1 $status_code @{[ HTTP::Status::status_message($status_code) ]}\015\012";
  0         0  
394 0         0 for (my $i = 0; $i < @$headers; $i += 2) {
395 0         0 my $k = $headers->[$i];
396 0         0 my $v = $headers->[$i + 1];
397 0         0 push @lines, "$k: $v\015\012";
398             }
399 0         0 push @lines, "\015\012";
400              
401 0         0 $self->write_all($conn, join("", @lines), $self->{timeout});
402             }
403              
404             sub _handle_response {
405 91     91   227 my($self, $protocol, $res, $conn, $use_keepalive_r) = @_;
406 91         197 my $status_code = $res->[0];
407 91         164 my $headers = $res->[1];
408 91         134 my $body = $res->[2];
409            
410 91         151 my @lines;
411             my %send_headers;
412 91         465 for (my $i = 0; $i < @$headers; $i += 2) {
413 106         203 my $k = $headers->[$i];
414 106         258 my $v = $headers->[$i + 1];
415 106         227 my $lck = lc $k;
416 106 100       313 if ($lck eq 'connection') {
417 1 50 33     7 $$use_keepalive_r = undef
418             if $$use_keepalive_r && lc $v ne 'keep-alive';
419             } else {
420 105         372 push @lines, "$k: $v\015\012";
421 105         855 $send_headers{$lck} = $v;
422             }
423             }
424 91 100       393 if ( ! exists $send_headers{server} ) {
425 90         419 unshift @lines, "Server: $self->{server_software}\015\012";
426             }
427 91 50       734 if ( ! exists $send_headers{date} ) {
428 91         233 unshift @lines, "Date: @{[HTTP::Date::time2str()]}\015\012";
  91         778  
429             }
430              
431             # try to set content-length when keepalive can be used, or disable it
432 91         3014 my $use_chunked;
433 91 100 100     1038 if (defined($protocol) && $protocol eq 'HTTP/1.1') {
434 80 100 66     1399 if (defined $send_headers{'content-length'}
    100          
435             || defined $send_headers{'transfer-encoding'}) {
436             # ok
437             } elsif (!Plack::Util::status_with_no_entity_body($status_code)) {
438 75         927 push @lines, "Transfer-Encoding: chunked\015\012";
439 75         150 $use_chunked = 1;
440             }
441 80 50       470 push @lines, "Connection: close\015\012" unless $$use_keepalive_r;
442             } else {
443             # HTTP/1.0
444 11 50       46 if ($$use_keepalive_r) {
445 0 0 0     0 if (defined $send_headers{'content-length'}
    0 0        
446             || defined $send_headers{'transfer-encoding'}) {
447             # ok
448             } elsif (!Plack::Util::status_with_no_entity_body($status_code)
449             && defined(my $cl = Plack::Util::content_length($body))) {
450 0         0 push @lines, "Content-Length: $cl\015\012";
451             } else {
452 0         0 $$use_keepalive_r = undef;
453             }
454             }
455 11 50       24 push @lines, "Connection: keep-alive\015\012" if $$use_keepalive_r;
456 11 50       59 push @lines, "Connection: close\015\012" if !$$use_keepalive_r; #fmm..
457             }
458              
459 91         306 unshift @lines, "HTTP/1.1 $status_code @{[ HTTP::Status::status_message($status_code) ]}\015\012";
  91         1576  
460 91         1291 push @lines, "\015\012";
461            
462 91 100 100     1872 if (defined $body && ref $body eq 'ARRAY' && @$body == 1
      100        
      100        
463             && length $body->[0] < 8192) {
464             # combine response header and small request body
465 81         195 my $buf = $body->[0];
466 81 100       257 if ($use_chunked ) {
467 69         153 my $len = length $buf;
468 69         427 $buf = sprintf("%x",$len) . "\015\012" . $buf . "\015\012" . '0' . "\015\012\015\012";
469             }
470             $self->write_all(
471             $conn, join('', @lines, $buf), $self->{timeout},
472 81         938 );
473 81         404 return;
474             }
475             $self->write_all($conn, join('', @lines), $self->{timeout})
476 10 50       163 or return;
477              
478 10 100       35 if (defined $body) {
479 8         14 my $failed;
480             my $completed;
481 8 100       37 my $body_count = (ref $body eq 'ARRAY') ? $#{$body} + 1 : -1;
  3         10  
482             Plack::Util::foreach(
483             $body,
484             sub {
485 10 50   10   842 unless ($failed) {
486 10         16 my $buf = $_[0];
487 10         14 --$body_count;
488 10 100       29 if ( $use_chunked ) {
489 7         21 my $len = length $buf;
490 7 50       18 return unless $len;
491 7         237 $buf = sprintf("%x",$len) . "\015\012" . $buf . "\015\012";
492 7 100       27 if ( $body_count == 0 ) {
493 2         14 $buf .= '0' . "\015\012\015\012";
494 2         3 $completed = 1;
495             }
496             }
497             $self->write_all($conn, $buf, $self->{timeout})
498 10 50       47 or $failed = 1;
499             }
500             },
501 8         145 );
502 8 100 100     3920 $self->write_all($conn, '0' . "\015\012\015\012", $self->{timeout}) if $use_chunked && !$completed;
503             } else {
504             return Plack::Util::inline_object
505             write => sub {
506 5     5   243 my $buf = $_[0];
507 5 50       12 if ( $use_chunked ) {
508 5         9 my $len = length $buf;
509 5 100       17 return unless $len;
510 4         15 $buf = sprintf("%x",$len) . "\015\012" . $buf . "\015\012"
511             }
512             $self->write_all($conn, $buf, $self->{timeout})
513 4         22 },
514             close => sub {
515 2 50   2   44 $self->write_all($conn, '0' . "\015\012\015\012", $self->{timeout}) if $use_chunked;
516 2         65 };
517             }
518             }
519              
520             # returns value returned by $cb, or undef on timeout or network error
521             sub do_io {
522 215     215 0 652 my ($self, $is_write, $sock, $buf, $len, $off, $timeout) = @_;
523 215         345 my $ret;
524 215 100 66     1461 unless ($is_write || delete $self->{_is_deferred_accept}) {
525 9         49 goto DO_SELECT;
526             }
527             DO_READWRITE:
528             # try to do the IO
529 215 100       641 if ($is_write) {
530 109 50       6636 $ret = syswrite $sock, $buf, $len, $off
531             and return $ret;
532             } else {
533 106 100       9230 $ret = sysread $sock, $$buf, $len, $off
534             and return $ret;
535             }
536 11 50 0     109 unless ((! defined($ret)
      33        
537             && ($! == EINTR || $! == EAGAIN || $! == EWOULDBLOCK))) {
538 11         153 return;
539             }
540             # wait for data
541             DO_SELECT:
542 9         13 while (1) {
543 9         15 my ($rfd, $wfd);
544 9         32 my $efd = '';
545 9         77 vec($efd, fileno($sock), 1) = 1;
546 9 50       190 if ($is_write) {
547 0         0 ($rfd, $wfd) = ('', $efd);
548             } else {
549 9         21 ($rfd, $wfd) = ($efd, '');
550             }
551 9         87 my $start_at = time;
552 9         498 my $nfound = select($rfd, $wfd, $efd, $timeout);
553 9         55 $timeout -= (time - $start_at);
554 9 50       41 last if $nfound;
555 0 0       0 return if $timeout <= 0;
556             }
557 9         22 goto DO_READWRITE;
558             }
559              
560             # returns (positive) number of bytes read, or undef if the socket is to be closed
561             sub read_timeout {
562 106     106 0 272 my ($self, $sock, $buf, $len, $off, $timeout) = @_;
563 106         1177 $self->do_io(undef, $sock, $buf, $len, $off, $timeout);
564             }
565              
566             # returns (positive) number of bytes written, or undef if the socket is to be closed
567             sub write_timeout {
568 109     109 0 194 my ($self, $sock, $buf, $len, $off, $timeout) = @_;
569 109         293 $self->do_io(1, $sock, $buf, $len, $off, $timeout);
570             }
571              
572             # writes all data in buf and returns number of bytes written or undef if failed
573             sub write_all {
574 109     109 0 352 my ($self, $sock, $buf, $timeout) = @_;
575 109         183 my $off = 0;
576 109         424 while (my $len = length($buf) - $off) {
577 109 50       899 my $ret = $self->write_timeout($sock, $buf, $len, $off, $timeout)
578             or return;
579 109         473 $off += $ret;
580             }
581 109         449 return length $buf;
582             }
583              
584             1;