File Coverage

blib/lib/Data/Throttler.pm
Criterion Covered Total %
statement 230 271 84.8
branch 53 66 80.3
condition 9 15 60.0
subroutine 44 48 91.6
pod 0 8 0.0
total 336 408 82.3


line stmt bran cond sub pod time code
1             ###########################################
2             package Data::Throttler;
3             ###########################################
4 3     3   283390 use strict;
  3         9  
  3         216  
5 3     3   19 use warnings;
  3         8  
  3         225  
6 3     3   945830 use Log::Log4perl qw(:easy);
  3         1534731  
  3         24  
7              
8             our $VERSION = "0.06";
9             our $DB_VERSION = "1.1";
10              
11             ###########################################
12             sub new {
13             ###########################################
14 7     7 0 41126 my($class, %options) = @_;
15              
16 7         66 my $self = {
17             db_version => $DB_VERSION,
18             backend => "Memory",
19             backend_options => {},
20             reset => 0,
21             %options,
22             };
23              
24 7 100       45 if($self->{db_file}) {
25             # legacy option, translate
26 4         14 $self->{backend_options} = {
27             db_file => $self->{db_file},
28             };
29 4         11 $self->{backend} = "YAML";
30             }
31              
32 7         23 my $backend_class = "Data::Throttler::Backend::$self->{backend}";
33              
34 7         82 $self->{db} = $backend_class->new(
35 7         12 %{ $self->{backend_options} } );
36              
37 7         21 $self->{changed} = 0;
38              
39 7         15 bless $self, $class;
40              
41 7         12 my $create = 1;
42              
43 7 100       48 if( $self->{ db }->exists() ) {
44 3         9 DEBUG "Backend store exists";
45 3         29 $self->lock();
46 3         12 $self->{data} = $self->{ db }->load();
47              
48 3         76899 $create = 0;
49              
50 3 100 66     55 if($self->{data}->{chain} and
      33        
51             ($self->{data}->{chain}->{max_items} != $options{max_items} or
52             $self->{data}->{chain}->{interval} != $options{interval})) {
53 2         7 $self->{changed} = 1;
54 2         3 $create = 1;
55             }
56              
57 3 100 66     25 if($options{reset} or !$self->{ db }->backend_store_ok() ) {
58 1         4 $create = 1;
59             }
60 3         18 $self->unlock();
61             }
62            
63 7 100       40 if($create) {
64 6 50       44 $self->{ db }->create( \%options ) or
65             LOGDIE "Creating backend store failed";
66              
67             # create bucket chain
68 6         83 $self->create( {
69             max_items => $options{max_items},
70             interval => $options{interval},
71             });
72              
73 6         76 $self->{db}->save( $self->{data} );
74             }
75              
76 7         89172 return $self;
77             }
78              
79             ###########################################
80             sub create {
81             ###########################################
82 6     6 0 14 my($self, $options) = @_;
83              
84 6 100       26 if( $self->{changed} ) {
85 2         27 ERROR "Bucket chain parameters have changed ",
86             "(max_items: $self->{data}->{chain}->{max_items}/",
87             "$options->{max_items} ",
88             "(interval: $self->{data}->{chain}->{interval}/",
89             "$options->{interval})", ", throwing old chain away";
90 2         18 $self->{changed} = 0;
91             }
92              
93 6         50 DEBUG "Creating bucket chain max_items=$options->{max_items} ",
94             "interval=$options->{interval}";
95              
96 6         119 $self->{data}->{chain} = Data::Throttler::BucketChain->new(
97             max_items => $options->{max_items},
98             interval => $options->{interval},
99             );
100             }
101              
102             ###########################################
103             sub lock {
104             ###########################################
105 52     52 0 81 my($self) = @_;
106 52         373 DEBUG "Lock on";
107 52         401 $self->{db}->lock();
108             }
109              
110             ###########################################
111             sub unlock {
112             ###########################################
113 52     52 0 87 my($self) = @_;
114 52         167 DEBUG "Lock off";
115 52         508 $self->{db}->unlock();
116             }
117              
118             ###########################################
119             sub try_push {
120             ###########################################
121 44     44 0 3089 my($self, %options) = @_;
122              
123 44 100       138 if(exists $options{key}) {
124 26         86 DEBUG "Pushing key $options{key}";
125             } else {
126 18         66 DEBUG "Pushing keyless item";
127             }
128              
129 44         382 $self->lock();
130              
131 44         124 $self->{data} = $self->{db}->load();
132 44         297511 my $ret = $self->{data}->{chain}->try_push(%options);
133 44         166 $self->{db}->save( $self->{data} );
134              
135 44         196379 $self->unlock();
136 44         204 return $ret;
137             }
138              
139             ###########################################
140             sub reset_key {
141             ###########################################
142 5     5 0 16 my($self, %options) = @_;
143              
144 5 100       19 if(exists $options{key}) {
145 3         12 DEBUG "Resetting count for $options{key}";
146             } else {
147 2         9 DEBUG "Resetting count for keyless item";
148             }
149              
150 5         44 $self->lock();
151              
152 5         20 $self->{data} = $self->{db}->load();
153 5         58785 my $ret = $self->{data}->{chain}->reset_key(%options);
154 5         22 $self->{db}->save( $self->{data} );
155              
156 5         44778 $self->unlock();
157 5         35 return $ret;
158             }
159              
160             ###########################################
161             sub buckets_dump {
162             ###########################################
163 0     0 0 0 my($self) = @_;
164 0         0 $self->lock();
165 0         0 $self->{data} = $self->{db}->load();
166 0         0 my $ret = $self->{data}->{chain}->as_string();
167 0         0 $self->unlock();
168 0         0 return $ret;
169             }
170              
171             ###########################################
172             sub buckets_rotate {
173             ###########################################
174 0     0 0 0 my($self) = @_;
175 0         0 my $ret = $self->{data}->{chain}->rotate();
176 0         0 return $ret;
177             }
178              
179             package Data::Throttler::Range;
180              
181             ###########################################
182             sub new {
183             ###########################################
184 55     55   81 my($class, $start, $stop) = @_;
185              
186 55         158 my $self = {
187             start => $start,
188             stop => $stop,
189             };
190 55         284 bless $self, $class;
191             }
192              
193             ###########################################
194             sub min {
195             ###########################################
196 23     23   27 my($self) = @_;
197 23         61 return $self->{start};
198             }
199              
200             ###########################################
201             sub max {
202             ###########################################
203 23     23   48 my($self) = @_;
204 23         59 return $self->{stop};
205             }
206              
207             ###########################################
208             sub member {
209             ###########################################
210 184     184   230 my($self, $time) = @_;
211              
212 184   100     1148 return ($time >= $self->{start} and $time <= $self->{stop});
213             }
214              
215             ###########################################
216             package Data::Throttler::BucketChain;
217             ###########################################
218 3     3   6335 use Log::Log4perl qw(:easy);
  3         13  
  3         14  
