File Coverage

blib/lib/POE/Component/Resolver.pm
Criterion Covered Total %
statement 143 222 64.4
branch 37 94 39.3
condition 12 26 46.1
subroutine 22 33 66.6
pod 5 5 100.0
total 219 380 57.6


line stmt bran cond sub pod time code
1             package POE::Component::Resolver;
2             {
3             $POE::Component::Resolver::VERSION = '0.921';
4             }
5              
6 2     2   416351 use warnings;
  2         4  
  2         85  
7 2     2   10 use strict;
  2         4  
  2         73  
8              
9 2     2   12 use POE qw(Wheel::Run Filter::Reference);
  2         4  
  2         13  
10 2     2   81955 use Carp qw(croak carp);
  2         5  
  2         133  
11 2     2   13 use Time::HiRes qw(time);
  2         5  
  2         22  
12 2         159 use Socket qw(
13             unpack_sockaddr_in AF_INET AF_INET6
14             getnameinfo NI_NUMERICSERV NI_NUMERICHOST
15 2     2   426 );
  2         5  
16              
17 2     2   1587 use POE::Component::Resolver::Sidecar;
  2         6  
  2         61  
18              
19 2     2   13 use Exporter;
  2         4  
  2         84  
20 2     2   11 use base 'Exporter';
  2         5  
  2         250  
21             our (@EXPORT_OK) = qw(AF_INET AF_INET6);
22              
23             my $next_alias_index = "aaaaaaaa";
24              
25             # Determine Perl's location, per perldoc perlvar's treatment of $^X.
26              
27 2     2   14 use Config;
  2         3  
  2         6487  
