File Coverage

blib/lib/Piper/Instance.pm
Criterion Covered Total %
statement 208 209 99.5
branch 76 82 92.6
condition 27 29 93.1
subroutine 45 45 100.0
pod 14 17 82.3
total 370 382 96.8


line stmt bran cond sub pod time code
1             #####################################################################
2             ## AUTHOR: Mary Ehlers, regina.verbae@gmail.com
3             ## ABSTRACT: An initialized pipeline segment for the Piper system
4             #####################################################################
5              
6             package Piper::Instance;
7              
8 4     4   26 use v5.10;
  4         9  
9 4     4   13 use strict;
  4         2  
  4         54  
10 4     4   10 use warnings;
  4         4  
  4         87  
11              
12 4     4   1405 use List::AllUtils qw(any last_value max part sum);
  4         36257  
  4         337  
13 4     4   24 use List::UtilsBy qw(max_by min_by);
  4         5  
  4         189  
14 4     4   1370 use Piper::Path;
  4         12  
  4         124  
15 4     4   24 use Scalar::Util qw(weaken);
  4         5  
  4         204  
16 4     4   16 use Types::Standard qw(ArrayRef ConsumerOf Enum HashRef InstanceOf Tuple slurpy);
  4         5  
  4         33  
17              
18 4     4   3063 use Moo;
  4         6  
  4         18  
19 4     4   1024 use namespace::clean;
  4         5  
  4         27  
20              
21             with qw(Piper::Role::Queue);
22              
23             use overload (
24 3274     3274   106498 q{""} => sub { $_[0]->path },
25 4         25 fallback => 1,
26 4     4   1048 );
  4         8  
