File Coverage

blib/lib/POE/Component/MessageQueue/Storage/Generic.pm
Criterion Covered Total %
statement 41 50 82.0
branch 1 4 25.0
condition 0 6 0.0
subroutine 19 20 95.0
pod 0 2 0.0
total 61 82 74.3


line stmt bran cond sub pod time code
1             #
2             # Copyright 2007-2010 David Snopek <dsnopek@gmail.com>
3             #
4             # This program is free software: you can redistribute it and/or modify
5             # it under the terms of the GNU General Public License as published by
6             # the Free Software Foundation, either version 2 of the License, or
7             # (at your option) any later version.
8             #
9             # This program is distributed in the hope that it will be useful,
10             # but WITHOUT ANY WARRANTY; without even the implied warranty of
11             # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12             # GNU General Public License for more details.
13             #
14             # You should have received a copy of the GNU General Public License
15             # along with this program. If not, see <http://www.gnu.org/licenses/>.
16             #
17              
18             package POE::Component::MessageQueue::Storage::Generic;
19 11     11   6266 use Moose;
  11         42  
  11         70  
20 11     11   65972 use POE;
  11         29  
  11         88  
21 11     11   10143 use POE::Component::Generic 0.1001;
  11         280917  
  11         326  
22 11     11   100 use POE::Component::MessageQueue::Logger;
  11         27  
  11         2516  
