File Coverage

blib/lib/POE/Component/IKC/ClientLite.pm
Criterion Covered Total %
statement 32 343 9.3
branch 1 176 0.5
condition 0 71 0.0
subroutine 12 35 34.2
pod 11 12 91.6
total 56 637 8.7


line stmt bran cond sub pod time code
1             package POE::Component::IKC::ClientLite;
2              
3             ############################################################
4             # $Id: ClientLite.pm 1247 2014-07-07 09:06:34Z fil $
5             # By Philp Gwyn
6             #
7             # Copyright 1999-2014 Philip Gwyn. All rights reserved.
8             # This program is free software; you can redistribute it and/or modify
9             # it under the same terms as Perl itself.
10             #
11             # Contributed portions of IKC may be copyright by their respective
12             # contributors.
13              
14 2     2   28452 use strict;
  2         3  
  2         92  
15 2     2   11 use vars qw($VERSION @ISA @EXPORT @EXPORT_OK $error $request);
  2         4  
  2         199  
16              
17 2     2   2033 use Socket;
  2         9906  
  2         1361  
18 2     2   2131 use IO::Socket;
  2         40307  
  2         8  
19 2     2   5780 use IO::Select;
  2         3802  
  2         104  
20 2     2   629 use POE::Component::IKC::Specifier;
  2         7  
  2         137  
21 2     2   1489 use POE::Component::IKC::Protocol;
  2         6  
  2         68  
22 2     2   2893 use Data::Dumper;
  2         21043  
  2         186  
23 2     2   2623 use POSIX qw(:errno_h);
  2         13704  
  2         13  
24 2     2   8993 use Carp;
  2         4  
  2         8572  
