File Coverage

blib/lib/Gearman/Client/Async.pm
Criterion Covered Total %
statement 73 102 71.5
branch 7 26 26.9
condition 1 3 33.3
subroutine 15 21 71.4
pod 0 8 0.0
total 96 160 60.0


line stmt bran cond sub pod time code
1             package Gearman::Client::Async;
2              
3             =head1 NAME
4              
5             Gearman::Client::Async - Asynchronous client module for Gearman for Danga::Socket applications
6              
7             =head1 SYNOPSIS
8              
9             use Gearman::Client::Async;
10              
11             # Instantiate a new Gearman::Client::Async object.
12             $client = Gearman::Client::Async->new(
13             job_servers => [ '127.0.0.1', '192.168.0.1:123' ],
14             );
15              
16             # Overwrite job server list with a new one.
17             $client->set_job_servers( '10.0.0.1' );
18              
19             # Read list of job servers out of the client.
20             $arrayref = $client->job_servers;
21             @array = $client->job_servers;
22              
23             # Start a task
24             $task = Gearman::Task->new(...); # with callbacks, etc
25             $client->add_task( $task );
26              
27             =head1 COPYRIGHT
28              
29             Copyright 2006 Six Apart, Ltd.
30              
31             License granted to use/distribute under the same terms as Perl itself.
32              
33             =head1 WARRANTY
34              
35             This is free software. This comes with no warranty whatsoever.
36              
37             =head1 AUTHORS
38              
39             Brad Fitzpatrick (brad@danga.com)
40             Jonathan Steinert (hachi@cpan.org)
41              
42             =cut
43              
44 11     11   134797 use strict;
  11         27  
  11         366  
45 11     11   66 use warnings;
  11         22  
  11         329  
46 11     11   61 use Carp qw(croak);
  11         24  
  11         750  
47              
48             use fields (
49 11         65 'job_servers', # arrayref of Gearman::Client::Async::Connection objects
50             't_no_random', # don't randomize job server to use: use first alive one.
51             't_offline_host', # hashref: hostname -> $bool, if host should act as offline, for testing
52 11     11   8838 );
  11         15721  
53              
54 11     11   10564 use Danga::Socket 1.52;
  11         317191  
  11         359  
55 11     11   8755 use Gearman::Objects;
  11         4659  
  11         290  
56 11     11   8573 use Gearman::Task;
  11         449464  
  11         369  
57 11     11   123 use Gearman::JobStatus;
  11         22  
  11         270  
58 11     11   10098 use Gearman::Client::Async::Connection;
  11         35  
  11         342  
59              
60 11     11   69 use List::Util qw(first);
  11         19  
  11         1154  
61 11     11   64 use vars qw($VERSION);
  11         28  
  11         10335  
