File Coverage

lib/Beekeeper/Worker/Extension/SharedCache.pm
Criterion Covered Total %
statement 30 257 11.6
branch 0 90 0.0
condition 0 18 0.0
subroutine 10 44 22.7
pod 0 1 0.0
total 40 410 9.7


line stmt bran cond sub pod time code
1             package Beekeeper::Worker::Extension::SharedCache;
2              
3 1     1   1065 use strict;
  1         2  
  1         65  
4 1     1   10 use warnings;
  1         2  
  1         51  
5              
6             our $VERSION = '0.08';
7              
8 1     1   5 use Exporter 'import';
  1         2  
  1         85  
9              
10             our @EXPORT = qw( shared_cache );
11              
12              
# shared_cache( %args )
#
# Exported into worker classes. Builds a shared cache bound to the calling
# worker; every named argument (id, resolver, on_update, persist, max_age,
# refresh) is forwarded verbatim to the Cache constructor.
sub shared_cache {
    my ($worker, @cache_args) = @_;

    return Beekeeper::Worker::Extension::SharedCache::Cache->new(
        worker => $worker,
        @cache_args,
    );
}
21              
22             package
23             Beekeeper::Worker::Extension::SharedCache::Cache; # hide from PAUSE
24              
25 1     1   6 use Beekeeper::Worker ':log';
  1         2  
  1         147  
26 1     1   7 use AnyEvent;
  1         2  
  1         18  
27 1     1   5 use JSON::XS;
  1         14  
  1         73  
28 1     1   7 use Fcntl qw(:DEFAULT :flock);
  1         2  
  1         384  
29 1     1   8 use Scalar::Util 'weaken';
  1         3  
  1         56  
30 1     1   7 use Carp;
  1         3  
  1         68  
31              
32 1     1   7 use constant SYNC_REQUEST_TIMEOUT => 30;
  1         2  
  1         3546  
