File Coverage

blib/lib/IPC/ConcurrencyLimit/WithLatestStandby.pm
Criterion Covered Total %
statement 81 96 84.3
branch 29 54 53.7
condition 10 24 41.6
subroutine 11 18 61.1
pod 0 6 0.0
total 131 198 66.1


line stmt bran cond sub pod time code
1             package IPC::ConcurrencyLimit::WithLatestStandby;
2 35     35   572880 use 5.008001;
  35         105  
3 35     35   105 use strict;
  35         35  
  35         560  
4 35     35   70 use warnings;
  35         35  
  35         1190  
5              
6             our $VERSION = '0.15';
7              
8 35     35   105 use Carp qw(croak);
  35         35  
  35         1260  
9 35     35   14560 use Time::HiRes qw(sleep time);
  35         35525  
  35         140  
10 35     35   16695 use IPC::ConcurrencyLimit;
  35         70  
  35         27090  
11              
12             sub new {
13 69     69 0 319156361 my $class = shift;
14 69         990 my %params = @_;
15 69         463 my $type = delete $params{type};
16 69 50       581 $type = 'Flock' if not defined $type;
17 69 50       358 croak( __PACKAGE__ . " only supports 'Flock' for now")
18             if $type ne 'Flock';
19              
20 69 50 33     316 if (defined $params{max_procs} and $params{max_procs}!=1) {
21 0         0 croak( __PACKAGE__ . " does not support max_procs!=1, use multiple objects instead.");
22             }
23              
24 69   50     747 my $process_name_change= $params{process_name_change} // 1;
25 69   50     314 my $path= $params{path} || die "lock_root is mandatory";
26 69   50     563 my $file_prefix= $params{file_prefix} || "";
27 69   0     190 my $poll_time= $params{poll_time} || $params{interval} || 1; # seconds to poll (may be fraction)
28 69   50     512 my $retries= $params{retries} || undef;
29 69   50     323 my $timeout= $params{timeout} || undef;
30 69   50     284 my $debug= $params{debug} || 0; # show debug?
31 69   50     156 my $debug_sub= $params{debug_sub} || undef;
32              
33             my $retry_sub= (ref $retries) ? $retries :
34             (defined $retries &&
35 0 0   0   0 defined $timeout) ? sub { $_[0] <= $retries && $_[2] <= $timeout } :
36 0     0   0 (defined $retries) ? sub { $_[0] <= $retries } :
37 0     0   0 (defined $timeout) ? sub { $_[2] <= $timeout } :
38 69 50 33 207   1956 sub { 1 };
  207 50       637  
    50          
    50          
39              
40             # primary is replace by standby1, is replaced by
41             # standby2, is replaced by standby3. However, standby1
42             # will exit when standby2's lock is held by another process.
43 69 50       421 my @names= map { ($file_prefix ? "$file_prefix.$_" : $_) }
  276         1171  
44             qw(primary standby1 standby2 standby3);
45             my @lockers= map {
46 69 50       305 IPC::ConcurrencyLimit->new(
  276         1664  
47             type => $type,
48             max_procs => 1,
49             # future proofing
50             $type eq "Flock" ? (
51             file_prefix => $_,
52             path => $path,
53             ) : (),
54             )
55             } @names;
56              
57             return bless {
58             poll_time => $poll_time,
59             timeout => $timeout, # FYI
60             retries => $retries, # FYI
61             lock_name => \@names,
62             locker => \@lockers,
63             debug => $debug,
64 0     0   0 debug_sub => $debug_sub || sub { warn @_,"\n" },
65 69   50     1175 retry_sub => $retry_sub,
66             process_name_change => $process_name_change,
67             }, $class;
68             }
69              
70             sub _diag {
71 318     318   596 my ($self, $fmt, @args)= @_;
72 318 50       609 if (!@args) {
73 318         1288 $self->{debug_sub}->($fmt);
74             } else {
75 0         0 $self->{debug_sub}->(sprintf $fmt, @args);
76             }
77             }
78              
79              
80             sub get_lock {
81 69     69 0 23916 my ($self) = @_;
82              
83 69         218 my $locker= $self->{locker};
84 69         138 my $names= $self->{lock_name};
85              
86 69         280 my $old_oh= $0;
87              
88             $0 = "$old_oh - acquire"
89 69 50       1611 if $self->{process_name_change};
90              
91             # try to get the rightmost lock (standby3) if we don't get it
92             # then we exit out. this shouldn't really happen if other things
93             # are sane, for instance when $poll_time is much smaller than
94             # the rate we allocate new workers.
95 69         245 my $locker_id= $#$locker;
96 69 100       272 if ( $locker->[$locker_id]->get_lock() ) {
97 68         478 $self->_diag( "Got a $names->[$locker_id] lock");
98             } else {
99             $self->_diag( "Failed to get a $names->[$locker_id] lock, entry lock is held by another process" )
100 1 50       14 if $self->{debug};
101             $0 = "$old_oh - no-lock-acquired"
102 1 50       291 if $self->{process_name_change};
103 1         10 return;
104             }
105              
106             # Each worker tries to acquire the lock to its left. If it does
107             # then it abandons its old lock. If that means the worker ends up
108             # on locker_id 0 then they are done, and can do work.
109             # The first standby worker also looks to its right to see if there
110             # is a replacement process for it, if there is it exits, leaving
111             # a gap and letting the replacements shuffle left.
112 68         8251 my $tries= 0;
113 68         108 my $lock_tries= 0;
114 68         545 my $standby_start= time();
115 68         164 my $lock_start= time();
116 68         128 my $poll_time= $self->{poll_time};
117              
118 68         318 while ( $locker_id > 0 ) {
119             $0 = "$old_oh - $names->[$locker_id]"
120 388 50       6433 if $self->{process_name_change};
121              
122             # can we shuffle our lock left?
123 388 100       2543 if ( $locker->[$locker_id - 1]->get_lock() ) {
124 181         889 $self->_diag( "Got a $names->[$locker_id -1] lock, dropping old $names->[$locker_id] lock");
125             # yep, we got the lock to the left, so drop our old lock,
126             # and move the pointer left at the same time.
127 181         18802 $locker->[ $locker_id-- ]->release_lock();
128 181         198 $lock_tries= 0;
129 181         404 $lock_start= time();
130 181         462 next;
131             }
132              
133 207 50       2172 unless ($self->{retry_sub}->(++$tries, ++$lock_tries, time - $standby_start, time - $lock_start)) {
134             $0 = "$old_oh - no-lock-timeout"
135 0 0       0 if $self->{process_name_change};
136 0         0 return;
137             }
138              
139             # check if we are the first standby worker.
140 207 100       862 if ( $locker_id == 1 ) {
141             # yep - we are the first standby worker,
142             # so check if the lock to our right is being held:
143 182 100       848 if ( $locker->[$locker_id + 1]->get_lock() ) {
144             # we got the lock, which means nothing else
145             # holds it. so we release the lock and move on.
146 159         921 $locker->[$locker_id + 1]->release_lock();
147             } else {
148             $self->_diag(
149             "A newer worker is holding the $names->[$locker_id+1] lock, will exit to let it take over"
150 23 50       351 ) if $self->{debug};
151             # we failed to get the lock, which means there is a newer
152             # process that can replace us so return/exit - this frees up
153             # our lock and lets the newer process to move into our position.
154             $0 = "$old_oh - no-lock-retired"
155 23 50       2093 if $self->{process_name_change};
156 23         80 return;
157             }
158             }
159              
160             # nope - the lock to our left is being held so sleep a while before
161             # we try again. We use the rand and the formula so that items to the
162             # right poll faster than items to the left, and to reduce the chance
163             # that lock holder 1 and lock holder 3 poll lock 2 at the same time
164             # forever. The formula guarantees that items to the left poll faster,
165             # and the rand ensures there is jitter.
166 184         14693650 sleep rand(($poll_time / $locker_id)*2);
167             }
168              
169             # assert that $locker_id is 0 at this point.
170 45 50       167 die "panic: We should not reach this point with \$locker_id larger than 0, got $locker_id"
171             if $locker_id;
172              
173             $self->_diag("Got $names->[$locker_id] lock, we are allowed to do work.")
174 45 50       880 if $self->{debug};
175              
176             # at this point we should be $locker_id == 0 and we can do work.
177 45 50       3097 if ($self->{process_name_change}) {
178 45 50       127 if ($self->{process_name_change} > 1) {
179 0         0 $0 = $old_oh;
180             } else {
181 45         263 $0 = "$old_oh - $names->[$locker_id]"
182             }
183             }
184 45         130 return 1;
185             }
186              
187              
188             sub is_locked {
189 0     0 0 0 my $self = shift;
190 0         0 return $self->{locker}[0]->is_locked(@_);
191             }
192              
193             sub release_lock {
194 31     31 0 62327236 my $self = shift;
195 31         341 return $self->{locker}[0]->release_lock(@_);
196             }
197              
198             sub lock_id {
199 0     0 0   my $self = shift;
200 0           return $self->{locker}[0]->lock_id(@_);
201             }
202              
203             sub heartbeat {
204 0     0 0   my $self = shift;
205 0           return $self->{locker}[0]->heartbeat;
206             }
207              
208              
209             1;
210              
211             __END__