File Coverage

blib/lib/IPC/ConcurrencyLimit/WithLatestStandby.pm
Criterion Covered Total %
statement 74 96 77.0
branch 27 58 46.5
condition 10 24 41.6
subroutine 10 18 55.5
pod 0 6 0.0
total 121 202 59.9


line stmt bran cond sub pod time code
1             package IPC::ConcurrencyLimit::WithLatestStandby;
2 4     4   99160 use 5.008001;
  4         16  
3 4     4   16 use strict;
  4         4  
  4         80  
4 4     4   12 use warnings;
  4         8  
  4         172  
5              
6             our $VERSION = '0.17';
7              
8 4     4   16 use Carp qw(croak);
  4         4  
  4         328  
9 4     4   2324 use Time::HiRes qw(sleep time);
  4         6900  
  4         20  
10 4     4   2432 use IPC::ConcurrencyLimit;
  4         8  
  4         4604  
11              
12             sub new {
13 7     7 0 1521614 my $class = shift;
14 7         125 my %params = @_;
15 7         74 my $type = delete $params{type};
16 7 50       94 $type = 'Flock' if not defined $type;
17 7 50       64 croak( __PACKAGE__ . " only supports 'Flock' for now")
18             if $type ne 'Flock';
19              
20 7 50 33     70 if (defined $params{max_procs} and $params{max_procs}!=1) {
21 0         0 croak( __PACKAGE__ . " does not support max_procs!=1, use multiple objects instead.");
22             }
23              
24 7   50     139 my $process_name_change= $params{process_name_change} || 0;
25 7   50     58 my $path= $params{path} || die __PACKAGE__ . '->new: missing mandatory parameter `path`';
26 7   50     66 my $file_prefix= $params{file_prefix} || "";
27 7   0     35 my $poll_time= $params{poll_time} || $params{interval} || 1; # seconds to poll (may be fraction)
28 7   50     1258 my $retries= $params{retries} || undef;
29 7   50     65 my $timeout= $params{timeout} || undef;
30 7   50     34 my $debug= $params{debug} || 0; # show debug?
31 7   50     28 my $debug_sub= $params{debug_sub} || undef;
32              
33             my $retry_sub= (ref $retries) ? $retries :
34             (defined $retries &&
35 0 0   0   0 defined $timeout) ? sub { $_[0] <= $retries && $_[2] <= $timeout } :
36 0     0   0 (defined $retries) ? sub { $_[0] <= $retries } :
37 0     0   0 (defined $timeout) ? sub { $_[2] <= $timeout } :
38 7 50 33 25   333 sub { 1 };
  25 50       140  
    50          
    50          
39              
40             # primary is replaced by standby1, which is replaced by
41             # standby2, which is replaced by standby3. However, standby1
42             # will exit when standby2's lock is held by another process.
43 7 50       58 my @names= map { ($file_prefix ? "$file_prefix.$_" : $_) }
  28         118  
44             qw(primary standby1 standby2 standby3);
45             my @lockers= map {
46 7 50       50 IPC::ConcurrencyLimit->new(
  28         272  
47             type => $type,
48             max_procs => 1,
49             # future proofing
50             $type eq "Flock" ? (
51             file_prefix => $_,
52             path => $path,
53             ) : (),
54             )
55             } @names;
56              
57             return bless {
58             poll_time => $poll_time,
59             timeout => $timeout, # FYI
60             retries => $retries, # FYI
61             lock_name => \@names,
62             locker => \@lockers,
63             debug => $debug,
64 0     0   0 debug_sub => $debug_sub || sub { warn @_,"\n" },
65 7   50     253 retry_sub => $retry_sub,
66             process_name_change => $process_name_change,
67             }, $class;
68             }
69              
70             sub _diag {
71 33     33   67 my ($self, $fmt, @args)= @_;
72 33 50       92 if (!@args) {
73 33         129 $self->{debug_sub}->($fmt);
74             } else {
75 0         0 $self->{debug_sub}->(sprintf $fmt, @args);
76             }
77             }
78              
79              
80             sub get_lock {
81 7     7 0 1746 my ($self) = @_;
82              
83 7         24 my $locker= $self->{locker};
84 7         14 my $names= $self->{lock_name};
85              
86 7         48 my $old_oh= $0;
87              
88             $0 = "$old_oh - acquire"
89 7 50       43 if $self->{process_name_change};
90              
91             # try to get the rightmost lock (standby3) if we don't get it
92             # then we exit out. this shouldn't really happen if other things
93             # are sane, for instance when $poll_time is much smaller than
94             # the rate we allocate new workers.
95 7         25 my $locker_id= $#$locker;
96 7 50       35 if ( $locker->[$locker_id]->get_lock() ) {
97             $self->_diag( "Got a $names->[$locker_id] lock")
98 7 50       53 if $self->{debug};
99             } else {
100             $self->_diag( "Failed to get a $names->[$locker_id] lock, entry lock is held by another process" )
101 0 0       0 if $self->{debug};
102             $0 = "$old_oh - no-lock-acquired"
103 0 0       0 if $self->{process_name_change};
104 0         0 return;
105             }
106              
107             # Each worker tries to acquire the lock to its left. If it does
108             # then it abandons its old lock. If that means the worker ends up
109             # on locker_id 0 then they are done, and can do work.
110             # The first standby worker also looks to its right to see if there
111             # is a replacement process for it, if there is it exits, leaving
112             # a gap and letting the replacements shuffle left.
113 7         1018 my $tries= 0;
114 7         10 my $lock_tries= 0;
115 7         49 my $standby_start= time();
116 7         14 my $lock_start= time();
117 7         34 my $poll_time= $self->{poll_time};
118              
119 7         28 while ( $locker_id > 0 ) {
120             $0 = "$old_oh - $names->[$locker_id]"
121 44 50       239 if $self->{process_name_change};
122              
123             # can we shuffle our lock left?
124 44 100       479 if ( $locker->[$locker_id - 1]->get_lock() ) {
125             $self->_diag( "Got a $names->[$locker_id -1] lock, dropping old $names->[$locker_id] lock")
126 19 50       150 if $self->{debug};
127             # yep, we got the lock to the left, so drop our old lock,
128             # and move the pointer left at the same time.
129 19         1534 $locker->[ $locker_id-- ]->release_lock();
130 19         24 $lock_tries= 0;
131 19         50 $lock_start= time();
132 19         55 next;
133             }
134              
135 25 50       603 unless ($self->{retry_sub}->(++$tries, ++$lock_tries, time - $standby_start, time - $lock_start)) {
136             $0 = "$old_oh - no-lock-timeout"
137 0 0       0 if $self->{process_name_change};
138 0         0 return;
139             }
140              
141             # check if we are the first standby worker.
142 25 100       267 if ( $locker_id == 1 ) {
143             # yep - we are the first standby worker,
144             # so check if the lock to our right is being held:
145 21 100       159 if ( $locker->[$locker_id + 1]->get_lock() ) {
146             # we got the lock, which means nothing else
147             # holds it. so we release the lock and move on.
148 19         123 $locker->[$locker_id + 1]->release_lock();
149             } else {
150             $self->_diag(
151             "A newer worker is holding the $names->[$locker_id+1] lock, will exit to let it take over"
152 2 50       30 ) if $self->{debug};
153             # we failed to get the lock, which means there is a newer
154             # process that can replace us so return/exit - this frees up
155             # our lock and lets the newer process move into our position.
156             $0 = "$old_oh - no-lock-retired"
157 2 50       251 if $self->{process_name_change};
158 2         10 return;
159             }
160             }
161              
162             # nope - the lock to our left is being held so sleep a while before
163             # we try again. We use the rand and the formula so that items to the
164             # right poll faster than items to the left, and to reduce the chance
165             # that lock holder 1 and lock holder 3 poll lock 2 at the same time
166             # forever. The formula guarantees that items to the right poll faster,
167             # and the rand ensures there is jitter.
168 23         1622397 sleep rand(($poll_time / $locker_id)*2);
169             }
170              
171             # assert that $locker_id is 0 at this point.
172 5 50       19 die "panic: We should not reach this point with \$locker_id larger than 0, got $locker_id"
173             if $locker_id;
174              
175             $self->_diag("Got $names->[$locker_id] lock, we are allowed to do work.")
176 5 50       42 if $self->{debug};
177              
178             # at this point we should be $locker_id == 0 and we can do work.
179 5 50       273 if ($self->{process_name_change}) {
180 0 0       0 if ($self->{process_name_change} > 1) {
181 0         0 $0 = $old_oh;
182             } else {
183 0         0 $0 = "$old_oh - $names->[$locker_id]"
184             }
185             }
186 5         20 return 1;
187             }
188              
189              
190             sub is_locked {
191 0     0 0   my $self = shift;
192 0           return $self->{locker}[0]->is_locked(@_);
193             }
194              
195             sub release_lock {
196 0     0 0   my $self = shift;
197 0           return $self->{locker}[0]->release_lock(@_);
198             }
199              
200             sub lock_id {
201 0     0 0   my $self = shift;
202 0           return $self->{locker}[0]->lock_id(@_);
203             }
204              
205             sub heartbeat {
206 0     0 0   my $self = shift;
207 0           return $self->{locker}[0]->heartbeat;
208             }
209              
210              
211             1;
212              
213             __END__