File Coverage

blib/lib/IO/Iron/IronMQ/Queue.pm
Criterion Covered Total %
statement 29 236 12.2
branch 0 36 0.0
condition n/a
subroutine 12 29 41.3
pod 15 15 100.0
total 56 316 17.7


line stmt bran cond sub pod time code
1             package IO::Iron::IronMQ::Queue;
2              
3             ## no critic (Documentation::RequirePodAtEnd)
4             ## no critic (Documentation::RequirePodSections)
5             ## no critic (Subroutines::RequireArgUnpacking)
6             ## no critic (ControlStructures::ProhibitPostfixControls)
7              
8 3     3   52 use 5.010_000;
  3         12  
9 3     3   18 use strict;
  3         7  
  3         65  
10 3     3   14 use warnings;
  3         8  
  3         92  
11              
12             # Global creator
13       3     BEGIN {
14             # Export Nothing
15             }
16              
17             # Global destructor
18       3     END {
19             }
20              
21              
22             # ABSTRACT: IronMQ (Online Message Queue) Client (Queue).
23              
24             our $VERSION = '0.12_01'; # TRIAL VERSION: generated by DZP::OurPkgVersion
25              
26              
27 3     3   22 use Log::Any qw($log);
  3         6  
  3         24  
28 3     3   622 use Hash::Util 0.06 qw{lock_keys unlock_keys};
  3         49  
  3         16  
29 3     3   194 use Carp::Assert::More;
  3         6  
  3         498  
30 3     3   21 use English '-no_match_vars';
  3         8  
  3         16  
31 3     3   1113 use Params::Validate qw(:all);
  3         6  
  3         488  
32              
33 3     3   23 use IO::Iron::Common;
  3         6  
  3         71  
34 3     3   16 use IO::Iron::IronMQ::Api;
  3         5  
  3         8411  