219              
220             our $DEFAULT_KEY = "_default";
221              
222             ###########################################
223             sub new {
224             ###########################################
225 6     6   37 my($class, %options) = @_;
226              
227 6         72 my $self = {
228             max_items => undef,
229             interval => undef,
230             %options,
231             };
232              
233 6 50 33     49 if(!$self->{max_items} or
234             !$self->{interval}) {
235 0         0 LOGDIE "Both max_items and interval need to be defined";
236             }
237              
238 6 50       29 if(!$self->{nof_buckets}) {
239 6         18 $self->{nof_buckets} = 10;
240             }
241              
242 6 100       46 if($self->{nof_buckets} > $self->{interval}) {
243 1         3 $self->{nof_buckets} = $self->{interval};
244             }
245              
246 6         21 bless $self, $class;
247              
248 6         24 $self->reset();
249              
250 6         33 return $self;
251             }
252              
253             ###########################################
254             sub reset {
255             ###########################################
256 6     6   11 my($self) = @_;
257              
258 6         32 $self->{buckets} = [];
259              
260 6         34 my $bucket_time_span = int ($self->{interval} /
261             $self->{nof_buckets});
262              
263 6         24 $self->{bucket_time_span} = $bucket_time_span;
264              
265 6         35 my $time_start = time() -
266             ($self->{nof_buckets}-1) * $bucket_time_span;
267              
268 6         29 for(1..$self->{nof_buckets}) {
269 52         147 my $time_end = $time_start + $bucket_time_span - 1;
270 52         97 DEBUG "Creating bucket ", hms($time_start), " - ", hms($time_end);
271 52         440 push @{$self->{buckets}}, {
  52         196  
272             time => Data::Throttler::Range->new($time_start, $time_end),
273             count => {},
274             };
275 52         112 $time_start = $time_end + 1;
276             }
277              
278 6         17 $self->{head_bucket_idx} = 0;
279 6         8 $self->{tail_bucket_idx} = $#{$self->{buckets}};
  6         22  
280             }
281              
282             ###########################################
283             sub first_bucket {
284             ###########################################
285 19     19   22 my($self) = @_;
286              
287 19         31 $self->{current_idx} = $self->{head_bucket_idx};
288 19         73 return $self->{buckets}->[ $self->{current_idx} ];
289             }
290              
291             ###########################################
292             sub last_bucket {
293             ###########################################
294 50     50   75 my($self) = @_;
295              
296 50         117 $self->{current_idx} = $self->{tail_bucket_idx};
297 50         170 return $self->{buckets}->[ $self->{current_idx} ];
298             }
299              
300             ###########################################
301             sub next_bucket {
302             ###########################################
303 121     121   142 my($self) = @_;
304              
305 121 100       345 return undef if $self->{current_idx} == $self->{tail_bucket_idx};
306              
307 120         129 $self->{current_idx}++;
308 120 50       122 $self->{current_idx} = 0 if $self->{current_idx} > $#{$self->{buckets}};
  120         295  
309              
310 120         405 return $self->{buckets}->[ $self->{current_idx} ];
311             }
312              
313             ###########################################
314             sub as_string {
315             ###########################################
316 0     0   0 my($self) = @_;
317              
318 0         0 require Text::ASCIITable;
319              
320 0         0 my $t = Text::ASCIITable->new();
321 0         0 $t->setCols("#", "idx", ("Time: " . hms(time)), "Key", "Count");
322              
323 0         0 my $count = 1;
324              
325 0         0 for(my $b = $self->first_bucket(); $b; $b = $self->next_bucket()) {
326 0         0 my $span = hms($b->{time}->min) . " - " . hms($b->{time}->max);
327 0         0 my $idx = $self->{current_idx};
328 0         0 my $count_string = $count;
329              
330 0 0       0 if(! scalar keys %{$b->{count}}) {
  0         0  
331 0         0 $t->addRow($count_string, $idx, $span, "", "");
332             }
333              
334 0         0 foreach my $key (sort keys %{$b->{count}}) {
  0         0  
335 0         0 $t->addRow($count_string, $idx, $span, $key, $b->{count}->{$key});
336 0         0 $span = "";
337 0         0 $count_string = "";
338 0         0 $idx = "";
339             }
340              
341 0         0 $count++;
342             }
343 0         0 return $t->draw();
344             }
345              
346             ###########################################
347             sub hms {
348             ###########################################
349 266     266   391 my($time) = @_;
350              
351 266         11164 my ($sec,$min,$hour) = localtime($time);
352 266         1826 return sprintf "%02d:%02d:%02d",
353             $hour, $min, $sec;
354             }
355              
356             ###########################################
357             sub bucket_add {
358             ###########################################
359 3     3   5 my($self, $time) = @_;
360              
361             # ... and append a new one at the end
362 3         10 my $time_start = $self->{buckets}->
363             [$self->{tail_bucket_idx}]->{time}->max + 1;
364 3         5 my $time_end = $time_start + $self->{bucket_time_span} - 1;
365              
366 3         65 DEBUG "Adding bucket: ", hms($time_start), " - ", hms($time_end);
367              
368 3         25 $self->{tail_bucket_idx}++;
369 3         14 $self->{tail_bucket_idx} = 0 if $self->{tail_bucket_idx} >
370 3 100       7 $#{$self->{buckets}};
371 3         5 $self->{head_bucket_idx}++;
372 3         10 $self->{head_bucket_idx} = 0 if $self->{head_bucket_idx} >
373 3 100       5 $#{$self->{buckets}};
374              
375 3         12 $self->{buckets}->[ $self->{tail_bucket_idx} ] = {
376             time => Data::Throttler::Range->new($time_start, $time_end),
377             count => {},
378             };
379             }
380              
381             ###########################################
382             sub rotate {
383             ###########################################
384 1     1   3 my($self, $time) = @_;
385 1 50       4 $time = time() unless defined $time;
386              
387             # If the last bucket handles a time interval that doesn't cover
388             # $time, we need to rotate the bucket brigade. The first bucket
389             # will be cleared and re-used as the new last bucket of the chain.
390              
391 1         4 DEBUG "Rotating buckets time=", hms($time), " ",
392             "head=", $self->{head_bucket_idx};
393              
394 1 50       9 if($self->last_bucket->{time}->{stop} >= $time) {
395             # $time is still covered in the bucket brigade, we're golden
396 0         0 DEBUG "Rotation not necessary (",
397             hms($self->last_bucket->{time}->{stop}),
398             " - ", hms($time), ")";
399 0         0 return 1;
400             }
401              
402             # If we're too far off, just dump all buckets and re-init
403 1 50       8 if($self->{buckets}->[ $self->{tail_bucket_idx} ]->{time}->max <
404             $time - $self->{interval}) {
405 0         0 DEBUG "Too far off, resetting (", hms($time), " >> ",
406             hms($self->{buckets}->[ $self->{head_bucket_idx} ]->{time}->min),
407             ")";
408 0         0 $self->reset();
409 0         0 return 1;
410             }
411              
412 1         6 while($self->last_bucket()->{time}->min <= $time) {
413 3         8 $self->bucket_add();
414             }
415              
416 1         6 DEBUG "After rotation: ",
417             hms($self->{buckets}->[ $self->{head_bucket_idx} ]->{time}->min),
418             " - ",
419             hms($self->{buckets}->[ $self->{tail_bucket_idx} ]->{time}->max),
420             " (covers ", hms($time), ")";
421             }
422              
423             ###########################################
424             sub bucket_find {
425             ###########################################
426 45     45   64 my($self, $time) = @_;
427              
428 45         88 DEBUG "Searching bucket for time=", hms($time);
429              
430             # Search in the newest bucket first, chances are it's there
431 45         324 my $last_bucket = $self->last_bucket();
432 45 100       120 if($last_bucket->{time}->member($time)) {
433 26         50 DEBUG hms($time), " covered by last bucket";
434 26         183 return $last_bucket;
435             }
436              
437 19         48 for(my $b = $self->first_bucket(); $b; $b = $self->next_bucket()) {
438 139 100       268 if($b->{time}->member($time)) {
439 18         44 DEBUG "Found bucket ", hms($b->{time}->min),
440             " - ", hms($b->{time}->max);
441 18         141 return $b;
442             }
443             }
444              
445 1         5 DEBUG "No bucket found for time=", hms($time);
446 1         11 return undef;
447             }
448              
449             ###########################################
450             sub try_push {
451             ###########################################
452 44     44   113 my($self, %options) = @_;
453              
454 44         81 my $key = $DEFAULT_KEY;
455 44 100       237 $key = $options{key} if defined $options{key};
456              
457 44         73 my $time = time();
458 44 100       367 $time = $options{time} if defined $options{time};
459              
460 44         55 my $count = 1;
461 44 50       148 $count = $options{count} if defined $options{count};
462              
463 44         146 DEBUG "Trying to push $key ", hms($time), " $count";
464              
465 44         778 my $b = $self->bucket_find($time);
466              
467 44 100       129 if(!$b) {
468 1         8 $self->rotate($time);
469 1         11 $b = $self->bucket_find($time);
470             }
471              
472             # Determine the total count for this key
473 44         64 my $val = 0;
474 44         54 for(0..$#{$self->{buckets}}) {
  44         155  
475 400 100       1480 $val += $self->{buckets}->[$_]->{count}->{$key} if
476             exists $self->{buckets}->[$_]->{count}->{$key};
477             }
478              
479 44 100       140 if($val >= $self->{max_items}) {
480 9         45 DEBUG "Not increasing counter $key by $count (already at max)";
481 9         72 return 0;
482             } else {
483 35         720 DEBUG "Increasing counter $key by $count ",
484             "($val|$self->{max_items})";
485 35         258 $b->{count}->{$key} += $count;
486 35         105 return 1;
487             }
488              
489 0         0 LOGDIE "Time $time is outside of bucket range\n", $self->as_string;
490 0         0 return undef;
491             }
492              
493             ###########################################
494             sub reset_key {
495             ###########################################
496 5     5   14 my ($self, %options) = @_;
497              
498 5         10 my $key = $DEFAULT_KEY;
499 5 100       22 $key = $options{key} if defined $options{key};
500              
501 5         25 DEBUG "Resetting $key";
502              
503 5         43 my $total = 0;
504 5         9 for(0..$#{$self->{buckets}}) {
  5         23  
505 50 100       139 if (exists $self->{buckets}->[$_]->{count}->{$key}) {
506 5         57 $total += $self->{buckets}->[$_]->{count}->{$key};
507 5         19 $self->{buckets}->[$_]->{count}->{$key} = 0;
508             }
509             }
510              
511 5         16 return $total;
512             }
513              
514             ###########################################
515             package Data::Throttler::Backend::Base;
516             ###########################################
517              
518             ###########################################
519             sub new {
520             ###########################################
521 7     7   23 my($class, %options) = @_;
522              
523 7         21 my $self = {
524             %options,
525             };
526              
527 7         24 bless $self, $class;
528 7         36 $self->init();
529 7         29 return $self;
530             }
531              
532 3     3   11 sub exists { 0 }
533 6     6   21 sub create { 1 }
534             #sub save { }
535             #sub load { }
536 3     3   5 sub init { }
537 37     37   64 sub lock { }
538 37     37   53 sub unlock { }
539 0     0   0 sub backend_store_ok { 1 }
540              
541             ###########################################
542             package Data::Throttler::Backend::Memory;
543             ###########################################
544 3     3   10619 use base 'Data::Throttler::Backend::Base';
  3         7  
  3         2961  
