File Coverage

blib/lib/Piper/Instance.pm
Criterion Covered Total %
statement 211 212 99.5
branch 78 84 92.8
condition 27 29 93.1
subroutine 44 44 100.0
pod 14 17 82.3
total 374 386 96.8


line stmt bran cond sub pod time code
1             #####################################################################
2             ## AUTHOR: Mary Ehlers, regina.verbae@gmail.com
3             ## ABSTRACT: An initialized pipeline segment for the Piper system
4             #####################################################################
5              
6             package Piper::Instance;
7              
8 4     4   28 use v5.10;
  4         10  
9 4     4   16 use strict;
  4         5  
  4         68  
10 4     4   12 use warnings;
  4         4  
  4         91  
11              
12 4     4   1315 use List::AllUtils qw(last_value max part sum);
  4         36010  
  4         299  
13 4     4   24 use List::UtilsBy qw(max_by min_by);
  4         6  
  4         178  
14 4     4   1272 use Piper::Path;
  4         9  
  4         109  
15 4     4   23 use Scalar::Util qw(weaken);
  4         4  
  4         200  
16 4     4   46 use Types::Standard qw(ArrayRef ConsumerOf Enum HashRef InstanceOf Tuple slurpy);
  4         5  
  4         35  
17              
18 4     4   3069 use Moo;
  4         5  
  4         20  
19 4     4   960 use namespace::clean;
  4         8  
  4         27  
20              
21             with qw(Piper::Role::Queue);
22              
23             use overload (
24 3274     3274   106126 q{""} => sub { $_[0]->path },
25 4         29 fallback => 1,
26 4     4   1290 );
  4         5  
