File Coverage

lib/Flux/File.pm
Criterion Covered Total %
statement 19 21 90.4
branch n/a
condition n/a
subroutine 7 7 100.0
pod n/a
total 26 28 92.8


line stmt bran cond sub pod time code
1             package Flux::File;
2             {
3             $Flux::File::VERSION = '1.01';
4             }
5              
6             # ABSTRACT: file storage
7              
8              
9 1     1   180226 use Moo;
  1         17079  
  1         10  
10             with
11             'Flux::Storage',
12             'Flux::Role::Owned',
13             'Flux::Role::Description',
14             ;
15              
16 1     1   2786 use MooX::Types::MooseLike::Base qw(:all);
  1         7673  
  1         534  
17              
18 1     1   1180 use Params::Validate qw(:all);
  1         8952  
  1         270  
19              
20 1     1   11 use Carp;
  1         1  
  1         62  
21 1     1   6 use Fcntl qw(SEEK_SET SEEK_CUR SEEK_END);
  1         2  
  1         54  
22 1     1   979 use IO::Handle;
  1         8389  
  1         80  
23 1     1   518 use Lock::File qw(lockfile);
  0            
  0            
24             use Flux::File::In;
25             use Flux::File::Cursor;
26              
27             use autodie;
28              
29             sub BUILDARGS {
30             my $class = shift;
31             my ($file, $p) = validate_pos(@_, 1, { type => HASHREF, default => {} });
32             return { file => $file, %$p };
33             }
34              
35             has file => (
36             is => 'ro',
37             isa => Str,
38             required => sub { 1 },
39             );
40              
41             has lock => (
42             is => 'ro',
43             isa => Bool,
44             default => sub { 1 },
45             );
46              
47             has safe => (
48             is => 'ro',
49             isa => Bool,
50             default => sub { 1 },
51             );
52              
53             has reopen => (
54             is => 'ro',
55             isa => Bool,
56             default => sub { 0 },
57             );
58              
59             sub description {
60             my $self = shift;
61             return "file: ".$self->file;
62             }
63              
64             sub _open {
65             my ($self) = @_;
66              
67             unless (-f $self->file) {
68             # touch file, so we can open it for rw later
69             # there is still a small race - file can be renamed after this open and before the second open
70             open(my $f, '>>', $self->file);
71             close($f);
72             }
73              
74             my $mode = $self->safe ? "+<" : ">>";
75              
76             open($self->{fh}, $mode, $self->file);
77             my $lock = $self->_lockfile;
78             if ($self->safe) {
79             $self->_truncate;
80             }
81              
82             return $lock;
83             }
84              
85             sub _lockfile {
86             my $self = shift;
87             return unless ($self->lock);
88             die "no filehandle" unless ($self->{fh});
89             my $lock = lockfile($self->{fh});
90             return $lock;
91             }
92              
93             sub _truncate {
94             my $self = shift;
95              
96             # return if it is an empty file
97             my $f = $self->{fh};
98             sysseek($f, 0, SEEK_END);
99             my $fsize = sysseek($f, 0, SEEK_CUR);
100             return if ($fsize == 0);
101              
102             # initially we check only last byte and if it is a "\n",
103             # then it's all ok
104             sysseek($f, -1, SEEK_END);
105             my $eof_byte = _sysread($f, 1);
106             return if ($eof_byte eq "\n");
107              
108             my $cur_pos = $fsize;
109             while (1) {
110             # we have reached beginning of the file and haven't found "\n",
111             # so we truncate file entirely
112             if ($cur_pos == 0) {
113             sysseek($f, 0, SEEK_SET);
114             $f->truncate(0);
115             last;
116             }
117              
118             # we read file in reverse order by chunks with $read_portion size.
119             # if current position is near of the beginning, we read
120             # all remained bytes from the start of the file
121             my $read_portion = 1024;
122             $read_portion = $cur_pos if ($cur_pos < $read_portion);
123             sysseek($f, $cur_pos - $read_portion, SEEK_SET);
124             my $s = _sysread($f, $read_portion);
125              
126             # try to find last index of "\n" in the chunk
127             my $index;
128             while (1) {
129             my $index_pos = 0;
130             $index_pos = $index + 1 if (defined($index));
131             my $new_index = index($s, "\n", $index_pos);
132             if ($new_index < 0) {
133             last;
134             } else {
135             $index = $new_index;
136             }
137             }
138              
139             # if found, then we can truncate file
140             if (defined($index)) {
141             my $new_pos = $cur_pos - $read_portion + $index +1;
142             sysseek($f, $new_pos, SEEK_SET);
143             $f->truncate($new_pos);
144             last;
145             }
146              
147             # else try to read chunk nearer to the beginning of file
148             $cur_pos -= $read_portion;
149             }
150             }
151              
152             sub _sysread {
153             my ($f, $length) = @_;
154              
155             my $line;
156             my $offset = 0;
157             my $left = $length;
158             while ($left) {
159             my $bytes = $f->sysread($line, $left, $offset);
160             if (not defined $bytes) {
161             die "sysread failed: $!";
162             } elsif ($bytes == 0) {
163             die "sysread no progress";
164             } else {
165             $offset += $bytes;
166             $left -= $bytes;
167             }
168             }
169              
170             return $line;
171             }
172              
173             sub _write {
174             my ($self) = @_;
175             my $left = length $self->{data};
176             my $offset = 0;
177             while ($left) {
178             my $bytes = $self->{fh}->syswrite($self->{data}, $left, $offset);
179             if (not defined $bytes) {
180             die "syswrite failed: $!";
181             } elsif ($bytes == 0) {
182             die "syswrite no progress";
183             } else {
184             $offset += $bytes;
185             $left -= $bytes;
186             }
187             }
188             delete $self->{data};
189             }
190              
191             sub _flush {
192             my ($self) = @_;
193             return unless defined $self->{data};
194              
195             my $lock;
196             if (!$self->{fh} || $self->reopen) {
197             $lock = $self->_open;
198             } else {
199             $lock = $self->_lockfile;
200             sysseek($self->{fh}, 0, SEEK_END) if $self->safe;
201             }
202              
203             $self->_write();
204             }
205              
206             sub write {
207             my ($self, $line) = @_;
208              
209             $self->write_chunk([$line]);
210             }
211              
212             sub write_chunk {
213             my ($self, $chunk) = @_;
214             croak "write_chunk method expects arrayref" unless ref($chunk) eq 'ARRAY'; # can chunks be blessed into something?
215             return unless @$chunk;
216             for my $line (@$chunk) {
217             die "invalid line $line" if ($line !~ /\n\z/);
218             if (defined $self->{data}) {
219             $self->{data} .= $line;
220             }
221             else {
222             $self->{data} = $line;
223             }
224             }
225             if (length($self->{data}) > 1_000) {
226             $self->_flush;
227             }
228             return; # TODO - what useful data can we return?
229             }
230              
231             sub commit {
232             my ($self) = @_;
233             $self->_flush;
234             }
235              
236             sub in {
237             my $self = shift;
238             my ($posfile) = validate_pos(@_, SCALAR);
239              
240             return Flux::File::In->new(cursor => Flux::File::Cursor->new(posfile => $posfile), file => $self->file);
241             }
242              
243             sub owner {
244             my ($self) = @_;
245             if (-e $self->file) {
246             return scalar getpwuid( (stat($self->file))[4] );
247             }
248             else {
249             return scalar getpwuid($>);
250             }
251             }
252              
253              
254             1;
255              
256             __END__