File Coverage

blib/lib/MogileFS/ConnectionPool.pm
Criterion Covered Total %
statement 144 202 71.2
branch 33 74 44.5
condition 17 28 60.7
subroutine 26 32 81.2
pod 0 13 0.0
total 220 349 63.0


line stmt bran cond sub pod time code
1             # a connection pool class with queueing.
2             # (something doesn't sound quite right with that...)
3             # This requires Danga::Socket to drive, but may also function without it
4             # via conn_get/conn_put.
5             package MogileFS::ConnectionPool;
6 21     21   125 use strict;
  21         39  
  21         504  
7 21     21   88 use warnings;
  21         37  
  21         531  
8 21     21   106 use Carp qw(croak confess);
  21         34  
  21         1119  
9 21     21   131 use Time::HiRes;
  21         48  
  21         129  
10              
11 21     21   1582 use constant NEVER => (0xffffffff << 32) | 0xffffffff; # portable version :P
  21         38  
  21         38658  
12              
13             sub new {
14 1     1 0 3 my ($class, $conn_class, $opts) = @_;
15              
16 1   50     4 $opts ||= {};
17             my $self = bless {
18             fdmap => {}, # { fd -> conn }
19             idle => {}, # ip:port -> [ MogileFS::Connection::Poolable, ... ]
20             queue => [], # [ [ ip, port, callback ], ... ]
21             timer => undef, # Danga::Socket::Timer object
22             timeouts => {}, # { fd -> conn }
23             inflight => {}, # ip:port -> { fd -> callback }
24             total_inflight => 0, # number of inflight connections
25             dest_capacity => $opts->{dest_capacity},
26             total_capacity => $opts->{total_capacity},
27 1         11 class => $conn_class,
28             scheduled => 0, # set if we'll start tasks on next tick
29             on_next_tick => [],
30             next_expiry => NEVER,
31             }, $class;
32              
33             # total_capacity=20 matches what we used with LWP
34 1   50     5 $self->{total_capacity} ||= 20;
35              
36             # allow users to specify per-destination capacity limits
37 1   33     5 $self->{dest_capacity} ||= $self->{total_capacity};
38              
39 1         3 return $self;
40             }
41              
42             # retrieves an idle connection for the [IP, port] pair
43             sub _conn_idle_get {
44 15     15   91 my ($self, $ip, $port) = @_;
45              
46 15         51 my $key = "$ip:$port";
47 15 100       150 my $idle = $self->{idle}->{$key} or return;
48              
49             # the Danga::Socket event loop may detect hangups and close sockets,
50             # However not all MFS workers run this event loop, so we need to
51             # validate the connection when retrieving a connection from the pool
52 13         68 while (my $conn = pop @$idle) {
53             # make sure the socket is valid:
54              
55             # due to response callback ordering, we actually place connections
56             # in the pool before invoking the user-supplied response callback
57             # (to allow connections to get reused ASAP)
58 10 50       57 my $sock = $conn->sock or next;
59              
60             # hope this returns EAGAIN, not using OO->sysread here since
61             # Net::HTTP::NB overrides that and we _want_ to hit EAGAIN here
62 10         156 my $r = sysread($sock, my $byte, 1);
63              
64             # good, connection is possibly still alive if we got EAGAIN
65 10 50 66     113 return $conn if (!defined $r && $!{EAGAIN});
66              
67 5         28 my $err = $!;
68 5 50       24 if (defined $r) {
69 5 50       19 if ($r == 0) {
70             # a common case and to be expected
71 5         27 $err = "server dropped idle connection";
72             } else {
73             # this is a bug either on our side or the HTTP server
74 0         0 Mgd::error("Bug: unexpected got $r bytes from idle conn to ". $conn->host_port. ") (byte=$byte)");
75             }
76             }
77              
78             # connection is bad, close the socket and move onto the
79             # next idle connection if there is one.
80 5         41 $conn->close($err);
81             }
82              
83 8         73 return;
84             }
85              
86             # creates a new connection if under capacity
87             # returns undef if we're at capacity (or on EMFILE/ENFILE)
88             sub _conn_new_maybe {
89 10     10   39 my ($self, $ip, $port) = @_;
90 10         34 my $key = "$ip:$port";
91              
92             # we only call this sub if we don't have idle connections, so
93             # we don't check {idle} here
94              
95             # make sure we're not already at capacity for this destination
96 10   100     15 my $nr_inflight = scalar keys %{$self->{inflight}->{$key} ||= {}};
  10         58  
97 10 50       32 return if ($nr_inflight >= $self->{dest_capacity});
98              
99             # see how we're doing with regard to total capacity:
100 10 50       31 if ($self->_total_connections >= $self->{total_capacity}) {
101             # see if we have idle connections for other pools to kill
102 0 0       0 if ($self->{total_inflight} < $self->{total_capacity}) {
103             # we have idle connections to other destinations, drop one of those
104 0         0 $self->_conn_drop_idle;
105             # fall through to creating a new connection
106             } else {
107             # we're at total capacity for the entire pool
108 0         0 return;
109             }
110             }
111              
112             # we're hopefully under capacity if we got here, create a new connection
113 10         53 $self->_conn_new($ip, $port);
114             }
115              
116             # creates new connection and registers it in our fdmap
117             # returns error string if resources (FDs, buffers) aren't available
118             sub _conn_new {
119 10     10   52 my ($self, $ip, $port) = @_;
120              
121             # calls MogileFS::Connection::{HTTP,Mogstored}->new:
122 10         134 my $conn = $self->{class}->new($ip, $port);
123 10 50       42 if ($conn) {
124             # register the connection
125 10         33 $self->{fdmap}->{$conn->fd} = $conn;
126 10         85 $conn->set_pool($self);
127              
128 10         43 return $conn;
129             } else {
130             # EMFILE/ENFILE should never happen as the capacity for this
131             # pool is far under the system defaults. Just give up on
132             # EMFILE/ENFILE like any other error.
133 0         0 return "failed to create socket to $ip:$port ($!)";
134             }
135             }
136              
137             # retrieves a connection, may return undef if at capacity
138             sub _conn_get {
139 15     15   39 my ($self, $ip, $port) = @_;
140              
141             # if we have idle connections, always use them first
142 15 100       84 $self->_conn_idle_get($ip, $port) || $self->_conn_new_maybe($ip, $port);
143             }
144              
145             # Pulls a connection out of the pool for synchronous use.
146             # This may create a new connection (independent of pool limits).
147             # The connection returned by this is _blocking_. This is currently
148             # only used by replicate.
149             sub conn_get {
150 0     0 0 0 my ($self, $ip, $port) = @_;
151 0         0 my $conn = $self->_conn_idle_get($ip, $port);
152              
153 0 0       0 if ($conn) {
154             # in case the connection never comes back, let refcounting close() it:
155 0         0 delete $self->{fdmap}->{$conn->fd};
156             } else {
157 0         0 $conn = $self->_conn_new($ip, $port);
158 0 0       0 unless (ref $conn) {
159 0         0 $! = $conn; # $conn is an error message :<
160 0         0 return;
161             }
162 0         0 delete $self->{fdmap}->{$conn->fd};
163 0         0 my $timeout = MogileFS->config("node_timeout");
164 0 0       0 MogileFS::Util::wait_for_writeability($conn->fd, $timeout) or return;
165             }
166              
167 0         0 return $conn;
168             }
169              
170             # retrieves a connection from the connection pool and executes
171             # inflight_cb on it. If the pool is at capacity, this will queue the task.
172             # This relies on Danga::Socket->EventLoop
173             sub start {
174 15     15 0 64 my ($self, $ip, $port, $inflight_cb) = @_;
175              
176 15         70 my $conn = $self->_conn_get($ip, $port);
177 15 50       153 if ($conn) {
178 15         49 $self->_conn_run($conn, $inflight_cb);
179             } else { # we're too busy right now, queue up
180 0         0 $self->enqueue($ip, $port, $inflight_cb);
181             }
182             }
183              
184             # returns the total number of connections we have
185             sub _total_connections {
186 10     10   23 my ($self) = @_;
187 10         15 return scalar keys %{$self->{fdmap}};
  10         50  
188             }
189              
190             # marks a connection as no longer inflight, returns the inflight
191             # callback if the connection was active, undef if not
192             sub inflight_cb_expire {
193 23     23 0 57 my ($self, $conn) = @_;
194 23         184 my $inflight_cb = delete $self->{inflight}->{$conn->key}->{$conn->fd};
195 23 100       252 $self->{total_inflight}-- if $inflight_cb;
196              
197 23         271 return $inflight_cb;
198             }
199              
200             # schedules the event loop to dequeue and run a task on the next
201             # tick of the Danga::Socket event loop. Call this
202             # 1) whenever a task is enqueued
203             # 2) whenever a task is complete
204             sub schedule_queued {
205 20     20 0 51 my ($self) = @_;
206              
207             # AddTimer(0) to avoid potential stack overflow
208             $self->{scheduled} ||= Danga::Socket->AddTimer(0, sub {
209 15     15   7018 $self->{scheduled} = undef;
210 15         117 my $queue = $self->{queue};
211              
212 15         112 my $total_capacity = $self->{total_capacity};
213 15         71 my $i = 0;
214              
215 15   33     715 while ($self->{total_inflight} < $total_capacity
216             && $i <= (scalar(@$queue) - 1)) {
217 0         0 my ($ip, $port, $cb) = @{$queue->[$i]};
  0         0  
218              
219 0         0 my $conn = $self->_conn_get($ip, $port);
220 0 0       0 if ($conn) {
221 0         0 splice(@$queue, $i, 1); # remove from queue
222 0         0 $self->_conn_run($conn, $cb);
223             } else {
224             # this queue object cannot be dequeued, skip it for now
225 0         0 $i++;
226             }
227             }
228 20   66     315 });
229             }
230              
231             # Call this when done using an (inflight) connection
232             # This possibly places a connection in the connection pool.
233             # This will close the connection of the pool is already at capacity.
234             # This will also start the next queued callback, or retry if needed
235             sub conn_persist {
236 11     11 0 38 my ($self, $conn) = @_;
237              
238             # schedule the next request if we're done with any connection
239 11         46 $self->schedule_queued;
240 11         429 $self->conn_put($conn);
241             }
242              
243             # The opposite of conn_get, this returns a connection retrieved with conn_get
244             # back to the connection pool, making it available for future use. Dead
245             # connections are not stored.
246             # This is currently only used by replicate.
247             sub conn_put {
248 11     11 0 32 my ($self, $conn) = @_;
249              
250 11         39 my $key = $conn->key;
251             # we do not store dead connections
252 11         99 my $peer_addr = $conn->peer_addr_string;
253              
254 11 50       600 if ($peer_addr) {
255             # connection is still alive, respect capacity limits
256 11   100     66 my $idle = $self->{idle}->{$key} ||= [];
257              
258             # register it in the fdmap just in case:
259 11         54 $self->{fdmap}->{$conn->fd} = $conn;
260              
261 11 50       126 if ($self->_dest_total($conn) < $self->{dest_capacity}) {
262 11         59 $conn->mark_idle;
263 11         31 push @$idle, $conn; # yay, connection is reusable
264 11         41 $conn->set_timeout(undef); # clear timeout
265 11         65 return 1; # success
266             }
267             }
268              
269             # we have too many connections or the socket is dead, caller
270             # should close after returning from this function.
271 0         0 return 0;
272             }
273              
274             # enqueues a request (inflight_cb) and schedules it to run ASAP
275             # This must be used with Danga::Socket->EventLoop
276             sub enqueue {
277 0     0 0 0 my ($self, $ip, $port, $inflight_cb) = @_;
278              
279 0         0 push @{$self->{queue}}, [ $ip, $port, $inflight_cb ];
  0         0  
280              
281             # we have something in the queue, make sure it's run soon
282 0         0 $self->schedule_queued;
283             }
284              
285             # returns the total connections to the host of a given connection
286             sub _dest_total {
287 11     11   43 my ($self, $conn) = @_;
288 11         38 my $key = $conn->key;
289 11         27 my $inflight = scalar keys %{$self->{inflight}->{$key}};
  11         40  
290 11         20 my $idle = scalar @{$self->{idle}->{$key}};
  11         31  
291 11         51 return $idle + $inflight;
292             }
293              
294             # only call this from the event_hup/event_err callbacks used by Danga::Socket
295             sub conn_drop {
296 0     0 0 0 my ($self, $conn, $close_reason) = @_;
297              
298 0         0 my $fd = $conn->fd;
299 0         0 my $key = $conn->key;
300              
301             # event_read must handle errors anyways, so hand off
302             # error handling to the event_read callback if inflight.
303 0 0       0 return $conn->event_read if $self->{inflight}->{$key}->{$fd};
304              
305             # we get here if and only if the socket is idle, we can drop it ourselves
306             # splice out the socket we're closing from the idle pool
307 0         0 my $idle = $self->{idle}->{$key};
308 0         0 foreach my $i (0 .. (scalar(@$idle) - 1)) {
309 0         0 my $old = $idle->[$i];
310 0 0       0 if ($old->sock) {
311 0 0       0 if ($old->fd == $fd) {
312 0         0 splice(@$idle, $i, 1);
313 0         0 $conn->close($close_reason);
314 0         0 return;
315             }
316             } else {
317             # some connections may have expired but not been spliced out, yet
318             # splice it out here since we're iterating anyways
319 0         0 splice(@$idle, $i, 1);
320             }
321             }
322             }
323              
324             # unregisters and prepares connection to be closed
325             # Returns the inflight callback if there was one
326             sub conn_close_prepare {
327 9     9 0 27 my ($self, $conn, $close_reason) = @_;
328              
329 9 50       38 if ($conn->sock) {
330 9         123 my $fd = $conn->fd;
331              
332 9         113 my $valid = delete $self->{fdmap}->{$fd};
333 9         39 delete $self->{timeouts}->{$fd};
334              
335 9         42 my $inflight_cb = $self->inflight_cb_expire($conn);
336              
337             # $valid may be undef in replicate worker which removes connections
338             # from fdmap. However, valid==undef connections should never have
339             # an inflight_cb
340 9 50 66     33 if ($inflight_cb && !$valid) {
341 0         0 croak("BUG: dropping unregistered conn with callback: $conn");
342             }
343 9         26 return $inflight_cb;
344             }
345             }
346              
347             # schedules cb to run on the next tick of the event loop,
348             # (immediately after this tick runs)
349             sub on_next_tick {
350 1     1 0 10 my ($self, $cb) = @_;
351 1         7 my $on_next_tick = $self->{on_next_tick};
352 1         7 push @$on_next_tick, $cb;
353              
354 1 50       7 if (scalar(@$on_next_tick) == 1) {
355             Danga::Socket->AddTimer(0, sub {
356             # prevent scheduled callbacks from being called on _this_ tick
357 1     1   5 $on_next_tick = $self->{on_next_tick};
358 1         4 $self->{on_next_tick} = [];
359              
360 1         9 while (my $sub = shift @$on_next_tick) {
361 1         8 $sub->()
362             }
363 1         17 });
364             }
365             }
366              
367             # marks a connection inflight and invokes cb on it
368             # $conn may be a error string, in which case we'll invoke the user-supplied
369             # callback with a mock error (this mimics how LWP fakes an HTTP response
370             # even if the socket could not be created/connected)
371             sub _conn_run {
372 15     15   41 my ($self, $conn, $cb) = @_;
373              
374 15 50       47 if (ref $conn) {
375 15   50     52 my $inflight = $self->{inflight}->{$conn->key} ||= {};
376 15         49 $inflight->{$conn->fd} = $cb; # stash callback for retrying
377 15         85 $self->{total_inflight}++;
378 15         50 $cb->($conn);
379             } else {
380             # fake an error message on the response callback
381             $self->on_next_tick(sub {
382             # fatal error creating the socket, do not queue
383 0     0   0 my $mfs_err = $conn;
384 0         0 $self->{class}->new_err($mfs_err, $cb);
385              
386             # onto the next request
387 0         0 $self->schedule_queued;
388 0         0 });
389             }
390             }
391              
392             # drops an idle connection from the idle connection pool (so we can open
393             # another socket without incurring out-of-FD errors)
394             # Only call when you're certain there's a connection to drop
395             # XXX This is O(destinations), unfortunately
396             sub _conn_drop_idle {
397 0     0   0 my ($self) = @_;
398 0         0 my $idle = $self->{idle};
399              
400 0         0 foreach my $val (values %$idle) {
401 0 0       0 my $conn = shift @$val or next;
402              
403 0 0       0 $conn->close("idle_expire") if $conn->sock;
404 0         0 return;
405             }
406              
407 0         0 confess("BUG: unable to drop an idle connection");
408             }
409              
410             # checks for expired connections, this can be expensive if there
411             # are many concurrent connections waiting on timeouts, but still
412             # better than having AddTimer create a Danga::Socket::Timer object
413             # every time a timeout is reset.
414             sub check_timeouts {
415 6     6 0 45 my ($self) = @_;
416 6         57 my $timeouts = $self->{timeouts};
417 6         72 my @fds = keys %$timeouts;
418 6         34 my $next_expiry = NEVER;
419 6         45 my $now = Time::HiRes::time();
420              
421             # this is O(n) where n is concurrent connections
422 6         45 foreach my $fd (@fds) {
423 6         22 my $conn = $timeouts->{$fd};
424 6 100       81 if ($conn->expired($now)) {
425 2         28 delete $timeouts->{$fd};
426             } else {
427             # look for the next timeout
428 4         38 my $expiry = $conn->expiry;
429 4 50       18 if ($expiry) {
430 4 50       18 $next_expiry = $expiry if $expiry < $next_expiry;
431             } else {
432             # just in case, this may not happen...
433 0         0 delete $timeouts->{$fd};
434             }
435             }
436             }
437              
438             # schedule the wakeup for the next timeout
439 6 100       32 if ($next_expiry == NEVER) {
440 2         7 $self->{timer} = undef;
441             } else {
442 4         36 my $timeout = $next_expiry - $now;
443 4 50       10 $timeout = 0 if $timeout <= 0;
444             $self->{timer} = Danga::Socket->AddTimer($timeout, sub {
445 4     4   1104017 $self->check_timeouts;
446 4         86 });
447             }
448 6         232 $self->{next_expiry} = $next_expiry;
449             }
450              
451             # registers a timeout for a given connection, each connection may only
452             # have one pending timeout. Timeout may be undef to cancel the current
453             # timeout.
454             sub register_timeout {
455 42     42 0 169 my ($self, $conn, $timeout) = @_;
456              
457 42 50       114 if ($conn->sock) {
    0          
458 42         282 my $fd = $conn->fd;
459 42 100       237 if ($timeout) {
460 31         125 $self->{timeouts}->{$fd} = $conn;
461 31         97 my $next_expiry = $self->{next_expiry};
462 31         57 my $old_timer = $self->{timer};
463 31         81 my $expiry = $timeout + Time::HiRes::time();
464              
465 31 100 66     256 if (!$old_timer || $expiry < $next_expiry) {
466 3         6 $self->{next_expiry} = $expiry;
467             $self->{timer} = Danga::Socket->AddTimer($timeout, sub {
468 2     2   1554933 $self->check_timeouts;
469 3         28 });
470 3 50       90 $old_timer->cancel if $old_timer;
471             }
472             } else {
473 11         73 delete $self->{timeouts}->{$fd};
474             }
475             } elsif ($timeout) { # this may never happen...
476             # no FD, so we must allocate a new Danga::Socket::Timer object
477             # add 1msec to avoid FP rounding problems leading to missed
478             # expiration when calling conn->expired
479 0     0     Danga::Socket->AddTimer($timeout + 0.001, sub { $conn->expired });
  0            
480             }
481             }
482              
483             1;