File Coverage

blib/lib/IO/Lambda/Mutex.pm
Criterion Covered Total %
statement 71 75 94.6
branch 20 28 71.4
condition 3 6 50.0
subroutine 16 18 88.8
pod 9 9 100.0
total 119 136 87.5


line stmt bran cond sub pod time code
1             # $Id: Mutex.pm,v 1.13 2010/03/27 19:56:34 dk Exp $
2             package IO::Lambda::Mutex;
3 1     1   681 use vars qw($DEBUG @ISA);
  1         2  
  1         92  
4             $DEBUG = $IO::Lambda::DEBUG{mutex} || 0;
5             @ISA = qw(Exporter);
6             @EXPORT_OK = qw(mutex);
7             %EXPORT_TAGS = ( all => \@EXPORT_OK);
8              
9 1     1   4 use strict;
  1         1  
  1         21  
10 1     1   3 use warnings;
  1         1  
  1         22  
11 1     1   4 use IO::Lambda qw(:all);
  1         16  
  1         1707  
12              
13             sub new
14             {
15 1     1 1 215 return bless {
16             taken => 0,
17             queue => [],
18             }, shift;
19             }
20              
21 6     6 1 1223 sub is_taken { $_[0]-> {taken} }
22 7     7 1 37 sub is_free { not $_[0]-> {taken} }
23              
24             # non-blocking take
25             sub take
26             {
27 8     8 1 12 my $self = shift;
28 8 50 33     29 warn "$self is taken\n" if $DEBUG and not $self->{taken};
29 8 50       24 return $self-> {taken} ? 0 : ($self-> {taken} = 1);
30             }
31              
32             # remove the lambda from queue
33             sub remove
34             {
35 4     4 1 8 my ( $self, $lambda) = @_;
36 4         4 my $found;
37 4         6 my $q = $self-> {queue};
38 4         12 for ( my $i = 0; $i < @$q; $i ++) {
39 3 50       7 next if $q->[$i] != $lambda;
40 3         4 $found = $i;
41 3         5 last;
42             }
43 4 100       9 if ( defined $found) {
44 3         6 splice( @$q, $found, 1);
45 3         7 return 1;
46             } else {
47 1 50       3 warn "$self failed to remove $lambda from queue\n" if $DEBUG;
48 1         2 return 0;
49             }
50             }
51              
52             sub waiter
53             {
54 11     11 1 16 my ( $self, $timeout) = @_;
55              
56             # mutex is free, can take now
57 11 100       25 unless ( $self-> {taken}) {
58 3         6 $self-> take;
59 3     3   17 return lambda { undef };
  3         39  
60             }
61              
62             # mutex is not free, wait for it
63 8         28 my $waiter = IO::Lambda-> new;
64             my $bind = $waiter-> bind( sub {
65 7     7   10 my ($w,$rec) = (shift,shift);
66             # lambda was terminated, relinquish waiting and kill timeout
67 7 100       14 unless ($w->{__already_removed}) {
68 2         6 my $removed = $self->remove($w);
69 2 50 66     10 $self->release if !$removed && 0 == $self->{queue};
70             }
71 7 100       18 $w-> cancel_event($timeout) if defined $timeout;
72 7         12 return @_; # propagate error
73 8         53 });
74 8         10 push @{$self-> {queue}}, $waiter;
  8         14  
75              
76             $timeout = $waiter-> watch_timer( $timeout, sub {
77 1     1   4 $self-> remove($waiter);
78 1         5 $waiter-> resolve($bind);
79 1         5 return 'timeout';
80 8 100       23 }) if defined $timeout;
81              
82 8         17 return $waiter;
83             }
84              
85             sub release
86             {
87 13     13 1 31 my $self = shift;
88 13 50       32 return unless $self-> {taken};
89              
90 13 100       12 unless (@{$self-> {queue}}) {
  13         39  
91 8 50       19 warn "$self is free\n" if $DEBUG;
92 8         10 $self-> {taken} = 0;
93 8         12 return;
94             }
95              
96 5         6 my $lambda = shift @{$self-> {queue}};
  5         9  
97              
98 5 50       11 warn "$self gives ownership to $lambda\n" if $DEBUG;
99 5         9 $lambda-> {__already_removed} = 1;
100 5         18 $lambda-> terminate(undef);
101             }
102              
103 0     0   0 sub DESTROY { $_-> terminate('dead') for @{shift-> {queue}} }
  0         0  
