File Coverage

blib/lib/POE/Component/Client/Keepalive.pm
Criterion Covered Total %
statement 363 439 82.6
branch 83 122 68.0
condition 27 51 52.9
subroutine 49 54 90.7
pod 5 5 100.0
total 527 671 78.5


line stmt bran cond sub pod time code
1             package POE::Component::Client::Keepalive;
2             # vim: ts=2 sw=2 expandtab
3             $POE::Component::Client::Keepalive::VERSION = '0.272';
4 15     15   3670002 use warnings;
  15         41  
  15         605  
5 15     15   87 use strict;
  15         29  
  15         574  
6              
7 15     15   121 use Carp qw(croak);
  15         29  
  15         1178  
8 15     15   90 use Errno qw(ETIMEDOUT EBADF);
  15         33  
  15         762  
9 15     15   87 use Socket qw(SOL_SOCKET SO_LINGER);
  15         26  
  15         1111  
10              
11 15     15   92 use POE;
  15         30  
  15         101  
12 15     15   24217 use POE::Wheel::SocketFactory;
  15         188838  
  15         571  
13 15     15   11548 use POE::Component::Connection::Keepalive;
  15         86  
  15         527  
14 15     15   21357 use POE::Component::Resolver;
  15         1043528  
  15         1191  
15 15     15   15457 use Net::IP::Minimal qw(ip_is_ipv4);
  15         13801  
  15         1591  
16              
17             my $ssl_available;
18             eval {
19             require POE::Component::SSLify;
20             $ssl_available = 1;
21             };
22              
23 15     15   129 use constant DEBUG => 0;
  15         32  
  15         1599  
24              
25             use constant {
26 15         1460 DEBUG_DNS => (DEBUG || 0),
27             DEBUG_DEALLOCATE => (DEBUG || 0),
28 15     15   91 };
  15         31  
29              
30 15   50     36666 use constant TCP_PROTO => scalar(getprotobyname "tcp") || (
31             die "getprotobyname('tcp') failed: $!"
32 15     15   86 );
  15         30  
33              
34             # Manage connection request IDs.
35              
36             my $current_id = 0;
37             my %active_req_ids;
38              
39             sub _allocate_req_id {
40 33     33   316 while (1) {
41 33 50       170 last unless exists $active_req_ids{++$current_id};
42             }
43 33         273 return $active_req_ids{$current_id} = $current_id;
44             }
45              
46             sub _free_req_id {
47 31     31   71 my $id = shift;
48 31         98 delete $active_req_ids{$id};
49             }
50              
51             my $default_resolver;
52             my $instances = 0;
53              
54             # The connection manager uses a number of data structures, most of
55             # them arrays. These constants define offsets into those arrays, and
56             # the comments document them.
57              
58             use constant { # @$self = (
59 15         3845 SF_POOL => 0, # \%socket_pool,
60             SF_QUEUE => 1, # \@request_queue,
61             SF_USED => 2, # \%sockets_in_use,
62             SF_WHEELS => 3, # \%wheels_by_id,
63             SF_USED_EACH => 4, # \%count_by_triple,
64             SF_MAX_OPEN => 5, # $max_open_count,
65             SF_MAX_HOST => 6, # $max_per_host,
66             SF_SOCKETS => 7, # \%socket_xref,
67             SF_KEEPALIVE => 8, # $keep_alive_secs,
68             SF_TIMEOUT => 9, # $default_request_timeout,
69             SF_RESOLVER => 10, # $poco_client_dns_object,
70             SF_SHUTDOWN => 11, # $shutdown_flag,
71             SF_REQ_INDEX => 12, # \%request_id_to_wheel_id,
72             SF_BIND_ADDR => 13, # $bind_address,
73             SF_ALIAS => 14, # $embedded_session_alias
74 15     15   124 }; # );
  15         31  
75              
76             use constant { # $socket_xref{$socket} = [
77 15         1378 SK_KEY => 0, # $conn_key,
78             SK_TIMER => 1, # $idle_timer,
79 15     15   178 }; # ];
  15         30  
80              
81             # $count_by_triple{$conn_key} = $conn_count;
82              
83             use constant { # $wheels_by_id{$wheel_id} = [
84 15         1512 WHEEL_WHEEL => 0, # $wheel_object,
85             WHEEL_REQUEST => 1, # $request,
86 15     15   79 }; # ];
  15         40  
87              
88             # $socket_pool{$conn_key}{$socket} = $socket;
89              
90             use constant { # $sockets_in_use{$socket} = (
91 15         2531 USED_SOCKET => 0, # $socket_handle,
92             USED_TIME => 1, # $allocation_time,
93             USED_KEY => 2, # $conn_key,
94 15     15   83 }; # );
  15         31  
95              
96             # @request_queue = (
97             # $request,
98             # $request,
99             # ....
100             # );
101              
102             use constant { # $request = [
103 15         73003 RQ_SESSION => 0, # $request_session,
104             RQ_EVENT => 1, # $request_event,
105             RQ_SCHEME => 2, # $request_scheme,
106             RQ_ADDRESS => 3, # $request_address,
107             RQ_IP => 4, # $request_ip,
108             RQ_PORT => 5, # $request_port,
109             RQ_CONN_KEY => 6, # $request_connection_key,
110             RQ_CONTEXT => 7, # $request_context,
111             RQ_TIMEOUT => 8, # $request_timeout,
112             RQ_START => 9, # $request_start_time,
113             RQ_TIMER_ID => 10, # $request_timer_id,
114             RQ_WHEEL_ID => 11, # $request_wheel_id,
115             RQ_ACTIVE => 12, # $request_is_active,
116             RQ_ID => 13, # $request_id,
117             RQ_ADDR_FAM => 14, # $request_address_family,
118             RQ_FOR_SCHEME => 15, # $...
119             RQ_FOR_ADDRESS => 16, # $...
120             RQ_FOR_PORT => 17, # $...
121             RQ_RESOLVER_ID => 18, # $resolver_request_id,
122 15     15   106 }; # ];
  15         42  
