File Coverage

blib/lib/BusyBird/Timeline.pm
Criterion Covered Total %
statement 169 173 97.6
branch 36 42 85.7
condition 20 27 74.0
subroutine 42 42 100.0
pod 15 15 100.0
total 282 299 94.3


line stmt bran cond sub pod time code
1             package BusyBird::Timeline;
2 11     11   11194 use v5.8.0;
  11         31  
  11         431  
3 11     11   48 use strict;
  11         15  
  11         327  
4 11     11   44 use warnings;
  11         16  
  11         307  
5 11     11   2483 use BusyBird::Util qw(set_param);
  11         24  
  11         902  
6 11     11   64 use BusyBird::Log qw(bblog);
  11         16  
  11         475  
7 11     11   4747 use BusyBird::Flow;
  11         35  
  11         475  
8 11     11   4883 use BusyBird::Watcher::Aggregator;
  11         41  
  11         471  
9 11     11   65 use BusyBird::DateTime::Format 0.04;
  11         239  
  11         243  
10 11     11   3924 use BusyBird::Config;
  11         45  
  11         621  
11 11     11   131 use Async::Selector 1.0;
  11         446  
  11         324  
12 11     11   7462 use Data::UUID;
  11         8614  
  11         803  
13 11     11   76 use Carp;
  11         20  
  11         661  
14 11     11   5772 use Storable qw(dclone);
  11         27741  
  11         963  
15 11     11   97 use Scalar::Util qw(weaken looks_like_number);
  11         21  
  11         663  
16 11     11   66 use DateTime;
  11         20  
  11         20552  
