File Coverage

blib/lib/RxPerl/Observable.pm
Criterion Covered Total %
statement 42 44 95.4
branch 5 6 83.3
condition 5 7 71.4
subroutine 9 10 90.0
pod 0 3 0.0
total 61 70 87.1


line stmt bran cond sub pod time code
1             package RxPerl::Observable;
2 5     5   40 use strict;
  5         12  
  5         143  
3 5     5   23 use warnings;
  5         10  
  5         191  
4              
5 5     5   2056 use RxPerl::Subscription;
  5         13  
  5         144  
6 5     5   1957 use RxPerl::Subscriber;
  5         10  
  5         152  
7              
8 5     5   29 use Scalar::Util 'reftype';
  5         13  
  5         257  
9 5     5   31 use Carp 'croak';
  5         18  
  5         2001  
10              
11             # an observable is something you can subscribe to.
12              
13             # The class RxPerl::Observable has a method 'new'
14             # (arguments) This method accepts a function as an argument.
15             # This function:
16             # - accepts a subscriber as its only argument
17             # - calls $subscriber->next,error,complete at its appropriate moments
18             # - returns a subref, which contains the cleanup required, when the subscriber wishes to unsubscribe
19             # (return) This method returns an instance of the RxPerl::Observable
20             # This RxPerl::Observable instance contains:
21             # - the function
22              
23             # Objects of the RxPerl::Observable class have a 'subscribe' method
24             # (arguments) This method accepts zero to three arguments, which should be converted by the subscribe method to a clean hashref ('the subscriber') with the corresponding 0-3 keys
25             # (body) This method calls the $function that RxPerl::Observable->new received as argument (and that initiates the subscription)
26             # (return) This method returns a new RxPerl::Subscription object, that contains the "cleanup subref" returned by $function
27              
28             our $VERSION = "v6.28.0";
29              
30             sub new {
31 2106     2106 0 3679 my ($class, $function) = @_;
32              
33 2106         3786 my $self = {function => $function};
34              
35 2106         7148 bless $self, $class;
36             }
37              
38             sub subscribe {
39 2229     2229 0 5321 my ($self, @args) = @_;
40              
41 2229         3122 my $subscriber = {};
42 2229         3531 bless $subscriber, 'RxPerl::Subscriber';
43              
44 2229 100 50     7017 if ((reftype($args[0]) // '') eq 'HASH') {
45 2180 100       4903 $args[0]{_subscription} = delete $args[0]{new_subscription} if $args[0]{new_subscription};
46 2180         2929 @$subscriber{qw/ next error complete _subscription /} = @{ $args[0] }{qw/ next error complete _subscription /};
  2180         6322  
47             } else {
48 49         147 @$subscriber{qw/ next error complete /} = @args;
49             }
50              
51             $subscriber->{error} //= sub {
52 0     0   0 my ($err) = @_;
53              
54             # TODO: shouldn't croak immediately, to be like rxjs, but on the next tick
55 0         0 croak $err;
56 2229   100     4899 };
57              
58 2229   66     5512 my $subscription = $subscriber->{_subscription} //= RxPerl::Subscription->new;
59 2229         3612 $subscriber->{closed_ref} = \$subscription->{closed};
60              
61             # don't continue if the subscription has already closed (complete/error)
62 2229 50       4021 return $subscription if $subscription->{closed};
63              
64 2229         5208 $subscription->_add_to_subscribers($subscriber);
65              
66 2229         4045 my $fn = $self->{function};
67              
68 2229         4633 my @cbs = $fn->($subscriber);
69              
70 2229         5835 $subscription->add(@cbs);
71              
72 2229         11298 return $subscription;
73             }
74              
75             sub pipe {
76 902     902 0 1776 my ($self, @operators) = @_;
77              
78 902         1220 my $result = $self;
79 902         2258 $result = $_->($result) foreach @operators;
80              
81 902         2993 return $result;
82             }
83              
84             1;