File Coverage

blib/lib/Data/RecordStore.pm
Criterion Covered Total %
statement 334 346 96.5
branch 87 100 87.0
condition 11 12 91.6
subroutine 42 42 100.0
pod 19 21 90.4
total 493 521 94.6


line stmt bran cond sub pod time code
1             package Data::RecordStore;
2              
3             #######################################################################
4             # The RecordStore is an index-value store that is composed of fixed #
5             # sized data silos to store its data, transaction info and #
6             # indexes. The silos are primarily binary data files so finding a #
7             # record in them is as easy as multiplying the record size times #
8             # the index and performing a seek on the silo's file handle. #
9             # #
10             # Its primary subs are stow and fetch. Their description should #
11             # give a good feel for how the record store works in general. #
12             # #
13             # fetch : #
14             # When fetch is called for a id, it uses its index silo to look #
15             # up which fixed record silo the data for that id is stored in #
16             # and what silo index it has. #
17             # The store then accesses that silo and gets the data it needs. #
18             # Silos use pack and unpack to store and retreive data. #
19             # #
20             # stow : #
21             # When stow is called with data and an id, it picks a data #
22             # silo based on the size of the data. The silos are numbered #
23             # starting with 12. The size of the silo is 2^N where N is #
24             # the number of the silo. The smallest silo by default is 4096 #
25             # bytes. #
26             #######################################################################
27              
28              
29             #
30             # The original that adheres the record store interface.
31             # namely
32             # * open_store
33             #
34             # * stow
35             # * fetch
36             # * delete_record
37             # * next_id
38             # * last_updated
39             #
40             # * highest_entry_id
41             #
42             # * lock
43             # * unlock
44             #
45             # * use_transaction
46             # * commit_transaction
47             # * rollback_transaction
48             # * list_transactions
49             # * empty
50             #
51              
52 9     9   769984 use strict;
  9         22  
  9         223  
53 9     9   36 use warnings;
  9         9  
  9         237  
54 9     9   44 no warnings 'numeric';
  9         9  
  9         294  
55 9     9   44 no warnings 'uninitialized';
  9         18  
  9         202  
56              
57 9     9   43 use Fcntl qw( :flock SEEK_SET );
  9         10  
  9         1211  
58 9     9   45 use File::Path qw(make_path);
  9         18  
  9         348  
59 9     9   659 use Data::Dumper;
  9         5463  
  9         328  
60 9     9   3405 use YAML;
  9         50770  
  9         370  
61              
62 9     9   3346 use Data::RecordStore::Silo;
  9         18  
  9         232  
63 9     9   3303 use Data::RecordStore::Transaction;
  9         35  
  9         232  
64              
65 9     9   45 use vars qw($VERSION);
  9         18  
  9         638  
66              
67             $VERSION = '6.05';
68             my $SILO_VERSION = '6.00';
69              
70             use constant {
71             # record state
72 9         3531 RS_ACTIVE => 1,
73             RS_DEAD => 2,
74             RS_IN_TRANSACTION => 3,
75              
76             TR_ACTIVE => 1,
77             TR_IN_COMMIT => 2,
78             TR_IN_ROLLBACK => 3,
79             TR_COMPLETE => 4,
80              
81             DIRECTORY => 0,
82             ACTIVE_TRANS_FILE => 1,
83             MAX_FILE_SIZE => 2,
84             MIN_SILO_ID => 3,
85             INDEX_SILO => 4,
86             SILOS => 5,
87             TRANSACTION_INDEX_SILO => 6,
88             TRANSACTION => 7,
89             HEADER_SIZE => 8,
90             LOCK_FH => 9,
91             LOCKS => 10,
92             MAX_SILO_ID => 11,
93 9     9   45 };
  9         18  
