File Coverage

blib/lib/RxPerl/Subject.pm
Criterion Covered Total %
statement 43 45 95.5
branch 11 20 55.0
condition n/a
subroutine 10 11 90.9
pod 0 4 0.0
total 64 80 80.0


line stmt bran cond sub pod time code
1             package RxPerl::Subject;
2 5     5   29 use strict;
  5         7  
  5         112  
3 5     5   20 use warnings;
  5         8  
  5         101  
4              
5 5     5   21 use base 'RxPerl::Observable';
  5         7  
  5         359  
6              
7 5     5   2124 use Hash::Ordered;
  5         13471  
  5         1982  
8              
9             our $VERSION = "v6.27.1";
10              
11             # over-rideable
12             # sub _on_subscribe {
13             # my ($self, $subscriber) = @_;
14             # ...
15             # }
16              
17             # over-rideable
18             # sub _on_subscribe_closed {
19             # my ($self, $subscriber) = @_;
20             # ...
21             # }
22              
23             sub new {
24 27     27 0 41 my ($class) = @_;
25              
26 27         70 my $subscribers_oh = Hash::Ordered->new();
27              
28 27         242 my $self; $self = $class->SUPER::new(sub {
29 47     47   63 my ($subscriber) = @_;
30              
31 47 100       80 if ($self->{_closed}) {
32 1 50       8 $self->_on_subscribe_closed($subscriber) if $self->can('_on_subscribe_closed');
33 1         8 my ($type, @args) = @{ $self->{_closed} };
  1         3  
34 1 50       5 $subscriber->{$type}->(@args) if defined $subscriber->{$type};
35 1         2 return;
36             }
37              
38 46         160 $subscribers_oh->set("$subscriber", $subscriber);
39 46 100       631 $self->_on_subscribe($subscriber) if $self->can('_on_subscribe');
40              
41             return sub {
42 6         24 $subscribers_oh->delete("$subscriber");
43 46         174 };
44 27         147 });
45              
46 27         56 $self->{_closed} = 0;
47 27         43 foreach my $type (qw/ error complete /) {
48             $self->{$type} = sub {
49 7 50   7   18 return if $self->{_closed};
50 7         15 $self->{_closed} = [$type, @_];
51 7         17 foreach my $subscriber ($subscribers_oh->values) {
52 6 50       51 $subscriber->{$type}->(@_) if defined $subscriber->{$type};
53             }
54 7         31 $subscribers_oh->clear();
55             # TODO: maybe: delete @$self{qw/ next error complete /};
56             # (Think about how subclasses such as BehaviorSubjects will be affected)
57 54         189 };
58             }
59             $self->{next} = sub {
60 37     37   84 foreach my $subscriber ($subscribers_oh->values) {
61 52 50       360 $subscriber->{next}->(@_) if defined $subscriber->{next};
62             }
63 27         85 };
64              
65 27         55 return $self;
66             }
67              
68             sub next {
69 30     30 0 104 my $self = shift;
70              
71 30 50       83 $self->{next}->(splice @_, 0, 1) if defined $self->{next};
72             }
73              
74             sub error {
75 0     0 0 0 my $self = shift;
76              
77 0 0       0 $self->{error}->(splice @_, 0, 1) if defined $self->{error};
78             }
79              
80             sub complete {
81 4     4 0 21 my $self = shift;
82              
83 4 50       12 $self->{complete}->() if defined $self->{complete};
84             }
85              
86             1;