File Coverage

blib/lib/RxPerl/Observable.pm
Criterion Covered Total %
statement 42 44 95.4
branch 5 6 83.3
condition 5 7 71.4
subroutine 9 10 90.0
pod 0 3 0.0
total 61 70 87.1


line stmt bran cond sub pod time code
1             package RxPerl::Observable;
2 5     5   25 use strict;
  5         9  
  5         129  
3 5     5   20 use warnings;
  5         7  
  5         94  
4              
5 5     5   1570 use RxPerl::Subscription;
  5         10  
  5         131  
6 5     5   1528 use RxPerl::Subscriber;
  5         11  
  5         120  
7              
8 5     5   26 use Scalar::Util 'reftype';
  5         7  
  5         173  
9 5     5   25 use Carp 'croak';
  5         5  
  5         1565  
10              
11             # an observable is something you can subscribe to.
12              
13             # The class RxPerl::Observable has a method 'new'
14             # (arguments) This method accepts a function as an argument.
15             # This function:
16             # - accepts a subscriber as its only argument
17             # - calls $subscriber->next, ->error and ->complete at the appropriate moments
18             # - returns a subref that performs the cleanup required when the subscriber wishes to unsubscribe
19             # (return) This method returns an instance of RxPerl::Observable
20             # This RxPerl::Observable instance contains:
21             # - the function
22              
23             # Objects of the RxPerl::Observable class have a 'subscribe' method
24             # (arguments) This method accepts either zero to three positional callbacks (next, error, complete) or a single hashref with those keys; the subscribe method converts either form to a clean hashref ('the subscriber') with the corresponding keys
25             # (body) This method calls the $function that RxPerl::Observable->new received as argument (and that initiates the subscription)
26             # (return) This method returns an RxPerl::Subscription object (a new one, unless the subscriber hashref supplied one) that contains the "cleanup subref" returned by $function
27              
28             our $VERSION = "v6.27.1";
29              
30             sub new {
31 2120     2120 0 2948 my ($class, $function) = @_;
32              
33 2120         3194 my $self = {function => $function};
34              
35 2120         5574 bless $self, $class;
36             }
37              
38             sub subscribe {
39 2243     2243 0 4350 my ($self, @args) = @_;
40              
41 2243         2640 my $subscriber = {};
42 2243         2794 bless $subscriber, 'RxPerl::Subscriber';
43              
44 2243 100 50     5812 if ((reftype($args[0]) // '') eq 'HASH') {
45 2194 100       4180 $args[0]{_subscription} = delete $args[0]{new_subscription} if $args[0]{new_subscription};
46 2194         2732 @$subscriber{qw/ next error complete _subscription /} = @{ $args[0] }{qw/ next error complete _subscription /};
  2194         5017  
47             } else {
48 49         111 @$subscriber{qw/ next error complete /} = @args;
49             }
50              
51             $subscriber->{error} //= sub {
52 0     0   0 my ($err) = @_;
53              
54             # TODO: to be like rxjs, this shouldn't croak immediately but on the next tick
55 0         0 croak $err;
56 2243   100     3939 };
57              
58 2243   66     4711 my $subscription = $subscriber->{_subscription} //= RxPerl::Subscription->new;
59 2243         3092 $subscriber->{closed_ref} = \$subscription->{closed};
60              
61             # don't continue if the subscription has already closed (complete/error)
62 2243 50       3418 return $subscription if $subscription->{closed};
63              
64 2243         4725 $subscription->_add_to_subscribers($subscriber);
65              
66 2243         2864 my $fn = $self->{function};
67              
68 2243         3990 my @cbs = $fn->($subscriber);
69              
70 2243         4764 $subscription->add(@cbs);
71              
72 2243         9272 return $subscription;
73             }
74              
75             sub pipe {
76 908     908 0 1407 my ($self, @operators) = @_;
77              
78 908         960 my $result = $self;
79 908         1877 $result = $_->($result) foreach @operators;
80              
81 908         2385 return $result;
82             }
83              
84             1;