File Coverage

blib/lib/Net/Async/Beanstalk.pm
Criterion Covered Total %
statement 29 66 43.9
branch 0 6 0.0
condition n/a
subroutine 10 25 40.0
pod 5 7 71.4
total 44 104 42.3


line stmt bran cond sub pod time code
1             package Net::Async::Beanstalk;
2              
3             our $VERSION = '0.002';
4             $VERSION = eval $VERSION;
5              
6             =head1 NAME
7              
8             Net::Async::Beanstalk - Non-blocking beanstalk client
9              
10             =head1 SYNOPSIS
11              
12             use IO::Async::Loop;
13             use Net::Async::Beanstalk;
14              
15             my $loop = IO::Async::Loop->new();
16              
17             my $client = Net::Async::Beanstalk->new();
18             $loop->add($client);
19              
20             $client->connect(host => 'localhost', service => '11300')->get();
21             $client->put("anything")->get();
22              
23             $loop->run();
24              
25             =head1 BUGS
26              
27             =over
28              
29             =item * Receiving on_disconnect after sending quit might not work.
30              
31             In fact disconnecting hasn't been tested at all, even ad-hoc.
32              
33             =item * This document is even longer.
34              
35             =item * There are no tests
36              
37             See if it's appropriate to steal the tests out of L.
38              
39             =back
40              
41             =cut
42              
43 3     3   206324 use Moo;
  3         31836  
  3         13  
44 3     3   5277 use strictures 2;
  3         4541  
  3         114  
45              
46 3     3   520 use Carp;
  3         6  
  3         212  
47 3     3   1797 use IO::Async::Stream;
  3         227226  
  3         125  
48 3     3   26 use List::Util qw(any);
  3         6  
  3         277  
49 3     3   1293 use MooX::EventHandler;
  3         2586  
  3         15  
50 3     3   1698 use Net::Async::Beanstalk::Constant qw(:state @GENERAL);
  3         9  
  3         572  
51 3     3   1125 use YAML::Any qw(Dump Load);
  3         3117  
  3         11  
52 3     3   22497 use namespace::clean;
  3         29899  
  3         19  
53              
54             =head1 DESCRIPTION
55              
56             Implements the client-side of the beanstalk 1.10 protocol described in
57             L
58             using L to provide an asynchronous non-blocking API.
59              
60             Net::Async::Beanstalk is based on L and
61             L. Refer to those modules' documentation for basic
62             usage. In particular L.
63              
64             =head1 ATTRIBUTES
65              
66             Includes the command stack from L.
67              
68             =cut
69              
70             with 'Net::Async::Beanstalk::Stack';
71              
72             =over
73              
74             =item default_priority (10,000)
75              
76             =item default_delay (0)
77              
78             =item default_ttr (120)
79              
80             Default values to associate with a job when it is L on the
81             beanstalk server. The defaults here are arbitrary; they have been
82             chosen to match the default values from L.
83              
84             =cut
85              
86 0     0     has default_priority => is => lazy => builder => sub { 10_000 }; # Totally arbitrary
87              
88 0     0     has default_delay => is => lazy => builder => sub { 0 };
89              
90 0     0     has default_ttr => is => lazy => builder => sub { 120 }; # 2 minutes
91              
92             =item decoder (&YAML::Load)
93              
94             =item encoder (&YAML::Dump)
95              
96             A coderef which will be used to deserialise or serialise jobs as they
97             are retrieved from or sent to a beanstalk server.
98              
99             This is not related to how the result of C or C commands
100             are deserialised. This is always done using L.
101              
102             =cut
103              
104             # TODO: has codec => ...; ?
105              
106 0     0     has decoder => is => lazy => builder => sub { \&Load };
107              
108 0     0     has encoder => is => lazy => builder => sub { \&Dump };
109              
110             =item using
111              
112             The name of the tube which was recently Ld.
113              
114             =cut
115              
116             has using => is => rwp => init_arg => undef, default => 'default';
117              
118             =item _watching
119              
120             A hashref whose keys are the tubes which are being Ced. The
121             values are ignored.
122              
123             Use the accessor C to get the list of Ced tubes
124             instead of using the attribute directly.
125              
126             =cut
127              
128             has _watching => is => rwp => init_arg => undef, default => sub {+{ default => 1 }};
129              
130 0     0 0   sub watching { keys %{ $_[0]->_watching } }
  0            
