line |
stmt |
bran |
cond |
sub |
pod |
time |
code |
1
|
|
|
|
|
|
|
=head1 NAME |
2
|
|
|
|
|
|
|
|
3
|
|
|
|
|
|
|
IPC::DirQueue - disk-based many-to-many task queue |
4
|
|
|
|
|
|
|
|
5
|
|
|
|
|
|
|
=head1 SYNOPSIS |
6
|
|
|
|
|
|
|
|
7
|
|
|
|
|
|
|
my $dq = IPC::DirQueue->new({ dir => "/path/to/queue" }); |
8
|
|
|
|
|
|
|
$dq->enqueue_file("filename"); |
9
|
|
|
|
|
|
|
|
10
|
|
|
|
|
|
|
my $dq = IPC::DirQueue->new({ dir => "/path/to/queue" }); |
11
|
|
|
|
|
|
|
my $job = $dq->pickup_queued_job(); |
12
|
|
|
|
|
|
|
if (!$job) { print "no jobs left\n"; exit; } |
13
|
|
|
|
|
|
|
# ...do something interesting with $job->get_data_path() ... |
14
|
|
|
|
|
|
|
$job->finish(); |
15
|
|
|
|
|
|
|
|
16
|
|
|
|
|
|
|
=head1 DESCRIPTION |
17
|
|
|
|
|
|
|
|
18
|
|
|
|
|
|
|
This module implements a FIFO queueing infrastructure, using a directory |
19
|
|
|
|
|
|
|
as the communications and storage media. No daemon process is required to |
20
|
|
|
|
|
|
|
manage the queue; all communication takes place via the filesystem. |
21
|
|
|
|
|
|
|
|
22
|
|
|
|
|
|
|
A common UNIX system design pattern is to use a tool like C<lpr> as a task |
23
|
|
|
|
|
|
|
queueing system; for example, |
24
|
|
|
|
|
|
|
C describes the |
25
|
|
|
|
|
|
|
use of C<lpr> as an MP3 jukebox. |
26
|
|
|
|
|
|
|
|
27
|
|
|
|
|
|
|
However, C<lpr> isn't as efficient as it could be. When used in this way, you |
28
|
|
|
|
|
|
|
have to restart each task processor for every new task. If you have a lot of |
29
|
|
|
|
|
|
|
startup overhead, this can be very inefficient. With C<IPC::DirQueue>, a |
30
|
|
|
|
|
|
|
processing server can run persistently and cache data needed across multiple |
31
|
|
|
|
|
|
|
tasks efficiently; it will not be restarted unless you restart it. |
32
|
|
|
|
|
|
|
|
33
|
|
|
|
|
|
|
Multiple enqueueing and dequeueing processes on multiple hosts (NFS-safe |
34
|
|
|
|
|
|
|
locking is used) can run simultaneously, and safely, on the same queue. |
35
|
|
|
|
|
|
|
|
36
|
|
|
|
|
|
|
Since multiple dequeuers can run simultaneously, this provides a good way |
37
|
|
|
|
|
|
|
to process a variable level of incoming tasks using a pre-defined number |
38
|
|
|
|
|
|
|
of worker processes. |
39
|
|
|
|
|
|
|
|
40
|
|
|
|
|
|
|
If you need more CPU power working on a queue, you can simply start |
41
|
|
|
|
|
|
|
another dequeuer to help out. If you need less, kill off a few dequeuers. |
42
|
|
|
|
|
|
|
|
43
|
|
|
|
|
|
|
If you need to take down the server to perform some maintenance or |
44
|
|
|
|
|
|
|
upgrades, just kill the dequeuer processes, perform the work, and start up |
45
|
|
|
|
|
|
|
new ones. Since there's no 'socket' or similar point of failure aside from |
46
|
|
|
|
|
|
|
the directory itself, the queue will just quietly fill with waiting jobs |
47
|
|
|
|
|
|
|
until the new dequeuer is ready. |
48
|
|
|
|
|
|
|
|
49
|
|
|
|
|
|
|
Arbitrary 'name = value' string-pair metadata can be transferred alongside data |
50
|
|
|
|
|
|
|
files. In fact, in some cases, you may find it easier to send unused and |
51
|
|
|
|
|
|
|
empty data files, and just use the 'metadata' fields to transfer the details of |
52
|
|
|
|
|
|
|
what will be worked on. |
53
|
|
|
|
|
|
|
|
54
|
|
|
|
|
|
|
=head1 METHODS |
55
|
|
|
|
|
|
|
|
56
|
|
|
|
|
|
|
=over 4 |
57
|
|
|
|
|
|
|
|
58
|
|
|
|
|
|
|
=cut |
59
|
|
|
|
|
|
|
|
60
|
|
|
|
|
|
|
package IPC::DirQueue; |
61
|
23
|
|
|
23
|
|
302052
|
use strict; |
|
23
|
|
|
|
|
64
|
|
|
23
|
|
|
|
|
4954
|
|
62
|
23
|
|
|
23
|
|
37848
|
use bytes; |
|
23
|
|
|
|
|
326
|
|
|
23
|
|
|
|
|
126
|
|
63
|
23
|
|
|
23
|
|
74674
|
use Time::HiRes qw(); |
|
23
|
|
|
|
|
598363
|
|
|
23
|
|
|
|
|
893
|
|
64
|
23
|
|
|
23
|
|
195
|
use Fcntl qw(O_WRONLY O_CREAT O_EXCL O_RDONLY); |
|
23
|
|
|
|
|
51
|
|
|
23
|
|
|
|
|
7653
|
|
65
|
23
|
|
|
23
|
|
22360
|
use IPC::DirQueue::Job; |
|
23
|
|
|
|
|
65
|
|
|
23
|
|
|
|
|
642
|
|
66
|
23
|
|
|
23
|
|
15457
|
use IPC::DirQueue::IndexClient; |
|
23
|
|
|
|
|
82
|
|
|
23
|
|
|
|
|
876
|
|
67
|
23
|
|
|
23
|
|
268
|
use Errno qw(EEXIST); |
|
23
|
|
|
|
|
55
|
|
|
23
|
|
|
|
|
6299
|
|
68
|
|
|
|
|
|
|
|
69
|
|
|
|
|
|
|
our @ISA = (); |
70
|
|
|
|
|
|
|
|
71
|
|
|
|
|
|
|
our $VERSION = "1.0"; |
72
|
|
|
|
|
|
|
|
73
|
23
|
|
|
23
|
|
141
|
use constant SLASH => '/'; |
|
23
|
|
|
|
|
44
|
|
|
23
|
|
|
|
|
234657
|
|
74
|
|
|
|
|
|
|
|
75
|
|
|
|
|
|
|
# our $DEBUG = 1; |
76
|
|
|
|
|
|
|
our $DEBUG; # = 1; |
77
|
|
|
|
|
|
|
|
78
|
|
|
|
|
|
|
########################################################################### |
79
|
|
|
|
|
|
|
|
80
|
|
|
|
|
|
|
=item $dq->new ($opts); |
81
|
|
|
|
|
|
|
|
82
|
|
|
|
|
|
|
Create a new queue object, suitable for either enqueueing jobs |
83
|
|
|
|
|
|
|
or picking up already-queued jobs for processing. |
84
|
|
|
|
|
|
|
|
85
|
|
|
|
|
|
|
C<$opts> is a reference to a hash, which may contain the following |
86
|
|
|
|
|
|
|
options: |
87
|
|
|
|
|
|
|
|
88
|
|
|
|
|
|
|
=over 4 |
89
|
|
|
|
|
|
|
|
90
|
|
|
|
|
|
|
=item dir => $path_to_directory (no default) |
91
|
|
|
|
|
|
|
|
92
|
|
|
|
|
|
|
Name the directory where the queue files are stored. This is required. |
93
|
|
|
|
|
|
|
|
94
|
|
|
|
|
|
|
=item data_file_mode => $mode (default: 0666) |
95
|
|
|
|
|
|
|
|
96
|
|
|
|
|
|
|
The C<chmod>-style file mode for data files. This should be specified |
97
|
|
|
|
|
|
|
as a string with a leading 0. It will be affected by the current |
98
|
|
|
|
|
|
|
process C<umask>. |
99
|
|
|
|
|
|
|
|
100
|
|
|
|
|
|
|
=item queue_file_mode => $mode (default: 0666) |
101
|
|
|
|
|
|
|
|
102
|
|
|
|
|
|
|
The C<chmod>-style file mode for queue control files. This should be |
103
|
|
|
|
|
|
|
specified as a string with a leading 0. It will be affected by the |
104
|
|
|
|
|
|
|
current process C<umask>. |
105
|
|
|
|
|
|
|
|
106
|
|
|
|
|
|
|
=item ordered => { 0 | 1 } (default: 1) |
107
|
|
|
|
|
|
|
|
108
|
|
|
|
|
|
|
Whether the jobs should be processed in order of submission, or |
109
|
|
|
|
|
|
|
in no particular order. |
110
|
|
|
|
|
|
|
|
111
|
|
|
|
|
|
|
=item queue_fanout => { 0 | 1 } (default: 0) |
112
|
|
|
|
|
|
|
|
113
|
|
|
|
|
|
|
Whether the queue directory should be 'fanned out'. This allows better |
114
|
|
|
|
|
|
|
scalability with NFS-shared queues with large numbers of pending files, but |
115
|
|
|
|
|
|
|
hurts performance otherwise. It also implies B<ordered> = 0. (This is |
116
|
|
|
|
|
|
|
strictly experimental, has overall poor performance, and is not recommended.) |
117
|
|
|
|
|
|
|
|
118
|
|
|
|
|
|
|
=item indexd_uri => $uri (default: undef) |
119
|
|
|
|
|
|
|
|
120
|
|
|
|
|
|
|
A URI of a C daemon, used to maintain the list of waiting jobs. The |
121
|
|
|
|
|
|
|
URI must be of the form C . (This is strictly |
122
|
|
|
|
|
|
|
experimental, and is not recommended.) |
123
|
|
|
|
|
|
|
|
124
|
|
|
|
|
|
|
=item buf_size => $number (default: 65536) |
125
|
|
|
|
|
|
|
|
126
|
|
|
|
|
|
|
The buffer size to use when copying files, in bytes. |
127
|
|
|
|
|
|
|
|
128
|
|
|
|
|
|
|
=item active_file_lifetime => $number (default: 600) |
129
|
|
|
|
|
|
|
|
130
|
|
|
|
|
|
|
The lifetime of an untouched active lockfile, in seconds. See 'STALE LOCKS AND |
131
|
|
|
|
|
|
|
SIGNAL HANDLING', below, for more details. |
132
|
|
|
|
|
|
|
|
133
|
|
|
|
|
|
|
=back |
134
|
|
|
|
|
|
|
|
135
|
|
|
|
|
|
|
=cut |
136
|
|
|
|
|
|
|
|
137
|
|
|
|
|
|
|
sub new {
  # Constructor.  May be called as a class method or on an existing
  # object (in which case the new object is created in the same class).
  #
  # $opts is an optional hashref of options; see the POD above for the
  # recognised keys.  Dies if no 'dir' option is given.
  my $class = shift;
  my $opts = shift;
  $opts ||= { };
  $class = ref($class) || $class;

  # Work on a shallow copy rather than blessing the caller's own hash.
  # The mode options are converted in place with oct() below; if the
  # caller's hash were modified, passing the same options hash to new()
  # a second time would run oct() on the already-converted number and
  # silently produce a wrong file mode.
  my $self = { %$opts };
  bless ($self, $class);

  die "no 'dir' specified" unless $self->{dir};

  # modes are supplied as strings with a leading 0 ('0666'); convert
  # them to the numeric form that sysopen() expects
  $self->{data_file_mode} ||= '0666';
  $self->{data_file_mode} = oct ($self->{data_file_mode});
  $self->{queue_file_mode} ||= '0666';
  $self->{queue_file_mode} = oct ($self->{queue_file_mode});

  if ($self->{queue_fanout}) {
    $self->{queue_fanout} = 1;      # normalise any true value to 1
    $self->{ordered} = 0;           # fanout wins
  }
  elsif (!defined $self->{ordered}) {
    $self->{ordered} = 1;           # default: process in submission order
  }

  $self->{buf_size} ||= 65536;
  $self->{active_file_lifetime} ||= 600;

  # per-object cache used by ensure_dir_exists()
  $self->{ensured_dir_exists} = { };
  $self->ensure_dir_exists ($self->{dir});

  # optional (experimental) index daemon client
  if ($self->{indexd_uri}) {
    $self->{indexclient} = IPC::DirQueue::IndexClient->new({
        uri => $self->{indexd_uri}
      });
  }

  return $self;
}
173
|
|
|
|
|
|
|
|
174
|
|
|
|
|
|
|
sub dbg; |
175
|
|
|
|
|
|
|
|
176
|
|
|
|
|
|
|
########################################################################### |
177
|
|
|
|
|
|
|
|
178
|
|
|
|
|
|
|
=item $dq->enqueue_file ($filename [, $metadata [, $pri] ] ); |
179
|
|
|
|
|
|
|
|
180
|
|
|
|
|
|
|
Enqueue a new job for processing. Returns C<1> if the job was enqueued, or |
181
|
|
|
|
|
|
|
C<undef> on failure. |
182
|
|
|
|
|
|
|
|
183
|
|
|
|
|
|
|
C<$filename> is the path to the file to be enqueued. Its contents |
184
|
|
|
|
|
|
|
will be read, and will be used as the contents of the data file available |
185
|
|
|
|
|
|
|
to dequeuers using C. |
186
|
|
|
|
|
|
|
|
187
|
|
|
|
|
|
|
C<$metadata> is an optional hash reference; every item of metadata will be |
188
|
|
|
|
|
|
|
available to worker processes on the C<IPC::DirQueue::Job> object, in the |
189
|
|
|
|
|
|
|
C<$job-E<gt>{metadata}> hashref. Note that using this channel for metadata |
190
|
|
|
|
|
|
|
brings with it several restrictions: |
191
|
|
|
|
|
|
|
|
192
|
|
|
|
|
|
|
=over 4 |
193
|
|
|
|
|
|
|
|
194
|
|
|
|
|
|
|
=item 1. it requires that the metadata be stored as 'name' => 'value' string pairs |
195
|
|
|
|
|
|
|
|
196
|
|
|
|
|
|
|
=item 2. neither 'name' nor 'value' may contain newline (\n) or NUL (\0) characters |
197
|
|
|
|
|
|
|
|
198
|
|
|
|
|
|
|
=item 3. 'name' cannot contain colon (:) characters |
199
|
|
|
|
|
|
|
|
200
|
|
|
|
|
|
|
=item 4. 'name' cannot start with a capital letter 'Q' and be 4 characters in length |
201
|
|
|
|
|
|
|
|
202
|
|
|
|
|
|
|
=back |
203
|
|
|
|
|
|
|
|
204
|
|
|
|
|
|
|
If those restrictions are broken, die() will be called with the following |
205
|
|
|
|
|
|
|
error: |
206
|
|
|
|
|
|
|
|
207
|
|
|
|
|
|
|
die "IPC::DirQueue: invalid metadatum: '$k'"; |
208
|
|
|
|
|
|
|
|
209
|
|
|
|
|
|
|
This is a change added in release 0.06; prior to that, that metadatum would be |
210
|
|
|
|
|
|
|
silently dropped. |
211
|
|
|
|
|
|
|
|
212
|
|
|
|
|
|
|
An optional priority can be specified; lower priorities are run first. |
213
|
|
|
|
|
|
|
Priorities range from 0 to 99, and 50 is default. |
214
|
|
|
|
|
|
|
|
215
|
|
|
|
|
|
|
=cut |
216
|
|
|
|
|
|
|
|
217
|
|
|
|
|
|
|
sub enqueue_file {
  # Enqueue the contents of the file at path $file as a new job.
  # $metadata and $pri are passed through to _enqueue_backend().
  # Returns the backend's result, or nothing (undef / empty list) after
  # a warning if the file cannot be opened for reading.
  my ($self, $file, $metadata, $pri) = @_;

  # Three-argument open with a lexical handle: the filename is taken
  # literally (no stripping of surrounding whitespace, no interpretation
  # of mode characters or pipes), and no package-global handle is shared
  # with other code.
  my $fhin;
  if (!open ($fhin, '<', $file)) {
    warn "IPC::DirQueue: cannot open $file for read: $!";
    return;
  }
  my $ret = $self->_enqueue_backend ($metadata, $pri, $fhin);
  close $fhin;
  return $ret;
}
227
|
|
|
|
|
|
|
|
228
|
|
|
|
|
|
|
=item $dq->enqueue_fh ($filehandle [, $metadata [, $pri] ] ); |
229
|
|
|
|
|
|
|
|
230
|
|
|
|
|
|
|
Enqueue a new job for processing. Returns C<1> if the job was enqueued, or |
231
|
|
|
|
|
|
|
C<undef> on failure. C<$pri> and C<$metadata> are as described in |
232
|
|
|
|
|
|
|
C<$dq-E<gt>enqueue_file()>. |
233
|
|
|
|
|
|
|
|
234
|
|
|
|
|
|
|
C<$filehandle> is a perl file handle that must be open for reading. It will be |
235
|
|
|
|
|
|
|
closed on completion, regardless of success or failure. Its contents will be |
236
|
|
|
|
|
|
|
read, and will be used as the contents of the data file available to dequeuers |
237
|
|
|
|
|
|
|
using C. |
238
|
|
|
|
|
|
|
|
239
|
|
|
|
|
|
|
=cut |
240
|
|
|
|
|
|
|
|
241
|
|
|
|
|
|
|
sub enqueue_fh {
  # Enqueue everything readable from the already-open handle $fhin as a
  # new job.  The handle is closed afterwards whether or not the enqueue
  # worked; the backend's result is handed back unchanged.
  my $self = shift;
  my ($fhin, $metadata, $pri) = @_;

  my $enqueued = $self->_enqueue_backend ($metadata, $pri, $fhin);
  close $fhin;

  return $enqueued;
}
247
|
|
|
|
|
|
|
|
248
|
|
|
|
|
|
|
=item $dq->enqueue_string ($string [, $metadata [, $pri] ] ); |
249
|
|
|
|
|
|
|
|
250
|
|
|
|
|
|
|
Enqueue a new job for processing. The job data is entirely read from |
251
|
|
|
|
|
|
|
C<$string>. Returns C<1> if the job was enqueued, or C<undef> on failure. |
252
|
|
|
|
|
|
|
C<$pri> and C<$metadata> are as described in C<$dq-E<gt>enqueue_file()>. |
253
|
|
|
|
|
|
|
|
254
|
|
|
|
|
|
|
=cut |
255
|
|
|
|
|
|
|
|
256
|
|
|
|
|
|
|
sub enqueue_string {
  # Enqueue $string as the complete data of a new job.  Implemented by
  # handing the backend a data callback that yields the string on its
  # first call and signals end-of-data on every call after that.
  my ($self, $string, $metadata, $pri) = @_;

  my $calls = 0;
  my $feeder = sub {
    return $string if $calls++ == 0;    # first call: the whole payload
    return;                             # afterwards: EOF
  };

  return $self->_enqueue_backend ($metadata, $pri, undef, $feeder);
}
265
|
|
|
|
|
|
|
|
266
|
|
|
|
|
|
|
=item $dq->enqueue_sub ($subref [, $metadata [, $pri] ] ); |
267
|
|
|
|
|
|
|
|
268
|
|
|
|
|
|
|
Enqueue a new job for processing. Returns C<1> if the job was enqueued, or |
269
|
|
|
|
|
|
|
C<undef> on failure. C<$pri> and C<$metadata> are as described in |
270
|
|
|
|
|
|
|
C<$dq-E<gt>enqueue_file()>. |
271
|
|
|
|
|
|
|
|
272
|
|
|
|
|
|
|
C<$subref> is a perl subroutine, which is expected to return one of the |
273
|
|
|
|
|
|
|
following each time it is called: |
274
|
|
|
|
|
|
|
|
275
|
|
|
|
|
|
|
- a string of data bytes to be appended to any existing data. (the |
276
|
|
|
|
|
|
|
string may be empty, C<''>, in which case it's a no-op.) |
277
|
|
|
|
|
|
|
|
278
|
|
|
|
|
|
|
- C<undef> when the enqueued data has ended, ie. EOF. |
279
|
|
|
|
|
|
|
|
280
|
|
|
|
|
|
|
- C<die()> if an error occurs. The C<die()> message will be converted into |
281
|
|
|
|
|
|
|
a warning, and the C<enqueue_sub()> call will return C<undef>. |
282
|
|
|
|
|
|
|
|
283
|
|
|
|
|
|
|
(Tip: note that this is a closure, so variables outside the subroutine can be |
284
|
|
|
|
|
|
|
accessed safely.) |
285
|
|
|
|
|
|
|
|
286
|
|
|
|
|
|
|
=cut |
287
|
|
|
|
|
|
|
|
288
|
|
|
|
|
|
|
sub enqueue_sub {
  # Enqueue a new job whose data is produced by repeatedly calling
  # $subref; see the POD above for the callback's contract.
  my $self = shift;
  my ($subref, $metadata, $pri) = @_;

  # no input filehandle: the backend pulls the data from the callback
  my @backend_args = ($metadata, $pri, undef, $subref);
  return $self->_enqueue_backend (@backend_args);
}
292
|
|
|
|
|
|
|
|
293
|
|
|
|
|
|
|
# private implementation.
#
# Common worker behind enqueue_file(), enqueue_fh(), enqueue_string() and
# enqueue_sub().  The job data comes either from the filehandle $fhin or
# from the callback $callbackin; both are simply handed on to
# copy_in_to_out_fh().
#
# Sequence: write the data to a temp file, link it into the hashed 'data'
# dir, write a temp control file, link that into the 'queue' dir, then
# touch the 'queue' dir.
#
# Returns 1 on success.  On failure, returns nothing (undef / empty list)
# after removing whichever files this call is known to have created.
sub _enqueue_backend {
  my ($self, $metadata, $pri, $fhin, $callbackin) = @_;

  if (!defined($pri)) { $pri = 50; }
  if ($pri < 0 || $pri > 99) {
    warn "IPC::DirQueue: bad priority $pri is > 99 or < 0";
    return;
  }

  # NOTE: gettimeofday() returns (seconds, microseconds), so the value
  # stored under 'time_submitted_msecs' is really microseconds.
  my ($now, $nowmsecs) = Time::HiRes::gettimeofday;

  my $job = {
    pri => $pri,
    metadata => $metadata,
    time_submitted_secs => $now,
    time_submitted_msecs => $nowmsecs
  };

  # NOTE: this can change until the moment we've renamed the ctrl file
  # into 'queue'!
  my $qfnametmp = $self->new_q_filename($job);
  my $qcnametmp = $qfnametmp;

  my $pathtmp = $self->q_subdir('tmp');
  $self->ensure_dir_exists ($pathtmp);

  my $pathtmpctrl = $pathtmp.SLASH.$qfnametmp.".ctrl";
  my $pathtmpdata = $pathtmp.SLASH.$qfnametmp.".data";

  # Use a lexical output handle, not a package-global bareword: an
  # enqueue_sub() data callback is free to enqueue another job itself,
  # and a shared global handle would be reopened (and so closed) under
  # the outer copy.
  my $fhout;
  if (!sysopen ($fhout, $pathtmpdata, O_WRONLY|O_CREAT|O_EXCL,
      $self->{data_file_mode}))
  {
    warn "IPC::DirQueue: cannot open $pathtmpdata for write: $!";
    return;
  }
  my $pathtmpdata_created = 1;

  # a die() inside the copy (e.g. from the caller's callback) is turned
  # into a warning; $siz then stays undef and we clean up below
  my $siz;
  eval {
    $siz = $self->copy_in_to_out_fh ($fhin, $callbackin,
                            $fhout, $pathtmpdata);
  };
  if ($@) {
    warn "IPC::DirQueue: enqueue failed: $@";
  }
  if (!defined $siz) {
    goto failure;
  }
  $job->{size_bytes} = $siz;

  # get the data dir
  my $pathdatadir = $self->q_subdir('data');

  # hashing the data dir, using 2 levels of directory hashing.  This has a tiny
  # effect on speed in all cases up to 10k queued files, but has good results
  # in terms of the usability of those dirs for users doing direct access, so
  # enabled by default.
  if (1) {
    # take the last two chars for the hashname.  In most cases, this will
    # be the last 2 chars of a hash of (hostname, pid), so effectively
    # random.  Remove it from the filename entirely, since it's redundant
    # to have it both in the dir name and the filename.
    #
    # Only read $1/$2 if the substitution really matched; otherwise they
    # would still hold the captures of some earlier, unrelated match
    # (possibly one made by our caller).
    my ($hash1, $hash2) = ('0', '0');
    if ($qfnametmp =~ s/([A-Za-z0-9+_])([A-Za-z0-9+_])$//) {
      $hash1 = $1 || '0';
      $hash2 = $2 || '0';
    }
    my $origdatadir = $pathdatadir;
    $pathdatadir = "$pathdatadir/$hash1/$hash2";
    # check to see if that hashdir exists... build it up if req'd
    if (!-d $pathdatadir) {
      foreach my $dir ($origdatadir, "$origdatadir/$hash1", $pathdatadir)
      {
        # mkdir result deliberately not checked here; if the directory
        # is still missing, the link_into_dir() call below is expected
        # to fail and trigger the cleanup -- TODO confirm
        (-d $dir) or mkdir ($dir);
      }
    }
  }

  # now link(2) the data tmpfile into the 'data' dir.
  my $pathdata = $self->link_into_dir ($job, $pathtmpdata,
                                    $pathdatadir, $qfnametmp);
  if (!$pathdata) {
    goto failure;
  }
  my $pathdata_created = 1;
  $job->{pathdata} = $pathdata;

  # ok, write a control file now that data is safe and we know it's
  # new filename...
  if (!$self->create_control_file ($job, $pathtmpdata, $pathtmpctrl)) {
    goto failure;
  }
  my $pathtmpctrl_created = 1;

  # now link(2) that into the 'queue' dir.
  my $pathqueuedir = $self->q_subdir('queue');
  my $fanout = $self->queue_dir_fanout_create($pathqueuedir);

  my $pathqueue = $self->link_into_dir ($job, $pathtmpctrl,
                    $self->queue_dir_fanout_path($pathqueuedir, $fanout),
                    $qcnametmp);

  if (!$pathqueue) {
    dbg ("failed to link_into_dir, enq failed");
    goto failure;
  }

  # and incr the fanout counter for that fanout dir
  $self->queue_dir_fanout_commit($pathqueuedir, $fanout);

  # touch the "queue" directory to indicate that it's changed
  # and a file has been enqueued; required to support Reiserfs
  # and XFS, where this is not implicit
  $pathqueuedir = $self->q_subdir('queue');
  $self->touch($pathqueuedir) or warn "touch failed on $pathqueuedir";
  dbg ("touched $pathqueuedir at ".time);

  if ($self->{indexclient}) {
    $self->{indexclient}->enqueue($pathqueuedir, $pathqueue);
  }

  # my $pathqueue_created = 1;      # not required, we're done!
  return 1;

  # Cleanup.  The *_created flags are only set once the corresponding
  # step has succeeded; a "goto failure" from an earlier point leaves the
  # later flags undefined, so only files that exist are removed.
failure:
  if ($pathtmpctrl_created) {
    unlink $pathtmpctrl or warn "IPC::DirQueue: cannot unlink $pathtmpctrl";
  }
  if ($pathtmpdata_created) {
    unlink $pathtmpdata or warn "IPC::DirQueue: cannot unlink $pathtmpdata";
  }
  if ($pathdata_created) {
    unlink $pathdata or warn "IPC::DirQueue: cannot unlink $pathdata";
  }
  return;
}
428
|
|
|
|
|
|
|
|
429
|
|
|
|
|
|
|
########################################################################### |
430
|
|
|
|
|
|
|
|
431
|
|
|
|
|
|
|
=item $job = $dq->pickup_queued_job( [ path => $path ] ); |
432
|
|
|
|
|
|
|
|
433
|
|
|
|
|
|
|
Pick up the next job in the queue, so that it can be processed. |
434
|
|
|
|
|
|
|
|
435
|
|
|
|
|
|
|
If no job is available for processing, either because the queue is |
436
|
|
|
|
|
|
|
empty or because other worker processes are already working on |
437
|
|
|
|
|
|
|
them, C<undef> is returned; otherwise, a new instance of C<IPC::DirQueue::Job> |
438
|
|
|
|
|
|
|
is returned. |
439
|
|
|
|
|
|
|
|
440
|
|
|
|
|
|
|
Note that the job is marked as I<active> until C<$job-E<gt>finish()> |
441
|
|
|
|
|
|
|
is called. |
442
|
|
|
|
|
|
|
|
443
|
|
|
|
|
|
|
If the (optional) parameter C<path> is used, its value indicates the path of |
444
|
|
|
|
|
|
|
the desired job's data file. By using this, it is possible to cancel |
445
|
|
|
|
|
|
|
not-yet-active items from anywhere in the queue, or pick up jobs out of |
446
|
|
|
|
|
|
|
sequence. The data path must match the value of the I<pathqueue> member of |
447
|
|
|
|
|
|
|
the C object passed to the C callback. |
448
|
|
|
|
|
|
|
|
449
|
|
|
|
|
|
|
=cut |
450
|
|
|
|
|
|
|
|
451
|
|
|
|
|
|
|
sub pickup_queued_job {
  # Walk the 'queue' dir and try to claim one job by creating a lockfile
  # for it in the 'active' dir.
  #
  # %args may contain 'path': if present, only the queue entry whose
  # control-file path equals that value is considered.
  #
  # Returns an IPC::DirQueue::Job for the claimed job, or nothing
  # (undef / empty list) if no job could be claimed.
  my ($self, %args) = @_;

  my $pathqueuedir = $self->q_subdir('queue');
  my $pathactivedir = $self->q_subdir('active');
  $self->ensure_dir_exists ($pathactivedir);

  my $iter = $self->queue_iter_start($pathqueuedir);

  while (1) {
    my $nextfile = $self->queue_iter_next($iter);

    if (!defined $nextfile) {
      # no more files in the queue, return empty
      last;
    }

    my $nextfilebase = $self->queue_dir_fanout_path_strip($nextfile);

    # only names starting with a digit are treated as queue entries
    next if ($nextfilebase !~ /^\d/);
    my $pathactive = $pathactivedir.SLASH.$nextfilebase;
    my $pathqueue = $pathqueuedir.SLASH.$nextfile;

    next if (exists($args{path}) && ($pathqueue ne $args{path}));

    # only the mtime of an existing lockfile is of interest; it is undef
    # if there is no lockfile at all
    my $mtime = (lstat($pathactive))[9];

    if (defined $mtime) {
      # *DO* call time() here.  In extremely large dirs, it may take
      # several seconds to traverse the entire listing from start
      # to finish!
      if (time() - $mtime < $self->{active_file_lifetime}) {
        # active lockfile; it's being worked on.  skip this file
        next;
      }

      if ($self->worker_still_working($pathactive)) {
        # worker is still alive, although not updating the lock
        dbg ("worker still working, skip: $pathactive");
        next;
      }

      # now, we want to try to avoid 2 or 3 dequeuers removing
      # the lockfile simultaneously, as that could cause this race:
      #
      # dqproc1: [checks file]        [unlinks] [starts work]
      # dqproc2:        [checks file]                         [unlinks]
      #
      # ie. the second process unlinks the first process' brand-new
      # lockfile!
      #
      # to avoid this, use a random "fudge" on the timeout, so
      # that dqproc2 will wait for possibly much longer than
      # dqproc1 before it decides to unlink it.
      #
      # this isn't perfect.  TODO: is there a "rename this fd" syscall
      # accessible from perl?

      my $fudge = get_random_int() % 256;
      if (time() - $mtime < $self->{active_file_lifetime}+$fudge) {
        # within the fudge zone.  don't unlink it in this process.
        dbg ("within active fudge zone, skip: $pathactive");
        next;
      }

      # else, we can kill the stale lockfile
      unlink $pathactive or warn "IPC::DirQueue: unlink failed: $pathactive";
      warn "IPC::DirQueue: killed stale lockfile: $pathactive";
    }

    # ok, we're free to get cracking on this file.
    my $pathtmp = $self->q_subdir('tmp');
    $self->ensure_dir_exists ($pathtmp);

    # use the name of the queue file itself, plus a tmp prefix, plus active
    my $pathtmpactive = $pathtmp.SLASH.
                $nextfilebase.".".$self->new_lock_filename().".active";

    dbg ("creating tmp active $pathtmpactive");
    # lexical handle: nothing is shared with other code using this package
    my $lockfh;
    if (!sysopen ($lockfh, $pathtmpactive, O_WRONLY|O_CREAT|O_EXCL,
        $self->{queue_file_mode}))
    {
      if ($!{EEXIST}) {
        # contention; skip this file
        dbg ("IPC::DirQueue: $pathtmpactive already created, skipping: $!");
      }
      else {
        # could be serious; disk space, permissions etc.
        warn "IPC::DirQueue: cannot open $pathtmpactive for write: $!";
      }
      next;
    }
    # the lockfile records who holds the lock: hostname, then pid.
    # A failed write is reported but, as before, does not stop the pickup.
    (print {$lockfh} $self->gethostname(), "\n", $$, "\n")
        or warn "IPC::DirQueue: cannot write to $pathtmpactive: $!";
    close $lockfh
        or warn "IPC::DirQueue: cannot close $pathtmpactive: $!";

    if (!-f $pathqueue) {
      # queue file already gone; another worker got it before we did.
      # catch this case before we create a lockfile.
      # see the "pathqueue_gone" comment below for an explanation
      dbg("IPC::DirQueue: $pathqueue no longer exists, skipping");
      goto nextfile;
    }

    my $job = IPC::DirQueue::Job->new ($self, {
      jobid => $nextfilebase,
      pathqueue => $pathqueue,
      pathactive => $pathactive
    });

    my $pathnewactive = $self->link_into_dir_no_retry ($job,
                        $pathtmpactive, $pathactivedir, $nextfilebase);
    if (!defined($pathnewactive)) {
      # link failed; another worker got it before we did
      # no need to unlink tmpfile, the "nextfile" action will do that
      goto nextfile;
    }

    if ($pathactive ne $pathnewactive) {
      die "oops! active paths differ: $pathactive $pathnewactive";
    }

    # three-argument open: the path is taken literally, whatever
    # characters the queue directory name happens to contain
    my $ctrlfh;
    if (!open ($ctrlfh, '<', $pathqueue))
    {
      # since we read the list of files upfront, this can happen:
      #
      # dqproc1: [gets lock] [work] [finish_job]
      # dqproc2:                                 [gets lock]
      #
      # "dqproc1" has already completed the job, unlinking both the active
      # *and* queue files, by the time "dqproc2" gets to it.  This is OK;
      # just skip the file, since it's already done.  [pathqueue_gone]

      dbg("IPC::DirQueue: cannot open $pathqueue for read: $!");
      unlink $pathnewactive;
      next;     # NOT "goto nextfile", as pathtmpactive is already unlinked
    }

    my $red = $self->read_control_file ($job, $ctrlfh);
    close $ctrlfh;

    # NOTE(review): on a failed control-file read the freshly created
    # active lockfile is left in place; confirm that this is intended.
    next if (!$red);

    $self->queue_iter_stop($iter);
    return $job;

nextfile:
    unlink $pathtmpactive or warn "IPC::DirQueue: unlink failed: $pathtmpactive";
  }

  $self->queue_iter_stop($iter);
  return;   # empty
}
604
|
|
|
|
|
|
|
|
605
|
|
|
|
|
|
|
########################################################################### |
606
|
|
|
|
|
|
|
|
607
|
|
|
|
|
|
|
=item $job = $dq->wait_for_queued_job ([ $timeout [, $pollinterval] ]); |
608
|
|
|
|
|
|
|
|
609
|
|
|
|
|
|
|
Wait for a job to be queued within the next C<$timeout> seconds. |
610
|
|
|
|
|
|
|
|
611
|
|
|
|
|
|
|
If there is already a job ready for processing, this will return immediately. |
612
|
|
|
|
|
|
|
If one is not available, it will sleep, wake up periodically, check for job |
613
|
|
|
|
|
|
|
availability, and either carry on sleeping or return the new job if one |
614
|
|
|
|
|
|
|
is now available. |
615
|
|
|
|
|
|
|
|
616
|
|
|
|
|
|
|
If a job becomes available, a new instance of C<IPC::DirQueue::Job> is |
617
|
|
|
|
|
|
|
returned. If the timeout is reached, C is returned. |
618
|
|
|
|
|
|
|
|
619
|
|
|
|
|
|
|
If C<$timeout> is not specified, or is less than 1, this function will wait |
620
|
|
|
|
|
|
|
indefinitely. |
621
|
|
|
|
|
|
|
|
622
|
|
|
|
|
|
|
The optional parameter C<$pollinterval> indicates how frequently to wake |
623
|
|
|
|
|
|
|
up and check for new jobs. It is specified in seconds, and floating-point |
624
|
|
|
|
|
|
|
precision is supported. The default is C<1>. |
625
|
|
|
|
|
|
|
|
626
|
|
|
|
|
|
|
Note that if C<$timeout> is not a round multiple of C<$pollinterval>, |
627
|
|
|
|
|
|
|
the nearest round multiple of C<$pollinterval> greater than C<$timeout> |
628
|
|
|
|
|
|
|
will be used instead. Also note that C<$timeout> is used as an integer. |
629
|
|
|
|
|
|
|
|
630
|
|
|
|
|
|
|
=cut |
631
|
|
|
|
|
|
|
|
632
|
|
|
|
|
|
|
sub wait_for_queued_job { |
633
|
310
|
|
|
310
|
1
|
14533
|
my ($self, $timeout, $pollintvl) = @_; |
634
|
|
|
|
|
|
|
|
635
|
310
|
|
|
|
|
445
|
my $finishtime; |
636
|
310
|
50
|
33
|
|
|
2618
|
if ($timeout && $timeout > 0) { |
637
|
0
|
|
|
|
|
0
|
$finishtime = time + int ($timeout); |
638
|
|
|
|
|
|
|
} |
639
|
|
|
|
|
|
|
|
640
|
310
|
|
|
|
|
671
|
dbg "wait_for_queued_job starting"; |
641
|
|
|
|
|
|
|
|
642
|
310
|
50
|
|
|
|
861
|
if ($pollintvl) { |
643
|
0
|
|
|
|
|
0
|
$pollintvl *= 1000000; # from secs to usecs |
644
|
|
|
|
|
|
|
} else { |
645
|
310
|
|
|
|
|
616
|
$pollintvl = 1000000; # default: 1 sec |
646
|
|
|
|
|
|
|
} |
647
|
|
|
|
|
|
|
|
648
|
310
|
|
|
|
|
883
|
my $pathqueuedir = $self->q_subdir('queue'); |
649
|
310
|
|
|
|
|
1009
|
$self->ensure_dir_exists ($pathqueuedir); |
650
|
|
|
|
|
|
|
|
651
|
|
|
|
|
|
|
# TODO: would be nice to use fam for this, where available. But |
652
|
|
|
|
|
|
|
# no biggie... |
653
|
|
|
|
|
|
|
|
654
|
310
|
|
|
|
|
410
|
while (1) { |
655
|
|
|
|
|
|
|
# check the stat time on the queue dir *before* we call pickup, |
656
|
|
|
|
|
|
|
# to avoid a race condition where a job is added while we're |
657
|
|
|
|
|
|
|
# checking in that function. |
658
|
|
|
|
|
|
|
|
659
|
310
|
|
|
|
|
10740
|
my @stat = stat ($pathqueuedir); |
660
|
310
|
|
|
|
|
810
|
my $qdirlaststat = $stat[9]; |
661
|
|
|
|
|
|
|
|
662
|
310
|
|
|
|
|
809
|
my $job = $self->pickup_queued_job(); |
663
|
310
|
50
|
|
|
|
764
|
if ($job) { return $job; } |
|
310
|
|
|
|
|
1526
|
|
664
|
|
|
|
|
|
|
|
665
|
|
|
|
|
|
|
# there's another semi-race condition here, brought about by a lack of |
666
|
|
|
|
|
|
|
# sub-second precision from stat(2). if the last enq occurred inside |
667
|
|
|
|
|
|
|
# *this* current 1-second window, then *another* one can happen inside this |
668
|
|
|
|
|
|
|
# second right afterwards, and we wouldn't notice. |
669
|
|
|
|
|
|
|
|
670
|
|
|
|
|
|
|
# in other words (ASCII-art alert): |
671
|
|
|
|
|
|
|
# TIME | t | t+1 |
672
|
|
|
|
|
|
|
# E | enq enq | |
673
|
|
|
|
|
|
|
# D | stat pickup_queued_job | |
674
|
|
|
|
|
|
|
|
675
|
|
|
|
|
|
|
# the enqueuer process E enqueues a job just after the stat, inside the |
676
|
|
|
|
|
|
|
# 1-second period "t". dequeuer process D dequeues it with |
677
|
|
|
|
|
|
|
# pickup_queued_job(). all is well. But then, E enqueues another job |
678
|
|
|
|
|
|
|
# inside the same 1-second period "t", and since the stat() has already |
679
|
|
|
|
|
|
|
# happened for "t", and since we've already picked up the job in "t", we |
680
|
|
|
|
|
|
|
# don't recheck; result is, we miss this enqueue event. |
681
|
|
|
|
|
|
|
# |
682
|
|
|
|
|
|
|
# Avoid this by checking in a busy-loop until time(2) says we're out of |
683
|
|
|
|
|
|
|
# that "danger zone" 1-second period. Any further enq's would then |
684
|
|
|
|
|
|
|
# cause stat(2) to report a different timestamp. |
685
|
|
|
|
|
|
|
|
686
|
0
|
|
|
|
|
0
|
while (time == $qdirlaststat) { |
687
|
0
|
|
|
|
|
0
|
Time::HiRes::usleep ($pollintvl); |
688
|
0
|
|
|
|
|
0
|
dbg "wait_for_queued_job: spinning until time != stat $qdirlaststat"; |
689
|
0
|
|
|
|
|
0
|
my $job = $self->pickup_queued_job(); |
690
|
0
|
0
|
|
|
|
0
|
if ($job) { return $job; } |
|
0
|
|
|
|
|
0
|
|
691
|
|
|
|
|
|
|
} |
692
|
|
|
|
|
|
|
|
693
|
|
|
|
|
|
|
# sleep until the directory's mtime changes from what it was when |
694
|
|
|
|
|
|
|
# we ran pickup_queued_job() last. |
695
|
|
|
|
|
|
|
|
696
|
0
|
|
|
|
|
0
|
dbg "wait_for_queued_job: sleeping on $pathqueuedir"; |
697
|
0
|
|
|
|
|
0
|
while (1) { |
698
|
0
|
|
|
|
|
0
|
my $now = time; |
699
|
0
|
0
|
0
|
|
|
0
|
if ($finishtime && $now >= $finishtime) { |
700
|
0
|
|
|
|
|
0
|
dbg "wait_for_queued_job timed out"; |
701
|
0
|
|
|
|
|
0
|
return undef; # out of time |
702
|
|
|
|
|
|
|
} |
703
|
|
|
|
|
|
|
|
704
|
0
|
|
|
|
|
0
|
Time::HiRes::usleep ($pollintvl); |
705
|
|
|
|
|
|
|
|
706
|
0
|
|
|
|
|
0
|
@stat = stat ($pathqueuedir); |
707
|
|
|
|
|
|
|
# dbg "wait_for_queued_job: stat $stat[9] $qdirlaststat $pathqueuedir"; |
708
|
0
|
0
|
0
|
|
|
0
|
last if (defined $stat[9] && |
|
|
|
0
|
|
|
|
|
709
|
|
|
|
|
|
|
((defined $qdirlaststat && $stat[9] != $qdirlaststat) |
710
|
|
|
|
|
|
|
|| !defined $qdirlaststat)); |
711
|
|
|
|
|
|
|
} |
712
|
|
|
|
|
|
|
|
713
|
0
|
|
|
|
|
0
|
dbg "wait_for_queued_job: activity, calling pickup"; |
714
|
|
|
|
|
|
|
} |
715
|
|
|
|
|
|
|
} |
716
|
|
|
|
|
|
|
|
717
|
|
|
|
|
|
|
########################################################################### |
718
|
|
|
|
|
|
|
|
719
|
|
|
|
|
|
|
=item $dq->visit_all_jobs($visitor, $visitcontext); |
720
|
|
|
|
|
|
|
|
721
|
|
|
|
|
|
|
Visit all the jobs in the queue, in a read-only mode. Used to list |
722
|
|
|
|
|
|
|
the entire queue. |
723
|
|
|
|
|
|
|
|
724
|
|
|
|
|
|
|
The callback function C<$visitor> will be called for each job in |
725
|
|
|
|
|
|
|
the queue, like so: |
726
|
|
|
|
|
|
|
|
727
|
|
|
|
|
|
|
&$visitor ($visitcontext, $job); |
728
|
|
|
|
|
|
|
|
729
|
|
|
|
|
|
|
C<$visitcontext> is whatever you pass in that variable above. |
730
|
|
|
|
|
|
|
C<$job> is a new, read-only instance of C representing |
731
|
|
|
|
|
|
|
that job. |
732
|
|
|
|
|
|
|
|
733
|
|
|
|
|
|
|
If a job is active (being processed), the C<$job> object also contains the |
734
|
|
|
|
|
|
|
following additional data: |
735
|
|
|
|
|
|
|
|
736
|
|
|
|
|
|
|
'active_host': the hostname on which the job is active |
737
|
|
|
|
|
|
|
'active_pid': the process ID of the process which picked up the job |
738
|
|
|
|
|
|
|
|
739
|
|
|
|
|
|
|
=cut |
740
|
|
|
|
|
|
|
|
741
|
|
|
|
|
|
|
sub visit_all_jobs { |
742
|
4
|
|
|
4
|
1
|
997
|
my ($self, $visitor, $visitcontext) = @_; |
743
|
|
|
|
|
|
|
|
744
|
4
|
|
|
|
|
17
|
my $pathqueuedir = $self->q_subdir('queue'); |
745
|
4
|
|
|
|
|
14
|
my $pathactivedir = $self->q_subdir('active'); |
746
|
|
|
|
|
|
|
|
747
|
4
|
|
|
|
|
37
|
my $iter = $self->queue_iter_start($pathqueuedir); |
748
|
|
|
|
|
|
|
|
749
|
4
|
|
|
|
|
17
|
my $nextfile; |
750
|
4
|
|
|
|
|
9
|
while (1) { |
751
|
122
|
|
|
|
|
933
|
$nextfile = $self->queue_iter_next($iter); |
752
|
|
|
|
|
|
|
|
753
|
122
|
100
|
|
|
|
245
|
if (!defined $nextfile) { |
754
|
|
|
|
|
|
|
# no more files in the queue, return empty |
755
|
4
|
|
|
|
|
11
|
last; |
756
|
|
|
|
|
|
|
} |
757
|
|
|
|
|
|
|
|
758
|
118
|
|
|
|
|
246
|
my $nextfilebase = $self->queue_dir_fanout_path_strip($nextfile); |
759
|
|
|
|
|
|
|
|
760
|
118
|
100
|
|
|
|
386
|
next if ($nextfilebase !~ /^\d/); |
761
|
90
|
|
|
|
|
176
|
my $pathqueue = $pathqueuedir.SLASH.$nextfile; |
762
|
90
|
|
|
|
|
261
|
my $pathactive = $pathactivedir.SLASH.$nextfilebase; |
763
|
|
|
|
|
|
|
|
764
|
90
|
50
|
|
|
|
1495
|
next if (!-f $pathqueue); |
765
|
|
|
|
|
|
|
|
766
|
90
|
|
|
|
|
106
|
my $acthost; |
767
|
|
|
|
|
|
|
my $actpid; |
768
|
90
|
50
|
|
|
|
1268
|
if (open (IN, "<$pathactive")) { |
769
|
0
|
|
|
|
|
0
|
$acthost = ; chomp $acthost; |
|
0
|
|
|
|
|
0
|
|
770
|
0
|
|
|
|
|
0
|
$actpid = ; chomp $actpid; |
|
0
|
|
|
|
|
0
|
|
771
|
0
|
|
|
|
|
0
|
close IN; |
772
|
|
|
|
|
|
|
} |
773
|
|
|
|
|
|
|
|
774
|
90
|
|
|
|
|
741
|
my $job = IPC::DirQueue::Job->new ($self, { |
775
|
|
|
|
|
|
|
is_readonly => 1, # means finish() will not rm files |
776
|
|
|
|
|
|
|
jobid => $nextfilebase, |
777
|
|
|
|
|
|
|
active_host => $acthost, |
778
|
|
|
|
|
|
|
active_pid => $actpid, |
779
|
|
|
|
|
|
|
pathqueue => $pathqueue, |
780
|
|
|
|
|
|
|
pathactive => $pathactive |
781
|
|
|
|
|
|
|
}); |
782
|
|
|
|
|
|
|
|
783
|
90
|
50
|
|
|
|
3209
|
if (!open (IN, "<".$pathqueue)) { |
784
|
0
|
|
|
|
|
0
|
dbg ("queue file disappeared, job finished? skip: $pathqueue"); |
785
|
0
|
|
|
|
|
0
|
next; |
786
|
|
|
|
|
|
|
} |
787
|
|
|
|
|
|
|
|
788
|
90
|
|
|
|
|
272
|
my $red = $self->read_control_file ($job, \*IN); |
789
|
90
|
|
|
|
|
845
|
close IN; |
790
|
|
|
|
|
|
|
|
791
|
90
|
50
|
|
|
|
200
|
if (!$red) { |
792
|
0
|
|
|
|
|
0
|
warn "IPC::DirQueue: cannot read control file: $pathqueue"; |
793
|
0
|
|
|
|
|
0
|
next; |
794
|
|
|
|
|
|
|
} |
795
|
|
|
|
|
|
|
|
796
|
90
|
|
|
|
|
229
|
&$visitor ($visitcontext, $job); |
797
|
|
|
|
|
|
|
} |
798
|
|
|
|
|
|
|
|
799
|
4
|
|
|
|
|
19
|
$self->queue_iter_stop($iter); |
800
|
4
|
|
|
|
|
18
|
return; |
801
|
|
|
|
|
|
|
} |
802
|
|
|
|
|
|
|
|
803
|
|
|
|
|
|
|
########################################################################### |
804
|
|
|
|
|
|
|
|
805
|
|
|
|
|
|
|
# private API: performs logic of IPC::DirQueue::Job::finish(). |
806
|
|
|
|
|
|
|
sub finish_job { |
807
|
400
|
|
|
400
|
0
|
633
|
my ($self, $job, $isdone) = @_; |
808
|
|
|
|
|
|
|
|
809
|
400
|
|
|
|
|
1175
|
dbg ("finish_job: ", $job->{pathactive}); |
810
|
|
|
|
|
|
|
|
811
|
400
|
50
|
|
|
|
967
|
if ($job->{is_readonly}) { |
812
|
0
|
|
|
|
|
0
|
return; |
813
|
|
|
|
|
|
|
} |
814
|
|
|
|
|
|
|
|
815
|
400
|
50
|
|
|
|
837
|
if ($isdone) { |
816
|
400
|
50
|
|
|
|
95478
|
unlink($job->{pathqueue}) |
817
|
|
|
|
|
|
|
or warn "IPC::DirQueue: unlink failed: $job->{pathqueue}"; |
818
|
400
|
50
|
|
|
|
35962
|
unlink($job->{QDFN}) |
819
|
|
|
|
|
|
|
or warn "IPC::DirQueue: unlink failed: $job->{QDFN}"; |
820
|
|
|
|
|
|
|
|
821
|
400
|
50
|
|
|
|
1216
|
if ($self->{indexclient}) { |
822
|
0
|
|
|
|
|
0
|
my $pathqueuedir = $self->q_subdir('queue'); |
823
|
0
|
|
|
|
|
0
|
$self->{indexclient}->dequeue($pathqueuedir, $job->{pathqueue}); |
824
|
|
|
|
|
|
|
} |
825
|
|
|
|
|
|
|
|
826
|
|
|
|
|
|
|
# touch the dir so that other dequeuers re-check; activity can |
827
|
|
|
|
|
|
|
# introduce a small race, I think. (don't think this is necessary) |
828
|
|
|
|
|
|
|
# $self->touch($pathqueuedir) or warn "touch failed on $pathqueuedir"; |
829
|
|
|
|
|
|
|
} |
830
|
|
|
|
|
|
|
|
831
|
400
|
50
|
|
|
|
55193
|
unlink($job->{pathactive}) |
832
|
|
|
|
|
|
|
or warn "IPC::DirQueue: unlink failed: $job->{pathactive}"; |
833
|
|
|
|
|
|
|
} |
834
|
|
|
|
|
|
|
|
835
|
|
|
|
|
|
|
########################################################################### |
836
|
|
|
|
|
|
|
|
837
|
|
|
|
|
|
|
sub get_dir_filelist_sorted { |
838
|
343
|
|
|
343
|
0
|
530
|
my ($self, $dir) = @_; |
839
|
|
|
|
|
|
|
|
840
|
343
|
100
|
|
|
|
8755
|
if (!opendir (DIR, $dir)) { |
841
|
1
|
|
|
|
|
3
|
return []; # no dir? nothing queued |
842
|
|
|
|
|
|
|
} |
843
|
|
|
|
|
|
|
# have to read the lot, to sort them. |
844
|
342
|
|
|
|
|
41730
|
my @files = sort grep { /^\d/ } readdir(DIR); |
|
16384
|
|
|
|
|
47175
|
|
845
|
342
|
|
|
|
|
6808
|
closedir DIR; |
846
|
342
|
|
|
|
|
3726
|
return \@files; |
847
|
|
|
|
|
|
|
} |
848
|
|
|
|
|
|
|
|
849
|
|
|
|
|
|
|
########################################################################### |
850
|
|
|
|
|
|
|
|
851
|
|
|
|
|
|
|
sub copy_in_to_out_fh { |
852
|
1120
|
|
|
1120
|
0
|
3033
|
my ($self, $fhin, $callbackin, $fhout, $outfname) = @_; |
853
|
|
|
|
|
|
|
|
854
|
1120
|
|
|
|
|
3277
|
my $buf; |
855
|
|
|
|
|
|
|
my $len; |
856
|
1120
|
|
|
|
|
4241
|
my $siz = 0; |
857
|
|
|
|
|
|
|
|
858
|
1120
|
|
|
|
|
12121
|
binmode $fhout; |
859
|
1120
|
100
|
|
|
|
5595
|
if ($callbackin) { |
860
|
1110
|
|
|
|
|
1820
|
while (1) { |
861
|
2420
|
|
|
|
|
7238
|
my $stringin = $callbackin->(); |
862
|
|
|
|
|
|
|
|
863
|
2420
|
100
|
|
|
|
8219
|
if (!defined($stringin)) { |
864
|
1110
|
|
|
|
|
2985
|
last; # EOF |
865
|
|
|
|
|
|
|
} |
866
|
|
|
|
|
|
|
|
867
|
1310
|
|
|
|
|
2793
|
$len = length ($stringin); |
868
|
1310
|
50
|
|
|
|
3686
|
next if ($len == 0); # empty string, nothing to write |
869
|
|
|
|
|
|
|
|
870
|
1310
|
50
|
|
|
|
3842752
|
if (!print $fhout $stringin) { |
871
|
0
|
|
|
|
|
0
|
warn "IPC::DirQueue: enqueue: cannot write to $outfname: $!"; |
872
|
0
|
|
|
|
|
0
|
close $fhout; |
873
|
0
|
|
|
|
|
0
|
return; |
874
|
|
|
|
|
|
|
} |
875
|
1310
|
|
|
|
|
2468
|
$siz += $len; |
876
|
|
|
|
|
|
|
} |
877
|
|
|
|
|
|
|
} |
878
|
|
|
|
|
|
|
else { |
879
|
10
|
|
|
|
|
16
|
binmode $fhin; |
880
|
10
|
|
|
|
|
185
|
while (($len = read ($fhin, $buf, $self->{buf_size})) > 0) { |
881
|
10
|
50
|
|
|
|
69
|
if (!print $fhout $buf) { |
882
|
0
|
|
|
|
|
0
|
warn "IPC::DirQueue: cannot write to $outfname: $!"; |
883
|
0
|
|
|
|
|
0
|
close $fhin; close $fhout; |
|
0
|
|
|
|
|
0
|
|
884
|
0
|
|
|
|
|
0
|
return; |
885
|
|
|
|
|
|
|
} |
886
|
10
|
|
|
|
|
37
|
$siz += $len; |
887
|
|
|
|
|
|
|
} |
888
|
10
|
|
|
|
|
94
|
close $fhin; |
889
|
|
|
|
|
|
|
} |
890
|
|
|
|
|
|
|
|
891
|
1120
|
50
|
|
|
|
43920005
|
if (!close $fhout) { |
892
|
0
|
|
|
|
|
0
|
warn "IPC::DirQueue: cannot close $outfname"; |
893
|
0
|
|
|
|
|
0
|
return; |
894
|
|
|
|
|
|
|
} |
895
|
1120
|
|
|
|
|
6438
|
return $siz; |
896
|
|
|
|
|
|
|
} |
897
|
|
|
|
|
|
|
|
898
|
|
|
|
|
|
|
sub link_into_dir { |
899
|
2240
|
|
|
2240
|
0
|
5312
|
my ($self, $job, $pathtmp, $pathlinkdir, $qfname) = @_; |
900
|
2240
|
|
|
|
|
5480
|
$self->ensure_dir_exists ($pathlinkdir); |
901
|
2240
|
|
|
|
|
2902
|
my $path; |
902
|
|
|
|
|
|
|
|
903
|
|
|
|
|
|
|
# retry 10 times; add a random few digits on link(2) failure |
904
|
2240
|
|
|
|
|
3661
|
my $maxretries = 10; |
905
|
2240
|
|
|
|
|
6374
|
for my $retry (1 .. $maxretries) { |
906
|
2240
|
|
|
|
|
6498
|
$path = $pathlinkdir.SLASH.$qfname; |
907
|
|
|
|
|
|
|
|
908
|
2240
|
|
|
|
|
8335
|
dbg ("link_into_dir retry=", $retry, " tmp=", $pathtmp, " path=", $path); |
909
|
|
|
|
|
|
|
|
910
|
2240
|
50
|
|
|
|
17790520
|
if (link ($pathtmp, $path)) { |
911
|
2240
|
|
|
|
|
5310
|
last; # got it |
912
|
|
|
|
|
|
|
} |
913
|
|
|
|
|
|
|
|
914
|
|
|
|
|
|
|
# link() may return failure, even if it succeeded. |
915
|
|
|
|
|
|
|
# use lstat() to verify that link() really failed. |
916
|
0
|
|
|
|
|
0
|
my ($dev,$ino,$mode,$nlink,$uid) = lstat($pathtmp); |
917
|
0
|
0
|
|
|
|
0
|
if ($nlink == 2) { |
918
|
0
|
|
|
|
|
0
|
last; # got it |
919
|
|
|
|
|
|
|
} |
920
|
|
|
|
|
|
|
|
921
|
|
|
|
|
|
|
# failed. check for retry limit first |
922
|
0
|
0
|
|
|
|
0
|
if ($retry == $maxretries) { |
923
|
0
|
|
|
|
|
0
|
warn "IPC::DirQueue: cannot link $pathtmp to $path"; |
924
|
0
|
|
|
|
|
0
|
return; |
925
|
|
|
|
|
|
|
} |
926
|
|
|
|
|
|
|
|
927
|
|
|
|
|
|
|
# try a new q_filename, use randomness to avoid |
928
|
|
|
|
|
|
|
# further collisions |
929
|
0
|
|
|
|
|
0
|
$qfname = $self->new_q_filename($job, 1); |
930
|
|
|
|
|
|
|
|
931
|
0
|
|
|
|
|
0
|
dbg ("link_into_dir retrying: $retry"); |
932
|
0
|
|
|
|
|
0
|
Time::HiRes::usleep (250 * $retry); |
933
|
|
|
|
|
|
|
} |
934
|
|
|
|
|
|
|
|
935
|
|
|
|
|
|
|
# got it! unlink(2) the tmp file, since we don't need it. |
936
|
2240
|
|
|
|
|
14866
|
dbg ("link_into_dir unlink tmp file: $pathtmp"); |
937
|
2240
|
50
|
|
|
|
231222
|
if (!unlink ($pathtmp)) { |
938
|
0
|
|
|
|
|
0
|
warn "IPC::DirQueue: cannot unlink $pathtmp"; |
939
|
|
|
|
|
|
|
# non-fatal, we can still continue anyway |
940
|
|
|
|
|
|
|
} |
941
|
|
|
|
|
|
|
|
942
|
2240
|
|
|
|
|
13718
|
dbg ("link_into_dir return: $path"); |
943
|
2240
|
|
|
|
|
6846
|
return $path; |
944
|
|
|
|
|
|
|
} |
945
|
|
|
|
|
|
|
|
946
|
|
|
|
|
|
|
sub link_into_dir_no_retry { |
947
|
400
|
|
|
400
|
0
|
779
|
my ($self, $job, $pathtmp, $pathlinkdir, $qfname) = @_; |
948
|
400
|
|
|
|
|
821
|
$self->ensure_dir_exists ($pathlinkdir); |
949
|
|
|
|
|
|
|
|
950
|
400
|
|
|
|
|
971
|
dbg ("lidnr: ", $pathtmp, " ", $pathlinkdir, "/", $qfname); |
951
|
|
|
|
|
|
|
|
952
|
400
|
|
|
|
|
10663
|
my ($dev1,$ino1,$mode1,$nlink1,$uid1) = lstat($pathtmp); |
953
|
400
|
50
|
|
|
|
1186
|
if (!defined $nlink1) { |
954
|
0
|
|
|
|
|
0
|
warn ("lidnr: tmp file disappeared?! $pathtmp"); |
955
|
0
|
|
|
|
|
0
|
return; # not going to have much luck here |
956
|
|
|
|
|
|
|
} |
957
|
|
|
|
|
|
|
|
958
|
400
|
|
|
|
|
857
|
my $path = $pathlinkdir.SLASH.$qfname; |
959
|
|
|
|
|
|
|
|
960
|
400
|
50
|
|
|
|
5427
|
if (-f $path) { |
961
|
0
|
|
|
|
|
0
|
dbg ("lidnr: target file already exists: $path"); |
962
|
0
|
|
|
|
|
0
|
return; # we've been beaten to it |
963
|
|
|
|
|
|
|
} |
964
|
|
|
|
|
|
|
|
965
|
400
|
|
|
|
|
549
|
my $linkfailed; |
966
|
400
|
50
|
|
|
|
40396
|
if (!link ($pathtmp, $path)) { |
967
|
0
|
|
|
|
|
0
|
dbg("link failure, recovering: $!"); |
968
|
0
|
|
|
|
|
0
|
$linkfailed = 1; |
969
|
|
|
|
|
|
|
} |
970
|
|
|
|
|
|
|
|
971
|
|
|
|
|
|
|
# link() may return failure, even if it succeeded. use lstat() to verify that |
972
|
|
|
|
|
|
|
# link() really failed. use lstat() even if it reported success, just to be |
973
|
|
|
|
|
|
|
# sure. ;) |
974
|
|
|
|
|
|
|
|
975
|
400
|
|
|
|
|
6268
|
my ($dev3,$ino3,$mode3,$nlink3,$uid3) = lstat($path); |
976
|
400
|
50
|
|
|
|
5285
|
if (!defined $nlink3) { |
977
|
0
|
|
|
|
|
0
|
dbg ("lidnr: link failed, target file nonexistent: $path"); |
978
|
0
|
|
|
|
|
0
|
return; |
979
|
|
|
|
|
|
|
} |
980
|
|
|
|
|
|
|
|
981
|
|
|
|
|
|
|
# now, be paranoid and verify that the inode data is identical |
982
|
400
|
50
|
33
|
|
|
3472
|
if ($dev1 != $dev3 || $ino1 != $ino3 || $uid1 != $uid3) { |
|
|
|
33
|
|
|
|
|
983
|
|
|
|
|
|
|
# the tmpfile and the target don't match each other. |
984
|
|
|
|
|
|
|
# if the link failed, this means that another qproc got |
985
|
|
|
|
|
|
|
# the file before we did, which is not an error. |
986
|
0
|
0
|
|
|
|
0
|
if (!$linkfailed) { |
987
|
|
|
|
|
|
|
# link supposedly succeeded, so this *is* an error. warn |
988
|
0
|
|
|
|
|
0
|
warn ("lidnr: tmp file doesn't match target: $path ($dev3,$ino3,$mode3,$nlink3,$uid3) vs $pathtmp ($dev1,$ino1,$mode1,$nlink1,$uid1)"); |
989
|
|
|
|
|
|
|
} |
990
|
0
|
|
|
|
|
0
|
return; |
991
|
|
|
|
|
|
|
} |
992
|
|
|
|
|
|
|
|
993
|
|
|
|
|
|
|
# got it! unlink(2) the tmp file, since we don't need it. |
994
|
400
|
|
|
|
|
1253
|
dbg ("lidnr: unlink tmp file: $pathtmp"); |
995
|
400
|
50
|
|
|
|
30321
|
if (!unlink ($pathtmp)) { |
996
|
0
|
|
|
|
|
0
|
warn "IPC::DirQueue: cannot unlink $pathtmp"; |
997
|
|
|
|
|
|
|
# non-fatal, we can still continue anyway |
998
|
|
|
|
|
|
|
} |
999
|
|
|
|
|
|
|
|
1000
|
400
|
|
|
|
|
1197
|
dbg ("lidnr: return: $path"); |
1001
|
400
|
|
|
|
|
2465
|
return $path; |
1002
|
|
|
|
|
|
|
} |
1003
|
|
|
|
|
|
|
|
1004
|
|
|
|
|
|
|
sub create_control_file { |
1005
|
1120
|
|
|
1120
|
0
|
2777
|
my ($self, $job, $pathtmpdata, $pathtmpctrl) = @_; |
1006
|
|
|
|
|
|
|
|
1007
|
1120
|
|
|
|
|
6403
|
dbg ("create_control_file $pathtmpctrl for $pathtmpdata ($job->{pathdata})"); |
1008
|
1120
|
50
|
|
|
|
140349
|
if (!sysopen (OUT, $pathtmpctrl, O_WRONLY|O_CREAT|O_EXCL, |
1009
|
|
|
|
|
|
|
$self->{queue_file_mode})) |
1010
|
|
|
|
|
|
|
{ |
1011
|
0
|
|
|
|
|
0
|
warn "IPC::DirQueue: cannot open $pathtmpctrl for write: $!"; |
1012
|
0
|
|
|
|
|
0
|
return; |
1013
|
|
|
|
|
|
|
} |
1014
|
|
|
|
|
|
|
|
1015
|
1120
|
|
|
|
|
7745
|
print OUT "QDFN: ", $job->{pathdata}, "\n"; |
1016
|
1120
|
|
|
|
|
7610
|
print OUT "QDSB: ", $job->{size_bytes}, "\n"; |
1017
|
1120
|
|
|
|
|
4324
|
print OUT "QSTT: ", $job->{time_submitted_secs}, "\n"; |
1018
|
1120
|
|
|
|
|
4984
|
print OUT "QSTM: ", $job->{time_submitted_msecs}, "\n"; |
1019
|
1120
|
|
|
|
|
3897
|
print OUT "QSHN: ", $self->gethostname(), "\n"; |
1020
|
|
|
|
|
|
|
|
1021
|
1120
|
|
|
|
|
2889
|
my $md = $job->{metadata}; |
1022
|
1120
|
|
|
|
|
1669
|
foreach my $k (keys %{$md}) { |
|
1120
|
|
|
|
|
5154
|
|
1023
|
1120
|
|
|
|
|
2885
|
my $v = $md->{$k}; |
1024
|
1120
|
50
|
33
|
|
|
15395
|
if (($k =~ /^Q...$/) |
|
|
|
33
|
|
|
|
|
1025
|
|
|
|
|
|
|
|| ($k =~ /[:\0\n]/s) |
1026
|
|
|
|
|
|
|
|| ($v =~ /[\0\n]/s)) |
1027
|
|
|
|
|
|
|
{ |
1028
|
0
|
|
|
|
|
0
|
close OUT; |
1029
|
0
|
|
|
|
|
0
|
die "IPC::DirQueue: invalid metadatum: '$k'"; # TODO: clean up files? |
1030
|
|
|
|
|
|
|
} |
1031
|
1120
|
|
|
|
|
7583
|
print OUT $k, ": ", $v, "\n"; |
1032
|
|
|
|
|
|
|
} |
1033
|
|
|
|
|
|
|
|
1034
|
1120
|
50
|
|
|
|
9898090
|
if (!close (OUT)) { |
1035
|
0
|
|
|
|
|
0
|
warn "IPC::DirQueue: cannot close $pathtmpctrl for write: $!"; |
1036
|
0
|
|
|
|
|
0
|
return; |
1037
|
|
|
|
|
|
|
} |
1038
|
|
|
|
|
|
|
|
1039
|
1120
|
|
|
|
|
4787
|
return 1; |
1040
|
|
|
|
|
|
|
} |
1041
|
|
|
|
|
|
|
|
1042
|
|
|
|
|
|
|
sub read_control_file { |
1043
|
490
|
|
|
490
|
0
|
973
|
my ($self, $job, $infh) = @_; |
1044
|
490
|
|
|
|
|
629
|
local ($_); |
1045
|
|
|
|
|
|
|
|
1046
|
490
|
|
|
|
|
7792
|
while (<$infh>) { |
1047
|
2940
|
|
|
|
|
19660
|
my ($k, $value) = split (/: /, $_, 2); |
1048
|
2940
|
|
|
|
|
4028
|
chop $value; |
1049
|
2940
|
100
|
|
|
|
8907
|
if ($k =~ /^Q[A-Z]{3}$/) { |
1050
|
2450
|
|
|
|
|
20818
|
$job->{$k} = $value; |
1051
|
|
|
|
|
|
|
} |
1052
|
|
|
|
|
|
|
else { |
1053
|
490
|
|
|
|
|
5077
|
$job->{metadata}->{$k} = $value; |
1054
|
|
|
|
|
|
|
} |
1055
|
|
|
|
|
|
|
} |
1056
|
|
|
|
|
|
|
|
1057
|
|
|
|
|
|
|
# all jobs must have a datafile (even if it's empty) |
1058
|
490
|
50
|
33
|
|
|
11227
|
if (!$job->{QDFN} || !-f $job->{QDFN}) { |
1059
|
0
|
|
|
|
|
0
|
return; |
1060
|
|
|
|
|
|
|
} |
1061
|
|
|
|
|
|
|
|
1062
|
490
|
|
|
|
|
1743
|
return $job; |
1063
|
|
|
|
|
|
|
# print OUT "QDFN: ", $job->{pathdata}, "\n"; |
1064
|
|
|
|
|
|
|
# print OUT "QDSB: ", $job->{size_bytes}, "\n"; |
1065
|
|
|
|
|
|
|
# print OUT "QSTT: ", $job->{time_submitted_secs}, "\n"; |
1066
|
|
|
|
|
|
|
# print OUT "QSTM: ", $job->{time_submitted_msecs}, "\n"; |
1067
|
|
|
|
|
|
|
# print OUT "QSHN: ", $self->gethostname(), "\n"; |
1068
|
|
|
|
|
|
|
} |
1069
|
|
|
|
|
|
|
|
1070
|
|
|
|
|
|
|
sub worker_still_working { |
1071
|
0
|
|
|
0
|
0
|
0
|
my ($self, $fname) = @_; |
1072
|
0
|
0
|
|
|
|
0
|
if (!$fname) { |
1073
|
0
|
|
|
|
|
0
|
return; |
1074
|
|
|
|
|
|
|
} |
1075
|
0
|
0
|
|
|
|
0
|
if (!open (IN, "<".$fname)) { |
1076
|
0
|
|
|
|
|
0
|
return; |
1077
|
|
|
|
|
|
|
} |
1078
|
0
|
|
|
|
|
0
|
my $hname = ; chomp $hname; |
|
0
|
|
|
|
|
0
|
|
1079
|
0
|
|
|
|
|
0
|
my $wpid = ; chomp $wpid; |
|
0
|
|
|
|
|
0
|
|
1080
|
0
|
|
|
|
|
0
|
close IN; |
1081
|
0
|
0
|
|
|
|
0
|
if ($hname eq $self->gethostname()) { |
1082
|
0
|
0
|
|
|
|
0
|
if (!kill (0, $wpid)) { |
1083
|
0
|
|
|
|
|
0
|
return; # pid is local and no longer running |
1084
|
|
|
|
|
|
|
} |
1085
|
|
|
|
|
|
|
} |
1086
|
|
|
|
|
|
|
|
1087
|
|
|
|
|
|
|
# pid is still running, or remote |
1088
|
0
|
|
|
|
|
0
|
return 1; |
1089
|
|
|
|
|
|
|
} |
1090
|
|
|
|
|
|
|
|
1091
|
|
|
|
|
|
|
########################################################################### |
1092
|
|
|
|
|
|
|
|
1093
|
|
|
|
|
|
|
sub q_dir { |
1094
|
6004
|
|
|
6004
|
0
|
9126
|
my ($self) = @_; |
1095
|
6004
|
|
|
|
|
51461
|
return $self->{dir}; |
1096
|
|
|
|
|
|
|
} |
1097
|
|
|
|
|
|
|
|
1098
|
|
|
|
|
|
|
sub q_subdir { |
1099
|
6004
|
|
|
6004
|
0
|
12163
|
my ($self, $subdir) = @_; |
1100
|
6004
|
|
|
|
|
20900
|
return $self->q_dir().SLASH.$subdir; |
1101
|
|
|
|
|
|
|
} |
1102
|
|
|
|
|
|
|
|
1103
|
|
|
|
|
|
|
sub new_q_filename { |
1104
|
1120
|
|
|
1120
|
0
|
2304
|
my ($self, $job, $addextra) = @_; |
1105
|
|
|
|
|
|
|
|
1106
|
1120
|
|
|
|
|
12616
|
my @gmt = gmtime ($job->{time_submitted_secs}); |
1107
|
|
|
|
|
|
|
|
1108
|
|
|
|
|
|
|
# NN.20040718140300MMMM.hash(hostname.$$)[.rand] |
1109
|
|
|
|
|
|
|
# |
1110
|
|
|
|
|
|
|
# NN = priority, default 50 |
1111
|
|
|
|
|
|
|
# MMMM = microseconds from Time::HiRes::gettimeofday() |
1112
|
|
|
|
|
|
|
# hostname = current hostname |
1113
|
|
|
|
|
|
|
|
1114
|
1120
|
|
|
|
|
12604
|
my $buf = sprintf ("%02d.%04d%02d%02d%02d%02d%02d%06d.%s", |
1115
|
|
|
|
|
|
|
$job->{pri}, |
1116
|
|
|
|
|
|
|
$gmt[5]+1900, $gmt[4]+1, $gmt[3], $gmt[2], $gmt[1], $gmt[0], |
1117
|
|
|
|
|
|
|
$job->{time_submitted_msecs}, |
1118
|
|
|
|
|
|
|
hash_string_to_filename ($self->gethostname().$$)); |
1119
|
|
|
|
|
|
|
|
1120
|
|
|
|
|
|
|
# normally, this isn't used. but if there's a collision, |
1121
|
|
|
|
|
|
|
# all retries after that will do this; in this case, the |
1122
|
|
|
|
|
|
|
# extra anti-collision stuff is useful |
1123
|
1120
|
50
|
|
|
|
4287
|
if ($addextra) { |
1124
|
0
|
|
|
|
|
0
|
$buf .= ".".$$.".".$self->get_random_int(); |
1125
|
|
|
|
|
|
|
} |
1126
|
|
|
|
|
|
|
|
1127
|
1120
|
|
|
|
|
4609
|
return $buf; |
1128
|
|
|
|
|
|
|
} |
1129
|
|
|
|
|
|
|
|
1130
|
|
|
|
|
|
|
sub hash_string_to_filename { |
1131
|
1120
|
|
|
1120
|
0
|
3310
|
my ($str) = @_; |
1132
|
|
|
|
|
|
|
# get a 16-bit checksum of the input, then uuencode that string |
1133
|
1120
|
|
|
|
|
13994
|
$str = pack ("u*", unpack ("%16C*", $str)); |
1134
|
|
|
|
|
|
|
# transcode from uuencode-space into safe, base64-ish space |
1135
|
1120
|
|
|
|
|
5823
|
$str =~ y/ -_/A-Za-z0-9+_/; |
1136
|
|
|
|
|
|
|
# and remove the stuff that wasn't in that "safe" range |
1137
|
1120
|
|
|
|
|
4067
|
$str =~ y/A-Za-z0-9+_//cd; |
1138
|
1120
|
|
|
|
|
12647
|
return $str; |
1139
|
|
|
|
|
|
|
} |
1140
|
|
|
|
|
|
|
|
1141
|
|
|
|
|
|
|
sub new_lock_filename { |
1142
|
400
|
|
|
400
|
0
|
636
|
my ($self) = @_; |
1143
|
400
|
|
|
|
|
1261
|
return sprintf ("%d.%s.%d", time, $self->gethostname(), $$); |
1144
|
|
|
|
|
|
|
} |
1145
|
|
|
|
|
|
|
|
1146
|
|
|
|
|
|
|
sub get_random_int { |
1147
|
130
|
|
|
130
|
0
|
231
|
my ($self) = @_; |
1148
|
|
|
|
|
|
|
|
1149
|
|
|
|
|
|
|
# we try to use /dev/random first, as that's globally random for all PIDs on |
1150
|
|
|
|
|
|
|
# the system. this avoids brokenness if the caller has called srand(), then |
1151
|
|
|
|
|
|
|
# forked multiple enqueueing procs, as they will all share the same seed and |
1152
|
|
|
|
|
|
|
# will all return the same "random" output. |
1153
|
130
|
|
|
|
|
166
|
my $buf; |
1154
|
130
|
50
|
33
|
|
|
2701
|
if (sysopen (IN, "
|
1155
|
0
|
|
|
|
|
0
|
my ($hi, $lo) = unpack ("C2", $buf); |
1156
|
0
|
|
|
|
|
0
|
return ($hi << 8) | $lo; |
1157
|
|
|
|
|
|
|
} else { |
1158
|
|
|
|
|
|
|
# fall back to plain old rand(), use perl's implicit srand() call, |
1159
|
|
|
|
|
|
|
# and hope caller hasn't called srand() yet in a parent process. |
1160
|
130
|
|
|
|
|
1005
|
return int rand (65536); |
1161
|
|
|
|
|
|
|
} |
1162
|
|
|
|
|
|
|
} |
1163
|
|
|
|
|
|
|
|
1164
|
|
|
|
|
|
|
sub gethostname { |
1165
|
3040
|
|
|
3040
|
0
|
6736
|
my ($self) = @_; |
1166
|
|
|
|
|
|
|
|
1167
|
3040
|
|
|
|
|
9931
|
my $hname = $self->{myhostname}; |
1168
|
3040
|
100
|
|
|
|
26152
|
return $hname if $hname; |
1169
|
|
|
|
|
|
|
|
1170
|
|
|
|
|
|
|
# try using Sys::Hostname. may fail on non-UNIX platforms |
1171
|
16
|
|
|
16
|
|
7853
|
eval ' |
|
16
|
|
|
|
|
42804
|
|
|
16
|
|
|
|
|
37575
|
|
|
16
|
|
|
|
|
1818
|
|
1172
|
|
|
|
|
|
|
use Sys::Hostname; |
1173
|
|
|
|
|
|
|
$self->{myhostname} = hostname; # cache the result |
1174
|
|
|
|
|
|
|
'; |
1175
|
|
|
|
|
|
|
|
1176
|
|
|
|
|
|
|
# could have failed. supply a default in that case |
1177
|
16
|
|
50
|
|
|
694
|
$self->{myhostname} ||= 'nohost'; |
1178
|
|
|
|
|
|
|
|
1179
|
16
|
|
|
|
|
265
|
return $self->{myhostname}; |
1180
|
|
|
|
|
|
|
} |
1181
|
|
|
|
|
|
|
|
1182
|
|
|
|
|
|
|
sub ensure_dir_exists { |
1183
|
5155
|
|
|
5155
|
0
|
9658
|
my ($self, $dir) = @_; |
1184
|
5155
|
100
|
|
|
|
24863
|
return if exists ($self->{ensured_dir_exists}->{$dir}); |
1185
|
105
|
|
|
|
|
381
|
$self->{ensured_dir_exists}->{$dir} = 1; |
1186
|
105
|
100
|
|
|
|
59784
|
(-d $dir) or mkdir($dir); |
1187
|
|
|
|
|
|
|
} |
1188
|
|
|
|
|
|
|
|
1189
|
|
|
|
|
|
|
sub queuedir_is_bad { |
1190
|
2
|
|
|
2
|
0
|
5
|
my ($self, $pathqueuedir) = @_; |
1191
|
|
|
|
|
|
|
|
1192
|
|
|
|
|
|
|
# try creating the dir; it may not exist yet |
1193
|
2
|
|
|
|
|
6
|
$self->ensure_dir_exists ($pathqueuedir); |
1194
|
2
|
50
|
|
|
|
51
|
if (!opendir (RETRY, $pathqueuedir)) { |
1195
|
|
|
|
|
|
|
# still can't open it! problem |
1196
|
0
|
|
|
|
|
0
|
warn "IPC::DirQueue: cannot open queue dir \"$pathqueuedir\": $!\n"; |
1197
|
0
|
|
|
|
|
0
|
return 1; |
1198
|
|
|
|
|
|
|
} |
1199
|
|
|
|
|
|
|
# otherwise, we could open it -- it just needed to be created. |
1200
|
2
|
|
|
|
|
19
|
closedir RETRY; |
1201
|
2
|
|
|
|
|
19
|
return 0; |
1202
|
|
|
|
|
|
|
} |
1203
|
|
|
|
|
|
|
|
1204
|
|
|
|
|
|
|
sub dbg { |
1205
|
12135
|
50
|
|
12135
|
0
|
40130
|
return unless $DEBUG; |
1206
|
0
|
|
|
|
|
0
|
warn "dq debug: ".join(' ',@_)."\n"; |
1207
|
|
|
|
|
|
|
} |
1208
|
|
|
|
|
|
|
|
1209
|
|
|
|
|
|
|
########################################################################### |
1210
|
|
|
|
|
|
|
|
1211
|
|
|
|
|
|
|
sub queue_iter_start { |
1212
|
407
|
|
|
407
|
0
|
794
|
my ($self, $pathqueuedir) = @_; |
1213
|
|
|
|
|
|
|
|
1214
|
407
|
50
|
|
|
|
1670
|
if ($self->{indexclient}) { |
|
|
100
|
|
|
|
|
|
|
|
100
|
|
|
|
|
|
1215
|
0
|
|
|
|
|
0
|
dbg ("queue iter: getting list for $pathqueuedir"); |
1216
|
0
|
|
|
|
|
0
|
my @files = sort grep { /^\d/ } $self->{indexclient}->ls($pathqueuedir); |
|
0
|
|
|
|
|
0
|
|
1217
|
|
|
|
|
|
|
|
1218
|
0
|
0
|
|
|
|
0
|
if (scalar @files <= 0) { |
1219
|
0
|
0
|
|
|
|
0
|
return if $self->queuedir_is_bad($pathqueuedir); |
1220
|
|
|
|
|
|
|
} |
1221
|
|
|
|
|
|
|
|
1222
|
0
|
|
|
|
|
0
|
return { files => \@files }; |
1223
|
|
|
|
|
|
|
} |
1224
|
|
|
|
|
|
|
elsif ($self->{ordered}) { |
1225
|
343
|
|
|
|
|
1383
|
dbg ("queue iter: opening $pathqueuedir (ordered)"); |
1226
|
343
|
|
|
|
|
38399
|
my $files = $self->get_dir_filelist_sorted($pathqueuedir); |
1227
|
343
|
100
|
|
|
|
1016
|
if (scalar @$files <= 0) { |
1228
|
2
|
50
|
|
|
|
10
|
return if $self->queuedir_is_bad($pathqueuedir); |
1229
|
|
|
|
|
|
|
} |
1230
|
|
|
|
|
|
|
|
1231
|
343
|
|
|
|
|
1481
|
return { files => $files }; |
1232
|
|
|
|
|
|
|
} |
1233
|
|
|
|
|
|
|
elsif ($self->{queue_fanout}) { |
1234
|
32
|
|
|
|
|
85
|
return $self->queue_iter_fanout_start($pathqueuedir); |
1235
|
|
|
|
|
|
|
} |
1236
|
|
|
|
|
|
|
else { |
1237
|
32
|
|
|
|
|
35
|
my $dirfh; |
1238
|
32
|
|
|
|
|
78
|
dbg ("queue iter: opening $pathqueuedir"); |
1239
|
32
|
50
|
|
|
|
797
|
if (!opendir ($dirfh, $pathqueuedir)) { |
1240
|
0
|
0
|
|
|
|
0
|
return if $self->queuedir_is_bad($pathqueuedir); |
1241
|
0
|
0
|
|
|
|
0
|
if (!opendir ($dirfh, $pathqueuedir)) { |
1242
|
0
|
|
|
|
|
0
|
warn "oops? pathqueuedir bad"; |
1243
|
0
|
|
|
|
|
0
|
return; |
1244
|
|
|
|
|
|
|
} |
1245
|
|
|
|
|
|
|
} |
1246
|
|
|
|
|
|
|
|
1247
|
32
|
|
|
|
|
108
|
return { fh => $dirfh }; |
1248
|
|
|
|
|
|
|
} |
1249
|
|
|
|
|
|
|
|
1250
|
0
|
|
|
|
|
0
|
die "cannot get here"; |
1251
|
|
|
|
|
|
|
} |
1252
|
|
|
|
|
|
|
|
1253
|
|
|
|
|
|
|
sub queue_iter_next { |
1254
|
1579
|
|
|
1579
|
0
|
2371
|
my ($self, $iter) = @_; |
1255
|
|
|
|
|
|
|
|
1256
|
1579
|
50
|
|
|
|
5319
|
if ($self->{indexclient}) { |
|
|
100
|
|
|
|
|
|
|
|
100
|
|
|
|
|
|
1257
|
0
|
|
|
|
|
0
|
return shift @{$iter->{files}}; |
|
0
|
|
|
|
|
0
|
|
1258
|
|
|
|
|
|
|
} |
1259
|
|
|
|
|
|
|
elsif ($self->{ordered}) { |
1260
|
546
|
|
|
|
|
692
|
return shift @{$iter->{files}}; |
|
546
|
|
|
|
|
1714
|
|
1261
|
|
|
|
|
|
|
} |
1262
|
|
|
|
|
|
|
elsif ($self->{queue_fanout}) { |
1263
|
687
|
|
|
|
|
1589
|
return $self->queue_iter_fanout_next($iter); |
1264
|
|
|
|
|
|
|
} |
1265
|
|
|
|
|
|
|
else { |
1266
|
346
|
|
|
|
|
1988
|
return readdir($iter->{fh}); |
1267
|
|
|
|
|
|
|
} |
1268
|
|
|
|
|
|
|
|
1269
|
0
|
|
|
|
|
0
|
return; |
1270
|
|
|
|
|
|
|
} |
1271
|
|
|
|
|
|
|
|
1272
|
|
|
|
|
|
|
sub queue_iter_stop { |
1273
|
407
|
|
|
407
|
0
|
638
|
my ($self, $iter) = @_; |
1274
|
|
|
|
|
|
|
|
1275
|
407
|
50
|
|
|
|
822
|
return unless $iter; |
1276
|
407
|
100
|
|
|
|
1260
|
if (defined $iter->{fanfh}) { closedir($iter->{fanfh}); } |
|
30
|
|
|
|
|
403
|
|
1277
|
407
|
100
|
|
|
|
1296
|
if (defined $iter->{fh}) { closedir($iter->{fh}); } |
|
32
|
|
|
|
|
347
|
|
1278
|
|
|
|
|
|
|
} |
1279
|
|
|
|
|
|
|
|
1280
|
|
|
|
|
|
|
########################################################################### |
1281
|
|
|
|
|
|
|
|
1282
|
|
|
|
|
|
|
sub queue_dir_fanout_create { |
1283
|
1120
|
|
|
1120
|
0
|
2178
|
my ($self, $pathqueuedir) = @_; |
1284
|
|
|
|
|
|
|
|
1285
|
1120
|
100
|
|
|
|
4449
|
if (!$self->{queue_fanout}) { |
1286
|
990
|
|
|
|
|
2580
|
return; |
1287
|
|
|
|
|
|
|
} |
1288
|
|
|
|
|
|
|
|
1289
|
130
|
|
|
|
|
1922
|
my @letters = split '', q{0123456789abcdef}; |
1290
|
130
|
|
|
|
|
582
|
my $fanout = $letters[get_random_int() % (scalar @letters)]; |
1291
|
|
|
|
|
|
|
|
1292
|
130
|
|
|
|
|
571
|
$self->ensure_dir_exists ($pathqueuedir); |
1293
|
130
|
|
|
|
|
581
|
$self->ensure_dir_exists ($pathqueuedir.SLASH.$fanout); |
1294
|
130
|
|
|
|
|
701
|
return $fanout; |
1295
|
|
|
|
|
|
|
} |
1296
|
|
|
|
|
|
|
|
1297
|
|
|
|
|
|
|
sub queue_dir_fanout_commit { |
1298
|
1120
|
|
|
1120
|
0
|
2234
|
my ($self, $pathqueuedir, $fanout) = @_; |
1299
|
|
|
|
|
|
|
|
1300
|
1120
|
100
|
|
|
|
4010
|
if (!$self->{queue_fanout}) { |
1301
|
990
|
|
|
|
|
3452
|
return; |
1302
|
|
|
|
|
|
|
} |
1303
|
|
|
|
|
|
|
|
1304
|
|
|
|
|
|
|
# now touch all levels ($pathqueuedir will be touched later) |
1305
|
130
|
50
|
|
|
|
810
|
$self->touch($pathqueuedir.SLASH.$fanout) |
1306
|
|
|
|
|
|
|
or die "cannot touch fanout for $pathqueuedir/$fanout"; |
1307
|
|
|
|
|
|
|
} |
1308
|
|
|
|
|
|
|
|
1309
|
|
|
|
|
|
|
sub queue_dir_fanout_path { |
1310
|
1120
|
|
|
1120
|
0
|
2266
|
my ($self, $pathqueuedir, $fanout) = @_; |
1311
|
|
|
|
|
|
|
|
1312
|
1120
|
100
|
|
|
|
4403
|
if (!$self->{queue_fanout}) { |
1313
|
990
|
|
|
|
|
3672
|
return $pathqueuedir; |
1314
|
|
|
|
|
|
|
} |
1315
|
|
|
|
|
|
|
else { |
1316
|
130
|
|
|
|
|
692
|
return $pathqueuedir.SLASH.$fanout; |
1317
|
|
|
|
|
|
|
} |
1318
|
|
|
|
|
|
|
} |
1319
|
|
|
|
|
|
|
|
1320
|
|
|
|
|
|
|
sub queue_dir_fanout_path_strip { |
1321
|
1572
|
|
|
1572
|
0
|
2174
|
my ($self, $fname) = @_; |
1322
|
|
|
|
|
|
|
|
1323
|
1572
|
100
|
|
|
|
11466
|
if ($self->{queue_fanout}) { |
1324
|
685
|
|
|
|
|
19182
|
$fname =~ s/^.*\///; |
1325
|
|
|
|
|
|
|
} |
1326
|
1572
|
|
|
|
|
3501
|
return $fname; |
1327
|
|
|
|
|
|
|
} |
1328
|
|
|
|
|
|
|
|
1329
|
|
|
|
|
|
|
sub queue_iter_fanout_start { |
1330
|
32
|
|
|
32
|
0
|
47
|
my ($self, $pathqueuedir) = @_; |
1331
|
32
|
|
|
|
|
57
|
my $iter = { }; |
1332
|
|
|
|
|
|
|
|
1333
|
|
|
|
|
|
|
{ |
1334
|
32
|
|
|
|
|
40
|
my @fanouts; |
|
32
|
|
|
|
|
40
|
|
1335
|
32
|
|
|
|
|
98
|
dbg ("queue iter: opening $pathqueuedir"); |
1336
|
32
|
50
|
|
|
|
703
|
if (!opendir (DIR, $pathqueuedir)) { |
1337
|
0
|
|
|
|
|
0
|
@fanouts = (); # no dir? nothing queued |
1338
|
|
|
|
|
|
|
} |
1339
|
|
|
|
|
|
|
else { |
1340
|
416
|
|
|
|
|
6664
|
my %map = map { |
1341
|
480
|
|
|
|
|
1055
|
$_ => (-M $pathqueuedir.SLASH.$_) |
1342
|
32
|
|
|
|
|
471
|
} grep { /^[a-z0-9]$/ } readdir(DIR); |
1343
|
32
|
|
|
|
|
283
|
@fanouts = sort { $map{$a} <=> $map{$b} } keys %map; |
|
896
|
|
|
|
|
1193
|
|
1344
|
32
|
|
|
|
|
187
|
dbg ("fanout: $pathqueuedir, order is ".join ' ', @fanouts); |
1345
|
|
|
|
|
|
|
} |
1346
|
32
|
|
|
|
|
350
|
closedir DIR; |
1347
|
32
|
|
|
|
|
101
|
$iter->{fanoutlist} = \@fanouts; |
1348
|
32
|
|
|
|
|
80
|
$iter->{pathqueuedir} = $pathqueuedir; |
1349
|
|
|
|
|
|
|
|
1350
|
|
|
|
|
|
|
} |
1351
|
32
|
|
|
|
|
95
|
return $iter; |
1352
|
|
|
|
|
|
|
} |
1353
|
|
|
|
|
|
|
|
1354
|
|
|
|
|
|
|
sub queue_iter_fanout_next { |
1355
|
687
|
|
|
687
|
0
|
1841
|
my ($self, $iter) = @_; |
1356
|
|
|
|
|
|
|
|
1357
|
|
|
|
|
|
|
# dir handles are: |
1358
|
|
|
|
|
|
|
# /path/to/queue = $iter->{fh} |
1359
|
|
|
|
|
|
|
# /f = $iter->{fanfh} |
1360
|
|
|
|
|
|
|
|
1361
|
|
|
|
|
|
|
next_fanout: |
1362
|
|
|
|
|
|
|
|
1363
|
|
|
|
|
|
|
# open the {fanfh} handle, if it isn't already going |
1364
|
884
|
100
|
|
|
|
1997
|
if (!defined $iter->{fanfh}) { |
1365
|
229
|
|
|
|
|
228
|
my $nextfanout = shift @{$iter->{fanoutlist}}; |
|
229
|
|
|
|
|
507
|
|
1366
|
229
|
100
|
|
|
|
484
|
if (!defined $nextfanout) { |
1367
|
2
|
|
|
|
|
5
|
dbg ("fanout: end of list"); |
1368
|
2
|
|
|
|
|
5
|
return; |
1369
|
|
|
|
|
|
|
} |
1370
|
|
|
|
|
|
|
|
1371
|
227
|
|
|
|
|
248
|
my $dirfh; |
1372
|
227
|
|
|
|
|
607
|
dbg ("fanout: opening next dir: $nextfanout"); |
1373
|
227
|
50
|
|
|
|
18624
|
if (!opendir ($dirfh, $iter->{pathqueuedir}.SLASH.$nextfanout)) { |
1374
|
0
|
|
|
|
|
0
|
warn "opendir failed $iter->{pathqueuedir}/$nextfanout: $!"; |
1375
|
0
|
|
|
|
|
0
|
return; |
1376
|
|
|
|
|
|
|
} |
1377
|
|
|
|
|
|
|
|
1378
|
227
|
|
|
|
|
420
|
$iter->{fanstr} = $nextfanout; |
1379
|
227
|
|
|
|
|
686
|
$iter->{fanfh} = $dirfh; |
1380
|
|
|
|
|
|
|
} |
1381
|
|
|
|
|
|
|
|
1382
|
882
|
|
|
|
|
4492
|
my $fname = readdir($iter->{fanfh}); |
1383
|
882
|
100
|
|
|
|
1960
|
if (defined $fname) { |
1384
|
685
|
|
|
|
|
2150
|
return $iter->{fanstr}.SLASH.$fname; # best-case scenario |
1385
|
|
|
|
|
|
|
} |
1386
|
|
|
|
|
|
|
|
1387
|
197
|
|
|
|
|
415
|
dbg ("fanout: finished this dir, trying next one"); |
1388
|
197
|
|
|
|
|
2056
|
closedir($iter->{fanfh}); |
1389
|
197
|
|
|
|
|
358
|
$iter->{fanstr} = undef; |
1390
|
197
|
|
|
|
|
289
|
$iter->{fanfh} = undef; |
1391
|
197
|
|
|
|
|
546
|
goto next_fanout; |
1392
|
|
|
|
|
|
|
} |
1393
|
|
|
|
|
|
|
|
1394
|
23
|
|
|
23
|
|
316
|
use constant UTIME_TAKES_UNDEF_FOR_TOUCH => ($] >= 5.007002); |
|
23
|
|
|
|
|
80
|
|
|
23
|
|
|
|
|
5614
|
|
1395
|
|
|
|
|
|
|
|
1396
|
|
|
|
|
|
|
sub touch { |
1397
|
1250
|
|
|
1250
|
0
|
3001
|
my ($self, $path) = @_; |
1398
|
|
|
|
|
|
|
|
1399
|
|
|
|
|
|
|
# 'Since perl 5.7.2, if the first two elements of the list are "undef", then |
1400
|
|
|
|
|
|
|
# the utime(2) function in the C library will be called with a null second |
1401
|
|
|
|
|
|
|
# argument. On most systems, this will set the file's access and modification |
1402
|
|
|
|
|
|
|
# times to the current time'. |
1403
|
|
|
|
|
|
|
|
1404
|
1250
|
|
|
|
|
1771
|
if (UTIME_TAKES_UNDEF_FOR_TOUCH) { |
1405
|
1250
|
|
|
|
|
58649
|
return utime undef, undef, $path; |
1406
|
|
|
|
|
|
|
} else { |
1407
|
|
|
|
|
|
|
my $now = time; |
1408
|
|
|
|
|
|
|
return utime $now, $now, $path; |
1409
|
|
|
|
|
|
|
} |
1410
|
|
|
|
|
|
|
} |
1411
|
|
|
|
|
|
|
|
1412
|
|
|
|
|
|
|
########################################################################### |
1413
|
|
|
|
|
|
|
|
1414
|
|
|
|
|
|
|
1; |
1415
|
|
|
|
|
|
|
|
1416
|
|
|
|
|
|
|
=back |
1417
|
|
|
|
|
|
|
|
1418
|
|
|
|
|
|
|
=head1 STALE LOCKS AND SIGNAL HANDLING |
1419
|
|
|
|
|
|
|
|
1420
|
|
|
|
|
|
|
If interrupted or terminated, dequeueing processes should be careful to either |
1421
|
|
|
|
|
|
|
call C<$job-E<gt>finish()> or C<$job-E<gt>return_to_queue()> on any active |
1422
|
|
|
|
|
|
|
tasks before exiting -- otherwise those jobs will remain marked I<active>. |
1423
|
|
|
|
|
|
|
|
1424
|
|
|
|
|
|
|
Dequeueing processes can also call C<$job-E<gt>touch_active_lock()> |
1425
|
|
|
|
|
|
|
periodically, while processing large tasks, to ensure that the task is still |
1426
|
|
|
|
|
|
|
marked as I<active>. |
1427
|
|
|
|
|
|
|
|
1428
|
|
|
|
|
|
|
Stale locks are normally dealt with automatically. If a lock is still |
1429
|
|
|
|
|
|
|
I<active> after about 10 minutes of inactivity, the other dequeuers on |
1430
|
|
|
|
|
|
|
that machine will probe the process ID listed in that lock file using |
1431
|
|
|
|
|
|
|
C<kill(0)>. If that process ID is no longer running, the lock is presumed |
1432
|
|
|
|
|
|
|
likely to be stale. If a given timeout (10 minutes plus a random value |
1433
|
|
|
|
|
|
|
between 0 and 256 seconds) has elapsed since the lock file was last |
1434
|
|
|
|
|
|
|
modified, the lock file is deleted. |
1435
|
|
|
|
|
|
|
|
1436
|
|
|
|
|
|
|
This 10-minute default can be modified using the C<active_file_lifetime> |
1437
|
|
|
|
|
|
|
parameter to the C<IPC::DirQueue> constructor. |
1438
|
|
|
|
|
|
|
|
1439
|
|
|
|
|
|
|
Note: this means that if the dequeueing processes are spread among |
1440
|
|
|
|
|
|
|
multiple machines, and there is no longer a dequeuer running on the |
1441
|
|
|
|
|
|
|
machine that initially 'locked' the task, it will never be unlocked, |
1442
|
|
|
|
|
|
|
unless you delete the I<active> file for that task. |
1443
|
|
|
|
|
|
|
|
1444
|
|
|
|
|
|
|
=head1 QUEUE DIRECTORY STRUCTURE |
1445
|
|
|
|
|
|
|
|
1446
|
|
|
|
|
|
|
C<IPC::DirQueue> maintains the following structure for a queue directory: |
1447
|
|
|
|
|
|
|
|
1448
|
|
|
|
|
|
|
=over 4 |
1449
|
|
|
|
|
|
|
|
1450
|
|
|
|
|
|
|
=item queue directory |
1451
|
|
|
|
|
|
|
|
1452
|
|
|
|
|
|
|
The B<queue> directory is used to store the queue control files. Queue |
1453
|
|
|
|
|
|
|
control files determine what jobs are in the queue; if a job has a queue |
1454
|
|
|
|
|
|
|
control file in this directory, it is listed in the queue. |
1455
|
|
|
|
|
|
|
|
1456
|
|
|
|
|
|
|
The filename format is as follows: |
1457
|
|
|
|
|
|
|
|
1458
|
|
|
|
|
|
|
50.20040909232529941258.HASH[.PID.RAND] |
1459
|
|
|
|
|
|
|
|
1460
|
|
|
|
|
|
|
The first two digits (C<50>) are the priority of the job. Lower priority |
1461
|
|
|
|
|
|
|
numbers are run first. C<20040909232529> is the current date and time when the |
1462
|
|
|
|
|
|
|
enqueueing process was run, in C<YYYYMMDDHHMMSS> format. C<941258> is the time in |
1463
|
|
|
|
|
|
|
microseconds, as returned by C<gettimeofday()>. And finally, C<HASH> is a |
1464
|
|
|
|
|
|
|
variable-length hash of some semi-random data, used to increase the chance of |
1465
|
|
|
|
|
|
|
uniqueness. |
1466
|
|
|
|
|
|
|
|
1467
|
|
|
|
|
|
|
If there is a collision, the timestamps are regenerated after a 250 msec sleep, |
1468
|
|
|
|
|
|
|
and further randomness will be added at the end of the string (namely, the |
1469
|
|
|
|
|
|
|
current process ID and a random integer value). Up to 10 retries will be |
1470
|
|
|
|
|
|
|
attempted. Once the file is atomically moved into the B<queue> directory |
1471
|
|
|
|
|
|
|
without collision, the retries cease. |
1472
|
|
|
|
|
|
|
|
1473
|
|
|
|
|
|
|
If B<queue_fanout> was used in the C<IPC::DirQueue> constructor, then |
1474
|
|
|
|
|
|
|
the B<queue> directory does not contain the queue control files directly; |
1475
|
|
|
|
|
|
|
instead, there is an interposing set of 16 "fan-out" directories, named |
1476
|
|
|
|
|
|
|
according to the hex digits from C<0> to C<f>. |
1477
|
|
|
|
|
|
|
|
1478
|
|
|
|
|
|
|
=item active directory |
1479
|
|
|
|
|
|
|
|
1480
|
|
|
|
|
|
|
The B<active> directory is used to store active queue control files. |
1481
|
|
|
|
|
|
|
|
1482
|
|
|
|
|
|
|
When a job becomes 'active' -- i.e. is picked up by C<pickup_queued_job()> -- |
1483
|
|
|
|
|
|
|
its control file is moved from the B<queue> directory into the B<active> |
1484
|
|
|
|
|
|
|
directory while it is processed. |
1485
|
|
|
|
|
|
|
|
1486
|
|
|
|
|
|
|
=item data directory |
1487
|
|
|
|
|
|
|
|
1488
|
|
|
|
|
|
|
The B<data> directory is used to store enqueued data files. |
1489
|
|
|
|
|
|
|
|
1490
|
|
|
|
|
|
|
It contains a two-level "fan-out" hashed directory structure; each data file is |
1491
|
|
|
|
|
|
|
stored under a single-letter directory, which in turn is under a single-letter |
1492
|
|
|
|
|
|
|
directory. This increases the efficiency of directory lookups under many |
1493
|
|
|
|
|
|
|
filesystems. |
1494
|
|
|
|
|
|
|
|
1495
|
|
|
|
|
|
|
The format of filenames here is similar to that used in the B<queue> directory, |
1496
|
|
|
|
|
|
|
except that the last two characters are removed and used instead for the |
1497
|
|
|
|
|
|
|
"fan-out" directory names. |
1498
|
|
|
|
|
|
|
|
1499
|
|
|
|
|
|
|
=item tmp directory |
1500
|
|
|
|
|
|
|
|
1501
|
|
|
|
|
|
|
The B<tmp> directory contains temporary work files that are in the process |
1502
|
|
|
|
|
|
|
of enqueueing, and not yet ready for processing. |
1503
|
|
|
|
|
|
|
|
1504
|
|
|
|
|
|
|
The filename format here is similar to the above, with suffixes indicating |
1505
|
|
|
|
|
|
|
the type of file (".ctrl", ".data"). |
1506
|
|
|
|
|
|
|
|
1507
|
|
|
|
|
|
|
=back |
1508
|
|
|
|
|
|
|
|
1509
|
|
|
|
|
|
|
Atomic, NFS-safe renaming is used to avoid collisions, overwriting or |
1510
|
|
|
|
|
|
|
other unsafe operations. |
1511
|
|
|
|
|
|
|
|
1512
|
|
|
|
|
|
|
=head1 SEE ALSO |
1513
|
|
|
|
|
|
|
|
1514
|
|
|
|
|
|
|
C<IPC::DirQueue::Job> |
1515
|
|
|
|
|
|
|
|
1516
|
|
|
|
|
|
|
=head1 AUTHOR |
1517
|
|
|
|
|
|
|
|
1518
|
|
|
|
|
|
|
Justin Mason E<lt>dq /at/ jmason.orgE<gt> |
1519
|
|
|
|
|
|
|
|
1520
|
|
|
|
|
|
|
=head1 MAILING LIST |
1521
|
|
|
|
|
|
|
|
1522
|
|
|
|
|
|
|
The IPC::DirQueue mailing list is at E<lt>ipc-dirqueue-subscribe@perl.orgE<gt>. |
1523
|
|
|
|
|
|
|
|
1524
|
|
|
|
|
|
|
=head1 COPYRIGHT |
1525
|
|
|
|
|
|
|
|
1526
|
|
|
|
|
|
|
C<IPC::DirQueue> is distributed under the same license as perl itself. |
1527
|
|
|
|
|
|
|
|
1528
|
|
|
|
|
|
|
=head1 AVAILABILITY |
1529
|
|
|
|
|
|
|
|
1530
|
|
|
|
|
|
|
The latest version of this library is likely to be available from CPAN. |
1531
|
|
|
|
|
|
|
|