File Coverage

blib/lib/MCE/Channel/MutexFast.pm
Criterion Covered Total %
statement 151 159 94.9
branch 82 110 74.5
condition 13 27 48.1
subroutine 17 17 100.0
pod 11 11 100.0
total 274 324 84.5


line stmt bran cond sub pod time code
1             ###############################################################################
2             ## ----------------------------------------------------------------------------
3             ## Channel for producer(s) and many consumers supporting processes and threads.
4             ##
5             ###############################################################################
6              
7             package MCE::Channel::MutexFast;
8              
9 13     13   2313 use strict;
  13         26  
  13         402  
10 13     13   86 use warnings;
  13         58  
  13         374  
11              
12 13     13   64 no warnings qw( uninitialized once );
  13         25  
  13         752  
13              
14             our $VERSION = '1.888';
15              
16 13     13   153 use base 'MCE::Channel';
  13         26  
  13         1349  
17 13     13   925 use MCE::Mutex ();
  13         27  
  13         22485  
18              
19             my $LF = "\012"; Internals::SvREADONLY($LF, 1);
20              
21             sub new {
22 13     13 1 78 my ( $class, %obj ) = ( @_, impl => 'MutexFast' );
23              
24 13         87 $obj{init_pid} = MCE::Channel::_pid();
25 13         65 MCE::Util::_sock_pair( \%obj, 'p_sock', 'c_sock' );
26              
27             # locking for the consumer side of the channel
28 13         126 $obj{c_mutex} = MCE::Mutex->new( impl => 'Channel2' );
29              
30             # optionally, support many-producers writing and reading
31 13 100       57 $obj{p_mutex} = MCE::Mutex->new( impl => 'Channel2' ) if $obj{mp};
32              
33 13         41 bless \%obj, $class;
34              
35 13         57 MCE::Mutex::Channel::_save_for_global_cleanup($obj{c_mutex});
36 13 100       45 MCE::Mutex::Channel::_save_for_global_cleanup($obj{p_mutex}) if $obj{mp};
37              
38 13         53 return \%obj;
39             }
40              
41             END {
42 13 100   13   283 MCE::Child->finish('MCE') if $INC{'MCE/Child.pm'};
43             }
44              
45             ###############################################################################
46             ## ----------------------------------------------------------------------------
47             ## Queue-like methods.
48             ##
49             ###############################################################################
50              
51             sub end {
52 2     2 1 24 my ( $self ) = @_;
53              
54 2 50       11 local $\ = undef if (defined $\);
55 2         4 print { $self->{p_sock} } pack('i', -1);
  2         20  
56              
57 2         14 $self->{ended} = 1;
58             }
59              
60             sub enqueue {
61 26     26 1 3784 my $self = shift;
62 26 100       74 return MCE::Channel::_ended('enqueue') if $self->{ended};
63              
64 24 50       68 local $\ = undef if (defined $\);
65 24         37 my $p_mutex = $self->{p_mutex};
66 24 100       72 $p_mutex->lock2 if $p_mutex;
67              
68 24         55 while ( @_ ) {
69 44         104 my $data = ''.shift;
70 44         57 print { $self->{p_sock} } pack('i', length $data), $data;
  44         587  
71             }
72              
73 24 100       94 $p_mutex->unlock2 if $p_mutex;
74              
75 24         59 return 1;
76             }
77              
78             sub dequeue {
79 16     16 1 58 my ( $self, $count ) = @_;
80 16 100 66     50 $count = 1 if ( !$count || $count < 1 );
81              
82 16 100       38 if ( $count == 1 ) {
83 14         55 ( my $c_mutex = $self->{c_mutex} )->lock;
84 14         51 MCE::Util::_sysread( $self->{c_sock}, my($plen), 4 );
85              
86 14         60 my $len = unpack('i', $plen);
87 14 50       40 if ( $len < 0 ) {
88 0         0 $self->end, $c_mutex->unlock;
89 0 0       0 return wantarray ? () : undef;
90             }
91              
92 14 100       50 MCE::Channel::_read( $self->{c_sock}, my($data), $len ) if $len;
93 14         51 $c_mutex->unlock;
94              
95 14 100       117 $len ? $data : '';
96             }
97             else {
98 2         7 my ( $plen, @ret );
99              
100 2         18 ( my $c_mutex = $self->{c_mutex} )->lock;
101              
102 2         15 while ( $count-- ) {
103 6         21 MCE::Util::_sysread( $self->{c_sock}, $plen, 4 );
104              
105 6         22 my $len = unpack('i', $plen);
106 6 50       18 if ( $len < 0 ) {
107 0         0 $self->end;
108 0         0 last;
109             }
110              
111 6 50       12 push(@ret, ''), next unless $len;
112 6         21 MCE::Channel::_read( $self->{c_sock}, my($data), $len );
113 6         19 push @ret, $data;
114             }
115              
116 2         10 $c_mutex->unlock;
117              
118 2 50       22 wantarray ? @ret : $ret[-1];
119             }
120             }
121              
122             sub dequeue_nb {
123 20     20 1 86 my ( $self, $count ) = @_;
124 20 100 66     75 $count = 1 if ( !$count || $count < 1 );
125              
126 20         31 my ( $plen, @ret );
127 20         81 ( my $c_mutex = $self->{c_mutex} )->lock;
128              
129 20         71 while ( $count-- ) {
130 24         72 MCE::Util::_nonblocking( $self->{c_sock}, 1 );
131 24         74 MCE::Util::_sysread( $self->{c_sock}, $plen, 4 );
132 24         86 MCE::Util::_nonblocking( $self->{c_sock}, 0 );
133              
134 24 50       33 my $len; $len = unpack('i', $plen) if $plen;
  24         103  
135 24 100 66     102 if ( !$len || $len < 0 ) {
136 4 50 33     37 $self->end if defined $len && $len < 0;
137 4 50 33     25 push @ret, '' if defined $len && $len == 0;
138 4         9 last;
139             }
140              
141 20         70 MCE::Channel::_read( $self->{c_sock}, my($data), $len );
142 20         72 push @ret, $data;
143             }
144              
145 20         68 $c_mutex->unlock;
146              
147 20 100       205 wantarray ? @ret : $ret[-1];
148             }
149              
150             ###############################################################################
151             ## ----------------------------------------------------------------------------
152             ## Methods for two-way communication; producer to consumer.
153             ##
154             ###############################################################################
155              
156             sub send {
157 38     38 1 6707 my $self = shift;
158 38 100       174 return MCE::Channel::_ended('send') if $self->{ended};
159              
160 36         98 my $data = ''.shift;
161              
162 36 50       124 local $\ = undef if (defined $\);
163 36         66 my $p_mutex = $self->{p_mutex};
164 36 100       91 $p_mutex->lock2 if $p_mutex;
165              
166 36         58 print { $self->{p_sock} } pack('i', length $data), $data;
  36         2509  
167 36 100       210 $p_mutex->unlock2 if $p_mutex;
168              
169 36         105 return 1;
170             }
171              
172             sub recv {
173 9     9 1 209 my ( $self ) = @_;
174              
175 9         443 ( my $c_mutex = $self->{c_mutex} )->lock;
176 9         94 MCE::Util::_sysread( $self->{c_sock}, my($plen), 4 );
177              
178 9         63 my $len = unpack('i', $plen);
179 9 50       68 if ( $len < 0 ) {
180 0         0 $self->end, $c_mutex->unlock;
181 0 0       0 return wantarray ? () : undef;
182             }
183              
184 9 100       44 MCE::Channel::_read( $self->{c_sock}, my($data), $len ) if $len;
185 9         119 $c_mutex->unlock;
186              
187 9 100       72 $len ? $data : '';
188             }
189              
190             sub recv_nb {
191 6     6 1 23 my ( $self ) = @_;
192              
193 6         26 ( my $c_mutex = $self->{c_mutex} )->lock;
194 6         23 MCE::Util::_nonblocking( $self->{c_sock}, 1 );
195 6         22 MCE::Util::_sysread( $self->{c_sock}, my($plen), 4 );
196 6         27 MCE::Util::_nonblocking( $self->{c_sock}, 0 );
197              
198 6 50       16 my $len; $len = unpack('i', $plen) if $plen;
  6         30  
199 6 100 66     27 if ( !$len || $len < 0 ) {
200 4 50 33     24 $self->end if defined $len && $len < 0;
201 4         18 $c_mutex->unlock;
202 4 50 33     41 return '' if defined $len && $len == 0;
203 0 0       0 return wantarray ? () : undef;
204             }
205              
206 2         9 MCE::Channel::_read( $self->{c_sock}, my($data), $len );
207 2         11 $c_mutex->unlock;
208              
209 2         12 $data;
210             }
211              
212             ###############################################################################
213             ## ----------------------------------------------------------------------------
214             ## Methods for two-way communication; consumer to producer.
215             ##
216             ###############################################################################
217              
218             sub send2 {
219 12     12 1 2175 my $self = shift;
220 12         27 my $data = ''.shift;
221              
222 12 50       39 local $\ = undef if (defined $\);
223 12         18 local $MCE::Signal::SIG;
224              
225             {
226 12         18 local $MCE::Signal::IPC = 1;
  12         16  
227 12         46 ( my $c_mutex = $self->{c_mutex} )->lock2;
228              
229 12         17 print { $self->{c_sock} } pack('i', length $data), $data;
  12         179  
230 12         58 $c_mutex->unlock2;
231             }
232              
233 12 50       40 CORE::kill($MCE::Signal::SIG, $$) if $MCE::Signal::SIG;
234              
235 12         26 return 1;
236             }
237              
238             sub recv2 {
239 6     6 1 23 my ( $self ) = @_;
240 6         13 my ( $plen, $data );
241              
242 6 50       19 local $/ = $LF if ( $/ ne $LF );
243 6         12 my $p_mutex = $self->{p_mutex};
244 6 100       20 $p_mutex->lock if $p_mutex;
245              
246             ( $p_mutex )
247             ? MCE::Util::_sysread( $self->{p_sock}, $plen, 4 )
248 6 100       67 : read( $self->{p_sock}, $plen, 4 );
249              
250 6         33 my $len = unpack('i', $plen);
251              
252 6 100       19 if ( $len ) {
253             ( $p_mutex )
254             ? MCE::Channel::_read( $self->{p_sock}, $data, $len )
255 2 100       14 : read( $self->{p_sock}, $data, $len );
256             }
257              
258 6 100       38 $p_mutex->unlock if $p_mutex;
259              
260 6 100       51 $len ? $data : '';
261             }
262              
263             sub recv2_nb {
264 6     6 1 21 my ( $self ) = @_;
265 6         11 my ( $plen, $data );
266              
267 6 50       25 local $/ = $LF if ( $/ ne $LF );
268 6         11 my $p_mutex = $self->{p_mutex};
269 6 100       22 $p_mutex->lock if $p_mutex;
270              
271 6         24 MCE::Util::_nonblocking( $self->{p_sock}, 1 );
272              
273             ( $p_mutex )
274             ? MCE::Util::_sysread( $self->{p_sock}, $plen, 4 )
275 6 100       49 : read( $self->{p_sock}, $plen, 4 );
276              
277 6         26 MCE::Util::_nonblocking( $self->{p_sock}, 0 );
278              
279 6 50       11 my $len; $len = unpack('i', $plen) if $plen;
  6         28  
280 6 100       18 if ( !$len ) {
281 4 100       19 $p_mutex->unlock if $p_mutex;
282 4 50 33     40 return '' if defined $len && $len == 0;
283 0 0       0 return wantarray ? () : undef;
284             }
285              
286             ( $p_mutex )
287             ? MCE::Channel::_read( $self->{p_sock}, $data, $len )
288 2 100       11 : read( $self->{p_sock}, $data, $len );
289              
290 2 100       11 $p_mutex->unlock if $p_mutex;
291              
292 2         13 $data;
293             }
294              
295             1;
296              
297             __END__