File Coverage

blib/lib/RxPerl/Operators/Creation.pm
Criterion Covered Total %
statement 313 367 85.2
branch 75 148 50.6
condition 17 33 51.5
subroutine 56 64 87.5
pod n/a
total 461 612 75.3


line stmt bran cond sub pod time code
1             package RxPerl::Operators::Creation;
2 5     5   26 use strict;
  5         8  
  5         108  
3 5     5   19 use warnings;
  5         10  
  5         91  
4              
5 5     5   1703 use RxPerl::Observable;
  5         23  
  5         116  
6 5     5   27 use RxPerl::Subscription;
  5         9  
  5         120  
7 5     5   1515 use RxPerl::Utils 'get_timer_subs', 'get_interval_subs';
  5         8  
  5         224  
8 5     5   1519 use RxPerl::Subject;
  5         13  
  5         121  
9 5     5   1822 use RxPerl::BehaviorSubject;
  5         10  
  5         116  
10 5     5   1490 use RxPerl::ReplaySubject;
  5         12  
  5         118  
11              
12 5     5   27 use Carp 'croak';
  5         7  
  5         186  
13 5     5   25 use Scalar::Util qw/ weaken blessed reftype /;
  5         8  
  5         185  
14 5     5   23 use List::Util 'first';
  5         8  
  5         236  
15              
16 5     5   22 use Exporter 'import';
  5         11  
  5         18527  
