File Coverage

lib/DR/Tnt/LowLevel/Connector/AE.pm
Criterion Covered Total %
statement 21 52 40.3
branch 0 14 0.0
condition n/a
subroutine 7 14 50.0
pod 0 1 0.0
total 28 81 34.5


line stmt bran cond sub pod time code
1 3     3   4041 use utf8;
  3         5  
  3         19  
2 3     3   81 use strict;
  3         5  
  3         48  
3 3     3   11 use warnings;
  3         6  
  3         88  
4              
5             package DR::Tnt::LowLevel::Connector::AE;
6              
7 3     3   12 use Mouse;
  3         4  
  3         11  
8 3     3   3559 use AnyEvent;
  3         13847  
  3         117  
9 3     3   1393 use AnyEvent::Socket;
  3         67651  
  3         284  
10 3     3   1878 use AnyEvent::Handle;
  3         19618  
  3         1511  
11              
12             extends 'DR::Tnt::LowLevel::Connector';
13              
14              
15             has _fileno => is => 'rw', isa => 'Maybe[Int]';
16             has _handle => is => 'rw', isa => 'Maybe[Object]';
17              
18             sub _connect {
19 0     0     my ($self, $cb) = @_;
20              
21             my $h = tcp_connect
22             $self->host,
23             $self->port,
24             sub {
25 0     0     my ($fh) = @_;
26 0 0         unless ($fh) {
27 0           $cb->(ER_CONNECT => $!);
28 0           return;
29             }
30 0           $self->_fileno(fileno $fh);
31 0           $self->_set_fh(new AnyEvent::Handle
32             fh => $fh,
33             on_read => $self->_on_read,
34             on_error => $self->_on_error,
35             );
36              
37 0           $cb->(OK => 'Connected');
38             }
39 0           ;
40 0           $self->_handle($h);
41 0           return;
42             }
43              
44             before _clean_fh => sub {
45             my ($self) = @_;
46             $self->fh->destroy if $self->fh;
47             $self->_handle(undef);
48             };
49              
50             after _set_fh => sub {
51             my ($self) = @_;
52             if ($self->fh) {
53             $self->_fileno(fileno $self->fh->fh);
54             } else {
55             $self->_fileno(undef);
56             $self->_handle(undef);
57             }
58             };
59              
60             sub _on_read {
61 0     0     my ($self) = @_;
62             sub {
63 0     0     my ($handle) = @_;
64 0 0         return unless $handle;
65              
66             # reconnect artefacts
67 0 0         return unless $self->_fileno;
68 0 0         return unless $self->_fileno == fileno $self->fh->fh;
69              
70 0           $self->rbuf($self->rbuf . $handle->rbuf);
71 0           $handle->{rbuf} = '';
72 0           $self->check_rbuf;
73 0           };
74             }
75              
76             sub _on_error {
77 0     0     my ($self) = @_;
78              
79             sub {
80 0     0     my ($handle, $fatal, $message) = @_;
81 0 0         return unless $fatal;
82            
83             # reconnect artefacts
84 0 0         return unless $self->_fileno;
85 0 0         return unless $self->_fileno == fileno $self->fh->fh;
86              
87 0           $self->socket_error($message);
88             }
89 0           }
90              
91             sub send_pkt {
92 0     0 0   my ($self, $pkt, $cb) = @_;
93              
94 0           $self->fh->push_write($pkt);
95 0           $cb->(OK => 'packet was queued to send');
96 0           return;
97             }
98              
99              
100             __PACKAGE__->meta->make_immutable;