File Coverage

blib/lib/Test/POE/Server/TCP.pm
Criterion Covered Total %
statement 200 244 81.9
branch 57 102 55.8
condition 6 16 37.5
subroutine 37 43 86.0
pod 16 16 100.0
total 316 421 75.0


line stmt bran cond sub pod time code
1             package Test::POE::Server::TCP;
2             BEGIN {
3 12     12   11796379 $Test::POE::Server::TCP::VERSION = '1.16';
4             }
5              
6             # ABSTRACT: A POE Component providing TCP server services for test cases
7              
8 12     12   117 use strict;
  12         22  
  12         401  
9 12     12   69 use warnings;
  12         22  
  12         623  
10 12     12   925 use POE qw(Wheel::SocketFactory Wheel::ReadWrite Filter::Line);
  12         62421  
  12         80  
11 12     12   183364 use Socket;
  12         37  
  12         9329  
12 12     12   109 use Carp qw(carp croak);
  12         42  
  12         53975  
13              
14             sub spawn {
15 12     12 1 10888 my $package = shift;
16 12         65 my %opts = @_;
17 12         105 $opts{lc $_} = delete $opts{$_} for keys %opts;
18 12         44 my $options = delete $opts{options};
19 12         562 my $self = bless \%opts, $package;
20 12         93 $self->{_prefix} = delete $self->{prefix};
21 12 100       66 $self->{_prefix} = 'testd_' unless defined $self->{_prefix};
22 12 100       89 $self->{_prefix} .= '_' unless $self->{_prefix} =~ /\_$/;
23 12 100       427 $self->{session_id} = POE::Session->create(
24             object_states => [
25             $self => { shutdown => '_shutdown',
26             send_event => '__send_event',
27             send_to_client => '_send_to_client',
28             send_to_all_clients => '_send_to_all_clients',
29             disconnect => '_disconnect',
30             terminate => '_terminate',
31             start_listener => '_start_listener',
32             },
33             $self => [ qw(_start register unregister _accept_client _accept_failed _conn_input _conn_error _conn_flushed _conn_alarm _send_to_client __send_event _disconnect _send_to_all_clients) ],
34             ],
35             heap => $self,
36             ( ref($options) eq 'HASH' ? ( options => $options ) : () ),
37             )->ID();
38 12         1936 return $self;
39             }
40              
41             sub session_id {
42 15     15 1 630 return $_[0]->{session_id};
43             }
44              
45             sub pause_listening {
46 1 50   1 1 61 return unless $_[0]->{listener};
47 1         6 $_[0]->{listener}->pause_accept();
48             }
49              
50             sub resume_listening {
51 1 50   1 1 1002405 return unless $_[0]->{listener};
52 1         13 $_[0]->{listener}->resume_accept();
53             }
54              
55             sub getsockname {
56 11 50   11 1 70 return unless $_[0]->{listener};
57 11         85 return $_[0]->{listener}->getsockname();
58             }
59              
60             sub port {
61 11     11 1 59028 my $self = shift;
62 11         69 return ( sockaddr_in( $self->getsockname() ) )[0];
63             }
64              
65             sub _conn_exists {
66 86     86   126 my ($self,$wheel_id) = @_;
67 86 50 33     799 return 0 unless $wheel_id and defined $self->{clients}->{ $wheel_id };
68 86         264 return 1;
69             }
70              
71             sub shutdown {
72 10     10 1 16716 my $self = shift;
73 10         86 $poe_kernel->call( $self->{session_id}, 'shutdown' );
74             }
75              
76             sub _start {
77 12     12   6405 my ($kernel,$self,$sender) = @_[KERNEL,OBJECT,SENDER];
78 12         143 $self->{session_id} = $_[SESSION]->ID();
79 12 50       115 if ( $self->{alias} ) {
80 0         0 $kernel->alias_set( $self->{alias} );
81             }
82             else {
83 12         102 $kernel->refcount_increment( $self->{session_id} => __PACKAGE__ );
84             }
85 12 100       553 if ( $kernel != $sender ) {
86 11         77 my $sender_id = $sender->ID;
87 11         98 $self->{events}->{$self->{_prefix} . 'all'}->{$sender_id} = $sender_id;
88 11         51 $self->{sessions}->{$sender_id}->{'ref'} = $sender_id;
89 11         46 $self->{sessions}->{$sender_id}->{'refcnt'}++;
90 11         42 $kernel->refcount_increment($sender_id, __PACKAGE__);
91 11         341 $kernel->post( $sender, $self->{_prefix} . 'registered', $self );
92 11         1465 $kernel->detach_myself();
93             }
94              
95 12         1652 $kernel->call( $self->{session_id}, 'start_listener' );
96 12         123 return;
97             }
98              
99             sub start_listener {
100 0     0 1 0 my $self = shift;
101 0         0 $poe_kernel->call( $self->{session_id}, 'start_listener', @_ );
102             }
103              
104             sub _start_listener {
105 12     12   603 my ($kernel,$self) = @_[KERNEL,OBJECT];
106 12 50       57 return if $self->{listener};
107              
108 12 100       212 $self->{listener} = POE::Wheel::SocketFactory->new(
    100          
109             ( defined $self->{address} ? ( BindAddress => $self->{address} ) : () ),
110             ( defined $self->{port} ? ( BindPort => $self->{port} ) : ( BindPort => 0 ) ),
111             SuccessEvent => '_accept_client',
112             FailureEvent => '_accept_failed',
113             SocketDomain => AF_INET, # Sets the socket() domain
114             SocketType => SOCK_STREAM, # Sets the socket() type
115             SocketProtocol => 'tcp', # Sets the socket() protocol
116             Reuse => 'on', # Lets the port be reused
117             );
118              
119 12         20032 return;
120             }
121              
122             sub _accept_client {
123 12     12   16648 my ($kernel,$self,$socket,$peeraddr,$peerport) = @_[KERNEL,OBJECT,ARG0..ARG2];
124 12         615537 my $sockaddr = inet_ntoa( ( unpack_sockaddr_in ( CORE::getsockname $socket ) )[1] );
125 12         142 my $sockport = ( unpack_sockaddr_in ( CORE::getsockname $socket ) )[0];
126 12         71 $peeraddr = inet_ntoa( $peeraddr );
127              
128 12         116 my $wheel = POE::Wheel::ReadWrite->new(
129             Handle => $socket,
130             _get_filters(
131             $self->{filter},
132             $self->{inputfilter},
133             $self->{outputfilter}
134             ),
135             InputEvent => '_conn_input',
136             ErrorEvent => '_conn_error',
137             FlushedEvent => '_conn_flushed',
138             );
139              
140 12 50       5445 return unless $wheel;
141            
142 12         304 my $id = $wheel->ID();
143 12         181 $self->{clients}->{ $id } =
144             {
145             wheel => $wheel,
146             peeraddr => $peeraddr,
147             peerport => $peerport,
148             sockaddr => $sockaddr,
149             sockport => $sockport,
150             };
151 12         150 $self->_send_event( $self->{_prefix} . 'connected', $id, $peeraddr, $peerport, $sockaddr, $sockport );
152              
153             #$self->{clients}->{ $id }->{alarm} = $kernel->delay_set( '_conn_alarm', $self->{time_out} || 300, $id );
154 12         79 return;
155             }
156              
157             sub client_info {
158 2     2 1 1571 my $self = shift;
159 2   50     6 my $id = shift || return;
160 2 50       6 return unless $self->_conn_exists( $id );
161 2         4 my %hash = %{ $self->{clients}->{ $id } };
  2         9  
162 2         4 delete $hash{wheel};
163 2 100       16 return map { $hash{$_} } qw(peeraddr peerport sockaddr sockport) if wantarray;
  4         8  
164 1         3 return \%hash;
165             }
166              
167             sub client_wheel {
168 1     1 1 6 my $self = shift;
169 1   50     13 my $id = shift || return;
170 1 50       4 return unless $self->_conn_exists( $id );
171 1         8 return $self->{clients}->{ $id }->{wheel};
172             }
173              
174             sub _get_filters {
175 12     12   105 my ($client_filter, $client_infilter, $client_outfilter) = @_;
176 12 100 66     297 if (defined $client_infilter or defined $client_outfilter) {
    50          
177             return (
178 1         5 "InputFilter" => _load_filter($client_infilter),
179             "OutputFilter" => _load_filter($client_outfilter)
180             );
181 0 0       0 if (defined $client_filter) {
182 0         0 carp(
183             "Filter ignored with InputFilter or OutputFilter"
184             );
185             }
186             }
187             elsif (defined $client_filter) {
188 0         0 return ( "Filter" => _load_filter($client_filter) );
189             }
190             else {
191 11         290 return ( Filter => POE::Filter::Line->new(), );
192             }
193              
194             }
195              
196             # Get something: either arrayref, ref, or string
197             # Return filter
198             sub _load_filter {
199 2     2   30 my $filter = shift;
200 2 50       28 if (ref ($filter) eq 'ARRAY') {
    50          
201 0         0 my @args = @$filter;
202 0         0 $filter = shift @args;
203 0 0       0 if ( _test_filter($filter) ){
204 0         0 return $filter->new(@args);
205             } else {
206 0         0 return POE::Filter::Line->new(@args);
207             }
208             }
209             elsif (ref $filter) {
210 2         11 return $filter->clone();
211             }
212             else {
213 0 0       0 if ( _test_filter($filter) ) {
214 0         0 return $filter->new();
215             } else {
216 0         0 return POE::Filter::Line->new();
217             }
218             }
219             }
220              
221             # Test if a Filter can be loaded, return sucess or failure
222             sub _test_filter {
223 0     0   0 my $filter = shift;
224 0         0 my $eval = eval {
225 0         0 (my $mod = $filter) =~ s!::!/!g;
226 0         0 require "$mod.pm";
227 0         0 1;
228             };
229 0 0 0     0 if (!$eval and $@) {
230 0         0 carp(
231             "Failed to load [$filter]\n" .
232             "Reason $@\nUsing defualt POE::Filter::Line "
233             );
234 0         0 return 0;
235             }
236 0         0 return 1;
237             }
238              
239             sub _accept_failed {
240 0     0   0 my ($kernel,$self,$operation,$errnum,$errstr,$wheel_id) = @_[KERNEL,OBJECT,ARG0..ARG3];
241 0         0 warn "Wheel $wheel_id generated $operation error $errnum: $errstr\n";
242 0 0       0 delete $self->{listener} if $operation eq 'listen';
243 0         0 $self->_send_event( $self->{_prefix} . 'listener_failed', $operation, $errnum, $errstr );
244 0         0 return;
245             }
246              
247             sub disconnect {
248 5     5 1 2940 my $self = shift;
249 5         29 $poe_kernel->call( $self->{session_id}, 'disconnect', @_ );
250             }
251              
252             sub _disconnect {
253 5     5   304 my ($kernel,$self,$id) = @_[KERNEL,OBJECT,ARG0];
254 5 50       19 return unless $self->_conn_exists( $id );
255 5         18 $self->{clients}->{ $id }->{quit} = 1;
256 5         29 return 1;
257             }
258              
259             sub terminate {
260 4     4 1 2052 my $self = shift;
261 4         23 $poe_kernel->call( $self->{session_id}, 'terminate', @_ );
262             }
263              
264             sub _terminate {
265 4     4   203 my ($kernel,$self,$id) = @_[KERNEL,OBJECT,ARG0];
266 4 50       15 return unless $self->_conn_exists( $id );
267 4         32 delete $self->{clients}->{ $id };
268 4         1451 $self->_send_event( $self->{_prefix} . 'disconnected', $id );
269 4         13 return 1;
270             }
271              
272             sub _conn_input {
273 18     18   25625 my ($kernel,$self,$input,$id) = @_[KERNEL,OBJECT,ARG0,ARG1];
274 18 50       66 return unless $self->_conn_exists( $id );
275             #$kernel->delay_adjust( $self->{clients}->{ $id }->{alarm}, $self->{time_out} || 300 );
276 18         82 $self->_send_event( $self->{_prefix} . 'client_input', $id, $input );
277 18         54 return;
278             }
279              
280             sub _conn_error {
281 2     2   3010 my ($self,$errstr,$id) = @_[OBJECT,ARG2,ARG3];
282 2 50       8 return unless $self->_conn_exists( $id );
283 2         7 my $href = delete $self->{clients}->{ $id };
284 2         10 delete $href->{wheel};
285 2         342 $self->_send_event( $self->{_prefix} . 'disconnected', $id, map { $href->{$_} } qw(peeraddr peerport sockaddr sockport) );
  8         19  
286 2         8 return;
287             }
288              
289             sub _conn_flushed {
290 29     29   26278 my ($self,$id) = @_[OBJECT,ARG0];
291 29 50       92 return unless $self->_conn_exists( $id );
292 29 100       461 if ( $self->{clients}->{ $id }->{BUFFER} ) {
293 6         9 my $item = shift @{ $self->{clients}->{ $id }->{BUFFER} };
  6         14  
294 6 100       16 unless ( $item ) {
295 2         10 delete $self->{clients}->{ $id }->{BUFFER};
296 2         15 $self->_send_event( $self->{_prefix} . 'client_flushed', $id );
297 2         8 return;
298             }
299 4         15 $self->{clients}->{ $id }->{wheel}->put($item);
300 4         243 return;
301             }
302 23 100       99 unless ( $self->{clients}->{ $id }->{quit} ) {
303 18         92 $self->_send_event( $self->{_prefix} . 'client_flushed', $id );
304 18         59 return;
305             }
306 5         47 delete $self->{clients}->{ $id };
307 5         1370 $self->_send_event( $self->{_prefix} . 'disconnected', $id );
308 5         16 return;
309             }
310              
311             sub _conn_alarm {
312 0     0   0 my ($kernel,$self,$id) = @_[KERNEL,OBJECT,ARG0];
313 0 0       0 return unless $self->_conn_exists( $id );
314 0         0 delete $self->{clients}->{ $id };
315 0         0 $self->_send_event( $self->{_prefix} . 'disconnected', $id );
316 0         0 return;
317             }
318              
319             sub _shutdown {
320 14     14   9071 my ($kernel,$self) = @_[KERNEL,OBJECT];
321 14         126 delete $self->{listener};
322 14         2531 delete $self->{clients};
323 14         380 $kernel->alarm_remove_all();
324 14         636 $kernel->alias_remove( $_ ) for $kernel->alias_list();
325 14 50       577 $kernel->refcount_decrement( $self->{session_id} => __PACKAGE__ ) unless $self->{alias};
326             # $self->_pluggable_destroy();
327 14         625 $self->_unregister_sessions();
328 14         379 return;
329             }
330              
331             sub register {
332 1     1 1 377 my ($kernel, $self, $session, $sender, @events) =
333             @_[KERNEL, OBJECT, SESSION, SENDER, ARG0 .. $#_];
334              
335 1 50       5 unless (@events) {
336 0         0 warn "register: Not enough arguments";
337 0         0 return;
338             }
339              
340 1         6 my $sender_id = $sender->ID();
341              
342 1         5 foreach (@events) {
343 1 50       7 $_ = $self->{_prefix} . $_ unless /^_/;
344 1         5 $self->{events}->{$_}->{$sender_id} = $sender_id;
345 1         19 $self->{sessions}->{$sender_id}->{'ref'} = $sender_id;
346 1 50 33     10 unless ($self->{sessions}->{$sender_id}->{refcnt}++ or $session == $sender) {
347 1         5 $kernel->refcount_increment($sender_id, __PACKAGE__);
348             }
349             }
350              
351 1         36 $kernel->post( $sender, $self->{_prefix} . 'registered', $self );
352 1         81 return;
353             }
354              
355             sub unregister {
356 2     2 1 3053 my ($kernel, $self, $session, $sender, @events) =
357             @_[KERNEL, OBJECT, SESSION, SENDER, ARG0 .. $#_];
358              
359 2 50       10 unless (@events) {
360 0         0 warn "unregister: Not enough arguments";
361 0         0 return;
362             }
363              
364 2         12 $self->_unregister($session,$sender,@events);
365 2         9 undef;
366             }
367              
368             sub _unregister {
369 2     2   7 my ($self,$session,$sender) = splice @_,0,3;
370 2         10 my $sender_id = $sender->ID();
371              
372 2         12 foreach (@_) {
373 2 50       13 $_ = $self->{_prefix} . $_ unless /^_/;
374 2         36 my $blah = delete $self->{events}->{$_}->{$sender_id};
375 2 50       18 unless ( $blah ) {
376 0         0 warn "$sender_id hasn't registered for '$_' events\n";
377 0         0 next;
378             }
379 2 50       9 if (--$self->{sessions}->{$sender_id}->{refcnt} <= 0) {
380 2         7 delete $self->{sessions}->{$sender_id};
381 2 50       9 unless ($session == $sender) {
382 2         10 $poe_kernel->refcount_decrement($sender_id, __PACKAGE__);
383             }
384             }
385             }
386 2         125 undef;
387             }
388              
389             sub _unregister_sessions {
390 14     14   42 my $self = shift;
391 14         80 my $testd_id = $self->session_id();
392 14         29 foreach my $session_id ( keys %{ $self->{sessions} } ) {
  14         67  
393 10 50       76 if (--$self->{sessions}->{$session_id}->{refcnt} <= 0) {
394 10         34 delete $self->{sessions}->{$session_id};
395 10 50       68 $poe_kernel->refcount_decrement($session_id, __PACKAGE__)
396             unless ( $session_id eq $testd_id );
397             }
398             }
399             }
400              
401             sub __send_event {
402 0     0   0 my( $self, $event, @args ) = @_[ OBJECT, ARG0, ARG1 .. $#_ ];
403 0         0 $self->_send_event( $event, @args );
404 0         0 return;
405             }
406              
407             sub _send_event {
408 61     61   91 my $self = shift;
409 61         180 my ($event, @args) = @_;
410 61         100 my $kernel = $POE::Kernel::poe_kernel;
411 61         76 my %sessions;
412              
413 61         73 $sessions{$_} = $_ for (values %{$self->{events}->{$self->{_prefix} . 'all'}}, values %{$self->{events}->{$event}});
  61         230  
  61         384  
414              
415 61         312 $kernel->post( $_ => $event => @args ) for values %sessions;
416 61         6178 undef;
417             }
418              
419             sub send_to_client {
420 25     25 1 18721 my $self = shift;
421 25         140 $poe_kernel->call( $self->{session_id}, '_send_to_client', @_ );
422             }
423              
424             sub _send_to_client {
425 25     25   1387 my ($kernel,$self,$id,$output) = @_[KERNEL,OBJECT,ARG0..ARG1];
426 25 50       93 return unless $self->_conn_exists( $id );
427 25 50       74 return unless defined $output;
428              
429 25 100       83 if ( ref $output eq 'ARRAY' ) {
430 2         4 my $temp = [ @{ $output } ];
  2         5  
431 2         4 my $first = shift @{ $temp };
  2         4  
432 2 50       3 $self->{clients}->{ $id }->{BUFFER} = $temp if scalar @{ $temp };
  2         11  
433 2 50       23 $self->{clients}->{ $id }->{wheel}->put($first) if defined $first;
434 2         307 return 1;
435             }
436              
437 23         310 $self->{clients}->{ $id }->{wheel}->put($output);
438 23         2370 return 1;
439             }
440              
441             sub send_to_all_clients {
442 0     0 1 0 my $self = shift;
443 0         0 $poe_kernel->call( $self->{session_id}, '_send_to_all_clients', @_ );
444             }
445              
446             sub _send_to_all_clients {
447 1     1   645 my ($kernel,$self,$output) = @_[KERNEL,OBJECT,ARG0];
448 1 50       4 return unless defined $output;
449 1         2 $self->send_to_client( $_, $output ) for
  1         6  
450             keys %{ $self->{clients} };
451 1         9 return 1;
452             }
453              
454             q{Putting the test into POE};
455              
456              
457             __END__