File Coverage

blib/lib/Gearman/Util.pm
Criterion Covered Total %
statement 58 126 46.0
branch 11 62 17.7
condition 8 13 61.5
subroutine 15 17 88.2
pod 7 7 100.0
total 99 225 44.0


line stmt bran cond sub pod time code
1             package Gearman::Util;
2 19     19   954 use version;
  19         1261  
  19         85  
3             $Gearman::Util::VERSION = qv("2.001_001");
4              
5 19     19   1249 use strict;
  19         20  
  19         276  
6 19     19   95 use warnings;
  19         17  
  19         424  
7              
8             # man errno
9             # Resource temporarily unavailable
10             # (may be the same value as EWOULDBLOCK) (POSIX.1)
11 19     19   870 use POSIX qw(:errno_h);
  19         8661  
  19         120  
12 19     19   11790 use Time::HiRes qw();
  19         7814  
  19         288  
13 19     19   965 use IO::Handle;
  19         8907  
  19         18225  
14              
15             =head1 NAME
16              
17             Gearman::Util - Utility functions for gearman distributed job system
18              
19             =head1 METHODS
20              
21             =cut
22              
23             sub DEBUG () {0}
24              
25             # I: to jobserver
26             # O: out of job server
27             # W: worker
28             # C: client of job server
29             # J: jobserver
30             our %cmd = (
31             1 => ['I', "can_do"], # from W: [FUNC]
32             23 => ['I', "can_do_timeout"], # from W: FUNC[0]TIMEOUT
33             2 => ['I', "cant_do"], # from W: [FUNC]
34             3 => ['I', "reset_abilities"], # from W: ---
35             22 => ['I', "set_client_id"], # W->J: [RANDOM_STRING_NO_WHITESPACE]
36             4 => ['I', "pre_sleep"], # from W: ---
37              
38             26 => ['I', "option_req"], # C->J: [OPT]
39             27 => ['O', "option_res"], # J->C: [OPT]
40              
41             6 => ['O', "noop"], # J->W ---
42             7 => ['I', "submit_job"], # C->J FUNC[0]UNIQ[0]ARGS
43             21 => ['I', "submit_job_high"], # C->J FUNC[0]UNIQ[0]ARGS
44             18 => ['I', "submit_job_bg"], # C->J " " " " "
45             32 => ['I', "submit_job_high_bg"], # C->J FUNC[0]UNIQ[0]ARGS
46              
47             8 => ['O', "job_created"], # J->C HANDLE
48             9 => ['I', "grab_job"], # W->J --
49             10 => ['O', "no_job"], # J->W --
50             11 => ['O', "job_assign"], # J->W HANDLE[0]FUNC[0]ARG
51              
52             12 => ['IO', "work_status"], # W->J/C: HANDLE[0]NUMERATOR[0]DENOMINATOR
53             13 => ['IO', "work_complete"], # W->J/C: HANDLE[0]RES
54             14 => ['IO', "work_fail"], # W->J/C: HANDLE
55             25 => ['IO', "work_exception"], # W->J/C: HANDLE[0]EXCEPTION
56              
57             15 => ['I', "get_status"], # C->J: HANDLE
58             20 => ['O', "status_res"], # C->J: HANDLE[0]KNOWN[0]RUNNING[0]NUM[0]DENOM
59              
60             16 => ['I', "echo_req"], # ?->J TEXT
61             17 => ['O', "echo_res"], # J->? TEXT
62              
63             19 => ['O', "error"], # J->? ERRCODE[0]ERR_TEXT
64              
65             # for worker to declare to the jobserver that this worker is only connected
66             # to one jobserver, so no polls/grabs will take place, and server is free
67             # to push "job_assign" packets back down.
68             24 => ['I', "all_yours"], # W->J ---
69             );
70              
71             our %num; # name -> num
72             while (my ($num, $ary) = each %cmd) {
73             die if $num{ $ary->[1] };
74             $num{ $ary->[1] } = $num;
75             }
76              
77             =head2 cmd_name($num)
78              
79             B cmd
80              
81             =cut
82              
83             sub cmd_name {
84 35     35 1 10303 my $num = shift;
85 35         47 my $c = $cmd{$num};
86 35 50       140 return $c ? $c->[1] : undef;
87             }
88              
89             =head2 pack_req_command($key, $arg)
90              
91             B request string
92              
93             =cut
94              
95             sub pack_req_command {
96 68     68 1 865 return _pack_command("REQ", @_);
97             }
98              
99             =head2 pack_res_command($cmd, $arg)
100              
101             B response string
102              
103             =cut
104              
105             sub pack_res_command {
106 56     56 1 721 return _pack_command("RES", @_);
107             }
108              
109             =head2 read_res_packet($sock, $err_ref, $timeout)
110              
111             B undef on closed socket or malformed packet
112              
113             =cut
114              
115             sub read_res_packet {
116 1     1 1 3 warn " Entering read_res_packet" if DEBUG;
117 1         2 my $sock = shift;
118 1         2 my $err_ref = shift;
119 1         3 my $timeout = shift;
120 1         4 my $time_start = Time::HiRes::time();
121              
122             my $err = sub {
123 0     0   0 my $code = shift;
124 0 0       0 $sock->close() if $sock->connected;
125 0 0       0 $$err_ref = $code if ref $err_ref;
126 0         0 return undef;
127 1         6 };
128              
129 1         19 IO::Handle::blocking($sock, 0);
130              
131 0         0 my $fileno = fileno($sock);
132 0         0 my $rin = '';
133 0         0 vec($rin, $fileno, 1) = 1;
134              
135 0         0 my $readlen = 12;
136 0         0 my $offset = 0;
137 0         0 my $buf = '';
138              
139 0         0 my ($magic, $type, $len);
140              
141 0         0 warn " Starting up event loop\n" if DEBUG;
142              
143 0         0 while (1) {
144 0         0 my $time_remaining = undef;
145 0 0       0 if (defined $timeout) {
146 0         0 warn " We have a timeout of $timeout\n" if DEBUG;
147 0         0 $time_remaining = $time_start + $timeout - Time::HiRes::time();
148 0 0       0 return $err->("timeout") if $time_remaining < 0;
149             }
150              
151 0         0 warn " Selecting on fd $fileno\n" if DEBUG;
152 0         0 my $nfound = select((my $rout = $rin), undef, undef, $time_remaining);
153              
154 0         0 warn " Got $nfound fds back from select\n" if DEBUG;
155              
156 0 0       0 next unless vec($rout, $fileno, 1);
157              
158 0         0 warn " Entering read loop\n" if DEBUG;
159              
160 0         0 my ($ok, $err_code) = _read_sock($sock, \$buf, \$readlen, \$offset);
161 0 0       0 if (!defined($ok)) {
    0          
162 0         0 next;
163             }
164             elsif ($ok == 0) {
165 0         0 return $err->($err_code);
166             }
167              
168 0 0       0 if (!defined $type) {
169 0 0       0 next unless length($buf) >= 12;
170 0         0 my $header = substr($buf, 0, 12, '');
171 0         0 ($magic, $type, $len) = unpack("a4NN", $header);
172 0 0       0 return $err->("malformed_magic") unless $magic eq "\0RES";
173 0         0 my $starting = length($buf);
174 0         0 $readlen = $len - $starting;
175 0         0 $offset = $starting;
176              
177 0 0       0 if ($readlen) {
178 0         0 my ($ok, $err_code)
179             = _read_sock($sock, \$buf, \$readlen, \$offset);
180 0 0       0 if (!defined($ok)) {
    0          
181 0         0 next;
182             }
183             elsif ($ok == 0) {
184 0         0 return $err->($err_code);
185             }
186             } ## end if ($readlen)
187             } ## end if (!defined $type)
188              
189 0         0 $type = $cmd{$type};
190 0 0       0 return $err->("bogus_command") unless $type;
191 0 0       0 return $err->("bogus_command_type") unless index($type->[0], "O") != -1;
192              
193 0         0 warn " Fully formed res packet, returning; type=$type->[1] len=$len\n"
194             if DEBUG;
195              
196 0         0 IO::Handle::blocking($sock, 1);
197              
198             return {
199 0         0 type => $type->[1],
200             len => $len,
201             blobref => \$buf,
202             };
203             } ## end while (1)
204             } ## end sub read_res_packet
205              
206             sub _read_sock {
207 0     0   0 my ($sock, $buf_ref, $readlen_ref, $offset_ref) = @_;
208 0         0 local $!;
209 0         0 my $rv = sysread($sock, $$buf_ref, $$readlen_ref, $$offset_ref);
210              
211 0 0       0 unless ($rv) {
212 0         0 warn " Read error: $!\n" if DEBUG;
213 0 0       0 $! == EAGAIN && return;
214             } ## end unless ($rv)
215              
216 0 0       0 return (0, "read_error") unless defined $rv;
217 0 0       0 return (0, "eof") unless $rv;
218              
219 0 0       0 unless ($rv >= $$readlen_ref) {
220 0         0 warn
221             " Partial read of $rv bytes, at offset $$offset_ref, readlen was $$readlen_ref\n"
222             if DEBUG;
223 0         0 $$offset_ref += $rv;
224 0         0 $$readlen_ref -= $rv;
225              
226 0         0 return _read_sock($sock, $buf_ref, $readlen_ref, $offset_ref);
227             } ## end unless ($rv >= $$readlen_ref)
228              
229 0         0 warn " Finished reading\n" if DEBUG;
230 0         0 return (1);
231             } ## end sub _read_sock
232              
233             =head2 read_text_status($sock, $err_ref)
234              
235             =cut
236              
237             sub read_text_status {
238 1     1 1 939 my $sock = shift;
239 1         2 my $err_ref = shift;
240              
241             my $err = sub {
242 1     1   2 my $code = shift;
243 1 50       8 $sock->close() if $sock->connected;
244 1 50       22 $$err_ref = $code if ref $err_ref;
245 1         9 return undef;
246 1         4 };
247              
248 1         2 my @lines;
249 1         1 my $complete = 0;
250 1         77 while (my $line = <$sock>) {
251 0         0 chomp $line;
252 0 0       0 return $err->($1) if $line =~ /^ERR (\w+) /;
253              
254 0 0       0 if ($line eq '.') {
255 0         0 $complete++;
256 0         0 last;
257             }
258              
259 0         0 push @lines, $line;
260             } ## end while (my $line = <$sock>)
261 1 50       5 return $err->("eof") unless $complete;
262              
263 0         0 return @lines;
264             } ## end sub read_text_status
265              
266             =head2 send_req($sock, $reqref)
267              
268             =cut
269              
270             sub send_req {
271 7     7 1 1309 my ($sock, $reqref) = @_;
272 7 100       22 return 0 unless $sock;
273              
274 3         5 my $len = length($$reqref);
275 3         60 local $SIG{PIPE} = 'IGNORE';
276 3         34 my $rv = $sock->syswrite($$reqref, $len);
277 2 50 33     65 return ($rv && $rv == $len) ? 1 : 0;
278             } ## end sub send_req
279              
280             =head2 wait_for_readability($fileno, $timeout)
281              
282             given a file descriptor number and a timeout,
283              
284             wait for that descriptor to become readable
285              
286             B 0 or 1 on if it did or not
287              
288             =cut
289              
290             sub wait_for_readability {
291 1     1 1 1214 my ($fileno, $timeout) = @_;
292 1 50 33     6 return 0 unless $fileno && $timeout;
293              
294 1         3 my $rin = '';
295 1         2 vec($rin, $fileno, 1) = 1;
296 1         3003122 my $nfound = select($rin, undef, undef, $timeout);
297              
298             # nfound can be undef or 0, both failures, or 1, a success
299 1 50       37 return $nfound ? 1 : 0;
300             } ## end sub wait_for_readability
301              
302             #
303             # _pack_command($prefix, $key, $arg)
304             #
305             sub _pack_command {
306 124     124   130 my ($prefix, $key, $arg) = @_;
307 124 100 100     429 ($key && $num{$key}) || die sprintf("Bogus type arg of '%s'", $key || '');
      66        
308              
309 120   100     241 $arg ||= '';
310 120         108 my $len = length($arg);
311 120         781 return "\0$prefix" . pack("NN", $num{$key}, $len) . $arg;
312             } ## end sub _pack_command
313              
314             1;