File Coverage

blib/lib/Inferno/RegMgr/TCP.pm
Criterion Covered Total %
statement 16 18 88.8
branch n/a
condition n/a
subroutine 6 6 100.0
pod n/a
total 22 24 91.6


line stmt bran cond sub pod time code
1             package Inferno::RegMgr::TCP;
2              
3 1     1   3924 use warnings;
  1         3  
  1         29  
4 1     1   5 use strict;
  1         1  
  1         27  
5 1     1   5 use Carp;
  1         1  
  1         55  
6              
7             # update POD & Changes & README
8 1     1   5 use version; our $VERSION = qv('0.1.3');
  1         2  
  1         5  
9              
10             # update DEPENDENCIES in POD & Makefile.PL & README
11 1     1   80 use Scalar::Util qw( weaken );
  1         2  
  1         44  
12 1     1   579 use IO::Stream;
  0            
  0            
13             use Inferno::RegMgr::Utils qw( run_callback quote attr parse_svc );
14              
15              
16             use constant PORT_NEW => 6701;
17             use constant PORT_FIND => 6702;
18             use constant PORT_EVENT => 6703;
19             use constant KiB => 1024; ## no critic (Capitalization)
20              
21              
22             sub new {
23             my ($class, $opt) = @_;
24             croak '{host} required' if !defined $opt->{host};
25             my $self = {
26             host => $opt->{host},
27             port_new => $opt->{port_new} || PORT_NEW,
28             port_find => $opt->{port_find} || PORT_FIND,
29             port_event => $opt->{port_event} || PORT_EVENT,
30             };
31             return bless $self, $class;
32             }
33              
34             sub open_event {
35             my ($self, $opt) = @_;
36             croak '{cb} required' if !defined $opt->{cb};
37             my $io = IO::Stream->new({
38             host => $self->{host},
39             port => $self->{port_event},
40             cb => \&_cb_event,
41             wait_for => IN|EOF,
42             User_cb => [ $opt->{cb}, $opt->{method} ],
43             Is_connected=> 0,
44             });
45             return $io;
46             }
47              
48             sub _cb_event {
49             my ($io, $e, $err) = @_;
50             if (!$io->{Is_connected}) {
51             if ($e & IN) {
52             if ($io->{in_buf} =~ s/\AREADY\n//xms) {
53             $io->{Is_connected} = 1;
54             $e |= CONNECTED;
55             }
56             else {
57             $err = "Bug in registry: expected 'READY\\n', got '$io->{in_buf}'";
58             }
59             }
60             }
61             else {
62             if ($err) {
63             $e |= EOF;
64             }
65             }
66             if ($e & IN && $io->{in_buf} !~ s/.*\n//xms) {
67             $e &= ~IN;
68             if (length $io->{in_buf} > KiB) {
69             $err = 'Bug in registry: got 1 KiB without \\n';
70             }
71             }
72             if ($e & EOF || $err) {
73             $io->close();
74             }
75             if ($e || $err) {
76             run_callback( @{ $io->{User_cb} }, $e, $err );
77             }
78             return;
79             }
80              
81             sub open_new {
82             my ($self, $opt) = @_;
83             croak '{name} required' if !defined $opt->{name};
84             croak '{cb} required' if !defined $opt->{cb};
85             my $io = IO::Stream->new({
86             host => $self->{host},
87             port => $self->{port_new},
88             cb => \&_cb_new,
89             wait_for => 0,
90             User_cb => [ $opt->{cb}, $opt->{method} ],
91             });
92             $io->write(sprintf "%s %s\n", quote($opt->{name}), attr($opt->{attr}));
93             return $io;
94             }
95              
96             sub _cb_new {
97             my ($io, $e, $err) = @_;
98             $io->close();
99             run_callback( @{ $io->{User_cb} }, $err );
100             return;
101             }
102              
103             sub update {
104             my ($self, $io, $attr) = @_;
105             $io->write(sprintf "%s\n", attr($attr));
106             return;
107             }
108              
109             sub open_find {
110             my ($self, $opt) = @_;
111             croak '{cb} required' if !defined $opt->{cb};
112             my $io = IO::Stream->new({
113             host => $self->{host},
114             port => $self->{port_find},
115             cb => \&_cb_find,
116             wait_for => EOF,
117             in_buf_limit=> KiB*KiB,
118             User_cb => [ $opt->{cb}, $opt->{method} ],
119             });
120             $io->write(sprintf "%s\n", attr($opt->{attr}));
121             return $io;
122             }
123              
124             sub _cb_find {
125             my ($io, $e, $err) = @_;
126             my $svc;
127             if ($e & EOF && !$err) {
128             ($svc, $err) = parse_svc($io->{in_buf});
129             }
130             $io->close();
131             run_callback( @{ $io->{User_cb} }, $svc, $err );
132             return;
133             }
134              
135              
136             1; # Magic true value required at end of module
137             __END__