File Coverage

blib/lib/Parallel/Benchmark.pm
Criterion Covered Total %
statement 126 133 94.7
branch 13 16 81.2
condition 2 3 66.6
subroutine 25 28 89.2
pod 2 2 100.0
total 168 182 92.3


line stmt bran cond sub pod time code
1             package Parallel::Benchmark;
2 21     21   337097 use strict;
  21         42  
  21         743  
3 21     21   87 use warnings;
  21         34  
  21         1307  
4             our $VERSION = '0.10';
5              
6 21     21   9522 use Mouse;
  21         478124  
  21         98  
7 21     21   17846 use Log::Minimal;
  21         357659  
  21         124  
8 21     21   14540 use Time::HiRes qw/ tv_interval gettimeofday /;
  21         26770  
  21         82  
9 21     21   16555 use Parallel::ForkManager "1.12";
  21         597896  
  21         665  
10 21     21   9617 use Parallel::Scoreboard;
  21         56845  
  21         632  
11 21     21   132 use File::Temp qw/ tempdir /;
  21         33  
  21         1477  
12 21     21   96 use POSIX qw/ SIGUSR1 SIGUSR2 SIGTERM /;
  21         29  
  21         126  
13 21     21   15154 use Try::Tiny;
  21         22520  
  21         1061  
14 21     21   107 use Scalar::Util qw/ blessed /;
  21         24  
  21         24619  
