File Coverage

blib/lib/SQS/Worker.pm
Criterion Covered Total %
statement 23 28 82.1
branch n/a
condition n/a
subroutine 8 10 80.0
pod 0 4 0.0
total 31 42 73.8


line stmt bran cond sub pod time code
1             package SQS::Worker;
2 8     8   77278 use Paws;
  8         17  
  8         222  
3 8     8   38 use Moose::Role;
  8         14  
  8         57  
4 8     8   42744 use Data::Dumper;
  8         39665  
  8         436  
5 8     8   2538 use SQS::Consumers::Default;
  8         23  
  8         218  
6 8     8   2846 use SQS::Consumers::DeleteAlways;
  8         22  
  8         259  
7 8     8   3027 use SQS::Consumers::DeleteAndFork;
  8         23  
  8         2622  
8              
9             our $VERSION = '0.05';
10              
11             requires 'process_message';
12              
13             has queue_url => (is => 'ro', isa => 'Str', required => 1);
14             has region => (is => 'ro', isa => 'Str', required => 1);
15              
16             has sqs => (is => 'ro', isa => 'Paws::SQS', lazy => 1, default => sub {
17             my $self = shift;
18             Paws->service('SQS', region => $self->region);
19             });
20              
21             has log => (is => 'ro', required => 1);
22              
23             has on_failure => (is => 'ro', isa => 'CodeRef', default => sub {
24             return sub {
25             my ($self, $message) = @_;
26             $self->log->error("Error processing message " . $message->ReceiptHandle);
27             $self->log->debug("Message Dump " . Dumper($message));
28             }
29             });
30              
31             has processor => (is => 'ro', lazy => 1, default => sub {
32             my $self = shift;
33             return SQS::Consumers::Default->new;
34             });
35              
36             sub fetch_message {
37 17     17 0 19623 my $self = shift;
38 17         492 $self->processor->fetch_message($self);
39             }
40              
41             sub run {
42 0     0 0 0 my $self = shift;
43 0         0 while (1) {
44 0         0 $self->fetch_message;
45             }
46             }
47              
48             sub delete_message {
49 0     0 0 0 my ($self, $message) = @_;
50 0         0 $self->sqs->DeleteMessage(
51             QueueUrl => $self->queue_url,
52             ReceiptHandle => $message->ReceiptHandle,
53             );
54             }
55              
56             sub receive_message {
57 17     17 0 34 my $self = shift;
58 17         378 my $message_pack = $self->sqs->ReceiveMessage(
59             WaitTimeSeconds => 20,
60             QueueUrl => $self->queue_url,
61             MaxNumberOfMessages => 1
62             );
63 17         671 return $message_pack;
64             }
65              
66             1;
67              
68             =head1 NAME
69              
70             SQS::Worker - A light framework for processing messages from SQS queues
71              
72             =head1 DESCRIPTION
73              
74             SQS::Worker is a light framework that allows you to just code asynchronous tasks
75             that consume messages from an SQS Queue. The framework takes care of launching the
76             necessary processes (workers), and executes your code on incoming messages, so you
77             can focus on writing the important part (behavior).
78              
79             Also, since you're surely going to be deserializing the messages that come from the
80             queue, SQS::Worker provides you with ways to easily consume JSON messages, for example.
81              
82             It comes in the form of a Moose role that is to be composed into the end user code
83             that wants to receive and process messages from an SQS queue.
84              
85             The worker runs uninterrupted, fetching messages from its configured queue,
86             one at a time and then executing the process_message of the worker class.
87              
88             The worker consumer can compose further functionality by consuming more roles
89             from the SQS::Worker namespace.
90              
91             =head1 USAGE
92              
93             Simple usage
94              
95             package YourWorker;
96              
97             use Moose;
98             with 'SQS::Worker';
99              
100             sub process_message {
101             my ($self,$message) = @_;
102              
103             # $message is a Paws::SQS::Message
104             # do something with the message
105             }
106              
107             Composing automatic json decoding to perl data structure
108              
109             package YourWorker;
110             use Moose;
111             with 'SQS::Worker', 'SQS::Worker::DecodeJson';
112              
113             sub process_message {
114             my ($self, $data) = @_;
115            
116             # Do something with the data, already parsed into a structure
117             my $name = $data->{name};
118              
119             # You get a logger attached to the worker so you can log stuff
120             $self->log->info("I processed a message for $name");
121             }
122              
123             =head1 Bundled roles
124              
125             L<SQS::Worker::DecodeJson> decodes the message body in JSON format and passes the resulting data structure to C<process_message>
126              
127             L<SQS::Worker::DecodeStorable> decodes the message body in Perl storable format
128              
129             L<SQS::Worker::Multiplex> dispatches to different methods via a dispatch table
130              
131             L<SQS::Worker::SNS> decodes a message sent from SNS and inflates it to a C<SNS::Notification>
132              
133             =head1 Creating your own processing module
134              
135             Create a Moose role that wraps functionality around the method C<process_message>
136              
137             package PrefixTheMessage;
138             use Moose::Role;
139              
140             around process_message => sub {
141             my ($orig, $self, $message) = @_;
142             return 'prefixed ' . $message->Body;
143             };
144              
145             1;
146              
147             And then use it inside your consumers
148              
149             package YourWorker;
150            
151             use Moose;
152             with 'SQS::Worker', 'PrefixTheMessage';
153            
154             sub process_message {
155             my ($self, $message) = @_;
156             # surprise! $message is prefixed!
157             }
158            
159             1;
160              
161             =head1 Composing roles
162              
163             The worker roles can be composed (if it makes sense), so your worker could implement
164              
165             with 'SQS::Worker', 'SQS::Worker::DecodeJson', 'SQS::Worker::Multiplex';
166              
167             to decode a message in json format that will then dispatch the json to the multiplex worker
168              
169             =head1 Error handling
170              
171             Any exception thrown from process_message will be treated as a failed message. Different
172             message processors treat failed messages in different ways:
173              
174             =head1 Message processors
175              
176             L<SQS::Consumers::Default> Messages are processed before being deleted from the queue. If a message fails,
177             it will be treated by SQS as an unprocessed message, and will reappear in the queue to be processed
178             again (or be delivered to a dead letter queue after N redeliveries, if your SQS queue is configured
179             appropriately).
180              
181             L<SQS::Consumers::DeleteAlways> Message deleted, then processed. If a message fails it will
182             not be reprocessed ever
183              
184             =head1 Running the worker
185              
186             Running the worker can be done via the C<spawn_worker> command that comes bundled with the
187             distribution
188              
189             spawn_worker --worker YourWorker --queue_url sqs_endpoint_url --region aws_sqs_region --log_conf log4perl_config_file_path
190              
191             You can also control if the message should be deleted upon reception (before the message is actually processed) with
192              
193             spawn_worker --worker YourClass --queue_url sqs_endpoint_url --region aws_sqs_region --log_conf log4perl_config_file_path --consumer DeleteAlways
194              
195             or you can create an instance of your object and invoke run:
196              
197             my $worker_instance = YourWorker->new(
198             queue_url => $args->queue_url,
199             region => $args->region,
200             log => Log::Log4perl->get_logger('async'),
201             processor => $args->_consumer,
202             );
203             $worker_instance->run;
204              
205             =head1 Credentials
206              
207             SQS::Worker uses the same credential system as L<Paws> to authenticate to SQS: so, in a nutshell, it
208             will work if you:
209              
210             =over
211              
212             =item *
213              
214             have the credentials in the home of the user launching the script, in the ~/.aws/credentials file.
215              
216             =item *
217              
218             assign an IAM role to the EC2 instance that is running the code (if deploying the code inside an EC2 instance)
219              
220             =item *
221              
222             set environment variables: AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY
223              
224             =back
225              
226             =head1 SEE ALSO
227            
228             L<Paws>
229            
230             =head1 COPYRIGHT and LICENSE
231            
232             Copyright (c) 2016 by CAPSiDE
233            
234             This code is distributed under the Apache 2 License. The full text of the license can be found in the LICENSE file included with this module.
235            
236             =head1 AUTHORS
237              
238             Jose Luis Martinez, Albert Hilazo, Pau Cervera and Loic Prieto
239              
240             =cut