File Coverage

blib/lib/Perlbal/Plugin/Throttle.pm
Criterion Covered Total %
statement 75 313 23.9
branch 0 118 0.0
condition 1 51 1.9
subroutine 25 40 62.5
pod 0 3 0.0
total 101 525 19.2


line stmt bran cond sub pod time code
1             package Perlbal::Plugin::Throttle;
2              
3 1     1   8765 use strict;
  1         3  
  1         46  
4 1     1   6 use warnings;
  1         2  
  1         524  
5              
6             our $VERSION = '1.20';
7              
8 1     1   10 use List::Util 'min';
  1         3  
  1         158  
9 1     1   7 use Danga::Socket 1.59;
  1         71  
  1         32  
10 1     1   6 use Perlbal 1.70;
  1         21  
  1         25  
11 1     1   5 use Perlbal::ClientProxy ();
  1         3  
  1         18  
12 1     1   6 use Perlbal::HTTPHeaders ();
  1         2  
  1         17  
13 1     1   5 use Time::HiRes ();
  1         2  
  1         35  
14              
15             # Debugging flag
16 1   50 1   7 use constant VERBOSE => $ENV{THROTTLE_VERBOSE} || 0;
  1         3  
  1         2139  
17              
18             sub load {
19             # behavior
20 0     0 0   Perlbal::Service::add_tunable(
21             whitelist_file => {
22             check_role => '*',
23             des => "File containing CIDRs which are never throttled. (Net::CIDR::Lite must be installed.)",
24             check_type => 'file_or_none',
25             }
26             );
27 0           Perlbal::Service::add_tunable(
28             blacklist_file => {
29             check_role => '*',
30             des => "File containing CIDRs which are always denied outright. (Net::CIDR::Lite must be installed.)",
31             check_type => 'file_or_none',
32             }
33             );
34 0           Perlbal::Service::add_tunable(
35             default_action => {
36             check_role => '*',
37             des => "Whether to throttle or allow new connections from clients on neither the whitelist nor blacklist.",
38             check_type => [enum => [qw( allow throttle )]],
39             default => 'throttle',
40             }
41             );
42 0           Perlbal::Service::add_tunable(
43             blacklist_action => {
44             check_role => '*',
45             des => "Whether to deny or throttle connections from blacklisted IPs.",
46             check_type => [enum => [qw( deny throttle )]],
47             default => 'deny',
48             }
49             );
50              
51             # filters
52 0           Perlbal::Service::add_tunable(
53             path_regex => {
54             check_role => '*',
55             des => "Regex which path portion of URI must match for throttling to be in effect.",
56             }
57             );
58 0           Perlbal::Service::add_tunable(
59             method_regex => {
60             check_role => '*',
61             des => "Regex which HTTP method must match for throttling to be in effect.",
62             }
63             );
64              
65             # logging
66 0           Perlbal::Service::add_tunable(
67             log_events => {
68             check_role => '*',
69             des => q{Comma-separated list of events to log (ban, unban, whitelisted, blacklisted, concurrent, throttled, banned; all; none). If this is changed after the plugin is registered, the "throttle reload config" command must be issued.},
70             check_type => [regexp => qr/^(ban|unban|whitelisted|blacklisted|concurrent|throttled|banned|all|none| |,)+$/, "log_events is a comma-separated list of loggable events"],
71             default => 'all',
72             }
73             );
74 0           Perlbal::Service::add_tunable(
75             log_only => {
76             check_role => '*',
77             des => "Perform the full throttling calculation, but don't actually throttle or deny connections.",
78             check_type => 'bool',
79             default => 0,
80             }
81             );
82              
83             # throttler parameters
84 0           Perlbal::Service::add_tunable(
85             throttle_threshold_seconds => {
86             check_role => '*',
87             des => "Minimum allowable time between requests. If a non-white/-blacklisted client makes another connection within this interval, it will be throttled for initial_delay seconds. Further connections will double the delay time.",
88             check_type => 'int',
89             default => 60,
90             }
91             );
92 0           Perlbal::Service::add_tunable(
93             initial_delay => {
94             check_role => '*',
95             des => "Minimum time for a connection to be throttled if occurring within throttle_threshold_seconds of last attempt.",
96             check_type => 'int',
97             default => 3,
98             }
99             );
100 0           Perlbal::Service::add_tunable(
101             max_delay => {
102             check_role => '*',
103             des => "Maximum time for a connection to be throttled after exponential increase from initial_delay.",
104             check_type => 'int',
105             default => 300,
106             }
107             );
108 0           Perlbal::Service::add_tunable(
109             max_concurrent => {
110             check_role => '*',
111             des => "Maximum number of connections accepted at a time from a single IP, per perlbal instance.",
112             check_type => 'int',
113             default => 2,
114             }
115             );
116 0           Perlbal::Service::add_tunable(
117             ban_threshold => {
118             check_role => '*',
119             des => "Number of accumulated violations required to temporarily ban the source IP.",
120             check_type => 'int',
121             default => 0,
122             }
123             );
124 0           Perlbal::Service::add_tunable(
125             ban_expiration => {
126             check_role => '*',
127             des => "Number of seconds after which banned IP is unbanned.",
128             check_type => 'int',
129             default => 60,
130             }
131             );
132              
133             # memcached
134 0           Perlbal::Service::add_tunable(
135             memcached_servers => {
136             check_role => '*',
137             des => "List of memcached servers to share state in, if desired. (Cache::Memcached::Async must be installed.)",
138             }
139             );
140 0           Perlbal::Service::add_tunable(
141             memcached_async_clients => {
142             check_role => '*',
143             des => "Number of parallel Cache::Memcached::Async objects to use.",
144             check_type => 'int',
145             default => 10,
146             }
147             );
148 0           Perlbal::Service::add_tunable(
149             instance_name => {
150             check_role => '*',
151             des => "Name of throttler instance; instances with the same name will share knowledge of IPs.",
152             default => 'Throttle',
153             }
154             );
155              
156             Perlbal::register_global_hook('manage_command.throttle', sub {
157 0     0     my $mc = shift->parse(qr/^
158             throttle\s+
159             (reload)\s+ # command
160             (whitelist|blacklist|config)
161             $/xi,
162             "usage: throttle reload ");
163 0           my ($cmd, $what) = $mc->args;
164              
165 0           my $svcname = $mc->{ctx}{last_created};
166 0 0         unless ($svcname) {
167 0           return $mc->err("No service name set. This command must be used after CREATE SERVICE or USE ");
168             }
169              
170 0           my $ss = Perlbal->service($svcname);
171 0 0         return $mc->err("Non-existent service '$svcname'") unless $ss;
172              
173 0   0       my $cfg = $ss->{extra_config} ||= {};
174 0   0       my $stash = $cfg->{_throttle_stash} ||= {};
175              
176 0 0         if ($cmd eq 'reload') {
177 0 0         if ($what eq 'whitelist') {
    0          
    0          
178 0 0         if (my $whitelist = $cfg->{whitelist_file}) {
179 0           eval { $stash->{whitelist} = load_cidr_list($whitelist); };
  0            
180 0 0 0       return $mc->err("Couldn't load $whitelist: $@")
181             if $@ || !$stash->{whitelist};
182             }
183             else {
184 0           return $mc->err("no whitelist file configured");
185             }
186             }
187             elsif ($what eq 'blacklist') {
188 0 0         if (my $blacklist = $cfg->{blacklist_file}) {
189 0           eval { $stash->{blacklist} = load_cidr_list($blacklist); };
  0            
190 0 0 0       return $mc->err("Couldn't load $blacklist: $@")
191             if $@ || !$stash->{blacklist};
192             }
193             else {
194 0           return $mc->err("no blacklist file configured");
195             }
196             }
197             elsif ($what eq 'config') {
198 0           $stash->{config_reloader}->();
199             }
200             else {
201 0           return $mc->err("unknown object to reload: $what");
202             }
203             }
204             else {
205 0           return $mc->err("unknown command $cmd");
206             }
207              
208 0           return $mc->ok;
209 0           });
210             }
211              
212             # magical Perlbal hook return value constants
213 1     1   9 use constant HANDLE_REQUEST => 0;
  1         2  
  1         70  
