File Coverage

blib/lib/Data/Throttler.pm
Criterion Covered Total %
statement 246 287 85.7
branch 59 72 81.9
condition 9 15 60.0
subroutine 46 50 92.0
pod 0 9 0.0
total 360 433 83.1


line stmt bran cond sub pod time code
1             ###########################################
2             package Data::Throttler;
3             ###########################################
4 3     3   45431 use strict;
  3         5  
  3         98  
5 3     3   12 use warnings;
  3         4  
  3         74  
6 3     3   2342 use Log::Log4perl qw(:easy);
  3         115948  
  3         17  
7              
8             our $VERSION = "0.08";
9             our $DB_VERSION = "1.1";
10              
11             ###########################################
12             sub new {
13             ###########################################
14 7     7 0 7738 my($class, %options) = @_;
15              
16 7         63 my $self = {
17             db_version => $DB_VERSION,
18             backend => "Memory",
19             backend_options => {},
20             reset => 0,
21             %options,
22             };
23              
24 7 100       31 if($self->{db_file}) {
25             # legacy option, translate
26 4         15 $self->{backend_options} = {
27             db_file => $self->{db_file},
28             };
29 4         11 $self->{backend} = "YAML";
30             }
31              
32 7         24 my $backend_class = "Data::Throttler::Backend::$self->{backend}";
33              
34 7         63 $self->{db} = $backend_class->new(
35 7         11 %{ $self->{backend_options} } );
36              
37 7         17 $self->{changed} = 0;
38              
39 7         13 bless $self, $class;
40              
41 7         12 my $create = 1;
42              
43 7 100       36 if( $self->{ db }->exists() ) {
44 3         10 DEBUG "Backend store exists";
45 3         25 $self->lock();
46 3         24 $self->{data} = $self->{ db }->load();
47              
48 3         54591 $create = 0;
49              
50 3 100 66     50 if($self->{data}->{chain} and
      33        
51             ($self->{data}->{chain}->{max_items} != $options{max_items} or
52             $self->{data}->{chain}->{interval} != $options{interval})) {
53 2         5 $self->{changed} = 1;
54 2         3 $create = 1;
55             }
56              
57 3 100 66     23 if($options{reset} or !$self->{ db }->backend_store_ok() ) {
58 1         2 $create = 1;
59             }
60 3         12 $self->unlock();
61             }
62            
63 7 100       26 if($create) {
64 6 50       29 $self->{ db }->create( \%options ) or
65             LOGDIE "Creating backend store failed";
66              
67             # create bucket chain
68 6         33 $self->create( {
69             max_items => $options{max_items},
70             interval => $options{interval},
71             });
72              
73 6         55 $self->{db}->save( $self->{data} );
74             }
75              
76 7         49414 return $self;
77             }
78              
79             ###########################################
80             sub create {
81             ###########################################
82 6     6 0 13 my($self, $options) = @_;
83              
84 6 100       17 if( $self->{changed} ) {
85 2         26 ERROR "Bucket chain parameters have changed ",
86             "(max_items: $self->{data}->{chain}->{max_items}/",
87             "$options->{max_items} ",
88             "(interval: $self->{data}->{chain}->{interval}/",
89             "$options->{interval})", ", throwing old chain away";
90 2         11 $self->{changed} = 0;
91             }
92              
93 6         38 DEBUG "Creating bucket chain max_items=$options->{max_items} ",
94             "interval=$options->{interval}";
95              
96 6         85 $self->{data}->{chain} = Data::Throttler::BucketChain->new(
97             max_items => $options->{max_items},
98             interval => $options->{interval},
99             );
100             }
101              
102             ###########################################
103             sub lock {
104             ###########################################
105 57     57 0 64 my($self) = @_;
106 57         212 DEBUG "Trying to get lock ($$)";
107 57         341 $self->{db}->lock();
108 57         160 DEBUG "Lock on ($$)";
109             }
110              
111             ###########################################
112             sub unlock {
113             ###########################################
114 57     57 0 80 my($self) = @_;
115 57         147 DEBUG "Lock off";
116 57         598 $self->{db}->unlock();
117             }
118              
119             ###########################################
120             sub current_value {
121             ###########################################
122 2     2 0 3 my($self, %options) = @_;
123              
124 2         6 $self->{data} = $self->{db}->load();
125 2         6 my $ret = $self->{data}->{chain}->current_value(%options);
126              
127 2         14 return $ret;
128             }
129              
130             ###########################################
131             sub try_push {
132             ###########################################
133 49     49 0 1733 my($self, %options) = @_;
134              
135 49 100       118 if(exists $options{key}) {
136 26         64 DEBUG "Pushing key $options{key}";
137             } else {
138 23         61 DEBUG "Pushing keyless item";
139             }
140              
141 49         302 $self->lock();
142              
143 49         250 $self->{data} = $self->{db}->load();
144 49         185188 my $ret = $self->{data}->{chain}->try_push(%options);
145 49         123 $self->{db}->save( $self->{data} );
146              
147 49         148834 $self->unlock();
148 49         175 return $ret;
149             }
150              
151             ###########################################
152             sub reset_key {
153             ###########################################
154 5     5 0 13 my($self, %options) = @_;
155              
156 5 100       15 if(exists $options{key}) {
157 3         9 DEBUG "Resetting count for $options{key}";
158             } else {
159 2         7 DEBUG "Resetting count for keyless item";
160             }
161              
162 5         36 $self->lock();
163              
164 5         68 $self->{data} = $self->{db}->load();
165 5         35864 my $ret = $self->{data}->{chain}->reset_key(%options);
166 5         15 $self->{db}->save( $self->{data} );
167              
168 5         28746 $self->unlock();
169 5         25 return $ret;
170             }
171              
172             ###########################################
173             sub buckets_dump {
174             ###########################################
175 0     0 0 0 my($self) = @_;
176 0         0 $self->lock();
177 0         0 $self->{data} = $self->{db}->load();
178 0         0 my $ret = $self->{data}->{chain}->as_string();
179 0         0 $self->unlock();
180 0         0 return $ret;
181             }
182              
183             ###########################################
184             sub buckets_rotate {
185             ###########################################
186 0     0 0 0 my($self) = @_;
187 0         0 my $ret = $self->{data}->{chain}->rotate();
188 0         0 return $ret;
189             }
190              
191             package Data::Throttler::Range;
192              
193             ###########################################
194             sub new {
195             ###########################################
196 55     55   58 my($class, $start, $stop) = @_;
197              
198 55         98 my $self = {
199             start => $start,
200             stop => $stop,
201             };
202 55         195 bless $self, $class;
203             }
204              
205             ###########################################
206             sub min {
207             ###########################################
208 28     28   66 my($self) = @_;
209 28         51 return $self->{start};
210             }
211              
212             ###########################################
213             sub max {
214             ###########################################
215 28     28   23 my($self) = @_;
216 28         52 return $self->{stop};
217             }
218              
219             ###########################################
220             sub member {
221             ###########################################
222 194     194   178 my($self, $time) = @_;
223              
224 194   100     810 return ($time >= $self->{start} and $time <= $self->{stop});
225             }
226              
227             ###########################################
228             package Data::Throttler::BucketChain;
229             ###########################################
230 3     3   3697 use Log::Log4perl qw(:easy);
  3         8  
  3         11  
