File Coverage

blib/lib/Net/PSYC/Datagram.pm
Criterion Covered Total %
statement 72 89 80.9
branch 19 38 50.0
condition 11 28 39.2
subroutine 8 10 80.0
pod 0 7 0.0
total 110 172 63.9


line stmt bran cond sub pod time code
1             package Net::PSYC::Datagram;
2              
3             our $VERSION = '0.5';
4              
5 1     1   7 use strict;
  1         3  
  1         54  
6 1     1   1142 use IO::Socket::INET;
  1         22715  
  1         11  
7              
8             import Net::PSYC qw( watch add W sendmsg same_host send_mmp parse_uniform BLOCKING makeMSG make_psyc parse_psyc parse_mmp PSYC_PORT PSYCS_PORT register_host register_route make_mmp UNL);
9              
10             sub TRUST {
11 0     0 0 0 return 1;
12             }
13              
14             sub new {
15 2     2 0 826 my $class = shift;
16              
17 2   100     13 my $addr = shift || undef; # NOT 127.1
18 2   50     19 my $port = int(shift||0) || undef; # also, NOT 4404
19              
20 2         11 my %a = (LocalPort => $port, Proto => 'udp');
21 2 100       7 $a{LocalAddr} = $addr if $addr;
22 2 50       16 my $socket = IO::Socket::INET->new(%a)
23             or return $!;
24 2   66     557 my $self = {
25             'SOCKET' => $socket,
26             'IP' => $socket->sockhost,
27             'PORT' => $port || $socket->sockport,
28             'TYPE' => 'd',
29             'I_BUFFER' => '',
30             'O_BUFFER' => [],
31             'O_COUNT' => 0,
32             'LF' => '',
33             };
34 2         124 W1('UDP bind to %s:%s successful', $self->{'IP'}, $self->{'PORT'});
35 2         6 bless $self, $class;
36              
37 2 50       7 watch($self) unless (BLOCKING() & 2);
38 2 50   2   5 add($self->{'SOCKET'}, 'w', sub {$self->write()}, 0)
  2         6  
39             unless (BLOCKING() & 1);
40            
41 2         9 return $self;
42             }
43              
44             # send ( target, mc, data, vars )
45             sub send {
46 1     1 0 2 my $self = shift;
47 1         2 my ( $target, $data, $vars ) = @_;
48 1         4 W2('send(%s, %s, %s)', $target, $data, $vars);
49            
50 1         1 push(@{$self->{'O_BUFFER'}}, [ [$vars, $data, $target, 0 ] ]);
  1         4  
51              
52 1 50 33     4 if (BLOCKING() || $Net::PSYC::ANACHRONISM) { # send the packet instantly
53 0         0 return !$self->write();
54             } else {
55 1         4 Net::PSYC::Event::revoke($self->{'SOCKET'});
56             }
57 1         5 return 0;
58             }
59              
60             sub write () {
61 2     2 0 4 my $self = shift;
62              
63 2 100       2 return 1 if (!${$self->{'O_BUFFER'}}[$self->{'O_COUNT'}]);
  2         12  
64            
65             # get a packet from the buffer
66 1         1 my $packet = shift(@{${$self->{'O_BUFFER'}}[$self->{'O_COUNT'}]});
  1         2  
  1         3  
67 1         2 my $target = $packet->[2];
68 1         10 my ($user, $host, $port, $type, $object) = parse_uniform($target);
69            
70 1   33     4 $port ||= PSYC_PORT();
71            
72 1   33     6 $packet->[0]->{'_target'} ||= $target;
73              
74             # funny, but not what we want.. returns 0.0.0.0 for INADDR_ANY and even
75             # when the ip is useful, the port may not - the other side should better
76             # use its own peer info. or the perl app provides _source.
77             #
78             # $vars->{'_source'} |= "psyc://$self->{'IP'}:$self->{'PORT'}/";
79              
80 1         2 my $m = ".\n"; # empty packet!
81 1         11 $m .= make_mmp($packet->[0], $packet->[1]);
82            
83 1 50       4 unless ($host) {
84 0         0 W0('This target (%s) needs a host. Dropping message.', $target);
85 0         0 return 1;
86             }
87              
88 1         163 my $taddr = gethostbyname($host); # hm.. strange thing!
89 1         6 my $tin = sockaddr_in($port, $taddr);
90            
91 1 50       21 if (!defined($self->{'SOCKET'}->send($m, 0, $tin))) {
92 0 0       0 if (++$packet->[3] >= 3) {
93 0         0 W0('Delivery of a udp packet to %s failed for the third time. Dropping message.', $target);
94 0         0 return 1;
95             }
96 0         0 unshift(@{${$self->{'O_BUFFER'}}[$self->{'O_COUNT'}]}, $packet);
  0         0  
  0         0  
97 0         0 return 1;
98             }
99 1   33     119 W1('UDP[%s:%s] <= %s', $host, $port,
100             $packet->[0]->{'_source'} || UNL());
101 1 50       2 if (!scalar(@{${$self->{'O_BUFFER'}}[$self->{'O_COUNT'}]})) {
  1         1  
  1         5  
102             # all fragments of this packet sent
103 1         2 splice(@{$self->{'O_BUFFER'}}, $self->{'O_COUNT'}, 1);
  1         5  
104 1 50       2 $self->{'O_COUNT'} = 0 if (!${$self->{'O_BUFFER'}}[$self->{'O_COUNT'}]);
  1         4  
105             } else {
106             # fragments of this packet left
107 0 0       0 $self->{'O_COUNT'} = 0 if (!${$self->{'O_BUFFER'}}[++$self->{'O_COUNT'}]);
  0         0  
108             }
109 1 50       3 if(scalar(@{$self->{'O_BUFFER'}})) {
  1         4  
110 0 0 0     0 if (BLOCKING() || $Net::PSYC::ANACHRONISM) {
111 0         0 $self->write();
112             } else {
113 0         0 Net::PSYC::Event::revoke($self->{'SOCKET'});
114             }
115             }
116 1         7 return 1;
117             }
118              
119             sub read () {
120 1     1 0 1 my $self = shift;
121 1         2 my ($data, $last);
122            
123 1         15 $self->{'LAST_RECV'} = $self->{'SOCKET'}->recv($data, 8192); # READ socket
124              
125 1 50       37 return if (!$data); # connection lost !?
126             # gibt es nen 'richtigen' weg herauszufinden, ob der socket noch lebt?
127              
128 1         2 $self->{'I_BUFFER'} .= $data;
129 1         3 delete $self->{'LF'};
130 1         6 return 1;
131             }
132              
133 0     0 0 0 sub negotiate { 1 }
134              
135             # returns _one_ mmp-packet .. or undef if the buffer is empty
136             sub recv () {
137 2     2 0 3 my $self = shift;
138 2 100       8 if (length($self->{'I_BUFFER'}) > 2) {
139 1 50 33     20 if ( $self->{'LF'} || $self->{'I_BUFFER'} =~ s/^\.(\r?\n)//g ) {
140            
141 1   33     7 $self->{'LF'} ||= $1;
142 1         5 my ($vars, $data) = parse_mmp(\$$self{'I_BUFFER'}, $self->{'LF'});
143 1 50       4 return if (!defined $vars);
144 1 50       3 unless (exists $vars->{'_source'}) {
145 1         5 my ($port, $ip) = sockaddr_in($self->{'LAST_RECV'});
146 1         11 $vars->{'_source'} = "psyc://$ip:$port";
147             }
148 1         4 return ($vars, $data);
149             }
150             # TODO : we need to provide a proper algorithm to clean up the
151             # in-buffer if we got corrupted packets in it. and we need to
152             # detect corrupted packets.. udp sucks noodles! ,-)
153             }
154 1         3 return;
155             }
156              
157              
158              
159             1;