File Coverage

blib/lib/POE/Component/MessageQueue/Storage/Double.pm
Criterion Covered Total %
statement 68 84 80.9
branch 16 26 61.5
condition 6 12 50.0
subroutine 22 25 88.0
pod 0 7 0.0
total 112 154 72.7


line stmt bran cond sub pod time code
1             #
2             # Copyright 2007 Paul Driver <frodwith@gmail.com>
3             #
4             # This program is free software: you can redistribute it and/or modify
5             # it under the terms of the GNU General Public License as published by
6             # the Free Software Foundation, either version 2 of the License, or
7             # (at your option) any later version.
8             #
9             # This program is distributed in the hope that it will be useful,
10             # but WITHOUT ANY WARRANTY; without even the implied warranty of
11             # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12             # GNU General Public License for more details.
13             #
14             # You should have received a copy of the GNU General Public License
15             # along with this program. If not, see <http://www.gnu.org/licenses/>.
16             #
17              
18             package POE::Component::MessageQueue::Storage::Double;
19 12     12   7547 use Moose::Role;
  12         8569  
  12         90  
20 12     12   63982 use MooseX::MultiInitArg;
  12         244325  
  12         1929  
21              
22             # These guys just call a method on both front and back stores and have a
23             # simple no-arg completion callback. No reason to write them all!
24             foreach my $method (qw(empty disown_destination disown_all)) {
25             __PACKAGE__->meta->add_method($method, sub {
26 1214     1214   4871662 my $self = shift;
        1214      
        1214      
27 1214         3044 my $last = pop;
28 1214 50       4827 if(ref $last eq 'CODE')
29             {
30 1214         4053 my @args = @_;
31             $self->front->$method(@args, sub {
32 1214     1214   42119 $self->back->$method(@args, $last);
33 1214         52634 });
34             }
35             else
36             {
37 0         0 $self->front->$method(@_, $last);
38 0         0 $self->back->$method(@_, $last);
39             }
40             });
41             }
42              
43             with qw(POE::Component::MessageQueue::Storage);
44 12     12   6059 use POE::Component::MessageQueue::Storage::BigMemory;
  12         50  
  12         11584  
