File Coverage

blib/lib/Job/Async/Worker/Redis.pm
Criterion Covered Total %
statement 38 105 36.1
branch 2 26 7.6
condition 0 22 0.0
subroutine 12 33 36.3
pod 9 17 52.9
total 61 203 30.0


line stmt bran cond sub pod time code
1             package Job::Async::Worker::Redis;
2              
3 3     3   157156 use strict;
  3         23  
  3         78  
4 3     3   14 use warnings;
  3         5  
  3         76  
5              
6 3     3   736 use parent qw(Job::Async::Worker);
  3         526  
  3         16  
7              
8             our $VERSION = '0.004'; # VERSION
9              
10             =head1 NAME
11              
12             Job::Async::Worker::Redis - L<Net::Async::Redis> worker implementation for L<Job::Async::Worker>
13              
14             =head1 SYNOPSIS
15              
16             =head1 DESCRIPTION
17              
18             =cut
19              
20 3     3   362697 use curry::weak;
  3         5  
  3         62  
21 3     3   15 use Syntax::Keyword::Try;
  3         7  
  3         20  
22 3     3   1165 use Future::AsyncAwait;
  3         1461  
  3         20  
23              
24 3     3   897 use Job::Async::Utils;
  3         257388  
  3         96  
25 3     3   20 use Future::Utils qw(repeat);
  3         6  
  3         148  
26 3     3   17 use JSON::MaybeUTF8 qw(:v1);
  3         5  
  3         384  
27 3     3   20 use Log::Any qw($log);
  3         15  
  3         24  
28              
29 3     3   1507 use Net::Async::Redis;
  3         35318  
  3         6435  
