File Coverage

blib/lib/Data/RecordStore.pm
Criterion Covered Total %
statement 334 346 96.5
branch 87 100 87.0
condition 11 12 91.6
subroutine 42 42 100.0
pod 19 21 90.4
total 493 521 94.6


line stmt bran cond sub pod time code
1             package Data::RecordStore;
2              
3             #######################################################################
4             # The RecordStore is an index-value store that is composed of fixed #
5             # sized data silos to store its data, transaction info and #
6             # indexes. The silos are primarily binary data files so finding a #
7             # record in them is as easy as multiplying the record size times #
8             # the index and performing a seek on the silo's file handle. #
9             # #
10             # Its primary subs are stow and fetch. Their description should #
11             # give a good feel for how the record store works in general. #
12             # #
13             # fetch : #
14             # When fetch is called for a id, it uses its index silo to look #
15             # up which fixed record silo the data for that id is stored in #
16             # and what silo index it has. #
17             # The store then accesses that silo and gets the data it needs. #
18             # Silos use pack and unpack to store and retreive data. #
19             # #
20             # stow : #
21             # When stow is called with data and an id, it picks a data #
22             # silo based on the size of the data. The silos are numbered #
23             # starting with 12. The size of the silo is 2^N where N is #
24             # the number of the silo. The smallest silo by default is 4096 #
25             # bytes. #
26             #######################################################################
27              
28              
29             #
30             # The original that adheres the record store interface.
31             # namely
32             # * open_store
33             #
34             # * stow
35             # * fetch
36             # * delete_record
37             # * next_id
38             # * last_updated
39             #
40             # * highest_entry_id
41             #
42             # * lock
43             # * unlock
44             #
45             # * use_transaction
46             # * commit_transaction
47             # * rollback_transaction
48             # * list_transactions
49             # * empty
50             #
51              
52 9     9   928601 use strict;
  9         32  
  9         259  
53 9     9   46 use warnings;
  9         18  
  9         257  
54 9     9   44 no warnings 'numeric';
  9         18  
  9         308  
55 9     9   45 no warnings 'uninitialized';
  9         26  
  9         240  
56              
57 9     9   68 use Fcntl qw( :flock SEEK_SET );
  9         19  
  9         932  
58 9     9   55 use File::Path qw(make_path);
  9         18  
  9         647  
59 9     9   676 use Data::Dumper;
  9         6777  
  9         399  
60 9     9   4298 use YAML;
  9         62276  
  9         457  
61              
62 9     9   4321 use Data::RecordStore::Silo;
  9         19  
  9         262  
63 9     9   3917 use Data::RecordStore::Transaction;
  9         35  
  9         277  
64              
65 9     9   63 use vars qw($VERSION);
  9         18  
  9         765  
66              
67             $VERSION = '6.06';
68             my $SILO_VERSION = '6.00';
69              
70             use constant {
71             # record state
72 9         4393 RS_ACTIVE => 1,
73             RS_DEAD => 2,
74             RS_IN_TRANSACTION => 3,
75              
76             TR_ACTIVE => 1,
77             TR_IN_COMMIT => 2,
78             TR_IN_ROLLBACK => 3,
79             TR_COMPLETE => 4,
80              
81             DIRECTORY => 0,
82             ACTIVE_TRANS_FILE => 1,
83             MAX_FILE_SIZE => 2,
84             MIN_SILO_ID => 3,
85             INDEX_SILO => 4,
86             SILOS => 5,
87             TRANSACTION_INDEX_SILO => 6,
88             TRANSACTION => 7,
89             HEADER_SIZE => 8,
90             LOCK_FH => 9,
91             LOCKS => 10,
92             MAX_SILO_ID => 11,
93 9     9   63 };
  9         17  
