line |
stmt |
bran |
cond |
sub |
pod |
time |
code |
1
|
|
|
|
|
|
|
# |
2
|
|
|
|
|
|
|
# Copyright 2007, 2008 Paul Driver <frodwith@gmail.com> |
3
|
|
|
|
|
|
|
# |
4
|
|
|
|
|
|
|
# This program is free software: you can redistribute it and/or modify |
5
|
|
|
|
|
|
|
# it under the terms of the GNU General Public License as published by |
6
|
|
|
|
|
|
|
# the Free Software Foundation, either version 2 of the License, or |
7
|
|
|
|
|
|
|
# (at your option) any later version. |
8
|
|
|
|
|
|
|
# |
9
|
|
|
|
|
|
|
# This program is distributed in the hope that it will be useful, |
10
|
|
|
|
|
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of |
11
|
|
|
|
|
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
12
|
|
|
|
|
|
|
# GNU General Public License for more details. |
13
|
|
|
|
|
|
|
# |
14
|
|
|
|
|
|
|
# You should have received a copy of the GNU General Public License |
15
|
|
|
|
|
|
|
# along with this program. If not, see <http://www.gnu.org/licenses/>. |
16
|
|
|
|
|
|
|
# |
17
|
|
|
|
|
|
|
|
18
|
|
|
|
|
|
|
package POE::Component::MessageQueue::Storage::Default; |
19
|
|
|
|
|
|
|
|
20
|
|
|
|
|
|
|
# Not using Moose for this because it's just a frontend to whatever our |
21
|
|
|
|
|
|
|
# recommended storage engine is. There's no point. |
22
|
|
|
|
|
|
|
|
23
|
10
|
|
|
10
|
|
226729
|
use strict; |
|
10
|
|
|
|
|
30
|
|
|
10
|
|
|
|
|
251
|
|
24
|
10
|
|
|
10
|
|
50
|
use warnings; |
|
10
|
|
|
|
|
26
|
|
|
10
|
|
|
|
|
246
|
|
25
|
10
|
|
|
10
|
|
3796
|
use POE::Component::MessageQueue::Storage::Throttled; |
|
10
|
|
|
|
|
46
|
|
|
10
|
|
|
|
|
433
|
|
26
|
10
|
|
|
10
|
|
6024
|
use POE::Component::MessageQueue::Storage::DBI; |
|
10
|
|
|
|
|
43
|
|
|
10
|
|
|
|
|
381
|
|
27
|
10
|
|
|
10
|
|
6134
|
use POE::Component::MessageQueue::Storage::FileSystem; |
|
10
|
|
|
|
|
43
|
|
|
10
|
|
|
|
|
384
|
|
28
|
10
|
|
|
10
|
|
97
|
use POE::Component::MessageQueue::Storage::BigMemory; |
|
10
|
|
|
|
|
23
|
|
|
10
|
|
|
|
|
272
|
|
29
|
10
|
|
|
10
|
|
6108
|
use POE::Component::MessageQueue::Storage::Complex; |
|
10
|
|
|
|
|
48
|
|
|
10
|
|
|
|
|
448
|
|
30
|
10
|
|
|
10
|
|
14279
|
use DBI; |
|
10
|
|
|
|
|
130384
|
|
|
10
|
|
|
|
|
708
|
|
31
|
|
|
|
|
|
|
|
32
|
10
|
|
|
10
|
|
105
|
# SQL for the bookkeeping table.  The only row this module writes to it is
# ('version', <schema version>), which _upgrade() reads back to decide
# whether an existing database needs in-place repairs.
use constant META_SCHEMA => <<'EOF';
CREATE TABLE meta
(
    key varchar(255) primary key,
    value varchar(255)
);
EOF

# The messages table as of schema version 0.1.8.  Only used by the 0.1.8
# repair step in _upgrade(), which rebuilds the table from its renamed
# predecessor.  It deliberately carries no indexes and no deliver_at column;
# later repair steps add those.
use constant MESSAGES_SCHEMA_018 => <<'EOF';
CREATE TABLE messages
(
    message_id varchar(255) primary key,
    destination varchar(255) not null,
    persistent char(1) default 'Y' not null,
    in_use_by int,
    body text,
    timestamp int,
    size int
);

