File Coverage

blib/lib/IO/Async/Listener.pm
Criterion Covered Total %
statement 97 107 90.6
branch 38 58 65.5
condition 1 6 16.6
subroutine 21 22 95.4
pod 7 8 87.5
total 164 201 81.5


line stmt bran cond sub pod time code
1             # You may distribute under the terms of either the GNU General Public License
2             # or the Artistic License (the same terms as Perl itself)
3             #
4             # (C) Paul Evans, 2008-2015 -- leonerd@leonerd.org.uk
5              
6             package IO::Async::Listener;
7              
8 4     4   16436 use strict;
  4         7  
  4         101  
9 4     4   18 use warnings;
  4         5  
  4         105  
10 4     4   21 use base qw( IO::Async::Handle );
  4         5  
  4         1406  
11              
12             our $VERSION = '0.802';
13              
14 4     4   24 use IO::Async::Handle;
  4         5  
  4         80  
15 4     4   15 use IO::Async::OS;
  4         5  
  4         82  
16              
17 4     4   17 use Future 0.33; # ->catch
  4         73  
  4         104  
18              
19 4     4   18 use Errno qw( EAGAIN EWOULDBLOCK );
  4         6  
  4         221  
20              
21 4     4   28 use Socket qw( sockaddr_family SOL_SOCKET SO_ACCEPTCONN SO_TYPE );
  4         6  
  4         193  
22              
23 4     4   19 use Carp;
  4         6  
  4         4383  
