File Coverage

blib/lib/Riak/Client.pm
Criterion Covered Total %
statement 47 260 18.0
branch 1 98 1.0
condition 0 37 0.0
subroutine 16 46 34.7
pod 16 17 94.1
total 80 458 17.4


line stmt bran cond sub pod time code
1             #
2             # This file is part of Riak-Client
3             #
4             # This software is copyright (c) 2014 by Damien Krotkine.
5             #
6             # This is free software; you can redistribute it and/or modify it under
7             # the same terms as the Perl 5 programming language system itself.
8             #
9             package Riak::Client;
10             {
11             $Riak::Client::VERSION = '1.95';
12             }
13              
14 1     1   18659 use 5.010;
  1         5  
  1         47  
15 1     1   408 use Riak::Client::PBC;
  1         4  
  1         61  
16 1     1   617 use Type::Params qw(compile);
  1         65878  
  1         8  
17 1     1   195 use Types::Standard -types;
  1         2  
  1         4  
18 1     1   3412 use Errno qw(EINTR);
  1         993  
  1         114  
19 1     1   5 use Scalar::Util qw(blessed);
  1         2  
  1         37  
20 1     1   603 use JSON::XS;
  1         4620  
  1         59  
21 1     1   7 use Carp;
  1         1  
  1         58  
22             $Carp::Internal{ (__PACKAGE__) }++;
23 1     1   521 use Module::Runtime qw(use_module);
  1         1336  
  1         5  
24             require bytes;
25 1     1   517 use Moo;
  1         9528  
  1         6  
26              
27 1     1   1941 use IO::Socket::INET;
  1         16986  
  1         12  
28 1     1   988 use IO::Socket::Timeout;
  1         3497  
  1         9  
29              
30 1     1   41 use Scalar::Util qw(weaken);
  1         2  
  1         65  
31              
32             use constant {
33             # error
34 1         535 ERROR_RESPONSE_CODE => 0,
35             # ping
36             PING_REQUEST_CODE => 1,
37             PING_RESPONSE_CODE => 2,
38             # get, get_raw
39             GET_REQUEST_CODE => 9,
40             GET_RESPONSE_CODE => 10,
41             # put, put_raw
42             PUT_REQUEST_CODE => 11,
43             PUT_RESPONSE_CODE => 12,
44             # del
45             DEL_REQUEST_CODE => 13,
46             DEL_RESPONSE_CODE => 14,
47             # get_buckets
48             GET_BUCKETS_REQUEST_CODE => 15,
49             GET_BUCKETS_RESPONSE_CODE => 16,
50             # get_keys
51             GET_KEYS_REQUEST_CODE => 17,
52             GET_KEYS_RESPONSE_CODE => 18,
53             # get_bucket_props
54             GET_BUCKET_PROPS_REQUEST_CODE => 19,
55             GET_BUCKET_PROPS_RESPONSE_CODE => 20,
56             # set_bucket_props
57             SET_BUCKET_PROPS_REQUEST_CODE => 21,
58             SET_BUCKET_PROPS_RESPONSE_CODE => 22,
59             # map_reducd
60             MAP_REDUCE_REQUEST_CODE => 23,
61             MAP_REDUCE_RESPONSE_CODE => 24,
62             # query_index
63             QUERY_INDEX_REQUEST_CODE => 25,
64             QUERY_INDEX_RESPONSE_CODE => 26,
65 1     1   5 };
  1         1  
