File Coverage

blib/lib/Net/Async/TransferFD.pm
Criterion Covered Total %
statement 70 97 72.1
branch 8 26 30.7
condition 1 3 33.3
subroutine 16 22 72.7
pod 8 10 80.0
total 103 158 65.1


line stmt bran cond sub pod time code
1             package Net::Async::TransferFD;
2             # ABSTRACT: send handles between processes
3 1     1   130943 use strict;
  1         2  
  1         28  
4 1     1   5 use warnings;
  1         2  
  1         31  
5              
6 1     1   5 use parent qw(IO::Async::Notifier);
  1         3  
  1         5  
7              
8             our $VERSION = '0.002';
9              
10             =head1 NAME
11              
12             Net::Async::TransferFD - support for transferring handles between
13             processes via socketpair
14              
15             =head1 VERSION
16              
17             version 0.002
18              
19             =head1 SYNOPSIS
20              
21             use feature qw(say);
22             my $loop = IO::Async::Loop->new;
23             my $proc = IO::Async::Process->new(
24             code => sub { ... },
25             fd3 => { via => 'socketpair' },
26             );
27             $loop->add(my $control = Net::Async::TransferFD->new(
28             handle => $proc->fd(3),
29             on_fh => sub {
30             my $h = shift;
31             say "New handle $h - " . join '', <$h>;
32             }
33             ));
34             $control->send(\*STDIN);
35              
36             =head1 DESCRIPTION
37              
38             Uses SCM_RIGHTS to pass an open handle from one process to another.
39             Typically used to hand a network socket off to another process, for
40             example an accept loop in one process dispatching incoming connections
41             to other active processes.
42              
43             =cut
44              
45 1     1   72 use IO::Async::Handle;
  1         2  
  1         29  
46              
47 1     1   718 use Socket::MsgHdr qw(sendmsg recvmsg);
  1         1078  
  1         7  
48 1     1   70 use Socket qw(AF_UNIX SOCK_STREAM PF_UNSPEC SOL_SOCKET SCM_RIGHTS);
  1         2  
  1         61  
49 1     1   744 use curry::weak;
  1         968  
  1         28  
50 1     1   5 use Scalar::Util qw(weaken);
  1         2  
  1         49  
51 1     1   674 use Variable::Disposition qw(retain_future);
  1         438  
  1         55  
52              
53             # Not sure of a good value for this but 16 seems low enough
54             # to avoid problems, we'll split into multiple packets if
55             # we have more than this number of pending FDs to send.
56             # On linux the /proc/sys/net/core/optmem_max figure may be
57             # relevant here - it's 10240 on one test system.
58 1     1   12 use constant MAX_FD_PER_PACKET => 16;
  1         2  
  1         978  
