File Coverage

blib/lib/Spread/Session.pm
Criterion Covered Total %
statement 10 12 83.3
branch n/a
condition n/a
subroutine 4 4 100.0
pod n/a
total 14 16 87.5


line stmt bran cond sub pod time code
1             package Spread::Session;
2              
3             =head1 NAME
4              
5             Spread::Session - OO wrapper for Spread messaging toolkit
6              
7             =head1 SYNOPSIS
8              
9             use Spread::Session;
10              
11             my $session = new Spread::Session(
12             MESSAGE_CALLBACK => \&message_callback,
13             ADMIN_CALLBACK => sub {},
14             );
15              
16             $session->subscribe("mygroup");
17             $session->publish("othergroup", $message);
18              
19             $session->receive($timeout, $extra_param);
20              
21             sub message_callback {
22             my ($message_info, $extra_param) = @_;
23             # do something
24             return $message_info->{BODY};
25             }
26              
27             =head1 DESCRIPTION
28              
29             Wrapper module for Spread.pm, providing an object-oriented interface
30             to the Spread messaging toolkit. The existing Spread.pm package is
31             a straightforward functional interface to the Spread C API.
32              
33             A session represents a connection to a Spread messaging daemon. The
34             publish and subscribe functions are for communication via spread groups.
35              
36             Handling of incoming messages is supported via callbacks; the receive()
37             method does not directly return the incoming message parameters to the
38             calling code.
39              
40             =head2 METHODS
41              
42             Most methods check the value of the Spread error code, $sperrno, and
43             will die() if this value is set.
44              
45             =cut
46              
47 5     5   47975 use 5.005;
  5         21  
  5         210  
48 5     5   31 use strict;
  5         9  
  5         217  
49             #use warnings;
50 5     5   29 use Carp;
  5         20  
  5         483  
51 5     5   30969 use Spread;
  0            
  0            
52              
# Package version.  Declared through "use vars" (not "our") so the module
# keeps working on the perl 5.005 baseline this file asks for.
use vars qw($VERSION);
$VERSION = '0.4';

# Seconds receive() waits for a message when no other timeout is configured.
my $DEFAULT_TIMEOUT = 5;

