File Coverage

blib/lib/MCE/Util.pm
Criterion Covered Total %
statement 105 216 48.6
branch 38 164 23.1
condition 11 30 36.6
subroutine 17 23 73.9
pod 1 1 100.0
total 172 434 39.6


line stmt bran cond sub pod time code
1             ###############################################################################
2             ## ----------------------------------------------------------------------------
3             ## Utility functions.
4             ##
5             ###############################################################################
6              
7             package MCE::Util;
8              
9 107     107   617 use strict;
  107         163  
  107         2828  
10 107     107   423 use warnings;
  107         155  
  107         2596  
11              
12 107     107   417 no warnings qw( threads recursion uninitialized numeric );
  107         776  
  107         4457  
13              
14             our $VERSION = '1.887';
15              
16             ## no critic (BuiltinFunctions::ProhibitStringyEval)
17              
18 107     107   10908 use IO::Handle ();
  107         116579  
  107         1845  
19 107     107   11834 use Socket qw( AF_UNIX SOL_SOCKET SO_SNDBUF SO_RCVBUF );
  107         67823  
  107         6251  
20 107     107   515 use Time::HiRes qw( sleep time );
  107         176  
  107         559  
21 107     107   45803 use Errno ();
  107         114193  
  107         2788  
22 107     107   589 use base qw( Exporter );
  107         178  
  107         18344  