28             my $perl_path = $Config{perlpath};
29             if ($^O ne 'VMS') {
30             $perl_path .= $Config{_exe} unless (
31             $perl_path =~ /$Config{_exe}$/i
32             );
33             }
34              
35             # Plain Perl constructor.
36              
37             sub new {
38 5     5 1 162816 my ($class, @args) = @_;
39              
40 5 50       32 croak "new() requires an even number of parameters" if @args % 2;
41 5         73 my %args = @args;
42              
43 5   50     30 my $max_resolvers = delete($args{max_resolvers}) || 8;
44 5   50     22 my $idle_timeout = delete($args{idle_timeout}) || 15;
45 5   50     39 my $debug = delete($args{debug}) || 0;
46 5         10 my $sidecar_program = delete($args{sidecar_program});
47              
48 5         10 my $af_order = delete($args{af_order});
49 5 100 66     57 if (defined $af_order and @$af_order) {
    50          
50 4 50       25 if (ref($af_order) eq "") {
    50          
51 0         0 $af_order = [ $af_order ];
52             }
53             elsif (ref($af_order) ne "ARRAY") {
54 0         0 croak "af_order must be a scalar or an array reference";
55             }
56              
57 4 100       12 my @illegal_afs = grep { ($_ ne AF_INET) && ($_ ne AF_INET6) } @$af_order;
  6         52  
58 4 50       15 croak "af_order may only contain AF_INET and/or AF_INET6" if @illegal_afs;
59             }
60             elsif (exists $ENV{POCO_RESOLVER_IPV}) {
61 0         0 my %number_to_address_family = ( 4 => AF_INET, 6 => AF_INET6 );
62 0         0 $af_order = [
63 0         0 map { $number_to_address_family{$_} }
64             ($ENV{POCO_RESOLVER_IPV} =~ m/([46])/g)
65             ];
66             }
67              
68 5 100 66     32 unless ($af_order and @$af_order) {
69             # Default to IPv4 preference for backward compatibility.
70 1         3 $af_order = [ AF_INET, AF_INET6 ];
71             }
72              
73 5         24 my @error = sort keys %args;
74 5 50       16 croak "unknown new() parameter(s): @error" if @error;
75              
76 5 50 33     18 unless (defined $sidecar_program and length $sidecar_program) {
77 5 50       97 if ($^O eq "MSWin32") {
78 0         0 $sidecar_program = \&POE::Component::Resolver::Sidecar::main;
79             }
80             else {
81 55         141 $sidecar_program = [
82             $perl_path,
83 5         14 (map { "-I$_" } @INC),
84             '-MPOE::Component::Resolver::Sidecar',
85             '-e', 'POE::Component::Resolver::Sidecar->main()'
86             ];
87             }
88             }
89              
90 5         74 my $self = bless {
91             alias => "poe_component_resolver_" . $next_alias_index++,
92             debug => $debug,
93             }, $class;
94              
95             POE::Session->create(
96             inline_states => {
97             _start => \&_poe_start,
98             _stop => \&_poe_stop,
99 0     0   0 _parent => sub { undef }, # for ASSERT_DEFAULT
100 0     0   0 _child => sub { undef }, # for ASSERT_DEFAULT
101 5         217 request => \&_poe_request,
102             shutdown => \&_poe_shutdown,
103             cancel => \&_poe_cancel,
104             sidecar_closed => \&_poe_sidecar_closed,
105             sidecar_error => \&_poe_sidecar_error,
106             sidecar_response => \&_poe_sidecar_response,
107             sidecar_signal => \&_poe_sidecar_signal,
108             sidecar_eject => \&_poe_sidecar_eject,
109             sidecar_attach => \&_poe_sidecar_attach,
110             },
111             heap => {
112             af_order => $af_order,
113             alias => $self->{alias},
114             idle_timeout => $idle_timeout,
115             last_request_id => 0,
116             max_resolvers => $max_resolvers,
117             requests => { },
118             sidecar_ring => [ ],
119             sidecar_program => $sidecar_program,
120             debug => $debug,
121             }
122             );
123              
124 5         5723 return $self;
125             }
126              
127             sub DESTROY {
128 5     5   1574 my $self = shift;
129              
130             # Can't resolve the session: it must already be gone.
131 5 50       25 return unless $poe_kernel->alias_resolve($self->{alias});
132              
133 0 0       0 carp " destroying $self->{alias}" if $self->{debug};
134              
135 0         0 $poe_kernel->call($self->{alias}, "shutdown");
136             }
137              
138             sub _poe_stop {
139 5     5   2645 my $heap = $_[HEAP];
140 5 50       31 carp " stopping $heap->{alias}" if $heap->{debug};
141             }
142              
143             sub shutdown {
144 0     0 1 0 my $self = shift;
145              
146             # Can't resolve the session: it must already be gone.
147 0 0       0 return unless $poe_kernel->alias_resolve($self->{alias});
148              
149 0 0       0 carp " got shutdown request for $self->{alias}" if $self->{debug};
150              
151 0         0 $poe_kernel->call($self->{alias}, "shutdown");
152             }
153              
154             # Internal POE event handler to release all resources owned by the
155             # hidden POE::Session and then shut it down. It's an event handler so
156             # that this code can run "within" the POE::Session.
157              
158             sub _poe_shutdown {
159 0     0   0 my ($kernel, $heap) = @_[KERNEL, HEAP];
160              
161 0         0 $heap->{shutdown} = 1;
162              
163 0         0 $kernel->alias_remove($heap->{alias});
164              
165 0         0 _poe_wipe_sidecars($heap);
166              
167 0         0 foreach my $request (values %{$heap->{requests}}) {
  0         0  
168 0         0 $kernel->post(
169             $request->{sender},
170             $request->{event},
171             'component shut down',
172             [ ],
173 0         0 { map { $_ => $request->{$_} } qw(host service misc) },
174             );
175              
176 0 0       0 warn " $heap->{alias} --refcount for sender $request->{sender}" if (
177             $heap->{debug}
178             );
179              
180 0         0 $kernel->refcount_decrement($request->{sender}, __PACKAGE__);
181             }
182              
183 0         0 $heap->{requests} = {};
184              
185             # No more sidecars to eject.
186 0         0 $kernel->delay(sidecar_eject => undef);
187             }
188              
189             # POE event handler to accept a request from some other session. The
190             # public Perl resolve() method forwards into this. This runs "within"
191             # the session so the resources it creates are properly owned.
192              
193             sub _poe_request {
194 4     4   2488 my ($kernel, $heap, $host, $service, $hints, $event, $misc) = @_[
195             KERNEL, HEAP, ARG0..ARG4
196             ];
197              
198 4 50       20 return if $heap->{shutdown};
199              
200 4         12 my $request_id = ++$heap->{last_request_id};
201 4         20 my $sender_id = $_[SENDER]->ID();
202              
203 4 50       37 warn " $heap->{alias} ++refcount for sender $sender_id" if (
204             $heap->{debug}
205             );
206              
207 4         36 $kernel->refcount_increment($sender_id, __PACKAGE__);
208              
209 4         178 _poe_setup_sidecar_ring($kernel, $heap);
210              
211 4         15 my $next_sidecar = pop @{$heap->{sidecar_ring}};
  4         64  
212 4         15 unshift @{$heap->{sidecar_ring}}, $next_sidecar;
  4         9  
213              
214 4         135 $next_sidecar->put( [ $request_id, $host, $service, $hints ] );
215              
216 4         1814 $heap->{requests}{$request_id} = {
217             begin => time(),
218             host => $host,
219             service => $service,
220             hints => $hints,
221             sender => $sender_id,
222             event => $event,
223             misc => $misc,
224             sidecar_id => $next_sidecar->ID(),
225             };
226              
227             # No ejecting until we're done.
228 4         347 $kernel->delay(sidecar_eject => undef);
229              
230 4         1097 return $request_id;
231             }
232              
233             # The user wishes to cancel a DNS request that may still be in
234             # progress. This can happen in places like PoCo::Client::HTTP when
235             # the HTTP request times out before the DNS request is done.
236             #
237             # The public cancel() API forwards the cancelation request into the
238             # POE::Session managing requests via POE::Kernel's call() method.
239              
240             sub cancel {
241 0     0 1 0 my ($self, $request_id) = @_;
242 0         0 return $poe_kernel->call($self->{alias}, "cancel", $request_id);
243             }
244              
245             # The inside-POE cancelation code. It must run within POE so that the
246             # proper resources are removed from the correct session.
247              
248             sub _poe_cancel {
249 0     0   0 my ($kernel, $heap, $request_id) = @_[KERNEL, HEAP, ARG0];
250              
251 0 0       0 return unless exists $heap->{requests}{$request_id};
252              
253 0         0 my $request = $heap->{requests}{$request_id};
254 0         0 _sidecar_cleanup($kernel, $heap, $request->{sidecar_id});
255             }
256              
257             # POE _start handler. Initialize the session and start sidecar
258             # processes, which are owned and managed by that session.
259              
260             sub _poe_start {
261 5     5   1860 my ($kernel, $heap) = @_[KERNEL, HEAP];
262              
263 5 50       19 carp " starting $heap->{alias}" if $heap->{debug};
264              
265 5         39 $kernel->alias_set($heap->{alias});
266              
267             #_poe_setup_sidecar_ring($kernel, $heap);
268              
269 5         224 undef;
270             }
271              
272             # Internal helper sub. Make sure the appropriate number of sidecar
273             # resolvers is running at any given time.
274              
275             sub _poe_setup_sidecar_ring {
276 4     4   8 my ($kernel, $heap) = @_;
277              
278 4 50       20 return if $heap->{shutdown};
279              
280 4         7 while (scalar(keys %{$heap->{sidecar}}) < $heap->{max_resolvers}) {
  8         1191  
281 4         78 my $sidecar = POE::Wheel::Run->new(
282             StdioFilter => POE::Filter::Reference->new(),
283             StdoutEvent => 'sidecar_response',
284             StderrEvent => 'sidecar_error',
285             CloseEvent => 'sidecar_closed',
286             Program => $heap->{sidecar_program},
287             );
288              
289 4         58042 $heap->{sidecar}{$sidecar->PID} = $sidecar;
290 4         284 $heap->{sidecar_id}{$sidecar->ID} = $sidecar;
291 4         34 push @{$heap->{sidecar_ring}}, $sidecar;
  4         39  
292              
293 4         22 $kernel->sig_child($sidecar->PID(), "sidecar_signal");
294             }
295             }
296              
297             # Internal helper sub to replay pending requests when their associated
298             # sidecars are destroyed.
299              
300             sub _poe_replay_pending {
301 0     0   0 my ($kernel, $heap) = @_;
302              
303 0         0 while (my ($request_id, $request) = each %{$heap->{requests}}) {
  0         0  
304              
305             # This request is riding in an existing sidecar.
306             # No need to replay it.
307 0 0       0 next if exists $heap->{sidecar_id}{$request->{sidecar_id}};
308              
309             # Give the request to a new sidecar.
310 0         0 my $next_sidecar = pop @{$heap->{sidecar_ring}};
  0         0  
311 0         0 unshift @{$heap->{sidecar_ring}}, $next_sidecar;
  0         0  
312              
313 0         0 $request->{sidecar_id} = $next_sidecar->ID();
314              
315 0         0 $next_sidecar->put(
316             [
317             $request_id, $request->{host}, $request->{service}, $request->{hints}
318             ]
319             );
320             }
321             }
322              
323             # Internal event handler to briefly defer replaying requests until any
324             # responses in the queue have had time to be delivered. This prevents
325             # us from replaying requests that may already have answers.
326              
327             sub _poe_sidecar_attach {
328 0     0   0 my ($kernel, $heap) = @_[KERNEL, HEAP];
329              
330             # Nothing to do if we don't have requests.
331 0 0       0 return unless scalar keys %{$heap->{requests}};
  0         0  
332              
333             # Requests exist.
334 0         0 _poe_setup_sidecar_ring($kernel, $heap);
335 0         0 _poe_replay_pending($kernel, $heap);
336             }
337              
338             # Plain public Perl method. Begin resolving something.
339              
340             sub resolve {
341 4     4 1 4658 my ($self, @args) = @_;
342              
343 4 50       23 croak "resolve() requires an even number of parameters" if @args % 2;
344 4         18 my %args = @args;
345              
346 4         97 my $host = delete $args{host};
347 4 50 33     38 croak "resolve() requires a host" unless defined $host and length $host;
348              
349 4         13 my $service = delete $args{service};
350 4 50 33     38 croak "resolve() requires a service" unless (
351             defined $service and length $service
352             );
353              
354 4         9 my $misc = delete $args{misc};
355 4 50       13 $misc = "" unless defined $misc;
356              
357 4         9 my $hints = delete $args{hints};
358 4   50     11 $hints ||= { };
359              
360 4         6 my $event = delete $args{event};
361 4 50 33     16 $event = "resolver_response" unless defined $event and length $event;
362              
363 4         17 my @error = sort keys %args;
364 4 50       11 croak "unknown resolve() parameter(s): @error" if @error;
365              
366 4         7 my $result;
367 4 50       37 croak "resolve() on shutdown resolver" unless (
368             $result = $poe_kernel->call(
369             $self->{alias}, "request", $host, $service, $hints, $event, $misc
370             )
371             );
372              
373 4 50       254 carp " $self->{alias} request for host($host) service($service)" if (
374             $self->{debug}
375             );
376              
377 4         109 return $result;
378             }
379              
380             # A sidecar process has produced an error or warning. Pass it
381             # through to the parent process' console.
382              
383             sub _poe_sidecar_error {
384 0     0   0 warn __PACKAGE__, " error in getaddrinfo subprocess: $_[ARG0]\n";
385             }
386              
387             # A sidecar process has closed its standard output. We will receive
388             # no more responses from it. Clean up the sidecar's resources, and
389             # start new ones if necessary.
390              
391             sub _poe_sidecar_closed {
392 0     0   0 my ($kernel, $heap, $wheel_id) = @_[KERNEL, HEAP, ARG0];
393              
394             # Don't bother checking for pending requests if we've shut down.
395 0 0       0 return if $heap->{shutdown};
396              
397 0         0 _sidecar_cleanup($kernel, $heap, $wheel_id);
398             }
399              
400             sub _sidecar_cleanup {
401 0     0   0 my ($kernel, $heap, $wheel_id) = @_;
402              
403 0         0 my $sidecar = delete $heap->{sidecar_id}{$wheel_id};
404 0 0       0 if (defined $sidecar) {
405 0         0 $sidecar->kill(9);
406 0         0 delete $heap->{sidecar}{$sidecar->PID()};
407             }
408              
409             # Remove the sidecar from the rotation.
410 0         0 my $i = @{$heap->{sidecar_ring}};
  0         0  
411 0         0 while ($i--) {
412 0 0       0 next unless $heap->{sidecar_ring}[$i]->ID() == $wheel_id;
413 0         0 splice(@{$heap->{sidecar_ring}}, $i, 1);
  0         0  
414 0         0 last;
415             }
416              
417 0         0 _poe_setup_sidecar_ring($kernel, $heap);
418 0 0       0 _poe_replay_pending($kernel, $heap) if scalar keys %{$heap->{requests}};
  0         0  
419             }
420              
421             # A sidecar has produced a response. Pass it back to the original
422             # caller of resolve(). If we've run out of requests, briefly defer a
423             # partial shutdown. We don't need all those sidecar processes if we
424             # might be done.
425              
426             sub _poe_sidecar_response {
427 4     4   109871 my ($kernel, $heap, $response_rec) = @_[KERNEL, HEAP, ARG0];
428 4         16 my ($request_id, $error, $addresses) = @$response_rec;
429              
430 4         19 my $request_rec = delete $heap->{requests}{$request_id};
431 4 50       19 return unless defined $request_rec;
432              
433 4 50       15 if (defined $heap->{af_order}) {
434 4         8 my @filtered_addresses;
435 4         7 foreach my $af_filter (@{$heap->{af_order}}) {
  4         13  
436 6         17 push @filtered_addresses, grep { $_->{family} == $af_filter } @$addresses;
  12         37  
437             }
438 4         13 $addresses = \@filtered_addresses;
439             }
440              
441             $kernel->post(
442 12         63 $request_rec->{sender}, $request_rec->{event},
443             $error, $addresses,
444 4         13 { map { $_ => $request_rec->{$_} } qw(host service misc) },
445             );
446              
447 4 50       579 warn " $heap->{alias} --refcount for sender $request_rec->{sender}" if (
448             $heap->{debug}
449             );
450              
451 4         20 $kernel->refcount_decrement($request_rec->{sender}, __PACKAGE__);
452              
453             # No more requests? Consider detaching sidecar.
454 4         32 $kernel->delay(sidecar_eject => $heap->{idle_timeout}) unless (
455 4 50       129 scalar keys %{$heap->{requests}}
456             );
457             }
458              
459             # A sidecar process has exited. Clean up its resources, and attach a
460             # replacement sidecar if there are requests.
461              
462             sub _poe_sidecar_signal {
463 4     4   7635 my ($heap, $pid) = @_[HEAP, ARG1];
464              
465 4 50       32 return unless exists $heap->{sidecar}{$pid};
466 0         0 my $sidecar = delete $heap->{sidecar}{$pid};
467 0         0 my $sidecar_id = $sidecar->ID();
468 0         0 delete $heap->{sidecar_id}{$sidecar_id};
469              
470             # Remove the sidecar from the rotation.
471 0         0 my $i = @{$heap->{sidecar_ring}};
  0         0  
472 0         0 while ($i--) {
473 0 0       0 next unless $heap->{sidecar_ring}[$i]->ID() == $sidecar_id;
474 0         0 splice(@{$heap->{sidecar_ring}}, 1, 1);
  0         0  
475 0         0 last;
476             }
477              
478 0 0       0 $_[KERNEL]->yield("sidecar_attach") if scalar keys %{$heap->{requests}};
  0         0  
479              
480 0         0 undef;
481             }
482              
483             # Event handler to defer wiping out all sidecars. This allows for
484             # lazy cleanup, which may eliminate thrashing in some situations.
485              
486             sub _poe_sidecar_eject {
487 4     4   992962 my ($kernel, $heap) = @_[KERNEL, HEAP];
488 4 50       10 _poe_wipe_sidecars($heap) unless scalar keys %{$heap->{requests}};
  4         36  
489             }
490              
491             # Internal helper sub to synchronously wipe out all sidecars.
492              
493             sub _poe_wipe_sidecars {
494 4     4   14 my $heap = shift;
495              
496 4 50       10 return unless @{$heap->{sidecar_ring}};
  4         23  
497              
498 4         8 foreach my $sidecar (@{$heap->{sidecar_ring}}) {
  4         17  
499 4         36 $sidecar->kill(-9);
500             }
501              
502 4         1307 $heap->{sidecar} = {};
503 4         26 $heap->{sidecar_id} = {};
504 4         25 $heap->{sidecar_ring} = [];
505             }
506              
507             sub unpack_addr {
508 6     6 1 1338 my ($self, $address_rec) = @_;
509              
510             # [rt.cpan.org 76314] Untaint the address.
511 6         34 my ($input_addr) = ($address_rec->{addr} =~ /\A(.*)\z/s);
512              
513 6         81 my ($error, $address, $port) = (
514             (getnameinfo $input_addr, NI_NUMERICHOST | NI_NUMERICSERV)[0,1]
515             );
516              
517 6 50       22 return if $error;
518 6 50       13 return($address, $port) if wantarray();
519 6         47 return $address;
520             }
521              
522             1;
523              
524             __END__