File Coverage

blib/lib/Job/Async/Client/Redis.pm
Criterion Covered Total %
statement 37 106 34.9
branch 1 14 7.1
condition 0 9 0.0
subroutine 12 31 38.7
pod 7 15 46.6
total 57 175 32.5


line stmt bran cond sub pod time code
1             package Job::Async::Client::Redis;
2              
3 3     3   116770 use strict;
  3         22  
  3         85  
4 3     3   14 use warnings;
  3         6  
  3         71  
5              
6 3     3   14 use mro;
  3         4  
  3         20  
7              
8 3     3   911 use parent qw(Job::Async::Client);
  3         525  
  3         24  
9              
10             our $VERSION = '0.003'; # VERSION
11              
12             =head1 NAME
13              
14             Job::Async::Client::Redis - L client implementation for L
15              
16             =head1 SYNOPSIS
17              
18             =head1 DESCRIPTION
19              
20             =cut
21              
22 3     3   51194 no indirect;
  3         1789  
  3         16  
23              
24 3     3   922 use Syntax::Keyword::Try;
  3         1092  
  3         23  
25 3     3   180 use JSON::MaybeUTF8 qw(:v1);
  3         7  
  3         326  
26 3     3   743 use Ryu::Async;
  3         319065  
  3         175  
27 3     3   829 use Job::Async::Utils;
  3         262261  
  3         120  
28 3     3   1000 use Net::Async::Redis 1.003;
  3         37644  
  3         122  
29              
30 3     3   23 use Log::Any qw($log);
  3         6  
  3         26  
