File Coverage

blib/lib/IPC/DirQueue.pm
Criterion Covered Total %
statement 391 545 71.7
branch 122 236 51.6
condition 20 55 36.3
subroutine 46 48 95.8
pod 8 36 22.2
total 587 920 63.8


line stmt bran cond sub pod time code
1             =head1 NAME
2              
3             IPC::DirQueue - disk-based many-to-many task queue
4              
5             =head1 SYNOPSIS
6              
7             my $dq = IPC::DirQueue->new({ dir => "/path/to/queue" });
8             $dq->enqueue_file("filename");
9              
10             my $dq = IPC::DirQueue->new({ dir => "/path/to/queue" });
11             my $job = $dq->pickup_queued_job();
12             if (!$job) { print "no jobs left\n"; exit; }
13             # ...do something interesting with $job->get_data_path() ...
14             $job->finish();
15              
16             =head1 DESCRIPTION
17              
18             This module implements a FIFO queueing infrastructure, using a directory
19             as the communications and storage media. No daemon process is required to
20             manage the queue; all communication takes place via the filesystem.
21              
22             A common UNIX system design pattern is to use a tool like C as a task
23             queueing system; for example,
24             C describes the
25             use of C as an MP3 jukebox.
26              
27             However, C isn't as efficient as it could be. When used in this way, you
28             have to restart each task processor for every new task. If you have a lot of
29             startup overhead, this can be very inefficient. With C<IPC::DirQueue>, a
30             processing server can run persistently and cache data needed across multiple
31             tasks efficiently; it will not be restarted unless you restart it.
32              
33             Multiple enqueueing and dequeueing processes on multiple hosts (NFS-safe
34             locking is used) can run simultaneously, and safely, on the same queue.
35              
36             Since multiple dequeuers can run simultaneously, this provides a good way
37             to process a variable level of incoming tasks using a pre-defined number
38             of worker processes.
39              
40             If you need more CPU power working on a queue, you can simply start
41             another dequeuer to help out. If you need less, kill off a few dequeuers.
42              
43             If you need to take down the server to perform some maintenance or
44             upgrades, just kill the dequeuer processes, perform the work, and start up
45             new ones. Since there's no 'socket' or similar point of failure aside from
46             the directory itself, the queue will just quietly fill with waiting jobs
47             until the new dequeuer is ready.
48              
49             Arbitrary 'name = value' string-pair metadata can be transferred alongside data
50             files. In fact, in some cases, you may find it easier to send unused and
51             empty data files, and just use the 'metadata' fields to transfer the details of
52             what will be worked on.
53              
54             =head1 METHODS
55              
56             =over 4
57              
58             =cut
59              
60             package IPC::DirQueue;
61 23     23   302052 use strict;
  23         64  
  23         4954  
62 23     23   37848 use bytes;
  23         326  
  23         126  
63 23     23   74674 use Time::HiRes qw();
  23         598363  
  23         893  
64 23     23   195 use Fcntl qw(O_WRONLY O_CREAT O_EXCL O_RDONLY);
  23         51  
  23         7653  
65 23     23   22360 use IPC::DirQueue::Job;
  23         65  
  23         642  
66 23     23   15457 use IPC::DirQueue::IndexClient;
  23         82  
  23         876  
67 23     23   268 use Errno qw(EEXIST);
  23         55  
  23         6299  
68              
69             our @ISA = ();
70              
71             our $VERSION = "1.0";
72              
73 23     23   141 use constant SLASH => '/';
  23         44  
  23         234657  
