File Coverage

blib/lib/POE/Component/MessageQueue.pm
Criterion Covered Total %
statement 148 176 84.0
branch 23 44 52.2
condition 18 34 52.9
subroutine 34 36 94.4
pod 0 7 0.0
total 223 297 75.0


line stmt bran cond sub pod time code
1             #
2             # Copyright 2007-2010 David Snopek <dsnopek@gmail.com>
3             #
4             # This program is free software: you can redistribute it and/or modify
5             # it under the terms of the GNU General Public License as published by
6             # the Free Software Foundation, either version 2 of the License, or
7             # (at your option) any later version.
8             #
9             # This program is distributed in the hope that it will be useful,
10             # but WITHOUT ANY WARRANTY; without even the implied warranty of
11             # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12             # GNU General Public License for more details.
13             #
14             # You should have received a copy of the GNU General Public License
15             # along with this program. If not, see <http://www.gnu.org/licenses/>.
16             #
17              
18             package POE::Component::MessageQueue;
19 11     11   948826 use Moose;
  11         3607633  
  11         80  
20              
21             our $VERSION = '0.3004'; # VERSION
22              
23 11     11   78631 use POE 0.38;
  11         309  
  11         130  
24 11     11   10620 use POE::Component::Server::Stomp;
  11         38  
  11         306  
25 11     11   3863 use POE::Component::MessageQueue::Client;
  11         43  
  11         397  
26 11     11   5414 use POE::Component::MessageQueue::Queue;
  11         47  
  11         422  
27 11     11   6077 use POE::Component::MessageQueue::Topic;
  11         43  
  11         426  
28 11     11   5665 use POE::Component::MessageQueue::Message;
  11         46  
  11         418  
29 11     11   6322 use POE::Component::MessageQueue::IDGenerator::UUID;
  11         45  
  11         421  
30 11     11   1010 use Net::Stomp;
  11         11619  
  11         89  
31 11     11   5348 use Event::Notify;
  11         5385  
  11         319  
32              
33 11     11   75 use constant SHUTDOWN_SIGNALS => ('TERM', 'HUP', 'INT');
  11         25  
  11         22775  
