File Coverage

blib/lib/Parallel/Benchmark.pm
Criterion Covered Total %
statement 125 132 94.7
branch 13 16 81.2
condition 2 3 66.6
subroutine 25 28 89.2
pod 2 2 100.0
total 167 181 92.2


line stmt bran cond sub pod time code
1             package Parallel::Benchmark;
2 21     21   499527 use strict;
  21         46  
  21         816  
3 21     21   154 use warnings;
  21         42  
  21         876  
4             our $VERSION = '0.08';
5              
6 21     21   18141 use Mouse;
  21         785932  
  21         120  
7 21     21   29956 use Log::Minimal;
  21         590442  
  21         182  
8 21     21   31239 use Time::HiRes qw/ tv_interval gettimeofday /;
  21         41878  
  21         110  
9 21     21   26756 use Parallel::ForkManager;
  21         944597  
  21         890  
10 21     21   21760 use Parallel::Scoreboard;
  21         88026  
  21         827  
11 21     21   200 use File::Temp qw/ tempdir /;
  21         47  
  21         2426  
12 21     21   118 use POSIX qw/ SIGUSR1 SIGUSR2 SIGTERM /;
  21         52  
  21         180  
13 21     21   24437 use Try::Tiny;
  21         36972  
  21         1523  
14 21     21   171 use Scalar::Util qw/ blessed /;
  21         30  
  21         38404  