104              
105             sub mutex(&)
106             {
107 0     0 1 0 my ( $self, $timeout) = context;
108 0         0 $self-> waiter($timeout)-> condition(shift, \&mutex, 'mutex')
109             }
110              
111             sub pipeline
112             {
113 5     5 1 6 my ( $self, $lambda, $timeout) = @_;
114             lambda {
115 5     5   5 my @p = @_;
116 5         10 context $self-> waiter($timeout);
117             tail {
118 5         21 context $lambda, @p;
119             autocatch tail {
120 5         11 $self-> release;
121 5         11 return @_;
122 5         28 }}}
  5         24  
123 5         19 }
124              
125              
126             1;
127              
128             =pod
129              
130             =head1 NAME
131              
132             IO::Lambda::Mutex - wait for a shared resource
133              
134             =head1 DESCRIPTION
135              
136             Objects of class C are mutexes, that as normal mutexes,
137             can be taken and released. The mutexes allow lambdas to wait for their
138             availability with method C, that creates and returns a new lambda,
139             that in turn will finish as soon as the caller can acquire the mutex.
140              
141             =head1 SYNOPSIS
142              
143             use IO::Lambda qw(:lambda);
144             use IO::Lambda::Mutex qw(mutex);
145            
146             my $mutex = IO::Lambda::Mutex-> new;
147            
148             # wait for mutex that shall be available immediately
149             my $waiter = $mutex-> waiter;
150             my $error = $waiter-> wait;
151             die "error:$error" if $error;
152            
153             # create and start a lambda that sleeps 2 seconds and then releases the mutex
154             my $sleeper = lambda {
155             context 2;
156             timeout { $mutex-> release }
157             };
158             $sleeper-> start;
159            
160             # Create a new lambda that shall only wait for 0.5 seconds.
161             # It will surely fail, since $sleeper is well, still sleeping
162             lambda {
163             context $mutex-> waiter(0.5);
164             tail {
165             my $error = shift;
166             print $error ? "error:$error\n" : "ok\n";
167             # $error is expected to be 'timeout'
168             }
169             }-> wait;
170              
171             # Again, wait for the same mutex but using different syntax.
172             # This time should be ok - $sleeper will sleep for 1.5 seconds and
173             # then the mutex will be available.
174             lambda {
175             context $mutex, 3;
176             mutex {
177             my $error = shift;
178             print $error ? "error:$error\n" : "ok\n";
179             # expected to be 'ok'
180             }
181             }->wait;
182              
183             # pipeline - manage a queue of lambdas, stuff new ones to it, guarantees
184             # sequential execution:
185             lambda {
186             context
187             $mutex-> pipeline( lambda { print 1 } ),
188             $mutex-> pipeline( lambda { print 2 } ),
189             $mutex-> pipeline( lambda { print 3 } )
190             ;
191             &tails();
192             }-> wait;
193             # prints 123 guaranteedly in that order, even if intermediate lambdas sleep etc
194              
195             =head1 API
196              
197             =over
198              
199             =item new
200              
201             The constructor creates a new free mutex.
202              
203             =item is_free
204              
205             Returns boolean flag whether the mutex is free or not.
206             Opposite of L.
207              
208             =item is_taken
209              
210             Returns boolean flag whether the mutex is taken or not.
211             Opposite of L.
212              
213             =item take
214              
215             Attempts to take the mutex. If the mutex is free, the operation
216             is successful and true value is returned. Otherwise, the operation
217             is failed and false value is returned.
218              
219             =item release
220              
221             Tries to releases the taken mutex. If there are lambdas waiting (see L)
222             in the queue, the first lambda will be terminated, and thus whoever waits for
223             the lambda can be notified; it will be up to the code associated with the
224             waiter lambda to call C again. If there are no waiters in the queue,
225             the mutex is set free.
226              
227             =item waiter($timeout = undef) :: () -> error
228              
229             Creates a new lambda, that is finished when the mutex becomes available.
230             The lambda is inserted into the internal waiting queue. It takes as
231             many calls to C as many lambdas are in queue, until the mutex
232             becomes free. The lambda returns an error flags, which is C if
233             the mutex was acquired successfully, or the error string.
234              
235             If C<$timeout> is defined, and by the time it is expired the mutex
236             could not be obtained, the lambda is removed from the queue, and
237             returned error value is 'timeout'. The mutex state is then unchanged.
238              
239             If C succeeds, a C call is issued. Thus, if the next
240             waiter awaits for the mutex, it will be notified; otherwise the mutex
241             becomes free.
242              
243             =item pipeline($lambda, $timeout = undef)
244              
245             Creates a new lambda, that wraps over C<$lambda> so that it is executed
246             after mutex had been obtained. Also, as soon as C<$lambda> is finished,
247             the mutex is released, thus allowing others to take it.
248              
249             =item remove($lambda)
250              
251             Internal function, do not use directly, use C<< $lambda-> terminate >>
252             instead.
253              
254             Removes the lambda created previously by waiter() from internal queue. Note
255             that after that operation the lambda will never finish by itself.
256              
257             =item mutex($mutex, $timeout = undef) -> error
258              
259             Condition wrapper over C.
260              
261             =back
262              
263             =head1 AUTHOR
264              
265             Dmitry Karasik, Edmitry@karasik.eu.orgE.
266              
267             =cut