| line |
stmt |
bran |
cond |
sub |
pod |
time |
code |
|
1
|
|
|
|
|
|
|
package Helios::TS; |
|
2
|
|
|
|
|
|
|
|
|
3
|
1
|
|
|
1
|
|
19
|
use 5.008; |
|
|
1
|
|
|
|
|
3
|
|
|
4
|
1
|
|
|
1
|
|
6
|
use strict; |
|
|
1
|
|
|
|
|
2
|
|
|
|
1
|
|
|
|
|
26
|
|
|
5
|
1
|
|
|
1
|
|
5
|
use warnings; |
|
|
1
|
|
|
|
|
2
|
|
|
|
1
|
|
|
|
|
35
|
|
|
6
|
1
|
|
|
1
|
|
6
|
use base qw(TheSchwartz); |
|
|
1
|
|
|
|
|
1
|
|
|
|
1
|
|
|
|
|
840
|
|
|
7
|
1
|
|
|
1
|
|
19231
|
use fields qw(active_worker_class); # new fields for this subclass |
|
|
1
|
|
|
|
|
2
|
|
|
|
1
|
|
|
|
|
6
|
|
|
8
|
1
|
|
|
1
|
|
45
|
use Carp qw( croak ); |
|
|
1
|
|
|
|
|
1
|
|
|
|
1
|
|
|
|
|
53
|
|
|
9
|
1
|
|
|
1
|
|
4
|
use List::Util qw( shuffle ); |
|
|
1
|
|
|
|
|
1
|
|
|
|
1
|
|
|
|
|
45
|
|
|
10
|
1
|
|
|
1
|
|
441
|
use Helios::TS::Job; |
|
|
1
|
|
|
|
|
2
|
|
|
|
1
|
|
|
|
|
41
|
|
|
11
|
|
|
|
|
|
|
|
|
12
|
1
|
|
|
1
|
|
5
|
use constant OK_ERRORS => { map { $_ => 1 } Data::ObjectDriver::Errors->UNIQUE_CONSTRAINT, }; |
|
|
1
|
|
|
|
|
1
|
|
|
|
1
|
|
|
|
|
5
|
|
|
|
1
|
|
|
|
|
806
|
|
|
13
|
|
|
|
|
|
|
|
|
14
|
|
|
|
|
|
|
our $VERSION = '2.80'; |
|
15
|
|
|
|
|
|
|
|
|
16
|
|
|
|
|
|
|
# FILE CHANGE HISTORY |
|
17
|
|
|
|
|
|
|
# (This code is modified from the original TheSchwartz.pm where noted.) |
|
18
|
|
|
|
|
|
|
# [LH] [2012-07-11]: driver_for(): Changed driver creation to use Helios driver |
|
19
|
|
|
|
|
|
|
# to cache database connections. |
|
20
|
|
|
|
|
|
|
# [LH] [2013-09-21]: find_job_for_workers(): Added code to enable job |
|
21
|
|
|
|
|
|
|
# prioritization. |
|
22
|
|
|
|
|
|
|
# [LH] [2013-10-04]: Implemented "virtual jobtypes" - funcmap entries without |
|
23
|
|
|
|
|
|
|
# actual TheSchwartz::Worker subclasses to back them up. Switched to using |
|
24
|
|
|
|
|
|
|
# Helios::TS::Job instead of base TheSchwartz::Job because of this. |
|
25
|
|
|
|
|
|
|
# [LH] [2013-10-04]: work_once(): Commented out call to |
|
26
|
|
|
|
|
|
|
# temporarily_remove_ability() because we do not think the issue it solves is |
|
27
|
|
|
|
|
|
|
# a concern for Helios::TS (Oracle's indexes do not exhibit the issue t_r_a() |
|
28
|
|
|
|
|
|
|
# is supposed to solve, and we're not sure MySQL indexes do anymore either). |
|
29
|
|
|
|
|
|
|
# [LH] [2013-10-04]: Fix for Helios bug [RT79690], which appears to be a DBD |
|
30
|
|
|
|
|
|
|
# problem where a LOB becomes unbound in a query. |
|
31
|
|
|
|
|
|
|
# [LH] [2013-11-24]: Removed old code already commented out. |
|
32
|
|
|
|
|
|
|
|
|
33
|
|
|
|
|
|
|
our $T_AFTER_GRAB_SELECT_BEFORE_UPDATE; |
|
34
|
|
|
|
|
|
|
our $T_LOST_RACE; |
|
35
|
|
|
|
|
|
|
our $FIND_JOB_BATCH_SIZE = 50; |
|
36
|
|
|
|
|
|
|
|
|
37
|
|
|
|
|
|
|
# BEGIN CODE COPYRIGHT (C) 2013 LOGICAL HELION, LLC. |
|
38
|
|
|
|
|
|
|
# [LH] [2013-10-04]: Virtual jobtypes: Helios::TS->{active_worker_class} |
|
39
|
|
|
|
|
|
|
# attribute and accessors for it. |
|
40
|
|
|
|
|
|
|
sub new { |
|
41
|
0
|
|
|
0
|
1
|
|
my $class = shift; |
|
42
|
0
|
|
|
|
|
|
my %params = @_; |
|
43
|
0
|
|
|
|
|
|
my $self = fields::new($class); |
|
44
|
0
|
|
|
|
|
|
$self->SUPER::new(@_); # init base fields |
|
45
|
0
|
0
|
|
|
|
|
if ( defined($params{active_worker_class})) { |
|
46
|
0
|
|
|
|
|
|
$self->{active_worker_class} = $params{active_worker_class}; |
|
47
|
|
|
|
|
|
|
} |
|
48
|
0
|
|
|
|
|
|
return $self; |
|
49
|
|
|
|
|
|
|
} |
|
50
|
|
|
|
|
|
|
|
|
51
|
|
|
|
|
|
|
sub active_worker_class { |
|
52
|
0
|
|
|
0
|
0
|
|
my Helios::TS $hts = shift; |
|
53
|
0
|
|
|
|
|
|
return $hts->{active_worker_class}; |
|
54
|
|
|
|
|
|
|
} |
|
55
|
|
|
|
|
|
|
sub set_active_worker_class { |
|
56
|
0
|
|
|
0
|
0
|
|
my Helios::TS $hts = shift; |
|
57
|
0
|
|
|
|
|
|
$hts->{active_worker_class} = shift; |
|
58
|
|
|
|
|
|
|
} |
|
59
|
|
|
|
|
|
|
# END CODE COPYRIGHT (C) 2013 LOGICAL HELION, LLC. |
|
60
|
|
|
|
|
|
|
|
|
61
|
|
|
|
|
|
|
|
|
62
|
|
|
|
|
|
|
sub driver_for { |
|
63
|
0
|
|
|
0
|
0
|
|
my Helios::TS $client = shift; |
|
64
|
0
|
|
|
|
|
|
my($hashdsn) = @_; |
|
65
|
0
|
|
|
|
|
|
my $driver; |
|
66
|
0
|
|
|
|
|
|
my $t = time; |
|
67
|
0
|
|
|
|
|
|
my $cache_duration = $client->{driver_cache_expiration}; |
|
68
|
0
|
0
|
0
|
|
|
|
if ($cache_duration && $client->{cached_drivers}{$hashdsn}{create_ts} && $client->{cached_drivers}{$hashdsn}{create_ts} + $cache_duration > $t) { |
|
|
|
|
0
|
|
|
|
|
|
69
|
0
|
|
|
|
|
|
$driver = $client->{cached_drivers}{$hashdsn}{driver}; |
|
70
|
|
|
|
|
|
|
} else { |
|
71
|
0
|
0
|
|
|
|
|
my $db = $client->{databases}{$hashdsn} |
|
72
|
|
|
|
|
|
|
or croak "Ouch, I don't know about a database whose hash is $hashdsn"; |
|
73
|
0
|
0
|
|
|
|
|
if ($db->{driver}) { |
|
74
|
0
|
|
|
|
|
|
$driver = $db->{driver}; |
|
75
|
|
|
|
|
|
|
} else { |
|
76
|
|
|
|
|
|
|
# [LH] 2012-07-11: Changed driver creation to use Helios driver to |
|
77
|
|
|
|
|
|
|
# cache database connections. |
|
78
|
|
|
|
|
|
|
$driver = Helios::ObjectDriver::DBI->new( |
|
79
|
|
|
|
|
|
|
dsn => $db->{dsn}, |
|
80
|
|
|
|
|
|
|
username => $db->{user}, |
|
81
|
|
|
|
|
|
|
password => $db->{pass}, |
|
82
|
0
|
|
|
|
|
|
); |
|
83
|
|
|
|
|
|
|
} |
|
84
|
0
|
0
|
|
|
|
|
$driver->prefix($db->{prefix}) if exists $db->{prefix}; |
|
85
|
|
|
|
|
|
|
|
|
86
|
0
|
0
|
|
|
|
|
if ($cache_duration) { |
|
87
|
0
|
|
|
|
|
|
$client->{cached_drivers}{$hashdsn}{driver} = $driver; |
|
88
|
0
|
|
|
|
|
|
$client->{cached_drivers}{$hashdsn}{create_ts} = $t; |
|
89
|
|
|
|
|
|
|
} |
|
90
|
|
|
|
|
|
|
} |
|
91
|
0
|
|
|
|
|
|
return $driver; |
|
92
|
|
|
|
|
|
|
} |
|
93
|
|
|
|
|
|
|
|
|
94
|
|
|
|
|
|
|
|
|
95
|
|
|
|
|
|
|
sub find_job_for_workers { |
|
96
|
0
|
|
|
0
|
1
|
|
my Helios::TS $client = shift; |
|
97
|
0
|
|
|
|
|
|
my($worker_classes) = @_; |
|
98
|
0
|
|
0
|
|
|
|
$worker_classes ||= $client->{current_abilities}; |
|
99
|
0
|
|
|
|
|
|
for my $hashdsn ($client->shuffled_databases) { |
|
100
|
|
|
|
|
|
|
## If the database is dead, skip it. |
|
101
|
0
|
0
|
|
|
|
|
next if $client->is_database_dead($hashdsn); |
|
102
|
|
|
|
|
|
|
|
|
103
|
0
|
|
|
|
|
|
my $driver = $client->driver_for($hashdsn); |
|
104
|
0
|
|
|
|
|
|
my $unixtime = $driver->dbd->sql_for_unixtime; |
|
105
|
|
|
|
|
|
|
|
|
106
|
0
|
|
|
|
|
|
my @jobs; |
|
107
|
0
|
|
|
|
|
|
eval { |
|
108
|
|
|
|
|
|
|
## Search for jobs in this database where: |
|
109
|
|
|
|
|
|
|
## 1. funcname is in the list of abilities this $client supports; |
|
110
|
|
|
|
|
|
|
## 2. the job is scheduled to be run (run_after is in the past); |
|
111
|
|
|
|
|
|
|
## 3. no one else is working on the job (grabbed_until is in |
|
112
|
|
|
|
|
|
|
## in the past). |
|
113
|
0
|
|
|
|
|
|
my @ids = map { $client->funcname_to_id($driver, $hashdsn, $_) } |
|
|
0
|
|
|
|
|
|
|
|
114
|
|
|
|
|
|
|
@$worker_classes; |
|
115
|
|
|
|
|
|
|
|
|
116
|
|
|
|
|
|
|
# BEGIN CODE Copyright (C) 2012-3 by Logical Helion, LLC. |
|
117
|
|
|
|
|
|
|
# [LH] [2013-09-21]: Added code to enable job prioritization. |
|
118
|
0
|
|
|
|
|
|
my $direction = 'descend'; |
|
119
|
0
|
0
|
|
|
|
|
if ( $client->prioritize eq 'low' ) { |
|
120
|
0
|
|
|
|
|
|
$direction = 'ascend'; |
|
121
|
|
|
|
|
|
|
} |
|
122
|
|
|
|
|
|
|
# END CODE Copyright (C) 2012-3 by Logical Helion, LLC. |
|
123
|
|
|
|
|
|
|
|
|
124
|
|
|
|
|
|
|
# [LH] [2013-10-04]: Implemented "virtual jobtypes" - funcmap entries without |
|
125
|
|
|
|
|
|
|
# actual TheSchwartz::Worker subclasses to back them up. Switched to using |
|
126
|
|
|
|
|
|
|
# Helios::TS::Job instead of base TheSchwartz::Job because of this. |
|
127
|
0
|
0
|
|
|
|
|
@jobs = $driver->search('Helios::TS::Job' => { |
|
128
|
|
|
|
|
|
|
funcid => \@ids, |
|
129
|
|
|
|
|
|
|
run_after => \ "<= $unixtime", |
|
130
|
|
|
|
|
|
|
grabbed_until => \ "<= $unixtime", |
|
131
|
|
|
|
|
|
|
}, { limit => $FIND_JOB_BATCH_SIZE, |
|
132
|
|
|
|
|
|
|
( $client->prioritize ? ( sort => 'priority', |
|
133
|
|
|
|
|
|
|
direction => $direction ) : () ) |
|
134
|
|
|
|
|
|
|
} |
|
135
|
|
|
|
|
|
|
); |
|
136
|
|
|
|
|
|
|
}; |
|
137
|
0
|
0
|
|
|
|
|
if ($@) { |
|
138
|
0
|
0
|
0
|
|
|
|
unless (OK_ERRORS->{ $driver->last_error || 0 }) { |
|
139
|
0
|
|
|
|
|
|
$client->mark_database_as_dead($hashdsn); |
|
140
|
|
|
|
|
|
|
} |
|
141
|
|
|
|
|
|
|
} |
|
142
|
|
|
|
|
|
|
|
|
143
|
|
|
|
|
|
|
# for test harness race condition testing |
|
144
|
0
|
0
|
|
|
|
|
$T_AFTER_GRAB_SELECT_BEFORE_UPDATE->() if $T_AFTER_GRAB_SELECT_BEFORE_UPDATE; |
|
145
|
|
|
|
|
|
|
|
|
146
|
0
|
|
|
|
|
|
my $job = $client->_grab_a_job($hashdsn, @jobs); |
|
147
|
0
|
0
|
|
|
|
|
return $job if $job; |
|
148
|
|
|
|
|
|
|
} |
|
149
|
|
|
|
|
|
|
} |
|
150
|
|
|
|
|
|
|
|
|
151
|
|
|
|
|
|
|
|
|
152
|
|
|
|
|
|
|
sub _grab_a_job { |
|
153
|
0
|
|
|
0
|
|
|
my Helios::TS $client = shift; |
|
154
|
0
|
|
|
|
|
|
my $hashdsn = shift; |
|
155
|
0
|
|
|
|
|
|
my $driver = $client->driver_for($hashdsn); |
|
156
|
|
|
|
|
|
|
|
|
157
|
|
|
|
|
|
|
## Got some jobs! Randomize them to avoid contention between workers. |
|
158
|
0
|
|
|
|
|
|
my @jobs = shuffle(@_); |
|
159
|
|
|
|
|
|
|
|
|
160
|
|
|
|
|
|
|
JOB: |
|
161
|
0
|
|
|
|
|
|
while (my $job = shift @jobs) { |
|
162
|
|
|
|
|
|
|
# BEGIN CODE COPYRIGHT (C) 2013 LOGICAL HELION, LLC. |
|
163
|
|
|
|
|
|
|
# [LH] [2013-10-04] [RT79690] Check the job to see that it has an arg() |
|
164
|
|
|
|
|
|
|
# value. If it doesn't, throw it away and get a new one. This won't |
|
165
|
|
|
|
|
|
|
# prevent the LOB from unbinding, but it will work around it in a |
|
166
|
|
|
|
|
|
|
# relatively transparent way. |
|
167
|
0
|
0
|
|
|
|
|
unless ( ref($job->arg()) ) { |
|
168
|
0
|
|
|
|
|
|
next; |
|
169
|
|
|
|
|
|
|
} |
|
170
|
|
|
|
|
|
|
# END CODE COPYRIGHT (C) 2013 LOGICAL HELION, LLC. |
|
171
|
|
|
|
|
|
|
## Convert the funcid to a funcname, based on this database's map. |
|
172
|
0
|
|
|
|
|
|
$job->funcname( $client->funcid_to_name($driver, $hashdsn, $job->funcid) ); |
|
173
|
|
|
|
|
|
|
|
|
174
|
|
|
|
|
|
|
## Update the job's grabbed_until column so that |
|
175
|
|
|
|
|
|
|
## no one else takes it. |
|
176
|
|
|
|
|
|
|
# my $worker_class = $job->funcname; |
|
177
|
|
|
|
|
|
|
# BEGIN CODE COPYRIGHT (C) 2013 LOGICAL HELION, LLC. |
|
178
|
|
|
|
|
|
|
# [LH] [2013-10-04] The worker class is the "Active Worker Class" if |
|
179
|
|
|
|
|
|
|
# it's set. Otherwise, assume it's just the job's jobtype (funcname). |
|
180
|
0
|
|
0
|
|
|
|
my $worker_class = $client->{active_worker_class} || $job->funcname; |
|
181
|
|
|
|
|
|
|
# END CODE COPYRIGHT (C) 2013 LOGICAL HELION, LLC. |
|
182
|
0
|
|
|
|
|
|
my $old_grabbed_until = $job->grabbed_until; |
|
183
|
|
|
|
|
|
|
|
|
184
|
0
|
0
|
|
|
|
|
my $server_time = $client->get_server_time($driver) |
|
185
|
|
|
|
|
|
|
or die "expected a server time"; |
|
186
|
|
|
|
|
|
|
|
|
187
|
0
|
|
0
|
|
|
|
$job->grabbed_until($server_time + ($worker_class->grab_for || 1)); |
|
188
|
|
|
|
|
|
|
|
|
189
|
|
|
|
|
|
|
## Update the job in the database, and end the transaction. |
|
190
|
0
|
0
|
|
|
|
|
if ($driver->update($job, { grabbed_until => $old_grabbed_until }) < 1) { |
|
191
|
|
|
|
|
|
|
## We lost the race to get this particular job--another worker must |
|
192
|
|
|
|
|
|
|
## have got it and already updated it. Move on to the next job. |
|
193
|
0
|
0
|
|
|
|
|
$T_LOST_RACE->() if $T_LOST_RACE; |
|
194
|
0
|
|
|
|
|
|
next JOB; |
|
195
|
|
|
|
|
|
|
} |
|
196
|
|
|
|
|
|
|
|
|
197
|
|
|
|
|
|
|
## Now prepare the job, and return it. |
|
198
|
0
|
|
|
|
|
|
my $handle = TheSchwartz::JobHandle->new({ |
|
199
|
|
|
|
|
|
|
dsn_hashed => $hashdsn, |
|
200
|
|
|
|
|
|
|
jobid => $job->jobid, |
|
201
|
|
|
|
|
|
|
}); |
|
202
|
0
|
|
|
|
|
|
$handle->client($client); |
|
203
|
0
|
|
|
|
|
|
$job->handle($handle); |
|
204
|
0
|
|
|
|
|
|
return $job; |
|
205
|
|
|
|
|
|
|
} |
|
206
|
|
|
|
|
|
|
|
|
207
|
0
|
|
|
|
|
|
return undef; |
|
208
|
|
|
|
|
|
|
} |
|
209
|
|
|
|
|
|
|
|
|
210
|
|
|
|
|
|
|
|
|
211
|
|
|
|
|
|
|
sub work_once { |
|
212
|
|
|
|
|
|
|
# [LH] [2013-10-04] Using Helios::TS not TheSchwartz. |
|
213
|
0
|
|
|
0
|
1
|
|
my Helios::TS $client = shift; |
|
214
|
0
|
|
|
|
|
|
my $job = shift; # optional specific job to work on |
|
215
|
|
|
|
|
|
|
|
|
216
|
|
|
|
|
|
|
## Look for a job with our current set of abilities. Note that the |
|
217
|
|
|
|
|
|
|
## list of current abilities may not be equal to the full set of |
|
218
|
|
|
|
|
|
|
## abilities, to allow for even distribution between jobs. |
|
219
|
0
|
|
0
|
|
|
|
$job ||= $client->find_job_for_workers; |
|
220
|
|
|
|
|
|
|
|
|
221
|
|
|
|
|
|
|
## If we didn't find anything, restore our full abilities, and try |
|
222
|
|
|
|
|
|
|
## again. |
|
223
|
0
|
0
|
0
|
|
|
|
if (!$job && |
|
224
|
0
|
|
|
|
|
|
@{ $client->{current_abilities} } < @{ $client->{all_abilities} }) { |
|
|
0
|
|
|
|
|
|
|
|
225
|
0
|
|
|
|
|
|
$client->restore_full_abilities; |
|
226
|
0
|
|
|
|
|
|
$job = $client->find_job_for_workers; |
|
227
|
|
|
|
|
|
|
} |
|
228
|
|
|
|
|
|
|
|
|
229
|
|
|
|
|
|
|
# [LH] [2013-10-04]: Virtual Jobtypes: Use the active_worker_class |
|
230
|
|
|
|
|
|
|
# instead of the job's funcname if active_worker_class is set. |
|
231
|
0
|
|
0
|
|
|
|
my $class = $client->{active_worker_class} || ($job ? $job->funcname : undef); |
|
232
|
|
|
|
|
|
|
|
|
233
|
0
|
0
|
|
|
|
|
if ($job) { |
|
234
|
0
|
0
|
|
|
|
|
my $priority = $job->priority ? ", priority " . $job->priority : ""; |
|
235
|
|
|
|
|
|
|
# BEGIN CODE COPYRIGHT (C) 2013 LOGICAL HELION, LLC. |
|
236
|
0
|
0
|
|
|
|
|
if ($client->{active_worker_class}) { |
|
237
|
0
|
|
|
|
|
|
$job->{active_worker_class} = $client->{active_worker_class}; |
|
238
|
|
|
|
|
|
|
} |
|
239
|
|
|
|
|
|
|
# END CODE COPYRIGHT (C) 2013 LOGICAL HELION, LLC. |
|
240
|
0
|
|
|
|
|
|
$job->debug("TheSchwartz::work_once got job of class '$class'$priority"); |
|
241
|
|
|
|
|
|
|
} else { |
|
242
|
0
|
|
|
|
|
|
$client->debug("TheSchwartz::work_once found no jobs"); |
|
243
|
|
|
|
|
|
|
} |
|
244
|
|
|
|
|
|
|
|
|
245
|
|
|
|
|
|
|
## If we still don't have anything, return. |
|
246
|
0
|
0
|
|
|
|
|
return unless $job; |
|
247
|
|
|
|
|
|
|
|
|
248
|
|
|
|
|
|
|
## Now that we found a job for this particular funcname, remove it |
|
249
|
|
|
|
|
|
|
## from our list of current abilities. So the next time we look for a |
|
250
|
|
|
|
|
|
|
## we'll find a job for a different funcname. This prevents starvation of |
|
251
|
|
|
|
|
|
|
## high funcid values because of the way MySQL's indexes work. |
|
252
|
|
|
|
|
|
|
# [LH] [2013-10-04]: work_once(): Commented out call to |
|
253
|
|
|
|
|
|
|
# temporarily_remove_ability() because we do not think the issue it solves is |
|
254
|
|
|
|
|
|
|
# a concern for Helios::TS (Oracle's indexes do not exhibit the issue t_r_a() |
|
255
|
|
|
|
|
|
|
# is supposed to solve, and we're not sure MySQL indexes do anymore either). |
|
256
|
|
|
|
|
|
|
# $client->temporarily_remove_ability($class); |
|
257
|
|
|
|
|
|
|
|
|
258
|
0
|
|
|
|
|
|
$class->work_safely($job); |
|
259
|
|
|
|
|
|
|
|
|
260
|
|
|
|
|
|
|
## We got a job, so return 1 so work_until_done (which calls this method) |
|
261
|
|
|
|
|
|
|
## knows to keep looking for jobs. |
|
262
|
0
|
|
|
|
|
|
return 1; |
|
263
|
|
|
|
|
|
|
} |
|
264
|
|
|
|
|
|
|
|
|
265
|
|
|
|
|
|
|
|
|
266
|
|
|
|
|
|
|
1; |
|
267
|
|
|
|
|
|
|
__END__ |