File Coverage

blib/lib/POE/Component/Client/TCP.pm
Criterion Covered Total %
statement 140 166 84.3
branch 75 104 72.1
condition 9 18 50.0
subroutine 20 27 74.0
pod 1 1 100.0
total 245 316 77.5


line stmt bran cond sub pod time code
1             package POE::Component::Client::TCP;
2              
3 11     11   485 use strict;
  11         20  
  11         434  
4              
5 11     11   49 use vars qw($VERSION);
  11         19  
  11         561  
6             $VERSION = '1.365'; # NOTE - Should be #.### (three decimal places)
7              
8 11     11   50 use Carp qw(carp croak);
  11         17  
  11         618  
9 11     11   53 use Errno qw(ETIMEDOUT ECONNRESET);
  11         15  
  11         502  
10              
11             # Explicit use to import the parameter constants.
12 11     11   62 use POE::Session;
  11         14  
  11         69  
13 11     11   879 use POE::Driver::SysRW;
  11         19  
  11         198  
14 11     11   710 use POE::Filter::Line;
  11         17  
  11         221  
15 11     11   1041 use POE::Wheel::ReadWrite;
  11         18  
  11         302  
16 11     11   2012 use POE::Wheel::SocketFactory;
  11         19  
  11         18023  
17              
18             # Create the client. This is just a handy way to encapsulate
19             # POE::Session->create(). Because the states are so small, it uses
20             # real inline coderefs.
21              
22             sub new {
23 84     84 1 9013 my $type = shift;
24              
25             # Helper so we don't have to type it all day. $mi is a name I call
26             # myself.
27 84         146 my $mi = $type . '->new()';
28              
29             # If they give us lemons, tell them to make their own damn
30             # lemonade.
31 84 100       421 croak "$mi requires an even number of parameters" if (@_ & 1);
32 82         2150 my %param = @_;
33              
34             # Validate what we're given.
35 82 100       392 croak "$mi needs a RemoteAddress parameter"
36             unless exists $param{RemoteAddress};
37 80 100       321 croak "$mi needs a RemotePort parameter"
38             unless exists $param{RemotePort};
39              
40             # Extract parameters.
41 78         120 my $alias = delete $param{Alias};
42 78         105 my $address = delete $param{RemoteAddress};
43 78         91 my $port = delete $param{RemotePort};
44 78         84 my $domain = delete $param{Domain};
45 78         76 my $bind_address = delete $param{BindAddress};
46 78         74 my $bind_port = delete $param{BindPort};
47 78         75 my $ctimeout = delete $param{ConnectTimeout};
48 78         71 my $args = delete $param{Args};
49 78         78 my $session_type = delete $param{SessionType};
50 78         72 my $session_params = delete $param{SessionParams};
51              
52 78 50       150 $args = [] unless defined $args;
53 78 50       182 croak "Args must be an array reference" unless ref($args) eq "ARRAY";
54              
55 78         131 foreach (
56             qw(
57             PreConnect Connected ConnectError Disconnected ServerInput
58             ServerError ServerFlushed Started
59             ServerHigh ServerLow
60             )
61             ) {
62 780 50 66     2123 croak "$_ must be a coderef" if(
63             defined($param{$_}) and ref($param{$_}) ne 'CODE'
64             );
65             }
66              
67 78         117 my $high_mark_level = delete $param{HighMark};
68 78         79 my $low_mark_level = delete $param{LowMark};
69 78         98 my $high_event = delete $param{ServerHigh};
70 78         74 my $low_event = delete $param{ServerLow};
71              
72             # this is ugly, but now it's elegant :) grep++
73 78         101 my $using_watermarks = grep { defined $_ }
  312         367  
74             ($high_mark_level, $low_mark_level, $high_event, $low_event);
75 78 100 66     178 if ($using_watermarks > 0 and $using_watermarks != 4) {
76 8         738 croak "If you use the Mark settings, you must define all four";
77             }
78              
79 70 50   0   264 $high_event = sub { } unless defined $high_event;
  0         0  
80 70 50   0   221 $low_event = sub { } unless defined $low_event;
  0         0  
81              
82 70         94 my $pre_conn_callback = delete $param{PreConnect};
83 70         94 my $conn_callback = delete $param{Connected};
84 70         79 my $conn_error_callback = delete $param{ConnectError};
85 70         84 my $disc_callback = delete $param{Disconnected};
86 70         83 my $input_callback = delete $param{ServerInput};
87 70         77 my $error_callback = delete $param{ServerError};
88 70         108 my $flush_callback = delete $param{ServerFlushed};
89 70         75 my $start_callback = delete $param{Started};
90 70         63 my $filter = delete $param{Filter};
91             # TODO should we have ServerInputFilter/ServerOutputFilter like Server-TCP does?
92              
93             # Extra states.
94              
95 70         76 my $inline_states = delete $param{InlineStates};
96 70 100       132 $inline_states = {} unless defined $inline_states;
97              
98 70         68 my $package_states = delete $param{PackageStates};
99 70 100       141 $package_states = [] unless defined $package_states;
100              
101 70         78 my $object_states = delete $param{ObjectStates};
102 70 100       116 $object_states = [] unless defined $object_states;
103              
104 70 100       318 croak "InlineStates must be a hash reference"
105             unless ref($inline_states) eq 'HASH';
106              
107 68 100       302 croak "PackageStates must be a list or array reference"
108             unless ref($package_states) eq 'ARRAY';
109              
110 66 100       335 croak "ObjectStates must be a list or array reference"
111             unless ref($object_states) eq 'ARRAY';
112              
113             # Errors.
114              
115 64 100       322 croak "$mi requires a ServerInput parameter" unless defined $input_callback;
116              
117 62         180 foreach (sort keys %param) {
118 0         0 carp "$mi doesn't recognize \"$_\" as a parameter";
119             }
120              
121             # Defaults.
122              
123 62 50       121 $session_type = 'POE::Session' unless defined $session_type;
124 62 100 66     171 if (defined($session_params) && ref($session_params)) {
125 2 50       4 if (ref($session_params) ne 'ARRAY') {
126 2         183 croak "SessionParams must be an array reference";
127             }
128             } else {
129 60         75 $session_params = [ ];
130             }
131              
132 60 50       95 $address = '127.0.0.1' unless defined $address;
133              
134 60 100       109 $conn_error_callback = \&_default_error unless defined $conn_error_callback;
135 60 100       102 $error_callback = \&_default_io_error unless defined $error_callback;
136              
137             # Spawn the session that makes the connection and then interacts
138             # with what was connected to.
139              
140             return $session_type->create
141             ( @$session_params,
142             inline_states =>
143             { _start => sub {
144 60     60   108 my ($kernel, $heap) = @_[KERNEL, HEAP];
145 60         118 $heap->{shutdown_on_error} = 1;
146 60 100       170 $kernel->alias_set( $alias ) if defined $alias;
147 60         150 $kernel->yield( 'reconnect' );
148 60 100       228 $start_callback and $start_callback->(@_);
149             },
150              
151             # To quiet ASSERT_STATES.
152 60     60   186 _stop => sub { },
153 0     0   0 _child => sub { },
154              
155             reconnect => sub {
156 60     60   101 my ($kernel, $heap) = @_[KERNEL, HEAP];
157              
158 60         99 $heap->{shutdown} = 0;
159 60         74 $heap->{connected} = 0;
160              
161             # Tentative patch to re-establish the alias upon reconnect.
162             # Necessary because otherwise the alias goes away for good.
163             # Unfortunately, there is a gap where the alias may not be
164             # set, and any events dispatched then will be dropped.
165 60 100       162 $kernel->alias_set( $alias ) if defined $alias;
166              
167 60         334 $heap->{server} = POE::Wheel::SocketFactory->new
168             ( RemoteAddress => $address,
169             RemotePort => $port,
170             SocketDomain => $domain,
171             BindAddress => $bind_address,
172             BindPort => $bind_port,
173             SuccessEvent => 'got_connect_success',
174             FailureEvent => 'got_connect_error',
175             );
176 60 50       115 $_[KERNEL]->alarm_remove( delete $heap->{ctimeout_id} )
177             if exists $heap->{ctimeout_id};
178 60 100       224 $heap->{ctimeout_id} = $_[KERNEL]->alarm_set
179             ( got_connect_timeout => time + $ctimeout
180             ) if defined $ctimeout;
181             },
182              
183             connect => sub {
184 0     0   0 my ($new_address, $new_port) = @_[ARG0, ARG1];
185 0 0       0 $address = $new_address if defined $new_address;
186 0 0       0 $port = $new_port if defined $new_port;
187 0         0 $_[KERNEL]->yield("reconnect");
188             },
189              
190             got_connect_success => sub {
191 58     58   125 my ($kernel, $heap, $socket) = @_[KERNEL, HEAP, ARG0];
192              
193 58 100       151 $kernel->alarm_remove( delete $heap->{ctimeout_id} )
194             if exists $heap->{ctimeout_id};
195              
196             # Pre-connected callback.
197 58 100       108 if ($pre_conn_callback) {
198 2 50       8 unless ($socket = $pre_conn_callback->(@_)) {
199 0         0 $heap->{connected} = 0;
200             # TODO - Error callback? Disconnected callback?
201 0         0 return;
202             }
203             }
204              
205             # Ok to overwrite like this as of 0.13.
206 58 50       1291 $_[HEAP]->{server} = POE::Wheel::ReadWrite->new
207             ( Handle => $socket,
208             Driver => POE::Driver::SysRW->new(),
209             Filter => _get_filter($filter),
210             InputEvent => 'got_server_input',
211             ErrorEvent => 'got_server_error',
212             FlushedEvent => 'got_server_flush',
213             (
214             $using_watermarks
215             ? (
216             HighMark => $high_mark_level,
217             HighEvent => 'got_high',
218             LowMark => $low_mark_level,
219             LowEvent => 'got_low',
220             )
221             : ()
222             )
223             );
224              
225 58         456 $heap->{connected} = 1;
226 58 100       296 $conn_callback and $conn_callback->(@_);
227             },
228             got_high => $high_event,
229             got_low => $low_event,
230              
231             got_connect_error => sub {
232 2     2   4 my $heap = $_[HEAP];
233 2 50       6 $_[KERNEL]->alarm_remove( delete $heap->{ctimeout_id} )
234             if exists $heap->{ctimeout_id};
235 2         5 $heap->{connected} = 0;
236 2         8 $conn_error_callback->(@_);
237 2         1067 delete $heap->{server};
238             },
239              
240             got_connect_timeout => sub {
241 0     0   0 my $heap = $_[HEAP];
242 0         0 $heap->{connected} = 0;
243 0 0       0 $_[KERNEL]->alarm_remove( delete $heap->{ctimeout_id} )
244             if exists $heap->{ctimeout_id};
245 0         0 $! = ETIMEDOUT;
246 0         0 @_[ARG0,ARG1,ARG2] = ('connect', $!+0, $!);
247 0         0 $conn_error_callback->(@_);
248 0         0 delete $heap->{server};
249             },
250              
251             got_server_error => sub {
252 18     18   80 $error_callback->(@_);
253 18 50       1298 if ($_[HEAP]->{shutdown_on_error}) {
254 18         73 $_[KERNEL]->yield("shutdown");
255 18         72 $_[HEAP]->{got_an_error} = 1;
256             }
257             },
258              
259             got_server_input => sub {
260 58     58   91 my $heap = $_[HEAP];
261 58 50       444 return if $heap->{shutdown};
262 58         212 $input_callback->(@_);
263             },
264              
265             got_server_flush => sub {
266 60     60   70 my $heap = $_[HEAP];
267 60 100       278 $flush_callback and $flush_callback->(@_);
268 60 100       1496 if ($heap->{shutdown}) {
269 4         20 delete $heap->{server};
270 4 50       25 $disc_callback and $disc_callback->(@_);
271             }
272             },
273              
274             shutdown => sub {
275 69     69   242 my ($kernel, $heap) = @_[KERNEL, HEAP];
276 69         139 $heap->{shutdown} = 1;
277              
278 69 50       201 $kernel->alarm_remove( delete $heap->{ctimeout_id} )
279             if exists $heap->{ctimeout_id};
280              
281 69 100       184 if ($heap->{connected}) {
282 58         86 $heap->{connected} = 0;
283 58 50       136 if (defined $heap->{server}) {
284 58 100 100     272 if (
285             $heap->{got_an_error} or
286             not $heap->{server}->get_driver_out_octets()
287             ) {
288 54         209 delete $heap->{server};
289 54 100       275 $disc_callback and $disc_callback->(@_);
290             }
291             }
292             }
293             else {
294 11         39 delete $heap->{server};
295             }
296              
297 69 100       418 $kernel->alias_remove($alias) if defined $alias;
298             },
299              
300             # User supplied states.
301 60         1840 %$inline_states,
302             },
303              
304             # User arguments.
305             args => $args,
306              
307             # User supplied states.
308             package_states => $package_states,
309             object_states => $object_states,
310             )->ID;
311             }
312              
313             sub _get_filter {
314 58     58   61 my $filter = shift;
315 58 100       208 if (ref $filter eq 'ARRAY') {
    50          
    50          
316 2         6 my @filter_args = @$filter;
317 2         4 $filter = shift @filter_args;
318 2         11 return $filter->new(@filter_args);
319             } elsif (ref $filter) {
320 0         0 return $filter->clone();
321             } elsif (!defined($filter)) {
322 56         174 return POE::Filter::Line->new();
323             } else {
324 0           return $filter->new();
325             }
326             }
327              
328             # The default error handler logs to STDERR and shuts down the socket.
329              
330             sub _default_error {
331 0 0 0 0     unless ($_[ARG0] eq "read" and ($_[ARG1] == 0 or $_[ARG1] == ECONNRESET)) {
      0        
332 0           warn(
333             'Client ', $_[SESSION]->ID, " got $_[ARG0] error $_[ARG1] ($_[ARG2])\n"
334             );
335             }
336 0           delete $_[HEAP]->{server};
337             }
338              
339             sub _default_io_error {
340 0     0     my ($syscall, $errno, $error) = @_[ARG0..ARG2];
341 0 0         $error = "Normal disconnection" unless $errno;
342 0           warn('Client ', $_[SESSION]->ID, " got $syscall error $errno ($error)\n");
343 0           $_[KERNEL]->yield("shutdown");
344             }
345              
346             1;
347              
348             __END__