File Coverage

blib/lib/DBIx/TxnPool.pm
Criterion Covered Total %
statement 24 136 17.6
branch 0 42 0.0
condition 0 13 0.0
subroutine 8 38 21.0
pod 8 13 61.5
total 40 242 16.5


line stmt bran cond sub pod time code
1             package DBIx::TxnPool;
2              
3 1     1   46438 use strict;
  1         3  
  1         42  
4 1     1   5 use warnings;
  1         2  
  1         36  
5 1     1   6 use Exporter 5.57 qw( import );
  1         50  
  1         42  
6              
7 1     1   1042 use Try::Tiny;
  1         1715  
  1         72  
8 1     1   979 use Signal::Mask;
  1         33583  
  1         47  
9 1     1   8 use Carp qw( confess croak );
  1         2  
  1         269  
10              
11             our $VERSION = 0.10;
12             our $BlockSignals = [ qw( TERM INT ) ];
13             our @EXPORT = qw( txn_item txn_post_item txn_commit txn_sort );
14              
15             # It's better to look for the "try restarting transaction" string
16             # because sometime may be happens other error: Lock wait timeout exceeded
17 1     1   6 use constant DEADLOCK_REGEXP => qr/try restarting transaction/o;
  1         2  
  1         1301  
18              
19             sub new {
20 0     0 0   my ( $class, %args ) = @_;
21              
22 0 0         croak( __PACKAGE__ . ": the dbh should be defined" )
23             unless $args{dbh};
24              
25 0   0       $args{size} ||= 100;
26 0   0       $args{block_signals} ||= $BlockSignals;
27 0   0       $args{max_repeated_deadlocks} ||= 5;
28 0           $args{_amnt_nested_signals} = 0;
29 0           $args{_saved_signal_masks} = {};
30 0           $args{pool} = [];
31 0           $args{amount_deadlocks} = 0;
32              
33 0   0       bless \%args, ref $class || $class;
34             }
35              
36             sub DESTROY {
37 0     0     $_[0]->finish;
38             }
39              
40             sub txn_item (&@) {
41 0     0 1   __PACKAGE__->new( %{ __make_chain( 'item_callback', @_ ) } );
  0            
42             }
43              
44             sub txn_post_item (&@) {
45 0     0 1   __make_chain( 'post_item_callback', @_ );
46             }
47              
48             sub txn_commit (&@) {
49 0     0 1   __make_chain( 'commit_callback', @_ );
50             }
51              
52             sub txn_sort (&@) {
53 0     0 1   my $ret = __make_chain( 'sort_callback', @_ );
54 0           $ret->{sort_callback_package} = caller;
55 0           $ret;
56             }
57              
58             sub __make_chain {
59 0     0     my $cb_name = shift;
60 0           my $cb_func = shift;
61 0           my $ret;
62              
63 0 0         ( $ret = ref $_[0] eq 'HASH' ? $_[0] : { @_ } )->{ $cb_name } = $cb_func;
64 0           $ret;
65             }
66              
67 0     0 1   sub dbh { $_[0]->{dbh} }
68              
69             sub add {
70 0     0 1   my ( $self, $data ) = @_;
71              
72 0           $self->{repeated_deadlocks} = 0;
73              
74 0 0         die "assert: _amnt_nested_signals is not zero!"
75             if $self->{_amnt_nested_signals};
76              
77             try {
78 0     0     push @{ $self->{pool} }, $data;
  0            
79              
80 0 0         if ( ! $self->{sort_callback} ) {
81 0           $self->start_txn;
82             $self->_safe_signals( sub {
83 0           local $_ = $data;
84 0           $self->{item_callback}->( $self, $data );
85 0           } );
86             }
87             }
88             catch {
89 0     0     $self->_check_deadlock( $_ );
90 0           };
91              
92 0           $self->finish
93 0 0         if ( @{ $self->{pool} } >= $self->{size} );
94              
95 0 0         die "assert: _amnt_nested_signals is not zero!"
96             if $self->{_amnt_nested_signals};
97             }
98              
99             sub _check_deadlock {
100 0     0     my ( $self, $error ) = @_;
101              
102 0           $self->rollback_txn;
103              
104 0 0         if ( $error =~ DEADLOCK_REGEXP ) {
105 0           $self->{amount_deadlocks}++;
106 0 0         if ( $self->{repeated_deadlocks} >= $self->{max_repeated_deadlocks} ) {
107 0           confess( "limit ($self->{repeated_deadlocks}) of deadlock resolvings" )
108             }
109             else {
110 0           $self->play_pool
111             }
112             } else {
113             # Fatal error - may be bad SQL statement - finish
114 0           $self->{pool} = []; # If DESTROY calls finish() there will not problems
115 0           confess( "error in item callback ($error)" );
116             }
117             }
118              
119             sub play_pool {
120 0     0 0   my $self = shift;
121              
122 0           $self->start_txn;
123              
124             $self->_safe_signals( sub {
125 0     0     select( undef, undef, undef, 0.5 * ++$self->{repeated_deadlocks} );
126 0           } );
127              
128             try {
129 0     0     foreach my $data ( @{ $self->{pool} } ) {
  0            
130             $self->_safe_signals( sub {
131 0           local $_ = $data;
132 0           $self->{item_callback}->( $self, $data );
133 0           } );
134             }
135             }
136             catch {
137 0     0     $self->_check_deadlock( $_ );
138 0           };
139             }
140              
141             sub finish {
142 0     0 1   my $self = shift;
143              
144 0 0         die "assert: _amnt_nested_signals is not zero!"
145             if $self->{_amnt_nested_signals};
146              
147 0 0 0       if ( $self->{sort_callback} && @{ $self->{pool} } ) {
  0            
148 1     1   108 no strict 'refs';
  1         2  
  1         874  
149 0           local *a = *{"$self->{sort_callback_package}\::a"};
  0            
150 0           local *b = *{"$self->{sort_callback_package}\::b"};
  0            
151              
152 0           $self->{pool} = [ sort { $self->{sort_callback}->() } ( @{ $self->{pool} } ) ];
  0            
  0            
153              
154 0           $self->play_pool;
155             }
156              
157             try {
158 0     0     $self->{repeated_deadlocks} = 0;
159 0           $self->commit_txn;
160             }
161             catch {
162 0     0     $self->_check_deadlock( $_ );
163 0           };
164              
165 0 0         if ( exists $self->{post_item_callback} ) {
166 0           foreach my $data ( @{ $self->{pool} } ) {
  0            
167 0           local $_ = $data;
168 0           $self->{post_item_callback}->( $self, $data );
169             }
170             }
171              
172 0           $self->{pool} = [];
173              
174 0 0         die "assert: _amnt_nested_signals is not zero!"
175             if $self->{_amnt_nested_signals};
176             }
177              
178             sub start_txn {
179 0     0 0   my $self = shift;
180              
181 0 0         if ( ! $self->{in_txn} ) {
182             $self->_safe_signals( sub {
183 0 0   0     $self->{dbh}->begin_work or die $self->{dbh}->errstr;
184 0           $self->{in_txn} = 1;
185 0           } );
186             }
187             }
188              
189             sub rollback_txn {
190 0     0 0   my $self = shift;
191              
192 0 0         if ( $self->{in_txn} ) {
193             $self->_safe_signals( sub {
194 0 0   0     $self->{dbh}->rollback or die $self->{dbh}->errstr;
195 0           $self->{in_txn} = undef;
196 0           } );
197             }
198             }
199              
200             sub commit_txn {
201 0     0 0   my $self = shift;
202              
203 0 0         if ( $self->{in_txn} ) {
204             $self->_safe_signals( sub {
205 0 0   0     $self->{dbh}->commit or die $self->{dbh}->errstr;
206 0           $self->{in_txn} = undef;
207 0           } );
208 0 0         $self->{commit_callback}->( $self )
209             if exists $self->{commit_callback};
210             }
211             }
212              
213 0     0 1   sub amount_deadlocks { $_[0]->{amount_deadlocks} }
214              
215             sub _safe_signals {
216 0     0     my ( $self, $code ) = @_;
217              
218 0 0         if ( ! $self->{_amnt_nested_signals}++ ) {
219 0           for ( @{ $self->{block_signals} } ) {
  0            
220 0           $self->{_saved_signal_masks}{ $_ } = $Signal::Mask{ $_ };
221 0           $Signal::Mask{ $_ } = 1;
222             }
223             }
224             try {
225 0     0     $code->();
226             }
227             catch {
228 0     0     die $_;
229             }
230             finally {
231 0 0   0     if ( ! --$self->{_amnt_nested_signals} ) {
232 0           for ( @{ $self->{block_signals} } ) {
  0            
233 0           $Signal::Mask{ $_ } = delete $self->{_saved_signal_masks}{ $_ };
234             }
235             }
236 0           };
237             }
238              
239             1;
240              
241             __END__