File Coverage

blib/lib/Spread/Messaging/Transport.pm
Criterion Covered Total %
statement 10 12 83.3
branch n/a
condition n/a
subroutine 4 4 100.0
pod n/a
total 14 16 87.5


line stmt bran cond sub pod time code
1             package Spread::Messaging::Transport;
2              
3 1     1   14 use 5.008;
  1         3  
  1         39  
4 1     1   5 use strict;
  1         2  
  1         32  
5 1     1   4 use warnings;
  1         1  
  1         29  
6              
7 1     1   18873 use Spread;
  0            
  0            
8             use Spread::Messaging::Exception;
9              
10             require Exporter;
11              
12             our @ISA = qw(Exporter);
13              
14             # Items to export into callers namespace by default. Note: do not export
15             # names by default without a very good reason. Use EXPORT_OK instead.
16             # Do not simply export all your public functions/methods/constants.
17              
18             # This allows declaration use Spread::Transport ':all';
19             # If you do not need this, moving things directly into @EXPORT or @EXPORT_OK
20             # will save memory.
21              
22             our %EXPORT_TAGS = ( 'all' => [ qw(
23            
24             ) ] );
25              
26             our @EXPORT_OK = ( @{ $EXPORT_TAGS{'all'} } );
27              
28             our @EXPORT = qw(
29             UNRELIABLE_MESS
30             RELIABLE_MESS
31             FIFO_MESS
32             CAUSAL_MESS
33             AGREED_MESS
34             SAFE_MESS
35             REGULAR_MESS
36             SELF_DISCARD
37             DROP_RECV
38             REG_MEMB_MESS
39             TRANSITION_MESS
40             CAUSED_BY_JOIN
41             CAUSED_BY_LEAVE
42             CAUSED_BY_DISCONNECT
43             CAUSED_BY_NETWORK
44             MEMBERSHIP_MESS
45             ACCEPT_SESSION
46             ILLEGAL_GROUP
47             ILLEGAL_MESSAGE
48             ILLEGAL_SERVICE
49             ILLEGAL_SESSION
50             ILLEGAL_SPREAD
51             CONNECTION_CLOSED
52             COULD_NOT_CONNECT
53             BUFFER_TOO_SHORT
54             GROUPS_TOO_SHORT
55             MESSAGE_TOO_LONG
56             REJECT_ILLEGAL_NAME
57             REJECT_NOT_UNIQUE
58             REJECT_NO_NAME
59             REJECT_QUOTA
60             REJECT_VERSION
61             );
62              
63             our $VERSION = '0.03';
64              
65             # -------------------------------------------------------------------------
66             # public methods
67             # -------------------------------------------------------------------------
68              
69             sub new {
70             my $proto = shift;
71             my %params = @_;
72              
73             my $self = {};
74             my @foo = split(/\//, $0);
75             my $class = ref ($proto) || $proto;
76              
77             # Initialize variables - these are defaults.
78              
79             $self->{port} = '4803';
80             $self->{timeout} = '5';
81             $self->{host} = 'localhost';
82             $self->{service_type} = SAFE_MESS;
83             $self->{private_name} = sprintf("%0s%05d",
84             substr($ENV{USER} || pop @foo, 0, 5), $$);
85              
86             # Parse named parameters, these may overwrite, or suppliment the above.
87              
88             my ($k, $v);
89             local $_;
90              
91             if (defined ($v = delete $params{'-port'})) {
92              
93             $self->{port} = $v;
94              
95             }
96              
97             if (defined ($v = delete $params{'-host'})) {
98              
99             $self->{host} = $v;
100              
101             }
102              
103             if (defined ($v = delete $params{'-private_name'})) {
104              
105             $self->{private_name} = $v;
106              
107             }
108              
109             if (defined ($v = delete $params{'-timeout'})) {
110              
111             $self->{timeout} = $v;
112              
113             }
114              
115             if (defined ($v = delete $params{'-service_type'})) {
116              
117             $self->{service_type} = $v;
118              
119             }
120              
121             bless($self, $class);
122              
123             # Open the connection
124              
125             $self->connect();
126              
127             return $self;
128              
129             }
130              
131             sub connect {
132             my ($self) = @_;
133              
134             my $connection;
135              
136             $connection->{spread_name} = $self->{port} . '@' . $self->{host};
137             $connection->{private_name} = $self->{private_name};
138              
139             undef $sperrno;
140             ($self->{mailbox}, $self->{private_group}) = Spread::connect($connection);
141             if ($sperrno) {
142            
143             Spread::Messaging::Exception->throw(
144             errno => $sperrno + 0,
145             errstr => $sperrno
146             );
147              
148             }
149              
150             }
151              
152             sub join_group {
153             my ($self, $groups) = @_;
154              
155             my @groupss = split(',', $groups);
156              
157             foreach my $group (@groupss) {
158              
159             undef $sperrno;
160             Spread::join($self->{mailbox}, $group);
161             if ($sperrno) {
162              
163             Spread::Messaging::Exception->throw(
164             errno => $sperrno + 0,
165             errstr => $sperrno
166             );
167              
168             }
169              
170             }
171              
172             }
173              
174             sub leave_group {
175             my ($self, $groups) = @_;
176              
177             my @groupss = split(',', $groups);
178            
179             foreach my $group (@groupss) {
180              
181             undef $sperrno;
182             Spread::leave($self->{mailbox}, $group);
183             if ($sperrno) {
184              
185             Spread::Messaging::Exception->throw(
186             errno => $sperrno + 0,
187             errstr => $sperrno
188             );
189              
190             }
191              
192             }
193              
194             }
195              
196             sub poll {
197             my ($self) = @_;
198              
199             undef $sperrno;
200             my $size = Spread::poll($self->{mailbox});
201             if ($sperrno) {
202              
203             Spread::Messaging::Exception->throw(
204             errno => $sperrno + 0,
205             errstr => $sperrno
206             );
207              
208             }
209              
210             return $size;
211              
212             }
213              
214             sub send {
215             my ($self, $group, $message, $type) = @_;
216              
217             undef $sperrno;
218             Spread::multicast($self->{mailbox}, $self->{service_type}, $group, $type, $message);
219             if ($sperrno) {
220              
221             Spread::Messaging::Exception->throw(
222             errno => $sperrno + 0,
223             errstr => $sperrno
224             );
225              
226             }
227              
228             }
229              
230             sub recv {
231             my ($self) = @_;
232              
233             my ($data, $content);
234              
235             undef $sperrno;
236             my ($service_type, $sender, $groups, $msg_type, $endian, $message) =
237             Spread::receive($self->{mailbox}, $self->{timeout});
238             if ($sperrno) {
239              
240             Spread::Messaging::Exception->throw(
241             errno => $sperrno + 0,
242             errstr => $sperrno
243             );
244              
245             }
246              
247             $data = $message;
248              
249             if ($service_type & MEMBERSHIP_MESS) {
250              
251             if ($service_type & REG_MEMB_MESS) {
252              
253             $data = _decode_reg_memb_mess($self, $service_type, $message);
254              
255             } elsif ($service_type & TRANSITION_MESS) {
256              
257             $data = _decode_transition_mess($self, $service_type, $message);
258              
259             }
260              
261             }
262              
263             return $service_type, $sender, $groups, $msg_type, $endian, $data;
264              
265             }
266              
267             sub disconnect {
268             my ($self) = @_;
269              
270             undef $sperrno;
271             Spread::disconnect($self->{mailbox});
272             if ($sperrno) {
273              
274             Spread::Messaging::Exception->throw(
275             errno => $sperrno + 0,
276             errstr => $sperrno
277             );
278              
279             }
280              
281             }
282              
283             # -------------------------------------------------------------------------
284             # public accessors
285             # -------------------------------------------------------------------------
286              
287             sub fd {
288             my ($self) = @_;
289              
290             return($self->{mailbox});
291              
292             }
293              
294             sub service_type {
295             my ($self, $p) = @_;
296              
297             $self->{service_type} = $p if ((defined $p) &&
298             (($p == UNRELIABLE_MESS) ||
299             ($p == RELIABLE_MESS) ||
300             ($p == FIFO_MESS) ||
301             ($p == CAUSAL_MESS) ||
302             ($p == AGREED_MESS) ||
303             ($p == SAFE_MESS)));
304             return($self->{service_type});
305              
306             }
307              
308             sub timeout {
309             my ($self, $p) = @_;
310              
311             $self->{timeout} = $p if defined $p;
312             return($self->{timeout});
313              
314             }
315              
316             sub host {
317             my ($self, $p) = @_;
318              
319             $self->{host} = $p if defined $p;
320             return($self->{host});
321              
322             }
323              
324             sub port {
325             my ($self, $p) = @_;
326              
327             $self->{port} = $p if defined $p;
328             return($self->{port});
329              
330             }
331              
332             sub private_group {
333             my ($self, $p) = @_;
334              
335             $self->{private_group} = $p if defined $p;
336             return($self->{private_group});
337              
338             }
339              
340             sub private_name {
341             my ($self, $p) = @_;
342              
343             $self->{private_name} = $p if defined $p;
344             return($self->{private_name});
345              
346             }
347              
348             DESTROY {
349             my ($self) = @_;
350              
351             Spread::disconnect($self->{mailbox}) if defined $self->{mailbox};
352              
353             }
354              
355             # -------------------------------------------------------------------------
356             # Private methods
357             # -------------------------------------------------------------------------
358              
359             sub _decode_reg_memb_mess {
360             my ($self, $service_type, $message) = @_;
361              
362             # Try to decode the message buffer. This really should
363             # be done in the xs code, not here in perl. And the returned
364             # buffer doesn't match what the Spread C API documentation says
365             # should be there. Arrgh...
366             #
367             # The first 12 bytes is the group ID, the remainer of the
368             # buffer is the current private group of this session.
369             #
370             # It would have been really nice if the buffer had been broken up into
371             # delimited fields, of which the first three fields would be the group ID,
372             # the next field the private group, the next field a count
373             # and the rest the fields were the actual names of the group memebers,
374             # the number of whom would have matched the count. This would have
375             # been a nice approximation of the actual C data structure.
376             #
377             # Oh well, such is life.
378              
379             my @data;
380              
381             if (($service_type & CAUSED_BY_JOIN) ||
382             ($service_type & CAUSED_BY_LEAVE) ||
383             ($service_type & CAUSED_BY_DISCONNECT)) {
384              
385             @data = unpack("I[3]A*", $message);
386              
387             } elsif ($service_type & CAUSED_BY_NETWORK) {
388              
389             @data = unpack("I[3]A*", $message);
390              
391             }
392              
393             return \@data;
394              
395             }
396              
397             sub _decode_transition_mess {
398             my ($self, $service_type, $message) = @_;
399              
400             # Try to decode the message buffer. This really should
401             # be done in the xs code, not here in perl.
402              
403             my @data = unpack("I3", $message);
404              
405             return \@data;
406              
407             }
408              
409             1;
410              
411             __END__