File Coverage

blib/lib/IO/Stream.pm
Criterion Covered Total %
statement 101 101 100.0
branch 38 40 95.0
condition 31 32 96.8
subroutine 14 14 100.0
pod 3 4 75.0
total 187 191 97.9


line stmt bran cond sub pod time code
1             package IO::Stream;
2 27     27   2386799 use 5.010001;
  27         288  
3 27     27   115 use warnings;
  27         35  
  27         566  
4 27     27   108 use strict;
  27         53  
  27         420  
5 27     27   11720 use utf8;
  27         275  
  27         112  
6 27     27   634 use Carp;
  27         43  
  27         1507  
7              
8             our $VERSION = 'v2.0.3';
9              
10 27     27   127 use Scalar::Util qw( weaken );
  27         42  
  27         916  
11              
12              
13 27     27   7770 use IO::Stream::const;
  27         54  
  27         117  
14 27     27   9289 use IO::Stream::EV;
  27         67  
  27         3607  
15              
16              
17             #
18             # Export constants.
19             #
20             # Usage: use IO::Stream qw( :ALL :DEFAULT :Event :Error IN EINBUFLIMIT ... )
21             #
22             sub import {
23 28     28   2108 my %tags = (
24             Event => [ qw( RESOLVED CONNECTED IN OUT EOF SENT ) ],
25             Error => [ qw(
26             EINBUFLIMIT
27             ETORESOLVE ETOCONNECT ETOWRITE
28             EDNS EDNSNXDOMAIN EDNSNODATA
29             EREQINBUFLIMIT EREQINEOF
30             ) ],
31             );
32 28         86 $tags{ALL} = $tags{DEFAULT} = [ map { @{$_} } values %tags ];
  56         78  
  56         209  
33 28         63 my %known = map { $_ => 1 } @{ $tags{ALL} };
  420         664  
  28         57  
34              
35 28         77 my (undef, @p) = @_;
36 28 100       95 if (!@p) {
37 24         83 @p = (':DEFAULT');
38             }
39 28 100       655 @p = map { /\A:(\w+)\z/xms ? @{ $tags{$1} || [] } : $_ } @p;
  32 100       274  
  29         248  
40 28         100 my $pkg = caller;
41 27     27   169 no strict 'refs';
  27         46  
  27         18911  
42 28         408 for my $const (@p) {
43 399 100       610 next if !$known{$const};
44 398         399 *{"${pkg}::$const"} = \&{$const};
  398         930  
  398         538  
45             }
46 28         7477 return;
47             }
48              
49              
50             my @Active;
51              
52              
53             sub new {
54 67     67 1 56449 my (undef, $opt) = @_;
55             croak 'usage: IO::Stream->new({ cb=>, wait_for=>, [fh=>, | host=>, port=>,] ... })'
56             if ref $opt ne 'HASH'
57             || !$opt->{cb}
58             || !($opt->{fh} xor $opt->{host})
59 67 100 100     935 || ($opt->{host} xor $opt->{port});
      100        
      100        
      75        
      100        
60              
61             my $self = bless {
62             # no default values for these:
63             cb => undef,
64             wait_for => undef,
65             fh => undef,
66             host => undef,
67             port => undef,
68             # default values:
69             method => 'IO',
70             in_buf_limit=> undef,
71             out_buf => q{}, # modified on: OUT
72             out_pos => undef, # modified on: OUT
73             # user shouldn't provide values for these, but it's ok if he want:
74             out_bytes => 0, # modified on: OUT
75             in_buf => q{}, # modified on: IN
76             in_bytes => 0, # modified on: IN
77             ip => undef, # modified on: RESOLVED
78             is_eof => undef, # modified on: EOF
79             # load user values:
80 56         153 %{$opt},
  56         604  
81             # we'll setup these below:
82             plugin => {},
83             _master => undef,
84             _slave => undef,
85             _id => undef,
86             }, __PACKAGE__;
87              
88             # Create socket if needed.
89 56 100       234 if (!$self->{fh}) {
90             # Maybe it have sense instead or croak just send event to user?
91             # (Most probable reason: error in socket because there no more fd.)
92 19 50       692 socket $self->{fh}, AF_INET, SOCK_STREAM, PROTO_TCP
93             or croak "socket: $!";
94 19         54 if (!WIN32) {
95 19 50       160 fcntl $self->{fh}, F_SETFL, O_NONBLOCK or croak "fcntl: $!";
96             } else {
97             my $nb=1; ioctl $self->{fh}, FIONBIO, \$nb or croak "ioctl: $!";
98             }
99             }
100              
101             # Keep this object alive, even if user doesn't keep it himself.
102 56         161 $self->{_id} = fileno $self->{fh};
103 56 100       191 if (!$self->{_id}) {
    100          
104 3         26 croak q{can't get file descriptor};
105             } elsif ($Active[ $self->{_id} ]) {
106 3         25 croak q{can't create second object for same fh};
107             } else {
108 50         124 $Active[ $self->{_id} ] = $self;
109             }
110              
111             # Connect plugins into chain and setup {plugin}.
112 50         70 my $master = $self;
113 50 100       116 if ($opt->{plugin}) {
114 2         5 while (my ($name, $plugin) = splice @{ $opt->{plugin} }, 0, 2) {
  6         24  
115 4         7 $self->{plugin}{$name} = $plugin;
116 4         8 $master->{_slave} = $plugin;
117 4         21 $plugin->{_master} = $master;
118 4         29 weaken($plugin->{_master});
119 4         20 $master = $plugin;
120             }
121             }
122 50         277 my $plugin = IO::Stream::EV->new();
123 50         74 $master->{_slave} = $plugin;
124 50         91 $plugin->{_master} = $master;
125 50         142 weaken($plugin->{_master});
126              
127             # Ask plugin chain to continue with initialization:
128 50         181 $self->{_slave}->PREPARE($self->{fh}, $self->{host}, $self->{port});
129              
130             # Shortcuts for typical operations after creating new I/O object:
131 50 100       129 if (length $self->{out_buf}) {
132 18         50 $self->write();
133             }
134              
135 50         421 return $self;
136             }
137              
138             #
139             # Push user data down the stream, optionally adding new data to {out_buf}.
140             #
141             sub write { ## no critic (ProhibitBuiltinHomonyms)
142 234     234 1 207632 my ($self, $data) = @_;
143 234 100       528 if ($#_ > 0) {
144 166         730 $self->{out_buf} .= $data;
145             }
146 234         686 $self->{_slave}->WRITE();
147 234         369 return;
148             }
149              
150             #
151             # Free fh and Stream object.
152             #
153             sub close { ## no critic (ProhibitBuiltinHomonyms ProhibitAmbiguousNames)
154 38     38 1 53506 my ($self) = @_;
155 38         89 undef $Active[ $self->{_id} ];
156 38         1157 return close $self->{fh};
157             }
158              
159             #
160             # Filter and deliver to user events (received from top plugin in the chain).
161             #
162             sub EVENT {
163 1727     1727 0 2571 my ($self, $e, $err) = @_;
164 1727         2074 my $w = $self->{wait_for};
165 1727 100 100     3859 if ($e & IN && !($w & IN)) {
166             # override $err in case of wrong config
167 156 100       366 if (!($w & EOF)) {
    100          
168 1         1 $err = EREQINEOF;
169             }
170             elsif (!defined $self->{in_buf_limit}) {
171 1         2 $err = EREQINBUFLIMIT;
172             }
173             }
174 1727 100 100     4475 if (!$err && $e & IN && !($w & IN)) {
      100        
175 154         187 my $l = $self->{in_buf_limit};
176 154 100 100     460 if ($l > 0 && length $self->{in_buf} > $l) {
177 3         5 $err = EINBUFLIMIT;
178             }
179             }
180 1727         1900 $e &= $w;
181 1727 100 100     3280 if ($e || $err) {
182 782 100       1446 if (ref $self->{cb} eq 'CODE') {
183 777         1531 $self->{cb}->($self, $e, $err);
184             } else {
185 5         6 my $method = $self->{method};
186 5         21 $self->{cb}->$method($self, $e, $err);
187             }
188             }
189 1725         36037 return;
190             }
191              
192              
193             1; # Magic true value required at end of module
194             __END__