File Coverage

blib/lib/MCE/Channel/Mutex.pm
Criterion Covered Total %
statement 144 156 92.3
branch 71 98 72.4
condition 6 18 33.3
subroutine 18 18 100.0
pod 11 11 100.0
total 250 301 83.0


line stmt bran cond sub pod time code
1             ###############################################################################
2             ## ----------------------------------------------------------------------------
3             ## Channel for producer(s) and many consumers supporting processes and threads.
4             ##
5             ###############################################################################
6              
7             package MCE::Channel::Mutex;
8              
9 14     14   3397 use strict;
  14         37  
  14         433  
10 14     14   68 use warnings;
  14         73  
  14         408  
11              
12 14     14   70 no warnings qw( uninitialized once );
  14         30  
  14         787  
13              
14             our $VERSION = '1.889';
15              
16 14     14   90 use base 'MCE::Channel';
  14         51  
  14         1468  
17 14     14   916 use MCE::Mutex ();
  14         39  
  14         25636  
18              
19             my $LF = "\012"; Internals::SvREADONLY($LF, 1);
20             my $freeze = MCE::Channel::_get_freeze();
21             my $thaw = MCE::Channel::_get_thaw();
22              
23             sub new {
24 14     14 1 60 my ( $class, %obj ) = ( @_, impl => 'Mutex' );
25              
26 14         44 $obj{init_pid} = MCE::Channel::_pid();
27 14         61 MCE::Util::_sock_pair( \%obj, 'p_sock', 'c_sock' );
28              
29             # locking for the consumer side of the channel
30 14         85 $obj{c_mutex} = MCE::Mutex->new( impl => 'Channel2' );
31              
32             # optionally, support many-producers writing and reading
33 14 100       57 $obj{p_mutex} = MCE::Mutex->new( impl => 'Channel2' ) if $obj{mp};
34              
35 14         43 bless \%obj, $class;
36              
37 14         67 MCE::Mutex::Channel::_save_for_global_cleanup($obj{c_mutex});
38 14 100       56 MCE::Mutex::Channel::_save_for_global_cleanup($obj{p_mutex}) if $obj{mp};
39              
40 14         78 return \%obj;
41             }
42              
43             END {
44 14 100   14   3049 MCE::Child->finish('MCE') if $INC{'MCE/Child.pm'};
45             }
46              
47             ###############################################################################
48             ## ----------------------------------------------------------------------------
49             ## Queue-like methods.
50             ##
51             ###############################################################################
52              
53             sub end {
54 2     2 1 14 my ( $self ) = @_;
55              
56 2 50       10 local $\ = undef if (defined $\);
57 2         4 print { $self->{p_sock} } pack('i', -1);
  2         18  
58              
59 2         13 $self->{ended} = 1;
60             }
61              
62             sub enqueue {
63 30     30 1 4191 my $self = shift;
64 30 100       91 return MCE::Channel::_ended('enqueue') if $self->{ended};
65              
66 28 50       95 local $\ = undef if (defined $\);
67 28         43 my $p_mutex = $self->{p_mutex};
68 28 100       78 $p_mutex->lock2 if $p_mutex;
69              
70 28         60 while ( @_ ) {
71 48         335 my $data = $freeze->([ shift ]);
72 48         89 print { $self->{p_sock} } pack('i', length $data), $data;
  48         621  
73             }
74              
75 28 100       143 $p_mutex->unlock2 if $p_mutex;
76              
77 28         65 return 1;
78             }
79              
80             sub dequeue {
81 18     18 1 51 my ( $self, $count ) = @_;
82 18 100 66     61 $count = 1 if ( !$count || $count < 1 );
83              
84 18 100       43 if ( $count == 1 ) {
85 16         65 ( my $c_mutex = $self->{c_mutex} )->lock;
86 16         47 MCE::Util::_sysread( $self->{c_sock}, my($plen), 4 );
87              
88 16         62 my $len = unpack('i', $plen);
89 16 50       44 if ( $len < 0 ) {
90 0         0 $self->end, $c_mutex->unlock;
91 0 0       0 return wantarray ? () : undef;
92             }
93              
94 16         50 MCE::Channel::_read( $self->{c_sock}, my($data), $len );
95 16         56 $c_mutex->unlock;
96              
97 16 50       296 wantarray ? @{ $thaw->($data) } : ( $thaw->($data) )->[-1];
  0         0  
98             }
99             else {
100 2         7 my ( $plen, @ret );
101              
102 2         21 ( my $c_mutex = $self->{c_mutex} )->lock;
103              
104 2         10 while ( $count-- ) {
105 6         28 MCE::Util::_sysread( $self->{c_sock}, $plen, 4 );
106              
107 6         25 my $len = unpack('i', $plen);
108 6 50       16 if ( $len < 0 ) {
109 0         0 $self->end;
110 0         0 last;
111             }
112              
113 6         22 MCE::Channel::_read( $self->{c_sock}, my($data), $len );
114 6         12 push @ret, @{ $thaw->($data) };
  6         49  
115             }
116              
117 2         72 $c_mutex->unlock;
118              
119 2 50       57 wantarray ? @ret : $ret[-1];
120             }
121             }
122              
123             sub dequeue_nb {
124 22     22 1 68 my ( $self, $count ) = @_;
125 22 100 66     77 $count = 1 if ( !$count || $count < 1 );
126              
127 22         31 my ( $plen, @ret );
128 22         87 ( my $c_mutex = $self->{c_mutex} )->lock;
129              
130 22         53 while ( $count-- ) {
131 26         79 MCE::Util::_nonblocking( $self->{c_sock}, 1 );
132 26         82 MCE::Util::_sysread( $self->{c_sock}, $plen, 4 );
133 26         94 MCE::Util::_nonblocking( $self->{c_sock}, 0 );
134              
135 26 50       33 my $len; $len = unpack('i', $plen) if $plen;
  26         96  
136 26 50 33     109 if ( !$len || $len < 0 ) {
137 0 0 0     0 $self->end if defined $len && $len < 0;
138 0         0 last;
139             }
140              
141 26         88 MCE::Channel::_read( $self->{c_sock}, my($data), $len );
142 26         42 push @ret, @{ $thaw->($data) };
  26         302  
143             }
144              
145 22         86 $c_mutex->unlock;
146              
147 22 100       167 wantarray ? @ret : $ret[-1];
148             }
149              
150             ###############################################################################
151             ## ----------------------------------------------------------------------------
152             ## Methods for two-way communication; producer to consumer.
153             ##
154             ###############################################################################
155              
156             sub send {
157 34     34 1 1452 my $self = shift;
158 34 100       196 return MCE::Channel::_ended('send') if $self->{ended};
159              
160 32         411 my $data = $freeze->([ @_ ]);
161              
162 32 50       121 local $\ = undef if (defined $\);
163 32         63 my $p_mutex = $self->{p_mutex};
164 32 100       866 $p_mutex->lock2 if $p_mutex;
165              
166 32         75 print { $self->{p_sock} } pack('i', length $data), $data;
  32         587  
167 32 100       172 $p_mutex->unlock2 if $p_mutex;
168              
169 32         89 return 1;
170             }
171              
172             sub recv {
173 10     10 1 21 my ( $self ) = @_;
174              
175 10         45 ( my $c_mutex = $self->{c_mutex} )->lock;
176 10         40 MCE::Util::_sysread( $self->{c_sock}, my($plen), 4 );
177              
178 10         43 my $len = unpack('i', $plen);
179 10 50       29 if ( $len < 0 ) {
180 0         0 $self->end, $c_mutex->unlock;
181 0 0       0 return wantarray ? () : undef;
182             }
183              
184 10         33 MCE::Channel::_read( $self->{c_sock}, my($data), $len );
185 10         40 $c_mutex->unlock;
186              
187 10 100       169 wantarray ? @{ $thaw->($data) } : ( $thaw->($data) )->[-1];
  2         33  
188             }
189              
190             sub recv_nb {
191 10     10 1 30 my ( $self ) = @_;
192              
193 10         37 ( my $c_mutex = $self->{c_mutex} )->lock;
194 10         35 MCE::Util::_nonblocking( $self->{c_sock}, 1 );
195 10         30 MCE::Util::_sysread( $self->{c_sock}, my($plen), 4 );
196 10         38 MCE::Util::_nonblocking( $self->{c_sock}, 0 );
197              
198 10 50       14 my $len; $len = unpack('i', $plen) if $plen;
  10         40  
199 10 50 33     47 if ( !$len || $len < 0 ) {
200 0 0 0     0 $self->end if defined $len && $len < 0;
201 0         0 $c_mutex->unlock;
202 0 0       0 return wantarray ? () : undef;
203             }
204              
205 10         35 MCE::Channel::_read( $self->{c_sock}, my($data), $len );
206 10         38 $c_mutex->unlock;
207              
208 10 100       174 wantarray ? @{ $thaw->($data) } : ( $thaw->($data) )->[-1];
  2         50  
209             }
210              
211             ###############################################################################
212             ## ----------------------------------------------------------------------------
213             ## Methods for two-way communication; consumer to producer.
214             ##
215             ###############################################################################
216              
217             sub send2 {
218 40     40 1 2931 my $self = shift;
219 40         765 my $data = $freeze->([ @_ ]);
220              
221 40 50       246 local $\ = undef if (defined $\);
222             $self->{c_mutex}->synchronize2( sub {
223 40     40   70 print { $self->{c_sock} } pack('i', length $data), $data;
  40         1183  
224 40         1032 });
225              
226 40         399 return 1;
227             }
228              
229             sub recv2 {
230 10     10 1 28 my ( $self ) = @_;
231 10         17 my ( $plen, $data );
232              
233 10 50       31 local $/ = $LF if ( $/ ne $LF );
234 10         19 my $p_mutex = $self->{p_mutex};
235 10 100       29 $p_mutex->lock if $p_mutex;
236              
237             ( $p_mutex )
238             ? MCE::Util::_sysread( $self->{p_sock}, $plen, 4 )
239 10 100       84 : read( $self->{p_sock}, $plen, 4 );
240              
241 10         41 my $len = unpack('i', $plen);
242              
243             ( $p_mutex )
244             ? MCE::Channel::_read( $self->{p_sock}, $data, $len )
245 10 100       42 : read( $self->{p_sock}, $data, $len );
246              
247 10 100       50 $p_mutex->unlock if $p_mutex;
248              
249 10 100       153 wantarray ? @{ $thaw->($data) } : ( $thaw->($data) )->[-1];
  2         37  
250             }
251              
252             sub recv2_nb {
253 491     491 1 1337 my ( $self ) = @_;
254 491         916 my ( $plen, $data );
255              
256 491 50       1800 local $/ = $LF if ( $/ ne $LF );
257 491         938 my $p_mutex = $self->{p_mutex};
258 491 100       1135 $p_mutex->lock if $p_mutex;
259              
260 491         2978 MCE::Util::_nonblocking( $self->{p_sock}, 1 );
261              
262             ( $p_mutex )
263             ? MCE::Util::_sysread( $self->{p_sock}, $plen, 4 )
264 491 100       6354 : read( $self->{p_sock}, $plen, 4 );
265              
266 491         2103 MCE::Util::_nonblocking( $self->{p_sock}, 0 );
267              
268 491 100       802 my $len; $len = unpack('i', $plen) if $plen;
  491         1655  
269 491 100       1147 if ( !$len ) {
270 385 50       839 $p_mutex->unlock if $p_mutex;
271 385 50       2727 return wantarray ? () : undef;
272             }
273              
274             ( $p_mutex )
275             ? MCE::Channel::_read( $self->{p_sock}, $data, $len )
276 106 100       840 : read( $self->{p_sock}, $data, $len );
277              
278 106 100       357 $p_mutex->unlock if $p_mutex;
279              
280 106 100       2955 wantarray ? @{ $thaw->($data) } : ( $thaw->($data) )->[-1];
  2         34  
281             }
282              
283             1;
284              
285             __END__