File Coverage

blib/lib/Queue/Dir.pm
Criterion Covered Total %
statement 192 222 86.4
branch 61 100 61.0
condition 28 49 57.1
subroutine 24 27 88.8
pod 7 7 100.0
total 312 405 77.0


line stmt bran cond sub pod time code
1             package Queue::Dir;
2             # $Id: Dir.pm,v 1.13 2003/03/09 16:18:48 lem Exp $
3              
4             require 5.005_62;
5              
6 11     11   21718 use strict;
  11         22  
  11         605  
7 11     11   12956 use IO::Dir;
  11         355022  
  11         571  
8 11     11   100 use IO::File;
  11         24  
  11         1538  
9 11     11   56 use warnings;
  11         19  
  11         276  
10 11     11   10856 use Sys::Hostname;
  11         15171  
  11         800  
11 11     11   71 use Fcntl qw(:flock);
  11         24  
  11         1390  
12 11     11   10651 use Params::Validate qw(:all);
  11         145393  
  11         3206  
13              
14             our $Debug = 0;
15             our $hires = 'gettimeofday';
16              
17 11     11   11731 eval "use Time::HiRes qw(gettimeofday);";
  11         24436  
  11         53  
18              
19             if ($@) { $hires = 'time' }
20              
21 11     11   99 use vars qw($a $b);
  11         19  
  11         8098  
