File Coverage

blib/lib/Spread/Queue/Manager.pm
Criterion Covered Total %
statement 10 12 83.3
branch n/a
condition n/a
subroutine 4 4 100.0
pod n/a
total 14 16 87.5


line stmt bran cond sub pod time code
1             package Spread::Queue::Manager;
2              
3             =head1 NAME
4              
5             Spread::Queue::Manager - coordinate one-of-many message delivery
6              
7             =head1 SYNOPSIS
8              
9             The provided 'sqm' executable does this:
10              
11             use Spread::Queue::Manager;
12             my $queue_name = shift @ARGV || die "usage: sqm queue-name";
13             my $session = new Spread::Queue::Manager($queue_name);
14             $session->run;
15              
16             =head1 DESCRIPTION
17              
18             The queue manager is responsible for assigning incoming messages
19             (see Spread::Queue::Sender) to registered workers (see Spread::Queue::Worker).
20              
21             When a message comes in, it is assigned to the first available worker,
22             otherwise it is put into a FIFO queue.
23              
24             When a worker reports availability, it is sent the first pending message,
25             otherwise it is put into a FIFO queue.
26              
27             When a message is sent to a worker, the worker should immediately
28             acknowledge receipt. If the worker does not acknowledge, the message
29             will (eventually) be assigned to another worker.
30              
31             If a queue manager is already running (detected via Spread group membership
32             messages), the new sqm should terminate.
33              
34             =head1 METHODS
35              
36             =cut
37              
38             require 5.005_03;
39 1     1   30448 use strict;
  1         5  
  1         55  
40 1     1   7 use vars qw($VERSION);
  1         3  
  1         83  
41             $VERSION = '0.4';
42              
43 1     1   6 use Carp;
  1         8  
  1         91  
44              
45 1     1   2323 use Spread::Session;
  0            
  0            
