File Coverage

blib/lib/POE/Component/MessageQueue/Storage/FileSystem.pm
Criterion Covered Total %
statement 1 3 33.3
branch n/a
condition n/a
subroutine 1 1 100.0
pod n/a
total 2 4 50.0


line stmt bran cond sub pod time code
1             #
2             # Copyright 2007-2010 David Snopek <dsnopek@gmail.com>
3             #
4             # This program is free software: you can redistribute it and/or modify
5             # it under the terms of the GNU General Public License as published by
6             # the Free Software Foundation, either version 2 of the License, or
7             # (at your option) any later version.
8             #
9             # This program is distributed in the hope that it will be useful,
10             # but WITHOUT ANY WARRANTY; without even the implied warranty of
11             # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12             # GNU General Public License for more details.
13             #
14             # You should have received a copy of the GNU General Public License
15             # along with this program. If not, see <http://www.gnu.org/licenses/>.
16             #
17              
18             package POE::Component::MessageQueue::Storage::FileSystem;
19 1     1   2311 use Moose;
  0            
  0            
20              
21             use POE::Kernel;
22             use POE::Session;
23             use POE::Filter::Stream;
24             use POE::Wheel::ReadWrite;
25             use IO::File;
26             use IO::Dir;
27              
28             use constant NotFound => 'FILESYSTEM: Message not on disk';
29              
30             has 'info_storage' => (
31             is => 'ro',
32             required => 1,
33             does => qw(POE::Component::MessageQueue::Storage),
34             handles => [qw(claim disown_all disown_destination)],
35             );
36              
37             # For all these, we get an aref of stuff that needs bodies from our info
38             # store. So, let's just make them all at once.
39             foreach my $method (qw(get get_all)) {
40             __PACKAGE__->meta->add_method($method, sub {
41             my $self = shift;
42             my $callback = pop;
43             $self->info_storage->$method(@_, sub {
44             $self->_read_loop($_[0], [], sub {
45             @_ = ([grep { $_ ne NotFound } @{$_[0]}]);
46             goto $callback;
47             });
48             });
49             });
50             }
51              
52             # These are similar to the above, but for single messages. Also, we want to
53             # retry if the message info tells us about is not on disk.
54             foreach my $method (qw(get_oldest claim_and_retrieve)) {
55             __PACKAGE__->meta->add_method($method, sub {
56             my $self = shift;
57             my $callback = pop;
58             my @args = @_;
59             $self->info_storage->$method(@args, sub {
60             my $info_answer = $_[0];
61             goto $callback unless $info_answer;
62              
63             $self->_read_loop([$info_answer], [], sub {
64             my $disk_answer = $_[0]->[0];
65              
66             if ($disk_answer eq NotFound)
67             {
68             $self->$method(@args, $callback);
69             }
70             else
71             {
72             @_ = ($disk_answer);
73             goto $callback;
74             }
75             });
76             });
77             });
78             }
79              
80             # Apply the role here, after we've monkeyed with the metaclass
81             with qw(POE::Component::MessageQueue::Storage);
82              
83             has 'data_dir' => (
84             is => 'ro',
85             isa => 'Str',
86             required => 1,
87             );
88              
89             use constant empty_hashref => (
90             is => 'ro',
91             default => sub{ {} },
92             );
93              
94             has 'file_wheels' => empty_hashref;
95             has 'wheel_to_message_map' => empty_hashref;
96             has 'pending_writes' => empty_hashref;
97              
98             has 'alias' => (
99             is => 'ro',
100             isa => 'Str',
101             default => 'MQ-Storage-Filesystem',
102             required => 1,
103             );
104              
105             has 'session' => (is => 'rw');
106              
107             has 'shutdown_callback' => (
108             is => 'ro',
109             writer => 'set_shutdown_callback',
110             predicate => 'shutting_down',
111             clearer => 'stop_shutdown',
112             );
113              
114             has 'shutdown_waiting' => (
115             is => 'rw',
116             isa => 'Bool',
117             default => 1,
118             );
119              
120             after 'set_logger' => sub {
121             my ($self, $logger) = @_;
122             $self->info_storage->set_logger($logger);
123             };
124              
125             sub BUILD
126             {
127             my $self = shift;
128             $self->children({INFO => $self->info_storage});
129             $self->session(POE::Session->create(
130             object_states => [
131             $self => [qw(
132             _start _stop _shutdown
133             _read_message_from_disk _read_input _read_error
134             _write_message_to_disk _write_flushed_event
135             )]
136             ],
137             ));
138             }
139              
140             sub _start
141             {
142             my ($self, $kernel) = @_[OBJECT, KERNEL];
143             $kernel->alias_set($self->alias);
144             }
145              
146             sub _shutdown
147             {
148             my ($self, $kernel) = @_[OBJECT, KERNEL];
149             $kernel->alias_remove($self->alias);
150             }
151              
152             # POE calls this when the session dies.
153             sub _stop
154             {
155             my ($self) = $_[OBJECT];
156             $self->_something_finished_shutting();
157             }
158              
159             sub _something_finished_shutting {
160             my $self = $_[0];
161             my $complete = $self->shutdown_callback;
162              
163             if($self->shutdown_waiting)
164             {
165             $self->shutdown_waiting(0);
166             }
167             else
168             {
169             goto $complete if $complete;
170             }
171             }
172              
173             sub storage_shutdown
174             {
175             my ($self, $complete) = @_;
176              
177             # We need to wait for two states: info_storage complete and no wheels.
178             $self->set_shutdown_callback($complete);
179              
180             $self->info_storage->storage_shutdown(sub {
181             $self->_something_finished_shutting();
182             });
183              
184             $poe_kernel->post($self->alias, '_shutdown');
185             }
186              
187             sub store
188             {
189             my ($self, $message, $callback) = @_;
190              
191             # DRS: To avaid a race condition where:
192             #
193             # (1) We post _write_message_to_disk
194             # (2) Message is "removed" from disk (even though it isn't there yet)
195             # (3) We start writing message to disk
196             #
197             # Mark message as needing to be written.
198             #
199             # PLD: Also, multiple copies of messages is bad juju. Delete the body from
200             # a clone.
201             my $info_copy = $message->clone;
202              
203             # Make sure the size is computed before we delete the body
204             $info_copy->size;
205             $self->pending_writes->{$message->id} = $info_copy->delete_body();
206              
207             # initiate file writing process (only the body will be written)
208             $poe_kernel->post($self->session, _write_message_to_disk => $message);
209              
210             $self->info_storage->store($info_copy, $callback);
211             }
212              
213             sub _get_filename
214             {
215             my ($self, $message_id) = @_;
216             return sprintf('%s/msg-%s.txt', $self->data_dir, $message_id);
217             }
218              
219             sub _hard_delete
220             {
221             my ($self, $id) = @_;
222            
223             # Just unlink it unless there are pending writes
224             return $self->_unlink_file($id) unless delete $self->pending_writes->{$id};
225              
226             my $info = $self->file_wheels->{$id};
227             if ($info)
228             {
229             $self->log('debug', "Stopping wheels for message $id (removing)");
230             my $wheel = $info->{write_wheel} || $info->{read_wheel};
231             $wheel->shutdown_input();
232             $wheel->shutdown_output();
233              
234             # Mark for deletion: we'll detect this primarly in
235             # _write_flushed_event and unlink the file at that time
236             # (prevents a file descriptor leak)
237             $info->{delete_me} = 1;
238             }
239             else
240             {
241             # If we haven't started yet, _write_message_to_disk will just abort.
242             $self->log('debug', "Removing message $id before writing started");
243             }
244             }
245              
246             sub _unlink_file
247             {
248             my ($self, $message_id) = @_;
249             my $fn = $self->_get_filename($message_id);
250             $self->log( 'debug', "Deleting $fn" );
251             unlink $fn ||
252             $self->log( 'error', "Unable to remove $fn: $!" );
253             return;
254             }
255              
256             # We can't use an iterative loop, because we may have to read messages from
257             # disk. So, here's our recursive function that may pause in the middle to
258             # wait for disk reads.
259             sub _read_loop
260             {
261             my ($self, $to_read, $done_reading, $callback) = @_;
262             my $again = sub {
263             @_ = ($self, $to_read, $done_reading, $callback);
264             goto &_read_loop;
265             };
266              
267             my $message = pop(@$to_read);
268             unless ($message) {
269             @_ = ($done_reading);
270             goto $callback;
271             }
272              
273             if (my $body = $self->pending_writes->{$message->id})
274             {
275             $message->body($body);
276             push(@$done_reading, $message);
277             goto $again;
278             }
279             else
280             {
281             # Don't have the body anymore, so we'll have to read it from disk.
282             $poe_kernel->post($self->session,
283             _read_message_from_disk => $message->id, sub {
284             my $answer = $_[0];
285             if ($answer eq NotFound)
286             {
287             push(@$done_reading, $answer);
288             }
289             elsif (defined $answer)
290             {
291             $message->body($answer);
292             push(@$done_reading, $message);
293             }
294             goto $again;
295             }
296             );
297             }
298             }
299              
300             sub remove
301             {
302             my ($self, $message_ids, $callback) = @_;
303             $self->info_storage->remove($message_ids, sub {
304             $self->_hard_delete($_) foreach (@$message_ids);
305             goto $callback if $callback;
306             });
307             }
308              
309             sub empty
310             {
311             my ($self, $callback) = @_;
312              
313             $self->info_storage->empty(sub {
314             # Delete all the message files that don't have writes pending
315             my $dh = IO::Dir->new($self->data_dir);
316             foreach my $fn ($dh->read())
317             {
318             if ($fn =~ /msg-\(.*\)\.txt/)
319             {
320             my $id = $1;
321             $self->_unlink_file($id) unless exists $self->pending_writes->{$id};
322             }
323             }
324            
325             # Do the special dance for deleting those that are pending
326             $self->_hard_delete($_) foreach (keys %{$self->pending_writes});
327             goto $callback if $callback;
328             });
329             }
330              
331             #
332             # For handling disk access
333             #
334             sub _write_message_to_disk
335             {
336             my ($self, $kernel, $message) = @_[ OBJECT, KERNEL, ARG0, ARG1 ];
337              
338             if ($self->file_wheels->{$message->id})
339             {
340             $self->log('emergency', sprintf(
341             '%s::_write_message_to_disk: wheel already exists for message %s!',
342             __PACKAGE__, $message->id
343             ));
344             return;
345             }
346            
347             unless ($self->pending_writes->{$message->id})
348             {
349             $self->log('debug', sprintf('Abort write of message %s to disk',
350             $message->id));
351             delete $self->file_wheels->{$message->id};
352             return;
353             }
354              
355             # Yes, we do want to die if we can't open the file for writing. It
356             # means something is wrong and it's very unlikely we can persist other
357             # messages.
358             my $fn = $self->_get_filename($message->id);
359             my $fh = IO::File->new( ">$fn" ) || die "Unable to save message in $fn: $!";
360              
361             my $wheel = POE::Wheel::ReadWrite->new(
362             Handle => $fh,
363             Filter => POE::Filter::Stream->new(),
364             FlushedEvent => '_write_flushed_event'
365             );
366              
367             # initiate the write to disk
368             $wheel->put($self->pending_writes->{$message->id});
369              
370             # stash the wheel in our maps
371             $self->file_wheels->{$message->id} = {write_wheel => $wheel};
372              
373             $self->wheel_to_message_map->{$wheel->ID()} = $message->id;
374             }
375              
376             sub _read_message_from_disk
377             {
378             my ($self, $kernel, $id, $callback) = @_[ OBJECT, KERNEL, ARG0, ARG1 ];
379              
380             if ($self->file_wheels->{$id})
381             {
382             my $here = __PACKAGE__.'::_read_message_from_disk()';
383             $self->log('emergency',
384             "$here: A wheel already exists for this message ($id)! ".
385             'This should never happen!'
386             );
387             return;
388             }
389              
390             my $fn = $self->_get_filename($id);
391             my $fh = IO::File->new( $fn );
392            
393             $self->log( 'debug', "Starting to read $fn from disk" );
394              
395             # if we can't find the message body. This usually happens as a result
396             # of crash recovery.
397             unless ($fh)
398             {
399             $self->log( 'warning', "Can't find $fn on disk! Discarding message." );
400              
401             # we need to get the message out of the info store
402             $self->info_storage->remove($id);
403              
404             @_ = (NotFound);
405             goto $callback;
406             }
407            
408             # setup the wheel
409             my $wheel = POE::Wheel::ReadWrite->new(
410             Handle => $fh,
411             Filter => POE::Filter::Stream->new(),
412             InputEvent => '_read_input',
413             ErrorEvent => '_read_error'
414             );
415              
416             # stash the wheel in our maps
417             $self->file_wheels->{$id} = {
418             read_wheel => $wheel,
419             accumulator => q{},
420             callback => $callback
421             };
422             $self->wheel_to_message_map->{$wheel->ID()} = $id;
423             }
424              
425             sub _read_input
426             {
427             my ($self, $kernel, $input, $wheel_id) = @_[ OBJECT, KERNEL, ARG0, ARG1 ];
428              
429             # We do care about reading during shutdown! Maybe. We may be using this as
430             # a front-store (HA!), and doing empty.
431              
432             my $id = $self->wheel_to_message_map->{$wheel_id};
433             $self->file_wheels->{$id}->{accumulator} .= $input;
434             }
435              
436             sub _read_error
437             {
438             my ($self, $op, $errnum, $errstr, $wheel_id) = @_[ OBJECT, ARG0..ARG3 ];
439              
440             if ( $op eq 'read' and $errnum == 0 )
441             {
442             # EOF! Our message is now totally assembled. Hurray!
443              
444             my $id = $self->wheel_to_message_map->{$wheel_id};
445             my $info = $self->file_wheels->{$id};
446             my $body = $info->{accumulator};
447             my $callback = $info->{callback};
448              
449             my $fn = $self->_get_filename($id);
450             $self->log('debug', "Finished reading $fn");
451              
452             # clear our state
453             delete $self->wheel_to_message_map->{$wheel_id};
454             delete $self->file_wheels->{$id};
455              
456             # NOTE: I have never seen this happen, but it seems theoretically
457             # possible. Considering the former problem with leaking FD's, I'd
458             # rather keep this here just in case.
459             $self->_unlink_file($id) if ($info->{delete_me});
460              
461             # send the message out!
462             @_ = ($body);
463             goto $callback;
464             }
465             else
466             {
467             $self->log( 'error', "$op: Error $errnum $errstr" );
468             }
469             }
470              
471             sub _write_flushed_event
472             {
473             my ($self, $kernel, $wheel_id) = @_[ OBJECT, KERNEL, ARG0 ];
474              
475             # remove from the first map
476             my $id = delete $self->wheel_to_message_map->{$wheel_id};
477              
478             $self->log( 'debug', "Finished writing message $id to disk" );
479              
480             # remove from the second map
481             my $info = delete $self->file_wheels->{$id};
482              
483             # Write isn't pending anymore. :)
484             delete $self->pending_writes->{$id};
485              
486             # If we were actively writing the file when the message to delete
487             # came, we cannot actually delete it until the FD gets flushed, or the FD
488             # will live until the program dies.
489             $self->_unlink_file($id) if ($info->{delete_me});
490             }
491              
492             1;
493              
494             __END__
495              
496             =pod
497              
498             =head1 NAME
499              
500             POE::Component::MessageQueue::Storage::FileSystem -- A storage engine that keeps message bodies on the filesystem
501              
502             =head1 SYNOPSIS
503              
504             use POE;
505             use POE::Component::MessageQueue;
506             use POE::Component::MessageQueue::Storage::FileSystem;
507             use POE::Component::MessageQueue::Storage::DBI;
508             use strict;
509              
510             # For mysql:
511             my $DB_DSN = 'DBI:mysql:database=perl_mq';
512             my $DB_USERNAME = 'perl_mq';
513             my $DB_PASSWORD = 'perl_mq';
514              
515             POE::Component::MessageQueue->new({
516             storage => POE::Component::MessageQueue::Storage::FileSystem->new({
517             info_storage => POE::Component::MessageQueue::Storage::DBI->new({
518             dsn => $DB_DSN,
519             username => $DB_USERNAME,
520             password => $DB_PASSWORD,
521             }),
522             data_dir => $DATA_DIR,
523             })
524             });
525              
526             POE::Kernel->run();
527             exit;
528              
529             =head1 DESCRIPTION
530              
531             A storage engine that wraps around another storage engine in order to store
532             the message bodies on the file system. The other message properties are
533             stored with the wrapped storage engine.
534              
535             While I would argue that using this module is less efficient than using
536             L<POE::Component::MessageQueue::Storage::Complex>, using it directly would
537             make sense if persistance was your primary concern. All messages stored via
538             this backend will be persistent regardless of whether they have the persistent
539             header set to true or not. Every message is stored, even if it is handled
540             right away and will be removed immediately after having been stored.
541              
542             =head1 CONSTRUCTOR PARAMETERS
543              
544             =over 2
545              
546             =item info_storage => L<POE::Component::MessageQueue::Storage>
547              
548             The storage engine used to store message properties.
549              
550             =item data_dir => SCALAR
551              
552             The directory to store the files containing the message body's.
553              
554             =back
555              
556             =head1 SUPPORTED STOMP HEADERS
557              
558             Be sure to check also the storage engine you are wrapping!
559              
560             =over 4
561              
562             =item B<persistent>
563              
564             I<Ignored>. All message bodies are always persisted.
565              
566             =item B<expire-after>
567              
568             I<Ignored>. All message bodies are kept until handled.
569              
570             =item B<deliver-after>
571              
572             I<Fully Supported>.
573              
574             =back
575              
576             =head1 SEE ALSO
577              
578             L<POE::Component::MessageQueue>,
579             L<POE::Component::MessageQueue::Storage>
580              
581             I<Other storage engines:>
582              
583             L<POE::Component::MessageQueue::Storage::DBI>,
584             L<POE::Component::MessageQueue::Storage::Memory>,
585             L<POE::Component::MessageQueue::Storage::BigMemory>,
586             L<POE::Component::MessageQueue::Storage::Generic>,
587             L<POE::Component::MessageQueue::Storage::Generic::DBI>,
588             L<POE::Component::MessageQueue::Storage::Throttled>,
589             L<POE::Component::MessageQueue::Storage::Complex>,
590             L<POE::Component::MessageQueue::Storage::Default>
591              
592             =cut
593