File Coverage

blib/lib/IPC/PubSub/Cache.pm
Criterion Covered Total %
statement 55 57 96.4
branch 5 6 83.3
condition 2 2 100.0
subroutine 14 15 93.3
pod 0 8 0.0
total 76 88 86.3


line stmt bran cond sub pod time code
1             package IPC::PubSub::Cache;
2 2     2   12 use strict;
  2         2  
  2         60  
3 2     2   9 use warnings;
  2         2  
  2         52  
4 2     2   9 use File::Spec;
  2         3  
  2         45  
5 2     2   1953 use Time::HiRes ();
  2         4388  
  2         561  
6              
7             #method fetch (Str *@keys --> List of Pair) { ... }
8             #method store (Str $key, Str $val, Num $time, Num $expiry) { ... }
9              
10             #method add_publisher (Str $chan, Str $pub) { ... }
11             #method remove_publisher (Str $chan, Str $pub) { ... }
12              
13             #method get_index (Str $chan, Str $pub --> Int) { ... }
14             #method set_index (Str $chan, Str $pub, Int $index) { ... }
15              
16             #method publisher_indices (Str $chan --> Hash of Int) { ... }
17              
18             sub fetch_data {
19 8     8 0 12 my $self = shift;
20 8         12 my $key = shift;
21 8   100     40 return (($self->fetch("data:$key"))[0] || [])->[-1];
22             }
23              
24             sub store_data {
25 6     6 0 11 my $self = shift;
26 6         8 my $key = shift;
27 6         10 my $val = shift;
28 6         28 $self->store("data:$key" => $val, -1, 0);
29             }
30              
31             sub modify {
32 12     12 0 21 my $self = shift;
33 12         20 my $key = shift;
34 12 100       53 return $self->fetch_data($key) unless @_;
35              
36 6         11 my $with = shift;
37              
38 6 100       19 if (ref($with) eq 'CODE') {
39 2         19 $self->lock("data:$key");
40 2         7 local $_ = $self->fetch_data($key);
41 2         895 my $rv = $with->();
42 2         18 $self->store_data($key => $_);
43 2         1344 $self->unlock("data:$key");
44 2         12 return $rv;
45             }
46             else {
47 4         24 $self->store_data($key => $with);
48 4         3289 return $with;
49             }
50             }
51              
52             sub get {
53 12     12 0 29 my ($self, $chan, $orig, $curr) = @_;
54              
55 2     2   14 no warnings 'uninitialized';
  2         4  
  2         487  
56 8         2392 sort { $a->[0] <=> $b->[0] } $self->fetch(
  8         15  
57             map {
58 12         45 my $pub = $_;
59 8         21 my $index = $curr->{$pub};
60 10         133 map {
61 8         50 "chan:$chan-$pub$_"
62             } (($orig->{$pub}+1) .. $index);
63             } keys(%$curr)
64             );
65             }
66              
67             sub put {
68 16     16 0 47 my ($self, $chan, $pub, $index, $msg, $expiry) = @_;
69 16         147 $self->store("chan:$chan-$pub$index", $msg, Time::HiRes::time(), $expiry);
70 16         23478 $self->set_index($chan, $pub, $index);
71             }
72              
73              
74 2     2   12 use constant LOCK => File::Spec->catdir(File::Spec->tmpdir, 'IPC-PubSub-lock-');
  2         11  
  2         618  
75              
76             my %locks;
77             sub lock {
78 2     2 0 4 my ($self, $chan) = @_;
79 2         7 for my $i (1..10) {
80 2 50       310 return if mkdir((LOCK . unpack("H*", $chan)), 0777);
81 0         0 Time::HiRes::usleep(rand(250000)+250000);
82             }
83             }
84              
85 0     0 0 0 sub disconnect {
86             }
87              
88             END {
89 2     2   274 rmdir(LOCK . unpack("H*", $_)) for keys %locks;
90             }
91              
92             sub unlock {
93 2     2 0 6 my ($self, $chan) = @_;
94 2         318 rmdir(LOCK . unpack("H*", $chan));
95 2         6 delete $locks{$chan};
96             }
97              
98             1;