33              
34             # Show errors from perspective of caller
35             $Carp::Internal{(__PACKAGE__)}++;
36              
37              
# Cache->new( %args )
#
# Builds a shared cache instance for a worker. Recognized arguments:
#   worker    - the worker object (its {_WORKER}{pool_id} and {_BUS} are used)
#   id        - cache name, used in bus topics and in the persistence file name
#   resolver  - optional coderef settling version conflicts (see _merge)
#   on_update - optional coderef called as ($key, $new_value, $old_value)
#   persist   - if true, state is loaded from / saved to a dump file in /tmp
#   max_age   - if set, entries older than ~1.3 * max_age seconds are collected
#   refresh   - if set, a sync request is re-sent every 'refresh' seconds
sub new {
    my ($class, %args) = @_;

    my $worker  = $args{'worker'};
    my $id      = $args{'id'};
    my $uid     = "$$-" . int(rand(90000000)+10000000);
    my $pool_id = $worker->{_WORKER}->{pool_id};

    my $self = {
        id         => $id,
        uid        => $uid,
        pool_id    => $pool_id,
        resolver   => $args{'resolver'},
        on_update  => $args{'on_update'},
        persist    => $args{'persist'},
        max_age    => $args{'max_age'},
        refresh    => $args{'refresh'},
        synced     => 0,
        data       => {},
        vers       => {},
        time       => {},
        _BUS       => undef,
        _BUS_GROUP => undef,
    };

    bless $self, $class;

    $self->_load_state if $self->{persist};

    $self->_connect_to_all_brokers($worker);

    # The timers below are owned by $self, so their callbacks hold only a
    # weak reference to it (a strong one would be a reference cycle)
    my $Self = $self;
    weaken $Self;

    AnyEvent->now_update;

    if ($self->{max_age}) {

        # Randomized first run, presumably so that the workers of a pool
        # do not all garbage collect at the same moment
        $self->{gc_timer} = AnyEvent->timer(
            after    => $self->{max_age} * rand() + 60,
            interval => $self->{max_age},
            cb       => sub { $Self->_gc },
        );
    }

    if ($self->{refresh}) {

        # _send_sync_request publishes on the bus it is given: pass the worker
        # bus explicitly (without one it would call publish on undef and die)
        $self->{refresh_timer} = AnyEvent->timer(
            after    => $self->{refresh} * rand() + 60,
            interval => $self->{refresh},
            cb       => sub {
                my $bus = $Self->{_BUS} or return;
                $Self->_send_sync_request($bus);
            },
        );
    }

    return $self;
}
94              
# _connect_to_all_brokers( $worker )
#
# Hooks the cache into every bus of the worker's bus group.
#  - The worker's own bus is reused: sync listeners are set up on it, the
#    initial sync request is sent, and it is kept (weakly) in $self->{_BUS}.
#  - For every other bus a dedicated Beekeeper::MQTT connection is opened and
#    stored in $self->{_BUS_GROUP}. On error it reconnects after a growing
#    delay (0, 3, 6 ... 27 seconds, then 30). On connect it sets up the sync
#    listeners and, if already synced, starts answering sync requests.
sub _connect_to_all_brokers {
    my ($self, $worker) = @_;
    weaken $self;

    #TODO: using multiple shared_cache from the same worker will cause multiple bus connections

    my $worker_bus   = $worker->{_BUS};
    my $group_config = Beekeeper::Config->get_bus_group_config( bus_id => $worker_bus->bus_id );

    my $bus_group = $self->{_BUS_GROUP} = [];

    foreach my $config (@$group_config) {

        my $bus_id = $config->{'bus_id'};

        if ($bus_id eq $worker_bus->bus_id) {
            # Already connected to our own bus
            $self->_setup_sync_listeners($worker_bus);
            $self->_send_sync_request($worker_bus);
            $self->{_BUS} = $worker_bus;
            weaken $self->{_BUS};
            next;
        }

        my $bus; $bus = Beekeeper::MQTT->new(
            %$config,
            bus_id   => $bus_id,
            timeout  => 300,
            on_error => sub {
                # Reconnect
                my $errmsg = $_[0] || ""; $errmsg =~ s/\s+/ /sg;
                log_error "Connection to $bus_id failed: $errmsg";

                # $self is weak: if the cache is gone there is nothing to
                # reconnect for (writing to it would autovivify a detached
                # hash that keeps the reconnect timer alive forever)
                return unless $self;

                my $delay = $self->{connect_err}->{$bus_id}++;
                $self->{reconnect_tmr}->{$bus_id} = AnyEvent->timer(
                    after => ($delay < 10 ? $delay * 3 : 30),
                    cb    => sub { $bus->connect },
                );
            },
        );

        push @$bus_group, $bus;

        $bus->connect(
            on_connack => sub {
                # Setup
                log_debug "Connected to $bus_id";

                # The cache may have been destroyed while connecting
                return unless $self;

                # Connected again: the next failure should retry quickly
                delete $self->{connect_err}->{$bus_id};

                $self->_setup_sync_listeners($bus);
                $self->_accept_sync_requests($bus) if $self->{synced};
            },
        );
    }
}
147              
# _setup_sync_listeners( $bus )
#
# Subscribes on $bus to:
#  - "msg/<bus_role>/_sync/<cache id>/set": single-entry updates published by
#    set(); each one is merged into the local data.
#  - "priv/<client_id>/sync-<cache id>": replies to our own sync requests,
#    carrying a full dump; merging one completes the sync.
# Payloads come from the network, so undecodable or wrongly shaped JSON is
# logged and dropped instead of dying inside the bus event callback.
sub _setup_sync_listeners {
    my ($self, $bus) = @_;
    weaken $self;

    my $cache_id  = $self->{id};
    my $local_bus = $bus->{bus_role};
    my $client_id = $bus->{client_id};

    my $topic = "msg/$local_bus/_sync/$cache_id/set";

    $bus->subscribe(
        topic      => $topic,
        on_publish => sub {
            my ($payload_ref, $mqtt_properties) = @_;

            # The cache may already be destroyed ($self is weak)
            return unless $self;

            local $@;
            my $entry = eval { decode_json($$payload_ref) };

            unless (ref $entry eq 'ARRAY') {
                log_error "Shared cache '$cache_id': ignoring malformed update received on '$topic'";
                return;
            }

            $self->_merge($entry);
        },
        on_suback => sub {
            my ($success) = @_;
            log_error "Could not subscribe to topic '$topic'" unless $success;
        },
    );

    my $reply_topic = "priv/$client_id/sync-$cache_id";

    $bus->subscribe(
        topic      => $reply_topic,
        on_publish => sub {
            my ($payload_ref, $mqtt_properties) = @_;

            return unless $self;

            local $@;
            my $dump = eval { decode_json($$payload_ref) };

            # A bad reply does not complete the sync: the pending timeout will
            unless (ref $dump eq 'HASH') {
                log_error "Shared cache '$cache_id': ignoring malformed sync reply";
                return;
            }

            $self->_merge_dump($dump);

            $self->_sync_completed(1);
        },
        on_suback => sub {
            my ($success) = @_;
            log_error "Could not subscribe to reply topic '$reply_topic'" unless $success;
        },
    );
}
193              
# _send_sync_request( [$bus] )
#
# Publishes a dump request on "req/<bus_role>/_sync/<cache id>/dump", asking
# for the reply on "priv/<client_id>/sync-<cache id>" (the topic
# _setup_sync_listeners subscribes to). It also arms a SYNC_REQUEST_TIMEOUT
# timer; if no reply arrives first, the timer completes the sync as
# unsuccessful. Only one request is in flight at a time.
#
# $bus defaults to the worker bus ($self->{_BUS}), so the refresh timer can
# call this without arguments. Does nothing when no bus is available.
sub _send_sync_request {
    my ($self, $bus) = @_;
    weaken $self;

    # Do not send more than one sync request at the time
    return if $self->{_sync_timeout};

    $bus = $self->{_BUS} unless defined $bus;
    return unless $bus;

    my $cache_id  = $self->{id};
    my $local_bus = $bus->{bus_role};
    my $client_id = $bus->{client_id};

    $bus->publish(
        topic          => "req/$local_bus/_sync/$cache_id/dump",
        response_topic => "priv/$client_id/sync-$cache_id",
    );

    # Ensure that timeout is set properly when the event loop was blocked
    AnyEvent->now_update;

    # When a fresh pool is started there is no master to reply sync requests
    $self->{_sync_timeout} = AnyEvent->timer(
        after => SYNC_REQUEST_TIMEOUT,
        cb    => sub { $self->_sync_completed(0) if $self },
    );
}
220              
# _sync_completed( $success )
#
# Ends the pending sync request. It is called with a true $success when a
# dump reply was merged, and with false from the SYNC_REQUEST_TIMEOUT timer.
# The first call marks the cache as synced and starts answering other
# workers' sync requests on the worker bus and on every connected group bus.
# Later calls only clear the pending timeout.
sub _sync_completed {
    my ($self, $success) = @_;

    delete $self->{_sync_timeout};

    return if $self->{synced};

    # BUG: When a fresh pool is started there is no master to reply sync requests.
    # When two fresh pools are started at t0 and t1 time, and (t1 - t0) < SYNC_REQUEST_TIMEOUT,
    # cache updates in the t0-t1 range are not properly synced in the pool which was started later
    log_debug( "Shared cache '$self->{id}': " . ($success ? "Sync completed" : "Acting as master"));

    $self->{synced} = 1;

    # Sync requests are published on the worker's own bus, which is kept in
    # _BUS and not in _BUS_GROUP. Answer there too; otherwise nobody in a
    # single-broker pool ever replies to a new worker's sync request.
    my $worker_bus = $self->{_BUS};
    $self->_accept_sync_requests($worker_bus) if $worker_bus;

    foreach my $bus ( @{$self->{_BUS_GROUP}} ) {

        # Connections to other buses could have failed or be in progress
        next unless $bus->{is_connected};

        $self->_accept_sync_requests($bus);
    }
}
243              
# _accept_sync_requests( $bus )
#
# Makes this (synced) worker answer sync requests arriving on $bus. It joins
# the "BKPR" shared subscription "$share/BKPR/req/<bus_role>/_sync/<cache id>/dump"
# and replies with dump() encoded as JSON on the request's response_topic.
# With an MQTT shared subscription, presumably only one member of the BKPR
# group receives each request.
sub _accept_sync_requests {
    my ($self, $bus) = @_;
    weaken $self;
    weaken $bus;

    my $cache_id  = $self->{id};
    my $local_bus = $bus->{bus_role};

    log_debug "Shared cache '$self->{id}': Accepting sync requests from $local_bus";

    my $topic = "\$share/BKPR/req/$local_bus/_sync/$cache_id/dump";

    $bus->subscribe(
        topic      => $topic,
        on_publish => sub {
            my ($payload_ref, $mqtt_properties) = @_;

            # Cache or connection already gone (both are weak references)
            return unless $self && $bus;

            # A request without a response topic cannot be answered
            my $response_topic = $mqtt_properties->{'response_topic'};
            return unless defined $response_topic && length $response_topic;

            my $dump = encode_json( $self->dump );

            $bus->publish(
                topic   => $response_topic,
                payload => \$dump,
            );
        },
        on_suback => sub {
            my ($success) = @_;
            log_error "Could not subscribe to topic '$topic'" unless $success;
        },
    );
}
276              
# Epoch second of the last forced AnyEvent->now_update before arming a purge
# timer (shared by set and _merge, so the update happens at most once a second)
my $_now = 0;