46             use Spread;
47             use Data::Serializer;
48              
49             use Spread::Queue::ManagedWorker;
50             use Spread::Queue::FIFO;
51              
52             use Log::Channel;
53              
54             BEGIN {
55             my $qmlog = new Log::Channel;
56             sub qmlog { $qmlog->(@_) }
57             }
58              
59             my $DEFAULT_SQM_HEARTBEAT = 3;
60              
61             my %Worker;
62              
63             =item B
64              
65             my $session = new Spread::Queue::Manager($queue_name);
66              
67             Initialize Spread messaging environment, and prepare to act
68             as the queue manager. If queue_name is omitted, environment
69             variable SPREAD_QUEUE will be checked.
70              
71             =cut
72              
73             sub new {
74             my $proto = shift;
75             my $class = ref ($proto) || $proto;
76              
77             my %config = @_;
78             my $self = \%config;
79             bless ($self, $class);
80              
81             $self->{QUEUE} = $ENV{SPREAD_QUEUE} unless $self->{QUEUE};
82             croak "Queue name is required" unless $self->{QUEUE};
83              
84             $self->{WQNAME} = "WQ_$self->{QUEUE}";
85             $self->{MQNAME} = "MQ_$self->{QUEUE}";
86              
87             $self->{MQ} = new Spread::Queue::FIFO($self->{MQNAME});
88             $self->{WQ} = new Spread::Queue::FIFO($self->{WQNAME});
89              
90             $self->{SESSION} = new Spread::Session (
91             MESSAGE_CALLBACK => \&message_callback,
92             ADMIN_CALLBACK => \&admin_callback,
93             TIMEOUT_CALLBACK => \&timeout_callback,
94             );
95             $self->{SESSION}->subscribe($self->{MQNAME});
96             $self->{SESSION}->subscribe($self->{WQNAME});
97              
98             $self->{SERIALIZER} = new Data::Serializer(serializer => 'Data::Denter');
99              
100             $self->{ACTIVE} = 1;
101              
102             $self->initialize_statistics;
103              
104             return $self;
105             }
106              
107             sub initialize_statistics {
108             my $self = shift;
109              
110             $self->{STATISTICS} = {
111             START_TIME => 0,
112             INBOUND_MESSAGES => 0,
113             ADMIN_MESSAGES => 0,
114             MESSAGES_DISPATCHED => 0,
115             MESSAGES_QUEUED => 0,
116             CURRENTLY_QUEUED => 0,
117             GROSS_PENDING_TIME => 0,
118             WORKER_NOTIFICATIONS => 0,
119             WORKER_REGISTRATIONS => 0,
120             WORKER_TERMINATIONS => 0,
121             };
122             }
123              
124             =item B
125              
126             $session->run;
127              
128             Run loop for the queue manager. Does not return unless interrupted.
129              
130             =cut
131              
132             sub run {
133             my ($self) = shift;
134              
135             $self->{STATISTICS}->{START_TIME} = time;
136              
137             my $heartbeat = $ENV{SQM_HEARTBEAT} || $DEFAULT_SQM_HEARTBEAT;
138              
139             while ($self->{ACTIVE}) {
140             $self->{SESSION}->receive($heartbeat, $self);
141             }
142             }
143              
144              
145             sub message_callback {
146             my ($msg, $self) = @_;
147              
148             if (grep { $_ eq $self->{MQNAME} } @{$msg->{GROUPS}}) {
149             $self->handle_message($msg->{SENDER}, $msg->{BODY});
150             } elsif (grep { $_ eq $self->{WQNAME} } @{$msg->{GROUPS}}) {
151             $self->handle_worker($msg->{SENDER}, $msg->{BODY});
152             }
153             }
154              
155              
156             sub handle_message {
157             my ($self, $sender, $message) = @_;
158              
159             $self->handle_admin_command($sender, $message) && return;
160              
161             $self->{STATISTICS}->{INBOUND_MESSAGES}++;
162              
163             $self->_check_worker_queue;
164             my ($available_worker, $pending_time) = $self->{WQ}->dequeue;
165             if ($available_worker) {
166             $self->dispatch($available_worker, {
167             originator => $sender,
168             body => $message
169             });
170             } else {
171             qmlog "ENQUEUE MESSAGE FROM $sender\n";
172             $self->{MQ}->enqueue({
173             originator => $sender,
174             body => $message
175             });
176             $self->{STATISTICS}->{MESSAGES_QUEUED}++;
177             }
178             }
179              
180              
181             sub handle_admin_command {
182             my ($self, $sender, $message) = @_;
183              
184             if ($message eq "^^status") {
185             qmlog "STATUS request from $sender\n";
186              
187             $self->{STATISTICS}->{ADMIN_MESSAGES}++;
188              
189             $self->{SESSION}->publish($sender,
190             $self->snapshot);
191             return 1;
192             }
193             return;
194             }
195              
196              
197             sub handle_worker {
198             my ($self, $sender, $message) = @_;
199              
200             $self->{STATISTICS}->{WORKER_NOTIFICATIONS}++;
201              
202             my $data = $self->{SERIALIZER}->deserialize($message);
203             my $status = $data->{status};
204              
205             my $worker = $Worker{$sender};
206             if (!$worker) {
207             $worker = new Spread::Queue::ManagedWorker($sender);
208             $Worker{$sender} = $worker;
209              
210             $self->{STATISTICS}->{WORKER_REGISTRATIONS}++;
211             }
212              
213             # qmlog "WORKER ", $worker->private, " status change: $status\n";
214              
215             if ($status eq 'ready') {
216             $self->worker_ready($worker);
217             } elsif ($status eq 'working') {
218             $self->worker_working($worker);
219             } elsif ($status eq 'terminate') {
220             $self->worker_terminated($worker);
221             } else {
222             qmlog "**** INVALID STATUS '$status' FROM WORKER $sender ***\n";
223             }
224              
225             $self->_clear_stuck_workers;
226             }
227              
228              
229             sub worker_ready {
230             my ($self, $worker) = @_;
231              
232             delete $worker->{TASK};
233              
234             my ($pending_message, $pending_time) = $self->{MQ}->dequeue;
235             if ($pending_message) {
236             $self->dispatch($worker, $pending_message);
237             $self->{STATISTICS}->{GROSS_PENDING_TIME} += $pending_time;
238             qmlog "PENDING TIME: $pending_time\n";
239             } else {
240             if ($worker->is_ready) {
241             qmlog "WORKER ", $worker->private, " ALREADY PENDING\n";
242             } else {
243             qmlog "WORKER ", $worker->private, " IS PENDING\n";
244             $self->{WQ}->enqueue($worker);
245             }
246             $worker->ready;
247             }
248             }
249              
250              
251             sub worker_working {
252             my ($self, $worker) = @_;
253              
254             if ($worker->is_assigned) {
255             qmlog "WORKER ", $worker->private, " ACKNOWLEDGED\n";
256             $worker->acknowledged;
257             } else {
258             qmlog "WHAT THE HECK IS ", $worker->private, " DOING???\n";
259             }
260             }
261              
262              
263             sub worker_terminated {
264             my ($self, $worker) = @_;
265              
266             $self->_dispose($worker);
267             # $self->_check_worker_queue;
268              
269             $self->{STATISTICS}->{WORKER_TERMINATIONS}++;
270             }
271              
272              
273             sub dispatch {
274             my ($self, $worker, $message) = @_;
275              
276             qmlog "DISPATCH MESSAGE FROM $message->{originator} TO ", $worker->private, "\n";
277              
278             $self->{SESSION}->publish($worker->private,
279             $self->{SERIALIZER}->serialize($message));
280             $worker->{TASK} = $message;
281             $worker->assigned;
282              
283             $self->{STATISTICS}->{MESSAGES_DISPATCHED}++;
284             }
285              
286              
287             sub timeout_callback {
288             my ($self) = shift;
289              
290             # scrub workers from the front of the queue
291             # who haven't signalled readiness lately
292              
293             foreach my $worker ($self->{WQ}->all) {
294             qmlog "\t...worker $worker->{PRIVATE} is $worker->{STATUS}\n";
295             }
296              
297             foreach my $worker ($self->{WQ}->all) {
298             if ($worker->is_talking) {
299             # leader looks OK
300             last;
301             }
302             my $worker = $self->{WQ}->dequeue;
303             $self->_dispose($worker);
304             }
305              
306             $self->_clear_stuck_workers;
307             }
308              
309             sub _check_worker_queue {
310             my ($self) = shift;
311              
312             # scrub workers from the front of the queue
313             # who haven't signalled readiness lately
314              
315             foreach my $worker ($self->{WQ}->all) {
316             if ($worker->is_talking) {
317             # this one is fine
318             return;
319             }
320             my $worker = $self->{WQ}->dequeue;
321             $self->_dispose($worker);
322             }
323             }
324              
325             sub _dispose {
326             my ($self, $worker) = @_;
327              
328             qmlog "WORKER ", $worker->private, " TERMINATED\n";
329              
330             # reassign the task, and retire the worker
331             my $task = $worker->{TASK};
332             if ($task) {
333             qmlog "Reassigning stuck message\n";
334             $self->handle_message($task->{originator},
335             $task->{body});
336             }
337             delete $worker->{TASK};
338             $worker->terminated;
339             }
340              
341             sub _clear_stuck_workers {
342             my $self = shift;
343              
344             foreach my $worker (values %Worker) {
345             if ($worker->is_stuck) {
346             qmlog "WORKER ", $worker->private, " IS STUCK\n";
347             $self->_dispose($worker);
348             }
349             }
350             }
351              
352             # Called for Spread admin messages - in particular, changes in
353             # group membership. There should only be one listener subscribed
354             # to the MQ_ and WQ_ groups for this queue.
355              
356             sub admin_callback {
357             my ($msg, $self) = @_;
358              
359             if ($msg->{SERVICE_TYPE} & REG_MEMB_MESS) {
360             foreach my $group (@{$msg->{GROUPS}}) {
361             if ($group ne $self->{SESSION}->{PRIVATE_GROUP}) {
362             if (!$self->{INCUMBENT}) {
363             carp "Duplicate sqm $group detected for $self->{QUEUE}; aborting";
364             $self->{ACTIVE} = 0;
365             } else {
366             carp "Duplicate sqm $group detected for $self->{QUEUE}; other should abort";
367             }
368             }
369             }
370             $self->{INCUMBENT} = 1;
371             }
372             }
373              
374             sub snapshot {
375             my $self = shift;
376              
377             $self->{STATISTICS}->{RUN_TIME} =
378             time - $self->{STATISTICS}->{START_TIME};
379              
380             $self->{STATISTICS}->{CURRENTLY_QUEUED} =
381             $self->{MQ}->length;
382              
383             return $self->{SERIALIZER}->serialize({
384             type => "status",
385             body => $self->{STATISTICS}
386             });
387             }
388              
389              
390             1;
391              
392              
393             =head1 AUTHOR
394              
395             Jason W. May
396              
397             =head1 COPYRIGHT
398              
399             Copyright (C) 2002 Jason W. May. All rights reserved.
400             This module is free software; you can redistribute it and/or
401             modify it under the same terms as Perl itself.
402              
403             The license for the Spread software can be found at
404             http://www.spread.org/license
405              
406             =head1 SEE ALSO
407              
408             L
409             L
410             L
411             L
412             L
413             L
414              
415             =cut