line |
stmt |
bran |
cond |
sub |
pod |
time |
code |
1
|
|
|
|
|
|
|
package POE::Component::Client::Keepalive; |
2
|
|
|
|
|
|
|
# vim: ts=2 sw=2 expandtab |
3
|
|
|
|
|
|
|
$POE::Component::Client::Keepalive::VERSION = '0.272'; |
4
|
15
|
|
|
15
|
|
3670002
|
use warnings; |
|
15
|
|
|
|
|
41
|
|
|
15
|
|
|
|
|
605
|
|
5
|
15
|
|
|
15
|
|
87
|
use strict; |
|
15
|
|
|
|
|
29
|
|
|
15
|
|
|
|
|
574
|
|
6
|
|
|
|
|
|
|
|
7
|
15
|
|
|
15
|
|
121
|
use Carp qw(croak); |
|
15
|
|
|
|
|
29
|
|
|
15
|
|
|
|
|
1178
|
|
8
|
15
|
|
|
15
|
|
90
|
use Errno qw(ETIMEDOUT EBADF); |
|
15
|
|
|
|
|
33
|
|
|
15
|
|
|
|
|
762
|
|
9
|
15
|
|
|
15
|
|
87
|
use Socket qw(SOL_SOCKET SO_LINGER); |
|
15
|
|
|
|
|
26
|
|
|
15
|
|
|
|
|
1111
|
|
10
|
|
|
|
|
|
|
|
11
|
15
|
|
|
15
|
|
92
|
use POE; |
|
15
|
|
|
|
|
30
|
|
|
15
|
|
|
|
|
101
|
|
12
|
15
|
|
|
15
|
|
24217
|
use POE::Wheel::SocketFactory; |
|
15
|
|
|
|
|
188838
|
|
|
15
|
|
|
|
|
571
|
|
13
|
15
|
|
|
15
|
|
11548
|
use POE::Component::Connection::Keepalive; |
|
15
|
|
|
|
|
86
|
|
|
15
|
|
|
|
|
527
|
|
14
|
15
|
|
|
15
|
|
21357
|
use POE::Component::Resolver; |
|
15
|
|
|
|
|
1043528
|
|
|
15
|
|
|
|
|
1191
|
|
15
|
15
|
|
|
15
|
|
15457
|
use Net::IP::Minimal qw(ip_is_ipv4); |
|
15
|
|
|
|
|
13801
|
|
|
15
|
|
|
|
|
1591
|
|
16
|
|
|
|
|
|
|
|
17
|
|
|
|
|
|
|
my $ssl_available; |
18
|
|
|
|
|
|
|
eval { |
19
|
|
|
|
|
|
|
require POE::Component::SSLify; |
20
|
|
|
|
|
|
|
$ssl_available = 1; |
21
|
|
|
|
|
|
|
}; |
22
|
|
|
|
|
|
|
|
23
|
15
|
|
|
15
|
|
129
|
use constant DEBUG => 0; |
|
15
|
|
|
|
|
32
|
|
|
15
|
|
|
|
|
1599
|
|
24
|
|
|
|
|
|
|
|
25
|
|
|
|
|
|
|
use constant { |
26
|
15
|
|
|
|
|
1460
|
DEBUG_DNS => (DEBUG || 0), |
27
|
|
|
|
|
|
|
DEBUG_DEALLOCATE => (DEBUG || 0), |
28
|
15
|
|
|
15
|
|
91
|
}; |
|
15
|
|
|
|
|
31
|
|
29
|
|
|
|
|
|
|
|
30
|
15
|
|
50
|
|
|
36666
|
use constant TCP_PROTO => scalar(getprotobyname "tcp") || ( |
31
|
|
|
|
|
|
|
die "getprotobyname('tcp') failed: $!" |
32
|
15
|
|
|
15
|
|
86
|
); |
|
15
|
|
|
|
|
30
|
|
33
|
|
|
|
|
|
|
|
34
|
|
|
|
|
|
|
# Manage connection request IDs. |
35
|
|
|
|
|
|
|
|
36
|
|
|
|
|
|
|
my $current_id = 0; |
37
|
|
|
|
|
|
|
my %active_req_ids; |
38
|
|
|
|
|
|
|
|
39
|
|
|
|
|
|
|
sub _allocate_req_id { |
40
|
33
|
|
|
33
|
|
316
|
while (1) { |
41
|
33
|
50
|
|
|
|
170
|
last unless exists $active_req_ids{++$current_id}; |
42
|
|
|
|
|
|
|
} |
43
|
33
|
|
|
|
|
273
|
return $active_req_ids{$current_id} = $current_id; |
44
|
|
|
|
|
|
|
} |
45
|
|
|
|
|
|
|
|
46
|
|
|
|
|
|
|
sub _free_req_id { |
47
|
31
|
|
|
31
|
|
71
|
my $id = shift; |
48
|
31
|
|
|
|
|
98
|
delete $active_req_ids{$id}; |
49
|
|
|
|
|
|
|
} |
50
|
|
|
|
|
|
|
|
51
|
|
|
|
|
|
|
my $default_resolver; |
52
|
|
|
|
|
|
|
my $instances = 0; |
53
|
|
|
|
|
|
|
|
54
|
|
|
|
|
|
|
# The connection manager uses a number of data structures, most of |
55
|
|
|
|
|
|
|
# them arrays. These constants define offsets into those arrays, and |
56
|
|
|
|
|
|
|
# the comments document them. |
57
|
|
|
|
|
|
|
|
58
|
|
|
|
|
|
|
use constant { # @$self = ( |
59
|
15
|
|
|
|
|
3845
|
SF_POOL => 0, # \%socket_pool, |
60
|
|
|
|
|
|
|
SF_QUEUE => 1, # \@request_queue, |
61
|
|
|
|
|
|
|
SF_USED => 2, # \%sockets_in_use, |
62
|
|
|
|
|
|
|
SF_WHEELS => 3, # \%wheels_by_id, |
63
|
|
|
|
|
|
|
SF_USED_EACH => 4, # \%count_by_triple, |
64
|
|
|
|
|
|
|
SF_MAX_OPEN => 5, # $max_open_count, |
65
|
|
|
|
|
|
|
SF_MAX_HOST => 6, # $max_per_host, |
66
|
|
|
|
|
|
|
SF_SOCKETS => 7, # \%socket_xref, |
67
|
|
|
|
|
|
|
SF_KEEPALIVE => 8, # $keep_alive_secs, |
68
|
|
|
|
|
|
|
SF_TIMEOUT => 9, # $default_request_timeout, |
69
|
|
|
|
|
|
|
SF_RESOLVER => 10, # $poco_client_dns_object, |
70
|
|
|
|
|
|
|
SF_SHUTDOWN => 11, # $shutdown_flag, |
71
|
|
|
|
|
|
|
SF_REQ_INDEX => 12, # \%request_id_to_wheel_id, |
72
|
|
|
|
|
|
|
SF_BIND_ADDR => 13, # $bind_address, |
73
|
|
|
|
|
|
|
SF_ALIAS => 14, # $embedded_session_alias |
74
|
15
|
|
|
15
|
|
124
|
}; # ); |
|
15
|
|
|
|
|
31
|
|
75
|
|
|
|
|
|
|
|
76
|
|
|
|
|
|
|
use constant { # $socket_xref{$socket} = [ |
77
|
15
|
|
|
|
|
1378
|
SK_KEY => 0, # $conn_key, |
78
|
|
|
|
|
|
|
SK_TIMER => 1, # $idle_timer, |
79
|
15
|
|
|
15
|
|
178
|
}; # ]; |
|
15
|
|
|
|
|
30
|
|
80
|
|
|
|
|
|
|
|
81
|
|
|
|
|
|
|
# $count_by_triple{$conn_key} = $conn_count; |
82
|
|
|
|
|
|
|
|
83
|
|
|
|
|
|
|
use constant { # $wheels_by_id{$wheel_id} = [ |
84
|
15
|
|
|
|
|
1512
|
WHEEL_WHEEL => 0, # $wheel_object, |
85
|
|
|
|
|
|
|
WHEEL_REQUEST => 1, # $request, |
86
|
15
|
|
|
15
|
|
79
|
}; # ]; |
|
15
|
|
|
|
|
40
|
|
87
|
|
|
|
|
|
|
|
88
|
|
|
|
|
|
|
# $socket_pool{$conn_key}{$socket} = $socket; |
89
|
|
|
|
|
|
|
|
90
|
|
|
|
|
|
|
use constant { # $sockets_in_use{$socket} = ( |
91
|
15
|
|
|
|
|
2531
|
USED_SOCKET => 0, # $socket_handle, |
92
|
|
|
|
|
|
|
USED_TIME => 1, # $allocation_time, |
93
|
|
|
|
|
|
|
USED_KEY => 2, # $conn_key, |
94
|
15
|
|
|
15
|
|
83
|
}; # ); |
|
15
|
|
|
|
|
31
|
|
95
|
|
|
|
|
|
|
|
96
|
|
|
|
|
|
|
# @request_queue = ( |
97
|
|
|
|
|
|
|
# $request, |
98
|
|
|
|
|
|
|
# $request, |
99
|
|
|
|
|
|
|
# .... |
100
|
|
|
|
|
|
|
# ); |
101
|
|
|
|
|
|
|
|
102
|
|
|
|
|
|
|
use constant { # $request = [ |
103
|
15
|
|
|
|
|
73003
|
RQ_SESSION => 0, # $request_session, |
104
|
|
|
|
|
|
|
RQ_EVENT => 1, # $request_event, |
105
|
|
|
|
|
|
|
RQ_SCHEME => 2, # $request_scheme, |
106
|
|
|
|
|
|
|
RQ_ADDRESS => 3, # $request_address, |
107
|
|
|
|
|
|
|
RQ_IP => 4, # $request_ip, |
108
|
|
|
|
|
|
|
RQ_PORT => 5, # $request_port, |
109
|
|
|
|
|
|
|
RQ_CONN_KEY => 6, # $request_connection_key, |
110
|
|
|
|
|
|
|
RQ_CONTEXT => 7, # $request_context, |
111
|
|
|
|
|
|
|
RQ_TIMEOUT => 8, # $request_timeout, |
112
|
|
|
|
|
|
|
RQ_START => 9, # $request_start_time, |
113
|
|
|
|
|
|
|
RQ_TIMER_ID => 10, # $request_timer_id, |
114
|
|
|
|
|
|
|
RQ_WHEEL_ID => 11, # $request_wheel_id, |
115
|
|
|
|
|
|
|
RQ_ACTIVE => 12, # $request_is_active, |
116
|
|
|
|
|
|
|
RQ_ID => 13, # $request_id, |
117
|
|
|
|
|
|
|
RQ_ADDR_FAM => 14, # $request_address_family, |
118
|
|
|
|
|
|
|
RQ_FOR_SCHEME => 15, # $... |
119
|
|
|
|
|
|
|
RQ_FOR_ADDRESS => 16, # $... |
120
|
|
|
|
|
|
|
RQ_FOR_PORT => 17, # $... |
121
|
|
|
|
|
|
|
RQ_RESOLVER_ID => 18, # $resolver_request_id, |
122
|
15
|
|
|
15
|
|
106
|
}; # ]; |
|
15
|
|
|
|
|
42
|
|
123
|
|
|
|
|
|
|
|
124
|
|
|
|
|
|
|
# Create a connection manager. |
125
|
|
|
|
|
|
|
|
126
|
|
|
|
|
|
|
sub new { |
127
|
19
|
|
|
19
|
1
|
19372
|
my $class = shift; |
128
|
19
|
100
|
|
|
|
341
|
croak "new() needs an even number of parameters" if @_ % 2; |
129
|
18
|
|
|
|
|
72
|
my %args = @_; |
130
|
|
|
|
|
|
|
|
131
|
18
|
|
100
|
|
|
198
|
my $max_per_host = delete($args{max_per_host}) || 4; |
132
|
18
|
|
100
|
|
|
215
|
my $max_open = delete($args{max_open}) || 128; |
133
|
18
|
|
100
|
|
|
133
|
my $keep_alive = delete($args{keep_alive}) || 15; |
134
|
18
|
|
100
|
|
|
123
|
my $timeout = delete($args{timeout}) || 120; |
135
|
18
|
|
|
|
|
50
|
my $resolver = delete($args{resolver}); |
136
|
18
|
|
|
|
|
45
|
my $bind_address = delete($args{bind_address}); |
137
|
|
|
|
|
|
|
|
138
|
18
|
|
|
|
|
60
|
my @unknown = sort keys %args; |
139
|
18
|
100
|
|
|
|
64
|
if (@unknown) { |
140
|
1
|
|
|
|
|
150
|
croak "new() doesn't accept: @unknown"; |
141
|
|
|
|
|
|
|
} |
142
|
|
|
|
|
|
|
|
143
|
17
|
|
|
|
|
83
|
my $alias = "POE::Component::Client::Keepalive::" . ++$current_id; |
144
|
|
|
|
|
|
|
|
145
|
17
|
|
|
|
|
152
|
my $self = bless [ |
146
|
|
|
|
|
|
|
{ }, # SF_POOL |
147
|
|
|
|
|
|
|
[ ], # SF_QUEUE |
148
|
|
|
|
|
|
|
{ }, # SF_USED |
149
|
|
|
|
|
|
|
{ }, # SF_WHEELS |
150
|
|
|
|
|
|
|
{ }, # SF_USED_EACH |
151
|
|
|
|
|
|
|
$max_open, # SF_MAX_OPEN |
152
|
|
|
|
|
|
|
$max_per_host, # SF_MAX_HOST |
153
|
|
|
|
|
|
|
{ }, # SF_SOCKETS |
154
|
|
|
|
|
|
|
$keep_alive, # SF_KEEPALIVE |
155
|
|
|
|
|
|
|
$timeout, # SF_TIMEOUT |
156
|
|
|
|
|
|
|
undef, # SF_RESOLVER |
157
|
|
|
|
|
|
|
undef, # SF_SHUTDOWN |
158
|
|
|
|
|
|
|
undef, # SF_REQ_INDEX |
159
|
|
|
|
|
|
|
$bind_address, # SF_BIND_ADDR |
160
|
|
|
|
|
|
|
undef, # SF_ALIAS |
161
|
|
|
|
|
|
|
], $class; |
162
|
|
|
|
|
|
|
|
163
|
|
|
|
|
|
|
$default_resolver = $resolver if ( |
164
|
17
|
50
|
33
|
|
|
134
|
$resolver and eval { $resolver->isa('POE::Component::Resolver') } |
|
17
|
|
|
|
|
191
|
|
165
|
|
|
|
|
|
|
); |
166
|
|
|
|
|
|
|
|
167
|
17
|
|
33
|
|
|
189
|
$self->[SF_RESOLVER] = ( |
168
|
|
|
|
|
|
|
$default_resolver ||= POE::Component::Resolver->new() |
169
|
|
|
|
|
|
|
); |
170
|
|
|
|
|
|
|
|
171
|
17
|
|
|
|
|
432
|
my $session = POE::Session->create( |
172
|
|
|
|
|
|
|
object_states => [ |
173
|
|
|
|
|
|
|
$self => { |
174
|
|
|
|
|
|
|
_start => "_ka_initialize", |
175
|
|
|
|
|
|
|
_stop => "_ka_stopped", |
176
|
|
|
|
|
|
|
ka_add_to_queue => "_ka_add_to_queue", |
177
|
|
|
|
|
|
|
ka_cancel_dns_response => "_ka_cancel_dns_response", |
178
|
|
|
|
|
|
|
ka_conn_failure => "_ka_conn_failure", |
179
|
|
|
|
|
|
|
ka_conn_success => "_ka_conn_success", |
180
|
|
|
|
|
|
|
ka_deallocate => "_ka_deallocate", |
181
|
|
|
|
|
|
|
ka_dns_response => "_ka_dns_response", |
182
|
|
|
|
|
|
|
ka_keepalive_timeout => "_ka_keepalive_timeout", |
183
|
|
|
|
|
|
|
ka_reclaim_socket => "_ka_reclaim_socket", |
184
|
|
|
|
|
|
|
ka_relinquish_socket => "_ka_relinquish_socket", |
185
|
|
|
|
|
|
|
ka_request_timeout => "_ka_request_timeout", |
186
|
|
|
|
|
|
|
ka_resolve_request => "_ka_resolve_request", |
187
|
|
|
|
|
|
|
ka_set_timeout => "_ka_set_timeout", |
188
|
|
|
|
|
|
|
ka_shutdown => "_ka_shutdown", |
189
|
|
|
|
|
|
|
ka_socket_activity => "_ka_socket_activity", |
190
|
|
|
|
|
|
|
ka_wake_up => "_ka_wake_up", |
191
|
|
|
|
|
|
|
}, |
192
|
|
|
|
|
|
|
], |
193
|
|
|
|
|
|
|
); |
194
|
|
|
|
|
|
|
|
195
|
17
|
|
|
|
|
3163
|
$self->[SF_ALIAS] = ref($self) . "::" . $session->ID(); |
196
|
|
|
|
|
|
|
|
197
|
17
|
|
|
|
|
139
|
return $self; |
198
|
|
|
|
|
|
|
} |
199
|
|
|
|
|
|
|
|
200
|
|
|
|
|
|
|
# Initialize the hidden session behind this component. |
201
|
|
|
|
|
|
|
# Rendezvous with the object via a mutually agreed upon alias. |
202
|
|
|
|
|
|
|
|
203
|
|
|
|
|
|
|
sub _ka_initialize { |
204
|
17
|
|
|
17
|
|
7284
|
my ($object, $kernel, $heap) = @_[OBJECT, KERNEL, HEAP]; |
205
|
17
|
|
|
|
|
44
|
$instances++; |
206
|
17
|
|
|
|
|
60
|
$heap->{dns_requests} = { }; |
207
|
17
|
|
|
|
|
108
|
$kernel->alias_set(ref($object) . "::" . $_[SESSION]->ID()); |
208
|
|
|
|
|
|
|
} |
209
|
|
|
|
|
|
|
|
210
|
|
|
|
|
|
|
# When programs crash, the session may stop in a non-shutdown state. |
211
|
|
|
|
|
|
|
# _ka_stopped and DESTROY catch this either way the death occurs. |
212
|
|
|
|
|
|
|
|
213
|
|
|
|
|
|
|
sub _ka_stopped { |
214
|
17
|
|
|
17
|
|
29004970
|
$_[OBJECT][SF_SHUTDOWN] = 1; |
215
|
|
|
|
|
|
|
} |
216
|
|
|
|
|
|
|
|
217
|
|
|
|
|
|
|
sub DESTROY { |
218
|
14
|
|
|
14
|
|
124541
|
$_[0]->shutdown(); |
219
|
|
|
|
|
|
|
} |
220
|
|
|
|
|
|
|
|
221
|
|
|
|
|
|
|
# Request to wake up. This should only happen during the edge |
222
|
|
|
|
|
|
|
# condition where the component's request queue goes from empty to |
223
|
|
|
|
|
|
|
# having one item. |
224
|
|
|
|
|
|
|
# |
225
|
|
|
|
|
|
|
# It also happens during free(), to see if there are more sockets to |
226
|
|
|
|
|
|
|
# deal with. |
227
|
|
|
|
|
|
|
# |
228
|
|
|
|
|
|
|
# TODO - Make the _ka_wake_up stuff smart enough not to post duplicate |
229
|
|
|
|
|
|
|
# messages to the queue. |
230
|
|
|
|
|
|
|
|
231
|
|
|
|
|
|
|
sub _ka_wake_up { |
232
|
55
|
|
|
55
|
|
5183
|
my ($self, $kernel) = @_[OBJECT, KERNEL]; |
233
|
|
|
|
|
|
|
|
234
|
|
|
|
|
|
|
# Scan the list of requests, until we find one that can be met. |
235
|
|
|
|
|
|
|
# Fire off POE::Wheel::SocketFactory to begin the connection |
236
|
|
|
|
|
|
|
# process. |
237
|
|
|
|
|
|
|
|
238
|
55
|
|
|
|
|
110
|
my $request_index = 0; |
239
|
55
|
|
|
|
|
95
|
my $currently_open = keys(%{$self->[SF_USED]}) + keys(%{$self->[SF_SOCKETS]}); |
|
55
|
|
|
|
|
140
|
|
|
55
|
|
|
|
|
196
|
|
240
|
55
|
|
|
|
|
116
|
my @splice_list; |
241
|
|
|
|
|
|
|
|
242
|
55
|
|
|
|
|
298
|
QUEUED: |
243
|
55
|
|
|
|
|
115
|
foreach my $request (@{$self->[SF_QUEUE]}) { |
244
|
36
|
|
|
|
|
66
|
DEBUG and warn "WAKEUP: checking for $request->[RQ_CONN_KEY]"; |
245
|
|
|
|
|
|
|
|
246
|
|
|
|
|
|
|
# Sweep away inactive requests. |
247
|
|
|
|
|
|
|
|
248
|
36
|
100
|
|
|
|
148
|
unless ($request->[RQ_ACTIVE]) { |
249
|
1
|
|
|
|
|
2
|
push @splice_list, $request_index; |
250
|
1
|
|
|
|
|
2
|
next; |
251
|
|
|
|
|
|
|
} |
252
|
|
|
|
|
|
|
|
253
|
|
|
|
|
|
|
# Skip this request if its scheme/address/port triple is maxed |
254
|
|
|
|
|
|
|
# out. |
255
|
|
|
|
|
|
|
|
256
|
35
|
|
|
|
|
107
|
my $req_key = $request->[RQ_CONN_KEY]; |
257
|
|
|
|
|
|
|
next if ( |
258
|
35
|
100
|
100
|
|
|
323
|
($self->[SF_USED_EACH]{$req_key} || 0) >= $self->[SF_MAX_HOST] |
259
|
|
|
|
|
|
|
); |
260
|
|
|
|
|
|
|
|
261
|
|
|
|
|
|
|
# Honor the request from the free pool, if possible. The |
262
|
|
|
|
|
|
|
# currently open socket count does not increase. |
263
|
|
|
|
|
|
|
|
264
|
32
|
|
|
|
|
274
|
my $existing_connection = $self->_check_free_pool($req_key); |
265
|
32
|
100
|
|
|
|
367
|
if ($existing_connection) { |
266
|
5
|
|
|
|
|
15
|
push @splice_list, $request_index; |
267
|
|
|
|
|
|
|
|
268
|
5
|
|
|
|
|
40
|
_respond( |
269
|
|
|
|
|
|
|
$request, { |
270
|
|
|
|
|
|
|
connection => $existing_connection, |
271
|
|
|
|
|
|
|
from_cache => "deferred", |
272
|
|
|
|
|
|
|
} |
273
|
|
|
|
|
|
|
); |
274
|
|
|
|
|
|
|
|
275
|
|
|
|
|
|
|
# Remove the wheel-to-request index. |
276
|
5
|
|
|
|
|
21
|
delete $self->[SF_REQ_INDEX]{$request->[RQ_ID]}; |
277
|
5
|
|
|
|
|
21
|
_free_req_id($request->[RQ_ID]); |
278
|
|
|
|
|
|
|
|
279
|
5
|
|
|
|
|
11
|
next; |
280
|
|
|
|
|
|
|
} |
281
|
|
|
|
|
|
|
|
282
|
|
|
|
|
|
|
# we can't easily take this out of the outer loop since _check_free_pool |
283
|
|
|
|
|
|
|
# can change it from under us |
284
|
27
|
|
|
|
|
61
|
my @free_sockets = keys(%{$self->[SF_SOCKETS]}); |
|
27
|
|
|
|
|
109
|
|
285
|
|
|
|
|
|
|
|
286
|
|
|
|
|
|
|
# Try to free over-committed (but unused) sockets until we're back |
287
|
|
|
|
|
|
|
# under SF_MAX_OPEN sockets. Bail out if we can't free enough. |
288
|
|
|
|
|
|
|
# TODO - Consider removing @free_sockets in least- to |
289
|
|
|
|
|
|
|
# most-recently used order. |
290
|
27
|
|
|
|
|
116
|
while ($currently_open >= $self->[SF_MAX_OPEN]) { |
291
|
4
|
100
|
|
|
|
18
|
last QUEUED unless @free_sockets; |
292
|
3
|
|
|
|
|
220
|
my $next_to_go = $free_sockets[rand(@free_sockets)]; |
293
|
3
|
|
|
|
|
19
|
$self->_remove_socket_from_pool($next_to_go); |
294
|
3
|
|
|
|
|
1101
|
$currently_open--; |
295
|
|
|
|
|
|
|
} |
296
|
|
|
|
|
|
|
|
297
|
|
|
|
|
|
|
# Start the request. Create a wheel to begin the connection. |
298
|
|
|
|
|
|
|
# Move the wheel and its request into SF_WHEELS. |
299
|
26
|
|
|
|
|
46
|
DEBUG and warn "WAKEUP: creating wheel for $req_key"; |
300
|
|
|
|
|
|
|
|
301
|
26
|
|
66
|
|
|
146
|
my $addr = ($request->[RQ_IP] or $request->[RQ_ADDRESS]); |
302
|
26
|
50
|
|
|
|
1204
|
my $wheel = POE::Wheel::SocketFactory->new( |
303
|
|
|
|
|
|
|
( |
304
|
|
|
|
|
|
|
defined($self->[SF_BIND_ADDR]) |
305
|
|
|
|
|
|
|
? (BindAddress => $self->[SF_BIND_ADDR]) |
306
|
|
|
|
|
|
|
: () |
307
|
|
|
|
|
|
|
), |
308
|
|
|
|
|
|
|
RemoteAddress => $addr, |
309
|
|
|
|
|
|
|
RemotePort => $request->[RQ_PORT], |
310
|
|
|
|
|
|
|
SuccessEvent => "ka_conn_success", |
311
|
|
|
|
|
|
|
FailureEvent => "ka_conn_failure", |
312
|
|
|
|
|
|
|
SocketDomain => $request->[RQ_ADDR_FAM], |
313
|
|
|
|
|
|
|
); |
314
|
|
|
|
|
|
|
|
315
|
26
|
|
|
|
|
47659
|
$self->[SF_WHEELS]{$wheel->ID} = [ |
316
|
|
|
|
|
|
|
$wheel, # WHEEL_WHEEL |
317
|
|
|
|
|
|
|
$request, # WHEEL_REQUEST |
318
|
|
|
|
|
|
|
]; |
319
|
|
|
|
|
|
|
|
320
|
|
|
|
|
|
|
# store the wheel's ID in the request object |
321
|
26
|
|
|
|
|
224
|
$request->[RQ_WHEEL_ID] = $wheel->ID; |
322
|
|
|
|
|
|
|
|
323
|
|
|
|
|
|
|
# Count it as used, so we don't over commit file handles. |
324
|
26
|
|
|
|
|
157
|
$currently_open++; |
325
|
26
|
|
|
|
|
103
|
$self->[SF_USED_EACH]{$req_key}++; |
326
|
|
|
|
|
|
|
|
327
|
|
|
|
|
|
|
# Temporarily store the SF_USED record under the wheel ID. It |
328
|
|
|
|
|
|
|
# will be moved to the socket when the wheel responds. |
329
|
26
|
|
|
|
|
171
|
$self->[SF_USED]{$wheel->ID} = [ |
330
|
|
|
|
|
|
|
undef, # USED_SOCKET |
331
|
|
|
|
|
|
|
time(), # USED_TIME |
332
|
|
|
|
|
|
|
$req_key, # USED_KEY |
333
|
|
|
|
|
|
|
]; |
334
|
|
|
|
|
|
|
|
335
|
|
|
|
|
|
|
# Mark the request index as one to splice out. |
336
|
|
|
|
|
|
|
|
337
|
26
|
|
|
|
|
179
|
push @splice_list, $request_index; |
338
|
|
|
|
|
|
|
} |
339
|
|
|
|
|
|
|
continue { |
340
|
35
|
|
|
|
|
154
|
$request_index++; |
341
|
|
|
|
|
|
|
} |
342
|
|
|
|
|
|
|
|
343
|
|
|
|
|
|
|
# The @splice_list is a list of element indices that need to be |
344
|
|
|
|
|
|
|
# spliced out of the request queue. We scan in backwards, from |
345
|
|
|
|
|
|
|
# highest index to lowest, so that each splice does not affect the |
346
|
|
|
|
|
|
|
# indices of the other. |
347
|
|
|
|
|
|
|
# |
348
|
|
|
|
|
|
|
# This removes the request from the queue. It's vastly important |
349
|
|
|
|
|
|
|
# that the request be entered into SF_WHEELS before now. |
350
|
|
|
|
|
|
|
|
351
|
55
|
|
|
|
|
160
|
my $splice_index = @splice_list; |
352
|
55
|
|
|
|
|
309
|
while ($splice_index--) { |
353
|
32
|
|
|
|
|
67
|
splice @{$self->[SF_QUEUE]}, $splice_list[$splice_index], 1; |
|
32
|
|
|
|
|
241
|
|
354
|
|
|
|
|
|
|
} |
355
|
|
|
|
|
|
|
} |
356
|
|
|
|
|
|
|
|
357
|
|
|
|
|
|
|
sub allocate { |
358
|
45
|
|
|
45
|
1
|
2995000
|
my $self = shift; |
359
|
45
|
100
|
|
|
|
436
|
croak "allocate() needs an even number of parameters" if @_ % 2; |
360
|
44
|
|
|
|
|
430
|
my %args = @_; |
361
|
|
|
|
|
|
|
|
362
|
|
|
|
|
|
|
# TODO - Validate arguments. |
363
|
|
|
|
|
|
|
|
364
|
44
|
|
|
|
|
150
|
my $scheme = delete $args{scheme}; |
365
|
44
|
100
|
|
|
|
268
|
croak "allocate() needs a 'scheme'" unless $scheme; |
366
|
43
|
|
|
|
|
99
|
my $address = delete $args{addr}; |
367
|
43
|
100
|
|
|
|
232
|
croak "allocate() needs an 'addr'" unless $address; |
368
|
42
|
|
|
|
|
253
|
my $port = delete $args{port}; |
369
|
42
|
100
|
|
|
|
238
|
croak "allocate() needs a 'port'" unless $port; |
370
|
41
|
|
|
|
|
83
|
my $event = delete $args{event}; |
371
|
41
|
100
|
|
|
|
238
|
croak "allocate() needs an 'event'" unless $event; |
372
|
40
|
|
|
|
|
115
|
my $context = delete $args{context}; |
373
|
40
|
100
|
|
|
|
224
|
croak "allocate() needs a 'context'" unless $context; |
374
|
39
|
|
|
|
|
72
|
my $timeout = delete $args{timeout}; |
375
|
39
|
100
|
|
|
|
147
|
$timeout = $self->[SF_TIMEOUT] unless $timeout; |
376
|
|
|
|
|
|
|
|
377
|
39
|
|
33
|
|
|
288
|
my $for_scheme = delete($args{for_scheme}) || $scheme; |
378
|
39
|
|
33
|
|
|
288
|
my $for_address = delete($args{for_addr}) || $address; |
379
|
39
|
|
33
|
|
|
379
|
my $for_port = delete($args{for_port}) || $port; |
380
|
|
|
|
|
|
|
|
381
|
39
|
50
|
|
|
|
128
|
croak "allocate() on shut-down connection manager" if $self->[SF_SHUTDOWN]; |
382
|
|
|
|
|
|
|
|
383
|
39
|
|
|
|
|
131
|
my @unknown = sort keys %args; |
384
|
39
|
100
|
|
|
|
147
|
if (@unknown) { |
385
|
1
|
|
|
|
|
124
|
croak "allocate() doesn't accept: @unknown"; |
386
|
|
|
|
|
|
|
} |
387
|
|
|
|
|
|
|
|
388
|
38
|
|
|
|
|
211
|
my $conn_key = ( |
389
|
|
|
|
|
|
|
"$scheme $address $port for $for_scheme $for_address $for_port" |
390
|
|
|
|
|
|
|
); |
391
|
|
|
|
|
|
|
|
392
|
|
|
|
|
|
|
# If we have a connection pool for the scheme/address/port triple, |
393
|
|
|
|
|
|
|
# then we can maybe post an available connection right away. |
394
|
|
|
|
|
|
|
|
395
|
38
|
|
|
|
|
252
|
my $existing_connection = $self->_check_free_pool($conn_key); |
396
|
38
|
100
|
|
|
|
121
|
if (defined $existing_connection) { |
397
|
5
|
|
|
|
|
34
|
$poe_kernel->post( |
398
|
|
|
|
|
|
|
$poe_kernel->get_active_session, |
399
|
|
|
|
|
|
|
$event => { |
400
|
|
|
|
|
|
|
addr => $address, |
401
|
|
|
|
|
|
|
context => $context, |
402
|
|
|
|
|
|
|
port => $port, |
403
|
|
|
|
|
|
|
scheme => $scheme, |
404
|
|
|
|
|
|
|
connection => $existing_connection, |
405
|
|
|
|
|
|
|
from_cache => "immediate", |
406
|
|
|
|
|
|
|
} |
407
|
|
|
|
|
|
|
); |
408
|
5
|
|
|
|
|
502
|
return; |
409
|
|
|
|
|
|
|
} |
410
|
|
|
|
|
|
|
|
411
|
|
|
|
|
|
|
# We can't honor the request immediately, so it's put into a queue. |
412
|
33
|
|
|
|
|
51
|
DEBUG and warn "ALLOCATE: enqueuing request for $conn_key"; |
413
|
|
|
|
|
|
|
|
414
|
33
|
|
|
|
|
210
|
my $request = [ |
415
|
|
|
|
|
|
|
$poe_kernel->get_active_session(), # RQ_SESSION |
416
|
|
|
|
|
|
|
$event, # RQ_EVENT |
417
|
|
|
|
|
|
|
$scheme, # RQ_SCHEME |
418
|
|
|
|
|
|
|
$address, # RQ_ADDRESS |
419
|
|
|
|
|
|
|
undef, # RQ_IP |
420
|
|
|
|
|
|
|
$port, # RQ_PORT |
421
|
|
|
|
|
|
|
$conn_key, # RQ_CONN_KEY |
422
|
|
|
|
|
|
|
$context, # RQ_CONTEXT |
423
|
|
|
|
|
|
|
$timeout, # RQ_TIMEOUT |
424
|
|
|
|
|
|
|
time(), # RQ_START |
425
|
|
|
|
|
|
|
undef, # RQ_TIMER_ID |
426
|
|
|
|
|
|
|
undef, # RQ_WHEEL_ID |
427
|
|
|
|
|
|
|
1, # RQ_ACTIVE |
428
|
|
|
|
|
|
|
_allocate_req_id(), # RQ_ID |
429
|
|
|
|
|
|
|
undef, # RQ_ADDR_FAM |
430
|
|
|
|
|
|
|
$for_scheme, # RQ_FOR_SCHEME |
431
|
|
|
|
|
|
|
$for_address, # RQ_FOR_ADDRESS |
432
|
|
|
|
|
|
|
$for_port, # RQ_FOR_PORT |
433
|
|
|
|
|
|
|
undef, # RQ_RESOLVER_ID |
434
|
|
|
|
|
|
|
]; |
435
|
|
|
|
|
|
|
|
436
|
33
|
|
|
|
|
130
|
$self->[SF_REQ_INDEX]{$request->[RQ_ID]} = $request; |
437
|
|
|
|
|
|
|
|
438
|
33
|
|
|
|
|
161
|
$poe_kernel->refcount_increment( |
439
|
|
|
|
|
|
|
$request->[RQ_SESSION]->ID(), |
440
|
|
|
|
|
|
|
"poco-client-keepalive" |
441
|
|
|
|
|
|
|
); |
442
|
|
|
|
|
|
|
|
443
|
33
|
|
|
|
|
1609
|
$poe_kernel->call($self->[SF_ALIAS], ka_set_timeout => $request); |
444
|
33
|
|
|
|
|
3585
|
$poe_kernel->call($self->[SF_ALIAS], ka_resolve_request => $request); |
445
|
|
|
|
|
|
|
|
446
|
33
|
|
|
|
|
1842
|
return $request->[RQ_ID]; |
447
|
|
|
|
|
|
|
} |
448
|
|
|
|
|
|
|
|
449
|
|
|
|
|
|
|
sub deallocate { |
450
|
0
|
|
|
0
|
1
|
0
|
my ($self, $req_id) = @_; |
451
|
|
|
|
|
|
|
|
452
|
0
|
0
|
0
|
|
|
0
|
croak "deallocate() requires a request ID" unless( |
453
|
|
|
|
|
|
|
defined($req_id) and exists($active_req_ids{$req_id}) |
454
|
|
|
|
|
|
|
); |
455
|
|
|
|
|
|
|
|
456
|
0
|
|
|
|
|
0
|
my $request = delete $self->[SF_REQ_INDEX]{$req_id}; |
457
|
0
|
0
|
|
|
|
0
|
unless (defined $request) { |
458
|
0
|
|
|
|
|
0
|
DEBUG_DEALLOCATE and warn "deallocate could not find request $req_id"; |
459
|
0
|
|
|
|
|
0
|
return; |
460
|
|
|
|
|
|
|
} |
461
|
0
|
|
|
|
|
0
|
_free_req_id($request->[RQ_ID]); |
462
|
|
|
|
|
|
|
|
463
|
|
|
|
|
|
|
# Now pass the vetted request & its ID into our manager session. |
464
|
0
|
|
|
|
|
0
|
$poe_kernel->call($self->[SF_ALIAS], "ka_deallocate", $request, $req_id); |
465
|
|
|
|
|
|
|
} |
466
|
|
|
|
|
|
|
|
467
|
|
|
|
|
|
|
sub _ka_deallocate { |
468
|
0
|
|
|
0
|
|
0
|
my ($self, $heap, $request, $req_id) = @_[OBJECT, HEAP, ARG0, ARG1]; |
469
|
|
|
|
|
|
|
|
470
|
0
|
|
|
|
|
0
|
my $conn_key = $request->[RQ_CONN_KEY]; |
471
|
0
|
|
|
|
|
0
|
my $existing_connection = $self->_check_free_pool($conn_key); |
472
|
|
|
|
|
|
|
|
473
|
|
|
|
|
|
|
# Existing connection. Remove it from the pool, and delete the socket. |
474
|
0
|
0
|
|
|
|
0
|
if (defined $existing_connection) { |
475
|
0
|
|
|
|
|
0
|
$self->_remove_socket_from_pool($existing_connection->{socket}); |
476
|
0
|
|
|
|
|
0
|
DEBUG_DEALLOCATE and warn( |
477
|
|
|
|
|
|
|
"deallocate called, deleted already-connected socket" |
478
|
|
|
|
|
|
|
); |
479
|
0
|
|
|
|
|
0
|
return; |
480
|
|
|
|
|
|
|
} |
481
|
|
|
|
|
|
|
|
482
|
|
|
|
|
|
|
# No connection yet. Cancel the request. |
483
|
0
|
|
|
|
|
0
|
DEBUG_DEALLOCATE and warn( |
484
|
|
|
|
|
|
|
"deallocate called without an existing connection. ", |
485
|
|
|
|
|
|
|
"cancelling connection request" |
486
|
|
|
|
|
|
|
); |
487
|
|
|
|
|
|
|
|
488
|
0
|
0
|
|
|
|
0
|
unless (exists $heap->{dns_requests}{$request->[RQ_ADDRESS]}) { |
489
|
0
|
|
|
|
|
0
|
DEBUG_DEALLOCATE and warn( |
490
|
|
|
|
|
|
|
"deallocate cannot cancel dns -- no pending request" |
491
|
|
|
|
|
|
|
); |
492
|
0
|
|
|
|
|
0
|
return; |
493
|
|
|
|
|
|
|
} |
494
|
|
|
|
|
|
|
|
495
|
0
|
|
|
|
|
0
|
$poe_kernel->call( $self->[SF_ALIAS], ka_cancel_dns_response => $request ); |
496
|
0
|
|
|
|
|
0
|
return; |
497
|
|
|
|
|
|
|
} |
498
|
|
|
|
|
|
|
|
499
|
|
|
|
|
|
|
sub _ka_cancel_dns_response { |
500
|
0
|
|
|
0
|
|
0
|
my ($self, $kernel, $heap, $request) = @_[OBJECT, KERNEL, HEAP, ARG0]; |
501
|
|
|
|
|
|
|
|
502
|
0
|
|
|
|
|
0
|
my $address = $request->[RQ_ADDRESS]; |
503
|
0
|
|
|
|
|
0
|
DEBUG_DNS and warn "DNS: canceling request for $address\n"; |
504
|
|
|
|
|
|
|
|
505
|
0
|
|
|
|
|
0
|
my $requests = $heap->{dns_requests}{$address}; |
506
|
|
|
|
|
|
|
|
507
|
|
|
|
|
|
|
# Remove the resolver request for the address of this connection |
508
|
|
|
|
|
|
|
# request |
509
|
|
|
|
|
|
|
|
510
|
0
|
|
|
|
|
0
|
my $req_index = @$requests; |
511
|
0
|
|
|
|
|
0
|
while ($req_index--) { |
512
|
0
|
0
|
|
|
|
0
|
next unless $requests->[$req_index] == $request; |
513
|
0
|
|
|
|
|
0
|
splice(@$requests, $req_index, 1); |
514
|
0
|
|
|
|
|
0
|
last; |
515
|
|
|
|
|
|
|
} |
516
|
|
|
|
|
|
|
|
517
|
|
|
|
|
|
|
# Clean up the structure for the address if there are no more |
518
|
|
|
|
|
|
|
# requests to resolve that address. |
519
|
|
|
|
|
|
|
|
520
|
0
|
0
|
|
|
|
0
|
unless (@$requests) { |
521
|
0
|
|
|
|
|
0
|
DEBUG_DNS and warn "DNS: canceled all requests for $address"; |
522
|
0
|
|
|
|
|
0
|
$self->[SF_RESOLVER]->cancel( $request->[RQ_RESOLVER_ID] ); |
523
|
0
|
|
|
|
|
0
|
delete $heap->{dns_requests}{$address}; |
524
|
|
|
|
|
|
|
} |
525
|
|
|
|
|
|
|
|
526
|
|
|
|
|
|
|
# cancel our attempt to connect |
527
|
0
|
|
|
|
|
0
|
$poe_kernel->alarm_remove( $request->[RQ_TIMER_ID] ); |
528
|
0
|
|
|
|
|
0
|
$poe_kernel->refcount_decrement( |
529
|
|
|
|
|
|
|
$request->[RQ_SESSION]->ID(), "poco-client-keepalive" |
530
|
|
|
|
|
|
|
); |
531
|
|
|
|
|
|
|
} |
532
|
|
|
|
|
|
|
|
533
|
|
|
|
|
|
|
# Set the request's timeout, in the component's context. |
534
|
|
|
|
|
|
|
|
535
|
|
|
|
|
|
|
sub _ka_set_timeout { |
536
|
33
|
|
|
33
|
|
3367
|
my ($kernel, $request) = @_[KERNEL, ARG0]; |
537
|
33
|
|
|
|
|
344
|
$request->[RQ_TIMER_ID] = $kernel->delay_set( |
538
|
|
|
|
|
|
|
ka_request_timeout => $request->[RQ_TIMEOUT], $request |
539
|
|
|
|
|
|
|
); |
540
|
|
|
|
|
|
|
} |
541
|
|
|
|
|
|
|
|
542
|
|
|
|
|
|
|
# The request has timed out. Mark it as defunct, and respond with an |
543
|
|
|
|
|
|
|
# ETIMEDOUT error. |
544
|
|
|
|
|
|
|
|
545
|
|
|
|
|
|
|
sub _ka_request_timeout { |
546
|
2
|
|
|
2
|
|
1080
|
my ($self, $kernel, $request) = @_[OBJECT, KERNEL, ARG0]; |
547
|
|
|
|
|
|
|
|
548
|
2
|
|
|
|
|
3
|
DEBUG and warn( |
549
|
|
|
|
|
|
|
"CON: request from session ", $request->[RQ_SESSION]->ID, |
550
|
|
|
|
|
|
|
" for address ", $request->[RQ_ADDRESS], " timed out" |
551
|
|
|
|
|
|
|
); |
552
|
2
|
|
|
|
|
8
|
$! = ETIMEDOUT; |
553
|
|
|
|
|
|
|
|
554
|
|
|
|
|
|
|
# The easiest way to do this? Simulate an error from the wheel |
555
|
|
|
|
|
|
|
# itself. |
556
|
|
|
|
|
|
|
|
557
|
2
|
50
|
|
|
|
19
|
if (defined $request->[RQ_WHEEL_ID]) { |
558
|
0
|
|
|
|
|
0
|
@_[ARG0..ARG3] = ("connect", $!+0, "$!", $request->[RQ_WHEEL_ID]); |
559
|
0
|
|
|
|
|
0
|
goto &_ka_conn_failure; |
560
|
|
|
|
|
|
|
} |
561
|
|
|
|
|
|
|
|
562
|
2
|
|
|
|
|
42
|
my ($errnum, $errstr) = ($!+0, "$!"); |
563
|
|
|
|
|
|
|
|
564
|
|
|
|
|
|
|
# No wheel yet. It must have timed out in connect. |
565
|
2
|
100
|
|
|
|
18
|
if ($request->[RQ_RESOLVER_ID]) { |
566
|
1
|
|
|
|
|
12
|
$self->[SF_RESOLVER]->cancel( $request->[RQ_RESOLVER_ID] ); |
567
|
1
|
|
|
|
|
17657
|
$request->[RQ_RESOLVER_ID] = undef; |
568
|
|
|
|
|
|
|
} |
569
|
|
|
|
|
|
|
|
570
|
|
|
|
|
|
|
# But what if there is no wheel? |
571
|
2
|
|
|
|
|
55
|
_respond_with_error($request, "connect", $errnum, $errstr), |
572
|
|
|
|
|
|
|
} |
573
|
|
|
|
|
|
|
|
574
|
|
|
|
|
|
|
# Connection failed. Remove the SF_WHEELS record corresponding to the |
575
|
|
|
|
|
|
|
# request. Remove the SF_USED placeholder record so it won't count |
576
|
|
|
|
|
|
|
# anymore. Send a failure notice to the requester. |
577
|
|
|
|
|
|
|
|
578
|
|
|
|
|
|
|
sub _ka_conn_failure { |
579
|
2
|
|
|
2
|
|
1158
|
my ($self, $func, $errnum, $errstr, $wheel_id) = @_[OBJECT, ARG0..ARG3]; |
580
|
|
|
|
|
|
|
|
581
|
2
|
|
|
|
|
4
|
DEBUG and warn "CON: sending $errstr for function $func"; |
582
|
|
|
|
|
|
|
# Remove the SF_WHEELS record. |
583
|
2
|
|
|
|
|
8
|
my $wheel_rec = delete $self->[SF_WHEELS]{$wheel_id}; |
584
|
2
|
|
|
|
|
5
|
my $request = $wheel_rec->[WHEEL_REQUEST]; |
585
|
|
|
|
|
|
|
|
586
|
|
|
|
|
|
|
# Remove the SF_USED placeholder. |
587
|
2
|
|
|
|
|
7
|
delete $self->[SF_USED]{$wheel_id}; |
588
|
|
|
|
|
|
|
|
589
|
|
|
|
|
|
|
# remove the wheel-to-request index |
590
|
2
|
|
|
|
|
5
|
delete $self->[SF_REQ_INDEX]{$request->[RQ_ID]}; |
591
|
2
|
|
|
|
|
9
|
_free_req_id($request->[RQ_ID]); |
592
|
|
|
|
|
|
|
|
593
|
|
|
|
|
|
|
# Discount the use by request key, removing the SF_USED record |
594
|
|
|
|
|
|
|
# entirely if it's now moot. |
595
|
2
|
|
|
|
|
3
|
my $request_key = $request->[RQ_CONN_KEY]; |
596
|
2
|
|
|
|
|
9
|
$self->_decrement_used_each($request_key); |
597
|
|
|
|
|
|
|
|
598
|
|
|
|
|
|
|
# Tell the requester about the failure. |
599
|
2
|
|
|
|
|
9
|
_respond_with_error($request, $func, $errnum, $errstr), |
600
|
|
|
|
|
|
|
|
601
|
|
|
|
|
|
|
$self->_ka_wake_up($_[KERNEL]); |
602
|
|
|
|
|
|
|
} |
603
|
|
|
|
|
|
|
|
604
|
|
|
|
|
|
|
# Connection succeeded. Remove the SF_WHEELS record corresponding to |
605
|
|
|
|
|
|
|
# the request. Flesh out the placeholder SF_USED record so it counts. |
606
|
|
|
|
|
|
|
|
607
|
|
|
|
|
|
|
sub _ka_conn_success { |
608
|
24
|
|
|
24
|
|
30566
|
my ($self, $socket, $wheel_id) = @_[OBJECT, ARG0, ARG3]; |
609
|
|
|
|
|
|
|
|
610
|
|
|
|
|
|
|
# Remove the SF_WHEELS record. |
611
|
24
|
|
|
|
|
89
|
my $wheel_rec = delete $self->[SF_WHEELS]{$wheel_id}; |
612
|
24
|
|
|
|
|
237
|
my $request = $wheel_rec->[WHEEL_REQUEST]; |
613
|
|
|
|
|
|
|
|
614
|
|
|
|
|
|
|
# remove the wheel-to-request index |
615
|
24
|
|
|
|
|
97
|
delete $self->[SF_REQ_INDEX]{$request->[RQ_ID]}; |
616
|
24
|
|
|
|
|
140
|
_free_req_id($request->[RQ_ID]); |
617
|
|
|
|
|
|
|
|
618
|
|
|
|
|
|
|
# Remove the SF_USED placeholder, add in the socket, and store it |
619
|
|
|
|
|
|
|
# properly. |
620
|
24
|
|
|
|
|
67
|
my $used = delete $self->[SF_USED]{$wheel_id}; |
621
|
|
|
|
|
|
|
|
622
|
24
|
50
|
|
|
|
137
|
unless ($request->[RQ_SCHEME] eq 'https') { |
623
|
24
|
|
|
|
|
452
|
$self->_store_socket($used, $socket); |
624
|
24
|
|
|
|
|
107
|
$self->_send_back_socket($request, $socket); |
625
|
24
|
|
|
|
|
326
|
return; |
626
|
|
|
|
|
|
|
} |
627
|
|
|
|
|
|
|
|
628
|
|
|
|
|
|
|
# HTTPS here. |
629
|
|
|
|
|
|
|
# Really applies to all SSL schemes. |
630
|
|
|
|
|
|
|
|
631
|
0
|
0
|
|
|
|
0
|
unless ($ssl_available) { |
632
|
0
|
|
|
|
|
0
|
die "There is no SSL support, please install POE::Component::SSLify"; |
633
|
|
|
|
|
|
|
} |
634
|
|
|
|
|
|
|
|
635
|
0
|
|
|
|
|
0
|
eval { |
636
|
0
|
|
|
|
|
0
|
$socket = POE::Component::SSLify::Client_SSLify( |
637
|
|
|
|
|
|
|
$socket, |
638
|
|
|
|
|
|
|
|
639
|
|
|
|
|
|
|
# TODO - To make non-blocking sslify work, I need to somehow |
640
|
|
|
|
|
|
|
# defer the response until the following callback says it's |
641
|
|
|
|
|
|
|
# fine. Or if the callback says there's an error, it needs to |
642
|
|
|
|
|
|
|
# be propagated out. |
643
|
|
|
|
|
|
|
# |
644
|
|
|
|
|
|
|
# Problem is, just setting the callback doesn't seem to get the |
645
|
|
|
|
|
|
|
# connection to complete (successfully or otherwise). There |
646
|
|
|
|
|
|
|
# needs to be something more going on... but what? |
647
|
|
|
|
|
|
|
|
648
|
|
|
|
|
|
|
# sub { |
649
|
|
|
|
|
|
|
# my ($socket, $status, $errval) = @_; |
650
|
|
|
|
|
|
|
# $errval = 'undef' unless defined $errval; |
651
|
|
|
|
|
|
|
# |
652
|
|
|
|
|
|
|
# warn "socket($socket) status($status) errval($errval)"; |
653
|
|
|
|
|
|
|
# |
654
|
|
|
|
|
|
|
# # Connected okay. |
655
|
|
|
|
|
|
|
# if ($status == 1) { |
656
|
|
|
|
|
|
|
# $self->_send_back_socket($request, $socket); |
657
|
|
|
|
|
|
|
# $self = $request = undef; |
658
|
|
|
|
|
|
|
# return; |
659
|
|
|
|
|
|
|
# } |
660
|
|
|
|
|
|
|
# |
661
|
|
|
|
|
|
|
# # Didn't connect okay, or hasn't so far. |
662
|
|
|
|
|
|
|
# # Report the error. |
663
|
|
|
|
|
|
|
# if ($errval == 1) { |
664
|
|
|
|
|
|
|
# |
665
|
|
|
|
|
|
|
# # Get all known errors, but only retain the most recent one. |
666
|
|
|
|
|
|
|
# # I'm not sure this is needed, but the API mentions an error |
667
|
|
|
|
|
|
|
# # queue, which implies that it could contain stale errors. |
668
|
|
|
|
|
|
|
# |
669
|
|
|
|
|
|
|
# my $errnum; |
670
|
|
|
|
|
|
|
# while (my $new_errnum = Net::SSLeay::ERR_get_error()) { |
671
|
|
|
|
|
|
|
# $errnum = $new_errnum; |
672
|
|
|
|
|
|
|
# } |
673
|
|
|
|
|
|
|
# |
674
|
|
|
|
|
|
|
# my $errstr = Net::SSLeay::ERR_error_string($errnum); |
675
|
|
|
|
|
|
|
# warn " ssl_error($errnum) string($errstr)"; |
676
|
|
|
|
|
|
|
# _respond_with_error($request, "sslify", undef, $errstr); |
677
|
|
|
|
|
|
|
# |
678
|
|
|
|
|
|
|
# # TODO - May the circle be broken. |
679
|
|
|
|
|
|
|
# $self = $request = undef; |
680
|
|
|
|
|
|
|
# return; |
681
|
|
|
|
|
|
|
# } |
682
|
|
|
|
|
|
|
# } |
683
|
|
|
|
|
|
|
); |
684
|
|
|
|
|
|
|
}; |
685
|
|
|
|
|
|
|
|
686
|
0
|
0
|
|
|
|
0
|
if ($@) { |
687
|
0
|
|
|
|
|
0
|
_respond_with_error($request, "sslify", undef, "$@"); |
688
|
0
|
|
|
|
|
0
|
return; |
689
|
|
|
|
|
|
|
} |
690
|
|
|
|
|
|
|
|
691
|
|
|
|
|
|
|
# TODO - I think for SSL we just need to _store_socket(). The call |
692
|
|
|
|
|
|
|
# to _send_back_socket() should be inside the SSL callback. |
693
|
|
|
|
|
|
|
# |
694
|
|
|
|
|
|
|
# Also, I think the callback might leak. $request and $self may |
695
|
|
|
|
|
|
|
# need to be weakened. |
696
|
|
|
|
|
|
|
|
697
|
0
|
|
|
|
|
0
|
$self->_store_socket($used, $socket); |
698
|
0
|
|
|
|
|
0
|
$self->_send_back_socket($request, $socket); |
699
|
|
|
|
|
|
|
} |
700
|
|
|
|
|
|
|
|
701
|
|
|
|
|
|
|
sub _store_socket { |
702
|
24
|
|
|
24
|
|
52
|
my ($self, $used, $socket) = @_; |
703
|
24
|
|
|
|
|
62
|
$used->[USED_SOCKET] = $socket; |
704
|
24
|
|
|
|
|
134
|
$self->[SF_USED]{$socket} = $used; |
705
|
|
|
|
|
|
|
} |
706
|
|
|
|
|
|
|
|
707
|
|
|
|
|
|
|
sub _send_back_socket { |
708
|
24
|
|
|
24
|
|
66
|
my ($self, $request, $socket) = @_; |
709
|
|
|
|
|
|
|
|
710
|
24
|
|
|
|
|
44
|
DEBUG and warn( |
711
|
|
|
|
|
|
|
"CON: posting... to $request->[RQ_SESSION] . $request->[RQ_EVENT]" |
712
|
|
|
|
|
|
|
); |
713
|
|
|
|
|
|
|
|
714
|
|
|
|
|
|
|
# Build a connection object around the socket. |
715
|
24
|
|
|
|
|
424
|
my $connection = POE::Component::Connection::Keepalive->new( |
716
|
|
|
|
|
|
|
socket => $socket, |
717
|
|
|
|
|
|
|
manager => $self, |
718
|
|
|
|
|
|
|
); |
719
|
|
|
|
|
|
|
|
720
|
|
|
|
|
|
|
# Give the socket to the requester. |
721
|
24
|
|
|
|
|
182
|
_respond( |
722
|
|
|
|
|
|
|
$request, { |
723
|
|
|
|
|
|
|
connection => $connection, |
724
|
|
|
|
|
|
|
} |
725
|
|
|
|
|
|
|
); |
726
|
|
|
|
|
|
|
} |
727
|
|
|
|
|
|
|
|
728
|
|
|
|
|
|
|
# The user is done with a socket. Make it available for reuse. |
729
|
|
|
|
|
|
|
|
730
|
|
|
|
|
|
|
sub free { |
731
|
36
|
|
|
36
|
1
|
2438
|
my ($self, $socket) = @_; |
732
|
|
|
|
|
|
|
|
733
|
36
|
100
|
|
|
|
179
|
return if $self->[SF_SHUTDOWN]; |
734
|
34
|
|
|
|
|
59
|
DEBUG and warn "FREE: freeing socket"; |
735
|
|
|
|
|
|
|
|
736
|
|
|
|
|
|
|
# Remove the accompanying SF_USED record. |
737
|
34
|
100
|
|
|
|
516
|
croak "can't free() undefined socket" unless defined $socket; |
738
|
33
|
|
|
|
|
141
|
my $used = delete $self->[SF_USED]{$socket}; |
739
|
33
|
100
|
|
|
|
218
|
croak "can't free() unallocated socket" unless defined $used; |
740
|
|
|
|
|
|
|
|
741
|
|
|
|
|
|
|
# Reclaim the socket. |
742
|
32
|
|
|
|
|
1457
|
$poe_kernel->call($self->[SF_ALIAS], "ka_reclaim_socket", $used); |
743
|
|
|
|
|
|
|
|
744
|
|
|
|
|
|
|
# Avoid returning things by mistake. |
745
|
32
|
|
|
|
|
918
|
return; |
746
|
|
|
|
|
|
|
} |
747
|
|
|
|
|
|
|
|
748
|
|
|
|
|
|
|
# A sink for deliberately unhandled events. |
749
|
|
|
|
|
|
|
|
750
|
0
|
|
|
0
|
|
0
|
sub _ka_ignore_this_event { |
751
|
|
|
|
|
|
|
# Do nothing. |
752
|
|
|
|
|
|
|
} |
753
|
|
|
|
|
|
|
|
754
|
|
|
|
|
|
|
# An internal method to fetch a socket from the free pool, if one |
755
|
|
|
|
|
|
|
# exists. |
756
|
|
|
|
|
|
|
|
757
|
|
|
|
|
|
|
sub _check_free_pool { |
758
|
70
|
|
|
70
|
|
197
|
my ($self, $conn_key) = @_; |
759
|
|
|
|
|
|
|
|
760
|
70
|
100
|
|
|
|
1168
|
return unless exists $self->[SF_POOL]{$conn_key}; |
761
|
|
|
|
|
|
|
|
762
|
10
|
|
|
|
|
26
|
my $free = $self->[SF_POOL]{$conn_key}; |
763
|
|
|
|
|
|
|
|
764
|
10
|
|
|
|
|
17
|
DEBUG and warn "CHECK: reusing $conn_key"; |
765
|
|
|
|
|
|
|
|
766
|
10
|
|
|
|
|
40
|
my $next_socket = (values %$free)[0]; |
767
|
10
|
|
|
|
|
35
|
delete $free->{$next_socket}; |
768
|
10
|
100
|
|
|
|
49
|
unless (keys %$free) { |
769
|
8
|
|
|
|
|
22
|
delete $self->[SF_POOL]{$conn_key}; |
770
|
|
|
|
|
|
|
} |
771
|
|
|
|
|
|
|
|
772
|
|
|
|
|
|
|
# _check_free_pool() may be operating in another session, so we call |
773
|
|
|
|
|
|
|
# the correct one here. |
774
|
10
|
|
|
|
|
48
|
$poe_kernel->call($self->[SF_ALIAS], "ka_relinquish_socket", $next_socket); |
775
|
|
|
|
|
|
|
|
776
|
10
|
|
|
|
|
14039
|
$self->[SF_USED]{$next_socket} = [ |
777
|
|
|
|
|
|
|
$next_socket, # USED_SOCKET |
778
|
|
|
|
|
|
|
time(), # USED_TIME |
779
|
|
|
|
|
|
|
$conn_key, # USED_KEY |
780
|
|
|
|
|
|
|
]; |
781
|
|
|
|
|
|
|
|
782
|
10
|
|
|
|
|
41
|
delete $self->[SF_SOCKETS]{$next_socket}; |
783
|
|
|
|
|
|
|
|
784
|
10
|
|
|
|
|
41
|
$self->[SF_USED_EACH]{$conn_key}++; |
785
|
|
|
|
|
|
|
|
786
|
|
|
|
|
|
|
# Build a connection object around the socket. |
787
|
10
|
|
|
|
|
314
|
my $connection = POE::Component::Connection::Keepalive->new( |
788
|
|
|
|
|
|
|
socket => $next_socket, |
789
|
|
|
|
|
|
|
manager => $self, |
790
|
|
|
|
|
|
|
); |
791
|
|
|
|
|
|
|
|
792
|
10
|
|
|
|
|
36
|
return $connection; |
793
|
|
|
|
|
|
|
} |
794
|
|
|
|
|
|
|
|
795
|
|
|
|
|
|
|
sub _decrement_used_each { |
796
|
34
|
|
|
34
|
|
320
|
my ($self, $request_key) = @_; |
797
|
34
|
100
|
|
|
|
272
|
unless (--$self->[SF_USED_EACH]{$request_key}) { |
798
|
29
|
|
|
|
|
107
|
delete $self->[SF_USED_EACH]{$request_key}; |
799
|
|
|
|
|
|
|
} |
800
|
|
|
|
|
|
|
} |
801
|
|
|
|
|
|
|
|
802
|
|
|
|
|
|
|
# Reclaim a socket. Put it in the free socket pool, and wrap it with |
803
|
|
|
|
|
|
|
# select_read() to discard any data and detect when it's closed. |
804
|
|
|
|
|
|
|
|
805
|
|
|
|
|
|
|
sub _ka_reclaim_socket { |
806
|
32
|
|
|
32
|
|
10133
|
my ($self, $kernel, $used) = @_[OBJECT, KERNEL, ARG0]; |
807
|
|
|
|
|
|
|
|
808
|
32
|
|
|
|
|
98
|
my $socket = $used->[USED_SOCKET]; |
809
|
|
|
|
|
|
|
|
810
|
|
|
|
|
|
|
# Decrement the usage counter for the given connection key. |
811
|
32
|
|
|
|
|
79
|
my $request_key = $used->[USED_KEY]; |
812
|
32
|
|
|
|
|
322
|
$self->_decrement_used_each($request_key); |
813
|
|
|
|
|
|
|
|
814
|
|
|
|
|
|
|
# Socket is closed. We can't reuse it. |
815
|
32
|
100
|
|
|
|
145
|
unless (defined fileno $socket) { |
816
|
2
|
|
|
|
|
3
|
DEBUG and warn "RECLAIM: freed socket has previously been closed"; |
817
|
2
|
|
|
|
|
81
|
goto &_ka_wake_up; |
818
|
|
|
|
|
|
|
} |
819
|
|
|
|
|
|
|
|
820
|
|
|
|
|
|
|
# Socket is still open. Check for lingering data. |
821
|
30
|
|
|
|
|
44
|
DEBUG and warn "RECLAIM: checking if socket still works"; |
822
|
|
|
|
|
|
|
|
823
|
|
|
|
|
|
|
# Check for data on the socket, which implies that the server |
824
|
|
|
|
|
|
|
# doesn't know we're done. That leads to desynchroniziation on the |
825
|
|
|
|
|
|
|
# protocol level, which strongly implies that we can't reuse the |
826
|
|
|
|
|
|
|
# socket. In this case, we'll make a quick attempt at fetching all |
827
|
|
|
|
|
|
|
# the data, then close the socket. |
828
|
|
|
|
|
|
|
|
829
|
30
|
|
|
|
|
96
|
my $rin = ''; |
830
|
30
|
|
|
|
|
309
|
vec($rin, fileno($socket), 1) = 1; |
831
|
30
|
|
|
|
|
147
|
my ($rout, $eout); |
832
|
30
|
|
|
|
|
303
|
my $socket_is_active = select ($rout=$rin, undef, $eout=$rin, 0); |
833
|
|
|
|
|
|
|
|
834
|
30
|
100
|
|
|
|
109
|
if ($socket_is_active) { |
835
|
3
|
|
|
|
|
6
|
DEBUG and warn "RECLAIM: socket is still active; trying to drain"; |
836
|
15
|
|
|
15
|
|
196
|
use bytes; |
|
15
|
|
|
|
|
39
|
|
|
15
|
|
|
|
|
269
|
|
837
|
|
|
|
|
|
|
|
838
|
3
|
|
50
|
|
|
152
|
my $socket_had_data = sysread($socket, my $buf = "", 65536) || 0; |
839
|
3
|
|
|
|
|
7
|
DEBUG and warn "RECLAIM: socket had $socket_had_data bytes. 0 means EOF"; |
840
|
3
|
|
|
|
|
4
|
DEBUG and warn "RECLAIM: Giving up on socket."; |
841
|
|
|
|
|
|
|
|
842
|
|
|
|
|
|
|
# Avoid common FIN_WAIT_2 issues, but only for valid sockets. |
843
|
|
|
|
|
|
|
#if ($socket_had_data and fileno($socket)) { |
844
|
3
|
50
|
|
|
|
11
|
if ($socket_had_data) { |
845
|
0
|
|
|
|
|
0
|
my $opt_result = setsockopt( |
846
|
|
|
|
|
|
|
$socket, SOL_SOCKET, SO_LINGER, pack("sll",1,0,0) |
847
|
|
|
|
|
|
|
); |
848
|
0
|
0
|
0
|
|
|
0
|
die "setsockopt: " . ($!+0) . " $!" if (not $opt_result and $! != EBADF); |
849
|
|
|
|
|
|
|
} |
850
|
|
|
|
|
|
|
|
851
|
3
|
|
|
|
|
24
|
goto &_ka_wake_up; |
852
|
|
|
|
|
|
|
} |
853
|
|
|
|
|
|
|
|
854
|
|
|
|
|
|
|
# Socket is alive and has no data, so it's in a quiet, theoretically |
855
|
|
|
|
|
|
|
# reclaimable state. |
856
|
|
|
|
|
|
|
|
857
|
27
|
|
|
|
|
44
|
DEBUG and warn "RECLAIM: reclaiming socket"; |
858
|
|
|
|
|
|
|
|
859
|
|
|
|
|
|
|
# Watch the socket, and set a keep-alive timeout. |
860
|
27
|
|
|
|
|
142
|
$kernel->select_read($socket, "ka_socket_activity"); |
861
|
27
|
|
|
|
|
4068
|
my $timer_id = $kernel->delay_set( |
862
|
|
|
|
|
|
|
ka_keepalive_timeout => $self->[SF_KEEPALIVE], $socket |
863
|
|
|
|
|
|
|
); |
864
|
|
|
|
|
|
|
|
865
|
|
|
|
|
|
|
# Record the socket as free to be used. |
866
|
27
|
|
|
|
|
4100
|
$self->[SF_POOL]{$request_key}{$socket} = $socket; |
867
|
27
|
|
|
|
|
125
|
$self->[SF_SOCKETS]{$socket} = [ |
868
|
|
|
|
|
|
|
$request_key, # SK_KEY |
869
|
|
|
|
|
|
|
$timer_id, # SK_TIMER |
870
|
|
|
|
|
|
|
]; |
871
|
|
|
|
|
|
|
|
872
|
27
|
|
|
|
|
224
|
goto &_ka_wake_up; |
873
|
|
|
|
|
|
|
} |
874
|
|
|
|
|
|
|
|
875
|
|
|
|
|
|
|
# Socket timed out. Discard it. |
876
|
|
|
|
|
|
|
|
877
|
|
|
|
|
|
|
sub _ka_keepalive_timeout { |
878
|
3
|
|
|
3
|
|
1998447
|
my ($self, $socket) = @_[OBJECT, ARG0]; |
879
|
3
|
|
|
|
|
24
|
$self->_remove_socket_from_pool($socket); |
880
|
|
|
|
|
|
|
} |
881
|
|
|
|
|
|
|
|
882
|
|
|
|
|
|
|
# Relinquish a socket. Stop selecting on it. |
883
|
|
|
|
|
|
|
|
884
|
|
|
|
|
|
|
sub _ka_relinquish_socket { |
885
|
10
|
|
|
10
|
|
700
|
my ($kernel, $socket) = @_[KERNEL, ARG0]; |
886
|
10
|
|
|
|
|
68
|
$kernel->alarm_remove($_[OBJECT][SF_SOCKETS]{$socket}[SK_TIMER]); |
887
|
10
|
|
|
|
|
1283
|
$kernel->select_read($socket, undef); |
888
|
|
|
|
|
|
|
} |
889
|
|
|
|
|
|
|
|
890
|
|
|
|
|
|
|
# Shut down the component. Release any sockets we're currently |
891
|
|
|
|
|
|
|
# holding onto. Clean up any timers. Remove the alias it's known by. |
892
|
|
|
|
|
|
|
|
893
|
|
|
|
|
|
|
sub shutdown { |
894
|
29
|
|
|
29
|
1
|
6064
|
my $self = shift; |
895
|
29
|
100
|
|
|
|
1737
|
return if $self->[SF_SHUTDOWN]; |
896
|
15
|
|
|
|
|
84
|
$poe_kernel->call($self->[SF_ALIAS], "ka_shutdown"); |
897
|
|
|
|
|
|
|
} |
898
|
|
|
|
|
|
|
|
899
|
|
|
|
|
|
|
sub _ka_shutdown { |
900
|
15
|
|
|
15
|
|
1383
|
my ($self, $kernel, $heap) = @_[OBJECT, KERNEL, HEAP]; |
901
|
|
|
|
|
|
|
|
902
|
15
|
50
|
|
|
|
93
|
return if $self->[SF_SHUTDOWN]; |
903
|
|
|
|
|
|
|
|
904
|
15
|
|
|
|
|
75
|
$instances--; |
905
|
|
|
|
|
|
|
|
906
|
|
|
|
|
|
|
# Clean out the request queue. |
907
|
15
|
|
|
|
|
34
|
foreach my $request (@{$self->[SF_QUEUE]}) { |
|
15
|
|
|
|
|
85
|
|
908
|
0
|
|
|
|
|
0
|
$self->_shutdown_request($kernel, $request); |
909
|
|
|
|
|
|
|
} |
910
|
15
|
|
|
|
|
47
|
$self->[SF_QUEUE] = [ ]; |
911
|
|
|
|
|
|
|
|
912
|
|
|
|
|
|
|
# Clean out the socket pool. |
913
|
15
|
|
|
|
|
42
|
foreach my $sockets (values %{$self->[SF_POOL]}) { |
|
15
|
|
|
|
|
76
|
|
914
|
10
|
|
|
|
|
367
|
foreach my $socket (values %$sockets) { |
915
|
10
|
|
|
|
|
64
|
$kernel->alarm_remove($self->[SF_SOCKETS]{$socket}[SK_TIMER]); |
916
|
10
|
|
|
|
|
3495
|
$kernel->select_read($socket, undef); |
917
|
|
|
|
|
|
|
} |
918
|
|
|
|
|
|
|
} |
919
|
|
|
|
|
|
|
|
920
|
|
|
|
|
|
|
# Stop any pending resolver requests. |
921
|
15
|
|
|
|
|
1630
|
foreach my $host (keys %{$heap->{dns_requests}}) { |
|
15
|
|
|
|
|
73
|
|
922
|
0
|
|
|
|
|
0
|
DEBUG and warn "SHT: Shutting down resolver requests for $host"; |
923
|
|
|
|
|
|
|
|
924
|
0
|
|
|
|
|
0
|
foreach my $request (@{$heap->{dns_requests}{$host}}) { |
|
0
|
|
|
|
|
0
|
|
925
|
0
|
|
|
|
|
0
|
$self->_shutdown_request($kernel, $request); |
926
|
|
|
|
|
|
|
} |
927
|
|
|
|
|
|
|
|
928
|
|
|
|
|
|
|
# Technically not needed since the resolver shutdown should do it. |
929
|
|
|
|
|
|
|
# They all share the same host, so canceling the first should get |
930
|
|
|
|
|
|
|
# them all. |
931
|
0
|
|
|
|
|
0
|
$self->[SF_RESOLVER]->cancel( |
932
|
|
|
|
|
|
|
$heap->{dns_requests}{$host}[0][RQ_RESOLVER_ID] |
933
|
|
|
|
|
|
|
); |
934
|
|
|
|
|
|
|
} |
935
|
|
|
|
|
|
|
|
936
|
15
|
|
|
|
|
48
|
$heap->{dns_requests} = { }; |
937
|
|
|
|
|
|
|
|
938
|
|
|
|
|
|
|
# Shut down the resolver. |
939
|
15
|
|
|
|
|
39
|
DEBUG and warn "SHT: Shutting down resolver"; |
940
|
15
|
100
|
|
|
|
101
|
if ( $self->[SF_RESOLVER] != $default_resolver ) { |
941
|
1
|
|
|
|
|
13
|
$self->[SF_RESOLVER]->shutdown(); |
942
|
|
|
|
|
|
|
} |
943
|
15
|
|
|
|
|
13942
|
$self->[SF_RESOLVER] = undef; |
944
|
|
|
|
|
|
|
|
945
|
15
|
100
|
66
|
|
|
203
|
if ( $default_resolver and !$instances ) { |
946
|
14
|
|
|
|
|
1833
|
$default_resolver->shutdown(); |
947
|
14
|
|
|
|
|
91516
|
$default_resolver = undef; |
948
|
|
|
|
|
|
|
} |
949
|
|
|
|
|
|
|
|
950
|
|
|
|
|
|
|
# Finish keepalive's shutdown. |
951
|
15
|
|
|
|
|
208
|
$kernel->alias_remove($self->[SF_ALIAS]); |
952
|
15
|
|
|
|
|
2208
|
$self->[SF_SHUTDOWN] = 1; |
953
|
|
|
|
|
|
|
|
954
|
15
|
|
|
|
|
136
|
return; |
955
|
|
|
|
|
|
|
} |
956
|
|
|
|
|
|
|
|
957
|
|
|
|
|
|
|
sub _shutdown_request { |
958
|
0
|
|
|
0
|
|
0
|
my ($self, $kernel, $request) = @_; |
959
|
|
|
|
|
|
|
|
960
|
0
|
0
|
|
|
|
0
|
if (defined $request->[RQ_TIMER_ID]) { |
961
|
0
|
|
|
|
|
0
|
DEBUG and warn "SHT: Shutting down resolver timer $request->[RQ_TIMER_ID]"; |
962
|
0
|
|
|
|
|
0
|
$kernel->alarm_remove($request->[RQ_TIMER_ID]); |
963
|
|
|
|
|
|
|
} |
964
|
|
|
|
|
|
|
|
965
|
0
|
0
|
|
|
|
0
|
if (defined $request->[RQ_WHEEL_ID]) { |
966
|
0
|
|
|
|
|
0
|
DEBUG and warn "SHT: Shutting down resolver wheel $request->[RQ_TIMER_ID]"; |
967
|
0
|
|
|
|
|
0
|
delete $self->[SF_WHEELS]{$request->[RQ_WHEEL_ID]}; |
968
|
|
|
|
|
|
|
|
969
|
|
|
|
|
|
|
# remove the wheel-to-request index |
970
|
0
|
|
|
|
|
0
|
delete $self->[SF_REQ_INDEX]{$request->[RQ_ID]}; |
971
|
0
|
|
|
|
|
0
|
_free_req_id($request->[RQ_ID]); |
972
|
|
|
|
|
|
|
} |
973
|
|
|
|
|
|
|
|
974
|
0
|
0
|
|
|
|
0
|
if (defined $request->[RQ_SESSION]) { |
975
|
0
|
|
|
|
|
0
|
my $session_id = $request->[RQ_SESSION]->ID; |
976
|
0
|
|
|
|
|
0
|
DEBUG and warn "SHT: Releasing session $session_id"; |
977
|
0
|
|
|
|
|
0
|
$kernel->refcount_decrement($session_id, "poco-client-keepalive"); |
978
|
|
|
|
|
|
|
} |
979
|
|
|
|
|
|
|
} |
980
|
|
|
|
|
|
|
|
981
|
|
|
|
|
|
|
# A socket in the free pool has activity. Read from it and discard |
982
|
|
|
|
|
|
|
# the output. Discard the socket on error or remote closure. |
983
|
|
|
|
|
|
|
|
984
|
|
|
|
|
|
|
sub _ka_socket_activity { |
985
|
1
|
|
|
1
|
|
575
|
my ($self, $kernel, $socket) = @_[OBJECT, KERNEL, ARG0]; |
986
|
|
|
|
|
|
|
|
987
|
1
|
|
|
|
|
1
|
if (DEBUG) { |
988
|
|
|
|
|
|
|
my $socket_rec = $self->[SF_SOCKETS]{$socket}; |
989
|
|
|
|
|
|
|
my $key = $socket_rec->[SK_KEY]; |
990
|
|
|
|
|
|
|
warn "CON: Got activity on socket for $key"; |
991
|
|
|
|
|
|
|
} |
992
|
|
|
|
|
|
|
|
993
|
|
|
|
|
|
|
# Any socket activity on a kept-alive socket implies that the socket |
994
|
|
|
|
|
|
|
# is no longer reusable. |
995
|
|
|
|
|
|
|
|
996
|
15
|
|
|
15
|
|
24157
|
use bytes; |
|
15
|
|
|
|
|
75
|
|
|
15
|
|
|
|
|
88
|
|
997
|
1
|
|
50
|
|
|
174
|
my $socket_had_data = sysread($socket, my $buf = "", 65536) || 0; |
998
|
1
|
|
|
|
|
2
|
DEBUG and warn "CON: socket had $socket_had_data bytes. 0 means EOF"; |
999
|
1
|
|
|
|
|
2
|
DEBUG and warn "CON: Removing socket from the pool"; |
1000
|
|
|
|
|
|
|
|
1001
|
1
|
|
|
|
|
6
|
$self->_remove_socket_from_pool($socket); |
1002
|
|
|
|
|
|
|
} |
1003
|
|
|
|
|
|
|
|
1004
|
|
|
|
|
|
|
sub _ka_resolve_request { |
1005
|
33
|
|
|
33
|
|
2640
|
my ($self, $kernel, $heap, $request) = @_[OBJECT, KERNEL, HEAP, ARG0]; |
1006
|
|
|
|
|
|
|
|
1007
|
33
|
|
|
|
|
77
|
my $host = $request->[RQ_ADDRESS]; |
1008
|
|
|
|
|
|
|
|
1009
|
|
|
|
|
|
|
# Skip DNS resolution if it's already a dotted quad. |
1010
|
|
|
|
|
|
|
# ip_is_ipv4() doesn't require quads, so we count the dots. |
1011
|
|
|
|
|
|
|
# |
1012
|
|
|
|
|
|
|
# TODO - Do the same for IPv6 addresses containing colons? |
1013
|
|
|
|
|
|
|
# TODO - Would require AF_INET6 support around the SocketFactory. |
1014
|
33
|
100
|
66
|
|
|
217
|
if ((($host =~ tr[.][.]) == 3) and ip_is_ipv4($host)) { |
1015
|
6
|
|
|
|
|
213
|
DEBUG_DNS and warn "DNS: $host is a dotted quad; skipping lookup"; |
1016
|
6
|
|
|
|
|
57
|
$kernel->call($self->[SF_ALIAS], ka_add_to_queue => $request); |
1017
|
6
|
|
|
|
|
45
|
return; |
1018
|
|
|
|
|
|
|
} |
1019
|
|
|
|
|
|
|
|
1020
|
|
|
|
|
|
|
# It's already pending DNS resolution. Combine this with previous. |
1021
|
27
|
100
|
|
|
|
148
|
if (exists $heap->{dns_requests}{$host}) { |
1022
|
8
|
|
|
|
|
22
|
DEBUG_DNS and warn "DNS: $host is piggybacking on a pending lookup.\n"; |
1023
|
|
|
|
|
|
|
|
1024
|
|
|
|
|
|
|
# All requests for the same host share the same resolver ID. |
1025
|
|
|
|
|
|
|
# TODO - Although it should probably be keyed on host:port. |
1026
|
8
|
|
|
|
|
35
|
$request->[RQ_RESOLVER_ID] = $heap->{dns_requests}{$host}[0][RQ_RESOLVER_ID]; |
1027
|
|
|
|
|
|
|
|
1028
|
8
|
|
|
|
|
20
|
push @{$heap->{dns_requests}{$host}}, $request; |
|
8
|
|
|
|
|
46
|
|
1029
|
8
|
|
|
|
|
41
|
return; |
1030
|
|
|
|
|
|
|
} |
1031
|
|
|
|
|
|
|
|
1032
|
|
|
|
|
|
|
# New request. Start lookup. |
1033
|
19
|
|
|
|
|
1185
|
$heap->{dns_requests}{$host} = [ $request ]; |
1034
|
|
|
|
|
|
|
|
1035
|
19
|
|
|
|
|
271
|
$request->[RQ_RESOLVER_ID] = $self->[SF_RESOLVER]->resolve( |
1036
|
|
|
|
|
|
|
event => 'ka_dns_response', |
1037
|
|
|
|
|
|
|
host => $host, |
1038
|
|
|
|
|
|
|
service => $request->[RQ_PORT], |
1039
|
|
|
|
|
|
|
hints => { protocol => TCP_PROTO }, |
1040
|
|
|
|
|
|
|
); |
1041
|
|
|
|
|
|
|
|
1042
|
19
|
|
|
|
|
1540829
|
DEBUG_DNS and warn "DNS: looking up $host in the background.\n"; |
1043
|
|
|
|
|
|
|
} |
1044
|
|
|
|
|
|
|
|
1045
|
|
|
|
|
|
|
sub _ka_dns_response { |
1046
|
19
|
|
|
19
|
|
1944516
|
my ($self, $kernel, $heap, $response_error, $addresses, $request) = @_[ |
1047
|
|
|
|
|
|
|
OBJECT, KERNEL, HEAP, ARG0..ARG2 |
1048
|
|
|
|
|
|
|
]; |
1049
|
|
|
|
|
|
|
|
1050
|
|
|
|
|
|
|
# We've shut down. Nothing to do here. |
1051
|
19
|
50
|
|
|
|
149
|
return if $self->[SF_SHUTDOWN]; |
1052
|
|
|
|
|
|
|
|
1053
|
19
|
|
|
|
|
1173
|
my $request_address = $request->{host}; |
1054
|
19
|
|
|
|
|
99
|
my $requests = delete $heap->{dns_requests}{$request_address}; |
1055
|
|
|
|
|
|
|
|
1056
|
19
|
|
|
|
|
58
|
DEBUG_DNS and warn "DNS: got response for request address $request_address"; |
1057
|
|
|
|
|
|
|
|
1058
|
|
|
|
|
|
|
# Requests on record. |
1059
|
19
|
50
|
|
|
|
241
|
if (defined $requests) { |
1060
|
|
|
|
|
|
|
# We can receive responses for canceled requests. Ignore them: we |
1061
|
|
|
|
|
|
|
# cannot cancel PoCo::Client::DNS requests, so this is how we reap |
1062
|
|
|
|
|
|
|
# them when they're canceled. |
1063
|
19
|
50
|
|
|
|
109
|
if ($requests eq 'cancelled') { |
1064
|
0
|
|
|
|
|
0
|
DEBUG_DNS and warn "DNS: reaping cancelled request for $request_address"; |
1065
|
0
|
|
|
|
|
0
|
return; |
1066
|
|
|
|
|
|
|
} |
1067
|
19
|
50
|
|
|
|
119
|
unless (ref $requests eq 'ARRAY') { |
1068
|
0
|
|
|
|
|
0
|
die "DNS: got an unknown requests for $request_address: $requests"; |
1069
|
|
|
|
|
|
|
} |
1070
|
|
|
|
|
|
|
} |
1071
|
|
|
|
|
|
|
else { |
1072
|
0
|
|
|
|
|
0
|
die "DNS: Unexpectedly undefined requests for $request_address"; |
1073
|
|
|
|
|
|
|
} |
1074
|
|
|
|
|
|
|
|
1075
|
|
|
|
|
|
|
# This is an error. Cancel all requests for the address. |
1076
|
|
|
|
|
|
|
# Tell everybody that their requests failed. |
1077
|
19
|
100
|
|
|
|
79
|
if ($response_error) { |
1078
|
1
|
|
|
|
|
2
|
DEBUG_DNS and warn "DNS: resolver error = $response_error"; |
1079
|
1
|
|
|
|
|
6
|
foreach my $request (@$requests) { |
1080
|
1
|
|
|
|
|
6
|
_respond_with_error($request, "resolve", undef, $response_error), |
1081
|
|
|
|
|
|
|
} |
1082
|
1
|
|
|
|
|
6
|
return; |
1083
|
|
|
|
|
|
|
} |
1084
|
|
|
|
|
|
|
|
1085
|
18
|
|
|
|
|
42
|
DEBUG_DNS and warn "DNS: got a response"; |
1086
|
|
|
|
|
|
|
|
1087
|
|
|
|
|
|
|
# A response! |
1088
|
18
|
|
|
|
|
105
|
foreach my $address_rec (@$addresses) { |
1089
|
18
|
|
|
|
|
203
|
my $numeric = $self->[SF_RESOLVER]->unpack_addr($address_rec); |
1090
|
|
|
|
|
|
|
|
1091
|
18
|
|
|
|
|
1513
|
DEBUG_DNS and warn "DNS: $request_address resolves to $numeric"; |
1092
|
|
|
|
|
|
|
|
1093
|
18
|
|
|
|
|
69
|
foreach my $request (@$requests) { |
1094
|
|
|
|
|
|
|
# Don't bother continuing inactive requests. |
1095
|
26
|
50
|
|
|
|
1073
|
next unless $request->[RQ_ACTIVE]; |
1096
|
26
|
|
|
|
|
65
|
$request->[RQ_IP] = $numeric; |
1097
|
26
|
|
|
|
|
74
|
$request->[RQ_ADDR_FAM] = $address_rec->{family}; |
1098
|
26
|
|
|
|
|
140
|
$kernel->yield(ka_add_to_queue => $request); |
1099
|
|
|
|
|
|
|
} |
1100
|
|
|
|
|
|
|
|
1101
|
|
|
|
|
|
|
# Return after the first good answer. |
1102
|
18
|
|
|
|
|
7891
|
return; |
1103
|
|
|
|
|
|
|
} |
1104
|
|
|
|
|
|
|
|
1105
|
|
|
|
|
|
|
# Didn't return here. No address record for the host? |
1106
|
0
|
|
|
|
|
0
|
foreach my $request (@$requests) { |
1107
|
0
|
|
|
|
|
0
|
DEBUG_DNS and warn "DNS: $request_address does not resolve"; |
1108
|
0
|
|
|
|
|
0
|
_respond_with_error($request, "resolve", undef, "Host has no address."), |
1109
|
|
|
|
|
|
|
} |
1110
|
|
|
|
|
|
|
} |
1111
|
|
|
|
|
|
|
|
1112
|
|
|
|
|
|
|
|
1113
|
|
|
|
|
|
|
sub _ka_add_to_queue { |
1114
|
32
|
|
|
32
|
|
7222
|
my ($self, $kernel, $request) = @_[OBJECT, KERNEL, ARG0]; |
1115
|
|
|
|
|
|
|
|
1116
|
32
|
|
|
|
|
218
|
push @{ $self->[SF_QUEUE] }, $request; |
|
32
|
|
|
|
|
97
|
|
1117
|
|
|
|
|
|
|
|
1118
|
|
|
|
|
|
|
# If the queue has more than one request in it, then it already has |
1119
|
|
|
|
|
|
|
# a wakeup event pending. We don't need to send another one. |
1120
|
|
|
|
|
|
|
|
1121
|
32
|
100
|
|
|
|
123
|
return if @{$self->[SF_QUEUE]} > 1; |
|
32
|
|
|
|
|
169
|
|
1122
|
|
|
|
|
|
|
|
1123
|
|
|
|
|
|
|
# If the component's allocated socket count is maxed out, then it |
1124
|
|
|
|
|
|
|
# will check the queue when an existing socket is released. We |
1125
|
|
|
|
|
|
|
# don't need to wake it up here. |
1126
|
|
|
|
|
|
|
|
1127
|
22
|
100
|
|
|
|
56
|
return if keys(%{$self->[SF_USED]}) >= $self->[SF_MAX_OPEN]; |
|
22
|
|
|
|
|
393
|
|
1128
|
|
|
|
|
|
|
|
1129
|
|
|
|
|
|
|
# Likewise, we shouldn't awaken the session if there are no |
1130
|
|
|
|
|
|
|
# available slots for the given scheme/address/port triple. "|| 0" |
1131
|
|
|
|
|
|
|
# to avoid an undef error. |
1132
|
|
|
|
|
|
|
|
1133
|
21
|
|
|
|
|
61
|
my $conn_key = $request->[RQ_CONN_KEY]; |
1134
|
|
|
|
|
|
|
return if ( |
1135
|
21
|
50
|
50
|
|
|
273
|
($self->[SF_USED_EACH]{$conn_key} || 0) >= $self->[SF_MAX_HOST] |
1136
|
|
|
|
|
|
|
); |
1137
|
|
|
|
|
|
|
|
1138
|
|
|
|
|
|
|
# Wake the session up, and return nothing, signifying sound and fury |
1139
|
|
|
|
|
|
|
# yet to come. |
1140
|
21
|
|
|
|
|
38
|
DEBUG and warn "posting wakeup for $conn_key"; |
1141
|
21
|
|
|
|
|
134
|
$poe_kernel->post($self->[SF_ALIAS], "ka_wake_up"); |
1142
|
21
|
|
|
|
|
3493
|
return; |
1143
|
|
|
|
|
|
|
} |
1144
|
|
|
|
|
|
|
|
1145
|
|
|
|
|
|
|
# Remove a socket from the free pool, by the socket handle itself. |
1146
|
|
|
|
|
|
|
|
1147
|
|
|
|
|
|
|
sub _remove_socket_from_pool { |
1148
|
7
|
|
|
7
|
|
26
|
my ($self, $socket) = @_; |
1149
|
|
|
|
|
|
|
|
1150
|
7
|
|
|
|
|
42
|
my $socket_rec = delete $self->[SF_SOCKETS]{$socket}; |
1151
|
7
|
|
|
|
|
47
|
my $key = $socket_rec->[SK_KEY]; |
1152
|
|
|
|
|
|
|
|
1153
|
|
|
|
|
|
|
# Get the blessed version. |
1154
|
7
|
|
|
|
|
13
|
DEBUG and warn "removing socket for $key"; |
1155
|
7
|
|
|
|
|
37
|
$socket = delete $self->[SF_POOL]{$key}{$socket}; |
1156
|
|
|
|
|
|
|
|
1157
|
7
|
100
|
|
|
|
15
|
unless (keys %{$self->[SF_POOL]{$key}}) { |
|
7
|
|
|
|
|
63
|
|
1158
|
5
|
|
|
|
|
16
|
delete $self->[SF_POOL]{$key}; |
1159
|
|
|
|
|
|
|
} |
1160
|
|
|
|
|
|
|
|
1161
|
7
|
|
|
|
|
52
|
$poe_kernel->alarm_remove($socket_rec->[SK_TIMER]); |
1162
|
7
|
|
|
|
|
478
|
$poe_kernel->select_read($socket, undef); |
1163
|
|
|
|
|
|
|
|
1164
|
|
|
|
|
|
|
# Avoid common FIN_WAIT_2 issues. |
1165
|
|
|
|
|
|
|
# Commented out because fileno() will return true for closed |
1166
|
|
|
|
|
|
|
# sockets, which makes setsockopt() highly unhappy. Also, SO_LINGER |
1167
|
|
|
|
|
|
|
# will cause te socket closure to block, which is less than ideal. |
1168
|
|
|
|
|
|
|
# We need to revisit this another way, or just let sockets enter |
1169
|
|
|
|
|
|
|
# FIN_WAIT_2. |
1170
|
|
|
|
|
|
|
|
1171
|
|
|
|
|
|
|
# if (fileno $socket) { |
1172
|
|
|
|
|
|
|
# setsockopt($socket, SOL_SOCKET, SO_LINGER, pack("sll",1,0,0)) or die( |
1173
|
|
|
|
|
|
|
# "setsockopt: $!" |
1174
|
|
|
|
|
|
|
# ); |
1175
|
|
|
|
|
|
|
# } |
1176
|
|
|
|
|
|
|
} |
1177
|
|
|
|
|
|
|
|
1178
|
|
|
|
|
|
|
# Internal function. NOT AN EVENT HANDLER. |
1179
|
|
|
|
|
|
|
|
1180
|
|
|
|
|
|
|
sub _respond_with_error { |
1181
|
5
|
|
|
5
|
|
24
|
my ($request, $func, $num, $string) = @_; |
1182
|
5
|
|
|
|
|
75
|
_respond( |
1183
|
|
|
|
|
|
|
$request, |
1184
|
|
|
|
|
|
|
{ |
1185
|
|
|
|
|
|
|
connection => undef, |
1186
|
|
|
|
|
|
|
function => $func, |
1187
|
|
|
|
|
|
|
error_num => $num, |
1188
|
|
|
|
|
|
|
error_str => $string, |
1189
|
|
|
|
|
|
|
} |
1190
|
|
|
|
|
|
|
); |
1191
|
|
|
|
|
|
|
} |
1192
|
|
|
|
|
|
|
|
1193
|
|
|
|
|
|
|
sub _respond { |
1194
|
34
|
|
|
34
|
|
76
|
my ($request, $fields) = @_; |
1195
|
|
|
|
|
|
|
|
1196
|
|
|
|
|
|
|
# Bail out early if the request isn't active. |
1197
|
34
|
100
|
66
|
|
|
382
|
return unless $request->[RQ_ACTIVE] and $request->[RQ_SESSION]; |
1198
|
|
|
|
|
|
|
|
1199
|
33
|
|
|
|
|
985
|
$poe_kernel->post( |
1200
|
|
|
|
|
|
|
$request->[RQ_SESSION], |
1201
|
|
|
|
|
|
|
$request->[RQ_EVENT], |
1202
|
|
|
|
|
|
|
{ |
1203
|
|
|
|
|
|
|
addr => $request->[RQ_ADDRESS], |
1204
|
|
|
|
|
|
|
context => $request->[RQ_CONTEXT], |
1205
|
|
|
|
|
|
|
port => $request->[RQ_PORT], |
1206
|
|
|
|
|
|
|
scheme => $request->[RQ_SCHEME], |
1207
|
|
|
|
|
|
|
for_addr => $request->[RQ_FOR_ADDRESS], |
1208
|
|
|
|
|
|
|
for_scheme => $request->[RQ_FOR_SCHEME], |
1209
|
|
|
|
|
|
|
for_port => $request->[RQ_FOR_PORT], |
1210
|
|
|
|
|
|
|
%$fields, |
1211
|
|
|
|
|
|
|
} |
1212
|
|
|
|
|
|
|
); |
1213
|
|
|
|
|
|
|
|
1214
|
|
|
|
|
|
|
# Drop the extra refcount. |
1215
|
33
|
|
|
|
|
4778
|
$poe_kernel->refcount_decrement( |
1216
|
|
|
|
|
|
|
$request->[RQ_SESSION]->ID(), |
1217
|
|
|
|
|
|
|
"poco-client-keepalive" |
1218
|
|
|
|
|
|
|
); |
1219
|
|
|
|
|
|
|
|
1220
|
|
|
|
|
|
|
# Remove associated timer. |
1221
|
33
|
50
|
|
|
|
1651
|
if ($request->[RQ_TIMER_ID]) { |
1222
|
33
|
|
|
|
|
233
|
$poe_kernel->alarm_remove($request->[RQ_TIMER_ID]); |
1223
|
33
|
|
|
|
|
4735
|
$request->[RQ_TIMER_ID] = undef; |
1224
|
|
|
|
|
|
|
} |
1225
|
|
|
|
|
|
|
|
1226
|
|
|
|
|
|
|
# Deactivate the request. |
1227
|
33
|
|
|
|
|
134
|
$request->[RQ_ACTIVE] = undef; |
1228
|
|
|
|
|
|
|
} |
1229
|
|
|
|
|
|
|
|
1230
|
|
|
|
|
|
|
1; |
1231
|
|
|
|
|
|
|
|
1232
|
|
|
|
|
|
|
__END__ |