File Coverage

blib/lib/Piper/Instance.pm
Criterion Covered Total %
statement 211 212 99.5
branch 78 84 92.8
condition 28 31 90.3
subroutine 44 44 100.0
pod 14 17 82.3
total 375 388 96.6


line stmt bran cond sub pod time code
1             #####################################################################
2             ## AUTHOR: Mary Ehlers, regina.verbae@gmail.com
3             ## ABSTRACT: An initialized pipeline segment for the Piper system
4             #####################################################################
5              
6             package Piper::Instance;
7              
8 4     4   37 use v5.10;
  4         10  
9 4     4   15 use strict;
  4         6  
  4         60  
10 4     4   14 use warnings;
  4         7  
  4         122  
11              
12 4     4   1314 use List::AllUtils qw(last_value max part sum);
  4         39376  
  4         280  
13 4     4   23 use List::UtilsBy qw(max_by min_by);
  4         6  
  4         128  
14 4     4   1340 use Piper::Path;
  4         11  
  4         105  
15 4     4   23 use Scalar::Util qw(weaken);
  4         6  
  4         165  
16 4     4   18 use Types::Standard qw(ArrayRef ConsumerOf Enum HashRef InstanceOf Tuple slurpy);
  4         5  
  4         21  
17              
18 4     4   3952 use Moo;
  4         37  
  4         20  
19 4     4   1165 use namespace::clean;
  4         7  
  4         17  
20              
21             with qw(Piper::Role::Queue);
22              
23             use overload (
24 3274     3274   113712 q{""} => sub { $_[0]->path },
25 4         22 fallback => 1,
26 4     4   1395 );
  4         7  
