File Coverage

blib/lib/Redis/Setlock.pm
Criterion Covered Total %
statement 44 151 29.1
branch 0 52 0.0
condition 0 21 0.0
subroutine 15 33 45.4
pod 1 11 9.0
total 60 268 22.3


line stmt bran cond sub pod time code
1             package Redis::Setlock;
2 15     15   1765779 use 5.008001;
  15         110  
3 15     15   85 use strict;
  15         23  
  15         304  
4 15     15   79 use warnings;
  15         27  
  15         353  
5 15     15   10698 use Redis;
  15         640813  
  15         489  
6 15     15   11869 use Getopt::Long ();
  15         163474  
  15         489  
7 15     15   8348 use Pod::Usage;
  15         760177  
  15         2209  
8 15     15   8868 use Log::Minimal;
  15         200638  
  15         102  
9 15     15   1996 use Try::Tiny;
  15         33  
  15         882  
10 15     15   110 use Time::HiRes qw/ sleep /;
  15         33  
  15         162  
11 15     15   2647 use Carp;
  15         33  
  15         808  
12 15     15   9032 use Digest::SHA qw/ sha1_hex /;
  15         42046  
  15         1269  
13 15     15   7712 use Guard ();
  15         7503  
  15         1158  
14              
15             our $VERSION = "0.13";
16             our $DEFAULT_EXPIRES = 86400;
17             our $RETRY_INTERVAL = 0.5;
18             our $BLOCKING_KEY_POSTFIX = ":wait";
19             our $WAIT_QUEUE = 0;
20             our $WARN_LOCK_TIME_THRESHOLD = 0;
21              
22             use constant {
23 15         1450 EXIT_CODE_ERROR => 111,
24 15     15   111 };
  15         35  
25              
26 15         1040 use constant UNLOCK_LUA_SCRIPT => <<'END_OF_SCRIPT'
27             if redis.call("get",KEYS[1]) == ARGV[1]
28             then
29             return redis.call("del",KEYS[1])
30             else
31             return 0
32             end
33             END_OF_SCRIPT
34 15     15   103 ;
  15         30  
35              
36 15         25702 use constant BLOCKING_UNLOCK_LUA_SCRIPT_TMPL => <<'END_OF_SCRIPT'
37             if redis.call("get",KEYS[1]) == ARGV[1]
38             then
39             local q = KEYS[1].."%s"
40             redis.call("del", KEYS[1])
41             redis.call("expire", q, 60)
42             return redis.call("lpush", q, ARGV[1])
43             else
44             return 0
45             end
46             END_OF_SCRIPT
47 15     15   107 ;
  15         32  
