File Coverage

blib/lib/Beanstalk/Client.pm
Criterion Covered Total %
statement 47 257 18.2
branch 6 160 3.7
condition 3 30 10.0
subroutine 15 42 35.7
pod 27 27 100.0
total 98 516 18.9


line stmt bran cond sub pod time code
1             package Beanstalk::Client;
2              
3 3     3   4435 use strict;
  3         7  
  3         92  
4 3     3   16 use warnings;
  3         5  
  3         107  
5              
6 3     3   15 use base qw(Class::Accessor::Fast);
  3         4  
  3         225  
7              
8 3     3   3168 use YAML::Syck;
  3         8333  
  3         225  
9 3     3   3358 use Socket;
  3         13328  
  3         8407  
10 3     3   3996 use IO::Socket::INET;
  3         71736  
  3         26  
11              
12 3     3   3704 use Beanstalk::Job;
  3         7  
  3         30  
13 3     3   125 use Beanstalk::Stats;
  3         12  
  3         317  
14              
15             our $VERSION = "1.07";
16              
17             # use namespace::clean;
18              
19             our $CRLF = "\015\012";
20             our $MSG_NOSIGNAL = eval { Socket::MSG_NOSIGNAL() } || 0;
21              
22             BEGIN {
23 3     3   43 __PACKAGE__->mk_accessors(
24             qw(
25             connect_timeout
26             debug
27             decoder
28             default_tube
29             delay
30             encoder
31             error
32             priority
33             server
34             socket
35             ttr
36             _watching
37             _using
38             )
39             );
40             }
41              
42             # no namespace::clean;
43              
44             sub _interact {
45 1     1   2 my ($self, $cmd, $data) = @_;
46 1 50 33     9 my $sock = $self->socket || $self->connect
47             or return;
48              
49 0 0       0 local $SIG{PIPE} = "IGNORE" unless $MSG_NOSIGNAL;
50              
51 0         0 my $debug = $self->debug;
52 0 0       0 warn $cmd ."\n" if $debug;
53              
54 0         0 $cmd .= $CRLF;
55 0 0       0 $cmd .= $data . $CRLF if defined $data;
56              
57 0         0 my $offset = 0;
58 0         0 WRITE: {
59 0         0 my $sent = send($sock, substr($cmd, $offset), $MSG_NOSIGNAL);
60 0 0       0 if ($sent) {
61 0         0 $offset += $sent;
62 0 0       0 redo WRITE if $offset < length($cmd);
63             }
64             else {
65 0         0 $self->error("$!");
66 0 0 0     0 redo WRITE if $!{EINTR} and fileno($sock);
67 0         0 return $self->disconnect;
68             }
69             }
70              
71 0         0 my $buffer;
72 0         0 $offset = 0;
73 0         0 READ: {
74 0         0 my $read = sysread($sock, $buffer, 1024, $offset);
75 0 0       0 if ($read) {
76 0 0       0 if ($buffer =~ /^([^\015\012]+)\015\012/) {
77 0         0 $self->{_recv_buffer} = substr($buffer, 2 + length($1));
78 0 0       0 warn $1,"\n" if $debug;
79 0         0 return split(' ', $1);
80             }
81 0         0 $offset += length $buffer;
82 0         0 redo READ;
83             }
84             else {
85 0         0 $self->error("$!");
86 0 0 0     0 redo READ if $!{EINTR} and fileno($sock);
87             }
88             }
89 0         0 $self->disconnect;
90 0         0 return;
91             }
92              
93              
94             sub _recv_data {
95 0     0   0 my ($self, $bytes) = @_;
96 0         0 my $sock = $self->socket;
97              
98 0         0 my $need = $bytes + 2; # count CRLF
99 0         0 my $offset = length($self->{_recv_buffer});
100 0         0 my $more = $need - $offset;
101              
102 0         0 READ: while ($more > 0) {
103 0         0 my $read = sysread($sock, $self->{_recv_buffer}, $more, $offset);
104 0 0       0 if ($read) {
105 0         0 $offset += $read;
106 0         0 $more -= $read;
107 0 0       0 last if $more == 0;
108 0         0 redo READ;
109             }
110             else {
111 0 0       0 redo READ if $!{EINTR};
112 0         0 $self->error("$!");
113 0         0 return $self->disconnect;
114             }
115             }
116 0 0       0 warn substr($self->{_recv_buffer}, 0, $bytes),"\n" if $self->debug;
117 0         0 return substr($self->{_recv_buffer}, 0, $bytes);
118             }
119              
120              
121             sub _interact_yaml_resp {
122 1     1   2 my ($self, $cmd) = @_;
123              
124 1 50       3 my @resp = _interact($self, $cmd)
125             or return;
126              
127 0 0       0 if ($resp[0] eq 'OK') {
128 0 0       0 my $data = _recv_data($self, $resp[1])
129             or return undef;
130 0         0 return YAML::Syck::Load($data);
131             }
132              
133 0         0 $self->error(join ' ', @resp);
134 0         0 return undef;
135             }
136              
137              
138             sub _interact_stats {
139 0 0   0   0 my $ret = _interact_yaml_resp(@_)
140             or return undef;
141 0         0 return Beanstalk::Stats->new($ret);
142             }
143              
144              
145             sub _peek {
146 0     0   0 my $self = shift;
147 0         0 my $cmd = shift;
148              
149 0 0       0 my @resp = _interact($self, $cmd)
150             or return undef;
151              
152 0 0       0 if ($resp[0] eq 'FOUND') {
153 0         0 my $data = _recv_data($self, $resp[2]);
154 0 0       0 return undef unless defined $data;
155 0         0 return Beanstalk::Job->new(
156             { id => $resp[1],
157             client => $self,
158             data => $data,
159             }
160             );
161             }
162              
163 0         0 $self->error(join ' ', @resp);
164 0         0 return undef;
165             }
166              
167             sub __watching {
168 0     0   0 my $self = shift;
169 0         0 my $watching = $self->_watching;
170 0 0       0 return $watching if $watching;
171 0         0 $self->list_tubes_watched;
172 0         0 $self->_watching;
173             }
174              
175             # use namespace::clean;
176              
177             sub new {
178 4     4 1 2424 my $proto = shift;
179 4   50     80 my $fields = shift || {};
180 4         51 my $self = $proto->SUPER::new(
181             { delay => 0,
182             ttr => 120,
183             priority => 10_000,
184             encoder => \&YAML::Syck::Dump,
185             decoder => \&YAML::Syck::Load,
186             %$fields,
187             }
188             );
189 4         89 $self->{_recv_buffer} = '';
190 4         16 $self;
191             }
192              
193              
194             sub connect {
195 3     3 1 1012 my $self = shift;
196 3   50     33 my $server = $self->server || "127.0.0.1";
197              
198 3 50       47 $server .= ":11300" unless $server =~ /:/;
199              
200 3         17 my $timeout = $self->connect_timeout;
201              
202 3         31 my $sock = IO::Socket::INET->new(
203             PeerAddr => $server,
204             Timeout => $timeout,
205             );
206              
207 3 50       1860 unless ($sock) {
208 3         21 $self->error("connect: $@");
209 3         27 return $self->disconnect;
210             }
211              
212 0         0 $self->socket($sock);
213              
214 0         0 my $was_watching = $self->_watching;
215 0         0 my $was_using = $self->_using;
216              
217 0         0 $self->list_tubes_watched;
218 0 0       0 if ($was_watching) {
    0          
219 0 0       0 $self->watch_only(keys %$was_watching)
220             or return $self->disconnect;
221             }
222             elsif (my $default_tube = $self->default_tube) {
223 0 0 0     0 $self->use($default_tube) && $self->watch_only($default_tube)
224             or return $self->disconnect;
225             }
226              
227 0 0       0 if (defined $was_using) {
228 0 0       0 $self->use($was_using)
229             or return $self->disconnect;
230             }
231              
232 0         0 $sock;
233             }
234              
235              
236             sub disconnect {
237 3     3 1 7 my $self = shift;
238 3 50       14 if (my $sock = $self->socket) {
239 0         0 close($sock);
240             }
241 3         29 $self->socket(undef);
242             }
243              
244             sub quit {
245 0     0 1 0 shift->disconnect;
246 0         0 return 1;
247             }
248              
249              
250             sub put {
251 0     0 1 0 my $self = shift;
252 0   0     0 my $opt = shift || {};
253              
254 0 0       0 my $pri = exists $opt->{priority} ? $opt->{priority} : $self->priority;
255 0 0       0 my $ttr = exists $opt->{ttr} ? $opt->{ttr} : $self->ttr;
256 0 0       0 my $delay = exists $opt->{delay} ? $opt->{delay} : $self->delay;
257 0 0       0 my $data = exists $opt->{data} ? $opt->{data} : $self->encoder->(@_);
258              
259 0 0       0 utf8::encode($data) if utf8::is_utf8($data); # need bytes
260              
261 0         0 my $bytes = length($data);
262              
263 0 0       0 my @resp = _interact($self, "put $pri $delay $ttr $bytes", $data)
264             or return undef;
265              
266 0 0       0 if ($resp[0] =~ /( INSERTED | BURIED )/x) {
267 0 0       0 return Beanstalk::Job->new(
268             { id => $resp[1],
269             client => $self,
270             buried => $1 eq 'BURIED' ? 1 : 0,
271             data => $data,
272             }
273             );
274             }
275              
276 0         0 $self->error(join ' ', @resp);
277              
278 0         0 return undef;
279             }
280              
281              
282             sub stats {
283 0     0 1 0 my $self = shift;
284 0         0 _interact_stats($self, "stats");
285             }
286              
287              
288             sub stats_tube {
289 0     0 1 0 my $self = shift;
290 0 0       0 my $tube = @_ ? shift: 'default';
291 0         0 _interact_stats($self, "stats-tube $tube");
292             }
293              
294              
295             sub stats_job {
296 0     0 1 0 my $self = shift;
297 0   0     0 my $id = shift || 0;
298 0         0 _interact_stats($self, "stats-job $id");
299             }
300              
301              
302             sub kick {
303 0     0 1 0 my $self = shift;
304 0   0     0 my $bound = shift || 1;
305              
306 0 0       0 my @resp = _interact($self, "kick $bound")
307             or return undef;
308              
309 0 0       0 return $resp[1] if $resp[0] eq 'KICKED';
310              
311 0         0 $self->error(join ' ', @resp);
312 0         0 return undef;
313             }
314              
315              
316             sub kick_job {
317 0     0 1 0 my $self = shift;
318 0         0 my $job = shift;
319              
320 0 0       0 my @resp = _interact($self, "kick-job $job")
321             or return undef;
322              
323 0 0       0 return 1 if $resp[0] eq 'KICKED';
324              
325 0         0 $self->error(join ' ', @resp);
326 0         0 return undef;
327             }
328              
329              
330             sub use {
331 0     0 1 0 my $self = shift;
332 0         0 my $tube = shift;
333              
334 0 0       0 my @resp = _interact($self, "use $tube")
335             or return undef;
336              
337 0 0       0 return $self->_using($resp[1]) if $resp[0] eq 'USING';
338              
339 0         0 $self->error(join ' ', @resp);
340 0         0 return undef;
341             }
342              
343              
344             sub reserve {
345 0     0 1 0 my $self = shift;
346 0         0 my $timeout = shift;
347              
348 0 0       0 my $cmd = defined($timeout) ? "reserve-with-timeout $timeout" : "reserve";
349 0 0       0 my @resp = _interact($self, $cmd)
350             or return undef;
351              
352 0 0       0 if ($resp[0] eq 'RESERVED') {
353 0         0 my $data = _recv_data($self, $resp[2]);
354 0 0       0 return undef unless defined $data;
355              
356 0         0 return Beanstalk::Job->new(
357             { id => $resp[1],
358             client => $self,
359             reserved => 1,
360             data => $data,
361             }
362             );
363             }
364              
365 0         0 $self->error(join ' ', @resp);
366 0         0 return undef;
367             }
368              
369              
370             sub delete {
371 0     0 1 0 my $self = shift;
372 0         0 my $id = shift;
373 0 0       0 my @resp = _interact($self, "delete $id")
374             or return undef;
375 0 0       0 return 1 if $resp[0] eq 'DELETED';
376              
377 0         0 $self->error(join ' ', @resp);
378 0         0 return undef;
379             }
380              
381             sub touch {
382 0     0 1 0 my $self = shift;
383 0         0 my $id = shift;
384 0 0       0 my @resp = _interact($self, "touch $id")
385             or return undef;
386 0 0       0 return 1 if $resp[0] eq 'TOUCHED';
387              
388 0         0 $self->error(join ' ', @resp);
389 0         0 return undef;
390             }
391              
392              
393             sub release {
394 0     0 1 0 my $self = shift;
395 0         0 my $id = shift;
396 0   0     0 my $opt = shift || {};
397              
398 0 0       0 my $pri = exists $opt->{priority} ? $opt->{priority} : $self->priority;
399 0 0       0 my $delay = exists $opt->{delay} ? $opt->{delay} : $self->delay;
400              
401 0 0       0 my @resp = _interact($self, "release $id $pri $delay")
402             or return undef;
403 0 0       0 return 1 if $resp[0] eq 'RELEASED';
404              
405 0         0 $self->error(join ' ', @resp);
406 0         0 return undef;
407             }
408              
409              
410             sub bury {
411 0     0 1 0 my $self = shift;
412 0         0 my $id = shift;
413 0   0     0 my $opt = shift || {};
414              
415 0 0       0 my $pri = exists $opt->{priority} ? $opt->{priority} : $self->priority;
416              
417 0 0       0 my @resp = _interact($self, "bury $id $pri")
418             or return undef;
419 0 0       0 return 1 if $resp[0] eq 'BURIED';
420              
421 0         0 $self->error(join ' ', @resp);
422 0         0 return undef;
423             }
424              
425              
426             sub watch {
427 0     0 1 0 my $self = shift;
428 0         0 my $tube = shift;
429              
430 0 0       0 my $watching = $self->__watching or return undef;
431 0 0       0 return scalar keys %$watching if $watching->{$tube};
432              
433 0 0       0 my @resp = _interact($self, "watch $tube")
434             or return undef;
435              
436 0 0       0 if ($resp[0] eq 'WATCHING') {
437 0         0 $watching->{$tube}++;
438 0         0 return $resp[1];
439             }
440              
441 0         0 $self->error(join ' ', @resp);
442 0         0 return undef;
443             }
444              
445              
446             sub ignore {
447 0     0 1 0 my $self = shift;
448 0         0 my $tube = shift;
449              
450 0 0       0 my $watching = $self->__watching or return undef;
451 0 0       0 return scalar keys %$watching unless $watching->{$tube};
452              
453 0 0       0 my @resp = _interact($self, "ignore $tube")
454             or return undef;
455              
456 0 0       0 if ($resp[0] eq 'WATCHING') {
457 0         0 delete $watching->{$tube};
458 0         0 return $resp[1];
459             }
460              
461 0         0 $self->error(join ' ', @resp);
462 0         0 return undef;
463             }
464              
465              
466             sub watch_only {
467 0     0 1 0 my $self = shift;
468 0 0       0 my $watching = $self->__watching or return undef;
469 0         0 my %watched = %$watching;
470 0         0 my $ret;
471 0         0 foreach my $watch (@_) {
472 0 0       0 next if delete $watched{$watch};
473 0 0       0 $ret = $self->watch($watch) or return undef;
474             }
475 0         0 foreach my $ignore (keys %watched) {
476 0 0       0 $ret = $self->ignore($ignore) or return undef;
477             }
478 0   0     0 return $ret || scalar keys %$watching;
479             }
480              
481              
482 0     0 1 0 sub peek { _peek($_[0], "peek $_[1]") }
483 0     0 1 0 sub peek_ready { _peek(shift, "peek-ready") }
484 0     0 1 0 sub peek_delayed { _peek(shift, "peek-delayed") }
485 0     0 1 0 sub peek_buried { _peek(shift, "peek-buried") }
486              
487              
488             sub list_tubes {
489 0     0 1 0 my $self = shift;
490 0 0       0 my $ret = _interact_yaml_resp($self, "list-tubes")
491             or return undef;
492 0         0 return @$ret;
493             }
494              
495              
496             sub list_tube_used {
497 0     0 1 0 my $self = shift;
498 0 0       0 my @resp = _interact($self, "list-tube-used")
499             or return undef;
500 0 0       0 return $resp[1] if $resp[0] eq 'USING';
501              
502 0         0 $self->error(join ' ', @resp);
503 0         0 return undef;
504             }
505              
506              
507             sub list_tubes_watched {
508 1     1 1 420 my $self = shift;
509 1 50       3 my $ret = _interact_yaml_resp($self, "list-tubes-watched")
510             or return;
511 0           $self->_watching( { map { ($_,1) } @$ret });
  0            
512 0           @$ret;
513             }
514              
515              
516             sub pause_tube {
517 0     0 1   my $self = shift;
518 0           my $tube = shift;
519 0   0       my $delay= shift || 0;
520 0 0         my @resp = _interact($self, "pause-tube $tube $delay")
521             or return undef;
522 0 0         return 1 if $resp[0] eq 'PAUSED';
523              
524 0           $self->error(join ' ', @resp);
525 0           return undef;
526             }
527              
528             1;
529              
530             __END__