23              
24             my ($_is_winenv, $_zero_bytes, %_sock_ready);
25              
26             BEGIN {
27 107 50   107   999 $_is_winenv = ( $^O =~ /mswin|mingw|msys|cygwin/i ) ? 1 : 0;
28 107         234431 $_zero_bytes = pack('L', 0);
29             }
30              
31             sub CLONE {
32 0     0   0 %_sock_ready = ();
33             }
34              
35             our $LF = "\012"; Internals::SvREADONLY($LF, 1);
36              
37             our @EXPORT_OK = qw( $LF get_ncpu );
38             our %EXPORT_TAGS = ( all => \@EXPORT_OK );
39              
40             ###############################################################################
41             ## ----------------------------------------------------------------------------
42             ## The get_ncpu subroutine, largely adopted from Test::Smoke::Util.pm,
43             ## returns the number of logical (online/active/enabled) CPU cores;
44             ## never smaller than one.
45             ##
46             ## A warning is emitted to STDERR when it cannot recognize the operating
47             ## system or the external command failed.
48             ##
49             ###############################################################################
50              
51             my $g_ncpu;
52              
53             sub get_ncpu {
54 66 100   66 1 229 return $g_ncpu if (defined $g_ncpu);
55              
56 61         534 local $ENV{PATH} = "/usr/sbin:/sbin:/usr/bin:/bin:$ENV{PATH}";
57 61         277 $ENV{PATH} =~ /(.*)/; $ENV{PATH} = $1; ## Remove tainted'ness
  61         304  
58              
59 61         134 my $ncpu = 1;
60              
61             OS_CHECK: {
62 61         111 local $_ = lc $^O;
  61         242  
63              
64 61 50       286 /linux/ && do {
65 61         128 my ( $count, $fh );
66 61 50       3376 if ( open $fh, '<', '/proc/stat' ) {
67 61         3529 $count = grep { /^cpu\d/ } <$fh>;
  1464         2617  
68 61         738 close $fh;
69             }
70 61 50       288 $ncpu = $count if $count;
71 61         330 last OS_CHECK;
72             };
73              
74 0 0       0 /bsd|darwin|dragonfly/ && do {
75 0         0 chomp( my @output = `sysctl -n hw.ncpu 2>/dev/null` );
76 0 0       0 $ncpu = $output[0] if @output;
77 0         0 last OS_CHECK;
78             };
79              
80 0 0       0 /aix/ && do {
81 0         0 my @output = `lparstat -i 2>/dev/null | grep "^Online Virtual CPUs"`;
82 0 0       0 if ( @output ) {
83 0         0 $output[0] =~ /(\d+)\n$/;
84 0 0       0 $ncpu = $1 if $1;
85             }
86 0 0       0 if ( !$ncpu ) {
87 0         0 @output = `pmcycles -m 2>/dev/null`;
88 0 0       0 if ( @output ) {
89 0         0 $ncpu = scalar @output;
90             } else {
91 0         0 @output = `lsdev -Cc processor -S Available 2>/dev/null`;
92 0 0       0 $ncpu = scalar @output if @output;
93             }
94             }
95 0         0 last OS_CHECK;
96             };
97              
98 0 0       0 /gnu/ && do {
99 0         0 chomp( my @output = `nproc 2>/dev/null` );
100 0 0       0 $ncpu = $output[0] if @output;
101 0         0 last OS_CHECK;
102             };
103              
104 0 0       0 /haiku/ && do {
105 0         0 my @output = `sysinfo -cpu 2>/dev/null | grep "^CPU #"`;
106 0 0       0 $ncpu = scalar @output if @output;
107 0         0 last OS_CHECK;
108             };
109              
110 0 0       0 /hp-?ux/ && do {
111 0         0 my $count = grep { /^processor/ } `ioscan -fkC processor 2>/dev/null`;
  0         0  
112 0 0       0 $ncpu = $count if $count;
113 0         0 last OS_CHECK;
114             };
115              
116 0 0       0 /irix/ && do {
117 0         0 my @out = grep { /\s+processors?$/i } `hinv -c processor 2>/dev/null`;
  0         0  
118 0 0       0 $ncpu = (split ' ', $out[0])[0] if @out;
119 0         0 last OS_CHECK;
120             };
121              
122 0 0       0 /osf|solaris|sunos|svr5|sco/ && do {
123 0 0       0 if (-x '/usr/sbin/psrinfo') {
124 0         0 my $count = grep { /on-?line/ } `psrinfo 2>/dev/null`;
  0         0  
125 0 0       0 $ncpu = $count if $count;
126             }
127             else {
128 0         0 my @output = grep { /^NumCPU = \d+/ } `uname -X 2>/dev/null`;
  0         0  
129 0 0       0 $ncpu = (split ' ', $output[0])[2] if @output;
130             }
131 0         0 last OS_CHECK;
132             };
133              
134 0 0       0 /mswin|mingw|msys|cygwin/ && do {
135 0 0       0 if (exists $ENV{NUMBER_OF_PROCESSORS}) {
136 0         0 $ncpu = $ENV{NUMBER_OF_PROCESSORS};
137             }
138 0         0 last OS_CHECK;
139             };
140              
141 0         0 warn "MCE::Util::get_ncpu: command failed or unknown operating system\n";
142             }
143              
144 61 50 33     757 $ncpu = 1 if (!$ncpu || $ncpu < 1);
145              
146 61         416 return $g_ncpu = $ncpu;
147             }
148              
149             ###############################################################################
150             ## ----------------------------------------------------------------------------
151             ## Private methods for pipes and sockets.
152             ##
153             ###############################################################################
154              
155             sub _destroy_pipes {
156 193     193   873 my ($_obj, @_params) = @_;
157 193         698 local ($!,$?); local $SIG{__DIE__};
  193         697  
158              
159 193         23425 for my $_p (@_params) {
160 386 50       990 next unless (defined $_obj->{$_p});
161              
162 386 50       4671 if (ref $_obj->{$_p} eq 'ARRAY') {
163 0         0 for my $_i (0 .. @{ $_obj->{$_p} } - 1) {
  0         0  
164 0 0       0 next unless (defined $_obj->{$_p}[$_i]);
165 0 0       0 close $_obj->{$_p}[$_i] if (fileno $_obj->{$_p}[$_i]);
166 0         0 undef $_obj->{$_p}[$_i];
167             }
168             }
169             else {
170 386 50       4570 close $_obj->{$_p} if (fileno $_obj->{$_p});
171 386         1960 undef $_obj->{$_p};
172             }
173             }
174              
175 193         981 return;
176             }
177              
178             sub _destroy_socks {
179 110     110   1876 my ($_obj, @_params) = @_;
180 110         905 local ($!,$?,$@); local $SIG{__DIE__};
  110         790  
181              
182 110         567 for my $_p (@_params) {
183 772 100       2391 next unless (defined $_obj->{$_p});
184              
185 606 100       1675 if (ref $_obj->{$_p} eq 'ARRAY') {
186 154         241 for my $_i (0 .. @{ $_obj->{$_p} } - 1) {
  154         535  
187 478 50       1150 next unless (defined $_obj->{$_p}[$_i]);
188 478 50       1303 if (fileno $_obj->{$_p}[$_i]) {
189 478 50       942 syswrite($_obj->{$_p}[$_i], '0') if $_is_winenv;
190 478         22172 eval q{ CORE::shutdown($_obj->{$_p}[$_i], 2) };
191 478         6528 close $_obj->{$_p}[$_i];
192             }
193 478         2749 undef $_obj->{$_p}[$_i];
194             }
195             }
196             else {
197 452 50       1464 if (fileno $_obj->{$_p}) {
198 452 50       861 syswrite($_obj->{$_p}, '0') if $_is_winenv;
199 452         25071 eval q{ CORE::shutdown($_obj->{$_p}, 2) };
200 452         8380 close $_obj->{$_p};
201             }
202 452         3269 undef $_obj->{$_p};
203             }
204             }
205              
206 110         819 return;
207             }
208              
209             sub _pipe_pair {
210 431     431   1336 my ($_obj, $_r_sock, $_w_sock, $_i) = @_;
211 431         1272 local $!;
212              
213 431 50       853 if (defined $_i) {
214             # remove tainted'ness
215 0         0 ($_i) = $_i =~ /(.*)/;
216 0 0       0 pipe($_obj->{$_r_sock}[$_i], $_obj->{$_w_sock}[$_i]) or die "pipe: $!\n";
217 0         0 $_obj->{$_w_sock}[$_i]->autoflush(1);
218             }
219             else {
220 431 50       14808 pipe($_obj->{$_r_sock}, $_obj->{$_w_sock}) or die "pipe: $!\n";
221 431         2780 $_obj->{$_w_sock}->autoflush(1);
222             }
223              
224 431         18071 return;
225             }
226              
227             sub _sock_pair {
228 1011     1011   2577 my ($_obj, $_r_sock, $_w_sock, $_i, $_seq) = @_;
229 1011         2296 my $_size = 16384; local ($!, $@);
  1011         2329  
230              
231 1011 100       6279 if (defined $_i) {
232             # remove tainted'ness
233 517         3182 ($_i) = $_i =~ /(.*)/;
234              
235 517 100 66     16905 if ($_seq && $^O eq 'linux' && eval q{ Socket::SOCK_SEQPACKET() }) {
      66        
236             socketpair( $_obj->{$_r_sock}[$_i], $_obj->{$_w_sock}[$_i],
237 391 50       14899 AF_UNIX, Socket::SOCK_SEQPACKET(), 0 ) or do {
238 0 0       0 socketpair( $_obj->{$_r_sock}[$_i], $_obj->{$_w_sock}[$_i],
239             AF_UNIX, Socket::SOCK_STREAM(), 0 ) or die "socketpair: $!\n";
240             };
241             }
242             else {
243 126 50       5518 socketpair( $_obj->{$_r_sock}[$_i], $_obj->{$_w_sock}[$_i],
244             AF_UNIX, Socket::SOCK_STREAM(), 0 ) or die "socketpair: $!\n";
245             }
246              
247 517 50 33     3976 if ($^O ne 'aix' && $^O ne 'linux') {
248 0         0 setsockopt($_obj->{$_r_sock}[$_i], SOL_SOCKET, SO_SNDBUF, int $_size);
249 0         0 setsockopt($_obj->{$_r_sock}[$_i], SOL_SOCKET, SO_RCVBUF, int $_size);
250 0         0 setsockopt($_obj->{$_w_sock}[$_i], SOL_SOCKET, SO_SNDBUF, int $_size);
251 0         0 setsockopt($_obj->{$_w_sock}[$_i], SOL_SOCKET, SO_RCVBUF, int $_size);
252             }
253              
254 517         2037 $_obj->{$_r_sock}[$_i]->autoflush(1);
255 517         15671 $_obj->{$_w_sock}[$_i]->autoflush(1);
256             }
257             else {
258 494 100 66     22213 if ($_seq && $^O eq 'linux' && eval q{ Socket::SOCK_SEQPACKET() }) {
      66        
259             socketpair( $_obj->{$_r_sock}, $_obj->{$_w_sock},
260 465 50       22391 AF_UNIX, Socket::SOCK_SEQPACKET(), 0 ) or do {
261 0 0       0 socketpair( $_obj->{$_r_sock}, $_obj->{$_w_sock},
262             AF_UNIX, Socket::SOCK_STREAM(), 0 ) or die "socketpair: $!\n";
263             };
264             }
265             else {
266 29 50       1569 socketpair( $_obj->{$_r_sock}, $_obj->{$_w_sock},
267             AF_UNIX, Socket::SOCK_STREAM(), 0 ) or die "socketpair: $!\n";
268             }
269              
270 494 50 33     4142 if ($^O ne 'aix' && $^O ne 'linux') {
271 0         0 setsockopt($_obj->{$_r_sock}, SOL_SOCKET, SO_SNDBUF, int $_size);
272 0         0 setsockopt($_obj->{$_r_sock}, SOL_SOCKET, SO_RCVBUF, int $_size);
273 0         0 setsockopt($_obj->{$_w_sock}, SOL_SOCKET, SO_SNDBUF, int $_size);
274 0         0 setsockopt($_obj->{$_w_sock}, SOL_SOCKET, SO_RCVBUF, int $_size);
275             }
276              
277 494         2376 $_obj->{$_r_sock}->autoflush(1);
278 494         16732 $_obj->{$_w_sock}->autoflush(1);
279             }
280              
281 1011         24448 return;
282             }
283              
284             sub _sock_ready {
285 0     0   0 my ($_socket, $_timeout) = @_;
286 0 0 0     0 return '' if !defined $_timeout && $_sock_ready{"$_socket"} > 1;
287              
288 0         0 my ($_val_bytes, $_delay, $_start) = (pack('L', 0), 0, time);
289              
290 0 0       0 if (!defined $_timeout) {
291 0         0 $_sock_ready{"$_socket"}++;
292             }
293             else {
294 0 0       0 $_timeout = undef if $_timeout < 0;
295 0 0       0 $_timeout += $_start if $_timeout;
296             }
297              
298 0         0 while (1) {
299             # MSWin32 FIONREAD - from winsock2.h macro
300 0         0 ioctl($_socket, 0x4004667f, $_val_bytes);
301              
302 0 0       0 return '' if $_val_bytes ne $_zero_bytes;
303 0 0 0     0 return 1 if $_timeout && time > $_timeout;
304              
305             # delay after a while to not consume a CPU core
306 0 0       0 sleep(0.015), next if $_delay;
307 0 0       0 $_delay = 1 if time - $_start > 0.030;
308             }
309             }
310              
311             sub _sock_ready_w {
312 0     0   0 my ($_socket) = @_;
313 0 0       0 return if $_sock_ready{"${_socket}_w"} > 1;
314              
315 0         0 my $_vec = '';
316 0         0 $_sock_ready{"${_socket}_w"}++;
317              
318 0         0 while (1) {
319 0         0 vec($_vec, fileno($_socket), 1) = 1;
320 0 0       0 return if select(undef, $_vec, undef, 0) > 0;
321 0         0 sleep 0.045;
322             }
323              
324 0         0 return;
325             }
326              
327             sub _sysread {
328             ( @_ == 3
329             ? CORE::sysread($_[0], $_[1], $_[2])
330             : CORE::sysread($_[0], $_[1], $_[2], $_[3])
331             )
332 1293 50   1293   950874 or do {
    50          
333 0 0       0 goto \&_sysread if ($! == Errno::EINTR());
334             };
335             }
336              
337             sub _sysread2 {
338 0     0   0 my ($_bytes, $_delay, $_start);
339             # called by MCE/Core/Manager.pm
340              
341             SYSREAD: $_bytes = ( @_ == 3
342             ? CORE::sysread($_[0], $_[1], $_[2])
343             : CORE::sysread($_[0], $_[1], $_[2], $_[3])
344             )
345 0 0       0 or do {
    0          
346 0 0       0 unless ( defined $_bytes ) {
347 0 0       0 goto SYSREAD if ($! == Errno::EINTR());
348              
349             # non-blocking operation could not be completed
350 0 0 0     0 if ( $! == Errno::EWOULDBLOCK() || $! == Errno::EAGAIN() ) {
351 0 0       0 sleep(0.015), goto SYSREAD if $_delay;
352              
353             # delay after a while to not consume a CPU core
354 0 0       0 $_start = time unless $_start;
355 0 0       0 $_delay = 1 if time - $_start > 0.030;
356              
357 0         0 goto SYSREAD;
358             }
359             }
360             };
361              
362 0         0 return $_bytes;
363             }
364              
365             sub _nonblocking {
366 1118 50   1118   3665 if ($^O eq 'MSWin32') {
367             # MSWin32 FIONBIO - from winsock2.h macro
368 0 0       0 my $nonblocking = $_[1] ? pack('L', 1) : pack('L', 0);
369 0         0 ioctl($_[0], 0x8004667e, $nonblocking);
370             }
371             else {
372 1118 100       12526 $_[0]->blocking( $_[1] ? 0 : 1 );
373             }
374              
375 1118         2789 return;
376             }
377              
378             ###############################################################################
379             ## ----------------------------------------------------------------------------
380             ## Private methods, providing high-resolution time, for MCE->yield,
381             ## MCE::Child->yield, and MCE::Hobo->yield.
382             ##
383             ###############################################################################
384              
385             ## Use monotonic clock if available.
386              
387 107         295 use constant CLOCK_MONOTONIC => eval {
388 107         724 Time::HiRes::clock_gettime( Time::HiRes::CLOCK_MONOTONIC() );
389 107         28205 1;
390 107     107   797 };
  107         278  
