File Coverage

blib/lib/Data/Monad/CondVar.pm
Criterion Covered Total %
statement 202 208 97.1
branch 27 32 84.3
condition 9 12 75.0
subroutine 55 59 93.2
pod 6 8 75.0
total 299 319 93.7


line stmt bran cond sub pod time code
1             package Data::Monad::CondVar;
2 28     28   89485 use strict;
  28         61  
  28         1565  
3 28     28   147 use warnings;
  28         59  
  28         718  
4 28     28   829 use 5.012;
  28         98  
  28         1021  
5 28     28   61810 use AnyEvent;
  28         224138  
  28         8613  
6 28     28   381 use Scalar::Util;
  28         64  
  28         2831  
7 28     28   622 use Exporter qw/import/;
  28         59  
  28         25525  
8              
9             our $VERSION = "0.06";
10             our @EXPORT = qw/as_cv cv_unit cv_zero cv_fail cv_flat_map_multi cv_map_multi
11             cv_sequence call_cc/;
12              
13             sub _assert_cv($) {
14 271 50   271   1058 $_[0]->ready and die "[BUG]It already has been ready";
15 271         2175 $_[0];
16             }
17              
18             sub as_cv(&) {
19 1     1 0 13 my $code = shift;
20 1         33 $code->(my $cv = AE::cv);
21 1         5275 $cv;
22             }
23              
24 92     92 1 56140 sub cv_unit { AnyEvent::CondVar->unit(@_) }
25 1     1 1 4094 sub cv_zero { AnyEvent::CondVar->zero(@_) }
26 19     19 1 7617 sub cv_fail { AnyEvent::CondVar->fail(@_) }
27 0     0 1 0 sub cv_flat_map_multi(&@) { AnyEvent::CondVar->flat_map_multi(@_) }
28 2     2 1 5908 sub cv_map_multi(&@) { AnyEvent::CondVar->map_multi(@_) }
29 1     1 1 13887 sub cv_sequence { AnyEvent::CondVar->sequence(@_) }
30              
31             sub call_cc(&) {
32 4     4 0 1776 my $f = shift;
33 4         143 my $ret_cv = AE::cv;
34              
35             my $skip = sub {
36 1     1   12 my @v = @_;
37 1         2 _assert_cv $ret_cv;
38 1         4 $ret_cv->send(@v);
39              
40 1         52 return AE::cv; # nop
41 4         5034 };
42              
43             my $branch_cv = $f->($skip)->map(sub {
44 1     1   12 _assert_cv $ret_cv;
45 1         3 $ret_cv->send(@_);
46             })->catch(sub {
47 1     1   2 _assert_cv $ret_cv;
48 1         3 $ret_cv->croak(@_);
49 1         10 cv_unit; # void
50 4     0   19 })->catch(sub { warn @_ });
  0         0  
51 4     1   52 $ret_cv->canceler(sub { $branch_cv->cancel });
  1         5  
52              
53 4         15 return $ret_cv;
54             }
55              
56              
57             package Data::Monad::CondVar::Mixin;
58 28     28   728 use strict;
  28         60  
  28         2679  
59 28     28   171 use warnings;
  28         52  
  28         1266  
60 28     28   177 use Carp ();
  28         72  
  28         503  
61 28     28   417 use Scalar::Util ();
  28         76  
  28         496  
62 28     28   228 use AnyEvent ();
  28         66  
  28         79864  
