File Coverage

blib/lib/POE/Component/Gearman/Client.pm
Criterion Covered Total %
statement 30 107 28.0
branch 0 28 0.0
condition 0 5 0.0
subroutine 10 22 45.4
pod 5 9 55.5
total 45 171 26.3


line stmt bran cond sub pod time code
1             # ===========================================================================
2             # POE::Component::Gearman::Client
3             #
4             # POE-based client for Gearman servers
5             #
6             # Author: Alessandro Ranellucci
7             #
8             # See below for documentation.
9             #
10              
11             package POE::Component::Gearman::Client;
12              
13 1     1   9805 use strict;
  1         2  
  1         37  
14 1     1   5 use vars qw($VERSION);
  1         2  
  1         42  
15              
16 1     1   5 use Carp qw(croak);
  1         6  
  1         54  
17             use fields (
18 1         6 'job_servers', # arrayref of POE::Component::Gearman::Client::Connection objects
19             't_no_random', # don't randomize job server to use: use first alive one.
20             't_offline_host', # hashref: hostname -> $bool, if host should act as offline, for testing
21 1     1   919 );
  1         1757  
22 1     1   1996 use Gearman::Objects;
  1         537  
  1         27  
23 1     1   880 use Gearman::Task;
  1         69138  
  1         26  
24 1     1   105 use Gearman::JobStatus;
  1         2  
  1         24  
25 1     1   4 use List::Util qw(first);
  1         2  
  1         113  
26 1     1   673 use POE::Component::Gearman::Client::Connection;
  1         3  
  1         36  
27 1     1   6 use POE;
  1         2  
  1         11  
