File Coverage

blib/lib/Data/RecordStore.pm
Criterion Covered Total %
statement 334 346 96.5
branch 87 100 87.0
condition 11 12 91.6
subroutine 42 42 100.0
pod 19 21 90.4
total 493 521 94.6


line stmt bran cond sub pod time code
1             package Data::RecordStore;
2              
3             #######################################################################
4             # The RecordStore is an index-value store that is composed of fixed #
5             # sized data silos to store its data, transaction info and #
6             # indexes. The silos are primarily binary data files so finding a #
7             # record in them is as easy as multiplying the record size times #
8             # the index and performing a seek on the silo's file handle. #
9             # #
10             # Its primary subs are stow and fetch. Their description should #
11             # give a good feel for how the record store works in general. #
12             # #
13             # fetch : #
14             # When fetch is called for a id, it uses its index silo to look #
15             # up which fixed record silo the data for that id is stored in #
16             # and what silo index it has. #
17             # The store then accesses that silo and gets the data it needs. #
18             # Silos use pack and unpack to store and retreive data. #
19             # #
20             # stow : #
21             # When stow is called with data and an id, it picks a data #
22             # silo based on the size of the data. The silos are numbered #
23             # starting with 12. The size of the silo is 2^N where N is #
24             # the number of the silo. The smallest silo by default is 4096 #
25             # bytes. #
26             #######################################################################
27              
28              
29             #
30             # The original that adheres the record store interface.
31             # namely
32             # * open_store
33             #
34             # * stow
35             # * fetch
36             # * delete_record
37             # * next_id
38             # * last_updated
39             #
40             # * highest_entry_id
41             #
42             # * lock
43             # * unlock
44             #
45             # * use_transaction
46             # * commit_transaction
47             # * rollback_transaction
48             # * list_transactions
49             # * empty
50             #
51              
52 9     9   953015 use strict;
  9         27  
  9         271  
53 9     9   46 use warnings;
  9         18  
  9         266  
54 9     9   53 no warnings 'numeric';
  9         18  
  9         291  
55 9     9   71 no warnings 'uninitialized';
  9         18  
  9         240  
56              
57 9     9   57 use Fcntl qw( :flock SEEK_SET );
  9         20  
  9         960  
58 9     9   55 use File::Path qw(make_path);
  9         18  
  9         462  
59 9     9   753 use Data::Dumper;
  9         6860  
  9         413  
60 9     9   4601 use YAML;
  9         64305  
  9         461  
61              
62 9     9   4330 use Data::RecordStore::Silo;
  9         27  
  9         268  
63 9     9   4029 use Data::RecordStore::Transaction;
  9         27  
  9         349  
64              
65 9     9   62 use vars qw($VERSION);
  9         18  
  9         684  
66              
67             $VERSION = '6.04';
68             my $SILO_VERSION = '6.00';
69              
70             use constant {
71             # record state
72 9         4510 RS_ACTIVE => 1,
73             RS_DEAD => 2,
74             RS_IN_TRANSACTION => 3,
75              
76             TR_ACTIVE => 1,
77             TR_IN_COMMIT => 2,
78             TR_IN_ROLLBACK => 3,
79             TR_COMPLETE => 4,
80              
81             DIRECTORY => 0,
82             ACTIVE_TRANS_FILE => 1,
83             MAX_FILE_SIZE => 2,
84             MIN_SILO_ID => 3,
85             INDEX_SILO => 4,
86             SILOS => 5,
87             TRANSACTION_INDEX_SILO => 6,
88             TRANSACTION => 7,
89             HEADER_SIZE => 8,
90             LOCK_FH => 9,
91             LOCKS => 10,
92             MAX_SILO_ID => 11,
93 9     9   55 };
  9         17  