63              
64             # extends AE::cv directly
65             require Data::Monad::Base::MonadZero;
66             for my $mixin (__PACKAGE__, 'Data::Monad::Base::MonadZero') {
67             next if grep { $_ eq $mixin } @AnyEvent::CondVar::ISA;
68             push @AnyEvent::CondVar::ISA, $mixin;
69             }
70              
71             our $ZERO = "[ZERO of ${\ __PACKAGE__}]";
72              
73             *_assert_cv = \&Data::Monad::CondVar::_assert_cv;
74              
75             sub _cv_or_die($) {
76 258 100 66 258   8866 Scalar::Util::blessed $_[0] && $_[0]->isa('AnyEvent::CondVar')
77             or Carp::croak "You must use CondVar object here.";
78             }
79              
80             sub unit {
81 197     197   7266 my $class = shift;
82 197         13196 (my $cv = AE::cv)->send(@_);
83 197         53104 return $cv;
84             }
85              
86             sub fail {
87 26     26   62 my $class = shift;
88              
89             # XXX cv's croak doesn't throw the error if the message is empty.
90 26   66     122 my $msg = $_[0] || $ZERO;
91 26         4484 (my $cv = AE::cv)->croak($msg);
92              
93 26         6766 return $cv;
94             }
95              
96 6     6   384 sub zero { $_[0]->fail($ZERO) }
97              
98             sub any {
99 12     12   110 my ($class, @cvs) = @_;
100              
101 12         1147 my $result_cv = AE::cv;
102 12         78 my @any_cv;
103 12         33 for (@cvs) {
104             push @any_cv, $_->map(sub {
105 9 50   9   131 return unless $result_cv;
106 9         27 _assert_cv $result_cv;
107 9         39 $result_cv->send(@_);
108 9         374 $result_cv->cancel;
109             })->catch(sub {
110 1 50   1   5 return unless $result_cv;
111 1         3 _assert_cv $result_cv;
112 1         4 $result_cv->croak(@_);
113 1         11 $result_cv->cancel;
114 1         6 return $class->unit;
115 24     0   182 })->catch(sub { warn @_ });
  0         0  
116             }
117 12     11   59 $result_cv->canceler(sub { $_->cancel for @any_cv });
  11         206  
118              
119 12         61 $result_cv;
120             }
121              
122             sub all {
123 10     10   34 my ($class, @cvs) = @_;
124              
125 10         29 my @result;
126             (my $result_cv = AE::cv)->begin(sub {
127 7     7   64 my ($cv) = @_;
128 7         25 _assert_cv $cv;
129 7         45 $cv->send(@result);
130 10         837 });
131 10         1078 for my $i (0 .. $#cvs) {
132 22         74 $result_cv->begin;
133              
134             $cvs[$i]->map(sub {
135 16     16   425 $result[$i] = [@_];
136 16         91 $result_cv->end;
137             })->catch(sub {
138 2     2   8 _assert_cv $result_cv;
139 2         8 $result_cv->croak(@_);
140 2         35 $result_cv->cancel;
141 2         21 return $class->unit;
142 22     0   963 })->catch(sub { warn @_ });
  0         0  
143             }
144 10         320 $result_cv->end;
145              
146 10     5   171 $result_cv->canceler(sub { $_->cancel for @cvs });
  5         41  
147              
148 10         58 $result_cv;
149             }
150              
151 14   100 14   48 sub cancel { (delete $_[0]->{_monad_canceler} || sub {})->() }
  206     206   1947  