74              
75             # our $DEBUG = 1;
76             our $DEBUG; # = 1;
77              
78             ###########################################################################
79              
80             =item $dq->new ($opts);
81              
82             Create a new queue object, suitable for either enqueueing jobs
83             or picking up already-queued jobs for processing.
84              
85             C<$opts> is a reference to a hash, which may contain the following
86             options:
87              
88             =over 4
89              
90             =item dir => $path_to_directory (no default)
91              
92             Name the directory where the queue files are stored. This is required.
93              
94             =item data_file_mode => $mode (default: 0666)
95              
96             The C-style file mode for data files. This should be specified
97             as a string with a leading 0. It will be affected by the current
98             process C<umask>.
99              
100             =item queue_file_mode => $mode (default: 0666)
101              
102             The C-style file mode for queue control files. This should be
103             specified as a string with a leading 0. It will be affected by the
104             current process C<umask>.
105              
106             =item ordered => { 0 | 1 } (default: 1)
107              
108             Whether the jobs should be processed in order of submission, or
109             in no particular order.
110              
111             =item queue_fanout => { 0 | 1 } (default: 0)
112              
113             Whether the queue directory should be 'fanned out'. This allows better
114             scalability with NFS-shared queues with large numbers of pending files, but
115             hurts performance otherwise. It also implies B<ordered> = 0. (This is
116             strictly experimental, has overall poor performance, and is not recommended.)
117              
118             =item indexd_uri => $uri (default: undef)
119              
120             A URI of a C daemon, used to maintain the list of waiting jobs. The
121             URI must be of the form C . (This is strictly
122             experimental, and is not recommended.)
123              
124             =item buf_size => $number (default: 65536)
125              
126             The buffer size to use when copying files, in bytes.
127              
128             =item active_file_lifetime => $number (default: 600)
129              
130             The lifetime of an untouched active lockfile, in seconds. See 'STALE LOCKS AND
131             SIGNAL HANDLING', below, for more details.
132              
133             =back
134              
135             =cut
136              
137             sub new {
138 20     20 1 629666 my $class = shift;
139 20         132 my $opts = shift;
140 20   50     242 $opts ||= { };
141 20   33     311 $class = ref($class) || $class;
142 20         70 my $self = $opts;
143 20         94 bless ($self, $class);
144              
145 20 50       453 die "no 'dir' specified" unless $self->{dir};
146 20   50     1256 $self->{data_file_mode} ||= '0666';
147 20         106 $self->{data_file_mode} = oct ($self->{data_file_mode});
148 20   50     181 $self->{queue_file_mode} ||= '0666';
149 20         65 $self->{queue_file_mode} = oct ($self->{queue_file_mode});
150              
151 20 100       161 if ($self->{queue_fanout}) {
    100          
152 2         4 $self->{queue_fanout} = 1;
153 2         5 $self->{ordered} = 0; # fanout wins
154             }
155             elsif (!defined $self->{ordered}) {
156 10         27 $self->{ordered} = 1;
157             }
158              
159 20   50     159 $self->{buf_size} ||= 65536;
160 20   50     168 $self->{active_file_lifetime} ||= 600;
161              
162 20         63 $self->{ensured_dir_exists} = { };
163 20         172 $self->ensure_dir_exists ($self->{dir});
164              
165 20 50       76 if ($self->{indexd_uri}) {
166 0         0 $self->{indexclient} = IPC::DirQueue::IndexClient->new({
167             uri => $self->{indexd_uri}
168             });
169             }
170              
171 20         102 $self;
172             }
173              
174             sub dbg;
175              
176             ###########################################################################
177              
178             =item $dq->enqueue_file ($filename [, $metadata [, $pri] ] );
179              
180             Enqueue a new job for processing. Returns C<1> if the job was enqueued, or
181             C<undef> on failure.
182              
183             C<$filename> is the path to the file to be enqueued. Its contents
184             will be read, and will be used as the contents of the data file available
185             to dequeuers using C.
186              
187             C<$metadata> is an optional hash reference; every item of metadata will be
188             available to worker processes on the C<IPC::DirQueue::Job> object, in the
189             C<$job-E<gt>{metadata}> hashref. Note that using this channel for metadata
190             brings with it several restrictions:
191              
192             =over 4
193              
194             =item 1. it requires that the metadata be stored as 'name' => 'value' string pairs
195              
196             =item 2. neither 'name' nor 'value' may contain newline (\n) or NUL (\0) characters
197              
198             =item 3. 'name' cannot contain colon (:) characters
199              
200             =item 4. 'name' cannot start with a capital letter 'Q' and be 4 characters in length
201              
202             =back
203              
204             If those restrictions are broken, die() will be called with the following
205             error:
206              
207             die "IPC::DirQueue: invalid metadatum: '$k'";
208              
209             This is a change added in release 0.06; prior to that, that metadatum would be
210             silently dropped.
211              
212             An optional priority can be specified; lower priorities are run first.
213             Priorities range from 0 to 99, and 50 is default.
214              
215             =cut
216              
217             sub enqueue_file {
218 0     0 1 0 my ($self, $file, $metadata, $pri) = @_;
219 0 0       0 if (!open (IN, "<$file")) {
220 0         0 warn "IPC::DirQueue: cannot open $file for read: $!";
221 0         0 return;
222             }
223 0         0 my $ret = $self->_enqueue_backend ($metadata, $pri, \*IN);
224 0         0 close IN;
225 0         0 return $ret;
226             }
227              
228             =item $dq->enqueue_fh ($filehandle [, $metadata [, $pri] ] );
229              
230             Enqueue a new job for processing. Returns C<1> if the job was enqueued, or
231             C<undef> on failure. C<$pri> and C<$metadata> are as described in
232             C<$dq-E<gt>enqueue_file()>.
233              
234             C<$filehandle> is a perl file handle that must be open for reading. It will be
235             closed on completion, regardless of success or failure. Its contents will be
236             read, and will be used as the contents of the data file available to dequeuers
237             using C.
238              
239             =cut
240              
241             sub enqueue_fh {
242 10     10 1 482 my ($self, $fhin, $metadata, $pri) = @_;
243 10         25 my $ret = $self->_enqueue_backend ($metadata, $pri, $fhin);
244 10         15 close $fhin;
245 10         54 return $ret;
246             }
247              
248             =item $dq->enqueue_string ($string [, $metadata [, $pri] ] );
249              
250             Enqueue a new job for processing. The job data is entirely read from
251             C<$string>. Returns C<1> if the job was enqueued, or C<undef> on failure.
252             C<$pri> and C<$metadata> are as described in C<$dq-E<gt>enqueue_file()>.
253              
254             =cut
255              
256             sub enqueue_string {
257 1010     1010 1 204985688 my ($self, $string, $metadata, $pri) = @_;
258 1010         3744 my $enqd_already = 0;
259             return $self->_enqueue_backend ($metadata, $pri, undef,
260             sub {
261 2020 100   2020   8181 return if $enqd_already++;
262 1010         2839 return $string;
263 1010         20278 });
264             }
265              
266             =item $dq->enqueue_sub ($subref [, $metadata [, $pri] ] );
267              
268             Enqueue a new job for processing. Returns C<1> if the job was enqueued, or
269             C<undef> on failure. C<$pri> and C<$metadata> are as described in
270             C<$dq-E<gt>enqueue_file()>.
271              
272             C<$subref> is a perl subroutine, which is expected to return one of the
273             following each time it is called:
274              
275             - a string of data bytes to be appended to any existing data. (the
276             string may be empty, C<''>, in which case it's a no-op.)
277              
278             - C<undef> when the enqueued data has ended, ie. EOF.
279              
280             - C<die()> if an error occurs. The C<die> message will be converted into
281             a warning, and the C<enqueue_sub()> call will return C<undef>.
282              
283             (Tip: note that this is a closure, so variables outside the subroutine can be
284             accessed safely.)
285              
286             =cut
287              
288             sub enqueue_sub {
289 100     100 1 13974 my ($self, $subref, $metadata, $pri) = @_;
290 100         232 return $self->_enqueue_backend ($metadata, $pri, undef, $subref);
291             }
292              
293             # private implementation.
294             sub _enqueue_backend {
295 1120     1120   3503 my ($self, $metadata, $pri, $fhin, $callbackin) = @_;
296              
297 1120 50       20136 if (!defined($pri)) { $pri = 50; }
  1120         2561  
298 1120 50 33     10309 if ($pri < 0 || $pri > 99) {
299 0         0 warn "IPC::DirQueue: bad priority $pri is > 99 or < 0";
300 0         0 return;
301             }
302              
303 1120         22877 my ($now, $nowmsecs) = Time::HiRes::gettimeofday;
304              
305 1120         9600 my $job = {
306             pri => $pri,
307             metadata => $metadata,
308             time_submitted_secs => $now,
309             time_submitted_msecs => $nowmsecs
310             };
311              
312             # NOTE: this can change until the moment we've renamed the ctrl file
313             # into 'queue'!
314 1120         7510 my $qfnametmp = $self->new_q_filename($job);
315 1120         2879 my $qcnametmp = $qfnametmp;
316              
317 1120         5805 my $pathtmp = $self->q_subdir('tmp');
318 1120         13796 $self->ensure_dir_exists ($pathtmp);
319              
320 1120         3900 my $pathtmpctrl = $pathtmp.SLASH.$qfnametmp.".ctrl";
321 1120         3207 my $pathtmpdata = $pathtmp.SLASH.$qfnametmp.".data";
322              
323 1120 50       3366461 if (!sysopen (OUT, $pathtmpdata, O_WRONLY|O_CREAT|O_EXCL,
324             $self->{data_file_mode}))
325             {
326 0         0 warn "IPC::DirQueue: cannot open $pathtmpdata for write: $!";
327 0         0 return;
328             }
329 1120         10041 my $pathtmpdata_created = 1;
330              
331 1120         2661 my $siz;
332 1120         12582 eval {
333 1120         7088 $siz = $self->copy_in_to_out_fh ($fhin, $callbackin,
334             \*OUT, $pathtmpdata);
335             };
336 1120 50       3440 if ($@) {
337 0         0 warn "IPC::DirQueue: enqueue failed: $@";
338             }
339 1120 50       24436 if (!defined $siz) {
340 0         0 goto failure;
341             }
342 1120         3680 $job->{size_bytes} = $siz;
343              
344             # get the data dir
345 1120         5640 my $pathdatadir = $self->q_subdir('data');
346              
347             # hashing the data dir, using 2 levels of directory hashing. This has a tiny
348             # effect on speed in all cases up to 10k queued files, but has good results
349             # in terms of the usability of those dirs for users doing direct access, so
350             # enabled by default.
351 1120         2613 if (1) {
352             # take the last two chars for the hashname. In most cases, this will
353             # be the last 2 chars of a hash of (hostname, pid), so effectively
354             # random. Remove it from the filename entirely, since it's redundant
355             # to have it both in the dir name and the filename.
356 1120         11645 $qfnametmp =~ s/([A-Za-z0-9+_])([A-Za-z0-9+_])$//;
357 1120   50     9105 my $hash1 = $1 || '0';
358 1120   50     5422 my $hash2 = $2 || '0';
359 1120         1987 my $origdatadir = $pathdatadir;
360 1120         4807 $pathdatadir = "$pathdatadir/$hash1/$hash2";
361             # check to see if that hashdir exists... build it up if req'd
362 1120 100       55537 if (!-d $pathdatadir) {
363 15         216 foreach my $dir ($origdatadir, "$origdatadir/$hash1", $pathdatadir)
364             {
365 45 100       359870 (-d $dir) or mkdir ($dir);
366             }
367             }
368             }
369              
370             # now link(2) the data tmpfile into the 'data' dir.
371 1120         8431 my $pathdata = $self->link_into_dir ($job, $pathtmpdata,
372             $pathdatadir, $qfnametmp);
373 1120 50       3007 if (!$pathdata) {
374 0         0 goto failure;
375             }
376 1120         1868 my $pathdata_created = 1;
377 1120         4159 $job->{pathdata} = $pathdata;
378              
379             # ok, write a control file now that data is safe and we know it's
380             # new filename...
381 1120 50       5366 if (!$self->create_control_file ($job, $pathtmpdata, $pathtmpctrl)) {
382 0         0 goto failure;
383             }
384 1120         2024 my $pathtmpctrl_created = 1;
385              
386             # now link(2) that into the 'queue' dir.
387 1120         4832 my $pathqueuedir = $self->q_subdir('queue');
388 1120         5335 my $fanout = $self->queue_dir_fanout_create($pathqueuedir);
389              
390 1120         4762 my $pathqueue = $self->link_into_dir ($job, $pathtmpctrl,
391             $self->queue_dir_fanout_path($pathqueuedir, $fanout),
392             $qcnametmp);
393              
394 1120 50       3082 if (!$pathqueue) {
395 0         0 dbg ("failed to link_into_dir, enq failed");
396 0         0 goto failure;
397             }
398              
399             # and incr the fanout counter for that fanout dir
400 1120         10855 $self->queue_dir_fanout_commit($pathqueuedir, $fanout);
401              
402             # touch the "queue" directory to indicate that it's changed
403             # and a file has been enqueued; required to support Reiserfs
404             # and XFS, where this is not implicit
405 1120         3787 $pathqueuedir = $self->q_subdir('queue');
406 1120 50       4148 $self->touch($pathqueuedir) or warn "touch failed on $pathqueuedir";
407 1120         6424 dbg ("touched $pathqueuedir at ".time);
408              
409 1120 50       4273 if ($self->{indexclient}) {
410 0         0 $self->{indexclient}->enqueue($pathqueuedir, $pathqueue);
411             }
412              
413             # my $pathqueue_created = 1; # not required, we're done!
414 1120         16787 return 1;
415              
416             failure:
417 0 0       0 if ($pathtmpctrl_created) {
418 0 0       0 unlink $pathtmpctrl or warn "IPC::DirQueue: cannot unlink $pathtmpctrl";
419             }
420 0 0       0 if ($pathtmpdata_created) {
421 0 0       0 unlink $pathtmpdata or warn "IPC::DirQueue: cannot unlink $pathtmpdata";
422             }
423 0 0       0 if ($pathdata_created) {
424 0 0       0 unlink $pathdata or warn "IPC::DirQueue: cannot unlink $pathdata";
425             }
426 0         0 return;
427             }
428              
429             ###########################################################################
430              
431             =item $job = $dq->pickup_queued_job( [ path => $path ] );
432              
433             Pick up the next job in the queue, so that it can be processed.
434              
435             If no job is available for processing, either because the queue is
436             empty or because other worker processes are already working on
437             them, C<undef> is returned; otherwise, a new instance of C<IPC::DirQueue::Job>
438             is returned.
439              
440             Note that the job is marked as I<active> until C<$job-E<gt>finish()>
441             is called.
442              
443             If the (optional) parameter C<path> is used, its value indicates the path of
444             the desired job's data file. By using this, it is possible to cancel
445             not-yet-active items from anywhere in the queue, or pick up jobs out of
446             sequence. The data path must match the value of the I member of
447             the C object passed to the C callback.
448              
449             =cut
450              
451             sub pickup_queued_job {
452 403     403 1 2055 my ($self, %args) = @_;
453              
454 403         876 my $pathqueuedir = $self->q_subdir('queue');
455 403         975 my $pathactivedir = $self->q_subdir('active');
456 403         1128 $self->ensure_dir_exists ($pathactivedir);
457              
458 403         1227 my $iter = $self->queue_iter_start($pathqueuedir);
459              
460 403         1677 while (1) {
461 1457         3822 my $nextfile = $self->queue_iter_next($iter);
462              
463 1457 100       3025 if (!defined $nextfile) {
464             # no more files in the queue, return empty
465 3         7 last;
466             }
467              
468 1454         3081 my $nextfilebase = $self->queue_dir_fanout_path_strip($nextfile);
469              
470 1454 100       4358 next if ($nextfilebase !~ /^\d/);
471 1059         2336 my $pathactive = $pathactivedir.SLASH.$nextfilebase;
472 1059         1561 my $pathqueue = $pathqueuedir.SLASH.$nextfile;
473              
474 1059 100 100     4374 next if (exists($args{path}) && ($pathqueue ne $args{path}));
475              
476 400         42597 my ($dev,$ino,$mode,$nlink,$uid,$gid,$rdev,$size,
477             $atime,$mtime,$ctime,$blksize,$blocks) = lstat($pathactive);
478              
479 400 50       927 if (defined $mtime) {
480             # *DO* call time() here. In extremely large dirs, it may take
481             # several seconds to traverse the entire listing from start
482             # to finish!
483 0 0       0 if (time() - $mtime < $self->{active_file_lifetime}) {
484             # active lockfile; it's being worked on. skip this file
485 0         0 next;
486             }
487              
488 0 0       0 if ($self->worker_still_working($pathactive)) {
489             # worker is still alive, although not updating the lock
490 0         0 dbg ("worker still working, skip: $pathactive");
491 0         0 next;
492             }
493              
494             # now, we want to try to avoid 2 or 3 dequeuers removing
495             # the lockfile simultaneously, as that could cause this race:
496             #
497             # dqproc1: [checks file] [unlinks] [starts work]
498             # dqproc2: [checks file] [unlinks]
499             #
500             # ie. the second process unlinks the first process' brand-new
501             # lockfile!
502             #
503             # to avoid this, use a random "fudge" on the timeout, so
504             # that dqproc2 will wait for possibly much longer than
505             # dqproc1 before it decides to unlink it.
506             #
507             # this isn't perfect. TODO: is there a "rename this fd" syscall
508             # accessible from perl?
509              
510 0         0 my $fudge = get_random_int() % 256;
511 0 0       0 if (time() - $mtime < $self->{active_file_lifetime}+$fudge) {
512             # within the fudge zone. don't unlink it in this process.
513 0         0 dbg ("within active fudge zone, skip: $pathactive");
514 0         0 next;
515             }
516              
517             # else, we can kill the stale lockfile
518 0 0       0 unlink $pathactive or warn "IPC::DirQueue: unlink failed: $pathactive";
519 0         0 warn "IPC::DirQueue: killed stale lockfile: $pathactive";
520             }
521              
522             # ok, we're free to get cracking on this file.
523 400         902 my $pathtmp = $self->q_subdir('tmp');
524 400         28807 $self->ensure_dir_exists ($pathtmp);
525              
526             # use the name of the queue file itself, plus a tmp prefix, plus active
527 400         1582 my $pathtmpactive = $pathtmp.SLASH.
528             $nextfilebase.".".$self->new_lock_filename().".active";
529              
530 400         5256 dbg ("creating tmp active $pathtmpactive");
531 400 50       54580 if (!sysopen (LOCK, $pathtmpactive, O_WRONLY|O_CREAT|O_EXCL,
532             $self->{queue_file_mode}))
533             {
534 0 0       0 if ($!{EEXIST}) {
535             # contention; skip this file
536 0         0 dbg ("IPC::DirQueue: $pathtmpactive already created, skipping: $!");
537             }
538             else {
539             # could be serious; disk space, permissions etc.
540 0         0 warn "IPC::DirQueue: cannot open $pathtmpactive for write: $!";
541             }
542 0         0 next;
543             }
544 400         1377 print LOCK $self->gethostname(), "\n", $$, "\n";
545 400         26219 close LOCK;
546              
547 400 50       22167 if (!-f $pathqueue) {
548             # queue file already gone; another worker got it before we did.
549             # catch this case before we create a lockfile.
550             # see the "pathqueue_gone" comment below for an explanation
551 0         0 dbg("IPC::DirQueue: $pathqueue no longer exists, skipping");
552 0         0 goto nextfile;
553             }
554              
555 400         3767 my $job = IPC::DirQueue::Job->new ($self, {
556             jobid => $nextfilebase,
557             pathqueue => $pathqueue,
558             pathactive => $pathactive
559             });
560              
561 400         1337 my $pathnewactive = $self->link_into_dir_no_retry ($job,
562             $pathtmpactive, $pathactivedir, $nextfilebase);
563 400 50       921 if (!defined($pathnewactive)) {
564             # link failed; another worker got it before we did
565             # no need to unlink tmpfile, the "nextfile" action will do that
566 0         0 goto nextfile;
567             }
568              
569 400 50       934 if ($pathactive ne $pathnewactive) {
570 0         0 die "oops! active paths differ: $pathactive $pathnewactive";
571             }
572              
573 400 50       21251 if (!open (IN, "<".$pathqueue))
574             {
575             # since we read the list of files upfront, this can happen:
576             #
577             # dqproc1: [gets lock] [work] [finish_job]
578             # dqproc2: [gets lock]
579             #
580             # "dqproc1" has already completed the job, unlinking both the active
581             # *and* queue files, by the time "dqproc2" gets to it. This is OK;
582             # just skip the file, since it's already done. [pathqueue_gone]
583              
584 0         0 dbg("IPC::DirQueue: cannot open $pathqueue for read: $!");
585 0         0 unlink $pathnewactive;
586 0         0 next; # NOT "goto nextfile", as pathtmpactive is already unlinked
587             }
588              
589 400         3944 my $red = $self->read_control_file ($job, \*IN);
590 400         4546 close IN;
591              
592 400 50       1003 next if (!$red);
593              
594 400         1257 $self->queue_iter_stop($iter);
595 400         3889 return $job;
596              
597 0 0       0 nextfile:
598             unlink $pathtmpactive or warn "IPC::DirQueue: unlink failed: $pathtmpactive";
599             }
600              
601 3         12 $self->queue_iter_stop($iter);
602 3         17 return; # empty
603             }
604              
605             ###########################################################################
606              
607             =item $job = $dq->wait_for_queued_job ([ $timeout [, $pollinterval] ]);
608              
609             Wait for a job to be queued within the next C<$timeout> seconds.
610              
611             If there is already a job ready for processing, this will return immediately.
612             If one is not available, it will sleep, wake up periodically, check for job
613             availability, and either carry on sleeping or return the new job if one
614             is now available.
615              
616             If a job becomes available, a new instance of C<IPC::DirQueue::Job> is
617             returned. If the timeout is reached, C<undef> is returned.
618              
619             If C<$timeout> is not specified, or is less than 1, this function will wait
620             indefinitely.
621              
622             The optional parameter C<$pollinterval> indicates how frequently to wake
623             up and check for new jobs. It is specified in seconds, and floating-point
624             precision is supported. The default is C<1>.
625              
626             Note that if C<$timeout> is not a round multiple of C<$pollinterval>,
627             the nearest round multiple of C<$pollinterval> greater than C<$timeout>
628             will be used instead. Also note that C<$timeout> is used as an integer.
629              
630             =cut
631              
632             sub wait_for_queued_job {
633 310     310 1 14533 my ($self, $timeout, $pollintvl) = @_;
634              
635 310         445 my $finishtime;
636 310 50 33     2618 if ($timeout && $timeout > 0) {
637 0         0 $finishtime = time + int ($timeout);
638             }
639              
640 310         671 dbg "wait_for_queued_job starting";
641              
642 310 50       861 if ($pollintvl) {
643 0         0 $pollintvl *= 1000000; # from secs to usecs
644             } else {
645 310         616 $pollintvl = 1000000; # default: 1 sec
646             }
647              
648 310         883 my $pathqueuedir = $self->q_subdir('queue');
649 310         1009 $self->ensure_dir_exists ($pathqueuedir);
650              
651             # TODO: would be nice to use fam for this, where available. But
652             # no biggie...
653              
654 310         410 while (1) {
655             # check the stat time on the queue dir *before* we call pickup,
656             # to avoid a race condition where a job is added while we're
657             # checking in that function.
658              
659 310         10740 my @stat = stat ($pathqueuedir);
660 310         810 my $qdirlaststat = $stat[9];
661              
662 310         809 my $job = $self->pickup_queued_job();
663 310 50       764 if ($job) { return $job; }
  310         1526  
664              
665             # there's another semi-race condition here, brought about by a lack of
666             # sub-second precision from stat(2). if the last enq occurred inside
667             # *this* current 1-second window, then *another* one can happen inside this
668             # second right afterwards, and we wouldn't notice.
669              
670             # in other words (ASCII-art alert):
671             # TIME | t | t+1
672             # E | enq enq |
673             # D | stat pickup_queued_job |
674              
675             # the enqueuer process E enqueues a job just after the stat, inside the
676             # 1-second period "t". dequeuer process D dequeues it with
677             # pickup_queued_job(). all is well. But then, E enqueues another job
678             # inside the same 1-second period "t", and since the stat() has already
679             # happened for "t", and since we've already picked up the job in "t", we
680             # don't recheck; result is, we miss this enqueue event.
681             #
682             # Avoid this by checking in a busy-loop until time(2) says we're out of
683             # that "danger zone" 1-second period. Any further enq's would then
684             # cause stat(2) to report a different timestamp.
685              
686 0         0 while (time == $qdirlaststat) {
687 0         0 Time::HiRes::usleep ($pollintvl);
688 0         0 dbg "wait_for_queued_job: spinning until time != stat $qdirlaststat";
689 0         0 my $job = $self->pickup_queued_job();
690 0 0       0 if ($job) { return $job; }
  0         0  
691             }
692              
693             # sleep until the directory's mtime changes from what it was when
694             # we ran pickup_queued_job() last.
695              
696 0         0 dbg "wait_for_queued_job: sleeping on $pathqueuedir";
697 0         0 while (1) {
698 0         0 my $now = time;
699 0 0 0     0 if ($finishtime && $now >= $finishtime) {
700 0         0 dbg "wait_for_queued_job timed out";
701 0         0 return undef; # out of time
702             }
703              
704 0         0 Time::HiRes::usleep ($pollintvl);
705              
706 0         0 @stat = stat ($pathqueuedir);
707             # dbg "wait_for_queued_job: stat $stat[9] $qdirlaststat $pathqueuedir";
708 0 0 0     0 last if (defined $stat[9] &&
      0        
709             ((defined $qdirlaststat && $stat[9] != $qdirlaststat)
710             || !defined $qdirlaststat));
711             }
712              
713 0         0 dbg "wait_for_queued_job: activity, calling pickup";
714             }
715             }
716              
717             ###########################################################################
718              
719             =item $dq->visit_all_jobs($visitor, $visitcontext);
720              
721             Visit all the jobs in the queue, in a read-only mode. Used to list
722             the entire queue.
723              
724             The callback function C<$visitor> will be called for each job in
725             the queue, like so:
726              
727             &$visitor ($visitcontext, $job);
728              
729             C<$visitcontext> is whatever you pass in that variable above.
730             C<$job> is a new, read-only instance of C representing
731             that job.
732              
733             If a job is active (being processed), the C<$job> object also contains the
734             following additional data:
735              
736             'active_host': the hostname on which the job is active
737             'active_pid': the process ID of the process which picked up the job
738              
739             =cut
740              
741             sub visit_all_jobs {
742 4     4 1 997 my ($self, $visitor, $visitcontext) = @_;
743              
744 4         17 my $pathqueuedir = $self->q_subdir('queue');
745 4         14 my $pathactivedir = $self->q_subdir('active');
746              
747 4         37 my $iter = $self->queue_iter_start($pathqueuedir);
748              
749 4         17 my $nextfile;
750 4         9 while (1) {
751 122         933 $nextfile = $self->queue_iter_next($iter);
752              
753 122 100       245 if (!defined $nextfile) {
754             # no more files in the queue, return empty
755 4         11 last;
756             }
757              
758 118         246 my $nextfilebase = $self->queue_dir_fanout_path_strip($nextfile);
759              
760 118 100       386 next if ($nextfilebase !~ /^\d/);
761 90         176 my $pathqueue = $pathqueuedir.SLASH.$nextfile;
762 90         261 my $pathactive = $pathactivedir.SLASH.$nextfilebase;
763              
764 90 50       1495 next if (!-f $pathqueue);
765              
766 90         106 my $acthost;
767             my $actpid;
768 90 50       1268 if (open (IN, "<$pathactive")) {
769 0         0 $acthost = ; chomp $acthost;
  0         0  
770 0         0 $actpid = ; chomp $actpid;
  0         0  
771 0         0 close IN;
772             }
773              
774 90         741 my $job = IPC::DirQueue::Job->new ($self, {
775             is_readonly => 1, # means finish() will not rm files
776             jobid => $nextfilebase,
777             active_host => $acthost,
778             active_pid => $actpid,
779             pathqueue => $pathqueue,
780             pathactive => $pathactive
781             });
782              
783 90 50       3209 if (!open (IN, "<".$pathqueue)) {
784 0         0 dbg ("queue file disappeared, job finished? skip: $pathqueue");
785 0         0 next;
786             }
787              
788 90         272 my $red = $self->read_control_file ($job, \*IN);
789 90         845 close IN;
790              
791 90 50       200 if (!$red) {
792 0         0 warn "IPC::DirQueue: cannot read control file: $pathqueue";
793 0         0 next;
794             }
795              
796 90         229 &$visitor ($visitcontext, $job);
797             }
798              
799 4         19 $self->queue_iter_stop($iter);
800 4         18 return;
801             }
802              
803             ###########################################################################
804              
805             # private API: performs logic of IPC::DirQueue::Job::finish().
806             sub finish_job {
807 400     400 0 633 my ($self, $job, $isdone) = @_;
808              
809 400         1175 dbg ("finish_job: ", $job->{pathactive});
810              
811 400 50       967 if ($job->{is_readonly}) {
812 0         0 return;
813             }
814              
815 400 50       837 if ($isdone) {
816 400 50       95478 unlink($job->{pathqueue})
817             or warn "IPC::DirQueue: unlink failed: $job->{pathqueue}";
818 400 50       35962 unlink($job->{QDFN})
819             or warn "IPC::DirQueue: unlink failed: $job->{QDFN}";
820              
821 400 50       1216 if ($self->{indexclient}) {
822 0         0 my $pathqueuedir = $self->q_subdir('queue');
823 0         0 $self->{indexclient}->dequeue($pathqueuedir, $job->{pathqueue});
824             }
825              
826             # touch the dir so that other dequeuers re-check; activity can
827             # introduce a small race, I think. (don't think this is necessary)
828             # $self->touch($pathqueuedir) or warn "touch failed on $pathqueuedir";
829             }
830              
831 400 50       55193 unlink($job->{pathactive})
832             or warn "IPC::DirQueue: unlink failed: $job->{pathactive}";
833             }
834              
835             ###########################################################################
836              
837             sub get_dir_filelist_sorted {
838 343     343 0 530 my ($self, $dir) = @_;
839              
840 343 100       8755 if (!opendir (DIR, $dir)) {
841 1         3 return []; # no dir? nothing queued
842             }
843             # have to read the lot, to sort them.
844 342         41730 my @files = sort grep { /^\d/ } readdir(DIR);
  16384         47175  
845 342         6808 closedir DIR;
846 342         3726 return \@files;
847             }
848              
849             ###########################################################################
850              
851             sub copy_in_to_out_fh {
852 1120     1120 0 3033 my ($self, $fhin, $callbackin, $fhout, $outfname) = @_;
853              
854 1120         3277 my $buf;
855             my $len;
856 1120         4241 my $siz = 0;
857              
858 1120         12121 binmode $fhout;
859 1120 100       5595 if ($callbackin) {
860 1110         1820 while (1) {
861 2420         7238 my $stringin = $callbackin->();
862              
863 2420 100       8219 if (!defined($stringin)) {
864 1110         2985 last; # EOF
865             }
866              
867 1310         2793 $len = length ($stringin);
868 1310 50       3686 next if ($len == 0); # empty string, nothing to write
869              
870 1310 50       3842752 if (!print $fhout $stringin) {
871 0         0 warn "IPC::DirQueue: enqueue: cannot write to $outfname: $!";
872 0         0 close $fhout;
873 0         0 return;
874             }
875 1310         2468 $siz += $len;
876             }
877             }
878             else {
879 10         16 binmode $fhin;
880 10         185 while (($len = read ($fhin, $buf, $self->{buf_size})) > 0) {
881 10 50       69 if (!print $fhout $buf) {
882 0         0 warn "IPC::DirQueue: cannot write to $outfname: $!";
883 0         0 close $fhin; close $fhout;
  0         0  
884 0         0 return;
885             }
886 10         37 $siz += $len;
887             }
888 10         94 close $fhin;
889             }
890              
891 1120 50       43920005 if (!close $fhout) {
892 0         0 warn "IPC::DirQueue: cannot close $outfname";
893 0         0 return;
894             }
895 1120         6438 return $siz;
896             }
897              
898             sub link_into_dir {
899 2240     2240 0 5312 my ($self, $job, $pathtmp, $pathlinkdir, $qfname) = @_;
900 2240         5480 $self->ensure_dir_exists ($pathlinkdir);
901 2240         2902 my $path;
902              
903             # retry 10 times; add a random few digits on link(2) failure
904 2240         3661 my $maxretries = 10;
905 2240         6374 for my $retry (1 .. $maxretries) {
906 2240         6498 $path = $pathlinkdir.SLASH.$qfname;
907              
908 2240         8335 dbg ("link_into_dir retry=", $retry, " tmp=", $pathtmp, " path=", $path);
909              
910 2240 50       17790520 if (link ($pathtmp, $path)) {
911 2240         5310 last; # got it
912             }
913              
914             # link() may return failure, even if it succeeded.
915             # use lstat() to verify that link() really failed.
916 0         0 my ($dev,$ino,$mode,$nlink,$uid) = lstat($pathtmp);
917 0 0       0 if ($nlink == 2) {
918 0         0 last; # got it
919             }
920              
921             # failed. check for retry limit first
922 0 0       0 if ($retry == $maxretries) {
923 0         0 warn "IPC::DirQueue: cannot link $pathtmp to $path";
924 0         0 return;
925             }
926              
927             # try a new q_filename, use randomness to avoid
928             # further collisions
929 0         0 $qfname = $self->new_q_filename($job, 1);
930              
931 0         0 dbg ("link_into_dir retrying: $retry");
932 0         0 Time::HiRes::usleep (250 * $retry);
933             }
934              
935             # got it! unlink(2) the tmp file, since we don't need it.
936 2240         14866 dbg ("link_into_dir unlink tmp file: $pathtmp");
937 2240 50       231222 if (!unlink ($pathtmp)) {
938 0         0 warn "IPC::DirQueue: cannot unlink $pathtmp";
939             # non-fatal, we can still continue anyway
940             }
941              
942 2240         13718 dbg ("link_into_dir return: $path");
943 2240         6846 return $path;
944             }
945              
946             sub link_into_dir_no_retry {
947 400     400 0 779 my ($self, $job, $pathtmp, $pathlinkdir, $qfname) = @_;
948 400         821 $self->ensure_dir_exists ($pathlinkdir);
949              
950 400         971 dbg ("lidnr: ", $pathtmp, " ", $pathlinkdir, "/", $qfname);
951              
952 400         10663 my ($dev1,$ino1,$mode1,$nlink1,$uid1) = lstat($pathtmp);
953 400 50       1186 if (!defined $nlink1) {
954 0         0 warn ("lidnr: tmp file disappeared?! $pathtmp");
955 0         0 return; # not going to have much luck here
956             }
957              
958 400         857 my $path = $pathlinkdir.SLASH.$qfname;
959              
960 400 50       5427 if (-f $path) {
961 0         0 dbg ("lidnr: target file already exists: $path");
962 0         0 return; # we've been beaten to it
963             }
964              
965 400         549 my $linkfailed;
966 400 50       40396 if (!link ($pathtmp, $path)) {
967 0         0 dbg("link failure, recovering: $!");
968 0         0 $linkfailed = 1;
969             }
970              
971             # link() may return failure, even if it succeeded. use lstat() to verify that
972             # link() really failed. use lstat() even if it reported success, just to be
973             # sure. ;)
974              
975 400         6268 my ($dev3,$ino3,$mode3,$nlink3,$uid3) = lstat($path);
976 400 50       5285 if (!defined $nlink3) {
977 0         0 dbg ("lidnr: link failed, target file nonexistent: $path");
978 0         0 return;
979             }
980              
981             # now, be paranoid and verify that the inode data is identical
982 400 50 33     3472 if ($dev1 != $dev3 || $ino1 != $ino3 || $uid1 != $uid3) {
      33        
983             # the tmpfile and the target don't match each other.
984             # if the link failed, this means that another qproc got
985             # the file before we did, which is not an error.
986 0 0       0 if (!$linkfailed) {
987             # link supposedly succeeded, so this *is* an error. warn
988 0         0 warn ("lidnr: tmp file doesn't match target: $path ($dev3,$ino3,$mode3,$nlink3,$uid3) vs $pathtmp ($dev1,$ino1,$mode1,$nlink1,$uid1)");
989             }
990 0         0 return;
991             }
992            
993             # got it! unlink(2) the tmp file, since we don't need it.
994 400         1253 dbg ("lidnr: unlink tmp file: $pathtmp");
995 400 50       30321 if (!unlink ($pathtmp)) {
996 0         0 warn "IPC::DirQueue: cannot unlink $pathtmp";
997             # non-fatal, we can still continue anyway
998             }
999              
1000 400         1197 dbg ("lidnr: return: $path");
1001 400         2465 return $path;
1002             }
1003              
1004             sub create_control_file {
1005 1120     1120 0 2777 my ($self, $job, $pathtmpdata, $pathtmpctrl) = @_;
1006              
1007 1120         6403 dbg ("create_control_file $pathtmpctrl for $pathtmpdata ($job->{pathdata})");
1008 1120 50       140349 if (!sysopen (OUT, $pathtmpctrl, O_WRONLY|O_CREAT|O_EXCL,
1009             $self->{queue_file_mode}))
1010             {
1011 0         0 warn "IPC::DirQueue: cannot open $pathtmpctrl for write: $!";
1012 0         0 return;
1013             }
1014              
1015 1120         7745 print OUT "QDFN: ", $job->{pathdata}, "\n";
1016 1120         7610 print OUT "QDSB: ", $job->{size_bytes}, "\n";
1017 1120         4324 print OUT "QSTT: ", $job->{time_submitted_secs}, "\n";
1018 1120         4984 print OUT "QSTM: ", $job->{time_submitted_msecs}, "\n";
1019 1120         3897 print OUT "QSHN: ", $self->gethostname(), "\n";
1020              
1021 1120         2889 my $md = $job->{metadata};
1022 1120         1669 foreach my $k (keys %{$md}) {
  1120         5154  
1023 1120         2885 my $v = $md->{$k};
1024 1120 50 33     15395 if (($k =~ /^Q...$/)
      33        
1025             || ($k =~ /[:\0\n]/s)
1026             || ($v =~ /[\0\n]/s))
1027             {
1028 0         0 close OUT;
1029 0         0 die "IPC::DirQueue: invalid metadatum: '$k'"; # TODO: clean up files?
1030             }
1031 1120         7583 print OUT $k, ": ", $v, "\n";
1032             }
1033              
1034 1120 50       9898090 if (!close (OUT)) {
1035 0         0 warn "IPC::DirQueue: cannot close $pathtmpctrl for write: $!";
1036 0         0 return;
1037             }
1038              
1039 1120         4787 return 1;
1040             }
1041              
1042             sub read_control_file {
1043 490     490 0 973 my ($self, $job, $infh) = @_;
1044 490         629 local ($_);
1045              
1046 490         7792 while (<$infh>) {
1047 2940         19660 my ($k, $value) = split (/: /, $_, 2);
1048 2940         4028 chop $value;
1049 2940 100       8907 if ($k =~ /^Q[A-Z]{3}$/) {
1050 2450         20818 $job->{$k} = $value;
1051             }
1052             else {
1053 490         5077 $job->{metadata}->{$k} = $value;
1054             }
1055             }
1056              
1057             # all jobs must have a datafile (even if it's empty)
1058 490 50 33     11227 if (!$job->{QDFN} || !-f $job->{QDFN}) {
1059 0         0 return;
1060             }
1061              
1062 490         1743 return $job;
1063             # print OUT "QDFN: ", $job->{pathdata}, "\n";
1064             # print OUT "QDSB: ", $job->{size_bytes}, "\n";
1065             # print OUT "QSTT: ", $job->{time_submitted_secs}, "\n";
1066             # print OUT "QSTM: ", $job->{time_submitted_msecs}, "\n";
1067             # print OUT "QSHN: ", $self->gethostname(), "\n";
1068             }
1069              
1070             sub worker_still_working {
1071 0     0 0 0 my ($self, $fname) = @_;
1072 0 0       0 if (!$fname) {
1073 0         0 return;
1074             }
1075 0 0       0 if (!open (IN, "<".$fname)) {
1076 0         0 return;
1077             }
1078 0         0 my $hname = ; chomp $hname;
  0         0  
1079 0         0 my $wpid = ; chomp $wpid;
  0         0  
1080 0         0 close IN;
1081 0 0       0 if ($hname eq $self->gethostname()) {
1082 0 0       0 if (!kill (0, $wpid)) {
1083 0         0 return; # pid is local and no longer running
1084             }
1085             }
1086              
1087             # pid is still running, or remote
1088 0         0 return 1;
1089             }
1090              
1091             ###########################################################################
1092              
1093             sub q_dir {
1094 6004     6004 0 9126 my ($self) = @_;
1095 6004         51461 return $self->{dir};
1096             }
1097              
1098             sub q_subdir {
1099 6004     6004 0 12163 my ($self, $subdir) = @_;
1100 6004         20900 return $self->q_dir().SLASH.$subdir;
1101             }
1102              
1103             sub new_q_filename {
1104 1120     1120 0 2304 my ($self, $job, $addextra) = @_;
1105              
1106 1120         12616 my @gmt = gmtime ($job->{time_submitted_secs});
1107              
1108             # NN.20040718140300MMMM.hash(hostname.$$)[.rand]
1109             #
1110             # NN = priority, default 50
1111             # MMMM = microseconds from Time::HiRes::gettimeofday()
1112             # hostname = current hostname
1113              
1114 1120         12604 my $buf = sprintf ("%02d.%04d%02d%02d%02d%02d%02d%06d.%s",
1115             $job->{pri},
1116             $gmt[5]+1900, $gmt[4]+1, $gmt[3], $gmt[2], $gmt[1], $gmt[0],
1117             $job->{time_submitted_msecs},
1118             hash_string_to_filename ($self->gethostname().$$));
1119              
1120             # normally, this isn't used. but if there's a collision,
1121             # all retries after that will do this; in this case, the
1122             # extra anti-collision stuff is useful
1123 1120 50       4287 if ($addextra) {
1124 0         0 $buf .= ".".$$.".".$self->get_random_int();
1125             }
1126              
1127 1120         4609 return $buf;
1128             }
1129              
1130             sub hash_string_to_filename {
1131 1120     1120 0 3310 my ($str) = @_;
1132             # get a 16-bit checksum of the input, then uuencode that string
1133 1120         13994 $str = pack ("u*", unpack ("%16C*", $str));
1134             # transcode from uuencode-space into safe, base64-ish space
1135 1120         5823 $str =~ y/ -_/A-Za-z0-9+_/;
1136             # and remove the stuff that wasn't in that "safe" range
1137 1120         4067 $str =~ y/A-Za-z0-9+_//cd;
1138 1120         12647 return $str;
1139             }
1140              
1141             sub new_lock_filename {
1142 400     400 0 636 my ($self) = @_;
1143 400         1261 return sprintf ("%d.%s.%d", time, $self->gethostname(), $$);
1144             }
1145              
1146             sub get_random_int {
1147 130     130 0 231 my ($self) = @_;
1148              
1149             # we try to use /dev/random first, as that's globally random for all PIDs on
1150             # the system. this avoids brokenness if the caller has called srand(), then
1151             # forked multiple enqueueing procs, as they will all share the same seed and
1152             # will all return the same "random" output.
1153 130         166 my $buf;
1154 130 50 33     2701 if (sysopen (IN, "
1155 0         0 my ($hi, $lo) = unpack ("C2", $buf);
1156 0         0 return ($hi << 8) | $lo;
1157             } else {
1158             # fall back to plain old rand(), use perl's implicit srand() call,
1159             # and hope caller hasn't called srand() yet in a parent process.
1160 130         1005 return int rand (65536);
1161             }
1162             }
1163              
1164             sub gethostname {
1165 3040     3040 0 6736 my ($self) = @_;
1166              
1167 3040         9931 my $hname = $self->{myhostname};
1168 3040 100       26152 return $hname if $hname;
1169              
1170             # try using Sys::Hostname. may fail on non-UNIX platforms
1171 16     16   7853 eval '
  16         42804  
  16         37575  
  16         1818  
1172             use Sys::Hostname;
1173             $self->{myhostname} = hostname; # cache the result
1174             ';
1175              
1176             # could have failed. supply a default in that case
1177 16   50     694 $self->{myhostname} ||= 'nohost';
1178              
1179 16         265 return $self->{myhostname};
1180             }
1181              
1182             sub ensure_dir_exists {
1183 5155     5155 0 9658 my ($self, $dir) = @_;
1184 5155 100       24863 return if exists ($self->{ensured_dir_exists}->{$dir});
1185 105         381 $self->{ensured_dir_exists}->{$dir} = 1;
1186 105 100       59784 (-d $dir) or mkdir($dir);
1187             }
1188              
1189             sub queuedir_is_bad {
1190 2     2 0 5 my ($self, $pathqueuedir) = @_;
1191              
1192             # try creating the dir; it may not exist yet
1193 2         6 $self->ensure_dir_exists ($pathqueuedir);
1194 2 50       51 if (!opendir (RETRY, $pathqueuedir)) {
1195             # still can't open it! problem
1196 0         0 warn "IPC::DirQueue: cannot open queue dir \"$pathqueuedir\": $!\n";
1197 0         0 return 1;
1198             }
1199             # otherwise, we could open it -- it just needed to be created.
1200 2         19 closedir RETRY;
1201 2         19 return 0;
1202             }
1203              
1204             sub dbg {
1205 12135 50   12135 0 40130 return unless $DEBUG;
1206 0         0 warn "dq debug: ".join(' ',@_)."\n";
1207             }
1208              
1209             ###########################################################################
1210              
1211             sub queue_iter_start {
1212 407     407 0 794 my ($self, $pathqueuedir) = @_;
1213              
1214 407 50       1670 if ($self->{indexclient}) {
    100          
    100          
1215 0         0 dbg ("queue iter: getting list for $pathqueuedir");
1216 0         0 my @files = sort grep { /^\d/ } $self->{indexclient}->ls($pathqueuedir);
  0         0  
1217              
1218 0 0       0 if (scalar @files <= 0) {
1219 0 0       0 return if $self->queuedir_is_bad($pathqueuedir);
1220             }
1221              
1222 0         0 return { files => \@files };
1223             }
1224             elsif ($self->{ordered}) {
1225 343         1383 dbg ("queue iter: opening $pathqueuedir (ordered)");
1226 343         38399 my $files = $self->get_dir_filelist_sorted($pathqueuedir);
1227 343 100       1016 if (scalar @$files <= 0) {
1228 2 50       10 return if $self->queuedir_is_bad($pathqueuedir);
1229             }
1230              
1231 343         1481 return { files => $files };
1232             }
1233             elsif ($self->{queue_fanout}) {
1234 32         85 return $self->queue_iter_fanout_start($pathqueuedir);
1235             }
1236             else {
1237 32         35 my $dirfh;
1238 32         78 dbg ("queue iter: opening $pathqueuedir");
1239 32 50       797 if (!opendir ($dirfh, $pathqueuedir)) {
1240 0 0       0 return if $self->queuedir_is_bad($pathqueuedir);
1241 0 0       0 if (!opendir ($dirfh, $pathqueuedir)) {
1242 0         0 warn "oops? pathqueuedir bad";
1243 0         0 return;
1244             }
1245             }
1246              
1247 32         108 return { fh => $dirfh };
1248             }
1249              
1250 0         0 die "cannot get here";
1251             }
1252              
1253             sub queue_iter_next {
1254 1579     1579 0 2371 my ($self, $iter) = @_;
1255              
1256 1579 50       5319 if ($self->{indexclient}) {
    100          
    100          
1257 0         0 return shift @{$iter->{files}};
  0         0  
1258             }
1259             elsif ($self->{ordered}) {
1260 546         692 return shift @{$iter->{files}};
  546         1714  
1261             }
1262             elsif ($self->{queue_fanout}) {
1263 687         1589 return $self->queue_iter_fanout_next($iter);
1264             }
1265             else {
1266 346         1988 return readdir($iter->{fh});
1267             }
1268              
1269 0         0 return;
1270             }
1271              
1272             sub queue_iter_stop {
1273 407     407 0 638 my ($self, $iter) = @_;
1274              
1275 407 50       822 return unless $iter;
1276 407 100       1260 if (defined $iter->{fanfh}) { closedir($iter->{fanfh}); }
  30         403  
1277 407 100       1296 if (defined $iter->{fh}) { closedir($iter->{fh}); }
  32         347  
1278             }
1279              
1280             ###########################################################################
1281              
1282             sub queue_dir_fanout_create {
1283 1120     1120 0 2178 my ($self, $pathqueuedir) = @_;
1284              
1285 1120 100       4449 if (!$self->{queue_fanout}) {
1286 990         2580 return;
1287             }
1288              
1289 130         1922 my @letters = split '', q{0123456789abcdef};
1290 130         582 my $fanout = $letters[get_random_int() % (scalar @letters)];
1291              
1292 130         571 $self->ensure_dir_exists ($pathqueuedir);
1293 130         581 $self->ensure_dir_exists ($pathqueuedir.SLASH.$fanout);
1294 130         701 return $fanout;
1295             }
1296              
1297             sub queue_dir_fanout_commit {
1298 1120     1120 0 2234 my ($self, $pathqueuedir, $fanout) = @_;
1299              
1300 1120 100       4010 if (!$self->{queue_fanout}) {
1301 990         3452 return;
1302             }
1303              
1304             # now touch all levels ($pathqueuedir will be touched later)
1305 130 50       810 $self->touch($pathqueuedir.SLASH.$fanout)
1306             or die "cannot touch fanout for $pathqueuedir/$fanout";
1307             }
1308              
1309             sub queue_dir_fanout_path {
1310 1120     1120 0 2266 my ($self, $pathqueuedir, $fanout) = @_;
1311              
1312 1120 100       4403 if (!$self->{queue_fanout}) {
1313 990         3672 return $pathqueuedir;
1314             }
1315             else {
1316 130         692 return $pathqueuedir.SLASH.$fanout;
1317             }
1318             }
1319              
1320             sub queue_dir_fanout_path_strip {
1321 1572     1572 0 2174 my ($self, $fname) = @_;
1322              
1323 1572 100       11466 if ($self->{queue_fanout}) {
1324 685         19182 $fname =~ s/^.*\///;
1325             }
1326 1572         3501 return $fname;
1327             }
1328              
1329             sub queue_iter_fanout_start {
1330 32     32 0 47 my ($self, $pathqueuedir) = @_;
1331 32         57 my $iter = { };
1332              
1333             {
1334 32         40 my @fanouts;
  32         40  
1335 32         98 dbg ("queue iter: opening $pathqueuedir");
1336 32 50       703 if (!opendir (DIR, $pathqueuedir)) {
1337 0         0 @fanouts = (); # no dir? nothing queued
1338             }
1339             else {
1340 416         6664 my %map = map {
1341 480         1055 $_ => (-M $pathqueuedir.SLASH.$_)
1342 32         471 } grep { /^[a-z0-9]$/ } readdir(DIR);
1343 32         283 @fanouts = sort { $map{$a} <=> $map{$b} } keys %map;
  896         1193  
1344 32         187 dbg ("fanout: $pathqueuedir, order is ".join ' ', @fanouts);
1345             }
1346 32         350 closedir DIR;
1347 32         101 $iter->{fanoutlist} = \@fanouts;
1348 32         80 $iter->{pathqueuedir} = $pathqueuedir;
1349              
1350             }
1351 32         95 return $iter;
1352             }
1353              
1354             sub queue_iter_fanout_next {
1355 687     687 0 1841 my ($self, $iter) = @_;
1356              
1357             # dir handles are:
1358             # /path/to/queue = $iter->{fh}
1359             # /f = $iter->{fanfh}
1360              
1361             next_fanout:
1362              
1363             # open the {fanfh} handle, if it isn't already going
1364 884 100       1997 if (!defined $iter->{fanfh}) {
1365 229         228 my $nextfanout = shift @{$iter->{fanoutlist}};
  229         507  
1366 229 100       484 if (!defined $nextfanout) {
1367 2         5 dbg ("fanout: end of list");
1368 2         5 return;
1369             }
1370              
1371 227         248 my $dirfh;
1372 227         607 dbg ("fanout: opening next dir: $nextfanout");
1373 227 50       18624 if (!opendir ($dirfh, $iter->{pathqueuedir}.SLASH.$nextfanout)) {
1374 0         0 warn "opendir failed $iter->{pathqueuedir}/$nextfanout: $!";
1375 0         0 return;
1376             }
1377              
1378 227         420 $iter->{fanstr} = $nextfanout;
1379 227         686 $iter->{fanfh} = $dirfh;
1380             }
1381              
1382 882         4492 my $fname = readdir($iter->{fanfh});
1383 882 100       1960 if (defined $fname) {
1384 685         2150 return $iter->{fanstr}.SLASH.$fname; # best-case scenario
1385             }
1386            
1387 197         415 dbg ("fanout: finished this dir, trying next one");
1388 197         2056 closedir($iter->{fanfh});
1389 197         358 $iter->{fanstr} = undef;
1390 197         289 $iter->{fanfh} = undef;
1391 197         546 goto next_fanout;
1392             }
1393              
1394 23     23   316 use constant UTIME_TAKES_UNDEF_FOR_TOUCH => ($] >= 5.007002);
  23         80  
  23         5614  
1395              
1396             sub touch {
1397 1250     1250 0 3001 my ($self, $path) = @_;
1398              
1399             # 'Since perl 5.7.2, if the first two elements of the list are "undef", then
1400             # the utime(2) function in the C library will be called with a null second
1401             # argument. On most systems, this will set the file's access and modification
1402             # times to the current time'.
1403              
1404 1250         1771 if (UTIME_TAKES_UNDEF_FOR_TOUCH) {
1405 1250         58649 return utime undef, undef, $path;
1406             } else {
1407             my $now = time;
1408             return utime $now, $now, $path;
1409             }
1410             }
1411              
1412             ###########################################################################
1413              
1414             1;
1415              
1416             =back
1417              
1418             =head1 STALE LOCKS AND SIGNAL HANDLING
1419              
1420             If interrupted or terminated, dequeueing processes should be careful to either
1421             call C<$job-Efinish()> or C<$job-Ereturn_to_queue()> on any active
1422             tasks before exiting -- otherwise those jobs will remain marked I.
1423              
1424             Dequeueing processes can also call C<$job-E<gt>touch_active_lock()>
1425             periodically, while processing large tasks, to ensure that the task is still
1426             marked as I<active>.
1427              
1428             Stale locks are normally dealt with automatically. If a lock is still
1429             I<active> after about 10 minutes of inactivity, the other dequeuers on
1430             that machine will probe the process ID listed in that lock file using
1431             C<kill(0)>. If that process ID is no longer running, the lock is presumed
1432             likely to be stale. If a given timeout (10 minutes plus a random value
1433             between 0 and 256 seconds) has elapsed since the lock file was last
1434             modified, the lock file is deleted.
1435              
1436             This 10-minute default can be modified using the C<active_file_lifetime>
1437             parameter to the C<IPC::DirQueue> constructor.
1438              
1439             Note: this means that if the dequeueing processes are spread among
1440             multiple machines, and there is no longer a dequeuer running on the
1441             machine that initially 'locked' the task, it will never be unlocked,
1442             unless you delete the I<active> file for that task.
1443              
1444             =head1 QUEUE DIRECTORY STRUCTURE
1445              
1446             C<IPC::DirQueue> maintains the following structure for a queue directory:
1447              
1448             =over 4
1449              
1450             =item queue directory
1451              
1452             The B<queue> directory is used to store the queue control files. Queue
1453             control files determine what jobs are in the queue; if a job has a queue
1454             control file in this directory, it is listed in the queue.
1455              
1456             The filename format is as follows:
1457              
1458             50.20040909232529941258.HASH[.PID.RAND]
1459              
1460             The first two digits (C<50>) are the priority of the job. Lower priority
1461             numbers are run first. C<20040909232529> is the current date and time when the
1462             enqueueing process was run, in C<YYYYMMDDHHMMSS> format. C<941258> is the time in
1463             microseconds, as returned by C<gettimeofday()>. And finally, C<HASH> is a
1464             variable-length hash of some semi-random data, used to increase the chance of
1465             uniqueness.
1466              
1467             If there is a collision, the timestamps are regenerated after a 250 msec sleep,
1468             and further randomness will be added at the end of the string (namely, the
1469             current process ID and a random integer value). Up to 10 retries will be
1470             attempted. Once the file is atomically moved into the B<queue> directory
1471             without collision, the retries cease.
1472              
1473             If B<queue_fanout> was used in the C<IPC::DirQueue> constructor, then
1474             the B<queue> directory does not contain the queue control files directly;
1475             instead, there is an interposing set of 16 "fan-out" directories, named
1476             according to the hex digits from C<0> to C<f>.
1477              
1478             =item active directory
1479              
1480             The B<active> directory is used to store active queue control files.
1481              
1482             When a job becomes 'active' -- i.e. is picked up by C<pickup_queued_job()> --
1483             its control file is moved from the B<queue> directory into the B<active>
1484             directory while it is processed.
1485              
1486             =item data directory
1487              
1488             The B<data> directory is used to store enqueued data files.
1489              
1490             It contains a two-level "fan-out" hashed directory structure; each data file is
1491             stored under a single-letter directory, which in turn is under a single-letter
1492             directory. This increases the efficiency of directory lookups under many
1493             filesystems.
1494              
1495             The format of filenames here is similar to that used in the B<queue> directory,
1496             except that the last two characters are removed and used instead for the
1497             "fan-out" directory names.
1498              
1499             =item tmp directory
1500              
1501             The B<tmp> directory contains temporary work files that are in the process
1502             of enqueueing, and not yet ready for processing.
1503              
1504             The filename format here is similar to the above, with suffixes indicating
1505             the type of file (".ctrl", ".data").
1506              
1507             =back
1508              
1509             Atomic, NFS-safe renaming is used to avoid collisions, overwriting or
1510             other unsafe operations.
1511              
1512             =head1 SEE ALSO
1513              
1514             C<IPC::DirQueue::Job>
1515              
1516             =head1 AUTHOR
1517              
1518             Justin Mason E<lt>dq /at/ jmason.orgE<gt>
1519              
1520             =head1 MAILING LIST
1521              
1522             The IPC::DirQueue mailing list is at E<lt>ipc-dirqueue-subscribe@perl.orgE<gt>.
1523              
1524             =head1 COPYRIGHT
1525              
1526             C<IPC::DirQueue> is distributed under the same license as perl itself.
1527              
1528             =head1 AVAILABILITY
1529              
1530             The latest version of this library is likely to be available from CPAN.
1531