File Coverage

blib/lib/MogileFS/IOStatWatcher.pm
Criterion Covered Total %
statement 27 92 29.3
branch 0 16 0.0
condition 0 6 0.0
subroutine 9 24 37.5
pod 3 7 42.8
total 39 145 26.9


line stmt bran cond sub pod time code
1             package MogileFS::IOStatWatcher;
2 21     21   83 use strict;
  21         100  
  21         827  
3 21     21   87 use Sys::Syscall 0.22; # We use it indirectly, and trigger bugs in earlier versions.
  21         436  
  21         724  
4 21     21   91 use Danga::Socket;
  21         28  
  21         296  
5 21     21   93 use IO::Socket::INET;
  21         25  
  21         303  
6              
7             =head1 Methods
8              
9             =head2 $iow = MogileFS::IOStatWatcher->new()
10              
11             Returns a new IOStatWatcher object.
12              
13             =cut
14              
15             sub new {
16 0     0 1   my ($class) = @_;
17 0           my $self = bless {
18             hosts => {},
19             }, $class;
20 0           $self->on_stats; # set an empty handler.
21 0           return $self;
22             }
23              
24             =head2 $iow->set_hosts( host1 [, host2 [, ...] ] )
25              
26             Sets the list of hosts to connect to for collecting IOStat information. This call can block if you
27             pass it hostnames instead of ip addresses.
28              
29             Upon successful connection, the on_stats callback will be called each time the statistics are
30             collected. Error states (failed connections, etc.) will trigger retries on 60 second intervals, and
31             disconnects will trigger an immediate reconnect.
32              
33             =cut
34              
35             sub set_hosts {
36 0     0 1   my ($self, @ips) = @_;
37 0           my $old_hosts = $self->{hosts};
38 0           my $new_hosts = {};
39 0           foreach my $host (@ips) {
40 0   0       $new_hosts->{$host} = (delete $old_hosts->{$host}) || MogileFS::IOStatWatch::Client->new($host, $self);
41             }
42             # TODO: close hosts that were removed (things in %$old_hosts)
43 0           $self->{hosts} = $new_hosts;
44             }
45              
46             =head2 $iow->on_stats( coderef )
47              
48             Sets the coderef called for the C callback.
49              
50             =cut
51              
52             sub on_stats {
53 0     0 1   my ($self, $cb) = @_;
54              
55 0 0         unless (ref $cb eq 'CODE') {
56 0     0     $cb = sub {};
  0            
57             }
58              
59 0           $self->{on_stats} = $cb;
60             }
61              
62             =head1 Callbacks
63              
64             =head2 on_stats->( host, stats )
65              
66             Called each time device use statistics are collected. The C
67             argument is the value passed in to the C method. The
68             C object is a hashref of mogile device numbers (without leading
69             "dev") to their corresponding utilization percentages.
70              
71             =cut
72              
73             # Everything beyond here is internal.
74              
75             sub got_stats {
76 0     0 0   my ($self, $host, $stats) = @_;
77 0           $self->{on_stats}->($host, $stats);
78             }
79              
80             sub restart_monitoring_if_needed {
81 0     0 0   my ($self, $host) = @_;
82 0 0 0       return unless $self->{hosts}->{$host} && $self->{hosts}->{$host}->{closed};
83 0           $self->{hosts}->{$host} = MogileFS::IOStatWatch::Client->new($host, $self);
84             }
85              
86             sub got_error {
87 0     0 0   my ($self, $host) = @_;
88             Danga::Socket->AddTimer(60, sub {
89 0     0     $self->restart_monitoring_if_needed($host);
90 0           });
91             }
92              
93             sub got_disconnect {
94 0     0 0   my ($self, $host) = @_;
95 0           $self->{hosts}->{$host} = MogileFS::IOStatWatch::Client->new($host, $self);
96             }
97              
98             # Support class that does the communication with individual hosts.
99             package MogileFS::IOStatWatch::Client;
100 21     21   18619 use Socket qw(SO_KEEPALIVE);
  21         121  
  21         879  
101              
102 21     21   88 use strict;
  21         26  
  21         557  
103 21     21   1552 use warnings;
  21         35  
  21         665  
104              
105 21     21   94 use base 'Danga::Socket';
  21         24  
  21         3060  
106 21     21   107 use fields qw(host watcher buffer active);
  21         26  
  21         114  
107              
108             sub new {
109 0     0     my MogileFS::IOStatWatch::Client $self = shift;
110 0           my $hostspec = shift;
111 0           my $watcher = shift;
112              
113 0           my $sock = IO::Socket::INET->new(
114             PeerAddr => $hostspec,
115             PeerPort => MogileFS->config("mogstored_stream_port"),
116             Proto => 'tcp',
117             Blocking => 0,
118             );
119 0 0         return unless $sock;
120              
121 0           $sock->sockopt(SO_KEEPALIVE, 1);
122 0 0         $self = fields::new($self) unless ref $self;
123 0           $self->SUPER::new($sock);
124 0           $self->watch_write(1);
125 0           $self->watch_read(1);
126              
127 0           $self->{watcher} = $watcher;
128 0           $self->{buffer} = '';
129 0           $self->{host} = $hostspec;
130              
131 0           return $self;
132             }
133              
134             sub event_write {
135 0     0     my MogileFS::IOStatWatch::Client $self = shift;
136 0           $self->{active} = 1;
137 0           $self->write("watch\n");
138 0           $self->watch_write(0); # I hope I can safely assume that 6 characters will write properly.
139             }
140              
141             sub event_read {
142 0     0     my MogileFS::IOStatWatch::Client $self = shift;
143              
144 0           my $bref = $self->read(10240);
145 0 0         return $self->close unless defined $bref;
146              
147 0           $self->{buffer} .= $$bref;
148              
149 0 0         if ($self->{buffer} =~ m/^ERR\s+(.*?)\s* $ /x) {
150             # There was an error on the way to watching this machine, close it and stay quiet.
151 0           $self->close;
152             }
153              
154             # If we can yank off lines till there is one by itself with a . on it, we've gotten a full set of stats.
155 0           while ($self->{buffer} =~ s/^(.*?\n)?\.\n//s) {
156 0           my %stats;
157 0           foreach my $line (split /\n+/, $1) {
158 0 0         next unless $line;
159 0           my ($devnum, $util) = split /\s+/, $line;
160 0           $stats{$devnum} = $util;
161             }
162 0           $self->{watcher}->got_stats($self->{host}, \%stats);
163             }
164             }
165              
166             sub event_err {
167 0     0     my MogileFS::IOStatWatch::Client $self = shift;
168 0           $self->{watcher}->got_error($self->{host});
169             }
170              
171             sub event_hup {
172 0     0     my MogileFS::IOStatWatch::Client $self = shift;
173 0           $self->{watcher}->got_error($self->{host});
174             }
175              
176             sub close {
177 0     0     my MogileFS::IOStatWatch::Client $self = shift;
178 0 0         if ($self->{active}) {
179 0           $self->{watcher}->got_disconnect($self->{host});
180             } else {
181 0           $self->{watcher}->got_error($self->{host});
182             }
183 0           $self->SUPER::close(@_);
184             }
185             1;
186