File Coverage

blib/lib/Spread/Message.pm
Criterion Covered Total %
statement 1 3 33.3
branch n/a
condition n/a
subroutine 1 1 100.0
pod n/a
total 2 4 50.0


line stmt bran cond sub pod time code
1             #!/usr/bin/perl
2              
3             #BEGIN { $Exporter::Verbose=1 }
4              
5             package Spread::Message;
6             our $VERSION = 0.21;
7 1     1   8652 use Spread qw(:SP :ERROR :MESS);
  0            
  0            
8             use Data::Dumper;
9             use Carp qw/cluck/;
10             use Sys::Hostname;
11              
12             use strict;
13             use constant REJECT_MESS => 0x00400000;
14              
15             sub logit (@);
# Record how this process was started.
#   $Command      - $0 followed by the command line arguments
#   $Program_Name - the last path component of $0 (used by logit)
our ($Program_Name, $Command);
$Command = "$0 @ARGV";

# Pick the last path component with a list slice instead of parking the
# pieces in the global @_ at file scope.
$Program_Name = ( split m{/+}, $0 )[-1];
20              
21             =head1 NAME
22              
23             Spread::Message - provide messaging using the Spread toolkit
24              
25             This product uses software developed by Spread Concepts LLC for use in
26             the Spread toolkit. For more information about Spread see
27             http://www.spread.org
28              
29             =head1 SYNOPSIS
30              
31             use Spread::Message;
32              
33             my $mbox = Message->new(
34             spread_name => '4803@host',
35             name => "down$$",
36             group => ['devices-down'],
37             #debug => 1,
38             member_sub => \&process_control,
39             message_sub => \&process_data,
40             timeout_sub => \&heartbeat,
41             );
42              
43             sub process_control
44             {
45             my $mbox = shift;
46             my $loop = shift;
47             # Process membership messages here. See examples
48             }
49              
50             sub process_data
51             {
52             my $mbox = shift;
53             my $loop = shift;
54             # Process the data here. See examples
55             }
56              
57             sub heartbeat
58             {
59             my $mbox = shift;
60             my $loop = shift;
61             # Process any timeouts here. See examples
62             }
63              
64             while(1)
65             {
66             # Process different data as required
67             $mbox->rx(10,$loop);
68             $loop++;
69              
70             # Extra processing of side effects created by the callbacks
71             }
72             $mbox->disconnect();
73              
74             Other possibilities are:
75              
76             # Connection
77             $mbox->connect;
78             $mbox->disconnect;
79              
80             # Config
81             $mbox->configure(%config);
82             $spread_daemon = $mbox->spread_name;
83             $mbox->spread_name('3480@1.1.1.1');
84             $seed_name = $mbox->name;
85             $mbox->name('test');
86             $rv = $mbox->debug();
87             $mbox->debug(1);
88              
89             # tx/rx messages
90             $mbox->send(@grps,$msg);
91             $mbox->sends(@grps,\%perlhash);
92             $hashref = $mbox->decode;
93             $msg_size = $mbox->poll;
94             $mbox->rx($timeout,@args);
95             $regular_msg = $mbox->get;
96             $msg = $mbox->getmsg($timeout);
97              
98             # Object/Message details
99             $spread = $mbox->mbox;
100             @grps = $mbox->grps;
101             $sent_by = $mbox->sender;
102             $service_type = $mbox->type;
103             $message_type = $mbox->mess_type;
104             $same_endian = $mbox->endian;
105             $last_message = $mbox->msg;
106             $last_hashref = $mbox->command;
107             $is_new_message = $mbox->new_msg;
108             $time_last_received = $mbox->tm;
109             $timed_out = $mbox->timeout;
110             $mysperrorno = $mbox->error;
111             $whoami = $mbox->me;
112              
113             # Test message
114             $mbox->control_msg;
115             $mbox->aimed_at_me;
116             $mbox->Is_unreliable_mess;
117             $mbox->Is_reliable_mess;
118             $mbox->Is_fifo_mess;
119             $mbox->Is_causal_mess;
120             $mbox->Is_agreed_mess;
121             $mbox->Is_safe_mess;
122             $mbox->Is_regular_mess;
123             $mbox->Is_self_discard;
124             $mbox->Is_reg_memb_mess;
125             $mbox->Is_transition_mess;
126             $mbox->Is_caused_join_mess;
127             $mbox->Is_caused_leave_mess;
128             $mbox->Is_caused_disconnect_mess;
129             $mbox->Is_caused_network_mess;
130             $mbox->Is_membership_mess;
131             $mbox->Is_reject_mess;
132             $mbox->Is_self_leave;
133              
134             # Supplied Callbacks
135             $mbox->_member_sub
136             $mbox->_message_sub
137             $mbox->_error_sub
138             $mbox->_timeout_sub
139             $mbox->handle_commands_aimed_at_me
140              
141             =head1 DESCRIPTION
142              
143             The Spread package provides a simple wrapper around the spread toolkit.
144             We try to provide a much higher level wrapper. By providing:
145              
146             - Simple methods to send serialised Perl structures between programs
147             - Callback registration
148             - Extensible callbacks for command driven programs
149             - Lots of accessor functions
150             - Handling of incoming messages is supported via callbacks or
151             via direct polling for input. It's your choice :-)
152              
153             =head1 OBJECT CONFIGURATION
154              
155              
156             group => is an array ref of groups to subscribe to
157             debug => is a scalar variable that affects debugging output
158             name => is a scalar variable that defines a Spread name. Must
159             be unique.
160              
161             The following are the names of the callback config variables. Each
162             must be a CODE reference.
163              
164             # These provide message gathering callbacks defined on the type of
165             # message received.
166             member_sub => subroutine to handle membership messages.
167             message_sub => subroutine to handle normal data messages
168             error_sub => gets called whenever we find an error of some kind
169             timeout_sub => called in the event of any timeout.
170              
171             # If defined then this installs handle_commands_aimed_at_me() as the
172             # call back for each of the above and allows you to override bits and
173             # pieces. See CALLBACKS below
174             commands => {
175             'default' => subroutine to handle ALL default message
176             'new' => subroutine to handle 'new' command
177             .
178             .
179             }
180              
181              
182             =head1 METHODS
183              
184              
185             =cut
186              
187              
188             =head2 B<new>
189              
190             Create a new object and get it configured.
191              
192             my $mbox = Spread::Message->new(
193             name => $name,
194             spread_name => '4803@localhost',
195             group => ['polling-ctl', 'polling-data'],
196             member_sub => \&my_memeber_callback,
197             message_sub => \&my_message_callback,
198             error_sub => \&my_error_callback,
199             timeout_sub => \&my_timeout_callback,
200             debug => 1,
201             );
202             die "Can't create a new message object" unless $mbox;
203              
204             or
205              
206             my $mbox = Spread::Message->new(
207             name => $name,
208             spread_name => '4803@localhost',
209             group => ['polling-ctl', 'polling-data'],
210             debug => 1,
211             commands => {
212             'default' => \&myhandlecommands,
213             },
214             );
215             die "Can't create a new message object" unless $mbox;
216              
217             =cut
218              
# Constructor.
#
# May be invoked on the class name or on an existing object; in the latter
# case the new object is blessed into that object's class.  Every argument
# is handed unchanged to configure().
#
# Returns the new object.
sub new
{
    my $invocant = shift;
    my $class    = ref($invocant) || $invocant;    # Object or class name

    my $self = bless {}, $class;

    $self->configure(@_);

    # configure() stores the flag under the lower-case key 'debug', so that
    # is the key that has to be tested for the trace to ever appear.
    warn "$class new called\n" if $self->{'debug'};
    return $self;
}
231              
# serialise() is used by sends().
#
# Takes a Perl value (normally a hash reference) and returns Perl source
# text describing it.  The text is produced by Data::Dumper, so it is
# limited to what Data::Dumper can express.  The value is dumped under the
# name 'msg' with indentation switched off, which is the form decode()
# evaluates.
#
# Returns undef when the object has no mailbox.
sub serialise
{
    my ($self, $payload) = @_;

    return undef unless $self->{'mbox'};

    my $dumper = Data::Dumper->new( [$payload], ['msg'] );
    return $dumper->Indent(0)->Dump;
}
249              
250              
251             =head2 B<configure>
252              
253             Configure an object before getting connected. You can change the
254             configuration of an object at any time, but make sure you disconnect and
255             then B<connect> again afterwards. The B<new> method calls configure for
256             you in the right order, so normally you won't want to call this method.
257              
258             However, you may want to, so here is what you can do.
259              
260              
261             my $mbox = Spread::Message->new(
262             name => $name,
263             spread_name => '4803@localhost',
264             group => ['polling-ctl', 'polling-data'],
265             member_sub => \&my_memeber_callback,
266             message_sub => \&my_message_callback,
267             error_sub => \&my_error_callback,
268             timeout_sub => \&my_timeout_callback,
269             debug => 0,
270             );
271             die "Can't create a new message object" unless $mbox;
272              
273             # stuff happens
274              
275             # Here we change the membership message call back at run time
276             $mbox->configure(member_sub => \&new_callback);
277              
278             # more stuff happens and we eventually disconnect and reconnect
279             # to a different spread daemon. Same groups and call backs
280             $mbox->disconnect();
281             $mbox->configure( spread_name => '4803@newhost' );
282             $mbox->connect() || warn "Failed to attach to 4803@newhost";
283              
284             # Change the debugging on the fly
285             $mbox->configure( debug => 1 );
286             $mbox->debug(1);
287              
288             Configure defaults to:
289              
290             name => "pid$$"
291             group => ['info']
292             debug => 0
293             spread_name => '4803@localhost'
294             member_sub => sub { print something useful };
295             message_sub => sub { print something useful };
296             error_sub => sub { print something useful };
297             timeout_sub => sub { print something useful };
298              
299             You don't have to have callbacks defined. You can still use B<get> and
300             B<getmsg> to collect messages. Callbacks are only used when B<rx> is
301             called.
302              
303             If you intend to use callbacks and B then consider configuring
304             your own command callbacks that will get triggered when a particular
305             command is received.
306              
307             my $mbox = Spread::Message->new(
308             name => $name,
309             spread_name => '4803@localhost',
310             group => ['polling-ctl', 'polling-data'],
311             commands => {
312             'default' => \&mysub,
313             },
314             debug => 0,
315             );
316             die "Can't create a new message object" unless $mbox;
317              
318             # stuff happens
319              
320             # Here we change the command control back to the bundled
321             # handle_commands_aimed_at_me sub.
322             $mbox->configure(
323             commands => {
324             'override' => \&Spread::Message::handle_commands_aimed_at_me
325             }
326             );
327              
328             =cut
329              
# Apply configuration to the object.
#
# Arguments are key => value pairs:
#   group, logto                  array references
#   debug, name, spread_name      plain scalars
#   member_sub, message_sub,
#   error_sub, timeout_sub        CODE references
#   commands                      hash reference of command name => CODE ref
#
# A value of the wrong type is reported with warn and ignored; an unknown
# key is reported with warn.  Nothing here dies.
#
# Returns the object's contents as a flat key/value list.
sub configure
{
    my $self   = shift;
    my %config = @_;

    my @array_ref = qw/group logto/;
    my @scalar    = qw/debug name spread_name/;
    my @sub       = qw/member_sub message_sub error_sub timeout_sub/;
    my @hash      = qw/commands/;

    # Configure subroutine callbacks
    foreach my $key (@sub)
    {
        if (defined $config{$key})
        {
            if (ref($config{$key}) ne 'CODE')
            {
                warn "config variable $key should be a code reference. Skipping\n";
                next;
            }

            # Assign the new call back
            $self->{$key} = $config{$key};
        }
        elsif (!defined $self->{$key})
        {
            # Nothing supplied and nothing installed yet: default to the
            # sub of the same name with a leading underscore, looked up
            # through a symbolic reference.
            no strict 'refs';
            my $sub = "_" . $key;
            $self->{$key} = \&$sub;
        }
    }

    # Configure commands. Note: callbacks get overridden here!
    if (defined $config{'commands'})
    {
        if (ref($config{'commands'}) eq 'HASH')
        {
            my $hashref = $config{'commands'};
            for my $cmd (keys %$hashref)
            {
                if (ref($hashref->{$cmd}) eq 'CODE')
                {
                    $self->{'commands'}{$cmd} = $hashref->{$cmd};
                }
                else
                {
                    warn "commands hash key: $cmd needs a ref to CODE";
                }
            }

            # Route every message callback through the command dispatcher.
            foreach my $key (@sub)
            {
                $self->{$key} = \&handle_commands_aimed_at_me;
            }
        }
        else
        {
            # This check is not inside a loop, so it must not use 'next';
            # just report the bad value and carry on with the other keys.
            warn "config variable commands should be a hash reference. Skipping\n";
        }
    }

    # Configure array reference variables
    foreach my $key (@array_ref)
    {
        if (defined $config{$key} && ref($config{$key}) ne 'ARRAY')
        {
            warn "config variable $key should be an array reference. Skipping\n";
            next;
        }
        $self->{$key} = $config{$key} if defined $config{$key};

        # Make sure array reference variables have reasonable values.
        # NOTE(review): because this installs [] first, the ['info']
        # defaults below only take effect when a non-array value was
        # rejected above - confirm which default is intended.
        $self->{$key} = [] unless defined $self->{$key};
    }

    # Configure scalar variables
    foreach my $key (@scalar)
    {
        next unless defined $config{$key};
        if (ref($config{$key}))
        {
            warn "config variable $key shouldn't be a reference. Skipping\n";
            next;
        }
        $self->{$key} = $config{$key};
    }

    # Some reasonable defaults
    $self->{'name'}  = "pid$$"  unless defined $self->{'name'};
    $self->{'debug'} = 0        unless defined $self->{'debug'};
    $self->{'group'} = ['info'] unless defined $self->{'group'};
    $self->{'logto'} = ['info'] unless defined $self->{'logto'};

    # Only initialise the mailbox slot; overwriting it on every call would
    # throw away the handle of a live connection whenever a setting is
    # changed at run time.
    $self->{'mbox'} = 0 unless defined $self->{'mbox'};

    # Need a daemon to connect to.
    $self->{'spread_name'} = '4803@localhost' unless defined $self->{'spread_name'};

    # Do a simple test on configuration details handed in. Are they valid?
    foreach my $ckey (keys %config)
    {
        next if grep($ckey eq $_, @array_ref, @scalar, @sub, @hash);
        warn "configure: unknown configuration variable $ckey\n";
    }

    return %$self;
}
438              
439              
440             =head2 B<connect>
441              
442             Connect a Spread::Message object to a Spread Daemon and join any groups
443             that have been configured. You almost always need to use this method. It
444             is called by you after B<new> when you first create an object.
445              
446             $mbox->connect();
447              
448             You may wish to call this method if you B<disconnect> and later wish to
449             reconnect to the same or another Spread daemon.
450              
451             =cut
452              
# Connect to the configured Spread daemon under the configured private
# name, remember the private group handed back, and join the configured
# groups.
#
# Returns the mailbox on success.  On failure it warns, stores 0 as the
# mailbox and returns 0.
sub connect
{
    my $self = shift;
    my $name = $self->name;

    my ($mbox, $private_group) = Spread::connect(
        {
            spread_name  => $self->spread_name,
            private_name => $name,
        }
    );

    # Decide on the returned mailbox rather than on $sperrno: $sperrno may
    # still hold the code of an earlier, unrelated failure, which would
    # make a good connection look broken.
    unless (defined $mbox)
    {
        warn "Failed to connect to Spread daemon: $sperrno\n";
        $self->mbox(0);
        return 0;
    }

    $self->{'private_group'} = $private_group;
    $self->mbox($mbox);

    # Join into our groups if we have some to join
    $self->join();

    return $mbox;
}
480              
481              
482             =head2 B<join>
483              
484             Join any groups that have been configured.
485              
486             $mbox->join(); # Joins configured groups
487             $mbox->join('test'); # Joins the test group
488              
489             Note: connect will join groups configured for you. So don't call
490             join unless you need to.
491              
492             To find out what groups you have already joined use
493              
494             my @joined_grps = $mbox->joined;
495              
496             =cut
497              
# Join the named groups, or the configured 'group' list when called
# without arguments.
#
# Warns when at least one group could not be joined.  The groups that were
# joined are appended to the list kept by the 'joined' accessor.
#
# Returns the updated joined list.  When there is no mailbox or nothing to
# join, returns an empty list in list context and 0 otherwise.
sub join
{
    my ($self, @groups) = @_;
    my $mbox = $self->mbox();

    @groups = @{ $self->{'group'} } unless @groups;
    my @current = $self->joined;

    return wantarray ? () : 0 unless @groups && $mbox;

    # Try every group and keep the ones Spread accepted.
    my @accepted;
    for my $grp (@groups)
    {
        push @accepted, $grp if Spread::join($mbox, $grp);
    }

    if (scalar(@accepted) != scalar(@groups))
    {
        warn "Failed to join one or more groups: $sperrno\n";
    }

    my @joined_groups = (@current, @accepted);
    $self->joined(\@joined_groups);

    return @joined_groups;
}
524              
525              
526             =head2 B<leave>
527              
528             Leave one or more groups we have joined previously
529              
530             $mbox->leave(@grps);
531              
532             =cut
533              
# Leave the named groups, or the configured 'group' list when called
# without arguments.
#
# A group that is not in the joined list is skipped with a warning, and a
# failed Spread::leave is reported with a warning.  For every group that
# was left, its entry under 'members' is removed and it is dropped from
# the joined list.
#
# Returns the list of groups that were left, or 0 when there is no mailbox
# or nothing has been joined.
sub leave
{
    my ($self, @groups) = @_;

    @groups = @{ $self->{'group'} } unless @groups;

    my $mbox = $self->mbox() || return 0;

    my @joined = $self->joined();
    return 0 unless @joined;

    my @left;
    GROUP:
    for my $grp (@groups)
    {
        my $is_joined = grep { $_ eq $grp } @joined;
        if (!$is_joined)
        {
            warn "Can't leave $grp. Not joined!\n";
            next GROUP;
        }

        if (!Spread::leave($mbox, $grp))
        {
            warn "Failed to leave group $grp: $sperrno\n";
            next GROUP;
        }

        push @left, $grp;
        delete $self->{'members'}{$grp} if defined $self->{'members'}{$grp};
        @joined = grep { $_ ne $grp } @joined;    # Remove group
    }

    $self->joined(\@joined);    # Update what is left
    return @left;
}
569              
570              
571             =head2 B<send>
572              
573             Send a message to set of group/s
574              
575             $mbox->send(@grps,$msg);
576              
577             =cut
578              
# Send a message that has already been cut into pieces.
#
#   $msg  array reference holding the pieces, in order
#   @grp  groups to send to
#
# Every piece is sent through send() with a header line of the form
# "Spread::Message part I of N", where I counts from 0 and N is the index
# of the last piece, so that the receiver can put the message back
# together.
#
# Returns undef when there is no mailbox, 0 as soon as one piece fails to
# go out (the remaining pieces are not sent), otherwise the sum of the
# values returned by send().
sub sendall
{
    my ($self, $msg, @grp) = @_;

    return undef unless $self->{'mbox'};

    # Index of the last piece.  Dereference the array reference itself;
    # the old array-as-reference spelling is not accepted by current perls.
    my $num = $#{$msg};

    my $total = 0;

    # These are guaranteed to arrive in order. Thanks Spread :-)
    for my $i (0 .. $num)
    {
        $self->logit("Sending partial message $i of $num to: ",CORE::join(",",@grp),"\n") if $self->debug;
        my $sent = $self->send(@grp, "Spread::Message part $i of $num\n" . $msg->[$i]);

        # Stop at the first failure so that a message with a hole in it
        # is never completed on the receiving side.
        return 0 unless $sent;
        $total += $sent;
    }

    return $total;
}
596              
# Send a message to one or more groups.
#
#   $mbox->send(@grps, $msg);     # the message is the LAST argument
#
# The message goes out with agreed ordering.  The sender does not get its
# own message back unless its own name (as returned by me) is one of the
# destinations.  A message longer than 100K is cut into 90K pieces and
# handed to sendall().
#
# Returns undef when there is no mailbox, 0 when no group was given or the
# multicast failed, otherwise what Spread::multicast (or sendall, for a
# large message) returned.
sub send
{
    my $self = shift;
    return undef unless $self->{'mbox'};

    my $msg  = pop(@_);    # Message is last param
    my @grps = @_;
    unless (@grps)
    {
        warn "Nothing sent as no groups to send to";
        return 0;
    }

    if (length($msg) > 100 * 1024)
    {
        warn "send -- message big [", length($msg), "] chopping\n" if $self->debug;

        # Chop into 90K chunks.  substr keeps every byte; an unpack "A"
        # template would strip trailing spaces and NULs from each chunk and
        # so change the message.
        my $size = 90 * 1024;
        my @chunks;
        for (my $offset = 0; $offset < length($msg); $offset += $size)
        {
            push @chunks, substr($msg, $offset, $size);
        }
        return $self->sendall(\@chunks, @grps);
    }

    my $mbox = $self->mbox;

    # Use agreed ordering and we don't want to see what was sent
    my $type = AGREED_MESS | SELF_DISCARD;

    # Check to see if they DO want to see the message
    if (grep($self->me eq $_, @grps))
    {
        $type = AGREED_MESS;
    }

    # A single destination is passed as a plain scalar, several as an
    # array reference.
    my $rtn = @grps > 1
        ? Spread::multicast($mbox, $type, [@grps],  0, $msg)
        : Spread::multicast($mbox, $type, $grps[0], 0, $msg);

    unless (defined $rtn)
    {
        warn "Failed to send data - $sperrno\n";
        return 0;
    }

    if ($self->debug)
    {
        warn "Sent ", $msg, "\n";
    }

    return $rtn;
}
654              
655              
656             =head2 B<sends>
657              
658             Send a message to set of group/s
659              
660             $mbox->sends(@grps,$msg);
661              
662             Note $msg is run through B<serialise> so that B<sends> can be used to send
663             Perl code between processes.
664              
665             =cut
666              
667              
# Serialise a Perl structure and send it to one or more groups.
#
#   $mbox->sends(@grps, $ref);    # the structure is the LAST argument
#
# The structure is turned into text by serialise() and the text is handed
# to send().
#
# Returns undef when there is no mailbox, otherwise whatever send()
# returns.
sub sends
{
    my $self = shift;
    return undef unless $self->{'mbox'};

    my $ref  = pop(@_);    # Message is last param
    my @grps = @_;

    return $self->send(@grps, $self->serialise($ref));
}
681              
682             =head2 B<logit>
683              
684             Send a message to set of logto group/s
685              
686             $mbox->configure( logto => ['a','b'] );
687             or
688             $mbox->logto('a','b');
689              
690             $mbox->logit($msg); # Send the txt message
691              
692             You set the groups/addresses you want the messages sent to by configuring
693             the B<logto> variable.
694              
695             The message is formatted such that the process id and hostname are
696             prepended to the message. Much like this:
697              
698             Tue Jul 29 18:12:20 2003:[19239@localhost] Got status message
699              
700             =cut
701              
702              
# Emit a log line.
#
# The text is prefixed with the local time, the process id and host name,
# the program name and this connection's own name (as returned by me).
# When the logto accessor returns destinations the line is sent to them
# with send(); otherwise it is written with warn.
#
# Returns whatever send() returns, or nothing when the line went to warn.
sub logit (@)
{
    my ($self, @text) = @_;
    my $host = hostname;

    my $prefix = sprintf(
        '%s:[%s@%s]:%s{%s} ',
        scalar(localtime), $$, $host, $Program_Name, $self->me
    );

    my @destinations = $self->logto;
    unless (@destinations)
    {
        warn $prefix, @text;
        return;
    }

    return $self->send(@destinations, CORE::join("", $prefix, @text));
}
719              
720              
721              
722             =head2 B<decode>
723              
724             Decode a message that has been sent using B<sends>.
725              
726             my $msg = $mbox->decode() || die "Can't decode";
727             print "The command is: ", $msg->{'cmd'}, "\n";
728             print "The structure is: ", Dumper($mbox->command), "\n";
729              
730             As a side effect the variable $mbox->command() is set to hold the Perl
731             structure returned as a result of the decode.
732              
733             See FINE GRAINED CALLBACKS below for further details.
734              
735             =cut
736              
# Rebuild the Perl structure carried by the current message.
#
# The message text is evaluated as Perl.  Text produced by serialise()
# assigns to a variable called $msg, which is why the local variable below
# must keep exactly that name.
#
# On success the result is stored through the command accessor and
# returned.  When the text does not evaluate cleanly a stack trace is
# printed with cluck and nothing is returned.  Returns undef when there is
# no mailbox.
#
# NOTE(review): this runs text received from other processes through a
# string eval; anyone able to send to a joined group can run code here.
# Confirm that is acceptable for the deployment.
sub decode
{
    my $self = shift;
    return undef unless $self->{'mbox'};

    my $msg = $self->msg;

    # Decode message
    eval $msg;
    unless ($@)
    {
        $self->command($msg);
        return $msg;
    }

    cluck "Bad perl code seen $msg";
    return;
}
749              
750             =head2 B<disconnect>
751              
752             Disconnect from the Spread Daemon and reset internal states. The Basic
753             configuration remains however all details of the Spread connection are
754             lost.
755              
756             $mbox->disconnect();
757              
758             =cut
759              
760              
761              
# Disconnect from the Spread daemon and reset the per-connection state.
#
# A failing Spread::disconnect is only reported with warn; the state is
# reset either way: the mailbox becomes 0, the 'members' entry is removed,
# the message slots are cleared and the group lists are emptied.
#
# Returns undef when there is no mailbox, otherwise the result of the
# final joined() call.
sub disconnect
{
    my $self = shift;
    return undef unless $self->{'mbox'};

    warn "disconnect -- $sperrno\n" unless Spread::disconnect($self->mbox);

    $self->mbox(0);
    delete $self->{'members'} if defined $self->{'members'};

    # Slots that are simply reset to 0
    for my $slot (qw(type sender mess_type endian error timeout new_msg))
    {
        $self->$slot(0);
    }
    $self->msg('');

    # grps and joined are handed the same empty array
    my @none = ();
    $self->grps(\@none);
    return $self->joined(\@none);
}
788              
789             =head2 B<poll>
790              
791             Poll to see if there is a new message waiting for picking up. Returns the
792             size of the message waiting.
793              
794             if($mbox->poll())
795             {
796             # Have a message to pick up
797             }
798             else
799             {
800             # Have NO message to pick up
801             }
802              
803              
804             =cut
805              
806              
# Ask Spread whether a message is waiting.
#
# Returns the size reported by Spread::poll.  Returns undef when there is
# no mailbox or when Spread::poll reports nothing; the latter is also
# reported with warn.
sub poll
{
    my $self = shift;
    return undef unless $self->{'mbox'};

    my ($messsize) = Spread::poll($self->mbox);
    return $messsize if defined $messsize;

    warn "poll -- $sperrno\n";

    # Return explicitly: falling off the end here would hand back warn's
    # true return value, which a caller would read as "a message is
    # waiting".
    return undef;
}
822              
823              
824             =head2 B<get>
825              
826             Pick up the next data message in the queue. B<get> will loop until a
827             regular data message has been received. It calls B<getmsg>.
828              
829             # wait for a data message - this could be a while
830             my $msg = $mbox->get();
831              
832             =cut
833              
# Block until a regular data message has been received and return it.
#
# Messages are read with getmsg(1) until the object reports both a new
# message and a regular message.  Returns undef when there is no mailbox.
sub get
{
    my $self = shift;
    return undef unless $self->{'mbox'};

    # The first read is made in list context, every later one in scalar
    # context.
    my ($message) = $self->getmsg(1);

    while ( !( $self->new_msg && $self->Is_regular_mess() ) )
    {
        $message = $self->getmsg(1);
    }

    return $message;
}
849              
# Set all accessor slots from one Spread::receive() result and handle
# multi part messages.
#
# Called as
#   $s->setstate($service_type, $sender, $groups, $mess_type, $endian, $message)
#
# The outcome is reported through the accessors:
#   timeout  - $sperrno is 3: timeout(1), new_msg(0), error() set
#   failure  - any of the six values undefined: error($sperrno),
#              timeout(0), new_msg(0)
#   chunk    - a part of a multi part message that is not the last one is
#              appended to the per sender buffer: new_msg(0), error(0)
#   complete - an ordinary message, or the last part of a multi part
#              message: every slot is filled in and new_msg(1)
#
# Returns a true value when a complete message was stored and nothing
# otherwise (undef when there is no mailbox).
sub setstate
{
    my $s = shift;
    return undef unless $s->{'mbox'};

    my ($service_type, $sender, $groups, $mess_type, $endian, $message) = @_;

    $s->tm(time());

    # Check for timeout. 3 is the $sperrno value this module treats as a
    # receive timeout; it is cleared so the next call starts clean.
    # NOTE(review): the meaning of 3 comes from the Spread module - confirm.
    if ($sperrno == 3)
    {
        $s->error($sperrno);
        $s->new_msg(0);
        $s->timeout(1);
        $sperrno = 0;
        return;
    }

    unless(defined $service_type && defined $sender && defined $groups &&
           defined $mess_type && defined $endian && defined $message
          )
    {
        $s->error($sperrno);
        $s->timeout(0);
        $s->new_msg(0);
        return;
    }

    # Normalise the groups to a plain list. A value that is not a
    # reference is kept as a single group instead of dying under strict.
    my @grps = ref($groups) eq "SCALAR" ? ( $$groups )
             : ref($groups)             ? @$groups
             :                            ( $groups );

    # Is it a partial message? The header is only honoured as the very
    # first line of the body (that is the only place it was ever stripped
    # from), so an ordinary message that merely contains such a line
    # further down is no longer mistaken for a chunk. The header is
    # removed by the same match that recognises it.
    if( $message =~ s/\ASpread::Message part (\d+) of (\d+)[^\S\n]*(?:\n|\z)// )
    {
        my($part,$total) = ($1,$2);

        $s->logit("Got partial message part $part of $total from $sender\n") if $s->debug;

        if($part != $total) # Just store away this piece
        {
            # NOTE(review): the buffer is only reset by a part numbered 0;
            # confirm the sending side numbers its parts from 0.
            $s->{'partial'}{$sender} = '' if $part == 0;
            $s->{'partial'}{$sender} .= $message;
            $s->new_msg(0);
            $s->timeout(0);
            $s->error(0);   # a buffered chunk is not an error
            return;
        }

        # Final part: publish the reassembled message and drop the buffer
        $s->{'partial'}{$sender} .= $message;
        $message = delete $s->{'partial'}{$sender};
    }

    # Complete message (ordinary or reassembled)
    $s->type($service_type);
    $s->sender($sender);
    $s->grps(\@grps);
    $s->mess_type($mess_type);
    $s->endian($endian);
    $s->error(0);
    $s->timeout(0);
    $s->msg($message);

    return $s->new_msg(1);
}
940              
941              
942              
=head2 B<rx>

Receive the next message and trigger the matching callback. All other
arguments are passed on to the called routine.

    $mbox->rx($timeout,"loop 20");

Will have B<rx> wait for $timeout seconds and call one of the defined
callback methods with $mbox and "loop 20" in this example.

Every callback function can expect to receive at least one parameter,
which is the B<Spread::Message> object, and then any further parameters
as defined in the call to B<rx>.

The callback is selected in this order:

    timeout_sub   the receive timed out
    error_sub     nothing new was received and an error is recorded
    (none)        nothing new was received and no error is recorded,
                  e.g. a chunk of a multi part message was stored
    member_sub    a membership message was received
    message_sub   a regular message was received
    error_sub     anything else, e.g. a rejected message

B<rx> will return whatever the callback returns, and nothing when no
callback was called.

=cut

sub rx
{
    my $self = shift;
    return undef unless $self->{'mbox'};

    my $timeout = shift;

    # getmsg() records everything in the object; its return value is not
    # needed here.
    $self->getmsg($timeout);

    # Check for timeouts first
    return $self->{'timeout_sub'}->($self,@_) if $self->timeout;

    # Nothing new arrived. The message slots still describe the previous
    # message, so dispatching on them would deliver that message again.
    unless($self->new_msg)
    {
        return $self->{'error_sub'}->($self,@_) if $self->error;
        return;
    }

    # handle member messages first
    if($self->Is_membership_mess)
    {
        return $self->{'member_sub'}->($self,@_);
    }
    # handle Regular message
    elsif($self->Is_regular_mess)
    {
        return $self->{'message_sub'}->($self,@_);
    }

    # Only have errors left
    return $self->{'error_sub'}->($self,@_);
}
994              
995              
996              
=head2 B<getmsg>

Get the next message from our queue and set the current state details
accordingly. All the ACCESSOR functions below will be updated.

    my $msg = $mbox->getmsg($timeout)

or

    $mbox->getmsg($timeout);
    my $msg = $mbox->msg;

or

    $mbox->getmsg($timeout);
    if($mbox->new_msg)
    {
        my $msg = $mbox->msg;
    }

B<getmsg> will return the next message only if there is one to return.
Otherwise it returns a null string. A missing or false $timeout is
replaced by 5.

With debugging turned on B<getmsg> will also print details of messages
received. All of these details are written with warn.

=cut

sub getmsg
{
    my $self = shift;
    return undef unless $self->{'mbox'};

    # A false timeout (undef, 0 or '') falls back to 5
    my $wait = shift || 5;

    $self->setstate(Spread::receive($self->{'mbox'},$wait));
    if($self->debug && $self->new_msg)
    {
        my @grps = $self->grps;
        @grps = (' ') unless defined $grps[0];

        # The three reports only differ in their title, two labels and
        # whether the message body is shown.
        my $regular    = $self->Is_regular_mess;
        my $membership = !$regular && $self->Is_membership_mess;

        if($regular)
        {
            warn "** Regular Message received **\n";
        }
        elsif($membership)
        {
            warn "** Membership Message received **\n";
        }
        else
        {
            warn "** Unknown Message received **\n";
        }

        warn "Service Type : ",$self->type,"\n";
        warn(($membership ? "For group : " : "Sender : "),$self->sender,"\n");
        warn "Sent to : ", CORE::join(",",@grps),"\n";
        warn(($membership ? "I member number : " : "Message Type : "),$self->mess_type,"\n");
        warn "Endian Missmatch : ",$self->endian ? "Yes" : "No" ,"\n";
        # Was a print (STDOUT) in the unknown message report only
        warn "I am : ",$self->me,"\n";
        warn "Message : ",$self->msg,"\n" if !$membership && $self->debug > 1;
    }

    # Why this message
    my $txt = '';

    # grps holds the complete membership list for this sender. So store
    # it away for later query by members() function
    if($self->Is_reg_memb_mess) # Regular membership message
    {
        my $group = $self->sender;      # Group it affects
        my @membership = $self->grps;   # Who got the message
        $self->{'members'}{$group} = \@membership;

        # Also pick apart the message body: three integers of group id,
        # the number of groups, then the names (padded with control
        # characters).
        my $msg = $self->msg;
        $msg = '' unless defined $msg;
        my($gid0,$gid1,$gid2,$numg,$who) = unpack("IIIIa*",$msg);
        my @gid = ($gid0,$gid1,$gid2);

        $who = '' unless defined $who;  # body too short to hold a name
        $who =~ s/[[:cntrl:]]+/ /g;     # Just to clean it up
        $who =~ s/\s+$//;               # No space at end thanks

        # Establish why this message was received
        $txt = "$who joining" if $self->Is_caused_join_mess;
        $txt = "$who leaving" if $self->Is_caused_leave_mess;
        $txt = "$who disconnecting" if $self->Is_caused_disconnect_mess;
        $txt = "Network change" if $self->Is_caused_network_mess;
        if($self->debug)
        {
            warn "groupID = @gid, Num grps in msg = $numg\n";
            warn $txt,"\n";
        }
    }
    elsif($self->Is_transition_mess)
    {
        $txt = 'Transition for group '.$self->sender."\n";
    }
    elsif($self->Is_caused_leave_mess)
    {
        $txt = 'membership message that left group '.$self->sender."\n";
    }
    elsif($self->Is_reject_mess)
    {
        $txt = 'Reject from '.$self->sender."\n";
    }
    elsif($self->Is_regular_mess)
    {
        $txt = "regular message\n";
    }
    else
    {
        $txt = "Error unknown message\n";
    }
    $self->reason($txt);

    return $self->new_msg ? $self->msg : '';
}
1132              
# Decode the 32 bit unsigned integer stored most significant byte first
# in $buf, starting at byte $offset (0 when omitted).
#
# Returns the integer, or undef when $buf is undefined, $offset is
# negative or fewer than four bytes are available at $offset.
sub to_int
{
    my($buf,$offset) = @_;
    $offset = 0 unless defined $offset;

    return undef
        if !defined $buf || $offset < 0 || length($buf) < $offset + 4;

    # "N" reads four consecutive bytes in big-endian order, which is the
    # byte order the shifts by 24, 16, 8 and 0 were meant to implement.
    return unpack("N", substr($buf,$offset,4));
}
1139              
# Return the member names stored for groups (filled in by getmsg() from
# regular membership messages).
#
#   $mbox->members('a','b')  members of the named groups, in the order
#                            the groups were given; unknown groups are
#                            skipped
#   $mbox->members()         members of every known group
sub members
{
    my ($self, @wanted) = @_;

    my @groups = @wanted
        ? (grep { defined $self->{'members'}{$_} } @wanted)
        : (keys %{$self->{'members'}});

    return map { @{$self->{'members'}{$_}} } @groups;
}
1166              
1167             =head1 ACCESSORS
1168              
1169             =over
1170              
=item B<mbox> - return the current Spread Mailbox connection id

=cut

sub mbox
{
    my ($self, $value) = @_;

    # A defined argument replaces the stored value first
    $self->{'mbox'} = $value if defined $value;
    return $self->{'mbox'};
}
1180              
=item B<grps> - return the current groups the last message was sent to

Takes an optional array reference to store; always returns a list.

=cut

sub grps
{
    my ($self, $groups) = @_;

    $self->{'last_groups'} = $groups if defined $groups;

    return () unless defined $self->{'last_groups'};
    return @{$self->{'last_groups'}};
}
1190              
1191              
=item B<joined> - return the current groups we have joined successfully

Takes an optional array reference to store; always returns a list.

=cut

sub joined
{
    my ($self, $groups) = @_;

    $self->{'joined'} = $groups if defined $groups;

    return () unless defined $self->{'joined'};
    return @{$self->{'joined'}};
}
1201              
=item B<logto> - return the current groups we will log to

Takes an optional array reference to store; always returns a list, which
is empty when no groups have been stored.

=cut

sub logto
{
    $_[0]->{'logto'} = $_[1] if defined $_[1];

    # Guard the dereference like grps() and joined() do: under strict an
    # unset slot would otherwise be a fatal error.
    return defined $_[0]->{'logto'} ? @{$_[0]->{'logto'}} : ();
}
1211              
=item B<sender> - return the sender of the last message.

=cut

sub sender
{
    my ($self, $value) = @_;

    $self->{'last_sender'} = $value if defined $value;
    return $self->{'last_sender'};
}
1221              
=item B<type> - return the service type of the last message.

The result is the name of the first of these tests that is true for the
last message, or the stored service type itself when none of them is:

    Is_regular_mess
    Is_transition_mess
    Is_reg_memb_mess
    Is_self_leave

'no type defined' is returned while no service type has been stored.

=cut

sub type
{
    my ($self, $value) = @_;

    $self->{'last_service_type'} = $value if defined $value;
    return 'no type defined' unless defined $self->{'last_service_type'};

    # Is_transition_mess (membership transition): sender will be set to
    #   the name of the group for which the membership change is
    #   occurring. The importance of the TRANS_MEMB_MESS is that it tells
    #   the application that all messages received after it and before
    #   the REG_MEMB_MESS for the same group are 'clean up' messages to
    #   put the messages in a consistent state before actually changing
    #   memberships.
    # Is_reg_memb_mess (regular membership message): groups array will be
    #   set to the private group names of all members of this group in
    #   the new membership.
    foreach my $test (qw(Is_regular_mess Is_transition_mess
                         Is_reg_memb_mess Is_self_leave))
    {
        return $test if $self->$test;
    }

    return $self->{'last_service_type'};
}
1260              
=item B<mess_type> - return the message type of the last message.

=cut

sub mess_type
{
    my ($self, $value) = @_;

    $self->{'last_mess_type'} = $value if defined $value;
    return $self->{'last_mess_type'};
}
1270              
1271              
=item B<reason> - return the reason we got the last message

One of:

    "$who joining"
    "$who leaving"
    "$who disconnecting"
    "Network change"
    'Transition for group '.$self->sender
    'membership message that left group '.$self->sender
    'Reject from '.$self->sender
    "regular message"
    "Error unknown message"

=cut

sub reason
{
    my ($self, $text) = @_;

    $self->{'reason'} = $text if defined $text;
    return $self->{'reason'};
}
1290              
=item B<endian> - return the endian flag of the last message

getmsg() reports a true value as "Endian Missmatch : Yes".

=cut

sub endian
{
    my ($self, $flag) = @_;

    $self->{'last_endian'} = $flag if defined $flag;
    return $self->{'last_endian'};
}
1300              
=item B<msg> - return the last message.

=cut

sub msg
{
    my ($self, $body) = @_;

    $self->{'last_message'} = $body if defined $body;
    return $self->{'last_message'};
}
1310              
=item B<command> - return the last Perl structure decoded using the
B<decode> method.

=cut

sub command
{
    my ($self, $structure) = @_;

    $self->{'command'} = $structure if defined $structure;
    return $self->{'command'};
}
1321              
=item B<new_msg> - return true if the last message was a new message;
indicates an error when false

=cut

sub new_msg
{
    my ($self, $flag) = @_;

    $self->{'new_message'} = $flag if defined $flag;

    # Only the exact value 1 counts as "new"
    my $is_new = ($self->{'new_message'} == 1);
    return $is_new;
}
1332              
=item B<tm> - return the time the last message was received

=cut

sub tm
{
    my ($self, $when) = @_;

    $self->{'last_time'} = $when if defined $when;
    return $self->{'last_time'};
}
1342              
=item B<timeout> - did the last rx() call time out?

=cut

sub timeout
{
    my ($self, $flag) = @_;

    $self->{'timeout'} = $flag if defined $flag;
    return $self->{'timeout'};
}
1352              
1353              
=item B<error> - return the last error as defined by Spread (B<$sperrno>)

=cut

sub error
{
    my ($self, $code) = @_;

    $self->{'error'} = $code if defined $code;
    return $self->{'error'};
}
1363              
=item B<me> - return my name as Spread knows it. This is needed to work out
if a message was sent to me directly rather than via a group. It is
effectively my private group name.

=cut

# private group
sub me
{
    my ($self, $private_group) = @_;

    $self->{'private_group'} = $private_group if defined $private_group;
    return $self->{'private_group'};
}
1376              
=item B<spread_name> - return the Spread daemon details

=cut

# Spread daemon to connect to
sub spread_name
{
    my ($self, $daemon) = @_;

    $self->{'spread_name'} = $daemon if defined $daemon;
    return $self->{'spread_name'};
}
1387              
=item B<name> - return our defined name used when we first connected.

=cut

sub name
{
    my ($self, $value) = @_;

    $self->{'name'} = $value if defined $value;
    return $self->{'name'};
}
1397              
=item B<debug> - return our debug level

=cut

sub debug
{
    my ($self, $level) = @_;

    $self->{'debug'} = $level if defined $level;
    return $self->{'debug'};
}
1407              
1408              
=item B<control_msg> - Is the current message a control message for
me.

That is, does this message emanate from a .*-ctl group that I am joined
to OR is it directed specifically at me.

=cut

sub control_msg
{
    my ($self) = @_;
    return undef unless $self->{'mbox'};

    my $me      = $self->me;
    my @targets = $self->grps;

    # True if it is a regular message and was sent to a control group or
    # to my private group
    return $self->Is_regular_mess &&
           (grep(/-ctl$/,@targets) || grep($_ eq $me,@targets));
}
1429              
1430              
=item B<aimed_at_me> - Is the previous message aimed specifically at me

True when my private group name (see B<me>) is among the groups the last
message was sent to.

=cut

sub aimed_at_me
{
    my ($self) = @_;
    return undef unless $self->{'mbox'};

    my $private_group = $self->me;

    return grep { $_ eq $private_group } $self->grps;
}
1444              
1445             =back
1446              
1447             These methods return details of the current message. See the Spread
1448             documentation for further details.
1449              
1450             =over
1451              
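=item B<Is_unreliable_mess>

=item B<Is_reliable_mess>

=item B<Is_fifo_mess>

=item B<Is_causal_mess>

=item B<Is_agreed_mess>

=item B<Is_safe_mess>

=item B<Is_regular_mess>

=item B<Is_self_discard>

=item B<Is_reg_memb_mess>

=item B<Is_transition_mess>

=item B<Is_caused_join_mess>

=item B<Is_caused_leave_mess>

=item B<Is_caused_disconnect_mess>

=item B<Is_caused_network_mess>

=item B<Is_membership_mess>

=item B<Is_reject_mess>

=item B<Is_self_leave>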
1485              
1486             =back
1487              
1488             =cut
1489              
# Service type tests. Every test looks at the service type of the last
# message (the 'last_service_type' slot) and returns a true value when
# the named Spread flag is set in it. Is_regular_mess and
# Is_membership_mess are additionally false for rejected messages, and
# Is_self_leave is a leave that is neither a regular membership nor a
# transition message.

sub Is_unreliable_mess
{
    my ($self) = @_;
    return $self->{'last_service_type'} & UNRELIABLE_MESS;
}

sub Is_reliable_mess
{
    my ($self) = @_;
    return $self->{'last_service_type'} & RELIABLE_MESS;
}

sub Is_fifo_mess
{
    my ($self) = @_;
    return $self->{'last_service_type'} & FIFO_MESS;
}

sub Is_causal_mess
{
    my ($self) = @_;
    return $self->{'last_service_type'} & CAUSAL_MESS;
}

sub Is_agreed_mess
{
    my ($self) = @_;
    return $self->{'last_service_type'} & AGREED_MESS;
}

sub Is_safe_mess
{
    my ($self) = @_;
    return $self->{'last_service_type'} & SAFE_MESS;
}

sub Is_regular_mess
{
    my ($self) = @_;
    return ($self->{'last_service_type'} & REGULAR_MESS)
        && !($self->{'last_service_type'} & REJECT_MESS);
}

sub Is_self_discard
{
    my ($self) = @_;
    return $self->{'last_service_type'} & SELF_DISCARD;
}

sub Is_reg_memb_mess
{
    my ($self) = @_;
    return $self->{'last_service_type'} & REG_MEMB_MESS;
}

sub Is_transition_mess
{
    my ($self) = @_;
    return $self->{'last_service_type'} & TRANSITION_MESS;
}

sub Is_caused_join_mess
{
    my ($self) = @_;
    return $self->{'last_service_type'} & CAUSED_BY_JOIN;
}

sub Is_caused_leave_mess
{
    my ($self) = @_;
    return $self->{'last_service_type'} & CAUSED_BY_LEAVE;
}

sub Is_caused_disconnect_mess
{
    my ($self) = @_;
    return $self->{'last_service_type'} & CAUSED_BY_DISCONNECT;
}

sub Is_caused_network_mess
{
    my ($self) = @_;
    return $self->{'last_service_type'} & CAUSED_BY_NETWORK;
}

sub Is_membership_mess
{
    my ($self) = @_;
    return ($self->{'last_service_type'} & MEMBERSHIP_MESS)
        && !($self->{'last_service_type'} & REJECT_MESS);
}

sub Is_reject_mess
{
    my ($self) = @_;
    return $self->{'last_service_type'} & REJECT_MESS;
}

sub Is_self_leave
{
    my ($self) = @_;
    return ($self->{'last_service_type'} & CAUSED_BY_LEAVE)
        && !($self->{'last_service_type'} & (REG_MEMB_MESS | TRANSITION_MESS));
}
1507              
1508             =head1 CALLBACKS
1509              
Some very simple callbacks are provided. You should override these when
calling B<new>.
1512              
1513             They basically print out a little information and then return. These are
1514             defined as:
1515              
1516             Spread::Message::_member_sub
1517              
1518             Spread::Message::_message_sub
1519              
1520             Spread::Message::_error_sub
1521              
1522             Spread::Message::_timeout_sub
1523              
1524             You can use them if you like. But I wouldn't :-)
1525              
1526             =cut
1527              
1528              
# Default membership callback: prints a timestamp, the reason for the
# message, the groups of the last message (when there are any), the
# groups this connection has joined and the known members of each of
# those groups. Arguments after the object are ignored.
sub _member_sub
{
    my ($mbox) = @_;

    print scalar(localtime),": recieved a membership message\n";
    print scalar(localtime),": Because ",$mbox->reason,"\n";

    my @current = $mbox->grps;
    if(defined $current[0])
    {
        print "Current grps are: ", CORE::join(", ",@current),"\n";
    }

    my @joined = $mbox->joined;
    print "I have joined these groups: @joined\n";
    foreach my $group (@joined)
    {
        print "\t$group => ", CORE::join(", ",$mbox->members($group)), "\n";
    }
}
1543              
# Default message callback: prints a timestamp, a dump of any extra
# arguments handed on by the caller and the text of the last message.
sub _message_sub
{
    my ($mbox, @args) = @_;

    print scalar(localtime),": recieved a message\n";
    if(@args)
    {
        print "Args are:", Dumper(\@args),"\n";
    }
    print "Message was: >>",$mbox->msg,"<<\n";
}
1553              
# Default error callback: prints a timestamp, a dump of any extra
# arguments handed on by the caller and the text of the last message.
sub _error_sub
{
    my ($mbox, @args) = @_;

    print scalar(localtime),": Error callback triggered\n";
    if(@args)
    {
        print "Args are:", Dumper(\@args),"\n";
    }
    print "Message was: >>",$mbox->msg,"<<\n";
}
1563              
# Default timeout callback: prints a timestamp and a dump of any extra
# arguments handed on by the caller.
sub _timeout_sub
{
    my ($mbox, @extra) = @_;

    print scalar(localtime),": Timeout callback triggered\n";
    print "Args are:", Dumper(\@extra),"\n" if @extra;
}
1572              
1573              
1574             =head1 FINE GRAINED CALLBACKS
1575              
Some fine grained callback subs are provided that you can extend. This
makes creating Message programs a little easier. We provide a simple
command interpreter that can handle commands sent to us using the
B<sends> method. It assumes the messages sent are done in this form:
1580              
1581             %msg = (
1582             cmd => 'some sort of command',
1583             .
1584             .
1585             );
1586              
The only requirement is that the hashref sent to B<sends> has a key
called B<cmd>, and that B<cmd> contains a valid command name to call.
Also, you must send the message with B<sends> to a specific Spread user,
not to a group. That is, B<aimed_at_me> must return true when the message
is received.
1592              
1593             We automatically handle commands where cmd is:
1594              
1595             shut or stop or die => program dies
1596             restart => program restarts itself
1597             clone => program creates another copy of self
1598             status => program sends() status info
1599              
1600             It assumes you have defined a 'default' function. If not then a message
1601             is printed.
1602              
1603             You can define your own commands to
1604             override the ones we provide. Or you can provide a single 'override'
1605             function. This is done like this:
1606              
1607             In the receiving application:
1608              
1609             use Data::Dumper;
1610              
1611             sub new
1612             {
1613             # We get the Spread::Message object and any args sent to rx()
1614             my($mbox,@args) = @_;
1615              
1616             # pick up decode command
1617             my %msg = %{$mbox->command};
1618            
1619             print "new() called with args @args\n";
1620             print "and message >",$mbox->msg,"<\n";
1621             }
1622              
1623             sub mydefault
1624             {
1625             # We get the Spread::Message object and any args sent to rx()
1626             my($mbox,@args) = @_;
1627              
1628             # pick up decoded command
1629             my %msg = %{$mbox->command};
1630            
1631             print "mydefault() called with args @args\n";
1632             print "and message >",$mbox->msg,"<\n";
1633             }
1634              
1635             my $mbox = Message->new(
1636             .
1637             .
1638             name => "fping$$",
1639             group => ['polling-ctl', 'polling-data'],
1640             .
1641             # This says use the fine grained commands
1642             commands => {
1643             'new' => \&new, # handle 'new' commands
1644             'default' => \&mydefault, # handle left over commands
1645              
1646             # Only define this if you want to catch ALL the commands
1647             #'override' => \&myoverride,
1648             },
1649             .
1650             .
1651             );
1652              
1653             while(1)
1654             {
1655             # Process different data as required
1656             $mbox->rx(30,'arg1','arg2');
1657             }
1658              
1659             In the sending application:
1660              
1661             sub process_control
1662             {
1663             my $mbox = shift;
1664              
1665             # A global array to hold stuff
1666             @Settings::pingers = grep(/fping/,$mbox->grps);
1667             }
1668              
1669             my $mbox = Message->new(
1670             .
1671             group => ['polling-ctl', 'polling-data'],
1672             member_sub => \&process_control,
1673             .
1674             );
1675              
1676             my %msg = (
1677             cmd => 'new'
1678             .
1679             .
1680             );
1681              
1682             # Use rx() to receive any membership messages and make sure you snarf
1683             # away the id of the receiving application. Should exist in
1684             # @Settings::pingers once a receiving application has joined a group
1685             # of ours
1686             $mbox->rx(30,undef);
1687             my $id = shift(@Settings::pingers);
1688              
1689             $mbox->sends($id,\%msg); # Send new command specifically to $id
1690             $msg{'cmd'} = 'restart';
1691             $mbox->sends($id,\%msg); # Send restart command specifically to $id
1692             $msg{'cmd'} = 'funny';
1693             $mbox->sends($id,\%msg); # Send funny command, will call default
1694             $msg{'cmd'} = 'clone';
1695             $mbox->sends($id,\%msg); # Send clone command specifically to $id
1696             $msg{'cmd'} = 'stop';
1697             $mbox->sends($id,\%msg); # Send stop command specifically to $id
1698              
1699             =cut
1700              
# handle_commands_aimed_at_me($self, @args)
#
# Treats the message currently held by the object as a command and
# dispatches it.
#
# The message is only acted upon when it is regular, new and aimed at me
# (no group messages allowed).  The decoded message must be a hash with a
# 'cmd' key; the first whitespace-delimited word of that value is the
# command name.
#
# Dispatch order:
#   1. $self->{'commands'}{'override'}  - if defined, gets every command
#   2. $self->{'commands'}{<command>}   - handler named after the command
#   3. built-ins: shut*/stop*/die* (disconnect and exit), restart
#      (disconnect and re-exec $Command), clone, noop, status
#   4. $self->{'commands'}{'default'}
#   5. otherwise a "No Default Handler" message is logged
#
# User handlers are called as $handler->($self, @args) with $_ set to the
# command (the full 'cmd' string for 'override', the first word for all
# others); whatever they return is returned to our caller.
#
# Returns:
#   nothing (empty list / undef) when the object has no 'mbox' or the
#   message cannot be decoded; otherwise @args, unless a user handler
#   supplied its own return value.
sub handle_commands_aimed_at_me
{
    my ($self, @args) = @_;

    # Bare return so that list-context callers get an empty list rather
    # than the one-element list (undef); same as the decode failure below.
    return unless $self->{'mbox'};

    # Message must be regular, new and aimed at me. No group messages
    # allowed
    unless($self->aimed_at_me && $self->new_msg && $self->Is_regular_mess)
    {
        return @args;
    }
    $self->logit("Message for me :-)\n") if $self->debug;

    # Decode message
    my $msg = $self->decode || return;

    # The payload comes from another process: make sure it really is a
    # hash before poking at it, instead of dying under strict refs.
    require Scalar::Util;
    if((Scalar::Util::reftype($msg) || '') ne 'HASH')
    {
        $self->logit("Not a command message: decoded payload is not a hash\n");
        return @args;
    }
    if(!defined $msg->{'cmd'})
    {
        $self->logit("Not a command message: no 'cmd' key in structure\n");
        return @args;
    }

    # Handlers have always been able to look at the command through $_, so
    # keep that, but localise it: a plain assignment would clobber (or, in
    # a foreach/map, overwrite the aliased element of) the caller's $_.
    local $_ = $msg->{'cmd'};

    $self->logit("Executing $_\n") if $self->debug;

    # Allow the user to override all our help!
    if(defined $self->{'commands'}{'override'})
    {
        $self->logit("Executing override()\n") if $self->debug;
        return $self->{'commands'}{'override'}->($self, @args);
    }

    # Extract a command from the input and call its counterpart in the
    # %commands hash if it exists
    s/\s+.*//s;    # Remove everything after any white space (/s: newlines too)
    if(defined $self->{'commands'}{$_})
    {
        $self->logit("Executing $_ ()\n") if $self->debug;
        return $self->{'commands'}{$_}->($self, @args);
    }

    # We are here only if the user hasn't overridden us or hasn't provided
    # a specific command handler for the command. If we can't provide a
    # handler for the command then we print a message and return
    if(/^shut|^stop|^die/i)
    {
        $self->logit("Exiting - bye!\n") if $self->debug;
        $self->disconnect();
        exit;
    }
    elsif(/^restart/i)
    {
        $self->logit("Disconnecting from Spread and restarting\n") if $self->debug;
        $self->disconnect();

        # Just rerun ourselves. exec only returns on failure; say so
        # rather than carrying on silently while disconnected.
        exec $Command
            or $self->logit("Couldn't exec '$Command': $!\n");
    }
    elsif(/^clone/i)
    {
        $self->logit("Cloning a new process\n") if $self->debug;
        clone();
    }
    elsif(/^noop/i)
    {
        $self->logit("noop message recieved - check the sender\n") if $self->debug;
    }
    elsif(/^status/i)
    {
        # Status output is only produced in debug mode
        if($self->debug)
        {
            $self->logit("Object details:\n");
            $self->logit(Dumper($self), "\n");
            $self->logit("Settings::state\n");
            $self->logit(Dumper(\%Settings::state), "\n");
        }
    }
    elsif(defined $self->{'commands'}{'default'})
    {
        $self->logit("Calling Default Handler for >$_<.\n") if $self->debug;
        return $self->{'commands'}{'default'}->($self, @args);
    }
    else
    {
        $self->logit("No Default Handler for >$_<.\n");
    }

    return @args;
}
1789              
1790              
1791             # Utility Functions
# clone()
#
# Start a fresh copy of this program in its own session.
#
# Forks; the parent returns immediately (nothing is returned).  The child
# becomes a session leader and re-executes the original command line held
# in $Command.
#
# Dies in the parent if the fork itself fails.  The child never returns:
# if it cannot start a session or exec, it reports the error on STDERR and
# leaves via POSIX::_exit, so it neither runs END blocks/destructors nor
# falls back into the caller's code as a duplicate of the parent.
sub clone
{
    my $pid = fork;    # fork child
    die "Couldn't fork: $!\n" unless defined $pid;

    # return if parent
    return if $pid;

    # Child code from here
    require POSIX;     # make sure setsid/_exit are loaded

    # Become our own session leader
    unless(POSIX::setsid())
    {
        print STDERR "Can't start new session: $!\n";
        POSIX::_exit(1);
    }

    # Exec ourselves from scratch; exec only returns on failure
    exec $Command
        or print STDERR "Couldn't exec '$Command': $!\n";
    POSIX::_exit(1);
}
1811              
1812              
1813             =head1 Bugs and other stuff
1814              
1815             There are bound to be bugs in this code. It is first-cut code that, even
1816             though used extensively, hasn't been used broadly. By that I mean, the
1817             bits of this code that I have used work well for me, but my use isn't
1818             your use, and you may stumble across bugs.
1819              
1820             If you do find bugs, then please go to the effort of reporting them in a
1821             manner in which I can get a good understanding of what you're talking
1822             about.
1823              
1824             Please note: I have no affiliation with The Spread Group Communication
1825             Toolkit. I also know next to nothing about messaging and group
1826             communication, so don't ask me about these things.
1827              
1828             This module is offered in good faith as is.
1829              
1830             =cut
1831              
1832             =head1 TODO
1833              
1834             Lots-n-lots
1835              
1836             =cut
1837              
1838             =head1 Copyright
1839              
1840             Copyright 2003-2006, Mark Pfeiffer
1841              
1842             This code may be copied only under the terms of the Artistic License or
1843             the GNU General Public License, version 2 or later
1844             which may be found in the Perl 5 source kit.
1845              
1846             Use 'perldoc perlartistic' to see the Artistic License.
1847             Use 'perldoc perlgpl' to see the GPL License.
1848              
1849             Complete documentation for Perl, including FAQ lists, should be found on
1850             this system using `man perl' or `perldoc perl'. If you have access to the
1851             Internet, point your browser at http://www.perl.org/, the Perl Home Page.
1852              
1853             =cut
1854              
1855              
1856             1;