File Coverage

blib/lib/Gearman/Server.pm
Criterion Covered Total %
statement 33 219 15.0
branch 0 80 0.0
condition 0 26 0.0
subroutine 11 34 32.3
pod 3 20 15.0
total 47 379 12.4


line stmt bran cond sub pod time code
1             package Gearman::Server;
2 1     1   468 use version;
  1         1  
  1         5  
3             $Gearman::Server::VERSION = qv("v1.130.1");
4              
5 1     1   62 use strict;
  1         1  
  1         15  
6 1     1   8 use warnings;
  1         1  
  1         28  
7              
8             =head1 NAME
9              
10             Gearman::Server - function call "router" and load balancer
11              
12             =head1 DESCRIPTION
13              
14             You run a Gearman server (or more likely, many of them for both
15             high-availability and load balancing), then have workers (using
16             L from the Gearman module, or libraries for other
17             languages) register their ability to do certain functions to all of
18             them, and then clients (using L,
19             L, etc) request work to be done from one of
20             the Gearman servers.
21              
22             The servers connect them, routing function call requests to the
23             appropriate workers, multiplexing responses to duplicate requests as
24             requested, etc.
25              
26             More than likely, you want to use the provided L wrapper
27             script, and not use Gearman::Server directly.
28              
29             =cut
30              
31 1     1   3 use Carp qw(croak);
  1         1  
  1         41  
32 1     1   379 use Gearman::Server::Client;
  1         2  
  1         21  
33 1     1   360 use Gearman::Server::Listener;
  1         1  
  1         21  
34 1     1   313 use Gearman::Server::Job;
  1         2  
  1         18  
35 1     1   4 use IO::Handle ();
  1         0  
  1         18  
36 1         39 use Socket qw/
37             IPPROTO_TCP
38             SOL_SOCKET
39             SOCK_STREAM
40             AF_UNIX
41             SOCK_STREAM
42             PF_UNSPEC
43 1     1   2 /;
  1         1  
44 1     1   3 use Sys::Hostname ();
  1         1  
  1         23  
45              
46             use fields (
47 1         3 'client_map', # fd -> Client
48             'sleepers', # func -> { "Client=HASH(0xdeadbeef)" => Client }
49             'sleepers_list', # func -> [ Client, ... ], ...
50             'job_queue', # job_name -> [Job, Job*] (key only exists if non-empty)
51             'job_of_handle', # handle -> Job
52             'max_queue', # func -> configured max jobqueue size
53             'job_of_uniq', # func -> uniq -> Job
54             'handle_ct', # atomic counter
55             'handle_base', # atomic counter
56             'listeners', # arrayref of listener objects
57             'wakeup', # number of workers to wake
58             'wakeup_delay', # seconds to wait before waking more workers
59             'wakeup_timers', # func -> timer, timer to be canceled or adjusted
60             # when job grab/inject is called
61 1     1   3 );
  1         0  
