File Coverage

blib/lib/IPC/LeaderBoard.pm
Criterion Covered Total %
statement 94 95 98.9
branch 35 40 87.5
condition 13 15 86.6
subroutine 15 15 100.0
pod 1 6 16.6
total 158 171 92.4


line stmt bran cond sub pod time code
1             package IPC::LeaderBoard;
2              
3 1     1   41494 use strict;
  1         2  
  1         26  
4 1     1   4 use warnings;
  1         2  
  1         26  
5              
6 1     1   4 use Fcntl ':flock'; # import LOCK_* constants
  1         2  
  1         103  
7 1     1   362 use Guard;
  1         433  
  1         39  
8 1     1   361 use IPC::ScoreBoard;
  1         11563  
  1         6  
9 1     1   812 use Moo;
  1         10532  
  1         7  
10 1     1   1319 use Path::Tiny;
  1         4  
  1         56  
11 1     1   483 use namespace::clean;
  1         10533  
  1         13  
12              
13             our $VERSION = '0.04';
14              
15             my $max_lock_attempts = $ENV{IPC_LEADERBOARD_MAX_SPINLOCK_ATTEMPTS} // 10000;
16              
17             =head1 NAME
18              
19             IPC::LeaderBoard - fast per-symbol online get/update information
20              
21             =head1 VERSION
22              
23             0.02
24              
25             =head1 STATUS
26              
27             =begin HTML
28              
29            

30            
31            

