File Coverage

blib/lib/Spread/Queue/Worker.pm
Criterion Covered Total %
statement 7 9 77.7
branch n/a
condition n/a
subroutine 3 3 100.0
pod n/a
total 10 12 83.3


line stmt bran cond sub pod time code
1             package Spread::Queue::Worker;
2              
3             =head1 NAME
4              
5             Spread::Queue::Worker - accept Spread::Queue message assignments
6              
7             =head1 SYNOPSIS
8              
9             use Spread::Queue::Worker;
10              
11             my $worker = new Spread::Queue::Worker(QUEUE => "myqueue",
12             CALLBACK => \&mycallback,
13             );
14             $worker->run;
15              
16             sub mycallback {
17             my ($worker, $originator, $input) = @_;
18              
19             my $result = {
20             response => "I heard you!",
21             };
22             $worker->respond($originator, $result);
23             }
24              
25             =head1 DESCRIPTION
26              
27             A process that declares itself to be a Spread::Queue::Worker will be
28             assigned messages in FIFO fashion by the sqm queue manager.
29              
30             Messages as supported by Spread::Queue are serialized Perl hashes.
31             Spread::Queue does not enforce structure on message contents.
32              
33             A running sqm for the queue is required before any messages will
34             be routed to the worker. Worker will not terminate if sqm is not
35             running, or if it goes away. If the sqm terminates and restarts,
36             it will reacquire any running workers (via heartbeat status signals).
37              
38             =head1 METHODS
39              
40             =cut
41              
42             require 5.005_03;
43 6     6   950416 use strict;
  6         12  
  6         223  
44 6     6   31 use vars qw($VERSION);
  6         12  
  6         297  
45             $VERSION = '0.4';
46              
47 6     6   13156 use Spread::Session;
  0            
  0            
