File Coverage

blib/lib/Starlet/Server.pm
Criterion Covered Total %
statement 313 343 91.2
branch 116 180 64.4
condition 48 81 59.2
subroutine 41 42 97.6
pod 0 9 0.0
total 518 655 79.0


line stmt bran cond sub pod time code
1             package Starlet::Server;
2 112     112   630 use strict;
  112         196  
  112         2995  
3 112     112   415 use warnings;
  112         146  
  112         2549  
4              
5 112     112   478 use Carp ();
  112         108  
  112         1965  
6 112     112   49908 use Plack;
  112         11923  
  112         3545  
7 112     112   45877 use Plack::HTTPParser qw( parse_http_request );
  112         255047  
  112         7406  
8 112     112   660 use IO::Socket::INET;
  112         156  
  112         2175  
9 112     112   109755 use HTTP::Date;
  112         333998  
  112         8056  
10 112     112   13288 use HTTP::Status;
  112         86051  
  112         34796  
11 112     112   594 use List::Util qw(max sum);
  112         132  
  112         6035  
12 112     112   507 use Plack::Util;
  112         224  
  112         2268  
13 112     112   48584 use Plack::TempBuffer;
  112         559354  
  112         3033  
14 112     112   619 use POSIX qw(EINTR EAGAIN EWOULDBLOCK);
  112         163  
  112         757  
15 112     112   9866 use Socket qw(IPPROTO_TCP TCP_NODELAY);
  112         157  
  112         8352  
16 112     112   1708 use File::Temp qw(tempfile);
  112         12037  
  112         12746  
17 112     112   451 use Fcntl qw(:flock);
  112         2641  
  112         11218  
18              
19 112     112   474 use Try::Tiny;
  112         106  
  112         5260  
20 112     112   421 use Time::HiRes qw(time);
  112         134  
  112         1720  
21              
22 112     112   24548 use constant MAX_REQUEST_SIZE => 131072;
  112         139  
  112         7634  
23 112     112   430 use constant CHUNKSIZE => 64 * 1024;
  112         141  
  112         5473  
24 112     112   387 use constant MSWin32 => $^O eq 'MSWin32';
  112         132  
  112         161761  
25              
26 112     112   554 my $null_io = do { open my $io, "<", \""; $io };
  112         115  
  112         2446  