23              
24             # We're going to proxy some methods to the generic object. Yay MOP!
25             my @proxy_methods = qw(
26             get get_all
27             get_oldest claim_and_retrieve
28             claim empty
29             remove store
30             disown_all disown_destination
31             );
32             foreach my $method (@proxy_methods)
33             {
34             __PACKAGE__->meta->add_method($method, sub {
35 4302     4302   8862176 my ($self, @args) = @_;
        4302      
        4302      
        4302      
        4302      
        4302      
        4302      
36 4302         170922 $self->generic->yield(
37             $method,
38             {session => $self->alias, event => '_general_handler'},
39             @args,
40             );
41 4302         572405 return;
42             });
43             }
44              
45             # Have to do with after we add those methods, or the role will fail.
46             with qw(POE::Component::MessageQueue::Storage);
47              
48             has alias => (
49             is => 'ro',
50             isa => 'Str',
51             default => 'MQ-Storage-Generic',
52             required => 1,
53             );
54              
55             has generic => (
56             is => 'rw',
57             isa => 'POE::Component::Generic',
58             );
59              
60             has package => (
61             is => 'ro',
62             isa => 'Str',
63             required => 1,
64             );
65              
66             has options => (
67             is => 'rw',
68             isa => 'ArrayRef',
69             default => sub { [] },
70             );
71              
72             # Because PoCo::Generic needs the constructor options passed to it in this
73             # funny way, we have to set up generic in BUILD.
74             sub BUILD
75             {
76 7     7 0 14003 my $self = $_[0];
77              
78 7         185 POE::Session->create(
79             object_states => [
80             $self => [qw(_general_handler _log_proxy _error _start _shutdown)],
81             ],
82             );
83              
84 7         2399 $self->generic(POE::Component::Generic->spawn(
85             package => $self->package,
86             object_options => $self->options,
87             packages => {
88             $self->package, {
89             callbacks => [@proxy_methods, qw(storage_shutdown)],
90             postbacks => [qw(set_log_function)],
91             },
92             },
93             error => {
94             session => $self->alias,
95             event => '_error'
96             },
97             #debug => 1,
98             #verbose => 1,
99             ));
100              
101 7         769 $self->generic->set_log_function({}, {
102             session => $self->alias,
103             event => '_log_proxy'
104             });
105              
106 11     11   1042 use POE::Component::MessageQueue;
  11         25  
  11         3827  
107 7         4028 $self->generic->ignore_signals({},
108             POE::Component::MessageQueue->SHUTDOWN_SIGNALS);
109             };
110              
111             sub _start
112             {
113 7     7   2944 my ($self, $kernel) = @_[OBJECT, KERNEL];
114 7         342 $kernel->alias_set($self->alias);
115             }
116              
117             sub _shutdown
118             {
119 7     7   1911 my ($self, $kernel, $callback) = @_[OBJECT, KERNEL, ARG0];
120 7         314 $self->generic->shutdown();
121 7         17048 $kernel->alias_remove($self->alias);
122 7         642 $self->log('alert', 'Generic storage engine is shutdown!');
123 7         58 goto $callback;
124             }
125              
126             sub storage_shutdown
127             {
128 7     7 0 13496 my ($self, $complete) = @_;
129 7         102 $self->log('alert', 'Shutting down generic storage engine...');
130              
131             # Send the shutdown message to generic - it will come back when it's cleaned
132             # up its resources, and we can stop it for reals (as well as stop our own
133             # session).
134             $self->generic->yield('storage_shutdown', {}, sub {
135 7     7   21922 $poe_kernel->post($self->alias, '_shutdown', $complete);
136 7         255 });
137              
138 7         1112 return;
139             }
140              
141             sub _general_handler
142             {
143 4302     4302   3000584 my ($self, $kernel, $ref, $result) = @_[ OBJECT, KERNEL, ARG0, ARG1 ];
144              
145 4302 50       17564 if ( $ref->{error} )
146             {
147 0         0 $self->log('error', "Generic error: $ref->{error}");
148             }
149 4302         14151 return;
150             }
151              
152             sub _error
153             {
154 0     0   0 my ( $self, $err ) = @_[ OBJECT, ARG0 ];
155              
156 0 0       0 if ( $err->{stderr} )
157             {
158 0         0 $self->log('debug', $err->{stderr});
159             }
160             else
161             {
162 0   0     0 my $op = $err->{operation} || q{};
163 0   0     0 my $num = $err->{errnum} || q{};
164 0   0     0 my $str = $err->{errstr} || q{};
165 0         0 $self->log('error', "Generic error: $op $num $str");
166             }
167 0         0 return;
168             }
169              
170             sub _log_proxy
171             {
172 7     7   8346 my ($self, $type, $msg) = @_[ OBJECT, ARG0, ARG1 ];
173              
174 7         54 $self->log($type, $msg);
175 7         58 return;
176             }
177              
178             1;
179              
180             __END__
181              
182             =pod
183              
184             =head1 NAME
185              
186             POE::Component::MessageQueue::Storage::Generic -- Wraps storage engines that aren't asynchronous via L<POE::Component::Generic> so they can be used.
187              
188             =head1 SYNOPSIS
189              
190             use POE;
191             use POE::Component::MessageQueue;
192             use POE::Component::MessageQueue::Storage::Generic;
193             use POE::Component::MessageQueue::Storage::Generic::DBI;
194             use strict;
195              
196             # For mysql:
197             my $DB_DSN = 'DBI:mysql:database=perl_mq';
198             my $DB_USERNAME = 'perl_mq';
199             my $DB_PASSWORD = 'perl_mq';
200             my $DB_OPTIONS = undef;
201              
202             POE::Component::MessageQueue->new({
203             storage => POE::Component::MessageQueue::Storage::Generic->new({
204             package => 'POE::Component::MessageQueue::Storage::DBI',
205             options => [
206             dsn => $DB_DSN,
207             username => $DB_USERNAME,
208             password => $DB_PASSWORD,
209             options => $DB_OPTIONS
210             ],
211             })
212             });
213              
214             POE::Kernel->run();
215             exit;
216              
217             =head1 DESCRIPTION
218              
219             Wraps storage engines that aren't asynchronous via L<POE::Component::Generic> so they can be used.
220              
221             Using this module is by far the easiest way to write custom storage engines because you don't have to worry about making your operations asynchronous. This approach isn't without its down-sides, but on the whole, the simplicity is worth it.
222              
223             There is only one package currently provided designed to work with this module: L<POE::Component::MessageQueue::Storage::Generic::DBI>.
224              
225             =head1 ATTRIBUTES
226              
227             =over 2
228              
229             =item package_name
230              
231             The name of the package to wrap. Required.
232              
233             =item options
234              
235             An arrayref of the options to be passed to the supplied package's constructor.
236              
237             =back
238              
239             =head1 SEE ALSO
240              
241             L<POE::Component::MessageQueue>,
242             L<POE::Component::MessageQueue::Storage>,
243             L<POE::Component::Generic>
244              
245             I<Other storage engines:>
246              
247             L<POE::Component::MessageQueue::Storage::Memory>,
248             L<POE::Component::MessageQueue::Storage::BigMemory>,
249             L<POE::Component::MessageQueue::Storage::FileSystem>,
250             L<POE::Component::MessageQueue::Storage::DBI>,
251             L<POE::Component::MessageQueue::Storage::Generic::DBI>,
252             L<POE::Component::MessageQueue::Storage::Throttled>,
253             L<POE::Component::MessageQueue::Storage::Complex>,
254             L<POE::Component::MessageQueue::Storage::Default>
255              
256             =cut
257