File Coverage

blib/lib/RxPerl/Operators/Creation.pm
Criterion Covered Total %
statement 314 368 85.3
branch 75 148 50.6
condition 15 27 55.5
subroutine 57 65 87.6
pod n/a
total 461 608 75.8


line stmt bran cond sub pod time code
1             package RxPerl::Operators::Creation;
2              
3 5     5   72 use v5.10;
  5         23  
4 5     5   32 use strict;
  5         9  
  5         107  
5 5     5   32 use warnings;
  5         13  
  5         112  
6              
7 5     5   2102 use RxPerl::Observable;
  5         11  
  5         140  
8 5     5   36 use RxPerl::Subscription;
  5         11  
  5         115  
9 5     5   2000 use RxPerl::Utils 'get_timer_subs', 'get_interval_subs';
  5         13  
  5         273  
10 5     5   1955 use RxPerl::Subject;
  5         13  
  5         152  
11 5     5   2017 use RxPerl::BehaviorSubject;
  5         13  
  5         156  
12 5     5   1939 use RxPerl::ReplaySubject;
  5         16  
  5         147  
13              
14 5     5   46 use Carp 'croak';
  5         13  
  5         239  
15 5     5   27 use Scalar::Util qw/ weaken blessed reftype /;
  5         13  
  5         217  
16 5     5   27 use List::Util 'first';
  5         9  
  5         325  
17              
18 5     5   27 use Exporter 'import';
  5         10  
  5         24198  