62              
63             $VERSION = "0.94";
64              
65             sub DEBUGGING () { 0 }
66              
67             sub new {
68 1     1 0 7678 my ($class, %opts) = @_;
69 1         19 my $self = $class;
70 1 50       50 $self = fields::new($class) unless ref $self;
71              
72 1         128 $self->{job_servers} = [];
73 1         10 $self->{t_offline_host} = {};
74              
75 1         10 my $js = delete $opts{job_servers};
76 1 50       39 $self->set_job_servers(@$js) if $js;
77              
78 1 50       12 croak "Unknown parameters: " . join(", ", keys %opts) if %opts;
79 1         4 return $self;
80             }
81              
82             # for testing.
83             sub t_set_disable_random {
84 0     0 0 0 my $self = shift;
85 0         0 $self->{t_no_random} = shift;
86             }
87              
88             sub t_set_offline_host {
89 0     0 0 0 my ($self, $host, $val) = @_;
90 0 0       0 $val = 1 unless defined $val;
91 0         0 $self->{t_offline_host}{$host} = $val;
92              
93 0 0   0   0 my $conn = first { $_->hostspec eq $host } @{ $self->{job_servers} }
  0         0  
  0         0  
94             or die "No host found with that spec to mark offline";
95              
96 0         0 $conn->t_set_offline($val);
97             }
98              
99             # set job servers, without shutting down dups, and shutting down old ones gracefully
100             sub set_job_servers {
101 1     1 0 17 my Gearman::Client::Async $self = shift;
102              
103 1         3 my %being_set; # hostspec -> 1
104 1         2 %being_set = map { $_, 1 } @_;
  1         12  
105              
106 1         18 my %exist; # hostspec -> existing conn
107 1         3 foreach my $econn (@{ $self->{job_servers} }) {
  1         10  
108 0         0 my $spec = $econn->hostspec;
109 0 0       0 if ($being_set{$spec}) {
110 0         0 $exist{$spec} = $econn;
111             } else {
112 0         0 $econn->close_when_finished;
113             }
114             }
115              
116 1         3 my @newlist;
117 1         12 foreach (@_) {
118 1   33     86 push @newlist, $exist{$_} || Gearman::Client::Async::Connection->new( hostspec => $_ );
119             }
120 1         5 $self->{job_servers} = \@newlist;
121             }
122              
123             # getter
124             sub job_servers {
125 0     0 0 0 my Gearman::Client::Async $self = shift;
126 0 0       0 croak "Not a setter" if @_;
127 0         0 my @list = map { $_->hostspec } @{ $self->{job_servers} };
  0         0  
  0         0  
128 0 0       0 return wantarray ? @list : \@list;
129             }
130              
131             sub add_task {
132 2     2 0 883 my Gearman::Client::Async $self = shift;
133 2         7 my Gearman::Task $task = shift;
134              
135 2         3 my $try_again;
136             $try_again = sub {
137              
138 2     2   3 my @job_servers = grep { $_->alive } @{$self->{job_servers}};
  2         10  
  2         6  
139 2         4 warn "Alive servers: " . @job_servers . " out of " . @{$self->{job_servers}} . "\n" if DEBUGGING;
140 2 50       8 unless (@job_servers) {
141 0         0 $task->final_fail;
142 0         0 $try_again = undef;
143 0         0 return;
144             }
145              
146 2         3 my $js;
147 2 50       21 if (defined( my $hash = $task->hash )) {
148             # Task is hashed, use key to fetch job server
149 0         0 $js = @job_servers[$hash % @job_servers];
150             }
151             else {
152             # Task is not hashed, random job server
153 2 50       93 $js = @job_servers[$self->{t_no_random} ? 0 :
154             int( rand( @job_servers ))];
155             }
156              
157             # TODO Fix this violation of object privacy.
158 2         6 $task->{taskset} = $self;
159              
160             $js->get_in_ready_state(
161             # on_ready:
162             sub {
163 2         3 my $timer;
164 2 50       6 if (my $timeout = $task->{timeout}) {
165             $timer = Danga::Socket->AddTimer($timeout, sub {
166 0         0 $task->final_fail('timeout');
167 0         0 });
168             }
169             $task->set_on_post_hooks(sub {
170 0 0       0 $timer->cancel if $timer;
171              
172             # ALSO clean up our $js (connection's) waiting stuff:
173 0         0 $js->give_up_on($task);
174 2         21 });
175 2         16 $js->add_task( $task );
176 2         6 $try_again = undef;
177             },
178             # on_error:
179 2         20 $try_again,
180             );
181 2         14 };
182 2         9 $try_again->();
183             }
184              
185             # Gearman::Client::Async sometimes fakes itself duck-typing style as a
186             # Gearman::Taskset, since a task"set" makes no sense in an async
187             # world, where there's no need to wait on a set of things... since
188             # everything happens at its own pace. so for duck-typing reasons (or,
189             # er, "implementing an interface", say), we need to implement the
190             # "taskset client method" but in our case, that's just us.
191 0     0 0   sub client { $_[0] }
192              
193             # as a Gearman::Client-like thing, we'll be asked for our prefix, which this module
194             # currently doesn't support, but the base Gearman libraries expect.
195 0     0 0   sub prefix { "" }
196              
197              
198             1;