27              
28             our $VERSION = '0.04'; # from Piper-0.04.tar.gz
29              
30             #pod =head1 ATTRIBUTES
31             #pod
32             #pod =head2 batch_size
33             #pod
34             #pod The number of items to process at a time for this segment.
35             #pod
36             #pod If not set, inherits the C<batch_size> of any existing parent(s). If the segment has no parents, or if none of its parents have a C<batch_size> defined, the default C<batch_size> will be used. The default is 200, but this can be configured at import of L<Piper>.
37             #pod
38             #pod To clear a previously-set C<batch_size>, simply set it to C<undef> or use the C<clear_batch_size> method.
39             #pod
40             #pod $segment->batch_size(undef);
41             #pod $segment->clear_batch_size;
42             #pod
43             #pod =cut
44              
45             # around below to set up inheritance through parents
46              
47             #pod =head2 children
48             #pod
49             #pod For container instances (made from L<Piper> objects, not L<Piper::Process> objects), the C<children> attribute holds an arrayref of the contained instance objects.
50             #pod
51             #pod =cut
52              
53             has children => (
54             is => 'ro',
55             # Must contain at least one child
56             isa => Tuple[InstanceOf['Piper::Instance'],
57             slurpy ArrayRef[InstanceOf['Piper::Instance']]
58             ],
59             required => 0,
60             predicate => 1,
61             );
62              
63             #pod =head2 debug
64             #pod
65             #pod Debug level for this segment. When accessing, inherits the debug level of any existing parent(s) if not explicitly set for this segment. The default level is 0, but can be globally overridden with the environment variable C.
66             #pod
67             #pod To clear a previously-set debug level for a segment, simply set it to C<undef> or use the C<clear_debug> method.
68             #pod
69             #pod $segment->debug(undef);
70             #pod $segment->clear_debug;
71             #pod
72             #pod =cut
73              
74             # around below to set up inheritance through parents
75              
76             #pod =head2 enabled
77             #pod
78             #pod A boolean indicating that the segment is enabled and can accept items for processing. Inherits this attribute from any existing parent(s) with a default of true.
79             #pod
80             #pod To clear a previously-set enabled attribute, simply set it to C<undef> or use the C<clear_enabled> method.
81             #pod
82             #pod $segment->enabled(undef);
83             #pod $segment->clear_enabled;
84             #pod
85             #pod =cut
86              
87             # around below to set up inheritance through parents
88              
89             #pod =head2 main
90             #pod
91             #pod Holds a reference to the outermost container instance for the pipeline.
92             #pod
93             #pod =cut
94              
95             has main => (
96             is => 'lazy',
97             isa => InstanceOf['Piper::Instance'],
98             weak_ref => 1,
99             builder => sub {
100 39     39   285 my ($self) = @_;
101 39         44 my $parent = $self;
102 39         118 while ($parent->has_parent) {
103 35         77 $parent = $parent->parent;
104             }
105 39         550 return $parent;
106             },
107             );
108              
109             #pod =head2 parent
110             #pod
111             #pod Unless this segment is the outermost container (C<main>), this attribute holds a reference to the segment's immediate container.
112             #pod
113             #pod =cut
114              
115             has parent => (
116             is => 'rwp',
117             isa => InstanceOf['Piper::Instance'],
118             # Setting a parent will introduce a self-reference
119             weak_ref => 1,
120             required => 0,
121             predicate => 1,
122             );
123              
124             #pod =head2 path
125             #pod
126             #pod The full path to this segment, built as the concatenation of all the parent(s) labels and the segment's label, joined by C</>. L<Piper::Instance> objects stringify to this attribute.
127             #pod
128             #pod =cut
129              
130             has path => (
131             is => 'lazy',
132             isa => InstanceOf['Piper::Path'],
133             builder => sub {
134 35     35   2769 my ($self) = @_;
135              
136 35 100       601 return $self->has_parent
137             ? $self->parent->path->child($self->label)
138             : Piper::Path->new($self->label);
139             },
140             );
141              
142             #pod =head2 verbose
143             #pod
144             #pod Verbosity level for this segment. When accessing, inherits verbosity level of any existing parent(s) if not explicitly set for this segment.
145             #pod
146             #pod To clear a previously-set verbosity level for a segment, simply set it to C<undef> or use the C<clear_verbose> method.
147             #pod
148             #pod $segment->verbose(undef);
149             #pod $segment->clear_verbose;
150             #pod
151             #pod =cut
152              
153             # Inherit parent settings
154             for my $attr (qw(batch_size debug enabled verbose)) {
155             my $clear = "clear_$attr";
156             my $has = "has_$attr";
157              
158             around $attr => sub {
159             my ($orig, $self) = splice @_, 0, 2;
160              
161             state $default = {
162             batch_size => $self->main->config->batch_size,
163             debug => 0,
164             enabled => 1,
165             verbose => 0,
166             };
167              
168             if (@_) {
169             return $self->$clear() if !defined $_[0];
170             return $self->$orig(@_);
171             }
172             else {
173             return $self->$has()
174             ? $self->$orig()
175             : $self->has_parent
176             ? $self->parent->$attr()
177             : $default->{$attr};
178             }
179             };
180             }
181              
182             #pod =head1 METHODS
183             #pod
184             #pod Methods marked with a (*) should only be called from the outermost instance.
185             #pod
186             #pod =head2 clear_batch_size
187             #pod
188             #pod =head2 clear_debug
189             #pod
190             #pod =head2 clear_enabled
191             #pod
192             #pod =head2 clear_verbose
193             #pod
194             #pod Methods for clearing the corresponding attribute.
195             #pod
196             #pod =head2 has_children
197             #pod
198             #pod A boolean indicating whether the instance has any children (contained instances). Will be true for all segments initialized from a L<Piper> object and false for all segments initialized from a L<Piper::Process> object.
199             #pod
200             #pod =head2 has_parent
201             #pod
202             #pod A boolean indicating whether the instance has a parent (container instance). Will be true for all segments except the outermost segment (C<main>).
203             #pod
204             #pod =head2 has_pending
205             #pod
206             #pod Returns a boolean indicating whether there are any items that are queued at some level of the segment but have not completed processing.
207             #pod
208             #pod =cut
209              
210             sub has_pending {
211 398     398 1 3819 my ($self) = @_;
212              
213 398 100       558 if ($self->has_children) {
214 208         131 for my $child (@{$self->children}) {
  208         254  
215 267 100       319 return 1 if $child->has_pending;
216             }
217 51         136 return 0;
218             }
219             else {
220 190         2638 return $self->queue->ready;
221             }
222             }
223              
224             #pod =head2 *dequeue([$num])
225             #pod
226             #pod Remove at most C<$num> S<(default 1)> processed items from the end of the segment.
227             #pod
228             #pod =head2 *enqueue(@data)
229             #pod
230             #pod Queue C<@data> for processing by the pipeline.
231             #pod
232             #pod =cut
233              
234             around enqueue => sub {
235             my ($orig, $self, @args) = @_;
236              
237             if (!$self->enabled) {
238             # Bypass - go straight to drain
239             $self->INFO('Skipping disabled process', @args);
240             $self->drain->enqueue(@args);
241             return;
242             }
243              
244             my @items;
245             if ($self->has_allow) {
246             my ($skip, $queue) = part {
247             $self->allow->($_) ? 1 : 0
248             } @args;
249              
250             @items = @$queue if defined $queue;
251              
252             if (defined $skip) {
253             $self->INFO('Disallowed items emitted to next handler', @$skip);
254             $self->drain->enqueue(@$skip);
255             }
256             }
257             else {
258             @items = @args;
259             }
260              
261             return unless @items;
262              
263             $self->INFO('Queueing items', @items);
264             $self->$orig(@items);
265             };
266              
267             #pod =head2 find_segment($location)
268             #pod
269             #pod Find and return the segment instance according to C<$location>, which can be a label or a path-like hierarchy of labels.
270             #pod
271             #pod For example, in the following pipeline, a few possible C<$location> values include C<a>, C<subpipe/b>, or C<main/subpipe/c>.
272             #pod
273             #pod my $pipe = Piper->new(
274             #pod { label => 'main' },
275             #pod subpipe => Piper->new(
276             #pod a => sub { ... },
277             #pod b => sub { ... },
278             #pod c => sub { ... },
279             #pod ),
280             #pod )->init;
281             #pod
282             #pod If a label is unique within the pipeline, no path is required. For non-unique labels, searches are performed in a nearest-neighbor, depth-first manner.
283             #pod
284             #pod For example, in the following pipeline, searching for C<processA> from C<processB> would find C<main/pipeA/processA>, not C<main/processA>. So to reach C<main/processA> from C<processB>, the appropriate search would be for C<main/processA>.
285             #pod
286             #pod my $pipe = Piper->new(
287             #pod { label => 'main' },
288             #pod pipeA => Piper->new(
289             #pod processA => sub { ... },
290             #pod processB => sub { ... },
291             #pod ),
292             #pod processA => sub { ... },
293             #pod );
294             #pod
295             #pod =cut
296              
297             sub find_segment {
298 192     192 1 48361 my ($self, $location) = @_;
299            
300 192         197 state $global_cache = {};
301 192   100     3592 $global_cache->{$self->main->id}{$self->path} //= {};
302 192         2491 my $cache = $global_cache->{$self->main->id}{$self->path};
303              
304 192 100       428 unless (exists $cache->{$location}) {
305 161         2036 $location = Piper::Path->new($location);
306 161 100 100     6868 if ($self->has_children or $self->has_parent) {
307 159 100       364 my $parent = $self->has_children ? $self : $self->parent;
308 159         285 my $segment = $parent->descendant($location);
309 159   100     465 while (!defined $segment and $parent->has_parent) {
310 88         79 my $referrer = $parent;
311 88         100 $parent = $parent->parent;
312 88         124 $segment = $parent->descendant($location, $referrer);
313             }
314 159         361 $cache->{$location} = $segment;
315             }
316             else {
317             # Lonely Process (no parents or children)
318 2 100       6 $cache->{$location} = "$self" eq "$location" ? $self : undef;
319             }
320 161 100       278 weaken($cache->{$location}) if defined $cache->{$location};
321             }
322              
323 192 100       356 $self->DEBUG("Found label $location: '$cache->{$location}'") if defined $cache->{$location};
324 192         4630 return $cache->{$location};
325             }
326              
327             #pod =head2 *flush
328             #pod
329             #pod Process batches until there are no more items pending.
330             #pod
331             #pod =cut
332              
333             sub flush {
334 2     2 0 4 my ($self) = @_;
335              
336 2         8 while ($self->has_pending) {
337 62         94 $self->process_batch;
338             }
339             }
340              
341             #pod =head2 *is_exhausted
342             #pod
343             #pod Returns a boolean indicating whether there are any items left to process or dequeue.
344             #pod
345             #pod =cut
346              
347             sub is_exhausted {
348 24     24 0 5118 my ($self) = @_;
349            
350 24 100       40 return $self->prepare ? 0 : 1;
351             }
352              
353             #pod =head2 *isnt_exhausted
354             #pod
355             #pod Returns the opposite of C<is_exhausted>.
356             #pod
357             #pod =cut
358              
359             sub isnt_exhausted {
360 15     15 0 17 my ($self) = @_;
361 15         24 return !$self->is_exhausted;
362             }
363              
364             #pod =head2 next_segment
365             #pod
366             #pod Returns the next adjacent segment from the calling segment. Returns C<undef> for the outermost container.
367             #pod
368             #pod =cut
369              
370             sub next_segment {
371 12     12 1 13 my ($self) = @_;
372 12 50       30 return unless $self->has_parent;
373 12         168 return $self->parent->follower->{$self};
374             }
375              
376             #pod =head2 pending
377             #pod
378             #pod Returns the number of items that are queued at some level of the segment but have not completed processing.
379             #pod
380             #pod =cut
381              
382             sub pending {
383 746     746 1 2561 my ($self) = @_;
384 746 100       845 if ($self->has_children) {
385 14         14 return sum(map { $_->pending } @{$self->children});
  24         40  
  14         31  
386             }
387             else {
388 732         9383 return $self->queue->ready;
389             }
390             }
391              
392             #pod =head2 *prepare([$num])
393             #pod
394             #pod Process batches while data is still C<pending> until at least C<$num> S<(default 1)> items are C<ready> for C<dequeue>.
395             #pod
396             #pod =cut
397              
398             sub prepare {
399 28     28 1 36 my ($self, $num) = @_;
400 28   100     87 $num //= 1;
401              
402 28   66     45 while ($self->has_pending and $self->ready < $num) {
403 26         53 $self->process_batch;
404             }
405 28         356 return $self->ready;
406             }
407              
408             #pod =head2 ready
409             #pod
410             #pod Returns the number of items that have finished processing and are ready for C<dequeue> from the segment.
411             #pod
412             #pod =cut
413              
414             #pod =head1 FLOW CONTROL METHODS
415             #pod
416             #pod These methods are available for use within process handler subroutines (see L).
417             #pod
418             #pod =head2 eject(@data)
419             #pod
420             #pod If the segment has a parent, send C<@data> to the drain of its parent. Otherwise, enqueues C<@data> to the segment's drain.
421             #pod
422             #pod =cut
423              
424             sub eject {
425 5     5 1 4379 my $self = shift;
426 5 100       20 if ($self->has_parent) {
427 4         18 $self->INFO('Ejecting to drain of parent ('.$self->parent.')', @_);
428 4         143 $self->parent->drain->enqueue(@_);
429             }
430             else {
431 1         24 $self->INFO('Ejecting to drain', @_);
432 1         36 $self->drain->enqueue(@_);
433             }
434             }
435              
436             #pod =head2 emit(@data)
437             #pod
438             #pod Send C<@data> to the next segment in the pipeline. If the segment is the last in the pipeline, emits to the drain, making the C<@data> ready for C<dequeue>.
439             #pod
440             #pod =cut
441              
442             sub emit {
443 130     130 1 4563 my $self = shift;
444 130         2027 $self->INFO('Emitting', @_);
445             # Just collect in the drain
446 130         4598 $self->drain->enqueue(@_);
447             }
448              
449             #pod =head2 inject(@data)
450             #pod
451             #pod If the segment has a parent, enqueues C<@data> to its parent. Otherwise, enqueues C<@data> to itself.
452             #pod
453             #pod =cut
454              
455             sub inject {
456 5     5 1 5725 my $self = shift;
457              
458 5 100       25 if ($self->has_parent) {
459 4         24 $self->INFO('Injecting to parent ('.$self->parent.')', @_);
460 4         152 $self->parent->enqueue(@_);
461             }
462             else {
463 1         4 $self->INFO('Injecting to self ('.$self.')', @_);
464 1         36 $self->enqueue(@_);
465             }
466             }
467              
468             #pod =head2 injectAfter($location, @data)
469             #pod
470             #pod Send C<@data> to the segment I<after> the specified C<$location>. See L<C<find_segment>|/find_segment($location)> for a detailed description of C<$location>.
471             #pod
472             #pod =cut
473              
474             sub injectAfter {
475 5     5 1 4972 my $self = shift;
476 5         9 my $location = shift;
477 5         14 my $segment = $self->find_segment($location);
478 5 100       131 $self->ERROR("Could not find $location to injectAfter", @_)
479             if !defined $segment;
480 3         160 $self->INFO("Injecting to $location", @_);
481 3         134 $segment->drain->enqueue(@_);
482             }
483              
484             #pod =head2 injectAt($location, @data)
485             #pod
486             #pod Send C<@data> to the segment I<at> the specified C<$location>. See L<C<find_segment>|/find_segment($location)> for a detailed description of C<$location>.
487             #pod
488             #pod =cut
489              
490             sub injectAt {
491 5     5 1 3775 my $self = shift;
492 5         7 my $location = shift;
493 5         11 my $segment = $self->find_segment($location);
494 5 100       51 $self->ERROR("Could not find $location to injectAt", @_)
495             if !defined $segment;
496 3         51 $self->INFO("Injecting to $location", @_);
497 3         103 $segment->enqueue(@_);
498             }
499              
500             #pod =head2 recycle(@data)
501             #pod
502             #pod Re-queue C<@data> to the top of the current segment in an order such that C<dequeue> would subsequently return C<$data[0]> and so forth.
503             #pod
504             #pod =cut
505              
506             sub recycle {
507 5     5 1 24 my $self = shift;
508 5         75 $self->INFO('Recycling', @_);
509 5         170 $self->requeue(@_);
510             }
511              
512             #pod =head1 LOGGING AND DEBUGGING METHODS
513             #pod
514             #pod See L for detailed descriptions.
515             #pod
516             #pod =head2 INFO($message, [@items])
517             #pod
518             #pod Prints an informational C<$message> to STDERR if either the debug or verbosity level for the segment S<< is > 0 >>.
519             #pod
520             #pod =head2 DEBUG($message, [@items])
521             #pod
522             #pod Prints a debug C<$message> to STDERR if the debug level for the segment S<< is > 0 >>.
523             #pod
524             #pod =head2 WARN($message, [@items])
525             #pod
526             #pod Issues a warning with C<$message> via L.
527             #pod
528             #pod =head2 ERROR($message, [@items])
529             #pod
530             #pod Throws an error with C<$message> via L.
531             #pod
532             #pod =head1 UTILITY ATTRIBUTES
533             #pod
534             #pod None of these should be directly accessed. Documented for contributors and source-code readers.
535             #pod
536             #pod =head2 args
537             #pod
538             #pod The arguments passed to the C<init> method of L<Piper>.
539             #pod
540             #pod =cut
541              
542             has args => (
543             is => 'rwp',
544             isa => ArrayRef,
545             lazy => 1,
546             builder => sub {
547 10     10   84 my ($self) = @_;
548 10 50       26 if ($self->has_parent) {
549 10         152 return $self->main->args;
550             }
551             else {
552 0         0 return [];
553             }
554             },
555             );
556              
557             #pod =head2 directory
558             #pod
559             #pod A hashref of the segment's children, keyed by their labels. Used by C<descendant>.
560             #pod
561             #pod =cut
562              
563             has directory => (
564             is => 'lazy',
565             isa => HashRef,
566             builder => sub {
567 7     7   72 my ($self) = @_;
568 7 50       25 return {} unless $self->has_children;
569 7         8 my %dir;
570 7         9 for my $child (@{$self->children}) {
  7         20  
571 11         205 $dir{$child->path->name} = $child;
572             }
573 7         122 return \%dir;
574             },
575             );
576              
577             #pod =head2 drain
578             #pod
579             #pod A reference to the location where the segment's processed items are emitted.
580             #pod
581             #pod =cut
582              
583             BEGIN { # Enables 'with Piper::Role::Queue'
584             has drain => (
585             is => 'lazy',
586             handles => [qw(dequeue ready)],
587             builder => sub {
588 23     23   174 my ($self) = @_;
589 23 100       55 if ($self->has_parent) {
590 12         29 return $self->next_segment;
591             }
592             else {
593 11         154 return $self->main->config->queue_class->new();
594             }
595             },
596 4     4   5828 );
597             }
598              
599             #pod =head2 follower
600             #pod
601             #pod A hashref of children paths to the child's next adjacent segment. Used by C<next_segment>.
602             #pod
603             #pod =cut
604              
605             has follower => (
606             is => 'lazy',
607             isa => HashRef,
608             builder => sub {
609 8     8   68 my ($self) = @_;
610 8 50       23 return {} unless $self->has_children;
611 8         8 my %follow;
612 8         8 for my $index (0..$#{$self->children}) {
  8         25  
613 12 100       37 if (defined $self->children->[$index + 1]) {
614 4         19 $follow{$self->children->[$index]} =
615             $self->children->[$index + 1];
616             }
617             else {
618 8         121 $follow{$self->children->[$index]} = $self->drain;
619             }
620             }
621 8         125 return \%follow;
622             },
623             );
624              
625             #pod =head2 logger
626             #pod
627             #pod A reference to the logger for the pipeline. Handles L<Piper::Role::Logger> methods.
628             #pod
629             #pod =cut
630              
631             has logger => (
632             is => 'lazy',
633             isa => ConsumerOf['Piper::Role::Logger'],
634             handles => 'Piper::Role::Logger',
635             builder => sub {
636 35     35   703 my ($self) = @_;
637            
638 35 100       79 if ($self->has_parent) {
639 22         289 return $self->main->logger;
640             }
641             else {
642 13         175 return $self->main->config->logger_class->new();
643             }
644             },
645             );
646              
647             # Cute little trick to auto-insert the instance object
648             # as first argument, since $self will become the logger
649             # object and lose access to paths/labels/etc.
650             around [qw(INFO DEBUG WARN ERROR)] => sub {
651             my ($orig, $self) = splice @_, 0, 2;
652             if (ref $_[0]) {
653             $self->$orig(@_);
654             }
655             else {
656             $self->$orig($self, @_);
657             }
658             };
659              
660             #pod =head2 queue
661             #pod
662             #pod A reference to the location where data is queued for processing by this segment.
663             #pod
664             #pod =cut
665              
666             BEGIN { # Enables 'with Piper::Role::Queue'
667             has queue => (
668             is => 'lazy',
669             isa => ConsumerOf['Piper::Role::Queue'],
670             handles => [qw(enqueue requeue)],
671             builder => sub {
672 23     23   378 my ($self) = @_;
673 23 100       63 if ($self->has_children) {
674 8         114 return $self->children->[0];
675             }
676             else {
677 15         205 return $self->main->config->queue_class->new();
678             }
679             },
680 4     4   9171 );
681             }
682              
683             #pod =head2 segment
684             #pod
685             #pod The L<Piper> or L<Piper::Process> object from which the instance segment was created.
686             #pod
687             #pod =cut
688              
689             BEGIN { # So we can 'around' on Piper::Role::Segment methods
690 4     4   17127 has segment => (
691             is => 'ro',
692             isa => ConsumerOf['Piper::Role::Segment'],
693             handles => 'Piper::Role::Segment',
694             required => 1,
695             );
696             }
697              
698             #pod =head1 UTILITY METHODS
699             #pod
700             #pod None of these should be directly accessed. Documented for contributors and source-code readers.
701             #pod
702             #pod =head2 descendant($path, $referrer)
703             #pod
704             #pod Returns a child segment if its path ends with C<$path>. Does not search children with a path of C<$referrer>, as it was presumably already searched by a previous iteration of the search. Used by C<find_segment>.
705             #pod
706             #pod =cut
707              
708             sub descendant {
709 554     554 1 19339 my ($self, $path, $referrer) = @_;
710 554 50       1095 return unless $self->has_children;
711 554   100     1388 $referrer //= '';
712              
713 554         4398 $self->DEBUG("Searching for location '$path'");
714 554 100       12587 $self->DEBUG('Referrer', $referrer) if $referrer;
715              
716             # Search immediate children
717 554 100 66     7998 $path = Piper::Path->new($path) if $path and not ref $path;
718 554 50       8230 my @pieces = $path ? $path->split : ();
719 554         679 my $descend = $self;
720 554   100     1834 while (defined $descend and @pieces) {
721 731 100       10258 if (!$descend->has_children) {
    100          
722 62         117 $descend = undef;
723             }
724             elsif (exists $descend->directory->{$pieces[0]}) {
725 348         5242 $descend = $descend->directory->{$pieces[0]};
726 348         2076 shift @pieces;
727             }
728             else {
729 321         1896 $descend = undef;
730             }
731             }
732              
733             # Search grandchildren,
734             # but not when checking whether requested location starts at $self (referrer = $self)
735 554 100 100     1435 if (!defined $descend and $referrer ne $self) {
736 358         269 my @possible;
737 358         241 for my $child (@{$self->children}) {
  358         535  
738 570 100       885 if ($child eq $referrer) {
739 79         126 $self->DEBUG("Skipping search of '$child' referrer");
740 79         1726 next;
741             }
742 491 100       1076 if ($child->has_children) {
743 133         238 my $potential = $child->descendant($path);
744 133 100       287 push @possible, $potential if defined $potential;
745             }
746             }
747              
748 358 100       685 if (@possible) {
749 24     24   161 $descend = min_by { $_->path->split } @possible;
  24         449  
750             }
751             }
752              
753             # If location begins with $self->label, see if requested location starts at $self
754             # but not if already checking that (referrer = $self)
755 554 100 100     1390 if (!defined $descend and $referrer ne $self) {
756 334         4373 my $overlap = $self->label;
757 334 100       14924 if ($path =~ m{^\Q$overlap\E(?:$|/(?.*))}) {
758 4   100 4   34082 $path = $+{path} // '';
  4         1338  
  4         1297  
  134         904  
759 134 100       2355 $self->DEBUG('Overlapping descendant search', $path ? $path : ());
760 134 100       3065 $descend = $path ? $self->descendant($path, $self) : $self;
761             }
762             }
763              
764 554         2203 return $descend;
765             }
766              
767             #pod =head2 pressure
768             #pod
769             #pod An integer metric for the "fullness" of the pending queue. For handler instances (initialized from L<Piper::Process> objects), it is the percentage of pending items vs the batch size of the segment. For container instances (initialized from L<Piper> objects), it is the maximum C<pressure> of the contained instances. Used by C<process_batch> for choosing which segment to process.
770             #pod
771             #pod =cut
772              
773             # Metric for "how full" the pending queue is
774             sub pressure {
775 532     532 1 399 my ($self) = @_;
776 532 100       692 if ($self->has_children) {
777 79         80 return max(map { $_->pressure } @{$self->children});
  213         4837  
  79         95  
778             }
779             else {
780 453 100       546 return $self->pending ? int(100 * $self->pending / $self->batch_size) : 0;
781             }
782             }
783              
784             #pod =head2 process_batch
785             #pod
786             #pod Chooses the "best" segment for processing, and processes a batch for that segment.
787             #pod
788             #pod It first attempts to choose the full-batch segment (C<< pending >= batch_size >>) closest to the end of the pipeline. If there are no full-batch segments, it chooses the segment closest to being full.
789             #pod
790             #pod =cut
791              
792             sub process_batch {
793 283     283 1 6187 my ($self) = @_;
794 283 100       509 if ($self->has_children) {
795 166         127 my $best;
796             # Full-batch process closest to drain
797 166 100   282   482 if ($best = last_value { $_->pressure >= 100 } @{$self->children}) {
  282         939  
  166         394  
798 148         262 $self->DEBUG("Chose batch $best: full-batch process closest to drain");
799             }
800             # If no full batch, choose the one closest to full
801             else {
802 18     31   727 $best = max_by { $_->pressure } @{$self->children};
  31         424  
  18         73  
803 18         792 $self->DEBUG("Chose batch $best: closest to full-batch");
804             }
805 166         4057 $best->process_batch;
806             }
807             else {
808 117         1880 my $num = $self->batch_size;
809 117         8235 $self->DEBUG('Processing batch with max size', $num);
810              
811 117         3932 my @batch = $self->queue->dequeue($num);
812 117         1749 $self->INFO('Processing batch', @batch);
813              
814             $self->segment->handler->(
815             $self,
816             \@batch,
817 117         2732 @{$self->args}
  117         1633  
818             );
819             }
820             }
821              
822             1;
823              
824             __END__