File Coverage

blib/lib/POE/Component/Client/TCPMulti.pm
Criterion Covered Total %
statement 125 176 71.0
branch 22 58 37.9
condition 13 27 48.1
subroutine 23 33 69.7
pod 0 3 0.0
total 183 297 61.6


line stmt bran cond sub pod time code
1             # vim600: set ts=4 sw=4 tw=80 expandtab nowrap noai cin foldmethod=marker:
2             # A Multiplex TCP Component designed for performance.
3             # -----------------------------------------------------------------------------
4             # "THE BEER-WARE LICENSE" (Revision 43) borrowed from FreeBSD's jail.c:
5             # wrote this file. As long as you retain this notice you
6             # can do whatever you want with this stuff. If we meet some day, and you think
7             # this stuff is worth it, you can buy me a beer in return. Scott S. McCoy
8             # -----------------------------------------------------------------------------
9             # See TCPMulti.otl (TVO format) or TCPMulti.pod (POD format) for documentation
10             package POE::Component::Client::TCPMulti;
11              
12             # Settings and Initialization {{{
13              
14 2     2   552691 use strict;
  2         4  
  2         85  
15 2     2   10 use warnings FATAL => qw( all );
  2         3  
  2         109  
16 2     2   11 use constant CHEAP => -1;
  2         8  
  2         148  
17              
18             # POE::Component::Server::TCPMulti can export cheap also
19             # We're not going to require order from the user.
20             sub import {
21 2     2   11 no strict "refs";
  2         5  
  2         182  
22 2     2   34 my $caller = caller;
23              
24 2 50       5 unless (defined *{"${caller}::CHEAP"}) {
  2         20  
25 2         5 *{"${caller}::CHEAP"} = \&CHEAP;
  2         38  
26             }
27             }
28              
29              
30 2     2   1917 use UNIVERSAL;
  2         27  
  2         10  
31 2         58 use POE qw( Kernel
32             Session
33             Driver::SysRW
34             Filter::Line
35             Wheel::ReadWrite
36 2     2   71 Wheel::SocketFactory );
  2         3  
37              
38 2     2   76819 use Carp qw( carp croak );
  2         7  
  2         391  
