File Coverage

blib/lib/Data/RecordStore/Silo.pm
Criterion Covered Total %
statement 219 231 94.8
branch 60 68 88.2
condition 16 16 100.0
subroutine 33 34 97.0
pod 17 17 100.0
total 345 366 94.2


line stmt bran cond sub pod time code
1             package Data::RecordStore::Silo;
2              
3             #
4             # I am a silo. I live in a directory.
5             # I keep my data in silo files in this directory.
6             # Each silo file is allowed to be only so large,
7             # so that is why there may be more than one of them.
8             #
9             # I may be changed by async processes, so a lot
10             # of my coordination and state is on the file system.
11             #
12             # You can init me by giving me a directory,
13             # a template and an optional size and a max size.
14             # I will figure out the record size based on what you give me.
15             # I will default to 2GB for a max size if no max size is given.
16             # I will have limitless size if you give me 0 for a max size
17             # I will save my version, the size, max size and template to the directory
18             #
19             #
20             # You can open me by giving a directory, then
21             # push data to me and I return its id
22             # get data from me after giving me its id
23             # pop data from me
24             # ask how many records I have
25             #
26              
27 13     13   394155 use strict;
  13         62  
  13         428  
28 13     13   69 use warnings;
  13         26  
  13         347  
29 13     13   53 no warnings 'uninitialized';
  13         26  
  13         352  
30 13     13   61 no warnings 'numeric';
  13         19  
  13         308  
31 13     13   146 no strict 'refs';
  13         26  
  13         498  
32              
33 13     13   77 use Fcntl qw( SEEK_SET );
  13         22  
  13         621  
34 13     13   82 use File::Path qw(make_path);
  13         38  
  13         732  
35             #use FileCache;
36 13     13   588 use IO::Handle;
  13         7663  
  13         730  
37 13     13   1711 use YAML;
  13         26614  
  13         661  
38              
39 13     13   89 use vars qw($VERSION);
  13         26  
  13         1171  
40             $VERSION = '6.00';
41              
42             $Data::RecordStore::Silo::DEFAULT_MAX_FILE_SIZE = 2_000_000_000;
43             $Data::RecordStore::Silo::DEFAULT_MIN_FILE_SIZE = 4_096;
44              
45             use constant {
46 13         2761 DIRECTORY => 0,
47             VERSION => 1,
48             TEMPLATE => 2,
49             RECORD_SIZE => 3,
50             MAX_FILE_SIZE => 4,
51             RECORDS_PER_SUBSILO => 5,
52             DIR_HANDLE => 6,
53 13     13   91 };
  13         26  
