File Coverage

blib/lib/MCE/Core/Input/Iterator.pm
Criterion Covered Total %
statement 9 56 16.0
branch 0 32 0.0
condition n/a
subroutine 3 8 37.5
pod n/a
total 12 96 12.5


line stmt bran cond sub pod time code
1             ###############################################################################
2             ## ----------------------------------------------------------------------------
3             ## Iterator reader.
4             ##
5             ## This package, used internally by the worker process, provides support for
6             ## user specified iterators assigned to input_data.
7             ##
8             ## There is no public API.
9             ##
10             ###############################################################################
11              
12             package MCE::Core::Input::Iterator;
13              
14 1     1   966 use strict;
  1         1  
  1         32  
15 1     1   5 use warnings;
  1         2  
  1         53  
16              
17             our $VERSION = '1.888';
18              
19             ## Items below are folded into MCE.
20              
21             package # hide from rpm
22             MCE;
23              
24 1     1   5 no warnings qw( threads recursion uninitialized );
  1         5  
  1         611  
25              
26             ###############################################################################
27             ## ----------------------------------------------------------------------------
28             ## Worker process -- User Iterator.
29             ##
30             ###############################################################################
31              
32             sub _worker_user_iterator {
33              
34 0     0     my ($self) = @_;
35              
36 0           @_ = ();
37              
38             _croak('MCE::_worker_user_iterator: (user_func) is not specified')
39 0 0         unless (defined $self->{user_func});
40              
41 0 0         my $_is_MSWin32 = ($^O eq 'MSWin32') ? 1 : 0;
42 0           my $_chn = $self->{_chn};
43 0           my $_DAT_LOCK = $self->{_dat_lock};
44 0           my $_DAT_W_SOCK = $self->{_dat_w_sock}->[0];
45 0           my $_DAU_W_SOCK = $self->{_dat_w_sock}->[$_chn];
46 0           my $_lock_chn = $self->{_lock_chn};
47 0           my $_chunk_size = $self->{chunk_size};
48 0           my $_wuf = $self->{_wuf};
49              
50 0           my ($_dat_ex, $_dat_un, $_pid);
51              
52 0 0         if ($_lock_chn) {
53 0 0         $_pid = $INC{'threads.pm'} ? $$ .'.'. threads->tid() : $$;
54              
55             # inlined for performance
56             $_dat_ex = sub {
57             CORE::lock($_DAT_LOCK->{_t_lock}), MCE::Util::_sock_ready($_DAT_LOCK->{_r_sock})
58 0 0   0     if $_is_MSWin32;
59             MCE::Util::_sysread($_DAT_LOCK->{_r_sock}, my($b), 1), $_DAT_LOCK->{ $_pid } = 1
60 0 0         unless $_DAT_LOCK->{ $_pid };
61 0           };
62             $_dat_un = sub {
63             syswrite($_DAT_LOCK->{_w_sock}, '0'), $_DAT_LOCK->{ $_pid } = 0
64 0 0   0     if $_DAT_LOCK->{ $_pid };
65 0           };
66             }
67              
68 0           my ($_chunk_id, $_len);
69              
70             ## -------------------------------------------------------------------------
71              
72 0     0     $self->{_next_jmp} = sub { goto _WORKER_USER_ITERATOR__NEXT; };
  0            
73 0     0     $self->{_last_jmp} = sub { goto _WORKER_USER_ITERATOR__LAST; };
  0            
74              
75 0           local $_;
76              
77             _WORKER_USER_ITERATOR__NEXT:
78              
79 0           while (1) {
80 0 0         undef $_ if (length > MAX_GC_SIZE);
81              
82 0           $_ = '';
83              
84             ## Obtain the next chunk of data.
85             {
86 0 0         local $\ = undef if (defined $\);
  0            
87 0 0         local $/ = $LF if ($/ ne $LF );
88              
89 0 0         $_dat_ex->() if $_lock_chn;
90 0           print {$_DAT_W_SOCK} OUTPUT_I_REF . $LF . $_chn . $LF;
  0            
91 0 0         MCE::Util::_sock_ready($_DAU_W_SOCK, -1) if $_is_MSWin32;
92 0           chomp($_len = <$_DAU_W_SOCK>);
93              
94 0 0         if ($_len < 0) {
95 0 0         $_dat_un->() if $_lock_chn;
96 0           return;
97             }
98              
99 0           chomp($_chunk_id = <$_DAU_W_SOCK>);
100 0           read $_DAU_W_SOCK, $_, $_len;
101              
102 0 0         $_dat_un->() if $_lock_chn;
103             }
104              
105             ## Call user function.
106 0           my $_chunk_ref = $self->{thaw}($_); undef $_;
  0            
107 0 0         $_ = ($_chunk_size == 1) ? $_chunk_ref->[0] : $_chunk_ref;
108 0           $_wuf->($self, $_chunk_ref, $_chunk_id);
109             }
110              
111             _WORKER_USER_ITERATOR__LAST:
112              
113 0           return;
114             }
115              
116             1;
117              
118             __END__