File Coverage

lib/DR/TarantoolQueue/Worker.pm
Criterion Covered Total %
statement 27 116 23.2
branch 0 44 0.0
condition 0 13 0.0
subroutine 9 16 56.2
pod 3 3 100.0
total 39 192 20.3


line stmt bran cond sub pod time code
1 1     1   31492 use utf8;
  1         2  
  1         7  
2 1     1   29 use strict;
  1         2  
  1         18  
3 1     1   4 use warnings;
  1         1  
  1         32  
4             package DR::TarantoolQueue::Worker;
5 1     1   5 use Carp;
  1         2  
  1         43  
6 1     1   5 use Mouse;
  1         2  
  1         5  
7 1     1   804 use Coro;
  1         7032  
  1         64  
8 1     1   419 use Data::Dumper;
  1         5325  
  1         84  
9 1     1   11 use Encode qw(encode_utf8);
  1         2  
  1         63  
10 1     1   7 use List::MoreUtils 'any';
  1         3  
  1         10  
11             with 'DR::TarantoolQueue::Worker::QueueList';
12              
13             =head1 NAME
14              
15             DR::TarantoolQueue::Worker - template for workers
16              
17             =head1 SYNOPSIS
18              
19             my $worker = DR::TarantoolQueue::Worker->new(
20             count => 10, # defaults 1
21             queue => $queue
22             );
23              
24             sub process {
25             my ($task) = @_;
26              
27              
28             ... do something with task
29              
30              
31             }
32              
33             $worker->run(\&process)
34              
35             =head1 DESCRIPTION
36              
37             =over
38              
39             =item *
40              
41             Process function can throw exception. The task will be buried (if process
42             function didn't change task status yet.
43              
44             =item *
45              
46             If process function didn't change task status (didn't call B, or
47             L) worker calls
48             L.
49              
50             =item *
51              
52             L method catches B and B and waits for all process
53             functions are done and then do return.
54              
55             =item *
56              
57             Worker uses default B and B in queue. So You have to define
58             them in Your queue or here.
59              
60             =back
61              
62             =head1 ATTRIBUTES
63              
64             =cut
65              
66             =head2 count
67              
68             Count of process functions that can do something at the same time.
69             Default value is B<1>. The attribute means something if Your B
70             function uses L and Your queue uses L, too.
71              
72             =cut
73              
74             has count => isa => 'Num', is => 'rw', default => 1;
75              
76              
77             =head2 queues
78              
79             List of queues.
80              
81             =cut
82              
83              
84             =head2 space & tube
85              
86             Space and tube for processing queue.
87              
88             =cut
89              
90             has space => isa => 'Str|Undef', is => 'ro';
91             has tube => isa => 'Str|Undef', is => 'ro';
92              
93             =head2 restart
94              
95             The function will be called if L is reached.
96              
97             =cut
98              
99             has restart => isa => 'CodeRef|Undef', is => 'rw';
100              
101             =head2 restart_limit
102              
103             How many tasks can be processed before restart worker.
104              
105             If B is 0, restart mechanizm will be disabled.
106              
107             If L callback isn't defined, restart mechanizm will be disabled.
108              
109             Each processed task increments common taskcounter. When B is
110             reached by the counter, worker don't take new task and call L
111             function. After L worker will continue to process tasks.
112              
113             In L callback user can do L or L
114             to avoid memory leaks.
115              
116             DR::TarantoolQueue::Worker->new(
117             restart_limit => 100,
118             restart => sub { exec perl => $0 },
119             queue => $q,
120             count => 10
121             )->run(sub { ... });
122              
123             =cut
124              
125             has restart_limit => isa => 'Num', is => 'rw', default => 0;
126              
127             =head1 PRIVATE ATTRIBUTES
128              
129             =head2 timeout
130              
131             timeout for queue.take
132              
133             =cut
134              
135             has timeout => isa => 'Num', is => 'ro', default => 2;
136              
137             =head2 is_run
138              
139             B means that workers are run
140              
141             =cut
142              
143             has is_run => isa => 'Bool', is => 'rw', default => 0;
144              
145             =head2 is_stopping
146              
147             B means that workers are stopping (by B/B/L)
148              
149             =cut
150              
151             has is_stopping => isa => 'Bool', is => 'rw', default => 0;
152              
153              
154             has stop_waiters => isa => 'ArrayRef', is => 'ro', default => sub {[]};
155              
156              
157             has mailto => isa => 'Maybe[Str]', is => 'ro';
158             has mailfrom => isa => 'Maybe[Str]', is => 'ro';
159             has mailsublect => isa => 'Str', is => 'ro', default => 'Worker died';
160             has mailheaders => isa => 'HashRef[Str]', is => 'ro', default => sub {{}};
161              
162             has restart_check => isa => 'CodeRef', is => 'ro', default => sub {sub { 0 }};
163              
164             =head1 METHODS
165              
166             =head2 run(CODEREF[, CODEREF])
167              
168             Run workers. Two arguments:
169              
170             =over
171              
172             =item process function
173              
174             Function will receive three arguments:
175              
176             =over
177              
178             =item task
179              
180             =item queue
181              
182             =item task number
183              
184             =back
185              
186             =item debug function
187              
188             The function can be used to show internal debug messages.
189              
190             =over
191              
192             =item *
193              
194             Debug messages aren't finished by B (C<\n>).
195              
196             =item *
197              
198             The function will be called as L.
199              
200             =back
201              
202             =back
203              
204             =cut
205              
206             sub run {
207 0     0 1   my ($self, $cb, $debugf) = @_;
208 0 0         croak 'process subroutine is not CODEREF' unless 'CODE' eq ref $cb;
209       0     $debugf = sub { }
210 0 0         unless defined $debugf;
211 0 0         croak 'debugf subroutine is not CODEREF' unless 'CODE' eq ref $debugf;
212              
213 0 0         croak 'worker is already run' if $self->is_run;
214              
215             local $SIG{TERM} = sub {
216 0     0     $debugf->('SIGTERM was received, stopping...');
217 0           $self->is_stopping( 1 )
218 0           };
219             local $SIG{INT} = sub {
220 0     0     $debugf->('SIGINT was received, stopping...');
221 0           $self->is_stopping( 1 )
222 0           };
223              
224            
225 0           $self->is_run( 1 );
226 0           $self->is_stopping( 0 );
227              
228 0           my $no;
229             my @f;
230 0           while(1) {
231 0           ($no, @f) = (0);
232              
233 0           for (1 .. $self->count) {
234 0           for my $q (@{ $self->queue }) {
  0            
235             push @f => async {
236 0   0 0     while($self->is_run and !$self->is_stopping) {
237 0 0 0       last if $self->restart and $no >= $self->restart_limit;
238 0 0 0       last if $self->restart and $self->restart_check->();
239 0 0         my $task = $q->take(
    0          
240             defined($self->space) ? (space => $self->space) : (),
241             defined($self->tube) ? (tube => $self->tube) : (),
242             timeout => $self->timeout,
243             );
244 0 0         next unless $task;
245              
246 0           $no++;
247 0           eval {
248 0           $cb->( $task, $q, $no );
249             };
250              
251 0 0         if ($@) {
252 0           my $err = $@;
253 0           $debugf->('Worker was died (%s)', $@);
254 0           eval {
255 0           $self->sendmail(
256             $task,
257             sprintf "Worker was died: %s", $err
258             );
259             };
260 0 0         if ($@) {
261 0           $debugf->("Can't send mail (%s)", $@);
262             }
263 0 0         if (any { $_ eq $task->status } 'work', 'taken') {
  0            
264 0           eval { $task->bury };
  0            
265 0 0         if ($@) {
266 0           $debugf->("Can't bury task %s: %s",
267             $task->id, $@);
268             }
269             }
270 0           next;
271             }
272 0 0         if (any { $_ eq $task->status } 'work', 'taken') {
  0            
273 0           eval { $task->ack };
  0            
274 0 0         if ($@) {
275 0           $debugf->("Can't ack task %s: %s", $task->id, $@);
276             }
277 0           next;
278             }
279             }
280             }
281 0           }
282             }
283              
284 0           $_->join for @f;
285              
286 0 0         last unless $self->is_run;
287 0 0         last if $self->is_stopping;
288 0 0         last unless $self->restart;
289 0 0         last unless $no >= $self->restart_limit;
290 0           $self->restart->( );
291             }
292              
293 0           $self->is_run( 0 );
294 0           $self->is_stopping( 0 );
295 0           while(@{ $self->stop_waiters }) {
  0            
296 0           my $w = shift @{ $self->stop_waiters };
  0            
297 0           $w->ready;
298             }
299 0           return $self->count;
300             }
301              
302              
303             =head2 sendmail
304              
305             Send mail about worker crash
306              
307             =cut
308              
309             sub sendmail {
310 0     0 1   my ($self, $task, $error) = @_;
311 0 0         return unless $self->mailto;
312 0 0         return unless $self->mailfrom;
313              
314 0           my $subject = encode_utf8 $self->mailsublect;
315              
316 0           require MIME::Lite;
317 0           require MIME::Words;
318              
319 0           $subject .= sprintf' (space: %s, tube: %s)', $task->space, $task->tube;
320 0           $subject = MIME::Words::encode_mimeword($subject, 'B', 'utf-8');
321              
322 0   0       my $mail = MIME::Lite->new(
      0        
323             From => $self->mailfrom || 'dimka@uvw.ru',
324             To => $self->mailto || 'dimka@uvw.ru',
325             Subject => $subject,
326             Type => 'multipart/fixed',
327             );
328              
329 0           local $Data::Dumper::Indent = 1;
330 0           local $Data::Dumper::Terse = 1;
331 0           local $Data::Dumper::Useqq = 1;
332 0           local $Data::Dumper::Deepcopy = 1;
333 0           local $Data::Dumper::Maxdepth = 0;
334              
335              
336 0           $mail->attach(
337             Type => 'text/plain; charset=utf-8',
338             Data => encode_utf8($error),
339             );
340 0           $mail->attach(
341             Type => 'text/plain; charset=utf-8',
342             Filename => 'task.dump.txt',
343             Disposition => 'inline',
344             Data => Dumper($task),
345             );
346              
347 0           $mail->add($_ => $self->mailheaders->{$_}) for keys %{ $self->mailheaders };
  0            
348 0           $mail->send;
349             }
350              
351             =head2 stop
352              
353             Stop worker cycle
354              
355             =cut
356              
357             sub stop {
358 0     0 1   my ($self) = @_;
359 0 0         return 0 unless $self->is_run;
360 0           $self->is_stopping( 1 );
361 0           push @{ $self->stop_waiters } => $Coro::current;
  0            
362 0           Coro::schedule;
363 0           return $self->is_run;
364             }
365              
366             __PACKAGE__->meta->make_immutable();
367