48             use Data::Serializer;
49             use Carp;
50             use Log::Channel;
51              
52             my $DEFAULT_HEARTBEAT = 2;
53              
54             BEGIN {
55             my $sqwlog = new Log::Channel;
56             sub sqwlog { $sqwlog->(@_) }
57             }
58              
59             =item B<new>
60              
61             my $worker = new Spread::Queue::Worker(QUEUE => "myqueue", CALLBACK => \&mycallback);
62              
63             Establish link to Spread messaging environment, and prepare to receive
64             messages on specific queue. Queue name will be obtained from
65             SPREAD_QUEUE environment variable if not provided here.
66              
67             =cut
68              
69             sub new {
70             my $proto = shift;
71             my $class = ref ($proto) || $proto;
72              
73             my %config = @_;
74             my $self = \%config;
75             bless ($self, $class);
76              
77             $self->{QUEUE} = $ENV{SPREAD_QUEUE} unless $self->{QUEUE};
78             croak "Queue name is required" unless $self->{QUEUE};
79             croak "Callback function is required" unless $self->{CALLBACK};
80              
81             $self->{HEARTBEAT} = $DEFAULT_HEARTBEAT unless $self->{HEARTBEAT};
82              
83             $self->{WQNAME} = "WQ_$self->{QUEUE}";
84             my $session = new Spread::Session (
85             MESSAGE_CALLBACK => \&_message_callback,
86             TIMEOUT_CALLBACK => \&_timeout_callback,
87             );
88             $self->{SESSION} = $session;
89             $self->{SERIALIZER} = new Data::Serializer;
90              
91             sqwlog "Message queue worker activated on $self->{QUEUE}\n";
92              
93             $self->{STATUS} = 'ready';
94             $self->{METRICS} = {
95             start_time => time,
96             num_messages => 0,
97             };
98             return $self;
99             }
100              
101             =item B<run>
102              
103             $worker->run;
104              
105             Main loop for queue processing. Each incoming message will trigger a
106             call to the user-specified callback function.
107              
108             The loop will exit when $worker->terminate is called.
109              
110             =cut
111              
112             sub run {
113             my ($self) = shift;
114              
115             $self->_timeout_callback;
116              
117             for (;;) {
118             $self->{SESSION}->receive($self->{HEARTBEAT}, $self);
119              
120             last if $self->{TERMINATED};
121             }
122             }
123              
124             =item B<setup_Event>
125              
126             use Event;
127             $worker->setup_Event;
128             Event::loop;
129              
130             Configure Event.pm callback for processing incoming messages.
131             $worker->terminate is still recommended in this configuration, to
132             advise the queue manager to no longer assign tasks to this worker.
133              
134             =cut
135              
136             sub setup_Event {
137             my ($self) = shift;
138              
139             $self->{IS_EVENT} = 1;
140             Event->io(fd => $self->{SESSION}->{MAILBOX},
141             cb => sub { $self->{SESSION}->receive(0, $self) },
142             );
143             $self->{EVENT_TIMER} = Event->timer(interval => $self->{HEARTBEAT},
144             cb => sub { $self->_timeout_callback },
145             );
146             }
147              
148             sub _message_callback {
149             my ($msg, $self) = @_;
150              
151             $self->{STATUS} = 'busy';
152              
153             if ($self->{EVENT_TIMER}) {
154             $self->{EVENT_TIMER}->cancel
155             }
156              
157             # set status with the queue manager
158             $self->_notify('working');
159              
160             my $content = $self->{SERIALIZER}->deserialize($msg->{BODY});
161              
162             my $body = $self->{SERIALIZER}->deserialize($content->{body});
163              
164             $self->{METRICS}->{num_messages}++;
165              
166             $self->{SESSION}->publish($content->{originator},
167             $self->{SERIALIZER}->serialize({
168             type => "ack",
169             }));
170              
171             # use eval so the loop doesn't die if there's bad code
172             eval {
173             $self->{CALLBACK}->($self,
174             $content->{originator},
175             $body);
176             };
177             if ($@) {
178             # @@@@ may want some more sophisticated handling here.
179             carp $@;
180             }
181             # ready for next task
182             $self->{STATUS} = 'ready';
183             $self->_notify('ready');
184              
185             if ($self->{EVENT_TIMER}) {
186             $self->{EVENT_TIMER} = Event->timer(interval => $self->{HEARTBEAT},
187             cb => sub { $self->_timeout_callback },
188             );
189             }
190             }
191              
192             sub _timeout_callback {
193             my ($self) = @_;
194              
195             # sqwlog "TIMEOUT\n";
196              
197             if ($self->{STATUS} eq 'ready') {
198             # ping the sqm so it knows we're available
199             $self->_notify('ready');
200             }
201             # return if $self->{TERMINATED};
202             }
203              
204             =item B<respond>
205              
206             $worker->respond($originator, $result);
207              
208             Sends a reply back to the originator of the request (e.g. in a
209             request-reply environment). $originator is the Spread private
210             mailbox address passed to the callback function.
211             $result is a reference to a Perl hash.
212              
213             =cut
214              
215             sub respond {
216             my ($self, $originator, $payload) = @_;
217              
218             sqwlog "Responding to $originator\n";
219             $self->{SESSION}->publish($originator,
220             $self->{SERIALIZER}->serialize({
221             type => "response",
222             body => $payload
223             }));
224             # $self->_notify('ready');
225             }
226              
227              
228             sub _status {
229             my ($self, $status) = @_;
230              
231             return $self->{SERIALIZER}->serialize({ status => $status });
232             }
233              
234             sub _notify {
235             my ($self, $status) = @_;
236              
237             sqwlog "Advising $self->{QUEUE} queue manager: $status\n";
238             $self->{SESSION}->publish($self->{WQNAME},
239             $self->_status($status));
240             }
241              
242             sub acknowledge {
243             my ($self, $originator) = @_;
244              
245             sqwlog "Acknowledgement to $originator\n";
246             # end-to-end delivery acknowledgement back to the originator
247             $self->{SESSION}->publish($originator,
248             $self->_status('working'));
249             }
250              
251             =item B<terminate>
252              
253             $worker->terminate;
254              
255             Advises the queue manager that this worker is no longer available for
256             task assignment. This will cause the runloop to exit.
257              
258             Note that this is not automatically called on process termination.
259             This means that the sqm might not realize that the worker is gone
260             until its next automatic internal review cycle in a few seconds.
261             For best messaging performance, it is important to notify the sqm
262             as quickly as possible when a worker aborts.
263              
264             =cut
265              
266             sub terminate {
267             my $self = shift;
268              
269             sqwlog "Terminating $self->{QUEUE}\n";
270             $self->_notify('terminate');
271             $self->{TERMINATED}++;
272             }
273              
274             1;
275              
276             =head1 AUTHOR
277              
278             Jason W. May
279              
280             =head1 COPYRIGHT
281              
282             Copyright (C) 2002 Jason W. May. All rights reserved.
283             This module is free software; you can redistribute it and/or
284             modify it under the same terms as Perl itself.
285              
286             The license for the Spread software can be found at
287             http://www.spread.org/license
288              
289             =head1 SEE ALSO
290              
291             L<Spread::Session>
292             L<Data::Serializer>
293             L<Log::Channel>
294              
295             =cut