34              
35             has alias => (
36             is => 'ro',
37             default => 'MQ',
38             );
39              
40 8     8 0 277 sub master_alias { $_[0]->alias.'-master' }
41              
42             has logger => (
43             is => 'ro',
44             lazy => 1,
45             default => sub {
46             my $self = shift;
47             POE::Component::MessageQueue::Logger->new(
48             logger_alias => $self->logger_alias
49             );
50             },
51             handles => [qw(log)],
52             );
53              
54             has notifier => (
55             is => 'ro',
56             default => sub { Event::Notify->new() },
57             handles => [qw(notify register_event unregister_event)],
58             );
59              
60             has idgen => (
61             is => 'ro',
62             default => sub { POE::Component::MessageQueue::IDGenerator::UUID->new() },
63             handles => { generate_id => 'generate' },
64             );
65              
66             has observers => (is => 'ro');
67             has logger_alias => (is => 'ro');
68              
69             has storage => (
70             is => 'ro',
71             does => 'POE::Component::MessageQueue::Storage',
72             required => 1,
73             );
74              
75             has clients => (
76             isa => 'HashRef[POE::Component::MessageQueue::Client]',
77             default => sub { {} },
78             traits => ['Hash'],
79             handles => {
80             'get_client' => 'get',
81             'remove_client' => 'delete',
82             'set_client' => 'set',
83             'all_client_ids' => 'keys',
84             }
85             );
86              
87             has shutdown_count => (
88             is => 'ro',
89             isa => 'Num',
90             default => 0,
91             traits => ['Counter'],
92             handles => {
93             'inc_shutdown_count' => 'inc',
94             'dec_shutdown_count' => 'dec',
95             'reset_shutdown_count' => 'reset',
96             }
97             );
98              
99             has message_class => (
100             is => 'ro',
101             isa => 'ClassName',
102             default => 'POE::Component::MessageQueue::Message',
103             );
104              
105             has pump_frequency => (
106             is => 'ro',
107             isa => 'Maybe[Num]',
108             default => 0,
109             );
110              
111             before remove_client => sub {
112             my ($self, @ids) = @_;
113              
114             if (my @clients = grep { $_ } map { $self->get_client($_) } @ids)
115             {
116             my $client_str = @clients > 1
117             ? 'clients (' . join(', ', map { $_->id } @clients) . ')'
118             : 'client ' . $clients[0]->id;
119              
120             $self->log(notice => "MASTER: Removing $client_str");
121              
122             foreach my $c (@clients)
123             {
124             my @destinations = map { $_->destination } $c->all_subscriptions;
125              
126             $c->unsubscribe($_) foreach @destinations;
127              
128             if ($self->shutdown_count == 0)
129             {
130             $self->storage->disown_all($c->id,
131             sub { $_->pump() foreach @destinations });
132             }
133              
134             $c->shutdown();
135             }
136             }
137             };
138            
139             has destinations => (
140             isa => 'HashRef[POE::Component::MessageQueue::Destination]',
141             default => sub { {} },
142             traits => ['Hash'],
143             handles => {
144             'get_destination' => 'get',
145             'set_destination' => 'set',
146             'all_destinations' => 'values',
147             }
148             );
149              
150             has owners => (
151             isa => 'HashRef[POE::Component::MessageQueue::Subscription]',
152             default => sub { {} },
153             traits => ['Hash'],
154             handles => {
155             'get_owner' => 'get',
156             'set_owner' => 'set',
157             'delete_owner' => 'delete',
158             },
159             );
160              
161             sub BUILD
162             {
163 4     4 0 8295 my ($self, $args) = @_;
164              
165 4         160 my $observers = $self->observers;
166 4 50       21 if ($observers)
167             {
168 0         0 $_->register($self) for (@$observers);
169             }
170              
171 4         125 $self->storage->set_logger($self->logger);
172              
173             POE::Component::Server::Stomp->new(
174             Alias => $self->alias,
175             Address => $args->{address},
176             Hostname => $args->{hostname},
177             Port => $args->{port},
178             Domain => $args->{domain},
179              
180 275     275   1127 HandleFrame => sub { $self->_handle_frame(@_) },
181 10     10   18575 ClientDisconnected => sub { $self->_client_disconnected(@_) },
182 1     1   10002886 ClientError => sub { $self->_client_error(@_) },
183 4         116 );
184              
185             # a custom session for non-STOMP responsive tasks
186 4         32 POE::Session->create(
187             object_states => [ $self => [qw(_start _shutdown)] ],
188             );
189             }
190              
191             sub _start
192             {
193 4     4   794 my ($self, $kernel) = @_[OBJECT, KERNEL ];
194 4         26 $kernel->alias_set($self->master_alias);
195              
196             # install signal handlers to initiate graceful shutdown.
197             # We only respond to user-type signals - crash signals like
198             # SEGV and BUS should behave normally
199 4         172 foreach my $signal ( SHUTDOWN_SIGNALS )
200             {
201 12         573 $kernel->sig($signal => '_shutdown');
202             }
203             }
204              
205             sub make_destination
206             {
207 4     4 0 14 my ($self, $name) = @_;
208 4         28 my @args = (name => $name, parent => $self);
209 4         11 my $dest;
210              
211 4 50       48 if ($name =~ m{/queue/})
    0          
212             {
213 4         212 $dest = POE::Component::MessageQueue::Queue->new(@args);
214             }
215             elsif ($name =~ m{/topic/})
216             {
217 0         0 $dest = POE::Component::MessageQueue::Topic->new(@args);
218             }
219              
220 4 50       175 $self->set_destination($name => $dest) if $dest;
221 4         27 return $dest;
222             }
223              
224             sub _handle_frame
225             {
226 275     275   595 my $self = shift;
227 275         732 my ($kernel, $heap, $frame) = @_[ KERNEL, HEAP, ARG0 ];
228              
229 275 50       10857 if ($self->shutdown_count)
230             {
231 0         0 $kernel->yield('shutdown');
232 0         0 return;
233             }
234              
235 275         1186 my $id = $kernel->get_active_session()->ID();
236              
237 275         11533 my $client = $self->get_client($id);
238 275 100       885 unless ($client)
239             {
240 10         980 $client = POE::Component::MessageQueue::Client->new(id => $id);
241 10         452 $self->set_client($id => $client);
242             }
243              
244 275         1106 $self->route_frame($client, $frame);
245             }
246              
247             sub _client_disconnected
248             {
249 10     10   28 my $self = shift;
250 10         33 my ($kernel, $heap) = @_[ KERNEL, HEAP ];
251              
252 10         41 my $id = $kernel->get_active_session()->ID();
253 10         90 $self->remove_client($id);
254             }
255              
256             sub _client_error
257             {
258 1     1   4 my $self = shift;
259 1         6 my ($kernel, $name, $number, $message) = @_[ KERNEL, ARG0, ARG1, ARG2 ];
260              
261 1 50 33     11 unless ( $name eq 'read' and $number == 0 ) # Anything but EOF
262             {
263 0         0 $self->log(error => "Client error: $name $number $message" );
264             }
265             }
266              
267             sub _shutdown_complete
268             {
269 4     4   16 my ($self) = @_;
270              
271 4         35 $self->log('alert', 'Storage engine has finished shutting down');
272              
273             # Really, really take us down!
274 4         78 $self->log('alert', 'Sending TERM signal to master sessions');
275 4         146 $poe_kernel->signal( $self->alias, 'TERM' );
276 4         308 $poe_kernel->signal( $self->master_alias, 'TERM' );
277              
278 4         500 $self->log(alert => 'Shutting down all observers');
279 4 50       119 if (my $oref = $self->observers)
280             {
281 0         0 $_->shutdown() foreach (@$oref);
282             }
283              
284 4         22 $self->log(alert => 'Shutting down the logger');
285 4         102 $self->logger->shutdown();
286             }
287              
288             sub route_frame
289             {
290 275     275 0 728 my ($self, $client, $frame) = @_;
291 275         8685 my $cid = $client->id;
292 275         1127 my $destination_name = $frame->headers->{destination};
293              
294             my %handlers = (
295             CONNECT => sub {
296 10   50 10   34 my $login = $frame->headers->{login} || q();
297 10   50     75 my $passcode = $frame->headers->{passcode} || q();
298              
299 10         118 $self->log('notice', "RECV ($cid): CONNECT $login:$passcode");
300 10         54 $client->connect($login, $passcode);
301             },
302              
303             DISCONNECT => sub {
304 9     9   69 $self->log( 'notice', "RECV ($cid): DISCONNECT");
305 9         125 $self->remove_client($cid);
306             },
307              
308             SEND => sub {
309 150   33 150   726 $frame->headers->{'message-id'} ||= $self->generate_id();
310 150         4504 my $message = $self->message_class->from_stomp_frame($frame);
311              
312 150 50 33     5152 if ($message->has_delay() and not $self->pump_frequency)
313             {
314 0         0 $message->clear_delay();
315              
316 0         0 $self->log(warning => "MASTER: Received a message with deliver-after header, but there is no pump-frequency enabled. Ignoring header and delivering with no delay.");
317             }
318              
319 150         4465 $self->log(notice =>
320             sprintf('RECV (%s): SEND message %s (%i bytes) to %s (persistent: %i)',
321             $cid, $message->id, $message->size, $message->destination,
322             $message->persistent));
323              
324              
325 150 50 66     6612 if(my $d = $self->get_destination ($destination_name) ||
326             $self->make_destination($destination_name))
327             {
328 150         1005 $self->notify( 'recv', {
329             destination => $d,
330             message => $message,
331             client => $client,
332             });
333 150         3181 $d->send($message);
334             }
335             else
336             {
337 0         0 $self->log(error => "Don't know how to send to $destination_name");
338             }
339             },
340              
341             SUBSCRIBE => sub {
342 6     6   20 my $ack_type = $frame->headers->{ack};
343              
344 6         57 $self->log('notice',
345             "RECV ($cid): SUBSCRIBE $destination_name (ack: $ack_type)");
346              
347 6 50 66     218 if(my $d = $self->get_destination ($destination_name) ||
348             $self->make_destination($destination_name))
349             {
350 6   66     68 $client->subscribe($d, $ack_type && $ack_type eq 'client');
351 6         72 $self->notify(subscribe => {destination => $d, client => $client});
352 6         169 $d->pump();
353             }
354             else
355             {
356 0         0 $self->log(error => "Don't know how to subscribe to $destination_name");
357             }
358             },
359              
360             UNSUBSCRIBE => sub {
361 0     0   0 $self->log('notice', "RECV ($cid): UNSUBSCRIBE $destination_name");
362 0 0       0 if(my $d = $self->get_destination($destination_name))
363             {
364 0         0 $client->unsubscribe($d);
365             $self->storage->disown_destination($d->name, $client->id,
366 0         0 sub { $d->pump() });
  0         0  
367             }
368             },
369              
370             ACK => sub {
371 100     100   376 my $message_id = $frame->headers->{'message-id'};
372 100         1091 $self->log('notice', "RECV ($cid): ACK - message $message_id");
373 100         433 $self->ack_message($client, $message_id);
374             },
375 275         6212 );
376              
377 275 50       1270 if (my $fn = $handlers{$frame->command})
378             {
379             # Send receipt on anything but a connect
380 275 100 66     2048 if ($frame->command ne 'CONNECT' &&
      100        
381             $frame->headers &&
382             (my $receipt = $frame->headers->{receipt}))
383             {
384 223         5048 $client->send_frame(Net::Stomp::Frame->new({
385             command => 'RECEIPT',
386             headers => {'receipt-id' => $receipt},
387             }));
388             }
389 275         1950 $fn->();
390             }
391             else
392             {
393 0         0 $self->log('error',
394             "ERROR: Don't know how to handle frame: " . $frame->as_string);
395             }
396             }
397              
398             sub ack_message
399             {
400 100     100 0 345 my ($self, $client, $message_id) = @_;
401 100         2975 my $client_id = $client->id;
402              
403 100         3531 my $s = $self->get_owner($message_id);
404 100 50 33     3504 if ($s && $s->client && $s->client->id eq $client_id)
      33        
405             {
406 100         3604 $self->delete_owner($message_id);
407 100         529 $s->ready(1);
408 100         3308 my $d = $s->destination;
409 100         575 $self->notify(remove => $message_id);
410 100     100   4770 $self->storage->remove($message_id, sub {$d->pump()});
  100         642  
411             }
412             else
413             {
414 0         0 $self->log(alert => "DANGER: Client $client_id trying to ACK message ".
415             "$message_id, which he does not own!");
416 0         0 return;
417             }
418             }
419              
420             sub _shutdown
421             {
422 8     8   7528 my ($self, $kernel, $signal) = @_[ OBJECT, KERNEL, ARG0 ];
423 8         55 $self->log('alert', "Got SIG$signal. Shutting down.");
424 8         52 $kernel->sig_handled();
425 8         118 $self->shutdown();
426             }
427              
428             sub shutdown
429             {
430 8     8 0 24 my $self = shift;
431 8         397 $self->inc_shutdown_count;
432 8 100       273 if ($self->shutdown_count > 1)
433             {
434 4 50       134 if ($self->shutdown_count > 2)
435             {
436             # If we handle three shutdown signals, we'll just die. This is handy
437             # during debugging, and no one who wants MQ to shutdown gracefully will
438             # throw 3 kills at us. TODO: Make sure that's true.
439 0         0 my $msg = 'Shutdown called ' . $self->shutdown_count
440             . ' times! Forcing ungraceful quit.';
441 0         0 $self->log('emergency', $msg);
442 0         0 print STDERR "$msg\n";
443 0         0 $poe_kernel->stop();
444             }
445             }
446             else
447             {
448             # First time we were called, so shut things down.
449 4         24 $self->log(alert => 'Initiating message queue shutdown...');
450              
451 4         22 $self->log(alert => 'Shutting down all destinations');
452 4         195 $_->shutdown() foreach $self->all_destinations;
453              
454             # stop listening for connections
455 4         662 $poe_kernel->post( $self->alias => 'shutdown' );
456              
457             # shutdown all client connections
458 4         557 $self->remove_client( $self->all_client_ids );
459              
460             # shutdown the storage
461 4     4   110 $self->storage->storage_shutdown( sub { $self->_shutdown_complete(@_) } );
  4         41  
462             }
463             }
464              
465             sub dispatch_message
466             {
467 130     130 0 1928 my ($self, $msg, $subscriber) = @_;
468 130 50       4054 return if ($self->shutdown_count > 0);
469              
470 130         3919 my $msg_id = $msg->id;
471 130         4105 my $destination = $self->get_destination($msg->destination);
472              
473 130 50       4338 if(my $client = $subscriber->client)
474             {
475 130         4058 my $client_id = $client->id;
476 130 50       682 if ($client->send_frame($msg->create_stomp_frame()))
477             {
478 130         925 $self->log(info => "Dispatching message $msg_id to client $client_id");
479 130 100       4405 if ($subscriber->client_ack)
480             {
481 100         526 $subscriber->ready(0);
482 100         3676 $self->set_owner($msg_id => $subscriber);
483             }
484             else
485             {
486 30         131 $self->notify(remove => $msg_id);
487 30         1297 $self->storage->remove($msg_id);
488             }
489              
490 130         1099 $self->notify(dispatch => {
491             destination => $destination,
492             message => $msg,
493             client => $client,
494             });
495             }
496             else
497             {
498 0           $self->log(warning =>
499             "MASTER: Couldn't send frame to client $client_id: removing.");
500 0           $self->remove_client($client_id);
501             }
502             }
503             else
504             {
505 0           $self->log(warning =>
506             "MASTER: Message $msg_id could not be delivered (no client)");
507 0 0         if ($msg->claimed)
508             {
509             $self->storage->disown_all($msg->claimant,
510 0     0     sub { $destination->pump() });
  0            
511             }
512             }
513             }
514              
515             1;
516              
517             __END__
518              
519             =pod
520              
521             =head1 NAME
522              
523             POE::Component::MessageQueue - A STOMP based message queue server
524              
525             =head1 USAGE
526              
527             If you are only interested in running with the recommended storage backend and
528             some predetermined defaults, you can use the included command line script:
529              
530             POE::Component::MessageQueue version 0.2.12
531             Copyright 2007-2011 David Snopek (http://www.hackyourlife.org)
532             Copyright 2007, 2008 Paul Driver <frodwith@gmail.com>
533             Copyright 2007 Daisuke Maki <daisuke@endeworks.jp>
534              
535             mq.pl [--port|-p <num>] [--hostname|-h <host>]
536             [--front-store <str>] [--front-max <size>]
537             [--granularity <seconds>] [--nouuids]
538             [--timeout|-i <seconds>] [--throttle|-T <count>]
539             [--dbi-dsn <str>] [--mq-id <str>]
540             [--dbi-username <str>] [--dbi-password <str>]
541             [--pump-freq|-Q <seconds>]
542             [--data-dir <path_to_dir>] [--log-conf <path_to_file>]
543             [--stats-interval|-i <seconds>] [--stats]
544             [--pidfile|-p <path_to_file>] [--background|-b]
545             [--crash-cmd <path_to_script>]
546             [--debug-shell] [--version|-v] [--help|-h]
547              
548             SERVER OPTIONS:
549             --port -p <num> The port number to listen on (Default: 61613)
550             --hostname -h <host> The hostname of the interface to listen on
551             (Default: localhost)
552              
553             STORAGE OPTIONS:
554             --storage <str> Specify which overall storage engine to use. This
555             affects what other options are value. (can be
556             default or dbi)
557             --front-store -f <str> Specify which in-memory storage engine to use for
558             the front-store (can be memory or bigmemory).
559             --front-max <size> How much message body the front-store should cache.
560             This size is specified in "human-readable" format
561             as per the -h option of ls, du, etc. (ex. 2.5M)
562             --timeout -i <secs> The number of seconds to keep messages in the
563             front-store (Default: 4)
564             --pump-freq -Q <secs> How often (in seconds) to automatically pump each
565             queue. Set to zero to disable this timer entirely
566             (Default: 0)
567             --granularity <secs> How often (in seconds) Complex should check for
568             messages that have passed the timeout.
569             --[no]uuids Use (or do not use) UUIDs instead of incrementing
570             integers for message IDs. (Default: uuids)
571             --throttle -T <count> The number of messages that can be stored at once
572             before throttling (Default: 2)
573             --data-dir <path> The path to the directory to store data
574             (Default: /var/lib/perl_mq)
575             --log-conf <path> The path to the log configuration file
576             (Default: /etc/perl_mq/log.conf)
577              
578             --dbi-dsn <str> The database DSN when using --storage dbi
579             --dbi-username <str> The database username when using --storage dbi
580             --dbi-password <str> The database password when using --storage dbi
581             --mq-id <str> A string uniquely identifying this MQ when more
582             than one MQ use the DBI database for storage
583              
584             STATISTICS OPTIONS:
585             --stats If specified the, statistics information will be
586             written to $DATA_DIR/stats.yml
587             --stats-interval <secs> Specifies the number of seconds to wait before
588             dumping statistics (Default: 10)
589              
590             DAEMON OPTIONS:
591             --background -b If specified the script will daemonize and run in the
592             background
593             --pidfile -p <path> The path to a file to store the PID of the process
594              
595             --crash-cmd <path> The path to a script to call when crashing.
596             A stacktrace will be printed to the script's STDIN.
597             (ex. 'mail root@localhost')
598              
599             OTHER OPTIONS:
600             --debug-shell Run with POE::Component::DebugShell
601             --version -v Show the current version.
602             --help -h Show this usage message
603              
604             =head1 SYNOPSIS
605              
606             =head2 Subscriber
607              
608             use Net::Stomp;
609            
610             my $stomp = Net::Stomp->new({
611             hostname => 'localhost',
612             port => 61613
613             });
614            
615             # Currently, PoCo::MQ doesn't do any authentication, so you can put
616             # whatever you want as the login and passcode.
617             $stomp->connect({ login => $USERNAME, passcode => $PASSWORD });
618            
619             $stomp->subscribe({
620             destination => '/queue/my_queue.sub_queue',
621             ack => 'client'
622             });
623            
624             while (1)
625             {
626             my $frame = $stomp->receive_frame;
627             print $frame->body . "\n";
628             $stomp->ack({ frame => $frame });
629             }
630            
631             $stomp->disconnect();
632              
633             =head2 Producer
634              
635             use Net::Stomp;
636            
637             my $stomp = Net::Stomp->new({
638             hostname => 'localhost',
639             port => 61613
640             });
641            
642             # Currently, PoCo::MQ doesn't do any authentication, so you can put
643             # whatever you want as the login and passcode.
644             $stomp->connect({ login => $USERNAME, passcode => $PASSWORD });
645            
646             $stomp->send({
647             destination => '/queue/my_queue.sub_queue',
648             body => 'I am a message',
649             persistent => 'true',
650             });
651            
652             $stomp->disconnect();
653              
654             =head2 Server
655              
656             If you want to use a different arrangement of storage engines or to embed PoCo::MQ
657             inside another application, the following synopsis may be useful to you:
658              
659             use POE;
660             use POE::Component::Logger;
661             use POE::Component::MessageQueue;
662             use POE::Component::MessageQueue::Storage::Default;
663             use Socket; # For AF_INET
664             use strict;
665              
666             my $DATA_DIR = '/tmp/perl_mq';
667              
668             # we create a logger, because a production message queue would
669             # really need one.
670             POE::Component::Logger->spawn(
671             ConfigFile => 'log.conf',
672             Alias => 'mq_logger'
673             );
674              
675             POE::Component::MessageQueue->new({
676             port => 61613, # Optional.
677             address => '127.0.0.1', # Optional.
678             hostname => 'localhost', # Optional.
679             domain => AF_INET, # Optional.
680            
681             logger_alias => 'mq_logger', # Optional.
682              
683             # Required!!
684             storage => POE::Component::MessageQueue::Storage::Default->new({
685             data_dir => $DATA_DIR,
686             timeout => 2,
687             throttle_max => 2
688             })
689             });
690              
691             POE::Kernel->run();
692             exit;
693              
694             =head1 DESCRIPTION
695              
696             This module implements a message queue [1] on top of L<POE> that communicates
697             via the STOMP protocol [2].
698              
699             There exist a few good Open Source message queues, most notably ActiveMQ [3] which
700             is written in Java. It provides more features and flexibility than this one (while
701             still implementing the STOMP protocol), however, it was (at the time I last used it)
702             very unstable. With every version there was a different mix of memory leaks, persistence
703             problems, STOMP bugs, and file descriptor leaks. Due to its complexity I was
704             unable to be very helpful in fixing any of these problems, so I wrote this module!
705              
706             This component distinguishes itself in a number of ways:
707              
708             =over 4
709              
710             =item *
711              
712             No OS threads, its asynchronous. (Thanks to L<POE>!)
713              
714             =item *
715              
716             Persistence was a high priority.
717              
718             =item *
719              
720             A strong effort is put to low memory and high performance.
721              
722             =item *
723              
724             Message storage can be provided by a number of different backends.
725              
726             =item *
727              
728             Features to support high-availability and fail-over. (See the L<#HIGH AVAILABILITY> section below)
729              
730             =back
731              
732             =head2 Special STOMP headers
733              
734             You can see the main STOMP documentation here: L<http://stomp.codehaus.org/Protocol>
735              
736             PoCo::MQ implements a number of non-standard STOMP headers:
737              
738             =over 4
739              
740             =item B<persistent>
741              
742             Set to the string "true" to request that a message be persisted. Not setting this header
743             or setting it to any other value, means that a message is non-persistent.
744              
745             Many storage engines ignore the "persistent" header, either persisting all messages or
746             no messages, so be sure to check the documentation for your storage engine.
747              
748             Using the Complex or Default storage engines, persistent messages will always be sent
749             to the back store and non-persistent messages will be discarded eventually.
750              
751             =item B<expire-after>
752              
753             For non-persistent messages, you can set this header to the number of seconds this
754             message must be kept before being discarded. This is ignored for persistent messages.
755              
756             Many storage engines ignore the "expire-after" header, so be sure to check the
757             documentation for your storage engine.
758              
759             Using the Complex or Default storage engines, this header will be honored. If it isn't
760             specified, non-persistent messages are discarded when pushed out of the front store.
761              
762             =item B<deliver-after>
763              
764             For both persistent or non-persistent messages, you can set this header to the number of
765             seconds this message should be held before being delivered. In other words, this allows
766             you to delay delivery of a message for an arbitrary number of seconds.
767              
768             All the storage engines in the standard distribution support this header. B<But it will not
769             work without a pump frequency enabled!> If using mq.pl, enable with --pump-freq or if creating
770             a L<POE::Component::MessageQueue> object directly, pass pump_frequency as an argument to new().
771              
772             =back
773              
774             =head2 Queues and Topics
775              
776             In PoCo::MQ there are two types of I<destinations>: B<queues> and B<topics>
777              
778             =over 4
779              
780             =item B<queue>
781              
782             Each message is only delivered to a single subscriber (not counting
783             messages that were delivered but not ACK'd). If there are multiple
784             subscribers on a single queue, the messages will be divided amoung them,
785             roughly equally.
786              
787             =item B<topic>
788              
789             Each message is delivered to every subscriber. Topics don't support any kind
790             of persistence, so to get a message, a subscriber I<must> be connected at the
791             time it was sent.
792              
793             =back
794              
795             All destination names start with either "/queue/" or "/topic/" to distinguish
796             between queues and topics.
797              
798             =head2 Tips and Tricks
799              
800             =over 4
801              
802             =item B<Logging! Use it.>
803              
804             PoCo::MQ uses L<POE::Component::Logger> for logging which is based on
805             L<Log::Dispatch>. By default B<mq.pl> looks for a log file at:
806             "/etc/perl_mq/log.conf". Or you can specify an alternate location with the
807             I<--log-conf> command line argument.
808              
809             =item B<Using the login/passcode to track clients in the log.>
810              
811             Currently the login and passcode aren't used by PoCo::MQ for auth, but they
812             I<are> written to the log file. In the log file clients are only identified
813             by the client id. But if you put information identifying the client in the
814             login/passcode you can connect that to a client id by finding it in the log.
815              
816             =back
817              
818             =head1 STORAGE
819              
820             When creating an instance of this component you must pass in a storage object
821             so that the message queue knows how to store its messages. There are some storage
822             backends provided with this distribution. See their individual documentation for
823             usage information. Here is a quick break down:
824              
825             =over 4
826              
827             =item *
828              
829             L<POE::Component::MessageQueue::Storage::Memory> -- The simplest storage engine. It keeps messages in memory and provides absolutely no presistence.
830              
831             =item *
832              
833             L<POE::Component::MessageQueue::Storage::BigMemory> -- An alternative memory storage engine that is optimized for large numbers of messages.
834              
835             =item *
836              
837             L<POE::Component::MessageQueue::Storage::DBI> -- Uses Perl L<DBI> to store messages. Depending on your database configuration, using directly may not be recommended because the message bodies are stored in the database. Wrapping with L<POE::Component::MessageQueue::Storage::FileSystem> allows you to store the message bodies on disk. All messages are stored persistently. (Underneath this is really just L<POE::Component::MessageQueue::Storage::Generic> and L<POE::Component::MessageQueue::Storage::Generic::DBI>)
838              
839             =item *
840              
841             L<POE::Component::MessageQueue::Storage::FileSystem> -- Wraps around another storage engine to store the message bodies on the filesystem. This can be used in conjunction with the DBI storage engine so that message properties are stored in DBI, but the message bodies are stored on disk. All messages are stored persistently regardless of whether a message has set the persistent header or not.
842              
843             =item *
844              
845             L<POE::Component::MessageQueue::Storage::Generic> -- Uses L<POE::Component::Generic> to wrap storage modules that aren't asynchronous. Using this module is the easiest way to write custom storage engines.
846              
847             =item *
848              
849             L<POE::Component::MessageQueue::Storage::Generic::DBI> -- A synchronous L<DBI>-based storage engine that can be used inside of Generic. This provides the basis for the L<POE::Component::MessageQueue::Storage::DBI> module.
850              
851             =item *
852              
853             L<POE::Component::MessageQueue::Storage::Throttled> -- Wraps around another engine to limit the number of messages sent to be stored at once. Use of this module is B<highly> recommended! If the storage engine is unable to store the messages fast enough (ie. with slow disk IO) it can get really backed up and stall messages coming out of the queue, allowing execessive producers to basically monopolize the server, preventing any messages from getting distributed to subscribers. Also, it will significantly cuts down the number of open FDs when used with L<POE::Component::MessageQueue::Storage::FileSystem>. Internally it makes use of L<POE::Component::MessageQueue::Storage::BigMemory> to store the throttled messages.
854              
855             =item *
856              
857             L<POE::Component::MessageQueue::Storage::Complex> -- A configurable storage engine that keeps a front-store (something fast) and a back-store (something persistent), allowing you to specify a timeout and an action to be taken when messages in the front-store expire, by default, moving them into the back-store. This optimization allows for the possibility of messages being handled before ever having to be persisted. Complex is capable to correctly handle the persistent and expire-after headers.
858              
859             =item *
860              
861             L<POE::Component::MessageQueue::Storage::Default> -- A combination of the Complex, BigMemory, FileSystem, DBI and Throttled modules above. It will keep messages in BigMemory and move them into FileSystem after a given number of seconds, throttling messages passed into DBI. The DBI backend is configured to use SQLite. It is capable to correctly handle the persistent and expire-after headers. This is the recommended storage engine and should provide the best performance in the most common case (ie. when both providers and consumers are connected to the queue at the same time).
862              
863             =back
864              
865             =head1 CONSTRUCTOR PARAMETERS
866              
867             =over 2
868              
869             =item storage => SCALAR
870              
871             The only required parameter. Sets the object that the message queue should use for
872             message storage. This must be an object that follows the interface of
873             L<POE::Component::MessageQueue::Storage> but doesn't necessarily need to be a child
874             of that class.
875              
876             =item alias => SCALAR
877              
878             The session alias to use.
879              
880             =item port => SCALAR
881              
882             The optional port to listen on. If none is given, we use 61613 by default.
883              
884             =item address => SCALAR
885              
886             The option interface address to bind to. It defaults to INADDR_ANY or INADDR6_ANY
887             when using IPv4 or IPv6, respectively.
888              
889             =item hostname => SCALAR
890              
891             The optional name of the interface to bind to. This will be converted to the IP and
892             used as if you set I<address> instead. If you set both I<hostname> and I<address>,
893             I<address> will override this value.
894              
895             =item domain => SCALAR
896              
897             Optionally specifies the domain within which communication will take place. Defaults
898             to AF_INET.
899              
900             =item logger_alias => SCALAR
901              
902             Optionally set the alias of the POE::Component::Logger object that you want the message
903             queue to log to. If no value is given, log information is simply printed to STDERR.
904              
905             =item message_class => SCALAR
906              
907             Optionally set the package name to use for the Message object. This should be a child
908             class of POE::Component::MessageQueue::Message or atleast follow the same interface.
909              
910             This allows you to add new message headers which the MQ can recognize.
911              
912             =item pump_frequency => SCALAR
913              
914             Optionally set how often (in seconds) to automatically pump each queue. If zero or
915             no value is given, then this timer is disabled entirely.
916              
917             When disabled, each queue is only pumped when its contents change, meaning
918             when a message is added or removed from the queue. Normally, this is enough. However,
919             if your storage engine holds back messages for any reason (ie. to delay their
920             delivery) it will be necessary to enable this, so that the held back messages will
921             ultimately be delivered.
922              
923             I<You must enable this for the message queue to honor the deliver-after header!>
924              
925             =item observers => ARRAYREF
926              
927             Optionally pass in a number of objects that will receive information about events inside
928             of the message queue.
929              
930             Currently, only one observer is provided with the PoCo::MQ distribution:
931             L<POE::Component::MessageQueue::Statistics>. Please see its documentation for more information.
932              
933             =back
934              
935             =head1 HIGH AVAILABILITY
936              
937             From version 0.2.10, PoCo::MQ supports a features to enable high availability.
938              
939             =over 4
940              
941             =item B<Clustering>
942              
943             You can now run multiple MQs which share the same back-store, behind a reverse-proxy load-balancer with
944             automatic fail-over, if one of the MQs goes down.
945              
946             See the the clustering documentation for more information:
947              
948             L<POE::Component::MessageQueue::Manual::Clustering>
949              
950             =item B<DBI fail-over>
951              
952             The DBI storage engine can be configured with a list of database servers. If one of them is not available
953             or goes down, it will fail-over to the next one.
954              
955             If you set up several database servers with master-to-master replication, this will allow the MQ to seemlessly
956             handle failure of one of the databases.
957              
958             See the DBI storage engine documentation for more information:
959              
960             L<POE::Component::MessageQueue::Storage::Generic::DBI>
961              
962             =back
963              
964             =head1 REFERENCES
965              
966             =over 4
967              
968             =item [1]
969              
970             L<http://en.wikipedia.org/wiki/Message_Queue> -- General information about message queues
971              
972             =item [2]
973              
974             L<http://stomp.codehaus.org/Protocol> -- The informal "spec" for the STOMP protocol
975              
976             =item [3]
977              
978             L<http://www.activemq.org/> -- ActiveMQ is a popular Java-based message queue
979              
980             =back
981              
982             =head1 UPGRADING FROM OLDER VERSIONS
983              
984             If you used any of the following storage engines with PoCo::MQ 0.2.9 or older:
985              
986             =over 4
987              
988             =item *
989              
990             L<POE::Component::MessageQueue::Storage::DBI>
991              
992             =back
993              
994             The database format has changed!
995              
996             B<Note:> When using L<POE::Component::MessageQueue::Storage::Default> (meaning mq.pl
997             --storage default) the database will be automatically updated in place, so you don't
998             need to worry about this.
999              
1000             Included in the distribution, is a schema/ directory with a few SQL scripts for
1001             upgrading:
1002              
1003             =over
1004              
1005             =item *
1006              
1007             upgrade-0.1.7.sql -- Apply if you are upgrading from version 0.1.6 or older.
1008              
1009             =item *
1010              
1011             upgrade-0.1.8.sql -- Apply if your are upgrading from version 0.1.7 or after applying
1012             the above upgrade script. This one has a SQLite specific version: upgrade-0.1.8-sqlite.sql).
1013              
1014             =item *
1015              
1016             upgrade-0.2.3.sql -- Apply if you are upgrading from version 0.2.2 or older (after
1017             applying the above upgrade scripts).
1018              
1019             =item *
1020              
1021             upgrade-0.2.9-mysql.sql -- Doesn't apply to SQLite users! Apply if you are upgrading from version
1022             0.2.8 or older (after applying the above upgrade scripts).
1023              
1024             =item *
1025              
1026             upgrade-0.2.10-mysql.sql -- Doesn't apply to SQLite users! Apply if you are upgrading from version
1027             0.2.9 or older (after applying the above upgrade scripts).
1028              
1029             =back
1030              
1031             =head1 CONTACT
1032              
1033             Please check out the Google Group at:
1034              
1035             L<http://groups.google.com/group/pocomq>
1036              
1037             Or just send an e-mail to: pocomq@googlegroups.com
1038              
1039             =head1 DEVELOPMENT
1040              
1041             If you find any bugs, have feature requests, or wish to contribute, please
1042             contact us at our Google Group mentioned above. We'll do our best to help you
1043             out!
1044              
1045             Development is coordinated via Bazaar (See L<http://bazaar-vcs.org>). The main
1046             Bazaar branch can be found here:
1047              
1048             L<http://code.hackyourlife.org/bzr/dsnopek/perl_mq/devel.mainline>
1049              
1050             We prefer that contributions come in the form of a published Bazaar branch with the
1051             changes. This helps facilitate the back-and-forth in the review process to get
1052             any new code merged into the main branch.
1053              
1054             There is also an official git mirror hosted on GitHub here:
1055              
1056             L<https://github.com/dsnopek/POE--Component--MessageQueue>
1057              
1058             We will also accept contributions via git and GitHub pull requests!
1059              
1060             =head1 FUTURE
1061              
1062             The goal of this module is not to support every possible feature but rather to
1063             be small, simple, efficient and robust. For the most part expect incremental
1064             changes to address those areas.
1065              
1066             Beyond that we have a TODO list (shown below) called B<"The Long Road To
1067             1.0">. This is a list of things we feel we need to have inorder to call the
1068             product complete. That includes management and monitoring tools for sysadmins
1069             as well as documentation for developers.
1070              
1071             =over 4
1072              
1073             =item *
1074              
1075             B<Full support for STOMP>: Includes making sure we are robust to clients
1076             participating badly in the protocol.
1077              
1078             =item *
1079              
1080             B<Authentication and authorization>: This should be highly pluggable, but
1081             basically (as far as authorization goes) each user can get read/write/admin
1082             perms for a queue which are inherited by default to sub-queues (as separated
1083             by the dot character).
1084              
1085             =item *
1086              
1087             B<Monitoring/management tools>: It should be possible for an admin to monitor the
1088             overall state of the queue, ie: (1) how many messages for what queues are in
1089             the front-store, throttled, back-store, etc, (2) information on connected
1090             clients, (3) data/message thorough put, (4) daily/weekly/monthly trends, (X)
1091             etc.. They should also be able to "peek" at any message at any point as well
1092             as delete messages or whole queues.
1093             The rough plan is to use special STOMP frames and "magic" queues/topics to
1094             access special information or perform admin tasks. Command line scripts for
1095             simple things would be included in the main distribution and a full-featured
1096             web-interface would be provided as a separate module.
1097              
1098             =item *
1099              
1100             B<Log rotation>: At minimum, documentation on how to set it up.
1101              
1102             =item *
1103              
1104             B<Docs on "using" the MQ>: A full tutorial from start to finish, advice on
1105             writing good consumers/producers and solid docs on authoring custom storage
1106             engines.
1107              
1108             =back
1109              
1110             =head1 APPLICATIONS USING PoCo::MQ
1111              
1112             =over 4
1113              
1114             =item L<http://chessvegas.com>
1115              
1116             Chess gaming site ChessVegas.
1117              
1118             =back
1119              
1120             =head1 SEE ALSO
1121              
1122             I<External modules:>
1123              
1124             L<POE>,
1125             L<POE::Component::Server::Stomp>,
1126             L<POE::Component::Client::Stomp>,
1127             L<Net::Stomp>,
1128             L<POE::Filter::Stomp>,
1129             L<POE::Component::Logger>,
1130             L<DBD::SQLite>,
1131             L<POE::Component::Generic>
1132              
1133             I<Storage modules:>
1134              
1135             L<POE::Component::MessageQueue::Storage>,
1136             L<POE::Component::MessageQueue::Storage::Memory>,
1137             L<POE::Component::MessageQueue::Storage::BigMemory>,
1138             L<POE::Component::MessageQueue::Storage::DBI>,
1139             L<POE::Component::MessageQueue::Storage::FileSystem>,
1140             L<POE::Component::MessageQueue::Storage::Generic>,
1141             L<POE::Component::MessageQueue::Storage::Generic::DBI>,
1142             L<POE::Component::MessageQueue::Storage::Double>,
1143             L<POE::Component::MessageQueue::Storage::Throttled>,
1144             L<POE::Component::MessageQueue::Storage::Complex>,
1145             L<POE::Component::MessageQueue::Storage::Default>
1146              
1147             I<Statistics modules:>
1148              
1149             L<POE::Component::MessageQueue::Statistics>,
1150             L<POE::Component::MessageQueue::Statistics::Publish>,
1151             L<POE::Component::MessageQueue::Statistics::Publish::YAML>
1152              
1153             I<ID generator modules:>
1154              
1155             L<POE::Component::MessageQueue::IDGenerator>,
1156             L<POE::Component::MessageQueue::IDGenerator::SimpleInt>,
1157             L<POE::Component::MessageQueue::IDGenerator::UUID>
1158              
1159             =head1 BUGS
1160              
1161             We are serious about squashing bugs! Currently, there are no known bugs, but
1162             some probably do exist. If you find any, please let us know at the Google group.
1163              
1164             That said, we are using this in production in a commercial application for
1165             thousands of large messages daily and we experience very few issues.
1166              
1167             =head1 AUTHORS
1168              
1169             Copyright 2007-2011 David Snopek (L<http://www.hackyourlife.org>)
1170              
1171             Copyright 2007, 2008 Paul Driver <frodwith@gmail.com>
1172              
1173             Copyright 2007 Daisuke Maki <daisuke@endeworks.jp>
1174              
1175             =head1 LICENSE
1176              
1177             This program is free software: you can redistribute it and/or modify
1178             it under the terms of the GNU General Public License as published by
1179             the Free Software Foundation, either version 2 of the License, or
1180             (at your option) any later version.
1181              
1182             This program is distributed in the hope that it will be useful,
1183             but WITHOUT ANY WARRANTY; without even the implied warranty of
1184             MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
1185             GNU General Public License for more details.
1186              
1187             You should have received a copy of the GNU General Public License
1188             along with this program. If not, see <http://www.gnu.org/licenses/>.
1189              
1190             =cut
1191