17             our @EXPORT_OK = qw/
18             rx_behavior_subject rx_combine_latest rx_concat rx_defer rx_EMPTY rx_fork_join rx_from rx_from_event
19             rx_from_event_array rx_generate rx_iif rx_interval rx_merge rx_NEVER rx_observable rx_of rx_on_error_resume_next
20             rx_partition rx_race rx_range rx_replay_subject rx_subject rx_throw_error rx_timer rx_zip
21             /;
22             our %EXPORT_TAGS = (all => \@EXPORT_OK);
23              
24             our $VERSION = "v6.27.1";
25              
26             sub rx_observable;
27              
28 3     3   11952 sub rx_behavior_subject { "RxPerl::BehaviorSubject" }
29              
30             sub rx_combine_latest {
31 2     2   13 my ($sources) = @_;
32              
33             return rx_observable->new(sub {
34 2     2   4 my ($subscriber) = @_;
35              
36 2         4 my $sources = [@$sources];
37              
38 2         4 my %own_subscriptions;
39 2         3 my $i = 0;
40 2         3 my %didnt_emit = map {($i++, 1)} @$sources;
  4         12  
41 2         3 my @latest_values;
42 2         4 my $num_active = @$sources;
43              
44             $subscriber->subscription->add(
45 2         6 \%own_subscriptions, sub { undef @$sources },
46 2         5 );
47              
48 2         6 for (my $i = 0; $i < @$sources; $i++) {
49 4         4 my $j = $i;
50 4         7 my $source = $sources->[$j];
51 4         8 my $own_subscription = RxPerl::Subscription->new;
52 4         9 $own_subscriptions{$own_subscription} = $own_subscription;
53             my $own_observer = {
54             new_subscription => $own_subscription,
55             next => sub {
56 14         21 my ($value) = @_;
57              
58 14         22 $latest_values[$j] = $value;
59 14         17 delete $didnt_emit{$j};
60              
61 14 100       24 if (!%didnt_emit) {
62 12 50       36 $subscriber->{next}->([@latest_values]) if defined $subscriber->{next};
63             }
64             },
65             error => $subscriber->{error},
66             complete => sub {
67 4         7 $num_active--;
68 4 100       7 if ($num_active == 0) {
69 2 50       7 $subscriber->{complete}->() if defined $subscriber->{complete};
70             }
71             },
72 4         20 };
73 4         9 $source->subscribe($own_observer);
74             }
75              
76 2         5 return;
77 2         3 });
78             }
79              
80             sub _rx_concat_helper {
81 76     76   128 my ($sources, $subscriber, $active) = @_;
82              
83 76 100       143 if (! @$sources) {
84 5 50       21 $subscriber->{complete}->() if defined $subscriber->{complete};
85 5         12 return;
86             }
87              
88 71         95 my $source = shift @$sources;
89 71         147 my $own_subscription = RxPerl::Subscription->new;
90             my $own_subscriber = {
91             new_subscription => $own_subscription,
92             next => $subscriber->{next},
93             error => $subscriber->{error},
94             complete => sub {
95 42     42   92 _rx_concat_helper($sources, $subscriber, $active);
96             },
97 71         290 };
98 71         135 @$active = ($own_subscription);
99 71         145 $source->subscribe($own_subscriber);
100             }
101              
102             sub rx_concat {
103 29     29   63 my @sources = @_;
104              
105             return rx_observable->new(sub {
106 34     34   63 my ($subscriber) = @_;
107              
108 34         56 my @sources = @sources;
109              
110 34         43 my @active;
111             $subscriber->subscription->add(
112 34         82 \@active, sub { undef @sources },
113 34         71 );
114              
115 34         106 _rx_concat_helper(\@sources, $subscriber, \@active);
116              
117 34         68 return;
118 29         50 });
119             }
120              
121             sub rx_defer {
122 1     1   3 my ($observable_factory) = @_;
123              
124             return rx_observable->new(sub {
125 2     2   5 my ($subscriber) = @_;
126              
127 2         3 my $observable = $observable_factory->();
128              
129 2         11 return $observable->subscribe($subscriber);
130 1         2 });
131             }
132              
133             my $rx_EMPTY;
134              
135             sub rx_EMPTY {
136             $rx_EMPTY //= rx_observable->new(sub {
137 9     9   18 my ($subscriber) = @_;
138              
139 9 50       31 $subscriber->{complete}->() if defined $subscriber->{complete};
140              
141 9         15 return;
142 9   66 9   7415 });
143             }
144              
145             sub rx_fork_join {
146 4     4   6 my ($sources) = @_;
147              
148 4   66     62 my $arg_is_array = !(blessed $sources) && (reftype $sources eq 'ARRAY');
149 4   66     17 my $arg_is_hash = !(blessed $sources) && (reftype $sources eq 'HASH');
150              
151 4 50 66     14 croak "argument of rx_fork_join needs to be either an arrayref or a hashref"
152             unless $arg_is_array or $arg_is_hash;
153              
154 4 100       7 if ($arg_is_array) {
155 2         3 my $i = 0;
156 2         4 $sources = { map {($i++, $_)} @$sources };
  7         16  
157             }
158              
159             return rx_observable->new(sub {
160 4     4   6 my ($subscriber) = @_;
161              
162 4         13 my $sources = { %$sources };
163 4         7 my %last_values;
164             my %own_subscriptions;
165 4         9 my @keys = keys %$sources;
166 4 100       97 @keys = sort {$a <=> $b} @keys if $arg_is_array;
  7         14  
167              
168             $subscriber->subscription->add(
169 4         11 \%own_subscriptions, sub { undef @keys },
170 4         10 );
171              
172 4 50       9 if (! @keys) {
173 0 0       0 $subscriber->{complete}->() if defined $subscriber->{complete};
174 0         0 return;
175             }
176              
177 4         62 for (my $i = 0; $i < @keys; $i++) {
178 11         20 my $key = $keys[$i];
179 11         17 my $source = $sources->{$key};
180 11         21 my $own_subscription = RxPerl::Subscription->new;
181 11         26 $own_subscriptions{$own_subscription} = $own_subscription;
182             $source->subscribe({
183             new_subscription => $own_subscription,
184             next => sub {
185 27         52 $last_values{$key} = $_[0];
186             },
187             error => $subscriber->{error},
188             complete => sub {
189 11 100       20 if (exists $last_values{$key}) {
190 9 100       23 if (keys(%last_values) == keys %$sources) {
191 2 100       4 if ($arg_is_array) {
192 1         1 my @ret;
193 1         4 $ret[$_] = $last_values{$_} foreach keys %last_values;
194 1 50       5 $subscriber->{next}->(\@ret) if defined $subscriber->{next};
195             }
196             else {
197 1 50       5 $subscriber->{next}->(\%last_values) if defined $subscriber->{next};
198             }
199 2 50       7 $subscriber->{complete}->() if defined $subscriber->{complete};
200             }
201             } else {
202 2 50       6 $subscriber->{complete}->() if defined $subscriber->{complete};
203             }
204             },
205 11         56 });
206             }
207              
208 4         10 return;
209 4         8 });
210             }
211              
212             sub rx_from {
213 3     3   6211 my ($thing) = @_;
214              
215 3 100 66     40 if (blessed $thing and $thing->isa('RxPerl::Observable')) {
    50 33        
    50 33        
    100 66        
    50 33        
216 1         6 return $thing;
217             }
218              
219             elsif (blessed $thing and $thing->isa('Future')) {
220             return rx_observable->new(sub {
221 0     0   0 my ($subscriber) = @_;
222              
223             $thing->on_done(sub {
224 0 0       0 $subscriber->{next}->(splice @_, 0, 1) if defined $subscriber->{next};
225 0 0       0 $subscriber->{complete}->() if defined $subscriber->{complete};
226 0         0 });
227              
228             $thing->on_fail(sub {
229 0 0       0 $subscriber->{error}->(splice @_, 0, 1) if defined $subscriber->{error};
230 0         0 });
231              
232             $thing->on_ready(sub {
233 0 0       0 if ($thing->is_cancelled) {
234 0 0       0 $subscriber->{complete}->() if defined $subscriber->{complete};
235             }
236 0         0 });
237 0         0 });
238             }
239              
240             elsif (blessed $thing and $thing->can('then')) {
241             return rx_observable->new(sub {
242 0     0   0 my ($subscriber) = @_;
243              
244             $thing->then(
245             sub {
246 0 0       0 $subscriber->{next}->(splice @_, 0, 1) if defined $subscriber->{next};
247 0 0       0 $subscriber->{complete}->() if defined $subscriber->{complete};
248             },
249             sub {
250 0 0       0 $subscriber->{error}->(splice @_, 0, 1) if defined $subscriber->{error};
251             },
252 0         0 );
253              
254 0         0 return;
255 0         0 });
256             }
257              
258             elsif (ref $thing eq 'ARRAY' and ! blessed $thing) {
259 1         3 return rx_of(@$thing);
260             }
261              
262             elsif (defined $thing and ! length(ref $thing)) {
263 1         9 my @letters = split //, $thing;
264 1         3 return rx_of(@letters);
265             }
266              
267             else {
268 0         0 croak "rx_from only accepts arrayrefs, promises, observables, and strings as argument at the moment,";
269             }
270             }
271              
272             # NOTE: rx_from_event and rx_from_event_array keep a weak reference to the
273             # EventEmitter $object. Should this change? TODO: think about that.
274              
275             sub rx_from_event {
276 0     0   0 my ($object, $event_type) = @_;
277              
278 0 0       0 croak 'invalid object type, at rx_from_event' if not $object->isa('Mojo::EventEmitter');
279              
280 0         0 weaken($object);
281             return rx_observable->new(sub {
282 0     0   0 my ($subscriber) = @_;
283              
284             my $cb = sub {
285 0         0 my ($e, @args) = @_;
286              
287 0 0       0 $subscriber->{next}->(splice @args, 0, 1) if defined $subscriber->{next};
288 0         0 };
289              
290             $subscriber->subscription->add(sub {
291 0 0       0 $object->unsubscribe($event_type, $cb) if defined $object;
292 0         0 });
293              
294 0         0 $object->on($event_type, $cb);
295              
296 0         0 return;
297 0         0 });
298             }
299              
300             sub rx_from_event_array {
301 0     0   0 my ($object, $event_type) = @_;
302              
303 0 0       0 croak 'invalid object type, at rx_from_event_array' if not $object->isa('Mojo::EventEmitter');
304              
305 0         0 weaken($object);
306             return rx_observable->new(sub {
307 0     0   0 my ($subscriber) = @_;
308              
309             my $cb = sub {
310 0         0 my ($e, @args) = @_;
311              
312 0 0       0 $subscriber->{next}->([@args]) if defined $subscriber->{next};
313 0         0 };
314              
315             $subscriber->subscription->add(sub {
316 0 0       0 $object->unsubscribe($event_type, $cb) if defined $object;
317 0         0 });
318              
319 0         0 $object->on($event_type, $cb);
320              
321 0         0 return;
322 0         0 });
323             }
324              
325             sub rx_generate {
326 1     1   3956 my ($initial, $condition, $iterate, $result_selector) = @_;
327              
328             return rx_observable->new(sub {
329 1     1   2 my ($subscriber) = @_;
330              
331 1         3 my $must_finish = 0;
332              
333 1         3 $subscriber->subscription->add(sub { $must_finish = 1 });
  1         3  
334              
335 1         1 my $x = $initial;
336 1         2 while (1) {
337 6 50       10 ! $must_finish or last;
338 6         6 my $cond; my $ok = eval { local $_ = $x; $cond = $condition->($x); 1 };
  6         7  
  6         7  
  6         10  
  6         15  
339 6 50       12 if (! $ok) {
340 0 0       0 $subscriber->{error}->($@) if defined $subscriber->{error};
341 0         0 last;
342             }
343 6 100       7 if (! $cond) {
344 1 50       5 $subscriber->{complete}->() if defined $subscriber->{complete};
345 1         4 last;
346             }
347 5 50       6 my $output_val; $ok = eval { local $_ = $x; $output_val = $result_selector ? $result_selector->($x) : $x; 1 };
  5         6  
  5         6  
  5         11  
  5         13  
348 5 50       9 if (! $ok) {
349 0 0       0 $subscriber->{error}->($@) if defined $subscriber->{error};
350 0         0 last;
351             }
352 5 50       14 $subscriber->{next}->($output_val) if defined $subscriber->{next};
353 5         6 $ok = eval { local $_ = $x; $x = $iterate->($x); 1 };
  5         5  
  5         9  
  5         11  
354 5 50       9 if (! $ok) {
355 0 0       0 $subscriber->{error}->($@) if defined $subscriber->{error};
356 0         0 last;
357             }
358             }
359 1         3 });
360             }
361              
362             sub rx_iif {
363 1     1   2 my ($condition, $true_result, $false_result) = @_;
364              
365             return rx_defer(sub {
366 2 100   2   5 return $condition->() ? $true_result : $false_result;
367 1         7 });
368             }
369              
370             sub rx_interval {
371 64     64   76018 my ($after) = @_;
372              
373 64         153 my ($interval_sub, $cancel_interval_sub) = get_interval_subs;
374              
375             return rx_observable->new(sub {
376 48     48   81 my ($subscriber) = @_;
377              
378 48         63 my $counter = 0;
379             my $timer = $interval_sub->($after, sub {
380 177 50       494 $subscriber->{next}->($counter++) if defined $subscriber->{next};
381 48         161 });
382              
383             return sub {
384 48         111 $cancel_interval_sub->($timer);
385 48         160 };
386 64         136 });
387             }
388              
389             sub rx_merge {
390 192     192   360 my @sources = @_;
391              
392             return rx_observable->new(sub {
393 203     203   299 my ($subscriber) = @_;
394              
395 203         316 my @sources = @sources;
396              
397 203         237 my %own_subscriptions;
398             $subscriber->subscription->add(
399             \%own_subscriptions,
400 203         493 sub { @sources = () },
401 203         403 );
402              
403 203         358 my $num_active_subscriptions = @sources;
404 203 50 66     525 $num_active_subscriptions or $subscriber->{complete}->() if defined $subscriber->{complete};
405              
406 203         403 for (my $i = 0; $i < @sources; $i++) {
407 796         1052 my $source = $sources[$i];
408 796         1548 my $own_subscription = RxPerl::Subscription->new;
409 796         1637 $own_subscriptions{$own_subscription} = $own_subscription;
410             my $own_subscriber = {
411             new_subscription => $own_subscription,
412             next => $subscriber->{next},
413             error => $subscriber->{error},
414             complete => sub {
415 719         1385 delete $own_subscriptions{$own_subscription};
416 719 100       1311 if (! --$num_active_subscriptions) {
417 157 50       373 $subscriber->{complete}->() if defined $subscriber->{complete};
418             }
419             },
420 796         2758 };
421 796         1568 $source->subscribe($own_subscriber);
422             }
423              
424 203         404 return;
425 192         267 });
426             }
427              
428             my $rx_never;
429              
430             sub rx_NEVER {
431             return $rx_never //= rx_observable->new(sub {
432 0     0   0 return;
433 0   0 0   0 });
434             }
435              
436 2093     2093   14612 sub rx_observable { "RxPerl::Observable" }
437              
438             sub rx_of {
439 722     722   67565 my (@values) = @_;
440              
441             return rx_observable->new(sub {
442 770     770   1024 my ($subscriber) = @_;
443              
444 770         951 my $i = 0;
445              
446 770         1341 $subscriber->subscription->add(sub { $i = @values });
  770         1625  
447              
448 770         1556 for (; $i < @values; $i++) {
449 891 50       2257 $subscriber->{next}->($values[$i]) if defined $subscriber->{next};
450             }
451              
452 770 100       2273 $subscriber->{complete}->() if defined $subscriber->{complete};
453              
454 770         1751 return;
455 722         1100 });
456             }
457              
458             sub _rx_on_error_resume_next_helper {
459 10     10   12 my ($sources, $subscriber, $active) = @_;
460              
461 10 100       19 if (! @$sources) {
462 2 50       8 $subscriber->{complete}->() if defined $subscriber->{complete};
463 2         4 return;
464             }
465              
466 8         12 my $source = shift @$sources;
467 8         17 my $own_subscription = RxPerl::Subscription->new;
468             my $own_subscriber = {
469             new_subscription => $own_subscription,
470             next => $subscriber->{next},
471             error => sub {
472 6     6   15 _rx_on_error_resume_next_helper($sources, $subscriber, $active);
473             },
474             complete => sub {
475 2     2   6 _rx_on_error_resume_next_helper($sources, $subscriber, $active);
476             },
477 8         30 };
478 8         14 @$active = ($own_subscription);
479 8         16 $source->subscribe($own_subscriber);
480             }
481              
482             sub rx_on_error_resume_next {
483 2     2   4 my @sources = @_;
484              
485             return rx_observable->new(sub {
486 2     2   4 my ($subscriber) = @_;
487              
488 2         4 my @sources = @sources;
489              
490 2         2 my @active;
491             $subscriber->subscription->add(
492 2         5 \@active, sub { undef @sources },
493 2         5 );
494              
495 2         8 _rx_on_error_resume_next_helper(\@sources, $subscriber, \@active);
496              
497 2         3 return;
498 2         4 });
499             }
500              
501             sub rx_partition {
502 2     2   14 my ($source, $predicate) = @_;
503              
504 2         7 my $o1 = $source->pipe(
505             RxPerl::Operators::Pipeable::op_filter($predicate),
506             );
507              
508 2         4 my $i = -1;
509             my $o2 = $source->pipe(
510             RxPerl::Operators::Pipeable::op_filter(sub {
511 20     20   27 $i++;
512 20         36 return not $predicate->($_[0], $i);
513 2         7 }),
514             );
515              
516 2         11 return ($o1, $o2);
517             }
518              
519             sub rx_race {
520 2     2   9 my (@sources) = @_;
521              
522             return rx_observable->new(sub {
523 2     2   4 my ($subscriber) = @_;
524             # TODO: experiment in the end with passing a second parameter here, an arrayref, called \@early_return_values
525             # TODO: like: my ($subscriber, $early_return_values) = @_; and then push @$early_return_values, sub {...};
526              
527 2         5 my @sources = @sources;
528              
529 2         4 my @own_subscriptions;
530 2         6 $subscriber->subscription->add(\@own_subscriptions);
531              
532 2         9 for (my $i = 0; $i < @sources; $i++) {
533 6         11 my $source = $sources[$i];
534              
535 6         13 my $own_subscription = RxPerl::Subscription->new;
536 6         11 push @own_subscriptions, $own_subscription;
537 6         11 my $own_subscriber = {
538             new_subscription => $own_subscription,
539             };
540              
541 6         11 foreach my $type (qw/ next error complete /) {
542             $own_subscriber->{$type} = sub {
543 9         35 $_->unsubscribe foreach grep $_ ne $own_subscription, @own_subscriptions;
544 9         16 @own_subscriptions = ($own_subscription);
545 9         11 @sources = ();
546 9 50       34 $subscriber->{$type}->(@_) if defined $subscriber->{$type};
547 9         42 @$own_subscriber{qw/ next error complete /} = @$subscriber{qw/ next error complete /};
548 18         70 };
549             }
550              
551 6         16 $source->subscribe($own_subscriber);
552             }
553              
554             # this could be replaced with a 'return undef' at this point
555 2         6 return \@own_subscriptions;
556 2         5 });
557             }
558              
559             sub rx_range {
560 1     1   2235 my ($start, $count) = @_;
561              
562             return rx_observable->new(sub {
563 1     1   3 my ($subscriber) = @_;
564              
565 1         1 my $i = $start;
566              
567 1         3 $subscriber->subscription->add(sub { $i = $start + $count });
  1         3  
568              
569 1         4 for (; $i < $start + $count; $i++) {
570 7 50       17 $subscriber->{next}->($i) if defined $subscriber->{next};
571             }
572              
573 1 50       4 $subscriber->{complete}->() if defined $subscriber->{complete};
574              
575 1         3 return;
576 1         3 });
577             }
578              
579 1     1   1861 sub rx_replay_subject { "RxPerl::ReplaySubject" }
580              
581 23     23   5063 sub rx_subject { "RxPerl::Subject" }
582              
583             sub rx_throw_error {
584 28     28   1806 my ($error) = @_;
585              
586             return rx_observable->new(sub {
587 33     33   50 my ($subscriber) = @_;
588              
589 33 50       111 $subscriber->{error}->($error) if defined $subscriber->{error};
590              
591 33         63 return;
592 28         43 });
593             }
594              
595             sub rx_timer {
596 91     91   20205 my ($after, $period) = @_;
597              
598 91         208 my ($timer_sub, $cancel_timer_sub) = get_timer_subs;
599 91         215 my ($interval_sub, $cancel_interval_sub) = get_interval_subs;
600              
601             return rx_observable->new(sub {
602 99     99   152 my ($subscriber) = @_;
603              
604 99         125 my $counter = 0;
605 99         118 my $timer_int;
606             my $timer = $timer_sub->($after, sub {
607 84 100       214 $subscriber->{next}->($counter++) if defined $subscriber->{next};
608 84 100       160 if (defined $period) {
609             $timer_int = $interval_sub->($period, sub {
610 14 50       46 $subscriber->{next}->($counter++) if defined $subscriber->{next};
611 3         15 });
612             } else {
613 81 100       227 $subscriber->{complete}->() if defined $subscriber->{complete};
614             }
615 99         362 });
616              
617             return sub {
618 99         247 $cancel_timer_sub->($timer);
619 99         186 $cancel_interval_sub->($timer_int);
620 99         390 };
621 91         188 });
622             }
623              
624             sub rx_zip {
625 5     5   10 my @sources = @_;
626              
627             return rx_observable->new(sub {
628 5     5   9 my ($subscriber) = @_;
629              
630             my @sources_metadata = map {
631 5         8 +{
632 13         28 buffer => [],
633             completed => 0,
634             };
635             } @sources;
636 5         15 my @own_subscriptions = map RxPerl::Subscription->new, @sources;
637              
638 5         14 $subscriber->subscription->add(\@own_subscriptions);
639              
640 5         15 for my $i (0 .. (@sources - 1)) {
641             my $own_subscriber = {
642             new_subscription => $own_subscriptions[$i],
643             next => sub {
644 48         58 my ($v) = @_;
645              
646             # push to buffer
647 48         51 push @{$sources_metadata[$i]{buffer}}, $v;
  48         75  
648              
649             # if all buffers have elements in them:
650 48 100       169 if (!first {!@{$_->{buffer}}} @sources_metadata) {
  112         121  
  112         218  
651 17         40 my @next = map {shift @$_} map $_->{buffer}, @sources_metadata;
  43         64  
652 17 50       57 $subscriber->{next}->(\@next) if defined $subscriber->{next};
653 17 100       67 if (first {!@{$_->{buffer}} and $_->{completed}} @sources_metadata) {
  37 100       47  
  37         132  
654 4 50       15 $subscriber->{complete}->() if defined $subscriber->{complete};
655             }
656             }
657             },
658             error => sub {
659 0 0       0 $subscriber->{error}->(@_) if defined $subscriber->{error};
660             },
661             complete => sub {
662 6         11 $sources_metadata[$i]{completed} = 1;
663 6 50       7 if (!@{$sources_metadata[$i]{buffer}}) {
  6         16  
664 0 0       0 $subscriber->{complete}->() if defined $subscriber->{complete};
665             }
666             },
667 13         75 };
668              
669 13         31 $sources[$i]->subscribe($own_subscriber);
670             }
671              
672 5         13 return;
673 5         8 });
674             }
675              
676             1;