File Coverage

blib/lib/RxPerl/Subscription.pm
Criterion Covered Total %
statement 48 48 100.0
branch 26 28 92.8
condition 5 9 55.5
subroutine 9 9 100.0
pod 0 3 0.0
total 88 97 90.7


line stmt bran cond sub pod time code
1             package RxPerl::Subscription;
2 5     5   34 use strict;
  5         9  
  5         158  
3 5     5   31 use warnings;
  5         20  
  5         143  
4              
5 5     5   28 use Scalar::Util 'blessed', 'reftype', 'weaken';
  5         10  
  5         3539  
6              
7             our $VERSION = "v6.28.0";
8              
9             sub new {
10 1995     1995 0 3439 my ($class) = @_;
11              
12 1995         4429 my $self = {
13             # the 'subrefs' key will be created by autovivification
14             closed => 0,
15             subscribers => [],
16             };
17              
18 1995         5726 bless $self, $class;
19             }
20              
21             sub _execute_item {
22 8726     8726   13419 my ($self, $item) = @_;
23              
24 8726 100       19688 if (! defined $item) {
    50          
25 45         144 return undef;
26             } elsif (ref $item ne '') {
27 8681 100 66     43676 if (reftype $item eq 'CODE') {
    100 66        
    100 33        
    100          
    100          
    50          
28 1901         3717 $item->();
29             }
30             elsif (defined blessed($item) and $item->isa('RxPerl::Subscription')) {
31 901 100       2860 $item->unsubscribe unless $item eq $self;
32             }
33             elsif (reftype $item eq 'ARRAY' and not defined blessed($item)) {
34 3747         7832 $self->_execute_item($_) foreach @$item;
35 3747         8506 @$item = ();
36             }
37             elsif (reftype $item eq 'REF') {
38             # ref to ::Subscription object
39 8         25 $self->_execute_item($$item);
40 8         39 $$item = undef;
41             }
42             elsif (reftype $item eq 'SCALAR') {
43             # ref to undef, or some other invalid construct
44 1         4 return undef;
45             }
46             elsif (reftype $item eq 'HASH' and not defined blessed($item)) {
47 2123         7478 $self->_execute_item([values %$item]);
48 2123         13972 %$item = ();
49             }
50             }
51             }
52              
53             sub _add_to_subscribers {
54 2229     2229   3737 my ($self, $subscriber) = @_;
55              
56 2229         2961 push @{ $self->{subscribers} }, $subscriber;
  2229         4220  
57              
58 2229         6385 weaken($self->{subscribers}[-1]);
59              
60             # wrap 'complete' and 'error' of first subscriber
61 2229 100       2816 if ((grep defined, @{ $self->{subscribers} }) == 1) {
  2229         6544  
62 1995         3345 foreach (qw/ error complete /) {
63             # wrap with 'unsubscribe'
64 3990         6386 my $orig_fn = $subscriber->{$_};
65             $subscriber->{$_} = sub {
66 1865 100   1865   5531 $orig_fn->(@_) if defined $orig_fn;
67 1865         3726 $self->unsubscribe;
68             }
69 3990         12899 }
70             }
71             }
72              
73             sub add {
74 3275     3275 0 5329 my ($self, @subrefs) = @_;
75              
76             # filter out any non-refs
77 3275         7026 @subrefs = grep ref ne '', @subrefs;
78              
79 3275 100       6099 if (! $self->{closed}) {
80 2422         11239 $self->{subrefs}{$_} = $_ foreach @subrefs;
81             } else {
82 853         1666 $self->_execute_item(\@subrefs);
83             }
84             }
85              
86             sub unsubscribe {
87 2776     2776 0 5830 my ($self) = @_;
88              
89 2776 100       6718 return if $self->{closed}++;
90              
91             # no need for 'of' (or any other observable) to check 'closed status' anymore
92 1955         2531 foreach my $subscriber (@{ $self->{subscribers} }) {
  1955         3656  
93 2189 100       10071 delete @$subscriber{qw/ next error complete /} if defined $subscriber;
94             }
95              
96 1955         3971 $self->{subscribers} = [];
97              
98 1955         4131 $self->_execute_item(delete $self->{subrefs});
99             }
100              
101             1;