File Coverage

blib/lib/MogileFS/Connection/Client.pm
Criterion Covered Total %
statement 21 127 16.5
branch 0 54 0.0
condition 0 3 0.0
subroutine 7 21 33.3
pod 7 12 58.3
total 35 217 16.1


line stmt bran cond sub pod time code
1             # A client is a user connection for sending requests to us. Requests
2             # can either be normal user requests to be sent to a QueryWorker
3             # or management requests that start with a !.
4              
5             package MogileFS::Connection::Client;
6              
7 21     21   113 use strict;
  21         33  
  21         462  
8 21     21   9833 use Danga::Socket ();
  21         292687  
  21         507  
9 21     21   109 use base qw{Danga::Socket};
  21         37  
  21         3664  
10 21     21   126 use IO::Handle;
  21         70  
  21         721  
11 21     21   93 use Time::HiRes qw(clock_gettime CLOCK_MONOTONIC);
  21         33  
  21         97  
12 21     21   3041 use MogileFS::Util qw(error);
  21         61  
  21         843  
13              
14 21     21   112 use fields qw{read_buf pipelined};
  21         30  
  21         99  
15              
16             my %SLOW_WRITERS = ();
17             my $EXPTIME = 120;
18             my $PIPELINE = [];
19              
20             sub Reset {
21 0     0 1   %SLOW_WRITERS = ();
22 0           $PIPELINE = [];
23             }
24              
25             sub WriterWatchDog {
26 0     0 0   my $dmap = Danga::Socket->DescriptorMap;
27 0           my $old = clock_gettime(CLOCK_MONOTONIC) - $EXPTIME;
28 0           foreach my $fd (keys %SLOW_WRITERS) {
29 0           my $last_write = $SLOW_WRITERS{$fd};
30 0 0         next if $last_write > $old;
31              
32 0 0         if (my $ds = $dmap->{$fd}) {
33 0           error('write timeout expired: '.$ds->as_string);
34 0           $ds->close;
35             } else {
36 0           error("fd=$fd not known to Danga::Socket(!?), ignoring");
37             }
38 0           delete $SLOW_WRITERS{$fd};
39             }
40             }
41              
42             sub ProcessPipelined {
43 0     0 0   my $run = $PIPELINE;
44 0           $PIPELINE = [];
45 0           foreach my MogileFS::Connection::Client $clref (@$run) {
46 0           $clref->{pipelined} = undef;
47 0 0         $clref->process_request or $clref->watch_read(1);
48             }
49             }
50              
51             sub new {
52 0     0 1   my $self = shift;
53 0 0         $self = fields::new($self) unless ref $self;
54 0           $self->SUPER::new( @_ );
55 0           IO::Handle::blocking($self->{sock}, 0);
56 0           delete $SLOW_WRITERS{$self->{fd}};
57 0           $self->watch_read(1);
58 0           return $self;
59             }
60              
61             # Client
62              
63             sub process_request {
64 0     0 0   my MogileFS::Connection::Client $self = shift;
65              
66 0           while ($self->{read_buf} =~ s/^(.*?)\r?\n//) {
67 0 0         next unless length $1;
68 0           $self->handle_request($1);
69 0           return 1;
70             }
71 0           0;
72             }
73              
74             sub event_read {
75 0     0 1   my MogileFS::Connection::Client $self = shift;
76              
77 0           my $bref = $self->read(1024);
78 0 0         return $self->close unless defined $bref;
79 0           $self->{read_buf} .= $$bref;
80 0 0         if ($self->process_request) {
81 0           $self->watch_read(0);
82             }
83             }
84              
85             sub write {
86 0     0 1   my MogileFS::Connection::Client $self = shift;
87 0           my $done = $self->SUPER::write(@_);
88 0           my $fd = $self->{fd};
89 0 0         if ($done) {
90 0 0         if (defined $fd) {
91 0           delete $SLOW_WRITERS{$fd};
92 0 0         unless ($self->{pipelined}) {
93 0           $self->{pipelined} = 1;
94 0           push @$PIPELINE, $self;
95             }
96             }
97             } else {
98             # stop reading if we can't write, otherwise we'll OOM
99 0 0         if (defined $fd) {
100 0           $SLOW_WRITERS{$fd} = clock_gettime(CLOCK_MONOTONIC);
101 0           $self->watch_read(0);
102             }
103             }
104 0           $done;
105             }
106              
107             sub handle_request {
108 0     0 0   my ($self, $line) = @_;
109              
110             # if it's just 'help', 'h', '?', or something, do that
111             #if ((substr($line, 0, 1) eq '?') || ($line eq 'help')) {
112             # MogileFS::ProcManager->SendHelp($_[1]);
113             # return;
114             #}
115              
116 0 0         if ($line =~ /^!(\S+)(?:\s+(.+))?$/) {
117 0           my ($cmd, $args) = ($1, $2);
118 0           return $self->handle_admin_command($cmd, $args);
119             }
120              
121 0           MogileFS::ProcManager->EnqueueCommandRequest($line, $self);
122             }
123              
124             sub handle_admin_command {
125 0     0 0   my ($self, $cmd, $args) = @_;
126              
127 0           my @out;
128 0 0 0       if ($cmd =~ /^stats$/) {
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
129             # print out some stats on the queues
130 0           my $uptime = time() - MogileFS::ProcManager->server_starttime;
131 0           my $ccount = MogileFS::ProcManager->PendingQueryCount;
132 0           my $wcount = MogileFS::ProcManager->BoredQueryWorkerCount;
133 0           my $ipcount = MogileFS::ProcManager->QueriesInProgressCount;
134 0           my $stats = MogileFS::ProcManager->StatsHash;
135             push @out, "uptime $uptime",
136             "pending_queries $ccount",
137             "processing_queries $ipcount",
138             "bored_queryworkers $wcount",
139 0           map { "$_ $stats->{$_}" } sort keys %$stats;
  0            
140              
141             } elsif ($cmd =~ /^shutdown/) {
142 0           print "User requested shutdown: $args\n";
143 0           kill 15, $$; # kill us, that kills our kids
144              
145             } elsif ($cmd =~ /^jobs/) {
146             # dump out a list of running jobs and pids
147             MogileFS::ProcManager->foreach_job(sub {
148 0     0     my ($job, $ct, $desired, $pidlist) = @_;
149 0           push @out, "$job count $ct";
150 0           push @out, "$job desired $desired";
151 0           push @out, "$job pids " . join(' ', @$pidlist);
152 0           });
153              
154             } elsif ($cmd =~ /^want/) {
155             # !want <count> <jobclass>
156             # set the new desired staffing level for a class
157 0 0         if ($args =~ /^(\d+)\s+(\S+)/) {
158 0           my ($count, $job) = ($1, $2);
159              
160 0 0         $count = 500 if $count > 500;
161              
162             # now make sure it's a real job
163 0 0         if (MogileFS::ProcManager->is_monitor_good) {
164 0 0         if (MogileFS::ProcManager->is_valid_job($job)) {
165 0           MogileFS::ProcManager->request_job_process($job, $count);
166 0           push @out, "Now desiring $count children doing '$job'.";
167             } else {
168 0           my $classes = join(", ", MogileFS::ProcManager->valid_jobs);
169 0           push @out, "ERROR: Invalid class '$job'. Valid classes: $classes";
170             }
171             } else {
172 0           push @out, "ERROR: Monitor has not completed initial run yet\n";
173             }
174             } else {
175 0           push @out, "ERROR: usage: !want ";
176             }
177              
178             } elsif ($cmd =~ /^to/) {
179             # !to <jobclass> <message>
180             # sends <message> to all children of <jobclass>
181 0 0         if ($args =~ /^(\S+)\s+(.+)/) {
182 0           my $ct = MogileFS::ProcManager->ImmediateSendToChildrenByJob($1, $2);
183 0           push @out, "Message sent to $ct children.";
184              
185             } else {
186 0           push @out, "ERROR: usage: !to ";
187             }
188              
189             } elsif ($cmd =~ /^queue/ || $cmd =~ /^pend/) {
190             MogileFS::ProcManager->foreach_pending_query(sub {
191 0     0     my ($client, $query) = @_;
192 0           push @out, $query;
193 0           });
194              
195             } elsif ($cmd =~ /^watch/) {
196 0 0         if (MogileFS::ProcManager->RemoveErrorWatcher($self)) {
197 0           push @out, "Removed you from watcher list.";
198             } else {
199 0           MogileFS::ProcManager->AddErrorWatcher($self);
200 0           push @out, "Added you to watcher list.";
201             }
202              
203             } elsif ($cmd =~ /^recent/) {
204             # show the most recent N queries
205 0           push @out, MogileFS::ProcManager->RecentQueries;
206              
207             } elsif ($cmd =~ /^version/) {
208             # show the server version
209 0           push @out, $MogileFS::Server::VERSION;
210              
211             } else {
212 0           MogileFS::ProcManager->SendHelp($self, $args);
213             }
214              
215 0           push @out, '.', '';
216 0           $self->write(join("\r\n", @out));
217 0           return;
218             }
219              
220             # Client
221 0     0 1   sub event_err { my $self = shift; $self->close; }
  0            
222 0     0 1   sub event_hup { my $self = shift; $self->close; }
  0            
223              
224             # just note that we've died
225             sub close {
226             # mark us as being dead
227 0     0 1   my $self = shift;
228 0           MogileFS::ProcManager->NoteDeadClient($self);
229 0           $self->SUPER::close(@_);
230             }
231              
232             1;
233              
234             # Local Variables:
235             # mode: perl
236             # c-basic-indent: 4
237             # indent-tabs-mode: nil
238             # End: