File Coverage

blib/lib/POE/Component/MessageQueue/Storage/Default.pm
Criterion Covered Total %
statement 46 116 39.6
branch 3 18 16.6
condition 0 18 0.0
subroutine 13 27 48.1
pod 0 1 0.0
total 62 180 34.4


line stmt bran cond sub pod time code
1             #
2             # Copyright 2007, 2008 Paul Driver <frodwith@gmail.com>
3             #
4             # This program is free software: you can redistribute it and/or modify
5             # it under the terms of the GNU General Public License as published by
6             # the Free Software Foundation, either version 2 of the License, or
7             # (at your option) any later version.
8             #
9             # This program is distributed in the hope that it will be useful,
10             # but WITHOUT ANY WARRANTY; without even the implied warranty of
11             # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12             # GNU General Public License for more details.
13             #
14             # You should have received a copy of the GNU General Public License
15             # along with this program. If not, see <http://www.gnu.org/licenses/>.
16             #
17              
18             package POE::Component::MessageQueue::Storage::Default;
19              
20             # Not using moose for this cause it's just a frontend to whatever our
21             # recommended storage engine is. There's no point.
22              
23 10     10   226729 use strict;
  10         30  
  10         251  
24 10     10   50 use warnings;
  10         26  
  10         246  
25 10     10   3796 use POE::Component::MessageQueue::Storage::Throttled;
  10         46  
  10         433  
26 10     10   6024 use POE::Component::MessageQueue::Storage::DBI;
  10         43  
  10         381  
27 10     10   6134 use POE::Component::MessageQueue::Storage::FileSystem;
  10         43  
  10         384  
28 10     10   97 use POE::Component::MessageQueue::Storage::BigMemory;
  10         23  
  10         272  
29 10     10   6108 use POE::Component::MessageQueue::Storage::Complex;
  10         48  
  10         448  
30 10     10   14279 use DBI;
  10         130384  
  10         708  
31              
32 10     10   105 use constant META_SCHEMA => <<'EOF';
  10         30  
  10         983  
33             CREATE TABLE meta
34             (
35             key varchar(255) primary key,
36             value varchar(255)
37             );
38             EOF
39              
40 10     10   73 use constant MESSAGES_SCHEMA_018 => <<'EOF';
  10         26  
  10         598  
41             CREATE TABLE messages
42             (
43             message_id varchar(255) primary key,
44             destination varchar(255) not null,
45             persistent char(1) default 'Y' not null,
46             in_use_by int,
47             body text,
48             timestamp int,
49             size int
50             );
51              
52             EOF
53              
54 10     10   63 use constant MESSAGES_SCHEMA => <<'EOF';
  10         24  
  10         9125  