131              
132             =back
133              
134             =cut
135              
136             =head1 CLASS
137              
138             A Net::Async::Beanstalk object represents a single connection to a
139             beanstalkd server. Once a connection has been established (see
140             L) commands may be submitted by calling the object's
141             methods (see L).
142              
143             The command methods all return a L which will either be
144             completed (marked C) with the result if the command was a
145             success or failed with the error.
146              
147             The command methods are named after the L
148             API|https://raw.githubusercontent.com/kr/beanstalkd/v1.10/doc/protocol.txt>
149             command that they implement, with the hyphens changed to an underscore
150             (C). All command methods but L take the same options in
151             the same order as the respective API command. L also has an
152             option added to it so that it can be used to make reservations with or
153             without a timeout.
154              
155             Some events may happen without a command having been sent or in spite
156             of a command being actively executed. These invoke the events
157             documented below and do I complete or fail the associated
158             L. See the documentation of each error (there aren't many) for
159             the details in L.
160              
161             Although this class implements a non-blocking beanstalk client, the
162             protocol itself is not asynchronous. Each command sent will receive a
163             response before the next command will be processed although in
164             practice network buffering makes it appear that commands can be sent
165             while waiting for a previous command's response.
166              
167             Ordinarily this is irrelevant as all the commands except C
168             and C respond quickly enough that any delay will
169             be negligible and this class' internal L
170             stack|Net::Async::Beanstalk::Stack> smooths over the times where
171             that's not the case.
172              
173             When any command which blocks the server has been sent, other commands
174             will be stacked up waiting to be sent to the server but will not be
175             handled (that is, not even put on the wire) until the running command
176             has completed (perhaps with a timeout).
177              
178             If this is a concern, the beanstalkd server is explicitly written to
179             support multiple connections with low overhead, so there is no need to
180             perform all operations using the same client object. Just be aware
181             that the list of active tubes is not copied (by default) from one
182             client to another. Each client starts off using and watching the
183             C<default> tube.
184              
185             When a job has been reserved by a client (which remains connected)
186             that job is invisible to any other clients. It cannot, for example, be
187             deleted except over the same connection in which it was reserved and if
188             that connection is closed the job will return to the ready queue and
189             may be reserved by another client.
190              
191             =head1 CONNECTING
192              
193             Voodoo.
194              
195             =head1 ERRORS
196              
197             =for comment Move this to ::Receive
198              
199             There are not many error conditions described by the beanstalk
200             protocol. L also defines errors in the
201             event of bugs revealing mistakes in this code or communication
202             failures. Each will cause either the current L to fail, raise
203             an event, or both.
204              
205             See each error's description but by and large the errors that happen
206             because a command failed (which is not the same thing as "while
207             a command was active") fail the L while the error conditions
208             that arise spontaneously invoke an C event (defined in
209             L). If the Net::Async::Beanstalk object (or its
210             parent) is created without an C handler then the default
211             C handler will be used which calls L.
212              
213             Except where noted each error or failure includes the arguments which
214             were sent with the command to the server (not including the command
215             itself). If you call a command such as this
216              
217             my $give_it_a_rest = $beanstalk->bury(0x6642, 9_001);
218              
219             then $give_it_a_rest will hold a L which will eventually fail
220             and call its handler with:
221              
222             $h->("...buried: 26178 not found", "beanstalk-peek", 0x6642, 9_001);
223              
224             The exceptional errors are:
225              
226              
227             =for comment = find all three errors.
228              
229             =over
230              
231             =item C
232              
233             Category: C
234              
235             Arguments: The buffer which was written into the communication stream.
236              
237             This error invokes an C event and fails the active L.
238              
239             The server received a command line that was not well-formed. This
240             should never happen and when it does it indicates an error in this
241             module. Please report it so that it can be repaired.
242              
243             =item C
244              
245             Category: C
246              
247             Arguments: Everything.
248              
249             This error invokes C only.
250              
251             The server suffered from an internal error. This should never happen
252             and when it does it indicates an error in the server. Please report it
253             to the L so
254             that it can be repaired.
255              
256             This error does not attempt to fail the current or any pending
257             Ls, however the server's probably about to crash so your code
258             should deal with that and the pending Ls gracefully.
259              
260             =item C
261              
262             Category: C
263              
264             Arguments: The command name and then the arguments as usual.
265              
266             This error only fails the active L.
267              
268             The server ran out of memory. This happens sometimes but generally it
269             should not. Please report it to your system administrator so that he
270             can be repaired.
271              
272             =item C
273              
274             Category: C
275              
276             Arguments: The command name and then the arguments as usual.
277              
278             This error invokes an C event and fails the active L.
279              
280             This module sent a command the server did not understand. This should
281             never happen and when it does it indicates an error in the server or a
282             protocol mismatch between the server and client.
283              
284             =item C
285              
286             Category: C
287              
288             Arguments: The buffer which was received from the communication stream
289             as an arrayref of each received chunk.
290              
291             This error invokes C only.
292              
293             The server sent a message this client did not understand. This should
294             never happen and when it does it indicates an error in the server or a
295             protocol mismatch between the server and client.
296              
297             This error does not attempt to fail the current or any pending
298             Ls, however the server's speaking gibberish so nobody knows
299             what's going to happen next. Your code should deal with that and the
300             pending Ls gracefully.
301              
302             =back
303              
304             In order to make the command stack work, each L is created
305             with an C handler which sends the next pending command. In
306             the event of an error the pending commands may become invalid. This
307             class makes no attempt to deal with that.
308              
309             One other protocol error (C) can be received only in
310             response to a L command (it does not invoke an L
311             event).
312              
313             =head1 COMMAND METHODS
314              
315             =for comment Move this to ::Send
316              
317             Methods which initiate a command on the server are implemented in
318             L. The server response is processed by
319             the event handlers in L. Every command
320             method returns a L which will complete with the server's
321             response to that command, whether success or failure.
322              
323             With few exceptions, documented below, each method expects exactly the
324             arguments that the respective command requires. The commands which
325             expect to receive a YAML structure as the response (primarily the
326             C commands) deserialise the response before returning it as a
327             (non-reference) list or hash.
328              
329             The methods are named with a C<_> where the API command has a C<->.
330              
331             See
332             L
333             for further details on each command. They are:
334              
335             =over
336              
337             =item put ($job, %options) or put (%options)
338              
339             Put a job onto the currently Cd tube. The job data can be passed
340             as the method's first argument or in C<%options>.
341              
342             The job's C, C or C can be set by including
343             those values in C<%options>. If they are not then the object's default
344             value is used (see above).
345              
346             The job may be passed as the method's first or only argument or as
347             C in C<%options>. It will be serialised using L if
348             it's a reference and does not overload the stringify (C<"">) operator.
349              
350             The job may instead be passed as C in C<%options> if it has
351             already been serialised.
352              
353             Regardless of whether C is used to serialise the job it is
354             changed to a string of bytes using L.
355              
356             It is an error to pass the job data in more than one form or to
357             include unknown options and C will L.
358              
359             Possible failures:
360              
361             =over
362              
363             =item C
364              
365             Category: C
366              
367             Arguments: As with C, this should but does not include the
368             buffer which was sent.
369              
370             This error only fails the active L.
371              
372             The client sent badly-formed job data which was not terminated by a
373             CR+LF pair. This should never happen and when it does it indicates an
374             error in this module. Please report it so that it can be repaired.
375              
376             =item C
377              
378             Category: C
379              
380             The client sent job data which was rejected by the server for being
381             too large. The job has not been stored in any queue.
382              
383             =item C
384              
385             Category: C
386              
387             The job was successfully received by the server but it was unable to
388             allocate memory to put it into the ready queue and so the job has been
389             buried.
390              
391             =item C
392              
393             Category: C
394              
395             The server is currently being drained and is not accepting new
396             jobs. The job has not been stored in any queue.
397              
398             =back
399              
400             =item reserve (%options)
401              
402             Reserve the next available job. C may be passed in
403             C<%options> in which case the C command is sent
404             instead. C may be C<0>.
405              
406             The data returned by the server is transformed into a string of
407             characters with L then deserialised using C.
408              
409             If the C option is set to a true value then the data returned by
410             the server is transformed into characters but is not deserialised.
411              
412             If the C option is set to a true value then the data is left
413             completely untouched.
414              
415             Possible failures:
416              
417             =over
418              
419             =item C
420              
421             Category: C
422              
423             A job which was previously reserved by this client and has not been
424             handled is nearing the time when its reservation will expire and the
425             server will restore it to the ready queue.
426              
427             =back
428              
429             =item reserve_with_timeout ($time, %options)
430              
431             Implemented by calling L with a C option. C<$time>
432             may be 0 which will cause the L to fail immediately with a
433             C error if there are no jobs available.
434              
435             Possible failures:
436              
437             =over
438              
439             =item C
440              
441             Category: C
442              
443             The number of seconds specified in the timeout value to
444             C has expired without a job becoming ready to
445             reserve.
446              
447             =back
448              
449             In addition all the failures possible in response to the L
450             command can be received in response to C.
451              
452             =item bury ($job_id)
453              
454             Possible failures:
455              
456             =over
457              
458             =item C
459              
460             Category: C
461              
462             The job with ID C<$id> could not be buried because it does not exist
463             or has not been previously reserved by this client.
464              
465             =for comment TODO: Can a job be buried if it is reserved by _no_ client?
466              
467             =back
468              
469             =item delete ($job_id)
470              
471             Possible failures:
472              
473             =over
474              
475             =item C
476              
477             Category: C
478              
479             The job with ID C<$id> could not be deleted because it does not exist,
480             has not been previously reserved by this client or is not in a
481             C or C state.
482              
483             =back
484              
485             =item ignore ($tube_name)
486              
487             Possible failures:
488              
489             =over
490              
491             =item C
492              
493             Category: C
494              
495             The client attempted to ignore the only tube remaining in its watch
496             list.
497              
498             =for comment Is it an error to ignore a tube which is not being watched?
499              
500             =back
501              
502             =item kick_job ($job_id)
503              
504             Possible failures:
505              
506             =over
507              
508             =item C
509              
510             Category: C
511              
512             The job with ID C<$id> could not be kicked because it "is not in a
513             kickable state".
514              
515             =for comment The documentation is vague and includes the possibility
516             of "internal errors".
517              
518             =back
519              
520             =item kick ($max)
521              
522             This command should not fail.
523              
524             =item list_tubes ()
525              
526             This command should not fail.
527              
528             =item list_tubes_watched ()
529              
530             This command should not fail.
531              
532             =item list_tube_used ()
533              
534             This command should not fail.
535              
536             =item pause_tube ($tube_name, $delay)
537              
538             Possible failures:
539              
540             =over
541              
542             =item C
543              
544             Category: C
545              
546             The tube could not be paused because it doesn't exist.
547              
548             =back
549              
550             =item peek ($job_id)
551              
552             Possible failures:
553              
554             =over
555              
556             =item C
557              
558             Category: C
559              
560             The specified job could not be retrieved because it does not exist.
561              
562             =back
563              
564             =item peek_buried ()
565              
566             Possible failures:
567              
568             =over
569              
570             =item C
571              
572             Category: C
573              
574             The next job in a buried state could not be retrieved because one does
575             not exist.
576              
577             =back
578              
579             =item peek_delayed ()
580              
581             Possible failures:
582              
583             =over
584              
585             =item C
586              
587             Category: C
588              
589             The next job in a delayed state could not be retrieved because one
590             does not exist.
591              
592             =back
593              
594             =item peek_ready ()
595              
596             Possible failures:
597              
598             =over
599              
600             =item C
601              
602             Category: C
603              
604             The next job in a ready state could not be retrieved because one does
605             not exist.
606              
607             =back
608              
609             =item quit ()
610              
611             In theory this will raise an C in addition to
612             completing the L it returns. In practice I haven't written it
613             yet.
614              
615             =item release ($job_id, $priority, $delay)
616              
617             Possible failures:
618              
619             =over
620              
621             =item C
622              
623             Category: C
624              
625             The job with ID C<$id> could not be released because it does not exist
626             or has not been previously reserved by this client.
627              
628             =for comment TODO: What if an attempt is made to release a released job?
629              
630             =item C
631              
632             Category: C
633              
634             The job with ID C<$id> could not be released because the server ran
635             out of memory.
636              
637             =back
638              
639             =item stats ()
640              
641             This command should not fail.
642              
643             =item stats_job ($job_id)
644              
645             Possible failures:
646              
647             =over
648              
649             =item C
650              
651             Category: C
652              
653             No statistics are available for the job with ID C<$id> because it does
654             not exist.
655              
656             =back
657              
658             =item stats_tube ($tube_name)
659              
660             Possible failures:
661              
662             =over
663              
664             =item C
665              
666             Category: C
667              
668             No statistics are available for the tube named C<$tube> because it
669             does not exist.
670              
671             =back
672              
673             =item touch ($job_id)
674              
675             Possible failures:
676              
677             =over
678              
679             =item C
680              
681             Category: C
682              
683             The job with ID C<$id> could not be touched because it does not exist
684             or has not been previously reserved by this client.
685              
686             =back
687              
688             =item use ($tube_name)
689              
690             This command should not fail.
691              
692             =item watch ($tube_name)
693              
694             This command should not fail.
695              
696             =back
697              
698             =cut
699              
700             with 'Net::Async::Beanstalk::Receive';
701              
702             with 'Net::Async::Beanstalk::Send';
703              
704             =head1 OTHER METHODS
705              
706             =over
707              
708             =item reserve_pending () => @commands
709              
710             Returns all the entries in L
711             which refer to a C or C command.
712              
713             =cut
714              
715             sub reserve_pending {
716 0     0 1   scalar grep { $_->[STATE_COMMAND] =~ /^reserve/ } $_[0]->_pending_commands;
  0            
717             }
718              
719             =item disconnect () => $future
720              
721             An alias for L.
722              
723             =cut
724              
725             *disconnect = \&quit;
726              
727             =item sync () => $future
728              
729             Returns a L which completes when all pending commands have
730             been responded to.
731              
732             =cut
733              
734 0     0 1   sub sync { Future->wait_all(map { $_->[STATE_FUTURE] } $_[0]->_pending_commands) }
  0            
735              
736             =item watch_only (@tubes) => $future
737              
738             Send a C command and based on its result send a
739             series of C and then C commands so that the tubes being
740             watched for this client exactly match C<@tubes>.
741              
742             =cut
743              
744             sub watch_only {
745 0     0 1   my $self = shift;
746 0           my %want = map{+($_=>1)} @_;
  0            
747             $self->list_tubes_watched->then(sub {
748 0     0     my %current = map {+($_=>1)} @_;
  0            
749 0           my @watch = map { $self->watch($_) } grep { not $current{$_} } keys %want;
  0            
  0            
750 0           my @ignore = map { $self->ignore($_) } grep { not $want{$_} } keys %current;
  0            
  0            
751 0           Future->wait_all(@watch, @ignore);
752 0           });
753             }
754              
755             =back
756              
757             =head1 INTERNAL METHODS
758              
759             =over
760              
761             =item _assert_state($response_word) => VOID
762              
763             Raises an exception if the word received from the server is not
764             something expected in response to the command which has most recently
765             been sent.
766              
767             =cut
768              
769             sub _assert_state {
770 0     0     my $self = shift;
771 0           my ($response) = @_;
772 0 0   0     return if any { $response eq $_ } @GENERAL;
  0            
773 0 0         croak "Internal error: $response in null state" unless $self->count_commands;
774 0           my $state = $self->current_command;
775             croak "Internal error: $response in invalid state " . $state->[STATE_COMMAND]
776 0 0         unless exists $STATE_CAN{$state->[STATE_COMMAND]}{$response};
777             }
778              
779             =item fail_command($message, $exception, @args) => $future
780              
781             Remove the current command from the command stack and fail its
782             L with this method's arguments.
783              
784             The L returned is the one which was returned when initiating a
785             command and can be safely ignored.
786              
787             This is used by L when the client
788             received an expected response which nevertheless indicates an error of
789             some kind, such as C received in response to a
790             C command.
791              
792             =cut
793              
794 0     0 1   sub fail_command { $_[0]->_shift_command->[0]->fail(@_[1..$#_]) }
795              
796             =item finish_command($event, @args) => $future
797              
798             Remove the current command from the command stack and complete its
799             L with this method's arguments.
800              
801             The L returned is the one which was returned when initiating a
802             command and can be safely ignored.
803              
804             This is used by L when the server sent
805             a response to a command which indicates successful completion.
806              
807             =cut
808              
809 0     0 1   sub finish_command { $_[0]->_shift_command->[0]->done(@_[1..$#_]) }
810              
811             # Messy Moo/IO::Async stuff
812              
813             # TODO: Don't use IaProtocol. Apparently it's bad.
814 3     3   3807 BEGIN { our @ISA; unshift @ISA, 'IO::Async::Stream' }
  3         394  
815              
816             my @events = qw(
817             on_disconnect
818             );
819              
820             # TODO: MooX::EventHandler is looking less and less useful.
821             has_event $_ for @events;
822              
823             my @attributes = qw(
824             default_delay
825             default_priority
826             default_ttr
827             encoder
828             decoder
829             );
830              
831             sub FOREIGNBUILDARGS {
832 0     0 0   my $class = shift;
833 0           my %args = @_;
834 0           delete @args{@attributes, @events};
835 0           return %args;
836             }
837              
838             1;
839              
840             =back
841              
842             =head1 ALTERNATIVE IMPLEMENTATIONS
843              
844             =over
845              
846             =item L
847              
848             A good module and asynchronous but it uses L which ... the
849             less said the better. The core of the protocol is implemented but it
850             does not handle all error conditions. I have attempted to make
851             L's API superficially similar to this one.
852              
853             =item L
854              
855             Also written by Graham Barr, this module seems to be slightly more
856             functionally complete than its L counterpart and has proven
857             itself stable and fast but unfortunately does not operate
858             asynchronously.
859              
860             =item L
861              
862             Unfortunately also based on L which is a shame because it
863             implements what appears to be an interesting FSA using beanstalk
864             queues.
865              
866             =item L
867              
868             Ancient, presumably unsupported and based on an out-dated version of
869             the beanstalk protocol.
870              
871             =back
872              
873             =head1 SEE ALSO
874              
875             L
876              
877             L
878              
879             L
880              
881             L
882              
883             =head1 AUTHOR
884              
885             Matthew King
886              
887             =cut