File Coverage

blib/lib/RxPerl/Operators/Creation.pm
Criterion Covered Total %
statement 313 367 85.2
branch 75 148 50.6
condition 17 33 51.5
subroutine 56 64 87.5
pod n/a
total 461 612 75.3


line stmt bran cond sub pod time code
1             package RxPerl::Operators::Creation;
2 5     5   34 use strict;
  5         9  
  5         135  
3 5     5   24 use warnings;
  5         11  
  5         135  
4              
5 5     5   1921 use RxPerl::Observable;
  5         14  
  5         140  
6 5     5   34 use RxPerl::Subscription;
  5         9  
  5         110  
7 5     5   1906 use RxPerl::Utils 'get_timer_subs', 'get_interval_subs';
  5         10  
  5         278  
8 5     5   1881 use RxPerl::Subject;
  5         13  
  5         157  
9 5     5   2105 use RxPerl::BehaviorSubject;
  5         13  
  5         143  
10 5     5   1891 use RxPerl::ReplaySubject;
  5         13  
  5         148  
11              
12 5     5   31 use Carp 'croak';
  5         12  
  5         227  
13 5     5   35 use Scalar::Util qw/ weaken blessed reftype /;
  5         10  
  5         220  
14 5     5   28 use List::Util 'first';
  5         10  
  5         277  
15              
16 5     5   27 use Exporter 'import';
  5         10  
  5         22912  
