File Coverage

blib/lib/POE/Component/IKC/Channel.pm
Criterion Covered Total %
statement 10 12 83.3
branch n/a
condition n/a
subroutine 4 4 100.0
pod n/a
total 14 16 87.5


line stmt bran cond sub pod time code
1             package POE::Component::IKC::Channel;
2              
3             ############################################################
4             # $Id: Channel.pm 1247 2014-07-07 09:06:34Z fil $
5             # Based on tests/refserver.perl
6             # Contributed by Artur Bergman
7             # Revised for 0.06 by Rocco Caputo
8             # Turned into a module by Philp Gwyn
9             #
10             # Copyright 1999-2014 Philip Gwyn. All rights reserved.
11             # This program is free software; you can redistribute it and/or modify
12             # it under the same terms as Perl itself.
13             #
14             # Contributed portions of IKC may be copyright by their respective
15             # contributors.
16              
17 1     1   2124 use strict;
  1         2  
  1         41  
18 1     1   4 use Socket;
  1         2  
  1         819  
19 1     1   6 use vars qw($VERSION @ISA @EXPORT @EXPORT_OK);
  1         2  
  1         90  
20 0           use POE qw(Wheel::ListenAccept Wheel::ReadWrite Wheel::SocketFactory
21             Driver::SysRW Filter::Reference Filter::Line
22 1     1   514 );
  0            
