File Coverage

blib/lib/Net/Stomp/Producer.pm
Criterion Covered Total %
statement 1 3 33.3
branch n/a
condition n/a
subroutine 1 1 100.0
pod n/a
total 2 4 50.0


line stmt bran cond sub pod time code
1             package Net::Stomp::Producer;
2             $Net::Stomp::Producer::VERSION = '2.004';
3             {
4             $Net::Stomp::Producer::DIST = 'Net-Stomp-Producer';
5             }
6 2     2   116073 use Moose;
  0            
  0            
7             use namespace::autoclean;
8             with 'Net::Stomp::MooseHelpers::CanConnect' => { -version => '2.6' };
9             with 'Net::Stomp::MooseHelpers::ReconnectOnFailure';
10             use MooseX::Types::Moose qw(Str Bool CodeRef HashRef);
11             use Net::Stomp::Producer::Exceptions;
12             use Module::Runtime 'use_package_optimistically';
13             use Try::Tiny;
14              
15             # ABSTRACT: helper object to send messages via Net::Stomp
16              
17              
18             has serializer => (
19             isa => CodeRef,
20             is => 'rw',
21             default => sub { \&_no_serializer },
22             );
23              
24             sub _no_serializer {
25             my ($message) = @_;
26             return $message unless ref $message;
27              
28             Net::Stomp::Producer::Exceptions::CantSerialize->throw({
29             previous_exception => q{can't send a reference without a serializer},
30             message_body => $message,
31             });
32             }
33              
34              
35             has default_headers => (
36             isa => HashRef,
37             is => 'rw',
38             default => sub { { } },
39             );
40              
41              
42             has transactional_sending => (
43             isa => Bool,
44             is => 'rw',
45             default => 0,
46             trigger => \&_transactional_sending_compat,
47             );
48              
49             sub _transactional_sending_compat {
50             my ($self, $value) = @_;
51              
52             if ($value) { $self->sending_method('transactional') }
53             else { $self->sending_method('') }
54             }
55              
56              
57             has sending_method => (
58             isa => Str,
59             is => 'rw',
60             default => '',
61             );
62              
63             sub _send_method_to_call {
64             my ($self,$requested_method) = @_;
65              
66             $requested_method ||= $self->sending_method;
67             my $method_name =
68             ($requested_method eq '' or $requested_method eq 'default')
69             ? 'send'
70             : "send_${requested_method}";
71             return $method_name;
72             }
73              
74             around 'sending_method' => sub {
75             my ($orig,$self,$value) = @_;
76             return $self->$orig() unless @_ > 2;
77              
78             my $method = $self->_send_method_to_call($value);
79             Net::Stomp::Producer::Exceptions::BadMethod->throw({
80             sending_method_value => $value,
81             method_to_call => $method,
82             }) unless $self->connection->can($method);
83              
84             return $self->$orig($value);
85             };
86              
87              
88             sub _prepare_message {
89             my ($self,$destination,$headers,$body) = @_;
90             use bytes;
91              
92             try { $body = $self->serializer->($body) }
93             catch {
94             if (eval {$_[0]->isa('Net::Stomp::Producer::Exceptions::CantSerialize')}) {
95             die $_[0];
96             }
97             my $prev=$_[0];
98             Net::Stomp::Producer::Exceptions::CantSerialize->throw({
99             message_body => $body,
100             previous_exception => $prev,
101             });
102             };
103              
104             my %actual_headers=(
105             %{$self->default_headers},
106             %$headers,
107             #'content-length' => length($body),
108             body => $body,
109             );
110              
111             $actual_headers{destination} = $destination if defined $destination;
112              
113             for ($actual_headers{destination}) {
114             $_ = "/$_"
115             unless m{^/};
116             }
117              
118             return \%actual_headers;
119             }
120              
121             sub _really_send {
122             my ($self,$frame) = @_;
123              
124             my $method = $self->_send_method_to_call;
125              
126             $self->reconnect_on_failure(
127             sub {
128             my $ret = $_[0]->connection->$method($_[1]);
129             die "Call to $method failed"
130             unless $ret;
131             },
132             $frame,
133             );
134             }
135              
136             sub send {
137             my ($self,$destination,$headers,$body) = @_;
138              
139             my $actual_headers = $self->_prepare_message($destination,$headers,$body);
140              
141             $self->_really_send($actual_headers);
142              
143             return;
144             }
145              
146              
147             has transformer_args => (
148             is => 'rw',
149             isa => HashRef,
150             default => sub { { } },
151             );
152              
153              
154             sub make_transformer {
155             my ($self,$transformer) = @_;
156              
157             return $transformer if ref($transformer);
158              
159             use_package_optimistically($transformer);
160             if ($transformer->can('new')) {
161             # shallow clone, to make it less likely that a transformer
162             # will clobber our args
163             return $transformer->new(
164             { %{$self->transformer_args} }
165             );
166             }
167             return $transformer;
168             }
169              
170              
171             sub transform {
172             my ($self,$transformer,@input) = @_;
173              
174             $transformer=$self->make_transformer($transformer);
175              
176             my $method = try { $transformer->can('transform') }
177             or Net::Stomp::Producer::Exceptions::BadTransformer->throw({
178             transformer => $transformer,
179             });
180              
181             my @messages = $transformer->$method(@input);
182              
183             my $vmethod = try { $transformer->can('validate') };
184              
185             my @ret;
186              
187             while (my ($headers, $body) = splice @messages, 0, 2) {
188             if ($vmethod) {
189             my ($exception,$valid);
190             try {
191             $valid = $transformer->$vmethod($headers,$body);
192             } catch { $exception = $_ };
193             if (!$valid) {
194             Net::Stomp::Producer::Exceptions::Invalid->throw({
195             transformer => $transformer,
196             message_body => $body,
197             message_headers => $headers,
198             previous_exception => $exception,
199             });
200             }
201             }
202             push @ret,$headers,$body;
203             }
204              
205             return @ret;
206             }
207              
208              
209             sub send_many {
210             my ($self,@messages) = @_;
211              
212             while (my ($headers, $body) = splice @messages, 0, 2) {
213             $self->send(undef,$headers,$body);
214             }
215              
216             return;
217             }
218              
219              
220             sub transform_and_send {
221             my ($self,$transformer,@input) = @_;
222              
223             my @messages = $self->transform($transformer,@input);
224              
225             $self->send_many(@messages);
226              
227             return;
228             }
229              
230             __PACKAGE__->meta->make_immutable;
231              
232              
233             1;
234              
235             __END__
236              
237             =pod
238              
239             =encoding UTF-8
240              
241             =head1 NAME
242              
243             Net::Stomp::Producer - helper object to send messages via Net::Stomp
244              
245             =head1 VERSION
246              
247             version 2.004
248              
249             =head1 SYNOPSIS
250              
251             my $ser = JSON::XS->new->utf8;
252              
253             my $p = Net::Stomp::Producer->new({
254             servers => [ { hostname => 'localhost', port => 61613 } ],
255             serializer => sub { $ser->encode($_[0]) },
256             default_headers => { 'content-type' => 'json' },
257             });
258              
259             $p->send('/queue/somewhere',
260             { type => 'my_message' },
261             { a => [ 'data', 'structure' ] });
262              
263             Also:
264              
265             package My::Message::Transformer {
266             use Moose;
267             sub transform {
268             my ($self,@elems) = @_;
269              
270             return { destination => '/queue/somewhere',
271             type => 'my_message', },
272             { a => \@elems };
273             }
274             }
275              
276             $p->transform_and_send('My::Message::Transformer',
277             'data','structure');
278              
279             Or even:
280              
281             my $t = My::Message::Transformer->new();
282             $p->transform_and_send($t,
283             'data','structure');
284              
285             They all send the same message.
286              
287             =head1 DESCRIPTION
288              
289             This class sends messages via a STOMP connection (see
290             L<Net::Stomp::MooseHelpers::CanConnect>). It provides facilities for
291             serialisation and validation. You can have an instance of this class
292             as a singleton / global in your process, and use it to send all your
293             messages: this is recommended, as it will prevent flooding the broker
294             with many connections (each instance would connect independently, and
295             if you create many instances per second, the broker or your process
296             may run out of file descriptiors and stop working).
297              
298             You can use it at several levels:
299              
300             =head2 Raw sending
301              
302             my $p = Net::Stomp::Producer->new({
303             servers => [ { hostname => 'localhost', port => 61613 } ],
304             });
305              
306             $p->send($destination,\%headers,$body_byte_string);
307              
308             This will just wrap the parameters in a L<Net::Stomp::Frame> and send
309             it. C<$destination> can be undef, if you have set it in the
310             C<%headers>.
311              
312             =head2 Serialisation support
313              
314             my $p = Net::Stomp::Producer->new({
315             servers => [ { hostname => 'localhost', port => 61613 } ],
316             serializer => sub { encode_json($_[0]) },
317             });
318              
319             $p->send($destination,\%headers,$body_hashref);
320              
321             The body will be passed through the C<serializer>, and the resulting
322             string will be used as above.
323              
324             =head2 Transformer instance
325              
326             $p->transform_and_send($transformer_obj,@args);
327              
328             This will call C<< $transformer_obj->transform(@args) >>. That
329             function should return a list (with an even number of elements). Each
330             pair of elements is interpreted as C<< \%headers, $body_ref >> and
331             passed to L</send> as above (with no C<destination>, so the
332             transformer should set it in the headers). It's not an error for the
333             transformer to return an empty list: it just means that nothing will
334             be sent.
335              
336             =head2 Transformer class
337              
338             my $p = Net::Stomp::Producer->new({
339             servers => [ { hostname => 'localhost', port => 61613 } ],
340             transformer_args => { some => 'param' },
341             });
342              
343             $p->transform_and_send($transformer_class,@args);
344              
345             The transformer will be instantiated like C<<
346             $transformer_class->new($p->transformer_args) >>, then the object will
347             be called as above.
348              
349             =head2 Transform & validate
350              
351             If the transformer class / object supports the C<validate> method, it
352             will be called before sending each message, like:
353              
354             $transformer_obj->validate(\%headers,$body_ref);
355              
356             This method is expected to return a true value if the message is
357             valid, and throw a meaningful exception if it is not. The exception
358             will be wrapped in a L<Net::Stomp::Producer::Exceptions::Invalid>. If
359             the C<validate> method returns false without throwing any exception,
360             L<Net::Stomp::Producer::Exceptions::Invalid> will still be throw, but
361             the C<previous_exception> slot will be undef.
362              
363             =head1 ATTRIBUTES
364              
365             =head2 C<serializer>
366              
367             A coderef that, passed the body parameter from L</send>, returns a
368             byte string to use as the frame body. The default coderef will just
369             pass non-refs through, and die (with a
370             L<Net::Stomp::Producer::Exceptions::CantSerialize> exception) if
371             passed a ref.
372              
373             =head2 C<default_headers>
374              
375             Hashref of STOMP headers to use for every frame we send. Headers
376             passed in to L</send> take precedence. There is no support for
377             I<removing> a default header for a single send.
378              
379             =head2 C<transactional_sending>
380              
381             B<DEPRECATED>. Use L</sending_method> instead. This boolean was too
382             restrictive.
383              
384             Instead of doing C<< ->transactional_sending(1) >> do C<<
385             ->sending_method('transactional') >>.
386              
387             Instead of doing C<< ->transactional_sending(0) >> do C<<
388             ->sending_method('') >> or C<< ->sending_method('default') >>.
389              
390             Boolean, defaults to false. If true, use
391             L<Net::Stomp/send_transactional> instead of L<Net::Stomp/send> to send
392             frames.
393              
394             =head2 C<sending_method>
395              
396             String, defaults to C<''>. Selects which method to use on the
397             connection's L<Net::Stomp> object to actually send a message. The name
398             of the method is derived from the value of this attribute by
399             prepending C<send_> to it (so you can't abuse this to call arbitrary
400             methods), unless this attribute's value is C<''> or C<'default'>, in
401             which case the simple C<send> method will be used.
402              
403             =head2 C<transformer_args>
404              
405             Hashref to pass to the transformer constructor when
406             L</make_transformer> instantiates a transformer class.
407              
408             =head1 METHODS
409              
410             =head2 C<send>
411              
412             $p->send($destination,\%headers,$body);
413              
414             Serializes the C<$body> via the L</serializer>, merges the C<%headers>
415             with the L</default_headers>, setting the C<content-length> to the
416             byte length of the serialized body. Overrides the destination in the
417             headers with C<$destination> if it's defined.
418              
419             Finally, sends the frame.
420              
421             =head2 C<make_transformer>
422              
423             $p->make_transformer($class);
424              
425             If passed a reference, this function just returns it (it assumes it's
426             a transformer object ready to use).
427              
428             If passed a string, tries to load the class with
429             L<Module::Runtime::use_package_optimistically|Module::Runtime/use_package_optimistically>. If
430             the class has a C<new> method, it's invoked with the value of
431             L</transformer_args> to obtain an object that is then returned. If the
432             class does not have a C<new>, the class name is returned.
433              
434             =head2 C<transform>
435              
436             my (@headers_and_bodies) = $p->transform($transformer,@data);
437              
438             Uses L</make_transformer> to (optionally) instantiate a transformer
439             object, then tries to call C<transform> on it. If there is no such
440             method, a L<Net::Stomp::Producer::Exceptions::BadTransformer> is
441             thrown.
442              
443             The transformer is expected to return a list of (header,body) pairs
444             (that is, a list with an even number of elements; I<not> a list of
445             arrayrefs!).
446              
447             Each message in the returned list is optionally validated, then returned.
448              
449             The optional validation happens if the transformer C<<
450             ->can('validate') >>. If it can, that method is called like:
451              
452             $transformer->validate($header,$body_ref);
453              
454             The method is expected to return a true value if the message is valid,
455             and throw a meaningful exception if it is not. The exception will be
456             wrapped in a L<Net::Stomp::Producer::Exceptions::Invalid>. If the
457             C<validate> method returns false without throwing any exception,
458             L<Net::Stomp::Producer::Exceptions::Invalid> will still be throw, but
459             the C<previous_exception> slot will be undef.
460              
461             It's not an error for the transformer to return an empty list: it just
462             means that nothing will be returned.
463              
464             =head2 C<send_many>
465              
466             $p->send_many(@headers_and_bodies);
467              
468             Given a list of (header,body) pairs (that is, a list with an even
469             number of elements; I<not> a list of arrayrefs!), it will send each
470             pair as a message. Useful in combination with L</transform>.
471              
472             It's not an error for the list to beempty: it just means that nothing
473             will be sent.
474              
475             =head2 C<transform_and_send>
476              
477             $p->transform_and_send($transformer,@data);
478              
479             Equivalent to:
480              
481             $p->send_many($p->transform($transformer,@data));
482              
483             which is similar to:
484              
485             my ($header,$body) = $p->transform($transformer,@data);
486             $p->send(undef,$header,$body);
487              
488             but it works also when the transformer returns more than one pair.
489              
490             It's not an error for the transformer to return an empty list: it just
491             means that nothing will be sent.
492              
493             I<< Why would I ever want to use L</transform> and L</send_many> separately? >>
494              
495             Let's say you are in a transaction, and you want to fail if the
496             messages cannot be prepared, but not fail if the prepared messages
497             cannot be sent. In this case, you call L</transform> inside the
498             transaction, and L</send_many> outside of it.
499              
500             But yes, in most cases you should really just call
501             C<transform_and_send>.
502              
503             =head1 EXAMPLES
504              
505             You can find examples of use in the tests, or at
506             https://github.com/dakkar/CatalystX-StompSampleApps
507              
508             =head1 AUTHOR
509              
510             Gianni Ceccarelli <gianni.ceccarelli@net-a-porter.com>
511              
512             =head1 COPYRIGHT AND LICENSE
513              
514             This software is copyright (c) 2012 by Net-a-porter.com.
515              
516             This is free software; you can redistribute it and/or modify it under
517             the same terms as the Perl 5 programming language system itself.
518              
519             =cut