File Coverage

blib/lib/Queue/Q/ReliableFIFO/Lua.pm
Criterion Covered Total %
statement 15 55 27.2
branch 0 20 0.0
condition 0 10 0.0
subroutine 5 8 62.5
pod 0 3 0.0
total 20 96 20.8


line stmt bran cond sub pod time code
1             package Queue::Q::ReliableFIFO::Lua;
2 1     1   5 use Redis;
  1         2  
  1         24  
3 1     1   874 use File::Slurp;
  1         13516  
  1         75  
4 1     1   749 use Digest::SHA1;
  1         745  
  1         46  
5 1     1   6 use Carp qw(croak);
  1         2  
  1         50  
6             use Class::XSAccessor {
7 1         10 getters => [qw(redis_conn script_dir)]
8 1     1   5 };
  1         2  
9             my %Scripts;
10              
11             sub new {
12 0     0 0   my $class = shift;
13 0           my $self = bless { @_ }, $class;
14 0 0         $self->redis_conn || croak("need a redis connection");
15 0   0       $self->{script_dir} ||= $ENV{LUA_SCRIPT_DIR};
16 0   0       $self->{call} ||= {};
17 0           $self->register;
18 0           return $self;
19             }
20              
21             sub register {
22 0     0 0   my $self = shift;
23 0           my $name = shift;
24 0 0         if ($self->script_dir) {
25 0   0       $name ||= '*';
26 0           for my $file (glob("$self->{script_dir}/$name.lua")) {
27 0           my $script = read_file($file);
28 0           my $sha1 = Digest::SHA1::sha1_hex($script);
29 0           my ($found) = @{$self->redis_conn->script_exists($sha1)};
  0            
30 0 0         if (!$found) {
31 0           print "registering $file\n";
32 0           my $rv = $self->redis_conn->script_load($script);
33 0 0         croak("returned sha1 is different from ours!")
34             if ($rv ne $sha1);
35             }
36 0           (my $call = $file) =~ s/\.lua$//;
37 0           $call =~ s/^.*\///;
38 0           $self->{call}{$call} = $sha1;
39             }
40             }
41             else {
42 0 0 0       croak("script $name not found") if $name && !exists $Scripts{$name};
43 0 0         my @names = $name ? ($name) : (keys %script);
44 0           for my $scr_name (@names) {
45 0           my $script = $Scripts{$scr_name};
46 0           my $sha1 = Digest::SHA1::sha1_hex($script);
47 0           my ($found) = @{$self->redis_conn->script_exists($sha1)};
  0            
48 0 0         if (!$found) {
49 0           my $rv = $self->redis_conn->script_load($script);
50 0 0         croak("returned sha1 is different from ours!")
51             if ($rv ne $sha1);
52             }
53 0           $self->{call}{$scr_name} = $sha1;
54             }
55             }
56             }
57             sub call {
58 0     0 0   my $self = shift;
59 0           my $name = shift;
60 0 0         $self->register($name) if not exists $self->{call}{$name};
61 0           my $sha1 = $self->{call}{$name};
62 0 0         croak("Unknown script $name") if ! $sha1;
63 0           return $self->redis_conn->evalsha($sha1, @_);
64             }
65              
66             ##################################
67             # Start of Lua script section
68             %Scripts = (
69             remove_failed_gentle => q{
70             -- remove_failed_gentle: requeue or requeue up to a given number of items
71             -- first call will rename the list to a temporary list
72             -- calls after that will work through the temporary list until is empty
73             -- # KEYS[1] from queue name (failed queue)
74             -- # KEYS[2] temp queue name (temporary queue)
75             -- # KEYS[3] log queue name (temporary queue)
76             -- # ARGV[1] timestamp
77             -- # ARGV[2] number of items to requeue. Value "0" means "all items"
78             -- # ARGV[3] lowest creation timestamp to alloow for the failed queue
79             -- # ARGV[4] failed counter criterium
80             -- # ARGV[5] max amount of log entries
81             --
82             if #KEYS ~= 3 then error('remove_failed_gentle.lua requires 3 keys') end
83             -- redis.log(redis.LOG_NOTICE, "nr keys: " .. #KEYS)
84             local from = assert(KEYS[1], 'failed queue name missing')
85             local temp = assert(KEYS[2], 'temp queue name missing')
86             local log = assert(KEYS[3], 'log queue name missing')
87             local ts = assert(tonumber(ARGV[1]), 'timestamp missing')
88             local num = assert(tonumber(ARGV[2]), 'number of items missing')
89             local tmin = assert(tonumber(ARGV[3]), 'tmin criterium missing')
90             local fc = assert(tonumber(ARGV[4]), 'failed counter criterium missing')
91             local loglimit = assert(tonumber(ARGV[5]), 'log limit missing')
92             local todo = 0
93             local n_removed = 0
94             local do_log = 1
95              
96             if redis.call('exists', temp) == 0 then
97             if redis.call('exists', from) == 1 then
98             redis.call('rename', from, temp)
99             else
100             return "0 0"
101             end
102             end
103              
104             local len = redis.call('llen', temp)
105              
106             if len > 0 then
107             if num == 0 or num > len then
108             num = len
109             end
110              
111             -- we don't want millions of log items.
112             if redis.call('llen', log) >= loglimit then
113             do_log = 0
114             end
115              
116             for i = 1, num do
117             local item = redis.call('rpop', temp)
118             if not item then break end
119              
120             local i = cjson.decode(item)
121              
122             if i.fc >= fc or i.t < tmin or (i.created ~= nil and i.created < tmin) then
123             -- item should be removed
124             n_removed = n_removed + 1
125             if do_log then
126             -- permanent fail, put it in a log queue
127             redis.call('lpush', log, item)
128             end
129             else
130             -- it's not yet a permanent fail
131             redis.call('lpush', from, item)
132             end
133             end
134             end
135              
136             -- throw away exceeding log entries
137             if do_log then
138             redis.call('ltrim',log,0,loglimit-1)
139             end
140              
141             -- return number of items handled and number of items removed from the failed
142             -- queue, space separated (as a string)
143             todo = len - num
144             return todo .. ' ' .. n_removed
145             },
146             requeue_busy => q{
147             -- requeue_busy (depending requeue limit items will be requeued or fail)
148             -- # KEYS[1] from queue name (busy queue)
149             -- # KEYS[2] dest queue name (main queue)
150             -- # KEYS[3] failed queue name (failed queue)
151             -- # ARGV[1] timestamp
152             -- # ARGV[2] item
153             -- # ARGV[3] requeue limit
154             -- # ARGV[4] place to requeue in dest-queue:
155             -- 0: at producer side, 1: consumer side
156             -- Note: failed items will always go to the tail of the failed queue
157             -- # ARGV[5] OPTIONAL error message
158             --
159             --redis.log(redis.LOG_WARNING, "requeue_tail")
160             if #KEYS ~= 3 then error('requeue_busy requires 3 keys') end
161             -- redis.log(redis.LOG_NOTICE, "nr keys: " .. #KEYS)
162             local from = assert(KEYS[1], 'busy queue name missing')
163             local dest = assert(KEYS[2], 'dest queue name missing')
164             local failed= assert(KEYS[3], 'failed queue name missing')
165             local ts = assert(tonumber(ARGV[1]), 'timestamp missing')
166             local item = assert(ARGV[2], 'item missing')
167             local limit = assert(tonumber(ARGV[3]), 'requeue limit missing')
168             local place = tonumber(ARGV[4])
169             assert(place == 0 or place == 1, 'requeue place should be 0 or 1')
170              
171             local n = redis.call('lrem', from, 1, item)
172              
173             if n > 0 then
174             local i= cjson.decode(item)
175             if i.rc == nil then
176             i.rc=1
177             else
178             i.rc=i.rc+1
179             end
180              
181             if i.rc <= limit then
182             -- only adjust timestamps in case of requeuing
183             -- (not if busy item is place back in the front of the queue)
184             if place == 0 then
185             if i.t_created == nil then
186             i.t_created = i.t
187             end
188             i.t = ts
189             end
190              
191             local v=cjson.encode(i)
192             if place == 0 then
193             redis.call('lpush', dest, v)
194             else
195             redis.call('rpush', dest, v)
196             end
197             else
198             -- reset requeue counter and increase fail counter
199             i.rc = nil
200             if i.fc == nil then
201             i.fc = 1
202             else
203             i.fc = i.fc + 1
204             end
205             if #ARGV == 5 then
206             i.error = ARGV[5]
207             else
208             i.error = nil
209             end
210             local v=cjson.encode(i)
211             redis.call('lpush', failed, v)
212             end
213             end
214             return n
215             },
216             requeue_failed => q{
217             -- requeue_failed: requeue a given number of failed items
218             -- # KEYS[1] from queue name (failed queue)
219             -- # KEYS[2] dest queue name (main queue)
220             -- # ARGV[1] timestamp
221             -- # ARGV[2] number of items to requeue. Value "0" means "all items"
222             --
223             if #KEYS ~= 2 then error('requeue_failed requires 2 keys') end
224             -- redis.log(redis.LOG_NOTICE, "nr keys: " .. #KEYS)
225             local from = assert(KEYS[1], 'failed queue name missing')
226             local dest = assert(KEYS[2], 'dest queue name missing')
227             local ts = assert(tonumber(ARGV[1]), 'timestamp missing')
228             local num = assert(tonumber(ARGV[2]), 'number of items missing')
229             local n = 0;
230              
231             if num == 0 then
232             num = redis.call('llen', from)
233             end
234              
235             for i = 1, num do
236             local item = redis.call('rpop', from);
237             if item == nil then break end
238              
239             local i = cjson.decode(item)
240              
241             if i.t_created == nil then
242             i.t_created = i.t
243             end
244             i.t = ts
245              
246             local v = cjson.encode(i)
247             redis.call('lpush', dest, v)
248              
249             n = n + 1
250             end
251             return n
252             },
253             requeue_failed_gentle => q{
254             -- requeue_failed_gentle: requeue or requeue up to a given number of items
255             -- first call will rename the list to a temporary list
256             -- calls after that will work through the temporary list until is empty
257             -- # KEYS[1] from queue name (failed queue)
258             -- # KEYS[2] dest queue name (main queue)
259             -- # KEYS[3] temp queue name (temporary queue)
260             -- # ARGV[1] timestamp
261             -- # ARGV[2] number of items to requeue. Value "0" means "all items"
262             -- # ARGV[3] delay before trying again after a fail
263             -- # ARGV[4] failed counter criterium
264             --
265             if #KEYS ~= 3 then error('requeue_failed_gentle.lua requires 3 keys') end
266             -- redis.log(redis.LOG_NOTICE, "nr keys: " .. #KEYS)
267             local from = assert(KEYS[1], 'failed queue name missing')
268             local dest = assert(KEYS[2], 'dest queue name missing')
269             local temp = assert(KEYS[3], 'temp queue name missing')
270             local ts = assert(tonumber(ARGV[1]), 'timestamp missing')
271             local num = assert(tonumber(ARGV[2]), 'number of items missing')
272             local delay = assert(tonumber(ARGV[3]), 'delay criterium missing')
273             local fc = assert(tonumber(ARGV[4]), 'failed counter criterium missing')
274             local n_requeued = 0
275             local tmin = 0
276              
277             if redis.call('exists', temp) == 0 then
278             if redis.call('exists', from) == 1 then
279             redis.call('rename', from, temp)
280             else
281             return "0 0"
282             end
283             end
284              
285             local len = redis.call('llen', temp)
286              
287             if len > 0 then
288             if num == 0 or num > len then
289             num = len
290             end
291              
292             for i = 1, num do
293             local item = redis.call('rpop', temp);
294             if not item then break end
295              
296             local i = cjson.decode(item)
297             tmin = ts - i.fc*delay
298              
299             if (fc == -1 or i.fc <= fc) and
300             (i.t <= tmin or (i.created ~= nil and i.created <=tmin)) then
301              
302             -- item should be requeued
303             n_requeued = n_requeued + 1
304             if i.t_created == nil then
305             i.t_created = i.t
306             end
307             i.t = ts
308              
309             local v = cjson.encode(i)
310             redis.call('lpush', dest, v)
311             else
312             -- put it back in failed queue
313             redis.call('lpush', from, item)
314             end
315             end
316             end
317              
318             -- return number of items handled and number of items removed from the failed
319             -- queue, space separated (as a string)
320             local todo = len - num
321             return todo .. ' ' .. n_requeued
322             },
323             requeue_failed_item => q{
324             -- Requeue_busy_items
325             -- # KEYS[1] from queue name (failed queue)
326             -- # KEYS[2] dest queue name (main queue)
327             -- # ARGV[1] timestamp
328             -- # ARGV[2] item
329             --
330             -- redis.log(redis.LOG_WARNING, "requeue_tail")
331             if #KEYS ~= 2 then error('requeue_failed_item requires 2 keys') end
332             -- redis.log(redis.LOG_NOTICE, "nr keys: " .. #KEYS)
333             local from = assert(KEYS[1], 'failed queue name missing')
334             local dest = assert(KEYS[2], 'dest queue name missing')
335             local ts = assert(tonumber(ARGV[1]), 'timestamp missing')
336             local item = assert(ARGV[2], 'item missing')
337              
338             local n = redis.call('lrem', from, 1, item)
339              
340             if n > 0 then
341             local i = cjson.decode(item)
342              
343             if i.t_created == nil then
344             i.t_created = i.t
345             end
346             i.t = ts
347              
348             local v = cjson.encode(i)
349             redis.call('lpush', dest, v)
350             end
351             return n
352             });
353              
354             1;
355              
356             __END__