62              
63             =head1 METHODS
64              
65             =head2 new
66              
67             $server_object = Gearman::Server->new( %options )
68              
69             Creates and returns a new Gearman::Server object, which attaches itself to the Danga::Socket event loop. The server will begin operating when the Danga::Socket runloop is started. This means you need to start up the runloop before anything will happen.
70              
71             Options:
72              
73             =over
74              
75             =item port
76              
77             Specify a port which you would like the Gearman::Server to listen on for TCP connections (not necessary, but useful)
78              
79             =back
80              
81             =cut
82              
83             sub new {
84 0     0 1   my ($class, %opts) = @_;
85 0 0         my $self = ref $class ? $class : fields::new($class);
86              
87 0           $self->{client_map} = {};
88 0           $self->{sleepers} = {};
89 0           $self->{sleepers_list} = {};
90 0           $self->{job_queue} = {};
91 0           $self->{job_of_handle} = {};
92 0           $self->{max_queue} = {};
93 0           $self->{job_of_uniq} = {};
94 0           $self->{listeners} = [];
95 0           $self->{wakeup} = 3;
96 0           $self->{wakeup_delay} = .1;
97 0           $self->{wakeup_timers} = {};
98              
99 0           $self->{handle_ct} = 0;
100 0           $self->{handle_base} = "H:" . Sys::Hostname::hostname() . ":";
101              
102 0           my $port = delete $opts{port};
103              
104 0           my $wakeup = delete $opts{wakeup};
105              
106 0 0         if (defined $wakeup) {
107 0 0 0       die "Invalid value passed in wakeup option"
108             if $wakeup < 0 && $wakeup != -1;
109 0           $self->{wakeup} = $wakeup;
110             }
111              
112 0           my $wakeup_delay = delete $opts{wakeup_delay};
113              
114 0 0         if (defined $wakeup_delay) {
115 0 0 0       die "Invalid value passed in wakeup_delay option"
116             if $wakeup_delay < 0 && $wakeup_delay != -1;
117 0           $self->{wakeup_delay} = $wakeup_delay;
118             }
119              
120 0 0         croak("Unknown options") if %opts;
121 0           $self->create_listening_sock($port);
122              
123 0           return $self;
124             } ## end sub new
125              
126             sub debug {
127 0     0 0   my ($self, $msg) = @_;
128              
129             #warn "$msg\n";
130             }
131              
132             =head2 create_listening_sock
133              
134             $server_object->create_listening_sock( $portnum, \%options )
135              
136             Add a TCP port listener for incoming Gearman worker and client connections. Options:
137              
138             =over 4
139              
140             =item accept_per_loop
141              
142             =item local_addr
143              
144             Bind socket to only this address.
145              
146             =back
147              
148             =cut
149              
150             sub create_listening_sock {
151 0     0 1   my ($self, $portnum, %opts) = @_;
152              
153 0           my $accept_per_loop = delete $opts{accept_per_loop};
154 0           my $local_addr = delete $opts{local_addr};
155              
156 0 0         warn "Extra options passed into create_listening_sock: "
157             . join(', ', keys %opts) . "\n"
158             if keys %opts;
159              
160 0 0         my $ssock = IO::Socket::INET->new(
    0          
161             LocalPort => $portnum,
162             Type => SOCK_STREAM,
163             Proto => IPPROTO_TCP,
164             Blocking => 0,
165             Reuse => 1,
166             Listen => 1024,
167             ($local_addr ? (LocalAddr => $local_addr) : ())
168             ) or die "Error creating socket: $@\n";
169              
170 0           my $listeners = $self->{listeners};
171 0           push @$listeners,
172             Gearman::Server::Listener->new($ssock, $self,
173             accept_per_loop => $accept_per_loop);
174              
175 0           return $ssock;
176             } ## end sub create_listening_sock
177              
178             sub new_client {
179 0     0 0   my ($self, $sock) = @_;
180 0           my $client = Gearman::Server::Client->new($sock, $self);
181 0           $client->watch_read(1);
182 0           $self->{client_map}{ $client->{fd} } = $client;
183             } ## end sub new_client
184              
185             sub note_disconnected_client {
186 0     0 0   my ($self, $client) = @_;
187 0           delete $self->{client_map}{ $client->{fd} };
188             }
189              
190             sub clients {
191 0     0 0   my $self = shift;
192 0           return values %{ $self->{client_map} };
  0            
193             }
194              
195             # Returns a socket that is connected to the server, we can then use this
196             # socket with a Gearman::Client::Async object to run clients and servers in the
197             # same thread.
198             sub to_inprocess_server {
199 0     0 0   my $self = shift;
200              
201 0           my ($psock, $csock);
202 0 0         socketpair($csock, $psock, AF_UNIX, SOCK_STREAM, PF_UNSPEC)
203             or die "socketpair: $!";
204              
205 0           $csock->autoflush(1);
206 0           $psock->autoflush(1);
207              
208 0           IO::Handle::blocking($csock, 0);
209 0           IO::Handle::blocking($psock, 0);
210              
211 0           my $client = Gearman::Server::Client->new($csock, $self);
212              
213 0           my ($package, $file, $line) = caller;
214 0           $client->{peer_ip} = "[$package|$file|$line]";
215 0           $client->watch_read(1);
216 0           $self->{client_map}{ $client->{fd} } = $client;
217              
218 0           return $psock;
219             } ## end sub to_inprocess_server
220              
221             =head2 start_worker
222              
223             $pid = $server_object->start_worker( $prog )
224              
225             ($pid, $client) = $server_object->start_worker( $prog )
226              
227             Fork and start a worker process named by C<$prog> and returns the pid (or pid and client object).
228              
229             =cut
230              
231             sub start_worker {
232 0     0 1   my ($self, $prog) = @_;
233              
234 0           my ($psock, $csock);
235 0 0         socketpair($csock, $psock, AF_UNIX, SOCK_STREAM, PF_UNSPEC)
236             or die "socketpair: $!";
237              
238 0           $csock->autoflush(1);
239 0           $psock->autoflush(1);
240              
241 0           my $pid = fork;
242 0 0         unless (defined $pid) {
243 0           warn "fork failed: $!\n";
244 0           return undef;
245             }
246              
247             # child process
248 0 0         unless ($pid) {
249 0           local $ENV{'GEARMAN_WORKER_USE_STDIO'} = 1;
250 0           close(STDIN);
251 0           close(STDOUT);
252 0 0         open(STDIN, '<&', $psock)
253             or die "Unable to dup socketpair to STDIN: $!";
254 0 0         open(STDOUT, '>&', $psock)
255             or die "Unable to dup socketpair to STDOUT: $!";
256 0 0         if (UNIVERSAL::isa($prog, "CODE")) {
257 0           $prog->();
258 0           exit 0; # shouldn't get here. subref should exec.
259             }
260 0           exec $prog;
261 0           die "Exec failed: $!";
262             } ## end unless ($pid)
263              
264 0           close($psock);
265              
266 0           IO::Handle::blocking($csock, 0);
267 0           my $sock = $csock;
268              
269 0           my $client = Gearman::Server::Client->new($sock, $self);
270              
271 0           $client->{peer_ip} = "[gearman_child]";
272 0           $client->watch_read(1);
273 0           $self->{client_map}{ $client->{fd} } = $client;
274 0 0         return wantarray ? ($pid, $client) : $pid;
275             } ## end sub start_worker
276              
277             sub enqueue_job {
278 0     0 0   my ($self, $job, $highpri) = @_;
279 0   0       my $jq = ($self->{job_queue}{ $job->{func} } ||= []);
280              
281 0 0         if (defined(my $max_queue_size = $self->{max_queue}{ $job->{func} })) {
282 0           $max_queue_size
283             --; # Subtract one, because we're about to add one more below.
284 0           while (@$jq > $max_queue_size) {
285 0           my $delete_job = pop @$jq;
286 0           my $msg = Gearman::Util::pack_res_command("work_fail",
287             $delete_job->handle);
288 0           $delete_job->relay_to_listeners($msg);
289 0           $delete_job->note_finished;
290             } ## end while (@$jq > $max_queue_size)
291             } ## end if (defined(my $max_queue_size...))
292              
293 0 0         if ($highpri) {
294 0           unshift @$jq, $job;
295             }
296             else {
297 0           push @$jq, $job;
298             }
299              
300 0           $self->{job_of_handle}{ $job->{'handle'} } = $job;
301             } ## end sub enqueue_job
302              
303             sub wake_up_sleepers {
304 0     0 0   my ($self, $func) = @_;
305              
306 0 0         if (my $existing_timer = delete($self->{wakeup_timers}->{$func})) {
307 0           $existing_timer->cancel();
308             }
309              
310 0 0         return unless $self->_wake_up_some($func);
311              
312 0           my $delay = $self->{wakeup_delay};
313              
314             # -1 means don't setup a timer. 0 actually means go as fast as we can, cooperatively.
315 0 0         return if $delay == -1;
316              
317             # If we're only going to wakeup 0 workers anyways, don't set up a timer.
318 0 0         return if $self->{wakeup} == 0;
319              
320             my $timer = Danga::Socket->AddTimer(
321             $delay,
322             sub {
323             # Be sure to not wake up more sleepers if we have no jobs in the queue.
324             # I know the object definition above says I can trust the func element to determine
325             # if there are items in the list, but I'm just gonna be safe, rather than sorry.
326 0 0   0     return unless @{ $self->{job_queue}{$func} || [] };
  0 0          
327 0           $self->wake_up_sleepers($func);
328             }
329 0           );
330 0           $self->{wakeup_timers}->{$func} = $timer;
331             } ## end sub wake_up_sleepers
332              
333             # Returns true when there are still more workers to wake up
334             # False if there are no sleepers
335             sub _wake_up_some {
336 0     0     my ($self, $func) = @_;
337 0 0         my $sleepmap = $self->{sleepers}{$func} or return;
338 0 0         my $sleeporder = $self->{sleepers_list}{$func} or return;
339              
340             # TODO SYNC UP STATE HERE IN CASE TWO LISTS END UP OUT OF SYNC
341              
342 0           my $max = $self->{wakeup};
343              
344 0           while (@$sleeporder) {
345 0           my Gearman::Server::Client $c = shift @$sleeporder;
346 0 0 0       next if $c->{closed} || !$c->{sleeping};
347 0 0         if ($max-- <= 0) {
348 0           unshift @$sleeporder, $c;
349 0           return 1;
350             }
351 0           delete $sleepmap->{"$c"};
352 0           $c->res_packet("noop");
353 0           $c->{sleeping} = 0;
354             } ## end while (@$sleeporder)
355              
356 0           delete $self->{sleepers}{$func};
357 0           delete $self->{sleepers_list}{$func};
358 0           return;
359             } ## end sub _wake_up_some
360              
361             sub on_client_sleep {
362 0     0 0   my $self = shift;
363 0           my Gearman::Server::Client $cl = shift;
364              
365 0           foreach my $cd (@{ $cl->{can_do_list} }) {
  0            
366              
367             # immediately wake the sleeper up if there are things to be done
368 0 0         if ($self->{job_queue}{$cd}) {
369 0           $cl->res_packet("noop");
370 0           $cl->{sleeping} = 0;
371 0           return;
372             }
373              
374 0   0       my $sleepmap = ($self->{sleepers}{$cd} ||= {});
375 0           my $count = $sleepmap->{"$cl"}++;
376              
377 0 0         next if $count >= 2;
378              
379 0   0       my $sleeporder = ($self->{sleepers_list}{$cd} ||= []);
380              
381             # The idea here is to keep workers at the head of the list if they are doing work, hopefully
382             # this will allow extra workers that aren't needed to actually go 'idle' safely.
383 0           my $jobs_done = $cl->{jobs_done_since_sleep};
384              
385 0 0         if ($jobs_done) {
386 0           unshift @$sleeporder, $cl;
387             }
388             else {
389 0           push @$sleeporder, $cl;
390             }
391              
392 0           $cl->{jobs_done_since_sleep} = 0;
393              
394             } ## end foreach my $cd (@{ $cl->{can_do_list...}})
395             } ## end sub on_client_sleep
396              
397             sub jobs_outstanding {
398 0     0 0   my Gearman::Server $self = shift;
399 0           return scalar keys %{ $self->{job_queue} };
  0            
400             }
401              
402             sub jobs {
403 0     0 0   my Gearman::Server $self = shift;
404 0           return values %{ $self->{job_of_handle} };
  0            
405             }
406              
407             sub job_by_handle {
408 0     0 0   my ($self, $handle) = @_;
409 0           return $self->{job_of_handle}{$handle};
410             }
411              
412             sub note_job_finished {
413 0     0 0   my Gearman::Server $self = shift;
414 0           my Gearman::Server::Job $job = shift;
415              
416 0 0         if (my Gearman::Server::Client $worker = $job->worker) {
417 0           $worker->{jobs_done_since_sleep}++;
418             }
419              
420 0 0         if (length($job->{uniq})) {
421 0           delete $self->{job_of_uniq}{ $job->{func} }{ $job->{uniq} };
422             }
423 0           delete $self->{job_of_handle}{ $job->{handle} };
424             } ## end sub note_job_finished
425              
426             # <0/undef/"" to reset. else integer max depth.
427             sub set_max_queue {
428 0     0 0   my ($self, $func, $max) = @_;
429 0 0 0       if (defined $max && length $max && $max >= 0) {
      0        
430 0           $self->{max_queue}{$func} = int($max);
431             }
432             else {
433 0           delete $self->{max_queue}{$func};
434             }
435             } ## end sub set_max_queue
436              
437             sub new_job_handle {
438 0     0 0   my $self = shift;
439 0           return $self->{handle_base} . (++$self->{handle_ct});
440             }
441              
442             sub job_of_unique {
443 0     0 0   my ($self, $func, $uniq) = @_;
444 0 0         return undef unless $self->{job_of_uniq}{$func};
445 0           return $self->{job_of_uniq}{$func}{$uniq};
446             }
447              
448             sub set_unique_job {
449 0     0 0   my ($self, $func, $uniq, $job) = @_;
450 0   0       $self->{job_of_uniq}{$func} ||= {};
451 0           $self->{job_of_uniq}{$func}{$uniq} = $job;
452             }
453              
454             sub grab_job {
455 0     0 0   my ($self, $func) = @_;
456 0 0         return undef unless $self->{job_queue}{$func};
457              
458             my $empty = sub {
459 0     0     delete $self->{job_queue}{$func};
460 0           return undef;
461 0           };
462              
463 0           my Gearman::Server::Job $job;
464 0           while (1) {
465 0           $job = shift @{ $self->{job_queue}{$func} };
  0            
466 0 0         return $empty->() unless $job;
467 0 0         return $job unless $job->require_listener;
468              
469 0           foreach my Gearman::Server::Client $c (@{ $job->{listeners} }) {
  0            
470 0 0 0       return $job if $c && !$c->{closed};
471             }
472 0           $job->note_finished(0);
473             } ## end while (1)
474             } ## end sub grab_job
475              
476             1;
477             __END__