39              
40             *VERSION = \0.0524;
41              
42             our $VERSION;
43             BEGIN {
44 2 50   2   12 unless (defined &DEBUG) {
45 2         111 constant->import(DEBUG => 0);
46             }
47 2 50       8 unless (defined &TRACE_EVENTS) {
48 2         61 constant->import(TRACE_EVENTS => 0);
49             }
50 2 50       12 unless (defined &TRACE_CONNECT) {
51 2         54 constant->import(TRACE_CONNECT => 0);
52             }
53 2 50       17 unless (defined &TRACE_FILENAME) {
54 2         14556 constant->import(TRACE_FILENAME => 0);
55             }
56             }
57              
58             if (DEBUG) {
59             print "TCPMulti: DEBUG MODE ENABLED\n";
60             }
61             if (TRACE_FILENAME) {
62             open TRACE, ">", TRACE_FILENAME;
63             }
64             else {
65             *TRACE = *STDERR;
66             }
67              
68             # Heap is now package global. This is fine, each wheel throughout the POE
69             # Kernel has its own unique identification. So multiple component sessions
70             # can utilize the same hash for Connection Heaps.
71              
72             # Note: Explicit lexical was not accessable by the inline states (This seems to
73             # be a bug in perl >= 5.8.1, although its marked as simply changed behavior in
74             # the changelog. Its only with strange combinations of lexicals anonymous
75             # subroutines and anonymous hashrefs (As commonly used in POE
76             # programming...bastards :P)
77             our %heap;
78              
79             # }}}
80             # new (Depriciated) {{{
81              
82 1     1 0 2420 sub new { goto &create }
83            
84             # }}}
85             # Constructor {{{
86              
87             sub create {
88             # Initialization {{{
89              
90 1 50   1 0 5 shift if $_[0] eq __PACKAGE__;
91 1         2 my ($code, %user_code);
92              
93 1         5 %user_code = @_;
94              
95 1   100 60   59 $user_code{$_} ||= sub {} for qw( ErrorEvent
  60         207  
96             InputEvent
97             Initialize
98             Disconnected
99             SuccessEvent
100             FlushedEvent
101             FailureEvent
102             TimeoutEvent );
103              
104 1   50     8 $user_code{Timeout} ||= 30;
105 1   33     6 $user_code{ConnectTimeout} ||= $user_code{Timeout};
106 1   50     6 $user_code{InputTimeout} ||= 300;
107 1   50     14 $user_code{Filter} ||= "POE::Filter::Line";
108 1   50     7 $user_code{FilterArgs} ||= undef;
109 1   50     13 $user_code{options} ||= {};
110 1   50     7 $user_code{package_states} ||= [];
111 1   50     6 $user_code{object_states} ||= [];
112              
113 1 50       5 if (ref $user_code{Filter} eq "ARRAY") {
114 0         0 my @FilterData = @{ delete $user_code{Filter} };
  0         0  
115 0         0 $user_code{Filter} = shift @FilterData;
116 0         0 $user_code{FilterArgs} = \@FilterData;
117             }
118              
119 1         4 @{ $user_code{UserStates} }{ qw( _start _stop _child ) } =
  1         4  
120 1         25 delete @{ $user_code{inline_states} }{ qw( _start _stop _child ) };
121              
122             # }}}
123             # Internal States {{{
124             $code = {
125             # Session Events {{{
126             # _start: Session Start {{{
127              
128             _start => sub {
129 1 50   1   286 $_[KERNEL]->alias_set( delete $user_code{Alias} )
130             if defined $user_code{Alias};
131            
132 1 50       8 $user_code{UserStates}->{_start}->(@_)
133             if ref $user_code{UserStates}->{_start} eq "CODE";
134             },
135            
136             # }}}
137             # _child: Session Child {{{
138              
139             _child => sub {
140 0 0   0   0 $user_code{states}->{_child}->(@_)
141             if ref $user_code{UserStates}->{_child} eq "CODE";
142             },
143            
144             # }}}
145             # _stop: Session End {{{
146              
147             _stop => sub {
148 1 50   1   120 $user_code{UserStates}->{_stop}->(@_)
149             if ref $user_code{UserStates}->{_stop} eq "CODE";
150             },
151            
152             # }}}
153             # }}}
154             # Connection States {{{
155             # -success: Connection was successful (Internal) {{{
156              
157             -success => sub {
158 30     30   16260 my ($kernel, $handle, $old_id) = @_[KERNEL, ARG0, ARG3];
159 30         41 my $filter;
160            
161             # We need 1 filter per Wheel...yeah
162 30 50 33     126 if (ref $user_code{Filter} &&
163             UNIVERSAL::isa($user_code{Filter}, "UNIVERSAL")) {
164 0         0 $filter = $user_code{Filter} = ref $user_code{Filter};
165             }
166              
167 30         43 $filter = $user_code{Filter}->new( @{ $user_code{FilterArgs} } );
  30         165  
168              
169 30         1251 $heap{$old_id}{-SERVER} = POE::Wheel::ReadWrite->new
170             ( Handle => $handle,
171             Driver => POE::Driver::SysRW->new(BlockSize => 4096),
172             Filter => $filter,
173             InputEvent => '-incoming',
174             ErrorEvent => '-error',
175             # FlushedEvent => '-flushed',
176             );
177              
178             # Transfer entire heap (including wheel), reinstate -ID
179 30         9841 my $new_id = $heap{$old_id}{-SERVER}->ID;
180 30         916 my $cheap = $heap{$new_id} = delete $heap{$old_id};
181              
182 30         142 bless $heap{$new_id}, "POE::Component::Client::TCPMulti::CHEAP";
183              
184             # ARG4 differs from Wheel definition...its our new id.
185 30         84 push @_, $new_id, $cheap;
186              
187 30         76 $cheap->{-ID} = $new_id;
188 30         84 $cheap->{-TIMEOUT} = $user_code{InputTimeout};
189              
190 30 50       69 if ($user_code{InputTimeout}) {
    0          
191 30 50       79 if ($cheap->{-ALARM}) {
192             DEBUG && printf "%d << Adjusting alarm %d (%d s)\n",
193 30         31 $new_id, @{ $_[CHEAP] }{qw( -ALARM -TIMEOUT )};
194 30         133 $kernel->delay_adjust
195             ( $cheap->{-ALARM}, $cheap->{-TIMEOUT} );
196             }
197             else {
198 0         0 $cheap->{-ALARM} = $kernel->delay_set
199             ( -timeout => $cheap->{-TIMEOUT}, $cheap->{-ID} );
200             }
201             }
202             # We should have an alarm ID -> maybe we're not storing it.
203             elsif ($cheap->{-ALARM}) {
204 0         0 $kernel->alarm_remove( delete $cheap->{-ALARM} );
205             }
206              
207 30         12816 $user_code{SuccessEvent}->(@_);
208            
209             printf "%d == Successfull Connection %s:%d\n", $new_id,
210 30         3026 @{ $heap{$new_id} }{qw( -ADDR -PORT )} if DEBUG;
211             },
212            
213             # }}}
214             # connect: Open new connection {{{
215              
216             # Connect to the next available proxy
217             connect => sub {
218 30     30   46309 my $cheap;
219 30 50 33     219 if (ref $_[ARG0] eq "HASH" || ref $_[ARG0] eq "ARRAY") {
220 0         0 $cheap = splice @_, ARG0, 1;
221             }
222              
223 30         89 my ($address, $port, $bindaddress, $bindport) = @_[ARG0..ARG3];
224              
225 30         38 printf TRACE "connect event invoked (%s, %d) for %s from %s:%d\n",
226             @_[ ARG0, ARG1 ],
227             $cheap->{email}, # email para poeml lang
228             @_[ CALLER_FILE, CALLER_LINE ] if TRACE_CONNECT;
229              
230 30 50       71 unless (defined $address) {
231 0         0 return printf STDERR
232             "connect called without address or port, %s: line %d\n",
233             @_[CALLER_FILE, CALLER_LINE];
234             }
235              
236 30         33 printf STDERR "!!! !! connect state invoked from %s:%d\n",
237             @_[CALLER_FILE, CALLER_LINE] if DEBUG;
238            
239 30         222 push @_, POE::Component::Client::TCPMulti->connect
240             ( RemoteAddress => $address,
241             RemotePort => $port,
242             BindAddress => $bindaddress,
243             BindPort => $bindport,
244             Timeout => $user_code{ConnectTimeout},
245             Heap => $cheap,
246             );
247              
248 30         99 $user_code{Initialize}->(@_);
249             },
250            
251             # }}}
252             # }}}
253             # IO States {{{
254             # -incoming: Handling recieved data (Internal) {{{
255            
256             -incoming => sub {
257 30     30   190156 my ($kernel, $id) = @_[ KERNEL, ARG1 ];
258 30         118 push @_, $heap{$id};
259              
260 30         77 my $cheap = $_[ CHEAP ];
261 30 50       1595 return unless $cheap->{-RUNNING};
262              
263 30         43 if (DEBUG) {
264             print "$_[ARG1] << $_[ARG0]\n";
265             }
266              
267 30 50       118 if ($cheap->{-TIMEOUT}) {
268 30         252 $kernel->delay_adjust
269             ( $cheap->{-ALARM}, $cheap->{-TIMEOUT} );
270             }
271              
272 30         20268 $user_code{InputEvent}(@_);
273             },
274              
275             # }}}
276             # send: Send Data {{{
277              
278             send => sub {
279 30     30   4262 my $cheap = $heap{$_[ARG0]};
280              
281 30 50       133 unless (defined $_[ARG1]) {
    50          
282 0         0 return printf STDERR
283             "send called without socket or data %s: line %d\n",
284             @_[CALLER_FILE, CALLER_LINE];
285             }
286             elsif (defined $cheap->{-SERVER}) {
287 30         33 if (DEBUG) {
288             print "$_[ARG0] >> $_[ARG1]\n";
289             }
290 30         131 $cheap->{-SERVER}->put( @_[ARG1 .. $#_] );
291             }
292             },
293              
294             # }}}
295             # }}}
296             # Error States {{{
297             # -failure: Handle Connection Failure (Internal) {{{
298            
299             -failure => sub {
300 0     0   0 printf "%d !! Disconnected - Failed (%s)\n", $_[ARG3], $_[ARG2]
301             if DEBUG;
302              
303 0         0 push @_, $heap{$_[ARG3]};
304             # di ko alam kahit needed ito
305 0 0       0 $user_code{FailureEvent}->(@_) if $_[CHEAP]{-RUNNING};
306              
307             # Redundant ( This is done in shutdown )
308             # delete $_[CHEAP];
309             # delete $heap{$_[ARG3]}{-SERVER};
310              
311 0         0 $_[ARG0] = $_[ARG3];
312 0         0 $code->{shutdown}->(@_);
313             },
314            
315             # }}}
316             # -error: Handle Connection Error (Internal) {{{
317              
318             -error => sub {
319 0     0   0 printf "%d !! Disconnected - Error\n", $_[ARG3] if DEBUG;
320            
321 0         0 push @_, $heap{$_[ARG3]};
322 0 0       0 $user_code{ErrorEvent}->(@_) if $_[CHEAP]{-RUNNING};
323            
324             # Redundant
325             # delete $_[CHEAP];
326             # delete $heap{$_[ARG3]}{-SERVER};
327            
328 0         0 $_[ARG0] = $_[ARG3];
329 0         0 $code->{shutdown}->(@_);
330             },
331            
332             # }}}
333             # -timeout: Handle Connection Timeout (Internal) {{{
334             # Occsaionally -timeout is being called after the connection errors,
335             # thats what the extra check on -RUNNING is for, as well as in the
336             # other error states, just to ensure there is no problem. This doesn't
337             # really happen anymore but I'm not comfortable with it yet.
338              
339             -timeout => sub {
340             # 20050330: timeouts aren't getting cleaned up!
341             # if ($heap{$_[ARG0]}{-RUNNING}) {
342 0     0   0 printf "%d ** Disconnected - Timeout\n", $_[ARG0] if DEBUG;
343            
344 0         0 push @_, delete $heap{$_[ARG0]};
345              
346 0         0 $user_code{TimeoutEvent}->(@_);
347              
348 0         0 $user_code{Disconnected}->(@_);
349              
350             # Just incase the cheap hangs around clean up the wheel
351 0         0 delete $_[CHEAP]->{-SERVER};
352 0         0 delete $_[CHEAP];
353            
354             # kase sabi ito dalawa ng
355             # $code->{shutdown}->(@_);
356             # }
357             },
358            
359             # }}}
360             # }}}
361             # Closing States {{{
362             # -flushed: Empty Socket (Internal) {{{
363              
364             # flush - our socket is empty - Direct call is faster and fits reqs.
365             # -flushed => sub {
366             # unless ($heap{$_[ARG0]}{-RUNNING}) {
367             # $code->{shutdown}->(@_);
368             # }
369             # },
370            
371             # }}}
372             # shutdown: Handle Socket Shutdown {{{
373              
374             # Shutdown... push onto queue if not sent, delete driver.
375             shutdown => sub {
376 30     30   92674 my ($kernel, $id) = @_[ KERNEL, ARG0 ];
377              
378 30 50       267 unless (defined $id) {
379 0         0 return printf STDERR
380             "shutdown called without CHEAP id %s: line %d\n",
381             @_[CALLER_FILE, CALLER_LINE];
382             }
383 30 50       137 unless (exists $heap{$id}) {
384 0         0 die "$_[ARG0]: Socket doesn't exist?";
385             }
386              
387              
388 30         124 push @_, my $cheap = delete $heap{$id};
389              
390 30         110 $cheap->{-RUNNING} = 0;
391              
392             # Shutdown is now impolite.
393             # if (defined $heap{$_[ARG0]}{-SERVER}) {
394             # if ($heap{$_[ARG0]}{-SERVER}->can("get_driver_out_octets")) {
395             # unless ($heap{$_[ARG0]}{-SERVER}->get_driver_out_octets) {
396 30         38 printf "%d -- Disconnected - Closed\n", $_[ARG0]
397             if DEBUG;
398            
399             # Remove Alarm, tanga ko ba!?
400 30         171 $kernel->alarm_remove
401             ( delete $cheap->{-ALARM} );
402              
403 30         3310 $user_code{Disconnected}->(@_);
404            
405             # Blow shit up
406 30         82 delete $_[CHEAP];
407             # }
408            
409             # Its either gone and we're out of synch (shouldn't happen),
410             # or we want to wait for a clean shutdown.
411 30         234 return;
412             # }
413             # }
414             # Our wheel is dead if we didn't return above.
415             # This is kind of redundant, but much of this module is.
416 0         0 $_[KERNEL]->alarm_remove ( delete $heap{$_[ARG0]}{-ALARM} );
417              
418 0         0 push @_, $heap{$_[ARG0]};
419 0         0 $user_code{Disconnected}->(@_);
420            
421 0         0 delete $_[CHEAP];
422 0         0 delete $heap{$_[ARG0]};
423            
424             # Don't do this unless we're flushed...
425             # delete $heap{$_[ARG0]};
426             },
427            
428             # }}}
429             # die: Gracefully close all sockets {{{
430             # Shutdown quick, clean and gracefull.
431              
432             die => sub {
433 1     1   303 $_[KERNEL]->call(shutdown => $_) for keys %heap;
434 1         7 $_[KERNEL]->alias_remove($_) for $_[KERNEL]->alias_list;
435 1         33 $_[KERNEL]->alarm_remove_all;
436             },
437              
438             # }}}
439             # }}}
440 1         54 };
441             # }}}
442             # Session Constructor {{{
443              
444 1         15 POE::Session->create
445 1         3 ( inline_states => { %{ delete $user_code{inline_states} }, %$code },
446             object_states => delete $user_code{object_states},
447             package_states => delete $user_code{package_states},
448             options => delete $user_code{options},
449             args => delete $user_code{args},
450             );
451              
452             # }}}
453             }
454              
455             # }}}
456             # Connect Method {{{
457              
458             sub connect {
459 30     30 0 608 my %Options = @_[1..$#_];
460 30   50     348 $Options{Heap} ||= {};
461              
462 30         35 printf STDERR "!!! -> connect method called from %s:%d\n",
463             (caller)[1,2] if DEBUG;
464              
465              
466 30         262 my $server = POE::Wheel::SocketFactory->new
467             ( RemoteAddress => $Options{RemoteAddress},
468             RemotePort => $Options{RemotePort},
469             BindAddress => $Options{BindAddress},
470             BindPort => $Options{BindPort},
471             SuccessEvent => '-success',
472             FailureEvent => '-failure',
473             Reuse => 'yes',
474             );
475            
476 30         93004 my $id = $server->ID;
477              
478 30         261 printf TRACE "->connect(count %d, id %d, host (%s:%d) %s:%d);\n",
479             scalar keys %heap, $id, @Options{qw( RemoteAddress RemotePort )},
480             (caller)[1,2] if TRACE_CONNECT;
481              
482 30         1100 $heap{$id} = bless {
483 30         44 %{ $Options{Heap} },
484             -ID => $server->ID,
485             -ADDR => $Options{RemoteAddress},
486             -PORT => $Options{RemotePort},
487             -BINDA => $Options{BindAddress},
488             -BINDP => $Options{BindPort},
489             -RUNNING => 1,
490             -TIMEOUT => $Options{Timeout},
491             -SERVER => $server,
492             -STAMP => time,
493             }, __PACKAGE__ . "::CHEAP";
494            
495 30 50       603 if ($heap{$id}{-TIMEOUT}) {
496 30         209 $heap{$id}{-ALARM} = $poe_kernel->delay_set
497             ( -timeout => $heap{$id}{-TIMEOUT}, $id, $heap{$id}{email});
498             }
499             else {
500 0         0 $heap{$id}{-ALARM} = 0;
501             }
502              
503 30         3322 printf "%d ++ Connecting %s:%d \n", $id, @{ $heap{$id} }{qw( -ADDR -PORT )}
504             if DEBUG;
505              
506 30         136 return $heap{$id};
507             }
508              
509             # }}}
510             # CHEAP Package {{{
511              
512             package POE::Component::Client::TCPMulti::CHEAP;
513 2     2   23 use POE::Kernel;
  2         4  
  2         21  
514              
515             # Attribute Accessors {{{
516             sub ID {
517 60     60   52972 shift->{-ID}
518             }
519             sub ADDR {
520 0     0     shift->{-ADDR}
521             }
522             sub PORT {
523 0     0     shift->{-PORT}
524             }
525             # }}}
526             # Filter Settings {{{
527              
528             sub filter {
529 0     0     shift->{-SERVER}->set_filter( shift->new(@_) );
530             }
531              
532             sub input_filter {
533 0     0     shift->{-SERVER}->set_input_filter( shift->new(@_) );
534             }
535              
536             sub output_filter {
537 0     0     shift->{-SERVER}->set_output_filter( shift->new(@_) );
538             }
539              
540             # }}}
541             # Timeout Setting {{{
542              
543             sub timeout {
544 0     0     my ($cheap, $timeout) = @_;
545              
546 0 0         $poe_kernel->alarm_remove($cheap->{-ALARM}) if $cheap->{-ALARM};
547              
548 0 0         unless (defined $timeout) {
549 0           return $cheap->{-TIMEOUT};
550             }
551 0 0         if ($timeout) {
552 0           $cheap->{-TIMEOUT} = $timeout;
553 0           $cheap->{-STAMP} = time;
554 0           $cheap->{-ALARM} = $poe_kernel->delay_set
555             ( -timeout => $cheap->{-TIMEOUT}, $cheap->{-ID});
556             }
557             else {
558 0           $cheap->{-TIMEOUT} = 0;
559 0           $cheap->{-ALARM} = 0;
560 0           $cheap->{-STAMP} = 0;
561             }
562             }
563              
564             # }}}
565             # }}}
566              
567             return "POE Rules";