File Coverage

blib/lib/MCE/Channel/Mutex.pm
Criterion Covered Total %
statement 149 161 92.5
branch 72 100 72.0
condition 6 18 33.3
subroutine 17 17 100.0
pod 11 11 100.0
total 255 307 83.0


line stmt bran cond sub pod time code
1             ###############################################################################
2             ## ----------------------------------------------------------------------------
3             ## Channel for producer(s) and many consumers supporting processes and threads.
4             ##
5             ###############################################################################
6              
7             package MCE::Channel::Mutex;
8              
9 14     14   3414 use strict;
  14         27  
  14         367  
10 14     14   57 use warnings;
  14         27  
  14         335  
11              
12 14     14   55 no warnings qw( uninitialized once );
  14         28  
  14         668  
13              
14             our $VERSION = '1.887';
15              
16 14     14   82 use base 'MCE::Channel';
  14         27  
  14         1187  
17 14     14   806 use MCE::Mutex ();
  14         27  
  14         20650  
18              
19             my $LF = "\012"; Internals::SvREADONLY($LF, 1);
20             my $freeze = MCE::Channel::_get_freeze();
21             my $thaw = MCE::Channel::_get_thaw();
22              
23             sub new {
24 14     14 1 56 my ( $class, %obj ) = ( @_, impl => 'Mutex' );
25              
26 14         44 $obj{init_pid} = MCE::Channel::_pid();
27 14         58 MCE::Util::_sock_pair( \%obj, 'p_sock', 'c_sock' );
28              
29             # locking for the consumer side of the channel
30 14         72 $obj{c_mutex} = MCE::Mutex->new( impl => 'Channel2' );
31              
32             # optionally, support many-producers writing and reading
33 14 100       57 $obj{p_mutex} = MCE::Mutex->new( impl => 'Channel2' ) if $obj{mp};
34              
35 14         40 bless \%obj, $class;
36              
37 14         45 MCE::Mutex::Channel::_save_for_global_cleanup($obj{c_mutex});
38 14 100       42 MCE::Mutex::Channel::_save_for_global_cleanup($obj{p_mutex}) if $obj{mp};
39              
40 14         55 return \%obj;
41             }
42              
43             END {
44 14 100   14   4195 MCE::Child->finish('MCE') if $INC{'MCE/Child.pm'};
45             }
46              
47             ###############################################################################
48             ## ----------------------------------------------------------------------------
49             ## Queue-like methods.
50             ##
51             ###############################################################################
52              
53             sub end {
54 2     2 1 12 my ( $self ) = @_;
55              
56 2 50       8 local $\ = undef if (defined $\);
57 2         4 print { $self->{p_sock} } pack('i', -1);
  2         15  
58              
59 2         9 $self->{ended} = 1;
60             }
61              
62             sub enqueue {
63 30     30 1 3849 my $self = shift;
64 30 100       72 return MCE::Channel::_ended('enqueue') if $self->{ended};
65              
66 28 50       60 local $\ = undef if (defined $\);
67 28         40 my $p_mutex = $self->{p_mutex};
68 28 100       67 $p_mutex->lock2 if $p_mutex;
69              
70 28         52 while ( @_ ) {
71 48         268 my $data = $freeze->([ shift ]);
72 48         68 print { $self->{p_sock} } pack('i', length $data), $data;
  48         497  
73             }
74              
75 28 100       91 $p_mutex->unlock2 if $p_mutex;
76              
77 28         54 return 1;
78             }
79              
80             sub dequeue {
81 18     18 1 38 my ( $self, $count ) = @_;
82 18 100 66     50 $count = 1 if ( !$count || $count < 1 );
83              
84 18 100       32 if ( $count == 1 ) {
85 16         62 ( my $c_mutex = $self->{c_mutex} )->lock;
86 16         39 MCE::Util::_sysread( $self->{c_sock}, my($plen), 4 );
87              
88 16         46 my $len = unpack('i', $plen);
89 16 50       35 if ( $len < 0 ) {
90 0         0 $self->end, $c_mutex->unlock;
91 0 0       0 return wantarray ? () : undef;
92             }
93              
94 16         42 MCE::Channel::_read( $self->{c_sock}, my($data), $len );
95 16         41 $c_mutex->unlock;
96              
97 16 50       228 wantarray ? @{ $thaw->($data) } : ( $thaw->($data) )->[-1];
  0         0  
98             }
99             else {
100 2         4 my ( $plen, @ret );
101              
102 2         10 ( my $c_mutex = $self->{c_mutex} )->lock;
103              
104 2         7 while ( $count-- ) {
105 6         21 MCE::Util::_sysread( $self->{c_sock}, $plen, 4 );
106              
107 6         18 my $len = unpack('i', $plen);
108 6 50       15 if ( $len < 0 ) {
109 0         0 $self->end;
110 0         0 last;
111             }
112              
113 6         17 MCE::Channel::_read( $self->{c_sock}, my($data), $len );
114 6         9 push @ret, @{ $thaw->($data) };
  6         93  
115             }
116              
117 2         35 $c_mutex->unlock;
118              
119 2 50       16 wantarray ? @ret : $ret[-1];
120             }
121             }
122              
123             sub dequeue_nb {
124 22     22 1 104 my ( $self, $count ) = @_;
125 22 100 66     88 $count = 1 if ( !$count || $count < 1 );
126              
127 22         32 my ( $plen, @ret );
128 22         69 ( my $c_mutex = $self->{c_mutex} )->lock;
129              
130 22         46 while ( $count-- ) {
131 26         68 MCE::Util::_nonblocking( $self->{c_sock}, 1 );
132 26         60 MCE::Util::_sysread( $self->{c_sock}, $plen, 4 );
133 26         75 MCE::Util::_nonblocking( $self->{c_sock}, 0 );
134              
135 26 50       33 my $len; $len = unpack('i', $plen) if $plen;
  26         73  
136 26 50 33     94 if ( !$len || $len < 0 ) {
137 0 0 0     0 $self->end if defined $len && $len < 0;
138 0         0 last;
139             }
140              
141 26         66 MCE::Channel::_read( $self->{c_sock}, my($data), $len );
142 26         33 push @ret, @{ $thaw->($data) };
  26         299  
143             }
144              
145 22         72 $c_mutex->unlock;
146              
147 22 100       127 wantarray ? @ret : $ret[-1];
148             }
149              
150             ###############################################################################
151             ## ----------------------------------------------------------------------------
152             ## Methods for two-way communication; producer to consumer.
153             ##
154             ###############################################################################
155              
156             sub send {
157 34     34 1 1329 my $self = shift;
158 34 100       143 return MCE::Channel::_ended('send') if $self->{ended};
159              
160 32         303 my $data = $freeze->([ @_ ]);
161              
162 32 50       106 local $\ = undef if (defined $\);
163 32         41 my $p_mutex = $self->{p_mutex};
164 32 100       875 $p_mutex->lock2 if $p_mutex;
165              
166 32         56 print { $self->{p_sock} } pack('i', length $data), $data;
  32         541  
167 32 100       194 $p_mutex->unlock2 if $p_mutex;
168              
169 32         70 return 1;
170             }
171              
172             sub recv {
173 10     10 1 17 my ( $self ) = @_;
174              
175 10         37 ( my $c_mutex = $self->{c_mutex} )->lock;
176 10         25 MCE::Util::_sysread( $self->{c_sock}, my($plen), 4 );
177              
178 10         56 my $len = unpack('i', $plen);
179 10 50       22 if ( $len < 0 ) {
180 0         0 $self->end, $c_mutex->unlock;
181 0 0       0 return wantarray ? () : undef;
182             }
183              
184 10         33 MCE::Channel::_read( $self->{c_sock}, my($data), $len );
185 10         31 $c_mutex->unlock;
186              
187 10 100       124 wantarray ? @{ $thaw->($data) } : ( $thaw->($data) )->[-1];
  2         36  
188             }
189              
190             sub recv_nb {
191 10     10 1 22 my ( $self ) = @_;
192              
193 10         33 ( my $c_mutex = $self->{c_mutex} )->lock;
194 10         28 MCE::Util::_nonblocking( $self->{c_sock}, 1 );
195 10         27 MCE::Util::_sysread( $self->{c_sock}, my($plen), 4 );
196 10         33 MCE::Util::_nonblocking( $self->{c_sock}, 0 );
197              
198 10 50       11 my $len; $len = unpack('i', $plen) if $plen;
  10         34  
199 10 50 33     38 if ( !$len || $len < 0 ) {
200 0 0 0     0 $self->end if defined $len && $len < 0;
201 0         0 $c_mutex->unlock;
202 0 0       0 return wantarray ? () : undef;
203             }
204              
205 10         31 MCE::Channel::_read( $self->{c_sock}, my($data), $len );
206 10         30 $c_mutex->unlock;
207              
208 10 100       143 wantarray ? @{ $thaw->($data) } : ( $thaw->($data) )->[-1];
  2         26  
209             }
210              
211             ###############################################################################
212             ## ----------------------------------------------------------------------------
213             ## Methods for two-way communication; consumer to producer.
214             ##
215             ###############################################################################
216              
217             sub send2 {
218 40     40 1 2839 my $self = shift;
219 40         630 my $data = $freeze->([ @_ ]);
220              
221 40 50       358 local $\ = undef if (defined $\);
222 40         242 local $MCE::Signal::SIG;
223              
224             {
225 40         61 local $MCE::Signal::IPC = 1;
  40         321  
226 40         612 ( my $c_mutex = $self->{c_mutex} )->lock2;
227              
228 40         77 print { $self->{c_sock} } pack('i', length $data), $data;
  40         778  
229 40         555 $c_mutex->unlock2;
230             }
231              
232 40 50       124 CORE::kill($MCE::Signal::SIG, $$) if $MCE::Signal::SIG;
233              
234 40         107 return 1;
235             }
236              
237             sub recv2 {
238 10     10 1 34 my ( $self ) = @_;
239 10         12 my ( $plen, $data );
240              
241 10 50       26 local $/ = $LF if ( $/ ne $LF );
242 10         14 my $p_mutex = $self->{p_mutex};
243 10 100       35 $p_mutex->lock if $p_mutex;
244              
245             ( $p_mutex )
246             ? MCE::Util::_sysread( $self->{p_sock}, $plen, 4 )
247 10 100       69 : read( $self->{p_sock}, $plen, 4 );
248              
249 10         32 my $len = unpack('i', $plen);
250              
251             ( $p_mutex )
252             ? MCE::Channel::_read( $self->{p_sock}, $data, $len )
253 10 100       32 : read( $self->{p_sock}, $data, $len );
254              
255 10 100       27 $p_mutex->unlock if $p_mutex;
256              
257 10 100       129 wantarray ? @{ $thaw->($data) } : ( $thaw->($data) )->[-1];
  2         28  
258             }
259              
260             sub recv2_nb {
261 446     446 1 1338 my ( $self ) = @_;
262 446         699 my ( $plen, $data );
263              
264 446 50       2479 local $/ = $LF if ( $/ ne $LF );
265 446         901 my $p_mutex = $self->{p_mutex};
266 446 100       857 $p_mutex->lock if $p_mutex;
267              
268 446         2770 MCE::Util::_nonblocking( $self->{p_sock}, 1 );
269              
270             ( $p_mutex )
271             ? MCE::Util::_sysread( $self->{p_sock}, $plen, 4 )
272 446 100       12882 : read( $self->{p_sock}, $plen, 4 );
273              
274 446         1640 MCE::Util::_nonblocking( $self->{p_sock}, 0 );
275              
276 446 100       679 my $len; $len = unpack('i', $plen) if $plen;
  446         1270  
277 446 100       950 if ( !$len ) {
278 340 50       512 $p_mutex->unlock if $p_mutex;
279 340 50       1930 return wantarray ? () : undef;
280             }
281              
282             ( $p_mutex )
283             ? MCE::Channel::_read( $self->{p_sock}, $data, $len )
284 106 100       617 : read( $self->{p_sock}, $data, $len );
285              
286 106 100       332 $p_mutex->unlock if $p_mutex;
287              
288 106 100       2653 wantarray ? @{ $thaw->($data) } : ( $thaw->($data) )->[-1];
  2         27  
289             }
290              
291             1;
292              
293             __END__
294              
295             ###############################################################################
296             ## ----------------------------------------------------------------------------
297             ## Module usage.
298             ##
299             ###############################################################################
300              
301             =head1 NAME
302              
303             MCE::Channel::Mutex - Channel for producer(s) and many consumers
304              
305             =head1 VERSION
306              
307             This document describes MCE::Channel::Mutex version 1.887
308              
309             =head1 DESCRIPTION
310              
311             A channel class providing queue-like and two-way communication
312             for processes and threads. Locking is handled using MCE::Mutex.
313              
314             The API is described in L<MCE::Channel>.
315              
316             =over 3
317              
318             =item new
319              
320             use MCE::Channel;
321              
322             # The default is tuned for one producer and many consumers.
323             my $chnl_a = MCE::Channel->new( impl => 'Mutex' );
324              
325             # Specify the 'mp' option for safe use by two or more producers
326             # sending or recieving on the left side of the channel (i.e.
327             # ->enqueue/->send or ->recv2/->recv2_nb).
328              
329             my $chnl_b = MCE::Channel->new( impl => 'Mutex', mp => 1 );
330              
331             =back
332              
333             =head1 QUEUE-LIKE BEHAVIOR
334              
335             =over 3
336              
337             =item enqueue
338              
339             =item dequeue
340              
341             =item dequeue_nb
342              
343             =item end
344              
345             =back
346              
347             =head1 TWO-WAY IPC - PRODUCER TO CONSUMER
348              
349             =over 3
350              
351             =item send
352              
353             =item recv
354              
355             =item recv_nb
356              
357             =back
358              
359             =head1 TWO-WAY IPC - CONSUMER TO PRODUCER
360              
361             =over 3
362              
363             =item send2
364              
365             =item recv2
366              
367             =item recv2_nb
368              
369             =back
370              
371             =head1 AUTHOR
372              
373             Mario E. Roy, S<E<lt>marioeroy AT gmail DOT comE<gt>>
374              
375             =cut
376