17             our @EXPORT_OK = qw/
18             rx_behavior_subject rx_combine_latest rx_concat rx_defer rx_EMPTY rx_fork_join rx_from rx_from_event
19             rx_from_event_array rx_generate rx_iif rx_interval rx_merge rx_NEVER rx_observable rx_of rx_on_error_resume_next
20             rx_partition rx_race rx_range rx_replay_subject rx_subject rx_throw_error rx_timer rx_zip
21             /;
22             our %EXPORT_TAGS = (all => \@EXPORT_OK);
23              
24             our $VERSION = "v6.27.0";
25              
26             sub rx_observable;
27              
28 3     3   14005 sub rx_behavior_subject { "RxPerl::BehaviorSubject" }
29              
30             sub rx_combine_latest {
31 2     2   14 my ($sources) = @_;
32              
33             return rx_observable->new(sub {
34 2     2   6 my ($subscriber) = @_;
35              
36 2         4 my $sources = [@$sources];
37              
38 2         3 my %own_subscriptions;
39 2         5 my $i = 0;
40 2         4 my %didnt_emit = map {($i++, 1)} @$sources;
  4         13  
41 2         4 my @latest_values;
42 2         3 my $num_active = @$sources;
43              
44             $subscriber->subscription->add(
45 2         6 \%own_subscriptions, sub { undef @$sources },
46 2         7 );
47              
48 2         10 for (my $i = 0; $i < @$sources; $i++) {
49 4         5 my $j = $i;
50 4         9 my $source = $sources->[$j];
51 4         10 my $own_subscription = RxPerl::Subscription->new;
52 4         12 $own_subscriptions{$own_subscription} = $own_subscription;
53             my $own_observer = {
54             new_subscription => $own_subscription,
55             next => sub {
56 14         24 my ($value) = @_;
57              
58 14         28 $latest_values[$j] = $value;
59 14         20 delete $didnt_emit{$j};
60              
61 14 100       31 if (!%didnt_emit) {
62 12 50       43 $subscriber->{next}->([@latest_values]) if defined $subscriber->{next};
63             }
64             },
65             error => $subscriber->{error},
66             complete => sub {
67 4         8 $num_active--;
68 4 100       10 if ($num_active == 0) {
69 2 50       8 $subscriber->{complete}->() if defined $subscriber->{complete};
70             }
71             },
72 4         28 };
73 4         12 $source->subscribe($own_observer);
74             }
75              
76 2         6 return;
77 2         6 });
78             }
79              
80             sub _rx_concat_helper {
81 76     76   163 my ($sources, $subscriber, $active) = @_;
82              
83 76 100       172 if (! @$sources) {
84 5 50       24 $subscriber->{complete}->() if defined $subscriber->{complete};
85 5         16 return;
86             }
87              
88 71         108 my $source = shift @$sources;
89 71         163 my $own_subscription = RxPerl::Subscription->new;
90             my $own_subscriber = {
91             new_subscription => $own_subscription,
92             next => $subscriber->{next},
93             error => $subscriber->{error},
94             complete => sub {
95 42     42   102 _rx_concat_helper($sources, $subscriber, $active);
96             },
97 71         321 };
98 71         157 @$active = ($own_subscription);
99 71         163 $source->subscribe($own_subscriber);
100             }
101              
102             sub rx_concat {
103 29     29   74 my @sources = @_;
104              
105             return rx_observable->new(sub {
106 34     34   64 my ($subscriber) = @_;
107              
108 34         68 my @sources = @sources;
109              
110 34         51 my @active;
111             $subscriber->subscription->add(
112 34         98 \@active, sub { undef @sources },
113 34         89 );
114              
115 34         118 _rx_concat_helper(\@sources, $subscriber, \@active);
116              
117 34         75 return;
118 29         55 });
119             }
120              
121             sub rx_defer {
122 1     1   3 my ($observable_factory) = @_;
123              
124             return rx_observable->new(sub {
125 2     2   8 my ($subscriber) = @_;
126              
127 2         6 my $observable = $observable_factory->();
128              
129 2         15 return $observable->subscribe($subscriber);
130 1         4 });
131             }
132              
133             my $rx_EMPTY;
134              
135             sub rx_EMPTY {
136             $rx_EMPTY //= rx_observable->new(sub {
137 9     9   28 my ($subscriber) = @_;
138              
139 9 50       36 $subscriber->{complete}->() if defined $subscriber->{complete};
140              
141 9         20 return;
142 9   66 9   8896 });
143             }
144              
145             sub rx_fork_join {
146 4     4   10 my ($sources) = @_;
147              
148 4   66     56 my $arg_is_array = !(blessed $sources) && (reftype $sources eq 'ARRAY');
149 4   66     21 my $arg_is_hash = !(blessed $sources) && (reftype $sources eq 'HASH');
150              
151 4 50 66     18 croak "argument of rx_fork_join needs to be either an arrayref or a hashref"
152             unless $arg_is_array or $arg_is_hash;
153              
154 4 100       10 if ($arg_is_array) {
155 2         3 my $i = 0;
156 2         6 $sources = { map {($i++, $_)} @$sources };
  7         20  
157             }
158              
159             return rx_observable->new(sub {
160 4     4   12 my ($subscriber) = @_;
161              
162 4         16 my $sources = { %$sources };
163 4         8 my %last_values;
164             my %own_subscriptions;
165 4         12 my @keys = keys %$sources;
166 4 100       115 @keys = sort {$a <=> $b} @keys if $arg_is_array;
  7         17  
167              
168             $subscriber->subscription->add(
169 4         15 \%own_subscriptions, sub { undef @keys },
170 4         15 );
171              
172 4 50       17 if (! @keys) {
173 0 0       0 $subscriber->{complete}->() if defined $subscriber->{complete};
174 0         0 return;
175             }
176              
177 4         84 for (my $i = 0; $i < @keys; $i++) {
178 12         26 my $key = $keys[$i];
179 12         21 my $source = $sources->{$key};
180 12         31 my $own_subscription = RxPerl::Subscription->new;
181 12         41 $own_subscriptions{$own_subscription} = $own_subscription;
182             $source->subscribe({
183             new_subscription => $own_subscription,
184             next => sub {
185 30         69 $last_values{$key} = $_[0];
186             },
187             error => $subscriber->{error},
188             complete => sub {
189 12 100       26 if (exists $last_values{$key}) {
190 10 100       32 if (keys(%last_values) == keys %$sources) {
191 2 100       5 if ($arg_is_array) {
192 1         2 my @ret;
193 1         6 $ret[$_] = $last_values{$_} foreach keys %last_values;
194 1 50       7 $subscriber->{next}->(\@ret) if defined $subscriber->{next};
195             }
196             else {
197 1 50       6 $subscriber->{next}->(\%last_values) if defined $subscriber->{next};
198             }
199 2 50       15 $subscriber->{complete}->() if defined $subscriber->{complete};
200             }
201             } else {
202 2 50       17 $subscriber->{complete}->() if defined $subscriber->{complete};
203             }
204             },
205 12         108 });
206             }
207              
208 4         20 return;
209 4         12 });
210             }
211              
212             sub rx_from {
213 3     3   7944 my ($thing) = @_;
214              
215 3 100 66     51 if (blessed $thing and $thing->isa('RxPerl::Observable')) {
    50 33        
    50 33        
    100 66        
    50 33        
216 1         7 return $thing;
217             }
218              
219             elsif (blessed $thing and $thing->isa('Future')) {
220             return rx_observable->new(sub {
221 0     0   0 my ($subscriber) = @_;
222              
223             $thing->on_done(sub {
224 0 0       0 $subscriber->{next}->(splice @_, 0, 1) if defined $subscriber->{next};
225 0 0       0 $subscriber->{complete}->() if defined $subscriber->{complete};
226 0         0 });
227              
228             $thing->on_fail(sub {
229 0 0       0 $subscriber->{error}->(splice @_, 0, 1) if defined $subscriber->{error};
230 0         0 });
231              
232             $thing->on_ready(sub {
233 0 0       0 if ($thing->is_cancelled) {
234 0 0       0 $subscriber->{complete}->() if defined $subscriber->{complete};
235             }
236 0         0 });
237 0         0 });
238             }
239              
240             elsif (blessed $thing and $thing->can('then')) {
241             return rx_observable->new(sub {
242 0     0   0 my ($subscriber) = @_;
243              
244             $thing->then(
245             sub {
246 0 0       0 $subscriber->{next}->(splice @_, 0, 1) if defined $subscriber->{next};
247 0 0       0 $subscriber->{complete}->() if defined $subscriber->{complete};
248             },
249             sub {
250 0 0       0 $subscriber->{error}->(splice @_, 0, 1) if defined $subscriber->{error};
251             },
252 0         0 );
253              
254 0         0 return;
255 0         0 });
256             }
257              
258             elsif (ref $thing eq 'ARRAY' and ! blessed $thing) {
259 1         4 return rx_of(@$thing);
260             }
261              
262             elsif (defined $thing and ! length(ref $thing)) {
263 1         12 my @letters = split //, $thing;
264 1         4 return rx_of(@letters);
265             }
266              
267             else {
268 0         0 croak "rx_from only accepts arrayrefs, promises, observables, and strings as argument at the moment,";
269             }
270             }
271              
272             # NOTE: rx_from_event and rx_from_event_array keep a weak reference to the
273             # EventEmitter $object. Should this change? TODO: think about that.
274              
275             sub rx_from_event {
276 0     0   0 my ($object, $event_type) = @_;
277              
278 0 0       0 croak 'invalid object type, at rx_from_event' if not $object->isa('Mojo::EventEmitter');
279              
280 0         0 weaken($object);
281             return rx_observable->new(sub {
282 0     0   0 my ($subscriber) = @_;
283              
284             my $cb = sub {
285 0         0 my ($e, @args) = @_;
286              
287 0 0       0 $subscriber->{next}->(splice @args, 0, 1) if defined $subscriber->{next};
288 0         0 };
289              
290             $subscriber->subscription->add(sub {
291 0 0       0 $object->unsubscribe($cb) if defined $object;
292 0         0 });
293              
294 0         0 $object->on($event_type, $cb);
295              
296 0         0 return;
297 0         0 });
298             }
299              
300             sub rx_from_event_array {
301 0     0   0 my ($object, $event_type) = @_;
302              
303 0 0       0 croak 'invalid object type, at rx_from_event_array' if not $object->isa('Mojo::EventEmitter');
304              
305 0         0 weaken($object);
306             return rx_observable->new(sub {
307 0     0   0 my ($subscriber) = @_;
308              
309             my $cb = sub {
310 0         0 my ($e, @args) = @_;
311              
312 0 0       0 $subscriber->{next}->([@args]) if defined $subscriber->{next};
313 0         0 };
314              
315             $subscriber->subscription->add(sub {
316 0 0       0 $object->unsubscribe($cb) if defined $object;
317 0         0 });
318              
319 0         0 $object->on($event_type, $cb);
320              
321 0         0 return;
322 0         0 });
323             }
324              
325             sub rx_generate {
326 1     1   4831 my ($initial, $condition, $iterate, $result_selector) = @_;
327              
328             return rx_observable->new(sub {
329 1     1   3 my ($subscriber) = @_;
330              
331 1         2 my $must_finish = 0;
332              
333 1         3 $subscriber->subscription->add(sub { $must_finish = 1 });
  1         5  
334              
335 1         2 my $x = $initial;
336 1         3 while (1) {
337 6 50       11 ! $must_finish or last;
338 6         10 my $cond; my $ok = eval { local $_ = $x; $cond = $condition->($x); 1 };
  6         7  
  6         8  
  6         14  
  6         18  
339 6 50       12 if (! $ok) {
340 0 0       0 $subscriber->{error}->($@) if defined $subscriber->{error};
341 0         0 last;
342             }
343 6 100       13 if (! $cond) {
344 1 50       4 $subscriber->{complete}->() if defined $subscriber->{complete};
345 1         3 last;
346             }
347 5 50       7 my $output_val; $ok = eval { local $_ = $x; $output_val = $result_selector ? $result_selector->($x) : $x; 1 };
  5         7  
  5         6  
  5         13  
  5         17  
348 5 50       9 if (! $ok) {
349 0 0       0 $subscriber->{error}->($@) if defined $subscriber->{error};
350 0         0 last;
351             }
352 5 50       16 $subscriber->{next}->($output_val) if defined $subscriber->{next};
353 5         9 $ok = eval { local $_ = $x; $x = $iterate->($x); 1 };
  5         7  
  5         8  
  5         15  
354 5 50       12 if (! $ok) {
355 0 0       0 $subscriber->{error}->($@) if defined $subscriber->{error};
356 0         0 last;
357             }
358             }
359 1         3 });
360             }
361              
362             sub rx_iif {
363 1     1   7 my ($condition, $true_result, $false_result) = @_;
364              
365             return rx_defer(sub {
366 2 100   2   6 return $condition->() ? $true_result : $false_result;
367 1         10 });
368             }
369              
370             sub rx_interval {
371 64     64   90765 my ($after) = @_;
372              
373 64         164 my ($interval_sub, $cancel_interval_sub) = get_interval_subs;
374              
375             return rx_observable->new(sub {
376 48     48   92 my ($subscriber) = @_;
377              
378 48         74 my $counter = 0;
379             my $timer = $interval_sub->($after, sub {
380 177 50       569 $subscriber->{next}->($counter++) if defined $subscriber->{next};
381 48         194 });
382              
383             return sub {
384 48         144 $cancel_interval_sub->($timer);
385 48         206 };
386 64         157 });
387             }
388              
389             sub rx_merge {
390 192     192   401 my @sources = @_;
391              
392             return rx_observable->new(sub {
393 203     203   351 my ($subscriber) = @_;
394              
395 203         409 my @sources = @sources;
396              
397 203         312 my %own_subscriptions;
398             $subscriber->subscription->add(
399             \%own_subscriptions,
400 203         581 sub { @sources = () },
401 203         551 );
402              
403 203         406 my $num_active_subscriptions = @sources;
404 203 50 66     662 $num_active_subscriptions or $subscriber->{complete}->() if defined $subscriber->{complete};
405              
406 203         479 for (my $i = 0; $i < @sources; $i++) {
407 796         1373 my $source = $sources[$i];
408 796         1872 my $own_subscription = RxPerl::Subscription->new;
409 796         2310 $own_subscriptions{$own_subscription} = $own_subscription;
410             my $own_subscriber = {
411             new_subscription => $own_subscription,
412             next => $subscriber->{next},
413             error => $subscriber->{error},
414             complete => sub {
415 719         1739 delete $own_subscriptions{$own_subscription};
416 719 100       1710 if (! --$num_active_subscriptions) {
417 157 50       500 $subscriber->{complete}->() if defined $subscriber->{complete};
418             }
419             },
420 796         3374 };
421 796         1947 $source->subscribe($own_subscriber);
422             }
423              
424 203         489 return;
425 192         369 });
426             }
427              
428             my $rx_never;
429              
430             sub rx_NEVER {
431             return $rx_never //= rx_observable->new(sub {
432 0     0   0 return;
433 0   0 0   0 });
434             }
435              
436 2093     2093   16151 sub rx_observable { "RxPerl::Observable" }
437              
438             sub rx_of {
439 722     722   81251 my (@values) = @_;
440              
441             return rx_observable->new(sub {
442 771     771   1287 my ($subscriber) = @_;
443              
444 771         1083 my $i = 0;
445              
446 771         1647 $subscriber->subscription->add(sub { $i = @values });
  771         2012  
447              
448 771         1883 for (; $i < @values; $i++) {
449 894 50       2768 $subscriber->{next}->($values[$i]) if defined $subscriber->{next};
450             }
451              
452 771 100       2866 $subscriber->{complete}->() if defined $subscriber->{complete};
453              
454 771         2080 return;
455 722         1319 });
456             }
457              
458             sub _rx_on_error_resume_next_helper {
459 10     10   21 my ($sources, $subscriber, $active) = @_;
460              
461 10 100       21 if (! @$sources) {
462 2 50       13 $subscriber->{complete}->() if defined $subscriber->{complete};
463 2         5 return;
464             }
465              
466 8         16 my $source = shift @$sources;
467 8         17 my $own_subscription = RxPerl::Subscription->new;
468             my $own_subscriber = {
469             new_subscription => $own_subscription,
470             next => $subscriber->{next},
471             error => sub {
472 6     6   31 _rx_on_error_resume_next_helper($sources, $subscriber, $active);
473             },
474             complete => sub {
475 2     2   7 _rx_on_error_resume_next_helper($sources, $subscriber, $active);
476             },
477 8         39 };
478 8         16 @$active = ($own_subscription);
479 8         24 $source->subscribe($own_subscriber);
480             }
481              
482             sub rx_on_error_resume_next {
483 2     2   10 my @sources = @_;
484              
485             return rx_observable->new(sub {
486 2     2   13 my ($subscriber) = @_;
487              
488 2         7 my @sources = @sources;
489              
490 2         3 my @active;
491             $subscriber->subscription->add(
492 2         10 \@active, sub { undef @sources },
493 2         6 );
494              
495 2         12 _rx_on_error_resume_next_helper(\@sources, $subscriber, \@active);
496              
497 2         6 return;
498 2         5 });
499             }
500              
501             sub rx_partition {
502 2     2   35 my ($source, $predicate) = @_;
503              
504 2         9 my $o1 = $source->pipe(
505             RxPerl::Operators::Pipeable::op_filter($predicate),
506             );
507              
508 2         4 my $i = -1;
509             my $o2 = $source->pipe(
510             RxPerl::Operators::Pipeable::op_filter(sub {
511 20     20   26 $i++;
512 20         55 return not $predicate->($_[0], $i);
513 2         9 }),
514             );
515              
516 2         20 return ($o1, $o2);
517             }
518              
519             sub rx_race {
520 2     2   11 my (@sources) = @_;
521              
522             return rx_observable->new(sub {
523 2     2   6 my ($subscriber) = @_;
524             # TODO: experiment in the end with passing a second parameter here, an arrayref, called \@early_return_values
525             # TODO: like: my ($subscriber, $early_return_values) = @_; and then push @$early_return_values, sub {...};
526              
527 2         5 my @sources = @sources;
528              
529 2         4 my @own_subscriptions;
530 2         7 $subscriber->subscription->add(\@own_subscriptions);
531              
532 2         10 for (my $i = 0; $i < @sources; $i++) {
533 6         12 my $source = $sources[$i];
534              
535 6         15 my $own_subscription = RxPerl::Subscription->new;
536 6         13 push @own_subscriptions, $own_subscription;
537 6         14 my $own_subscriber = {
538             new_subscription => $own_subscription,
539             };
540              
541 6         11 foreach my $type (qw/ next error complete /) {
542             $own_subscriber->{$type} = sub {
543 9         44 $_->unsubscribe foreach grep $_ ne $own_subscription, @own_subscriptions;
544 9         22 @own_subscriptions = ($own_subscription);
545 9         18 @sources = ();
546 9 50       38 $subscriber->{$type}->(@_) if defined $subscriber->{$type};
547 9         59 @$own_subscriber{qw/ next error complete /} = @$subscriber{qw/ next error complete /};
548 18         65 };
549             }
550              
551 6         18 $source->subscribe($own_subscriber);
552             }
553              
554             # this could be replaced with a 'return undef' at this point
555 2         7 return \@own_subscriptions;
556 2         6 });
557             }
558              
559             sub rx_range {
560 1     1   2764 my ($start, $count) = @_;
561              
562             return rx_observable->new(sub {
563 1     1   3 my ($subscriber) = @_;
564              
565 1         3 my $i = $start;
566              
567 1         2 $subscriber->subscription->add(sub { $i = $start + $count });
  1         3  
568              
569 1         11 for (; $i < $start + $count; $i++) {
570 7 50       19 $subscriber->{next}->($i) if defined $subscriber->{next};
571             }
572              
573 1 50       11 $subscriber->{complete}->() if defined $subscriber->{complete};
574              
575 1         3 return;
576 1         4 });
577             }
578              
579 1     1   1911 sub rx_replay_subject { "RxPerl::ReplaySubject" }
580              
581 23     23   6068 sub rx_subject { "RxPerl::Subject" }
582              
583             sub rx_throw_error {
584 28     28   2156 my ($error) = @_;
585              
586             return rx_observable->new(sub {
587 33     33   62 my ($subscriber) = @_;
588              
589 33 50       142 $subscriber->{error}->($error) if defined $subscriber->{error};
590              
591 33         72 return;
592 28         55 });
593             }
594              
595             sub rx_timer {
596 91     91   24659 my ($after, $period) = @_;
597              
598 91         206 my ($timer_sub, $cancel_timer_sub) = get_timer_subs;
599 91         239 my ($interval_sub, $cancel_interval_sub) = get_interval_subs;
600              
601             return rx_observable->new(sub {
602 99     99   175 my ($subscriber) = @_;
603              
604 99         139 my $counter = 0;
605 99         145 my $timer_int;
606             my $timer = $timer_sub->($after, sub {
607 84 100       249 $subscriber->{next}->($counter++) if defined $subscriber->{next};
608 84 100       173 if (defined $period) {
609             $timer_int = $interval_sub->($period, sub {
610 14 50       57 $subscriber->{next}->($counter++) if defined $subscriber->{next};
611 3         18 });
612             } else {
613 81 100       286 $subscriber->{complete}->() if defined $subscriber->{complete};
614             }
615 99         441 });
616              
617             return sub {
618 99         273 $cancel_timer_sub->($timer);
619 99         220 $cancel_interval_sub->($timer_int);
620 99         449 };
621 91         217 });
622             }
623              
624             sub rx_zip {
625 5     5   12 my @sources = @_;
626              
627             return rx_observable->new(sub {
628 5     5   16 my ($subscriber) = @_;
629              
630             my @sources_metadata = map {
631 5         11 +{
632 13         38 buffer => [],
633             completed => 0,
634             };
635             } @sources;
636 5         16 my @own_subscriptions = map RxPerl::Subscription->new, @sources;
637              
638 5         14 $subscriber->subscription->add(\@own_subscriptions);
639              
640 5         21 for my $i (0 .. (@sources - 1)) {
641             my $own_subscriber = {
642             new_subscription => $own_subscriptions[$i],
643             next => sub {
644 48         90 my ($v) = @_;
645              
646             # push to buffer
647 48         61 push @{$sources_metadata[$i]{buffer}}, $v;
  48         91  
648              
649             # if all buffers have elements in them:
650 48 100       174 if (!first {!@{$_->{buffer}}} @sources_metadata) {
  112         144  
  112         265  
651 17         47 my @next = map {shift @$_} map $_->{buffer}, @sources_metadata;
  43         75  
652 17 50       69 $subscriber->{next}->(\@next) if defined $subscriber->{next};
653 17 100       88 if (first {!@{$_->{buffer}} and $_->{completed}} @sources_metadata) {
  37 100       53  
  37         160  
654 4 50       16 $subscriber->{complete}->() if defined $subscriber->{complete};
655             }
656             }
657             },
658             error => sub {
659 0 0       0 $subscriber->{error}->(@_) if defined $subscriber->{error};
660             },
661             complete => sub {
662 6         26 $sources_metadata[$i]{completed} = 1;
663 6 50       10 if (!@{$sources_metadata[$i]{buffer}}) {
  6         17  
664 0 0       0 $subscriber->{complete}->() if defined $subscriber->{complete};
665             }
666             },
667 13         94 };
668              
669 13         43 $sources[$i]->subscribe($own_subscriber);
670             }
671              
672 5         14 return;
673 5         11 });
674             }
675              
676             1;