File Coverage

blib/lib/MCE/Channel.pm
Criterion Covered Total %
statement 65 71 91.5
branch 15 32 46.8
condition 10 21 47.6
subroutine 19 20 95.0
pod 2 2 100.0
total 111 146 76.0


line stmt bran cond sub pod time code
1             ###############################################################################
2             ## ----------------------------------------------------------------------------
3             ## Queue-like and two-way communication capability.
4             ##
5             ###############################################################################
6              
7             package MCE::Channel;
8              
9 18     18   432759 use strict;
  18         105  
  18         548  
10 18     18   100 use warnings;
  18         23  
  18         506  
11              
12 18     18   88 no warnings qw( uninitialized once );
  18         33  
  18         1167  
13              
14             our $VERSION = '1.889';
15              
16             ## no critic (BuiltinFunctions::ProhibitStringyEval)
17             ## no critic (TestingAndDebugging::ProhibitNoStrict)
18              
19 18     18   11753 use if $^O eq 'MSWin32', 'threads';
  18         249  
  18         180  
20 18     18   882 use if $^O eq 'MSWin32', 'threads::shared';
  18         38  
  18         57  
21              
22 18     18   496 use Carp ();
  18         33  
  18         3042  
23              
24             $Carp::Internal{ (__PACKAGE__) }++;
25              
26             my ( $freeze, $thaw );
27              
28             BEGIN {
29 18 50 33 18   198 if ( $] ge '5.008008' && ! $INC{'PDL.pm'} ) {
30 18         34 local $@;
31 18     18   1430 eval 'use Sereal::Encoder 3.015; use Sereal::Decoder 3.015;';
  18     18   137  
  18         471  
  18         1024  
  18         126  
  18         310  
  18         526  
32 18 50       88 if ( ! $@ ) {
33 18         214 my $encoder_ver = int( Sereal::Encoder->VERSION() );
34 18         151 my $decoder_ver = int( Sereal::Decoder->VERSION() );
35 18 50       83 if ( $encoder_ver - $decoder_ver == 0 ) {
36 18         38 $freeze = \&Sereal::Encoder::encode_sereal;
37 18         50 $thaw = \&Sereal::Decoder::decode_sereal;
38             }
39             }
40             }
41              
42 18 50       440 if ( ! defined $freeze ) {
43 0         0 require Storable;
44 0         0 $freeze = \&Storable::freeze;
45 0         0 $thaw = \&Storable::thaw;
46             }
47             }
48              
49 18     18   8538 use MCE::Util ();
  18         63  
  18         3282  
50              
51             my $tid = $INC{'threads.pm'} ? threads->tid() : 0;
52              
53             sub new {
54 29     29 1 1994 my ( $class, %argv ) = @_;
55 29 50       458 my $impl = defined( $argv{impl} ) ? ucfirst( lc $argv{impl} ) : 'Mutex';
56              
57             # Replace 'fast' with 'Fast' in the implementation value.
58 29         185 $impl =~ s/fast/Fast/;
59              
60 29 50 66     208 $impl = 'Threads' if ( $impl eq 'Mutex' && $^O eq 'MSWin32' );
61 29 50 66     163 $impl = 'ThreadsFast' if ( $impl eq 'MutexFast' && $^O eq 'MSWin32' );
62 29 50 33     123 $impl = 'Mutex' if ( $impl eq 'Threads' && $^O eq 'cygwin' );
63 29 50 33     94 $impl = 'MutexFast' if ( $impl eq 'ThreadsFast' && $^O eq 'cygwin' );
64              
65 29 50       1876 eval "require MCE::Channel::$impl; 1;" ||
66             Carp::croak("Could not load Channel implementation '$impl': $@");
67              
68 29         140 my $pkg = 'MCE::Channel::'.$impl;
69 18     18   147 no strict 'refs';
  18         37  
  18         8127  
70              
71 29         176 $pkg->new(%argv);
72             }
73              
74             sub CLONE {
75 0 0   0   0 $tid = threads->tid if $INC{'threads.pm'};
76             }
77              
78             sub DESTROY {
79 19 50   19   39461 my ( $pid, $self ) = ( $tid ? $$ .'.'. $tid : $$, @_ );
80              
81 19 100 66     260 if ( $self->{'init_pid'} && $self->{'init_pid'} eq $pid ) {
82 16         206 MCE::Util::_destroy_socks($self, qw(c_sock c2_sock p_sock p2_sock));
83 16         448 delete($self->{c_mutex}), delete($self->{p_mutex});
84             }
85              
86 19         1452 return;
87             }
88              
89             sub impl {
90 6 50   6 1 108 $_[0]->{'impl'} || 'Not defined';
91             }
92              
93 15     15   42 sub _get_freeze { $freeze; }
94 15     15   40 sub _get_thaw { $thaw; }
95              
96             sub _ended {
97 12     12   129 warn "WARNING: ($_[0]) called on a channel that has been 'end'ed\n";
98              
99 12         8215 return;
100             }
101              
102             sub _read {
103 120     120   283 my $bytes = MCE::Util::_sysread( $_[0], $_[1], my $len = $_[2] );
104 120         279 my $read = $bytes;
105              
106 120   33     470 while ( $bytes && $read != $len ) {
107 0         0 $bytes = MCE::Util::_sysread( $_[0], $_[1], $len - $read, length($_[1]) );
108 0 0       0 $read += $bytes if $bytes;
109             }
110              
111 120         244 return;
112             }
113              
114             sub _pid {
115 29 50   29   168 $tid ? $$ .'.'. $tid : $$;
116             }
117              
118             1;
119              
120             __END__