27              
28             our $VERSION = '0.03'; # from Piper-0.03.tar.gz
29              
30             #pod =head1 ATTRIBUTES
31             #pod
32             #pod =head2 batch_size
33             #pod
34             #pod The number of items to process at a time for this segment.
35             #pod
36             #pod If not set, inherits the C<batch_size> of any existing parent(s). If the segment has no parents, or if none of its parents have a C<batch_size> defined, the default C<batch_size> will be used. The default is 200, but this can be configured at import of L<Piper>.
37             #pod
38             #pod To clear a previously-set C<batch_size>, simply set it to C<undef> or use the C<clear_batch_size> method.
39             #pod
40             #pod $segment->batch_size(undef);
41             #pod $segment->clear_batch_size;
42             #pod
43             #pod =cut
44              
45             # around below to set up inheritance through parents
46              
47             #pod =head2 children
48             #pod
49             #pod For container instances (made from L<Piper> objects, not L<Piper::Process> objects), the C<children> attribute holds an arrayref of the contained instance objects.
50             #pod
51             #pod =cut
52              
53             has children => (
54             is => 'ro',
55             # Must contain at least one child
56             isa => Tuple[InstanceOf['Piper::Instance'],
57             slurpy ArrayRef[InstanceOf['Piper::Instance']]
58             ],
59             required => 0,
60             predicate => 1,
61             );
62              
63             #pod =head2 debug
64             #pod
65             #pod Debug level for this segment. When accessing, inherits the debug level of any existing parent(s) if not explicitly set for this segment. The default level is 0, but can be globally overridden with the environment variable C<PIPER_DEBUG>.
66             #pod
67             #pod To clear a previously-set debug level for a segment, simply set it to C<undef> or use the C<clear_debug> method.
68             #pod
69             #pod $segment->debug(undef);
70             #pod $segment->clear_debug;
71             #pod
72             #pod =cut
73              
74             # around below to set up inheritance through parents
75              
76             #pod =head2 enabled
77             #pod
78             #pod A boolean indicating that the segment is enabled and can accept items for processing. Inherits this attribute from any existing parent(s) with a default of true.
79             #pod
80             #pod To clear a previously-set enabled attribute, simply set it to C<undef> or use the C<clear_enabled> method.
81             #pod
82             #pod $segment->enabled(undef);
83             #pod $segment->clear_enabled;
84             #pod
85             #pod =cut
86              
87             # around below to set up inheritance through parents
88              
89             #pod =head2 main
90             #pod
91             #pod Holds a reference to the outermost container instance for the pipeline.
92             #pod
93             #pod =cut
94              
95             has main => (
96             is => 'lazy',
97             isa => InstanceOf['Piper::Instance'],
98             weak_ref => 1,
99             builder => sub {
100 39     39   2363 my ($self) = @_;
101 39         48 my $parent = $self;
102 39         125 while ($parent->has_parent) {
103 35         73 $parent = $parent->parent;
104             }
105 39         488 return $parent;
106             },
107             );
108              
109             #pod =head2 parent
110             #pod
111             #pod Unless this segment is the outermost container (C<main>), this attribute holds a reference to the segment's immediate container.
112             #pod
113             #pod =cut
114              
115             has parent => (
116             is => 'rwp',
117             isa => InstanceOf['Piper::Instance'],
118             # Setting a parent will introduce a self-reference
119             weak_ref => 1,
120             required => 0,
121             predicate => 1,
122             );
123              
124             #pod =head2 path
125             #pod
126             #pod The full path to this segment, built as the concatenation of all the parent(s) labels and the segment's label, joined by C</>. L<Piper::Instance> objects stringify to this attribute.
127             #pod
128             #pod =cut
129              
130             has path => (
131             is => 'lazy',
132             isa => InstanceOf['Piper::Path'],
133             builder => sub {
134 35     35   5063 my ($self) = @_;
135              
136 35 100       498 return $self->has_parent
137             ? $self->parent->path->child($self->label)
138             : Piper::Path->new($self->label);
139             },
140             );
141              
142             #pod =head2 verbose
143             #pod
144             #pod Verbosity level for this segment. When accessing, inherits verbosity level of any existing parent(s) if not explicitly set for this segment.
145             #pod
146             #pod To clear a previously-set verbosity level for a segment, simply set it to C<undef> or use the C<clear_verbose> method.
147             #pod
148             #pod $segment->verbose(undef);
149             #pod $segment->clear_verbose;
150             #pod
151             #pod =cut
152              
153             # Inherit parent settings
154             for my $attr (qw(batch_size debug enabled verbose)) {
155             my $clear = "clear_$attr";
156             my $has = "has_$attr";
157              
158             around $attr => sub {
159             my ($orig, $self) = splice @_, 0, 2;
160              
161             state $default = {
162             batch_size => $self->main->config->batch_size,
163             debug => 0,
164             enabled => 1,
165             verbose => 0,
166             };
167              
168             if (@_) {
169             return $self->$clear() if !defined $_[0];
170             return $self->$orig(@_);
171             }
172             else {
173             return $self->$has()
174             ? $self->$orig()
175             : $self->has_parent
176             ? $self->parent->$attr()
177             : $default->{$attr};
178             }
179             };
180             }
181              
182             #pod =head1 METHODS
183             #pod
184             #pod Methods marked with a (*) should only be called from the outermost instance.
185             #pod
186             #pod =head2 clear_batch_size
187             #pod
188             #pod =head2 clear_debug
189             #pod
190             #pod =head2 clear_enabled
191             #pod
192             #pod =head2 clear_verbose
193             #pod
194             #pod Methods for clearing the corresponding attribute.
195             #pod
196             #pod =head2 has_children
197             #pod
198             #pod A boolean indicating whether the instance has any children (contained instances). Will be true for all segments initialized from a L<Piper> object and false for all segments initialized from a L<Piper::Process> object.
199             #pod
200             #pod =head2 has_parent
201             #pod
202             #pod A boolean indicating whether the instance has a parent (container instance). Will be true for all segments except the outermost segment (C<main>).
203             #pod
204             #pod =head2 has_pending
205             #pod
206             #pod Returns a boolean indicating whether there are any items that are queued at some level of the segment but have not completed processing.
207             #pod
208             #pod =cut
209              
210             sub has_pending {
211 398     398 1 3985 my ($self) = @_;
212              
213             return $self->has_children
214 398 100   267   3342 ? any { $_->has_pending } @{$self->children}
  267         332  
  208         457  
215             : $self->queue->ready;
216             }
217              
218             #pod =head2 *dequeue([$num])
219             #pod
220             #pod Remove at most C<$num> S<(default 1)> processed items from the end of the segment.
221             #pod
222             #pod =head2 *enqueue(@data)
223             #pod
224             #pod Queue C<@data> for processing by the pipeline.
225             #pod
226             #pod =cut
227              
228             around enqueue => sub {
229             my ($orig, $self, @args) = @_;
230              
231             if (!$self->enabled) {
232             # Bypass - go straight to drain
233             $self->INFO('Skipping disabled process', @args);
234             $self->drain->enqueue(@args);
235             return;
236             }
237              
238             my @items;
239             if ($self->has_allow) {
240             my ($skip, $queue) = part {
241             $self->allow->($_) ? 1 : 0
242             } @args;
243              
244             @items = @$queue if defined $queue;
245              
246             if (defined $skip) {
247             $self->INFO('Disallowed items emitted to next handler', @$skip);
248             $self->drain->enqueue(@$skip);
249             }
250             }
251             else {
252             @items = @args;
253             }
254              
255             return unless @items;
256              
257             $self->INFO('Queueing items', @items);
258             $self->$orig(@items);
259             };
260              
261             #pod =head2 find_segment($location)
262             #pod
263             #pod Find and return the segment instance according to C<$location>, which can be a label or a path-like hierarchy of labels.
264             #pod
265             #pod For example, in the following pipeline, a few possible C<$location> values include C<a>, C<subpipe/b>, or C<main/subpipe/c>.
266             #pod
267             #pod my $pipe = Piper->new(
268             #pod { label => 'main' },
269             #pod subpipe => Piper->new(
270             #pod a => sub { ... },
271             #pod b => sub { ... },
272             #pod c => sub { ... },
273             #pod ),
274             #pod )->init;
275             #pod
276             #pod If a label is unique within the pipeline, no path is required. For non-unique labels, searches are performed in a nearest-neighbor, depth-first manner.
277             #pod
278             #pod For example, in the following pipeline, searching for C<processA> from C<processB> would find C<main/pipeA/processA>, not C<main/processA>. So to reach C<main/processA> from C<processB>, the appropriate search would be for C<main/processA>.
279             #pod
280             #pod my $pipe = Piper->new(
281             #pod { label => 'main' },
282             #pod pipeA => Piper->new(
283             #pod processA => sub { ... },
284             #pod processB => sub { ... },
285             #pod ),
286             #pod processA => sub { ... },
287             #pod );
288             #pod
289             #pod =cut
290              
291             sub find_segment {
292 192     192 1 45996 my ($self, $location) = @_;
293            
294 192         185 state $global_cache = {};
295 192   100     3548 $global_cache->{$self->main->id}{$self->path} //= {};
296 192         2470 my $cache = $global_cache->{$self->main->id}{$self->path};
297              
298 192 100       459 unless (exists $cache->{$location}) {
299 161         2142 $location = Piper::Path->new($location);
300 161 100 100     6433 if ($self->has_children or $self->has_parent) {
301 159 100       348 my $parent = $self->has_children ? $self : $self->parent;
302 159         236 my $segment = $parent->descendant($location);
303 159   100     481 while (!defined $segment and $parent->has_parent) {
304 88         94 my $referrer = $parent;
305 88         93 $parent = $parent->parent;
306 88         123 $segment = $parent->descendant($location, $referrer);
307             }
308 159         346 $cache->{$location} = $segment;
309             }
310             else {
311             # Lonely Process (no parents or children)
312 2 100       6 $cache->{$location} = "$self" eq "$location" ? $self : undef;
313             }
314 161 100       265 weaken($cache->{$location}) if defined $cache->{$location};
315             }
316              
317 192 100       348 $self->DEBUG("Found label $location: '$cache->{$location}'") if defined $cache->{$location};
318 192         4777 return $cache->{$location};
319             }
320              
321             #pod =head2 *flush
322             #pod
323             #pod Process batches until there are no more items pending.
324             #pod
325             #pod =cut
326              
327             sub flush {
328 2     2 0 5 my ($self) = @_;
329              
330 2         8 while ($self->has_pending) {
331 62         121 $self->process_batch;
332             }
333             }
334              
335             #pod =head2 *is_exhausted
336             #pod
337             #pod Returns a boolean indicating whether there are any items left to process or dequeue.
338             #pod
339             #pod =cut
340              
341             sub is_exhausted {
342 24     24 0 5207 my ($self) = @_;
343            
344 24 100       42 return $self->prepare ? 0 : 1;
345             }
346              
347             #pod =head2 *isnt_exhausted
348             #pod
349             #pod Returns the opposite of C<is_exhausted>.
350             #pod
351             #pod =cut
352              
353             sub isnt_exhausted {
354 15     15 0 18 my ($self) = @_;
355 15         26 return !$self->is_exhausted;
356             }
357              
358             #pod =head2 next_segment
359             #pod
360             #pod Returns the next adjacent segment from the calling segment. Returns C<undef> for the outermost container.
361             #pod
362             #pod =cut
363              
364             sub next_segment {
365 12     12 1 13 my ($self) = @_;
366 12 50       26 return unless $self->has_parent;
367 12         160 return $self->parent->follower->{$self};
368             }
369              
370             #pod =head2 pending
371             #pod
372             #pod Returns the number of items that are queued at some level of the segment but have not completed processing.
373             #pod
374             #pod =cut
375              
376             sub pending {
377 746     746 1 2425 my ($self) = @_;
378 746 100       850 if ($self->has_children) {
379 14         17 return sum(map { $_->pending } @{$self->children});
  24         41  
  14         32  
380             }
381             else {
382 732         9509 return $self->queue->ready;
383             }
384             }
385              
386             #pod =head2 *prepare([$num])
387             #pod
388             #pod Process batches while data is still C<pending> until at least C<$num> S<(default 1)> items are C<ready> for C<dequeue>.
389             #pod
390             #pod =cut
391              
392             sub prepare {
393 28     28 1 36 my ($self, $num) = @_;
394 28   100     92 $num //= 1;
395              
396 28   66     51 while ($self->has_pending and $self->ready < $num) {
397 26         68 $self->process_batch;
398             }
399 28         376 return $self->ready;
400             }
401              
402             #pod =head2 ready
403             #pod
404             #pod Returns the number of items that have finished processing and are ready for C<dequeue> from the segment.
405             #pod
406             #pod =cut
407              
408             #pod =head1 FLOW CONTROL METHODS
409             #pod
410             #pod These methods are available for use within process handler subroutines (see L<Piper::Process>).
411             #pod
412             #pod =head2 eject(@data)
413             #pod
414             #pod If the segment has a parent, send C<@data> to the drain of its parent. Otherwise, enqueues C<@data> to the segment's drain.
415             #pod
416             #pod =cut
417              
418             sub eject {
419 5     5 1 4867 my $self = shift;
420 5 100       19 if ($self->has_parent) {
421 4         18 $self->INFO('Ejecting to drain of parent ('.$self->parent.')', @_);
422 4         148 $self->parent->drain->enqueue(@_);
423             }
424             else {
425 1         26 $self->INFO('Ejecting to drain', @_);
426 1         39 $self->drain->enqueue(@_);
427             }
428             }
429              
430             #pod =head2 emit(@data)
431             #pod
432             #pod Send C<@data> to the next segment in the pipeline. If the segment is the last in the pipeline, emits to the drain, making the C<@data> ready for C<dequeue>.
433             #pod
434             #pod =cut
435              
436             sub emit {
437 130     130 1 4954 my $self = shift;
438 130         1943 $self->INFO('Emitting', @_);
439             # Just collect in the drain
440 130         4521 $self->drain->enqueue(@_);
441             }
442              
443             #pod =head2 inject(@data)
444             #pod
445             #pod If the segment has a parent, enqueues C<@data> to its parent. Otherwise, enqueues C<@data> to itself.
446             #pod
447             #pod =cut
448              
449             sub inject {
450 5     5 1 6465 my $self = shift;
451              
452 5 100       22 if ($self->has_parent) {
453 4         18 $self->INFO('Injecting to parent ('.$self->parent.')', @_);
454 4         158 $self->parent->enqueue(@_);
455             }
456             else {
457 1         6 $self->INFO('Injecting to self ('.$self.')', @_);
458 1         41 $self->enqueue(@_);
459             }
460             }
461              
462             #pod =head2 injectAfter($location, @data)
463             #pod
464             #pod Send C<@data> to the segment I<after> the specified C<$location>. See L<C<find_segment>|/find_segment($location)> for a detailed description of C<$location>.
465             #pod
466             #pod =cut
467              
468             sub injectAfter {
469 5     5 1 4400 my $self = shift;
470 5         8 my $location = shift;
471 5         9 my $segment = $self->find_segment($location);
472 5 100       49 $self->ERROR("Could not find $location to injectAfter", @_)
473             if !defined $segment;
474 3         53 $self->INFO("Injecting to $location", @_);
475 3         111 $segment->drain->enqueue(@_);
476             }
477              
478             #pod =head2 injectAt($location, @data)
479             #pod
480             #pod Send C<@data> to the segment I<at> the specified C<$location>. See L<C<find_segment>|/find_segment($location)> for a detailed description of C<$location>.
481             #pod
482             #pod =cut
483              
484             sub injectAt {
485 5     5 1 3714 my $self = shift;
486 5         6 my $location = shift;
487 5         15 my $segment = $self->find_segment($location);
488 5 100       54 $self->ERROR("Could not find $location to injectAt", @_)
489             if !defined $segment;
490 3         54 $self->INFO("Injecting to $location", @_);
491 3         114 $segment->enqueue(@_);
492             }
493              
494             #pod =head2 recycle(@data)
495             #pod
496             #pod Re-queue C<@data> to the top of the current segment in an order such that C<dequeue(1)> would subsequently return C<$data[0]> and so forth.
497             #pod
498             #pod =cut
499              
500             sub recycle {
501 5     5 1 26 my $self = shift;
502 5         80 $self->INFO('Recycling', @_);
503 5         159 $self->requeue(@_);
504             }
505              
506             #pod =head1 LOGGING AND DEBUGGING METHODS
507             #pod
508             #pod See L<Piper::Logger> for detailed descriptions.
509             #pod
510             #pod =head2 INFO($message, [@items])
511             #pod
512             #pod Prints an informational C<$message> to STDERR if either the debug or verbosity level for the segment S<< is > 0 >>.
513             #pod
514             #pod =head2 DEBUG($message, [@items])
515             #pod
516             #pod Prints a debug C<$message> to STDERR if the debug level for the segment S<< is > 0 >>.
517             #pod
518             #pod =head2 WARN($message, [@items])
519             #pod
520             #pod Issues a warning with C<$message> via L<Carp::carp|Carp>.
521             #pod
522             #pod =head2 ERROR($message, [@items])
523             #pod
524             #pod Throws an error with C<$message> via L<Carp::croak|Carp>.
525             #pod
526             #pod =head1 UTILITY ATTRIBUTES
527             #pod
528             #pod None of these should be directly accessed. Documented for contributors and source-code readers.
529             #pod
530             #pod =head2 args
531             #pod
532             #pod The arguments passed to the C<init> method of L<Piper>.
533             #pod
534             #pod =cut
535              
536             has args => (
537             is => 'rwp',
538             isa => ArrayRef,
539             lazy => 1,
540             builder => sub {
541 10     10   563 my ($self) = @_;
542 10 50       27 if ($self->has_parent) {
543 10         133 return $self->main->args;
544             }
545             else {
546 0         0 return [];
547             }
548             },
549             );
550              
551             #pod =head2 directory
552             #pod
553             #pod A hashref of the segment's children, keyed by their labels. Used by C<descendant>.
554             #pod
555             #pod =cut
556              
557             has directory => (
558             is => 'lazy',
559             isa => HashRef,
560             builder => sub {
561 7     7   522 my ($self) = @_;
562 7 50       13 return {} unless $self->has_children;
563 7         9 my %dir;
564 7         8 for my $child (@{$self->children}) {
  7         15  
565 11         130 $dir{$child->path->name} = $child;
566             }
567 7         93 return \%dir;
568             },
569             );
570              
571             #pod =head2 drain
572             #pod
573             #pod A reference to the location where the segment's processed items are emitted.
574             #pod
575             #pod =cut
576              
577             BEGIN { # Enables 'with Piper::Role::Queue'
578             has drain => (
579             is => 'lazy',
580             handles => [qw(dequeue ready)],
581             builder => sub {
582 23     23   1798 my ($self) = @_;
583 23 100       56 if ($self->has_parent) {
584 12         33 return $self->next_segment;
585             }
586             else {
587 11         153 return $self->main->config->queue_class->new();
588             }
589             },
590 4     4   5504 );
591             }
592              
593             #pod =head2 follower
594             #pod
595             #pod A hashref of children paths to the child's next adjacent segment. Used by C<next_segment>.
596             #pod
597             #pod =cut
598              
599             has follower => (
600             is => 'lazy',
601             isa => HashRef,
602             builder => sub {
603 8     8   506 my ($self) = @_;
604 8 50       24 return {} unless $self->has_children;
605 8         11 my %follow;
606 8         9 for my $index (0..$#{$self->children}) {
  8         29  
607 12 100       34 if (defined $self->children->[$index + 1]) {
608 4         16 $follow{$self->children->[$index]} =
609             $self->children->[$index + 1];
610             }
611             else {
612 8         113 $follow{$self->children->[$index]} = $self->drain;
613             }
614             }
615 8         110 return \%follow;
616             },
617             );
618              
619             #pod =head2 logger
620             #pod
621             #pod A reference to the logger for the pipeline. Handles L<Piper::Role::Logger> methods.
622             #pod
623             #pod =cut
624              
625             has logger => (
626             is => 'lazy',
627             isa => ConsumerOf['Piper::Role::Logger'],
628             handles => 'Piper::Role::Logger',
629             builder => sub {
630 35     35   2985 my ($self) = @_;
631            
632 35 100       91 if ($self->has_parent) {
633 22         283 return $self->main->logger;
634             }
635             else {
636 13         203 return $self->main->config->logger_class->new();
637             }
638             },
639             );
640              
641             # Cute little trick to auto-insert the instance object
642             # as first argument, since $self will become the logger
643             # object and lose access to paths/labels/etc.
644             around [qw(INFO DEBUG WARN ERROR)] => sub {
645             my ($orig, $self) = splice @_, 0, 2;
646             if (ref $_[0]) {
647             $self->$orig(@_);
648             }
649             else {
650             $self->$orig($self, @_);
651             }
652             };
653              
654             #pod =head2 queue
655             #pod
656             #pod A reference to the location where data is queued for processing by this segment.
657             #pod
658             #pod =cut
659              
660             BEGIN { # Enables 'with Piper::Role::Queue'
661             has queue => (
662             is => 'lazy',
663             isa => ConsumerOf['Piper::Role::Queue'],
664             handles => [qw(enqueue requeue)],
665             builder => sub {
666 23     23   1929 my ($self) = @_;
667 23 100       67 if ($self->has_children) {
668 8         107 return $self->children->[0];
669             }
670             else {
671 15         220 return $self->main->config->queue_class->new();
672             }
673             },
674 4     4   3798 );
675             }
676              
677             #pod =head2 segment
678             #pod
679             #pod The L<Piper> or L<Piper::Process> object from which the instance segment was created.
680             #pod
681             #pod =cut
682              
683             BEGIN { # So we can 'around' on Piper::Role::Segment methods
684 4     4   10518 has segment => (
685             is => 'ro',
686             isa => ConsumerOf['Piper::Role::Segment'],
687             handles => 'Piper::Role::Segment',
688             required => 1,
689             );
690             }
691              
692             #pod =head1 UTILITY METHODS
693             #pod
694             #pod None of these should be directly accessed. Documented for contributors and source-code readers.
695             #pod
696             #pod =head2 descendant($path, $referrer)
697             #pod
698             #pod Returns a child segment if its path ends with C<$path>. Does not search children with a path of C<$referrer>, as it was presumably already searched by a previous iteration of the search. Used by C<find_segment>.
699             #pod
700             #pod =cut
701              
702             sub descendant {
703 554     554 1 15433 my ($self, $path, $referrer) = @_;
704 554 50       1012 return unless $self->has_children;
705 554   100     1211 $referrer //= '';
706              
707 554         3744 $self->DEBUG("Searching for location '$path'");
708 554 100       13113 $self->DEBUG('Referrer', $referrer) if $referrer;
709              
710             # Search immediate children
711 554 100 66     7842 $path = Piper::Path->new($path) if $path and not ref $path;
712 554 50       7033 my @pieces = $path ? $path->split : ();
713 554         610 my $descend = $self;
714 554   100     1676 while (defined $descend and @pieces) {
715 731 100       10022 if (!$descend->has_children) {
    100          
716 62         116 $descend = undef;
717             }
718             elsif (exists $descend->directory->{$pieces[0]}) {
719 348         5500 $descend = $descend->directory->{$pieces[0]};
720 348         2129 shift @pieces;
721             }
722             else {
723 321         1978 $descend = undef;
724             }
725             }
726              
727             # Search grandchildren,
728             # but not when checking whether requested location starts at $self (referrer = $self)
729 554 100 100     1281 if (!defined $descend and $referrer ne $self) {
730 358         259 my @possible;
731 358         246 for my $child (@{$self->children}) {
  358         640  
732 570 100       778 if ($child eq $referrer) {
733 79         135 $self->DEBUG("Skipping search of '$child' referrer");
734 79         1807 next;
735             }
736 491 100       1052 if ($child->has_children) {
737 133         178 my $potential = $child->descendant($path);
738 133 100       262 push @possible, $potential if defined $potential;
739             }
740             }
741              
742 358 100       574 if (@possible) {
743 24     24   142 $descend = min_by { $_->path->split } @possible;
  24         447  
744             }
745             }
746              
747             # If location begins with $self->label, see if requested location starts at $self
748             # but not if already checking that (referrer = $self)
749 554 100 100     1299 if (!defined $descend and $referrer ne $self) {
750 334         4139 my $overlap = $self->label;
751 334 100       15270 if ($path =~ m{^\Q$overlap\E(?:$|/(?<path>.*))}) {
752 4   100 4   9052 $path = $+{path} // '';
  4         1538  
  4         1287  
  134         841  
753 134 100       2294 $self->DEBUG('Overlapping descendant search', $path ? $path : ());
754 134 100       3119 $descend = $path ? $self->descendant($path, $self) : $self;
755             }
756             }
757              
758 554         1988 return $descend;
759             }
760              
761             #pod =head2 pressure
762             #pod
763             #pod An integer metric for the "fullness" of the pending queue. For handler instances (initialized from L<Piper::Process> objects), it is the percentage of pending items vs the batch size of the segment. For container instances (initialized from L<Piper> objects), it is the maximum C<pressure> of the contained instances. Used by process_batch for choosing which segment to process.
764             #pod
765             #pod =cut
766              
767             # Metric for "how full" the pending queue is
768             sub pressure {
769 532     532 1 393 my ($self) = @_;
770 532 100       633 if ($self->has_children) {
771 79         56 return max(map { $_->pressure } @{$self->children});
  213         5116  
  79         109  
772             }
773             else {
774 453 100       567 return $self->pending ? int(100 * $self->pending / $self->batch_size) : 0;
775             }
776             }
777              
778             #pod =head2 process_batch
779             #pod
780             #pod Chooses the "best" segment for processing, and processes a batch for that segment.
781             #pod
782             #pod It first attempts to choose the full-batch segment (C<< pending >= batch_size >>) closest to the end of the pipeline. If there are no full-batch segments, it chooses the segment closest to being full.
783             #pod
784             #pod =cut
785              
786             sub process_batch {
787 283     283 1 5783 my ($self) = @_;
788 283 100       460 if ($self->has_children) {
789 166         119 my $best;
790             # Full-batch process closest to drain
791 166 100   282   391 if ($best = last_value { $_->pressure >= 100 } @{$self->children}) {
  282         912  
  166         438  
792 148         273 $self->DEBUG("Chose batch $best: full-batch process closest to drain");
793             }
794             # If no full batch, choose the one closest to full
795             else {
796 18     31   728 $best = max_by { $_->pressure } @{$self->children};
  31         437  
  18         70  
797 18         805 $self->DEBUG("Chose batch $best: closest to full-batch");
798             }
799 166         4404 $best->process_batch;
800             }
801             else {
802 117         1834 my $num = $self->batch_size;
803 117         9077 $self->DEBUG('Processing batch with max size', $num);
804              
805 117         4008 my @batch = $self->queue->dequeue($num);
806 117         1776 $self->INFO('Processing batch', @batch);
807              
808             $self->segment->handler->(
809             $self,
810             \@batch,
811 117         2738 @{$self->args}
  117         1611  
812             );
813             }
814             }
815              
816             1;
817              
818             __END__