28              
29             $VERSION = '0.03';
30              
31             sub DEBUGGING () { 0 }   # constant debug switch: returns 0, so every "warn ... if DEBUGGING" in this file stays silent; change to 1 to enable them
32              
# spawn(%opts) -- build the client object and register its POE session.
#
# Recognised options:
#   job_servers - passed as the argument of the 'set_job_servers' state
#                 as soon as the session starts (skipped when false)
#   alias       - POE alias for the session; 'Gearman' when not given
#
# Croaks with "Unknown parameters: ..." on any other option.  May be
# invoked on a class name (a new fields-based object is allocated) or on
# an existing object (which is reinitialised and reused).
# Returns the client object.
sub spawn {
    my $class = shift;
    my %opts  = @_;

    my $self = ref $class ? $class : fields::new($class);
    $self->{job_servers}    = [];
    $self->{t_offline_host} = {};

    my ($js, $alias) = delete @opts{qw(job_servers alias)};
    if (%opts) {
        croak "Unknown parameters: " . join(", ", keys %opts);
    }

    # Session start-up handler; closes over $alias and $js.
    my $on_start = sub {
        $_[KERNEL]->alias_set( $alias || 'Gearman' );

        # call() instead of yield() so that the job_servers method is
        # instantly available
        $_[KERNEL]->call($_[SESSION], 'set_job_servers', $js) if $js;
    };

    # Methods of $self that are exposed as POE states of the same name.
    my @object_events = qw(
        t_set_disable_random t_set_offline_host
        set_job_servers add_task disconnect_all
    );

    # register session with POE
    POE::Session->create(
        inline_states => { _start => $on_start },
        object_states => [ $self => \@object_events ],
    );

    return $self;
}
64              
# for testing.
#
# POE state handler: stores the first event argument (ARG0) in
# t_no_random.  While that flag is true, add_task sends unhashed tasks to
# the first alive job server instead of a randomly chosen one.
#
# The value has to be read from $_[ARG0].  A bare shift() here returns
# $_[OBJECT] -- the client object itself, which POE places first in @_
# for object states -- so the flag would always end up true no matter
# what the caller passed.
sub t_set_disable_random {
    my ($self, $flag) = @_[OBJECT, ARG0];
    $self->{t_no_random} = $flag;
}
70              
# POE state handler (test hook): marks the job server whose hostspec
# equals ARG0 as offline/online.  ARG1 is the flag and defaults to 1 when
# it is undefined.
#
# The flag is recorded in t_offline_host before the connection lookup, so
# the entry is written even when the handler then dies because no
# connection carries that hostspec.
sub t_set_offline_host {
    my $self = $_[OBJECT];
    my $host = $_[ARG0];
    my $val  = defined $_[ARG1] ? $_[ARG1] : 1;

    $self->{t_offline_host}{$host} = $val;

    my $match = first { $_->hostspec eq $host } @{ $self->{job_servers} };
    die "No host found with that spec to mark offline" unless $match;

    $match->t_set_offline($val);
}
81              
# POE state handler: replace the list of job servers with the hostspecs
# in the arrayref ARG0.
#
# Connections whose hostspec appears in the new list are reused rather
# than reopened; connections that are no longer listed are asked to close
# once they have finished.  The resulting list follows the order of ARG0.
sub set_job_servers {
    my $self = $_[OBJECT];
    my $js   = $_[ARG0];

    # hostspec -> 1 for every spec being set
    my %wanted = map +( $_ => 1 ), @$js;

    # hostspec -> existing connection that stays in service
    my %kept;
    for my $conn (@{ $self->{job_servers} }) {
        my $spec = $conn->hostspec;
        if ($wanted{$spec}) {
            $kept{$spec} = $conn;
        }
        else {
            $conn->close_when_finished;
        }
    }

    # Reuse a kept connection where there is one, otherwise open a new one.
    my @fresh;
    push @fresh, ( $kept{$_} || POE::Component::Gearman::Client::Connection->new( hostspec => $_ ) )
        for @$js;

    $self->{job_servers} = \@fresh;
}
105              
# Read-only accessor for the configured job servers.
#
# Returns the hostspec of every connection, in order: as a list in list
# context, as an arrayref otherwise.  Croaks with "Not a setter" when
# called with any argument.
sub job_servers {
    my ($self, @extra) = @_;
    croak "Not a setter" if @extra;

    my @specs = map $_->hostspec, @{ $self->{job_servers} };

    return @specs if wantarray;
    return \@specs;
}
113              
# POE state handler: dispatch the Gearman::Task in ARG0 to a job server.
# Returns immediately when ARG0 is false.
#
# Server selection, repeated on every attempt:
#   * only connections reporting alive() are candidates; if there are
#     none the task is failed for good via final_fail;
#   * a task with a defined hash() goes to candidate (hash % count);
#   * otherwise a random candidate is used, or the first one when the
#     t_no_random test flag is set.
#
# The chosen connection is asked to get into the ready state; on success
# the task is handed over, on error the whole selection is retried.
#
# NOTE: task timeouts are not implemented yet (see the TODO below), so
# $timer is never set and the cancel in the post hook is currently a
# no-op.
sub add_task {
    my $self = $_[OBJECT];
    my Gearman::Task $task = $_[ARG0] or return;

    # The closure refers to itself through $try_again so it can be passed
    # as its own error callback.  It is reset to undef at every terminal
    # point -- presumably to break that self-reference once it is no
    # longer needed.
    my $try_again;
    $try_again = sub {

        my @job_servers = grep { $_->alive } @{$self->{job_servers}};
        warn "Alive servers: " . @job_servers . " out of " . @{$self->{job_servers}} . "\n" if DEBUGGING;
        unless (@job_servers) {
            $task->final_fail;
            $try_again = undef;
            return;
        }

        my $index;
        if (defined( my $hash = $task->hash )) {
            # Task is hashed, use key to fetch job server
            $index = $hash % @job_servers;
        }
        else {
            # Task is not hashed, random job server
            $index = $self->{t_no_random} ? 0 : int( rand( @job_servers ) );
        }

        # Plain element access; the previous one-element slice only worked
        # because a slice in scalar context yields its last element.
        my $js = $job_servers[$index];

        # TODO Fix this violation of object privacy.
        $task->{taskset} = $self;

        $js->get_in_ready_state(
            # on_ready:
            sub {
                my $timer;
                if (my $timeout = $task->{timeout}) {
                    # TODO: setup timer
                    #$timer = Danga::Socket->AddTimer($timeout, sub {
                    #    $task->final_fail('timeout');
                    #});
                }
                $task->set_on_post_hooks(sub {
                    $timer->cancel if $timer;

                    # ALSO clean up our $js (connection's) waiting stuff:
                    $js->give_up_on($task);
                });
                $js->add_task( $task );
                $try_again = undef;
            },
            # on_error:
            $try_again,
        );
    };
    $try_again->();
}
128              
129 0           my $js;
130 0 0         if (defined( my $hash = $task->hash )) {
131             # Task is hashed, use key to fetch job server
132 0           $js = @job_servers[$hash % @job_servers];
133             }
134             else {
135             # Task is not hashed, random job server
136 0 0         $js = @job_servers[$self->{t_no_random} ? 0 :
137             int( rand( @job_servers ))];
138             }
139              
140             # TODO Fix this violation of object privacy.
141 0           $task->{taskset} = $self;
142              
143             $js->get_in_ready_state(
144             # on_ready:
145             sub {
146 0           my $timer;
147 0 0         if (my $timeout = $task->{timeout}) {
148             # TODO: setup timer
149             #$timer = Danga::Socket->AddTimer($timeout, sub {
150             # $task->final_fail('timeout');
151             #});
152             }
153             $task->set_on_post_hooks(sub {
154 0 0         $timer->cancel if $timer;
155              
156             # ALSO clean up our $js (connection's) waiting stuff:
157 0           $js->give_up_on($task);
158 0           });
159 0           $js->add_task( $task );
160 0           $try_again = undef;
161             },
162             # on_error:
163 0           $try_again,
164             );
165 0           };
166 0           $try_again->();
167             }
168              
# POE state handler: close the connection to every job server that
# currently reports itself alive.  Connections that are not alive are
# left untouched.
sub disconnect_all {
    my $self = $_[OBJECT];

    warn "Disconnecting all server sockets\n" if DEBUGGING;

    my @alive = grep { $_->alive } @{$self->{job_servers}};
    warn "Alive servers: " . @alive . " out of " . @{$self->{job_servers}} . "\n" if DEBUGGING;

    # TODO: we should better use close_when_finished
    foreach my $conn (@alive) {
        $conn->close;
    }
}
178              
# POE::Component::Gearman::Client sometimes poses, duck-typing style, as a
# Gearman::Taskset.  A task "set" makes no sense in an async world, where
# there is no need to wait on a group of things because everything
# happens at its own pace.  So, to satisfy that interface, we implement
# the taskset's "client" method -- and in our case the client is simply
# the object itself.
sub client {
    my ($self) = @_;
    return $self;
}
186              
# The base Gearman libraries ask a Gearman::Client-like object for its
# prefix.  This module does not support prefixes, so the answer is always
# the empty string.
sub prefix {
    return q{};
}
190              
191              
192             1;
193             __END__