File Coverage

blib/lib/Plack/Handler/Stomp.pm
Criterion Covered Total %
statement 138 146 94.5
branch 28 40 70.0
condition 10 14 71.4
subroutine 30 31 96.7
pod 11 11 100.0
total 217 242 89.6


line stmt bran cond sub pod time code
1             package Plack::Handler::Stomp;
2             $Plack::Handler::Stomp::VERSION = '1.14';
3             {
4             $Plack::Handler::Stomp::DIST = 'Plack-Handler-Stomp';
5             }
6 13     13   1497 use Moose;
  13         25  
  13         93  
7 13     13   81342 use MooseX::Types::Moose qw(Bool);
  13         31  
  13         136  
8 13     13   62253 use Plack::Handler::Stomp::Types qw(Logger PathMap);
  13         62  
  13         91  
9 13     13   26080 use Net::Stomp::MooseHelpers::Types qw(NetStompish);
  13         1344379  
  13         113  
10 13     13   46141 use Plack::Handler::Stomp::PathInfoMunger 'munge_path_info';
  13         33  
  13         82  
11 13     13   6380 use Plack::Handler::Stomp::Exceptions;
  13         47  
  13         568  
12 13     13   101 use Net::Stomp::MooseHelpers::Exceptions;
  13         27  
  13         298  
13 13     13   60 use namespace::autoclean;
  13         25  
  13         115  
14 13     13   936 use Try::Tiny;
  13         32  
  13         806  
15 13     13   4652 use Plack::Util;
  13         77514  
  13         14438  