152              
153             sub canceler {
154 541     541   1173 my $cv = shift;
155 541 50       1971 @_ and $cv->{_monad_canceler} = shift;
156 541         970 $cv->{_monad_canceler};
157             }
158              
159             sub _add_cb {
160 689     689   2673 my ($self, $cb) = @_;
161 689 100       2522 if(my $old_cb = $self->cb) {
162             $self->cb(sub {
163 6     6   82 $old_cb->(@_);
164 6         733 $cb->(@_);
165 6         69 });
166             } else {
167 683         14887 $self->cb($cb);
168             }
169             }
170              
171             sub flat_map {
172 298     298   187709 my ($self, $f) = @_;
173              
174 298         10929 my $cv_bound = AE::cv;
175 298         1852 my $cv_current = $self;
176             $self->_add_cb(sub {
177 256     256   344505 my $cv = $cv_current = eval {
178 256         1678 my ($cv) = $f->($_[0]->recv);
179 238         1998 _cv_or_die $cv;
180 237         706 $cv;
181             };
182              
183 256 100       4493 if ($@) {
184 19         478 _assert_cv $cv_bound;
185 19         72 return $cv_bound->croak($@);
186             }
187             $cv->_add_cb(sub {
188 208         5701 my @v = eval { $_[0]->recv };
  208         1004  
189 208         6221 _assert_cv $cv_bound;
190 208 100       1437 $@ ? $cv_bound->croak($@) : $cv_bound->send(@v);
191 237         1691 });
192 298         2467 });
193             $cv_bound->canceler(sub {
194 86     86   219 $cv_current->cancel;
195 86         2209 $cv_current->cb(undef); # release the callback function
196 298         11032 });
197              
198 298         2266 return $cv_bound;
199             }
200              
201             sub or {
202 9     9   20 my ($self, $alter) = @_;
203              
204 9         273 my $cv_mixed = AE::cv;
205             $self->_add_cb(sub {
206 8     8   84 my @v = eval { $_[0]->recv };
  8         37  
207 8 100       1264 unless ($@) {
    50          
208 4         19 $cv_mixed->(@v);
209             } elsif ($@ =~ /\Q$ZERO\E/) {
210             $alter->_add_cb(sub {
211 3         798 my @v = eval { $_[0]->recv };
  3         15  
212 3         275 _assert_cv $cv_mixed;
213 3 100       21 $@ ? $cv_mixed->croak($@) : $cv_mixed->(@v);
214 4         32 });
215             } else {
216 0         0 _assert_cv $cv_mixed;
217 0         0 $cv_mixed->croak($@);
218             }
219 9         90 });
220 9     4   121 $cv_mixed->canceler(sub { $_->cancel for $self, $alter });
  4         25  
221              
222 9         78 $cv_mixed;
223             }
224              
225             sub catch {
226 122     122   8331 my ($self, $f) = @_;
227              
228 122         4570 my $result_cv = AE::cv;
229 122         1194 my $active_cv = $self;
230             $self->_add_cb(sub {
231 63     63   784 my @v = eval { $_[0]->recv };
  63         200  
232 63 100       3334 my $exception = $@ or return $result_cv->(@v);
233              
234 20         38 my $cv = $active_cv = eval {
235 20         64 my $cv = $f->($exception);
236 20         200 _cv_or_die $cv;
237 19         48 $cv;
238             };
239 20 100       105 $@ and return (_assert_cv $result_cv)->croak($@);
240              
241             $cv->_add_cb(sub {
242 18         127 my @v = eval { $_[0]->recv };
  18         62  
243 18         488 _assert_cv $result_cv;
244 18 100       75 $@ ? $result_cv->croak($@) : $result_cv->send(@v);
245 19         106 });
246 122         1348 });
247             $result_cv->canceler(sub {
248 52     52   137 $active_cv->cancel;
249 52         973 $active_cv->cb(undef); # release the callback function
250 122         1470 });
251              
252 122         688 return $result_cv;
253             }
254              
255             sub sleep {
256 87     87   274 my ($self, $sec) = @_;
257             $self->flat_map(sub {
258 86     86   1660 my @v = @_;
259 86         3625 my $cv = AE::cv;
260 86         535 my $t; $t = AE::timer $sec, 0, sub { $cv->(@v); undef $cv };
  86         1093  
  59         1770605  
  59         4160  
261 86         701 $cv->canceler(sub { undef $t });
  33         90  
262 86         212 return $cv;
263 87         604 });
264             }
265              
266             sub timeout {
267 8     8   19 my ($self, $sec) = @_;
268 8         16 my $class = ref $self;
269              
270 8         62 $class->any($class->unit->sleep($sec), $self);
271             }
272              
273             sub retry {
274 4     4   38 my $self = shift;
275 4         6 my $f = pop;
276 4         10 my ($retry, $pace) = @_;
277 4   50     13 $retry ||= 1;
278 4   100     23 $pace ||= 0;
279 4         8 my $class = ref $self;
280              
281             $self->flat_map(sub {
282 4     4   39 my @args = @_;
283              
284             $class->unit($retry, undef, undef)->while(
285             sub {
286 15         255 my ($retry, $ok, $fail) = @_;
287 15 100       155 ! $ok and $retry > 0;
288             }, sub {
289 11         40 my ($retry, $ok, $fail) = @_;
290             $f->(@args)->flat_map(sub {
291 3         36 $class->unit($retry - 1, [@_], undef);
292             })->catch(sub {
293 8         33 $class->unit($retry - 1, undef, [@_])
294             ->sleep($pace);
295 11         131 });
296             }
297             )->flat_map(sub {
298 4         37 my ($retry, $ok, $fail) = @_;
299 4 100       17 $fail ? $class->fail(@$fail) : $class->unit(@$ok);
300 4         15 });
301 4         33 });
302             }
303              
304             1;
305              
306             __END__