line |
stmt |
bran |
cond |
sub |
pod |
time |
code |
1
|
|
|
|
|
|
|
package Resque::Worker; |
2
|
|
|
|
|
|
|
# ABSTRACT: Does the hard work of babysitting Resque::Job's |
3
|
|
|
|
|
|
|
$Resque::Worker::VERSION = '0.42'; |
4
|
9
|
|
|
9
|
|
72
|
use Moose; |
|
9
|
|
|
|
|
18
|
|
|
9
|
|
|
|
|
86
|
|
5
|
|
|
|
|
|
|
with 'Resque::Encoder'; |
6
|
|
|
|
|
|
|
|
7
|
9
|
|
|
9
|
|
67158
|
use FindBin; # so it will work after playing around $0 |
|
9
|
|
|
|
|
10575
|
|
|
9
|
|
|
|
|
415
|
|
8
|
9
|
|
|
9
|
|
4359
|
use Resque::Stat; |
|
9
|
|
|
|
|
3245
|
|
|
9
|
|
|
|
|
375
|
|
9
|
9
|
|
|
9
|
|
78
|
use POSIX ":sys_wait_h"; |
|
9
|
|
|
|
|
17
|
|
|
9
|
|
|
|
|
78
|
|
10
|
9
|
|
|
9
|
|
21154
|
use Sys::Hostname; |
|
9
|
|
|
|
|
10172
|
|
|
9
|
|
|
|
|
517
|
|
11
|
9
|
|
|
9
|
|
73
|
use Scalar::Util qw(blessed weaken); |
|
9
|
|
|
|
|
17
|
|
|
9
|
|
|
|
|
468
|
|
12
|
9
|
|
|
9
|
|
5621
|
use List::MoreUtils qw(uniq any); |
|
9
|
|
|
|
|
120657
|
|
|
9
|
|
|
|
|
57
|
|
13
|
9
|
|
|
9
|
|
15923
|
use Time::HiRes qw(sleep); |
|
9
|
|
|
|
|
13023
|
|
|
9
|
|
|
|
|
44
|
|
14
|
9
|
|
|
9
|
|
10152
|
use DateTime; |
|
9
|
|
|
|
|
4200059
|
|
|
9
|
|
|
|
|
472
|
|
15
|
9
|
|
|
9
|
|
93
|
use Try::Tiny; |
|
9
|
|
|
|
|
29
|
|
|
9
|
|
|
|
|
881
|
|
16
|
|
|
|
|
|
|
|
17
|
|
|
|
|
|
|
use overload |
18
|
9
|
|
|
|
|
79
|
'""' => \&_string, |
19
|
|
|
|
|
|
|
'==' => \&_is_equal, |
20
|
9
|
|
|
9
|
|
64
|
'eq' => \&_is_equal; |
|
9
|
|
|
|
|
17
|
|
21
|
|
|
|
|
|
|
|
22
|
|
|
|
|
|
|
has 'resque' => ( |
23
|
|
|
|
|
|
|
is => 'ro', |
24
|
|
|
|
|
|
|
required => 1, |
25
|
|
|
|
|
|
|
handles => [qw/ redis key /] |
26
|
|
|
|
|
|
|
); |
27
|
|
|
|
|
|
|
|
28
|
|
|
|
|
|
|
has queues => ( |
29
|
|
|
|
|
|
|
is => 'rw', |
30
|
|
|
|
|
|
|
isa => 'ArrayRef', |
31
|
|
|
|
|
|
|
lazy => 1, |
32
|
|
|
|
|
|
|
default => sub {[]} |
33
|
|
|
|
|
|
|
); |
34
|
|
|
|
|
|
|
|
35
|
|
|
|
|
|
|
has stat => ( |
36
|
|
|
|
|
|
|
is => 'ro', |
37
|
|
|
|
|
|
|
lazy => 1, |
38
|
|
|
|
|
|
|
default => sub { Resque::Stat->new( resque => $_[0]->resque ) } |
39
|
|
|
|
|
|
|
); |
40
|
|
|
|
|
|
|
|
41
|
|
|
|
|
|
|
has id => ( |
42
|
|
|
|
|
|
|
is => 'rw', |
43
|
|
|
|
|
|
|
lazy => 1, |
44
|
|
|
|
|
|
|
clearer => '_clear_id', |
45
|
|
|
|
|
|
|
default => sub { $_[0]->_stringify } |
46
|
|
|
|
|
|
|
); |
47
|
0
|
|
|
0
|
|
|
sub _string { $_[0]->id } # can't point overload to a mo[o|u]se attribute :-( |
48
|
|
|
|
|
|
|
|
49
|
|
|
|
|
|
|
has verbose => ( is => 'rw', default => sub {0} ); |
50
|
|
|
|
|
|
|
|
51
|
|
|
|
|
|
|
has cant_fork => ( is => 'rw', default => sub {0} ); |
52
|
|
|
|
|
|
|
|
53
|
|
|
|
|
|
|
has cant_poll => ( is => 'rw', default => sub {0} ); |
54
|
|
|
|
|
|
|
|
55
|
|
|
|
|
|
|
has child => ( is => 'rw' ); |
56
|
|
|
|
|
|
|
|
57
|
|
|
|
|
|
|
has shutdown => ( is => 'rw', default => sub{0} ); |
58
|
|
|
|
|
|
|
|
59
|
|
|
|
|
|
|
has paused => ( is => 'rw', default => sub{0} ); |
60
|
|
|
|
|
|
|
|
61
|
|
|
|
|
|
|
has interval => ( is => 'rw', lazy => 1, default => sub{5} ); |
62
|
|
|
|
|
|
|
|
63
|
|
|
|
|
|
|
has timeout => ( is => 'rw', default => sub{30} ); |
64
|
|
|
|
|
|
|
|
65
|
|
|
|
|
|
|
has autoconfig => ( is => 'rw', predicate => 'has_autoconfig' ); |
66
|
|
|
|
|
|
|
|
67
|
0
|
|
|
0
|
1
|
|
sub pause { $_[0]->paused(1) } |
68
|
|
|
|
|
|
|
|
69
|
0
|
|
|
0
|
1
|
|
sub unpause { $_[0]->paused(0) } |
70
|
|
|
|
|
|
|
|
71
|
|
|
|
|
|
|
sub shutdown_please { |
72
|
0
|
|
|
0
|
1
|
|
print "Shutting down...\n"; |
73
|
0
|
|
|
|
|
|
$_[0]->shutdown(1); |
74
|
|
|
|
|
|
|
} |
75
|
|
|
|
|
|
|
|
76
|
0
|
0
|
|
0
|
1
|
|
sub shutdown_now { $_[0]->shutdown_please && $_[0]->kill_child } |
77
|
|
|
|
|
|
|
|
78
|
|
|
|
|
|
|
sub work { |
79
|
0
|
|
|
0
|
1
|
|
my $self = shift; |
80
|
0
|
|
|
|
|
|
my $waiting; # Keep track for logging purposes only! |
81
|
|
|
|
|
|
|
|
82
|
0
|
|
|
|
|
|
$self->startup; |
83
|
0
|
|
|
|
|
|
while ( ! $self->shutdown ) { |
84
|
0
|
0
|
|
|
|
|
$self->autoconfig->($self) if $self->has_autoconfig; |
85
|
0
|
0
|
0
|
|
|
|
if ( !$self->paused && ( my $job = $self->reserve ) ) { |
|
|
0
|
0
|
|
|
|
|
86
|
0
|
|
|
|
|
|
$waiting=0; |
87
|
0
|
|
|
|
|
|
$self->log("Got job $job"); |
88
|
0
|
|
|
|
|
|
$self->work_tick($job); |
89
|
|
|
|
|
|
|
} |
90
|
|
|
|
|
|
|
elsif( !$self->cant_poll && $self->interval ) { |
91
|
0
|
0
|
|
|
|
|
unless ( $waiting ) { |
92
|
0
|
0
|
|
|
|
|
my $status = $self->paused ? "Paused" : 'Waiting for ' . join( ', ', @{$self->queues} ); |
|
0
|
|
|
|
|
|
|
93
|
0
|
|
|
|
|
|
$self->procline( $status ); |
94
|
0
|
|
|
|
|
|
$self->log( $status ); |
95
|
0
|
|
|
|
|
|
$waiting=1; |
96
|
|
|
|
|
|
|
} |
97
|
0
|
|
|
|
|
|
sleep( $self->interval ); |
98
|
|
|
|
|
|
|
} |
99
|
|
|
|
|
|
|
} |
100
|
0
|
|
|
|
|
|
$self->unregister_worker; |
101
|
|
|
|
|
|
|
} |
102
|
|
|
|
|
|
|
|
103
|
|
|
|
|
|
|
sub work_tick { |
104
|
0
|
|
|
0
|
1
|
|
my ($self, $job) = @_; |
105
|
|
|
|
|
|
|
|
106
|
0
|
|
|
|
|
|
$self->working_on($job); |
107
|
0
|
|
|
|
|
|
my $timestamp = DateTime->now->strftime("%Y/%m/%d %H:%M:%S %Z"); |
108
|
|
|
|
|
|
|
|
109
|
0
|
0
|
0
|
|
|
|
if ( !$self->cant_fork && ( my $pid = fork ) ) { |
110
|
0
|
|
|
|
|
|
$self->procline( "Forked $pid at $timestamp" ); |
111
|
0
|
|
|
|
|
|
$self->child($pid); |
112
|
0
|
|
|
|
|
|
$self->log( "Waiting for $pid" ); |
113
|
|
|
|
|
|
|
#while ( ! waitpid( $pid, WNOHANG ) ) { } # non-blocking has sense? |
114
|
0
|
|
|
|
|
|
waitpid( $pid, 0 ); |
115
|
0
|
|
|
|
|
|
$self->log( "Forked job($pid) exited with status $?" ); |
116
|
|
|
|
|
|
|
|
117
|
0
|
0
|
|
|
|
|
if ($?) { |
118
|
0
|
|
|
|
|
|
$job->fail("Exited with status $?"); |
119
|
0
|
|
|
|
|
|
$self->failed(1); |
120
|
|
|
|
|
|
|
} |
121
|
|
|
|
|
|
|
} |
122
|
|
|
|
|
|
|
else { |
123
|
0
|
|
|
|
|
|
undef $SIG{TERM}; |
124
|
0
|
|
|
|
|
|
undef $SIG{INT}; |
125
|
|
|
|
|
|
|
|
126
|
|
|
|
|
|
|
# Allow graceful shutdown in "cant fork mode" |
127
|
0
|
0
|
|
|
|
|
undef $SIG{QUIT} unless $self->cant_fork; |
128
|
|
|
|
|
|
|
|
129
|
0
|
|
|
|
|
|
$self->procline( sprintf( "Processing %s since %s", $job->queue, $timestamp ) ); |
130
|
0
|
|
|
|
|
|
$self->perform($job); |
131
|
0
|
0
|
|
|
|
|
exit(0) unless $self->cant_fork; |
132
|
|
|
|
|
|
|
} |
133
|
|
|
|
|
|
|
|
134
|
0
|
|
|
|
|
|
$self->done_working; |
135
|
0
|
|
|
|
|
|
$self->child(0); |
136
|
|
|
|
|
|
|
} |
137
|
|
|
|
|
|
|
|
138
|
|
|
|
|
|
|
|
139
|
|
|
|
|
|
|
sub perform { |
140
|
0
|
|
|
0
|
1
|
|
my ( $self, $job ) = @_; |
141
|
0
|
|
|
|
|
|
my $ret; |
142
|
|
|
|
|
|
|
try { |
143
|
0
|
|
|
0
|
|
|
$ret = $job->perform; |
144
|
0
|
|
|
|
|
|
$self->log( sprintf( "done: %s", $job->stringify ) ); |
145
|
|
|
|
|
|
|
} |
146
|
|
|
|
|
|
|
catch { |
147
|
0
|
|
|
0
|
|
|
$self->log( sprintf( "%s failed: %s", $job->stringify, $_ ) ); |
148
|
0
|
|
|
|
|
|
$job->fail($_); |
149
|
0
|
|
|
|
|
|
$self->failed(1); |
150
|
0
|
|
|
|
|
|
}; |
151
|
0
|
|
|
|
|
|
$ret; |
152
|
|
|
|
|
|
|
} |
153
|
|
|
|
|
|
|
|
154
|
|
|
|
|
|
|
sub kill_child { |
155
|
0
|
|
|
0
|
1
|
|
my $self = shift; |
156
|
0
|
0
|
|
|
|
|
return unless $self->child; |
157
|
|
|
|
|
|
|
|
158
|
0
|
0
|
|
|
|
|
if ( kill 0, $self->child ) { |
159
|
0
|
|
|
|
|
|
$self->log( "Killing my child: " . $self->child ); |
160
|
0
|
|
|
|
|
|
kill 9, $self->child; |
161
|
|
|
|
|
|
|
} |
162
|
|
|
|
|
|
|
else { |
163
|
0
|
|
|
|
|
|
$self->log( "Child " . $self->child . " not found, shutting down." ); |
164
|
0
|
|
|
|
|
|
$self->shutdown_please; |
165
|
|
|
|
|
|
|
} |
166
|
|
|
|
|
|
|
} |
167
|
|
|
|
|
|
|
|
168
|
|
|
|
|
|
|
sub add_queue { |
169
|
0
|
|
|
0
|
1
|
|
my $self = shift; |
170
|
0
|
0
|
|
|
|
|
return unless @_; |
171
|
0
|
|
|
|
|
|
$self->queues( [ uniq( @{$self->queues}, @_ ) ] ); |
|
0
|
|
|
|
|
|
|
172
|
|
|
|
|
|
|
} |
173
|
|
|
|
|
|
|
|
174
|
|
|
|
|
|
|
sub del_queue { |
175
|
0
|
|
|
0
|
1
|
|
my ( $self, $queue ) = @_; |
176
|
0
|
0
|
|
|
|
|
return unless $queue; |
177
|
|
|
|
|
|
|
|
178
|
|
|
|
|
|
|
return |
179
|
0
|
|
|
|
|
|
@{$self->queues} |
180
|
|
|
|
|
|
|
- |
181
|
0
|
0
|
|
|
|
|
@{$self->queues( [ grep {$_} map { $_ eq $queue ? undef : $_ } @{$self->queues} ] )}; |
|
0
|
|
|
|
|
|
|
|
0
|
|
|
|
|
|
|
|
0
|
|
|
|
|
|
|
|
0
|
|
|
|
|
|
|
182
|
|
|
|
|
|
|
} |
183
|
|
|
|
|
|
|
|
184
|
|
|
|
|
|
|
sub reserve { |
185
|
0
|
|
|
0
|
1
|
|
my $self = shift; |
186
|
|
|
|
|
|
|
|
187
|
0
|
0
|
|
|
|
|
if ( $self->cant_poll ) { |
188
|
0
|
|
|
|
|
|
return $self->resque->blpop($self->queues, $self->timeout); |
189
|
|
|
|
|
|
|
} |
190
|
|
|
|
|
|
|
else { |
191
|
0
|
|
|
|
|
|
for my $queue ( @{$self->queues} ) { |
|
0
|
|
|
|
|
|
|
192
|
0
|
0
|
|
|
|
|
if ( my $job = $self->resque->pop($queue) ) { |
193
|
0
|
|
|
|
|
|
return $job; |
194
|
|
|
|
|
|
|
} |
195
|
|
|
|
|
|
|
} |
196
|
|
|
|
|
|
|
} |
197
|
|
|
|
|
|
|
} |
198
|
|
|
|
|
|
|
|
199
|
|
|
|
|
|
|
sub working_on { |
200
|
0
|
|
|
0
|
1
|
|
my ( $self, $job ) = @_; |
201
|
0
|
|
|
|
|
|
$self->redis->set( |
202
|
|
|
|
|
|
|
$self->key( worker => $self->id ), |
203
|
|
|
|
|
|
|
$self->encoder->encode({ |
204
|
|
|
|
|
|
|
queue => $job->queue, |
205
|
|
|
|
|
|
|
run_at => DateTime->now->strftime("%Y/%m/%d %H:%M:%S %Z"), |
206
|
|
|
|
|
|
|
payload => $job->payload |
207
|
|
|
|
|
|
|
}) |
208
|
|
|
|
|
|
|
); |
209
|
0
|
|
|
|
|
|
$job->worker($self); |
210
|
|
|
|
|
|
|
} |
211
|
|
|
|
|
|
|
|
212
|
|
|
|
|
|
|
sub done_working { |
213
|
0
|
|
|
0
|
1
|
|
my $self = shift; |
214
|
0
|
|
|
|
|
|
$self->processed(1); |
215
|
0
|
|
|
|
|
|
$self->redis->del( $self->key( worker => $self->id ) ); |
216
|
|
|
|
|
|
|
} |
217
|
|
|
|
|
|
|
|
218
|
|
|
|
|
|
|
sub started { |
219
|
0
|
|
|
0
|
1
|
|
my $self = shift; |
220
|
0
|
|
|
|
|
|
_parsedate( $self->_started ); |
221
|
|
|
|
|
|
|
} |
222
|
|
|
|
|
|
|
|
223
|
|
|
|
|
|
|
sub _started { |
224
|
0
|
|
|
0
|
|
|
my $self = shift; |
225
|
0
|
|
|
|
|
|
$self->redis->get( $self->key( worker => $self->id => 'started' ) ); |
226
|
|
|
|
|
|
|
} |
227
|
|
|
|
|
|
|
|
228
|
|
|
|
|
|
|
sub _parsedate { |
229
|
0
|
|
|
0
|
|
|
my $str = pop; |
230
|
0
|
|
|
|
|
|
my ( $year, $month, $day, $hour, $minute, $secs, $tz ) = $str =~ m|^(\d+)[-/](\d+)[-/](\d+) (\d+):(\d+):(\d+) (.+)$|; |
231
|
0
|
|
|
|
|
|
DateTime->new( day => $day, month => $month, year => $year, hour => $hour, minute => $minute, second => $secs, time_zone => $tz ); |
232
|
|
|
|
|
|
|
} |
233
|
|
|
|
|
|
|
|
234
|
|
|
|
|
|
|
sub set_started { |
235
|
0
|
|
|
0
|
1
|
|
my $self = shift; |
236
|
0
|
|
0
|
|
|
|
my $date = shift || DateTime->now->strftime('%Y-%m-%d %H:%M:%S %Z'); |
237
|
0
|
|
|
|
|
|
$self->redis->set( $self->key( worker => $self->id => 'started' ), $date ); |
238
|
|
|
|
|
|
|
} |
239
|
|
|
|
|
|
|
|
240
|
|
|
|
|
|
|
sub processing { |
241
|
0
|
|
|
0
|
1
|
|
my $self = shift; |
242
|
0
|
0
|
|
|
|
|
eval { $self->encoder->decode( $self->redis->get( $self->key( worker => $self->id ) ) ) } || {}; |
|
0
|
|
|
|
|
|
|
243
|
|
|
|
|
|
|
} |
244
|
|
|
|
|
|
|
|
245
|
|
|
|
|
|
|
sub processing_map { |
246
|
0
|
|
|
0
|
1
|
|
my $self = shift; |
247
|
0
|
0
|
|
|
|
|
return {} unless @_; |
248
|
|
|
|
|
|
|
|
249
|
0
|
0
|
|
|
|
|
my @ids = map { ref($_) ? $_->id : $_ } @_; |
|
0
|
|
|
|
|
|
|
250
|
0
|
0
|
0
|
|
|
|
my @proc = map { ($_ && $self->encoder->decode($_)) || {} } |
251
|
0
|
|
|
|
|
|
$self->redis->mget( map { $self->key( worker => $_ ) } @ids ); |
|
0
|
|
|
|
|
|
|
252
|
|
|
|
|
|
|
|
253
|
0
|
|
|
|
|
|
my $count = 0; |
254
|
0
|
|
|
|
|
|
return { map { $_ => $proc[$count++] } @ids }; |
|
0
|
|
|
|
|
|
|
255
|
|
|
|
|
|
|
} |
256
|
|
|
|
|
|
|
|
257
|
|
|
|
|
|
|
sub processing_started { |
258
|
0
|
|
|
0
|
1
|
|
my $self = shift; |
259
|
0
|
|
0
|
|
|
|
my $proc = shift || $self->processing; |
260
|
0
|
|
0
|
|
|
|
my $run_at = $proc->{run_at} || return; |
261
|
0
|
|
|
|
|
|
_parsedate($run_at); |
262
|
|
|
|
|
|
|
} |
263
|
|
|
|
|
|
|
|
264
|
|
|
|
|
|
|
sub state { |
265
|
0
|
|
|
0
|
1
|
|
my $self = shift; |
266
|
0
|
0
|
|
|
|
|
$self->redis->exists( $self->key( worker => $self->id ) ) ? 'working' : 'idle'; |
267
|
|
|
|
|
|
|
} |
268
|
|
|
|
|
|
|
|
269
|
|
|
|
|
|
|
sub is_working { |
270
|
0
|
|
|
0
|
1
|
|
my $self = shift; |
271
|
0
|
|
|
|
|
|
$self->state eq 'working'; |
272
|
|
|
|
|
|
|
} |
273
|
|
|
|
|
|
|
|
274
|
|
|
|
|
|
|
sub is_idle { |
275
|
0
|
|
|
0
|
1
|
|
my $self = shift; |
276
|
0
|
|
|
|
|
|
$self->state eq 'idle'; |
277
|
|
|
|
|
|
|
} |
278
|
|
|
|
|
|
|
|
279
|
|
|
|
|
|
|
sub _stringify { |
280
|
0
|
|
|
0
|
|
|
my $self = shift; |
281
|
0
|
|
|
|
|
|
join ':', hostname, $$, join( ',', @{$self->queues} ); |
|
0
|
|
|
|
|
|
|
282
|
|
|
|
|
|
|
} |
283
|
|
|
|
|
|
|
|
284
|
|
|
|
|
|
|
# Is this worker the same as another worker? |
285
|
|
|
|
|
|
|
sub _is_equal { |
286
|
0
|
|
|
0
|
|
|
my ($self, $other) = @_; |
287
|
0
|
|
|
|
|
|
$self->id eq $other->id; |
288
|
|
|
|
|
|
|
} |
289
|
|
|
|
|
|
|
|
290
|
|
|
|
|
|
|
sub procline { |
291
|
0
|
|
|
0
|
1
|
|
my $self = shift; |
292
|
0
|
0
|
|
|
|
|
if ( my $str = shift ) { |
293
|
0
|
|
0
|
|
|
|
$0 = sprintf( "resque-%s: %s", $Resque::VERSION || 'devel', $str ); |
294
|
|
|
|
|
|
|
} |
295
|
0
|
|
|
|
|
|
$0; |
296
|
|
|
|
|
|
|
} |
297
|
|
|
|
|
|
|
|
298
|
|
|
|
|
|
|
sub startup { |
299
|
0
|
|
|
0
|
1
|
|
my $self = shift; |
300
|
0
|
|
|
|
|
|
$0 = 'resque: Starting'; |
301
|
|
|
|
|
|
|
|
302
|
0
|
|
|
|
|
|
$self->register_signal_handlers; |
303
|
0
|
|
|
|
|
|
$self->prune_dead_workers; |
304
|
|
|
|
|
|
|
#run_hook: before_first_fork |
305
|
0
|
|
|
|
|
|
$self->register_worker; |
306
|
|
|
|
|
|
|
} |
307
|
|
|
|
|
|
|
|
308
|
|
|
|
|
|
|
sub register_signal_handlers { |
309
|
0
|
|
|
0
|
1
|
|
my $self = shift; |
310
|
0
|
|
|
|
|
|
weaken $self; |
311
|
0
|
|
|
0
|
|
|
$SIG{TERM} = sub { $self->shutdown_now }; |
|
0
|
|
|
|
|
|
|
312
|
0
|
|
|
0
|
|
|
$SIG{INT} = sub { $self->shutdown_now }; |
|
0
|
|
|
|
|
|
|
313
|
0
|
|
|
0
|
|
|
$SIG{QUIT} = sub { $self->shutdown_please }; |
|
0
|
|
|
|
|
|
|
314
|
0
|
|
|
0
|
|
|
$SIG{USR1} = sub { $self->kill_child }; |
|
0
|
|
|
|
|
|
|
315
|
0
|
|
|
0
|
|
|
$SIG{USR2} = sub { $self->pause }; |
|
0
|
|
|
|
|
|
|
316
|
0
|
|
|
0
|
|
|
$SIG{CONT} = sub { $self->unpause }; |
|
0
|
|
|
|
|
|
|
317
|
|
|
|
|
|
|
} |
318
|
|
|
|
|
|
|
|
319
|
|
|
|
|
|
|
sub prune_dead_workers { |
320
|
0
|
|
|
0
|
1
|
|
my $self = shift; |
321
|
0
|
|
|
|
|
|
my @all_workers = $self->all; |
322
|
0
|
0
|
|
|
|
|
my @known_workers = $self->worker_pids if @all_workers; |
323
|
0
|
|
|
|
|
|
for my $worker (@all_workers) { |
324
|
0
|
|
|
|
|
|
my ($host, $pid, $queues) = split( ':', $worker->id ); |
325
|
0
|
0
|
|
|
|
|
next unless $host eq hostname; |
326
|
0
|
0
|
|
0
|
|
|
next if any { $_ eq $pid } @known_workers; |
|
0
|
|
|
|
|
|
|
327
|
0
|
|
|
|
|
|
$self->log( "Pruning dead worker: $worker" ); |
328
|
0
|
|
|
|
|
|
$worker->unregister_worker; |
329
|
|
|
|
|
|
|
} |
330
|
|
|
|
|
|
|
} |
331
|
|
|
|
|
|
|
|
332
|
|
|
|
|
|
|
sub register_worker { |
333
|
0
|
|
|
0
|
1
|
|
my $self = shift; |
334
|
0
|
|
|
|
|
|
$self->_register_id; |
335
|
0
|
|
|
|
|
|
$self->set_started; |
336
|
|
|
|
|
|
|
} |
337
|
|
|
|
|
|
|
|
338
|
|
|
|
|
|
|
sub _register_id { |
339
|
0
|
|
|
0
|
|
|
my $self = shift; |
340
|
0
|
|
|
|
|
|
$self->redis->sadd( $self->key( 'workers'), $self->id ); |
341
|
|
|
|
|
|
|
} |
342
|
|
|
|
|
|
|
|
343
|
|
|
|
|
|
|
sub refresh_id { |
344
|
0
|
|
|
0
|
1
|
|
my $self = shift; |
345
|
0
|
|
|
|
|
|
my $id = $self->_stringify; |
346
|
0
|
0
|
|
|
|
|
return if $id eq $self->id; # unchanged? |
347
|
0
|
|
0
|
|
|
|
my $start = $self->_started || die "Can't refresh_id before start"; |
348
|
0
|
|
|
|
|
|
my $proc = $self->stat->get("processed:$self"); |
349
|
0
|
|
|
|
|
|
my $fail = $self->stat->get("failed:$self"); |
350
|
0
|
|
|
|
|
|
$self->redis->multi; |
351
|
0
|
|
|
|
|
|
$self->_unregister_id; |
352
|
0
|
|
|
|
|
|
$self->id($id); |
353
|
0
|
|
|
|
|
|
$self->_register_id; |
354
|
0
|
|
|
|
|
|
$self->set_started($start); |
355
|
0
|
|
|
|
|
|
$self->stat->set("processed:$self", $proc); |
356
|
0
|
|
|
|
|
|
$self->stat->set("failed:$self", $fail); |
357
|
0
|
|
|
|
|
|
$self->redis->exec; |
358
|
|
|
|
|
|
|
} |
359
|
|
|
|
|
|
|
|
360
|
|
|
|
|
|
|
sub unregister_worker { |
361
|
0
|
|
|
0
|
1
|
|
my $self = shift; |
362
|
|
|
|
|
|
|
|
363
|
|
|
|
|
|
|
# If we're still processing a job, make sure it gets logged as a |
364
|
|
|
|
|
|
|
# failure. |
365
|
|
|
|
|
|
|
{ |
366
|
0
|
|
|
|
|
|
my $hr = $self->processing; |
|
0
|
|
|
|
|
|
|
367
|
0
|
0
|
|
|
|
|
if ( %$hr ) { |
368
|
|
|
|
|
|
|
# Ensure the proper worker is attached to this job, even if |
369
|
|
|
|
|
|
|
# it's not the precise instance that died. |
370
|
|
|
|
|
|
|
my $job = $self->resque->new_job({ |
371
|
|
|
|
|
|
|
worker => $self, |
372
|
|
|
|
|
|
|
queue => $hr->{queue}, |
373
|
|
|
|
|
|
|
payload => $hr->{payload} |
374
|
0
|
|
|
|
|
|
}); |
375
|
0
|
|
|
|
|
|
$job->fail( 'Dirty exit' ); |
376
|
|
|
|
|
|
|
} |
377
|
|
|
|
|
|
|
} |
378
|
|
|
|
|
|
|
|
379
|
0
|
|
|
|
|
|
$self->_unregister_id; |
380
|
|
|
|
|
|
|
|
381
|
|
|
|
|
|
|
} |
382
|
|
|
|
|
|
|
|
383
|
|
|
|
|
|
|
# clear worker registry keys |
384
|
|
|
|
|
|
|
sub _unregister_id { |
385
|
0
|
|
|
0
|
|
|
my $self = shift; |
386
|
0
|
|
|
|
|
|
$self->redis->srem( $self->key('workers'), $self->id ); |
387
|
0
|
|
|
|
|
|
$self->redis->del( $self->key( worker => $self->id ) ); |
388
|
0
|
|
|
|
|
|
$self->redis->del( $self->key( worker => $self->id => 'started' ) ); |
389
|
0
|
|
|
|
|
|
$self->stat->clear("processed:$self"); |
390
|
0
|
|
|
|
|
|
$self->stat->clear("failed:$self"); |
391
|
|
|
|
|
|
|
} |
392
|
|
|
|
|
|
|
|
393
|
|
|
|
|
|
|
sub worker_pids { |
394
|
0
|
|
|
0
|
1
|
|
my $self = shift; |
395
|
0
|
|
|
|
|
|
my @pids; |
396
|
|
|
|
|
|
|
|
397
|
0
|
0
|
|
|
|
|
if($^O=~m/^(cygwin|MSWin32)$/i) { |
398
|
|
|
|
|
|
|
# $0 assignment does not work under Win32, so we'll return a list of perl PIDs instead |
399
|
0
|
0
|
|
|
|
|
@pids = map { s/^PID:\s*// && $_ } |
400
|
0
|
|
|
|
|
|
grep { /^PID/ } |
|
0
|
|
|
|
|
|
|
401
|
|
|
|
|
|
|
split( /[\r\n]/ , `tasklist /FI "IMAGENAME eq perl.exe" /FO list` ); |
402
|
|
|
|
|
|
|
} else { |
403
|
0
|
0
|
|
|
|
|
my $ps_command = $^O eq 'solaris' |
404
|
|
|
|
|
|
|
? 'ps -A -o pid,args' |
405
|
|
|
|
|
|
|
: 'ps -A -o pid,command'; |
406
|
|
|
|
|
|
|
|
407
|
0
|
|
|
|
|
|
for ( split "\n", `$ps_command | grep resque | grep -v resque-web | grep -v grep` ) { |
408
|
0
|
0
|
|
|
|
|
if ( m/^\s*(\d+)\s(.+)$/ ) { |
409
|
0
|
|
|
|
|
|
push @pids, $1; |
410
|
|
|
|
|
|
|
} |
411
|
|
|
|
|
|
|
} |
412
|
|
|
|
|
|
|
} |
413
|
0
|
0
|
|
|
|
|
return wantarray ? @pids : \@pids; |
414
|
|
|
|
|
|
|
} |
415
|
|
|
|
|
|
|
|
416
|
|
|
|
|
|
|
#TODO: add logger() attr to containg a logger object and if set, use that instead of print! |
417
|
|
|
|
|
|
|
sub log { |
418
|
0
|
|
|
0
|
1
|
|
my $self = shift; |
419
|
0
|
0
|
|
|
|
|
return unless $self->verbose; |
420
|
0
|
|
|
|
|
|
print STDERR shift, "\n"; |
421
|
|
|
|
|
|
|
} |
422
|
|
|
|
|
|
|
|
423
|
|
|
|
|
|
|
sub processed { |
424
|
0
|
|
|
0
|
1
|
|
my $self = shift; |
425
|
0
|
0
|
|
|
|
|
if (shift) { |
426
|
0
|
|
|
|
|
|
$self->stat->incr('processed'); |
427
|
0
|
|
|
|
|
|
$self->stat->incr("processed:$self"); |
428
|
|
|
|
|
|
|
} |
429
|
0
|
|
|
|
|
|
$self->stat->get("processed:$self"); |
430
|
|
|
|
|
|
|
} |
431
|
|
|
|
|
|
|
|
432
|
|
|
|
|
|
|
sub failed { |
433
|
0
|
|
|
0
|
1
|
|
my $self = shift; |
434
|
0
|
0
|
|
|
|
|
if (shift) { |
435
|
0
|
|
|
|
|
|
$self->stat->incr('failed'); |
436
|
0
|
|
|
|
|
|
$self->stat->incr("failed:$self"); |
437
|
|
|
|
|
|
|
} |
438
|
0
|
|
|
|
|
|
$self->stat->get("failed:$self"); |
439
|
|
|
|
|
|
|
} |
440
|
|
|
|
|
|
|
|
441
|
|
|
|
|
|
|
sub find { |
442
|
0
|
|
|
0
|
1
|
|
my ( $self, $worker_id ) = @_; |
443
|
0
|
0
|
|
|
|
|
if ( $self->exists( $worker_id ) ) { |
444
|
0
|
|
|
|
|
|
return $self->_from_id( $worker_id ); |
445
|
|
|
|
|
|
|
} |
446
|
|
|
|
|
|
|
} |
447
|
|
|
|
|
|
|
|
448
|
|
|
|
|
|
|
sub _from_id { |
449
|
0
|
|
|
0
|
|
|
my ( $self, $worker_id ) = @_; |
450
|
0
|
|
|
|
|
|
my @queues = split ',', (split( ':', $worker_id))[-1]; |
451
|
0
|
|
|
|
|
|
__PACKAGE__->new( |
452
|
|
|
|
|
|
|
resque => $self->resque, |
453
|
|
|
|
|
|
|
queues => \@queues, |
454
|
|
|
|
|
|
|
id => $worker_id |
455
|
|
|
|
|
|
|
); |
456
|
|
|
|
|
|
|
} |
457
|
|
|
|
|
|
|
|
458
|
|
|
|
|
|
|
sub all { |
459
|
0
|
|
|
0
|
1
|
|
my $self = shift; |
460
|
0
|
|
|
|
|
|
my @w = map { $self->_from_id($_) } $self->redis->smembers( $self->key('workers') ); |
|
0
|
|
|
|
|
|
|
461
|
0
|
0
|
|
|
|
|
return wantarray ? @w : \@w; |
462
|
|
|
|
|
|
|
} |
463
|
|
|
|
|
|
|
|
464
|
|
|
|
|
|
|
sub exists { |
465
|
0
|
|
|
0
|
1
|
|
my ($self, $worker_id) = @_; |
466
|
0
|
|
|
|
|
|
$self->redis->sismember( $self->key( 'workers' ), $worker_id ); |
467
|
|
|
|
|
|
|
} |
468
|
|
|
|
|
|
|
|
469
|
|
|
|
|
|
|
__PACKAGE__->meta->make_immutable(); |
470
|
|
|
|
|
|
|
|
471
|
|
|
|
|
|
|
__END__ |
472
|
|
|
|
|
|
|
|
473
|
|
|
|
|
|
|
=pod |
474
|
|
|
|
|
|
|
|
475
|
|
|
|
|
|
|
=encoding UTF-8 |
476
|
|
|
|
|
|
|
|
477
|
|
|
|
|
|
|
=head1 NAME |
478
|
|
|
|
|
|
|
|
479
|
|
|
|
|
|
|
Resque::Worker - Does the hard work of babysitting Resque::Job's |
480
|
|
|
|
|
|
|
|
481
|
|
|
|
|
|
|
=head1 VERSION |
482
|
|
|
|
|
|
|
|
483
|
|
|
|
|
|
|
version 0.42 |
484
|
|
|
|
|
|
|
|
485
|
|
|
|
|
|
|
=head1 ATTRIBUTES |
486
|
|
|
|
|
|
|
|
487
|
|
|
|
|
|
|
=head2 resque |
488
|
|
|
|
|
|
|
|
489
|
|
|
|
|
|
|
The L<Resque> object running this worker. |
490
|
|
|
|
|
|
|
|
491
|
|
|
|
|
|
|
=head2 queues |
492
|
|
|
|
|
|
|
|
493
|
|
|
|
|
|
|
Queues this worker should fetch jobs from. |
494
|
|
|
|
|
|
|
|
495
|
|
|
|
|
|
|
=head2 stat |
496
|
|
|
|
|
|
|
|
497
|
|
|
|
|
|
|
See L<Resque::Stat>. |
498
|
|
|
|
|
|
|
|
499
|
|
|
|
|
|
|
=head2 id |
500
|
|
|
|
|
|
|
|
501
|
|
|
|
|
|
|
Unique identifier for the running worker. |
502
|
|
|
|
|
|
|
Used to set process status all around. |
503
|
|
|
|
|
|
|
|
504
|
|
|
|
|
|
|
The worker stringify to this attribute. |
505
|
|
|
|
|
|
|
|
506
|
|
|
|
|
|
|
=head2 verbose |
507
|
|
|
|
|
|
|
|
508
|
|
|
|
|
|
|
Set to a true value to make this worker report what's doing while |
509
|
|
|
|
|
|
|
on work(). |
510
|
|
|
|
|
|
|
|
511
|
|
|
|
|
|
|
=head2 cant_fork |
512
|
|
|
|
|
|
|
|
513
|
|
|
|
|
|
|
Set it to a true value to stop this worker from fork jobs. |
514
|
|
|
|
|
|
|
|
515
|
|
|
|
|
|
|
By default, the worker will fork the job out and control the |
516
|
|
|
|
|
|
|
children process. This make the worker more resilient to |
517
|
|
|
|
|
|
|
memory leaks. |
518
|
|
|
|
|
|
|
|
519
|
|
|
|
|
|
|
=head2 cant_poll |
520
|
|
|
|
|
|
|
|
521
|
|
|
|
|
|
|
Set it to a true value to stop this worker from polling for jobs and |
522
|
|
|
|
|
|
|
use experimental blocking pop instead. |
523
|
|
|
|
|
|
|
|
524
|
|
|
|
|
|
|
See timeout(). |
525
|
|
|
|
|
|
|
|
526
|
|
|
|
|
|
|
=head2 child |
527
|
|
|
|
|
|
|
|
528
|
|
|
|
|
|
|
PID of current running child. |
529
|
|
|
|
|
|
|
|
530
|
|
|
|
|
|
|
=head2 shutdown |
531
|
|
|
|
|
|
|
|
532
|
|
|
|
|
|
|
When true, this worker will shutdown after finishing current job. |
533
|
|
|
|
|
|
|
|
534
|
|
|
|
|
|
|
=head2 paused |
535
|
|
|
|
|
|
|
|
536
|
|
|
|
|
|
|
When true, this worker won't proccess more jobs till false. |
537
|
|
|
|
|
|
|
|
538
|
|
|
|
|
|
|
=head2 interval |
539
|
|
|
|
|
|
|
|
540
|
|
|
|
|
|
|
Float representing the polling frequency. The default is 5 seconds, but for a semi-active app you may want to use a smaller value. |
541
|
|
|
|
|
|
|
|
542
|
|
|
|
|
|
|
=head2 timeout |
543
|
|
|
|
|
|
|
|
544
|
|
|
|
|
|
|
Integer representing the blocking timeout. The default is not to block but to poll queues (see inverval), |
545
|
|
|
|
|
|
|
so this attribute will be completely ignored unless dont_poll(). |
546
|
|
|
|
|
|
|
The default is 30 seconds. Setting it to 0 will make reserve() to block until some job is assigned to this |
547
|
|
|
|
|
|
|
workers and will prevent autoconfig() to be called until it happen. |
548
|
|
|
|
|
|
|
|
549
|
|
|
|
|
|
|
=head2 autoconfig |
550
|
|
|
|
|
|
|
|
551
|
|
|
|
|
|
|
An optional callback to be called periodically while work()'ing. It's main purpose is to |
552
|
|
|
|
|
|
|
allow running auto-config code as this function will receive this worker as it's only argument |
553
|
|
|
|
|
|
|
and will be called before reserving the first job. |
554
|
|
|
|
|
|
|
|
555
|
|
|
|
|
|
|
When this callback is provided, it will be called on every wheel iteration, so it's recommended |
556
|
|
|
|
|
|
|
to keep track of time to prevent running slow re-configuration code every time. |
557
|
|
|
|
|
|
|
|
558
|
|
|
|
|
|
|
=head1 METHODS |
559
|
|
|
|
|
|
|
|
560
|
|
|
|
|
|
|
=head2 pause |
561
|
|
|
|
|
|
|
|
562
|
|
|
|
|
|
|
Stop processing jobs after the current one has completed (if we're |
563
|
|
|
|
|
|
|
currently running one). |
564
|
|
|
|
|
|
|
|
565
|
|
|
|
|
|
|
$worker->pause(); |
566
|
|
|
|
|
|
|
|
567
|
|
|
|
|
|
|
=head2 unpause |
568
|
|
|
|
|
|
|
|
569
|
|
|
|
|
|
|
Start processing jobs again after a pause |
570
|
|
|
|
|
|
|
|
571
|
|
|
|
|
|
|
$worker->unpause(); |
572
|
|
|
|
|
|
|
|
573
|
|
|
|
|
|
|
=head2 shutdown_please |
574
|
|
|
|
|
|
|
|
575
|
|
|
|
|
|
|
Schedule this worker for shutdown. Will finish processing the |
576
|
|
|
|
|
|
|
current job. |
577
|
|
|
|
|
|
|
|
578
|
|
|
|
|
|
|
$worker->shutdown_please(); |
579
|
|
|
|
|
|
|
|
580
|
|
|
|
|
|
|
=head2 shutdown_now |
581
|
|
|
|
|
|
|
|
582
|
|
|
|
|
|
|
Kill the child and shutdown immediately. |
583
|
|
|
|
|
|
|
|
584
|
|
|
|
|
|
|
$worker->shutdown_now(); |
585
|
|
|
|
|
|
|
|
586
|
|
|
|
|
|
|
=head2 work |
587
|
|
|
|
|
|
|
|
588
|
|
|
|
|
|
|
Calling this method will make this worker start pulling & running jobs |
589
|
|
|
|
|
|
|
from queues(). |
590
|
|
|
|
|
|
|
|
591
|
|
|
|
|
|
|
This is the main wheel and will run while shutdown() is false. |
592
|
|
|
|
|
|
|
|
593
|
|
|
|
|
|
|
$worker->work(); |
594
|
|
|
|
|
|
|
|
595
|
|
|
|
|
|
|
=head2 work_tick |
596
|
|
|
|
|
|
|
|
597
|
|
|
|
|
|
|
Perform() one job and wait till it finish. |
598
|
|
|
|
|
|
|
|
599
|
|
|
|
|
|
|
$worker->work_tick(); |
600
|
|
|
|
|
|
|
|
601
|
|
|
|
|
|
|
=head2 perform |
602
|
|
|
|
|
|
|
|
603
|
|
|
|
|
|
|
Call perform() on the given Resque::Job capturing and reporting |
604
|
|
|
|
|
|
|
any exception. |
605
|
|
|
|
|
|
|
|
606
|
|
|
|
|
|
|
$worker->perform( $job ); |
607
|
|
|
|
|
|
|
|
608
|
|
|
|
|
|
|
=head2 kill_child |
609
|
|
|
|
|
|
|
|
610
|
|
|
|
|
|
|
Kills the forked child immediately, without remorse. The job it |
611
|
|
|
|
|
|
|
is processing will not be completed. |
612
|
|
|
|
|
|
|
|
613
|
|
|
|
|
|
|
$worker->kill_child(); |
614
|
|
|
|
|
|
|
|
615
|
|
|
|
|
|
|
=head2 add_queue |
616
|
|
|
|
|
|
|
|
617
|
|
|
|
|
|
|
Add a queue this worker should listen to. |
618
|
|
|
|
|
|
|
|
619
|
|
|
|
|
|
|
$worker->add_queue( "queuename" ); |
620
|
|
|
|
|
|
|
|
621
|
|
|
|
|
|
|
=head2 del_queue |
622
|
|
|
|
|
|
|
|
623
|
|
|
|
|
|
|
Stop listening to the given queue. |
624
|
|
|
|
|
|
|
|
625
|
|
|
|
|
|
|
$worker->del_queue( "queuename" ); |
626
|
|
|
|
|
|
|
|
627
|
|
|
|
|
|
|
=head2 reserve |
628
|
|
|
|
|
|
|
|
629
|
|
|
|
|
|
|
Pull the next job to be precessed. |
630
|
|
|
|
|
|
|
|
631
|
|
|
|
|
|
|
my $job = $worker->reserve(); |
632
|
|
|
|
|
|
|
|
633
|
|
|
|
|
|
|
=head2 working_on |
634
|
|
|
|
|
|
|
|
635
|
|
|
|
|
|
|
Set worker and working status on the given L<Resque::Job>. |
636
|
|
|
|
|
|
|
|
637
|
|
|
|
|
|
|
$job->working_on( $resque_job ); |
638
|
|
|
|
|
|
|
|
639
|
|
|
|
|
|
|
=head2 done_working |
640
|
|
|
|
|
|
|
|
641
|
|
|
|
|
|
|
Inform the backend this worker has done its current job |
642
|
|
|
|
|
|
|
|
643
|
|
|
|
|
|
|
$job->done_working(); |
644
|
|
|
|
|
|
|
|
645
|
|
|
|
|
|
|
=head2 started |
646
|
|
|
|
|
|
|
|
647
|
|
|
|
|
|
|
What time did this worker start? |
648
|
|
|
|
|
|
|
Returns an instance of DateTime. |
649
|
|
|
|
|
|
|
|
650
|
|
|
|
|
|
|
my $datetime = $worker->started(); |
651
|
|
|
|
|
|
|
|
652
|
|
|
|
|
|
|
=head2 set_started |
653
|
|
|
|
|
|
|
|
654
|
|
|
|
|
|
|
Tell Redis we've started |
655
|
|
|
|
|
|
|
|
656
|
|
|
|
|
|
|
$worker->set_started(); |
657
|
|
|
|
|
|
|
|
658
|
|
|
|
|
|
|
=head2 processing |
659
|
|
|
|
|
|
|
|
660
|
|
|
|
|
|
|
Returns a hash explaining the Job we're currently processing, if any. |
661
|
|
|
|
|
|
|
|
662
|
|
|
|
|
|
|
$worker->processing(); |
663
|
|
|
|
|
|
|
|
664
|
|
|
|
|
|
|
=head2 processing_map |
665
|
|
|
|
|
|
|
|
666
|
|
|
|
|
|
|
Returns a hashref of processing info for a given worker or worker ID list |
667
|
|
|
|
|
|
|
|
668
|
|
|
|
|
|
|
$worker->processing( $worker1, $worker2, $worker3->id ); |
669
|
|
|
|
|
|
|
|
670
|
|
|
|
|
|
|
=head2 processing_started |
671
|
|
|
|
|
|
|
|
672
|
|
|
|
|
|
|
What time did this worker started to work on current job? |
673
|
|
|
|
|
|
|
Returns an instance of DateTime or undef when it's not working. |
674
|
|
|
|
|
|
|
|
675
|
|
|
|
|
|
|
my $datetime = $worker->processing_started(); |
676
|
|
|
|
|
|
|
|
677
|
|
|
|
|
|
|
=head2 state |
678
|
|
|
|
|
|
|
|
679
|
|
|
|
|
|
|
Returns a string representing the current worker state, |
680
|
|
|
|
|
|
|
which can be either working or idle |
681
|
|
|
|
|
|
|
|
682
|
|
|
|
|
|
|
my $state = $worker->state(); |
683
|
|
|
|
|
|
|
|
684
|
|
|
|
|
|
|
=head2 is_working |
685
|
|
|
|
|
|
|
|
686
|
|
|
|
|
|
|
Boolean - true if working, false if not |
687
|
|
|
|
|
|
|
|
688
|
|
|
|
|
|
|
my $working = $worker->is_working(); |
689
|
|
|
|
|
|
|
|
690
|
|
|
|
|
|
|
=head2 is_idle |
691
|
|
|
|
|
|
|
|
692
|
|
|
|
|
|
|
Boolean - true if idle, false if not |
693
|
|
|
|
|
|
|
|
694
|
|
|
|
|
|
|
my $idle = $worker->is_idle(); |
695
|
|
|
|
|
|
|
|
696
|
|
|
|
|
|
|
=head2 procline |
697
|
|
|
|
|
|
|
|
698
|
|
|
|
|
|
|
Given a string, sets the procline ($0) and logs. |
699
|
|
|
|
|
|
|
Procline is always in the format of: |
700
|
|
|
|
|
|
|
resque-VERSION: STRING |
701
|
|
|
|
|
|
|
|
702
|
|
|
|
|
|
|
$worker->procline( "string" ); |
703
|
|
|
|
|
|
|
|
704
|
|
|
|
|
|
|
=head2 startup |
705
|
|
|
|
|
|
|
|
706
|
|
|
|
|
|
|
Helper method called by work() to: |
707
|
|
|
|
|
|
|
|
708
|
|
|
|
|
|
|
1. register_signal_handlers() |
709
|
|
|
|
|
|
|
2. prune_dead_workers(); |
710
|
|
|
|
|
|
|
3. register_worker(); |
711
|
|
|
|
|
|
|
|
712
|
|
|
|
|
|
|
$worker->startup(); |
713
|
|
|
|
|
|
|
|
714
|
|
|
|
|
|
|
=head2 register_signal_handlers |
715
|
|
|
|
|
|
|
|
716
|
|
|
|
|
|
|
Registers the various signal handlers a worker responds to. |
717
|
|
|
|
|
|
|
|
718
|
|
|
|
|
|
|
TERM: Shutdown immediately, stop processing jobs. |
719
|
|
|
|
|
|
|
INT: Shutdown immediately, stop processing jobs. |
720
|
|
|
|
|
|
|
QUIT: Shutdown after the current job has finished processing. |
721
|
|
|
|
|
|
|
USR1: Kill the forked child immediately, continue processing jobs. |
722
|
|
|
|
|
|
|
USR2: Don't process any new jobs |
723
|
|
|
|
|
|
|
CONT: Start processing jobs again after a USR2 |
724
|
|
|
|
|
|
|
|
725
|
|
|
|
|
|
|
$worker->register_signal_handlers(); |
726
|
|
|
|
|
|
|
|
727
|
|
|
|
|
|
|
=head2 prune_dead_workers |
728
|
|
|
|
|
|
|
|
729
|
|
|
|
|
|
|
Looks for any workers which should be running on this server |
730
|
|
|
|
|
|
|
and, if they're not, removes them from Redis. |
731
|
|
|
|
|
|
|
|
732
|
|
|
|
|
|
|
This is a form of garbage collection. If a server is killed by a |
733
|
|
|
|
|
|
|
hard shutdown, power failure, or something else beyond our |
734
|
|
|
|
|
|
|
control, the Resque workers will not die gracefully and therefore |
735
|
|
|
|
|
|
|
will leave stale state information in Redis. |
736
|
|
|
|
|
|
|
|
737
|
|
|
|
|
|
|
By checking the current Redis state against the actual |
738
|
|
|
|
|
|
|
environment, we can determine if Redis is old and clean it up a bit. |
739
|
|
|
|
|
|
|
|
740
|
|
|
|
|
|
|
$worker->prune_dead_worker(); |
741
|
|
|
|
|
|
|
|
742
|
|
|
|
|
|
|
=head2 register_worker |
743
|
|
|
|
|
|
|
|
744
|
|
|
|
|
|
|
Registers ourself as a worker. Useful when entering the worker |
745
|
|
|
|
|
|
|
lifecycle on startup. |
746
|
|
|
|
|
|
|
|
747
|
|
|
|
|
|
|
$worker->register_worker(); |
748
|
|
|
|
|
|
|
|
749
|
|
|
|
|
|
|
=head2 refresh_id |
750
|
|
|
|
|
|
|
|
751
|
|
|
|
|
|
|
Do the dirty work after changing the queues on an already |
752
|
|
|
|
|
|
|
register_worker(). |
753
|
|
|
|
|
|
|
|
754
|
|
|
|
|
|
|
This will update backend on a single transactionto reflex |
755
|
|
|
|
|
|
|
current queues by changing this worker ID which need to |
756
|
|
|
|
|
|
|
unregister_worker() and register_worker() again while |
757
|
|
|
|
|
|
|
keeping stat() and started(). |
758
|
|
|
|
|
|
|
|
759
|
|
|
|
|
|
|
Only useful when you dinamically update queues and want to |
760
|
|
|
|
|
|
|
watch it on the web interface. |
761
|
|
|
|
|
|
|
|
762
|
|
|
|
|
|
|
=head2 unregister_worker |
763
|
|
|
|
|
|
|
|
764
|
|
|
|
|
|
|
Unregisters ourself as a worker. Useful when shutting down. |
765
|
|
|
|
|
|
|
|
766
|
|
|
|
|
|
|
$worker->unregister_worker(); |
767
|
|
|
|
|
|
|
|
768
|
|
|
|
|
|
|
=head2 worker_pids |
769
|
|
|
|
|
|
|
|
770
|
|
|
|
|
|
|
Returns an Array of string pids of all the other workers on this |
771
|
|
|
|
|
|
|
machine. Useful when pruning dead workers on startup. |
772
|
|
|
|
|
|
|
|
773
|
|
|
|
|
|
|
my @pids = $worker->worker_pids(); |
774
|
|
|
|
|
|
|
|
775
|
|
|
|
|
|
|
=head2 log |
776
|
|
|
|
|
|
|
|
777
|
|
|
|
|
|
|
If verbose() is true, this will print to STDERR. |
778
|
|
|
|
|
|
|
|
779
|
|
|
|
|
|
|
$worker->log( 'message here' ); |
780
|
|
|
|
|
|
|
|
781
|
|
|
|
|
|
|
=head2 processed |
782
|
|
|
|
|
|
|
|
783
|
|
|
|
|
|
|
Retrieve from L<Resque::Stat> many jobs has done this worker. |
784
|
|
|
|
|
|
|
Pass a true argument to increment by one before retrieval. |
785
|
|
|
|
|
|
|
|
786
|
|
|
|
|
|
|
my $jobs_run = $worker->processed( $boolean ); |
787
|
|
|
|
|
|
|
|
788
|
|
|
|
|
|
|
=head2 failed |
789
|
|
|
|
|
|
|
|
790
|
|
|
|
|
|
|
How many failed jobs has this worker seen. |
791
|
|
|
|
|
|
|
Pass a true argument to increment by one before retrieval. |
792
|
|
|
|
|
|
|
|
793
|
|
|
|
|
|
|
my $jobs_run = $worker->failed( $boolean ); |
794
|
|
|
|
|
|
|
|
795
|
|
|
|
|
|
|
=head2 find |
796
|
|
|
|
|
|
|
|
797
|
|
|
|
|
|
|
Returns a single worker object. Accepts a string id. |
798
|
|
|
|
|
|
|
|
799
|
|
|
|
|
|
|
my $worker_object = $worker->find( $worker_id ); |
800
|
|
|
|
|
|
|
|
801
|
|
|
|
|
|
|
=head2 all |
802
|
|
|
|
|
|
|
|
803
|
|
|
|
|
|
|
Returns a list of all worker registered on the backend, or an |
804
|
|
|
|
|
|
|
arrayref in scalar context; |
805
|
|
|
|
|
|
|
|
806
|
|
|
|
|
|
|
my @workers = $worker->all(); |
807
|
|
|
|
|
|
|
|
808
|
|
|
|
|
|
|
=head2 exists |
809
|
|
|
|
|
|
|
|
810
|
|
|
|
|
|
|
Returns true if the given worker id exists on redis() backend. |
811
|
|
|
|
|
|
|
|
812
|
|
|
|
|
|
|
my $exists = $worker->exists( $worker_id ); |
813
|
|
|
|
|
|
|
|
814
|
|
|
|
|
|
|
=head1 AUTHOR |
815
|
|
|
|
|
|
|
|
816
|
|
|
|
|
|
|
Diego Kuperman <diego@freekeylabs.com> |
817
|
|
|
|
|
|
|
|
818
|
|
|
|
|
|
|
=head1 COPYRIGHT AND LICENSE |
819
|
|
|
|
|
|
|
|
820
|
|
|
|
|
|
|
This software is copyright (c) 2021 by Diego Kuperman. |
821
|
|
|
|
|
|
|
|
822
|
|
|
|
|
|
|
This is free software; you can redistribute it and/or modify it under |
823
|
|
|
|
|
|
|
the same terms as the Perl 5 programming language system itself. |
824
|
|
|
|
|
|
|
|
825
|
|
|
|
|
|
|
=cut |