File Coverage

blib/lib/Helios/TS.pm
Criterion Covered Total %
statement 27 113 23.8
branch 0 44 0.0
condition 0 25 0.0
subroutine 9 16 56.2
pod 3 6 50.0
total 39 204 19.1


line stmt bran cond sub pod time code
1             package Helios::TS;
2              
3 1     1   19 use 5.008;
  1         3  
4 1     1   6 use strict;
  1         2  
  1         26  
5 1     1   5 use warnings;
  1         2  
  1         35  
6 1     1   6 use base qw(TheSchwartz);
  1         1  
  1         840  
7 1     1   19231 use fields qw(active_worker_class); # new fields for this subclass
  1         2  
  1         6  
8 1     1   45 use Carp qw( croak );
  1         1  
  1         53  
9 1     1   4 use List::Util qw( shuffle );
  1         1  
  1         45  
10 1     1   441 use Helios::TS::Job;
  1         2  
  1         41  
11              
12 1     1   5 use constant OK_ERRORS => { map { $_ => 1 } Data::ObjectDriver::Errors->UNIQUE_CONSTRAINT, };
  1         1  
  1         5  
  1         806  
13              
14             our $VERSION = '2.80';
15              
16             # FILE CHANGE HISTORY
17             # (This code is modified from the original TheSchwartz.pm where noted.)
18             # [LH] [2012-07-11]: driver_for(): Changed driver creation to use Helios driver
19             # to cache database connections.
20             # [LH] [2013-09-21]: find_job_for_workers(): Added code to enable job
21             # prioritization.
22             # [LH] [2013-10-04]: Implemented "virtual jobtypes" - funcmap entries without
23             # actual TheSchwartz::Worker subclasses to back them up. Switched to using
24             # Helios::TS::Job instead of base TheSchwartz::Job because of this.
25             # [LH] [2013-10-04]: work_once(): Commented out call to
26             # temporarily_remove_ability() because we do not think the issue it solves is
27             # a concern for Helios::TS (Oracle's indexes do not exhibit the issue t_r_a()
28             # is supposed to solve, and we're not sure MySQL indexes do anymore either).
29             # [LH] [2013-10-04]: Fix for Helios bug [RT79690], which appears to be a DBD
30             # problem where a LOB becomes unbound in a query.
31             # [LH] [2013-11-24]: Removed old code already commented out.
32              
33             our $T_AFTER_GRAB_SELECT_BEFORE_UPDATE;
34             our $T_LOST_RACE;
35             our $FIND_JOB_BATCH_SIZE = 50;
36              
37             # BEGIN CODE COPYRIGHT (C) 2013 LOGICAL HELION, LLC.
38             # [LH] [2013-10-04]: Virtual jobtypes: Helios::TS->{active_worker_class}
39             # attribute and accessors for it.
40             sub new {
41 0     0 1   my $class = shift;
42 0           my %params = @_;
43 0           my $self = fields::new($class);
44 0           $self->SUPER::new(@_); # init base fields
45 0 0         if ( defined($params{active_worker_class})) {
46 0           $self->{active_worker_class} = $params{active_worker_class};
47             }
48 0           return $self;
49             }
50              
51             sub active_worker_class {
52 0     0 0   my Helios::TS $hts = shift;
53 0           return $hts->{active_worker_class};
54             }
55             sub set_active_worker_class {
56 0     0 0   my Helios::TS $hts = shift;
57 0           $hts->{active_worker_class} = shift;
58             }
59             # END CODE COPYRIGHT (C) 2013 LOGICAL HELION, LLC.
60              
61              
62             sub driver_for {
63 0     0 0   my Helios::TS $client = shift;
64 0           my($hashdsn) = @_;
65 0           my $driver;
66 0           my $t = time;
67 0           my $cache_duration = $client->{driver_cache_expiration};
68 0 0 0       if ($cache_duration && $client->{cached_drivers}{$hashdsn}{create_ts} && $client->{cached_drivers}{$hashdsn}{create_ts} + $cache_duration > $t) {
      0        
69 0           $driver = $client->{cached_drivers}{$hashdsn}{driver};
70             } else {
71 0 0         my $db = $client->{databases}{$hashdsn}
72             or croak "Ouch, I don't know about a database whose hash is $hashdsn";
73 0 0         if ($db->{driver}) {
74 0           $driver = $db->{driver};
75             } else {
76             # [LH] 2012-07-11: Changed driver creation to use Helios driver to
77             # cache database connections.
78             $driver = Helios::ObjectDriver::DBI->new(
79             dsn => $db->{dsn},
80             username => $db->{user},
81             password => $db->{pass},
82 0           );
83             }
84 0 0         $driver->prefix($db->{prefix}) if exists $db->{prefix};
85              
86 0 0         if ($cache_duration) {
87 0           $client->{cached_drivers}{$hashdsn}{driver} = $driver;
88 0           $client->{cached_drivers}{$hashdsn}{create_ts} = $t;
89             }
90             }
91 0           return $driver;
92             }
93              
94              
95             sub find_job_for_workers {
96 0     0 1   my Helios::TS $client = shift;
97 0           my($worker_classes) = @_;
98 0   0       $worker_classes ||= $client->{current_abilities};
99 0           for my $hashdsn ($client->shuffled_databases) {
100             ## If the database is dead, skip it.
101 0 0         next if $client->is_database_dead($hashdsn);
102              
103 0           my $driver = $client->driver_for($hashdsn);
104 0           my $unixtime = $driver->dbd->sql_for_unixtime;
105              
106 0           my @jobs;
107 0           eval {
108             ## Search for jobs in this database where:
109             ## 1. funcname is in the list of abilities this $client supports;
110             ## 2. the job is scheduled to be run (run_after is in the past);
111             ## 3. no one else is working on the job (grabbed_until is in
112             ## the past).
113 0           my @ids = map { $client->funcname_to_id($driver, $hashdsn, $_) }
  0            
114             @$worker_classes;
115              
116             # BEGIN CODE Copyright (C) 2012-3 by Logical Helion, LLC.
117             # [LH] [2013-09-21]: Added code to enable job prioritization.
118 0           my $direction = 'descend';
119 0 0         if ( $client->prioritize eq 'low' ) {
120 0           $direction = 'ascend';
121             }
122             # END CODE Copyright (C) 2012-3 by Logical Helion, LLC.
123              
124             # [LH] [2013-10-04]: Implemented "virtual jobtypes" - funcmap entries without
125             # actual TheSchwartz::Worker subclasses to back them up. Switched to using
126             # Helios::TS::Job instead of base TheSchwartz::Job because of this.
127 0 0         @jobs = $driver->search('Helios::TS::Job' => {
128             funcid => \@ids,
129             run_after => \ "<= $unixtime",
130             grabbed_until => \ "<= $unixtime",
131             }, { limit => $FIND_JOB_BATCH_SIZE,
132             ( $client->prioritize ? ( sort => 'priority',
133             direction => $direction ) : () )
134             }
135             );
136             };
137 0 0         if ($@) {
138 0 0 0       unless (OK_ERRORS->{ $driver->last_error || 0 }) {
139 0           $client->mark_database_as_dead($hashdsn);
140             }
141             }
142              
143             # for test harness race condition testing
144 0 0         $T_AFTER_GRAB_SELECT_BEFORE_UPDATE->() if $T_AFTER_GRAB_SELECT_BEFORE_UPDATE;
145              
146 0           my $job = $client->_grab_a_job($hashdsn, @jobs);
147 0 0         return $job if $job;
148             }
149             }
150              
151              
152             sub _grab_a_job {
153 0     0     my Helios::TS $client = shift;
154 0           my $hashdsn = shift;
155 0           my $driver = $client->driver_for($hashdsn);
156              
157             ## Got some jobs! Randomize them to avoid contention between workers.
158 0           my @jobs = shuffle(@_);
159              
160             JOB:
161 0           while (my $job = shift @jobs) {
162             # BEGIN CODE COPYRIGHT (C) 2013 LOGICAL HELION, LLC.
163             # [LH] [2013-10-04] [RT79690] Check the job to see that it has an arg()
164             # value. If it doesn't, throw it away and get a new one. This won't
165             # prevent the LOB from unbinding, but it will work around it in a
166             # relatively transparent way.
167 0 0         unless ( ref($job->arg()) ) {
168 0           next;
169             }
170             # END CODE COPYRIGHT (C) 2013 LOGICAL HELION, LLC.
171             ## Convert the funcid to a funcname, based on this database's map.
172 0           $job->funcname( $client->funcid_to_name($driver, $hashdsn, $job->funcid) );
173              
174             ## Update the job's grabbed_until column so that
175             ## no one else takes it.
176             # my $worker_class = $job->funcname;
177             # BEGIN CODE COPYRIGHT (C) 2013 LOGICAL HELION, LLC.
178             # [LH] [2013-10-04] The worker class is the "Active Worker Class" if
179             # it's set. Otherwise, assume it's just the job's jobtype (funcname).
180 0   0       my $worker_class = $client->{active_worker_class} || $job->funcname;
181             # END CODE COPYRIGHT (C) 2013 LOGICAL HELION, LLC.
182 0           my $old_grabbed_until = $job->grabbed_until;
183              
184 0 0         my $server_time = $client->get_server_time($driver)
185             or die "expected a server time";
186              
187 0   0       $job->grabbed_until($server_time + ($worker_class->grab_for || 1));
188              
189             ## Update the job in the database, and end the transaction.
190 0 0         if ($driver->update($job, { grabbed_until => $old_grabbed_until }) < 1) {
191             ## We lost the race to get this particular job--another worker must
192             ## have got it and already updated it. Move on to the next job.
193 0 0         $T_LOST_RACE->() if $T_LOST_RACE;
194 0           next JOB;
195             }
196              
197             ## Now prepare the job, and return it.
198 0           my $handle = TheSchwartz::JobHandle->new({
199             dsn_hashed => $hashdsn,
200             jobid => $job->jobid,
201             });
202 0           $handle->client($client);
203 0           $job->handle($handle);
204 0           return $job;
205             }
206              
207 0           return undef;
208             }
209              
210              
211             sub work_once {
212             # [LH] [2013-10-04] Using Helios::TS not TheSchwartz.
213 0     0 1   my Helios::TS $client = shift;
214 0           my $job = shift; # optional specific job to work on
215              
216             ## Look for a job with our current set of abilities. Note that the
217             ## list of current abilities may not be equal to the full set of
218             ## abilities, to allow for even distribution between jobs.
219 0   0       $job ||= $client->find_job_for_workers;
220              
221             ## If we didn't find anything, restore our full abilities, and try
222             ## again.
223 0 0 0       if (!$job &&
224 0           @{ $client->{current_abilities} } < @{ $client->{all_abilities} }) {
  0            
225 0           $client->restore_full_abilities;
226 0           $job = $client->find_job_for_workers;
227             }
228              
229             # [LH] [2013-10-04]: Virtual Jobtypes: Use the active_worker_class
230             # instead of the job's funcname if active_worker_class is set.
231 0   0       my $class = $client->{active_worker_class} || ($job ? $job->funcname : undef);
232              
233 0 0         if ($job) {
234 0 0         my $priority = $job->priority ? ", priority " . $job->priority : "";
235             # BEGIN CODE COPYRIGHT (C) 2013 LOGICAL HELION, LLC.
236 0 0         if ($client->{active_worker_class}) {
237 0           $job->{active_worker_class} = $client->{active_worker_class};
238             }
239             # END CODE COPYRIGHT (C) 2013 LOGICAL HELION, LLC.
240 0           $job->debug("TheSchwartz::work_once got job of class '$class'$priority");
241             } else {
242 0           $client->debug("TheSchwartz::work_once found no jobs");
243             }
244              
245             ## If we still don't have anything, return.
246 0 0         return unless $job;
247              
248             ## Now that we found a job for this particular funcname, remove it
249             ## from our list of current abilities. So the next time we look for a job,
250             ## we'll find a job for a different funcname. This prevents starvation of
251             ## high funcid values because of the way MySQL's indexes work.
252             # [LH] [2013-10-04]: work_once(): Commented out call to
253             # temporarily_remove_ability() because we do not think the issue it solves is
254             # a concern for Helios::TS (Oracle's indexes do not exhibit the issue t_r_a()
255             # is supposed to solve, and we're not sure MySQL indexes do anymore either).
256             # $client->temporarily_remove_ability($class);
257              
258 0           $class->work_safely($job);
259              
260             ## We got a job, so return 1 so work_until_done (which calls this method)
261             ## knows to keep looking for jobs.
262 0           return 1;
263             }
264              
265              
266             1;
267             __END__