File Coverage

blib/lib/Job/Async/Worker/Redis.pm
Criterion Covered Total %
statement 38 102 37.2
branch 2 22 9.0
condition 0 20 0.0
subroutine 12 29 41.3
pod 7 15 46.6
total 59 188 31.3


line stmt bran cond sub pod time code
1             package Job::Async::Worker::Redis;
2              
3 3     3   132211 use strict;
  3         21  
  3         85  
4 3     3   15 use warnings;
  3         4  
  3         91  
5              
6 3     3   735 use parent qw(Job::Async::Worker);
  3         480  
  3         17  
7              
8             our $VERSION = '0.003'; # VERSION
9              
10             =head1 NAME
11              
12             Job::Async::Worker::Redis - L<Net::Async::Redis> worker implementation for L<Job::Async::Worker>
13              
14             =head1 SYNOPSIS
15              
16             =head1 DESCRIPTION
17              
18             =cut
19              
20 3     3   347727 use curry::weak;
  3         7  
  3         59  
21 3     3   12 use Syntax::Keyword::Try;
  3         5  
  3         23  
22 3     3   913 use Future::AsyncAwait;
  3         1281  
  3         17  
23              
24 3     3   800 use Job::Async::Utils;
  3         260881  
  3         93  
25 3     3   21 use Future::Utils qw(repeat);
  3         5  
  3         153  
26 3     3   17 use JSON::MaybeUTF8 qw(:v1);
  3         6  
  3         374  
27 3     3   17 use Log::Any qw($log);
  3         23  
  3         25  
28              
29 3     3   1502 use Net::Async::Redis;
  3         34405  
  3         5527  
