File Coverage

blib/lib/Job/Async/Client/Redis.pm
Criterion Covered Total %
statement 37 106 34.9
branch 1 14 7.1
condition 0 9 0.0
subroutine 12 31 38.7
pod 7 15 46.6
total 57 175 32.5


line stmt bran cond sub pod time code
1             package Job::Async::Client::Redis;
2              
3 3     3   109393 use strict;
  3         20  
  3         78  
4 3     3   15 use warnings;
  3         3  
  3         69  
5              
6 3     3   11 use mro;
  3         6  
  3         17  
7              
8 3     3   781 use parent qw(Job::Async::Client);
  3         522  
  3         17  
9              
10             our $VERSION = '0.004'; # VERSION
11              
12             =head1 NAME
13              
14             Job::Async::Client::Redis - L client implementation for L
15              
16             =head1 SYNOPSIS
17              
18             =head1 DESCRIPTION
19              
20             =cut
21              
22 3     3   51488 no indirect;
  3         1711  
  3         15  
23              
24 3     3   915 use Syntax::Keyword::Try;
  3         1076  
  3         16  
25 3     3   176 use JSON::MaybeUTF8 qw(:v1);
  3         5  
  3         324  
26 3     3   787 use Ryu::Async;
  3         295738  
  3         108  
27 3     3   746 use Job::Async::Utils;
  3         263559  
  3         124  
28 3     3   859 use Net::Async::Redis 1.003;
  3         35291  
  3         117  
29              
30 3     3   29 use Log::Any qw($log);
  3         7  
  3         20  
