File Coverage

blib/lib/Redis/Setlock.pm
Criterion Covered Total %
statement 44 146 30.1
branch 0 48 0.0
condition 0 18 0.0
subroutine 15 33 45.4
pod 1 11 9.0
total 60 256 23.4


line stmt bran cond sub pod time code
1             package Redis::Setlock;
2 13     13   1136957 use 5.008001;
  13         88  
3 13     13   47 use strict;
  13         18  
  13         199  
4 13     13   43 use warnings;
  13         19  
  13         224  
5 13     13   6952 use Redis;
  13         414365  
  13         330  
6 13     13   7416 use Getopt::Long ();
  13         106134  
  13         340  
7 13     13   5284 use Pod::Usage;
  13         491815  
  13         1483  
8 13     13   5743 use Log::Minimal;
  13         131414  
  13         67  
9 13     13   1400 use Try::Tiny;
  13         22  
  13         622  
10 13     13   62 use Time::HiRes qw/ sleep /;
  13         22  
  13         87  
11 13     13   1913 use Carp;
  13         20  
  13         565  
12 13     13   5983 use Digest::SHA qw/ sha1_hex /;
  13         26976  
  13         873  
13 13     13   4633 use Guard ();
  13         5025  
  13         693  
14              
15             our $VERSION = "0.12";
16             our $DEFAULT_EXPIRES = 86400;
17             our $RETRY_INTERVAL = 0.5;
18             our $BLOCKING_KEY_POSTFIX = ":wait";
19             our $WAIT_QUEUE = 0;
20              
21             use constant {
22 13         908 EXIT_CODE_ERROR => 111,
23 13     13   70 };
  13         22  
24              
25 13         522 use constant UNLOCK_LUA_SCRIPT => <<'END_OF_SCRIPT'
26             if redis.call("get",KEYS[1]) == ARGV[1]
27             then
28             return redis.call("del",KEYS[1])
29             else
30             return 0
31             end
32             END_OF_SCRIPT
33 13     13   62 ;
  13         21  
34              
35 13         15676 use constant BLOCKING_UNLOCK_LUA_SCRIPT_TMPL => <<'END_OF_SCRIPT'
36             if redis.call("get",KEYS[1]) == ARGV[1]
37             then
38             local q = KEYS[1].."%s"
39             redis.call("del", KEYS[1])
40             redis.call("expire", q, 60)
41             return redis.call("lpush", q, ARGV[1])
42             else
43             return 0
44             end
45             END_OF_SCRIPT
46 13     13   56 ;
  13         16  
