File Coverage

blib/lib/Perinci/Tx/Manager.pm
Criterion Covered Total %
statement 529 627 84.3
branch 241 380 63.4
condition 75 111 67.5
subroutine 60 68 88.2
pod 15 19 78.9
total 920 1205 76.3


line stmt bran cond sub pod time code
1             package Perinci::Tx::Manager;
2              
3             our $DATE = '2016-06-10'; # DATE
4             our $VERSION = '0.56'; # VERSION
5              
6 4     4   87501 use 5.010001;
  4         15  
7 4     4   19 use strict;
  4         8  
  4         70  
8 4     4   15 use warnings;
  4         7  
  4         118  
9 4     4   20 use experimental 'smartmatch';
  4         9  
  4         25  
10 4     4   265 use Log::Any::IfLOG '$log';
  4         9  
  4         24  
11              
12 4     4   4998 use DBI;
  4         49870  
  4         265  
13 4     4   1895 use File::Flock::Retry;
  4         2635  
  4         105  
14 4     4   364 use File::Remove qw(remove);
  4         1447  
  4         170  
15 4     4   1414 use JSON::MaybeXS;
  4         17415  
  4         220  
16 4     4   38 use Perinci::Sub::Util qw(err);
  4         7  
  4         152  
17 4     4   21 use Scalar::Util qw(blessed);
  4         8  
  4         191  
18 4     4   23 use Package::MoreUtil qw(package_exists);
  4         6  
  4         142  
19 4     4   1663 use Time::HiRes qw(time);
  4         3828  
  4         16  
20 4     4   1840 use UUID::Random;
  4         628  
  4         24285  