32              
33             =end HTML
34              
35              
36             =head1 SYNOPSIS
37              
38             use IPC::LeaderBoard;
39              
40             # in master-process
41             my $master = IPC::LeaderBoard::create(
42             n_slots => 2, # number of symbols
43             slot_shared_size => 4, # number integers per slot, concurrent access
44             slot_private_size => 2, # number integers per slot, non-concurrent access
45             mmaped_file => "/var/run/data/my.scores", # mmaped file
46             );
47             # ... initialize data here
48              
49             # in slave processes
50             my $slave = IPC::LeaderBoard::attach(
51             # exactly the same parameters as for master
52             );
53              
54             my $leader_board = $slave; # or $master, does not matter
55              
56             # get shared and private arrays of integers for the 0-th slot
57             my ($shared, $private) = $leader_board->read_slot(0);
58              
59             # update shared integers with values 1,2,3,4 and 0-th private integer
60             # with value 6
61             my $success = $leader_board->update(0, [1, 2, 3, 4], 0 => 6, 1 => 8);
62              
63             # $shared = [1, 2, 3, 4], $private = [6, 8]
64             ($shared, $private) = $leader_board->read_slot(0);
65              
66             # update just private integer with index 1 with value 2
67             $leader_board->update(0, 1 => 2);
68              
69             # update just shared values of 0-th slot
70             $success = $leader_board->update(0, [1, 2, 3, 4]);
71              
72             =head1 DESCRIPTION
73              
74             LeaderBoard uses shared memory IPC to fast set/get integers on arbitrary row,
75             (slot) defined by it's index.
76              
77             There are the following assumptions:
78              
79             =over 2
80              
81             =item * only one master is present
82              
83             C method dies, if it founds that some other master ownes shared
84             memory (file lock is used for that).
85              
86             =item * master is launched before slaves
87              
88             C dies, if slave finds, that master-owner isn't present, or,
89             if it presents, the masters provider/symbol information isn't actual.
90             In the last case master should be restarted first.
91              
92             =item * there is no hot-deploy mechanism
93              
94             Just restart master/slaves
95              
96             =item * read slot before update it
97              
98             The vesion/generation pattern is used do detect, whether update
99             has been successfull or not. Update failure means, some other
100             C instance updated the slot; you should re-read it
101             and try uptate it again (if the update will be still actual after
102             data refresh)
103              
104             =item * no semantical difference between slave and master
105              
106             Master was introduced to lock leadear board to prevent other masters
107             connect to it and re-initialize (corrupt) data. After attach slave validates,
108             that LeaderBoard is valid (i.e. number of slots, as well as the sizes
109             of private and shared areas match to the declared).
110              
111             Hence, master can be presented only by one instance, while slaves
112             can be presented by multiple instances.
113              
114             =item * slot data organization and consistency
115              
116             A leaderboard is an array of slots of the same size:
117              
118             +------------------------------------------------------------------------+
119             | slot 1 |
120             +------------------------------------------------------------------------+
121             | slot 2 |
122             +------------------------------------------------------------------------+
123             | ... |
124             +------------------------------------------------------------------------+
125             | slot N |
126             +------------------------------------------------------------------------+
127              
128             A slot is addressed by its index.
129              
130             Each slot contains a spin-lock, a shared part, a generation field and a private part like
131              
132             It is supposed, that only leader (independent for each slot) will update the shared part,
133             while other competitors will update only own private parts, i.e.:
134              
135             | | shared part | | private part |
136             | spin | | gene- | process1 | process2 | process3 |
137             | lock | shr1 | shr2 | ... | shrN | ration | p1 | p2 | ... | pN | p1 | p2 | ... | pN | p1 | p2 | ... | pN |
138              
139             All values (shrX and pX) in the leaderboard are integer numbers. Only the current leader updates
140             the shared part, and does that in safe manner (i.e. protected by spin-lock and generation). Each process can
141             update its own private part of a slot.
142              
143             Read or write for integer values (shr1, p1, ..) read/write B is guaranteed
144             by L, which in the final, uses special CPU-instructions for that.
145              
146             The SpinLock pattern guarantees the safety of shared part update, i.e. in
147             the case of two or more concurrent write request, they will be done in
148             sequential manner.
149              
150             The Generation pattern guarantees that you update the most recent values
151             in the shared part of the slot, i.e. if some process updated shared
152             part of the slot, between slot read and update operations of the
153             current process, than, the update request of the current process
154             would fail. You have re-read the slot, and try to update it again, but
155             after re-read the update might be not required.
156              
157             Both SpinLock and Generation patterns guarantee, that you'll never
158             can made inconsistent C, or updating non-actual data.
159              
160             In the same time, you might end up with the inconsistent C
161             of the shared data: the individual values (integer) are consistent (atomic),
162             but you they might belong to the different generations. There is an assumption
163             in the C design, that it is B: would you try to update
164             the shared data, the C will fail, hence, no any harm will occur. If
165             you need to handle that, just check return value C.
166              
167             There are no any guarantees for slot private data; but it isn't needed.
168             The shared data should store information about leader, hence when a
169             new leader arrives, it updates the information; or the current leader update
170             it's information on the LeaderBoard in the appropriate slot. No data loss might
171             occur.
172              
173             When competitor (i.e. some process) updates private data, nobody else
174             can update it (i.e. you shouldn't write progam such a way, that one
175             process-competitor updates data of the other process-competitor), hence,
176             private data cannot be corrupted if used properly.
177              
178             The private data might be inconsistent on read (e.g. competitor1 reads
179             private data of competitor2, while it is half-updated by competitor2);
180             but that shoudl be B. If it is
181             significant, use shared memory for that, re-design your approach (e.g
182             use additional slots) or use some other module.
183              
184             =back
185              
186             The update process should be rather simple: C
187             and then start all together. C / C should be wrappend into
188             C (or C & friends), to repeat seveal attempts with some delay.
189              
190             The C method might fail, (i.e. it does not returns true), when it detects,
191             that somebody else already has changed an row. It is assumed that no any harm
192             in it. If needed the row can be refreshed (re-read), and the next update
193             might be successfull.
194              
195             It is assumed, that if C returs outdated data and the C decision
196             has been taken, than update will silently fail (return false), without any
197             loud exceptions; so, the next read-update cycle might be successful, but
198             probably, the updated values are already correct, so, no immediate update
199             would occur.
200              
201             =for Pod::Coverage BUILD DEMOLISH attach create mmaped_file n_slots read_slot slot_private_size slot_shared_size
202              
203             =cut
204              
205             has mmaped_file => (
206             is => 'ro',
207             required => 1
208             );
209              
210             has n_slots => (
211             is => 'ro',
212             required => 1
213             );
214              
215             has slot_shared_size => (
216             is => 'ro',
217             required => 1
218             );
219              
220             has slot_private_size => (
221             is => 'ro',
222             required => 1
223             );
224              
225             has _mode => (
226             is => 'ro',
227             required => 1
228             );
229              
230             has _score_board => (is => 'rw');
231             has _fd => (is => 'rw');
232             has _generation_idx => (is => 'rw');
233             has _last_generation => (is => 'rw');
234             has _last_idx => (
235             is => 'rw',
236             default => sub { -1 });
237              
238             sub BUILD {
239 12     12 0 138 my $self = shift;
240 12         35 my $mode = $self->_mode;
241 12 50       56 die("unknown mode '$mode'") unless $mode =~ /(slave)|(master)/;
242              
243             # construct ids (number, actually the order) for all symbols
244             # and providers. Should be sorted to guaranttee the same
245             # ids in different proccess
246             # There is an assumption, that processes, using LeaderBoard, should
247             # restarte in case of symbols/providers change.
248              
249 12         31 my $filename = $self->mmaped_file;
250 12 50 66     213 if (!(-e $filename) && ($mode eq 'slave')) {
251 0         0 die("LeaderBoard ($filename) is abandoned, cannot attach to it (file not exists)");
252             }
253              
254 12         48 my $scoreboard_path = path($filename);
255 12 100       555 $scoreboard_path->touch if !-e $filename;
256 12         338 my $fd = $scoreboard_path->filehandle('<');
257              
258 12         1094 my $score_board;
259 12 100       34 if ($mode eq 'slave') {
260             # die, if slave was able to lock it, that means, that master
261             # didn't accquired the exclusive lock, i.e. no master
262 7 100       36 flock($fd, LOCK_SH | LOCK_NB)
263             && die("LeaderBoard ($filename) is abandoned, cannot attach to it (shared lock obtained)");
264 6         681 my ($sb, $nslots, $slotsize) = IPC::ScoreBoard->open($filename);
265             # just additional check, that providers/symbols information is actual
266 6         1290 my $declared_size = $self->slot_shared_size + $self->slot_private_size + 2;
267 6 50       24 die("number of slots mismatch") unless $nslots == $self->n_slots;
268 6 50       16 die("slot size mismatch") unless $slotsize == $declared_size;
269 6         24 $score_board = $sb;
270             } else {
271             # die if we can't lock it, that means, another master-process
272             # already acquired it
273 5 100       22 flock($fd, LOCK_EX | LOCK_NB)
274             || die("LeaderBoard ($filename) is owned by some other process, cannot lock it exclusively");
275             # we use the addtitional fields: for spinlock and generation
276 4         651 my $declared_size = $self->slot_shared_size + $self->slot_private_size + 2;
277 4         46 $score_board = IPC::ScoreBoard->named($filename, $self->n_slots, $declared_size, 0);
278 4         1434 $self->_fd($fd);
279             }
280 10         42 $self->_generation_idx($self->slot_shared_size + 1); # [spin_lock | shared_data | generation | private_data ]
281 10         32 $self->_score_board($score_board);
282 10         118 return;
283             }
284              
285             sub DEMOLISH {
286 12     12 0 8182 my $self = shift;
287             # actually we need that only for tests
288 12 100       55 if ($self->_mode eq 'master') {
289 5 100       36 flock($self->_fd, LOCK_UN) if ($self->_fd);
290             }
291 12         775 return;
292             }
293              
294             sub attach {
295 7     7 0 908 return IPC::LeaderBoard->new({
296             _mode => 'slave',
297             @_,
298             });
299             }
300              
301             sub create {
302 5     5 0 20107 return IPC::LeaderBoard->new({
303             _mode => 'master',
304             @_,
305             });
306             }
307              
308             # our use-case implies, that if we read a bit outdated data, this is OK, because
309             # the generation field will be outdated, hence, no update would occur
310             sub read_slot {
311 12     12 0 1488 my ($self, $idx) = @_;
312 12 100 100     99 die("wrong index") if ($idx >= $self->n_slots) || $idx < 0;
313              
314 10         61 my @all_values = $self->_score_board->get_all($idx);
315             # drop spinlock and generation
316 10         36 my $generation = splice @all_values, $self->_generation_idx, 1;
317 10         21 splice @all_values, 0, 1;
318              
319             # record generation + index for further possible update
320 10         26 $self->_last_idx($idx);
321 10         26 $self->_last_generation($generation);
322              
323             # separate shared and private data
324 10         25 my $shared_size = $self->slot_shared_size;
325 10         38 my @shared_values = @all_values[0 .. $shared_size - 1];
326 10         39 my @private_values = @all_values[$shared_size .. $shared_size + $self->slot_private_size - 1];
327              
328 10         84 return \@shared_values, \@private_values;
329             }
330              
331             sub update {
332 12     12 1 4167 my ($self, $idx, @rest) = @_;
333 12 100 66     71 my $values = (@rest && ref($rest[0]) eq 'ARRAY') ? shift(@rest) : undef;
334 12         38 my %private_values = @rest;
335 12         19 my $operation_result = 0;
336 12 100 100     77 die("wrong index") if ($idx >= $self->n_slots) || $idx < 0;
337 10 50       39 die("update for only last read index is allowed") if $idx != $self->_last_idx;
338              
339 10         20 my $sb = $self->_score_board;
340              
341             # updating shared values
342 10 100       24 if ($values) {
343 7 100       35 die("values size mismatch slot size") if @$values != $self->slot_shared_size;
344              
345             # obtain spin-lock
346 5         8 my $attempts = 0;
347 5         30 while ($sb->incr($idx, 0) != 1) {
348 10001         24596 $sb->decr($idx, 0);
349 10001 100       31877 if (++$attempts > $max_lock_attempts) {
350 1         39 warn("failed to acquire spin lock for row $idx after $attempts attempts");
351 1         20 return 0;
352             }
353             }
354             # release the lock at the end of the scope
355 4     4   25 scope_guard { $sb->decr($idx, 0) };
  4         21  
356              
357             # now we hold the record, nobody else can update it.
358             # Atomically read generation value via increment it to zero.
359             # The simple $sb->get(...) cannot be used, because it does not guarantees
360             # atomicity, i.e. slot re-write is possible due to L1/L2 caches in CPU
361 4         12 my $actual_generation = $sb->incr($idx, $self->_generation_idx, 0);
362 4 100       20 if ($actual_generation == $self->_last_generation) {
363             # now we are sure, that nobody else updated the record since our last read
364             # so we can safely update it
365              
366             # +1 because the 1st field is spinlock
367 2         17 $sb->set($idx, $_ + 1, $values->[$_]) for (0 .. @$values - 1);
368             # increment the generation field
369 2         8 $sb->incr($idx, $self->_generation_idx);
370             # success
371 2         7 $operation_result = 1;
372             }
373             }
374              
375             # updating private values
376 7 100       25 if (%private_values) {
377 6         14 my $idx_delta = $self->_generation_idx + 1;
378 6         17 for my $private_idx (keys %private_values) {
379 7         11 my $value = $private_values{$private_idx};
380 7 100 100     45 if (($private_idx >= $self->slot_private_size) || ($private_idx < 0)) {
381 2         19 die("wrong private index");
382             }
383 5         16 $sb->set($idx, $private_idx + $idx_delta, $value);
384             }
385             }
386              
387 5         28 return $operation_result;
388             }
389              
390             =head1 AUTHOR
391              
392             binary.com, C<< >>
393              
394             =head1 BUGS
395              
396             Please report any bugs or feature requests to
397             L.
398              
399             =cut
400              
401             1;