File Coverage

blib/lib/ODS/Storage/Directory.pm
Criterion Covered Total %
statement 208 243 85.6
branch 46 68 67.6
condition 14 21 66.6
subroutine 33 39 84.6
pod 0 27 0.0
total 301 398 75.6


line stmt bran cond sub pod time code
1             package ODS::Storage::Directory;
2              
3 70     70   488 use YAOO;
  70         136  
  70         301  
4 70     70   21213 use Cwd qw/getcwd/;
  70         140  
  70         3684  
5 70     70   34484 use Parallel::ForkManager;
  70         3550017  
  70         3724  
6              
7             extends 'ODS::Storage::Base';
8              
9 70     70   570 use ODS::Utils qw/load move unique_class_name error write_directory/;
  70         204  
  70         1113  
10              
11             auto_build;
12              
13             has file_handle => isa(fh);
14              
15             has directory => isa(string);
16              
17             has cache_directory => isa(string);
18              
19             has remove_regex => isa(string("\'\"\."));
20              
21             sub all {
22 106     106 0 1832 my ($self, %params) = @_;
23              
24 106   50     2313 $params{type} ||= 'all';
25 106   66     994 $params{sort} ||= $self->table->keyfield;
26 106   100     2129 $params{sort_direction} ||= 'asc';
27              
28 106         833 my ($data, $from_cache) = $self->into_rows($self->cache_or_all(%params));
29              
30 41         11019 return $data;
31             }
32              
33             sub create {
34 1717 50   1717 0 33773 my ($self, %params) = (shift, @_ > 1 ? @_ : %{ $_[0] });
  1717         15848  
35              
36 1717   66     28486 my $file = $params{__custom_file_name} || sprintf '%s_%s.%s', time, unique_class_name, $self->serialize_class->file_suffix;
37              
38 1717         168439 $params{__file} = $file;
39              
40 1717         6090 $file .= '.tmp';
41              
42 1717         16193 my $data = $self->into_rows(\%params, 1);
43              
44 1717         41491 $data->validate();
45              
46 1717 100       25223 if ($self->table->rows) {
47 1715         27722 push @{ $self->table->rows }, $data;
  1715         4908  
48             } else {
49 2 50 50     28 $self->table->rows(ref($data || "") eq 'ARRAY' ? $data : [$data]);
50             }
51              
52 1717         27991 $data = $self->into_storage($data);
53              
54 1717         11648 $self->write_file(sprintf("%s/%s", $self->directory, $file), $data);
55              
56 1717         219175 $self->cache_clear();
57              
58 1717         11176 $self->table;
59             }
60              
61             sub search {
62 2 50   2 0 62 my ($self, %params) = (shift, @_ > 1 ? @_ : %{ $_[0] });
  0         0  
63              
64 2         12 my $cache_prefix = $self->cache_prefix('search', %params);
65              
66 2         25 my ($data, $from_cache) = $self->cache_or_filter($cache_prefix, %params);
67              
68 2 100 66     33 if (ref $data eq 'ARRAY' && ref $data->[0] eq 'HASH') {
69 1         3 $data = [ map { $self->into_rows($_) } @{ $self->into_rows($data) } ];
  1         11  
  1         11  
70             }
71              
72 2         27 my $table = $self->table->clone();
73 2         9 $table->rows($data);
74 2         73 return ODS::Iterator->new(table => $table);
75             }
76              
77             sub find {
78 2 50   2 0 44 my ($self, %params) = (shift, @_ > 1 ? @_ : %{ $_[0] });
  0         0  
79              
80 2         10 my $cache_prefix = $self->cache_prefix('find', %params);
81              
82 2         9 my ($data, $from_cache) = $self->cache_or_find($cache_prefix, %params);
83              
84 2 50       11 if (ref $data eq 'HASH') {
85 0         0 $data = $self->into_rows($data);
86             }
87              
88 2         9 return $data;
89             }
90              
91             sub update {
92 0     0 0 0 my ($self, $update, %params) = (shift, pop, @_);
93              
94 0         0 my $find = $self->find(%params);
95              
96 0 0       0 croak sprintf "No row found for search params %s", Dumper \%params
97             unless $find;
98              
99 0         0 $find->validate($update);
100              
101 0         0 $self->update_row($find);
102             }
103              
104             sub update_row {
105 3     3 0 24 my ($self, $row) = @_;
106              
107 3         18 my $data = $self->into_storage($row);
108              
109 3   33     12 my $file = $row->__custom_file_name || $row->__file;
110              
111 3         12 $self->write_file(sprintf("%s/%s", $self->directory, $file), $data);
112              
113 3         222 $self->table;
114             }
115              
116             sub delete {
117 1 50   1 0 18 my ($self, %params) = (shift, @_ > 1 ? @_ : %{ $_[0] });
  1         4  
118              
119 1 50       8 my $data = $self->table->rows ? ODS::Iterator->new(table => $self->table) : $self->all;
120              
121             my $index = $data->find_index(sub {
122 2     2   3 my $row = shift;
123 2         3 my $select = 1;
124 2         5 for my $key ( keys %params ) {
125 2 100       6 if ( $params{$key} ne $row->{$key} ) {
126 1         2 $select = undef;
127 1         2 last;
128             }
129             }
130 2         4 $select;
131 1         160 });
132              
133 1         7 my $delete = $data->splice($index, 1);
134              
135 1         13 my $file = $delete->__file;
136              
137 1         4 $self->unlink_file(sprintf("%s/%s", $self->directory, $file));
138              
139 1         8 $self->cache_clear($delete);
140              
141 1         7 $self->table;
142             }
143              
144             sub delete_row {
145 0     0 0 0 my ($self, $r) = @_;
146              
147 0         0 my $data = ODS::Iterator->new(table => $self->table);
148              
149 0         0 my $keyfield = $data->table->keyfield;
150              
151 0         0 my $index;
152 0 0       0 if ($keyfield) {
153             $index = $data->find_index(sub {
154 0     0   0 $_[0]->{$keyfield} eq $r->$keyfield;
155 0         0 });
156             } else {
157             $index = $data->find_index(sub {
158 0     0   0 my $row = shift;
159 0         0 my $select = 1;
160 0         0 for my $key ( keys %{ $row->columns } ) {
  0         0  
161 0 0       0 if ( $r->$key ne $row->{$key} ) {
162 0         0 $select = undef;
163 0         0 last;
164             }
165             }
166 0         0 $select;
167 0         0 });
168             }
169              
170 0         0 my $delete = $data->splice($index, 1);
171              
172 0         0 my $file = $delete->__file;
173              
174 0         0 $self->unlink_file(sprintf("%s/%s", $self->directory, $file));
175              
176 0         0 $self->cache_clear($delete);
177              
178 0         0 $self->table;
179             }
180              
181             sub parse_data_format {
182 0     0 0 0 my ($self, $data) = @_;
183 0         0 return $self->serialize_class->parse($data);
184             }
185              
186             sub stringify_data_format {
187 1720     1720 0 5744 my ($self, $data) = @_;
188 1720         11418 return $self->serialize_class->stringify($data);
189             }
190              
191             # methods very much specific to files
192              
193             sub directory_files_last_updated {
194 110     110 0 343 my ($self) = @_;
195 110         507 my $files = $self->read_directory($self->directory);
196 110         1791 (my $last_update = $files->[-1]) =~ s/(\d+).*/$1/;
197 110         552 return ($files, $last_update);
198             }
199              
200             sub cache_write {
201 40     40 0 482 my ($self, $type, $data) = @_;
202 40         469 my $file = sprintf "%s/%s__%s.%s.tmp", $self->cache_directory, $type, time, $self->serialize_class->file_suffix;
203 40         2012 $self->write_file($file, $data);
204             }
205              
206             sub cache_clear {
207 1718     1718 0 6511 my ($self, $row) = @_;
208 1718         10065 my $files = $self->read_directory($self->cache_directory);
209 1718         5029 for (@{$files}) {
  1718         9817  
210 6         18 my %file_params = $self->cache_parse_filename($_);
211 6         9 my $clear = 1;
212 6 100       13 if (scalar keys %file_params) {
213             PARAM:
214 3         10 for my $key ( keys %file_params ) {
215 5 100       30 next PARAM if $key =~ m/^__/;
216 3 100 100     20 if (!$row || $row->$key ne $file_params{$key}) {
217 2         20 $clear = 0;
218 2         5 last PARAM;
219             }
220             }
221             }
222 6 100       27 $self->unlink_file(sprintf("%s/%s", $self->cache_directory, $_))
223             if $clear;
224             }
225             }
226              
227             sub cache_parse_filename {
228 6     6 0 12 my ($self, $name) = @_;
229 6         7 my %file;
230 6 100       35 return %file unless $name =~ s/^find__//;
231 3         10 my @parts = split "__", $name;
232 3         16 ($file{__create_time} = pop @parts) =~ s/\.\w+$//;
233 3         10 for (@parts) {
234 3         13 my ($key, $value) = split "_", $_;
235 3         12 $file{$key} = $value;
236             }
237 3         14 return %file;
238             }
239              
240              
241             sub cache_prefix {
242 110     110 0 424 my ($self, $type, %args) = @_;
243              
244 110         323 my $regex = $self->remove_regex;
245 110         1085 for my $key ( keys %args ) {
246 216         3376 (my $value = $args{$key}) =~ s/$regex//g;
247 216         1256 $type .= sprintf('__%s_%s', $key, $value);
248             }
249              
250 110         384 return $type;
251             }
252              
253             sub cache_file {
254 110     110 0 346 my ($self, $type) = @_;
255              
256             my @cache_file = grep {
257 50         1234 $_ =~ m/^$type/;
258 110         270 } @{ $self->read_directory($self->cache_directory) };
  110         461  
259              
260 110 100       619 return scalar @cache_file ? sprintf( "%s/%s", $self->cache_directory, $cache_file[0]) : undef;
261             }
262              
263             sub cache_or_all {
264 106     106 0 496 my ($self, %args) = @_;
265              
266 106         376 my $type = delete $args{type};
267              
268 106         696 my $file_prefix = $self->cache_prefix($type, %args);
269              
270 106         950 my ($files, $last_update) = $self->directory_files_last_updated();
271              
272 106         428 my $cache_file = $self->cache_file($file_prefix);
273              
274 106 100       446 if ($cache_file) {
275 4         12 return ($self->serialize_class->parse(
276             $self->read_file($cache_file)
277             ), 1);
278             }
279              
280 102         2180 my $fm = Parallel::ForkManager->new(5000);
281              
282 102         404480 my @data;
283             $fm->run_on_finish(sub {
284 909     909   68806659 my ($pid, $exit_code, $ident, $exit_signal, $core_dump, $data_structure_reference) = @_;
285              
286 909         2770 push @data, $data_structure_reference;
287              
288 102         2095 });
289              
290             READ_FILE:
291 102         1217 for my $file (@{$files}) {
  102         597  
292 1792 100       4083976 my $pid = $fm->start and next READ_FILE;
293              
294 65         637237 my $d = $self->serialize_class->parse(
295             $self->read_file(sprintf("%s/%s", $self->directory, $file))
296             );
297              
298 65         1594 $d->{__file} = $file;
299 65 100       1881 if ($file !~ m/^\d{9}\d+/) {
300 1         20 $d->{__custom_file_name} = $file;
301             }
302              
303              
304 65         2555 $fm->finish(0, $d);
305             }
306              
307 37         97668 $fm->wait_all_children;
308              
309 37 100       828 if ($args{sort_direction} eq 'asc') {
310 36         763 @data = sort { $a->{$args{sort}} cmp $b->{$args{sort}} } @data;
  3126         3778  
311             } else {
312 1         29 @data = sort { $b->{$args{sort}} <=> $a->{$args{sort}} } @data;
  105         129  
313             }
314 37         772 $self->cache_write($file_prefix, $self->serialize_class->stringify(\@data));
315              
316 37         5553 return \@data;
317             }
318              
319             sub cache_or_filter {
320 2     2 0 8 my ($self, $type, %params) = @_;
321              
322 2         9 my ($files, $last_update) = $self->directory_files_last_updated();
323              
324 2         18 my $cache_file = $self->cache_file($type);
325              
326 2 100       21 if ($cache_file) {
327 1         5 return $self->serialize_class->parse(
328             $self->read_file($cache_file)
329             );
330             }
331              
332 1 50       5 my $data = $self->table->rows ? ODS::Iterator->new(table => $self->table) : $self->all();
333              
334             my $select = $data->filter(sub {
335 3     3   10 my $row = shift;
336 3         6 my $select = 1;
337 3         8 for my $key ( keys %params ) {
338 3 100       7 if ( $params{$key} ne $row->{$key} ) {
339 2         3 $select = undef;
340 2         5 last;
341             }
342             }
343 3         15 $select;
344 1         174 });
345              
346 1         10 $self->cache_write($type, $self->serialize_class->stringify([ map { $_->as_hash } @{$select}]));
  1         3  
  1         8  
347              
348 1         116 return $select;
349             }
350              
351             sub cache_or_find {
352 2     2 0 7 my ($self, $type, %params) = @_;
353              
354 2         6 my ($files, $last_update) = $self->directory_files_last_updated();
355              
356 2         13 my $cache_file = $self->cache_file($type);
357              
358 2 50       14 if ($cache_file) {
359 0         0 return $self->serialize_class->parse(
360             $self->read_file($cache_file)
361             );
362             }
363              
364 2 50       12 my $data = $self->table->rows ? ODS::Iterator->new(table => $self->table) : $self->all;
365              
366             # this only works for JSON and YAML, CSS and JSONL we can stream/read rows/lines instead of reading/loading
367             # all into memory.
368             my $select = $data->find(sub {
369 3     3   7 my $row = shift;
370 3         5 my $select = 1;
371 3         7 for my $key ( keys %params ) {
372 3 100       10 if ( $params{$key} ne $row->{$key} ) {
373 1         3 $select = undef;
374 1         2 last;
375             }
376             }
377 3         6 $select;
378 2         349 });
379              
380 2 50       16 $self->cache_write($type, $self->serialize_class->stringify($select->as_hash))
381             if ($select);
382              
383 2         226 return $select;
384             }
385              
386             sub open_file {
387 70     70 0 484 my ($self, $file) = @_;
388 5 50   5   88 open my $fh, '<:encoding(UTF-8)', $file or die "Cannot open file $file for reading: $!";
  5         42  
  5         121  
  70         16822  
389 70         25332 return $fh;
390             }
391              
392             sub open_write_file {
393 1760     1760 0 5223 my ($self, $file) = @_;
394 1760         13657 write_directory($file, 1);
395 1760 50   64   346123 open my $fh, '>:encoding(UTF-8)', $file or die "Cannot open file $file for writing: $!";
  64         1222  
  64         190  
  64         1153  
396 1760         341951 return $fh;
397             }
398              
399             sub seek_file {
400 0     0 0 0 my ($self, @args) = @_;
401 0 0       0 @args = (0, 0) if (!scalar @args);
402 0         0 seek $self->file_handle, shift @args, shift @args;
403             }
404              
405             sub read_file {
406 70     70 0 9422 my ($self, $file) = @_;
407 70         1597 my $fh = $self->open_file($file);
408 70         1395 my $data = do { local $/; <$fh> };
  70         1575  
  70         14295  
409 70         5475 return $data;
410             }
411              
412             sub read_directory {
413 1938     1938 0 22571 my ($self, $directory) = @_;
414 1938         8646 write_directory($directory);
415 1938 50       80803 opendir(my $dh, $directory) || die "Can't opendir $directory: $!";
416 1938         54159 my @files = sort { $a cmp $b } grep { $_ !~ m/^\.+$/ } readdir($dh);
  9841         9621  
  6559         38407  
417 1938         22236 closedir $dh;
418 1938         12893 return \@files;
419             }
420              
421             sub write_file {
422 1760     1760 0 37532 my ($self, $file, $data) = @_;
423 1760         10287 my $fh = $self->open_write_file($file);
424 1760         30550 print $fh $data;
425 1760         9493 $self->close_file($fh);
426 1760         24939 (my $real = $file) =~ s/\.tmp$//;
427 1760         16570 move($file, $real);
428             }
429              
430             sub unlink_file {
431 5     5 0 47 my ($self, $file) = @_;
432 5         275 unlink $file;
433             }
434              
435             sub close_file {
436 1760     1760 0 426227 close $_[1];
437             }
438              
439             1;