File Coverage

blib/lib/POE/Component/Gearman/Client/Connection.pm
Criterion Covered Total %
statement 48 221 21.7
branch 0 72 0.0
condition 0 10 0.0
subroutine 16 41 39.0
pod 0 15 0.0
total 64 359 17.8


line stmt bran cond sub pod time code
1             package POE::Component::Gearman::Client::Connection;
2 1     1   5 use strict;
  1         1  
  1         32  
3 1     1   6 use warnings;
  1         1  
  1         31  
4              
5 1     1   4 use Carp qw(croak);
  1         1  
  1         54  
6             use fields (
7 1         6 'state', # one of 3 state constants below
8             'waiting', # hashref of $handle -> [ Task+ ]
9             'need_handle', # arrayref of Gearman::Task objects which
10             # have been submitted but need handles.
11             'parser', # parser object
12             'hostspec', # scalar: "host:ip"
13             'deadtime', # unixtime we're marked dead until.
14             'task2handle', # hashref of stringified Task -> scalar handle
15             'on_ready', # arrayref of on_ready callbacks to run on connect success
16             'on_error', # arrayref of on_error callbacks to run on connect failure
17             'poe_session_id', # POE::Component::Client::TCP session ID
18 1     1   3 );
  1         2  
19 1     1   88 use Gearman::Task;
  1         3  
  1         20  
20 1     1   4 use Gearman::Util;
  1         2  
  1         22  
21 1     1   4 use Scalar::Util qw(weaken);
  1         1  
  1         109  
22 1     1   3105 use POE qw(Component::Client::TCP Filter::Stream);
  1         40508  
  1         10  
23              
24 1     1   208473 use constant S_DISCONNECTED => \"disconnected";
  1         3  
  1         79  
25 1     1   6 use constant S_CONNECTING => \"connecting";
  1         2  
  1         51  
26 1     1   8 use constant S_READY => \"ready";
  1         2  
  1         2632  
