File Coverage

blib/lib/Redis/Setlock.pm
Criterion Covered Total %
statement 41 137 29.9
branch 0 46 0.0
condition 0 18 0.0
subroutine 14 30 46.6
pod 1 11 9.0
total 56 242 23.1


line stmt bran cond sub pod time code
1             package Redis::Setlock;
2 13     13   1192587 use 5.008001;
  13         75  
3 13     13   53 use strict;
  13         15  
  13         210  
4 13     13   46 use warnings;
  13         19  
  13         257  
5 13     13   7487 use Redis;
  13         441927  
  13         414  
6 13     13   8239 use Getopt::Long ();
  13         118464  
  13         364  
7 13     13   6100 use Pod::Usage;
  13         540251  
  13         1468  
8 13     13   5701 use Log::Minimal;
  13         138476  
  13         67  
9 13     13   1468 use Try::Tiny;
  13         25  
  13         623  
10 13     13   142 use Time::HiRes qw/ sleep /;
  13         23  
  13         103  
11 13     13   1951 use Carp;
  13         23  
  13         567  
12 13     13   5364 use Guard ();
  13         5861  
  13         752  
13              
14             our $VERSION = "0.11";
15             our $DEFAULT_EXPIRES = 86400;
16             our $RETRY_INTERVAL = 0.5;
17             our $BLOCKING_KEY_POSTFIX = ":wait";
18             our $WAIT_QUEUE = 0;
19              
20             use constant {
21 13         860 EXIT_CODE_ERROR => 111,
22 13     13   77 };
  13         30  
23              
24 13         793 use constant UNLOCK_LUA_SCRIPT => <<'END_OF_SCRIPT'
25             if redis.call("get",KEYS[1]) == ARGV[1]
26             then
27             return redis.call("del",KEYS[1])
28             else
29             return 0
30             end
31             END_OF_SCRIPT
32 13     13   61 ;
  13         26  
33              
34 13         15808 use constant BLOCKING_UNLOCK_LUA_SCRIPT_TMPL => <<'END_OF_SCRIPT'
35             if redis.call("get",KEYS[1]) == ARGV[1]
36             then
37             redis.call("del",KEYS[1],KEYS[1].."%s")
38             return redis.call("lpush",KEYS[1].."%s",ARGV[1])
39             else
40             return 0
41             end
42             END_OF_SCRIPT
43 13     13   76 ;
  13         21  
