File Coverage

blib/lib/Plack/Handler/Stomp/NoNetwork.pm
Criterion Covered Total %
statement 41 44 93.1
branch 4 10 40.0
condition n/a
subroutine 11 12 91.6
pod 1 1 100.0
total 57 67 85.0


line stmt bran cond sub pod time code
1             package Plack::Handler::Stomp::NoNetwork;
2             $Plack::Handler::Stomp::NoNetwork::VERSION = '1.14';
3             {
4             $Plack::Handler::Stomp::NoNetwork::DIST = 'Plack-Handler-Stomp';
5             }
6 2     2   12540 use Moose;
  2         4  
  2         14  
7 2     2   13434 use namespace::autoclean;
  2         6  
  2         20  
8 2     2   156 use Try::Tiny;
  2         4  
  2         132  
9 2     2   12 use File::ChangeNotify;
  2         4  
  2         64  
10 2     2   12 use Net::Stomp::MooseHelpers::ReadTrace 1.7;
  2         78  
  2         76  
11 2     2   18 use Path::Class;
  2         6  
  2         1324  
12             extends 'Plack::Handler::Stomp';
13              
14             # ABSTRACT: like L<Plack::Handler::Stomp>, but without a network
15              
16              
17             with 'Net::Stomp::MooseHelpers::TraceOnly';
18              
19             sub _default_servers {
20             [ {
21 1     1   846 hostname => 'not.using.the.network',
22             port => 9999,
23             } ]
24             }
25              
26             has subscription_directory_map => (
27             is => 'ro',
28             isa => 'HashRef',
29             default => sub { { } },
30             );
31              
32             after subscribe_single => sub {
33             my ($self,$sub,$headers) = @_;
34              
35             my $dest_dir = $self->trace_basedir->subdir(
36             $self->_dirname_from_destination(
37             $headers->{destination}
38             )
39             );
40             $dest_dir->mkpath;
41              
42             my $id = $headers->{id};
43              
44             $self->subscription_directory_map->{$dest_dir->stringify}=$id;
45              
46             return;
47             };
48              
49              
50             has file_watcher => (
51             is => 'ro',
52             isa => 'File::ChangeNotify::Watcher',
53             lazy_build => 1,
54             );
55             sub _build_file_watcher {
56 1     1   3 my ($self) = @_;
57              
58 1         2 my @directories = keys %{$self->subscription_directory_map};
  1         29  
59              
60             # File::ChangeNotify::Watcher::Default throws an exception if you
61             # ask it to monitor non-existent directories; coupled with the
62             # try/catch below, it would lead to an infinite loop. Let's make
63             # sure it does not happen
64 1         12 dir($_)->mkpath for @directories;
65              
66 1         561 return File::ChangeNotify->instantiate_watcher(
67             directories => \@directories,
68             filter => qr{^\d+\.\d+-send-},
69             );
70             }
71              
72              
73             has frame_reader => (
74             is => 'ro',
75             lazy_build => 1,
76             );
77             sub _build_frame_reader {
78 1     1   3 my ($self) = @_;
79              
80 1         31 return Net::Stomp::MooseHelpers::ReadTrace->new({
81             trace_basedir => $self->trace_basedir,
82             });
83             }
84              
85              
86             sub frame_loop {
87 1     1 1 3 my ($self,$app) = @_;
88              
89 1         2 while (1) {
90 6         15 my @events;
91             # if someone deletes multiple directories while we're looking
92             # at them, File::ChangeNotify::Watcher::Default gets very
93             # confused and throws an exception. Let's catch it and just
94             # re-build the watcher.
95 6     6   487 try { @events = $self->file_watcher->wait_for_events() }
96             catch {
97 0 0   0   0 if (/File::ChangeNotify::Watcher::Default::/) {
98 0         0 $self->clear_file_watcher;
99             }
100 0         0 else { die $_ }
101 6         66 };
102 5         10011423 for my $event (@events) {
103 5 50       38 next unless $event->type eq 'create';
104 5 50       73 next unless -f $event->path;
105             # loop until the reader can get a complete frame, to work
106             # around race conditions between the writer and us
107 5         10 my $frame;
108 5         25 while (!$frame) {
109 5         199 $frame = $self->frame_reader
110             ->read_frame_from_filename($event->path);
111             }
112              
113             # messages sent will be of type "SEND", but they would
114             # come back as "MESSAGE" if they passed through a broker
115 5 50       2664 $frame->command('MESSAGE') if $frame->command eq 'SEND';
116              
117             $frame->headers->{subscription} =
118             $self->subscription_directory_map->{
119 5         307 file($event->path)->dir->stringify
120             };
121              
122 5         669 $self->handle_stomp_frame($app, $frame);
123              
124 5 50       395 Plack::Handler::Stomp::Exceptions::OneShot->throw()
125             if $self->one_shot;
126             }
127             }
128             }
129              
130             __PACKAGE__->meta->make_immutable;
131              
132             1;
133              
134             __END__
135              
136             =pod
137              
138             =encoding UTF-8
139              
140             =head1 NAME
141              
142             Plack::Handler::Stomp::NoNetwork - like L<Plack::Handler::Stomp>, but without a network
143              
144             =head1 VERSION
145              
146             version 1.14
147              
148             =head1 SYNOPSIS
149              
150             my $runner = Plack::Handler::Stomp::NoNetwork->new({
151             trace_basedir => '/tmp/mq',
152             subscriptions => [
153             { destination => '/queue/plack-handler-stomp-test' },
154             { destination => '/topic/plack-handler-stomp-test',
155             path_info => '/topic/ch1', },
156             { destination => '/topic/plack-handler-stomp-test',
157             path_info => '/topic/ch2', },
158             ],
159             });
160             $runner->run(MyApp->get_app());
161              
162             =head1 DESCRIPTION
163              
164             Just like L<Plack::Handler::Stomp>, but instead of using a network
165             connection, we get our frames from a directory.
166              
167             This class uses L<File::ChangeNotify> to monitor the
168             L<trace_basedir|Net::Stomp::MooseHelpers::TraceOnly/trace_basedir>,
169             and L<Net::Stomp::MooseHelpers::ReadTrace> to read the frames.
170              
171             It also consumes L<Net::Stomp::MooseHelpers::TraceOnly> to make sure
172             that every reply we try to send is actually written to disk instead of
173             a broker.
174              
175             =head2 WARNING!
176              
177             This class does not implement subscription selectors. If you have
178             multiple subscriptions for the same destination, a random one will be
179             used.
180              
181             =head1 ATTRIBUTES
182              
183             =head2 C<file_watcher>
184              
185             Instance of L<File::ChangeNotify::Watcher>, set up to monitor
186             C<trace_basedir> for sent messages.
187              
188             =head2 C<frame_reader>
189              
190             Instance of L<Net::Stomp::MooseHelpers::ReadTrace> used to parse
191             frames from disk.
192              
193             =head1 METHODS
194              
195             =head2 C<frame_loop>
196              
197             This method overrides the corresponding one from
198             L<Plack::Handler::Stomp>.
199              
200             Loop forever, collecting C<create> events from the
201             L</file_watcher>. Each new file is parsed by the L</frame_reader>,
202             then passed to
203             L<handle_stomp_frame|Plack::Handler::Stomp/handle_stomp_frame> as
204             usual.
205              
206             =head1 AUTHOR
207              
208             Gianni Ceccarelli <gianni.ceccarelli@net-a-porter.com>
209              
210             =head1 COPYRIGHT AND LICENSE
211              
212             This software is copyright (c) 2012 by Net-a-porter.com.
213              
214             This is free software; you can redistribute it and/or modify it under
215             the same terms as the Perl 5 programming language system itself.
216              
217             =cut