File Coverage

blib/lib/RxPerl/Operators/Pipeable.pm
Criterion Covered Total %
statement 780 1082 72.0
branch 199 436 45.6
condition 48 88 54.5
subroutine 116 166 69.8
pod n/a
total 1143 1772 64.5


line stmt bran cond sub pod time code
1             package RxPerl::Operators::Pipeable;
2 5     5   35 use strict;
  5         12  
  5         171  
3 5     5   29 use warnings;
  5         8  
  5         194  
4              
5 5         458 use RxPerl::Operators::Creation qw/
6             rx_observable rx_subject rx_concat rx_of rx_interval rx_combine_latest rx_concat
7             rx_throw_error rx_zip rx_merge rx_on_error_resume_next rx_race rx_timer
8             rx_behavior_subject
9 5     5   27 /;
  5         10  
10 5     5   2070 use RxPerl::ConnectableObservable;
  5         11  
  5         146  
11 5     5   40 use RxPerl::Utils qw/ get_timer_subs /;
  5         10  
  5         256  
12 5     5   30 use RxPerl::Subscription;
  5         11  
  5         112  
13              
14 5     5   23 use Carp 'croak';
  5         11  
  5         206  
15 5     5   35 use Scalar::Util 'reftype', 'refaddr', 'blessed', 'weaken';
  5         12  
  5         276  
16 5     5   32 use Time::HiRes ();
  5         9  
  5         122  
17              
18 5     5   26 use Exporter 'import';
  5         18  
  5         73199  
