File Coverage

blib/lib/Gearman/Driver.pm
Criterion Covered Total %
statement 1 3 33.3
branch n/a
condition n/a
subroutine 1 1 100.0
pod n/a
total 2 4 50.0


line stmt bran cond sub pod time code
1             package Gearman::Driver;
2              
3 15     15   581419 use Moose;
  0            
  0            
4             use Moose::Util qw(apply_all_roles);
5             use Carp qw(croak);
6             use Gearman::Driver::Observer;
7             use Gearman::Driver::Console;
8             use Gearman::Driver::Job;
9             use Gearman::Driver::Job::Method;
10             use Log::Log4perl qw(:easy);
11             use MooseX::Types::Path::Class;
12             use POE;
13             with qw(MooseX::Log::Log4perl MooseX::SimpleConfig MooseX::Getopt Gearman::Driver::Loader);
14              
15             our $VERSION = '0.02008';
16              
17             =head1 NAME
18              
19             Gearman::Driver - Manages Gearman workers
20              
21             =head1 SYNOPSIS
22              
23             package My::Workers::One;
24              
25             # Yes, you need to do it exactly this way
26             use base qw(Gearman::Driver::Worker);
27             use Moose;
28              
29             # this method will be registered with gearmand as 'My::Workers::One::scale_image'
30             sub scale_image : Job {
31             my ( $self, $job, $workload ) = @_;
32             # do something
33             }
34              
35             # this method will be registered with gearmand as 'My::Workers::One::do_something_else'
36             sub do_something_else : Job : MinProcesses(2) : MaxProcesses(15) {
37             my ( $self, $job, $workload ) = @_;
38             # do something
39             }
40              
41             # this method wont be registered with gearmand at all
42             sub do_something_internal {
43             my ( $self, $job, $workload ) = @_;
44             # do something
45             }
46              
47             1;
48              
49             package My::Workers::Two;
50              
51             use base qw(Gearman::Driver::Worker);
52             use Moose;
53              
54             # this method will be registered with gearmand as 'My::Workers::Two::scale_image'
55             sub scale_image : Job {
56             my ( $self, $job, $workload ) = @_;
57             # do something
58             }
59              
60             1;
61              
62             package main;
63              
64             use Gearman::Driver;
65              
66             my $driver = Gearman::Driver->new(
67             namespaces => [qw(My::Workers)],
68             server => 'localhost:4730,otherhost:4731',
69             interval => 60,
70             );
71              
72             #or should save all config into a YAML config file, then read config from it.
73             my $driver = Gearman::Driver->new(configfile => '/etc/gearman-driver/config.yml');
74              
75             $driver->run;
76              
77             =head1 DESCRIPTION
78              
79             Warning: This framework is still B<EXPERIMENTAL>!
80              
81             Having hundreds of Gearman workers running in separate processes can
82             consume a lot of RAM. Often many of these workers share the same
83             code/objects, like the database layer using L<DBIx::Class> for
84             example. This is where L<Gearman::Driver> comes in handy:
85              
86             You write some base class which inherits from
87             L<Gearman::Driver::Worker>. Your base class loads your database layer
88             for example. Each of your worker classes inherit from that base
89             class. In the worker classes you can register single methods as jobs
90             with gearmand. It's even possible to control how many workers doing
91             that job/method in parallel. And this is the point where you'll
92             save some RAM: Instead of starting each worker in a separate process
93             L<Gearman::Driver> will fork each worker from the main process. This
94             will take advantage of copy-on-write on Linux and save some RAM.
95              
96             There's only one mandatory parameter which has to be set when calling
97             the constructor: namespaces
98              
99             use Gearman::Driver;
100             my $driver = Gearman::Driver->new( namespaces => [qw(My::Workers)] );
101              
102             See also: L<namespaces|/namespaces>. If you do not set
103             L<server|/server> (gearmand) attribute the default will be used:
104             C<localhost:4730>
105              
106             Each module found in your namespaces will be loaded and introspected,
107             looking for methods having the 'Job' attribute set:
108              
109             package My::Workers::ONE;
110              
111             sub scale_image : Job {
112             my ( $self, $job, $workload ) = @_;
113             # do something
114             }
115              
116             This method will be registered as job function with gearmand, verify
117             it by doing:
118              
119             plu@mbp ~$ telnet localhost 4730
120             Trying ::1...
121             Connected to localhost.
122             Escape character is '^]'.
123             status
124             My::Workers::ONE::scale_image 0 0 1
125             .
126             ^]
127             telnet> Connection closed.
128              
129             If you dont like to use the full package name you can also specify
130             a custom prefix:
131              
132             package My::Workers::ONE;
133              
134             sub prefix { 'foo_bar_' }
135              
136             sub scale_image : Job {
137             my ( $self, $job, $workload ) = @_;
138             # do something
139             }
140              
141             This would register 'foo_bar_scale_image' with gearmand.
142              
143             See also: L<prefix|Gearman::Driver::Worker/prefix>
144              
145             =head1 ATTRIBUTES
146              
147             See also L<Gearman::Driver::Loader/ATTRIBUTES>.
148              
149             =head2 server
150              
151             A list of Gearman servers the workers should connect to. The format
152             for the server list is: C<host[:port][,host[:port]]>
153              
154             See also: L<Gearman::XS>
155              
156             =over 4
157              
158             =item * default: C<localhost:4730>
159              
160             =item * isa: C<Str>
161              
162             =back
163              
164             =cut
165              
166             has 'server' => (
167             default => 'localhost:4730',
168             documentation => 'Gearman host[:port][,host[:port]]',
169             is => 'rw',
170             isa => 'Str',
171             required => 1,
172             );
173              
174             =head2 console_port
175              
176             Gearman::Driver has a telnet management console, see also:
177              
178             L<Gearman::Driver::Console>
179              
180             =over 4
181              
182             =item * default: C<47300>
183              
184             =item * isa: C<Int>
185              
186             =back
187              
188             Set this to C<0> to disable management console at all.
189              
190             =cut
191              
192             has 'console_port' => (
193             default => 47300,
194             documentation => 'Port of management console (default: 47300)',
195             is => 'rw',
196             isa => 'Int',
197             required => 1,
198             );
199              
200             =head2 interval
201              
202             Each n seconds L<Net::Telnet::Gearman> is used in
203             L<Gearman::Driver::Observer> to check status of free/running/busy
204             workers on gearmand. This is used to fork more workers depending
205             on the queue size and the MinProcesses/MaxProcesses
206             L<attribute|Gearman::Driver::Worker/METHODATTRIBUTES> of the
207             job method. See also: L<Gearman::Driver::Worker>
208              
209             =over 4
210              
211             =item * default: C<5>
212              
213             =item * isa: C<Int>
214              
215             =back
216              
217             =cut
218              
219             has 'interval' => (
220             default => '5',
221             documentation => 'Interval in seconds (see Gearman::Driver::Observer)',
222             is => 'rw',
223             isa => 'Int',
224             required => 1,
225             );
226              
227             =head2 max_idle_time
228              
229             Whenever L<Gearman::Driver::Observer> notices that there are more
230             processes running than actually necessary (depending on min_processes
231             and max_processes setting) it will kill them. By default this happens
232             immediately. If you change this value to C<300>, a process which is
233             not necessary is killed after 300 seconds.
234              
235             Please remember that this also depends on what value you set
236             L</interval> to. The max_idle_time is only checked each n seconds
237             where n is L</interval>. Besides that it makes only sense when you
238             have workers where L<Gearman::Driver::Worker/MinProcesses> is set to
239             C<0>.
240              
241             =over 4
242              
243             =item * default: C<0>
244              
245             =item * isa: C<Int>
246              
247             =back
248              
249             =cut
250              
251             has 'max_idle_time' => (
252             default => '0',
253             documentation => 'How many seconds a worker may be idle before its killed',
254             is => 'rw',
255             isa => 'Int',
256             required => 1,
257             );
258              
259             =head2 logfile
260              
261             Path to logfile.
262              
263             =over 4
264              
265             =item * isa: C<Str>
266              
267             =item * default: C<gearman_driver.log>
268              
269             =back
270              
271             =cut
272              
273             has 'logfile' => (
274             coerce => 1,
275             default => 'gearman_driver.log',
276             documentation => 'Path to logfile (default: gearman_driver.log)',
277             is => 'rw',
278             isa => 'Path::Class::File',
279             );
280              
281             =head2 loglayout
282              
283             See also L<Log::Log4perl>.
284              
285             =over 4
286              
287             =item * isa: C<Str>
288              
289             =item * default: C<[%d] %p %m%n>
290              
291             =back
292              
293             =cut
294              
295             has 'loglayout' => (
296             default => '[%d] %p %m%n',
297             documentation => 'Log message layout (default: [%d] %p %m%n)',
298             is => 'rw',
299             isa => 'Str',
300             );
301              
302             =head2 loglevel
303              
304             See also L<Log::Log4perl>.
305              
306             =over 4
307              
308             =item * isa: C<Str>
309              
310             =item * default: C<INFO>
311              
312             =back
313              
314             =cut
315              
316             has 'loglevel' => (
317             default => 'INFO',
318             documentation => 'Log level (default: INFO)',
319             is => 'rw',
320             isa => 'Str',
321             );
322              
323             =head2 unknown_job_callback
324              
325             Whenever L<Gearman::Driver::Observer> sees a job that isnt handled
326             it will call this CodeRef, passing following arguments:
327              
328             =over 4
329              
330             =item * C<$driver>
331              
332             =item * C<$status>
333              
334             =back
335              
336             my $driver = Gearman::Driver->new(
337             namespaces => [qw(My::Workers)],
338             unknown_job_callback => sub {
339             my ( $driver, $status ) = @_;
340             # notify nagios here for example
341             }
342             );
343              
344             C<$status> might look like:
345              
346             $VAR1 = {
347             'busy' => 0,
348             'free' => 0,
349             'name' => 'GDExamples::Convert::unknown_job',
350             'queue' => 6,
351             'running' => 0
352             };
353              
354             =cut
355              
356             has 'unknown_job_callback' => (
357             default => sub {
358             sub { }
359             },
360             is => 'rw',
361             isa => 'CodeRef',
362             traits => [qw(NoGetopt)],
363             );
364              
365             =head2 worker_options
366              
367             You can pass runtime options to the worker module, these will merged with 'GLOBAL' and pass to the worker constructor. ( worker options override globals )
368              
369             =over 4
370              
371             =item * default: C<{}>
372              
373             =item * isa: C<HashRef>
374              
375             =back
376              
377             Example:
378              
379             my $driver = Gearman::Driver->new(
380             namespaces => [qw(My::Workers)],
381             worker_options => {
382             'GLOBAL' => {
383             'config' => $config,
384             },
385             'My::Workers::MysqlPing' => {
386             'dsn' => 'DBI:mysql:database=test;host=localhost;mysql_auto_reconnect=1;mysql_enable_utf8=1;mysql_server_prepare=1;',
387             },
388             'My::Workers::ImageThumbnail' => {
389             'default_format' => 'jpeg',
390             'default_size => ' 133 x 100 ',
391             }
392             }
393             );
394              
395             You should define these in a runtime config (See also L</configfile>), might be:
396              
397             ---
398             worker_options:
399             'My::App::Worker::MysqlPing':
400             'dsn': 'DBI:mysql:database=test;host=localhost;mysql_auto_reconnect=1;mysql_enable_utf8=1;mysql_server_prepare=1;'
401             'user': 'root'
402             'password:': ''
403             'My::App::Worker::ImageThumbnail':
404             'default_format': 'jpeg'
405             'default_size': '133x100'
406              
407             =cut
408              
409             has 'worker_options' => (
410             isa => 'HashRef',
411             is => 'rw',
412             default => sub { {} },
413             traits => [qw(Hash NoGetopt)],
414             );
415              
416             =head2 Job runtime attributes
417              
418             You can override a job attribute by its name here. This help to tuning job some runtime-related options (like max_processes, min_processes) handy.
419             You just change the options in a config file, no need to modify the worker code anymore.
420              
421             Currently only 'max_processes', 'min_processes' make sense. The hash key is "worker_module::job_key", job_key is ProcessGroup attribute or
422             job method name.
423              
424             #in your config file: /etc/gearman-driver.yml (YAML)
425             ---
426             job_runtime_attributes:
427             'My::App::Worker::job1':
428             max_processes: 25
429             min_processes: 2
430             #job has a ProcessGroup attribute named 'group1'
431             'My::App::Worker::group1':
432             max_processes: 10
433             min_processes: 2
434             #then run as:
435             gearman_driver.pl --configfile /etc/gearman_driver.yml
436              
437             =cut
438              
439             has 'job_runtime_attributes' => (
440             isa => 'HashRef',
441             is => 'rw',
442             default => sub { {} },
443             traits => [qw(Hash NoGetopt)],
444             );
445              
446             =head2 configfile
447              
448             Runtime config file path, You can provide a default configfile pathname like so:
449              
450             has +configfile ( default => '/etc/gearman-driver.yaml' );
451              
452             You can pass an array of filenames if you want, like:
453              
454             has +configfile ( default => sub { [ '/etc/gearman-driver.yaml','/opt/my-app/etc/config.yml' ] });
455              
456             =cut
457              
458             has '+configfile' => (
459             documentation => 'Gearman-driver runtime config path',
460             );
461              
462             =head2 daemonize
463              
464             Detach self and run as a daemon.
465              
466             =cut
467              
468             has 'daemonize' => (
469             isa => 'Bool',
470             is => 'rw',
471             default => 0,
472             documentation => 'Let Gearman-driver run as a daemon'
473             );
474              
475             =head1 INTERNAL ATTRIBUTES
476              
477             This might be interesting for subclassing L<Gearman::Driver>.
478              
479             =head2 jobs
480              
481             Stores all L<Gearman::Driver::Job> instances. There are also two
482             methods:
483              
484             =over 4
485              
486             =item * L<get_job|Gearman::Driver/get_job>
487              
488             =item * L<has_job|Gearman::Driver/has_job>
489              
490             =back
491              
492             Example:
493              
494             {
495             'My::Workers::ONE::scale_image' => bless( {...}, 'Gearman::Driver::Job' ),
496             'My::Workers::ONE::do_something_else' => bless( {...}, 'Gearman::Driver::Job' ),
497             'My::Workers::TWO::scale_image' => bless( {...}, 'Gearman::Driver::Job' ),
498             }
499              
500             =over 4
501              
502             =item * isa: C<HashRef>
503              
504             =item * readonly: C<True>
505              
506             =back
507              
508             =cut
509              
510             has 'jobs' => (
511             default => sub { {} },
512             handles => {
513             _set_job => 'set',
514             get_job => 'get',
515             has_job => 'defined',
516             all_jobs => 'values',
517             },
518             is => 'ro',
519             isa => 'HashRef',
520             traits => [qw(Hash NoGetopt)],
521             );
522              
523             =head2 observer
524              
525             Instance of L<Gearman::Driver::Observer>.
526              
527             =over 4
528              
529             =item * isa: C<Gearman::Driver::Observer>
530              
531             =item * readonly: C<True>
532              
533             =back
534              
535             =cut
536              
537             has 'observer' => (
538             is => 'ro',
539             isa => 'Gearman::Driver::Observer',
540             traits => [qw(NoGetopt)],
541             );
542              
543             =head2 console
544              
545             Instance of L<Gearman::Driver::Console>.
546              
547             =over 4
548              
549             =item * isa: C<Gearman::Driver::Console>
550              
551             =item * readonly: C<True>
552              
553             =back
554              
555             =cut
556              
557             has 'console' => (
558             is => 'ro',
559             isa => 'Gearman::Driver::Console',
560             traits => [qw(NoGetopt)],
561             );
562              
563             has 'session' => (
564             is => 'ro',
565             isa => 'POE::Session',
566             traits => [qw(NoGetopt)],
567             );
568              
569             has 'pid' => (
570             default => $$,
571             is => 'ro',
572             isa => 'Int',
573             );
574              
575              
576             has '+logger' => ( traits => [qw(NoGetopt)] );
577             has '+wanted' => ( traits => [qw(NoGetopt)] );
578             has '+modules' => ( traits => [qw(NoGetopt)] );
579              
580             =head1 METHODS
581              
582             =head2 add_job
583              
584             There's one mandatory param (hashref) with following keys:
585              
586             =over 4
587              
588             =item * max_processes (mandatory)
589              
590             Maximum number of processes that may be forked.
591              
592             =item * min_processes (mandatory)
593              
594             Minimum number of processes that should be forked.
595              
596             =item * name (mandatory)
597              
598             Job name/alias that method should be registered with Gearman.
599              
600             =item * methods (mandatory)
601              
602             ArrayRef of HashRefs containing following keys:
603              
604             =over 4
605              
606             =item * body (mandatory)
607              
608             CodeRef to the job method.
609              
610             =item * name (mandatory)
611              
612             The name this method should be registered with gearmand.
613              
614             =item * decode (optionally)
615              
616             Name of a decoder method in your worker object.
617              
618             =item * encode (optionally)
619              
620             Name of a encoder method in your worker object.
621              
622             =back
623              
624             =item * worker (mandatory)
625              
626             Worker object that should be passed as first parameter to the job
627             method.
628              
629             =back
630              
631             Basically you never really need this method if you use
632             L</namespaces>. But L</namespaces> depends on method attributes which
633             some people do hate. In this case, feel free to setup your C<$driver>
634             this way:
635              
636             package My::Workers::One;
637              
638             use Moose;
639             use JSON::XS;
640             extends 'Gearman::Driver::Worker::Base';
641              
642             # this method will be registered with gearmand as 'My::Workers::One::scale_image'
643             sub scale_image {
644             my ( $self, $job, $workload ) = @_;
645             # do something
646             }
647              
648             # this method will be registered with gearmand as 'My::Workers::One::do_something_else'
649             sub do_something_else {
650             my ( $self, $job, $workload ) = @_;
651             # do something
652             }
653              
654             sub encode_json {
655             my ( $self, $result ) = @_;
656             return JSON::XS::encode_json($result);
657             }
658              
659             sub decode_json {
660             my ( $self, $workload ) = @_;
661             return JSON::XS::decode_json($workload);
662             }
663              
664             1;
665              
666             package main;
667              
668             use Gearman::Driver;
669             use My::Workers::One;
670              
671             my $driver = Gearman::Driver->new(
672             server => 'localhost:4730,otherhost:4731',
673             interval => 60,
674             );
675              
676             my $worker = My::Workers::One->new();
677              
678             # run each method in an own process
679             foreach my $method (qw(scale_image do_something_else)) {
680             $driver->add_job(
681             {
682             max_processes => 5,
683             min_processes => 1,
684             name => $method,
685             worker => $worker,
686             methods => [
687             {
688             body => $w1->meta->find_method_by_name($method)->body,
689             decode => 'decode_json',
690             encode => 'encode_json',
691             name => $method,
692             },
693             ]
694             }
695             );
696             }
697              
698             # share both methods in a single process
699             $driver->add_job(
700             {
701             max_processes => 5,
702             min_processes => 1,
703             name => 'some_alias',
704             worker => $worker,
705             methods => [
706             {
707             body => $w1->meta->find_method_by_name('scale_image')->body,
708             decode => 'decode_json',
709             encode => 'encode_json',
710             name => 'scale_image',
711             },
712             {
713             body => $w1->meta->find_method_by_name('do_something_else')->body,
714             decode => 'decode_json',
715             encode => 'encode_json',
716             name => 'do_something_else',
717             },
718             ]
719             }
720             );
721              
722             $driver->run;
723              
724             =cut
725              
726             sub add_job {
727             my ( $self, $params ) = @_;
728              
729             $params->{name} = $params->{worker}->prefix . $params->{name};
730              
731             foreach my $key ( keys %$params ) {
732             delete $params->{$key} unless defined $params->{$key};
733             }
734              
735             my @methods = ();
736             foreach my $args ( @{ delete $params->{methods} } ) {
737             foreach my $key ( keys %$args ) {
738             delete $args->{$key} unless defined $args->{$key};
739             }
740             $args->{name} = $params->{worker}->prefix . $args->{name};
741             push @methods, Gearman::Driver::Job::Method->new( %$args, worker => $params->{worker} );
742             }
743              
744             my $job = Gearman::Driver::Job->new(
745             driver => $self,
746             methods => \@methods,
747             %$params
748             );
749              
750             $self->_set_job( $params->{name} => $job );
751              
752             $self->log->debug( sprintf "Added new job: %s (processes: %d)", $params->{name}, $params->{min_processes} || 1 );
753              
754             return 1;
755             }
756              
757             =head2 get_jobs
758              
759             Returns all L<Gearman::Driver::Job> objects ordered by jobname.
760              
761             =cut
762              
763             sub get_jobs {
764             my ($self) = @_;
765             my @result = ();
766             foreach my $name ( sort keys %{ $self->jobs } ) {
767             push @result, $self->get_job($name);
768             }
769             return @result;
770             }
771              
772             =head2 run
773              
774             This must be called after the L<Gearman::Driver> object is instantiated.
775              
776             =cut
777              
778             sub run {
779             my ($self) = @_;
780             push @INC, @{ $self->lib };
781             $self->load_namespaces;
782              
783             $self->_daemonize if $self->daemonize;
784              
785             $self->_start_observer;
786             $self->_start_console;
787             $self->_start_session;
788             POE::Kernel->run();
789             }
790              
791             =head2 shutdown
792              
793             Sends TERM signal to all child processes and exits Gearman::Driver.
794              
795             =cut
796              
797             sub shutdown {
798             my ($self) = @_;
799             POE::Kernel->signal( $self->{session}, 'TERM' );
800             }
801              
802             sub DEMOLISH {
803             my ($self) = @_;
804             if ( $self->pid eq $$ ) {
805             $self->shutdown;
806             }
807             }
808              
809             =head2 has_job
810              
811             Params: $name
812              
813             Returns true/false if the job exists.
814              
815             =head2 get_job
816              
817             Params: $name
818              
819             Returns the job instance.
820              
821             =cut
822              
823             sub BUILD {
824             my ($self) = @_;
825             $self->_setup_logger;
826             }
827              
828             sub _setup_logger {
829             my ($self) = @_;
830              
831             unless (Log::Log4perl->initialized()) {
832             Log::Log4perl->easy_init(
833             {
834             file => sprintf( '>>%s', $self->logfile ),
835             layout => $self->loglayout,
836             level => $self->loglevel,
837             },
838             );
839             }
840             }
841              
842             sub _start_observer {
843             my ($self) = @_;
844             if ( $self->interval > 0 ) {
845             $self->{observer} = Gearman::Driver::Observer->new(
846             callback => sub {
847             my ($response) = @_;
848             $self->_observer_callback($response);
849             },
850             interval => $self->interval,
851             server => $self->server,
852             );
853             }
854             }
855              
856             sub _start_console {
857             my ($self) = @_;
858             if ( $self->console_port > 0 ) {
859             $self->{console} = Gearman::Driver::Console->new(
860             driver => $self,
861             port => $self->console_port,
862             );
863             }
864             }
865              
866             sub _observer_callback {
867             my ( $self, $response ) = @_;
868              
869             # When $job->add_process is called and ProcessGroup is used
870             # this may end up in a race condition and more processes than
871             # wanted are started. To fix that we remember what kind of
872             # processes we need to start in each single run of this callback.
873             my %to_start = ();
874              
875             my $status = $response->{data};
876             foreach my $row (@$status) {
877             if ( my $job = $self->_find_job( $row->{name} ) ) {
878             $to_start{$job->name} ||= 0;
879             if ( $job->count_processes <= $row->{busy} && $row->{queue} ) {
880             my $diff = $row->{queue} - $row->{busy};
881             my $free = $job->max_processes - $job->count_processes;
882             if ($free) {
883             my $start = $diff > $free ? $free : $diff;
884             $to_start{$job->name} += $start;
885             }
886             }
887              
888             elsif ( $job->count_processes && $job->count_processes > $job->min_processes && $row->{queue} == 0 ) {
889             my $idle = time - $job->lastrun;
890             if ( $job->lastrun && ($idle >= $self->max_idle_time) ) {
891             my $stop = $job->count_processes - $job->min_processes;
892             $self->log->debug( sprintf "Stopping %d process(es) of type %s (idle: %d)",
893             $stop, $job->name, $idle );
894             $job->remove_process for 1 .. $stop;
895             }
896             }
897             }
898             else {
899             $self->unknown_job_callback->( $self, $row ) if $row->{queue} > 0;
900             }
901             }
902              
903             foreach my $name (keys %to_start) {
904             my $job = $self->get_job($name);
905             my $start = $to_start{$name};
906             my $free = $job->max_processes - $job->count_processes;
907             $start = $free if $start > $free;
908             if ($start) {
909             $self->log->debug( sprintf "Starting %d new process(es) of type %s", $start, $job->name );
910             $job->add_process for 1 .. $start;
911             }
912             }
913              
914             my $error = $response->{error};
915             foreach my $e (@$error) {
916             $self->log->error( sprintf "Gearman::Driver::Observer: %s", $e );
917             }
918             }
919              
920             sub _find_job {
921             my ( $self, $name ) = @_;
922             foreach my $job ( $self->all_jobs ) {
923             foreach my $method ( @{ $job->methods } ) {
924             return $job if $method->name eq $name;
925             }
926             }
927             return 0;
928             }
929              
930             sub _start_session {
931             my ($self) = @_;
932             $self->{session} = POE::Session->create(
933             object_states => [
934             $self => {
935             _start => '_start',
936             got_sig => '_on_sig',
937             monitor_processes => '_monitor_processes',
938             }
939             ]
940             );
941             }
942              
943             sub _on_sig {
944             my ( $self, $kernel, $heap ) = @_[ OBJECT, KERNEL, HEAP ];
945              
946             foreach my $job ( $self->get_jobs ) {
947             foreach my $process ( $job->get_processes ) {
948             $self->log->info( sprintf '(%d) [%s] Process killed', $process->PID, $job->name );
949             $process->kill();
950             }
951             }
952              
953             $kernel->sig_handled();
954              
955             exit(0);
956             }
957              
958             sub _start {
959             $_[KERNEL]->sig( $_ => 'got_sig' ) for qw(INT QUIT ABRT KILL TERM);
960             $_[OBJECT]->_add_jobs;
961             $_[OBJECT]->_start_jobs;
962             $_[KERNEL]->delay( monitor_processes => 5 );
963             }
964              
965             sub _add_jobs {
966             my ($self) = @_;
967             my $worker_options = $self->worker_options;
968             my $job_runtime_attributes = $self->job_runtime_attributes;
969              
970             foreach my $module ( $self->get_modules ) {
971             my %module_options = (
972             %{ $worker_options->{GLOBAL} || {} },
973             %{ $worker_options->{$module} || {} },
974             );
975             $module_options{server} = $self->server;
976             my $worker = $module->new( %module_options );
977             my %methods = ();
978             foreach my $method ( $module->meta->get_nearest_methods_with_attributes ) {
979             apply_all_roles( $method => 'Gearman::Driver::Worker::AttributeParser' );
980              
981             $method->default_attributes( $worker->default_attributes );
982             $method->override_attributes( $worker->override_attributes );
983              
984             next unless $method->has_attribute('Job');
985              
986             my $name = $method->get_attribute('ProcessGroup') || $method->name;
987             $methods{$name} ||= [];
988             push @{ $methods{$name} }, $method;
989             }
990              
991             foreach my $name ( keys %methods ) {
992             my @methods = ();
993             my ( $min_processes, $max_processes );
994              
995             foreach my $method ( @{ $methods{$name} } ) {
996             warn sprintf "MinProcesses redefined in ProcessGroup(%s) at %s::%s",
997             $method->get_attribute('ProcessGroup'), ref($worker), $method->name
998             if defined $min_processes && $method->has_attribute('MinProcesses');
999              
1000             warn sprintf "MaxProcesses redefined in ProcessGroup(%s) at %s::%s",
1001             $method->get_attribute('ProcessGroup'), ref($worker), $method->name
1002             if defined $max_processes && $method->has_attribute('MaxProcesses');
1003              
1004             $min_processes ||= $method->get_attribute('MinProcesses');
1005             $max_processes ||= $method->get_attribute('MaxProcesses');
1006              
1007             push @methods,
1008             {
1009             body => $method->body,
1010             name => $method->name,
1011             decode => $method->get_attribute('Decode'),
1012             encode => $method->get_attribute('Encode'),
1013             };
1014             }
1015              
1016             my $job_runtime_attributes = $self->job_runtime_attributes->{$module.'::'.$name} || {};
1017             if (defined $job_runtime_attributes->{min_processes} ) {
1018             $min_processes = $job_runtime_attributes->{min_processes} ;
1019             }
1020              
1021             if (defined $job_runtime_attributes->{max_processes}) {
1022             $max_processes = $job_runtime_attributes->{max_processes};
1023             }
1024              
1025             $self->add_job(
1026             {
1027             max_processes => $max_processes,
1028             min_processes => $min_processes,
1029             methods => \@methods,
1030             name => $name,
1031             worker => $worker,
1032             }
1033             );
1034             }
1035             }
1036             }
1037              
1038             sub _start_jobs {
1039             my ($self) = @_;
1040              
1041             foreach my $job ( $self->get_jobs ) {
1042             for ( 1 .. $job->min_processes ) {
1043             $job->add_process();
1044             }
1045             }
1046             }
1047              
1048             sub _monitor_processes {
1049             my $self = $_[OBJECT];
1050             foreach my $job ( $self->get_jobs ) {
1051             if ( $job->count_processes < $job->min_processes ) {
1052             my $start = $job->min_processes - $job->count_processes;
1053             $self->log->debug( sprintf "Starting %d new process(es) of type %s", $start, $job->name );
1054             $job->add_process for 1 .. $start;
1055             }
1056             }
1057             $_[KERNEL]->delay( monitor_processes => 5 );
1058             }
1059              
1060              
1061             sub _daemonize {
1062             my $self = shift;
1063             my $logfile = $self->logfile || '/dev/null';
1064             # fallback to /dev/null
1065             $logfile = '/dev/null' unless -w $logfile;
1066             require POSIX;
1067             fork && exit;
1068             ## Detach ourselves from the terminal
1069             croak "Cannot detach from controlling terminal" unless POSIX::setsid();
1070             fork && exit;
1071             umask 0;
1072             close(STDIN);
1073             close(STDOUT);
1074             close(STDERR);
1075             ## Reopen stderr, stdout, stdin to $logfile
1076             open(STDIN, "+>$logfile");
1077             open(STDOUT, "+>&STDIN");
1078             open(STDERR, "+>&STDIN");
1079             chdir "/";
1080             }
1081              
1082             no Moose;
1083              
1084             __PACKAGE__->meta->make_immutable;
1085              
1086             =head1 SCRIPT
1087              
1088             There's also a script C<gearman_driver.pl> which is installed with
1089             this distribution. It just instantiates L<Gearman::Driver> with its
1090             default values, having most of the options exposed to the command
1091             line using L<MooseX::Getopt>.
1092              
1093             usage: gearman_driver.pl [long options...]
1094             --loglevel Log level (default: INFO)
1095             --lib Example: --lib ./lib --lib /custom/lib
1096             --server Gearman host[:port][,host[:port]]
1097             --logfile Path to logfile (default: gearman_driver.log)
1098             --console_port Port of management console (default: 47300)
1099             --interval Interval in seconds (see Gearman::Driver::Observer)
1100             --loglayout Log message layout (default: [%d] %p %m%n)
1101             --namespaces Example: --namespaces My::Workers --namespaces My::OtherWorkers
1102             --configfile Read options from this file. Example: --configfile ./etc/gearman-driver-config.yml
1103             --daemonize Run as daemon.
1104              
1105             =head1 AUTHOR
1106              
1107             Johannes Plunien E<lt>plu@cpan.orgE<gt>
1108              
1109             =head1 CONTRIBUTORS
1110              
1111             Uwe Voelker, <uwe.voelker@gmx.de>
1112              
1113             Night Sailer <nightsailer@gmail.com>
1114              
1115             Robert Bohne, <rbo@cpan.org>
1116              
1117             =head1 COPYRIGHT AND LICENSE
1118              
1119             Copyright 2009 by Johannes Plunien
1120              
1121             This library is free software; you can redistribute it and/or modify
1122             it under the same terms as Perl itself.
1123              
1124             =head1 SEE ALSO
1125              
1126             =over 4
1127              
1128             =item * L<Gearman::Driver::Adaptor>
1129              
1130             =item * L<Gearman::Driver::Console>
1131              
1132             =item * L<Gearman::Driver::Console::Basic>
1133              
1134             =item * L<Gearman::Driver::Console::Client>
1135              
1136             =item * L<Gearman::Driver::Job>
1137              
1138             =item * L<Gearman::Driver::Job::Method>
1139              
1140             =item * L<Gearman::Driver::Loader>
1141              
1142             =item * L<Gearman::Driver::Observer>
1143              
1144             =item * L<Gearman::Driver::Worker>
1145              
1146             =item * L<Gearman::XS>
1147              
1148             =item * L<Gearman>
1149              
1150             =item * L<Gearman::Server>
1151              
1152             =item * L<Log::Log4perl>
1153              
1154             =item * L<Module::Find>
1155              
1156             =item * L<Moose>
1157              
1158             =item * L<MooseX::Getopt>
1159              
1160             =item * L<MooseX::Log::Log4perl>
1161              
1162             =item * L<MooseX::MethodAttributes>
1163              
1164             =item * L<Net::Telnet::Gearman>
1165              
1166             =item * L<POE>
1167              
1168             =item * L<http://www.gearman.org/>
1169              
1170             =back
1171              
1172             =head1 REPOSITORY
1173              
1174             L<http://github.com/plu/gearman-driver/>
1175              
1176             =cut
1177              
1178             1;