File Coverage

blib/lib/Minion/Worker/Role/Kevin.pm
Criterion Covered Total %
statement 12 84 14.2
branch 0 36 0.0
condition 1 43 2.3
subroutine 4 15 26.6
pod 0 1 0.0
total 17 179 9.5


line stmt bran cond sub pod time code
1              
2             package Minion::Worker::Role::Kevin;
3             $Minion::Worker::Role::Kevin::VERSION = '0.7.1';
4             # ABSTRACT: Alternative Minion worker
5 1     1   713 use Mojo::Base -role;
  1         1  
  1         5  
6              
7 1     1   688 use Mojo::Log;
  1         1188  
  1         5  
8 1     1   29 use Mojo::Util 'steady_time';
  1         2  
  1         42  
9              
10 1   50 1   4 use constant TRACE => $ENV{KEVIN_WORKER_TRACE} || 0;
  1         2  
  1         1057  
11              
12             has 'defaults';
13              
14             has 'log' => sub { Mojo::Log->new };
15              
16             sub _defaults {
17             return {
18 0     0     command_interval => 10,
19             dequeue_timeout => 5,
20             heartbeat_interval => 300,
21             jobs => 4,
22             queues => ['default'],
23             repair_interval => 21600,
24             };
25             }
26              
27             sub _run_defaults {
28 0     0     my $self = shift;
29 0   0       $self->{_run_defaults} //= {%{$self->_defaults}, %{$self->{defaults} // {}}};
  0   0        
  0            
30             }
31              
32             sub run {
33 0     0 0   my ($self, @args) = @_;
34              
35 0           my $status = $self->status;
36 0           my $defaults = $self->_run_defaults;
37              
38 0   0       $status->{$_} //= $defaults->{$_} for keys %$defaults;
39 0   0       $status->{performed} //= 0;
40              
41 0           my $now = steady_time;
42 0 0         $self->{next_heartbeat} = $now if $status->{heartbeat_interval};
43 0 0         $self->{next_command} = $now if $status->{command_interval};
44 0 0         if ($status->{repair_interval}) {
45              
46             # Randomize to avoid congestion
47 0           $status->{repair_interval} -= int rand $status->{repair_interval} / 2;
48              
49 0           $self->{next_repair} = $now;
50             $self->{next_repair} += $status->{repair_interval}
51 0 0         if delete $status->{fast_start};
52             }
53              
54 0           $self->{pid} = $$;
55 0     0     local $SIG{CHLD} = sub { };
56 0     0     local $SIG{INT} = local $SIG{TERM} = sub { $self->_term(1) };
  0            
57 0     0     local $SIG{QUIT} = sub { $self->_term };
  0            
58              
59             # Remote control commands need to validate arguments carefully
60 0           my $commands = $self->commands;
61             local $commands->{jobs}
62 0 0 0 0     = sub { $status->{jobs} = $_[1] if ($_[1] // '') =~ /^\d+$/ };
  0            
63             local $commands->{stop}
64 0 0 0 0     = sub { $self->{jobs}{$_[1]}->stop if $self->{jobs}{$_[1] // ''} };
  0            
65              
66             # Log fatal errors
67 0           my $log = $self->log;
68 0           $log->info("Worker $$ started");
69 0 0         eval { $self->_work until $self->{finished}; 1 }
  0            
  0            
70             or $log->fatal("Worker error: $@");
71 0           $self->unregister;
72 0           $log->info("Worker $$ stopped");
73             }
74              
75             sub _term {
76 0     0     my ($self, $graceful) = @_;
77 0 0         return unless $self->{pid} == $$;
78 0           $self->{stopping}++;
79 0 0         $self->{graceful} = $graceful or kill 'KILL', keys %{$self->{jobs}};
  0            
80             }
81              
82             sub _ping {
83 0     0     shift->register;
84             }
85              
86             sub _work {
87 0     0     my $self = shift;
88              
89 0           my $log = $self->log;
90 0           my $status = $self->status;
91              
92 0 0 0       if ($self->{stopping} && !$self->{quit}++) {
93             $log->info("Stopping worker $$ "
94 0 0         . ($self->{graceful} ? 'gracefully' : 'immediately'));
95              
96             # Skip heartbeats, remote commands and repairs
97 0           delete @{$status}{qw(heartbeat_interval command_interval )}
98 0 0         unless $self->{graceful};
99 0           delete $status->{repair_interval};
100             }
101              
102             # Send heartbeats in regular intervals
103 0 0 0       if ($status->{heartbeat_interval} && $self->{next_heartbeat} < steady_time) {
104 0           $log->debug('Sending heartbeat') if TRACE;
105 0           $self->_ping;
106 0           $self->{next_heartbeat} = steady_time + $status->{heartbeat_interval};
107             }
108              
109             # Process worker remote control commands in regular intervals
110 0 0 0       if ($status->{command_interval} && $self->{next_command} < steady_time) {
111 0           $log->debug('Checking remote control') if TRACE;
112 0           $self->process_commands;
113 0           $self->{next_command} = steady_time + $status->{command_interval};
114             }
115              
116             # Repair in regular intervals
117 0 0 0       if ($status->{repair_interval} && $self->{next_repair} < steady_time) {
118 0           $log->debug('Checking worker registry and job queue');
119 0           $self->minion->repair;
120 0           $self->{next_repair} = steady_time + $status->{repair_interval};
121             }
122              
123             # Check if jobs are finished
124 0   0       my $jobs = $self->{jobs} ||= {};
125             $jobs->{$_}->is_finished and ++$status->{performed} and delete $jobs->{$_}
126 0   0       for keys %$jobs;
      0        
127              
128             # Return if worker is finished
129 0 0 0       ++$self->{finished} and return if $self->{stopping} && !keys %{$self->{jobs}};
  0   0        
130              
131             # Job limit has been reached or worker is stopping
132             return $self->emit('busy')
133 0 0 0       if ($status->{jobs} <= keys %$jobs) || $self->{stopping};
134              
135             # Try to get more jobs
136 0           my ($max, $queues) = @{$status}{qw(dequeue_timeout queues)};
  0            
137 0 0         if (my $job = $self->emit('wait')->dequeue($max => {queues => $queues})) {
138 0           $jobs->{my $id = $job->id} = $job->start;
139 0           my ($pid, $task) = ($job->pid, $job->task);
140 0           $log->debug(qq{Process $pid is performing job "$id" with task "$task"});
141             }
142             }
143              
144             1;
145              
146             __END__