File Coverage

blib/lib/POE/Component/MessageQueue/Storage.pm
Criterion Covered Total %
statement 1 3 33.3
branch n/a
condition n/a
subroutine 1 1 100.0
pod n/a
total 2 4 50.0


line stmt bran cond sub pod time code
1             #
2             # Copyright 2007-2010 David Snopek <dsnopek@gmail.com>
3             #
4             # This program is free software: you can redistribute it and/or modify
5             # it under the terms of the GNU General Public License as published by
6             # the Free Software Foundation, either version 2 of the License, or
7             # (at your option) any later version.
8             #
9             # This program is distributed in the hope that it will be useful,
10             # but WITHOUT ANY WARRANTY; without even the implied warranty of
11             # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12             # GNU General Public License for more details.
13             #
14             # You should have received a copy of the GNU General Public License
15             # along with this program. If not, see <http://www.gnu.org/licenses/>.
16             #
17              
18             package POE::Component::MessageQueue::Storage;
19 1     1   2476 use Moose::Role;
  0            
  0            
20             use POE::Component::MessageQueue::Logger;
21              
22             requires qw(
23             get get_all
24             get_oldest claim_and_retrieve
25             claim store
26             disown_all disown_destination
27             empty remove
28             storage_shutdown
29             );
30              
31             # Given a method name, makes its first argument OPTIONALLY be an aref. If it
32             # is not an aref, it normalizes it into one. If mangle_callback is true, it
33             # also unpacks it in the callback.
34             sub _areffify
35             {
36             my ($method, $mangle_callback) = @_;
37              
38             around($method => sub {
39             my @args = @_;
40             my $original = shift(@args);
41             my $arg = $args[1];
42             unless (ref $arg eq 'ARRAY')
43             {
44             $args[1] = [$arg];
45             if ($mangle_callback)
46             {
47             my $cb = $args[-1];
48             $args[-1] = sub {
49             my @response = @_;
50             my $arr = $response[0];
51             $response[0] = (@$arr > 0) ? $arr->[0] : undef;
52             @_ = @response;
53             goto $cb;
54             }
55             };
56             }
57             @_ = @args;
58             goto $original;
59             });
60             }
61              
62             _areffify($_, 1) foreach qw(get);
63             _areffify($_, 0) foreach qw(claim remove);
64              
65             has 'names' => (
66             is => 'rw',
67             isa => 'ArrayRef',
68             writer => 'set_names',
69             default => sub { [] },
70             );
71              
72             has 'namestr' => (
73             is => 'rw',
74             isa => 'Str',
75             default => q{},
76             );
77              
78             has 'children' => (
79             is => 'rw',
80             isa => 'HashRef',
81             default => sub { {} },
82             );
83              
84             has 'logger' => (
85             is => 'rw',
86             writer => 'set_logger',
87             default => sub { POE::Component::MessageQueue::Logger->new() },
88             );
89              
90             sub add_names
91             {
92             my ($self, @names) = @_;
93             my @prev_names = @{$self->names};
94             push(@prev_names, @names);
95             $self->set_names(\@prev_names);
96             }
97              
98             after 'set_names' => sub {
99             my ($self, $names) = @_;
100             while (my ($name, $store) = each %{$self->children})
101             {
102             $store->set_names([@$names, $name]);
103             }
104             $self->namestr(join(': ', @$names));
105             };
106              
107             sub log
108             {
109             my ($self, $type, $msg, @rest) = @_;
110             my $namestr = $self->namestr;
111             return $self->logger->log($type, "STORE: $namestr: $msg", @rest);
112             }
113              
114             1;
115              
116             __END__
117              
118             =pod
119              
120             =head1 NAME
121              
122             POE::Component::MessageQueue::Storage -- Parent of provided storage engines
123              
124             =head1 DESCRIPTION
125              
126             The role implemented by all storage engines. It provides a few bits of global
127             functionality, but mostly exists to define the interface for storage engines.
128              
129             =head1 CONCEPTS
130              
131             =over 2
132              
133             =item optional arefs
134              
135             Some functions take an "optional aref" as an argument. What this means is
136             that you can pass either a plain-old-scalar argument (such as a message id) or
137             an arrayref of such objects. If you pass the former, your callback (if any)
138             will receive a single value. If the latter, it will receive an arrayref.
139             Note that the normalization is done by this role - storage engines need only
140             implement the version that takes an aref, and send arefs to the callbacks.
141              
142             =item callbacks
143              
144             Every storage method has a callback as its last argument. Callbacks are Plain
145             Old Subs. If the method doesn't have some kind of return value, the callback is
146             optional and has no arguments. It's simply called so you you know the method
147             is done. If the method does have some kind of return value, the
148             callback is not optional and the argument will be said value. Return values
149             of storage functions are not significant and should never be used. Unless
150             otherwise specified, assume the functions below have plain success callbacks.
151              
152             =back
153              
154             =head1 INTERFACE
155              
156             =over 2
157              
158             =item set_logger I<SCALAR>
159              
160             Takes an object of type L<POE::Component::MessageQueue::Logger> that should be
161             used for logging. This isn't a storage method and does not have any callback
162             associated with it.
163              
164             =item store I<Message>
165              
166             Takes one or more objects of type L<POE::Component::MessageQueue::Message>
167             that should be stored.
168              
169             =item get I<optional-aref>
170              
171             Passes the message(s) specified by the passed id(s) to the callback.
172              
173             =item get_all
174              
175             =item get_oldest
176              
177             Self-explanatory.
178              
179             =item remove I<optional-aref>
180              
181             Removes the message(s) specified by the passed id(s).
182              
183             =item empty
184              
185             Deletes all messages from the storage engine.
186              
187             =item claim I<optional-aref>, I<client-id>
188              
189             Naively claims the specified messages for the specified client, even if they
190             are already claimed. This is intended to be called by stores that wrap other
191             stores to maintain synchronicity between multiple message copies - non-store
192             clients usually want claim_and_retrieve.
193              
194             =item claim_and_retrieve I<destination>, I<client-id>
195              
196             Claims the "next" message intended for I<destination> for I<client-id> and
197             passes it to the supplied callback. Storage engines are free to define what
198             "next" means, but the intended meaning is "oldest unclaimed message for this
199             destination".
200              
201             =item disown_all I<client-id>
202              
203             Disowns all messages owned by the client.
204              
205             =item disown_destination I<destination>, I<client-id>
206              
207             Disowns the message owned by the specified client on the specified
208             destination. (This should only be one message).
209              
210             =item storage_shutdown
211              
212             Starts shutting down the storage engine. The storage engine will
213             attempt to do any cleanup (persisting of messages, etc) before calling the
214             callback.
215              
216             =back
217              
218             =head1 SEE ALSO
219              
220             L<POE::Component::MessageQueue>,
221             L<POE::Component::MessageQueue::Storage::BigMemory>,
222             L<POE::Component::MessageQueue::Storage::Memory>,
223             L<POE::Component::MessageQueue::Storage::DBI>,
224             L<POE::Component::MessageQueue::Storage::FileSystem>,
225             L<POE::Component::MessageQueue::Storage::Generic>,
226             L<POE::Component::MessageQueue::Storage::Generic::DBI>,
227             L<POE::Component::MessageQueue::Storage::Double>,
228             L<POE::Component::MessageQueue::Storage::Throttled>,
229             L<POE::Component::MessageQueue::Storage::Complex>,
230             L<POE::Component::MessageQueue::Storage::Default>
231              
232             =cut