File Coverage

blib/lib/MCE/Core/Input/Request.pm
Criterion Covered Total %
statement 82 102 80.3
branch 30 58 51.7
condition 5 15 33.3
subroutine 4 8 50.0
pod n/a
total 121 183 66.1


line stmt bran cond sub pod time code
1             ###############################################################################
2             ## ----------------------------------------------------------------------------
3             ## Array reference and Glob reference input reader.
4             ##
5             ## This package provides the request chunk method used internally by the worker
6             ## process. Distribution follows a bank-queuing model.
7             ##
8             ## There is no public API.
9             ##
10             ###############################################################################
11              
12             package MCE::Core::Input::Request;
13              
14 35     35   1200 use strict;
  35         76  
  35         1065  
15 35     35   226 use warnings;
  35         69  
  35         2094  
16              
17             our $VERSION = '1.889';
18              
19             ## Items below are folded into MCE.
20              
21             package # hide from rpm
22             MCE;
23              
24 35     35   257 no warnings qw( threads recursion uninitialized );
  35         62  
  35         34400  
25              
26             ###############################################################################
27             ## ----------------------------------------------------------------------------
28             ## Worker process -- Request chunk.
29             ##
30             ###############################################################################
31              
32             sub _worker_request_chunk {
33              
34 53     53   518 my ($self, $_proc_type) = @_;
35              
36 53         96 @_ = ();
37              
38             _croak('MCE::_worker_request_chunk: (user_func) is not specified')
39 53 50       184 unless (defined $self->{user_func});
40              
41 53 50       309 my $_is_MSWin32 = ($^O eq 'MSWin32') ? 1 : 0;
42 53         120 my $_chn = $self->{_chn};
43 53         112 my $_DAT_LOCK = $self->{_dat_lock};
44 53         91 my $_DAT_W_SOCK = $self->{_dat_w_sock}->[0];
45 53         110 my $_DAU_W_SOCK = $self->{_dat_w_sock}->[$_chn];
46 53         139 my $_lock_chn = $self->{_lock_chn};
47 53         103 my $_chunk_size = $self->{chunk_size};
48 53         114 my $_use_slurpio = $self->{use_slurpio};
49 53   33     766 my $_RS = $self->{RS} || $/;
50 53         134 my $_wuf = $self->{_wuf};
51              
52 53         113 my ($_dat_ex, $_dat_un, $_pid);
53              
54 53 50       167 if ($_lock_chn) {
55 0 0       0 $_pid = $INC{'threads.pm'} ? $$ .'.'. threads->tid() : $$;
56              
57             # inlined for performance
58             $_dat_ex = sub {
59             CORE::lock($_DAT_LOCK->{_t_lock}), MCE::Util::_sock_ready($_DAT_LOCK->{_r_sock})
60 0 0   0   0 if $_is_MSWin32;
61             MCE::Util::_sysread($_DAT_LOCK->{_r_sock}, my($b), 1), $_DAT_LOCK->{ $_pid } = 1
62 0 0       0 unless $_DAT_LOCK->{ $_pid };
63 0         0 };
64             $_dat_un = sub {
65             syswrite($_DAT_LOCK->{_w_sock}, '0'), $_DAT_LOCK->{ $_pid } = 0
66 0 0   0   0 if $_DAT_LOCK->{ $_pid };
67 0         0 };
68             }
69              
70 53         590 my ($_chunk_id, $_len, $_output_tag);
71 53         0 my ($_chop_len, $_chop_str, $_p);
72              
73 53 100       221 if ($_proc_type == REQUEST_ARRAY) {
    100          
74 36         168 $_output_tag = OUTPUT_A_REF;
75 36         87 $_chop_len = 0;
76             }
77             elsif ($_proc_type == REQUEST_HASH) {
78 3         29 $_output_tag = OUTPUT_H_REF;
79 3         11 $_chop_len = 0;
80             }
81             else {
82 14         86 $_output_tag = OUTPUT_G_REF;
83 14 50 33     83 if (length $_RS > 1 && substr($_RS, 0, 1) eq "\n") {
84 0         0 $_chop_str = substr($_RS, 1);
85 0         0 $_chop_len = length $_chop_str;
86             } else {
87 14         164 $_chop_str = '';
88 14         46 $_chop_len = 0;
89             }
90             }
91              
92             ## -------------------------------------------------------------------------
93              
94 53     0   452 $self->{_next_jmp} = sub { goto _WORKER_REQUEST_CHUNK__NEXT; };
  0         0  
95 53     0   363 $self->{_last_jmp} = sub { goto _WORKER_REQUEST_CHUNK__LAST; };
  0         0  
96              
97 53         135 local $_;
98              
99             _WORKER_REQUEST_CHUNK__NEXT:
100              
101 53         533 while (1) {
102 197 50       742 undef $_ if (length > MAX_GC_SIZE);
103              
104 197         539 $_ = '';
105              
106             ## Obtain the next chunk of data.
107             {
108 197 50       512 local $\ = undef if (defined $\);
  197         544  
109 197 50       548 local $/ = $LF if ($/ ne $LF );
110              
111 197 50       800 $_dat_ex->() if $_lock_chn;
112 197         283 print {$_DAT_W_SOCK} $_output_tag . $LF . $_chn . $LF;
  197         2527  
113 197 50       915 MCE::Util::_sock_ready($_DAU_W_SOCK, -1) if $_is_MSWin32;
114 197         85374 chomp($_len = <$_DAU_W_SOCK>);
115              
116 197 100       1513 unless ($_len) {
117 53 50       236 $_dat_un->() if $_lock_chn;
118 53         377 return;
119             }
120              
121 144         450 chomp($_chunk_id = <$_DAU_W_SOCK>);
122              
123 144 50 66     1226 if ($_chunk_id > 1 && $_chop_len) {
124 0         0 $_p = $_chop_len; $_ = $_chop_str;
  0         0  
125             } else {
126 144         410 $_p = 0;
127             }
128              
129 144         636 read $_DAU_W_SOCK, $_, $_len, $_p;
130              
131 144 50       415 $_dat_un->() if $_lock_chn;
132             }
133              
134             ## Call user function.
135 144 100       571 if ($_proc_type == REQUEST_ARRAY) {
    100          
136 122         1688 my $_chunk_ref = $self->{thaw}($_); undef $_;
  122         326  
137 122 100       445 $_ = ($_chunk_size == 1) ? $_chunk_ref->[0] : $_chunk_ref;
138 122         533 $_wuf->($self, $_chunk_ref, $_chunk_id);
139             }
140             elsif ($_proc_type == REQUEST_HASH) {
141 15         24 my $_chunk_ref = { @{ $self->{thaw}($_) } }; undef $_;
  15         383  
  15         64  
142 15         42 $_ = $_chunk_ref;
143 15         81 $_wuf->($self, $_chunk_ref, $_chunk_id);
144             }
145             else {
146 7         16 $_ = ${ $self->{thaw}($_) };
  7         101  
147 7 100       37 if ($_use_slurpio) {
148 2 50 33     10 if ($_chop_len && substr($_, -$_chop_len) eq $_chop_str) {
149 0         0 substr($_, -$_chop_len, $_chop_len, '');
150             }
151 2         5 local $_ = \$_;
152 2         8 $_wuf->($self, $_, $_chunk_id);
153             }
154             else {
155 5 50       20 if ($_chunk_size == 1) {
156 0 0 0     0 if ($_chop_len && substr($_, -$_chop_len) eq $_chop_str) {
157 0         0 substr($_, -$_chop_len, $_chop_len, '');
158             }
159 0         0 $_wuf->($self, [ $_ ], $_chunk_id);
160             }
161             else {
162 5         19 my @_recs;
163             {
164 5 50       23 local $/ = $_RS if ($/ ne $_RS);
  5         26  
165 5         93 _sync_buffer_to_array(\$_, \@_recs, $_chop_str);
166 5         11 undef $_;
167             }
168 5 50       20 if ($_chop_len) {
169 0         0 for my $i (0 .. @_recs - 1) {
170 0 0       0 if (substr($_recs[$i], -$_chop_len) eq $_chop_str) {
171 0         0 substr($_recs[$i], -$_chop_len, $_chop_len, '');
172             }
173             }
174             }
175 5         18 local $_ = \@_recs;
176 5         24 $_wuf->($self, \@_recs, $_chunk_id);
177             }
178             }
179             }
180             }
181              
182             _WORKER_REQUEST_CHUNK__LAST:
183              
184 0           return;
185             }
186              
187             1;
188              
189             __END__