48              
49             sub BLOCKING_UNLOCK_LUA_SCRIPT {
50 0     0 0   sprintf BLOCKING_UNLOCK_LUA_SCRIPT_TMPL, $BLOCKING_KEY_POSTFIX;
51             }
52              
53             sub parse_options {
54 0     0 0   my (@argv) = @_;
55              
56 0           my $p = Getopt::Long::Parser->new(
57             config => [qw/posix_default no_ignore_case auto_help bundling pass_through/]
58             );
59 0           my $opt = {
60             wait => 1,
61             exit_code => EXIT_CODE_ERROR,
62             };
63 0 0         $p->getoptionsfromarray(\@argv, $opt, qw/
64             redis=s
65             expires=i
66             keep
67             n
68             N
69             x
70             X
71             version
72             /) or pod2usage;
73              
74 0 0         if ($opt->{version}) {
75 0           print STDERR "version: $VERSION\n";
76 0           exit 0;
77             }
78 0 0         $opt->{wait} = 0 if $opt->{n}; # no delay
79 0 0         $opt->{exit_code} = 0 if $opt->{x}; # exit code 0
80 0 0         $opt->{expires} = $DEFAULT_EXPIRES unless defined $opt->{expires};
81              
82 0           return ($opt, @argv);
83             }
84              
85             sub lock_guard {
86 0     0 1   my $class = shift;
87 0           my ($redis, $key, $expires, $wait) = @_;
88              
89 0 0         my $opt = {
90             wait => $wait,
91             expires => defined $expires ? $expires : $DEFAULT_EXPIRES,
92             };
93 0           my $start = [ Time::HiRes::gettimeofday ];
94 0 0         my $token = try_get_lock($redis, $opt, $key)
95             or return;
96              
97             return Guard::guard {
98 0     0     my $elapsed = Time::HiRes::tv_interval($start);
99 0 0 0       if ($opt->{expires} < $elapsed) {
    0          
100 0           warnf "guard(token:%s) seems to be already expired. elasped %s sec", $token, $elapsed;
101             } elsif (0 < $WARN_LOCK_TIME_THRESHOLD && $WARN_LOCK_TIME_THRESHOLD < $elapsed) {
102 0           warnf "guard(token:%s) elasped %s sec", $token, $elapsed;
103             }
104 0           release_lock($redis, $opt, $key, $token);
105 0           };
106             }
107              
108             sub run {
109 0     0 0   my $class = shift;
110              
111 0           local $Log::Minimal::PRINT = \&log_minimal_print;
112              
113 0           my ($opt, $key, @command) = parse_options(@_);
114              
115 0 0 0       pod2usage() if !defined $key || @command == 0;
116              
117 0 0         my $redis = connect_to_redis_server($opt)
118             or return EXIT_CODE_ERROR;
119              
120 0 0         validate_redis_version($redis)
121             or return EXIT_CODE_ERROR;
122              
123 0 0         if ( my $token = try_get_lock($redis, $opt, $key) ) {
124 0           my $code = invoke_command(@command);
125 0           release_lock($redis, $opt, $key, $token);
126 0           debugf "exit with code %d", $code;
127 0           return $code;
128             }
129             else {
130             # couldnot get lock
131 0 0         if ($opt->{exit_code}) {
132 0           critf "unable to lock %s.", $key;
133 0           return $opt->{exit_code};
134             }
135 0           debugf "exit with code 0";
136 0           return 0; # by option x
137             }
138             }
139              
140             sub connect_to_redis_server {
141 0     0 0   my $opt = shift;
142             try {
143             Redis->new(
144             server => $opt->{redis},
145 0 0   0     reconnect => $opt->{wait} ? $opt->{expires} : 0,
146             every => $RETRY_INTERVAL * 1000, # to msec
147             );
148             }
149             catch {
150 0     0     my $e = $_;
151 0           my $error = (split(/\n/, $e))[0];
152 0           critf "Redis server seems down: %s", $error;
153 0           return;
154 0           };
155             }
156              
157             sub validate_redis_version {
158 0     0 0   my $redis = shift;
159 0           my $version = $redis->info->{redis_version};
160 0           debugf "Redis version is: %s", $version;
161 0           my ($major, $minor, $rev) = split /\./, $version;
162 0 0 0       if ( $major >= 3
      0        
      0        
      0        
      0        
163             || $major == 2 && $minor >= 7
164             || $major == 2 && $minor == 6 && $rev >= 12
165             ) {
166             # ok
167 0           return 1;
168             }
169 0           critf "required Redis server version >= 2.6.12. current server version is %s", $version;
170 0           return;
171             }
172              
173             sub try_get_lock {
174 0     0 0   my ($redis, $opt, $key) = @_;
175 0           my $got_lock;
176 0           my $token = create_token();
177             GET_LOCK:
178 0           while (1) {
179 0           my @args = ($key, $token, "EX", $opt->{expires}, "NX");
180 0           debugf "redis: SET @args";
181 0           $got_lock = $redis->set(@args);
182 0 0         if ($got_lock) {
    0          
183 0           debugf "got lock: %s", $key;
184 0           last GET_LOCK;
185             }
186             elsif (!$opt->{wait}) { # no delay by option n
187 0           debugf "no delay mode. exit";
188 0           last GET_LOCK;
189             }
190 0           debugf "unable to lock. waiting for release";
191 0 0         if ($WAIT_QUEUE) {
192 0           $redis->blpop("$key${BLOCKING_KEY_POSTFIX}", $opt->{expires});
193             }
194             else {
195 0           sleep $RETRY_INTERVAL;
196             }
197             }
198 0 0         return $token if $got_lock;
199             }
200              
201             sub release_lock {
202 0     0 0   my ($redis, $opt, $key, $token) = @_;
203 0 0         if ($opt->{keep}) {
204 0           debugf "Keep lock key %s", $key;
205 0           return;
206             }
207              
208 0           debugf "Release lock key %s", $key;
209 0 0         my $script = $WAIT_QUEUE ? BLOCKING_UNLOCK_LUA_SCRIPT : UNLOCK_LUA_SCRIPT;
210 0           my $sha1 = sha1_hex($script);
211             try {
212 0     0     $redis->evalsha($sha1, 1, $key, $token);
213             }
214             catch {
215 0     0     my $e = $_;
216 0 0         if ($e =~ /NOSCRIPT/) {
217 0           $redis->eval($script, 1, $key, $token);
218             }
219             else {
220 0           croak $e;
221             }
222 0           };
223             }
224              
225             sub invoke_command {
226 0     0 0   my @command = @_;
227 0           debugf "invoking command: @command";
228 0 0         if (my $pid = fork()) {
229 0     0     local $SIG{CHLD} = sub { };
230             local $SIG{TERM} = $SIG{HUP} = $SIG{INT} = $SIG{QUIT} = sub {
231 0     0     my $signal = shift;
232 0           warnf "Got signal %s", $signal;
233 0           kill $signal, $pid;
234 0           };
235 0           wait;
236             }
237             else {
238 0           exec @command;
239 0           die;
240             }
241 0           my $code = $?;
242 0 0         if ($code == -1) {
    0          
243 0           critf "faildto execute: %s", $!;
244 0           return $code;
245             }
246             elsif ($code & 127) {
247 0           debugf "child died with signal %d", $code & 127;
248 0           return $code;
249             }
250             else {
251 0           $code = $code >> 8; # to raw exit code
252 0           debugf "child exit with code: %s", $code;
253 0           return $code;
254             }
255             }
256              
257             sub log_minimal_print {
258 0     0 0   my ( $time, $type, $message, $trace) = @_;
259 0           warn "$time $$ $type $message\n";
260             }
261              
262             sub create_token {
263 0     0 0   Time::HiRes::time() . rand();
264             }
265              
266             1;
267             __END__