27              
28             sub new {
29 112     112 0 321 my($class, %args) = @_;
30              
31             my $self = bless {
32             listens => $args{listens} || [],
33             host => $args{host} || 0,
34             port => $args{port} || $args{socket} || 8080,
35             timeout => $args{timeout} || 300,
36             keepalive_timeout => $args{keepalive_timeout} || 2,
37             max_keepalive_reqs => $args{max_keepalive_reqs} || 1,
38             server_software => $args{server_software} || $class,
39       108     server_ready => $args{server_ready} || sub {},
40             min_reqs_per_child => (
41             defined $args{min_reqs_per_child}
42             ? $args{min_reqs_per_child} : undef,
43             ),
44             max_reqs_per_child => (
45             $args{max_reqs_per_child} || $args{max_requests} || 100,
46             ),
47             spawn_interval => $args{spawn_interval} || 0,
48             err_respawn_interval => (
49             defined $args{err_respawn_interval}
50             ? $args{err_respawn_interval} : undef,
51             ),
52             is_multiprocess => Plack::Util::FALSE,
53       82     child_exit => $args{child_exit} || sub {},
54 112 100 100     5969 _using_defer_accept => undef,
    50 100        
      100        
      50        
      50        
      50        
      33        
      100        
      50        
      50        
      100        
55             }, $class;
56              
57 112 50 33     542 if ($args{max_workers} && $args{max_workers} > 1) {
58 0         0 Carp::carp(
59             "Preforking in $class is deprecated. Falling back to the non-forking mode. ",
60             "If you need preforking, use Starman or Starlet instead and run like `plackup -s Starlet`",
61             );
62             }
63              
64 112         365 $self;
65             }
66              
67             sub run {
68 0     0 0 0 my($self, $app) = @_;
69 0         0 $self->setup_listener();
70 0         0 $self->accept_loop($app);
71             }
72              
73             sub setup_listener {
74 111     111 0 171 my $self = shift;
75 111 100       150 if (scalar(grep {defined $_} @{$self->{listens}}) == 0) {
  72         72  
  111         541  
76 102         136 my $sock;
77 102 100       766 if ($self->{port} =~ /^[0-9]+$/s) {
78             $sock = IO::Socket::INET->new(
79             Listen => SOMAXCONN,
80             LocalPort => $self->{port},
81             LocalAddr => $self->{host},
82 96 50       1386 Proto => 'tcp',
83             ReuseAddr => 1,
84             ) or die "failed to listen to port $self->{port}:$!";
85             } else {
86             $sock = IO::Socket::UNIX->new(
87             Listen => SOMAXCONN,
88             Local => $self->{port},
89 6 50       102 ) or die "failed to listen to socket $self->{port}:$!";
90             }
91             $self->{listens}[fileno($sock)] = {
92             host => $self->{host},
93             port => $self->{port},
94 102         37246 sock => $sock,
95             };
96             }
97              
98 111         163 my @listens = grep {defined $_} @{$self->{listens}};
  492         750  
  111         239  
99 111         207 for my $listen (@listens) {
100 129         1210 my $family = Socket::sockaddr_family(getsockname($listen->{sock}));
101 129         290 $listen->{_is_tcp} = $family != AF_UNIX;
102              
103             # set defer accept
104 129 100 33     771 if ($^O eq 'linux' && $listen->{_is_tcp}) {
105             setsockopt($listen->{sock}, IPPROTO_TCP, 9, 1)
106 105 50       660 and $listen->{_using_defer_accept} = 1;
107             }
108             }
109              
110 111 100       309 if (scalar(@listens) > 1) {
111 3   33     9 $self->{lock_path} ||= do {
112 3         117 my ($fh, $lock_path) = tempfile(UNLINK => 1);
113             # closing the file handle explicitly for two reasons
114             # 1) tempfile retains the handle when UNLINK is set
115             # 2) tempfile implicitely locks the file on OS X
116 3         2169 close $fh;
117 3         15 $lock_path;
118             };
119             }
120              
121 111         351 $self->{server_ready}->($self);
122             }
123              
124             sub accept_loop {
125             # TODO handle $max_reqs_per_child
126 98     98 0 573 my($self, $app, $max_reqs_per_child) = @_;
127 98         437 my $proc_req_count = 0;
128 98         330 my $is_keepalive = 0;
129              
130             local $SIG{TERM} = sub {
131 83     83   34299413 $self->{term_received} = 1;
132 98         4329 };
133 98         1015 local $SIG{PIPE} = 'IGNORE';
134              
135 98         1691 my $acceptor = $self->_get_acceptor;
136              
137 98   33     2103 while (! defined $max_reqs_per_child || $proc_req_count < $max_reqs_per_child) {
138             # accept (or exit on SIGTERM)
139 268 100       11502 if ($self->{term_received}) {
140 83         943 $self->{child_exit}->($self, $app);
141 83         20642 exit 0;
142             }
143 185         768 my ($conn, $peer, $listen) = $acceptor->();
144 185 100       1587 next unless $conn;
145              
146 102         566 $self->{_is_deferred_accept} = $listen->{_using_defer_accept};
147 102 50       1848 defined($conn->blocking(0))
148             or die "failed to set socket to nonblocking mode:$!";
149 102         2877 my ($peerport, $peerhost, $peeraddr) = (0, undef, undef);
150 102 100       637 if ($listen->{_is_tcp}) {
151 96 50       1521 $conn->setsockopt(IPPROTO_TCP, TCP_NODELAY, 1)
152             or die "setsockopt(TCP_NODELAY) failed:$!";
153 96         1734 ($peerport, $peerhost) = unpack_sockaddr_in $peer;
154 96         1005 $peeraddr = inet_ntoa($peerhost);
155             }
156 102         231 my $req_count = 0;
157 102         320 my $pipelined_buf = '';
158              
159 102         173 while (1) {
160 102         205 ++$req_count;
161 102         228 ++$proc_req_count;
162             my $env = {
163             SERVER_PORT => $listen->{port} || 0,
164             SERVER_NAME => $listen->{host} || 0,
165             SCRIPT_NAME => '',
166             REMOTE_ADDR => $peeraddr,
167             REMOTE_PORT => $peerport,
168             'psgi.version' => [ 1, 1 ],
169             'psgi.errors' => *STDERR,
170             'psgi.url_scheme' => 'http',
171             'psgi.run_once' => Plack::Util::FALSE,
172             'psgi.multithread' => Plack::Util::FALSE,
173             'psgi.multiprocess' => $self->{is_multiprocess},
174 102   50     3852 'psgi.streaming' => Plack::Util::TRUE,
      100        
175             'psgi.nonblocking' => Plack::Util::FALSE,
176             'psgix.input.buffered' => Plack::Util::TRUE,
177             'psgix.io' => $conn,
178             'psgix.harakiri' => 1,
179             };
180              
181 102         285 my $may_keepalive = $req_count < $self->{max_keepalive_reqs};
182 102 0 33     420 if ($may_keepalive && $max_reqs_per_child && $proc_req_count >= $max_reqs_per_child) {
      33        
183 0         0 $may_keepalive = undef;
184             }
185 102 50       414 $may_keepalive = 1 if length $pipelined_buf;
186 102         160 my $keepalive;
187 102         993 ($keepalive, $pipelined_buf) = $self->handle_connection($env, $conn, $app,
188             $may_keepalive, $req_count != 1, $pipelined_buf);
189              
190 101 100       481 if ($env->{'psgix.harakiri.commit'}) {
191 14         217 $conn->close;
192 14         1850 return;
193             }
194 87 50       676 last unless $keepalive;
195             # TODO add special cases for clients with broken keep-alive support, as well as disabling keep-alive for HTTP/1.0 proxies
196             }
197 87         682 $conn->close;
198             }
199             }
200              
201             sub _get_acceptor {
202 98     98   440 my $self = shift;
203 98         360 my @listens = grep {defined $_} @{$self->{listens}};
  428         1111  
  98         1142  
204              
205 98 100       657 if (scalar(@listens) == 1) {
206 96         333 my $listen = $listens[0];
207             return sub {
208 176 100   176   4667 if (my ($conn, $peer) = $listen->{sock}->accept) {
209 95         9825385 return ($conn, $peer, $listen);
210             }
211 81         399 return +();
212 96         1329 };
213             }
214             else {
215             # wait for multiple sockets with select(2)
216 2         14 my @fds;
217 2         13 my $rin = '';
218 2         13 for my $listen (@listens) {
219 14 50       182 defined($listen->{sock}->blocking(0))
220             or die "failed to set listening socket to non-blocking mode:$!";
221 14         321 my $fd = fileno($listen->{sock});
222 14         23 push @fds, $fd;
223 14         76 vec($rin, $fd, 1) = 1;
224             }
225              
226             open(my $lock_fh, '>', $self->{lock_path})
227 2 50       250 or die "failed to open lock file:@{[$self->{lock_path}]}:$!";
  0         0  
228              
229             return sub {
230 9 100   9   969339 if (! flock($lock_fh, LOCK_EX)) {
231 1 50       57 die "failed to lock file:@{[$self->{lock_path}]}:$!"
  0         0  
232             if $! != EINTR;
233 1         3 return +();
234             }
235 8         971441 my $nfound = select(my $rout = $rin, undef, undef, undef);
236 8         51 for (my $i = 0; $nfound > 0; ++$i) {
237 28         28 my $fd = $fds[$i];
238 28 100       65 next unless vec($rout, $fd, 1);
239 7         9 --$nfound;
240 7         20 my $listen = $self->{listens}[$fd];
241 7 50       113 if (my ($conn, $peer) = $listen->{sock}->accept) {
242 7         1219 flock($lock_fh, LOCK_UN);
243 7         24 return ($conn, $peer, $listen);
244             }
245             }
246 1         5 flock($lock_fh, LOCK_UN);
247 1         3 return +();
248 2         45 };
249             }
250             }
251              
252             my $bad_response = [ 400, [ 'Content-Type' => 'text/plain', 'Connection' => 'close' ], [ 'Bad Request' ] ];
253             sub handle_connection {
254 102     102 0 269 my($self, $env, $conn, $app, $use_keepalive, $is_keepalive, $prebuf) = @_;
255            
256 102         376 my $buf = '';
257 102         279 my $pipelined_buf='';
258 102         191 my $res = $bad_response;
259            
260 102         197 while (1) {
261 102         156 my $rlen;
262 102 50       742 if ( $rlen = length $prebuf ) {
263 0         0 $buf = $prebuf;
264 0         0 undef $prebuf;
265             }
266             else {
267             $rlen = $self->read_timeout(
268             $conn, \$buf, MAX_REQUEST_SIZE - length($buf), length($buf),
269             $is_keepalive ? $self->{keepalive_timeout} : $self->{timeout},
270 102 50       1222 ) or return;
    100          
271             }
272 92         1046 my $reqlen = parse_http_request($buf, $env);
273 92 100       19941 if ($reqlen >= 0) {
274             # handle request
275 91         208 my $protocol = $env->{SERVER_PROTOCOL};
276 91 50       268 if ($use_keepalive) {
277 0 0       0 if ($self->{term_received}) {
    0          
278 0         0 $use_keepalive = undef;
279             } elsif ( $protocol eq 'HTTP/1.1' ) {
280 0 0       0 if (my $c = $env->{HTTP_CONNECTION}) {
281 0 0       0 $use_keepalive = undef
282             if $c =~ /^\s*close\s*/i;
283             }
284             } else {
285 0 0       0 if (my $c = $env->{HTTP_CONNECTION}) {
286 0 0       0 $use_keepalive = undef
287             unless $c =~ /^\s*keep-alive\s*/i;
288             } else {
289 0         0 $use_keepalive = undef;
290             }
291             }
292             }
293 91         284 $buf = substr $buf, $reqlen;
294 112     112   607 my $chunked = do { no warnings; lc delete $env->{HTTP_TRANSFER_ENCODING} eq 'chunked' };
  112         220  
  112         175068  
  91         130  
  91         247  
295 91 100       396 if (my $cl = $env->{CONTENT_LENGTH}) {
    100          
296 5         75 my $buffer = Plack::TempBuffer->new($cl);
297 5         270 while ($cl > 0) {
298 5         10 my $chunk;
299 5 100       12 if (length $buf) {
300 4         5 $chunk = $buf;
301 4         5 $buf = '';
302             } else {
303             $self->read_timeout(
304             $conn, \$chunk, $cl, 0, $self->{timeout})
305 1 50       3 or return;
306             }
307 4         15 $buffer->print($chunk);
308 4         152 $cl -= length $chunk;
309             }
310 4         16 $env->{'psgi.input'} = $buffer->rewind;
311             }
312             elsif ($chunked) {
313 1         21 my $buffer = Plack::TempBuffer->new;
314 1         121 my $chunk_buffer = '';
315 1         1 my $length;
316 1         2 DECHUNK: while(1) {
317 49         29 my $chunk;
318 49 100       60 if ( length $buf ) {
319 1         1 $chunk = $buf;
320 1         5 $buf = '';
321             }
322             else {
323             $self->read_timeout($conn, \$chunk, CHUNKSIZE, 0, $self->{timeout})
324 48 50       86 or return;
325             }
326              
327 49         114 $chunk_buffer .= $chunk;
328 49         305 while ( $chunk_buffer =~ s/^(([0-9a-fA-F]+).*\015\012)// ) {
329 79         224 my $trailer = $1;
330 79         89 my $chunk_len = hex $2;
331 79 100       161 if ($chunk_len == 0) {
    50          
332 1         2 last DECHUNK;
333             } elsif (length $chunk_buffer < $chunk_len + 2) {
334 0         0 $chunk_buffer = $trailer . $chunk_buffer;
335 0         0 last;
336             }
337 78         248 $buffer->print(substr $chunk_buffer, 0, $chunk_len, '');
338 78         893 $chunk_buffer =~ s/^\015\012//;
339 78         193 $length += $chunk_len;
340             }
341             }
342 1         6 $env->{CONTENT_LENGTH} = $length;
343 1         4 $env->{'psgi.input'} = $buffer->rewind;
344             } else {
345 85 50       275 if ( $buf =~ m!^(?:GET|HEAD)! ) { #pipeline
346 0         0 $pipelined_buf = $buf;
347 0         0 $use_keepalive = 1; #force keepalive
348             } # else clear buffer
349 85         335 $env->{'psgi.input'} = $null_io;
350             }
351              
352 90 50       541 if ( $env->{HTTP_EXPECT} ) {
353 0 0       0 if ( $env->{HTTP_EXPECT} eq '100-continue' ) {
354 0 0       0 $self->write_all($conn, "HTTP/1.1 100 Continue\015\012\015\012")
355             or return;
356             } else {
357 0         0 $res = [417,[ 'Content-Type' => 'text/plain', 'Connection' => 'close' ], [ 'Expectation Failed' ] ];
358 0         0 last;
359             }
360             }
361              
362 90         1036 $res = Plack::Util::run_app $app, $env;
363 89         10491 last;
364             }
365 1 50       6 if ($reqlen == -2) {
    50          
366             # request is incomplete, do nothing
367             } elsif ($reqlen == -1) {
368             # error, close conn
369 1         4 last;
370             }
371             }
372              
373 90 100       703 if (ref $res eq 'ARRAY') {
    50          
374 87         688 $self->_handle_response($env->{SERVER_PROTOCOL}, $res, $conn, \$use_keepalive);
375             } elsif (ref $res eq 'CODE') {
376             $res->(sub {
377 3     3   128 $self->_handle_response($env->{SERVER_PROTOCOL}, $_[0], $conn, \$use_keepalive);
378 3         32 });
379             } else {
380 0         0 die "Bad response $res";
381             }
382            
383 90         492 return ($use_keepalive, $pipelined_buf);
384             }
385              
386             sub _handle_response {
387 90     90   166 my($self, $protocol, $res, $conn, $use_keepalive_r) = @_;
388 90         152 my $status_code = $res->[0];
389 90         125 my $headers = $res->[1];
390 90         136 my $body = $res->[2];
391            
392 90         116 my @lines;
393             my %send_headers;
394 90         398 for (my $i = 0; $i < @$headers; $i += 2) {
395 105         163 my $k = $headers->[$i];
396 105         210 my $v = $headers->[$i + 1];
397 105         187 my $lck = lc $k;
398 105 100       233 if ($lck eq 'connection') {
399 1 50 33     7 $$use_keepalive_r = undef
400             if $$use_keepalive_r && lc $v ne 'keep-alive';
401             } else {
402 104         269 push @lines, "$k: $v\015\012";
403 104         605 $send_headers{$lck} = $v;
404             }
405             }
406 90 100       282 if ( ! exists $send_headers{server} ) {
407 89         317 unshift @lines, "Server: $self->{server_software}\015\012";
408             }
409 90 50       596 if ( ! exists $send_headers{date} ) {
410 90         128 unshift @lines, "Date: @{[HTTP::Date::time2str()]}\015\012";
  90         692  
411             }
412              
413             # try to set content-length when keepalive can be used, or disable it
414 90         2158 my $use_chunked;
415 90 100 100     810 if (defined($protocol) && $protocol eq 'HTTP/1.1') {
416 79 100 66     1069 if (defined $send_headers{'content-length'}
    100          
417             || defined $send_headers{'transfer-encoding'}) {
418             # ok
419             } elsif (!Plack::Util::status_with_no_entity_body($status_code)) {
420 74         651 push @lines, "Transfer-Encoding: chunked\015\012";
421 74         112 $use_chunked = 1;
422             }
423 79 50       373 push @lines, "Connection: close\015\012" unless $$use_keepalive_r;
424             } else {
425             # HTTP/1.0
426 11 50       27 if ($$use_keepalive_r) {
427 0 0 0     0 if (defined $send_headers{'content-length'}
    0 0        
428             || defined $send_headers{'transfer-encoding'}) {
429             # ok
430             } elsif (!Plack::Util::status_with_no_entity_body($status_code)
431             && defined(my $cl = Plack::Util::content_length($body))) {
432 0         0 push @lines, "Content-Length: $cl\015\012";
433             } else {
434 0         0 $$use_keepalive_r = undef;
435             }
436             }
437 11 50       25 push @lines, "Connection: keep-alive\015\012" if $$use_keepalive_r;
438 11 50       45 push @lines, "Connection: close\015\012" if !$$use_keepalive_r; #fmm..
439             }
440              
441 90         212 unshift @lines, "HTTP/1.1 $status_code @{[ HTTP::Status::status_message($status_code) ]}\015\012";
  90         1040  
442 90         1096 push @lines, "\015\012";
443            
444 90 100 100     1253 if (defined $body && ref $body eq 'ARRAY' && @$body == 1
      100        
      100        
445             && length $body->[0] < 8192) {
446             # combine response header and small request body
447 80         368 my $buf = $body->[0];
448 80 100       178 if ($use_chunked ) {
449 68         110 my $len = length $buf;
450 68         302 $buf = sprintf("%x",$len) . "\015\012" . $buf . "\015\012" . '0' . "\015\012\015\012";
451             }
452             $self->write_all(
453             $conn, join('', @lines, $buf), $self->{timeout},
454 80         563 );
455 80         1225 return;
456             }
457             $self->write_all($conn, join('', @lines), $self->{timeout})
458 10 50       74 or return;
459              
460 10 100       36 if (defined $body) {
461 8         12 my $failed;
462             my $completed;
463 8 100       25 my $body_count = (ref $body eq 'ARRAY') ? $#{$body} + 1 : -1;
  3         6  
464             Plack::Util::foreach(
465             $body,
466             sub {
467 10 50   10   618 unless ($failed) {
468 10         11 my $buf = $_[0];
469 10         24 --$body_count;
470 10 100       22 if ( $use_chunked ) {
471 7         7 my $len = length $buf;
472 7 50       13 return unless $len;
473 7         153 $buf = sprintf("%x",$len) . "\015\012" . $buf . "\015\012";
474 7 100       21 if ( $body_count == 0 ) {
475 2         5 $buf .= '0' . "\015\012\015\012";
476 2         3 $completed = 1;
477             }
478             }
479             $self->write_all($conn, $buf, $self->{timeout})
480 10 50       33 or $failed = 1;
481             }
482             },
483 8         104 );
484 8 100 100     2670 $self->write_all($conn, '0' . "\015\012\015\012", $self->{timeout}) if $use_chunked && !$completed;
485             } else {
486             return Plack::Util::inline_object
487             write => sub {
488 5     5   204 my $buf = $_[0];
489 5 50       11 if ( $use_chunked ) {
490 5         4 my $len = length $buf;
491 5 100       11 return unless $len;
492 4         14 $buf = sprintf("%x",$len) . "\015\012" . $buf . "\015\012"
493             }
494             $self->write_all($conn, $buf, $self->{timeout})
495 4         8 },
496             close => sub {
497 2 50   2   37 $self->write_all($conn, '0' . "\015\012\015\012", $self->{timeout}) if $use_chunked;
498 2         46 };
499             }
500             }
501              
502             # returns value returned by $cb, or undef on timeout or network error
503             sub do_io {
504 259     259 0 535 my ($self, $is_write, $sock, $buf, $len, $off, $timeout) = @_;
505 259         274 my $ret;
506 259 100 66     1164 unless ($is_write || delete $self->{_is_deferred_accept}) {
507 55         90 goto DO_SELECT;
508             }
509             DO_READWRITE:
510             # try to do the IO
511 260 100       4378 if ($is_write) {
512 108 50       8010 $ret = syswrite $sock, $buf, $len, $off
513             and return $ret;
514             } else {
515 152 100       2551 $ret = sysread $sock, $$buf, $len, $off
516             and return $ret;
517             }
518 12 100 33     109 unless ((! defined($ret)
      66        
519             && ($! == EINTR || $! == EAGAIN || $! == EWOULDBLOCK))) {
520 11         123 return;
521             }
522             # wait for data
523             DO_SELECT:
524 56         50 while (1) {
525 56         42 my ($rfd, $wfd);
526 56         57 my $efd = '';
527 56         238 vec($efd, fileno($sock), 1) = 1;
528 56 50       95 if ($is_write) {
529 0         0 ($rfd, $wfd) = ('', $efd);
530             } else {
531 56         72 ($rfd, $wfd) = ($efd, '');
532             }
533 56         111 my $start_at = time;
534 56         706232 my $nfound = select($rfd, $wfd, $efd, $timeout);
535 56         125 $timeout -= (time - $start_at);
536 56 50       132 last if $nfound;
537 0 0       0 return if $timeout <= 0;
538             }
539 56         62 goto DO_READWRITE;
540             }
541              
542             # returns (positive) number of bytes read, or undef if the socket is to be closed
543             sub read_timeout {
544 151     151 0 284 my ($self, $sock, $buf, $len, $off, $timeout) = @_;
545 151         1352 $self->do_io(undef, $sock, $buf, $len, $off, $timeout);
546             }
547              
548             # returns (positive) number of bytes written, or undef if the socket is to be closed
549             sub write_timeout {
550 108     108 0 176 my ($self, $sock, $buf, $len, $off, $timeout) = @_;
551 108         280 $self->do_io(1, $sock, $buf, $len, $off, $timeout);
552             }
553              
554             # writes all data in buf and returns number of bytes written or undef if failed
555             sub write_all {
556 108     108 0 217 my ($self, $sock, $buf, $timeout) = @_;
557 108         146 my $off = 0;
558 108         326 while (my $len = length($buf) - $off) {
559 108 50       652 my $ret = $self->write_timeout($sock, $buf, $len, $off, $timeout)
560             or return;
561 108         345 $off += $ret;
562             }
563 108         304 return length $buf;
564             }
565              
566             1;