File Coverage

blib/lib/IO/Stream.pm
Criterion Covered Total %
statement 19 21 90.4
branch n/a
condition n/a
subroutine 7 7 100.0
pod n/a
total 26 28 92.8


line stmt bran cond sub pod time code
1             package IO::Stream;
2              
3 27     27   1827266 use warnings;
  27         55  
  27         1031  
4 27     27   140 use strict;
  27         43  
  27         918  
5 27     27   121 use Carp;
  27         49  
  27         2486  
6              
7 27     27   23026 use version; our $VERSION = qv('1.0.9'); # update POD & Changes & README
  27         63720  
  27         169  
8              
9             # update DEPENDENCIES in POD & Makefile.PL & README
10 27     27   3548 use Scalar::Util qw( weaken );
  27         50  
  27         2986  
11              
12              
13 27     27   16937 use IO::Stream::const;
  27         70  
  27         154  
14 27     27   15119 use IO::Stream::EV;
  0            
  0            
15              
16              
17             #
18             # Export constants.
19             #
20             # Usage: use IO::Stream qw( :ALL :DEFAULT :Event :Error IN EINBUFLIMIT ... )
21             #
22             my %tags = (
23             Event => [ qw( RESOLVED CONNECTED IN OUT EOF SENT ) ],
24             Error => [ qw(
25             EINBUFLIMIT
26             ETORESOLVE ETOCONNECT ETOWRITE
27             EDNS EDNSNXDOMAIN EDNSNODATA
28             EREQINBUFLIMIT EREQINEOF
29             ) ],
30             );
31             $tags{ALL} = $tags{DEFAULT} = [ map { @{$_} } values %tags ];
32             my %known = map { $_ => 1 } @{ $tags{ALL} };
33              
34             sub import {
35             my (undef, @p) = @_;
36             if (!@p) {
37             @p = (':DEFAULT');
38             }
39             @p = map { /\A:(\w+)\z/xms ? @{ $tags{$1} || [] } : $_ } @p;
40             my $pkg = caller;
41             no strict 'refs';
42             for my $const (@p) {
43             next if !$known{$const};
44             *{"${pkg}::$const"} = \&{$const};
45             }
46             return;
47             }
48              
49              
50             my @Active;
51              
52              
53             sub new {
54             my (undef, $opt) = @_;
55             croak 'usage: IO::Stream->new({ cb=>, wait_for=>, [fh=>, | host=>, port=>,] ... })'
56             if ref $opt ne 'HASH'
57             || !$opt->{cb}
58             || !($opt->{fh} xor $opt->{host})
59             || ($opt->{host} xor $opt->{port});
60              
61             my $self = bless {
62             # no default values for these:
63             cb => undef,
64             wait_for => undef,
65             fh => undef,
66             host => undef,
67             port => undef,
68             # default values:
69             method => 'IO',
70             in_buf_limit=> undef,
71             out_buf => q{}, # modified on: OUT
72             out_pos => undef, # modified on: OUT
73             # user shouldn't provide values for these, but it's ok if he want:
74             out_bytes => 0, # modified on: OUT
75             in_buf => q{}, # modified on: IN
76             in_bytes => 0, # modified on: IN
77             ip => undef, # modified on: RESOLVED
78             is_eof => undef, # modified on: EOF
79             # load user values:
80             %{$opt},
81             # we'll setup these below:
82             plugin => {},
83             _master => undef,
84             _slave => undef,
85             _id => undef,
86             }, __PACKAGE__;
87              
88             # Create socket if needed.
89             if (!$self->{fh}) {
90             # Maybe it have sense instead or croak just send event to user?
91             # (Most probable reason: error in socket because there no more fd.)
92             socket $self->{fh}, AF_INET, SOCK_STREAM, PROTO_TCP
93             or croak "socket: $!";
94             if (!WIN32) {
95             fcntl $self->{fh}, F_SETFL, O_NONBLOCK or croak "fcntl: $!";
96             } else {
97             my $nb=1; ioctl $self->{fh}, FIONBIO, \$nb or croak "ioctl: $!";
98             }
99             }
100              
101             # Keep this object alive, even if user doesn't keep it himself.
102             $self->{_id} = fileno $self->{fh};
103             if (!$self->{_id}) {
104             croak q{can't get file descriptor};
105             } elsif ($Active[ $self->{_id} ]) {
106             croak q{can't create second object for same fh};
107             } else {
108             $Active[ $self->{_id} ] = $self;
109             }
110              
111             # Connect plugins into chain and setup {plugin}.
112             my $master = $self;
113             if ($opt->{plugin}) {
114             while (my ($name, $plugin) = splice @{ $opt->{plugin} }, 0, 2) {
115             $self->{plugin}{$name} = $plugin;
116             $master->{_slave} = $plugin;
117             $plugin->{_master} = $master;
118             weaken($plugin->{_master});
119             $master = $plugin;
120             }
121             }
122             my $plugin = IO::Stream::EV->new();
123             $master->{_slave} = $plugin;
124             $plugin->{_master} = $master;
125             weaken($plugin->{_master});
126              
127             # Ask plugin chain to continue with initialization:
128             $self->{_slave}->PREPARE($self->{fh}, $self->{host}, $self->{port});
129              
130             # Shortcuts for typical operations after creating new I/O object:
131             if (length $self->{out_buf}) {
132             $self->write();
133             }
134              
135             return $self;
136             }
137              
138             #
139             # Push user data down the stream, optionally adding new data to {out_buf}.
140             #
141             sub write { ## no critic (ProhibitBuiltinHomonyms)
142             my ($self, $data) = @_;
143             if ($#_ > 0) {
144             $self->{out_buf} .= $data;
145             }
146             $self->{_slave}->WRITE();
147             return;
148             }
149              
150             #
151             # Free fh and Stream object.
152             #
153             sub close { ## no critic (ProhibitBuiltinHomonyms ProhibitAmbiguousNames)
154             my ($self) = @_;
155             undef $Active[ $self->{_id} ];
156             return close $self->{fh};
157             }
158              
159             #
160             # Filter and deliver to user events (received from top plugin in the chain).
161             #
162             sub EVENT {
163             my ($self, $e, $err) = @_;
164             my $w = $self->{wait_for};
165             if ($e & IN && !($w & IN)) {
166             # override $err in case of wrong config
167             if (!($w & EOF)) {
168             $err = EREQINEOF;
169             }
170             elsif (!defined $self->{in_buf_limit}) {
171             $err = EREQINBUFLIMIT;
172             }
173             }
174             if (!$err && $e & IN && !($w & IN)) {
175             my $l = $self->{in_buf_limit};
176             if ($l > 0 && length $self->{in_buf} > $l) {
177             $err = EINBUFLIMIT;
178             }
179             }
180             $e &= $w;
181             if ($e || $err) {
182             if (ref $self->{cb} eq 'CODE') {
183             $self->{cb}->($self, $e, $err);
184             } else {
185             my $method = $self->{method};
186             $self->{cb}->$method($self, $e, $err);
187             }
188             }
189             return;
190             }
191              
192              
193             1; # Magic true value required at end of module
194             __END__