File Coverage

blib/lib/TheSchwartz.pm
Criterion Covered Total %
statement 39 439 8.8
branch 0 174 0.0
condition 0 66 0.0
subroutine 13 64 20.3
pod 30 45 66.6
total 82 788 10.4


line stmt bran cond sub pod time code
1             # $Id$
2              
3             package TheSchwartz;
4 24     24   120107 use 5.008001;
  24         186  
5 24     24   125 use strict;
  24         41  
  24         953  
6             use fields
7 24     24   8867 qw( databases retry_seconds dead_dsns retry_at funcmap_cache verbose all_abilities current_abilities current_job cached_drivers driver_cache_expiration scoreboard prioritize floor batch_size strict_remove_ability);
  24         34036  
  24         84  
8              
9             our $VERSION = "1.15";
10              
11 24     24   3459 use Carp qw( croak );
  24         42  
  24         1461  
12 24     24   8685 use Data::ObjectDriver::Errors;
  24         3944  
  24         573  
13 24     24   10157 use Data::ObjectDriver::Driver::DBI;
  24         956490  
  24         153  
14 24     24   915 use Digest::MD5 qw( md5_hex );
  24         41  
  24         1382  
15 24     24   137 use List::Util qw( shuffle );
  24         45  
  24         2069  
16 24     24   8502 use TheSchwartz::FuncMap;
  24         55  
  24         531  
17 24     24   9122 use TheSchwartz::Job;
  24         56  
  24         525  
18 24     24   136 use TheSchwartz::JobHandle;
  24         41  
  24         73  
19              
20 24     24   636 use constant RETRY_DEFAULT => 30;
  24         38  
  24         1913  
21             use constant OK_ERRORS =>
22 24     24   129 { map { $_ => 1 } Data::ObjectDriver::Errors->UNIQUE_CONSTRAINT, };
  24         44  
  24         268  
  24         98072  