30              
31             =head2 incoming_job
32              
33             Source for jobs received from the C<< BRPOP(LPUSH) >> queue wait.
34              
35             =cut
36              
37             sub incoming_job {
38 0     0 1 0 my ($self) = @_;
39 0   0     0 $self->{incoming_job} //= do {
40 0 0       0 die 'needs to be part of a loop' unless $self->loop;
41 0         0 my $src = $self->ryu->source;
42 0         0 $src->map($self->curry::weak::on_job_received)->map('retain')->retain;
43 0         0 $src
44             }
45             }
46              
47             =head2 on_job_received
48              
49             Called for each job that's received.
50              
51             =cut
52              
53 0     0 1 0 async sub on_job_received {
54 0         0 my ($self, $id) = (shift, @$_);
55             try {
56             my ($queue) = $self->pending_queues;
57              
58             $log->debugf('Received job %s', $id);
59             if(exists $self->{pending_jobs}{$id}) {
60             $log->errorf("Already have job %s", $id);
61             die 'Duplicate job ID';
62             } else {
63             undef $self->{pending_jobs}{$id};
64             }
65              
66             my $job_count = 0 + keys %{$self->{pending_jobs}};
67             $log->debugf("Current job count is %d", $job_count);
68             $self->trigger;
69             my ($items) = await $self->redis->hgetall('job::' . $id);
70             my %data = @$items;
71             my $result = delete $data{result};
72             $log->debugf('Original job data is %s', \%data);
73             $self->{pending_jobs}{$id} = my $job = Job::Async::Job->new(
74             data => Job::Async::Job->structured_data(\%data),
75             id => $id,
76             future => my $f = $self->loop->new_future,
77             );
78              
79             $log->debugf('Job content is %s', { map { $_ => $job->{$_} } qw(data id) });
80             $f->on_done(sub {
81 0     0   0 my ($rslt) = @_;
82 0         0 $log->debugf("Result was %s", $rslt);
83             my $code = sub {
84 0         0 my $tx = shift;
85             try {
86             delete $self->{pending_jobs}{$id};
87             $log->tracef('Removing job from processing queue');
88             return Future->needs_all(
89             $tx->hmset('job::' . $id, result => ref($rslt) ? 'J' . encode_json_utf8($rslt) : 'T' . $rslt),
90             $tx->publish('client::' . $data{_reply_to}, $id),
91             $tx->lrem(
92             $self->prefixed_queue($self->processing_queue) => 1,
93             $id
94             ),
95             )
96 0         0 } catch {
97             $log->errorf("Failed due to %s", $@);
98             return Future->fail($@, redis => $self->id, $id);
99             }
100 0         0 };
101             (
102             $self->use_multi
103             ? $self->redis->multi($code)
104             : $code->($self->redis)
105             )->on_ready($self->curry::weak::trigger)
106 0         0 ->on_fail(sub { $log->errorf('Failed to update Redis status for job %s - %s', $id, shift); })
107 0 0       0 ->retain;
108             });
109             $f->on_ready($self->curry::weak::trigger);
110             if(my $timeout = $self->timeout) {
111             Future->needs_any(
112             $f,
113             $self->loop->timeout_future(after => $timeout)->on_fail(sub {
114 0     0   0 local @{$log->{context}}{qw(worker_id job_id)} = ($self->id, $id);
  0         0  
115 0 0       0 $log->errorf("Timeout but already completed with %s", $f->state) if $f->is_ready;
116 0         0 $f->fail('timeout')
117             })
118             )->retain;
119             }
120             $self->jobs->emit($job);
121             return $f;
122 0         0 } catch {
123             $log->errorf("Unable to process received job %s - %s", $id, $@);
124             }
125             }
126              
127 0     0 0 0 sub use_multi { shift->{use_multi} }
128              
129 0   0 0 0 0 sub prefix { shift->{prefix} //= 'jobs' }
130              
131             =head2 pending_queues
132              
133             Note that C<reliable> mode only
134             supports a single queue, and will fail if you attempt to start with multiple
135             queues defined.
136              
137             =cut
138              
139 0   0 0 1 0 sub pending_queues { @{ shift->{pending_queues} ||= [qw(pending)] } }
  0         0  
140              
141             =head2 processing_queue
142              
143             =cut
144              
145 0   0 0 1 0 sub processing_queue { shift->{processing_queue} //= 'processing' }
146              
147             =head2 start
148              
149             =cut
150              
151             sub start {
152 0     0 1 0 my ($self) = @_;
153              
154 0         0 $self->trigger;
155             }
156              
157             =head2 stop
158              
159             Requests to stop processing.
160              
161             Returns a future which will complete when all currently-processing jobs have
162             finished.
163              
164             =cut
165              
166             sub stop {
167 0     0 1 0 my ($self) = @_;
168 0   0     0 $self->{stopping_future} ||= $self->loop->new_future;
169 0         0 my $pending = 0 + keys %{$self->{pending_jobs}};
  0         0  
170 0 0 0     0 if(!$pending && $self->{awaiting_job}) {
171             # This will ->cancel a Net::Async::Redis future. Currently that's just
172             # ignored to no great effect, but it would be nice sometime to do
173             # something useful with that.
174 0         0 $self->{awaiting_job}->cancel;
175 0         0 $self->{stopping_future}->done;
176             }
177             # else, either a job is being processed, or there are pending ones.
178             # sub trigger will recheck
179 0         0 return $self->{stopping_future};
180             }
181              
182             sub queue_redis {
183 0     0 0 0 my ($self) = @_;
184 0 0       0 unless($self->{queue_redis}) {
185             $self->add_child(
186 0         0 $self->{queue_redis} = Net::Async::Redis->new(
187             uri => $self->uri,
188             )
189             );
190 0         0 $self->{queue_redis}->connect;
191             }
192 0         0 return $self->{queue_redis};
193             }
194              
195             sub redis {
196 0     0 0 0 my ($self) = @_;
197 0 0       0 unless($self->{redis}) {
198             $self->add_child(
199 0         0 $self->{redis} = Net::Async::Redis->new(
200             uri => $self->uri,
201             )
202             );
203 0         0 $self->{redis}->connect;
204             }
205 0         0 return $self->{redis};
206             }
207              
208             sub prefixed_queue {
209 0     0 0 0 my ($self, $q) = @_;
210 0 0       0 return $q unless length(my $prefix = $self->prefix);
211 0         0 return join '::', $self->prefix, $q;
212             }
213              
214             sub trigger {
215 0     0 0 0 my ($self) = @_;
216 0         0 local @{$log->{context}}{qw(worker_id queue)} = ($self->id, my ($queue) = $self->pending_queues);
  0         0  
217             try {
218             my $pending = 0 + keys %{$self->{pending_jobs}};
219             $log->tracef('Trigger called with %d pending tasks, %d max', $pending, $self->max_concurrent_jobs);
220             return if $pending >= $self->max_concurrent_jobs;
221             if(!$pending and $self->{stopping_future}) {
222             $self->{stopping_future}->done;
223             return;
224             }
225             return $self->{awaiting_job} //= do {
226             $log->debugf('Awaiting job on %s', $queue);
227             $self->queue_redis->brpoplpush(
228             $self->prefixed_queue($queue) => $self->prefixed_queue($self->processing_queue), 0
229             )->on_ready(sub {
230 0     0   0 my $f = shift;
231 0         0 local @{$log->{context}}{qw(worker_id queue)} = ($self->id, $queue);
  0         0  
232             try {
233             $log->debugf('And we have an event on %s', $queue);
234             delete $self->{awaiting_job};
235             $log->tracef('Had task from queue, pending now %d', 0 + keys %{$self->{pending_jobs}});
236             my ($id, $queue, @details) = $f->get;
237             if($id) {
238             $queue //= $queue;
239             $self->incoming_job->emit([ $id, $queue ]);
240             } else {
241             $log->warnf('No ID, full details were %s - maybe timeout?', join ' ', $id // (), $queue // (), @details);
242             }
243 0         0 } catch {
244             $log->errorf("Failed to retrieve and process job: %s", $@);
245             }
246 0         0 $self->loop->later($self->curry::weak::trigger);
247             });
248             };
249 0         0 } catch {
250             $log->errorf('Failed to trigger job handling on %s - %s', $queue, $@);
251             }
252 0         0 return;
253             }
254              
255 0   0 0 0 0 sub max_concurrent_jobs { shift->{max_concurrent_jobs} //= 1 }
256              
257 0     0 0 0 sub uri { shift->{uri} }
258              
259             sub configure {
260 1     1 1 621 my ($self, %args) = @_;
261 1         3 for my $k (qw(uri max_concurrent_jobs prefix mode processing_queue use_multi)) {
262 6 50       11 $self->{$k} = delete $args{$k} if exists $args{$k};
263             }
264              
265 1 50       4 if(exists $args{pending_queues}) {
266 0 0       0 if(my $queues = $args{pending_queues}) {
267 0 0 0     0 die 'Only a single queue is supported in reliable mode' if $self->mode eq 'reliable' and @$queues > 1;
268 0         0 $self->{pending_queues} = $queues;
269             } else {
270             delete $self->{pending_queues}
271 0         0 }
272             }
273 1         3 return $self->next::method(%args);
274             }
275              
276             1;
277              
278             =head1 AUTHOR
279              
280             Tom Molesworth
281              
282             =head1 LICENSE
283              
284             Copyright Tom Molesworth 2016-2019. Licensed under the same terms as Perl itself.
285