19             our @EXPORT_OK = qw/
20             op_audit op_audit_time op_buffer op_buffer_count op_buffer_time op_catch_error op_combine_latest_with op_concat_all
21             op_concat_map op_concat_with op_count op_debounce op_debounce_time op_default_if_empty op_delay op_delay_when
22             op_distinct op_distinct_until_changed op_distinct_until_key_changed op_element_at op_end_with op_every
23             op_exhaust_all op_exhaust_map op_filter op_finalize op_find op_find_index op_first op_ignore_elements
24             op_is_empty op_last op_map op_map_to op_max op_merge_all op_merge_map op_merge_with op_min op_multicast
25             op_on_error_resume_next_with op_pairwise op_pluck op_race_with op_reduce op_ref_count op_repeat op_retry op_sample
26             op_sample_time op_scan op_share op_single op_skip op_skip_last op_skip_until op_skip_while op_start_with
27             op_switch_all op_switch_map op_take op_take_last op_take_until op_take_while op_tap op_throttle op_throttle_time
28             op_throw_if_empty op_time_interval op_timeout op_timestamp op_to_array op_with_latest_from op_zip_with
29             /;
30             our %EXPORT_TAGS = (all => \@EXPORT_OK);
31              
32             our $VERSION = "v6.28.0";
33              
34             sub op_audit {
35 3     3   8 my ($duration_selector) = @_;
36              
37             return sub {
38 3     3   7 my ($source) = @_;
39              
40             return rx_observable->new(sub {
41 3         6 my ($subscriber) = @_;
42              
43 3         9 my $last_val;
44             my $mini_subscription;
45 3         0 my $main_is_complete;
46              
47             my $mini_subscriber = {
48             next => sub {
49 5 100       25 $subscriber->{next}->($last_val) if defined $subscriber->{next};
50             },
51             error => sub {
52 1 50       24 $subscriber->{error}->(@_) if defined $subscriber->{error};
53             },
54             complete => sub {
55 5         10 undef $mini_subscription;
56 5         17 undef $last_val;
57 5 100       16 if ($main_is_complete) {
58 1 50       19 $subscriber->{complete}->() if defined $subscriber->{complete};
59             }
60             },
61 3         16 };
62              
63             my $own_subscriber = {
64             next => sub {
65 11         31 my ($v) = @_;
66              
67 11         24 $last_val = $v;
68 11 100       29 if (!defined $mini_subscription) {
69 6         9 my $o = do { local $_ = $v; $duration_selector->($v) };
  6         11  
  6         19  
70 6         18 $mini_subscription = $o->pipe(
71             op_take(1),
72             )->subscribe($mini_subscriber);
73             }
74             },
75             error => sub {
76 1 50       7 $subscriber->{error}->(@_) if defined $subscriber->{error};
77             },
78             complete => sub {
79 1         9 $main_is_complete = 1;
80 1 50       9 if (! defined $mini_subscription) {
81 0 0       0 $subscriber->{complete}->() if defined $subscriber->{complete};
82             }
83             }
84 3         18 };
85              
86 3         11 my $own_subscription = $source->subscribe($own_subscriber);
87              
88 3         9 return $mini_subscription, $own_subscription;
89 3         8 });
90 3         15 };
91             }
92              
93             sub op_audit_time {
94 0     0   0 my ($duration) = @_;
95              
96 0     0   0 return op_audit(sub { rx_timer($duration) });
  0         0  
97             }
98              
99             sub op_buffer {
100 2     2   14 my ($notifier) = @_;
101              
102             return sub {
103 2     2   8 my ($source) = @_;
104              
105             return rx_observable->new(sub {
106 2         4 my ($subscriber) = @_;
107              
108 2         3 my @buffer;
109              
110             my $own_subscriber = {
111             %$subscriber,
112             next => sub {
113 16         40 push @buffer, $_[0];
114             },
115             error => sub {
116 0 0       0 $subscriber->{error}->($_[0]) if defined $subscriber->{error};
117             },
118             complete => sub {
119 2 50 33     17 $subscriber->{next}->([@buffer]) if @buffer and defined $subscriber->{next};
120 2         5 undef @buffer;
121 2 50       9 $subscriber->{complete}->() if defined $subscriber->{complete};
122             },
123 2         18 };
124              
125             my $notifier_subscriber = {
126             next => sub {
127 5 50       27 $subscriber->{next}->([@buffer]) if defined $subscriber->{next};
128 5         20 undef @buffer;
129             },
130             error => sub {
131 0 0       0 $subscriber->{error}->($_[0]) if defined $subscriber->{error};
132             },
133 2         13 };
134              
135 2         6 my $s1 = $source->subscribe($own_subscriber);
136 2         10 my $s2 = $notifier->subscribe($notifier_subscriber);
137              
138 2         16 return [$s1, $s2], sub { undef @buffer };
  2         6  
139             })
140 2         6 }
141 2         12 }
142              
143             sub op_buffer_count {
144 3     3   8 my ($buffer_size, $start_buffer_every) = @_;
145              
146 3   66     14 $start_buffer_every //= $buffer_size;
147              
148             return sub {
149 3     3   7 my ($source) = @_;
150              
151             return rx_observable->new(sub {
152 3         15 my ($subscriber) = @_;
153              
154 3         6 my @buffers;
155 3         4 my $count = 0;
156             my $own_subscriber = {
157             %$subscriber,
158             next => sub {
159 18         41 my ($value) = @_;
160              
161 18 100       83 if ($count++ % $start_buffer_every == 0) {
162 13         33 push @buffers, [];
163             }
164              
165 18         46 for (my $i = 0; $i < @buffers; $i++) {
166 25         38 my $buffer = $buffers[$i];
167              
168 25         46 push @$buffer, $value;
169              
170 25 100       107 if (@$buffer == $buffer_size) {
171 10 50       40 $subscriber->{next}->($buffer) if defined $subscriber->{next};
172 10         23 splice @buffers, $i, 1;
173 10         15 $i--;
174 10         34 next;
175             }
176             }
177             },
178             complete => sub {
179 3 50       13 if (defined $subscriber->{next}) {
180 3         13 $subscriber->{next}->($_) foreach @buffers;
181             }
182              
183 3 50       13 $subscriber->{complete}->() if defined $subscriber->{complete};
184             },
185 3         25 };
186              
187 3         13 $source->subscribe($own_subscriber);
188              
189 3         8 return;
190 3         8 });
191 3         19 };
192             }
193              
194             sub op_buffer_time {
195 1     1   12 my ($buffer_time_span) = @_;
196              
197 1         5 return op_buffer(rx_interval($buffer_time_span));
198             }
199              
200             sub _op_catch_error_helper {
201 0     0   0 my ($source, $selector, $subscriber, $dependents, $error) = @_;
202              
203 0         0 my $new_observable;
204 0 0       0 if (@_ == 4) {
205 0         0 $new_observable = $source;
206             } else {
207 0         0 eval { $new_observable = $selector->($error, $source) };
  0         0  
208 0 0       0 if (my $e = $@) {
209 0 0       0 $subscriber->{error}->($e) if defined $subscriber->{error};
210 0         0 return;
211             }
212             }
213              
214 0         0 my $own_subscription = RxPerl::Subscription->new;
215 0         0 @$dependents = ($own_subscription);
216             my $own_subscriber = {
217             new_subscription => $own_subscription,
218             next => $subscriber->{next},
219             error => sub {
220 0     0   0 _op_catch_error_helper($source, $selector, $subscriber, $dependents, $_[0]);
221             },
222             complete => $subscriber->{complete},
223 0         0 };
224              
225 0         0 $new_observable->subscribe($own_subscriber);
226             }
227              
228             sub op_catch_error {
229 0     0   0 my ($selector) = @_;
230              
231             return sub {
232 0     0   0 my ($source) = @_;
233              
234             return rx_observable->new(sub {
235 0         0 my ($subscriber) = @_;
236              
237 0         0 my $dependents = [];
238 0         0 $subscriber->subscription->add($dependents);
239              
240 0         0 _op_catch_error_helper(
241             $source, $selector, $subscriber, $dependents,
242             );
243              
244 0         0 return;
245 0         0 });
246 0         0 };
247             }
248              
249             sub op_combine_latest_with {
250 1     1   22 my (@other_observables) = @_;
251              
252             return sub {
253 1     1   3 my $source = shift;
254              
255 1         7 return rx_combine_latest([$source, @other_observables]);
256             }
257 1         8 }
258              
259             sub op_concat_all {
260 1     1   5 return op_merge_all(1);
261             }
262              
263             sub op_concat_map {
264 0     0   0 my ($observable_factory) = @_;
265              
266             return sub {
267 0     0   0 my ($source) = @_;
268              
269 0         0 return $source->pipe(
270             op_map($observable_factory),
271             op_concat_all(),
272             );
273 0         0 };
274             }
275              
276             sub op_concat_with {
277 3     3   27 my @other_observables = @_;
278              
279             return sub {
280 3     3   15 my ($source) = @_;
281              
282 3         13 return rx_concat(
283             $source,
284             @other_observables,
285             );
286 3         17 };
287             }
288              
289             sub op_count {
290 3     3   7 my ($predicate) = @_;
291              
292             return sub {
293 3     3   13 my ($source) = @_;
294              
295             return rx_observable->new(sub {
296 3         13 my ($subscriber) = @_;
297              
298 3         7 my $count = 0;
299 3         5 my $idx = 0;
300              
301             my $own_subscriber = {
302             %$subscriber,
303             next => sub {
304 14         29 my ($v) = @_;
305 14         27 local $_ = $v;
306 14 100 66     50 if (!$predicate or $predicate->($v, $idx++)) {
307 7         43 $count++;
308             }
309             },
310             complete => sub {
311 3 50       16 $subscriber->{next}->($count) if defined $subscriber->{next};
312 3 50       15 $subscriber->{complete}->() if defined $subscriber->{complete};
313             },
314 3         22 };
315              
316 3         12 $source->subscribe($own_subscriber);
317              
318 3         22 return;
319 3         8 });
320 3         16 };
321             }
322              
323             sub op_debounce {
324 2     2   6 my ($duration_selector) = @_;
325              
326             return sub {
327 2     2   5 my ($source) = @_;
328              
329             return rx_observable->new(sub {
330 2         14 my ($subscriber) = @_;
331              
332 2         10 my $mini_subscription;
333             my $last_val;
334 2         0 my $has_last_val;
335              
336             my $mini_subscriber = {
337             next => sub {
338 5 50       21 $subscriber->{next}->($last_val) if defined $subscriber->{next};
339 5         14 undef $has_last_val;
340             },
341             error => sub {
342 0 0       0 $subscriber->{error}->(@_) if defined $subscriber->{error};
343             },
344             complete => sub {
345 5         8 undef $mini_subscription;
346             },
347 2         13 };
348              
349             my $own_subscriber = {
350             next => sub {
351 12         24 my ($v) = @_;
352              
353 12 100       28 if (defined $mini_subscription) {
354 6         15 $mini_subscription->unsubscribe();
355             }
356              
357 12         23 $last_val = $v;
358 12         18 $has_last_val = 1;
359              
360 12         17 my $o = do { local $_ = $v; $duration_selector->($v) };
  12         16  
  12         24  
361 12         33 $mini_subscription = $o->pipe(
362             op_take(1),
363             )->subscribe($mini_subscriber);
364             },
365             error => sub {
366 0 0       0 $subscriber->{error}->(@_) if defined $subscriber->{error};
367             },
368             complete => sub {
369 2 100       6 if ($has_last_val) {
370 1 50       7 $subscriber->{next}->($last_val) if defined $subscriber->{next};
371             }
372 2 50       9 $subscriber->{complete}->() if defined $subscriber->{complete};
373             }
374 2         14 };
375              
376 2         7 my $main_subscription = $source->subscribe($own_subscriber);
377 2         17 $main_subscription->add(\$mini_subscription);
378              
379 2         6 return $main_subscription, $mini_subscription;
380 2         5 });
381 2         11 };
382             }
383              
384             sub op_debounce_time {
385 2     2   6 my ($due_time) = @_;
386              
387 2     12   11 return op_debounce(sub { rx_timer($due_time) });
  12         28  
388             }
389              
390             sub op_default_if_empty {
391 7     7   17 my ($default_value) = @_;
392              
393             return sub {
394 7     7   35 my ($source) = @_;
395              
396             return rx_observable->new(sub {
397 7         25 my ($subscriber) = @_;
398              
399 7         21 my $source_emitted = 0;
400              
401             my $own_subscriber = {
402             %$subscriber,
403             next => sub {
404 3         9 $source_emitted = 1;
405 3 50       13 $subscriber->{next}->(@_) if exists $subscriber->{next};
406             },
407             complete => sub {
408 7 50 66     49 $subscriber->{next}->($default_value) if ! $source_emitted and exists $subscriber->{next};
409 7 50       31 $subscriber->{complete}->() if exists $subscriber->{complete};
410             },
411 7         49 };
412              
413 7         26 $source->subscribe($own_subscriber);
414              
415 7         33 return;
416 7         20 });
417 7         41 };
418             }
419              
420             sub op_delay {
421 672     672   1105 my ($delay) = @_;
422              
423 672         1384 my ($timer_sub, $cancel_timer_sub) = get_timer_subs;
424              
425             return sub {
426 672     672   1050 my ($source) = @_;
427              
428             return rx_observable->new(sub {
429 722         1153 my ($subscriber) = @_;
430              
431 722         1525 my %timers;
432             my $queue;
433 722         0 my $completed;
434             my $own_subscriber = {
435             error => sub {
436 1         3 my @value = @_;
437 1 50       7 $subscriber->{error}->(@value) if defined $subscriber->{error};
438             },
439             next => sub {
440 728         1561 my @value = @_;
441              
442 728 100       1542 if (!defined $queue) {
443 727         1107 $queue = [];
444 727         1101 my ($timer1, $timer2);
445             $timer1 = $timer_sub->(0, sub {
446 727         1204 delete $timers{$timer1};
447 727         1424 my @queue_copy = @$queue;
448 727         1149 undef $queue;
449             $timer2 = $timer_sub->($delay, sub {
450 692         1175 delete $timers{$timer2};
451 692         1248 foreach my $item (@queue_copy) {
452 693 100       2104 $subscriber->{next}->(@$item) if defined $subscriber->{next};
453             }
454 692 100 100     2472 if ($completed and ! %timers) {
455 687 100       2117 $subscriber->{complete}->() if defined $subscriber->{complete};
456             }
457 727         2701 });
458 727         2191 $timers{$timer2} = $timer2;
459 727         3258 });
460 727         1672 $timers{$timer1} = $timer1;
461             }
462 728         2621 push @$queue, \@value;
463             },
464             complete => sub {
465 721         1004 $completed = 1;
466 721 100       1611 if (! %timers) {
467 1 50       11 $subscriber->{complete}->() if defined $subscriber->{complete};
468             }
469             },
470 722         4694 };
471              
472             return [
473             $source->subscribe($own_subscriber),
474             sub {
475 722         1745 $cancel_timer_sub->($_) foreach values %timers;
476 722         1716 %timers = ();
477             },
478 722         1901 ];
479 672         1311 });
480 672         2931 };
481             }
482              
483             sub op_delay_when {
484 1     1   3 my ($delay_duration_selector) = @_;
485              
486             return sub {
487 1     1   4 my ($source) = @_;
488              
489             return rx_observable->new(sub {
490 1         4 my ($subscriber) = @_;
491              
492 1         3 my $idx = 0;
493 1         3 my %mini_subscriptions;
494             my $main_finished;
495              
496             my $make_mini_subscriber = sub {
497 3         7 my ($v) = @_;
498              
499 3         9 my $mini_subscription = RxPerl::Subscription->new;
500 3         11 $mini_subscriptions{$mini_subscription} = $mini_subscription;
501              
502             return {
503             new_subscription => $mini_subscription,
504             next => sub {
505 3 50       11 $subscriber->{next}->($v) if defined $subscriber->{next};
506             },
507             error => sub {
508 0 0       0 $subscriber->{error}->(@_) if defined $subscriber->{error};
509             },
510             complete => sub {
511 3         10 delete $mini_subscriptions{$mini_subscription};
512 3 100 66     15 if ($main_finished and ! %mini_subscriptions) {
513 1 50       14 $subscriber->{complete}->() if defined $subscriber->{complete};
514             }
515             },
516 3         23 };
517 1         5 };
518              
519 1         5 my $own_subscription = RxPerl::Subscription->new;
520             my $own_subscriber = {
521             new_subscription => $own_subscription,
522             next => sub {
523 3         6 my ($v) = @_;
524              
525 3         8 local $_ = $v;
526 3         16 my $mini_obs = $delay_duration_selector->($v, $idx++);
527 3         11 $mini_obs->subscribe($make_mini_subscriber->($v));
528             },
529             error => sub {
530 0 0       0 $subscriber->{error}->(@_) if defined $subscriber->{error};
531             },
532             complete => sub {
533 1         6 $main_finished = 1;
534 1 50       8 if (!%mini_subscriptions) {
535 0 0       0 $subscriber->{complete}->() if defined $subscriber->{complete};
536             }
537             },
538 1         12 };
539              
540 1         5 $subscriber->subscription->add($own_subscription, \%mini_subscriptions);
541              
542 1         6 $source->subscribe($own_subscriber);
543              
544 1         18 return;
545 1         5 });
546 1         7 };
547             }
548              
549             sub op_distinct {
550 2     2   6 my ($key_selector) = @_;
551              
552             return sub {
553 2     2   4 my ($source) = @_;
554              
555             return rx_observable->new(sub {
556 2         6 my ($subscriber) = @_;
557              
558 2         3 my %keys_passed;
559 2         7 $subscriber->subscription->add(sub { %keys_passed = () });
  2         9  
560              
561             my $own_subscriber = {
562             %$subscriber,
563             next => sub {
564 15         24 my ($v) = @_;
565              
566 15         19 my $k;
567 15 100       24 if ($key_selector) {
568 3         7 my $ok = eval { local $_ = $v; $k = $key_selector->($v); 1 };
  3         5  
  3         6  
  3         11  
569 3 50       10 if (! $ok) {
570 0 0       0 $subscriber->{error}->($@) if defined $subscriber->{error};
571 0         0 return;
572             }
573             } else {
574 12         16 $k = $v;
575             }
576 15 100       39 if (! exists $keys_passed{$k}) {
577 6         24 $keys_passed{$k} = 1;
578 6 50       18 $subscriber->{next}->($v) if defined $subscriber->{next};
579             }
580             },
581 2         13 };
582              
583 2         8 $source->subscribe($own_subscriber);
584              
585 2         29 return;
586 2         6 });
587 2         10 };
588             }
589              
590             sub op_distinct_until_changed {
591 4     4   14 my ($comparison_function) = @_;
592              
593 4   100     20 $comparison_function //= \&_eqq;
594              
595             return sub {
596 4     4   10 my ($source) = @_;
597              
598             return rx_observable->new(sub {
599 4         14 my ($subscriber) = @_;
600              
601 4         5 my $prev_value;
602 4         8 my $have_prev_value = 0;
603              
604             my $own_subscriber = {
605             %$subscriber,
606             next => sub {
607 27         55 my @value = @_;
608              
609 27 100 100     84 if (! $have_prev_value or ! $comparison_function->($prev_value, $value[0])) {
610 15 50       55 $subscriber->{next}->(@value) if defined $subscriber->{next};
611 15         27 $have_prev_value = 1;
612 15         39 $prev_value = $value[0];
613             }
614             },
615 4         23 };
616              
617 4         15 $source->subscribe($own_subscriber);
618              
619 4         22 return;
620 4         9 });
621 4         23 };
622             }
623              
624             sub op_distinct_until_key_changed {
625 1     1   4 my ($key) = @_;
626              
627             return op_distinct_until_changed(sub {
628 5     5   12 _eqq($_[0]->{$key}, $_[1]->{$key});
629 1         7 }),
630             }
631              
632             sub op_element_at {
633 2     2   7 my ($index, $default) = @_;
634 2         7 my $has_default = @_ >= 2;
635 2         5 $index = int $index;
636              
637             return sub {
638 2     2   6 my ($source) = @_;
639              
640 2 50       7 $index >= 0 or return rx_throw_error('ArgumentOutOfRangeError');
641              
642             return rx_observable->new(sub {
643 2         5 my ($subscriber) = @_;
644              
645 2         3 my $i = 0;
646             my $own_subscriber = {
647             %$subscriber,
648             next => sub {
649 6 100       30 if ($i++ == $index) {
650 1 50       7 $subscriber->{next}->(@_) if defined $subscriber->{next};
651 1 50       8 $subscriber->{complete}->() if defined $subscriber->{complete};
652             }
653             },
654             complete => sub {
655 1 50       4 if ($has_default) {
656 1 50       6 $subscriber->{next}->($default) if defined $subscriber->{next};
657 1 50       6 $subscriber->{complete}->() if defined $subscriber->{complete};
658             } else {
659 0 0       0 $subscriber->{error}->('ArgumentOutOfRangeError') if defined $subscriber->{error};
660             }
661             },
662 2         16 };
663              
664 2         9 $source->subscribe($own_subscriber);
665              
666 2         6 return;
667 2         7 });
668 2         12 };
669             }
670              
671             sub op_end_with {
672 0     0   0 my (@values) = @_;
673              
674             return sub {
675 0     0   0 my ($source) = @_;
676              
677 0         0 return rx_concat(
678             $source,
679             rx_of(@values),
680             );
681             }
682 0         0 }
683              
684             sub op_every {
685 2     2   5 my ($predicate) = @_;
686              
687             return sub {
688 2     2   8 my ($source) = @_;
689              
690             return rx_observable->new(sub {
691 2         5 my ($subscriber) = @_;
692              
693 2         5 my $idx = 0;
694             my $own_subscriber = {
695             %$subscriber,
696             next => sub {
697 8         23 my ($v) = @_;
698 8         14 local $_ = $v;
699 8 100       20 if (! $predicate->($v, $idx++)) {
700 1 50       9 $subscriber->{next}->(0) if defined $subscriber->{next};
701 1 50       14 $subscriber->{complete}->() if defined $subscriber->{complete};
702             }
703             },
704             complete => sub {
705 1 50       15 $subscriber->{next}->(1) if defined $subscriber->{next};
706 1 50       5 $subscriber->{complete}->() if defined $subscriber->{complete};
707             },
708 2         14 };
709              
710 2         9 $source->subscribe($own_subscriber);
711              
712 2         6 return;
713 2         5 });
714 2         12 };
715             }
716              
717             sub op_exhaust_all {
718             return sub {
719 1     1   9 my ($source) = @_;
720              
721             return rx_observable->new(sub {
722 1         6 my ($subscriber) = @_;
723              
724 1         3 my $active_subscription;
725             my $big_completed;
726 1         5 my $own_subscription = RxPerl::Subscription->new;
727              
728 1         7 $subscriber->subscription->add(
729             \$active_subscription,
730             $own_subscription,
731             );
732              
733             my $own_subscriber = {
734             new_subscription => $own_subscription,
735             next => sub {
736 6         15 my ($new_obs) = @_;
737              
738 6 100       56 !$active_subscription or return;
739 3         9 $active_subscription = RxPerl::Subscription->new;
740             my $small_subscriber = {
741             new_subscription => $active_subscription,
742             next => sub {
743 12 50       44 $subscriber->{next}->(@_) if defined $subscriber->{next};
744             },
745             error => sub {
746 0 0       0 $subscriber->{error}->(@_) if defined $subscriber->{error};
747             },
748             complete => sub {
749 2         14 undef $active_subscription;
750 2 50 33     10 $subscriber->{complete}->() if $big_completed and defined $subscriber->{complete};
751             },
752 3         25 };
753 3         11 $new_obs->subscribe($small_subscriber);
754             },
755             error => sub {
756 0 0       0 $subscriber->{error}->(@_) if defined $subscriber->{error};
757             },
758             complete => sub {
759 0         0 $big_completed = 1;
760 0 0 0     0 $subscriber->{complete}->() if !$active_subscription and defined $subscriber->{complete};
761             },
762 1         18 };
763              
764 1         5 $source->subscribe($own_subscriber);
765              
766 1         3 return;
767 1         6 });
768 1     1   6 };
769             }
770              
771             sub op_exhaust_map {
772 0     0   0 my ($observable_factory) = @_;
773              
774             return sub {
775 0     0   0 my ($source) = @_;
776              
777 0         0 return $source->pipe(
778             op_map($observable_factory),
779             op_exhaust_all(),
780             );
781 0         0 };
782             }
783              
784             sub op_filter {
785 9     9   21 my ($filtering_sub) = @_;
786              
787             return sub {
788 9     9   22 my ($source) = @_;
789              
790             return rx_observable->new(sub {
791 9         23 my ($subscriber) = @_;
792              
793 9         67 my $own_subscriber = { %$subscriber };
794 9         17 my $idx = 0;
795             $own_subscriber->{next} = sub {
796 75         145 my ($value) = @_;
797 75         115 my $passes = eval {
798 75         117 local $_ = $value;
799 75         169 $filtering_sub->($value, $idx++);
800             };
801 75 50       382 if (my $error = $@) {
802 0         0 $subscriber->{error}->($error);
803             } else {
804 75 100 66     312 $subscriber->{next}->(@_) if $passes and defined $subscriber->{next};
805             }
806 9         41 };
807              
808 9         58 $source->subscribe($own_subscriber);
809              
810 9         26 return;
811 9         17 });
812 9         40 };
813             }
814              
815             sub op_finalize {
816 8     8   14 my ($fn) = @_;
817              
818             return sub {
819 8     8   13 my ($source) = @_;
820              
821             return rx_observable->new(sub {
822 8         13 my ($subscriber) = @_;
823              
824 8   100     26 my $arr = $subscriber->{_subscription}{_finalize_cbs} //= [];
825 8         17 unshift @$arr, $fn;
826 8         22 $subscriber->{_subscription}->add( $arr );
827              
828 8         26 $source->subscribe($subscriber);
829              
830 8         14 return;
831 8         16 });
832 8         43 };
833             }
834              
835             sub op_find {
836 2     2   8 my ($predicate) = @_;
837              
838             return sub {
839 2     2   8 my ($source) = @_;
840              
841 2 50       7 $predicate or return rx_throw_error('missing predicate in op_find');
842              
843 2         8 return $source->pipe(
844             op_first($predicate),
845             op_default_if_empty(undef),
846             );
847 2         12 };
848             }
849              
850             sub op_find_index {
851 2     2   5 my ($predicate) = @_;
852              
853             return sub {
854 2     2   7 my ($source) = @_;
855              
856 2 50       7 $predicate or return rx_throw_error('missing predicate in op_find_index');
857              
858             return rx_observable->new(sub {
859 2         20 my ($subscriber) = @_;
860              
861 2         6 my $idx = 0;
862             my $own_subscriber = {
863             %$subscriber,
864             next => sub {
865 13         31 my ($val) = @_;
866 13         20 my $truth;
867 13         19 my $ok = eval {
868 13         19 local $_ = $val;
869 13         27 $truth = $predicate->($val, $idx++);
870 13         50 1
871             };
872 13 50       28 if (!$ok) {
873 0 0       0 $subscriber->{error}->($@) if defined $subscriber->{error};
874             }
875 13 100       44 if ($truth) {
876 1 50       10 $subscriber->{next}->($idx - 1) if defined $subscriber->{next};
877 1 50       16 $subscriber->{complete}->() if defined $subscriber->{complete};
878             }
879             },
880             complete => sub {
881 1 50       6 $subscriber->{next}->(-1) if defined $subscriber->{next};
882 1 50       6 $subscriber->{complete}->() if defined $subscriber->{complete};
883             },
884 2         18 };
885              
886 2         11 $source->subscribe($own_subscriber);
887              
888 2         5 return;
889 2         5 });
890 2         12 };
891             }
892              
893             sub op_first {
894 7     7   25 my ($condition) = @_;
895              
896             return sub {
897 7     7   18 my ($source) = @_;
898              
899 7         22 my @pipes = (op_take(1));
900 7 100       27 unshift @pipes, op_filter($condition) if defined $condition;
901              
902 7         63 return $source->pipe(@pipes);
903 7         42 };
904             }
905              
906             sub op_ignore_elements {
907             return sub {
908 68     68   124 my ($source) = @_;
909              
910             return rx_observable->new(sub {
911 76         135 my ($subscriber) = @_;
912              
913 76         322 my %own_subscriber = %$subscriber;
914 76         176 delete $own_subscriber{next};
915              
916 76         227 $source->subscribe(\%own_subscriber);
917              
918 76         188 return;
919 68         142 });
920             }
921 68     68   305 }
922              
923             sub op_is_empty {
924             return sub {
925 2     2   6 my ($source) = @_;
926              
927 2         6 return $source->pipe(
928             op_first(),
929             op_map_to(0),
930             op_default_if_empty(1),
931             );
932 2     2   11 };
933             }
934              
935             sub op_last {
936 3     3   8 my ($predicate, $default) = @_;
937 3         7 my $has_default = @_ >= 2;
938              
939             return sub {
940 3     3   6 my ($source) = @_;
941              
942             return rx_observable->new(sub {
943 3         6 my ($subscriber) = @_;
944              
945 3         6 my $last_val;
946             my $last_val_obtained;
947              
948 3         5 my $idx = 0;
949             my $own_subscriber = {
950             %$subscriber,
951             next => sub {
952 5         10 my ($v) = @_;
953              
954 5 100       23 if ($predicate) {
955 3         6 my $passes;
956 3         5 my $ok = eval { local $_ = $v; $passes = $predicate->($v, $idx++); 1 };
  3         6  
  3         9  
  3         18  
957 3 50       7 $ok or do {
958 0 0       0 $subscriber->{error}->($@) if defined $subscriber->{error};
959 0         0 return;
960             };
961 3 100       9 if ($passes) {
962 1         2 $last_val = $v;
963 1         10 $last_val_obtained = 1;
964             }
965             } else {
966 2         5 $last_val = $v;
967 2         6 $last_val_obtained = 1;
968             }
969             },
970             complete => sub {
971 3 100       11 if (! $last_val_obtained) {
972 1 50       3 if ($has_default) {
973 1 50       18 $subscriber->{next}->($default) if defined $subscriber->{next};
974 1 50       8 $subscriber->{complete}->() if defined $subscriber->{complete};
975             } else {
976 0         0 $subscriber->{error}->("no last value found");
977             }
978             } else {
979 2 50       12 $subscriber->{next}->($last_val) if defined $subscriber->{next};
980 2 50       10 $subscriber->{complete}->() if defined $subscriber->{complete};
981             }
982             },
983 3         26 };
984              
985 3         12 $source->subscribe($own_subscriber);
986              
987 3         7 return;
988 3         9 });
989 3         21 };
990             }
991              
992             sub op_map {
993 27     27   68 my ($mapping_sub) = @_;
994              
995             return sub {
996 27     27   52 my ($source) = @_;
997              
998             return rx_observable->new(sub {
999 18         57 my ($subscriber) = @_;
1000              
1001 18         68 my $own_subscriber = { %$subscriber };
1002 18         34 my $idx = 0;
1003             $own_subscriber->{next} = sub {
1004 94         187 my ($value) = @_;
1005 94         144 my $result = eval {
1006 94         147 local $_ = $value;
1007 94         220 $mapping_sub->($value, $idx++);
1008             };
1009 94 50       400 if (my $error = $@) {
1010 0 0       0 $subscriber->{error}->($error) if defined $subscriber->{error};
1011             } else {
1012 94 50       288 $subscriber->{next}->($result) if defined $subscriber->{next};
1013             }
1014 18         91 };
1015              
1016 18         65 $source->subscribe($own_subscriber);
1017              
1018 18         64 return;
1019 27         60 });
1020 27         138 };
1021             }
1022              
1023             sub op_map_to {
1024 2     2   6 my ($mapping_value) = @_;
1025              
1026             return sub {
1027 2     2   17 my ($source) = @_;
1028              
1029             return rx_observable->new(sub {
1030 2         9 my ($subscriber) = @_;
1031              
1032 2         11 my $own_subscriber = { %$subscriber };
1033             $own_subscriber->{next} &&= sub {
1034 1 50       18 $subscriber->{next}->($mapping_value) if defined $subscriber->{next};
1035 2   50     18 };
1036              
1037 2         7 $source->subscribe($own_subscriber);
1038              
1039 2         6 return;
1040 2         6 });
1041 2         9 };
1042             }
1043              
1044             sub op_max {
1045 0     0   0 my ($comparer) = @_;
1046              
1047             return sub {
1048 0     0   0 my ($source) = @_;
1049              
1050             return rx_observable->new(sub {
1051 0         0 my ($subscriber) = @_;
1052              
1053 0         0 my $curr_max;
1054             my $has_curr_max;
1055              
1056             my $own_subscriber = {
1057             %$subscriber,
1058             next => sub {
1059 0         0 my ($v) = @_;
1060              
1061 0 0       0 if (!$has_curr_max) {
1062 0         0 $curr_max = $v;
1063 0         0 $has_curr_max = 1;
1064             }
1065             else {
1066 0 0       0 if (!$comparer) {
1067 0 0       0 if ($v > $curr_max) {
1068 0         0 $curr_max = $v;
1069             }
1070             }
1071             else {
1072 0 0       0 if ($comparer->($v, $curr_max) > 0) {
1073 0         0 $curr_max = $v;
1074             }
1075             }
1076             }
1077             },
1078             complete => sub {
1079 0 0       0 if ($has_curr_max) {
1080 0 0       0 $subscriber->{next}->($curr_max) if defined $subscriber->{next};
1081             }
1082 0 0       0 $subscriber->{complete}->() if defined $subscriber->{complete};
1083             },
1084 0         0 };
1085              
1086 0         0 $source->subscribe($own_subscriber);
1087              
1088 0         0 return;
1089 0         0 });
1090 0         0 };
1091             }
1092              
1093             sub _op_merge_all_make_subscriber {
1094 11     11   23 my ($small_subscriptions, $subscriber, $stored_observables, $big_completed_ref) = @_;
1095              
1096 11         27 my $small_subscription = RxPerl::Subscription->new;
1097 11         36 $small_subscriptions->{$small_subscription} = $small_subscription;
1098             return {
1099             new_subscription => $small_subscription,
1100             next => sub {
1101 26 50   26   74 $subscriber->{next}->(@_) if defined $subscriber->{next};
1102             },
1103             error => sub {
1104 0 0   0   0 $subscriber->{error}->(@_) if defined $subscriber->{error};
1105             },
1106             complete => sub {
1107 9     9   27 delete $small_subscriptions->{$small_subscription};
1108 9 100 33     38 if (@$stored_observables) {
    50          
1109 8         15 my $new_obs = shift @$stored_observables;
1110 8         21 $new_obs->subscribe(
1111             _op_merge_all_make_subscriber(
1112             $small_subscriptions,
1113             $subscriber,
1114             $stored_observables,
1115             $big_completed_ref,
1116             ),
1117             );
1118             } elsif ($$big_completed_ref and !%$small_subscriptions) {
1119 1 50       6 $subscriber->{complete}->() if defined $subscriber->{complete};
1120             }
1121             },
1122 11         117 };
1123             }
1124              
1125             sub op_merge_all {
1126 3     3   12 my ($concurrent) = @_;
1127              
1128             return sub {
1129 3     3   19 my ($source) = @_;
1130              
1131             return rx_observable->new(sub {
1132 3         11 my ($subscriber) = @_;
1133              
1134 3         33 my @stored_observables;
1135             my %small_subscriptions;
1136 3         0 my $big_completed;
1137              
1138 3         12 my $own_subscription = RxPerl::Subscription->new;
1139 3         12 $subscriber->subscription->add(
1140             $own_subscription,
1141             \%small_subscriptions,
1142             );
1143              
1144             my $own_subscriber = {
1145             new_subscription => $own_subscription,
1146             next => sub {
1147 24         53 my ($new_observable) = @_;
1148              
1149 24         45 push @stored_observables, $new_observable;
1150 24 100 66     180 if (!defined $concurrent or keys(%small_subscriptions) < $concurrent) {
1151 3         6 my $new_obs = shift @stored_observables;
1152 3         11 $new_obs->subscribe(
1153             _op_merge_all_make_subscriber(
1154             \%small_subscriptions,
1155             $subscriber,
1156             \@stored_observables,
1157             \$big_completed,
1158             ),
1159             );
1160             }
1161             },
1162             error => sub {
1163 0 0       0 $subscriber->{error}->(@_) if defined $subscriber->{error};
1164             },
1165             complete => sub {
1166 1         3 $big_completed = 1;
1167 1 50 33     5 $subscriber->{complete}->() if !%small_subscriptions and defined $subscriber->{complete};
1168             },
1169 3         48 };
1170              
1171 3         11 $source->subscribe($own_subscriber);
1172              
1173 3         18 return;
1174 3         9 });
1175 3         19 };
1176             }
1177              
1178             sub op_merge_map {
1179 0     0   0 my ($observable_factory) = @_;
1180              
1181             return sub {
1182 0     0   0 my ($source) = @_;
1183              
1184 0         0 return $source->pipe(
1185             op_map($observable_factory),
1186             op_merge_all(),
1187             );
1188 0         0 };
1189             }
1190              
1191             sub op_merge_with {
1192 1     1   4 my @other_sources = @_;
1193              
1194             return sub {
1195 1     1   6 my ($source) = @_;
1196              
1197 1         6 return rx_merge(
1198             $source,
1199             @other_sources,
1200             );
1201 1         8 };
1202             }
1203              
1204             sub op_min {
1205 0     0   0 my ($comparer) = @_;
1206              
1207             return sub {
1208 0     0   0 my ($source) = @_;
1209              
1210             return rx_observable->new(sub {
1211 0         0 my ($subscriber) = @_;
1212              
1213 0         0 my $curr_min;
1214             my $has_curr_min;
1215              
1216             my $own_subscriber = {
1217             %$subscriber,
1218             next => sub {
1219 0         0 my ($v) = @_;
1220              
1221 0 0       0 if (!$has_curr_min) {
1222 0         0 $curr_min = $v;
1223 0         0 $has_curr_min = 1;
1224             }
1225             else {
1226 0 0       0 if (!$comparer) {
1227 0 0       0 if ($v < $curr_min) {
1228 0         0 $curr_min = $v;
1229             }
1230             }
1231             else {
1232 0 0       0 if ($comparer->($v, $curr_min) < 0) {
1233 0         0 $curr_min = $v;
1234             }
1235             }
1236             }
1237             },
1238             complete => sub {
1239 0 0       0 if ($has_curr_min) {
1240 0 0       0 $subscriber->{next}->($curr_min) if defined $subscriber->{next};
1241             }
1242 0 0       0 $subscriber->{complete}->() if defined $subscriber->{complete};
1243             },
1244 0         0 };
1245              
1246 0         0 $source->subscribe($own_subscriber);
1247              
1248 0         0 return;
1249 0         0 });
1250 0         0 };
1251             }
1252              
1253             sub op_multicast {
1254 0     0   0 my ($subject_factory) = @_;
1255              
1256             return sub {
1257 0     0   0 my ($source) = @_;
1258              
1259 0         0 return RxPerl::ConnectableObservable->new($source, $subject_factory);
1260 0         0 };
1261             }
1262              
1263             sub op_on_error_resume_next_with {
1264 1     1   4 my @other_sources = @_;
1265              
1266             return sub {
1267 1     1   12 my ($source) = @_;
1268              
1269 1         5 return rx_on_error_resume_next(
1270             $source,
1271             @other_sources,
1272             );
1273 1         6 };
1274             }
1275              
1276             sub op_pairwise {
1277             return sub {
1278 0     0   0 my ($source) = @_;
1279              
1280             return rx_observable->new(sub {
1281 0         0 my ($subscriber) = @_;
1282              
1283 0         0 my $prev_value;
1284 0         0 my $have_prev_value = 0;
1285              
1286             my $own_subscriber = {
1287             %$subscriber,
1288             (
1289             next => sub {
1290 0         0 my ($value) = @_;
1291              
1292 0 0       0 if ($have_prev_value) {
1293 0 0       0 $subscriber->{next}->([$prev_value, $value]) if defined $subscriber->{next};
1294             } else {
1295 0         0 $have_prev_value = 1;
1296             }
1297              
1298 0         0 $prev_value = $value;
1299             }
1300             ) x!! defined $subscriber->{next},
1301 0         0 };
1302              
1303 0         0 $source->subscribe($own_subscriber);
1304              
1305 0         0 return;
1306 0         0 });
1307 0     0   0 };
1308             }
1309              
1310             sub op_pluck {
1311 1     1   4 my (@keys) = @_;
1312              
1313 1 50       5 croak 'List of properties cannot be empty,' unless @keys;
1314              
1315             return sub {
1316 1     1   2 my ($source) = @_;
1317              
1318             return rx_observable->new(sub {
1319 1         3 my ($subscriber) = @_;
1320              
1321             my $own_subscriber = {
1322             %$subscriber,
1323             next => sub {
1324 5         14 my (@value) = @_;
1325              
1326 5 50       13 if (! @value) {
1327 0 0       0 $subscriber->{next}->() if defined $subscriber->{next};
1328 0         0 return;
1329             }
1330              
1331 5         8 my $cursor = $value[0];
1332 5         9 foreach my $key (@keys) {
1333 7 100 100     37 if ((reftype($cursor) // '') eq 'HASH' and exists $cursor->{$key}) {
      100        
1334 4         11 $cursor = $cursor->{$key};
1335             } else {
1336 3 50       12 $subscriber->{next}->(undef) if defined $subscriber->{next};
1337 3         9 return;
1338             }
1339             }
1340              
1341 2 50       10 $subscriber->{next}->($cursor) if defined $subscriber->{next};
1342             },
1343 1         12 };
1344              
1345 1         5 $source->subscribe($own_subscriber);
1346 1         3 });
1347 1         6 };
1348             }
1349              
1350             sub op_race_with {
1351 1     1   5 my @other_sources = @_;
1352              
1353             return sub {
1354 1     1   3 my ($source) = @_;
1355              
1356 1         12 return rx_race(
1357             $source,
1358             @other_sources,
1359             );
1360 1         9 };
1361             }
1362              
1363             sub op_reduce {
1364 2     2   5 my ($accumulator, @seed) = @_;
1365              
1366             return sub {
1367 2     2   5 my ($source) = @_;
1368              
1369             return rx_observable->new(sub {
1370 2         5 my ($subscriber) = @_;
1371              
1372 2         5 my $got_first = @seed;
1373 2         3 my $acc;
1374 2 100       7 $acc = $seed[0] if $got_first;
1375              
1376 2         4 my $idx = 0;
1377             my $own_subscriber = {
1378             %$subscriber,
1379             next => sub {
1380 10         20 my ($v) = @_;
1381              
1382 10 100       25 if ($got_first) {
1383 9         16 my $ok = eval { $acc = $accumulator->($acc, $v, $idx++); 1 };
  9         24  
  9         34  
1384 9 50 33     36 $ok or $subscriber->{error}->($@) if defined $subscriber->{error};
1385             } else {
1386 1         3 $acc = $v;
1387 1         2 $got_first = 1;
1388             }
1389             },
1390             complete => sub {
1391 2 50 33     16 $subscriber->{next}->($acc) if $got_first and defined $subscriber->{next};
1392 2 50       10 $subscriber->{complete}->() if defined $subscriber->{complete};
1393             },
1394 2         17 };
1395              
1396 2         8 $source->subscribe($own_subscriber);
1397              
1398 2         6 return;
1399 2         5 });
1400 2         11 };
1401             }
1402              
1403             sub op_ref_count {
1404             return sub {
1405 0     0   0 my ($source) = @_;
1406              
1407 0 0       0 croak 'op_ref_count() was not applied to a connectable observable'
1408             unless $source->isa('RxPerl::ConnectableObservable');
1409              
1410 0         0 my $count = 0;
1411              
1412 0         0 my $connection_subscription;
1413             my $typical_unsubscription_fn = sub {
1414 0 0       0 if (--$count == 0) {
1415 0         0 $connection_subscription->unsubscribe;
1416             }
1417 0         0 };
1418              
1419             return rx_observable->new(sub {
1420 0         0 my ($subscriber) = @_;
1421              
1422 0         0 my $count_was = $count++;
1423              
1424 0 0       0 if ($count_was == 0) {
1425 0         0 $connection_subscription = RxPerl::Subscription->new;
1426              
1427 0         0 $subscriber->subscription->add($typical_unsubscription_fn);
1428 0         0 $source->subscribe($subscriber);
1429              
1430 0         0 $connection_subscription = $source->connect;
1431             } else {
1432 0         0 $subscriber->subscription->add($typical_unsubscription_fn);
1433 0         0 $source->subscribe($subscriber);
1434             }
1435              
1436 0         0 return;
1437 0         0 });
1438 0     0   0 };
1439             }
1440              
1441             sub _op_repeat_helper {
1442 8     8   18 my ($subscriber, $source, $count_ref, $own_subscription_ref) = @_;
1443              
1444 8         22 my $own_subscription = RxPerl::Subscription->new;
1445 8         14 $$own_subscription_ref = $own_subscription;
1446             my $own_subscriber = {
1447             new_subscription => $own_subscription,
1448             next => $subscriber->{next},
1449             error => $subscriber->{error},
1450             complete => sub {
1451 6 100   6   15 if (--$$count_ref) {
1452 5         13 _op_repeat_helper(
1453             $subscriber, $source, $count_ref, $own_subscription_ref,
1454             );
1455             } else {
1456 1 50       5 $subscriber->{complete}->() if defined $subscriber->{complete};
1457             }
1458             },
1459 8         41 };
1460              
1461 8         23 $source->subscribe($own_subscriber);
1462             }
1463              
1464             sub op_repeat {
1465 3     3   8 my ($count) = @_;
1466              
1467             return sub {
1468 3     3   7 my ($source) = @_;
1469              
1470             return rx_observable->new(sub {
1471 3         4 my ($subscriber) = @_;
1472              
1473 3         7 my $count = $count;
1474              
1475 3 50       8 $count = -1 if ! defined $count;
1476 3 50       8 if ($count == 0) {
1477 0 0       0 $subscriber->{complete}->() if defined $subscriber->{complete};
1478 0         0 return;
1479             }
1480              
1481 3         5 my $own_subscription;
1482 3         6 my $own_subscription_ref = \$own_subscription;
1483              
1484 3         9 $subscriber->subscription->add(
1485             $own_subscription_ref,
1486             );
1487              
1488 3         13 _op_repeat_helper(
1489             $subscriber, $source, \$count, $own_subscription_ref,
1490             );
1491              
1492 3         9 return;
1493 3         8 });
1494 3         23 };
1495             }
1496              
1497             sub _op_retry_helper {
1498 8     8   15 my ($subscriber, $source, $count_ref, $own_subscription_ref) = @_;
1499              
1500 8         20 my $own_subscription = RxPerl::Subscription->new;
1501 8         16 $$own_subscription_ref = $own_subscription;
1502             my $own_subscriber = {
1503             new_subscription => $own_subscription,
1504             next => $subscriber->{next},
1505             error => sub {
1506 7 100   7   20 if ($$count_ref--) {
1507 6         16 _op_retry_helper(
1508             $subscriber, $source, $count_ref, $own_subscription_ref,
1509             );
1510             } else {
1511 1 50       6 $subscriber->{error}->(@_) if defined $subscriber->{error};
1512             }
1513             },
1514             complete => $subscriber->{complete},
1515 8         38 };
1516              
1517 8         24 $source->subscribe($own_subscriber);
1518             }
1519              
1520             sub op_retry {
1521 2     2   6 my ($count) = @_;
1522              
1523             return sub {
1524 2     2   6 my ($source) = @_;
1525              
1526             return rx_observable->new(sub {
1527 2         5 my ($subscriber) = @_;
1528              
1529 2         3 my $count = $count;
1530              
1531 2 50       6 $count = -1 if ! defined $count;
1532              
1533 2         5 my $own_subscription;
1534 2         3 my $own_subscription_ref = \$own_subscription;
1535              
1536 2         9 $subscriber->subscription->add(
1537             $own_subscription_ref,
1538             );
1539              
1540 2         15 _op_retry_helper(
1541             $subscriber, $source, \$count, $own_subscription_ref,
1542             );
1543              
1544 2         4 return;
1545 2         6 });
1546 2         11 };
1547             }
1548              
1549             sub op_sample {
1550 0     0   0 my ($notifier) = @_;
1551              
1552             return sub {
1553 0     0   0 my ($source) = @_;
1554              
1555             return rx_observable->new(sub {
1556 0         0 my ($subscriber) = @_;
1557              
1558 0         0 my $last_val;
1559             my $has_last_val;
1560              
1561 0         0 my $notifier_subscription = RxPerl::Subscription->new;
1562             my $notifier_subscriber = {
1563             new_subscription => $notifier_subscription,
1564             next => sub {
1565 0 0       0 if ($has_last_val) {
1566 0 0       0 $subscriber->{next}->($last_val) if defined $subscriber->{next};
1567 0         0 undef $has_last_val;
1568             }
1569             },
1570             error => sub {
1571 0 0       0 $subscriber->{error}->(@_) if defined $subscriber->{error};
1572             },
1573 0         0 };
1574              
1575 0         0 $subscriber->subscription->add($notifier_subscription);
1576              
1577             my $own_subscriber = {
1578             %$subscriber,
1579             next => sub {
1580 0         0 my ($v) = @_;
1581              
1582 0         0 $last_val = $v;
1583 0         0 $has_last_val = 1;
1584             },
1585 0         0 };
1586              
1587 0         0 $notifier->subscribe($notifier_subscriber);
1588 0         0 $source->subscribe($own_subscriber);
1589              
1590 0         0 return;
1591 0         0 });
1592 0         0 };
1593             }
1594              
1595             sub op_sample_time {
1596 0     0   0 my ($period) = @_;
1597              
1598 0         0 return op_sample(rx_interval($period));
1599             }
1600              
1601             sub op_scan {
1602 0     0   0 my ($accumulator_function, $seed) = @_;
1603 0         0 my $has_seed = @_ >= 2;
1604              
1605             return sub {
1606 0     0   0 my ($source) = @_;
1607              
1608             return rx_observable->new(sub {
1609 0         0 my ($subscriber) = @_;
1610              
1611 0         0 my $has_seed = $has_seed;
1612              
1613 0 0       0 my $acc; $acc = $seed if $has_seed;
  0         0  
1614 0         0 my $index = -1;
1615             my $own_subscriber = {
1616             %$subscriber,
1617             (
1618             next => sub {
1619 0         0 my ($value) = @_;
1620              
1621 0 0       0 if (! $has_seed) {
1622 0         0 $acc = $value;
1623 0         0 $has_seed = 1;
1624             } else {
1625 0         0 ++$index;
1626 0         0 $acc = $accumulator_function->($acc, $value, $index);
1627             }
1628              
1629 0 0       0 $subscriber->{next}->($acc) if defined $subscriber->{next};
1630             },
1631             ) x!! defined $subscriber->{next},
1632 0         0 };
1633              
1634 0         0 $source->subscribe($own_subscriber);
1635              
1636 0         0 return;
1637 0         0 });
1638 0         0 };
1639             }
1640              
1641             sub op_share {
1642             return (
1643 0     0   0 op_multicast(sub { rx_subject->new }),
  0     0   0  
1644             op_ref_count(),
1645             );
1646             }
1647              
1648             sub op_single {
1649 2     2   10 my ($predicate) = @_;
1650              
1651             return sub {
1652 2     2   9 my ($source) = @_;
1653              
1654             return rx_observable->new(sub {
1655 2         7 my ($subscriber) = @_;
1656              
1657 2         5 my @found;
1658              
1659 2         8 my $idx = 0;
1660             my $own_subscriber = {
1661             %$subscriber,
1662             next => sub {
1663 3         7 my ($v) = @_;
1664              
1665 3 100       14 if (!$predicate) {
1666 1         9 push @found, $v;
1667             } else {
1668 2         3 my $found;
1669 2         4 my $ok = eval { local $_ = $v; $found = $predicate->($v, $idx++); 1 };
  2         5  
  2         8  
  2         20  
1670 2 50       6 if (! $ok) {
1671 0 0       0 $subscriber->{error}->($@) if defined $subscriber->{error};
1672 0         0 return;
1673             }
1674 2 100       9 push @found, $v if $found;
1675             }
1676              
1677 3 50 33     12 $subscriber->{error}->('Too many values match') if @found > 1 and defined $subscriber->{error};
1678             },
1679             complete => sub {
1680 2 50       6 if (! @found) {
1681 0 0       0 $subscriber->{error}->('No values match') if defined $subscriber->{error};
1682             } else {
1683 2 50       11 $subscriber->{next}->($found[0]) if defined $subscriber->{next};
1684 2 50       9 $subscriber->{complete}->() if defined $subscriber->{complete};
1685             }
1686             },
1687 2         21 };
1688              
1689 2         8 $source->subscribe($own_subscriber);
1690              
1691 2         5 return;
1692 2         6 });
1693 2         11 };
1694             }
1695              
1696             sub op_skip {
1697 1     1   4 my ($count) = @_;
1698              
1699             return sub {
1700 1     1   5 my ($source) = @_;
1701              
1702             return rx_observable->new(sub {
1703 1         15 my ($subscriber) = @_;
1704              
1705 1         5 my $count = int $count;
1706              
1707 1         3 my $own_subscriber;
1708 1 50       41 $own_subscriber = $subscriber if $count <= 0;
1709             $own_subscriber //= {
1710             %$subscriber,
1711             next => sub {
1712 5 100       54 if ($count-- <= 0) {
1713 2 50       11 $subscriber->{next}->(@_) if defined $subscriber->{next};
1714             }
1715             },
1716 1   50     18 };
1717              
1718 1         8 $source->subscribe($own_subscriber);
1719              
1720 1         8 return;
1721 1         5 });
1722 1         8 };
1723             }
1724              
1725             sub op_skip_last {
1726 1     1   5 my ($skip_count) = @_;
1727              
1728             return sub {
1729 1     1   3 my ($source) = @_;
1730              
1731             return rx_observable->new(sub {
1732 1         3 my ($subscriber) = @_;
1733              
1734 1         3 my @skipped;
1735 1         5 $subscriber->subscription->add(sub { undef @skipped });
  1         13  
1736              
1737 1         5 my $own_subscriber = { %$subscriber };
1738             $own_subscriber->{next} &&= sub {
1739 4         19 my ($v) = @_;
1740              
1741 4         9 push @skipped, $v;
1742 4 100       14 if (@skipped > $skip_count) {
1743 2         6 my $new_v = shift @skipped;
1744 2 50       15 $subscriber->{next}->($new_v) if defined $subscriber->{next};
1745             }
1746 1   50     30 };
1747              
1748 1         6 $source->subscribe($own_subscriber);
1749              
1750 1         3 return;
1751 1         5 });
1752 1         7 };
1753             }
1754              
1755             sub op_skip_until {
1756 6     6   13 my ($notifier) = @_;
1757              
1758             # FUTURE TODO: allow notifier to be a promise
1759 6 50       16 croak q"You provided 'undef' where a stream was expected. You can provide an observable."
1760             unless defined $notifier;
1761 6 50 33     44 croak q"The notifier of 'op_skip_until' needs to be an observable."
1762             unless blessed $notifier and $notifier->isa('RxPerl::Observable');
1763              
1764             return sub {
1765 6     6   15 my ($source) = @_;
1766              
1767             return rx_observable->new(sub {
1768 6         12 my ($subscriber) = @_;
1769              
1770 6         10 my $notifier_has_emitted;
1771             my $n_s = $notifier->pipe(
1772             op_take(1),
1773             )->subscribe(
1774             sub {
1775 3         7 $notifier_has_emitted = 1;
1776             },
1777             sub {
1778 1 50       13 $subscriber->{error}->(@_) if defined $subscriber->{error};
1779             },
1780 6         15 );
1781              
1782             my $own_subscriber = {
1783             %$subscriber,
1784             next => sub {
1785             $subscriber->{next}->(@_) if defined $subscriber->{next}
1786 32 100 66     144 and $notifier_has_emitted;
1787             },
1788 6         45 };
1789              
1790 6         24 $source->subscribe($own_subscriber);
1791              
1792 6         19 return $n_s;
1793 6         14 });
1794 6         68 };
1795             }
1796              
1797             sub op_skip_while {
1798 1     1   8 my ($predicate) = @_;
1799              
1800             return sub {
1801 1     1   6 my ($source) = @_;
1802              
1803             return rx_observable->new(sub {
1804 1         30 my ($subscriber) = @_;
1805              
1806 1         3 my $finished_skipping = 0;
1807              
1808 1         10 my $idx = 0;
1809             my $own_subscriber = {
1810             %$subscriber,
1811             next => sub {
1812 6         17 my ($v) = @_;
1813              
1814 6         7 my $should_display;
1815 6 100       12 if ($finished_skipping) {
1816 3         9 $should_display = 1;
1817             } else {
1818 3         4 my $satisfies_predicate;
1819 3         6 my $ok = eval { local $_ = $v; $satisfies_predicate = $predicate->($v, $idx++); 1 };
  3         6  
  3         7  
  3         14  
1820 3 50       8 $ok or do {
1821 0 0       0 $subscriber->{error}->($@) if defined $subscriber->{error};
1822 0         0 return;
1823             };
1824 3 100       10 if (! $satisfies_predicate) {
1825 1         6 $finished_skipping = 1;
1826 1         4 $should_display = 1;
1827             }
1828             }
1829              
1830 6 100 66     31 $subscriber->{next}->(@_) if $should_display and defined $subscriber->{next};
1831             }
1832 1         11 };
1833              
1834 1         6 $source->subscribe($own_subscriber);
1835              
1836 1         12 return;
1837 1         6 });
1838 1         7 };
1839             }
1840              
1841             sub op_start_with {
1842 3     3   10 my (@values) = @_;
1843              
1844             return sub {
1845 3     3   9 my ($source) = @_;
1846              
1847 3         29 return rx_concat(
1848             rx_of(@values),
1849             $source,
1850             );
1851 3         28 };
1852             }
1853              
1854             sub op_switch_all {
1855             return sub {
1856 1     1   3 my ($source) = @_;
1857              
1858             return rx_observable->new(sub {
1859 1         3 my ($subscriber) = @_;
1860              
1861 1         4 my $old_subscription;
1862              
1863             my $chief_is_complete;
1864 1         0 my $sub_is_complete;
1865              
1866             my $obs_subscriber = {
1867             next => sub {
1868 6 50       28 $subscriber->{next}->(@_) if defined $subscriber->{next};
1869             },
1870             error => sub {
1871 0 0       0 $subscriber->{error}->(@_) if defined $subscriber->{error};
1872             },
1873             complete => sub {
1874 0         0 $sub_is_complete = 1;
1875 0 0 0     0 $subscriber->{complete}->() if $chief_is_complete and defined $subscriber->{complete};
1876             },
1877 1         10 };
1878              
1879 1         5 my $own_subscription = RxPerl::Subscription->new;
1880 1         7 $subscriber->subscription->add(\$old_subscription, $own_subscription);
1881              
1882             my $own_subscriber = {
1883             new_subscription => $own_subscription,
1884             next => sub {
1885 3         7 my ($new_observable) = @_;
1886              
1887 3         5 $sub_is_complete = 0;
1888 3 100       12 $old_subscription->unsubscribe() if $old_subscription;
1889 3         9 $old_subscription = $new_observable->subscribe($obs_subscriber);
1890             },
1891             error => sub {
1892 0 0       0 $subscriber->{error}->(@_) if defined $subscriber->{error};
1893             },
1894             complete => sub {
1895 1         12 $chief_is_complete = 1;
1896 1 50 33     6 $subscriber->{complete}->() if $sub_is_complete and defined $subscriber->{complete};
1897             },
1898 1         17 };
1899              
1900 1         4 $source->subscribe($own_subscriber);
1901              
1902 1         3 return;
1903 1         4 });
1904 1     1   6 };
1905             }
1906              
1907             sub op_switch_map {
1908 0     0   0 my ($observable_factory) = @_;
1909              
1910             return sub {
1911 0     0   0 my ($source) = @_;
1912              
1913 0         0 return $source->pipe(
1914             op_map($observable_factory),
1915             op_switch_all(),
1916             );
1917             }
1918 0         0 }
1919              
1920             sub op_take {
1921 86     86   161 my ($count) = @_;
1922              
1923 86 50       224 croak 'negative argument passed to op_take' unless $count >= 0;
1924              
1925             return sub {
1926 86     86   183 my ($source) = @_;
1927              
1928             return rx_observable->new(sub {
1929 70         148 my ($subscriber) = @_;
1930              
1931 70         174 my $remaining = int $count;
1932              
1933 70 100       178 if ($remaining == 0) {
1934 1 50       7 $subscriber->{complete}->() if defined $subscriber->{complete};
1935 1         3 return;
1936             }
1937              
1938             my $own_subscriber = {
1939             %$subscriber,
1940             next => sub {
1941 187 100       659 $subscriber->{next}->(@_) if defined $subscriber->{next};
1942 187 100 100     895 $subscriber->{complete}->() if --$remaining == 0 and defined $subscriber->{complete};
1943             },
1944 69         368 };
1945              
1946 69         236 $source->subscribe($own_subscriber);
1947              
1948 69         217 return;
1949 86         176 });
1950 86         389 };
1951             }
1952              
1953             sub op_take_last {
1954 2     2   6 my ($count) = @_;
1955              
1956             return sub {
1957 2     2   5 my ($source) = @_;
1958              
1959             return rx_observable->new(sub {
1960 2         6 my ($subscriber) = @_;
1961              
1962 2         3 my @last_values;
1963              
1964             my $own_subscriber = {
1965             %$subscriber,
1966             next => sub {
1967 7         12 my ($v) = @_;
1968              
1969 7         10 push @last_values, $v;
1970 7 100       22 if (@last_values > $count) {
1971 2         7 shift @last_values;
1972             }
1973             },
1974             complete => sub {
1975 2         5 foreach my $last_val (@last_values) {
1976 5 50       17 $subscriber->{next}->($last_val) if defined $subscriber->{next};
1977             }
1978 2 50       12 $subscriber->{complete}->() if defined $subscriber->{complete};
1979             },
1980 2         15 };
1981              
1982 2         13 $source->subscribe($own_subscriber);
1983              
1984 2         13 return;
1985 2         5 });
1986             }
1987 2         11 }
1988              
1989             sub op_take_until {
1990 1     1   28 my ($notifier_observable) = @_;
1991              
1992             return sub {
1993 1     1   5 my ($source) = @_;
1994              
1995             return rx_observable->new(sub {
1996 1         3 my ($subscriber) = @_;
1997              
1998             my $n_s = $notifier_observable->subscribe(
1999             sub {
2000 1 50       11 $subscriber->{complete}->() if defined $subscriber->{complete};
2001             },
2002             sub {
2003 0 0       0 $subscriber->{error}->(@_) if defined $subscriber->{error};
2004             },
2005 1         7 );
2006              
2007 1         15 $source->subscribe($subscriber);
2008              
2009 1         12 return $n_s;
2010 1         4 });
2011 1         7 };
2012             }
2013              
2014             sub op_take_while {
2015 2     2   6 my ($cond, $include) = @_;
2016              
2017             return sub {
2018 2     2   4 my ($source) = @_;
2019              
2020             return rx_observable->new(sub {
2021 2         9 my ($subscriber) = @_;
2022              
2023 2         5 my $i = 0;
2024             my $own_subscriber = {
2025             %$subscriber,
2026             next => sub {
2027 6         28 my ($value) = @_;
2028              
2029 6 100       9 if (! do { local $_ = $value; $cond->($value, $i++) }) {
  6         12  
  6         18  
2030 2 100 66     20 $subscriber->{next}->($value) if $include and defined $subscriber->{next};
2031 2 50       10 $subscriber->{complete}->() if defined $subscriber->{complete};
2032 2         19 return;
2033             }
2034              
2035 4 50       33 $subscriber->{next}->(@_) if defined $subscriber->{next};
2036             },
2037 2         14 };
2038              
2039 2         9 $source->subscribe($own_subscriber);
2040              
2041 2         5 return;
2042 2         6 });
2043 2         11 };
2044             }
2045              
2046             sub op_tap {
2047 0     0   0 my @args = @_;
2048              
2049             return sub {
2050 0     0   0 my ($source) = @_;
2051              
2052             return rx_observable->new(sub {
2053 0         0 my ($subscriber) = @_;
2054              
2055 0         0 my @args = @args;
2056 0 0 0     0 my $tap_subscriber; $tap_subscriber = $args[0] if (reftype($args[0]) // '') eq 'HASH';
  0         0  
2057             $tap_subscriber //= {
2058 0   0     0 map {($_, shift @args)} qw/ next error complete /
  0         0  
2059             };
2060              
2061 0         0 my %own_keys = map {$_ => 1} grep { /^(next|error|complete)\z/ } (keys(%$tap_subscriber), keys(%$subscriber));
  0         0  
  0         0  
2062              
2063             my $own_subscriber = {
2064             %$subscriber,
2065             map {
2066 0         0 my $key = $_;
  0         0  
2067             ($key => sub {
2068 0 0       0 $tap_subscriber->{$key}->(@_) if defined $tap_subscriber->{$key};
2069 0 0       0 $subscriber->{$key}->(@_) if defined $subscriber->{$key};
2070 0         0 });
2071             } keys %own_keys
2072             };
2073              
2074 0         0 $source->subscribe($own_subscriber);
2075              
2076 0         0 return;
2077 0         0 });
2078 0         0 };
2079             }
2080              
2081             sub op_throttle {
2082 0     0   0 my ($duration_selector) = @_;
2083              
2084             return sub {
2085 0     0   0 my ($source) = @_;
2086              
2087             return rx_observable->new(sub {
2088 0         0 my ($subscriber) = @_;
2089              
2090 0         0 my $mini_subscription;
2091              
2092             my $mini_subscriber = {
2093             error => sub {
2094 0 0       0 $subscriber->{error}->(@_) if defined $subscriber->{error};
2095             },
2096             complete => sub {
2097 0         0 undef $mini_subscription;
2098             },
2099 0         0 };
2100              
2101             my $own_subscriber = {
2102             %$subscriber,
2103             next => sub {
2104 0         0 my ($v) = @_;
2105              
2106 0 0       0 if (! $mini_subscription) {
2107 0 0       0 $subscriber->{next}->(@_) if defined $subscriber->{next};
2108 0         0 $mini_subscription = do { local $_ = $v; $duration_selector->($v) }->pipe(
  0         0  
  0         0  
2109             op_take(1),
2110             )->subscribe($mini_subscriber);
2111             }
2112             },
2113 0         0 };
2114              
2115 0         0 $subscriber->subscription->add(\$mini_subscription);
2116              
2117 0         0 $source->subscribe($own_subscriber);
2118              
2119 0         0 return;
2120 0         0 });
2121 0         0 };
2122             }
2123              
2124             sub op_throttle_time {
2125 0     0   0 my ($duration) = @_;
2126              
2127 0     0   0 return op_throttle(sub { rx_timer($duration) });
  0         0  
2128             }
2129              
2130             sub op_throw_if_empty {
2131 0     0   0 my ($error_factory) = @_;
2132              
2133             return sub {
2134 0     0   0 my ($source) = @_;
2135              
2136             return rx_observable->new(sub {
2137 0         0 my ($subscriber) = @_;
2138              
2139 0         0 my $is_empty = 1;
2140              
2141             my $own_subscriber = {
2142             %$subscriber,
2143             next => sub {
2144 0         0 $is_empty = 0;
2145 0 0       0 $subscriber->{next}->(@_) if defined $subscriber->{next};
2146             },
2147             complete => sub {
2148 0 0       0 if ($is_empty) {
2149 0 0       0 $subscriber->{error}->($error_factory->()) if defined $subscriber->{error};
2150             } else {
2151 0 0       0 $subscriber->{complete}->() if defined $subscriber->{complete};
2152             }
2153             },
2154 0         0 };
2155              
2156 0         0 $source->subscribe($own_subscriber);
2157              
2158 0         0 return;
2159 0         0 });
2160 0         0 };
2161             }
2162              
2163             sub op_time_interval {
2164             return sub {
2165 0     0   0 my ($source) = @_;
2166              
2167 0         0 my $t0 = Time::HiRes::time();
2168              
2169             return $source->pipe(
2170             op_map(sub {
2171 0         0 my $now = Time::HiRes::time();
2172 0         0 my $interval = $now - $t0;
2173 0         0 $t0 = $now;
2174 0         0 return { value => $_, interval => $interval };
2175 0         0 }),
2176             );
2177 0     0   0 };
2178             }
2179              
2180             sub op_timeout {
2181 0     0   0 my ($duration) = @_;
2182              
2183             return sub {
2184 0     0   0 my ($source) = @_;
2185              
2186             return rx_observable->new(sub {
2187 0         0 my ($subscriber) = @_;
2188              
2189 0         0 my $subject = rx_behavior_subject->new(1);
2190             my $s_s = $subject->pipe(
2191 0         0 op_switch_map(sub { rx_timer($duration) }),
2192             )->subscribe(sub {
2193 0 0       0 $subscriber->{error}->('Timeout has occurred') if defined $subscriber->{error};
2194 0         0 });
2195              
2196 0         0 my $own_subscription = RxPerl::Subscription->new;
2197 0         0 $subscriber->subscription->add($own_subscription, $s_s);
2198              
2199             my $own_subscriber = {
2200             new_subscription => $own_subscription,
2201             next => sub {
2202 0 0       0 $subject->{next}->(1) if defined $subject->{next};
2203 0 0       0 $subscriber->{next}->(@_) if defined $subscriber->{next};
2204             },
2205             error => sub {
2206 0 0       0 $subscriber->{error}->(@_) if defined $subscriber->{error};
2207             },
2208             complete => sub {
2209 0 0       0 $subscriber->{complete}->() if defined $subscriber->{complete};
2210             }
2211 0         0 };
2212              
2213 0         0 $source->subscribe($own_subscriber);
2214              
2215 0         0 return;
2216 0         0 });
2217 0         0 };
2218             }
2219              
2220             sub op_timestamp {
2221             return op_map(sub {
2222             return {
2223 0     0   0 value => $_,
2224             timestamp => Time::HiRes::time(),
2225             };
2226 0     0   0 });
2227             }
2228              
2229             sub op_to_array {
2230             return sub {
2231 0     0   0 my ($source) = @_;
2232              
2233             return rx_observable->new(sub {
2234 0         0 my ($subscriber) = @_;
2235              
2236 0         0 my @values;
2237              
2238             my $own_subscriber = {
2239             %$subscriber,
2240             next => sub {
2241 0         0 my ($v) = @_;
2242 0         0 push @values, $v;
2243             },
2244             complete => sub {
2245 0 0       0 $subscriber->{next}->(\@values) if defined $subscriber->{next};
2246 0 0       0 $subscriber->{complete}->() if defined $subscriber->{complete};
2247             },
2248 0         0 };
2249              
2250 0         0 $source->subscribe($own_subscriber);
2251              
2252 0         0 return;
2253 0         0 });
2254 0     0   0 };
2255             }
2256              
2257             sub op_with_latest_from {
2258 1     1   18 my (@other_observables) = @_;
2259              
2260             return sub {
2261 1     1   3 my ($source) = @_;
2262              
2263             return rx_observable->new(sub {
2264 1         3 my ($subscriber) = @_;
2265              
2266 1         2 my @other_observables = @other_observables;
2267 1         4 my $i = 0;
2268 1         3 my %didnt_emit = map {($i++, 1)} @other_observables;
  1         4  
2269 1         7 my @latest_values;
2270             my %other_subscriptions;
2271              
2272             $subscriber->subscription->add(
2273 1         4 \%other_subscriptions, sub { undef @other_observables },
2274 1         5 );
2275              
2276 1         8 for (my $i = 0; $i < @other_observables; $i++) {
2277 1         2 my $j = $i;
2278 1         3 my $other_observable = $other_observables[$j];
2279              
2280 1         4 my $other_subscription = RxPerl::Subscription->new;
2281 1         3 $other_subscriptions{$other_subscription} = $other_subscription;
2282             $other_observable->subscribe({
2283             new_subscription => $other_subscription,
2284             next => sub {
2285 4         27 my ($value) = @_;
2286              
2287 4         14 $latest_values[$j] = $value;
2288 4         9 delete $didnt_emit{$j};
2289             },
2290             error => $subscriber->{error},
2291 1         9 });
2292             }
2293              
2294             $source->subscribe({
2295             %$subscriber,
2296             next => sub {
2297 5         14 my ($value) = @_;
2298              
2299 5 100       14 if (! %didnt_emit) {
2300 4 50       26 $subscriber->{next}->([$value, @latest_values]) if defined $subscriber->{next};
2301             }
2302             },
2303 1         21 });
2304              
2305 1         5 return;
2306 1         4 });
2307 1         7 };
2308             }
2309              
2310             sub op_zip_with {
2311 1     1   6 my @other_sources = @_;
2312              
2313             return sub {
2314 1     1   12 my ($source) = @_;
2315              
2316 1         7 return rx_zip(
2317             $source,
2318             @other_sources,
2319             );
2320 1         7 };
2321             }
2322              
2323             sub _eqq {
2324 20     20   35 my ($x, $y) = @_;
2325              
2326 20 100       48 defined $x or return !defined $y;
2327 17 100       35 defined $y or return !!0;
2328 15 100       33 ref $x eq ref $y or return !!0;
2329 14 100       78 return length(ref $x) ? refaddr $x == refaddr $y : $x eq $y;
2330             }
2331              
2332             1;