23              
24             # test harness hooks
25             our $T_AFTER_GRAB_SELECT_BEFORE_UPDATE;
26             our $T_LOST_RACE;
27              
28             ## Number of jobs to fetch at a time in find_job_for_workers.
29             our $FIND_JOB_BATCH_SIZE = 50;
30              
31             sub new {
32 0     0 1   my TheSchwartz $client = shift;
33 0           my %args = @_;
34 0 0         $client = fields::new($client) unless ref $client;
35              
36             croak "databases must be an arrayref if specified"
37 0 0 0       unless !exists $args{databases} || ref $args{databases} eq 'ARRAY';
38 0           my $databases = delete $args{databases};
39              
40 0   0       $client->{retry_seconds} = delete $args{retry_seconds} || RETRY_DEFAULT;
41 0           $client->set_prioritize( delete $args{prioritize} );
42 0           $client->set_verbose( delete $args{verbose} );
43 0           $client->set_scoreboard( delete $args{scoreboard} );
44             $client->{driver_cache_expiration} = delete $args{driver_cache_expiration}
45 0   0       || 0;
46 0   0       $client->{batch_size} = delete $args{batch_size} || $FIND_JOB_BATCH_SIZE;
47              
48 0           $client->{strict_remove_ability} = delete $args{strict_remove_ability};
49              
50 0           my $floor = delete $args{floor};
51 0 0         $client->set_floor($floor) if ($floor);
52              
53 0 0         croak "unknown options ", join( ', ', keys %args ) if keys %args;
54              
55 0           $client->hash_databases($databases);
56 0           $client->reset_abilities;
57 0           $client->{dead_dsns} = {};
58 0           $client->{retry_at} = {};
59 0           $client->{funcmap_cache} = {};
60              
61 0           return $client;
62             }
63              
64             sub debug {
65 0     0 0   my TheSchwartz $client = shift;
66 0 0         return unless $client->{verbose};
67 0           $client->{verbose}->(@_); # ($msg, $job) but $job is optional
68             }
69              
70             sub hash_databases {
71 0     0 0   my TheSchwartz $client = shift;
72 0           my ($list) = @_;
73 0           for my $ref (@$list) {
74 0           my $var;
75             my @parts;
76 0 0         if ( $ref->{driver} ) {
77 0           my $dbh;
78 0 0         if ( my $getter = $ref->{driver}->get_dbh ) {
79 0           $dbh = $getter->();
80             }
81             else {
82 0           $dbh = $ref->{driver}->dbh;
83             }
84 0           $dbh = tied( %{$dbh} );
  0            
85 0           my $dsn = "dbd:" . $dbh->{Driver}->{Name} . ":" . $dbh->{Name};
86 0   0       my $user = $dbh->{Username} || '';
87 0           @parts = ( $dsn, $user );
88             }
89             else {
90 0 0         @parts = map { $ref->{$_} || '' } qw(dsn user);
  0            
91             }
92 0           my $full = join '|', @parts;
93 0           $client->{databases}{ md5_hex($full) } = $ref;
94             }
95             }
96              
97             sub driver_for {
98 0     0 0   my TheSchwartz $client = shift;
99 0           my ($hashdsn) = @_;
100 0           my $driver;
101 0           my $t = time;
102 0           my $cache_duration = $client->{driver_cache_expiration};
103 0 0 0       if ( $cache_duration
      0        
104             && $client->{cached_drivers}{$hashdsn}{create_ts}
105             && $client->{cached_drivers}{$hashdsn}{create_ts} + $cache_duration
106             > $t )
107             {
108 0           $driver = $client->{cached_drivers}{$hashdsn}{driver};
109             }
110             else {
111 0 0         my $db = $client->{databases}{$hashdsn}
112             or croak
113             "Ouch, I don't know about a database whose hash is $hashdsn";
114 0 0         if ( $db->{driver} ) {
115 0           $driver = $db->{driver};
116             }
117             else {
118             $driver = Data::ObjectDriver::Driver::DBI->new(
119             dsn => $db->{dsn},
120             username => $db->{user},
121             password => $db->{pass},
122 0           );
123             }
124 0 0         $driver->prefix( $db->{prefix} ) if exists $db->{prefix};
125              
126 0 0         if ($cache_duration) {
127 0           $client->{cached_drivers}{$hashdsn}{driver} = $driver;
128 0           $client->{cached_drivers}{$hashdsn}{create_ts} = $t;
129             }
130             }
131 0           return $driver;
132             }
133              
134             sub mark_database_as_dead {
135 0     0 0   my TheSchwartz $client = shift;
136 0           my ($hashdsn) = @_;
137 0           $client->{dead_dsns}{$hashdsn} = 1;
138 0           $client->{retry_at}{$hashdsn} = time + $client->{retry_seconds};
139 0   0       $client->debug("Disabling DB $hashdsn because " . ($client->driver_for($hashdsn)->last_error() || 'unknown'));
140             }
141              
142             sub is_database_dead {
143 0     0 0   my TheSchwartz $client = shift;
144 0           my ($hashdsn) = @_;
145             ## If this database is marked as dead, check the retry time. If
146             ## it has passed, try the database again to see if it's undead.
147 0 0         if ( $client->{dead_dsns}{$hashdsn} ) {
148 0 0         if ( $client->{retry_at}{$hashdsn} < time ) {
149 0           delete $client->{dead_dsns}{$hashdsn};
150 0           delete $client->{retry_at}{$hashdsn};
151 0           return 0;
152             }
153             else {
154 0           return 1;
155             }
156             }
157 0           return 0;
158             }
159              
160             sub lookup_job {
161 0     0 1   my TheSchwartz $client = shift;
162 0           my $handle = $client->handle_from_string(@_);
163 0           my $driver = $client->driver_for( $handle->dsn_hashed );
164              
165 0           my $id = $handle->jobid;
166 0 0         my $job = $driver->lookup( 'TheSchwartz::Job' => $handle->jobid )
167             or return;
168              
169 0           $job->handle($handle);
170 0           $job->funcname(
171             $client->funcid_to_name( $driver, $handle->dsn_hashed, $job->funcid )
172             );
173 0           return $job;
174             }
175              
176             sub list_jobs {
177 0     0 1   my TheSchwartz $client = shift;
178 0           my $arg = shift;
179              
180 0           my ( %terms, %options );
181              
182             $terms{run_after} = { op => '<=', value => $arg->{run_after} }
183 0 0         if exists $arg->{run_after};
184              
185             $terms{grabbed_until} = { op => '<=', value => $arg->{grabbed_until} }
186 0 0         if exists $arg->{grabbed_until};
187              
188             $terms{jobid} = { op => '=', value => $arg->{jobid} }
189 0 0         if exists $arg->{jobid};
190              
191 0 0         die "No funcname" unless exists $arg->{funcname};
192              
193 0 0         $arg->{want_handle} = 1 unless defined $arg->{want_handle};
194              
195 0   0       my $limit = $arg->{limit} || $client->batch_size;
196              
197 0 0         if ( $arg->{coalesce} ) {
198 0   0       $arg->{coalesce_op} ||= '=';
199             }
200              
201 0           $options{limit} = $limit;
202 0 0         if ( $client->prioritize ) {
203             $options{sort} = [
204 0           { column => 'priority', direction => 'descend' },
205             { column => 'jobid' },
206             ];
207             }
208             else { # RT #34843
209 0           $options{sort} = [ { column => 'jobid' }, ];
210             }
211              
212 0 0         if ( $client->floor ) {
213 0           $terms{priority} = { op => '>=', value => $client->floor };
214             }
215              
216 0           my @jobs;
217 0           for my $hashdsn ( $client->shuffled_databases ) {
218             ## If the database is dead, skip it
219 0 0         next if $client->is_database_dead($hashdsn);
220 0           my $driver = $client->driver_for($hashdsn);
221 0 0         if ( ref( $arg->{funcname} ) ) {
222             $terms{funcid}
223 0           = [ map { $client->funcname_to_id( $driver, $hashdsn, $_ ) }
224 0           @{ $arg->{funcname} } ];
  0            
225             }
226             else {
227             $terms{funcid} = $client->funcname_to_id( $driver, $hashdsn,
228 0           $arg->{funcname} );
229             }
230              
231 0 0         if ( $arg->{want_handle} ) {
232             push @jobs, map {
233 0           my $handle = TheSchwartz::JobHandle->new(
  0            
234             { dsn_hashed => $hashdsn,
235             client => $client,
236             jobid => $_->jobid
237             }
238             );
239 0           $_->handle($handle);
240 0           $_;
241             } $driver->search( 'TheSchwartz::Job' => \%terms, \%options );
242             }
243             else {
244 0           push @jobs,
245             $driver->search( 'TheSchwartz::Job' => \%terms, \%options );
246             }
247             }
248 0           return @jobs;
249             }
250              
251             sub find_job_with_coalescing_prefix {
252 0     0 1   my TheSchwartz $client = shift;
253 0           my ( $funcname, $coval ) = @_;
254 0           $coval .= "%";
255 0           return $client->_find_job_with_coalescing( 'LIKE', $funcname, $coval );
256             }
257              
258             sub find_job_with_coalescing_value {
259 0     0 1   my TheSchwartz $client = shift;
260 0           return $client->_find_job_with_coalescing( '=', @_ );
261             }
262              
263             sub _find_job_with_coalescing {
264 0     0     my TheSchwartz $client = shift;
265 0           my ( $op, $funcname, $coval ) = @_;
266              
267 0           for my $hashdsn ( $client->shuffled_databases ) {
268             ## If the database is dead, skip it
269 0 0         next if $client->is_database_dead($hashdsn);
270              
271 0           my $driver = $client->driver_for($hashdsn);
272 0           my $unixtime = $driver->dbd->sql_for_unixtime;
273              
274 0           my %options = ( limit => $client->batch_size );
275 0 0         if ( $client->prioritize ) {
276             $options{sort} = [
277 0           { column => 'priority', direction => 'descend' },
278             { column => 'jobid' },
279             ];
280             }
281             else { # RT #34843
282 0           $options{sort} = [ { column => 'jobid' }, ];
283             }
284              
285 0           my @jobs;
286 0           eval {
287             ## Search for jobs in this database where:
288             ## 1. funcname is in the list of abilities this $client supports;
289             ## 2. the job is scheduled to be run (run_after is in the past);
290             ## 3. no one else is working on the job (grabbed_until is in
291             ## in the past).
292 0           my $funcid
293             = $client->funcname_to_id( $driver, $hashdsn, $funcname );
294              
295 0           my %terms = (
296             funcid => $funcid,
297             run_after => \"<= $unixtime",
298             grabbed_until => \"<= $unixtime",
299             coalesce => { op => $op, value => $coval },
300             );
301              
302 0 0         if ( $client->floor ) {
303 0           $terms{priority} = { op => '>=', value => $client->floor };
304             }
305              
306 0           @jobs = $driver->search(
307             'TheSchwartz::Job' => \%terms,
308             \%options,
309             );
310             };
311 0 0         if ($@) {
312 0 0 0       unless ( OK_ERRORS->{ $driver->last_error || 0 } ) {
313 0           $client->mark_database_as_dead($hashdsn);
314             }
315             }
316              
317 0           my $job = $client->_grab_a_job( $hashdsn, @jobs );
318 0 0         return $job if $job;
319             }
320             }
321              
322             sub find_job_for_workers {
323 0     0 1   my TheSchwartz $client = shift;
324 0           my ($worker_classes) = @_;
325 0   0       $worker_classes ||= $client->{current_abilities};
326              
327 0           my %options = ( limit => $client->batch_size );
328 0 0         if ( $client->prioritize ) {
329             $options{sort} = [
330 0           { column => 'priority', direction => 'descend' },
331             { column => 'jobid' },
332             ];
333             }
334             else { # RT #34843
335 0           $options{sort} = [ { column => 'jobid' }, ];
336             }
337              
338 0           for my $hashdsn ( $client->shuffled_databases ) {
339             ## If the database is dead, skip it.
340 0 0         next if $client->is_database_dead($hashdsn);
341              
342 0           my $driver = $client->driver_for($hashdsn);
343 0           my $unixtime = $driver->dbd->sql_for_unixtime;
344              
345 0           my @jobs;
346 0           eval {
347             ## Search for jobs in this database where:
348             ## 1. funcname is in the list of abilities this $client supports;
349             ## 2. the job is scheduled to be run (run_after is in the past);
350             ## 3. no one else is working on the job (grabbed_until is in
351             ## in the past).
352 0           my @ids = map { $client->funcname_to_id( $driver, $hashdsn, $_ ) }
  0            
353             @$worker_classes;
354              
355 0           my %terms = (
356             funcid => \@ids,
357             run_after => \"<= $unixtime",
358             grabbed_until => \"<= $unixtime",
359             );
360              
361 0 0         if ( $client->floor ) {
362 0           $terms{priority} = { op => '>=', value => $client->floor };
363             }
364              
365 0           @jobs = $driver->search(
366             'TheSchwartz::Job' => \%terms,
367             \%options,
368             );
369             };
370 0 0         if ($@) {
371 0 0 0       unless ( OK_ERRORS->{ $driver->last_error || 0 } ) {
372 0           $client->mark_database_as_dead($hashdsn);
373             }
374             }
375              
376             # for test harness race condition testing
377 0 0         $T_AFTER_GRAB_SELECT_BEFORE_UPDATE->()
378             if $T_AFTER_GRAB_SELECT_BEFORE_UPDATE;
379              
380 0           my $job = $client->_grab_a_job( $hashdsn, @jobs );
381 0 0         return $job if $job;
382             }
383             }
384              
385             sub get_server_time {
386 0     0 1   my TheSchwartz $client = shift;
387 0           my ($driver) = @_;
388 0           my $unixtime_sql = $driver->dbd->sql_for_unixtime;
389              
390             # RT #58049
391 0 0         $unixtime_sql .= ' FROM DUAL'
392             if ( $driver->dbd->isa('Data::ObjectDriver::Driver::DBD::Oracle') );
393              
394 0           return $driver->rw_handle->selectrow_array("SELECT $unixtime_sql");
395             }
396              
397             sub _grab_a_job {
398 0     0     my TheSchwartz $client = shift;
399 0           my $hashdsn = shift;
400 0           my $driver = $client->driver_for($hashdsn);
401              
402             ## Got some jobs! Randomize them to avoid contention between workers.
403 0           my @jobs = shuffle(@_);
404              
405             JOB:
406 0           while ( my $job = shift @jobs ) {
407             ## Convert the funcid to a funcname, based on this database's map.
408 0           $job->funcname(
409             $client->funcid_to_name( $driver, $hashdsn, $job->funcid ) );
410              
411             ## Update the job's grabbed_until column so that
412             ## no one else takes it.
413 0           my $worker_class = $job->funcname;
414 0           my $old_grabbed_until = $job->grabbed_until;
415              
416 0 0         my $server_time = $client->get_server_time($driver)
417             or die "expected a server time";
418              
419 0   0       $job->grabbed_until(
420             $server_time + ( $worker_class->grab_for || 1 ) );
421              
422             ## Update the job in the database, and end the transaction.
423             ## NOTE: For some reason, D::OD doesn't ensure the object's value is
424             ## in bounds of original search query. so we need to be more paranoic
425             ## to make sure it's not grabbed by other workers.
426 0           my $unixtime = $driver->dbd->sql_for_unixtime;
427 0 0         if ( $driver->update( $job, {
428             grabbed_until => [
429             '-and',
430             { op => '=', value => $old_grabbed_until},
431             \" <= $unixtime"
432             ]}) < 1 )
433             {
434             ## We lost the race to get this particular job--another worker must
435             ## have got it and already updated it. Move on to the next job.
436 0 0         $T_LOST_RACE->() if $T_LOST_RACE;
437 0           next JOB;
438             }
439              
440             ## Now prepare the job, and return it.
441 0           my $handle = TheSchwartz::JobHandle->new(
442             { dsn_hashed => $hashdsn,
443             jobid => $job->jobid,
444             }
445             );
446 0           $handle->client($client);
447 0           $job->handle($handle);
448 0           return $job;
449             }
450              
451 0           return;
452             }
453              
454             sub shuffled_databases {
455 0     0 0   my TheSchwartz $client = shift;
456 0           my @dsns = keys %{ $client->{databases} };
  0            
457 0           return shuffle(@dsns);
458             }
459              
460             sub insert_job_to_driver {
461 0     0 0   my $client = shift;
462 0           my ( $job, $driver, $hashdsn ) = @_;
463 0           eval {
464             ## Set the funcid of the job, based on the funcname. Since each
465             ## database has a separate cache, this needs to be calculated based
466             ## on the hashed DSN. Also: this might fail, if the database is dead.
467 0           $job->funcid(
468             $client->funcname_to_id( $driver, $hashdsn, $job->funcname ) );
469              
470             ## This is sub-optimal because of clock skew, but something is
471             ## better than a NULL value. And currently, nothing in TheSchwartz
472             ## code itself uses insert_time. TODO: use server time, but without
473             ## having to do a roundtrip just to get the server time.
474 0           $job->insert_time(time);
475              
476             ## Now, insert the job. This also might fail.
477 0           $driver->insert($job);
478             };
479 0 0         if ($@) {
    0          
480 0 0 0       unless ( OK_ERRORS->{ $driver->last_error || 0 } ) {
481 0           $client->mark_database_as_dead($hashdsn);
482             }
483             }
484             elsif ( $job->jobid ) {
485             ## We inserted the job successfully!
486             ## Attach a handle to the job, and return the handle.
487 0           my $handle = TheSchwartz::JobHandle->new(
488             { dsn_hashed => $hashdsn,
489             client => $client,
490             jobid => $job->jobid
491             }
492             );
493 0           $job->handle($handle);
494 0           return $handle;
495             }
496 0           return;
497             }
498              
499             sub insert_jobs {
500 0     0 1   my TheSchwartz $client = shift;
501 0           my (@jobs) = @_;
502              
503             ## Try each of the databases that are registered with $client, in
504             ## random order. If we successfully create the job, exit the loop.
505 0           my @handles;
506             DATABASE:
507 0           for my $hashdsn ( $client->shuffled_databases ) {
508             ## If the database is dead, skip it.
509 0 0         next if $client->is_database_dead($hashdsn);
510              
511 0           my $driver = $client->driver_for($hashdsn);
512 0           $driver->begin_work;
513 0           for my $j (@jobs) {
514 0           my $h = $client->insert_job_to_driver( $j, $driver, $hashdsn );
515 0 0         if ($h) {
516 0           push @handles, $h;
517             }
518             else {
519 0           $driver->rollback;
520 0           @handles = ();
521 0           next DATABASE;
522             }
523             }
524 0 0         last if eval { $driver->commit };
  0            
525 0           @handles = ();
526 0           next DATABASE;
527             }
528              
529 0 0         return wantarray ? @handles : scalar @handles;
530             }
531              
532             sub insert {
533 0     0 1   my TheSchwartz $client = shift;
534 0           my $job = shift;
535 0 0         if ( ref( $_[0] ) eq "TheSchwartz::Job" ) {
536 0           croak "Can't insert multiple jobs with method 'insert'\n";
537             }
538 0 0         unless ( ref($job) eq 'TheSchwartz::Job' ) {
539 0           $job = TheSchwartz::Job->new_from_array( $job, $_[0] );
540             }
541              
542             ## Try each of the databases that are registered with $client, in
543             ## random order. If we successfully create the job, exit the loop.
544 0           for my $hashdsn ( $client->shuffled_databases ) {
545             ## If the database is dead, skip it.
546 0 0         next if $client->is_database_dead($hashdsn);
547              
548 0           my $driver = $client->driver_for($hashdsn);
549              
550             ## Try to insert the job into this database. If we get a handle
551             ## back, return it.
552 0           my $handle = $client->insert_job_to_driver( $job, $driver, $hashdsn );
553 0 0         return $handle if $handle;
554             }
555              
556             ## If the job wasn't submitted successfully to any database, return.
557 0           return;
558             }
559              
560             sub handle_from_string {
561 0     0 0   my TheSchwartz $client = shift;
562 0           my $handle = TheSchwartz::JobHandle->new_from_string(@_);
563 0           $handle->client($client);
564 0           return $handle;
565             }
566              
567             sub can_do {
568 0     0 1   my TheSchwartz $client = shift;
569 0           my ($class) = @_;
570 0           push @{ $client->{all_abilities} }, $class;
  0            
571 0           push @{ $client->{current_abilities} }, $class;
  0            
572             }
573              
574             sub reset_abilities {
575 0     0 0   my TheSchwartz $client = shift;
576 0           $client->{all_abilities} = [];
577 0           $client->{current_abilities} = [];
578             }
579              
580             sub restore_full_abilities {
581 0     0 0   my $client = shift;
582 0           $client->{current_abilities} = [ @{ $client->{all_abilities} } ];
  0            
583             }
584              
585             sub temporarily_remove_ability {
586 0     0 0   my $client = shift;
587 0           my ($class) = @_;
588             $client->{current_abilities}
589 0           = [ grep { $_ ne $class } @{ $client->{current_abilities} } ];
  0            
  0            
590 0 0         if ( !@{ $client->{current_abilities} } ) {
  0            
591 0           $client->restore_full_abilities;
592             }
593             }
594              
595             sub work_on {
596 0     0 1   my TheSchwartz $client = shift;
597 0           my $hstr = shift; # Handle string
598 0 0         my $job = $client->lookup_job($hstr)
599             or return 0;
600 0           return $client->work_once($job);
601             }
602              
603             sub grab_and_work_on {
604 0     0 1   my TheSchwartz $client = shift;
605 0           my $hstr = shift; # Handle string
606 0 0         my $job = $client->lookup_job($hstr)
607             or return 0;
608              
609             ## check that the job is grabbable
610 0           my $hashdsn = $job->handle->dsn_hashed;
611 0           my $driver = $client->driver_for($hashdsn);
612 0           my $current_time = $client->get_server_time($driver);
613 0 0         return 0 if $current_time < $job->grabbed_until;
614              
615             ## grab the job the usual way
616 0 0         $job = $client->_grab_a_job( $hashdsn, $job )
617             or return 0;
618              
619 0           return $client->work_once($job);
620             }
621              
622             sub work {
623 0     0 1   my TheSchwartz $client = shift;
624 0           my ($delay) = @_;
625 0   0       $delay ||= 5;
626 0           while (1) {
627 0 0         sleep $delay unless $client->work_once;
628             }
629             }
630              
631             sub work_until_done {
632 0     0 1   my TheSchwartz $client = shift;
633 0           while (1) {
634 0 0         $client->work_once or last;
635             }
636             }
637              
638             ## Returns true if it did something, false if no jobs were found
639             sub work_once {
640 0     0 1   my TheSchwartz $client = shift;
641 0           my $job = shift; # optional specific job to work on
642              
643             ## Look for a job with our current set of abilities. Note that the
644             ## list of current abilities may not be equal to the full set of
645             ## abilities, to allow for even distribution between jobs.
646 0   0       $job ||= $client->find_job_for_workers;
647              
648             ## If we didn't find anything, restore our full abilities, and try
649             ## again.
650 0 0 0       if ( !$job
      0        
651             && !$client->{strict_remove_ability}
652 0           && @{ $client->{current_abilities} } < @{ $client->{all_abilities} } )
  0            
653             {
654 0           $client->restore_full_abilities;
655 0           $job = $client->find_job_for_workers;
656             }
657              
658 0 0         my $class = $job ? $job->funcname : undef;
659 0 0         if ($job) {
660 0 0         my $priority = $job->priority ? ", priority " . $job->priority : "";
661 0           $job->debug(
662             "TheSchwartz::work_once got job of class '$class'$priority");
663             }
664             else {
665 0           $client->debug("TheSchwartz::work_once found no jobs");
666             }
667              
668             ## If we still don't have anything, return.
669 0 0         return unless $job;
670              
671             ## Now that we found a job for this particular funcname, remove it
672             ## from our list of current abilities. So the next time we look for a
673             ## we'll find a job for a different funcname. This prevents starvation of
674             ## high funcid values because of the way MySQL's indexes work.
675             ## BUGBUG this looks odd since ordering by job_id should limit any skew ...
676 0 0         $client->temporarily_remove_ability($class) unless($client->{strict_remove_ability});
677              
678 0           $class->work_safely($job);
679              
680             ## We got a job, so return 1 so work_until_done (which calls this method)
681             ## knows to keep looking for jobs.
682 0           return 1;
683             }
684              
685             sub funcid_to_name {
686 0     0 0   my TheSchwartz $client = shift;
687 0           my ( $driver, $hashdsn, $funcid ) = @_;
688 0           my $cache = $client->_funcmap_cache($hashdsn);
689 0           return $cache->{funcid2name}{$funcid};
690             }
691              
692             sub funcname_to_id {
693 0     0 0   my TheSchwartz $client = shift;
694 0           my ( $driver, $hashdsn, $funcname ) = @_;
695 0           my $cache = $client->_funcmap_cache($hashdsn);
696 0 0         unless ( exists $cache->{funcname2id}{$funcname} ) {
697 0           my $map = TheSchwartz::FuncMap->create_or_find( $driver, $funcname );
698 0           $cache->{funcname2id}{ $map->funcname } = $map->funcid;
699 0           $cache->{funcid2name}{ $map->funcid } = $map->funcname;
700             }
701 0           return $cache->{funcname2id}{$funcname};
702             }
703              
704             sub _funcmap_cache {
705 0     0     my TheSchwartz $client = shift;
706 0           my ($hashdsn) = @_;
707 0 0         unless ( exists $client->{funcmap_cache}{$hashdsn} ) {
708 0           my $driver = $client->driver_for($hashdsn);
709 0           my @maps = $driver->search('TheSchwartz::FuncMap');
710 0           my $cache = { funcname2id => {}, funcid2name => {} };
711 0           for my $map (@maps) {
712 0           $cache->{funcname2id}{ $map->funcname } = $map->funcid;
713 0           $cache->{funcid2name}{ $map->funcid } = $map->funcname;
714             }
715 0           $client->{funcmap_cache}{$hashdsn} = $cache;
716             }
717 0           return $client->{funcmap_cache}{$hashdsn};
718             }
719              
720             # accessors
721              
722             sub verbose {
723 0     0 1   my TheSchwartz $client = shift;
724 0           return $client->{verbose};
725             }
726              
727             sub set_verbose {
728 0     0 1   my TheSchwartz $client = shift;
729 0           my $logger = shift; # or non-coderef to just print to stderr
730 0 0 0       if ( $logger && ref $logger ne "CODE" ) {
731             $logger = sub {
732 0     0     my $msg = shift;
733 0           $msg =~ s/\s+$//;
734 0           print STDERR "$msg\n";
735 0           };
736             }
737 0           $client->{verbose} = $logger;
738             }
739              
740             sub scoreboard {
741 0     0 1   my TheSchwartz $client = shift;
742              
743 0           return $client->{scoreboard};
744             }
745              
746             sub set_scoreboard {
747 0     0 1   my TheSchwartz $client = shift;
748 0           my ($dir) = @_;
749              
750 0 0         return unless $dir;
751              
752             # They want the scoreboard but don't care where it goes
753 0 0 0       if ( ( $dir eq '1' ) or ( $dir eq 'on' ) ) {
754              
755             # Find someplace in tmpfs to save this
756 0           foreach my $d (qw(/var/run /dev/shm)) {
757 0           $dir = $d;
758 0 0         last if -e $dir;
759             }
760             }
761              
762 0           $dir .= '/theschwartz';
763 0 0         unless ( -e $dir ) {
764 0 0         mkdir( $dir, 0755 )
765             or die "Can't create scoreboard directory '$dir': $!";
766             }
767              
768 0           $client->{scoreboard} = $dir . "/scoreboard.$$";
769             }
770              
771             sub start_scoreboard {
772 0     0 1   my TheSchwartz $client = shift;
773              
774             # Don't do anything if we're not configured to write to the scoreboard
775 0           my $scoreboard = $client->scoreboard;
776 0 0         return unless $scoreboard;
777              
778             # Don't do anything of (for some reason) we don't have a current job
779 0           my $job = $client->current_job;
780 0 0         return unless $job;
781              
782 0           my $class = $job->funcname;
783              
784 0 0         open( my $SB, '>', $scoreboard )
785             or $job->debug("Could not write scoreboard '$scoreboard': $!");
786 0   0       print $SB join(
      0        
787             "\n",
788             ( "pid=$$",
789             'funcname=' . ( $class || '' ),
790             'started=' . ( $job->grabbed_until - ( $class->grab_for || 1 ) ),
791             'arg=' . _serialize_args( $job->arg ),
792             )
793             ),
794             "\n";
795 0           close($SB);
796              
797 0           return;
798             }
799              
800             # Quick and dirty serializer. Don't use Data::Dumper because we don't need to
801             # recurse indefinitely and we want to truncate the output produced
802             sub _serialize_args {
803 0     0     my ($args) = @_;
804              
805 0 0         if ( ref $args ) {
806 0 0         if ( ref $args eq 'HASH' ) {
    0          
807             return join ',', map {
808 0   0       ( $_ || '' ) . '=' . substr( $args->{$_} || '', 0, 200 )
  0   0        
809             }
810             keys %$args;
811             }
812             elsif ( ref $args eq 'ARRAY' ) {
813 0   0       return join ',', map { substr( $_ || '', 0, 200 ) } @$args;
  0            
814             }
815             }
816             else {
817 0           return $args;
818             }
819             }
820              
821             sub end_scoreboard {
822 0     0 1   my TheSchwartz $client = shift;
823              
824             # Don't do anything if we're not configured to write to the scoreboard
825 0           my $scoreboard = $client->scoreboard;
826 0 0         return unless $scoreboard;
827              
828 0           my $job = $client->current_job;
829              
830 0 0         open( my $SB, '>>', $scoreboard )
831             or $job->debug("Could not append scoreboard '$scoreboard': $!");
832 0           print $SB "done=" . time . "\n";
833 0           close($SB);
834              
835 0           return;
836             }
837              
838             sub clean_scoreboard {
839 0     0 1   my TheSchwartz $client = shift;
840              
841             # Don't do anything if we're not configured to write to the scoreboard
842 0           my $scoreboard = $client->scoreboard;
843 0 0         return unless $scoreboard;
844              
845 0           unlink($scoreboard);
846             }
847              
848             sub prioritize {
849 0     0 1   my TheSchwartz $client = shift;
850 0           return $client->{prioritize};
851             }
852              
853             sub set_prioritize {
854 0     0 1   my TheSchwartz $client = shift;
855 0           $client->{prioritize} = shift;
856             }
857              
858             sub floor {
859 0     0 1   my TheSchwartz $client = shift;
860 0           return $client->{floor};
861             }
862              
863             sub set_floor {
864 0     0 1   my TheSchwartz $client = shift;
865 0 0         die "set_floor only works if prioritize is set."
866             unless ( $client->prioritize );
867 0           $client->{floor} = shift;
868             }
869              
870             sub batch_size {
871 0     0 1   my TheSchwartz $client = shift;
872 0           return $client->{batch_size};
873             }
874              
875             sub set_batch_size {
876 0     0 1   my TheSchwartz $client = shift;
877 0           $client->{batch_size} = shift;
878             }
879              
880             # current job being worked. so if something dies, work_safely knows which to mark as dead.
881             sub current_job {
882 0     0 0   my TheSchwartz $client = shift;
883 0           $client->{current_job};
884             }
885              
886             sub set_current_job {
887 0     0 0   my TheSchwartz $client = shift;
888 0           $client->{current_job} = shift;
889             }
890              
891             sub strict_remove_ability {
892 0     0 1   my TheSchwartz $client = shift;
893 0           return $client->{strict_remove_ability};
894             }
895              
896             sub set_strict_remove_ability {
897 0     0 1   my TheSchwartz $client = shift;
898 0           $client->{strict_remove_ability} = shift;
899             }
900              
901             DESTROY {
902 0     0     foreach my $arg (@_) {
903              
904             # Call 'clean_scoreboard' on TheSchwartz objects
905 0 0 0       if ( ref($arg) and $arg->isa('TheSchwartz') ) {
906 0           $arg->clean_scoreboard;
907             }
908             }
909             }
910              
911             1;
912              
913             __END__