File Coverage

blib/lib/RxPerl/Operators/Pipeable.pm
Criterion Covered Total %
statement 811 1110 73.0
branch 205 446 45.9
condition 54 94 57.4
subroutine 120 168 71.4
pod n/a
total 1190 1818 65.4


line stmt bran cond sub pod time code
1             package RxPerl::Operators::Pipeable;
2 5     5   29 use strict;
  5         9  
  5         118  
3 5     5   22 use warnings;
  5         9  
  5         155  
4              
5 5         414 use RxPerl::Operators::Creation qw/
6             rx_observable rx_subject rx_concat rx_of rx_interval rx_combine_latest rx_concat
7             rx_throw_error rx_zip rx_merge rx_on_error_resume_next rx_race rx_timer
8             rx_behavior_subject
9 5     5   25 /;
  5         24  
10 5     5   1625 use RxPerl::ConnectableObservable;
  5         11  
  5         119  
11 5     5   25 use RxPerl::Utils qw/ get_timer_subs /;
  5         8  
  5         194  
12 5     5   21 use RxPerl::Subscription;
  5         17  
  5         90  
13              
14 5     5   21 use Carp 'croak';
  5         8  
  5         171  
15 5     5   20 use Scalar::Util 'reftype', 'refaddr', 'blessed', 'weaken';
  5         9  
  5         229  
16 5     5   26 use Time::HiRes ();
  5         5  
  5         96  
17              
18 5     5   20 use Exporter 'import';
  5         13  
  5         55164  
