File Coverage

blib/lib/RxPerl/Functions.pm
Criterion Covered Total %
statement 45 50 90.0
branch 5 8 62.5
condition 1 3 33.3
subroutine 13 15 86.6
pod n/a
total 64 76 84.2


line stmt bran cond sub pod time code
1             package RxPerl::Functions;
2              
3 5     5   28 use strict;
  5         9  
  5         126  
4 5     5   21 use warnings;
  5         9  
  5         145  
5              
6             require RxPerl::Operators::Pipeable;
7              
8 5     5   22 use Carp 'croak';
  5         8  
  5         181  
9 5     5   23 use Scalar::Util 'blessed';
  5         9  
  5         164  
10              
11 5     5   23 use Exporter 'import';
  5         7  
  5         586  
12             our @EXPORT_OK = qw/
13             last_value_from first_value_from is_observable
14             /;
15             our %EXPORT_TAGS = (all => \@EXPORT_OK);
16              
17             our $VERSION = "v6.27.1";
18              
19             sub _promise_class {
20 4     4   24 my $fn = (caller(1))[3];
21 4         9 my $rx_class = $fn;
22 4         21 $rx_class =~ s/\:\:[^\:]+\z//;
23 5     5   29 no strict 'refs';
  5         22  
  5         1915  
24 4         8 my $promise_class = ${ "${rx_class}::promise_class" };
  4         12  
25 4 50       14 return wantarray ? ($promise_class, $rx_class) : $promise_class;
26             }
27              
28             sub last_value_from {
29 4     4   6459 my ($observable) = @_;
30              
31 4         6 my ($promise_class, $rx_class) = _promise_class;
32 4 50       9 $promise_class or croak "Promise class not set, set it with: ${rx_class}->set_promise_class(\$promise_class)";
33              
34 4         6 my ($promise, $resolve, $reject) = do {
35 4 50       8 if ($promise_class eq 'Future') {
36 0         0 my $future = Future->new;
37 0     0   0 ( $future, sub { $future->done(@_) }, sub { $future->fail(@_) } );
  0         0  
  0         0  
38             } else {
39 4         6 my ($res, $rej);
40             my $p = $promise_class->new(sub {
41 4     4   95 ($res, $rej) = @_;
42 4         21 });
43 4         27 ( $p, $res, $rej );
44             }
45             };
46              
47 4         8 my ($got_value, $last_value);
48             $observable->subscribe({
49             next => sub {
50 4     4   6 $last_value = $_[0];
51 4         8 $got_value = 1;
52             },
53             error => sub {
54 0     0   0 $reject->($_[0]);
55             },
56             complete => sub {
57 4 100   4   7 if ($got_value) {
58 2         6 $resolve->($last_value);
59             } else {
60 2         5 $reject->('no elements in sequence');
61             }
62             },
63 4         36 });
64              
65 4         58 return $promise;
66             }
67              
68             sub first_value_from {
69 2     2   13 my ($observable) = @_;
70 2         6 return last_value_from(
71             $observable->pipe(RxPerl::Operators::Pipeable::op_first())
72             );
73             }
74              
75             sub is_observable {
76 1     1   6 my ($thing) = @_;
77              
78 1   33     16 return !!(blessed($thing) && $thing->isa('RxPerl::Observable'));
79             }
80              
81             1;