File Coverage

blib/lib/Deeme/Worker.pm
Criterion Covered Total %
statement 40 40 100.0
branch 7 10 70.0
condition 2 4 50.0
subroutine 8 8 100.0
pod 6 6 100.0
total 63 68 92.6


line stmt bran cond sub pod time code
1             package Deeme::Worker;
2 1     1   348 use Deeme::Obj "Deeme";
  1         1  
  1         5  
3 1   50 1   5 use constant DEBUG => $ENV{DEEME_DEBUG} || 0;
  1         1  
  1         536  
4              
5             sub jobs {
6 3     3 1 6 my $j;
7 3 100       46 return ( $j = shift->backend->events_get(shift) ) ? scalar @$j : 0;
8             }
9              
10             sub dequeue_event {
11 1     1 1 2 my ( $self, $name ) = @_;
12 1 50       15 if ( my $s = $self->backend->events_get($name) ) {
13 1         1 warn
14             "Worker -- dequeue $name in @{[blessed $self]} (@{[scalar @$s]})\n"
15             if DEBUG;
16 1         17 my @onces = $self->backend->events_onces($name);
17 1         1 my $i = 0;
18 1         2 for my $cb (@$s) {
19 2 50 50     9 ( $onces[$i] == 1 )
20             ? ( splice( @onces, $i, 1 )
21             and $self->_unsubscribe_index( $name => $i ) )
22             : $i++;
23 2         6 push(
24 2         3 @{ $self->{'queue'} },
25             Deeme::Job->new(
26             deeme => $self,
27             cb => $cb
28             )
29             );
30             }
31             }
32 1         1 return @{ $self->{'queue'} };
  1         4  
33             }
34              
35             sub dequeue {
36 10     10 1 454 my ( $self, $name ) = @_;
37              
38 10 100       186 if ( my $s = $self->backend->events_get($name) ) {
39 5         4 warn
40             "Worker -- dequeue $name in @{[blessed $self]} safely (@{[scalar @$s]})\n"
41             if DEBUG;
42 5         21 my $cb = Deeme::Job->new(
43             deeme => $self,
44             cb => @$s[0]
45             );
46 5         16 $self->_unsubscribe_index( $name, 0 );
47 5         6 push( @{ $self->{'queue'} }, $cb );
  5         10  
48             }
49 10         8 return @{ $self->{'queue'} }[0];
  10         22  
50             }
51              
52             sub process {
53 3     3 1 8 my $self = shift;
54 3         8 return @{ $self->{'queue'} } > 0
  3         10  
55 3 50       3 ? @{ $self->{'queue'} }[0]->process(@_)
56             : undef;
57             }
58              
59             sub process_all {
60 1     1 1 2 my $self = shift;
61 1         2 my @args = @_;
62 1         1 my @returns;
63 1         1 while ( my $job = shift @{ $self->{'queue'} } ) {
  3         9  
64 2         4 push( @returns, $job->process(@args) );
65             }
66 1         2 return @returns;
67             }
68              
69 7     7 1 433 sub add { return shift->once(@_) }
70             1;
71             __END__