19             our @EXPORT_OK = qw/
20             op_audit op_audit_time op_buffer op_buffer_count op_buffer_time op_catch_error op_combine_latest_with op_concat_all
21             op_concat_map op_concat_with op_count op_debounce op_debounce_time op_default_if_empty op_delay op_delay_when
22             op_distinct op_distinct_until_changed op_distinct_until_key_changed op_element_at op_end_with op_every
23             op_exhaust_all op_exhaust_map op_filter op_finalize op_find op_find_index op_first op_group_by op_ignore_elements
24             op_is_empty op_last op_map op_map_to op_max op_merge_all op_merge_map op_merge_with op_min op_multicast
25             op_on_error_resume_next_with op_pairwise op_pluck op_race_with op_reduce op_ref_count op_repeat op_retry op_sample
26             op_sample_time op_scan op_share op_single op_skip op_skip_last op_skip_until op_skip_while op_start_with
27             op_switch_all op_switch_map op_take op_take_last op_take_until op_take_while op_tap op_throttle op_throttle_time
28             op_throw_if_empty op_time_interval op_timeout op_timestamp op_to_array op_with_latest_from op_zip_with
29             /;
30             our %EXPORT_TAGS = (all => \@EXPORT_OK);
31              
32             our $VERSION = "v6.27.1";
33              
34             sub op_audit {
35 3     3   4 my ($duration_selector) = @_;
36              
37             return sub {
38 3     3   4 my ($source) = @_;
39              
40             return rx_observable->new(sub {
41 3         5 my ($subscriber) = @_;
42              
43 3         6 my $last_val;
44             my $mini_subscription;
45 3         0 my $main_is_complete;
46              
47             my $mini_subscriber = {
48             next => sub {
49 5 100       16 $subscriber->{next}->($last_val) if defined $subscriber->{next};
50             },
51             error => sub {
52 1 50       5 $subscriber->{error}->(@_) if defined $subscriber->{error};
53             },
54             complete => sub {
55 5         7 undef $mini_subscription;
56 5         6 undef $last_val;
57 5 100       11 if ($main_is_complete) {
58 1 50       5 $subscriber->{complete}->() if defined $subscriber->{complete};
59             }
60             },
61 3         16 };
62              
63             my $own_subscriber = {
64             next => sub {
65 11         19 my ($v) = @_;
66              
67 11         14 $last_val = $v;
68 11 100       22 if (!defined $mini_subscription) {
69 6         7 my $o = do { local $_ = $v; $duration_selector->($v) };
  6         8  
  6         21  
70 6         14 $mini_subscription = $o->pipe(
71             op_take(1),
72             )->subscribe($mini_subscriber);
73             }
74             },
75             error => sub {
76 1 50       4 $subscriber->{error}->(@_) if defined $subscriber->{error};
77             },
78             complete => sub {
79 1         3 $main_is_complete = 1;
80 1 50       4 if (! defined $mini_subscription) {
81 0 0       0 $subscriber->{complete}->() if defined $subscriber->{complete};
82             }
83             }
84 3         15 };
85              
86 3         8 my $own_subscription = $source->subscribe($own_subscriber);
87              
88 3         9 return $mini_subscription, $own_subscription;
89 3         6 });
90 3         12 };
91             }
92              
93             sub op_audit_time {
94 0     0   0 my ($duration) = @_;
95              
96 0     0   0 return op_audit(sub { rx_timer($duration) });
  0         0  
97             }
98              
99             sub op_buffer {
100 2     2   7 my ($notifier) = @_;
101              
102             return sub {
103 2     2   4 my ($source) = @_;
104              
105             return rx_observable->new(sub {
106 2         3 my ($subscriber) = @_;
107              
108 2         3 my @buffer;
109              
110             my $own_subscriber = {
111             %$subscriber,
112             next => sub {
113 16         30 push @buffer, $_[0];
114             },
115             error => sub {
116 0 0       0 $subscriber->{error}->($_[0]) if defined $subscriber->{error};
117             },
118             complete => sub {
119 2 50 33     13 $subscriber->{next}->([@buffer]) if @buffer and defined $subscriber->{next};
120 2         4 undef @buffer;
121 2 50       8 $subscriber->{complete}->() if defined $subscriber->{complete};
122             },
123 2         14 };
124              
125             my $notifier_subscriber = {
126             next => sub {
127 5 50       22 $subscriber->{next}->([@buffer]) if defined $subscriber->{next};
128 5         13 undef @buffer;
129             },
130             error => sub {
131 0 0       0 $subscriber->{error}->($_[0]) if defined $subscriber->{error};
132             },
133 2         10 };
134              
135 2         6 my $s1 = $source->subscribe($own_subscriber);
136 2         6 my $s2 = $notifier->subscribe($notifier_subscriber);
137              
138 2         9 return [$s1, $s2], sub { undef @buffer };
  2         4  
139             })
140 2         5 }
141 2         9 }
142              
143             sub op_buffer_count {
144 3     3   9 my ($buffer_size, $start_buffer_every) = @_;
145              
146 3   66     12 $start_buffer_every //= $buffer_size;
147              
148             return sub {
149 3     3   4 my ($source) = @_;
150              
151             return rx_observable->new(sub {
152 3         6 my ($subscriber) = @_;
153              
154 3         5 my @buffers;
155 3         5 my $count = 0;
156             my $own_subscriber = {
157             %$subscriber,
158             next => sub {
159 18         27 my ($value) = @_;
160              
161 18 100       37 if ($count++ % $start_buffer_every == 0) {
162 13         20 push @buffers, [];
163             }
164              
165 18         38 for (my $i = 0; $i < @buffers; $i++) {
166 25         34 my $buffer = $buffers[$i];
167              
168 25         35 push @$buffer, $value;
169              
170 25 100       53 if (@$buffer == $buffer_size) {
171 10 50       34 $subscriber->{next}->($buffer) if defined $subscriber->{next};
172 10         16 splice @buffers, $i, 1;
173 10         13 $i--;
174 10         26 next;
175             }
176             }
177             },
178             complete => sub {
179 3 50       9 if (defined $subscriber->{next}) {
180 3         10 $subscriber->{next}->($_) foreach @buffers;
181             }
182              
183 3 50       10 $subscriber->{complete}->() if defined $subscriber->{complete};
184             },
185 3         21 };
186              
187 3         9 $source->subscribe($own_subscriber);
188              
189 3         7 return;
190 3         8 });
191 3         14 };
192             }
193              
194             sub op_buffer_time {
195 1     1   6 my ($buffer_time_span) = @_;
196              
197 1         4 return op_buffer(rx_interval($buffer_time_span));
198             }
199              
200             sub _op_catch_error_helper {
201 0     0   0 my ($source, $selector, $subscriber, $dependents, $error) = @_;
202              
203 0         0 my $new_observable;
204 0 0       0 if (@_ == 4) {
205 0         0 $new_observable = $source;
206             } else {
207 0         0 eval { $new_observable = $selector->($error, $source) };
  0         0  
208 0 0       0 if (my $e = $@) {
209 0 0       0 $subscriber->{error}->($e) if defined $subscriber->{error};
210 0         0 return;
211             }
212             }
213              
214 0         0 my $own_subscription = RxPerl::Subscription->new;
215 0         0 @$dependents = ($own_subscription);
216             my $own_subscriber = {
217             new_subscription => $own_subscription,
218             next => $subscriber->{next},
219             error => sub {
220 0     0   0 _op_catch_error_helper($source, $selector, $subscriber, $dependents, $_[0]);
221             },
222             complete => $subscriber->{complete},
223 0         0 };
224              
225 0         0 $new_observable->subscribe($own_subscriber);
226             }
227              
228             sub op_catch_error {
229 0     0   0 my ($selector) = @_;
230              
231             return sub {
232 0     0   0 my ($source) = @_;
233              
234             return rx_observable->new(sub {
235 0         0 my ($subscriber) = @_;
236              
237 0         0 my $dependents = [];
238 0         0 $subscriber->subscription->add($dependents);
239              
240 0         0 _op_catch_error_helper(
241             $source, $selector, $subscriber, $dependents,
242             );
243              
244 0         0 return;
245 0         0 });
246 0         0 };
247             }
248              
249             sub op_combine_latest_with {
250 1     1   14 my (@other_observables) = @_;
251              
252             return sub {
253 1     1   2 my $source = shift;
254              
255 1         3 return rx_combine_latest([$source, @other_observables]);
256             }
257 1         6 }
258              
259             sub op_concat_all {
260 1     1   4 return op_merge_all(1);
261             }
262              
263             sub op_concat_map {
264 0     0   0 my ($observable_factory) = @_;
265              
266             return sub {
267 0     0   0 my ($source) = @_;
268              
269 0         0 return $source->pipe(
270             op_map($observable_factory),
271             op_concat_all(),
272             );
273 0         0 };
274             }
275              
276             sub op_concat_with {
277 3     3   7 my @other_observables = @_;
278              
279             return sub {
280 3     3   7 my ($source) = @_;
281              
282 3         9 return rx_concat(
283             $source,
284             @other_observables,
285             );
286 3         14 };
287             }
288              
289             sub op_count {
290 3     3   7 my ($predicate) = @_;
291              
292             return sub {
293 3     3   7 my ($source) = @_;
294              
295             return rx_observable->new(sub {
296 3         5 my ($subscriber) = @_;
297              
298 3         3 my $count = 0;
299 3         27 my $idx = 0;
300              
301             my $own_subscriber = {
302             %$subscriber,
303             next => sub {
304 14         18 my ($v) = @_;
305 14         19 local $_ = $v;
306 14 100 66     34 if (!$predicate or $predicate->($v, $idx++)) {
307 7         31 $count++;
308             }
309             },
310             complete => sub {
311 3 50       11 $subscriber->{next}->($count) if defined $subscriber->{next};
312 3 50       11 $subscriber->{complete}->() if defined $subscriber->{complete};
313             },
314 3         20 };
315              
316 3         12 $source->subscribe($own_subscriber);
317              
318 3         11 return;
319 3         5 });
320 3         11 };
321             }
322              
323             sub op_debounce {
324 2     2   5 my ($duration_selector) = @_;
325              
326             return sub {
327 2     2   5 my ($source) = @_;
328              
329             return rx_observable->new(sub {
330 2         4 my ($subscriber) = @_;
331              
332 2         6 my $mini_subscription;
333             my $last_val;
334 2         0 my $has_last_val;
335              
336             my $mini_subscriber = {
337             next => sub {
338 5 50       18 $subscriber->{next}->($last_val) if defined $subscriber->{next};
339 5         6 undef $has_last_val;
340             },
341             error => sub {
342 0 0       0 $subscriber->{error}->(@_) if defined $subscriber->{error};
343             },
344             complete => sub {
345 5         8 undef $mini_subscription;
346             },
347 2         11 };
348              
349             my $own_subscriber = {
350             next => sub {
351 12         19 my ($v) = @_;
352              
353 12 100       21 if (defined $mini_subscription) {
354 6         13 $mini_subscription->unsubscribe();
355             }
356              
357 12         19 $last_val = $v;
358 12         14 $has_last_val = 1;
359              
360 12         13 my $o = do { local $_ = $v; $duration_selector->($v) };
  12         17  
  12         19  
361 12         25 $mini_subscription = $o->pipe(
362             op_take(1),
363             )->subscribe($mini_subscriber);
364             },
365             error => sub {
366 0 0       0 $subscriber->{error}->(@_) if defined $subscriber->{error};
367             },
368             complete => sub {
369 2 100       6 if ($has_last_val) {
370 1 50       5 $subscriber->{next}->($last_val) if defined $subscriber->{next};
371             }
372 2 50       6 $subscriber->{complete}->() if defined $subscriber->{complete};
373             }
374 2         12 };
375              
376 2         6 my $main_subscription = $source->subscribe($own_subscriber);
377 2         6 $main_subscription->add(\$mini_subscription);
378              
379 2         5 return $main_subscription, $mini_subscription;
380 2         4 });
381 2         11 };
382             }
383              
384             sub op_debounce_time {
385 2     2   4 my ($due_time) = @_;
386              
387 2     12   8 return op_debounce(sub { rx_timer($due_time) });
  12         24  
388             }
389              
390             sub op_default_if_empty {
391 7     7   13 my ($default_value) = @_;
392              
393             return sub {
394 7     7   13 my ($source) = @_;
395              
396             return rx_observable->new(sub {
397 7         12 my ($subscriber) = @_;
398              
399 7         10 my $source_emitted = 0;
400              
401             my $own_subscriber = {
402             %$subscriber,
403             next => sub {
404 3         5 $source_emitted = 1;
405 3 50       12 $subscriber->{next}->(@_) if exists $subscriber->{next};
406             },
407             complete => sub {
408 7 50 66     29 $subscriber->{next}->($default_value) if ! $source_emitted and exists $subscriber->{next};
409 7 50       20 $subscriber->{complete}->() if exists $subscriber->{complete};
410             },
411 7         37 };
412              
413 7         21 $source->subscribe($own_subscriber);
414              
415 7         23 return;
416 7         14 });
417 7         29 };
418             }
419              
420             sub op_delay {
421 674     674   964 my ($delay) = @_;
422              
423 674         1168 my ($timer_sub, $cancel_timer_sub) = get_timer_subs;
424              
425             return sub {
426 674     674   848 my ($source) = @_;
427              
428             return rx_observable->new(sub {
429 724         970 my ($subscriber) = @_;
430              
431 724         1272 my %timers;
432             my $queue;
433 724         0 my $completed;
434             my $own_subscriber = {
435             error => sub {
436 1         2 my @value = @_;
437 1 50       4 $subscriber->{error}->(@value) if defined $subscriber->{error};
438             },
439             next => sub {
440 730         1212 my @value = @_;
441              
442 730 100       1285 if (!defined $queue) {
443 729         993 $queue = [];
444 729         925 my ($timer1, $timer2);
445             $timer1 = $timer_sub->(0, sub {
446 729         964 delete $timers{$timer1};
447 729         1078 my @queue_copy = @$queue;
448 729         996 undef $queue;
449             $timer2 = $timer_sub->($delay, sub {
450 694         910 delete $timers{$timer2};
451 694         935 foreach my $item (@queue_copy) {
452 695 100       1640 $subscriber->{next}->(@$item) if defined $subscriber->{next};
453             }
454 694 100 100     1935 if ($completed and ! %timers) {
455 689 100       1607 $subscriber->{complete}->() if defined $subscriber->{complete};
456             }
457 729         2373 });
458 729         1686 $timers{$timer2} = $timer2;
459 729         2560 });
460 729         1341 $timers{$timer1} = $timer1;
461             }
462 730         2094 push @$queue, \@value;
463             },
464             complete => sub {
465 723         810 $completed = 1;
466 723 100       1297 if (! %timers) {
467 1 50       4 $subscriber->{complete}->() if defined $subscriber->{complete};
468             }
469             },
470 724         3648 };
471              
472             return [
473             $source->subscribe($own_subscriber),
474             sub {
475 724         1397 $cancel_timer_sub->($_) foreach values %timers;
476 724         1327 %timers = ();
477             },
478 724         1541 ];
479 674         1092 });
480 674         2477 };
481             }
482              
483             sub op_delay_when {
484 1     1   3 my ($delay_duration_selector) = @_;
485              
486             return sub {
487 1     1   3 my ($source) = @_;
488              
489             return rx_observable->new(sub {
490 1         4 my ($subscriber) = @_;
491              
492 1         2 my $idx = 0;
493 1         3 my %mini_subscriptions;
494             my $main_finished;
495              
496             my $make_mini_subscriber = sub {
497 3         6 my ($v) = @_;
498              
499 3         6 my $mini_subscription = RxPerl::Subscription->new;
500 3         7 $mini_subscriptions{$mini_subscription} = $mini_subscription;
501              
502             return {
503             new_subscription => $mini_subscription,
504             next => sub {
505 3 50       9 $subscriber->{next}->($v) if defined $subscriber->{next};
506             },
507             error => sub {
508 0 0       0 $subscriber->{error}->(@_) if defined $subscriber->{error};
509             },
510             complete => sub {
511 3         7 delete $mini_subscriptions{$mini_subscription};
512 3 100 66     12 if ($main_finished and ! %mini_subscriptions) {
513 1 50       5 $subscriber->{complete}->() if defined $subscriber->{complete};
514             }
515             },
516 3         20 };
517 1         4 };
518              
519 1         3 my $own_subscription = RxPerl::Subscription->new;
520             my $own_subscriber = {
521             new_subscription => $own_subscription,
522             next => sub {
523 3         3 my ($v) = @_;
524              
525 3         6 local $_ = $v;
526 3         7 my $mini_obs = $delay_duration_selector->($v, $idx++);
527 3         6 $mini_obs->subscribe($make_mini_subscriber->($v));
528             },
529             error => sub {
530 0 0       0 $subscriber->{error}->(@_) if defined $subscriber->{error};
531             },
532             complete => sub {
533 1         2 $main_finished = 1;
534 1 50       4 if (!%mini_subscriptions) {
535 0 0       0 $subscriber->{complete}->() if defined $subscriber->{complete};
536             }
537             },
538 1         7 };
539              
540 1         3 $subscriber->subscription->add($own_subscription, \%mini_subscriptions);
541              
542 1         4 $source->subscribe($own_subscriber);
543              
544 1         7 return;
545 1         3 });
546 1         6 };
547             }
548              
549             sub op_distinct {
550 2     2   4 my ($key_selector) = @_;
551              
552             return sub {
553 2     2   3 my ($source) = @_;
554              
555             return rx_observable->new(sub {
556 2         4 my ($subscriber) = @_;
557              
558 2         4 my %keys_passed;
559 2         4 $subscriber->subscription->add(sub { %keys_passed = () });
  2         6  
560              
561             my $own_subscriber = {
562             %$subscriber,
563             next => sub {
564 15         19 my ($v) = @_;
565              
566 15         16 my $k;
567 15 100       19 if ($key_selector) {
568 3         4 my $ok = eval { local $_ = $v; $k = $key_selector->($v); 1 };
  3         4  
  3         5  
  3         9  
569 3 50       6 if (! $ok) {
570 0 0       0 $subscriber->{error}->($@) if defined $subscriber->{error};
571 0         0 return;
572             }
573             } else {
574 12         13 $k = $v;
575             }
576 15 100       35 if (! exists $keys_passed{$k}) {
577 6         8 $keys_passed{$k} = 1;
578 6 50       16 $subscriber->{next}->($v) if defined $subscriber->{next};
579             }
580             },
581 2         10 };
582              
583 2         7 $source->subscribe($own_subscriber);
584              
585 2         16 return;
586 2         6 });
587 2         9 };
588             }
589              
590             sub op_distinct_until_changed {
591 4     4   9 my ($comparison_function) = @_;
592              
593 4   100     14 $comparison_function //= \&_eqq;
594              
595             return sub {
596 4     4   7 my ($source) = @_;
597              
598             return rx_observable->new(sub {
599 4         6 my ($subscriber) = @_;
600              
601 4         5 my $prev_value;
602 4         5 my $have_prev_value = 0;
603              
604             my $own_subscriber = {
605             %$subscriber,
606             next => sub {
607 27         39 my @value = @_;
608              
609 27 100 100     55 if (! $have_prev_value or ! $comparison_function->($prev_value, $value[0])) {
610 15 50       44 $subscriber->{next}->(@value) if defined $subscriber->{next};
611 15         22 $have_prev_value = 1;
612 15         31 $prev_value = $value[0];
613             }
614             },
615 4         19 };
616              
617 4         13 $source->subscribe($own_subscriber);
618              
619 4         19 return;
620 4         7 });
621 4         17 };
622             }
623              
624             sub op_distinct_until_key_changed {
625 1     1   3 my ($key) = @_;
626              
627             return op_distinct_until_changed(sub {
628 5     5   11 _eqq($_[0]->{$key}, $_[1]->{$key});
629 1         4 }),
630             }
631              
632             sub op_element_at {
633 2     2   5 my ($index, $default) = @_;
634 2         4 my $has_default = @_ >= 2;
635 2         4 $index = int $index;
636              
637             return sub {
638 2     2   3 my ($source) = @_;
639              
640 2 50       6 $index >= 0 or return rx_throw_error('ArgumentOutOfRangeError');
641              
642             return rx_observable->new(sub {
643 2         3 my ($subscriber) = @_;
644              
645 2         4 my $i = 0;
646             my $own_subscriber = {
647             %$subscriber,
648             next => sub {
649 6 100       19 if ($i++ == $index) {
650 1 50       7 $subscriber->{next}->(@_) if defined $subscriber->{next};
651 1 50       4 $subscriber->{complete}->() if defined $subscriber->{complete};
652             }
653             },
654             complete => sub {
655 1 50       3 if ($has_default) {
656 1 50       5 $subscriber->{next}->($default) if defined $subscriber->{next};
657 1 50       4 $subscriber->{complete}->() if defined $subscriber->{complete};
658             } else {
659 0 0       0 $subscriber->{error}->('ArgumentOutOfRangeError') if defined $subscriber->{error};
660             }
661             },
662 2         12 };
663              
664 2         7 $source->subscribe($own_subscriber);
665              
666 2         5 return;
667 2         6 });
668 2         8 };
669             }
670              
671             sub op_end_with {
672 0     0   0 my (@values) = @_;
673              
674             return sub {
675 0     0   0 my ($source) = @_;
676              
677 0         0 return rx_concat(
678             $source,
679             rx_of(@values),
680             );
681             }
682 0         0 }
683              
684             sub op_every {
685 2     2   3 my ($predicate) = @_;
686              
687             return sub {
688 2     2   3 my ($source) = @_;
689              
690             return rx_observable->new(sub {
691 2         3 my ($subscriber) = @_;
692              
693 2         3 my $idx = 0;
694             my $own_subscriber = {
695             %$subscriber,
696             next => sub {
697 8         11 my ($v) = @_;
698 8         10 local $_ = $v;
699 8 100       17 if (! $predicate->($v, $idx++)) {
700 1 50       9 $subscriber->{next}->(0) if defined $subscriber->{next};
701 1 50       5 $subscriber->{complete}->() if defined $subscriber->{complete};
702             }
703             },
704             complete => sub {
705 1 50       5 $subscriber->{next}->(1) if defined $subscriber->{next};
706 1 50       4 $subscriber->{complete}->() if defined $subscriber->{complete};
707             },
708 2         12 };
709              
710 2         6 $source->subscribe($own_subscriber);
711              
712 2         4 return;
713 2         3 });
714 2         9 };
715             }
716              
717             sub op_exhaust_all {
718             return sub {
719 1     1   2 my ($source) = @_;
720              
721             return rx_observable->new(sub {
722 1         2 my ($subscriber) = @_;
723              
724 1         2 my $active_subscription;
725             my $big_completed;
726 1         3 my $own_subscription = RxPerl::Subscription->new;
727              
728 1         4 $subscriber->subscription->add(
729             \$active_subscription,
730             $own_subscription,
731             );
732              
733             my $own_subscriber = {
734             new_subscription => $own_subscription,
735             next => sub {
736 6         8 my ($new_obs) = @_;
737              
738 6 100       40 !$active_subscription or return;
739 3         8 $active_subscription = RxPerl::Subscription->new;
740             my $small_subscriber = {
741             new_subscription => $active_subscription,
742             next => sub {
743 12 50       26 $subscriber->{next}->(@_) if defined $subscriber->{next};
744             },
745             error => sub {
746 0 0       0 $subscriber->{error}->(@_) if defined $subscriber->{error};
747             },
748             complete => sub {
749 2         4 undef $active_subscription;
750 2 50 33     6 $subscriber->{complete}->() if $big_completed and defined $subscriber->{complete};
751             },
752 3         15 };
753 3         9 $new_obs->subscribe($small_subscriber);
754             },
755             error => sub {
756 0 0       0 $subscriber->{error}->(@_) if defined $subscriber->{error};
757             },
758             complete => sub {
759 0         0 $big_completed = 1;
760 0 0 0     0 $subscriber->{complete}->() if !$active_subscription and defined $subscriber->{complete};
761             },
762 1         10 };
763              
764 1         3 $source->subscribe($own_subscriber);
765              
766 1         2 return;
767 1         3 });
768 1     1   5 };
769             }
770              
771             sub op_exhaust_map {
772 0     0   0 my ($observable_factory) = @_;
773              
774             return sub {
775 0     0   0 my ($source) = @_;
776              
777 0         0 return $source->pipe(
778             op_map($observable_factory),
779             op_exhaust_all(),
780             );
781 0         0 };
782             }
783              
784             sub op_filter {
785 9     9   19 my ($filtering_sub) = @_;
786              
787             return sub {
788 9     9   15 my ($source) = @_;
789              
790             return rx_observable->new(sub {
791 9         16 my ($subscriber) = @_;
792              
793 9         28 my $own_subscriber = { %$subscriber };
794 9         17 my $idx = 0;
795             $own_subscriber->{next} = sub {
796 75         109 my ($value) = @_;
797 75         117 my $passes = eval {
798 75         101 local $_ = $value;
799 75         123 $filtering_sub->($value, $idx++);
800             };
801 75 50       283 if (my $error = $@) {
802 0         0 $subscriber->{error}->($error);
803             } else {
804 75 100 66     249 $subscriber->{next}->(@_) if $passes and defined $subscriber->{next};
805             }
806 9         30 };
807              
808 9         26 $source->subscribe($own_subscriber);
809              
810 9         21 return;
811 9         17 });
812 9         33 };
813             }
814              
815             sub op_finalize {
816 8     8   13 my ($fn) = @_;
817              
818             return sub {
819 8     8   11 my ($source) = @_;
820              
821             return rx_observable->new(sub {
822 8         11 my ($subscriber) = @_;
823              
824 8   100     20 my $arr = $subscriber->{_subscription}{_finalize_cbs} //= [];
825 8         13 unshift @$arr, $fn;
826 8         18 $subscriber->{_subscription}->add( $arr );
827              
828 8         19 $source->subscribe($subscriber);
829              
830 8         12 return;
831 8         15 });
832 8         33 };
833             }
834              
835             sub op_find {
836 2     2   4 my ($predicate) = @_;
837              
838             return sub {
839 2     2   5 my ($source) = @_;
840              
841 2 50       6 $predicate or return rx_throw_error('missing predicate in op_find');
842              
843 2         5 return $source->pipe(
844             op_first($predicate),
845             op_default_if_empty(undef),
846             );
847 2         9 };
848             }
849              
850             sub op_find_index {
851 2     2   3 my ($predicate) = @_;
852              
853             return sub {
854 2     2   3 my ($source) = @_;
855              
856 2 50       6 $predicate or return rx_throw_error('missing predicate in op_find_index');
857              
858             return rx_observable->new(sub {
859 2         4 my ($subscriber) = @_;
860              
861 2         3 my $idx = 0;
862             my $own_subscriber = {
863             %$subscriber,
864             next => sub {
865 13         19 my ($val) = @_;
866 13         14 my $truth;
867 13         40 my $ok = eval {
868 13         16 local $_ = $val;
869 13         24 $truth = $predicate->($val, $idx++);
870 13         38 1
871             };
872 13 50       21 if (!$ok) {
873 0 0       0 $subscriber->{error}->($@) if defined $subscriber->{error};
874             }
875 13 100       34 if ($truth) {
876 1 50       5 $subscriber->{next}->($idx - 1) if defined $subscriber->{next};
877 1 50       5 $subscriber->{complete}->() if defined $subscriber->{complete};
878             }
879             },
880             complete => sub {
881 1 50       5 $subscriber->{next}->(-1) if defined $subscriber->{next};
882 1 50       5 $subscriber->{complete}->() if defined $subscriber->{complete};
883             },
884 2         12 };
885              
886 2         6 $source->subscribe($own_subscriber);
887              
888 2         5 return;
889 2         5 });
890 2         9 };
891             }
892              
893             sub op_first {
894 7     7   14 my ($condition) = @_;
895              
896             return sub {
897 7     7   13 my ($source) = @_;
898              
899 7         17 my @pipes = (op_take(1));
900 7 100       21 unshift @pipes, op_filter($condition) if defined $condition;
901              
902 7         21 return $source->pipe(@pipes);
903 7         32 };
904             }
905              
906             sub op_group_by {
907 1     1   3 my ($key_fn) = @_;
908              
909             return sub {
910 1     1   3 my ($source) = @_;
911              
912             return rx_observable->new(sub {
913 1         3 my ($subscriber) = @_;
914              
915 1         3 my %observables;
916             my @observables;
917 1         0 my $stop_producing_observables;
918              
919             $subscriber->subscription->add(sub {
920 1         3 $stop_producing_observables = 1;
921 1         2 });
922              
923             my $own_subscriber = {
924             # %$subscriber,
925             next => sub {
926 5         9 my ($v) = @_;
927              
928 5         6 my $key = do { local $_ = $v; $key_fn->($v); };
  5         5  
  5         10  
929 5 50 66     26 $observables{$key} //= do {
930 2         6 my $new_obs = rx_subject->new;
931 2         4 push @observables, $new_obs;
932 2         14 $subscriber->{next}->($new_obs);
933 2         6 $new_obs;
934             } unless $stop_producing_observables;
935 5 50 33     24 $observables{$key}{next}->($v) if exists $observables{$key} and defined $observables{$key}{next};
936             },
937             error => sub {
938 0 0       0 $subscriber->{error}->() if defined $subscriber->{error};
939             },
940             complete => sub {
941 1         11 for my $val (@observables) {
942 2 50       15 $val->{complete}->() if defined $val->{complete};
943             }
944 1 50       11 $subscriber->{complete}->() if defined $subscriber->{complete};
945 1         3 undef @observables;
946 1         2 %observables = ();
947             },
948 1         7 };
949              
950 1         4 $source->subscribe($own_subscriber);
951              
952 1         2 return;
953 1         3 });
954 1         7 };
955             }
956              
957             sub op_ignore_elements {
958             return sub {
959 68     68   100 my ($source) = @_;
960              
961             return rx_observable->new(sub {
962 76         109 my ($subscriber) = @_;
963              
964 76         273 my %own_subscriber = %$subscriber;
965 76         141 delete $own_subscriber{next};
966              
967 76         184 $source->subscribe(\%own_subscriber);
968              
969 76         155 return;
970 68         111 });
971             }
972 68     68   241 }
973              
974             sub op_is_empty {
975             return sub {
976 2     2   3 my ($source) = @_;
977              
978 2         6 return $source->pipe(
979             op_first(),
980             op_map_to(0),
981             op_default_if_empty(1),
982             );
983 2     2   8 };
984             }
985              
986             sub op_last {
987 3     3   7 my ($predicate, $default) = @_;
988 3         5 my $has_default = @_ >= 2;
989              
990             return sub {
991 3     3   4 my ($source) = @_;
992              
993             return rx_observable->new(sub {
994 3         4 my ($subscriber) = @_;
995              
996 3         4 my $last_val;
997             my $last_val_obtained;
998              
999 3         4 my $idx = 0;
1000             my $own_subscriber = {
1001             %$subscriber,
1002             next => sub {
1003 5         10 my ($v) = @_;
1004              
1005 5 100       9 if ($predicate) {
1006 3         4 my $passes;
1007 3         3 my $ok = eval { local $_ = $v; $passes = $predicate->($v, $idx++); 1 };
  3         5  
  3         6  
  3         13  
1008 3 50       6 $ok or do {
1009 0 0       0 $subscriber->{error}->($@) if defined $subscriber->{error};
1010 0         0 return;
1011             };
1012 3 100       7 if ($passes) {
1013 1         2 $last_val = $v;
1014 1         3 $last_val_obtained = 1;
1015             }
1016             } else {
1017 2         3 $last_val = $v;
1018 2         5 $last_val_obtained = 1;
1019             }
1020             },
1021             complete => sub {
1022 3 100       8 if (! $last_val_obtained) {
1023 1 50       3 if ($has_default) {
1024 1 50       5 $subscriber->{next}->($default) if defined $subscriber->{next};
1025 1 50       5 $subscriber->{complete}->() if defined $subscriber->{complete};
1026             } else {
1027 0         0 $subscriber->{error}->("no last value found");
1028             }
1029             } else {
1030 2 50       9 $subscriber->{next}->($last_val) if defined $subscriber->{next};
1031 2 50       8 $subscriber->{complete}->() if defined $subscriber->{complete};
1032             }
1033             },
1034 3         19 };
1035              
1036 3         9 $source->subscribe($own_subscriber);
1037              
1038 3         6 return;
1039 3         7 });
1040 3         12 };
1041             }
1042              
1043             sub op_map {
1044 28     28   56 my ($mapping_sub) = @_;
1045              
1046             return sub {
1047 28     28   39 my ($source) = @_;
1048              
1049             return rx_observable->new(sub {
1050 19         31 my ($subscriber) = @_;
1051              
1052 19         58 my $own_subscriber = { %$subscriber };
1053 19         30 my $idx = 0;
1054             $own_subscriber->{next} = sub {
1055 96         134 my ($value) = @_;
1056 96         131 my $result = eval {
1057 96         153 local $_ = $value;
1058 96         187 $mapping_sub->($value, $idx++);
1059             };
1060 96 50       305 if (my $error = $@) {
1061 0 0       0 $subscriber->{error}->($error) if defined $subscriber->{error};
1062             } else {
1063 96 50       248 $subscriber->{next}->($result) if defined $subscriber->{next};
1064             }
1065 19         69 };
1066              
1067 19         52 $source->subscribe($own_subscriber);
1068              
1069 19         51 return;
1070 28         60 });
1071 28         109 };
1072             }
1073              
1074             sub op_map_to {
1075 2     2   4 my ($mapping_value) = @_;
1076              
1077             return sub {
1078 2     2   4 my ($source) = @_;
1079              
1080             return rx_observable->new(sub {
1081 2         4 my ($subscriber) = @_;
1082              
1083 2         5 my $own_subscriber = { %$subscriber };
1084             $own_subscriber->{next} &&= sub {
1085 1 50       5 $subscriber->{next}->($mapping_value) if defined $subscriber->{next};
1086 2   50     12 };
1087              
1088 2         5 $source->subscribe($own_subscriber);
1089              
1090 2         5 return;
1091 2         4 });
1092 2         8 };
1093             }
1094              
1095             sub op_max {
1096 0     0   0 my ($comparer) = @_;
1097              
1098             return sub {
1099 0     0   0 my ($source) = @_;
1100              
1101             return rx_observable->new(sub {
1102 0         0 my ($subscriber) = @_;
1103              
1104 0         0 my $curr_max;
1105             my $has_curr_max;
1106              
1107             my $own_subscriber = {
1108             %$subscriber,
1109             next => sub {
1110 0         0 my ($v) = @_;
1111              
1112 0 0       0 if (!$has_curr_max) {
1113 0         0 $curr_max = $v;
1114 0         0 $has_curr_max = 1;
1115             }
1116             else {
1117 0 0       0 if (!$comparer) {
1118 0 0       0 if ($v > $curr_max) {
1119 0         0 $curr_max = $v;
1120             }
1121             }
1122             else {
1123 0 0       0 if ($comparer->($v, $curr_max) > 0) {
1124 0         0 $curr_max = $v;
1125             }
1126             }
1127             }
1128             },
1129             complete => sub {
1130 0 0       0 if ($has_curr_max) {
1131 0 0       0 $subscriber->{next}->($curr_max) if defined $subscriber->{next};
1132             }
1133 0 0       0 $subscriber->{complete}->() if defined $subscriber->{complete};
1134             },
1135 0         0 };
1136              
1137 0         0 $source->subscribe($own_subscriber);
1138              
1139 0         0 return;
1140 0         0 });
1141 0         0 };
1142             }
1143              
1144             sub _op_merge_all_make_subscriber {
1145 13     13   22 my ($small_subscriptions, $subscriber, $stored_observables, $big_completed_ref) = @_;
1146              
1147 13         32 my $small_subscription = RxPerl::Subscription->new;
1148 13         31 $small_subscriptions->{$small_subscription} = $small_subscription;
1149             return {
1150             new_subscription => $small_subscription,
1151             next => sub {
1152 28 50   28   64 $subscriber->{next}->(@_) if defined $subscriber->{next};
1153             },
1154             error => sub {
1155 0 0   0   0 $subscriber->{error}->(@_) if defined $subscriber->{error};
1156             },
1157             complete => sub {
1158 11     11   27 delete $small_subscriptions->{$small_subscription};
1159 11 100 66     45 if (@$stored_observables) {
    100          
1160 8         12 my $new_obs = shift @$stored_observables;
1161 8         20 $new_obs->subscribe(
1162             _op_merge_all_make_subscriber(
1163             $small_subscriptions,
1164             $subscriber,
1165             $stored_observables,
1166             $big_completed_ref,
1167             ),
1168             );
1169             } elsif ($$big_completed_ref and !%$small_subscriptions) {
1170 1 50       4 $subscriber->{complete}->() if defined $subscriber->{complete};
1171             }
1172             },
1173 13         95 };
1174             }
1175              
1176             sub op_merge_all {
1177 4     4   10 my ($concurrent) = @_;
1178              
1179             return sub {
1180 4     4   12 my ($source) = @_;
1181              
1182             return rx_observable->new(sub {
1183 4         9 my ($subscriber) = @_;
1184              
1185 4         11 my @stored_observables;
1186             my %small_subscriptions;
1187 4         0 my $big_completed;
1188              
1189 4         9 my $own_subscription = RxPerl::Subscription->new;
1190 4         12 $subscriber->subscription->add(
1191             $own_subscription,
1192             \%small_subscriptions,
1193             );
1194              
1195             my $own_subscriber = {
1196             new_subscription => $own_subscription,
1197             next => sub {
1198 26         37 my ($new_observable) = @_;
1199              
1200 26         38 push @stored_observables, $new_observable;
1201 26 100 100     142 if (!defined $concurrent or keys(%small_subscriptions) < $concurrent) {
1202 5         11 my $new_obs = shift @stored_observables;
1203 5         19 $new_obs->subscribe(
1204             _op_merge_all_make_subscriber(
1205             \%small_subscriptions,
1206             $subscriber,
1207             \@stored_observables,
1208             \$big_completed,
1209             ),
1210             );
1211             }
1212             },
1213             error => sub {
1214 0 0       0 $subscriber->{error}->(@_) if defined $subscriber->{error};
1215             },
1216             complete => sub {
1217 2         4 $big_completed = 1;
1218 2 100 66     11 $subscriber->{complete}->() if !%small_subscriptions and defined $subscriber->{complete};
1219             },
1220 4         30 };
1221              
1222 4         14 $source->subscribe($own_subscriber);
1223              
1224 4         14 return;
1225 4         10 });
1226 4         19 };
1227             }
1228              
1229             sub op_merge_map {
1230 1     1   2 my ($observable_factory) = @_;
1231              
1232             return sub {
1233 1     1   2 my ($source) = @_;
1234              
1235 1         3 return $source->pipe(
1236             op_map($observable_factory),
1237             op_merge_all(),
1238             );
1239 1         5 };
1240             }
1241              
1242             sub op_merge_with {
1243 1     1   3 my @other_sources = @_;
1244              
1245             return sub {
1246 1     1   3 my ($source) = @_;
1247              
1248 1         3 return rx_merge(
1249             $source,
1250             @other_sources,
1251             );
1252 1         5 };
1253             }
1254              
1255             sub op_min {
1256 0     0   0 my ($comparer) = @_;
1257              
1258             return sub {
1259 0     0   0 my ($source) = @_;
1260              
1261             return rx_observable->new(sub {
1262 0         0 my ($subscriber) = @_;
1263              
1264 0         0 my $curr_min;
1265             my $has_curr_min;
1266              
1267             my $own_subscriber = {
1268             %$subscriber,
1269             next => sub {
1270 0         0 my ($v) = @_;
1271              
1272 0 0       0 if (!$has_curr_min) {
1273 0         0 $curr_min = $v;
1274 0         0 $has_curr_min = 1;
1275             }
1276             else {
1277 0 0       0 if (!$comparer) {
1278 0 0       0 if ($v < $curr_min) {
1279 0         0 $curr_min = $v;
1280             }
1281             }
1282             else {
1283 0 0       0 if ($comparer->($v, $curr_min) < 0) {
1284 0         0 $curr_min = $v;
1285             }
1286             }
1287             }
1288             },
1289             complete => sub {
1290 0 0       0 if ($has_curr_min) {
1291 0 0       0 $subscriber->{next}->($curr_min) if defined $subscriber->{next};
1292             }
1293 0 0       0 $subscriber->{complete}->() if defined $subscriber->{complete};
1294             },
1295 0         0 };
1296              
1297 0         0 $source->subscribe($own_subscriber);
1298              
1299 0         0 return;
1300 0         0 });
1301 0         0 };
1302             }
1303              
1304             sub op_multicast {
1305 0     0   0 my ($subject_factory) = @_;
1306              
1307             return sub {
1308 0     0   0 my ($source) = @_;
1309              
1310 0         0 return RxPerl::ConnectableObservable->new($source, $subject_factory);
1311 0         0 };
1312             }
1313              
1314             sub op_on_error_resume_next_with {
1315 1     1   4 my @other_sources = @_;
1316              
1317             return sub {
1318 1     1   3 my ($source) = @_;
1319              
1320 1         3 return rx_on_error_resume_next(
1321             $source,
1322             @other_sources,
1323             );
1324 1         5 };
1325             }
1326              
1327             sub op_pairwise {
1328             return sub {
1329 0     0   0 my ($source) = @_;
1330              
1331             return rx_observable->new(sub {
1332 0         0 my ($subscriber) = @_;
1333              
1334 0         0 my $prev_value;
1335 0         0 my $have_prev_value = 0;
1336              
1337             my $own_subscriber = {
1338             %$subscriber,
1339             (
1340             next => sub {
1341 0         0 my ($value) = @_;
1342              
1343 0 0       0 if ($have_prev_value) {
1344 0 0       0 $subscriber->{next}->([$prev_value, $value]) if defined $subscriber->{next};
1345             } else {
1346 0         0 $have_prev_value = 1;
1347             }
1348              
1349 0         0 $prev_value = $value;
1350             }
1351             ) x!! defined $subscriber->{next},
1352 0         0 };
1353              
1354 0         0 $source->subscribe($own_subscriber);
1355              
1356 0         0 return;
1357 0         0 });
1358 0     0   0 };
1359             }
1360              
1361             sub op_pluck {
1362 1     1   3 my (@keys) = @_;
1363              
1364 1 50       4 croak 'List of properties cannot be empty,' unless @keys;
1365              
1366             return sub {
1367 1     1   2 my ($source) = @_;
1368              
1369             return rx_observable->new(sub {
1370 1         2 my ($subscriber) = @_;
1371              
1372             my $own_subscriber = {
1373             %$subscriber,
1374             next => sub {
1375 5         8 my (@value) = @_;
1376              
1377 5 50       9 if (! @value) {
1378 0 0       0 $subscriber->{next}->() if defined $subscriber->{next};
1379 0         0 return;
1380             }
1381              
1382 5         8 my $cursor = $value[0];
1383 5         8 foreach my $key (@keys) {
1384 7 100 100     42 if ((reftype($cursor) // '') eq 'HASH' and exists $cursor->{$key}) {
      100        
1385 4         8 $cursor = $cursor->{$key};
1386             } else {
1387 3 50       11 $subscriber->{next}->(undef) if defined $subscriber->{next};
1388 3         9 return;
1389             }
1390             }
1391              
1392 2 50       7 $subscriber->{next}->($cursor) if defined $subscriber->{next};
1393             },
1394 1         6 };
1395              
1396 1         4 $source->subscribe($own_subscriber);
1397 1         3 });
1398 1         5 };
1399             }
1400              
1401             sub op_race_with {
1402 1     1   4 my @other_sources = @_;
1403              
1404             return sub {
1405 1     1   2 my ($source) = @_;
1406              
1407 1         3 return rx_race(
1408             $source,
1409             @other_sources,
1410             );
1411 1         5 };
1412             }
1413              
1414             sub op_reduce {
1415 4     4   20 my ($accumulator, @seed) = @_;
1416              
1417             return sub {
1418 4     4   7 my ($source) = @_;
1419              
1420             return rx_observable->new(sub {
1421 4         7 my ($subscriber) = @_;
1422              
1423 4         8 my $got_first = @seed;
1424 4         5 my $acc;
1425 4 100       11 $acc = $seed[0] if $got_first;
1426              
1427 4         6 my $idx = 0;
1428             my $own_subscriber = {
1429             %$subscriber,
1430             next => sub {
1431 15         22 my ($v) = @_;
1432              
1433 15 100       30 if ($got_first) {
1434 14         17 my $ok = eval { $acc = $accumulator->($acc, $v, $idx++); 1 };
  14         32  
  14         46  
1435 14 50 33     46 $ok or $subscriber->{error}->($@) if defined $subscriber->{error};
1436             } else {
1437 1         2 $acc = $v;
1438 1         3 $got_first = 1;
1439             }
1440             },
1441             complete => sub {
1442 4 50 33     26 $subscriber->{next}->($acc) if $got_first and defined $subscriber->{next};
1443 4 50       13 $subscriber->{complete}->() if defined $subscriber->{complete};
1444             },
1445 4         26 };
1446              
1447 4         17 $source->subscribe($own_subscriber);
1448              
1449 4         10 return;
1450 4         7 });
1451 4         20 };
1452             }
1453              
1454             sub op_ref_count {
1455             return sub {
1456 0     0   0 my ($source) = @_;
1457              
1458 0 0       0 croak 'op_ref_count() was not applied to a connectable observable'
1459             unless $source->isa('RxPerl::ConnectableObservable');
1460              
1461 0         0 my $count = 0;
1462              
1463 0         0 my $connection_subscription;
1464             my $typical_unsubscription_fn = sub {
1465 0 0       0 if (--$count == 0) {
1466 0         0 $connection_subscription->unsubscribe;
1467             }
1468 0         0 };
1469              
1470             return rx_observable->new(sub {
1471 0         0 my ($subscriber) = @_;
1472              
1473 0         0 my $count_was = $count++;
1474              
1475 0 0       0 if ($count_was == 0) {
1476 0         0 $connection_subscription = RxPerl::Subscription->new;
1477              
1478 0         0 $subscriber->subscription->add($typical_unsubscription_fn);
1479 0         0 $source->subscribe($subscriber);
1480              
1481 0         0 $connection_subscription = $source->connect;
1482             } else {
1483 0         0 $subscriber->subscription->add($typical_unsubscription_fn);
1484 0         0 $source->subscribe($subscriber);
1485             }
1486              
1487 0         0 return;
1488 0         0 });
1489 0     0   0 };
1490             }
1491              
1492             sub _op_repeat_helper {
1493 8     8   16 my ($subscriber, $source, $count_ref, $own_subscription_ref) = @_;
1494              
1495 8         17 my $own_subscription = RxPerl::Subscription->new;
1496 8         10 $$own_subscription_ref = $own_subscription;
1497             my $own_subscriber = {
1498             new_subscription => $own_subscription,
1499             next => $subscriber->{next},
1500             error => $subscriber->{error},
1501             complete => sub {
1502 6 100   6   11 if (--$$count_ref) {
1503 5         10 _op_repeat_helper(
1504             $subscriber, $source, $count_ref, $own_subscription_ref,
1505             );
1506             } else {
1507 1 50       4 $subscriber->{complete}->() if defined $subscriber->{complete};
1508             }
1509             },
1510 8         33 };
1511              
1512 8         32 $source->subscribe($own_subscriber);
1513             }
1514              
1515             sub op_repeat {
1516 3     3   6 my ($count) = @_;
1517              
1518             return sub {
1519 3     3   6 my ($source) = @_;
1520              
1521             return rx_observable->new(sub {
1522 3         5 my ($subscriber) = @_;
1523              
1524 3         4 my $count = $count;
1525              
1526 3 50       7 $count = -1 if ! defined $count;
1527 3 50       7 if ($count == 0) {
1528 0 0       0 $subscriber->{complete}->() if defined $subscriber->{complete};
1529 0         0 return;
1530             }
1531              
1532 3         5 my $own_subscription;
1533 3         3 my $own_subscription_ref = \$own_subscription;
1534              
1535 3         8 $subscriber->subscription->add(
1536             $own_subscription_ref,
1537             );
1538              
1539 3         12 _op_repeat_helper(
1540             $subscriber, $source, \$count, $own_subscription_ref,
1541             );
1542              
1543 3         6 return;
1544 3         6 });
1545 3         14 };
1546             }
1547              
1548             sub _op_retry_helper {
1549 8     8   15 my ($subscriber, $source, $count_ref, $own_subscription_ref) = @_;
1550              
1551 8         16 my $own_subscription = RxPerl::Subscription->new;
1552 8         14 $$own_subscription_ref = $own_subscription;
1553             my $own_subscriber = {
1554             new_subscription => $own_subscription,
1555             next => $subscriber->{next},
1556             error => sub {
1557 7 100   7   15 if ($$count_ref--) {
1558 6         14 _op_retry_helper(
1559             $subscriber, $source, $count_ref, $own_subscription_ref,
1560             );
1561             } else {
1562 1 50       6 $subscriber->{error}->(@_) if defined $subscriber->{error};
1563             }
1564             },
1565             complete => $subscriber->{complete},
1566 8         30 };
1567              
1568 8         17 $source->subscribe($own_subscriber);
1569             }
1570              
1571             sub op_retry {
1572 2     2   5 my ($count) = @_;
1573              
1574             return sub {
1575 2     2   2 my ($source) = @_;
1576              
1577             return rx_observable->new(sub {
1578 2         4 my ($subscriber) = @_;
1579              
1580 2         3 my $count = $count;
1581              
1582 2 50       6 $count = -1 if ! defined $count;
1583              
1584 2         2 my $own_subscription;
1585 2         4 my $own_subscription_ref = \$own_subscription;
1586              
1587 2         6 $subscriber->subscription->add(
1588             $own_subscription_ref,
1589             );
1590              
1591 2         8 _op_retry_helper(
1592             $subscriber, $source, \$count, $own_subscription_ref,
1593             );
1594              
1595 2         4 return;
1596 2         6 });
1597 2         11 };
1598             }
1599              
1600             sub op_sample {
1601 0     0   0 my ($notifier) = @_;
1602              
1603             return sub {
1604 0     0   0 my ($source) = @_;
1605              
1606             return rx_observable->new(sub {
1607 0         0 my ($subscriber) = @_;
1608              
1609 0         0 my $last_val;
1610             my $has_last_val;
1611              
1612 0         0 my $notifier_subscription = RxPerl::Subscription->new;
1613             my $notifier_subscriber = {
1614             new_subscription => $notifier_subscription,
1615             next => sub {
1616 0 0       0 if ($has_last_val) {
1617 0 0       0 $subscriber->{next}->($last_val) if defined $subscriber->{next};
1618 0         0 undef $has_last_val;
1619             }
1620             },
1621             error => sub {
1622 0 0       0 $subscriber->{error}->(@_) if defined $subscriber->{error};
1623             },
1624 0         0 };
1625              
1626 0         0 $subscriber->subscription->add($notifier_subscription);
1627              
1628             my $own_subscriber = {
1629             %$subscriber,
1630             next => sub {
1631 0         0 my ($v) = @_;
1632              
1633 0         0 $last_val = $v;
1634 0         0 $has_last_val = 1;
1635             },
1636 0         0 };
1637              
1638 0         0 $notifier->subscribe($notifier_subscriber);
1639 0         0 $source->subscribe($own_subscriber);
1640              
1641 0         0 return;
1642 0         0 });
1643 0         0 };
1644             }
1645              
1646             sub op_sample_time {
1647 0     0   0 my ($period) = @_;
1648              
1649 0         0 return op_sample(rx_interval($period));
1650             }
1651              
1652             sub op_scan {
1653 0     0   0 my ($accumulator_function, $seed) = @_;
1654 0         0 my $has_seed = @_ >= 2;
1655              
1656             return sub {
1657 0     0   0 my ($source) = @_;
1658              
1659             return rx_observable->new(sub {
1660 0         0 my ($subscriber) = @_;
1661              
1662 0         0 my $has_seed = $has_seed;
1663              
1664 0 0       0 my $acc; $acc = $seed if $has_seed;
  0         0  
1665 0         0 my $index = -1;
1666             my $own_subscriber = {
1667             %$subscriber,
1668             (
1669             next => sub {
1670 0         0 my ($value) = @_;
1671              
1672 0 0       0 if (! $has_seed) {
1673 0         0 $acc = $value;
1674 0         0 $has_seed = 1;
1675             } else {
1676 0         0 ++$index;
1677 0         0 $acc = $accumulator_function->($acc, $value, $index);
1678             }
1679              
1680 0 0       0 $subscriber->{next}->($acc) if defined $subscriber->{next};
1681             },
1682             ) x!! defined $subscriber->{next},
1683 0         0 };
1684              
1685 0         0 $source->subscribe($own_subscriber);
1686              
1687 0         0 return;
1688 0         0 });
1689 0         0 };
1690             }
1691              
1692             sub op_share {
1693             return (
1694 0     0   0 op_multicast(sub { rx_subject->new }),
  0     0   0  
1695             op_ref_count(),
1696             );
1697             }
1698              
1699             sub op_single {
1700 2     2   5 my ($predicate) = @_;
1701              
1702             return sub {
1703 2     2   4 my ($source) = @_;
1704              
1705             return rx_observable->new(sub {
1706 2         3 my ($subscriber) = @_;
1707              
1708 2         3 my @found;
1709              
1710 2         3 my $idx = 0;
1711             my $own_subscriber = {
1712             %$subscriber,
1713             next => sub {
1714 3         5 my ($v) = @_;
1715              
1716 3 100       8 if (!$predicate) {
1717 1         3 push @found, $v;
1718             } else {
1719 2         3 my $found;
1720 2         2 my $ok = eval { local $_ = $v; $found = $predicate->($v, $idx++); 1 };
  2         5  
  2         5  
  2         10  
1721 2 50       5 if (! $ok) {
1722 0 0       0 $subscriber->{error}->($@) if defined $subscriber->{error};
1723 0         0 return;
1724             }
1725 2 100       5 push @found, $v if $found;
1726             }
1727              
1728 3 50 33     63 $subscriber->{error}->('Too many values match') if @found > 1 and defined $subscriber->{error};
1729             },
1730             complete => sub {
1731 2 50       5 if (! @found) {
1732 0 0       0 $subscriber->{error}->('No values match') if defined $subscriber->{error};
1733             } else {
1734 2 50       8 $subscriber->{next}->($found[0]) if defined $subscriber->{next};
1735 2 50       6 $subscriber->{complete}->() if defined $subscriber->{complete};
1736             }
1737             },
1738 2         11 };
1739              
1740 2         8 $source->subscribe($own_subscriber);
1741              
1742 2         4 return;
1743 2         4 });
1744 2         8 };
1745             }
1746              
1747             sub op_skip {
1748 1     1   3 my ($count) = @_;
1749              
1750             return sub {
1751 1     1   2 my ($source) = @_;
1752              
1753             return rx_observable->new(sub {
1754 1         3 my ($subscriber) = @_;
1755              
1756 1         2 my $count = int $count;
1757              
1758 1         2 my $own_subscriber;
1759 1 50       13 $own_subscriber = $subscriber if $count <= 0;
1760             $own_subscriber //= {
1761             %$subscriber,
1762             next => sub {
1763 5 100       13 if ($count-- <= 0) {
1764 2 50       9 $subscriber->{next}->(@_) if defined $subscriber->{next};
1765             }
1766             },
1767 1   50     11 };
1768              
1769 1         4 $source->subscribe($own_subscriber);
1770              
1771 1         5 return;
1772 1         4 });
1773 1         6 };
1774             }
1775              
1776             sub op_skip_last {
1777 1     1   2 my ($skip_count) = @_;
1778              
1779             return sub {
1780 1     1   3 my ($source) = @_;
1781              
1782             return rx_observable->new(sub {
1783 1         3 my ($subscriber) = @_;
1784              
1785 1         2 my @skipped;
1786 1         3 $subscriber->subscription->add(sub { undef @skipped });
  1         4  
1787              
1788 1         5 my $own_subscriber = { %$subscriber };
1789             $own_subscriber->{next} &&= sub {
1790 4         7 my ($v) = @_;
1791              
1792 4         7 push @skipped, $v;
1793 4 100       12 if (@skipped > $skip_count) {
1794 2         4 my $new_v = shift @skipped;
1795 2 50       9 $subscriber->{next}->($new_v) if defined $subscriber->{next};
1796             }
1797 1   50     10 };
1798              
1799 1         3 $source->subscribe($own_subscriber);
1800              
1801 1         2 return;
1802 1         3 });
1803 1         6 };
1804             }
1805              
1806             sub op_skip_until {
1807 6     6   11 my ($notifier) = @_;
1808              
1809             # FUTURE TODO: allow notifier to be a promise
1810 6 50       14 croak q"You provided 'undef' where a stream was expected. You can provide an observable."
1811             unless defined $notifier;
1812 6 50 33     41 croak q"The notifier of 'op_skip_until' needs to be an observable."
1813             unless blessed $notifier and $notifier->isa('RxPerl::Observable');
1814              
1815             return sub {
1816 6     6   12 my ($source) = @_;
1817              
1818             return rx_observable->new(sub {
1819 6         8 my ($subscriber) = @_;
1820              
1821 6         6 my $notifier_has_emitted;
1822             my $n_s = $notifier->pipe(
1823             op_take(1),
1824             )->subscribe(
1825             sub {
1826 3         6 $notifier_has_emitted = 1;
1827             },
1828             sub {
1829 1 50       5 $subscriber->{error}->(@_) if defined $subscriber->{error};
1830             },
1831 6         16 );
1832              
1833             my $own_subscriber = {
1834             %$subscriber,
1835             next => sub {
1836             $subscriber->{next}->(@_) if defined $subscriber->{next}
1837 32 100 66     112 and $notifier_has_emitted;
1838             },
1839 6         40 };
1840              
1841 6         19 $source->subscribe($own_subscriber);
1842              
1843 6         16 return $n_s;
1844 6         10 });
1845 6         26 };
1846             }
1847              
1848             sub op_skip_while {
1849 1     1   3 my ($predicate) = @_;
1850              
1851             return sub {
1852 1     1   2 my ($source) = @_;
1853              
1854             return rx_observable->new(sub {
1855 1         3 my ($subscriber) = @_;
1856              
1857 1         3 my $finished_skipping = 0;
1858              
1859 1         1 my $idx = 0;
1860             my $own_subscriber = {
1861             %$subscriber,
1862             next => sub {
1863 6         8 my ($v) = @_;
1864              
1865 6         7 my $should_display;
1866 6 100       11 if ($finished_skipping) {
1867 3         4 $should_display = 1;
1868             } else {
1869 3         4 my $satisfies_predicate;
1870 3         4 my $ok = eval { local $_ = $v; $satisfies_predicate = $predicate->($v, $idx++); 1 };
  3         4  
  3         8  
  3         19  
1871 3 50       9 $ok or do {
1872 0 0       0 $subscriber->{error}->($@) if defined $subscriber->{error};
1873 0         0 return;
1874             };
1875 3 100       5 if (! $satisfies_predicate) {
1876 1         2 $finished_skipping = 1;
1877 1         2 $should_display = 1;
1878             }
1879             }
1880              
1881 6 100 66     24 $subscriber->{next}->(@_) if $should_display and defined $subscriber->{next};
1882             }
1883 1         6 };
1884              
1885 1         4 $source->subscribe($own_subscriber);
1886              
1887 1         9 return;
1888 1         3 });
1889 1         5 };
1890             }
1891              
1892             sub op_start_with {
1893 3     3   7 my (@values) = @_;
1894              
1895             return sub {
1896 3     3   5 my ($source) = @_;
1897              
1898 3         22 return rx_concat(
1899             rx_of(@values),
1900             $source,
1901             );
1902 3         16 };
1903             }
1904              
1905             sub op_switch_all {
1906             return sub {
1907 1     1   4 my ($source) = @_;
1908              
1909             return rx_observable->new(sub {
1910 1         2 my ($subscriber) = @_;
1911              
1912 1         4 my $old_subscription;
1913              
1914             my $chief_is_complete;
1915 1         0 my $sub_is_complete;
1916              
1917             my $obs_subscriber = {
1918             next => sub {
1919 6 50       16 $subscriber->{next}->(@_) if defined $subscriber->{next};
1920             },
1921             error => sub {
1922 0 0       0 $subscriber->{error}->(@_) if defined $subscriber->{error};
1923             },
1924             complete => sub {
1925 0         0 $sub_is_complete = 1;
1926 0 0 0     0 $subscriber->{complete}->() if $chief_is_complete and defined $subscriber->{complete};
1927             },
1928 1         6 };
1929              
1930 1         4 my $own_subscription = RxPerl::Subscription->new;
1931 1         21 $subscriber->subscription->add(\$old_subscription, $own_subscription);
1932              
1933             my $own_subscriber = {
1934             new_subscription => $own_subscription,
1935             next => sub {
1936 3         5 my ($new_observable) = @_;
1937              
1938 3         4 $sub_is_complete = 0;
1939 3 100       11 $old_subscription->unsubscribe() if $old_subscription;
1940 3         7 $old_subscription = $new_observable->subscribe($obs_subscriber);
1941             },
1942             error => sub {
1943 0 0       0 $subscriber->{error}->(@_) if defined $subscriber->{error};
1944             },
1945             complete => sub {
1946 1         3 $chief_is_complete = 1;
1947 1 50 33     4 $subscriber->{complete}->() if $sub_is_complete and defined $subscriber->{complete};
1948             },
1949 1         8 };
1950              
1951 1         5 $source->subscribe($own_subscriber);
1952              
1953 1         2 return;
1954 1         3 });
1955 1     1   4 };
1956             }
1957              
1958             sub op_switch_map {
1959 0     0   0 my ($observable_factory) = @_;
1960              
1961             return sub {
1962 0     0   0 my ($source) = @_;
1963              
1964 0         0 return $source->pipe(
1965             op_map($observable_factory),
1966             op_switch_all(),
1967             );
1968             }
1969 0         0 }
1970              
1971             sub op_take {
1972 87     87   138 my ($count) = @_;
1973              
1974 87 50       205 croak 'negative argument passed to op_take' unless $count >= 0;
1975              
1976             return sub {
1977 87     87   129 my ($source) = @_;
1978              
1979             return rx_observable->new(sub {
1980 71         100 my ($subscriber) = @_;
1981              
1982 71         108 my $remaining = int $count;
1983              
1984 71 100       186 if ($remaining == 0) {
1985 1 50       6 $subscriber->{complete}->() if defined $subscriber->{complete};
1986 1         2 return;
1987             }
1988              
1989             my $own_subscriber = {
1990             %$subscriber,
1991             next => sub {
1992 192 100       517 $subscriber->{next}->(@_) if defined $subscriber->{next};
1993 192 100 100     727 $subscriber->{complete}->() if --$remaining == 0 and defined $subscriber->{complete};
1994             },
1995 70         304 };
1996              
1997 70         206 $source->subscribe($own_subscriber);
1998              
1999 70         172 return;
2000 87         145 });
2001 87         338 };
2002             }
2003              
2004             sub op_take_last {
2005 2     2   5 my ($count) = @_;
2006              
2007             return sub {
2008 2     2   4 my ($source) = @_;
2009              
2010             return rx_observable->new(sub {
2011 2         3 my ($subscriber) = @_;
2012              
2013 2         4 my @last_values;
2014              
2015             my $own_subscriber = {
2016             %$subscriber,
2017             next => sub {
2018 7         10 my ($v) = @_;
2019              
2020 7         10 push @last_values, $v;
2021 7 100       17 if (@last_values > $count) {
2022 2         5 shift @last_values;
2023             }
2024             },
2025             complete => sub {
2026 2         4 foreach my $last_val (@last_values) {
2027 5 50       13 $subscriber->{next}->($last_val) if defined $subscriber->{next};
2028             }
2029 2 50       6 $subscriber->{complete}->() if defined $subscriber->{complete};
2030             },
2031 2         11 };
2032              
2033 2         7 $source->subscribe($own_subscriber);
2034              
2035 2         13 return;
2036 2         4 });
2037             }
2038 2         7 }
2039              
2040             sub op_take_until {
2041 1     1   6 my ($notifier_observable) = @_;
2042              
2043             return sub {
2044 1     1   2 my ($source) = @_;
2045              
2046             return rx_observable->new(sub {
2047 1         3 my ($subscriber) = @_;
2048              
2049             my $n_s = $notifier_observable->subscribe(
2050             sub {
2051 1 50       7 $subscriber->{complete}->() if defined $subscriber->{complete};
2052             },
2053             sub {
2054 0 0       0 $subscriber->{error}->(@_) if defined $subscriber->{error};
2055             },
2056 1         7 );
2057              
2058 1         4 $source->subscribe($subscriber);
2059              
2060 1         3 return $n_s;
2061 1         3 });
2062 1         5 };
2063             }
2064              
2065             sub op_take_while {
2066 2     2   5 my ($cond, $include) = @_;
2067              
2068             return sub {
2069 2     2   3 my ($source) = @_;
2070              
2071             return rx_observable->new(sub {
2072 2         4 my ($subscriber) = @_;
2073              
2074 2         4 my $i = 0;
2075             my $own_subscriber = {
2076             %$subscriber,
2077             next => sub {
2078 6         10 my ($value) = @_;
2079              
2080 6 100       8 if (! do { local $_ = $value; $cond->($value, $i++) }) {
  6         8  
  6         13  
2081 2 100 66     15 $subscriber->{next}->($value) if $include and defined $subscriber->{next};
2082 2 50       7 $subscriber->{complete}->() if defined $subscriber->{complete};
2083 2         11 return;
2084             }
2085              
2086 4 50       25 $subscriber->{next}->(@_) if defined $subscriber->{next};
2087             },
2088 2         10 };
2089              
2090 2         7 $source->subscribe($own_subscriber);
2091              
2092 2         6 return;
2093 2         6 });
2094 2         9 };
2095             }
2096              
2097             sub op_tap {
2098 0     0   0 my @args = @_;
2099              
2100             return sub {
2101 0     0   0 my ($source) = @_;
2102              
2103             return rx_observable->new(sub {
2104 0         0 my ($subscriber) = @_;
2105              
2106 0         0 my @args = @args;
2107 0 0 0     0 my $tap_subscriber; $tap_subscriber = $args[0] if (reftype($args[0]) // '') eq 'HASH';
  0         0  
2108             $tap_subscriber //= {
2109 0   0     0 map {($_, shift @args)} qw/ next error complete /
  0         0  
2110             };
2111              
2112 0         0 my %own_keys = map {$_ => 1} grep { /^(next|error|complete)\z/ } (keys(%$tap_subscriber), keys(%$subscriber));
  0         0  
  0         0  
2113              
2114             my $own_subscriber = {
2115             %$subscriber,
2116             map {
2117 0         0 my $key = $_;
  0         0  
2118             ($key => sub {
2119 0 0       0 $tap_subscriber->{$key}->(@_) if defined $tap_subscriber->{$key};
2120 0 0       0 $subscriber->{$key}->(@_) if defined $subscriber->{$key};
2121 0         0 });
2122             } keys %own_keys
2123             };
2124              
2125 0         0 $source->subscribe($own_subscriber);
2126              
2127 0         0 return;
2128 0         0 });
2129 0         0 };
2130             }
2131              
2132             sub op_throttle {
2133 0     0   0 my ($duration_selector) = @_;
2134              
2135             return sub {
2136 0     0   0 my ($source) = @_;
2137              
2138             return rx_observable->new(sub {
2139 0         0 my ($subscriber) = @_;
2140              
2141 0         0 my $mini_subscription;
2142              
2143             my $mini_subscriber = {
2144             error => sub {
2145 0 0       0 $subscriber->{error}->(@_) if defined $subscriber->{error};
2146             },
2147             complete => sub {
2148 0         0 undef $mini_subscription;
2149             },
2150 0         0 };
2151              
2152             my $own_subscriber = {
2153             %$subscriber,
2154             next => sub {
2155 0         0 my ($v) = @_;
2156              
2157 0 0       0 if (! $mini_subscription) {
2158 0 0       0 $subscriber->{next}->(@_) if defined $subscriber->{next};
2159 0         0 $mini_subscription = do { local $_ = $v; $duration_selector->($v) }->pipe(
  0         0  
  0         0  
2160             op_take(1),
2161             )->subscribe($mini_subscriber);
2162             }
2163             },
2164 0         0 };
2165              
2166 0         0 $subscriber->subscription->add(\$mini_subscription);
2167              
2168 0         0 $source->subscribe($own_subscriber);
2169              
2170 0         0 return;
2171 0         0 });
2172 0         0 };
2173             }
2174              
2175             sub op_throttle_time {
2176 0     0   0 my ($duration) = @_;
2177              
2178 0     0   0 return op_throttle(sub { rx_timer($duration) });
  0         0  
2179             }
2180              
2181             sub op_throw_if_empty {
2182 0     0   0 my ($error_factory) = @_;
2183              
2184             return sub {
2185 0     0   0 my ($source) = @_;
2186              
2187             return rx_observable->new(sub {
2188 0         0 my ($subscriber) = @_;
2189              
2190 0         0 my $is_empty = 1;
2191              
2192             my $own_subscriber = {
2193             %$subscriber,
2194             next => sub {
2195 0         0 $is_empty = 0;
2196 0 0       0 $subscriber->{next}->(@_) if defined $subscriber->{next};
2197             },
2198             complete => sub {
2199 0 0       0 if ($is_empty) {
2200 0 0       0 $subscriber->{error}->($error_factory->()) if defined $subscriber->{error};
2201             } else {
2202 0 0       0 $subscriber->{complete}->() if defined $subscriber->{complete};
2203             }
2204             },
2205 0         0 };
2206              
2207 0         0 $source->subscribe($own_subscriber);
2208              
2209 0         0 return;
2210 0         0 });
2211 0         0 };
2212             }
2213              
2214             sub op_time_interval {
2215             return sub {
2216 0     0   0 my ($source) = @_;
2217              
2218 0         0 my $t0 = Time::HiRes::time();
2219              
2220             return $source->pipe(
2221             op_map(sub {
2222 0         0 my $now = Time::HiRes::time();
2223 0         0 my $interval = $now - $t0;
2224 0         0 $t0 = $now;
2225 0         0 return { value => $_, interval => $interval };
2226 0         0 }),
2227             );
2228 0     0   0 };
2229             }
2230              
2231             sub op_timeout {
2232 0     0   0 my ($duration) = @_;
2233              
2234             return sub {
2235 0     0   0 my ($source) = @_;
2236              
2237             return rx_observable->new(sub {
2238 0         0 my ($subscriber) = @_;
2239              
2240 0         0 my $subject = rx_behavior_subject->new(1);
2241             my $s_s = $subject->pipe(
2242 0         0 op_switch_map(sub { rx_timer($duration) }),
2243             )->subscribe(sub {
2244 0 0       0 $subscriber->{error}->('Timeout has occurred') if defined $subscriber->{error};
2245 0         0 });
2246              
2247 0         0 my $own_subscription = RxPerl::Subscription->new;
2248 0         0 $subscriber->subscription->add($own_subscription, $s_s);
2249              
2250             my $own_subscriber = {
2251             new_subscription => $own_subscription,
2252             next => sub {
2253 0 0       0 $subject->{next}->(1) if defined $subject->{next};
2254 0 0       0 $subscriber->{next}->(@_) if defined $subscriber->{next};
2255             },
2256             error => sub {
2257 0 0       0 $subscriber->{error}->(@_) if defined $subscriber->{error};
2258             },
2259             complete => sub {
2260 0 0       0 $subscriber->{complete}->() if defined $subscriber->{complete};
2261             }
2262 0         0 };
2263              
2264 0         0 $source->subscribe($own_subscriber);
2265              
2266 0         0 return;
2267 0         0 });
2268 0         0 };
2269             }
2270              
2271             sub op_timestamp {
2272             return op_map(sub {
2273             return {
2274 0     0   0 value => $_,
2275             timestamp => Time::HiRes::time(),
2276             };
2277 0     0   0 });
2278             }
2279              
2280             sub op_to_array {
2281             return sub {
2282 0     0   0 my ($source) = @_;
2283              
2284             return rx_observable->new(sub {
2285 0         0 my ($subscriber) = @_;
2286              
2287 0         0 my @values;
2288              
2289             my $own_subscriber = {
2290             %$subscriber,
2291             next => sub {
2292 0         0 my ($v) = @_;
2293 0         0 push @values, $v;
2294             },
2295             complete => sub {
2296 0 0       0 $subscriber->{next}->(\@values) if defined $subscriber->{next};
2297 0 0       0 $subscriber->{complete}->() if defined $subscriber->{complete};
2298             },
2299 0         0 };
2300              
2301 0         0 $source->subscribe($own_subscriber);
2302              
2303 0         0 return;
2304 0         0 });
2305 0     0   0 };
2306             }
2307              
2308             sub op_with_latest_from {
2309 1     1   6 my (@other_observables) = @_;
2310              
2311             return sub {
2312 1     1   3 my ($source) = @_;
2313              
2314             return rx_observable->new(sub {
2315 1         3 my ($subscriber) = @_;
2316              
2317 1         2 my @other_observables = @other_observables;
2318 1         3 my $i = 0;
2319 1         2 my %didnt_emit = map {($i++, 1)} @other_observables;
  1         4  
2320 1         3 my @latest_values;
2321             my %other_subscriptions;
2322              
2323             $subscriber->subscription->add(
2324 1         3 \%other_subscriptions, sub { undef @other_observables },
2325 1         3 );
2326              
2327 1         4 for (my $i = 0; $i < @other_observables; $i++) {
2328 1         2 my $j = $i;
2329 1         1 my $other_observable = $other_observables[$j];
2330              
2331 1         3 my $other_subscription = RxPerl::Subscription->new;
2332 1         2 $other_subscriptions{$other_subscription} = $other_subscription;
2333             $other_observable->subscribe({
2334             new_subscription => $other_subscription,
2335             next => sub {
2336 4         7 my ($value) = @_;
2337              
2338 4         5 $latest_values[$j] = $value;
2339 4         9 delete $didnt_emit{$j};
2340             },
2341             error => $subscriber->{error},
2342 1         7 });
2343             }
2344              
2345             $source->subscribe({
2346             %$subscriber,
2347             next => sub {
2348 5         6 my ($value) = @_;
2349              
2350 5 100       12 if (! %didnt_emit) {
2351 4 50       13 $subscriber->{next}->([$value, @latest_values]) if defined $subscriber->{next};
2352             }
2353             },
2354 1         9 });
2355              
2356 1         4 return;
2357 1         3 });
2358 1         5 };
2359             }
2360              
2361             sub op_zip_with {
2362 1     1   3 my @other_sources = @_;
2363              
2364             return sub {
2365 1     1   2 my ($source) = @_;
2366              
2367 1         4 return rx_zip(
2368             $source,
2369             @other_sources,
2370             );
2371 1         7 };
2372             }
2373              
2374             sub _eqq {
2375 20     20   29 my ($x, $y) = @_;
2376              
2377 20 100       33 defined $x or return !defined $y;
2378 17 100       32 defined $y or return !!0;
2379 15 100       24 ref $x eq ref $y or return !!0;
2380 14 100       54 return length(ref $x) ? refaddr $x == refaddr $y : $x eq $y;
2381             }
2382              
2383             1;