66              
67              
68             # ABSTRACT: Fast and lightweight Perl client for Riak
69              
70              
71             has host => ( is => 'ro', isa => Str, required => 1 );
72             has port => ( is => 'ro', isa => Int, required => 1 );
73             has r => ( is => 'ro', isa => Int, default => sub {2} );
74             has w => ( is => 'ro', isa => Int, default => sub {2} );
75             has dw => ( is => 'ro', isa => Int, default => sub {1} );
76             has connection_timeout => ( is => 'ro', isa => Num, default => sub {5} );
77             has read_timeout => ( is => 'ro', predicate => 1, isa => Num, default => sub {5} );
78             has write_timeout => ( is => 'ro', predicate => 1, isa => Num, default => sub {5} );
79             has no_delay => ( is => 'ro', isa => Bool, default => sub {0} );
80              
81              
82             has no_auto_connect => ( is => 'ro', isa => Bool, default => sub {0} );
83              
84             has _on_connect_cb => ( is => 'rw' );
85              
86             has _requests_lock => ( is => 'rw', default => sub { undef });
87              
88             has _socket => ( is => 'ro', lazy => 1, builder => 1 );
89             sub _build__socket {
90 0     0   0 my ($self) = @_;
91              
92 0         0 my $host = $self->host;
93 0         0 my $port = $self->port;
94              
95 0         0 my $socket = IO::Socket::INET->new(
96             PeerHost => $host,
97             PeerPort => $port,
98             Timeout => $self->connection_timeout,
99             );
100              
101 0 0       0 croak "Error ($!), can't connect to $host:$port"
102             unless defined $socket;
103              
104 0 0 0     0 $self->has_read_timeout || $self->has_write_timeout
105             or return $socket;
106              
107             # enable read and write timeouts on the socket
108 0         0 IO::Socket::Timeout->enable_timeouts_on($socket);
109             # setup the timeouts
110 0 0       0 $self->has_read_timeout
111             and $socket->read_timeout($self->read_timeout);
112 0 0       0 $self->has_write_timeout
113             and $socket->write_timeout($self->write_timeout);
114              
115 1     1   6 use Socket qw(IPPROTO_TCP TCP_NODELAY);
  1         1  
  1         3890  
116 0 0       0 $self->no_delay
117             and $socket->setsockopt(IPPROTO_TCP, TCP_NODELAY, 1);
118              
119 0         0 return $socket;
120             }
121              
122             sub BUILD {
123 3     3 0 54 my ($self) = @_;
124 3 50       58 $self->no_auto_connect
125             and return;
126              
127 0           $self->connect();
128             }
129              
130              
131             sub connect {
132 0     0 1   state $check = compile(Any, Optional[CodeRef]);
133 0           my ( $self, $cb ) = $check->(@_);
134              
135             # that will perform connection
136 0           $self->_socket();
137 0 0         if ($cb) {
138 0           $cb->();
139 0           return;
140             } else {
141 0           return 1;
142             }
143              
144             }
145              
146             has _getkeys_accumulator => (is => 'rw', init_arg => undef);
147             has _mapreduce_accumulator => (is => 'rw', init_arg => undef);
148              
149              
150             sub ping {
151 0     0 1   state $check = compile(Any, Optional[CodeRef]);
152 0           my ( $self, $cb ) = $check->(@_);
153 0           $_[0]->_parse_response( {
154             request_code => PING_REQUEST_CODE,
155             expected_code => PING_RESPONSE_CODE,
156             operation_name => 'ping',
157             body_ref => \'',
158             cb => $cb,
159             } );
160             }
161              
162              
163             sub is_alive {
164 0     0 1   state $check = compile(Any, Optional[CodeRef]);
165 0           my ( $self, $cb ) = $check->(@_);
166 0           my $res = eval { $self->ping; 1 };
  0            
  0            
167 0 0         $cb and return $cb->($res);
168 0           return $res;
169             }
170              
171              
172             sub get {
173 0     0 1   state $check = compile(Any, Str, Str, Optional[CodeRef]);
174 0           my ( $self, $bucket, $key, $cb ) = $check->(@_);
175 0           $self->_fetch( $bucket, $key, 1, 0, $cb );
176             }
177              
178              
179             sub get_raw {
180 0     0 1   state $check = compile(Any, Str, Str, Optional[CodeRef]);
181 0           my ( $self, $bucket, $key, $cb ) = $check->(@_);
182 0           $self->_fetch( $bucket, $key, 0, 0, $cb );
183             }
184              
185              
186             #my $LinksStructure = declare as ArrayRef[Dict[bucket => Str, key => Str, tag => Str]];
187             #coerce $LinksStructure, from HashRef[] Num, q{ int($_) };
188              
189             sub put {
190 0 0   0 1   my $cb = ref $_[-1] eq 'CODE' ? pop : undef;
191 0           state $check = compile(Any, Str, Str, Any, Optional[Str],
192             Optional[HashRef[Str]], # indexes
193             Optional[ArrayRef[Dict[bucket => Str, key => Str, tag => Str]]], # links
194             );
195 0           my ( $self, $bucket, $key, $value, $content_type, $indexes, $links ) = $check->(@_);
196              
197 0 0 0       ($content_type //= 'application/json')
198             eq 'application/json'
199             and $value = encode_json($value);
200              
201 0           $self->_store( $bucket, $key, $value, $content_type, $indexes, $links, $cb);
202             }
203              
204              
205              
206             sub put_raw {
207 0 0   0 1   my $cb = ref $_[-1] eq 'CODE' ? pop : undef;
208 0           state $check = compile(Any, Str, Str, Any, Optional[Str],
209             Optional[HashRef[Str]], # indexes
210             Optional[ArrayRef[Dict[bucket => Str, key => Str, tag => Str]]], # links
211             );
212 0           my ( $self, $bucket, $key, $value, $content_type, $indexes, $links ) = $check->(@_);
213              
214 0   0       $content_type ||= 'plain/text';
215 0           $self->_store( $bucket, $key, $value, $content_type, $indexes, $links, $cb);
216             }
217              
218              
219             sub del {
220 0     0 1   state $check = compile(Any, Str, Str, Optional[CodeRef]);
221 0           my ( $self, $bucket, $key, $cb ) = $check->(@_);
222              
223 0           my $body = RpbDelReq->encode(
224             { key => $key,
225             bucket => $bucket,
226             rw => $self->dw
227             }
228             );
229              
230 0           $self->_parse_response( {
231             request_code => DEL_REQUEST_CODE,
232             expected_code => DEL_RESPONSE_CODE,
233             operation_name => 'del',
234             key => $key,
235             bucket => $bucket,
236             body_ref => \$body,
237             cb => $cb,
238             } );
239             }
240              
241              
242             sub get_keys {
243 0     0 1   state $check = compile(Any, Str, Optional[CodeRef]);
244 0           my ( $self, $bucket, $cb ) = $check->(@_);
245              
246             # reset accumulator
247 0           $self->_getkeys_accumulator([]);
248 0           my $body = RpbListKeysReq->encode( { bucket => $bucket } );
249 0           $self->_parse_response( {
250             request_code => GET_KEYS_REQUEST_CODE,
251             expected_code => GET_KEYS_RESPONSE_CODE,
252             operation_name => 'get_keys',
253             key => "*",
254             bucket => $bucket,
255             body_ref => \$body,
256             cb => $cb,
257             handle_response => \&_handle_get_keys_response,
258             lock_requests => 1,
259             } );
260             }
261              
262             sub _handle_get_keys_response {
263 0     0     my ( $self, $encoded_message, $args ) = @_;
264              
265             # TODO: support for 1.4 (which provides 'stream', 'return_terms', and 'stream')
266 0           my $obj = RpbListKeysResp->decode( $encoded_message );
267 0   0       my @keys = @{$obj->keys // []};
  0            
268              
269             # case 1 : no user callback
270 0           my $cb = $args->{cb};
271 0 0         if (! $cb ) {
272             # accumulate results
273 0           push @{$self->_getkeys_accumulator}, @keys;
  0            
274              
275             # if more to come, return by saying so
276 0 0         $obj->done
277             or return (undef, 1);
278              
279             # all results are there, return the whole
280 0           my $keys = $self->_getkeys_accumulator;
281 0           $self->_getkeys_accumulator([]);
282 0           return \$keys;
283             }
284              
285             # case 2 : we have a user callback
286 0           my $last_key;
287 0 0         my $obj_done = $obj->done
288             and $last_key = pop @keys;
289              
290             # no second arg = more to come
291 0           $cb->($_) foreach @keys;
292              
293             # if more to come, return by saying so
294 0 0         $obj->done
295             or return (undef, 1);
296              
297             # process last keys if any
298 0 0         defined $last_key and $cb->($last_key, 1);
299              
300             # means: nothing left to do, all results processed through callback
301 0           return;
302             }
303              
304              
305             sub exists {
306 0     0 1   state $check = compile(Any, Str, Str, Optional[CodeRef]);
307 0           my ( $self, $bucket, $key, $cb ) = $check->(@_);
308 0           $self->_fetch( $bucket, $key, 0, 1, $cb );
309             }
310              
311             sub _fetch {
312 0     0     my ( $self, $bucket, $key, $decode, $test_exist, $cb ) = @_;
313              
314 0           my $body = RpbGetReq->encode(
315             { r => $self->r,
316             key => $key,
317             bucket => $bucket,
318             head => $test_exist
319             }
320             );
321              
322 0           $self->_parse_response( {
323             request_code => GET_REQUEST_CODE,
324             expected_code => GET_RESPONSE_CODE,
325             operation_name => 'get',
326             key => $key,
327             bucket => $bucket,
328             body_ref => \$body,
329             decode => $decode,
330             handle_response => \&_handle_get_response,
331             test_exist => $test_exist,
332             cb => $cb,
333             cb_args => 1,
334             } );
335             }
336              
337             sub _handle_get_response {
338 0     0     my ( $self, $encoded_message, $args ) = @_;
339              
340 0 0         defined $encoded_message
341             or return _die_generic_error( "Undefined Message", 'get', $args );
342              
343 0           my $decoded_message = RpbGetResp->decode($encoded_message);
344 0           my $content = $decoded_message->content;
345              
346             # empty content
347 0 0         ref $content eq 'ARRAY'
348             or return \undef;
349              
350             # if we just need to test existence
351 0 0         $args->{test_exist}
352             and return \1;
353              
354             # TODO: handle metadata
355 0           my $value = $content->[0]->value;
356 0           my $content_type = $content->[0]->content_type;
357              
358             # if we need to decode
359 0 0 0       $args->{decode} && ($content_type // '') eq 'application/json'
      0        
360             and return \decode_json($value);
361              
362             # simply return the value
363 0           return \$value;
364             }
365              
366             sub _store {
367 0     0     my ( $self, $bucket, $key, $encoded_value, $content_type, $indexes, $links, $cb ) = @_;
368              
369 0           my $body = RpbPutReq->encode(
370             { key => $key,
371             bucket => $bucket,
372             content => {
373             value => $encoded_value,
374             content_type => $content_type,
375             ( $indexes ?
376             ( indexes => [
377             map {
378 0 0         { key => $_ , value => $indexes->{$_} }
    0          
379             } keys %$indexes
380             ])
381             : ()
382             ),
383             ( $links ? ( links => $links) : () ),
384             },
385             }
386             );
387              
388 0           $self->_parse_response( {
389             request_code => PUT_REQUEST_CODE,
390             expected_code => PUT_RESPONSE_CODE,
391             operation_name => 'put',
392             key => $key,
393             bucket => $bucket,
394             body_ref => \$body,
395             cb => $cb,
396             } );
397             }
398              
399              
400             sub query_index {
401 0     0 1   state $check = compile(Any, Str, Str, Str|ArrayRef, Optional[CodeRef]);
402 0           my ( $self, $bucket, $index, $value_to_match, $cb ) = $check->(@_);
403              
404 0           my $query_type_is_eq = 0; # eq
405 0 0         ref $value_to_match
406             and $query_type_is_eq = 1; # range
407 0 0         my $body = RpbIndexReq->encode(
408             { index => $index,
409             bucket => $bucket,
410             qtype => $query_type_is_eq,
411             $query_type_is_eq ?
412             ( range_min => $value_to_match->[0],
413             range_max => $value_to_match->[1] )
414             : (key => $value_to_match ),
415             }
416             );
417            
418 0 0         $self->_parse_response( {
419             request_code => QUERY_INDEX_REQUEST_CODE,
420             expected_code => QUERY_INDEX_RESPONSE_CODE,
421             operation_name => 'query_index',
422             $query_type_is_eq ?
423             (key => '2i query on ' . join('...', @$value_to_match) )
424             : (key => $value_to_match ),
425             bucket => $bucket,
426             body_ref => \$body,
427             handle_response => \&_handle_query_index_response,
428             cb => $cb,
429             lock_requests => 1,
430             } );
431             }
432              
433             sub _handle_query_index_response {
434 0     0     my ( $self, $encoded_message, $args ) = @_;
435            
436 0           my $obj = RpbIndexResp->decode( $encoded_message );
437 0   0       my @keys = @{$obj->keys // []};
  0            
438              
439             # case 1 : no user callback
440 0 0         my $cb = $args->{cb}
441             or return \\@keys;
442              
443             # case 2 : we have a user callback
444 0           $cb->($_) foreach @keys;
445              
446             # means: nothing left to do, all results processed through callback
447 0           return;
448              
449             }
450              
451              
452             sub get_buckets {
453 0     0 1   state $check = compile(Any, Optional[CodeRef]);
454 0           my ( $self, $cb ) = $check->(@_);
455              
456 0           $self->_parse_response( {
457             request_code => GET_BUCKETS_REQUEST_CODE,
458             expected_code => GET_BUCKETS_RESPONSE_CODE,
459             operation_name => 'get_buckets',
460             handle_response => \&_handle_get_buckets_response,
461             cb => $cb,
462             } );
463             }
464              
465             sub _handle_get_buckets_response {
466 0     0     my ( $self, $encoded_message, $args ) = @_;
467 0           my $obj = RpbListBucketsResp->decode( $encoded_message );
468 0   0       return \($obj->buckets // []);
469             }
470              
471              
472             sub get_bucket_props {
473 0     0 1   state $check = compile(Any, Str, Optional[CodeRef]);
474 0           my ( $self, $bucket, $cb ) = $check->(@_);
475              
476 0           my $body = RpbGetBucketReq->encode( { bucket => $bucket } );
477 0           $self->_parse_response( {
478             request_code => GET_BUCKET_PROPS_REQUEST_CODE,
479             expected_code => GET_BUCKET_PROPS_RESPONSE_CODE,
480             bucket => $bucket,
481             body_ref => \$body,
482             handle_response => \&_handle_get_bucket_props_response,
483             cb => $cb,
484             } );
485             }
486              
487             sub _handle_get_bucket_props_response {
488 0     0     my ( $self, $encoded_message, $args ) = @_;
489              
490 0           my $obj = RpbListBucketsResp->decode( $encoded_message );
491 0           my $props = RpbBucketProps->decode($obj->buckets->[0]);
492 0           return \{ %$props }; # unblessing variable
493             }
494              
495              
496             sub set_bucket_props {
497 0     0 1   state $check = compile( Any, Str,
498             Dict[ n_val => Optional[Int],
499             allow_mult => Optional[Bool] ],
500             Optional[CodeRef] );
501 0           my ( $self, $bucket, $props, $cb ) = $check->(@_);
502 0 0 0       $props->{n_val} && $props->{n_val} < 0 and croak 'n_val should be possitive integer';
503              
504 0           my $body = RpbSetBucketReq->encode({ bucket => $bucket, props => $props });
505 0           $self->_parse_response( {
506             request_code => SET_BUCKET_PROPS_REQUEST_CODE,
507             expected_code => SET_BUCKET_PROPS_RESPONSE_CODE,
508             bucket => $bucket,
509             body_ref => \$body,
510             } );
511             }
512              
513              
514             sub map_reduce {
515 0     0 1   state $check = compile(Any, Any, Optional[CodeRef]);
516 0           my ( $self, $request, $cb) = $check->(@_);
517              
518 0           my @args;
519            
520 0 0         push @args, ref($request) ? encode_json($request): $request;
521 0           push @args, 'application/json';
522 0 0         push @args, $cb if $cb;
523            
524 0           map_reduce_raw($self, @args);
525              
526             }
527              
528              
529             sub map_reduce_raw {
530 0     0 1   state $check = compile(Any, Str, Str, Optional[CodeRef]);
531 0           my ( $self, $request, $content_type, $cb) = $check->(@_);
532            
533 0           my $body = RpbMapRedReq->encode(
534             {
535             request => $request,
536             content_type => $content_type,
537             }
538             );
539              
540             # reset accumulator
541 0           $self->_mapreduce_accumulator([]);
542              
543 0           $self->_parse_response( {
544             request_code => MAP_REDUCE_REQUEST_CODE,
545             expected_code => MAP_REDUCE_RESPONSE_CODE,
546             operation => 'map_reduce',
547             body_ref => \$body,
548             cb => $cb,
549             decode => ($content_type eq 'application/json'),
550             handle_response => \&_handle_map_reduce_response,
551             lock_requests => 1,
552             } );
553             }
554              
555             sub _handle_map_reduce_response {
556 0     0     my ( $self, $encoded_message, $args ) = @_;
557 0           my $obj = RpbMapRedResp->decode( $encoded_message );
558              
559             # case 1 : no user callback
560 0           my $cb = $args->{cb};
561 0 0         if (! $cb ) {
562              
563             # all results were there, reset the accumulator and return the whole,
564 0 0         if ($obj->done) {
565 0           my $results = $self->_mapreduce_accumulator();
566 0           $self->_mapreduce_accumulator([]);
567 0           return \$results;
568             }
569              
570             # accumulate results
571 0 0 0       push @{$self->_mapreduce_accumulator},
  0            
572             { phase => $obj->phase, response => ($args->{decode}) ? decode_json($obj->response // '[]') : $obj->response };
573              
574             # more stuff to come, say so
575 0           return (undef, 1);
576              
577             }
578              
579             # case 2 : we have a user callback
580              
581             # means: nothing left to do, all results processed through callback
582             $obj->done
583 0 0         and return;
584              
585 0           $cb->($obj->response, $obj->phase, $obj->done);
586              
587             # more stuff to come, say so
588 0           return (undef, 1);
589              
590             }
591              
592             sub _parse_response {
593 0     0     my ( $self, $args ) = @_;
594              
595 0           my $socket = $self->_socket;
596 0   0       _send_bytes($socket, $args->{request_code}, $args->{body_ref} // \'');
597              
598 0           while (1) {
599 0           my $response;
600             # get and check response
601 0 0 0       my $raw_response_ref = _read_response($socket)
602             or return _die_generic_error( $! || "Socket Closed", $args);
603              
604 0           my ( $response_code, $response_body ) = unpack( 'c a*', $$raw_response_ref );
605              
606             # in case of error msg
607 0 0         if ($response_code == ERROR_RESPONSE_CODE) {
608 0           my $decoded_message = RpbErrorResp->decode($response_body);
609 0           my $errmsg = $decoded_message->errmsg;
610 0           my $errcode = $decoded_message->errcode;
611              
612 0           return _die_generic_error( "Riak Error (code: $errcode) '$errmsg'", $args);
613             }
614              
615              
616             # check if we have what we want
617 0 0         $response_code != $args->{expected_code}
618             and return _die_generic_error(
619             "Unexpected Response Code in (got: $response_code, expected: $args->{expected_code})",
620             $args );
621            
622             # default value if we don't need to handle the response.
623 0           my ($ret, $more_to_come) = ( \1, undef);
624              
625             # handle the response.
626 0 0         if (my $handle_response = $args->{handle_response}) {
627 0           ($ret, $more_to_come) = $handle_response->( $self, $response_body, $args);
628             }
629              
630             # it's a multiple response request, loop again
631             $more_to_come
632 0 0         and next;
633              
634             # there is a result, process or return it
635 0 0         if ($ret) {
636 0 0         $args->{cb} and return $args->{cb}->($$ret);
637 0           return $$ret;
638             }
639              
640             # ret was undef, means we have processed everything in the callback
641 0           return;
642              
643             }
644             }
645              
646             sub _die_generic_error {
647 0     0     my ( $error, $args ) = @_;
648              
649 0   0       my ($operation_name, $bucket, $key) =
650 0           map { $args->{$_} // "" } ( qw( operation_name bucket key) );
651              
652 0           my $extra = '';
653 0 0 0       defined $bucket && defined $key
654             and $extra = "(bucket: $bucket, key: $key) ";
655              
656 0           my $msg = "Error in '$operation_name' $extra: $error";
657 0 0         if ( my $cb = $args->{cb} ) {
    0          
658 0   0       $cb->((undef) x ($args->{cb_nb_args} // 0), $msg);
659 0           return;
660             } elsif (my $cv = $args->{cv}) {
661 0           $cv->croak($msg);
662             } else {
663 0           croak $msg;
664             }
665 0           return;
666             }
667              
668             sub _read_response {
669 0     0     my ($socket) = @_;
670 0   0       _read_bytes($socket, unpack( 'N', ${ _read_bytes($socket, 4) // return } ));
  0            
671             }
672              
673             sub _read_bytes {
674 0     0     my ( $socket, $length ) = @_;
675              
676 0           my $buffer;
677 0           my $offset = 0;
678 0           my $read = 0;
679              
680 0           while ($length > 0) {
681 0           $read = $socket->sysread( $buffer, $length, $offset );
682 0 0         if (! defined $read) {
683 0 0         $! == EINTR
684             and next;
685 0           return;
686             }
687              
688 0 0         $read > 0
689             or return;
690              
691 0           $offset += $read;
692 0           $length -= $read;
693             }
694              
695 0           return \$buffer;
696             }
697              
698              
699             sub _send_bytes {
700 0     0     my ( $socket, $request_code, $body_ref ) = @_;
701              
702 0           my $bytes = pack('N', my $length = (bytes::length($$body_ref) + 1)) . pack('c', $request_code) . $$body_ref;
703              
704 0           $length += 4;
705 0           my $offset = 0;
706 0           my $sent = 0;
707              
708 0           while ($length > 0) {
709 0           $sent = $socket->syswrite( $bytes, $length, $offset );
710 0 0         if (! defined $sent) {
711 0 0         $! == EINTR
712             and next;
713 0           return;
714             }
715              
716 0 0         $sent > 0
717             or return;
718              
719 0           $offset += $sent;
720 0           $length -= $sent;
721             }
722              
723 0           return $offset;
724             }
725              
726              
727              
728             1;
729              
730             __END__