23             use POE::Component::IKC::Responder;
24             use POE::Component::IKC::Protocol;
25             use POE::Component::IKC::Util;
26             use Data::Dump qw( pp );
27             use Devel::Size qw( total_size );
28              
29             # use Net::Gen ();
30              
31             use Time::HiRes qw( gettimeofday tv_interval );
32              
33             require Exporter;
34             @ISA = qw(Exporter);
35             @EXPORT = qw(create_ikc_channel);
36             $VERSION = "0.2402";
37              
38             sub DEBUG () { 0 }
39              
40             BEGIN {
41             no strict 'refs';
42             unless( defined &TIMING ) {
43             if( $ENV{IKC_TIMING} ) { *TIMING = sub () { 1 } }
44             else { *TIMING = sub () { 0 } }
45             }
46             unless( defined &PROTOCOL ) {
47             if( $ENV{IKC_PROTOCOL} ) { *PROTOCOL = sub () { 1 } }
48             else { *PROTOCOL = sub () { 0 } }
49             }
50             }
51              
52             ###############################################################################
53             # Channel instances are created by the listening session to handle
54             # connections. They receive one or more thawed references, and pass
55             # them to the running Responder session for processing.
56              
57             #----------------------------------------------------
58             # This is just a convenient way to create channels.
59              
60             sub create_ikc_channel
61             {
62             my %p;
63             @p{qw(handle name on_connect subscribe rname unix aliases serializers protocol)}
64             = @_;
65             return __PACKAGE__->spawn(%p);
66             }
67              
68             sub spawn
69             {
70             my $package=shift;
71             my %params=@_;
72              
73             return POE::Session->create(
74             inline_states => {
75             _start => \&channel_start,
76             _stop => \&channel_stop,
77             _default => \&channel_default,
78             error => \&channel_error,
79             shutdown =>\&channel_close,
80              
81             receive => \&channel_receive,
82             'send' => \&channel_send,
83             'flushed' => \&channel_flushed,
84             'done' => \&channel_done,
85             'close' => \&channel_close,
86             server_000 => \&server_000,
87             server_001 => \&negociate_001,
88             server_002 => \&server_002,
89             server_003 => \&server_003,
90             server_010 => \&server_010,
91             client_000 => \&client_000,
92             client_001 => \&negociate_001,
93             client_002 => \&client_002,
94             client_003 => \&client_003,
95             client_010 => \&client_010,
96             'sig_INT' => \&sig_INT
97             },
98             args => [\%params]
99             )->ID;
100             }
101              
102             #----------------------------------------------------
103             # Accept POE's standard _start event, and begin processing data.
104             sub channel_start
105             {
106             my ($kernel, $heap, $session, $p) = @_[KERNEL, HEAP, SESSION, ARG0];
107              
108             if( TIMING ) {
109             $heap->{start_time} = [ gettimeofday ];
110             delete $heap->{last_time};
111             }
112              
113             my @names;
114             push @names, $p->{name} if $p->{name};
115             push @names, @{$p->{aliases}} if $p->{aliases};
116             # $name is blank if create_ikc_{server,client} wasn't called with a name
117             # OR if we are a kernel that was connected to (2001/05 huh?)
118              
119             # +GC
120             my $alias = 0+$session;
121             $alias = "Channel $alias";
122             $kernel->alias_set($alias);
123             $heap->{session_alias} = $alias;
124              
125             # all clients have $on_connect defined, even if sub {}
126             $heap->{is_server} = not $p->{client};
127             DEBUG and
128             warn "$$: We are a ".($heap->{is_server} ? 'server' : 'client')."\n";
129              
130             if($p->{unix}) {
131             $p->{unix}=~s/[^-:.\w]+/_/g;
132             push @names, "unix:$p->{unix}";
133              
134             unless($heap->{is_server}) {
135             $names[-1].=":$$-".fileno($p->{handle});
136             }
137             }
138             else {
139             my @name=unpack_sockaddr_in(getsockname($p->{handle}));
140             $name[1]=inet_ntoa($name[1]);
141             push @names, join ':', @name[1,0];
142             }
143              
144             DEBUG and warn "$$: Names: ", join ',', @names;
145             $heap->{kernel_name}=shift @names;
146             $heap->{kernel_aliases}=\@names;
147            
148              
149             # remote_kernel is only needed for DEBUG messages only
150             # remote_aliases, however, is important
151             # remote_ID is set when negociations are finished
152             # it should be cannonical name according to remote side
153             # temp_remote_kernel is a local sanity alias. (ie, if we connect to
154             # something:port, it should have that name as an alias)
155             if($p->{rname}) { # we are a server
156             $heap->{remote_kernel}=$p->{rname};
157             $heap->{temp_remote_kernel}=$p->{rname};
158             }
159             elsif($p->{unix}) { # we are a client
160             my $n=$p->{unix};
161             $n=~tr(/\\)(--);
162             $heap->{remote_kernel}="unix:$n:$$:".
163             fileno($p->{handle});
164             $heap->{temp_remote_kernel}="unix:n" unless $heap->{is_server};
165              
166              
167             # we need to have unique aliases for remote kernels
168             # so, only the server gets a default name, clients don't
169             }
170             else {
171             my @name=unpack_sockaddr_in(getpeername($p->{handle}));
172             $name[1]=inet_ntoa($name[1]);
173             $heap->{temp_remote_kernel}=
174             $heap->{remote_kernel}=join ':', @name[1,0];
175             }
176              
177             DEBUG && warn "Channel session $heap->{kernel_name}<->$heap->{remote_kernel} created.\n";
178              
179             # start reading and writing
180             $heap->{wheel_client} = new POE::Wheel::ReadWrite
181             ( Handle => $p->{handle}, # on this handle
182             Driver => new POE::Driver::SysRW, # using sysread and syswrite
183             InputEvent => 'none',
184             Filter => POE::Filter::Line->new(), # use a line filter for negociations
185             ErrorEvent => 'error', # generate this event on error
186             );
187              
188             $session->option(default=>1);
189             $heap->{on_connect}=$p->{on_connect} if ref($p->{on_connect});
190             $heap->{on_error}=$p->{on_error} if ref($p->{on_error});
191             $heap->{subscribe}=$p->{subscribe}
192             if ref($p->{subscribe}) and @{$p->{subscribe}};
193              
194             unless($heap->{is_server}) {
195             if(ref($p->{serializers}) and @{$p->{serializers}}) {
196             $heap->{serializers}=$p->{serializers};
197             }
198             DEBUG and
199             warn __PACKAGE__, " Serializers: ",
200             join(', ', @{$heap->{serializers}||[]}), "\n";
201             }
202            
203             # Setup negociation
204             $p->{protocol} ||= 'IKC';
205             if( $p->{protocol} eq 'IKC0' ) {
206             PROTOCOL and warn "$$: Using protocol IKC0\n";
207             _set_phase($kernel, $heap, '010');
208             }
209             else {
210             PROTOCOL and warn "$$: Using protocol IKC\n";
211             _set_phase($kernel, $heap, '000');
212             }
213              
214             # This shouldn't be necessary
215             POE::Component::IKC::Responder->spawn();
216              
217             # Register this channel
218             my $ikc = eval { $kernel->alias_resolve( 'IKC' ) };
219             if( $ikc ) {
220             $kernel->call( $ikc, 'register_channel' );
221             }
222             else {
223             POE::Component::IKC::Util::monitor_error( $heap, 'setup', 2, "No IKC responder" );
224             $kernel->yield( 'shutdown' );
225             }
226             return "channel-$session";
227             }
228              
229             #----------------------------------------------------
230             sub _negociation_done
231             {
232             my($kernel, $heap)=@_;
233             DEBUG and
234             warn "$$: Negociation done ($heap->{kernel_name}<->$heap->{remote_kernel}).\n";
235              
236             $heap->{finishing} = 1;
237              
238             _pause_wheel( $heap );
239             _register_remote( $kernel, $heap );
240              
241             # now that we've registered the remote kernel, we will no longer trigger on_error
242             delete $heap->{on_error};
243              
244             TIMING and channel_log( $heap, "negociated" );
245            
246             T->point( 'IKC', 'nego done' );
247              
248             # Now that we're set up properly
249             if($heap->{subscribe}) { # subscribe to wanted sessions
250             $kernel->call('IKC', 'subscribe', $heap->{subscribe}, 'done');
251             }
252             else {
253             # "fake" a completed subscription
254             $kernel->yield('done');
255             }
256              
257             delete $heap->{finishing};
258              
259             _change_wheel( $heap );
260             _resume_wheel( $heap );
261              
262             _monitor_channel( $heap, 'ready' );
263              
264             return;
265             }
266              
267             sub _register_remote
268             {
269             my( $kernel, $heap ) = @_;
270              
271             # Register the foreign kernel with the responder
272             my $aliases=delete $heap->{remote_aliases};
273             push @$aliases, $heap->{temp_remote_kernel}
274             if $heap->{temp_remote_kernel} and
275             not grep {$_ eq $heap->{temp_remote_kernel}} @$aliases;
276              
277             DEBUG and
278             warn "$$: Register remote as ", join ', ', @$aliases;
279             # we need a globaly unique ID
280             $heap->{remote_ID}=shift @$aliases;
281             # delete $heap->{remote_kernel};
282             $kernel->call('IKC', 'register', $heap->{remote_ID}, $aliases, $heap->{remote_pid});
283              
284             DEBUG and
285             warn "$$: Registered remotes";
286             }
287              
288              
289             sub _change_wheel
290             {
291             my( $heap ) = @_;
292              
293             DEBUG and
294             warn "$$: Changing the wheel events\n";
295             # generate this event on input
296             $heap->{'wheel_client'}->event( InputEvent => 'receive',
297             FlushedEvent => 'flushed'
298             );
299              
300             unless($heap->{filter}) {
301             DEBUG and warn "$$: We didn't negociate a freezer, using defaults\n";
302             $heap->{filter}=POE::Filter::Reference->new();
303             }
304              
305             DEBUG and
306             warn "$$: Changing the wheel filter\n";
307              
308             # parsing I/O as references
309             my $ft = $heap->{filter};
310             DEBUG and warn "$$: Filter is now $ft";
311             $heap->{wheel_client}->set_filter($ft);
312             delete $heap->{filter};
313              
314             DEBUG and
315             warn "$$: Changed the wheel filter\n";
316             }
317              
318              
319             sub _pause_wheel
320             {
321             my( $heap ) = @_;
322             DEBUG and
323             warn "$$: Pause wheel\n";
324             $heap->{'wheel_client'}->pause_input;
325             }
326              
327             sub _resume_wheel
328             {
329             my( $heap ) = @_;
330             DEBUG and
331             warn "$$: Resume wheel\n";
332             $heap->{'wheel_client'}->resume_input;
333             }
334              
335             sub _monitor_channel
336             {
337             my( $heap, $op ) = @_;
338             $poe_kernel->call( IKC => 'inform_monitors',
339             $heap->{remote_ID},
340             'channel', $op, $poe_kernel->get_active_session->ID
341             );
342              
343              
344             }
345              
346              
347             #----------------------------------------------------
348             # This is the subscription callback
349             sub channel_done
350             {
351             my($heap, $subscribed)=@_[HEAP, ARG0];
352             if($heap->{subscribe})
353             {
354             my %count;
355             foreach my $spec (@$subscribed, @{$heap->{subscribe}})
356             { $count{$spec}++;
357             }
358             my @missing=grep { $count{$_} != 2 } keys %count;
359              
360             if(@missing)
361             {
362             die "Unable to subscribe to ".join(', ', @missing)."\n";
363             }
364             delete $heap->{subscribe};
365             DEBUG and warn "$$: Subscriptions are completed\n";
366             }
367              
368             if($heap->{on_connect}) # or call the on_connect
369             {
370             DEBUG and warn "$$: On connect\n";
371             $heap->{on_connect}->();
372             delete $heap->{on_connect};
373             }
374              
375             # Detach from parent session
376             unless( $heap->{is_server} ) {
377             # Only if we are a client. Server uses 'lose' to detect disconnects
378             # for concurrency.
379             $_[KERNEL]->detach_myself;
380             }
381              
382             TIMING and channel_log( $heap, "subscribed" );
383              
384              
385             # wait until everything is sane before registering this
386             # $kernel->signal(INT=>'sig_INT'); # sig_INT() is in fact empty
387             }
388              
389             #----------------------------------------------------
390             #### DEAL WITH NEGOCIATION PHASE
391             sub _set_phase
392             {
393             my($kernel, $heap, $phase, $line)=@_;
394             if($phase eq 'ZZZ')
395             {
396             _negociation_done($kernel, $heap);
397             return;
398             }
399              
400             my $neg = $heap->{is_server} ? 'server_' : 'client_';
401              
402             # generate this event on input
403             $heap->{'wheel_client'}->event(InputEvent => $neg.$phase);
404             DEBUG && warn "Negociation phase $neg$phase.\n";
405             $kernel->yield($neg.$phase, $line); # Start the negociation phase
406             return;
407             }
408              
409             # First server state is
410             sub server_000
411             {
412             my ($heap, $kernel, $line)=@_[HEAP, KERNEL, ARG0];
413              
414             unless(defined $line) {
415             # wait for client to send HELLO
416             }
417             elsif( $line =~ /^HELLO IKC\d$/ ) { # compatible with IKC1
418             $heap->{'wheel_client'}->put( 'NOT' );
419             }
420             elsif( $line =~ /^SETUP/ ) { # compatible with IKC0
421             $heap->{'wheel_client'}->put( 'NOT' );
422             }
423             elsif( $line eq 'HELLO' ) {
424             $heap->{'wheel_client'}->put('IAM '.$kernel->ID());
425              
426             # put other server aliases here
427             $heap->{aliases001}=[$heap->{kernel_name},
428             @{$heap->{kernel_aliases}}];
429             DEBUG and warn "$$: Server we are going to tell remote that aliases001=", join ',', @{$heap->{aliases001}};
430             _set_phase($kernel, $heap, '001');
431              
432             }
433             else {
434             # wait for client to say something coherrent :)
435             warn "Client sent '$line' during phase 000\n";
436             }
437             return;
438             }
439              
440             # We tell who we are
441             sub negociate_001
442             {
443             my ($heap, $kernel, $line)=@_[HEAP, KERNEL, ARG0];
444            
445             unless(defined $line) {
446             # far side must talk now (we sent "IAM kernel")
447             }
448             elsif($line eq 'OK') {
449             my $a=pop @{$heap->{aliases001}};
450             if($a) {
451             $heap->{'wheel_client'}->put("IAM $a");
452             }
453             else {
454             delete $heap->{aliases001};
455             $heap->{'wheel_client'}->put('DONE');
456             _set_phase($kernel, $heap, '002');
457             }
458             }
459             else {
460             warn "Received '$line' during phase 001\n";
461             # prod far side into saying something coherrent
462             $heap->{wheel_client}->put('NOT') unless $line eq 'NOT';
463             }
464             return;
465             }
466              
467             # We find out who the client is
468             sub server_002
469             {
470             my ($heap, $kernel, $line)=@_[HEAP, KERNEL, ARG0];
471              
472             unless(defined $line) {
473             # far side must respond to the "DONE"
474             }
475             elsif($line eq 'DONE') {
476             _set_phase($kernel, $heap, '003');
477             }
478             elsif($line =~ /^IAM\s+([-:.\w]+)$/) {
479             # Register this kernel alias with the responder
480             push @{$heap->{remote_aliases}}, $1;
481             $heap->{'wheel_client'}->put('OK');
482              
483             }
484             else {
485             warn "Client sent '$line' during phase 002\n";
486             # prod far side into saying something coherrent
487             $heap->{wheel_client}->put('NOT') unless $line eq 'NOT';
488             }
489             return;
490             }
491              
492             # We find out what type of serialisation the client wants
493             sub server_003
494             {
495             my ($heap, $kernel, $line)=@_[HEAP, KERNEL, ARG0];
496              
497             unless(defined $line) {
498             # wait for client to send FREEZER after last IAM
499             }
500             elsif($line =~ /^FREEZER\s+([-:\w]+)$/) {
501             my $package=$1;
502             eval {
503             DEBUG and warn "Going to use $package as serializer\n";
504             $heap->{filter}=POE::Filter::Reference->new($package);
505             };
506              
507             if($heap->{filter}) {
508             DEBUG &&
509             warn "$$: Using $package\n";
510             $heap->{wheel_client}->put('OK');
511             } else {
512             DEBUG && warn "Client wanted $package, but we can't : $@";
513             $heap->{wheel_client}->put('NOT');
514             }
515             }
516             elsif($line =~ /^FREEZER\s+(.+)$/) {
517             warn "Client sent invalid package $1 as a serializer, refused\n";
518             $heap->{wheel_client}->put('NOT');
519             }
520             elsif($line eq 'WORLD') {
521             # last bit of the dialog has to come from us :(
522             $heap->{wheel_client}->put('UP');
523             _set_phase($kernel, $heap, 'ZZZ');
524             }
525             else {
526             warn "Client sent '$line' during phase 003\n";
527             $heap->{wheel_client}->put('NOT') unless $line eq 'NOT';
528             }
529             return;
530             }
531              
532             #----------------------------------------------------
533             # These states is invoked for each line during the negociation phase on
534             # the client's side
535              
536             ## Start negociation and listen to who the server is
537             sub client_000
538             {
539             my ($heap, $kernel, $line)=@_[HEAP, KERNEL, ARG0];
540              
541             unless(defined $line) {
542             $heap->{wheel_client}->put('HELLO');
543              
544             }
545             elsif($line =~ /^IAM\s+([-:.\w]+)$/) {
546             # Register this kernel alias with the responder
547             DEBUG and warn "$$: Remote server is called $1\n";
548             push @{$heap->{remote_aliases}}, $1;
549             $heap->{wheel_client}->put('OK');
550              
551             }
552             elsif($line eq 'DONE') {
553             $heap->{'wheel_client'}->put('IAM '.$poe_kernel->ID());
554             $heap->{aliases001}=[$heap->{kernel_name},
555             @{$heap->{kernel_aliases}}];
556             _set_phase($kernel, $heap, '001');
557              
558             }
559             else {
560             warn "$$: Server sent '$line' during negociation phase 000\n";
561             # prod far side into saying something coherrent
562             $heap->{wheel_client}->put('NOT') unless $line eq 'NOT';
563             }
564             return;
565             }
566              
567             # try to negociate a serialization method
568             sub client_002
569             {
570             my ($heap, $kernel, $line)=@_[HEAP, KERNEL, ARG0];
571            
572             unless(defined $line) {
573             $heap->{serial002}=$heap->{serializers};
574             $line=$heap->{serial002} ? 'NOT' : 'OK';
575             # NOT= pretend that we already sent a FREEZER
576             # OK= use default freezers
577             }
578              
579             if($line eq 'NOT') {
580             delete $heap->{filter};
581             my $ft;
582             while(@{$heap->{serial002}}) {
583             $ft=shift @{$heap->{serial002}};
584             DEBUG and
585             warn "$$: Trying serializer $ft\n";
586             $heap->{filter}=eval {
587             POE::Filter::Reference->new($ft);
588             };
589             last if $heap->{filter};
590             DEBUG and warn $@;
591             }
592              
593             if($ft) {
594             $heap->{'wheel_client'}->put('FREEZER '.$ft);
595             }
596             else {
597             DEBUG and
598             warn "Server doesn't like our list of serializers ",
599             join ', ', @{$heap->{serializers}};
600             delete $heap->{serial002};
601             _set_phase($kernel, $heap, '003');
602             }
603             }
604             elsif($line eq 'OK') {
605             delete $heap->{serial002};
606             _set_phase($kernel, $heap, '003');
607             }
608             else {
609             warn "Server sent '$line' during negociation phase 002\n";
610             # prod far side into saying something coherrent
611             $heap->{wheel_client}->put('NOT') unless $line eq 'NOT';
612             }
613             }
614              
615             # Game over
616             sub client_003
617             {
618             my ($heap, $kernel, $line)=@_[HEAP, KERNEL, ARG0];
619              
620             unless(defined $line) {
621             $heap->{'wheel_client'}->put('WORLD');
622             }
623             elsif($line eq 'UP') {
624             _set_phase($kernel, $heap, 'ZZZ');
625             }
626             else {
627             warn "Server sent '$line' during phase 003\n";
628             # prod far side into saying something coherrent
629             $heap->{wheel_client}->put('NOT') unless $line eq 'NOT';
630             }
631             return;
632             }
633              
634              
635             ##############################################################################
636             sub client_010
637             {
638             my ($heap, $kernel, $line)=@_[HEAP, KERNEL, ARG0];
639              
640             DEBUG and $line and warn "Client010: $line";
641              
642             unless(defined $line) {
643             # TODO : make sure all serializers load
644             # T->point( 'IKC', 'first line' );
645             my $setup = __build_setup( $heap, $heap->{serializers} );
646             # T->point( 'IKC', 'build_setup' );
647             DEBUG and warn "Client010: sending $setup";
648             $heap->{wheel_client}->put( $setup );
649             }
650             elsif( $line eq 'NOT' ) {
651             PROTOCOL and warn "$$: Using protocol IKC (fallback)\n";
652             _set_phase( $kernel, $heap, '000' );
653             }
654             elsif($line =~ /^SETUP (.+)$/) {
655             # T->point( IKC => 'got SETUP' );
656             DEBUG and warn "$$: Remote server setup as $1\n";
657             my $neg = __neg_setup( $1 );
658             unless( 1==@{ $neg->{freezer} } ) {
659             warn "Server didn't send one freezer in $line\n";
660             $neg->{bad}++;
661             }
662              
663             if( $neg->{bad} ) {
664             $heap->{wheel_client}->put( 'NOT' );
665             return;
666             }
667             # Register these kernel alias with the responder
668             $heap->{remote_aliases} = $neg->{kernel};
669             $heap->{remote_pid} = $neg->{pid};
670             # Build the filter we shall use later
671             $heap->{filter} = eval { POE::Filter::Reference->new( $neg->{freezer}[0] ) };
672             die "Unable to build filter: $@" if $@;
673             die "Unable to build filter $neg->{freezer}[0]" unless $heap->{filter};
674             # T->point( IKC => 'got SETUP' );
675             _set_phase( $kernel, $heap, 'ZZZ' );
676             }
677             else {
678             warn "Server sent '$line' during negociation phase 002\n";
679             $heap->{wheel_client}->put('NOT');
680             }
681             }
682              
683             sub server_010
684             {
685             my ($heap, $kernel, $line)=@_[HEAP, KERNEL, ARG0];
686              
687             DEBUG and $line and warn "Server010: $line";
688              
689             unless(defined $line) {
690             # wait for client
691             }
692             elsif( $line =~ /^HELLO IKC\d$/ ) { # compatible with IKC1
693             $heap->{'wheel_client'}->put( 'NOT' );
694             }
695             elsif( $line eq 'HELLO' ) {
696             PROTOCOL and warn "$$: Using protocol IKC (fallback)\n";
697             _set_phase( $kernel, $heap, '000', $line );
698             return;
699             }
700             elsif( $line =~ /^SETUP (.+)$/ ) {
701             DEBUG and warn "$$: Remote client setup as $1\n";
702             my $neg = __neg_setup( $1 );
703              
704             my $filter;
705             if( not $neg->{bad} ) {
706             # Build the filter we shall use later
707             foreach my $ft ( @{ $neg->{freezer} } ) {
708             $filter = $ft;
709             $heap->{filter} = eval { POE::Filter::Reference->new( $ft ) };
710             last if $heap->{filter};
711             DEBUG and warn "Client wanted $ft, but we can't: $@";
712             }
713             }
714             unless( $heap->{filter} ) {
715             warn "None of the filters the client wants are OK: ",
716             join ', ', @{ $neg->{freezer} };
717             $neg->{bad}++;
718             }
719              
720             if( $neg->{bad} ) {
721             $heap->{wheel_client}->put( 'NOT' );
722             return;
723             }
724             # Register these kernel alias with the responder
725             $heap->{remote_aliases} = $neg->{kernel};
726             $heap->{remote_pid} = $neg->{pid};
727              
728             # Send our SETUP back
729             my @freezers = ( $filter );
730             my $setup = __build_setup( $heap, [$filter] );
731             DEBUG and warn "Server010: sending $setup";
732             $heap->{wheel_client}->put( $setup );
733              
734             # Move to next phase
735             _set_phase( $kernel, $heap, 'ZZZ' );
736             }
737             }
738              
739             sub __build_setup
740             {
741             my( $heap, $freezers ) = @_;
742             my $aliases = [ $poe_kernel->ID,
743             $heap->{kernel_name},
744             @{$heap->{kernel_aliases}}
745             ];
746             return POE::Component::IKC::Protocol::__build_setup( $aliases, $freezers );
747             }
748              
749             sub __neg_setup
750             {
751             return POE::Component::IKC::Protocol::__neg_setup( $_[0] );
752             }
753              
754              
755              
756             #----------------------------------------------------
757             # This state is invoked for each error encountered by the session's
758             # ReadWrite wheel.
759              
760             sub channel_error
761             {
762             my ($heap, $kernel, $operation, $errnum, $errstr) =
763             @_[HEAP, KERNEL, ARG0, ARG1, ARG2];
764              
765             POE::Component::IKC::Util::monitor_error( $heap,
766             $operation, $errnum, $errstr,
767             ( $operation eq 'read' && $errnum == 0 )
768             );
769              
770             if ($errnum) {
771             DEBUG &&
772             warn "$$: Channel encountered $operation error $errnum: $errstr\n";
773             }
774             else {
775             DEBUG &&
776             warn "$$: The channel's client closed its connection ($heap->{kernel_name}<->$heap->{remote_kernel})\n";
777             }
778              
779             # warn "ERROR $heap->{remote_ID}";
780             _close_channel($heap, 1); # either way, shut down
781             }
782              
783              
784             #----------------------------------------------------
785             sub _channel_unregister
786             {
787             my($heap)=@_;
788             if($heap->{remote_ID}) {
789             DEBUG and warn <
790             ------------------------------------------
791             UNREGISTER $$ $heap->{remote_ID}
792             ------------------------------------------
793             WARN
794             # 2005/06 Tell IKC we closed the connection
795             $poe_kernel->call( 'IKC', 'unregister', $heap->{remote_ID} );
796             delete $heap->{remote_ID};
797             }
798             # either way, shut down
799             }
800              
801             #----------------------------------------------------
802             sub _close_channel
803             {
804             my($heap, $force)=@_;
805              
806              
807             # we have to inform monitors before unregistering
808             # but we only want to inform once,
809             _monitor_channel( $heap, 'close' ) unless $heap->{inform_once}++;
810              
811             # tell responder right away that this channel isn't to be used
812             _channel_unregister($heap);
813              
814             return unless $heap->{wheel_client};
815              
816              
817             if(not $force and $heap->{wheel_client}->get_driver_out_octets) {
818             DEBUG and
819             warn "************ Defering wheel close";
820             $heap->{go_away}=1; # wait until next Flushed
821             return;
822             }
823              
824             DEBUG and
825             warn "Deleting wheel session = ", $poe_kernel->get_active_session->ID;
826             my $x=delete $heap->{wheel_client};
827             # WORK AROUND
828             # $x->DESTROY;
829              
830             # sig_INT is empty
831             # $kernel->sig( 'INT' );
832              
833             if( $heap->{session_alias} ) {
834             $poe_kernel->alias_remove( delete $heap->{session_alias} );
835             }
836              
837             if( TIMING ) {
838             channel_log( $heap, "close" );
839             delete $heap->{start_time};
840             delete $heap->{last_time};
841             }
842             T->point( 'IKC', 'close' );
843              
844             return;
845             }
846              
847              
848              
849              
850             #----------------------------------------------------
851             #
852             sub channel_default
853             {
854             my($event)=$_[STATE];
855             DEBUG && warn "Unknown event $event posted to IKC::Channel\n"
856             if $event !~ /^_/;
857             return;
858             }
859              
860             #----------------------------------------------------
861             # Process POE's standard _stop event by shutting down.
862             sub channel_stop
863             {
864             my $heap = $_[HEAP];
865             DEBUG &&
866             warn "$$: *** Channel will shut down.\n";
867             _close_channel($heap);
868             T->end( 'IKC' );
869              
870             return "channel-$_[SESSION]";
871             }
872              
873             ###########################################################################
874             ## Next two events forward messages between Wheel::ReadWrite and the
875             ## Responder
876             ## Because the Responder know which foreign kernel sent a request,
877             ## these events fill in some of the details.
878              
879             #----------------------------------------------------
880             # Foreign kernel sent us a request
881             sub channel_receive
882             {
883             my ($kernel, $heap, $request) = @_[KERNEL, HEAP, ARG0];
884              
885             warn "$$: Attempting to receive during finishing" if $heap->{finishing};
886              
887             T->point( 'IKC', 'receive' );
888              
889             TIMING and
890             channel_log( $heap, "receive" );
891              
892             DEBUG &&
893             warn "$$: Received data...\n";
894             return if $heap->{shutdown};
895              
896             # we won't trust the other end to set this properly
897             $request->{errors_to}={ kernel=>$heap->{remote_ID},
898             session=>'IKC',
899             state=>'remote_error',
900             };
901             # just in case
902             $request->{call}->{kernel}||=$heap->{kernel_name};
903              
904             # call the Responder channel to process
905             # hmmm.... i wonder if this could be stream-lined into a direct call
906             $kernel->call('IKC', 'request', $request);
907             return;
908             }
909              
910             #----------------------------------------------------
911             # Local kernel is sending a request to a foreign kernel
912             sub channel_send
913             {
914             my ($heap, $request)=@_[HEAP, ARG0];
915              
916             die "Attempting to send during finishing" if $heap->{finishing};
917              
918             TIMING and
919             channel_log( $heap, "send" );
920              
921             my $size = total_size $request;
922             if( $size > 100*1024*1024 ) {
923             die "$$ Channel sending WAY too much data ($size bytes)";
924             }
925             DEBUG &&
926             warn "$$: Sending data...\n";
927             # add our name so the foreign channel can find us
928             # TODO should we do this? or should the other end do this?
929             $request->{rsvp}->{kernel}||=$heap->{kernel_name}
930             if ref($request) and $request->{rsvp};
931              
932             if($heap->{'wheel_client'}) {
933             $heap->{'wheel_client'}->put($request);
934             }
935             else {
936             my $what={event => $request->{event},
937             from => $request->{from}};
938             $what->{action} = $request->{params}[0]
939             if $what->{event}{state} eq 'IKC:proxy' and
940             'ARRAY' eq ref $request->{params};
941             my $type = "missing";
942             $type = "shutdown" if $heap->{shutdown};
943             warn "$$: Attempting to put to a $type channel! ". pp $what;
944             }
945             T->point( 'IKC', 'send' );
946              
947             return 1;
948             }
949              
950             #----------------------------------------------------
951             sub channel_flushed
952             {
953             my($heap, $wheel)=@_[HEAP, ARG0];
954             DEBUG &&
955             warn "$$: Flushed data...\n";
956             if($heap->{go_away}) {
957             _close_channel($heap);
958             }
959             return;
960             }
961              
962             #----------------------------------------------------
963             # Local kernel thinks it's time to close down the channel
964             sub channel_close
965             {
966             my ($heap, $sender)=@_[HEAP, SENDER];
967             unless( $heap->{shutdown} ) {
968             DEBUG &&
969             warn "$$: channel_close *****************************************\n";
970             $heap->{shutdown}=1;
971             }
972             _close_channel( $heap );
973             }
974              
975             #----------------------------------------------------
976             # User wants to kill process / kernel
977             sub sig_INT
978             {
979             my ($heap, $kernel)=@_[HEAP, KERNEL];
980             DEBUG && warn "$$: Channel::sig_INT\n";
981             $kernel->sig_handled();
982             return;
983             }
984              
985             #----------------------------------------------------
986             sub channel_log
987             {
988             my( $heap, $when ) = @_;
989             return unless $heap->{start_time};
990             my $now = [ gettimeofday ];
991             my $el = tv_interval( $heap->{start_time}, $now );
992             my $time = _delta_time( $el );
993              
994             if( $heap->{last_time} ) {
995             $el = tv_interval( $heap->{last_time}, $now );
996             $time .= " +"._delta_time( $el );
997             }
998             $heap->{last_time} = $now;
999             print STDERR "$$: CHANNEL $time $when\n";
1000             }
1001              
1002             sub _delta_time
1003             {
1004             my( $el ) = @_;
1005              
1006             if( $el > 1 ) {
1007             return sprintf( "%.3fs", $el);
1008             }
1009             $el *= 1000; # microseconds -> milliseconds
1010             if( $el > 10 ) {
1011             return sprintf( "%ims", int $el);
1012             }
1013             return sprintf( "%.1gms", $el);
1014             }
1015              
1016             ###########################################################################
1017              
1018             1;
1019             __END__