File Coverage

blib/lib/MCE/Channel/MutexFast.pm
Criterion Covered Total %
statement 146 154 94.8
branch 81 108 75.0
condition 13 27 48.1
subroutine 18 18 100.0
pod 11 11 100.0
total 269 318 84.5


line stmt bran cond sub pod time code
1             ###############################################################################
2             ## ----------------------------------------------------------------------------
3             ## Channel for producer(s) and many consumers supporting processes and threads.
4             ##
5             ###############################################################################
6              
7             package MCE::Channel::MutexFast;
8              
9 13     13   3506 use strict;
  13         26  
  13         413  
10 13     13   87 use warnings;
  13         47  
  13         353  
11              
12 13     13   64 no warnings qw( uninitialized once );
  13         26  
  13         700  
13              
14             our $VERSION = '1.889';
15              
16 13     13   65 use base 'MCE::Channel';
  13         26  
  13         1347  
17 13     13   948 use MCE::Mutex ();
  13         16  
  13         25991  
18              
19             my $LF = "\012"; Internals::SvREADONLY($LF, 1);
20              
21             sub new {
22 13     13 1 69 my ( $class, %obj ) = ( @_, impl => 'MutexFast' );
23              
24 13         40 $obj{init_pid} = MCE::Channel::_pid();
25 13         67 MCE::Util::_sock_pair( \%obj, 'p_sock', 'c_sock' );
26              
27             # locking for the consumer side of the channel
28 13         92 $obj{c_mutex} = MCE::Mutex->new( impl => 'Channel2' );
29              
30             # optionally, support many-producers writing and reading
31 13 100       57 $obj{p_mutex} = MCE::Mutex->new( impl => 'Channel2' ) if $obj{mp};
32              
33 13         53 bless \%obj, $class;
34              
35 13         55 MCE::Mutex::Channel::_save_for_global_cleanup($obj{c_mutex});
36 13 100       42 MCE::Mutex::Channel::_save_for_global_cleanup($obj{p_mutex}) if $obj{mp};
37              
38 13         52 return \%obj;
39             }
40              
41             END {
42 13 100   13   187 MCE::Child->finish('MCE') if $INC{'MCE/Child.pm'};
43             }
44              
45             ###############################################################################
46             ## ----------------------------------------------------------------------------
47             ## Queue-like methods.
48             ##
49             ###############################################################################
50              
51             sub end {
52 2     2 1 27 my ( $self ) = @_;
53              
54 2 50       10 local $\ = undef if (defined $\);
55 2         4 print { $self->{p_sock} } pack('i', -1);
  2         20  
56              
57 2         15 $self->{ended} = 1;
58             }
59              
60             sub enqueue {
61 26     26 1 4117 my $self = shift;
62 26 100       75 return MCE::Channel::_ended('enqueue') if $self->{ended};
63              
64 24 50       62 local $\ = undef if (defined $\);
65 24         38 my $p_mutex = $self->{p_mutex};
66 24 100       67 $p_mutex->lock2 if $p_mutex;
67              
68 24         59 while ( @_ ) {
69 44         106 my $data = ''.shift;
70 44         61 print { $self->{p_sock} } pack('i', length $data), $data;
  44         542  
71             }
72              
73 24 100       96 $p_mutex->unlock2 if $p_mutex;
74              
75 24         57 return 1;
76             }
77              
78             sub dequeue {
79 16     16 1 47 my ( $self, $count ) = @_;
80 16 100 66     58 $count = 1 if ( !$count || $count < 1 );
81              
82 16 100       48 if ( $count == 1 ) {
83 14         55 ( my $c_mutex = $self->{c_mutex} )->lock;
84 14         51 MCE::Util::_sysread( $self->{c_sock}, my($plen), 4 );
85              
86 14         56 my $len = unpack('i', $plen);
87 14 50       34 if ( $len < 0 ) {
88 0         0 $self->end, $c_mutex->unlock;
89 0 0       0 return wantarray ? () : undef;
90             }
91              
92 14 100       51 MCE::Channel::_read( $self->{c_sock}, my($data), $len ) if $len;
93 14         47 $c_mutex->unlock;
94              
95 14 100       105 $len ? $data : '';
96             }
97             else {
98 2         6 my ( $plen, @ret );
99              
100 2         12 ( my $c_mutex = $self->{c_mutex} )->lock;
101              
102 2         25 while ( $count-- ) {
103 6         20 MCE::Util::_sysread( $self->{c_sock}, $plen, 4 );
104              
105 6         28 my $len = unpack('i', $plen);
106 6 50       19 if ( $len < 0 ) {
107 0         0 $self->end;
108 0         0 last;
109             }
110              
111 6 50       14 push(@ret, ''), next unless $len;
112 6         22 MCE::Channel::_read( $self->{c_sock}, my($data), $len );
113 6         21 push @ret, $data;
114             }
115              
116 2         10 $c_mutex->unlock;
117              
118 2 50       20 wantarray ? @ret : $ret[-1];
119             }
120             }
121              
122             sub dequeue_nb {
123 20     20 1 73 my ( $self, $count ) = @_;
124 20 100 66     72 $count = 1 if ( !$count || $count < 1 );
125              
126 20         32 my ( $plen, @ret );
127 20         76 ( my $c_mutex = $self->{c_mutex} )->lock;
128              
129 20         48 while ( $count-- ) {
130 24         73 MCE::Util::_nonblocking( $self->{c_sock}, 1 );
131 24         69 MCE::Util::_sysread( $self->{c_sock}, $plen, 4 );
132 24         123 MCE::Util::_nonblocking( $self->{c_sock}, 0 );
133              
134 24 50       71 my $len; $len = unpack('i', $plen) if $plen;
  24         89  
135 24 100 66     95 if ( !$len || $len < 0 ) {
136 4 50 33     22 $self->end if defined $len && $len < 0;
137 4 50 33     28 push @ret, '' if defined $len && $len == 0;
138 4         9 last;
139             }
140              
141 20         70 MCE::Channel::_read( $self->{c_sock}, my($data), $len );
142 20         68 push @ret, $data;
143             }
144              
145 20         69 $c_mutex->unlock;
146              
147 20 100       151 wantarray ? @ret : $ret[-1];
148             }
149              
150             ###############################################################################
151             ## ----------------------------------------------------------------------------
152             ## Methods for two-way communication; producer to consumer.
153             ##
154             ###############################################################################
155              
156             sub send {
157 38     38 1 6734 my $self = shift;
158 38 100       167 return MCE::Channel::_ended('send') if $self->{ended};
159              
160 36         93 my $data = ''.shift;
161              
162 36 50       106 local $\ = undef if (defined $\);
163 36         51 my $p_mutex = $self->{p_mutex};
164 36 100       88 $p_mutex->lock2 if $p_mutex;
165              
166 36         60 print { $self->{p_sock} } pack('i', length $data), $data;
  36         903  
167 36 100       169 $p_mutex->unlock2 if $p_mutex;
168              
169 36         104 return 1;
170             }
171              
172             sub recv {
173 9     9 1 135 my ( $self ) = @_;
174              
175 9         194 ( my $c_mutex = $self->{c_mutex} )->lock;
176 9         49 MCE::Util::_sysread( $self->{c_sock}, my($plen), 4 );
177              
178 9         49 my $len = unpack('i', $plen);
179 9 50       59 if ( $len < 0 ) {
180 0         0 $self->end, $c_mutex->unlock;
181 0 0       0 return wantarray ? () : undef;
182             }
183              
184 9 100       35 MCE::Channel::_read( $self->{c_sock}, my($data), $len ) if $len;
185 9         68 $c_mutex->unlock;
186              
187 9 100       70 $len ? $data : '';
188             }
189              
190             sub recv_nb {
191 6     6 1 22 my ( $self ) = @_;
192              
193 6         25 ( my $c_mutex = $self->{c_mutex} )->lock;
194 6         27 MCE::Util::_nonblocking( $self->{c_sock}, 1 );
195 6         20 MCE::Util::_sysread( $self->{c_sock}, my($plen), 4 );
196 6         24 MCE::Util::_nonblocking( $self->{c_sock}, 0 );
197              
198 6 50       9 my $len; $len = unpack('i', $plen) if $plen;
  6         25  
199 6 100 66     28 if ( !$len || $len < 0 ) {
200 4 50 33     23 $self->end if defined $len && $len < 0;
201 4         16 $c_mutex->unlock;
202 4 50 33     38 return '' if defined $len && $len == 0;
203 0 0       0 return wantarray ? () : undef;
204             }
205              
206 2         11 MCE::Channel::_read( $self->{c_sock}, my($data), $len );
207 2         10 $c_mutex->unlock;
208              
209 2         13 $data;
210             }
211              
212             ###############################################################################
213             ## ----------------------------------------------------------------------------
214             ## Methods for two-way communication; consumer to producer.
215             ##
216             ###############################################################################
217              
218             sub send2 {
219 12     12 1 2697 my $self = shift;
220 12         30 my $data = ''.shift;
221              
222 12 50       39 local $\ = undef if (defined $\);
223             $self->{c_mutex}->synchronize2( sub {
224 12     12   17 print { $self->{c_sock} } pack('i', length $data), $data;
  12         187  
225 12         88 });
226              
227 12         59 return 1;
228             }
229              
230             sub recv2 {
231 6     6 1 26 my ( $self ) = @_;
232 6         12 my ( $plen, $data );
233              
234 6 50       27 local $/ = $LF if ( $/ ne $LF );
235 6         11 my $p_mutex = $self->{p_mutex};
236 6 100       21 $p_mutex->lock if $p_mutex;
237              
238             ( $p_mutex )
239             ? MCE::Util::_sysread( $self->{p_sock}, $plen, 4 )
240 6 100       61 : read( $self->{p_sock}, $plen, 4 );
241              
242 6         27 my $len = unpack('i', $plen);
243              
244 6 100       32 if ( $len ) {
245             ( $p_mutex )
246             ? MCE::Channel::_read( $self->{p_sock}, $data, $len )
247 2 100       12 : read( $self->{p_sock}, $data, $len );
248             }
249              
250 6 100       21 $p_mutex->unlock if $p_mutex;
251              
252 6 100       40 $len ? $data : '';
253             }
254              
255             sub recv2_nb {
256 6     6 1 20 my ( $self ) = @_;
257 6         12 my ( $plen, $data );
258              
259 6 50       23 local $/ = $LF if ( $/ ne $LF );
260 6         13 my $p_mutex = $self->{p_mutex};
261 6 100       20 $p_mutex->lock if $p_mutex;
262              
263 6         19 MCE::Util::_nonblocking( $self->{p_sock}, 1 );
264              
265             ( $p_mutex )
266             ? MCE::Util::_sysread( $self->{p_sock}, $plen, 4 )
267 6 100       49 : read( $self->{p_sock}, $plen, 4 );
268              
269 6         27 MCE::Util::_nonblocking( $self->{p_sock}, 0 );
270              
271 6 50       9 my $len; $len = unpack('i', $plen) if $plen;
  6         26  
272 6 100       15 if ( !$len ) {
273 4 100       17 $p_mutex->unlock if $p_mutex;
274 4 50 33     38 return '' if defined $len && $len == 0;
275 0 0       0 return wantarray ? () : undef;
276             }
277              
278             ( $p_mutex )
279             ? MCE::Channel::_read( $self->{p_sock}, $data, $len )
280 2 100       13 : read( $self->{p_sock}, $data, $len );
281              
282 2 100       10 $p_mutex->unlock if $p_mutex;
283              
284 2         11 $data;
285             }
286              
287             1;
288              
289             __END__