File Coverage

blib/lib/MogileFS/Connection/Poolable.pm
Criterion Covered Total %
statement 85 99 85.8
branch 17 30 56.6
condition 8 16 50.0
subroutine 22 27 81.4
pod 4 18 22.2
total 136 190 71.5


line stmt bran cond sub pod time code
1             # private base class for poolable HTTP/Mogstored sidechannel connections
2             # This is currently only used by HTTP, but is intended for Mogstored
3             # connections, too.
4             package MogileFS::Connection::Poolable;
5 21     21   162 use strict;
  21         47  
  21         688  
6 21     21   111 use warnings;
  21         43  
  21         526  
7 21     21   120 use Danga::Socket;
  21         43  
  21         516  
8 21     21   126 use base qw(Danga::Socket);
  21         44  
  21         4188  
9             use fields (
10 21         220 'mfs_pool', # owner of the connection (MogileFS::ConnectionPool)
11             'mfs_hostport', # [ ip, port ]
12             'mfs_expire', # Danga::Socket::Timer object
13             'mfs_expire_cb', # Danga::Socket::Timer callback
14             'mfs_requests', # number of requests made on this object
15             'mfs_err', # used to propagate an error to start()
16 21     21   130 );
  21         49  
17 21     21   2185 use Socket qw(SO_KEEPALIVE);
  21         42  
  21         963  
18 21     21   121 use Time::HiRes;
  21         44  
  21         247  
19              
20             # subclasses (MogileFS::Connection::{HTTP,Mogstored}) must call this sub
21             sub new {
22 10     10 1 63 my ($self, $sock, $ip, $port) = @_;
23 10         113 $self->SUPER::new($sock); # Danga::Socket->new
24              
25             # connection may not be established yet,
26             # so Danga::Socket->peer_addr_string can't be used here
27 10         847 $self->{mfs_hostport} = [ $ip, $port ];
28 10         25 $self->{mfs_requests} = 0;
29              
30 10         25 return $self;
31             }
32              
33             # used by ConnectionPool for tracking per-hostport connection counts
34 75     75 0 214 sub key { join(':', @{$_[0]->{mfs_hostport}}); }
  75         589  
35              
36             # backwards compatibility
37 0     0 0 0 sub host_port { $_[0]->key; }
38              
39 0     0 0 0 sub ip_port { @{$_[0]->{mfs_hostport}}; }
  0         0  
