package Minion::Worker::Role::Kevin;
$Minion::Worker::Role::Kevin::VERSION = '0.7.1';
# ABSTRACT: Alternative Minion worker

use Mojo::Base -role;

use Mojo::Log;
use Mojo::Util 'steady_time';

use constant TRACE => $ENV{KEVIN_WORKER_TRACE} || 0;

has 'defaults';
has 'log' => sub { Mojo::Log->new };

sub _defaults {
  return {
    command_interval   => 10,
    dequeue_timeout    => 5,
    heartbeat_interval => 300,
    jobs               => 4,
    queues             => ['default'],
    repair_interval    => 21600,
  };
}

sub _run_defaults {
  my $self = shift;
  $self->{_run_defaults} //= {%{$self->_defaults}, %{$self->{defaults} // {}}};
}

sub run {
  my ($self, @args) = @_;

  my $status   = $self->status;
  my $defaults = $self->_run_defaults;

  $status->{$_} //= $defaults->{$_} for keys %$defaults;
  $status->{performed} //= 0;

  my $now = steady_time;
  $self->{next_heartbeat} = $now if $status->{heartbeat_interval};
  $self->{next_command}   = $now if $status->{command_interval};
  if ($status->{repair_interval}) {

    # Randomize to avoid congestion
    $status->{repair_interval} -= int rand $status->{repair_interval} / 2;

    $self->{next_repair} = $now;
    $self->{next_repair} += $status->{repair_interval}
      if delete $status->{fast_start};
  }

  $self->{pid} = $$;
  local $SIG{CHLD} = sub { };
  local $SIG{INT} = local $SIG{TERM} = sub { $self->_term(1) };
  local $SIG{QUIT} = sub { $self->_term };

  # Remote control commands need to validate arguments carefully
  my $commands = $self->commands;
  local $commands->{jobs}
    = sub { $status->{jobs} = $_[1] if ($_[1] // '') =~ /^\d+$/ };
  local $commands->{stop}
    = sub { $self->{jobs}{$_[1]}->stop if $self->{jobs}{$_[1] // ''} };

  # Log fatal errors
  my $log = $self->log;
  $log->info("Worker $$ started");
  eval { $self->_work until $self->{finished}; 1 }
    or $log->fatal("Worker error: $@");
  $self->unregister;
  $log->info("Worker $$ stopped");
}

sub _term {
  my ($self, $graceful) = @_;
  return unless $self->{pid} == $$;
  $self->{stopping}++;
  $self->{graceful} = $graceful or kill 'KILL', keys %{$self->{jobs}};
}

sub _ping {
  shift->register;
}

sub _work {
  my $self = shift;

  my $log    = $self->log;
  my $status = $self->status;

  if ($self->{stopping} && !$self->{quit}++) {
    $log->info("Stopping worker $$ "
        . ($self->{graceful} ? 'gracefully' : 'immediately'));

    # Skip heartbeats, remote commands and repairs
    delete @{$status}{qw(heartbeat_interval command_interval)}
      unless $self->{graceful};
    delete $status->{repair_interval};
  }

  # Send heartbeats in regular intervals
  if ($status->{heartbeat_interval} && $self->{next_heartbeat} < steady_time) {
    $log->debug('Sending heartbeat') if TRACE;
    $self->_ping;
    $self->{next_heartbeat} = steady_time + $status->{heartbeat_interval};
  }

  # Process worker remote control commands in regular intervals
  if ($status->{command_interval} && $self->{next_command} < steady_time) {
    $log->debug('Checking remote control') if TRACE;
    $self->process_commands;
    $self->{next_command} = steady_time + $status->{command_interval};
  }

  # Repair in regular intervals
  if ($status->{repair_interval} && $self->{next_repair} < steady_time) {
    $log->debug('Checking worker registry and job queue');
    $self->minion->repair;
    $self->{next_repair} = steady_time + $status->{repair_interval};
  }

  # Check if jobs are finished
  my $jobs = $self->{jobs} ||= {};
  $jobs->{$_}->is_finished and ++$status->{performed} and delete $jobs->{$_}
    for keys %$jobs;

  # Return if worker is finished
  ++$self->{finished} and return if $self->{stopping} && !keys %{$self->{jobs}};

  # Job limit has been reached or worker is stopping
  return $self->emit('busy')
    if ($status->{jobs} <= keys %$jobs) || $self->{stopping};

  # Try to get more jobs
  my ($max, $queues) = @{$status}{qw(dequeue_timeout queues)};
  if (my $job = $self->emit('wait')->dequeue($max => {queues => $queues})) {
    $jobs->{my $id = $job->id} = $job->start;
    my ($pid, $task) = ($job->pid, $job->task);
    $log->debug(qq{Process $pid is performing job "$id" with task "$task"});
  }
}

1;

__END__
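
For reference, a minimal usage sketch, assuming the role is consumed the way Mojo::Base roles normally are (the Pg connection string and the "poke" task below are illustrative, not taken from this file): apply it to a Minion worker with with_roles, where '+Kevin' expands to Minion::Worker::Role::Kevin, then call run to start the worker loop defined above.

    use strict;
    use warnings;
    use Minion;

    # Illustrative backend and task; substitute your own
    my $minion = Minion->new(Pg => 'postgresql://postgres@/test');
    $minion->add_task(poke => sub { print "poked\n" });

    # '+Kevin' expands to Minion::Worker::Role::Kevin
    my $worker = $minion->worker->with_roles('+Kevin');
    $worker->run;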