File Coverage

blib/lib/RxPerl/Subscription.pm
Criterion Covered Total %
statement 48 48 100.0
branch 26 28 92.8
condition 5 9 55.5
subroutine 9 9 100.0
pod 0 3 0.0
total 88 97 90.7


line stmt bran cond sub pod time code
1             package RxPerl::Subscription;
2 5     5   25 use strict;
  5         7  
  5         112  
3 5     5   20 use warnings;
  5         8  
  5         127  
4              
5 5     5   20 use Scalar::Util 'blessed', 'reftype', 'weaken';
  5         38  
  5         2714  
6              
7             our $VERSION = "v6.27.1";
8              
9             sub new {
10 2005     2005 0 2818 my ($class) = @_;
11              
12 2005         3677 my $self = {
13             # the 'subrefs' key will be created by autovivification
14             closed => 0,
15             subscribers => [],
16             };
17              
18 2005         4671 bless $self, $class;
19             }
20              
21             sub _execute_item {
22 8766     8766   10952 my ($self, $item) = @_;
23              
24 8766 100       15841 if (! defined $item) {
    50          
25 45         108 return undef;
26             } elsif (ref $item ne '') {
27 8721 100 66     34634 if (reftype $item eq 'CODE') {
    100 66        
    100 33        
    100          
    100          
    50          
28 1910         3054 $item->();
29             }
30             elsif (defined blessed($item) and $item->isa('RxPerl::Subscription')) {
31 904 100       2357 $item->unsubscribe unless $item eq $self;
32             }
33             elsif (reftype $item eq 'ARRAY' and not defined blessed($item)) {
34 3763         6080 $self->_execute_item($_) foreach @$item;
35 3763         6932 @$item = ();
36             }
37             elsif (reftype $item eq 'REF') {
38             # ref to ::Subscription object
39 8         18 $self->_execute_item($$item);
40 8         21 $$item = undef;
41             }
42             elsif (reftype $item eq 'SCALAR') {
43             # ref to undef, or some other invalid construct
44 1         3 return undef;
45             }
46             elsif (reftype $item eq 'HASH' and not defined blessed($item)) {
47 2135         6215 $self->_execute_item([values %$item]);
48 2135         11381 %$item = ();
49             }
50             }
51             }
52              
53             sub _add_to_subscribers {
54 2243     2243   2900 my ($self, $subscriber) = @_;
55              
56 2243         2288 push @{ $self->{subscribers} }, $subscriber;
  2243         3328  
57              
58 2243         5365 weaken($self->{subscribers}[-1]);
59              
60             # wrap 'complete' and 'error' of first subscriber
61 2243 100       2241 if ((grep defined, @{ $self->{subscribers} }) == 1) {
  2243         5164  
62 2005         2774 foreach (qw/ error complete /) {
63             # wrap with 'unsubscribe'
64 4010         4847 my $orig_fn = $subscriber->{$_};
65             $subscriber->{$_} = sub {
66 1875 100   1875   4645 $orig_fn->(@_) if defined $orig_fn;
67 1875         2993 $self->unsubscribe;
68             }
69 4010         10359 }
70             }
71             }
72              
73             sub add {
74 3294     3294 0 4462 my ($self, @subrefs) = @_;
75              
76             # filter out any non-refs
77 3294         5528 @subrefs = grep ref ne '', @subrefs;
78              
79 3294 100       5040 if (! $self->{closed}) {
80 2439         7296 $self->{subrefs}{$_} = $_ foreach @subrefs;
81             } else {
82 855         1330 $self->_execute_item(\@subrefs);
83             }
84             }
85              
86             sub unsubscribe {
87 2789     2789 0 4537 my ($self) = @_;
88              
89 2789 100       5396 return if $self->{closed}++;
90              
91             # no need for 'of' (or any other observable) to check 'closed status' anymore
92 1965         2259 foreach my $subscriber (@{ $self->{subscribers} }) {
  1965         3043  
93 2203 100       8380 delete @$subscriber{qw/ next error complete /} if defined $subscriber;
94             }
95              
96 1965         3208 $self->{subscribers} = [];
97              
98 1965         3277 $self->_execute_item(delete $self->{subrefs});
99             }
100              
101             1;