File Coverage

blib/lib/Data/Throttler.pm
Criterion Covered Total %
statement 231 272 84.9
branch 53 66 80.3
condition 9 15 60.0
subroutine 44 48 91.6
pod 0 8 0.0
total 337 409 82.4


line stmt bran cond sub pod time code
1             ###########################################
2             package Data::Throttler;
3             ###########################################
4 3     3   66966 use strict;
  3         6  
  3         139  
5 3     3   17 use warnings;
  3         6  
  3         117  
6 3     3   3005 use Log::Log4perl qw(:easy);
  3         173340  
  3         30  
7              
8             our $VERSION = "0.07";
9             our $DB_VERSION = "1.1";
10              
11             ###########################################
12             sub new {
13             ###########################################
14 7     7 0 13119 my($class, %options) = @_;
15              
16 7         67 my $self = {
17             db_version => $DB_VERSION,
18             backend => "Memory",
19             backend_options => {},
20             reset => 0,
21             %options,
22             };
23              
24 7 100       41 if($self->{db_file}) {
25             # legacy option, translate
26 4         21 $self->{backend_options} = {
27             db_file => $self->{db_file},
28             };
29 4         15 $self->{backend} = "YAML";
30             }
31              
32 7         29 my $backend_class = "Data::Throttler::Backend::$self->{backend}";
33              
34 7         87 $self->{db} = $backend_class->new(
35 7         15 %{ $self->{backend_options} } );
36              
37 7         24 $self->{changed} = 0;
38              
39 7         15 bless $self, $class;
40              
41 7         15 my $create = 1;
42              
43 7 100       55 if( $self->{ db }->exists() ) {
44 3         16 DEBUG "Backend store exists";
45 3         39 $self->lock();
46 3         112 $self->{data} = $self->{ db }->load();
47              
48 3         86735 $create = 0;
49              
50 3 100 66     80 if($self->{data}->{chain} and
      33        
51             ($self->{data}->{chain}->{max_items} != $options{max_items} or
52             $self->{data}->{chain}->{interval} != $options{interval})) {
53 2         7 $self->{changed} = 1;
54 2         6 $create = 1;
55             }
56              
57 3 100 66     31 if($options{reset} or !$self->{ db }->backend_store_ok() ) {
58 1         4 $create = 1;
59             }
60 3         20 $self->unlock();
61             }
62            
63 7 100       42 if($create) {
64 6 50       45 $self->{ db }->create( \%options ) or
65             LOGDIE "Creating backend store failed";
66              
67             # create bucket chain
68 6         56 $self->create( {
69             max_items => $options{max_items},
70             interval => $options{interval},
71             });
72              
73 6         94 $self->{db}->save( $self->{data} );
74             }
75              
76 7         84007 return $self;
77             }
78              
79             ###########################################
80             sub create {
81             ###########################################
82 6     6 0 13 my($self, $options) = @_;
83              
84 6 100       22 if( $self->{changed} ) {
85 2         40 ERROR "Bucket chain parameters have changed ",
86             "(max_items: $self->{data}->{chain}->{max_items}/",
87             "$options->{max_items} ",
88             "(interval: $self->{data}->{chain}->{interval}/",
89             "$options->{interval})", ", throwing old chain away";
90 2         20 $self->{changed} = 0;
91             }
92              
93 6         51 DEBUG "Creating bucket chain max_items=$options->{max_items} ",
94             "interval=$options->{interval}";
95              
96 6         110 $self->{data}->{chain} = Data::Throttler::BucketChain->new(
97             max_items => $options->{max_items},
98             interval => $options->{interval},
99             );
100             }
101              
102             ###########################################
103             sub lock {
104             ###########################################
105 52     52 0 71 my($self) = @_;
106 52         259 DEBUG "Trying to get lock ($$)";
107 52         1822 $self->{db}->lock();
108 52         198 DEBUG "Lock on ($$)";
109             }
110              
111             ###########################################
112             sub unlock {
113             ###########################################
114 52     52 0 74 my($self) = @_;
115 52         170 DEBUG "Lock off";
116 52         479 $self->{db}->unlock();
117             }
118              
119             ###########################################
120             sub try_push {
121             ###########################################
122 44     44 0 2607 my($self, %options) = @_;
123              
124 44 100       125 if(exists $options{key}) {
125 26         60 DEBUG "Pushing key $options{key}";
126             } else {
127 18         78 DEBUG "Pushing keyless item";
128             }
129              
130 44         324 $self->lock();
131              
132 44         278 $self->{data} = $self->{db}->load();
133 44         284910 my $ret = $self->{data}->{chain}->try_push(%options);
134 44         150 $self->{db}->save( $self->{data} );
135              
136 44         241640 $self->unlock();
137 44         208 return $ret;
138             }
139              
140             ###########################################
141             sub reset_key {
142             ###########################################
143 5     5 0 17 my($self, %options) = @_;
144              
145 5 100       17 if(exists $options{key}) {
146 3         11 DEBUG "Resetting count for $options{key}";
147             } else {
148 2         7 DEBUG "Resetting count for keyless item";
149             }
150              
151 5         40 $self->lock();
152              
153 5         44 $self->{data} = $self->{db}->load();
154 5         51372 my $ret = $self->{data}->{chain}->reset_key(%options);
155 5         50 $self->{db}->save( $self->{data} );
156              
157 5         39943 $self->unlock();
158 5         32 return $ret;
159             }
160              
161             ###########################################
162             sub buckets_dump {
163             ###########################################
164 0     0 0 0 my($self) = @_;
165 0         0 $self->lock();
166 0         0 $self->{data} = $self->{db}->load();
167 0         0 my $ret = $self->{data}->{chain}->as_string();
168 0         0 $self->unlock();
169 0         0 return $ret;
170             }
171              
172             ###########################################
173             sub buckets_rotate {
174             ###########################################
175 0     0 0 0 my($self) = @_;
176 0         0 my $ret = $self->{data}->{chain}->rotate();
177 0         0 return $ret;
178             }
179              
180             package Data::Throttler::Range;
181              
182             ###########################################
183             sub new {
184             ###########################################
185 55     55   71 my($class, $start, $stop) = @_;
186              
187 55         140 my $self = {
188             start => $start,
189             stop => $stop,
190             };
191 55         287 bless $self, $class;
192             }
193              
194             ###########################################
195             sub min {
196             ###########################################
197 23     23   21 my($self) = @_;
198 23         45 return $self->{start};
199             }
200              
201             ###########################################
202             sub max {
203             ###########################################
204 23     23   25 my($self) = @_;
205 23         42 return $self->{stop};
206             }
207              
208             ###########################################
209             sub member {
210             ###########################################
211 184     184   160 my($self, $time) = @_;
212              
213 184   100     794 return ($time >= $self->{start} and $time <= $self->{stop});
214             }
215              
216             ###########################################
217             package Data::Throttler::BucketChain;
218             ###########################################
219 3     3   5700 use Log::Log4perl qw(:easy);
  3         8  
  3         17  