47              
48             sub BLOCKING_UNLOCK_LUA_SCRIPT {
49 0     0 0   sprintf BLOCKING_UNLOCK_LUA_SCRIPT_TMPL, $BLOCKING_KEY_POSTFIX;
50             }
51              
52             sub parse_options {
53 0     0 0   my (@argv) = @_;
54              
55 0           my $p = Getopt::Long::Parser->new(
56             config => [qw/posix_default no_ignore_case auto_help bundling pass_through/]
57             );
58 0           my $opt = {
59             wait => 1,
60             exit_code => EXIT_CODE_ERROR,
61             };
62 0 0         $p->getoptionsfromarray(\@argv, $opt, qw/
63             redis=s
64             expires=i
65             keep
66             n
67             N
68             x
69             X
70             version
71             /) or pod2usage;
72              
73 0 0         if ($opt->{version}) {
74 0           print STDERR "version: $VERSION\n";
75 0           exit 0;
76             }
77 0 0         $opt->{wait} = 0 if $opt->{n}; # no delay
78 0 0         $opt->{exit_code} = 0 if $opt->{x}; # exit code 0
79 0 0         $opt->{expires} = $DEFAULT_EXPIRES unless defined $opt->{expires};
80              
81 0           return ($opt, @argv);
82             }
83              
84             sub lock_guard {
85 0     0 1   my $class = shift;
86 0           my ($redis, $key, $expires, $wait) = @_;
87              
88 0 0         my $opt = {
89             wait => $wait,
90             expires => defined $expires ? $expires : $DEFAULT_EXPIRES,
91             };
92 0 0         my $token = try_get_lock($redis, $opt, $key)
93             or return;
94             return Guard::guard {
95 0     0     release_lock($redis, $opt, $key, $token);
96 0           };
97             }
98              
99             sub run {
100 0     0 0   my $class = shift;
101              
102 0           local $Log::Minimal::PRINT = \&log_minimal_print;
103              
104 0           my ($opt, $key, @command) = parse_options(@_);
105              
106 0 0 0       pod2usage() if !defined $key || @command == 0;
107              
108 0 0         my $redis = connect_to_redis_server($opt)
109             or return EXIT_CODE_ERROR;
110              
111 0 0         validate_redis_version($redis)
112             or return EXIT_CODE_ERROR;
113              
114 0 0         if ( my $token = try_get_lock($redis, $opt, $key) ) {
115 0           my $code = invoke_command(@command);
116 0           release_lock($redis, $opt, $key, $token);
117 0           debugf "exit with code %d", $code;
118 0           return $code;
119             }
120             else {
121             # couldnot get lock
122 0 0         if ($opt->{exit_code}) {
123 0           critf "unable to lock %s.", $key;
124 0           return $opt->{exit_code};
125             }
126 0           debugf "exit with code 0";
127 0           return 0; # by option x
128             }
129             }
130              
131             sub connect_to_redis_server {
132 0     0 0   my $opt = shift;
133             try {
134             Redis->new(
135             server => $opt->{redis},
136 0 0   0     reconnect => $opt->{wait} ? $opt->{expires} : 0,
137             every => $RETRY_INTERVAL * 1000, # to msec
138             );
139             }
140             catch {
141 0     0     my $e = $_;
142 0           my $error = (split(/\n/, $e))[0];
143 0           critf "Redis server seems down: %s", $error;
144 0           return;
145 0           };
146             }
147              
148             sub validate_redis_version {
149 0     0 0   my $redis = shift;
150 0           my $version = $redis->info->{redis_version};
151 0           debugf "Redis version is: %s", $version;
152 0           my ($major, $minor, $rev) = split /\./, $version;
153 0 0 0       if ( $major >= 3
      0        
      0        
      0        
      0        
154             || $major == 2 && $minor >= 7
155             || $major == 2 && $minor == 6 && $rev >= 12
156             ) {
157             # ok
158 0           return 1;
159             }
160 0           critf "required Redis server version >= 2.6.12. current server version is %s", $version;
161 0           return;
162             }
163              
164             sub try_get_lock {
165 0     0 0   my ($redis, $opt, $key) = @_;
166 0           my $got_lock;
167 0           my $token = create_token();
168             GET_LOCK:
169 0           while (1) {
170 0           my @args = ($key, $token, "EX", $opt->{expires}, "NX");
171 0           debugf "redis: SET @args";
172 0           $got_lock = $redis->set(@args);
173 0 0         if ($got_lock) {
    0          
174 0           debugf "got lock: %s", $key;
175 0           last GET_LOCK;
176             }
177             elsif (!$opt->{wait}) { # no delay by option n
178 0           debugf "no delay mode. exit";
179 0           last GET_LOCK;
180             }
181 0           debugf "unable to lock. waiting for release";
182 0 0         if ($WAIT_QUEUE) {
183 0           $redis->blpop("$key${BLOCKING_KEY_POSTFIX}", $opt->{expires});
184             }
185             else {
186 0           sleep $RETRY_INTERVAL;
187             }
188             }
189 0 0         return $token if $got_lock;
190             }
191              
192             sub release_lock {
193 0     0 0   my ($redis, $opt, $key, $token) = @_;
194 0 0         if ($opt->{keep}) {
195 0           debugf "Keep lock key %s", $key;
196 0           return;
197             }
198              
199 0           debugf "Release lock key %s", $key;
200 0 0         my $script = $WAIT_QUEUE ? BLOCKING_UNLOCK_LUA_SCRIPT : UNLOCK_LUA_SCRIPT;
201 0           my $sha1 = sha1_hex($script);
202             try {
203 0     0     $redis->evalsha($sha1, 1, $key, $token);
204             }
205             catch {
206 0     0     my $e = $_;
207 0 0         if ($e =~ /NOSCRIPT/) {
208 0           $redis->eval($script, 1, $key, $token);
209             }
210             else {
211 0           croak $e;
212             }
213 0           };
214             }
215              
216             sub invoke_command {
217 0     0 0   my @command = @_;
218 0           debugf "invoking command: @command";
219 0 0         if (my $pid = fork()) {
220 0     0     local $SIG{CHLD} = sub { };
221             local $SIG{TERM} = $SIG{HUP} = $SIG{INT} = $SIG{QUIT} = sub {
222 0     0     my $signal = shift;
223 0           warnf "Got signal %s", $signal;
224 0           kill $signal, $pid;
225 0           };
226 0           wait;
227             }
228             else {
229 0           exec @command;
230 0           die;
231             }
232 0           my $code = $?;
233 0 0         if ($code == -1) {
    0          
234 0           critf "faildto execute: %s", $!;
235 0           return $code;
236             }
237             elsif ($code & 127) {
238 0           debugf "child died with signal %d", $code & 127;
239 0           return $code;
240             }
241             else {
242 0           $code = $code >> 8; # to raw exit code
243 0           debugf "child exit with code: %s", $code;
244 0           return $code;
245             }
246             }
247              
248             sub log_minimal_print {
249 0     0 0   my ( $time, $type, $message, $trace) = @_;
250 0           warn "$time $$ $type $message\n";
251             }
252              
253             sub create_token {
254 0     0 0   Time::HiRes::time() . rand();
255             }
256              
257             1;
258             __END__