File Coverage

blib/lib/Gearman/Driver/Job.pm
Criterion Covered Total %
statement 1 3 33.3
branch n/a
condition n/a
subroutine 1 1 100.0
pod n/a
total 2 4 50.0


line stmt bran cond sub pod time code
1             package Gearman::Driver::Job;
2              
3 1     1   1654 use Moose;
  0            
  0            
4             use Gearman::Driver::Adaptor;
5             use POE qw(Wheel::Run);
6              
7             =head1 NAME
8              
9             Gearman::Driver::Job - Handles the POE magic
10              
11             =head1 DESCRIPTION
12              
13             This class is responsible for starting/stopping processes as well as
14             handling all pipes (STDOUT/STDERR/STDIN) of the processes. All events
15             are written to a logfile. Possible events are:
16              
17             =over 4
18              
19             =item * Starting processes
20              
21             =item * STDOUT of processes
22              
23             =item * STDERR of processes
24              
25             =item * Stopping processes
26              
27             =back
28              
29             The current interface may only be interesting for people subclassing
30             L<Gearman::Driver> or for people writing commands/extensions for
31             L<Gearman::Driver::Console>.
32              
33             =head1 ATTRIBUTES
34              
35             =head2 driver
36              
37             Reference to the L<Gearman::Driver> instance.
38              
39             =cut
40              
41             has 'driver' => (
42             handles => { log => 'log' },
43             is => 'rw',
44             isa => 'Gearman::Driver',
45             required => 1,
46             weak_ref => 1,
47             );
48              
49             =head2 name
50              
51             The job's name.
52              
53             =cut
54              
55             has 'name' => (
56             is => 'rw',
57             isa => 'Str',
58             required => 1,
59             );
60              
61             =head2 methods
62              
63             ArrayRef of L<Gearman::Driver::Job::Method> objects.
64              
65             =cut
66              
67             has 'methods' => (
68             is => 'rw',
69             isa => 'ArrayRef[Gearman::Driver::Job::Method]',
70             required => 1,
71             );
72              
73             =head2 max_processes
74              
75             Maximum number of concurrent processes this job may have.
76              
77             =cut
78              
79             has 'max_processes' => (
80             default => 1,
81             is => 'rw',
82             isa => 'Int',
83             required => 1,
84             );
85              
86             =head2 min_processes
87              
88             Minimum number of concurrent processes this job may have.
89              
90             =cut
91              
92             has 'min_processes' => (
93             default => 1,
94             is => 'rw',
95             isa => 'Int',
96             required => 1,
97             );
98              
99             =head2 processes
100              
101             This attribute stores a key/value pair containing:
102             C<$pid> => L<$job|Gearman::Driver::Job>
103              
104             It provides following methods:
105              
106             =over 4
107              
108             =item * C<count_processes()>
109              
110             =item * C<delete_process($pid)>
111              
112             =item * C<get_process($pid)>
113              
114             =item * C<get_processes()>
115              
116             =item * C<get_pids()>
117              
118             =item * C<set_process($pid => $job)>
119              
120             =back
121              
122             =cut
123              
124             has 'processes' => (
125             default => sub { {} },
126             handles => {
127             count_processes => 'count',
128             delete_process => 'delete',
129             get_process => 'get',
130             get_processes => 'values',
131             get_pids => 'keys',
132             set_process => 'set',
133             },
134             is => 'ro',
135             isa => 'HashRef',
136             traits => [qw(Hash)],
137             );
138              
139             =head2 gearman
140              
141             Instance of L<Gearman::Driver::Adaptor>.
142              
143             =cut
144              
145             has 'gearman' => (
146             is => 'ro',
147             isa => 'Gearman::Driver::Adaptor',
148             );
149              
150             =head2 session
151              
152             Instance of L<POE::Session>.
153              
154             =cut
155              
156             has 'session' => (
157             is => 'ro',
158             isa => 'POE::Session',
159             );
160              
161             =head2 lastrun
162              
163             Each time this job is called it stores C<time()> in this attribute.
164              
165             =cut
166              
167             has 'lastrun' => (
168             default => 0,
169             is => 'rw',
170             isa => 'Int',
171             );
172              
173             =head2 lasterror
174              
175             Each time this job failed it stores C<time()> in this attribute.
176              
177             =cut
178              
179             has 'lasterror' => (
180             default => 0,
181             is => 'rw',
182             isa => 'Int',
183             );
184              
185             =head2 lasterror_msg
186              
187             Each time this job failed it stores the error message in this
188             attribute.
189              
190             =cut
191              
192             has 'lasterror_msg' => (
193             default => '',
194             is => 'rw',
195             isa => 'Str',
196             );
197              
198             =head2 worker
199              
200             Reference to the worker object.
201              
202             =cut
203              
204             has 'worker' => (
205             is => 'rw',
206             isa => 'Any',
207             required => 1,
208             );
209              
210             =head1 METHODS
211              
212             =head2 add_process
213              
214             Starts/forks/adds another process of this job.
215              
216             =cut
217              
218             sub add_process {
219             my ($self) = @_;
220             POE::Kernel->post( $self->session => 'add_process' );
221             }
222              
223             =head2 remove_process
224              
225             Removes/kills one process of this job.
226              
227             =cut
228              
229             sub remove_process {
230             my ($self) = @_;
231             POE::Kernel->post( $self->session => 'remove_process' );
232             }
233              
234             sub BUILD {
235             my ($self) = @_;
236              
237             $self->{gearman} = Gearman::Driver::Adaptor->new( server => $self->driver->server );
238              
239             foreach my $method ( @{ $self->methods } ) {
240             $self->gearman->add_function( $method->name => $method->wrapper );
241             }
242              
243             $self->{session} = POE::Session->create(
244             object_states => [
245             $self => {
246             _start => '_start',
247             got_process_stdout => '_on_process_stdout',
248             got_process_stderr => '_on_process_stderr',
249             got_process_close => '_on_process_close',
250             got_process_signal => '_on_process_signal',
251             add_process => '_add_process',
252             remove_process => '_remove_process',
253             }
254             ]
255             );
256             }
257              
258             sub _start {
259             $_[KERNEL]->alias_set( $_[OBJECT]->name );
260             }
261              
262             sub _add_process {
263             my ( $self, $kernel, $heap ) = @_[ OBJECT, KERNEL, HEAP ];
264             my $process = POE::Wheel::Run->new(
265             Program => sub {
266             POE::Kernel->stop();
267              
268             if ( my $process_name = $self->worker->process_name( $0, $self->name ) ) {
269             $0 = $process_name;
270             }
271              
272             $self->gearman->work;
273             },
274             StdoutEvent => "got_process_stdout",
275             StderrEvent => "got_process_stderr",
276             CloseEvent => "got_process_close",
277             );
278             $kernel->sig_child( $process->PID, "got_process_signal" );
279              
280             # Wheel events include the wheel's ID.
281             $heap->{wheels}{ $process->ID } = $process;
282              
283             $self->log->info( sprintf '(%d) [%s] Process started', $process->PID, $self->name );
284              
285             $self->set_process( $process->PID => $process );
286             }
287              
288             sub _remove_process {
289             my ( $self, $kernel, $heap ) = @_[ OBJECT, KERNEL, HEAP ];
290             my ($pid) = ( $self->get_pids )[0];
291             return unless $pid;
292             my $process = $self->delete_process($pid);
293             $process->kill();
294             $self->log->info( sprintf '(%d) [%s] Process killed', $process->PID, $self->name );
295             }
296              
297             sub _on_process_stdout {
298             my ( $self, $heap, $stdout, $wid ) = @_[ OBJECT, HEAP, ARG0, ARG1 ];
299             my $process = $heap->{wheels}{$wid};
300             my ( $attr, $value ) = $stdout =~ /^(\w+) (.*?)$/;
301             return if !defined $attr || !defined $value;
302             $self->$attr($value) if $self->can($attr);
303             }
304              
305             sub _on_process_stderr {
306             my ( $self, $heap, $stderr, $wid ) = @_[ OBJECT, HEAP, ARG0, ARG1 ];
307             my $process = $heap->{wheels}{$wid};
308             $self->log->info( sprintf '(%d) [%s] STDERR: %s', $process->PID, $self->name, $stderr );
309             }
310              
311             sub _on_process_close {
312             my ( $self, $heap, $wid ) = @_[ OBJECT, HEAP, ARG0 ];
313              
314             my $process = delete $heap->{wheels}{$wid};
315              
316             # May have been reaped by got_process_signal
317             return unless defined $process;
318              
319             $self->delete_process( $process->PID );
320             }
321              
322             sub _on_process_signal {
323             my ( $self, $heap, $pid, $status ) = @_[ OBJECT, HEAP, ARG1 .. ARG2 ];
324              
325             my $process = $self->delete_process($pid);
326              
327             $self->log->info( sprintf '(%d) [%s] Exited with status %s', $pid, $self->name, $status );
328              
329             # May have been reaped by got_process_close
330             return unless defined $process;
331              
332             delete $heap->{wheels}{ $process->ID };
333             }
334              
335             no Moose;
336              
337             __PACKAGE__->meta->make_immutable;
338              
339             =head1 AUTHOR
340              
341             See L<Gearman::Driver>.
342              
343             =head1 COPYRIGHT AND LICENSE
344              
345             See L<Gearman::Driver>.
346              
347             =head1 SEE ALSO
348              
349             =over 4
350              
351             =item * L<Gearman::Driver>
352              
353             =item * L<Gearman::Driver::Adaptor>
354              
355             =item * L<Gearman::Driver::Console>
356              
357             =item * L<Gearman::Driver::Console::Basic>
358              
359             =item * L<Gearman::Driver::Console::Client>
360              
361             =item * L<Gearman::Driver::Job::Method>
362              
363             =item * L<Gearman::Driver::Loader>
364              
365             =item * L<Gearman::Driver::Observer>
366              
367             =item * L<Gearman::Driver::Worker>
368              
369             =back
370              
371             =cut
372              
373             1;