File Coverage

blib/lib/Gearman/Server.pm
Criterion Covered Total %
statement 80 219 36.5
branch 17 80 21.2
condition 7 26 26.9
subroutine 19 36 52.7
pod 18 20 90.0
total 141 381 37.0


line stmt bran cond sub pod time code
1             package Gearman::Server;
2 3     3   3214 use version ();
  3         4250  
  3         135  
3             $Gearman::Server::VERSION = version->declare("1.140_001");
4              
5 3     3   14 use strict;
  3         4  
  3         69  
6 3     3   20 use warnings;
  3         6  
  3         119  
7              
8             =head1 NAME
9              
10             Gearman::Server - function call "router" and load balancer
11              
12             =head1 DESCRIPTION
13              
14             You run a Gearman server (or more likely, many of them for both
15             high-availability and load balancing), then have workers (using
16             L<Gearman::Worker> from the Gearman module, or libraries for other
17             languages) register their ability to do certain functions to all of
18             them, and then clients (using L<Gearman::Client>,
19             L<Gearman::Client::Async>, etc) request work to be done from one of
20             the Gearman servers.
21              
22             The servers connect them, routing function call requests to the
23             appropriate workers, multiplexing responses to duplicate requests as
24             requested, etc.
25              
26             More than likely, you want to use the provided L<gearmand> wrapper
27             script, and not use Gearman::Server directly.
28              
29             =cut
30              
31 3     3   14 use Carp qw(croak);
  3         4  
  3         227  
32 3     3   1381 use Gearman::Server::Client;
  3         11  
  3         152  
33 3     3   2120 use Gearman::Server::Listener;
  3         9  
  3         114  
34 3     3   1622 use Gearman::Server::Job;
  3         8  
  3         113  
35 3     3   21 use Gearman::Util;
  3         6  
  3         92  
36 3     3   763 use IO::Socket::INET;
  3         12121  
  3         39  
37 3     3   2717 use IO::Handle ();
  3         7  
  3         102  
38 3         256 use Socket qw/
39             IPPROTO_TCP
40             SOL_SOCKET
41             SOCK_STREAM
42             AF_UNIX
43             SOCK_STREAM
44             PF_UNSPEC
45 3     3   16 /;
  3         7  
46 3     3   18 use Sys::Hostname ();
  3         9  
  3         170  
47              
48             use fields (
49 3         21 'client_map', # fd -> Client
50             'sleepers', # func -> { "Client=HASH(0xdeadbeef)" => Client }
51             'sleepers_list', # func -> [ Client, ... ], ...
52             'job_queue', # job_name -> [Job, Job*] (key only exists if non-empty)
53             'job_of_handle', # handle -> Job
54             'max_queue', # func -> configured max jobqueue size
55             'job_of_uniq', # func -> uniq -> Job
56             'handle_ct', # atomic counter
57             'handle_base', # atomic counter
58             'listeners', # arrayref of listener objects
59             'wakeup', # number of workers to wake
60             'wakeup_delay', # seconds to wait before waking more workers
61             'wakeup_timers', # func -> timer, timer to be canceled or adjusted
62             # when job grab/inject is called
63 3     3   18 );
  3         5  
