File Coverage

blib/lib/Job/Async/Client/Redis.pm
Criterion Covered Total %
statement 28 96 29.1
branch 1 12 8.3
condition 0 9 0.0
subroutine 9 28 32.1
pod 7 15 46.6
total 45 160 28.1


line stmt bran cond sub pod time code
1             package Job::Async::Client::Redis;
2              
3 3     3   133518 use strict;
  3         25  
  3         93  
4 3     3   17 use warnings;
  3         5  
  3         76  
5              
6 3     3   26 use mro;
  3         7  
  3         20  
7              
8 3     3   992 use parent qw(Job::Async::Client);
  3         621  
  3         16  
9              
10             our $VERSION = '0.002'; # VERSION
11              
12             =head1 NAME
13              
14             Job::Async::Client::Redis - L client implementation for L
15              
16             =head1 SYNOPSIS
17              
18             =head1 DESCRIPTION
19              
20             =cut
21              
22 3     3   59767 use Ryu::Async;
  3         368742  
  3         101  
23 3     3   915 use Job::Async::Utils;
  3         310015  
  3         122  
24 3     3   994 use Net::Async::Redis 1.003;
  3         39626  
  3         100  
25              
26 3     3   21 use Log::Any qw($log);
  3         10  
  3         17  
27              
28             # Our client has a single Redis connection, a UUID to
29             # represent the client, and expects to see job announcements
30             # on the pubsub channel client::$client_id. For each
31             # announcement, the payload represents the job ID, and we get
32             # the actual details from the job hash.
33              
34             sub _add_to_loop {
35 0     0   0 my ($self) = @_;
36             $self->add_child(
37 0         0 $self->{client} = Net::Async::Redis->new(
38             uri => $self->uri,
39             )
40             );
41             $self->add_child(
42 0         0 $self->{subscriber} = Net::Async::Redis->new(
43             uri => $self->uri,
44             )
45             );
46             $self->add_child(
47 0         0 $self->{submitter} = Net::Async::Redis->new(
48             uri => $self->uri,
49             )
50             );
51             $self->add_child(
52 0         0 $self->{ryu} = Ryu::Async->new
53             );
54             }
55              
56             =head2 client
57              
58             =cut
59              
60 0     0 1 0 sub client { shift->{client} }
61              
62             =head2 subscriber
63              
64             =cut
65              
66 0     0 1 0 sub subscriber { shift->{subscriber} }
67              
68             =head2 submitter
69              
70             =cut
71              
72 0     0 1 0 sub submitter { shift->{submitter} }
73              
74 0     0 0 0 sub ryu { shift->{ryu} }
75              
76 0   0 0 0 0 sub prefix { shift->{prefix} //= 'jobs' }
77              
78             sub prefixed_queue {
79 0     0 0 0 my ($self, $q) = @_;
80 0 0       0 return $q unless length(my $prefix = $self->prefix);
81 0         0 return join '::', $self->prefix, $q;
82             }
83 0   0 0 0 0 sub queue { shift->{queue} //= 'pending' }
84              
85             =head2 start
86              
87             =cut
88              
89             sub start {
90 0     0 1 0 my ($self) = @_;
91 0         0 local $log->{context}{client_id} = $self->id;
92 0         0 $log->tracef("Client awaiting Redis connections");
93             Future->wait_all(
94             $self->client->connect,
95             $self->submitter->connect,
96             $self->subscriber->connect
97             )->then(sub {
98 0     0   0 local $log->{context}{client_id} = $self->id;
99 0         0 $log->tracef("Subscribing to notifications");
100 0         0 return $self->subscriber
101             ->subscribe('client::' . $self->id)
102             ->on_done(
103             $self->curry::weak::on_subscribed
104             );
105             })
106 0         0 }
107              
108             =head2 on_subscribed
109              
110             =cut
111              
112             sub on_subscribed {
113 0     0 1 0 my ($self, $sub) = @_;
114 0         0 local $log->{context}{client_id} = $self->id;
115             # Every time someone tells us they finished a job, we pull back the details
116             # and check the results
117             $sub->events
118             ->map('payload')
119             ->each(sub {
120 0     0   0 my ($id) = @_;
121 0         0 local @{$log->{context}}{qw(client_id job_id)} = ($self->id, $id);
  0         0  
122 0         0 $log->tracef("Received job notification");
123 0         0 my $job = $self->pending_job($id);
124 0         0 my $client = $self->client;
125             ($job ? $client->hmget('job::' . $id, 'result')->then(sub {
126 0         0 local @{$log->{context}}{qw(client_id job_id)} = ($self->id, $id);
  0         0  
127 0         0 my ($result) = @{$_[0]};
  0         0  
128 0         0 $log->tracef('Job result %s', $result);
129 0         0 $job->done($result);
130             }) : Future->done)->then(sub {
131 0         0 local @{$log->{context}}{qw(client_id job_id)} = ($self->id, $id);
  0         0  
132 0         0 $log->tracef('Removing job data');
133 0         0 $client->del('job::' . $id);
134             })->on_fail(sub {
135 0         0 local @{$log->{context}}{qw(client_id job_id)} = ($self->id, $id);
  0         0  
136 0         0 $log
137 0 0       0 })->retain;
138 0         0 });
139              
140 0         0 $log->tracef("Redis connections established, starting client operations");
141             }
142              
143             sub submit {
144 0     0 1 0 my $self = shift;
145             my $job = (@_ == 1)
146             ? shift
147 0 0       0 : do {
148 0         0 Job::Async::Job->new(
149             future => $self->loop->new_future,
150             id => Job::Async::Utils::uuid(),
151             data => { @_ },
152             );
153             };
154 0         0 $self->{pending_job}{$job->id} = $job;
155             my $code = sub {
156 0     0   0 my $tx = shift;
157 0   0     0 my $id = $job->id // die 'no job ID?';
158             (
159             $tx->hmset(
160             'job::' . $id,
161             _reply_to => $self->id,
162 0         0 %{ $job->flattened_data }
163             ),
164             $tx->lpush($self->prefixed_queue($self->queue), $id)
165             ->on_done(sub {
166 0         0 my ($count) = @_;
167 0         0 local @{$log->{context}}{qw(client_id job_id)} = ($self->id, $id);
  0         0  
168 0         0 $log->tracef('Job count for [%s] now %d', $self->queue, $count);
169 0         0 $self->queue_length
170             ->emit($count);
171             })
172 0         0 )
173 0         0 };
174             ($self->use_multi
175             ? $self->submitter->multi($code)
176             : Future->needs_all($code->($self->submitter))
177 0     0   0 )->then(sub { $job->future })
178 0 0       0 ->retain
179             }
180              
181             sub queue_length {
182 0     0 0 0 my ($self) = @_;
183 0   0     0 $self->{queue_length} ||= $self->ryu->source(
184             label => 'Currently pending events for ' . $self->queue
185             );
186             }
187              
188 0     0 0 0 sub use_multi { shift->{use_multi} }
189              
190             sub pending_job {
191 0     0 0 0 my ($self, $id) = @_;
192 0 0       0 die 'no ID' unless defined $id;
193 0         0 return delete $self->{pending_job}{$id}
194             }
195              
196             sub configure {
197 2     2 1 424 my ($self, %args) = @_;
198 2         8 for (qw(queue uri use_multi prefix)) {
199 8 50       22 $self->{$_} = delete $args{$_} if exists $args{$_};
200             }
201 2         16 $self->next::method(%args)
202             }
203              
204 0     0 0   sub uri { shift->{uri} }
205              
206             1;
207              
208             =head1 AUTHOR
209              
210             Tom Molesworth
211              
212             =head1 LICENSE
213              
214             Copyright Tom Molesworth 2016-2017. Licensed under the same terms as Perl itself.
215