File Coverage

blib/lib/Parallel/Benchmark.pm
Criterion Covered Total %
statement 126 133 94.7
branch 13 16 81.2
condition 2 3 66.6
subroutine 25 28 89.2
pod 2 2 100.0
total 168 182 92.3


line stmt bran cond sub pod time code
1             package Parallel::Benchmark;
2 21     21   311936 use strict;
  21         38  
  21         667  
3 21     21   79 use warnings;
  21         26  
  21         733  
4             our $VERSION = '0.09';
5              
6 21     21   9480 use Mouse;
  21         417830  
  21         84  
7 21     21   13627 use Log::Minimal;
  21         294704  
  21         107  
8 21     21   11506 use Time::HiRes qw/ tv_interval gettimeofday /;
  21         22476  
  21         74  
9 21     21   13488 use Parallel::ForkManager "1.08";
  21         508810  
  21         578  
10 21     21   8487 use Parallel::Scoreboard;
  21         48887  
  21         646  
11 21     21   110 use File::Temp qw/ tempdir /;
  21         29  
  21         1181  
12 21     21   79 use POSIX qw/ SIGUSR1 SIGUSR2 SIGTERM /;
  21         22  
  21         97  
13 21     21   10088 use Try::Tiny;
  21         20094  
  21         903  
14 21     21   93 use Scalar::Util qw/ blessed /;
  21         21  
  21         21033  
