File Coverage

blib/lib/Net/MPRPC/Client.pm
Criterion Covered Total %
statement 33 98 33.6
branch 0 36 0.0
condition 0 14 0.0
subroutine 11 20 55.0
pod 5 5 100.0
total 49 173 28.3


line stmt bran cond sub pod time code
1             package Net::MPRPC::Client;
2 2     2   189721 use strict;
  2         5  
  2         72  
3 2     2   11 use warnings;
  2         4  
  2         219  
4              
5             our $VERSION = '0.02';
6              
7 2     2   2907 use IO::Select;
  2         4769  
  2         102  
8 2     2   1189 use IO::Socket::INET;
  2         31062  
  2         19  
9              
10             our $_HAVE_UNIX_SOCKET = 1;
11 2     2   11 eval q[use IO::Socket::INET; 1];
  2         3  
  2         18  
12             if ($@) { $_HAVE_UNIX_SOCKET = 0 }
13              
14 2     2   4069 use Try::Tiny;
  2         4025  
  2         107  
15 2     2   14 use Carp;
  2         4  
  2         91  
16 2     2   1720 use Data::MessagePack;
  2         2683  
  2         65  
17              
18 2     2   13 use constant MP_REQ_TYPE => 0;
  2         4  
  2         103  
19 2     2   10 use constant MP_RES_ERROR => 2;
  2         4  
  2         81  
20 2     2   17 use constant MP_RES_RESULT => 3;
  2         3  
  2         2218  
21              
22             sub new {
23 0     0 1   my $class = shift;
24 0 0         my $args = @_ > 1 ? {@_} : $_[0];
25              
26 0           $args->{_id} = 0;
27 0   0       $args->{timeout} ||= 30;
28 0           $args->{_error} = q[];
29              
30 0           bless $args, $class;
31             }
32              
33             sub connect {
34 0     0 1   my $self = shift;
35 0 0         my $args = @_ > 1 ? {@_} : $_[0];
36              
37 0 0         $self->disconnect if $self->{_sock};
38              
39 0   0       my $host = $args->{host} || $self->{host};
40 0   0       my $port = $args->{post} || $self->{port};
41              
42 0 0         croak q[Required "host" parameter to connect] unless $host;
43 0 0         croak q[Required "port" parameter to connect] unless $port;
44              
45 0           my $sock;
46             try {
47 0 0   0     if ($host eq 'unix/') {
48 0 0         if (!$_HAVE_UNIX_SOCKET) {
49 0           croak "This environment doesn't support UNIX socket";
50             }
51              
52 0 0         $sock = IO::Socket::UNIX->new(
53             Peer => $port,
54             Timeout => $self->{timeout},
55             ) or die qq/Unable to connect unix socket "$port": $!/;
56             }
57             else {
58 0 0         $sock = IO::Socket::INET->new(
59             PeerAddr => $host,
60             PeerPort => $port,
61             Proto => 'tcp',
62             Timeout => $self->{timeout},
63             ) or die qq/Unable to connect "${host}:${port}": $!/;
64             }
65              
66 0           $sock->autoflush(1);
67 0           $self->{_sock} = $sock;
68             } catch {
69 0     0     $self->{_error} = $_;
70 0           };
71              
72 0           return !!$self->{_sock};
73             }
74              
75             sub disconnect {
76 0     0 1   delete $_[0]->{_sock};
77             }
78              
79             sub call {
80 0     0 1   my ($self, $method, $param) = @_;
81              
82 0           $self->{_error} = q[];
83 0 0 0       return unless $self->{_sock} or $self->connect;
84              
85 0           my $sock = $self->{_sock};
86 0           my $req = [
87             MP_REQ_TYPE, ++$self->{_id},
88             $method, $param,
89             ];
90 0           $sock->print(Data::MessagePack->pack($req));
91              
92 0           my $timeout = $sock->timeout;
93 0           my $limit = time + $timeout;
94 0           my $buf = q[];
95              
96 0 0         my $select = IO::Select->new or croak $!;
97 0           $select->add($sock);
98              
99 0           my $unpacker = Data::MessagePack::Unpacker->new;
100 0           my $nread = 0;
101              
102 0           while ($limit >= time) {
103 0 0         my @ready = $select->can_read( $limit - time )
104             or last;
105              
106 0 0         croak q/Fatal error on select, $ready[0] isn't $sock/
107             if $sock ne $ready[0];
108              
109 0 0         unless (my $l = $sock->sysread($buf, 512, length $buf)) {
110 0           my $e = $!;
111 0           $self->disconnect;
112 0           croak qq/Error reading socket: $e/;
113             }
114              
115             try {
116 0     0     $nread = $unpacker->execute($buf, $nread);
117             } catch {
118 0     0     $self->{_error} = $_;
119 0           $self->disconnect;
120 0           $unpacker->reset;
121 0           };
122 0 0         return if $self->{_error};
123              
124 0 0         if ($unpacker->is_finished) {
125 0           my $res = $unpacker->data;
126 0           $unpacker->reset;
127              
128 0 0 0       unless ($res and ref $res eq 'ARRAY') {
129 0           $self->{_error} = 'Invalid response from server';
130 0           $self->disconnect;
131 0           return;
132             }
133              
134 0 0         if (my $error = $res->[MP_RES_ERROR]) {
135 0           $self->{_error} = $error;
136 0           return;
137             }
138              
139 0           return $res->[MP_RES_RESULT];
140             }
141             }
142              
143 0           $self->disconnect;
144 0           croak 'request timeout';
145             }
146              
147             sub error {
148 0     0 1   $_[0]->{_error};
149             }
150              
151             1;
152              
153             __END__