27              
28             our $VERSION = '0.05'; # from Piper-0.05.tar.gz
29              
30             #pod =head1 ATTRIBUTES
31             #pod
32             #pod =head2 batch_size
33             #pod
34             #pod The number of items to process at a time for this segment.
35             #pod
36             #pod If not set, inherits the C<batch_size> of any existing parent(s). If the segment has no parents, or if none of its parents have a C<batch_size> defined, the default C<batch_size> will be used. The default is 200, but this can be configured at import of L<Piper>.
37             #pod
38             #pod To clear a previously-set C<batch_size>, simply set it to C<undef> or use the C<clear_batch_size> method.
39             #pod
40             #pod $segment->batch_size(undef);
41             #pod $segment->clear_batch_size;
42             #pod
43             #pod =cut
44              
45             # around below to set up inheritance through parents
46              
47             #pod =head2 children
48             #pod
49             #pod For container instances (made from L<Piper> objects, not L<Piper::Process> objects), the C<children> attribute holds an arrayref of the contained instance objects.
50             #pod
51             #pod =cut
52              
53             has children => (
54             is => 'ro',
55             # Must contain at least one child
56             isa => Tuple[InstanceOf['Piper::Instance'],
57             slurpy ArrayRef[InstanceOf['Piper::Instance']]
58             ],
59             required => 0,
60             predicate => 1,
61             );
62              
63             #pod =head2 debug
64             #pod
65             #pod Debug level for this segment. When accessing, inherits the debug level of any existing parent(s) if not explicitly set for this segment. The default level is 0, but can be globally overridden with the environment variable C<PIPER_DEBUG>.
66             #pod
67             #pod To clear a previously-set debug level for a segment, simply set it to C<undef> or use the C<clear_debug> method.
68             #pod
69             #pod $segment->debug(undef);
70             #pod $segment->clear_debug;
71             #pod
72             #pod =cut
73              
74             # around below to set up inheritance through parents
75              
76             #pod =head2 enabled
77             #pod
78             #pod A boolean indicating that the segment is enabled and can accept items for processing. Inherits this attribute from any existing parent(s) with a default of true.
79             #pod
80             #pod To clear a previously-set enabled attribute, simply set it to C<undef> or use the C<clear_enabled> method.
81             #pod
82             #pod $segment->enabled(undef);
83             #pod $segment->clear_enabled;
84             #pod
85             #pod =cut
86              
87             # around below to set up inheritance through parents
88              
89             #pod =head2 main
90             #pod
91             #pod Holds a reference to the outermost container instance for the pipeline.
92             #pod
93             #pod =cut
94              
95             has main => (
96             is => 'lazy',
97             isa => InstanceOf['Piper::Instance'],
98             weak_ref => 1,
99             builder => sub {
100 39     39   322 my ($self) = @_;
101 39         54 my $parent = $self;
102 39         114 while ($parent->has_parent) {
103 35         68 $parent = $parent->parent;
104             }
105 39         542 return $parent;
106             },
107             );
108              
109             #pod =head2 parent
110             #pod
111             #pod Unless this segment is the outermost container (C<main>), this attribute holds a reference to the segment's immediate container.
112             #pod
113             #pod =cut
114              
115             has parent => (
116             is => 'rwp',
117             isa => InstanceOf['Piper::Instance'],
118             # Setting a parent will introduce a self-reference
119             weak_ref => 1,
120             required => 0,
121             predicate => 1,
122             );
123              
124             #pod =head2 path
125             #pod
126             #pod The full path to this segment, built as the concatenation of all the parent(s) labels and the segment's label, joined by C</>. L<Piper::Instance> objects stringify to this attribute.
127             #pod
128             #pod =cut
129              
130             has path => (
131             is => 'lazy',
132             isa => InstanceOf['Piper::Path'],
133             builder => sub {
134 35     35   3337 my ($self) = @_;
135              
136 35 100       524 return $self->has_parent
137             ? $self->parent->path->child($self->label)
138             : Piper::Path->new($self->label);
139             },
140             );
141              
142             #pod =head2 verbose
143             #pod
144             #pod Verbosity level for this segment. When accessing, inherits verbosity level of any existing parent(s) if not explicitly set for this segment.
145             #pod
146             #pod To clear a previously-set verbosity level for a segment, simply set it to C<undef> or use the C<clear_verbose> method.
147             #pod
148             #pod $segment->verbose(undef);
149             #pod $segment->clear_verbose;
150             #pod
151             #pod =cut
152              
153             # Inherit parent settings
154             for my $attr (qw(batch_size debug enabled verbose)) {
155             my $clear = "clear_$attr";
156             my $has = "has_$attr";
157              
158             around $attr => sub {
159             my ($orig, $self) = splice @_, 0, 2;
160              
161             state $default = {
162             batch_size => $self->main->config->batch_size,
163             debug => 0,
164             enabled => 1,
165             verbose => 0,
166             };
167              
168             if (@_) {
169             return $self->$clear() if !defined $_[0];
170             return $self->$orig(@_);
171             }
172             else {
173             return $self->$has()
174             ? $self->$orig()
175             : $self->has_parent
176             ? $self->parent->$attr()
177             : $default->{$attr};
178             }
179             };
180             }
181              
182             #pod =head1 METHODS
183             #pod
184             #pod Methods marked with a (*) should only be called from the outermost instance.
185             #pod
186             #pod =head2 clear_batch_size
187             #pod
188             #pod =head2 clear_debug
189             #pod
190             #pod =head2 clear_enabled
191             #pod
192             #pod =head2 clear_verbose
193             #pod
194             #pod Methods for clearing the corresponding attribute.
195             #pod
196             #pod =head2 has_children
197             #pod
198             #pod A boolean indicating whether the instance has any children (contained instances). Will be true for all segments initialized from a L<Piper> object and false for all segments initialized from a L<Piper::Process> object.
199             #pod
200             #pod =head2 has_parent
201             #pod
202             #pod A boolean indicating whether the instance has a parent (container instance). Will be true for all segments except the outermost segment (C<main>).
203             #pod
204             #pod =head2 has_pending
205             #pod
206             #pod Returns a boolean indicating whether there are any items that are queued at some level of the segment but have not completed processing.
207             #pod
208             #pod =cut
209              
210             sub has_pending {
211 398     398 1 4887 my ($self) = @_;
212              
213 398 100       621 if ($self->has_children) {
214 208         244 for my $child (@{$self->children}) {
  208         307  
215 267 100       391 return 1 if $child->has_pending;
216             }
217 51         131 return 0;
218             }
219             else {
220 190         2536 return $self->queue->ready;
221             }
222             }
223              
224             #pod =head2 *dequeue([$num])
225             #pod
226             #pod Remove at most C<$num> S<(default 1)> processed items from the end of the segment.
227             #pod
228             #pod =head2 *enqueue(@data)
229             #pod
230             #pod Queue C<@data> for processing by the pipeline.
231             #pod
232             #pod =cut
233              
234             around enqueue => sub {
235             my ($orig, $self, @args) = @_;
236              
237             if (!$self->enabled) {
238             # Bypass - go straight to drain
239             $self->INFO('Skipping disabled process', @args);
240             $self->drain->enqueue(@args);
241             return;
242             }
243              
244             my @items;
245             if ($self->has_allow) {
246             my ($skip, $queue) = part {
247             $self->allow->($_) ? 1 : 0
248             } @args;
249              
250             @items = @$queue if defined $queue;
251              
252             if (defined $skip) {
253             $self->INFO('Disallowed items emitted to next handler', @$skip);
254             $self->drain->enqueue(@$skip);
255             }
256             }
257             else {
258             @items = @args;
259             }
260              
261             return unless @items;
262              
263             $self->INFO('Queueing items', @items);
264             $self->$orig(@items);
265             };
266              
267             #pod =head2 find_segment($location)
268             #pod
269             #pod Find and return the segment instance according to C<$location>, which can be a label or a path-like hierarchy of labels.
270             #pod
271             #pod For example, in the following pipeline, a few possible C<$location> values include C<a>, C<subpipe/b>, or C<main/subpipe/c>.
272             #pod
273             #pod my $pipe = Piper->new(
274             #pod { label => 'main' },
275             #pod subpipe => Piper->new(
276             #pod a => sub { ... },
277             #pod b => sub { ... },
278             #pod c => sub { ... },
279             #pod ),
280             #pod )->init;
281             #pod
282             #pod If a label is unique within the pipeline, no path is required. For non-unique labels, searches are performed in a nearest-neighbor, depth-first manner.
283             #pod
284             #pod For example, in the following pipeline, searching for C<processA> from C<processB> would find C<main/pipeA/processA>, not C<main/processA>. So to reach C<main/processA> from C<processB>, the appropriate search would be for C<main/processA>.
285             #pod
286             #pod my $pipe = Piper->new(
287             #pod { label => 'main' },
288             #pod pipeA => Piper->new(
289             #pod processA => sub { ... },
290             #pod processB => sub { ... },
291             #pod ),
292             #pod processA => sub { ... },
293             #pod );
294             #pod
295             #pod =cut
296              
297             sub find_segment {
298 192     192 1 58587 my ($self, $location) = @_;
299            
300 192         261 state $global_cache = {};
301 192   100     3566 $global_cache->{$self->main->id}{$self->path} //= {};
302 192         2574 my $cache = $global_cache->{$self->main->id}{$self->path};
303              
304 192 100       497 unless (exists $cache->{$location}) {
305 161         2108 $location = Piper::Path->new($location);
306 161 100 100     8761 if ($self->has_children or $self->has_parent) {
307 159 100       317 my $parent = $self->has_children ? $self : $self->parent;
308 159         273 my $segment = $parent->descendant($location);
309 159   100     432 while (!defined $segment and $parent->has_parent) {
310 88         109 my $referrer = $parent;
311 88         137 $parent = $parent->parent;
312 88         145 $segment = $parent->descendant($location, $referrer);
313             }
314 159         375 $cache->{$location} = $segment;
315             }
316             else {
317             # Lonely Process (no parents or children)
318 2 100       6 $cache->{$location} = "$self" eq "$location" ? $self : undef;
319             }
320 161 100       325 weaken($cache->{$location}) if defined $cache->{$location};
321             }
322              
323 192 100       434 $self->DEBUG("Found label $location: '$cache->{$location}'") if defined $cache->{$location};
324 192         5204 return $cache->{$location};
325             }
326              
327             #pod =head2 *flush
328             #pod
329             #pod Process batches until there are no more items pending.
330             #pod
331             #pod =cut
332              
333             sub flush {
334 2     2 0 6 my ($self) = @_;
335              
336 2         8 while ($self->has_pending) {
337 62         112 $self->process_batch;
338             }
339             }
340              
341             #pod =head2 *is_exhausted
342             #pod
343             #pod Returns a boolean indicating whether there are any items left to process or dequeue.
344             #pod
345             #pod =cut
346              
347             sub is_exhausted {
348 24     24 0 6134 my ($self) = @_;
349            
350 24 100       39 return $self->prepare ? 0 : 1;
351             }
352              
353             #pod =head2 *isnt_exhausted
354             #pod
355             #pod Returns the opposite of C<is_exhausted>.
356             #pod
357             #pod =cut
358              
359             sub isnt_exhausted {
360 15     15 0 32 my ($self) = @_;
361 15         30 return !$self->is_exhausted;
362             }
363              
364             #pod =head2 next_segment
365             #pod
366             #pod Returns the next adjacent segment from the calling segment. Returns C<undef> for the outermost container.
367             #pod
368             #pod =cut
369              
370             sub next_segment {
371 12     12 1 18 my ($self) = @_;
372 12 50       29 return unless $self->has_parent;
373 12         165 return $self->parent->follower->{$self};
374             }
375              
376             #pod =head2 pending
377             #pod
378             #pod Returns the number of items that are queued at some level of the segment but have not completed processing.
379             #pod
380             #pod =cut
381              
382             sub pending {
383 746     746 1 2981 my ($self) = @_;
384 746 100       1033 if ($self->has_children) {
385 14         15 return sum(map { $_->pending } @{$self->children});
  24         36  
  14         32  
386             }
387             else {
388 732         9528 return $self->queue->ready;
389             }
390             }
391              
392             #pod =head2 *prepare([$num])
393             #pod
394             #pod Process batches while data is still C<pending> until at least C<$num> S<(default 1)> items are C<ready> for C<dequeue>.
395             #pod
396             #pod =cut
397              
398             sub prepare {
399 28     28 1 49 my ($self, $num) = @_;
400 28   100     92 $num //= 1;
401              
402 28   66     48 while ($self->has_pending and $self->ready < $num) {
403 26         50 $self->process_batch;
404             }
405 28         356 return $self->ready;
406             }
407              
408             #pod =head2 ready
409             #pod
410             #pod Returns the number of items that have finished processing and are ready for C<dequeue> from the segment.
411             #pod
412             #pod =cut
413              
414             #pod =head1 FLOW CONTROL METHODS
415             #pod
416             #pod These methods are available for use within process handler subroutines (see L<Piper::Process>).
417             #pod
418             #pod =head2 eject(@data)
419             #pod
420             #pod If the segment has a parent, send C<@data> to the drain of its parent. Otherwise, enqueues C<@data> to the segment's drain.
421             #pod
422             #pod =cut
423              
424             sub eject {
425 5     5 1 4849 my $self = shift;
426 5 100       18 if ($self->has_parent) {
427 4         15 $self->INFO('Ejecting to drain of parent ('.$self->parent.')', @_);
428 4         153 $self->parent->drain->enqueue(@_);
429             }
430             else {
431 1         23 $self->INFO('Ejecting to drain', @_);
432 1         37 $self->drain->enqueue(@_);
433             }
434             }
435              
436             #pod =head2 emit(@data)
437             #pod
438             #pod Send C<@data> to the next segment in the pipeline. If the segment is the last in the pipeline, emits to the drain, making the C<@data> ready for C<dequeue>.
439             #pod
440             #pod =cut
441              
442             sub emit {
443 130     130 1 5188 my $self = shift;
444 130         1916 $self->INFO('Emitting', @_);
445             # Just collect in the drain
446 130         4681 $self->drain->enqueue(@_);
447             }
448              
449             #pod =head2 inject(@data)
450             #pod
451             #pod If the segment has a parent, enqueues C<@data> to its parent. Otherwise, enqueues C<@data> to itself.
452             #pod
453             #pod =cut
454              
455             sub inject {
456 5     5 1 6948 my $self = shift;
457              
458 5 100       19 if ($self->has_parent) {
459 4         26 $self->INFO('Injecting to parent ('.$self->parent.')', @_);
460 4         156 $self->parent->enqueue(@_);
461             }
462             else {
463 1         5 $self->INFO('Injecting to self ('.$self.')', @_);
464 1         41 $self->enqueue(@_);
465             }
466             }
467              
468             #pod =head2 injectAfter($location, @data)
469             #pod
470             #pod Send C<@data> to the segment I<after> the specified C<$location>. See L<C<find_segment>|/find_segment($location)> for a detailed description of C<$location>.
471             #pod
472             #pod =cut
473              
474             sub injectAfter {
475 5     5 1 4981 my $self = shift;
476 5         7 my $location = shift;
477 5         12 my $segment = $self->find_segment($location);
478 5 100       46 $self->ERROR("Could not find $location to injectAfter", @_)
479             if !defined $segment;
480 3         52 $self->INFO("Injecting to $location", @_);
481 3         115 $segment->drain->enqueue(@_);
482             }
483              
484             #pod =head2 injectAt($location, @data)
485             #pod
486             #pod Send C<@data> to the segment I<at> the specified C<$location>. See L<C<find_segment>|/find_segment($location)> for a detailed description of C<$location>.
487             #pod
488             #pod =cut
489              
490             sub injectAt {
491 5     5 1 4273 my $self = shift;
492 5         26 my $location = shift;
493 5         14 my $segment = $self->find_segment($location);
494 5 100       53 $self->ERROR("Could not find $location to injectAt", @_)
495             if !defined $segment;
496 3         55 $self->INFO("Injecting to $location", @_);
497 3         113 $segment->enqueue(@_);
498             }
499              
500             #pod =head2 recycle(@data)
501             #pod
502             #pod Re-queue C<@data> to the top of the current segment in an order such that C<dequeue(1)> would subsequently return C<$data[0]> and so forth.
503             #pod
504             #pod =cut
505              
506             sub recycle {
507 5     5 1 27 my $self = shift;
508 5         97 $self->INFO('Recycling', @_);
509 5         189 $self->requeue(@_);
510             }
511              
512             #pod =head1 LOGGING AND DEBUGGING METHODS
513             #pod
514             #pod See L for detailed descriptions.
515             #pod
516             #pod =head2 INFO($message, [@items])
517             #pod
518             #pod Prints an informational C<$message> to STDERR if either the debug or verbosity level for the segment S<< is > 0 >>.
519             #pod
520             #pod =head2 DEBUG($message, [@items])
521             #pod
522             #pod Prints a debug C<$message> to STDERR if the debug level for the segment S<< is > 0 >>.
523             #pod
524             #pod =head2 WARN($message, [@items])
525             #pod
526             #pod Issues a warning with C<$message> via L.
527             #pod
528             #pod =head2 ERROR($message, [@items])
529             #pod
530             #pod Throws an error with C<$message> via L.
531             #pod
532             #pod =head1 UTILITY ATTRIBUTES
533             #pod
534             #pod None of these should be directly accessed. Documented for contributors and source-code readers.
535             #pod
536             #pod =head2 args
537             #pod
538             #pod The arguments passed to the C<init> method of L<Piper>.
539             #pod
540             #pod =cut
541              
542             has args => (
543             is => 'rwp',
544             isa => ArrayRef,
545             lazy => 1,
546             builder => sub {
547 10     10   91 my ($self) = @_;
548 10 50       27 if ($self->has_parent) {
549 10         126 return $self->main->args;
550             }
551             else {
552 0         0 return [];
553             }
554             },
555             );
556              
557             #pod =head2 directory
558             #pod
559             #pod A hashref of the segment's children, keyed by their labels. Used by C<descendant>.
560             #pod
561             #pod =cut
562              
563             has directory => (
564             is => 'lazy',
565             isa => HashRef,
566             builder => sub {
567 7     7   57 my ($self) = @_;
568 7 50       22 return {} unless $self->has_children;
569 7         11 my %dir;
570 7         10 for my $child (@{$self->children}) {
  7         19  
571 11         159 $dir{$child->path->name} = $child;
572             }
573 7         107 return \%dir;
574             },
575             );
576              
577             #pod =head2 drain
578             #pod
579             #pod A reference to the location where the segment's processed items are emitted.
580             #pod
581             #pod =cut
582              
583             BEGIN { # Enables 'with Piper::Role::Queue'
584             has drain => (
585             is => 'lazy',
586             handles => [qw(dequeue ready)],
587             builder => sub {
588 23     23   204 my ($self) = @_;
589 23 100       52 if ($self->has_parent) {
590 12         29 return $self->next_segment;
591             }
592             else {
593 11         142 return $self->main->config->queue_class->new();
594             }
595             },
596 4     4   6937 );
597             }
598              
599             #pod =head2 follower
600             #pod
601             #pod A hashref of children paths to the child's next adjacent segment. Used by C<next_segment>.
602             #pod
603             #pod =cut
604              
605             has follower => (
606             is => 'lazy',
607             isa => HashRef,
608             builder => sub {
609 8     8   74 my ($self) = @_;
610 8 50       23 return {} unless $self->has_children;
611 8         10 my %follow;
612 8         13 for my $index (0..$#{$self->children}) {
  8         25  
613 12 100       33 if (defined $self->children->[$index + 1]) {
614 4         19 $follow{$self->children->[$index]} =
615             $self->children->[$index + 1];
616             }
617             else {
618 8         110 $follow{$self->children->[$index]} = $self->drain;
619             }
620             }
621 8         112 return \%follow;
622             },
623             );
624              
625             #pod =head2 logger
626             #pod
627             #pod A reference to the logger for the pipeline. Handles L<Piper::Role::Logger> methods.
628             #pod
629             #pod =cut
630              
631             has logger => (
632             is => 'lazy',
633             isa => ConsumerOf['Piper::Role::Logger'],
634             handles => 'Piper::Role::Logger',
635             builder => sub {
636 35     35   744 my ($self) = @_;
637            
638 35 100       90 if ($self->has_parent) {
639 22         270 return $self->main->logger;
640             }
641             else {
642 13         186 return $self->main->config->logger_class->new();
643             }
644             },
645             );
646              
647             # Cute little trick to auto-insert the instance object
648             # as first argument, since $self will become the logger
649             # object and lose access to paths/labels/etc.
650             around [qw(INFO DEBUG WARN ERROR)] => sub {
651             my ($orig, $self) = splice @_, 0, 2;
652             if (ref $_[0]) {
653             $self->$orig(@_);
654             }
655             else {
656             $self->$orig($self, @_);
657             }
658             };
659              
660             #pod =head2 queue
661             #pod
662             #pod A reference to the location where data is queued for processing by this segment.
663             #pod
664             #pod =cut
665              
666             BEGIN { # Enables 'with Piper::Role::Queue'
667             has queue => (
668             is => 'lazy',
669             isa => ConsumerOf['Piper::Role::Queue'],
670             handles => [qw(enqueue requeue)],
671             builder => sub {
672 23     23   411 my ($self) = @_;
673 23 100       62 if ($self->has_children) {
674 8         109 return $self->children->[0];
675             }
676             else {
677 15         197 return $self->main->config->queue_class->new();
678             }
679             },
680 4     4   10967 );
681             }
682              
683             #pod =head2 segment
684             #pod
685             #pod The L<Piper> or L<Piper::Process> object from which the instance segment was created.
686             #pod
687             #pod =cut
688              
689             BEGIN { # So we can 'around' on Piper::Role::Segment methods
690 4     4   19688 has segment => (
691             is => 'ro',
692             isa => ConsumerOf['Piper::Role::Segment'],
693             handles => 'Piper::Role::Segment',
694             required => 1,
695             );
696             }
697              
698             #pod =head1 UTILITY METHODS
699             #pod
700             #pod None of these should be directly accessed. Documented for contributors and source-code readers.
701             #pod
702             #pod =head2 descendant($path, $referrer)
703             #pod
704             #pod Returns a child segment if its path ends with C<$path>. Does not search children with a path of C<$referrer>, as it was presumably already searched by a previous iteration of the search. Used by C<find_segment>.
705             #pod
706             #pod =cut
707              
708             sub descendant {
709 554     554 1 19525 my ($self, $path, $referrer) = @_;
710 554 50       1025 return unless $self->has_children;
711 554   100     1565 $referrer //= '';
712              
713 554         4008 $self->DEBUG("Searching for location '$path'");
714 554 100       14224 $self->DEBUG('Referrer', $referrer) if $referrer;
715              
716             # Search immediate children
717 554 100 66     8351 $path = Piper::Path->new($path) if $path and not ref $path;
718 554 50       10001 my @pieces = $path ? $path->split : ();
719 554         972 my $descend = $self;
720 554   100     1629 while (defined $descend and @pieces) {
721 731 100       10121 if (!$descend->has_children) {
    100          
722 62         122 $descend = undef;
723             }
724             elsif (exists $descend->directory->{$pieces[0]}) {
725 348         5947 $descend = $descend->directory->{$pieces[0]};
726 348         2456 shift @pieces;
727             }
728             else {
729 321         2722 $descend = undef;
730             }
731             }
732              
733             # Search grandchildren,
734             # but not when checking whether requested location starts at $self (referrer = $self)
735 554 100 100     1380 if (!defined $descend and $referrer ne $self) {
736 358         454 my @possible;
737 358         368 for my $child (@{$self->children}) {
  358         659  
738 570 100       1029 if ($child eq $referrer) {
739 79         174 $self->DEBUG("Skipping search of '$child' referrer");
740 79         1986 next;
741             }
742 491 100       1166 if ($child->has_children) {
743 133         225 my $potential = $child->descendant($path);
744 133 100       270 push @possible, $potential if defined $potential;
745             }
746             }
747              
748 358 100       657 if (@possible) {
749 24     24   161 $descend = min_by { $_->path->split } @possible;
  24         524  
750             }
751             }
752              
753             # If location begins with $self->label, see if requested location starts at $self
754             # but not if already checking that (referrer = $self)
755 554 100 100     1496 if (!defined $descend and $referrer ne $self) {
756 334         4381 my $overlap = $self->label;
757 334 100       15719 if ($path =~ m{^\Q$overlap\E(?:$|/(?<path>.*))}) {
758 4   100 4   39957 $path = $+{path} // '';
  4         1212  
  4         1304  
  134         790  
759 134 100       2281 $self->DEBUG('Overlapping descendant search', $path ? $path : ());
760 134 100       3469 $descend = $path ? $self->descendant($path, $self) : $self;
761             }
762             }
763              
764 554         2110 return $descend;
765             }
766              
767             #pod =head2 pressure
768             #pod
769             #pod An integer metric for the "fullness" of the pending queue. For handler instances (initialized from L<Piper::Process> objects), it is the percentage of pending items vs the batch size of the segment. For container instances (initialized from L<Piper> objects), it is the maximum C<pressure> of the contained instances. Used by C<process_batch> for choosing which segment to process.
770             #pod
771             #pod =cut
772              
773             # Metric for "how full" the pending queue is
774             sub pressure {
775 532     532 1 648 my ($self) = @_;
776 532 100       848 if ($self->has_children) {
777 79         86 return max(map { $_->pressure } @{$self->children});
  213         5316  
  79         133  
778             }
779             else {
780 453 100 50     636 return $self->pending ? (int(100 * $self->pending / $self->batch_size) || 1) : 0;
781             }
782             }
783              
784             #pod =head2 process_batch
785             #pod
786             #pod Chooses the "best" segment for processing, and processes a batch for that segment.
787             #pod
788             #pod It first attempts to choose the full-batch segment (C<< pending >= batch_size >>) closest to the end of the pipeline. If there are no full-batch segments, it chooses the segment closest to being full.
789             #pod
790             #pod =cut
791              
792             sub process_batch {
793 283     283 1 6904 my ($self) = @_;
794 283 100       516 if ($self->has_children) {
795 166         173 my $best;
796             # Full-batch process closest to drain
797 166 100   282   473 if ($best = last_value { $_->pressure >= 100 } @{$self->children}) {
  282         980  
  166         428  
798 148         320 $self->DEBUG("Chose batch $best: full-batch process closest to drain");
799             }
800             # If no full batch, choose the one closest to full
801             else {
802 18     31   832 $best = max_by { $_->pressure } @{$self->children};
  31         537  
  18         71  
803 18         856 $self->DEBUG("Chose batch $best: closest to full-batch");
804             }
805 166         4465 $best->process_batch;
806             }
807             else {
808 117         1771 my $num = $self->batch_size;
809 117         8725 $self->DEBUG('Processing batch with max size', $num);
810              
811 117         4150 my @batch = $self->queue->dequeue($num);
812 117         1797 $self->INFO('Processing batch', @batch);
813              
814             $self->segment->handler->(
815             $self,
816             \@batch,
817 117         2938 @{$self->args}
  117         1566  
818             );
819             }
820             }
821              
822             1;
823              
824             __END__