15              
16             has benchmark => (
17             is => "rw",
18             isa => "CodeRef",
19             default => sub { sub { return 1 } },
20             );
21              
22             has setup => (
23             is => "rw",
24             isa => "CodeRef",
25             default => sub { sub { } },
26             );
27              
28             has teardown => (
29             is => "rw",
30             isa => "CodeRef",
31             default => sub { sub { } },
32             );
33              
34             has time => (
35             is => "rw",
36             isa => "Int",
37             default => 3,
38             );
39              
40             has concurrency => (
41             is => "rw",
42             isa => "Int",
43             default => 1,
44             );
45              
46             has debug => (
47             is => "rw",
48             isa => "Bool",
49             default => 0,
50             trigger => sub {
51             my ($self, $val) = @_;
52             $ENV{LM_DEBUG} = $val;
53             },
54             );
55              
56             has stash => (
57             is => "rw",
58             isa => "HashRef",
59             default => sub { +{} },
60             );
61              
62             has scoreboard => (
63             is => "rw",
64             default => sub {
65             my $dir = tempdir( CLEANUP => 1 );
66             Parallel::Scoreboard->new( base_dir => $dir );
67             },
68             );
69              
70             sub run {
71 20     20 1 118 my $self = shift;
72              
73 20 50       113 local $Log::Minimal::COLOR = 1
74             if -t *STDERR; ## no critic
75             local $Log::Minimal::PRINT = sub {
76 79     79   10425 my ( $time, $type, $message, $trace) = @_;
77 79         16029 warn "$time [$$] [$type] $message\n";
78 20         92 };
79              
80 20         144 infof "starting benchmark: concurrency: %d, time: %d",
81             $self->concurrency, $self->time;
82              
83 20         468 my $pm = Parallel::ForkManager->new( $self->concurrency );
84 20         53642 $pm->set_waitpid_blocking_sleep(0); # true blocking calls enabled
85 20         268 my $result = {
86             score => 0,
87             elapsed => 0,
88             stashes => {},
89             };
90             $pm->run_on_finish(
91             sub {
92 15     15   1393406 my ($pid, $exit_code, $ident, $exit_signal, $core_dump, $data) = @_;
93 15 50       63 if (defined $data) {
94 15         47 $result->{score} += $data->[1];
95 15         49 $result->{elapsed} += $data->[2];
96 15         114 $result->{stashes}->{ $data->[0] } = $data->[3];
97             }
98             }
99 20         519 );
100 20         247 my $pids = {};
101             local $SIG{INT} = $SIG{TERM} = sub {
102 0     0   0 infof "terminating benchmark processes...";
103 0         0 kill SIGTERM, keys %$pids;
104 0         0 $pm->wait_all_children;
105 0         0 exit;
106 20         768 };
107              
108             CHILD:
109 20         128 for my $n ( 1 .. $self->concurrency ) {
110 46         212 my $pid = $pm->start;
111 46 100       40242 if ($pid) {
112             # parent
113 31         248 $pids->{$pid} = 1;
114 31         163 next CHILD;
115             }
116             else {
117             # child
118 15     0   1724 local $SIG{INT} = $SIG{TERM} = sub { exit };
  0         0  
119 15         518 debugf "spwan child process[%d]", $n;
120 15         358 my $r = $self->_run_on_child($n);
121 15         366 $pm->finish(0, $r);
122 0         0 exit;
123             }
124             }
125              
126 5         162 $self->_wait_for_finish_setup($pids);
127              
128 5         16989 kill SIGUSR1, keys %$pids;
129 5         98 my $start = [gettimeofday];
130             try {
131             my $teardown = sub {
132 4         9819656 alarm 0;
133 4         5813 kill SIGUSR2, keys %$pids;
134 4         110 $pm->wait_all_children;
135 4         178 die;
136 5     5   152 };
137 5         77 local $SIG{INT} = $teardown;
138 5         35 local $SIG{ALRM} = $teardown;
139 5         59 alarm $self->time;
140 5         56 $pm->wait_all_children;
141 1         22 alarm 0;
142 5         159 };
143              
144 5         131 $result->{elapsed} = tv_interval($start);
145              
146 5         129 infof "done benchmark: score %s, elapsed %.3f sec = %.3f / sec",
147             $result->{score},
148             $result->{elapsed},
149             $result->{score} / $result->{elapsed},
150             ;
151 5         91 $result;
152             }
153              
154             sub _run_on_child {
155 15     15   102 my $self = shift;
156 15         61 my $n = shift;
157              
158 15         129 my $r = [ $n, 0, 0, {} ];
159             try {
160 15     15   1971 $self->scoreboard->update("setup_start");
161 15         8880 $self->setup->( $self, $n );
162 14         4001738 $self->scoreboard->update("setup_done");
163 14         81418 $r = $self->_run_benchmark_on_child($n);
164 13         279 $self->teardown->( $self, $n );
165             }
166             catch {
167 3     3   2000202 my $e = $_;
168 3         50 critf "benchmark process[%d] died: %s", $n, $e;
169 15         606 };
170 15         416 return $r;
171             }
172              
173             sub _wait_for_finish_setup {
174 5     5   42 my $self = shift;
175 5         22 my $pids = shift;
176 5         32 while (1) {
177 7         7000793 sleep 1;
178 7         147 debugf "waiting for all children finish setup()";
179 7         117 my $stats = $self->scoreboard->read_all();
180 7         43803 my $done = 0;
181 7         46 for my $pid (keys %$pids) {
182 22 100       138 if (my $s = $stats->{$pid}) {
    50          
183 21 100       86 $done++ if $s eq "setup_done";
184             }
185             elsif ( kill(0, $pid) == 1 ) {
186             # maybe died...
187 1         4 delete $pids->{$pid};
188             }
189             }
190 7 100       150 last if $done == keys %$pids;
191             }
192             }
193              
194             sub _run_benchmark_on_child {
195 14     14   70 my $self = shift;
196 14         76 my $n = shift;
197              
198 14         54 my ($wait, $run) = (1, 1);
199 14     14   328 local $SIG{USR1} = sub { $wait = 0 };
  14         142  
200 14     11   170 local $SIG{USR2} = sub { $run = 0 };
  11         31023440  
201 14     0   130 local $SIG{INT} = sub {};
  0         0  
202              
203 14         16120579 sleep 1 while $wait;
204              
205 14         279 debugf "starting benchmark process[%d]", $n;
206              
207 14         139 my $benchmark = $self->benchmark;
208 14         32 my $score = 0;
209 14         177 my $start = [gettimeofday];
210              
211             try {
212 14     14   1063 $score += $benchmark->( $self, $n ) while $run;
213             }
214             catch {
215 3     3   54 my $e = $_;
216 3         32 my $class = blessed $e;
217 3 100 66     44 if ( $class && $class eq __PACKAGE__ . "::HaltedException" ) {
218 2         9 infof "benchmark process[%d] halted: %s", $n, $$e;
219             }
220             else {
221 1         30 die $e;
222             }
223 14         354 };
224              
225 13         1536 my $elapsed = tv_interval($start);
226              
227 13         339 debugf "done benchmark process[%d]: score %s, elapsed %.3f sec.",
228             $n, $score, $elapsed;
229              
230 13         300 return [ $n, $score, $elapsed, $self->stash ];
231             }
232              
233             sub halt {
234 2     2 1 113 my $self = shift;
235 2         21 my $msg = shift;
236 2         48 die bless \$msg, __PACKAGE__ . "::HaltedException";
237             }
238              
239             1;
240             __END__