22              
23             our $VERSION = 0.01;
24              
25             =pod
26              
27             =head1 NAME
28              
29             Queue::Dir - Manage queue directories where each object is a file
30              
31             =head1 SYNOPSIS
32              
33             use Queue::Dir;
34              
35             my $q = new Queue::File (
36             -id => $my_process_id,
37             -paths => [ '/var/path/to/queue1', ... ],
38             -promiscuous => 1,
39             -sort => 'sortsub',
40             -filter => sub { ... },
41             -lockdir => 'lock',
42             -lockmax => 300,
43             );
44            
45             my ($fh, $qid) = $q->store($oid);
46              
47             my $qid = $q->next();
48              
49             my $fh = $q->visit($mode, $qid);
50              
51             my $status = $q->done($qid);
52              
53             my $name = $q->name($qid);
54              
55             =head1 DESCRIPTION
56              
57             C allows the manipulation of objects placed in a
58             queue. The queue is implemented as a directory where each object is
59             stored as a file.
60              
61             =head2 METHODS
62              
63             The following methods are defined:
64              
65             =over 4
66              
67             =item C
68              
69             B<-id> assigns a unique process-id to this queue object. Defaults to
70             something built from the serialization of the object + C<$$> or
71             something similar.
72              
73             B<-paths> specifies a list of paths to use as storage points for the
74             queue files. If more than one are supplied, round-robin will be used
75             to store objects there.
76              
77             When B<-promiscuous> is true (the default), objects stored with any
78             other C object are accessible. If set to false, only
79             files whose id matches the value for B<-id> are visible.
80              
81             B<-sort> allows for the specification of a sorting function, used to
82             decide the order in which the queue files will be used. The function
83             is invoked in the same fashion as C, getting two variables
84             (C<$a> and C<$b>) and returning -1, 0 or 1 depending on
85             comparison. C<$a> and C<$b> are hash references whose first element is
86             the queue id of the object and the second element is a the full
87             pathname of such object.
88              
89             The C passed in the B<-filter> parameter can control
90             which files in a given directory to consider as queue objects. By
91             default, all files will be considered part of the queue. This function
92             is called with a reference of the invoking object and the full
93             pathname of each file. A true return value causes the given file to be
94             included in the queue. Note that this is only called if
95             B<-promiscuous> is set to a false value.
96              
97             B<-lockdir> and B<-lockmax> control an optional locking mechanism that
98             reduces the chance of multiple collaborating instances of
99             C objects, from picking the same object from the
100             queue. B<-lockdir>, when present, defines the name of the directory
101             (within each queue directory) to use for storing the lock files. The
102             B<-lockmax> parameter, which defaults to 300 seconds, control for how
103             long the locks are honored.
104              
105             Note that locking is disabled by default.
106              
107             =cut
108              
109             sub new
110             {
111 23     23 1 6833 my $name = shift;
112 23   33     133 my $class = ref($name) || $name;
113              
114 23 50       99 warn "Queue::Dir::new()\n" if $Debug;
115              
116             my %self = validate_with
117             (
118             params => \@_,
119             ignore_case => 1,
120             strip_leading => '-',
121             spec =>
122             {
123             id =>
124             {
125             type => SCALAR,
126             default => hostname . $$,
127             },
128             paths =>
129             {
130             type => ARRAYREF,
131             callbacks =>
132             {
133 23     23   1872 directory => sub { $_ = shift; @$_ == grep { -d } @$_; }
  23         69  
  24         1370  
134             }
135             },
136             promiscuous =>
137             {
138             type => SCALAR | BOOLEAN,
139             default => 1,
140             },
141             sort =>
142             {
143             type => SCALAR,
144             default => 'Queue::Dir::_sort',
145             },
146             lockdir =>
147             {
148             type => SCALAR,
149             default => undef,
150             },
151             lockmax =>
152             {
153             type => SCALAR,
154             default => 300,
155             callbacks =>
156             {
157 0     0   0 numeric => sub { shift =~ /^\d+$/ },
158 0     0   0 positive => sub { shift > 0 },
159             },
160             },
161             filter =>
162             {
163             type => CODEREF,
164             default => sub
165             {
166 0     0   0 my $self = shift;
167 0         0 my $long = shift;
168            
169 0 0       0 return 0 unless $long;
170              
171 0         0 my ($path, $id) = (File::Spec->splitpath($long))[1,2];
172            
173 0         0 for my $p (@{$self->{paths}})
  0         0  
174             {
175 0 0 0     0 if (substr($p->[0], $path, 0) == 0
      0        
176             and -f $p->[0] . '/' . $id
177             and $id =~ m!^\d+\.\d+\.$self->{id}\.\d+$!)
178             {
179 0         0 return 1;
180             }
181             }
182            
183 0         0 return 0;
184             },
185             },
186 23         144 });
187            
188 21         406 @{$self{paths}} = sort { $a cmp $b } @{$self{paths}};
  21         68  
  1         3  
  21         74  
189              
190 21         52 $_ = [$_, new IO::Dir $_] for @{$self{paths}};
  21         196  
191              
192 21 50       1970 if (grep { ! defined $_->[1] } @{$self{paths}}) {
  22         134  
  21         52  
193 0         0 warn "One of the queue paths seems invalid\n";
194 0         0 return;
195             }
196              
197             # Prime the object with an empty file
198             # inventory.
199 21         74 $self{_files} = [];
200            
201             # We store objects in round-robin.
202 21         47 $self{_rr} = 0;
203 21         52 $self{_current} = [0, 0];
204              
205 21         55 my $self = bless \%self, $class;
206            
207 21 100       129 $self->_clean_locks if $self->{lockdir};
208              
209 21         492 return $self->_refresh;
210             }
211              
212 60     60   99 sub _sort { $a->[0] cmp $b->[0]; }
213 11     11   69 sub _timestamp { no strict "refs"; return join '', &$hires; }
  11     28   26  
  11         22291  
  28         282  