123              
124             # Create a connection manager.
125              
126             sub new {
127 19     19 1 19372 my $class = shift;
128 19 100       341 croak "new() needs an even number of parameters" if @_ % 2;
129 18         72 my %args = @_;
130              
131 18   100     198 my $max_per_host = delete($args{max_per_host}) || 4;
132 18   100     215 my $max_open = delete($args{max_open}) || 128;
133 18   100     133 my $keep_alive = delete($args{keep_alive}) || 15;
134 18   100     123 my $timeout = delete($args{timeout}) || 120;
135 18         50 my $resolver = delete($args{resolver});
136 18         45 my $bind_address = delete($args{bind_address});
137              
138 18         60 my @unknown = sort keys %args;
139 18 100       64 if (@unknown) {
140 1         150 croak "new() doesn't accept: @unknown";
141             }
142              
143 17         83 my $alias = "POE::Component::Client::Keepalive::" . ++$current_id;
144              
145 17         152 my $self = bless [
146             { }, # SF_POOL
147             [ ], # SF_QUEUE
148             { }, # SF_USED
149             { }, # SF_WHEELS
150             { }, # SF_USED_EACH
151             $max_open, # SF_MAX_OPEN
152             $max_per_host, # SF_MAX_HOST
153             { }, # SF_SOCKETS
154             $keep_alive, # SF_KEEPALIVE
155             $timeout, # SF_TIMEOUT
156             undef, # SF_RESOLVER
157             undef, # SF_SHUTDOWN
158             undef, # SF_REQ_INDEX
159             $bind_address, # SF_BIND_ADDR
160             undef, # SF_ALIAS
161             ], $class;
162              
163             $default_resolver = $resolver if (
164 17 50 33     134 $resolver and eval { $resolver->isa('POE::Component::Resolver') }
  17         191  
165             );
166              
167 17   33     189 $self->[SF_RESOLVER] = (
168             $default_resolver ||= POE::Component::Resolver->new()
169             );
170              
171 17         432 my $session = POE::Session->create(
172             object_states => [
173             $self => {
174             _start => "_ka_initialize",
175             _stop => "_ka_stopped",
176             ka_add_to_queue => "_ka_add_to_queue",
177             ka_cancel_dns_response => "_ka_cancel_dns_response",
178             ka_conn_failure => "_ka_conn_failure",
179             ka_conn_success => "_ka_conn_success",
180             ka_deallocate => "_ka_deallocate",
181             ka_dns_response => "_ka_dns_response",
182             ka_keepalive_timeout => "_ka_keepalive_timeout",
183             ka_reclaim_socket => "_ka_reclaim_socket",
184             ka_relinquish_socket => "_ka_relinquish_socket",
185             ka_request_timeout => "_ka_request_timeout",
186             ka_resolve_request => "_ka_resolve_request",
187             ka_set_timeout => "_ka_set_timeout",
188             ka_shutdown => "_ka_shutdown",
189             ka_socket_activity => "_ka_socket_activity",
190             ka_wake_up => "_ka_wake_up",
191             },
192             ],
193             );
194              
195 17         3163 $self->[SF_ALIAS] = ref($self) . "::" . $session->ID();
196              
197 17         139 return $self;
198             }
199              
200             # Initialize the hidden session behind this component.
201             # Rendezvous with the object via a mutually agreed upon alias.
202              
203             sub _ka_initialize {
204 17     17   7284 my ($object, $kernel, $heap) = @_[OBJECT, KERNEL, HEAP];
205 17         44 $instances++;
206 17         60 $heap->{dns_requests} = { };
207 17         108 $kernel->alias_set(ref($object) . "::" . $_[SESSION]->ID());
208             }
209              
210             # When programs crash, the session may stop in a non-shutdown state.
211             # _ka_stopped and DESTROY catch this either way the death occurs.
212              
213             sub _ka_stopped {
214 17     17   29004970 $_[OBJECT][SF_SHUTDOWN] = 1;
215             }
216              
217             sub DESTROY {
218 14     14   124541 $_[0]->shutdown();
219             }
220              
221             # Request to wake up. This should only happen during the edge
222             # condition where the component's request queue goes from empty to
223             # having one item.
224             #
225             # It also happens during free(), to see if there are more sockets to
226             # deal with.
227             #
228             # TODO - Make the _ka_wake_up stuff smart enough not to post duplicate
229             # messages to the queue.
230              
231             sub _ka_wake_up {
232 55     55   5183 my ($self, $kernel) = @_[OBJECT, KERNEL];
233              
234             # Scan the list of requests, until we find one that can be met.
235             # Fire off POE::Wheel::SocketFactory to begin the connection
236             # process.
237              
238 55         110 my $request_index = 0;
239 55         95 my $currently_open = keys(%{$self->[SF_USED]}) + keys(%{$self->[SF_SOCKETS]});
  55         140  
  55         196  
240 55         116 my @splice_list;
241              
242 55         298 QUEUED:
243 55         115 foreach my $request (@{$self->[SF_QUEUE]}) {
244 36         66 DEBUG and warn "WAKEUP: checking for $request->[RQ_CONN_KEY]";
245              
246             # Sweep away inactive requests.
247              
248 36 100       148 unless ($request->[RQ_ACTIVE]) {
249 1         2 push @splice_list, $request_index;
250 1         2 next;
251             }
252              
253             # Skip this request if its scheme/address/port triple is maxed
254             # out.
255              
256 35         107 my $req_key = $request->[RQ_CONN_KEY];
257             next if (
258 35 100 100     323 ($self->[SF_USED_EACH]{$req_key} || 0) >= $self->[SF_MAX_HOST]
259             );
260              
261             # Honor the request from the free pool, if possible. The
262             # currently open socket count does not increase.
263              
264 32         274 my $existing_connection = $self->_check_free_pool($req_key);
265 32 100       367 if ($existing_connection) {
266 5         15 push @splice_list, $request_index;
267              
268 5         40 _respond(
269             $request, {
270             connection => $existing_connection,
271             from_cache => "deferred",
272             }
273             );
274              
275             # Remove the wheel-to-request index.
276 5         21 delete $self->[SF_REQ_INDEX]{$request->[RQ_ID]};
277 5         21 _free_req_id($request->[RQ_ID]);
278              
279 5         11 next;
280             }
281              
282             # we can't easily take this out of the outer loop since _check_free_pool
283             # can change it from under us
284 27         61 my @free_sockets = keys(%{$self->[SF_SOCKETS]});
  27         109  
285              
286             # Try to free over-committed (but unused) sockets until we're back
287             # under SF_MAX_OPEN sockets. Bail out if we can't free enough.
288             # TODO - Consider removing @free_sockets in least- to
289             # most-recently used order.
290 27         116 while ($currently_open >= $self->[SF_MAX_OPEN]) {
291 4 100       18 last QUEUED unless @free_sockets;
292 3         220 my $next_to_go = $free_sockets[rand(@free_sockets)];
293 3         19 $self->_remove_socket_from_pool($next_to_go);
294 3         1101 $currently_open--;
295             }
296              
297             # Start the request. Create a wheel to begin the connection.
298             # Move the wheel and its request into SF_WHEELS.
299 26         46 DEBUG and warn "WAKEUP: creating wheel for $req_key";
300              
301 26   66     146 my $addr = ($request->[RQ_IP] or $request->[RQ_ADDRESS]);
302 26 50       1204 my $wheel = POE::Wheel::SocketFactory->new(
303             (
304             defined($self->[SF_BIND_ADDR])
305             ? (BindAddress => $self->[SF_BIND_ADDR])
306             : ()
307             ),
308             RemoteAddress => $addr,
309             RemotePort => $request->[RQ_PORT],
310             SuccessEvent => "ka_conn_success",
311             FailureEvent => "ka_conn_failure",
312             SocketDomain => $request->[RQ_ADDR_FAM],
313             );
314              
315 26         47659 $self->[SF_WHEELS]{$wheel->ID} = [
316             $wheel, # WHEEL_WHEEL
317             $request, # WHEEL_REQUEST
318             ];
319              
320             # store the wheel's ID in the request object
321 26         224 $request->[RQ_WHEEL_ID] = $wheel->ID;
322              
323             # Count it as used, so we don't over commit file handles.
324 26         157 $currently_open++;
325 26         103 $self->[SF_USED_EACH]{$req_key}++;
326              
327             # Temporarily store the SF_USED record under the wheel ID. It
328             # will be moved to the socket when the wheel responds.
329 26         171 $self->[SF_USED]{$wheel->ID} = [
330             undef, # USED_SOCKET
331             time(), # USED_TIME
332             $req_key, # USED_KEY
333             ];
334              
335             # Mark the request index as one to splice out.
336              
337 26         179 push @splice_list, $request_index;
338             }
339             continue {
340 35         154 $request_index++;
341             }
342              
343             # The @splice_list is a list of element indices that need to be
344             # spliced out of the request queue. We scan in backwards, from
345             # highest index to lowest, so that each splice does not affect the
346             # indices of the other.
347             #
348             # This removes the request from the queue. It's vastly important
349             # that the request be entered into SF_WHEELS before now.
350              
351 55         160 my $splice_index = @splice_list;
352 55         309 while ($splice_index--) {
353 32         67 splice @{$self->[SF_QUEUE]}, $splice_list[$splice_index], 1;
  32         241  
354             }
355             }
356              
357             sub allocate {
358 45     45 1 2995000 my $self = shift;
359 45 100       436 croak "allocate() needs an even number of parameters" if @_ % 2;
360 44         430 my %args = @_;
361              
362             # TODO - Validate arguments.
363              
364 44         150 my $scheme = delete $args{scheme};
365 44 100       268 croak "allocate() needs a 'scheme'" unless $scheme;
366 43         99 my $address = delete $args{addr};
367 43 100       232 croak "allocate() needs an 'addr'" unless $address;
368 42         253 my $port = delete $args{port};
369 42 100       238 croak "allocate() needs a 'port'" unless $port;
370 41         83 my $event = delete $args{event};
371 41 100       238 croak "allocate() needs an 'event'" unless $event;
372 40         115 my $context = delete $args{context};
373 40 100       224 croak "allocate() needs a 'context'" unless $context;
374 39         72 my $timeout = delete $args{timeout};
375 39 100       147 $timeout = $self->[SF_TIMEOUT] unless $timeout;
376              
377 39   33     288 my $for_scheme = delete($args{for_scheme}) || $scheme;
378 39   33     288 my $for_address = delete($args{for_addr}) || $address;
379 39   33     379 my $for_port = delete($args{for_port}) || $port;
380              
381 39 50       128 croak "allocate() on shut-down connection manager" if $self->[SF_SHUTDOWN];
382              
383 39         131 my @unknown = sort keys %args;
384 39 100       147 if (@unknown) {
385 1         124 croak "allocate() doesn't accept: @unknown";
386             }
387              
388 38         211 my $conn_key = (
389             "$scheme $address $port for $for_scheme $for_address $for_port"
390             );
391              
392             # If we have a connection pool for the scheme/address/port triple,
393             # then we can maybe post an available connection right away.
394              
395 38         252 my $existing_connection = $self->_check_free_pool($conn_key);
396 38 100       121 if (defined $existing_connection) {
397 5         34 $poe_kernel->post(
398             $poe_kernel->get_active_session,
399             $event => {
400             addr => $address,
401             context => $context,
402             port => $port,
403             scheme => $scheme,
404             connection => $existing_connection,
405             from_cache => "immediate",
406             }
407             );
408 5         502 return;
409             }
410              
411             # We can't honor the request immediately, so it's put into a queue.
412 33         51 DEBUG and warn "ALLOCATE: enqueuing request for $conn_key";
413              
414 33         210 my $request = [
415             $poe_kernel->get_active_session(), # RQ_SESSION
416             $event, # RQ_EVENT
417             $scheme, # RQ_SCHEME
418             $address, # RQ_ADDRESS
419             undef, # RQ_IP
420             $port, # RQ_PORT
421             $conn_key, # RQ_CONN_KEY
422             $context, # RQ_CONTEXT
423             $timeout, # RQ_TIMEOUT
424             time(), # RQ_START
425             undef, # RQ_TIMER_ID
426             undef, # RQ_WHEEL_ID
427             1, # RQ_ACTIVE
428             _allocate_req_id(), # RQ_ID
429             undef, # RQ_ADDR_FAM
430             $for_scheme, # RQ_FOR_SCHEME
431             $for_address, # RQ_FOR_ADDRESS
432             $for_port, # RQ_FOR_PORT
433             undef, # RQ_RESOLVER_ID
434             ];
435              
436 33         130 $self->[SF_REQ_INDEX]{$request->[RQ_ID]} = $request;
437              
438 33         161 $poe_kernel->refcount_increment(
439             $request->[RQ_SESSION]->ID(),
440             "poco-client-keepalive"
441             );
442              
443 33         1609 $poe_kernel->call($self->[SF_ALIAS], ka_set_timeout => $request);
444 33         3585 $poe_kernel->call($self->[SF_ALIAS], ka_resolve_request => $request);
445              
446 33         1842 return $request->[RQ_ID];
447             }
448              
449             sub deallocate {
450 0     0 1 0 my ($self, $req_id) = @_;
451              
452 0 0 0     0 croak "deallocate() requires a request ID" unless(
453             defined($req_id) and exists($active_req_ids{$req_id})
454             );
455              
456 0         0 my $request = delete $self->[SF_REQ_INDEX]{$req_id};
457 0 0       0 unless (defined $request) {
458 0         0 DEBUG_DEALLOCATE and warn "deallocate could not find request $req_id";
459 0         0 return;
460             }
461 0         0 _free_req_id($request->[RQ_ID]);
462              
463             # Now pass the vetted request & its ID into our manager session.
464 0         0 $poe_kernel->call($self->[SF_ALIAS], "ka_deallocate", $request, $req_id);
465             }
466              
467             sub _ka_deallocate {
468 0     0   0 my ($self, $heap, $request, $req_id) = @_[OBJECT, HEAP, ARG0, ARG1];
469              
470 0         0 my $conn_key = $request->[RQ_CONN_KEY];
471 0         0 my $existing_connection = $self->_check_free_pool($conn_key);
472              
473             # Existing connection. Remove it from the pool, and delete the socket.
474 0 0       0 if (defined $existing_connection) {
475 0         0 $self->_remove_socket_from_pool($existing_connection->{socket});
476 0         0 DEBUG_DEALLOCATE and warn(
477             "deallocate called, deleted already-connected socket"
478             );
479 0         0 return;
480             }
481              
482             # No connection yet. Cancel the request.
483 0         0 DEBUG_DEALLOCATE and warn(
484             "deallocate called without an existing connection. ",
485             "cancelling connection request"
486             );
487              
488 0 0       0 unless (exists $heap->{dns_requests}{$request->[RQ_ADDRESS]}) {
489 0         0 DEBUG_DEALLOCATE and warn(
490             "deallocate cannot cancel dns -- no pending request"
491             );
492 0         0 return;
493             }
494              
495 0         0 $poe_kernel->call( $self->[SF_ALIAS], ka_cancel_dns_response => $request );
496 0         0 return;
497             }
498              
499             sub _ka_cancel_dns_response {
500 0     0   0 my ($self, $kernel, $heap, $request) = @_[OBJECT, KERNEL, HEAP, ARG0];
501              
502 0         0 my $address = $request->[RQ_ADDRESS];
503 0         0 DEBUG_DNS and warn "DNS: canceling request for $address\n";
504              
505 0         0 my $requests = $heap->{dns_requests}{$address};
506              
507             # Remove the resolver request for the address of this connection
508             # request
509              
510 0         0 my $req_index = @$requests;
511 0         0 while ($req_index--) {
512 0 0       0 next unless $requests->[$req_index] == $request;
513 0         0 splice(@$requests, $req_index, 1);
514 0         0 last;
515             }
516              
517             # Clean up the structure for the address if there are no more
518             # requests to resolve that address.
519              
520 0 0       0 unless (@$requests) {
521 0         0 DEBUG_DNS and warn "DNS: canceled all requests for $address";
522 0         0 $self->[SF_RESOLVER]->cancel( $request->[RQ_RESOLVER_ID] );
523 0         0 delete $heap->{dns_requests}{$address};
524             }
525              
526             # cancel our attempt to connect
527 0         0 $poe_kernel->alarm_remove( $request->[RQ_TIMER_ID] );
528 0         0 $poe_kernel->refcount_decrement(
529             $request->[RQ_SESSION]->ID(), "poco-client-keepalive"
530             );
531             }
532              
533             # Set the request's timeout, in the component's context.
534              
535             sub _ka_set_timeout {
536 33     33   3367 my ($kernel, $request) = @_[KERNEL, ARG0];
537 33         344 $request->[RQ_TIMER_ID] = $kernel->delay_set(
538             ka_request_timeout => $request->[RQ_TIMEOUT], $request
539             );
540             }
541              
542             # The request has timed out. Mark it as defunct, and respond with an
543             # ETIMEDOUT error.
544              
545             sub _ka_request_timeout {
546 2     2   1080 my ($self, $kernel, $request) = @_[OBJECT, KERNEL, ARG0];
547              
548 2         3 DEBUG and warn(
549             "CON: request from session ", $request->[RQ_SESSION]->ID,
550             " for address ", $request->[RQ_ADDRESS], " timed out"
551             );
552 2         8 $! = ETIMEDOUT;
553              
554             # The easiest way to do this? Simulate an error from the wheel
555             # itself.
556              
557 2 50       19 if (defined $request->[RQ_WHEEL_ID]) {
558 0         0 @_[ARG0..ARG3] = ("connect", $!+0, "$!", $request->[RQ_WHEEL_ID]);
559 0         0 goto &_ka_conn_failure;
560             }
561              
562 2         42 my ($errnum, $errstr) = ($!+0, "$!");
563              
564             # No wheel yet. It must have timed out in connect.
565 2 100       18 if ($request->[RQ_RESOLVER_ID]) {
566 1         12 $self->[SF_RESOLVER]->cancel( $request->[RQ_RESOLVER_ID] );
567 1         17657 $request->[RQ_RESOLVER_ID] = undef;
568             }
569              
570             # But what if there is no wheel?
571 2         55 _respond_with_error($request, "connect", $errnum, $errstr),
572             }
573              
574             # Connection failed. Remove the SF_WHEELS record corresponding to the
575             # request. Remove the SF_USED placeholder record so it won't count
576             # anymore. Send a failure notice to the requester.
577              
578             sub _ka_conn_failure {
579 2     2   1158 my ($self, $func, $errnum, $errstr, $wheel_id) = @_[OBJECT, ARG0..ARG3];
580              
581 2         4 DEBUG and warn "CON: sending $errstr for function $func";
582             # Remove the SF_WHEELS record.
583 2         8 my $wheel_rec = delete $self->[SF_WHEELS]{$wheel_id};
584 2         5 my $request = $wheel_rec->[WHEEL_REQUEST];
585              
586             # Remove the SF_USED placeholder.
587 2         7 delete $self->[SF_USED]{$wheel_id};
588              
589             # remove the wheel-to-request index
590 2         5 delete $self->[SF_REQ_INDEX]{$request->[RQ_ID]};
591 2         9 _free_req_id($request->[RQ_ID]);
592              
593             # Discount the use by request key, removing the SF_USED record
594             # entirely if it's now moot.
595 2         3 my $request_key = $request->[RQ_CONN_KEY];
596 2         9 $self->_decrement_used_each($request_key);
597              
598             # Tell the requester about the failure.
599 2         9 _respond_with_error($request, $func, $errnum, $errstr),
600              
601             $self->_ka_wake_up($_[KERNEL]);
602             }
603              
604             # Connection succeeded. Remove the SF_WHEELS record corresponding to
605             # the request. Flesh out the placeholder SF_USED record so it counts.
606              
607             sub _ka_conn_success {
608 24     24   30566 my ($self, $socket, $wheel_id) = @_[OBJECT, ARG0, ARG3];
609              
610             # Remove the SF_WHEELS record.
611 24         89 my $wheel_rec = delete $self->[SF_WHEELS]{$wheel_id};
612 24         237 my $request = $wheel_rec->[WHEEL_REQUEST];
613              
614             # remove the wheel-to-request index
615 24         97 delete $self->[SF_REQ_INDEX]{$request->[RQ_ID]};
616 24         140 _free_req_id($request->[RQ_ID]);
617              
618             # Remove the SF_USED placeholder, add in the socket, and store it
619             # properly.
620 24         67 my $used = delete $self->[SF_USED]{$wheel_id};
621              
622 24 50       137 unless ($request->[RQ_SCHEME] eq 'https') {
623 24         452 $self->_store_socket($used, $socket);
624 24         107 $self->_send_back_socket($request, $socket);
625 24         326 return;
626             }
627              
628             # HTTPS here.
629             # Really applies to all SSL schemes.
630              
631 0 0       0 unless ($ssl_available) {
632 0         0 die "There is no SSL support, please install POE::Component::SSLify";
633             }
634              
635 0         0 eval {
636 0         0 $socket = POE::Component::SSLify::Client_SSLify(
637             $socket,
638              
639             # TODO - To make non-blocking sslify work, I need to somehow
640             # defer the response until the following callback says it's
641             # fine. Or if the callback says there's an error, it needs to
642             # be propagated out.
643             #
644             # Problem is, just setting the callback doesn't seem to get the
645             # connection to complete (successfully or otherwise). There
646             # needs to be something more going on... but what?
647              
648             # sub {
649             # my ($socket, $status, $errval) = @_;
650             # $errval = 'undef' unless defined $errval;
651             #
652             # warn "socket($socket) status($status) errval($errval)";
653             #
654             # # Connected okay.
655             # if ($status == 1) {
656             # $self->_send_back_socket($request, $socket);
657             # $self = $request = undef;
658             # return;
659             # }
660             #
661             # # Didn't connect okay, or hasn't so far.
662             # # Report the error.
663             # if ($errval == 1) {
664             #
665             # # Get all known errors, but only retain the most recent one.
666             # # I'm not sure this is needed, but the API mentions an error
667             # # queue, which implies that it could contain stale errors.
668             #
669             # my $errnum;
670             # while (my $new_errnum = Net::SSLeay::ERR_get_error()) {
671             # $errnum = $new_errnum;
672             # }
673             #
674             # my $errstr = Net::SSLeay::ERR_error_string($errnum);
675             # warn " ssl_error($errnum) string($errstr)";
676             # _respond_with_error($request, "sslify", undef, $errstr);
677             #
678             # # TODO - May the circle be broken.
679             # $self = $request = undef;
680             # return;
681             # }
682             # }
683             );
684             };
685              
686 0 0       0 if ($@) {
687 0         0 _respond_with_error($request, "sslify", undef, "$@");
688 0         0 return;
689             }
690              
691             # TODO - I think for SSL we just need to _store_socket(). The call
692             # to _send_back_socket() should be inside the SSL callback.
693             #
694             # Also, I think the callback might leak. $request and $self may
695             # need to be weakened.
696              
697 0         0 $self->_store_socket($used, $socket);
698 0         0 $self->_send_back_socket($request, $socket);
699             }
700              
701             sub _store_socket {
702 24     24   52 my ($self, $used, $socket) = @_;
703 24         62 $used->[USED_SOCKET] = $socket;
704 24         134 $self->[SF_USED]{$socket} = $used;
705             }
706              
707             sub _send_back_socket {
708 24     24   66 my ($self, $request, $socket) = @_;
709              
710 24         44 DEBUG and warn(
711             "CON: posting... to $request->[RQ_SESSION] . $request->[RQ_EVENT]"
712             );
713              
714             # Build a connection object around the socket.
715 24         424 my $connection = POE::Component::Connection::Keepalive->new(
716             socket => $socket,
717             manager => $self,
718             );
719              
720             # Give the socket to the requester.
721 24         182 _respond(
722             $request, {
723             connection => $connection,
724             }
725             );
726             }
727              
728             # The user is done with a socket. Make it available for reuse.
729              
730             sub free {
731 36     36 1 2438 my ($self, $socket) = @_;
732              
733 36 100       179 return if $self->[SF_SHUTDOWN];
734 34         59 DEBUG and warn "FREE: freeing socket";
735              
736             # Remove the accompanying SF_USED record.
737 34 100       516 croak "can't free() undefined socket" unless defined $socket;
738 33         141 my $used = delete $self->[SF_USED]{$socket};
739 33 100       218 croak "can't free() unallocated socket" unless defined $used;
740              
741             # Reclaim the socket.
742 32         1457 $poe_kernel->call($self->[SF_ALIAS], "ka_reclaim_socket", $used);
743              
744             # Avoid returning things by mistake.
745 32         918 return;
746             }
747              
748             # A sink for deliberately unhandled events.
749              
750 0     0   0 sub _ka_ignore_this_event {
751             # Do nothing.
752             }
753              
754             # An internal method to fetch a socket from the free pool, if one
755             # exists.
756              
757             sub _check_free_pool {
758 70     70   197 my ($self, $conn_key) = @_;
759              
760 70 100       1168 return unless exists $self->[SF_POOL]{$conn_key};
761              
762 10         26 my $free = $self->[SF_POOL]{$conn_key};
763              
764 10         17 DEBUG and warn "CHECK: reusing $conn_key";
765              
766 10         40 my $next_socket = (values %$free)[0];
767 10         35 delete $free->{$next_socket};
768 10 100       49 unless (keys %$free) {
769 8         22 delete $self->[SF_POOL]{$conn_key};
770             }
771              
772             # _check_free_pool() may be operating in another session, so we call
773             # the correct one here.
774 10         48 $poe_kernel->call($self->[SF_ALIAS], "ka_relinquish_socket", $next_socket);
775              
776 10         14039 $self->[SF_USED]{$next_socket} = [
777             $next_socket, # USED_SOCKET
778             time(), # USED_TIME
779             $conn_key, # USED_KEY
780             ];
781              
782 10         41 delete $self->[SF_SOCKETS]{$next_socket};
783              
784 10         41 $self->[SF_USED_EACH]{$conn_key}++;
785              
786             # Build a connection object around the socket.
787 10         314 my $connection = POE::Component::Connection::Keepalive->new(
788             socket => $next_socket,
789             manager => $self,
790             );
791              
792 10         36 return $connection;
793             }
794              
795             sub _decrement_used_each {
796 34     34   320 my ($self, $request_key) = @_;
797 34 100       272 unless (--$self->[SF_USED_EACH]{$request_key}) {
798 29         107 delete $self->[SF_USED_EACH]{$request_key};
799             }
800             }
801              
802             # Reclaim a socket. Put it in the free socket pool, and wrap it with
803             # select_read() to discard any data and detect when it's closed.
804              
805             sub _ka_reclaim_socket {
806 32     32   10133 my ($self, $kernel, $used) = @_[OBJECT, KERNEL, ARG0];
807              
808 32         98 my $socket = $used->[USED_SOCKET];
809              
810             # Decrement the usage counter for the given connection key.
811 32         79 my $request_key = $used->[USED_KEY];
812 32         322 $self->_decrement_used_each($request_key);
813              
814             # Socket is closed. We can't reuse it.
815 32 100       145 unless (defined fileno $socket) {
816 2         3 DEBUG and warn "RECLAIM: freed socket has previously been closed";
817 2         81 goto &_ka_wake_up;
818             }
819              
820             # Socket is still open. Check for lingering data.
821 30         44 DEBUG and warn "RECLAIM: checking if socket still works";
822              
823             # Check for data on the socket, which implies that the server
824             # doesn't know we're done. That leads to desynchroniziation on the
825             # protocol level, which strongly implies that we can't reuse the
826             # socket. In this case, we'll make a quick attempt at fetching all
827             # the data, then close the socket.
828              
829 30         96 my $rin = '';
830 30         309 vec($rin, fileno($socket), 1) = 1;
831 30         147 my ($rout, $eout);
832 30         303 my $socket_is_active = select ($rout=$rin, undef, $eout=$rin, 0);
833              
834 30 100       109 if ($socket_is_active) {
835 3         6 DEBUG and warn "RECLAIM: socket is still active; trying to drain";
836 15     15   196 use bytes;
  15         39  
  15         269  
837              
838 3   50     152 my $socket_had_data = sysread($socket, my $buf = "", 65536) || 0;
839 3         7 DEBUG and warn "RECLAIM: socket had $socket_had_data bytes. 0 means EOF";
840 3         4 DEBUG and warn "RECLAIM: Giving up on socket.";
841              
842             # Avoid common FIN_WAIT_2 issues, but only for valid sockets.
843             #if ($socket_had_data and fileno($socket)) {
844 3 50       11 if ($socket_had_data) {
845 0         0 my $opt_result = setsockopt(
846             $socket, SOL_SOCKET, SO_LINGER, pack("sll",1,0,0)
847             );
848 0 0 0     0 die "setsockopt: " . ($!+0) . " $!" if (not $opt_result and $! != EBADF);
849             }
850              
851 3         24 goto &_ka_wake_up;
852             }
853              
854             # Socket is alive and has no data, so it's in a quiet, theoretically
855             # reclaimable state.
856              
857 27         44 DEBUG and warn "RECLAIM: reclaiming socket";
858              
859             # Watch the socket, and set a keep-alive timeout.
860 27         142 $kernel->select_read($socket, "ka_socket_activity");
861 27         4068 my $timer_id = $kernel->delay_set(
862             ka_keepalive_timeout => $self->[SF_KEEPALIVE], $socket
863             );
864              
865             # Record the socket as free to be used.
866 27         4100 $self->[SF_POOL]{$request_key}{$socket} = $socket;
867 27         125 $self->[SF_SOCKETS]{$socket} = [
868             $request_key, # SK_KEY
869             $timer_id, # SK_TIMER
870             ];
871              
872 27         224 goto &_ka_wake_up;
873             }
874              
875             # Socket timed out. Discard it.
876              
877             sub _ka_keepalive_timeout {
878 3     3   1998447 my ($self, $socket) = @_[OBJECT, ARG0];
879 3         24 $self->_remove_socket_from_pool($socket);
880             }
881              
882             # Relinquish a socket. Stop selecting on it.
883              
884             sub _ka_relinquish_socket {
885 10     10   700 my ($kernel, $socket) = @_[KERNEL, ARG0];
886 10         68 $kernel->alarm_remove($_[OBJECT][SF_SOCKETS]{$socket}[SK_TIMER]);
887 10         1283 $kernel->select_read($socket, undef);
888             }
889              
890             # Shut down the component. Release any sockets we're currently
891             # holding onto. Clean up any timers. Remove the alias it's known by.
892              
893             sub shutdown {
894 29     29 1 6064 my $self = shift;
895 29 100       1737 return if $self->[SF_SHUTDOWN];
896 15         84 $poe_kernel->call($self->[SF_ALIAS], "ka_shutdown");
897             }
898              
899             sub _ka_shutdown {
900 15     15   1383 my ($self, $kernel, $heap) = @_[OBJECT, KERNEL, HEAP];
901              
902 15 50       93 return if $self->[SF_SHUTDOWN];
903              
904 15         75 $instances--;
905              
906             # Clean out the request queue.
907 15         34 foreach my $request (@{$self->[SF_QUEUE]}) {
  15         85  
908 0         0 $self->_shutdown_request($kernel, $request);
909             }
910 15         47 $self->[SF_QUEUE] = [ ];
911              
912             # Clean out the socket pool.
913 15         42 foreach my $sockets (values %{$self->[SF_POOL]}) {
  15         76  
914 10         367 foreach my $socket (values %$sockets) {
915 10         64 $kernel->alarm_remove($self->[SF_SOCKETS]{$socket}[SK_TIMER]);
916 10         3495 $kernel->select_read($socket, undef);
917             }
918             }
919              
920             # Stop any pending resolver requests.
921 15         1630 foreach my $host (keys %{$heap->{dns_requests}}) {
  15         73  
922 0         0 DEBUG and warn "SHT: Shutting down resolver requests for $host";
923              
924 0         0 foreach my $request (@{$heap->{dns_requests}{$host}}) {
  0         0  
925 0         0 $self->_shutdown_request($kernel, $request);
926             }
927              
928             # Technically not needed since the resolver shutdown should do it.
929             # They all share the same host, so canceling the first should get
930             # them all.
931 0         0 $self->[SF_RESOLVER]->cancel(
932             $heap->{dns_requests}{$host}[0][RQ_RESOLVER_ID]
933             );
934             }
935              
936 15         48 $heap->{dns_requests} = { };
937              
938             # Shut down the resolver.
939 15         39 DEBUG and warn "SHT: Shutting down resolver";
940 15 100       101 if ( $self->[SF_RESOLVER] != $default_resolver ) {
941 1         13 $self->[SF_RESOLVER]->shutdown();
942             }
943 15         13942 $self->[SF_RESOLVER] = undef;
944              
945 15 100 66     203 if ( $default_resolver and !$instances ) {
946 14         1833 $default_resolver->shutdown();
947 14         91516 $default_resolver = undef;
948             }
949              
950             # Finish keepalive's shutdown.
951 15         208 $kernel->alias_remove($self->[SF_ALIAS]);
952 15         2208 $self->[SF_SHUTDOWN] = 1;
953              
954 15         136 return;
955             }
956              
957             sub _shutdown_request {
958 0     0   0 my ($self, $kernel, $request) = @_;
959              
960 0 0       0 if (defined $request->[RQ_TIMER_ID]) {
961 0         0 DEBUG and warn "SHT: Shutting down resolver timer $request->[RQ_TIMER_ID]";
962 0         0 $kernel->alarm_remove($request->[RQ_TIMER_ID]);
963             }
964              
965 0 0       0 if (defined $request->[RQ_WHEEL_ID]) {
966 0         0 DEBUG and warn "SHT: Shutting down resolver wheel $request->[RQ_TIMER_ID]";
967 0         0 delete $self->[SF_WHEELS]{$request->[RQ_WHEEL_ID]};
968              
969             # remove the wheel-to-request index
970 0         0 delete $self->[SF_REQ_INDEX]{$request->[RQ_ID]};
971 0         0 _free_req_id($request->[RQ_ID]);
972             }
973              
974 0 0       0 if (defined $request->[RQ_SESSION]) {
975 0         0 my $session_id = $request->[RQ_SESSION]->ID;
976 0         0 DEBUG and warn "SHT: Releasing session $session_id";
977 0         0 $kernel->refcount_decrement($session_id, "poco-client-keepalive");
978             }
979             }
980              
981             # A socket in the free pool has activity. Read from it and discard
982             # the output. Discard the socket on error or remote closure.
983              
984             sub _ka_socket_activity {
985 1     1   575 my ($self, $kernel, $socket) = @_[OBJECT, KERNEL, ARG0];
986              
987 1         1 if (DEBUG) {
988             my $socket_rec = $self->[SF_SOCKETS]{$socket};
989             my $key = $socket_rec->[SK_KEY];
990             warn "CON: Got activity on socket for $key";
991             }
992              
993             # Any socket activity on a kept-alive socket implies that the socket
994             # is no longer reusable.
995              
996 15     15   24157 use bytes;
  15         75  
  15         88  
997 1   50     174 my $socket_had_data = sysread($socket, my $buf = "", 65536) || 0;
998 1         2 DEBUG and warn "CON: socket had $socket_had_data bytes. 0 means EOF";
999 1         2 DEBUG and warn "CON: Removing socket from the pool";
1000              
1001 1         6 $self->_remove_socket_from_pool($socket);
1002             }
1003              
1004             sub _ka_resolve_request {
1005 33     33   2640 my ($self, $kernel, $heap, $request) = @_[OBJECT, KERNEL, HEAP, ARG0];
1006              
1007 33         77 my $host = $request->[RQ_ADDRESS];
1008              
1009             # Skip DNS resolution if it's already a dotted quad.
1010             # ip_is_ipv4() doesn't require quads, so we count the dots.
1011             #
1012             # TODO - Do the same for IPv6 addresses containing colons?
1013             # TODO - Would require AF_INET6 support around the SocketFactory.
1014 33 100 66     217 if ((($host =~ tr[.][.]) == 3) and ip_is_ipv4($host)) {
1015 6         213 DEBUG_DNS and warn "DNS: $host is a dotted quad; skipping lookup";
1016 6         57 $kernel->call($self->[SF_ALIAS], ka_add_to_queue => $request);
1017 6         45 return;
1018             }
1019              
1020             # It's already pending DNS resolution. Combine this with previous.
1021 27 100       148 if (exists $heap->{dns_requests}{$host}) {
1022 8         22 DEBUG_DNS and warn "DNS: $host is piggybacking on a pending lookup.\n";
1023              
1024             # All requests for the same host share the same resolver ID.
1025             # TODO - Although it should probably be keyed on host:port.
1026 8         35 $request->[RQ_RESOLVER_ID] = $heap->{dns_requests}{$host}[0][RQ_RESOLVER_ID];
1027              
1028 8         20 push @{$heap->{dns_requests}{$host}}, $request;
  8         46  
1029 8         41 return;
1030             }
1031              
1032             # New request. Start lookup.
1033 19         1185 $heap->{dns_requests}{$host} = [ $request ];
1034              
1035 19         271 $request->[RQ_RESOLVER_ID] = $self->[SF_RESOLVER]->resolve(
1036             event => 'ka_dns_response',
1037             host => $host,
1038             service => $request->[RQ_PORT],
1039             hints => { protocol => TCP_PROTO },
1040             );
1041              
1042 19         1540829 DEBUG_DNS and warn "DNS: looking up $host in the background.\n";
1043             }
1044              
1045             sub _ka_dns_response {
1046 19     19   1944516 my ($self, $kernel, $heap, $response_error, $addresses, $request) = @_[
1047             OBJECT, KERNEL, HEAP, ARG0..ARG2
1048             ];
1049              
1050             # We've shut down. Nothing to do here.
1051 19 50       149 return if $self->[SF_SHUTDOWN];
1052              
1053 19         1173 my $request_address = $request->{host};
1054 19         99 my $requests = delete $heap->{dns_requests}{$request_address};
1055              
1056 19         58 DEBUG_DNS and warn "DNS: got response for request address $request_address";
1057              
1058             # Requests on record.
1059 19 50       241 if (defined $requests) {
1060             # We can receive responses for canceled requests. Ignore them: we
1061             # cannot cancel PoCo::Client::DNS requests, so this is how we reap
1062             # them when they're canceled.
1063 19 50       109 if ($requests eq 'cancelled') {
1064 0         0 DEBUG_DNS and warn "DNS: reaping cancelled request for $request_address";
1065 0         0 return;
1066             }
1067 19 50       119 unless (ref $requests eq 'ARRAY') {
1068 0         0 die "DNS: got an unknown requests for $request_address: $requests";
1069             }
1070             }
1071             else {
1072 0         0 die "DNS: Unexpectedly undefined requests for $request_address";
1073             }
1074              
1075             # This is an error. Cancel all requests for the address.
1076             # Tell everybody that their requests failed.
1077 19 100       79 if ($response_error) {
1078 1         2 DEBUG_DNS and warn "DNS: resolver error = $response_error";
1079 1         6 foreach my $request (@$requests) {
1080 1         6 _respond_with_error($request, "resolve", undef, $response_error),
1081             }
1082 1         6 return;
1083             }
1084              
1085 18         42 DEBUG_DNS and warn "DNS: got a response";
1086              
1087             # A response!
1088 18         105 foreach my $address_rec (@$addresses) {
1089 18         203 my $numeric = $self->[SF_RESOLVER]->unpack_addr($address_rec);
1090              
1091 18         1513 DEBUG_DNS and warn "DNS: $request_address resolves to $numeric";
1092              
1093 18         69 foreach my $request (@$requests) {
1094             # Don't bother continuing inactive requests.
1095 26 50       1073 next unless $request->[RQ_ACTIVE];
1096 26         65 $request->[RQ_IP] = $numeric;
1097 26         74 $request->[RQ_ADDR_FAM] = $address_rec->{family};
1098 26         140 $kernel->yield(ka_add_to_queue => $request);
1099             }
1100              
1101             # Return after the first good answer.
1102 18         7891 return;
1103             }
1104              
1105             # Didn't return here. No address record for the host?
1106 0         0 foreach my $request (@$requests) {
1107 0         0 DEBUG_DNS and warn "DNS: $request_address does not resolve";
1108 0         0 _respond_with_error($request, "resolve", undef, "Host has no address."),
1109             }
1110             }
1111              
1112              
1113             sub _ka_add_to_queue {
1114 32     32   7222 my ($self, $kernel, $request) = @_[OBJECT, KERNEL, ARG0];
1115              
1116 32         218 push @{ $self->[SF_QUEUE] }, $request;
  32         97  
1117              
1118             # If the queue has more than one request in it, then it already has
1119             # a wakeup event pending. We don't need to send another one.
1120              
1121 32 100       123 return if @{$self->[SF_QUEUE]} > 1;
  32         169  
1122              
1123             # If the component's allocated socket count is maxed out, then it
1124             # will check the queue when an existing socket is released. We
1125             # don't need to wake it up here.
1126              
1127 22 100       56 return if keys(%{$self->[SF_USED]}) >= $self->[SF_MAX_OPEN];
  22         393  
1128              
1129             # Likewise, we shouldn't awaken the session if there are no
1130             # available slots for the given scheme/address/port triple. "|| 0"
1131             # to avoid an undef error.
1132              
1133 21         61 my $conn_key = $request->[RQ_CONN_KEY];
1134             return if (
1135 21 50 50     273 ($self->[SF_USED_EACH]{$conn_key} || 0) >= $self->[SF_MAX_HOST]
1136             );
1137              
1138             # Wake the session up, and return nothing, signifying sound and fury
1139             # yet to come.
1140 21         38 DEBUG and warn "posting wakeup for $conn_key";
1141 21         134 $poe_kernel->post($self->[SF_ALIAS], "ka_wake_up");
1142 21         3493 return;
1143             }
1144              
1145             # Remove a socket from the free pool, by the socket handle itself.
1146              
1147             sub _remove_socket_from_pool {
1148 7     7   26 my ($self, $socket) = @_;
1149              
1150 7         42 my $socket_rec = delete $self->[SF_SOCKETS]{$socket};
1151 7         47 my $key = $socket_rec->[SK_KEY];
1152              
1153             # Get the blessed version.
1154 7         13 DEBUG and warn "removing socket for $key";
1155 7         37 $socket = delete $self->[SF_POOL]{$key}{$socket};
1156              
1157 7 100       15 unless (keys %{$self->[SF_POOL]{$key}}) {
  7         63  
1158 5         16 delete $self->[SF_POOL]{$key};
1159             }
1160              
1161 7         52 $poe_kernel->alarm_remove($socket_rec->[SK_TIMER]);
1162 7         478 $poe_kernel->select_read($socket, undef);
1163              
1164             # Avoid common FIN_WAIT_2 issues.
1165             # Commented out because fileno() will return true for closed
1166             # sockets, which makes setsockopt() highly unhappy. Also, SO_LINGER
1167             # will cause te socket closure to block, which is less than ideal.
1168             # We need to revisit this another way, or just let sockets enter
1169             # FIN_WAIT_2.
1170              
1171             # if (fileno $socket) {
1172             # setsockopt($socket, SOL_SOCKET, SO_LINGER, pack("sll",1,0,0)) or die(
1173             # "setsockopt: $!"
1174             # );
1175             # }
1176             }
1177              
1178             # Internal function. NOT AN EVENT HANDLER.
1179              
1180             sub _respond_with_error {
1181 5     5   24 my ($request, $func, $num, $string) = @_;
1182 5         75 _respond(
1183             $request,
1184             {
1185             connection => undef,
1186             function => $func,
1187             error_num => $num,
1188             error_str => $string,
1189             }
1190             );
1191             }
1192              
1193             sub _respond {
1194 34     34   76 my ($request, $fields) = @_;
1195              
1196             # Bail out early if the request isn't active.
1197 34 100 66     382 return unless $request->[RQ_ACTIVE] and $request->[RQ_SESSION];
1198              
1199 33         985 $poe_kernel->post(
1200             $request->[RQ_SESSION],
1201             $request->[RQ_EVENT],
1202             {
1203             addr => $request->[RQ_ADDRESS],
1204             context => $request->[RQ_CONTEXT],
1205             port => $request->[RQ_PORT],
1206             scheme => $request->[RQ_SCHEME],
1207             for_addr => $request->[RQ_FOR_ADDRESS],
1208             for_scheme => $request->[RQ_FOR_SCHEME],
1209             for_port => $request->[RQ_FOR_PORT],
1210             %$fields,
1211             }
1212             );
1213              
1214             # Drop the extra refcount.
1215 33         4778 $poe_kernel->refcount_decrement(
1216             $request->[RQ_SESSION]->ID(),
1217             "poco-client-keepalive"
1218             );
1219              
1220             # Remove associated timer.
1221 33 50       1651 if ($request->[RQ_TIMER_ID]) {
1222 33         233 $poe_kernel->alarm_remove($request->[RQ_TIMER_ID]);
1223 33         4735 $request->[RQ_TIMER_ID] = undef;
1224             }
1225              
1226             # Deactivate the request.
1227 33         134 $request->[RQ_ACTIVE] = undef;
1228             }
1229              
1230             1;
1231              
1232             __END__