File Coverage

blib/lib/POE/Component/Proxy/TCP.pm
Criterion Covered Total %
statement 128 150 85.3
branch 16 38 42.1
condition 2 12 16.6
subroutine 27 34 79.4
pod 0 5 0.0
total 173 239 72.3


line stmt bran cond sub pod time code
1             # $Id: TCP.pm,v 1.2 2004/08/02 09:40:41 avpurshottam Exp $
2             # TCP Proxy Component - Andrew v. Purshottam andy@andypurshottam.com 22 Jun 2004
3             # Module structure adapted from PoCo::Server::TCP
4            
5             # to do:
6             # - clean up exported logic below
7             # - rationalize and document session aliases
8             # - document the connection between per client server sessions and
9             # per client client [sic!] sessions
10             # - change OrigPort and OrigAddress to RemotePort and RemoteServer
11             # - should the test code get installed and if so where? Study a famous module
12             # for example.
13            
14             package POE::Component::Proxy::TCP;
15            
16 2     2   432508 use strict;
  2         4  
  2         67  
17 2     2   10 use Exporter();
  2         4  
  2         41  
18            
19 2     2   10 use vars qw(@ISA @EXPORT $VERSION);
  2         9  
  2         553  
20             @ISA = qw(Exporter);
21            
22             @EXPORT = qw();
23 2     2   11 use vars qw($VERSION);
  2         4  
  2         104  
24             $VERSION = (qw($Revision: 1.2 $ ))[1];
25            
26 2     2   41 use Carp qw(carp croak);
  2         4  
  2         153  
27 2     2   2307 use Socket qw(INADDR_ANY inet_ntoa);
  2         4314  
  2         304  
28 2     2   963 use POSIX qw(ECONNABORTED ECONNRESET);
  2         8457  
  2         14  
29            
30 2     2   8717 use POE;
  2         51185  
  2         14  
31 2     2   123069 use POE::Component::Client::TCP;
  2         48071  
  2         67  
32 2     2   868 use POE::Filter::Stream;
  2         390  
  2         163  
33 2     2   12 use POE::Filter::Line;
  2         3  
  2         36  
34 2     2   10 use POE::Session;
  2         4  
  2         15  
35 2     2   4086 use POE::Component::Server::TCP;
  2         9222  
  2         70  
36            
37 2     2   1642 use POE::Component::Proxy::TCP::PoeDebug;
  2         6  
  2         143  
38            
39 2         13 use fields qw(clients alias address
40             orig_address port remote_client_filter
41             remote_server_input_filter remote_server_output_filter
42             data_from_client data_from_server
43             session_type session_params args client_connected
44 2     2   2247 );
  2         2755  
