File Coverage

blib/lib/WorkerManager/TheSchwartz.pm
Criterion Covered Total %
statement 24 69 34.7
branch 0 26 0.0
condition 0 12 0.0
subroutine 8 13 61.5
pod 0 4 0.0
total 32 124 25.8


line stmt bran cond sub pod time code
1             package WorkerManager::TheSchwartz;
2 1     1   1495 use strict;
  1         2  
  1         26  
3 1     1   4 use warnings;
  1         2  
  1         25  
4              
5 1     1   517 use TheSchwartz;
  1         60737  
  1         28  
6 1     1   7 use Time::Piece;
  1         3  
  1         9  
7 1     1   66 use Module::Load ();
  1         3  
  1         26  
8 1     1   415 use Time::HiRes qw( time );
  1         1088  
  1         10  
9 1     1   167 use POSIX qw(getppid);
  1         2  
  1         6  
10 1     1   62 use Carp;
  1         2  
  1         559  
11              
12             sub new {
13 0     0 0   my ($class, $worker, $options) = @_;
14 0   0       $options ||= {};
15              
16 0           my $databases;
17 0 0         if ($databases = delete $options->{databases}) {
18 0 0         $databases = [$databases] unless UNIVERSAL::isa($databases, 'ARRAY');
19             } else {
20 0           croak 'not specified database information in config file for worker manager';
21             }
22 0           my $client = TheSchwartz->new( databases => $databases, %$options);
23              
24 0           my $self = bless {
25             client => $client,
26             worker => $worker,
27             terminate => undef,
28             start_time => undef,
29             }, $class;
30 0           $self->init;
31 0           $self;
32             }
33              
34             sub init {
35 0     0 0   my $self = shift;
36             $self->{client}->set_verbose(
37             sub {
38 0     0     my $msg = shift;
39 0           my $job = shift;
40             # $WorkerManager::LOGGER->('TheSchwartz', $msg) if($msg =~ /Working/);
41 0 0         if($msg =~ /Working/){
42 0           $self->{start_time} = time;
43             }
44 0 0         return if($msg =~ /found no jobs/);
45 0 0         if($msg =~ /^job completed|^job failed/){
46 0           $msg .= sprintf " %s", $job->funcname;
47 0 0         $msg .= sprintf " process:%d", (time - $self->{start_time}) * 1000 if($self->{start_time});
48 0 0 0       $msg .= sprintf " delay:%d", ($self->{start_time} - $job->insert_time) * 1000 if($job && $self->{start_time});
49 0           $self->{start_time} = undef;
50             };
51 0 0         $WorkerManager::LOGGER->('TheSchwartz', $msg) unless($msg =~ /found no jobs/);
52 0           });
53 0 0         if (UNIVERSAL::isa($self->{worker}, 'ARRAY')){
54 0           for (@{$self->{worker}}){
  0            
55 0           Module::Load::load($_);
56 0 0         $_->can('work') or die "cannot ${_}->work";
57 0           $self->{client}->can_do($_);
58             }
59             } else {
60 0           Module::Load::load($self->{worker});
61 0 0         $_->can('work') or die "cannot ${_}->work";
62 0           $self->{client}->can_do($self->{worker});
63             }
64             }
65              
66             sub work {
67 0     0 0   my $self = shift;
68 0   0       my $max = shift || 100;
69 0   0       my $delay = shift || 5;
70 0           my $count = 0;
71 0   0       while ($count < $max && ! $self->{terminate}) {
72 0 0         if (getppid == 1) {
73 0           die "my dad may be killed.";
74 0           exit(1);
75             }
76 0 0         if($self->{client}->work_once){
77 0           $count++;
78             } else {
79 0           sleep $delay;
80             }
81             }
82             }
83              
84             sub terminate {
85 0     0 0   my $self = shift;
86 0           $self->{terminate} = 1;
87             }
88              
89             1;