File Coverage

blib/lib/WorkerManager/TheSchwartz.pm
Criterion Covered Total %
statement 24 71 33.8
branch 0 28 0.0
condition 0 12 0.0
subroutine 8 13 61.5
pod 0 4 0.0
total 32 128 25.0


line stmt bran cond sub pod time code
1             package WorkerManager::TheSchwartz;
2 1     1   1303 use strict;
  1         3  
  1         30  
3 1     1   5 use warnings;
  1         2  
  1         26  
4              
5 1     1   632 use TheSchwartz;
  1         79150  
  1         31  
6 1     1   8 use Time::Piece;
  1         2  
  1         11  
7 1     1   79 use Module::Load ();
  1         3  
  1         34  
8 1     1   527 use Time::HiRes qw( time );
  1         1317  
  1         14  
9 1     1   198 use POSIX qw(getppid);
  1         2  
  1         7  
10 1     1   93 use Carp;
  1         2  
  1         746  
11              
12             sub new {
13 0     0 0   my ($class, $worker, $options) = @_;
14 0   0       $options ||= {};
15              
16 0           my $databases;
17 0 0         if ($databases = delete $options->{databases}) {
18 0 0         $databases = [$databases] unless ref($databases) eq 'ARRAY';
19             } else {
20 0           croak 'not specified database information in config file for worker manager';
21             }
22 0           my $has_custom_verbose = exists $options->{verbose};
23 0           my $client = TheSchwartz->new( databases => $databases, %$options);
24              
25 0           my $self = bless {
26             client => $client,
27             worker => $worker,
28             terminate => undef,
29             start_time => undef,
30             has_custom_verbose => $has_custom_verbose,
31             }, $class;
32 0           $self->init;
33 0           $self;
34             }
35              
36             sub init {
37 0     0 0   my $self = shift;
38              
39 0 0         if (!$self->{has_custom_verbose}) {
40             $self->{client}->set_verbose(
41             sub {
42 0     0     my $msg = shift;
43 0           my $job = shift;
44             # $WorkerManager::LOGGER->('TheSchwartz', $msg) if($msg =~ /Working/);
45 0 0         if($msg =~ /Working/){
46 0           $self->{start_time} = time;
47             }
48 0 0         return if($msg =~ /found no jobs/);
49 0 0         if($msg =~ /^job completed|^job failed/){
50 0           $msg .= sprintf " %s", $job->funcname;
51 0 0         $msg .= sprintf " process:%d", (time - $self->{start_time}) * 1000 if($self->{start_time});
52 0 0 0       $msg .= sprintf " delay:%d", ($self->{start_time} - $job->insert_time) * 1000 if($job && $self->{start_time});
53 0           $self->{start_time} = undef;
54             };
55 0 0         $WorkerManager::LOGGER->('TheSchwartz', $msg) unless($msg =~ /found no jobs/);
56             }
57 0           );
58             }
59              
60 0 0         if (ref($self->{worker}) eq 'ARRAY') {
61 0           for (@{$self->{worker}}){
  0            
62 0           Module::Load::load($_);
63 0 0         $_->can('work') or die "cannot ${_}->work";
64 0           $self->{client}->can_do($_);
65             }
66             } else {
67 0           Module::Load::load($self->{worker});
68 0 0         $_->can('work') or die "cannot ${_}->work";
69 0           $self->{client}->can_do($self->{worker});
70             }
71             }
72              
73             sub work {
74 0     0 0   my $self = shift;
75 0   0       my $max = shift || 100;
76 0   0       my $delay = shift || 5;
77 0           my $count = 0;
78 0   0       while ($count < $max && ! $self->{terminate}) {
79 0 0         if (getppid == 1) {
80 0           die "my dad may be killed.";
81 0           exit(1);
82             }
83 0 0         if($self->{client}->work_once){
84 0           $count++;
85             } else {
86 0           sleep $delay;
87             }
88             }
89             }
90              
91             sub terminate {
92 0     0 0   my $self = shift;
93 0           $self->{terminate} = 1;
94             }
95              
96             1;