File Coverage

blib/lib/DBIx/TransactionManager/Distributed.pm
Criterion Covered Total %
statement 96 96 100.0
branch 29 40 72.5
condition 6 10 60.0
subroutine 14 14 100.0
pod 5 5 100.0
total 150 165 90.9


line stmt bran cond sub pod time code
1             package DBIx::TransactionManager::Distributed;
2              
3 1     1   51116 use strict;
  1         3  
  1         24  
4 1     1   3 use warnings;
  1         1  
  1         19  
5              
6 1     1   3 use Exporter qw(import export_to_level);
  1         1  
  1         39  
7              
8             =head1 NAME
9              
10             DBIx::TransactionManager::Distributed;
11              
12             =head1 VERSION
13              
14             0.02
15              
16             =cut
17              
18             our $VERSION = "0.02";
19              
20             =head1 DESCRIPTION
21              
22             Generic database handling utilities.
23              
24             Currently provides a minimal database handle tracking facility, allowing code
25             to request a transaction against all active database handles.
26              
27             =head1 SYNOPSIS
28              
29             use DBIx::TransactionManager::Distributed qw(register_dbh release_dbh txn);
30             my $dbh1 = DBI->connect('dbi:Pg', '', '', { RaiseError => 1});
31             my $dbh2 = DBI->connect('dbi:Pg', '', '', { RaiseError => 1});
32             my $dbh3 = DBI->connect('dbi:Pg', '', '', { RaiseError => 1});
33              
34             register_dbh(category1 => $dbh1);
35             register_dbh(category1 => $dbh2);
36             register_dbh(category2 => $dbh2);
37             register_dbh(category2 => $dbh3);
38              
39             txn { $dbh1->do('update ta set name = "a"'); $dbh2->do('insert into tb values (1)') } 'category1';
40             txn { $dbh2->do('update tc set name = "b"'); $dbh3->do('insert into td values (2)') } 'category2';
41              
42             release_dbh(category1 => $dbh1);
43             release_dbh(category1 => $dbh1);
44             release_dbh(category2 => $dbh2);
45             release_dbh(category3 => $dbh3);
46              
47             =cut
48              
49 1     1   4 use Scalar::Util qw(weaken refaddr);
  1         1  
  1         46  
50 1     1   424 use List::UtilsBy qw(extract_by);
  1         1181  
  1         878  
