File Coverage

blib/lib/ODS/Storage/Directory.pm
Criterion Covered Total %
statement 215 251 85.6
branch 51 74 68.9
condition 14 21 66.6
subroutine 34 40 85.0
pod 0 28 0.0
total 314 414 75.8


line stmt bran cond sub pod time code
1             package ODS::Storage::Directory;
2              
3 70     70   474 use YAOO;
  70         140  
  70         453  
4 70     70   21955 use Cwd qw/getcwd/;
  70         135  
  70         3759  
5 70     70   34651 use Parallel::ForkManager;
  70         4373442  
  70         3428  
6              
7             extends 'ODS::Storage::Base';
8              
9 70     70   993 use ODS::Utils qw/load move unique_class_name error/;
  70         199  
  70         1071  
10              
11             auto_build;
12              
13             has file_handle => isa(fh);
14              
15             has directory => isa(string);
16              
17             has cache_directory => isa(string);
18              
19             has remove_regex => isa(string("\'\"\."));
20              
21             sub all {
22 107     107 0 1979 my ($self, %params) = @_;
23              
24 107   50     1428 $params{type} ||= 'all';
25 107   66     1079 $params{sort} ||= $self->table->keyfield;
26 107   100     2331 $params{sort_direction} ||= 'asc';
27              
28 107         814 my ($data, $from_cache) = $self->into_rows($self->cache_or_all(%params));
29              
30 41         10804 return $data;
31             }
32              
33             sub create {
34 1714 50   1714 0 36325 my ($self, %params) = (shift, @_ > 1 ? @_ : %{ $_[0] });
  1714         16598  
35              
36 1714   66     23044 my $file = $params{__custom_file_name} || sprintf '%s_%s.%s', time, unique_class_name, $self->serialize_class->file_suffix;
37              
38 1714         162844 $params{__file} = $file;
39              
40 1714         6618 $file .= '.tmp';
41              
42 1714         19425 my $data = $self->into_rows(\%params, 1);
43              
44 1714         33020 $data->validate();
45              
46 1714 100       15654 if ($self->table->rows) {
47 1713         28285 push @{ $self->table->rows }, $data;
  1713         5551  
48             } else {
49 1 50 50     13 $self->table->rows(ref($data || "") eq 'ARRAY' ? $data : [$data]);
50             }
51              
52 1714         26088 $data = $self->into_storage($data);
53              
54 1714         11989 $self->write_file(sprintf("%s/%s", $self->directory, $file), $data);
55              
56 1714         203836 $self->cache_clear();
57              
58 1714         9102 $self->table;
59             }
60              
61             sub search {
62 2 50   2 0 55 my ($self, %params) = (shift, @_ > 1 ? @_ : %{ $_[0] });
  0         0  
63              
64 2         23 my $cache_prefix = $self->cache_prefix('search', %params);
65              
66 2         20 my ($data, $from_cache) = $self->cache_or_filter($cache_prefix, %params);
67              
68 2 100 66     35 if (ref $data eq 'ARRAY' && ref $data->[0] eq 'HASH') {
69 1         3 $data = [ map { $self->into_rows($_) } @{ $self->into_rows($data) } ];
  1         17  
  1         10  
70             }
71              
72 2         27 my $table = $self->table->clone();
73 2         9 $table->rows($data);
74 2         84 return ODS::Iterator->new(table => $table);
75             }
76              
77             sub find {
78 2 50   2 0 30 my ($self, %params) = (shift, @_ > 1 ? @_ : %{ $_[0] });
  0         0  
79              
80 2         7 my $cache_prefix = $self->cache_prefix('find', %params);
81              
82 2         17 my ($data, $from_cache) = $self->cache_or_find($cache_prefix, %params);
83              
84 2 50       10 if (ref $data eq 'HASH') {
85 0         0 $data = $self->into_rows($data);
86             }
87              
88 2         6 return $data;
89             }
90              
91             sub update {
92 0     0 0 0 my ($self, $update, %params) = (shift, pop, @_);
93              
94 0         0 my $find = $self->find(%params);
95              
96 0 0       0 croak sprintf "No row found for search params %s", Dumper \%params
97             unless $find;
98              
99 0         0 $find->validate($update);
100              
101 0         0 $self->update_row($find);
102             }
103              
104             sub update_row {
105 2     2 0 18 my ($self, $row) = @_;
106              
107 2         10 my $data = $self->into_storage($row);
108              
109 2   33     11 my $file = $row->__custom_file_name || $row->__file;
110              
111 2         7 $self->write_file(sprintf("%s/%s", $self->directory, $file), $data);
112              
113 2         122 $self->table;
114             }
115              
116             sub delete {
117 1 50   1 0 16 my ($self, %params) = (shift, @_ > 1 ? @_ : %{ $_[0] });
  1         5  
118              
119 1 50       3 my $data = $self->table->rows ? ODS::Iterator->new(table => $self->table) : $self->all;
120              
121             my $index = $data->find_index(sub {
122 2     2   3 my $row = shift;
123 2         6 my $select = 1;
124 2         8 for my $key ( keys %params ) {
125 2 100       30 if ( $params{$key} ne $row->{$key} ) {
126 1         3 $select = undef;
127 1         2 last;
128             }
129             }
130 2         5 $select;
131 1         144 });
132              
133 1         6 my $delete = $data->splice($index, 1);
134              
135 1         12 my $file = $delete->__file;
136              
137 1         5 $self->unlink_file(sprintf("%s/%s", $self->directory, $file));
138              
139 1         7 $self->cache_clear($delete);
140              
141 1         8 $self->table;
142             }
143              
144             sub delete_row {
145 0     0 0 0 my ($self, $r) = @_;
146              
147 0         0 my $data = ODS::Iterator->new(table => $self->table);
148              
149 0         0 my $keyfield = $data->table->keyfield;
150              
151 0         0 my $index;
152 0 0       0 if ($keyfield) {
153             $index = $data->find_index(sub {
154 0     0   0 $_[0]->{$keyfield} eq $r->$keyfield;
155 0         0 });
156             } else {
157             $index = $data->find_index(sub {
158 0     0   0 my $row = shift;
159 0         0 my $select = 1;
160 0         0 for my $key ( keys %{ $row->columns } ) {
  0         0  
161 0 0       0 if ( $r->$key ne $row->{$key} ) {
162 0         0 $select = undef;
163 0         0 last;
164             }
165             }
166 0         0 $select;
167 0         0 });
168             }
169              
170 0         0 my $delete = $data->splice($index, 1);
171              
172 0         0 my $file = $delete->__file;
173              
174 0         0 $self->unlink_file(sprintf("%s/%s", $self->directory, $file));
175              
176 0         0 $self->cache_clear($delete);
177              
178 0         0 $self->table;
179             }
180              
181             sub parse_data_format {
182 0     0 0 0 my ($self, $data) = @_;
183 0         0 return $self->serialize_class->parse($data);
184             }
185              
186             sub stringify_data_format {
187 1716     1716 0 6562 my ($self, $data) = @_;
188 1716         8586 return $self->serialize_class->stringify($data);
189             }
190              
191             # methods very much specific to files
192              
193             sub directory_files_last_updated {
194 111     111 0 316 my ($self) = @_;
195 111         552 my $files = $self->read_directory($self->directory);
196 111         2333 (my $last_update = $files->[-1]) =~ s/(\d+).*/$1/;
197 111         958 return ($files, $last_update);
198             }
199              
200             sub cache_write {
201 44     44 0 443 my ($self, $type, $data) = @_;
202 44         648 my $file = sprintf "%s/%s__%s.%s.tmp", $self->cache_directory, $type, time, $self->serialize_class->file_suffix;
203 44         2545 $self->write_file($file, $data);
204             }
205              
206             sub cache_clear {
207 1715     1715 0 5558 my ($self, $row) = @_;
208 1715         9455 my $files = $self->read_directory($self->cache_directory);
209 1715         5259 for (@{$files}) {
  1715         10170  
210 6         18 my %file_params = $self->cache_parse_filename($_);
211 6         10 my $clear = 1;
212 6 100       12 if (scalar keys %file_params) {
213             PARAM:
214 3         6 for my $key ( keys %file_params ) {
215 5 100       34 next PARAM if $key =~ m/^__/;
216 3 100 100     16 if (!$row || $row->$key ne $file_params{$key}) {
217 2         18 $clear = 0;
218 2         4 last PARAM;
219             }
220             }
221             }
222 6 100       20 $self->unlink_file(sprintf("%s/%s", $self->cache_directory, $_))
223             if $clear;
224             }
225             }
226              
227             sub cache_parse_filename {
228 6     6 0 12 my ($self, $name) = @_;
229 6         7 my %file;
230 6 100       32 return %file unless $name =~ s/^find__//;
231 3         11 my @parts = split "__", $name;
232 3         19 ($file{__create_time} = pop @parts) =~ s/\.\w+$//;
233 3         7 for (@parts) {
234 3         9 my ($key, $value) = split "_", $_;
235 3         8 $file{$key} = $value;
236             }
237 3         12 return %file;
238             }
239              
240              
241             sub cache_prefix {
242 111     111 0 459 my ($self, $type, %args) = @_;
243              
244 111         505 my $regex = $self->remove_regex;
245 111         1290 for my $key ( keys %args ) {
246 218         3643 (my $value = $args{$key}) =~ s/$regex//g;
247 218         1204 $type .= sprintf('__%s_%s', $key, $value);
248             }
249              
250 111         404 return $type;
251             }
252              
253             sub cache_file {
254 111     111 0 463 my ($self, $type) = @_;
255              
256             my @cache_file = grep {
257 51         1194 $_ =~ m/^$type/;
258 111         255 } @{ $self->read_directory($self->cache_directory) };
  111         1017  
259              
260 111 100       1768 return scalar @cache_file ? sprintf( "%s/%s", $self->cache_directory, $cache_file[0]) : undef;
261             }
262              
263             sub cache_or_all {
264 107     107 0 467 my ($self, %args) = @_;
265              
266 107         345 my $type = delete $args{type};
267              
268 107         503 my $file_prefix = $self->cache_prefix($type, %args);
269              
270 107         600 my ($files, $last_update) = $self->directory_files_last_updated();
271              
272 107         932 my $cache_file = $self->cache_file($file_prefix);
273              
274 107 50       573 if ($cache_file) {
275 0         0 return ($self->serialize_class->parse(
276             $self->read_file($cache_file)
277             ), 1);
278             }
279              
280 107         2669 my $fm = Parallel::ForkManager->new(5000);
281              
282 107         633556 my @data;
283             $fm->run_on_finish(sub {
284 913     913   72396215 my ($pid, $exit_code, $ident, $exit_signal, $core_dump, $data_structure_reference) = @_;
285              
286 913         3960 push @data, $data_structure_reference;
287              
288 107         3559 });
289              
290             READ_FILE:
291 107         2212 for my $file (@{$files}) {
  107         939  
292 1797 100       2030845 my $pid = $fm->start and next READ_FILE;
293              
294 66         646052 my $d = $self->serialize_class->parse(
295             $self->read_file(sprintf("%s/%s", $self->directory, $file))
296             );
297              
298 66         1214 $d->{__file} = $file;
299 66 100       1978 if ($file !~ m/^\d{9}\d+/) {
300 2         21 $d->{__custom_file_name} = $file;
301             }
302              
303              
304 66         2255 $fm->finish(0, $d);
305             }
306              
307 41         60715 $fm->wait_all_children;
308              
309 41 100       961 if ($args{sort_direction} eq 'asc') {
310 40         738 @data = sort { $a->{$args{sort}} cmp $b->{$args{sort}} } @data;
  3066         5225  
311             } else {
312 1         36 @data = sort { $b->{$args{sort}} <=> $a->{$args{sort}} } @data;
  105         148  
313             }
314 41         1283 $self->cache_write($file_prefix, $self->serialize_class->stringify(\@data));
315              
316 41         7667 return \@data;
317             }
318              
319             sub cache_or_filter {
320 2     2 0 17 my ($self, $type, %params) = @_;
321              
322 2         16 my ($files, $last_update) = $self->directory_files_last_updated();
323              
324 2         27 my $cache_file = $self->cache_file($type);
325              
326 2 100       26 if ($cache_file) {
327 1         10 return $self->serialize_class->parse(
328             $self->read_file($cache_file)
329             );
330             }
331              
332 1 50       8 my $data = $self->table->rows ? ODS::Iterator->new(table => $self->table) : $self->all();
333              
334             my $select = $data->filter(sub {
335 3     3   11 my $row = shift;
336 3         5 my $select = 1;
337 3         7 for my $key ( keys %params ) {
338 3 100       15 if ( $params{$key} ne $row->{$key} ) {
339 2         4 $select = undef;
340 2         6 last;
341             }
342             }
343 3         20 $select;
344 1         261 });
345              
346 1         8 $self->cache_write($type, $self->serialize_class->stringify([ map { $_->as_hash } @{$select}]));
  1         9  
  1         7  
347              
348 1         128 return $select;
349             }
350              
351             sub cache_or_find {
352 2     2 0 6 my ($self, $type, %params) = @_;
353              
354 2         9 my ($files, $last_update) = $self->directory_files_last_updated();
355              
356 2         7 my $cache_file = $self->cache_file($type);
357              
358 2 50       8 if ($cache_file) {
359 0         0 return $self->serialize_class->parse(
360             $self->read_file($cache_file)
361             );
362             }
363              
364 2 50       6 my $data = $self->table->rows ? ODS::Iterator->new(table => $self->table) : $self->all;
365              
366             # this only works for JSON and YAML, CSS and JSONL we can stream/read rows/lines instead of reading/loading
367             # all into memory.
368             my $select = $data->find(sub {
369 3     3   6 my $row = shift;
370 3         4 my $select = 1;
371 3         7 for my $key ( keys %params ) {
372 3 100       9 if ( $params{$key} ne $row->{$key} ) {
373 1         2 $select = undef;
374 1         2 last;
375             }
376             }
377 3         6 $select;
378 2         291 });
379              
380 2 50       13 $self->cache_write($type, $self->serialize_class->stringify($select->as_hash))
381             if ($select);
382              
383 2         162 return $select;
384             }
385              
386             sub open_file {
387 67     67 0 784 my ($self, $file) = @_;
388 2 50   2   65 open my $fh, '<:encoding(UTF-8)', $file or die "Cannot open file $file for reading: $!";
  2         19  
  2         71  
  67         15032  
389 67         41451 return $fh;
390             }
391              
392             sub open_write_file {
393 1760     1760 0 6553 my ($self, $file) = @_;
394 1760 50   68   285190 open my $fh, '>:encoding(UTF-8)', $file or die "Cannot open file $file for writing: $!";
  68         1597  
  68         573  
  68         1424  
395 1760         997738 return $fh;
396             }
397              
398             sub seek_file {
399 0     0 0 0 my ($self, @args) = @_;
400 0 0       0 @args = (0, 0) if (!scalar @args);
401 0         0 seek $self->file_handle, shift @args, shift @args;
402             }
403              
404             sub read_file {
405 67     67 0 6772 my ($self, $file) = @_;
406 67         1718 my $fh = $self->open_file($file);
407 67         499 my $data = do { local $/; <$fh> };
  67         1590  
  67         7484  
408 67         5834 return $data;
409             }
410              
411             sub read_directory {
412 1937     1937 0 22699 my ($self, $directory) = @_;
413 1937         6190 $self->write_directory($directory);
414 1937 50       72826 opendir(my $dh, $directory) || die "Can't opendir $directory: $!";
415 1937         278916 my @files = sort { $a cmp $b } grep { $_ !~ m/^\.+$/ } readdir($dh);
  10021         17154  
  6559         41972  
416 1937         22881 closedir $dh;
417 1937         15216 return \@files;
418             }
419              
420             sub write_directory {
421 3697     3697 0 7800 my ($self, $directory, $file) = @_;
422 3697         21759 my @parts = split '/', $directory;
423 3697         9763 my $path = '';
424 3697 100       11014 pop @parts if $file;
425 3697         13646 while (@parts) {
426 18485 100       43314 $path .= "/" if $path;
427 18485         23536 $path .= shift @parts;
428 18485 100       266936 mkdir $path unless -d $path;
429             }
430             }
431              
432             sub write_file {
433 1760     1760 0 38442 my ($self, $file, $data) = @_;
434 1760         10378 $self->write_directory($file, 1);
435 1760         9018 my $fh = $self->open_write_file($file);
436 1760         29568 print $fh $data;
437 1760         9611 $self->close_file($fh);
438 1760         20643 (my $real = $file) =~ s/\.tmp$//;
439 1760         17475 move($file, $real);
440             }
441              
442             sub unlink_file {
443 5     5 0 43 my ($self, $file) = @_;
444 5         234 unlink $file;
445             }
446              
447             sub close_file {
448 1760     1760 0 158336 close $_[1];
449             }
450              
451             1;