File Coverage

blib/lib/POE/Component/MessageQueue/Storage/Throttled.pm
Criterion Covered Total %
statement 44 44 100.0
branch 8 8 100.0
condition 2 3 66.6
subroutine 10 10 100.0
pod 0 3 0.0
total 64 68 94.1


line stmt bran cond sub pod time code
1             #
2             # Copyright 2007-2010 David Snopek <dsnopek@gmail.com>
3             #
4             # This program is free software: you can redistribute it and/or modify
5             # it under the terms of the GNU General Public License as published by
6             # the Free Software Foundation, either version 2 of the License, or
7             # (at your option) any later version.
8             #
9             # This program is distributed in the hope that it will be useful,
10             # but WITHOUT ANY WARRANTY; without even the implied warranty of
11             # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12             # GNU General Public License for more details.
13             #
14             # You should have received a copy of the GNU General Public License
15             # along with this program. If not, see <http://www.gnu.org/licenses/>.
16             #
17              
18             package POE::Component::MessageQueue::Storage::Throttled;
19 11     11   1796 use Moose;
  11         381685  
  11         122  
20 11     11   70733 use POE;
  11         30  
  11         93  
21              
22             with qw(POE::Component::MessageQueue::Storage::Double);
23              
24             has throttle_max => (
25             is => 'ro',
26             isa => 'Int',
27             default => 2,
28             );
29              
30             has sent => (
31             is => 'ro',
32             isa => 'Num',
33             default => 0,
34             traits => ['Counter'],
35             handles => {
36             'inc_sent' => 'inc',
37             'dec_sent' => 'dec',
38             'reset_sent' => 'reset',
39             }
40             );
41              
42             has throttle_count => (
43             is => 'ro',
44             isa => 'Num',
45             default => 0,
46             traits => ['Counter'],
47             handles => {
48             'inc_throttle_count' => 'inc',
49             'dec_throttle_count' => 'dec',
50             'reset_throttle_count' => 'reset',
51             }
52             );
53              
54             has shutdown_callback => (
55             is => 'rw',
56             isa => 'CodeRef',
57             clearer => 'stop_shutdown',
58             predicate => 'shutting_down',
59             );
60              
61             sub BUILD
62             {
63 3     3 0 4899 my $self = shift;
64 3         153 $self->children({THROTTLED => $self->front, STORAGE => $self->back});
65 3         34 $self->add_names('THROTTLED');
66             }
67              
68             sub _backstore_ready
69             {
70 389     389   1032 my $self = $_[0];
71              
72             # Send the next throttled message off to the backing store.
73             $self->front->get_oldest(sub {
74 389 100   389   1491 if (my $msg = $_[0])
75             {
76 186         5782 my $id = $msg->id;
77              
78 186         7651 $self->dec_throttle_count;
79 186         6335 my $tc = $self->throttle_count;
80 186         1434 $self->log(info => "Sending throttled message $id ($tc left)");
81              
82 186         7857 $self->delete_front($id);
83             $self->front->remove($id, sub {
84             $self->back->store($msg, sub {
85 186         688498 @_ = ($self);
86 186         815 goto &_backstore_ready;
87 186         6275 });
88 186         5863 });
89             }
90             else
91             {
92 203         9423 $self->dec_sent();
93 203         806 $self->_shutdown_throttle_check();
94             }
95 389         16062 });
96             }
97              
98             before remove => sub {
99             my ($self, $ids) = @_;
100             $ids = [$ids] unless (ref $ids eq 'ARRAY');
101             foreach my $id (@$ids)
102             {
103             $self->dec_throttle_count if $self->in_front($id);
104             }
105             };
106              
107             sub store
108             {
109 389     389 0 142512 my ($self, $message, $callback) = @_;
110 389 100       15812 if ($self->sent < $self->throttle_max)
111             {
112 203         8739 $self->inc_sent();
113             $self->back->store($message, sub {
114             # Do not tail call: the message is stored, but we want to do things
115             # after we satisfy the callback.
116 203 100   203   2095530 $callback->() if $callback;
117 203         1185 $self->_backstore_ready();
118 203         7502 });
119             }
120             else
121             {
122 186         5145 $self->set_front($message->id, {persisted => 0});
123 186         6008 $self->front->store($message, $callback);
124 186         7456 $self->inc_throttle_count;
125 186         5965 my $tc = $self->throttle_count;
126              
127 186         817 $self->log(debug => "$tc messages throttled");
128             }
129             }
130              
131             sub _shutdown_throttle_check
132             {
133 206     206   563 my $self = shift;
134 206 100 66     8516 if ($self->shutting_down && $self->throttle_count == 0)
135             {
136             # We have now finished sending things out of throttled, so -WE- are done.
137             # However, we'll still get message_storeds as our backstore finishes, and
138             # we don't want to continue calling shutdown_callback.
139 3         130 my $callback = $self->shutdown_callback;
140 3         141 $self->stop_shutdown();
141 3         11 goto $callback;
142             }
143             }
144              
145             sub storage_shutdown
146             {
147 3     3 0 1074 my ($self, $complete) = @_;
148             $self->shutdown_callback(sub {
149             $self->front->storage_shutdown(sub {
150 3         142 $self->back->storage_shutdown($complete);
151 3     3   120 });
152 3         169 });
153              
154 3         16 $self->_shutdown_throttle_check();
155             }
156              
157             1;
158              
159             __END__
160              
161             =pod
162              
163             =head1 NAME
164              
165             POE::Component::MessageQueue::Storage::Throttled -- Wraps around another storage engine to throttle the number of messages sent to be stored at one time.
166              
167             =head1 SYNOPSIS
168              
169             use POE;
170             use POE::Component::MessageQueue;
171             use POE::Component::MessageQueue::Storage::Throttled;
172             use POE::Component::MessageQueue::Storage::DBI;
173             use strict;
174              
175             my $DATA_DIR = '/tmp/perl_mq';
176              
177             POE::Component::MessageQueue->new({
178             storage => POE::Component::MessageQueue::Storage::Throttled->new({
179             storage => POE::Component::MessageQueue::Storage::DBI->new({
180             dsn => $DB_DSN,
181             username => $DB_USERNAME,
182             password => $DB_PASSWORD,
183             }),
184             throttle_max => 2
185             }),
186             });
187              
188             POE::Kernel->run();
189             exit;
190              
191             =head1 DESCRIPTION
192              
193             Wraps around another engine to limit the number of messages sent to be stored at once.
194              
195             Use of this module is B<highly> recommend!
196              
197             If the storage engine is unable to store the messages fast enough (ie. with slow disk IO) it can get really backed up and stall messages coming out of the queue. This allows a client producing execessive amounts of messages to basically monopolize the server, preventing any messages from getting distributed to subscribers.
198              
199             It is suggested to keep the throttle_max very low. In an ideal situation, the underlying storage engine would be able to write each message immediately. This means that there will never be more than one message sent to be stored at a time. The purpose of this module is make the message act as though this were the case even if it isn't. So, a throttle_max of 1, will strictly enforce this, however, for a little bit of leniancy, the suggested default is 2.
200              
201             =head1 CONSTRUCTOR PARAMETERS
202              
203             =over 2
204              
205             =item storage => L<POE::Component::MessageQueue::Storage>
206              
207             The storage engine to wrap.
208              
209             =item throttle_max => SCALAR
210              
211             The max number of messages that can be sent to the DBI store at one time.
212              
213             =back
214              
215             =head1 SUPPORTED STOMP HEADERS
216              
217             Ignored. Passed through to the wrapped storage engine.
218              
219             =head1 SEE ALSO
220              
221             L<POE::Component::MessageQueue>,
222             L<POE::Component::MessageQueue::Storage>,
223             L<POE::Component::MessageQueue::Storage::Double>
224              
225             I<Other storage engines:>
226              
227             L<POE::Component::MessageQueue::Storage::Memory>,
228             L<POE::Component::MessageQueue::Storage::BigMemory>,
229             L<POE::Component::MessageQueue::Storage::FileSystem>,
230             L<POE::Component::MessageQueue::Storage::DBI>,
231             L<POE::Component::MessageQueue::Storage::Generic>,
232             L<POE::Component::MessageQueue::Storage::Generic::DBI>,
233             L<POE::Component::MessageQueue::Storage::Complex>,
234             L<POE::Component::MessageQueue::Storage::Default>
235              
236             =cut
237