94             #
95             # On disc block size : If the reads align with block sizes, the reads will go faster
96             # however, there are some annoying things about finding out what that blocksize is.
97             # (spoiler: it is currently either 4096 or 512 bytes and probably the former)
98             #
99             # ways to determine block size :
100             # sudo blockdev -getbsz /dev/sda1
101             # lsblk -o NAME,MOUNTPOINT,FSTYPE,TYPE,SIZE,STATE,DISC-MAX,DISC-GRAN,PHY-SEC,MIN-IO,RQ-SIZE
102             # and they disagree :(
103             # using 4096 in any case. It won't be slower, though the space required could be 8 times are large.
104             #
105             # note: this is only for the records, not for any indexes.
106             #
107              
108             sub reopen_store {
109 36     36 1 91900 my( $cls, $dir ) = @_;
110 36         161 my $cfgfile = "$dir/config.yaml";
111 36 100       761 if( -e $cfgfile ) {
112 32         154 my $lockfile = "$dir/LOCK";
113 32 50       303 die "no lock file found" unless -e $lockfile;
114 32 50       1581 open my $lock_fh, '+<', $lockfile or die "$@ $!";
115 32         238925 flock( $lock_fh, LOCK_EX );
116              
117 32         199 my $lock_dir = "$dir/user_locks";
118 32         136 my $trans_dir = "$dir/transactions";
119 32 50       358 die "locks directory not found" unless -d $lock_dir;
120 32 50       338 die "transaction directory not found" unless -d $trans_dir;
121            
122 32         421 my $cfg = YAML::LoadFile( $cfgfile );
123              
124 32         208208 my $index_silo = Data::RecordStore::Silo->reopen_silo( "$dir/index_silo" );
125 32         153 my $transaction_index_silo = Data::RecordStore::Silo->reopen_silo( "$dir/transaction_index_silo" );
126              
127 32         96 my $max_file_size = $cfg->{MAX_FILE_SIZE};
128 32         74 my $min_file_size = $cfg->{MIN_FILE_SIZE};
129 32         483 my $max_silo_id = int( log( $max_file_size ) / log( 2 ));
130 32 50       146 $max_silo_id++ if 2 ** $max_silo_id < $max_file_size;
131 32         113 my $min_silo_id = int( log( $min_file_size ) / log( 2 ));
132 32 100       114 $min_silo_id++ if 2 ** $min_silo_id < $min_file_size;
133              
134 32         134 my $silo_dir = "$dir/data_silos";
135 32         71 my $silos = [];
136 32         195 for my $silo_id ($min_silo_id..$max_silo_id) {
137 672         2738 $silos->[$silo_id] = Data::RecordStore::Silo->reopen_silo( "$silo_dir/$silo_id" );
138             }
139              
140 32         104 my $header = pack( 'ILL', 1,2,3 );
141 9     9   54 my $header_size = do { use bytes; length( $header ) };
  9         17  
  9         27  
  32         65  
  32         94  
142              
143 32         263 my $store = bless [
144             $dir,
145             "$dir/ACTIVE_TRANS",
146             $max_file_size,
147             $min_silo_id,
148             $index_silo,
149             $silos,
150             $transaction_index_silo,
151             undef,
152             $header_size, # the ILL from ILLa*
153             $lock_fh,
154             [],
155             $max_silo_id,
156             ], $cls;
157 32         323 $store->_fix_transactions;
158 32         14445 flock( $lock_fh, LOCK_UN );
159 32         1827 return $store;
160             }
161 4         72 die "could not find record store in $dir";
162             } #reopen_store
163              
164             sub open_store {
165 229     229 1 7746286 my( $cls, @options ) = @_;
166 229 100       945 if( @options == 1 ) {
167 55         128 unshift @options, 'BASE_PATH';
168             }
169 229         789 my( %options ) = @options;
170 229         488 my $dir = $options{BASE_PATH};
171 229 100       2384 unless( -d $dir ) {
172 14         76 _make_path( $dir, 'base' );
173             }
174 229         719 my $max_file_size = $options{MAX_FILE_SIZE};
175 229 100       625 $max_file_size = $Data::RecordStore::Silo::DEFAULT_MAX_FILE_SIZE unless $max_file_size;
176            
177 229         370 my $min_file_size = $options{MIN_FILE_SIZE};
178 229 100       591 $min_file_size = $Data::RecordStore::Silo::DEFAULT_MIN_FILE_SIZE unless $min_file_size;
179 229 100       482 if( $min_file_size > $max_file_size ) {
180 16         152 die "MIN_FILE_SIZE cannot be more than MAX_FILE_SIZE";
181             }
182 213         899 my $max_silo_id = int( log( $max_file_size ) / log( 2 ));
183 213 100       613 $max_silo_id++ if 2 ** $max_silo_id < $max_file_size;
184 213         612 my $min_silo_id = int( log( $min_file_size ) / log( 2 ));
185 213 100       665 $min_silo_id++ if 2 ** $min_silo_id < $min_file_size;
186 213         552 my $lockfile = "$dir/LOCK";
187 213         263 my $lock_fh;
188              
189 213         715 my $silo_dir = "$dir/data_silos";
190 213         365 my $lock_dir = "$dir/user_locks";
191 213         377 my $trans_dir = "$dir/transactions";
192            
193 213 100       2838 if( -e $lockfile ) {
194 55 50       1482 open $lock_fh, '+<', $lockfile or die "$@ $!";
195 55         462 flock( $lock_fh, LOCK_EX );
196             }
197             else {
198 158 50       7523 open $lock_fh, '>', $lockfile or die "$@ $!";
199 158         1458 flock( $lock_fh, LOCK_EX );
200 158         1376 $lock_fh->autoflush(1);
201 158         11878 print $lock_fh "LOCK\n";
202            
203 158         660 my $vers_file = "$dir/VERSION";
204 158 50       2284 if( -e $vers_file ) {
205 0         0 die "Aborting open : lock file did not exist but version file did. This may mean a partial store was in here at some point in $dir.";
206             }
207            
208 158         819 _make_path( $silo_dir, 'silo' );
209 158         455 _make_path( $lock_dir, 'lock' );
210 158         500 _make_path( $trans_dir, 'transaction' );
211            
212 158         6967 open my $out, '>', "$dir/config.yaml";
213 158         1783 print $out <<"END";
214             VERSION: $VERSION
215             MAX_FILE_SIZE: $max_file_size
216             MIN_FILE_SIZE: $min_file_size
217             END
218 158         3337 close $out;
219            
220 158         5956 open my $vers_fh, '>', $vers_file;
221 158         955 print $vers_fh "$VERSION\n";
222 158         4997 close $vers_fh;
223             }
224              
225 213         5761 my $index_silo = Data::RecordStore::Silo->open_silo( "$dir/index_silo",
226             "ILL", #silo id, id in silo, last updated time
227             0,
228             $max_file_size );
229 213         1000 my $transaction_index_silo = Data::RecordStore::Silo->open_silo( "$dir/transaction_index_silo",
230             "IL", #state, time
231             0,
232             $max_file_size );
233              
234 213         523 my $silos = [];
235              
236 213         927 for my $silo_id ($min_silo_id..$max_silo_id) {
237 4004         16733 $silos->[$silo_id] = Data::RecordStore::Silo->open_silo( "$silo_dir/$silo_id",
238             'ILLa*', # status, id, data-length, data
239             2 ** $silo_id,
240             $max_file_size );
241             }
242 213         450 my $header = pack( 'ILL', 1,2,3 );
243 9     9   4647 my $header_size = do { use bytes; length( $header ) };
  9         18  
  9         27  
  213         762  
  213         414  
244              
245 213         1174 my $store = bless [
246             $dir,
247             "$dir/ACTIVE_TRANS",
248             $max_file_size,
249             $min_silo_id,
250             $index_silo,
251             $silos,
252             $transaction_index_silo,
253             undef,
254             $header_size, # the ILL from ILLa*
255             $lock_fh,
256             [],
257             $max_silo_id,
258             ], $cls;
259 213         795 $store->_fix_transactions;
260 213         1441 flock( $lock_fh, LOCK_UN );
261 213         6395 return $store;
262             } #open_store
263              
264             sub fetch {
265 803     803 1 579855 my( $self, $id, $no_trans ) = @_;
266 803         1294 my $trans = $self->[TRANSACTION];
267 803 100 100     2145 if( $trans && ! $no_trans ) {
268 169 100       403 if( $trans->{state} != TR_ACTIVE ) {
269 16         112 die "Transaction is in a bad state. Cannot fetch";
270             }
271 153         502 return $trans->fetch( $id );
272             }
273              
274 634         1548 $self->_read_lock;
275              
276 634 100       1793 if( $id > $self->entry_count ) {
277 1         5 return undef;
278             }
279              
280 633         1060 my( $silo_id, $id_in_silo ) = @{$self->[INDEX_SILO]->get_record($id)};
  633         1989  
281 633 100       2360 if( $silo_id ) {
282 536         1652 my $ret = $self->[SILOS]->[$silo_id]->get_record( $id_in_silo );
283 536         20362 $self->_unlock;
284 536         53342 return substr( $ret->[3], 0, $ret->[2] );
285             }
286              
287 97         307 $self->_unlock;
288 97         501 return undef;
289             } #fetch
290              
291             sub stow {
292 824     824 1 227463 my $self = $_[0];
293 824         1344 my $id = $_[2];
294              
295 824         1220 my $trans = $self->[TRANSACTION];
296 824 100       1679 if( $trans ) {
297 263         1023 return $trans->stow( $_[1], $id );
298             }
299              
300 561         772 my $index = $self->[INDEX_SILO];
301 561         1277 $self->_write_lock;
302              
303 561         1848 $index->ensure_entry_count( $id );
304 561 100 100     1477 if( defined $id && $id < 1 ) {
305 2         20 die "The id must be a supplied as a positive integer";
306             }
307 559         830 my( $old_silo_id, $old_id_in_silo );
308 559 100       1121 if( $id > 0 ) {
309 49         59 ( $old_silo_id, $old_id_in_silo ) = @{$index->get_record($id)};
  49         161  
310             }
311             else {
312 510         1341 $id = $index->next_id;
313             }
314              
315 9     9   3763 my $data_write_size = do { use bytes; length $_[1] };
  9         17  
  9         27  
  558         802  
  558         1000  
316 558         1578 my $new_silo_id = $self->silo_id_for_size( $data_write_size );
317 558         859 my $new_silo = $self->[SILOS][$new_silo_id];
318              
319 558         2027 my $new_id_in_silo = $new_silo->push( [RS_ACTIVE, $id, $data_write_size, $_[1]] );
320              
321 558         2401 $index->put_record( $id, [$new_silo_id,$new_id_in_silo,time] );
322              
323 558 100       1587 if( $old_silo_id ) {
324 20         59 $self->_vacate( $old_silo_id, $old_id_in_silo );
325             }
326              
327 558         1639 $self->_unlock;
328 558         2734 return $id;
329             } #stow
330              
331             sub next_id {
332 193     193 1 86910 return shift->[INDEX_SILO]->next_id;
333             } #next_id
334              
335             sub delete_record {
336 138     138 1 529 my( $self, $del_id ) = @_;
337 138         695 $self->_write_lock;
338 138         270 my $trans = $self->[TRANSACTION];
339 138 100       602 if( $trans ) {
340 72         232 $self->_unlock;
341 72         328 return $trans->delete_record( $del_id );
342             }
343              
344 66 100       208 if( $del_id > $self->[INDEX_SILO]->entry_count ) {
345 1         9 warn "Tried to delete past end of records";
346 1         12 $self->_unlock;
347 1         7 return undef;
348             }
349 65         106 my( $old_silo_id, $old_id_in_silo ) = @{$self->[INDEX_SILO]->get_record($del_id)};
  65         236  
350 65         350 $self->[INDEX_SILO]->put_record( $del_id, [0,0,time] );
351              
352 65 100       212 if( $old_silo_id ) {
353 57         259 $self->_vacate( $old_silo_id, $old_id_in_silo );
354             }
355 65         195 $self->_unlock;
356             } #delete_record
357              
358             # locks the given lock names
359             # they are locked in order to prevent deadlocks.
360             sub lock {
361 42     42 1 4650 my( $self, @locknames ) = @_;
362              
363 42         63 my( %previously_locked ) = ( map { $_ => 1 } @{$self->[LOCKS]} );
  27         71  
  42         122  
364              
365 42 100 66     85 if( @{$self->[LOCKS]} && grep { ! $previously_locked{$_} } @locknames ) {
  42         189  
  9         69  
366 9         74 die "Data::RecordStore->lock cannot be called twice in a row without unlocking between";
367             }
368 33         76 my $fhs = [];
369              
370 33         44 my $failed;
371              
372 33         119 for my $name (sort @locknames) {
373 70 100       264 next if $previously_locked{$name}++;
374 61         160 my $lockfile = "$self->[DIRECTORY]/user_locks/$name";
375 61         67 my $fh;
376 61 100       852 if( -e $lockfile ) {
377 11 50       315 unless( open ( $fh, '+<', $lockfile ) ) {
378 0         0 $failed = 1;
379 0         0 last;
380             }
381 11         159682 flock( $fh, LOCK_EX ); #WRITE LOCK
382             }
383             else {
384 50 50       1945 unless( open( $fh, '>', $lockfile ) ) {
385 0         0 $failed = 1;
386 0         0 last;
387             }
388 50         366 flock( $fh, LOCK_EX ); #WRITE LOCK
389 50         291 $fh->autoflush(1);
390 50         1999 print $fh '';
391             }
392 61         200 push @$fhs, $fh;
393             }
394              
395 33 50       87 if( $failed ) {
396             # it must be able to lock all the locks or it fails
397             # if it failed, unlock any locks it managed to get
398 0         0 for my $fh (@$fhs) {
399 0         0 flock( $fh, LOCK_UN );
400             }
401 0         0 die "Data::RecordStore->lock : lock failed";
402             } else {
403 33         151 $self->[LOCKS] = $fhs;
404             }
405              
406             } #lock
407              
408             # unlocks all locks
409             sub unlock {
410 35     35 1 220420 my $self = shift;
411 35         77 my $fhs = $self->[LOCKS];
412              
413 35         145 for my $fh (@$fhs) {
414 60         441 flock( $fh, LOCK_UN );
415             }
416 35         564 @$fhs = ();
417             } #unlock
418              
419             sub use_transaction {
420 94     94 1 25140 my $self = shift;
421 94 100       402 if( $self->[TRANSACTION] ) {
422 24         168 warn __PACKAGE__."->use_transaction : already in transaction";
423 24         96 return $self->[TRANSACTION];
424             }
425 70         235 $self->_write_lock;
426 70         457 my $tid = $self->[TRANSACTION_INDEX_SILO]->push( [TR_ACTIVE, time] );
427 70         346 $self->_unlock;
428 70         361 my $tdir = "$self->[DIRECTORY]/transactions/$tid";
429 70         8483 make_path( $tdir, { error => \my $err } );
430 70 50       407 if( @$err ) { die join( ", ", map { values %$_ } @$err ) }
  0         0  
  0         0  
431              
432 70         925 $self->[TRANSACTION] = Data::RecordStore::Transaction->create( $self, $tdir, $tid );
433 70         200 return $self->[TRANSACTION];
434             } #use_transaction
435              
436             sub commit_transaction {
437 54     54 1 26536 my $self = shift;
438 54         190 $self->_write_lock;
439 54         90 my $trans = $self->[TRANSACTION];
440 54 100       200 unless( $trans ) {
441 8         64 die __PACKAGE__."->commit_transaction : no transaction to commit";
442             }
443 46         88 my $trans_file = $self->[ACTIVE_TRANS_FILE];
444 46         2627 open my $trans_fh, '>', $trans_file;
445 46         469 print $trans_fh " ";
446 46         1160 close $trans_fh;
447              
448 46         380 $trans->commit;
449 30         86 delete $self->[TRANSACTION];
450 30         1286 unlink $trans_file;
451 30         154 $self->_unlock;
452             } #commit_transaction
453              
454             sub rollback_transaction {
455 32     32 1 4560 my $self = shift;
456 32         120 $self->_write_lock;
457 32         64 my $trans = $self->[TRANSACTION];
458 32 100       96 unless( $trans ) {
459 8         144 die __PACKAGE__."->rollback_transaction : no transaction to roll back";
460             }
461 24         72 my $trans_file = $self->[ACTIVE_TRANS_FILE];
462 24         1600 open my $trans_fh, '>', $trans_file;
463 24         240 print $trans_fh " ";
464 24         1144 close $trans_fh;
465 24         176 $trans->rollback;
466 16         48 delete $self->[TRANSACTION];
467 16         600 unlink $trans_file;
468 16         80 $self->_unlock;
469             } #rollback_transaction
470              
471             sub entry_count {
472 828     828 1 2911 return shift->[INDEX_SILO]->entry_count;
473             }
474              
475             sub index_silo {
476 519     519 1 2307 return shift->[INDEX_SILO];
477             }
478              
479             sub max_file_size {
480 86     86 1 581 return shift->[MAX_FILE_SIZE];
481             }
482              
483             sub silos {
484 574     574 1 20946 return [@{shift->[SILOS]}];
  574         4354  
485             }
486              
487             sub transaction_silo {
488 1938     1938 1 18575 return shift->[TRANSACTION_INDEX_SILO];
489             }
490              
491             sub silos_entry_count {
492 112     112 1 288 my $self = shift;
493 112         240 my $silos = $self->silos;
494 112         152 my $count = 0;
495 112         208 for my $silo (grep {defined} @$silos) {
  3584         4640  
496 2240         4848 $count += $silo->entry_count;
497             }
498 112         728 return $count;
499             }
500              
501             sub record_count {
502 8     8 0 104 goto &active_entry_count;
503             }
504              
505             sub active_entry_count {
506 64     64 1 136 my $self = shift;
507 64         144 my $index = $self->index_silo;
508 64         88 my $count = 0;
509 64         152 for(1..$self->entry_count) {
510 312         384 my( $silo_id ) = @{$index->get_record( $_ )};
  312         704  
511 312 100       984 ++$count if $silo_id;
512             }
513 64         304 return $count;
514             }
515              
516             sub detect_version {
517 15     15 1 3791 my( $cls, $dir ) = @_;
518 15         46 my $ver_file = "$dir/VERSION";
519 15         27 my $source_version;
520 15 100       207 if ( -e $ver_file ) {
    100          
521 13         343 open( my $FH, "<", $ver_file );
522 13         197 $source_version = <$FH>;
523 13         58 chomp $source_version;
524 13         116 close $FH;
525             } elsif( -e "$dir/STORE_INDEX" ) {
526 1         5 return 1;
527             }
528 14         73 return $source_version;
529             } #detect_version
530              
531              
532              
533             sub _vacate {
534 205     205   433 my( $self, $silo_id, $id_to_empty ) = @_;
535 205         382 my $silo = $self->[SILOS][$silo_id];
536 205         522 my $rc = $silo->entry_count;
537 205 100       516 if( $id_to_empty == $rc ) {
538 118         346 $silo->pop;
539             } else {
540 87         263 while( $rc > $id_to_empty ) {
541 63         117 my( $state, $id ) = (@{$silo->get_record( $rc, 'IL' )});
  63         202  
542 63 100       264 if( $state == RS_ACTIVE ) {
    100          
543 47         189 $silo->copy_record($rc,$id_to_empty);
544 47         258 $self->[INDEX_SILO]->put_record( $id, [$silo_id,$id_to_empty], "IL" );
545 47         190 $silo->pop;
546 47         152 return;
547             }
548             elsif( $state == RS_DEAD ) {
549 8         40 $silo->pop;
550             }
551             else {
552 8         24 return;
553             }
554 8         40 $rc--;
555             }
556             }
557             } #_vacate
558              
559             sub silo_id_for_size {
560 821     821 0 1636 my( $self, $data_write_size ) = @_;
561              
562 821         1504 my $write_size = $self->[HEADER_SIZE] + $data_write_size;
563              
564 821         2280 my $silo_id = int( log( $write_size ) / log( 2 ) );
565 821 100       1996 $silo_id++ if 2 ** $silo_id < $write_size;
566 821 100       1920 $silo_id = $self->[MIN_SILO_ID] if $silo_id < $self->[MIN_SILO_ID];
567 821         1424 return $silo_id;
568             } #silo_id_for_size
569              
570             # ---------------------- private stuffs -------------------------
571              
572             sub _make_path {
573 488     488   1169 my( $dir, $msg ) = @_;
574 488         55594 make_path( $dir, { error => \my $err } );
575 488 50       2530 if( @$err ) {
576 0         0 die "unable to make $msg directory.". join( ", ", map { $_->{$dir} } @$err );
  0         0  
577             }
578             }
579              
580             sub _read_lock {
581 634     634   853 my $self = shift;
582 634         42171 flock( $self->[LOCK_FH], LOCK_SH );
583 634         1967 $self->_fix_transactions;
584             }
585              
586             sub _unlock {
587 1469     1469   2582 my( $self ) = @_;
588 1469         10255 flock( $self->[LOCK_FH], LOCK_UN );
589             }
590              
591             sub _write_lock {
592 855     855   1126 my $self = shift;
593 855         363390 flock( $self->[LOCK_FH], LOCK_EX );
594 855         2347 $self->_fix_transactions;
595             }
596              
597             sub _fix_transactions {
598 1734     1734   2613 my $self = shift;
599             # check the transactions
600             # if the transaction is in an incomplete state, fix it. Since the store is write locked
601             # during transactions, the lock has expired if this point has been reached.
602             # that means the process that made the lock has fallen.
603             #
604             # of course, do a little experiment to test this with two processes and flock when
605             # one exits before unflocking.
606             #
607 1734         3202 my $transaction_index_silo = $self->transaction_silo;
608 1734         4721 my $last_trans = $transaction_index_silo->entry_count;
609 1734         4478 while( $last_trans ) {
610 333         451 my( $state ) = @{$transaction_index_silo->get_record( $last_trans )};
  333         845  
611 333         1769 my $tdir = "$self->[DIRECTORY]/transactions/$last_trans";
612 333 100 100     1506 if( $state == TR_IN_ROLLBACK ||
    100          
613             $state == TR_IN_COMMIT ) {
614             # do a full rollback
615             # load the transaction
616 16         136 my $trans = Data::RecordStore::Transaction->create( $self, $tdir, $last_trans );
617 16         72 $trans->rollback;
618 16         272 $transaction_index_silo->pop;
619             }
620             elsif( $state == TR_COMPLETE ) {
621 43         176 $transaction_index_silo->pop;
622             }
623             else {
624 274         578 return;
625             }
626 59         291 $last_trans--;
627             }
628              
629             } #_fix_transactions
630              
631              
632             "I became Iggy because I had a sadistic boss at a record store. I'd been in a band called the Iguanas. And when this boss wanted to embarrass and demean me, he'd say, 'Iggy, get me a coffee, light.' - Iggy Pop";
633              
634             __END__