File Coverage

blib/lib/BusyBird/StatusStorage/SQLite.pm
Criterion Covered Total %
statement 453 482 93.9
branch 169 202 83.6
condition 66 81 81.4
subroutine 58 66 87.8
pod 9 9 100.0
total 755 840 89.8


line stmt bran cond sub pod time code
1             package BusyBird::StatusStorage::SQLite;
2 12     12   73304 use strict;
  12         21  
  12         401  
3 12     12   51 use warnings;
  12         18  
  12         326  
4 12     12   54 use parent ("BusyBird::StatusStorage");
  12         16  
  12         100  
5 12     12   16308 use DBI;
  12         171501  
  12         790  
6 12     12   103 use Carp;
  12         33  
  12         730  
7 12     12   62 use Try::Tiny;
  12         24  
  12         644  
8 12     12   7041 use SQL::Maker 1.19;
  12         133981  
  12         458  
9 12     12   5800 use SQL::QueryMaker 0.03 qw(sql_and sql_eq sql_ne sql_or sql_lt sql_le sql_raw);
  12         26350  
  12         991  
10 12     12   74 use BusyBird::DateTime::Format;
  12         18  
  12         304  
11 12     12   53 use BusyBird::Util qw(set_param);
  12         18  
  12         563  
12 12     12   6535 use JSON;
  12         99700  
  12         64  
13 12     12   1642 use Scalar::Util qw(looks_like_number);
  12         21  
  12         559  
14 12     12   60 use DateTime::Format::Strptime;
  12         23  
  12         501  
15 12     12   59 use DateTime;
  12         18  
  12         252  
16 12     12   2956 no autovivification;
  12         4563  
  12         76  