EOF

# The current messages table plus its indexes, used for brand new databases.
# This is a multi-statement script: feed it through _do_schema(), which
# splits it on ';', rather than handing it to $dbh->do() in one piece.
#
# Every index is named <column>_index.  The deliver_at index follows the
# same pattern so that a fresh database ends up with the same index names
# as one brought forward by the 0.2.3 repair step in _upgrade().
use constant MESSAGES_SCHEMA => <<'EOF';
CREATE TABLE messages
(
    message_id varchar(255) primary key,
    destination varchar(255) not null,
    persistent char(1) default 'Y' not null,
    in_use_by varchar(255),
    body text,
    timestamp decimal(15,5),
    size int,
    deliver_at int
);

CREATE INDEX id_index ON messages ( message_id );
CREATE INDEX timestamp_index ON messages ( timestamp );
CREATE INDEX destination_index ON messages ( destination );
CREATE INDEX in_use_by_index ON messages ( in_use_by );
CREATE INDEX deliver_at_index ON messages ( deliver_at );

EOF
74
|
|
|
|
|
|
|
|
75
|
|
|
|
|
|
|
# Execute a multi-statement SQL script one statement at a time.
#
#   $dbh    - handle whose do() method runs a single statement
#   $schema - SQL text in which statements are separated by ';'
#
# The script is cut at every ';'.  Each piece has its surrounding whitespace
# removed, and pieces that are false after trimming (such as the blank tail
# after the last ';') are not sent to the handle.
sub _do_schema
{
    my ($dbh, $schema) = @_;

    for my $sql ( split(";", $schema) )
    {
        # Trim both ends so blank pieces collapse to the empty string.
        $sql =~ s/\A\s+//;
        $sql =~ s/\s+\z//;

        next unless $sql;
        $dbh->do($sql);
    }
}
87
|
|
|
|
|
|
|
|
88
|
|
|
|
|
|
|
# Turn a dotted version such as '0.2.10' into a form that compares correctly
# with the string operators (lt, ge, ...): every dot-separated component is
# zero-padded to two digits, so '0.2.10' becomes '00.02.10'.
#
#   $version - dotted version string, or undef when no version is known
#
# Returns the padded string.  An undefined version yields the empty string,
# which sorts before every real version.
#
# Note: the padding width is two digits, so string comparison of the result
# is only meaningful while every component stays below 100.
sub _expand_version
{
    my ($version) = @_;

    # Without this guard an undefined version produced the same empty
    # result, but only after emitting "uninitialized value" warnings.
    return '' unless defined $version;

    return join('.', map { sprintf "%02d", $_ } split('\.', $version));
}
93
|
|
|
|
|
|
|
|
94
|
|
|
|
|
|
|
# Hopefully, this will make adding new changes that break db compatibility a
# little easier.  Change the database schema above, then add a check for your
# version like the examples below.
#
# Bring an existing database up to the current schema, in place.
#
#   $dbh - connected database handle; the code below relies on failed
#          statements raising exceptions (they are trapped with eval).
#
# The known schema versions are walked oldest first.  For each one its test
# is run until the first test fails; from that version onwards every repair
# is applied, all inside a single transaction.  If a repair throws, the
# transaction is rolled back and this sub dies.  Progress is reported on
# STDERR.
sub _upgrade
{
    my $dbh = shift;
    my @versions = ('0.1.7', '0.1.8', '0.2.3', '0.2.9', '0.2.10');

    # True when the version recorded in the meta table is at least
    # $check_version.  This is an anonymous sub on purpose: a named sub
    # nested in here would not reliably see this call's $dbh.
    my $meta_version = sub {
        my $check_version = shift;
        my $version;
        eval {
            ($version) = $dbh->selectrow_array(
                "SELECT value FROM meta WHERE key = 'version'"
            );
        };

        # No meta table (the query failed) or no version row at all: the
        # database predates whatever version is being asked about.
        return 0 if $@;
        return 0 unless defined $version;

        # _expand_version() zero-pads the components so that a plain string
        # comparison orders the versions correctly.
        return _expand_version($version) ge _expand_version($check_version);
    };

    # These should return true if the test passes (no upgrade needed)
    my %tests = (
        # The meta-table was added in 0.1.8, so we can't use that...
        '0.1.7' => sub {
            eval {
                $dbh->selectrow_array("SELECT timestamp, size FROM messages LIMIT 1");
            };
            return (!$@);
        },
        '0.1.8'  => sub { $meta_version->('0.1.8') },
        '0.2.3'  => sub { $meta_version->('0.2.3') },
        '0.2.9'  => sub { $meta_version->('0.2.9') },
        '0.2.10' => sub { $meta_version->('0.2.10') },
    );

    # One repair per version; each brings the schema from the previous
    # version up to the one it is keyed by.
    my %repairs = (
        '0.1.7' => sub {
            $dbh->do('ALTER TABLE messages ADD COLUMN timestamp INT');
            $dbh->do('ALTER TABLE messages ADD COLUMN size      INT');
        },
        '0.1.8' => sub {
            # 0.1.8 adds a meta table for version info
            _do_schema($dbh, META_SCHEMA);
            $dbh->do(q{INSERT INTO meta (key, value) VALUES ('version', '0.1.8')});

            # SQLite doesn't have a syntax for modifying column types on primary
            # keys, and 1.8->1.9 made message_id a text field.

            # Rename old table and create new one
            $dbh->do('ALTER TABLE messages RENAME TO old_messages');
            _do_schema($dbh, MESSAGES_SCHEMA_018);

            # Dump old table into new table
            my $columns = q{
                message_id, destination, persistent,
                in_use_by,  body,        timestamp,  size
            };

            $dbh->do(qq{
                INSERT INTO messages ( $columns )
                SELECT $columns FROM old_messages
            });

            # Delete old table
            $dbh->do('DROP TABLE old_messages');
        },
        '0.2.3' => sub {
            # we add the deliver_at column
            $dbh->do("ALTER TABLE messages ADD COLUMN deliver_at INT");
            $dbh->do("CREATE INDEX deliver_at_index ON messages ( deliver_at )");

            # updated the version
            $dbh->do("UPDATE meta SET value = '0.2.3' where key = 'version'");

            # databases created with 0.1.8 or later, didn't correctly add the indexes
            # to the table (because it feeds MESSAGE_SCHEMA as a single statement to
            # $db->do() rather than breaking it up);
            my $indices = {
                id_index          => "message_id",
                timestamp_index   => "timestamp",
                destination_index => "destination",
                in_use_by_index   => "in_use_by"
            };
            while (my ($name, $column) = each %$indices)
            {
                # Best effort: a failure here (for instance because the
                # index already exists) is deliberately ignored.
                eval
                {
                    $dbh->do("CREATE INDEX $name ON messages ( $column )");
                };
            }
        },
        '0.2.9' => sub {
            # NOTE: Here we *would* change timestamp from INT to DECIMAL(15,5) but
            # not only is that not possible via SQLite3's ALTER statement, but it makes
            # no difference what so ever in SQLite3.

            # update the version
            $dbh->do("UPDATE meta SET value = '0.2.9' where key = 'version'");
        },
        '0.2.10' => sub {
            # NOTE: Here we *would* change in_use_by from INT to VARCHAR(255) but
            # not only is that not possible via SQLite3's ALTER statement, but it makes
            # no difference what so ever in SQLite3.

            # update the version
            $dbh->do("UPDATE meta SET value = '0.2.10' where key = 'version'");
        }
    );

    my $do_repairs = 0;
    foreach my $ver (@versions)
    {
        # Keep testing until the first version the database does not
        # satisfy; from there on every later repair is applied untested.
        if (!$do_repairs && !$tests{$ver}->())
        {
            $dbh->begin_work();
            print STDERR "WARNING: User database is older than $ver.\n";
            print STDERR "WARNING: Performing in-place upgrade...";
            $do_repairs = 1;
        }

        next unless $do_repairs;

        eval { $repairs{$ver}->() };
        if ($@)
        {
            # Save the error first: rollback() must not be allowed to
            # replace it before it is reported.
            my $error = $@;
            $dbh->rollback();
            die "encountered errors: $error: rolling back.\n";
        }
    }

    if ($do_repairs)
    {
        $dbh->commit();
        print STDERR "upgrade complete.\n";
    }
}
237
|
|
|
|
|
|
|
|
238
|
|
|
|
|
|
|
# Make sure the database behind $dsn has the current schema.
#
#   $file     - path of the database file, used only for the existence test
#   $dsn      - DBI data source name to connect to
#   $username - database user name
#   $password - database password
#
# A database file that already exists is handed to _upgrade().  Otherwise
# the messages and meta tables are created and the schema version is
# recorded as 0.2.10.  The connection is opened with RaiseError on and is
# closed again before returning.
sub _make_db
{
    my ($file, $dsn, $username, $password) = @_;

    # Sampled before connecting; the answer picks the branch taken below.
    my $already_there = (-f $file);

    my %connect_attrs = ( RaiseError => 1 );
    my $dbh = DBI->connect($dsn, $username, $password, \%connect_attrs);

    if ( !$already_there )
    {
        # Brand new database: build both tables, then stamp the version.
        _do_schema($dbh, $_) for (MESSAGES_SCHEMA, META_SCHEMA);
        $dbh->do(q{INSERT INTO meta (key, value) VALUES ('version', '0.2.10')});
    }
    else
    {
        _upgrade($dbh);
    }

    $dbh->disconnect();
}
261
|
|
|
|
|
|
|
|
262
|
|
|
|
|
|
|
# Build the recommended storage stack and return it.
#
# Arguments are given either as a hash reference or as a flat key/value
# list:
#
#   data_dir     - (required) directory for the SQLite file and the message
#                  bodies; created if it does not exist
#   timeout      - passed to the Complex store (default 4)
#   granularity  - passed to the Complex store (default 2)
#   front_max    - passed to the Complex store (default 64 * 1024 * 1024)
#   front        - storage object to use as the front store; 'front_store'
#                  is accepted as an alternative key (default: a new
#                  BigMemory store)
#   throttle_max - passed to the Throttled store (default 2)
#
# Dies if data_dir is missing or cannot be created.
#
# The stack is Complex( front, Throttled( FileSystem( DBI ) ) ), with the
# DBI store pointed at "<data_dir>/mq.db".  The returned object is the
# Complex store itself; nothing is blessed into this package.
sub new
{
    my $class = shift;
    my $args = (@_ > 1 ? {@_} : $_[0]);

    my $data_dir = $args->{data_dir} || die "No data dir.";

    # mkdir is a list operator, so "mkdir $data_dir || die ..." would pass
    # the whole "||" expression to mkdir as its argument and the die could
    # never fire.  Parenthesise the call and use low-precedence "or".
    unless (-d $data_dir)
    {
        mkdir($data_dir)
            or die "Couldn't make data dir '$data_dir': $!";
    }

    my $db_file     = "$data_dir/mq.db";
    my $db_dsn      = "DBI:SQLite:dbname=$db_file";
    my $db_username = q();
    my $db_password = q();

    # Create the schema, or upgrade an existing one, before the DBI store
    # is constructed on top of it.
    _make_db($db_file, $db_dsn, $db_username, $db_password);

    my $dbi = POE::Component::MessageQueue::Storage::DBI->new(
        dsn      => $db_dsn,
        username => $db_username,
        password => $db_password,
    );

    my $fs = POE::Component::MessageQueue::Storage::FileSystem->new(
        info_storage => $dbi,
        data_dir     => $data_dir,
    );

    my $throttled = POE::Component::MessageQueue::Storage::Throttled->new(
        back         => $fs,
        throttle_max => $args->{throttle_max} || 2,
    );

    # We don't bless anything because we're just returning a Complex...
    return POE::Component::MessageQueue::Storage::Complex->new(
        timeout     => $args->{timeout}     || 4,
        granularity => $args->{granularity} || 2,
        front_max   => $args->{front_max}   || 64 * 1024 * 1024,
        front       => $args->{front} || $args->{front_store} ||
            POE::Component::MessageQueue::Storage::BigMemory->new(),
        back        => $throttled,
    );
}
306
|
|
|
|
|
|
|
|
307
|
|
|
|
|
|
|
1; |
308
|
|
|
|
|
|
|
|
309
|
|
|
|
|
|
|
=pod |
310
|
|
|
|
|
|
|
|
311
|
|
|
|
|
|
|
=head1 NAME |
312
|
|
|
|
|
|
|
|
313
|
|
|
|
|
|
|
POE::Component::MessageQueue::Storage::Default -- The default storage engine (based on Complex), recommended for the most common case and used by mq.pl. |
314
|
|
|
|
|
|
|
|
315
|
|
|
|
|
|
|
=head1 SYNOPSIS |
316
|
|
|
|
|
|
|
|
317
|
|
|
|
|
|
|
use POE; |
318
|
|
|
|
|
|
|
use POE::Component::MessageQueue; |
319
|
|
|
|
|
|
|
use POE::Component::MessageQueue::Storage::Default; |
320
|
|
|
|
|
|
|
use strict; |
321
|
|
|
|
|
|
|
|
322
|
|
|
|
|
|
|
my $DATA_DIR = '/tmp/perl_mq'; |
323
|
|
|
|
|
|
|
|
324
|
|
|
|
|
|
|
POE::Component::MessageQueue->new({ |
325
|
|
|
|
|
|
|
storage => POE::Component::MessageQueue::Storage::Default->new({ |
326
|
|
|
|
|
|
|
data_dir => $DATA_DIR, |
327
|
|
|
|
|
|
|
timeout => 4, |
328
|
|
|
|
|
|
|
throttle_max => 2, |
329
|
|
|
|
|
|
|
|
330
|
|
|
|
|
|
|
# Alternative memory store available! |
331
|
|
|
|
|
|
|
#front => POE::Component::MessageQueue::Storage::BigMemory->new(), |
332
|
|
|
|
|
|
|
}) |
333
|
|
|
|
|
|
|
}); |
334
|
|
|
|
|
|
|
|
335
|
|
|
|
|
|
|
POE::Kernel->run(); |
336
|
|
|
|
|
|
|
exit; |
337
|
|
|
|
|
|
|
|
338
|
|
|
|
|
|
|
=head1 DESCRIPTION |
339
|
|
|
|
|
|
|
|
340
|
|
|
|
|
|
|
This storage engine combines all the other provided engines. It uses |
341
|
|
|
|
|
|
|
L<POE::Component::MessageQueue::Storage::BigMemory> as the front store and |
342
|
|
|
|
|
|
|
L<POE::Component::MessageQueue::Storage::FileSystem> as the back store |
343
|
|
|
|
|
|
|
for L<POE::Component::MessageQueue::Storage::Complex> and provides some other |
344
|
|
|
|
|
|
|
sensible and recommended defaults, though you can override them in most cases. |
345
|
|
|
|
|
|
|
Messages are initially put into the front-end storage and will be moved into the |
346
|
|
|
|
|
|
|
backend storage after a given number of seconds (defaults to 4). |
347
|
|
|
|
|
|
|
|
348
|
|
|
|
|
|
|
The L<POE::Component::MessageQueue::Storage::FileSystem> component used |
349
|
|
|
|
|
|
|
internally uses L<POE::Component::MessageQueue::Storage::DBI> with a |
350
|
|
|
|
|
|
|
L<DBD::SQLite> database. It is also throttled via |
351
|
|
|
|
|
|
|
L<POE::Component::MessageQueue::Storage::Throttled>. |
352
|
|
|
|
|
|
|
|
353
|
|
|
|
|
|
|
This is the recommended storage engine. It should provide the best performance |
354
|
|
|
|
|
|
|
while (if configured sanely) still providing a reasonable amount of persistence |
355
|
|
|
|
|
|
|
with little risk of eating all your memory under high load. This is also the |
356
|
|
|
|
|
|
|
only storage backend to correctly honor the persistent flag and will only |
357
|
|
|
|
|
|
|
persist those messages with it set. |
358
|
|
|
|
|
|
|
|
359
|
|
|
|
|
|
|
=head1 CONSTRUCTOR PARAMETERS |
360
|
|
|
|
|
|
|
|
361
|
|
|
|
|
|
|
=over 2 |
362
|
|
|
|
|
|
|
|
363
|
|
|
|
|
|
|
=item timeout => SCALAR |
364
|
|
|
|
|
|
|
|
365
|
|
|
|
|
|
|
The number of seconds after a message enters the front-store before it |
366
|
|
|
|
|
|
|
expires. After this time, if the message hasn't been removed, it will be |
367
|
|
|
|
|
|
|
moved into the backstore. |
368
|
|
|
|
|
|
|
|
369
|
|
|
|
|
|
|
=item granularity => SCALAR |
370
|
|
|
|
|
|
|
|
371
|
|
|
|
|
|
|
The number of seconds to wait between checks for timeout expiration. |
372
|
|
|
|
|
|
|
|
373
|
|
|
|
|
|
|
=item data_dir => SCALAR |
374
|
|
|
|
|
|
|
|
375
|
|
|
|
|
|
|
The directory to store the SQLite database file and the message bodies. |
376
|
|
|
|
|
|
|
|
377
|
|
|
|
|
|
|
=item throttle_max => SCALAR |
378
|
|
|
|
|
|
|
|
379
|
|
|
|
|
|
|
The max number of messages that can be sent to the DBI store at once. |
380
|
|
|
|
|
|
|
This value is passed directly to the underlying |
381
|
|
|
|
|
|
|
L<POE::Component::MessageQueue::Storage::Throttled>. |
382
|
|
|
|
|
|
|
|
383
|
|
|
|
|
|
|
=item front_max => SCALAR |
384
|
|
|
|
|
|
|
|
385
|
|
|
|
|
|
|
The maximum number of bytes to allow the front store to grow to. If the front |
386
|
|
|
|
|
|
|
store grows too big, old messages will be "pushed off" to make room for new |
387
|
|
|
|
|
|
|
messages. |
388
|
|
|
|
|
|
|
|
389
|
|
|
|
|
|
|
=item front => SCALAR |
390
|
|
|
|
|
|
|
|
391
|
|
|
|
|
|
|
An optional reference to a storage engine to use as the front store instead of |
392
|
|
|
|
|
|
|
L<POE::Component::MessageQueue::Storage::BigMemory>. |
393
|
|
|
|
|
|
|
|
394
|
|
|
|
|
|
|
=back |
395
|
|
|
|
|
|
|
|
396
|
|
|
|
|
|
|
=head1 SUPPORTED STOMP HEADERS |
397
|
|
|
|
|
|
|
|
398
|
|
|
|
|
|
|
Same as L<POE::Component::MessageQueue::Storage::Complex>. |
399
|
|
|
|
|
|
|
|
400
|
|
|
|
|
|
|
=head1 SEE ALSO |
401
|
|
|
|
|
|
|
|
402
|
|
|
|
|
|
|
L<POE::Component::MessageQueue>, |
403
|
|
|
|
|
|
|
L<POE::Component::MessageQueue::Storage>, |
404
|
|
|
|
|
|
|
L<DBI>, |
405
|
|
|
|
|
|
|
L<DBD::SQLite> |
406
|
|
|
|
|
|
|
|
407
|
|
|
|
|
|
|
I<Other storage engines:> |
408
|
|
|
|
|
|
|
|
409
|
|
|
|
|
|
|
L<POE::Component::MessageQueue::Storage::Memory>, |
410
|
|
|
|
|
|
|
L<POE::Component::MessageQueue::Storage::BigMemory>, |
411
|
|
|
|
|
|
|
L<POE::Component::MessageQueue::Storage::FileSystem>, |
412
|
|
|
|
|
|
|
L<POE::Component::MessageQueue::Storage::DBI>, |
413
|
|
|
|
|
|
|
L<POE::Component::MessageQueue::Storage::Generic>, |
414
|
|
|
|
|
|
|
L<POE::Component::MessageQueue::Storage::Generic::DBI>, |
415
|
|
|
|
|
|
|
L<POE::Component::MessageQueue::Storage::Throttled>, |
416
|
|
|
|
|
|
|
L<POE::Component::MessageQueue::Storage::Complex> |
417
|
|
|
|
|
|
|
|