File Coverage

blib/lib/Sprocket/Server.pm
Criterion Covered Total %
statement 92 117 78.6
branch 12 28 42.8
condition 11 27 40.7
subroutine 19 20 95.0
pod 4 10 40.0
total 138 202 68.3


line stmt bran cond sub pod time code
1             package Sprocket::Server;
2              
3 5     5   7480 use strict;
  5         14  
  5         172  
4 5     5   25 use warnings;
  5         11  
  5         148  
5              
6 5     5   25 use Sprocket qw( Base );
  5         11  
  5         31  
7 5     5   35 use base qw( Sprocket::Base );
  5         10  
  5         486  
8 5         36 use POE qw(
9             Wheel::SocketFactory
10             Filter::Stackable
11             Filter::Stream
12             Driver::SysRW
13 5     5   28 );
  5         11  
14 5     5   10368 use Socket qw( INADDR_ANY inet_ntoa inet_aton AF_INET AF_UNIX PF_UNIX sockaddr_in );
  5         13  
  5         471  
15 5     5   30 use Scalar::Util qw( dualvar );
  5         11  
  5         369  
16              
17             __PACKAGE__->mk_accessors( qw( listen_address listen_port ) );
18              
19             BEGIN {
20 5     5   39 $sprocket->register_hook( [qw(
21             sprocket.local.connection.accept
22             sprocket.local.connection.reject
23             sprocket.local.connection.receive
24             sprocket.local.wheel.error
25             )] );
26             }
27              
28             sub spawn {
29 6     6 1 72 my $class = shift;
30            
31 6         92 my $self = $class->SUPER::spawn(
32             $class->SUPER::new(
33             @_,
34             _type => 'local',
35             ),
36             qw(
37             _startup
38             _stop
39              
40             local_accept
41             local_receive
42             local_flushed
43             local_wheel_error
44             local_error
45              
46             accept
47             reject
48             ),
49             );
50              
51 6         26 return $self;
52             }
53              
54             sub check_params {
55 6     6 0 28 my $self = shift;
56              
57 6   50     36 $self->{name} ||= 'Server';
58 6   50     34 $self->{opts}->{listen_address} ||= INADDR_ANY;
59 6   50     60 $self->{opts}->{domain} ||= AF_INET;
60 6 50       32 $self->{opts}->{listen_port} = 0
61             unless ( defined( $self->{opts}->{listen_port} ) );
62 6   50     46 $self->{opts}->{listen_queue} ||= 10000;
63 6   50     53 $self->{opts}->{reuse} ||= 'yes';
64            
65 6 50       29 if ( $self->{opts}->{ssl} ) {
66 0         0 eval 'use POE::Filter::SSL;';
67 0 0       0 if ( $@ ) {
68 0         0 die "During load of POE::Filter::SSL: $@\n";
69             } else {
70 0         0 $self->_log( v => 2, msg => "SSL is ON for "
71             ."$self->{opts}->{listen_address}:$self->{opts}->{listen_port}");
72             }
73             }
74              
75 6         34 return;
76             }
77              
78             sub _startup {
79 6     6   15 my $self = $_[ OBJECT ];
80              
81             # create a socket factory
82 6         130 $self->{wheel} = POE::Wheel::SocketFactory->new(
83             BindPort => $self->{opts}->{listen_port},
84             BindAddress => $self->{opts}->{listen_address},
85             SocketDomain => $self->{opts}->{domain},
86             Reuse => $self->{opts}->{reuse},
87             SuccessEvent => 'local_accept',
88             FailureEvent => 'local_wheel_error',
89             ListenQueue => $self->{opts}->{listen_queue},
90             );
91              
92 6         8907 my ( $port, $ip ) = ( sockaddr_in( $self->{wheel}->getsockname() ) );
93 6         223 $ip = inet_ntoa( $ip );
94              
95 6   66     113 $self->listen_port( $self->{opts}->{listen_port} || $port );
96 6   33     75 $self->listen_address( $ip || $self->{opts}->{listen_address} );
97              
98 6         55 $self->_log( v => 2, msg => sprintf( "Listening to port %d(%d) on %s(%s)",
99             $self->{opts}->{listen_port}, $self->listen_port,
100             $self->{opts}->{listen_address}, $self->listen_address ) );
101             }
102              
103             sub _stop {
104 6     6   13 my $self = $_[ OBJECT ];
105 6         37 $self->_log( v => 2, msg => $self->{name}." stopped.");
106             }
107              
108             # Accept a new connection
109              
110             sub local_accept {
111 4     4 1 15 my ( $self, $socket, $peer_ip, $peer_port ) =
112             @_[ OBJECT, ARG0, ARG1, ARG2 ];
113              
114 4         19 my ( $port, $ip );
115 4 50       19 if ( length( $peer_ip ) == 4 ) {
116 4         45 ( $port, $ip ) = ( sockaddr_in( getsockname( $socket ) ) );
117 4         74 $peer_ip = inet_ntoa( $peer_ip );
118 4         20 $ip = inet_ntoa( $ip );
119             } else {
120             # ipv6
121 0         0 ( $port, $ip ) = ( Socket6::sockaddr_in6( getsockname( $socket ) ) );
122 0         0 $peer_ip = Socket6::inet_ntop( $self->{opts}->{domain}, $peer_ip );
123 0         0 $ip = Socket6::inet_ntop( $self->{opts}->{domain}, $ip );
124             }
125              
126 4         47 my $con = $self->new_connection(
127             local_ip => $ip,
128             local_port => $port,
129             peer_ip => $peer_ip,
130             # TODO resolve these?
131             peer_hostname => $peer_ip,
132             peer_port => $peer_port,
133             peer_addr => "$peer_ip:$peer_port",
134             );
135            
136 4         16 $con->socket( $socket );
137              
138 4         37 $self->process_plugins( [ 'local_accept', $self, $con, $socket ] );
139            
140 4         15 return;
141             }
142              
143             sub accept {
144 4     4 0 13 my ( $self, $con, $opts ) = @_[ OBJECT, HEAP, ARG0 ];
145            
146 4 50       17 $opts = {} unless ( $opts );
147              
148 4   50     26 $opts->{block_size} ||= 2048;
149 4   33     56 $opts->{filter} ||= POE::Filter::Stackable->new(
150             Filters => [
151             POE::Filter::Stream->new(),
152             ]
153             );
154 4 50       150 $opts->{time_out} = $self->{opts}->{time_out}
155             unless( defined( $opts->{time_out} ) );
156              
157 4 50       19 if ( $self->{opts}->{ssl} ) {
158 0 0 0     0 if ( $opts->{filter}->isa( 'POE::Filter::Stackable' ) || $opts->{filter}->can( 'push' ) ) {
159             # TODO use filter push
160 0         0 eval {
161 0         0 $opts->{filter} = POE::Filter::Stackable->new(
162             Filters => [
163             POE::Filter::SSL->new(
164             key_file => $self->{opts}->{ssl_key_file},
165             cert_file => $self->{opts}->{ssl_cert_file}
166             )
167             ]
168             );
169             };
170 0 0       0 if ( $@ ) {
171 0         0 $self->_log( v => 1, msg => "Could not push POE::Filter::SSL on the stack, REJECTING CONNECTION : $@");
172 0         0 $con->close( 1 );
173 0         0 return;
174             }
175 0         0 $self->_log( v => 4, msg => "Using SSL");
176             } else {
177 0         0 $self->_log( v => 1, msg => "The filter: $opts->{filter} does not have a push method, or isn't a Stackable Filter. REJECTING CONNECTION");
178 0         0 $con->close( 1 );
179 0         0 return;
180             }
181             }
182              
183 4         41 my $socket = $con->socket;
184              
185 4         55 $con->wheel_readwrite(
186             Handle => $socket,
187             Driver => POE::Driver::SysRW->new( BlockSize => $opts->{block_size} ),
188             Filter => $opts->{filter},
189             InputEvent => $con->event( 'local_receive' ),
190             ErrorEvent => $con->event( 'local_error' ),
191             FlushedEvent => $con->event( 'local_flushed' ),
192             );
193              
194 4 100       23 $con->set_time_out( $opts->{time_out} )
195             if ( $opts->{time_out} );
196            
197 4         31 $sprocket->broadcast( 'sprocket.local.connection.accept', {
198             source => $self,
199             target => $con,
200             } );
201            
202 4         21 $con->socket( undef );
203            
204 4         89 $self->process_plugins( [ 'local_connected', $self, $con, $socket ] );
205              
206             # nothing took the connection
207 4 50       18 unless ( $con->plugin ) {
208 0         0 $self->_log( v => 2, msg => "No plugin took this connection, Dropping.");
209 0         0 $con->close();
210             }
211            
212 4         33 return;
213             }
214              
215             sub reject {
216 0     0 0 0 my ( $self, $con ) = @_[ OBJECT, HEAP ];
217            
218 0         0 $sprocket->broadcast( 'sprocket.remote.connection.reject', {
219             source => $self,
220             target => $con,
221             } );
222            
223             # XXX other?
224 0         0 $con->socket( undef );
225 0         0 $con->close( 1 );
226            
227 0         0 return;
228             }
229              
230             sub local_receive {
231 5     5 1 17 my ( $self, $con ) = @_[ OBJECT, HEAP ];
232            
233 5         49 $sprocket->broadcast( 'sprocket.local.connection.receive', {
234             source => $self,
235             target => $con,
236             data => $_[ARG0]
237             } );
238            
239 5         37 $self->process_plugins( [ 'local_receive', $self, $con, $_[ARG0] ] );
240            
241 5         23 return;
242             }
243              
244             sub local_flushed {
245 6     6 0 15 my ( $self, $con ) = @_[ OBJECT, HEAP ];
246              
247 6 50 33     25 $con->close()
248             if ( $con->close_on_flush && not $con->get_driver_out_octets() );
249              
250             # If you need this event in your plugin, subclass Sprocket::Server
251            
252 6         43 return;
253             }
254              
255             sub local_wheel_error {
256 1     1 0 4 my ( $self, $operation, $errnum, $errstr ) =
257             @_[ OBJECT, ARG0, ARG1, ARG2 ];
258            
259 1         10 $self->_log( v => 1, msg => $self->{name}." encountered $operation error $errnum: $errstr (Server socket wheel)");
260            
261 1         9 $sprocket->broadcast( 'sprocket.local.wheel.error', {
262             source => $self,
263             operation => $operation,
264             errnum => $errnum,
265             errstr => $errstr,
266             } );
267            
268 1         8 $self->process_plugins( [ 'local_error', $self, $operation, $errnum, $errstr ] );
269            
270 1         5 return;
271             }
272              
273             sub local_error {
274 1     1 1 5 my ( $self, $con, $operation, $errnum, $errstr ) =
275             @_[ OBJECT, HEAP, ARG0, ARG1, ARG2 ];
276            
277 1         18 $con->error( dualvar( $errnum, "$operation - $errstr" ) );
278            
279             # TODO use constant
280 1 50       10 $self->_log( v => 3, msg => $self->{name}." encountered $operation error $errnum: $errstr")
281             if ( $errnum != 0 );
282            
283 1         6 $self->process_plugins( [ 'local_disconnected', $self, $con, 1, $operation, $errnum, $errstr ] );
284            
285 1         5 $con->close();
286            
287 1         4 return;
288             }
289              
290             sub begin_soft_shutdown {
291 1     1 0 3 my $self = $_[ OBJECT ];
292              
293 1         6 $self->_log( v => 2, msg => $self->{name}." is shuting down (soft)");
294              
295 1         3 foreach ( values %{$self->{heaps}} ) {
  1         4  
296 1 50       3 next unless defined;
297 1         6 $self->process_plugins( [ 'local_shutdown', $self, $_ ] );
298             }
299              
300 1         4 return;
301             }
302              
303             1;
304              
305             __END__