40              
41 110     110 0 407 sub fd { fileno($_[0]->sock); }
42              
43             # marks a connection as idle, call this before putting it in a connection
44             # pool for eventual reuse.
45             sub mark_idle {
46 11     11 0 28 my ($self) = @_;
47              
48 11         129 $self->watch_read(0);
49              
50             # set the keepalive flag the first time we're idle
51 11 100       639 $self->sock->sockopt(SO_KEEPALIVE, 1) if $self->{mfs_requests} == 0;
52              
53 11         482 $self->{mfs_requests}++;
54             }
55              
56             # the request running on this connection is retryable if this socket
57             # has ever been marked idle. The connection pool can never be 100%
58             # reliable for detecting dead sockets, and all HTTP requests made by
59             # MogileFS are idempotent.
60             sub retryable {
61 3     3 0 19 my ($self, $reason) = @_;
62 3   66     127 return ($reason !~ /timeout/ && $self->{mfs_requests} > 0);
63             }
64              
65             # Sets (or updates) the timeout of the connection
66             # timeout_key is "node_timeout" or "conn_timeout"
67             # clears the current timeout if timeout_key is undef
68             sub set_timeout {
69 42     42 0 281 my ($self, $timeout_key) = @_;
70 42         91 my $mfs_pool = $self->{mfs_pool};
71              
72 42 100       108 if ($timeout_key) {
73 31         43 my $timeout;
74              
75 31 50       256 if ($timeout_key =~ /\A[a-z_]+\z/) {
76 31   50     343 $timeout = MogileFS->config($timeout_key) || 2;
77             } else {
78 0         0 $timeout = $timeout_key;
79 0         0 $timeout_key = "timeout";
80             }
81              
82 31         340 my $t0 = Time::HiRes::time();
83 31         121 $self->{mfs_expire} = $t0 + $timeout;
84             $self->{mfs_expire_cb} = sub {
85 2     2   11 my ($now) = @_;
86 2         24 my $elapsed = $now - $t0;
87              
88             # for HTTP, this will fake an HTTP error response like LWP does
89 2         87 $self->err("$timeout_key: $timeout (elapsed: $elapsed)");
90 31         309 };
91 31 50       425 $mfs_pool->register_timeout($self, $timeout) if $mfs_pool;
92             } else {
93 11         55 $self->{mfs_expire} = $self->{mfs_expire_cb} = undef;
94 11 50       273 $mfs_pool->register_timeout($self, undef) if $mfs_pool;
95             }
96             }
97              
98             # returns the expiry time of the connection
99 4     4 0 25 sub expiry { $_[0]->{mfs_expire} }
100              
101             # runs expiry callback and returns true if time is up,
102             # returns false if there is time remaining
103             sub expired {
104 6     6 0 46 my ($self, $now) = @_;
105 6 50       65 my $expire = $self->{mfs_expire} or return 0;
106 6   33     28 $now ||= Time::HiRes::time();
107              
108 6 100       24 if ($now >= $expire) {
109 2         42 my $expire_cb = delete $self->{mfs_expire_cb};
110 2 50 33     89 if ($expire_cb && $self->sock) {
111 2         45 $expire_cb->($now);
112             }
113 2         129 return 1;
114             }
115 4         25 return 0;
116             }
117              
118             # may be overridden in subclass, called only on errors
119             # The HTTP version of this will fake an HTTP response for LWP compatibility
120             sub err {
121 0     0 0 0 my ($self, $close_reason) = @_;
122              
123 0         0 $self->inflight_expire; # ensure we don't call new_err on eventual close()
124              
125 0 0       0 if ($close_reason =~ /\A:event_(?:hup|err)\z/) {
126             # there's a chance this can be invoked while inflight,
127             # conn_drop will handle this case appropriately
128 0 0       0 $self->{mfs_pool}->conn_drop($self, $close_reason) if $self->{mfs_pool};
129             } else {
130 0         0 $self->close($close_reason);
131             }
132             }
133              
134             # sets the pool this connection belongs to, only call from ConnectionPool
135             sub set_pool {
136 10     10 0 25 my ($self, $pool) = @_;
137              
138 10         54 $self->{mfs_pool} = $pool;
139             }
140              
141             # closes a connection, and may reschedule the inflight callback if
142             # close_reason is ":retry"
143             sub close {
144 9     9 1 34 my ($self, $close_reason) = @_;
145              
146 9         136 delete $self->{mfs_expire_cb}; # avoid circular ref
147              
148 9         36 my $mfs_pool = delete $self->{mfs_pool}; # avoid circular ref
149 9         21 my $inflight_cb;
150              
151 9 50       42 if ($mfs_pool) {
152 9         67 $mfs_pool->schedule_queued;
153 9         286 $inflight_cb = $mfs_pool->conn_close_prepare($self, $close_reason);
154             }
155 9         108 $self->SUPER::close($close_reason); # Danga::Socket->close
156              
157 9 100 66     832 if ($inflight_cb && $close_reason) {
158 1 50       14 if ($close_reason eq ":retry") {
159 0         0 my ($ip, $port) = $self->ip_port;
160              
161 0         0 $mfs_pool->enqueue($ip, $port, $inflight_cb);
162             } else {
163             # Danga::Socket-scheduled write()s which fail with ECONNREFUSED,
164             # EPIPE, or "write_error" after an initial (non-blocking)
165             # connect()
166             $mfs_pool->on_next_tick(sub {
167 1   50 1   53 ref($self)->new_err($close_reason || "error", $inflight_cb);
168 1         23 });
169             }
170             }
171             }
172              
173             # Marks a connection as no-longer inflight. Calling this prevents retries.
174             sub inflight_expire {
175 14     14 0 35 my ($self) = @_;
176 14         48 my $mfs_pool = $self->{mfs_pool};
177 14 50       54 die "BUG: expiring without MogileFS::ConnectionPool\n" unless $mfs_pool;
178 14         266 $mfs_pool->inflight_cb_expire($self);
179             }
180              
181             # Danga::Socket callbacks
182 0     0 1 0 sub event_hup { $_[0]->err(':event_hup'); }
183 0     0 1 0 sub event_err { $_[0]->err(':event_err'); }
184              
185             # called when we couldn't create a socket, but need to create an object
186             # anyway for errors (creating fake, LWP-style error responses)
187             sub new_err {
188 1     1 0 5 my ($class, $err, $start_cb) = @_;
189 1         6 my $self = fields::new($class);
190 1         340 $self->{mfs_err} = $err;
191             # on socket errors
192 1         5 $start_cb->($self);
193             }
194              
195             # returns this connection back to its associated pool.
196             # Returns false if not successful (pool is full)
197             sub persist {
198 11     11 0 22 my ($self) = @_;
199 11         30 my $mfs_pool = $self->{mfs_pool};
200              
201 11 50       71 return $mfs_pool ? $mfs_pool->conn_persist($self) : 0;
202             }
203              
204             1;