File Coverage

blib/lib/POE/Component/Server/FTP/DataSession.pm
Criterion Covered Total %
statement 18 222 8.1
branch 0 74 0.0
condition 0 12 0.0
subroutine 6 25 24.0
pod 0 14 0.0
total 24 347 6.9


line stmt bran cond sub pod time code
1             package POE::Component::Server::FTP::DataSession;
2              
3             ###########################################################################
4             ### POE::Component::Server::FTP::DataSession
5             ### L.M.Orchard (deus_x@pobox.com)
6             ### David Davis (xantus@cpan.org)
7             ###
8             ### TODO:
9             ### -- get rid of *_limit and use params instead
10             ###
11             ### Copyright (c) 2001 Leslie Michael Orchard. All Rights Reserved.
12             ### This module is free software; you can redistribute it and/or
13             ### modify it under the same terms as Perl itself.
14             ###
15             ### Changes Copyright (c) 2003-2004 David Davis and Teknikill Software
16             ###########################################################################
17              
18 1     1   6 use strict;
  1         1  
  1         30  
19 1     1   7 use IO::Socket::INET;
  1         2  
  1         16  
20 1     1   1854 use IO::Scalar;
  1         4545  
  1         58  
21 1     1   7 use POE qw(Session Wheel::ReadWrite Filter::Stream Driver::SysRW Wheel::SocketFactory);
  1         2  
  1         11  
22 1     1   1792 use Time::HiRes qw(time);
  1         3  
  1         8  
23              
24 1     1   2653 use Data::Dumper;
  1         8219  
  1         3479  