17              
18             our @CARP_NOT = qw(BusyBird::Config);
19              
20             sub new {
21 229     229 1 9233 my ($class, %args) = @_;
22 229         1339 my $self = bless {
23             filter_flow => BusyBird::Flow->new,
24             selector => Async::Selector->new,
25             unacked_counts => {total => 0},
26             config => BusyBird::Config->new(type => "timeline", with_default => 0),
27             id_generator => Data::UUID->new,
28             }, $class;
29 229         1695 $self->set_param(\%args, 'name', undef, 1);
30 229         756 $self->set_param(\%args, 'storage', undef, 1);
31 229         634 $self->set_param(\%args, 'watcher_max', 512);
32 229 50       806 croak 'name must not be empty' if $self->{name} eq '';
33 229         699 $self->_init_selector();
34 229         3183 $self->_update_unacked_counts();
35 229         11276 return $self;
36             }
37              
38             sub _log {
39 11     11   18 my ($self, $level, $msg) = @_;
40 11         26 bblog($level, $self->name . ": $msg");
41             }
42              
43             sub _update_unacked_counts {
44 546     546   1101 my ($self) = @_;
45             $self->get_unacked_counts(callback => sub {
46 546     546   1767 my ($error, $unacked_counts) = @_;
47 546 100       1836 if(defined($error)) {
48 11         51 $self->_log('error', "error while updating unacked count: $error");
49 11         51 return;
50             }
51 535         1415 $self->{unacked_counts} = $unacked_counts;
52 535         3340 $self->{selector}->trigger('unacked_counts');
53 546         3824 });
54             }
55              
56             sub _init_selector {
57 229     229   430 my ($self) = @_;
58 229         808 weaken $self;
59             $self->{selector}->register(unacked_counts => sub {
60 211     211   9927 my ($exp_unacked_counts) = @_;
61 211 50 33     1224 if(!defined($exp_unacked_counts) || ref($exp_unacked_counts) ne 'HASH') {
62 0         0 croak "unacked_counts watcher: condition input must be a hash-ref";
63             }
64 211 100       841 return { %{$self->{unacked_counts}} } if !%$exp_unacked_counts;
  10         51  
65 201         511 foreach my $key (keys %$exp_unacked_counts) {
66 235   100     783 my $exp_val = $exp_unacked_counts->{$key} || 0;
67 235   100     704 my $got_val = $self->{unacked_counts}{$key} || 0;
68 235 100       658 return { %{$self->{unacked_counts}} } if $exp_val != $got_val;
  89         420  
69             }
70 112         285 return undef;
71 229         2010 });
72             $self->{selector}->register(watcher_quota => sub {
73 300     300   10744 my ($in) = @_;
74 300         761 my @watchers = $self->{selector}->watchers('watcher_quota');
75 300 100       7506 if(int(@watchers) <= $self->{watcher_max}) {
76 286         610 return undef;
77             }
78 14   100     47 my $watcher_age = $in->{age} || 0;
79 14 100       42 return $watcher_age > $self->{watcher_max} ? 1 : undef;
80 229         6339 });
81             }
82              
83             sub name {
84 1857     1857 1 19945 return shift->{name};
85             }
86              
87             sub _get_from_storage {
88 917     917   1728 my ($self, $method, $args_ref) = @_;
89 917         2729 $args_ref->{timeline} = $self->name;
90 917         3713 local @CARP_NOT = (ref($self->{storage}));
91 917         6042 $self->{storage}->$method(%$args_ref);
92             }
93              
94             sub get_statuses {
95 310     310 1 223741 my ($self, %args) = @_;
96 310         1203 $self->_get_from_storage("get_statuses", \%args);
97             }
98              
99             sub get_unacked_counts {
100 590     590 1 13678 my ($self, %args) = @_;
101 590         2167 $self->_get_from_storage("get_unacked_counts", \%args);
102             }
103              
104             sub _write_statuses {
105 319     319   656 my ($self, $method, $args_ref) = @_;
106 319         1185 $args_ref->{timeline} = $self->name;
107 319         1441 local @CARP_NOT = (ref($self->{storage}));
108 319         603 my $orig_callback = $args_ref->{callback};
109             $self->{storage}->$method(%$args_ref, callback => sub {
110 317     317   1935 $self->_update_unacked_counts();
111 317 100       11966 goto $orig_callback if defined($orig_callback);
112 319         3105 });
113             }
114              
115             sub put_statuses {
116 32     32 1 62684 my ($self, %args) = @_;
117 32         171 $self->_write_statuses('put_statuses', \%args);
118             }
119              
120             sub delete_statuses {
121 19     19 1 100716 my ($self, %args) = @_;
122 19         92 $self->_write_statuses('delete_statuses', \%args);
123             }
124              
125             sub ack_statuses {
126 85     85 1 77661 my ($self, %args) = @_;
127 85         415 $self->_write_statuses('ack_statuses', \%args);
128             }
129              
130             sub add_statuses {
131 183     183 1 585117 my ($self, %args) = @_;
132 183 50       901 if(!defined($args{statuses})) {
133 0         0 croak "statuses argument is mandatory";
134             }
135 183         460 my $ref = ref($args{statuses});
136 183 100       953 if($ref eq "HASH") {
    50          
137 9         33 $args{statuses} = [ $args{statuses} ];
138             }elsif($ref ne "ARRAY") {
139 0         0 croak "statuses argument must be a status or an array-ref of statuses";
140             }
141 183         13539 my $statuses = dclone($args{statuses});
142 183         479 my $final_callback = $args{callback};
143             $self->{filter_flow}->execute($statuses, sub {
144 183     183   1038 my $filter_result = shift;
145 183         262 my $cur_time;
146 183         471 foreach my $status (@$filter_result) {
147 1521 50 33     6173 next if !defined($status) || ref($status) ne 'HASH';
148 1521 100       2396 if(!defined($status->{id})) {
149 19         50 $status->{id} = sprintf('busybird://%s/%s', $self->name, $self->{id_generator}->create_str);
150             }
151 1521 100       2773 if(!defined($status->{created_at})) {
152 16   66     131 $cur_time ||= DateTime->now;
153 16         4100 $status->{created_at} = BusyBird::DateTime::Format->format_datetime($cur_time);
154             }
155             }
156 183         4307 $self->_write_statuses('put_statuses', {
157             mode => 'insert', statuses => $filter_result,
158             callback => $final_callback
159             });
160 183         1833 });
161             }
162              
163             sub add {
164 29     29 1 22287 my ($self, $statuses, $callback) = @_;
165 29         128 $self->add_statuses(statuses => $statuses, callback => $callback);
166             }
167              
168             sub contains {
169 17     17 1 38621 my ($self, %args) = @_;
170 17         122 $self->_get_from_storage("contains", \%args);
171             }
172              
173             sub add_filter {
174 57     57 1 27754 my ($self, $filter, $is_async) = @_;
175 57 100       179 if(!$is_async) {
176 33         46 my $sync_filter = $filter;
177             $filter = sub {
178 50     50   100 my ($statuses, $done) = @_;
179 50         167 @_ = $sync_filter->($statuses);
180 48         2181 goto $done;
181 33         147 };
182             }
183 57         258 $self->{filter_flow}->add($filter);
184             }
185              
186             sub add_filter_async {
187 8     8 1 40 my ($self, $filter) = @_;
188 8         21 $self->add_filter($filter, 1);
189             }
190              
191             sub set_config {
192 12     12 1 691 shift()->{config}->set_config(@_);
193             }
194              
195             sub get_config {
196 330     330 1 2088 shift()->{config}->get_config(@_);
197             }
198              
199             sub watch_unacked_counts {
200 169     169 1 102849 my ($self, %watch_args) = @_;
201 169         311 my $callback = $watch_args{callback};
202 169         231 my $assumed = $watch_args{assumed};
203 169 100 100     1026 if(!defined($callback) || ref($callback) ne 'CODE') {
204 10         832 croak "watch_unacked_counts: callback must be a code-ref";
205             }
206 159 100 100     786 if(!defined($assumed) || ref($assumed) ne 'HASH') {
207 10         1041 croak "watch_unacked_counts: assumed must be a hash-ref";
208             }
209 149         460 $assumed = +{ %$assumed };
210 149         374 foreach my $key (keys %$assumed) {
211 191 100 66     1226 next if $key eq 'total' || (looks_like_number($key) && int($key) == $key);
      66        
212 15         30 delete $assumed->{$key};
213             }
214 149         627 my $watcher = BusyBird::Watcher::Aggregator->new();
215             my $orig_watcher = $self->{selector}->watch(
216             unacked_counts => $assumed, watcher_quota => { age => 0 }, sub {
217 103     103   920 my ($orig_w, %res) = @_;
218 103 100       280 if($res{watcher_quota}) {
219 4         12 $watcher->cancel();
220 4         146 $callback->("watcher cancelled because it is too old", $watcher);
221 4         20 return;
222             }
223 99 50       256 if($res{unacked_counts}) {
224 99         310 $callback->(undef, $watcher, $res{unacked_counts});
225 99         39893 return;
226             }
227 0         0 confess("Something terrible happened.");
228             }
229 149         2062 );
230 149         1315 $watcher->add($orig_watcher);
231 149 100       4567 if($watcher->active) {
232 111         642 my @quota_watchers = $self->{selector}->watchers('watcher_quota');
233 111         2466 foreach my $w (@quota_watchers) {
234 151         319 my %cond = $w->conditions;
235 151         801 $cond{watcher_quota}{age}++;
236             }
237 111         366 $self->{selector}->trigger('watcher_quota');
238             }
239 149         1201 return $watcher;
240             }
241              
242              
243             1;
244              
245             __END__