line |
stmt |
bran |
cond |
sub |
pod |
time |
code |
1
|
|
|
|
|
|
|
package Spread::Queue::Worker; |
2
|
|
|
|
|
|
|
|
3
|
|
|
|
|
|
|
=head1 NAME |
4
|
|
|
|
|
|
|
|
5
|
|
|
|
|
|
|
Spread::Queue::Worker - accept Spread::Queue message assignments |
6
|
|
|
|
|
|
|
|
7
|
|
|
|
|
|
|
=head1 SYNOPSIS |
8
|
|
|
|
|
|
|
|
9
|
|
|
|
|
|
|
use Spread::Queue::Worker; |
10
|
|
|
|
|
|
|
|
11
|
|
|
|
|
|
|
my $worker = new Spread::Queue::Worker(QUEUE => "myqueue", |
12
|
|
|
|
|
|
|
CALLBACK => \&mycallback, |
13
|
|
|
|
|
|
|
); |
14
|
|
|
|
|
|
|
$worker->run; |
15
|
|
|
|
|
|
|
|
16
|
|
|
|
|
|
|
sub mycallback { |
17
|
|
|
|
|
|
|
my ($worker, $originator, $input) = @_; |
18
|
|
|
|
|
|
|
|
19
|
|
|
|
|
|
|
my $result = { |
20
|
|
|
|
|
|
|
response => "I heard you!", |
21
|
|
|
|
|
|
|
}; |
22
|
|
|
|
|
|
|
$worker->respond($originator, $result); |
23
|
|
|
|
|
|
|
} |
24
|
|
|
|
|
|
|
|
25
|
|
|
|
|
|
|
=head1 DESCRIPTION |
26
|
|
|
|
|
|
|
|
27
|
|
|
|
|
|
|
A process that declares itself to be a Spread::Queue::Worker will be |
28
|
|
|
|
|
|
|
assigned messages in FIFO fashion by the sqm queue manager. |
29
|
|
|
|
|
|
|
|
30
|
|
|
|
|
|
|
Messages as supported by Spread::Queue are serialized Perl hashes. |
31
|
|
|
|
|
|
|
Spread::Queue does not enforce structure on message contents. |
32
|
|
|
|
|
|
|
|
33
|
|
|
|
|
|
|
A running sqm for the queue is required before any messages will |
34
|
|
|
|
|
|
|
be routed to the worker. Worker will not terminate if sqm is not |
35
|
|
|
|
|
|
|
running, or if it goes away. If the sqm terminates and restarts, |
36
|
|
|
|
|
|
|
it will reacquire any running workers (via heartbeat status signals). |
37
|
|
|
|
|
|
|
|
38
|
|
|
|
|
|
|
=head1 METHODS |
39
|
|
|
|
|
|
|
|
40
|
|
|
|
|
|
|
=cut |
41
|
|
|
|
|
|
|
|
42
|
|
|
|
|
|
|
require 5.005_03; |
43
|
6
|
|
|
6
|
|
950416
|
use strict; |
|
6
|
|
|
|
|
12
|
|
|
6
|
|
|
|
|
223
|
|
44
|
6
|
|
|
6
|
|
31
|
use vars qw($VERSION); |
|
6
|
|
|
|
|
12
|
|
|
6
|
|
|
|
|
297
|
|
45
|
|
|
|
|
|
|
$VERSION = '0.4'; |
46
|
|
|
|
|
|
|
|
47
|
6
|
|
|
6
|
|
13156
|
use Spread::Session; |
|
0
|
|
|
|
|
|
|
|
0
|
|
|
|
|
|
|
48
|
|
|
|
|
|
|
use Data::Serializer; |
49
|
|
|
|
|
|
|
use Carp; |
50
|
|
|
|
|
|
|
use Log::Channel; |
51
|
|
|
|
|
|
|
|
52
|
|
|
|
|
|
|
my $DEFAULT_HEARTBEAT = 2; |
53
|
|
|
|
|
|
|
|
54
|
|
|
|
|
|
|
# Build this package's Log::Channel once, at compile time, and expose it
# through the sqwlog() helper that the methods below use for log output.
BEGIN {
    my $channel = Log::Channel->new;

    # Forward all arguments to the channel object, which is invoked as a
    # code reference.
    sub sqwlog {
        return $channel->(@_);
    }
}
58
|
|
|
|
|
|
|
|
59
|
|
|
|
|
|
|
=item B<new> |
60
|
|
|
|
|
|
|
|
61
|
|
|
|
|
|
|
my $worker = new Spread::Queue::Worker(QUEUE => "myqueue", CALLBACK => \&mycallback); |
62
|
|
|
|
|
|
|
|
63
|
|
|
|
|
|
|
Establish link to Spread messaging environment, and prepare to receive |
64
|
|
|
|
|
|
|
messages on specific queue. Queue name will be obtained from |
65
|
|
|
|
|
|
|
SPREAD_QUEUE environment variable if not provided here. |
66
|
|
|
|
|
|
|
|
67
|
|
|
|
|
|
|
=cut |
68
|
|
|
|
|
|
|
|
69
|
|
|
|
|
|
|
# Constructor.  Takes key/value configuration pairs:
#
#   QUEUE     - queue name; falls back to $ENV{SPREAD_QUEUE} when not given
#   CALLBACK  - code reference invoked for each assigned message (required)
#   HEARTBEAT - heartbeat interval; defaults to $DEFAULT_HEARTBEAT
#
# Croaks when no queue name can be determined or no callback is supplied.
# Returns the blessed worker, which also carries the Spread session, the
# serializer, the current STATUS and a METRICS hash.
sub new {
    my $proto = shift;
    my $class = ref($proto) || $proto;

    # The configuration hash itself becomes the object.
    my %config = @_;
    my $self = bless \%config, $class;

    $self->{QUEUE} = $ENV{SPREAD_QUEUE} unless $self->{QUEUE};
    croak "Queue name is required" unless $self->{QUEUE};
    croak "Callback function is required" unless $self->{CALLBACK};

    $self->{HEARTBEAT} = $DEFAULT_HEARTBEAT unless $self->{HEARTBEAT};

    # Name used as the destination for status notices to the queue manager.
    $self->{WQNAME} = "WQ_$self->{QUEUE}";

    # Direct method calls rather than indirect object syntax
    # ("new Class (...)"), whose parsing depends on what is in scope.
    $self->{SESSION} = Spread::Session->new(
        MESSAGE_CALLBACK => \&_message_callback,
        TIMEOUT_CALLBACK => \&_timeout_callback,
    );
    $self->{SERIALIZER} = Data::Serializer->new;

    sqwlog("Message queue worker activated on $self->{QUEUE}\n");

    $self->{STATUS} = 'ready';
    $self->{METRICS} = {
        start_time   => time,
        num_messages => 0,
    };
    return $self;
}
100
|
|
|
|
|
|
|
|
101
|
|
|
|
|
|
|
=item B<run> |
102
|
|
|
|
|
|
|
|
103
|
|
|
|
|
|
|
$worker->run; |
104
|
|
|
|
|
|
|
|
105
|
|
|
|
|
|
|
Main loop for queue processing. Each incoming message will trigger a |
106
|
|
|
|
|
|
|
call to the user-specified callback function. |
107
|
|
|
|
|
|
|
|
108
|
|
|
|
|
|
|
The loop will exit when $worker->terminate is called. |
109
|
|
|
|
|
|
|
|
110
|
|
|
|
|
|
|
=cut |
111
|
|
|
|
|
|
|
|
112
|
|
|
|
|
|
|
# Blocking main loop.  Sends an initial status ping, then keeps calling the
# session's receive() until the TERMINATED flag has been set (see terminate).
sub run {
    my ($self) = @_;

    # Initial status ping before the first receive.
    $self->_timeout_callback;

    # Always receive at least once; the flag is only checked afterwards.
    # HEARTBEAT and the worker itself are handed to receive().
    do {
        $self->{SESSION}->receive($self->{HEARTBEAT}, $self);
    } until $self->{TERMINATED};
}
123
|
|
|
|
|
|
|
|
124
|
|
|
|
|
|
|
=item B<setup_Event> |
125
|
|
|
|
|
|
|
|
126
|
|
|
|
|
|
|
use Event; |
127
|
|
|
|
|
|
|
$worker->setup_Event; |
128
|
|
|
|
|
|
|
Event::loop; |
129
|
|
|
|
|
|
|
|
130
|
|
|
|
|
|
|
Configure Event.pm callback for processing incoming messages. |
131
|
|
|
|
|
|
|
$worker->terminate is still recommended in this configuration, to |
132
|
|
|
|
|
|
|
advise the queue manager to no longer assign tasks to this worker. |
133
|
|
|
|
|
|
|
|
134
|
|
|
|
|
|
|
=cut |
135
|
|
|
|
|
|
|
|
136
|
|
|
|
|
|
|
# Register this worker with Event.pm instead of using the blocking run()
# loop: an io watcher on the session mailbox and a repeating timer at the
# HEARTBEAT interval.  The timer watcher is kept in EVENT_TIMER and is also
# the value of the last statement.
# NOTE(review): Event is not loaded by this module; the caller is expected
# to have done "use Event" -- confirm.
sub setup_Event {
    my ($self) = @_;

    $self->{IS_EVENT} = 1;

    # Receive with a zero timeout when the io watcher fires.
    my $on_mailbox = sub { $self->{SESSION}->receive(0, $self) };
    # Periodic status ping.
    my $on_heartbeat = sub { $self->_timeout_callback };

    Event->io(
        fd => $self->{SESSION}->{MAILBOX},
        cb => $on_mailbox,
    );

    $self->{EVENT_TIMER} = Event->timer(
        interval => $self->{HEARTBEAT},
        cb       => $on_heartbeat,
    );
}
147
|
|
|
|
|
|
|
|
148
|
|
|
|
|
|
|
# Message callback registered with the Spread session: process one task
# assigned to this worker.
#
#   $msg  - incoming message; $msg->{BODY} holds the serialized envelope
#   $self - the worker object (the extra argument given to receive())
#
# The envelope is deserialized and read for its 'originator' and 'body'
# fields; 'body' is itself deserialized before being handed to the user
# callback as CALLBACK->($self, $originator, $body).  An "ack" message is
# published to the originator before the callback runs.
#
# If the callback called terminate(), the worker is NOT re-advertised as
# 'ready' afterwards and the Event.pm heartbeat timer is not re-armed;
# otherwise the 'terminate' notice would immediately be followed by a
# 'ready' notice to the queue manager.
sub _message_callback {
    my ($msg, $self) = @_;

    $self->{STATUS} = 'busy';

    # Event.pm mode only: stop the heartbeat timer while the task runs.
    if ($self->{EVENT_TIMER}) {
        $self->{EVENT_TIMER}->cancel;
    }

    # set status with the queue manager
    $self->_notify('working');

    my $content = $self->{SERIALIZER}->deserialize($msg->{BODY});
    my $body    = $self->{SERIALIZER}->deserialize($content->{body});

    $self->{METRICS}->{num_messages}++;

    # Acknowledge receipt to the originator before starting the work.
    $self->{SESSION}->publish($content->{originator},
                              $self->{SERIALIZER}->serialize({
                                  type => "ack",
                              }));

    # use eval so the loop doesn't die if there's bad code
    eval {
        $self->{CALLBACK}->($self,
                            $content->{originator},
                            $body);
    };
    if ($@) {
        # @@@@ may want some more sophisticated handling here.
        carp $@;
    }

    # terminate() was called during the callback: leave STATUS as it is,
    # send nothing further and do not restart the heartbeat.
    return if $self->{TERMINATED};

    # ready for next task
    $self->{STATUS} = 'ready';
    $self->_notify('ready');

    # Event.pm mode only: restart the heartbeat with a fresh timer.
    if ($self->{EVENT_TIMER}) {
        $self->{EVENT_TIMER} = Event->timer(interval => $self->{HEARTBEAT},
                                            cb => sub { $self->_timeout_callback },
                                           );
    }
}
191
|
|
|
|
|
|
|
|
192
|
|
|
|
|
|
|
# Heartbeat: called once at the start of run(), as the session's timeout
# callback, and from the Event.pm timer.  While the worker is idle
# (STATUS 'ready') it re-announces itself to the queue manager.
#
# Nothing is sent once the TERMINATED flag is set: a 'ready' ping after the
# 'terminate' notice would advertise the worker to the queue manager again.
sub _timeout_callback {
    my ($self) = @_;

    return if $self->{TERMINATED};

    if ($self->{STATUS} eq 'ready') {
        # ping the sqm so it knows we're available
        $self->_notify('ready');
    }
}
203
|
|
|
|
|
|
|
|
204
|
|
|
|
|
|
|
=item B<respond> |
205
|
|
|
|
|
|
|
|
206
|
|
|
|
|
|
|
$worker->respond($originator, $result); |
207
|
|
|
|
|
|
|
|
208
|
|
|
|
|
|
|
If the worker wants to send a reply back to the originator of the |
209
|
|
|
|
|
|
|
request (e.g. in a request-reply environment). $originator is the |
210
|
|
|
|
|
|
|
Spread private mailbox address sent to the callback function. |
211
|
|
|
|
|
|
|
$result is a reference to a Perl hash. |
212
|
|
|
|
|
|
|
|
213
|
|
|
|
|
|
|
=cut |
214
|
|
|
|
|
|
|
|
215
|
|
|
|
|
|
|
# Publish a reply to the originator of a request.
#
#   $originator - destination address, as passed to the user callback
#   $payload    - data placed in the 'body' field of the reply
#
# The reply is a serialized hash with type "response".  The value of the
# session's publish() call is passed back to the caller.
sub respond {
    my ($self, $originator, $payload) = @_;

    sqwlog("Responding to $originator\n");

    # Captured in list context, exactly as when passed inline as an argument.
    my @reply = $self->{SERIALIZER}->serialize({
        type => "response",
        body => $payload,
    });

    return $self->{SESSION}->publish($originator, @reply);
}
226
|
|
|
|
|
|
|
|
227
|
|
|
|
|
|
|
|
228
|
|
|
|
|
|
|
# Build the serialized status message for a given status string: a hash
# with the single key 'status', run through the worker's serializer.
sub _status {
    my $self = shift;
    my ($status) = @_;

    my %report = (status => $status);
    return $self->{SERIALIZER}->serialize(\%report);
}
233
|
|
|
|
|
|
|
|
234
|
|
|
|
|
|
|
# Tell the queue manager about this worker's state by publishing a
# serialized status message to the WQNAME destination.
#
#   $status - status string to report (callers in this file use 'ready',
#             'working' and 'terminate')
sub _notify {
    my ($self, $status) = @_;

    sqwlog("Advising $self->{QUEUE} queue manager: $status\n");

    # Captured in list context, exactly as when passed inline as an argument.
    my @report = $self->_status($status);

    return $self->{SESSION}->publish($self->{WQNAME}, @report);
}
241
|
|
|
|
|
|
|
|
242
|
|
|
|
|
|
|
# Send a serialized 'working' status message straight to the originator of
# a request.
#
#   $originator - destination address of the requester
sub acknowledge {
    my ($self, $originator) = @_;

    sqwlog("Acknowledgement to $originator\n");

    # end-to-end delivery acknowledgement back to the originator
    # (list context kept, exactly as when passed inline as an argument)
    my @receipt = $self->_status('working');

    return $self->{SESSION}->publish($originator, @receipt);
}
250
|
|
|
|
|
|
|
|
251
|
|
|
|
|
|
|
=item B<terminate> |
252
|
|
|
|
|
|
|
|
253
|
|
|
|
|
|
|
$worker->terminate; |
254
|
|
|
|
|
|
|
|
255
|
|
|
|
|
|
|
Advises the queue manager that this worker is no longer available for |
256
|
|
|
|
|
|
|
task assignment. This will cause the runloop to exit. |
257
|
|
|
|
|
|
|
|
258
|
|
|
|
|
|
|
Note that this is not automatically called on process termination. |
259
|
|
|
|
|
|
|
This means that the sqm might not realize that the worker is gone |
260
|
|
|
|
|
|
|
until its next automatic internal review cycle in a few seconds. |
261
|
|
|
|
|
|
|
For best messaging performance, it is important to notify the sqm |
262
|
|
|
|
|
|
|
as quickly as possible when a worker aborts. |
263
|
|
|
|
|
|
|
|
264
|
|
|
|
|
|
|
=cut |
265
|
|
|
|
|
|
|
|
266
|
|
|
|
|
|
|
# Announce to the queue manager that this worker is going away, then raise
# the TERMINATED flag that run() checks after each receive.
sub terminate {
    my ($self) = @_;

    sqwlog("Terminating $self->{QUEUE}\n");

    # Notify first, then mark the worker as terminated.
    $self->_notify('terminate');

    # Post-increment kept as the final statement, so the sub still evaluates
    # to the flag's previous value.
    $self->{TERMINATED}++;
}
273
|
|
|
|
|
|
|
|
274
|
|
|
|
|
|
|
1; |
275
|
|
|
|
|
|
|
|
276
|
|
|
|
|
|
|
=head1 AUTHOR |
277
|
|
|
|
|
|
|
|
278
|
|
|
|
|
|
|
Jason W. May |
279
|
|
|
|
|
|
|
|
280
|
|
|
|
|
|
|
=head1 COPYRIGHT |
281
|
|
|
|
|
|
|
|
282
|
|
|
|
|
|
|
Copyright (C) 2002 Jason W. May. All rights reserved. |
283
|
|
|
|
|
|
|
This module is free software; you can redistribute it and/or |
284
|
|
|
|
|
|
|
modify it under the same terms as Perl itself. |
285
|
|
|
|
|
|
|
|
286
|
|
|
|
|
|
|
The license for the Spread software can be found at |
287
|
|
|
|
|
|
|
http://www.spread.org/license |
288
|
|
|
|
|
|
|
|
289
|
|
|
|
|
|
|
=head1 SEE ALSO |
290
|
|
|
|
|
|
|
|
291
|
|
|
|
|
|
|
L |
292
|
|
|
|
|
|
|
L |
293
|
|
|
|
|
|
|
L |
294
|
|
|
|
|
|
|
|
295
|
|
|
|
|
|
|
=cut |