File Coverage

blib/lib/Ryu/Node.pm
Criterion Covered Total %
statement 61 61 100.0
branch 32 38 84.2
condition 16 25 64.0
subroutine 17 17 100.0
pod 7 11 63.6
total 133 152 87.5


line stmt bran cond sub pod time code
1             package Ryu::Node;
2              
3 37     37   15389 use strict;
  37         91  
  37         1012  
4 37     37   176 use warnings;
  37         69  
  37         1747  
5              
6             our $VERSION = '3.000'; # VERSION
7             our $AUTHORITY = 'cpan:TEAM'; # AUTHORITY
8              
9             =head1 NAME
10              
11             Ryu::Node - generic node
12              
13             =head1 DESCRIPTION
14              
15             This is a common base class for all sources, sinks and other related things.
16             It does very little.
17              
18             =cut
19              
20 37     37   25584 use Future;
  37         473033  
  37         1399  
21 37     37   309 use Scalar::Util qw(refaddr);
  37         73  
  37         31343  
22              
23             =head1 METHODS
24              
25             Not really. There's a constructor, but that's not particularly exciting.
26              
27             =cut
28              
29             sub new {
30 135     135 0 5821 bless {
31             pause_propagation => 1,
32             @_[1..$#_]
33             }, $_[0]
34             }
35              
36             =head2 describe
37              
38             Returns a string describing this node and any parents - typically this will result in a chain
39             like C<< from->combine_latest->count >>.
40              
41             =cut
42              
43             # It'd be nice if L already provided a method for this, maybe I should suggest it
44             sub describe {
45 485     485 1 822 my ($self) = @_;
46 485 100       812 ($self->parent ? $self->parent->describe . '=>' : '') . $self->label . '(' . $self->completed->state . ')';
47             }
48              
49             =head2 completed
50              
51             Returns a L indicating completion (or failure) of this stream.
52              
53             =cut
54              
55             sub completed {
56 518     518 1 16654 my ($self) = @_;
57 518         881 return $self->_completed->without_cancel;
58             }
59              
60             # Internal use only, since it's cancellable
61             sub _completed {
62 1484     1484   11411 my ($self) = @_;
63 1484   66     5087 $self->{completed} //= do {
64 126         396 my $f = $self->new_future(
65             'completion'
66             );
67 126 50       3132 $f->on_ready(
68             $self->curry::weak::cleanup
69             ) if $self->can('cleanup');
70 126 50       6323 $self->prepare_await if $self->can('prepare_await');
71 126         1166 $f
72             }
73             }
74              
75             =head2 pause
76              
77             Does nothing useful.
78              
79             =cut
80              
81             sub pause {
82 18     18 1 3767 my ($self, $src) = @_;
83 18   100     86 my $k = refaddr($src) // 0;
84              
85 18   66     76 my $was_paused = $self->{is_paused} && keys %{$self->{is_paused}};
86 18 50       60 unless($was_paused) {
87 18 100 66     76 delete $self->{unblocked} if $self->{unblocked} and $self->{unblocked}->is_ready;
88             }
89 18         102 ++$self->{is_paused}{$k};
90 18 100       45 if(my $parent = $self->parent) {
91 10 100       43 $parent->pause($self) if $self->{pause_propagation};
92             }
93 18 100       49 if(my $flow_control = $self->{flow_control}) {
94 4 50       26 $flow_control->emit(0) unless $was_paused;
95             }
96             $self
97 18         33 }
98              
99             =head2 resume
100              
101             Is about as much use as L.
102              
103             =cut
104              
105             sub resume {
106 22     22 1 7583 my ($self, $src) = @_;
107 22   100     101 my $k = refaddr($src) // 0;
108 22 50       96 delete $self->{is_paused}{$k} unless --$self->{is_paused}{$k} > 0;
109 22 50 33     61 unless($self->{is_paused} and keys %{$self->{is_paused}}) {
  22         82  
110 22         58 my $f = $self->_unblocked;
111 22 100       367 $f->done unless $f->is_ready;
112 22 100       312 if(my $parent = $self->parent) {
113 12 100       48 $parent->resume($self) if $self->{pause_propagation};
114             }
115 22 100       60 if(my $flow_control = $self->{flow_control}) {
116 4         20 $flow_control->emit(1);
117             }
118             }
119             $self
120 22         47 }
121              
122             =head2 unblocked
123              
124             Returns a L representing the current flow control state of this node.
125              
126             It will be L if this node is currently paused,
127             otherwise L.
128              
129             =cut
130              
131             sub unblocked {
132             # Since we don't want stray callers to affect our internal state, we always return
133             # a non-cancellable version of our internal Future.
134             shift->_unblocked->without_cancel
135 6     6 1 16 }
136              
137             sub _unblocked {
138 28     28   47 my ($self) = @_;
139             # Since we don't want stray callers to affect our internal state, we always return
140             # a non-cancellable version of our internal Future.
141 28   66     121 $self->{unblocked} //= do {
142 24 100       61 $self->is_paused
143             ? $self->new_future
144             : Future->done
145             };
146             }
147              
148             =head2 is_paused
149              
150             Might return 1 or 0, but is generally meaningless.
151              
152             =cut
153              
154             sub is_paused {
155 95     95 1 7887 my ($self, $obj) = @_;
156 95 100       216 return keys %{ $self->{is_paused} } ? 1 : 0 unless defined $obj;
  80 100       500  
157 15         40 my $k = refaddr($obj);
158             return exists $self->{is_paused}{$k}
159 15 100       92 ? 0 + $self->{is_paused}{$k}
160             : 0;
161             }
162              
163             sub flow_control {
164 4     4 0 12 my ($self) = @_;
165             $self->{flow_control} //= Ryu::Source->new(
166             new_future => $self->{new_future}
167             )
168 4   33     25 }
169              
170 601     601 0 17230 sub label { shift->{label} }
171              
172 1001     1001 0 3003 sub parent { shift->{parent} }
173              
174             =head2 new_future
175              
176             Used internally to get a L.
177              
178             =cut
179              
180             sub new_future {
181 131     131 1 229 my $self = shift;
182             (
183 131   66     876 $self->{new_future} //= $Ryu::Source::FUTURE_FACTORY
184             )->($self, @_)
185             }
186              
187              
188             1;
189              
190             __END__