File Coverage

blib/lib/WorkerManager/Gearman.pm
Criterion Covered Total %
statement 15 50 30.0
branch 0 6 0.0
condition 0 13 0.0
subroutine 5 12 41.6
pod 0 4 0.0
total 20 85 23.5


line stmt bran cond sub pod time code
1             package WorkerManager::Gearman;
2 1     1   600 use strict;
  1         2  
  1         23  
3 1     1   4 use warnings;
  1         2  
  1         22  
4 1     1   366 use Module::Load ();
  1         918  
  1         19  
5 1     1   397 use Gearman::Worker;
  1         91930  
  1         48  
6              
7             our $VERSION = '0.1000';
8              
9             use Class::Accessor::Lite (
10 1         6 rw => [qw(
11             job_servers
12             prefix
13             worker_classes
14             workers
15             )],
16 1     1   404 );
  1         985  
17              
18             sub new {
19 0     0 0   my ($class, $worker_classes, $options) = @_;
20 0   0       $options ||= {};
21              
22 0   0       my $prefix = delete $options->{prefix} || '';
23 0           my $job_servers;
24 0 0         if ($job_servers = delete $options->{job_servers}) {
25 0 0         $job_servers = [$job_servers] if ref $job_servers ne 'ARRAY';
26             }
27             else {
28 0           $job_servers = [qw(127.0.0.1)];
29             }
30              
31 0   0       my $self = $class->SUPER::new({
32             job_servers => $job_servers,
33             prefix => $prefix,
34             worker_classes => $worker_classes || [],
35             terminate => undef,
36             workers => [],
37             });
38 0           $self->init;
39 0           $self;
40             }
41              
42             sub init {
43 0     0 0   my $self = shift;
44 0           for my $worker_class (@{$self->worker_classes}) {
  0            
45 0           Module::Load::load($worker_class);
46 0           push @{$self->workers}, $worker_class->new({
  0            
47             job_servers => $self->job_servers,
48             prefix => $self->prefix,
49             });
50             }
51             }
52              
53             sub work {
54 0     0 0   my $self = shift;
55 0   0       my $max = shift || 100;
56 0   0       my $delay = shift || 5;
57 0           my $count = 0;
58 0   0       while ($count < $max && !$self->{terminate}) {
59 0 0         if (getppid == 1) {
60 0           die "my dad may be killed.";
61 0           exit(1);
62             }
63 0           for my $worker (@{$self->workers}) {
  0            
64             $worker->worker->work(
65             on_start => sub {
66 0     0     my $job = shift;
67 0           $WorkerManager::LOGGER->('Gearman', sprintf('started: %s', ref $worker));
68             },
69             on_complete => sub {
70 0     0     $WorkerManager::LOGGER->('Gearman', sprintf('job completed: %s', ref $worker));
71             },
72             on_fail => sub {
73 0     0     $WorkerManager::LOGGER->('Gearman', sprintf('job failed: %s', ref $worker));
74             },
75 0           );
76             }
77 0           $count++;
78 0           sleep $delay;
79             }
80             }
81              
82             sub terminate {
83 0     0 0   my $self = shift;
84 0           $self->{terminate} = 1;
85             }
86              
87             1;
88              
89             __END__