45              
46             has front => (
47             metaclass => 'MultiInitArg',
48             init_args => ['front_store'],
49             is => 'ro',
50             does => qw(POE::Component::MessageQueue::Storage),
51             default => sub {POE::Component::MessageQueue::Storage::BigMemory->new()},
52             required => 1,
53             );
54              
55             has back => (
56             is => 'ro',
57             metaclass => 'MultiInitArg',
58             init_args => [qw(back_store storage)],
59             does => qw(POE::Component::MessageQueue::Storage),
60             required => 1,
61             );
62              
63             # Any true value for a given ID means the message is in the front store.
64             # (value may be useful data, like message size)
65             has front_info => (
66             is => 'ro',
67             isa => 'HashRef',
68             default => sub { {} },
69             traits => ['Hash'],
70             handles => {
71             'in_front' => 'exists',
72             'get_front' => 'get',
73             'set_front' => 'set',
74             'clear_front' => 'clear',
75             'delete_front' => 'delete',
76             },
77             );
78              
79             after 'set_logger' => sub {
80             my ($self, $logger) = @_;
81             $self->front->set_logger($logger);
82             $self->back->set_logger($logger);
83             };
84              
85             sub in_back
86             {
87 446     446 0 1101 my ($self, $id) = @_;
88 446 100       17946 return 1 unless $self->in_front($id);
89 361         13667 return $self->get_front($id)->{persisted};
90             }
91              
92             sub _split_ids
93             {
94 72     72   164 my ($self, $ids) = @_;
95 72         156 my (@fids, @bids);
96 72         202 foreach my $id (@$ids)
97             {
98 78 50       3580 push (@fids, $id) if $self->in_front($id);
99 78 50       261 push (@bids, $id) if $self->in_back($id);
100             }
101 72         245 return (\@fids, \@bids);
102             }
103              
104             sub _doboth
105             {
106 66     66   171 my ($self, $ids, $do_front, $do_back, $callback) = @_;
107 66         195 my ($fids, $bids) = $self->_split_ids($ids);
108              
109 66 50 33     315 if (@$fids && @$bids)
    50          
    50          
110             {
111 0     0   0 $do_front->($fids, sub {$do_back->($bids, $callback)});
  0         0  
112             }
113             elsif(@$fids)
114             {
115 0         0 $do_front->($fids, $callback);
116             }
117             elsif(@$bids)
118             {
119 66         175 $do_back->($bids, $callback);
120             }
121             else
122             {
123 0         0 goto $callback;
124             }
125             }
126              
127             sub remove
128             {
129 63     63 0 170 my ($self, $aref, $callback) = @_;
130             $self->_doboth(
131             $aref,
132             sub {
133 0     0   0 my ($ids, $callback) = @_;
134 0         0 $self->delete_front($ids);
135 0         0 $self->front->remove($ids, $callback);
136             },
137             sub {
138 63     63   132 my ($ids, $callback) = @_;
139 63         2647 $self->back->remove($ids, $callback);
140             },
141 63         466 $callback,
142             );
143             }
144              
145             sub claim
146             {
147 3     3 0 11 my ($self, $aref, $client, $callback) = @_;
148             $self->_doboth(
149             $aref,
150 0     0   0 sub {$self->front->claim($_[0], $client, $_[1])},
151 3     3   119 sub {$self->back ->claim($_[0], $client, $_[1])},
152 3         34 $callback,
153             );
154             }
155              
156             sub get
157             {
158 6     6 0 24 my ($self, $ids, $callback) = @_;
159 6         33 my ($fids, $bids) = $self->_split_ids($ids);
160             $self->front->get($fids, sub {
161 6 50   6   27 goto $callback unless @$bids; # Avoid backstore call
162 6         17 my $got_front = $_[0];
163             $self->back->get($bids, sub {
164 6         15 my $got_back = $_[0];
165 6         18 push(@$got_back, @$got_front);
166 6         44 goto $callback;
167 6         247 });
168 6         251 });
169             }
170              
171             sub get_all
172             {
173 9     9 0 308 my ($self, $callback) = @_;
174 9         23 my %messages; # store in a hash to ensure uniqueness
175             $self->front->get_all(sub {
176 9     9   26 $messages{$_->id} = $_ foreach @{$_[0]};
  9         114  
177             $self->back->get_all(sub {
178 9         20 $messages{$_->id} = $_ foreach @{$_[0]};
  9         217  
179 9         253 @_ = ([values %messages]);
180 9         170 goto $callback;
181 9         364 });
182 9         373 });
183             }
184              
185             sub get_oldest
186             {
187 3     3 0 14546 my ($self, $callback) = @_;
188             $self->front->get_oldest(sub {
189 3     3   9 my $f = $_[0];
190             $self->back->get_oldest(sub {
191 3         7 my $b = $_[0];
192 3 50 66     63 @_ = (
    100 33        
193             ($f && $b) ?
194             ($f->timestamp < $b->timestamp ? $f : $b) :
195             ($f || $b)
196             );
197 3         20 goto $callback;
198 3         98 });
199 3         122 });
200             }
201              
202             sub claim_and_retrieve
203             {
204 1215     1215 0 3053315 my ($self, $destination, $client_id, $callback) = @_;
205              
206             $self->front->claim_and_retrieve($destination, $client_id, sub {
207 1215 100   1215   3481 if (my $msg = $_[0])
208             {
209             # We don't need to claim unless it's in the backstore already
210 115 50       3286 goto $callback unless ($self->in_back($msg->id));
211             $self->back->claim($msg->id, $client_id, sub {
212 0         0 @_ = ($msg);
213 0         0 goto $callback;
214 0         0 });
215             }
216             else
217             {
218             $self->back->claim_and_retrieve($destination, $client_id, sub {
219 1100         2661 my $msg = $_[0];
220 1100 50 66     35752 goto $callback
221             if (not defined $msg or not $self->in_front($msg->id));
222              
223             $self->front->claim($msg->id, $client_id, sub {
224 0           @_ = ($msg);
225 0           goto $callback;
226 0           });
227 1100         37024 });
228             }
229 1215         43045 });
230             }
231              
232             1;
233              
234             __END__
235              
236             =pod
237              
238             =head1 NAME
239              
240             POE::Component::MessageQueue::Storage::Double -- Stores composed of two other
241             stores.
242            
243             =head1 DESCRIPTION
244              
245             Refactor mercilessly, as they say. They also say don't repeat yourself. This
246             module contains functionality for any store that is a composition of two
247             stores. At least Throttled and Complex share this trait, and it doesn't make
248             any sense to duplicate code between them.
249              
250             =head1 CONSTRUCTOR PARAMETERS
251              
252             =over 2
253              
254             =item front => SCALAR
255              
256             =item back => SCALAR
257              
258             Takes a reference to a storage engine to use as the front store / back store.
259              
260             =back
261              
262             =head1 Unimplemented Methods
263              
264             =over 2
265              
266             =item store
267              
268             This isn't implemented because Complex and Throttled differ here. Perhaps
269             your storage differs here as well. This is essentially where you specify
270             policy about what goes in which store. Be sure you update the front_info hash
271             when you store something!
272              
273             =item storage_shutdown
274              
275             And this is where you specify policy about what happens when you die. You
276             lucky person, you.
277              
278             =back
279              
280             =cut