File Coverage

blib/lib/Net/STOMP/Client.pm
Criterion Covered Total %
statement 42 337 12.4
branch 0 160 0.0
condition 0 20 0.0
subroutine 14 61 22.9
pod 35 35 100.0
total 91 613 14.8


line stmt bran cond sub pod time code
1             #+##############################################################################
2             # #
3             # File: Net/STOMP/Client.pm #
4             # #
5             # Description: STOMP object oriented client module #
6             # #
7             #-##############################################################################
8              
9             #
10             # module definition
11             #
12              
13             package Net::STOMP::Client;
14 1     1   13171 use strict;
  1         1  
  1         25  
15 1     1   3 use warnings;
  1         0  
  1         57  
16             our $VERSION = "2.3";
17             our $REVISION = sprintf("%d.%02d", q$Revision: 2.6 $ =~ /(\d+)\.(\d+)/);
18              
19             #
20             # used modules
21             #
22              
23 1     1   335 use Net::STOMP::Client::Auth qw();
  1         4  
  1         30  
24 1     1   378 use Net::STOMP::Client::Connection qw();
  1         3  
  1         29  
25 1     1   474 use Net::STOMP::Client::Frame qw(demessagify);
  1         3  
  1         9  
26 1     1   539 use Net::STOMP::Client::HeartBeat qw(*);
  1         2  
  1         4  
27 1     1   485 use Net::STOMP::Client::IO qw();
  1         1  
  1         19  
28 1     1   5 use Net::STOMP::Client::Peer qw();
  1         1  
  1         18  
29 1     1   401 use Net::STOMP::Client::Receipt qw(*);
  1         2  
  1         5  
30 1     1   457 use Net::STOMP::Client::Version qw(*);
  1         2  
  1         4  
31 1     1   74 use No::Worries::Die qw(dief);
  1         1  
  1         4  
32 1     1   60 use No::Worries::Log qw(log_debug);
  1         1  
  1         5  
33 1     1   73 use Params::Validate qw(validate :types);
  1         1  
  1         105  
34 1     1   4 use Time::HiRes qw();
  1         1  
  1         2985  