231              
232             our $DEFAULT_KEY = "_default";
233              
234             ###########################################
235             sub new {
236             ###########################################
237 6     6   18 my($class, %options) = @_;
238              
239 6         47 my $self = {
240             max_items => undef,
241             interval => undef,
242             %options,
243             };
244              
245 6 50 33     36 if(!$self->{max_items} or
246             !$self->{interval}) {
247 0         0 LOGDIE "Both max_items and interval need to be defined";
248             }
249              
250 6 50       23 if(!$self->{nof_buckets}) {
251 6         12 $self->{nof_buckets} = 10;
252             }
253              
254 6 100       33 if($self->{nof_buckets} > $self->{interval}) {
255 1         2 $self->{nof_buckets} = $self->{interval};
256             }
257              
258 6         16 bless $self, $class;
259              
260 6         18 $self->reset();
261              
262 6         24 return $self;
263             }
264              
265             ###########################################
266             sub reset {
267             ###########################################
268 6     6   7 my($self) = @_;
269              
270 6         34 $self->{buckets} = [];
271              
272 6         23 my $bucket_time_span = int ($self->{interval} /
273             $self->{nof_buckets});
274              
275 6         10 $self->{bucket_time_span} = $bucket_time_span;
276              
277 6         23 my $time_start = time() -
278             ($self->{nof_buckets}-1) * $bucket_time_span;
279              
280 6         21 for(1..$self->{nof_buckets}) {
281 52         53 my $time_end = $time_start + $bucket_time_span - 1;
282 52         69 DEBUG "Creating bucket ", hms($time_start), " - ", hms($time_end);
283 52         231 push @{$self->{buckets}}, {
  52         120  
284             time => Data::Throttler::Range->new($time_start, $time_end),
285             count => {},
286             };
287 52         68 $time_start = $time_end + 1;
288             }
289              
290 6         19 $self->{head_bucket_idx} = 0;
291 6         7 $self->{tail_bucket_idx} = $#{$self->{buckets}};
  6         19  
292             }
293              
294             ###########################################
295             sub first_bucket {
296             ###########################################
297 24     24   25 my($self) = @_;
298              
299 24         28 $self->{current_idx} = $self->{head_bucket_idx};
300 24         64 return $self->{buckets}->[ $self->{current_idx} ];
301             }
302              
303             ###########################################
304             sub last_bucket {
305             ###########################################
306 55     55   63 my($self) = @_;
307              
308 55         93 $self->{current_idx} = $self->{tail_bucket_idx};
309 55         136 return $self->{buckets}->[ $self->{current_idx} ];
310             }
311              
312             ###########################################
313             sub next_bucket {
314             ###########################################
315 121     121   100 my($self) = @_;
316              
317 121 100       190 return undef if $self->{current_idx} == $self->{tail_bucket_idx};
318              
319 120         89 $self->{current_idx}++;
320 120 50       92 $self->{current_idx} = 0 if $self->{current_idx} > $#{$self->{buckets}};
  120         198  
321              
322 120         242 return $self->{buckets}->[ $self->{current_idx} ];
323             }
324              
325             ###########################################
326             sub as_string {
327             ###########################################
328 0     0   0 my($self) = @_;
329              
330 0         0 require Text::ASCIITable;
331              
332 0         0 my $t = Text::ASCIITable->new();
333 0         0 $t->setCols("#", "idx", ("Time: " . hms(time)), "Key", "Count");
334              
335 0         0 my $count = 1;
336              
337 0         0 for(my $b = $self->first_bucket(); $b; $b = $self->next_bucket()) {
338 0         0 my $span = hms($b->{time}->min) . " - " . hms($b->{time}->max);
339 0         0 my $idx = $self->{current_idx};
340 0         0 my $count_string = $count;
341              
342 0 0       0 if(! scalar keys %{$b->{count}}) {
  0         0  
343 0         0 $t->addRow($count_string, $idx, $span, "", "");
344             }
345              
346 0         0 foreach my $key (sort keys %{$b->{count}}) {
  0         0  
347 0         0 $t->addRow($count_string, $idx, $span, $key, $b->{count}->{$key});
348 0         0 $span = "";
349 0         0 $count_string = "";
350 0         0 $idx = "";
351             }
352              
353 0         0 $count++;
354             }
355 0         0 return $t->draw();
356             }
357              
358             ###########################################
359             sub hms {
360             ###########################################
361 286     286   254 my($time) = @_;
362              
363 286         3210 my ($sec,$min,$hour) = localtime($time);
364 286         1159 return sprintf "%02d:%02d:%02d",
365             $hour, $min, $sec;
366             }
367              
368             ###########################################
369             sub bucket_add {
370             ###########################################
371 3     3   3 my($self, $time) = @_;
372              
373             # ... and append a new one at the end
374 3         8 my $time_start = $self->{buckets}->
375             [$self->{tail_bucket_idx}]->{time}->max + 1;
376 3         4 my $time_end = $time_start + $self->{bucket_time_span} - 1;
377              
378 3         6 DEBUG "Adding bucket: ", hms($time_start), " - ", hms($time_end);
379              
380 3         16 $self->{tail_bucket_idx}++;
381 3         9 $self->{tail_bucket_idx} = 0 if $self->{tail_bucket_idx} >
382 3 100       3 $#{$self->{buckets}};
383 3         4 $self->{head_bucket_idx}++;
384 3         8 $self->{head_bucket_idx} = 0 if $self->{head_bucket_idx} >
385 3 100       6 $#{$self->{buckets}};
386              
387 3         8 $self->{buckets}->[ $self->{tail_bucket_idx} ] = {
388             time => Data::Throttler::Range->new($time_start, $time_end),
389             count => {},
390             };
391             }
392              
393             ###########################################
394             sub rotate {
395             ###########################################
396 1     1   2 my($self, $time) = @_;
397 1 50       4 $time = time() unless defined $time;
398              
399             # If the last bucket handles a time interval that doesn't cover
400             # $time, we need to rotate the bucket brigade. The first bucket
401             # will be cleared and re-used as the new last bucket of the chain.
402              
403 1         3 DEBUG "Rotating buckets time=", hms($time), " ",
404             "head=", $self->{head_bucket_idx};
405              
406 1 50       6 if($self->last_bucket->{time}->{stop} >= $time) {
407             # $time is still covered in the bucket brigade, we're golden
408 0         0 DEBUG "Rotation not necessary (",
409             hms($self->last_bucket->{time}->{stop}),
410             " - ", hms($time), ")";
411 0         0 return 1;
412             }
413              
414             # If we're too far off, just dump all buckets and re-init
415 1 50       46 if($self->{buckets}->[ $self->{tail_bucket_idx} ]->{time}->max <
416             $time - $self->{interval}) {
417 0         0 DEBUG "Too far off, resetting (", hms($time), " >> ",
418             hms($self->{buckets}->[ $self->{head_bucket_idx} ]->{time}->min),
419             ")";
420 0         0 $self->reset();
421 0         0 return 1;
422             }
423              
424 1         3 while($self->last_bucket()->{time}->min <= $time) {
425 3         6 $self->bucket_add();
426             }
427              
428 1         4 DEBUG "After rotation: ",
429             hms($self->{buckets}->[ $self->{head_bucket_idx} ]->{time}->min),
430             " - ",
431             hms($self->{buckets}->[ $self->{tail_bucket_idx} ]->{time}->max),
432             " (covers ", hms($time), ")";
433             }
434              
435             ###########################################
436             sub bucket_find {
437             ###########################################
438 50     50   60 my($self, $time) = @_;
439              
440 50         77 DEBUG "Searching bucket for time=", hms($time);
441              
442             # Search in the newest bucket first, chances are it's there
443 50         255 my $last_bucket = $self->last_bucket();
444 50 100       106 if($last_bucket->{time}->member($time)) {
445 26         45 DEBUG hms($time), " covered by last bucket";
446 26         137 return $last_bucket;
447             }
448              
449 24         39 for(my $b = $self->first_bucket(); $b; $b = $self->next_bucket()) {
450 144 100       174 if($b->{time}->member($time)) {
451 23         38 DEBUG "Found bucket ", hms($b->{time}->min),
452             " - ", hms($b->{time}->max);
453 23         135 return $b;
454             }
455             }
456              
457 1         5 DEBUG "No bucket found for time=", hms($time);
458 1         6 return undef;
459             }
460              
461             ###########################################
462             sub current_value {
463             ###########################################
464 51     51   82 my($self, %options) = @_;
465              
466 51         60 my $key = $DEFAULT_KEY;
467 51 100       102 $key = $options{key} if defined $options{key};
468              
469 51         86 DEBUG "Getting current value for key=", $key;
470              
471 51         193 my $val = 0;
472              
473 51         47 for(0..$#{$self->{buckets}}) {
  51         115  
474 414 100       794 $val += $self->{buckets}->[$_]->{count}->{$key} if
475             exists $self->{buckets}->[$_]->{count}->{$key};
476             }
477              
478 51         97 return $val;
479             }
480              
481             ###########################################
482             sub try_push {
483             ###########################################
484 49     49   87 my($self, %options) = @_;
485              
486 49         61 my $key = $DEFAULT_KEY;
487 49 100       122 $key = $options{key} if defined $options{key};
488              
489 49         61 my $time = time();
490 49 100       95 $time = $options{time} if defined $options{time};
491              
492 49         50 my $count = 1;
493 49 50       101 $count = $options{count} if defined $options{count};
494              
495 49         63 my $force = 0;
496 49 100       106 $force = $options{force} if defined $options{force};
497              
498 49         107 DEBUG "Trying to push $key ", hms($time), " $count";
499              
500 49         367 my $b = $self->bucket_find($time);
501              
502 49 100       108 if(!$b) {
503 1         5 $self->rotate($time);
504 1         7 $b = $self->bucket_find($time);
505             }
506              
507             # Determine the total count for this key
508 49         114 my $val = $self->current_value(%options);
509              
510 49 100       114 if($val >= $self->{max_items}) {
511 13 100       28 if ($force) {
512 1         13 DEBUG "Increasing (force) counter $key by $count ",
513             "($val|$self->{max_items})";
514 1         5 $b->{count}->{$key} += $count;
515             } else {
516 12         40 DEBUG "Not increasing counter $key by $count (already at max)";
517             }
518 13         63 return 0;
519             } else {
520 36         139 DEBUG "Increasing counter $key by $count ",
521             "($val|$self->{max_items})";
522 36         162 $b->{count}->{$key} += $count;
523 36         67 return 1;
524             }
525              
526 0         0 LOGDIE "Time $time is outside of bucket range\n", $self->as_string;
527 0         0 return undef;
528             }
529              
530             ###########################################
531             sub reset_key {
532             ###########################################
533 5     5   10 my ($self, %options) = @_;
534              
535 5         10 my $key = $DEFAULT_KEY;
536 5 100       15 $key = $options{key} if defined $options{key};
537              
538 5         30 DEBUG "Resetting $key";
539              
540 5         34 my $total = 0;
541 5         7 for(0..$#{$self->{buckets}}) {
  5         17  
542 50 100       94 if (exists $self->{buckets}->[$_]->{count}->{$key}) {
543 5         11 $total += $self->{buckets}->[$_]->{count}->{$key};
544 5         15 $self->{buckets}->[$_]->{count}->{$key} = 0;
545             }
546             }
547              
548 5         12 return $total;
549             }
550              
551             ###########################################
552             package Data::Throttler::Backend::Base;
553             ###########################################
554              
555             ###########################################
556             sub new {
557             ###########################################
558 7     7   14 my($class, %options) = @_;
559              
560 7         18 my $self = {
561             %options,
562             };
563              
564 7         16 bless $self, $class;
565 7         29 $self->init();
566 7         21 return $self;
567             }
568              
569 3     3   7 sub exists { 0 }
570 6     6   18 sub create { 1 }
571             #sub save { }
572             #sub load { }
573 3     3   4 sub init { }
574 42     42   42 sub lock { }
575 42     42   53 sub unlock { }
576 0     0   0 sub backend_store_ok { 1 }
577              
578             ###########################################
579             package Data::Throttler::Backend::Memory;
580             ###########################################
581 3     3   5924 use base 'Data::Throttler::Backend::Base';
  3         6  
  3         1227  
