File Coverage

blib/lib/RxPerl/Subject.pm
Criterion Covered Total %
statement 43 45 95.5
branch 11 20 55.0
condition n/a
subroutine 10 11 90.9
pod 0 4 0.0
total 64 80 80.0


line stmt bran cond sub pod time code
1             package RxPerl::Subject;
2 5     5   31 use strict;
  5         9  
  5         131  
3 5     5   25 use warnings;
  5         9  
  5         122  
4              
5 5     5   24 use base 'RxPerl::Observable';
  5         10  
  5         390  
6              
7 5     5   2551 use Hash::Ordered;
  5         17446  
  5         2462  
8              
9             our $VERSION = "v6.27.0";
10              
11             # over-rideable
12             # sub _on_subscribe {
13             # my ($self, $subscriber) = @_;
14             # ...
15             # }
16              
17             # over-rideable
18             # sub _on_subscribe_closed {
19             # my ($self, $subscriber) = @_;
20             # ...
21             # }
22              
23             sub new {
24 27     27 0 51 my ($class) = @_;
25              
26 27         78 my $subscribers_oh = Hash::Ordered->new();
27              
28 27         279 my $self; $self = $class->SUPER::new(sub {
29 47     47   79 my ($subscriber) = @_;
30              
31 47 100       87 if ($self->{_closed}) {
32 1 50       8 $self->_on_subscribe_closed($subscriber) if $self->can('_on_subscribe_closed');
33 1         4 my ($type, @args) = @{ $self->{_closed} };
  1         3  
34 1 50       4 $subscriber->{$type}->(@args) if defined $subscriber->{$type};
35 1         3 return;
36             }
37              
38 46         171 $subscribers_oh->set("$subscriber", $subscriber);
39 46 100       803 $self->_on_subscribe($subscriber) if $self->can('_on_subscribe');
40              
41             return sub {
42 6         25 $subscribers_oh->delete("$subscriber");
43 46         182 };
44 27         219 });
45              
46 27         71 $self->{_closed} = 0;
47 27         49 foreach my $type (qw/ error complete /) {
48             $self->{$type} = sub {
49 7 50   7   31 return if $self->{_closed};
50 7         17 $self->{_closed} = [$type, @_];
51 7         22 foreach my $subscriber ($subscribers_oh->values) {
52 6 50       59 $subscriber->{$type}->(@_) if defined $subscriber->{$type};
53             }
54 7         35 $subscribers_oh->clear();
55             # TODO: maybe: delete @$self{qw/ next error complete /};
56             # (Think about how subclasses such as BehaviorSubjects will be affected)
57 54         211 };
58             }
59             $self->{next} = sub {
60 37     37   82 foreach my $subscriber ($subscribers_oh->values) {
61 52 50       427 $subscriber->{next}->(@_) if defined $subscriber->{next};
62             }
63 27         140 };
64              
65 27         69 return $self;
66             }
67              
68             sub next {
69 30     30 0 114 my $self = shift;
70              
71 30 50       95 $self->{next}->(splice @_, 0, 1) if defined $self->{next};
72             }
73              
74             sub error {
75 0     0 0 0 my $self = shift;
76              
77 0 0       0 $self->{error}->(splice @_, 0, 1) if defined $self->{error};
78             }
79              
80             sub complete {
81 4     4 0 26 my $self = shift;
82              
83 4 50       27 $self->{complete}->() if defined $self->{complete};
84             }
85              
86             1;