54              
55              
56             sub open_silo {
57 5315     5315 1 68540 my( $class, $dir, $template, $size, $max_file_size ) = @_;
58              
59 5315 100       12342 if( ! $dir ) {
60 4         36 die "must supply directory to open silo";
61             }
62 5311 100       9695 if( ! $template ) {
63 4         36 die "must supply template to open silo";
64             }
65 13 100   13   108 my $record_size = $template =~ /\*/ ? $size : do { use bytes; length( pack( $template ) ) };
  13         26  
  13         94  
  5307         18401  
  596         2602  
66 5307 100       13235 if( $record_size < 1 ) {
67 4         44 die "no record size given to open silo";
68             }
69 5303 100 100     19947 if( $size && $size != $record_size ) {
70 4         36 die "Silo given size and template size do not match";
71             }
72 5299         449582 make_path( $dir, { error => \my $err } );
73              
74 5299 50       26534 if( @$err ) { die join( ", ", map { $_->{$dir} } @$err ) }
  0         0  
  0         0  
75              
76 5299 100       12701 if( $max_file_size < 1 ) {
77 35         84 $max_file_size = $Data::RecordStore::Silo::DEFAULT_MAX_FILE_SIZE;
78             }
79              
80 5299 100       78589 unless( -e "$dir/0" ) {
81 3308         163684 open my $out, '>', "$dir/config.yaml";
82 3308         25928 print $out <<"END";
83             VERSION: $VERSION
84             TEMPLATE: $template
85             RECORD_SIZE: $record_size
86             MAX_FILE_SIZE: $max_file_size
87             END
88 3308         79270 close $out;
89            
90             # must have at least an empty silo file
91 3308         131556 open $out, '>', "$dir/0";
92 3308         11909 print $out '';
93 3308         31832 close $out;
94             }
95              
96 5299         55256 return bless [
97             $dir,
98             $VERSION,
99             $template,
100             $record_size,
101             $max_file_size,
102             int($max_file_size / $record_size),
103             ], $class;
104             } #open_silo
105              
106             sub reopen_silo {
107 740     740 1 1786 my( $cls, $dir ) = @_;
108 740         1590 my $cfgfile = "$dir/config.yaml";
109 740 50       11271 if( -e $cfgfile ) {
110 740         2798 my $cfg = YAML::LoadFile( $cfgfile );
111 740         1909011 return $cls->open_silo( $dir, @$cfg{qw(TEMPLATE RECORD_SIZE MAX_FILE_SIZE)} );
112             }
113 0         0 die "could not find silo in $dir";
114             } #reopen_silo
115              
116             sub next_id {
117 1967     1967 1 12494 my( $self ) = @_;
118 1967         3892 my $next_id = 1 + $self->entry_count;
119 1967         6515 $self->ensure_entry_count( $next_id );
120 1967         4611 return $next_id;
121             } #next_id
122              
123             sub entry_count {
124             # return how many entries this silo has
125 18464     18464 1 34164 my $self = shift;
126 18464         34687 my @files = $self->subsilos;
127 18464         28746 my $filesize;
128 18464         33346 for my $file (@files) {
129 18844         254462 $filesize += -s "$self->[DIRECTORY]/$file";
130             }
131 18464         98256 return int( $filesize / $self->[RECORD_SIZE] );
132             } #entry_count
133              
134             sub get_record {
135 2943     2943   29320 my( $self, $id, $template, $offset ) = @_;
136 2943         4216 my $rec_size;
137              
138 2943 100       7833 if( $template > 0 ) {
    100          
139 4         12 $rec_size = $template;
140 4         8 $template = $self->[TEMPLATE];
141             } elsif( $template ) {
142 13 100   13   7311 my $template_size = $template =~ /\*/ ? 0 : do { use bytes; length( pack( $template ) ) };
  13         30  
  13         52  
  87         313  
  83         326  
143 87         167 $rec_size = $template_size;
144             }
145             else {
146 2852         4503 $rec_size = $self->[RECORD_SIZE];
147 2852         4486 $template = $self->[TEMPLATE];
148             }
149 2943 100 100     5993 if( $id > $self->entry_count || $id < 1 ) {
150 17         129 die "Data::RecordStore::Silo->get_record : ($$) index $id out of bounds for silo $self->[DIRECTORY]. Silo has entry count of ".$self->entry_count;
151             }
152 2926         8815 my( $idx_in_f, $fh, $subsilo_idx ) = $self->_fh( $id );
153              
154 2926   100     14239 $offset //= 0;
155 2926         5776 my $seek_pos = ( $self->[RECORD_SIZE] * $idx_in_f ) + $offset;
156              
157 2926         14752 sysseek( $fh, $seek_pos, SEEK_SET );
158 2926         63758 my $srv = sysread $fh, (my $data), $rec_size;
159              
160 2926         96832 return [unpack( $template, $data )];
161             } #get_record
162              
163             sub put_record {
164 2540     2540   13682 my( $self, $id, $data, $template, $offset ) = @_;
165              
166 2540 100 100     5834 if( $id > $self->entry_count || $id < 1 ) {
167 20         100 die "Data::RecordStore::Silo->put_record : index $id out of bounds for silo $self->[DIRECTORY]. Store has entry count of ".$self->entry_count;
168             }
169 2520 100       5732 if( ! $template ) {
170 2135         3687 $template = $self->[TEMPLATE];
171             }
172              
173 2520         4253 my $rec_size = $self->[RECORD_SIZE];
174 2520 100       41893 my $to_write = pack( $template, ref $data ? @$data : ($data) );
175              
176             # allows the put_record to grow the data store by no more than one entry
177 13     13   3922 my $write_size = do { use bytes; length( $to_write ) };
  13         48  
  13         125  
  2520         3992  
  2520         4655  
178              
179 2520 100       5180 if( $write_size > $rec_size) {
180 4         36 die "Data::RecordStore::Silo->put_record : record size $write_size too large. Max is $rec_size";
181             }
182              
183 2516         7049 my( $idx_in_f, $fh, $subsilo_idx ) = $self->_fh( $id );
184              
185 2516   100     11802 $offset //= 0;
186 2516         3959 my $seek_pos = $rec_size * $idx_in_f + $offset;
187 2516         12372 sysseek( $fh, $seek_pos, SEEK_SET );
188              
189 2516         50788 syswrite( $fh, $to_write );
190              
191 2516         29236 return 1;
192             } #put_record
193              
194             sub pop {
195 252     252 1 572 my( $self ) = @_;
196 252         587 my $entries = $self->entry_count;
197 252 100       679 unless( $entries ) {
198 4         20 return undef;
199             }
200 248         673 my $ret = $self->get_record( $entries );
201 248         18838 my( $idx_in_f, $fh, $subsilo_idx ) = $self->_fh( $entries );
202              
203 248         844 my $new_subsilo_size = (($entries-1) - ($subsilo_idx * $self->[RECORDS_PER_SUBSILO] ))*$self->[RECORD_SIZE];
204              
205 248 100 100     987 if( $new_subsilo_size || $subsilo_idx == 0 ) {
206 240         20595 truncate $fh, $new_subsilo_size;
207             } else {
208 8         264 unlink "$self->[DIRECTORY]/$subsilo_idx";
209             # FileCache::cacheout_close $fh;
210             }
211              
212 248         4410 return $ret;
213             } #pop
214              
215             sub peek {
216 12     12 1 40 my( $self ) = @_;
217 12         48 my $entries = $self->entry_count;
218 12 100       48 unless( $entries ) {
219 4         24 return undef;
220             }
221 8         28 my $r = $self->get_record( $entries );
222 8         52 return $r;
223             } #peek
224              
225             sub push {
226 1247     1247 1 23099 my( $self, $data ) = @_;
227 1247         2478 my $next_id = $self->next_id;
228              
229 1247         4045 $self->put_record( $next_id, $data );
230              
231 1247         4384 return $next_id;
232             } #push
233              
234              
235              
236 4     4 1 24 sub record_size { return shift->[RECORD_SIZE] }
237 8     8 1 124 sub template { return shift->[TEMPLATE] }
238              
239 4     4 1 28 sub max_file_size { return shift->[MAX_FILE_SIZE] }
240 4     4 1 20 sub records_per_subsilo { return shift->[RECORDS_PER_SUBSILO] }
241              
242             sub size {
243             # return how many bytes of data this silo has
244 52     52 1 132 my $self = shift;
245 52         160 my @files = $self->subsilos;
246 52         108 my $filesize = 0;
247 52         132 for my $file (@files) {
248 124         1424 $filesize += -s "$self->[DIRECTORY]/$file";
249             }
250 52         384 return $filesize;
251             }
252              
253              
254             sub copy_record {
255 71     71 1 7558 my( $self, $from_id, $to_id ) = @_;
256 71         168 my $rec = $self->get_record($from_id);
257 63         294 $self->put_record( $to_id, $rec );
258 55         235 return $rec;
259             } #copy_record
260              
261              
262             #
263             # Destroys all the data in the silo
264             #
265             sub empty_silo {
266 0     0 1 0 my $self = shift;
267 0         0 my $dir = $self->[DIRECTORY];
268 0         0 for my $file ($self->subsilos) {
269 0 0       0 if( $file eq '0' ) {
270 0         0 open my $fh, '+<', "$dir/0";
271 0         0 truncate $fh, 0;
272             } else {
273 0         0 unlink "$dir/$file";
274             }
275             }
276             } #empty_silo
277              
278             # destroys the silo. The silo will not be
279             # functional after this call.
280             sub unlink_silo {
281 4     4 1 2316 my $self = shift;
282 4         12 my $dir = $self->[DIRECTORY];
283 4         12 for my $file ($self->subsilos) {
284 16         612 unlink "$dir/$file";
285             }
286 4         80 unlink "$dir/SINFO";
287 4         112 @$self = ();
288             } #unlink_silo
289              
290              
291             #Makes sure this silo has at least as many entries
292             #as the count given. This creates empty records if needed
293             #to rearch the target record count.
294             sub ensure_entry_count {
295 2550     2550 1 5017 my( $self, $count ) = @_;
296              
297 2550         5278 my $ec = $self->entry_count;
298 2550         5494 my $needed = $count - $ec;
299 2550         4412 my $dir = $self->[DIRECTORY];
300 2550         4272 my $rec_size = $self->[RECORD_SIZE];
301 2550         3683 my $rec_per_subsilo = $self->[RECORDS_PER_SUBSILO];
302              
303 2550 100       5866 if( $needed > 0 ) {
304 1992         4684 my( @files ) = $self->subsilos;
305 1992         4132 my $write_file = $files[$#files];
306              
307 1992         24614 my $existing_file_records = int( (-s "$dir/$write_file" ) / $rec_size );
308 1992         4998 my $records_needed_to_fill = $rec_per_subsilo - $existing_file_records;
309 1992 100       4652 $records_needed_to_fill = $needed if $records_needed_to_fill > $needed;
310 1992         2902 my $nulls;
311 1992 100       3603 if( $records_needed_to_fill > 0 ) {
312             # fill the last file up with \0
313             # my $fh = cacheout "+<",
314 1984 50       58316 open my $fh, '+<', "$dir/$write_file" or die "$dir/$write_file : $!";
315             # $fh->autoflush(1);
316 1984         63678 $nulls = "\0" x ( $records_needed_to_fill * $rec_size );
317 1984         3395 my $seek_pos = $rec_size * $existing_file_records;
318 1984         10637 sysseek( $fh, $seek_pos, SEEK_SET );
319 1984         115615 syswrite( $fh, $nulls );
320 1984         18871 close $fh;
321 1984         7519 undef $nulls;
322 1984         8194 $needed -= $records_needed_to_fill;
323             }
324 1992         6006 while( $needed > $rec_per_subsilo ) {
325             # still needed, so create a new file
326 8         28 $write_file++;
327              
328 8 50       140 if( -e "$dir/$write_file" ) {
329 0         0 die "Data::RecordStore::Silo->ensure_entry_count : file $dir/$write_file already exists";
330             }
331 8         392 open( my $fh, ">", "$dir/$write_file" );
332             # $fh->autoflush(1);
333 8         32 print $fh '';
334 8 100       32 unless( $nulls ) {
335 4         56 $nulls = "\0" x ( $rec_per_subsilo * $rec_size );
336             }
337 8         52 sysseek( $fh, 0, SEEK_SET );
338 8         248 syswrite( $fh, $nulls );
339 8         28 $needed -= $rec_per_subsilo;
340 8         100 close $fh;
341             }
342 1992 100       5245 if( $needed > 0 ) {
343             # still needed, so create a new file
344 12         24 $write_file++;
345              
346 12 50       220 if( -e "$dir/$write_file" ) {
347 0         0 die "Data::RecordStore::Silo->ensure_entry_count : file $dir/$write_file already exists";
348             }
349 12         720 open( my $fh, ">", "$dir/$write_file" );
350             # $fh->autoflush(1);
351 12         60 print $fh '';
352 12         144 my $nulls = "\0" x ( $needed * $rec_size );
353 12         84 sysseek( $fh, 0, SEEK_SET );
354 12         292 syswrite( $fh, $nulls );
355 12         156 close $fh;
356             }
357             }
358 2550         7657 $ec = $self->entry_count;
359 2550         5974 return;
360             } #ensure_entry_count
361              
362             #
363             # Returns the list of filenames of the 'silos' of this store. They are numbers starting with 0
364             #
365             sub subsilos {
366 20560     20560 1 29028 my $self = shift;
367 20560         33325 my $dir = $self->[DIRECTORY];
368 20560         28830 my $dh = $self->[DIR_HANDLE];
369 20560 100       39347 if( $dh ) {
370 19010         106601 rewinddir $dh;
371             } else {
372 1550 100       39315 opendir( $dh, $self->[DIRECTORY] ) or die "Data::RecordStore::Silo->subsilos : can't open $dir\n";
373 1546         5893 $self->[DIR_HANDLE] = $dh;
374             }
375 20556 100       306924 my( @files ) = (sort { $a <=> $b } grep { $_ eq '0' || (-s "$dir/$_") > 0 } grep { $_ > 0 || $_ eq '0' } readdir( $dh ) );
  776 100       1608  
  21104         87761  
  82782         373425  
376 20556         57752 return @files;
377             } #subsilos
378              
379              
380             #
381             # Takes an insertion id and returns
382             # an insertion index for in the file
383             # filehandle.
384             # filepath/filename
385             # which number file this is (0 is the first)
386             #
387             sub _fh {
388 6082     6082   15097 my( $self, $id ) = @_;
389              
390 6082         9423 my $dir = $self->[DIRECTORY];
391 6082         8994 my $rec_per_subsilo = $self->[RECORDS_PER_SUBSILO];
392              
393 6082         11745 my $subsilo_idx = int( ($id-1) / $rec_per_subsilo );
394 6082         9871 my $idx_in_f = ($id - ($subsilo_idx*$rec_per_subsilo)) - 1;
395              
396 6082 50       184249 open my $fh, "+<", "$dir/$subsilo_idx" or die "$dir/$subsilo_idx : $!";
397 6082         33076 return $idx_in_f, $fh, $subsilo_idx;
398              
399             } #_fh
400              
401              
402              
403             "Silos are the great hidden constant of the industrialised world.
404             - John Darnielle, Universal Harvester";
405              
406             __END__