File Coverage

lib/Beekeeper/Worker/Extension/SharedCache.pm
Criterion Covered Total %
statement 30 267 11.2
branch 0 94 0.0
condition 0 18 0.0
subroutine 10 46 21.7
pod 0 1 0.0
total 40 426 9.3


line stmt bran cond sub pod time code
1             package Beekeeper::Worker::Extension::SharedCache;
2              
3 1     1   1044 use strict;
  1         2  
  1         31  
4 1     1   5 use warnings;
  1         13  
  1         41  
5              
6             our $VERSION = '0.10';
7              
8 1     1   11 use Exporter 'import';
  1         2  
  1         81  
9              
10             our @EXPORT = qw( shared_cache );
11              
12              
13             sub shared_cache {
14 0     0 0   my $self = shift;
15              
16 0           Beekeeper::Worker::Extension::SharedCache::Cache->new(
17             worker => $self,
18             @_
19             );
20             }
21              
22             package
23             Beekeeper::Worker::Extension::SharedCache::Cache; # hide from PAUSE
24              
25 1     1   6 use Beekeeper::Worker ':log';
  1         1  
  1         206  
26 1     1   11 use AnyEvent;
  1         2  
  1         26  
27 1     1   6 use JSON::XS;
  1         2  
  1         78  
28 1     1   7 use Fcntl qw(:DEFAULT :flock);
  1         1  
  1         398  
29 1     1   8 use Scalar::Util 'weaken';
  1         2  
  1         49  
30 1     1   5 use Carp;
  1         2  
  1         70  
31              
32 1     1   6 use constant SYNC_REQUEST_TIMEOUT => 30;
  1         2  
  1         3830  
