File Coverage

blib/lib/MCE/Mutex/Channel.pm
Criterion Covered Total %
statement 50 78 64.1
branch 17 64 26.5
condition 4 12 33.3
subroutine 14 19 73.6
pod 5 5 100.0
total 90 178 50.5


line stmt bran cond sub pod time code
1             ###############################################################################
2             ## ----------------------------------------------------------------------------
3             ## MCE::Mutex::Channel - Mutex locking via a pipe or socket.
4             ##
5             ###############################################################################
6              
7             package MCE::Mutex::Channel;
8              
9 102     102   2568 use strict;
  102         187  
  102         2682  
10 102     102   1079 use warnings;
  102         189  
  102         2827  
11              
12 102     102   419 no warnings qw( threads recursion uninitialized once );
  102         185  
  102         5563  
13              
14             our $VERSION = '1.887';
15              
16 102     102   46495 use if $^O eq 'MSWin32', 'threads';
  102         1003  
  102         530  
17 102     102   3936 use if $^O eq 'MSWin32', 'threads::shared';
  102         171  
  102         353  
18              
19 102     102   2760 use base 'MCE::Mutex';
  102         178  
  102         9546  
20 102     102   1373 use MCE::Util ();
  102         256  
  102         1806  
21 102     102   381 use Scalar::Util qw(looks_like_number weaken);
  102         171  
  102         5129  
22 102     102   534 use Time::HiRes 'alarm';
  102         178  
  102         651  
