File Coverage

blib/lib/IPC/Lock/RabbitMQ.pm
Criterion Covered Total %
statement 21 47 44.6
branch 0 2 0.0
condition n/a
subroutine 7 16 43.7
pod 1 1 100.0
total 29 66 43.9


line stmt bran cond sub pod time code
1             package IPC::Lock::RabbitMQ;
2 1     1   3094 use Moose;
  1         1918072  
  1         13  
3 1     1   9531 use MooseX::Types::Moose qw/ HashRef /;
  1         230377  
  1         14  
4 1     1   17338 use AnyEvent;
  1         17980  
  1         103  
5 1     1   777 use IPC::Lock::RabbitMQ::Types qw/ MQ /;
  1         4  
  1         8  
6 1     1   1020 use Scalar::Util qw/ refaddr /;
  1         2  
  1         58  
7 1     1   724 use IPC::Lock::RabbitMQ::Lock;
  1         5  
  1         44  
8 1     1   15 use namespace::autoclean;
  1         3  
  1         8  
9              
10             our $VERSION = '0.007';
11              
12             with 'IPC::Lock::RabbitMQ::HasTimeout';
13              
14             has mq => (
15             isa => MQ,
16             is => 'ro',
17             coerce => 1,
18             required => 1,
19             );
20              
21             # NOTE - We use an auto_delete queue for each lock, and we lock by
22             # trying to start a consumer with 'exclusive'. WE DO NOT make
23             # the queue exclusive, so someone else can grab the lock before
24             # the queue is auto-deleted once we let it go. If someone is
25             # already consuming this queue, then we get an AMQP fault and
26             # our channel gets torn down. If we disconnect / crash, then
27             # our consumption is cancelled and the queue is auto deleted.
28              
29             # Relevant parts of the AMQP spec follow.
30              
31             #<field name="auto delete" type="bit">
32             # auto-delete queue when unused
33             # <doc>
34             # If set, the queue is deleted when all consumers have finished
35             # using it. Last consumer can be cancelled either explicitly or because
36             # its channel is closed. If there was no consumer ever on the queue, it
37             # won't be deleted.
38             # </doc>
39             # <rule implement="SHOULD">
40             # <test>amq_queue_02</test>
41             # The server SHOULD allow for a reasonable delay between the point
42             # when it determines that a queue is not being used (or no longer
43             # used), and the point when it deletes the queue. At the least it
44             # must allow a client to create a queue and then create a consumer
45             # to read from it, with a small but non-zero delay between these
46             # two actions. The server should equally allow for clients that may
47             # be disconnected prematurely, and wish to re-consume from the same
48             # queue without losing messages. We would recommend a configurable
49             # timeout, with a suitable default value being one minute.
50             # </rule>
51             # </field>
52              
53             #<field name = "exclusive" type = "bit">
54             # request exclusive access
55             # <doc>
56             # Request exclusive consumer access, meaning only this consumer can
57             # access the queue.
58             # </doc>
59             # <doc name = "rule" test = "amq_basic_02">
60             # If the server cannot grant exclusive access to the queue when asked,
61             # - because there are other consumers active - it MUST raise a channel
62             # exception with return code 403 (access refused).
63             # </doc>
64             #</field>
65              
66             sub lock {
67 0     0 1   my ($self, $key) = @_;
68              
69 0           my $lock_cv = AnyEvent->condvar;
70              
71 0           my $channel_cv = AnyEvent->condvar;
72 0           my $t = $self->_gen_timer($channel_cv, 'Open channel');
73             $self->mq->open_channel(
74 0     0     on_success => sub { $channel_cv->send(shift()) },
75 0     0     on_failure => sub { $channel_cv->croak(shift()) },
76 0     0     on_close => sub { $lock_cv->send(0) }, # Channel torn down if we consume locked queue.
77 0           );
78 0           my $channel = $channel_cv->recv;
79 0           undef $t;
80 0           my $queue_cv = AnyEvent->condvar;
81 0           $t = $self->_gen_timer($queue_cv, 'Declare queue');
82             $channel->declare_queue(
83             queue => 'lock_' . $key,
84             auto_delete => 1,
85 0     0     on_success => sub { $queue_cv->send(1) },
86 0     0     on_failure => sub { $queue_cv->croak(shift()) },
87 0           );
88 0           $queue_cv->recv;
89 0           undef $t;
90 0           $t = $self->_gen_timer($lock_cv, 'Start consume');
91             $channel->consume(
92             consumer_tag => refaddr($self) . $key,
93             queue => 'lock_' . $key,
94             exclusive => 1,
95             on_consume => sub {
96 0     0     warn("Saw message on lock queue lock_" . $key);
97             },
98 0     0     on_success => sub { $lock_cv->send(1) },
99 0     0     on_failure => sub { $lock_cv->send(0) },
100 0           );
101 0 0         if ($lock_cv->recv) {
102 0           undef $t;
103 0           return IPC::Lock::RabbitMQ::Lock->new( locker => $self, lock_name => $key, channel => $channel, timeout => $self->timeout );
104             }
105 0           return;
106             }
107              
108             1;
109              
110             =head1 NAME
111              
112             IPC::Lock::RabbitMQ - Simple and reliable scoped locking for coarse grained locks.
113              
114             =head1 SYNOPSIS
115              
116             my $locker1 = IPC::Lock::RabbitMQ->new( mq => $rabbitfoot );
117             my $locker2 = IPC::Lock::RabbitMQ->new( mq => $rabbitfoot );
118              
119             {
120             my $lock = $locker1->lock("foo");
121             my $false = $locker2->lock("foo");
122             }
123             # $lock out of scope here, i.e.
124             # $lock = undef;
125              
126             my $new_lock = $locker2->lock("foo");
127             $new_lock->unlock;
128              
129             =head1 DESCRIPTION
130              
131             This module uses RabbitMQ to provide locking for coarse grained locks. The idea being
132             that you want to take a lock to stop duplicate jobs doing the same work you are doing.
133              
134             The lock taken whilst your job is running can last quite a while, and you don't
135             want your lock to be broken by another process if you're still working. Equally well,
136             if you crash, you want the lock to be freed so that another process can retry the job.
137              
138             =head1 METHODS
139              
140             =head2 new
141              
142             Constructs a lock manager object. Supply it with the C<mq> parameter which contains either
143             an instance of L<AnyEvent::RabbitMQ> or L<Net::RabbitFoot>
144              
145             =head2 lock ($key)
146              
147             Take a lock named with a specified key. Returns false if the lock is already held, returns
148             a L<IPC::Lock::RabbitMQ::Lock> object if the lock was successful.
149              
150             The lock is unlocked either by letting the L<IPC::Lock::RabbitMQ::Lock> object
151             go out of scope, or by explicitly calling the unlock method on it.
152              
153             =head1 AUTHOR
154              
155             Tomas Doran (t0m) C<< <bobtfish@bobtfish.net> >>.
156              
157             =head1 COPYRIGHT & LICENSE
158              
159             Copyright 2011 the above author(s).
160              
161             This software is free software, and is licensed under the same terms as perl itself.
162              
163             =cut
164