| line |
stmt |
bran |
cond |
sub |
pod |
time |
code |
|
1
|
|
|
|
|
|
|
package Queue::Dir; |
|
2
|
|
|
|
|
|
|
# $Id: Dir.pm,v 1.13 2003/03/09 16:18:48 lem Exp $ |
|
3
|
|
|
|
|
|
|
|
|
4
|
|
|
|
|
|
|
require 5.005_62; |
|
5
|
|
|
|
|
|
|
|
|
6
|
11
|
|
|
11
|
|
21718
|
use strict; |
|
|
11
|
|
|
|
|
22
|
|
|
|
11
|
|
|
|
|
605
|
|
|
7
|
11
|
|
|
11
|
|
12956
|
use IO::Dir; |
|
|
11
|
|
|
|
|
355022
|
|
|
|
11
|
|
|
|
|
571
|
|
|
8
|
11
|
|
|
11
|
|
100
|
use IO::File; |
|
|
11
|
|
|
|
|
24
|
|
|
|
11
|
|
|
|
|
1538
|
|
|
9
|
11
|
|
|
11
|
|
56
|
use warnings; |
|
|
11
|
|
|
|
|
19
|
|
|
|
11
|
|
|
|
|
276
|
|
|
10
|
11
|
|
|
11
|
|
10856
|
use Sys::Hostname; |
|
|
11
|
|
|
|
|
15171
|
|
|
|
11
|
|
|
|
|
800
|
|
|
11
|
11
|
|
|
11
|
|
71
|
use Fcntl qw(:flock); |
|
|
11
|
|
|
|
|
24
|
|
|
|
11
|
|
|
|
|
1390
|
|
|
12
|
11
|
|
|
11
|
|
10651
|
use Params::Validate qw(:all); |
|
|
11
|
|
|
|
|
145393
|
|
|
|
11
|
|
|
|
|
3206
|
|
|
13
|
|
|
|
|
|
|
|
|
14
|
|
|
|
|
|
|
our $Debug = 0; |
|
15
|
|
|
|
|
|
|
our $hires = 'gettimeofday'; |
|
16
|
|
|
|
|
|
|
|
|
17
|
11
|
|
|
11
|
|
11731
|
eval "use Time::HiRes qw(gettimeofday);"; |
|
|
11
|
|
|
|
|
24436
|
|
|
|
11
|
|
|
|
|
53
|
|
|
18
|
|
|
|
|
|
|
|
|
19
|
|
|
|
|
|
|
if ($@) { $hires = 'time' } |
|
20
|
|
|
|
|
|
|
|
|
21
|
11
|
|
|
11
|
|
99
|
use vars qw($a $b); |
|
|
11
|
|
|
|
|
19
|
|
|
|
11
|
|
|
|
|
8098
|
|
|
22
|
|
|
|
|
|
|
|
|
23
|
|
|
|
|
|
|
our $VERSION = 0.01; |
|
24
|
|
|
|
|
|
|
|
|
25
|
|
|
|
|
|
|
=pod |
|
26
|
|
|
|
|
|
|
|
|
27
|
|
|
|
|
|
|
=head1 NAME |
|
28
|
|
|
|
|
|
|
|
|
29
|
|
|
|
|
|
|
Queue::Dir - Manage queue directories where each object is a file |
|
30
|
|
|
|
|
|
|
|
|
31
|
|
|
|
|
|
|
=head1 SYNOPSIS |
|
32
|
|
|
|
|
|
|
|
|
33
|
|
|
|
|
|
|
use Queue::Dir; |
|
34
|
|
|
|
|
|
|
|
|
35
|
|
|
|
|
|
|
my $q = new Queue::File ( |
|
36
|
|
|
|
|
|
|
-id => $my_process_id, |
|
37
|
|
|
|
|
|
|
-paths => [ '/var/path/to/queue1', ... ], |
|
38
|
|
|
|
|
|
|
-promiscuous => 1, |
|
39
|
|
|
|
|
|
|
-sort => 'sortsub', |
|
40
|
|
|
|
|
|
|
-filter => sub { ... }, |
|
41
|
|
|
|
|
|
|
-lockdir => 'lock', |
|
42
|
|
|
|
|
|
|
-lockmax => 300, |
|
43
|
|
|
|
|
|
|
); |
|
44
|
|
|
|
|
|
|
|
|
45
|
|
|
|
|
|
|
my ($fh, $qid) = $q->store($oid); |
|
46
|
|
|
|
|
|
|
|
|
47
|
|
|
|
|
|
|
my $qid = $q->next(); |
|
48
|
|
|
|
|
|
|
|
|
49
|
|
|
|
|
|
|
my $fh = $q->visit($mode, $qid); |
|
50
|
|
|
|
|
|
|
|
|
51
|
|
|
|
|
|
|
my $status = $q->done($qid); |
|
52
|
|
|
|
|
|
|
|
|
53
|
|
|
|
|
|
|
my $name = $q->name($qid); |
|
54
|
|
|
|
|
|
|
|
|
55
|
|
|
|
|
|
|
=head1 DESCRIPTION |
|
56
|
|
|
|
|
|
|
|
|
57
|
|
|
|
|
|
|
C allows the manipulation of objects placed in a |
|
58
|
|
|
|
|
|
|
queue. The queue is implemented as a directory where each object is |
|
59
|
|
|
|
|
|
|
stored as a file. |
|
60
|
|
|
|
|
|
|
|
|
61
|
|
|
|
|
|
|
=head2 METHODS |
|
62
|
|
|
|
|
|
|
|
|
63
|
|
|
|
|
|
|
The following methods are defined: |
|
64
|
|
|
|
|
|
|
|
|
65
|
|
|
|
|
|
|
=over 4 |
|
66
|
|
|
|
|
|
|
|
|
67
|
|
|
|
|
|
|
=item C |
|
68
|
|
|
|
|
|
|
|
|
69
|
|
|
|
|
|
|
B<-id> assigns a unique process-id to this queue object. Defaults to |
|
70
|
|
|
|
|
|
|
something built from the serialization of the object + C<$$> or |
|
71
|
|
|
|
|
|
|
something similar. |
|
72
|
|
|
|
|
|
|
|
|
73
|
|
|
|
|
|
|
B<-paths> specifies a list of paths to use as storage points for the |
|
74
|
|
|
|
|
|
|
queue files. If more than one are supplied, round-robin will be used |
|
75
|
|
|
|
|
|
|
to store objects there. |
|
76
|
|
|
|
|
|
|
|
|
77
|
|
|
|
|
|
|
When B<-promiscuous> is true (the default), objects stored with any |
|
78
|
|
|
|
|
|
|
other C object are accessible. If set to false, only |
|
79
|
|
|
|
|
|
|
files whose id matches the value for B<-id> are visible. |
|
80
|
|
|
|
|
|
|
|
|
81
|
|
|
|
|
|
|
B<-sort> allows for the specification of a sorting function, used to |
|
82
|
|
|
|
|
|
|
decide the order in which the queue files will be used. The function |
|
83
|
|
|
|
|
|
|
is invoked in the same fashion as C, getting two variables |
|
84
|
|
|
|
|
|
|
(C<$a> and C<$b>) and returning -1, 0 or 1 depending on |
|
85
|
|
|
|
|
|
|
comparison. C<$a> and C<$b> are hash references whose first element is |
|
86
|
|
|
|
|
|
|
the queue id of the object and the second element is a the full |
|
87
|
|
|
|
|
|
|
pathname of such object. |
|
88
|
|
|
|
|
|
|
|
|
89
|
|
|
|
|
|
|
The C passed in the B<-filter> parameter can control |
|
90
|
|
|
|
|
|
|
which files in a given directory to consider as queue objects. By |
|
91
|
|
|
|
|
|
|
default, all files will be considered part of the queue. This function |
|
92
|
|
|
|
|
|
|
is called with a reference of the invoking object and the full |
|
93
|
|
|
|
|
|
|
pathname of each file. A true return value causes the given file to be |
|
94
|
|
|
|
|
|
|
included in the queue. Note that this is only called if |
|
95
|
|
|
|
|
|
|
B<-promiscuous> is set to a false value. |
|
96
|
|
|
|
|
|
|
|
|
97
|
|
|
|
|
|
|
B<-lockdir> and B<-lockmax> control an optional locking mechanism that |
|
98
|
|
|
|
|
|
|
reduces the chance of multiple collaborating instances of |
|
99
|
|
|
|
|
|
|
C objects, from picking the same object from the |
|
100
|
|
|
|
|
|
|
queue. B<-lockdir>, when present, defines the name of the directory |
|
101
|
|
|
|
|
|
|
(within each queue directory) to use for storing the lock files. The |
|
102
|
|
|
|
|
|
|
B<-lockmax> parameter, which defaults to 300 seconds, control for how |
|
103
|
|
|
|
|
|
|
long the locks are honored. |
|
104
|
|
|
|
|
|
|
|
|
105
|
|
|
|
|
|
|
Note that locking is disabled by default. |
|
106
|
|
|
|
|
|
|
|
|
107
|
|
|
|
|
|
|
=cut |
|
108
|
|
|
|
|
|
|
|
|
109
|
|
|
|
|
|
|
sub new |
|
110
|
|
|
|
|
|
|
{ |
|
111
|
23
|
|
|
23
|
1
|
6833
|
my $name = shift; |
|
112
|
23
|
|
33
|
|
|
133
|
my $class = ref($name) || $name; |
|
113
|
|
|
|
|
|
|
|
|
114
|
23
|
50
|
|
|
|
99
|
warn "Queue::Dir::new()\n" if $Debug; |
|
115
|
|
|
|
|
|
|
|
|
116
|
|
|
|
|
|
|
my %self = validate_with |
|
117
|
|
|
|
|
|
|
( |
|
118
|
|
|
|
|
|
|
params => \@_, |
|
119
|
|
|
|
|
|
|
ignore_case => 1, |
|
120
|
|
|
|
|
|
|
strip_leading => '-', |
|
121
|
|
|
|
|
|
|
spec => |
|
122
|
|
|
|
|
|
|
{ |
|
123
|
|
|
|
|
|
|
id => |
|
124
|
|
|
|
|
|
|
{ |
|
125
|
|
|
|
|
|
|
type => SCALAR, |
|
126
|
|
|
|
|
|
|
default => hostname . $$, |
|
127
|
|
|
|
|
|
|
}, |
|
128
|
|
|
|
|
|
|
paths => |
|
129
|
|
|
|
|
|
|
{ |
|
130
|
|
|
|
|
|
|
type => ARRAYREF, |
|
131
|
|
|
|
|
|
|
callbacks => |
|
132
|
|
|
|
|
|
|
{ |
|
133
|
23
|
|
|
23
|
|
1872
|
directory => sub { $_ = shift; @$_ == grep { -d } @$_; } |
|
|
23
|
|
|
|
|
69
|
|
|
|
24
|
|
|
|
|
1370
|
|
|
134
|
|
|
|
|
|
|
} |
|
135
|
|
|
|
|
|
|
}, |
|
136
|
|
|
|
|
|
|
promiscuous => |
|
137
|
|
|
|
|
|
|
{ |
|
138
|
|
|
|
|
|
|
type => SCALAR | BOOLEAN, |
|
139
|
|
|
|
|
|
|
default => 1, |
|
140
|
|
|
|
|
|
|
}, |
|
141
|
|
|
|
|
|
|
sort => |
|
142
|
|
|
|
|
|
|
{ |
|
143
|
|
|
|
|
|
|
type => SCALAR, |
|
144
|
|
|
|
|
|
|
default => 'Queue::Dir::_sort', |
|
145
|
|
|
|
|
|
|
}, |
|
146
|
|
|
|
|
|
|
lockdir => |
|
147
|
|
|
|
|
|
|
{ |
|
148
|
|
|
|
|
|
|
type => SCALAR, |
|
149
|
|
|
|
|
|
|
default => undef, |
|
150
|
|
|
|
|
|
|
}, |
|
151
|
|
|
|
|
|
|
lockmax => |
|
152
|
|
|
|
|
|
|
{ |
|
153
|
|
|
|
|
|
|
type => SCALAR, |
|
154
|
|
|
|
|
|
|
default => 300, |
|
155
|
|
|
|
|
|
|
callbacks => |
|
156
|
|
|
|
|
|
|
{ |
|
157
|
0
|
|
|
0
|
|
0
|
numeric => sub { shift =~ /^\d+$/ }, |
|
158
|
0
|
|
|
0
|
|
0
|
positive => sub { shift > 0 }, |
|
159
|
|
|
|
|
|
|
}, |
|
160
|
|
|
|
|
|
|
}, |
|
161
|
|
|
|
|
|
|
filter => |
|
162
|
|
|
|
|
|
|
{ |
|
163
|
|
|
|
|
|
|
type => CODEREF, |
|
164
|
|
|
|
|
|
|
default => sub |
|
165
|
|
|
|
|
|
|
{ |
|
166
|
0
|
|
|
0
|
|
0
|
my $self = shift; |
|
167
|
0
|
|
|
|
|
0
|
my $long = shift; |
|
168
|
|
|
|
|
|
|
|
|
169
|
0
|
0
|
|
|
|
0
|
return 0 unless $long; |
|
170
|
|
|
|
|
|
|
|
|
171
|
0
|
|
|
|
|
0
|
my ($path, $id) = (File::Spec->splitpath($long))[1,2]; |
|
172
|
|
|
|
|
|
|
|
|
173
|
0
|
|
|
|
|
0
|
for my $p (@{$self->{paths}}) |
|
|
0
|
|
|
|
|
0
|
|
|
174
|
|
|
|
|
|
|
{ |
|
175
|
0
|
0
|
0
|
|
|
0
|
if (substr($p->[0], $path, 0) == 0 |
|
|
|
|
0
|
|
|
|
|
|
176
|
|
|
|
|
|
|
and -f $p->[0] . '/' . $id |
|
177
|
|
|
|
|
|
|
and $id =~ m!^\d+\.\d+\.$self->{id}\.\d+$!) |
|
178
|
|
|
|
|
|
|
{ |
|
179
|
0
|
|
|
|
|
0
|
return 1; |
|
180
|
|
|
|
|
|
|
} |
|
181
|
|
|
|
|
|
|
} |
|
182
|
|
|
|
|
|
|
|
|
183
|
0
|
|
|
|
|
0
|
return 0; |
|
184
|
|
|
|
|
|
|
}, |
|
185
|
|
|
|
|
|
|
}, |
|
186
|
23
|
|
|
|
|
144
|
}); |
|
187
|
|
|
|
|
|
|
|
|
188
|
21
|
|
|
|
|
406
|
@{$self{paths}} = sort { $a cmp $b } @{$self{paths}}; |
|
|
21
|
|
|
|
|
68
|
|
|
|
1
|
|
|
|
|
3
|
|
|
|
21
|
|
|
|
|
74
|
|
|
189
|
|
|
|
|
|
|
|
|
190
|
21
|
|
|
|
|
52
|
$_ = [$_, new IO::Dir $_] for @{$self{paths}}; |
|
|
21
|
|
|
|
|
196
|
|
|
191
|
|
|
|
|
|
|
|
|
192
|
21
|
50
|
|
|
|
1970
|
if (grep { ! defined $_->[1] } @{$self{paths}}) { |
|
|
22
|
|
|
|
|
134
|
|
|
|
21
|
|
|
|
|
52
|
|
|
193
|
0
|
|
|
|
|
0
|
warn "One of the queue paths seems invalid\n"; |
|
194
|
0
|
|
|
|
|
0
|
return; |
|
195
|
|
|
|
|
|
|
} |
|
196
|
|
|
|
|
|
|
|
|
197
|
|
|
|
|
|
|
# Prime the object with an empty file |
|
198
|
|
|
|
|
|
|
# inventory. |
|
199
|
21
|
|
|
|
|
74
|
$self{_files} = []; |
|
200
|
|
|
|
|
|
|
|
|
201
|
|
|
|
|
|
|
# We store objects in round-robin. |
|
202
|
21
|
|
|
|
|
47
|
$self{_rr} = 0; |
|
203
|
21
|
|
|
|
|
52
|
$self{_current} = [0, 0]; |
|
204
|
|
|
|
|
|
|
|
|
205
|
21
|
|
|
|
|
55
|
my $self = bless \%self, $class; |
|
206
|
|
|
|
|
|
|
|
|
207
|
21
|
100
|
|
|
|
129
|
$self->_clean_locks if $self->{lockdir}; |
|
208
|
|
|
|
|
|
|
|
|
209
|
21
|
|
|
|
|
492
|
return $self->_refresh; |
|
210
|
|
|
|
|
|
|
} |
|
211
|
|
|
|
|
|
|
|
|
212
|
60
|
|
|
60
|
|
99
|
sub _sort { $a->[0] cmp $b->[0]; } |
|
213
|
11
|
|
|
11
|
|
69
|
sub _timestamp { no strict "refs"; return join '', &$hires; } |
|
|
11
|
|
|
28
|
|
26
|
|
|
|
11
|
|
|
|
|
22291
|
|
|
|
28
|
|
|
|
|
282
|
|
|
214
|
|
|
|
|
|
|
|
|
215
|
|
|
|
|
|
|
# Update the inventory of queue |
|
216
|
|
|
|
|
|
|
# objects, if required. |
|
217
|
|
|
|
|
|
|
sub _refresh |
|
218
|
|
|
|
|
|
|
{ |
|
219
|
64
|
|
|
64
|
|
297
|
my $self = shift; |
|
220
|
|
|
|
|
|
|
|
|
221
|
64
|
50
|
|
|
|
146
|
warn "Queue::Dir::_refresh()\n" if $Debug; |
|
222
|
|
|
|
|
|
|
|
|
223
|
|
|
|
|
|
|
# warn "_files ", scalar @{$self->{_files}}, " _current[0] ", |
|
224
|
|
|
|
|
|
|
# $self->{_current}->[0], "\n"; |
|
225
|
|
|
|
|
|
|
|
|
226
|
64
|
100
|
66
|
|
|
70
|
unless (@{$self->{_files}} or $self->{_current}->[0]) |
|
|
64
|
|
|
|
|
378
|
|
|
227
|
|
|
|
|
|
|
{ |
|
228
|
47
|
50
|
|
|
|
98
|
warn "Queue::Dir::_refresh() running\n" if $Debug; |
|
229
|
|
|
|
|
|
|
|
|
230
|
47
|
|
|
|
|
56
|
for my $p (@{$self->{paths}}) |
|
|
47
|
|
|
|
|
98
|
|
|
231
|
|
|
|
|
|
|
{ |
|
232
|
|
|
|
|
|
|
# warn "p\n"; |
|
233
|
48
|
|
|
|
|
222
|
$p->[1]->rewind; |
|
234
|
48
|
|
|
|
|
702
|
while (defined (my $f = $p->[1]->read)) |
|
235
|
|
|
|
|
|
|
{ |
|
236
|
173
|
100
|
100
|
|
|
2552
|
next if $f eq '.' or $f eq '..'; |
|
237
|
77
|
100
|
|
|
|
1067
|
next unless -f $p->[0] . '/' . $f; |
|
238
|
|
|
|
|
|
|
# warn "f\n"; |
|
239
|
46
|
|
|
|
|
216
|
my $t = [$f, $p->[0] . '/' . $f]; |
|
240
|
46
|
0
|
33
|
|
|
128
|
if (!$self->{promiscuous} |
|
241
|
|
|
|
|
|
|
and !$self->{filter}->($t->[1])) |
|
242
|
|
|
|
|
|
|
{ |
|
243
|
0
|
|
|
|
|
0
|
next; |
|
244
|
|
|
|
|
|
|
} |
|
245
|
46
|
|
|
|
|
52
|
push @{$self->{_files}}, $t; |
|
|
46
|
|
|
|
|
197
|
|
|
246
|
|
|
|
|
|
|
} |
|
247
|
|
|
|
|
|
|
} |
|
248
|
|
|
|
|
|
|
# XXX - I seem unable to specify the sort |
|
249
|
|
|
|
|
|
|
# function directly. |
|
250
|
47
|
|
|
|
|
515
|
my $sort = $self->{sort}; |
|
251
|
47
|
|
|
|
|
60
|
@{$self->{_files}} = sort $sort @{$self->{_files}}; |
|
|
47
|
|
|
|
|
111
|
|
|
|
47
|
|
|
|
|
191
|
|
|
252
|
|
|
|
|
|
|
# $self->{_current} = shift @{$self->{_files}} || [0,0]; |
|
253
|
|
|
|
|
|
|
} |
|
254
|
|
|
|
|
|
|
|
|
255
|
64
|
|
|
|
|
158
|
return $self; |
|
256
|
|
|
|
|
|
|
} |
|
257
|
|
|
|
|
|
|
|
|
258
|
|
|
|
|
|
|
# Give a $qid, fetch pathname |
|
259
|
|
|
|
|
|
|
sub _name |
|
260
|
|
|
|
|
|
|
{ |
|
261
|
76
|
|
|
76
|
|
91
|
my $self = shift; |
|
262
|
76
|
|
33
|
|
|
203
|
my $qid = shift || $self->{_current}->[0] || $self->next; |
|
263
|
|
|
|
|
|
|
|
|
264
|
|
|
|
|
|
|
# First, try to find this object in |
|
265
|
|
|
|
|
|
|
# out cached structures |
|
266
|
|
|
|
|
|
|
|
|
267
|
76
|
50
|
|
|
|
237
|
for my $t (($self->{_current}->[1] ? $self->{_current} : ()), |
|
|
76
|
|
|
|
|
176
|
|
|
268
|
|
|
|
|
|
|
@{$self->{_files}}) |
|
269
|
|
|
|
|
|
|
{ |
|
270
|
81
|
100
|
|
|
|
238
|
if ($qid eq $t->[0]) { return $t->[1]; } |
|
|
46
|
|
|
|
|
532
|
|
|
271
|
|
|
|
|
|
|
} |
|
272
|
|
|
|
|
|
|
|
|
273
|
|
|
|
|
|
|
# As a last resort, attempt to find |
|
274
|
|
|
|
|
|
|
# the objext in the fs |
|
275
|
|
|
|
|
|
|
|
|
276
|
30
|
|
|
|
|
39
|
for my $p (@{$self->{paths}}) |
|
|
30
|
|
|
|
|
59
|
|
|
277
|
|
|
|
|
|
|
{ |
|
278
|
42
|
|
|
|
|
186
|
$p->[1]->rewind; |
|
279
|
42
|
|
|
|
|
308
|
while (my $n = $p->[1]->read) |
|
280
|
|
|
|
|
|
|
{ |
|
281
|
131
|
100
|
|
|
|
1267
|
if ($n eq $qid) |
|
282
|
|
|
|
|
|
|
{ |
|
283
|
29
|
|
|
|
|
215
|
return $p->[0] . '/' . $n; |
|
284
|
|
|
|
|
|
|
} |
|
285
|
|
|
|
|
|
|
} |
|
286
|
|
|
|
|
|
|
} |
|
287
|
|
|
|
|
|
|
|
|
288
|
|
|
|
|
|
|
# Otherwise, we have to fail... |
|
289
|
|
|
|
|
|
|
|
|
290
|
1
|
|
|
|
|
12
|
return; |
|
291
|
|
|
|
|
|
|
} |
|
292
|
|
|
|
|
|
|
|
|
293
|
|
|
|
|
|
|
sub _clean_locks |
|
294
|
|
|
|
|
|
|
{ |
|
295
|
10
|
|
|
10
|
|
14
|
my $self = shift; |
|
296
|
|
|
|
|
|
|
|
|
297
|
10
|
50
|
|
|
|
97
|
return unless $self->{lockdir}; |
|
298
|
|
|
|
|
|
|
|
|
299
|
10
|
|
|
|
|
13
|
for my $p (@{$self->{paths}}) |
|
|
10
|
|
|
|
|
25
|
|
|
300
|
|
|
|
|
|
|
{ |
|
301
|
10
|
|
|
|
|
26
|
my $lock = $p->[0] . '/' . $self->{lockdir}; |
|
302
|
10
|
|
|
|
|
334
|
mkdir $lock; |
|
303
|
10
|
|
|
|
|
52
|
my $d = new IO::Dir $lock; |
|
304
|
10
|
|
|
|
|
554
|
while (my $f = $d->read) |
|
305
|
|
|
|
|
|
|
{ |
|
306
|
21
|
100
|
100
|
|
|
429
|
next if $f eq '.' or $f eq '..'; |
|
307
|
1
|
|
|
|
|
3
|
my $name = $lock . '/' . $f; |
|
308
|
1
|
50
|
|
|
|
86
|
if ((stat($name))[9] + $self->{lockmax} < time) |
|
309
|
|
|
|
|
|
|
{ |
|
310
|
0
|
|
|
|
|
0
|
unlink $name; |
|
311
|
|
|
|
|
|
|
} |
|
312
|
|
|
|
|
|
|
} |
|
313
|
|
|
|
|
|
|
} |
|
314
|
|
|
|
|
|
|
|
|
315
|
|
|
|
|
|
|
} |
|
316
|
|
|
|
|
|
|
|
|
317
|
|
|
|
|
|
|
# The test below might seem redundant, but |
|
318
|
|
|
|
|
|
|
# it's an attempt to improve in a lot of |
|
319
|
|
|
|
|
|
|
# broken NFS locking implementations. |
|
320
|
|
|
|
|
|
|
|
|
321
|
|
|
|
|
|
|
sub _lock |
|
322
|
|
|
|
|
|
|
{ |
|
323
|
47
|
|
|
47
|
|
72
|
my $self = shift; |
|
324
|
47
|
|
|
|
|
67
|
my $qid = shift; |
|
325
|
|
|
|
|
|
|
|
|
326
|
47
|
|
|
|
|
182
|
$self->{lockfh} = new IO::File; |
|
327
|
|
|
|
|
|
|
|
|
328
|
47
|
50
|
|
|
|
1308
|
warn "_lock $qid\n" if $Debug; |
|
329
|
|
|
|
|
|
|
|
|
330
|
47
|
100
|
|
|
|
186
|
return 1 unless $self->{lockdir}; |
|
331
|
|
|
|
|
|
|
|
|
332
|
15
|
|
|
|
|
91
|
$self->{lockfile} = $self->{paths}->[(split(/\./, $qid))[1]]->[0]; |
|
333
|
|
|
|
|
|
|
|
|
334
|
15
|
50
|
|
|
|
57
|
return unless $self->{lockfile}; |
|
335
|
|
|
|
|
|
|
|
|
336
|
15
|
|
|
|
|
309
|
$self->{_key} = $self->{id} . '-' . $$ . '-' . int(rand(10000)); |
|
337
|
15
|
|
|
|
|
52
|
$self->{lockfile} .= '/' . $self->{lockdir} . '/' . $qid; |
|
338
|
|
|
|
|
|
|
|
|
339
|
15
|
50
|
|
|
|
46
|
warn "_lock lockfile is $self->{lockfile}\n" if $Debug; |
|
340
|
|
|
|
|
|
|
|
|
341
|
15
|
100
|
|
|
|
282
|
if (-f $self->{lockfile}) |
|
342
|
|
|
|
|
|
|
{ |
|
343
|
3
|
50
|
|
|
|
30
|
if ((stat(_))[9] + $self->{lockmax} < time) |
|
344
|
|
|
|
|
|
|
{ |
|
345
|
0
|
0
|
|
|
|
0
|
warn "_lock forcing unlink (stale) lockfile\n" if $Debug; |
|
346
|
0
|
|
|
|
|
0
|
unlink $self->{lockfile}; |
|
347
|
|
|
|
|
|
|
} |
|
348
|
|
|
|
|
|
|
else |
|
349
|
|
|
|
|
|
|
{ |
|
350
|
3
|
50
|
|
|
|
10
|
warn "_lock failing due to previous lock\n" if $Debug; |
|
351
|
3
|
|
|
|
|
25
|
return; |
|
352
|
|
|
|
|
|
|
} |
|
353
|
|
|
|
|
|
|
} |
|
354
|
|
|
|
|
|
|
# Store our key in the lock file |
|
355
|
|
|
|
|
|
|
|
|
356
|
12
|
50
|
|
|
|
55
|
$self->{lockfh}->open($self->{lockfile}, O_RDWR | O_CREAT) or return; |
|
357
|
12
|
|
|
|
|
1013
|
$self->{lockfh}->autoflush(1); |
|
358
|
|
|
|
|
|
|
|
|
359
|
12
|
50
|
|
|
|
583
|
unless (flock $self->{lockfh}, LOCK_EX | LOCK_NB) |
|
360
|
|
|
|
|
|
|
{ |
|
361
|
0
|
|
|
|
|
0
|
$self->{lockfh}->close; |
|
362
|
0
|
|
|
|
|
0
|
$self->{lockfh} = undef; |
|
363
|
0
|
|
|
|
|
0
|
unlink $self->{lockfile}; |
|
364
|
0
|
|
|
|
|
0
|
$self->{lockfile} = undef; |
|
365
|
0
|
|
|
|
|
0
|
return; |
|
366
|
|
|
|
|
|
|
} |
|
367
|
12
|
|
|
|
|
69
|
$self->{lockfh}->print($self->{_key}); |
|
368
|
|
|
|
|
|
|
|
|
369
|
12
|
50
|
|
|
|
6501
|
warn "_lock key $self->{_key} stored\n" if $Debug; |
|
370
|
|
|
|
|
|
|
|
|
371
|
|
|
|
|
|
|
# Verify that the key is indeed in the |
|
372
|
|
|
|
|
|
|
# lock file |
|
373
|
|
|
|
|
|
|
|
|
374
|
12
|
|
|
|
|
77
|
$self->{lockfh}->seek(0, 0); |
|
375
|
12
|
|
|
|
|
536
|
chomp(my $rkey = $self->{lockfh}->getline); |
|
376
|
|
|
|
|
|
|
|
|
377
|
12
|
50
|
|
|
|
543
|
warn "_lock key $rkey recovered\n" if $Debug; |
|
378
|
|
|
|
|
|
|
|
|
379
|
12
|
50
|
|
|
|
61
|
unless ($rkey eq $self->{_key}) |
|
380
|
|
|
|
|
|
|
{ |
|
381
|
0
|
|
|
|
|
0
|
$self->{lockfh}->close; |
|
382
|
0
|
|
|
|
|
0
|
$self->{lockfh} = undef; |
|
383
|
0
|
|
|
|
|
0
|
unlink $self->{lockfile}; |
|
384
|
0
|
|
|
|
|
0
|
$self->{lockfile} = undef; |
|
385
|
0
|
|
|
|
|
0
|
return; |
|
386
|
|
|
|
|
|
|
} |
|
387
|
|
|
|
|
|
|
|
|
388
|
12
|
50
|
|
|
|
27
|
warn "_lock key matched\n" if $Debug; |
|
389
|
|
|
|
|
|
|
|
|
390
|
|
|
|
|
|
|
# If all this passed, the lock is ours |
|
391
|
12
|
|
|
|
|
51
|
return 1; |
|
392
|
|
|
|
|
|
|
} |
|
393
|
|
|
|
|
|
|
|
|
394
|
|
|
|
|
|
|
=pod |
|
395
|
|
|
|
|
|
|
|
|
396
|
|
|
|
|
|
|
=item Cstore();> |
|
397
|
|
|
|
|
|
|
|
|
398
|
|
|
|
|
|
|
Store a file in the queue. Returns an array whose first element is an |
|
399
|
|
|
|
|
|
|
C object for writing to the file. The second element is |
|
400
|
|
|
|
|
|
|
the identifier of the object in the queue. |
|
401
|
|
|
|
|
|
|
|
|
402
|
|
|
|
|
|
|
If you created the C object with locking enabled, you must |
|
403
|
|
|
|
|
|
|
call C<-Eunlock> after closing the file handle. |
|
404
|
|
|
|
|
|
|
|
|
405
|
|
|
|
|
|
|
=cut |
|
406
|
|
|
|
|
|
|
|
|
407
|
|
|
|
|
|
|
sub store |
|
408
|
|
|
|
|
|
|
{ |
|
409
|
28
|
|
|
28
|
1
|
10770
|
my $self = shift; |
|
410
|
28
|
|
|
|
|
158
|
my $fh = new IO::File; |
|
411
|
28
|
|
|
|
|
1233
|
my $queue = $self->{paths}->[$self->{_rr}]; |
|
412
|
28
|
|
|
|
|
64
|
my $qid = _timestamp . '.' . $self->{_rr} . '.' . $self->{id}; |
|
413
|
28
|
|
|
|
|
56
|
my $counter = 0; |
|
414
|
28
|
|
|
|
|
33
|
my $pname; |
|
415
|
|
|
|
|
|
|
|
|
416
|
28
|
50
|
|
|
|
88
|
warn "Queue::Dir::store() qid=$qid\n" if $Debug; |
|
417
|
|
|
|
|
|
|
|
|
418
|
28
|
|
|
|
|
54
|
$self->{_rr} ++; |
|
419
|
28
|
|
|
|
|
38
|
$self->{_rr} %= @{$self->{paths}}; |
|
|
28
|
|
|
|
|
68
|
|
|
420
|
|
|
|
|
|
|
|
|
421
|
28
|
|
|
|
|
614
|
while (-f ($pname = $queue->[0] . '/' . $qid . '.' . $counter)) |
|
422
|
|
|
|
|
|
|
{ |
|
423
|
0
|
|
|
|
|
0
|
++ $counter; |
|
424
|
|
|
|
|
|
|
} |
|
425
|
|
|
|
|
|
|
|
|
426
|
28
|
|
|
|
|
60
|
$qid .= '.' . $counter; |
|
427
|
|
|
|
|
|
|
|
|
428
|
28
|
50
|
|
|
|
121
|
$fh->open($pname, "w") or return; |
|
429
|
28
|
|
|
|
|
3082
|
$self->{_current} = [$qid, $pname]; |
|
430
|
|
|
|
|
|
|
|
|
431
|
28
|
|
|
|
|
105
|
$self->_lock($qid); |
|
432
|
|
|
|
|
|
|
|
|
433
|
28
|
|
|
|
|
101
|
return ($fh, $qid); |
|
434
|
|
|
|
|
|
|
} |
|
435
|
|
|
|
|
|
|
|
|
436
|
|
|
|
|
|
|
=pod |
|
437
|
|
|
|
|
|
|
|
|
438
|
|
|
|
|
|
|
=item Cnext();> |
|
439
|
|
|
|
|
|
|
|
|
440
|
|
|
|
|
|
|
Returns the queue identifier of the next file to be processed. When |
|
441
|
|
|
|
|
|
|
the queue is empty, returns undef. |
|
442
|
|
|
|
|
|
|
|
|
443
|
|
|
|
|
|
|
Note that if multiple consumers are working on the same queues in |
|
444
|
|
|
|
|
|
|
promiscuous mode, the file referenced by the returned id might be |
|
445
|
|
|
|
|
|
|
removed at any time so care must be used. |
|
446
|
|
|
|
|
|
|
|
|
447
|
|
|
|
|
|
|
Entries will be returned in an arbitrary order. |
|
448
|
|
|
|
|
|
|
|
|
449
|
|
|
|
|
|
|
=cut |
|
450
|
|
|
|
|
|
|
|
|
451
|
|
|
|
|
|
|
sub next |
|
452
|
|
|
|
|
|
|
{ |
|
453
|
46
|
|
|
46
|
1
|
8165
|
my $self = shift; |
|
454
|
|
|
|
|
|
|
|
|
455
|
46
|
100
|
|
|
|
56
|
$self->_refresh unless @{$self->{_files}}; |
|
|
46
|
|
|
|
|
169
|
|
|
456
|
|
|
|
|
|
|
|
|
457
|
46
|
|
100
|
|
|
56
|
$self->{_current} = shift @{$self->{_files}} || [0, 0]; |
|
458
|
|
|
|
|
|
|
|
|
459
|
46
|
50
|
|
|
|
132
|
warn "Queue::Dir::next() current=", $self->{_current}->[0], "\n" if $Debug; |
|
460
|
|
|
|
|
|
|
|
|
461
|
|
|
|
|
|
|
# warn "next: Current queue has\n"; |
|
462
|
|
|
|
|
|
|
# foreach (@{$self->{_files}}) |
|
463
|
|
|
|
|
|
|
# { |
|
464
|
|
|
|
|
|
|
# warn " $_->[1]\n"; |
|
465
|
|
|
|
|
|
|
# } |
|
466
|
|
|
|
|
|
|
|
|
467
|
46
|
100
|
|
|
|
114
|
unless ($self->{_current}->[0]) |
|
468
|
|
|
|
|
|
|
{ |
|
469
|
21
|
|
|
|
|
43
|
$self->_refresh; |
|
470
|
21
|
|
|
|
|
70
|
return; |
|
471
|
|
|
|
|
|
|
} |
|
472
|
|
|
|
|
|
|
|
|
473
|
25
|
|
|
|
|
94
|
return $self->{_current}->[0]; |
|
474
|
|
|
|
|
|
|
} |
|
475
|
|
|
|
|
|
|
|
|
476
|
|
|
|
|
|
|
=pod |
|
477
|
|
|
|
|
|
|
|
|
478
|
|
|
|
|
|
|
=item Cvisit($mode, $qid);> |
|
479
|
|
|
|
|
|
|
|
|
480
|
|
|
|
|
|
|
On success, returns an C object, opened according to the |
|
481
|
|
|
|
|
|
|
specified C<$mode> for the file with C<$qid>. If C<$mode> is not |
|
482
|
|
|
|
|
|
|
specified, it defaults to a read from the start of the file. If |
|
483
|
|
|
|
|
|
|
C<$qid> is not specified, it defaults to the next entry, as if |
|
484
|
|
|
|
|
|
|
C<-Enext()> were called. In order for the file to be eligible, |
|
485
|
|
|
|
|
|
|
either the C object is not created with locking enabled or |
|
486
|
|
|
|
|
|
|
the file in the queue is not locked. |
|
487
|
|
|
|
|
|
|
|
|
488
|
|
|
|
|
|
|
It can fail in a number of situations. The obvious one, is when the |
|
489
|
|
|
|
|
|
|
queue is empty. The second one, happens when the desired file is no |
|
490
|
|
|
|
|
|
|
longer in the queue, which can happen if multiple consumers are |
|
491
|
|
|
|
|
|
|
accessing the queue in promiscuous mode. |
|
492
|
|
|
|
|
|
|
|
|
493
|
|
|
|
|
|
|
To help disambiguate both scenarios, undef will be returned on an |
|
494
|
|
|
|
|
|
|
empty queue. A defined but false value will be returned when the |
|
495
|
|
|
|
|
|
|
desired file is missing but others remain in the queue. |
|
496
|
|
|
|
|
|
|
|
|
497
|
|
|
|
|
|
|
The object in the queue will be automatically locked if this option is |
|
498
|
|
|
|
|
|
|
enabled when C<-Enew> was called. In this case, you should call |
|
499
|
|
|
|
|
|
|
the C<-Eunlock> method. |
|
500
|
|
|
|
|
|
|
|
|
501
|
|
|
|
|
|
|
=cut |
|
502
|
|
|
|
|
|
|
|
|
503
|
|
|
|
|
|
|
sub visit |
|
504
|
|
|
|
|
|
|
{ |
|
505
|
20
|
|
|
20
|
1
|
3385
|
my $self = shift; |
|
506
|
20
|
|
100
|
|
|
101
|
my $mode = shift || "r"; |
|
507
|
20
|
|
100
|
|
|
139
|
my $qid = shift || $self->{_current}->[0] || $self->next; |
|
508
|
|
|
|
|
|
|
|
|
509
|
20
|
50
|
|
|
|
50
|
warn "Queue::Dir::visit() qid=$qid\n" if $Debug; |
|
510
|
|
|
|
|
|
|
|
|
511
|
20
|
100
|
|
|
|
57
|
return unless $qid; |
|
512
|
|
|
|
|
|
|
|
|
513
|
19
|
|
|
|
|
85
|
my $fh = new IO::File; |
|
514
|
19
|
|
|
|
|
492
|
my $name; |
|
515
|
|
|
|
|
|
|
|
|
516
|
19
|
|
33
|
|
|
58
|
until ($name = $self->_name($qid) |
|
|
|
|
66
|
|
|
|
|
|
|
|
|
66
|
|
|
|
|
|
517
|
|
|
|
|
|
|
and -f $name |
|
518
|
|
|
|
|
|
|
and $self->_lock($qid) |
|
519
|
|
|
|
|
|
|
and $fh->open($name, $mode)) |
|
520
|
|
|
|
|
|
|
{ |
|
521
|
3
|
50
|
|
|
|
16
|
unless ($qid = $self->next) |
|
522
|
|
|
|
|
|
|
{ |
|
523
|
3
|
50
|
|
|
|
6
|
if (@{$self->{_files}}) |
|
|
3
|
|
|
|
|
10
|
|
|
524
|
|
|
|
|
|
|
{ |
|
525
|
3
|
50
|
|
|
|
25
|
warn "Queue::Dir::visit() ret undef\n" if $Debug; |
|
526
|
3
|
|
|
|
|
16
|
return undef; |
|
527
|
|
|
|
|
|
|
} |
|
528
|
|
|
|
|
|
|
else |
|
529
|
|
|
|
|
|
|
{ |
|
530
|
0
|
0
|
|
|
|
0
|
warn "Queue::Dir::visit() ret 0\n" if $Debug; |
|
531
|
0
|
|
|
|
|
0
|
return 0; |
|
532
|
|
|
|
|
|
|
} |
|
533
|
|
|
|
|
|
|
} |
|
534
|
|
|
|
|
|
|
} |
|
535
|
|
|
|
|
|
|
|
|
536
|
16
|
|
|
|
|
921
|
return $fh; |
|
537
|
|
|
|
|
|
|
} |
|
538
|
|
|
|
|
|
|
|
|
539
|
|
|
|
|
|
|
=pod |
|
540
|
|
|
|
|
|
|
|
|
541
|
|
|
|
|
|
|
=item C<$q-Edone($qid);> |
|
542
|
|
|
|
|
|
|
|
|
543
|
|
|
|
|
|
|
Disposes the queue file whose C<$qid> matches the given identifier as |
|
544
|
|
|
|
|
|
|
well as its potential lock. If none is specified, defaults to the last |
|
545
|
|
|
|
|
|
|
one used in a C<-Evisit()>. |
|
546
|
|
|
|
|
|
|
|
|
547
|
|
|
|
|
|
|
It is a bad idea (or at least rough manners) to C the file |
|
548
|
|
|
|
|
|
|
without invoking C<-Edone>. Besides, C<-Edone> will do it for |
|
549
|
|
|
|
|
|
|
you. |
|
550
|
|
|
|
|
|
|
|
|
551
|
|
|
|
|
|
|
=cut |
|
552
|
|
|
|
|
|
|
|
|
553
|
|
|
|
|
|
|
sub done |
|
554
|
|
|
|
|
|
|
{ |
|
555
|
27
|
|
|
27
|
1
|
2903
|
my $self = shift; |
|
556
|
27
|
|
66
|
|
|
91
|
my $qid = shift || $self->{_current}->[0]; |
|
557
|
27
|
|
|
|
|
37
|
my $wipe = 0; |
|
558
|
|
|
|
|
|
|
|
|
559
|
27
|
50
|
|
|
|
65
|
warn "Queue::Dir::done() qid=$qid\n" if $Debug; |
|
560
|
|
|
|
|
|
|
|
|
561
|
27
|
50
|
|
|
|
90
|
return if $qid eq 0; |
|
562
|
|
|
|
|
|
|
|
|
563
|
27
|
|
|
|
|
69
|
my $name = $self->_name($qid); |
|
564
|
|
|
|
|
|
|
|
|
565
|
27
|
100
|
|
|
|
232
|
return unless $name; |
|
566
|
|
|
|
|
|
|
|
|
567
|
26
|
|
|
|
|
83
|
$self->unlock($qid); |
|
568
|
|
|
|
|
|
|
|
|
569
|
26
|
|
|
|
|
1950
|
unlink $name; |
|
570
|
|
|
|
|
|
|
|
|
571
|
26
|
|
|
|
|
58
|
for (my $i = 0; |
|
|
31
|
|
|
|
|
293
|
|
|
572
|
|
|
|
|
|
|
$i < @{$self->{_files}}; |
|
573
|
|
|
|
|
|
|
$i ++) |
|
574
|
|
|
|
|
|
|
{ |
|
575
|
7
|
100
|
|
|
|
28
|
if ($self->{_files}->[$i]->[0] eq $qid) |
|
576
|
|
|
|
|
|
|
{ |
|
577
|
2
|
|
|
|
|
3
|
splice(@{$self->{_files}}, $i, 1); |
|
|
2
|
|
|
|
|
7
|
|
|
578
|
2
|
|
|
|
|
7
|
return; |
|
579
|
|
|
|
|
|
|
} |
|
580
|
|
|
|
|
|
|
} |
|
581
|
|
|
|
|
|
|
|
|
582
|
|
|
|
|
|
|
} |
|
583
|
|
|
|
|
|
|
|
|
584
|
|
|
|
|
|
|
=pod |
|
585
|
|
|
|
|
|
|
|
|
586
|
|
|
|
|
|
|
=item Cname($qid);> |
|
587
|
|
|
|
|
|
|
|
|
588
|
|
|
|
|
|
|
Returns the full pathname of the queue file whose id matches |
|
589
|
|
|
|
|
|
|
C<$qid>. If none is supplied, defaults to the last one obtained |
|
590
|
|
|
|
|
|
|
through a C<-Estore()>, C<-Enext()> or C<-Evisit()>. |
|
591
|
|
|
|
|
|
|
|
|
592
|
|
|
|
|
|
|
It could return C is the queue object no longer exists. |
|
593
|
|
|
|
|
|
|
|
|
594
|
|
|
|
|
|
|
=cut |
|
595
|
|
|
|
|
|
|
|
|
596
|
|
|
|
|
|
|
sub name |
|
597
|
|
|
|
|
|
|
{ |
|
598
|
30
|
|
|
30
|
1
|
10513
|
my $self = shift; |
|
599
|
30
|
|
33
|
|
|
79
|
my $qid = shift || $self->{_current}->[0] || $self->next; |
|
600
|
30
|
50
|
|
|
|
51
|
warn "Queue::Dir::name() qid=$qid\n" if $Debug; |
|
601
|
30
|
|
|
|
|
51
|
return $self->_name($qid); |
|
602
|
|
|
|
|
|
|
} |
|
603
|
|
|
|
|
|
|
|
|
604
|
|
|
|
|
|
|
=pod |
|
605
|
|
|
|
|
|
|
|
|
606
|
|
|
|
|
|
|
=item C<-Eunlock($qid)> |
|
607
|
|
|
|
|
|
|
|
|
608
|
|
|
|
|
|
|
Removes any locks outstanding in the file identified by C<$qid>, or |
|
609
|
|
|
|
|
|
|
the last Ced file. Use of this method is only required if the |
|
610
|
|
|
|
|
|
|
object is created with locking enabled. |
|
611
|
|
|
|
|
|
|
|
|
612
|
|
|
|
|
|
|
=cut |
|
613
|
|
|
|
|
|
|
|
|
614
|
|
|
|
|
|
|
sub unlock |
|
615
|
|
|
|
|
|
|
{ |
|
616
|
33
|
|
|
33
|
1
|
7261
|
my $self = shift; |
|
617
|
33
|
|
66
|
|
|
106
|
my $qid = shift || $self->{_current}->[0]; |
|
618
|
33
|
|
|
|
|
146
|
my $fh = new IO::File; |
|
619
|
|
|
|
|
|
|
|
|
620
|
33
|
50
|
|
|
|
889
|
warn "unlock $qid\n" if $Debug; |
|
621
|
|
|
|
|
|
|
|
|
622
|
33
|
100
|
|
|
|
124
|
return 1 unless $self->{lockdir}; |
|
623
|
14
|
100
|
|
|
|
49
|
return 1 unless $self->{lockfh}; |
|
624
|
|
|
|
|
|
|
|
|
625
|
9
|
|
|
|
|
121
|
close $self->{lockfh}; |
|
626
|
9
|
|
|
|
|
19
|
$self->{lockfh} = undef; |
|
627
|
|
|
|
|
|
|
|
|
628
|
9
|
|
|
|
|
863
|
unlink $self->{lockfile}; |
|
629
|
9
|
|
|
|
|
25
|
$self->{lockfile} = undef; |
|
630
|
|
|
|
|
|
|
|
|
631
|
9
|
|
|
|
|
35
|
return 1; |
|
632
|
|
|
|
|
|
|
} |
|
633
|
|
|
|
|
|
|
|
|
634
|
|
|
|
|
|
|
1; |
|
635
|
|
|
|
|
|
|
__END__ |