File Coverage

blib/lib/POE/Component/MessageQueue/Storage/Throttled.pm
Criterion Covered Total %
statement 1 3 33.3
branch n/a
condition n/a
subroutine 1 1 100.0
pod n/a
total 2 4 50.0


line stmt bran cond sub pod time code
1             #
2             # Copyright 2007-2010 David Snopek <dsnopek@gmail.com>
3             #
4             # This program is free software: you can redistribute it and/or modify
5             # it under the terms of the GNU General Public License as published by
6             # the Free Software Foundation, either version 2 of the License, or
7             # (at your option) any later version.
8             #
9             # This program is distributed in the hope that it will be useful,
10             # but WITHOUT ANY WARRANTY; without even the implied warranty of
11             # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12             # GNU General Public License for more details.
13             #
14             # You should have received a copy of the GNU General Public License
15             # along with this program. If not, see <http://www.gnu.org/licenses/>.
16             #
17              
18             package POE::Component::MessageQueue::Storage::Throttled;
19 1     1   1907 use Moose;
  0            
  0            
20             use POE;
21              
22             with qw(POE::Component::MessageQueue::Storage::Double);
23              
24             has throttle_max => (
               # Limit that store() compares `sent` against: a message goes straight
               # to the back store only while sent < throttle_max.  Defaults to 2.
25             is => 'ro',
26             isa => 'Int',
27             default => 2,
28             );
29              
30             has sent => (
               # Counter checked against throttle_max in store().  store() bumps it
               # when it hands a message directly to the back store, and
               # _backstore_ready() drops it again once the front store has nothing
               # left to forward.
31             is => 'ro',
32             isa => 'Num',
33             default => 0,
34             traits => ['Counter'],
35             handles => {
36             'inc_sent' => 'inc',
37             'dec_sent' => 'dec',
38             'reset_sent' => 'reset',
39             }
40             );
41              
42             has throttle_count => (
               # Count of messages currently parked in the front store: incremented
               # when store() queues one there, decremented when _backstore_ready()
               # forwards one or when the `before remove` modifier sees a queued id.
43             is => 'ro',
44             isa => 'Num',
45             default => 0,
46             traits => ['Counter'],
47             handles => {
48             'inc_throttle_count' => 'inc',
49             'dec_throttle_count' => 'dec',
50             'reset_throttle_count' => 'reset',
51             }
52             );
53              
54             has shutdown_callback => (
               # Code to run once the throttled backlog has drained.  It is set by
               # storage_shutdown(); `shutting_down` reports whether it is set and
               # `stop_shutdown` clears it so that _shutdown_throttle_check() fires
               # it at most once.
55             is => 'rw',
56             isa => 'CodeRef',
57             clearer => 'stop_shutdown',
58             predicate => 'shutting_down',
59             );
60              
61             sub BUILD
62             {
               # Moose construction hook.  Registers the front store under the name
               # THROTTLED and the back store under the name STORAGE, then adds
               # 'THROTTLED' to this object's names.
               # NOTE(review): children, add_names, front and back are not defined in
               # this file -- presumably provided by the Storage::Double role; confirm.
63             my $self = shift;
64             $self->children({THROTTLED => $self->front, STORAGE => $self->back});
65             $self->add_names('THROTTLED');
66             }
67              
68             sub _backstore_ready
69             {
               # Called after the back store has finished storing a message (see the
               # callback in store() and the tail call below).  Forwards the oldest
               # throttled message, if any, from the front store to the back store.
               # Takes the object as its only argument.
70             my $self = $_[0];
71              
72             # Send the next throttled message off to the backing store.
73             $self->front->get_oldest(sub {
74             if (my $msg = $_[0])
75             {
76             my $id = $msg->id;
77              
               # Account for the message leaving the throttle before logging, so
               # the "left" figure in the log excludes the one being sent.
78             $self->dec_throttle_count;
79             my $tc = $self->throttle_count;
80             $self->log(info => "Sending throttled message $id ($tc left)");
81              
               # Drop the front-store bookkeeping for this id, remove the message
               # from the front store, and only then store it in the back store.
82             $self->delete_front($id);
83             $self->front->remove($id, sub {
84             $self->back->store($msg, sub {
               # Tail call: re-enter _backstore_ready with $self as the sole
               # argument to look for the next throttled message.
85             @_ = ($self);
86             goto &_backstore_ready;
87             });
88             });
89             }
90             else
91             {
               # Nothing is waiting in the front store: release the slot taken in
               # store() and see whether a pending shutdown can now proceed.
92             $self->dec_sent();
93             $self->_shutdown_throttle_check();
94             }
95             });
96             }
97              
98             before remove => sub {
                # Method modifier run ahead of remove().  Keeps throttle_count in
                # step when a message that is still waiting in the front store is
                # removed.  $ids may be a single id or an array reference of ids.
99             my ($self, $ids) = @_;
                # Normalise a single id into a one-element array reference.
100             $ids = [$ids] unless (ref $ids eq 'ARRAY');
101             foreach my $id (@$ids)
102             {
103             $self->dec_throttle_count if $self->in_front($id);
104             }
105             };
106              
107             sub store
108             {
                # Stores $message, either directly in the back store or, when the
                # throttle limit has been reached, in the front store for later
                # forwarding.  $callback is optional on the direct path and is
                # passed through to the front store on the throttled path.
109             my ($self, $message, $callback) = @_;
110             if ($self->sent < $self->throttle_max)
111             {
                # Below the limit: claim a slot and send straight to the back store.
112             $self->inc_sent();
113             $self->back->store($message, sub {
114             # Do not tail call: the message is stored, but we want to do things
115             # after we satisfy the callback.
116             $callback->() if $callback;
117             $self->_backstore_ready();
118             });
119             }
120             else
121             {
                # At the limit: record the message as front-held and not yet
                # persisted, queue it in the front store, and count it as throttled.
122             $self->set_front($message->id, {persisted => 0});
123             $self->front->store($message, $callback);
124             $self->inc_throttle_count;
125             my $tc = $self->throttle_count;
126              
127             $self->log(debug => "$tc messages throttled");
128             }
129             }
130              
131             sub _shutdown_throttle_check
132             {
                # Runs the pending shutdown callback once a shutdown has been
                # requested and no throttled messages remain.  Otherwise does nothing.
133             my $self = shift;
134             if ($self->shutting_down && $self->throttle_count == 0)
135             {
136             # We have now finished sending things out of throttled, so -WE- are done.
137             # However, we'll still get message_storeds as our backstore finishes, and
138             # we don't want to continue calling shutdown_callback.
139             my $callback = $self->shutdown_callback;
                # Clear the stored callback first so a later check cannot fire it again.
140             $self->stop_shutdown();
                # goto with a code reference hands control to the callback in place
                # of this sub; $self was shifted off above, so the callback sees
                # only whatever arguments remain in @_ (none for the calls in this file).
141             goto $callback;
142             }
143             }
144              
145             sub storage_shutdown
146             {
                # Requests shutdown.  $complete is handed to the back store's
                # storage_shutdown, which is only invoked after the front store's
                # storage_shutdown has called back.
147             my ($self, $complete) = @_;
                # Defer the actual shutdown: it is run by _shutdown_throttle_check()
                # once throttle_count has reached zero.
148             $self->shutdown_callback(sub {
149             $self->front->storage_shutdown(sub {
150             $self->back->storage_shutdown($complete);
151             });
152             });
153              
                # Nothing may be throttled right now, so check immediately.
154             $self->_shutdown_throttle_check();
155             }
156              
157             1;
158              
159             __END__
160              
161             =pod
162              
163             =head1 NAME
164              
165             POE::Component::MessageQueue::Storage::Throttled -- Wraps around another storage engine to throttle the number of messages sent to be stored at one time.
166              
167             =head1 SYNOPSIS
168              
169             use POE;
170             use POE::Component::MessageQueue;
171             use POE::Component::MessageQueue::Storage::Throttled;
172             use POE::Component::MessageQueue::Storage::DBI;
173             use strict;
174              
175             my $DATA_DIR = '/tmp/perl_mq';
176              
177             POE::Component::MessageQueue->new({
178             storage => POE::Component::MessageQueue::Storage::Throttled->new({
179             storage => POE::Component::MessageQueue::Storage::DBI->new({
180             dsn => $DB_DSN,
181             username => $DB_USERNAME,
182             password => $DB_PASSWORD,
183             }),
184             throttle_max => 2
185             }),
186             });
187              
188             POE::Kernel->run();
189             exit;
190              
191             =head1 DESCRIPTION
192              
193             Wraps around another engine to limit the number of messages sent to be stored at once.
194              
195             Use of this module is B<highly> recommended!
196              
197             If the storage engine is unable to store the messages fast enough (i.e. with slow disk IO) it can get really backed up and stall messages coming out of the queue. This allows a client producing excessive amounts of messages to basically monopolize the server, preventing any messages from getting distributed to subscribers.
198              
199             It is suggested to keep the throttle_max very low. In an ideal situation, the underlying storage engine would be able to write each message immediately. This means that there will never be more than one message sent to be stored at a time. The purpose of this module is to make the message queue act as though this were the case even if it isn't. So, a throttle_max of 1 will strictly enforce this; however, for a little bit of leniency, the suggested default is 2.
200              
201             =head1 CONSTRUCTOR PARAMETERS
202              
203             =over 2
204              
205             =item storage => L<POE::Component::MessageQueue::Storage>
206              
207             The storage engine to wrap.
208              
209             =item throttle_max => SCALAR
210              
211             The max number of messages that can be sent to the wrapped storage engine at one time.
212              
213             =back
214              
215             =head1 SUPPORTED STOMP HEADERS
216              
217             Ignored. Passed through to the wrapped storage engine.
218              
219             =head1 SEE ALSO
220              
221             L<POE::Component::MessageQueue>,
222             L<POE::Component::MessageQueue::Storage>,
223             L<POE::Component::MessageQueue::Storage::Double>
224              
225             I<Other storage engines:>
226              
227             L<POE::Component::MessageQueue::Storage::Memory>,
228             L<POE::Component::MessageQueue::Storage::BigMemory>,
229             L<POE::Component::MessageQueue::Storage::FileSystem>,
230             L<POE::Component::MessageQueue::Storage::DBI>,
231             L<POE::Component::MessageQueue::Storage::Generic>,
232             L<POE::Component::MessageQueue::Storage::Generic::DBI>,
233             L<POE::Component::MessageQueue::Storage::Complex>,
234             L<POE::Component::MessageQueue::Storage::Default>
235              
236             =cut
237