File Coverage

blib/lib/Ryu/Observable.pm
Criterion Covered Total %
statement 62 77 80.5
branch 7 12 58.3
condition 3 9 33.3
subroutine 21 26 80.7
pod 11 11 100.0
total 104 135 77.0


line stmt bran cond sub pod time code
1             package Ryu::Observable;
2              
3 1     1   90113 use strict;
  1         7  
  1         28  
4 1     1   5 use warnings;
  1         2  
  1         23  
5              
6 1     1   589 use utf8;
  1         14  
  1         6  
7              
8             our $VERSION = '3.000'; # VERSION
9             our $AUTHORITY = 'cpan:TEAM'; # AUTHORITY
10              
11             =encoding utf8
12              
13             =head1 NAME
14              
15             Ryu::Observable - plus ça change
16              
17             =head1 SYNOPSIS
18              
19             # Set initial value
20             my $observed = Ryu::Observable->new(123)
21             # and a callback for any changes
22             ->subscribe(sub { print "New value, is now: $_\n" });
23             # Basic numeric increment/decrement should trigger a notification
24             ++$observed;
25             # To assign a new value, use ->set_numeric or ->set_string
26             $observed->set_numeric(88);
27              
28             =head1 DESCRIPTION
29              
30             Simple monitorable variables.
31              
32             =cut
33              
34             use overload
35 1     1   5 '""' => sub { shift->as_string },
36 1     1   35 '0+' => sub { shift->as_number },
37 1     1   7 '++' => sub { my $v = ++$_[0]->{value}; $_[0]->notify_all; $v },
  1         38  
  1         4  
38 0     0   0 '--' => sub { my $v = --$_[0]->{value}; $_[0]->notify_all; $v },
  0         0  
  0         0  
39 7     7   36 'bool' => sub { !!shift->{value} },
40 1     1   1450 fallback => 1;
  1         901  
  1         11  
41              
42 1     1   115 use Scalar::Util;
  1         2  
  1         31  
43 1     1   520 use List::UtilsBy;
  1         1955  
  1         43  
44              
45 1     1   573 use Ryu::Source;
  1         3  
  1         715  
46              
47             =head1 METHODS
48              
49             Public API, such as it is.
50              
51             =head2 as_string
52              
53             Returns the string representation of this value.
54              
55             =cut
56              
57 1     1 1 8 sub as_string { '' . shift->{value} }
58              
59             =head2 as_number
60              
61             Returns the numeric representation of this value.
62              
63             =cut
64              
65 1     1 1 6 sub as_number { 0 + shift->{value} }
66              
67             =head2 new
68              
69             Instantiates with the given value.
70              
71             my $observed = Ryu::Observable->new('whatever');
72              
73             =cut
74              
75 8     8 1 9247 sub new { bless { value => $_[1] }, $_[0] }
76              
77             =head2 subscribe
78              
79             Requests notifications when the value changes.
80              
81             my $observed = Ryu::Observable->new('whatever')
82             ->subscribe(sub { print "New value - $_\n" });
83              
84             =cut
85              
86 1     1 1 11 sub subscribe { my $self = shift; push @{$self->{subscriptions}}, @_; $self }
  1         2  
  1         4  
  1         3  
87              
88             =head2 unsubscribe
89              
90             Removes an existing callback.
91              
92             my $code;
93             my $observed = Ryu::Observable->new('whatever')
94             ->subscribe($code = sub { print "New value - $_\n" })
95             ->set_string('test')
96             ->unsubscribe($code);
97              
98             =cut
99              
100             sub unsubscribe {
101 0     0 1 0 my ($self, @code) = @_;
102 0         0 for my $addr (map Scalar::Util::refaddr($_), @code) {
103 0     0   0 List::UtilsBy::extract_by { Scalar::Util::refaddr($_) == $addr } @{$self->{subscriptions}};
  0         0  
  0         0  
104             }
105             $self
106 0         0 }
107              
108             =head2 set
109              
110             Sets the value to the given scalar, then notifies all subscribers (regardless
111             of whether the value has changed or not).
112              
113             =cut
114              
115 1     1 1 355 sub set { my ($self, $v) = @_; $self->{value} = $v; $self->notify_all }
  1         4  
  1         3  
116              
117             =head2 value
118              
119             Returns the raw value.
120              
121             =cut
122              
123 0     0 1 0 sub value { shift->{value} }
124              
125             =head2 set_numeric
126              
127             Applies a new numeric value, and notifies subscribers if the value is numerically
128             different to the previous one (or if we had no previous value).
129              
130             Returns C<$self>.
131              
132             =cut
133              
134             sub set_numeric {
135 0     0 1 0 my ($self, $v) = @_;
136 0         0 my $prev = $self->{value};
137 0 0 0     0 return $self if defined($prev) && $prev == $v;
138 0         0 $self->{value} = $v;
139 0         0 $self->notify_all
140             }
141              
142             =head2 set_string
143              
144             Applies a new string value, and notifies subscribers if the value stringifies to a
145             different value than the previous one (or if we had no previous value).
146              
147             Returns C<$self>.
148              
149             =cut
150              
151             sub set_string {
152 1     1 1 3 my ($self, $v) = @_;
153 1         3 my $prev = $self->{value};
154 1 50 33     8 return $self if defined($prev) && $prev eq $v;
155 1         3 $self->{value} = $v;
156 1         3 $self->notify_all
157             }
158              
159             =head2 source
160              
161             Returns a L, which will emit each new value
162             until the observable is destroyed.
163              
164             =cut
165              
166             sub source {
167 2     2 1 8 my ($self) = @_;
168 2   66     12 $self->{source} //= do {
169 1         7 my $src = Ryu::Source->new;
170 1         5 Scalar::Util::weaken(my $copy = $self);
171             $src->completed->on_ready(sub {
172 1 50   1   159 delete $copy->{source} if $copy
173 1         6 });
174 1         83 $src;
175             };
176             }
177              
178             =head1 METHODS - Internal
179              
180             Don't use these.
181              
182             =head2 notify_all
183              
184             Notifies all currently-subscribed callbacks with the current value.
185              
186             =cut
187              
188             sub notify_all {
189 3     3 1 7 my $self = shift;
190 3         7 my $v = $self->{value};
191 3 100       15 $self->{source}->emit($v) if $self->{source};
192 3         5 for my $sub (@{$self->{subscriptions}}) {
  3         8  
193 2         11 $sub->($_) for $v;
194             }
195             $self
196 3         2542 }
197              
198             sub DESTROY {
199 8     8   2977 my ($self) = @_;
200 8 50       32 return if ${^GLOBAL_PHASE} eq 'DESTRUCT';
201 8 100       20 if(my $src = $self->{source}) {
202 1         5 $src->finish;
203             }
204 8         18 delete $self->{value};
205 8         38 return;
206             }
207              
208             1;
209              
210             __END__