File Coverage

blib/lib/POE/Component/MessageQueue/Storage/FileSystem.pm
Criterion Covered Total %
statement 150 172 87.2
branch 24 40 60.0
condition 3 11 27.2
subroutine 35 35 100.0
pod 0 4 0.0
total 212 262 80.9


line stmt bran cond sub pod time code
1             #
2             # Copyright 2007-2010 David Snopek <dsnopek@gmail.com>
3             #
4             # This program is free software: you can redistribute it and/or modify
5             # it under the terms of the GNU General Public License as published by
6             # the Free Software Foundation, either version 2 of the License, or
7             # (at your option) any later version.
8             #
9             # This program is distributed in the hope that it will be useful,
10             # but WITHOUT ANY WARRANTY; without even the implied warranty of
11             # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12             # GNU General Public License for more details.
13             #
14             # You should have received a copy of the GNU General Public License
15             # along with this program. If not, see <http://www.gnu.org/licenses/>.
16             #
17              
18             package POE::Component::MessageQueue::Storage::FileSystem;
19 11     11   1508 use Moose;
  11         23  
  11         79  
20              
21 11     11   67980 use POE::Kernel;
  11         34  
  11         107  
22 11     11   637 use POE::Session;
  11         25  
  11         83  
23 11     11   6101 use POE::Filter::Stream;
  11         3494  
  11         271  
24 11     11   74 use POE::Wheel::ReadWrite;
  11         26  
  11         221  
25 11     11   52 use IO::File;
  11         28  
  11         1435  
26 11     11   4094 use IO::Dir;
  11         72463  
  11         546  
27              
28 11     11   122 use constant NotFound => 'FILESYSTEM: Message not on disk';
  11         32  
  11         3323  
29              
30             has 'info_storage' => (
31             is => 'ro',
32             required => 1,
33             does => qw(POE::Component::MessageQueue::Storage),
34             handles => [qw(claim disown_all disown_destination)],
35             );
36              
37             # For all these, we get an aref of stuff that needs bodies from our info
38             # store. So, let's just make them all at once.
39             foreach my $method (qw(get get_all)) {
40             __PACKAGE__->meta->add_method($method, sub {
41 15     15   162 my $self = shift;
42 15         46 my $callback = pop;
43             $self->info_storage->$method(@_, sub {
44             $self->_read_loop($_[0], [], sub {
45 15         39 @_ = ([grep { $_ ne NotFound } @{$_[0]}]);
  1173         2516  
  15         58  
46 15         152 goto $callback;
47 15     15   365707 });
48 15         635 });
49             });
50             }
51              
52             # These are similar to the above, but for single messages. Also, we want to
53             # retry if the message info tells us about is not on disk.
54             foreach my $method (qw(get_oldest claim_and_retrieve)) {
55             __PACKAGE__->meta->add_method($method, sub {
56 1226     1226   3081147 my $self = shift;
        1226      
57 1226         3678 my $callback = pop;
58 1226         4427 my @args = @_;
59             $self->info_storage->$method(@args, sub {
60 1226     1226   10577709 my $info_answer = $_[0];
61 1226 100       5775 goto $callback unless $info_answer;
62              
63             $self->_read_loop([$info_answer], [], sub {
64 1191         3278 my $disk_answer = $_[0]->[0];
65              
66 1191 100       4886 if ($disk_answer eq NotFound)
67             {
68 20         94 $self->$method(@args, $callback);
69             }
70             else
71             {
72 1171         3267 @_ = ($disk_answer);
73 1171         10899 goto $callback;
74             }
75 1191         12463 });
76 1226         46918 });
77             });
78             }
79              
80             # Apply the role here, after we've monkeyed with the metaclass
81             with qw(POE::Component::MessageQueue::Storage);
82              
83             has 'data_dir' => (
84             is => 'ro',
85             isa => 'Str',
86             required => 1,
87             );
88              
89             use constant empty_hashref => (
90             is => 'ro',
91 18         20434 default => sub{ {} },
92 11     11   82 );
  11         25  
  11         16318  
