File Coverage

blib/lib/Data/RecordStore/Silo.pm
Criterion Covered Total %
statement 219 231 94.8
branch 60 68 88.2
condition 16 16 100.0
subroutine 33 34 97.0
pod 17 17 100.0
total 345 366 94.2


line stmt bran cond sub pod time code
1             package Data::RecordStore::Silo;
2              
3             #
4             # I am a silo. I live in a directory.
5             # I keep my data in silo files in this directory.
6             # Each silo file is allowed to be only so large,
7             # so that is why there may be more than one of them.
8             #
9             # I may be changed by async processes, so a lot
10             # of my coordination and state is on the file system.
11             #
12             # You can init me by giving me a directory,
13             # a template and an optional size and a max size.
14             # I will figure out the record size based on what you give me.
15             # I will default to 2GB for a max size if no max size is given.
16             # I will also use that 2GB default if you give me 0 for a max size
17             # I will save my version, the size, max size and template to the directory
18             #
19             #
20             # You can open me by giving a directory, then
21             # push data to me and I return its id
22             # get data from me after giving me its id
23             # pop data from me
24             # ask how many records I have
25             #
26              
27 13     13   323858 use strict;
  13         66  
  13         352  
28 13     13   52 use warnings;
  13         14  
  13         272  
29 13     13   53 no warnings 'uninitialized';
  13         17  
  13         304  
30 13     13   52 no warnings 'numeric';
  13         14  
  13         242  
31 13     13   116 no strict 'refs';
  13         33  
  13         389  
32              
33 13     13   64 use Fcntl qw( SEEK_SET );
  13         13  
  13         504  
34 13     13   61 use File::Path qw(make_path);
  13         54  
  13         449  
35             #use FileCache;
36 13     13   479 use IO::Handle;
  13         6242  
  13         857  
37 13     13   1378 use YAML;
  13         21573  
  13         566  
38              
39 13     13   69 use vars qw($VERSION);
  13         14  
  13         1038  
40             $VERSION = '6.00';
41              
42             $Data::RecordStore::Silo::DEFAULT_MAX_FILE_SIZE = 2_000_000_000;
43             $Data::RecordStore::Silo::DEFAULT_MIN_FILE_SIZE = 4_096;
44              
45             use constant {
46 13         2181 DIRECTORY => 0,
47             VERSION => 1,
48             TEMPLATE => 2,
49             RECORD_SIZE => 3,
50             MAX_FILE_SIZE => 4,
51             RECORDS_PER_SUBSILO => 5,
52             DIR_HANDLE => 6,
53 13     13   65 };
  13         18  
