File Coverage

lib/Messaging/Courier.pm
Criterion Covered Total %
statement 7 9 77.7
branch n/a
condition n/a
subroutine 3 3 100.0
pod n/a
total 10 12 83.3


line stmt bran cond sub pod time code
1             package Messaging::Courier;
2              
3 6     6   1009230 use strict;
  6         18  
  6         261  
4 6     6   35 use warnings;
  6         13  
  6         216  
5              
6 6     6   3172 use EO;
  0            
  0            
7             use Spread;
8             use Data::UUID;
9             use Messaging::Courier::Frame;
10             use Messaging::Courier::Config;
11             use Regexp::Common;
12             use Time::HiRes qw(time);
13             use base qw( EO Class::Accessor::Chained );
14             __PACKAGE__->mk_accessors(qw( id mailbox name private_group ));
15              
16             our $VERSION = '0.42';
17              
18             our $DEFAULT_PEER = Messaging::Courier::Config->host || '127.0.0.1';
19             our $DEFAULT_PORT = Messaging::Courier::Config->port || '4803';
20             our $GROUP_NAME = Messaging::Courier::Config->group || 'courier';
21              
22             exception Messaging::Courier::Error::CouldNotJoin;
23             exception Messaging::Courier::Error::CouldNotConnect;
24              
25             sub init {
26             my $self = shift;
27             my $args = { @_ };
28             if ($self->SUPER::init( @_ )) {
29              
30             my $id = Data::UUID->new->create_str;
31             $self->id( $id );
32              
33             my $port = $args->{ Port } || $DEFAULT_PORT;
34             my $host = $args->{ Peer } || $DEFAULT_PEER;
35             my $name = $port . '@' . $host;
36             $self->name($name);
37              
38             $self->_connect();
39             return 1;
40             }
41             return 0;
42             }
43              
44             sub DESTROY {
45             my $self = shift;
46             $self->disconnect;
47             }
48              
49             sub send {
50             my($self, $message) = @_;
51              
52             if(!UNIVERSAL::isa( $message, 'Messaging::Courier::Message')) {
53             throw EO::Error::InvalidParameters
54             text => "message should be a Messaging::Courier::Message object, not $message";
55             }
56              
57             my $frame = $message->frame || Messaging::Courier::Frame->new();
58             $message->frame( $frame );
59             $frame->content( $message );
60             $frame->on_send( $self );
61              
62             my $serialized = $frame->serialize;
63              
64             Spread::multicast(
65             $self->mailbox,
66             AGREED_MESS,
67             $GROUP_NAME,
68             0,
69             $serialized
70             );
71             }
72              
73             sub receive {
74             my $self = shift;
75             my $timeout = shift || 0;
76             my $replying_to = shift;
77              
78             if ($timeout && $timeout !~ /^$RE{num}{real}$/) {
79             throw EO::Error::InvalidParameters
80             text => 'timeout must be a number';
81             }
82              
83             if ($timeout && $timeout < 0) {
84             throw EO::Error::InvalidParameters
85             text => 'timeout must be a positive number';
86             }
87              
88             if ($replying_to && !UNIVERSAL::isa($replying_to,'Messaging::Courier::Message')) {
89             throw EO::Error::InvalidParameters
90             text => 'replying to should be a Messaging::Courier::Message object';
91             }
92              
93             if (!$replying_to) {
94             $self->_receive_simple($timeout);
95             } else {
96             $self->_receive_reply($timeout, $replying_to);
97             }
98              
99             }
100              
101             sub receive_many {
102             my $self = shift;
103             my $timeout = shift || 0;
104             my $replying_to = shift;
105              
106             if ($timeout && $timeout !~ /^$RE{num}{real}$/) {
107             throw EO::Error::InvalidParameters
108             text => 'timeout must be a number';
109             }
110              
111             if ($timeout && $timeout < 0) {
112             throw EO::Error::InvalidParameters
113             text => 'timeout must be a positive number';
114             }
115              
116             if (!UNIVERSAL::isa($replying_to,'Messaging::Courier::Message')) {
117             throw EO::Error::InvalidParameters
118             text => 'replying to should be a Messaging::Courier::Message object';
119             }
120              
121             $self->_receive_reply_many($timeout, $replying_to);
122             }
123              
124             sub _receive_simple {
125             my $self = shift;
126             my $timeout = shift || 0;
127             my $cf;
128              
129             my $then = time;
130              
131             while ( 1 ) {
132             if ($timeout) {
133             my $time_left = $then - time + $timeout;
134             return if $timeout && $time_left < 0.001;
135             }
136              
137             my($service_type, $sender, $groups, $mess_type, $endian, $message);
138              
139             if ($timeout <= 0) {
140             ($service_type, $sender, $groups, $mess_type, $endian, $message) =
141             Spread::receive( $self->mailbox );
142             } else {
143             ($service_type, $sender, $groups, $mess_type, $endian, $message) =
144             Spread::receive( $self->mailbox, $timeout );
145             }
146              
147             if ( $sender ) {
148             eval { $cf = Messaging::Courier::Frame->new_with_frame( $message ) };
149             return $cf->content if not $@;
150             }
151             }
152             }
153              
154             sub _receive_reply {
155             my $self = shift;
156             my $timeout = shift || 0;
157             my $replying_to = shift;
158              
159             my $then = time;
160              
161             while (1) {
162              
163             my $message;
164              
165             if ($timeout) {
166             my $time_left = $then - time + $timeout;
167             return if $timeout && $time_left < 0.001;
168             $message = $self->_receive_simple($time_left);
169             } else {
170             $message = $self->_receive_simple;
171             }
172              
173             next unless $message;
174              
175             my $cf = $message->frame;
176             if ($replying_to->frame && $replying_to->frame->id eq $cf->in_reply_to) {
177             return $message;
178             }
179             }
180             }
181              
182             sub _receive_reply_many {
183             my $self = shift;
184             my $timeout = shift;
185             my $replying_to = shift;
186              
187             my $then = time;
188             my @replies;
189              
190             while (1) {
191             my $time_left = $then - time + $timeout;
192             return @replies if $timeout && $time_left < 0.001;
193              
194             my $message = $self->_receive_simple($time_left);
195             next unless $message;
196              
197             my $cf = $message->frame;
198             if ($replying_to->frame && $replying_to->frame->id eq $cf->in_reply_to) {
199             push @replies, $message;
200             }
201             }
202             }
203              
204             sub ask {
205             my($self, $message, $timeout) = @_;
206             $self->send($message);
207             return $self->receive( $timeout || 0, $message );
208             }
209              
210             sub ask_many {
211             my($self, $message, $timeout) = @_;
212             $self->send($message);
213              
214             if (not defined $timeout) {
215             throw EO::Error::InvalidParameters
216             text => 'Need timeout for ask_many';
217             }
218              
219             return $self->receive_many( $timeout || 0, $message );
220             }
221              
222             sub _connect {
223             my $self = shift;
224             my $id = $self->id;
225             my $name = $self->name;
226              
227             my ($mailbox, $priv_group) = Spread::connect({
228             spread_name => $name,
229             private_name => $id,
230             group_membership => 0,
231             });
232              
233             if (!$mailbox) {
234             throw Messaging::Courier::Error::CouldNotConnect
235             text => 'could not connect to spread daemon';
236             }
237              
238             my $count = Spread::join( $mailbox, $GROUP_NAME );
239              
240             if ($count != 1) {
241             throw Messaging::Courier::Error::CouldNotJoin
242             text => 'could not join courier group';
243             }
244              
245             $self->mailbox( $mailbox );
246             $self->private_group( $priv_group );
247             }
248              
249             sub disconnect {
250             my $self = shift;
251             my $mailbox = $self->mailbox;
252             Spread::disconnect($mailbox) if defined $mailbox;
253             }
254              
255             sub reconnect {
256             my $self = shift;
257             $self->disconnect;
258              
259             # pretend we're a completely different client
260             my $id = Data::UUID->new->create_str;
261             $self->id( $id );
262              
263             $self->_connect;
264             }
265              
266             1;
267              
268              
269             =head1 NAME
270              
271             Messaging::Courier - asynchronous and synchronous access to a message queue.
272              
273             =head1 SYNOPSIS
274              
275             use Messaging::Courier;
276              
277             my $c = Messaging::Courier->new();
278              
279             $m = $c->receive();
280             $c->send($m);
281              
282             $m = $c->ask($m);
283              
284             =head1 DESCRIPTION
285              
286             C is an interface into a message queue. It provides
287             both synchronous and asynchronous access to the queue.
288              
289             The message queue that Courier currently uses is Spread
290             (http://www.spread.org/). This must be installed before using and
291             testing this module. The module assumes that Spread is installed and
292             that the spread daemon is running on the local machine.
293              
294             =head1 INHERITANCE
295              
296             Messaging::Courier inherits from the EO class.
297              
298             =head1 CONSTRUCTOR
299              
300             In addition to the constructor provided by its parent class
301             Messaging::Courier provides the following constructors:
302              
303             =over 4
304              
305             =item new( [Peer => SCALAR [, Port => SCALAR]] )
306              
307             Provides a connection to the default queue. Peer defaults to 127.0.0.1
308             (localhost), and Port defaults to 4803 (the default spread port). By
309             specifiying Peer or Port arguments the queue that Messaging::Courier
310             connects to can be altered.
311              
312             my $c = Messaging::Courier->new();
313              
314             =back
315              
316             All constructors can throw two exceptions additional to those thrown
317             by the default constructor:
318              
319             =over 2
320              
321             =item * Messaging::Courier::Error::CouldNotConnect
322              
323             Thrown when Messaging::Courier cannot connect to the message queue.
324              
325             =item * Messaging::Courier::Error::CouldNotJoin
326              
327             Thrown when Messaging::Courier cannot join the correct group on the message queue.
328              
329             =back
330              
331             =head1 METHODS
332              
333             =over 4
334              
335             =item send( MESSAGE )
336              
337             Sends a message to the queue. The message, provided as MESSAGE should
338             be an object inheriting from Messaging::Courier::Message. Will throw an
339             EO::Error::InvalidParameters exception in the case that MESSAGE is not
340             a Messaging::Courier::Message.
341              
342             $c->receive(1);
343              
344             =item receive( [TIMEOUT[, REPLYING]] )
345              
346             Receives a message from the queue. If called without a TIMEOUT or a
347             TIMEOUT set to zero any call to receive will block. If a timeout
348             is specified receive does not block but returns undef in the case that
349             it does not receive a message. If REPLYING is specified it should be
350             a Messaging::Courier::Message object that you are waiting for a reply to. If this
351             is the case receive will only ever return a message that is in reply
352             to the message specified by REPLYING. The receive method will throw
353             an EO::Error::InvalidParameters object in the case that the parameters
354             sent do not match this specification.
355              
356             my $m = $c->receive(1);
357              
358             =item ask( MESSAGE[, TIMEOUT] )
359              
360             Send and receive a message MESSAGE. C returns a Messaging::Courier::Message
361             object unless a TIMEOUT is specified and no message is received in that
362             period. If this is the case undef is returned instead.
363              
364             my $r = $c->ask($m, 1);
365              
366             =item ask_many( MESSAGE, TIMEOUT )
367              
368             This sends a message MESSAGE. C returns all the messages
369             that are a reply to MESSAGE
370              
371             my @received = $c->ask_many($m);
372              
373             =item id()
374              
375             returns the id of this Messaging::Courier connection. The id is appended to all
376             Frames that are sent on the wire.
377              
378             =item mailbox()
379              
380             returns the mailbox of the connection to spread. This is a spread specific
381             value and should not be relied apon.
382              
383             =item name()
384              
385             returns the name of the connection to Spread
386              
387             =item private_group()
388              
389             returns the private group that this connection to spread is a member of. This
390             is a spread specific value and should not be relied apon.
391              
392             =item reconnect()
393              
394             disconnects and reconnects to Spread. This might be useful if your
395             program sends a lot of messages but does not want to process any.
396              
397             =back
398              
399             =head1 SEE ALSO
400              
401             Messaging::Courier::Message, Messaging::Courier::Frame
402              
403             =head1 AUTHORS
404              
405             James A. Duncan , Leon Brocard
406              
407             =head1 COPYRIGHT
408              
409             Copyright 2003-4 Fotango Ltd. All Rights Reserved.
410              
411             =cut
412              
413