214              
215             # Update the inventory of queue
216             # objects, if required.
217             sub _refresh
218             {
219 64     64   297 my $self = shift;
220              
221 64 50       146 warn "Queue::Dir::_refresh()\n" if $Debug;
222              
223             # warn "_files ", scalar @{$self->{_files}}, " _current[0] ",
224             # $self->{_current}->[0], "\n";
225              
226 64 100 66     70 unless (@{$self->{_files}} or $self->{_current}->[0])
  64         378  
227             {
228 47 50       98 warn "Queue::Dir::_refresh() running\n" if $Debug;
229              
230 47         56 for my $p (@{$self->{paths}})
  47         98  
231             {
232             # warn "p\n";
233 48         222 $p->[1]->rewind;
234 48         702 while (defined (my $f = $p->[1]->read))
235             {
236 173 100 100     2552 next if $f eq '.' or $f eq '..';
237 77 100       1067 next unless -f $p->[0] . '/' . $f;
238             # warn "f\n";
239 46         216 my $t = [$f, $p->[0] . '/' . $f];
240 46 0 33     128 if (!$self->{promiscuous}
241             and !$self->{filter}->($t->[1]))
242             {
243 0         0 next;
244             }
245 46         52 push @{$self->{_files}}, $t;
  46         197  
246             }
247             }
248             # XXX - I seem unable to specify the sort
249             # function directly.
250 47         515 my $sort = $self->{sort};
251 47         60 @{$self->{_files}} = sort $sort @{$self->{_files}};
  47         111  
  47         191  
252             # $self->{_current} = shift @{$self->{_files}} || [0,0];
253             }
254              
255 64         158 return $self;
256             }
257              
258             # Give a $qid, fetch pathname
259             sub _name
260             {
261 76     76   91 my $self = shift;
262 76   33     203 my $qid = shift || $self->{_current}->[0] || $self->next;
263              
264             # First, try to find this object in
265             # out cached structures
266              
267 76 50       237 for my $t (($self->{_current}->[1] ? $self->{_current} : ()),
  76         176  
268             @{$self->{_files}})
269             {
270 81 100       238 if ($qid eq $t->[0]) { return $t->[1]; }
  46         532  
271             }
272              
273             # As a last resort, attempt to find
274             # the objext in the fs
275              
276 30         39 for my $p (@{$self->{paths}})
  30         59  
277             {
278 42         186 $p->[1]->rewind;
279 42         308 while (my $n = $p->[1]->read)
280             {
281 131 100       1267 if ($n eq $qid)
282             {
283 29         215 return $p->[0] . '/' . $n;
284             }
285             }
286             }
287              
288             # Otherwise, we have to fail...
289              
290 1         12 return;
291             }
292              
293             sub _clean_locks
294             {
295 10     10   14 my $self = shift;
296            
297 10 50       97 return unless $self->{lockdir};
298              
299 10         13 for my $p (@{$self->{paths}})
  10         25  
300             {
301 10         26 my $lock = $p->[0] . '/' . $self->{lockdir};
302 10         334 mkdir $lock;
303 10         52 my $d = new IO::Dir $lock;
304 10         554 while (my $f = $d->read)
305             {
306 21 100 100     429 next if $f eq '.' or $f eq '..';
307 1         3 my $name = $lock . '/' . $f;
308 1 50       86 if ((stat($name))[9] + $self->{lockmax} < time)
309             {
310 0         0 unlink $name;
311             }
312             }
313             }
314              
315             }
316              
317             # The test below might seem redundant, but
318             # it's an attempt to improve in a lot of
319             # broken NFS locking implementations.
320              
321             sub _lock
322             {
323 47     47   72 my $self = shift;
324 47         67 my $qid = shift;
325              
326 47         182 $self->{lockfh} = new IO::File;
327              
328 47 50       1308 warn "_lock $qid\n" if $Debug;
329              
330 47 100       186 return 1 unless $self->{lockdir};
331              
332 15         91 $self->{lockfile} = $self->{paths}->[(split(/\./, $qid))[1]]->[0];
333              
334 15 50       57 return unless $self->{lockfile};
335              
336 15         309 $self->{_key} = $self->{id} . '-' . $$ . '-' . int(rand(10000));
337 15         52 $self->{lockfile} .= '/' . $self->{lockdir} . '/' . $qid;
338              
339 15 50       46 warn "_lock lockfile is $self->{lockfile}\n" if $Debug;
340              
341 15 100       282 if (-f $self->{lockfile})
342             {
343 3 50       30 if ((stat(_))[9] + $self->{lockmax} < time)
344             {
345 0 0       0 warn "_lock forcing unlink (stale) lockfile\n" if $Debug;
346 0         0 unlink $self->{lockfile};
347             }
348             else
349             {
350 3 50       10 warn "_lock failing due to previous lock\n" if $Debug;
351 3         25 return;
352             }
353             }
354             # Store our key in the lock file
355              
356 12 50       55 $self->{lockfh}->open($self->{lockfile}, O_RDWR | O_CREAT) or return;
357 12         1013 $self->{lockfh}->autoflush(1);
358              
359 12 50       583 unless (flock $self->{lockfh}, LOCK_EX | LOCK_NB)
360             {
361 0         0 $self->{lockfh}->close;
362 0         0 $self->{lockfh} = undef;
363 0         0 unlink $self->{lockfile};
364 0         0 $self->{lockfile} = undef;
365 0         0 return;
366             }
367 12         69 $self->{lockfh}->print($self->{_key});
368              
369 12 50       6501 warn "_lock key $self->{_key} stored\n" if $Debug;
370              
371             # Verify that the key is indeed in the
372             # lock file
373              
374 12         77 $self->{lockfh}->seek(0, 0);
375 12         536 chomp(my $rkey = $self->{lockfh}->getline);
376              
377 12 50       543 warn "_lock key $rkey recovered\n" if $Debug;
378              
379 12 50       61 unless ($rkey eq $self->{_key})
380             {
381 0         0 $self->{lockfh}->close;
382 0         0 $self->{lockfh} = undef;
383 0         0 unlink $self->{lockfile};
384 0         0 $self->{lockfile} = undef;
385 0         0 return;
386             }
387              
388 12 50       27 warn "_lock key matched\n" if $Debug;
389              
390             # If all this passed, the lock is ours
391 12         51 return 1;
392             }
393              
394             =pod
395              
396             =item Cstore();>
397              
398             Store a file in the queue. Returns an array whose first element is an
399             C object for writing to the file. The second element is
400             the identifier of the object in the queue.
401              
402             If you created the C object with locking enabled, you must
403             call C<-Eunlock> after closing the file handle.
404              
405             =cut
406              
407             sub store
408             {
409 28     28 1 10770 my $self = shift;
410 28         158 my $fh = new IO::File;
411 28         1233 my $queue = $self->{paths}->[$self->{_rr}];
412 28         64 my $qid = _timestamp . '.' . $self->{_rr} . '.' . $self->{id};
413 28         56 my $counter = 0;
414 28         33 my $pname;
415              
416 28 50       88 warn "Queue::Dir::store() qid=$qid\n" if $Debug;
417              
418 28         54 $self->{_rr} ++;
419 28         38 $self->{_rr} %= @{$self->{paths}};
  28         68  
420              
421 28         614 while (-f ($pname = $queue->[0] . '/' . $qid . '.' . $counter))
422             {
423 0         0 ++ $counter;
424             }
425              
426 28         60 $qid .= '.' . $counter;
427              
428 28 50       121 $fh->open($pname, "w") or return;
429 28         3082 $self->{_current} = [$qid, $pname];
430              
431 28         105 $self->_lock($qid);
432              
433 28         101 return ($fh, $qid);
434             }
435              
436             =pod
437              
438             =item Cnext();>
439              
440             Returns the queue identifier of the next file to be processed. When
441             the queue is empty, returns undef.
442              
443             Note that if multiple consumers are working on the same queues in
444             promiscuous mode, the file referenced by the returned id might be
445             removed at any time so care must be used.
446              
447             Entries will be returned in an arbitrary order.
448              
449             =cut
450              
451             sub next
452             {
453 46     46 1 8165 my $self = shift;
454              
455 46 100       56 $self->_refresh unless @{$self->{_files}};
  46         169  
456              
457 46   100     56 $self->{_current} = shift @{$self->{_files}} || [0, 0];
458              
459 46 50       132 warn "Queue::Dir::next() current=", $self->{_current}->[0], "\n" if $Debug;
460              
461             # warn "next: Current queue has\n";
462             # foreach (@{$self->{_files}})
463             # {
464             # warn " $_->[1]\n";
465             # }
466              
467 46 100       114 unless ($self->{_current}->[0])
468             {
469 21         43 $self->_refresh;
470 21         70 return;
471             }
472              
473 25         94 return $self->{_current}->[0];
474             }
475              
476             =pod
477              
478             =item Cvisit($mode, $qid);>
479              
480             On success, returns an C object, opened according to the
481             specified C<$mode> for the file with C<$qid>. If C<$mode> is not
482             specified, it defaults to a read from the start of the file. If
483             C<$qid> is not specified, it defaults to the next entry, as if
484             C<-Enext()> were called. In order for the file to be eligible,
485             either the C object is not created with locking enabled or
486             the file in the queue is not locked.
487              
488             It can fail in a number of situations. The obvious one, is when the
489             queue is empty. The second one, happens when the desired file is no
490             longer in the queue, which can happen if multiple consumers are
491             accessing the queue in promiscuous mode.
492              
493             To help disambiguate both scenarios, undef will be returned on an
494             empty queue. A defined but false value will be returned when the
495             desired file is missing but others remain in the queue.
496              
497             The object in the queue will be automatically locked if this option is
498             enabled when C<-Enew> was called. In this case, you should call
499             the C<-Eunlock> method.
500              
501             =cut
502              
503             sub visit
504             {
505 20     20 1 3385 my $self = shift;
506 20   100     101 my $mode = shift || "r";
507 20   100     139 my $qid = shift || $self->{_current}->[0] || $self->next;
508              
509 20 50       50 warn "Queue::Dir::visit() qid=$qid\n" if $Debug;
510              
511 20 100       57 return unless $qid;
512              
513 19         85 my $fh = new IO::File;
514 19         492 my $name;
515            
516 19   33     58 until ($name = $self->_name($qid)
      66        
      66        
517             and -f $name
518             and $self->_lock($qid)
519             and $fh->open($name, $mode))
520             {
521 3 50       16 unless ($qid = $self->next)
522             {
523 3 50       6 if (@{$self->{_files}})
  3         10  
524             {
525 3 50       25 warn "Queue::Dir::visit() ret undef\n" if $Debug;
526 3         16 return undef;
527             }
528             else
529             {
530 0 0       0 warn "Queue::Dir::visit() ret 0\n" if $Debug;
531 0         0 return 0;
532             }
533             }
534             }
535              
536 16         921 return $fh;
537             }
538              
539             =pod
540              
541             =item C<$q-Edone($qid);>
542              
543             Disposes the queue file whose C<$qid> matches the given identifier as
544             well as its potential lock. If none is specified, defaults to the last
545             one used in a C<-Evisit()>.
546              
547             It is a bad idea (or at least rough manners) to C the file
548             without invoking C<-Edone>. Besides, C<-Edone> will do it for
549             you.
550              
551             =cut
552              
553             sub done
554             {
555 27     27 1 2903 my $self = shift;
556 27   66     91 my $qid = shift || $self->{_current}->[0];
557 27         37 my $wipe = 0;
558              
559 27 50       65 warn "Queue::Dir::done() qid=$qid\n" if $Debug;
560              
561 27 50       90 return if $qid eq 0;
562              
563 27         69 my $name = $self->_name($qid);
564              
565 27 100       232 return unless $name;
566              
567 26         83 $self->unlock($qid);
568              
569 26         1950 unlink $name;
570              
571 26         58 for (my $i = 0;
  31         293  
572             $i < @{$self->{_files}};
573             $i ++)
574             {
575 7 100       28 if ($self->{_files}->[$i]->[0] eq $qid)
576             {
577 2         3 splice(@{$self->{_files}}, $i, 1);
  2         7  
578 2         7 return;
579             }
580             }
581              
582             }
583              
584             =pod
585              
586             =item Cname($qid);>
587              
588             Returns the full pathname of the queue file whose id matches
589             C<$qid>. If none is supplied, defaults to the last one obtained
590             through a C<-Estore()>, C<-Enext()> or C<-Evisit()>.
591              
592             It could return C is the queue object no longer exists.
593              
594             =cut
595              
596             sub name
597             {
598 30     30 1 10513 my $self = shift;
599 30   33     79 my $qid = shift || $self->{_current}->[0] || $self->next;
600 30 50       51 warn "Queue::Dir::name() qid=$qid\n" if $Debug;
601 30         51 return $self->_name($qid);
602             }
603              
604             =pod
605              
606             =item C<-Eunlock($qid)>
607              
608             Removes any locks outstanding in the file identified by C<$qid>, or
609             the last Ced file. Use of this method is only required if the
610             object is created with locking enabled.
611              
612             =cut
613              
614             sub unlock
615             {
616 33     33 1 7261 my $self = shift;
617 33   66     106 my $qid = shift || $self->{_current}->[0];
618 33         146 my $fh = new IO::File;
619              
620 33 50       889 warn "unlock $qid\n" if $Debug;
621              
622 33 100       124 return 1 unless $self->{lockdir};
623 14 100       49 return 1 unless $self->{lockfh};
624              
625 9         121 close $self->{lockfh};
626 9         19 $self->{lockfh} = undef;
627              
628 9         863 unlink $self->{lockfile};
629 9         25 $self->{lockfile} = undef;
630              
631 9         35 return 1;
632             }
633              
634             1;
635             __END__