File Coverage

blib/lib/RxPerl/Subject.pm
Criterion Covered Total %
statement 43 45 95.5
branch 11 20 55.0
condition n/a
subroutine 10 11 90.9
pod 0 4 0.0
total 64 80 80.0


line stmt bran cond sub pod time code
1             package RxPerl::Subject;
2 5     5   31 use strict;
  5         9  
  5         140  
3 5     5   20 use warnings;
  5         10  
  5         153  
4              
5 5     5   34 use base 'RxPerl::Observable';
  5         9  
  5         474  
6              
7 5     5   2758 use Hash::Ordered;
  5         17603  
  5         2578  
8              
9             our $VERSION = "v6.28.0";
10              
11             # over-rideable
12             # sub _on_subscribe {
13             # my ($self, $subscriber) = @_;
14             # ...
15             # }
16              
17             # over-rideable
18             # sub _on_subscribe_closed {
19             # my ($self, $subscriber) = @_;
20             # ...
21             # }
22              
23             sub new {
24 25     25 0 47 my ($class) = @_;
25              
26 25         61 my $subscribers_oh = Hash::Ordered->new();
27              
28 25         260 my $self; $self = $class->SUPER::new(sub {
29 45     45   70 my ($subscriber) = @_;
30              
31 45 100       89 if ($self->{_closed}) {
32 1 50       8 $self->_on_subscribe_closed($subscriber) if $self->can('_on_subscribe_closed');
33 1         6 my ($type, @args) = @{ $self->{_closed} };
  1         3  
34 1 50       5 $subscriber->{$type}->(@args) if defined $subscriber->{$type};
35 1         2 return;
36             }
37              
38 44         191 $subscribers_oh->set("$subscriber", $subscriber);
39 44 100       715 $self->_on_subscribe($subscriber) if $self->can('_on_subscribe');
40              
41             return sub {
42 4         18 $subscribers_oh->delete("$subscriber");
43 44         218 };
44 25         172 });
45              
46 25         66 $self->{_closed} = 0;
47 25         47 foreach my $type (qw/ error complete /) {
48             $self->{$type} = sub {
49 5 50   5   15 return if $self->{_closed};
50 5         17 $self->{_closed} = [$type, @_];
51 5         14 foreach my $subscriber ($subscribers_oh->values) {
52 4 50       38 $subscriber->{$type}->(@_) if defined $subscriber->{$type};
53             }
54 5         25 $subscribers_oh->clear();
55             # TODO: maybe: delete @$self{qw/ next error complete /};
56             # (Think about how subclasses such as BehaviorSubjects will be affected)
57 50         192 };
58             }
59             $self->{next} = sub {
60 32     32   94 foreach my $subscriber ($subscribers_oh->values) {
61 47 50       389 $subscriber->{next}->(@_) if defined $subscriber->{next};
62             }
63 25         117 };
64              
65 25         62 return $self;
66             }
67              
68             sub next {
69 30     30 0 167 my $self = shift;
70              
71 30 50       96 $self->{next}->(splice @_, 0, 1) if defined $self->{next};
72             }
73              
74             sub error {
75 0     0 0 0 my $self = shift;
76              
77 0 0       0 $self->{error}->(splice @_, 0, 1) if defined $self->{error};
78             }
79              
80             sub complete {
81 4     4 0 39 my $self = shift;
82              
83 4 50       27 $self->{complete}->() if defined $self->{complete};
84             }
85              
86             1;