19             our @EXPORT_OK = qw/
20             rx_behavior_subject rx_combine_latest rx_concat rx_defer rx_EMPTY rx_fork_join rx_from rx_from_event
21             rx_from_event_array rx_generate rx_iif rx_interval rx_merge rx_NEVER rx_observable rx_of rx_on_error_resume_next
22             rx_partition rx_race rx_range rx_replay_subject rx_subject rx_throw_error rx_timer rx_zip
23             /;
24             our %EXPORT_TAGS = (all => \@EXPORT_OK);
25              
26             our $VERSION = "v6.28.0";
27              
28             sub rx_observable;
29              
30 3     3   15250 sub rx_behavior_subject { "RxPerl::BehaviorSubject" }
31              
32             sub rx_combine_latest {
33 2     2   25 my ($sources) = @_;
34              
35             return rx_observable->new(sub {
36 2     2   7 my ($subscriber) = @_;
37              
38 2         5 my $sources = [@$sources];
39              
40 2         3 my %own_subscriptions;
41 2         4 my $i = 0;
42 2         5 my %didnt_emit = map {($i++, 1)} @$sources;
  4         14  
43 2         4 my @latest_values;
44 2         5 my $num_active = @$sources;
45              
46             $subscriber->subscription->add(
47 2         9 \%own_subscriptions, sub { undef @$sources },
48 2         5 );
49              
50 2         14 for (my $i = 0; $i < @$sources; $i++) {
51 4         8 my $j = $i;
52 4         7 my $source = $sources->[$j];
53 4         11 my $own_subscription = RxPerl::Subscription->new;
54 4         11 $own_subscriptions{$own_subscription} = $own_subscription;
55             my $own_observer = {
56             new_subscription => $own_subscription,
57             next => sub {
58 14         26 my ($value) = @_;
59              
60 14         25 $latest_values[$j] = $value;
61 14         25 delete $didnt_emit{$j};
62              
63 14 100       31 if (!%didnt_emit) {
64 12 50       44 $subscriber->{next}->([@latest_values]) if defined $subscriber->{next};
65             }
66             },
67             error => $subscriber->{error},
68             complete => sub {
69 4         8 $num_active--;
70 4 100       12 if ($num_active == 0) {
71 2 50       9 $subscriber->{complete}->() if defined $subscriber->{complete};
72             }
73             },
74 4         36 };
75 4         12 $source->subscribe($own_observer);
76             }
77              
78 2         6 return;
79 2         5 });
80             }
81              
82             sub _rx_concat_helper {
83 76     76   145 my ($sources, $subscriber, $active) = @_;
84              
85 76 100       189 if (! @$sources) {
86 5 50       41 $subscriber->{complete}->() if defined $subscriber->{complete};
87 5         15 return;
88             }
89              
90 71         117 my $source = shift @$sources;
91 71         161 my $own_subscription = RxPerl::Subscription->new;
92             my $own_subscriber = {
93             new_subscription => $own_subscription,
94             next => $subscriber->{next},
95             error => $subscriber->{error},
96             complete => sub {
97 42     42   130 _rx_concat_helper($sources, $subscriber, $active);
98             },
99 71         345 };
100 71         159 @$active = ($own_subscription);
101 71         173 $source->subscribe($own_subscriber);
102             }
103              
104             sub rx_concat {
105 29     29   77 my @sources = @_;
106              
107             return rx_observable->new(sub {
108 34     34   73 my ($subscriber) = @_;
109              
110 34         82 my @sources = @sources;
111              
112 34         56 my @active;
113             $subscriber->subscription->add(
114 34         93 \@active, sub { undef @sources },
115 34         84 );
116              
117 34         140 _rx_concat_helper(\@sources, $subscriber, \@active);
118              
119 34         82 return;
120 29         61 });
121             }
122              
123             sub rx_defer {
124 1     1   4 my ($observable_factory) = @_;
125              
126             return rx_observable->new(sub {
127 2     2   6 my ($subscriber) = @_;
128              
129 2         6 my $observable = $observable_factory->();
130              
131 2         19 return $observable->subscribe($subscriber);
132 1         4 });
133             }
134              
135             sub rx_EMPTY {
136             state $rx_EMPTY = rx_observable->new(sub {
137 9     9   27 my ($subscriber) = @_;
138              
139 9 50       35 $subscriber->{complete}->() if defined $subscriber->{complete};
140              
141 9         28 return;
142 9     9   8857 });
143             }
144              
145             sub rx_fork_join {
146 4     4   13 my ($sources) = @_;
147              
148 4   66     40 my $arg_is_array = !(blessed $sources) && (reftype $sources eq 'ARRAY');
149 4   66     74 my $arg_is_hash = !(blessed $sources) && (reftype $sources eq 'HASH');
150              
151 4 50 66     16 croak "argument of rx_fork_join needs to be either an arrayref or a hashref"
152             unless $arg_is_array or $arg_is_hash;
153              
154 4 100       9 if ($arg_is_array) {
155 2         5 my $i = 0;
156 2         6 $sources = { map {($i++, $_)} @$sources };
  7         18  
157             }
158              
159             return rx_observable->new(sub {
160 4     4   12 my ($subscriber) = @_;
161              
162 4         15 my $sources = { %$sources };
163 4         102 my %last_values;
164             my %own_subscriptions;
165 4         15 my @keys = keys %$sources;
166 4 100       16 @keys = sort {$a <=> $b} @keys if $arg_is_array;
  6         15  
167              
168             $subscriber->subscription->add(
169 4         12 \%own_subscriptions, sub { undef @keys },
170 4         72 );
171              
172 4 50       12 if (! @keys) {
173 0 0       0 $subscriber->{complete}->() if defined $subscriber->{complete};
174 0         0 return;
175             }
176              
177 4         14 for (my $i = 0; $i < @keys; $i++) {
178 11         24 my $key = $keys[$i];
179 11         16 my $source = $sources->{$key};
180 11         26 my $own_subscription = RxPerl::Subscription->new;
181 11         38 $own_subscriptions{$own_subscription} = $own_subscription;
182             $source->subscribe({
183             new_subscription => $own_subscription,
184             next => sub {
185 27         62 $last_values{$key} = $_[0];
186             },
187             error => $subscriber->{error},
188             complete => sub {
189 11 100       24 if (exists $last_values{$key}) {
190 9 100       41 if (keys(%last_values) == keys %$sources) {
191 2 100       6 if ($arg_is_array) {
192 1         3 my @ret;
193 1         5 $ret[$_] = $last_values{$_} foreach keys %last_values;
194 1 50       7 $subscriber->{next}->(\@ret) if defined $subscriber->{next};
195             }
196             else {
197 1 50       6 $subscriber->{next}->(\%last_values) if defined $subscriber->{next};
198             }
199 2 50       16 $subscriber->{complete}->() if defined $subscriber->{complete};
200             }
201             } else {
202 2 50       8 $subscriber->{complete}->() if defined $subscriber->{complete};
203             }
204             },
205 11         77 });
206             }
207              
208 4         14 return;
209 4         10 });
210             }
211              
212             sub rx_from {
213 3     3   8135 my ($thing) = @_;
214              
215 3 100 66     53 if (blessed $thing and $thing->isa('RxPerl::Observable')) {
    50 33        
    50 33        
    100 66        
    50 33        
216 1         7 return $thing;
217             }
218              
219             elsif (blessed $thing and $thing->isa('Future')) {
220             return rx_observable->new(sub {
221 0     0   0 my ($subscriber) = @_;
222              
223             $thing->on_done(sub {
224 0 0       0 $subscriber->{next}->(splice @_, 0, 1) if defined $subscriber->{next};
225 0 0       0 $subscriber->{complete}->() if defined $subscriber->{complete};
226 0         0 });
227              
228             $thing->on_fail(sub {
229 0 0       0 $subscriber->{error}->(splice @_, 0, 1) if defined $subscriber->{error};
230 0         0 });
231              
232             $thing->on_ready(sub {
233 0 0       0 if ($thing->is_cancelled) {
234 0 0       0 $subscriber->{complete}->() if defined $subscriber->{complete};
235             }
236 0         0 });
237 0         0 });
238             }
239              
240             elsif (blessed $thing and $thing->can('then')) {
241             return rx_observable->new(sub {
242 0     0   0 my ($subscriber) = @_;
243              
244             $thing->then(
245             sub {
246 0 0       0 $subscriber->{next}->(splice @_, 0, 1) if defined $subscriber->{next};
247 0 0       0 $subscriber->{complete}->() if defined $subscriber->{complete};
248             },
249             sub {
250 0 0       0 $subscriber->{error}->(splice @_, 0, 1) if defined $subscriber->{error};
251             },
252 0         0 );
253              
254 0         0 return;
255 0         0 });
256             }
257              
258             elsif (ref $thing eq 'ARRAY' and ! blessed $thing) {
259 1         6 return rx_of(@$thing);
260             }
261              
262             elsif (defined $thing and ! length(ref $thing)) {
263 1         32 my @letters = split //, $thing;
264 1         6 return rx_of(@letters);
265             }
266              
267             else {
268 0         0 croak "rx_from only accepts arrayrefs, promises, observables, and strings as argument at the moment,";
269             }
270             }
271              
272             # NOTE: rx_from_event and rx_from_event_array keep a weak reference to the
273             # EventEmitter $object. Should this change? TODO: think about that.
274              
275             sub rx_from_event {
276 0     0   0 my ($object, $event_type) = @_;
277              
278 0 0       0 croak 'invalid object type, at rx_from_event' if not $object->isa('Mojo::EventEmitter');
279              
280 0         0 weaken($object);
281             return rx_observable->new(sub {
282 0     0   0 my ($subscriber) = @_;
283              
284             my $cb = sub {
285 0         0 my ($e, @args) = @_;
286              
287 0 0       0 $subscriber->{next}->(splice @args, 0, 1) if defined $subscriber->{next};
288 0         0 };
289              
290             $subscriber->subscription->add(sub {
291 0 0       0 $object->unsubscribe($event_type, $cb) if defined $object;
292 0         0 });
293              
294 0         0 $object->on($event_type, $cb);
295              
296 0         0 return;
297 0         0 });
298             }
299              
300             sub rx_from_event_array {
301 0     0   0 my ($object, $event_type) = @_;
302              
303 0 0       0 croak 'invalid object type, at rx_from_event_array' if not $object->isa('Mojo::EventEmitter');
304              
305 0         0 weaken($object);
306             return rx_observable->new(sub {
307 0     0   0 my ($subscriber) = @_;
308              
309             my $cb = sub {
310 0         0 my ($e, @args) = @_;
311              
312 0 0       0 $subscriber->{next}->([@args]) if defined $subscriber->{next};
313 0         0 };
314              
315             $subscriber->subscription->add(sub {
316 0 0       0 $object->unsubscribe($event_type, $cb) if defined $object;
317 0         0 });
318              
319 0         0 $object->on($event_type, $cb);
320              
321 0         0 return;
322 0         0 });
323             }
324              
325             sub rx_generate {
326 1     1   4768 my ($initial, $condition, $iterate, $result_selector) = @_;
327              
328             return rx_observable->new(sub {
329 1     1   6 my ($subscriber) = @_;
330              
331 1         3 my $must_finish = 0;
332              
333 1         4 $subscriber->subscription->add(sub { $must_finish = 1 });
  1         10  
334              
335 1         3 my $x = $initial;
336 1         3 while (1) {
337 6 50       14 ! $must_finish or last;
338 6         8 my $cond; my $ok = eval { local $_ = $x; $cond = $condition->($x); 1 };
  6         8  
  6         11  
  6         12  
  6         19  
339 6 50       15 if (! $ok) {
340 0 0       0 $subscriber->{error}->($@) if defined $subscriber->{error};
341 0         0 last;
342             }
343 6 100       14 if (! $cond) {
344 1 50       5 $subscriber->{complete}->() if defined $subscriber->{complete};
345 1         4 last;
346             }
347 5 50       7 my $output_val; $ok = eval { local $_ = $x; $output_val = $result_selector ? $result_selector->($x) : $x; 1 };
  5         7  
  5         7  
  5         28  
  5         20  
348 5 50       10 if (! $ok) {
349 0 0       0 $subscriber->{error}->($@) if defined $subscriber->{error};
350 0         0 last;
351             }
352 5 50       33 $subscriber->{next}->($output_val) if defined $subscriber->{next};
353 5         12 $ok = eval { local $_ = $x; $x = $iterate->($x); 1 };
  5         7  
  5         12  
  5         22  
354 5 50       15 if (! $ok) {
355 0 0       0 $subscriber->{error}->($@) if defined $subscriber->{error};
356 0         0 last;
357             }
358             }
359 1         4 });
360             }
361              
362             sub rx_iif {
363 1     1   3 my ($condition, $true_result, $false_result) = @_;
364              
365             return rx_defer(sub {
366 2 100   2   33 return $condition->() ? $true_result : $false_result;
367 1         8 });
368             }
369              
370             sub rx_interval {
371 63     63   88841 my ($after) = @_;
372              
373 63         159 my ($interval_sub, $cancel_interval_sub) = get_interval_subs;
374              
375             return rx_observable->new(sub {
376 47     47   108 my ($subscriber) = @_;
377              
378 47         71 my $counter = 0;
379             my $timer = $interval_sub->($after, sub {
380 172 50       614 $subscriber->{next}->($counter++) if defined $subscriber->{next};
381 47         201 });
382              
383             return sub {
384 47         129 $cancel_interval_sub->($timer);
385 47         200 };
386 63         155 });
387             }
388              
389             sub rx_merge {
390 191     191   457 my @sources = @_;
391              
392             return rx_observable->new(sub {
393 202     202   344 my ($subscriber) = @_;
394              
395 202         401 my @sources = @sources;
396              
397 202         279 my %own_subscriptions;
398             $subscriber->subscription->add(
399             \%own_subscriptions,
400 202         600 sub { @sources = () },
401 202         500 );
402              
403 202         388 my $num_active_subscriptions = @sources;
404 202 50 66     600 $num_active_subscriptions or $subscriber->{complete}->() if defined $subscriber->{complete};
405              
406 202         492 for (my $i = 0; $i < @sources; $i++) {
407 794         1371 my $source = $sources[$i];
408 794         1798 my $own_subscription = RxPerl::Subscription->new;
409 794         2593 $own_subscriptions{$own_subscription} = $own_subscription;
410             my $own_subscriber = {
411             new_subscription => $own_subscription,
412             next => $subscriber->{next},
413             error => $subscriber->{error},
414             complete => sub {
415 717         1666 delete $own_subscriptions{$own_subscription};
416 717 100       1659 if (! --$num_active_subscriptions) {
417 156 50       438 $subscriber->{complete}->() if defined $subscriber->{complete};
418             }
419             },
420 794         3246 };
421 794         2005 $source->subscribe($own_subscriber);
422             }
423              
424 202         499 return;
425 191         338 });
426             }
427              
428             sub rx_NEVER {
429 0     0   0 state $rx_never = rx_observable->new(sub { return });
  0     0   0  
430             }
431              
432 2081     2081   18814 sub rx_observable { "RxPerl::Observable" }
433              
434             sub rx_of {
435 720     720   83403 my (@values) = @_;
436              
437             return rx_observable->new(sub {
438 768     768   1247 my ($subscriber) = @_;
439              
440 768         1179 my $i = 0;
441              
442 768         1719 $subscriber->subscription->add(sub { $i = @values });
  768         2007  
443              
444 768         1960 for (; $i < @values; $i++) {
445 889 50       2597 $subscriber->{next}->($values[$i]) if defined $subscriber->{next};
446             }
447              
448 768 100       2634 $subscriber->{complete}->() if defined $subscriber->{complete};
449              
450 768         2095 return;
451 720         1297 });
452             }
453              
454             sub _rx_on_error_resume_next_helper {
455 10     10   20 my ($sources, $subscriber, $active) = @_;
456              
457 10 100       24 if (! @$sources) {
458 2 50       13 $subscriber->{complete}->() if defined $subscriber->{complete};
459 2         15 return;
460             }
461              
462 8         16 my $source = shift @$sources;
463 8         18 my $own_subscription = RxPerl::Subscription->new;
464             my $own_subscriber = {
465             new_subscription => $own_subscription,
466             next => $subscriber->{next},
467             error => sub {
468 6     6   19 _rx_on_error_resume_next_helper($sources, $subscriber, $active);
469             },
470             complete => sub {
471 2     2   7 _rx_on_error_resume_next_helper($sources, $subscriber, $active);
472             },
473 8         51 };
474 8         19 @$active = ($own_subscription);
475 8         37 $source->subscribe($own_subscriber);
476             }
477              
478             sub rx_on_error_resume_next {
479 2     2   6 my @sources = @_;
480              
481             return rx_observable->new(sub {
482 2     2   6 my ($subscriber) = @_;
483              
484 2         6 my @sources = @sources;
485              
486 2         4 my @active;
487             $subscriber->subscription->add(
488 2         10 \@active, sub { undef @sources },
489 2         39 );
490              
491 2         10 _rx_on_error_resume_next_helper(\@sources, $subscriber, \@active);
492              
493 2         5 return;
494 2         7 });
495             }
496              
497             sub rx_partition {
498 2     2   21 my ($source, $predicate) = @_;
499              
500 2         9 my $o1 = $source->pipe(
501             RxPerl::Operators::Pipeable::op_filter($predicate),
502             );
503              
504 2         4 my $i = 0;
505             my $o2 = $source->pipe(
506             RxPerl::Operators::Pipeable::op_filter(sub {
507 20     20   55 return not $predicate->($_[0], $i++);
508 2         10 }),
509             );
510              
511 2         12 return ($o1, $o2);
512             }
513              
514             sub rx_race {
515 2     2   16 my (@sources) = @_;
516              
517             return rx_observable->new(sub {
518 2     2   18 my ($subscriber) = @_;
519             # TODO: experiment in the end with passing a second parameter here, an arrayref, called \@early_return_values
520             # TODO: like: my ($subscriber, $early_return_values) = @_; and then push @$early_return_values, sub {...};
521              
522 2         6 my @sources = @sources;
523              
524 2         5 my @own_subscriptions;
525 2         6 $subscriber->subscription->add(\@own_subscriptions);
526              
527 2         18 for (my $i = 0; $i < @sources; $i++) {
528 6         11 my $source = $sources[$i];
529              
530 6         17 my $own_subscription = RxPerl::Subscription->new;
531 6         17 push @own_subscriptions, $own_subscription;
532 6         13 my $own_subscriber = {
533             new_subscription => $own_subscription,
534             };
535              
536 6         13 foreach my $type (qw/ next error complete /) {
537             $own_subscriber->{$type} = sub {
538 9         55 $_->unsubscribe foreach grep $_ ne $own_subscription, @own_subscriptions;
539 9         30 @own_subscriptions = ($own_subscription);
540 9         18 @sources = ();
541 9 50       34 $subscriber->{$type}->(@_) if defined $subscriber->{$type};
542 9         59 @$own_subscriber{qw/ next error complete /} = @$subscriber{qw/ next error complete /};
543 18         63 };
544             }
545              
546 6         20 $source->subscribe($own_subscriber);
547             }
548              
549             # this could be replaced with a 'return undef' at this point
550 2         8 return \@own_subscriptions;
551 2         6 });
552             }
553              
554             sub rx_range {
555 1     1   2721 my ($start, $count) = @_;
556              
557             return rx_observable->new(sub {
558 1     1   7 my ($subscriber) = @_;
559              
560 1         3 my $i = $start;
561              
562 1         5 $subscriber->subscription->add(sub { $i = $start + $count });
  1         3  
563              
564 1         6 for (; $i < $start + $count; $i++) {
565 7 50       21 $subscriber->{next}->($i) if defined $subscriber->{next};
566             }
567              
568 1 50       6 $subscriber->{complete}->() if defined $subscriber->{complete};
569              
570 1         3 return;
571 1         4 });
572             }
573              
574 1     1   2295 sub rx_replay_subject { "RxPerl::ReplaySubject" }
575              
576 21     21   6109 sub rx_subject { "RxPerl::Subject" }
577              
578             sub rx_throw_error {
579 28     28   2199 my ($error) = @_;
580              
581             return rx_observable->new(sub {
582 33     33   106 my ($subscriber) = @_;
583              
584 33 50       151 $subscriber->{error}->($error) if defined $subscriber->{error};
585              
586 33         96 return;
587 28         63 });
588             }
589              
590             sub rx_timer {
591 91     91   24113 my ($after, $period) = @_;
592              
593 91         205 my ($timer_sub, $cancel_timer_sub) = get_timer_subs;
594 91         255 my ($interval_sub, $cancel_interval_sub) = get_interval_subs;
595              
596             return rx_observable->new(sub {
597 99     99   166 my ($subscriber) = @_;
598              
599 99         156 my $counter = 0;
600 99         128 my $timer_int;
601             my $timer = $timer_sub->($after, sub {
602 84 100       246 $subscriber->{next}->($counter++) if defined $subscriber->{next};
603 84 100       196 if (defined $period) {
604             $timer_int = $interval_sub->($period, sub {
605 14 50       74 $subscriber->{next}->($counter++) if defined $subscriber->{next};
606 3         28 });
607             } else {
608 81 100       275 $subscriber->{complete}->() if defined $subscriber->{complete};
609             }
610 99         836 });
611              
612             return sub {
613 99         307 $cancel_timer_sub->($timer);
614 99         228 $cancel_interval_sub->($timer_int);
615 99         533 };
616 91         194 });
617             }
618              
619             sub rx_zip {
620 5     5   12 my @sources = @_;
621              
622             return rx_observable->new(sub {
623 5     5   11 my ($subscriber) = @_;
624              
625             my @sources_metadata = map {
626 5         10 +{
627 13         34 buffer => [],
628             completed => 0,
629             };
630             } @sources;
631 5         16 my @own_subscriptions = map RxPerl::Subscription->new, @sources;
632              
633 5         15 $subscriber->subscription->add(\@own_subscriptions);
634              
635 5         18 for my $i (0 .. (@sources - 1)) {
636             my $own_subscriber = {
637             new_subscription => $own_subscriptions[$i],
638             next => sub {
639 48         84 my ($v) = @_;
640              
641             # push to buffer
642 48         63 push @{$sources_metadata[$i]{buffer}}, $v;
  48         91  
643              
644             # if all buffers have elements in them:
645 48 100       162 if (!first {!@{$_->{buffer}}} @sources_metadata) {
  112         149  
  112         263  
646 17         48 my @next = map {shift @$_} map $_->{buffer}, @sources_metadata;
  43         76  
647 17 50       65 $subscriber->{next}->(\@next) if defined $subscriber->{next};
648 17 100       86 if (first {!@{$_->{buffer}} and $_->{completed}} @sources_metadata) {
  37 100       55  
  37         171  
649 4 50       16 $subscriber->{complete}->() if defined $subscriber->{complete};
650             }
651             }
652             },
653             error => sub {
654 0 0       0 $subscriber->{error}->(@_) if defined $subscriber->{error};
655             },
656             complete => sub {
657 6         11 $sources_metadata[$i]{completed} = 1;
658 6 50       9 if (!@{$sources_metadata[$i]{buffer}}) {
  6         17  
659 0 0       0 $subscriber->{complete}->() if defined $subscriber->{complete};
660             }
661             },
662 13         94 };
663              
664 13         41 $sources[$i]->subscribe($own_subscriber);
665             }
666              
667 5         14 return;
668 5         43 });
669             }
670              
671             1;