31              
32             # Our client has a single Redis connection, a UUID to
33             # represent the client, and expects to see job announcements
34             # on the pubsub channel client::$client_id. For each
35             # announcement, the payload represents the job ID, and we get
36             # the actual details from the job hash.
37              
38             sub _add_to_loop {
39 0     0   0 my ($self) = @_;
40             $self->add_child(
41 0         0 $self->{client} = Net::Async::Redis->new(
42             uri => $self->uri,
43             )
44             );
45             $self->add_child(
46 0         0 $self->{subscriber} = Net::Async::Redis->new(
47             uri => $self->uri,
48             )
49             );
50             $self->add_child(
51 0         0 $self->{submitter} = Net::Async::Redis->new(
52             uri => $self->uri,
53             )
54             );
55             $self->add_child(
56 0         0 $self->{ryu} = Ryu::Async->new
57             );
58             }
59              
60             =head2 client
61              
62             =cut
63              
64 0     0 1 0 sub client { shift->{client} }
65              
66             =head2 subscriber
67              
68             =cut
69              
70 0     0 1 0 sub subscriber { shift->{subscriber} }
71              
72             =head2 submitter
73              
74             =cut
75              
76 0     0 1 0 sub submitter { shift->{submitter} }
77              
78 0     0 0 0 sub ryu { shift->{ryu} }
79              
80 0   0 0 0 0 sub prefix { shift->{prefix} //= 'jobs' }
81              
82             sub prefixed_queue {
83 0     0 0 0 my ($self, $q) = @_;
84 0 0       0 return $q unless length(my $prefix = $self->prefix);
85 0         0 return join '::', $self->prefix, $q;
86             }
87 0   0 0 0 0 sub queue { shift->{queue} //= 'pending' }
88              
89             =head2 start
90              
91             =cut
92              
93             sub start {
94 0     0 1 0 my ($self) = @_;
95 0         0 local $log->{context}{client_id} = $self->id;
96             try {
97             $log->tracef("Client awaiting Redis connections via %s", '' . $self->uri);
98             return Future->wait_all(
99             $self->client->connect,
100             $self->submitter->connect,
101             $self->subscriber->connect
102             )->then(sub {
103 0     0   0 local $log->{context}{client_id} = $self->id;
104 0         0 $log->tracef("Subscribing to notifications");
105 0         0 return $self->subscriber
106             ->subscribe('client::' . $self->id)
107             ->on_done(
108             $self->curry::weak::on_subscribed
109             );
110             })
111 0         0 } catch {
112             $log->errorf('Failed on connection setup - %s', $@);
113             die $@;
114             }
115             }
116              
117             =head2 on_subscribed
118              
119             =cut
120              
121             sub on_subscribed {
122 0     0 1 0 my ($self, $sub) = @_;
123 0         0 local $log->{context}{client_id} = $self->id;
124             # Every time someone tells us they finished a job, we pull back the details
125             # and check the results
126             $sub->events
127             ->map('payload')
128             ->each(sub {
129 0     0   0 my ($id) = @_;
130 0         0 local @{$log->{context}}{qw(client_id job_id)} = ($self->id, $id);
  0         0  
131 0         0 $log->tracef("Received job notification");
132 0         0 my $job = $self->pending_job($id);
133 0         0 my $client = $self->client;
134             ($job ? $client->hmget('job::' . $id, 'result')->then(sub {
135 0         0 local @{$log->{context}}{qw(client_id job_id)} = ($self->id, $id);
  0         0  
136 0         0 my ($result) = @{$_[0]};
  0         0  
137 0         0 my $type = substr $result, 0, 1, '';
138 0 0       0 $result = decode_json_utf8($result) if $type eq 'J';
139 0         0 $log->tracef('Job result %s', $result);
140 0         0 $job->done($result);
141             }) : Future->done)->then(sub {
142 0         0 local @{$log->{context}}{qw(client_id job_id)} = ($self->id, $id);
  0         0  
143 0         0 $log->tracef('Removing job data');
144 0         0 $client->del('job::' . $id);
145             })->on_fail(sub {
146 0         0 local @{$log->{context}}{qw(client_id job_id)} = ($self->id, $id);
  0         0  
147 0         0 $log
148 0 0       0 })->retain;
149 0         0 });
150              
151 0         0 $log->tracef("Redis connections established, starting client operations");
152             }
153              
154             sub submit {
155 0     0 1 0 my $self = shift;
156             my $job = (@_ == 1)
157             ? shift
158 0 0       0 : do {
159 0         0 Job::Async::Job->new(
160             future => $self->loop->new_future,
161             id => Job::Async::Utils::uuid(),
162             data => { @_ },
163             );
164             };
165 0         0 $self->{pending_job}{$job->id} = $job;
166             my $code = sub {
167 0     0   0 my $tx = shift;
168 0   0     0 my $id = $job->id // die 'no job ID?';
169             return Future->needs_all(
170             $tx->hmset(
171             'job::' . $id,
172             _reply_to => $self->id,
173 0         0 %{ $job->flattened_data }
174             ),
175             $tx->lpush($self->prefixed_queue($self->queue), $id)
176             ->on_done(sub {
177 0         0 my ($count) = @_;
178 0         0 local @{$log->{context}}{qw(client_id job_id)} = ($self->id, $id);
  0         0  
179 0         0 $log->tracef('Job count for [%s] now %d', $self->queue, $count);
180 0         0 $self->queue_length
181             ->emit($count);
182             })
183 0         0 );
184 0         0 };
185             return (
186             $self->use_multi
187             ? $self->submitter->multi($code)
188             : $code->($self->submitter)
189 0     0   0 )->then(sub { $job->future })
190 0 0       0 ->retain
191             }
192              
193             sub queue_length {
194 0     0 0 0 my ($self) = @_;
195 0   0     0 $self->{queue_length} ||= $self->ryu->source(
196             label => 'Currently pending events for ' . $self->queue
197             );
198             }
199              
200 0     0 0 0 sub use_multi { shift->{use_multi} }
201              
202             sub pending_job {
203 0     0 0 0 my ($self, $id) = @_;
204 0 0       0 die 'no ID' unless defined $id;
205 0         0 return delete $self->{pending_job}{$id}
206             }
207              
208             sub configure {
209 2     2 1 344 my ($self, %args) = @_;
210 2         8 for (qw(queue uri use_multi prefix)) {
211 8 50       22 $self->{$_} = delete $args{$_} if exists $args{$_};
212             }
213 2         14 $self->next::method(%args)
214             }
215              
216 0     0 0   sub uri { shift->{uri} }
217              
218             1;
219              
220             =head1 AUTHOR
221              
222             Tom Molesworth
223              
224             =head1 LICENSE
225              
226             Copyright Tom Molesworth 2016-2017. Licensed under the same terms as Perl itself.
227