582              
583             ###########################################
584             sub save {
585             ###########################################
586 45     45   51 my($self, $data) = @_;
587 45         58 $self->{data} = $data;
588             }
589              
590             ###########################################
591             sub load {
592             ###########################################
593 44     44   42 my($self) = @_;
594 44         69 return $self->{data};
595             }
596              
597             ###########################################
598             package Data::Throttler::Backend::YAML;
599             ###########################################
600 3     3   16 use base 'Data::Throttler::Backend::Base';
  3         4  
  3         656  
601 3     3   13 use Log::Log4perl qw(:easy);
  3         9  
  3         12  
602 3     3   1380 use Fcntl qw(:flock);
  3         6  
  3         1370  
603              
604             ###########################################
605             sub init {
606             ###########################################
607 4     4   6 my($self) = @_;
608              
609 4         27 require YAML;
610             }
611              
612             ###########################################
613             sub backend_store_ok {
614             ###########################################
615 2     2   5 my($self) = @_;
616              
617             # Legacy instances used DBM::Deep, but those data stores will be
618             # replaced by YAML backends. If we reuse a backend data store, make
619             # sure it's a YAML file and not a DBM::Deep blob.
620 2 50       63 if(! -f $self->{db_file} ) {
621 0         0 return 1;
622             }
623              
624 2         6 eval {
625 2         8 $self->load();
626             };
627              
628 2 50       41769 if($@) {
629 0         0 ERROR "$self->{db_file} apparently isn't a YAML file, we'll ",
630             "have to dump it and rebuild the bucket chain in YAML";
631 0         0 return 0;
632             }
633              
634 2         16 return 1;
635             }
636              
637             ###########################################
638             sub exists {
639             ###########################################
640 4     4   6 my($self) = @_;
641              
642 4         99 return -f $self->{db_file};
643             }
644              
645             ###########################################
646             sub save {
647             ###########################################
648 15     15   30 my($self, $data) = @_;
649              
650 15         83 DEBUG "Saving YAML file $self->{db_file}";
651 15         136 YAML::DumpFile( $self->{db_file}, $data );
652             }
653              
654             ###########################################
655             sub load {
656             ###########################################
657 17     17   34 my($self) = @_;
658              
659 17         84 DEBUG "Loading YAML file $self->{db_file}";
660 17         151 return YAML::LoadFile( $self->{db_file} );
661             }
662              
663             ###########################################
664             sub lock {
665             ###########################################
666 15     15   27 my($self) = @_;
667              
668 15 50       699 open $self->{fh}, "+<", $self->{db_file} or
669             LOGDIE "Can't open $self->{db_file} for locking: $!";
670 15         107 flock $self->{fh}, LOCK_EX;
671             }
672              
673             ###########################################
674             sub unlock {
675             ###########################################
676 15     15   26 my($self) = @_;
677 15         169 flock $self->{fh}, LOCK_UN;
678             }
679              
680             1;
681              
682             __END__