33              
34             # Show errors from perspective of caller
35             $Carp::Internal{(__PACKAGE__)}++;
36              
37              
38             sub new {
39 0     0     my ($class, %args) = @_;
40              
41 0           my $worker = $args{'worker'};
42 0           my $id = $args{'id'};
43 0           my $uid = "$$-" . int(rand(90000000)+10000000);
44 0           my $pool_id = $worker->{_WORKER}->{pool_id};
45              
46             my $self = {
47             id => $id,
48             uid => $uid,
49             pool_id => $pool_id,
50             resolver => $args{'resolver'},
51             on_update => $args{'on_update'},
52             persist => $args{'persist'},
53             max_age => $args{'max_age'},
54 0           refresh => $args{'refresh'},
55             synced => 0,
56             data => {},
57             vers => {},
58             time => {},
59             _BUS => undef,
60             _BUS_GROUP=> undef,
61             };
62              
63 0           bless $self, $class;
64              
65 0 0         $self->_load_state if $self->{persist};
66              
67 0           $self->_connect_to_all_brokers($worker);
68              
69 0           my $Self = $self;
70 0           weaken $Self;
71              
72 0           AnyEvent->now_update;
73              
74 0 0         if ($self->{max_age}) {
75              
76             $self->{gc_timer} = AnyEvent->timer(
77             after => $self->{max_age} * rand() + 60,
78             interval => $self->{max_age},
79 0     0     cb => sub { $Self->_gc },
80 0           );
81             }
82              
83 0 0         if ($self->{refresh}) {
84              
85             $self->{refresh_timer} = AnyEvent->timer(
86             after => $self->{refresh} * rand() + 60,
87             interval => $self->{refresh},
88 0     0     cb => sub { $Self->_send_sync_request },
89 0           );
90             }
91              
92             # Ping backend brokers to avoid disconnections due to inactivity
93             $self->{ping_timer} = AnyEvent->timer(
94             after => 60 * rand(),
95             interval => 60,
96 0     0     cb => sub { $Self->_ping_backend_brokers },
97 0           );
98              
99 0           return $self;
100             }
101              
102             sub _connect_to_all_brokers {
103 0     0     my ($self, $worker) = @_;
104 0           weaken $self;
105              
106             #TODO: using multiple shared_cache from the same worker will cause multiple bus connections
107              
108 0           my $worker_bus = $worker->{_BUS};
109 0           my $group_config = Beekeeper::Config->get_bus_group_config( bus_id => $worker_bus->bus_id );
110              
111 0           my $bus_group = $self->{_BUS_GROUP} = [];
112              
113 0           foreach my $config (@$group_config) {
114              
115 0           my $bus_id = $config->{'bus_id'};
116              
117 0 0         if ($bus_id eq $worker_bus->bus_id) {
118             # Already connected to our own bus
119 0           $self->_setup_sync_listeners($worker_bus);
120 0           $self->_send_sync_request($worker_bus);
121 0           $self->{_BUS} = $worker_bus;
122 0           weaken $self->{_BUS};
123 0           next;
124             }
125              
126 0           my $bus; $bus = Beekeeper::MQTT->new(
127             %$config,
128             bus_id => $bus_id,
129             timeout => 300,
130             on_error => sub {
131             # Reconnect
132 0   0 0     my $errmsg = $_[0] || ""; $errmsg =~ s/\s+/ /sg;
  0            
133 0           log_error "Connection to $bus_id failed: $errmsg";
134 0           my $delay = $self->{connect_err}->{$bus_id}++;
135             $self->{reconnect_tmr}->{$bus_id} = AnyEvent->timer(
136             after => ($delay < 10 ? $delay * 3 : 30),
137             cb => sub {
138             $bus->connect(
139             on_connack => sub {
140 0           log_warn "Reconnected to $bus_id";
141 0           $self->_setup_sync_listeners($bus);
142 0 0         $self->_accept_sync_requests($bus) if $self->{synced};
143             }
144 0           );
145             },
146 0 0         );
147             },
148 0           );
149              
150 0           push @$bus_group, $bus;
151              
152             $bus->connect(
153             on_connack => sub {
154             # Setup
155 0     0     log_debug "Connected to $bus_id";
156 0           $self->_setup_sync_listeners($bus);
157 0 0         $self->_accept_sync_requests($bus) if $self->{synced};
158             },
159 0           );
160             }
161             }
162              
163             sub _setup_sync_listeners {
164 0     0     my ($self, $bus) = @_;
165 0           weaken $self;
166              
167 0           my $cache_id = $self->{id};
168 0           my $uid = $self->{uid};
169 0           my $local_bus = $bus->{bus_role};
170 0           my $client_id = $bus->{client_id};
171              
172 0           my $topic = "msg/$local_bus/_sync/$cache_id/set";
173              
174             $bus->subscribe(
175             topic => $topic,
176             on_publish => sub {
177             # my ($payload_ref, $mqtt_properties) = @_;
178              
179 0     0     my $entry = decode_json( ${$_[0]} );
  0            
180              
181 0           $self->_merge($entry);
182             },
183             on_suback => sub {
184 0     0     my ($success) = @_;
185 0 0         log_error "Could not subscribe to topic '$topic'" unless $success;
186             }
187 0           );
188              
189 0           my $reply_topic = "priv/$client_id/sync-$cache_id";
190              
191             $bus->subscribe(
192             topic => $reply_topic,
193             on_publish => sub {
194 0     0     my ($payload_ref, $mqtt_properties) = @_;
195              
196 0           my $dump = decode_json($$payload_ref);
197              
198 0           $self->_merge_dump($dump);
199              
200 0           $self->_sync_completed(1);
201             },
202             on_suback => sub {
203 0     0     my ($success) = @_;
204 0 0         log_error "Could not subscribe to reply topic '$reply_topic'" unless $success;
205             }
206 0           );
207             }
208              
209             sub _send_sync_request {
210 0     0     my ($self, $bus) = @_;
211 0           weaken $self;
212              
213             # Do not send more than one sync request at the time
214 0 0         return if $self->{_sync_timeout};
215              
216 0           my $cache_id = $self->{id};
217 0           my $uid = $self->{uid};
218 0           my $local_bus = $bus->{bus_role};
219 0           my $client_id = $bus->{client_id};
220              
221 0           $bus->publish(
222             topic => "req/$local_bus/_sync/$cache_id/dump",
223             response_topic => "priv/$client_id/sync-$cache_id",
224             );
225              
226             # Ensure that timeout is set properly when the event loop was blocked
227 0           AnyEvent->now_update;
228              
229             # When a fresh pool is started there is no master to reply sync requests
230             $self->{_sync_timeout} = AnyEvent->timer(
231             after => SYNC_REQUEST_TIMEOUT,
232 0     0     cb => sub { $self->_sync_completed(0) },
233 0           );
234             }
235              
236             sub _sync_completed {
237 0     0     my ($self, $success) = @_;
238              
239 0           delete $self->{_sync_timeout};
240              
241 0 0         return if $self->{synced};
242              
243             # BUG: When a fresh pool is started there is no master to reply sync requests.
244             # When two fresh pools are started at t0 and t1 time, and (t1 - t0) < SYNC_REQUEST_TIMEOUT,
245             # cache updates in the t0-t1 range are not properly synced in the pool wich was started later
246 0 0         log_debug( "Shared cache '$self->{id}': " . ($success ? "Sync completed" : "Acting as master"));
247              
248 0           $self->{synced} = 1;
249              
250 0           foreach my $bus ( @{$self->{_BUS_GROUP}} ) {
  0            
251              
252             # Connections to other buses could have failed or be in progress
253 0 0         next unless $bus->{is_connected};
254              
255 0           $self->_accept_sync_requests($bus);
256             }
257             }
258              
259             sub _accept_sync_requests {
260 0     0     my ($self, $bus) = @_;
261 0           weaken $self;
262 0           weaken $bus;
263              
264 0           my $cache_id = $self->{id};
265 0           my $uid = $self->{uid};
266 0           my $bus_id = $bus->{bus_id};
267 0           my $local_bus = $bus->{bus_role};
268              
269 0           log_debug "Shared cache '$self->{id}': Accepting sync requests from $local_bus";
270              
271 0           my $topic = "\$share/BKPR/req/$local_bus/_sync/$cache_id/dump";
272              
273             $bus->subscribe(
274             topic => $topic,
275             on_publish => sub {
276 0     0     my ($payload_ref, $mqtt_properties) = @_;
277              
278 0           my $dump = encode_json( $self->dump );
279              
280             $bus->publish(
281 0           topic => $mqtt_properties->{'response_topic'},
282             payload => \$dump,
283             );
284             },
285             on_suback => sub {
286 0     0     my ($success) = @_;
287 0 0         log_error "Could not subscribe to topic '$topic'" unless $success;
288             }
289 0           );
290             }
291              
292             sub _ping_backend_brokers {
293 0     0     my $self = shift;
294              
295 0           foreach my $bus (@{$self->{_BUS_GROUP}}) {
  0            
296              
297 0 0         next unless $bus->{is_connected};
298 0           $bus->pingreq;
299             }
300             }
301              
302             my $_now = 0;
303              
304             sub set {
305 0     0     my ($self, $key, $value) = @_;
306 0           weaken $self;
307              
308 0 0         croak "Key value is undefined" unless (defined $key);
309              
310 0           my $old = $self->{data}->{$key};
311              
312 0           $self->{data}->{$key} = $value;
313 0           $self->{vers}->{$key}++;
314 0           $self->{time}->{$key} = Time::HiRes::time();
315              
316             my $json = encode_json([
317             $key,
318             $value,
319             $self->{vers}->{$key},
320             $self->{time}->{$key},
321             $self->{uid},
322 0           ]);
323              
324 0 0         $self->{on_update}->($key, $value, $old) if $self->{on_update};
325              
326             # Notify all workers in every cluster about the change
327 0           my @bus_group = grep { $_->{is_connected} } @{$self->{_BUS_GROUP}};
  0            
  0            
328              
329 0           unshift @bus_group, $self->{_BUS};
330              
331 0           foreach my $bus (@bus_group) {
332 0           my $local_bus = $bus->{bus_role};
333 0           my $cache_id = $self->{id};
334              
335 0           $bus->publish(
336             topic => "msg/$local_bus/_sync/$cache_id/set",
337             payload => \$json,
338             );
339             }
340              
341 0 0         unless (defined $value) {
342             # Postpone delete because it is necessary to keep the versioning
343             # of this modification until it is propagated to all workers
344              
345             # Ensure that timer is set properly when the event loop was blocked
346 0 0         if ($_now != time) { $_now = time; AnyEvent->now_update }
  0            
  0            
347              
348             $self->{_destroy}->{$key} = AnyEvent->timer( after => 60, cb => sub {
349 0     0     delete $self->{_destroy}->{$key};
350 0           delete $self->{data}->{$key};
351 0           delete $self->{vers}->{$key};
352 0           delete $self->{time}->{$key};
353 0           });
354             }
355              
356 0           return 1;
357             }
358              
359             sub get {
360 0     0     my ($self, $key) = @_;
361              
362 0           $self->{data}->{$key};
363             }
364              
365             sub delete {
366 0     0     my ($self, $key) = @_;
367              
368 0           $self->set( $key => undef );
369             }
370              
371             sub raw_data {
372 0     0     my $self = shift;
373              
374 0           $self->{data};
375             }
376              
377             sub _merge {
378 0     0     my ($self, $entry) = @_;
379              
380 0           my ($key, $value, $version, $time, $uid) = @$entry;
381              
382             # Discard updates sent by myself
383 0 0 0       return if (defined $uid && $uid eq $self->{uid});
384              
385 0 0 0       if ($version > ($self->{vers}->{$key} || 0)) {
    0          
386              
387             # Received a fresher value for the entry
388 0           my $old = $self->{data}->{$key};
389              
390 0           $self->{data}->{$key} = $value;
391 0           $self->{vers}->{$key} = $version;
392 0           $self->{time}->{$key} = $time;
393              
394 0 0         $self->{on_update}->($key, $value, $old) if $self->{on_update};
395             }
396             elsif ($version < $self->{vers}->{$key}) {
397              
398             # Received a stale value (we have a newest version)
399 0           return;
400             }
401             else {
402              
403             # Version conflict, default resolution is to keep newest value
404             my $resolver = $self->{resolver} || sub {
405 0 0   0     return $_[0]->{time} > $_[1]->{time} ? $_[0] : $_[1];
406 0   0       };
407              
408             my $keep = $resolver->(
409             { # Mine
410             data => $self->{data}->{$key},
411             vers => $self->{vers}->{$key},
412 0           time => $self->{time}->{$key},
413             },
414             { # Theirs
415             data => $value,
416             vers => $version,
417             time => $time,
418             },
419             );
420              
421 0           my $old = $self->{data}->{$key};
422              
423 0           $self->{data}->{$key} = $keep->{data};
424 0           $self->{vers}->{$key} = $keep->{vers};
425 0           $self->{time}->{$key} = $keep->{time};
426              
427 0 0         $self->{on_update}->($key, $keep->{data}, $old) if $self->{on_update};
428             }
429              
430 0 0         unless (defined $self->{data}->{$key}) {
431             # Postpone delete because it is necessary to keep the versioning
432             # of this modification until it is propagated to all workers
433              
434             # Ensure that timer is set properly when the event loop was blocked
435 0 0         if ($_now != time) { $_now = time; AnyEvent->now_update }
  0            
  0            
436              
437             $self->{_destroy}->{$key} = AnyEvent->timer( after => 60, cb => sub {
438 0     0     delete $self->{_destroy}->{$key};
439 0           delete $self->{data}->{$key};
440 0           delete $self->{vers}->{$key};
441 0           delete $self->{time}->{$key};
442 0           });
443             }
444             }
445              
446             sub dump {
447 0     0     my $self = shift;
448              
449 0           my @dump;
450              
451 0           foreach my $key (keys %{$self->{data}}) {
  0            
452             push @dump, [
453             $key,
454             $self->{data}->{$key},
455             $self->{vers}->{$key},
456 0           $self->{time}->{$key},
457             ];
458             }
459              
460             return {
461             uid => $self->{uid},
462 0           time => Time::HiRes::time(),
463             dump => \@dump,
464             };
465             }
466              
467             sub _merge_dump {
468 0     0     my ($self, $dump) = @_;
469              
470             # Discard dumps sent by myself
471 0 0         return if ($dump->{uid} eq $self->{uid});
472              
473 0           foreach my $entry (@{$dump->{dump}}) {
  0            
474 0           $self->_merge($entry);
475             }
476             }
477              
478             sub touch {
479 0     0     my ($self, $key) = @_;
480              
481 0 0         return unless defined $self->{data}->{$key};
482              
483 0 0         croak "No max_age specified (gc is disabled)" unless $self->{max_age};
484              
485 0           my $age = time() - $self->{time}->{$key};
486              
487 0 0         return unless ( $age > $self->{max_age} * 0.3);
488 0 0         return unless ( $age < $self->{max_age} * 1.3);
489              
490             # Set to current value but without increasing version
491 0           $self->{vers}->{$key}--;
492              
493 0           $self->set( $key => $self->{data}->{$key} );
494             }
495              
496             sub _gc {
497 0     0     my $self = shift;
498              
499 0           my $min_time = time() - $self->{max_age} * 1.3;
500              
501 0           foreach my $key (keys %{$self->{data}}) {
  0            
502              
503 0 0         next unless ( $self->{time}->{$key} < $min_time );
504 0 0         next unless ( defined $self->{data}->{$key} );
505              
506 0           $self->delete( $key );
507             }
508             }
509              
510             sub _save_state {
511 0     0     my $self = shift;
512              
513 0 0         return unless ($self->{synced});
514              
515 0           my $id = $self->{id};
516 0           my ($pool_id) = ($self->{pool_id} =~ m/^([\w-]+)$/); # untaint
517 0           my $tmp_file = "/tmp/beekeeper-sharedcache-$pool_id-$id.dump";
518              
519             # Avoid stampede when several workers are exiting simultaneously
520 0 0 0       return if (-e $tmp_file && (stat($tmp_file))[9] == time());
521              
522             # Lock file because several workers may try to write simultaneously to it
523 0 0         sysopen(my $fh, $tmp_file, O_RDWR|O_CREAT) or return;
524 0 0         flock($fh, LOCK_EX | LOCK_NB) or return;
525 0 0         truncate($fh, 0) or return;
526              
527 0           print $fh encode_json( $self->dump );
528              
529 0           close($fh);
530             }
531              
532             sub _load_state {
533 0     0     my $self = shift;
534              
535 0           my $id = $self->{id};
536 0           my ($pool_id) = ($self->{pool_id} =~ m/^([\w-]+)$/); # untaint
537 0           my $tmp_file = "/tmp/beekeeper-sharedcache-$pool_id-$id.dump";
538              
539 0 0         return unless (-e $tmp_file);
540              
541             # Do not load stale dumps
542 0 0 0       return if ($self->{max_age} && (stat($tmp_file))[9] < time() - $self->{max_age});
543              
544 0           local($/);
545 0 0         open(my $fh, '<', $tmp_file) or die "Couldn't read $tmp_file: $!";
546 0           my $data = <$fh>;
547 0           close($fh);
548              
549 0           local $@;
550 0           my $dump = eval { decode_json($data) };
  0            
551 0 0         return if $@;
552              
553 0 0         my $min_time = $self->{max_age} ? time() - $self->{max_age} : undef;
554              
555 0           foreach my $entry (@{$dump->{dump}}) {
  0            
556             # Do not merge stale entries
557 0 0 0       next if ($min_time && $entry->[3] < $min_time);
558 0           $self->_merge($entry);
559             }
560             }
561              
562             sub _disconnect {
563 0     0     my $self = shift;
564              
565 0 0         $self->_save_state if $self->{persist};
566              
567 0           foreach my $bus (@{$self->{_BUS_GROUP}}) {
  0            
568              
569 0 0         next unless ($bus->{is_connected});
570 0           $bus->disconnect;
571             }
572             }
573              
574             sub DESTROY {
575 0     0     my $self = shift;
576              
577 0           $self->_disconnect;
578             }
579              
580             1;
581              
582             __END__