| line |
stmt |
bran |
cond |
sub |
pod |
time |
code |
|
1
|
|
|
|
|
|
|
# |
|
2
|
|
|
|
|
|
|
# Copyright 2007, 2008 Paul Driver <frodwith@gmail.com> |
|
3
|
|
|
|
|
|
|
# |
|
4
|
|
|
|
|
|
|
# This program is free software: you can redistribute it and/or modify |
|
5
|
|
|
|
|
|
|
# it under the terms of the GNU General Public License as published by |
|
6
|
|
|
|
|
|
|
# the Free Software Foundation, either version 2 of the License, or |
|
7
|
|
|
|
|
|
|
# (at your option) any later version. |
|
8
|
|
|
|
|
|
|
# |
|
9
|
|
|
|
|
|
|
# This program is distributed in the hope that it will be useful, |
|
10
|
|
|
|
|
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of |
|
11
|
|
|
|
|
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
|
12
|
|
|
|
|
|
|
# GNU General Public License for more details. |
|
13
|
|
|
|
|
|
|
# |
|
14
|
|
|
|
|
|
|
# You should have received a copy of the GNU General Public License |
|
15
|
|
|
|
|
|
|
# along with this program. If not, see <http://www.gnu.org/licenses/>. |
|
16
|
|
|
|
|
|
|
# |
|
17
|
|
|
|
|
|
|
|
|
18
|
|
|
|
|
|
|
package POE::Component::MessageQueue::Storage::Default; |
|
19
|
|
|
|
|
|
|
|
|
20
|
|
|
|
|
|
|
# Not using moose for this cause it's just a frontend to whatever our |
|
21
|
|
|
|
|
|
|
# recommended storage engine is. There's no point. |
|
22
|
|
|
|
|
|
|
|
|
23
|
10
|
|
|
10
|
|
226729
|
use strict; |
|
|
10
|
|
|
|
|
30
|
|
|
|
10
|
|
|
|
|
251
|
|
|
24
|
10
|
|
|
10
|
|
50
|
use warnings; |
|
|
10
|
|
|
|
|
26
|
|
|
|
10
|
|
|
|
|
246
|
|
|
25
|
10
|
|
|
10
|
|
3796
|
use POE::Component::MessageQueue::Storage::Throttled; |
|
|
10
|
|
|
|
|
46
|
|
|
|
10
|
|
|
|
|
433
|
|
|
26
|
10
|
|
|
10
|
|
6024
|
use POE::Component::MessageQueue::Storage::DBI; |
|
|
10
|
|
|
|
|
43
|
|
|
|
10
|
|
|
|
|
381
|
|
|
27
|
10
|
|
|
10
|
|
6134
|
use POE::Component::MessageQueue::Storage::FileSystem; |
|
|
10
|
|
|
|
|
43
|
|
|
|
10
|
|
|
|
|
384
|
|
|
28
|
10
|
|
|
10
|
|
97
|
use POE::Component::MessageQueue::Storage::BigMemory; |
|
|
10
|
|
|
|
|
23
|
|
|
|
10
|
|
|
|
|
272
|
|
|
29
|
10
|
|
|
10
|
|
6108
|
use POE::Component::MessageQueue::Storage::Complex; |
|
|
10
|
|
|
|
|
48
|
|
|
|
10
|
|
|
|
|
448
|
|
|
30
|
10
|
|
|
10
|
|
14279
|
use DBI; |
|
|
10
|
|
|
|
|
130384
|
|
|
|
10
|
|
|
|
|
708
|
|
|
31
|
|
|
|
|
|
|
|
|
32
|
10
|
|
|
10
|
|
105
|
use constant META_SCHEMA => <<'EOF'; |
|
|
10
|
|
|
|
|
30
|
|
|
|
10
|
|
|
|
|
983
|
|
|
33
|
|
|
|
|
|
|
CREATE TABLE meta |
|
34
|
|
|
|
|
|
|
( |
|
35
|
|
|
|
|
|
|
key varchar(255) primary key, |
|
36
|
|
|
|
|
|
|
value varchar(255) |
|
37
|
|
|
|
|
|
|
); |
|
38
|
|
|
|
|
|
|
EOF |
|
39
|
|
|
|
|
|
|
|
|
40
|
10
|
|
|
10
|
|
73
|
use constant MESSAGES_SCHEMA_018 => <<'EOF'; |
|
|
10
|
|
|
|
|
26
|
|
|
|
10
|
|
|
|
|
598
|
|
|
41
|
|
|
|
|
|
|
CREATE TABLE messages |
|
42
|
|
|
|
|
|
|
( |
|
43
|
|
|
|
|
|
|
message_id varchar(255) primary key, |
|
44
|
|
|
|
|
|
|
destination varchar(255) not null, |
|
45
|
|
|
|
|
|
|
persistent char(1) default 'Y' not null, |
|
46
|
|
|
|
|
|
|
in_use_by int, |
|
47
|
|
|
|
|
|
|
body text, |
|
48
|
|
|
|
|
|
|
timestamp int, |
|
49
|
|
|
|
|
|
|
size int |
|
50
|
|
|
|
|
|
|
); |
|
51
|
|
|
|
|
|
|
|
|
52
|
|
|
|
|
|
|
EOF |
|
53
|
|
|
|
|
|
|
|
|
54
|
10
|
|
|
10
|
|
63
|
use constant MESSAGES_SCHEMA => <<'EOF'; |
|
|
10
|
|
|
|
|
24
|
|
|
|
10
|
|
|
|
|
9125
|
|
|
55
|
|
|
|
|
|
|
CREATE TABLE messages |
|
56
|
|
|
|
|
|
|
( |
|
57
|
|
|
|
|
|
|
message_id varchar(255) primary key, |
|
58
|
|
|
|
|
|
|
destination varchar(255) not null, |
|
59
|
|
|
|
|
|
|
persistent char(1) default 'Y' not null, |
|
60
|
|
|
|
|
|
|
in_use_by varchar(255), |
|
61
|
|
|
|
|
|
|
body text, |
|
62
|
|
|
|
|
|
|
timestamp decimal(15,5), |
|
63
|
|
|
|
|
|
|
size int, |
|
64
|
|
|
|
|
|
|
deliver_at int |
|
65
|
|
|
|
|
|
|
); |
|
66
|
|
|
|
|
|
|
|
|
67
|
|
|
|
|
|
|
CREATE INDEX id_index ON messages ( message_id ); |
|
68
|
|
|
|
|
|
|
CREATE INDEX timestamp_index ON messages ( timestamp ); |
|
69
|
|
|
|
|
|
|
CREATE INDEX destination_index ON messages ( destination ); |
|
70
|
|
|
|
|
|
|
CREATE INDEX in_use_by_index ON messages ( in_use_by ); |
|
71
|
|
|
|
|
|
|
CREATE INDEX deliver_at ON messages ( deliver_at ); |
|
72
|
|
|
|
|
|
|
|
|
73
|
|
|
|
|
|
|
EOF |
|
74
|
|
|
|
|
|
|
|
|
75
|
|
|
|
|
|
|
sub _do_schema |
|
76
|
|
|
|
|
|
|
{ |
|
77
|
24
|
|
|
24
|
|
143
|
my ($dbh, $schema) = @_; |
|
78
|
24
|
|
|
|
|
193
|
foreach my $stmt ( split(";", $schema) ) |
|
79
|
|
|
|
|
|
|
{ |
|
80
|
|
|
|
|
|
|
# strip leading/trailing whitespace |
|
81
|
108
|
|
|
|
|
541151
|
$stmt =~ s/^\s*//; |
|
82
|
108
|
|
|
|
|
1283
|
$stmt =~ s/\s*$//; |
|
83
|
|
|
|
|
|
|
|
|
84
|
108
|
100
|
|
|
|
1208
|
$dbh->do($stmt) if ($stmt); |
|
85
|
|
|
|
|
|
|
} |
|
86
|
|
|
|
|
|
|
} |
|
87
|
|
|
|
|
|
|
|
|
88
|
|
|
|
|
|
|
sub _expand_version |
|
89
|
|
|
|
|
|
|
{ |
|
90
|
0
|
|
|
0
|
|
0
|
my ($version) = @_; |
|
91
|
0
|
|
|
|
|
0
|
return join('.', map { sprintf "%02d", $_ } split('\.', $version)); |
|
|
0
|
|
|
|
|
0
|
|
|
92
|
|
|
|
|
|
|
} |
|
93
|
|
|
|
|
|
|
|
|
94
|
|
|
|
|
|
|
# Hopefully, this will make adding new changes that break db compatability a |
|
95
|
|
|
|
|
|
|
# little easier. Change the database schema above, then add a check for your |
|
96
|
|
|
|
|
|
|
# version like the examples below. |
|
97
|
|
|
|
|
|
|
sub _upgrade |
|
98
|
|
|
|
|
|
|
{ |
|
99
|
0
|
|
|
0
|
|
0
|
my $dbh = shift; |
|
100
|
0
|
|
|
|
|
0
|
my @versions = ('0.1.7', '0.1.8', '0.2.3', '0.2.9', '0.2.10'); |
|
101
|
|
|
|
|
|
|
|
|
102
|
|
|
|
|
|
|
# Funny lexical scoping rules require this to be an anonymous sub or weird |
|
103
|
|
|
|
|
|
|
# things will happen with $dbh |
|
104
|
|
|
|
|
|
|
my $meta_version = sub { |
|
105
|
0
|
|
|
0
|
|
0
|
my $check_version = shift; |
|
106
|
0
|
|
|
|
|
0
|
my $version; |
|
107
|
0
|
|
|
|
|
0
|
eval { |
|
108
|
0
|
|
|
|
|
0
|
($version) = $dbh->selectrow_array( |
|
109
|
|
|
|
|
|
|
"SELECT value FROM meta WHERE key = 'version'" |
|
110
|
|
|
|
|
|
|
); |
|
111
|
|
|
|
|
|
|
}; |
|
112
|
|
|
|
|
|
|
# TODO: we need to split the version and pad parts of it with zeros for |
|
113
|
|
|
|
|
|
|
# an accurate version comparison. |
|
114
|
0
|
|
0
|
|
|
0
|
return (!$@) && (_expand_version($version) ge _expand_version($check_version)); |
|
115
|
0
|
|
|
|
|
0
|
}; |
|
116
|
|
|
|
|
|
|
|
|
117
|
|
|
|
|
|
|
# These should return true if the test passes (no upgrade needed) |
|
118
|
|
|
|
|
|
|
my %tests = ( |
|
119
|
|
|
|
|
|
|
# The meta-table was added in 0.1.8, so we can't use that... |
|
120
|
|
|
|
|
|
|
'0.1.7' => sub { |
|
121
|
0
|
|
|
0
|
|
0
|
eval { |
|
122
|
0
|
|
|
|
|
0
|
$dbh->selectrow_array("SELECT timestamp, size FROM messages LIMIT 1"); |
|
123
|
|
|
|
|
|
|
}; |
|
124
|
0
|
|
|
|
|
0
|
return (!$@); |
|
125
|
|
|
|
|
|
|
}, |
|
126
|
0
|
|
|
0
|
|
0
|
'0.1.8' => sub { $meta_version->('0.1.8') }, |
|
127
|
0
|
|
|
0
|
|
0
|
'0.2.3' => sub { $meta_version->('0.2.3') }, |
|
128
|
0
|
|
|
0
|
|
0
|
'0.2.9' => sub { $meta_version->('0.2.9') }, |
|
129
|
0
|
|
|
0
|
|
0
|
'0.2.10' => sub { $meta_version->('0.2.10') }, |
|
130
|
0
|
|
|
|
|
0
|
); |
|
131
|
|
|
|
|
|
|
|
|
132
|
|
|
|
|
|
|
my %repairs = ( |
|
133
|
|
|
|
|
|
|
'0.1.7' => sub { |
|
134
|
0
|
|
|
0
|
|
0
|
$dbh->do('ALTER TABLE messages ADD COLUMN timestamp INT'); |
|
135
|
0
|
|
|
|
|
0
|
$dbh->do('ALTER TABLE messages ADD COLUMN size INT'); |
|
136
|
|
|
|
|
|
|
}, |
|
137
|
|
|
|
|
|
|
'0.1.8' => sub { |
|
138
|
|
|
|
|
|
|
# 0.1.8 adds a meta table for version info |
|
139
|
0
|
|
|
0
|
|
0
|
_do_schema($dbh, META_SCHEMA); |
|
140
|
0
|
|
|
|
|
0
|
$dbh->do(q{INSERT INTO meta (key, value) VALUES ('version', '0.1.8')}); |
|
141
|
|
|
|
|
|
|
|
|
142
|
|
|
|
|
|
|
# SQLite doesn't have a syntax for modifying column types on primary |
|
143
|
|
|
|
|
|
|
# keys, and 1.8->1.9 made message_id a text field. |
|
144
|
|
|
|
|
|
|
|
|
145
|
|
|
|
|
|
|
# Rename old table and create new one |
|
146
|
0
|
|
|
|
|
0
|
$dbh->do('ALTER TABLE messages RENAME TO old_messages'); |
|
147
|
0
|
|
|
|
|
0
|
_do_schema($dbh, MESSAGES_SCHEMA_018); |
|
148
|
|
|
|
|
|
|
|
|
149
|
|
|
|
|
|
|
# Dump old table into new table |
|
150
|
0
|
|
|
|
|
0
|
my $columns = q{ |
|
151
|
|
|
|
|
|
|
message_id, destination, persistent, |
|
152
|
|
|
|
|
|
|
in_use_by, body, timestamp, size |
|
153
|
|
|
|
|
|
|
}; |
|
154
|
|
|
|
|
|
|
|
|
155
|
0
|
|
|
|
|
0
|
$dbh->do(qq{ |
|
156
|
|
|
|
|
|
|
INSERT INTO messages ( $columns ) |
|
157
|
|
|
|
|
|
|
SELECT $columns FROM old_messages |
|
158
|
|
|
|
|
|
|
}); |
|
159
|
|
|
|
|
|
|
|
|
160
|
|
|
|
|
|
|
# Delete old table |
|
161
|
0
|
|
|
|
|
0
|
$dbh->do('DROP TABLE old_messages'); |
|
162
|
|
|
|
|
|
|
}, |
|
163
|
|
|
|
|
|
|
'0.2.3' => sub { |
|
164
|
|
|
|
|
|
|
# we add the deliver_at column |
|
165
|
0
|
|
|
0
|
|
0
|
$dbh->do("ALTER TABLE messages ADD COLUMN deliver_at INT"); |
|
166
|
0
|
|
|
|
|
0
|
$dbh->do("CREATE INDEX deliver_at_index ON messages ( deliver_at )"); |
|
167
|
|
|
|
|
|
|
|
|
168
|
|
|
|
|
|
|
# updated the version |
|
169
|
0
|
|
|
|
|
0
|
$dbh->do("UPDATE meta SET value = '0.2.3' where key = 'version'"); |
|
170
|
|
|
|
|
|
|
|
|
171
|
|
|
|
|
|
|
# databases created with 0.1.8 or later, didn't correctly add the indexes |
|
172
|
|
|
|
|
|
|
# to the table (because it feeds MESSAGE_SCHEMA as a single statement to |
|
173
|
|
|
|
|
|
|
# $db->do() rather than breaking it up); |
|
174
|
0
|
|
|
|
|
0
|
my $indices = { |
|
175
|
|
|
|
|
|
|
id_index => "message_id", |
|
176
|
|
|
|
|
|
|
timestamp_index => "timestamp", |
|
177
|
|
|
|
|
|
|
destination_index => "destination", |
|
178
|
|
|
|
|
|
|
in_use_by_index => "in_use_by" |
|
179
|
|
|
|
|
|
|
}; |
|
180
|
0
|
|
|
|
|
0
|
while (my ($name, $column) = each %$indices) |
|
181
|
|
|
|
|
|
|
{ |
|
182
|
|
|
|
|
|
|
eval |
|
183
|
0
|
|
|
|
|
0
|
{ |
|
184
|
0
|
|
|
|
|
0
|
$dbh->do("CREATE INDEX $name ON messages ( $column )"); |
|
185
|
|
|
|
|
|
|
}; |
|
186
|
|
|
|
|
|
|
} |
|
187
|
|
|
|
|
|
|
}, |
|
188
|
|
|
|
|
|
|
'0.2.9' => sub { |
|
189
|
|
|
|
|
|
|
# NOTE: Here we *would* change timestamp from INT to DECIMAL(15,5) but |
|
190
|
|
|
|
|
|
|
# not only is that not possible via SQLite3's ALTER statement, but it makes |
|
191
|
|
|
|
|
|
|
# no difference what so ever in SQLite3. |
|
192
|
|
|
|
|
|
|
|
|
193
|
|
|
|
|
|
|
# update the version |
|
194
|
0
|
|
|
0
|
|
0
|
$dbh->do("UPDATE meta SET value = '0.2.9' where key = 'version'"); |
|
195
|
|
|
|
|
|
|
}, |
|
196
|
|
|
|
|
|
|
'0.2.10' => sub { |
|
197
|
|
|
|
|
|
|
# NOTE: Here we *would* change in_use_by from INT to VARCHAR(255) but |
|
198
|
|
|
|
|
|
|
# not only is that not possible via SQLite3's ALTER statement, but it makes |
|
199
|
|
|
|
|
|
|
# no difference what so ever in SQLite3. |
|
200
|
|
|
|
|
|
|
|
|
201
|
|
|
|
|
|
|
# update the version |
|
202
|
0
|
|
|
0
|
|
0
|
$dbh->do("UPDATE meta SET value = '0.2.10' where key = 'version'"); |
|
203
|
|
|
|
|
|
|
} |
|
204
|
0
|
|
|
|
|
0
|
); |
|
205
|
|
|
|
|
|
|
|
|
206
|
0
|
|
|
|
|
0
|
my $do_repairs = 0; |
|
207
|
0
|
|
|
|
|
0
|
foreach my $ver (@versions) |
|
208
|
|
|
|
|
|
|
{ |
|
209
|
0
|
0
|
|
|
|
0
|
unless ($do_repairs) |
|
210
|
|
|
|
|
|
|
{ |
|
211
|
0
|
|
|
|
|
0
|
my $success = $tests{$ver}->(); |
|
212
|
0
|
0
|
|
|
|
0
|
unless ($success) |
|
213
|
|
|
|
|
|
|
{ |
|
214
|
0
|
|
|
|
|
0
|
$dbh->begin_work(); |
|
215
|
0
|
|
|
|
|
0
|
print STDERR "WARNING: User database is older than $ver.\n"; |
|
216
|
0
|
|
|
|
|
0
|
print STDERR "WARNING: Performing in-place upgrade..."; |
|
217
|
0
|
|
|
|
|
0
|
$do_repairs = 1; |
|
218
|
|
|
|
|
|
|
} |
|
219
|
|
|
|
|
|
|
} |
|
220
|
|
|
|
|
|
|
|
|
221
|
0
|
0
|
|
|
|
0
|
if ($do_repairs) |
|
222
|
|
|
|
|
|
|
{ |
|
223
|
0
|
|
|
|
|
0
|
eval { $repairs{$ver}->() }; |
|
|
0
|
|
|
|
|
0
|
|
|
224
|
0
|
0
|
|
|
|
0
|
if ($@) |
|
225
|
|
|
|
|
|
|
{ |
|
226
|
0
|
|
|
|
|
0
|
$dbh->rollback(); |
|
227
|
0
|
|
|
|
|
0
|
die "encountered errors: $@: rolling back.\n"; |
|
228
|
|
|
|
|
|
|
} |
|
229
|
|
|
|
|
|
|
} |
|
230
|
|
|
|
|
|
|
} |
|
231
|
0
|
0
|
|
|
|
0
|
if ($do_repairs) |
|
232
|
|
|
|
|
|
|
{ |
|
233
|
0
|
|
|
|
|
0
|
$dbh->commit(); |
|
234
|
0
|
|
|
|
|
0
|
print STDERR "upgrade complete.\n"; |
|
235
|
|
|
|
|
|
|
} |
|
236
|
|
|
|
|
|
|
} |
|
237
|
|
|
|
|
|
|
|
|
238
|
|
|
|
|
|
|
sub _make_db |
|
239
|
|
|
|
|
|
|
{ |
|
240
|
12
|
|
|
12
|
|
42055
|
my ($file, $dsn, $username, $password) = @_; |
|
241
|
12
|
|
|
|
|
213
|
my $db_exists = (-f $file); |
|
242
|
12
|
|
|
|
|
191
|
my $dbh = DBI->connect( |
|
243
|
|
|
|
|
|
|
$dsn, |
|
244
|
|
|
|
|
|
|
$username, |
|
245
|
|
|
|
|
|
|
$password, |
|
246
|
|
|
|
|
|
|
{ RaiseError => 1 } |
|
247
|
|
|
|
|
|
|
); |
|
248
|
|
|
|
|
|
|
|
|
249
|
12
|
50
|
|
|
|
54652
|
if ( $db_exists ) |
|
250
|
|
|
|
|
|
|
{ |
|
251
|
0
|
|
|
|
|
0
|
_upgrade($dbh); |
|
252
|
|
|
|
|
|
|
} |
|
253
|
|
|
|
|
|
|
else |
|
254
|
|
|
|
|
|
|
{ |
|
255
|
12
|
|
|
|
|
91
|
_do_schema($dbh, MESSAGES_SCHEMA); |
|
256
|
12
|
|
|
|
|
282
|
_do_schema($dbh, META_SCHEMA); |
|
257
|
12
|
|
|
|
|
180
|
$dbh->do(q{INSERT INTO meta (key, value) VALUES ('version', '0.2.10')}); |
|
258
|
|
|
|
|
|
|
} |
|
259
|
12
|
|
|
|
|
64006
|
$dbh->disconnect(); |
|
260
|
|
|
|
|
|
|
} |
|
261
|
|
|
|
|
|
|
|
|
262
|
|
|
|
|
|
|
sub new |
|
263
|
|
|
|
|
|
|
{ |
|
264
|
0
|
|
|
0
|
0
|
|
my $class = shift; |
|
265
|
0
|
0
|
|
|
|
|
my $args = (@_ > 1 ? {@_} : $_[0]); |
|
266
|
|
|
|
|
|
|
|
|
267
|
0
|
|
0
|
|
|
|
my $data_dir = $args->{data_dir} || die "No data dir."; |
|
268
|
|
|
|
|
|
|
|
|
269
|
0
|
0
|
0
|
|
|
|
(-d $data_dir) || |
|
270
|
|
|
|
|
|
|
mkdir $data_dir || |
|
271
|
|
|
|
|
|
|
die "Couldn't make data dir '$data_dir': $!"; |
|
272
|
|
|
|
|
|
|
|
|
273
|
0
|
|
|
|
|
|
my $db_file = "$data_dir/mq.db"; |
|
274
|
0
|
|
|
|
|
|
my $db_dsn = "DBI:SQLite:dbname=$db_file"; |
|
275
|
0
|
|
|
|
|
|
my $db_username = q(); |
|
276
|
0
|
|
|
|
|
|
my $db_password = q(); |
|
277
|
|
|
|
|
|
|
|
|
278
|
0
|
|
|
|
|
|
_make_db($db_file, $db_dsn, $db_username, $db_password); |
|
279
|
|
|
|
|
|
|
|
|
280
|
0
|
|
|
|
|
|
my $dbi = POE::Component::MessageQueue::Storage::DBI->new( |
|
281
|
|
|
|
|
|
|
dsn => $db_dsn, |
|
282
|
|
|
|
|
|
|
username => $db_username, |
|
283
|
|
|
|
|
|
|
password => $db_password, |
|
284
|
|
|
|
|
|
|
); |
|
285
|
|
|
|
|
|
|
|
|
286
|
0
|
|
|
|
|
|
my $fs = POE::Component::MessageQueue::Storage::FileSystem->new( |
|
287
|
|
|
|
|
|
|
info_storage => $dbi, |
|
288
|
|
|
|
|
|
|
data_dir => $data_dir, |
|
289
|
|
|
|
|
|
|
); |
|
290
|
|
|
|
|
|
|
|
|
291
|
|
|
|
|
|
|
my $throttled = POE::Component::MessageQueue::Storage::Throttled->new( |
|
292
|
|
|
|
|
|
|
back => $fs, |
|
293
|
0
|
|
0
|
|
|
|
throttle_max => $args->{throttle_max} || 2, |
|
294
|
|
|
|
|
|
|
); |
|
295
|
|
|
|
|
|
|
|
|
296
|
|
|
|
|
|
|
# We don't bless anything because we're just returning a Complex... |
|
297
|
|
|
|
|
|
|
return POE::Component::MessageQueue::Storage::Complex->new( |
|
298
|
|
|
|
|
|
|
timeout => $args->{timeout} || 4, |
|
299
|
|
|
|
|
|
|
granularity => $args->{granularity} || 2, |
|
300
|
|
|
|
|
|
|
front_max => $args->{front_max} || 64 * 1024 * 1024, |
|
301
|
|
|
|
|
|
|
front => $args->{front} || $args->{front_store} || |
|
302
|
0
|
|
0
|
|
|
|
POE::Component::MessageQueue::Storage::BigMemory->new(), |
|
|
|
|
0
|
|
|
|
|
|
|
|
|
0
|
|
|
|
|
|
|
|
|
0
|
|
|
|
|
|
303
|
|
|
|
|
|
|
back => $throttled, |
|
304
|
|
|
|
|
|
|
); |
|
305
|
|
|
|
|
|
|
} |
|
306
|
|
|
|
|
|
|
|
|
307
|
|
|
|
|
|
|
1; |
|
308
|
|
|
|
|
|
|
|
|
309
|
|
|
|
|
|
|
=pod |
|
310
|
|
|
|
|
|
|
|
|
311
|
|
|
|
|
|
|
=head1 NAME |
|
312
|
|
|
|
|
|
|
|
|
313
|
|
|
|
|
|
|
POE::Component::MessageQueue::Storage::Default -- The default storage engine (based on Complex), recommended for the most common case and used by mq.pl. |
|
314
|
|
|
|
|
|
|
|
|
315
|
|
|
|
|
|
|
=head1 SYNOPSIS |
|
316
|
|
|
|
|
|
|
|
|
317
|
|
|
|
|
|
|
use POE; |
|
318
|
|
|
|
|
|
|
use POE::Component::MessageQueue; |
|
319
|
|
|
|
|
|
|
use POE::Component::MessageQueue::Storage::Default; |
|
320
|
|
|
|
|
|
|
use strict; |
|
321
|
|
|
|
|
|
|
|
|
322
|
|
|
|
|
|
|
my $DATA_DIR = '/tmp/perl_mq'; |
|
323
|
|
|
|
|
|
|
|
|
324
|
|
|
|
|
|
|
POE::Component::MessageQueue->new({ |
|
325
|
|
|
|
|
|
|
storage => POE::Component::MessageQueue::Storage::Default->new({ |
|
326
|
|
|
|
|
|
|
data_dir => $DATA_DIR, |
|
327
|
|
|
|
|
|
|
timeout => 4, |
|
328
|
|
|
|
|
|
|
throttle_max => 2, |
|
329
|
|
|
|
|
|
|
|
|
330
|
|
|
|
|
|
|
# Alternative memory store available! |
|
331
|
|
|
|
|
|
|
#front => POE::Component::MessageQueue::Storage::BigMemory->new(), |
|
332
|
|
|
|
|
|
|
}) |
|
333
|
|
|
|
|
|
|
}); |
|
334
|
|
|
|
|
|
|
|
|
335
|
|
|
|
|
|
|
POE::Kernel->run(); |
|
336
|
|
|
|
|
|
|
exit; |
|
337
|
|
|
|
|
|
|
|
|
338
|
|
|
|
|
|
|
=head1 DESCRIPTION |
|
339
|
|
|
|
|
|
|
|
|
340
|
|
|
|
|
|
|
This storage engine combines all the other provided engines. It uses |
|
341
|
|
|
|
|
|
|
L<POE::Component::MessageQueue::Storage::BigMemory> as the front store and |
|
342
|
|
|
|
|
|
|
L<POE::Component::MessageQueue::Storage::FileSystem> as the back store |
|
343
|
|
|
|
|
|
|
for L<POE::Componenet::MessageQueue::Storage::Complex> and provides some other |
|
344
|
|
|
|
|
|
|
sensible and recommended defaults, though you can override them in most cases. |
|
345
|
|
|
|
|
|
|
Message are initially put into the front-end storage and will be moved into the |
|
346
|
|
|
|
|
|
|
backend storage after a given number of seconds (defaults to 4). |
|
347
|
|
|
|
|
|
|
|
|
348
|
|
|
|
|
|
|
The L<POE::Component::MessageQueue::Storage::FileSystem> component used |
|
349
|
|
|
|
|
|
|
internally uses L<POE::Component::MessageQueue::Storage::DBI> with a |
|
350
|
|
|
|
|
|
|
L<DBD::SQLite> database. It is also throttled via |
|
351
|
|
|
|
|
|
|
L<POE::Component::MessageQueue::Storage::Throttled>. |
|
352
|
|
|
|
|
|
|
|
|
353
|
|
|
|
|
|
|
This is the recommended storage engine. It should provide the best performance |
|
354
|
|
|
|
|
|
|
while (if configured sanely) still providing a reasonable amount of persistence |
|
355
|
|
|
|
|
|
|
with little risk of eating all your memory under high load. This is also the |
|
356
|
|
|
|
|
|
|
only storage backend to correctly honor the persistent flag and will only |
|
357
|
|
|
|
|
|
|
persist those messages with it set. |
|
358
|
|
|
|
|
|
|
|
|
359
|
|
|
|
|
|
|
=head1 CONSTRUCTOR PARAMETERS |
|
360
|
|
|
|
|
|
|
|
|
361
|
|
|
|
|
|
|
=over 2 |
|
362
|
|
|
|
|
|
|
|
|
363
|
|
|
|
|
|
|
=item timeout => SCALAR |
|
364
|
|
|
|
|
|
|
|
|
365
|
|
|
|
|
|
|
The number of seconds after a message enters the front-store before it |
|
366
|
|
|
|
|
|
|
expires. After this time, if the message hasn't been removed, it will be |
|
367
|
|
|
|
|
|
|
moved into the backstore. |
|
368
|
|
|
|
|
|
|
|
|
369
|
|
|
|
|
|
|
=item granularity => SCALAR |
|
370
|
|
|
|
|
|
|
|
|
371
|
|
|
|
|
|
|
The number of seconds to wait between checks for timeout expiration. |
|
372
|
|
|
|
|
|
|
|
|
373
|
|
|
|
|
|
|
=item data_dir => SCALAR |
|
374
|
|
|
|
|
|
|
|
|
375
|
|
|
|
|
|
|
The directory to store the SQLite database file and the message bodies. |
|
376
|
|
|
|
|
|
|
|
|
377
|
|
|
|
|
|
|
=item throttle_max => SCALAR |
|
378
|
|
|
|
|
|
|
|
|
379
|
|
|
|
|
|
|
The max number of messages that can be sent to the DBI store at once. |
|
380
|
|
|
|
|
|
|
This value is passed directly to the underlying |
|
381
|
|
|
|
|
|
|
L<POE::Component::MessageQueue::Storage::Throttled>. |
|
382
|
|
|
|
|
|
|
|
|
383
|
|
|
|
|
|
|
=item front_max => SCALAR |
|
384
|
|
|
|
|
|
|
|
|
385
|
|
|
|
|
|
|
The maximum number of bytes to allow the front store to grow to. If the front |
|
386
|
|
|
|
|
|
|
store grows to big, old messages will be "pushed off" to make room for new |
|
387
|
|
|
|
|
|
|
messages. |
|
388
|
|
|
|
|
|
|
|
|
389
|
|
|
|
|
|
|
=item front => SCALAR |
|
390
|
|
|
|
|
|
|
|
|
391
|
|
|
|
|
|
|
An optional reference to a storage engine to use as the front store instead of |
|
392
|
|
|
|
|
|
|
L<POE::Component::MessageQueue::Storage::BigMemory>. |
|
393
|
|
|
|
|
|
|
|
|
394
|
|
|
|
|
|
|
=back |
|
395
|
|
|
|
|
|
|
|
|
396
|
|
|
|
|
|
|
=head1 SUPPORTED STOMP HEADERS |
|
397
|
|
|
|
|
|
|
|
|
398
|
|
|
|
|
|
|
Same as L<POE::Component::MessageQueue::Storage::Complex>. |
|
399
|
|
|
|
|
|
|
|
|
400
|
|
|
|
|
|
|
=head1 SEE ALSO |
|
401
|
|
|
|
|
|
|
|
|
402
|
|
|
|
|
|
|
L<POE::Component::MessageQueue>, |
|
403
|
|
|
|
|
|
|
L<POE::Component::MessageQueue::Storage>, |
|
404
|
|
|
|
|
|
|
L<DBI>, |
|
405
|
|
|
|
|
|
|
L<DBD::SQLite> |
|
406
|
|
|
|
|
|
|
|
|
407
|
|
|
|
|
|
|
I<Other storage engines:> |
|
408
|
|
|
|
|
|
|
|
|
409
|
|
|
|
|
|
|
L<POE::Component::MessageQueue::Storage::Memory>, |
|
410
|
|
|
|
|
|
|
L<POE::Component::MessageQueue::Storage::BigMemory>, |
|
411
|
|
|
|
|
|
|
L<POE::Component::MessageQueue::Storage::FileSystem>, |
|
412
|
|
|
|
|
|
|
L<POE::Component::MessageQueue::Storage::DBI>, |
|
413
|
|
|
|
|
|
|
L<POE::Component::MessageQueue::Storage::Generic>, |
|
414
|
|
|
|
|
|
|
L<POE::Component::MessageQueue::Storage::Generic::DBI>, |
|
415
|
|
|
|
|
|
|
L<POE::Component::MessageQueue::Storage::Throttled>, |
|
416
|
|
|
|
|
|
|
L<POE::Component::MessageQueue::Storage::Complex> |
|
417
|
|
|
|
|
|
|
|