# set( $key, $value )
#
# Stores $value under $key, bumps the entry version, stamps it with the current
# hi-res time and broadcasts [key, value, version, time, uid] as JSON to the
# worker bus and every connected bus of the group. Calls
# on_update($key, $value, $old) if configured.
#
# Setting undef marks the key as deleted. The tombstone (undef value plus its
# version) is kept for 60 seconds so the deletion can propagate, then the
# entry is purged. Setting a defined value cancels any such pending purge.
#
# Croaks if $key is undefined. Returns 1.
sub set {
    my ($self, $key, $value) = @_;
    weaken $self;

    croak "Key value is undefined" unless (defined $key);

    my $old = $self->{data}->{$key};

    $self->{data}->{$key} = $value;
    $self->{vers}->{$key}++;
    $self->{time}->{$key} = Time::HiRes::time();

    my $json = encode_json([
        $key,
        $value,
        $self->{vers}->{$key},
        $self->{time}->{$key},
        $self->{uid},
    ]);

    $self->{on_update}->($key, $value, $old) if $self->{on_update};

    # Notify all workers in every cluster about the change. The worker bus is
    # a weak reference and may already be gone (e.g. during shutdown).
    my @bus_group = grep { $_->{is_connected} } @{$self->{_BUS_GROUP}};
    unshift @bus_group, $self->{_BUS} if $self->{_BUS};

    my $cache_id = $self->{id};

    foreach my $bus (@bus_group) {
        my $local_bus = $bus->{bus_role};

        $bus->publish(
            topic   => "msg/$local_bus/_sync/$cache_id/set",
            payload => \$json,
        );
    }

    if (defined $value) {
        # A purge still pending from an earlier set($key => undef) would
        # otherwise wipe this new value when its timer fires
        delete $self->{_destroy}->{$key};
    }
    else {
        # Postpone delete because it is necessary to keep the versioning
        # of this modification until it is propagated to all workers

        # Ensure that timer is set properly when the event loop was blocked
        if ($_now != time) { $_now = time; AnyEvent->now_update }

        $self->{_destroy}->{$key} = AnyEvent->timer( after => 60, cb => sub {
            delete $self->{_destroy}->{$key};
            delete $self->{data}->{$key};
            delete $self->{vers}->{$key};
            delete $self->{time}->{$key};
        });
    }

    return 1;
}
333              
# get( $key )
#
# Returns the cached value for $key. Unknown keys and deleted keys (whose
# tombstone holds undef until purged) both give undef.
sub get {
    my ($self, $key) = @_;
    return $self->{data}{$key};
}
339              
# delete( $key )
#
# Removes $key from the cache. This is just set($key, undef), so the deletion
# is versioned and propagated to the other workers like any other update.
# Returns whatever set() returns.
sub delete {
    my ($self, $key) = @_;
    return $self->set($key, undef);
}
345              
# raw_data()
#
# Returns the internal key => value hashref itself, not a copy. Changes made
# through it bypass versioning and are not propagated. Deleted keys may still
# be present with an undef value.
sub raw_data {
    my ($self) = @_;
    return $self->{data};
}
351              
# _merge( [ $key, $value, $version, $time, $uid ] )
#
# Merges one entry received from another worker, or loaded from disk:
#  - entries published by this very instance (same uid) are ignored;
#  - a higher version replaces the local entry;
#  - a lower version is stale and is ignored;
#  - an equal version is a conflict, settled by the 'resolver' callback. It
#    receives { data, vers, time } for mine and theirs and returns the one to
#    keep. The default keeps the newer time; on a time tie, theirs wins.
# on_update($key, $new, $old) is called whenever the entry is (re)written.
# An entry that ends up undef is a tombstone, purged after 60 seconds unless a
# defined value arrives first.
sub _merge {
    my ($self, $entry) = @_;

    my ($key, $value, $version, $time, $uid) = @$entry;

    # Discard updates sent by myself
    return if (defined $uid && $uid eq $self->{uid});

    # Unknown keys count as version 0 (also avoids comparing against undef)
    my $my_version = $self->{vers}->{$key} || 0;

    if ($version > $my_version) {

        # Received a fresher value for the entry
        my $old = $self->{data}->{$key};

        $self->{data}->{$key} = $value;
        $self->{vers}->{$key} = $version;
        $self->{time}->{$key} = $time;

        $self->{on_update}->($key, $value, $old) if $self->{on_update};
    }
    elsif ($version < $my_version) {

        # Received a stale value (we have a newest version)
        return;
    }
    else {

        # Version conflict, default resolution is to keep newest value
        my $resolver = $self->{resolver} || sub {
            return $_[0]->{time} > $_[1]->{time} ? $_[0] : $_[1];
        };

        my $keep = $resolver->(
            { # Mine
                data => $self->{data}->{$key},
                vers => $self->{vers}->{$key},
                time => $self->{time}->{$key},
            },
            { # Theirs
                data => $value,
                vers => $version,
                time => $time,
            },
        );

        my $old = $self->{data}->{$key};

        $self->{data}->{$key} = $keep->{data};
        $self->{vers}->{$key} = $keep->{vers};
        $self->{time}->{$key} = $keep->{time};

        $self->{on_update}->($key, $keep->{data}, $old) if $self->{on_update};
    }

    if (defined $self->{data}->{$key}) {
        # The entry is alive (again): cancel any purge still pending from an
        # earlier deletion, or its timer would wipe this value later
        delete $self->{_destroy}->{$key};
    }
    else {
        # Postpone delete because it is necessary to keep the versioning
        # of this modification until it is propagated to all workers

        # Ensure that timer is set properly when the event loop was blocked
        if ($_now != time) { $_now = time; AnyEvent->now_update }

        # The timer is stored inside $self, so its callback must only hold a
        # weak reference (a strong one would keep the cache alive until it fires)
        my $Self = $self;
        weaken $Self;

        $self->{_destroy}->{$key} = AnyEvent->timer( after => 60, cb => sub {
            return unless $Self;
            delete $Self->{_destroy}->{$key};
            delete $Self->{data}->{$key};
            delete $Self->{vers}->{$key};
            delete $Self->{time}->{$key};
        });
    }
}
420              
# dump()
#
# Returns a serializable snapshot of the whole cache:
#   { uid  => <this instance's uid>,
#     time => <current hi-res time>,
#     dump => [ [ $key, $value, $version, $time ], ... ] }
# Entries follow hash order, so their order is unspecified.
sub dump {
    my ($self) = @_;

    my $data    = $self->{data};
    my @entries = map {
        [ $_, $data->{$_}, $self->{vers}->{$_}, $self->{time}->{$_} ]
    } keys %$data;

    return {
        uid  => $self->{uid},
        time => Time::HiRes::time(),
        dump => \@entries,
    };
}
441              
# _merge_dump( { uid => ..., dump => [ [ $key, $value, $version, $time ], ... ] } )
#
# Merges every entry of a dump produced by dump(), as received in a sync
# reply. Dumps produced by this very instance are ignored. Since the data
# comes from the network, anything that does not look like a dump, and any
# entry that is not an array, is skipped instead of dying.
sub _merge_dump {
    my ($self, $dump) = @_;

    return unless ref $dump eq 'HASH';

    # Discard dumps sent by myself (uid is checked for definedness like _merge does)
    return if (defined $dump->{uid} && $dump->{uid} eq $self->{uid});

    my $entries = $dump->{dump};
    return unless ref $entries eq 'ARRAY';

    foreach my $entry (@$entries) {
        next unless ref $entry eq 'ARRAY';
        $self->_merge($entry);
    }
}
452              
# touch( $key )
#
# Keeps a live entry from being garbage collected. It re-publishes the current
# value through set() with a fresh timestamp, keeping the version unchanged.
# It only acts when the entry age lies strictly between 30% and 130% of
# max_age, so touching a recently refreshed entry does nothing. Returns
# nothing for unknown or deleted keys; croaks when max_age is not set.
sub touch {
    my ($self, $key) = @_;

    my $value = $self->{data}->{$key};
    return unless defined $value;

    $self->{max_age} or croak "No max_age specified (gc is disabled)";

    my $max_age = $self->{max_age};
    my $age     = time() - $self->{time}->{$key};

    return if $age <= $max_age * 0.3 || $age >= $max_age * 1.3;

    # set() bumps the version, so step it back first: net version unchanged
    $self->{vers}->{$key}--;

    return $self->set( $key => $value );
}
470              
# _gc()
#
# Periodic garbage collection, run by the gc timer created in new when
# max_age is set. Deletes every live entry whose timestamp is older than
# 1.3 * max_age. Deletion goes through delete(), so it is versioned and
# propagated like any other update. Each key is re-checked at its turn.
sub _gc {
    my ($self) = @_;

    my $cutoff = time() - $self->{max_age} * 1.3;

    for my $key (keys %{ $self->{data} }) {
        my $expired = $self->{time}->{$key} < $cutoff
                   && defined $self->{data}->{$key};

        $self->delete($key) if $expired;
    }
}
484              
# _save_state()
#
# Writes dump() as JSON to /tmp/beekeeper-sharedcache-<pool_id>-<id>.dump,
# the file _load_state reads. This is best effort and is skipped when:
#  - the cache never finished syncing (presumably so that an incomplete cache
#    never overwrites a good dump);
#  - the file was already written during the current second;
#  - the file cannot be opened, locked (non-blocking) or truncated.
sub _save_state {
    my ($self) = @_;

    return if !$self->{synced};

    my $cache_id  = $self->{id};
    my ($pool_id) = $self->{pool_id} =~ m/^([\w-]+)$/;    # untaint
    my $dump_file = "/tmp/beekeeper-sharedcache-$pool_id-$cache_id.dump";

    # Avoid stampede when several workers are exiting simultaneously
    if (-e $dump_file) {
        my $mtime = (stat $dump_file)[9];
        return if $mtime == time();
    }

    # Several workers may try to write at once: only the one holding the
    # exclusive lock writes, the others give up immediately
    sysopen(my $fh, $dump_file, O_RDWR | O_CREAT) or return;
    flock($fh, LOCK_EX | LOCK_NB)                  or return;
    truncate($fh, 0)                               or return;

    print $fh encode_json( $self->dump );

    close($fh);
}
506              
# _load_state()
#
# Seeds the cache from the dump file written by _save_state
# (/tmp/beekeeper-sharedcache-<pool_id>-<id>.dump). It does nothing if the
# file is missing, older than max_age, or does not contain a valid dump.
# Entries older than max_age are skipped. Every loaded entry goes through
# _merge, so on_update is called for it. Dies if the file exists but cannot
# be opened.
sub _load_state {
    my $self = shift;

    my $id = $self->{id};
    my ($pool_id) = ($self->{pool_id} =~ m/^([\w-]+)$/); # untaint
    my $tmp_file = "/tmp/beekeeper-sharedcache-$pool_id-$id.dump";

    return unless (-e $tmp_file);

    # Do not load stale dumps
    return if ($self->{max_age} && (stat($tmp_file))[9] < time() - $self->{max_age});

    # Slurp the file. $/ is localized only around the read: a sub-wide
    # local($/) would still be in effect inside _merge and the user's
    # on_update callbacks, breaking any line-oriented I/O they do.
    open(my $fh, '<', $tmp_file) or die "Couldn't read $tmp_file: $!";
    my $data = do { local $/; <$fh> };
    close($fh);

    local $@;
    my $dump = eval { decode_json($data) };

    # decode_json may also return a non-hash value (e.g. when non-refs are
    # accepted), which would make the dereferences below die
    return unless (ref $dump eq 'HASH' && ref $dump->{dump} eq 'ARRAY');

    my $min_time = $self->{max_age} ? time() - $self->{max_age} : undef;

    foreach my $entry (@{$dump->{dump}}) {
        next unless ref $entry eq 'ARRAY';
        # Do not merge stale entries
        next if ($min_time && $entry->[3] < $min_time);
        $self->_merge($entry);
    }
}
536              
# _disconnect()
#
# Shutdown hook, called from DESTROY. Saves the state when persistence is
# enabled, then closes those dedicated connections to other buses of the
# group that are currently connected. The worker's own bus is not part of
# _BUS_GROUP and is left alone.
sub _disconnect {
    my ($self) = @_;

    if ($self->{persist}) {
        $self->_save_state;
    }

    foreach my $connection ( @{ $self->{_BUS_GROUP} } ) {
        $connection->disconnect if $connection->{is_connected};
    }
}
548              
# DESTROY
#
# When the cache goes away, saves its state (if persist is enabled) and
# disconnects its extra bus connections via _disconnect. Destructors run at
# unpredictable points of the caller's code. The file operations in
# _save_state (sysopen, flock, stat) can set $!, and any eval further down
# resets $@, so both are localized to keep the caller's values intact.
sub DESTROY {
    my $self = shift;

    local ($@, $!);

    $self->_disconnect;
}
554              
555             1;
556              
557             __END__