File Coverage

blib/lib/Net/Stomp/Producer.pm
Criterion Covered Total %
statement 88 88 100.0
branch 22 24 91.6
condition 6 6 100.0
subroutine 24 24 100.0
pod 5 5 100.0
total 145 147 98.6


line stmt bran cond sub pod time code
1             package Net::Stomp::Producer;
2             $Net::Stomp::Producer::VERSION = '2.005';
3             {
4             $Net::Stomp::Producer::DIST = 'Net-Stomp-Producer';
5             }
6 4     4   863911 use Moose;
  4         740709  
  4         29  
7 4     4   25049 use namespace::autoclean;
  4         19150  
  4         22  
8             with 'Net::Stomp::MooseHelpers::CanConnect' => { -version => '2.6' };
9             with 'Net::Stomp::MooseHelpers::ReconnectOnFailure';
10 4     4   1287 use MooseX::Types::Moose qw(Str Bool CodeRef HashRef);
  4         132640  
  4         29  
11 4     4   21366 use Net::Stomp::Producer::Exceptions;
  4         14  
  4         179  
12 4     4   30 use Module::Runtime 'use_package_optimistically';
  4         8  
  4         24  
13 4     4   183 use Try::Tiny;
  4         7  
  4         1593  
14              
15             # ABSTRACT: helper object to send messages via Net::Stomp
16              
17              
18             has serializer => (
19             isa => CodeRef,
20             is => 'rw',
21             default => sub { \&_no_serializer },
22             );
23              
24             sub _no_serializer {
25 21     21   44 my ($message) = @_;
26 21 100       76 return $message unless ref $message;
27              
28 1         20 Net::Stomp::Producer::Exceptions::CantSerialize->throw({
29             previous_exception => q{can't send a reference without a serializer},
30             message_body => $message,
31             });
32             }
33              
34              
35             has default_headers => (
36             isa => HashRef,
37             is => 'rw',
38             default => sub { { } },
39             );
40              
41              
42             has transactional_sending => (
43             isa => Bool,
44             is => 'rw',
45             default => 0,
46             trigger => \&_transactional_sending_compat,
47             );
48              
49             sub _transactional_sending_compat {
50 2     2   6 my ($self, $value) = @_;
51              
52 2 100       6 if ($value) { $self->sending_method('transactional') }
  1         4  
53 1         6 else { $self->sending_method('') }
54             }
55              
56              
57             has sending_method => (
58             isa => Str,
59             is => 'rw',
60             default => '',
61             );
62              
63             sub _send_method_to_call {
64 31     31   58 my ($self,$requested_method) = @_;
65              
66 31   100     151 $requested_method ||= $self->sending_method;
67 31 100 100     128 my $method_name =
68             ($requested_method eq '' or $requested_method eq 'default')
69             ? 'send'
70             : "send_${requested_method}";
71 31         62 return $method_name;
72             }
73              
74             around 'sending_method' => sub {
75             my ($orig,$self,$value) = @_;
76             return $self->$orig() unless @_ > 2;
77              
78             my $method = $self->_send_method_to_call($value);
79             Net::Stomp::Producer::Exceptions::BadMethod->throw({
80             sending_method_value => $value,
81             method_to_call => $method,
82             }) unless $self->connection->can($method);
83              
84             return $self->$orig($value);
85             };
86              
87              
88             sub _prepare_message {
89 27     27   63 my ($self,$destination,$headers,$body) = @_;
90 4     4   30 use bytes;
  4         18  
  4         56  
91              
92 27     27   1712 try { $body = $self->serializer->($body) }
93             catch {
94 2 100   2   2259 if (eval {$_[0]->isa('Net::Stomp::Producer::Exceptions::CantSerialize')}) {
  2         28  
95 1         7 die $_[0];
96             }
97 1         2 my $prev=$_[0];
98 1         10 Net::Stomp::Producer::Exceptions::CantSerialize->throw({
99             message_body => $body,
100             previous_exception => $prev,
101             });
102 27         196 };
103              
104             my %actual_headers=(
105 25         417 %{$self->default_headers},
  25         662  
106             %$headers,
107             #'content-length' => length($body),
108             body => $body,
109             );
110              
111 25 100       79 $actual_headers{destination} = $destination if defined $destination;
112              
113 25         61 for ($actual_headers{destination}) {
114 25 100       95 $_ = "/$_"
115             unless m{^/};
116             }
117              
118 25         66 return \%actual_headers;
119             }
120              
121             sub _really_send {
122 22     22   50 my ($self,$frame) = @_;
123              
124 22         58 my $method = $self->_send_method_to_call;
125              
126             $self->reconnect_on_failure(
127             sub {
128 23     23   116028 my $ret = $_[0]->connection->$method($_[1]);
129 23 100       416 die "Call to $method failed"
130             unless $ret;
131             },
132 22         161 $frame,
133             );
134             }
135              
136             sub send {
137 16     16 1 14607 my ($self,$destination,$headers,$body) = @_;
138              
139 16         54 my $actual_headers = $self->_prepare_message($destination,$headers,$body);
140              
141 14         44 $self->_really_send($actual_headers);
142              
143 14         350 return;
144             }
145              
146              
147             has transformer_args => (
148             is => 'rw',
149             isa => HashRef,
150             default => sub { { } },
151             );
152              
153              
154             sub make_transformer {
155 8     8 1 18 my ($self,$transformer) = @_;
156              
157 8 50       24 return $transformer if ref($transformer);
158              
159 8         35 use_package_optimistically($transformer);
160 8 100       2202 if ($transformer->can('new')) {
161             # shallow clone, to make it less likely that a transformer
162             # will clobber our args
163             return $transformer->new(
164 4         8 { %{$self->transformer_args} }
  4         123  
165             );
166             }
167 4         11 return $transformer;
168             }
169              
170              
171             sub transform {
172 8     8 1 96 my ($self,$transformer,@input) = @_;
173              
174 8         26 $transformer=$self->make_transformer($transformer);
175              
176 8     8   204 my $method = try { $transformer->can('transform') }
177 8 50       731 or Net::Stomp::Producer::Exceptions::BadTransformer->throw({
178             transformer => $transformer,
179             });
180              
181 8         118 my @messages = $transformer->$method(@input);
182              
183 8     8   124 my $vmethod = try { $transformer->can('validate') };
  8         158  
184              
185 8         75 my @ret;
186              
187 8         34 while (my ($headers, $body) = splice @messages, 0, 2) {
188 8 100       19 if ($vmethod) {
189 2         5 my ($exception,$valid);
190             try {
191 2     2   60 $valid = $transformer->$vmethod($headers,$body);
192 2     1   11 } catch { $exception = $_ };
  1         19  
193 2 100       23 if (!$valid) {
194 1         24 Net::Stomp::Producer::Exceptions::Invalid->throw({
195             transformer => $transformer,
196             message_body => $body,
197             message_headers => $headers,
198             previous_exception => $exception,
199             });
200             }
201             }
202 7         25 push @ret,$headers,$body;
203             }
204              
205 7         28 return @ret;
206             }
207              
208              
209             sub send_many {
210 7     7 1 510 my ($self,@messages) = @_;
211              
212 7         23 while (my ($headers, $body) = splice @messages, 0, 2) {
213 7         22 $self->send(undef,$headers,$body);
214             }
215              
216 6         12 return;
217             }
218              
219              
220             sub transform_and_send {
221 7     7 1 759 my ($self,$transformer,@input) = @_;
222              
223 7         25 my @messages = $self->transform($transformer,@input);
224              
225 6         182 $self->send_many(@messages);
226              
227 5         17 return;
228             }
229              
230             __PACKAGE__->meta->make_immutable;
231              
232              
233             1;
234              
235             __END__
236              
237             =pod
238              
239             =encoding UTF-8
240              
241             =head1 NAME
242              
243             Net::Stomp::Producer - helper object to send messages via Net::Stomp
244              
245             =head1 VERSION
246              
247             version 2.005
248              
249             =head1 SYNOPSIS
250              
251             my $ser = JSON::XS->new->utf8;
252              
253             my $p = Net::Stomp::Producer->new({
254             connect_headers => { login => 'some-login', passcode => 's3cr3t' },
255             servers => [
256             { hostname => 'broker1.local', port => 61613 },
257             { hostname => 'broker2.local', port => 61613, ssl => 1 },
258             { hostname => 'broker3.local', port => 61613, ssl => 1,
259             connect_headers => { login => 'some-different-login',
260             passcode => 'an0th3r-s3cr3t' },
261             },
262             ],
263             serializer => sub { $ser->encode($_[0]) },
264             default_headers => { 'content-type' => 'json' },
265             });
266              
267             $p->send('/queue/somewhere',
268             { type => 'my_message' },
269             { a => [ 'data', 'structure' ] });
270              
271             Also:
272              
273             package My::Message::Transformer {
274             use Moose;
275             sub transform {
276             my ($self,@elems) = @_;
277              
278             return { destination => '/queue/somewhere',
279             type => 'my_message', },
280             { a => \@elems };
281             }
282             }
283              
284             $p->transform_and_send('My::Message::Transformer',
285             'data','structure');
286              
287             Or even:
288              
289             my $t = My::Message::Transformer->new();
290             $p->transform_and_send($t,
291             'data','structure');
292              
293             They all send the same message.
294              
295             =head1 DESCRIPTION
296              
297             This class sends messages via a STOMP connection (see
298             L<Net::Stomp::MooseHelpers::CanConnect>). It provides facilities for
299             serialisation and validation. You can have an instance of this class
300             as a singleton / global in your process, and use it to send all your
301             messages: this is recommended, as it will prevent flooding the broker
302             with many connections (each instance would connect independently, and
303             if you create many instances per second, the broker or your process
304             may run out of file descriptors and stop working).
305              
306             You can use it at several levels:
307              
308             =head2 Raw sending
309              
310             my $p = Net::Stomp::Producer->new({
311             servers => [ { hostname => 'localhost', port => 61613 } ],
312             });
313              
314             $p->send($destination,\%headers,$body_byte_string);
315              
316             This will just wrap the parameters in a L<Net::Stomp::Frame> and send
317             it. C<$destination> can be undef, if you have set it in the
318             C<%headers>.
319              
320             =head2 Serialisation support
321              
322             my $p = Net::Stomp::Producer->new({
323             servers => [ { hostname => 'localhost', port => 61613 } ],
324             serializer => sub { encode_json($_[0]) },
325             });
326              
327             $p->send($destination,\%headers,$body_hashref);
328              
329             The body will be passed through the C<serializer>, and the resulting
330             string will be used as above.
331              
332             =head2 Transformer instance
333              
334             $p->transform_and_send($transformer_obj,@args);
335              
336             This will call C<< $transformer_obj->transform(@args) >>. That
337             function should return a list (with an even number of elements). Each
338             pair of elements is interpreted as C<< \%headers, $body_ref >> and
339             passed to L</send> as above (with no C<destination>, so the
340             transformer should set it in the headers). It's not an error for the
341             transformer to return an empty list: it just means that nothing will
342             be sent.
343              
344             =head2 Transformer class
345              
346             my $p = Net::Stomp::Producer->new({
347             servers => [ { hostname => 'localhost', port => 61613 } ],
348             transformer_args => { some => 'param' },
349             });
350              
351             $p->transform_and_send($transformer_class,@args);
352              
353             The transformer will be instantiated like C<<
354             $transformer_class->new($p->transformer_args) >>, then the object will
355             be called as above.
356              
357             =head2 Transform & validate
358              
359             If the transformer class / object supports the C<validate> method, it
360             will be called before sending each message, like:
361              
362             $transformer_obj->validate(\%headers,$body_ref);
363              
364             This method is expected to return a true value if the message is
365             valid, and throw a meaningful exception if it is not. The exception
366             will be wrapped in a L<Net::Stomp::Producer::Exceptions::Invalid>. If
367             the C<validate> method returns false without throwing any exception,
368             L<Net::Stomp::Producer::Exceptions::Invalid> will still be thrown, but
369             the C<previous_exception> slot will be undef.
370              
371             =head1 ATTRIBUTES
372              
373             =head2 C<serializer>
374              
375             A coderef that, passed the body parameter from L</send>, returns a
376             byte string to use as the frame body. The default coderef will just
377             pass non-refs through, and die (with a
378             L<Net::Stomp::Producer::Exceptions::CantSerialize> exception) if
379             passed a ref.
380              
381             =head2 C<default_headers>
382              
383             Hashref of STOMP headers to use for every frame we send. Headers
384             passed in to L</send> take precedence. There is no support for
385             I<removing> a default header for a single send.
386              
387             =head2 C<transactional_sending>
388              
389             B<DEPRECATED>. Use L</sending_method> instead. This boolean was too
390             restrictive.
391              
392             Instead of doing C<< ->transactional_sending(1) >> do C<<
393             ->sending_method('transactional') >>.
394              
395             Instead of doing C<< ->transactional_sending(0) >> do C<<
396             ->sending_method('') >> or C<< ->sending_method('default') >>.
397              
398             Boolean, defaults to false. If true, use
399             L<Net::Stomp/send_transactional> instead of L<Net::Stomp/send> to send
400             frames.
401              
402             =head2 C<sending_method>
403              
404             String, defaults to C<''>. Selects which method to use on the
405             connection's L<Net::Stomp> object to actually send a message. The name
406             of the method is derived from the value of this attribute by
407             prepending C<send_> to it (so you can't abuse this to call arbitrary
408             methods), unless this attribute's value is C<''> or C<'default'>, in
409             which case the simple C<send> method will be used.
410              
411             For example, C<< sending_method => 'with_receipt' >> will block
412             sending until the broker sends back a receipt for the message (or it
413             times out).
414              
415             C<< sending_method => 'transactional' >> will send a C<COMMIT> frame
416             when the receipt is received, or a C<ROLLBACK> frame if something
417             breaks.
418              
419             I<NOTE>: these methods work when the connection is used only to send
420             messages, and not to receive them! The current implementation will
421             very probably deadlock or throw exceptions at random moments if
422             messages arrive while you're sending.
423              
424             =head2 C<transformer_args>
425              
426             Hashref to pass to the transformer constructor when
427             L</make_transformer> instantiates a transformer class.
428              
429             =head1 METHODS
430              
431             =head2 C<send>
432              
433             $p->send($destination,\%headers,$body);
434              
435             Serializes the C<$body> via the L</serializer>, merges the C<%headers>
436             with the L</default_headers>, setting the C<content-length> to the
437             byte length of the serialized body. Overrides the destination in the
438             headers with C<$destination> if it's defined.
439              
440             Finally, sends the frame.
441              
442             =head2 C<make_transformer>
443              
444             $p->make_transformer($class);
445              
446             If passed a reference, this function just returns it (it assumes it's
447             a transformer object ready to use).
448              
449             If passed a string, tries to load the class with
450             L<Module::Runtime::use_package_optimistically|Module::Runtime/use_package_optimistically>. If
451             the class has a C<new> method, it's invoked with the value of
452             L</transformer_args> to obtain an object that is then returned. If the
453             class does not have a C<new>, the class name is returned.
454              
455             =head2 C<transform>
456              
457             my (@headers_and_bodies) = $p->transform($transformer,@data);
458              
459             Uses L</make_transformer> to (optionally) instantiate a transformer
460             object, then tries to call C<transform> on it. If there is no such
461             method, a L<Net::Stomp::Producer::Exceptions::BadTransformer> is
462             thrown.
463              
464             The transformer is expected to return a list of (header,body) pairs
465             (that is, a list with an even number of elements; I<not> a list of
466             arrayrefs!).
467              
468             Each message in the returned list is optionally validated, then returned.
469              
470             The optional validation happens if the transformer C<<
471             ->can('validate') >>. If it can, that method is called like:
472              
473             $transformer->validate($header,$body_ref);
474              
475             The method is expected to return a true value if the message is valid,
476             and throw a meaningful exception if it is not. The exception will be
477             wrapped in a L<Net::Stomp::Producer::Exceptions::Invalid>. If the
478             C<validate> method returns false without throwing any exception,
479             L<Net::Stomp::Producer::Exceptions::Invalid> will still be thrown, but
480             the C<previous_exception> slot will be undef.
481              
482             It's not an error for the transformer to return an empty list: it just
483             means that nothing will be returned.
484              
485             =head2 C<send_many>
486              
487             $p->send_many(@headers_and_bodies);
488              
489             Given a list of (header,body) pairs (that is, a list with an even
490             number of elements; I<not> a list of arrayrefs!), it will send each
491             pair as a message. Useful in combination with L</transform>.
492              
493             It's not an error for the list to be empty: it just means that nothing
494             will be sent.
495              
496             =head2 C<transform_and_send>
497              
498             $p->transform_and_send($transformer,@data);
499              
500             Equivalent to:
501              
502             $p->send_many($p->transform($transformer,@data));
503              
504             which is similar to:
505              
506             my ($header,$body) = $p->transform($transformer,@data);
507             $p->send(undef,$header,$body);
508              
509             but it works also when the transformer returns more than one pair.
510              
511             It's not an error for the transformer to return an empty list: it just
512             means that nothing will be sent.
513              
514             I<< Why would I ever want to use L</transform> and L</send_many> separately? >>
515              
516             Let's say you are in a transaction, and you want to fail if the
517             messages cannot be prepared, but not fail if the prepared messages
518             cannot be sent. In this case, you call L</transform> inside the
519             transaction, and L</send_many> outside of it.
520              
521             But yes, in most cases you should really just call
522             C<transform_and_send>.
523              
524             =head1 EXAMPLES
525              
526             You can find examples of use in the tests, or at
527             https://github.com/dakkar/CatalystX-StompSampleApps
528              
529             =head1 AUTHOR
530              
531             Gianni Ceccarelli <gianni.ceccarelli@net-a-porter.com>
532              
533             =head1 COPYRIGHT AND LICENSE
534              
535             This software is copyright (c) 2012 by Net-a-porter.com.
536              
537             This is free software; you can redistribute it and/or modify it under
538             the same terms as the Perl 5 programming language system itself.
539              
540             =cut