# Optional logging.  Spread::Session must not depend on Log::Channel, so the
# module is loaded opportunistically; when it cannot be loaded both loggers
# are no-op code refs.  sslog() carries session events and msglog() carries
# per-message traffic; both forward their arguments unchanged.
BEGIN {
    my ($sslog, $msglog);

    if (defined eval { require Log::Channel }) {
        # Arrow method calls instead of "new Log::Channel": this package
        # defines its own new(), and indirect-object syntax is resolved by
        # parser heuristics that take such subs into account.
        $sslog  = Log::Channel->new;
        $msglog = Log::Channel->new("message");
    }
    else {
        $sslog  = sub {};
        $msglog = sub {};
    }

    sub sslog  { $sslog->(@_) }
    sub msglog { $msglog->(@_) }
}
74              
75              
76             =item B<new>
77              
78             my $session = new Spread::Session(private_name => 'foo',
79             spread_name => '4444@remotenode',
80             #optional MESSAGE_CALLBACK => \&my_msg_callback,
81             #optional ADMIN_CALLBACK => \&my_admin_callback,
82             #optional TIMEOUT_CALLBACK => \&my_timeout_callback,
83             #optional TIMEOUT => 5,
84             );
85              
86             Establish a connection to a Spread messaging daemon at the host and
87             port specified in the 'spread_name' parameter. Default value is
88             4803@localhost.
89              
90             If 'private_name' is not provided, Spread::Session builds one from the first
91             five characters of $ENV{USER} (or of the script name) followed by a number
92             derived from the process id. If you provide a value for this parameter, you must ensure uniqueness.
93              
94             Provided MESSAGE_CALLBACK and ADMIN_CALLBACK coderefs will be invoked
95             with a reference to a hash containing the components of the incoming
96             message in fields named SERVICE_TYPE, SENDER, GROUPS (arrayref),
97             MESSAGE_TYPE, ENDIAN, and BODY. A reference back to the Spread::Session
98             object is provided in SESSION. Any other parameters provided in the
99             receive() method call will be passed through to the callback as well.
100              
101             The TIMEOUT parameter overrides the built-in 5-second default timeout
102             for the receive() call.
103              
104             =cut
105              
# Constructor: connect to a Spread daemon and return a blessed session.
#
# Takes a flat key/value list: private_name, spread_name, MESSAGE_CALLBACK,
# ADMIN_CALLBACK, TIMEOUT_CALLBACK and TIMEOUT are the keys read here.  The
# same hash is passed to Spread::connect and then becomes the object.
# Croaks when Spread::connect leaves $sperrno set.
sub new {
    my $proto = shift;
    my $class = ref($proto) || $proto;

    my %config = @_;

    if (!$config{private_name}) {
        # Default name: up to five characters of the user name (or of the
        # script's basename) plus five digits of the pid.  The pid is reduced
        # modulo 100_000 so the result never exceeds ten characters, the
        # private-name limit Spread enforces, even on systems with large pids.
        my @path_parts = split(/\//, $0);
        my $prefix = substr($ENV{USER} || pop @path_parts, 0, 5);
        $config{private_name} = sprintf("%s%05d", $prefix, $$ % 100_000);
    }
    $config{spread_name} = "4803\@localhost" unless $config{spread_name};

    undef $sperrno;
    my ($mailbox, $private_group) = Spread::connect(\%config);
    croak("Spread::connect failed: $sperrno") if $sperrno;
    sslog("Spread connection established as $private_group\n");

    my $self = \%config;

    $self->{MAILBOX}       = $mailbox;
    $self->{PRIVATE_GROUP} = $private_group;

    # Fall back to the module's stub handlers for anything not supplied.
    $self->{MESSAGE_CALLBACK} ||= \&_message_callback;
    $self->{ADMIN_CALLBACK}   ||= \&_admin_callback;
    $self->{TIMEOUT_CALLBACK} ||= \&_timeout_callback;

    # Only an absent/undef TIMEOUT gets the default: an explicit 0 is a
    # legitimate value (receive() makes the same distinction) and is kept.
    $self->{TIMEOUT} = $DEFAULT_TIMEOUT unless defined $self->{TIMEOUT};

    bless $self, $class;
    return $self;
}
139              
140             =item B<callbacks>
141              
142             $session->callbacks(message => \&message_callback,
143             admin => \&admin_callback,
144             timeout => \&timeout_callback);
145              
146             Define application callback functions for regular inbound messages
147             on subscribed groups, administrative messages regarding subscribed
148             groups (e.g. membership events), and timeouts (cf. receive).
149              
150             If no value is provided for any one of these events, a trivial stub is
151             provided by Spread::Session.
152              
153             =cut
154              
# Install old-style handlers for the "message", "admin" and "timeout" events.
#
# Takes a key/value list; each true value is stored under the matching
# upper-case key of $self->{OLD_CALLBACKS}.  Events that are missing or
# false are left untouched.
sub callbacks {
    my ($self, %handler_for) = @_;

    foreach my $event (qw(message admin)) {
        next unless $handler_for{$event};
        $self->{OLD_CALLBACKS}->{ uc($event) } = $handler_for{$event};
    }

    # Kept as the final conditional assignment so the sub's implicit return
    # value is the same as before.
    $self->{OLD_CALLBACKS}->{TIMEOUT} = $handler_for{timeout} if $handler_for{timeout};
}
163              
164              
165             =item B<subscribe>
166              
167             $session->subscribe("mygroup", ...);
168              
169             Inform Spread that a copy of any message published to the named group(s)
170             should be dispatched to this process.
171              
172             =cut
173              
# Join every group named in the argument list, in order.
# Croaks on the first Spread::join call that leaves $sperrno set; groups
# joined before the failure stay joined.
sub subscribe {
    my ($self, @groups) = @_;

    undef $sperrno;
    for my $name (@groups) {
        Spread::join($self->{MAILBOX}, $name);
        if ($sperrno) {
            croak("Spread::join failed: $sperrno");
        }

        sslog("Joined group $name\n");
    }
}
185              
186             =item B<publish>
187              
188             $session->publish("othergroup", $message);
189              
190             Transmit a message to the specified group.
191              
192             $message is assumed to be a string; serialization of other data types
193             is not provided here.
194              
195             =cut
196              
# Multicast $message to a single group with SAFE_MESS service and message
# type 0.  Croaks when Spread::multicast leaves $sperrno set; otherwise logs
# the destination group and the message length.
sub publish {
    my ($self, $group, $message) = @_;

    undef $sperrno;
    Spread::multicast(
        $self->{MAILBOX},    # mbox
        SAFE_MESS,           # service_type
        $group,              # groups (just one)
        0,                   # message type
        $message,            # message
    );
    if ($sperrno) {
        croak("Spread::multicast failed: $sperrno");
    }

    msglog("Sent message to $group: ", length($message), " bytes\n");
}
211              
212             =item B<poll>
213              
214             my $msize = $session->poll;
215              
216             Non-blocking check to see if a message is available on any subscribed
217             group (including this session's private mailbox). Returns the size of
218             the first pending message. A zero indicates no message is pending.
219              
220             =cut
221              
# Ask Spread whether a message is pending on this session's mailbox.
# Returns the value of Spread::poll (documented above as the size of the
# first pending message, 0 for none); croaks when $sperrno is set.
sub poll {
    my ($self) = @_;

    undef $sperrno;
    my $pending_size = Spread::poll($self->{MAILBOX});
    if ($sperrno) {
        croak("Spread::poll failed: $sperrno");
    }

    return $pending_size;
}
231              
232             =item B<receive>
233              
234             $session->receive($timeout, $args...);
235              
236             Waits for $timeout seconds for a message to arrive on any subscribed
237             group (including this session's private mailbox). If a regular message
238             arrives, it is delivered to the message callback defined above. If a
239             Spread administrative message arrives (e.g. a group membership notification),
240             it is transmitted to any admin callback that has been installed. If no
241             message arrives, the timeout callback is called, if any.
242              
243             Additional optional parameters may be provided to receive(). These will
244             be passed along to the callback routines.
245              
246             =cut
247              
# Wait for one message and hand it to the matching callback.
#
#   $session->receive($timeout, @extra);
#
# $timeout falls back to $self->{TIMEOUT} when it is omitted or undef; 0 is
# a valid timeout and is used as given.  Everything after the timeout slot
# is forwarded to whichever callback runs.  Returns that callback's return
# value.  Croaks on any Spread::receive error other than a timeout.
#
# Handlers installed through callbacks() take precedence over the ones given
# to the constructor and are called with positional arguments; constructor
# handlers receive a hashref describing the message.
sub receive {
    my $self = shift;

    # The first argument is always the timeout slot, so it is always removed
    # from @_.  An undef placeholder selects the default instead of being
    # passed on to the callbacks as if it were an extra argument.
    my $timeout = $self->{TIMEOUT};
    if (@_) {
        my $requested = shift;
        $timeout = $requested if defined $requested;
    }

    $sperrno = 0;
    my ($service_type, $sender, $groups, $message_type, $endian, $message) =
        Spread::receive($self->{MAILBOX}, $timeout);

    if ($sperrno == 3) {
        # timeout
        if ($self->{OLD_CALLBACKS}->{TIMEOUT}) {
            return $self->{OLD_CALLBACKS}->{TIMEOUT}->(@_);
        }
        else {
            return $self->{TIMEOUT_CALLBACK}->(@_);
        }
    }

    # any other error from Spread::receive besides timeout is fatal
    # *** MAKE SURE THIS MAKES SENSE ***

    croak("Spread::receive failed: $sperrno") if $sperrno;

    msglog("Received message from $sender: ", length($message), " bytes\n");

    my %message_container = (
        SERVICE_TYPE => $service_type,
        SENDER       => $sender,
        GROUPS       => $groups,
        MESSAGE_TYPE => $message_type,
        ENDIAN       => $endian,
        SESSION      => $self,
        BODY         => $message,
    );

    if ($service_type & REGULAR_MESS) {
        if (defined $self->{OLD_CALLBACKS}->{MESSAGE}) {
            return $self->{OLD_CALLBACKS}->{MESSAGE}->($sender,
                                                       $groups,
                                                       $message,
                                                       @_);
        }
        else {
            return $self->{MESSAGE_CALLBACK}->(\%message_container, @_);
        }
    }
    else {
        # Anything that is not a regular message goes to the admin handler.
        if (defined $self->{OLD_CALLBACKS}->{ADMIN}) {
            return $self->{OLD_CALLBACKS}->{ADMIN}->($service_type,
                                                     $sender,
                                                     $groups,
                                                     $message,
                                                     @_);
        }
        else {
            return $self->{ADMIN_CALLBACK}->(\%message_container, @_);
        }
    }
}
306              
307             =head2 CALLBACKS
308              
309             sub my_message_callback {
310             my ($sender, $groups, $message, @args) = @_;
311             }
312              
313             sub my_admin_callback {
314             my ($service_type, $sender, $groups, $message, @args) = @_;
315             }
316              
317             sub my_timeout_callback {
318             my (@args) = @_;
319             }
320              
321             Some trivial default callbacks (they pass incoming message details to the
322             module's logger, which does nothing unless Log::Channel is installed) are
323             provided by Spread::Session. Your application should override all of these.
324              
325             =cut
326              
# Default handler for regular messages: reports the sender, group list,
# message type and body through sslog().  Extra arguments are ignored.
sub _message_callback {
    my ($info) = @_;

    sslog("SENDER: $info->{SENDER}\n");

    my $group_list = join(",", @{ $info->{GROUPS} });
    sslog("GROUPS: [", $group_list, "]\n");

    sslog("MESSAGE TYPE: $info->{MESSAGE_TYPE}\n");
    sslog("REG_MESSAGE: $info->{BODY}\n\n");
}
335              
336              
# Default handler for non-regular messages.  Tests the service type against
# TRANSITION_MESS, REG_MEMB_MESS and MEMBERSHIP_MESS in that order and logs
# one line for the first flag that matches; logs nothing otherwise.
sub _admin_callback {
    my ($info) = @_;

    my $type   = $info->{SERVICE_TYPE};
    my $sender = $info->{SENDER};

    if ($type & TRANSITION_MESS) {
        sslog("> Transition message for $sender\n");
    }
    elsif ($type & REG_MEMB_MESS) {
        sslog("> New member(s) for $sender: ",
              join(",", @{ $info->{GROUPS} }),
              "\n");
    }
    elsif ($type & MEMBERSHIP_MESS) {
        sslog("> Self-leave message for $sender:",
              join(",", @{ $info->{GROUPS} }),
              "\n");
    }
}
352              
353              
# Default timeout handler: deliberately does nothing and returns nothing.
# Any arguments forwarded by receive() are ignored.
sub _timeout_callback {
    return;
}
358              
359              
360             =item B<err>
361              
362             my $sperrno = $session->err;
363              
364             Retrieve the value of the current Spread error, if any.
365              
366             =cut
367              
# Accessor for the current Spread error.  The value comes from the shared
# $sperrno variable, not from the object, so the invocant is not consulted.
sub err {
    return $sperrno;
}
371              
372              
# Destructor: disconnect this session's mailbox from Spread when the object
# is destroyed, then log the private group that was released.
sub DESTROY {
    my ($self) = @_;

    Spread::disconnect($self->{MAILBOX});
    sslog("Spread session $self->{PRIVATE_GROUP} disconnected\n");
}
378              
379             1;
380              
381              
382             =head1 AUTHOR
383              
384             Jason W. May
385              
386             Joshua Goodall maintains the FreeBSD package for
387             this module.
388              
389             =head1 COPYRIGHT
390              
391             Copyright (C) 2002 Jason W. May. All rights reserved.
392             This module is free software; you can redistribute it and/or
393             modify it under the same terms as Perl itself.
394              
395             The license for the Spread software can be found at
396             http://www.spread.org/license
397              
398             =head1 SEE ALSO
399              
400             L<Spread>
401             L<Log::Channel>
402              
403             =cut