File Coverage

blib/lib/Mojo/Redis/Processor.pm
Criterion Covered Total %
statement 52 109 47.7
branch 21 38 55.2
condition 0 6 0.0
subroutine 12 26 46.1
pod 5 5 100.0
total 90 184 48.9


line stmt bran cond sub pod time code
1             package Mojo::Redis::Processor;
2              
3 1     1   52123 use Carp;
  1         1  
  1         46  
4 1     1   384 use Array::Utils qw (array_minus);
  1         262  
  1         49  
5 1     1   5 use Digest::MD5 qw(md5_hex);
  1         3  
  1         35  
6 1     1   17 use Time::HiRes qw(usleep);
  1         2  
  1         7  
7 1     1   571 use Mojo::Redis2;
  1         95362  
  1         8  
8 1     1   606 use RedisDB;
  1         20885  
  1         32  
9 1     1   595 use JSON::XS qw(encode_json decode_json);
  1         3799  
  1         51  
10 1     1   5 use strict;
  1         1  
  1         14  
11 1     1   2 use warnings;
  1         2  
  1         778  
12              
13             =head1 NAME
14              
15             Mojo::Redis::Processor - Encapsulates the process for a Mojo app to send an expensive job to a daemon using Redis underneath and Redis SET NX and Redis Pub/Sub.
16              
17             =head1 VERSION
18              
19             Version 0.02
20              
21             =cut
22              
23             our $VERSION = '0.06';
24              
25             =head1 DESCRIPTION
26              
27             This module is specialized to help a Mojo app to send an expensive job request to be processed in parallel in a separete daemon. Communication is handled through Redis.
28              
29             This is specialized for processing tasks that can be common between different running Mojo children. Race condition between children to add a new tasks is handle by Redis SET NX capability.
30              
31             =head1 Example
32              
33             Mojo app which wants to send data and get stream of processed results will look like:
34              
35             use Mojo::Redis::Processor;
36             use Mojolicious::Lite;
37              
38             my $rp = Mojo::Redis::Processor->new({
39             data => 'Data',
40             trigger => 'R_25',
41             });
42              
43             $rp->send();
44             my $redis_channel = $rp->on_processed(
45             sub {
46             my ($message, $channel) = @_;
47             print "Got a new result [$message]\n";
48             });
49              
50             app->start;
51              
52             Try it like:
53              
54             $ perl -Ilib ws.pl daemon
55              
56              
57             Processor daemon code will look like:
58              
59             use Mojo::Redis::Processor;
60             use Parallel::ForkManager;
61              
62             use constant MAX_WORKERS => 1;
63              
64             $pm = new Parallel::ForkManager(MAX_WORKERS);
65              
66             while (1) {
67             my $pid = $pm->start and next;
68              
69             my $rp = Mojo::Redis::Processor->new;
70              
71             $next = $rp->next();
72             if ($next) {
73             print "next job started [$next].\n";
74              
75             $redis_channel = $rp->on_trigger(
76             sub {
77             my $payload = shift;
78             print "processing payload\n";
79             return rand(100);
80             });
81             print "Job done, exiting the child!\n";
82             } else {
83             print "no job found\n";
84             sleep 1;
85             }
86             $pm->finish;
87             }
88              
89             Try it like:
90              
91             $ perl -Ilib daemon.pl
92              
93             Daemon needs to pick a forking method and also handle ide processes and timeouts.
94              
95             =cut
96              
97             =head1 METHODS
98              
99             =cut
100              
101             my @ALLOWED = qw(data trigger redis_read redis_write read_conn write_conn daemon_conn prefix expire usleep retry);
102              
103             =head2 C<< new(%Options) >>
104              
105             This will instantiate the object for both reqeust sender and processor. Type depends on options which are passed.
106              
107             =over
108              
109             =item B
110              
111             Data for processing that we pass to the $pricer code.
112              
113             =item B
114              
115             Trigger will be a redis channel that will trigger call of pricer code.
116              
117             =item B
118              
119             Data for processing that we pass to the $pricer code.
120              
121             =item B
122              
123             Redis URL for read and write. Write means there is a central and replicated redis. redis_write will default to redis_read if it is not passed.
124              
125             =item B
126              
127             Setting redis connections directly. daemon_conn is used to wait for trigger.
128              
129             =item B
130              
131             Key prefix that is used in redis. If it is not set it will default to "Redis::Processor::".
132              
133             =item B
134              
135             Expire time that client will set after receiving new price from price processor. Price process will continue to price as long as someone is extending this expiry.
136              
137             =item B
138              
139             Sleep time if there was no job available.
140              
141             =item B
142              
143             Retry time to wait for new job become available. If no job become available next() will return empty.
144              
145             =back
146              
147             This will new the thing.
148              
149             =cut
150              
151             sub new {
152 6     6 1 1698 my $class = shift;
153 6 100       17 my $self = ref $_[0] ? $_[0] : {@_};
154              
155 6         7 my @REQUIRED = qw();
156 6 100       11 if (exists $self->{data}) {
157 3         5 @REQUIRED = qw(data trigger);
158             }
159              
160 6         6 my @missing = grep { !$self->{$_} } @REQUIRED;
  6         10  
161 6 100       16 croak "Error, missing parameters: " . join(',', @missing) if @missing;
162              
163 5         12 my @passed = keys %$self;
164 5         12 my @invalid = array_minus(@passed, @ALLOWED);
165 5 100       94 croak "Error, invalid parameters:" . join(',', @invalid) if @invalid;
166              
167 4         4 bless $self, $class;
168 4         9 $self->_initialize();
169 4         15 return $self;
170             }
171              
172             sub _initialize {
173 4     4   2 my $self = shift;
174 4 100       12 $self->{prefix} = 'Redis::Processor::' if !exists $self->{prefix};
175 4 100       8 $self->{expire} = 60 if !exists $self->{expire};
176 4 100       8 $self->{usleep} = 10 if !exists $self->{usleep};
177 4 100       7 $self->{redis_read} = 'redis://127.0.0.1:6379/0' if !exists $self->{redis_read};
178 4 100       8 $self->{redis_write} = $self->{redis_read} if !exists $self->{redis_write};
179 4 100       8 $self->{retry} = 1 if !exists $self->{retry};
180              
181 4         7 $self->{_job_counter} = $self->{prefix} . 'job';
182 4         8 $self->{_worker_counter} = $self->{prefix} . 'worker';
183             }
184              
185             sub _job_load {
186 0     0   0 my $self = shift;
187 0         0 my $job = shift;
188 0         0 return $self->{prefix} . 'load::' . $job;
189             }
190              
191             sub _unique {
192 0     0   0 my $self = shift;
193 0         0 return $self->{prefix} . md5_hex($self->_payload);
194             }
195              
196             sub _payload {
197 0     0   0 my $self = shift;
198 0         0 return JSON::XS::encode_json([$self->{data}, $self->{trigger}]);
199             }
200              
201             sub _processed_channel {
202 0     0   0 my $self = shift;
203 0         0 return $self->_unique;
204             }
205              
206             sub _read {
207 0     0   0 my $self = shift;
208 0 0       0 $self->{read_conn} = Mojo::Redis2->new(url => $self->{redis_read}) if !$self->{read_conn};
209              
210 0         0 return $self->{read_conn};
211             }
212              
213             sub _write {
214 1     1   3 my $self = shift;
215              
216 1 50       8 $self->{write_conn} = RedisDB->new(url => $self->{redis_write}) if !$self->{write_conn};
217 0           return $self->{write_conn};
218             }
219              
220             sub _daemon_redis {
221 0     0     my $self = shift;
222              
223 0 0         $self->{daemon_conn} = RedisDB->new(url => $self->{redis_write}) if !$self->{daemon_conn};
224 0           return $self->{daemon_conn};
225             }
226              
227             =head2 C<< send() >>
228              
229             Will send the Mojo app data processing request. This is mainly a queueing job. Job will expire if no worker take it in time. If more than one app try to register the same job Redis SET NX will only assign one of them to proceed.
230              
231             =cut
232              
233             sub send {
234 0     0 1   my $self = shift;
235              
236             # race for setting a unique key
237 0 0         if ($self->_write->setnx($self->_unique, 1)) {
238             # if successful first set the key TTL. It must go away if no worker took the job.
239 0           $self->_write->expire($self->_unique, $self->{expire});
240              
241 0           my $job = $self->_write->incr($self->{_job_counter});
242 0           $self->_write->set($self->_job_load($job), $self->_payload);
243 0           $self->_write->expire($self->_job_load($job), $self->{expire});
244             }
245             }
246              
247             =head2 C<< on_processed($code) >>
248              
249             Mojo app will call this to register a code reference that will be triggered everytime there is a result. Results will be triggered and published based on trigger option.
250              
251             =cut
252              
253             sub on_processed {
254 0     0 1   my $self = shift;
255 0           my $code = shift;
256              
257             $self->_read->on(
258             message => sub {
259 0     0     my ($redis, $msg, $channel) = @_;
260 0           $code->($msg, $channel);
261 0           });
262 0           $self->_read->subscribe([$self->_processed_channel]);
263             }
264              
265             =head2 C<< next() >>
266              
267             Daemon will call this to start the next job. If it return empty it meam there was no job found after "retry".
268              
269             =cut
270              
271             sub next {
272 0     0 1   my $self = shift;
273              
274 0           my $last_job = $self->_read->get($self->{_job_counter});
275 0           my $last_worker = $self->_read->get($self->{_worker_counter});
276              
277 0 0 0       return if (!$last_job or ($last_worker && $last_job <= $last_worker));
      0        
278              
279 0           my $next = $self->_write->incr($self->{_worker_counter});
280 0           my $payload;
281              
282 0           for (my $i = 0; $i < $self->{retry}; $i++) {
283 0 0         last if $payload = $self->_read->get($self->_job_load($next));
284 0           usleep($self->{usleep});
285             }
286 0 0         return if not $payload;
287              
288 0           my $tmp = JSON::XS::decode_json($payload);
289              
290 0           $self->{data} = $tmp->[0];
291 0           $self->{trigger} = $tmp->[1];
292              
293 0           return $next;
294             }
295              
296             sub _expired {
297 0     0     my $self = shift;
298              
299 0 0         return 1 if $self->_read->ttl($self->_unique) <= 0;
300 0           return;
301             }
302              
303             =head2 C<< on_trigger() >>
304              
305             Daemon will call this to register a processor code reference that will be called everytime trigger happens.
306             The return value will be passed to Mojo apps which requested it using Redis Pub/Sub system.
307             on_trigger will exit the loop when there is no more subscriber to the channel.
308              
309             =cut
310              
311             sub on_trigger {
312 0     0 1   my $self = shift;
313 0           my $pricer = shift;
314              
315             $self->_daemon_redis->subscription_loop(
316             default_callback => sub {
317 0     0     my $c = shift;
318 0           my $count = $self->_publish($pricer->($self->{data}));
319 0           $self->_write->expire($self->_unique, $self->{expire});
320 0 0         if ($count == 0) {
321 0           $c->unsubscribe();
322 0           $self->_write->del($self->_unique);
323             }
324             },
325 0           subscribe => [$self->{trigger}]);
326             }
327              
328             sub _publish {
329 0     0     my $self = shift;
330 0           my $result = shift;
331              
332 0           $self->_write->publish($self->_processed_channel, $result);
333             }
334              
335             =head1 AUTHOR
336              
337             Binary.com, C<< >>
338              
339             =head1 BUGS
340              
341             Please report any bugs or feature requests to C, or through
342             the web interface at L. I will be notified, and then you'll
343             automatically be notified of progress on your bug as I make changes.
344              
345              
346              
347              
348             =head1 SUPPORT
349              
350             You can find documentation for this module with the perldoc command.
351              
352             perldoc Mojo::Redis::Processor
353              
354              
355             You can also look for information at:
356              
357             =over 4
358              
359             =item * RT: CPAN's request tracker (report bugs here)
360              
361             L
362              
363             =item * AnnoCPAN: Annotated CPAN documentation
364              
365             L
366              
367             =item * CPAN Ratings
368              
369             L
370              
371             =item * Search CPAN
372              
373             L
374              
375             =back
376              
377              
378             =head1 ACKNOWLEDGEMENTS
379              
380              
381             =head1 LICENSE AND COPYRIGHT
382              
383             Copyright 2016 Binary.com.
384              
385             This program is free software; you can redistribute it and/or modify it
386             under the terms of the the Artistic License (2.0). You may obtain a
387             copy of the full license at:
388              
389             L
390              
391             Any use, modification, and distribution of the Standard or Modified
392             Versions is governed by this Artistic License. By using, modifying or
393             distributing the Package, you accept this license. Do not use, modify,
394             or distribute the Package, if you do not accept this license.
395              
396             If your Modified Version has been derived from a Modified Version made
397             by someone other than you, you are nevertheless required to ensure that
398             your Modified Version complies with the requirements of this license.
399              
400             This license does not grant you the right to use any trademark, service
401             mark, tradename, or logo of the Copyright Holder.
402              
403             This license includes the non-exclusive, worldwide, free-of-charge
404             patent license to make, have made, use, offer to sell, sell, import and
405             otherwise transfer the Package with respect to any patent claims
406             licensable by the Copyright Holder that are necessarily infringed by the
407             Package. If you institute patent litigation (including a cross-claim or
408             counterclaim) against any party alleging that the Package constitutes
409             direct or contributory patent infringement, then this Artistic License
410             to you shall terminate on the date that such litigation is filed.
411              
412             Disclaimer of Warranty: THE PACKAGE IS PROVIDED BY THE COPYRIGHT HOLDER
413             AND CONTRIBUTORS "AS IS' AND WITHOUT ANY EXPRESS OR IMPLIED WARRANTIES.
414             THE IMPLIED WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
415             PURPOSE, OR NON-INFRINGEMENT ARE DISCLAIMED TO THE EXTENT PERMITTED BY
416             YOUR LOCAL LAW. UNLESS REQUIRED BY LAW, NO COPYRIGHT HOLDER OR
417             CONTRIBUTOR WILL BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, OR
418             CONSEQUENTIAL DAMAGES ARISING IN ANY WAY OUT OF THE USE OF THE PACKAGE,
419             EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
420              
421             =cut
422              
423             1;