24              
25             =head1 NAME
26              
27             C<IO::Async::Listener> - listen on network sockets for incoming connections
28              
29             =head1 SYNOPSIS
30              
31             use IO::Async::Listener;
32              
33             use IO::Async::Loop;
34             my $loop = IO::Async::Loop->new;
35              
36             my $listener = IO::Async::Listener->new(
37             on_stream => sub {
38             my ( undef, $stream ) = @_;
39              
40             $stream->configure(
41             on_read => sub {
42             my ( $self, $buffref, $eof ) = @_;
43             $self->write( $$buffref );
44             $$buffref = "";
45             return 0;
46             },
47             );
48              
49             $loop->add( $stream );
50             },
51             );
52              
53             $loop->add( $listener );
54              
55             $listener->listen(
56             service => "echo",
57             socktype => 'stream',
58             )->get;
59              
60             $loop->run;
61              
62             This object can also be used indirectly via an L<IO::Async::Loop>:
63              
64             use IO::Async::Stream;
65              
66             use IO::Async::Loop;
67             my $loop = IO::Async::Loop->new;
68              
69             $loop->listen(
70             service => "echo",
71             socktype => 'stream',
72              
73             on_stream => sub {
74             ...
75             },
76             )->get;
77              
78             $loop->run;
79              
80             =head1 DESCRIPTION
81              
82             This subclass of L<IO::Async::Handle> adds behaviour which watches a socket in
83             listening mode, to accept incoming connections on them.
84              
85             A Listener can be constructed and given an existing socket in listening mode.
86             Alternatively, the Listener can construct a socket by calling the C<listen>
87             method. Either a list of addresses can be provided, or a service name can be
88             looked up using the underlying loop's C<resolve> method.
89              
90             =cut
91              
92             =head1 EVENTS
93              
94             The following events are invoked, either using subclass methods or CODE
95             references in parameters:
96              
97             =head2 on_accept $clientsocket | $handle
98              
99             Invoked whenever a new client connects to the socket.
100              
101             If neither C<handle_constructor> nor C<handle_class> parameters are set, this
102             will be invoked with the new client socket directly. If a handle constructor
103             or class are set, this will be invoked with the newly-constructed handle,
104             having the new socket already configured onto it.
105              
106             =head2 on_stream $stream
107              
108             An alternative to C<on_accept>, this is passed an instance of
109             L<IO::Async::Stream> when a new client connects. This is provided as a
110             convenience for the common case that a Stream object is required as the
111             transport for a Protocol object.
112              
113             This is now vaguely deprecated in favour of using C<on_accept> with a handle
114             constructor or class.
115              
116             =head2 on_socket $socket
117              
118             Similar to C<on_stream>, but constructs an instance of L<IO::Async::Socket>.
119             This is most useful for C<SOCK_DGRAM> or C<SOCK_RAW> sockets.
120              
121             This is now vaguely deprecated in favour of using C<on_accept> with a handle
122             constructor or class.
123              
124             =head2 on_accept_error $socket, $errno
125              
126             Optional. Invoked if the C<accept> syscall indicates an error (other than
127             C<EAGAIN> or C<EWOULDBLOCK>). If not provided, failures of C<accept> will
128             be passed to the main C<on_error> handler.
129              
130             =cut
131              
132             =head1 PARAMETERS
133              
134             The following named parameters may be passed to C<new> or C<configure>:
135              
136             =head2 on_accept => CODE
137              
138             =head2 on_stream => CODE
139              
140             =head2 on_socket => CODE
141              
142             CODE reference for the event handlers. Because of the mutually-exclusive
143             nature of their behaviour, only one of these may be set at a time. Setting one
144             will remove the other two.
145              
146             =head2 handle => IO
147              
148             The IO handle containing an existing listen-mode socket.
149              
150             =head2 handle_constructor => CODE
151              
152             Optional. If defined, gives a CODE reference to be invoked every time a new
153             client socket is accepted from the listening socket. It is passed the listener
154             object itself, and is expected to return a new instance of
155             L<IO::Async::Handle> or a subclass, used to wrap the new client socket.
156              
157             $handle = $handle_constructor->( $listener )
158              
159             This can also be given as a subclass method
160              
161             $handle = $listener->handle_constructor()
162              
163             =head2 handle_class => STRING
164              
165             Optional. If defined and C<handle_constructor> isn't, then new wrapper handles
166             are constructed by invoking the C<new> method on the given class name, passing
167             in no additional parameters.
168              
169             $handle = $handle_class->new()
170              
171             This can also be given as a subclass method
172              
173             $handle = $listener->handle_class->new
174              
175             =head2 acceptor => STRING|CODE
176              
177             Optional. If defined, gives the name of a method or a CODE reference to use to
178             implement the actual accept behaviour. This will be invoked as:
179              
180             ( $accepted ) = $listener->acceptor( $socket )->get
181              
182             ( $handle ) = $listener->acceptor( $socket, handle => $handle )->get
183              
184             It is invoked with the listening socket as its argument, and optionally
185             an L<IO::Async::Handle> instance as a named parameter, and is expected to
186             return a C<Future> that will eventually yield the newly-accepted socket or
187             handle instance, if such was provided.
188              
189             =cut
190              
191             sub _init
192             {
193 12     12   50 my $self = shift;
194 12         41 $self->SUPER::_init( @_ );
195              
196 12         36 $self->{acceptor} = "_accept";
197             }
198              
199             my @acceptor_events = qw( on_accept on_stream on_socket );
200              
201             sub configure
202             {
203 18     18 1 558 my $self = shift;
204 18         37 my %params = @_;
205              
206 18 100       76 if( grep exists $params{$_}, @acceptor_events ) {
207 11 50       44 grep( defined $_, @params{@acceptor_events} ) <= 1 or
208             croak "Can only set at most one of 'on_accept', 'on_stream' or 'on_socket'";
209              
210             # Don't exists-test, so we'll clear the other two
211 11         44 $self->{$_} = delete $params{$_} for @acceptor_events;
212             }
213              
214 18 50       39 croak "Cannot set 'on_read_ready' on a Listener" if exists $params{on_read_ready};
215              
216 18 100       43 if( defined $params{handle} ) {
    100          
217 10         13 my $handle = delete $params{handle};
218             # Sanity check it - it may be a bare GLOB ref, not an IO::Socket-derived handle
219 10 50       90 defined getsockname( $handle ) or croak "IO handle $handle does not have a sockname";
220              
221             # So now we know it's at least some kind of socket. Is it listening?
222             # SO_ACCEPTCONN would tell us, but not all OSes implement it. Since it's
223             # only a best-effort sanity check, we won't mind if the OS doesn't.
224 10         93 my $acceptconn = getsockopt( $handle, SOL_SOCKET, SO_ACCEPTCONN );
225 10 50 33     71 !defined $acceptconn or unpack( "I", $acceptconn ) or croak "Socket is not accepting connections";
226              
227             # This is a bit naughty but hopefully nobody will mind...
228 10 50       36 bless $handle, "IO::Socket" if ref( $handle ) eq "GLOB";
229              
230 10         44 $self->SUPER::configure( read_handle => $handle );
231             }
232             elsif( exists $params{handle} ) {
233 1         3 delete $params{handle};
234              
235 1         3 $self->SUPER::configure( read_handle => undef );
236             }
237              
238 18 50       50 unless( grep $self->can_event( $_ ), @acceptor_events ) {
239 0         0 croak "Expected to be able to 'on_accept', 'on_stream' or 'on_socket'";
240             }
241              
242 18         32 foreach (qw( acceptor handle_constructor handle_class )) {
243 54 100       93 $self->{$_} = delete $params{$_} if exists $params{$_};
244             }
245              
246 18 50       60 if( keys %params ) {
247 0         0 croak "Cannot pass though configuration keys to underlying Handle - " . join( ", ", keys %params );
248             }
249             }
250              
251             sub on_read_ready
252             {
253 10     10 1 16 my $self = shift;
254              
255 10         34 my $socket = $self->read_handle;
256              
257 10         17 my $on_done;
258             my %acceptor_params;
259              
260 10 100       24 if( $on_done = $self->can_event( "on_stream" ) ) {
    100          
    50          
261             # TODO: It doesn't make sense to put a SOCK_DGRAM in an
262             # IO::Async::Stream but currently we don't detect this
263 1         5 require IO::Async::Stream;
264 1         6 $acceptor_params{handle} = IO::Async::Stream->new;
265             }
266             elsif( $on_done = $self->can_event( "on_socket" ) ) {
267 1         521 require IO::Async::Socket;
268 1         9 $acceptor_params{handle} = IO::Async::Socket->new;
269             }
270             # on_accept needs to be last in case of multiple layers of subclassing
271             elsif( $on_done = $self->can_event( "on_accept" ) ) {
272 8         12 my $handle;
273              
274             # Test both params before moving on to either method
275 8 100       67 if( my $constructor = $self->{handle_constructor} ) {
    50          
    100          
    50          
276 2         5 $handle = $self->{handle_constructor}->( $self );
277             }
278             elsif( my $class = $self->{handle_class} ) {
279 0         0 $handle = $class->new;
280             }
281             elsif( $self->can( "handle_constructor" ) ) {
282 1         3 $handle = $self->handle_constructor;
283             }
284             elsif( $self->can( "handle_class" ) ) {
285 0         0 $handle = $self->handle_class->new;
286             }
287              
288 8 100       20 $acceptor_params{handle} = $handle if $handle;
289             }
290             else {
291 0         0 die "ARG! Missing on_accept,on_stream,on_socket!";
292             }
293              
294 10         27 my $acceptor = $self->acceptor;
295             my $f = $self->$acceptor( $socket, %acceptor_params )->on_done( sub {
296 10 50   10   395 my ( $result ) = @_ or return; # false-alarm
297 10         35 $on_done->( $self, $result );
298             })->catch( accept => sub {
299 0     0   0 my ( $message, $name, @args ) = @_;
300 0         0 my ( $socket, $dollarbang ) = @args;
301 0 0       0 $self->maybe_invoke_event( on_accept_error => $socket, $dollarbang ) or
302             $self->invoke_error( "accept() failed - $dollarbang", accept => $socket, $dollarbang );
303 10         33 });
304              
305             # TODO: Consider if this wants a more fine-grained place to report
306             # non-accept() failures (such as SSL) to
307 10         997 $self->adopt_future( $f );
308             }
309              
310             sub _accept
311             {
312 10     10   13 my $self = shift;
313 10         19 my ( $listen_sock, %params ) = @_;
314              
315 10         31 my $accepted = $listen_sock->accept;
316              
317 10 50 0     1217 if( defined $accepted ) {
    0          
318 10         34 $accepted->blocking( 0 );
319 10 100       140 if( my $handle = $params{handle} ) {
320 5         25 $handle->set_handle( $accepted );
321 5         18 return Future->done( $handle );
322             }
323             else {
324 5         26 return Future->done( $accepted );
325             }
326             }
327             elsif( $! == EAGAIN or $! == EWOULDBLOCK ) {
328 0         0 return Future->done;
329             }
330             else {
331 0         0 return Future->fail( "Cannot accept() - $!", accept => $listen_sock, $! );
332             }
333             }
334              
335             =head1 METHODS
336              
337             The following methods documented with a trailing call to C<< ->get >> return
338             L<Future> instances.
339              
340             =cut
341              
342             =head2 acceptor
343              
344             $acceptor = $listener->acceptor
345              
346             Returns the currently-set C<acceptor> method name or code reference. This may
347             be of interest to Loop C<listen> extension methods that wish to extend or wrap
348             it.
349              
350             =cut
351              
352             sub acceptor
353             {
354 10     10 1 16 my $self = shift;
355 10         22 return $self->{acceptor};
356             }
357              
358             sub is_listening
359             {
360 3     3 0 1343 my $self = shift;
361              
362 3         7 return ( defined $self->sockname );
363             }
364              
365             =head2 sockname
366              
367             $name = $listener->sockname
368              
369             Returns the C<sockname> of the underlying listening socket
370              
371             =cut
372              
373             sub sockname
374             {
375 7     7 1 644 my $self = shift;
376              
377 7 100       18 my $handle = $self->read_handle or return undef;
378 6         20 return $handle->sockname;
379             }
380              
381             =head2 family
382              
383             $family = $listener->family
384              
385             Returns the socket address family of the underlying listening socket
386              
387             =cut
388              
389             sub family
390             {
391 2     2 1 1766 my $self = shift;
392              
393 2 50       5 my $sockname = $self->sockname or return undef;
394 2         35 return sockaddr_family( $sockname );
395             }
396              
397             =head2 socktype
398              
399             $socktype = $listener->socktype
400              
401             Returns the socket type of the underlying listening socket
402              
403             =cut
404              
405             sub socktype
406             {
407 2     2 1 5 my $self = shift;
408              
409 2 50       7 my $handle = $self->read_handle or return undef;
410 2         9 return $handle->sockopt(SO_TYPE);
411             }
412              
413             =head2 listen
414              
415             $listener->listen( %params )->get
416              
417             This method sets up a listening socket and arranges for the acceptor callback
418             to be invoked each time a new connection is accepted on the socket.
419              
420             Most parameters given to this method are passed into the C<listen> method of
421             the L<IO::Async::Loop> object. In addition, the following arguments are also
422             recognised directly:
423              
424             =over 8
425              
426             =item on_listen => CODE
427              
428             Optional. A callback that is invoked when the listening socket is ready.
429             Similar to that on the underlying loop method, except it is passed the
430             listener object itself.
431              
432             $on_listen->( $listener )
433              
434             =back
435              
436             =cut
437              
438             sub listen
439             {
440 1     1 1 3 my $self = shift;
441 1         4 my ( %params ) = @_;
442              
443 1         4 my $loop = $self->loop;
444 1 50       4 defined $loop or croak "Cannot listen when not a member of a Loop"; # TODO: defer?
445              
446 1 50       4 if( my $on_listen = delete $params{on_listen} ) {
447 1     1   3 $params{on_listen} = sub { $on_listen->( $self ) };
  1         3  
448             }
449              
450 1         10 $loop->listen( listener => $self, %params );
451             }
452              
453             =head1 EXAMPLES
454              
455             =head2 Listening on UNIX Sockets
456              
457             The C<handle> argument can be passed an existing socket already in listening
458             mode, making it possible to listen on other types of socket such as UNIX
459             sockets.
460              
461             use IO::Async::Listener;
462             use IO::Socket::UNIX;
463              
464             use IO::Async::Loop;
465             my $loop = IO::Async::Loop->new;
466              
467             my $listener = IO::Async::Listener->new(
468             on_stream => sub {
469             my ( undef, $stream ) = @_;
470              
471             $stream->configure(
472             on_read => sub {
473             my ( $self, $buffref, $eof ) = @_;
474             $self->write( $$buffref );
475             $$buffref = "";
476             return 0;
477             },
478             );
479              
480             $loop->add( $stream );
481             },
482             );
483              
484             $loop->add( $listener );
485              
486             my $socket = IO::Socket::UNIX->new(
487             Local => "echo.sock",
488             Listen => 1,
489             ) or die "Cannot make UNIX socket - $!\n";
490              
491             $listener->listen(
492             handle => $socket,
493             );
494              
495             $loop->run;
496              
497             =head2 Passing Plain Socket Addresses
498              
499             The C<addr> or C<addrs> parameters should contain a definition of a plain
500             socket address in a form that the L<IO::Async::OS> C<extract_addrinfo>
501             method can use.
502              
503             This example shows how to listen on TCP port 8001 on address 10.0.0.1:
504              
505             $listener->listen(
506             addr => {
507             family => "inet",
508             socktype => "stream",
509             port => 8001,
510             ip => "10.0.0.1",
511             },
512             ...
513             );
514              
515             This example shows another way to listen on a UNIX socket, similar to the
516             earlier example:
517              
518             $listener->listen(
519             addr => {
520             family => "unix",
521             socktype => "stream",
522             path => "echo.sock",
523             },
524             ...
525             );
526              
527             =head2 Using A Kernel-Assigned Port Number
528              
529             Rather than picking a specific port number, it is possible to ask the kernel
530             to assign one arbitrarily that is currently free. This can be done by
531             requesting port number 0 (which is actually the default if no port number is
532             otherwise specified). To determine which port number the kernel actually
533             picked, inspect the C<sockport> accessor on the actual socket filehandle.
534              
535             Either use the L<Future> returned by the C<listen> method:
536              
537             $listener->listen(
538             addr => { family => "inet" },
539             )->on_done( sub {
540             my ( $listener ) = @_;
541             my $socket = $listener->read_handle;
542              
543             say "Now listening on port ", $socket->sockport;
544             });
545              
546             Or pass an C<on_listen> continuation:
547              
548             $listener->listen(
549             addr => { family => "inet" },
550              
551             on_listen => sub {
552             my ( $listener ) = @_;
553             my $socket = $listener->read_handle;
554              
555             say "Now listening on port ", $socket->sockport;
556             },
557             );
558              
559             =head1 AUTHOR
560              
561             Paul Evans <leonerd@leonerd.org.uk>
562              
563             =cut
564              
565             0x55AA;