94             #
95             # On disc block size : If the reads align with block sizes, the reads will go faster
96             # however, there are some annoying things about finding out what that blocksize is.
97             # (spoiler: it is currently either 4096 or 512 bytes and probably the former)
98             #
99             # ways to determine block size :
100             # sudo blockdev -getbsz /dev/sda1
101             # lsblk -o NAME,MOUNTPOINT,FSTYPE,TYPE,SIZE,STATE,DISC-MAX,DISC-GRAN,PHY-SEC,MIN-IO,RQ-SIZE
102             # and they disagree :(
103             # using 4096 in any case. It won't be slower, though the space required could be 8 times are large.
104             #
105             # note: this is only for the records, not for any indexes.
106             #
107              
108             sub reopen_store {
109 36     36 1 97540 my( $cls, $dir ) = @_;
110 36         268 my $cfgfile = "$dir/config.yaml";
111 36 100       1022 if( -e $cfgfile ) {
112 32         254 my $lockfile = "$dir/LOCK";
113 32 50       455 die "no lock file found" unless -e $lockfile;
114 32 50       2068 open my $lock_fh, '+<', $lockfile or die "$@ $!";
115 32         317574 flock( $lock_fh, LOCK_EX );
116              
117 32         295 my $lock_dir = "$dir/user_locks";
118 32         147 my $trans_dir = "$dir/transactions";
119 32 50       494 die "locks directory not found" unless -d $lock_dir;
120 32 50       492 die "transaction directory not found" unless -d $trans_dir;
121            
122 32         621 my $cfg = YAML::LoadFile( $cfgfile );
123              
124 32         267292 my $index_silo = Data::RecordStore::Silo->reopen_silo( "$dir/index_silo" );
125 32         232 my $transaction_index_silo = Data::RecordStore::Silo->reopen_silo( "$dir/transaction_index_silo" );
126              
127 32         147 my $max_file_size = $cfg->{MAX_FILE_SIZE};
128 32         84 my $min_file_size = $cfg->{MIN_FILE_SIZE};
129 32         585 my $max_silo_id = int( log( $max_file_size ) / log( 2 ));
130 32 50       292 $max_silo_id++ if 2 ** $max_silo_id < $max_file_size;
131 32         144 my $min_silo_id = int( log( $min_file_size ) / log( 2 ));
132 32 100       135 $min_silo_id++ if 2 ** $min_silo_id < $min_file_size;
133              
134 32         159 my $silo_dir = "$dir/data_silos";
135 32         90 my $silos = [];
136 32         218 for my $silo_id ($min_silo_id..$max_silo_id) {
137 672         3656 $silos->[$silo_id] = Data::RecordStore::Silo->reopen_silo( "$silo_dir/$silo_id" );
138             }
139              
140 32         155 my $header = pack( 'ILL', 1,2,3 );
141 9     9   72 my $header_size = do { use bytes; length( $header ) };
  9         17  
  9         37  
  32         83  
  32         119  
142              
143 32         325 my $store = bless [
144             $dir,
145             "$dir/ACTIVE_TRANS",
146             $max_file_size,
147             $min_silo_id,
148             $index_silo,
149             $silos,
150             $transaction_index_silo,
151             undef,
152             $header_size, # the ILL from ILLa*
153             $lock_fh,
154             [],
155             $max_silo_id,
156             ], $cls;
157 32         686 $store->_fix_transactions;
158 32         611 flock( $lock_fh, LOCK_UN );
159 32         2281 return $store;
160             }
161 4         90 die "could not find record store in $dir";
162             } #reopen_store
163              
164             sub open_store {
165 229     229 1 9231829 my( $cls, @options ) = @_;
166 229 100       1186 if( @options == 1 ) {
167 55         199 unshift @options, 'BASE_PATH';
168             }
169 229         928 my( %options ) = @options;
170 229         582 my $dir = $options{BASE_PATH};
171 229 100       3154 unless( -d $dir ) {
172 14         95 _make_path( $dir, 'base' );
173             }
174 229         760 my $max_file_size = $options{MAX_FILE_SIZE};
175 229 100       893 $max_file_size = $Data::RecordStore::Silo::DEFAULT_MAX_FILE_SIZE unless $max_file_size;
176            
177 229         460 my $min_file_size = $options{MIN_FILE_SIZE};
178 229 100       655 $min_file_size = $Data::RecordStore::Silo::DEFAULT_MIN_FILE_SIZE unless $min_file_size;
179 229 100       646 if( $min_file_size > $max_file_size ) {
180 16         192 die "MIN_FILE_SIZE cannot be more than MAX_FILE_SIZE";
181             }
182 213         1040 my $max_silo_id = int( log( $max_file_size ) / log( 2 ));
183 213 100       711 $max_silo_id++ if 2 ** $max_silo_id < $max_file_size;
184 213         570 my $min_silo_id = int( log( $min_file_size ) / log( 2 ));
185 213 100       592 $min_silo_id++ if 2 ** $min_silo_id < $min_file_size;
186 213         540 my $lockfile = "$dir/LOCK";
187 213         754 my $lock_fh;
188              
189 213         770 my $silo_dir = "$dir/data_silos";
190 213         472 my $lock_dir = "$dir/user_locks";
191 213         568 my $trans_dir = "$dir/transactions";
192            
193 213 100       3499 if( -e $lockfile ) {
194 55 50       1788 open $lock_fh, '+<', $lockfile or die "$@ $!";
195 55         548 flock( $lock_fh, LOCK_EX );
196             }
197             else {
198 158 50       9813 open $lock_fh, '>', $lockfile or die "$@ $!";
199 158         1838 flock( $lock_fh, LOCK_EX );
200 158         1513 $lock_fh->autoflush(1);
201 158         14927 print $lock_fh "LOCK\n";
202            
203 158         954 my $vers_file = "$dir/VERSION";
204 158 50       2971 if( -e $vers_file ) {
205 0         0 die "Aborting open : lock file did not exist but version file did. This may mean a partial store was in here at some point in $dir.";
206             }
207            
208 158         985 _make_path( $silo_dir, 'silo' );
209 158         540 _make_path( $lock_dir, 'lock' );
210 158         677 _make_path( $trans_dir, 'transaction' );
211            
212 158         9388 open my $out, '>', "$dir/config.yaml";
213 158         2620 print $out <<"END";
214             VERSION: $VERSION
215             MAX_FILE_SIZE: $max_file_size
216             MIN_FILE_SIZE: $min_file_size
217             END
218 158         4191 close $out;
219            
220 158         7491 open my $vers_fh, '>', $vers_file;
221 158         1357 print $vers_fh "$VERSION\n";
222 158         4207 close $vers_fh;
223             }
224              
225 213         3169 my $index_silo = Data::RecordStore::Silo->open_silo( "$dir/index_silo",
226             "ILL", #silo id, id in silo, last updated time
227             0,
228             $max_file_size );
229 213         1375 my $transaction_index_silo = Data::RecordStore::Silo->open_silo( "$dir/transaction_index_silo",
230             "IL", #state, time
231             0,
232             $max_file_size );
233              
234 213         634 my $silos = [];
235              
236 213         882 for my $silo_id ($min_silo_id..$max_silo_id) {
237 4004         21871 $silos->[$silo_id] = Data::RecordStore::Silo->open_silo( "$silo_dir/$silo_id",
238             'ILLa*', # status, id, data-length, data
239             2 ** $silo_id,
240             $max_file_size );
241             }
242 213         698 my $header = pack( 'ILL', 1,2,3 );
243 9     9   6198 my $header_size = do { use bytes; length( $header ) };
  9         18  
  9         36  
  213         417  
  213         852  
244              
245 213         1825 my $store = bless [
246             $dir,
247             "$dir/ACTIVE_TRANS",
248             $max_file_size,
249             $min_silo_id,
250             $index_silo,
251             $silos,
252             $transaction_index_silo,
253             undef,
254             $header_size, # the ILL from ILLa*
255             $lock_fh,
256             [],
257             $max_silo_id,
258             ], $cls;
259 213         954 $store->_fix_transactions;
260 213         1754 flock( $lock_fh, LOCK_UN );
261 213         7628 return $store;
262             } #open_store
263              
264             sub fetch {
265 803     803 1 633793 my( $self, $id, $no_trans ) = @_;
266 803         1652 my $trans = $self->[TRANSACTION];
267 803 100 100     2581 if( $trans && ! $no_trans ) {
268 169 100       453 if( $trans->{state} != TR_ACTIVE ) {
269 16         144 die "Transaction is in a bad state. Cannot fetch";
270             }
271 153         564 return $trans->fetch( $id );
272             }
273              
274 634         1710 $self->_read_lock;
275              
276 634 100       1867 if( $id > $self->entry_count ) {
277 1         13 return undef;
278             }
279              
280 633         1119 my( $silo_id, $id_in_silo ) = @{$self->[INDEX_SILO]->get_record($id)};
  633         2223  
281 633 100       2977 if( $silo_id ) {
282 536         1963 my $ret = $self->[SILOS]->[$silo_id]->get_record( $id_in_silo );
283 536         20721 $self->_unlock;
284 536         66810 return substr( $ret->[3], 0, $ret->[2] );
285             }
286              
287 97         342 $self->_unlock;
288 97         606 return undef;
289             } #fetch
290              
291             sub stow {
292 824     824 1 244636 my $self = $_[0];
293 824         1544 my $id = $_[2];
294              
295 824         1390 my $trans = $self->[TRANSACTION];
296 824 100       1899 if( $trans ) {
297 263         1269 return $trans->stow( $_[1], $id );
298             }
299              
300 561         924 my $index = $self->[INDEX_SILO];
301 561         1615 $self->_write_lock;
302              
303 561         2187 $index->ensure_entry_count( $id );
304 561 100 100     1802 if( defined $id && $id < 1 ) {
305 2         19 die "The id must be a supplied as a positive integer";
306             }
307 559         1018 my( $old_silo_id, $old_id_in_silo );
308 559 100       1366 if( $id > 0 ) {
309 49         96 ( $old_silo_id, $old_id_in_silo ) = @{$index->get_record($id)};
  49         186  
310             }
311             else {
312 510         1607 $id = $index->next_id;
313             }
314              
315 9     9   4345 my $data_write_size = do { use bytes; length $_[1] };
  9         18  
  9         36  
  558         973  
  558         1221  
316 558         1653 my $new_silo_id = $self->silo_id_for_size( $data_write_size );
317 558         997 my $new_silo = $self->[SILOS][$new_silo_id];
318              
319 558         2545 my $new_id_in_silo = $new_silo->push( [RS_ACTIVE, $id, $data_write_size, $_[1]] );
320              
321 558         2984 $index->put_record( $id, [$new_silo_id,$new_id_in_silo,time] );
322              
323 558 100       2420 if( $old_silo_id ) {
324 20         129 $self->_vacate( $old_silo_id, $old_id_in_silo );
325             }
326              
327 558         1954 $self->_unlock;
328 558         3202 return $id;
329             } #stow
330              
331             sub next_id {
332 193     193 1 96383 return shift->[INDEX_SILO]->next_id;
333             } #next_id
334              
335             sub delete_record {
336 138     138 1 662 my( $self, $del_id ) = @_;
337 138         889 $self->_write_lock;
338 138         325 my $trans = $self->[TRANSACTION];
339 138 100       617 if( $trans ) {
340 72         256 $self->_unlock;
341 72         400 return $trans->delete_record( $del_id );
342             }
343              
344 66 100       614 if( $del_id > $self->[INDEX_SILO]->entry_count ) {
345 1         17 warn "Tried to delete past end of records";
346 1         11 $self->_unlock;
347 1         5 return undef;
348             }
349 65         205 my( $old_silo_id, $old_id_in_silo ) = @{$self->[INDEX_SILO]->get_record($del_id)};
  65         262  
350 65         488 $self->[INDEX_SILO]->put_record( $del_id, [0,0,time] );
351              
352 65 100       300 if( $old_silo_id ) {
353 57         213 $self->_vacate( $old_silo_id, $old_id_in_silo );
354             }
355 65         235 $self->_unlock;
356             } #delete_record
357              
358             # locks the given lock names
359             # they are locked in order to prevent deadlocks.
360             sub lock {
361 42     42 1 4953 my( $self, @locknames ) = @_;
362              
363 42         80 my( %previously_locked ) = ( map { $_ => 1 } @{$self->[LOCKS]} );
  27         90  
  42         132  
364              
365 42 100 66     84 if( @{$self->[LOCKS]} && grep { ! $previously_locked{$_} } @locknames ) {
  42         243  
  9         70  
366 9         97 die "Data::RecordStore->lock cannot be called twice in a row without unlocking between";
367             }
368 33         80 my $fhs = [];
369              
370 33         74 my $failed;
371              
372 33         148 for my $name (sort @locknames) {
373 70 100       332 next if $previously_locked{$name}++;
374 61         204 my $lockfile = "$self->[DIRECTORY]/user_locks/$name";
375 61         95 my $fh;
376 61 100       1079 if( -e $lockfile ) {
377 11 50       387 unless( open ( $fh, '+<', $lockfile ) ) {
378 0         0 $failed = 1;
379 0         0 last;
380             }
381 11         148111 flock( $fh, LOCK_EX ); #WRITE LOCK
382             }
383             else {
384 50 50       2518 unless( open( $fh, '>', $lockfile ) ) {
385 0         0 $failed = 1;
386 0         0 last;
387             }
388 50         448 flock( $fh, LOCK_EX ); #WRITE LOCK
389 50         412 $fh->autoflush(1);
390 50         2807 print $fh '';
391             }
392 61         201 push @$fhs, $fh;
393             }
394              
395 33 50       113 if( $failed ) {
396             # it must be able to lock all the locks or it fails
397             # if it failed, unlock any locks it managed to get
398 0         0 for my $fh (@$fhs) {
399 0         0 flock( $fh, LOCK_UN );
400             }
401 0         0 die "Data::RecordStore->lock : lock failed";
402             } else {
403 33         189 $self->[LOCKS] = $fhs;
404             }
405              
406             } #lock
407              
408             # unlocks all locks
409             sub unlock {
410 35     35 1 220384 my $self = shift;
411 35         81 my $fhs = $self->[LOCKS];
412              
413 35         211 for my $fh (@$fhs) {
414 60         540 flock( $fh, LOCK_UN );
415             }
416 35         584 @$fhs = ();
417             } #unlock
418              
419             sub use_transaction {
420 94     94 1 26390 my $self = shift;
421 94 100       423 if( $self->[TRANSACTION] ) {
422 24         200 warn __PACKAGE__."->use_transaction : already in transaction";
423 24         120 return $self->[TRANSACTION];
424             }
425 70         244 $self->_write_lock;
426 70         563 my $tid = $self->[TRANSACTION_INDEX_SILO]->push( [TR_ACTIVE, time] );
427 70         361 $self->_unlock;
428 70         375 my $tdir = "$self->[DIRECTORY]/transactions/$tid";
429 70         10393 make_path( $tdir, { error => \my $err } );
430 70 50       477 if( @$err ) { die join( ", ", map { values %$_ } @$err ) }
  0         0  
  0         0  
431              
432 70         982 $self->[TRANSACTION] = Data::RecordStore::Transaction->create( $self, $tdir, $tid );
433 70         268 return $self->[TRANSACTION];
434             } #use_transaction
435              
436             sub commit_transaction {
437 54     54 1 21057 my $self = shift;
438 54         192 $self->_write_lock;
439 54         197 my $trans = $self->[TRANSACTION];
440 54 100       206 unless( $trans ) {
441 8         88 die __PACKAGE__."->commit_transaction : no transaction to commit";
442             }
443 46         124 my $trans_file = $self->[ACTIVE_TRANS_FILE];
444 46         3189 open my $trans_fh, '>', $trans_file;
445 46         448 print $trans_fh " ";
446 46         1314 close $trans_fh;
447              
448 46         473 $trans->commit;
449 30         110 delete $self->[TRANSACTION];
450 30         1605 unlink $trans_file;
451 30         183 $self->_unlock;
452             } #commit_transaction
453              
454             sub rollback_transaction {
455 32     32 1 4808 my $self = shift;
456 32         136 $self->_write_lock;
457 32         88 my $trans = $self->[TRANSACTION];
458 32 100       112 unless( $trans ) {
459 8         152 die __PACKAGE__."->rollback_transaction : no transaction to roll back";
460             }
461 24         80 my $trans_file = $self->[ACTIVE_TRANS_FILE];
462 24         1792 open my $trans_fh, '>', $trans_file;
463 24         320 print $trans_fh " ";
464 24         1760 close $trans_fh;
465 24         216 $trans->rollback;
466 16         56 delete $self->[TRANSACTION];
467 16         736 unlink $trans_file;
468 16         88 $self->_unlock;
469             } #rollback_transaction
470              
471             sub entry_count {
472 828     828 1 3608 return shift->[INDEX_SILO]->entry_count;
473             }
474              
475             sub index_silo {
476 519     519 1 2868 return shift->[INDEX_SILO];
477             }
478              
479             sub max_file_size {
480 86     86 1 730 return shift->[MAX_FILE_SIZE];
481             }
482              
483             sub silos {
484 574     574 1 24820 return [@{shift->[SILOS]}];
  574         3292  
485             }
486              
487             sub transaction_silo {
488 1938     1938 1 18443 return shift->[TRANSACTION_INDEX_SILO];
489             }
490              
491             sub silos_entry_count {
492 112     112 1 384 my $self = shift;
493 112         264 my $silos = $self->silos;
494 112         192 my $count = 0;
495 112         232 for my $silo (grep {defined} @$silos) {
  3584         5464  
496 2240         6760 $count += $silo->entry_count;
497             }
498 112         1000 return $count;
499             }
500              
501             sub record_count {
502 8     8 0 152 goto &active_entry_count;
503             }
504              
505             sub active_entry_count {
506 64     64 1 168 my $self = shift;
507 64         160 my $index = $self->index_silo;
508 64         104 my $count = 0;
509 64         168 for(1..$self->entry_count) {
510 312         528 my( $silo_id ) = @{$index->get_record( $_ )};
  312         960  
511 312 100       1160 ++$count if $silo_id;
512             }
513 64         416 return $count;
514             }
515              
516             sub detect_version {
517 15     15 1 4584 my( $cls, $dir ) = @_;
518 15         46 my $ver_file = "$dir/VERSION";
519 15         32 my $source_version;
520 15 100       246 if ( -e $ver_file ) {
    100          
521 13         437 open( my $FH, "<", $ver_file );
522 13         240 $source_version = <$FH>;
523 13         57 chomp $source_version;
524 13         150 close $FH;
525             } elsif( -e "$dir/STORE_INDEX" ) {
526 1         6 return 1;
527             }
528 14         88 return $source_version;
529             } #detect_version
530              
531              
532              
533             sub _vacate {
534 205     205   518 my( $self, $silo_id, $id_to_empty ) = @_;
535 205         442 my $silo = $self->[SILOS][$silo_id];
536 205         599 my $rc = $silo->entry_count;
537 205 100       656 if( $id_to_empty == $rc ) {
538 118         485 $silo->pop;
539             } else {
540 87         418 while( $rc > $id_to_empty ) {
541 63         124 my( $state, $id ) = (@{$silo->get_record( $rc, 'IL' )});
  63         239  
542 63 100       309 if( $state == RS_ACTIVE ) {
    100          
543 47         346 $silo->copy_record($rc,$id_to_empty);
544 47         311 $self->[INDEX_SILO]->put_record( $id, [$silo_id,$id_to_empty], "IL" );
545 47         284 $silo->pop;
546 47         174 return;
547             }
548             elsif( $state == RS_DEAD ) {
549 8         48 $silo->pop;
550             }
551             else {
552 8         32 return;
553             }
554 8         40 $rc--;
555             }
556             }
557             } #_vacate
558              
559             sub silo_id_for_size {
560 821     821 0 1841 my( $self, $data_write_size ) = @_;
561              
562 821         2007 my $write_size = $self->[HEADER_SIZE] + $data_write_size;
563              
564 821         2645 my $silo_id = int( log( $write_size ) / log( 2 ) );
565 821 100       2314 $silo_id++ if 2 ** $silo_id < $write_size;
566 821 100       1912 $silo_id = $self->[MIN_SILO_ID] if $silo_id < $self->[MIN_SILO_ID];
567 821         1777 return $silo_id;
568             } #silo_id_for_size
569              
570             # ---------------------- private stuffs -------------------------
571              
572             sub _make_path {
573 488     488   1496 my( $dir, $msg ) = @_;
574 488         64172 make_path( $dir, { error => \my $err } );
575 488 50       3270 if( @$err ) {
576 0         0 die "unable to make $msg directory.". join( ", ", map { $_->{$dir} } @$err );
  0         0  
577             }
578             }
579              
580             sub _read_lock {
581 634     634   1110 my $self = shift;
582 634         68487 flock( $self->[LOCK_FH], LOCK_SH );
583 634         2137 $self->_fix_transactions;
584             }
585              
586             sub _unlock {
587 1469     1469   3205 my( $self ) = @_;
588 1469         13192 flock( $self->[LOCK_FH], LOCK_UN );
589             }
590              
591             sub _write_lock {
592 855     855   1367 my $self = shift;
593 855         326515 flock( $self->[LOCK_FH], LOCK_EX );
594 855         2811 $self->_fix_transactions;
595             }
596              
597             sub _fix_transactions {
598 1734     1734   2981 my $self = shift;
599             # check the transactions
600             # if the transaction is in an incomplete state, fix it. Since the store is write locked
601             # during transactions, the lock has expired if this point has been reached.
602             # that means the process that made the lock has fallen.
603             #
604             # of course, do a little experiment to test this with two processes and flock when
605             # one exits before unflocking.
606             #
607 1734         3662 my $transaction_index_silo = $self->transaction_silo;
608 1734         5987 my $last_trans = $transaction_index_silo->entry_count;
609 1734         5609 while( $last_trans ) {
610 333         545 my( $state ) = @{$transaction_index_silo->get_record( $last_trans )};
  333         1122  
611 333         2358 my $tdir = "$self->[DIRECTORY]/transactions/$last_trans";
612 333 100 100     1881 if( $state == TR_IN_ROLLBACK ||
    100          
613             $state == TR_IN_COMMIT ) {
614             # do a full rollback
615             # load the transaction
616 16         144 my $trans = Data::RecordStore::Transaction->create( $self, $tdir, $last_trans );
617 16         80 $trans->rollback;
618 16         440 $transaction_index_silo->pop;
619             }
620             elsif( $state == TR_COMPLETE ) {
621 43         225 $transaction_index_silo->pop;
622             }
623             else {
624 274         730 return;
625             }
626 59         319 $last_trans--;
627             }
628              
629             } #_fix_transactions
630              
631              
632             "I became Iggy because I had a sadistic boss at a record store. I'd been in a band called the Iguanas. And when this boss wanted to embarrass and demean me, he'd say, 'Iggy, get me a coffee, light.' - Iggy Pop";
633              
634             __END__