25              
26             # Create a new DataSession
27              
28             sub new {
29 0     0 0   my ($type, $para, $opt) = @_;
30 0           my $self = bless { }, $type;
31              
32 0           my $ses = POE::Session->create(
33             #options =>{ trace=>1 },
34             args => [ $para, $opt ],
35             object_states => [
36             $self => {
37             _start => '_start',
38             _stop => '_stop',
39              
40             _drop => '_drop',
41             start_LIST => 'start_LIST',
42             start_NLST => 'start_NLST',
43             start_STOR => 'start_STOR',
44             start_RETR => 'start_RETR',
45              
46             execute => 'execute',
47             data_send => 'data_send',
48              
49             data_receive => 'data_receive',
50             data_flushed => 'data_flushed',
51             data_error => 'data_error',
52             data_throttle => 'data_throttle',
53             data_resume => 'data_resume',
54              
55             stop_socket => 'stop_socket',
56              
57             _sock_up => '_sock_up',
58             _sock_down => '_sock_down',
59              
60             send_stats => 'send_stats',
61             }
62             ],
63             );
64              
65 0           return $ses->ID;
66             }
67              
68             sub _start {
69 0     0     my ($kernel, $heap, $para, $opt) = @_[KERNEL, HEAP, ARG0, ARG1];
70              
71             # generating a port num
72             # my $x = pack('n',$port);
73             # my $p1 = ord(substr($x,0,1));
74             # my $p2 = ord(substr($x,1,1));
75              
76 0           $heap->{send_recv_okay} = 0;
77 0           $heap->{listening} = 0;
78 0           $heap->{rest} = 0;
79 0           $heap->{total_bytes} = 0;
80 0           $heap->{bps} = 0;
81 0           $heap->{send_done} = 0;
82 0           $heap->{type} = 'dl'; # default to download
83 0           $heap->{c_session} = $_[SENDER]->ID;
84 0           %{$heap->{params}} = %{$para};
  0            
  0            
85              
86 0 0         if ($opt->{data_port}) {
87 0           $kernel->call($heap->{c_session} => _write_log => 4 => "starting a PORT data session");
88             # PORT command
89 0           my ($h1, $h2, $h3, $h4, $p1, $p2) = split(',', $opt->{data_port});
90              
91 0           my $peer_addr = $h1.".".$h2.".".$h3.".".$h4;
92 0           $heap->{port} = ($p1<<8)+$p2;
93 0           $heap->{remote_ip} = $peer_addr;
94              
95 0           $heap->{data} = POE::Wheel::SocketFactory->new(
96             SocketDomain => AF_INET,
97             SocketType => SOCK_STREAM,
98             SocketProtocol => 'tcp',
99             RemoteAddress => $peer_addr,
100             RemotePort => $heap->{port},
101             SuccessEvent => '_sock_up',
102             FailureEvent => '_sock_down',
103             );
104              
105 0           $heap->{cmd} = $opt->{cmd};
106 0 0         $heap->{rest} = $opt->{rest} if ($opt->{rest});
107 0           $heap->{filename} = $opt->{filename};
108 0           $heap->{file_path} = $opt->{fs}->{file_path};
109             } else {
110 0           $kernel->call($heap->{c_session} => _write_log => 4 => "starting a PASV data session");
111             # PASV command
112 0           $heap->{port} = ($opt->{port1}<<8)+$opt->{port2};
113              
114 0           $heap->{data} = POE::Wheel::SocketFactory->new(
115             BindAddress => INADDR_ANY, # Sets the bind() address
116             BindPort => $heap->{port}, # Sets the bind() port
117             SuccessEvent => '_sock_up', # Event to emit upon accept()
118             FailureEvent => '_sock_down', # Event to emit upon error
119             SocketDomain => AF_INET, # Sets the socket() domain
120             SocketType => SOCK_STREAM, # Sets the socket() type
121             SocketProtocol => 'tcp', # Sets the socket() protocol
122             Reuse => 'off', # Lets the port be reused
123             );
124              
125 0           $heap->{listening} = 1;
126             # the command is issued on the next call via
127             # a direct post to our session
128             }
129              
130 0           $heap->{filesystem} = $opt->{fs};
131 0           $heap->{block_size} = 8 * 1024;
132 0           $heap->{opt} = $opt->{opt};
133             }
134              
135             sub _sock_up {
136 0     0     my ($kernel, $heap, $session, $socket) = @_[KERNEL, HEAP, SESSION, ARG0];
137              
138 0           my $buffer_max = 4 * 1024;
139 0           my $buffer_min = 128;
140              
141 0           $heap->{data} = POE::Wheel::ReadWrite->new(
142             Handle => $socket,
143             Driver => POE::Driver::SysRW->new(),
144             Filter => POE::Filter::Stream->new(),
145             InputEvent => 'data_receive',
146             ErrorEvent => 'data_error',
147             FlushedEvent => 'data_flushed',
148             HighMark => $buffer_max,
149             LowMark => $buffer_min,
150             HighEvent => 'data_throttle',
151             LowEvent => 'data_resume',
152             );
153              
154 0           my ($port, $ip) = (sockaddr_in(getsockname($socket)));
155 0           $heap->{remote_ip} = inet_ntoa($ip);
156 0           $heap->{remote_port} = $port;
157            
158 0           $kernel->call($heap->{params}{'Alias'}, notify => ftpd_dcon_connected => {
159             dcon_session => $session->ID,
160             con_session => $heap->{c_session},
161             remote_ip => $heap->{remote_ip},
162             port => $heap->{remote_port},
163             });
164              
165 0 0         if ($heap->{listening} == 0) {
166 0           $kernel->call($heap->{c_session} => _write_log => 4 => "data session started for $heap->{cmd} ($heap->{opt})");
167 0           $kernel->yield('start_'.(uc $heap->{cmd}), $heap->{opt});
168             } else {
169             # TODO check if correct IP connected if that option is on
170 0           $kernel->call($heap->{c_session} => _write_log => 4 => "received connection from $heap->{remote_ip}");
171             }
172             }
173              
174             sub _sock_down {
175 0     0     my ($kernel, $heap) = @_[KERNEL, HEAP];
176 0           $kernel->call($heap->{c_session} => _write_log => 4 => "socket down");
177 0           delete $heap->{data};
178             }
179              
180             sub send_stats {
181 0     0 0   my ($kernel, $session, $heap) = @_[KERNEL, SESSION, HEAP];
182              
183 0           $kernel->call($heap->{params}{'Alias'}, notify => ftpd_bps_stats => {
184             type => $heap->{type},
185             bps => $heap->{bps},
186             session => $session->ID,
187             con_session => $heap->{c_session},
188             remote_ip => $heap->{remote_ip},
189             remote_port => $heap->{remote_port},
190             xfer_time => $heap->{xfer_time},
191             total_bytes => $heap->{total_bytes},
192             time => time(),
193             send_done => $heap->{send_done},
194             rest => $heap->{rest},
195             file_size => $heap->{file_size},
196             file_stat => $heap->{file_stat},
197             filename => $heap->{filename},
198             file_path => $heap->{file_path},
199             });
200              
201 0 0         unless ($heap->{send_done} == 1) {
202 0           $kernel->delay_set(send_stats => 2);
203             }
204             }
205              
206             sub start_LIST {
207 0     0 0   my ($kernel, $heap, $dirfile) = @_[KERNEL, HEAP, ARG0];
208 0           my $fs = $heap->{filesystem};
209              
210 0           my $out = "";
211 0           foreach ($fs->list_details($dirfile)) {
212 0           $out .= "$_\r\n";
213             }
214              
215 0           $heap->{input_fh} = IO::Scalar->new(\$out);
216 0           $heap->{send_done} = 0;
217 0           $heap->{send_recv_okay} = 1;
218 0           $kernel->yield('execute');
219             }
220              
221             sub start_NLST {
222 0     0 0   my ($kernel, $heap, $dirfile) = @_[KERNEL, HEAP, ARG0];
223 0           my $fs = $heap->{filesystem};
224              
225 0           my $out = "";
226 0           foreach ($fs->list($dirfile)) {
227 0           $out .= "$_\r\n";
228             }
229              
230 0           $heap->{input_fh} = IO::Scalar->new(\$out);
231 0           $heap->{send_done} = 0;
232 0           $heap->{send_recv_okay} = 1;
233 0           $kernel->yield('execute');
234             }
235              
236             sub start_RETR {
237 0     0 0   my ($kernel, $heap, $fh, $opt) = @_[KERNEL, HEAP, ARG0, ARG1];
238              
239 0           foreach my $f (qw( rest filename )) {
240 0 0         if (exists($opt->{$f})) {
241 0           $heap->{$f} = $opt->{$f};
242             }
243             }
244              
245 0           $heap->{file_path} = $heap->{filesystem}->{file_path};
246            
247 0           $heap->{input_fh} = $fh;
248 0           $heap->{filesystem}->seek($fh,$heap->{rest},0);
249              
250 0           @{$heap->{file_stat}} = $fh->stat();
  0            
251 0           $heap->{file_size} = $heap->{file_stat}[7];
252              
253 0           $heap->{send_done} = 0;
254 0           $heap->{send_recv_okay} = 1;
255 0           $kernel->yield('execute');
256             }
257              
258             sub start_STOR {
259 0     0 0   my ($kernel, $heap, $fh, $opt) = @_[KERNEL, HEAP, ARG0, ARG1];
260              
261 0           foreach my $f (qw( rest filename )) {
262 0 0         if (exists($opt->{$f})) {
263 0           $heap->{$f} = $opt->{$f};
264             }
265             }
266            
267 0           $heap->{file_path} = $heap->{filesystem}->{file_path};
268            
269 0           $heap->{output_fh} = $fh;
270 0           $heap->{filesystem}->seek($fh,$heap->{rest},0);
271            
272 0           @{$heap->{file_stat}} = $fh->stat();
  0            
273             # not usefull?
274 0           $heap->{file_size} = $heap->{file_stat}[7];
275            
276 0           $heap->{type} = 'ul';
277 0           $heap->{send_recv_okay} = 1;
278 0           $heap->{xfer_time} = time();
279 0           $kernel->yield('execute');
280             }
281              
282 0     0     sub _stop {
283             # my $kernel = $_[KERNEL];
284             }
285              
286             # Execute the session's pending upload
287              
288             sub execute {
289 0     0 0   my ($kernel, $heap, $session) = @_[KERNEL, HEAP, SESSION];
290              
291 0           $kernel->yield("send_stats");
292            
293 0 0         if (defined $heap->{input_fh}) {
    0          
294 0           $heap->{xfer_time} = time();
295 0           $kernel->yield('data_send');
296             } elsif (!defined $heap->{output_fh}) {
297 0 0         if ($heap->{listening} == 0) {
298 0           $kernel->call($session->ID => '_drop');
299             }
300             }
301             }
302              
303             sub stop_socket {
304 0     0 0   my ($kernel, $session, $heap) = @_[KERNEL, SESSION, HEAP];
305              
306 0           delete $heap->{time_out};
307              
308 0 0         if (ref($heap->{data}) eq 'POE::Wheel::SocketFactory') {
309             # still a factory?! Time to drop connection
310 0           delete $heap->{data};
311             }
312             }
313              
314             # Send a block to the remote client
315              
316             sub data_send {
317 0     0 0   my ($kernel, $session, $heap) = @_[KERNEL, SESSION, HEAP];
318              
319 0 0 0       if ( (!defined $heap->{input_fh}) || (! ref $heap->{input_fh} ) ) {
    0 0        
320 0           $kernel->call($session->ID => '_drop');
321             } elsif ($heap->{send_recv_okay} && (defined $heap->{data})) {
322              
323             # if we haven't connected yet, then data will still be a factory
324 0 0         if (ref($heap->{data}) eq 'POE::Wheel::SocketFactory') {
325 0           $kernel->call($heap->{c_session} => _write_log => 4 => "data is still a SocketFactory (not connected yet?)");
326 0 0         if (defined $heap->{time_out}) {
327 0           $heap->{time_out} = $kernel->delay_set(stop_socket => 30);
328             }
329 0           $kernel->delay_set('data_send' => 2);
330 0           return;
331             }
332              
333 0 0         if (defined $heap->{time_out}) {
334 0           $kernel->alarm_remove($heap->{time_out});
335 0           delete $heap->{time_out};
336             }
337              
338 0           $heap->{bps} = ($heap->{total_bytes} / (time() - $heap->{xfer_time}));
339              
340 0 0         if ($heap->{params}{'DownloadLimit'} > 0) {
341 0 0         if ($heap->{params}{'LimitSceme'} eq 'ip') {
342 0 0         if ($kernel->call($heap->{params}{'Alias'} => _bw_limit => 'dl' => $heap->{remote_ip} => $heap->{bps})) {
343 0           $kernel->yield('data_send');
344 0           return;
345             }
346             } else {
347 0 0         if ($heap->{bps} > $heap->{params}{'DownloadLimit'}) {
348 0           $kernel->yield('data_send');
349 0           return;
350             }
351             }
352             }
353              
354             ### Read in a block from the file.
355 0           my $buf;
356 0           my $len = $heap->{input_fh}->read($buf, $heap->{block_size});
357              
358             ### If something was read, queue it to be sent, and yield
359             ### back for another data_send.
360 0 0         if ($len > 0) {
361 0           $heap->{total_bytes} += $len;
362 0           $heap->{data}->put($buf);
363 0           $kernel->yield('data_send');
364             } else {
365             # If nothing was read, assume EOF, and shut everything down.
366 0           my $fs = $heap->{filesystem};
367 0           $fs->close_read($heap->{input_fh});
368 0           delete $heap->{input_fh};
369              
370 0           $kernel->call($session->ID => '_drop');
371             }
372             }
373             }
374              
375             # Recieve a block from the remote client
376              
377             sub data_receive {
378 0     0 0   my ($kernel, $heap, $session, $data) = @_[KERNEL, HEAP, SESSION, ARG0];
379              
380 0 0 0       if ( (!defined $heap->{output_fh}) || (! ref $heap->{output_fh} ) ) {
    0 0        
381 0           $kernel->call($session->ID => '_drop');
382             } elsif ($heap->{send_recv_okay} && (defined $heap->{data})) {
383              
384             # if we haven't connected yet, then data will still be a factory
385 0 0         if (ref($heap->{data}) eq 'POE::Wheel::SocketFactory') {
386 0           $kernel->call($heap->{c_session} => _write_log => 4 => "data is still a SocketFactory (not connected yet?)");
387 0 0         if (defined $heap->{time_out}) {
388 0           $heap->{time_out} = $kernel->delay_set(stop_socket => 30);
389             }
390 0           $kernel->delay_set('data_receive' => 1, $data);
391 0           return;
392             }
393              
394 0 0         if (defined $heap->{time_out}) {
395 0           $kernel->alarm_remove($heap->{time_out});
396 0           delete $heap->{time_out};
397             }
398              
399 0           $heap->{bps} = ($heap->{total_bytes} / (time() - $heap->{xfer_time}));
400              
401 0 0         if ($heap->{params}{'UploadLimit'} > 0) {
402 0 0         if ($heap->{params}{'LimitSceme'} eq 'ip') {
403 0 0         if ($kernel->call($heap->{params}{'Alias'} => _bw_limit => 'ul' => $heap->{remote_ip} => $heap->{bps})) {
404 0           $kernel->yield('data_receive');
405 0           $heap->{data}->pause_input();
406             } else {
407 0           $heap->{data}->resume_input();
408             }
409             } else {
410 0 0         if ($heap->{bps} > $heap->{params}{'UploadLimit'}) {
411 0           $kernel->yield('data_receive');
412 0           $heap->{data}->pause_input();
413             } else {
414 0           $heap->{data}->resume_input();
415             }
416             }
417             }
418              
419 0 0         if (defined $data) {
420 0           $heap->{total_bytes} += length($data);
421              
422 0           $heap->{output_fh}->print($data);
423             }
424             }
425             }
426              
427             sub data_error {
428 0     0 0   my ($kernel, $heap, $session, $operation, $errnum, $errstr) = @_[KERNEL, HEAP, SESSION, ARG0, ARG1, ARG2];
429 0           my $fs = $heap->{filesystem};
430              
431 0 0         if ($errnum) {
432 0           $kernel->call($heap->{c_session} => _write_log => 4 => "session with $heap->{remote_ip} : $heap->{port} encountered $operation error $errnum: $errstr");
433             } else {
434 0           $kernel->call($heap->{c_session} => _write_log => 4 => "client at $heap->{remote_ip} : $heap->{port} disconnected");
435             }
436              
437             # either way, stop this session
438 0 0         if (defined $heap->{output_fh}) {
439 0           $fs->close_write($heap->{output_fh});
440 0           delete $heap->{output_fh};
441             }
442              
443 0 0         if (defined $heap->{input_fh}) {
444 0           $fs->close_read($heap->{input_fh});
445 0           delete $heap->{input_fh};
446             }
447            
448 0           $heap->{send_done} = 1;
449 0           $kernel->call($session->ID => 'send_stats');
450 0           $kernel->alarm_remove_all();
451              
452 0           delete $heap->{data};
453             }
454              
455             sub data_flushed {
456 0     0 0   my ($kernel, $heap, $session) = @_[KERNEL, HEAP, SESSION];
457 0 0         if ($heap->{send_done} == 1) {
458 0           $kernel->call($session->ID => 'send_stats');
459 0           $kernel->alarm_remove_all();
460 0           $kernel->call($heap->{c_session} => _write_log => 4 => "data flushed, dropping connection");
461 0           delete $heap->{data};
462             }
463             }
464              
465             sub data_throttle {
466 0     0 0   $_[HEAP]->{send_recv_okay} = 0;
467             }
468              
469             sub data_resume {
470 0     0 0   $_[HEAP]->{send_recv_okay} = 1;
471 0           $_[KERNEL]->yield('data_send');
472             }
473              
474             sub _drop {
475 0     0     my ($kernel, $heap, $session) = @_[KERNEL, HEAP, SESSION];
476              
477 0           $kernel->alarm_remove_all();
478            
479 0           $heap->{send_done} = 1; # for send_stats, so it doesn't delay again
480            
481 0 0         return unless ($heap->{data});
482              
483 0 0         if (ref($heap->{data}) eq 'POE::Wheel::SocketFactory') {
484             # never connected...
485 0           $kernel->call($heap->{c_session} => _write_log => 4 => "Still a SocketFactory in _drop");
486 0           $kernel->call($heap->{c_session} => _write_log => 3 => "Connection timed out");
487 0           delete $heap->{data};
488 0           return;
489             }
490            
491             # if we are fully flushed, go ahead and disconnect
492 0 0         if ($heap->{data}->get_driver_out_octets() == 0) {
493 0           $kernel->call($heap->{c_session} => _write_log => 4 => "data finished, dropping connection");
494 0           delete $heap->{data};
495             } else {
496             # if not, then we set a flag and the flushed event
497             # drops the connection
498 0           $heap->{send_done} = 1;
499             }
500             }
501             1;