File Coverage

blib/lib/Parallel/Queue/Manager.pm
Criterion Covered Total %
statement 59 63 93.6
branch 14 18 77.7
condition 3 5 60.0
subroutine 18 18 100.0
pod 0 10 0.0
total 94 114 82.4


line stmt bran cond sub pod time code
1             ########################################################################
2             # housekeeping
3             ########################################################################
4              
5             package Parallel::Queue::Manager;
6 85     85   134352 use v5.24;
  85         277  
7 85     85   405 use mro qw( c3 );
  85         167  
  85         507  
8              
9 85     85   34929 use Parallel::Queue qw( noexport );
  85         171  
  85         1298  
10              
11 85     85   551 use mro::EVERY;
  85         119  
  85         296  
12              
13 85     85   2528 use Carp qw( croak );
  85         179  
  85         3919  
14 85     85   455 use Scalar::Util qw( blessed reftype );
  85         116  
  85         48669  
15              
16             ########################################################################
17             # package variables
18             ########################################################################
19              
20             our @CARP_NOT = ( __PACKAGE__, qw( mro mro::EVERY ) );
21              
22             my $parent_pid = $$;
23              
24             ########################################################################
25             # methods
26             ########################################################################
27              
28             sub handler : lvalue
29             {
30 2159     2159 0 3921 my $qmgr = shift;
31 2159 50       8962 @_ or return $qmgr->[0];
32            
33 0         0 my $handler = shift;
34              
35 0 0       0 'CODE' eq reftype $handler
36             or croak "handler: '$handler' is not CODE.";
37              
38 0         0 $qmgr->[0] = $handler
39             }
40              
41             sub queue : lvalue
42             {
43 4107     4107 0 10419 my $qmgr = shift;
44 4107 50 100     18365 @_ or return $qmgr->[1] ||= [];
45              
46 0         0 $qmgr->[1] = shift
47             }
48              
49             sub next
50             {
51 2762     2762 0 3627 my $qmgr = shift;
52 2762         6815 my $queue = $qmgr->queue;
53              
54             # the caller may want undef as an argument, who knows?
55             # only fix is an exception to indicate no further jobs.
56              
57 2762 100       12490 @$queue
58             ? shift @$queue
59             : die "Empty queue.\n"
60             }
61              
62             sub configure
63             {
64 500     500 0 2696 my $handler = Parallel::Queue->can( 'configure' );
65 500         808 my $qmgr = shift;
66              
67 500         1856 $handler->( @_, qw( noexport ) );
68              
69 500         1702 $qmgr
70             }
71              
72             sub runqueue
73             {
74 500     500 0 1246 state $runq = Parallel::Queue->can( 'runqueue' );
75              
76 500         1263 $parent_pid = $$;
77              
78 500         910 my $qmgr = shift;
79 500         860 my $jobs = shift;
80              
81             # note that the queue may already be loaded
82             # from construction or previous assignment.
83              
84 500 100       1793 $qmgr->queue = [ @_ ]
85             if @_;
86              
87 500         1837 $runq->( $jobs, $qmgr );
88 422         1497 $qmgr
89             }
90              
91             sub next_job
92             {
93             # pull the item off the stack in the
94             # parent process, not the child. ignore
95             # $@, eval returning undef is sufficient
96             # to end queue execution.
97              
98 2762     2762 0 5867 my $qmgr = shift;
99              
100             eval
101 2762         8634 {
102 2762         8453 my $next = $qmgr->next;
103 1654     1654   4131 sub { $qmgr->handler->( $next ) }
104 2345         20396 }
105             }
106              
107             ########################################################################
108             # object manglement
109             ########################################################################
110              
111             sub new
112             {
113 503     503 0 927879 my $qmgr = &construct;
114              
115 503         5504 $qmgr->EVERY::LAST::initialize( @_ );
116 503         2431 $qmgr
117             }
118              
119             sub construct
120             {
121 503     503 0 1356 my $proto = shift;
122              
123 503   33     3711 bless [], blessed $proto || $proto;
124             }
125              
126             sub initialize
127             {
128 503     503 0 30140 my $qmgr = shift;
129              
130 503 100       1944 $qmgr->handler = shift if @_;
131 503 100       1703 $qmgr->queue = [ @_ ] if @_;
132              
133             return
134 503         1073 }
135              
136             DESTROY
137             {
138 424     424   365406 my $qmgr = shift;
139              
140 424         6193 $qmgr->EVERY::cleanup;
141              
142 424         2037 undef @$qmgr;
143 424         701 undef $qmgr;
144              
145             return
146 424         4427 }
147              
148             sub cleanup
149             {
150 424 100   424 0 35680 if( $$ == $parent_pid )
151             {
152 420         736 my $qmgr = shift;
153 420         1283 my $queue = $qmgr->queue;
154              
155 420 100       1303 say STDERR join "\n\t", "($$) Incomplete jobs:", @$queue
156             if @$queue;
157             }
158             else
159             {
160             # child running individual job has nothing to
161             # clean up.
162             }
163              
164             return
165 424         843 }
166              
167             # keep require happy
168             1
169             __END__