File Coverage

blib/lib/Event/tcpsession.pm
Criterion Covered Total %
statement 13 15 86.6
branch n/a
condition n/a
subroutine 5 5 100.0
pod n/a
total 18 20 90.0


line stmt bran cond sub pod time code
1 1     1   1349 use strict;
  1         3  
  1         53  
2             package Event::tcpsession;
3 1     1   5 use Carp;
  1         2  
  1         90  
4 1     1   6 use Symbol;
  1         1  
  1         71  
5 1     1   6 use Socket;
  1         2  
  1         1684  
6 1     1   427 use Ioctl qw(FIONBIO);
  0            
  0            
7             use Errno qw(EAGAIN);
8             use Event 0.61;
9             use Event::Watcher qw(R W T);
10             require Event::io;
11             use base 'Event::io';
12             use vars qw($VERSION);
13             $VERSION = '0.14';
14              
15             use constant DEBUG_SHOW_RPCS => 0;
16             use constant DEBUG_BYTES => 0;
17              
18             use constant PROTOCOL_VERSION => 2;
19             use constant RECONNECT_TM => 3;
20              
21             use constant HEADER_FORMAT => 'Nn';
22              
23             # special message IDs
24             use constant NOREPLY_ID => 0;
25             use constant APIMAP_ID => 1;
26             use constant RESERVED_IDS => 2;
27              
28             'Event::Watcher'->register;
29              
30             # API is an ordered array:
31             # { name => 'opname', code => sub {}, req => 'nn' }
32             # { name => 'opname', code => sub {}, req => 'nn', reply => 'nn' }
33              
34             sub new {
35             my ($class, %p) = @_;
36             my @passthru;
37             push @passthru, desc => $p{desc} if
38             exists $p{desc};
39             my $o = $class->SUPER::new(parked => 1, reentrant => 0, @passthru);
40             $o->{status_cb} = $p{cb} || sub {};
41             $o->{api} = $p{api} || [];
42             $o->{delayed} = [];
43             $o->{q} = []; # message queue
44             $o->{pend} = {}; # pending transactions
45             $o->{next_txn} = $$;
46             $o->set_peer(can_ignore => 1, %p);
47             $o;
48             }
49              
50             sub is_server_side { # make function call XXX
51             my ($o) = @_;
52             !exists $o->{iaddr}
53             }
54              
55             # Transaction IDs are for keeping track of roundtrip messaging.
56             # They are also used for special messages. Special messages
57             # only use low-order IDs. The special range from
58             # [0x8000, 0x8000 + RESERVEDIDS) is unused.
59             #
60             # use 1 bit to distinguish short/long messages? XXX
61             #
62             sub get_next_transaction_id {
63             my ($o) = @_;
64             $o->{next_txn} = ($o->{next_txn}+1) & 0x7fff;
65             $o->{next_txn} = RESERVED_IDS if $o->{next_txn} < RESERVED_IDS;
66             $o->{next_txn} | ($o->is_server_side ? 0x8000 : 0);
67             }
68              
69             #########################################################################
70              
71             sub fd {
72             if (@_ == 1) {
73             shift->SUPER::fd;
74             } else {
75             my ($o, $fd) = @_;
76             if (caller eq __PACKAGE__) {
77             if ($fd) {
78             ioctl $fd, FIONBIO, pack('l', 1)
79             or die "ioctl FIONBIO: $!";
80             #setsockopt($c->{e_fd}, IPPROTO_TCP, TCP_NODELAY, pack('l',1))
81             # or die "setsockopt: $!";
82             }
83             $o->SUPER::fd($fd)
84             } else {
85             if (!defined $fd) {
86             # This is a special case for regression testing.
87             # Who knows, maybe it is generally useful too.
88             close $o->fd;
89             $o->SUPER::fd(undef)
90             } else {
91             $o->set_peer(fd => $fd);
92             }
93             }
94             }
95             }
96              
97             sub cb {
98             if (caller eq __PACKAGE__) {
99             shift->SUPER::cb(@_);
100             } else {
101             my $o = shift;
102             if (@_ == 0) {
103             $o->{status_cb}
104             } else {
105             $o->{status_cb} = shift;
106             }
107             }
108             }
109              
110             #########################################################################
111              
112             sub set_peer {
113             my ($o,%p) = @_;
114              
115             croak "set_peer: '".$o->desc."' already connected"
116             if $o->{peer_set};
117              
118             if (exists $p{port}) {
119             #client side
120              
121             my $iaddr;
122             if (exists $p{host}) {
123             my $host = $p{host};
124             $iaddr = inet_aton($host) || die "Lookup of host '$host' failed";
125             } elsif (exists $p{iaddr}) {
126             $iaddr = $p{iaddr};
127             warn "Both iaddr & host given; host ignored" if exists $p{host};
128             } else {
129             $iaddr = inet_aton('localhost');
130             }
131             my $port = $p{port};
132            
133             $o->{iaddr} = $iaddr;
134             $o->{port} = $port;
135              
136             $o->{status_cb}->($o, 'not available')
137             if !$o->connect_to_server;
138              
139             } elsif (exists $p{fd}) {
140             #server side
141              
142             $o->fd($p{fd});
143             $o->reconnected;
144            
145             } else {
146             return
147             if $p{can_ignore};
148             croak("connect to what?");
149             }
150             $o->{peer_set} = 1;
151             }
152              
153             sub disconnect {
154             my ($o, $why) = @_;
155             if ($o->is_server_side) {
156             # recovery is always client's responsibility
157             $o->cancel;
158             return 1;
159             }
160             $o->{status_cb}->($o, 'disconnect', $why);
161             $o->connect_to_server;
162             }
163              
164             sub connect_to_server {
165             my ($o) = @_;
166             $o->fd(undef);
167             my $fd = gensym;
168             socket($fd, PF_INET, SOCK_STREAM, getprotobyname('tcp'))
169             or die "socket: $!";
170             if (!connect($fd, sockaddr_in($o->{port}, $o->{iaddr}))) {
171             $o->{status_cb}->($o, 'connect', $!);
172             $o->timeout(RECONNECT_TM);
173             $o->cb([$o,'connect_to_server']);
174             $o->start;
175             return
176             }
177             $o->fd($fd);
178             $o->{status_cb}->($o, 'connect');
179             $o->reconnected;
180             1
181             }
182              
183             sub reconnected {
184             my ($o) = @_;
185              
186             $o->timeout(undef);
187             delete $o->{pend};
188             delete $o->{peer_version};
189             delete $o->{peer_api};
190             delete $o->{peer_opname};
191              
192             $o->{ibuf} = '';
193             $o->{obuf} = pack 'n', PROTOCOL_VERSION;
194              
195             append_obuf($o, APIMAP_ID, join("\n", map {
196             my @z = ($_->{name}, $_->{req} || '');
197             push @z, $_->{reply} || '' if exists $_->{reply};
198             join($;, @z);
199             } @{$o->{api}}));
200              
201             # reload pending transactions
202             # (anything not requiring acknowledgement gets/got ignored)
203             while (my ($tx,$i) = each %{$o->{pend}}) {
204             # warn "pend $i->[0]{name}";
205             append_obuf($o, $tx, $i->[2]);
206             }
207              
208             $o->poll(R|W);
209             $o->cb([$o,'service']);
210             $o->start;
211             }
212              
213             #########################################################################
214              
215             sub append_obuf { # function call
216             my ($o, $tx, $m) = @_;
217             # length is inclusive
218             my $mlen = length $m;
219             $o->{obuf} .= pack(HEADER_FORMAT, 6+$mlen, $tx) . $m;
220              
221             $o->poll($o->poll | W);
222             }
223              
224             sub pack_args {
225             my $template = shift;
226             if ($template) {
227             pack $template, @_;
228             } elsif (@_ == 0) {
229             ''
230             } elsif (@_ == 1) {
231             $_[0]
232             } else {
233             undef
234             }
235             }
236              
237             sub unpack_args {
238             my ($template, $bytes) = @_;
239             if ($template) {
240             unpack $template, $bytes
241             } elsif (length $bytes) {
242             $bytes
243             } else {
244             ()
245             }
246             }
247              
248             sub service {
249             my ($o, $e) = @_;
250             my $w = $e->w;
251             return $o->disconnect("inactivity")
252             if $e->got & T;
253             return $o->disconnect("fd closed")
254             if !defined $w->fd;
255             if ($e->got & R) {
256             my $buf = $o->{ibuf};
257             while (1) {
258             my $ret = sysread $w->fd, $buf, 8192, length($buf);
259             next if $ret;
260             last if $!{EAGAIN};
261             return $o->disconnect("sysread ret=$ret, $!");
262             }
263             #warn "$$:R:".unpack('h*', $buf).":";
264             # decode $buf
265             if (!exists $o->{peer_version} and length $buf >= 2) {
266             # check PROTOCOL_VERSION ...
267             $o->{peer_version} = unpack 'n', substr($buf, 0, 2);
268             warn "$$:peer_version=$o->{peer_version}"
269             if DEBUG_SHOW_RPCS;
270             $buf = substr $buf, 2;
271             $o->disconnect("peer version mismatch $o->{peer_version} != ".
272             PROTOCOL_VERSION)
273             if $o->{peer_version} != PROTOCOL_VERSION;
274             }
275             while (length $buf >= 6) {
276             my ($len, $tx) = unpack HEADER_FORMAT, $buf;
277             last if length $buf < $len; # got a complete message?
278             my $m = substr $buf, 6, $len-6;
279              
280             $buf = substr $buf, $len; # snip
281              
282             if ($tx == NOREPLY_ID) {
283             my $opid = unpack 'n', $m;
284             $m = substr $m, 2;
285             my $api = $o->{api}[$opid];
286             if (!$api) {
287             warn "API $opid not found (ignored)";
288             next
289             }
290             # EVAL
291             my @args = unpack_args($api->{req}, $m);
292             warn "$$:Run($opid)(".join(', ', @args).")"
293             if DEBUG_SHOW_RPCS;
294             $api->{code}->($o, @args);
295              
296             } elsif ($tx < RESERVED_IDS) {
297             if ($tx == APIMAP_ID) {
298             my @api;
299             for my $packedspec (split /\n/, $m) {
300             my @spec = split /$;/, $packedspec, -1;
301             if (@spec == 2 or @spec == 3) {
302             my @p=( name => $spec[0], req => $spec[1]);
303             push @p, reply => $spec[2]
304             if @spec == 3;
305             push @api, { @p };
306             } else {
307             warn "got strange API spec: ".join(', ',@spec);
308             }
309             }
310             warn "$$: ".(0+@api)." APIs"
311             if DEBUG_SHOW_RPCS;
312             $o->{peer_api} = \@api;
313             my %peer_opname;
314             for (my $x=0; $x < @api; $x++) {
315             $peer_opname{$api[$x]{name}} = $x;
316             }
317             $o->{peer_opname} = \%peer_opname;
318             for my $rpc (@{$o->{delayed}}) {
319             $o->rpc(@$rpc);
320             }
321             $o->{delayed} = [];
322             } else {
323             die "Unknown TX $tx?";
324             }
325             } else {
326             if ($tx >= 0x8000 xor $o->is_server_side) {
327             my $opid = unpack 'n', $m;
328             $m = substr $m, 2;
329             my $api = $o->{api}[$opid];
330             if (!$api) {
331             warn "API $opid not found (ignored)";
332             next
333             }
334             # EVAL
335             my @args = unpack_args($api->{req}, $m);
336             warn "$$:Run($opid)(".join(", ", @args).") returning..."
337             if DEBUG_SHOW_RPCS;
338             my @ret = $api->{code}->($o, @args);
339             # what if exception? XXX
340             warn "$$:Return($opid)(".join(", ", @ret).")"
341             if DEBUG_SHOW_RPCS;
342             my $packed_ret = pack_args($api->{reply}, @ret);
343             warn("'$api->{name}' returned (".join(', ',@ret).
344             " yet doesn't have a reply pack template")
345             if !defined $packed_ret;
346             append_obuf($o, $tx, pack('n',$opid).$packed_ret);
347            
348             } else {
349             my $pend = $o->{pend}{$tx};
350             if (!$pend) {
351             warn "Got unexpected reply for TXN $tx (ignored)";
352             next;
353             }
354             my ($api,$cb) = @$pend;
355             my $opid = unpack 'n', $m; # can double check opid XXX
356             # EVAL
357             my @args= unpack_args($api->{reply}, substr($m, 2));
358             warn "$$:RunReply($opid)(".join(", ", @args).")"
359             if DEBUG_SHOW_RPCS;
360             $cb->($o, @args);
361             }
362             }
363             }
364             $o->{ibuf} = $buf;
365             }
366             if (length $o->{obuf}) {
367             my $buf = $o->{obuf};
368             my $sent = syswrite($w->fd, $buf, length($buf), 0);
369             if ($!{EAGAIN}) {
370             $sent ||= 0;
371             } elsif (!defined $sent) {
372             return $o->disconnect("syswrite: $!")
373             }
374             if ($sent) {
375             warn "$$:W:".unpack('h*', substr($buf, 0, $sent)).":"
376             if DEBUG_BYTES;
377             $buf = substr $buf, $sent;
378             $o->{obuf} = $buf;
379             }
380             }
381             if (length $o->{obuf}) {
382             $o->poll($o->poll | W);
383             } else {
384             $o->poll($o->poll & ~W);
385             if (keys %{$o->{pend}}) {
386             # close connection if a timeout is exceeded
387             }
388             }
389             }
390              
391             sub rpc {
392             my $o = shift;
393             if (!defined $o->fd or !exists $o->{peer_opname}) {
394             my @copy = @_;
395             #my $fileno = $o->fd? fileno($o->fd) : 'undef';
396             #warn "$$: delay $copy[0] ($fileno, $o->{peer_opname})";
397             push @{$o->{delayed}}, \@copy;
398             return;
399             }
400             my $opname = shift;
401             confess "No opname?"
402             if !$opname;
403             my $id = $o->{peer_opname}{$opname};
404             croak "'$opname' not found on peer (".
405             join(' ', sort keys %{$o->{peer_opname}}).")"
406             if !defined $id;
407              
408             my $api = $o->{peer_api}[$id];
409              
410             # prepare for reply (if any)
411             my $tx;
412             my $save;
413             if (!exists $api->{reply}) {
414             $tx = NOREPLY_ID;
415             } else {
416             $tx = $o->get_next_transaction_id;
417             die "too many pending transactions"
418             if exists $o->{pend}{$tx};
419             $save = $o->{pend}{$tx} = [$api, shift];
420             }
421              
422             warn "$$:Call($id)(".join(", ", @_).")"
423             if DEBUG_SHOW_RPCS;
424             my $packed_args = pack_args($api->{req}, @_);
425             croak("Attempt to invoke '$opname' with (".join(', ', @_).
426             ") without pack template")
427             if !defined $packed_args;
428              
429             my $packed_msg = pack('n', $id).$packed_args;
430             $save->[2] = $packed_msg
431             if $save;
432             append_obuf($o, $tx, $packed_msg);
433             }
434              
435             1;
436             __END__