File Coverage

blib/lib/XAS/Lib/Stomp/Utils.pm
Criterion Covered Total %
statement 12 162 7.4
branch 0 80 0.0
condition 0 3 0.0
subroutine 4 18 22.2
pod 13 13 100.0
total 29 276 10.5


line stmt bran cond sub pod time code
1             package XAS::Lib::Stomp::Utils;
2              
3             our $VERSION = '0.03';
4              
5 1     1   3 use XAS::Lib::Stomp::Frame;
  1         1  
  1         23  
6 1     1   3 use XAS::Constants ':stomp';
  1         1  
  1         6  
7              
8             use XAS::Class
9 1         11 debug => 0,
10             version => $VERSION,
11             base => 'XAS::Base',
12             utils => ':validation',
13             vars => {
14             PARAMS => {
15             -target => { optional => 1, default => undef, regex => STOMP_LEVELS },
16             }
17             }
18 1     1   214 ;
  1         3  
19              
20             #use Data::Dumper;
21              
22             # -----------------------------------------------------------
23             # Public Methods
24             # -----------------------------------------------------------
25              
26             sub connect {
27 0     0 1   my $self = shift;
28 0           my $p = validate_params(\@_, {
29             -login => { optional => 1, default => undef },
30             -passcode => { optional => 1, default => undef },
31             -host => { optional => 1, default => 'localhost' },
32             -heart_beat => { optional => 1, default => '0,0', regex => qr/\d+,\d+/ },
33             -acceptable => { optional => 1, default => '1.0,1.1,1.2',
34             callbacks => {
35             'valid target' => \&_match
36             }
37             }
38             });
39              
40 0           my $frame;
41 0           my $header = {};
42              
43 0 0         $header->{'login'} = $p->{'login'} if (defined($p->{'login'}));
44 0 0         $header->{'passcode'} = $p->{'passcode'} if (defined($p->{'passcode'}));
45              
46 0 0         if ($self->target > 1.0) {
47              
48 0           $header->{'host'} = $p->{'host'};
49 0           $header->{'heart-beat'} = $p->{'heart_beat'};
50 0           $header->{'accept-version'} = $p->{'acceptable'};
51              
52             }
53              
54 0           $frame = XAS::Lib::Stomp::Frame->new(
55             -command => 'CONNECT',
56             -headers => $header,
57             -body => ''
58             );
59              
60 0           return $frame;
61              
62             }
63              
64             sub stomp {
65 0     0 1   my $self = shift;
66 0           my $p = validate_params(\@_, {
67             -login => { optional => 1, default => undef },
68             -passcode => { optional => 1, default => undef },
69             -prefetch => { optional => 1, default => undef },
70             -host => { optional => 1, default => 'localhost' },
71             -heart_beat => { optional => 1, default => '0,0', regex => qr/\d+,\d+/ },
72             -acceptable => {
73             optional => 1,
74             default => '1.0,1.1,1.2',
75             callbacks => {
76             'valid target' => \&_match
77             }
78             }
79             });
80              
81 0           my $frame;
82 0           my $header = {};
83              
84 0 0         if ($self->target == 1.0) {
85              
86 0           $self->throw_msg(
87             'xas.lib.stomp.utils.stomp.nosup',
88             'stomp_no_support',
89             $self->target,
90             'stomp'
91             );
92              
93             }
94              
95 0 0         $header->{'login'} = $p->{'login'} if (defined($p->{'login'}));
96 0 0         $header->{'passcode'} = $p->{'passcode'} if (defined($p->{'passcode'}));
97 0 0         $header->{'prefetch-size'} = $p->{'prefetch'} if (defined($p->{'prefetch'}));
98              
99 0 0         if ($self->target > 1.0) {
100              
101 0           $header->{'host'} = $p->{'host'};
102 0           $header->{'heart-beat'} = $p->{'heart_beat'};
103 0           $header->{'accept-version'} = $p->{'acceptable'};
104              
105             }
106              
107 0           $frame = XAS::Lib::Stomp::Frame->new(
108             -command => 'STOMP',
109             -headers => $header,
110             -body => ''
111             );
112              
113 0           return $frame;
114              
115             }
116              
117             sub subscribe {
118 0     0 1   my $self = shift;
119 0           my $p = validate_params(\@_, {
120             -destination => 1,
121             -prefetch => { optional => 1, default => 0 },
122             -id => { optional => 1, default => undef },
123             -receipt => { optional => 1, default => undef },
124             -ack => { optional => 1, default => 'auto', regex => qr/auto|client|client\-individual/ },
125             });
126              
127 0           my $frame;
128 0           my $header = {};
129              
130 0           $header->{'ack'} = $p->{'ack'};
131 0           $header->{'prefetch-count'} = $p->{'prefetch'};
132 0           $header->{'destination'} = $p->{'destination'};
133 0 0         $header->{'receipt'} = $p->{'receipt'} if (defined($p->{'receipt'}));
134              
135 0 0         if (defined($p->{'id'})) {
136              
137 0           $header->{'id'} = $p->{'id'};
138              
139             } else {
140              
141             # v1.1 and greater must have an id header
142              
143 0 0         if ($self->target > 1.0) {
144              
145 0           $self->throw_msg(
146             'xas.lib.stomp.utils.subscribe',
147             'stomp_no_id',
148             $self->target
149            
150             );
151              
152             }
153              
154             }
155              
156 0           $frame = XAS::Lib::Stomp::Frame->new(
157             -command => 'SUBSCRIBE',
158             -headers => $header,
159             -body => ''
160             );
161              
162 0           return $frame;
163              
164             }
165              
166             sub unsubscribe {
167 0     0 1   my $self = shift;
168 0           my $p = validate_params(\@_, {
169             -id => { optional => 1, default => undef },
170             -destination => { optional => 1, default => undef },
171             -receipt => { optional => 1, default => undef },
172             });
173              
174 0           my $frame;
175 0           my $header = {};
176              
177 0 0         $header->{'receipt'} = $p->{'receipt'} if (defined($p->{'receipt'}));
178              
179             # v1.0 should have either a destination and/or id header
180             # v1.1 and greater may have a destination header
181              
182 0 0 0       if (defined($p->{'destination'}) && defined($p->{'id'})) {
    0          
    0          
183              
184 0           $header->{'id'} = $p->{'id'};
185 0           $header->{'destination'} = $p->{'destination'};
186              
187             } elsif (defined($p->{'destination'})) {
188              
189 0           $header->{'destination'} = $p->{'destination'};
190              
191             } elsif (defined($p->{'id'})) {
192              
193 0           $header->{'id'} = $p->{'id'};
194              
195             } else {
196              
197 0           $self->throw_msg(
198             'xas.lib.stomp.utils.unsubscribe.invparams',
199             'stomp_invalid_params',
200             $self->target
201             );
202              
203             }
204              
205 0 0         if ($self->target > 1.0) {
206              
207             # v1.1 and greater must have an id header
208              
209 0 0         unless (defined($header->{'id'})) {
210              
211 0           $self->throw_msg(
212             'xas.lib.stomp.utils.unsubscribe.noid',
213             'stomp_no_id',
214             $self->target
215             );
216              
217             }
218              
219             }
220              
221 0           $frame = XAS::Lib::Stomp::Frame->new(
222             -command => 'UNSUBSCRIBE',
223             -headers => $header,
224             -body => ''
225             );
226              
227 0           return $frame;
228              
229             }
230              
231             sub begin {
232 0     0 1   my $self = shift;
233 0           my $p = validate_params(\@_, {
234             -transaction => 1,
235             -receipt => { optional => 1, default => undef },
236             });
237              
238 0           my $frame;
239 0           my $header = {};
240              
241 0           $header->{'transaction'} = $p->{'transaction'};
242 0 0         $header->{'receipt'} = $p->{'receipt'} if (defined($p->{'receipt'}));
243              
244 0           $frame = XAS::Lib::Stomp::Frame->new(
245             -command => 'BEGIN',
246             -headers => $header,
247             -body => ''
248             );
249              
250 0           return $frame;
251              
252             }
253              
254             sub commit {
255 0     0 1   my $self = shift;
256 0           my $p = validate_params(\@_, {
257             -transaction => 1,
258             -receipt => { optional => 1, default => undef },
259             });
260              
261 0           my $frame;
262 0           my $header = {};
263              
264 0           $header->{'transaction'} = $p->{'transaction'};
265 0 0         $header->{'receipt'} = $p->{'receipt'} if (defined($p->{'receipt'}));
266              
267 0           $frame = XAS::Lib::Stomp::Frame->new(
268             -command => 'COMMIT',
269             -headers => $header,
270             -body => ''
271             );
272              
273 0           return $frame;
274              
275             }
276              
277             sub abort {
278 0     0 1   my $self = shift;
279 0           my $p = validate_params(\@_, {
280             -transaction => 1,
281             -receipt => { optional => 1, default => undef },
282             });
283              
284 0           my $frame;
285 0           my $header = {};
286              
287 0           $header->{'transaction'} = $p->{'transaction'};
288 0 0         $header->{'receipt'} = $p->{'receipt'} if (defined($p->{'receipt'}));
289              
290 0           $frame = XAS::Lib::Stomp::Frame->new(
291             -command => 'ABORT',
292             -headers => $header,
293             -body => ''
294             );
295              
296 0           return $frame;
297              
298             }
299              
300             sub ack {
301 0     0 1   my $self = shift;
302 0           my $p = validate_params(\@_, {
303             -message_id => 1,
304             -subscription => { optional => 1, default => undef },
305             -receipt => { optional => 1, default => undef },
306             -transaction => { optional => 1, default => undef },
307             });
308              
309 0           my $frame;
310 0           my $header = {};
311              
312 0 0         $header->{'receipt'} = $p->{'receipt'} if (defined($p->{'receipt'}));
313 0 0         $header->{'transaction'} = $p->{'transaction'} if (defined($p->{'transaction'}));
314              
315 0 0         if ($self->target < 1.2) {
316              
317 0           $header->{'message-id'} = $p->{'message_id'};
318              
319             } else {
320              
321 0           $header->{'id'} = $p->{'message_id'};
322              
323             }
324              
325 0 0         if (defined($p->{'subscription'})) {
326              
327 0           $header->{'subscription'} = $p->{'subscription'};
328              
329             } else {
330              
331 0 0         if ($self->target > 1.0) {
332              
333 0           $self->throw_msg(
334             'xas.lib.stomp.utils.ack.nosup',
335             'stomp_no_subscription',
336             $self->target
337             );
338              
339             }
340              
341             }
342              
343 0           $frame = XAS::Lib::Stomp::Frame->new(
344             -command => 'ACK',
345             -headers => $header,
346             -body => ''
347             );
348              
349 0           return $frame;
350              
351             }
352              
353             sub nack {
354 0     0 1   my $self = shift;
355 0           my $p = validate_params(\@_, {
356             -message_id => 1,
357             -receipt => { optional => 1, default => undef },
358             -subscription => { optional => 1, default => undef },
359             -transaction => { optional => 1, default => undef },
360             });
361              
362 0           my $frame;
363 0           my $header = {};
364              
365 0 0         $header->{'receipt'} = $p->{'receipt'} if (defined($p->{'receipt'}));
366 0 0         $header->{'transaction'} = $p->{'transaction'} if (defined($p->{'transaction'}));
367              
368 0 0         if ($self->target == 1.0) {
369              
370 0           $self->throw_msg(
371             'xas.lib.stomp.utils.nack',
372             'stomp_no_support',
373             $self->target,
374             'nack'
375             );
376              
377             }
378              
379 0 0         if ($self->target < 1.2) {
380              
381 0           $header->{'message-id'} = $p->{'message_id'};
382              
383             } else {
384              
385 0           $header->{'id'} = $p->{'message_id'};
386              
387             }
388              
389 0 0         if (defined($p->{'subscription'})) {
390              
391 0           $header->{'subscription'} = $p->{'subscription'};
392              
393             } else {
394              
395 0 0         if ($self->target > 1.0) {
396              
397 0           $self->throw_msg(
398             'xas.lib.stomp.utils.nact',
399             'stomp_no_support',
400             $self->target,
401             'nack'
402             );
403              
404             }
405              
406             }
407              
408 0           $frame = XAS::Lib::Stomp::Frame->new(
409             -command => 'NACK',
410             -headers => $header,
411             -body => ''
412             );
413              
414 0           return $frame;
415              
416             }
417              
418             sub disconnect {
419 0     0 1   my $self = shift;
420 0           my $p = validate_params(\@_, {
421             -receipt => { optional => 1, default => undef }
422             });
423              
424 0           my $frame;
425 0           my $header = {};
426              
427 0 0         $header->{'receipt'} = $p->{'receipt'} if (defined($p->{'receipt'}));
428              
429 0           $frame = XAS::Lib::Stomp::Frame->new(
430             -command => 'DISCONNECT',
431             -headers => $header,
432             -body => ''
433             );
434              
435 0           return $frame;
436              
437             }
438              
439             sub send {
440 0     0 1   my $self = shift;
441 0           my $p = validate_params(\@_, {
442             -destination => 1,
443             -message => 1,
444             -receipt => { optional => 1, default => undef },
445             -persistent => { optional => 1, default => undef },
446             -transaction => { optional => 1, default => undef },
447             -length => { optional => 1, default => undef },
448             -type => { optional => 1, default => 'text/plain' },
449             });
450              
451 0           my $frame;
452 0           my $header = {};
453 0           my $body = $p->{'message'};
454              
455 0           $header->{'destination'} = $p->{'destination'};
456 0 0         $header->{'receipt'} = $p->{'receipt'} if (defined($p->{'receipt'}));
457 0 0         $header->{'persistent'} = $p->{'persistent'} if (defined($p->{'presistent'}));
458 0 0         $header->{'transaction'} = $p->{'transaction'} if (defined($p->{'transaction'}));
459             {
460 1     1   1662 use bytes;
  1         1  
  1         8  
  0            
461 0 0         $header->{'content-length'} = defined($p->{'length'}) ? $p->{'length'} : length($body);
462             }
463              
464 0 0         if ($self->target > 1.0) {
465              
466 0           $header->{'content-type'} = $p->{'type'};
467              
468             }
469              
470 0           $frame = XAS::Lib::Stomp::Frame->new(
471             -command => 'SEND',
472             -headers => $header,
473             -body => $body
474             );
475              
476 0           return $frame;
477              
478             }
479              
480             sub noop {
481 0     0 1   my $self = shift;
482              
483 0           my $frame;
484              
485 0 0         if ($self->target == 1.0) {
486              
487 0           $self->throw_msg(
488             'xas.lib.stomp.utils.noop.nosup',
489             'stomp_no_support',
490             $self->target,
491             'noop'
492             );
493              
494             }
495              
496 0           $frame = XAS::Lib::Stomp::Frame->new(
497             -command => 'NOOP',
498             -headers => {},
499             -body => ''
500             );
501              
502 0           return $frame;
503              
504             }
505              
506             # -----------------------------------------------------------
507             # Private Methods
508             # -----------------------------------------------------------
509              
510             sub init {
511 0     0 1   my $class = shift;
512              
513 0           my $self = $class->SUPER::init(@_);
514              
515 0 0         unless (defined($self->{target})) {
516              
517 0           $self->{target} = $self->env->mqlevel;
518              
519             }
520              
521 0           return $self;
522              
523             }
524              
525             sub _match {
526 0     0     my $buffer = shift;
527              
528 0           foreach my $item (split(',', $buffer)) {
529              
530 0 0         return 0 if ($item !~ m/\d\.\d/);
531              
532             }
533              
534 0           return 1;
535              
536             }
537              
538             1;
539              
540             __END__
541              
542             =head1 NAME
543              
544             XAS::Lib::Stomp::Utils - STOMP protocol utilities for clients
545              
546             =head1 SYNOPSIS
547              
548             This module uses XAS::Lib::Stomp::Frame to create STOMP frames.
549              
550             use XAS::Lib::Stomp::Utils;
551              
552             my $stomp = XAS::Lib::Stomp::Utils->new();
553             my $frame = $stomp->connect(
554             -login => 'test',
555             -passcode => 'test'
556             );
557              
558             put($frame->to_string);
559              
560             =head1 DESCRIPTION
561              
562             This module is an easy way to create STOMP frames without worrying about
563             the various differences between the protocol versions.
564              
565             =head1 METHODS
566              
567             =head2 new
568              
569             This method initializes the base object. It takes the following parameters:
570              
571             =head2 connect
572              
573             This method creates a "CONNECT" frame. This frame is used to initiate a
574             session with a STOMP server. On STOMP v1.1 and later targets the following
575             headers are automatically set:
576              
577             host
578             heart-beat
579             accept-version
580              
581             Unless otherwise specified, they will be the defaults. This method takes the
582             following parameters:
583              
584             =over 4
585              
586             =item B<-login>
587              
588             An optional login name to be used on the STOMP server.
589              
590             =item B<-passcode>
591              
592             An optional password for the login name on the STOMP server.
593              
594             =item B<-host>
595              
596             An optional virtual host name to connect to on STOMP v1.1 and later servers.
597             Defaults to 'localhost'.
598              
599             =item B<-heart_beat>
600              
601             An optional heart beat request for STOMP v1.1 and later servers. The default
602             is to turn them off.
603              
604             =item B<-acceptable>
605              
606             An optional list of protocol versions that are acceptable to this client for
607             STOMP v1.1 and later clients. The default is '1.0,1.1,1.2'.
608              
609             =item B<-prefetch>
610              
611             This sets the optional header 'prefetch-size' for RabbitMQ or other servers
612             that support this extension.
613              
614             =back
615              
616             =head2 stomp
617              
618             This method creates a "STOMP" frame, this works the same as connect(), but
619             only works for STOMP v1.1 and later targets. Please see the documentation for
620             connect().
621              
622             =head2 disconnect
623              
624             This method creates a "DISCONNECT" frame. This frame is used to signal the
625             server that you no longer wish to communicate with it. This method takes the
626             following parameters:
627              
628             =over 4
629              
630             =item B<-receipt>
631              
632             An optional receipt that will be returned by the server.
633              
634             =back
635              
636             =head2 subscribe
637              
638             This method create a "SUBSCRIBE" frame. This frame is used to notify
639             the server which queues you want to listen too. The naming of queues is
640             left up to the server implementation. This method takes the following
641             parameters:
642              
643             =over 4
644              
645             =item B<-destination>
646              
647             The name of the queue you wish to subscribe too. Naming convention is
648             server dependent.
649              
650             =item B<-subscription>
651              
652             A mandatory subscription id for usage on STOMP v1.1 and later targets. It
653             has no meaning for STOMP v1.0 servers.
654              
655             =item B<-ack>
656              
657             The type of acknowledgement you would like to receive when messages are sent
658             to a queue. It defaults to 'auto'. It understands 'auto', 'client' and
659             'client-individual'. Please refer to the STOMP protocol reference for
660             what this means.
661              
662             =item B<-receipt>
663              
664             An optional receipt that will be returned by the server.
665              
666             =back
667              
668             =head2 unsubscribe
669              
670             This method creates an "UNSUBSCRIBE" frame. This frame is used to notify the
671             server that you don't want to subscribe to a queue anymore. Subsequently
672             any messages left on that queue will no longer be sent to your client.
673              
674             =over 4
675              
676             =item B<-destination>
677              
678             The optional name of the queue that you subscribed too. STOMP v1.0 targets
679             need a queue name and/or a subscription id to unsubscribe. This is optional
680             on v1.1 and later targets.
681              
682             =item B<-subscription>
683              
684             The id of the subscription, this should be the same as the one used
685             with subscribe(). This is optional on STOMP v1.0 servers and mandatory
686             on v1.1 and later targets.
687              
688             =item B<-receipt>
689              
690             An optional receipt that will be returned by the server.
691              
692             =back
693              
694             =head2 begin
695              
696             This method creates a "BEGIN" frame. This frame signals the server that a
697             transaction is beginning. A transaction is either ended by a "COMMIT" frame
698             or an "ABORT" frame. Any other frame that is sent must have a transaction id
699             associated with them. This method takes the following parameters:
700              
701             =over 4
702              
703             =item B<-transaction>
704              
705             The mandatory id for the transaction.
706              
707             =item B<-receipt>
708              
709             An optional receipt that will be returned by the server.
710              
711             =back
712              
713             =head2 commit
714              
715             This method creates a "COMMIT" frame. This frame signals the end of a
716             transaction. This method takes the following parameters:
717              
718             =over 4
719              
720             =item B<-transaction>
721              
722             The mandatory transaction id from begin().
723              
724             =item B<-receipt>
725              
726             An optional receipt that will be returned by the server.
727              
728             =back
729              
730             =head2 abort
731              
732             This method creates an "ABORT" frame. This frame is used to signal the server
733             that the current transaction is to be aborted.
734              
735             This method takes the following parameters:
736              
737             =over 4
738              
739             =item B<-transaction>
740              
741             The mandatory transaction id from begin().
742              
743             =item B<-receipt>
744              
745             An optional receipt that will be returned by the server.
746              
747             =back
748              
749             =head2 send
750              
751             This method creates a "SEND" frame. This frame is the basis of communication
752             over your queues to the server. This method takes the following parameters:
753              
754             =over 4
755              
756             =item B<-destination>
757              
758             The name of the queue to send the message too.
759              
760             =item B<-message>
761              
762             The message to be sent. No attempt is made to serializes the message.
763              
764             =item B<-transaction>
765              
766             An optional transaction number. This should be the same as for begin().
767              
768             =item B<-length>
769              
770             An optional length for the message. If one is not specified a
771             'content-length' header will be auto generated.
772              
773             =item B<-type>
774              
775             An optional MIME type for the message. If one is not specified, 'text/plain'
776             will be used. This only has meaning for STOMP v1.1 and later targets.
777              
778             =item B<-persistent>
779              
780             An optional header for indicating that this frame should be 'persisted' by
781             the server. What this means, is highly server specific.
782              
783             =item B<-receipt>
784              
785             An optional receipt that will be returned by the server.
786              
787             =back
788              
789             =head2 ack
790              
791             This method creates an "ACK" frame. This frame is used to tell the server that
792             the message was successfully received. This method takes the following
793             parameters:
794              
795             =over 4
796              
797             =item B<-message_id>
798              
799             The id of the message that is being acked.
800              
801             =item B<-subscription>
802              
803             This should match the id from the subscribe() method. This has meaning for
804             STOMP v1.1 and later targets.
805              
806             =item B<-transaction>
807              
808             The transaction id if this ack is part of a transaction.
809              
810             =item B<-receipt>
811              
812             An optional receipt that will be returned by the server.
813              
814             =back
815              
816             =head2 nack
817              
818             This method creates a "NACK" frame. It notifies the server that the message
819             was rejected. It has meaning on STOMP v1.1 and later targets. This method
820             takes the following parameters:
821              
822             =over 4
823              
824             =item B<-message_id>
825              
826             The id of the message that is being nacked.
827              
828             =item B<-subscription>
829              
830             This should match the id from the subscribe() method. This has meaning for
831             STOMP v1.1 and later targets.
832              
833             =item B<-transaction>
834              
835             The transaction id if this nack is part of a transaction.
836              
837             =item B<-receipt>
838              
839             An optional receipt that will be returned by the server.
840              
841             =back
842              
843             =head2 noop
844              
845             This method creates a "NOOP" frame. It has meaning on STOMP v1.1 and
846             later targets.
847              
848             =head1 SEE ALSO
849              
850             =over 4
851              
852             =item L<Net::Stomp|https://metacpan.org/pod/Net::Stomp>
853              
854             =item L<Net::Stomp::Frame|https://metacpan.org/pod/Net::Stomp::Frame>
855              
856             =item L<XAS|XAS>
857              
858             =back
859              
860             For more information on the STOMP protocol, please refer to: L<http://stomp.github.io/> .
861              
862             =head1 AUTHOR
863              
864             Kevin L. Esteb, E<lt>kevin@kesteb.usE<gt>
865              
866             =head1 COPYRIGHT AND LICENSE
867              
868             Copyright (C) 2014 Kevin L. Esteb
869              
870             This is free software; you can redistribute it and/or modify it under
871             the terms of the Artistic License 2.0. For details, see the full text
872             of the license at http://www.perlfoundation.org/artistic_license_2_0.
873              
874             =cut