File Coverage

blib/lib/Event/RPC/Message.pm
Criterion Covered Total %
statement 9 86 10.4
branch 0 44 0.0
condition 0 8 0.0
subroutine 3 18 16.6
pod 0 15 0.0
total 12 171 7.0


line stmt bran cond sub pod time code
1              
2             #-----------------------------------------------------------------------
3             # Copyright (C) 2002-2006 Jörn Reder .
4             # All Rights Reserved. See file COPYRIGHT for details.
5             #
6             # This module is part of Event::RPC, which is free software; you can
7             # redistribute it and/or modify it under the same terms as Perl itself.
8             #-----------------------------------------------------------------------
9              
10             package Event::RPC::Message;
11              
12 1     1   14 use Carp;
  1         1  
  1         64  
13 1     1   5 use strict;
  1         2  
  1         18  
14 1     1   2175 use Storable;
  1         5194  
  1         1530  
15              
16             my $DEBUG = 0;
17             my $MAX_PACKET_SIZE = 2*1024*1024*1024;
18              
19 0     0 0   sub get_sock { shift->{sock} }
20              
21 0     0 0   sub get_buffer { shift->{buffer} }
22 0     0 0   sub get_length { shift->{length} }
23 0     0 0   sub get_written { shift->{written} }
24              
25 0     0 0   sub set_buffer { shift->{buffer} = $_[1] }
26 0     0 0   sub set_length { shift->{length} = $_[1] }
27 0     0 0   sub set_written { shift->{written} = $_[1] }
28              
29             sub get_max_packet_size {
30 0     0 0   return $MAX_PACKET_SIZE;
31             }
32              
33             sub set_max_packet_size {
34 0     0 0   my $class = shift;
35 0           my ($value) = @_;
36 0           $MAX_PACKET_SIZE = $value;
37             }
38              
39             sub new {
40 0     0 0   my $class = shift;
41 0           my ($sock) = @_;
42              
43 0           my $self = bless {
44             sock => $sock,
45             buffer => undef,
46             length => 0,
47             written => 0,
48             }, $class;
49              
50 0           return $self;
51             }
52              
53             sub read {
54 0     0 0   my $self = shift;
55 0           my ($blocking) = @_;
56              
57 0 0         $self->get_sock->blocking($blocking?1:0);
58            
59 0 0         if ( not defined $self->{buffer} ) {
60 0           my $length_packed;
61 0 0         $DEBUG && print "DEBUG: going to read header...\n";
62 0           my $rc = sysread ($self->get_sock, $length_packed, 4);
63 0 0         $DEBUG && print "DEBUG: header read rc=$rc\n";
64 0 0 0       die "DISCONNECTED" if !(defined $rc) || $rc == 0;
65 0           $self->{length} = unpack("N", $length_packed);
66 0 0         $DEBUG && print "DEBUG: packet size=$self->{length}\n";
67             die "Incoming message size exceeds limit of $MAX_PACKET_SIZE bytes"
68 0 0         if $self->{length} > $MAX_PACKET_SIZE;
69             }
70              
71 0   0       my $buffer_length = length($self->{buffer}||'');
72              
73 0 0         $DEBUG && print "DEBUG: going to read packet... (buffer_length=$buffer_length)\n";
74              
75             my $rc = sysread (
76             $self->get_sock,
77             $self->{buffer},
78 0           $self->{length} - $buffer_length,
79             $buffer_length
80             );
81              
82 0 0         $DEBUG && print "DEBUG: packet read rc=$rc\n";
83              
84 0 0         return if not defined $rc;
85 0 0         die "DISCONNECTED" if $rc == 0;
86              
87 0           $buffer_length = length($self->{buffer});
88              
89             $DEBUG && print "DEBUG: more to read... ($self->{length} != $buffer_length)\n"
90 0 0 0       if $self->{length} != $buffer_length;
91              
92 0 0         return if $self->{length} != $buffer_length;
93              
94 0 0         $DEBUG && print "DEBUG: read finished, length=$buffer_length\n";
95              
96 0           my $data = Storable::thaw($self->{buffer});
97              
98 0           $self->{buffer} = undef;
99 0           $self->{length} = 0;
100              
101 0           return $data;
102             }
103              
104             sub read_blocked {
105 0     0 0   my $self = shift;
106              
107 0           my $rc;
108 0           $rc = $self->read(1) while not defined $rc;
109              
110 0           return $rc;
111             }
112              
113             sub set_data {
114 0     0 0   my $self = shift;
115 0           my ($data) = @_;
116              
117 0 0         $DEBUG && print "DEBUG: Message->set_data($data)\n";
118              
119 0           my $packed = Storable::nfreeze ($data);
120              
121 0 0         if ( length($packed) > $MAX_PACKET_SIZE ) {
122 0           Event::RPC::Server->instance->log("ERROR: response packet exceeds limit of $MAX_PACKET_SIZE bytes");
123 0           $data = { rc => 0, msg => "Response packed exceeds limit of $MAX_PACKET_SIZE bytes" };
124 0           $packed = Storable::nfreeze ($data);
125             }
126              
127 0           $self->{buffer} = pack("N", length($packed)).$packed;
128 0           $self->{length} = length($self->{buffer});
129 0           $self->{written} = 0;
130              
131 0           1;
132             }
133              
134             sub write {
135 0     0 0   my $self = shift;
136 0           my ($blocking) = @_;
137              
138 0 0         $self->get_sock->blocking($blocking?1:0);
139              
140             my $rc = syswrite (
141             $self->get_sock,
142             $self->{buffer},
143             $self->{length}-$self->{written},
144             $self->{written},
145 0           );
146              
147 0 0         $DEBUG && print "DEBUG: written rc=$rc\n";
148              
149 0 0         return if not defined $rc;
150              
151 0           $self->{written} += $rc;
152              
153 0 0         if ( $self->{written} == $self->{length} ) {
154 0 0         $DEBUG && print "DEBUG: write finished\n";
155 0           $self->{buffer} = undef;
156 0           $self->{length} = 0;
157 0           return 1;
158             }
159              
160 0 0         $DEBUG && print "DEBUG: more to be written...\n";
161              
162 0           return;
163             }
164              
165             sub write_blocked {
166 0     0 0   my $self = shift;
167 0           my ($data) = @_;
168              
169 0           $self->set_data($data);
170              
171 0           my $finished = 0;
172 0           $finished = $self->write(1) while not $finished;
173              
174 0           1;
175             }
176              
177             1;
178              
179             __END__