391              
392             sub _sleep {
393 0     0     my ( $seconds ) = @_;
394 0 0         return if ( $seconds < 0 );
395              
396 0 0         if ( $INC{'Coro/AnyEvent.pm'} ) {
    0          
    0          
397 0           Coro::AnyEvent::sleep( $seconds );
398             }
399             elsif ( &Time::HiRes::d_nanosleep ) {
400 0           Time::HiRes::nanosleep( $seconds * 1e9 );
401             }
402             elsif ( &Time::HiRes::d_usleep ) {
403 0           Time::HiRes::usleep( $seconds * 1e6 );
404             }
405             else {
406 0           Time::HiRes::sleep( $seconds );
407             }
408              
409 0           return;
410             }
411              
412             sub _time {
413 0     0     return ( CLOCK_MONOTONIC )
414             ? Time::HiRes::clock_gettime( Time::HiRes::CLOCK_MONOTONIC() )
415             : Time::HiRes::time();
416             }
417              
418             1;
419              
420             __END__
421              
422             ###############################################################################
423             ## ----------------------------------------------------------------------------
424             ## Module usage.
425             ##
426             ###############################################################################
427              
428             =head1 NAME
429              
430             MCE::Util - Utility functions
431              
432             =head1 VERSION
433              
434             This document describes MCE::Util version 1.887
435              
436             =head1 SYNOPSIS
437              
438             use MCE::Util;
439              
440             =head1 DESCRIPTION
441              
442             A utility module for MCE. Nothing is exported by default. Exportable is
443             get_ncpu.
444              
445             =head2 get_ncpu()
446              
447             Returns the number of logical (online/active/enabled) CPU cores; never smaller
448             than one.
449              
450             my $ncpu = MCE::Util::get_ncpu();
451              
452             Specifying 'auto' for max_workers calls MCE::Util::get_ncpu automatically.
453             MCE 1.521 sets an upper-limit when specifying 'auto'. The reason is mainly
454             to safeguard apps from spawning 100 workers on a box having 100 cores.
455             This is important for apps which are IO-bound.
456              
457             use MCE;
458              
459             ## 'Auto' is the total # of logical cores (lcores) (8 maximum, MCE 1.521).
460             ## The computed value will not exceed the # of logical cores on the box.
461              
462             my $mce = MCE->new(
463              
464             max_workers => 'auto', ## 1 on HW with 1-lcores; 2 on 2-lcores
465             max_workers => 16, ## 16 on HW with 4-lcores; 16 on 32-lcores
466              
467             max_workers => 'auto', ## 4 on HW with 4-lcores; 8 on 16-lcores
468             max_workers => 'auto*1.5', ## 4 on HW with 4-lcores; 12 on 16-lcores
469             max_workers => 'auto*2.0', ## 4 on HW with 4-lcores; 16 on 16-lcores
470             max_workers => 'auto/2.0', ## 2 on HW with 4-lcores; 4 on 16-lcores
471             max_workers => 'auto+3', ## 4 on HW with 4-lcores; 11 on 16-lcores
472             max_workers => 'auto-1', ## 3 on HW with 4-lcores; 7 on 16-lcores
473              
474             max_workers => MCE::Util::get_ncpu, ## run on all lcores
475             );
476              
477             In summary:
478              
479             1. Auto has an upper-limit of 8 in MCE 1.521 (# of lcores, 8 maximum)
480             2. Math may be applied with auto (*/+-) to change the upper limit
481             3. The computed value for auto will not exceed the total # of lcores
482             4. One can specify max_workers explicitly to a hard value
483             5. MCE::Util::get_ncpu returns the actual # of lcores
484              
485             =head1 ACKNOWLEDGMENTS
486              
487             The portable code for detecting the number of processors was adopted from
488             L<Test::Smoke::SysInfo>.
489              
490             =head1 INDEX
491              
492             L<MCE|MCE>, L<MCE::Core>
493              
494             =head1 AUTHOR
495              
496             Mario E. Roy, S<E<lt>marioeroy AT gmail DOT comE<gt>>
497              
498             =cut
499