545              
546             ###########################################
547             sub save {
548             ###########################################
549 40     40   58 my($self, $data) = @_;
550 40         129 $self->{data} = $data;
551             }
552              
553             ###########################################
554             sub load {
555             ###########################################
556 37     37   45 my($self) = @_;
557 37         82 return $self->{data};
558             }
559              
560             ###########################################
561             package Data::Throttler::Backend::YAML;
562             ###########################################
563 3     3   22 use base 'Data::Throttler::Backend::Base';
  3         7  
  3         1681  
564 3     3   21 use Log::Log4perl qw(:easy);
  3         7  
  3         21  
565 3     3   2381 use Fcntl qw(:flock);
  3         5  
  3         3392  
566              
567             ###########################################
568             sub init {
569             ###########################################
570 4     4   7 my($self) = @_;
571              
572 4         51 require YAML;
573             }
574              
575             ###########################################
576             sub backend_store_ok {
577             ###########################################
578 2     2   4 my($self) = @_;
579              
580             # Legacy instances used DBM::Deep, but those data stores will be
581             # replaced by YAML backends. If we reuse a backend data store, make
582             # sure it's a YAML file and not a DBM::Deep blob.
583 2 50       114 if(! -f $self->{db_file} ) {
584 0         0 return 1;
585             }
586              
587 2         6 eval {
588 2         7 $self->load();
589             };
590              
591 2 50       58890 if($@) {
592 0         0 ERROR "$self->{db_file} apparently isn't a YAML file, we'll ",
593             "have to dump it and rebuild the bucket chain in YAML";
594 0         0 return 0;
595             }
596              
597 2         19 return 1;
598             }
599              
600             ###########################################
601             sub exists {
602             ###########################################
603 4     4   4 my($self) = @_;
604              
605 4         167 return -f $self->{db_file};
606             }
607              
608             ###########################################
609             sub save {
610             ###########################################
611 15     15   45 my($self, $data) = @_;
612              
613 15         94 DEBUG "Saving YAML file $self->{db_file}";
614 15         184 YAML::DumpFile( $self->{db_file}, $data );
615             }
616              
617             ###########################################
618             sub load {
619             ###########################################
620 17     17   45 my($self) = @_;
621              
622 17         105 DEBUG "Loading YAML file $self->{db_file}";
623 17         228 return YAML::LoadFile( $self->{db_file} );
624             }
625              
626             ###########################################
627             sub lock {
628             ###########################################
629 15     15   37 my($self) = @_;
630              
631 15 50       1818 open $self->{fh}, "+<", $self->{db_file} or
632             LOGDIE "Can't open $self->{db_file} for locking: $!";
633 15         133 flock $self->{fh}, LOCK_EX;
634             }
635              
636             ###########################################
637             sub unlock {
638             ###########################################
639 15     15   35 my($self) = @_;
640 15         228 flock $self->{fh}, LOCK_UN;
641             }
642              
643             1;
644              
645             __END__