File Coverage

blib/lib/Data/Queue/Batch.pm
Criterion Covered Total %
statement 50 52 96.1
branch 12 12 100.0
condition 6 8 75.0
subroutine 14 16 87.5
pod 11 11 100.0
total 93 99 93.9


line stmt bran cond sub pod time code
1             package Data::Queue::Batch;
2 4     4   61415 use 5.008001;
  4         11  
3 4     4   12 use strict;
  4         5  
  4         67  
4 4     4   20 use warnings;
  4         4  
  4         1772  
5              
6             our $VERSION = "0.02";
7              
8             sub new {
9 20     20 1 12201 my ($class, %args) = @_;
10 20   50     56 my $batch_size = delete($args{batch_size}) || 100;
11 20         22 my $callback = delete($args{callback});
12 20         293 return bless {
13             callback => $callback,
14             batch_size => $batch_size,,
15             %args,
16             available => 0,
17             _queue => [],
18             }, $class;
19             }
20              
21 95     95 1 2938 sub size { scalar(@{ shift->{_queue} }) }
  95         192  
22 0     0 1 0 sub available { shift->{available} }
23 0     0 1 0 sub batch_size { shift->{batch_size} }
24              
25 51     51 1 4108 sub push { shift->enqueue(@_) }
26             sub enqueue {
27 56     56 1 73 my ($self, @values) = @_;
28 56         45 CORE::push(@{ $self->{_queue} }, @values);
  56         94  
29              
30 56         73 my $unmarked = $self->size - $self->{available};
31 56         56 my $marking = $unmarked - ($unmarked % $self->{batch_size});
32 56         50 $self->{available} += $marking;
33            
34 56 100 66     127 if ($self->{callback} && $self->{available}) {
35 19         29 $self->{callback}->($self->_take($self->{available}));
36             }
37 56         550 return;
38             }
39              
40 23     23 1 45 sub shift { shift->dequeue(@_) }
41             sub dequeue {
42 28     28 1 27 my ($self) = @_;
43 28 100       149 return unless $self->{available};
44 15         19 my ($dequeued) = $self->_take(1);
45 15         65 return $dequeued;
46             }
47              
48             sub peek {
49 4     4 1 6 my ($self, $count) = @_;
50 4 100       22 $count = $self->{available} if $count > $self->{available};
51 4         7 my @peeked = @{$self->{_queue}}[0 .. $count - 1];
  4         10  
52 4         18 return @peeked;
53             }
54              
55             sub flush {
56 20     20 1 599 my ($self) = @_;
57 20         36 my @taken = $self->_take($self->size);
58 20 100 100     70 if ($self->{callback} && @taken) {
59 17         217 $self->{callback}->(@taken);
60             }
61 20         260 return @taken;
62             }
63              
64             sub clear {
65 1     1 1 2 my ($self) = @_;
66 1         3 $self->_take($self->size);
67 1         13 return;
68             }
69              
70             sub _take {
71 55     55   41 my ($self, $count) = @_;
72 55         35 my @taken = splice(@{ $self->{_queue} }, 0, $count);
  55         85  
73 55         52 $self->{available} -= $count;
74 55 100       84 $self->{available} = 0 if $self->{available} < 0;
75 55         299 return @taken;
76             }
77              
78             sub DESTROY {
79 20     20   1723 my ($self) = @_;
80 20 100       98 $self->flush if $self->{callback};
81             }
82              
83             1;
84             __END__