31              
32             # Our client has a single Redis connection, a UUID to
33             # represent the client, and expects to see job announcements
34             # on the pubsub channel client::$client_id. For each
35             # announcement, the payload represents the job ID, and we get
36             # the actual details from the job hash.
37              
38             sub _add_to_loop {
39 0     0   0 my ($self) = @_;
40             $self->add_child(
41 0         0 $self->{client} = Net::Async::Redis->new(
42             uri => $self->uri,
43             )
44             );
45             $self->add_child(
46 0         0 $self->{subscriber} = Net::Async::Redis->new(
47             uri => $self->uri,
48             )
49             );
50             $self->add_child(
51 0         0 $self->{submitter} = Net::Async::Redis->new(
52             uri => $self->uri,
53             )
54             );
55             $self->add_child(
56 0         0 $self->{ryu} = Ryu::Async->new
57             );
58             }
59              
60             =head2 client
61              
62             =cut
63              
64 0     0 1 0 sub client { shift->{client} }
65              
66             =head2 subscriber
67              
68             =cut
69              
70 0     0 1 0 sub subscriber { shift->{subscriber} }
71              
72             =head2 submitter
73              
74             =cut
75              
76 0     0 1 0 sub submitter { shift->{submitter} }
77              
78 0     0 0 0 sub ryu { shift->{ryu} }
79              
80 0   0 0 0 0 sub prefix { shift->{prefix} //= 'jobs' }
81              
82             sub prefixed_queue {
83 0     0 0 0 my ($self, $q) = @_;
84 0 0       0 return $q unless length(my $prefix = $self->prefix);
85 0         0 return join '::', $self->prefix, $q;
86             }
87 0   0 0 0 0 sub queue { shift->{queue} //= 'pending' }
88              
89             =head2 start
90              
91             =cut
92              
93             sub start {
94 0     0 1 0 my ($self) = @_;
95 0         0 local $log->{context}{client_id} = $self->id;
96             try {
97             $log->tracef("Client awaiting Redis connections via %s", '' . $self->uri);
98             return Future->wait_all(
99             $self->client->connect,
100             $self->submitter->connect,
101             $self->subscriber->connect
102             )->then(sub {
103 0     0   0 local $log->{context}{client_id} = $self->id;
104 0         0 $log->tracef("Subscribing to notifications");
105 0         0 return $self->subscriber
106             ->subscribe('client::' . $self->id)
107             ->on_done(
108             $self->curry::weak::on_subscribed
109             );
110             })
111 0         0 } catch {
112             $log->errorf('Failed on connection setup - %s', $@);
113             die $@;
114             }
115             }
116              
117             =head2 on_subscribed
118              
119             =cut
120              
121             sub on_subscribed {
122 0     0 1 0 my ($self, $sub) = @_;
123 0         0 local $log->{context}{client_id} = $self->id;
124             # Every time someone tells us they finished a job, we pull back the details
125             # and check the results
126             $sub->events
127             ->map('payload')
128             ->each(sub {
129 0     0   0 my ($id) = @_;
130 0         0 local @{$log->{context}}{qw(client_id job_id)} = ($self->id, $id);
  0         0  
131 0         0 $log->tracef("Received job notification");
132 0         0 my $job = $self->pending_job($id);
133 0         0 my $client = $self->client;
134             ($job ? $client->hmget('job::' . $id, 'result')->then(sub {
135 0         0 local @{$log->{context}}{qw(client_id job_id)} = ($self->id, $id);
  0         0  
136 0         0 my ($result) = @{$_[0]};
  0         0  
137 0         0 my $type = substr $result, 0, 1, '';
138 0 0       0 $result = decode_json_utf8($result) if $type eq 'J';
139 0         0 $log->tracef('Job result %s', $result);
140 0         0 $job->done($result);
141             }) : Future->done)->then(sub {
142 0         0 local @{$log->{context}}{qw(client_id job_id)} = ($self->id, $id);
  0         0  
143 0         0 $log->tracef('Removing job data');
144 0         0 $client->del('job::' . $id);
145             })->on_fail(sub {
146 0         0 local @{$log->{context}}{qw(client_id job_id)} = ($self->id, $id);
  0         0  
147 0         0 $log
148 0 0       0 })->retain;
149 0         0 });
150              
151 0         0 $log->tracef("Redis connections established, starting client operations");
152             }
153              
154             sub submit {
155 0     0 1 0 my $self = shift;
156             my $job = (@_ == 1)
157             ? shift
158 0 0       0 : do {
159 0         0 Job::Async::Job->new(
160             future => $self->loop->new_future,
161             id => Job::Async::Utils::uuid(),
162             data => { @_ },
163             );
164             };
165 0         0 $self->{pending_job}{$job->id} = $job;
166             my $code = sub {
167 0     0   0 my $tx = shift;
168 0   0     0 my $id = $job->id // die 'no job ID?';
169             return Future->needs_all(
170             $tx->hmset(
171             'job::' . $id,
172             _reply_to => $self->id,
173             _queued => Time::HiRes::time(),
174 0         0 %{ $job->flattened_data }
175             ),
176             $tx->lpush($self->prefixed_queue($self->queue), $id)
177             ->on_done(sub {
178 0         0 my ($count) = @_;
179 0         0 local @{$log->{context}}{qw(client_id job_id)} = ($self->id, $id);
  0         0  
180 0         0 $log->tracef('Job count for [%s] now %d', $self->queue, $count);
181 0         0 $self->queue_length
182             ->emit($count);
183             })
184 0         0 );
185 0         0 };
186             return (
187             $self->use_multi
188             ? $self->submitter->multi($code)
189             : $code->($self->submitter)
190 0     0   0 )->then(sub { $job->future })
191 0 0       0 ->retain
192             }
193              
194             sub queue_length {
195 0     0 0 0 my ($self) = @_;
196 0   0     0 $self->{queue_length} ||= $self->ryu->source(
197             label => 'Currently pending events for ' . $self->queue
198             );
199             }
200              
201 0     0 0 0 sub use_multi { shift->{use_multi} }
202              
203             sub pending_job {
204 0     0 0 0 my ($self, $id) = @_;
205 0 0       0 die 'no ID' unless defined $id;
206 0         0 return delete $self->{pending_job}{$id}
207             }
208              
209             sub configure {
210 2     2 1 319 my ($self, %args) = @_;
211 2         7 for (qw(queue uri use_multi prefix)) {
212 8 50       21 $self->{$_} = delete $args{$_} if exists $args{$_};
213             }
214 2         13 $self->next::method(%args)
215             }
216              
217 0     0 0   sub uri { shift->{uri} }
218              
219             1;
220              
221             =head1 AUTHOR
222              
223             Tom Molesworth
224              
225             =head1 LICENSE
226              
227             Copyright Tom Molesworth 2016-2017. Licensed under the same terms as Perl itself.
228