214 1     1   7 use constant IGNORE_REQUEST => 1;
  1         2  
  1         266  
215              
216             # indexes into logging flag list
217 1     1   74 use constant LOG_BAN_ADDED => 0;
  1         2  
  1         57  
218 1     1   6 use constant LOG_BAN_REMOVED => 1;
  1         3  
  1         47  
219 1     1   6 use constant LOG_ALLOW_WHITELISTED => 2;
  1         1  
  1         42  
220 1     1   6 use constant LOG_ALLOW_DEFAULT => 3;
  1         4  
  1         43  
221 1     1   5 use constant LOG_DENY_BANNED => 4;
  1         3  
  1         44  
222 1     1   5 use constant LOG_DENY_BLACKLISTED => 5;
  1         2  
  1         42  
223 1     1   5 use constant LOG_DENY_CONCURRENT => 6;
  1         2  
  1         41  
224 1     1   5 use constant LOG_THROTTLE_BLACKLISTED => 7;
  1         3  
  1         40  
225 1     1   5 use constant LOG_THROTTLE_DEFAULT => 8;
  1         2  
  1         41  
226 1     1   6 use constant NUM_LOG_FLAGS => 9;
  1         2  
  1         42  
227              
228 1     1   6 use constant RESULT_ALLOW => 0;
  1         3  
  1         43  
