File Coverage

blib/lib/POE/Component/MessageQueue/Storage/Generic/DBI.pm
Criterion Covered Total %
statement 42 142 29.5
branch 4 38 10.5
condition 1 15 6.6
subroutine 7 34 20.5
pod 4 13 30.7
total 58 242 23.9


line stmt bran cond sub pod time code
1             #
2             # Copyright 2007-2010 David Snopek <dsnopek@gmail.com>
3             #
4             # This program is free software: you can redistribute it and/or modify
5             # it under the terms of the GNU General Public License as published by
6             # the Free Software Foundation, either version 2 of the License, or
7             # (at your option) any later version.
8             #
9             # This program is distributed in the hope that it will be useful,
10             # but WITHOUT ANY WARRANTY; without even the implied warranty of
11             # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12             # GNU General Public License for more details.
13             #
14             # You should have received a copy of the GNU General Public License
15             # along with this program. If not, see <http://www.gnu.org/licenses/>.
16             #
17              
18             package POE::Component::MessageQueue::Storage::Generic::DBI;
19 5     5   6913 use Moose;
  5         28  
  5         199  
20              
21             with qw(POE::Component::MessageQueue::Storage::Generic::Base);
22              
23 5     5   44402 use DBI;
  5         12173  
  5         446  
24 5     5   2923 use Exception::Class::DBI;
  5         42418  
  5         151  
25 5     5   1798 use Exception::Class::TryCatch;
  5         1640  
  5         8665  