15              
16             has benchmark => (
17             is => "rw",
18             isa => "CodeRef",
19             default => sub { sub { return 1 } },
20             );
21              
22             has setup => (
23             is => "rw",
24             isa => "CodeRef",
25             default => sub { sub { } },
26             );
27              
28             has teardown => (
29             is => "rw",
30             isa => "CodeRef",
31             default => sub { sub { } },
32             );
33              
34             has time => (
35             is => "rw",
36             isa => "Int",
37             default => 3,
38             );
39              
40             has concurrency => (
41             is => "rw",
42             isa => "Int",
43             default => 1,
44             );
45              
46             has debug => (
47             is => "rw",
48             isa => "Bool",
49             default => 0,
50             trigger => sub {
51             my ($self, $val) = @_;
52             $ENV{LM_DEBUG} = $val;
53             },
54             );
55              
56             has stash => (
57             is => "rw",
58             isa => "HashRef",
59             default => sub { +{} },
60             );
61              
62             has scoreboard => (
63             is => "rw",
64             default => sub {
65             my $dir = tempdir( CLEANUP => 1 );
66             Parallel::Scoreboard->new( base_dir => $dir );
67             },
68             );
69              
70             sub run {
71 20     20 1 135 my $self = shift;
72              
73 20 50       126 local $Log::Minimal::COLOR = 1
74             if -t *STDERR; ## no critic
75             local $Log::Minimal::PRINT = sub {
76 79     79   10870 my ( $time, $type, $message, $trace) = @_;
77 79         9296 warn "$time [$$] [$type] $message\n";
78 20         102 };
79              
80 20         169 infof "starting benchmark: concurrency: %d, time: %d",
81             $self->concurrency, $self->time;
82              
83 20         241 my $pm = Parallel::ForkManager->new( $self->concurrency );
84 20         5595 $pm->set_waitpid_blocking_sleep(0); # true blocking calls enabled
85 20         233 my $result = {
86             score => 0,
87             elapsed => 0,
88             stashes => {},
89             };
90             $pm->run_on_finish(
91             sub {
92 15     15   1322206 my ($pid, $exit_code, $ident, $exit_signal, $core_dump, $data) = @_;
93 15 50       64 if (defined $data) {
94 15         52 $result->{score} += $data->[1];
95 15         54 $result->{elapsed} += $data->[2];
96 15         120 $result->{stashes}->{ $data->[0] } = $data->[3];
97             }
98             }
99 20         293 );
100 20         211 my $pids = {};
101             local $SIG{INT} = $SIG{TERM} = sub {
102 0     0   0 infof "terminating benchmark processes...";
103 0         0 kill SIGTERM, keys %$pids;
104 0         0 $pm->wait_all_children;
105 0         0 exit;
106 20         687 };
107              
108             CHILD:
109 20         109 for my $n ( 1 .. $self->concurrency ) {
110 46         236 my $pid = $pm->start;
111 46 100       34372 if ($pid) {
112             # parent
113 31         283 $pids->{$pid} = 1;
114 31         146 next CHILD;
115             }
116             else {
117             # child
118 15     0   1580 local $SIG{INT} = $SIG{TERM} = sub { exit };
  0         0  
119 15         460 debugf "spwan child process[%d]", $n;
120 15         313 my $r = $self->_run_on_child($n);
121 15         413 $pm->finish(0, $r);
122 0         0 exit;
123             }
124             }
125              
126 5         192 $self->_wait_for_finish_setup($pids);
127              
128 5         8400 kill SIGUSR1, keys %$pids;
129 5         63 my $start = [gettimeofday];
130             try {
131             my $teardown = sub {
132 4         9812279 alarm 0;
133 4         6118 kill SIGUSR2, keys %$pids;
134 4         102 $pm->wait_all_children;
135 4         197 die;
136 5     5   140 };
137 5         57 local $SIG{INT} = $teardown;
138 5         36 local $SIG{ALRM} = $teardown;
139 5         63 alarm $self->time;
140 5         56 $pm->wait_all_children;
141 1         32 alarm 0;
142 5         104 };
143              
144 5         134 $result->{elapsed} = tv_interval($start);
145              
146 5         141 infof "done benchmark: score %s, elapsed %.3f sec = %.3f / sec",
147             $result->{score},
148             $result->{elapsed},
149             $result->{score} / $result->{elapsed},
150             ;
151 5         87 $result;
152             }
153              
154             sub _run_on_child {
155 15     15   168 my $self = shift;
156 15         62 my $n = shift;
157              
158 15         120 my $r = [ $n, 0, 0, {} ];
159             try {
160 15     15   2043 $self->scoreboard->update("setup_start");
161 15         8691 $self->setup->( $self, $n );
162 14         4000495 $self->scoreboard->update("setup_done");
163 14         635 $r = $self->_run_benchmark_on_child($n);
164 13         212 $self->teardown->( $self, $n );
165             }
166             catch {
167 3     3   2000256 my $e = $_;
168 3         37 critf "benchmark process[%d] died: %s", $n, $e;
169 15         695 };
170 15         431 return $r;
171             }
172              
173             sub _wait_for_finish_setup {
174 5     5   53 my $self = shift;
175 5         35 my $pids = shift;
176 5         32 while (1) {
177 7         7000875 sleep 1;
178 7         176 debugf "waiting for all children finish setup()";
179 7         149 my $stats = $self->scoreboard->read_all();
180 7         3637 my $done = 0;
181 7         59 for my $pid (keys %$pids) {
182 22 100       105 if (my $s = $stats->{$pid}) {
    50          
183 21 100       87 $done++ if $s eq "setup_done";
184             }
185             elsif ( kill(0, $pid) == 1 ) {
186             # maybe died...
187 1         4 delete $pids->{$pid};
188             }
189             }
190 7 100       187 last if $done == keys %$pids;
191             }
192             }
193              
194             sub _run_benchmark_on_child {
195 14     14   41 my $self = shift;
196 14         130 my $n = shift;
197              
198 14         62 my ($wait, $run) = (1, 1);
199 14     14   295 local $SIG{USR1} = sub { $wait = 0 };
  14         176  
200 14     11   162 local $SIG{USR2} = sub { $run = 0 };
  11         31009253  
201 14     0   235 local $SIG{INT} = sub {};
  0         0  
202              
203 14         16032192 sleep 1 while $wait;
204              
205 14         129 debugf "starting benchmark process[%d]", $n;
206              
207 14         194 my $benchmark = $self->benchmark;
208 14         31 my $score = 0;
209 14         249 my $start = [gettimeofday];
210              
211             try {
212 14     14   809 $score += $benchmark->( $self, $n ) while $run;
213             }
214             catch {
215 3     3   50 my $e = $_;
216 3         38 my $class = blessed $e;
217 3 100 66     27 if ( $class && $class eq __PACKAGE__ . "::HaltedException" ) {
218 2         8 infof "benchmark process[%d] halted: %s", $n, $$e;
219             }
220             else {
221 1         22 die $e;
222             }
223 14         343 };
224              
225 13         1434 my $elapsed = tv_interval($start);
226              
227 13         311 debugf "done benchmark process[%d]: score %s, elapsed %.3f sec.",
228             $n, $score, $elapsed;
229              
230 13         325 return [ $n, $score, $elapsed, $self->stash ];
231             }
232              
233             sub halt {
234 2     2 1 92 my $self = shift;
235 2         11 my $msg = shift;
236 2         60 die bless \$msg, __PACKAGE__ . "::HaltedException";
237             }
238              
239             1;
240             __END__