51              
52             our @EXPORT_OK = qw(register_dbh release_dbh dbh_is_registered txn register_cached_dbh);
53              
54             # List of all retained handles by category. Since we don't expect to update
55             # the list often, and the usual action is to iterate through them all in
56             # sequence, we're using an array rather than a hash.
57             # Each $dbh will be stored as a weakref: all calls to register_dbh should
58             # be matched with a release_dbh or global destruction, but we can recover
59             # (and complain) if that doesn't happen.
60             my %DBH;
61              
62             # Where we registered the dbh originally - top level key is category, second
63             # level is refaddr.
64             my %DBH_SOURCE;
65              
66             # Last PID we saw - used for invalidating stale DBH on fork
67             my $PID = $$;
68              
69             our $IN_TRANSACTION = 0;
70              
71             =head2 register_dbh
72              
73             Records the given database handle as being active and available for running transactions against.
74              
75             Expects a category (string value) and L instance.
76              
77             Returns the database handle.
78              
79             Example:
80              
81             sub _dbh {
82             my $dbh = DBI->connect('dbi:Pg', '', '', { RaiseError => 1});
83             return DBIx::TransactionManager::Distributaed::register_dbh(category => $dbh);
84             }
85              
86             =cut
87              
88             sub register_dbh {
89 20     20 1 7017 my ($category, $dbh) = @_;
90 20 50       47 die "too many parameters to register_dbh: @_" if @_ > 2;
91 20         38 my $addr = refaddr $dbh;
92 20 100       29 if (dbh_is_registered($category, $dbh)) {
93 1         15 warn "already registered this database handle at " . $DBH_SOURCE{$category}{$addr};
94 1         6 return;
95             }
96 19         18 push @{$DBH{$category}}, $dbh;
  19         32  
97 19         39 weaken($DBH{$category}[-1]);
98             # filename:line (package::sub)
99 19         164 $DBH_SOURCE{$category}{$addr} = sprintf "%s:%d (%s::%s)", (caller 1)[1, 2, 0, 3];
100             # We may be connecting partway through a transaction - if so, we want to join this handle onto the list of
101             # active transactions
102 19 100 66     61 $dbh->begin_work if $IN_TRANSACTION && $dbh->{AutoCommit};
103 19         355 $dbh;
104             }
105              
106             =head2 release_dbh
107              
108             Marks the given database handle as no longer active - it will not be used for any further transaction requests
109             via L.
110              
111             Returns the database handle.
112              
113             Example:
114              
115             sub DESTROY {
116             my $self = shift;
117             return if ${^GLOBAL_PHASE} eq 'DESTRUCT';
118             DBIx::TransactionManager::Distributaed::release_dbh($self->dbh)->disconnect;
119             }
120              
121             =cut
122              
123             sub release_dbh {
124 5     5 1 2127 my ($category, $dbh) = @_;
125 5 50       13 die "too many parameters to release_dbh: @_" if @_ > 2;
126             # At destruction we may have an invalid handle
127 5 50       14 my $addr = refaddr $dbh or return $dbh;
128 5 100       8 unless (dbh_is_registered($category, $dbh)) {
129 1         9 my @other_categories = grep exists $DBH_SOURCE{$_}{$addr}, sort keys %DBH_SOURCE;
130 1 50       18 warn "releasing unregistered dbh $dbh for category $category"
131             . (@other_categories ? " (but found it in these categories instead: " . join ', ', @other_categories, ')' : '');
132             # If we did find it elsewhere, make sure we do cleanup to reduce confusion
133 1         6 _remove_dbh_from_category($_ => $dbh) for @other_categories;
134             }
135 5         11 _remove_dbh_from_category($category => $dbh);
136 5         27 return $dbh;
137             }
138              
139             =head2 _remove_dbh_from_category
140              
141             Helper function to reduce common code - removes the given C<$dbh> from a single category.
142              
143             Used internally.
144              
145             =cut
146              
147             sub _remove_dbh_from_category {
148 6     6   5 my ($category, $dbh) = @_;
149 6 50       14 my $addr = refaddr $dbh or return $dbh;
150 6         10 delete $DBH_SOURCE{$category}{$addr};
151             # avoiding grep here because these are weakrefs and we want them to stay that way.
152             # since they're weakrefs, some of these may be undef
153 6 50   5   16 extract_by { $addr == (defined($_) ? refaddr($_) : 0) } @{$DBH{$category}};
  5         46  
  6         19  
154 6         43 return $dbh;
155             }
156              
157             =head2 dbh_is_registered
158              
159             Returns true if the provided database handle has been registered already.
160              
161             Used when registering a handle acquired via L.
162              
163             register_dbh($category => $dbh) unless dbh_is_registered($category => $dbh);
164              
165             =cut
166              
167             sub dbh_is_registered {
168 34     34 1 299 my ($category, $dbh) = @_;
169 34 50       54 die "too many parameters to register_dbh: @_" if @_ > 2;
170 34         40 _check_fork();
171 34         46 my $addr = refaddr $dbh;
172 34 100       122 return exists $DBH_SOURCE{$category}{$addr} ? 1 : 0;
173             }
174              
175             =head2 register_cached_dbh
176              
177             Records the given database handle created via L as being active and available for running transactions against.
178              
179             Expects a category (string value) and L instance.
180              
181             Returns the database handle.
182              
183             Example:
184              
185             sub _dbh {
186             my $dbh = DBI->connect_cached('dbi:Pg', '', '', { RaiseError => 1});
187             return register_cached_dbh('category' => $dbh);
188             }
189              
190             =cut
191              
192             sub register_cached_dbh {
193 1     1 1 783 my ($category, $dbh) = @_;
194 1 50       2 register_dbh($category => $dbh) unless dbh_is_registered($category => $dbh);
195 1 0 33     2 $dbh->begin_work if $IN_TRANSACTION && $dbh->{AutoCommit};
196 1         2 $dbh;
197             }
198              
199             =head2 txn
200              
201             Runs the given coderef in a transaction.
202              
203             Expects a coderef and one or more database handle categories.
204              
205             Will call L for every known database handle in the given category,
206             run the code, then call L on success, or L on failure.
207              
208             Will raise an exception on failure, or return an empty list on success.
209              
210             Example:
211              
212             txn { dbh()->do('NOTIFY something') } 'category';
213              
214             WARNING: This only applies transactions to known database handles. Anything else -
215             Redis, cache layers, files on disk - is out of scope. Transactions are a simple
216             L / L pair, there's no 2-phase commit or other
217             distributed transaction co-ordination happening here.
218              
219             =cut
220              
221             sub txn(&;@) {
222 6     6 1 4000 my ($code, @categories) = @_;
223 6 50       15 die "Need a database category" unless @categories;
224 6         10 _check_fork();
225 6         7 my $wantarray = wantarray;
226 6         8 for my $category (@categories) {
227 6 100   10   22 if (my $count = () = extract_by { !defined($_) } @{$DBH{$category}}) {
  10         58  
  6         19  
228 1         15 warn "Had $count database handles that were not released via release_dbh, probable candidates follow:\n";
229 1         4 my %addr = map { ; refaddr($_) => 1 } @{$DBH{$category}};
  1         4  
  1         3  
230 1         2 warn "unreleased dbh in $_\n" for sort delete @{$DBH_SOURCE{$category}}{grep !exists $addr{$_}, keys %{$DBH_SOURCE{$category}}};
  1         8  
  1         4  
231             }
232             }
233              
234 6         39 my @rslt;
235             eval {
236 6         13 for my $category (@categories) {
237 6   50     5 $_->{AutoCommit} && $_->begin_work for @{$DBH{$category}};
  6         32  
238             }
239 6         1561 local $IN_TRANSACTION = 1;
240             # We want to pass through list/scalar/void context to the coderef
241 6 100       16 if ($wantarray) {
    100          
242 1         3 @rslt = $code->();
243             } elsif (defined $wantarray) {
244 1         2 $rslt[0] = $code->();
245             } else {
246 4         7 $code->();
247             }
248 5         1559 _check_fork();
249 5         5 for my $category (@categories) {
250             # Note that we may hit exceptions here, and we want to raise them since it means the
251             # database activity did not complete as expected
252 5         6 $_->commit for grep defined, @{$DBH{$category}}; # might have closed database handle(s) in $code
  5         23  
253             }
254 5         728 1;
255 6 100       8 } or do {
256 1         182 my $err = $@;
257 1         5 warn "Error in transaction: $err";
258 1         5 for my $category (@categories) {
259             eval {
260 2         7 $_->rollback;
261 1         159 1;
262 1   100     1 } or warn "after $err also had failure in rollback for dbh in category $category: $@" for grep defined, @{$DBH{$category}};
  1         5  
263             }
264 1         72 die $err;
265             };
266 5 100       19 return $wantarray ? @rslt : $rslt[0];
267             }
268              
269             =head2 _check_fork
270              
271             Test whether we have forked recently, and invalidate all our caches if we have.
272              
273             Returns true if there has been a fork since last check, false otherwise.
274              
275             =cut
276              
277             sub _check_fork {
278 48 100   48   10652 return 0 if $PID == $$;
279 10         14 $PID = $$;
280 10         17 %DBH = ();
281 10         19 %DBH_SOURCE = ();
282 10         11 return 1;
283             }
284              
285             1;
286              
287             =head1 SEE ALSO
288              
289             =over
290              
291             =item L
292              
293             =item L
294              
295             =item L
296              
297             =back
298              
299             These modules are also handling scope-based transaction. The main difference is this one operates across database handles with different categories.
300              
301             =cut