File Coverage

blib/lib/Log/Syslog/DangaSocket/Socket.pm
Criterion Covered Total %
statement 30 88 34.0
branch 0 16 0.0
condition 0 8 0.0
subroutine 10 19 52.6
pod 4 6 66.6
total 44 137 32.1


line stmt bran cond sub pod time code
1             package Log::Syslog::DangaSocket::Socket;
2              
3 2     2   16 use strict;
  2         4  
  2         257  
4 2     2   10 use warnings;
  2         4  
  2         49  
5              
6 2     2   11 use Carp;
  2         3  
  2         177  
7 2     2   7005 use IO::Socket::INET;
  2         29900  
  2         16  
8 2     2   1480 use IO::Socket::UNIX;
  2         4  
  2         27  
9 2     2   4451 use POSIX 'strftime';
  2         17807  
  2         18  
10 2     2   2480 use Socket qw(SOL_SOCKET SO_ERROR);
  2         5  
  2         625  
11              
12 2     2   11 use base 'Danga::Socket';
  2         2  
  2         2483  
13              
14             use fields (
15 2         16 'err_handler', # subref to call on error
16             'connecting', # connect timer object before connected, undef afterwards
17             'queue', # messages which haven't been fully sent
18 2     2   45632 );
  2         7  
19              
20             our $CONNECT_TIMEOUT = 1;
21 2     2   177 use constant DEBUG => 0;
  2         5  
  2         2143  
22              
23             # $class->new($proto, $host, $port, $err_handler, $messages)
24             # $err_handler callback will be called with an arrayref of any unsent data
25             # optional $messages should be arrayref of stringrefs
26             sub new {
27 0     0 1   my $ref = shift;
28 0   0       my $class = ref $ref || $ref;
29              
30 0           my Log::Syslog::DangaSocket::Socket $self = fields::new($class);
31              
32             # kick off non-blocking connect
33 0           my $sock;
34 0 0         if ($_[0] eq 'unix') {
35 0           $sock = IO::Socket::UNIX->new(
36             Peer => $_[1],
37             Blocking => 0,
38             );
39             }
40             else {
41 0           $sock = IO::Socket::INET->new(
42             Proto => $_[0],
43             PeerAddr => $_[1],
44             PeerPort => $_[2],
45             Blocking => 0,
46             );
47             }
48              
49 0 0         croak "couldn't create sock: $!" unless $sock;
50              
51 0           $self->SUPER::new($sock);
52              
53 0           $self->{err_handler} = $_[3];
54              
55             # get notified when connect completes
56 0           $self->watch_write(1);
57              
58             # for prompt error notifications
59 0           $self->watch_read(1);
60              
61             # start with initial message queue (probably from reconnect) if present
62 0   0       $self->{queue} = $_[4] || [];
63              
64             $self->{connecting} = Danga::Socket->AddTimer(
65 0     0     $CONNECT_TIMEOUT, sub { $self->close }
66 0           );
67              
68 0           return $self;
69             }
70              
71             sub write_buffered {
72 0     0 0   my Log::Syslog::DangaSocket::Socket $self = shift;
73              
74 0           my $message_ref = shift;
75 0           push @{ $self->{queue} }, $message_ref;
  0            
76              
77 0           DEBUG && warn "queued $$message_ref\n";
78              
79             # flush will happen upon connection
80 0 0         $self->flush_queue unless $self->{connecting};
81             }
82              
83             sub flush_queue {
84 0     0 0   my Log::Syslog::DangaSocket::Socket $self = shift;
85 0           my $queue = $self->{queue};
86              
87 0           my @to_send = @$queue; # copy so shift() below doesn't modify iterated list
88 0           for my $message_ref (@to_send) {
89             # give the message to Danga::Socket...
90 0           $self->write($message_ref);
91 0           DEBUG && warn "wrote '$$message_ref'\n";
92              
93             # but only forget it in the local queue once notified that the write completed
94             $self->write(sub {
95 0     0     shift @$queue;
96 0           DEBUG && warn "completed '$$message_ref'\n";
97 0           });
98             }
99             }
100              
101             sub event_write {
102 0     0 1   my Log::Syslog::DangaSocket::Socket $self = shift;
103 0           DEBUG && warn "entering event_write\n";
104 0 0         if ($self->{connecting}) {
105 0           my $packed_error = getsockopt($self->sock, SOL_SOCKET, SO_ERROR);
106 0           local $! = unpack('I', $packed_error);
107              
108 0 0         if ($! == 0) {
109             # connected
110 0           DEBUG && warn "connected\n";
111 0           $self->{connecting}->cancel;
112 0           $self->{connecting} = undef;
113 0           $self->watch_write(0);
114 0           $self->flush_queue;
115             }
116             else {
117 0           DEBUG && warn "connect error: $!\n";
118 0           $self->close;
119             }
120             }
121 0           $self->SUPER::event_write(@_);
122             }
123              
124             # normally syslogd doesn't send anything back. if an error occurs (like remote
125             # side closing the connection), we'll be notified of eof this way
126             sub event_read {
127 0     0 1   my Log::Syslog::DangaSocket::Socket $self = shift;
128 0           my $read = sysread $self->{sock}, my $buf, 1;
129 0 0 0       $self->close if defined $read && $read == 0; # eof
130             }
131              
132             sub close {
133 0     0 1   my Log::Syslog::DangaSocket::Socket $self = shift;
134 0 0         return if $self->{closed};
135 0           DEBUG && warn "closing\n";
136 0 0         if ($self->{connecting}) {
137             # if we got an error while still trying to connect, back off before trying again
138 0           DEBUG && warn "error while connecting\n";
139             Danga::Socket->AddTimer($CONNECT_TIMEOUT, sub {
140 0     0     DEBUG && warn "retrying connect\n";
141 0           $self->{err_handler}->($self->{queue});
142 0           });
143             }
144             else {
145             # otherwise try to reconnect immediately
146 0           $self->{err_handler}->($self->{queue});
147             }
148 0           $self->SUPER::close(@_);
149             }
150              
151             # close on any error
152             *event_err = \&close;
153             *event_hup = \&close;
154              
155             1;