220              
221             our $DEFAULT_KEY = "_default";
222              
223             ###########################################
224             sub new {
225             ###########################################
226 6     6   24 my($class, %options) = @_;
227              
228 6         64 my $self = {
229             max_items => undef,
230             interval => undef,
231             %options,
232             };
233              
234 6 50 33     50 if(!$self->{max_items} or
235             !$self->{interval}) {
236 0         0 LOGDIE "Both max_items and interval need to be defined";
237             }
238              
239 6 50       25 if(!$self->{nof_buckets}) {
240 6         18 $self->{nof_buckets} = 10;
241             }
242              
243 6 100       43 if($self->{nof_buckets} > $self->{interval}) {
244 1         3 $self->{nof_buckets} = $self->{interval};
245             }
246              
247 6         22 bless $self, $class;
248              
249 6         21 $self->reset();
250              
251 6         39 return $self;
252             }
253              
254             ###########################################
255             sub reset {
256             ###########################################
257 6     6   14 my($self) = @_;
258              
259 6         37 $self->{buckets} = [];
260              
261 6         35 my $bucket_time_span = int ($self->{interval} /
262             $self->{nof_buckets});
263              
264 6         14 $self->{bucket_time_span} = $bucket_time_span;
265              
266 6         35 my $time_start = time() -
267             ($self->{nof_buckets}-1) * $bucket_time_span;
268              
269 6         25 for(1..$self->{nof_buckets}) {
270 52         67 my $time_end = $time_start + $bucket_time_span - 1;
271 52         87 DEBUG "Creating bucket ", hms($time_start), " - ", hms($time_end);
272 52         313 push @{$self->{buckets}}, {
  52         168  
273             time => Data::Throttler::Range->new($time_start, $time_end),
274             count => {},
275             };
276 52         92 $time_start = $time_end + 1;
277             }
278              
279 6         18 $self->{head_bucket_idx} = 0;
280 6         10 $self->{tail_bucket_idx} = $#{$self->{buckets}};
  6         23  
281             }
282              
283             ###########################################
284             sub first_bucket {
285             ###########################################
286 19     19   16 my($self) = @_;
287              
288 19         26 $self->{current_idx} = $self->{head_bucket_idx};
289 19         53 return $self->{buckets}->[ $self->{current_idx} ];
290             }
291              
292             ###########################################
293             sub last_bucket {
294             ###########################################
295 50     50   57 my($self) = @_;
296              
297 50         108 $self->{current_idx} = $self->{tail_bucket_idx};
298 50         171 return $self->{buckets}->[ $self->{current_idx} ];
299             }
300              
301             ###########################################
302             sub next_bucket {
303             ###########################################
304 121     121   89 my($self) = @_;
305              
306 121 100       174 return undef if $self->{current_idx} == $self->{tail_bucket_idx};
307              
308 120         87 $self->{current_idx}++;
309 120 50       92 $self->{current_idx} = 0 if $self->{current_idx} > $#{$self->{buckets}};
  120         167  
310              
311 120         215 return $self->{buckets}->[ $self->{current_idx} ];
312             }
313              
314             ###########################################
315             sub as_string {
316             ###########################################
317 0     0   0 my($self) = @_;
318              
319 0         0 require Text::ASCIITable;
320              
321 0         0 my $t = Text::ASCIITable->new();
322 0         0 $t->setCols("#", "idx", ("Time: " . hms(time)), "Key", "Count");
323              
324 0         0 my $count = 1;
325              
326 0         0 for(my $b = $self->first_bucket(); $b; $b = $self->next_bucket()) {
327 0         0 my $span = hms($b->{time}->min) . " - " . hms($b->{time}->max);
328 0         0 my $idx = $self->{current_idx};
329 0         0 my $count_string = $count;
330              
331 0 0       0 if(! scalar keys %{$b->{count}}) {
  0         0  
332 0         0 $t->addRow($count_string, $idx, $span, "", "");
333             }
334              
335 0         0 foreach my $key (sort keys %{$b->{count}}) {
  0         0  
336 0         0 $t->addRow($count_string, $idx, $span, $key, $b->{count}->{$key});
337 0         0 $span = "";
338 0         0 $count_string = "";
339 0         0 $idx = "";
340             }
341              
342 0         0 $count++;
343             }
344 0         0 return $t->draw();
345             }
346              
347             ###########################################
348             sub hms {
349             ###########################################
350 266     266   278 my($time) = @_;
351              
352 266         4378 my ($sec,$min,$hour) = localtime($time);
353 266         1362 return sprintf "%02d:%02d:%02d",
354             $hour, $min, $sec;
355             }
356              
357             ###########################################
358             sub bucket_add {
359             ###########################################
360 3     3   5 my($self, $time) = @_;
361              
362             # ... and append a new one at the end
363 3         62 my $time_start = $self->{buckets}->
364             [$self->{tail_bucket_idx}]->{time}->max + 1;
365 3         89 my $time_end = $time_start + $self->{bucket_time_span} - 1;
366              
367 3         8 DEBUG "Adding bucket: ", hms($time_start), " - ", hms($time_end);
368              
369 3         24 $self->{tail_bucket_idx}++;
370 3         14 $self->{tail_bucket_idx} = 0 if $self->{tail_bucket_idx} >
371 3 100       5 $#{$self->{buckets}};
372 3         6 $self->{head_bucket_idx}++;
373 3         10 $self->{head_bucket_idx} = 0 if $self->{head_bucket_idx} >
374 3 100       5 $#{$self->{buckets}};
375              
376 3         13 $self->{buckets}->[ $self->{tail_bucket_idx} ] = {
377             time => Data::Throttler::Range->new($time_start, $time_end),
378             count => {},
379             };
380             }
381              
382             ###########################################
383             sub rotate {
384             ###########################################
385 1     1   2 my($self, $time) = @_;
386 1 50       6 $time = time() unless defined $time;
387              
388             # If the last bucket handles a time interval that doesn't cover
389             # $time, we need to rotate the bucket brigade. The first bucket
390             # will be cleared and re-used as the new last bucket of the chain.
391              
392 1         5 DEBUG "Rotating buckets time=", hms($time), " ",
393             "head=", $self->{head_bucket_idx};
394              
395 1 50       9 if($self->last_bucket->{time}->{stop} >= $time) {
396             # $time is still covered in the bucket brigade, we're golden
397 0         0 DEBUG "Rotation not necessary (",
398             hms($self->last_bucket->{time}->{stop}),
399             " - ", hms($time), ")";
400 0         0 return 1;
401             }
402              
403             # If we're too far off, just dump all buckets and re-init
404 1 50       8 if($self->{buckets}->[ $self->{tail_bucket_idx} ]->{time}->max <
405             $time - $self->{interval}) {
406 0         0 DEBUG "Too far off, resetting (", hms($time), " >> ",
407             hms($self->{buckets}->[ $self->{head_bucket_idx} ]->{time}->min),
408             ")";
409 0         0 $self->reset();
410 0         0 return 1;
411             }
412              
413 1         5 while($self->last_bucket()->{time}->min <= $time) {
414 3         8 $self->bucket_add();
415             }
416              
417 1         6 DEBUG "After rotation: ",
418             hms($self->{buckets}->[ $self->{head_bucket_idx} ]->{time}->min),
419             " - ",
420             hms($self->{buckets}->[ $self->{tail_bucket_idx} ]->{time}->max),
421             " (covers ", hms($time), ")";
422             }
423              
424             ###########################################
425             sub bucket_find {
426             ###########################################
427 45     45   61 my($self, $time) = @_;
428              
429 45         73 DEBUG "Searching bucket for time=", hms($time);
430              
431             # Search in the newest bucket first, chances are it's there
432 45         261 my $last_bucket = $self->last_bucket();
433 45 100       132 if($last_bucket->{time}->member($time)) {
434 26         54 DEBUG hms($time), " covered by last bucket";
435 26         161 return $last_bucket;
436             }
437              
438 19         37 for(my $b = $self->first_bucket(); $b; $b = $self->next_bucket()) {
439 139 100       166 if($b->{time}->member($time)) {
440 18         31 DEBUG "Found bucket ", hms($b->{time}->min),
441             " - ", hms($b->{time}->max);
442 18         98 return $b;
443             }
444             }
445              
446 1         3 DEBUG "No bucket found for time=", hms($time);
447 1         9 return undef;
448             }
449              
450             ###########################################
451             sub try_push {
452             ###########################################
453 44     44   83 my($self, %options) = @_;
454              
455 44         59 my $key = $DEFAULT_KEY;
456 44 100       146 $key = $options{key} if defined $options{key};
457              
458 44         68 my $time = time();
459 44 100       92 $time = $options{time} if defined $options{time};
460              
461 44         53 my $count = 1;
462 44 50       173 $count = $options{count} if defined $options{count};
463              
464 44         130 DEBUG "Trying to push $key ", hms($time), " $count";
465              
466 44         370 my $b = $self->bucket_find($time);
467              
468 44 100       106 if(!$b) {
469 1         6 $self->rotate($time);
470 1         11 $b = $self->bucket_find($time);
471             }
472              
473             # Determine the total count for this key
474 44         49 my $val = 0;
475 44         48 for(0..$#{$self->{buckets}}) {
  44         130  
476 400 100       840 $val += $self->{buckets}->[$_]->{count}->{$key} if
477             exists $self->{buckets}->[$_]->{count}->{$key};
478             }
479              
480 44 100       214 if($val >= $self->{max_items}) {
481 9         68 DEBUG "Not increasing counter $key by $count (already at max)";
482 9         56 return 0;
483             } else {
484 35         254 DEBUG "Increasing counter $key by $count ",
485             "($val|$self->{max_items})";
486 35         189 $b->{count}->{$key} += $count;
487 35         84 return 1;
488             }
489              
490 0         0 LOGDIE "Time $time is outside of bucket range\n", $self->as_string;
491 0         0 return undef;
492             }
493              
494             ###########################################
495             sub reset_key {
496             ###########################################
497 5     5   15 my ($self, %options) = @_;
498              
499 5         10 my $key = $DEFAULT_KEY;
500 5 100       22 $key = $options{key} if defined $options{key};
501              
502 5         23 DEBUG "Resetting $key";
503              
504 5         40 my $total = 0;
505 5         7 for(0..$#{$self->{buckets}}) {
  5         23  
506 50 100       219 if (exists $self->{buckets}->[$_]->{count}->{$key}) {
507 5         17 $total += $self->{buckets}->[$_]->{count}->{$key};
508 5         20 $self->{buckets}->[$_]->{count}->{$key} = 0;
509             }
510             }
511              
512 5         15 return $total;
513             }
514              
515             ###########################################
516             package Data::Throttler::Backend::Base;
517             ###########################################
518              
519             ###########################################
520             sub new {
521             ###########################################
522 7     7   23 my($class, %options) = @_;
523              
524 7         22 my $self = {
525             %options,
526             };
527              
528 7         25 bless $self, $class;
529 7         32 $self->init();
530 7         32 return $self;
531             }
532              
533 3     3   9 sub exists { 0 }
534 6     6   21 sub create { 1 }
535             #sub save { }
536             #sub load { }
537 3     3   3 sub init { }
538 37     37   28 sub lock { }
539 37     37   32 sub unlock { }
540 0     0   0 sub backend_store_ok { 1 }
541              
542             ###########################################
543             package Data::Throttler::Backend::Memory;
544             ###########################################
545 3     3   7969 use base 'Data::Throttler::Backend::Base';
  3         10  
  3         1910  
