File Coverage

blib/lib/Gearman/Util.pm
Criterion Covered Total %
statement 64 131 48.8
branch 11 64 17.1
condition 8 16 50.0
subroutine 15 17 88.2
pod 7 7 100.0
total 105 235 44.6


line stmt bran cond sub pod time code
1             package Gearman::Util;
2 19     19   1169 use version ();
  19         1607  
  19         749  
3             $Gearman::Util::VERSION = version->declare("2.002.001"); #TRIAL
4              
5              
6 19     19   95 use strict;
  19         29  
  19         354  
7 19     19   65 use warnings;
  19         29  
  19         509  
8              
9             # man errno
10             # Resource temporarily unavailable
11             # (may be the same value as EWOULDBLOCK) (POSIX.1)
12 19     19   1127 use POSIX qw(:errno_h);
  19         11774  
  19         145  
13 19     19   16548 use Time::HiRes qw();
  19         9868  
  19         422  
14 19     19   10158 use IO::Select;
  19         27808  
  19         25964  
15              
16             =head1 NAME
17              
18             Gearman::Util - Utility functions for gearman distributed job system
19              
20             =head1 METHODS
21              
22             =cut
23              
24             sub DEBUG () {0}
25              
26             # I: to jobserver
27             # O: out of job server
28             # W: worker
29             # C: client of job server
30             # J: jobserver
31             our %cmd = (
32             1 => ['I', "can_do"], # from W: [FUNC]
33             23 => ['I', "can_do_timeout"], # from W: FUNC[0]TIMEOUT
34             2 => ['I', "cant_do"], # from W: [FUNC]
35             3 => ['I', "reset_abilities"], # from W: ---
36             22 => ['I', "set_client_id"], # W->J: [RANDOM_STRING_NO_WHITESPACE]
37             4 => ['I', "pre_sleep"], # from W: ---
38              
39             26 => ['I', "option_req"], # C->J: [OPT]
40             27 => ['O', "option_res"], # J->C: [OPT]
41              
42             6 => ['O', "noop"], # J->W ---
43             7 => ['I', "submit_job"], # C->J FUNC[0]UNIQ[0]ARGS
44             21 => ['I', "submit_job_high"], # C->J FUNC[0]UNIQ[0]ARGS
45             18 => ['I', "submit_job_bg"], # C->J " " " " "
46             32 => ['I', "submit_job_high_bg"], # C->J FUNC[0]UNIQ[0]ARGS
47              
48             8 => ['O', "job_created"], # J->C HANDLE
49             9 => ['I', "grab_job"], # W->J --
50             10 => ['O', "no_job"], # J->W --
51             11 => ['O', "job_assign"], # J->W HANDLE[0]FUNC[0]ARG
52              
53             12 => ['IO', "work_status"], # W->J/C: HANDLE[0]NUMERATOR[0]DENOMINATOR
54             13 => ['IO', "work_complete"], # W->J/C: HANDLE[0]RES
55             14 => ['IO', "work_fail"], # W->J/C: HANDLE
56             25 => ['IO', "work_exception"], # W->J/C: HANDLE[0]EXCEPTION
57              
58             15 => ['I', "get_status"], # C->J: HANDLE
59             20 => ['O', "status_res"], # C->J: HANDLE[0]KNOWN[0]RUNNING[0]NUM[0]DENOM
60              
61             16 => ['I', "echo_req"], # ?->J TEXT
62             17 => ['O', "echo_res"], # J->? TEXT
63              
64             19 => ['O', "error"], # J->? ERRCODE[0]ERR_TEXT
65              
66             # for worker to declare to the jobserver that this worker is only connected
67             # to one jobserver, so no polls/grabs will take place, and server is free
68             # to push "job_assign" packets back down.
69             24 => ['I', "all_yours"], # W->J ---
70             );
71              
72             our %num; # name -> num
73             while (my ($num, $ary) = each %cmd) {
74             die if $num{ $ary->[1] };
75             $num{ $ary->[1] } = $num;
76             }
77              
78             =head2 cmd_name($num)
79              
80             B cmd
81              
82             =cut
83              
84             sub cmd_name {
85 35     35 1 16183 my $num = shift;
86 35         63 my $c = $cmd{$num};
87 35 50       226 return $c ? $c->[1] : undef;
88             }
89              
90             =head2 pack_req_command($key, $arg)
91              
92             B request string
93              
94             =cut
95              
96             sub pack_req_command {
97 68     68 1 1097 return _pack_command("REQ", @_);
98             }
99              
100             =head2 pack_res_command($cmd, $arg)
101              
102             B response string
103              
104             =cut
105              
106             sub pack_res_command {
107 56     56 1 1053 return _pack_command("RES", @_);
108             }
109              
110             =head2 read_res_packet($sock, $err_ref, $timeout)
111              
112             B undef on closed socket or malformed packet
113              
114             =cut
115              
116             sub read_res_packet {
117 1     1 1 1 warn " Entering read_res_packet" if DEBUG;
118 1         2 my $sock = shift;
119 1         3 my $err_ref = shift;
120 1         2 my $timeout = shift;
121 1         4 my $time_start = Time::HiRes::time();
122              
123             #TODO improvement for SSL socket
124             # http://search.cpan.org/~sullr/IO-Socket-SSL/lib/IO/Socket/SSL.pod#Using_Non-Blocking_Sockets
125             my $err = sub {
126 0     0   0 my $code = shift;
127 0 0       0 $sock->close() if $sock->connected;
128 0 0       0 $$err_ref = $code if ref $err_ref;
129 0         0 return undef;
130 1         5 };
131              
132 1         13 $sock->blocking(0);
133              
134 0         0 my $is = IO::Select->new($sock);
135              
136 0         0 my $readlen = 12;
137 0         0 my $offset = 0;
138 0         0 my $buf = '';
139              
140 0         0 my ($magic, $type, $len);
141              
142 0         0 warn " Starting up event loop\n" if DEBUG;
143              
144 0         0 while (1) {
145 0         0 my $time_remaining = undef;
146 0 0       0 if (defined $timeout) {
147 0         0 warn " We have a timeout of $timeout\n" if DEBUG;
148 0         0 $time_remaining = $time_start + $timeout - Time::HiRes::time();
149 0 0       0 return $err->("timeout") if $time_remaining < 0;
150             }
151              
152 0 0       0 $is->can_read($time_remaining) || next;
153              
154 0         0 warn " Entering read loop\n" if DEBUG;
155              
156 0         0 my ($ok, $err_code) = _read_sock($sock, \$buf, \$readlen, \$offset);
157 0 0       0 if (!defined($ok)) {
    0          
158 0         0 next;
159             }
160             elsif ($ok == 0) {
161 0         0 return $err->($err_code);
162             }
163              
164 0 0       0 if (!defined $type) {
165 0 0       0 next unless length($buf) >= 12;
166 0         0 my $header = substr($buf, 0, 12, '');
167 0         0 ($magic, $type, $len) = unpack("a4NN", $header);
168 0 0       0 return $err->("malformed_magic") unless $magic eq "\0RES";
169 0         0 my $starting = length($buf);
170 0         0 $readlen = $len - $starting;
171 0         0 $offset = $starting;
172              
173 0 0       0 if ($readlen) {
174 0         0 my ($ok, $err_code)
175             = _read_sock($sock, \$buf, \$readlen, \$offset);
176 0 0       0 if (!defined($ok)) {
    0          
177 0         0 next;
178             }
179             elsif ($ok == 0) {
180 0         0 return $err->($err_code);
181             }
182             } ## end if ($readlen)
183             } ## end if (!defined $type)
184              
185 0         0 $type = $cmd{$type};
186 0 0       0 return $err->("bogus_command") unless $type;
187 0 0       0 return $err->("bogus_command_type") unless index($type->[0], "O") != -1;
188              
189 0         0 warn " Fully formed res packet, returning; type=$type->[1] len=$len\n"
190             if DEBUG;
191              
192 0         0 $sock->blocking(1);
193              
194             return {
195 0         0 type => $type->[1],
196             len => $len,
197             blobref => \$buf,
198             };
199             } ## end while (1)
200             } ## end sub read_res_packet
201              
202             sub _read_sock {
203 0     0   0 my ($sock, $buf_ref, $readlen_ref, $offset_ref) = @_;
204 0         0 local $!;
205 0         0 my $rv = sysread($sock, $$buf_ref, $$readlen_ref, $$offset_ref);
206              
207 0 0       0 unless ($rv) {
208 0         0 warn " Read error: $!\n" if DEBUG;
209 0 0       0 $! == EAGAIN && return;
210             }
211              
212 0 0       0 return (0, "read_error") unless defined $rv;
213 0 0       0 return (0, "eof") unless $rv;
214              
215 0 0       0 unless ($rv >= $$readlen_ref) {
216 0         0 warn
217             " Partial read of $rv bytes, at offset $$offset_ref, readlen was $$readlen_ref\n"
218             if DEBUG;
219 0         0 $$offset_ref += $rv;
220 0         0 $$readlen_ref -= $rv;
221              
222 0         0 return _read_sock($sock, $buf_ref, $readlen_ref, $offset_ref);
223             } ## end unless ($rv >= $$readlen_ref)
224              
225 0         0 warn " Finished reading\n" if DEBUG;
226 0         0 return (1);
227             } ## end sub _read_sock
228              
229             =head2 read_text_status($sock, $err_ref)
230              
231             =cut
232              
233             sub read_text_status {
234 1     1 1 1299 my $sock = shift;
235 1         3 my $err_ref = shift;
236              
237             my $err = sub {
238 1     1   2 my $code = shift;
239 1 50       6 $sock->close() if $sock->connected;
240 1 50       22 $$err_ref = $code if ref $err_ref;
241 1         10 return undef;
242 1         5 };
243              
244 1         1 my @lines;
245 1         3 my $complete = 0;
246 1         93 while (my $line = <$sock>) {
247 0         0 chomp $line;
248 0 0       0 return $err->($1) if $line =~ /^ERR (\w+) /;
249              
250 0 0       0 if ($line eq '.') {
251 0         0 $complete++;
252 0         0 last;
253             }
254              
255 0         0 push @lines, $line;
256             } ## end while (my $line = <$sock>)
257 1 50       7 return $err->("eof") unless $complete;
258              
259 0         0 return @lines;
260             } ## end sub read_text_status
261              
262             =head2 send_req($sock, $reqref)
263              
264             =cut
265              
266             sub send_req {
267 7     7 1 1840 my ($sock, $reqref) = @_;
268 7 100       32 return 0 unless $sock;
269              
270 3         5 my $data = ${$reqref};
  3         30  
271 3         9 (my $total_len) = (my $len) = length($data);
272 3         7 my ($num_zero_writes, $offset) = (0, 0);
273 3         93 local $SIG{PIPE} = "IGNORE";
274              
275 3   33     30 while ($len && ($num_zero_writes < 5)) {
276 3         51 my $written = $sock->syswrite($data, $len, $offset);
277 2 50       31 if (!defined $written) {
    0          
278 2         2 warn "send_req: syswrite error: $!" if DEBUG;
279 2         47 return 0;
280             }
281             elsif ($written > 0) {
282 0         0 $len -= $written;
283 0         0 $offset += $written;
284             }
285             else {
286 0         0 $num_zero_writes++;
287             }
288             } ## end while ($len && ($num_zero_writes...))
289              
290 0   0     0 return ($total_len > 0 && $offset == $total_len);
291             } ## end sub send_req
292              
293             =head2 wait_for_readability($fileno, $timeout)
294              
295             given a file descriptor number and a timeout,
296              
297             wait for that descriptor to become readable
298              
299             B 0 or 1 on if it did or not
300              
301             =cut
302              
303             sub wait_for_readability {
304 1     1 1 1597 my ($fileno, $timeout) = @_;
305 1 50 33     9 return 0 unless $fileno && $timeout;
306              
307 1         1 my $rin = '';
308 1         4 vec($rin, $fileno, 1) = 1;
309 1         3003150 my $nfound = select($rin, undef, undef, $timeout);
310              
311             # nfound can be undef or 0, both failures, or 1, a success
312 1 50       26 return $nfound ? 1 : 0;
313             } ## end sub wait_for_readability
314              
315             #
316             # _pack_command($prefix, $key, $arg)
317             #
318             sub _pack_command {
319 124     124   183 my ($prefix, $key, $arg) = @_;
320 124 100 100     615 ($key && $num{$key}) || die sprintf("Bogus type arg of '%s'", $key || '');
      66        
321              
322 120   100     350 $arg ||= '';
323 120         147 my $len = length($arg);
324 120         1199 return "\0$prefix" . pack("NN", $num{$key}, $len) . $arg;
325             } ## end sub _pack_command
326              
327             1;