27              
28             sub DEBUGGING () { 0 }
29              
30             sub new {
31 0     0 0   my __PACKAGE__ $self = shift;
32 0           my %opts = @_;
33            
34 0 0         $self = fields::new($self) unless ref $self;
35            
36 0 0         $self->{hostspec} = delete($opts{hostspec}) or croak("hostspec required");
37 0 0         croak("hostspec must be in host:port format") if ref $self->{hostspec};
38             # TODO: read timeout param
39            
40 0           $self->{state} = S_DISCONNECTED;
41 0           $self->{waiting} = {};
42 0           $self->{need_handle} = [];
43 0           $self->{deadtime} = 0;
44 0           $self->{on_ready} = [];
45 0           $self->{on_error} = [];
46 0           $self->{task2handle} = {};
47            
48 0 0         croak "Unknown parameters: " . join(", ", keys %opts) if %opts;
49 0           return $self;
50             }
51              
52             sub connect {
53 0     0 0   my __PACKAGE__ $self = shift;
54            
55 0           $self->{state} = S_CONNECTING;
56            
57 0           my ($host, $port) = split /:/, $self->{hostspec};
58 0   0       $port ||= 7003;
59 0           warn "Connecting to $self->{hostspec}\n" if DEBUGGING;
60            
61             $self->{poe_session_id} = POE::Component::Client::TCP->new(
62             RemoteAddress => $host,
63             RemotePort => $port,
64             Filter => POE::Filter::Stream->new(),
65 0     0     Started => sub { $_[HEAP]{connection} = $self },
66 0           Connected => \&_onConnect,
67             ConnectError => \&_onConnectError,
68             Disconnected => \&_onRead,
69             ServerInput => \&_onRead,
70             ServerError => \&_onError
71             );
72            
73 0           $self->{parser} = Gearman::ResponseParser::Async->new($self);
74             }
75              
76             sub get_session {
77 0     0 0   my __PACKAGE__ $self = shift;
78 0           return POE::Kernel->ID_id_to_session($self->{poe_session_id});
79             }
80              
81             sub close {
82 0     0 0   my __PACKAGE__ $self = shift;
83 0           $self->{state} = S_DISCONNECTED;
84 0           $self->_requeue_all;
85 0           POE::Kernel->post( $self->get_session, 'shutdown' );
86             }
87              
88             sub add_task {
89 0     0 0   my __PACKAGE__ $self = shift;
90 0           my Gearman::Task $task = shift;
91            
92 0 0         Carp::confess("add_task called when in wrong state")
93             unless $self->{state} == S_READY;
94            
95 0           warn "writing task $task to $self->{hostspec}\n" if DEBUGGING;
96            
97 0           $self->write( $task->pack_submit_packet );
98 0           push @{$self->{need_handle}}, $task;
  0            
99 0           Scalar::Util::weaken($self->{need_handle}->[-1]);
100             }
101              
102             # copy-and-paste from Gearman::Client::Async::Connection code
103             sub close_when_finished {
104 0     0 0   my __PACKAGE__ $self = shift;
105             # FIXME: implement
106             }
107              
108             # copy-and-paste from Gearman::Client::Async::Connection code
109             sub hostspec {
110 0     0 0   my __PACKAGE__ $self = shift;
111 0           return $self->{hostspec};
112             }
113              
114             # copy-and-paste from Gearman::Client::Async::Connection code
115             sub get_in_ready_state {
116 0     0 0   my ($self, $on_ready, $on_error) = @_;
117            
118 0 0         if ($self->{state} == S_READY) {
119 0           $on_ready->();
120 0           return;
121             }
122              
123 0 0         push @{$self->{on_ready}}, $on_ready if $on_ready;
  0            
124 0 0         push @{$self->{on_error}}, $on_error if $on_error;
  0            
125              
126 0 0         $self->connect if $self->{state} == S_DISCONNECTED;
127             }
128              
129             # copy-and-paste from Gearman::Client::Async::Connection code
130             sub mark_dead {
131 0     0 0   my __PACKAGE__ $self = shift;
132 0           $self->{deadtime} = time + 10;
133 0           warn "$self->{hostspec} marked dead for a bit." if DEBUGGING;
134             }
135              
136             # copy-and-paste from Gearman::Client::Async::Connection code
137             sub alive {
138 0     0 0   my __PACKAGE__ $self = shift;
139 0           return $self->{deadtime} <= time;
140             }
141              
142             # copy-and-paste from Gearman::Client::Async::Connection code
143             sub destroy_callbacks {
144 0     0 0   my __PACKAGE__ $self = shift;
145 0           $self->{on_ready} = [];
146 0           $self->{on_error} = [];
147             }
148              
149             # copy-and-paste from Gearman::Client::Async::Connection code
150             sub stuff_outstanding {
151 0     0 0   my __PACKAGE__ $self = shift;
152             return
153             @{$self->{need_handle}} ||
154 0   0       %{$self->{waiting}};
155             }
156              
157             # copy-and-paste from Gearman::Client::Async::Connection code
158             sub _requeue_all {
159 0     0     my __PACKAGE__ $self = shift;
160              
161 0           my $need_handle = $self->{need_handle};
162 0           my $waiting = $self->{waiting};
163              
164 0           $self->{need_handle} = [];
165 0           $self->{waiting} = {};
166              
167 0           while (@$need_handle) {
168 0           my $task = shift @$need_handle;
169 0           warn "Task $task in need_handle queue during socket error, queueing for redispatch\n" if DEBUGGING;
170 0 0         $task->fail if $task;
171             }
172              
173 0           while (my ($shandle, $tasklist) = each( %$waiting )) {
174 0           foreach my $task (@$tasklist) {
175 0           warn "Task $task ($shandle) in waiting queue during socket error, queueing for redispatch\n" if DEBUGGING;
176 0           $task->fail;
177             }
178             }
179             }
180              
181             # copy-and-paste from Gearman::Client::Async::Connection code
182             sub process_packet {
183 0     0 0   my __PACKAGE__ $self = shift;
184 0           my $res = shift;
185              
186 0           warn "Got packet '$res->{type}' from $self->{hostspec}\n" if DEBUGGING;
187              
188 0 0         if ($res->{type} eq "job_created") {
189              
190 0 0         die "Um, got an unexpected job_created notification" unless @{ $self->{need_handle} };
  0            
191 0 0         my Gearman::Task $task = shift @{ $self->{need_handle} } or
  0            
192             return 1;
193              
194              
195 0           my $shandle = ${ $res->{'blobref'} };
  0            
196 0 0         if ($task) {
197 0           $self->{task2handle}{"$task"} = $shandle;
198 0   0       push @{ $self->{waiting}->{$shandle} ||= [] }, $task;
  0            
199             }
200 0           return 1;
201             }
202              
203 0 0         if ($res->{type} eq "work_fail") {
204 0           my $shandle = ${ $res->{'blobref'} };
  0            
205 0           $self->_fail_jshandle($shandle);
206 0           return 1;
207             }
208              
209 0 0         if ($res->{type} eq "work_complete") {
210 0 0         ${ $res->{'blobref'} } =~ s/^(.+?)\0//
  0            
211             or die "Bogus work_complete from server";
212 0           my $shandle = $1;
213              
214 0 0         my $task_list = $self->{waiting}{$shandle} or
215             return;
216              
217 0 0         my Gearman::Task $task = shift @$task_list or
218             return;
219              
220 0           $task->complete($res->{'blobref'});
221              
222 0 0         unless (@$task_list) {
223 0           delete $self->{waiting}{$shandle};
224 0           delete $self->{task2handle}{"$task"};
225             }
226              
227 0           warn "Jobs: " . scalar( keys( %{$self->{waiting}} ) ) . "\n" if DEBUGGING;
228              
229 0           return 1;
230             }
231              
232 0 0         if ($res->{type} eq "work_status") {
233 0           my ($shandle, $nu, $de) = split(/\0/, ${ $res->{'blobref'} });
  0            
234              
235 0 0         my $task_list = $self->{waiting}{$shandle} or
236             return;
237              
238 0           foreach my Gearman::Task $task (@$task_list) {
239 0           $task->status($nu, $de);
240             }
241              
242 0           return 1;
243             }
244              
245 0           die "Unknown/unimplemented packet type: $res->{type}";
246              
247             }
248              
249             # copy-and-paste from Gearman::Client::Async::Connection code
250             sub give_up_on {
251 0     0 0   my __PACKAGE__ $self = shift;
252 0           my $task = shift;
253              
254 0 0         my $shandle = $self->{task2handle}{"$task"} or return;
255 0 0         my $task_list = $self->{waiting}{$shandle} or return;
256 0           @$task_list = grep { $_ != $task } @$task_list;
  0            
257 0 0         unless (@$task_list) {
258 0           delete $self->{waiting}{$shandle};
259             }
260              
261             }
262              
263             # copy-and-paste from Gearman::Client::Async::Connection code
264             # note the failure of a task given by its jobserver-specific handle
265             sub _fail_jshandle {
266 0     0     my __PACKAGE__ $self = shift;
267 0           my $shandle = shift;
268              
269 0 0         my $task_list = $self->{waiting}->{$shandle} or
270             return;
271              
272 0 0         my Gearman::Task $task = shift @$task_list or
273             return;
274              
275             # cleanup
276 0 0         unless (@$task_list) {
277 0           delete $self->{task2handle}{"$task"};
278 0           delete $self->{waiting}{$shandle};
279             }
280              
281 0           $task->fail;
282             }
283              
284             sub write {
285 0     0 0   my $self = shift;
286 0           my $input = shift;
287 0           my $heap = $self->get_session->get_heap;
288 0 0         croak("writing to non-connected socket") unless $heap->{connected};
289 0           $heap->{server}->put($input);
290             }
291              
292             sub _onConnect {
293 0     0     my $self = $_[HEAP]{connection};
294            
295 0 0         if ($self->{state} == S_CONNECTING) {
296 0           $self->{state} = S_READY;
297 0           warn "$self->{hostspec} connected and ready.\n" if DEBUGGING;
298 0           $_->() foreach @{$self->{on_ready}};
  0            
299 0           $self->destroy_callbacks;
300             }
301             }
302              
303             sub _onConnectError {
304 0     0     my $self = $_[HEAP]{connection};
305 0           warn "Jobserver, $self->{hostspec} ($self) has failed to connect properly\n" if DEBUGGING;
306            
307 0           $self->mark_dead;
308 0           $self->close;
309 0           $_->() foreach @{$self->{on_error}};
  0            
310 0           $self->destroy_callbacks;
311             }
312              
313             sub _onError {
314 0     0     my $self = $_[HEAP]{connection};
315 0           my $was_connecting = ($self->{state} == S_CONNECTING);
316              
317 0 0 0       if ($was_connecting && $self->{t_offline}) {
318 0           return;
319             }
320              
321 0           $self->mark_dead;
322 0           $self->close;
323 0 0         $self->on_connect_error if $was_connecting;
324             }
325              
326             sub _onRead {
327 0     0     my $self = $_[HEAP]{connection};
328            
329 0           my $input = $_[ARG0]; # should we tell POE::Filter to buffer in chunks of 128 * 1024?
330 0 0         unless (defined $input) {
331 0 0         $self->mark_dead if $self->stuff_outstanding;
332 0           $self->close;
333 0           return;
334             }
335              
336 0           $self->{parser}->parse_data(\$input);
337             }
338              
339              
340             # copy-and-paste from Gearman::Client::Async::Connection code
341             package Gearman::ResponseParser::Async;
342              
343 1     1   9 use strict;
  1         3  
  1         35  
344 1     1   7 use warnings;
  1         2  
  1         52  
345 1     1   6 use Scalar::Util qw(weaken);
  1         2  
  1         73  
346              
347 1     1   6 use Gearman::ResponseParser;
  1         2  
  1         28  
348 1     1   5 use base 'Gearman::ResponseParser';
  1         2  
  1         279  
349              
350             sub new {
351 0     0     my $class = shift;
352              
353 0           my $self = $class->SUPER::new;
354              
355 0           $self->{_conn} = shift;
356 0           weaken($self->{_conn});
357              
358 0           return $self;
359             }
360              
361             sub on_packet {
362 0     0     my $self = shift;
363 0           my $packet = shift;
364              
365 0 0         return unless $self->{_conn};
366 0           $self->{_conn}->process_packet( $packet );
367             }
368              
369             sub on_error {
370 0     0     my $self = shift;
371              
372 0 0         return unless $self->{_conn};
373 0           $self->{_conn}->mark_unsafe; # where's this?
374 0           $self->{_conn}->close;
375             }
376              
377             1;