54              
55              
56             sub open_silo {
57 5315     5315 1 57050 my( $class, $dir, $template, $size, $max_file_size ) = @_;
58              
59 5315 100       10007 if( ! $dir ) {
60 4         24 die "must supply directory to open silo";
61             }
62 5311 100       8573 if( ! $template ) {
63 4         44 die "must supply template to open silo";
64             }
65 13 100   13   78 my $record_size = $template =~ /\*/ ? $size : do { use bytes; length( pack( $template ) ) };
  13         22  
  13         77  
  5307         16341  
  596         2176  
66 5307 100       11230 if( $record_size < 1 ) {
67 4         36 die "no record size given to open silo";
68             }
69 5303 100 100     65430 if( $size && $size != $record_size ) {
70 4         28 die "Silo given size and template size do not match";
71             }
72 5299         372597 make_path( $dir, { error => \my $err } );
73              
74 5299 50       22476 if( @$err ) { die join( ", ", map { $_->{$dir} } @$err ) }
  0         0  
  0         0  
75              
76 5299 100       13017 if( $max_file_size < 1 ) {
77 35         101 $max_file_size = $Data::RecordStore::Silo::DEFAULT_MAX_FILE_SIZE;
78             }
79              
80 5299 100       62791 unless( -e "$dir/0" ) {
81 3308         133223 open my $out, '>', "$dir/config.yaml";
82 3308         22236 print $out <<"END";
83             VERSION: $VERSION
84             TEMPLATE: $template
85             RECORD_SIZE: $record_size
86             MAX_FILE_SIZE: $max_file_size
87             END
88 3308         64860 close $out;
89            
90             # must have at least an empty silo file
91 3308         105328 open $out, '>', "$dir/0";
92 3308         9677 print $out '';
93 3308         26319 close $out;
94             }
95              
96 5299         46543 return bless [
97             $dir,
98             $VERSION,
99             $template,
100             $record_size,
101             $max_file_size,
102             int($max_file_size / $record_size),
103             ], $class;
104             } #open_silo
105              
106             sub reopen_silo {
107 740     740 1 1635 my( $cls, $dir ) = @_;
108 740         1304 my $cfgfile = "$dir/config.yaml";
109 740 50       8055 if( -e $cfgfile ) {
110 740         2473 my $cfg = YAML::LoadFile( $cfgfile );
111 740         1531693 return $cls->open_silo( $dir, @$cfg{qw(TEMPLATE RECORD_SIZE MAX_FILE_SIZE)} );
112             }
113 0         0 die "could not find silo in $dir";
114             } #reopen_silo
115              
116             sub next_id {
117 1967     1967 1 11160 my( $self ) = @_;
118 1967         3204 my $next_id = 1 + $self->entry_count;
119 1967         5535 $self->ensure_entry_count( $next_id );
120 1967         3618 return $next_id;
121             } #next_id
122              
123             sub entry_count {
124             # return how many entries this silo has
125 18464     18464 1 29183 my $self = shift;
126 18464         28495 my @files = $self->subsilos;
127 18464         22838 my $filesize;
128 18464         28768 for my $file (@files) {
129 18844         201746 $filesize += -s "$self->[DIRECTORY]/$file";
130             }
131 18464         80177 return int( $filesize / $self->[RECORD_SIZE] );
132             } #entry_count
133              
134             sub get_record {
135 2943     2943   42845 my( $self, $id, $template, $offset ) = @_;
136 2943         3589 my $rec_size;
137              
138 2943 100       6194 if( $template > 0 ) {
    100          
139 4         8 $rec_size = $template;
140 4         8 $template = $self->[TEMPLATE];
141             } elsif( $template ) {
142 13 100   13   6016 my $template_size = $template =~ /\*/ ? 0 : do { use bytes; length( pack( $template ) ) };
  13         30  
  13         39  
  87         273  
  83         246  
143 87         141 $rec_size = $template_size;
144             }
145             else {
146 2852         4081 $rec_size = $self->[RECORD_SIZE];
147 2852         3539 $template = $self->[TEMPLATE];
148             }
149 2943 100 100     4800 if( $id > $self->entry_count || $id < 1 ) {
150 17         99 die "Data::RecordStore::Silo->get_record : ($$) index $id out of bounds for silo $self->[DIRECTORY]. Silo has entry count of ".$self->entry_count;
151             }
152 2926         7299 my( $idx_in_f, $fh, $subsilo_idx ) = $self->_fh( $id );
153              
154 2926   100     11528 $offset //= 0;
155 2926         4730 my $seek_pos = ( $self->[RECORD_SIZE] * $idx_in_f ) + $offset;
156              
157 2926         12367 sysseek( $fh, $seek_pos, SEEK_SET );
158 2926         56253 my $srv = sysread $fh, (my $data), $rec_size;
159              
160 2926         80169 return [unpack( $template, $data )];
161             } #get_record
162              
163             sub put_record {
164 2540     2540   11094 my( $self, $id, $data, $template, $offset ) = @_;
165              
166 2540 100 100     4445 if( $id > $self->entry_count || $id < 1 ) {
167 20         88 die "Data::RecordStore::Silo->put_record : index $id out of bounds for silo $self->[DIRECTORY]. Store has entry count of ".$self->entry_count;
168             }
169 2520 100       4555 if( ! $template ) {
170 2135         3078 $template = $self->[TEMPLATE];
171             }
172              
173 2520         3520 my $rec_size = $self->[RECORD_SIZE];
174 2520 100       38111 my $to_write = pack( $template, ref $data ? @$data : ($data) );
175              
176             # allows the put_record to grow the data store by no more than one entry
177 13     13   3165 my $write_size = do { use bytes; length( $to_write ) };
  13         79  
  13         65  
  2520         3192  
  2520         3784  
178              
179 2520 100       4031 if( $write_size > $rec_size) {
180 4         32 die "Data::RecordStore::Silo->put_record : record size $write_size too large. Max is $rec_size";
181             }
182              
183 2516         5500 my( $idx_in_f, $fh, $subsilo_idx ) = $self->_fh( $id );
184              
185 2516   100     9430 $offset //= 0;
186 2516         4485 my $seek_pos = $rec_size * $idx_in_f + $offset;
187 2516         11930 sysseek( $fh, $seek_pos, SEEK_SET );
188              
189 2516         44189 syswrite( $fh, $to_write );
190              
191 2516         23927 return 1;
192             } #put_record
193              
194             sub pop {
195 252     252 1 429 my( $self ) = @_;
196 252         450 my $entries = $self->entry_count;
197 252 100       518 unless( $entries ) {
198 4         24 return undef;
199             }
200 248         598 my $ret = $self->get_record( $entries );
201 248         17484 my( $idx_in_f, $fh, $subsilo_idx ) = $self->_fh( $entries );
202              
203 248         675 my $new_subsilo_size = (($entries-1) - ($subsilo_idx * $self->[RECORDS_PER_SUBSILO] ))*$self->[RECORD_SIZE];
204              
205 248 100 100     1074 if( $new_subsilo_size || $subsilo_idx == 0 ) {
206 240         16592 truncate $fh, $new_subsilo_size;
207             } else {
208 8         204 unlink "$self->[DIRECTORY]/$subsilo_idx";
209             # FileCache::cacheout_close $fh;
210             }
211              
212 248         3539 return $ret;
213             } #pop
214              
215             sub peek {
216 12     12 1 32 my( $self ) = @_;
217 12         68 my $entries = $self->entry_count;
218 12 100       36 unless( $entries ) {
219 4         20 return undef;
220             }
221 8         20 my $r = $self->get_record( $entries );
222 8         48 return $r;
223             } #peek
224              
225             sub push {
226 1247     1247 1 19135 my( $self, $data ) = @_;
227 1247         2272 my $next_id = $self->next_id;
228              
229 1247         3571 $self->put_record( $next_id, $data );
230              
231 1247         3748 return $next_id;
232             } #push
233              
234              
235              
236 4     4 1 20 sub record_size { return shift->[RECORD_SIZE] }
237 8     8 1 108 sub template { return shift->[TEMPLATE] }
238              
239 4     4 1 20 sub max_file_size { return shift->[MAX_FILE_SIZE] }
240 4     4 1 20 sub records_per_subsilo { return shift->[RECORDS_PER_SUBSILO] }
241              
242             sub size {
243             # return how many bytes of data this silo has
244 52     52 1 100 my $self = shift;
245 52         132 my @files = $self->subsilos;
246 52         88 my $filesize = 0;
247 52         104 for my $file (@files) {
248 124         1072 $filesize += -s "$self->[DIRECTORY]/$file";
249             }
250 52         324 return $filesize;
251             }
252              
253              
254             sub copy_record {
255 71     71 1 6522 my( $self, $from_id, $to_id ) = @_;
256 71         154 my $rec = $self->get_record($from_id);
257 63         225 $self->put_record( $to_id, $rec );
258 55         185 return $rec;
259             } #copy_record
260              
261              
262             #
263             # Destroys all the data in the silo
264             #
265             sub empty_silo {
266 0     0 1 0 my $self = shift;
267 0         0 my $dir = $self->[DIRECTORY];
268 0         0 for my $file ($self->subsilos) {
269 0 0       0 if( $file eq '0' ) {
270 0         0 open my $fh, '+<', "$dir/0";
271 0         0 truncate $fh, 0;
272             } else {
273 0         0 unlink "$dir/$file";
274             }
275             }
276             } #empty_silo
277              
278             # destroys the silo. The silo will not be
279             # functional after this call.
280             sub unlink_silo {
281 4     4 1 1804 my $self = shift;
282 4         8 my $dir = $self->[DIRECTORY];
283 4         16 for my $file ($self->subsilos) {
284 16         544 unlink "$dir/$file";
285             }
286 4         68 unlink "$dir/SINFO";
287 4         100 @$self = ();
288             } #unlink_silo
289              
290              
291             #Makes sure this silo has at least as many entries
292             #as the count given. This creates empty records if needed
293             #to reach the target record count.
294             sub ensure_entry_count {
295 2550     2550 1 4659 my( $self, $count ) = @_;
296              
297 2550         4249 my $ec = $self->entry_count;
298 2550         4541 my $needed = $count - $ec;
299 2550         3805 my $dir = $self->[DIRECTORY];
300 2550         3450 my $rec_size = $self->[RECORD_SIZE];
301 2550         2944 my $rec_per_subsilo = $self->[RECORDS_PER_SUBSILO];
302              
303 2550 100       5000 if( $needed > 0 ) {
304 1992         3961 my( @files ) = $self->subsilos;
305 1992         3323 my $write_file = $files[$#files];
306              
307 1992         20630 my $existing_file_records = int( (-s "$dir/$write_file" ) / $rec_size );
308 1992         3949 my $records_needed_to_fill = $rec_per_subsilo - $existing_file_records;
309 1992 100       3858 $records_needed_to_fill = $needed if $records_needed_to_fill > $needed;
310 1992         2323 my $nulls;
311 1992 100       3186 if( $records_needed_to_fill > 0 ) {
312             # fill the last file up with \0
313             # my $fh = cacheout "+<",
314 1984 50       49598 open my $fh, '+<', "$dir/$write_file" or die "$dir/$write_file : $!";
315             # $fh->autoflush(1);
316 1984         52127 $nulls = "\0" x ( $records_needed_to_fill * $rec_size );
317 1984         2930 my $seek_pos = $rec_size * $existing_file_records;
318 1984         8394 sysseek( $fh, $seek_pos, SEEK_SET );
319 1984         97807 syswrite( $fh, $nulls );
320 1984         14545 close $fh;
321 1984         6477 undef $nulls;
322 1984         6309 $needed -= $records_needed_to_fill;
323             }
324 1992         4559 while( $needed > $rec_per_subsilo ) {
325             # still needed, so create a new file
326 8         20 $write_file++;
327              
328 8 50       104 if( -e "$dir/$write_file" ) {
329 0         0 die "Data::RecordStore::Silo->ensure_entry_count : file $dir/$write_file already exists";
330             }
331 8         324 open( my $fh, ">", "$dir/$write_file" );
332             # $fh->autoflush(1);
333 8         28 print $fh '';
334 8 100       24 unless( $nulls ) {
335 4         44 $nulls = "\0" x ( $rec_per_subsilo * $rec_size );
336             }
337 8         40 sysseek( $fh, 0, SEEK_SET );
338 8         200 syswrite( $fh, $nulls );
339 8         20 $needed -= $rec_per_subsilo;
340 8         84 close $fh;
341             }
342 1992 100       4258 if( $needed > 0 ) {
343             # still needed, so create a new file
344 12         24 $write_file++;
345              
346 12 50       172 if( -e "$dir/$write_file" ) {
347 0         0 die "Data::RecordStore::Silo->ensure_entry_count : file $dir/$write_file already exists";
348             }
349 12         476 open( my $fh, ">", "$dir/$write_file" );
350             # $fh->autoflush(1);
351 12         44 print $fh '';
352 12         116 my $nulls = "\0" x ( $needed * $rec_size );
353 12         60 sysseek( $fh, 0, SEEK_SET );
354 12         240 syswrite( $fh, $nulls );
355 12         128 close $fh;
356             }
357             }
358 2550         5917 $ec = $self->entry_count;
359 2550         4785 return;
360             } #ensure_entry_count
361              
362             #
363             # Returns the list of filenames of the 'silos' of this store. They are numbers starting with 0
364             #
365             sub subsilos {
366 20560     20560 1 24870 my $self = shift;
367 20560         27878 my $dir = $self->[DIRECTORY];
368 20560         24010 my $dh = $self->[DIR_HANDLE];
369 20560 100       32743 if( $dh ) {
370 19010         89025 rewinddir $dh;
371             } else {
372 1550 100       33377 opendir( $dh, $self->[DIRECTORY] ) or die "Data::RecordStore::Silo->subsilos : can't open $dir\n";
373 1546         4936 $self->[DIR_HANDLE] = $dh;
374             }
375 20556 100       247188 my( @files ) = (sort { $a <=> $b } grep { $_ eq '0' || (-s "$dir/$_") > 0 } grep { $_ > 0 || $_ eq '0' } readdir( $dh ) );
  776 100       1300  
  21104         69309  
  82782         298465  
376 20556         45386 return @files;
377             } #subsilos
378              
379              
380             #
381             # Takes a record id and returns a list of
382             # the index of that record within its subsilo file,
383             # a read/write filehandle opened on that subsilo file
384             # (the file path itself is not returned), and
385             # which number subsilo file this is (0 is the first)
386             #
387             sub _fh {
388 6082     6082   11682 my( $self, $id ) = @_;
389              
390 6082         8507 my $dir = $self->[DIRECTORY];
391 6082         7453 my $rec_per_subsilo = $self->[RECORDS_PER_SUBSILO];
392              
393 6082         10807 my $subsilo_idx = int( ($id-1) / $rec_per_subsilo );
394 6082         8117 my $idx_in_f = ($id - ($subsilo_idx*$rec_per_subsilo)) - 1;
395              
396 6082 50       165139 open my $fh, "+<", "$dir/$subsilo_idx" or die "$dir/$subsilo_idx : $!";
397 6082         27137 return $idx_in_f, $fh, $subsilo_idx;
398              
399             } #_fh
400              
401              
402              
403             "Silos are the great hidden constant of the industrialised world.
404             - John Darnielle, Universal Harvester";
405              
406             __END__