30              
31             =head2 incoming_job
32              
33             Source for jobs received from the C<< BRPOP(LPUSH) >> queue wait.
34              
35             =cut
36              
37             sub incoming_job {
38 0     0 1 0 my ($self) = @_;
39 0   0     0 $self->{incoming_job} //= do {
40 0 0       0 die 'needs to be part of a loop' unless $self->loop;
41 0         0 my $src = $self->ryu->source;
42 0         0 $src->map($self->curry::weak::on_job_received)->map('retain')->retain;
43 0         0 $src
44             }
45             }
46              
47             =head2 on_job_received
48              
49             Called for each job that's received.
50              
51             =cut
52              
53 0     0 1 0 async sub on_job_received {
54 0         0 my ($self, $id) = (shift, @$_);
55             try {
56             my ($queue) = $self->pending_queues;
57              
58             $log->debugf('Received job %s', $id);
59             if(exists $self->{pending_jobs}{$id}) {
60             $log->errorf("Already have job %s", $id);
61             die 'Duplicate job ID';
62             } else {
63             undef $self->{pending_jobs}{$id};
64             }
65              
66             my $job_count = 0 + keys %{$self->{pending_jobs}};
67             $log->debugf("Current job count is %d", $job_count);
68             $self->trigger;
69             my ($items) = await $self->redis->hgetall('job::' . $id);
70             $self->redis->hmset(
71             'job::' . $id,
72             _started => Time::HiRes::time()
73             )->retain;
74             my %data = @$items;
75             my $result = delete $data{result};
76             $log->debugf('Original job data is %s', \%data);
77             $self->{pending_jobs}{$id} = my $job = Job::Async::Job->new(
78             data => Job::Async::Job->structured_data(\%data),
79             id => $id,
80             future => my $f = $self->loop->new_future,
81             );
82              
83             $log->debugf('Job content is %s', { map { $_ => $job->{$_} } qw(data id) });
84             $f->on_done(sub {
85 0     0   0 my ($rslt) = @_;
86 0         0 $log->debugf("Result was %s", $rslt);
87             my $code = sub {
88 0         0 my $tx = shift;
89             try {
90             delete $self->{pending_jobs}{$id};
91             $log->tracef('Removing job from processing queue');
92             return Future->needs_all(
93             map {
94             $_->on_ready(sub {
95             my $f = shift;
96             $log->tracef('ready for %s - %s', $f->label, $f->state);
97             });
98             }
99             $tx->hmset(
100             'job::' . $id,
101             _processed => Time::HiRes::time(),
102             result => ref($rslt) ? 'J' . encode_json_utf8($rslt) : 'T' . $rslt
103             ),
104             $tx->publish('client::' . $data{_reply_to}, $id),
105             $tx->lrem(
106             $self->prefixed_queue($self->processing_queue) => 1,
107             $id
108             ),
109             )
110 0         0 } catch {
111             $log->errorf("Failed due to %s", $@);
112             return Future->fail($@, redis => $self->id, $id);
113             }
114 0         0 };
115             (
116             $self->use_multi
117             ? $self->redis->multi($code)
118             : $code->($self->redis)
119             )->on_ready($self->curry::weak::trigger)
120 0         0 ->on_fail(sub { $log->errorf('Failed to update Redis status for job %s - %s', $id, shift); })
121 0 0       0 ->retain;
122             });
123             $f->on_ready($self->curry::weak::trigger);
124             if(my $timeout = $self->timeout) {
125             Future->needs_any(
126             $f,
127             $self->loop->timeout_future(after => $timeout)->on_fail(sub {
128 0     0   0 local @{$log->{context}}{qw(worker_id job_id)} = ($self->id, $id);
  0         0  
129 0 0       0 $log->errorf("Timeout but already completed with %s", $f->state) if $f->is_ready;
130 0         0 $f->fail('timeout')
131             })
132             )->retain;
133             }
134             $self->jobs->emit($job);
135             return $f;
136 0         0 } catch {
137             $log->errorf("Unable to process received job %s - %s", $id, $@);
138             }
139             }
140              
141 0     0 0 0 sub use_multi { shift->{use_multi} }
142              
143 0   0 0 0 0 sub prefix { shift->{prefix} //= 'jobs' }
144              
145             =head2 pending_queues
146              
147             Note that C<reliable> mode only
148             supports a single queue, and will fail if you attempt to start with multiple
149             queues defined.
150              
151             =cut
152              
153 0   0 0 1 0 sub pending_queues { @{ shift->{pending_queues} ||= [qw(pending)] } }
  0         0  
154              
155             =head2 processing_queue
156              
157             =cut
158              
159 0   0 0 1 0 sub processing_queue { shift->{processing_queue} //= 'processing' }
160              
161             =head2 start
162              
163             =cut
164              
165             sub start {
166 0     0 1 0 my ($self) = @_;
167              
168 0         0 $self->trigger;
169             }
170              
171             =head2 stop
172              
173             Requests to stop processing.
174              
175             Returns a future which will complete when all currently-processing jobs have
176             finished.
177              
178             =cut
179              
180             sub stop {
181 0     0 1 0 my ($self) = @_;
182 0         0 my $pending = 0 + keys %{$self->{pending_jobs}};
  0         0  
183 0 0 0     0 if(!$pending && $self->{awaiting_job}) {
184 0         0 return $self->stopping_future->done;
185             }
186             # else, either a job is being processed, or there are pending ones.
187             # sub trigger will recheck
188 0         0 return $self->stopping_future;
189             }
190              
191             sub stopping_future {
192 0     0 0 0 my ($self) = @_;
193 0   0     0 $self->{stopping_future} ||= $self->loop->new_future->set_label('Job::Async::Worker::Redis shutdown');
194             }
195              
196             sub queue_redis {
197 0     0 0 0 my ($self) = @_;
198 0 0       0 unless($self->{queue_redis}) {
199             $self->add_child(
200 0         0 $self->{queue_redis} = Net::Async::Redis->new(
201             uri => $self->uri,
202             )
203             );
204 0         0 $self->{queue_redis}->connect;
205             }
206 0         0 return $self->{queue_redis};
207             }
208              
209             sub redis {
210 0     0 0 0 my ($self) = @_;
211 0 0       0 unless($self->{redis}) {
212             $self->add_child(
213 0         0 $self->{redis} = Net::Async::Redis->new(
214             uri => $self->uri,
215             )
216             );
217 0         0 $self->{redis}->connect;
218             }
219 0         0 return $self->{redis};
220             }
221              
222             sub prefixed_queue {
223 0     0 0 0 my ($self, $q) = @_;
224 0 0       0 return $q unless length(my $prefix = $self->prefix);
225 0         0 return join '::', $self->prefix, $q;
226             }
227              
228             sub trigger {
229 0     0 0 0 my ($self) = @_;
230 0         0 local @{$log->{context}}{qw(worker_id queue)} = ($self->id, my ($queue) = $self->pending_queues);
  0         0  
231             try {
232             my $pending = 0 + keys %{$self->{pending_jobs}};
233             $log->tracef('Trigger called with %d pending tasks, %d max', $pending, $self->max_concurrent_jobs);
234             return if $pending >= $self->max_concurrent_jobs;
235              
236             return $self->{awaiting_job} //= do {
237             $log->debugf('Awaiting job on %s', $queue);
238             Future->wait_any(
239             # If this is cancelled, we don't retrigger. Failure or success should retrigger as usual.
240             $self->queue_redis->brpoplpush(
241             $self->prefixed_queue($queue) => $self->prefixed_queue($self->processing_queue),
242             $self->job_poll_interval
243             )->on_done(sub {
244 0     0   0 my ($id, $queue, @details) = @_;
245             try {
246             $log->tracef('And we have an event on %s', $queue);
247             if($id) {
248             $log->tracef('Had task from queue, pending now %d', 0 + keys %{$self->{pending_jobs}});
249             $self->incoming_job->emit([ $id, $queue ]);
250             } else {
251             $log->tracef('No ID, full details were %s - maybe timeout?', join ' ', $id // (), $queue // (), @details);
252             }
253 0         0 } catch {
254             $log->errorf("Failed to retrieve and process job: %s", $@);
255             }
256 0 0       0 $self->loop->later($self->curry::weak::trigger) unless $self->stopping_future->is_ready;
257             })->on_fail(sub {
258 0     0   0 my $failure = shift;
259 0         0 $log->errorf("Failed to retrieve job from redis: %s", $failure);
260 0 0       0 $self->loop->later($self->curry::weak::trigger) unless $self->stopping_future->is_ready;
261             }),
262             $self->stopping_future->without_cancel
263             )->on_ready(sub {
264 0     0   0 delete $self->{awaiting_job};
265             });
266             };
267 0         0 } catch {
268             $log->errorf('Failed to trigger job handling on %s - %s', $queue, $@);
269             }
270 0         0 return;
271             }
272              
273             =head2 max_concurrent_jobs
274              
275             Number of jobs to process in parallel. Defaults to 1.
276              
277             =cut
278              
279 0   0 0 1 0 sub max_concurrent_jobs { shift->{max_concurrent_jobs} //= 1 }
280              
281             =head2 job_poll_interval
282              
283             Polling interval (e.g. for C<BRPOPLPUSH> in C<reliable> mode), in seconds.
284              
285             Defaults to 3 seconds.
286              
287             =cut
288              
289 0   0 0 1 0 sub job_poll_interval { shift->{job_poll_interval} //= 3 }
290              
291 0     0 0 0 sub uri { shift->{uri} }
292              
293             sub configure {
294 1     1 1 623 my ($self, %args) = @_;
295 1         4 for my $k (qw(uri max_concurrent_jobs prefix mode processing_queue use_multi job_poll_interval)) {
296 7 50       13 $self->{$k} = delete $args{$k} if exists $args{$k};
297             }
298              
299 1 50       3 if(exists $args{pending_queues}) {
300 0 0       0 if(my $queues = $args{pending_queues}) {
301 0 0 0     0 die 'Only a single queue is supported in reliable mode' if $self->mode eq 'reliable' and @$queues > 1;
302 0         0 $self->{pending_queues} = $queues;
303             } else {
304             delete $self->{pending_queues}
305 0         0 }
306             }
307 1         5 return $self->next::method(%args);
308             }
309              
310             1;
311              
312             =head1 AUTHOR
313              
314             Tom Molesworth
315              
316             =head1 LICENSE
317              
318             Copyright Tom Molesworth 2016-2019. Licensed under the same terms as Perl itself.
319