File Coverage

blib/lib/Coro/PriorityQueue.pm
Criterion Covered Total %
statement 77 91 84.6
branch 17 24 70.8
condition 9 12 75.0
subroutine 19 22 86.3
pod 7 15 46.6
total 129 164 78.6


line stmt bran cond sub pod time code
1             package Coro::PriorityQueue;
2              
3 1     1   674 use strict;
  1         1  
  1         39  
4 1     1   7 use warnings;
  1         2  
  1         33  
5 1     1   6 use Carp;
  1         10  
  1         76  
6 1     1   6 use Coro;
  1         2  
  1         130  
7 1     1   699 use Coro::Semaphore;
  1         246  
  1         36  
8 1     1   961 use POSIX qw(floor);
  1         6789  
  1         8  
9              
10             our $VERSION = 1.1;
11              
12             sub new {
13 4     4 1 189465 my ($class, $max) = @_;
14 4 50 33     35 croak 'expected positive int for $max'
15             unless defined $max && $max > 0;
16              
17             # Pre-allocate array
18 4         6 my @arr;
19 4         17 $#arr = $max - 1;
20              
21 4         46 return bless [
22             \@arr, # data
23             0, # item count
24             $max, # max items
25             Coro::Semaphore->new($max), # slots available
26             Coro::Semaphore->new(0), # items ready
27             0, # shutdown
28             ], $class;
29             }
30              
31 14     14 1 77 sub count { $_[0]->[1] }
32 2     2 0 15 sub max { $_[0]->[2] }
33 7     7 1 1160 sub is_empty { $_[0]->count == 0 }
34 2     2 1 14 sub is_full { $_[0]->count >= $_[0]->max }
35 0     0 0 0 sub peek { $_[0]->[0][$_[1]] }
36 34     34 0 1174 sub is_shutdown { $_[0]->[5] };
37              
38 14     14 0 164 sub slots_up { Coro::Semaphore::up $_[0]->[3] }
39 20     20 0 74 sub slots_down { Coro::Semaphore::down $_[0]->[3] }
40 18     18 0 41 sub items_up { Coro::Semaphore::up $_[0]->[4] }
41 16     16 0 340 sub items_down { Coro::Semaphore::down $_[0]->[4] }
42              
43             sub shutdown {
44 1     1 1 8 my $self = shift;
45 1         3 $self->[5] = 1;
46 1         4 Coro::Semaphore::adjust $self->[3], 999_999_999;
47 1         3 Coro::Semaphore::adjust $self->[4], 999_999_999;
48             }
49              
50             sub insert {
51 20     20 1 13981 my ($self, $item) = @_;
52 20 50       53 croak 'cannot insert undef' unless defined $item;
53              
54             # Wait for an available slot
55 20         45 $self->slots_down;
56 19 100       389 croak 'queue shut down' if $self->is_shutdown;
57              
58 18         23 ++$self->[1];
59              
60             # Place item at the bottom of the heap and sift up
61 18         24 my $arr = $self->[0];
62 18         26 my $idx = $self->[1] - 1;
63 18 100       78 my $parent = $idx == 0 ? undef : floor(($idx - 1) / 2);
64              
65 18         35 $self->[0][$idx] = $item;
66              
67 18   100     163 while (defined $parent && $arr->[$idx] < $arr->[$parent]) {
68 6         15 @$arr[$idx, $parent] = @$arr[$parent, $idx];
69 6         7 $idx = $parent;
70 6 50       41 $parent = $idx == 0 ? undef : floor(($idx - 1) / 2);
71             }
72              
73             # Signal waiters
74 18         35 $self->items_up;
75              
76 18         49 return $self->[1];
77             }
78              
79             sub remove {
80 16     16 1 11018 my $self = shift;
81              
82             # Wait for an item to be available
83 16         40 $self->items_down;
84 15 100 100     38 return if $self->is_shutdown && $self->is_empty;
85              
86 14         21 my $item = shift @{$self->[0]};
  14         37  
87 14         18 --$self->[1];
88              
89             # Move the last item to the root
90 14         20 unshift @{$self->[0]}, pop @{$self->[0]};
  14         24  
  14         29  
91              
92             # Sift down
93 14         17 my $idx = 0;
94 14         27 my $last = $self->[1] - 1;
95 14         19 my $arr = $self->[0];
96              
97 14         16 while (1) {
98 27         42 my $l = $idx * 2 + 1;
99 27         170 my $r = $idx * 2 + 2;
100              
101 27 100 66     305 last if $l > $last && $r > $last;
102              
103 16         18 my $least;
104 16 100       31 if ($r > $last) {
105 5         8 $least = $l;
106             } else {
107 11 100       48 $least = $arr->[$l] <= $arr->[$r] ? $l : $r;
108             }
109              
110 16 100       37 if ($arr->[$idx] > $arr->[$least]) {
111 13         232 @$arr[$idx, $least] = @$arr[$least, $idx];
112 13         20 $idx = $least;
113             } else {
114 3         7 last;
115             }
116             }
117              
118             # Signal waiters
119 14         33 $self->slots_up;
120              
121 14         51 return $item;
122             }
123              
124             sub dump {
125 0     0 0   my $self = shift;
126 0           printf "Heap (%d/%d)\n", $self->count, $self->max;
127 0           $self->_dump(0, 0);
128             }
129              
130             sub _dump {
131 0     0     my ($self, $idx, $indent) = @_;
132 0 0         return unless defined $self->peek($idx);
133              
134 0 0         if ($indent > 0) {
135 0           print ' ' for (1 .. $indent);
136             }
137              
138 0           print '-' . $self->peek($idx);
139 0           print "\n";
140              
141 0           my $l = $idx * 2 + 1;
142 0           my $r = $idx * 2 + 2;
143 0           $self->_dump($l, $indent + 1);
144 0           $self->_dump($r, $indent + 1);
145             }
146              
147             1;
148             __END__