35              
36             #
37             # global variables
38             #
39              
40             our(
41             $Debug, # default debug string
42             %Hook, # registered frame hooks
43             %Setup, # registered setup helpers
44             );
45              
46             #+++############################################################################
47             # #
48             # timeout handling #
49             # #
50             #---############################################################################
51              
52             #
53             # check the timeout attribute and convert it to its expanded form
54             #
55              
56             sub _check_timeout ($) {
57 0     0     my($self) = @_;
58 0           my($timeout);
59              
60 0           $timeout = $self->{"timeout"};
61 0 0         if (defined($timeout)) {
62 0 0         if (ref($timeout) eq "") {
    0          
63             # scalar timeout specified -> backward compatibility
64 0           $timeout = {
65             "connect" => $timeout,
66             "connected" => $timeout,
67             "disconnect" => $timeout,
68             "send" => undef,
69             "receive" => undef,
70             };
71             } elsif (ref($timeout) eq "HASH") {
72             # hash timeout specified -> use it as is
73             } else {
74 0           dief("unexpected timeout: %s", $timeout);
75             }
76             } else {
77             # no timeout specified -> use hard-coded defaults
78 0           $timeout = {
79             "connect" => undef,
80             "connected" => 10,
81             "disconnect" => 10,
82             "send" => undef,
83             "receive" => undef,
84             };
85             }
86 0           $self->{"timeout"} = $timeout;
87             }
88              
89             #+++############################################################################
90             # #
91             # callback handling #
92             # #
93             #---############################################################################
94              
95             #
96             # user-friendly accessors for the callbacks
97             #
98              
99             sub _any_callback ($$;$) {
100 0     0     my($self, $command, $callback) = @_;
101              
102 0 0         return($self->{"callback"}{$command}) unless $callback;
103 0           $self->{"callback"}{$command} = $callback;
104 0           return($self);
105             }
106              
107             sub connected_callback : method {
108 0     0 1   my($self, $callback) = @_;
109              
110 0           return(_any_callback($self, "CONNECTED", $callback));
111             }
112              
113             sub error_callback : method {
114 0     0 1   my($self, $callback) = @_;
115              
116 0           return(_any_callback($self, "ERROR", $callback));
117             }
118              
119             sub message_callback : method {
120 0     0 1   my($self, $callback) = @_;
121              
122 0           return(_any_callback($self, "MESSAGE", $callback));
123             }
124              
125             sub receipt_callback : method {
126 0     0 1   my($self, $callback) = @_;
127              
128 0           return(_any_callback($self, "RECEIPT", $callback));
129             }
130              
131             #
132             # dispatch one received frame, calling the appropriate callback if existing
133             #
134              
135             sub dispatch_frame : method {
136 0     0 1   my($self, $frame, %option) = @_;
137 0           my($command, $callback);
138              
139 0           $command = $frame->command();
140 0 0         dief("unexpected %s frame received", $command)
141             unless $command =~ /^(CONNECTED|ERROR|MESSAGE|RECEIPT)$/;
142 0           $callback = $self->{"callback"}{$command};
143 0 0         return() unless $callback;
144 0           return($callback->($self, $frame));
145             }
146              
147             #+++############################################################################
148             # #
149             # hook handling #
150             # #
151             #---############################################################################
152              
153             #
154             # run all the hooks of the given frame
155             #
156              
157             sub _run_hooks ($$) {
158 0     0     my($self, $frame) = @_;
159 0           my($command);
160              
161 0           $command = $frame->command();
162 0 0         return unless $Hook{$command};
163 0           foreach my $name (sort(keys(%{ $Hook{$command} }))) {
  0            
164 0           $Hook{$command}{$name}->($self, $frame);
165             }
166             }
167              
168             #
169             # default CONNECT hook
170             #
171              
172             $Hook{"CONNECT"}{"default"} = sub {
173             my($self, $frame) = @_;
174              
175             # do nothing when only STOMP 1.0 is asked
176             return unless grep($_ ne "1.0", $self->accept_version());
177             # add the required host header if missing
178             $frame->header("host", $self->host())
179             unless defined($frame->header("host"));
180             };
181              
182             #
183             # default CONNECTED hook
184             #
185              
186             $Hook{"CONNECTED"}{"default"} = sub {
187             my($self, $frame) = @_;
188             my($value);
189              
190             # make sure we receive this frame only once!
191             dief("already connected") if $self->{"session"};
192             # keep track of session information
193             $value = $frame->header("session");
194             if ($value) {
195             # keep it only if true as it will be used elsewhere to check state
196             $self->{"session"} = $value;
197             } else {
198             # this header is optional but often used so we forge our own if needed
199             $self->{"session"} = sprintf("sid-%s", $self->{"id"});
200             }
201             # keep track of server information
202             $value = $frame->header("server");
203             $self->{"server"} = $value if defined($value) and length($value);
204             };
205              
206             #+++############################################################################
207             # #
208             # object constructor and destructor #
209             # #
210             #---############################################################################
211              
212             #
213             # FIXME: compatibility hack for Net::STOMP::Client 1.x (to be removed one day)
214             #
215              
216             sub _hacknew ($) {
217 0     0     my($option) = @_;
218              
219             # Net::STOMP::Client::Debug::Flags to be replaced by ???
220 0 0 0       if ($Net::STOMP::Client::Debug::Flags and not exists($option->{"debug"})) {
221 0 0         No::Worries::Log::log_filter("debug caller=~^Net::STOMP::Client")
222             unless No::Worries::Log::log_wants_debug();
223 0           $option->{"debug"} = "+";
224 0 0         $option->{"debug"} .= "api+"
225             if $Net::STOMP::Client::Debug::Flags & (1 << 0);
226 0 0         $option->{"debug"} .= "command+"
227             if $Net::STOMP::Client::Debug::Flags & (1 << 1);
228 0 0         $option->{"debug"} .= "header+"
229             if $Net::STOMP::Client::Debug::Flags & (1 << 2);
230 0 0         $option->{"debug"} .= "body+"
231             if $Net::STOMP::Client::Debug::Flags & (1 << 3);
232 0 0         $option->{"debug"} .= "io+"
233             if $Net::STOMP::Client::Debug::Flags & (1 << 4);
234             }
235             }
236              
237             #
238             # create a new Net::STOMP::Client object and connect to the server (socket level only)
239             #
240              
241             my %new_options = (
242             "uri" => { optional => 1, type => SCALAR },
243             "host" => { optional => 1, type => SCALAR },
244             "port" => { optional => 1, type => SCALAR },
245             "sockopts" => { optional => 1, type => HASHREF },
246             "debug" => { optional => 1, type => SCALAR },
247             "timeout" => { optional => 1, type => UNDEF|SCALAR|HASHREF },
248             # additional options from the sub-modules
249             map($_->(), values(%Setup)),
250             );
251              
252             sub new : method {
253 0     0 1   my($class, %option, $self, %sockopts, %connopt, $timeout, $socket, $peer);
254              
255 0           $class = shift(@_);
256 0           %option = validate(@_, \%new_options);
257 0           _hacknew(\%option);
258 0           $self = bless(\%option, $class);
259 0           foreach my $name (sort(keys(%Setup))) {
260 0           $Setup{$name}->($self);
261             }
262 0 0         if ($self =~ /\(0x(\w+)\)/) {
263 0           $self->{"id"} =
264             sprintf("%s-%x-%x-%x", $1, time(), $$, int(rand(65536)));
265             } else {
266 0           dief("unexpected Perl object: %s", $self);
267             }
268             # check the debug option (and set defaults)
269 0 0         $self->{"debug"} = $Debug unless exists($self->{"debug"});
270             # check the timeout option (and set defaults)
271 0           _check_timeout($self);
272             # check the sockopts option
273 0 0         %sockopts = %{ $self->{"sockopts"} } if $self->{"sockopts"};
  0            
274             $sockopts{SSL_use_cert} = 1
275 0 0 0       if $sockopts{SSL_cert_file} or $sockopts{SSL_key_file};
276 0 0         unless (exists($sockopts{Timeout})) {
277 0           $timeout = $self->{"timeout"}{"connect"};
278 0 0         $sockopts{Timeout} = $timeout if $timeout;
279             }
280             # connect (TCP level only)
281 0 0         $connopt{"debug"} = $self->{"debug"} if defined($self->{"debug"});
282 0 0         $connopt{"host"} = $self->{"host"} if defined($self->{"host"});
283 0 0         $connopt{"port"} = $self->{"port"} if defined($self->{"port"});
284 0 0         $connopt{"uri"} = $self->{"uri"} if defined($self->{"uri"});
285 0 0         $connopt{"sockopt"} = \%sockopts if keys(%sockopts);
286 0           ($socket, $peer) = Net::STOMP::Client::Connection::new(%connopt);
287             # bookkeeping
288 0           $self->{"peer"} = $peer;
289 0 0         if ($self->uri()) {
290             # keep track of the peer this way too...
291 0           $self->{"host"} = $peer->host();
292 0           $self->{"port"} = $peer->port();
293             }
294 0           $self->{"io"} = Net::STOMP::Client::IO->new($socket);
295 0           $self->{"serial"} = 1;
296             $self->{"callback"} = {
297             "ERROR" => sub {
298 0     0     my($_self, $_frame) = @_;
299 0   0       dief("unexpected ERROR frame received: %s",
300             $_frame->header("message") || "?");
301             },
302 0           };
303             # so far so good!
304 0           return($self);
305             }
306              
307             #
308             # close the socket opened by new()
309             #
310              
311             sub _close ($) {
312 0     0     my($self) = @_;
313 0           my($socket, $ignored);
314              
315             # try to disconnect gracefully if possible
316 0 0 0       $self->disconnect() if $self->{"session"} and $self->{"io"};
317             # then destroy the I/O object while keeping a handle on the socket
318 0           $socket = $self->{"io"}{"socket"};
319 0           delete($self->{"io"});
320             # then maybe shutdown the socket (http://www.perlmonks.org/?node=108244)
321 0 0         if ($socket) {
322             # call shutdown() without checking if it fails or not since there is
323             # not much that can be done in case of failure... unless we use SSL
324             # for which it is better not to call shutdown(), see IO::Socket::SSL's
325             # man page for more information
326 0 0         $ignored = shutdown($socket, 2) unless $socket->isa("IO::Socket::SSL");
327             }
328             # the socket will auto-close when $socket gets destroyed
329             }
330              
331             #
332             # object destructor
333             #
334              
335             sub DESTROY {
336 0     0     my($self) = @_;
337              
338 0           local $@ = ""; # preserve $@!
339 0           _close($self);
340             }
341              
342             #+++############################################################################
343             # #
344             # accessors #
345             # #
346             #---############################################################################
347              
348             #
349             # very simple-minded read-only accessors
350             #
351              
352 0     0 1   sub host : method { my($self) = @_; return($self->{"host"}); }
  0            