45            
46             sub new {
47 1     1 0 29 my $type = shift;
48            
49             # Helper so we don't have to type it all day. $mi is a name I call
50             # myself.
51 1         3 my $mi = $type . '->new()';
52             # All instance state should now live in the hash.
53 1         11 my $self = bless {@_},$type;
54            
55             # hash mapping server session ids to client session ids.
56 1         21 $self->{clients} = {};
57            
58             # param list is list of name, value pairs so must be even number.
59 1 50       11 croak "$mi requires an even number of parameters." if (@_ & 1);
60 1         5 my %param = @_;
61            
62             # Validate what we're given.
63 1 50       6 croak "$mi needs a Port parameter" unless exists $param{Port};
64            
65             # Extract parameters.
66 1         5 $self->{alias} = delete $param{Alias};
67 1         4 $self->{address} = delete $param{Address};
68 1         3 $self->{orig_address} = delete $param{OrigAddress};
69 1         3 $self->{port} = delete $param{Port};
70 1         3 $self->{orig_port} = delete $param{OrigPort};
71 1         4 $self->{remote_client_filter} = delete $param{RemoteClientFilter};
72 1         4 $self->{remote_server_input_filter} = delete $param{RemoteServerInputFilter};
73 1         2 $self->{remote_server_output_filter} = delete $param{RemoteServerOutputFilter};
74            
75 1         10 foreach ( qw( DataFromClient DataFromServer)) {
76 2 50 33     18 croak "$_ must be a coderef"
77             if defined($param{$_}) and ref($param{$_}) ne 'CODE';
78             }
79            
80 1         6 $self->{data_from_client} = delete $param{DataFromClient};
81 1         5 $self->{data_from_server} = delete $param{DataFromServer};
82            
83             # Defaults.
84 1 50       7 $self->{address} = INADDR_ANY unless defined $self->{address};
85 1 50       4 $self->{remote_client_filter} = "POE::Filter::Stream"
86             unless defined $self->{remote_client_filter};
87 1 50       4 $self->{remote_server_input_filter} = "POE::Filter::Stream"
88             unless defined $self->{remote_server_input_filter};
89 1 50       5 $self->{remote_server_output_filter} = "POE::Filter::Stream"
90             unless defined $self->{remote_server_output_filter};
91            
92 1 50       14 $self->{session_type} = 'POE::Session' unless defined $self->{session_type};
93 1 50 33     7 if (defined($self->{session_params}) && ref($self->{session_params})) {
94 0 0       0 if (ref($self->{session_params}) ne 'ARRAY') {
95 0         0 croak "SessionParams must be an array reference";
96             }
97             } else {
98 1         3 $self->{session_params} = [ ];
99             }
100            
101             # $self->{client_error} = \&_default_client_error unless defined $self->{client_error};
102 1 50   2   10 $self->{client_connected} = sub {} unless defined $self->{client_connected};
  2         5  
103             # $self->{client_disconnected} = sub {} unless defined $self->{client_disconnected};
104             # $self->{client_flushed} = sub {} unless defined $self->{client_flushed};
105            
106 1 50   0   6 $self->{data_from_client} = sub {} unless defined $self->{data_from_client};
  0         0  
107 1 50   0   52 $self->{data_from_server} = sub {} unless defined $self->{data_from_server};
  0         0  
108            
109 1 50       9 $self->{args} = [] unless defined $self->{args};
110            
111             # Extra states.
112            
113            
114 1         2 my $shutdown_on_error = 1;
115 1 50       4 if (exists $param{ClientShutdownOnError}) {
116 0         0 $shutdown_on_error = delete $param{ClientShutdownOnError};
117             }
118            
119             # Complain about strange things we're given.
120 1         6 foreach (sort keys %param) {
121 0         0 carp "$mi doesn't recognize \"$_\" as a parameter";
122             }
123            
124             # Server side of proxy, clients connect to this
125             $self->{server_component} = POE::Component::Server::TCP->new
126             (Alias => $self->{alias},
127             Port => $self->{port},
128             InlineStates => { send => sub {
129 32     32   4751 my ( $heap, $message ) = @_[ HEAP, ARG0 ];
130 32         173 dbprint(5, "sending to client:$message");
131 32         186 $heap->{client}->put($message);
132            
133             } },
134 1         21 Args => [$self], # so handle_client_connect_to_proxy_server gets $self
135             ClientConnected => \&handle_client_connect_to_proxy_server,
136             ClientError => \&handle_remote_client_error,
137             ClientDisconnected => \&handle_remote_client_disconnect,
138             ClientInput => \&handle_input_from_remote_client,
139             ClientInputFilter => $self->{remote_server_input_filter},
140             ClientOutputFilter => $self->{remote_server_output_filter}
141             );
142 1         4479 return $self;
143             }
144            
145             sub handle_remote_client_error {
146 0     0 0 0 my ($kernel, $session, $heap) = @_[KERNEL, SESSION, HEAP];
147 0         0 my $self = $heap->{self};
148 0         0 my $session_id = $session->ID;
149 0         0 dbprint(1, "Client $session_id disconnected due to error .");
150 0         0 delete $self->{clients}->{$session_id};
151             # XXX delete proxy client on heap???
152             };
153            
154             sub handle_remote_client_disconnect {
155 2     2 0 312 my ( $kernel, $session, $heap ) = @_[ KERNEL, SESSION, HEAP ];
156 2         8 my $self = $heap->{self};
157 2         8 my $session_id = $session->ID;
158 2         11 delete $heap->{proxy_client};
159 2         8 delete $self->{clients}->{$session_id};
160 2         10 dbprint(1, "Client $session_id disconnected.");
161             };
162            
163             sub handle_input_from_remote_client {
164 6     6 0 4869 my ( $kernel, $session, $heap, $input ) = @_[ KERNEL, SESSION, HEAP, ARG0 ];
165 6         13 my $self = $heap->{self};
166 6         16 my $session_id = $session->ID;
167 6         37 my $proxy_client_session_alias = $self->{clients}->{$session_id};
168            
169 6         33 dbprint(3, "Input:$input from $session_id sending to ",
170             $heap->{proxy_client_session_alias});
171            
172             # send the input to the remote server
173 6         27 $kernel->post($heap->{proxy_client_session_alias}, "send_server", $input);
174             # do whatever application-specific processing is called for.
175 6         1131 $self->{data_from_client}->($input);
176             }
177            
178             # Called in a Proxy Server Connection Session; receives self as a parameter
179             # and is responsible for storing it in the session heap.
180             sub handle_client_connect_to_proxy_server {
181 2     2 0 5918 my ( $kernel, $session, $heap, $self ) = @_[ KERNEL, SESSION, HEAP, ARG0 ];
182             # The per-client session spawned by PoCo::Server::TCP
183 2         9 my $session_id = $session->ID;
184             # Standard alias name.
185             # XXX Should use the alias passed to constructor as prefix!
186 2         16 my $proxy_client_session_alias = $self->{alias} . "client". $session_id;
187 2         6 $heap->{proxy_client_session_alias} = $proxy_client_session_alias;
188             # Provides access to proxy object in per client connection server component session.
189 2         5 $heap->{self} = $self;
190 2         6 $self->{clients}->{$session_id} = $proxy_client_session_alias;
191 2         13 dbprint(2, "Client $session_id connected.");
192             # invoke the client connected callback.
193 2         8 $self->{client_connected}->($session_id);
194             # self is passed down to Proxy Client Connection session.
195             # Create the per client connection client component session.
196             $heap->{proxy_client} =
197             POE::Component::Client::TCP->new
198             ( Alias => $proxy_client_session_alias,
199             RemoteAddress => $self->{orig_address},
200             RemotePort => $self->{orig_port},
201             Filter => $self->{remote_client_filter},
202             Args => [$self],
203             Started => sub {
204 2     2   1301 my ( $kernel, $heap, $inner_self) = @_[ KERNEL, HEAP, ARG0];
205 2         6 $heap->{parent_client_session} = $session_id;
206 2         4 $heap->{self} = $inner_self;
207 2         5 $heap->{is_connected_to_server} = 0;
208 2         13 dbprint(3, "connected to $inner_self->{orig_address}:$inner_self->{orig_port}");
209             },
210             Connected => sub {
211 2     2   1840 my ( $kernel, $heap) = @_[ KERNEL, HEAP];
212 2         4 $heap->{is_connected_to_server} = 1;
213 2         3 $heap->{parent_client_session} = $session_id;
214 2         15 dbprint(3, "connected to $self->{orig_address}:$self->{orig_port}");
215             },
216            
217             # The connection failed.
218             ConnectError => sub {
219 0     0   0 dbprint(1, "could not connect to $self->{orig_address}:$self->{orig_port}");
220 0         0 $heap->{is_connected_to_server} = 0;
221             # XXX do something to shut the system down?
222             # No: some applications wish to keep going and wait for a new connection.
223             #$_[KERNEL]->yield("shutdown");
224             },
225            
226             # The remote server has sent us something,
227             # so send it to the remote client and perform
228             # whatever application-specific logic (e.g. logging to
229             # screen) is required.
230             ServerInput => sub {
231 32     32   30797 my ( $kernel, $heap, $input ) = @_[ KERNEL, HEAP, ARG0 ];
232 32 50       129 if (defined($input)) {
233 32         434 dbprint(3, "Input from remote server $self->{orig_address} :",
234             "$self->{orig_port}: -$input- sending to",
235             "remote client and any callback");
236 32         198 $kernel->post($heap->{parent_client_session},
237             "send", $input);
238 32         7069 $self->{data_from_server}->($input);
239             } else {
240 0         0 dbprint(1, "ServerInput event but no input!");
241             }
242             },
243            
244             ConnectError => sub {
245 0     0   0 my ($syscall_name, $error_number, $error_string) = @_[ARG0, ARG1, ARG2];
246 0         0 dbprint(2, "ConnectError from ORIG_SERVER on",
247             " $self->{orig_port} : $self->{orig_address}");
248             },
249            
250             Disconnected => sub {
251 2     2   1494 dbprint(1, "Disconnected from ORIG_SERVER on",
252             "$self->{orig_port} : $self->{orig_address}");
253 2         9 $_[KERNEL]->post($session_id, "shutdown");
254             },
255            
256             ServerError => sub {
257 2     2   489 my ($syscall_name, $error_number, $error_string) = @_[ARG0, ARG1, ARG2];
258 2         13 dbprint(1, "ServerError from ORIG_SERVER on $self->{orig_port} :",
259             "$self->{orig_address}");
260             },
261            
262             InlineStates =>
263             {
264             # Send data to the server.
265             send_server => sub {
266 6     6   1097 my ( $heap, $message ) = @_[ HEAP, ARG0 ];
267 6         48 dbprint(3, "sending to server:$self->{orig_address}:$self->{orig_port}:",
268             "mess:$message.");
269 6 50       25 if ($heap->{is_connected_to_server}) {
270 6         38 $heap->{server}->put($message);
271             } else {
272 0           dbprint(1, "send_server error not connected to server.");
273             }
274             },
275             },
276 2         93 );
277             }
278            
279             # The default server error handler logs to STDERR and shuts down the
280             # server connection.
281             # XXX remove both of these someday; they are not used.
282            
283             sub _default_server_error {
284 0     0     warn( 'Server ', $_[SESSION]->ID,
285             " got $_[ARG0] error $_[ARG1] ($_[ARG2])\n"
286             );
287             # delete $_[HEAP]->{listener};
288             }
289            
290             # The default client error handler logs to STDERR and shuts down the
291             # client connection.
292            
293             sub _default_client_error {
294 0     0     my ($syscall, $errno, $error) = @_[ARG0..ARG2];
295 0 0 0       unless ($syscall eq "read" and ($errno == 0 or $errno == ECONNRESET)) {
      0        
296 0 0         $error = "(no error)" unless $errno;
297 0           warn(
298             'Client session ', $_[SESSION]->ID,
299             " got $syscall error $errno ($error)\n"
300             );
301             }
302             }
303            
304             1;
305            
306             __END__