File Coverage

blib/lib/Queue/Q/DistFIFO.pm
Criterion Covered Total %
statement 114 117 97.4
branch 19 26 73.0
condition 11 17 64.7
subroutine 19 19 100.0
pod 0 12 0.0
total 163 191 85.3


line stmt bran cond sub pod time code
1             package Queue::Q::DistFIFO;
2 3     3   2003 use strict;
  3         7  
  3         92  
3 3     3   18 use warnings;
  3         7  
  3         97  
4 3     3   16 use Carp qw(croak);
  3         66  
  3         166  
5              
6 3     3   19 use List::Util ();
  3         5  
  3         66  
7 3     3   17 use Scalar::Util qw(refaddr blessed);
  3         4  
  3         200  
8              
9             use Class::XSAccessor {
10 3         26 getters => [qw(shards next_shard)],
11 3     3   18 };
  3         5  
12              
13             sub new {
14 2     2 0 10934 my $class = shift;
15 2         11 my $self = bless({
16             @_,
17             next_shard => 0,
18             } => $class);
19              
20 2 50 33     24 if (not defined $self->{shards}
      33        
21             or not ref($self->{shards}) eq 'ARRAY'
22 2         13 or not @{$self->{shards}})
23             {
24 0         0 croak("Need 'shards' parameter being an array of shards");
25             }
26              
27 2         4 $self->{shards_order} = [ List::Util::shuffle( @{$self->shards} ) ];
  2         54  
28              
29 2         8 return $self;
30             }
31              
32             sub _next_shard {
33 378     378   559 my $self = shift;
34 378         616 my $ns = $self->{next_shard};
35 378         523 my $so = $self->{shards_order};
36 378 100       541 if ($ns > $#{$so}) {
  378         1026  
37 74         129 $ns = $self->{next_shard} = 0;
38             }
39 378         609 ++$self->{next_shard};
40 378         1101 return $so->[$ns];
41             }
42              
43             sub enqueue_item {
44 4     4 0 675 my $self = shift;
45 4 50       14 croak("Need exactly one item to enqeue")
46             if not @_ == 1;
47 4         13 return $self->_next_shard->enqueue_item($_[0]);
48             }
49              
50             sub enqueue_items {
51 14     14 0 1346 my $self = shift;
52 14 50       39 return if not @_;
53 14         21 my @rv;
54 14         54 push @rv, $self->_next_shard->enqueue_item($_) for @_;
55 14         46 return @rv;
56             }
57              
58             sub enqueue_items_strict_ordering {
59 6     6 0 536 my $self = shift;
60 6 50       16 return if not @_;
61 6         23 my $shard = $self->_next_shard;
62 6         24 return $shard->enqueue_items(@_);
63             }
64              
65             sub claim_item {
66 66     66 0 18534 my $self = shift;
67             # FIXME very inefficient!
68 66         163 my $shard = $self->_next_shard;
69 66         170 my $first_shard_addr = refaddr($shard);
70 66         86 my $class;
71 66         98 while (1) {
72 120         398 my $item = $shard->claim_item;
73 120 100       326 if (defined $item) {
74 64 100 66     426 $item->{_shard} = $shard
75             if blessed($item)
76             and $item->isa('Queue::Q::ClaimFIFO::Item');
77 64         233 return $item;
78             }
79 56         131 $shard = $self->_next_shard;
80 56 100       198 return undef if refaddr($shard) == $first_shard_addr;
81             }
82             }
83              
84             sub claim_items {
85 6     6 0 3098 my ($self, $n) = @_;
86 6   100     36 $n ||= 1;
87              
88 6         27 my $nshards = $self->num_shards;
89 6         27 my $at_a_time = int( $n / $nshards );
90 6         20 my $left_over = $n % $nshards;
91 6         25 my @shard_items = (($at_a_time) x $nshards);
92 6         38 ++$shard_items[$_] for 0 .. ($left_over-1);
93              
94 6         13 my @elem;
95              
96 6         21 my $shard = $self->_next_shard;
97 6         55 my $first_shard_addr = refaddr($shard);
98 6         15 my $i = 0;
99 6         15 my $nmissing = 0;
100 6         12 while (1) {
101 18         39 my $thisn = $shard_items[$i];
102 18         86 my @items = $shard->claim_items($thisn);
103 18         44 $shard_items[$i] -= scalar @items;
104 18         40 $nmissing += $shard_items[$i];
105             @items = map {
106 18 100 66     74 $_->{_shard} = $shard
  22         195  
107             if blessed($_)
108             and $_->isa('Queue::Q::ClaimFIFO::Item');
109 22         81 $_
110             } @items;
111 18         42 push @elem, @items;
112 18         55 $shard = $self->_next_shard;
113 18 100 100     140 last if scalar(@elem) == $n
114             or refaddr($shard) == $first_shard_addr;
115 12         28 ++$i;
116             }
117              
118             # Fall back to naive mode - this could be done much
119             # better by redistributing the remaining items to the
120             # shards that had data... FIXME
121 6         20 for (1 .. $nmissing) {
122 2         14 my $item = $self->claim_item;
123 2 50       12 last if not defined $item;
124 0         0 push @elem, $item;
125             }
126              
127 6         59 return @elem;
128             }
129              
130             sub flush_queue {
131 4     4 0 1027 my $self = shift;
132 4         10 my $shards = $self->{shards};
133 4         16 for my $i (0..$#$shards) {
134 20         68 $shards->[$i]->flush_queue;
135             }
136 4         13 return();
137             }
138              
139             sub queue_length {
140 22     22 0 173 my $self = shift;
141 22         46 my $shards = $self->{shards};
142 22         33 my $len = 0;
143 22         73 for my $i (0..$#$shards) {
144 110         368 $len += $shards->[$i]->queue_length;
145             }
146 22         131 return $len;
147             }
148              
149             sub claimed_count {
150 10     10 0 24 my $self = shift;
151 10         25 my $shards = $self->{shards};
152 10         20 my $ccount = 0;
153 10         42 for my $i (0..$#$shards) {
154 50         120 my $shard = $shards->[$i];
155 50         198 my $meth = $shard->can("claimed_count");
156 50 50       189 if (not $meth) {
157 0         0 Carp::croak("Shard $i does not support claimed count. Is it of type NaiveFIFO?");
158             }
159 50         182 $ccount += $meth->($shard);
160             }
161 10         83 return $ccount;
162             }
163              
164             sub mark_item_as_done {
165 43     43 0 15208 my $self = shift;
166 43         65 my $item = shift;
167 43         81 my $shard = delete $item->{_shard};
168 43 50       140 die "Need item's shard to mark it as done! "
169             . "Or was this item previously marked as done?" if not $shard;
170 43         143 $shard->mark_item_as_done($item);
171             }
172              
173             sub mark_items_as_done {
174 1     1 0 2051 my $self = shift;
175 1         6 $self->mark_item_as_done($_) for @_;
176             }
177              
178             sub num_shards {
179 6     6 0 18 my $self = shift;
180 6         15 return scalar(@{ $self->{shards} });
  6         26  
181             }
182              
183             1;