File Coverage

blib/lib/Data/Throttler/BucketChain/Memcached.pm
Criterion Covered Total %
statement 9 9 100.0
branch n/a
condition n/a
subroutine 3 3 100.0
pod n/a
total 12 12 100.0


line stmt bran cond sub pod time code
1             # $Id: /mirror/perl/Data-Throttler-Memcached/trunk/lib/Data/Throttler/BucketChain/Memcached.pm 8774 2007-11-08T09:43:20.728908Z daisuke $
2             #
3             # Copyright (c) 2007 Daisuke Maki
4             # All rights reserved.
5              
6             package Data::Throttler::BucketChain::Memcached;
7 1     1   7 use strict;
  1         2  
  1         39  
8 1     1   6 use warnings;
  1         1  
  1         36  
9 1     1   5 use base qw(Class::Accessor::Fast Data::Throttler::BucketChain);
  1         2  
  1         757  
10             use Cache::Memcached::Managed;
11             use Log::Log4perl qw(:easy);
12              
13             __PACKAGE__->mk_accessors($_) for qw(id max_items interval cache);
14             __PACKAGE__->mk_accessors($_) for qw(buckets bucket_time_span nof_buckets );
15              
16             sub new
17             {
18             my $class = shift;
19             my %args = @_;
20              
21             my $self = bless {
22             max_items => delete $args{max_items},
23             interval => delete $args{interval},
24             nof_buckets => delete $args{nof_buckets},
25             id => delete $args{id} || do {
26             no warnings;
27             require Digest::MD5;
28             Digest::MD5::md5_hex($$, time(), rand(), {})
29             }
30             }, $class;
31              
32             my $cache = Cache::Memcached::Managed->new(
33             # defaults
34             data => '127.0.0.1:11211',
35             namespace => $class,
36             # user-specified
37             %{ $args{cache} || {} },
38             # overrides
39             expiration => $self->interval * 2
40             );
41             $self->cache( $cache );
42              
43             if(!$self->max_items or !$self->interval) {
44             LOGDIE "Both max_items and interval need to be defined";
45             }
46              
47             if(!$self->nof_buckets) {
48             $self->nof_buckets(10);
49             }
50              
51             if($self->nof_buckets > $self->interval) {
52             $self->nof_buckets( $self->interval );
53             }
54              
55             $self->reset();
56             return $self;
57             }
58              
59             sub reset
60             {
61             my $self = shift;
62              
63             $self->cache->delete_group( group => $self->id );
64             $self->buckets([]);
65              
66             my $bucket_time_span = int ($self->interval / $self->nof_buckets);
67             $self->bucket_time_span( $bucket_time_span );
68              
69             my $time_start = time() - ($self->nof_buckets - 1) * $bucket_time_span;
70              
71             for(1..$self->nof_buckets) {
72             my $time_end = $time_start + $bucket_time_span - 1;
73             DEBUG "Creating bucket ", _hms($time_start), " - ", _hms($time_end);
74             push @{$self->{buckets}}, {
75             time => Data::Throttler::Range->new($time_start, $time_end),
76             id => join('.', $self->id, $time_start, $time_end),
77             count => {},
78             };
79             $time_start = $time_end + 1;
80             }
81              
82             $self->{head_bucket_idx} = 0;
83             $self->{tail_bucket_idx} = $#{$self->{buckets}};
84             }
85              
86             sub as_string
87             {
88             my($self) = @_;
89              
90             warn "as_string for Data::Throttler::Memcached is currently unimplemented";
91             }
92              
93             sub _hms {
94             my($time) = @_;
95              
96             my ($sec,$min,$hour) = localtime($time);
97             return sprintf "%02d:%02d:%02d",
98             $hour, $min, $sec;
99             }
100              
101             sub bucket_add
102             {
103             my($self, $time) = @_;
104              
105             # ... and append a new one at the end
106             my $time_start = $self->{buckets}->
107             [$self->{tail_bucket_idx}]->{time}->max + 1;
108             my $time_end = $time_start + $self->{bucket_time_span} - 1;
109              
110             DEBUG "Adding bucket: ", _hms($time_start), " - ", _hms($time_end);
111              
112             $self->{tail_bucket_idx}++;
113             $self->{tail_bucket_idx} = 0 if $self->{tail_bucket_idx} >
114             $#{$self->{buckets}};
115             $self->{head_bucket_idx}++;
116             $self->{head_bucket_idx} = 0 if $self->{head_bucket_idx} >
117             $#{$self->{buckets}};
118              
119             $self->{buckets}->[ $self->{tail_bucket_idx} ] = {
120             time => Data::Throttler::Range->new($time_start, $time_end),
121             id => join('.', $self->id, $time_start, $time_end),
122             count => {},
123             };
124             }
125              
126             sub try_push
127             {
128             my($self, %options) = @_;
129              
130             my $key = "_default";
131             $key = $options{key} if defined $options{key};
132              
133             my $time = time();
134             $time = $options{time} if defined $options{time};
135              
136             my $count = 1;
137             $count = $options{count} if defined $options{count};
138              
139             DEBUG "Trying to push $key ", _hms($time), " $count";
140              
141             my $b = $self->bucket_find($time);
142              
143             if(!$b) {
144             $self->rotate($time);
145             $b = $self->bucket_find($time);
146             }
147              
148             # Determine the total count for this key
149             my %count = %{ $self->cache->get_multi(
150             id => [ map { [ $key, $_->{id} ] } @{ $self->buckets } ],
151             key => 'count'
152             ) };
153             my $val = 0;
154             $val += $_ for values %count;
155              
156              
157             if($val >= $self->{max_items}) {
158             DEBUG "Not increasing counter $key by $count (already at max $val|$self->{max_items})";
159             return 0;
160             } else {
161             DEBUG "Increasing counter $key by $count ",
162             "($val|$self->{max_items})";
163             $self->cache->incr(
164             value => 1,
165             id => [ $key, $b->{id} ],
166             key => 'count'
167             );
168             return 1;
169             }
170              
171             LOGDIE "Time $time is outside of bucket range\n", $self->as_string;
172             return undef;
173             }
174              
175             1;
176              
177             __END__