44              
45             sub BLOCKING_UNLOCK_LUA_SCRIPT {
46 0     0 0   sprintf BLOCKING_UNLOCK_LUA_SCRIPT_TMPL, $BLOCKING_KEY_POSTFIX, $BLOCKING_KEY_POSTFIX;
47             }
48              
49             sub parse_options {
50 0     0 0   my (@argv) = @_;
51              
52 0           my $p = Getopt::Long::Parser->new(
53             config => [qw/posix_default no_ignore_case auto_help bundling pass_through/]
54             );
55 0           my $opt = {
56             wait => 1,
57             exit_code => EXIT_CODE_ERROR,
58             };
59 0 0         $p->getoptionsfromarray(\@argv, $opt, qw/
60             redis=s
61             expires=i
62             keep
63             n
64             N
65             x
66             X
67             version
68             /) or pod2usage;
69              
70 0 0         if ($opt->{version}) {
71 0           print STDERR "version: $VERSION\n";
72 0           exit 0;
73             }
74 0 0         $opt->{wait} = 0 if $opt->{n}; # no delay
75 0 0         $opt->{exit_code} = 0 if $opt->{x}; # exit code 0
76 0 0         $opt->{expires} = $DEFAULT_EXPIRES unless defined $opt->{expires};
77              
78 0           return ($opt, @argv);
79             }
80              
81             sub lock_guard {
82 0     0 1   my $class = shift;
83 0           my ($redis, $key, $expires, $wait) = @_;
84              
85 0 0         my $opt = {
86             wait => $wait,
87             expires => defined $expires ? $expires : $DEFAULT_EXPIRES,
88             };
89 0 0         my $token = try_get_lock($redis, $opt, $key)
90             or return;
91             return Guard::guard {
92 0     0     release_lock($redis, $opt, $key, $token);
93 0           };
94             }
95              
96             sub run {
97 0     0 0   my $class = shift;
98              
99 0           local $Log::Minimal::PRINT = \&log_minimal_print;
100              
101 0           my ($opt, $key, @command) = parse_options(@_);
102              
103 0 0 0       pod2usage() if !defined $key || @command == 0;
104              
105 0 0         my $redis = connect_to_redis_server($opt)
106             or return EXIT_CODE_ERROR;
107              
108 0 0         validate_redis_version($redis)
109             or return EXIT_CODE_ERROR;
110              
111 0 0         if ( my $token = try_get_lock($redis, $opt, $key) ) {
112 0           my $code = invoke_command(@command);
113 0           release_lock($redis, $opt, $key, $token);
114 0           debugf "exit with code %d", $code;
115 0           return $code;
116             }
117             else {
118             # couldnot get lock
119 0 0         if ($opt->{exit_code}) {
120 0           critf "unable to lock %s.", $key;
121 0           return $opt->{exit_code};
122             }
123 0           debugf "exit with code 0";
124 0           return 0; # by option x
125             }
126             }
127              
128             sub connect_to_redis_server {
129 0     0 0   my $opt = shift;
130             try {
131             Redis->new(
132             server => $opt->{redis},
133 0 0   0     reconnect => $opt->{wait} ? $opt->{expires} : 0,
134             every => $RETRY_INTERVAL * 1000, # to msec
135             );
136             }
137             catch {
138 0     0     my $e = $_;
139 0           my $error = (split(/\n/, $e))[0];
140 0           critf "Redis server seems down: %s", $error;
141 0           return;
142 0           };
143             }
144              
145             sub validate_redis_version {
146 0     0 0   my $redis = shift;
147 0           my $version = $redis->info->{redis_version};
148 0           debugf "Redis version is: %s", $version;
149 0           my ($major, $minor, $rev) = split /\./, $version;
150 0 0 0       if ( $major >= 3
      0        
      0        
      0        
      0        
151             || $major == 2 && $minor >= 7
152             || $major == 2 && $minor == 6 && $rev >= 12
153             ) {
154             # ok
155 0           return 1;
156             }
157 0           critf "required Redis server version >= 2.6.12. current server version is %s", $version;
158 0           return;
159             }
160              
161             sub try_get_lock {
162 0     0 0   my ($redis, $opt, $key) = @_;
163 0           my $got_lock;
164 0           my $token = create_token();
165             GET_LOCK:
166 0           while (1) {
167 0           my @args = ($key, $token, "EX", $opt->{expires}, "NX");
168 0           debugf "redis: SET @args";
169 0           $got_lock = $redis->set(@args);
170 0 0         if ($got_lock) {
    0          
171 0           debugf "got lock: %s", $key;
172 0           last GET_LOCK;
173             }
174             elsif (!$opt->{wait}) { # no delay by option n
175 0           debugf "no delay mode. exit";
176 0           last GET_LOCK;
177             }
178 0           debugf "unable to lock. waiting for release";
179 0 0         if ($WAIT_QUEUE) {
180 0           $redis->blpop("$key${BLOCKING_KEY_POSTFIX}", $opt->{expires});
181             }
182             else {
183 0           sleep $RETRY_INTERVAL;
184             }
185             }
186 0 0         return $token if $got_lock;
187             }
188              
189             sub release_lock {
190 0     0 0   my ($redis, $opt, $key, $token) = @_;
191 0 0         if ($opt->{keep}) {
192 0           debugf "Keep lock key %s", $key;
193             }
194             else {
195 0           debugf "Release lock key %s", $key;
196 0 0         if ($WAIT_QUEUE) {
197 0           $redis->eval(BLOCKING_UNLOCK_LUA_SCRIPT, 1, $key, $token);
198             }
199             else {
200 0           $redis->eval(UNLOCK_LUA_SCRIPT, 1, $key, $token);
201             }
202             }
203             }
204              
205             sub invoke_command {
206 0     0 0   my @command = @_;
207 0           debugf "invoking command: @command";
208 0 0         if (my $pid = fork()) {
209 0     0     local $SIG{CHLD} = sub { };
210             local $SIG{TERM} = $SIG{HUP} = $SIG{INT} = $SIG{QUIT} = sub {
211 0     0     my $signal = shift;
212 0           warnf "Got signal %s", $signal;
213 0           kill $signal, $pid;
214 0           };
215 0           wait;
216             }
217             else {
218 0           exec @command;
219 0           die;
220             }
221 0           my $code = $?;
222 0 0         if ($code == -1) {
    0          
223 0           critf "faildto execute: %s", $!;
224 0           return $code;
225             }
226             elsif ($code & 127) {
227 0           debugf "child died with signal %d", $code & 127;
228 0           return $code;
229             }
230             else {
231 0           $code = $code >> 8; # to raw exit code
232 0           debugf "child exit with code: %s", $code;
233 0           return $code;
234             }
235             }
236              
237             sub log_minimal_print {
238 0     0 0   my ( $time, $type, $message, $trace) = @_;
239 0           warn "$time $$ $type $message\n";
240             }
241              
242             sub create_token {
243 0     0 0   Time::HiRes::time() . rand();
244             }
245              
246             1;
247             __END__