353 0     0 1   sub peer : method { my($self) = @_; return($self->{"peer"}); }
  0            
354 0     0 1   sub port : method { my($self) = @_; return($self->{"port"}); }
  0            
355 0     0 1   sub server : method { my($self) = @_; return($self->{"server"}); }
  0            
356 0     0 1   sub session : method { my($self) = @_; return($self->{"session"}); }
  0            
357 0     0 1   sub uri : method { my($self) = @_; return($self->{"uri"}); }
  0            
358              
359             #
360             # I/O-related read-only accessors
361             #
362              
363             sub socket : method { ## no critic 'ProhibitBuiltinHomonyms'
364 0     0 1   my($self) = @_;
365              
366 0 0         return(undef) unless $self->{"io"};
367 0           return($self->{"io"}{"socket"});
368             }
369              
370             sub incoming_buffer_reference : method {
371 0     0 1   my($self) = @_;
372              
373 0 0         return(undef) unless $self->{"io"};
374 0           return(\$self->{"io"}{"incoming_buffer"});
375             }
376              
377             sub outgoing_buffer_length : method {
378 0     0 1   my($self) = @_;
379              
380 0 0         return(undef) unless $self->{"io"};
381 0           return($self->{"io"}{"outgoing_length"});
382             }
383              
384             #
385             # return a universal pseudo-unique id to be used in receipts and transactions
386             #
387              
388             sub uuid : method {
389 0     0 1   my($self) = @_;
390              
391 0           return(sprintf("%s-%x", $self->{"id"}, $self->{"serial"}++));
392             }
393              
394             #+++############################################################################
395             # #
396             # low-level API #
397             # #
398             #---############################################################################
399              
400             #
401             # FIXME: compatibility hack for Net::STOMP::Client 1.x (to be removed one day)
402             #
403              
404             sub _hackopt (@) {
405 0 0   0     return("timeout" => $_[0]) if @_ == 1;
406 0           return(@_);
407             }
408              
409             #
410             # helper for the debug and timeout options
411             #
412              
413             sub _chkopt ($$%) {
414 0     0     my($self, $what, %option) = @_;
415              
416             # handle the global debug option
417             $option{"debug"} = $self->{"debug"}
418 0 0         unless exists($option{"debug"});
419             # handle the global timeout option
420 0 0         if ($what) {
421             $option{"timeout"} = $self->{"timeout"}{$what}
422 0 0         unless exists($option{"timeout"});
423             } else {
424 0           delete($option{"timeout"});
425             }
426             # so far so good
427 0           return(%option);
428             }
429              
430             #
431             # send data
432             #
433              
434             sub send_data : method {
435 0     0 1   my($self, %option);
436              
437 0           $self = shift(@_);
438 0           %option = _hackopt(@_);
439             # check that the I/O object is still usable
440 0 0         dief("lost connection") unless $self->{"io"};
441             # just do it
442 0           return($self->{"io"}->send_data(_chkopt($self, "send", %option)));
443             }
444              
445             #
446             # receive data
447             #
448              
449             sub receive_data : method {
450 0     0 1   my($self, %option);
451              
452 0           $self = shift(@_);
453 0           %option = _hackopt(@_);
454             # check that the I/O object is still usable
455 0 0         dief("lost connection") unless $self->{"io"};
456             # just do it
457 0           return($self->{"io"}->receive_data(_chkopt($self, "receive", %option)));
458             }
459              
460             #
461             # queue the given frame
462             #
463              
464             sub queue_frame : method {
465 0     0 1   my($self, $frame, %option) = @_;
466 0           my($data);
467              
468             # check that the I/O object is still usable
469 0 0         dief("lost connection") unless $self->{"io"};
470 0 0         if (ref($frame)) {
471             # a real frame
472 0           _run_hooks($self, $frame);
473             # encode it
474 0 0         $option{"version"} = $self->{"version"} if $self->{"version"};
475 0           $data = $frame->encode(_chkopt($self, undef, %option));
476             } else {
477             # handle already encoded frames (including the special NOOP frame)
478 0           $data = \$frame;
479             }
480             # queue what we have
481 0           return($self->{"io"}->queue_data($data));
482             }
483              
484             #
485             # send the given frame (i.e. queue and then send _all_ data)
486             #
487              
488             sub send_frame : method {
489 0     0 1   my($self, $frame, %option);
490              
491 0           $self = shift(@_);
492 0           $frame = shift(@_);
493 0           %option = _hackopt(@_);
494             # queue the frame
495 0           $self->queue_frame($frame, %option);
496             # send queued data
497 0           $self->send_data(%option);
498             # make sure we did send _all_ data
499 0 0         dief("could not send all data!") if $self->outgoing_buffer_length();
500             }
501              
502             #
503             # try to receive one frame
504             #
505              
506             sub receive_frame : method {
507 0     0 1   my($self, %option, $maxtime, %decopt, $bufref, $frame, $remaining);
508              
509 0           $self = shift(@_);
510 0           %option = _hackopt(@_);
511             # keep track of time
512             $option{"timeout"} = $self->{"timeout"}{"receive"}
513 0 0         unless exists($option{"timeout"});
514             $maxtime = Time::HiRes::time() + $option{"timeout"}
515 0 0         if defined($option{"timeout"});
516             # first try to use the incoming buffer
517 0           %decopt = ("state" => {});
518 0 0         $decopt{"version"} = $self->{"version"} if $self->{"version"};
519 0 0         $decopt{"debug"} = $option{"debug"} if exists($option{"debug"});
520 0 0         $decopt{"debug"} = $self->{"debug"} unless exists($decopt{"debug"});
521 0           $bufref = $self->incoming_buffer_reference();
522 0           $frame = Net::STOMP::Client::Frame::decode($bufref, %decopt);
523             # if this fails, try to receive more data until we are done
524 0           while (not $frame) {
525             # where are we with time?
526 0 0         if (not defined($option{"timeout"})) {
    0          
527             # timeout = undef => blocking
528             } elsif ($option{"timeout"}) {
529             # timeout > 0 => try once more if not too late
530 0           $remaining = $maxtime - Time::HiRes::time();
531 0 0         return(0) if $remaining <= 0;
532 0           $option{"timeout"} = $remaining;
533             } else {
534             # timeout = 0 => non-blocking
535             }
536             # receive more data
537 0 0         return() unless $self->receive_data(%option);
538             # do we have a complete frame now?
539 0           $frame = Net::STOMP::Client::Frame::decode($bufref, %decopt);
540             }
541             # so far so good
542 0 0         _run_hooks($self, $frame) if $frame;
543 0           return($frame);
544             }
545              
546             #
547             # wait for new frames and dispatch them
548             #
549              
550             sub wait_for_frames : method {
551 0     0 1   my($self, %option) = @_;
552 0           my($callback, $maxtime, $frame, $result, $remaining, %recvopt);
553              
554 0           $callback = $option{callback};
555 0           %recvopt = ();
556 0 0         $recvopt{"debug"} = $option{"debug"} if exists($option{"debug"});
557 0 0         $recvopt{"debug"} = $self->{"debug"} unless exists($recvopt{"debug"});
558 0 0         if (defined($option{"timeout"})) {
559 0           $maxtime = Time::HiRes::time() + $option{"timeout"};
560 0           $recvopt{"timeout"} = $option{"timeout"};
561             }
562 0           while (1) {
563 0           $frame = $self->receive_frame(%recvopt);
564 0 0         if ($frame) {
565             # we always call first the per-command callback
566 0           $self->dispatch_frame($frame);
567 0 0         if ($callback) {
568             # user callback: we stop if callback returns error or true or if once
569 0           $result = $callback->($self, $frame);
570             return($result)
571 0 0 0       if not defined($result) or $result or $option{once};
      0        
572             } else {
573             # no user callback: we stop on the first frame and return it
574 0           return($frame);
575             }
576             }
577             # we check if we exceeded the timeout
578 0 0         if (defined($maxtime)) {
579 0           $remaining = $maxtime - Time::HiRes::time();
580 0 0         return(0) if $remaining <= 0;
581 0           $recvopt{"timeout"} = $remaining;
582             }
583             }
584             # not reached...
585 0           die("ooops!");
586             }
587              
588             #
589             # convenient shortcuts
590             #
591              
592             sub queue_message : method {
593 0     0 1   my($self, $message, %option) = @_;
594              
595 0           return($self->queue_frame(demessagify($message), %option));
596             }
597              
598             sub send_message : method {
599 0     0 1   my($self, $message, %option) = @_;
600              
601 0           return($self->send_frame(demessagify($message), %option));
602             }
603              
604             #+++############################################################################
605             # #
606             # high-level API (each method matches a client frame command) #
607             # #
608             #---############################################################################
609              
610             #
611             # check the method invocation for the high-level API (except connect)
612             #
613              
614             sub _check_api ($$$$) {
615 0     0     my($self, $name, $header, $option) = @_;
616 0           my($debug);
617              
618             $option->{debug} = delete($header->{debug})
619 0 0         if exists($header->{debug});
620             $option->{timeout} = delete($header->{timeout})
621 0 0         if exists($header->{timeout});
622 0 0         $debug = exists($option->{debug}) ? $option->{debug} : $self->{debug};
623 0 0 0       log_debug("%s->%s()", "$self", $name)
624             if $debug and $debug =~ /\b(api|all)\b/;
625 0 0         if ($name eq "connect") {
626 0 0         dief("already connected") if $self->{"session"};
627             } else {
628 0 0         dief("not connected") unless $self->{"session"};
629             }
630             }
631              
632             #
633             # connect to server
634             #
635              
636             sub connect : method { ## no critic 'ProhibitBuiltinHomonyms'
637 0     0 1   my($self, %header) = @_;
638 0           my(%option, $frame);
639              
640 0           _check_api($self, "connect", \%header, \%option);
641             # send a CONNECT frame
642 0           $frame = Net::STOMP::Client::Frame->new(
643             command => "CONNECT",
644             headers => \%header,
645             );
646 0           $self->send_frame($frame, %option);
647             # wait for the CONNECTED frame to come back
648             $self->wait_for_frames(
649 0     0     callback => sub { return($self->{"session"}) },
650 0           timeout => $self->{"timeout"}{"connected"},
651             );
652 0 0         dief("no CONNECTED frame received") unless $self->{"session"};
653 0           return($self);
654             }
655              
656             #
657             # disconnect from server
658             #
659              
660             sub disconnect : method {
661 0     0 1   my($self, %header) = @_;
662 0           my(%option, $frame);
663              
664 0           _check_api($self, "disconnect", \%header, \%option);
665             # send a DISCONNECT frame
666 0           $frame = Net::STOMP::Client::Frame->new(
667             command => "DISCONNECT",
668             headers => \%header,
669             );
670 0           $self->send_frame($frame, %option);
671             # if a receipt has been given, wait for it!
672 0 0         if ($header{receipt}) {
673             # at this point, the server may abruptly close the socket without
674             # lingering so we ignore I/O errors while we wait for the receipt
675             # to come back
676 0           eval {
677             $self->wait_for_frames(
678             timeout => $self->{"timeout"}{"disconnect"},
679             callback => sub {
680 0     0     return(! $self->{"receipts"}{$header{receipt}});
681             },
682 0           );
683             };
684             }
685             # additional bookkeeping
686 0           delete($self->{"peer"});
687 0           delete($self->{"session"});
688 0           _close($self);
689 0           return($self);
690             }
691              
692             #
693             # subscribe to something
694             #
695              
696             sub subscribe : method {
697 0     0 1   my($self, %header) = @_;
698 0           my(%option, $frame);
699              
700 0           _check_api($self, "subscribe", \%header, \%option);
701             # send a SUBSCRIBE frame
702 0           $frame = Net::STOMP::Client::Frame->new(
703             command => "SUBSCRIBE",
704             headers => \%header,
705             );
706 0           $self->send_frame($frame, %option);
707 0           return($self);
708             }
709              
710             #
711             # unsubscribe from something
712             #
713              
714             sub unsubscribe : method {
715 0     0 1   my($self, %header) = @_;
716 0           my(%option, $frame);
717              
718 0           _check_api($self, "unsubscribe", \%header, \%option);
719             # send an UNSUBSCRIBE frame
720 0           $frame = Net::STOMP::Client::Frame->new(
721             command => "UNSUBSCRIBE",
722             headers => \%header,
723             );
724 0           $self->send_frame($frame, %option);
725 0           return($self);
726             }
727              
728             #
729             # send a message somewhere
730             #
731              
732             sub send : method { ## no critic 'ProhibitBuiltinHomonyms'
733 0     0 1   my($self, %header) = @_;
734 0           my(%option, %frameopt, $frame);
735              
736 0           _check_api($self, "send", \%header, \%option);
737             # we can optionally give a message body here
738             $frameopt{body} = delete($header{body})
739 0 0         if defined($header{body});
740             $frameopt{body_reference} = delete($header{body_reference})
741 0 0         if defined($header{body_reference});
742             # send a SEND frame
743 0           $frame = Net::STOMP::Client::Frame->new(%frameopt,
744             command => "SEND",
745             headers => \%header,
746             );
747 0           $self->send_frame($frame, %option);
748 0           return($self);
749             }
750              
751             #
752             # acknowledge the reception of a message
753             #
754              
755             sub ack : method {
756 0     0 1   my($self, %header) = @_;
757 0           my(%option, $frame, $value);
758              
759 0           _check_api($self, "ack", \%header, \%option);
760             # we can optionally give a MESSAGE frame here
761 0 0         if ($header{frame}) {
762 0           $value = $header{frame}->header("message-id");
763 0 0         $header{"message-id"} = $value if defined($value);
764 0           $value = $header{frame}->header("subscription");
765 0 0         $header{"subscription"} = $value if defined($value);
766 0           $value = $header{frame}->header("ack");
767 0 0         $header{"id"} = $value if defined($value);
768 0           delete($header{frame});
769             }
770             # send an ACK frame
771 0           $frame = Net::STOMP::Client::Frame->new(
772             command => "ACK",
773             headers => \%header,
774             );
775 0           $self->send_frame($frame, %option);
776 0           return($self);
777             }
778              
779             #
780             # acknowledge the rejection of a message
781             #
782              
783             sub nack : method {
784 0     0 1   my($self, %header) = @_;
785 0           my(%option, $frame, $value);
786              
787 0           _check_api($self, "nack", \%header, \%option);
788             dief("unsupported NACK frames for STOMP 1.0")
789 0 0         if $self->{"version"} eq "1.0";
790             # we can optionally give a MESSAGE frame here
791 0 0         if ($header{frame}) {
792 0           $value = $header{frame}->header("message-id");
793 0 0         $header{"message-id"} = $value if defined($value);
794 0           $value = $header{frame}->header("subscription");
795 0 0         $header{"subscription"} = $value if defined($value);
796 0           $value = $header{frame}->header("ack");
797 0 0         $header{"id"} = $value if defined($value);
798 0           delete($header{frame});
799             }
800             # send an NACK frame
801 0           $frame = Net::STOMP::Client::Frame->new(
802             command => "NACK",
803             headers => \%header,
804             );
805 0           $self->send_frame($frame, %option);
806 0           return($self);
807             }
808              
809             #
810             # begin/start a transaction
811             #
812              
813             sub begin : method {
814 0     0 1   my($self, %header) = @_;
815 0           my(%option, $frame);
816              
817 0           _check_api($self, "begin", \%header, \%option);
818             # send a BEGIN frame
819 0           $frame = Net::STOMP::Client::Frame->new(
820             command => "BEGIN",
821             headers => \%header,
822             );
823 0           $self->send_frame($frame, %option);
824 0           return($self);
825             }
826              
827             #
828             # commit a transaction
829             #
830              
831             sub commit : method {
832 0     0 1   my($self, %header) = @_;
833 0           my(%option, $frame);
834              
835 0           _check_api($self, "commit", \%header, \%option);
836             # send a COMMIT frame
837 0           $frame = Net::STOMP::Client::Frame->new(
838             command => "COMMIT",
839             headers => \%header,
840             );
841 0           $self->send_frame($frame, %option);
842 0           return($self);
843             }
844              
845             #
846             # abort/rollback a transaction
847             #
848              
849             sub abort : method {
850 0     0 1   my($self, %header) = @_;
851 0           my(%option, $frame);
852              
853 0           _check_api($self, "abort", \%header, \%option);
854             # send a ABORT frame
855 0           $frame = Net::STOMP::Client::Frame->new(
856             command => "ABORT",
857             headers => \%header,
858             );
859 0           $self->send_frame($frame, %option);
860 0           return($self);
861             }
862              
863             #
864             # send an empty/noop frame (in fact, a single newline byte)
865             #
866              
867             sub noop : method {
868 0     0 1   my($self, %header) = @_;
869 0           my(%option, $frame);
870              
871 0           _check_api($self, "noop", \%header, \%option);
872             # there is no NOOP frame (yet) so we simply send a newline
873 0           $frame = "\n";
874 0           $self->send_frame($frame, %option);
875 0           return($self);
876             }
877              
878             1;
879              
880             __END__