16              
17             with 'Net::Stomp::MooseHelpers::CanConnect' => { -version => '2.6' };
18             with 'Net::Stomp::MooseHelpers::CanSubscribe' => { -version => '2.6' };
19             with 'Net::Stomp::MooseHelpers::ReconnectOnFailure';
20              
21             # ABSTRACT: adapt STOMP to (almost) HTTP, via Plack
22              
23              
24             has logger => (
25             is => 'rw',
26             isa => Logger,
27             lazy_build => 1,
28             );
29             sub _build_logger {
30 0     0   0 require Net::Stomp::StupidLogger;
31 0         0 Net::Stomp::StupidLogger->new();
32             }
33              
34             sub _build_connection {
35 14     14   2004439 my ($self) = @_;
36              
37             return $self->connection_builder->({
38 14         32 %{$self->extra_connection_builder_args},
  14         425  
39             logger => $self->logger,
40             hosts => $self->servers,
41             });
42             }
43              
44              
45             has destination_path_map => (
46             is => 'ro',
47             isa => PathMap,
48             default => sub { { } },
49             );
50              
51              
52             has one_shot => (
53             is => 'rw',
54             isa => Bool,
55             default => 0,
56             );
57              
58              
59             sub run {
60 19     19 1 277 my ($self, $app) = @_;
61              
62 19         46 my $exception;
63             $self->reconnect_on_failure(
64             sub {
65             try {
66 20         872 $self->subscribe();
67              
68 19         1353 $self->frame_loop($app);
69             } catch {
70 19         588 $exception = $_;
71 20     20   22157 };
72 19 50       166 if ($exception) {
73 19 50       182 if (!blessed $exception) {
74 0         0 $exception = "unhandled exception $exception";
75 0         0 return;
76             }
77 19 100       192 if ($exception->isa('Plack::Handler::Stomp::Exceptions::AppError')) {
78 1         7 return;
79             }
80 18 100       109 if ($exception->isa('Plack::Handler::Stomp::Exceptions::UnknownFrame')) {
81 1         3 return;
82             }
83 17 100       62 if ($exception->isa('Plack::Handler::Stomp::Exceptions::OneShot')) {
84 16         30 $exception=undef;
85 16         557 return;
86             }
87 1         4 die $exception;
88             }
89 19         232 });
90 18 100       596 die $exception if defined $exception;
91 16         64 return;
92             }
93              
94              
95             sub frame_loop {
96 18     18 1 57 my ($self,$app) = @_;
97              
98 18         35 while (1) {
99 18         504 my $frame = $self->connection->receive_frame();
100 18 50 33     141 if(!$frame || !ref($frame)) {
101 0         0 Net::Stomp::MooseHelpers::Exceptions::Stomp->throw({
102             stomp_error => 'empty frame received',
103             });
104             }
105 18         82 $self->handle_stomp_frame($app, $frame);
106              
107 16 50       594 Plack::Handler::Stomp::Exceptions::OneShot->throw()
108             if $self->one_shot;
109             }
110             }
111              
112              
113             sub handle_stomp_frame {
114 23     23 1 74 my ($self, $app, $frame) = @_;
115              
116 23         85 my $command = $frame->command();
117 23         253 my $method = $self->can("handle_stomp_\L$command");
118 23 100       86 if ($method) {
119 22         83 $self->$method($app, $frame);
120             }
121             else {
122 1         74 Plack::Handler::Stomp::Exceptions::UnknownFrame->throw(
123             {frame=>$frame}
124             );
125             }
126             }
127              
128              
129             sub handle_stomp_error {
130 5     5 1 17 my ($self, $app, $frame) = @_;
131              
132 5         18 my $error = $frame->headers->{message};
133 5         167 $self->logger->warn($error);
134             }
135              
136              
137             sub handle_stomp_message {
138 16     16 1 47 my ($self, $app, $frame) = @_;
139              
140 16         67 my $env = $self->build_psgi_env($frame);
141             try {
142 16     16   806 $self->process_the_message($app,$env);
143              
144 15         480 $self->connection->ack({ frame => $frame });
145             } catch {
146 1     1   193 Plack::Handler::Stomp::Exceptions::AppError->throw({
147             app_error => $_
148             });
149 16         126 };
150             }
151              
152              
153             sub process_the_message {
154 16     16 1 41 my ($self,$app,$env) = @_;
155              
156 16         65 my $res = $app->($env);
157              
158 15         1283 my $flattened_response=[];
159 15     15   76 my $cb = sub { $flattened_response->[2].=$_[0] };
  15         169  
160              
161             my $response_handler = sub {
162 15     15   36 my ($response) = @_;
163              
164 15         37 $flattened_response->[0]=$response->[0];
165 15         35 $flattened_response->[1]=$response->[1];
166              
167 15         34 my $body=$response->[2];
168 15 50       38 if (defined $body) {
169 15         120 Plack::Util::foreach($body, $cb);
170             }
171             else {
172             return Plack::Util::inline_object(
173             write => $cb,
174             close => sub { },
175 0         0 );
176             }
177 15         73 };
178              
179 15 50       64 if (ref $res eq 'ARRAY') {
    0          
180 15         46 $response_handler->($res);
181             }
182             elsif (ref $res eq 'CODE') {
183 0         0 $res->($response_handler);
184             }
185             else {
186 0         0 Plack::Handler::Stomp::Exceptions::AppError->throw({
187             app_error => "Bad response $res"
188             });
189             }
190              
191 15         85 $self->maybe_send_reply($flattened_response);
192              
193 15         103 return;
194             }
195              
196              
197             sub handle_stomp_receipt {
198 1     1 1 4 my ($self, $app, $frame) = @_;
199              
200             $self->logger->debug('ignored RECEIPT frame for '
201 1         25 .$frame->headers->{'receipt-id'});
202             }
203              
204              
205             sub maybe_send_reply {
206 15     15 1 36 my ($self, $response) = @_;
207              
208 15         51 my $reply_to = $self->where_should_send_reply($response);
209 15 100       349 if ($reply_to) {
210 6         26 $self->send_reply($response,$reply_to);
211             }
212              
213 15         37 return;
214             }
215              
216              
217             sub where_should_send_reply {
218 15     15 1 37 my ($self, $response) = @_;
219              
220 15   66     58 return Plack::Util::header_get($response->[1],
221             'X-Reply-Address')
222             || Plack::Util::header_get($response->[1],
223             'X-STOMP-Reply-Address')
224             }
225              
226              
227             sub send_reply {
228 6     6 1 55 my ($self, $response, $reply_address) = @_;
229              
230 6         25 my $reply_queue = '/remote-temp-queue/' . $reply_address;
231              
232 6         12 my $content = '';
233 6 50       58 unless (Plack::Util::status_with_no_entity_body($response->[0])) {
234             # pre-flattened, see L</process_the_message>
235 6         50 $content = $response->[2];
236             }
237              
238 6         16 my %reply_hh = ();
239 6         13 while (my ($k,$v) = splice @{$response->[1]},0,2) {
  13         52  
240 7         16 $k=lc($k);
241 7 100       23 next if $k eq 'x-stomp-reply-address';
242 1 50       3 next if $k eq 'x-reply-address';
243 1 50       5 next unless $k =~ s{^x-stomp-}{};
244              
245 1         3 $reply_hh{lc($k)} = $v;
246             }
247              
248             $self->connection->send({
249 6         201 %reply_hh,
250             destination => $reply_queue,
251             body => $content
252             });
253              
254 6         9531 return;
255             }
256              
257              
258             after subscribe_single => sub {
259             my ($self,$sub,$headers) = @_;
260              
261             my $destination = $headers->{destination};
262             my $sub_id = $headers->{id};
263              
264             $self->destination_path_map->{$destination} =
265             $self->destination_path_map->{"/subscription/$sub_id"} =
266             $sub->{path_info} || $destination;
267              
268             return;
269             };
270              
271              
272             sub build_psgi_env {
273 16     16 1 48 my ($self, $frame) = @_;
274              
275 16         55 my $destination = $frame->headers->{destination};
276 16         105 my $sub_id = $frame->headers->{subscription};
277              
278 16         75 my $path_info;
279 16 100       57 if (defined $sub_id) {
280 11         367 $path_info = $self->destination_path_map->{"/subscription/$sub_id"}
281             };
282 16   100     260 $path_info ||= $self->destination_path_map->{$destination};
283 16 100       58 if ($path_info) {
284 15         75 $path_info = munge_path_info(
285             $path_info,
286             $self->current_server,
287             $frame,
288             );
289             }
290 16   66     59 $path_info ||= $destination; # should not really be needed
291              
292 13     13   113 use bytes;
  13         31  
  13         158  
293              
294             my $env = {
295             # server
296             SERVER_NAME => 'localhost',
297             SERVER_PORT => 0,
298             SERVER_PROTOCOL => 'STOMP',
299              
300             # client
301             REQUEST_METHOD => 'POST',
302             REQUEST_URI => $path_info,
303             SCRIPT_NAME => '',
304             PATH_INFO => $path_info,
305             QUERY_STRING => '',
306              
307             # broker
308             REMOTE_ADDR => $self->current_server->{hostname},
309              
310             # http
311             HTTP_USER_AGENT => 'Net::Stomp',
312             CONTENT_LENGTH => length($frame->body),
313             CONTENT_TYPE => ( $frame->headers->{'content-type'} || 'application/octet-stream' ),
314              
315             # psgi
316             'psgi.version' => [1,0],
317             'psgi.url_scheme' => 'http',
318             'psgi.multithread' => 0,
319             'psgi.multiprocess' => 0,
320             'psgi.run_once' => 0,
321             'psgi.nonblocking' => 0,
322             'psgi.streaming' => 1,
323             'psgi.input' => do {
324 16     8   645 open my $input, '<', \($frame->body);
  8         615  
  8         17  
  8         65  
325 16         6124 $input;
326             },
327             'psgi.errors' => Plack::Util::inline_object(
328 1     1   107 print => sub { $self->logger->error(@_) },
329 16   100     70 ),
330             };
331              
332 16 50       509 if ($frame->headers) {
333 16         109 for my $header (keys %{$frame->headers}) {
  16         49  
334 45         259 $env->{"jms.$header"} = $frame->headers->{$header};
335             }
336             }
337              
338 16         106 return $env;
339             }
340              
341             __PACKAGE__->meta->make_immutable;
342              
343              
344             1;
345              
346             __END__
347              
348             =pod
349              
350             =encoding UTF-8
351              
352             =head1 NAME
353              
354             Plack::Handler::Stomp - adapt STOMP to (almost) HTTP, via Plack
355              
356             =head1 VERSION
357              
358             version 1.14
359              
360             =head1 SYNOPSIS
361              
362             my $runner = Plack::Handler::Stomp->new({
363             servers => [ { hostname => 'localhost', port => 61613 } ],
364             subscriptions => [
365             { destination => '/queue/plack-handler-stomp-test' },
366             { destination => '/topic/plack-handler-stomp-test',
367             headers => {
368             selector => q{custom_header = '1' or JMSType = 'test_foo'},
369             },
370             path_info => '/topic/ch1', },
371             { destination => '/topic/plack-handler-stomp-test',
372             headers => {
373             selector => q{custom_header = '2' or JMSType = 'test_bar'},
374             },
375             path_info => '/topic/ch2', },
376             ],
377             });
378             $runner->run(MyApp->get_app());
379              
380             =head1 DESCRIPTION
381              
382             Sometimes you want to use your very nice web-application-framework
383             dispatcher, module loading mechanisms, etc, but you're not really
384             writing a web application, you're writing an ActiveMQ consumer. In
385             those cases, this module is for you.
386              
387             This module is inspired by L<Catalyst::Engine::Stomp>, but aims to be
388             usable by any PSGI application.
389              
390             =head2 Roles Consumed
391              
392             We consume L<Net::Stomp::MooseHelpers::CanConnect> and
393             L<Net::Stomp::MooseHelpers::CanSubscribe>. Read those modules'
394             documentation to see how to configure servers and subscriptions.
395              
396             =head1 ATTRIBUTES
397              
398             =head2 C<logger>
399              
400             A logger object used by this handler. Not to be confused with the logger
401             used by the application (either internally, or via a Middleware). Can
402             be any object that can C<debug>, C<info>, C<warn>, C<error>. Defaults
403             to an instance of L<Net::Stomp::StupidLogger>. This logger is passed
404             on to the L<Net::Stomp> object held in C<connection> (see
405             L<Net::Stomp::MooseHelpers::CanConnect>).
406              
407             =head2 C<destination_path_map>
408              
409             A hashref mapping destinations (queues, topics, subscription ids) to
410             URI paths to send to the application. You should not modify this.
411              
412             =head2 C<one_shot>
413              
414             If true, exit after the first message is consumed. Useful for testing,
415             defaults to false.
416              
417             =head1 METHODS
418              
419             =head2 C<run>
420              
421             Given a PSGI application, loops forever:
422              
423             =over 4
424              
425             =item *
426              
427             connect to a STOMP server (see
428             L<connect|Net::Stomp::MooseHelpers::CanConnect/connect> and
429             L<servers|Net::Stomp::MooseHelpers::CanConnect/servers> in
430             L<Net::Stomp::MooseHelpers::CanConnect>)
431              
432             =item *
433              
434             subscribe to whatever needed (see
435             L<subscribe|Net::Stomp::MooseHelpers::CanSubscribe/subscribe> and
436             L<subscriptions|Net::Stomp::MooseHelpers::CanSubscribe/subscriptions>
437             in L<Net::Stomp::MooseHelpers::CanSubscribe>)
438              
439             =item *
440              
441             consume STOMP frames in an inner loop (see L</frame_loop>)
442              
443             =back
444              
445             If the application throws an exception, the loop exits re-throwing the
446             exception. If the STOMP connection has problems, the loop is repeated
447             with a different server (see
448             L<next_server|Net::Stomp::MooseHelpers::CanConnect/next_server> in
449             L<Net::Stomp::MooseHelpers::CanConnect>).
450              
451             If L</one_shot> is set, this function exits after having consumed
452             exactly 1 frame.
453              
454             =head2 C<frame_loop>
455              
456             Loop forever receiving frames from the STOMP connection. Call
457             L</handle_stomp_frame> for each frame.
458              
459             If L</one_shot> is set, this function exits after having consumed
460             exactly 1 frame.
461              
462             =head2 C<handle_stomp_frame>
463              
464             Delegates the handling to L</handle_stomp_message>,
465             L</handle_stomp_error>, L</handle_stomp_receipt>, or throws
466             L<Plack::Handler::Stomp::Exceptions::UnknownFrame> if the frame is of
467             some other kind. If you want to handle different kinds of frames (maybe
468             because you have some non-standard STOMP server), you can just
469             subclass and add methods; for example, to handle C<STRANGE> frames,
470             add a C<handle_stomp_strange> method.
471              
472             =head2 C<handle_stomp_error>
473              
474             Logs the error via the L</logger>, level C<warn>.
475              
476             =head2 C<handle_stomp_message>
477              
478             Calls L</build_psgi_env> to convert the STOMP message into a PSGI
479             environment.
480              
481             The environment is then passed to L</process_the_message>, and the
482             frame is acknowledged.
483              
484             =head2 C<process_the_message>
485              
486             Runs a PSGI environment through the application, then flattens the
487             response body into a simple string.
488              
489             The response so flattened is sent back via L</maybe_send_reply>.
490              
491             =head2 C<handle_stomp_receipt>
492              
493             Logs (level C<debug>) the receipt id. Nothing else is done with
494             receipts.
495              
496             =head2 C<maybe_send_reply>
497              
498             Calls L</where_should_send_reply> to determine whether to send a reply, and
499             where. If it returns a true value, L</send_reply> is called to
500             actually send the reply.
501              
502             =head2 C<where_should_send_reply>
503              
504             Returns the header C<X-Reply-Address> or C<X-STOMP-Reply-Address> from
505             the response.
506              
507             =head2 C<send_reply>
508              
509             Converts the PSGI response into a STOMP frame, by removing the prefix
510             C<x-stomp-> from the key of header fields that have it, removing
511             entirely header fields that don't, and stringifying the body.
512              
513             Then sends the frame.
514              
515             =head2 C<subscribe_single>
516              
517             C<after> modifier on the method provided by
518             L<Net::Stomp::MooseHelpers::CanSubscribe>.
519              
520             It sets the L</destination_path_map> to map the destination and the
521             subscription id to the C<path_info> slot of the L</subscriptions>
522             element, or to the destination itself if C<path_info> is not defined.
523              
524             =head2 C<build_psgi_env>
525              
526             Builds a PSGI environment from the message, like:
527              
528             # server
529             SERVER_NAME => 'localhost',
530             SERVER_PORT => 0,
531             SERVER_PROTOCOL => 'STOMP',
532              
533             # client
534             REQUEST_METHOD => 'POST',
535             REQUEST_URI => $path_info,
536             SCRIPT_NAME => '',
537             PATH_INFO => $path_info,
538             QUERY_STRING => '',
539              
540             # broker
541             REMOTE_ADDR => $server_hostname,
542              
543             # http
544             HTTP_USER_AGENT => 'Net::Stomp',
545             CONTENT_LENGTH => length($body),
546             CONTENT_TYPE => $content-type,
547              
548             # psgi
549             'psgi.version' => [1,0],
550             'psgi.url_scheme' => 'http',
551             'psgi.multithread' => 0,
552             'psgi.multiprocess' => 0,
553             'psgi.run_once' => 0,
554             'psgi.nonblocking' => 0,
555             'psgi.streaming' => 1,
556              
557             In addition, reading from C<psgi.input> will return the message body,
558             and writing to C<psgi.errors> will log via the L</logger> at level
559             C<error>.
560              
561             Finally, every header in the STOMP message will be available in the
562             "namespace" C<jms.>, so for example the message type is in
563             C<jms.type>.
564              
565             The C<$path_info> is obtained from the L</destination_path_map>
566             (i.e. from the C<path_info> subscription options) passed through
567             L<munge_path_info|Plack::Handler::Stomp::PathInfoMunger/munge_path_info>.
568              
569             =head1 EXAMPLES
570              
571             You can find examples of use in the tests, or at
572             https://github.com/dakkar/CatalystX-StompSampleApps
573              
574             =head1 AUTHOR
575              
576             Gianni Ceccarelli <gianni.ceccarelli@net-a-porter.com>
577              
578             =head1 COPYRIGHT AND LICENSE
579              
580             This software is copyright (c) 2012 by Net-a-porter.com.
581              
582             This is free software; you can redistribute it and/or modify it under
583             the same terms as the Perl 5 programming language system itself.
584              
585             =cut