55             CREATE TABLE messages
56             (
57             message_id varchar(255) primary key,
58             destination varchar(255) not null,
59             persistent char(1) default 'Y' not null,
60             in_use_by varchar(255),
61             body text,
62             timestamp decimal(15,5),
63             size int,
64             deliver_at int
65             );
66              
67             CREATE INDEX id_index ON messages ( message_id );
68             CREATE INDEX timestamp_index ON messages ( timestamp );
69             CREATE INDEX destination_index ON messages ( destination );
70             CREATE INDEX in_use_by_index ON messages ( in_use_by );
71             CREATE INDEX deliver_at ON messages ( deliver_at );
72              
73             EOF
74              
75             sub _do_schema
76             {
77 24     24   143 my ($dbh, $schema) = @_;
78 24         193 foreach my $stmt ( split(";", $schema) )
79             {
80             # strip leading/trailing whitespace
81 108         541151 $stmt =~ s/^\s*//;
82 108         1283 $stmt =~ s/\s*$//;
83              
84 108 100       1208 $dbh->do($stmt) if ($stmt);
85             }
86             }
87              
88             sub _expand_version
89             {
90 0     0   0 my ($version) = @_;
91 0         0 return join('.', map { sprintf "%02d", $_ } split('\.', $version));
  0         0  
92             }
93              
94             # Hopefully, this will make adding new changes that break db compatability a
95             # little easier. Change the database schema above, then add a check for your
96             # version like the examples below.
97             sub _upgrade
98             {
99 0     0   0 my $dbh = shift;
100 0         0 my @versions = ('0.1.7', '0.1.8', '0.2.3', '0.2.9', '0.2.10');
101              
102             # Funny lexical scoping rules require this to be an anonymous sub or weird
103             # things will happen with $dbh
104             my $meta_version = sub {
105 0     0   0 my $check_version = shift;
106 0         0 my $version;
107 0         0 eval {
108 0         0 ($version) = $dbh->selectrow_array(
109             "SELECT value FROM meta WHERE key = 'version'"
110             );
111             };
112             # TODO: we need to split the version and pad parts of it with zeros for
113             # an accurate version comparison.
114 0   0     0 return (!$@) && (_expand_version($version) ge _expand_version($check_version));
115 0         0 };
116              
117             # These should return true if the test passes (no upgrade needed)
118             my %tests = (
119             # The meta-table was added in 0.1.8, so we can't use that...
120             '0.1.7' => sub {
121 0     0   0 eval {
122 0         0 $dbh->selectrow_array("SELECT timestamp, size FROM messages LIMIT 1");
123             };
124 0         0 return (!$@);
125             },
126 0     0   0 '0.1.8' => sub { $meta_version->('0.1.8') },
127 0     0   0 '0.2.3' => sub { $meta_version->('0.2.3') },
128 0     0   0 '0.2.9' => sub { $meta_version->('0.2.9') },
129 0     0   0 '0.2.10' => sub { $meta_version->('0.2.10') },
130 0         0 );
131              
132             my %repairs = (
133             '0.1.7' => sub {
134 0     0   0 $dbh->do('ALTER TABLE messages ADD COLUMN timestamp INT');
135 0         0 $dbh->do('ALTER TABLE messages ADD COLUMN size INT');
136             },
137             '0.1.8' => sub {
138             # 0.1.8 adds a meta table for version info
139 0     0   0 _do_schema($dbh, META_SCHEMA);
140 0         0 $dbh->do(q{INSERT INTO meta (key, value) VALUES ('version', '0.1.8')});
141              
142             # SQLite doesn't have a syntax for modifying column types on primary
143             # keys, and 1.8->1.9 made message_id a text field.
144              
145             # Rename old table and create new one
146 0         0 $dbh->do('ALTER TABLE messages RENAME TO old_messages');
147 0         0 _do_schema($dbh, MESSAGES_SCHEMA_018);
148              
149             # Dump old table into new table
150 0         0 my $columns = q{
151             message_id, destination, persistent,
152             in_use_by, body, timestamp, size
153             };
154              
155 0         0 $dbh->do(qq{
156             INSERT INTO messages ( $columns )
157             SELECT $columns FROM old_messages
158             });
159              
160             # Delete old table
161 0         0 $dbh->do('DROP TABLE old_messages');
162             },
163             '0.2.3' => sub {
164             # we add the deliver_at column
165 0     0   0 $dbh->do("ALTER TABLE messages ADD COLUMN deliver_at INT");
166 0         0 $dbh->do("CREATE INDEX deliver_at_index ON messages ( deliver_at )");
167              
168             # updated the version
169 0         0 $dbh->do("UPDATE meta SET value = '0.2.3' where key = 'version'");
170              
171             # databases created with 0.1.8 or later, didn't correctly add the indexes
172             # to the table (because it feeds MESSAGE_SCHEMA as a single statement to
173             # $db->do() rather than breaking it up);
174 0         0 my $indices = {
175             id_index => "message_id",
176             timestamp_index => "timestamp",
177             destination_index => "destination",
178             in_use_by_index => "in_use_by"
179             };
180 0         0 while (my ($name, $column) = each %$indices)
181             {
182             eval
183 0         0 {
184 0         0 $dbh->do("CREATE INDEX $name ON messages ( $column )");
185             };
186             }
187             },
188             '0.2.9' => sub {
189             # NOTE: Here we *would* change timestamp from INT to DECIMAL(15,5) but
190             # not only is that not possible via SQLite3's ALTER statement, but it makes
191             # no difference what so ever in SQLite3.
192            
193             # update the version
194 0     0   0 $dbh->do("UPDATE meta SET value = '0.2.9' where key = 'version'");
195             },
196             '0.2.10' => sub {
197             # NOTE: Here we *would* change in_use_by from INT to VARCHAR(255) but
198             # not only is that not possible via SQLite3's ALTER statement, but it makes
199             # no difference what so ever in SQLite3.
200            
201             # update the version
202 0     0   0 $dbh->do("UPDATE meta SET value = '0.2.10' where key = 'version'");
203             }
204 0         0 );
205              
206 0         0 my $do_repairs = 0;
207 0         0 foreach my $ver (@versions)
208             {
209 0 0       0 unless ($do_repairs)
210             {
211 0         0 my $success = $tests{$ver}->();
212 0 0       0 unless ($success)
213             {
214 0         0 $dbh->begin_work();
215 0         0 print STDERR "WARNING: User database is older than $ver.\n";
216 0         0 print STDERR "WARNING: Performing in-place upgrade...";
217 0         0 $do_repairs = 1;
218             }
219             }
220              
221 0 0       0 if ($do_repairs)
222             {
223 0         0 eval { $repairs{$ver}->() };
  0         0  
224 0 0       0 if ($@)
225             {
226 0         0 $dbh->rollback();
227 0         0 die "encountered errors: $@: rolling back.\n";
228             }
229             }
230             }
231 0 0       0 if ($do_repairs)
232             {
233 0         0 $dbh->commit();
234 0         0 print STDERR "upgrade complete.\n";
235             }
236             }
237              
238             sub _make_db
239             {
240 12     12   42055 my ($file, $dsn, $username, $password) = @_;
241 12         213 my $db_exists = (-f $file);
242 12         191 my $dbh = DBI->connect(
243             $dsn,
244             $username,
245             $password,
246             { RaiseError => 1 }
247             );
248              
249 12 50       54652 if ( $db_exists )
250             {
251 0         0 _upgrade($dbh);
252             }
253             else
254             {
255 12         91 _do_schema($dbh, MESSAGES_SCHEMA);
256 12         282 _do_schema($dbh, META_SCHEMA);
257 12         180 $dbh->do(q{INSERT INTO meta (key, value) VALUES ('version', '0.2.10')});
258             }
259 12         64006 $dbh->disconnect();
260             }
261              
262             sub new
263             {
264 0     0 0   my $class = shift;
265 0 0         my $args = (@_ > 1 ? {@_} : $_[0]);
266              
267 0   0       my $data_dir = $args->{data_dir} || die "No data dir.";
268              
269 0 0 0       (-d $data_dir) ||
270             mkdir $data_dir ||
271             die "Couldn't make data dir '$data_dir': $!";
272              
273 0           my $db_file = "$data_dir/mq.db";
274 0           my $db_dsn = "DBI:SQLite:dbname=$db_file";
275 0           my $db_username = q();
276 0           my $db_password = q();
277              
278 0           _make_db($db_file, $db_dsn, $db_username, $db_password);
279              
280 0           my $dbi = POE::Component::MessageQueue::Storage::DBI->new(
281             dsn => $db_dsn,
282             username => $db_username,
283             password => $db_password,
284             );
285              
286 0           my $fs = POE::Component::MessageQueue::Storage::FileSystem->new(
287             info_storage => $dbi,
288             data_dir => $data_dir,
289             );
290            
291             my $throttled = POE::Component::MessageQueue::Storage::Throttled->new(
292             back => $fs,
293 0   0       throttle_max => $args->{throttle_max} || 2,
294             );
295              
296             # We don't bless anything because we're just returning a Complex...
297             return POE::Component::MessageQueue::Storage::Complex->new(
298             timeout => $args->{timeout} || 4,
299             granularity => $args->{granularity} || 2,
300             front_max => $args->{front_max} || 64 * 1024 * 1024,
301             front => $args->{front} || $args->{front_store} ||
302 0   0       POE::Component::MessageQueue::Storage::BigMemory->new(),
      0        
      0        
      0        
303             back => $throttled,
304             );
305             }
306              
307             1;
308              
309             =pod
310              
311             =head1 NAME
312              
313             POE::Component::MessageQueue::Storage::Default -- The default storage engine (based on Complex), recommended for the most common case and used by mq.pl.
314              
315             =head1 SYNOPSIS
316              
317             use POE;
318             use POE::Component::MessageQueue;
319             use POE::Component::MessageQueue::Storage::Default;
320             use strict;
321              
322             my $DATA_DIR = '/tmp/perl_mq';
323              
324             POE::Component::MessageQueue->new({
325             storage => POE::Component::MessageQueue::Storage::Default->new({
326             data_dir => $DATA_DIR,
327             timeout => 4,
328             throttle_max => 2,
329              
330             # Alternative memory store available!
331             #front => POE::Component::MessageQueue::Storage::BigMemory->new(),
332             })
333             });
334              
335             POE::Kernel->run();
336             exit;
337              
338             =head1 DESCRIPTION
339              
340             This storage engine combines all the other provided engines. It uses
341             L<POE::Component::MessageQueue::Storage::BigMemory> as the front store and
342             L<POE::Component::MessageQueue::Storage::FileSystem> as the back store
343             for L<POE::Componenet::MessageQueue::Storage::Complex> and provides some other
344             sensible and recommended defaults, though you can override them in most cases.
345             Message are initially put into the front-end storage and will be moved into the
346             backend storage after a given number of seconds (defaults to 4).
347              
348             The L<POE::Component::MessageQueue::Storage::FileSystem> component used
349             internally uses L<POE::Component::MessageQueue::Storage::DBI> with a
350             L<DBD::SQLite> database. It is also throttled via
351             L<POE::Component::MessageQueue::Storage::Throttled>.
352              
353             This is the recommended storage engine. It should provide the best performance
354             while (if configured sanely) still providing a reasonable amount of persistence
355             with little risk of eating all your memory under high load. This is also the
356             only storage backend to correctly honor the persistent flag and will only
357             persist those messages with it set.
358              
359             =head1 CONSTRUCTOR PARAMETERS
360              
361             =over 2
362              
363             =item timeout => SCALAR
364              
365             The number of seconds after a message enters the front-store before it
366             expires. After this time, if the message hasn't been removed, it will be
367             moved into the backstore.
368              
369             =item granularity => SCALAR
370              
371             The number of seconds to wait between checks for timeout expiration.
372              
373             =item data_dir => SCALAR
374              
375             The directory to store the SQLite database file and the message bodies.
376              
377             =item throttle_max => SCALAR
378              
379             The max number of messages that can be sent to the DBI store at once.
380             This value is passed directly to the underlying
381             L<POE::Component::MessageQueue::Storage::Throttled>.
382              
383             =item front_max => SCALAR
384              
385             The maximum number of bytes to allow the front store to grow to. If the front
386             store grows to big, old messages will be "pushed off" to make room for new
387             messages.
388              
389             =item front => SCALAR
390              
391             An optional reference to a storage engine to use as the front store instead of
392             L<POE::Component::MessageQueue::Storage::BigMemory>.
393              
394             =back
395              
396             =head1 SUPPORTED STOMP HEADERS
397              
398             Same as L<POE::Component::MessageQueue::Storage::Complex>.
399              
400             =head1 SEE ALSO
401              
402             L<POE::Component::MessageQueue>,
403             L<POE::Component::MessageQueue::Storage>,
404             L<DBI>,
405             L<DBD::SQLite>
406              
407             I<Other storage engines:>
408              
409             L<POE::Component::MessageQueue::Storage::Memory>,
410             L<POE::Component::MessageQueue::Storage::BigMemory>,
411             L<POE::Component::MessageQueue::Storage::FileSystem>,
412             L<POE::Component::MessageQueue::Storage::DBI>,
413             L<POE::Component::MessageQueue::Storage::Generic>,
414             L<POE::Component::MessageQueue::Storage::Generic::DBI>,
415             L<POE::Component::MessageQueue::Storage::Throttled>,
416             L<POE::Component::MessageQueue::Storage::Complex>
417