229 1     1   5 use constant RESULT_THROTTLE => 1;
  1         2  
  1         84  
230 1     1   6 use constant RESULT_DENY => 2;
  1         4  
  1         4567  
231              
232             # localized variable to track if a connection has already been throttled
233             our $DELAYED = undef;
234              
235             sub register {
236 0     0 0   my ($class, $svc) = @_;
237              
238 0           VERBOSE and Perlbal::log(info => "Registering Throttle plugin on service $svc->{name}");
239              
240 0   0       my $cfg = $svc->{extra_config} ||= {};
241 0   0       my $stash = $cfg->{_throttle_stash} ||= {};
242              
243             # these are allowed to die at register time
244 0 0         $stash->{whitelist} = load_cidr_list($cfg->{whitelist_file}) if $cfg->{whitelist_file};
245 0 0         $stash->{blacklist} = load_cidr_list($cfg->{blacklist_file}) if $cfg->{blacklist_file};
246              
247             # several service tunables are cached in lexicals for efficiency. if these
248             # are changed, the "throttle reload config" command must be issued to
249             # update the cache. this implements the reloading (and initial loading).
250 0           my ($log, $path_regex, $method_regex);
251             my $loader = $stash->{config_reloader} = sub {
252 0     0     my @log_on_cfg = grep {length} split /[, ]+/, lc $cfg->{log_events};
  0            
253 0           my @log_events = (0) x NUM_LOG_FLAGS;
254 0           for (@log_on_cfg) {
255 0 0         $log_events[LOG_BAN_ADDED] = 1 if $_ eq 'ban';
256 0 0         $log_events[LOG_BAN_REMOVED] = 1 if $_ eq 'unban';
257 0 0         $log_events[LOG_ALLOW_WHITELISTED] = 1 if $_ eq 'whitelisted';
258 0 0         $log_events[LOG_DENY_BANNED] = 1 if $_ eq 'banned';
259 0 0         $log_events[LOG_DENY_BLACKLISTED] =
260             $log_events[LOG_THROTTLE_BLACKLISTED] = 1 if $_ eq 'blacklisted';
261 0 0         $log_events[LOG_DENY_CONCURRENT] = 1 if $_ eq 'concurrent';
262 0 0         $log_events[LOG_THROTTLE_DEFAULT] = 1 if $_ eq 'throttled';
263 0 0         @log_events = (1) x NUM_LOG_FLAGS if $_ eq 'all';
264 0 0         @log_events = (0) x NUM_LOG_FLAGS if $_ eq 'none';
265             }
266              
267 0           $log = sub {};
  0            
268 0 0         if (grep {$_} @log_events) {
  0            
269 0           my $has_syslogger = eval { require Perlbal::Plugin::Syslogger; 1 };
  0            
  0            
270 0 0 0       if ($has_syslogger && $cfg->{syslog_host}) {
271 0           VERBOSE and Perlbal::log(info => "Using Perlbal::Plugin::Syslogger");
272             $log = sub {
273 0           my $action = shift;
274 0 0         return unless $log_events[$action];
275 0           Perlbal::Plugin::Syslogger::send_syslog_msg($svc, @_);
276 0           };
277             }
278             else {
279 0           VERBOSE and Perlbal::log(warn => "Syslogger plugin unavailable, using Perlbal::log");
280             $log = sub {
281 0           my $action = shift;
282 0 0         return unless $log_events[$action];
283 0           Perlbal::log(info => @_);
284 0           };
285             }
286             }
287              
288 0 0         $path_regex = $cfg->{path_regex} ? qr/$cfg->{path_regex}/ : undef;
289 0 0         $method_regex = $cfg->{method_regex} ? qr/$cfg->{method_regex}/ : undef;
290 0           };
291 0           $loader->();
292              
293             # structures for tracking IP states
294 0           my %throttled;
295             my %banned;
296 0           my $store = Perlbal::Plugin::Throttle::Store->new($cfg);
297              
298             my $start_handler = sub {
299 0     0     my $retval = eval {
300 0           my $request_start = Time::HiRes::time;
301              
302 0           VERBOSE and Perlbal::log(info => "In Throttle (%s)",
303             defined $DELAYED ? sprintf 'back after %.2fs', $DELAYED : 'initial'
304             );
305              
306 0           my Perlbal::ClientProxy $cp = shift;
307 0 0         unless ($cp) {
308 0           VERBOSE and Perlbal::log(error => "Missing ClientProxy");
309 0           return HANDLE_REQUEST;
310             }
311              
312 0           my $headers = $cp->{req_headers};
313 0 0         unless ($headers) {
314 0           VERBOSE and Perlbal::log(info => "Missing headers");
315 0           return HANDLE_REQUEST;
316             }
317 0           my $uri = $headers->request_uri;
318 0           my $method = $headers->request_method;
319              
320 0   0       my $ip = $cp->observed_ip_string() || $cp->peer_ip_string;
321 0 0         unless (defined $ip) {
322             # happens if client goes away
323 0           VERBOSE and Perlbal::log(warn => "Client went away");
324 0           $cp->send_response(500, "Internal server error.\n");
325 0           return IGNORE_REQUEST;
326             }
327              
328             # back from throttling, all later checks were already passed
329 0 0         return HANDLE_REQUEST if defined $DELAYED;
330              
331             # increment the count of throttled conns
332 0           $throttled{$ip}++;
333              
334             my $result = sub {
335             # immediately passthrough whitelistees
336 0 0 0       if ($stash->{whitelist} && $stash->{whitelist}->find($ip)) {
337 0           $log->(LOG_ALLOW_WHITELISTED, "Letting whitelisted ip $ip through");
338 0           return RESULT_ALLOW;
339             }
340              
341             # drop conns from banned IPs
342 0 0         if ($banned{$ip}) {
343 0           $log->(LOG_DENY_BANNED, "Denying banned IP $ip");
344 0           return RESULT_DENY;
345             }
346              
347             # drop conns from banned/blacklisted IPs
348 0 0 0       if ($stash->{blacklist} && $stash->{blacklist}->find($ip)) {
349 0 0         if ($cfg->{blacklist_action} eq 'deny') {
350 0           $log->(LOG_DENY_BLACKLISTED, "Denying blacklisted IP $ip");
351 0           return RESULT_DENY;
352             }
353             else {
354 0           $log->(LOG_THROTTLE_BLACKLISTED, "Throttling blacklisted IP $ip");
355 0           return RESULT_THROTTLE;
356             }
357             }
358              
359 0 0 0       if (exists $throttled{$ip} && $throttled{$ip} > $cfg->{max_concurrent}) {
360 0           $log->(LOG_DENY_CONCURRENT, "Too many concurrent connections from $ip");
361 0           return RESULT_DENY;
362             }
363              
364             # only throttle matching requests
365 0 0 0       if (defined $path_regex && $uri !~ $path_regex) {
366 0           VERBOSE && Perlbal::log(info => "This isn't a throttled URL: %s", $uri);
367 0           return RESULT_ALLOW;
368             }
369 0 0 0       if (defined $method_regex && $method !~ $method_regex) {
370 0           VERBOSE && Perlbal::log(info => "This isn't a throttled method: %s", $method);
371 0           return RESULT_ALLOW;
372             }
373              
374 0 0         return $cfg->{default_action} eq 'allow' ? RESULT_ALLOW : RESULT_THROTTLE;
375 0           }->();
376              
377 0 0         if ($result == RESULT_DENY) {
    0          
378 0 0         unless ($cfg->{log_only}) {
379 0           $cp->send_response(403, "Forbidden.\n");
380 0           return IGNORE_REQUEST;
381             }
382             }
383             elsif ($result == RESULT_ALLOW) {
384 0           return HANDLE_REQUEST;
385             }
386              
387             # now entering throttle path...
388              
389             # check if we've seen this IP lately.
390 0           my $key = $cfg->{instance_name} . $ip;
391             $store->get(key => $key, timeout => $cfg->{initial_delay}, callback => sub {
392 0           my ($last_request_time, $violations) = @_;
393 0   0       $violations ||= 0;
394              
395             # do an early set to update the last_request_time and
396             # expiration in case of early exit
397 0           $store->set(
398             key => $key,
399             start => $request_start,
400             count => $violations,
401             exptime => $cfg->{throttle_threshold_seconds},
402             timeout => $cfg->{initial_delay},
403             );
404              
405 0           my $time_since_last_request;
406 0 0         if (defined $last_request_time) {
407 0           $time_since_last_request = $request_start - $last_request_time;
408             }
409              
410 0           VERBOSE and Perlbal::log(
411             info => "%s; this request at %.3f; last at %s; interval is %s",
412             $ip, $request_start,
413             $last_request_time || 'n/a', $time_since_last_request || 'n/a'
414             );
415              
416             my $handle_after = sub {
417 0           my $delay = shift;
418 0 0         $delay = 0 if $cfg->{log_only};
419              
420             # put request on the backburner
421 0           $cp->watch_read(0);
422             Danga::Socket->AddTimer($delay, sub {
423             # we're now executing in a timer callback after
424             # perlbal has been told to ignore the request. so if we
425             # now want it handled it needs to be re-adopted.
426 0           local $DELAYED = $delay; # to short-circuit throttling logic on the next pass through
427 0           $cp->watch_read(1);
428 0           $svc->adopt_base_client($cp);
429 0           });
430              
431 0           return IGNORE_REQUEST;
432 0           };
433              
434             # can we let it through immediately?
435 0 0         unless (defined $time_since_last_request) {
436             # forgotten or haven't seen ip before
437 0           $log->(LOG_ALLOW_DEFAULT, "Allowed unseen $ip");
438 0           return $handle_after->(0);
439             }
440 0 0         if ($time_since_last_request >= $cfg->{throttle_threshold_seconds}) {
441             # waited long enough
442 0           $log->(LOG_ALLOW_DEFAULT, "Allowed reformed $ip");
443 0           return $handle_after->(0);
444             }
445              
446             # need to throttle, now figure out by how much. at least
447             # initial_delay, at most max_delay, exponentially increasing in
448             # between
449 0           my $delay = min($cfg->{initial_delay} * 2**$violations, $cfg->{max_delay});
450              
451 0           $violations++;
452              
453             # banhammer for great justice
454 0 0 0       if ($cfg->{ban_threshold} && $violations >= $cfg->{ban_threshold}) {
455 0           $log->(LOG_BAN_ADDED, "Banning $ip for $cfg->{ban_expiration}s: %s", $uri);
456 0 0         $banned{$ip}++ unless $cfg->{log_only};
457             Danga::Socket->AddTimer($cfg->{ban_expiration}, sub {
458 0           $log->(LOG_BAN_REMOVED, "Unbanning $ip");
459 0           delete $banned{$ip};
460 0           });
461 0           $cp->close;
462 0           return IGNORE_REQUEST;
463             }
464              
465             $store->set(
466 0           key => $key,
467             start => $request_start,
468             count => $violations,
469             exptime => $delay,
470             timeout => $cfg->{initial_delay},
471             );
472              
473 0           $log->(LOG_THROTTLE_DEFAULT, "Throttling $ip for $delay: %s", $uri);
474              
475             # schedule request to be re-handled
476 0           return $handle_after->($delay);
477 0           });
478              
479             # make sure we don't take up reading until readoption
480 0           $cp->watch_read(0);
481 0           return IGNORE_REQUEST;
482             };
483 0 0         if ($@) {
484             # if something horrible should happen internally, don't take out perlbal
485 0           Perlbal::log(err => "Throttle failed: '%s'", $@);
486 0           return HANDLE_REQUEST;
487             }
488             else {
489 0           return $retval;
490             }
491 0           };
492              
493             my $end_handler = sub {
494 0     0     my Perlbal::ClientProxy $cp = shift;
495              
496 0   0       my $ip = $cp->observed_ip_string() || $cp->peer_ip_string;
497 0 0         return unless $ip;
498              
499 0 0         delete $throttled{$ip} unless --$throttled{$ip} > 0;
500 0           };
501              
502 0           $svc->register_hook(Throttle => start_proxy_request => $start_handler);
503 0           $svc->register_hook(Throttle => end_proxy_request => $end_handler);
504             }
505              
506             sub load_cidr_list {
507 0     0 0   my $file = shift;
508              
509 0           require Net::CIDR::Lite;
510              
511 0           my $empty = 1;
512 0           my $list = Net::CIDR::Lite->new;
513              
514 0 0         open my $fh, '<', $file or die "Unable to open file $file: $!";
515 0           while (my $line = <$fh>) {
516 0           $line =~ s/#.*//; # comments
517 0 0         if ($line =~ /([0-9\/\.]+)/) {
518 0           my $cidr = $1;
519 0 0         if (index($cidr, "/") < 0) {
520             # slash-less specifications are assumed to be singular IPs
521 0           $list->add_ip($cidr);
522             }
523             else {
524 0           $list->add($cidr);
525             }
526 0           $empty = 0;
527             }
528             }
529              
530 0 0         die "$file contains no recognizable CIDRs\n" if $empty;
531              
532 0           return $list;
533             }
534              
535             package Perlbal::Plugin::Throttle::Store;
536              
537             sub new {
538 0     0     my $class = shift;
539 0           my $cfg = shift;
540              
541 0           my $want_memcached = $cfg->{memcached_servers};
542 0           my $has_memcached = eval { require Cache::Memcached::Async; 1 };
  0            
  0            
543              
544 0 0 0       if ($want_memcached && !$has_memcached) {
545 0           die "memcached support requested but Cache::Memcached::Async failed to load: $@\n";
546             }
547 0 0         return $want_memcached
548             ? Perlbal::Plugin::Throttle::Store::Memcached->new($cfg)
549             : Perlbal::Plugin::Throttle::Store::Memory->new($cfg);
550             }
551              
552             package Perlbal::Plugin::Throttle::Store::Memcached;
553              
554             sub new {
555 0     0     my $class = shift;
556 0           my $cfg = shift;
557              
558 0           my @servers = split /[,\s]+/, $cfg->{memcached_servers};
559 0           my @cxns = map {
560 0           Cache::Memcached::Async->new({ servers => \@servers })
561             } 1 .. $cfg->{memcached_async_clients};
562              
563 0           return bless \@cxns, $class;
564             }
565              
566             sub get {
567 0     0     my $self = shift;
568 0           my %p = @_;
569             $self->[rand @$self]->get(
570             $p{key},
571             timeout => $p{timeout},
572             callback => sub {
573 0     0     my $value = shift;
574 0 0         return $p{callback}->() unless $value;
575 0           return $p{callback}->(unpack('FS', $value));
576             },
577 0           );
578 0           return;
579             }
580              
581             sub set {
582 0     0     my $self = shift;
583 0           my %p = @_;
584              
585 0           $self->[rand @$self]->set(
586             $p{key} => pack('FS', $p{start}, $p{count}),
587             exptime => $p{exptime},
588             timeout => $p{timeout},
589             );
590             }
591              
592             package Perlbal::Plugin::Throttle::Store::Memory;
593              
594 1     1   15 use Time::HiRes 'time';
  1         2  
  1         13  
595              
596             sub new {
597 0     0     my $class = shift;
598 0           my $cfg = shift;
599 0           return bless {}, $class;
600             }
601              
602             sub get {
603 0     0     my $self = shift;
604 0           my %p = @_;
605 0           my $entry = $self->{$p{key}};
606              
607 0 0 0       return $p{callback}->($entry->[1], $entry->[2]) if $entry && time < $entry->[0];
608 0           return $p{callback}->();
609             }
610              
611             sub set {
612 0     0     my $self = shift;
613 0           my %p = @_;
614 0           $self->{$p{key}} = [time + $p{exptime}, $p{start}, $p{count}];
615 0           return;
616             }
617              
618             1;
619              
620             __END__