35             require IO::Iron::IronMQ::Message;
36              
37              
38              
39             sub new {
40 0     0 1   my $class = shift;
41 0           my %params = validate(
42             @_, {
43             'name' => { type => SCALAR, }, # queue name.
44             'ironmq_client' => { type => OBJECT, }, # Reference to IronMQ client
45             'connection' => { type => OBJECT, }, # Reference to REST client
46             }
47             );
48 0           $log->tracef( 'Entering new(%s, %s)', $class, \%params );
49 0           my $self;
50 0           my @self_keys = ( ## no critic (CodeLayout::ProhibitQuotedWordLists)
51             'ironmq_client', # Reference to IronMQ client
52             'name', # Queue name
53             'connection', # Reference to REST client
54             'last_http_status_code', # After successfull network operation, the return value is here.
55             );
56 0           lock_keys( %{$self}, @self_keys );
  0            
57 0           $self->{'ironmq_client'} = $params{'ironmq_client'};
58 0           $self->{'name'} = $params{'name'};
59 0           $self->{'connection'} = $params{'connection'};
60 0           assert_isa( $self->{'connection'}, 'IO::Iron::Connection', 'self->{\'connection\'} is IO::Iron::Connection.' );
61 0           assert_isa( $self->{'ironmq_client'}, 'IO::Iron::IronMQ::Client', 'self->{\'ironmq_client\'} is IO::Iron::IronMQ::Client.' );
62 0           assert_nonblank( $self->{'name'}, 'self->{\'name\'} is defined and is not blank.' );
63              
64 0           unlock_keys( %{$self} );
  0            
65 0           my $blessed_ref = bless $self, $class;
66 0           lock_keys( %{$self}, @self_keys );
  0            
67              
68 0           $log->tracef( 'Exiting new: %s', $blessed_ref );
69 0           return $blessed_ref;
70             }
71              
72              
73             sub size {
74 0     0 1   my $self = shift;
75 0           my %params = validate(
76             @_, {
77             # No parameters
78             }
79             );
80 0           $log->tracef('Entering size().');
81              
82 0           my $queue_name = $self->name();
83 0           my $connection = $self->{'connection'};
84 0           my ( $http_status_code, $response_message ) =
85             $connection->perform_iron_action(
86             IO::Iron::IronMQ::Api::IRONMQ_GET_QUEUE_INFO(),
87             { '{Queue Name}' => $queue_name, } );
88 0           $self->{'last_http_status_code'} = $http_status_code;
89 0           my $size = $response_message->{'queue'}->{'size'};
90 0           $log->debugf( 'Queue size is %s.', $size );
91              
92 0           $log->tracef( 'Exiting size(): %s', $size );
93 0           return $size;
94             }
95              
96              
97             sub post_messages {
98             # TODO Limit the total size!
99 0     0 1   my $self = shift;
100             my %params = validate(
101             @_, {
102             'messages' => {
103             type => ARRAYREF,
104             callbacks => {
105             'assert_class' => sub {
106 0     0     foreach my $message (@{$_[0]}) {
  0            
107 0           assert_isa( $message, 'IO::Iron::IronMQ::Message',
108             'Message is IO::Iron::IronMQ::Message.' );
109             # FIXME Do this better!
110             }
111 0           return 1;
112             }
113             }
114             }, # one or more objects of class IO::Iron::IronMQ::Message.
115             }
116 0           );
117 0           my @messages = @{$params{'messages'}};
  0            
118 0           $log->tracef( 'Entering post_messages(%s)', @messages );
119              
120 0           my $queue_name = $self->name();
121 0           my $connection = $self->{'connection'};
122 0           my @message_contents;
123 0           foreach my $message (@messages) {
124 0           my ( $msg_body, $msg_delay, $msg_push_headers, ) = (
125             $message->body(), $message->delay(), $message->push_headers(),
126             );
127 0           my $message_content = {};
128 0           $message_content->{'body'} = $msg_body;
129 0 0         $message_content->{'delay'} = $msg_delay if defined $msg_delay;
130 0 0         $message_content->{'push_headers'} = $msg_push_headers if defined $msg_push_headers;
131             # Gimmick to ensure the proper jsonization of numbers
132             # Otherwise numbers might end up as strings.
133 0           $message_content->{'delay'} += 0;
134              
135 0           CORE::push @message_contents, $message_content;
136             }
137 0           my %item_body = ( 'messages' => \@message_contents );
138              
139 0           my ( $http_status_code, $response_message ) =
140             $connection->perform_iron_action(
141             IO::Iron::IronMQ::Api::IRONMQ_POST_MESSAGES(),
142             {
143             '{Queue Name}' => $queue_name,
144             'body' => \%item_body,
145             }
146             );
147 0           $self->{'last_http_status_code'} = $http_status_code;
148              
149 0           my ( @ids, $msg );
150 0           @ids = ( @{ $response_message->{'ids'} } ); # message ids.
  0            
151 0           $msg = $response_message->{'msg'}; # Should be "Messages put on queue."
152             $log->debugf( 'Pushed IronMQ Message(s) (queue name=%s; message id(s)=%s).',
153 0           $self->{'name'}, ( join q{,}, @ids ) );
154 0 0         if (wantarray) {
155 0           $log->tracef( 'Exiting post_messages: %s', ( join q{:}, @ids ) );
156 0           return @ids;
157             }
158             else {
159 0 0         if ( scalar @messages == 1 ) {
160 0           $log->tracef( 'Exiting post_messages: %s', $ids[0] );
161 0           return $ids[0];
162             }
163             else {
164 0           $log->tracef( 'Exiting post_messages: %s', scalar @ids );
165 0           return scalar @ids;
166             }
167             }
168             }
169              
170              
171             sub reserve_messages {
172 0     0 1   my $self = shift;
173 0           my %params = validate(
174             @_, {
175             'n' => { type => SCALAR, optional => 1, }, # Number of messages to pull.
176             'timeout' => { type => SCALAR, optional => 1, }, # When reading from queue, after timeout (in seconds), item will be placed back onto queue.
177             'wait' => { type => SCALAR, optional => 1, }, # Seconds to long poll the queue.
178             'delete' => { type => SCALAR, optional => 1, } # Do not put each message back on to the queue after reserving.
179             }
180             );
181 0           assert_positive(wantarray, 'Method reserve_messages() only works in LIST context!');
182 0           $log->tracef( 'Entering reserve_messages(%s)', \%params );
183              
184 0           my $queue_name = $self->name();
185 0           my $connection = $self->{'connection'};
186 0           my %item_body;
187 0 0         $item_body{'n'} = $params{'n'}+0 if $params{'n'};
188 0 0         $item_body{'timeout'} = $params{'timeout'}+0 if $params{'timeout'};
189 0 0         $item_body{'wait'} = $params{'wait'}+0 if $params{'wait'};
190 0 0         $item_body{'delete'} = $params{'delete'} if $params{'delete'};
191 0           my ( $http_status_code, $response_message ) =
192             $connection->perform_iron_action(
193             IO::Iron::IronMQ::Api::IRONMQ_RESERVE_MESSAGES(),
194             {
195             '{Queue Name}' => $queue_name,
196             'body' => \%item_body,
197             }
198             );
199 0           $self->{'last_http_status_code'} = $http_status_code;
200              
201 0           my @pulled_messages;
202 0           my $messages = $response_message->{'messages'}; # messages.
203 0           foreach ( @{$messages} ) {
  0            
204 0           my $msg = $_;
205             $log->debugf( 'Pulled IronMQ Message (queue name=%s; message id=%s).',
206 0           $self->{'name'}, $msg->{'id'} );
207             my $message = IO::Iron::IronMQ::Message->new(
208             'body' => $msg->{'body'},
209             'id' => $msg->{'id'},
210             'reserved_count' => $msg->{'reserved_count'},
211 0           'reservation_id' => $msg->{'reservation_id'},
212             );
213 0           CORE::push @pulled_messages,
214             $message; # using CORE routine, not this class' method.
215             }
216             $log->debugf( 'Reserved %d IronMQ Messages (queue name=%s).',
217 0           scalar @pulled_messages, $self->{'name'} );
218 0 0         $log->tracef( 'Exiting reserve_messages(): %s',
219             @pulled_messages ? @pulled_messages : '[NONE]' );
220 0           return @pulled_messages;
221             }
222              
223              
224             sub peek_messages {
225 0     0 1   my $self = shift;
226 0           my %params = validate(
227             @_, {
228             'n' => { type => SCALAR, optional => 1, }, # Number of messages to read.
229             }
230             );
231 0           assert_positive(wantarray, 'Method peek_messages() only works in LIST context!');
232 0           $log->tracef( 'Entering peek_messages(%s)', \%params );
233              
234 0           my $queue_name = $self->name();
235 0           my $connection = $self->{'connection'};
236 0           my %query_params;
237 0 0         $query_params{'{n}'} = $params{'n'} if $params{'n'};
238 0           my ( $http_status_code, $response_message ) =
239             $connection->perform_iron_action(
240             IO::Iron::IronMQ::Api::IRONMQ_PEEK_MESSAGES(),
241             {
242             '{Queue Name}' => $queue_name,
243             %query_params
244             }
245             );
246 0           $self->{'last_http_status_code'} = $http_status_code;
247              
248 0           my @peeked_messages;
249 0           my $messages = $response_message->{'messages'}; # messages.
250 0           foreach ( @{$messages} ) {
  0            
251 0           my $msg = $_;
252             $log->debugf( 'peeked IronMQ Message (queue name=%s; message id=%s.',
253 0           $self->{'name'}, $msg->{'id'} );
254             my $message = IO::Iron::IronMQ::Message->new(
255             'body' => $msg->{'body'},
256 0           'id' => $msg->{'id'},
257             );
258 0 0         $message->reserved_count($msg->{'reserved_count'}) if $msg->{'reserved_count'};
259             # When peeking, timeout is not returned
260             # (it is irrelevent, because peeking does not reserve the message).
261 0           push @peeked_messages, $message;
262             }
263 0 0         $log->tracef( 'Exiting peek_messages(): %s',
264             @peeked_messages ? @peeked_messages : '[NONE]' );
265 0           return @peeked_messages;
266             }
267              
268              
269             sub delete_message {
270 0     0 1   my $self = shift;
271 0           my %params = validate(
272             @_, {
273             'message' => {
274             type => OBJECT,
275             isa => 'IO::Iron::IronMQ::Message',
276             optional => 0,
277             },
278             'subscriber_name' => {
279             type => SCALAR,
280             optional => 1,
281             },
282             }
283             );
284 0           $log->tracef( 'Entering delete(%s)', \%params );
285              
286 0           my $queue_name = $self->name();
287 0           my $connection = $self->{'connection'};
288 0           my $message = $params{'message'};
289 0           my %item_body = ( 'reservation_id' => $message->reservation_id(), );
290 0 0         $item_body{'subscriber_name'} = $params{'subscriber_name'} if $params{'subscriber_name'};
291 0           my ( $http_status_code, $response_message ) =
292             $connection->perform_iron_action(
293             IO::Iron::IronMQ::Api::IRONMQ_DELETE_MESSAGE(),
294             {
295             '{Queue Name}' => $queue_name,
296             '{Message ID}' => $message->id(),
297             'body' => \%item_body,
298             }
299             );
300 0           $self->{'last_http_status_code'} = $http_status_code;
301              
302 0           my $msg = $response_message->{'msg'}; # Should be 'Deleted'
303             $log->debugf( 'Deleted IronMQ Message (queue name=%s; message id=%s.',
304 0           $queue_name, $params{'message'}->id() );
305 0           $log->tracef( 'Exiting delete_message(): %s', 'undef' );
306 0           return;
307             }
308              
309              
310             sub delete_messages {
311 0     0 1   my $self = shift;
312             # my %params = validate(
313             # @_, {
314             # 'ids' => {
315             # type => ARRAYREF,
316             # }, # one or more id strings (alphanum text string).
317             # }
318             # );
319 0           my @messages = validate_pos(@_, ( { type => OBJECT, isa => 'IO::Iron::IronMQ::Message', } ) x scalar @_);
320             # my @message_ids = @{$params{'ids'}};
321 0           assert_positive(scalar @messages, 'There is one or more messages.');
322 0           $log->tracef( 'Entering delete_messages(%s)', \@messages );
323              
324 0           my $queue_name = $self->name();
325 0           my $connection = $self->{'connection'};
326 0           my %item_body = ( 'ids' => [ ], );
327 0           my @message_ids;
328 0           foreach my $msg (@messages) {
329 0           CORE::push @{$item_body{'ids'}}, { 'id' => $msg->id(), 'reservation_id' => $msg->reservation_id(), };
  0            
330 0           CORE::push @message_ids, $msg->id();
331             }
332              
333 0           my ( $http_status_code, $response_message ) =
334             $connection->perform_iron_action(
335             IO::Iron::IronMQ::Api::IRONMQ_DELETE_MESSAGES(),
336             {
337             '{Queue Name}' => $queue_name,
338             'body' => \%item_body,
339             }
340             );
341 0           $self->{'last_http_status_code'} = $http_status_code;
342              
343 0           my $msg = $response_message->{'msg'}; # Should be 'Deleted'
344 0           $log->debugf( 'Deleted IronMQ Message(s) (queue name=%s; message id(s)=%s.',
345             $queue_name, ( join q{,}, @message_ids ) );
346 0           $log->tracef( 'Exiting delete_messages: %s', 'undef' );
347 0           return;
348             }
349              
350              
351             sub touch_message {
352 0     0 1   my $self = shift;
353 0           my %params = validate(
354             @_, {
355             'message' => {
356             type => OBJECT,
357             isa => 'IO::Iron::IronMQ::Message',
358             optional => 0,
359             },
360             'timeout' => {
361             type => SCALAR,
362             optional => 1,
363             },
364             }
365             );
366 0           $log->tracef( 'Entering touch_message(%s)', \%params );
367              
368 0           my $queue_name = $self->name();
369 0           my $connection = $self->{'connection'};
370 0           my $message = $params{'message'};
371 0           my %item_body = ( 'reservation_id' => $message->reservation_id(), );
372 0 0         $item_body{'timeout'} = $params{'timeout'} if $params{'timeout'};
373 0           my ( $http_status_code, $response_message ) = $connection->perform_iron_action(
374             IO::Iron::IronMQ::Api::IRONMQ_TOUCH_MESSAGE(),
375             {
376             '{Queue Name}' => $queue_name,
377             '{Message ID}' => $message->id(),
378             'body' => \%item_body,
379             }
380             );
381 0           $self->{'last_http_status_code'} = $http_status_code;
382 0           $message->reservation_id($response_message->{'reservation_id'});
383 0           $log->debugf( 'Touched IronMQ Message (queue name=%s; message id=%s.',
384             $queue_name, $message->id() );
385              
386 0           $log->tracef( 'Exiting touch_message(): %s', 'undef' );
387 0           return;
388             }
389              
390              
391             sub release_message {
392 0     0 1   my $self = shift;
393 0           my %params = validate(
394             @_, {
395             'message' => {
396             type => OBJECT,
397             isa => 'IO::Iron::IronMQ::Message',
398             optional => 0,
399             },
400             'delay' => { type => SCALAR, optional => 1, }, # Delay before releasing.
401             }
402             );
403 0 0         assert_nonnegative_integer( $params{'delay'} ? $params{'delay'} : 0, 'Parameter delay is a non negative integer.' );
404 0           $log->tracef( 'Entering release_message(%s)', \%params );
405              
406 0           my $queue_name = $self->name();
407 0           my $connection = $self->{'connection'};
408 0           my $message = $params{'message'};
409 0           my %item_body = ( 'reservation_id' => $message->reservation_id(), );
410 0 0         $item_body{'delay'} = $params{'delay'} if $params{'delay'};
411             # We do not give delay a default value (0); we let IronMQ use internal default values!
412 0           my ( $http_status_code, $response_message ) =
413             $connection->perform_iron_action(
414             IO::Iron::IronMQ::Api::IRONMQ_RELEASE_MESSAGE(),
415             {
416             '{Queue Name}' => $queue_name,
417             '{Message ID}' => $message->id(),
418             'body' => \%item_body,
419             }
420             );
421 0           $self->{'last_http_status_code'} = $http_status_code;
422             $log->debugf(
423             'Released IronMQ Message(s) (queue name=%s; message id=%s; delay=%d)',
424 0 0         $queue_name, $params{'id'}, $params{'delay'} ? $params{'delay'} : 0 );
425              
426 0           $log->tracef( 'Exiting release_message: %s', 1 );
427 0           return 1;
428             }
429              
430              
431             sub clear_messages {
432 0     0 1   my $self = shift;
433 0           my %params = validate(
434             @_, {
435             # No parameters
436             }
437             );
438 0           $log->tracef('Entering clear_messages()');
439              
440 0           my $queue_name = $self->name();
441 0           my $connection = $self->{'connection'};
442 0           my %item_body;
443 0           my ( $http_status_code, $response_message ) =
444             $connection->perform_iron_action(
445             IO::Iron::IronMQ::Api::IRONMQ_CLEAR_MESSAGES(),
446             {
447             '{Queue Name}' => $queue_name,
448             'body' => \%item_body, # Empty body.
449             }
450             );
451 0           $self->{'last_http_status_code'} = $http_status_code;
452 0           my $msg = $response_message->{'msg'}; # Should be 'Cleared'
453 0           $log->debugf( 'Cleared IronMQ Message queue %s.', $queue_name );
454 0           $log->tracef( 'Exiting clear_messages: %s', 'undef' );
455 0           return;
456             }
457              
458              
459             sub get_push_statuses {
460 0     0 1   my $self = shift;
461 0           my %params = validate(
462             @_, {
463             'id' => { type => SCALAR, }, # message id.
464             }
465             );
466 0           assert_positive(wantarray == 0, 'Method get_push_statuses() only works in SCALAR context!');
467 0           assert_nonblank( $params{'id'}, 'Parameter id is a non null string.');
468 0           $log->tracef('Entering get_push_statuses(%s)', \%params);
469              
470 0           my $queue_name = $self->name();
471 0           my $connection = $self->{'connection'};
472             my ($http_status_code, $response_message) = $connection->perform_iron_action(
473             IO::Iron::IronMQ::Api::IRONMQ_GET_PUSH_STATUSES_FOR_A_MESSAGE(),
474             {
475             '{Queue Name}' => $queue_name,
476 0           '{Message ID}' => $params{'id'},
477             }
478             );
479 0           $self->{'last_http_status_code'} = $http_status_code;
480 0           my $info = $response_message;
481 0           $log->debugf('Returned push status for message %s.', $params{'id'});
482              
483 0           $log->tracef('Exiting get_push_statuses: %s', $info);
484 0           return $info;
485             }
486              
487              
488 0     0 1   sub ironmq_client { return $_[0]->_access_internal('ironmq_client', $_[1]); }
489 0     0 1   sub name { return $_[0]->_access_internal('name', $_[1]); }
490 0     0 1   sub connection { return $_[0]->_access_internal('connection', $_[1]); }
491 0     0 1   sub last_http_status_code { return $_[0]->_access_internal('last_http_status_code', $_[1]); }
492              
493             # TODO Move _access_internal() to IO::Iron::Common.
494              
495             sub _access_internal {
496 0     0     my ($self, $var_name, $var_value) = @_;
497 0           $log->tracef('_access_internal(%s, %s)', $var_name, $var_value);
498 0 0         if( defined $var_value ) {
499 0           $self->{$var_name} = $var_value;
500 0           return $self;
501             }
502             else {
503 0           return $self->{$var_name};
504             }
505             }
506              
507             1;
508              
509             __END__