File Coverage

blib/lib/DBIx/TxnPool.pm
Criterion Covered Total %
statement 24 142 16.9
branch 0 46 0.0
condition 0 19 0.0
subroutine 8 37 21.6
pod 8 13 61.5
total 40 257 15.5


line stmt bran cond sub pod time code
1             package DBIx::TxnPool;
2              
3 1     1   21372 use strict;
  1         2  
  1         28  
4 1     1   5 use warnings;
  1         2  
  1         41  
5 1     1   6 use Exporter 5.57 qw( import );
  1         29  
  1         44  
6              
7 1     1   1960 use Try::Tiny;
  1         2955  
  1         104  
8 1     1   1438 use Signal::Mask;
  1         18453  
  1         35  
9 1     1   8 use Carp qw( confess );
  1         2  
  1         98  
10              
11             our $VERSION = 0.12;
12             our $BlockSignals = [ qw( TERM INT ) ];
13             our @EXPORT = qw( txn_item txn_post_item txn_commit txn_sort );
14              
15             # It's better to look for the "try restarting transaction" string
16             # because sometime may be happens other error: Lock wait timeout exceeded
17 1     1   5 use constant DEADLOCK_REGEXP => qr/try restarting transaction/o;
  1         2  
  1         991  
18              
19             sub new {
20 0     0 0   my ( $class, %args ) = @_;
21              
22             confess( "The dbh should be defined" )
23 0 0         unless $args{dbh};
24              
25 0   0       $args{size} ||= 100;
26 0   0       $args{block_signals} ||= $BlockSignals;
27 0   0       $args{max_repeated_deadlocks} ||= 5;
28 0           $args{_amnt_nested_signals} = 0;
29 0           $args{_saved_signal_masks} = {};
30 0           $args{pool} = [];
31 0           $args{amount_deadlocks} = 0;
32              
33 0           $args{repeated_deadlocks} = 0;
34              
35 0   0       bless \%args, ref $class || $class;
36             }
37              
38             sub DESTROY {
39 0     0     local $@;
40              
41             $_[0]->finish
42 0 0         unless ( $_[0]->{repeated_deadlocks} >= $_[0]->{max_repeated_deadlocks} );
43             }
44              
45             sub txn_item (&@) {
46 0     0 1   __PACKAGE__->new( %{ __make_chain( 'item_callback', @_ ) } );
  0            
47             }
48              
49             sub txn_post_item (&@) {
50 0     0 1   __make_chain( 'post_item_callback', @_ );
51             }
52              
53             sub txn_commit (&@) {
54 0     0 1   __make_chain( 'commit_callback', @_ );
55             }
56              
57             sub txn_sort (&@) {
58 0     0 1   my $ret = __make_chain( 'sort_callback', @_ );
59 0           $ret->{sort_callback_package} = caller;
60 0           $ret;
61             }
62              
63             sub __make_chain {
64 0     0     my $cb_name = shift;
65 0           my $cb_func = shift;
66 0           my $ret;
67              
68 0 0         ( $ret = ref $_[0] eq 'HASH' ? $_[0] : { @_ } )->{ $cb_name } = $cb_func;
69 0           $ret;
70             }
71              
72 0     0 1   sub dbh { $_[0]->{dbh} }
73              
74             sub add {
75 0     0 1   my ( $self, $data ) = @_;
76              
77             confess "assert: _amnt_nested_signals is not zero!"
78 0 0         if $self->{_amnt_nested_signals};
79              
80             try {
81 0     0     push @{ $self->{pool} }, $data;
  0            
82              
83 0 0         if ( ! $self->{sort_callback} ) {
84 0           $self->start_txn;
85             $self->_safe_signals( sub {
86 0           local $_ = $data;
87 0           $self->{item_callback}->( $self, $data );
88 0           } );
89             }
90             }
91             catch {
92 0     0     $self->_check_deadlock( $_ );
93 0           };
94              
95             $self->finish
96 0 0         if ( @{ $self->{pool} } >= $self->{size} );
  0            
97              
98             confess "assert: _amnt_nested_signals is not zero!"
99 0 0         if $self->{_amnt_nested_signals};
100             }
101              
102             sub _check_deadlock {
103 0     0     my ( $self, $error ) = @_;
104              
105 0           my $dbi_error = $DBI::err;
106              
107 0           $self->rollback_txn;
108              
109             # For example codes: https://dev.mysql.com/doc/refman/5.5/en/error-messages-server.html
110             # MySQL codes 1213 & 1205 are reasons to redo transaction again
111             # For other SQL engines i don't know codes - patches are walcome! ;-) [https://github.com/Perlover/DBIx-TxnPool]
112 0 0 0       if ( defined $dbi_error && ( $dbi_error == 1213 || $dbi_error == 1205 ) ) {
      0        
113 0           $self->{amount_deadlocks}++;
114 0 0         if ( $self->{repeated_deadlocks} >= $self->{max_repeated_deadlocks} ) {
115 0           $self->{pool} = []; # If DESTROY calls finish() there will not problems
116 0           confess( "limit ($self->{repeated_deadlocks}) of deadlock resolvings" )
117             }
118             else {
119 0           $self->play_pool;
120             }
121             } else {
122             # Fatal error - may be bad SQL statement - finish
123 0           $self->{pool} = []; # If DESTROY calls finish() there will not problems
124 0           confess( "error in item callback ($error)" );
125             }
126             }
127              
128             sub play_pool {
129 0     0 0   my $self = shift;
130              
131 0           $self->start_txn;
132              
133             $self->_safe_signals( sub {
134 0     0     select( undef, undef, undef, 0.5 * ++$self->{repeated_deadlocks} );
135 0           } );
136              
137             try {
138 0     0     foreach my $data ( @{ $self->{pool} } ) {
  0            
139             $self->_safe_signals( sub {
140 0           local $_ = $data;
141 0           $self->{item_callback}->( $self, $data );
142 0           } );
143             }
144             }
145             catch {
146 0     0     $self->_check_deadlock( $_ );
147 0           };
148             }
149              
150             sub finish {
151 0     0 1   my $self = shift;
152              
153             confess "assert: _amnt_nested_signals is not zero!"
154 0 0         if $self->{_amnt_nested_signals};
155              
156 0 0 0       if ( $self->{sort_callback} && @{ $self->{pool} } ) {
  0            
157 1     1   5 no strict 'refs';
  1         2  
  1         704  
158 0           local *a = *{"$self->{sort_callback_package}\::a"};
  0            
159 0           local *b = *{"$self->{sort_callback_package}\::b"};
  0            
160              
161 0           $self->{pool} = [ sort { $self->{sort_callback}->() } ( @{ $self->{pool} } ) ];
  0            
  0            
162              
163 0           $self->play_pool;
164             }
165              
166 0           $self->commit_txn;
167              
168 0 0         if ( exists $self->{post_item_callback} ) {
169 0           foreach my $data ( @{ $self->{pool} } ) {
  0            
170 0           local $_ = $data;
171 0           $self->{post_item_callback}->( $self, $data );
172             }
173             }
174              
175 0           $self->{pool} = [];
176              
177             confess "assert: _amnt_nested_signals is not zero!"
178 0 0         if $self->{_amnt_nested_signals};
179             }
180              
181             sub start_txn {
182 0     0 0   my $self = shift;
183              
184 0 0         if ( ! $self->{in_txn} ) {
185             $self->_safe_signals( sub {
186 0 0   0     $self->{dbh}->begin_work or confess 'DBI error: ' . $self->{dbh}->errstr;
187 0           $self->{in_txn} = 1;
188 0           } );
189             }
190             }
191              
192             sub rollback_txn {
193 0     0 0   my $self = shift;
194              
195 0 0         if ( $self->{in_txn} ) {
196             $self->_safe_signals( sub {
197 0 0   0     $self->{dbh}->rollback or confess 'DBI error: ' . $self->{dbh}->errstr;
198 0           $self->{in_txn} = undef;
199 0           } );
200             }
201             }
202              
203             sub commit_txn {
204 0     0 0   my $self = shift;
205              
206 0 0         if ( $self->{in_txn} ) {
207             try {
208             $self->_safe_signals( sub {
209 0 0         $self->{dbh}->commit or confess 'DBI error: ' . $self->{dbh}->errstr;
210 0           $self->{in_txn} = undef;
211 0     0     } );
212 0           1;
213             }
214             catch {
215 0     0     $self->_check_deadlock( $_ );
216 0           $self->commit_txn;
217 0           0;
218 0 0         } or return;
219              
220 0           $self->{repeated_deadlocks} = 0;
221             $self->{commit_callback}->( $self )
222 0 0         if exists $self->{commit_callback};
223             }
224             }
225              
226 0     0 1   sub amount_deadlocks { $_[0]->{amount_deadlocks} }
227              
228             sub _safe_signals {
229 0     0     my ( $self, $code ) = @_;
230              
231 0 0         if ( ! $self->{_amnt_nested_signals}++ ) {
232 0           for ( @{ $self->{block_signals} } ) {
  0            
233 0           $self->{_saved_signal_masks}{ $_ } = $Signal::Mask{ $_ };
234 0           $Signal::Mask{ $_ } = 1;
235             }
236             }
237             try {
238 0     0     $code->();
239             }
240             catch {
241 0     0     die $_;
242             }
243             finally {
244 0 0   0     if ( ! --$self->{_amnt_nested_signals} ) {
245 0           for ( @{ $self->{block_signals} } ) {
  0            
246 0           $Signal::Mask{ $_ } = delete $self->{_saved_signal_masks}{ $_ };
247             }
248             }
249 0           };
250             }
251              
252             1;
253              
254             __END__