21              
22             # patch, add special action to just retrieve code and meta
23             require Perinci::Access::Schemeless;
24             package
25             Perinci::Access::Schemeless;
26              
27             sub actionmeta_get_code_and_meta { +{
28 675     675 0 62734 applies_to => ['function'],
29             summary => "Get code and metadata",
30             } }
31              
32             sub action_get_code_and_meta {
33 666     666 0 42828 my ($self, $req) = @_;
34              
35 666         1155 my $res;
36              
37 666         2017 $res = $self->get_code($req);
38 666 50       115958 return $res if $res;
39              
40 666         1990 $res = $self->get_meta($req);
41 666 50       30979 return $res if $res;
42              
43 666         2813 [200, "OK", [$req->{-code}, $req->{-meta}]];
44             }
45              
46             package Perinci::Tx::Manager;
47              
48             my $proto_v = 2;
49              
50             our $ep = ""; # error prefix
51             our $lp = "[tm]"; # log prefix
52              
53             my $json = JSON::MaybeXS->new->allow_nonref;
54              
55             # this is used for testing purposes only (e.g. to simulate crash)
56             our %_hooks;
57             our %_settings = (
58             default_rollback_on_action_failure => 1,
59             );
60              
61             # note: to avoid confusion, whenever we mention 'transaction' (or tx for short)
62             # in the code, we must always specify whether it is a sqlite tx (sqltx) or a
63             # Rinci tx (Rtx).
64              
65             # note: no method should die(), we should return error response instead. this is
66             # historical (we are called by Perinci::Access::Schemeless and in turn it is
67             # called by Perinci::Access::HTTP::Server, they used to have no wrapper eval(),
68             # but that turns out to be rather unsafe). an exception to this is in _init(),
69             # when we don't want to deal with old data and just die.
70              
71             # note: we have not dealt with sqlite's rowid wraparound. since it's a 64-bit
72             # integer, we're pretty safe. we also usually rely on ctime first for sorting.
73              
74             # new() should return object on success, or an error string if failed (fatal
75             # error). the other methods (internal or external) returns enveloped result.
76             sub new {
77 33     33 1 2148 my ($class, %opts) = @_;
78 33 50       218 return "Please supply pa object" unless blessed $opts{pa};
79             return "pa object must be an instance of Perinci::Access::Schemeless"
80 33 50       214 unless $opts{pa}->isa("Perinci::Access::Schemeless");
81              
82 33         95 my $obj = bless \%opts, $class;
83 33 50       108 if ($opts{data_dir}) {
84 33 100       618 unless (-d $opts{data_dir}) {
85 4 50       212 mkdir $opts{data_dir} or return "Can't mkdir $opts{data_dir}: $!";
86             }
87             } else {
88 0         0 for ("$ENV{HOME}/.perinci", "$ENV{HOME}/.perinci/.tx") {
89 0 0       0 unless (-d $_) {
90 0 0       0 mkdir $_ or return "Can't mkdir $_: $!";
91             }
92             }
93 0         0 $opts{data_dir} = "$ENV{HOME}/.perinci/.tx";
94             }
95 33         151 my $res = $obj->_init;
96 33 50       164 return $res->[1] unless $res->[0] == 200;
97 33         194 $obj;
98             }
99              
100             sub _lock_db {
101 325     325   749 my ($self, $shared) = @_;
102              
103 325         589 eval {
104 325 100       1039 unless ($self->{_lock}) {
105 124         1389 $self->{_lock} = File::Flock::Retry->lock(
106             "$self->{_db_file}.lck", {retries=>5, shared=>1});
107             }
108             };
109 325 50       17684 return [532, "Tx database is still locked by other process ".
110             "(probably recovery) after 5 seconds, giving up: $@"]
111             if $@;
112 325         927 [200];
113             }
114              
115             sub _unlock_db {
116 91     91   243 my ($self) = @_;
117              
118 91         512 undef $self->{_lock};
119 91         6125 [200];
120             }
121              
122             sub _init {
123 33     33   92 my ($self) = @_;
124 33         96 my $data_dir = $self->{data_dir};
125 33         284 $log->tracef("$lp Initializing data dir %s ...", $data_dir);
126              
127 33 100       484 unless (-d "$self->{data_dir}/.trash") {
128 4 50       141 mkdir "$self->{data_dir}/.trash"
129             or return [532, "Can't create .trash dir: $!"];
130             }
131 33 100       400 unless (-d "$self->{data_dir}/.tmp") {
132 4 50       128 mkdir "$self->{data_dir}/.tmp"
133             or return [532, "Can't create .tmp dir: $!"];
134             }
135              
136 33         156 $self->{_db_file} = "$data_dir/tx.db";
137              
138 33 50       285 (-d $data_dir)
139             or return [532, "Transaction data dir ($data_dir) doesn't exist ".
140             "or not a dir"];
141 33 50       385 my $dbh = DBI->connect("dbi:SQLite:dbname=$self->{_db_file}", undef, undef,
142             {
143             RaiseError => 0,
144             #sqlite_use_immediate_transaction => 1
145             })
146             or return [532, "Can't connect to transaction DB: $DBI::errstr"];
147              
148             # init database
149              
150 33         47910 local $ep = "Can't init tx db:"; # error prefix
151              
152 33 50       170 $dbh->do(<<_) or return [532, "$ep create tx: ". $dbh->errstr];
153             CREATE TABLE IF NOT EXISTS tx (
154             ser_id INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL,
155             str_id VARCHAR(200) NOT NULL,
156             owner_id VARCHAR(64) NOT NULL,
157             summary TEXT,
158             status CHAR(1) NOT NULL, -- i, a, C, U, R, u, v, d, e, X [uppercase=final]
159             ctime REAL NOT NULL,
160             commit_time REAL,
161             last_action_id INTEGER,
162             UNIQUE (str_id)
163             )
164             _
165              
166             # for tx with status=i, last_action_id is the in-progress action ID, set
167             # when in the middle of processing actions, then unset again after action
168             # has finished. during recovery, if tx with status=i still has this field
169             # set, it means it has crashed in the middle of action.
170             #
171             # for tx with other transient status (a, u/v, d/e) this field is used to
172             # mark which action has been processed. rollback/roll forward will start
173             # from this action instead of having to start from the first action of
174             # transaction.
175              
176 33 50       57533 $dbh->do(<<_) or return [532, "$ep create do_action: ". $dbh->errstr];
177             CREATE TABLE IF NOT EXISTS do_action (
178             tx_ser_id INTEGER NOT NULL, -- refers tx(ser_id)
179             id INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL,
180             sp TEXT, -- for named savepoint
181             ctime REAL NOT NULL,
182             f TEXT NOT NULL,
183             args TEXT NOT NULL,
184             UNIQUE(sp)
185             )
186             _
187              
188 33 50       27286 $dbh->do(<<_) or return [532, "$ep create undo_action: ". $dbh->errstr];
189             CREATE TABLE IF NOT EXISTS undo_action (
190             tx_ser_id INTEGER NOT NULL, -- refers tx(ser_id)
191             action_id INTEGER NOT NULL, -- refers do_action(id)
192             id INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL,
193             ctime REAL NOT NULL,
194             f TEXT NOT NULL,
195             args TEXT NOT NULL
196             )
197             _
198              
199 33 50       27034 $dbh->do(<<_) or return [532, "$ep create _meta: ".$dbh->errstr];
200             CREATE TABLE IF NOT EXISTS _meta (
201             name TEXT PRIMARY KEY NOT NULL,
202             value TEXT
203             )
204             _
205 33 50       27978 $dbh->do(<<_) or return [532, "$ep insert v: ".$dbh->errstr];
206             -- v is incremented everytime schema changes
207             INSERT OR IGNORE INTO _meta VALUES ('v', '5')
208             _
209              
210             # deal with table structure changes
211             UPDATE_SCHEMA:
212 33         19877 while (1) {
213 33         342 my ($v) = $dbh->selectrow_array(
214             "SELECT value FROM _meta WHERE name='v'");
215 33 50       4341 if ($v <= 3) {
    50          
216              
217             # changes incompatible (no longer undo_step and redo_step tables),
218             # can lose data. we bail and let user decide for herself.
219              
220 0         0 die join(
221             "",
222             "Your transaction database ($self->{_db_file}) is still at v=3",
223             ", there is incompatible changes with newer version. ",
224             "Either delete the transaction database (and lose undo data) ",
225             "or use an older version of ".__PACKAGE__." (0.28 or older).\n",
226             );
227              
228             } elsif ($v == 4) {
229              
230 0         0 eval {
231 0         0 local $dbh->{RaiseError} = 1;
232 0         0 $dbh->begin_work;
233              
234             # rename field: last_call_id -> last_action_id
235 0         0 $dbh->do("ALTER TABLE tx RENAME TO tmp_tx");
236 0         0 $dbh->do(<<'_');
237             CREATE TABLE IF NOT EXISTS tx (
238             ser_id INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL,
239             str_id VARCHAR(200) NOT NULL,
240             owner_id VARCHAR(64) NOT NULL,
241             summary TEXT,
242             status CHAR(1) NOT NULL, -- i, a, C, U, R, u, v, d, e, X [uppercase=final]
243             ctime REAL NOT NULL,
244             commit_time REAL,
245             last_action_id INTEGER,
246             UNIQUE (str_id)
247             )
248             _
249 0         0 $dbh->do(<<'_');
250             INSERT INTO tx (ser_id,str_id,owner_id,summary,status,ctime,commit_time,last_action_id)
251             SELECT ser_id,str_id,owner_id,summary,status,ctime,commit_time,last_call_id FROM tmp_tx
252             _
253              
254 0         0 $dbh->do("DROP TABLE tmp_tx");
255 0         0 $dbh->do("DROP TABLE call");
256 0         0 $dbh->do("DROP TABLE undo_call");
257 0         0 $dbh->do("UPDATE _meta SET value='5' WHERE name='v'");
258             # delete column sp, not yet
259 0         0 $dbh->commit;
260             };
261 0         0 my $e = $@;
262 0 0       0 do { $dbh->rollback; die $e } if $e;
  0         0  
  0         0  
263              
264             } else {
265             # already the latest schema version
266 33         124 last UPDATE_SCHEMA;
267             }
268             }
269              
270 33         147 $self->{_dbh} = $dbh;
271 33         317 $log->tracef("$lp Data dir initialization finished");
272 33         215 $self->_recover;
273             }
274              
275             sub get_trash_dir {
276 0     0 1 0 my ($self) = @_;
277 0         0 my $tx = $self->{_cur_tx};
278 0 0       0 return [412, "No current transaction, won't create trash dir"] unless $tx;
279 0         0 my $d = "$self->{data_dir}/.trash/$tx->{ser_id}";
280 0 0       0 unless (-d $d) {
281 0 0       0 mkdir $d or return [532, "Can't mkdir $d: $!"];
282             }
283 0         0 [200, "OK", $d];
284             }
285              
286             sub get_tmp_dir {
287 0     0 1 0 my ($self) = @_;
288 0         0 my $tx = $self->{_cur_tx};
289 0 0       0 return [412, "No current transaction, won't create tmp dir"] unless $tx;
290 0         0 my $d = "$self->{data_dir}/.tmp/$tx->{ser_id}";
291 0 0       0 unless (-d $d) {
292 0 0       0 mkdir $d or return [532, "Can't mkdir $d: $!"];
293             }
294 0         0 [200, "OK", $d];
295             }
296              
297             sub get_func_and_meta {
298 666     666 0 1473 my ($self, $func) = @_;
299              
300 666 50       3996 my ($module, $leaf) = $func =~ /(.+)::(.+)/
301             or return [400, "Not a valid fully qualified function name: $func"];
302 666         1546 my $module_p = $module; $module_p =~ s!::!/!g; $module_p .= ".pm";
  666         1322  
  666         1158  
303 666         1066 eval { require $module_p };
  666         3770  
304 666         1275 my $req_err = $@;
305 666 50       2713 if ($req_err) {
    50          
306 0 0       0 if (!package_exists($module)) {
    0          
307 0         0 return [532, "Can't load module $module (probably ".
308             "mistyped or missing module): $req_err"];
309             } elsif ($req_err !~ m!Can't locate!) {
310 0         0 return [532, "Can't load module $module (probably ".
311             "compile error): $req_err"];
312             }
313             # require error of "Can't locate ..." can be ignored. it
314             # might mean package is already defined by other code. we'll
315             # try and access it anyway.
316             } elsif (!package_exists($module)) {
317             # shouldn't happen
318 0         0 return [532, "Module loaded OK, but no $module package ".
319             "found, something's wrong"];
320             }
321             # get metadata as well as wrapped
322 666         11166 my $res = $self->{pa}->request(get_code_and_meta => "/$module/$leaf");
323 666         5968 $res;
324             }
325              
326             # about _in_sqltx: DBI/DBD::SQLite currently does not support checking whether
327             # we are in an active sqltx, except $dbh->{BegunWork} which is undocumented. we
328             # use our own flag here.
329              
330             # just a wrapper to avoid error when rollback with no active tx
331             sub _rollback_dbh {
332 185     185   418 my $self = shift;
333 185 100       3009 $self->{_dbh}->rollback if $self->{_in_sqltx};
334 185         469 $self->{_in_sqltx} = 0;
335 185         567 [200];
336             }
337              
338             # just a wrapper to avoid error when committing with no active tx
339             sub _commit_dbh {
340 184     184   393 my $self = shift;
341 184 100       648 return [200] unless $self->{_in_sqltx};
342 106         409846 my $res = $self->{_dbh}->commit;
343 106         633 $self->{_in_sqltx} = 0;
344 106 50       696 $res ? [200] : [532, "db: Can't commit: ".$self->{_dbh}->errstr];
345             }
346              
347             # just a wrapper to avoid error when beginning twice
348             sub _begin_dbh {
349 234     234   490 my $self = shift;
350 234 100       714 return [200] if $self->{_in_sqltx};
351 232         1598 my $res = $self->{_dbh}->begin_work;
352 232         4561 $self->{_in_sqltx} = 1;
353 232 50       1051 $res ? [200] : [532, "db: Can't begin: ".$self->{_dbh}->errstr];
354             }
355              
356             sub _test_tx_support {
357 666     666   1324 my ($self, $meta) = @_;
358 666   100     1978 my $ff = $meta->{features} // {};
359             $ff->{tx} or
360 666 100       1801 return [412, "function does not support transaction"];
361 665 50 50     2194 ($ff->{tx}{v} // 1) == $proto_v
362             or return [412, "function does not support correct transaction ".
363             "protocol version (v=$proto_v needed)"];
364             $ff->{idempotent} or
365 665 50       1591 return [412, "function does not declare idempotent feature"];
366 665         1496 [200];
367             }
368              
369             # check actions. actions should be [[f,args,JSON(args),cid?,\&code?,$meta?],
370             # ...]. this function will check whether function name is valid, whether
371             # arguments can be deserialized, etc. modify actions in-place (e.g. qualify
372             # function names if $opts->{qualify} is set, decode/encode JSON for arguments,
373             # cache function in [4], cache meta in [5]).
374             sub _check_actions {
375 466     466   1122 my ($self, $actions, $opts) = @_;
376 466   100     1623 $opts //= {};
377 466 50       1448 return [532, "BUG: argument 'actions' not an array"]
378             unless ref($actions) eq 'ARRAY';
379 466         903 my $i = 0;
380 466         1112 for my $a (@$actions) {
381 667         1216 $i++;
382 667         1932 local $ep = "action #$i ($a->[0]): invalid action";
383 667 50       1855 return [532, "$ep: not an array"] unless ref($a) eq 'ARRAY';
384             $a->[0] = "$opts->{qualify}::$a->[0]"
385 667 100 66     3377 if $opts->{qualify} && $a->[0] !~ /::/;
386 667 50       4268 return [532, "$ep: invalid function name"]
387             unless $a->[0] =~ /\A\w+(::\w+)+\z/;
388 667         1373 eval {
389 667 100       2234 if ($a->[2]) {
    50          
390 241         1735 $a->[1] = $json->decode($a->[2]);
391             } elsif ($a->[1]) {
392 426         2938 $a->[2] = $json->encode($a->[1]);
393             }
394             };
395 667 100       2015 return [532, "$ep: can't decode/encode JSON arguments: $@"] if $@;
396 666         1811 my $res = $self->get_func_and_meta($a->[0]);
397 666 50       1872 return err(532, "$ep: can't get metadata", $res)
398             unless $res->[0] == 200;
399 666         1175 my ($func, $meta) = @{$res->[2]};
  666         1600  
400 666         1741 $res = $self->_test_tx_support($meta);
401 666 100       1878 return err(532, "$ep: function does not pass tx support test", $res)
402             unless $res->[0] == 200;
403 665         1495 $a->[4] = $func;
404 665         1919 $a->[5] = $meta;
405             }
406 464         1153 [200];
407             }
408              
409             sub _set_tx_status_before_or_after_actions {
410 287     287   803 my ($self, $which0, $whicha) = @_;
411              
412 287         617 my $dbh = $self->{_dbh};
413 287         562 my $tx = $self->{_cur_tx};
414              
415 287         732 my $os = $tx->{status};
416 287         560 my $ns; # temporary new status during processing
417             my $fs; # desired final status
418 287 100       1168 if ($whicha eq 'action') {
    100          
    100          
    50          
419             # no change is expected
420 132         264 $ns = $os;
421 132         245 $fs = $os;
422             } elsif ($whicha eq 'rollback') {
423 52 100       225 $ns = $os eq 'i' ? 'a' : $os eq 'u' ? 'v' : $os eq 'd' ? 'e' : $os;
    100          
    100          
424 52 100 100     414 $fs = $os eq 'u'||$ns eq 'v' ? 'C' : $os eq 'd'||$ns eq 'e' ? 'U' : 'R';
    100 100        
425             } elsif ($whicha eq 'undo') {
426 66         131 $ns = 'u';
427 66         134 $fs = 'U';
428             } elsif ($whicha eq 'redo') {
429 37         75 $ns = 'd';
430 37         92 $fs = 'C';
431             }
432              
433 287 100       857 if ($which0 eq 'before') {
434 171 100       493 if ($ns ne $os) {
435 85         722 $log->tracef("$lp Setting transient transaction status ".
436             "%s -> %s ...", $os, $ns);
437             $dbh->do("UPDATE tx SET status='$ns', last_action_id=NULL ".
438             "WHERE ser_id=?", {}, $tx->{ser_id})
439 85 50       799 or return [532, "db: Can't update tx status $os -> $ns: ".
440             $dbh->errstr];
441             # to make sure, check once again if Rtx status is indeed updated
442             my @r = $dbh->selectrow_array(
443 85         329598 "SELECT status FROM tx WHERE ser_id=?", {}, $tx->{ser_id});
444 85 50       10937 return [532, "Can't update tx status #3 ".
445             "(tx doesn't exist in db)"] unless @r;
446 85 50       349 return [532, "Can't update tx status #2 ".
447             "(wants $ns, still $r[0])"]
448             unless $r[0] eq $ns;
449             # update row cache
450 85         302 $tx->{status} = $ns; $tx->{last_action_id} = undef;
  85         250  
451             }
452             }
453 287         565 $os = $ns;
454              
455 287 100       806 if ($which0 eq 'after') {
456 116 100       391 if ($whicha eq 'action') {
457             # reset last_action_id to mark that we are finished
458             $dbh->do("UPDATE tx SET last_action_id=NULL ".
459             "WHERE ser_id=?", {}, $tx->{ser_id})
460 55 50       369 or return [532, "db: Can't update last_action_id->NULL: ".
461             $dbh->errstr];
462             }
463              
464 116 100       203394 if ($os ne $fs) {
465 61         579 $log->tracef("$lp Setting final transaction status %s -> %s ...",
466             $ns, $fs);
467             $dbh->do("UPDATE tx SET status='$fs',last_action_id=NULL ".
468             "WHERE ser_id=?",
469             {}, $tx->{ser_id})
470 61 50       570 or return [532, "db: Can't set tx status to $fs: ".
471             $dbh->errstr];
472             # update row cache
473 61         227478 $tx->{status} = $fs; $tx->{last_action_id} = undef;
  61         196  
474             }
475             }
476              
477 287         1113 [200];
478             }
479              
480             sub _set_tx_status_before_actions {
481 171     171   340 my $self = shift;
482 171         650 $self->_set_tx_status_before_or_after_actions('before', @_);
483             }
484              
485             sub _set_tx_status_after_actions {
486 116     116   270 my $self = shift;
487 116         428 $self->_set_tx_status_before_or_after_actions('after', @_);
488             }
489              
490             # return enveloped actions (arrayref)
491             sub _get_actions_from_db {
492 94     94   264 my ($self, $which) = @_;
493              
494             # for safety, we shouldn't call this function when which='action' anyway
495 94 50       294 return [200, "OK", []] if $which eq 'action';
496              
497 94         209 my $dbh = $self->{_dbh};
498 94         225 my $tx = $self->{_cur_tx};
499              
500 94 100 66     928 my $t = $which eq 'redo' || $which eq 'rollback' && $tx->{status} eq 'v' ?
501             'do_action' : 'undo_action';
502              
503 94         212 my $lai = $tx->{last_action_id};
504             my $actions = $dbh->selectall_arrayref(
505             "SELECT f, NULL, args, id FROM $t WHERE tx_ser_id=? ".
506             ($lai ? "AND (id<>$lai AND ".
507             "ctime <= (SELECT ctime FROM $t WHERE id=$lai)) " : "").
508 94 100       1062 "ORDER BY ctime, id", {}, $tx->{ser_id});
509 94         12929 [200, "OK", [reverse @$actions]];
510             }
511              
512             # return enveloped undo actions (arrayref), this is currently used for debugging
513             sub _get_undo_actions_from_db {
514 0     0   0 my ($self, $which) = @_;
515              
516             # rollback does not record undo actions in db
517 0 0       0 return [200, "OK", []] if $which eq 'rollback';
518              
519 0         0 my $dbh = $self->{_dbh};
520 0         0 my $tx = $self->{_cur_tx};
521             my $t = $which eq 'redo' || $which eq 'rollback' && $tx->{status} eq 'v' ||
522             # we can also invoke actions during undo
523             ($which eq 'action' && !$self->{_in_undo})
524 0 0 0     0 ? 'undo_action' : 'do_action';
525              
526             my $actions = $dbh->selectall_arrayref(
527             "SELECT f, NULL, args, id FROM $t WHERE tx_ser_id=? ".
528 0         0 "ORDER BY ctime, id", {}, $tx->{ser_id});
529 0         0 [200, "OK", [reverse @$actions]];
530             }
531              
532             sub _collect_stash {
533 506     506   1112 my ($self, $res) = @_;
534 506         1291 my $s = $res->[3]{stash};
535 506 50       1727 return [200] unless ref($s) eq 'HASH';
536 0         0 $self->{_stash}{$_} = $s->{$_} for keys %$s;
537 0         0 [200];
538             }
539              
540             sub _perform_action {
541 275     275   746 my ($self, $which, $action, $opts) = @_;
542 275         502 my $res;
543              
544 275         592 my $dbh = $self->{_dbh};
545 275         506 my $tx = $self->{_cur_tx};
546              
547 275         503 my %args = %{$action->[1]};
  275         1289  
548 275         838 $args{-tx_v} = $proto_v;
549 275 100       853 $args{-tx_rollback} = 1 if $which eq 'rollback';
550 275 100       803 $args{-tx_recovery} = 1 if $self->{_in_recovery};
551 275 100       752 $args{-confirm} = 1 if $opts->{confirm};
552 275   50     1409 my $dd = $action->[5]{deps} // {};
553 275 50       717 if ($dd->{tmp_dir}) { # XXX actually need to use dep_satisfy_rel
554 0         0 $res = $self->get_tmp_dir;
555 0 0       0 return err(412, "Can't get tmp dir", $res) unless $res->[0] == 200;
556 0         0 $args{-tmp_dir} = $res->[2];
557             }
558 275 50       654 if ($dd->{trash_dir}) { # XXX actually need to use dep_satisfy_rel
559 0         0 $res = $self->get_trash_dir;
560 0 0       0 return err($res, "Can't get trash dir", $res) unless $res->[0] == 200;
561 0         0 $args{-trash_dir} = $res->[2];
562             }
563 275         724 $args{-stash} = $self->{_stash};
564              
565             # call the first time, to get undo actions
566              
567 275         643 $args{-tx_action} = 'check_state';
568 275         950 $args{-tx_action_id} = UUID::Random::generate();
569 275         18656 $self->{_res} = $res = $action->[4]->(%args);
570 275         34581 $log->tracef("$lp check_state args: %s, result: %s", \%args, $res);
571 275 100 100     1514 return err(532, "$ep: Check state failed", $res)
572             unless $res->[0] == 200 || $res->[0] == 304;
573 272 50 66     2026 $log->debug($res->[1]) if $res->[0] == 200 && $res->[1];
574 272   100     1259 my $undo_actions = $res->[3]{undo_actions} // [];
575 272         617 my $do_actions = $res->[3]{do_actions};
576 272         867 $self->_collect_stash($res);
577              
578 272         705 for ('after_check_state') {
579 272 50       889 last unless $_hooks{$_};
580 0         0 $log->tracef("$lp hook: $_");
581 0         0 $_hooks{$_}->($self, which=>$which, action=>$action, res=>$res);
582             }
583              
584 272         598 my $pkg = $action->[0]; $pkg =~ s/::\w+\z//;
  272         1315  
585 272         1176 $res = $self->_check_actions($undo_actions, {qualify=>$pkg});
586 272 50       912 return $res unless $res->[0] == 200;
587              
588 272 100       785 if ($do_actions) {
589 23         77 $res = $self->_check_actions($do_actions, {qualify=>$pkg});
590 23 50       117 return $res unless $res->[0] == 200;
591             }
592              
593             # record action
594              
595 272 50 66     1526 if ($which eq 'action' && !$self->{_in_undo} && !$self->{_in_redo}) {
      66        
596 115         332 my $t = 'do_action';
597             $dbh->do("INSERT INTO $t (tx_ser_id,ctime,f,args) ".
598             "VALUES (?,?,?,?)", {},
599 115 50       1144 $tx->{ser_id}, time(), $action->[0], $action->[2])
600             or return [532, "$ep: db: can't insert $t: ".$dbh->errstr];
601 115         563835 my $action_id = $dbh->last_insert_id("","","","");
602             $dbh->do("UPDATE tx SET last_action_id=? WHERE ser_id=?", {},
603             $action_id, $tx->{ser_id})
604 115 50       836 or return [532, "$ep: db: can't set last_action_id: ".$dbh->errstr];
605 115         420211 $action->[3] = $action_id;
606             }
607              
608             # record undo actions. rollback doesn't need to do this, failure in rollback
609             # will result in us giving up anyway.
610              
611 272 100 100     1578 unless ($which eq 'rollback' || $do_actions) {
612             # no BEGIN + COMMIT is needed here, because actions have not been
613             # performed. all these undo actions should return 304 anyway if
614             # performed during rollback
615 207         433 my $j = 0;
616 207         530 for my $ua (@$undo_actions) {
617 192         821 local $ep = "$ep undo_actions[$j] ($ua->[0])";
618 192 100       598 if ($self->{_in_undo}) {
619             $dbh->do(
620             "INSERT INTO do_action (tx_ser_id,ctime,f,args) ".
621             "VALUES (?,?,?,?)", {},
622 68 50       585 $tx->{ser_id}, time(), $ua->[0], $ua->[2])
623             or return [532, "$ep: db: can't insert undo_action: ".
624             $dbh->errstr];
625             } else {
626             $dbh->do(
627             "INSERT INTO undo_action(tx_ser_id,action_id,ctime,f,args)".
628             "VALUES (?,?,?,?,?)", {},
629 124 50       1123 $tx->{ser_id}, $action->[3], time(), $ua->[0], $ua->[2])
630             or return [532, "$ep: db: can't insert do_action: ".
631             $dbh->errstr];
632             }
633 192         921568 $j++;
634             }
635             }
636              
637             # call function "for real" this time
638              
639 272 100 66     1681 if ($do_actions && @$do_actions) {
    100          
640              
641 23         96 for ('before_inner_action') {
642 23 50       118 last unless $_hooks{$_};
643 0         0 $log->tracef("$lp hook: $_");
644 0         0 $_hooks{$_}->($self, which=>$which, actions=>$do_actions);
645             }
646              
647 23         96 $res = $self->_action($do_actions, $opts);
648 23 100       536 return $res unless $res->[0] == 200;
649              
650 16         48 for ('after_inner_action') {
651 16 50       60 last unless $_hooks{$_};
652 0         0 $log->tracef("$lp hook: $_");
653 0         0 $_hooks{$_}->($self, which=>$which,actions=>$do_actions,res=>$res);
654             }
655              
656             } elsif ($self->{_res}[0] == 200) {
657 234         976 $args{-tx_action} = 'fix_state';
658 234         7444 $self->{_res} = $res = $action->[4]->(%args);
659 234         26058 $log->tracef("$lp fix_state args: %s, result: %s", \%args, $res);
660 234 50 33     1316 return [532, "$ep: action failed", $res]
661             unless $res->[0] == 200 || $res->[0] == 304;
662 234         786 $self->_collect_stash($res);
663             }
664              
665 265         699 for ('after_fix_state') {
666 265 100       967 last unless $_hooks{$_};
667 126         674 $log->tracef("$lp hook: $_");
668 126         656 $_hooks{$_}->($self, which=>$which, action=>$action, res=>$res);
669             }
670              
671             # update last_action_id so we don't have to repeat all steps
672             # after recovery. error can be ignored here, i think.
673              
674 222 100       658 unless ($which eq 'action') {
675             $dbh->do("UPDATE tx SET last_action_id=? WHERE ser_id=?", {},
676 124         869 $action->[3], $tx->{ser_id});
677             }
678              
679 222         454368 [200];
680             }
681              
682             # rollback, undo, redo, action are all action loops. we combine them here into a
683             # common routine.
684             sub _action_loop {
685             # $actions is only for which='action'. for rollback/undo/redo, $actions is
686             # taken from the database table.
687 171     171   537 my ($self, $which, $actions, $opts) = @_;
688 171   100     682 $opts //= {};
689 171   100     1117 $opts->{rollback} //= $_settings{default_rollback_on_action_failure};
690              
691 171         357 my $res;
692              
693 171 100 100     839 local $self->{_action_nest_level} = ($self->{_action_nest_level}//0) + 1
694             if $which eq 'action';
695              
696             local $lp = "[tm] [".
697             "$which".
698 171 100       963 ($self->{_action_nest_level} ? "($self->{_action_nest_level})":"").
699             "]";
700              
701 171 50       1184 return [532, "BUG: 'which' must be rollback/undo/redo/action"]
702             unless $which =~ /\A(rollback|undo|redo|action)\z/;
703              
704             # this prevents an endless loop in rollback, since we call functions when
705             # doing rollback, and functions might call $tx->rollback too upon failure.
706 171 50 33     639 return if $self->{_in_rollback} && $which eq 'rollback';
707 171 100       568 local $self->{_in_rollback} = 1 if $which eq 'rollback';
708              
709 171 100       494 local $self->{_in_undo} = 1 if $which eq 'undo';
710 171 100       495 local $self->{_in_redo} = 1 if $which eq 'redo';
711              
712 171         376 my $tx = $self->{_cur_tx};
713 171 50       472 return [532, "called w/o Rinci transaction, probably a bug"] unless $tx;
714              
715 171         321 my $dbh = $self->{_dbh};
716 171         581 $self->_rollback_dbh;
717             # we're now in sqlite autocommit mode, we use this mode for the following
718             # reasons: 1) after we set Rtx status to a/e/v/u/d, we need other clients to
719             # immediately see this, so e.g. if Rtx was i, they do not try to add steps
720             # to it. also, when performing actions, we want to update+commit after each
721             # action.
722              
723             # first we need to set the appropriate transaction status, to prevent
724             # other clients from interfering/racing.
725 171         639 $res = $self->_set_tx_status_before_actions($which);
726 171 50       612 return $res unless $res->[0] == 200;
727              
728 171         449 $self->{_stash} = {};
729              
730             # for the main processing, we set up a giant eval block. on any error during
731             # processing, we return() from the eval and trigger a rollback (unless we
732             # are the rollback process itself, in which case we set tx status to X and
733             # give up).
734 171         393 my $eval_res = eval {
735 171 100       653 $actions = $self->_get_actions_from_db($which)->[2] unless $actions;
736             $log->tracef("$lp Actions to perform: %s",
737 171   66     810 [map {[$_->[0], $_->[2] // $_->[1]]} @$actions]);
  295         2245  
738              
739             # check the actions
740 171         989 $res = $self->_check_actions($actions);
741 171 100       630 return $res unless $res->[0] == 200;
742              
743 169         358 my $i = 0;
744 169         425 for my $action (@$actions) {
745 275         502 $i++;
746 275         1254 local $lp = "$lp [action #$i/".scalar(@$actions)." ($action->[0])]";
747 275         909 local $ep = "action #$i/".scalar(@$actions)." ($action->[0])";
748 275         891 $res = $self->_perform_action($which, $action, $opts);
749 232 100       1399 return $res unless $res->[0] == 200;
750             }
751              
752 116         445 $res = $self->_set_tx_status_after_actions($which);
753 116 50       482 return $res unless $res->[0] == 200;
754              
755 116         391 [200];
756             }; # eval
757 171         422 my $eval_err = $@;
758              
759 171 100 100     917 if ($eval_err || $eval_res->[0] != 200) {
760 55 100 100     552 if ($which eq 'rollback') {
    100 100        
761             # if failed during rolling back, we don't know what else to do. we
762             # set Rtx status to X (inconsistent) and ignore it.
763             $dbh->do("UPDATE tx SET status='X' WHERE ser_id=?",
764 12         96 {}, $tx->{ser_id});
765 12 50       44241 return $eval_err ?
766             err(532, "died during rollback: $eval_err") :
767             err(532, "error during rollback", $eval_res);
768             } elsif (!$opts->{rollback} || ($self->{_action_nest_level}//0) > 1) {
769             # do not rollback nested action or if told not to rollback
770 16 50       114 return $eval_err ?
771             err(532, "died during nested action (no rollback): $eval_err") :
772             err(532, "error during nested action (no rollback)", $eval_res);
773             } else {
774 27         130 my $rbres = $self->_rollback;
775 27 100       143 if ($rbres->[0] != 200) {
776 12         38 $rbres->[3]{prev} = $eval_res;
777 12 100       79 return $eval_err ?
778             err(532, $eval_err." (rollback failed)", $rbres) :
779             err(532, "$eval_res->[0] - $eval_res->[1] ".
780             "(rollback failed)", $rbres);
781             } else {
782 15 100       163 return $eval_err ?
783             err(532, $eval_err." (rolled back)", $eval_res) :
784             err(532, "$eval_res->[0] - $eval_res->[1] (rolled back)",
785             $eval_res);
786             }
787             }
788             }
789              
790 116 50       964 if ($log->is_trace) {
791 0         0 my $undo_actions = $self->_get_undo_actions_from_db($which)->[2];
792             $log->tracef("$lp Recorded undo actions: %s",
793 0 0       0 [map {[$_->[0], $_->[2]]} @$undo_actions])
  0         0  
794             if $undo_actions;
795             }
796              
797 116         1187 [200];
798             }
799              
800             sub _cleanup {
801 58     58   182 my ($self, $which) = @_;
802 58         513 $log->tracef("$lp Performing cleanup ...");
803              
804             # there should be only one process running
805 58         289 my $res = $self->_lock_db(undef);
806 58 50       216 return $res unless $res->[0] == 200;
807              
808 58         155 my $data_dir = $self->{data_dir};
809 58         147 my $dbh = $self->{_dbh};
810              
811 58         173 for my $subd (".trash", ".tmp") {
812 116         354 my $dir = "$data_dir/$subd";
813 116 50       1811 (-d $dir) or next;
814 116         2726 opendir my($dh), $dir;
815 116         1130 my @dirs = grep {/^\d+$/} readdir($dh);
  232         923  
816 116         1229 closedir $dh;
817 256         13745 my @tx_ids = map {$_->[0]}
818 116   50     253 @{ $dbh->selectall_arrayref("SELECT ser_id FROM tx") // []};
  116         1063  
819 116         1878 for my $tx_id (@dirs) {
820 0 0       0 next if $tx_id ~~ @tx_ids;
821 0         0 $log->tracef("Deleting %s ...", "$dir/$tx_id");
822 0         0 remove "$dir/$tx_id";
823             }
824             }
825              
826 58         387 $self->discard_all(status=>['R','X']);
827              
828             # XXX also discard all C/U Rtxs that are too old
829              
830             # XXX also roll back all i Rtxs that have been going around for too
831             # long
832              
833 58         1071 $log->tracef("$lp Finished cleanup");
834 58         335 $self->_unlock_db;
835              
836 58         228 [200];
837             }
838              
839             sub _recover {
840 33     33   95 my ($self, $which) = @_;
841              
842 33         198 $log->tracef("$lp Performing recovery ...");
843 33         154 local $self->{_in_recovery} = 1;
844              
845             # there should be only one process running
846 33         124 my $res = $self->_lock_db(undef);
847 33 50       129 return $res unless $res->[0] == 200;
848              
849 33         88 my $dbh = $self->{_dbh};
850 33         67 my $sth;
851              
852             # rollback all transactions that need to be rolled back (crashed
853             # in-progress, failed undo, failed redo)
854 33         208 $sth = $dbh->prepare(
855             "SELECT * FROM tx WHERE status IN ('a', 'v', 'e') ".
856             "OR (status='i' AND last_action_id IS NOT NULL)".
857             "ORDER BY ctime DESC",
858             );
859 33 50       9273 $sth->execute or return [532, "db: Can't select tx: ".$dbh->errstr];
860 33         745 while (my $row = $sth->fetchrow_hashref) {
861 0         0 $self->{_cur_tx} = $row;
862 0         0 $self->_rollback;
863             }
864              
865             # continue interrupted undo
866 33         251 $sth = $dbh->prepare(
867             "SELECT * FROM tx WHERE status IN ('u') ".
868             "ORDER BY ctime DESC",
869             );
870 33 50       3987 $sth->execute or return [532, "db: Can't select tx: ".$dbh->errstr];
871 33         566 while (my $row = $sth->fetchrow_hashref) {
872 4         15 $self->{_cur_tx} = $row;
873 4         12 $self->_undo;
874             }
875              
876             # continue interrupted redo
877 33         230 $sth = $dbh->prepare(
878             "SELECT * FROM tx WHERE status IN ('d') ".
879             "ORDER BY ctime ASC",
880             );
881 33 50       3842 $sth->execute or return [532, "db: Can't select tx: ".$dbh->errstr];
882 33         562 while (my $row = $sth->fetchrow_hashref) {
883 5         19 $self->{_cur_tx} = $row;
884 5         41 $self->_redo;
885             }
886              
887             EXIT_RECOVERY:
888 33         204 $self->_unlock_db;
889 33         271 $log->tracef("$lp Finished recovery");
890 33         544 [200];
891             }
892              
893             sub _resp_incorrect_tx_status {
894 2     2   6 my ($self, $r) = @_;
895              
896 2         14 state $statuses = {
897             i => 'still in-progress',
898             a => 'aborted, further requests ignored until rolled back',
899             v => 'aborted undo, further requests ignored until rolled back',
900             e => 'aborted redo, further requests ignored until rolled back',
901             C => 'already committed',
902             R => 'already rolled back',
903             U => 'already committed+undone',
904             u => 'undoing',
905             d => 'redoing',
906             X => 'inconsistent',
907             };
908              
909 2         17 my $s = $r->{status};
910 2   50     7 my $ss = $statuses->{$s} // "unknown (bug)";
911 2         25 [480, "tx #$r->{ser_id}: Incorrect status, status is '$s' ($ss)"];
912             }
913              
914             # all methods that work inside a transaction have some common code, e.g.
915             # database file locking, starting sqltx, checking Rtx status, etc. hence
916             # refactored into _wrap(). arguments:
917             #
918             # - label (string, just a label for logging)
919             #
920             # - args* (hashref, arguments to method)
921             #
922             # - cleanup (bool, default 0). whether to run cleanup first before code. this is
923             # currently run by begin() only, to make room by purging old transactions.
924             #
925             # - tx_status (str/array, if set then it means method requires Rtx to exist and
926             # have certain status(es))
927             #
928             # - code (coderef, main method code, will be passed args as hash)
929             #
930             # - rollback (bool, whether we should do rollback if code does not return
931             # success)
932             #
933             # - hook_check_args (coderef, will be passed args as hash)
934             #
935             # - hook_after_commit (coderef, will be passed args as hash).
936             #
937             # _wrap() will also put the current Rtx record in $self->{_cur_tx}
938             #
939             # return enveloped result
940             sub _wrap {
941 234     234   1001 my ($self, %wargs) = @_;
942             my $margs = $wargs{args}
943 234 50       861 or return [532, "BUG: args not passed to _wrap()"];
944 234         1600 my @caller = caller(1);
945              
946 234         551 my $res;
947              
948 234         771 $res = $self->_lock_db("shared");
949 234 50       926 return [532, "Can't acquire lock: $res"] unless $res->[0] == 200;
950              
951 234         925 $self->{_now} = time();
952              
953             # initialize & check tx_id argument
954 234   66     952 $margs->{tx_id} //= $self->{_tx_id};
955 234         469 my $tx_id = $margs->{tx_id};
956 234         540 $self->{_tx_id} = $tx_id;
957              
958 234 50 33     1294 return [400, "Please specify tx_id"]
959             unless defined($tx_id) && length($tx_id);
960 234 50       640 return [400, "Invalid tx_id, please use 1-200 characters only"]
961             unless length($tx_id) <= 200;
962              
963 234         463 my $dbh = $self->{_dbh};
964              
965 234 100       739 if ($wargs{cleanup}) {
966 58         250 $res = $self->_cleanup;
967 58 50       277 return err(532, "Can't succesfully cleanup", $res)
968             unless $res->[0] == 200;
969             }
970              
971             # we need to begin sqltx here so that client's actions like rollback() and
972             # commit() are indeed atomic and do not interfere with other clients'.
973              
974 234 50       762 $self->_begin_dbh or return [532, "db: Can't begin: ".$dbh->errstr];
975              
976 234         1255 my $cur_tx = $dbh->selectrow_hashref(
977             "SELECT * FROM tx WHERE str_id=?", {}, $tx_id);
978 234         42729 $self->{_cur_tx} = $cur_tx;
979              
980 234 50       1131 if ($wargs{hook_check_args}) {
981 0         0 $res = $wargs{hook_check_args}->(%$margs);
982 0 0       0 do { $self->_rollback; return err(532, "hook_check_args failed", $res) }
  0         0  
  0         0  
983             unless $res->[0] == 200;
984             }
985              
986 234 100       736 if ($wargs{tx_status}) {
987 176 100       510 if (!$cur_tx) {
988 12         71 $self->_rollback_dbh;
989 12         254 return [484, "No such transaction"];
990             }
991 164         327 my $ok;
992             # 'str' ~~ $aryref doesn't seem to work?
993 164 50       611 if (ref($wargs{tx_status}) eq 'ARRAY') {
994 164         395 $ok = $cur_tx->{status} ~~ @{$wargs{tx_status}};
  164         770  
995             } else {
996 0         0 $ok = $cur_tx->{status} ~~ $wargs{tx_status};
997             }
998 164 100       551 unless ($ok) {
999 2         8 $self->_rollback_dbh;
1000 2         9 return $self->_resp_incorrect_tx_status($cur_tx);
1001             }
1002             }
1003              
1004 220 50       667 if ($wargs{code}) {
1005 220         1004 $res = $wargs{code}->(%$margs, _tx=>$cur_tx);
1006             # on error, rollback and skip the rest
1007 220 100       2576 if ($res->[0] >= 400) {
1008             $self->_rollback if $wargs{rollback} // 1
1009 36 50 50     208 && ($res->[3]{rollback} // 1);
      66        
1010 36         426 return $res;
1011             }
1012             }
1013              
1014 184         693 my $res2 = $self->_commit_dbh;
1015 184 50       749 return $res2 unless $res2->[0] == 200;
1016              
1017 184 50       629 if ($wargs{hook_after_commit}) {
1018 0         0 $res2 = $wargs{hook_after_tx}->(%$margs);
1019 0 0       0 return err(532, "hook_after_tx failed", $res2) unless $res2->[0] == 200;
1020             }
1021              
1022 184         2697 $res;
1023             }
1024              
1025             # all methods that don't work inside a transaction have some common code, e.g.
1026             # database file locking. arguments:
1027             #
1028             # - args* (hashref, arguments to method)
1029             #
1030             # - lock_db (bool, default false)
1031             #
1032             # - code* (coderef, main method code, will be passed args as hash)
1033             #
1034             # return enveloped result
1035             sub _wrap2 {
1036 141     141   622 my ($self, %wargs) = @_;
1037             my $margs = $wargs{args}
1038 141 50       575 or return [532, "BUG: args not passed to _wrap()"];
1039 141         1230 my @caller = caller(1);
1040              
1041 141         356 my $res;
1042              
1043 141 50       494 if ($wargs{lock_db}) {
1044 0         0 $res = $self->_lock_db("shared");
1045 0 0       0 return err(532, "Can't acquire lock", $res) unless $res->[0] == 200;
1046             }
1047              
1048 141         566 $res = $wargs{code}->(%$margs);
1049              
1050 141 50       594 if ($wargs{lock_db}) {
1051 0         0 $self->_unlock_db;
1052             }
1053              
1054 141         725 $res;
1055             }
1056              
1057             sub begin {
1058 58     58 1 69321 my ($self, %args) = @_;
1059             $self->_wrap(
1060             args => \%args,
1061             cleanup => 1,
1062             code => sub {
1063 58     58   154 my $dbh = $self->{_dbh};
1064             my $r = $dbh->selectrow_hashref("SELECT * FROM tx WHERE str_id=?",
1065 58         309 {}, $args{tx_id});
1066 58 100       6379 return [409, "Another transaction with that ID exists", undef,
1067             {rollback=>0}] if $r;
1068              
1069             # XXX check for limits
1070              
1071             $dbh->do("INSERT INTO tx (str_id, owner_id, summary, status, ".
1072             "ctime) VALUES (?,?,?,?,?)", {},
1073             $args{tx_id}, $args{client_token}//"", $args{summary}, "i",
1074             $self->{_now})
1075 56 50 50     619 or return [532, "db: Can't insert tx: ".$dbh->errstr];
1076              
1077 56         16337 $self->{_tx_id} = $args{tx_id};
1078             $self->{_cur_tx} = $dbh->selectrow_hashref(
1079             "SELECT * FROM tx WHERE str_id=?", {}, $args{tx_id})
1080 56 50       342 or return [532, "db: Can't select tx: ".$dbh->errstr];
1081 56         6971 [200];
1082             },
1083 58         532 );
1084             }
1085              
1086             sub _action {
1087 77     77   238 my ($self, $actions, $opts) = @_;
1088 77         300 $self->_action_loop('action', $actions, $opts);
1089             }
1090              
1091             # old name, for backward compatibility
1092 0     0   0 sub _call { my $self =shift; $self->_action(@_) }
  0         0  
1093 0     0 0 0 sub call { my $self =shift; $self->action(@_) }
  0         0  
1094              
1095             sub action {
1096 55     55 1 548026 my ($self, %args) = @_;
1097              
1098 55         166 my ($f, $args, $actions);
1099 55   50     489 $actions = $args{actions} // [[$args{f}, $args{args}]];
1100 55 50       222 return [304, "No actions to do"] unless @$actions;
1101              
1102             $self->_wrap(
1103             args => \%args,
1104             # we allow calling action() during rollback, since a function can call
1105             # other function using action(), but we don't actually bother to save
1106             # the undo actions.
1107             tx_status => ["i", "d", "u", "a", "v", "e"],
1108             rollback => 0, # _action_loop already does rollback
1109             code => sub {
1110 54     54   147 my $cur_tx = $self->{_cur_tx};
1111 54 50 33     273 if ($cur_tx->{status} ne 'i' && !$self->{_in_rollback}) {
1112 0         0 return $self->_resp_incorrect_tx_status($cur_tx);
1113             }
1114              
1115 54         168 delete $self->{_res};
1116 54         303 my $res = $self->_action($actions, {confirm=>$args{confirm}});
1117 54 100 66     1641 if ($res->[0] != 200 && $res->[0] != 304) {
1118 15 100 100     197 if ($self->{_res} && $self->{_res}[0] !~ /200|304/) {
1119             return [$self->{_res}[0],
1120 3         23 $self->{_res}[1],
1121             undef,
1122             {tx_result=>$res, prev=>$res}];
1123             } else {
1124 12         52 return err(532, {prev=>$res});
1125             }
1126             } else {
1127             return [$self->{_res}[0],
1128             $self->{_res}[1],
1129             $self->{_stash}{result},
1130 39   50     290 { %{ $self->{_stash}{result_meta} // {} },
1131 39   50     173 %{ $res->[3] // {}} }];
  39         365  
1132             }
1133             },
1134 55         519 );
1135             }
1136              
1137             sub commit {
1138 33     33 1 15626 my ($self, %args) = @_;
1139             $self->_wrap(
1140             args => \%args,
1141             tx_status => ["i", "a"],
1142             code => sub {
1143 33     33   95 my $dbh = $self->{_dbh};
1144 33         91 my $tx = $self->{_cur_tx};
1145 33 50       120 if ($tx->{status} eq 'a') {
1146 0         0 my $res = $self->_rollback;
1147 0 0       0 return $res unless $res->[0] == 200;
1148 0         0 return [200, "Rolled back"];
1149             }
1150             $dbh->do(
1151 33         233 "DELETE FROM do_action WHERE tx_ser_id=?",{},$tx->{ser_id});
1152             $dbh->do("UPDATE tx SET status=?, commit_time=? WHERE ser_id=?",
1153             {}, "C", $self->{_now}, $tx->{ser_id})
1154 33 50       7769 or return [532, "db: Can't update tx status to committed: ".
1155             $dbh->errstr];
1156 33         5331 [200];
1157             },
1158 33         311 );
1159             }
1160              
1161             sub _rollback {
1162 32     32   105 my ($self) = @_;
1163 32         75 my $dbh = $self->{_dbh};
1164 32         76 my $tx = $self->{_cur_tx};
1165              
1166 32         128 my $res = $self->_action_loop('rollback');
1167 32 100       1015 return $res unless $res->[0] == 200;
1168 20         149 $dbh->do("DELETE FROM do_action WHERE tx_ser_id=?", {}, $tx->{ser_id});
1169 20         59829 $dbh->do("DELETE FROM undo_action WHERE tx_ser_id=?", {}, $tx->{ser_id});
1170 20         44460 [200];
1171             }
1172              
1173             sub _undo {
1174 38     38   126 my ($self, $opts) = @_;
1175 38         96 my $dbh = $self->{_dbh};
1176 38         86 my $tx = $self->{_cur_tx};
1177              
1178 38         142 my $res = $self->_action_loop('undo', undef, $opts);
1179 38 100       773 return $res unless $res->[0] == 200;
1180 28         196 $dbh->do("DELETE FROM undo_action WHERE tx_ser_id=?", {}, $tx->{ser_id});
1181 28         111534 [200];
1182             }
1183              
1184             sub _redo {
1185 24     24   78 my ($self, $opts) = @_;
1186 24         68 my $dbh = $self->{_dbh};
1187 24         58 my $tx = $self->{_cur_tx};
1188              
1189 24         89 my $res = $self->_action_loop('redo', undef, $opts);
1190 24 100       878 return $res unless $res->[0] == 200;
1191 13         96 $dbh->do("DELETE FROM do_action WHERE tx_ser_id=?", {}, $tx->{ser_id});
1192 13         55967 [200];
1193             }
1194              
1195             sub rollback {
1196 7     7 1 25268 my ($self, %args) = @_;
1197             $self->_wrap(
1198             args => \%args,
1199             tx_status => ["i", "a"],
1200             rollback => 0, # _action_loop already does rollback
1201             code => sub {
1202 5     5   26 $self->_rollback;
1203             },
1204 7         55 );
1205             }
1206              
1207             sub prepare {
1208 0     0 1 0 [501, "Not implemented"];
1209             }
1210              
1211             sub savepoint {
1212 0     0 1 0 [501, "Not yet implemented"];
1213             }
1214              
1215             sub release_savepoint {
1216 0     0 1 0 [501, "Not yet implemented"];
1217             }
1218              
1219             sub list {
1220 82     82 1 34084 my ($self, %args) = @_;
1221             $self->_wrap2(
1222             args => \%args,
1223             code => sub {
1224 82     82   227 my $dbh = $self->{_dbh};
1225 82         220 my @wheres = ("1");
1226 82         168 my @params;
1227 82 100       274 if ($args{tx_id}) {
1228 74         172 push @wheres, "str_id=?";
1229 74         212 push @params, $args{tx_id};
1230             }
1231 82 100       267 if ($args{tx_status}) {
1232 6         12 push @wheres, "status=?";
1233 6         16 push @params, $args{tx_status};
1234             }
1235 82         713 my $sth = $dbh->prepare(
1236             "SELECT * FROM tx WHERE ".join(" AND ", @wheres).
1237             " ORDER BY ctime, ser_id");
1238 82         10311 $sth->execute(@params);
1239 82         278 my @res;
1240 82         2375 while (my $row = $sth->fetchrow_hashref) {
1241 81 100       357 if ($args{detail}) {
1242             push @res, {
1243             tx_id => $row->{str_id},
1244             tx_status => $row->{status},
1245             tx_start_time => $row->{ctime},
1246             tx_commit_time=> $row->{commit_time},
1247             tx_summary => $row->{summary},
1248 74         1376 };
1249             } else {
1250 7         105 push @res, $row->{str_id};
1251             }
1252             }
1253 82         1159 [200, "OK", \@res];
1254             },
1255 82         736 );
1256             }
1257              
1258             sub undo {
1259 34     34 1 10702 my ($self, %args) = @_;
1260              
1261             # find latest committed tx
1262 34 50       134 unless ($args{tx_id}) {
1263 0         0 my $dbh = $self->{_dbh};
1264 0         0 my @row = $dbh->selectrow_array(
1265             "SELECT str_id FROM tx WHERE status='C' ".
1266             "ORDER BY commit_time DESC, ser_id DESC LIMIT 1");
1267 0 0       0 return [412, "There are no committed transactions to undo"] unless @row;
1268 0         0 $args{tx_id} = $row[0];
1269             }
1270              
1271             $self->_wrap(
1272             args => \%args,
1273             tx_status => ["C"],
1274             rollback => 0, # _action_loop already does rollback
1275             code => sub {
1276 34     34   132 delete $self->{_res};
1277 34         194 my $res = $self->_undo({confirm=>$args{confirm}});
1278 34 100 66     280 if ($res->[0] != 200 && $res->[0] != 304) {
1279 10 50 33     132 if ($self->{_res} && $self->{_res}[0] !~ /200|304/) {
1280             return [$self->{_res}[0],
1281 0         0 $self->{_res}[1],
1282             undef,
1283             {tx_result=>$res, prev=>$res}];
1284             } else {
1285 10         44 return err(532, {prev=>$res});
1286             }
1287             } else {
1288 24         106 return [200];
1289             }
1290             },
1291 34         284 );
1292             }
1293              
1294             sub redo {
1295 19     19 1 6945 my ($self, %args) = @_;
1296              
1297             # find first undone committed tx
1298 19 50       86 unless ($args{tx_id}) {
1299 0         0 my $dbh = $self->{_dbh};
1300 0         0 my @row = $dbh->selectrow_array(
1301             "SELECT str_id FROM tx WHERE status='U' ".
1302             "ORDER BY commit_time ASC, ser_id ASC LIMIT 1");
1303 0 0       0 return [412, "There are no undone transactions to redo"] unless @row;
1304 0         0 $args{tx_id} = $row[0];
1305             }
1306              
1307             $self->_wrap(
1308             args => \%args,
1309             tx_status => ["U"],
1310             rollback => 0, # _action_loop already does rollback
1311             code => sub {
1312 19     19   75 delete $self->{_res};
1313 19         122 my $res = $self->_redo({confirm=>$args{confirm}});
1314 19 100 66     168 if ($res->[0] != 200 && $res->[0] != 304) {
1315 11 50 33     226 if ($self->{_res} && $self->{_res}[0] !~ /200|304/) {
1316             return [$self->{_res}[0],
1317 0         0 $self->{_res}[1],
1318             undef,
1319             {tx_result=>$res, prev=>$res}];
1320             } else {
1321 11         69 return err(532, {prev=>$res});
1322             }
1323             } else {
1324 8         35 return [200];
1325             }
1326             },
1327 19         166 );
1328             }
1329              
1330             sub _discard {
1331 87     87   362 my ($self, $which, %args) = @_;
1332 87 100       324 my $wmeth = $which eq 'one' ? '_wrap' : '_wrap2';
1333             $self->$wmeth(
1334             label => $which,
1335             args => \%args,
1336             tx_status => $which eq 'one' ? ['C','U','R','X'] : undef,
1337             code => sub {
1338 76     76   199 my $dbh = $self->{_dbh};
1339 76         185 my $sth;
1340 76 100       267 if ($which eq 'one') {
1341 17         105 $sth = $dbh->prepare("SELECT ser_id FROM tx WHERE str_id=?");
1342 17         1125 $sth->execute($self->{_cur_tx}{str_id});
1343             } else {
1344 59         135 my $txs = "'C','U','R','X'";
1345 59 100       194 if ($args{status}) {
1346 116         384 $txs = join(",",map{"'$_'"}
1347 58         118 grep {/\A[CURX]\z/} @{$args{status}});
  116         445  
  58         155  
1348             }
1349 59         387 $sth = $dbh->prepare(
1350             "SELECT ser_id FROM tx WHERE status IN ($txs)");
1351 59         6320 $sth->execute;
1352             }
1353 76         237 my @txs;
1354 76         718 while (my @row = $sth->fetchrow_array) {
1355 39         325 push @txs, $row[0];
1356             }
1357 76 100       289 if (@txs) {
1358 37         129 my $txs = join(",", @txs);
1359 37 50       234 $dbh->do("DELETE FROM tx WHERE ser_id IN ($txs)")
1360             or return [532, "db: Can't delete tx: ".$dbh->errstr];
1361 37         78183 $dbh->do("DELETE FROM do_action WHERE tx_ser_id IN ($txs)");
1362 37         17402 $log->infof("$lp discard tx: %s", \@txs);
1363             }
1364 76         1015 [200];
1365             },
1366 87 100       903 );
1367             }
1368              
1369             sub discard {
1370 28     28 1 15513 my $self = shift;
1371 28         136 $self->_discard('one', @_);
1372             }
1373              
1374             sub discard_all {
1375 59     59 1 3625 my $self = shift;
1376 59         229 $self->_discard('all', @_);
1377             }
1378              
1379             1;
1380             # ABSTRACT: A Rinci transaction manager
1381              
1382             __END__
1383              
1384             =pod
1385              
1386             =encoding UTF-8
1387              
1388             =head1 NAME
1389              
1390             Perinci::Tx::Manager - A Rinci transaction manager
1391              
1392             =head1 VERSION
1393              
1394             This document describes version 0.56 of Perinci::Tx::Manager (from Perl distribution Perinci-Tx-Manager), released on 2016-06-10.
1395              
1396             =head1 SYNOPSIS
1397              
1398             # used by Perinci::Access::Schemeless
1399              
1400             =head1 DESCRIPTION
1401              
1402             This class implements a transaction and undo manager (TM), as specified by
1403             L<Rinci::function::Transaction> and L<Riap::Transaction>. It is meant to be
1404             instantiated by L<Perinci::Access::Schemeless>, but will also be passed to
1405             transactional functions to save undo/redo data.
1406              
1407             It uses an SQLite database to store the transaction list and undo/redo data, as
1408             well as a transaction data directory to provide trash_dir/tmp_dir for functions
1409             that require it.
1410              
1411             =for Pod::Coverage ^(call|get_func_and_meta)$
1412              
1413             =head1 ATTRIBUTES
1414              
1415             =head2 _tx_id
1416              
1417             This is just a convenience so that methods that require tx_id will get the
1418             default value from here if tx_id is not specified in the arguments.
1419              
1420             =head1 METHODS
1421              
1422             =head2 new(%args) => OBJ
1423              
1424             Create new object. Arguments:
1425              
1426             =over 4
1427              
1428             =item * pa => OBJ
1429              
1430             Perinci::Access::Schemeless object. This is required by Perinci::Tx::Manager to
1431             load/get functions when it wants to perform undo/redo/recovery.
1432             Perinci::Access::Schemeless conveniently require()s the Perl modules and wraps
1433             the functions.
1434              
1435             =item * data_dir => STR (default C<~/.perinci/.tx>)
1436              
1437             =item * max_txs => INT (default 1000)
1438              
1439             Limit maximum number of transactions maintained by the TM, including all rolled
1440             back and committed transactions, since they are still recorded in the database.
1441             The default is 1000.
1442              
1443             Not yet implemented.
1444              
1445             After this limit is reached, cleanup will be performed to delete rolled back
1446             transactions, and after that committed transactions.
1447              
1448             =item * max_open_txs => INT (default 100)
1449              
1450             Limit maximum number of open (in progress, aborted, prepared) transactions. This
1451             excludes resolved transactions (rolled back and committed). The default is no
1452             limit.
1453              
1454             Not yet implemented.
1455              
1456             After this limit is reached, starting a new transaction will fail.
1457              
1458             =item * max_committed_txs => INT (default 100)
1459              
1460             Limit maximum number of committed transactions that are recorded by the
1461             database. This is equal to the number of undo steps that are remembered.
1462              
1463             After this limit is reached, cleanup will automatically be performed so that
1464             the oldest committed transactions are purged.
1465              
1466             Not yet implemented.
1467              
1468             =item * max_open_age => INT
1469              
1470             Limit the maximum age of open transactions (in seconds). If this limit is
1471             reached, in-progress transactions will automatically be purged because they
1472             have timed out.
1473              
1474             Not yet implemented.
1475              
1476             =item * max_committed_age => INT
1477              
1478             Limit the maximum age of committed transactions (in seconds). If this limit is
1479             reached, the old transactions will start to be purged.
1480              
1481             Not yet implemented.
1482              
1483             =back
1484              
1485             =head2 $tx->get_trash_dir => RESP
1486              
1487             =head2 $tx->get_tmp_dir => RESP
1488              
1489             =head2 $tm->begin(%args) => RESP
1490              
1491             Start a new transaction.
1492              
1493             Arguments: tx_id (str, required, unless already supplied via _tx_id()), twopc
1494             (bool, optional, currently must be false since distributed transactions are not
1495             yet supported), summary (optional).
1496              
1497             TM will create an entry for this transaction in its database.
1498              
1499             =head2 $tm->action(%args) => RESP
1500              
1501             Perform action for the transaction by calling one or more functions.
1502              
1503             Arguments: C<f> (fully-qualified function name), C<args> (arguments to function,
1504             hashref). Or, C<actions> (list of function calls, array, C<[[f1, args1], ...]>,
1505             alternative to specifying C<f> and C<args>), C<confirm> (bool, if set to true
1506             then will pass C<< -confirm => 1 >> special argument to functions; see status
1507             code 331 in L<Rinci::function> for more details on this).
1508              
1509             TM will also pass the following special arguments: C<< -tx_v => PROTO_VERSION
1510             >>, C<< -tx_rollback => 1 >> during rollback, and C<< -tx_recovery => 1 >>
1511             during recovery, for informative purposes.
1512              
1513             To perform a single action, specify C<f> and C<args>. To perform several
1514             actions, supply C<actions>.
1515              
1516             Note: special arguments (those starting with a dash, C<->) will be stripped from
1517             function arguments by TM.
1518              
1519             If response from function is not success, rollback() will be called.
1520              
1521             Tip: To call a function that supports dry-run mode in dry-run mode, or to call a
1522             pure function, you do not have to use TM's action(); you can call the function
1523             directly, since this will not have any side effects.
1524              
1525             Tip: During C<fix_state>, a function can return C<stash> in its result metadata,
1526             which can be set to a hash. This will be collected and passed by TM in the
1527             C<-stash> special argument. This is useful with multiple actions, where one
1528             action might need to check the result from a previous action.
1529              
1530             =head2 $tx->commit(%args) => RESP
1531              
1532             Commit a transaction.
1533              
1534             Arguments: C<tx_id>
1535              
1536             =head2 $tx->rollback(%args) => RESP
1537              
1538             Rollback a transaction.
1539              
1540             Arguments: C<tx_id>, C<sp_id> (optional, savepoint name to rollback to a
1541             specific savepoint only).
1542              
1543             Currently rolling back to a savepoint is not implemented.
1544              
1545             =head2 $tx->prepare(%args) => RESP
1546              
1547             Prepare a transaction.
1548              
1549             Arguments: C<tx_id>
1550              
1551             Currently will return 501 (not implemented). Rinci::Transaction does not yet
1552             support distributed transactions.
1553              
1554             =head2 $tx->savepoint(%args) => RESP
1555              
1556             Declare a savepoint.
1557              
1558             Arguments: C<tx_id>, C<sp_id> (savepoint name).
1559              
1560             Currently not implemented.
1561              
1562             =head2 $tx->release_savepoint(%args) => RESP
1563              
1564             Release (forget) a savepoint.
1565              
1566             Arguments: C<tx_id>, C<sp_id> (savepoint name).
1567              
1568             Currently not implemented.
1569              
1570             =head2 $tx->undo(%args) => RESP
1571              
1572             Undo a committed transaction.
1573              
1574             Arguments: C<tx_id>, C<confirm> (bool, if set to true then will pass C<<
1575             -confirm => 1 >> special argument to functions; see status code 331
1576             in L<Rinci::function> for more details on this).
1577              
1578             =head2 $tx->redo(%args) => RESP
1579              
1580             Redo an undone committed transaction.
1581              
1582             Arguments: C<tx_id>, C<confirm> (bool, if set to true then will pass C<<
1583             -confirm => 1 >> special argument to functions; see status code 331
1584             in L<Rinci::function> for more details on this).
1585              
1586             =head2 $tx->list(%args) => RESP
1587              
1588             List transactions.
1589              
1590             Arguments: B<detail> (bool, default 0, whether to return transaction records
1591             instead of just a list of transaction IDs).
1592              
1593             Return an array of results sorted by creation date (in ascending order).
1594              
1595             =head2 $tx->discard(%args) => RESP
1596              
1597             Discard (forget) a client's committed transaction.
1598              
1599             Arguments: C<tx_id>
1600              
1601             Transactions that can be discarded are committed, undone committed, or
1602             inconsistent ones (i.e., those with final statuses C<C>, C<U>, C<X>).
1603              
1604             =head2 $tm->discard_all(%args) => RESP
1605              
1606             Discard (forget) all committed transactions.
1607              
1608             =head1 HOMEPAGE
1609              
1610             Please visit the project's homepage at L<https://metacpan.org/release/Perinci-Tx-Manager>.
1611              
1612             =head1 SOURCE
1613              
1614             Source repository is at L<https://github.com/perlancar/perl-Perinci-Tx-Manager>.
1615              
1616             =head1 BUGS
1617              
1618             Please report any bugs or feature requests on the bugtracker website L<https://rt.cpan.org/Public/Dist/Display.html?Name=Perinci-Tx-Manager>.
1619              
1620             When submitting a bug or request, please include a test-file or a
1621             patch to an existing test-file that illustrates the bug or desired
1622             feature.
1623              
1624             =head1 SEE ALSO
1625              
1626             L<Rinci::Transaction>
1627              
1628             L<Perinci::Access::Schemeless>
1629              
1630             =head1 AUTHOR
1631              
1632             perlancar <perlancar@cpan.org>
1633              
1634             =head1 COPYRIGHT AND LICENSE
1635              
1636             This software is copyright (c) 2016 by perlancar@cpan.org.
1637              
1638             This is free software; you can redistribute it and/or modify it under
1639             the same terms as the Perl 5 programming language system itself.
1640              
1641             =cut