15              
16             has benchmark => (
17             is => "rw",
18             isa => "CodeRef",
19             default => sub { sub { return 1 } },
20             );
21              
22             has setup => (
23             is => "rw",
24             isa => "CodeRef",
25             default => sub { sub { } },
26             );
27              
28             has teardown => (
29             is => "rw",
30             isa => "CodeRef",
31             default => sub { sub { } },
32             );
33              
34             has time => (
35             is => "rw",
36             isa => "Int",
37             default => 3,
38             );
39              
40             has concurrency => (
41             is => "rw",
42             isa => "Int",
43             default => 1,
44             );
45              
46             has debug => (
47             is => "rw",
48             isa => "Bool",
49             default => 0,
50             trigger => sub {
51             my ($self, $val) = @_;
52             $ENV{LM_DEBUG} = $val;
53             },
54             );
55              
56             has stash => (
57             is => "rw",
58             isa => "HashRef",
59             default => sub { +{} },
60             );
61              
62             has scoreboard => (
63             is => "rw",
64             default => sub {
65             my $dir = tempdir( CLEANUP => 1 );
66             Parallel::Scoreboard->new( base_dir => $dir );
67             },
68             );
69              
70             sub run {
71 20     20 1 191 my $self = shift;
72              
73 20 50       216 local $Log::Minimal::COLOR = 1
74             if -t *STDERR; ## no critic
75             local $Log::Minimal::PRINT = sub {
76 81     81   25210 my ( $time, $type, $message, $trace) = @_;
77 81         32234 warn "$time [$$] [$type] $message\n";
78 20         142 };
79              
80 20         260 infof "starting benchmark: concurrency: %d, time: %d",
81             $self->concurrency, $self->time;
82              
83 20         280 my $pm = Parallel::ForkManager->new( $self->concurrency );
84 20         10277 my $result = {
85             score => 0,
86             elapsed => 0,
87             stashes => {},
88             };
89             $pm->run_on_finish(
90             sub {
91 15     15   25694507 my ($pid, $exit_code, $ident, $exit_signal, $core_dump, $data) = @_;
92 15 50       96 if (defined $data) {
93 15         69 $result->{score} += $data->[1];
94 15         63 $result->{elapsed} += $data->[2];
95 15         168 $result->{stashes}->{ $data->[0] } = $data->[3];
96             }
97             }
98 20         234 );
99 20         398 my $pids = {};
100             local $SIG{INT} = $SIG{TERM} = sub {
101 0     0   0 infof "terminating benchmark processes...";
102 0         0 kill SIGTERM, keys %$pids;
103 0         0 $pm->wait_all_children;
104 0         0 exit;
105 20         978 };
106              
107             CHILD:
108 20         304 for my $n ( 1 .. $self->concurrency ) {
109 46         311 my $pid = $pm->start;
110 46 100       185661615 if ($pid) {
111             # parent
112 31         576 $pids->{$pid} = 1;
113 31         306 next CHILD;
114             }
115             else {
116             # child
117 15     0   27633 local $SIG{INT} = $SIG{TERM} = sub { exit };
  0         0  
118 15         1031 debugf "spwan child process[%d]", $n;
119 15         610 my $r = $self->_run_on_child($n);
120 15         531 $pm->finish(0, $r);
121 0         0 exit;
122             }
123             }
124              
125 5         312 $self->_wait_for_finish_setup($pids);
126              
127 5         10932 kill SIGUSR1, keys %$pids;
128 5         101 my $start = [gettimeofday];
129             try {
130             my $teardown = sub {
131 4         9545917 alarm 0;
132 4         278 kill SIGUSR2, keys %$pids;
133 4         1039710 $pm->wait_all_children;
134 4         280 die;
135 5     5   224 };
136 5         116 local $SIG{INT} = $teardown;
137 5         98 local $SIG{ALRM} = $teardown;
138 5         101 alarm $self->time;
139 5         71 $pm->wait_all_children;
140 1         43 alarm 0;
141 5         212 };
142              
143 5         179 $result->{elapsed} = tv_interval($start);
144              
145 5         149 infof "done benchmark: score %s, elapsed %.3f sec = %.3f / sec",
146             $result->{score},
147             $result->{elapsed},
148             $result->{score} / $result->{elapsed},
149             ;
150 5         164 $result;
151             }
152              
153             sub _run_on_child {
154 15     15   168 my $self = shift;
155 15         248 my $n = shift;
156              
157 15         493 my $r = [ $n, 0, 0, {} ];
158             try {
159 15     15   7846 $self->scoreboard->update("setup_start");
160 15         28174 $self->setup->( $self, $n );
161 14         4000977 $self->scoreboard->update("setup_done");
162 14         6224 $r = $self->_run_benchmark_on_child($n);
163 13         280 $self->teardown->( $self, $n );
164             }
165             catch {
166 3     3   2000540 my $e = $_;
167 3         100 critf "benchmark process[%d] died: %s", $n, $e;
168 15         2591 };
169 15         755 return $r;
170             }
171              
172             sub _wait_for_finish_setup {
173 5     5   96 my $self = shift;
174 5         86 my $pids = shift;
175 5         43 while (1) {
176 9         9001955 sleep 1;
177 9         235801 debugf "waiting for all children finish setup()";
178 9         271 my $stats = $self->scoreboard->read_all();
179 9         7933 my $done = 0;
180 9         60 for my $pid (keys %$pids) {
181 29 100       178 if (my $s = $stats->{$pid}) {
    50          
182 28 100       167 $done++ if $s eq "setup_done";
183             }
184             elsif ( kill(0, $pid) == 1 ) {
185             # maybe died...
186 1         8 delete $pids->{$pid};
187             }
188             }
189 9 100       247 last if $done == keys %$pids;
190             }
191             }
192              
193             sub _run_benchmark_on_child {
194 14     14   114 my $self = shift;
195 14         158 my $n = shift;
196              
197 14         67 my ($wait, $run) = (1, 1);
198 14     14   865 local $SIG{USR1} = sub { $wait = 0 };
  14         546  
199 14     11   879 local $SIG{USR2} = sub { $run = 0 };
  11         60720136  
200 14     0   702 local $SIG{INT} = sub {};
  0         0  
201              
202 14         23377184 sleep 1 while $wait;
203              
204 14         286 debugf "starting benchmark process[%d]", $n;
205              
206 14         185 my $benchmark = $self->benchmark;
207 14         56 my $score = 0;
208 14         370 my $start = [gettimeofday];
209              
210             try {
211 14     14   2564 $score += $benchmark->( $self, $n ) while $run;
212             }
213             catch {
214 3     3   75 my $e = $_;
215 3         46 my $class = blessed $e;
216 3 100 66     59 if ( $class && $class eq __PACKAGE__ . "::HaltedException" ) {
217 2         12 infof "benchmark process[%d] halted: %s", $n, $$e;
218             }
219             else {
220 1         86 die $e;
221             }
222 14         672 };
223              
224 13         2476 my $elapsed = tv_interval($start);
225              
226 13         520 debugf "done benchmark process[%d]: score %s, elapsed %.3f sec.",
227             $n, $score, $elapsed;
228              
229 13         707 return [ $n, $score, $elapsed, $self->stash ];
230             }
231              
232             sub halt {
233 2     2 1 156 my $self = shift;
234 2         65 my $msg = shift;
235 2         120 die bless \$msg, __PACKAGE__ . "::HaltedException";
236             }
237              
238             1;
239             __END__