File Coverage

blib/lib/MCE/Channel/Simple.pm
Criterion Covered Total %
statement 131 142 92.2
branch 64 120 53.3
condition 6 18 33.3
subroutine 15 15 100.0
pod 11 11 100.0
total 227 306 74.1


line stmt bran cond sub pod time code
1             ###############################################################################
2             ## ----------------------------------------------------------------------------
3             ## Channel tuned for one producer and one consumer involving no locking.
4             ##
5             ###############################################################################
6              
7             package MCE::Channel::Simple;
8              
9 1     1   1114 use strict;
  1         2  
  1         29  
10 1     1   5 use warnings;
  1         2  
  1         27  
11              
12 1     1   4 no warnings qw( uninitialized once );
  1         2  
  1         43  
13              
14             our $VERSION = '1.888';
15              
16 1     1   5 use base 'MCE::Channel';
  1         2  
  1         2129  
17              
18             my $LF = "\012"; Internals::SvREADONLY($LF, 1);
19             my $is_MSWin32 = ( $^O eq 'MSWin32' ) ? 1 : 0;
20             my $freeze = MCE::Channel::_get_freeze();
21             my $thaw = MCE::Channel::_get_thaw();
22              
23             sub new {
24 1     1 1 5 my ( $class, %obj ) = ( @_, impl => 'Simple' );
25              
26 1         3 $obj{init_pid} = MCE::Channel::_pid();
27 1         5 MCE::Util::_sock_pair( \%obj, 'p_sock', 'c_sock' );
28              
29 1         6 return bless \%obj, $class;
30             }
31              
32             ###############################################################################
33             ## ----------------------------------------------------------------------------
34             ## Queue-like methods.
35             ##
36             ###############################################################################
37              
38             sub end {
39 1     1 1 7 my ( $self ) = @_;
40              
41 1 50       4 local $\ = undef if (defined $\);
42 1 50       3 MCE::Util::_sock_ready_w( $self->{p_sock} ) if $is_MSWin32;
43 1         2 print { $self->{p_sock} } pack('i', -1);
  1         10  
44              
45 1         5 $self->{ended} = 1;
46             }
47              
48             sub enqueue {
49 15     15 1 1743 my $self = shift;
50 15 100       40 return MCE::Channel::_ended('enqueue') if $self->{ended};
51              
52 14 50       36 local $\ = undef if (defined $\);
53 14 50       27 MCE::Util::_sock_ready_w( $self->{p_sock} ) if $is_MSWin32;
54              
55 14         29 while ( @_ ) {
56 24         157 my $data = $freeze->([ shift ]);
57 24         43 print { $self->{p_sock} } pack('i', length $data) . $data;
  24         340  
58             }
59              
60 14         47 return 1;
61             }
62              
63             sub dequeue {
64 9     9 1 25 my ( $self, $count ) = @_;
65 9 100 66     29 $count = 1 if ( !$count || $count < 1 );
66              
67 9 50       26 local $/ = $LF if ( $/ ne $LF );
68              
69 9 100       20 if ( $count == 1 ) {
70 8         10 my ( $plen, $data );
71 8 50       15 MCE::Util::_sock_ready( $self->{c_sock} ) if $is_MSWin32;
72              
73             $is_MSWin32
74             ? sysread( $self->{c_sock}, $plen, 4 )
75 8 50       70 : read( $self->{c_sock}, $plen, 4 );
76              
77 8         30 my $len = unpack('i', $plen);
78 8 50       18 if ( $len < 0 ) {
79 0         0 $self->end;
80 0 0       0 return wantarray ? () : undef;
81             }
82              
83             $is_MSWin32
84             ? MCE::Channel::_read( $self->{c_sock}, $data, $len )
85 8 50       23 : read( $self->{c_sock}, $data, $len );
86              
87 8 50       147 wantarray ? @{ $thaw->($data) } : ( $thaw->($data) )->[-1];
  0         0  
88             }
89             else {
90 1         2 my ( $plen, @ret );
91 1 50       4 MCE::Util::_sock_ready( $self->{c_sock} ) if $is_MSWin32;
92              
93 1         3 while ( $count-- ) {
94 3         5 my $data;
95              
96             $is_MSWin32
97             ? sysread( $self->{c_sock}, $plen, 4 )
98 3 50       21 : read( $self->{c_sock}, $plen, 4 );
99              
100 3         20 my $len = unpack('i', $plen);
101 3 50       8 if ( $len < 0 ) {
102 0         0 $self->end;
103 0         0 last;
104             }
105              
106             $is_MSWin32
107             ? MCE::Channel::_read( $self->{c_sock}, $data, $len )
108 3 50       8 : read( $self->{c_sock}, $data, $len );
109              
110 3         5 push @ret, @{ $thaw->($data) };
  3         31  
111             }
112              
113 1 50       8 wantarray ? @ret : $ret[-1];
114             }
115             }
116              
117             sub dequeue_nb {
118 11     11 1 34 my ( $self, $count ) = @_;
119 11 100 66     36 $count = 1 if ( !$count || $count < 1 );
120              
121 11         15 my ( $plen, @ret );
122 11 50       30 local $/ = $LF if ( $/ ne $LF );
123              
124 11         24 while ( $count-- ) {
125 13         17 my $data;
126 13         57 MCE::Util::_nonblocking( $self->{c_sock}, 1 );
127              
128             $is_MSWin32
129             ? sysread( $self->{c_sock}, $plen, 4 )
130 13 50       119 : read( $self->{c_sock}, $plen, 4 );
131              
132 13         59 MCE::Util::_nonblocking( $self->{c_sock}, 0 );
133              
134 13 50       16 my $len; $len = unpack('i', $plen) if $plen;
  13         54  
135 13 50 33     54 if ( !$len || $len < 0 ) {
136 0 0 0     0 $self->end if defined $len && $len < 0;
137 0         0 last;
138             }
139              
140             $is_MSWin32
141             ? MCE::Channel::_read( $self->{c_sock}, $data, $len )
142 13 50       45 : read( $self->{c_sock}, $data, $len );
143              
144 13         17 push @ret, @{ $thaw->($data) };
  13         126  
145             }
146              
147 11 100       88 wantarray ? @ret : $ret[-1];
148             }
149              
150             ###############################################################################
151             ## ----------------------------------------------------------------------------
152             ## Methods for two-way communication; producer(s) to consumers.
153             ##
154             ###############################################################################
155              
156             sub send {
157 11     11 1 582 my $self = shift;
158 11 100       34 return MCE::Channel::_ended('send') if $self->{ended};
159              
160 10         96 my $data = $freeze->([ @_ ]);
161              
162 10 50       38 local $\ = undef if (defined $\);
163 10 50       19 MCE::Util::_sock_ready_w( $self->{p_sock} ) if $is_MSWin32;
164 10         14 print { $self->{p_sock} } pack('i', length $data) . $data;
  10         177  
165              
166 10         42 return 1;
167             }
168              
169             sub recv {
170 5     5 1 13 my ( $self ) = @_;
171 5         8 my ( $plen, $data );
172              
173 5 50       16 local $/ = $LF if ( $/ ne $LF );
174 5 50       9 MCE::Util::_sock_ready( $self->{c_sock} ) if $is_MSWin32;
175              
176             $is_MSWin32
177             ? sysread( $self->{c_sock}, $plen, 4 )
178 5 50       77 : read( $self->{c_sock}, $plen, 4 );
179              
180 5         20 my $len = unpack('i', $plen);
181 5 50       14 if ( $len < 0 ) {
182 0         0 $self->end;
183 0 0       0 return wantarray ? () : undef;
184             }
185              
186             $is_MSWin32
187             ? MCE::Channel::_read( $self->{c_sock}, $data, $len )
188 5 50       15 : read( $self->{c_sock}, $data, $len );
189              
190 5 100       72 wantarray ? @{ $thaw->($data) } : ( $thaw->($data) )->[-1];
  1         27  
191             }
192              
193             sub recv_nb {
194 5     5 1 16 my ( $self ) = @_;
195 5         7 my ( $plen, $data );
196              
197 5 50       30 local $/ = $LF if ( $/ ne $LF );
198 5         34 MCE::Util::_nonblocking( $self->{c_sock}, 1 );
199              
200             $is_MSWin32
201             ? sysread( $self->{c_sock}, $plen, 4 )
202 5 50       59 : read( $self->{c_sock}, $plen, 4 );
203              
204 5         22 MCE::Util::_nonblocking( $self->{c_sock}, 0 );
205              
206 5 50       5 my $len; $len = unpack('i', $plen) if $plen;
  5         21  
207 5 50 33     22 if ( !$len || $len < 0 ) {
208 0 0 0     0 $self->end if defined $len && $len < 0;
209 0 0       0 return wantarray ? () : undef;
210             }
211              
212             $is_MSWin32
213             ? MCE::Channel::_read( $self->{c_sock}, $data, $len )
214 5 50       20 : read( $self->{c_sock}, $data, $len );
215              
216 5 100       63 wantarray ? @{ $thaw->($data) } : ( $thaw->($data) )->[-1];
  1         15  
217             }
218              
219             ###############################################################################
220             ## ----------------------------------------------------------------------------
221             ## Methods for two-way communication; consumers to producer(s).
222             ##
223             ###############################################################################
224              
225             sub send2 {
226 10     10 1 1105 my $self = shift;
227 10         72 my $data = $freeze->([ @_ ]);
228              
229 10 50       35 local $\ = undef if (defined $\);
230 10         16 local $MCE::Signal::SIG;
231              
232             {
233 10         12 local $MCE::Signal::IPC = 1;
  10         16  
234              
235 10 50       20 MCE::Util::_sock_ready_w( $self->{c_sock} ) if $is_MSWin32;
236 10         13 print { $self->{c_sock} } pack('i', length $data) . $data;
  10         163  
237             }
238              
239 10 50       41 CORE::kill($MCE::Signal::SIG, $$) if $MCE::Signal::SIG;
240              
241 10         25 return 1;
242             }
243              
244             sub recv2 {
245 5     5 1 13 my ( $self ) = @_;
246 5         7 my ( $plen, $data );
247              
248 5 50       15 local $/ = $LF if ( $/ ne $LF );
249 5 50       9 MCE::Util::_sock_ready( $self->{p_sock} ) if $is_MSWin32;
250              
251             $is_MSWin32
252             ? sysread( $self->{p_sock}, $plen, 4 )
253 5 50       77 : read( $self->{p_sock}, $plen, 4 );
254              
255 5         21 my $len = unpack('i', $plen);
256              
257             $is_MSWin32
258             ? MCE::Channel::_read( $self->{p_sock}, $data, $len )
259 5 50       18 : read( $self->{p_sock}, $data, $len );
260              
261 5 100       75 wantarray ? @{ $thaw->($data) } : ( $thaw->($data) )->[-1];
  1         15  
262             }
263              
264             sub recv2_nb {
265 5     5 1 15 my ( $self ) = @_;
266 5         10 my ( $plen, $data );
267              
268 5 50       16 local $/ = $LF if ( $/ ne $LF );
269 5         17 MCE::Util::_nonblocking( $self->{p_sock}, 1 );
270              
271             $is_MSWin32
272             ? sysread( $self->{p_sock}, $plen, 4 )
273 5 50       62 : read( $self->{p_sock}, $plen, 4 );
274              
275 5         23 MCE::Util::_nonblocking( $self->{p_sock}, 0 );
276              
277 5 50       8 my $len; $len = unpack('i', $plen) if $plen;
  5         19  
278 5 0       13 return wantarray ? () : undef unless $len;
    50          
279              
280             $is_MSWin32
281             ? MCE::Channel::_read( $self->{p_sock}, $data, $len )
282 5 50       18 : read( $self->{p_sock}, $data, $len );
283              
284 5 100       57 wantarray ? @{ $thaw->($data) } : ( $thaw->($data) )->[-1];
  1         18  
285             }
286              
287             1;
288              
289             __END__