File Coverage

lib/DR/TarantoolQueue.pm
Criterion Covered Total %
statement 24 195 12.3
branch 0 114 0.0
condition 0 34 0.0
subroutine 8 25 32.0
pod 8 8 100.0
total 40 376 10.6


line stmt bran cond sub pod time code
1             package DR::TarantoolQueue;
2 5     5   500049 use utf8;
  5         12  
  5         30  
3 5     5   155 use strict;
  5         10  
  5         93  
4 5     5   22 use warnings;
  5         8  
  5         116  
5 5     5   1421 use Mouse;
  5         116775  
  5         28  
6 5     5   2297 use Carp;
  5         13  
  5         378  
7 5     5   2672 use JSON::XS;
  5         34372  
  5         514  
8             require DR::TarantoolQueue::Task;
9             $Carp::Internal{ (__PACKAGE__) }++;
10              
11             our $VERSION = '0.44';
12 5     5   42 use feature 'state';
  5         13  
  5         14349  
13              
14             =head1 NAME
15              
16             DR::TarantoolQueue - client for tarantool's queue
17              
18              
19             =head1 SYNOPSIS
20              
21             my $queue = DR::TarantoolQueue->new(
22             host => 'tarantool.host',
23             port => 33014,
24             tube => 'request_queue',
25             space => 11,
26              
27             connect_opts => { # see perldoc DR::Tarantool
28             reconnect_period => 1,
29             reconnect_always => 1
30             }
31             );
32              
33              
34             # put empty task into queue with name 'request_queue'
35             my $task = $queue->put;
36              
37             my $task = $queue->put(data => [ 1, 2, 3 ]);
38              
39             printf "task.id = %s\n", $task->id;
40              
41             =head2 DESCRIPTION
42              
43             The module contains sync and async (coro) driver for tarantool queue.
44              
45             =head1 ATTRIBUTES
46              
47             =head2 host (ro) & port (ro)
48              
49             Tarantool's parameters.
50              
51             =head2 connect_opts (ro)
52              
53             Additional options for L<DR::Tarantool>. HashRef.
54              
55             =head2 fake_in_test (ro, default=true)
56              
57             Start fake tarantool (only for msgpack) if C<($0 =~ /\.t$/)>.
58              
59             For the case the driver uses the following lua code:
60            
61             log.info('Fake Queue starting')
62            
63             box.cfg{ listen = os.getenv('PRIMARY_PORT') }
64            
65             box.schema.user.create('test', { password = 'test' })
66             box.schema.user.grant('test', 'read,write,execute', 'universe')
67            
68             _G.queue = require('megaqueue')
69             queue:init()
70            
71             log.info('Fake Queue started')
72              
73             =head2 msgpack (ro)
74              
75             If true, the driver will use the MsgPack L<DR::Tarantool> driver (C<1.6>). Also it will use
76             the C<megaqueue> lua
77             module with namespace C<queue>.
78              
79             =head2 coro (ro)
80              
81             If B<true> (default) the driver will use the coro (async) tarantool's driver,
82             otherwise the driver will use sync driver.
83              
84             =head2 ttl (rw)
85              
86             Default B<ttl> for tasks.
87              
88             =head2 ttr (rw)
89              
90             Default B<ttr> for tasks.
91              
92             =head2 pri (rw)
93              
94             Default B<pri> for tasks.
95              
96             =head2 delay (rw)
97              
98             Default B<delay> for tasks.
99              
100             =head2 space (rw)
101              
102             Default B<space> for tasks.
103              
104             =head2 tube (rw)
105              
106             Default B<tube> for tasks.
107              
108              
109             =head2 defaults
110              
111             Defaults for queues. B<HashRef>. Key is tube name. Value is a hash with
112             the following fields:
113              
114             =over
115              
116             =item ttl
117              
118             =item ttr
119              
120             =item delay
121              
122             =item pri
123              
124             =back
125              
126             Methods L</put> (L</urgent>) use these parameters if they
127             are absent (otherwise it uses the same global attributes).
128              
129             =cut
130              
131             with 'DR::TarantoolQueue::JSE';
132              
133             has host => is => 'ro', isa => 'Maybe[Str]';
134             has port => is => 'ro', isa => 'Maybe[Str]';
135             has user => is => 'ro', isa => 'Maybe[Str]';
136             has password => is => 'ro', isa => 'Maybe[Str]';
137              
138             has coro => is => 'ro', isa => 'Bool', default => 1;
139              
140             has ttl => is => 'rw', isa => 'Maybe[Num]';
141             has ttr => is => 'rw', isa => 'Maybe[Num]';
142             has pri => is => 'rw', isa => 'Maybe[Num]';
143             has delay => is => 'rw', isa => 'Maybe[Num]';
144             has space => is => 'rw', isa => 'Maybe[Str]';
145             has tube => is => 'rw', isa => 'Maybe[Str]';
146             has connect_opts => is => 'ro', isa => 'HashRef', default => sub {{}};
147              
148             has defaults => is => 'ro', isa => 'HashRef', default => sub {{}};
149             has msgpack => is => 'ro', isa => 'Bool', default => 0;
150              
151             # if $0 =~ /\.t$/, a fake tarantool will be started
152             has fake_in_test => is => 'ro', isa => 'Bool', default => 1;
153              
154              
155             sub _check_opts($@) {
156 0     0     my $h = shift;
157 0           my %can = map { ($_ => 1) } @_;
  0            
158              
159 0           for (keys %$h) {
160 0 0         next if $can{$_};
161 0           croak 'unknown option: ' . $_;
162             }
163             }
164              
165             sub _producer_messagepack {
166 0     0     my ($self, $method, $o) = @_;
167              
168 0           state $alias = {
169             urgent => 'put',
170             };
171              
172 0 0         $method = $alias->{$method} if exists $alias->{$method};
173              
174 0           _check_opts $o, qw(space tube delay ttl ttr pri data domain);
175            
176 0           my $tube = $o->{tube};
177 0 0         $tube = $self->tube unless defined $tube;
178 0 0         croak 'tube was not defined' unless defined $tube;
179              
180 0           for ('ttl', 'delay', 'ttr', 'pri') {
181 0           my $n = $_;
182              
183 0           my $res;
184              
185 0 0         if (exists $o->{$n}) {
186 0           $res = $o->{$n};
187             } else {
188 0 0         if (exists $self->defaults->{ $tube }) {
189 0 0         if (exists $self->defaults->{ $tube }{ $n }) {
190 0           $res = $self->defaults->{ $tube }{ $n };
191             } else {
192 0           $res = $self->$n;
193             }
194             } else {
195 0           $res = $self->$n;
196             }
197             }
198 0   0       $res ||= 0;
199            
200 0 0         if ($res == 0) {
201 0           delete $o->{ $n };
202             } else {
203 0           $o->{ $n } = $res;
204             }
205             }
206              
207              
208             my $task = $self->tnt->call_lua(
209             ["queue:$method" => 'MegaQueue'],
210             $tube,
211             $o,
212             $self->jse->encode($o->{data})
213 0           );
214              
215              
216 0           DR::TarantoolQueue::Task->tuple_messagepack($task->[0], $self);
217             }
218              
219             sub _producer {
220 0     0     my ($self, $method, $o) = @_;
221              
222 0 0         goto \&_producer_messagepack if $self->msgpack;
223              
224 0           _check_opts $o, qw(space tube delay ttl ttr pri data domain);
225              
226 0           my $space = $o->{space};
227 0 0         $space = $self->space unless defined $space;
228 0 0         croak 'space was not defined' unless defined $space;
229              
230 0           my $tube = $o->{tube};
231 0 0         $tube = $self->tube unless defined $tube;
232 0 0         croak 'tube was not defined' unless defined $tube;
233              
234 0           my ($ttl, $ttr, $pri, $delay);
235              
236 0           for ([\$ttl, 'ttl'], [\$delay, 'delay'], [\$ttr, 'ttr'], [\$pri, 'pri']) {
237 0           my $rv = $_->[0];
238 0           my $n = $_->[1];
239              
240 0 0         if (exists $o->{$n}) {
241 0           $$rv = $o->{$n};
242             } else {
243 0 0         if (exists $self->defaults->{ $tube }) {
244 0 0         if (exists $self->defaults->{ $tube }{ $n }) {
245 0           $$rv = $self->defaults->{ $tube }{ $n };
246             } else {
247 0           $$rv = $self->$n;
248             }
249             } else {
250 0           $$rv = $self->$n;
251             }
252             }
253 0   0       $$rv ||= 0;
254              
255             }
256              
257              
258             my $tuple = $self->tnt->call_lua(
259             "queue.$method" => [
260             $space,
261             $tube,
262             $delay,
263             $ttl,
264             $ttr,
265             $pri,
266             $self->jse->encode($o->{data})
267 0           ]
268             );
269              
270 0           return DR::TarantoolQueue::Task->tuple($tuple, $space, $self);
271             }
272              
273             =head1 METHODS
274              
275             =head2 new
276              
277             my $q = DR::TarantoolQueue->new(host => 'abc.com', port => 123);
278              
279             Creates new queue(s) accessor.
280              
281             =cut
282              
283             =head2 dig
284              
285             $q->dig(task => $task);
286             $task->dig; # the same
287              
288             $q->dig(id => $task->id);
289             $q->dig(id => $task->id, space => $task->space);
290              
291             'Dig up' a buried task. Checks, that the task is buried.
292             The task status is changed to ready.
293              
294             =head2 unbury
295              
296             Is a synonym of L</dig>.
297              
298              
299             =head2 delete
300              
301             $q->delete(task => $task);
302             $task->delete; # the same
303              
304             $q->delete(id => $task->id);
305             $q->delete(id => $task->id, space => $task->space);
306              
307             Delete a task from the queue (regardless of task state or status).
308              
309             =head2 peek
310              
311             $q->peek(task => $task);
312             $task->peek; # the same
313              
314             $q->peek(id => $task->id);
315             $q->peek(id => $task->id, space => $task->space);
316              
317             Return a task by task id.
318              
319              
320             =head2 statistics
321              
322             my $s = $q->statistics;
323             my $s = $q->statistics(space => 123);
324             my $s = $q->statistics(space => 123, tube => 'abc');
325             my $s = DR::TarantoolQueue->statistics(space => 123);
326             my $s = DR::TarantoolQueue->statistics(space => 123, tube => 'abc');
327              
328             Return queue module statistics, since server start.
329             The statistics is broken down by queue id.
330             Only queues on which there was some activity are
331             included in the output.
332              
333              
334             =cut
335              
336             sub _statistics_msgpack {
337 0     0     my ($self, %o) = @_;
338              
339 0           _check_opts \%o, qw(tube);
340              
341             my $list = $self->tnt->call_lua(
342             ["queue:stats" => 'MegaQueueStats'], $o{tube}
343 0           );
344              
345 0           my %res = map { ($_->{tube}, $_->{counters}) } @$list;
  0            
346 0           return \%res;
347             }
348              
349             sub statistics {
350 0     0 1   my ($self, %o) = @_;
351 0 0         goto \&_statistics_msgpack if $self->msgpack;
352 0           _check_opts \%o, qw(space tube);
353 0 0         unless (exists $o{space}) {
354 0 0         $o{space} = $self->space if ref $self;
355             }
356 0 0         unless (exists $o{tube}) {
357 0 0         $o{tube} = $self->tube if ref $self;
358             }
359              
360             croak 'space was not defined'
361 0 0 0       if defined $o{tube} and !defined $o{space};
362              
363             my $raw = $self->tnt->call_lua(
364             "queue.statistics" => [
365             defined($o{space}) ? $o{space} : (),
366             defined($o{tube}) ? $o{tube} : ()
367 0 0         ]
    0          
368             )->raw;
369 0           return { @$raw };
370             }
371              
372              
373              
374              
375             =head2 get_meta
376              
377             Task was processed (and will be deleted after the call).
378              
379             my $m = $q->get_meta(task => $task);
380             my $m = $q->get_meta(id => $task->id);
381              
382             Returns a hashref with fields:
383              
384              
385             =over
386              
387             =item id
388              
389             task id
390              
391             =item tube
392              
393             queue id
394              
395             =item status
396              
397             task status
398              
399             =item event
400              
401             time of the next important event in task life time, for example,
402             when ttl or ttr expires, in microseconds since start of the UNIX epoch.
403              
404             =item ipri
405              
406             internal value of the task priority
407              
408             =item pri
409              
410             task priority as set when the task was added to the queue
411              
412             =item cid
413              
414             consumer id, of the consumer which took the task (only if the task is taken)
415              
416             =item created
417              
418             time when the task was created (microseconds since start of the UNIX epoch)
419              
420             =item ttl
421              
422             task time to live (microseconds)
423              
424             =item ttr
425              
426             task time to run (microseconds)
427              
428             =item cbury
429              
430             how many times the task was buried
431              
432             =item ctaken
433              
434             how many times the task was taken
435              
436             =item now
437              
438             time recorded when the meta was called
439              
440             =back
441              
442             =cut
443              
444             sub get_meta {
445 0     0 1   my ($self, %o) = @_;
446 0           _check_opts \%o, qw(task id space);
447 0 0 0       croak 'task was not defined' unless $o{task} or $o{id};
448              
449 0           my ($id, $space, $tube);
450 0 0         if ($o{task}) {
451             ($id, $space, $tube) = ($o{task}->id,
452 0           $o{task}->space, $o{task}->tube);
453             } else {
454 0           ($id, $space, $tube) = @o{'id', 'space', 'tube'};
455 0 0         $space = $self->space unless defined $o{space};
456 0 0         croak 'space is not defined' unless defined $space;
457 0 0         $tube = $self->tube unless defined $tube;
458             }
459              
460              
461 0           my $fields = [
462             { name => 'id', type => 'STR' },
463             { name => 'tube', type => 'STR' },
464             { name => 'status', type => 'STR' },
465             { name => 'event', type => 'NUM64' },
466             { name => 'ipri', type => 'STR', },
467             { name => 'pri', type => 'STR', },
468             { name => 'cid', type => 'NUM', },
469             { name => 'created', type => 'NUM64', },
470             { name => 'ttl', type => 'NUM64' },
471             { name => 'ttr', type => 'NUM64' },
472             { name => 'cbury', type => 'NUM' },
473             { name => 'ctaken', type => 'NUM' },
474             { name => 'now', type => 'NUM64' },
475             ];
476 0           my $tuple = $self->tnt->call_lua(
477             "queue.meta" => [ $space, $id ], fields => $fields
478             )->raw;
479              
480              
481 0           return { map { ( $fields->[$_]{name}, $tuple->[$_] ) } 0 .. $#$fields };
  0            
482             }
483              
484              
485              
486              
487             =head1 Producer methods
488              
489             =head2 put
490              
491             $q->put;
492             $q->put(data => { 1 => 2 });
493             $q->put(space => 1, tube => 'abc',
494             delay => 10, ttl => 3600,
495             ttr => 60, pri => 10, data => [ 3, 4, 5 ]);
496             $q->put(data => 'string');
497              
498              
499             Enqueue a task. Returns new L<DR::TarantoolQueue::Task> object.
500             The list of fields with task data (C<< data => ... >>) is optional.
501              
502              
503             If 'B<space>' and (or) 'B<tube>' aren't defined the method
504             will try to use them from L<DR::TarantoolQueue/new> object.
505              
506             =cut
507              
508             sub put {
509 0     0 1   my ($self, %opts) = @_;
510 0           return $self->_producer(put => \%opts);
511             }
512              
513             =head2 put_unique
514              
515             $q->put_unique(data => { 1 => 2 });
516             $q->put_unique(space => 1, tube => 'abc',
517             delay => 10, ttl => 3600,
518             ttr => 60, pri => 10, data => [ 3, 4, 5 ]);
519             $q->put_unique(data => 'string');
520              
521              
522             Enqueue an unique task. Returns new L<DR::TarantoolQueue::Task> object,
523             if it was not enqueued previously. Otherwise it will return existing task.
524             The list of fields with task data (C<< data => ... >>) is optional.
525              
526              
527             If 'B<space>' and (or) 'B<tube>' aren't defined the method
528             will try to use them from L<DR::TarantoolQueue/new> object.
529              
530             =cut
531              
532             sub put_unique {
533 0     0 1   my ($self, %opts) = @_;
534 0           return $self->_producer(put_unique => \%opts);
535             }
536              
537             =head2 urgent
538              
539             Enqueue a task. The task will get the highest priority.
540             If delay is not zero, the function is equivalent to
541             L</put>.
542              
543             =cut
544              
545             sub urgent {
546 0     0 1   my ($self, %opts) = @_;
547 0           return $self->_producer(urgent => \%opts);
548             }
549              
550              
551             =head1 Consumer methods
552              
553             =head2 take
554              
555             my $task = $q->take;
556             my $task = $q->take(timeout => 0.5);
557             my $task = $q->take(space => 1, tube => 'requests', timeout => 20);
558              
559             If there are tasks in the queue ready for execution,
560             take the highest-priority task. Otherwise, wait for
561             a ready task to appear in the queue, and, as soon as
562             it appears, mark it as taken and return to the consumer.
563             If there is a timeout, and the task doesn't appear until
564             the timeout expires, returns B<undef>. If timeout is not
565             given, waits indefinitely.
566              
567             All the time while the consumer is working on a task,
568             it must keep the connection to the server open. If a
569             connection disappears while the consumer is still
570             working on a task, the task is put back on the ready list.
571              
572             =cut
573              
574             sub _take_messagepack {
575 0     0     my ($self, %o) = @_;
576            
577 0           _check_opts \%o, qw(tube timeout);
578            
579 0 0         $o{tube} = $self->tube unless defined $o{tube};
580 0 0         croak 'tube was not defined' unless defined $o{tube};
581 0   0       $o{timeout} ||= 0;
582              
583              
584             my $tuples = $self->tnt->call_lua(
585             ['queue:take' => 'MegaQueue'] => $o{tube}, $o{timeout}
586 0           );
587              
588 0 0 0       if (@$tuples and $tuples->[0]{tube} ne $o{tube}) {
589             warn sprintf "take(%s, timeout => %s) returned task.tube == %s\n",
590             $o{tube},
591             $o{timeout} // 'undef',
592 0   0       $tuples->[0]{tube} // 'undef';
      0        
593             }
594 0           return DR::TarantoolQueue::Task->tuple_messagepack($tuples->[0], $self);
595             }
596              
597             sub take {
598 0     0 1   my ($self, %o) = @_;
599 0 0         goto \&_take_messagepack if $self->msgpack;
600              
601 0           _check_opts \%o, qw(space tube timeout);
602 0 0         $o{space} = $self->space unless defined $o{space};
603 0 0         croak 'space was not defined' unless defined $o{space};
604 0 0         $o{tube} = $self->tube unless defined $o{tube};
605 0 0         croak 'tube was not defined' unless defined $o{tube};
606 0   0       $o{timeout} ||= 0;
607              
608              
609             my $tuple = $self->tnt->call_lua(
610             'queue.take' => [
611             $o{space},
612             $o{tube},
613             $o{timeout}
614 0           ]
615             );
616              
617              
618 0           return DR::TarantoolQueue::Task->tuple($tuple, $o{space}, $self);
619             }
620              
621              
622             =head2 ack
623              
624             $q->ack(task => $task);
625             $task->ack; # the same
626              
627             $q->ack(id => $task->id);
628             $q->ack(space => $task->space, id => $task->id);
629              
630              
631             Confirm completion of a task. Before marking a task as
632             complete, this function verifies that:
633              
634             =over
635              
636             =item *
637              
638             the task is taken
639              
640             =item *
641              
642             the consumer that is confirming the task is the one which took it
643              
644             =back
645              
646             Consumer identity is established using a session identifier.
647             In other words, the task must be confirmed by the same connection
648             which took it. If verification fails, the function returns an error.
649              
650             On success, deletes the task from the queue. Throws an exception otherwise.
651              
652              
653             =head2 requeue
654              
655             $q->requeue(task => $task);
656             $task->requeue; # the same
657              
658             $q->requeue(id => $task->id);
659             $q->requeue(id => $task->id, space => $task->space);
660              
661             Return a task to the queue, the task is not executed.
662             Puts the task at the end of the queue, so that it's executed
663             only after all existing tasks in the queue are executed.
664              
665              
666             =head2 bury
667              
668             $q->bury(task => $task);
669             $task->bury; # the same
670              
671             $q->bury(id => $task->id);
672             $q->bury(id => $task->id, space => $task->space);
673              
674             Mark a task as B<buried>. This special status excludes the task
675             from the active list, until it's dug up. This function is useful
676             when several attempts to execute a task lead to a failure. Buried
677             tasks can be monitored by the queue owner, and treated specially.
678              
679              
680             =cut
681              
682             sub _task_method_messagepack {
683 0     0     my ($self, $m, %o) = @_;
684 0           _check_opts \%o, qw(task id);
685 0 0 0       croak 'task was not defined' unless $o{task} or $o{id};
686              
687 0           my $id;
688 0 0         if ($o{task}) {
689 0           $id = $o{task}->id;
690             } else {
691 0           $id = $o{id};
692             }
693              
694 0           state $alias = { requeue => 'release' };
695              
696 0 0         $m = $alias->{$m} if exists $alias->{$m};
697              
698 0           my $tuples = $self->tnt->call_lua( [ "queue:$m" => 'MegaQueue' ] => $id );
699 0           my $task = DR::TarantoolQueue::Task->tuple_messagepack($tuples->[0], $self);
700              
701 0 0         if ($m eq 'delete') {
    0          
702 0           $task->_set_status('removed');
703             } elsif ($m eq 'ack') {
704 0           $task->_set_status('ack(removed)');
705             }
706 0           $task;
707             }
708              
709             sub _task_method {
710 0     0     my ($self, $m, %o) = @_;
711            
712 0 0         goto \&_task_method_messagepack if $self->msgpack;
713              
714 0           _check_opts \%o, qw(task id space);
715 0 0 0       croak 'task was not defined' unless $o{task} or $o{id};
716              
717 0           my ($id, $space);
718 0 0         if ($o{task}) {
719 0           ($id, $space) = ($o{task}->id, $o{task}->space);
720             } else {
721 0           ($id, $space) = @o{'id', 'space'};
722 0 0         $space = $self->space unless defined $o{space};
723 0 0         croak 'space is not defined' unless defined $space;
724             }
725              
726 0           my $tuple = $self->tnt->call_lua( "queue.$m" => [ $space, $id ] );
727 0           my $task = DR::TarantoolQueue::Task->tuple($tuple, $space, $self);
728              
729 0 0         if ($m eq 'delete') {
    0          
730 0           $task->_set_status('removed');
731             } elsif ($m eq 'ack') {
732 0           $task->_set_status('ack(removed)');
733             }
734 0           $task;
735             }
736              
737              
738             for my $m (qw(ack requeue bury dig unbury delete peek)) {
739 5     5   48 no strict 'refs';
  5         12  
  5         3533  
740             next if *{ __PACKAGE__ . "::$m" }{CODE};
741             *{ __PACKAGE__ . "::$m" } = sub {
742 0     0     splice @_, 1, 0, $m;
743 0           goto \&_task_method;
744             }
745             }
746              
747              
748             =head2 release
749              
750             $q->release(task => $task);
751             $task->release; # the same
752              
753             $q->release(id => $task->id, space => $task->space);
754             $q->release(task => $task, delay => 10); # delay the task
755             $q->release(task => $task, ttl => 3600); # append task's ttl
756              
757             Return a task back to the queue: the task is not executed.
758             Additionally, a new time to live and re-execution delay can be provided.
759              
760             =cut
761              
762             sub _release_messagepack {
763 0     0     my ($self, %o) = @_;
764 0           _check_opts \%o, qw(task id delay);
765 0   0       $o{delay} ||= 0;
766 0           my $id;
767 0 0         if ($o{task}) {
768 0           $id = $o{task}->id;
769             } else {
770 0           $id = $o{id};
771             }
772             my $tuples = $self->tnt->call_lua(
773 0           ['queue:release' => 'MegaQueue'], $id, $o{delay});
774            
775 0           return DR::TarantoolQueue::Task->tuple_messagepack($tuples->[0], $self);
776             }
777              
778             sub release {
779 0     0 1   my ($self, %o) = @_;
780 0 0         goto \&_release_messagepack if $self->msgpack;
781 0           _check_opts \%o, qw(task id space ttl delay);
782 0   0       $o{delay} ||= 0;
783 0           my ($id, $space);
784 0 0         if ($o{task}) {
785 0           ($id, $space) = ($o{task}->id, $o{task}->space);
786             } else {
787 0           ($id, $space) = @o{'id', 'space'};
788 0 0         $space = $self->space unless defined $o{space};
789 0 0         croak 'space is not defined' unless defined $space;
790             }
791             my $tuple = $self->tnt->call_lua('queue.release' =>
792 0   0       [ $space, $id, $o{delay}, $o{ttl} || () ]
793             );
794 0           return DR::TarantoolQueue::Task->tuple($tuple, $space, $self);
795             }
796              
797              
798              
799             =head2 done
800              
801             $q->done(task => $task, data => { result => '123' });
802             $task->done(data => { result => '123' }); # the same
803             $q->done(id => $task->id, space => $task->space);
804              
805             Mark a task as complete (done), but don't delete it. Replaces task
806             data with the supplied B<data>.
807              
808             =cut
809              
810             sub done {
811 0     0 1   my ($self, %o) = @_;
812 0           _check_opts \%o, qw(task id space data);
813 0           my ($id, $space);
814 0 0         if ($o{task}) {
815 0           ($id, $space) = ($o{task}->id, $o{task}->space);
816             } else {
817 0           ($id, $space) = @o{'id', 'space'};
818 0 0         $space = $self->space unless defined $o{space};
819 0 0         croak 'space is not defined' unless defined $space;
820             }
821             my $tuple = $self->tnt->call_lua('queue.done' =>
822 0           [ $space, $id, $self->jse->encode($o{data}) ]
823             );
824 0           return DR::TarantoolQueue::Task->tuple($tuple, $space, $self);
825             }
826              
827              
828             =head1 COPYRIGHT AND LICENCE
829              
830             Copyright (C) 2012 by Dmitry E. Oboukhov
831             Copyright (C) 2012 by Roman V. Nikolaev
832              
833             This library is free software; you can redistribute it and/or modify
834             it under the same terms as Perl itself, either Perl version 5.8.8 or,
835             at your option, any later version of Perl 5 you may have available.
836              
837             =cut
838              
839             with 'DR::TarantoolQueue::Tnt';
840              
841             __PACKAGE__->meta->make_immutable();