94             #
95             # On disc block size : If the reads align with block sizes, the reads will go faster
96             # however, there are some annoying things about finding out what that blocksize is.
97             # (spoiler: it is currently either 4096 or 512 bytes and probably the former)
98             #
99             # ways to determine block size :
100             # sudo blockdev -getbsz /dev/sda1
101             # lsblk -o NAME,MOUNTPOINT,FSTYPE,TYPE,SIZE,STATE,DISC-MAX,DISC-GRAN,PHY-SEC,MIN-IO,RQ-SIZE
102             # and they disagree :(
103             # using 4096 in any case. It won't be slower, though the space required could be 8 times are large.
104             #
105             # note: this is only for the records, not for any indexes.
106             #
107              
108             sub reopen_store {
109 36     36 1 123708 my( $cls, $dir ) = @_;
110 36         230 my $cfgfile = "$dir/config.yaml";
111 36 100       930 if( -e $cfgfile ) {
112 32         201 my $lockfile = "$dir/LOCK";
113 32 50       508 die "no lock file found" unless -e $lockfile;
114 32 50       1986 open my $lock_fh, '+<', $lockfile or die "$@ $!";
115 32         284561 flock( $lock_fh, LOCK_EX );
116              
117 32         253 my $lock_dir = "$dir/user_locks";
118 32         114 my $trans_dir = "$dir/transactions";
119 32 50       445 die "locks directory not found" unless -d $lock_dir;
120 32 50       366 die "transaction directory not found" unless -d $trans_dir;
121            
122 32         536 my $cfg = YAML::LoadFile( $cfgfile );
123              
124 32         255402 my $index_silo = Data::RecordStore::Silo->reopen_silo( "$dir/index_silo" );
125 32         199 my $transaction_index_silo = Data::RecordStore::Silo->reopen_silo( "$dir/transaction_index_silo" );
126              
127 32         104 my $max_file_size = $cfg->{MAX_FILE_SIZE};
128 32         91 my $min_file_size = $cfg->{MIN_FILE_SIZE};
129 32         404 my $max_silo_id = int( log( $max_file_size ) / log( 2 ));
130 32 50       234 $max_silo_id++ if 2 ** $max_silo_id < $max_file_size;
131 32         117 my $min_silo_id = int( log( $min_file_size ) / log( 2 ));
132 32 100       130 $min_silo_id++ if 2 ** $min_silo_id < $min_file_size;
133              
134 32         98 my $silo_dir = "$dir/data_silos";
135 32         85 my $silos = [];
136 32         226 for my $silo_id ($min_silo_id..$max_silo_id) {
137 672         3366 $silos->[$silo_id] = Data::RecordStore::Silo->reopen_silo( "$silo_dir/$silo_id" );
138             }
139              
140 32         131 my $header = pack( 'ILL', 1,2,3 );
141 9     9   71 my $header_size = do { use bytes; length( $header ) };
  9         18  
  9         36  
  32         89  
  32         98  
142              
143 32         340 my $store = bless [
144             $dir,
145             "$dir/ACTIVE_TRANS",
146             $max_file_size,
147             $min_silo_id,
148             $index_silo,
149             $silos,
150             $transaction_index_silo,
151             undef,
152             $header_size, # the ILL from ILLa*
153             $lock_fh,
154             [],
155             $max_silo_id,
156             ], $cls;
157 32         362 $store->_fix_transactions;
158 32         33632 flock( $lock_fh, LOCK_UN );
159 32         2424 return $store;
160             }
161 4         81 die "could not find record store in $dir";
162             } #reopen_store
163              
164             sub open_store {
165 229     229 1 9447968 my( $cls, @options ) = @_;
166 229 100       1123 if( @options == 1 ) {
167 55         178 unshift @options, 'BASE_PATH';
168             }
169 229         1002 my( %options ) = @options;
170 229         600 my $dir = $options{BASE_PATH};
171 229 100       2967 unless( -d $dir ) {
172 14         98 _make_path( $dir, 'base' );
173             }
174 229         747 my $max_file_size = $options{MAX_FILE_SIZE};
175 229 100       712 $max_file_size = $Data::RecordStore::Silo::DEFAULT_MAX_FILE_SIZE unless $max_file_size;
176            
177 229         420 my $min_file_size = $options{MIN_FILE_SIZE};
178 229 100       583 $min_file_size = $Data::RecordStore::Silo::DEFAULT_MIN_FILE_SIZE unless $min_file_size;
179 229 100       1058 if( $min_file_size > $max_file_size ) {
180 16         192 die "MIN_FILE_SIZE cannot be more than MAX_FILE_SIZE";
181             }
182 213         1000 my $max_silo_id = int( log( $max_file_size ) / log( 2 ));
183 213 100       771 $max_silo_id++ if 2 ** $max_silo_id < $max_file_size;
184 213         591 my $min_silo_id = int( log( $min_file_size ) / log( 2 ));
185 213 100       634 $min_silo_id++ if 2 ** $min_silo_id < $min_file_size;
186 213         620 my $lockfile = "$dir/LOCK";
187 213         401 my $lock_fh;
188              
189 213         417 my $silo_dir = "$dir/data_silos";
190 213         378 my $lock_dir = "$dir/user_locks";
191 213         396 my $trans_dir = "$dir/transactions";
192            
193 213 100       3587 if( -e $lockfile ) {
194 55 50       1792 open $lock_fh, '+<', $lockfile or die "$@ $!";
195 55         737 flock( $lock_fh, LOCK_EX );
196             }
197             else {
198 158 50       8541 open $lock_fh, '>', $lockfile or die "$@ $!";
199 158         1622 flock( $lock_fh, LOCK_EX );
200 158         1455 $lock_fh->autoflush(1);
201 158         13746 print $lock_fh "LOCK\n";
202            
203 158         756 my $vers_file = "$dir/VERSION";
204 158 50       2789 if( -e $vers_file ) {
205 0         0 die "Aborting open : lock file did not exist but version file did. This may mean a partial store was in here at some point in $dir.";
206             }
207            
208 158         856 _make_path( $silo_dir, 'silo' );
209 158         499 _make_path( $lock_dir, 'lock' );
210 158         456 _make_path( $trans_dir, 'transaction' );
211            
212 158         8831 open my $out, '>', "$dir/config.yaml";
213 158         1987 print $out <<"END";
214             VERSION: $VERSION
215             MAX_FILE_SIZE: $max_file_size
216             MIN_FILE_SIZE: $min_file_size
217             END
218 158         4089 close $out;
219            
220 158         7054 open my $vers_fh, '>', $vers_file;
221 158         1040 print $vers_fh "$VERSION\n";
222 158         3988 close $vers_fh;
223             }
224              
225 213         2818 my $index_silo = Data::RecordStore::Silo->open_silo( "$dir/index_silo",
226             "ILL", #silo id, id in silo, last updated time
227             0,
228             $max_file_size );
229 213         1320 my $transaction_index_silo = Data::RecordStore::Silo->open_silo( "$dir/transaction_index_silo",
230             "IL", #state, time
231             0,
232             $max_file_size );
233              
234 213         918 my $silos = [];
235              
236 213         833 for my $silo_id ($min_silo_id..$max_silo_id) {
237 4004         21121 $silos->[$silo_id] = Data::RecordStore::Silo->open_silo( "$silo_dir/$silo_id",
238             'ILLa*', # status, id, data-length, data
239             2 ** $silo_id,
240             $max_file_size );
241             }
242 213         552 my $header = pack( 'ILL', 1,2,3 );
243 9     9   5834 my $header_size = do { use bytes; length( $header ) };
  9         26  
  9         36  
  213         359  
  213         483  
244              
245 213         1294 my $store = bless [
246             $dir,
247             "$dir/ACTIVE_TRANS",
248             $max_file_size,
249             $min_silo_id,
250             $index_silo,
251             $silos,
252             $transaction_index_silo,
253             undef,
254             $header_size, # the ILL from ILLa*
255             $lock_fh,
256             [],
257             $max_silo_id,
258             ], $cls;
259 213         949 $store->_fix_transactions;
260 213         1696 flock( $lock_fh, LOCK_UN );
261 213         7538 return $store;
262             } #open_store
263              
264             sub fetch {
265 803     803 1 659844 my( $self, $id, $no_trans ) = @_;
266 803         1533 my $trans = $self->[TRANSACTION];
267 803 100 100     2492 if( $trans && ! $no_trans ) {
268 169 100       469 if( $trans->{state} != TR_ACTIVE ) {
269 16         144 die "Transaction is in a bad state. Cannot fetch";
270             }
271 153         550 return $trans->fetch( $id );
272             }
273              
274 634         1751 $self->_read_lock;
275              
276 634 100       1911 if( $id > $self->entry_count ) {
277 1         7 return undef;
278             }
279              
280 633         1161 my( $silo_id, $id_in_silo ) = @{$self->[INDEX_SILO]->get_record($id)};
  633         1951  
281 633 100       2891 if( $silo_id ) {
282 536         1794 my $ret = $self->[SILOS]->[$silo_id]->get_record( $id_in_silo );
283 536         19759 $self->_unlock;
284 536         63595 return substr( $ret->[3], 0, $ret->[2] );
285             }
286              
287 97         348 $self->_unlock;
288 97         698 return undef;
289             } #fetch
290              
291             sub stow {
292 824     824 1 233682 my $self = $_[0];
293 824         1522 my $id = $_[2];
294              
295 824         1425 my $trans = $self->[TRANSACTION];
296 824 100       1838 if( $trans ) {
297 263         1129 return $trans->stow( $_[1], $id );
298             }
299              
300 561         888 my $index = $self->[INDEX_SILO];
301 561         1515 $self->_write_lock;
302              
303 561         2174 $index->ensure_entry_count( $id );
304 561 100 100     1702 if( defined $id && $id < 1 ) {
305 2         25 die "The id must be a supplied as a positive integer";
306             }
307 559         1006 my( $old_silo_id, $old_id_in_silo );
308 559 100       1358 if( $id > 0 ) {
309 49         98 ( $old_silo_id, $old_id_in_silo ) = @{$index->get_record($id)};
  49         171  
310             }
311             else {
312 510         1347 $id = $index->next_id;
313             }
314              
315 9     9   4241 my $data_write_size = do { use bytes; length $_[1] };
  9         26  
  9         35  
  558         925  
  558         1135  
316 558         1675 my $new_silo_id = $self->silo_id_for_size( $data_write_size );
317 558         1063 my $new_silo = $self->[SILOS][$new_silo_id];
318              
319 558         2367 my $new_id_in_silo = $new_silo->push( [RS_ACTIVE, $id, $data_write_size, $_[1]] );
320              
321 558         3020 $index->put_record( $id, [$new_silo_id,$new_id_in_silo,time] );
322              
323 558 100       2083 if( $old_silo_id ) {
324 20         95 $self->_vacate( $old_silo_id, $old_id_in_silo );
325             }
326              
327 558         1828 $self->_unlock;
328 558         3148 return $id;
329             } #stow
330              
331             sub next_id {
332 193     193 1 110411 return shift->[INDEX_SILO]->next_id;
333             } #next_id
334              
335             sub delete_record {
336 138     138 1 880 my( $self, $del_id ) = @_;
337 138         751 $self->_write_lock;
338 138         269 my $trans = $self->[TRANSACTION];
339 138 100       639 if( $trans ) {
340 72         280 $self->_unlock;
341 72         400 return $trans->delete_record( $del_id );
342             }
343              
344 66 100       231 if( $del_id > $self->[INDEX_SILO]->entry_count ) {
345 1         11 warn "Tried to delete past end of records";
346 1         12 $self->_unlock;
347 1         4 return undef;
348             }
349 65         123 my( $old_silo_id, $old_id_in_silo ) = @{$self->[INDEX_SILO]->get_record($del_id)};
  65         220  
350 65         472 $self->[INDEX_SILO]->put_record( $del_id, [0,0,time] );
351              
352 65 100       277 if( $old_silo_id ) {
353 57         420 $self->_vacate( $old_silo_id, $old_id_in_silo );
354             }
355 65         220 $self->_unlock;
356             } #delete_record
357              
358             # locks the given lock names
359             # they are locked in order to prevent deadlocks.
360             sub lock {
361 42     42 1 4346 my( $self, @locknames ) = @_;
362              
363 42         91 my( %previously_locked ) = ( map { $_ => 1 } @{$self->[LOCKS]} );
  27         91  
  42         144  
364              
365 42 100 66     84 if( @{$self->[LOCKS]} && grep { ! $previously_locked{$_} } @locknames ) {
  42         237  
  9         71  
366 9         101 die "Data::RecordStore->lock cannot be called twice in a row without unlocking between";
367             }
368 33         86 my $fhs = [];
369              
370 33         60 my $failed;
371              
372 33         170 for my $name (sort @locknames) {
373 70 100       324 next if $previously_locked{$name}++;
374 61         190 my $lockfile = "$self->[DIRECTORY]/user_locks/$name";
375 61         89 my $fh;
376 61 100       1096 if( -e $lockfile ) {
377 11 50       409 unless( open ( $fh, '+<', $lockfile ) ) {
378 0         0 $failed = 1;
379 0         0 last;
380             }
381 11         179844 flock( $fh, LOCK_EX ); #WRITE LOCK
382             }
383             else {
384 50 50       2580 unless( open( $fh, '>', $lockfile ) ) {
385 0         0 $failed = 1;
386 0         0 last;
387             }
388 50         490 flock( $fh, LOCK_EX ); #WRITE LOCK
389 50         396 $fh->autoflush(1);
390 50         2417 print $fh '';
391             }
392 61         228 push @$fhs, $fh;
393             }
394              
395 33 50       119 if( $failed ) {
396             # it must be able to lock all the locks or it fails
397             # if it failed, unlock any locks it managed to get
398 0         0 for my $fh (@$fhs) {
399 0         0 flock( $fh, LOCK_UN );
400             }
401 0         0 die "Data::RecordStore->lock : lock failed";
402             } else {
403 33         176 $self->[LOCKS] = $fhs;
404             }
405              
406             } #lock
407              
408             # unlocks all locks
409             sub unlock {
410 35     35 1 220486 my $self = shift;
411 35         81 my $fhs = $self->[LOCKS];
412              
413 35         196 for my $fh (@$fhs) {
414 60         521 flock( $fh, LOCK_UN );
415             }
416 35         605 @$fhs = ();
417             } #unlock
418              
419             sub use_transaction {
420 94     94 1 29555 my $self = shift;
421 94 100       379 if( $self->[TRANSACTION] ) {
422 24         984 warn __PACKAGE__."->use_transaction : already in transaction";
423 24         128 return $self->[TRANSACTION];
424             }
425 70         271 $self->_write_lock;
426 70         511 my $tid = $self->[TRANSACTION_INDEX_SILO]->push( [TR_ACTIVE, time] );
427 70         337 $self->_unlock;
428 70         394 my $tdir = "$self->[DIRECTORY]/transactions/$tid";
429 70         9272 make_path( $tdir, { error => \my $err } );
430 70 50       497 if( @$err ) { die join( ", ", map { values %$_ } @$err ) }
  0         0  
  0         0  
431              
432 70         907 $self->[TRANSACTION] = Data::RecordStore::Transaction->create( $self, $tdir, $tid );
433 70         229 return $self->[TRANSACTION];
434             } #use_transaction
435              
436             sub commit_transaction {
437 54     54 1 20522 my $self = shift;
438 54         173 $self->_write_lock;
439 54         102 my $trans = $self->[TRANSACTION];
440 54 100       239 unless( $trans ) {
441 8         80 die __PACKAGE__."->commit_transaction : no transaction to commit";
442             }
443 46         104 my $trans_file = $self->[ACTIVE_TRANS_FILE];
444 46         2953 open my $trans_fh, '>', $trans_file;
445 46         614 print $trans_fh " ";
446 46         1268 close $trans_fh;
447              
448 46         411 $trans->commit;
449 30         97 delete $self->[TRANSACTION];
450 30         1559 unlink $trans_file;
451 30         170 $self->_unlock;
452             } #commit_transaction
453              
454             sub rollback_transaction {
455 32     32 1 4744 my $self = shift;
456 32         112 $self->_write_lock;
457 32         112 my $trans = $self->[TRANSACTION];
458 32 100       120 unless( $trans ) {
459 8         152 die __PACKAGE__."->rollback_transaction : no transaction to roll back";
460             }
461 24         56 my $trans_file = $self->[ACTIVE_TRANS_FILE];
462 24         1528 open my $trans_fh, '>', $trans_file;
463 24         280 print $trans_fh " ";
464 24         1144 close $trans_fh;
465 24         200 $trans->rollback;
466 16         56 delete $self->[TRANSACTION];
467 16         704 unlink $trans_file;
468 16         96 $self->_unlock;
469             } #rollback_transaction
470              
471             sub entry_count {
472 828     828 1 3576 return shift->[INDEX_SILO]->entry_count;
473             }
474              
475             sub index_silo {
476 519     519 1 2675 return shift->[INDEX_SILO];
477             }
478              
479             sub max_file_size {
480 86     86 1 684 return shift->[MAX_FILE_SIZE];
481             }
482              
483             sub silos {
484 574     574 1 24081 return [@{shift->[SILOS]}];
  574         3023  
485             }
486              
487             sub transaction_silo {
488 1938     1938 1 18723 return shift->[TRANSACTION_INDEX_SILO];
489             }
490              
491             sub silos_entry_count {
492 112     112 1 336 my $self = shift;
493 112         272 my $silos = $self->silos;
494 112         200 my $count = 0;
495 112         240 for my $silo (grep {defined} @$silos) {
  3584         5384  
496 2240         5880 $count += $silo->entry_count;
497             }
498 112         848 return $count;
499             }
500              
501             sub record_count {
502 8     8 0 128 goto &active_entry_count;
503             }
504              
505             sub active_entry_count {
506 64     64 1 160 my $self = shift;
507 64         144 my $index = $self->index_silo;
508 64         96 my $count = 0;
509 64         152 for(1..$self->entry_count) {
510 312         536 my( $silo_id ) = @{$index->get_record( $_ )};
  312         792  
511 312 100       1176 ++$count if $silo_id;
512             }
513 64         392 return $count;
514             }
515              
516             sub detect_version {
517 15     15 1 4865 my( $cls, $dir ) = @_;
518 15         58 my $ver_file = "$dir/VERSION";
519 15         31 my $source_version;
520 15 100       254 if ( -e $ver_file ) {
    100          
521 13         422 open( my $FH, "<", $ver_file );
522 13         212 $source_version = <$FH>;
523 13         54 chomp $source_version;
524 13         133 close $FH;
525             } elsif( -e "$dir/STORE_INDEX" ) {
526 1         6 return 1;
527             }
528 14         95 return $source_version;
529             } #detect_version
530              
531              
532              
533             sub _vacate {
534 205     205   509 my( $self, $silo_id, $id_to_empty ) = @_;
535 205         421 my $silo = $self->[SILOS][$silo_id];
536 205         570 my $rc = $silo->entry_count;
537 205 100       591 if( $id_to_empty == $rc ) {
538 118         439 $silo->pop;
539             } else {
540 87         334 while( $rc > $id_to_empty ) {
541 63         97 my( $state, $id ) = (@{$silo->get_record( $rc, 'IL' )});
  63         216  
542 63 100       310 if( $state == RS_ACTIVE ) {
    100          
543 47         259 $silo->copy_record($rc,$id_to_empty);
544 47         262 $self->[INDEX_SILO]->put_record( $id, [$silo_id,$id_to_empty], "IL" );
545 47         245 $silo->pop;
546 47         168 return;
547             }
548             elsif( $state == RS_DEAD ) {
549 8         56 $silo->pop;
550             }
551             else {
552 8         32 return;
553             }
554 8         48 $rc--;
555             }
556             }
557             } #_vacate
558              
559             sub silo_id_for_size {
560 821     821 0 1928 my( $self, $data_write_size ) = @_;
561              
562 821         1556 my $write_size = $self->[HEADER_SIZE] + $data_write_size;
563              
564 821         3045 my $silo_id = int( log( $write_size ) / log( 2 ) );
565 821 100       2336 $silo_id++ if 2 ** $silo_id < $write_size;
566 821 100       1930 $silo_id = $self->[MIN_SILO_ID] if $silo_id < $self->[MIN_SILO_ID];
567 821         1652 return $silo_id;
568             } #silo_id_for_size
569              
570             # ---------------------- private stuffs -------------------------
571              
572             sub _make_path {
573 488     488   1395 my( $dir, $msg ) = @_;
574 488         64018 make_path( $dir, { error => \my $err } );
575 488 50       3157 if( @$err ) {
576 0         0 die "unable to make $msg directory.". join( ", ", map { $_->{$dir} } @$err );
  0         0  
577             }
578             }
579              
580             sub _read_lock {
581 634     634   970 my $self = shift;
582 634         52621 flock( $self->[LOCK_FH], LOCK_SH );
583 634         2064 $self->_fix_transactions;
584             }
585              
586             sub _unlock {
587 1469     1469   3156 my( $self ) = @_;
588 1469         12903 flock( $self->[LOCK_FH], LOCK_UN );
589             }
590              
591             sub _write_lock {
592 855     855   1442 my $self = shift;
593 855         325335 flock( $self->[LOCK_FH], LOCK_EX );
594 855         2882 $self->_fix_transactions;
595             }
596              
597             sub _fix_transactions {
598 1734     1734   2985 my $self = shift;
599             # check the transactions
600             # if the transaction is in an incomplete state, fix it. Since the store is write locked
601             # during transactions, the lock has expired if this point has been reached.
602             # that means the process that made the lock has fallen.
603             #
604             # of course, do a little experiment to test this with two processes and flock when
605             # one exits before unflocking.
606             #
607 1734         3613 my $transaction_index_silo = $self->transaction_silo;
608 1734         5867 my $last_trans = $transaction_index_silo->entry_count;
609 1734         5443 while( $last_trans ) {
610 333         652 my( $state ) = @{$transaction_index_silo->get_record( $last_trans )};
  333         1336  
611 333         1984 my $tdir = "$self->[DIRECTORY]/transactions/$last_trans";
612 333 100 100     1809 if( $state == TR_IN_ROLLBACK ||
    100          
613             $state == TR_IN_COMMIT ) {
614             # do a full rollback
615             # load the transaction
616 16         144 my $trans = Data::RecordStore::Transaction->create( $self, $tdir, $last_trans );
617 16         72 $trans->rollback;
618 16         344 $transaction_index_silo->pop;
619             }
620             elsif( $state == TR_COMPLETE ) {
621 43         215 $transaction_index_silo->pop;
622             }
623             else {
624 274         802 return;
625             }
626 59         320 $last_trans--;
627             }
628              
629             } #_fix_transactions
630              
631              
632             "I became Iggy because I had a sadistic boss at a record store. I'd been in a band called the Iguanas. And when this boss wanted to embarrass and demean me, he'd say, 'Iggy, get me a coffee, light.' - Iggy Pop";
633              
634             __END__