23              
24             my $is_MSWin32 = ($^O eq 'MSWin32') ? 1 : 0;
25             my $use_pipe = ($^O !~ /mswin|mingw|msys|cygwin/i && $] gt '5.010000');
26             my $tid = $INC{'threads.pm'} ? threads->tid : 0;
27              
28             sub CLONE {
29 0 0   0   0 $tid = threads->tid if $INC{'threads.pm'};
30             }
31              
32             sub DESTROY {
33 256 50   256   4813 my ($pid, $obj) = ($tid ? $$ .'.'. $tid : $$, @_);
34              
35 256 50       1953 CORE::syswrite($obj->{_w_sock}, '0'), $obj->{$pid } = 0 if $obj->{$pid };
36 256 50       921 CORE::syswrite($obj->{_r_sock}, '0'), $obj->{$pid.'b'} = 0 if $obj->{$pid.'b'};
37              
38 256 100       968 if ( $obj->{_init_pid} eq $pid ) {
39 210 100 66     1922 (!$use_pipe || $obj->{impl} eq 'Channel2')
40             ? MCE::Util::_destroy_socks($obj, qw(_w_sock _r_sock))
41             : MCE::Util::_destroy_pipes($obj, qw(_w_sock _r_sock));
42             }
43              
44 256         16535 return;
45             }
46              
47             my @mutex;
48              
49             sub _destroy {
50 0 0   0   0 my $pid = $tid ? $$ .'.'. $tid : $$;
51              
52             # Called by { MCE, MCE::Child, and MCE::Hobo }::_exit
53 0         0 for my $i ( 0 .. @mutex - 1 ) {
54             CORE::syswrite($mutex[$i]->{_w_sock}, '0'), $mutex[$i]->{$pid} = 0
55 0 0       0 if ( $mutex[$i]->{$pid} );
56             CORE::syswrite($mutex[$i]->{_r_sock}, '0'), $mutex[$i]->{$pid.'b'} = 0
57 0 0       0 if ( $mutex[$i]->{$pid.'b'} );
58             }
59             }
60              
61             sub _save_for_global_cleanup {
62 32     32   149 push(@mutex, $_[0]), weaken($mutex[-1]);
63             }
64              
65             ###############################################################################
66             ## ----------------------------------------------------------------------------
67             ## Public methods.
68             ##
69             ###############################################################################
70              
71             sub new {
72 431     431 1 1480 my ($class, %obj) = (@_, impl => 'Channel');
73 431 50       1646 $obj{_init_pid} = $tid ? $$ .'.'. $tid : $$;
74 431 50       982 $obj{_t_lock} = threads::shared::share( my $t_lock ) if $is_MSWin32;
75              
76 431 50       2333 $use_pipe
77             ? MCE::Util::_pipe_pair(\%obj, qw(_r_sock _w_sock))
78             : MCE::Util::_sock_pair(\%obj, qw(_r_sock _w_sock));
79              
80 431         4457 CORE::syswrite($obj{_w_sock}, '0');
81 431         2067 bless \%obj, $class;
82              
83 431 100 66     4130 if ( caller !~ /^MCE:?/ || caller(1) !~ /^MCE:?/ ) {
84 2         5 MCE::Mutex::Channel::_save_for_global_cleanup(\%obj);
85             }
86              
87 431         2912 return \%obj;
88             }
89              
90             sub lock {
91 355 50   355 1 1724 my ($pid, $obj) = ($tid ? $$ .'.'. $tid : $$, shift);
92              
93             CORE::lock($obj->{_t_lock}), MCE::Util::_sock_ready($obj->{_r_sock})
94 355 50       860 if $is_MSWin32;
95             MCE::Util::_sysread($obj->{_r_sock}, my($b), 1), $obj->{ $pid } = 1
96 355 50       5360 unless $obj->{ $pid };
97              
98 355         9386 return;
99             }
100              
101             *lock_exclusive = \&lock;
102             *lock_shared = \&lock;
103              
104             sub unlock {
105 355 50   355 1 1977 my ($pid, $obj) = ($tid ? $$ .'.'. $tid : $$, shift);
106              
107             CORE::syswrite($obj->{_w_sock}, '0'), $obj->{ $pid } = 0
108 355 50       20804 if $obj->{ $pid };
109              
110 355         1836 return;
111             }
112              
113             sub synchronize {
114 0 0   0 1   my ($pid, $obj, $code) = ($tid ? $$ .'.'. $tid : $$, shift, shift);
115 0           my (@ret, $b);
116              
117 0 0         return unless ref($code) eq 'CODE';
118              
119             # lock, run, unlock - inlined for performance
120             CORE::lock($obj->{_t_lock}), MCE::Util::_sock_ready($obj->{_r_sock})
121 0 0         if $is_MSWin32;
122             MCE::Util::_sysread($obj->{_r_sock}, $b, 1), $obj->{ $pid } = 1
123 0 0         unless $obj->{ $pid };
124              
125             (defined wantarray)
126 0 0         ? @ret = wantarray ? $code->(@_) : scalar $code->(@_)
    0          
127             : $code->(@_);
128              
129 0           CORE::syswrite($obj->{_w_sock}, '0'), $obj->{ $pid } = 0;
130              
131 0 0         return wantarray ? @ret : $ret[-1];
132             }
133              
134             *enter = \&synchronize;
135              
136             sub timedwait {
137 0     0 1   my ($obj, $timeout) = @_;
138              
139 0 0         $timeout = 1 unless defined $timeout;
140 0 0 0       Carp::croak('MCE::Mutex::Channel: timedwait (timeout) is not valid')
141             if (!looks_like_number($timeout) || $timeout < 0);
142              
143 0 0         $timeout = 0.0003 if $timeout < 0.0003;
144 0           local $@; my $ret = '';
  0            
145              
146 0           eval {
147 0     0     local $SIG{ALRM} = sub { alarm 0; die "alarm clock restart\n" };
  0            
  0            
148 0 0         alarm $timeout unless $is_MSWin32;
149              
150             die "alarm clock restart\n"
151 0 0 0       if $is_MSWin32 && MCE::Util::_sock_ready($obj->{_r_sock}, $timeout);
152              
153 0 0         (!$is_MSWin32)
154             ? ($obj->lock_exclusive, $ret = 1, alarm(0))
155             : ($obj->lock_exclusive, $ret = 1);
156             };
157              
158 0 0         alarm 0 unless $is_MSWin32;
159              
160 0           $ret;
161             }
162              
163             1;
164              
165             __END__
166              
167             ###############################################################################
168             ## ----------------------------------------------------------------------------
169             ## Module usage.
170             ##
171             ###############################################################################
172              
173             =head1 NAME
174              
175             MCE::Mutex::Channel - Mutex locking via a pipe or socket
176              
177             =head1 VERSION
178              
179             This document describes MCE::Mutex::Channel version 1.887
180              
181             =head1 DESCRIPTION
182              
183             A pipe-socket implementation for C<MCE::Mutex>.
184              
185             The API is described in L<MCE::Mutex>.
186              
187             =over 3
188              
189             =item new
190              
191             =item lock
192              
193             =item lock_exclusive
194              
195             =item lock_shared
196              
197             =item unlock
198              
199             =item synchronize
200              
201             =item enter
202              
203             =item timedwait
204              
205             =back
206              
207             =head1 AUTHOR
208              
209             Mario E. Roy, S<E<lt>marioeroy AT gmail DOT comE<gt>>
210              
211             =cut
212