File Coverage

blib/lib/BusyBird/Timeline.pm
Criterion Covered Total %
statement 166 170 97.6
branch 36 42 85.7
condition 20 27 74.0
subroutine 41 41 100.0
pod 15 15 100.0
total 278 295 94.2


line stmt bran cond sub pod time code
1             package BusyBird::Timeline;
2 11     11   10989 use strict;
  11         21  
  11         386  
3 11     11   50 use warnings;
  11         13  
  11         318  
4 11     11   2402 use BusyBird::Util qw(set_param);
  11         120  
  11         625  
5 11     11   54 use BusyBird::Log qw(bblog);
  11         14  
  11         399  
6 11     11   3799 use BusyBird::Flow;
  11         29  
  11         330  
7 11     11   4176 use BusyBird::Watcher::Aggregator;
  11         30  
  11         342  
8 11     11   67 use BusyBird::DateTime::Format 0.04;
  11         231  
  11         222  
9 11     11   3726 use BusyBird::Config;
  11         30  
  11         449  
10 11     11   79 use Async::Selector 1.0;
  11         345  
  11         258  
11 11     11   5495 use Data::UUID;
  11         6928  
  11         658  
12 11     11   64 use Carp;
  11         13  
  11         525  
13 11     11   5288 use Storable qw(dclone);
  11         24349  
  11         719  
14 11     11   68 use Scalar::Util qw(weaken looks_like_number);
  11         18  
  11         548  
15 11     11   52 use DateTime;
  11         15  
  11         17614  
