File Coverage

blib/lib/Net/BitTorrent/DHT.pm
Criterion Covered Total %
statement 25 27 92.5
branch n/a
condition n/a
subroutine 9 9 100.0
pod n/a
total 34 36 94.4


line stmt bran cond sub pod time code
1             package Net::BitTorrent::DHT;
2 2     2   13706 use Moose;
  2         958586  
  2         20  
3 2     2   14933 use Moose::Util;
  2         7  
  2         22  
4 2     2   2187 use AnyEvent;
  2         6345  
  2         75  
5 2     2   2332 use AnyEvent::Socket qw[];
  2         62199  
  2         73  
6 2     2   2157 use AnyEvent::HTTP;
  2         37813  
  2         253  
7 2         1326 use Socket qw[/SOCK_/ /F_INET/ inet_aton /sockaddr_in/ inet_ntoa
8             SOL_SOCKET SO_REUSEADDR
9 2     2   25 ];
  2         3  
10 2     2   2102 use Net::BitTorrent::Protocol qw[:bencode :compact];
  2         39490  
  2         455  
11 2     2   1138 use Bit::Vector;
  2         5402  
  2         131  
12 2     2   1355 use Net::BitTorrent::DHT::Node;
  0            
  0            
13             use Net::BitTorrent::DHT::RoutingTable;
14             use 5.10.0;
15             our $VERSION = 'v1.0.2';
16             eval $VERSION;
17              
18             # Stub
19             sub BUILD {1}
20             after 'BUILD' => sub {
21             my ($s, $a) = @_;
22              
23             # Hey! Open up!
24             $s->udp6;
25             $s->udp4;
26             };
27             #
28             for my $type (qw[requests replies]) {
29             for my $var (qw[count length]) {
30             my $attr = join '_', '', 'recv_invalid', $var;
31             has $attr => (isa => 'Int',
32             is => 'ro',
33             init_arg => undef,
34             traits => ['Counter'],
35             handles => {'_inc' . $attr => 'inc'},
36             default => 0
37             );
38             for my $dir (qw[recv send]) {
39             my $attr = join '_', '', $dir, $type, $var;
40             has $attr => (isa => 'Int',
41             is => 'ro',
42             init_arg => undef,
43             traits => ['Counter'],
44             handles => {'_inc' . $attr => 'inc'},
45             default => 0
46             );
47             }
48             }
49             }
50             has nodeid => (isa => 'Bit::Vector',
51             is => 'ro',
52             builder => '_build_nodeid'
53             );
54              
55             sub _build_nodeid {
56             my $s = shift;
57              
58             # TODO: Base on DHT security extention: http://libtorrent.org/dht_sec.html
59             AnyEvent::HTTP::http_get(
60             'http://icanhazip.com',
61             sub {
62             chomp $_[0];
63             $s->nodeid->from_Hex(
64             unpack 'H*', join '',
65             AnyEvent::Socket::parse_address($_[0]), # Ext ipv4 address
66             (map { chr rand 16 } 1 .. 16)
67             );
68             }
69             );
70              
71             # alt services:
72             # myip.dnsomatic.com
73             # ipecho.net/plain
74             # ipv4.icanhazip.com
75             # bot.whatismyipaddress.com
76             # www.myip.ru
77             return Bit::Vector->new(160);
78             }
79             #
80             sub send {
81             my ($s, $node, $packet, $reply) = @_;
82             if (defined $s->ip_filter) {
83             my $rule = $s->ip_filter->is_banned($node->host);
84             if (defined $rule) {
85             $s->trigger_ip_filter(
86             {protocol => ($node->ipv6 ? 'udp6' : 'udp4'),
87             severity => 'debug',
88             event => 'ip_filter',
89             address => [$node->host, $node->port],
90             rule => $rule,
91             message => 'Outgoing data was blocked by ipfilter'
92             }
93             );
94             return $s->routing_table->del_node($node);
95             }
96             }
97             my $sock
98             = $node->ipv6 && $s->has_udp6_sock ? $s->udp6_sock
99             : $s->has_udp4_sock ? $s->udp4_sock
100             : ();
101             my $sent = $sock ? send $sock, $packet, 0, $node->sockaddr : return;
102             if ($reply) {
103             $s->_inc_send_replies_count;
104             $s->_inc_send_replies_length($sent);
105             }
106             else {
107             $s->_inc_send_requests_count;
108             $s->_inc_send_requests_length($sent);
109             }
110             return $sent;
111             }
112             #
113             has ipv4_routing_table => (isa => 'Net::BitTorrent::DHT::RoutingTable',
114             is => 'ro',
115             lazy_build => 1,
116             handles => {
117             ipv4_add_node => 'add_node',
118             ipv4_buckets => 'buckets'
119             }
120             );
121             has ipv6_routing_table => (isa => 'Net::BitTorrent::DHT::RoutingTable',
122             is => 'ro',
123             lazy_build => 1,
124             handles => {
125             ipv6_add_node => 'add_node',
126             ipv6_buckets => 'buckets'
127             }
128             );
129              
130             sub _build_ipv4_routing_table {
131             Net::BitTorrent::DHT::RoutingTable->new(dht => shift);
132             }
133              
134             sub _build_ipv6_routing_table {
135             Net::BitTorrent::DHT::RoutingTable->new(dht => shift);
136             }
137              
138             sub add_node {
139             my ($s, $n) = @_;
140             AnyEvent::Socket::resolve_sockaddr(
141             $n->[0],
142             $n->[1],
143             0, undef, undef,
144             sub {
145             my $sockaddr = $_[0]->[3];
146             return if !$sockaddr;
147             $n
148             = blessed $n ? $n
149             : Net::BitTorrent::DHT::Node->new(
150             host => $n->[0],
151             port => $n->[1],
152             sockaddr => $sockaddr,
153             routing_table => (
154             length $sockaddr == 28 ? $s->ipv6_routing_table
155             : $s->ipv4_routing_table
156             )
157             );
158             ($n->ipv6 ?
159             $s->ipv6_routing_table->add_node($n)
160             : $s->ipv4_routing_table->add_node($n)
161             )->find_node($s->nodeid)
162             if !$s->nodeid->is_empty;
163             }
164             );
165             }
166             after 'BUILD' => sub {
167             my ($self, $args) = @_;
168             return if !defined $args->{'boot_nodes'};
169             $self->add_node($_) for @{$args->{'boot_nodes'}};
170             };
171             #
172             for my $type (qw[get_peers announce_peer find_node]) {
173             has "_${type}_quests" => (isa => 'ArrayRef[Ref]',
174             is => 'ro',
175             init_arg => undef,
176             traits => ['Array'],
177             handles => {
178             "add_${type}_quest" => 'push',
179             "${type}_quests" => 'elements',
180             "get_${type}_quest" => 'get',
181             "grep_${type}_quests" => 'grep',
182             "map_${type}_quests" => 'map'
183             },
184             default => sub { [] }
185             );
186             after "add_${type}_quest" => sub {
187             Scalar::Util::weaken $_[0]->{"_${type}_quests"}->[-1];
188             };
189             }
190             #
191             sub get_peers {
192             my ($self, $infohash, $code) = @_;
193             Scalar::Util::weaken $self;
194             my $quest = [
195             $infohash,
196             $code,
197             [],
198             AE::timer(
199             0,
200             0.25 * 60,
201             sub {
202             return if !$self;
203             for my $rt ($self->ipv6_routing_table,
204             $self->ipv4_routing_table)
205             { for my $node (@{$rt->nearest_bucket($infohash)->nodes}) {
206             $node->get_peers($infohash);
207             }
208             }
209             }
210             )
211             ];
212             $self->add_get_peers_quest($quest);
213             return $quest;
214             }
215              
216             sub announce_peer {
217             my ($self, $infohash, $port, $code) = @_;
218             Scalar::Util::weaken $self;
219             my $quest = [
220             $infohash,
221             $code, $port,
222             [],
223             AE::timer(
224             10,
225             0.25 * 60,
226             sub {
227             return if !$self;
228             for my $rt ($self->ipv6_routing_table,
229             $self->ipv4_routing_table)
230             { for my $node (@{$rt->nearest_bucket($infohash)->nodes}) {
231             $node->announce_peer($infohash, $port);
232             }
233             }
234             }
235             )
236             ];
237             $self->add_announce_peer_quest($quest);
238             return $quest;
239             }
240              
241             sub find_node {
242             my ($self, $target, $code) = @_;
243             Scalar::Util::weaken $self;
244             my $quest = [
245             $target, $code,
246             [],
247             AE::timer(
248             0,
249             0.25 * 60,
250             sub {
251             return if !$self;
252             for my $rt ($self->ipv6_routing_table,
253             $self->ipv4_routing_table)
254             { for my $node (@{$rt->nearest_bucket($target)->nodes}) {
255             $node->find_node($target);
256             }
257             }
258             }
259             )
260             ];
261             $self->add_find_node_quest($quest);
262             return $quest;
263             }
264             #
265             sub _on_udp6_in {
266             my ($self, $sock, $sockaddr, $host, $port, $data, $flags) = @_;
267             my $packet = bdecode $data;
268             if ( !$packet
269             || !ref $packet
270             || ref $packet ne 'HASH'
271             || !keys %$packet)
272             { $self->_inc_recv_invalid_count;
273             $self->_inc_recv_invalid_length(length $data);
274             return;
275             }
276             my $node = $self->ipv6_routing_table->find_node_by_sockaddr($sockaddr);
277             if (!defined $node) {
278             $node =
279             Net::BitTorrent::DHT::Node->new(
280             host => $host,
281             port => $port,
282             routing_table => $self->ipv6_routing_table,
283             sockaddr => $sockaddr
284             );
285             }
286             }
287              
288             sub _on_udp4_in {
289             my ($self, $sock, $sockaddr, $host, $port, $data, $flags) = @_;
290             my $packet = bdecode $data;
291             if ( !$packet
292             || !ref $packet
293             || ref $packet ne 'HASH'
294             || !keys %$packet
295             || !defined $packet->{'y'})
296             { $self->_inc_recv_invalid_count;
297             $self->_inc_recv_invalid_length(length $data);
298             return;
299             }
300             my $node = $self->ipv4_routing_table->find_node_by_sockaddr($sockaddr);
301             if (!defined $node) {
302             $node =
303             Net::BitTorrent::DHT::Node->new(
304             host => $host,
305             port => $port,
306             routing_table => $self->ipv4_routing_table,
307             sockaddr => $sockaddr
308             );
309             }
310              
311             # Basic identity checks
312             # TODO - if v is set, make sure it matches
313             # - make note of changes in nodeid/sockaddr combinations
314             return $node->routing_table->del_node($node)
315             if $node->has_nodeid # Wait, this is me!
316             && ($node->nodeid->Lexicompare($self->nodeid) == 0);
317             $node->touch;
318             #
319             if ($packet->{'y'} eq 'r') {
320             if (defined $packet->{'r'}) {
321             if ($node->is_expecting($packet->{'t'})) {
322             $self->_inc_recv_replies_count;
323             $self->_inc_recv_replies_length(length $data);
324             $node->_v($packet->{'v'})
325             if !$node->has_v && defined $packet->{'v'};
326             my $req = $node->del_request($packet->{'t'}); # For future ref
327             $req->{'cb'}->($packet, $host, $port)
328             if defined $req->{'cb'};
329             my $type = $req->{'type'};
330             $node->_set_nodeid(Bit::Vector->new_Hex(
331             160, unpack 'H*', $packet->{'r'}{'id'}
332             )
333             ) if !$node->has_nodeid; # Adds node to router table
334             if ($type eq 'ping') {
335             }
336             elsif ($type eq 'find_node') {
337             my ($quest) = $self->grep_find_node_quests(
338             sub {
339             defined $_
340             && $req->{'target'}->equal($_->[0]);
341             }
342             );
343             return if !defined $quest;
344             my @nodes
345             = map { uncompact_ipv4($_) }
346             ref $packet->{'r'}{'nodes'}
347             ?
348             @{$packet->{'r'}{'nodes'}}
349             : $packet->{'r'}{'nodes'};
350             {
351             my %seen = ();
352             @{$quest->[2]}
353             = grep { !$seen{$_->[0]}{$_->[1]}++ }
354             @{$quest->[2]}, @nodes;
355             }
356             $self->ipv4_add_node($_) for @nodes;
357             $quest->[1]->($quest->[0], $node, \@nodes);
358             }
359             elsif ($type eq 'get_peers') {
360              
361             # TODO - store token by id
362             if (!( defined $packet->{'r'}{'nodes'}
363             || defined $packet->{'r'}{'values'}
364             )
365             )
366             { # Malformed packet
367             die '...';
368             }
369             if (defined $packet->{'r'}{'nodes'}) {
370             for my $new_node ( # XXX - may be ipv6
371             uncompact_ipv4($packet->{'r'}{'nodes'})
372             )
373             { $new_node = $self->ipv4_add_node($new_node);
374             $new_node->get_peers($req->{'info_hash'})
375             if $new_node;
376             }
377             if (defined $packet->{'r'}{'values'}) { # peers
378             my ($quest) = $self->grep_get_peers_quests(
379             sub {
380             defined $_
381             && $req->{'info_hash'}
382             ->equal($_->[0]);
383             }
384             );
385             return if !defined $quest;
386             my @peers
387             = map { uncompact_ipv4($_) }
388             ref $packet->{'r'}{'values'}
389             ?
390             @{$packet->{'r'}{'values'}}
391             : $packet->{'r'}{'values'};
392             {
393             my %seen = ();
394             @{$quest->[2]}
395             = grep { !$seen{$_->[0]}{$_->[1]}++ }
396             @{$quest->[2]}, @peers;
397             }
398             $quest->[1]
399             ->($req->{'info_hash'}, $node, \@peers);
400             }
401             if (defined $packet->{'r'}{'token'})
402             { # for announce_peer
403             $node->_set_announce_peer_token_in(
404             $req->{'info_hash'}->to_Hex,
405             $packet->{'r'}{'token'});
406             }
407             }
408             }
409             elsif ($type eq 'announce_peer') {
410             my ($quest) = $self->grep_announce_peer_quests(
411             sub {
412             defined $_
413             && $req->{'info_hash'}->equal($_->[0]);
414             }
415             );
416             return if !defined $quest;
417             push @{$quest->[3]}, [$node->host, $node->port];
418             $quest->[1]->($req->{'info_hash'}, $node, $quest->[2]);
419             $node->get_prev_get_peers(0)
420             if # seek peers sooner than we should
421             $node->defined_prev_get_peers($req->{'info_hash'});
422             }
423             else {
424             warn sprintf '%s:%d', $node->host, $node->port;
425              
426             #ddx $packet;
427             #ddx $req;
428             #...;
429             }
430             }
431             else { # A reply we are not expecting. Strange.
432             $node->inc_fail;
433             $self->_inc_recv_invalid_count;
434             $self->_inc_recv_invalid_length(length $data);
435              
436             #...;
437             }
438             }
439             }
440             elsif ($packet->{'y'} eq 'q' && defined $packet->{'a'}) {
441             $self->_inc_recv_requests_count;
442             $self->_inc_recv_requests_length(length $data);
443             my $type = $packet->{'q'};
444             $node->_set_nodeid(
445             Bit::Vector->new_Hex(160, unpack 'H*', $packet->{'a'}{'id'}))
446             if !$node->has_nodeid; # Adds node to router table
447             if ($type eq 'ping' && defined $packet->{'t'}) {
448             return $node->_reply_ping($packet->{'t'});
449             }
450             elsif ($type eq 'get_peers'
451             && defined $packet->{'a'}{'info_hash'})
452             { return
453             $node->_reply_get_peers(
454             $packet->{'t'},
455             Bit::Vector->new_Hex(160, unpack 'H*',
456             $packet->{'a'}{'info_hash'}
457             )
458             );
459             }
460             elsif ($type eq 'find_node'
461             && defined $packet->{'a'}{'target'})
462             { return
463             $node->_reply_find_node(
464             $packet->{'t'},
465             Bit::Vector->new_Hex(160, unpack 'H*',
466             $packet->{'a'}{'target'}
467             )
468             );
469             }
470             elsif ($type eq 'announce_peer'
471             && defined $packet->{'a'}{'info_hash'})
472             { return
473             $node->_reply_announce_peer(
474             $packet->{'t'},
475             Bit::Vector->new_Hex(160, unpack 'H*',
476             $packet->{'a'}{'info_hash'}
477             ),
478             $packet->{'a'},
479             );
480             }
481             else {
482             die '...';
483             }
484             }
485             elsif ($packet->{'y'} eq 'q' && defined $packet->{'a'}) {
486             warn sprintf 'Error from %s:%d', $node->host, $node->port;
487              
488             #use Data::Dump;
489             #ddx $packet;
490             }
491             else {
492             #use Data::Dump;
493             #warn sprintf '%s:%d', $node->host, $node->port;
494             #ddx $packet;
495             #ddx $data;
496             #...;
497             # TODO: ID checks against $packet->{'a'}{'id'}
498             }
499             }
500              
501             sub dump_ipv4_buckets {
502             my @return = _dump_buckets($_[0], $_[0]->ipv4_routing_table());
503             return wantarray ? @return : sub { say $_ for @_ }
504             ->(@return);
505             }
506              
507             sub dump_ipv6_buckets {
508             my @return = _dump_buckets($_[0], $_[0]->ipv6_routing_table());
509             return wantarray ? @return : sub { say $_ for @_ }
510             ->(@return);
511             }
512              
513             sub _dump_buckets {
514             my ($self, $routing_table) = @_;
515             my @return = sprintf 'Num buckets: %d. My DHT ID: %s',
516             $routing_table->count_buckets, $self->nodeid->to_Hex;
517             my ($x, $t_primary, $t_backup) = (0, 0, 0);
518             for my $bucket (@{$routing_table->buckets}) {
519             push @return, sprintf 'Bucket %s: %s (replacement cache: %d)',
520             $x++, $bucket->floor->to_Hex, $bucket->count_backup_nodes;
521             for my $node (@{$bucket->nodes}) {
522             push @return,
523             sprintf ' %s %s:%d fail:%d seen:%d age:%s ver:%s',
524             $node->nodeid->to_Hex, $node->host,
525             $node->port, $node->fail || 0, $node->seen,
526             __duration(time - $node->birth), $node->v || '?';
527             }
528             $t_primary += $bucket->count_nodes;
529             $t_backup += $bucket->count_backup_nodes;
530             }
531             push @return, sprintf 'Total peers: %d (in replacement cache %d)',
532             $t_primary + $t_backup, $t_backup;
533             push @return, sprintf 'Outstanding add nodes: %d',
534             scalar $routing_table->outstanding_add_nodes;
535             push @return,
536             sprintf
537             'Received: %d requests (%s), %d replies (%s), %d invalid (%s)',
538             $self->_recv_requests_count,
539             __data($self->_recv_requests_length),
540             $self->_recv_replies_count,
541             __data($self->_recv_replies_length),
542             $self->_recv_invalid_count,
543             __data($self->_recv_invalid_length);
544             push @return, sprintf 'Sent: %d requests (%s), %d replies (%s)',
545             $self->_send_requests_count,
546             __data($self->_send_requests_length),
547             $self->_send_replies_count,
548             __data($self->_send_replies_length);
549             return @return;
550             }
551             has 'port' => (is => 'ro',
552             isa => 'Int|ArrayRef[Int]',
553             builder => '_build_port',
554             writer => '_set_port'
555             );
556              
557             sub _build_port {
558             0; # Let the system pick
559             }
560             my %_sock_types = (4 => '0.0.0.0', 6 => '::');
561             for my $ipv (keys %_sock_types) {
562             has 'udp'
563             . $ipv => (is => 'ro',
564             init_arg => undef,
565             isa => 'Maybe[Object]',
566             lazy_build => 1,
567             writer => '_set_udp' . $ipv
568             );
569             has 'udp'
570             . $ipv
571             . '_sock' => (is => 'ro',
572             init_arg => undef,
573             isa => 'GlobRef',
574             lazy_build => 1,
575             weak_ref => 1,
576             writer => '_set_udp' . $ipv . '_sock'
577             );
578             has 'udp'
579             . $ipv
580             . '_host' => (is => 'ro',
581             isa => 'Str',
582             default => $_sock_types{$ipv},
583             writer => '_set_udp' . $ipv . '_host'
584             );
585             }
586             #
587             has 'ip_filter' => (is => 'ro',
588             isa => 'Maybe[Config::IPFilter]',
589             init_arg => undef,
590             builder => '_build_ip_filter'
591             );
592              
593             sub _build_ip_filter {
594             return eval('require Config::IPFilter;') ? Config::IPFilter->new() : ();
595             }
596              
597             sub _build_udp6 {
598             my $s = shift;
599             my ($server, $actual_socket, $actual_host, $actual_port);
600             for my $port (ref $s->port ? @{$s->port} : $s->port) {
601             $server = server(
602             $s->udp6_host,
603             $port,
604             sub { $s->_on_udp6_in(@_); },
605             sub {
606             ($actual_socket, $actual_host, $actual_port) = @_;
607              
608             #if ($self->port != $port) { ...; }
609             $s->_set_udp6_sock($actual_socket);
610             $s->_set_udp6_host($actual_host);
611             $s->_set_port($actual_port);
612             },
613             'udp'
614             );
615             last if defined $server;
616             }
617             if ($server) {
618             $s->trigger_listen_success(
619             {port => $actual_port,
620             protocol => 'udp6',
621             severity => 'debug',
622             event => 'listen_success',
623             message => sprintf
624             'Bound UDP port %d to the outside world over IPv6',
625             $actual_port
626             }
627             );
628             }
629             else {
630             $s->trigger_listen_failure(
631             {port => $s->port,
632             protocol => 'udp6',
633             severity => 'fatal',
634             event => 'listen_failure',
635             message =>
636             'Failed to bind UDP port for the outside world over IPv6'
637             }
638             );
639             }
640             return $server;
641             }
642              
643             sub _build_udp4 {
644             my $s = shift;
645             my ($server, $actual_socket, $actual_host, $actual_port);
646             for my $port (ref $s->port ? @{$s->port} : $s->port) {
647             $server = server(
648             $s->udp4_host,
649             $port,
650             sub { $s->_on_udp4_in(@_); },
651             sub {
652             ($actual_socket, $actual_host, $actual_port) = @_;
653              
654             #if ($self->port != $port) { ...; }
655             $s->_set_udp4_sock($actual_socket);
656             $s->_set_udp4_host($actual_host);
657             $s->_set_port($actual_port);
658             },
659             'udp'
660             );
661             last if defined $server;
662             }
663             if ($server) {
664             $s->trigger_listen_success(
665             {port => $actual_port,
666             protocol => 'udp4',
667             severity => 'debug',
668             event => 'listen_success',
669             message => sprintf
670             'Bound UDP port %d to the outside world over IPv4',
671             $actual_port
672             }
673             );
674             }
675             else {
676             $s->trigger_listen_failure(
677             {port => $s->port,
678             protocol => 'udp4',
679             severity => 'fatal',
680             event => 'listen_failure',
681             message =>
682             'Failed to bind UDP port for the outside world over IPv4'
683             }
684             );
685             }
686             return $server;
687             }
688             around '_on_udp4_in' => sub {
689             my ($c, $s, $sock, $sockaddr, $host, $port, $data, $flags) = @_;
690             if (defined $s->ip_filter) {
691             my $rule = $s->ip_filter->is_banned($host);
692             if (defined $rule) {
693             $s->trigger_ip_filter(
694             {protocol => 'udp4',
695             severity => 'debug',
696             event => 'ip_filter',
697             address => [$host, $port],
698             rule => $rule,
699             message => 'Incoming data was blocked by ipfilter'
700             }
701             );
702             return;
703             }
704             }
705             $c->($s, $sock, $sockaddr, $host, $port, $data, $flags);
706             };
707             around '_on_udp6_in' => sub {
708             my ($c, $s, $sock, $sockaddr, $host, $port, $data, $flags) = @_;
709             my $rule = $s->ip_filter->is_banned($host);
710             if (defined $rule) {
711             $s->trigger_ip_filter(
712             {protocol => 'udp6',
713             severity => 'debug',
714             event => 'ip_filter',
715             address => [$host, $port],
716             rule => $rule,
717             message => 'Incoming data was blocked by ipfilter'
718             }
719             );
720             return;
721             }
722             $c->($s, $sock, $sockaddr, $host, $port, $data, $flags);
723             };
724              
725             # Callback system
726             sub _build_callback_no_op {
727             sub {1}
728             }
729             has "on_$_" => (isa => 'CodeRef',
730             is => 'ro',
731             traits => ['Code'],
732             handles => {"trigger_$_" => 'execute_method'},
733             lazy_build => 1,
734             builder => '_build_callback_no_op',
735             clearer => "_no_$_",
736             weak_ref => 1
737             )
738             for qw[
739             listen_failure listen_success
740             ];
741              
742             sub server ($$&;&$) {
743             my ($host, $port, $callback, $prepare, $proto) = @_;
744             $proto //= 'tcp';
745             my $sockaddr = Net::BitTorrent::DHT::sockaddr($host, $port) or return;
746             my $type = length $sockaddr == 16 ? PF_INET : PF_INET6;
747             socket my ($socket), $type,
748             $proto eq 'udp' ? SOCK_DGRAM : SOCK_STREAM, getprotobyname($proto)
749             or return;
750              
751             # - What is the difference between SO_REUSEADDR and SO_REUSEPORT?
752             # [http://www.unixguide.net/network/socketfaq/4.11.shtml]
753             # SO_REUSEPORT is undefined on Win32 and pre-2.4.15 Linux distros.
754             setsockopt $socket, SOL_SOCKET, SO_REUSEADDR, pack('l', 1)
755             or return
756             if $^O !~ m[Win32];
757             return if !bind $socket, $sockaddr;
758             my $listen = 8;
759             if (defined $prepare) {
760             my ($_port, $packed_ip)
761             = Net::BitTorrent::DHT::unpack_sockaddr(getsockname $socket);
762             my $return = $prepare->($socket, paddr2ip($packed_ip), $_port);
763             $listen = $return if defined $return;
764             }
765             require AnyEvent::Util;
766             AnyEvent::Util::fh_nonblocking $socket, 1;
767             listen $socket, $listen or return if $proto ne 'udp';
768             return AE::io(
769             $socket, 0,
770             $proto eq 'udp' ?
771             sub {
772             my $flags = 0;
773             if ($socket
774             && (my $peer = recv $socket, my ($data), 16 * 1024, $flags))
775             { my ($service, $host)
776             = Net::BitTorrent::DHT::unpack_sockaddr($peer);
777             $callback->($socket, $peer, paddr2ip($host), $service,
778             $data, $flags
779             );
780             }
781             }
782             : sub {
783             while ($socket
784             && (my $peer = accept my ($fh), $socket))
785             { my ($service, $host)
786             = Net::BitTorrent::DHT::unpack_sockaddr($peer);
787             $callback->($fh, $peer, paddr2ip($host), $service);
788             }
789             }
790             );
791             }
792              
793             sub paddr2ip ($) {
794             return inet_ntoa($_[0]) if length $_[0] == 4; # ipv4
795             return inet_ntoa($1)
796             if length $_[0] == 16
797             && $_[0] =~ m[^\0{10}\xff{2}(.{4})$]; # ipv4
798             return unless length($_[0]) == 16;
799             my @hex = (unpack('n8', $_[0]));
800             $hex[9] = $hex[7] & 0xff;
801             $hex[8] = $hex[7] >> 8;
802             $hex[7] = $hex[6] & 0xff;
803             $hex[6] >>= 8;
804             my $return = sprintf '%X:%X:%X:%X:%X:%X:%D:%D:%D:%D', @hex;
805             $return =~ s|(0+:)+|:|x;
806             $return =~ s|^0+ ||x;
807             $return =~ s|^:+ |::|x;
808             $return =~ s|::0+ |::|x;
809             $return =~ s|^::(\d+):(\d+):(\d+):(\d+)|$1.$2.$3.$4|x;
810             return $return;
811             }
812              
813             sub __duration ($) {
814             my %dhms = (d => int($_[0] / (24 * 60 * 60)),
815             h => ($_[0] / (60 * 60)) % 24,
816             m => ($_[0] / 60) % 60,
817             s => $_[0] % 60
818             );
819             return join ' ', map { $dhms{$_} ? $dhms{$_} . $_ : () } sort keys %dhms;
820             }
821              
822             sub unpack_sockaddr ($) {
823             my ($packed_host) = @_;
824             return length $packed_host == 28 ?
825             (unpack('SnLa16L', $packed_host))[1, 3]
826             : unpack_sockaddr_in($packed_host);
827             }
828              
829             sub sockaddr ($$) {
830             my $resolver = AE::cv();
831             AnyEvent::Socket::resolve_sockaddr(
832             $_[0],
833             $_[1],
834             0, undef, undef,
835             sub {
836             $resolver->send($_[0]->[3]);
837             }
838             );
839             return $resolver->recv();
840             }
841              
842             sub __data($) {
843             $_[0] >= 1073741824 ? sprintf('%0.2f GB', $_[0] / 1073741824)
844             : $_[0] >= 1048576 ? sprintf('%0.2f MB', $_[0] / 1048576)
845             : $_[0] >= 1024 ? sprintf('%0.2f KB', $_[0] / 1024)
846             : $_[0] . ' bytes';
847             }
848             1;
849              
850             =pod
851              
852             =head1 NAME
853              
854             Net::BitTorrent::DHT - Kademlia-like DHT Node for BitTorrent
855              
856             =head1 Synopsis
857              
858             use Net::BitTorrent::DHT;
859             use AnyEvent;
860             use Bit::Vector;
861             # Standalone node with user-defined port and boot_nodes
862             my $dht = Net::BitTorrent::DHT->new(
863             port => [1337 .. 1340, 0],
864             boot_nodes =>
865             [['router.bittorrent.com', 6881], ['router.utorrent.com', 6881]]
866             );
867              
868             my $peer_quest
869             = $dht->get_peers(Bit::Vector->new_Hex('ab97a7bca78f2628380e6609a8241a7fb02aa981'), \&dht_cb);
870              
871             # tick, tick, tick, ...
872             AnyEvent->condvar->recv;
873              
874             sub dht_cb {
875             my ($infohash, $node, $peers) = @_;
876             printf "We found %d peers for %s from %s:%d via DHT\n\t%s\n",
877             scalar(@$peers),
878             $infohash->to_Hex, $node->host, $node->port,
879             join ', ', map { sprintf '%s:%d', @$_ } @$peers;
880             }
881              
882             =head1 Description
883              
884             BitTorrent uses a "distributed sloppy hash table" (DHT) for storing peer
885             contact information for "trackerless" torrents. In effect, each peer becomes a
886             tracker. The protocol is based on L<Kademila|/Kademlia> and is implemented
887             over UDP.
888              
889             =head1 Methods
890              
891             L<Net::BitTorrent::DHT|Net::BitTorrent::DHT>'s API is simple but powerful.
892             ...well, I think so anyway.
893              
894             =head1 Net::BitTorrent::DHT->new( )
895              
896             The constructor accepts a number different arguments which all greatly affect
897             the function of your DHT node. Any combination of the following arguments may
898             be used during construction.
899              
900             For brevity, the following examples assume you are building a
901             L<standalone node|Net::BitTorrent::DHT::Standalone> (for reasearch, etc.).
902              
903             =head2 Net::BitTorrent::DHT->new( nodeid => ... )
904              
905             During construction, our local DHT nodeID can be set during construction. This
906             is mostly useful when creating a
907             L<standalone DHT node|Net::BitTorrent::DHT::Standalone>.
908              
909             use Net::BitTorrent::DHT;
910             # Bit::Vector object
911             use Bit::Vector;
912             my $node_c = Net::BitTorrent::DHT->new(
913             nodeid => Bit::Vector->new_Hex( 160, 'ABCD' x 10 )
914             );
915             # A SHA1 digest
916             use Digest::SHA;
917             my $node_d = Net::BitTorrent::DHT->new(
918             nodeid => Bit::Vector->new_Hex( 160, Digest::SHA::sha1( $possibly_random_value ) )
919             );
920              
921             Note that storing and reusing DHT nodeIDs over a number of sessions may seem
922             advantagious (as if you had a "reserved parking place" in the DHT network) but
923             will likely not improve performance as unseen nodeIDs are removed from remote
924             routing tables after a half hour.
925              
926             NodeIDs, are 160-bit integers.
927              
928             =head2 Net::BitTorrent::DHT->new( port => ... )
929              
930             Opens a specific UDP port number to the outside world on both IPv4 and IPv6.
931              
932             use Net::BitTorrent::DHT;
933             # A single possible port
934             my $node_a = Net::BitTorrent::DHT->new( port => 1123 );
935             # A list of ports
936             my $node_b = Net::BitTorrent::DHT->new( port => [1235 .. 9875] );
937              
938             Note that when handed a list of ports, they are each tried until we are able
939             to bind to the specific port.
940              
941             =head1 Net::BitTorrent::DHT->find_node( $target, $callback )
942              
943             This method asks for remote nodes with nodeIDs closer to our target. As the
944             remote nodes respond, the callback is called with the following arguments:
945              
946             =over
947              
948             =item * target
949              
950             This is the target nodeid. This is useful when you've set the same callback
951             for multiple, concurrent C<find_node( )> L<quest|/"Quests and Callbacks">.
952              
953             Targets are 160-bit L<Bit::Vector|Bit::Vector> objects.
954              
955             =item * node
956              
957             This is a blessed object. TODO.
958              
959             =item * nodes
960              
961             This is a list of ip:port combinations the remote node claims are close to our
962             target.
963              
964             =back
965              
966             A single C<find_node> L<quest|Net::BitTorrent::Notes/"Quests and Callbacks">
967             is an array ref which contains the following data:
968              
969             =over
970              
971             =item * target
972              
973             This is the target nodeID.
974              
975             =item * coderef
976              
977             This is the callback triggered as we locate new peers.
978              
979             =item * nodes
980              
981             This is a list of nodes we have announced to so far.
982              
983             =item * timer
984              
985             This is an L<AnyEvent|AnyEvent> timer which is triggered every few minutes.
986              
987             Don't modify this.
988              
989             =back
990              
991             =head1 Net::BitTorrent::DHT->get_peers( $infohash, $callback )
992              
993             This method initiates a search for peers serving a torrent with this infohash.
994             As they are found, the callback is called with the following arguments:
995              
996             =over
997              
998             =item * infohash
999              
1000             This is the infohash related to these peers. This is useful when you've set
1001             the same callback for multiple, concurrent C<get_peers( )> quests. This is a
1002             160-bit L<Bit::Vector|Bit::Vector> object.
1003              
1004             =item * node
1005              
1006             This is a blessed object. TODO.
1007              
1008             =item * peers
1009              
1010             This is an array ref of peers sent to us by aforementioned remote node.
1011              
1012             =back
1013              
1014             A single C<get_peers> L<quest|Net::BitTorrent::Notes/"Quests and Callbacks">
1015             is an array ref which contains the following data:
1016              
1017             =over
1018              
1019             =item * infohash
1020              
1021             This is the infohash related to these peers. This is a 160-bit
1022             L<Bit::Vector|Bit::Vector> object.
1023              
1024             =item * coderef
1025              
1026             This is the callback triggered as we locate new peers.
1027              
1028             =item * peers
1029              
1030             This is a compacted list of all peers found so far. This is probably more
1031             useful than the list passed to the callback.
1032              
1033             =item * timer
1034              
1035             This is an L<AnyEvent|AnyEvent> timer which is triggered every five minutes.
1036             When triggered, the node requests new peers from nodes in the bucket nearest
1037             to the infohash.
1038              
1039             Don't modify this.
1040              
1041             =back
1042              
1043             =head1 Net::BitTorrent::DHT->B<announce_peer>( $infohash, $port, $callback )
1044              
1045             This method announces that the peer controlling the querying node is
1046             downloading a torrent on a port. These outgoing queries are sent to nodes
1047             'close' to the target infohash. As the remote nodes respond, the callback is
1048             called with the following arguments:
1049              
1050             =over
1051              
1052             =item * infohash
1053              
1054             This is the infohash related to this announcment. This is useful when you've
1055             set the same callback for multiple, concurrent C<announce_peer( )>
1056             L<quest|/"Quests and Callbacks">. Infohashes are 160-bit
1057             L<Bit::Vector|Bit::Vector> objects.
1058              
1059             =item * port
1060              
1061             This is port you defined above.
1062              
1063             =item * node
1064              
1065             This is a blessed object. TODO.
1066              
1067             =back
1068              
1069             A single C<announce_peer> L<quest|/"Quests and Callbacks"> is an array ref
1070             which contains the following data:
1071              
1072             =over
1073              
1074             =item * infohash
1075              
1076             This is the infohash related to these peers. This is a 160-bit
1077             L<Bit::Vector|Bit::Vector> object.
1078              
1079             =item * coderef
1080              
1081             This is the callback triggered as we locate new peers.
1082              
1083             =item * port
1084              
1085             This is port you defined above.
1086              
1087             =item * nodes
1088              
1089             This is a list of nodes we have announced to so far.
1090              
1091             =item * timer
1092              
1093             This is an L<AnyEvent|AnyEvent> timer which is triggered every few minutes.
1094              
1095             Don't modify this.
1096              
1097             =back
1098              
1099             C<announce_peer> queries require a token sent in reply to a C<get_peers> query
1100             so they should be used together.
1101              
1102             =for meditation
1103             Should I automatically send get_peers queries before an announce if the token
1104             is missing?
1105              
1106             use Net::BitTorrent::DHT;
1107             my $node = Net::BitTorrent::DHT->new( );
1108             my $quest_a = $dht->announce_peer(Bit::Vector->new_Hex('A' x 40), 6881, \&dht_cb);
1109             my $quest_b = $dht->announce_peer(Bit::Vector->new_Hex('1' x 40), 9585, \&dht_cb);
1110              
1111             sub dht_cb {
1112             my ($infohash, $port, $node) = @_;
1113             say sprintf '%s:%d now knows we are serving %s on port %d',
1114             $node->host, $node->port, $infohash->to_Hex, $port;
1115             }
1116              
1117             =head1 Net::BitTorrent::DHT->dump_ipv4_buckets( )
1118              
1119             This is a quick utility method which returns or prints (depending on context)
1120             a list of the IPv4-based routing table's bucket structure.
1121              
1122             use Net::BitTorrent::DHT;
1123             my $node = Net::BitTorrent::DHT->new( );
1124             # After some time has passed...
1125             $node->dump_ipv4_buckets; # prints to STDOUT with say
1126             my @dump = $node->dump_ipv4_buckets; # returns list of lines
1127              
1128             =head1 Net::BitTorrent::DHT->dump_ipv6_buckets( )
1129              
1130             This is a quick utility method which returns or prints (depending on context)
1131             a list of the IPv6-based routing table's bucket structure.
1132              
1133             use Net::BitTorrent::DHT;
1134             my $node = Net::BitTorrent::DHT->new( );
1135             # After some time has passed...
1136             $node->dump_ipv6_buckets; # prints to STDOUT with say
1137             my @dump = $node->dump_ipv6_buckets; # returns list of lines
1138              
1139             =head1 Author
1140              
1141             Sanko Robinson <sanko@cpan.org> - http://sankorobinson.com/
1142              
1143             CPAN ID: SANKO
1144              
1145             =head1 License and Legal
1146              
1147             Copyright (C) 2008-2014 by Sanko Robinson <sanko@cpan.org>
1148              
1149             This program is free software; you can redistribute it and/or modify it under
1150             the terms of
1151             L<The Artistic License 2.0|http://www.perlfoundation.org/artistic_license_2_0>.
1152             See the F<LICENSE> file included with this distribution or
1153             L<notes on the Artistic License 2.0|http://www.perlfoundation.org/artistic_2_0_notes>
1154             for clarification.
1155              
1156             When separated from the distribution, all original POD documentation is
1157             covered by the
1158             L<Creative Commons Attribution-Share Alike 3.0 License|http://creativecommons.org/licenses/by-sa/3.0/us/legalcode>.
1159             See the
1160             L<clarification of the CCA-SA3.0|http://creativecommons.org/licenses/by-sa/3.0/us/>.
1161              
1162             Neither this module nor the L<Author|/Author> is affiliated with BitTorrent,
1163             Inc.
1164              
1165             =cut