93              
94             has 'file_wheels' => empty_hashref;
95             has 'wheel_to_message_map' => empty_hashref;
96             has 'pending_writes' => empty_hashref;
97              
98             has 'alias' => (
99             is => 'ro',
100             isa => 'Str',
101             default => 'MQ-Storage-Filesystem',
102             required => 1,
103             );
104              
105             has 'session' => (is => 'rw');
106              
107             has 'shutdown_callback' => (
108             is => 'ro',
109             writer => 'set_shutdown_callback',
110             predicate => 'shutting_down',
111             clearer => 'stop_shutdown',
112             );
113              
114             has 'shutdown_waiting' => (
115             is => 'rw',
116             isa => 'Bool',
117             default => 1,
118             );
119              
120             after 'set_logger' => sub {
121             my ($self, $logger) = @_;
122             $self->info_storage->set_logger($logger);
123             };
124              
125             sub BUILD
126             {
127 6     6 0 8777 my $self = shift;
128 6         289 $self->children({INFO => $self->info_storage});
129 6         131 $self->session(POE::Session->create(
130             object_states => [
131             $self => [qw(
132             _start _stop _shutdown
133             _read_message_from_disk _read_input _read_error
134             _write_message_to_disk _write_flushed_event
135             )]
136             ],
137             ));
138             }
139              
140             sub _start
141             {
142 6     6   2474 my ($self, $kernel) = @_[OBJECT, KERNEL];
143 6         278 $kernel->alias_set($self->alias);
144             }
145              
146             sub _shutdown
147             {
148 6     6   4712 my ($self, $kernel) = @_[OBJECT, KERNEL];
149 6         275 $kernel->alias_remove($self->alias);
150             }
151              
152             # POE calls this when the session dies.
153             sub _stop
154             {
155 6     6   1178 my ($self) = $_[OBJECT];
156 6         33 $self->_something_finished_shutting();
157             }
158              
159             sub _something_finished_shutting {
160 12     12   229 my $self = $_[0];
161 12         518 my $complete = $self->shutdown_callback;
162              
163 12 100       617 if($self->shutdown_waiting)
164             {
165 6         224 $self->shutdown_waiting(0);
166             }
167             else
168             {
169 6 50       78 goto $complete if $complete;
170             }
171             }
172              
173             sub storage_shutdown
174             {
175 6     6 0 942 my ($self, $complete) = @_;
176              
177             # We need to wait for two states: info_storage complete and no wheels.
178 6         296 $self->set_shutdown_callback($complete);
179              
180             $self->info_storage->storage_shutdown(sub {
181 6     6   43 $self->_something_finished_shutting();
182 6         237 });
183              
184 6         262 $poe_kernel->post($self->alias, '_shutdown');
185             }
186              
187             sub store
188             {
189 690     690 0 148050 my ($self, $message, $callback) = @_;
190              
191             # DRS: To avaid a race condition where:
192             #
193             # (1) We post _write_message_to_disk
194             # (2) Message is "removed" from disk (even though it isn't there yet)
195             # (3) We start writing message to disk
196             #
197             # Mark message as needing to be written.
198             #
199             # PLD: Also, multiple copies of messages is bad juju. Delete the body from
200             # a clone.
201 690         3046 my $info_copy = $message->clone;
202              
203             # Make sure the size is computed before we delete the body
204 690         431834 $info_copy->size;
205 690         24981 $self->pending_writes->{$message->id} = $info_copy->delete_body();
206              
207             # initiate file writing process (only the body will be written)
208 690         24328 $poe_kernel->post($self->session, _write_message_to_disk => $message);
209              
210 690         119011 $self->info_storage->store($info_copy, $callback);
211             }
212              
213             sub _get_filename
214             {
215 5517     5517   14074 my ($self, $message_id) = @_;
216 5517         185291 return sprintf('%s/msg-%s.txt', $self->data_dir, $message_id);
217             }
218              
219             sub _hard_delete
220             {
221 119     119   385 my ($self, $id) = @_;
222            
223             # Just unlink it unless there are pending writes
224 119 50       5770 return $self->_unlink_file($id) unless delete $self->pending_writes->{$id};
225              
226 0         0 my $info = $self->file_wheels->{$id};
227 0 0       0 if ($info)
228             {
229 0         0 $self->log('debug', "Stopping wheels for message $id (removing)");
230 0   0     0 my $wheel = $info->{write_wheel} || $info->{read_wheel};
231 0         0 $wheel->shutdown_input();
232 0         0 $wheel->shutdown_output();
233              
234             # Mark for deletion: we'll detect this primarly in
235             # _write_flushed_event and unlink the file at that time
236             # (prevents a file descriptor leak)
237 0         0 $info->{delete_me} = 1;
238             }
239             else
240             {
241             # If we haven't started yet, _write_message_to_disk will just abort.
242 0         0 $self->log('debug', "Removing message $id before writing started");
243             }
244             }
245              
246             sub _unlink_file
247             {
248 119     119   376 my ($self, $message_id) = @_;
249 119         379 my $fn = $self->_get_filename($message_id);
250 119         831 $self->log( 'debug', "Deleting $fn" );
251 119   33     8449 unlink $fn ||
252             $self->log( 'error', "Unable to remove $fn: $!" );
253 119         640 return;
254             }
255              
256             # We can't use an iterative loop, because we may have to read messages from
257             # disk. So, here's our recursive function that may pause in the middle to
258             # wait for disk reads.
259             sub _read_loop
260             {
261 3570     3570   10477 my ($self, $to_read, $done_reading, $callback) = @_;
262             my $again = sub {
263 2364     2364   8501 @_ = ($self, $to_read, $done_reading, $callback);
264 2364         10647 goto &_read_loop;
265 3570         14590 };
266              
267 3570         9298 my $message = pop(@$to_read);
268 3570 100       11638 unless ($message) {
269 1206         3663 @_ = ($done_reading);
270 1206         5552 goto $callback;
271             }
272              
273 2364 50       98950 if (my $body = $self->pending_writes->{$message->id})
274             {
275 0         0 $message->body($body);
276 0         0 push(@$done_reading, $message);
277 0         0 goto $again;
278             }
279             else
280             {
281             # Don't have the body anymore, so we'll have to read it from disk.
282             $poe_kernel->post($self->session,
283             _read_message_from_disk => $message->id, sub {
284 2364     2364   626870 my $answer = $_[0];
285 2364 100       10783 if ($answer eq NotFound)
    50          
286             {
287 20         61 push(@$done_reading, $answer);
288             }
289             elsif (defined $answer)
290             {
291 2344         87128 $message->body($answer);
292 2344         6463 push(@$done_reading, $message);
293             }
294 2364         12824 goto $again;
295             }
296 2364         82232 );
297             }
298             }
299              
300             sub remove
301             {
302             my ($self, $message_ids, $callback) = @_;
303             $self->info_storage->remove($message_ids, sub {
304             $self->_hard_delete($_) foreach (@$message_ids);
305             goto $callback if $callback;
306             });
307             }
308              
309             sub empty
310             {
311 3     3 0 48 my ($self, $callback) = @_;
312              
313             $self->info_storage->empty(sub {
314             # Delete all the message files that don't have writes pending
315 3     3   40934 my $dh = IO::Dir->new($self->data_dir);
316 3         324 foreach my $fn ($dh->read())
317             {
318 588 50       2175 if ($fn =~ /msg-\(.*\)\.txt/)
319             {
320 0         0 my $id = $1;
321 0 0       0 $self->_unlink_file($id) unless exists $self->pending_writes->{$id};
322             }
323             }
324            
325             # Do the special dance for deleting those that are pending
326 3         35 $self->_hard_delete($_) foreach (keys %{$self->pending_writes});
  3         129  
327 3 50       28 goto $callback if $callback;
328 3         108 });
329             }
330              
331             #
332             # For handling disk access
333             #
334             sub _write_message_to_disk
335             {
336 690     690   309960 my ($self, $kernel, $message) = @_[ OBJECT, KERNEL, ARG0, ARG1 ];
337              
338 690 50       30427 if ($self->file_wheels->{$message->id})
339             {
340 0         0 $self->log('emergency', sprintf(
341             '%s::_write_message_to_disk: wheel already exists for message %s!',
342             __PACKAGE__, $message->id
343             ));
344 0         0 return;
345             }
346            
347 690 50       24486 unless ($self->pending_writes->{$message->id})
348             {
349 0         0 $self->log('debug', sprintf('Abort write of message %s to disk',
350             $message->id));
351 0         0 delete $self->file_wheels->{$message->id};
352 0         0 return;
353             }
354              
355             # Yes, we do want to die if we can't open the file for writing. It
356             # means something is wrong and it's very unlikely we can persist other
357             # messages.
358 690         20826 my $fn = $self->_get_filename($message->id);
359 690   50     6669 my $fh = IO::File->new( ">$fn" ) || die "Unable to save message in $fn: $!";
360              
361 690         146318 my $wheel = POE::Wheel::ReadWrite->new(
362             Handle => $fh,
363             Filter => POE::Filter::Stream->new(),
364             FlushedEvent => '_write_flushed_event'
365             );
366              
367             # initiate the write to disk
368 690         267221 $wheel->put($self->pending_writes->{$message->id});
369              
370             # stash the wheel in our maps
371 690         85227 $self->file_wheels->{$message->id} = {write_wheel => $wheel};
372              
373 690         22129 $self->wheel_to_message_map->{$wheel->ID()} = $message->id;
374             }
375              
376             sub _read_message_from_disk
377             {
378 2364     2364   911891 my ($self, $kernel, $id, $callback) = @_[ OBJECT, KERNEL, ARG0, ARG1 ];
379              
380 2364 50       96359 if ($self->file_wheels->{$id})
381             {
382 0         0 my $here = __PACKAGE__.'::_read_message_from_disk()';
383 0         0 $self->log('emergency',
384             "$here: A wheel already exists for this message ($id)! ".
385             'This should never happen!'
386             );
387 0         0 return;
388             }
389              
390 2364         7909 my $fn = $self->_get_filename($id);
391 2364         18278 my $fh = IO::File->new( $fn );
392            
393 2364         203860 $self->log( 'debug', "Starting to read $fn from disk" );
394              
395             # if we can't find the message body. This usually happens as a result
396             # of crash recovery.
397 2364 100       8761 unless ($fh)
398             {
399 20         117 $self->log( 'warning', "Can't find $fn on disk! Discarding message." );
400              
401             # we need to get the message out of the info store
402 20         776 $self->info_storage->remove($id);
403              
404 20         82 @_ = (NotFound);
405 20         74 goto $callback;
406             }
407            
408             # setup the wheel
409 2344         17467 my $wheel = POE::Wheel::ReadWrite->new(
410             Handle => $fh,
411             Filter => POE::Filter::Stream->new(),
412             InputEvent => '_read_input',
413             ErrorEvent => '_read_error'
414             );
415              
416             # stash the wheel in our maps
417 2344         957241 $self->file_wheels->{$id} = {
418             read_wheel => $wheel,
419             accumulator => q{},
420             callback => $callback
421             };
422 2344         82210 $self->wheel_to_message_map->{$wheel->ID()} = $id;
423             }
424              
425             sub _read_input
426             {
427 2344     2344   729768 my ($self, $kernel, $input, $wheel_id) = @_[ OBJECT, KERNEL, ARG0, ARG1 ];
428              
429             # We do care about reading during shutdown! Maybe. We may be using this as
430             # a front-store (HA!), and doing empty.
431              
432 2344         96583 my $id = $self->wheel_to_message_map->{$wheel_id};
433 2344         79906 $self->file_wheels->{$id}->{accumulator} .= $input;
434             }
435              
436             sub _read_error
437             {
438 2344     2344   577757 my ($self, $op, $errnum, $errstr, $wheel_id) = @_[ OBJECT, ARG0..ARG3 ];
439              
440 2344 50 33     14871 if ( $op eq 'read' and $errnum == 0 )
441             {
442             # EOF! Our message is now totally assembled. Hurray!
443              
444 2344         96371 my $id = $self->wheel_to_message_map->{$wheel_id};
445 2344         78479 my $info = $self->file_wheels->{$id};
446 2344         6275 my $body = $info->{accumulator};
447 2344         5167 my $callback = $info->{callback};
448              
449 2344         6593 my $fn = $self->_get_filename($id);
450 2344         13268 $self->log('debug', "Finished reading $fn");
451              
452             # clear our state
453 2344         80347 delete $self->wheel_to_message_map->{$wheel_id};
454 2344         77096 delete $self->file_wheels->{$id};
455              
456             # NOTE: I have never seen this happen, but it seems theoretically
457             # possible. Considering the former problem with leaking FD's, I'd
458             # rather keep this here just in case.
459 2344 50       7901 $self->_unlink_file($id) if ($info->{delete_me});
460              
461             # send the message out!
462 2344         7585 @_ = ($body);
463 2344         13371 goto $callback;
464             }
465             else
466             {
467 0         0 $self->log( 'error', "$op: Error $errnum $errstr" );
468             }
469             }
470              
471             sub _write_flushed_event
472             {
473 690     690   657730 my ($self, $kernel, $wheel_id) = @_[ OBJECT, KERNEL, ARG0 ];
474              
475             # remove from the first map
476 690         32721 my $id = delete $self->wheel_to_message_map->{$wheel_id};
477              
478 690         5518 $self->log( 'debug', "Finished writing message $id to disk" );
479              
480             # remove from the second map
481 690         24705 my $info = delete $self->file_wheels->{$id};
482              
483             # Write isn't pending anymore. :)
484 690         24996 delete $self->pending_writes->{$id};
485              
486             # If we were actively writing the file when the message to delete
487             # came, we cannot actually delete it until the FD gets flushed, or the FD
488             # will live until the program dies.
489 690 50       4813 $self->_unlink_file($id) if ($info->{delete_me});
490             }
491              
492             1;
493              
494             __END__
495              
496             =pod
497              
498             =head1 NAME
499              
500             POE::Component::MessageQueue::Storage::FileSystem -- A storage engine that keeps message bodies on the filesystem
501              
502             =head1 SYNOPSIS
503              
504             use POE;
505             use POE::Component::MessageQueue;
506             use POE::Component::MessageQueue::Storage::FileSystem;
507             use POE::Component::MessageQueue::Storage::DBI;
508             use strict;
509              
510             # For mysql:
511             my $DB_DSN = 'DBI:mysql:database=perl_mq';
512             my $DB_USERNAME = 'perl_mq';
513             my $DB_PASSWORD = 'perl_mq';
514              
515             POE::Component::MessageQueue->new({
516             storage => POE::Component::MessageQueue::Storage::FileSystem->new({
517             info_storage => POE::Component::MessageQueue::Storage::DBI->new({
518             dsn => $DB_DSN,
519             username => $DB_USERNAME,
520             password => $DB_PASSWORD,
521             }),
522             data_dir => $DATA_DIR,
523             })
524             });
525              
526             POE::Kernel->run();
527             exit;
528              
529             =head1 DESCRIPTION
530              
531             A storage engine that wraps around another storage engine in order to store
532             the message bodies on the file system. The other message properties are
533             stored with the wrapped storage engine.
534              
535             While I would argue that using this module is less efficient than using
536             L<POE::Component::MessageQueue::Storage::Complex>, using it directly would
537             make sense if persistance was your primary concern. All messages stored via
538             this backend will be persistent regardless of whether they have the persistent
539             header set to true or not. Every message is stored, even if it is handled
540             right away and will be removed immediately after having been stored.
541              
542             =head1 CONSTRUCTOR PARAMETERS
543              
544             =over 2
545              
546             =item info_storage => L<POE::Component::MessageQueue::Storage>
547              
548             The storage engine used to store message properties.
549              
550             =item data_dir => SCALAR
551              
552             The directory to store the files containing the message body's.
553              
554             =back
555              
556             =head1 SUPPORTED STOMP HEADERS
557              
558             Be sure to check also the storage engine you are wrapping!
559              
560             =over 4
561              
562             =item B<persistent>
563              
564             I<Ignored>. All message bodies are always persisted.
565              
566             =item B<expire-after>
567              
568             I<Ignored>. All message bodies are kept until handled.
569              
570             =item B<deliver-after>
571              
572             I<Fully Supported>.
573              
574             =back
575              
576             =head1 SEE ALSO
577              
578             L<POE::Component::MessageQueue>,
579             L<POE::Component::MessageQueue::Storage>
580              
581             I<Other storage engines:>
582              
583             L<POE::Component::MessageQueue::Storage::DBI>,
584             L<POE::Component::MessageQueue::Storage::Memory>,
585             L<POE::Component::MessageQueue::Storage::BigMemory>,
586             L<POE::Component::MessageQueue::Storage::Generic>,
587             L<POE::Component::MessageQueue::Storage::Generic::DBI>,
588             L<POE::Component::MessageQueue::Storage::Throttled>,
589             L<POE::Component::MessageQueue::Storage::Complex>,
590             L<POE::Component::MessageQueue::Storage::Default>
591              
592             =cut
593