File Coverage

blib/lib/MCE/Channel/MutexFast.pm
Criterion Covered Total %
statement 151 159 94.9
branch 82 110 74.5
condition 13 27 48.1
subroutine 17 17 100.0
pod 11 11 100.0
total 274 324 84.5


line stmt bran cond sub pod time code
1             ###############################################################################
2             ## ----------------------------------------------------------------------------
3             ## Channel for producer(s) and many consumers supporting processes and threads.
4             ##
5             ###############################################################################
6              
7             package MCE::Channel::MutexFast;
8              
9 13     13   3325 use strict;
  13         26  
  13         351  
10 13     13   383 use warnings;
  13         46  
  13         424  
11              
12 13     13   51 no warnings qw( uninitialized once );
  13         38  
  13         605  
13              
14             our $VERSION = '1.887';
15              
16 13     13   63 use base 'MCE::Channel';
  13         26  
  13         1151  
17 13     13   718 use MCE::Mutex ();
  13         25  
  13         19187  
18              
19             my $LF = "\012"; Internals::SvREADONLY($LF, 1);
20              
21             sub new {
22 13     13 1 54 my ( $class, %obj ) = ( @_, impl => 'MutexFast' );
23              
24 13         40 $obj{init_pid} = MCE::Channel::_pid();
25 13         65 MCE::Util::_sock_pair( \%obj, 'p_sock', 'c_sock' );
26              
27             # locking for the consumer side of the channel
28 13         67 $obj{c_mutex} = MCE::Mutex->new( impl => 'Channel2' );
29              
30             # optionally, support many-producers writing and reading
31 13 100       42 $obj{p_mutex} = MCE::Mutex->new( impl => 'Channel2' ) if $obj{mp};
32              
33 13         29 bless \%obj, $class;
34              
35 13         62 MCE::Mutex::Channel::_save_for_global_cleanup($obj{c_mutex});
36 13 100       41 MCE::Mutex::Channel::_save_for_global_cleanup($obj{p_mutex}) if $obj{mp};
37              
38 13         51 return \%obj;
39             }
40              
41             END {
42 13 100   13   916 MCE::Child->finish('MCE') if $INC{'MCE/Child.pm'};
43             }
44              
45             ###############################################################################
46             ## ----------------------------------------------------------------------------
47             ## Queue-like methods.
48             ##
49             ###############################################################################
50              
51             sub end {
52 2     2 1 13 my ( $self ) = @_;
53              
54 2 50       9 local $\ = undef if (defined $\);
55 2         3 print { $self->{p_sock} } pack('i', -1);
  2         15  
56              
57 2         10 $self->{ended} = 1;
58             }
59              
60             sub enqueue {
61 26     26 1 3928 my $self = shift;
62 26 100       64 return MCE::Channel::_ended('enqueue') if $self->{ended};
63              
64 24 50       50 local $\ = undef if (defined $\);
65 24         31 my $p_mutex = $self->{p_mutex};
66 24 100       60 $p_mutex->lock2 if $p_mutex;
67              
68 24         43 while ( @_ ) {
69 44         87 my $data = ''.shift;
70 44         50 print { $self->{p_sock} } pack('i', length $data), $data;
  44         427  
71             }
72              
73 24 100       80 $p_mutex->unlock2 if $p_mutex;
74              
75 24         47 return 1;
76             }
77              
78             sub dequeue {
79 16     16 1 47 my ( $self, $count ) = @_;
80 16 100 66     48 $count = 1 if ( !$count || $count < 1 );
81              
82 16 100       30 if ( $count == 1 ) {
83 14         43 ( my $c_mutex = $self->{c_mutex} )->lock;
84 14         35 MCE::Util::_sysread( $self->{c_sock}, my($plen), 4 );
85              
86 14         43 my $len = unpack('i', $plen);
87 14 50       27 if ( $len < 0 ) {
88 0         0 $self->end, $c_mutex->unlock;
89 0 0       0 return wantarray ? () : undef;
90             }
91              
92 14 100       45 MCE::Channel::_read( $self->{c_sock}, my($data), $len ) if $len;
93 14         41 $c_mutex->unlock;
94              
95 14 100       78 $len ? $data : '';
96             }
97             else {
98 2         5 my ( $plen, @ret );
99              
100 2         9 ( my $c_mutex = $self->{c_mutex} )->lock;
101              
102 2         8 while ( $count-- ) {
103 6         16 MCE::Util::_sysread( $self->{c_sock}, $plen, 4 );
104              
105 6         18 my $len = unpack('i', $plen);
106 6 50       14 if ( $len < 0 ) {
107 0         0 $self->end;
108 0         0 last;
109             }
110              
111 6 50       12 push(@ret, ''), next unless $len;
112 6         16 MCE::Channel::_read( $self->{c_sock}, my($data), $len );
113 6         15 push @ret, $data;
114             }
115              
116 2         9 $c_mutex->unlock;
117              
118 2 50       15 wantarray ? @ret : $ret[-1];
119             }
120             }
121              
122             sub dequeue_nb {
123 20     20 1 54 my ( $self, $count ) = @_;
124 20 100 66     57 $count = 1 if ( !$count || $count < 1 );
125              
126 20         31 my ( $plen, @ret );
127 20         57 ( my $c_mutex = $self->{c_mutex} )->lock;
128              
129 20         44 while ( $count-- ) {
130 24         58 MCE::Util::_nonblocking( $self->{c_sock}, 1 );
131 24         55 MCE::Util::_sysread( $self->{c_sock}, $plen, 4 );
132 24         72 MCE::Util::_nonblocking( $self->{c_sock}, 0 );
133              
134 24 50       26 my $len; $len = unpack('i', $plen) if $plen;
  24         71  
135 24 100 66     76 if ( !$len || $len < 0 ) {
136 4 50 33     23 $self->end if defined $len && $len < 0;
137 4 50 33     18 push @ret, '' if defined $len && $len == 0;
138 4         8 last;
139             }
140              
141 20         57 MCE::Channel::_read( $self->{c_sock}, my($data), $len );
142 20         54 push @ret, $data;
143             }
144              
145 20         57 $c_mutex->unlock;
146              
147 20 100       120 wantarray ? @ret : $ret[-1];
148             }
149              
150             ###############################################################################
151             ## ----------------------------------------------------------------------------
152             ## Methods for two-way communication; producer to consumer.
153             ##
154             ###############################################################################
155              
156             sub send {
157 38     38 1 6362 my $self = shift;
158 38 100       169 return MCE::Channel::_ended('send') if $self->{ended};
159              
160 36         80 my $data = ''.shift;
161              
162 36 50       91 local $\ = undef if (defined $\);
163 36         51 my $p_mutex = $self->{p_mutex};
164 36 100       83 $p_mutex->lock2 if $p_mutex;
165              
166 36         46 print { $self->{p_sock} } pack('i', length $data), $data;
  36         756  
167 36 100       139 $p_mutex->unlock2 if $p_mutex;
168              
169 36         154 return 1;
170             }
171              
172             sub recv {
173 9     9 1 129 my ( $self ) = @_;
174              
175 9         383 ( my $c_mutex = $self->{c_mutex} )->lock;
176 9         35 MCE::Util::_sysread( $self->{c_sock}, my($plen), 4 );
177              
178 9         36 my $len = unpack('i', $plen);
179 9 50       43 if ( $len < 0 ) {
180 0         0 $self->end, $c_mutex->unlock;
181 0 0       0 return wantarray ? () : undef;
182             }
183              
184 9 100       38 MCE::Channel::_read( $self->{c_sock}, my($data), $len ) if $len;
185 9         86 $c_mutex->unlock;
186              
187 9 100       47 $len ? $data : '';
188             }
189              
190             sub recv_nb {
191 6     6 1 17 my ( $self ) = @_;
192              
193 6         22 ( my $c_mutex = $self->{c_mutex} )->lock;
194 6         19 MCE::Util::_nonblocking( $self->{c_sock}, 1 );
195 6         15 MCE::Util::_sysread( $self->{c_sock}, my($plen), 4 );
196 6         20 MCE::Util::_nonblocking( $self->{c_sock}, 0 );
197              
198 6 50       7 my $len; $len = unpack('i', $plen) if $plen;
  6         21  
199 6 100 66     21 if ( !$len || $len < 0 ) {
200 4 50 33     21 $self->end if defined $len && $len < 0;
201 4         12 $c_mutex->unlock;
202 4 50 33     30 return '' if defined $len && $len == 0;
203 0 0       0 return wantarray ? () : undef;
204             }
205              
206 2         9 MCE::Channel::_read( $self->{c_sock}, my($data), $len );
207 2         8 $c_mutex->unlock;
208              
209 2         9 $data;
210             }
211              
212             ###############################################################################
213             ## ----------------------------------------------------------------------------
214             ## Methods for two-way communication; consumer to producer.
215             ##
216             ###############################################################################
217              
218             sub send2 {
219 12     12 1 2519 my $self = shift;
220 12         23 my $data = ''.shift;
221              
222 12 50       33 local $\ = undef if (defined $\);
223 12         15 local $MCE::Signal::SIG;
224              
225             {
226 12         14 local $MCE::Signal::IPC = 1;
  12         16  
227 12         38 ( my $c_mutex = $self->{c_mutex} )->lock2;
228              
229 12         14 print { $self->{c_sock} } pack('i', length $data), $data;
  12         136  
230 12         45 $c_mutex->unlock2;
231             }
232              
233 12 50       39 CORE::kill($MCE::Signal::SIG, $$) if $MCE::Signal::SIG;
234              
235 12         27 return 1;
236             }
237              
238             sub recv2 {
239 6     6 1 15 my ( $self ) = @_;
240 6         8 my ( $plen, $data );
241              
242 6 50       16 local $/ = $LF if ( $/ ne $LF );
243 6         12 my $p_mutex = $self->{p_mutex};
244 6 100       14 $p_mutex->lock if $p_mutex;
245              
246             ( $p_mutex )
247             ? MCE::Util::_sysread( $self->{p_sock}, $plen, 4 )
248 6 100       50 : read( $self->{p_sock}, $plen, 4 );
249              
250 6         20 my $len = unpack('i', $plen);
251              
252 6 100       16 if ( $len ) {
253             ( $p_mutex )
254             ? MCE::Channel::_read( $self->{p_sock}, $data, $len )
255 2 100       17 : read( $self->{p_sock}, $data, $len );
256             }
257              
258 6 100       18 $p_mutex->unlock if $p_mutex;
259              
260 6 100       35 $len ? $data : '';
261             }
262              
263             sub recv2_nb {
264 6     6 1 17 my ( $self ) = @_;
265 6         9 my ( $plen, $data );
266              
267 6 50       18 local $/ = $LF if ( $/ ne $LF );
268 6         9 my $p_mutex = $self->{p_mutex};
269 6 100       16 $p_mutex->lock if $p_mutex;
270              
271 6         18 MCE::Util::_nonblocking( $self->{p_sock}, 1 );
272              
273             ( $p_mutex )
274             ? MCE::Util::_sysread( $self->{p_sock}, $plen, 4 )
275 6 100       40 : read( $self->{p_sock}, $plen, 4 );
276              
277 6         19 MCE::Util::_nonblocking( $self->{p_sock}, 0 );
278              
279 6 50       8 my $len; $len = unpack('i', $plen) if $plen;
  6         20  
280 6 100       16 if ( !$len ) {
281 4 100       14 $p_mutex->unlock if $p_mutex;
282 4 50 33     31 return '' if defined $len && $len == 0;
283 0 0       0 return wantarray ? () : undef;
284             }
285              
286             ( $p_mutex )
287             ? MCE::Channel::_read( $self->{p_sock}, $data, $len )
288 2 100       8 : read( $self->{p_sock}, $data, $len );
289              
290 2 100       8 $p_mutex->unlock if $p_mutex;
291              
292 2         8 $data;
293             }
294              
295             1;
296              
297             __END__
298              
299             ###############################################################################
300             ## ----------------------------------------------------------------------------
301             ## Module usage.
302             ##
303             ###############################################################################
304              
305             =head1 NAME
306              
307             MCE::Channel::MutexFast - Fast channel for producer(s) and many consumers
308              
309             =head1 VERSION
310              
311             This document describes MCE::Channel::MutexFast version 1.887
312              
313             =head1 DESCRIPTION
314              
315             A channel class providing queue-like and two-way communication
316             for processes and threads. Locking is handled using MCE::Mutex.
317              
318             This is similar to L<MCE::Channel::Mutex> but optimized for
319             non-Unicode strings only. The main difference is that this module
320             lacks freeze-thaw serialization. Non-string arguments become
321             stringified; i.e. numbers and undef.
322              
323             The API is described in L<MCE::Channel> with the sole difference
324             being C<send> and C<send2> handle one argument.
325              
326             Current module available since MCE 1.877.
327              
328             =over 3
329              
330             =item new
331              
332             use MCE::Channel;
333              
334             # The default is tuned for one producer and many consumers.
335             my $chnl_a = MCE::Channel->new( impl => 'MutexFast' );
336              
337             # Specify the 'mp' option for safe use by two or more producers
338             # sending or recieving on the left side of the channel (i.e.
339             # ->enqueue/->send or ->recv2/->recv2_nb).
340              
341             my $chnl_b = MCE::Channel->new( impl => 'MutexFast', mp => 1 );
342              
343             =back
344              
345             =head1 QUEUE-LIKE BEHAVIOR
346              
347             =over 3
348              
349             =item enqueue
350              
351             =item dequeue
352              
353             =item dequeue_nb
354              
355             =item end
356              
357             =back
358              
359             =head1 TWO-WAY IPC - PRODUCER TO CONSUMER
360              
361             =over 3
362              
363             =item send
364              
365             =item recv
366              
367             =item recv_nb
368              
369             =back
370              
371             =head1 TWO-WAY IPC - CONSUMER TO PRODUCER
372              
373             =over 3
374              
375             =item send2
376              
377             =item recv2
378              
379             =item recv2_nb
380              
381             =back
382              
383             =head1 AUTHOR
384              
385             Mario E. Roy, S<E<lt>marioeroy AT gmail DOT comE<gt>>
386              
387             =cut
388