16              
17             our @CARP_NOT = qw(BusyBird::Config);
18              
19             sub new {
20 229     229 1 8168 my ($class, %args) = @_;
21 229         1212 my $self = bless {
22             filter_flow => BusyBird::Flow->new,
23             selector => Async::Selector->new,
24             unacked_counts => {total => 0},
25             config => BusyBird::Config->new(type => "timeline", with_default => 0),
26             id_generator => Data::UUID->new,
27             }, $class;
28 229         1441 $self->set_param(\%args, 'name', undef, 1);
29 229         657 $self->set_param(\%args, 'storage', undef, 1);
30 229         619 $self->set_param(\%args, 'watcher_max', 512);
31 229 50       828 croak 'name must not be empty' if $self->{name} eq '';
32 229         678 $self->_init_selector();
33 229         2995 $self->_update_unacked_counts();
34 229         10263 return $self;
35             }
36              
37             sub _log {
38 11     11   13 my ($self, $level, $msg) = @_;
39 11         24 bblog($level, $self->name . ": $msg");
40             }
41              
42             sub _update_unacked_counts {
43 546     546   863 my ($self) = @_;
44             $self->get_unacked_counts(callback => sub {
45 546     546   1404 my ($error, $unacked_counts) = @_;
46 546 100       1414 if(defined($error)) {
47 11         34 $self->_log('error', "error while updating unacked count: $error");
48 11         36 return;
49             }
50 535         1036 $self->{unacked_counts} = $unacked_counts;
51 535         2742 $self->{selector}->trigger('unacked_counts');
52 546         3095 });
53             }
54              
55             sub _init_selector {
56 229     229   309 my ($self) = @_;
57 229         670 weaken $self;
58             $self->{selector}->register(unacked_counts => sub {
59 207     207   7713 my ($exp_unacked_counts) = @_;
60 207 50 33     1052 if(!defined($exp_unacked_counts) || ref($exp_unacked_counts) ne 'HASH') {
61 0         0 croak "unacked_counts watcher: condition input must be a hash-ref";
62             }
63 207 100       709 return { %{$self->{unacked_counts}} } if !%$exp_unacked_counts;
  10         50  
64 197         440 foreach my $key (keys %$exp_unacked_counts) {
65 233   100     616 my $exp_val = $exp_unacked_counts->{$key} || 0;
66 233   100     614 my $got_val = $self->{unacked_counts}{$key} || 0;
67 233 100       528 return { %{$self->{unacked_counts}} } if $exp_val != $got_val;
  89         404  
68             }
69 108         225 return undef;
70 229         1725 });
71             $self->{selector}->register(watcher_quota => sub {
72 292     292   10372 my ($in) = @_;
73 292         659 my @watchers = $self->{selector}->watchers('watcher_quota');
74 292 100       7164 if(int(@watchers) <= $self->{watcher_max}) {
75 277         768 return undef;
76             }
77 15   100     46 my $watcher_age = $in->{age} || 0;
78 15 100       45 return $watcher_age > $self->{watcher_max} ? 1 : undef;
79 229         5741 });
80             }
81              
82             sub name {
83 1857     1857 1 12824 return shift->{name};
84             }
85              
86             sub _get_from_storage {
87 917     917   1382 my ($self, $method, $args_ref) = @_;
88 917         2166 $args_ref->{timeline} = $self->name;
89 917         2949 local @CARP_NOT = (ref($self->{storage}));
90 917         5271 $self->{storage}->$method(%$args_ref);
91             }
92              
93             sub get_statuses {
94 310     310 1 195901 my ($self, %args) = @_;
95 310         1049 $self->_get_from_storage("get_statuses", \%args);
96             }
97              
98             sub get_unacked_counts {
99 590     590 1 10817 my ($self, %args) = @_;
100 590         1735 $self->_get_from_storage("get_unacked_counts", \%args);
101             }
102              
103             sub _write_statuses {
104 319     319   592 my ($self, $method, $args_ref) = @_;
105 319         840 $args_ref->{timeline} = $self->name;
106 319         1352 local @CARP_NOT = (ref($self->{storage}));
107 319         544 my $orig_callback = $args_ref->{callback};
108             $self->{storage}->$method(%$args_ref, callback => sub {
109 317     317   1677 $self->_update_unacked_counts();
110 317 100       10091 goto $orig_callback if defined($orig_callback);
111 319         2756 });
112             }
113              
114             sub put_statuses {
115 32     32 1 51687 my ($self, %args) = @_;
116 32         121 $self->_write_statuses('put_statuses', \%args);
117             }
118              
119             sub delete_statuses {
120 19     19 1 82395 my ($self, %args) = @_;
121 19         80 $self->_write_statuses('delete_statuses', \%args);
122             }
123              
124             sub ack_statuses {
125 85     85 1 62927 my ($self, %args) = @_;
126 85         322 $self->_write_statuses('ack_statuses', \%args);
127             }
128              
129             sub add_statuses {
130 183     183 1 505570 my ($self, %args) = @_;
131 183 50       750 if(!defined($args{statuses})) {
132 0         0 croak "statuses argument is mandatory";
133             }
134 183         578 my $ref = ref($args{statuses});
135 183 100       850 if($ref eq "HASH") {
    50          
136 9         34 $args{statuses} = [ $args{statuses} ];
137             }elsif($ref ne "ARRAY") {
138 0         0 croak "statuses argument must be a status or an array-ref of statuses";
139             }
140 183         12211 my $statuses = dclone($args{statuses});
141 183         398 my $final_callback = $args{callback};
142             $self->{filter_flow}->execute($statuses, sub {
143 183     183   930 my $filter_result = shift;
144 183         244 my $cur_time;
145 183         405 foreach my $status (@$filter_result) {
146 1521 50 33     5769 next if !defined($status) || ref($status) ne 'HASH';
147 1521 100       2296 if(!defined($status->{id})) {
148 19         51 $status->{id} = sprintf('busybird://%s/%s', $self->name, $self->{id_generator}->create_str);
149             }
150 1521 100       2726 if(!defined($status->{created_at})) {
151 16   66     126 $cur_time ||= DateTime->now;
152 16         3600 $status->{created_at} = BusyBird::DateTime::Format->format_datetime($cur_time);
153             }
154             }
155 183         3558 $self->_write_statuses('put_statuses', {
156             mode => 'insert', statuses => $filter_result,
157             callback => $final_callback
158             });
159 183         1778 });
160             }
161              
162             sub add {
163 29     29 1 18273 my ($self, $statuses, $callback) = @_;
164 29         110 $self->add_statuses(statuses => $statuses, callback => $callback);
165             }
166              
167             sub contains {
168 17     17 1 28544 my ($self, %args) = @_;
169 17         62 $self->_get_from_storage("contains", \%args);
170             }
171              
172             sub add_filter {
173 57     57 1 23451 my ($self, $filter, $is_async) = @_;
174 57 100       223 if(!$is_async) {
175 33         61 my $sync_filter = $filter;
176             $filter = sub {
177 50     50   76 my ($statuses, $done) = @_;
178 50         152 @_ = $sync_filter->($statuses);
179 48         1876 goto $done;
180 33         133 };
181             }
182 57         230 $self->{filter_flow}->add($filter);
183             }
184              
185             sub add_filter_async {
186 8     8 1 38 my ($self, $filter) = @_;
187 8         20 $self->add_filter($filter, 1);
188             }
189              
190             sub set_config {
191 12     12 1 619 shift()->{config}->set_config(@_);
192             }
193              
194             sub get_config {
195 330     330 1 1739 shift()->{config}->get_config(@_);
196             }
197              
198             sub watch_unacked_counts {
199 165     165 1 135765 my ($self, %watch_args) = @_;
200 165         373 my $callback = $watch_args{callback};
201 165         200 my $assumed = $watch_args{assumed};
202 165 100 100     1006 if(!defined($callback) || ref($callback) ne 'CODE') {
203 10         886 croak "watch_unacked_counts: callback must be a code-ref";
204             }
205 155 100 100     725 if(!defined($assumed) || ref($assumed) ne 'HASH') {
206 10         1027 croak "watch_unacked_counts: assumed must be a hash-ref";
207             }
208 145         418 $assumed = +{ %$assumed };
209 145         347 foreach my $key (keys %$assumed) {
210 187 100 66     1152 next if $key eq 'total' || (looks_like_number($key) && int($key) == $key);
      66        
211 15         33 delete $assumed->{$key};
212             }
213 145         579 my $watcher = BusyBird::Watcher::Aggregator->new();
214             my $orig_watcher = $self->{selector}->watch(
215             unacked_counts => $assumed, watcher_quota => { age => 0 }, sub {
216 103     103   833 my ($orig_w, %res) = @_;
217 103 100       294 if($res{watcher_quota}) {
218 4         11 $watcher->cancel();
219 4         126 $callback->("watcher cancelled because it is too old", $watcher);
220 4         20 return;
221             }
222 99 50       232 if($res{unacked_counts}) {
223 99         286 $callback->(undef, $watcher, $res{unacked_counts});
224 99         48136 return;
225             }
226 0         0 confess("Something terrible happened.");
227             }
228 145         1825 );
229 145         1199 $watcher->add($orig_watcher);
230 145 100       4291 if($watcher->active) {
231 107         605 my @quota_watchers = $self->{selector}->watchers('watcher_quota');
232 107         2268 foreach my $w (@quota_watchers) {
233 147         273 my %cond = $w->conditions;
234 147         702 $cond{watcher_quota}{age}++;
235             }
236 107         278 $self->{selector}->trigger('watcher_quota');
237             }
238 145         1082 return $watcher;
239             }
240              
241              
242             1;
243              
244             __END__