File Coverage

blib/lib/MCE/Shared/Server.pm
Criterion Covered Total %
statement 462 1009 45.7
branch 190 672 28.2
condition 36 256 14.0
subroutine 81 130 62.3
pod n/a
total 769 2067 37.2


line stmt bran cond sub pod time code
1             ###############################################################################
2             ## ----------------------------------------------------------------------------
3             ## Server/Object packages for MCE::Shared.
4             ##
5             ###############################################################################
6              
7 43     43   306 use strict;
  43         101  
  43         1314  
8 43     43   214 use warnings;
  43         93  
  43         959  
9              
10 43     43   683 use 5.010001;
  43         144  
11              
12 43     43   263 no warnings qw( threads recursion uninitialized numeric once );
  43         79  
  43         3491  
13              
14             package MCE::Shared::Server;
15              
16             our $VERSION = '1.886';
17              
18             ## no critic (BuiltinFunctions::ProhibitStringyEval)
19             ## no critic (Subroutines::ProhibitExplicitReturnUndef)
20             ## no critic (TestingAndDebugging::ProhibitNoStrict)
21             ## no critic (InputOutput::ProhibitTwoArgOpen)
22              
23 43     43   20763 use if $^O eq 'MSWin32', 'threads';
  43         443  
  43         277  
24 43     43   2275 use if $^O eq 'MSWin32', 'threads::shared';
  43         119  
  43         183  
25              
26 43     43   1413 no overloading;
  43         107  
  43         1326  
27              
28 43     43   223 use Carp ();
  43         120  
  43         835  
29 43     43   30075 use Storable ();
  43         148825  
  43         10177  
30              
31             my ($_spawn_child, $_freeze, $_thaw);
32              
33             BEGIN {
34 43     43   176 local $@;
35              
36             eval 'use IO::FDPass ();'
37 43 50 33 43   4341 if ( ! $INC{'IO/FDPass.pm'} && $^O !~ /cygwin|android/ );
  43         22172  
  43         14518  
  43         575  
38              
39 43 50       243 $_spawn_child = $INC{'threads.pm'} ? 0 : 1;
40              
41 43 50       186 if ( ! $INC{'PDL.pm'} ) {
42 43     43   2656 eval 'use Sereal::Encoder 3.015; use Sereal::Decoder 3.015;';
  43     43   293  
  43         941  
  43         1959  
  43         267  
  43         716  
  43         1145  
43 43 50       379 if ( ! $@ ) {
44 43         652 my $_encoder_ver = int( Sereal::Encoder->VERSION() );
45 43         479 my $_decoder_ver = int( Sereal::Decoder->VERSION() );
46 43 50       196 if ( $_encoder_ver - $_decoder_ver == 0 ) {
47 43         114 $_freeze = \&Sereal::Encoder::encode_sereal;
48 43         96 $_thaw = \&Sereal::Decoder::decode_sereal;
49             }
50             }
51             }
52              
53 43 50       2200 if ( ! defined $_freeze ) {
54 0         0 $_freeze = \&Storable::freeze;
55 0         0 $_thaw = \&Storable::thaw;
56             }
57             }
58              
59 0     0   0 sub _get_freeze { $_freeze; }
60 0     0   0 sub _get_thaw { $_thaw; }
61              
62 43     43   3044 use IO::Handle ();
  43         32751  
  43         1093  
63 43     43   313 use Scalar::Util qw( blessed looks_like_number reftype weaken );
  43         73  
  43         3097  
64 43     43   3304 use Socket qw( SOL_SOCKET SO_RCVBUF );
  43         19600  
  43         2856  
65 43     43   1784 use Time::HiRes qw( alarm sleep time );
  43         3950  
  43         395  
66              
67 43     43   12885 use MCE::Signal 1.863 (); # requires 1.863 minimally
  43         12373  
  43         1121  
68 43     43   2690 use MCE::Util ();
  43         28717  
  43         955  
69 43     43   214 use MCE::Mutex ();
  43         81  
  43         1014  
70 43     43   3410 use bytes;
  43         155  
  43         263  
71              
72             ## The POSIX module has many symbols. Try not loading it simply
73             ## to have WNOHANG. The following covers most platforms.
74              
75             use constant {
76 43 50       5836 _WNOHANG => ( $INC{'POSIX.pm'} )
    50          
77             ? &POSIX::WNOHANG : ( $^O eq 'solaris' ) ? 64 : 1
78 43     43   2825 };
  43         77  
79              
80             use constant {
81             # Max data channels. This cannot be greater than 8 on MSWin32.
82 43         372764 DATA_CHANNELS => 8,
83              
84             SHR_M_NEW => 'M~NEW', # New share
85             SHR_M_CID => 'M~CID', # ClientID request
86             SHR_M_DEE => 'M~DEE', # Deeply shared
87             SHR_M_INC => 'M~INC', # Increment count
88             SHR_M_OBJ => 'M~OBJ', # Object request
89             SHR_M_OB0 => 'M~OB0', # Object request - thaw'less
90             SHR_M_DES => 'M~DES', # Destroy request
91             SHR_M_EXP => 'M~EXP', # Export request
92             SHR_M_INX => 'M~INX', # Iterator next
93             SHR_M_IRW => 'M~IRW', # Iterator rewind
94             SHR_M_STP => 'M~STP', # Exit loop
95              
96             SHR_O_PDL => 'O~PDL', # PDL::ins inplace(this),what,coords
97             SHR_O_DAT => 'O~DAT', # Get MCE::Hobo data
98             SHR_O_CLR => 'O~CLR', # Clear
99             SHR_O_FCH => 'O~FCH', # Fetch
100             SHR_O_SZE => 'O~SZE', # Size
101              
102             WA_ARRAY => 1, # Wants list
103 43     43   324 };
  43         85  