64              
65             =head1 METHODS
66              
67             =head2 new
68              
69             $server_object = Gearman::Server->new( %options )
70              
71             Creates and returns a new Gearman::Server object, which attaches itself to the
72             L<Danga::Socket> event loop. The server will begin operating when the
73             L<Danga::Socket> runloop is started. This means you need to start up the
74             runloop before anything will happen.
75              
76             Options:
77              
78             =over
79              
80             =item
81              
82             port
83              
84             Specify a port which you would like the B<Gearman::Server> to listen on for TCP connections (not necessary, but useful)
85              
86             =item
87              
88             wakeup
89              
90             Number of workers to wake up per job inserted into the queue.
91              
92             Zero (0) is a perfectly acceptable answer, and can be used if you don't care much about job latency.
93             This would bank on the base idea of a worker checking in with the server every so often.
94              
95             Negative One (-1) indicates that all sleeping workers should be woken up.
96              
97             All other negative numbers will cause the server to throw an exception and not start.
98              
99             =item
100              
101             wakeup_delay
102              
103             Time interval before waking up more workers (the value specified by B<wakeup>) when jobs are still in
104             the queue.
105              
106             Zero (0) means go as fast as possible, but not all at the same time. Similar to -1 on B<wakeup>, but
107             is more cooperative in gearmand's multitasking model.
108              
109             Negative One (-1) means that this event won't happen, so only the initial workers will be woken up to
110             handle jobs in the queue.
111              
112             =back
113              
114             =cut
115              
116             sub new {
117 9     9 1 23790 my ($class, %opts) = @_;
118 9 50       49 my $self = ref($class) ? $class : fields::new($class);
119              
120 9         10091 $self->{$_} = {} for qw/
121             client_map
122             sleepers
123             sleepers_list
124             job_queue
125             job_of_handle
126             max_queue
127             job_of_uniq
128             wakeup_timers
129             /;
130              
131 9         20 $self->{listeners} = [];
132 9         16 $self->{wakeup} = 3;
133 9         13 $self->{wakeup_delay} = .1;
134              
135 9         15 $self->{handle_ct} = 0;
136 9         39 $self->{handle_base} = "H:" . Sys::Hostname::hostname() . ":";
137              
138 9         108 my $port = delete $opts{port};
139              
140 9         14 my $wakeup = delete $opts{wakeup};
141 9 100       33 if (defined $wakeup) {
142 2 100 66     42 die "Invalid value passed in wakeup option"
143             if $wakeup < 0 && $wakeup != -1;
144 1         4 $self->{wakeup} = $wakeup;
145             }
146              
147 8         12 my $wakeup_delay = delete $opts{wakeup_delay};
148 8 100       22 if (defined $wakeup_delay) {
149 2 100 66     25 die "Invalid value passed in wakeup_delay option"
150             if $wakeup_delay < 0 && $wakeup_delay != -1;
151 1         4 $self->{wakeup_delay} = $wakeup_delay;
152             }
153              
154 7 100       49 croak("Unknown options") if %opts;
155              
156 6         27 $self->create_listening_sock($port);
157              
158 6         21 return $self;
159             } ## end sub new
160              
161             sub debug {
162 0     0 0 0 my ($self, $msg) = @_;
163              
164 0         0 warn "$msg\n";
165             }
166              
167             =head2 create_listening_sock($portnum, %options)
168              
169             Add a TCP port listener for incoming Gearman worker and client connections. Options:
170              
171             =over 4
172              
173             =item accept_per_loop
174              
175             =item local_addr
176              
177             Bind socket to only this address.
178              
179             =back
180              
181             =cut
182              
183             sub create_listening_sock {
184 7     7 1 2711 my ($self, $portnum, %opts) = @_;
185              
186 7         12 my $accept_per_loop = delete $opts{accept_per_loop};
187 7         13 my $local_addr = delete $opts{local_addr};
188              
189 7 50       25 warn "Extra options passed into create_listening_sock: "
190             . join(', ', keys %opts) . "\n"
191             if keys %opts;
192              
193 7 100       78 my $ssock = IO::Socket::INET->new(
    50          
194             LocalPort => $portnum,
195             Type => SOCK_STREAM,
196             Proto => IPPROTO_TCP,
197             Blocking => 0,
198             Reuse => 1,
199             Listen => 1024,
200             ($local_addr ? (LocalAddr => $local_addr) : ())
201             ) or die "Error creating socket: $@\n";
202              
203 7         1993 my $listeners = $self->{listeners};
204 7         65 push @$listeners,
205             Gearman::Server::Listener->new($ssock, $self,
206             accept_per_loop => $accept_per_loop);
207              
208 7         23 return $ssock;
209             } ## end sub create_listening_sock
210              
211             =head2 new_client($sock)
212              
213             init new L<Gearman::Server::Client> object and add it to internal clients map
214              
215             =cut
216              
217             sub new_client {
218 1     1 1 491 my ($self, $sock) = @_;
219 1         12 my $client = Gearman::Server::Client->new($sock, $self);
220 1         9 $client->watch_read(1);
221 1         31 $self->{client_map}{ $client->{fd} } = $client;
222             } ## end sub new_client
223              
224             =head2 note_disconnected_client($client)
225              
226             delete the client from internal clients map
227              
228             B<return> deleted object
229              
230             =cut
231              
232             sub note_disconnected_client {
233 1     1 1 3 my ($self, $client) = @_;
234 1         7 delete $self->{client_map}{ $client->{fd} };
235             }
236              
237             =head2 clients()
238              
239             B<return> internal clients map
240              
241             =cut
242              
243             sub clients {
244 1     1 1 3 my $self = shift;
245 1         2 return values %{ $self->{client_map} };
  1         15  
246             }
247              
248             =head2 to_inprocess_server()
249              
250             Returns a socket that is connected to the server, we can then use this
251             socket with a Gearman::Client::Async object to run clients and servers in the
252             same thread.
253              
254             =cut
255              
256             sub to_inprocess_server {
257 0     0 1 0 my $self = shift;
258              
259 0         0 my ($psock, $csock);
260 0 0       0 socketpair($csock, $psock, AF_UNIX, SOCK_STREAM, PF_UNSPEC)
261             or die "socketpair: $!";
262              
263 0         0 $csock->autoflush(1);
264 0         0 $psock->autoflush(1);
265              
266 0         0 IO::Handle::blocking($csock, 0);
267 0         0 IO::Handle::blocking($psock, 0);
268              
269 0         0 my $client = Gearman::Server::Client->new($csock, $self);
270              
271 0         0 my ($package, $file, $line) = caller;
272 0         0 $client->{peer_ip} = "[$package|$file|$line]";
273 0         0 $client->watch_read(1);
274 0         0 $self->{client_map}{ $client->{fd} } = $client;
275              
276 0         0 return $psock;
277             } ## end sub to_inprocess_server
278              
279             =head2 start_worker($prog)
280              
281             $pid = $server_object->start_worker( $prog )
282              
283             ($pid, $client) = $server_object->start_worker( $prog )
284              
285             Fork and start a worker process named by C<$prog> and returns the pid (or pid and client object).
286              
287             =cut
288              
289             sub start_worker {
290 0     0 1 0 my ($self, $prog) = @_;
291              
292 0         0 my ($psock, $csock);
293 0 0       0 socketpair($csock, $psock, AF_UNIX, SOCK_STREAM, PF_UNSPEC)
294             or die "socketpair: $!";
295              
296 0         0 $csock->autoflush(1);
297 0         0 $psock->autoflush(1);
298              
299 0         0 my $pid = fork;
300 0 0       0 unless (defined $pid) {
301 0         0 warn "fork failed: $!\n";
302 0         0 return undef;
303             }
304              
305             # child process
306 0 0       0 unless ($pid) {
307 0         0 local $ENV{'GEARMAN_WORKER_USE_STDIO'} = 1;
308 0         0 close(STDIN);
309 0         0 close(STDOUT);
310 0 0       0 open(STDIN, '<&', $psock)
311             or die "Unable to dup socketpair to STDIN: $!";
312 0 0       0 open(STDOUT, '>&', $psock)
313             or die "Unable to dup socketpair to STDOUT: $!";
314 0 0       0 if (UNIVERSAL::isa($prog, "CODE")) {
315 0         0 $prog->();
316              
317             # shouldn't get here. subref should exec.
318 0         0 exit 0;
319             } ## end if (UNIVERSAL::isa($prog...))
320              
321 0         0 exec $prog;
322 0         0 die "Exec failed: $!";
323             } ## end unless ($pid)
324              
325 0         0 close($psock);
326              
327 0         0 IO::Handle::blocking($csock, 0);
328 0         0 my $sock = $csock;
329              
330 0         0 my $client = Gearman::Server::Client->new($sock, $self);
331              
332 0         0 $client->{peer_ip} = "[gearman_child]";
333 0         0 $client->watch_read(1);
334 0         0 $self->{client_map}{ $client->{fd} } = $client;
335 0 0       0 return wantarray ? ($pid, $client) : $pid;
336             } ## end sub start_worker
337              
338             =head2 enqueue_job()
339              
340             =cut
341              
342             sub enqueue_job {
343 0     0 1 0 my ($self, $job, $highpri) = @_;
344 0   0     0 my $jq = ($self->{job_queue}{ $job->{func} } ||= []);
345              
346 0 0       0 if (defined(my $max_queue_size = $self->{max_queue}{ $job->{func} })) {
347              
348             # Subtract one, because we're about to add one more below.
349 0         0 $max_queue_size--;
350 0         0 while (@$jq > $max_queue_size) {
351 0         0 my $delete_job = pop @$jq;
352 0         0 my $msg = Gearman::Util::pack_res_command("work_fail",
353             $delete_job->handle);
354 0         0 $delete_job->relay_to_listeners($msg);
355 0         0 $delete_job->note_finished;
356             } ## end while (@$jq > $max_queue_size)
357             } ## end if (defined(my $max_queue_size...))
358              
359 0 0       0 if ($highpri) {
360 0         0 unshift @$jq, $job;
361             }
362             else {
363 0         0 push @$jq, $job;
364             }
365              
366 0         0 $self->{job_of_handle}{ $job->{'handle'} } = $job;
367             } ## end sub enqueue_job
368              
369             =head2 wake_up_sleepers($func)
370              
371             =cut
372              
373             sub wake_up_sleepers {
374 0     0 1 0 my ($self, $func) = @_;
375              
376 0 0       0 if (my $existing_timer = delete($self->{wakeup_timers}->{$func})) {
377 0         0 $existing_timer->cancel();
378             }
379              
380 0 0       0 return unless $self->_wake_up_some($func);
381              
382 0         0 my $delay = $self->{wakeup_delay};
383              
384             # -1 means don't setup a timer. 0 actually means go as fast as we can, cooperatively.
385 0 0       0 return if $delay == -1;
386              
387             # If we're only going to wakeup 0 workers anyways, don't set up a timer.
388 0 0       0 return if $self->{wakeup} == 0;
389              
390             my $timer = Danga::Socket->AddTimer(
391             $delay,
392             sub {
393             # Be sure to not wake up more sleepers if we have no jobs in the queue.
394             # I know the object definition above says I can trust the func element to determine
395             # if there are items in the list, but I'm just gonna be safe, rather than sorry.
396 0 0   0   0 return unless @{ $self->{job_queue}{$func} || [] };
  0 0       0  
397 0         0 $self->wake_up_sleepers($func);
398             }
399 0         0 );
400 0         0 $self->{wakeup_timers}->{$func} = $timer;
401             } ## end sub wake_up_sleepers
402              
403             # Returns true when there are still more workers to wake up
404             # False if there are no sleepers
405             sub _wake_up_some {
406 0     0   0 my ($self, $func) = @_;
407 0 0       0 my $sleepmap = $self->{sleepers}{$func} or return;
408 0 0       0 my $sleeporder = $self->{sleepers_list}{$func} or return;
409              
410             # TODO SYNC UP STATE HERE IN CASE TWO LISTS END UP OUT OF SYNC
411              
412 0         0 my $max = $self->{wakeup};
413              
414 0         0 while (@$sleeporder) {
415 0         0 my Gearman::Server::Client $c = shift @$sleeporder;
416 0 0 0     0 next if $c->{closed} || !$c->{sleeping};
417 0 0       0 if ($max-- <= 0) {
418 0         0 unshift @$sleeporder, $c;
419 0         0 return 1;
420             }
421 0         0 delete $sleepmap->{"$c"};
422 0         0 $c->res_packet("noop");
423 0         0 $c->{sleeping} = 0;
424             } ## end while (@$sleeporder)
425              
426 0         0 delete $self->{sleepers}{$func};
427 0         0 delete $self->{sleepers_list}{$func};
428 0         0 return;
429             } ## end sub _wake_up_some
430              
431             =head2 on_client_sleep($client)
432              
433             =cut
434              
435             sub on_client_sleep {
436 0     0 1 0 my $self = shift;
437 0         0 my Gearman::Server::Client $cl = shift;
438              
439 0         0 foreach my $cd (@{ $cl->{can_do_list} }) {
  0         0  
440              
441             # immediately wake the sleeper up if there are things to be done
442 0 0       0 if ($self->{job_queue}{$cd}) {
443 0         0 $cl->res_packet("noop");
444 0         0 $cl->{sleeping} = 0;
445 0         0 return;
446             }
447              
448 0   0     0 my $sleepmap = ($self->{sleepers}{$cd} ||= {});
449 0         0 my $count = $sleepmap->{"$cl"}++;
450              
451 0 0       0 next if $count >= 2;
452              
453 0   0     0 my $sleeporder = ($self->{sleepers_list}{$cd} ||= []);
454              
455             # The idea here is to keep workers at the head of the list if they are doing work, hopefully
456             # this will allow extra workers that aren't needed to actually go 'idle' safely.
457 0         0 my $jobs_done = $cl->{jobs_done_since_sleep};
458              
459 0 0       0 if ($jobs_done) {
460 0         0 unshift @$sleeporder, $cl;
461             }
462             else {
463 0         0 push @$sleeporder, $cl;
464             }
465              
466 0         0 $cl->{jobs_done_since_sleep} = 0;
467              
468             } ## end foreach my $cd (@{ $cl->{can_do_list...}})
469             } ## end sub on_client_sleep
470              
471             =head2 jobs_outstanding()
472              
473             =cut
474              
475             sub jobs_outstanding {
476 0     0 1 0 my Gearman::Server $self = shift;
477 0         0 return scalar keys %{ $self->{job_queue} };
  0         0  
478             }
479              
480             =head2 jobs()
481              
482             =cut
483              
484             sub jobs {
485 0     0 1 0 my Gearman::Server $self = shift;
486 0         0 return values %{ $self->{job_of_handle} };
  0         0  
487             }
488              
489             =head2 job_by_handle($handle)
490              
491             =cut
492              
493             sub job_by_handle {
494 0     0 0 0 my ($self, $handle) = @_;
495 0         0 return $self->{job_of_handle}{$handle};
496             }
497              
498             =head2 note_job_finished($job)
499              
500             =cut
501              
502             sub note_job_finished {
503 0     0 1 0 my Gearman::Server $self = shift;
504 0         0 my Gearman::Server::Job $job = shift;
505              
506 0 0       0 if (my Gearman::Server::Client $worker = $job->worker) {
507 0         0 $worker->{jobs_done_since_sleep}++;
508             }
509              
510 0 0       0 if (length($job->{uniq})) {
511 0         0 delete $self->{job_of_uniq}{ $job->{func} }{ $job->{uniq} };
512             }
513 0         0 delete $self->{job_of_handle}{ $job->{handle} };
514             } ## end sub note_job_finished
515              
516             =head2 set_max_queue($func, $max)
517              
518             =over
519              
520             =item
521              
522             $func
523              
524             function name
525              
526             =item
527              
528             $max
529              
530             0/undef/"" to reset. else integer max depth.
531              
532             =back
533              
534             =cut
535              
536             sub set_max_queue {
537 2     2 1 504 my ($self, $func, $max) = @_;
538 2 100 33     23 if (defined($max) && length($max) && $max > 0) {
      66        
539 1         10 $self->{max_queue}{$func} = int($max);
540             }
541             else {
542 1         9 delete $self->{max_queue}{$func};
543             }
544             } ## end sub set_max_queue
545              
546             =head2 new_job_handle()
547              
548             =cut
549              
550             sub new_job_handle {
551 0     0 1   my $self = shift;
552 0           return $self->{handle_base} . (++$self->{handle_ct});
553             }
554              
555             =head2 job_of_unique($func, $uniq)
556              
557             =cut
558              
559             sub job_of_unique {
560 0     0 1   my ($self, $func, $uniq) = @_;
561 0 0         return undef unless $self->{job_of_uniq}{$func};
562 0           return $self->{job_of_uniq}{$func}{$uniq};
563             }
564              
565             =head2 set_unique_job($func, $uniq, $job)
566              
567             =cut
568              
569             sub set_unique_job {
570 0     0 1   my ($self, $func, $uniq, $job) = @_;
571 0   0       $self->{job_of_uniq}{$func} ||= {};
572 0           $self->{job_of_uniq}{$func}{$uniq} = $job;
573             }
574              
575             =head2 grab_job($func)
576              
577             =cut
578              
579             sub grab_job {
580 0     0 1   my ($self, $func) = @_;
581 0 0         return undef unless $self->{job_queue}{$func};
582              
583             my $empty = sub {
584 0     0     delete $self->{job_queue}{$func};
585 0           return undef;
586 0           };
587              
588 0           my Gearman::Server::Job $job;
589 0           while (1) {
590 0           $job = shift @{ $self->{job_queue}{$func} };
  0            
591 0 0         return $empty->() unless $job;
592 0 0         return $job unless $job->require_listener;
593              
594 0           foreach my Gearman::Server::Client $c (@{ $job->{listeners} }) {
  0            
595 0 0 0       return $job if $c && !$c->{closed};
596             }
597 0           $job->note_finished(0);
598             } ## end while (1)
599             } ## end sub grab_job
600              
601             1;
602             __END__