26              
27 0     0 1 0 sub dsn { return $_[0]->servers->[0]->{dsn}; }
28 0     0 1 0 sub username { return $_[0]->servers->[0]->{username}; }
29 0     0 1 0 sub password { return $_[0]->servers->[0]->{password}; }
30 0     0 1 0 sub options { return $_[0]->servers->[0]->{options}; }
31              
32             has 'servers' => (
33             is => 'ro',
34             isa => 'ArrayRef[HashRef]',
35             required => 1,
36             default => sub { return [] },
37             );
38              
39             has 'mq_id' => (
40             is => 'ro',
41             isa => 'Str',
42             );
43              
44             has 'dbh' => (
45             is => 'ro',
46             isa => 'Object',
47             writer => '_dbh',
48             lazy => 1,
49             builder => '_build_dbh',
50             init_arg => undef,
51             );
52              
53             has 'cur_server' => (
54             is => 'ro',
55             isa => 'Int',
56             writer => '_cur_server',
57             default => sub { return -1 },
58             init_arg => undef,
59             );
60              
61             has max_retries => (
62             is => 'ro',
63             isa => 'Int',
64             default => 10,
65             );
66              
67             # NOT async!
68             sub _clear_claims {
69 7     7   27 my ($self) = @_;
70            
71             # Clear all this servers claims
72 7         29 my $sql = "UPDATE messages SET in_use_by = NULL";
73 7         305 my $mq_id = $self->mq_id;
74 7 50 33     95 if (defined $mq_id and $mq_id ne '') {
75 0         0 $sql .= " WHERE in_use_by LIKE '$mq_id:%'";
76             }
77              
78 7         266 $self->dbh->do($sql);
79             }
80              
81             around BUILDARGS => sub
82             {
83             my ($orig, $class) = @_;
84             my %args = @_;
85              
86             if (!defined($args{servers})) {
87             $args{servers} = [{
88             dsn => $args{dsn},
89             username => $args{username},
90             password => $args{password},
91             options => $args{options} || {},
92             }];
93             }
94              
95             return $class->$orig(%args);
96             };
97              
98             sub BUILD
99             {
100 7     7 0 12177 my ($self, $args) = @_;
101              
102 7         37 foreach my $server (@{$self->servers}) {
  7         299  
103 7 50       44 if (!defined $server->{options}) {
104 0         0 $server->{options} = {};
105             }
106              
107             # Force exception handling
108             $server->{options}->{'HandleError'} = Exception::Class::DBI->handler,
109 7         174 $server->{options}->{'PrintError'} = 0;
110 7         746 $server->{options}->{'RaiseError'} = 0;
111             }
112              
113             # This actually makes DBH connect
114 7         57 $self->_clear_claims();
115             }
116              
117             sub _build_dbh
118             {
119 7     7   31 my ($self) = @_;
120              
121 7         311 my $i = $self->cur_server + 1;
122 7         27 my $count = scalar @{$self->servers};
  7         309  
123 7         47 my @servers = map { [$_, $self->servers->[$_]] } (0 .. $count-1);
  7         242  
124 7         23 my $dbh;
125              
126             # re-arrange the server list, so that it starts on $i
127 7         40 @servers = (@servers[$i .. $count-1], @servers[0 .. $i-1]);
128              
129 7         14 while (1) {
130 7         48 foreach my $spec ( @servers ) {
131 7         21 my ($id, $server) = @$spec;
132              
133 7         79 $self->log(info => "Connecting to DB: $server->{dsn}");
134 7         18 try eval {
135 7         209 $dbh = DBI->connect($server->{dsn}, $server->{username}, $server->{password}, $server->{options});
136             };
137 7 50       6207 if (my $err = catch) {
138 0         0 $self->log(error => "Unable to connect to DB ($server->{dsn}): $err");
139 0         0 $dbh = undef;
140             }
141              
142 7 50       202 if (defined $dbh) {
143 7         359 $self->_cur_server($id);
144 7         240 return $dbh;
145             }
146             }
147              
148 0 0         if ($self->cur_server == -1) {
149             # if this is our first connection on MQ startup, we should fail loudly..
150 0           $self->log(error => "Unable to connect to database.");
151 0           die "Unable to connect to database.";
152             }
153              
154             # after trying them all we sleep for 1 second, so that we don't hot-loop and
155             # the system has a chance to get back up.
156 0           $self->log(error => "Unable to connect to any DB servers. Waiting 1 second and then retrying...");
157             # this is OK because we are in PoCo::Generic
158 0           sleep 1;
159             }
160             }
161              
162             sub _wrap {
163 0     0     my ($self, $name, $action) = @_;
164 0           my $trying = 1;
165 0           my $max_retries = $self->max_retries;
166              
167 0           while ($trying++) {
168 0 0         if ($trying >= $max_retries) {
169 0           $self->log(error =>
170             "Giving up on $name() after trying $max_retries times");
171 0           return 0;
172             }
173 0           try eval {
174 0           $action->();
175             # it was a success, so no need to try any more
176 0           $trying = 0;
177             };
178 0 0         if (my $err = catch)
179             {
180 0           $self->log(error => "Error in $name(): $err");
181 0           $self->log(error => "Going to reconnect to DB to try again...");
182 0           $self->_dbh($self->_build_dbh());
183             }
184             }
185              
186 0           return 1;
187             }
188              
189             sub _make_where
190             {
191 0     0     my $ids = shift;
192 0           return join(' OR ', map "message_id = '$_'", @$ids);
193             }
194              
195             sub _wrap_ids
196             {
197 0     0     my ($self, $ids, $name, $action) = @_;
198 0 0   0     $self->_wrap(name => sub {$action->(_make_where($ids))}) if (@$ids > 0);
  0            
199             }
200              
201             sub _make_message {
202 0     0     my ($self, $h) = @_;
203 0           my %map = (
204             id => 'message_id',
205             destination => 'destination',
206             body => 'body',
207             persistent => 'persistent',
208             claimant => 'in_use_by',
209             size => 'size',
210             timestamp => 'timestamp',
211             deliver_at => 'deliver_at',
212             );
213 0           my %args;
214 0           foreach my $field (keys %map)
215             {
216 0           my $val = $h->{$map{$field}};
217 0 0         $args{$field} = $val if (defined $val);
218             }
219             # pull only the client ID out of the in_use_by field
220 0           my $mq_id = $self->mq_id;
221 0 0 0       if (defined $mq_id and $mq_id ne '' and defined $args{claimant}) {
      0        
222 0           $args{claimant} =~ s/^$mq_id://;
223             }
224 0           return POE::Component::MessageQueue::Message->new(%args);
225             };
226              
227             sub _in_use_by {
228 0     0     my ($self, $client_id) = @_;
229 0 0 0       if (defined $client_id and defined $self->mq_id and $self->mq_id ne '') {
      0        
230 0           return $self->mq_id .":". $client_id;
231             }
232 0           return $client_id;
233             }
234              
235             # Note: We explicitly set @_ in all the storage methods in this module,
236             # because when we do our tail-calls (goto $method), we don't want to pass them
237             # anything unneccessary, particulary $callbacks.
238              
239             sub store {
240 0     0 0   my ($self, $m, $callback) = @_;
241              
242             $self->_wrap(store => sub {
243 0     0     my $sth = $self->dbh->prepare(q{
244             INSERT INTO messages (
245             message_id, destination, body,
246             persistent, in_use_by,
247             timestamp, size,
248             deliver_at
249             ) VALUES (
250             ?, ?, ?,
251             ?, ?,
252             ?, ?,
253             ?
254             )
255             });
256 0           $sth->execute(
257             $m->id, $m->destination, $m->body,
258             $m->persistent, $self->_in_use_by($m->claimant),
259             $m->timestamp, $m->size,
260             $m->deliver_at
261             );
262 0 0         }) or $self->log(error => sprintf
263             "Could not store message '%s' to queue %s", $m->body, $m->destination);
264              
265 0           @_ = ();
266 0 0         goto $callback if $callback;
267             }
268              
269             sub _get
270             {
271 0     0     my ($self, $name, $clause, $callback) = @_;
272 0           my @messages;
273             $self->_wrap($name => sub {
274 0     0     my $sth = $self->dbh->prepare("SELECT * FROM messages $clause");
275 0           $sth->execute;
276 0           my $results = $sth->fetchall_arrayref({});
277 0           @messages = map $self->_make_message($_), @$results;
278 0           });
279 0           @_ = (\@messages);
280 0           goto $callback;
281             }
282              
283             sub _get_one
284             {
285 0     0     my ($self, $name, $clause, $callback) = @_;
286             $self->_get($name, $clause, sub {
287 0     0     my $messages = $_[0];
288 0 0         @_ = (@$messages > 0 ? $messages->[0] : undef);
289 0           goto $callback;
290 0           });
291             }
292              
293             sub get
294             {
295             my ($self, $message_ids, $callback) = @_;
296             $self->_get(get => 'WHERE '._make_where($message_ids), $callback);
297             }
298              
299             sub get_all
300             {
301 0     0 0   my ($self, $callback) = @_;
302 0           $self->_get(get_all => '', $callback);
303             }
304              
305             sub get_oldest
306             {
307 0     0 0   my ($self, $callback) = @_;
308 0           $self->_get_one(get_oldest => 'ORDER BY timestamp ASC LIMIT 1', $callback);
309             }
310              
311             sub claim_and_retrieve
312             {
313 0     0 0   my ($self, $destination, $client_id, $callback) = @_;
314 0           my $time = time();
315             $self->_get_one(claim_and_retrieve => qq{
316             WHERE destination = '$destination' AND in_use_by IS NULL AND
317             (deliver_at IS NULL OR deliver_at < $time)
318             ORDER BY timestamp ASC LIMIT 1
319             }, sub {
320 0 0   0     if(my $message = $_[0])
321             {
322 0           $self->claim($message->id, $client_id)
323             }
324 0           goto $callback;
325 0           });
326             }
327              
328             sub remove
329             {
330             my ($self, $message_ids, $callback) = @_;
331             $self->_wrap_ids($message_ids, remove => sub {
332             my $where = shift;
333             $self->dbh->do("DELETE FROM messages WHERE $where");
334             });
335             @_ = ();
336             goto $callback if $callback;
337             }
338              
339             sub empty
340             {
341 0     0 0   my ($self, $callback) = @_;
342 0     0     $self->_wrap(empty => sub {$self->dbh->do("DELETE FROM messages")});
  0            
343 0           @_ = ();
344 0 0         goto $callback if $callback;
345             }
346              
347             sub claim
348             {
349             my ($self, $message_ids, $client_id, $callback) = @_;
350             my $in_use_by = $self->_in_use_by($client_id);
351             $self->_wrap_ids($message_ids, claim => sub {
352             my $where = shift;
353             $self->dbh->do(qq{
354             UPDATE messages SET in_use_by = '$in_use_by' WHERE $where
355             });
356             });
357             @_ = ();
358             goto $callback if $callback;
359             }
360              
361             sub disown_destination
362             {
363 0     0 0   my ($self, $destination, $client_id, $callback) = @_;
364 0           my $in_use_by = $self->_in_use_by($client_id);
365             $self->_wrap(disown_destination => sub {
366 0     0     $self->dbh->do(qq{
367             UPDATE messages SET in_use_by = NULL WHERE in_use_by = '$in_use_by'
368             AND destination = '$destination'
369             });
370 0           });
371 0           @_ = ();
372 0 0         goto $callback if $callback;
373             }
374              
375             sub disown_all
376             {
377 0     0 0   my ($self, $client_id, $callback) = @_;
378 0           my $in_use_by = $self->_in_use_by($client_id);
379             $self->_wrap(disown_all => sub {
380 0     0     $self->dbh->do(qq{
381             UPDATE messages SET in_use_by = NULL WHERE in_use_by = '$in_use_by'
382             });
383 0           });
384 0           @_ = ();
385 0 0         goto $callback if $callback;
386             }
387              
388             sub storage_shutdown
389             {
390 0     0 0   my ($self, $callback) = @_;
391              
392 0           $self->log(alert => 'Shutting down DBI storage engine...');
393              
394 0           $self->_clear_claims();
395 0           $self->dbh->disconnect();
396 0           @_ = ();
397 0 0         goto $callback if $callback;
398             }
399              
400             1;
401              
402             __END__
403              
404             =pod
405              
406             =head1 NAME
407              
408             POE::Component::MessageQueue::Storage::Generic::DBI -- A storage engine that uses L<DBI>
409              
410             =head1 SYNOPSIS
411              
412             use POE;
413             use POE::Component::MessageQueue;
414             use POE::Component::MessageQueue::Storage::Generic;
415             use POE::Component::MessageQueue::Storage::Generic::DBI;
416             use strict;
417              
418             # For mysql:
419             my $DB_DSN = 'DBI:mysql:database=perl_mq';
420             my $DB_USERNAME = 'perl_mq';
421             my $DB_PASSWORD = 'perl_mq';
422             my $DB_OPTIONS = undef;
423              
424             POE::Component::MessageQueue->new({
425             storage => POE::Component::MessageQueue::Storage::Generic->new({
426             package => 'POE::Component::MessageQueue::Storage::DBI',
427             options => [{
428             # if there is only one DB server
429             dsn => $DB_DSN,
430             username => $DB_USERNAME,
431             password => $DB_PASSWORD,
432             options => $DB_OPTIONS,
433              
434             # OR, if you have multiple database servers and want to failover
435             # when one goes down.
436              
437             #servers => [
438             # {
439             # dsn => $DB_SERVER1_DSN,
440             # username => $DB_SERVER1_USERNAME,
441             # password => $DB_SERVER1_PASSWORD,
442             # options => $DB_SERVER1_OPTIONS
443             # },
444             # {
445             # dsn => $DB_SERVER2_DSN,
446             # username => $DB_SERVER2_USERNAME,
447             # password => $DB_SERVER2_PASSWORD,
448             # options => $DB_SERVER2_OPTIONS
449             # },
450             #],
451             }],
452             })
453             });
454              
455             POE::Kernel->run();
456             exit;
457              
458             =head1 DESCRIPTION
459              
460             A storage engine that uses L<DBI>. All messages stored with this backend are
461             persistent.
462              
463             This module is not itself asynchronous and must be run via
464             L<POE::Component::MessageQueue::Storage::Generic> as shown above.
465              
466             Rather than using this module "directly" [1], I would suggest wrapping it inside of
467             L<POE::Component::MessageQueue::Storage::FileSystem>, to keep the message bodys on
468             the filesystem, or L<POE::Component::MessageQueue::Storage::Complex>, which is the
469             overall recommended storage engine.
470              
471             If you are only going to deal with very small messages then, possibly, you could
472             safely keep the message body in the database. However, this is still not really
473             recommended for a couple of reasons:
474              
475             =over 4
476              
477             =item *
478              
479             All database access is conducted through L<POE::Component::Generic> which maintains
480             a single forked process to do database access. So, not only must the message body be
481             communicated to this other proccess via a pipe, but only one database operation can
482             happen at once. The best performance can be achieved by having this forked process
483             do as little as possible.
484              
485             =item *
486              
487             A number of databases have hard limits on the amount of data that can be stored in
488             a BLOB (namely, SQLite, which sets an artificially lower limit than it is actually
489             capable of).
490              
491             =item *
492              
493             Keeping large amounts of BLOB data in a database is bad form anyway! Let the database do what
494             it does best: index and look-up information quickly.
495              
496             =back
497              
498             =head1 CONSTRUCTOR PARAMETERS
499              
500             =over 2
501              
502             =item dsn => SCALAR
503              
504             =item username => SCALAR
505              
506             =item password => SCALAR
507              
508             =item options => SCALAR
509              
510             =item servers => ARRAYREF
511              
512             An ARRAYREF of HASHREFs containing dsn, username, password and options. Use this when you
513             have serveral DB servers and want Storage::DBI to failover when one goes down.
514              
515             =item mq_id => SCALAR
516              
517             A string which uniquely identifies this MQ. This is required when running two MQs which
518             use the same database. If they don't have unique mq_id values, than one MQ could inadvertently
519             clear the claims set by the other, causing messages to be delivered more than once.
520              
521             =back
522              
523             =head1 SUPPORTED STOMP HEADERS
524              
525             =over 4
526              
527             =item B<persistent>
528              
529             I<Ignored>. All messages are persisted.
530              
531             =item B<expire-after>
532              
533             I<Ignored>. All messages are kept until handled.
534              
535             =item B<deliver-after>
536              
537             I<Fully Supported>.
538              
539             =back
540              
541             =head1 FOOTNOTES
542              
543             =over 4
544              
545             =item [1]
546              
547             By "directly", I still mean inside of L<POE::Component::MessageQueue::Storage::Generic> because
548             that is the only way to use this module.
549              
550             =back
551              
552             =head1 SEE ALSO
553              
554             L<POE::Component::MessageQueue>,
555             L<POE::Component::MessageQueue::Storage>,
556             L<DBI>
557              
558             I<Other storage engines:>
559              
560             L<POE::Component::MessageQueue::Storage::Memory>,
561             L<POE::Component::MessageQueue::Storage::BigMemory>,
562             L<POE::Component::MessageQueue::Storage::DBI>,
563             L<POE::Component::MessageQueue::Storage::FileSystem>,
564             L<POE::Component::MessageQueue::Storage::Generic>,
565             L<POE::Component::MessageQueue::Storage::Throttled>,
566             L<POE::Component::MessageQueue::Storage::Complex>,
567             L<POE::Component::MessageQueue::Storage::Default>
568              
569             =cut
570