59              
60             =head1 METHODS
61              
62             =cut
63              
64             =head2 outgoing_packet
65              
66             Convert a list of handles to a cmsghdr struct suitable for
67             transferring to another process.
68              
69             Returns the encoded cmsghdr struct.
70              
71             =cut
72              
73             sub outgoing_packet {
74 1     1 1 7 my $self = shift;
75             # FIXME presumably this 512 figure should really be calculated,
76             # also surely it'd be controllen rather than buflen?
77 1         6 my $data = pack "i" x @_, map $_->fileno, @_;
78 1         36 my $hdr = Socket::MsgHdr->new(buflen => length($data));
79 1         83 $hdr->cmsghdr(SOL_SOCKET, SCM_RIGHTS, $data);
80 1         28 $hdr
81             }
82              
83             =head2 recv_fds
84              
85             Receive packet containing FDs.
86              
87             Takes a single coderef which will be called with two
88             parameters.
89              
90             Returns $self.
91              
92             =cut
93              
94             sub recv_fds {
95 0     0 1 0 my $self = shift;
96 0         0 my $handler = shift;
97             # FIXME more magic numbers
98 0         0 my $inHdr = Socket::MsgHdr->new(buflen => 512, controllen => 512);
99             $handler->($inHdr, sub {
100 0     0   0 my ($level, $type, $data) = $inHdr->cmsghdr();
101 0         0 unpack('i*', $data);
102 0         0 });
103 0         0 $self
104             }
105              
106 8     8 0 118 sub handle { shift->{handle} }
107              
108             =head2 send_queued
109              
110             If we have any FDs queued for sending, bundle them into a packet
111             and send them over. Will close the FDs once the send is complete.
112              
113             Returns $self.
114              
115             =cut
116              
117             sub send_queued {
118 1     1 1 7016 my $self = shift;
119             # Send a single batch at a time
120 1 50       1 if(@{$self->{pending} || []}) {
  1 50       8  
121 1         2 my @chunk = splice @{$self->{pending}}, 0, MAX_FD_PER_PACKET;
  1         4  
122 1         5 my @fd = map $_->[0], @chunk;
123 1         52 my @future = map $_->[1], @chunk;
124 1         6 $self->debug_printf("Sending %d FDs - %s", scalar(@fd), join ',', map $_->fileno, @fd);
125 1         24 sendmsg $self->handle->write_handle, $self->outgoing_packet(
126             @fd
127             );
128 1         25 $_->close for @fd;
129 1         18 $_->done for @future;
130             }
131             # If we have any leftovers, we hope to be called next time around
132 1 0 33     1035 $self->handle->want_writeready(0) if $self->handle && ! @{$self->{pending}||[]};
  0 50       0  
133 1         4 $self
134             }
135              
136             =head2 read_pending
137              
138             Reads any pending messages, converting to FDs
139             as appropriate and calling the on_fh callback.
140              
141             Returns $self.
142              
143             =cut
144              
145             sub read_pending {
146 0     0 1 0 my $self = shift;
147 0         0 $self->recv_fds($self->curry::accept_fds);
148             }
149              
150             =head2 accept_fds
151              
152             Attempts to accept the given FDs from the remote.
153              
154             Will call L for each received file descriptor after reopening.
155              
156             =cut
157              
158             sub accept_fds {
159 0     0 1 0 my ($self, $hdr, $code) = @_;
160 0 0       0 defined(recvmsg $self->handle->write_handle, $hdr, 0)
161             or $self->debug_printf("Failed to recvmsg - %s", $!);
162 0 0       0 unless(length $hdr->{control}) {
163 0         0 $self->debug_printf("No control data, remote has probably gone away - closing");
164 0         0 $self->handle->want_readready(0);
165 0         0 return;
166             }
167              
168 0         0 my @fd = $code->();
169 0         0 foreach my $fileno (@fd) {
170 0         0 $self->debug_printf("Opening handle for %d", $fileno);
171 0 0       0 open my $fh, '+<&=', $fileno or die $!;
172 0         0 $self->on_fh($fh);
173             }
174             }
175              
176             =head2 on_fh
177              
178             Calls the configured filehandle method if provided (via L(C)).
179             =cut
180              
181             sub on_fh {
182 0     0 1 0 my ($self, $fh) = @_;
183 0 0       0 $self->{on_fh}->($fh) if $self->{on_fh};
184 0         0 $self
185             }
186              
187             sub configure {
188 1     1 1 40199 my $self = shift;
189 1         17 my %args = @_;
190              
191 1 50       21 $self->{on_fh} = delete $args{on_fh} if exists $args{on_fh};
192              
193 1 50       10 if(exists $args{handle}) {
194 1         16 my $h = delete $args{handle};
195 1 50       39 if($h->isa('IO::Async::Handle')) {
196 1         9 $self->{handle} = $h;
197 1         9 $self->handle->configure(
198             on_write_ready => $self->curry::weak::send_queued,
199             on_read_ready => $self->curry::weak::read_pending,
200             );
201             } else {
202             $self->add_child(
203 0         0 $self->{handle} = IO::Async::Handle->new(
204             handle => $h,
205             on_write_ready => $self->curry::weak::send_queued,
206             on_read_ready => $self->curry::weak::read_pending,
207             )
208             );
209             }
210 1         341 $self->handle->want_writeready(0);
211 1         9 $self->handle->want_readready(1);
212             };
213 1         23 $self->SUPER::configure(%args);
214             }
215              
216             =head2 send
217              
218             Sends the given FDs to the remote, returning a L which will resolve once
219             all FDs have been transferred.
220              
221             =cut
222              
223             sub send {
224 1     1 1 2654 my $self = shift;
225 1         13 my @future;
226 1         4 for (@_) {
227 1         2 push @{$self->{pending}}, [
  1         23  
228             $_,
229             my $f = $self->loop->new_future
230             ];
231 1         34 push @future, $f;
232             }
233 1 50       5 $self->handle->want_writeready(1) if $self->handle;
234 1         136 retain_future(
235             Future->wait_all(@future)
236             )
237             }
238              
239             sub _remove_from_loop {
240 0     0   0 my ($self) = @_;
241 0         0 $self->stop;
242             }
243              
244             sub stop {
245 1     1 0 321 my ($self) = @_;
246 1 50       4 (delete $self->{handle})->close if $self->handle;
247             }
248              
249              
250             1;
251              
252             __END__