104              
105             ###############################################################################
106             ## ----------------------------------------------------------------------------
107             ## Private functions.
108             ##
109             ###############################################################################
110              
111             my ($_SVR, $_stopped, %_all, %_obj, %_ob2, %_ob3, %_itr, %_new) = (undef);
112             my ($_next_id, $_is_client, $_init_pid, $_svr_pid) = (0, 1);
113             my $LF = "\012"; Internals::SvREADONLY($LF, 1);
114             my %_export_nul;
115              
116             my @_db_modules = qw(
117             AnyDBM_File DB_File GDBM_File NDBM_File ODBM_File SDBM_File
118             BerkeleyDB::Btree BerkeleyDB::Hash BerkeleyDB::Queue
119             BerkeleyDB::Recno CDB_File KyotoCabinet::DB SQLite_File
120             TokyoCabinet::ADB TokyoCabinet::BDB TokyoCabinet::HDB
121             Tie::Array::DBD Tie::Hash::DBD
122             );
123              
124             my $_is_MSWin32 = ( $^O eq 'MSWin32') ? 1 : 0;
125             my $_tid = $INC{'threads.pm'} ? threads->tid() : 0;
126             my $_oid = "$$.$_tid";
127              
128             sub _croak {
129 0     0   0 Carp::carp($_[0]);
130 0         0 MCE::Signal::stop_and_exit('INT');
131             }
132             sub CLONE {
133 0 0   0   0 $_tid = threads->tid() if $INC{'threads.pm'};
134             }
135              
136             END {
137 43 0 33 43   28056 CORE::kill('KILL', $$)
138             if ($_is_MSWin32 && $MCE::Signal::KILLED);
139 43 100 66     1633 &_stop()
      66        
140             if ($_init_pid && $_init_pid eq "$$.$_tid" && $_is_client);
141             }
142              
143             sub _new {
144 158     158   698 my ($_class, $_deeply, %_hndls) = ($_[0]->{class}, $_[0]->{_DEEPLY_});
145 158         1026 my $_has_fh = ($_class =~ /^MCE::Shared::(?:Condvar|Queue)$/);
146              
147 158 100       739 if (!$_svr_pid) {
148             # Minimum support on platforms without IO::FDPass (not installed).
149             # Condvar and Queue must be shared first before others.
150             $_export_nul{ $_class } = undef, return _share(@_)
151 31 50 66     164 if $_has_fh && !$INC{'IO/FDPass.pm'};
152              
153 31         118 _start();
154             }
155              
156 158 100       978 if ($_has_fh) {
157             _croak("Sharing module '$_class' while the server is running\n".
158             "requires the 'IO::FDPass' module, missing in Perl")
159 29 50       268 if !$INC{'IO/FDPass.pm'};
160              
161 29         332 for my $_k (qw(
162             _qr_mutex _qw_sock _qr_sock _aw_sock _ar_sock _cw_sock _cr_sock
163             )) {
164 203 100       561 if ( defined $_[1]->{ $_k } ) {
165 84         911 $_hndls{ $_k } = delete $_[1]->{ $_k };
166 84         396 $_[1]->{ $_k } = undef;
167             }
168             }
169             }
170              
171 158         1041 my $_chn = $_SVR->{_data_channels} + 1;
172 158         1141 my $_DAT_LOCK = $_SVR->{'_mutex_'.$_chn};
173 158         654 my $_DAT_W_SOCK = $_SVR->{_dat_w_sock}[0];
174 158         670 my $_DAU_W_SOCK = $_SVR->{_dat_w_sock}[$_chn];
175              
176             ##
177             # Sereal cannot encode $DB_RECNO. Therefore, must encode using Storable.
178             # Error: DB_File::RECNOINFO does not define the method FIRSTKEY
179             #
180             # my $ob = tie my @db, 'MCE::Shared', { module => 'DB_File' }, $file,
181             # O_RDWR|O_CREAT, 0640, $DB_RECNO or die "open error '$file': $!";
182             ##
183              
184 158         2627 my $_buf = Storable::freeze(shift);
185 158         15008 my $_bu2 = Storable::freeze([ @_ ]);
186              
187 158 50       7614 local $\ = undef if (defined $\);
188 158 50       854 local $/ = $LF if ($/ ne $LF);
189              
190 158 50       1732 $_is_MSWin32 ? CORE::lock $_DAT_LOCK : $_DAT_LOCK->lock();
191              
192 158         7720 print({$_DAT_W_SOCK} SHR_M_NEW.$LF . $_chn.$LF),
193 158 100       7529 print({$_DAU_W_SOCK} length($_buf).$LF, $_buf, length($_bu2).$LF, $_bu2,
  158         4620  
194             (keys %_hndls ? 1 : 0).$LF);
195              
196 158         310828 <$_DAU_W_SOCK>;
197              
198 158         1516 undef($_buf), undef($_bu2);
199              
200 158 100       872 if (keys %_hndls) {
201 29         90 for my $_k (qw( _qw_sock _qr_sock _aw_sock _cw_sock )) {
202 116 100       870 if (exists $_hndls{ $_k }) {
203 55         3331 IO::FDPass::send( fileno $_DAU_W_SOCK, fileno $_hndls{ $_k } );
204 55         16893 <$_DAU_W_SOCK>;
205             }
206             }
207             }
208              
209 158         80519 chomp(my $_id = <$_DAU_W_SOCK>),
210             chomp(my $_len = <$_DAU_W_SOCK>);
211              
212 158 50       2270 read($_DAU_W_SOCK, $_buf, $_len) if $_len;
213              
214 158 50       1691 $_DAT_LOCK->unlock() if !$_is_MSWin32;
215              
216 158 50       4859 $! = $_id, return '' unless $_len;
217              
218 158 100       835 if (keys %_hndls) {
219 29         607 $_all{ $_id } = $_class;
220 29         127 $_obj{ $_id } = \%_hndls;
221             }
222              
223 158 100       557 if (!$_deeply) {
224             # for auto-destroy
225 156 50       1446 $_new{ $_id } = $_tid ? $$ .'.'. $_tid : $$;
226             }
227              
228 158         6336 return $_thaw->($_buf);
229             }
230              
231             sub _incr_count {
232 0 0   0   0 return unless $_svr_pid;
233              
234 0         0 my $_chn = $_SVR->{_data_channels} + 1;
235 0         0 my $_DAT_LOCK = $_SVR->{'_mutex_'.$_chn};
236 0         0 my $_DAT_W_SOCK = $_SVR->{_dat_w_sock}[0];
237 0         0 my $_DAU_W_SOCK = $_SVR->{_dat_w_sock}[$_chn];
238              
239 0 0       0 local $\ = undef if (defined $\);
240 0 0       0 local $/ = $LF if ($/ ne $LF);
241 0         0 local $MCE::Signal::SIG;
242              
243             {
244 0         0 local $MCE::Signal::IPC = 1;
  0         0  
245 0 0       0 $_is_MSWin32 ? CORE::lock $_DAT_LOCK : $_DAT_LOCK->lock();
246              
247 0         0 print({$_DAT_W_SOCK} SHR_M_INC.$LF . $_chn.$LF),
248 0         0 print({$_DAU_W_SOCK} $_[0].$LF);
  0         0  
249 0         0 <$_DAU_W_SOCK>;
250              
251 0 0       0 $_DAT_LOCK->unlock() if !$_is_MSWin32;
252             }
253              
254 0 0       0 CORE::kill($MCE::Signal::SIG, $$) if $MCE::Signal::SIG;
255              
256 0         0 return;
257             }
258              
259             sub _share {
260 0     0   0 my ($_params, $_item) = (shift, shift);
261 0         0 my $_class = delete $_params->{'class'};
262 0         0 my $_id = ++$_next_id;
263              
264 0 0       0 if ($_class eq ':construct_module:') {
    0          
265 0         0 my ($_module, $_fcn) = ($_params->{module}, pop @{ $_item });
  0         0  
266 0 0       0 my $_has_args = @{ $_item } ? 1 : 0; local $@;
  0         0  
  0         0  
267              
268 0         0 ($_module) = $_module =~ /(.*)/; # remove tainted'ness
269 0         0 ($_fcn ) = $_fcn =~ /(.*)/;
270              
271 0 0       0 MCE::Shared::_use( $_class = $_module ) or _croak("$@\n");
272              
273 0 0       0 _croak("Can't locate object method \"$_fcn\" via package \"$_module\"")
274             unless eval qq{ $_module->can('$_fcn') };
275              
276 0 0       0 $! = 0; $_item = $_module->$_fcn(@{ $_item }) or return '';
  0         0  
  0         0  
277              
278 0 0       0 $_export_nul{ $_class } = undef if ($_class->isa('Graphics::Framebuffer'));
279 0 0       0 $_export_nul{ $_class } = undef if ($_fcn eq 'TIEHANDLE');
280              
281 0 0 0     0 return '' if (
      0        
282             $_has_args && $_fcn eq 'TIEHANDLE' && !defined(fileno $_item)
283             );
284             }
285             elsif ($_class eq ':construct_pdl:') {
286 0         0 local $@; local $SIG{__DIE__};
  0         0  
287              
288 0         0 $_class = 'PDL', $_item = eval q{
289             unless ($INC{'PDL.pm'}) {
290             use PDL;
291             # Disable PDL auto-threading.
292             eval q{ PDL::set_autopthread_targ(1) };
293             }
294              
295             my $_func = pop @{ $_item };
296              
297             if ($_func eq 'sbyte' ) { sbyte (@{ $_item }); }
298             elsif ($_func eq 'byte' ) { byte (@{ $_item }); }
299             elsif ($_func eq 'short' ) { short (@{ $_item }); }
300             elsif ($_func eq 'ushort' ) { ushort (@{ $_item }); }
301             elsif ($_func eq 'long' ) { long (@{ $_item }); }
302             elsif ($_func eq 'ulong' ) { ulong (@{ $_item }); }
303             elsif ($_func eq 'indx' ) { indx (@{ $_item }); }
304             elsif ($_func eq 'longlong' ) { longlong (@{ $_item }); }
305             elsif ($_func eq 'ulonglong') { ulonglong (@{ $_item }); }
306             elsif ($_func eq 'float' ) { float (@{ $_item }); }
307             elsif ($_func eq 'double' ) { double (@{ $_item }); }
308             elsif ($_func eq 'ldouble' ) { ldouble (@{ $_item }); }
309             elsif ($_func eq 'sequence' ) { sequence (@{ $_item }); }
310             elsif ($_func eq 'zeroes' ) { zeroes (@{ $_item }); }
311             elsif ($_func eq 'zeros' ) { zeros (@{ $_item }); }
312             elsif ($_func eq 'ones' ) { ones (@{ $_item }); }
313             elsif ($_func eq 'random' ) { random (@{ $_item }); }
314             elsif ($_func eq 'grandom' ) { grandom (@{ $_item }); }
315             else { pdl (@{ $_item }); }
316             };
317             }
318              
319 0         0 $_all{ $_id } = $_class;
320 0         0 $_ob3{"$_id:count"} = 1;
321              
322 0 0 0     0 if ($_class eq 'MCE::Shared::Handle' && reftype $_item eq 'ARRAY') {
323 0         0 $_obj{ $_id } = IO::Handle->new();
324 0         0 $_export_nul{ $_class } = undef;
325              
326 0         0 bless $_obj{ $_id }, $_class;
327             }
328             else {
329 0         0 $_obj{ $_id } = $_item;
330              
331 0 0 0     0 if ( reftype $_obj{ $_id } eq 'HASH' &&
332             reftype $_obj{ $_id }->{'fh'} eq 'GLOB' ) {
333              
334 0 0       0 if ( $_class->isa('Tie::File') ) {
335             # enable autoflush, enable raw layer
336 0         0 $_obj{ $_id }->{'fh'}->autoflush(1);
337 0         0 binmode($_obj{ $_id }->{'fh'}, ':raw');
338             }
339              
340 0         0 $_export_nul{ $_class } = undef;
341             }
342             }
343              
344 0         0 my $self = bless [ $_id, $_class ], 'MCE::Shared::Object';
345              
346 0         0 $_ob2{ $_id } = $_freeze->([ $self ]);
347              
348 0 0       0 if ( $_params->{tied} ) {
349             # set encoder/decoder upon receipt in MCE::Shared::_tie
350 0         0 for my $_module ( @_db_modules ) {
351 0 0       0 $self->[2] = 1, last if $_class->isa($_module);
352             }
353 0 0       0 $_export_nul{ $_class } = undef if $self->[2];
354             }
355              
356 0         0 return $self;
357             }
358              
359             sub _start {
360 91 100   91   474 return if $_svr_pid;
361              
362 43 50       183 if ($INC{'PDL.pm'}) { local $@;
  0         0  
363             # PDL::IO::Storable is required for serializing piddles.
364 0 0       0 eval 'use PDL::IO::Storable' unless $INC{'PDL/IO/Storable.pm'};
365             # PDL data should not be naively copied in new threads.
366 0         0 eval 'no warnings; sub PDL::CLONE_SKIP { 1 }';
367             # Disable PDL auto-threading.
368 0         0 eval q{ PDL::set_autopthread_targ(1) };
369             }
370              
371 43         155 local $_; $_init_pid = "$$.$_tid", $_stopped = undef;
  43         208  
372              
373 43 50       223 my $_data_channels = ($_init_pid eq $_oid) ? DATA_CHANNELS : 2;
374              
375 43         164 $_SVR = { _data_channels => $_data_channels };
376              
377             # Defaults to the misc channel used by _new, _get_hobo_data, and export.
378 43         245 MCE::Util::_sock_pair($_SVR, qw(_dat_r_sock _dat_w_sock), 0);
379             MCE::Util::_sock_pair($_SVR, qw(_dat_r_sock _dat_w_sock), $_, 1)
380 43         8387 for (1 .. $_data_channels + 1);
381              
382 43 50       76623 if ($^O !~ /linux|android|aix/) {
383 0         0 setsockopt($_SVR->{_dat_r_sock}[0], SOL_SOCKET, SO_RCVBUF, 4096);
384             }
385              
386 43 50       234 if ($_is_MSWin32) {
387 0         0 for (1 .. $_data_channels + 1) {
388 0         0 my $_mutex;
389 0         0 $_SVR->{'_mutex_'.$_} = threads::shared::share($_mutex);
390             }
391             }
392             else {
393             $_SVR->{'_mutex_'.$_} = MCE::Mutex->new( impl => 'Channel' )
394 43         493 for (1 .. $_data_channels + 1);
395             }
396              
397 43         140903 MCE::Shared::Object::_start();
398              
399             local $SIG{TTIN}, local $SIG{TTOU}, local $SIG{WINCH}
400 43 50       2375 unless $_is_MSWin32;
401              
402 43 50       274 if ($_spawn_child) {
403 43         72637 $_svr_pid = fork();
404 43 50 33     5208 _loop() if (defined $_svr_pid && $_svr_pid == 0);
405             }
406             else {
407 0         0 $_svr_pid = threads->create(\&_loop);
408 0 0       0 $_svr_pid->detach() if defined $_svr_pid;
409             }
410              
411 43 50       1026 _croak("cannot start the shared-manager process: $!")
412             unless (defined $_svr_pid);
413              
414 43 50 33     1685 sleep 0.015 if (!$_spawn_child || $_is_MSWin32);
415              
416 43         7820 return;
417             }
418              
419             sub _stop {
420 15     15   101 $_stopped = 1;
421 15 50 33     372 return unless ($_is_client && $_init_pid && $_init_pid eq "$$.$_tid");
      33        
422              
423 15 50       124 MCE::Child->finish('MCE') if $INC{'MCE/Child.pm'};
424 15 100       142 MCE::Hobo->finish('MCE') if $INC{'MCE/Hobo.pm'};
425              
426 15         369 local ($!, $?, $@);
427              
428 15 50       92 if (defined $_svr_pid) {
429 15         89 my $_DAT_W_SOCK = $_SVR->{_dat_w_sock}[0];
430              
431 15 50       84 if (ref $_svr_pid) {
432 0         0 eval { $_svr_pid->kill('KILL') };
  0         0  
433             }
434             else {
435             local $SIG{HUP} = local $SIG{QUIT} = local $SIG{PIPE} =
436 15     0   1370 local $SIG{INT} = local $SIG{TERM} = sub {};
437              
438 15         159 my $_start = time;
439              
440 15         51 eval {
441 15 50       78 local $\ = undef if (defined $\);
442 15         54 print {$_DAT_W_SOCK} SHR_M_STP.$LF.'0'.$LF;
  15         727  
443             };
444              
445 15         80 while () {
446 30 100       2422 last if waitpid($_svr_pid, _WNOHANG);
447 15 50       152 if ( time - $_start > 0.7 ) {
448 0         0 CORE::kill('USR2', $_svr_pid);
449 0         0 waitpid($_svr_pid, 0);
450 0         0 last;
451             }
452 15         677631 sleep 0.045;
453             }
454             }
455              
456 15         131 $_init_pid = $_svr_pid = undef;
457 15         184 %_all = (), %_obj = ();
458              
459 15         616 MCE::Util::_destroy_socks($_SVR, qw( _dat_w_sock _dat_r_sock ));
460              
461 15         43200 for my $_i (1 .. $_SVR->{_data_channels} + 1) {
462 135         12745 delete $_SVR->{'_mutex_'.$_i};
463             }
464              
465 15         199 MCE::Shared::Object::_stop();
466             }
467              
468 15         340 return;
469             }
470              
471             sub _pid {
472 0 0   0   0 return ref($_svr_pid) ? int($_init_pid) : $_svr_pid;
473             }
474              
475             sub _destroy {
476 0     0   0 my ($_lkup, $_item, $_id) = @_;
477              
478             # safety for circular references to not destroy dangerously
479 0 0 0     0 return if exists $_ob3{ "$_id:count" } && --$_ob3{ "$_id:count" } > 0;
480              
481             # safety for circular references to not loop endlessly
482 0 0       0 return if exists $_lkup->{ $_id };
483              
484 0         0 $_lkup->{ $_id } = undef;
485              
486 0 0       0 if (exists $_ob3{ "$_id:deeply" }) {
    0          
487 0         0 for my $_oid (keys %{ $_ob3{ "$_id:deeply" } }) {
  0         0  
488 0         0 _destroy($_lkup, $_obj{ $_oid }, $_oid);
489             }
490 0         0 delete $_ob3{ "$_id:deeply" };
491             }
492             elsif (exists $_obj{ $_id }) {
493 0 0 0     0 if ($_obj{ $_id }->isa('MCE::Shared::Scalar') ||
    0          
    0          
    0          
    0          
    0          
    0          
494             $_obj{ $_id }->isa('Tie::StdScalar')) {
495              
496 0 0       0 if (blessed($_item->FETCH())) {
497 0         0 my $_oid = $_item->FETCH()->SHARED_ID();
498 0         0 _destroy($_lkup, $_obj{ $_oid }, $_oid);
499             }
500              
501 0         0 undef ${ $_obj{ $_id } };
  0         0  
502             }
503 0         0 elsif ($_obj{ $_id }->isa('Tie::File')) { $_obj{ $_id }->flush(); }
504 0         0 elsif ($_obj{ $_id }->can('sync')) { $_obj{ $_id }->sync(); }
505 0         0 elsif ($_obj{ $_id }->can('db_sync')) { $_obj{ $_id }->db_sync(); }
506 0         0 elsif ($_obj{ $_id }->can('close')) { $_obj{ $_id }->close(); }
507 0         0 elsif ($_obj{ $_id }->can('DESTROY')) { delete $_obj{ $_id }; }
508             elsif (reftype $_obj{ $_id } eq 'GLOB') {
509 0 0       0 close $_obj{ $_id } if defined(fileno $_obj{ $_id });
510             }
511             }
512              
513 0 0       0 weaken( delete $_obj{ $_id } ) if exists($_obj{ $_id });
514 0 0       0 weaken( delete $_itr{ $_id } ) if exists($_itr{ $_id });
515              
516             delete($_itr{ "$_id:args" }), delete($_all{ $_id }),
517 0         0 delete($_ob3{ "$_id:count" }), delete($_ob2{ $_id });
518              
519 0         0 return;
520             }
521              
522             ###############################################################################
523             ## ----------------------------------------------------------------------------
524             ## Server loop.
525             ##
526             ###############################################################################
527              
528             sub _exit {
529 0 0   0   0 $SIG{__DIE__} = sub {} unless $_tid;
        0      
530 0     0   0 $SIG{__WARN__} = sub {};
531              
532             # Flush file handles.
533 0         0 for my $_id ( keys %_obj ) {
534 0         0 eval {
535 0 0       0 if ($_obj{ $_id }->isa('Tie::File')) { $_obj{ $_id }->flush(); }
  0 0       0  
    0          
    0          
    0          
    0          
536 0         0 elsif ($_obj{ $_id }->can('sync')) { $_obj{ $_id }->sync(); }
537 0         0 elsif ($_obj{ $_id }->can('db_sync')) { $_obj{ $_id }->db_sync(); }
538 0         0 elsif ($_obj{ $_id }->can('close')) { $_obj{ $_id }->close(); }
539 0         0 elsif ($_obj{ $_id }->can('DESTROY')) { delete $_obj{ $_id }; }
540             elsif (reftype $_obj{ $_id } eq 'GLOB') {
541 0 0       0 close $_obj{ $_id } if defined(fileno $_obj{ $_id });
542             }
543             };
544             }
545              
546             # Destroy non-exportable objects.
547 0         0 for my $_id ( keys %_all ) {
548 0         0 eval {
549             weaken( delete $_obj{ $_id } )
550 0 0       0 if ( exists $_export_nul{ $_all{ $_id } } );
551             };
552             }
553              
554             # Wait for the main thread to exit.
555 0 0 0     0 if ( !$_spawn_child && ($_is_MSWin32 || $INC{'Tk.pm'} || $INC{'Wx.pm'}) ) {
      0        
556 0         0 sleep 1.0;
557             }
558              
559 0 0 0     0 threads->exit(0) if ( !$_spawn_child || $_is_MSWin32 );
560              
561 0 0       0 CORE::kill('KILL', $$) unless $_is_MSWin32;
562 0         0 CORE::exit(0);
563             }
564              
565             sub _loop {
566 0     0   0 $_is_client = 0;
567              
568 0 0 0     0 $MCE::MCE = undef if ($MCE::MCE && $MCE::MCE->{_wid} == 0);
569              
570 0         0 local $\ = undef; local $/ = $LF; $| = 1;
  0         0  
  0         0  
571 0         0 my $_running_inside_eval = $^S;
572              
573       0     local $SIG{TERM} = local $SIG{QUIT} = local $SIG{INT} = local $SIG{HUP} = sub {}
574 0 0       0 if ($_init_pid eq $_oid);
575              
576 0     0   0 local $SIG{PIPE} = sub { $SIG{PIPE} = sub {}; CORE::kill('PIPE', getppid()); }
  0         0  
577 0 0 0     0 if ($_spawn_child && !$_is_MSWin32);
578              
579 0 0 0     0 local $SIG{USR2} = \&_exit if ($_init_pid eq $_oid && !$_is_MSWin32);
580 0 0 0     0 local $SIG{KILL} = \&_exit if ($_init_pid eq $_oid && !$_spawn_child);
581              
582             local $SIG{__DIE__} = sub {
583 0 0 0 0   0 if (!defined $^S || $^S) {
584 0 0 0     0 if ( ($INC{'threads.pm'} && threads->tid() != 0) ||
      0        
      0        
585             $ENV{'PERL_IPERL_RUNNING'} ||
586             $_running_inside_eval
587             ) {
588             # thread env or running inside IPerl, check stack trace
589 0         0 my $_t = Carp::longmess(); $_t =~ s/\teval [^\n]+\n$//;
  0         0  
590 0 0 0     0 CORE::die(@_)
591             if ( $_t =~ /^(?:[^\n]+\n){1,7}\teval / ||
592             $_t =~ /\n\teval [^\n]+\n\t(?:eval|Try)/ );
593             }
594             else {
595             # normal env, trust $^S
596 0         0 CORE::die(@_);
597             }
598             }
599              
600 0         0 $SIG{INT} = $SIG{__DIE__} = $SIG{__WARN__} = sub {};
601 0 0       0 print {*STDERR} defined $_[0] ? $_[0] : '';
  0         0  
602              
603 0 0 0     0 ( $_spawn_child && !$_is_MSWin32 )
604             ? CORE::kill('KILL', -getpgrp)
605             : CORE::exit($?);
606 0         0 };
607              
608 0         0 my ($_id, $_fcn, $_wa, $_len, $_func, $_var);
609 0         0 my ($_channel_id, $_done) = (0, 0);
610              
611 0         0 my $_channels = $_SVR->{_dat_r_sock};
612 0         0 my $_DAT_R_SOCK = $_SVR->{_dat_r_sock}[0];
613 0         0 my $_DAU_R_SOCK;
614              
615             my $_auto_reply = sub {
616 0 0   0   0 if ( $_wa == WA_ARRAY ) {
617 0         0 my @_ret = eval { $_var->$_fcn(@_) };
  0         0  
618 0         0 my $_buf = $_freeze->(\@_ret);
619 0         0 return print {$_DAU_R_SOCK} length($_buf).$LF, $_buf;
  0         0  
620             }
621              
622 0         0 my $_ret = eval { $_var->$_fcn(@_) };
  0         0  
623 0         0 my $_buf = $_freeze->([ $_ret ]);
624              
625 0         0 print {$_DAU_R_SOCK} length($_buf).$LF, $_buf;
  0         0  
626 0         0 };
627              
628             my $_obj_keys = sub {
629 0     0   0 my ( $_obj, @_keys, $_cnt ) = ( shift );
630              
631 0 0       0 return keys %{ $_obj } if $_obj->isa('Tie::StdHash');
  0         0  
632 0 0       0 return (0 .. $_obj->FETCHSIZE - 1) unless $_obj->can('FIRSTKEY');
633              
634 0 0       0 if ( wantarray ) {
    0          
    0          
635 0         0 my $_key = $_obj->FIRSTKEY;
636 0 0       0 if ( defined $_key ) {
637 0         0 push @_keys, $_key;
638             # CDB_File expects the $_key argument
639 0         0 while ( defined( $_key = $_obj->NEXTKEY($_key) ) ) {
640 0         0 push @_keys, $_key;
641             }
642             }
643             }
644             elsif ( $_obj->isa('Tie::ExtraHash') ) {
645 0         0 $_cnt = keys %{ $_obj->[0] };
  0         0  
646             }
647             elsif ( $_obj->isa('Tie::IxHash') ) {
648 0         0 $_cnt = keys %{ $_obj->[2] };
  0         0  
649             }
650             else {
651 0         0 my $_key = $_obj->FIRSTKEY; $_cnt = 0;
  0         0  
652 0 0       0 if ( defined $_key ) {
653 0         0 $_cnt = 1;
654             # CDB_File expects the $_key argument
655 0         0 while ( defined( $_key = $_obj->NEXTKEY($_key) ) ) {
656 0         0 $_cnt++;
657             }
658             }
659             }
660              
661 0 0       0 wantarray ? @_keys : $_cnt;
662 0         0 };
663              
664             my $_iter = sub {
665 0 0   0   0 unless ( exists $_itr{ $_id } ) {
666              
667 0         0 my $pkg = $_all{ $_id };
668 0 0 0     0 my $flg = ($pkg->can('NEXTKEY') || $pkg->can('keys')) ? 1 : 0;
669 0 0       0 my $get = $pkg->can('FETCH') ? 'FETCH' : $pkg->can('get') ? 'get' : '';
    0          
670              
671 0 0 0     0 unless ( ($flg || $pkg->can('FETCHSIZE')) && $get ) {
      0        
672 0         0 print {$_DAU_R_SOCK} '-1'.$LF;
  0         0  
673 0         0 return;
674             }
675              
676             # MCE::Shared::{ Array, Cache, Hash, Ordhash }, Hash::Ordered,
677             # or similar module.
678              
679 0 0       0 $get = 'peek' if $pkg->isa('MCE::Shared::Cache');
680              
681 0 0       0 if ( !exists $_itr{ "$_id:args" } ) {
682 0         0 @{ $_itr{ "$_id:args" } } = $pkg->can('keys')
683             ? $_obj{ $_id }->keys()
684 0 0       0 : $_obj_keys->( $_obj{ $_id } );
685             }
686             else {
687 0         0 my $_args = $_itr{ "$_id:args" };
688 0 0 0     0 if ( @{ $_args } == 1 &&
  0         0  
689             $_args->[0] =~ /^(?:key|val)[ ]+\S\S?[ ]+\S/ ) {
690              
691 0 0       0 @{ $_args } = $_obj{ $_id }->keys($_args->[0])
  0         0  
692             if $pkg->isa('MCE::Shared::Common');
693             }
694             else {
695 0 0       0 $_obj{ $_id }->_prune_head()
696             if $pkg->isa('MCE::Shared::Cache');
697             }
698             }
699              
700             $_itr{ $_id } = sub {
701 0         0 my $_key = shift @{ $_itr{ "$_id:args" } };
  0         0  
702 0 0       0 print({$_DAU_R_SOCK} '-1'.$LF), return if !defined($_key);
  0         0  
703 0         0 my $_buf = $_freeze->([ $_key, $_obj{ $_id }->$get($_key) ]);
704 0         0 print {$_DAU_R_SOCK} length($_buf).$LF, $_buf;
  0         0  
705 0         0 };
706             }
707              
708 0         0 $_itr{ $_id }->();
709              
710 0         0 return;
711 0         0 };
712              
713             my $_warn = sub {
714 0 0   0   0 if ( $_wa ) {
715 0         0 my $_buf = $_freeze->([ ]);
716 0         0 print {$_DAU_R_SOCK} length($_buf).$LF, $_buf;
  0         0  
717             }
718 0         0 };
719              
720             # --------------------------------------------------------------------------
721              
722 0         0 my %_output_function; %_output_function = (
723              
724             SHR_M_NEW.$LF => sub { # New share
725 0     0   0 my ($_buf, $_params, $_class, $_args, $_fd, $_item);
726              
727 0         0 chomp($_len = <$_DAU_R_SOCK>),
728             read($_DAU_R_SOCK, $_buf, $_len);
729              
730 0         0 $_params = Storable::thaw($_buf);
731 0         0 $_class = $_params->{'class'};
732              
733 0   0     0 { local $@; MCE::Shared::_use($_params->{module} || $_class); }
  0         0  
  0         0  
734              
735             chomp($_len = <$_DAU_R_SOCK>), read($_DAU_R_SOCK, $_buf, $_len),
736 0         0 chomp($_len = <$_DAU_R_SOCK>), print({$_DAU_R_SOCK} $LF);
  0         0  
737              
738 0         0 $_args = Storable::thaw($_buf); undef $_buf;
  0         0  
739              
740 0 0       0 if ($_len) {
741 0         0 $_export_nul{ $_class } = undef;
742              
743 0         0 for my $_k (qw( _qw_sock _qr_sock _aw_sock _cw_sock )) {
744 0 0       0 if (exists $_args->[0]->{ $_k }) {
745 0         0 delete $_args->[0]->{ $_k };
746              
747 0 0       0 $_fd = IO::FDPass::recv(fileno $_DAU_R_SOCK); $_fd >= 0
  0         0  
748             or _croak("cannot receive file handle: $!");
749              
750 0 0       0 open $_args->[0]->{ $_k }, "+<&=$_fd"
751             or _croak("cannot convert file discriptor to handle: $!");
752              
753 0         0 print {$_DAU_R_SOCK} $LF;
  0         0  
754             }
755             }
756             }
757              
758 0 0       0 $_item = _share($_params, @{ $_args }) or do {
  0         0  
759 0         0 print {$_DAU_R_SOCK} int($!).$LF . '0'.$LF;
  0         0  
760 0         0 return;
761             };
762              
763 0         0 $_buf = $_freeze->($_item);
764              
765 0         0 print {$_DAU_R_SOCK} $_item->SHARED_ID().$LF .
  0         0  
766             length($_buf).$LF, $_buf;
767              
768 0 0       0 if ($_class eq 'MCE::Shared::Queue') {
    0          
    0          
769             MCE::Shared::Queue::_init_mgr(
770             \$_DAU_R_SOCK, \%_obj, \%_output_function, $_freeze, $_thaw
771 0 0       0 ) if $INC{'MCE/Shared/Queue.pm'};
772             }
773             elsif (reftype $_obj{ $_item->[0] } eq 'GLOB') {
774             MCE::Shared::Handle::_init_mgr(
775             \$_DAU_R_SOCK, \%_obj, \%_output_function, $_freeze, $_thaw
776 0 0       0 ) if $INC{'MCE/Shared/Handle.pm'};
777             }
778             elsif ($_class eq 'MCE::Shared::Condvar') {
779             MCE::Shared::Condvar::_init_mgr(
780             \$_DAU_R_SOCK, \%_obj, \%_output_function
781 0 0       0 ) if $INC{'MCE/Shared/Condvar.pm'};
782             }
783              
784 0         0 return;
785             },
786              
787             SHR_M_CID.$LF => sub { # ClientID request
788 0     0   0 print {$_DAU_R_SOCK} (++$_channel_id).$LF;
  0         0  
789 0 0       0 $_channel_id = 0 if ($_channel_id >= $_SVR->{_data_channels});
790              
791 0         0 return;
792             },
793              
794             SHR_M_DEE.$LF => sub { # Deeply shared
795 0     0   0 chomp(my $_id1 = <$_DAU_R_SOCK>),
796             chomp(my $_id2 = <$_DAU_R_SOCK>);
797              
798 0         0 $_ob3{ "$_id1:deeply" }->{ $_id2 } = undef;
799              
800 0         0 return;
801             },
802              
803             SHR_M_INC.$LF => sub { # Increment count
804 0     0   0 chomp($_id = <$_DAU_R_SOCK>);
805              
806 0         0 $_ob3{ "$_id:count" }++;
807 0         0 print {$_DAU_R_SOCK} $LF;
  0         0  
808              
809 0         0 return;
810             },
811              
812             SHR_M_OBJ.$LF => sub { # Object request
813 0     0   0 chomp($_id = <$_DAU_R_SOCK>),
814             chomp($_fcn = <$_DAU_R_SOCK>),
815             chomp($_wa = <$_DAU_R_SOCK>),
816             chomp($_len = <$_DAU_R_SOCK>),
817              
818             read($_DAU_R_SOCK, my($_buf), $_len);
819              
820 0   0     0 $_var = $_obj{ $_id } || do { return $_warn->($_fcn) };
821              
822 0         0 $_wa ? $_auto_reply->(@{ $_thaw->($_buf) })
823 0 0       0 : eval { $_var->$_fcn(@{ $_thaw->($_buf) }) };
  0         0  
  0         0  
824              
825 0 0       0 warn $@ if $@;
826 0         0 return;
827             },
828              
829             SHR_M_OB0.$LF => sub { # Object request - thaw'less
830 0     0   0 chomp($_id = <$_DAU_R_SOCK>),
831             chomp($_fcn = <$_DAU_R_SOCK>),
832             chomp($_wa = <$_DAU_R_SOCK>);
833              
834 0   0     0 $_var = $_obj{ $_id } || do { return $_warn->($_fcn) };
835              
836 0   0     0 my $_code = $_var->can($_fcn) || do {
837             if ( ($_fcn eq 'keys' || $_fcn eq 'SCALAR') &&
838             ($_var->can('NEXTKEY') || $_var->can('FETCHSIZE')) ) {
839             $_obj_keys;
840             }
841             else {
842             $_wa ? $_auto_reply->() : eval { $_var->$_fcn() };
843             warn $@ if $@;
844             return;
845             }
846             };
847              
848 0 0       0 if ( $_wa == WA_ARRAY ) {
    0          
849 0         0 my @_ret = eval { $_code->($_var) };
  0         0  
850 0         0 my $_buf = $_freeze->(\@_ret);
851 0         0 print {$_DAU_R_SOCK} length($_buf).$LF, $_buf;
  0         0  
852             }
853             elsif ( $_wa ) {
854 0         0 my $_ret = eval { $_code->($_var) };
  0         0  
855 0         0 my $_buf = $_freeze->([ $_ret ]);
856 0         0 print {$_DAU_R_SOCK} length($_buf).$LF, $_buf;
  0         0  
857             }
858             else {
859 0         0 eval { $_code->($_var) };
  0         0  
860             }
861              
862 0 0       0 warn $@ if $@;
863 0         0 return;
864             },
865              
866             SHR_M_DES.$LF => sub { # Destroy request
867 0     0   0 chomp($_id = <$_DAU_R_SOCK>);
868              
869 0         0 local $SIG{__DIE__};
870 0         0 local $SIG{__WARN__};
871              
872 0         0 $_var = undef; local $@;
  0         0  
873              
874 0         0 eval {
875 0 0       0 my $_ret = exists($_all{ $_id }) ? '1' : '0';
876 0 0       0 _destroy({}, $_obj{ $_id }, $_id) if $_ret;
877             };
878              
879 0         0 print {$_DAU_R_SOCK} $LF;
  0         0  
880              
881 0         0 return;
882             },
883              
884             SHR_M_EXP.$LF => sub { # Export request
885 0     0   0 chomp($_id = <$_DAU_R_SOCK>),
886             chomp($_len = <$_DAU_R_SOCK>);
887              
888 0 0       0 read($_DAU_R_SOCK, my($_keys), $_len) if $_len;
889              
890 0 0       0 if (exists $_obj{ $_id }) {
891 0         0 my $_buf;
892              
893             # Do not export: e.g. objects with file handles
894 0 0       0 if ( exists $_export_nul{ $_all{ $_id } } ) {
895 0         0 print {$_DAU_R_SOCK} '-1'.$LF;
  0         0  
896 0         0 return;
897             }
898              
899             # MCE::Shared::{ Array, Hash, Ordhash }, Hash::Ordered
900 0 0       0 if ($_obj{ $_id }->can('clone')) {
901             $_buf = ($_len)
902 0         0 ? Storable::freeze($_obj{ $_id }->clone(@{ $_thaw->($_keys) }))
903 0 0       0 : Storable::freeze($_obj{ $_id });
904             }
905             # Other
906             else {
907 0         0 $_buf = Storable::freeze($_obj{ $_id });
908             }
909              
910 0         0 print {$_DAU_R_SOCK} length($_buf).$LF, $_buf;
  0         0  
911 0         0 undef $_buf;
912             }
913             else {
914 0         0 print {$_DAU_R_SOCK} '-1'.$LF;
  0         0  
915             }
916              
917 0         0 return;
918             },
919              
920             SHR_M_INX.$LF => sub { # Iterator next
921 0     0   0 chomp($_id = <$_DAU_R_SOCK>);
922              
923 0         0 my $_var = $_obj{ $_id };
924              
925 0 0       0 if ( my $_code = $_var->can('next') ) {
926 0         0 my $_buf = $_freeze->([ $_code->($_var) ]);
927 0         0 print {$_DAU_R_SOCK} length($_buf).$LF, $_buf;
  0         0  
928             }
929             else {
930 0         0 $_iter->();
931             }
932              
933 0         0 return;
934             },
935              
936             SHR_M_IRW.$LF => sub { # Iterator rewind
937 0     0   0 chomp($_id = <$_DAU_R_SOCK>),
938             chomp($_len = <$_DAU_R_SOCK>),
939              
940             read($_DAU_R_SOCK, my($_buf), $_len);
941              
942 0         0 my $_var = $_obj{ $_id };
943              
944 0 0       0 if ( my $_code = $_var->can('rewind') ) {
945 0         0 $_code->($_var, @{ $_thaw->($_buf) });
  0         0  
946             }
947             else {
948 0 0       0 weaken( delete $_itr{ $_id } ) if ( exists $_itr{ $_id } );
949 0         0 my @_args = @{ $_thaw->($_buf) };
  0         0  
950 0 0       0 if ( @_args ) {
951 0         0 $_itr{ "$_id:args" } = \@_args;
952             } else {
953 0         0 delete $_itr{ "$_id:args" };
954             }
955             }
956              
957 0         0 print {$_DAU_R_SOCK} $LF;
  0         0  
958              
959 0         0 return;
960             },
961              
962             SHR_M_STP.$LF => sub { # Exit loop
963 0 0   0   0 $SIG{USR2} = sub {} unless $_is_MSWin32;
964              
965 0         0 $_done = 1;
966              
967 0         0 return;
968             },
969              
970             SHR_O_PDL.$LF => sub { # PDL::ins inplace(this),...
971 0     0   0 chomp($_id = <$_DAU_R_SOCK>),
972             chomp($_len = <$_DAU_R_SOCK>),
973              
974             read($_DAU_R_SOCK, my($_buf), $_len);
975              
976 0 0       0 if ($_all{ $_id } eq 'PDL') {
977             # PDL ins( inplace($this), $what, @coords );
978 0         0 local @_ = @{ Storable::thaw($_buf) };
  0         0  
979              
980 0 0 0     0 if (@_ == 1) {
    0 0        
    0 0        
    0          
    0          
981 0         0 ins( inplace($_obj{ $_id }), @_, 0, 0 );
982             }
983             elsif (@_ == 2 && $_[0] =~ /^:,(\d+):(\d+)/ && ref($_[1])) {
984 0         0 my $_s = $2 - $1;
985 0         0 ins( inplace($_obj{ $_id }), $_[1]->slice(":,0:$_s"), 0, $1 );
986             }
987             elsif (!ref($_[0]) && $_[0] =~ /^(\d+)$/) {
988 0         0 $_obj{ $_id }->set(@_);
989             }
990             elsif (@_ == 2) {
991 0         0 $_[0] =~ /^:,(\d+)/;
992 0   0     0 ins( inplace($_obj{ $_id }), $_[1], 0, $1 // $_[0] );
993             }
994             elsif (@_ > 2) {
995 0         0 ins( inplace($_obj{ $_id }), @_ );
996             }
997             }
998              
999 0         0 return;
1000             },
1001              
1002             SHR_O_DAT.$LF => sub { # Get MCE::Hobo data
1003 0     0   0 my $_key;
1004              
1005 0         0 chomp($_id = <$_DAU_R_SOCK>),
1006             chomp($_key = <$_DAU_R_SOCK>);
1007              
1008 0   0     0 my $error = delete $_obj{ $_id }{ 'S'.$_key } // '';
1009 0   0     0 my $result = delete $_obj{ $_id }{ 'R'.$_key } // '';
1010              
1011 0         0 print {$_DAU_R_SOCK}
  0         0  
1012             length($error).$LF . length($result).$LF . $error, $result;
1013              
1014 0         0 return;
1015             },
1016              
1017             SHR_O_CLR.$LF => sub { # Clear
1018 0     0   0 chomp($_id = <$_DAU_R_SOCK>),
1019             chomp($_fcn = <$_DAU_R_SOCK>);
1020              
1021 0   0     0 my $_var = $_obj{ $_id } || do { return };
1022              
1023 0 0       0 if (exists $_ob3{ "$_id:deeply" }) {
1024 0         0 my $_keep = { $_id => 1 };
1025 0         0 for my $_oid (keys %{ $_ob3{ "$_id:deeply" } }) {
  0         0  
1026 0         0 _destroy($_keep, $_obj{ $_oid }, $_oid);
1027             }
1028 0         0 delete $_ob3{ "$_id:deeply" };
1029             }
1030              
1031 0         0 eval { $_var->$_fcn() };
  0         0  
1032              
1033 0 0       0 warn $@ if $@;
1034 0         0 return;
1035             },
1036              
1037             SHR_O_FCH.$LF => sub { # Fetch
1038 0     0   0 chomp($_id = <$_DAU_R_SOCK>),
1039             chomp($_fcn = <$_DAU_R_SOCK>),
1040             chomp($_len = <$_DAU_R_SOCK>);
1041              
1042 0 0       0 read($_DAU_R_SOCK, my($_key), $_len) if $_len;
1043              
1044 0   0     0 my $_var = $_obj{ $_id } || do {
1045             return print {$_DAU_R_SOCK} '-1'.$LF;
1046             };
1047              
1048             my $_buf = $_len
1049 0 0       0 ? eval { $_var->$_fcn( chop $_key ? ${ $_thaw->($_key) } : $_key ) }
  0         0  
1050 0 0       0 : eval { $_var->$_fcn() };
  0         0  
1051              
1052 0 0       0 warn $@ if $@;
1053              
1054 0 0       0 return print {$_DAU_R_SOCK} '-1'.$LF if ( !defined $_buf );
  0         0  
1055              
1056             my $_ret = ( blessed($_buf) && $_buf->can('SHARED_ID') && $_ob2{ $_buf->[0] } )
1057 0 0 0     0 ? $_ob2{ $_buf->[0] }
1058             : $_freeze->([ $_buf ]);
1059              
1060 0         0 print {$_DAU_R_SOCK} length($_ret).$LF, $_ret;
  0         0  
1061              
1062 0         0 return;
1063             },
1064              
1065             SHR_O_SZE.$LF => sub { # Size
1066 0     0   0 chomp($_id = <$_DAU_R_SOCK>),
1067             chomp($_fcn = <$_DAU_R_SOCK>);
1068              
1069 0   0     0 $_var = $_obj{ $_id } || do {
1070             print {$_DAU_R_SOCK} $LF if $_wa;
1071             return;
1072             };
1073              
1074 0   0     0 my $_code = $_var->can($_fcn) || do {
1075             if ( ($_fcn eq 'keys' || $_fcn eq 'SCALAR') &&
1076             ($_var->can('NEXTKEY') || $_var->can('FETCHSIZE')) ) {
1077             $_obj_keys;
1078             }
1079             else {
1080             $_len = eval { $_var->$_fcn() };
1081             print {$_DAU_R_SOCK} $_len.$LF;
1082              
1083             warn $@ if $@;
1084             return;
1085             }
1086             };
1087              
1088 0         0 $_len = eval { $_code->($_var) };
  0         0  
1089 0         0 print {$_DAU_R_SOCK} $_len.$LF;
  0         0  
1090              
1091 0 0       0 warn $@ if $@;
1092 0         0 return;
1093             },
1094              
1095 0         0 );
1096              
1097             MCE::Shared::Queue::_init_mgr(
1098             \$_DAU_R_SOCK, \%_obj, \%_output_function, $_freeze, $_thaw
1099 0 0       0 ) if $INC{'MCE/Shared/Queue.pm'};
1100              
1101             MCE::Shared::Handle::_init_mgr(
1102             \$_DAU_R_SOCK, \%_obj, \%_output_function, $_freeze, $_thaw
1103 0 0       0 ) if $INC{'MCE/Shared/Handle.pm'};
1104              
1105             MCE::Shared::Condvar::_init_mgr(
1106             \$_DAU_R_SOCK, \%_obj, \%_output_function
1107 0 0       0 ) if $INC{'MCE/Shared/Condvar.pm'};
1108              
1109             # --------------------------------------------------------------------------
1110              
1111             # Call on hash function.
1112              
1113 0 0       0 if ($_is_MSWin32) {
1114             # The normal loop hangs on Windows when processes/threads start/exit.
1115             # Using ioctl() properly, https://www.perlmonks.org/?node_id=780083
1116              
1117 0         0 my $_val_bytes = pack('L', 0);
1118 0         0 my ($_count, $_nbytes, $_start);
1119              
1120 0         0 while (!$_done) {
1121 0         0 $_start = time, $_count = 1;
1122              
1123             # MSWin32 FIONREAD
1124 0         0 IOCTL: ioctl($_DAT_R_SOCK, 0x4004667f, $_val_bytes);
1125              
1126 0 0       0 unless ($_nbytes = unpack('L', $_val_bytes)) {
1127 0 0       0 if ($_count) {
1128             # delay after a while to not consume a CPU core
1129 0 0 0     0 $_count = 0 if ++$_count % 50 == 0 && time - $_start > 0.030;
1130             } else {
1131 0         0 sleep 0.015;
1132             }
1133 0         0 goto IOCTL;
1134             }
1135              
1136 0         0 do {
1137 0         0 sysread($_DAT_R_SOCK, $_func, 8);
1138 0 0       0 $_done = 1, last() unless length($_func) == 8;
1139 0         0 $_DAU_R_SOCK = $_channels->[ substr($_func, -2, 2, '') ];
1140              
1141 0         0 $_output_function{$_func}();
1142              
1143             } while (($_nbytes -= 8) >= 8);
1144             }
1145             }
1146             else {
1147 0         0 while (!$_done) {
1148 0         0 $_func = <$_DAT_R_SOCK>;
1149 0 0       0 last() unless length($_func) == 6;
1150 0         0 $_DAU_R_SOCK = $_channels->[ <$_DAT_R_SOCK> ];
1151              
1152 0         0 $_output_function{$_func}();
1153             }
1154             }
1155              
1156 0         0 _exit();
1157             }
1158              
1159             ###############################################################################
1160             ## ----------------------------------------------------------------------------
1161             ## Object package.
1162             ##
1163             ###############################################################################
1164              
1165             package MCE::Shared::Object;
1166              
1167 43     43   498 use Scalar::Util qw( looks_like_number reftype );
  43         92  
  43         2714  
1168 43     43   22681 use MCE::Shared::Base ();
  43         125  
  43         1131  
1169 43     43   291 use bytes;
  43         81  
  43         280  
1170              
1171             use constant {
1172 43 50       3338 _WNOHANG => ( $INC{'POSIX.pm'} )
    50          
1173             ? &POSIX::WNOHANG : ( $^O eq 'solaris' ) ? 64 : 1
1174 43     43   2096 };
  43         119  
1175             use constant {
1176 43         4397 _ID => 0, _CLASS => 1, _ENCODE => 2, _DECODE => 3, # shared object
1177             _DREF => 4, _ITER => 5, _MUTEX => 6,
1178 43     43   273 };
  43         89  
1179             use constant {
1180 43         4643 _UNDEF => 0, _ARRAY => 1, _SCALAR => 2, # wantarray
1181 43     43   303 };
  43         125  
1182              
1183             ## Below, no circular reference to original, therefore no memory leaks.
1184              
1185             use overload (
1186             q("") => \&MCE::Shared::Base::_stringify,
1187             q(0+) => \&MCE::Shared::Base::_numify,
1188             q(@{}) => sub {
1189 43     43   323 no overloading;
  43         94  
  43         9092  
1190 7 50   7   3662 $_[0]->[_DREF] || do {
1191 7         61 local $@; my $c = $_[0]->[_CLASS];
  7         100  
1192 7         181 ($c) = $c =~ /(.*)/; # remove tainted'ness
1193 7 50       1484 return $_[0] unless eval qq{ eval { require $c }; $c->can('TIEARRAY') };
1194 7         200 tie my @a, __PACKAGE__, bless([ @{ $_[0] }[ 0..3 ] ], __PACKAGE__);
  7         329  
1195 7         175 $_[0]->[_DREF] = \@a;
1196             };
1197             },
1198             q(%{}) => sub {
1199 43     43   330 no overloading;
  43         106  
  43         8361  
1200 9 50   9   4975 $_[0]->[_DREF] || do {
1201 9         64 local $@; my $c = $_[0]->[_CLASS];
  9         77  
1202 9         180 ($c) = $c =~ /(.*)/; # remove tainted'ness
1203 9 50       2293 return $_[0] unless eval qq{ eval { require $c }; $c->can('TIEHASH') };
1204 9         145 tie my %h, __PACKAGE__, bless([ @{ $_[0] }[ 0..3 ] ], __PACKAGE__);
  9         380  
1205 9         293 $_[0]->[_DREF] = \%h;
1206             };
1207             },
1208             q(${}) => sub {
1209 43     43   559 no overloading;
  43         92  
  43         8268  
1210 0 0   0   0 $_[0]->[_DREF] || do {
1211 0         0 local $@; my $c = $_[0]->[_CLASS];
  0         0  
1212 0         0 ($c) = $c =~ /(.*)/; # remove tainted'ness
1213 0 0       0 return $_[0] unless eval qq{ eval { require $c }; $c->can('TIESCALAR') };
1214 0         0 tie my $s, __PACKAGE__, bless([ @{ $_[0] }[ 0..3 ] ], __PACKAGE__);
  0         0  
1215 0         0 $_[0]->[_DREF] = \$s;
1216             };
1217             },
1218 43         664 fallback => 1
1219 43     43   38554 );
  43         29602  
1220              
1221 43     43   5632 no overloading;
  43         86  
  43         4163  
1222              
1223             my ($_DAT_LOCK, $_DAT_W_SOCK, $_DAU_W_SOCK, $_chn, $_dat_ex, $_dat_un);
1224              
1225             my $_blessed = \&Scalar::Util::blessed;
1226              
1227             BEGIN {
1228 0         0 $_dat_ex = sub { _croak (
1229             "\nPlease start the shared-manager process manually when ready.\n",
1230             "See section labeled \"Extra Functionality\" in MCE::Shared.\n\n"
1231 43     43   120607 ) };
1232             }
1233              
1234             # Hook for threads.
1235              
1236             sub CLONE {
1237 0 0   0   0 $_tid = threads->tid() if $INC{'threads.pm'};
1238 0 0       0 &_init($_tid) if $_tid;
1239             }
1240              
1241             # Private functions.
1242              
1243             sub DESTROY {
1244 113 100   113   3700765 return if $_stopped;
1245 110 50 33     1590 return unless ( $_is_client && defined $_svr_pid && defined $_[0] );
      33        
1246              
1247 110 100 33     1411 if ( $_spawn_child && $_init_pid && $_init_pid eq "$$.$_tid" ) {
      66        
1248 58         343 local ($!, $?);
1249 58 50 33     910 return if ( ! $_svr_pid || waitpid($_svr_pid, _WNOHANG) );
1250             }
1251              
1252 110         520 my $_id = $_[0]->[_ID];
1253              
1254 110 100       457 if ( exists $_new{ $_id } ) {
1255 48 50       175 my $_pid = $_tid ? $$ .'.'. $_tid : $$;
1256              
1257 48 50       266 if ($_new{ $_id } eq $_pid) {
1258 48 50       140 return if $MCE::Signal::KILLED;
1259              
1260             delete($_all{ $_id }), delete($_obj{ $_id }),
1261             delete($_new{ $_id }), delete($_ob2{ $_id }),
1262 48         593 delete($_ob3{"$_id:count"});
1263              
1264 48         770 _req1('M~DES', $_id.$LF);
1265             }
1266             }
1267              
1268 110         1306 return;
1269             }
1270              
1271 0     0   0 sub _croak { goto &MCE::Shared::Base::_croak }
1272              
1273 2     2   17 sub SHARED_ID { $_[0]->[_ID] }
1274              
1275 9     9   47 sub TIEARRAY { $_[1] }
1276 3     3   17 sub TIEHANDLE { $_[1] }
1277 9     9   46 sub TIEHASH { $_[1] }
1278 0     0   0 sub TIESCALAR { $_[1] }
1279              
1280             sub _reset {
1281             MCE::Shared::Object::_init_condvar(
1282             $_DAT_LOCK, $_DAT_W_SOCK, $_DAU_W_SOCK, $_dat_ex, $_dat_un, $_chn, \%_obj,
1283             $_freeze, $_thaw
1284 84 100   84   681 ) if $INC{'MCE/Shared/Condvar.pm'};
1285              
1286             MCE::Shared::Object::_init_handle(
1287             $_DAT_LOCK, $_DAT_W_SOCK, $_DAU_W_SOCK, $_dat_ex, $_dat_un, $_chn, \%_obj,
1288             $_freeze, $_thaw
1289 84 100       368 ) if $INC{'MCE/Shared/Handle.pm'};
1290              
1291             MCE::Shared::Object::_init_queue(
1292             $_DAT_LOCK, $_DAT_W_SOCK, $_DAU_W_SOCK, $_dat_ex, $_dat_un, $_chn, \%_obj,
1293             $_freeze, $_thaw
1294 84 100       758 ) if $INC{'MCE/Shared/Queue.pm'};
1295             }
1296              
1297             sub _start {
1298 43     43   174 $_chn = $_SVR->{_data_channels} + 1;
1299 43         200 $_DAT_LOCK = $_SVR->{'_mutex_'.$_chn};
1300 43         133 $_DAT_W_SOCK = $_SVR->{_dat_w_sock}[0];
1301 43         110 $_DAU_W_SOCK = $_SVR->{_dat_w_sock}[$_chn];
1302              
1303             # inlined for performance
1304             $_dat_ex = sub {
1305 2005 50   2005   6074 my $_pid = $_tid ? $$ .'.'. $_tid : $$;
1306             MCE::Util::_sysread($_DAT_LOCK->{_r_sock}, my($b), 1), $_DAT_LOCK->{ $_pid } = 1
1307 2005 50       12892 unless $_DAT_LOCK->{ $_pid };
1308 43         461 };
1309             $_dat_un = sub {
1310 2005 50   2005   9696 my $_pid = $_tid ? $$ .'.'. $_tid : $$;
1311             CORE::syswrite($_DAT_LOCK->{_w_sock}, '0'), $_DAT_LOCK->{ $_pid } = 0
1312 2005 50       29562 if $_DAT_LOCK->{ $_pid };
1313 43         245 };
1314              
1315 43         163 _reset();
1316             }
1317              
1318             sub _stop {
1319 15     15   370 $_DAT_LOCK = $_DAT_W_SOCK = $_DAU_W_SOCK = $_chn = $_dat_un = undef;
1320              
1321 0     0   0 $_dat_ex = sub { _croak (
1322             "\nPlease start the shared-manager process manually when ready.\n",
1323             "See section labeled \"Extra Functionality\" in MCE::Shared.\n\n"
1324 15         191 ) };
1325              
1326 15         1230 return;
1327             }
1328              
1329             sub _get_channel_id {
1330 28 50   28   924 local $\ = undef if (defined $\);
1331 28 50       740 local $/ = $LF if ($/ ne $LF);
1332 28         723 local $MCE::Signal::SIG;
1333              
1334 28         303 my $_ret;
1335              
1336             {
1337 28         211 local $MCE::Signal::IPC = 1;
  28         316  
1338 28 50       1485 $_is_MSWin32 ? CORE::lock $_DAT_LOCK : $_dat_ex->();
1339              
1340 28         3471 print {$_DAT_W_SOCK} 'M~CID'.$LF . $_chn.$LF;
  28         2998  
1341 28         5332 chomp($_ret = <$_DAU_W_SOCK>);
1342              
1343 28 50       1392 $_dat_un->() if !$_is_MSWin32;
1344             }
1345              
1346 28 50       462 CORE::kill($MCE::Signal::SIG, $$) if $MCE::Signal::SIG;
1347              
1348 28         328 return $_ret;
1349             }
1350              
1351             sub _init {
1352 28 50   28   3060 return unless defined $_SVR;
1353              
1354 28   33     2675 my $_id = $_[0] // &_get_channel_id();
1355 28 50       1944 $_id = $$ if ( $_id !~ /\d+/ );
1356              
1357 28         496 $_chn = abs($_id) % $_SVR->{_data_channels} + 1;
1358 28         629 $_DAT_LOCK = $_SVR->{'_mutex_'.$_chn};
1359 28         333 $_DAU_W_SOCK = $_SVR->{_dat_w_sock}[$_chn];
1360              
1361 28         2183 %_new = (), _reset();
1362              
1363 28         200 return;
1364             }
1365              
1366             ###############################################################################
1367             ## ----------------------------------------------------------------------------
1368             ## Private routines.
1369             ##
1370             ###############################################################################
1371              
1372             # Called by AUTOLOAD, STORE, set, and keys.
1373              
1374             sub _auto {
1375 1228 100   1228   4924 my $_wa = !defined wantarray ? _UNDEF : wantarray ? _ARRAY : _SCALAR;
    100          
1376              
1377 1228 50       4350 local $\ = undef if (defined $\);
1378 1228         2480 local $MCE::Signal::SIG;
1379              
1380 1228         2205 my $_buf;
1381              
1382             {
1383 1228         1920 local $MCE::Signal::IPC = 1;
  1228         2054  
1384 1228 50       4561 $_is_MSWin32 ? CORE::lock $_DAT_LOCK : $_dat_ex->();
1385              
1386 1228 100       23662 if ( @_ == 2 ) {
1387 155         5456 print({$_DAT_W_SOCK} 'M~OB0'.$LF . $_chn.$LF),
1388 155         363 print({$_DAU_W_SOCK} $_[1]->[_ID].$LF . $_[0].$LF . $_wa.$LF);
  155         4138  
1389             }
1390             else {
1391 1073         15496 my ( $_fcn, $_id, $_buf ) = ( shift, shift()->[_ID], $_freeze->([ @_ ]) );
1392 1073         4995 my $_tmp = $_id.$LF . $_fcn.$LF . $_wa.$LF . length($_buf).$LF;
1393 1073         47246 print({$_DAT_W_SOCK} 'M~OBJ'.$LF . $_chn.$LF),
1394 1073         1881 print({$_DAU_W_SOCK} $_tmp, $_buf);
  1073         23476  
1395             }
1396              
1397 1228 100       5274 if ( $_wa ) {
1398 916 50       3404 local $/ = $LF if ($/ ne $LF);
1399 916         612255 chomp(my $_len = <$_DAU_W_SOCK>);
1400 916         7986 read($_DAU_W_SOCK, $_buf, $_len);
1401             }
1402              
1403 1228 50       6188 $_dat_un->() if !$_is_MSWin32;
1404             }
1405              
1406 1228 50       4691 CORE::kill($MCE::Signal::SIG, $$) if $MCE::Signal::SIG;
1407              
1408 1228 100       3941 return unless $_wa;
1409 916 100       14144 return ( $_wa != _ARRAY ) ? $_thaw->($_buf)[0] : @{ $_thaw->($_buf) };
  382         10631  
1410             }
1411              
1412             # Called by MCE::Hobo ( ->join, ->wait_one ).
1413              
1414             sub _get_hobo_data {
1415 49 50 33 49   1292 if ( $_spawn_child && $_init_pid && $_init_pid eq "$$.$_tid" ) {
      33        
1416 49         846 local ($!, $?);
1417 49 50 33     1096 return if ( ! $_svr_pid || waitpid($_svr_pid, _WNOHANG) );
1418             }
1419              
1420 49 50       385 local $\ = undef if (defined $\);
1421 49 50       273 local $/ = $LF if ($/ ne $LF);
1422 49         277 local $MCE::Signal::SIG;
1423              
1424 49         151 my ($_result, $_error);
1425              
1426             {
1427 49         129 local $MCE::Signal::IPC = 1;
  49         120  
1428 49 50       551 $_is_MSWin32 ? CORE::lock $_DAT_LOCK : $_dat_ex->();
1429              
1430 49         2993 print({$_DAT_W_SOCK} 'O~DAT'.$LF . $_chn.$LF),
1431 49         1384 print({$_DAU_W_SOCK} $_[0]->[_ID].$LF . $_[1].$LF);
  49         1279  
1432              
1433 49         13826 chomp(my $_le1 = <$_DAU_W_SOCK>),
1434             chomp(my $_le2 = <$_DAU_W_SOCK>);
1435              
1436 49 50       504 read($_DAU_W_SOCK, $_error, $_le1) if $_le1;
1437 49 100       414 read($_DAU_W_SOCK, $_result, $_le2) if $_le2;
1438              
1439 49 50       501 $_dat_un->() if !$_is_MSWin32;
1440             }
1441              
1442 49 50       296 CORE::kill($MCE::Signal::SIG, $$) if $MCE::Signal::SIG;
1443              
1444 49         690 return ($_result, $_error);
1445             }
1446              
1447             # Called by await, dequeue_timed, rewind, broadcast, signal, timedwait, and
1448             # wait. Including CLOSE, DESTROY, and destroy.
1449              
1450             sub _req1 {
1451 78 50   78   275 return unless defined $_DAU_W_SOCK; # (in cleanup)
1452              
1453 78 50       361 local $\ = undef if (defined $\);
1454 78 50       258 local $/ = $LF if ($/ ne $LF );
1455 78         174 local $MCE::Signal::SIG;
1456              
1457 78         135 my $_ret;
1458              
1459             {
1460 78         160 local $MCE::Signal::IPC = 1;
  78         208  
1461 78 50       427 $_is_MSWin32 ? CORE::lock $_DAT_LOCK : $_dat_ex->();
1462              
1463 78         3524 print({$_DAT_W_SOCK} $_[0].$LF . $_chn.$LF),
1464 78         2403 print({$_DAU_W_SOCK} $_[1]);
  78         1105  
1465 78         35141 chomp($_ret = <$_DAU_W_SOCK>);
1466              
1467 78 50       742 $_dat_un->() if !$_is_MSWin32;
1468             }
1469              
1470 78 50       386 CORE::kill($MCE::Signal::SIG, $$) if $MCE::Signal::SIG;
1471              
1472 78         254 $_ret;
1473             }
1474              
1475             # Called by PRINT, PRINTF, STORE, ins_inplace, and set.
1476              
1477             sub _req2 {
1478 224 50   224   481 local $\ = undef if (defined $\);
1479 224         733 local $MCE::Signal::SIG;
1480              
1481             {
1482 224         276 local $MCE::Signal::IPC = 1;
  224         303  
1483 224 50       574 $_is_MSWin32 ? CORE::lock $_DAT_LOCK : $_dat_ex->();
1484              
1485 224         6718 print({$_DAT_W_SOCK} $_[0].$LF . $_chn.$LF),
1486 224         2909 print({$_DAU_W_SOCK} $_[1], $_[2]);
  224         2358  
1487              
1488 224 50       922 $_dat_un->() if !$_is_MSWin32;
1489             }
1490              
1491 224 50       722 CORE::kill($MCE::Signal::SIG, $$) if $MCE::Signal::SIG;
1492              
1493 224         1040 1;
1494             }
1495              
1496             # Called by CLEAR and clear.
1497              
1498             sub _req3 {
1499 40     40   458 my ( $_fcn, $self ) = @_;
1500              
1501 40 50       308 local $\ = undef if (defined $\);
1502 40 50       211 local $/ = $LF if ($/ ne $LF );
1503 40         151 local $MCE::Signal::SIG;
1504              
1505 40 50       240 delete $self->[_ITER] if defined $self->[_ITER];
1506              
1507             {
1508 40         111 local $MCE::Signal::IPC = 1;
  40         133  
1509 40 50       241 $_is_MSWin32 ? CORE::lock $_DAT_LOCK : $_dat_ex->();
1510              
1511 40         2178 print({$_DAT_W_SOCK} 'O~CLR'.$LF . $_chn.$LF),
1512 40         805 print({$_DAU_W_SOCK} $self->[_ID].$LF . $_fcn.$LF);
  40         1283  
1513              
1514 40 50       368 $_dat_un->() if !$_is_MSWin32;
1515             }
1516              
1517 40 50       301 CORE::kill($MCE::Signal::SIG, $$) if $MCE::Signal::SIG;
1518              
1519 40         387 return;
1520             }
1521              
1522             # Called by FETCH and get.
1523              
1524             sub _req4 {
1525 157 50   157   786 local $\ = undef if (defined $\);
1526 157 50       697 local $/ = $LF if ($/ ne $LF );
1527 157         527 local $MCE::Signal::SIG;
1528              
1529 157         480 my ( $_key, $_len, $_buf );
1530              
1531 157 100       543 if ( @_ == 3 ) {
1532 104 50       1592 $_key = ref($_[2]) ? $_[2].'0' : $_freeze->(\$_[2]).'1';
1533             }
1534              
1535             {
1536 157         372 local $MCE::Signal::IPC = 1;
  157         441  
1537 157 50       1021 $_is_MSWin32 ? CORE::lock $_DAT_LOCK : $_dat_ex->();
1538              
1539 157         6958 print({$_DAT_W_SOCK} 'O~FCH'.$LF . $_chn.$LF),
1540 157         3262 print({$_DAU_W_SOCK} $_[1]->[_ID].$LF . $_[0].$LF . length($_key).$LF, $_key);
  157         3661  
1541 157         41973 chomp($_len = <$_DAU_W_SOCK>);
1542              
1543 157 100       2073 read($_DAU_W_SOCK, $_buf, $_len) if ($_len >= 0);
1544              
1545 157 50       1018 $_dat_un->() if !$_is_MSWin32;
1546             }
1547              
1548 157 50       739 CORE::kill($MCE::Signal::SIG, $$) if $MCE::Signal::SIG;
1549              
1550 157 100       638 return undef if ($_len < 0);
1551              
1552 156 50 33     904 if ( $_[1]->[_DECODE] && $_[0] eq 'FETCH' ) {
1553 0         0 local $@; $_buf = $_thaw->($_buf)[0];
  0         0  
1554 0   0     0 return eval { $_[1]->[_DECODE]->($_buf) } || $_buf;
1555             }
1556              
1557 156         4837 $_thaw->($_buf)[0];
1558             }
1559              
1560             # Called by FETCHSIZE, SCALAR, keys, and pending.
1561              
1562             sub _size {
1563 4 50   4   33 local $\ = undef if (defined $\);
1564 4 50       32 local $/ = $LF if ($/ ne $LF );
1565 4         16 local $MCE::Signal::SIG;
1566              
1567 4         15 my $_size;
1568              
1569             {
1570 4         8 local $MCE::Signal::IPC = 1;
  4         29  
1571 4 50       39 $_is_MSWin32 ? CORE::lock $_DAT_LOCK : $_dat_ex->();
1572              
1573 4         154 print({$_DAT_W_SOCK} 'O~SZE'.$LF . $_chn.$LF),
1574 4         72 print({$_DAU_W_SOCK} $_[1]->[_ID].$LF . $_[0].$LF);
  4         169  
1575 4         867 chomp($_size = <$_DAU_W_SOCK>);
1576              
1577 4 50       201 $_dat_un->() if !$_is_MSWin32;
1578             }
1579              
1580 4 50       47 CORE::kill($MCE::Signal::SIG, $$) if $MCE::Signal::SIG;
1581              
1582 4 50       72 length($_size) ? int($_size) : undef;
1583             }
1584              
1585             ###############################################################################
1586             ## ----------------------------------------------------------------------------
1587             ## Common methods.
1588             ##
1589             ###############################################################################
1590              
1591             our $AUTOLOAD; # MCE::Shared::Object::
1592              
1593             sub AUTOLOAD {
1594 235     235   97106 my $_fcn = $AUTOLOAD; substr($_fcn, 0, rindex($_fcn,':') + 1, '');
  235         1118  
1595              
1596             # save this method for future calls
1597 43     43   478 no strict 'refs';
  43         119  
  43         128216  
1598 235     976   2441 *{ $AUTOLOAD } = sub { _auto($_fcn, @_) };
  235         2105  
  976         416029  
1599              
1600 235         586 goto &{ $AUTOLOAD };
  235         1253  
1601             }
1602              
1603             # blessed ( )
1604              
1605             sub blessed {
1606 17     17   1079 $_[0]->[_CLASS];
1607             }
1608              
1609             # decoder ( CODE )
1610             # decoder ( )
1611              
1612             sub decoder {
1613 0 0 0 0   0 $_[0]->[_DECODE] = $_[1] if (@_ == 2 && (ref $_[1] eq 'CODE' || !$_[1]));
      0        
1614 0         0 $_[0]->[_DECODE];
1615             }
1616              
1617             # encoder ( CODE )
1618             # encoder ( )
1619              
1620             sub encoder {
1621 0 0 0 0   0 $_[0]->[_ENCODE] = $_[1] if (@_ == 2 && (ref $_[1] eq 'CODE' || !$_[1]));
      0        
1622 0         0 $_[0]->[_ENCODE];
1623             }
1624              
1625             # destroy ( { unbless => 1 } )
1626             # destroy ( )
1627              
1628             sub destroy {
1629 0     0   0 my $_id = $_[0]->[_ID];
1630 0 0 0     0 my $_un = (ref $_[1] eq 'HASH' && $_[1]->{'unbless'}) ? 1 : 0;
1631 0 0       0 my $_item = (defined wantarray) ? $_[0]->export({ unbless => $_un }) : undef;
1632 0 0       0 my $_pid = $_tid ? $$ .'.'. $_tid : $$;
1633              
1634 0         0 delete($_all{ $_id }), delete($_obj{ $_id });
1635              
1636 0 0 0     0 if (defined $_svr_pid && exists $_new{ $_id } && $_new{ $_id } eq $_pid) {
      0        
1637 0         0 delete($_new{ $_id }), _req1('M~DES', $_id.$LF);
1638             }
1639              
1640 0         0 $_[0] = undef;
1641 0         0 $_item;
1642             }
1643              
1644             # export ( { unbless => 1 }, key [, key, ... ] )
1645             # export ( key [, key, ... ] )
1646             # export ( )
1647              
1648             sub export {
1649 0     0   0 my $_ob = shift;
1650 0         0 my $_id = $_ob->[_ID];
1651 0 0       0 my $_lkup = ref($_[0]) eq 'HASH' ? shift : {};
1652              
1653             # safety for circular references to not loop endlessly
1654 0 0       0 return $_lkup->{ $_id } if exists $_lkup->{ $_id };
1655              
1656 0 0       0 my $_tmp = @_ ? $_freeze->([ @_ ]) : '';
1657 0         0 my $_buf = $_id.$LF . length($_tmp).$LF;
1658 0         0 my $_class = $_ob->[_CLASS];
1659 0         0 my $_item;
1660              
1661 0         0 { local $@; MCE::Shared::_use($_class); }
  0         0  
1662              
1663             {
1664 0 0       0 local $\ = undef if (defined $\);
  0         0  
  0         0  
1665 0 0       0 local $/ = $LF if ($/ ne $LF);
1666 0         0 local $MCE::Signal::SIG;
1667              
1668 0         0 my $_len;
1669              
1670             {
1671 0         0 local $MCE::Signal::IPC = 1;
  0         0  
1672 0 0       0 $_is_MSWin32 ? CORE::lock $_DAT_LOCK : $_dat_ex->();
1673              
1674 0         0 print({$_DAT_W_SOCK} 'M~EXP'.$LF . $_chn.$LF),
1675 0         0 print({$_DAU_W_SOCK} $_buf, $_tmp); undef $_buf;
  0         0  
  0         0  
1676 0         0 chomp($_len = <$_DAU_W_SOCK>);
1677              
1678 0 0       0 read($_DAU_W_SOCK, $_buf, $_len) if ($_len >= 0);
1679              
1680 0 0       0 $_dat_un->() if !$_is_MSWin32;
1681             }
1682              
1683 0 0       0 CORE::kill($MCE::Signal::SIG, $$) if $MCE::Signal::SIG;
1684              
1685 0 0       0 return undef if ($_len < 0);
1686              
1687 0         0 $_item = $_lkup->{ $_id } = Storable::thaw($_buf);
1688 0         0 undef $_buf;
1689             }
1690              
1691 0         0 my $_data; local $_;
  0         0  
1692              
1693             ## no critic
1694 0 0 0     0 if ( $_class->isa('MCE::Shared::Array') || $_class->isa('Tie::StdArray') ) {
    0 0        
    0 0        
1695 0 0 0     0 map { $_ = $_->export($_lkup) if $_blessed->($_) && $_->can('export')
1696 0         0 } @{ $_item };
  0         0  
1697              
1698 0 0       0 return $_lkup->{ $_id } = [ @{ $_item } ] if $_lkup->{'unbless'};
  0         0  
1699             }
1700             elsif ( $_class->isa('MCE::Shared::Hash') || $_class->isa('Tie::StdHash') ) {
1701 0 0 0     0 map { $_ = $_->export($_lkup) if $_blessed->($_) && $_->can('export')
1702 0         0 } CORE::values %{ $_item };
  0         0  
1703              
1704 0 0       0 return $_lkup->{ $_id } = { %{ $_item } } if $_lkup->{'unbless'};
  0         0  
1705             }
1706             elsif ( $_class->isa('MCE::Shared::Scalar') || $_class->isa('Tie::StdScalar') ) {
1707 0 0 0     0 if ( $_blessed->(${ $_item }) && ${ $_item }->can('export') ) {
  0         0  
  0         0  
1708 0         0 ${ $_item } = ${ $_item }->export($_lkup);
  0         0  
  0         0  
1709             }
1710 0 0       0 return $_lkup->{ $_id } = \do { my $o = ${ $_item } } if $_lkup->{'unbless'};
  0         0  
  0         0  
1711             }
1712             else {
1713 0 0       0 if ( $_class->isa('MCE::Shared::Ordhash') ) { $_data = $_item->[0] }
  0 0       0  
    0          
    0          
    0          
1714 0         0 elsif ( $_class->isa('MCE::Shared::Cache') ) { $_data = $_item->[0] }
1715 0         0 elsif ( $_class->isa('Hash::Ordered') ) { $_data = $_item->[0] }
1716 0         0 elsif ( $_class->isa('Tie::ExtraHash') ) { $_data = $_item->[0] }
1717 0         0 elsif ( $_class->isa('Tie::IxHash') ) { $_data = $_item->[2] }
1718              
1719 0 0       0 if ( reftype $_data eq 'ARRAY' ) {
    0          
1720 0 0 0     0 map { $_ = $_->export($_lkup) if $_blessed->($_) && $_->can('export')
1721 0         0 } @{ $_data };
  0         0  
1722             }
1723             elsif ( reftype $_data eq 'HASH' ) {
1724 0 0 0     0 map { $_ = $_->export($_lkup) if $_blessed->($_) && $_->can('export')
1725 0         0 } values %{ $_data };
  0         0  
1726             }
1727             }
1728              
1729 0         0 $_item;
1730             }
1731              
1732             # iterator ( index [, index, ... ] ) # Array
1733             # iterator ( key [, key, ... ] ) # Cache, Hash, Ordhash
1734             # iterator ( "query string" ) # Cache, Hash, Ordhash, Array
1735             # iterator ( )
1736              
1737             sub iterator {
1738 2     2   15 my ( $self, @keys ) = @_;
1739              
1740 2         13 my $pkg = $self->blessed();
1741 2 50 33     36 my $flg = ($pkg->can('NEXTKEY') || $pkg->can('keys')) ? 1 : 0;
1742 2 0       26 my $get = $pkg->can('FETCH') ? 'FETCH' : $pkg->can('get') ? 'get' : '';
    50          
1743              
1744 2 50 33     28 unless ( ($flg || $pkg->can('FETCHSIZE')) && $get ) {
      33        
1745 0     0   0 return sub {};
1746             }
1747              
1748             # MCE::Shared::{ Array, Cache, Hash, Ordhash }, Hash::Ordered,
1749             # or similar module.
1750              
1751 2 50       20 $get = 'peek' if $pkg->isa('MCE::Shared::Cache');
1752              
1753 2 50 0     7 if ( ! @keys ) {
    0          
    0          
1754 2         7 @keys = $self->keys;
1755             }
1756             elsif ( @keys == 1 && $keys[0] =~ /^(?:key|val)[ ]+\S\S?[ ]+\S/ ) {
1757 0 0   0   0 return sub {} unless $pkg->isa('MCE::Shared::Common');
1758 0         0 @keys = $self->keys($keys[0]);
1759             }
1760             elsif ( $pkg->isa('MCE::Shared::Cache') ) {
1761 0         0 $self->_prune_head();
1762             }
1763              
1764             return sub {
1765 10 100   10   108 return unless @keys;
1766 8         14 my $key = shift @keys;
1767 8         50 return ( $key => $self->$get($key) );
1768 2         53 };
1769             }
1770              
1771             # rewind ( index [, index, ... ] ) # Array
1772             # rewind ( key [, key, ... ] ) # Cache, Hash, Ordhash
1773             # rewind ( "query string" ) # Cache, Hash, Ordhash, Array
1774             # rewind ( begin, end [, step, format ] ) # Sequence
1775             # rewind ( )
1776              
1777             sub rewind {
1778 18     18   11383 my $_id = shift()->[_ID];
1779 18         279 my $_buf = $_freeze->([ @_ ]);
1780 18         171 _req1('M~IRW', $_id.$LF . length($_buf).$LF . $_buf);
1781              
1782 18         76 return;
1783             }
1784              
1785             # next ( )
1786              
1787             sub next {
1788 127 50   127   1332 local $\ = undef if (defined $\);
1789 127 50       370 local $/ = $LF if ($/ ne $LF);
1790 127         227 local $MCE::Signal::SIG;
1791              
1792 127         255 my ( $_len, $_buf );
1793              
1794             {
1795 127         197 local $MCE::Signal::IPC = 1;
  127         202  
1796 127 50       480 $_is_MSWin32 ? CORE::lock $_DAT_LOCK : $_dat_ex->();
1797              
1798 127         5207 print({$_DAT_W_SOCK} 'M~INX'.$LF . $_chn.$LF),
1799 127         1925 print({$_DAU_W_SOCK} $_[0]->[_ID].$LF);
  127         1937  
1800 127         29704 chomp($_len = <$_DAU_W_SOCK>);
1801              
1802 127 100       1205 read($_DAU_W_SOCK, $_buf, $_len) if ($_len >= 0);
1803              
1804 127 50       550 $_dat_un->() if !$_is_MSWin32;
1805             }
1806              
1807 127 50       494 CORE::kill($MCE::Signal::SIG, $$) if $MCE::Signal::SIG;
1808              
1809 127 100       344 return if ($_len < 0);
1810              
1811 112 100       177 my $_b; return wantarray ? () : undef unless @{ $_b = $_thaw->($_buf) };
  112 100       154  
  112         1450  
1812              
1813 105 50       541 if ( $_[0]->[_DECODE] ) {
1814 0   0     0 local $@; $_b->[-1] = eval { $_[0]->[_DECODE]->($_b->[-1]) } || $_b->[-1];
  0         0  
1815             }
1816              
1817             ( wantarray )
1818 105 100       596 ? @{ $_b } == 2 ? ( $_b->[0], delete $_b->[-1] ) : @{ $_b }
  24 100       152  
  12         51  
1819             : delete $_b->[-1];
1820             }
1821              
1822             ###############################################################################
1823             ## ----------------------------------------------------------------------------
1824             ## Methods optimized for:
1825             ## MCE::Shared::{ Array, Hash, Ordhash, Scalar } and similar.
1826             ##
1827             ###############################################################################
1828              
1829             sub ins_inplace {
1830 0     0   0 my $_id = shift()->[_ID];
1831              
1832 0 0       0 if ( @_ ) {
1833 0         0 my $_tmp = Storable::freeze([ @_ ]);
1834 0         0 my $_buf = $_id.$LF . length($_tmp).$LF;
1835 0         0 _req2('O~PDL', $_buf, $_tmp);
1836             }
1837              
1838 0         0 return;
1839             }
1840              
1841 4     4   1525 sub FETCHSIZE { _size('FETCHSIZE', @_) }
1842 0     0   0 sub SCALAR { _size('SCALAR' , @_) }
1843 6     6   181 sub CLEAR { _req3('CLEAR' , @_) }
1844 98     98   19353693 sub FETCH { _req4('FETCH' , @_) }
1845              
1846             sub clear {
1847 34 50   34   14077 @_ > 1 ? _auto('clear', @_) : _req3('clear', @_);
1848             }
1849             sub get {
1850 59 50   59   1070 @_ > 2 ? _auto('get', @_) : _req4('get', @_);
1851             }
1852              
1853             sub FIRSTKEY {
1854 6     6   317 $_[0]->[_ITER] = [ $_[0]->keys ];
1855 6         43 shift @{ $_[0]->[_ITER] };
  6         188  
1856             }
1857             sub NEXTKEY {
1858 9     9   24 shift @{ $_[0]->[_ITER] };
  9         117  
1859             }
1860              
1861             sub STORE {
1862 58 50 33 58   1589 if ( @_ > 1 && $_[0]->[_ENCODE] ) {
    50 66        
    100 33        
1863 0 0       0 $_[-1] = $_[0]->[_ENCODE]->($_[-1]) if ref($_[-1]);
1864             }
1865             elsif ( @_ == 2 && $_blessed->($_[1]) && $_[1]->can('SHARED_ID') ) {
1866 0         0 _req2('M~DEE', $_[0]->[_ID].$LF, $_[1]->SHARED_ID().$LF);
1867 0         0 delete $_new{ $_[1]->SHARED_ID() };
1868             }
1869             elsif ( ref $_[2] ) {
1870 4 50 33     305 if ( $_blessed->($_[2]) && $_[2]->can('SHARED_ID') ) {
    100 100        
1871 0         0 _req2('M~DEE', $_[0]->[_ID].$LF, $_[2]->SHARED_ID().$LF);
1872 0         0 delete $_new{ $_[2]->SHARED_ID() };
1873             }
1874             elsif ( $_[0]->[1]->isa('MCE::Shared::Array') ||
1875             $_[0]->[1]->isa('MCE::Shared::Hash') ) {
1876 2         58 $_[2] = MCE::Shared::share({ _DEEPLY_ => 1 }, $_[2]);
1877 2         42 _req2('M~DEE', $_[0]->[_ID].$LF, $_[2]->SHARED_ID().$LF);
1878             }
1879             }
1880 58         218 _auto('STORE', @_); 1;
  58         288  
1881             }
1882              
1883             sub set {
1884 45 50   45   3748 if ( ref $_[2] ) {
1885 0 0 0     0 if ( $_blessed->($_[2]) && $_[2]->can('SHARED_ID') ) {
1886 0         0 _req2('M~DEE', $_[0]->[_ID].$LF, $_[2]->SHARED_ID().$LF);
1887 0         0 delete $_new{ $_[2]->SHARED_ID() };
1888             }
1889             }
1890 45         308 _auto('set', @_);
1891             }
1892              
1893             sub keys {
1894 145 50 66 145   1119 ( @_ == 1 && !wantarray ) ? _size('keys', @_) : _auto('keys', @_);
1895             }
1896              
1897             sub lock {
1898 6     6   2003429 my ( $self ) = @_;
1899 6 50       53 Carp::croak( sprintf(
1900             "Mutex not enabled for the shared %s instance", $self->[_CLASS]
1901             )) unless $self->[_MUTEX];
1902              
1903 6         185 $self->[_MUTEX]->lock();
1904             }
1905              
1906             sub unlock {
1907 1     1   21 my ( $self ) = @_;
1908 1 50       20 Carp::croak( sprintf(
1909             "Mutex not enabled for the shared %s instance", $self->[_CLASS]
1910             )) unless $self->[_MUTEX];
1911              
1912 1         19 $self->[_MUTEX]->unlock();
1913             }
1914              
1915             {
1916 43     43   437 no strict 'refs'; *{ __PACKAGE__.'::store' } = \&STORE;
  43         191  
  43         3842  
1917             }
1918              
1919             1;
1920              
1921             __END__