File Coverage

blib/lib/IPC/ConcurrencyLimit/WithLatestStandby.pm
Criterion Covered Total %
statement 76 96 79.1
branch 25 54 46.3
condition 10 24 41.6
subroutine 11 18 61.1
pod 0 6 0.0
total 122 198 61.6


line stmt bran cond sub pod time code
1             package IPC::ConcurrencyLimit::WithLatestStandby;
2 35     35   577010 use 5.008001;
  35         105  
3 35     35   105 use strict;
  35         35  
  35         490  
4 35     35   105 use warnings;
  35         35  
  35         1155  
5              
6             our $VERSION = '0.16';
7              
8 35     35   105 use Carp qw(croak);
  35         35  
  35         1295  
9 35     35   16835 use Time::HiRes qw(sleep time);
  35         36330  
  35         140  
10 35     35   16065 use IPC::ConcurrencyLimit;
  35         70  
  35         26215  
11              
12             sub new {
13 69     69 0 311626632 my $class = shift;
14 69         1015 my %params = @_;
15 69         318 my $type = delete $params{type};
16 69 50       649 $type = 'Flock' if not defined $type;
17 69 50       433 croak( __PACKAGE__ . " only supports 'Flock' for now")
18             if $type ne 'Flock';
19              
20 69 50 33     386 if (defined $params{max_procs} and $params{max_procs}!=1) {
21 0         0 croak( __PACKAGE__ . " does not support max_procs!=1, use multiple objects instead.");
22             }
23              
24 69   50     925 my $process_name_change= $params{process_name_change} || 0;
25 69   50     276 my $path= $params{path} || die __PACKAGE__ . '->new: missing mandatory parameter `path`';
26 69   50     339 my $file_prefix= $params{file_prefix} || "";
27 69   0     219 my $poll_time= $params{poll_time} || $params{interval} || 1; # seconds to poll (may be fraction)
28 69   50     677 my $retries= $params{retries} || undef;
29 69   50     410 my $timeout= $params{timeout} || undef;
30 69   50     160 my $debug= $params{debug} || 0; # show debug?
31 69   50     197 my $debug_sub= $params{debug_sub} || undef;
32              
33             my $retry_sub= (ref $retries) ? $retries :
34             (defined $retries &&
35 0 0   0   0 defined $timeout) ? sub { $_[0] <= $retries && $_[2] <= $timeout } :
36 0     0   0 (defined $retries) ? sub { $_[0] <= $retries } :
37 0     0   0 (defined $timeout) ? sub { $_[2] <= $timeout } :
38 69 50 33 229   1982 sub { 1 };
  229 50       681  
    50          
    50          
39              
40             # primary is replaced by standby1, which is replaced by
41             # standby2, which is replaced by standby3. However, standby1
42             # will exit when standby2's lock is held by another process.
43 69 50       362 my @names= map { ($file_prefix ? "$file_prefix.$_" : $_) }
  276         914  
44             qw(primary standby1 standby2 standby3);
45             my @lockers= map {
46 69 50       322 IPC::ConcurrencyLimit->new(
  276         1788  
47             type => $type,
48             max_procs => 1,
49             # future proofing
50             $type eq "Flock" ? (
51             file_prefix => $_,
52             path => $path,
53             ) : (),
54             )
55             } @names;
56              
57             return bless {
58             poll_time => $poll_time,
59             timeout => $timeout, # FYI
60             retries => $retries, # FYI
61             lock_name => \@names,
62             locker => \@lockers,
63             debug => $debug,
64 0     0   0 debug_sub => $debug_sub || sub { warn @_,"\n" },
65 69   50     1292 retry_sub => $retry_sub,
66             process_name_change => $process_name_change,
67             }, $class;
68             }
69              
70             sub _diag {
71 321     321   513 my ($self, $fmt, @args)= @_;
72 321 50       665 if (!@args) {
73 321         1226 $self->{debug_sub}->($fmt);
74             } else {
75 0         0 $self->{debug_sub}->(sprintf $fmt, @args);
76             }
77             }
78              
79              
80             sub get_lock {
81 69     69 0 26015 my ($self) = @_;
82              
83 69         244 my $locker= $self->{locker};
84 69         139 my $names= $self->{lock_name};
85              
86 69         389 my $old_oh= $0;
87              
88             $0 = "$old_oh - acquire"
89 69 50       193 if $self->{process_name_change};
90              
91             # Try to get the rightmost lock (standby3); if we don't get it
92             # then we exit out. This shouldn't really happen if other things
93             # are sane, for instance when $poll_time is much smaller than
94             # the interval at which we allocate new workers.
95 69         265 my $locker_id= $#$locker;
96 69 50       252 if ( $locker->[$locker_id]->get_lock() ) {
97 69         248 $self->_diag( "Got a $names->[$locker_id] lock");
98             } else {
99             $self->_diag( "Failed to get a $names->[$locker_id] lock, entry lock is held by another process" )
100 0 0       0 if $self->{debug};
101             $0 = "$old_oh - no-lock-acquired"
102 0 0       0 if $self->{process_name_change};
103 0         0 return;
104             }
105              
106             # Each worker tries to acquire the lock to its left. If it does
107             # then it abandons its old lock. If that means the worker ends up
108             # on locker_id 0 then they are done, and can do work.
109             # The first standby worker also looks to its right to see if there
110             # is a replacement process for it, if there is it exits, leaving
111             # a gap and letting the replacements shuffle left.
112 69         6841 my $tries= 0;
113 69         74 my $lock_tries= 0;
114 69         389 my $standby_start= time();
115 69         128 my $lock_start= time();
116 69         109 my $poll_time= $self->{poll_time};
117              
118 69         206 while ( $locker_id > 0 ) {
119             $0 = "$old_oh - $names->[$locker_id]"
120 412 50       1765 if $self->{process_name_change};
121              
122             # can we shuffle our lock left?
123 412 100       3223 if ( $locker->[$locker_id - 1]->get_lock() ) {
124 183         912 $self->_diag( "Got a $names->[$locker_id -1] lock, dropping old $names->[$locker_id] lock");
125             # yep, we got the lock to the left, so drop our old lock,
126             # and move the pointer left at the same time.
127 183         16729 $locker->[ $locker_id-- ]->release_lock();
128 183         225 $lock_tries= 0;
129 183         389 $lock_start= time();
130 183         478 next;
131             }
132              
133 229 50       2086 unless ($self->{retry_sub}->(++$tries, ++$lock_tries, time - $standby_start, time - $lock_start)) {
134             $0 = "$old_oh - no-lock-timeout"
135 0 0       0 if $self->{process_name_change};
136 0         0 return;
137             }
138              
139             # check if we are the first standby worker.
140 229 100       1001 if ( $locker_id == 1 ) {
141             # yep - we are the first standby worker,
142             # so check if the lock to our right is being held:
143 176 100       798 if ( $locker->[$locker_id + 1]->get_lock() ) {
144             # we got the lock, which means nothing else
145             # holds it. so we release the lock and move on.
146 152         589 $locker->[$locker_id + 1]->release_lock();
147             } else {
148             $self->_diag(
149             "A newer worker is holding the $names->[$locker_id+1] lock, will exit to let it take over"
150 24 50       709 ) if $self->{debug};
151             # we failed to get the lock, which means there is a newer
152             # process that can replace us so return/exit - this frees up
153             # our lock and lets the newer process move into our position.
154             $0 = "$old_oh - no-lock-retired"
155 24 50       2237 if $self->{process_name_change};
156 24         110 return;
157             }
158             }
159              
160             # nope - the lock to our left is being held so sleep a while before
161             # we try again. We use the rand and the formula so that items to the
162             # right poll faster than items to the left, and to reduce the chance
163             # that lock holder 1 and lock holder 3 poll lock 2 at the same time
164             # forever. The formula makes items to the right poll faster (the mean
165             # sleep is $poll_time / $locker_id), and the rand ensures there is jitter.
166 205         17377622 sleep rand(($poll_time / $locker_id)*2);
167             }
168              
169             # assert that $locker_id is 0 at this point.
170 45 50       136 die "panic: We should not reach this point with \$locker_id larger than 0, got $locker_id"
171             if $locker_id;
172              
173             $self->_diag("Got $names->[$locker_id] lock, we are allowed to do work.")
174 45 50       263 if $self->{debug};
175              
176             # at this point we should be $locker_id == 0 and we can do work.
177 45 50       2136 if ($self->{process_name_change}) {
178 0 0       0 if ($self->{process_name_change} > 1) {
179 0         0 $0 = $old_oh;
180             } else {
181 0         0 $0 = "$old_oh - $names->[$locker_id]"
182             }
183             }
184 45         99 return 1;
185             }
186              
187              
188             sub is_locked {
189 0     0 0 0 my $self = shift;
190 0         0 return $self->{locker}[0]->is_locked(@_);
191             }
192              
193             sub release_lock {
194 31     31 0 62320075 my $self = shift;
195 31         527 return $self->{locker}[0]->release_lock(@_);
196             }
197              
198             sub lock_id {
199 0     0 0   my $self = shift;
200 0           return $self->{locker}[0]->lock_id(@_);
201             }
202              
203             sub heartbeat {
204 0     0 0   my $self = shift;
205 0           return $self->{locker}[0]->heartbeat;
206             }
207              
208              
209             1;
210              
211             __END__