File Coverage

blib/lib/Clutch/Utils.pm
Criterion Covered Total %
statement 19 21 90.4
branch n/a
condition n/a
subroutine 7 7 100.0
pod n/a
total 26 28 92.8


line stmt bran cond sub pod time code
1             package # hide
2             Clutch::Utils;
3 5     5   24 use strict;
  5         10  
  5         138  
4 5     5   23 use warnings;
  5         9  
  5         122  
5 5     5   24 use parent qw(Exporter);
  5         8  
  5         40  
6 5     5   301 use IO::Socket::INET;
  5         10  
  5         37  
7 5     5   3579 use POSIX qw(EINTR EAGAIN EWOULDBLOCK);
  5         10  
  5         41  
8 5     5   2388 use Socket qw(IPPROTO_TCP TCP_NODELAY);
  5         8  
  5         1152  
9 5     5   8539 use JSON::XS ();
  0            
  0            
10              
11             our $CRLF = "\x0d\x0a";
12             our $DELIMITER = "\x20";
13             our $MAX_REQUEST_SIZE = 131072;
14              
15             our @EXPORT = qw($CRLF $DELIMITER $MAX_REQUEST_SIZE);
16              
17             our %CMD2NO = (
18             'request' => 1,
19             'request_background' => 1,
20             'request_multi' => 1,
21             );
22             our $JSON;
23              
24             sub new_client {
25             my $address = shift;
26              
27             my $sock = IO::Socket::INET->new(
28             PeerAddr => $address,
29             Proto => 'tcp',
30             ) or die "Cannot open client socket: $!";
31              
32             setsockopt($sock, IPPROTO_TCP, TCP_NODELAY, pack("l", 1)) or die;
33             $sock->autoflush(1);
34             $sock;
35             }
36              
37             sub json {
38             $JSON ||= JSON::XS->new->allow_nonref;
39             }
40              
41             sub support_cmd {
42             $CMD2NO{+shift};
43             }
44              
45             sub make_request {
46             my ($cmd_name, $function, $args) = @_;
47             join($DELIMITER, $cmd_name, $function, json->encode($args)) . $CRLF;
48             }
49              
50             sub make_response {
51             my $res = shift;
52             json->encode($res) . $CRLF;
53             }
54              
55             sub verify_buffer {
56             my $buf = shift;
57             my $rv = ($buf =~ /$CRLF$/o ? 1 : 0);
58             if ($rv) {
59             $buf =~ s/$CRLF$//o;
60             return 1;
61             } else {
62             return;
63             }
64             }
65              
66             sub parse_read_buffer {
67             my ($buf, $ret) = @_;
68              
69             if ( verify_buffer($buf) ) {
70             ($ret->{cmd}, $ret->{function}, $ret->{args}) = split /$DELIMITER+/o, $buf;
71             $ret->{args} ||= '';
72             $ret->{args} = json->decode($ret->{args});
73             return 1;
74             }
75              
76             return 0;
77             }
78              
79             # returns (positive) number of bytes read, or undef if the socket is to be closed
80             sub read_timeout {
81             my ($sock, $buf, $len, $off, $timeout, $self) = @_;
82             do_io(undef, $sock, $buf, $len, $off, $timeout, $self);
83             }
84              
85             # returns (positive) number of bytes written, or undef if the socket is to be closed
86             sub write_timeout {
87             my ($sock, $buf, $len, $off, $timeout, $self) = @_;
88             do_io(1, $sock, $buf, $len, $off, $timeout, $self);
89             }
90              
91             # writes all data in buf and returns number of bytes written or undef if failed
92             sub write_all {
93             my ($sock, $buf, $timeout, $self) = @_;
94             my $off = 0;
95             while (my $len = length($buf) - $off) {
96             my $ret = write_timeout($sock, $buf, $len, $off, $timeout, $self)
97             or return;
98             $off += $ret;
99             }
100             return length $buf;
101             }
102              
103             # returns value returned by $cb, or undef on timeout or network error
104             sub do_io {
105             my ($is_write, $sock, $buf, $len, $off, $timeout, $self) = @_;
106             my $ret;
107             unless ($is_write || delete $self->{_is_deferred_accept}) {
108             goto DO_SELECT;
109             }
110             DO_READWRITE:
111             # try to do the IO
112             if ($is_write) {
113             $ret = syswrite $sock, $buf, $len, $off
114             and return $ret;
115             } else {
116             $ret = sysread $sock, $$buf, $len, $off
117             and return $ret;
118             }
119             unless ((! defined($ret)
120             && ($! == EINTR || $! == EAGAIN || $! == EWOULDBLOCK))) {
121             return;
122             }
123             # wait for data
124             DO_SELECT:
125             while (1) {
126             my ($rfd, $wfd);
127             my $efd = '';
128             vec($efd, fileno($sock), 1) = 1;
129             if ($is_write) {
130             ($rfd, $wfd) = ('', $efd);
131             } else {
132             ($rfd, $wfd) = ($efd, '');
133             }
134             my $start_at = time;
135             my $nfound = select($rfd, $wfd, $efd, $timeout);
136             $timeout -= (time - $start_at);
137             last if $nfound;
138             return if $timeout <= 0;
139             }
140             goto DO_READWRITE;
141             }
142              
143             1;
144