File Coverage

blib/lib/RxPerl/ConnectableObservable.pm
Criterion Covered Total %
statement 15 36 41.6
branch 0 4 0.0
condition n/a
subroutine 5 9 55.5
pod 0 2 0.0
total 20 51 39.2


line stmt bran cond sub pod time code
1             package RxPerl::ConnectableObservable;
2 5     5   35 use strict;
  5         11  
  5         145  
3 5     5   26 use warnings;
  5         15  
  5         125  
4              
5 5     5   26 use base 'RxPerl::Observable';
  5         10  
  5         372  
6              
7 5     5   32 use RxPerl::Subscription;
  5         9  
  5         131  
8              
9 5     5   28 use Scalar::Util 'weaken';
  5         30  
  5         1768  
10              
11             our $VERSION = "v6.27.0";
12              
13             sub new {
14 0     0 0   my ($class, $source, $subject_factory) = @_;
15              
16 0           my $weak_self;
17             my $self = $class->SUPER::new(sub {
18 0     0     my ($subscriber) = @_;
19              
20 0           return $weak_self->{_subject}->subscribe($subscriber);
21 0           });
22 0           weaken($weak_self = $self);
23              
24 0           %$self = (
25             %$self,
26             _source => $source,
27             _subject_factory => $subject_factory,
28             _subject => $subject_factory->(),
29             _connected => 0,
30             _subjects_subscription => undef,
31             );
32              
33 0           return $self;
34             }
35              
36             sub connect {
37 0     0 0   my ($self) = @_;
38              
39 0 0         return $self->{_subjects_subscription} if $self->{_connected};
40              
41 0           $self->{_connected} = 1;
42              
43 0           $self->{_subjects_subscription} = RxPerl::Subscription->new;
44 0           weaken(my $weak_self = $self);
45             $self->{_subjects_subscription}->add(sub {
46 0 0   0     if (defined $weak_self) {
47 0           $weak_self->{_connected} = 0;
48 0           $weak_self->{_subjects_subscription} = undef;
49 0           $weak_self->{_subject} = $weak_self->{_subject_factory}->();
50             }
51 0           });
52              
53             $self->{_source}->subscribe({
54             new_subscription => $self->{_subjects_subscription},
55 0           %{ $self->{_subject} },
  0            
56             });
57              
58 0           return $self->{_subjects_subscription};
59             }
60              
61             1;