File Coverage

blib/lib/Net/Stomp/Producer.pm
Criterion Covered Total %
statement 1 3 33.3
branch n/a
condition n/a
subroutine 1 1 100.0
pod n/a
total 2 4 50.0


line stmt bran cond sub pod time code
1             package Net::Stomp::Producer;
2             $Net::Stomp::Producer::VERSION = '2.003';
3             {
4             $Net::Stomp::Producer::DIST = 'Net-Stomp-Producer';
5             }
6 1     1   103933 use Moose;
  0            
  0            
7             use namespace::autoclean;
8             with 'Net::Stomp::MooseHelpers::CanConnect' => { -version => '2.6' };
9             with 'Net::Stomp::MooseHelpers::ReconnectOnFailure';
10             use MooseX::Types::Moose qw(Str Bool CodeRef HashRef);
11             use Net::Stomp::Producer::Exceptions;
12             use Module::Runtime 'use_package_optimistically';
13             use Try::Tiny;
14              
15             # ABSTRACT: helper object to send messages via Net::Stomp
16              
17              
18             has serializer => (
19             isa => CodeRef,
20             is => 'rw',
21             default => sub { \&_no_serializer },
22             );
23              
24             sub _no_serializer {
25             my ($message) = @_;
26             return $message unless ref $message;
27              
28             Net::Stomp::Producer::Exceptions::CantSerialize->throw({
29             previous_exception => q{can't send a reference without a serializer},
30             message_body => $message,
31             });
32             }
33              
34              
35             has default_headers => (
36             isa => HashRef,
37             is => 'rw',
38             default => sub { { } },
39             );
40              
41              
42             has transactional_sending => (
43             isa => Bool,
44             is => 'rw',
45             default => 0,
46             trigger => \&_transactional_sending_compat,
47             );
48              
49             sub _transactional_sending_compat {
50             my ($self, $value) = @_;
51              
52             if ($value) { $self->sending_method('transactional') }
53             else { $self->sending_method('') }
54             }
55              
56              
57             has sending_method => (
58             isa => Str,
59             is => 'rw',
60             default => '',
61             );
62              
63             sub _send_method_to_call {
64             my ($self,$requested_method) = @_;
65              
66             $requested_method ||= $self->sending_method;
67             my $method_name =
68             ($requested_method eq '' or $requested_method eq 'default')
69             ? 'send'
70             : "send_${requested_method}";
71             return $method_name;
72             }
73              
74             around 'sending_method' => sub {
75             my ($orig,$self,$value) = @_;
76             return $self->$orig() unless @_ > 2;
77              
78             my $method = $self->_send_method_to_call($value);
79             Net::Stomp::Producer::Exceptions::BadMethod->throw({
80             sending_method_value => $value,
81             method_to_call => $method,
82             }) unless $self->connection->can($method);
83              
84             return $self->$orig($value);
85             };
86              
87              
88             sub _prepare_message {
89             my ($self,$destination,$headers,$body) = @_;
90             use bytes;
91              
92             try { $body = $self->serializer->($body) }
93             catch {
94             if (eval {$_[0]->isa('Net::Stomp::Producer::Exceptions::CantSerialize')}) {
95             die $_[0];
96             }
97             my $prev=$_[0];
98             Net::Stomp::Producer::Exceptions::CantSerialize->throw({
99             message_body => $body,
100             previous_exception => $prev,
101             });
102             };
103              
104             my %actual_headers=(
105             %{$self->default_headers},
106             %$headers,
107             #'content-length' => length($body),
108             body => $body,
109             );
110              
111             $actual_headers{destination} = $destination if defined $destination;
112              
113             for ($actual_headers{destination}) {
114             $_ = "/$_"
115             unless m{^/};
116             }
117              
118             return \%actual_headers;
119             }
120              
121             sub _really_send {
122             my ($self,$frame) = @_;
123              
124             my $method = $self->_send_method_to_call;
125              
126             $self->reconnect_on_failure(
127             sub { $_[0]->connection->$method($_[1]) },
128             $frame,
129             );
130             }
131              
132             sub send {
133             my ($self,$destination,$headers,$body) = @_;
134              
135             my $actual_headers = $self->_prepare_message($destination,$headers,$body);
136              
137             $self->_really_send($actual_headers);
138              
139             return;
140             }
141              
142              
143             has transformer_args => (
144             is => 'rw',
145             isa => HashRef,
146             default => sub { { } },
147             );
148              
149              
150             sub make_transformer {
151             my ($self,$transformer) = @_;
152              
153             return $transformer if ref($transformer);
154              
155             use_package_optimistically($transformer);
156             if ($transformer->can('new')) {
157             # shallow clone, to make it less likely that a transformer
158             # will clobber our args
159             return $transformer->new(
160             { %{$self->transformer_args} }
161             );
162             }
163             return $transformer;
164             }
165              
166              
167             sub transform {
168             my ($self,$transformer,@input) = @_;
169              
170             $transformer=$self->make_transformer($transformer);
171              
172             my $method = try { $transformer->can('transform') }
173             or Net::Stomp::Producer::Exceptions::BadTransformer->throw({
174             transformer => $transformer,
175             });
176              
177             my @messages = $transformer->$method(@input);
178              
179             my $vmethod = try { $transformer->can('validate') };
180              
181             my @ret;
182              
183             while (my ($headers, $body) = splice @messages, 0, 2) {
184             if ($vmethod) {
185             my ($exception,$valid);
186             try {
187             $valid = $transformer->$vmethod($headers,$body);
188             } catch { $exception = $_ };
189             if (!$valid) {
190             Net::Stomp::Producer::Exceptions::Invalid->throw({
191             transformer => $transformer,
192             message_body => $body,
193             message_headers => $headers,
194             previous_exception => $exception,
195             });
196             }
197             }
198             push @ret,$headers,$body;
199             }
200              
201             return @ret;
202             }
203              
204              
205             sub send_many {
206             my ($self,@messages) = @_;
207              
208             while (my ($headers, $body) = splice @messages, 0, 2) {
209             $self->send(undef,$headers,$body);
210             }
211              
212             return;
213             }
214              
215              
216             sub transform_and_send {
217             my ($self,$transformer,@input) = @_;
218              
219             my @messages = $self->transform($transformer,@input);
220              
221             $self->send_many(@messages);
222              
223             return;
224             }
225              
226             __PACKAGE__->meta->make_immutable;
227              
228              
229             1;
230              
231             __END__
232              
233             =pod
234              
235             =encoding UTF-8
236              
237             =head1 NAME
238              
239             Net::Stomp::Producer - helper object to send messages via Net::Stomp
240              
241             =head1 VERSION
242              
243             version 2.003
244              
245             =head1 SYNOPSIS
246              
247             my $ser = JSON::XS->new->utf8;
248              
249             my $p = Net::Stomp::Producer->new({
250             servers => [ { hostname => 'localhost', port => 61613 } ],
251             serializer => sub { $ser->encode($_[0]) },
252             default_headers => { 'content-type' => 'json' },
253             });
254              
255             $p->send('/queue/somewhere',
256             { type => 'my_message' },
257             { a => [ 'data', 'structure' ] });
258              
259             Also:
260              
261             package My::Message::Transformer {
262             use Moose;
263             sub transform {
264             my ($self,@elems) = @_;
265              
266             return { destination => '/queue/somewhere',
267             type => 'my_message', },
268             { a => \@elems };
269             }
270             }
271              
272             $p->transform_and_send('My::Message::Transformer',
273             'data','structure');
274              
275             Or even:
276              
277             my $t = My::Message::Transformer->new();
278             $p->transform_and_send($t,
279             'data','structure');
280              
281             They all send the same message.
282              
283             =head1 DESCRIPTION
284              
285             This class sends messages via a STOMP connection (see
286             L<Net::Stomp::MooseHelpers::CanConnect>). It provides facilities for
287             serialisation and validation. You can have an instance of this class
288             as a singleton / global in your process, and use it to send all your
289             messages: this is recommended, as it will prevent flooding the broker
290             with many connections (each instance would connect independently, and
291             if you create many instances per second, the broker or your process
292             may run out of file descriptiors and stop working).
293              
294             You can use it at several levels:
295              
296             =head2 Raw sending
297              
298             my $p = Net::Stomp::Producer->new({
299             servers => [ { hostname => 'localhost', port => 61613 } ],
300             });
301              
302             $p->send($destination,\%headers,$body_byte_string);
303              
304             This will just wrap the parameters in a L<Net::Stomp::Frame> and send
305             it. C<$destination> can be undef, if you have set it in the
306             C<%headers>.
307              
308             =head2 Serialisation support
309              
310             my $p = Net::Stomp::Producer->new({
311             servers => [ { hostname => 'localhost', port => 61613 } ],
312             serializer => sub { encode_json($_[0]) },
313             });
314              
315             $p->send($destination,\%headers,$body_hashref);
316              
317             The body will be passed through the C<serializer>, and the resulting
318             string will be used as above.
319              
320             =head2 Transformer instance
321              
322             $p->transform_and_send($transformer_obj,@args);
323              
324             This will call C<< $transformer_obj->transform(@args) >>. That
325             function should return a list (with an even number of elements). Each
326             pair of elements is interpreted as C<< \%headers, $body_ref >> and
327             passed to L</send> as above (with no C<destination>, so the
328             transformer should set it in the headers). It's not an error for the
329             transformer to return an empty list: it just means that nothing will
330             be sent.
331              
332             =head2 Transformer class
333              
334             my $p = Net::Stomp::Producer->new({
335             servers => [ { hostname => 'localhost', port => 61613 } ],
336             transformer_args => { some => 'param' },
337             });
338              
339             $p->transform_and_send($transformer_class,@args);
340              
341             The transformer will be instantiated like C<<
342             $transformer_class->new($p->transformer_args) >>, then the object will
343             be called as above.
344              
345             =head2 Transform & validate
346              
347             If the transformer class / object supports the C<validate> method, it
348             will be called before sending each message, like:
349              
350             $transformer_obj->validate(\%headers,$body_ref);
351              
352             This method is expected to return a true value if the message is
353             valid, and throw a meaningful exception if it is not. The exception
354             will be wrapped in a L<Net::Stomp::Producer::Exceptions::Invalid>. If
355             the C<validate> method returns false without throwing any exception,
356             L<Net::Stomp::Producer::Exceptions::Invalid> will still be throw, but
357             the C<previous_exception> slot will be undef.
358              
359             =head1 ATTRIBUTES
360              
361             =head2 C<serializer>
362              
363             A coderef that, passed the body parameter from L</send>, returns a
364             byte string to use as the frame body. The default coderef will just
365             pass non-refs through, and die (with a
366             L<Net::Stomp::Producer::Exceptions::CantSerialize> exception) if
367             passed a ref.
368              
369             =head2 C<default_headers>
370              
371             Hashref of STOMP headers to use for every frame we send. Headers
372             passed in to L</send> take precedence. There is no support for
373             I<removing> a default header for a single send.
374              
375             =head2 C<transactional_sending>
376              
377             B<DEPRECATED>. Use L</sending_method> instead. This boolean was too
378             restrictive.
379              
380             Instead of doing C<< ->transactional_sending(1) >> do C<<
381             ->sending_method('transactional') >>.
382              
383             Instead of doing C<< ->transactional_sending(0) >> do C<<
384             ->sending_method('') >> or C<< ->sending_method('default') >>.
385              
386             Boolean, defaults to false. If true, use
387             L<Net::Stomp/send_transactional> instead of L<Net::Stomp/send> to send
388             frames.
389              
390             =head2 C<sending_method>
391              
392             String, defaults to C<''>. Selects which method to use on the
393             connection's L<Net::Stomp> object to actually send a message. The name
394             of the method is derived from the value of this attribute by
395             prepending C<send_> to it (so you can't abuse this to call arbitrary
396             methods), unless this attribute's value is C<''> or C<'default'>, in
397             which case the simple C<send> method will be used.
398              
399             =head2 C<transformer_args>
400              
401             Hashref to pass to the transformer constructor when
402             L</make_transformer> instantiates a transformer class.
403              
404             =head1 METHODS
405              
406             =head2 C<send>
407              
408             $p->send($destination,\%headers,$body);
409              
410             Serializes the C<$body> via the L</serializer>, merges the C<%headers>
411             with the L</default_headers>, setting the C<content-length> to the
412             byte length of the serialized body. Overrides the destination in the
413             headers with C<$destination> if it's defined.
414              
415             Finally, sends the frame.
416              
417             =head2 C<make_transformer>
418              
419             $p->make_transformer($class);
420              
421             If passed a reference, this function just returns it (it assumes it's
422             a transformer object ready to use).
423              
424             If passed a string, tries to load the class with
425             L<Module::Runtime::use_package_optimistically|Module::Runtime/use_package_optimistically>. If
426             the class has a C<new> method, it's invoked with the value of
427             L</transformer_args> to obtain an object that is then returned. If the
428             class does not have a C<new>, the class name is returned.
429              
430             =head2 C<transform>
431              
432             my (@headers_and_bodies) = $p->transform($transformer,@data);
433              
434             Uses L</make_transformer> to (optionally) instantiate a transformer
435             object, then tries to call C<transform> on it. If there is no such
436             method, a L<Net::Stomp::Producer::Exceptions::BadTransformer> is
437             thrown.
438              
439             The transformer is expected to return a list of (header,body) pairs
440             (that is, a list with an even number of elements; I<not> a list of
441             arrayrefs!).
442              
443             Each message in the returned list is optionally validated, then returned.
444              
445             The optional validation happens if the transformer C<<
446             ->can('validate') >>. If it can, that method is called like:
447              
448             $transformer->validate($header,$body_ref);
449              
450             The method is expected to return a true value if the message is valid,
451             and throw a meaningful exception if it is not. The exception will be
452             wrapped in a L<Net::Stomp::Producer::Exceptions::Invalid>. If the
453             C<validate> method returns false without throwing any exception,
454             L<Net::Stomp::Producer::Exceptions::Invalid> will still be throw, but
455             the C<previous_exception> slot will be undef.
456              
457             It's not an error for the transformer to return an empty list: it just
458             means that nothing will be returned.
459              
460             =head2 C<send_many>
461              
462             $p->send_many(@headers_and_bodies);
463              
464             Given a list of (header,body) pairs (that is, a list with an even
465             number of elements; I<not> a list of arrayrefs!), it will send each
466             pair as a message. Useful in combination with L</transform>.
467              
468             It's not an error for the list to beempty: it just means that nothing
469             will be sent.
470              
471             =head2 C<transform_and_send>
472              
473             $p->transform_and_send($transformer,@data);
474              
475             Equivalent to:
476              
477             $p->send_many($p->transform($transformer,@data));
478              
479             which is similar to:
480              
481             my ($header,$body) = $p->transform($transformer,@data);
482             $p->send(undef,$header,$body);
483              
484             but it works also when the transformer returns more than one pair.
485              
486             It's not an error for the transformer to return an empty list: it just
487             means that nothing will be sent.
488              
489             I<< Why would I ever want to use L</transform> and L</send_many> separately? >>
490              
491             Let's say you are in a transaction, and you want to fail if the
492             messages cannot be prepared, but not fail if the prepared messages
493             cannot be sent. In this case, you call L</transform> inside the
494             transaction, and L</send_many> outside of it.
495              
496             But yes, in most cases you should really just call
497             C<transform_and_send>.
498              
499             =head1 EXAMPLES
500              
501             You can find examples of use in the tests, or at
502             https://github.com/dakkar/CatalystX-StompSampleApps
503              
504             =head1 AUTHOR
505              
506             Gianni Ceccarelli <gianni.ceccarelli@net-a-porter.com>
507              
508             =head1 COPYRIGHT AND LICENSE
509              
510             This software is copyright (c) 2012 by Net-a-porter.com.
511              
512             This is free software; you can redistribute it and/or modify it under
513             the same terms as the Perl 5 programming language system itself.
514              
515             =cut