File Coverage

blib/lib/MogileFS/IOStatWatcher.pm
Criterion Covered Total %
statement 24 88 27.2
branch 0 16 0.0
condition 0 6 0.0
subroutine 8 23 34.7
pod 3 7 42.8
total 35 140 25.0


line stmt bran cond sub pod time code
1             package MogileFS::IOStatWatcher;
2 21     21   119 use strict;
  21         54  
  21         913  
3 21     21   123 use Sys::Syscall 0.22; # We use it indirectly, and trigger bugs in earlier versions.
  21         545  
  21         823  
4 21     21   121 use Danga::Socket;
  21         43  
  21         398  
5 21     21   204 use IO::Socket::INET;
  21         47  
  21         345  
6              
7             =head1 Methods
8              
9             =head2 $iow = MogileFS::IOStatWatcher->new()
10              
11             Returns a new IOStatWatcher object.
12              
13             =cut
14              
15             sub new {
16 0     0 1   my ($class) = @_;
17 0           my $self = bless {
18             hosts => {},
19             }, $class;
20 0           $self->on_stats; # set an empty handler.
21 0           return $self;
22             }
23              
24             =head2 $iow->set_hosts( host1 [, host2 [, ...] ] )
25              
26             Sets the list of hosts to connect to for collecting IOStat information. This call can block if you
27             pass it hostnames instead of ip addresses.
28              
29             Upon successful connection, the on_stats callback will be called each time the statistics are
30             collected. Error states (failed connections, etc.) will trigger retries on 60 second intervals, and
31             disconnects will trigger an immediate reconnect.
32              
33             =cut
34              
35             sub set_hosts {
36 0     0 1   my ($self, @ips) = @_;
37 0           my $old_hosts = $self->{hosts};
38 0           my $new_hosts = {};
39 0           foreach my $host (@ips) {
40 0   0       $new_hosts->{$host} = (delete $old_hosts->{$host}) || MogileFS::IOStatWatch::Client->new($host, $self);
41             }
42             # TODO: close hosts that were removed (things in %$old_hosts)
43 0           $self->{hosts} = $new_hosts;
44             }
45              
46             =head2 $iow->on_stats( coderef )
47              
48             Sets the coderef called for the C<on_stats> callback.
49              
50             =cut
51              
52             sub on_stats {
53 0     0 1   my ($self, $cb) = @_;
54              
55 0 0         unless (ref $cb eq 'CODE') {
56 0     0     $cb = sub {};
  0            
57             }
58              
59 0           $self->{on_stats} = $cb;
60             }
61              
62             =head1 Callbacks
63              
64             =head2 on_stats->( host, stats )
65              
66             Called each time device use statistics are collected. The C<host>
67             argument is the value passed in to the C<set_hosts> method. The
68             C<stats> object is a hashref of mogile device numbers (without leading
69             "dev") to their corresponding utilization percentages.
70              
71             =cut
72              
73             # Everything beyond here is internal.
74              
75             sub got_stats {
76 0     0 0   my ($self, $host, $stats) = @_;
77 0           $self->{on_stats}->($host, $stats);
78             }
79              
80             sub restart_monitoring_if_needed {
81 0     0 0   my ($self, $host) = @_;
82 0 0 0       return unless $self->{hosts}->{$host} && $self->{hosts}->{$host}->{closed};
83 0           $self->{hosts}->{$host} = MogileFS::IOStatWatch::Client->new($host, $self);
84             }
85              
86             sub got_error {
87 0     0 0   my ($self, $host) = @_;
88             Danga::Socket->AddTimer(60, sub {
89 0     0     $self->restart_monitoring_if_needed($host);
90 0           });
91             }
92              
93             sub got_disconnect {
94 0     0 0   my ($self, $host) = @_;
95 0           $self->{hosts}->{$host} = MogileFS::IOStatWatch::Client->new($host, $self);
96             }
97              
98             # Support class that does the communication with individual hosts.
99             package MogileFS::IOStatWatch::Client;
100              
101 21     21   28530 use strict;
  21         136  
  21         700  
102 21     21   113 use warnings;
  21         46  
  21         813  
103              
104 21     21   2119 use base 'Danga::Socket';
  21         56  
  21         3987  
105 21     21   209 use fields qw(host watcher buffer active);
  21         34  
  21         145  
106              
107             sub new {
108 0     0     my MogileFS::IOStatWatch::Client $self = shift;
109 0           my $hostspec = shift;
110 0           my $watcher = shift;
111              
112 0           my $sock = IO::Socket::INET->new(
113             PeerAddr => $hostspec,
114             PeerPort => MogileFS->config("mogstored_stream_port"),
115             Proto => 'tcp',
116             Blocking => 0,
117             );
118 0 0         return unless $sock;
119              
120 0 0         $self = fields::new($self) unless ref $self;
121 0           $self->SUPER::new($sock);
122 0           $self->watch_write(1);
123 0           $self->watch_read(1);
124              
125 0           $self->{watcher} = $watcher;
126 0           $self->{buffer} = '';
127 0           $self->{host} = $hostspec;
128              
129 0           return $self;
130             }
131              
132             sub event_write {
133 0     0     my MogileFS::IOStatWatch::Client $self = shift;
134 0           $self->{active} = 1;
135 0           $self->write("watch\n");
136 0           $self->watch_write(0); # I hope I can safely assume that 6 characters will write properly.
137             }
138              
139             sub event_read {
140 0     0     my MogileFS::IOStatWatch::Client $self = shift;
141              
142 0           my $bref = $self->read(10240);
143 0 0         return $self->close unless defined $bref;
144              
145 0           $self->{buffer} .= $$bref;
146              
147 0 0         if ($self->{buffer} =~ m/^ERR\s+(.*?)\s* $ /x) {
148             # There was an error on the way to watching this machine, close it and stay quiet.
149 0           $self->close;
150             }
151              
152             # If we can yank off lines till there is one by itself with a . on it, we've gotten a full set of stats.
153 0           while ($self->{buffer} =~ s/^(.*?\n)?\.\n//s) {
154 0           my %stats;
155 0           foreach my $line (split /\n+/, $1) {
156 0 0         next unless $line;
157 0           my ($devnum, $util) = split /\s+/, $line;
158 0           $stats{$devnum} = $util;
159             }
160 0           $self->{watcher}->got_stats($self->{host}, \%stats);
161             }
162             }
163              
164             sub event_err {
165 0     0     my MogileFS::IOStatWatch::Client $self = shift;
166 0           $self->{watcher}->got_error($self->{host});
167             }
168              
169             sub event_hup {
170 0     0     my MogileFS::IOStatWatch::Client $self = shift;
171 0           $self->{watcher}->got_error($self->{host});
172             }
173              
174             sub close {
175 0     0     my MogileFS::IOStatWatch::Client $self = shift;
176 0 0         if ($self->{active}) {
177 0           $self->{watcher}->got_disconnect($self->{host});
178             } else {
179 0           $self->{watcher}->got_error($self->{host});
180             }
181 0           $self->SUPER::close(@_);
182             }
183             1;
184