File Coverage

blib/lib/Net/IMP/Remote/Connection.pm
Criterion Covered Total %
statement 24 156 15.3
branch 0 78 0.0
condition 0 23 0.0
subroutine 8 20 40.0
pod 0 10 0.0
total 32 287 11.1


line stmt bran cond sub pod time code
1 1     1   4 use strict;
  1         2  
  1         36  
2 1     1   3 use warnings;
  1         1  
  1         35  
3              
4             package Net::IMP::Remote::Connection;
5 1     1   3 use fields qw(ev fd rbuf wbuf onwrite onclose need_more_rbuf wire analyzer max_analyzer_id);
  1         1  
  1         5  
6 1     1   76 use Net::IMP::Remote::Protocol;
  1         1  
  1         56  
7 1     1   6 use Net::IMP::Debug;
  1         1  
  1         6  
8 1     1   103 use Scalar::Util 'weaken';
  1         2  
  1         53  
9 1     1   5 use Errno 'EIO';
  1         1  
  1         105  
10 1     1   3 use Carp;
  1         2  
  1         1659  
11              
12             sub new {
13 0     0 0   my ($class,$fd,$side,%args) = @_;
14 0           my $impl = Net::IMP::Remote::Protocol
15             ->load_implementation(delete $args{impl});
16 0           my $wire = $impl->new;
17 0           my $self = fields::new($class);
18 0           %$self = (
19             ev => delete $args{eventlib},
20             fd => $fd,
21             rbuf => '',
22             need_more_rbuf => 1,
23             wbuf => '',
24             wire => $wire,
25             analyzer => {},
26             max_analyzer_id => 1,
27             );
28              
29 0           debug("init wire=$self->{wire} self=$self");
30 0 0         if ( my $buf = $wire->init($side) ) {
31 0           debug("send initial data");
32 0 0         $self->write($buf) or return;
33             }
34              
35 0           return $self;
36             }
37              
38             sub onClose {
39 0     0 0   my ($self,$cb) = @_;
40 0           $self->{onclose} = $cb
41             }
42              
43             sub close:method {
44 0     0 0   my ($self,$error) = @_;
45 0           debug("close");
46 0 0         warn "[$self] error: $error\n" if $error;
47 0 0         if ( my $ev = $self->{ev} ) {
48 0           $ev->onread($self->{fd},undef);
49 0           $ev->onwrite($self->{fd},undef);
50             }
51 0 0         $self->{onclose}->($error) if $self->{onclose};
52 0           %$self = ();
53             }
54              
55             sub add_analyzer {
56 0     0 0   my ($self,$obj,$id) = @_;
57 0 0         if ( ! $id ) {
    0          
58 0           while (1) {
59 0           $id = $self->{max_analyzer_id}++;
60 0 0         $id = $self->{max_analyzer_id} = 1 if $id > 0x7fffffff;
61 0 0         last if ! $self->{analyzer}{$id};
62             }
63             } elsif ( $self->{analyzer}{$id} ) {
64 0           return;
65             }
66 0           $self->{analyzer}{$id} = $obj;
67 0           return $id;
68             }
69              
70             sub weak_add_analyzer {
71 0     0 0   my $self = shift;
72 0 0         my $id = $self->add_analyzer(@_) or return;
73 0           weaken( $self->{analyzer}{$id} );
74 0           return $id;
75             }
76              
77             sub get_analyzer {
78 0     0 0   my ($self,$id) = @_;
79 0           $self->{analyzer}{$id};
80             }
81              
82             sub del_analyzer {
83 0     0 0   my ($self,$id) = @_;
84 0           delete $self->{analyzer}{$id};
85             }
86              
87             sub rpc {
88 0     0 0   my ($self,$call,$actions) = @_;
89 0 0         my $wire = $self->{wire} or do {
90 0           debug("no more wire there to call @$call");
91 0           return;
92             };
93 0           my $buf = $wire->rpc2buf($call);
94 0 0         if ( defined wantarray ) {
95 0           debug("blocking rpc $call->[0]");
96 0   0       return $self->write($buf) &&
97             ( $actions ? $self->nextop($actions) : 1 )
98             }
99 0           debug("non-blocking rpc $call->[0]");
100 0           $self->write($buf);
101 0 0         $self->nextop($actions) if $actions;
102             }
103              
104             # write data in buffer
105             # !eventlib || defined wantarray -> blocking write
106             # otherwise: nonblocking with event handler for writing rest
107             sub write {
108 0     0 0   my $self = shift;
109 0 0         $self->{wbuf} .= shift if @_;
110 0 0 0       if ( $self->{wbuf} eq '' ) {
    0          
111 0           debug("nothing to write");
112 0           return 1; # nothing to write
113             } elsif ( defined wantarray or ! $self->{ev} ) {
114             # blocking write
115 0           while ( $self->{wbuf} ne '' ) {
116 0           my $n = syswrite($self->{fd},$self->{wbuf});
117 0 0 0       if ( $n ) {
    0          
118 0           debug("wrote %d of %d bytes",$n,length($self->{wbuf}));
119 0           substr($self->{wbuf},0,$n,'');
120             } elsif ( ! defined $n and $!{EAGAIN} ) {
121 0           debug("short write - blocking wait for writable socket");
122 0           vec(my $win = '',fileno($self->{fd}),1) = 1;
123 0 0         select(undef,$win,undef,undef) or do {
124 0           $self->close("failed to select: $!");
125             return
126 0           }
127             } else {
128 0           $self->close("failed to write: $!");
129 0           return;
130             }
131             }
132 0           debug("blocking write completed");
133 0           return 1;
134             } else {
135             # async write
136 0           my $n = syswrite($self->{fd},$self->{wbuf});
137 0 0         if ( $n ) {
    0          
138 0           substr($self->{wbuf},0,$n,'');
139 0 0         if ( $self->{wbuf} eq '' ) {
140 0           debug("non-blocking write completed");
141 0           return 1;
142             } else {
143 0           debug("non-blocking short write %d of %d",$n,
144             $n+length($self->{wbuf}));
145             }
146             } elsif ( $!{EAGAIN} ) {
147 0           $self->close("failed to write: $!");
148 0           return;
149             }
150              
151 0           debug("async continue write if socket writable");
152 0   0       $self->{ev}->onwrite( $self->{fd}, $self->{onwrite} ||= do {
153             # callback to write rest of wbuf
154 0           weaken( my $wself = $self );
155             sub {
156 0   0 0     $wself->write() or return while $wself->{wbuf} ne '';
157 0           $wself->{ev}->onwrite($wself->{fd},undef);
158 0           };
159             });
160 0           return 1;
161             }
162             }
163              
164              
165             # handle single (or up to $max) operation
166             # !eventlib || defined wantarray -> block until one operation done
167             # otherwise: setup event handler if no operation fully read yet
168             sub nextop {
169 0     0 0   my ($self,$actions,$max,$incb) = @_;
170 0 0 0       my $block = $incb ? 0 : defined wantarray || ! $self->{ev};
171              
172             NEXTOP:
173 0           while ( $self->{need_more_rbuf} ) {
174 0 0         debug("trying to read%s...", $incb ? ' inside read-callback':'' );
175 0           my $n = sysread($self->{fd},$self->{rbuf},8192,length($self->{rbuf}));
176 0   0       debug("read done -> ".( $n // $! ));
177 0 0         if ( $n ) {
    0          
    0          
    0          
    0          
178 0           last;
179             } elsif ( defined $n ) {
180             # eof
181 0 0         if ( $self->{rbuf} eq '' ) {
182 0           $self->close();
183 0           return 0
184             } else {
185             # consider eof within data block as error
186 0           $! = EIO;
187 0           $self->close("eof inside operation");
188 0           return;
189             }
190             } elsif ( ! $!{EAGAIN} ) {
191 0           $self->close("failed to read: $!");
192 0           return;
193             } elsif ( $incb ) {
194             # async wait, but we are inside callback already, so just
195             # wait for more data
196 0           debug("waiting for more inside existing callback");
197 0           return;
198             } elsif ( $block ) {
199             # blocking wait for new data
200 0           vec(my $rin = '',fileno($self->{fd}),1) = 1;
201 0 0         select($rin,undef,undef,undef) or do {
202 0           $self->close("select failed: $!");
203 0           return;
204             };
205 0           next;
206             }
207              
208             # async wait for more
209 0           debug("install onread callback for more data");
210 0           weaken(my $wself = $self);
211             $self->{ev}->onread($self->{fd}, sub {
212 0     0     $wself->nextop($actions,$max,1);
213 0           });
214 0           return;
215             }
216              
217 0           my $rpc = $self->{wire}->buf2rpc(\$self->{rbuf});
218 0 0         if ( ! $rpc ) {
219 0           $self->{need_more_rbuf} = 1;
220 0           goto NEXTOP;
221             }
222              
223 0           $self->{need_more_rbuf} = $self->{rbuf} eq '';
224              
225 0           my ($type,@args) = @$rpc;
226             #debug(Dumper($rpc)); use Data::Dumper;
227 0           debug("processing $type");
228 0 0         my $act = $actions->{$type+0} or do {
229 0           $self->close( "no handler for return type $type" );
230 0           return;
231             };
232              
233 0 0         if ( ref($act) eq 'CODE' ) {
    0          
234 0           $act->(@args)
235             } elsif ( ref($act) eq 'ARRAY' ) {
236 0 0         if (! @$act) {
237 0           @$act = @args;
238             } else {
239             # assume code+args
240 0           my ($code,@m) = @$act;
241 0           $code->(@m,@args);
242             }
243             } else {
244             # assume object
245 0           $act->$type(@args)
246             }
247            
248 0 0 0       --$max if $max && $max>0;
249 0 0         return 1 if ! $max; # one-shot or done
250 0 0         $incb = 0 if $max>0; # redo onread callback with changed $max
251 0           goto NEXTOP;
252             }
253              
254              
255             1;