File Coverage

blib/lib/MCE/Channel/Simple.pm
Criterion Covered Total %
statement 131 142 92.2
branch 64 120 53.3
condition 6 18 33.3
subroutine 15 15 100.0
pod 11 11 100.0
total 227 306 74.1


line stmt bran cond sub pod time code
1             ###############################################################################
2             ## ----------------------------------------------------------------------------
3             ## Channel tuned for one producer and one consumer involving no locking.
4             ##
5             ###############################################################################
6              
7             package MCE::Channel::Simple;
8              
9 1     1   2044 use strict;
  1         2  
  1         30  
10 1     1   4 use warnings;
  1         2  
  1         27  
11              
12 1     1   4 no warnings qw( uninitialized once );
  1         1  
  1         58  
13              
14             our $VERSION = '1.887';
15              
16 1     1   6 use base 'MCE::Channel';
  1         1  
  1         1599  
17              
18             my $LF = "\012"; Internals::SvREADONLY($LF, 1);
19             my $is_MSWin32 = ( $^O eq 'MSWin32' ) ? 1 : 0;
20             my $freeze = MCE::Channel::_get_freeze();
21             my $thaw = MCE::Channel::_get_thaw();
22              
23             sub new {
24 1     1 1 5 my ( $class, %obj ) = ( @_, impl => 'Simple' );
25              
26 1         4 $obj{init_pid} = MCE::Channel::_pid();
27 1         4 MCE::Util::_sock_pair( \%obj, 'p_sock', 'c_sock' );
28              
29 1         5 return bless \%obj, $class;
30             }
31              
32             ###############################################################################
33             ## ----------------------------------------------------------------------------
34             ## Queue-like methods.
35             ##
36             ###############################################################################
37              
38             sub end {
39 1     1 1 7 my ( $self ) = @_;
40              
41 1 50       3 local $\ = undef if (defined $\);
42 1 50       4 MCE::Util::_sock_ready_w( $self->{p_sock} ) if $is_MSWin32;
43 1         2 print { $self->{p_sock} } pack('i', -1);
  1         8  
44              
45 1         4 $self->{ended} = 1;
46             }
47              
48             sub enqueue {
49 15     15 1 1962 my $self = shift;
50 15 100       35 return MCE::Channel::_ended('enqueue') if $self->{ended};
51              
52 14 50       31 local $\ = undef if (defined $\);
53 14 50       23 MCE::Util::_sock_ready_w( $self->{p_sock} ) if $is_MSWin32;
54              
55 14         25 while ( @_ ) {
56 24         139 my $data = $freeze->([ shift ]);
57 24         41 print { $self->{p_sock} } pack('i', length $data) . $data;
  24         269  
58             }
59              
60 14         60 return 1;
61             }
62              
63             sub dequeue {
64 9     9 1 21 my ( $self, $count ) = @_;
65 9 100 66     24 $count = 1 if ( !$count || $count < 1 );
66              
67 9 50       23 local $/ = $LF if ( $/ ne $LF );
68              
69 9 100       16 if ( $count == 1 ) {
70 8         11 my ( $plen, $data );
71 8 50       14 MCE::Util::_sock_ready( $self->{c_sock} ) if $is_MSWin32;
72              
73             $is_MSWin32
74             ? sysread( $self->{c_sock}, $plen, 4 )
75 8 50       54 : read( $self->{c_sock}, $plen, 4 );
76              
77 8         25 my $len = unpack('i', $plen);
78 8 50       16 if ( $len < 0 ) {
79 0         0 $self->end;
80 0 0       0 return wantarray ? () : undef;
81             }
82              
83             $is_MSWin32
84             ? MCE::Channel::_read( $self->{c_sock}, $data, $len )
85 8 50       20 : read( $self->{c_sock}, $data, $len );
86              
87 8 50       109 wantarray ? @{ $thaw->($data) } : ( $thaw->($data) )->[-1];
  0         0  
88             }
89             else {
90 1         3 my ( $plen, @ret );
91 1 50       3 MCE::Util::_sock_ready( $self->{c_sock} ) if $is_MSWin32;
92              
93 1         3 while ( $count-- ) {
94 3         5 my $data;
95              
96             $is_MSWin32
97             ? sysread( $self->{c_sock}, $plen, 4 )
98 3 50       15 : read( $self->{c_sock}, $plen, 4 );
99              
100 3         7 my $len = unpack('i', $plen);
101 3 50       7 if ( $len < 0 ) {
102 0         0 $self->end;
103 0         0 last;
104             }
105              
106             $is_MSWin32
107             ? MCE::Channel::_read( $self->{c_sock}, $data, $len )
108 3 50       7 : read( $self->{c_sock}, $data, $len );
109              
110 3         6 push @ret, @{ $thaw->($data) };
  3         17  
111             }
112              
113 1 50       7 wantarray ? @ret : $ret[-1];
114             }
115             }
116              
117             sub dequeue_nb {
118 11     11 1 33 my ( $self, $count ) = @_;
119 11 100 66     28 $count = 1 if ( !$count || $count < 1 );
120              
121 11         16 my ( $plen, @ret );
122 11 50       28 local $/ = $LF if ( $/ ne $LF );
123              
124 11         18 while ( $count-- ) {
125 13         15 my $data;
126 13         36 MCE::Util::_nonblocking( $self->{c_sock}, 1 );
127              
128             $is_MSWin32
129             ? sysread( $self->{c_sock}, $plen, 4 )
130 13 50       87 : read( $self->{c_sock}, $plen, 4 );
131              
132 13         36 MCE::Util::_nonblocking( $self->{c_sock}, 0 );
133              
134 13 50       17 my $len; $len = unpack('i', $plen) if $plen;
  13         43  
135 13 50 33     46 if ( !$len || $len < 0 ) {
136 0 0 0     0 $self->end if defined $len && $len < 0;
137 0         0 last;
138             }
139              
140             $is_MSWin32
141             ? MCE::Channel::_read( $self->{c_sock}, $data, $len )
142 13 50       34 : read( $self->{c_sock}, $data, $len );
143              
144 13         27 push @ret, @{ $thaw->($data) };
  13         138  
145             }
146              
147 11 100       60 wantarray ? @ret : $ret[-1];
148             }
149              
150             ###############################################################################
151             ## ----------------------------------------------------------------------------
152             ## Methods for two-way communication; producer(s) to consumers.
153             ##
154             ###############################################################################
155              
156             sub send {
157 11     11 1 659 my $self = shift;
158 11 100       32 return MCE::Channel::_ended('send') if $self->{ended};
159              
160 10         88 my $data = $freeze->([ @_ ]);
161              
162 10 50       27 local $\ = undef if (defined $\);
163 10 50       19 MCE::Util::_sock_ready_w( $self->{p_sock} ) if $is_MSWin32;
164 10         25 print { $self->{p_sock} } pack('i', length $data) . $data;
  10         153  
165              
166 10         32 return 1;
167             }
168              
169             sub recv {
170 5     5 1 9 my ( $self ) = @_;
171 5         6 my ( $plen, $data );
172              
173 5 50       16 local $/ = $LF if ( $/ ne $LF );
174 5 50       10 MCE::Util::_sock_ready( $self->{c_sock} ) if $is_MSWin32;
175              
176             $is_MSWin32
177             ? sysread( $self->{c_sock}, $plen, 4 )
178 5 50       65 : read( $self->{c_sock}, $plen, 4 );
179              
180 5         18 my $len = unpack('i', $plen);
181 5 50       13 if ( $len < 0 ) {
182 0         0 $self->end;
183 0 0       0 return wantarray ? () : undef;
184             }
185              
186             $is_MSWin32
187             ? MCE::Channel::_read( $self->{c_sock}, $data, $len )
188 5 50       13 : read( $self->{c_sock}, $data, $len );
189              
190 5 100       72 wantarray ? @{ $thaw->($data) } : ( $thaw->($data) )->[-1];
  1         13  
191             }
192              
193             sub recv_nb {
194 5     5 1 13 my ( $self ) = @_;
195 5         6 my ( $plen, $data );
196              
197 5 50       33 local $/ = $LF if ( $/ ne $LF );
198 5         22 MCE::Util::_nonblocking( $self->{c_sock}, 1 );
199              
200             $is_MSWin32
201             ? sysread( $self->{c_sock}, $plen, 4 )
202 5 50       48 : read( $self->{c_sock}, $plen, 4 );
203              
204 5         17 MCE::Util::_nonblocking( $self->{c_sock}, 0 );
205              
206 5 50       6 my $len; $len = unpack('i', $plen) if $plen;
  5         16  
207 5 50 33     20 if ( !$len || $len < 0 ) {
208 0 0 0     0 $self->end if defined $len && $len < 0;
209 0 0       0 return wantarray ? () : undef;
210             }
211              
212             $is_MSWin32
213             ? MCE::Channel::_read( $self->{c_sock}, $data, $len )
214 5 50       17 : read( $self->{c_sock}, $data, $len );
215              
216 5 100       52 wantarray ? @{ $thaw->($data) } : ( $thaw->($data) )->[-1];
  1         13  
217             }
218              
219             ###############################################################################
220             ## ----------------------------------------------------------------------------
221             ## Methods for two-way communication; consumers to producer(s).
222             ##
223             ###############################################################################
224              
225             sub send2 {
226 10     10 1 1231 my $self = shift;
227 10         62 my $data = $freeze->([ @_ ]);
228              
229 10 50       32 local $\ = undef if (defined $\);
230 10         12 local $MCE::Signal::SIG;
231              
232             {
233 10         11 local $MCE::Signal::IPC = 1;
  10         12  
234              
235 10 50       18 MCE::Util::_sock_ready_w( $self->{c_sock} ) if $is_MSWin32;
236 10         11 print { $self->{c_sock} } pack('i', length $data) . $data;
  10         141  
237             }
238              
239 10 50       29 CORE::kill($MCE::Signal::SIG, $$) if $MCE::Signal::SIG;
240              
241 10         22 return 1;
242             }
243              
244             sub recv2 {
245 5     5 1 11 my ( $self ) = @_;
246 5         6 my ( $plen, $data );
247              
248 5 50       14 local $/ = $LF if ( $/ ne $LF );
249 5 50       9 MCE::Util::_sock_ready( $self->{p_sock} ) if $is_MSWin32;
250              
251             $is_MSWin32
252             ? sysread( $self->{p_sock}, $plen, 4 )
253 5 50       59 : read( $self->{p_sock}, $plen, 4 );
254              
255 5         17 my $len = unpack('i', $plen);
256              
257             $is_MSWin32
258             ? MCE::Channel::_read( $self->{p_sock}, $data, $len )
259 5 50       15 : read( $self->{p_sock}, $data, $len );
260              
261 5 100       61 wantarray ? @{ $thaw->($data) } : ( $thaw->($data) )->[-1];
  1         14  
262             }
263              
264             sub recv2_nb {
265 5     5 1 13 my ( $self ) = @_;
266 5         7 my ( $plen, $data );
267              
268 5 50       15 local $/ = $LF if ( $/ ne $LF );
269 5         14 MCE::Util::_nonblocking( $self->{p_sock}, 1 );
270              
271             $is_MSWin32
272             ? sysread( $self->{p_sock}, $plen, 4 )
273 5 50       47 : read( $self->{p_sock}, $plen, 4 );
274              
275 5         18 MCE::Util::_nonblocking( $self->{p_sock}, 0 );
276              
277 5 50       5 my $len; $len = unpack('i', $plen) if $plen;
  5         16  
278 5 0       10 return wantarray ? () : undef unless $len;
    50          
279              
280             $is_MSWin32
281             ? MCE::Channel::_read( $self->{p_sock}, $data, $len )
282 5 50       18 : read( $self->{p_sock}, $data, $len );
283              
284 5 100       72 wantarray ? @{ $thaw->($data) } : ( $thaw->($data) )->[-1];
  1         22  
285             }
286              
287             1;
288              
289             __END__
290              
291             ###############################################################################
292             ## ----------------------------------------------------------------------------
293             ## Module usage.
294             ##
295             ###############################################################################
296              
297             =head1 NAME
298              
299             MCE::Channel::Simple - Channel tuned for one producer and one consumer
300              
301             =head1 VERSION
302              
303             This document describes MCE::Channel::Simple version 1.887
304              
305             =head1 DESCRIPTION
306              
307             A channel class providing queue-like and two-way communication
308             for one process or thread on either end; no locking needed.
309              
310             The API is described in L<MCE::Channel>.
311              
312             =over 3
313              
314             =item new
315              
316             use MCE::Channel;
317              
318             my $chnl = MCE::Channel->new( impl => 'Simple' );
319              
320             =back
321              
322             =head1 QUEUE-LIKE BEHAVIOR
323              
324             =over 3
325              
326             =item enqueue
327              
328             =item dequeue
329              
330             =item dequeue_nb
331              
332             =item end
333              
334             =back
335              
336             =head1 TWO-WAY IPC - PRODUCER TO CONSUMER
337              
338             =over 3
339              
340             =item send
341              
342             =item recv
343              
344             =item recv_nb
345              
346             =back
347              
348             =head1 TWO-WAY IPC - CONSUMER TO PRODUCER
349              
350             =over 3
351              
352             =item send2
353              
354             =item recv2
355              
356             =item recv2_nb
357              
358             =back
359              
360             =head1 AUTHOR
361              
362             Mario E. Roy, S<E<lt>marioeroy AT gmail DOT comE<gt>>
363              
364             =cut
365