File Coverage

blib/lib/Tak/Daemon/ListenerService.pm
Criterion Covered Total %
statement 6 56 10.7
branch 0 12 0.0
condition 0 6 0.0
subroutine 2 9 22.2
pod n/a
total 8 83 9.6


line stmt bran cond sub pod time code
1             package Tak::Daemon::ListenerService;
2              
3 1     1   33676 use Scalar::Util qw(weaken);
  1         2  
  1         138  
4 1     1   3116 use Moo;
  1         18726  
  1         10  
5              
6             with 'Tak::Role::Service';
7              
8             has listen_on => (is => 'ro', required => 1);
9             has router => (is => 'ro', required => 1);
10              
11             has state => (is => 'rw', default => sub { 'down' }, init_arg => undef);
12              
13             has _start_in_progress => (is => 'lazy', clearer => '_clear_start_in_progress');
14              
15             has listener => (is => 'rw', clearer => 'clear_listener');
16              
17             has connections => (is => 'ro', default => sub { {} });
18              
19             sub start_start_request {
20 0     0     my ($self, $req) = @_;
21 0 0         $req->result('already_started') if $self->state eq 'running';
22 0           push(@{$self->_start_in_progress->{requests}}, $req);
  0            
23 0           $self->_start_in_progress->{start}();
24             }
25              
26             sub _build__start_in_progress {
27 0     0     my ($self) = @_;
28 0           weaken($self);
29 0           my %start = (requests => (my $requests = []));
30 0           my $listen_on = $self->listen_on;
31 0 0         my %addr = (
32             socktype => "stream",
33             map +(
34             ref($_)
35             ? (family => "inet", %$_)
36             : (family => "unix", path => $_)
37             ), $listen_on
38             );
39             $start{start} = sub {
40 0     0     $self->state('starting');
41 0           Tak->loop_upgrade;
42             Tak->loop->listen(
43             addr => \%addr,
44             on_notifier => sub {
45 0           $self->listener($_[0]);
46 0           $_->success('started') for @$requests;
47 0           $self->_clear_start_in_progress;
48 0           $self->state('running');
49             },
50             on_resolve_error => sub { # no-op until we add non-unix
51 0           $_->failure(resolve => @_) for @$requests;
52 0           $self->_clear_start_in_progress;
53 0           $self->state('stopped');
54             },
55             on_listen_error => sub {
56 0           $_->failure(listen => @_) for @$requests;
57 0           $self->_clear_start_in_progress;
58 0           $self->state('stopped');
59             },
60             on_accept => sub {
61 0           $self->setup_connection($_[0]);
62             },
63             on_accept_error => sub {
64 0           $self->handle_stop;
65             },
66 0           );
67 0           $start{start} = sub {}; # delete yourself
  0            
68 0           };
69 0           \%start;
70             }
71              
72             sub handle_stop {
73 0     0     my ($self) = @_;
74 0 0         return 'already_stopped' if $self->state eq 'down';
75             # there's probably something more intelligent to do here, but meh
76 0 0         die failure => 'starting' if $self->state eq 'starting';
77 0           Tak->loop->remove($self->clear_listener);
78 0   0       !ref and unlink($_) for $self->listen_on;
79 0           $self->state('down');
80 0           return 'stopped';
81             }
82              
83             sub DEMOLISH {
84 0     0     my ($self, $in_global_destruction) = @_;
85              
86 0 0         return unless $self->state eq 'running';
87              
88 0   0       !ref and unlink($_) for $self->listen_on;
89              
90 0 0         return if $in_global_destruction;
91              
92 0           Tak->loop->remove($self->listener);
93             }
94              
95             sub setup_connection {
96 0     0     my ($self, $socket) = @_;
97 0           my $conn_set = $self->connections;
98 0           my $conn_str;
99             my $connection = Tak::ConnectionService->new(
100             read_fh => $socket, write_fh => $socket,
101             listening_service => $self->router->clone_or_self,
102 0     0     on_close => sub { delete $conn_set->{$conn_str} }
103 0           );
104 0           $conn_str = "$connection";
105 0           $connection->receiver->service->service->register_weak(remote => $connection);
106 0           $conn_set->{$conn_str} = $connection;
107 0           return;
108             }
109              
110             1;