17              
18             my @STATUSES_ORDER_BY = ('utc_acked_at DESC', 'utc_created_at DESC', 'status_id DESC');
19             my $DELETE_COUNT_ID = 0;
20              
21             my $UNDEF_TIMESTAMP = '9999-99-99T99:99:99';
22              
23             {
24             my $TIMESTAMP_FORMAT_STR = '%Y-%m-%dT%H:%M:%S';
25             my $TIMESTAMP_FORMAT = DateTime::Format::Strptime->new(
26             pattern => $TIMESTAMP_FORMAT_STR,
27             time_zone => 'UTC',
28             on_error => 'croak',
29             );
30              
31             sub _format_datetime {
32 4539     4539   39851 my ($dt) = @_;
33 4539         10005 return $dt->strftime($TIMESTAMP_FORMAT_STR);
34             }
35              
36             sub _parse_datetime {
37 15621     15621   17653 my ($dt_str) = @_;
38 15621         43818 return $TIMESTAMP_FORMAT->parse_datetime($dt_str);
39             }
40             }
41              
42              
43             sub new {
44 183     183 1 222424 my ($class, %args) = @_;
45 183         1394 my $self = bless {
46             maker => SQL::Maker->new(driver => 'SQLite', strict => 1),
47             in_memory_dbh => undef,
48             }, $class;
49 183         5783 $self->set_param(\%args, "path", undef, 1);
50 183         550 $self->set_param(\%args, "max_status_num", 2000);
51 183         1074 $self->set_param(\%args, "hard_max_status_num", int($self->{max_status_num} * 1.2));
52 183         730 $self->set_param(\%args, "vacuum_on_delete", int($self->{max_status_num} * 2.0));
53 183 50       1032 croak "max_status_num must be a number" if !looks_like_number($self->{max_status_num});
54 183 50       773 croak "hard_max_status_num must be a number" if !looks_like_number($self->{hard_max_status_num});
55 183         483 $self->{max_status_num} = int($self->{max_status_num});
56 183         448 $self->{hard_max_status_num} = int($self->{hard_max_status_num});
57 183 50       796 croak "hard_max_status_num must be >= max_status_num" if !($self->{hard_max_status_num} >= $self->{max_status_num});
58 183         639 $self->_create_tables();
59 182         93402 return $self;
60             }
61              
62             sub _create_new_dbh {
63 821     821   1778 my ($self, @connect_params) = @_;
64 821         4767 my $dbh = DBI->connect(@connect_params);
65 820         391195 $dbh->do(q{PRAGMA foreign_keys = ON});
66 820         74678 return $dbh;
67             }
68              
69             sub _get_my_dbh {
70 2649     2649   4446 my ($self) = @_;
71 2649         18298 my @connect_params = ("dbi:SQLite:dbname=$self->{path}", "", "", {
72             RaiseError => 1, PrintError => 0, AutoCommit => 1, sqlite_unicode => 1,
73             });
74 2649 100       9327 if($self->{path} eq ':memory:') {
75 2000 100       6297 $self->{in_memory_dbh} = $self->_create_new_dbh(@connect_params) if !$self->{in_memory_dbh};
76 2000         7039 return $self->{in_memory_dbh};
77             }
78 649         1869 return $self->_create_new_dbh(@connect_params);
79             }
80              
81             sub _create_tables {
82 183     183   309 my ($self) = @_;
83 183         576 my $dbh = $self->_get_my_dbh();
84 182         731 $dbh->do(<
85             CREATE TABLE IF NOT EXISTS timelines (
86             timeline_id INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL,
87             name TEXT UNIQUE NOT NULL
88             )
89             EOD
90 182         166140 $dbh->do(<
91             CREATE TABLE IF NOT EXISTS statuses (
92             timeline_id INTEGER NOT NULL
93             REFERENCES timelines(timeline_id) ON DELETE CASCADE ON UPDATE CASCADE,
94             status_id TEXT NOT NULL,
95             utc_acked_at TEXT NOT NULL,
96             utc_created_at TEXT NOT NULL,
97             timezone_acked_at TEXT NOT NULL,
98             timezone_created_at TEXT NOT NULL,
99             level INTEGER NOT NULL,
100             content TEXT NOT NULL,
101              
102             PRIMARY KEY (timeline_id, status_id)
103             )
104             EOD
105 182         132545 $dbh->do(<
106             CREATE TABLE IF NOT EXISTS delete_counts (
107             delete_count_id INTEGER PRIMARY KEY NOT NULL,
108             delete_count INTEGER NOT NULL
109             )
110             EOD
111 182         115651 my ($sql, @bind) = $self->{maker}->insert('delete_counts', [
112             delete_count_id => $DELETE_COUNT_ID, delete_count => 0
113             ], {prefix => 'INSERT OR IGNORE INTO'});
114 182         20773 $dbh->do($sql, undef, @bind);
115             }
116              
117             sub _record_hash_to_array {
118 3283     3283   3376 my ($record) = @_;
119 3283         15487 return [ map { $_, $record->{$_} } sort { $a cmp $b } keys %$record ];
  26264         49650  
  49973         42335  
120             }
121              
122             sub _put_update {
123 426     426   756 my ($self, $dbh, $record, $prev_sth) = @_;
124 426         430 my $sth = $prev_sth;
125 426         940 my ($sql, @bind) = $self->{maker}->update('statuses', _record_hash_to_array($record), sql_and([
126             sql_eq('timeline_id' => $record->{timeline_id}), sql_eq(status_id => $record->{status_id})
127             ]));
128 426 100       128527 if(!$sth) {
129             ## Or, should we check $sql is not changed...?
130 111         738 $sth = $dbh->prepare($sql);
131             }
132 426         34082 return ($sth->execute(@bind), $sth);
133             }
134              
135             sub _put_insert {
136 2857     2857   3791 my ($self, $dbh, $record, $prev_sth) = @_;
137 2857         2957 my $sth = $prev_sth;
138 2857         6398 my ($sql, @bind) = $self->{maker}->insert('statuses', _record_hash_to_array($record), {
139             prefix => 'INSERT OR IGNORE INTO'
140             });
141 2857 100       534434 if(!$sth) {
142 371         2377 $sth = $dbh->prepare($sql);
143             }
144 2857         115367 return ($sth->execute(@bind), $sth);
145             }
146              
147             sub _put_upsert {
148 68     68   95 my ($self, $dbh, $record) = @_;
149 68         138 my ($count) = $self->_put_update($dbh, $record);
150 68 100       931 if($count <= 0) {
151 34         95 ($count) = $self->_put_insert($dbh, $record);
152             }
153 68         480 return ($count, undef);
154             }
155              
156             sub put_statuses {
157 430     430 1 214029 my ($self, %args) = @_;
158 430         921 my $timeline = $args{timeline};
159 430 100       1431 croak "timeline parameter is mandatory" if not defined $timeline;
160 428         762 my $mode = $args{mode};
161 428 100       1321 croak "mode parameter is mandatory" if not defined $mode;
162 426 50 100     1975 if($mode ne 'insert' && $mode ne 'update' && $mode ne 'upsert') {
      66        
163 0         0 croak "mode must be either insert, update or upsert";
164             }
165 426         710 my $statuses = $args{statuses};
166 426 100       1293 croak "statuses parameter is mandatory" if not defined $statuses;
167 424 50 66     2467 if(ref($statuses) ne 'HASH' && ref($statuses) ne 'ARRAY') {
168 0         0 croak "statuses parameter must be either a status object or an array-ref of statuses";
169             }
170 424 100       1222 if(ref($statuses) eq 'HASH') {
171 82         182 $statuses = [$statuses];
172             }
173 424         1061 foreach my $status (@$statuses) {
174 3309 100 66     16265 croak "status object must be a hash-ref" if !defined($status) || !ref($status) || ref($status) ne 'HASH';
      66        
175 3303 100       8816 croak "status ID is missing" if not defined $status->{id};
176             }
177 394   50 0   1302 my $callback = $args{callback} || sub {};
  0         0  
178 394         514 my $dbh;
179             my @results = try {
180 394 100   394   14209 return (undef, 0) if @$statuses == 0;
181 388         1088 $dbh = $self->_get_my_dbh();
182 388         2815 $dbh->begin_work();
183 388   66     6468 my $timeline_id = $self->_get_timeline_id($dbh, $timeline) || $self->_create_timeline($dbh, $timeline);
184 388 50       1098 if(!defined($timeline_id)) {
185 0         0 die "Internal error: could not create a timeline '$timeline' somehow.";
186             }
187 388         541 my $sth;
188 388         545 my $total_count = 0;
189 388         1233 my $put_method = "_put_$mode";
190 388         998 foreach my $status (@$statuses) {
191 3267         6559 my $record = _to_status_record($timeline_id, $status);
192 3249         3204 my $count;
193 3249         8797 ($count, $sth) = $self->$put_method($dbh, $record, $sth);
194 3249 100       8860 if($count > 0) {
195 3211         11949 $total_count += $count;
196             }
197             }
198 370         673 my $exceeding_delete_count = 0;
199 370 100 100     2227 if($mode ne "update" && $total_count > 0) {
200 325         1314 $exceeding_delete_count = $self->_delete_exceeding_statuses($dbh, $timeline_id);
201             }
202 370         1055414 $dbh->commit();
203 370 100       1580 if($exceeding_delete_count > 0) {
204 8         47 $self->_add_to_delete_count($dbh, $exceeding_delete_count);
205             }
206 370         51814 return (undef, $total_count);
207             } catch {
208 18     18   827 my $e = shift;
209 18 50       54 if($dbh) {
210 18         1406 $dbh->rollback();
211             }
212 18         155 return ($e);
213 394         4014 };
214 394         10745 @_ = @results;
215 394         12190 goto $callback;
216             }
217              
218             sub _get_timeline_id {
219 2721     2721   4780 my ($self, $dbh, $timeline_name) = @_;
220 2721         13263 my ($sql, @bind) = $self->{maker}->select('timelines', ['timeline_id'], sql_eq(name => $timeline_name));
221 2721         739915 my $record = $dbh->selectrow_arrayref($sql, undef, @bind);
222 2721 100       396817 if(!defined($record)) {
223 644         3251 return undef;
224             }
225 2077         10928 return $record->[0];
226             }
227              
228             sub _create_timeline {
229 267     267   716 my ($self, $dbh, $timeline_name) = @_;
230 267         1793 my ($sql, @bind) = $self->{maker}->insert('timelines', [name => "$timeline_name"]);
231 267         21480 $dbh->do($sql, undef, @bind);
232 267         42058 return $self->_get_timeline_id($dbh, $timeline_name);
233             }
234              
235             sub _to_status_record {
236 3267     3267   4317 my ($timeline_id, $status) = @_;
237 3267 50       8612 croak "status ID must be set" if not defined $status->{id};
238 3267 50       5601 croak "timeline_id must be defined" if not defined $timeline_id;
239 3267   100     23241 my $record = {
240             timeline_id => $timeline_id,
241             status_id => $status->{id},
242             level => $status->{busybird}{level} || 0,
243             };
244 3261         7870 my $acked_at = $status->{busybird}{acked_at}; ## avoid autovivification
245 3261         6551 ($record->{utc_acked_at}, $record->{timezone_acked_at}) = _extract_utc_timestamp_and_timezone($acked_at);
246 3255         6895 ($record->{utc_created_at}, $record->{timezone_created_at}) = _extract_utc_timestamp_and_timezone($status->{created_at});
247 3249         9507 $record->{content} = to_json($status);
248 3249         55814 return $record;
249             }
250              
251             sub _from_status_record {
252 10319     10319   12435 my ($record) = @_;
253 10319         30213 my $status = from_json($record->{content});
254 10319         163404 $status->{id} = $record->{status_id};
255 10319 100 100     60889 if($record->{level} != 0 || defined($status->{busybird}{level})) {
256 2760         5656 $status->{busybird}{level} = $record->{level};
257             }
258 10319         23060 my $acked_at_str = _create_bb_timestamp_from_utc_timestamp_and_timezone($record->{utc_acked_at}, $record->{timezone_acked_at});
259 10319 100 66     934297 if(defined($acked_at_str) || defined($status->{busybird}{acked_at})) {
260 5328         12829 $status->{busybird}{acked_at} = $acked_at_str;
261             }
262 10319         22233 my $created_at_str = _create_bb_timestamp_from_utc_timestamp_and_timezone($record->{utc_created_at}, $record->{timezone_created_at});
263 10319 100 66     1758100 if(defined($created_at_str) || defined($status->{created_at})) {
264 10293         20323 $status->{created_at} = $created_at_str;
265             }
266 10319         287390 return $status;
267             }
268              
269             sub _extract_utc_timestamp_and_timezone {
270 6516     6516   6652 my ($timestamp_str) = @_;
271 6516 100 66     22599 if(!defined($timestamp_str) || $timestamp_str eq '') {
272 2106         6126 return ($UNDEF_TIMESTAMP, 'UTC');
273             }
274 4410         13990 my $datetime = BusyBird::DateTime::Format->parse_datetime($timestamp_str);
275 4410 100       3562901 croak "Invalid datetime format: $timestamp_str" if not defined $datetime;
276 4398         10719 my $timezone_name = $datetime->time_zone->name;
277 4398         31500 $datetime->set_time_zone('UTC');
278 4398         34055 my $utc_timestamp = _format_datetime($datetime);
279 4398         277501 return ($utc_timestamp, $timezone_name);
280             }
281              
282             sub _create_bb_timestamp_from_utc_timestamp_and_timezone {
283 20638     20638   27483 my ($utc_timestamp_str, $timezone) = @_;
284 20638 100       39652 if($utc_timestamp_str eq $UNDEF_TIMESTAMP) {
285 5017         9864 return undef;
286             }
287 15621         26053 my $dt = _parse_datetime($utc_timestamp_str);
288 15621         8266082 $dt->set_time_zone($timezone);
289 15621         149262 return BusyBird::DateTime::Format->format_datetime($dt);
290             }
291              
292             sub get_statuses {
293 1142     1142 1 115154 my ($self, %args) = @_;
294 1142         2445 my $timeline = $args{timeline};
295 1142 100       3870 croak "timeline parameter is mandatory" if not defined $timeline;
296 1140         2368 my $callback = $args{callback};
297 1140 100       3214 croak "callback parameter is mandatory" if not defined $callback;
298 1138 50       3706 croak "callback parameter must be a CODEREF" if ref($callback) ne "CODE";
299 1138 100       3960 my $ack_state = defined($args{ack_state}) ? $args{ack_state} : "any";
300 1138 50 100     6661 if($ack_state ne "any" && $ack_state ne "unacked" && $ack_state ne "acked") {
      66        
301 0         0 croak "ack_state parameter must be either 'any' or 'acked' or 'unacked'";
302             }
303 1138         1854 my $max_id = $args{max_id};
304 1138 100       3483 my $count = defined($args{count}) ? $args{count} : 'all';
305 1138 50 66     4653 if($count ne 'all' && !looks_like_number($count)) {
306 0         0 croak "count parameter must be either 'all' or number";
307             }
308             my @results = try {
309 1138     1138   42745 my $dbh = $self->_get_my_dbh();
310 1138         3603 my $timeline_id = $self->_get_timeline_id($dbh, $timeline);
311 1138 100       2945 if(!defined($timeline_id)) {
312 86         2439 return (undef, []);
313             }
314 1052         4063 my $cond = $self->_create_base_condition($timeline_id, $ack_state);
315 1052 100       3233 if(defined($max_id)) {
316 193         519 my $max_id_cond = $self->_create_max_id_condition($dbh, $timeline_id, $max_id, $ack_state);
317 193 100       507 if(!defined($max_id_cond)) {
318 78         2912 return (undef, []);
319             }
320 115         333 $cond = sql_and([$cond, $max_id_cond]);
321             }
322 974         6126 my %maker_opt = (order_by => \@STATUSES_ORDER_BY);
323 974 100       2940 if($count ne 'all') {
324 204         562 $maker_opt{limit} = "$count";
325             }
326 974         4446 my ($sql, @bind) = $self->{maker}->select("statuses", ['*'], $cond, \%maker_opt);
327 974         281188 my $sth = $dbh->prepare($sql);
328 974         157308 $sth->execute(@bind);
329 974         2662 my @statuses = ();
330 974         36212 while(my $record = $sth->fetchrow_hashref('NAME_lc')) {
331 10319         17213 push(@statuses, _from_status_record($record));
332             }
333 974         77696 return (undef, \@statuses);
334             }catch {
335 0     0   0 my $e = shift;
336 0         0 return ($e);
337 1138         10596 };
338 1138         29427 @_ = @results;
339 1138         6580 goto $callback;
340             }
341              
342             sub _create_base_condition {
343 1856     1856   3456 my ($self, $timeline_id, $ack_state) = @_;
344 1856   50     3888 $ack_state ||= 'any';
345 1856         4869 my $cond = sql_eq(timeline_id => $timeline_id);
346 1856 100       40389 if($ack_state eq 'acked') {
    100          
347 369         1336 $cond = sql_and([$cond, sql_ne(utc_acked_at => $UNDEF_TIMESTAMP)]);
348             }elsif($ack_state eq 'unacked') {
349 970         2317 $cond = sql_and([$cond, sql_eq(utc_acked_at => $UNDEF_TIMESTAMP)]);
350             }
351 1856         61832 return $cond;
352             }
353              
354             sub _get_timestamps_of {
355 244     244   424 my ($self, $dbh, $timeline_id, $status_id, $ack_state) = @_;
356 244         528 my $cond = $self->_create_base_condition($timeline_id, $ack_state);
357 244         675 $cond = sql_and([$cond, sql_eq(status_id => $status_id)]);
358 244         10873 my ($sql, @bind) = $self->{maker}->select("statuses", ['utc_acked_at', 'utc_created_at'], $cond, {
359             limit => 1
360             });
361 244         75636 my $record = $dbh->selectrow_arrayref($sql, undef, @bind);
362 244 100       27003 if(!$record) {
363 96         2060 return ();
364             }
365 148         3162 return ($record->[0], $record->[1]);
366             }
367              
368             sub _create_max_id_condition {
369 244     244   553 my ($self, $dbh, $timeline_id, $max_id, $ack_state) = @_;
370 244         765 my ($max_acked_at, $max_created_at) = $self->_get_timestamps_of($dbh, $timeline_id, $max_id, $ack_state);
371 244 100 66     1216 if(!defined($max_acked_at) || !defined($max_created_at)) {
372 96         201 return undef;
373             }
374 148         468 return $self->_create_max_time_condition($max_acked_at, $max_created_at, $max_id);
375             }
376              
377             sub _create_max_time_condition {
378 156     156   340 my ($self, $max_acked_at, $max_created_at, $max_id) = @_;
379 156         684 my $cond = sql_or([
380             sql_lt(utc_acked_at => $max_acked_at),
381             sql_and([
382             sql_eq(utc_acked_at => $max_acked_at),
383             sql_or([
384             sql_lt(utc_created_at => $max_created_at),
385             sql_and([
386             sql_eq(utc_created_at => $max_created_at),
387             sql_le(status_id => $max_id),
388             ])
389             ])
390             ])
391             ]);
392 156         26416 return $cond;
393             }
394              
395             sub ack_statuses {
396 145     145 1 5057 my ($self, %args) = @_;
397 145         339 my $timeline = $args{timeline};
398 145 100       790 croak "timeline parameter is mandatory" if not defined $timeline;
399 143 50   0   503 my $callback = defined($args{callback}) ? $args{callback} : sub {};
  0         0  
400 143 50       564 croak "callback parameter must be a CODEREF" if ref($callback) ne 'CODE';
401 143         275 my $ids = $args{ids};
402 143 50 100     1385 if(defined($ids) && ref($ids) && ref($ids) ne 'ARRAY') {
      66        
403 0         0 croak "ids parameter must be either undef, a status ID or an array-ref of status IDs";
404             }
405 143 100 100     735 if(defined($ids) && !ref($ids)) {
406 19         53 $ids = [$ids];
407             }
408 143 100 100     591 if(defined($ids) && grep { !defined($_) } @$ids) {
  190         460  
409 2         227 croak "ids parameter array must not contain undef.";
410             }
411 141         304 my $max_id = $args{max_id};
412 141         190 my $dbh;
413             my @results = try {
414 141     141   5242 my $ack_utc_timestamp = _format_datetime(DateTime->now(time_zone => 'UTC'));
415 141         10077 $dbh = $self->_get_my_dbh();
416 141         458 my $timeline_id = $self->_get_timeline_id($dbh, $timeline);
417 141 50       449 return (undef, 0) if not defined $timeline_id;
418 141         817 $dbh->begin_work();
419 141         2320 my $total_count = 0;
420 141 100 100     800 if(!defined($ids) && !defined($max_id)) {
421 36         155 $total_count = $self->_ack_all($dbh, $timeline_id, $ack_utc_timestamp);
422             }else {
423 105 100       360 if(defined($max_id)) {
424 51         237 my $max_id_count = $self->_ack_max_id($dbh, $timeline_id, $ack_utc_timestamp, $max_id);
425 51 100       9814 $total_count += $max_id_count if $max_id_count > 0;
426             }
427 105 100       365 if(defined($ids)) {
428 71         331 my $ids_count = $self->_ack_ids($dbh, $timeline_id, $ack_utc_timestamp, $ids);
429 71 100       295 $total_count += $ids_count if $ids_count > 0;
430             }
431             }
432 141         347970 $dbh->commit();
433 141 50       602 $total_count = 0 if $total_count < 0;
434 141         767 return (undef, $total_count);
435             }catch {
436 0     0   0 my $e = shift;
437 0 0       0 if($dbh) {
438 0         0 $dbh->rollback();
439             }
440 0         0 return ($e);
441 141         1380 };
442 141         3498 @_ = @results;
443 141         3801 goto $callback;
444             }
445              
446             sub _ack_all {
447 36     36   82 my ($self, $dbh, $timeline_id, $ack_utc_timestamp) = @_;
448 36         227 my ($sql, @bind) = $self->{maker}->update(
449             'statuses', [utc_acked_at => $ack_utc_timestamp],
450             sql_and([sql_eq(timeline_id => $timeline_id), sql_eq(utc_acked_at => $UNDEF_TIMESTAMP)]),
451             );
452 36         8235 return $dbh->do($sql, undef, @bind);
453             }
454              
455             sub _ack_max_id {
456 51     51   134 my ($self, $dbh, $timeline_id, $ack_utc_timestamp, $max_id) = @_;
457 51         216 my $max_id_cond = $self->_create_max_id_condition($dbh, $timeline_id, $max_id, 'unacked');
458 51 100       178 if(!defined($max_id_cond)) {
459 18         55 return 0;
460             }
461 33         101 my $cond = $self->_create_base_condition($timeline_id, 'unacked');
462 33         166 my ($sql, @bind) = $self->{maker}->update(
463             'statuses', [utc_acked_at => $ack_utc_timestamp], sql_and([$cond, $max_id_cond])
464             );
465 33         9842 return $dbh->do($sql, undef, @bind);
466             }
467              
468             sub _ack_ids {
469 71     71   219 my ($self, $dbh, $timeline_id, $ack_utc_timestamp, $ids) = @_;
470 71 100       250 if(@$ids == 0) {
471 14         35 return 0;
472             }
473 57         94 my $total_count = 0;
474 57         88 my $sth;
475 57         301 foreach my $id (@$ids) {
476 184         478 my $cond = $self->_create_base_condition($timeline_id, 'unacked');
477 184         424 $cond = sql_and([$cond, sql_eq(status_id => $id)]);
478 184         7315 my ($sql, @bind) = $self->{maker}->update(
479             'statuses', [utc_acked_at => $ack_utc_timestamp], $cond
480             );
481 184 100       25991 if(!$sth) {
482 57         365 $sth = $dbh->prepare($sql);
483             }
484 184         10663 my $count = $sth->execute(@bind);
485 184 100       2309 if($count > 0) {
486 94         2007 $total_count += $count;
487             }
488             }
489 57         730 return $total_count;
490             }
491              
492             sub delete_statuses {
493 151     151 1 59978 my ($self, %args) = @_;
494 151         375 my $timeline = $args{timeline};
495 151 100       690 croak 'timeline parameter is mandatory' if not defined $timeline;
496 149 100       851 croak 'ids parameter is mandatory' if not exists $args{ids};
497 147         287 my $ids = $args{ids};
498 147 50 100     885 if(defined($ids) && ref($ids) && ref($ids) ne 'ARRAY') {
      66        
499 0         0 croak 'ids parameter must be either undef, a status ID or array-ref of status IDs.';
500             }
501 147 100 100     661 if(defined($ids) && !ref($ids)) {
502 15         44 $ids = [$ids];
503             }
504 147 100 100     534 if(defined($ids) && grep { !defined($_) } @$ids) {
  90         256  
505 2         176 croak "ids parameter array must not contain undef.";
506             }
507 145 50   0   557 my $callback = defined($args{callback}) ? $args{callback} : sub {};
  0         0  
508 145 50       472 croak 'callback parameter must be a CODEREF' if ref($callback) ne 'CODE';
509 145         192 my $dbh;
510             my @results = try {
511 145     145   5524 my $dbh = $self->_get_my_dbh();
512 145         495 my $timeline_id = $self->_get_timeline_id($dbh, $timeline);
513 145 100       407 if(!defined($timeline_id)) {
514 27         845 return (undef, 0);
515             }
516 118         747 $dbh->begin_work();
517 118         1721 my $total_count;
518 118 100       368 if(defined($ids)) {
519 31         158 $total_count = $self->_delete_ids($dbh, $timeline_id, $ids);
520             }else {
521 87         380 $total_count = $self->_delete_timeline($dbh, $timeline_id);
522             }
523 118 50       455 $total_count = 0 if $total_count < 0;
524 118         463374 $dbh->commit();
525 118         812 $self->_add_to_delete_count($dbh, $total_count);
526 118         95623 return (undef, $total_count);
527             }catch {
528 0     0   0 my $e = shift;
529 0 0       0 if($dbh) {
530 0         0 $dbh->rollback();
531             }
532 0         0 return ($e);
533 145         1318 };
534 145         3574 @_ = @results;
535 145         910 goto $callback;
536             }
537              
538             sub _delete_timeline {
539 87     87   163 my ($self, $dbh, $timeline_id) = @_;
540 87         386 my ($sql, @bind) = $self->{maker}->delete('statuses', sql_eq(timeline_id => $timeline_id));
541 87         8366 my $status_count = $dbh->do($sql, undef, @bind);
542 87         22967 ($sql, @bind) = $self->{maker}->delete('timelines', sql_eq(timeline_id => $timeline_id));
543 87         7661 $dbh->do($sql, undef, @bind);
544 87         13526 return $status_count;
545             }
546              
547             sub _delete_ids {
548 31     31   71 my ($self, $dbh, $timeline_id, $ids) = @_;
549 31 50       117 return 0 if @$ids == 0;
550 31         40 my $sth;
551 31         49 my $total_count = 0;
552 31         100 foreach my $id (@$ids) {
553 84         330 my ($sql, @bind) = $self->{maker}->delete('statuses', sql_and([
554             sql_eq(timeline_id => $timeline_id), sql_eq(status_id => $id)
555             ]));
556 84 100       14368 if(!$sth) {
557 31         194 $sth = $dbh->prepare($sql);
558             }
559 84         6205 my $count = $sth->execute(@bind);
560 84 100       297 if($count > 0) {
561 70         224 $total_count += $count;
562             }
563             }
564 31         416 return $total_count;
565             }
566              
567             sub _delete_exceeding_statuses {
568 325     325   604 my ($self, $dbh, $timeline_id) = @_;
569             ## get total count in the timeline
570 325         1988 my ($sql, @bind) = $self->{maker}->select('statuses', [\'count(*)'], sql_eq(timeline_id => $timeline_id));
571 325         90830 my $row = $dbh->selectrow_arrayref($sql, undef, @bind);
572 325 50       37982 if(!defined($row)) {
573 0         0 die "count query for timeline $timeline_id returns undef. something is wrong.";
574             }
575 325         1033 my $total_count = $row->[0];
576            
577 325 100       1511 if($total_count <= $self->{hard_max_status_num}) {
578 317         1130 return 0;
579             }
580              
581             ## get the top of the exceeding statuses
582 8         53 ($sql, @bind) = $self->{maker}->select('statuses', [qw(utc_acked_at utc_created_at status_id)], sql_eq(timeline_id => $timeline_id), {
583             order_by => \@STATUSES_ORDER_BY,
584             offset => $self->{max_status_num},
585             limit => 1,
586             });
587 8         3145 $row = $dbh->selectrow_arrayref($sql, undef, @bind);
588 8 50       1281 if(!defined($row)) {
589 0         0 die "selecting the top of exceeding status returns undef. something is wrong.";
590             }
591 8         48 my $time_cond = $self->_create_max_time_condition(@$row);
592              
593             ## execute deletion
594 8         24 my $timeline_cond = sql_eq(timeline_id => $timeline_id);
595 8         148 ($sql, @bind) = $self->{maker}->delete('statuses', sql_and([$timeline_cond, $time_cond]));
596 8         1983 return $dbh->do($sql, undef, @bind);
597             }
598              
599             sub get_unacked_counts {
600 607     607 1 240843 my ($self, %args) = @_;
601 607         1156 my $timeline = $args{timeline};
602 607 100       1833 croak 'timeline parameter is mandatory' if not defined $timeline;
603 605         899 my $callback = $args{callback};
604 605 100       1440 croak 'callback parameter is mandatory' if not defined $callback;
605 603 50       1907 croak 'callback parameter must be a CODEREF' if ref($callback) ne 'CODE';
606             my @results = try {
607 603     603   21112 my $dbh = $self->_get_my_dbh();
608 603         1876 my $timeline_id = $self->_get_timeline_id($dbh, $timeline);
609 603         1949 my %result_obj = (total => 0);
610 603 100       1408 if(!defined($timeline_id)) {
611 260         1073 return (undef, \%result_obj);
612             }
613 343         1307 my $cond = $self->_create_base_condition($timeline_id, 'unacked');
614 343         1840 my ($sql, @bind) = $self->{maker}->select('statuses', ['level', \'count(status_id)'], $cond, {
615             group_by => 'level'
616             });
617 343         100259 my $sth = $dbh->prepare($sql);
618 343         38161 $sth->execute(@bind);
619 343         3710 while(my $record = $sth->fetchrow_arrayref()) {
620 395         1230 $result_obj{total} += $record->[1];
621 395         2809 $result_obj{$record->[0]} = $record->[1];
622             }
623 343         9932 return (undef, \%result_obj);
624             }catch {
625 0     0   0 my $e = shift;
626 0         0 return ($e);
627 603         5467 };
628 603         11817 @_ = @results;
629 603         2638 goto $callback;
630             }
631              
632             sub _add_to_delete_count {
633 126     126   381 my ($self, $dbh, $add_count) = @_;
634 126 50       815 return if $self->{vacuum_on_delete} <= 0;
635 126 100       427 return if $add_count <= 0;
636 122         874 my ($sql, @bind) = $self->{maker}->update(
637             'delete_counts',
638             [delete_count => sql_raw('delete_count + ?', $add_count)],
639             sql_eq(delete_count_id => $DELETE_COUNT_ID));
640 122         22946 $dbh->do($sql, undef, @bind);
641            
642 122         398963 ($sql, @bind) = $self->{maker}->select('delete_counts', ["delete_count"], sql_eq(delete_count_id => $DELETE_COUNT_ID));
643 122         37796 my $row = $dbh->selectrow_arrayref($sql, undef, @bind);
644 122 50       15236 if(!defined($row)) {
645 0         0 die "no delete_counts row with delete_count_id = $DELETE_COUNT_ID. something is wrong.";
646             }
647 122         452 my $current_delete_count = $row->[0];
648              
649 122 100       778 if($current_delete_count >= $self->{vacuum_on_delete}) {
650 17         116 $self->_do_vacuum($dbh);
651             }
652             }
653              
654             sub _do_vacuum {
655 19     19   2109 my ($self, $dbh) = @_;
656 19         124 my ($sql, @bind) = $self->{maker}->update('delete_counts', [delete_count => 0], sql_eq(delete_count_id => $DELETE_COUNT_ID));
657 19         2907 $dbh->do($sql, undef, @bind);
658 19         96851 $dbh->do('VACUUM');
659             }
660              
661             sub vacuum {
662 2     2 1 2454 my ($self) = @_;
663 2         18 $self->_do_vacuum($self->_get_my_dbh());
664             }
665              
666             sub contains {
667 47     47 1 6677 my ($self, %args) = @_;
668 47         98 my $timeline = $args{timeline};
669 47 100       324 croak 'timeline parameter is mandatory' if not defined $timeline;
670 45         82 my $callback = $args{callback};
671 45 100       326 croak 'callback parameter is mandatory' if not defined $callback;
672 43         72 my $query = $args{query};
673 43 100       288 croak 'query parameter is mandatory' if not defined $query;
674 41         135 my $ref_query = ref($query);
675 41 100 100     313 if(!$ref_query || $ref_query eq 'HASH') {
    50          
676 16         43 $query = [$query];
677             }elsif($ref_query eq 'ARRAY') {
678             ;
679             }else {
680 0         0 croak 'query parameter must be either a status, status ID or array-ref';
681             }
682 41 100       101 if(grep { !defined($_) } @$query) {
  195         327  
683 2         176 croak "query element must be defined";
684             }
685             my @method_result = try {
686 39     39   1488 my $dbh = $self->_get_my_dbh();
687 39         123 my $timeline_id = $self->_get_timeline_id($dbh, $timeline);
688 39         105 my @ret_contained = ();
689 39         70 my @ret_not_contained = ();
690 39 100       112 if(!defined($timeline_id)) {
691 4         14 @ret_not_contained = @$query;
692 4         66 return (undef, \@ret_contained, \@ret_not_contained);
693             }
694 35         43 my $sth;
695 35         103 foreach my $query_elem (@$query) {
696 132 100       392 my $status_id = (ref($query_elem) eq 'HASH') ? $query_elem->{id} : $query_elem;
697 132 100       232 if(!defined($status_id)) {
698             ## ID-less statuses are always 'not contained'.
699 12         21 push @ret_not_contained, $query_elem;
700 12         26 next;
701             }
702 120         707 my ($sql, @bind) = $self->{maker}->select(
703             'statuses', ['timeline_id', 'status_id'],
704             sql_and([sql_eq(timeline_id => $timeline_id), sql_eq(status_id => $status_id)])
705             );
706 120 100       42631 if(!$sth) {
707 28         189 $sth = $dbh->prepare($sql);
708             }
709 120         5062 $sth->execute(@bind);
710 120         1024 my $result = $sth->fetchall_arrayref();
711 120 50       283 if(!defined($result)) {
712 0         0 confess "Statement handle is inactive. Something is wrong.";
713             }
714 120 100       388 if(@$result) {
715 67         233 push @ret_contained, $query_elem;
716             }else {
717 53         176 push @ret_not_contained, $query_elem;
718             }
719             }
720 35         1198 return (undef, \@ret_contained, \@ret_not_contained);
721             }catch {
722 0     0   0 my $e = shift;
723 0         0 return ($e);
724 39         408 };
725 39         945 @_ = @method_result;
726 39         196 goto $callback;
727             }
728              
729             sub get_timeline_names {
730 10     10 1 3259 my ($self) = @_;
731 10         32 my $dbh = $self->_get_my_dbh();
732 10         60 my ($sql, @bind) = $self->{maker}->select(
733             'timelines', ['name']
734             );
735 10         2261 my $result = $dbh->selectall_arrayref($sql, undef, @bind);
736 10         2203 my @return = map { $_->[0] } @$result;
  16         57  
737 10         461 return @return;
738             }
739              
740             1;
741              
742             __END__