546              
547             ###########################################
548             sub save {
549             ###########################################
550 40     40   44 my($self, $data) = @_;
551 40         56 $self->{data} = $data;
552             }
553              
554             ###########################################
555             sub load {
556             ###########################################
557 37     37   33 my($self) = @_;
558 37         56 return $self->{data};
559             }
560              
561             ###########################################
562             package Data::Throttler::Backend::YAML;
563             ###########################################
564 3     3   22 use base 'Data::Throttler::Backend::Base';
  3         11  
  3         999  
565 3     3   19 use Log::Log4perl qw(:easy);
  3         5  
  3         206  
566 3     3   2055 use Fcntl qw(:flock);
  3         7  
  3         1990  
567              
568             ###########################################
569             sub init {
570             ###########################################
571 4     4   9 my($self) = @_;
572              
573 4         47 require YAML;
574             }
575              
576             ###########################################
577             sub backend_store_ok {
578             ###########################################
579 2     2   7 my($self) = @_;
580              
581             # Legacy instances used DBM::Deep, but those data stores will be
582             # replaced by YAML backends. If we reuse a backend data store, make
583             # sure it's a YAML file and not a DBM::Deep blob.
584 2 50       119 if(! -f $self->{db_file} ) {
585 0         0 return 1;
586             }
587              
588 2         8 eval {
589 2         9 $self->load();
590             };
591              
592 2 50       94560 if($@) {
593 0         0 ERROR "$self->{db_file} apparently isn't a YAML file, we'll ",
594             "have to dump it and rebuild the bucket chain in YAML";
595 0         0 return 0;
596             }
597              
598 2         28 return 1;
599             }
600              
601             ###########################################
602             sub exists {
603             ###########################################
604 4     4   8 my($self) = @_;
605              
606 4         207 return -f $self->{db_file};
607             }
608              
609             ###########################################
610             sub save {
611             ###########################################
612 15     15   45 my($self, $data) = @_;
613              
614 15         104 DEBUG "Saving YAML file $self->{db_file}";
615 15         181 YAML::DumpFile( $self->{db_file}, $data );
616             }
617              
618             ###########################################
619             sub load {
620             ###########################################
621 17     17   46 my($self) = @_;
622              
623 17         181 DEBUG "Loading YAML file $self->{db_file}";
624 17         238 return YAML::LoadFile( $self->{db_file} );
625             }
626              
627             ###########################################
628             sub lock {
629             ###########################################
630 15     15   31 my($self) = @_;
631              
632 15 50       1421 open $self->{fh}, "+<", $self->{db_file} or
633             LOGDIE "Can't open $self->{db_file} for locking: $!";
634 15         221 flock $self->{fh}, LOCK_EX;
635             }
636              
637             ###########################################
638             sub unlock {
639             ###########################################
640 15     15   34 my($self) = @_;
641 15         340 flock $self->{fh}, LOCK_UN;
642             }
643              
644             1;
645              
646             __END__