File Coverage

blib/lib/Data/Consumer/Dir.pm
Criterion Covered Total %
statement 107 117 91.4
branch 24 40 60.0
condition 6 12 50.0
subroutine 21 24 87.5
pod 7 7 100.0
total 165 200 82.5


line stmt bran cond sub pod time code
1             package Data::Consumer::Dir;
2              
3 5     5   1335 use warnings;
  5         5  
  5         105  
4 5     5   15 use strict;
  5         0  
  5         90  
5 5     5   5515 use DBI;
  5         52245  
  5         250  
6 5     5   30 use Carp qw(confess);
  5         5  
  5         200  
7 5     5   15 use warnings FATAL => 'all';
  5         5  
  5         170  
8 5     5   15 use base 'Data::Consumer';
  5         5  
  5         2035  
9 5     5   20 use File::Spec;
  5         5  
  5         65  
10 5     5   15 use File::Path;
  5         5  
  5         200  
11 5     5   20 use Fcntl;
  5         0  
  5         915  
12 5     5   15 use Fcntl ':flock';
  5         5  
  5         490  
13 5     5   20 use vars qw/$Debug $VERSION $Cmd $Fail/;
  5         5  
  5         255  
14              
15             # This code was formatted with the following perltidy options:
16             # -ple -ce -bbb -bbc -bbs -nolq -l=100 -noll -nola -nwls='=' -isbc -nolc -otr -kis
17             # If you patch it please use the same options for your patch.
18              
19             *Debug= *Data::Consumer::Debug;
20             *Cmd= *Data::Consumer::Cmd;
21             *Fail= *Data::Consumer::Fail;
22              
23             BEGIN {
24 5     5   25 __PACKAGE__->register();
25             }
26              
27             =head1 NAME
28              
29             Data::Consumer::Dir - Data::Consumer implementation for a directory of files resource
30              
31             =head1 VERSION
32              
33             Version 0.14
34              
35             =cut
36              
37             $VERSION= '0.14';
38              
39             =head1 SYNOPSIS
40              
41             use Data::Consumer::Dir;
42              
43             my $consumer = Data::Consumer::Dir->new(
44             root => '/some/dir',
45             create => 1,
46             open_mode => '+<',
47             );
48              
49             $consumer->consume( sub {
50             my $id = shift;
51             print "processed $id\n";
52             } );
53              
54              
55             =head1 FUNCTIONS
56              
57             =head2 CLASS->new(%opts)
58              
59             Constructor for a L<Data::Consumer::Dir> instance.
60              
61             Either the C<root> option must be provided or both C<unprocessed> and
62             C<processed> arguments must be defined. Will die if the directories do
63             not exist unless the C<create> option is set to a true value.
64              
65             =over 4
66              
67             =item unprocessed => $path_spec
68              
69             Directory within which unprocessed files will be found.
70              
71             May also be a callback which is responsible for marking the item as
72             unprocessed. This will be called with the arguments C<($consumer,
73             'unprocessed', $spec, $fh, $name)>.
74              
75             =item working => $path_spec
76              
77             Files will be moved to this directory prior to being processed.
78              
79             May also be a callback which is responsible for marking the item as
80             working. This will be called with the arguments C<($consumer,
81             'working', $spec, $fh, $name)>.
82              
83             =item processed => $path_spec
84              
85             Once successfully processed the files will be moved to this directory.
86              
87             May also be a callback which is responsible for marking the item as
88             processed. This will be called with the arguments C<($consumer,
89             'processed', $spec, $fh, $name)>.
90              
91             =item failed => $path_spec
92              
93             If processing fails then the files will be moved to this directory.
94              
95             May also be a callback which is responsible for marking the item as
96             failed. This will be called with the arguments C<($consumer, 'failed',
97             $spec, $fh, $name)>.
98              
99             =item root => $path_spec
100              
101             Automatically creates any of the C<unprocessed>, C<working>,
102             C<processed>, or C<failed> directories below a specified C<root>. Only
103             those directories not explicitly defined will be automatically created
104             so this can be used in conjunction with the other options.
105              
106             =item create => $bool
107              
108             =item create_mode => $mode_flags
109              
110             If C<create> is true then any specified directories that do not exist will be created.
111             If C<create_mode> is specified then the directories will be created with that mode.
112              
113             =item open_mode => $mode_str
114              
115             In order to lock a file a filehandle must be opened, normally in
116             read-only mode (C<< < >>), however it may be useful to open with other
117             modes.
118              
119             =back
120              
121             =cut
122              
123             BEGIN {
124 5     5   10 my @keys= qw(unprocessed working processed failed);
125 5         15 my %m= (
126             '<' => O_RDONLY,
127             '+<' => O_RDWR,
128             '>>' => O_APPEND | O_WRONLY,
129             '+>>' => O_APPEND | O_RDWR,
130             );
131 5         2460 $_= $_ | O_NONBLOCK for values %m;
132              
133             sub new {
134 5     5 1 4036345 my ( $class, %opts )= @_;
135 5         286 my $self= $class->SUPER::new(); # let Data::Consumer bless the hash
136              
137 5 50       28 if ( $opts{root} ) {
138 5         247 my ( $v, $p )= File::Spec->splitpath( $opts{root}, 'nofile' );
139 5         17 for my $type (@keys) {
140 20   33     487 $opts{$type} ||= File::Spec->catpath( $v, File::Spec->catdir( $p, $type ), '' );
141             }
142             }
143 5 50 33     137 ( $opts{unprocessed} and $opts{processed} )
144             or confess "Arguments 'unprocessed' and 'processed' are mandatory";
145              
146 5 50       25 if ( $opts{create} ) {
147 5         8 for (@keys) {
148 20 50       51 next unless exists $opts{$_};
149 20 100       312 next if -d $opts{$_};
150 3   33     330 mkpath( $opts{$_}, $Debug, $opts{create_mode} || () );
151             }
152             }
153 5 50       14 if ( $opts{open_mode} ) {
154 0         0 exists $m{ $opts{open_mode} }
155             or confess "Illegal open mode '$opts{open_mode}' legal options are "
156 5 50       26 . join( ',', map { "'$_'" } sort keys %m ) . "\n";
157 5         12 $opts{open_mode}= $m{ $opts{open_mode} };
158             } else {
159 0         0 $opts{open_mode}= O_RDONLY | O_NONBLOCK;
160             }
161              
162 5         28 %$self= %opts;
163 5         21 return $self;
164             }
165             }
166              
167             =head2 $object->reset()
168              
169             Reset the state of the object.
170              
171             =head2 $object->acquire()
172              
173             Acquire an item to be processed.
174              
175             Returns an identifier to be used to identify the item acquired.
176              
177             =head2 $object->release()
178              
179             Release any locks on the currently held item.
180              
181             Normally there is no need to call this directly.
182              
183             =cut
184              
185             sub reset {
186 11     11 1 24 my $self= shift;
187 11         48 $self->debug_warn( 5, "reset (scanning $self->{unprocessed})" );
188 11         55 $self->release();
189 11 50       393 opendir my $dh, $self->{unprocessed}
190             or die "Failed to opendir '$self->{unprocessed}': $!";
191 11 50       406 my @files= map { /(.*)/s && $1 } readdir($dh);
  266         774  
192              
193             #print for @files;
194 11         49 @files= sort grep { -f _cf( $self->{unprocessed}, $_ ) } @files;
  266         348  
195 11         51 $self->{files}= \@files;
196 11         2166 return $self;
197             }
198              
199             sub _cf { # cat file
200 610     610   682 my ( $r, $f )= @_;
201              
202 610         2561 my ( $v, $p )= File::Spec->splitpath( $r, 'nofile' );
203 610         5050 return File::Spec->catpath( $v, $p, $f );
204             }
205              
206             sub _do_callback {
207 50     50   74 my ( $self, $callback )= @_;
208 50         72 local $Fail;
209 50 50       89 if ( eval { $callback->( $self, @{$self}{qw(lock_spec lock_fh last_id)} ); 1; } ) {
  50         64  
  50         186  
  50         50016650  
210 50 50       244 if ($Fail) {
211 0         0 return "Callback reports an error: $Fail";
212             }
213 50         278 return;
214             } else {
215 0         0 return "Callback failed: $@";
216             }
217             }
218              
219             sub acquire {
220 60     60 1 79 my $self= shift;
221 60         110 my $dbh= $self->{dbh};
222              
223 60 50       64 $self->reset if !@{ $self->{files} || [] };
  60 100       272  
224              
225 60         111 my $files= $self->{files};
226 60         184 while (@$files) {
227 244         340 my $file= shift @$files;
228 244 50       644 next if $self->is_ignored($file);
229 244         391 my $spec= _cf( $self->{unprocessed}, $file );
230 244         238 my $fh;
231 244 100 100     6482 if ( sysopen $fh, $spec, $self->{open_mode} and flock( $fh, LOCK_EX | LOCK_NB ) ) {
232 50         81 $self->{lock_fh}= $fh;
233 50         54236 $self->{lock_spec}= $spec;
234 50         307 $self->debug_warn( 5, "acquired '$file': $spec" );
235 50         134 $self->{last_id}= $file;
236 50         309 return $file;
237             }
238             }
239 10         46 $self->debug_warn( 5, "acquire failed -- resource has been exhausted" );
240 10         84 return;
241             }
242              
243             sub release {
244 21     21 1 26 my $self= shift;
245              
246 21 100       9788 flock( $self->{lock_fh}, LOCK_UN ) if $self->{lock_fh};
247 21         112 delete $self->{lock_fh};
248 21         39 delete $self->{lock_spec};
249 21         25 delete $self->{last_id};
250 21         324 return 1;
251             }
252              
253             =head2 $object->fh()
254              
255             Return a filehandle to the currently acquired item. See the C<open_mode>
256             argument in C<new()> for details on how to control the mode that the
257             filehandle is opened with.
258              
259             =head2 $object->spec()
260              
261             Return the full filespec for the currently acquired item.
262              
263             =head2 $object->file()
264              
265             Return the filename (without path) of the currently acquired item.
266              
267             Note that this is an alias for C<< $object->last_id() >>.
268              
269             =cut
270              
271 0     0 1 0 sub fh { $_[0]->{lock_fh} }
272 0     0 1 0 sub spec { $_[0]->{lock_spec} }
273 0     0 1 0 sub file { $_[0]->{last_id} }
274              
275             sub _mark_as {
276 100     100   157 my ( $self, $key, $id )= @_;
277              
278 100 50       222 if ( $self->{$key} ) {
279 100 50       225 if ( ref $self->{$key} ) {
280              
281             # assume it must be a callback
282 0         0 $self->debug_warn( 5, "executing mark_as callback for '$key'" );
283 0         0 $self->{$key}->( $self, $key, $self->{lock_spec}, $self->{lock_fh}, $self->{last_id} );
284 0         0 return;
285             }
286 100         260 my $spec= _cf( $self->{$key}, $self->{last_id} );
287 100 50       242105 rename $self->{lock_spec}, $spec
288             or confess "$$: Failed to rename '$self->{lock_spec}' to '$spec':$!";
289 100         594 $self->{lock_spec}= $spec;
290             }
291             }
292              
293             sub DESTROY {
294 5     5   2006986 my $self= shift;
295 5 50       56 $self->release() if $self;
296             }
297              
298             =head1 AUTHOR
299              
300             Yves Orton, C<< >>
301              
302             =head1 BUGS
303              
304             Please report any bugs or feature requests to
305             C<bug-data-consumer at rt.cpan.org>, or through the web interface at
306             L<http://rt.cpan.org/NoAuth/ReportBug.html?Queue=Data-Consumer>.
307              
308             I will be notified, and then you'll automatically be notified of progress on
309             your bug as I make changes.
310              
311             =head1 ACKNOWLEDGEMENTS
312              
313             Igor Sutton for ideas, testing and support
314              
315             =head1 COPYRIGHT & LICENSE
316              
317             Copyright 2008 Yves Orton, all rights reserved.
318              
319             This program is free software; you can redistribute it and/or modify it
320             under the same terms as Perl itself.
321              
322             =cut
323              
324             1; # End of Data::Consumer::Dir
325