File Coverage

blib/lib/Ryu/Node.pm
Criterion Covered Total %
statement 61 61 100.0
branch 32 38 84.2
condition 16 25 64.0
subroutine 17 17 100.0
pod 7 11 63.6
total 133 152 87.5


line stmt bran cond sub pod time code
1             package Ryu::Node;
2              
3 37     37   15365 use strict;
  37         76  
  37         995  
4 37     37   172 use warnings;
  37         61  
  37         1755  
5              
6             our $VERSION = '3.002'; # VERSION
7             our $AUTHORITY = 'cpan:TEAM'; # AUTHORITY
8              
9             =head1 NAME
10              
11             Ryu::Node - generic node
12              
13             =head1 DESCRIPTION
14              
15             This is a common base class for all sources, sinks and other related things.
16             It does very little.
17              
18             =cut
19              
20 37     37   25435 use Future;
  37         466063  
  37         1475  
21 37     37   318 use Scalar::Util qw(refaddr);
  37         73  
  37         31152  
22              
23             =head1 METHODS
24              
25             Not really. There's a constructor, but that's not particularly exciting.
26              
27             =cut
28              
29             sub new {
30 141     141 0 7556 bless {
31             pause_propagation => 1,
32             @_[1..$#_]
33             }, $_[0]
34             }
35              
36             =head2 describe
37              
38             Returns a string describing this node and any parents - typically this will result in a chain
39             like C<< from->combine_latest->count >>.
40              
41             =cut
42              
43             # It'd be nice if L<Future> already provided a method for this, maybe I should suggest it
44             sub describe {
45 517     517 1 866 my ($self) = @_;
46 517 100       865 ($self->parent ? $self->parent->describe . '=>' : '') . $self->label . '(' . $self->completed->state . ')';
47             }
48              
49             =head2 completed
50              
51             Returns a L<Future> indicating completion (or failure) of this stream.
52              
53             =cut
54              
55             sub completed {
56 551     551 1 29924 my ($self) = @_;
57 551         897 return $self->_completed->without_cancel;
58             }
59              
60             # Internal use only, since it's cancellable
61             sub _completed {
62 1559     1559   12006 my ($self) = @_;
63 1559   66     5098 $self->{completed} //= do {
64 132         396 my $f = $self->new_future(
65             'completion'
66             );
67 132 50       3196 $f->on_ready(
68             $self->curry::weak::cleanup
69             ) if $self->can('cleanup');
70 132 50       6298 $self->prepare_await if $self->can('prepare_await');
71 132         1186 $f
72             }
73             }
74              
75             =head2 pause
76              
77             Does nothing useful.
78              
79             =cut
80              
81             sub pause {
82 18     18 1 2755 my ($self, $src) = @_;
83 18   100     98 my $k = refaddr($src) // 0;
84              
85 18   66     60 my $was_paused = $self->{is_paused} && keys %{$self->{is_paused}};
86 18 50       46 unless($was_paused) {
87 18 100 66     81 delete $self->{unblocked} if $self->{unblocked} and $self->{unblocked}->is_ready;
88             }
89 18         114 ++$self->{is_paused}{$k};
90 18 100       44 if(my $parent = $self->parent) {
91 10 100       49 $parent->pause($self) if $self->{pause_propagation};
92             }
93 18 100       51 if(my $flow_control = $self->{flow_control}) {
94 4 50       15 $flow_control->emit(0) unless $was_paused;
95             }
96             $self
97 18         33 }
98              
99             =head2 resume
100              
101             Is about as much use as L</pause>.
102              
103             =cut
104              
105             sub resume {
106 22     22 1 6408 my ($self, $src) = @_;
107 22   100     102 my $k = refaddr($src) // 0;
108 22 50       95 delete $self->{is_paused}{$k} unless --$self->{is_paused}{$k} > 0;
109 22 50 33     67 unless($self->{is_paused} and keys %{$self->{is_paused}}) {
  22         80  
110 22         58 my $f = $self->_unblocked;
111 22 100       328 $f->done unless $f->is_ready;
112 22 100       326 if(my $parent = $self->parent) {
113 12 100       50 $parent->resume($self) if $self->{pause_propagation};
114             }
115 22 100       64 if(my $flow_control = $self->{flow_control}) {
116 4         8 $flow_control->emit(1);
117             }
118             }
119             $self
120 22         49 }
121              
122             =head2 unblocked
123              
124             Returns a L<Future> representing the current flow control state of this node.
125              
126             It will be L<pending|Future/is_pending> if this node is currently paused,
127             otherwise L<ready|Future/is_ready>.
128              
129             =cut
130              
131             sub unblocked {
132             # Since we don't want stray callers to affect our internal state, we always return
133             # a non-cancellable version of our internal Future.
134             shift->_unblocked->without_cancel
135 6     6 1 24 }
136              
137             sub _unblocked {
138 28     28   47 my ($self) = @_;
139             # Since we don't want stray callers to affect our internal state, we always return
140             # a non-cancellable version of our internal Future.
141 28   66     116 $self->{unblocked} //= do {
142 24 100       52 $self->is_paused
143             ? $self->new_future
144             : Future->done
145             };
146             }
147              
148             =head2 is_paused
149              
150             Might return 1 or 0, but is generally meaningless.
151              
152             =cut
153              
154             sub is_paused {
155 95     95 1 9720 my ($self, $obj) = @_;
156 95 100       201 return keys %{ $self->{is_paused} } ? 1 : 0 unless defined $obj;
  80 100       532  
157 15         31 my $k = refaddr($obj);
158             return exists $self->{is_paused}{$k}
159 15 100       58 ? 0 + $self->{is_paused}{$k}
160             : 0;
161             }
162              
163             sub flow_control {
164 4     4 0 8 my ($self) = @_;
165             $self->{flow_control} //= Ryu::Source->new(
166             new_future => $self->{new_future}
167             )
168 4   33     29 }
169              
170 637     637 0 17294 sub label { shift->{label} }
171              
172 1057     1057 0 3019 sub parent { shift->{parent} }
173              
174             =head2 new_future
175              
176             Used internally to get a L<Future>.
177              
178             =cut
179              
180             sub new_future {
181 137     137 1 220 my $self = shift;
182             (
183 137   66     940 $self->{new_future} //= $Ryu::Source::FUTURE_FACTORY
184             )->($self, @_)
185             }
186              
187              
188             1;
189              
190             __END__