File Coverage

blib/lib/IO/Async/FileStream.pm
Criterion Covered Total %
statement 104 109 95.4
branch 36 52 69.2
condition 4 8 50.0
subroutine 16 18 88.8
pod 4 7 57.1
total 164 194 84.5


line stmt bran cond sub pod time code
1             # You may distribute under the terms of either the GNU General Public License
2             # or the Artistic License (the same terms as Perl itself)
3             #
4             # (C) Paul Evans, 2011-2015 -- leonerd@leonerd.org.uk
5              
6             package IO::Async::FileStream;
7              
8 2     2   1253 use strict;
  2         7  
  2         81  
9 2     2   11 use warnings;
  2         4  
  2         70  
10              
11             our $VERSION = '0.802';
12              
13 2     2   9 use base qw( IO::Async::Stream );
  2         4  
  2         642  
14              
15 2     2   364 use IO::Async::File;
  2         5  
  2         66  
16              
17 2     2   9 use Carp;
  2         4  
  2         99  
18 2     2   12 use Fcntl qw( SEEK_SET SEEK_CUR );
  2         3  
  2         2153  
19              
20             =head1 NAME
21              
22             C - read the tail of a file
23              
24             =head1 SYNOPSIS
25              
26             use IO::Async::FileStream;
27              
28             use IO::Async::Loop;
29             my $loop = IO::Async::Loop->new;
30              
31             open my $logh, "<", "var/logs/daemon.log" or
32             die "Cannot open logfile - $!";
33              
34             my $filestream = IO::Async::FileStream->new(
35             read_handle => $logh,
36              
37             on_initial => sub {
38             my ( $self ) = @_;
39             $self->seek_to_last( "\n" );
40             },
41              
42             on_read => sub {
43             my ( $self, $buffref ) = @_;
44              
45             while( $$buffref =~ s/^(.*\n)// ) {
46             print "Received a line $1";
47             }
48              
49             return 0;
50             },
51             );
52              
53             $loop->add( $filestream );
54              
55             $loop->run;
56              
57             =head1 DESCRIPTION
58              
59             This subclass of L allows reading the end of a regular file
60             which is being appended to by some other process. It invokes the C
61             event when more data has been added to the file.
62              
63             This class provides an API identical to L when given a
64             C; it should be treated similarly. In particular, it can be given
65             an C handler, or subclassed to provide an C method, or even
66             used as the C for an L object.
67              
68             It will not support writing.
69              
70             To watch a file, directory, or other filesystem entity for updates of other
71             properties, such as C, see also L.
72              
73             =cut
74              
75             =head1 EVENTS
76              
77             The following events are invoked, either using subclass methods or CODE
78             references in parameters.
79              
80             Because this is a subclass of L in read-only mode, all the
81             events supported by C relating to the read handle are supported here.
82             This is not a full list; see also the documentation relating to
83             L.
84              
85             =head2 $ret = on_read \$buffer, $eof
86              
87             Invoked when more data is available in the internal receiving buffer.
88              
89             Note that C<$eof> only indicates that all the data currently available in the
90             file has now been read; in contrast to a regular L, this
91             object will not stop watching after this condition. Instead, it will continue
92             watching the file for updates.
93              
94             =head2 on_truncated
95              
96             Invoked when the file size shrinks. If this happens, it is presumed that the
97             file content has been replaced. Reading will then commence from the start of
98             the file.
99              
100             =head2 on_initial $size
101              
102             Invoked the first time the file is looked at. It is passed the initial size of
103             the file. The code implementing this method can use the C or
104             C methods to set the initial read position in the file to skip
105             over some initial content.
106              
107             This method may be useful to skip initial content in the file, if the object
108             should only respond to new content added after it was created.
109              
110             =cut
111              
112             sub _init
113             {
114 7     7   22 my $self = shift;
115 7         22 my ( $params ) = @_;
116              
117 7         46 $self->SUPER::_init( $params );
118              
119 7         23 $params->{close_on_read_eof} = 0;
120              
121 7         21 $self->{last_size} = undef;
122              
123 7         46 $self->add_child( $self->{file} = IO::Async::File->new(
124             on_devino_changed => $self->_replace_weakself( 'on_devino_changed' ),
125             on_size_changed => $self->_replace_weakself( 'on_size_changed' ),
126             ) );
127             }
128              
129             =head1 PARAMETERS
130              
131             The following named parameters may be passed to C or C, in
132             addition to the parameters relating to reading supported by
133             L.
134              
135             =head2 filename => STRING
136              
137             Optional. If supplied, watches the named file rather than the filehandle given
138             in C. The file will be opened by the constructor, and then
139             watched for renames. If the file is renamed, the new filename is opened and
140             tracked similarly after closing the previous file.
141              
142             =head2 interval => NUM
143              
144             Optional. The interval in seconds to poll the filehandle using C
145             looking for size changes. A default of 2 seconds will be applied if not
146             defined.
147              
148             =cut
149              
150             sub configure
151             {
152 8     8 1 20 my $self = shift;
153 8         62 my %params = @_;
154              
155 8         29 foreach (qw( on_truncated on_initial )) {
156 16 100       67 $self->{$_} = delete $params{$_} if exists $params{$_};
157             }
158              
159 8         29 foreach (qw( interval )) {
160 8 100       47 $self->{file}->configure( $_ => delete $params{$_} ) if exists $params{$_};
161             }
162 8 100 33     68 if( exists $params{filename} ) {
    50          
163 1         10 $self->{file}->configure( filename => delete $params{filename} );
164 1         7 $params{read_handle} = $self->{file}->handle;
165             }
166             elsif( exists $params{handle} or exists $params{read_handle} ) {
167 7         22 my $handle = delete $params{handle};
168 7 50       27 defined $handle or $handle = delete $params{read_handle};
169              
170 7         63 $self->{file}->configure( handle => $handle );
171 7         33 $params{read_handle} = $self->{file}->handle;
172             }
173              
174 8 50       36 croak "Cannot have a write_handle in a ".ref($self) if defined $params{write_handle};
175              
176 8         52 $self->SUPER::configure( %params );
177              
178 8 100 66     28 if( $self->read_handle and !defined $self->{last_size} ) {
179 7         27 my $size = (stat $self->read_handle)[7];
180              
181 7         33 $self->{last_size} = $size;
182              
183 7         29 local $self->{running_initial} = 1;
184 7         112 $self->maybe_invoke_event( on_initial => $size );
185             }
186             }
187              
188             =head1 METHODS
189              
190             =cut
191              
192             # Replace IO::Async::Handle's implementation
193             sub _watch_read
194             {
195 21     21   47 my $self = shift;
196 21         55 my ( $want ) = @_;
197              
198 21 100       55 if( $want ) {
199 14 50       47 $self->{file}->start if !$self->{file}->is_running;
200             }
201             else {
202 7         50 $self->{file}->stop;
203             }
204             }
205              
206             sub _watch_write
207             {
208 7     7   18 my $self = shift;
209 7         24 my ( $want ) = @_;
210              
211 7 50       37 croak "Cannot _watch_write in " . ref($self) if $want;
212             }
213              
214             sub on_devino_changed
215             {
216 1 50   1 0 8 my $self = shift or return;
217              
218 1         5 $self->{renamed} = 1;
219 1         8 $self->debug_printf( "read tail of old file" );
220 1         5 $self->read_more;
221             }
222              
223             sub on_size_changed
224             {
225 9 50   9 0 52 my $self = shift or return;
226 9         40 my ( $size ) = @_;
227              
228 9 100       53 if( $size < $self->{last_size} ) {
229 1         18 $self->maybe_invoke_event( on_truncated => );
230 1         11 $self->{last_pos} = 0;
231             }
232              
233 9         32 $self->{last_size} = $size;
234              
235 9         88 $self->debug_printf( "read_more" );
236 9         59 $self->read_more;
237             }
238              
239             sub read_more
240             {
241 11     11 0 30 my $self = shift;
242              
243 11 100       57 sysseek( $self->read_handle, $self->{last_pos}, SEEK_SET ) if defined $self->{last_pos};
244              
245 11         101 $self->on_read_ready;
246              
247 11         66 $self->{last_pos} = sysseek( $self->read_handle, 0, SEEK_CUR ); # == systell
248              
249 11 50       164 if( $self->{last_pos} < $self->{last_size} ) {
    100          
250 0     0   0 $self->loop->later( sub { $self->read_more } );
  0         0  
251             }
252             elsif( $self->{renamed} ) {
253 1         26 $self->debug_printf( "reopening for rename" );
254              
255 1         3 $self->{last_size} = 0;
256              
257 1 50       6 if( $self->{last_pos} ) {
258 1         5 $self->maybe_invoke_event( on_truncated => );
259 1         5 $self->{last_pos} = 0;
260 1     1   5 $self->loop->later( sub { $self->read_more } );
  1         4  
261             }
262              
263 1         9 $self->configure( read_handle => $self->{file}->handle );
264 1         8 undef $self->{renamed};
265             }
266             }
267              
268             sub write
269             {
270 0     0 1 0 carp "Cannot ->write from a ".ref($_[0]);
271             }
272              
273             =head2 seek
274              
275             $filestream->seek( $offset, $whence )
276              
277             Callable only during the C event. Moves the read position in the
278             filehandle to the given offset. C<$whence> is interpreted as for C,
279             should be either C, C or C. Will be set to
280             C if not provided.
281              
282             Normally this would be used to seek to the end of the file, for example
283              
284             on_initial => sub {
285             my ( $self, $filesize ) = @_;
286             $self->seek( $filesize );
287             }
288              
289             =cut
290              
291             sub seek
292             {
293 2     2 1 14 my $self = shift;
294 2         7 my ( $offset, $whence ) = @_;
295              
296 2 50       10 $self->{running_initial} or croak "Cannot ->seek except during on_initial";
297              
298 2 100       9 defined $whence or $whence = SEEK_SET;
299              
300 2         7 sysseek( $self->read_handle, $offset, $whence );
301             }
302              
303             =head2 seek_to_last
304              
305             $success = $filestream->seek_to_last( $str_pattern, %opts )
306              
307             Callable only during the C event. Attempts to move the read
308             position in the filehandle to just after the last occurrence of a given match.
309             C<$str_pattern> may be a literal string or regexp pattern.
310              
311             Returns a true value if the seek was successful, or false if not. Takes the
312             following named arguments:
313              
314             =over 8
315              
316             =item blocksize => INT
317              
318             Optional. Read the file in blocks of this size. Will take a default of 8KiB if
319             not defined.
320              
321             =item horizon => INT
322              
323             Optional. Give up looking for a match after this number of bytes. Will take a
324             default value of 4 times the blocksize if not defined.
325              
326             To force it to always search through the entire file contents, set this
327             explicitly to C<0>.
328              
329             =back
330              
331             Because regular file reading happens synchronously, this entire method
332             operates entirely synchronously. If the file is very large, it may take a
333             while to read back through the entire contents. While this is happening no
334             other events can be invoked in the process.
335              
336             When looking for a string or regexp match, this method appends the
337             previously-read buffer to each block read from the file, in case a match
338             becomes split across two reads. If C is reduced to a very small
339             value, take care to ensure it isn't so small that a match may not be noticed.
340              
341             This is most likely useful for seeking after the last complete line in a
342             line-based log file, to commence reading from the end, while still managing to
343             capture any partial content that isn't yet a complete line.
344              
345             on_initial => sub {
346             my $self = shift;
347             $self->seek_to_last( "\n" );
348             }
349              
350             =cut
351              
352             sub seek_to_last
353             {
354 1     1 1 7 my $self = shift;
355 1         5 my ( $str_pattern, %opts ) = @_;
356              
357 1 50       5 $self->{running_initial} or croak "Cannot ->seek_to_last except during on_initial";
358              
359 1         2 my $offset = $self->{last_size};
360              
361 1   50     4 my $blocksize = $opts{blocksize} || 8192;
362              
363 1 50       6 defined $opts{horizon} or $opts{horizon} = $blocksize * 4;
364 1 50       5 my $horizon = $opts{horizon} ? $offset - $opts{horizon} : 0;
365 1 50       4 $horizon = 0 if $horizon < 0;
366              
367 1 50       29 my $re = ref $str_pattern ? $str_pattern : qr/\Q$str_pattern\E/;
368              
369 1         3 my $prev = "";
370 1         5 while( $offset > $horizon ) {
371 3         4 my $len = $blocksize;
372 3 50       6 $len = $offset if $len > $offset;
373 3         4 $offset -= $len;
374              
375 3         8 sysseek( $self->read_handle, $offset, SEEK_SET );
376 3         11 sysread( $self->read_handle, my $buffer, $blocksize );
377              
378             # TODO: If $str_pattern is a plain string this could be more efficient
379             # using rindex
380 3 100       20 if( () = ( $buffer . $prev ) =~ m/$re/sg ) {
381             # $+[0] will be end of last match
382 1         5 my $pos = $offset + $+[0];
383 1         6 $self->seek( $pos );
384 1         9 return 1;
385             }
386              
387 2         7 $prev = $buffer;
388             }
389              
390 0           $self->seek( $horizon );
391 0           return 0;
392             }
393              
394             =head1 TODO
395              
396             =over 4
397              
398             =item *
399              
400             Move the actual file update watching code into L, possibly as
401             a new watch/unwatch method pair C.
402              
403             =item *
404              
405             Consider if a construction-time parameter of C or C
406             might be neater than a small code block in C, if that turns out to
407             be the only or most common form of use.
408              
409             =back
410              
411             =cut
412              
413             =head1 AUTHOR
414              
415             Paul Evans
416              
417             =cut
418              
419             0x55AA;