File Coverage

blib/lib/MCE/Channel/Mutex.pm
Criterion Covered Total %
statement 149 161 92.5
branch 72 100 72.0
condition 6 18 33.3
subroutine 17 17 100.0
pod 11 11 100.0
total 255 307 83.0


line stmt bran cond sub pod time code
1             ###############################################################################
2             ## ----------------------------------------------------------------------------
3             ## Channel for producer(s) and many consumers supporting processes and threads.
4             ##
5             ###############################################################################
6              
7             package MCE::Channel::Mutex;
8              
9 14     14   2870 use strict;
  14         28  
  14         435  
10 14     14   81 use warnings;
  14         28  
  14         402  
11              
12 14     14   63 no warnings qw( uninitialized once );
  14         27  
  14         769  
13              
14             our $VERSION = '1.888';
15              
16 14     14   82 use base 'MCE::Channel';
  14         26  
  14         1351  
17 14     14   941 use MCE::Mutex ();
  14         30  
  14         26508  
18              
19             my $LF = "\012"; Internals::SvREADONLY($LF, 1);
20             my $freeze = MCE::Channel::_get_freeze();
21             my $thaw = MCE::Channel::_get_thaw();
22              
23             sub new {
24 14     14 1 59 my ( $class, %obj ) = ( @_, impl => 'Mutex' );
25              
26 14         56 $obj{init_pid} = MCE::Channel::_pid();
27 14         62 MCE::Util::_sock_pair( \%obj, 'p_sock', 'c_sock' );
28              
29             # locking for the consumer side of the channel
30 14         74 $obj{c_mutex} = MCE::Mutex->new( impl => 'Channel2' );
31              
32             # optionally, support many-producers writing and reading
33 14 100       64 $obj{p_mutex} = MCE::Mutex->new( impl => 'Channel2' ) if $obj{mp};
34              
35 14         48 bless \%obj, $class;
36              
37 14         71 MCE::Mutex::Channel::_save_for_global_cleanup($obj{c_mutex});
38 14 100       74 MCE::Mutex::Channel::_save_for_global_cleanup($obj{p_mutex}) if $obj{mp};
39              
40 14         68 return \%obj;
41             }
42              
43             END {
44 14 100   14   4306 MCE::Child->finish('MCE') if $INC{'MCE/Child.pm'};
45             }
46              
47             ###############################################################################
48             ## ----------------------------------------------------------------------------
49             ## Queue-like methods.
50             ##
51             ###############################################################################
52              
53             sub end {
54 2     2 1 26 my ( $self ) = @_;
55              
56 2 50       12 local $\ = undef if (defined $\);
57 2         6 print { $self->{p_sock} } pack('i', -1);
  2         21  
58              
59 2         12 $self->{ended} = 1;
60             }
61              
62             sub enqueue {
63 30     30 1 3864 my $self = shift;
64 30 100       86 return MCE::Channel::_ended('enqueue') if $self->{ended};
65              
66 28 50       69 local $\ = undef if (defined $\);
67 28         52 my $p_mutex = $self->{p_mutex};
68 28 100       78 $p_mutex->lock2 if $p_mutex;
69              
70 28         68 while ( @_ ) {
71 48         310 my $data = $freeze->([ shift ]);
72 48         90 print { $self->{p_sock} } pack('i', length $data), $data;
  48         660  
73             }
74              
75 28 100       108 $p_mutex->unlock2 if $p_mutex;
76              
77 28         64 return 1;
78             }
79              
80             sub dequeue {
81 18     18 1 61 my ( $self, $count ) = @_;
82 18 100 66     59 $count = 1 if ( !$count || $count < 1 );
83              
84 18 100       44 if ( $count == 1 ) {
85 16         59 ( my $c_mutex = $self->{c_mutex} )->lock;
86 16         78 MCE::Util::_sysread( $self->{c_sock}, my($plen), 4 );
87              
88 16         71 my $len = unpack('i', $plen);
89 16 50       48 if ( $len < 0 ) {
90 0         0 $self->end, $c_mutex->unlock;
91 0 0       0 return wantarray ? () : undef;
92             }
93              
94 16         52 MCE::Channel::_read( $self->{c_sock}, my($data), $len );
95 16         69 $c_mutex->unlock;
96              
97 16 50       322 wantarray ? @{ $thaw->($data) } : ( $thaw->($data) )->[-1];
  0         0  
98             }
99             else {
100 2         11 my ( $plen, @ret );
101              
102 2         17 ( my $c_mutex = $self->{c_mutex} )->lock;
103              
104 2         20 while ( $count-- ) {
105 6         41 MCE::Util::_sysread( $self->{c_sock}, $plen, 4 );
106              
107 6         25 my $len = unpack('i', $plen);
108 6 50       17 if ( $len < 0 ) {
109 0         0 $self->end;
110 0         0 last;
111             }
112              
113 6         29 MCE::Channel::_read( $self->{c_sock}, my($data), $len );
114 6         17 push @ret, @{ $thaw->($data) };
  6         118  
115             }
116              
117 2         51 $c_mutex->unlock;
118              
119 2 50       35 wantarray ? @ret : $ret[-1];
120             }
121             }
122              
123             sub dequeue_nb {
124 22     22 1 120 my ( $self, $count ) = @_;
125 22 100 66     127 $count = 1 if ( !$count || $count < 1 );
126              
127 22         34 my ( $plen, @ret );
128 22         105 ( my $c_mutex = $self->{c_mutex} )->lock;
129              
130 22         87 while ( $count-- ) {
131 26         85 MCE::Util::_nonblocking( $self->{c_sock}, 1 );
132 26         88 MCE::Util::_sysread( $self->{c_sock}, $plen, 4 );
133 26         94 MCE::Util::_nonblocking( $self->{c_sock}, 0 );
134              
135 26 50       32 my $len; $len = unpack('i', $plen) if $plen;
  26         108  
136 26 50 33     111 if ( !$len || $len < 0 ) {
137 0 0 0     0 $self->end if defined $len && $len < 0;
138 0         0 last;
139             }
140              
141 26         89 MCE::Channel::_read( $self->{c_sock}, my($data), $len );
142 26         52 push @ret, @{ $thaw->($data) };
  26         314  
143             }
144              
145 22         93 $c_mutex->unlock;
146              
147 22 100       236 wantarray ? @ret : $ret[-1];
148             }
149              
150             ###############################################################################
151             ## ----------------------------------------------------------------------------
152             ## Methods for two-way communication; producer to consumer.
153             ##
154             ###############################################################################
155              
156             sub send {
157 34     34 1 1323 my $self = shift;
158 34 100       151 return MCE::Channel::_ended('send') if $self->{ended};
159              
160 32         460 my $data = $freeze->([ @_ ]);
161              
162 32 50       126 local $\ = undef if (defined $\);
163 32         58 my $p_mutex = $self->{p_mutex};
164 32 100       934 $p_mutex->lock2 if $p_mutex;
165              
166 32         77 print { $self->{p_sock} } pack('i', length $data), $data;
  32         608  
167 32 100       174 $p_mutex->unlock2 if $p_mutex;
168              
169 32         86 return 1;
170             }
171              
172             sub recv {
173 10     10 1 20 my ( $self ) = @_;
174              
175 10         50 ( my $c_mutex = $self->{c_mutex} )->lock;
176 10         32 MCE::Util::_sysread( $self->{c_sock}, my($plen), 4 );
177              
178 10         56 my $len = unpack('i', $plen);
179 10 50       27 if ( $len < 0 ) {
180 0         0 $self->end, $c_mutex->unlock;
181 0 0       0 return wantarray ? () : undef;
182             }
183              
184 10         51 MCE::Channel::_read( $self->{c_sock}, my($data), $len );
185 10         61 $c_mutex->unlock;
186              
187 10 100       290 wantarray ? @{ $thaw->($data) } : ( $thaw->($data) )->[-1];
  2         46  
188             }
189              
190             sub recv_nb {
191 10     10 1 45 my ( $self ) = @_;
192              
193 10         39 ( my $c_mutex = $self->{c_mutex} )->lock;
194 10         51 MCE::Util::_nonblocking( $self->{c_sock}, 1 );
195 10         35 MCE::Util::_sysread( $self->{c_sock}, my($plen), 4 );
196 10         41 MCE::Util::_nonblocking( $self->{c_sock}, 0 );
197              
198 10 50       15 my $len; $len = unpack('i', $plen) if $plen;
  10         48  
199 10 50 33     49 if ( !$len || $len < 0 ) {
200 0 0 0     0 $self->end if defined $len && $len < 0;
201 0         0 $c_mutex->unlock;
202 0 0       0 return wantarray ? () : undef;
203             }
204              
205 10         37 MCE::Channel::_read( $self->{c_sock}, my($data), $len );
206 10         40 $c_mutex->unlock;
207              
208 10 100       151 wantarray ? @{ $thaw->($data) } : ( $thaw->($data) )->[-1];
  2         42  
209             }
210              
211             ###############################################################################
212             ## ----------------------------------------------------------------------------
213             ## Methods for two-way communication; consumer to producer.
214             ##
215             ###############################################################################
216              
217             sub send2 {
218 40     40 1 2768 my $self = shift;
219 40         1103 my $data = $freeze->([ @_ ]);
220              
221 40 50       284 local $\ = undef if (defined $\);
222 40         384 local $MCE::Signal::SIG;
223              
224             {
225 40         235 local $MCE::Signal::IPC = 1;
  40         279  
226 40         659 ( my $c_mutex = $self->{c_mutex} )->lock2;
227              
228 40         235 print { $self->{c_sock} } pack('i', length $data), $data;
  40         1310  
229 40         591 $c_mutex->unlock2;
230             }
231              
232 40 50       177 CORE::kill($MCE::Signal::SIG, $$) if $MCE::Signal::SIG;
233              
234 40         192 return 1;
235             }
236              
237             sub recv2 {
238 10     10 1 35 my ( $self ) = @_;
239 10         16 my ( $plen, $data );
240              
241 10 50       37 local $/ = $LF if ( $/ ne $LF );
242 10         19 my $p_mutex = $self->{p_mutex};
243 10 100       28 $p_mutex->lock if $p_mutex;
244              
245             ( $p_mutex )
246             ? MCE::Util::_sysread( $self->{p_sock}, $plen, 4 )
247 10 100       117 : read( $self->{p_sock}, $plen, 4 );
248              
249 10         45 my $len = unpack('i', $plen);
250              
251             ( $p_mutex )
252             ? MCE::Channel::_read( $self->{p_sock}, $data, $len )
253 10 100       54 : read( $self->{p_sock}, $data, $len );
254              
255 10 100       36 $p_mutex->unlock if $p_mutex;
256              
257 10 100       152 wantarray ? @{ $thaw->($data) } : ( $thaw->($data) )->[-1];
  2         33  
258             }
259              
260             sub recv2_nb {
261 484     484 1 1731 my ( $self ) = @_;
262 484         877 my ( $plen, $data );
263              
264 484 50       1804 local $/ = $LF if ( $/ ne $LF );
265 484         967 my $p_mutex = $self->{p_mutex};
266 484 100       921 $p_mutex->lock if $p_mutex;
267              
268 484         3205 MCE::Util::_nonblocking( $self->{p_sock}, 1 );
269              
270             ( $p_mutex )
271             ? MCE::Util::_sysread( $self->{p_sock}, $plen, 4 )
272 484 100       6833 : read( $self->{p_sock}, $plen, 4 );
273              
274 484         2186 MCE::Util::_nonblocking( $self->{p_sock}, 0 );
275              
276 484 100       851 my $len; $len = unpack('i', $plen) if $plen;
  484         1688  
277 484 100       1212 if ( !$len ) {
278 379 50       811 $p_mutex->unlock if $p_mutex;
279 379 50       2412 return wantarray ? () : undef;
280             }
281              
282             ( $p_mutex )
283             ? MCE::Channel::_read( $self->{p_sock}, $data, $len )
284 105 100       780 : read( $self->{p_sock}, $data, $len );
285              
286 105 100       358 $p_mutex->unlock if $p_mutex;
287              
288 105 100       3161 wantarray ? @{ $thaw->($data) } : ( $thaw->($data) )->[-1];
  2         44  
289             }
290              
291             1;
292              
293             __END__