File Coverage

blib/lib/Cantella/Worker/Role/Beanstalk.pm
Criterion Covered Total %
statement 1 3 33.3
branch n/a
condition n/a
subroutine 1 1 100.0
pod n/a
total 2 4 50.0


line stmt bran cond sub pod time code
1             package Cantella::Worker::Role::Beanstalk;
2              
3 1     1   248512 use Moose::Role;
  0            
  0            
4             use List::Util 'shuffle';
5             use MooseX::Types::Common::Numeric qw/PositiveInt/;
6              
7             our $VERSION = '0.001000';
8              
9             has beanstalk_clients => (
10             is => 'ro',
11             isa => 'ArrayRef[Beanstalk::Client]',
12             required => 1,
13             );
14              
15             has reserve_timeout => (
16             is => 'ro',
17             isa => PositiveInt,
18             predicate => 'has_reserve_timeout',
19             );
20              
21             has max_tries => (
22             is => 'ro',
23             isa => PositiveInt,
24             predicate => 'has_max_tries',
25             );
26              
27             has delete_on_max_tries => (
28             is => 'ro',
29             isa => 'Bool',
30             required => 1,
31             default => sub{ 0 },
32             );
33              
34             after _start => sub {
35             my $self = shift;
36             for my $client ( @{ $self->beanstalk_clients } ){
37             $client->disconnect;
38             unless( $client->connect ){
39             $self->error( $client->error );
40             }
41             }
42             };
43              
44             sub get_work {
45             my($self) = @_;
46             for my $client ( shuffle @{ $self->beanstalk_clients }){
47             if( my $job = $client->reserve($self->has_reserve_timeout ? $self->reserve_timeout : ())){
48             if( $self->has_max_tries ){
49             my $stats = $job->stats;
50             if( $stats->reserves > $self->max_tries ){
51             my $job_id = $job->id;
52             my $tube = $stats->tube;
53             my $args = join( ', ', map { "'$_'" } $job->args);
54             if( $self->delete_on_max_tries ){
55             $self->logger->notice("Job exceeds max-tries. Deleting job ${job_id} from tube '$tube' with args: $args");
56             $job->delete;
57             } else {
58             $self->logger->notice("Job exceeds max-tries. Burying job ${job_id} from tube '$tube' with args: $args");
59             $job->bury;
60             }
61             redo;
62             }
63             }
64             return $job;
65             } else {
66             my $error = $client->error;
67             if( $error ne 'TIMED_OUT'){
68             $self->logger->error( $error );
69             }
70             }
71             }
72             return;
73             }
74              
75             1;
76              
77             __END__;
78              
79             =head1 NAME
80              
81             Cantella::Worker::Role::Beanstalk - Fetch Cantella::Worker jobs from beanstalkd
82              
83             =head1 SYNOPSIS
84              
85             package TestWorkerPool;
86              
87             use Try::Tiny;
88             use Moose;
89             with(
90             'Cantella::Worker::Role::Worker',
91             'Cantella::Worker::Role::Beanstalk'
92             );
93              
94             sub work {
95             my ($self, $job) = @_;
96             my @args = $job->args;
97             try {
98             if( do_something(@args) ){
99             $job->delete; #work done successfully
100             } else {
101             $job->release({delay => 10}); #let's try again in 10 seconds
102             }
103             } catch {
104             $job->bury; #job failed, bury it and log to file
105             $self->logger->error("Burying job ".$job->id." due to error: '$_'");
106             };
107             }
108              
109              
110             =head1 ATTRIBUTES
111              
112             =head2 beanstalk_clients
113              
114             =over 4
115              
116             =item B<beanstalk_clients> - reader
117              
118             =back
119              
120             Read-only, required, ArrayRef of L<Beanstalk::Client> instances.
121              
122             =head2 reserve_timeout
123              
124             =over 4
125              
126             =item B<reserve_timeout> - reader
127              
128             =item B<has_reserve_timeout> - predicate
129              
130             =back
131              
132             Read-only integer. The reserve timeout will be passed on to
133             L<Beanstalk::Client>'s C<reserve> method, and signals how long, in seconds,
134             the client should wait for a job to become available before timing out and
135             trying the next client in the pool.
136              
137             B<WARNING:> If you only have one Beanstalk server, you might be tempted
138             to set no timeout. B<Don't do this.> By setting no timeout, the reserve
139             command will block all other events, including signal handlers. Instead, it
140             is suggested that the C<reserve_timeout> is set to something that is reasonable
141             for your workload and the load of your B<beanstalkd> process.
142              
143             =head2 max_tries
144              
145             =over 4
146              
147             =item B<max_tries> - reader
148              
149             =item B<has_max_tries> - predicate
150              
151             =back
152              
153             Read-only, Integer. After a job has been reserved more than C<max_tries> times,
154             it is no longer handed to the worker: it is buried, or deleted if C<delete_on_max_tries> is true.
155              
156             =head2 delete_on_max_tries
157              
158             =over 4
159              
160             =item B<delete_on_max_tries> - reader
161              
162             =back
163              
164             Read-only boolean. If C<delete_on_max_tries> is set to true and any job exceeds
165             C<max_tries>, the job will be deleted from the pool, otherwise the job will be
166             C<bury>ed. The value defaults to false. This attribute has no effect unless
167             C<max_tries> is set.
168              
169             =head1 METHODS
170              
171             =head2 get_work
172              
173             =over 4
174              
175             =item B<arguments:> none
176              
177             =item B<return value:> C<$beanstalk_job>
178              
179             =back
180              
181             Attempts to reserve a job from each client in turn, in random order, and returns the first job reserved. Returns nothing if no client yields a job.
182              
183             =head2 _start
184              
185             =over 4
186              
187             =item B<arguments:> none
188              
189             =item B<return value:> none
190              
191             =back
192              
193             The C<_start> method is extended to disconnect and reconnect to the beanstalk
194             servers. This ensures that if a client instance is passed in as an argument
195             prior to a fork when using L<Cantella::Worker::Manager::Prefork>, the connection
196             works correctly in the child.
197              
198             =head1 SEE ALSO
199              
200             L<Cantella::Worker::Manager::Prefork>, L<Cantella::Worker::Role::Worker>
201              
202             =head1 AUTHOR
203              
204             Guillermo Roditi (groditi) E<lt>groditi@cpan.orgE<gt>
205              
206             =head1 COPYRIGHT AND LICENSE
207              
208             This software is copyright (c) 2009-2010 by Guillermo Roditi.
209             This library is free software, you can redistribute it and/or modify
210             it under the same terms as Perl itself.
211              
212             =cut
213