File Coverage

blib/lib/Data/YUID/Client.pm
Criterion Covered Total %
statement 27 107 25.2
branch 0 48 0.0
condition 0 29 0.0
subroutine 9 16 56.2
pod 2 5 40.0
total 38 205 18.5


line stmt bran cond sub pod time code
1             # $Id$
2              
3             package Data::YUID::Client;
4 1     1   558 use strict;
  1         2  
  1         57  
5              
6 1     1   1127 use fields qw( servers select_timeout hosts connect_timeout );
  1         1826  
  1         4  
7 1     1   64 use Carp;
  1         2  
  1         81  
8 1     1   994 use Errno qw( EINPROGRESS EWOULDBLOCK EISCONN );
  1         1215  
  1         128  
9 1     1   77778 use IO::Socket::INET;
  1         42203  
  1         14  
10 1     1   1191 use Socket qw( MSG_NOSIGNAL );
  1         2  
  1         336  
11 1     1   1089 use URI::Escape ();
  1         1661  
  1         31  
12              
13 1     1   8 use constant DEFAULT_PORT => 9001;
  1         2  
  1         92  
14 1         1338 use constant DEFAULT_TIMEOUT => {
15             connect => 1.0, # seconds
16             select => 1.0, # seconds
17 1     1   5 };
  1         1  
18              
19             our $FLAG_NOSIGNAL = 0;
20             eval { $FLAG_NOSIGNAL = MSG_NOSIGNAL };
21              
22             my $Active_Sock;
23             my $Socket_Used_Count = 0; #yuids served per socket before we refresh it, to force rebalancing
24             my $Max_Socket_Reused = 1000 + rand(1000);
25              
26             sub new {
27 0     0 1   my Data::YUID::Client $client = shift;
28 0           my %args = @_;
29 0 0         $client = fields::new($client) unless ref $client;
30              
31 0 0 0       croak "servers must be an arrayref if specified"
32             unless !exists $args{servers} || ref $args{servers} eq 'ARRAY';
33 0   0       $client->{servers} = $args{servers} || [];
34 0           for (qw(select_timeout connect_timeout)) {
35 0 0         my $value = exists $args{$_} ? $args{$_} : DEFAULT_TIMEOUT->{$_};
36 0           $client->{$_} = $value;
37             }
38 0           $client->{hosts} = $args{servers};
39              
40 0           $client;
41             }
42              
43             sub _oneline {
44 0     0     my Data::YUID::Client $client = shift;
45 0           my ($sock, $line) = @_;
46 0           my $res;
47 0           my ($ret, $offset) = (undef, 0);
48              
49             # state: 0 - writing, 1 - reading, 2 - done
50 0 0         my $state = defined $line ? 0 : 1;
51              
52             # the bitsets for select
53 0           my ($rin, $rout, $win, $wout);
54 0           my $nfound;
55              
56 0           my $copy_state = -1;
57 0 0         local $SIG{'PIPE'} = "IGNORE" unless $FLAG_NOSIGNAL;
58              
59             # the select loop
60 0           while(1) {
61 0 0         if ($copy_state!=$state) {
62 0 0         last if $state==2;
63 0           ($rin, $win) = ('', '');
64 0 0         vec($rin, fileno($sock), 1) = 1 if $state==1;
65 0 0         vec($win, fileno($sock), 1) = 1 if $state==0;
66 0           $copy_state = $state;
67             }
68 0           $nfound = select($rout=$rin, $wout=$win, undef,
69             $client->{select_timeout});
70 0 0         last unless $nfound;
71              
72 0 0         if (vec($wout, fileno($sock), 1)) {
73 0           $res = send($sock, $line, $FLAG_NOSIGNAL);
74             next
75 0 0 0       if not defined $res and $!==EWOULDBLOCK;
76 0 0 0       unless (($res || 0) > 0) {
77 0           _close_sock($sock);
78 0           return undef;
79             }
80 0 0         if ($res == length($line)) { # all sent
81 0           $state = 1;
82             } else { # we only succeeded in sending some of it
83 0           substr($line, 0, $res, ''); # delete the part we sent
84             }
85             }
86              
87 0 0         if (vec($rout, fileno($sock), 1)) {
88 0           $res = sysread($sock, $ret, 255, $offset);
89             next
90 0 0 0       if !defined($res) and $!==EWOULDBLOCK;
91 0 0         if ($res == 0) { # catches 0=conn closed or undef=error
92 0           _close_sock($sock);
93 0           return undef;
94             }
95 0           $offset += $res;
96 0 0         if (rindex($ret, "\r\n") + 2 == length($ret)) {
97 0           $state = 2;
98             }
99             }
100             }
101              
102 0 0         unless ($state == 2) {
103 0           _close_sock($sock);
104 0           return undef;
105             }
106              
107 0           return $ret;
108             }
109              
110             sub get_id {
111 0     0 1   my Data::YUID::Client $client = shift;
112 0           my($ns) = @_;
113 0           my $id;
114 0   0       while (!$id && (my $sock = $client->get_sock)) {
115 0   0       my $cmd = sprintf "getid ns=%s\r\n", URI::Escape::uri_escape($ns || '');
116 0 0         my $res = $client->_oneline($sock, $cmd) or next;
117 0           ($id) = $res =~ /^ok\s+id=(\d+)/i;
118             }
119 0           $id;
120             }
121              
122             sub _close_sock {
123 0     0     my($sock) = @_;
124 0 0 0       undef $Active_Sock
125             if ($Active_Sock && fileno($sock) == fileno($Active_Sock));
126 0           close $sock;
127             }
128              
129             #class method, for request cleanup
130             sub close_active_sock {
131 0     0 0   close $Active_Sock;
132             }
133              
134             sub connect_to_server {
135 0     0 0   my Data::YUID::Client $client = shift;
136 0           my($host) = @_;
137 0           my($ip, $port) = split /:/, $host;
138 0   0       $port ||= DEFAULT_PORT;
139 0 0         my $sock = IO::Socket::INET->new(
140             PeerAddr => $ip,
141             PeerPort => $port,
142             Proto => 'tcp',
143             Type => SOCK_STREAM,
144             ReuseAddr => 1,
145             Blocking => 0,
146             Timeout => $client->{connect_timeout},
147             ) or return;
148 0           $sock;
149             }
150              
151             sub get_sock {
152 0     0 0   my Data::YUID::Client $client = shift;
153              
154 0           ++$Socket_Used_Count;
155              
156 0 0 0       if ($Active_Sock && $Active_Sock->connected && ($Socket_Used_Count % $Max_Socket_Reused != 0)) {
      0        
157 0           return $Active_Sock;
158             }else{
159 0           my $hosts = $client->{hosts};
160 0           my @try_hosts = @$hosts;
161 0           my $sock;
162 0           while (@try_hosts){
163 0           my $host = splice(@try_hosts, int(rand(@try_hosts)), 1);
164 0           $sock = $client->connect_to_server($host);
165 0 0         last if $sock;
166             }
167 0           return $Active_Sock = $sock;
168             }
169             }
170              
171             1;
172             __END__