File Coverage

blib/lib/Mojo/Rx/Subscription.pm
Criterion Covered Total %
statement 42 45 93.3
branch 18 28 64.2
condition 2 6 33.3
subroutine 9 9 100.0
pod 0 4 0.0
total 71 92 77.1


line stmt bran cond sub pod time code
1             package Mojo::Rx::Subscription;
2 2     2   14 use strict;
  2         12  
  2         62  
3 2     2   13 use warnings FATAL => 'all';
  2         4  
  2         69  
4              
5 2     2   9 use Scalar::Util 'blessed', 'reftype', 'weaken';
  2         5  
  2         1403  
6              
7             our $VERSION = "v0.13.0";
8              
9             sub new {
10 6     6 0 13 my ($class) = @_;
11              
12 6         16 my $self = {
13             # the 'subrefs' key will be created by autovivification
14             closed => 0,
15             subscribers => [],
16             };
17              
18 6         24 bless $self, $class;
19             }
20              
21             sub _execute_item {
22 16     16   30 my ($self, $item) = @_;
23              
24 16 100       41 if (! defined $item) {
    50          
25 5         10 return undef;
26             } elsif (ref $item ne '') {
27 11 100 33     71 if (reftype $item eq 'CODE') {
    50 33        
    100          
    50          
    50          
    50          
28 2         7 $item->();
29             }
30             elsif (defined blessed($item) and $item->isa('Mojo::Rx::Subscription')) {
31 0 0       0 $item->unsubscribe unless $item eq $self;
32             }
33             elsif (reftype $item eq 'ARRAY') {
34 7         32 $self->_execute_item($_) foreach @$item;
35             }
36             elsif (reftype $item eq 'REF') {
37             # ref to ::Subscription object
38 0         0 $self->_execute_item($$item);
39             }
40             elsif (reftype $item eq 'SCALAR') {
41             # ref to undef, or some other invalid construct
42 0         0 return undef;
43             }
44             elsif (reftype $item eq 'HASH' and not defined blessed($item)) {
45 2         10 $self->_execute_item([values %$item]);
46             }
47             }
48             }
49              
50             sub add_to_subscribers {
51 6     6 0 14 my ($self, $subscriber) = @_;
52              
53 6         9 push @{ $self->{subscribers} }, $subscriber;
  6         13  
54              
55 6         39 weaken($self->{subscribers}[-1]);
56              
57             # wrap 'complete' and 'error' of first subscriber
58 6 50       9 if ((grep defined, @{ $self->{subscribers} }) == 1) {
  6         22  
59 6         14 foreach (qw/ error complete /) {
60             # wrap with 'unsubscribe'
61 12         24 my $orig_fn = $subscriber->{$_};
62             $subscriber->{$_} = sub {
63 5     5   13 $self->unsubscribe;
64 5 50       18 $orig_fn->(@_) if defined $orig_fn;
65             }
66 12         46 }
67             }
68             }
69              
70             sub add_dependents {
71 7     7 0 14 my ($self, @subrefs) = @_;
72              
73             # filter out any non-refs
74 7         16 @subrefs = grep ref ne '', @subrefs;
75              
76 7 100       18 if (! $self->{closed}) {
77 2         9 $self->{subrefs}{$_} = $_ foreach @subrefs;
78             } else {
79 5         11 $self->_execute_item(\@subrefs);
80             }
81             }
82              
83             sub unsubscribe {
84 7     7 0 1434 my ($self) = @_;
85              
86 7 100       20 return if $self->{closed}++;
87              
88             # no need for 'of' (or any other observable) to check 'closed status' anymore
89 6         12 foreach my $subscriber (@{ $self->{subscribers} }) {
  6         11  
90 6 50       35 delete @$subscriber{qw/ next error complete /} if defined $subscriber;
91             }
92              
93 6         13 $self->{subscribers} = [];
94              
95 6         15 $self->_execute_item(delete $self->{subrefs});
96             }
97              
98             1;