25              
26             require Exporter;
27             @ISA = qw(Exporter);
28             @EXPORT = qw(create_ikc_client);
29             $VERSION = '0.2402';
30              
31 2     2 0 20 sub DEBUG { 0 }
32              
33             $request=0;
34              
35             ###############################################################################
36             sub spawn
37             {
38 0     0 1   my( $package, %parms ) = @_;
39              
40             # $parms{on_connect}||=sub{}; # would be silly for this to be blank
41 0   0       $parms{ip}||='localhost';
42 0   0       $parms{port}||=603; # POE! (almost :)
43 0   0       $parms{name}||="Client$$";
44 0   0       $parms{connect_timeout} ||= $parms{timeout} || 30;
      0        
45 0   0       $parms{timeout}||=30;
46 0   0       $parms{serialiser}||=_default_freezer();
47 0   0       $parms{block_size} ||= 65535;
48 0   0       $parms{protocol} ||= 'IKC0';
49              
50 0           my %self;
51 0           @self{qw(ip port name serialiser timeout connect_timeout block_size protocol)}=
52             @parms{qw(ip port name serialiser timeout connect_timeout block_size protocol)};
53              
54 0           eval {
55 0           @{$self{remote}}{qw(freeze thaw)}=_get_freezer($self{serialiser});
  0            
56             };
57              
58 0 0         if($@) {
59 0           $self{error}=$error=$@;
60 0           return;
61             }
62 0           my $self=bless \%self, $package;
63 0           $self->{remote}{aliases}={};
64 0           $self->{remote}{name}="$self->{ip}:$self->{port}";
65              
66 0 0         $self->connect and return $self;
67 0           return;
68             }
69              
70             sub create_ikc_client
71             {
72 0     0 1   my(%parms)=@_;
73 0   0       my $package = $parms{package} || __PACKAGE__;
74 0           carp "create_ikc_client is deprecated; use $package->spawn instead";
75 0           $package->spawn( %parms );
76             }
77              
78 0     0 1   sub name { $_[0]->{name}; }
79              
80             #----------------------------------------------------
81             sub connect
82             {
83 0     0 1   my($self)=@_;
84 0 0 0       return 1 if($self->{remote}{connected} and $self->{remote}{socket} and
      0        
85             $self->ping); # are we already connected?
86              
87 0           my $remote=$self->{remote};
88 0           delete $remote->{socket};
89 0           delete $remote->{connected};
90              
91 0           my $name=$remote->{name};
92 0 0         DEBUG && print "Connecting to $name...\n";
93 0           my( $sock, $resp );
94              
95 0           my $DONE = 0;
96 0           eval {
97 0           local $SIG{__DIE__}='DEFAULT';
98 0           local $SIG{__WARN__};
99 0     0     local $SIG{ALRM} = sub { die "alarm\n" }; # NB: \n required
  0            
100 0           $sock=IO::Socket::INET->new( PeerAddr=>$self->{ip},
101             PeerPort=>$self->{port},
102             # Proto=>'tcp',
103             Timeout=>$self->{connect_timeout}
104             );
105 0 0         die "Unable to connect to $name: $!\n" unless $sock;
106 0           $sock->autoflush(1);
107 0           local $/="\cM\cJ";
108 0           local $\="\cM\cJ";
109              
110             # Attempt IKC0 protocol
111 0 0         if( $self->{protocol} eq 'IKC0' ) {
112 0 0         if( $self->_protocol_IKC0( $sock ) ) {
113 0           $DONE = 1;
114 0           return;
115             }
116             }
117              
118             # Fallback to IKC protocol
119 0           $sock->print('HELLO');
120 0           my $resp;
121              
122 0           alarm( $self->{connect_timeout} );
123 0           while (defined($resp=$sock->getline)) # phase 000
124             {
125 0           chomp($resp);
126 0 0         last if $resp eq 'DONE';
127 0 0         die "Invalid IAM response from $name: $resp\n"
128             unless $resp=~/^IAM\s+([-:.\w]+)$/;
129 0   0       $remote->{name}||=$1;
130 0   0       $self->{ping}||="poe://$1/IKC/ping";
131 0           $remote->{aliases}->{$1}=1;
132 0           $sock->print('OK');
133             }
134 0 0         die "Phase 000: $!\n" unless defined $resp;
135              
136              
137 0           alarm( $self->{connect_timeout} );
138 0           $sock->print("IAM $self->{name}"); # phase 001
139 0           chomp($resp=$sock->getline);
140 0 0         die "Phase 001: $!\n" unless defined $resp;
141 0 0         die "Didn't get OK from $name\n" unless $resp eq 'OK';
142 0           $sock->print("DONE");
143              
144 0           alarm( $self->{connect_timeout} );
145 0           $sock->print("FREEZER $self->{serialiser}");# phase 002
146 0           chomp($resp=$sock->getline);
147 0 0         die "Phase 002: $!\n" unless defined $resp;
148 0 0         die "$name refused $self->{serialiser}\n" unless $resp eq 'OK';
149              
150 0           alarm( $self->{connect_timeout} );
151 0           $sock->print('WORLD'); # phase 003
152 0           chomp($resp=$sock->getline);
153 0 0         die "Phase 003: $!\n" unless defined $resp;
154 0 0         die "Didn't get UP from $name\n" unless $resp eq 'UP';
155 0           $DONE = 1;
156             };
157 0           alarm( 0 );
158 0 0         if($@)
159             {
160 0           $self->{error}=$error=$@;
161 0 0         if( $error eq "alarm\n" ) {
162 0           $self->{error}=$error="Timeout connecting to $self->{ip}:$self->{port}";
163             }
164 0           return;
165             }
166 0           $remote->{socket}=$sock;
167 0           $remote->{connected}=1;
168 0           return 1;
169             }
170              
171             #----------------------------------------------------
172             sub _protocol_IKC0
173             {
174 0     0     my( $self, $sock ) = @_;
175              
176 0           my $remote=$self->{remote};
177 0           my $name=$remote->{name};
178 0           my $resp;
179              
180 0           my $setup = POE::Component::IKC::Protocol::__build_setup(
181             [ $self->{name} ], [ $self->{serialiser} ] );
182 0           $sock->print( $setup );
183 0           alarm( $self->{connect_timeout} );
184 0           while (defined($resp=$sock->getline)) # phase 010
185             {
186 0           chomp($resp);
187 0 0         return if $resp eq 'NOT'; # move to phase 000
188 0 0         die "Phase 010: Invalid response from $name: $resp\n"
189             unless $resp =~ /^SETUP (.+)$/;
190 0           my $neg = POE::Component::IKC::Protocol::__neg_setup( $1 );
191 0 0         if( $neg->{bad} ) {
192 0           $sock->print( 'NOT' );
193 0           next;
194             }
195 0 0         die "Phase 010: Refused $self->{serialiser}, wants $neg->{freezer}[0]"
196             unless $neg->{freezer}[0] eq $self->{serialiser};
197 0           $remote->{name} = $neg->{kernel}[0];
198 0           foreach my $a ( @{ $neg->{kernel} } ) {
  0            
199 0           $remote->{aliases}{$a} = 1;
200             }
201 0           return 1;
202             }
203             }
204              
205              
206             #----------------------------------------------------
207             sub error
208             {
209 0 0   0 1   return $_[0]->{error} if @_==1;
210 0           return $error;
211             }
212             #----------------------------------------------------
213             sub ping
214             {
215 0     0 1   my($self)=@_;
216 0           my $ret=eval {
217 0           my $rsvp={kernel=>$self->{name},
218             session=>'IKC', state=>'pong'
219             };
220 0           my $r=$self->_send_msg({event=>$self->{ping}, params=>'PING',
221             rsvp=>$rsvp});
222 0 0         return unless $r;
223 0           my $pong=$self->_response($rsvp);
224 0 0 0       return 1 if $pong and $pong eq 'PONG';
225             };
226 0 0         $self->{error}=$error=$@ if $@;
227 0           $self->{remote}{connected}=$ret;
228 0           return $ret;
229             }
230              
231             #----------------------------------------------------
232             sub disconnect
233             {
234 0     0 1   my($self)=@_;
235             # 2001/01 why did we try to unregister ourselves? unregister wouldn't
236             # be safe for remote kernels anyway
237             # $self->call('IKC/unregister', $self->{name}) if $self->{remote};
238 0           delete @{$self->{remote}}{qw(socket connected name aliases)};
  0            
239 0           $self->{remote}={};
240             }
241              
242             sub DESTROY
243             {
244 0     0     my($self)=@_;
245 0           $self->disconnect;
246             }
247             sub END
248             {
249 2 50   2   1373 DEBUG and print "end\n";
250             }
251              
252             #----------------------------------------------------
253             # Post an event, maybe waits for a response and throws it away
254             #
255             sub post
256             {
257 0     0 1   my($self, $spec, $params)=@_;
258 0 0 0       unless(ref $spec or $spec=~m(^poe:)) {
259            
260 0 0         unless($self->{remote}{name}) {
261 0           $self->{error}=$error="Attempting to post $spec to unknown kernel";
262             # carp $error;
263 0           return;
264             }
265              
266 0           $spec="poe://$self->{remote}{name}/$spec";
267             }
268              
269 0           my $ret=eval {
270 0 0         return 0 if(0==$self->_try_send({event=>$spec, params=>$params}));
271 0           1;
272             };
273 0 0         if($@) {
274 0           $self->{error}=$error=$@;
275 0           return;
276             }
277 0           return $ret;
278             }
279              
280             #----------------------------------------------------
281             # posts an event, waits for the response, returns the response
282             sub call
283             {
284 0     0 1   my($self, $spec, $params)=@_;
285 0 0 0       $spec="poe://$self->{remote}{name}/$spec" unless ref $spec or $spec=~m(^poe:);
286              
287 0           my $rsvp={kernel=>$self->{name}, session=>'IKCLite',
288             state=>'response'.$request++};
289            
290 0           my $req={event=>$spec, params=>$params,
291             rsvp=>$rsvp, 'wantarray'=>wantarray(),
292             };
293 0           my @ret=eval {
294 0 0         return unless $self->_try_send($req);
295 0 0         DEBUG && print "Waiting for response...\n";
296 0           return $self->_response($rsvp, $req->{wantarray});
297             };
298 0 0         if($@) {
299 0           $self->{error}=$error=$@;
300 0           return;
301             }
302 0 0         return @ret if $req->{wantarray};
303 0           return $ret[0];
304             }
305              
306             #----------------------------------------------------
307             # posts an event, waits for the response, returns the response
308             # this differs from call() in that the foreign server may
309             # need many states before getting a response
310             sub post_respond
311             {
312 0     0 1   my($self, $spec, $params)=@_;
313 0 0 0       $spec="poe://$self->{remote}{name}/$spec" unless ref $spec or $spec=~m(^poe:);
314              
315 0           my $ret;
316 0           my $rsvp={kernel=>$self->{name}, session=>'IKCLite',
317             state=>'response'.$request++};
318 0           $ret=eval {
319 0 0         return unless $self->_try_send({event=>$spec,
320             params=>[$params, $rsvp],
321             });
322 0 0         DEBUG && print "Waiting for response...\n";
323 0           return $self->_response($rsvp);
324             };
325 0 0         if($@) {
326 0           $self->{error}=$error=$@;
327 0           return;
328             }
329 0           return $ret;
330             }
331              
332             #----------------------------------------------------
333             sub responded
334             {
335 0     0 1   my( $self, $state ) = @_;
336              
337 0           my $wantarray = wantarray;
338 0           my $rsvp = { kernel=>$self->{name},
339             session=>'IKCLite',
340             state=>$state
341             };
342 0           my @ret = eval {
343 0 0         DEBUG && print "Waiting for response...\n";
344 0           return $self->_response($rsvp, $wantarray);
345             };
346 0 0         if($@) {
347 0           $self->{error}=$error=$@;
348 0           return;
349             }
350 0 0         return @ret if wantarray;
351 0           return $ret[0];
352             }
353              
354              
355              
356             #----------------------------------------------------
357             sub _from
358             {
359 0     0     my( $self ) = @_;
360 0           return { kernel => $self->{name},
361             session => 'IKCLite',
362             # state => 'IKC:lite'
363             }
364             }
365              
366             #----------------------------------------------------
367             sub _try_send
368             {
369 0     0     my($self, $msg)=@_;
370 0 0 0       return unless $self->{remote}{connected} or $self->connect();
371              
372 0   0       $msg->{from} ||= $self->_from;
373              
374 0           my $ret=$self->_send_msg($msg);
375 0 0         DEBUG && print "Sending message...\n";
376 0 0 0       if(defined $ret and $ret==0) {
377 0 0         return 0 unless $self->connect();
378 0 0         DEBUG && print "Retry message...\n";
379 0           $ret=$self->_send_msg($msg);
380             }
381 0           return $ret;
382             }
383              
384             #----------------------------------------------------
385             sub _send_msg
386             {
387 0     0     my($self, $msg)=@_;
388              
389 0 0         my $e=$msg->{rsvp} ? 'call' : 'post';
390              
391 0           my $to=specifier_parse($msg->{event});
392 0 0         unless($to) {
393 0           croak "Bad message ", Dumper $msg;
394             }
395 0 0         unless($to) {
396 0           warn "Bad or missing 'to' parameter '$msg->{event}' to poe:/IKC/$e\n";
397 0           return;
398             }
399 0 0         unless($to->{session}) {
400 0           warn "Need a session name in poe:/IKC/$e";
401 0           return;
402             }
403 0 0         unless($to->{state}) {
404 0           carp "Need a state name in poe:IKC/$e";
405 0           return;
406             }
407              
408 0           my $frozen = $self->{remote}{freeze}->($msg);
409 0           my $raw=length($frozen) . "\0" . $frozen;
410              
411 0 0         unless($self->{remote}{socket}->opened()) {
412 0           $self->{connected}=0;
413 0           $self->{error}=$error="Socket not open";
414 0           return 0;
415             }
416 0 0         unless($self->{remote}{socket}->syswrite($raw, length $raw)) {
417 0           $self->{connected}=0;
418 0 0         return 0 if($!==EPIPE);
419 0           $self->{error}=$error="Error writing: $!\n";
420 0           return 0;
421             }
422 0           return 1;
423             }
424              
425              
426             #----------------------------------------------------
427             sub _response
428             {
429 0     0     my($self, $rsvp, $wantarray)=@_;
430              
431 0           $rsvp=specifier_parse($rsvp);
432 0           my $remote=$self->{remote};
433              
434 0           my $start = time;
435 0           my $stopon = $start + $self->{timeout};
436              
437 0 0         my $select=IO::Select->new() or die $!; # create the select object
438 0           $select->add($remote->{socket});
439              
440 0           my(@ready, $s, $raw, $frozen, $ret, $l, $need);
441 0           $raw='';
442              
443 0           my $blocks = 0;
444 0           do {{
445 0           my $timeout = $stopon-time;
  0            
446 0 0         if( $timeout <= 0 ) {
447 0           $timeout = 1;
448             }
449             # Torture::my_warn( "timeout=$timeout" );
450 0           @ready=$select->can_read( $timeout ); # this is the select
451 0 0         unless( @ready ) { # nothing ready == timeout
452             # Torture::my_warn( 'select hates me' );
453 0           last;
454             }
455            
456 0           foreach $s (@ready) # let's see what's ready...
457             {
458 0 0         die "Hey! $s isn't $remote->{socket}"
459             unless $s eq $remote->{socket};
460             }
461 0 0         DEBUG && print "Got something...\n";
462            
463             # read in another chunk
464 0           $l = $remote->{socket}->sysread($raw, $self->{block_size},
465             length($raw));
466              
467 0 0         unless(defined $l) { # disconnect, maybe?
468 0 0         $remote->{connected}=0 if $!==EPIPE;
469 0           die "Error reading: $!\n";
470             }
471 0           $blocks ++;
472              
473 0 0 0       if(not $need and $raw=~s/(\d+)\0//s) { # look for a marker?
474 0           $need=$1 ;
475 0 0         DEBUG && print "Need $need bytes...\n";
476             }
477              
478 0 0         next unless $need; # still looking...
479              
480 0 0         if(length($raw) >= $need) # do we have all we want?
481             {
482             # Torture::my_warn( 'Got it all' );
483 0 0         DEBUG && print "Got it all...\n";
484              
485 0           $frozen=substr($raw, 0, $need); # seems so...
486 0           substr($raw, 0, $need)='';
487 0           my $msg=$self->{remote}{thaw}->($frozen); # thaw the message
488 0 0         DEBUG && print "msg=", Dumper $msg;
489 0           my $to=specifier_parse($msg->{event});
490              
491 0 0         die "$msg->{params}\n" if($msg->{is_error}); # throw an error out
492 0 0         DEBUG && print "Not an error...\n";
493              
494             # make sure it's what we're waiting for...
495 0 0 0       if($to->{session} ne 'IKC' and $to->{session} ne 'IKCLite')
496             {
497 0           warn "Unknown session $to->{session}\n";
498 0 0         DEBUG && print "Not for us! ($to->{session})...\n";
499 0           next;
500             }
501 0 0 0       if($to->{session} ne $rsvp->{session} or
502             $to->{state} ne $rsvp->{state})
503             {
504 0           warn specifier_name($to). " received, expecting " .
505             specifier_name($rsvp). "\n";
506 0 0         DEBUG && print "Not for us! ($to->{session}/$to->{state})...\n";
507 0           next;
508             }
509              
510 0 0         DEBUG and print "wantarray=$wantarray\n";
511 0 0         if( $wantarray ) {
512 0 0         DEBUG and print "Wanted an array\n";
513 0 0         return @{$msg->{params}} if ref $msg->{params} eq 'ARRAY';
  0            
514             }
515 0           return $msg->{params}; # finaly!
516             }
517             # Torture::my_warn( "blocks=$blocks l=$l need=$need, got=", length $raw );
518             }} while ($stopon >= time) ; # do it until time's up
519              
520 0           $remote->{connected}=0;
521 0           confess "Timed out waiting for response ", specifier_name( $rsvp );
522             # die "Timed out waiting for response ", specifier_name( $rsvp ), "\n",
523             # "start=$start stopon=$stopon now=", time;
524 0           return;
525             }
526              
527              
528              
529              
530              
531              
532              
533              
534              
535             #------------------------------------------------------------------------------
536             # Try to require one of the default freeze/thaw packages.
537             sub _default_freezer
538             {
539 0     0     local $SIG{'__DIE__'} = 'DEFAULT';
540 0           my $ret;
541              
542 0           foreach my $p (qw(Storable FreezeThaw POE::Component::IKC::Freezer)) {
543 0           my $q=$p;
544 0           $q=~s(::)(/)g;
545 0           eval { require "$q.pm"; import $p ();};
  0            
  0            
546 0 0 0       DEBUG and warn $@ if $@;
547 0 0         return $p if $@ eq '';
548             }
549 0           die __PACKAGE__." requires Storable or FreezeThaw or POE::Component::IKC::Freezer\n";
550             }
551              
552             sub _get_freezer
553             {
554 0     0     my($freezer)=@_;
555 0 0         unless(ref $freezer) {
556 0           my $symtable=$::{"main::"};
557 0           my $loaded=1; # find out of the package was loaded
558 0           foreach my $p (split /::/, $freezer) {
559 0 0         unless(exists $symtable->{"$p\::"}) {
560 0           $loaded=0;
561 0           last;
562             }
563 0           $symtable=$symtable->{"$p\::"};
564             }
565              
566 0 0         unless($loaded) { my $q=$freezer;
  0            
567 0           $q=~s(::)(/)g;
568 0           eval {require "$q.pm"; import $freezer ();};
  0            
  0            
569 0 0         croak $@ if $@;
570             }
571             }
572              
573             # Now get the methodes we want
574 0   0       my $freeze=$freezer->can('nfreeze') || $freezer->can('freeze');
575 0 0         carp "$freezer doesn't have a freeze method" unless $freeze;
576 0           my $thaw=$freezer->can('thaw');
577 0 0         carp "$freezer doesn't have a thaw method" unless $thaw;
578              
579             # If it's an object, we use closures to create a $self->method()
580 0           my $tf=$freeze;
581 0           my $tt=$thaw;
582 0 0         if(ref $freezer) {
583 0     0     $tf=sub { return $freeze->($freezer, @_) };
  0            
584 0     0     $tt=sub { return ($thaw->($freezer, @_))[0] };
  0            
585             }
586             else {
587             # FreezeThaw::thaw returns an array now! We only want the first
588             # element.
589 0     0     $tt=sub { return ($thaw->( @_ ))[0] };
  0            
590             }
591 0           return($tf, $tt);
592             }
593              
594             1;
595              
596             __END__