File Coverage

blib/lib/POE/Component/MessageQueue/Storage/Generic/DBI.pm
Criterion Covered Total %
statement 1 3 33.3
branch n/a
condition n/a
subroutine 1 1 100.0
pod n/a
total 2 4 50.0


line stmt bran cond sub pod time code
1             #
2             # Copyright 2007-2010 David Snopek <dsnopek@gmail.com>
3             #
4             # This program is free software: you can redistribute it and/or modify
5             # it under the terms of the GNU General Public License as published by
6             # the Free Software Foundation, either version 2 of the License, or
7             # (at your option) any later version.
8             #
9             # This program is distributed in the hope that it will be useful,
10             # but WITHOUT ANY WARRANTY; without even the implied warranty of
11             # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12             # GNU General Public License for more details.
13             #
14             # You should have received a copy of the GNU General Public License
15             # along with this program. If not, see <http://www.gnu.org/licenses/>.
16             #
17              
18             package POE::Component::MessageQueue::Storage::Generic::DBI;
19 1     1   2450 use Moose;
  0            
  0            
20              
21             with qw(POE::Component::MessageQueue::Storage::Generic::Base);
22              
23             use DBI;
24             use Exception::Class::DBI;
25             use Exception::Class::TryCatch;
26              
27             sub dsn { return $_[0]->servers->[0]->{dsn}; }
28             sub username { return $_[0]->servers->[0]->{username}; }
29             sub password { return $_[0]->servers->[0]->{password}; }
30             sub options { return $_[0]->servers->[0]->{options}; }
31              
32             has 'servers' => (
33             is => 'ro',
34             isa => 'ArrayRef[HashRef]',
35             required => 1,
36             default => sub { return [] },
37             );
38              
39             has 'mq_id' => (
40             is => 'ro',
41             isa => 'Str',
42             );
43              
44             has 'dbh' => (
45             is => 'ro',
46             isa => 'Object',
47             writer => '_dbh',
48             lazy => 1,
49             builder => '_connect',
50             init_arg => undef,
51             );
52              
53             has 'cur_server' => (
54             is => 'ro',
55             isa => 'Int',
56             writer => '_cur_server',
57             default => sub { return -1 },
58             init_arg => undef,
59             );
60              
61             has max_retries => (
62             is => 'ro',
63             isa => 'Int',
64             default => 10,
65             );
66              
67             # NOT async!
68             sub _clear_claims {
69             my ($self) = @_;
70            
71             # Clear all this servers claims
72             my $sql = "UPDATE messages SET in_use_by = NULL";
73             my $mq_id = $self->mq_id;
74             if (defined $mq_id and $mq_id ne '') {
75             $sql .= " WHERE in_use_by LIKE '$mq_id:%'";
76             }
77              
78             $self->dbh->do($sql);
79             }
80              
81             around BUILDARGS => sub
82             {
83             my ($orig, $class) = @_;
84             my %args = @_;
85              
86             if (!defined($args{servers})) {
87             $args{servers} = [{
88             dsn => $args{dsn},
89             username => $args{username},
90             password => $args{password},
91             options => $args{options} || {},
92             }];
93             }
94              
95             return $class->$orig(%args);
96             };
97              
98             sub BUILD
99             {
100             my ($self, $args) = @_;
101              
102             foreach my $server (@{$self->servers}) {
103             if (!defined $server->{options}) {
104             $server->{options} = {};
105             }
106              
107             # Force exception handling
108             $server->{options}->{'HandleError'} = Exception::Class::DBI->handler,
109             $server->{options}->{'PrintError'} = 0;
110             $server->{options}->{'RaiseError'} = 0;
111             }
112              
113             # This actually makes DBH connect
114             $self->_clear_claims();
115             }
116              
117             sub _connect
118             {
119             my ($self) = @_;
120              
121             my $i = $self->cur_server + 1;
122             my $count = scalar @{$self->servers};
123             my @servers = map { [$_, $self->servers->[$_]] } (0 .. $count-1);
124             my $dbh;
125              
126             # re-arrange the server list, so that it starts on $i
127             @servers = (@servers[$i .. $count-1], @servers[0 .. $i-1]);
128              
129             while (1) {
130             foreach my $spec ( @servers ) {
131             my ($id, $server) = @$spec;
132              
133             $self->log(info => "Connecting to DB: $server->{dsn}");
134             try eval {
135             $dbh = DBI->connect($server->{dsn}, $server->{username}, $server->{password}, $server->{options});
136             };
137             if (my $err = catch) {
138             $self->log(error => "Unable to connect to DB ($server->{dsn}): $err");
139             $dbh = undef;
140             }
141              
142             if (defined $dbh) {
143             $self->_cur_server($id);
144             return $dbh;
145             }
146             }
147              
148             # if ($self->cur_server == -1) {
149             # # if this is our first connection on MQ startup, we should fail loudly..
150             # $self->log(error => "Unable to connect to database.");
151             # exit 1;
152             # }
153              
154             # after trying them all we sleep for 1 second, so that we don't hot-loop and
155             # the system has a chance to get back up.
156             $self->log(error => "Unable to connect to any DB servers. Waiting 1 second and then retrying...");
157             # this is OK because we are in PoCo::Generic
158             sleep 1;
159             }
160             }
161              
162             sub _wrap {
163             my ($self, $name, $action) = @_;
164             my $trying = 1;
165             my $max_retries = $self->max_retries;
166              
167             while ($trying++) {
168             if ($trying >= $max_retries) {
169             $self->log(error =>
170             "Giving up on $name() after trying $max_retries times");
171             return 0;
172             }
173             try eval {
174             $action->();
175             # it was a success, so no need to try any more
176             $trying = 0;
177             };
178             if (my $err = catch)
179             {
180             $self->log(error => "Error in $name(): $err");
181             $self->log(error => "Going to reconnect to DB to try again...");
182             $self->_dbh($self->_connect());
183             }
184             }
185              
186             return 1;
187             }
188              
189             sub _make_where
190             {
191             my $ids = shift;
192             return join(' OR ', map "message_id = '$_'", @$ids);
193             }
194              
195             sub _wrap_ids
196             {
197             my ($self, $ids, $name, $action) = @_;
198             $self->_wrap(name => sub {$action->(_make_where($ids))}) if (@$ids > 0);
199             }
200              
201             sub _make_message {
202             my ($self, $h) = @_;
203             my %map = (
204             id => 'message_id',
205             destination => 'destination',
206             body => 'body',
207             persistent => 'persistent',
208             claimant => 'in_use_by',
209             size => 'size',
210             timestamp => 'timestamp',
211             deliver_at => 'deliver_at',
212             );
213             my %args;
214             foreach my $field (keys %map)
215             {
216             my $val = $h->{$map{$field}};
217             $args{$field} = $val if (defined $val);
218             }
219             # pull only the client ID out of the in_use_by field
220             my $mq_id = $self->mq_id;
221             if (defined $mq_id and $mq_id ne '' and defined $args{claimant}) {
222             $args{claimant} =~ s/^$mq_id://;
223             }
224             return POE::Component::MessageQueue::Message->new(%args);
225             };
226              
227             sub _in_use_by {
228             my ($self, $client_id) = @_;
229             if (defined $client_id and defined $self->mq_id and $self->mq_id ne '') {
230             return $self->mq_id .":". $client_id;
231             }
232             return $client_id;
233             }
234              
235             # Note: We explicitly set @_ in all the storage methods in this module,
236             # because when we do our tail-calls (goto $method), we don't want to pass them
237             # anything unneccessary, particulary $callbacks.
238              
239             sub store {
240             my ($self, $m, $callback) = @_;
241              
242             $self->_wrap(store => sub {
243             my $sth = $self->dbh->prepare(q{
244             INSERT INTO messages (
245             message_id, destination, body,
246             persistent, in_use_by,
247             timestamp, size,
248             deliver_at
249             ) VALUES (
250             ?, ?, ?,
251             ?, ?,
252             ?, ?,
253             ?
254             )
255             });
256             $sth->execute(
257             $m->id, $m->destination, $m->body,
258             $m->persistent, $self->_in_use_by($m->claimant),
259             $m->timestamp, $m->size,
260             $m->deliver_at
261             );
262             }) or $self->log(error => sprintf
263             "Could not store message '%s' to queue %s", $m->body, $m->destination);
264              
265             @_ = ();
266             goto $callback if $callback;
267             }
268              
269             sub _get
270             {
271             my ($self, $name, $clause, $callback) = @_;
272             my @messages;
273             $self->_wrap($name => sub {
274             my $sth = $self->dbh->prepare("SELECT * FROM messages $clause");
275             $sth->execute;
276             my $results = $sth->fetchall_arrayref({});
277             @messages = map $self->_make_message($_), @$results;
278             });
279             @_ = (\@messages);
280             goto $callback;
281             }
282              
283             sub _get_one
284             {
285             my ($self, $name, $clause, $callback) = @_;
286             $self->_get($name, $clause, sub {
287             my $messages = $_[0];
288             @_ = (@$messages > 0 ? $messages->[0] : undef);
289             goto $callback;
290             });
291             }
292              
293             sub get
294             {
295             my ($self, $message_ids, $callback) = @_;
296             $self->_get(get => 'WHERE '._make_where($message_ids), $callback);
297             }
298              
299             sub get_all
300             {
301             my ($self, $callback) = @_;
302             $self->_get(get_all => '', $callback);
303             }
304              
305             sub get_oldest
306             {
307             my ($self, $callback) = @_;
308             $self->_get_one(get_oldest => 'ORDER BY timestamp ASC LIMIT 1', $callback);
309             }
310              
311             sub claim_and_retrieve
312             {
313             my ($self, $destination, $client_id, $callback) = @_;
314             my $time = time();
315             $self->_get_one(claim_and_retrieve => qq{
316             WHERE destination = '$destination' AND in_use_by IS NULL AND
317             (deliver_at IS NULL OR deliver_at < $time)
318             ORDER BY timestamp ASC LIMIT 1
319             }, sub {
320             if(my $message = $_[0])
321             {
322             $self->claim($message->id, $client_id)
323             }
324             goto $callback;
325             });
326             }
327              
328             sub remove
329             {
330             my ($self, $message_ids, $callback) = @_;
331             $self->_wrap_ids($message_ids, remove => sub {
332             my $where = shift;
333             $self->dbh->do("DELETE FROM messages WHERE $where");
334             });
335             @_ = ();
336             goto $callback if $callback;
337             }
338              
339             sub empty
340             {
341             my ($self, $callback) = @_;
342             $self->_wrap(empty => sub {$self->dbh->do("DELETE FROM messages")});
343             @_ = ();
344             goto $callback if $callback;
345             }
346              
347             sub claim
348             {
349             my ($self, $message_ids, $client_id, $callback) = @_;
350             my $in_use_by = $self->_in_use_by($client_id);
351             $self->_wrap_ids($message_ids, claim => sub {
352             my $where = shift;
353             $self->dbh->do(qq{
354             UPDATE messages SET in_use_by = '$in_use_by' WHERE $where
355             });
356             });
357             @_ = ();
358             goto $callback if $callback;
359             }
360              
361             sub disown_destination
362             {
363             my ($self, $destination, $client_id, $callback) = @_;
364             my $in_use_by = $self->_in_use_by($client_id);
365             $self->_wrap(disown_destination => sub {
366             $self->dbh->do(qq{
367             UPDATE messages SET in_use_by = NULL WHERE in_use_by = '$in_use_by'
368             AND destination = '$destination'
369             });
370             });
371             @_ = ();
372             goto $callback if $callback;
373             }
374              
375             sub disown_all
376             {
377             my ($self, $client_id, $callback) = @_;
378             my $in_use_by = $self->_in_use_by($client_id);
379             $self->_wrap(disown_all => sub {
380             $self->dbh->do(qq{
381             UPDATE messages SET in_use_by = NULL WHERE in_use_by = '$in_use_by'
382             });
383             });
384             @_ = ();
385             goto $callback if $callback;
386             }
387              
388             sub storage_shutdown
389             {
390             my ($self, $callback) = @_;
391              
392             $self->log(alert => 'Shutting down DBI storage engine...');
393              
394             $self->_clear_claims();
395             $self->dbh->disconnect();
396             @_ = ();
397             goto $callback if $callback;
398             }
399              
400             1;
401              
402             __END__
403              
404             =pod
405              
406             =head1 NAME
407              
408             POE::Component::MessageQueue::Storage::Generic::DBI -- A storage engine that uses L<DBI>
409              
410             =head1 SYNOPSIS
411              
412             use POE;
413             use POE::Component::MessageQueue;
414             use POE::Component::MessageQueue::Storage::Generic;
415             use POE::Component::MessageQueue::Storage::Generic::DBI;
416             use strict;
417              
418             # For mysql:
419             my $DB_DSN = 'DBI:mysql:database=perl_mq';
420             my $DB_USERNAME = 'perl_mq';
421             my $DB_PASSWORD = 'perl_mq';
422             my $DB_OPTIONS = undef;
423              
424             POE::Component::MessageQueue->new({
425             storage => POE::Component::MessageQueue::Storage::Generic->new({
426             package => 'POE::Component::MessageQueue::Storage::DBI',
427             options => [{
428             # if there is only one DB server
429             dsn => $DB_DSN,
430             username => $DB_USERNAME,
431             password => $DB_PASSWORD,
432             options => $DB_OPTIONS,
433              
434             # OR, if you have multiple database servers and want to failover
435             # when one goes down.
436              
437             #servers => [
438             # {
439             # dsn => $DB_SERVER1_DSN,
440             # username => $DB_SERVER1_USERNAME,
441             # password => $DB_SERVER1_PASSWORD,
442             # options => $DB_SERVER1_OPTIONS
443             # },
444             # {
445             # dsn => $DB_SERVER2_DSN,
446             # username => $DB_SERVER2_USERNAME,
447             # password => $DB_SERVER2_PASSWORD,
448             # options => $DB_SERVER2_OPTIONS
449             # },
450             #],
451             }],
452             })
453             });
454              
455             POE::Kernel->run();
456             exit;
457              
458             =head1 DESCRIPTION
459              
460             A storage engine that uses L<DBI>. All messages stored with this backend are
461             persistent.
462              
463             This module is not itself asynchronous and must be run via
464             L<POE::Component::MessageQueue::Storage::Generic> as shown above.
465              
466             Rather than using this module "directly" [1], I would suggest wrapping it inside of
467             L<POE::Component::MessageQueue::Storage::FileSystem>, to keep the message bodys on
468             the filesystem, or L<POE::Component::MessageQueue::Storage::Complex>, which is the
469             overall recommended storage engine.
470              
471             If you are only going to deal with very small messages then, possibly, you could
472             safely keep the message body in the database. However, this is still not really
473             recommended for a couple of reasons:
474              
475             =over 4
476              
477             =item *
478              
479             All database access is conducted through L<POE::Component::Generic> which maintains
480             a single forked process to do database access. So, not only must the message body be
481             communicated to this other proccess via a pipe, but only one database operation can
482             happen at once. The best performance can be achieved by having this forked process
483             do as little as possible.
484              
485             =item *
486              
487             A number of databases have hard limits on the amount of data that can be stored in
488             a BLOB (namely, SQLite, which sets an artificially lower limit than it is actually
489             capable of).
490              
491             =item *
492              
493             Keeping large amounts of BLOB data in a database is bad form anyway! Let the database do what
494             it does best: index and look-up information quickly.
495              
496             =back
497              
498             =head1 CONSTRUCTOR PARAMETERS
499              
500             =over 2
501              
502             =item dsn => SCALAR
503              
504             =item username => SCALAR
505              
506             =item password => SCALAR
507              
508             =item options => SCALAR
509              
510             =item servers => ARRAYREF
511              
512             An ARRAYREF of HASHREFs containing dsn, username, password and options. Use this when you
513             have serveral DB servers and want Storage::DBI to failover when one goes down.
514              
515             =item mq_id => SCALAR
516              
517             A string which uniquely identifies this MQ. This is required when running two MQs which
518             use the same database. If they don't have unique mq_id values, than one MQ could inadvertently
519             clear the claims set by the other, causing messages to be delivered more than once.
520              
521             =back
522              
523             =head1 SUPPORTED STOMP HEADERS
524              
525             =over 4
526              
527             =item B<persistent>
528              
529             I<Ignored>. All messages are persisted.
530              
531             =item B<expire-after>
532              
533             I<Ignored>. All messages are kept until handled.
534              
535             =item B<deliver-after>
536              
537             I<Fully Supported>.
538              
539             =back
540              
541             =head1 FOOTNOTES
542              
543             =over 4
544              
545             =item [1]
546              
547             By "directly", I still mean inside of L<POE::Component::MessageQueue::Storage::Generic> because
548             that is the only way to use this module.
549              
550             =back
551              
552             =head1 SEE ALSO
553              
554             L<POE::Component::MessageQueue>,
555             L<POE::Component::MessageQueue::Storage>,
556             L<DBI>
557              
558             I<Other storage engines:>
559              
560             L<POE::Component::MessageQueue::Storage::Memory>,
561             L<POE::Component::MessageQueue::Storage::BigMemory>,
562             L<POE::Component::MessageQueue::Storage::DBI>,
563             L<POE::Component::MessageQueue::Storage::FileSystem>,
564             L<POE::Component::MessageQueue::Storage::Generic>,
565             L<POE::Component::MessageQueue::Storage::Throttled>,
566             L<POE::Component::MessageQueue::Storage::Complex>,
567             L<POE::Component::MessageQueue::Storage::Default>
568              
569             =cut
570