File Coverage

blib/lib/Net/ZooKeeper/Semaphore.pm
Criterion Covered Total %
statement 10 12 83.3
branch n/a
condition n/a
subroutine 4 4 100.0
pod n/a
total 14 16 87.5


line stmt bran cond sub pod time code
1             package Net::ZooKeeper::Semaphore;
2              
3             # ABSTRACT: Distributed semaphores via Apache ZooKeeper
4              
5             =head1 NAME
6              
7             Net::ZooKeeper::Semaphore
8              
9             =head1 DESCRIPTION
10              
11             Distributed semaphores via Apache ZooKeeper
12              
13             =head1 SYNOPSIS
14              
15             my $fqdn = Sys::Hostname::FQDN::fqdn();
16             my $zkh = Net::ZooKeeper->new(...);
17              
18             my $cpu_semaphore = Net::ZooKeeper::Semaphore->new(
19             count => 1,
20             path => "/semaphores/${fqdn}_cpu",
21             total => Sys::CPU::cpu_count(),
22             zkh => $zkh,
23             );
24              
25             my %mem_info = Linux::MemInfo::get_mem_info();
26             my $mem_semaphore = Net::ZooKeeper::Semaphore->new(
27             count => 4E6, # 4GB
28             data => $$,
29             path => "/semaphores/${fqdn}_mem",
30             total => $mem_info{MemTotal},
31             zkh => $zkh,
32             );
33              
34             undef $cpu_semaphore; # to delete lease
35              
36             =cut
37              
38 1     1   1514 use strict;
  1         2  
  1         34  
39 1     1   5 use warnings;
  1         1  
  1         44  
40              
41             our $VERSION = 0.02;
42              
43 1     1   15 use Carp;
  1         2  
  1         82  
44 1     1   480 use Net::ZooKeeper qw/:acls :node_flags/;
  0            
  0            
45             use Net::ZooKeeper::Lock;
46             use Params::Validate qw/:all/;
47              
48             =head1 METHODS
49              
50             =head2 new(%options)
51              
52             Object creation doesn't block.
53             Undef is returned if it isn't possible to acquire a lease.
54             An exception is raised on any ZooKeeper errors.
55             A lease is held as long as the object lives.
56              
57             Parameters:
58              
59             =item count
60              
61             Resource amount to be leased.
62             Must be an integer (negative values are to be added to total).
63              
64             =item data
65              
66             Optional. Data for lease znode.
67             Must be a string, default is '0'.
68              
69             =item path
70              
71             Path in ZooKeeper that identifies the semaphore.
72             If it doesn't exist, it will be created.
73             Also path/lock and path/leases will be created.
74              
75             =item total
76              
77             Total amount of available resource.
78             If there are any active leases for the given path that were created with a
79             different total, an exception will be raised.
80              
81             =item zkh
82              
83             Net::ZooKeeper handle object
84              
85             =cut
86              
87             sub new {
88             my $class = shift;
89             my $self = validate(@_, {
90             count => {type => SCALAR, regex => qr/^-?\d+$/o},
91             data => {default => '0'},
92             path => {type => SCALAR},
93             total => {type => SCALAR, regex => qr/^\d+$/o},
94             zkh => {isa => "Net::ZooKeeper"},
95             });
96             $self->{path} =~ s#/$##g;
97              
98             bless $self, $class;
99              
100             if ($self->_acquire) {
101             return $self;
102             } else {
103             return undef;
104             }
105             }
106              
107              
108             sub DESTROY {
109             my $self = shift;
110              
111             if ($self->{lease_path}) {
112             $self->{zkh}->delete($self->{lease_path});
113             }
114             }
115              
116              
117             sub _acquire {
118             my $self = shift;
119              
120             my $lock = Net::ZooKeeper::Lock->new(
121             data => $self->{data},
122             lock_name => "acquire",
123             lock_prefix => "$self->{path}/lock",
124             zkh => $self->{zkh},
125             );
126             if ($lock) {
127             my $leases_path = "$self->{path}/leases";
128             unless ($self->{zkh}->exists($leases_path)) {
129             $self->_create_path($leases_path);
130             }
131             my @leases = $self->{zkh}->get_children($leases_path);
132             my $sum = 0;
133             for my $lease (@leases) {
134             my ($count, $total, undef) = split /_/, $lease, 3; # count_total_seq
135             if ($total != $self->{total}) {
136             croak "Totals mismatch: $leases_path/$lease $self->{total}";
137             }
138             $sum += $count;
139             }
140             if ($sum + $self->{count} <= $self->{total}) {
141             my $lease_tmpl = "$leases_path/$self->{count}_$self->{total}_";
142             $self->{lease_path} = $self->{zkh}->create($lease_tmpl, $self->{data},
143             acl => ZOO_OPEN_ACL_UNSAFE,
144             flags => (ZOO_EPHEMERAL | ZOO_SEQUENCE),
145             ) or croak "unable to create sequence znode $lease_tmpl: ".$self->{zkh}->get_error;
146             $lock->unlock;
147             return 1;
148             }
149             }
150             return 0;
151             }
152              
153              
154             sub _create_path {
155             my ($self, $path) = @_;
156              
157             my $current_index = 1;
158             while ($current_index > 0) {
159             $current_index = index($path, "/", $current_index + 1);
160             my $current_path;
161             if ($current_index > 0) {
162             $current_path = substr($path, 0, $current_index);
163             } else {
164             $current_path = $path;
165             }
166              
167             if (!$self->{zkh}->exists($current_path)) {
168             $self->{zkh}->create($current_path, '0',
169             acl => ZOO_OPEN_ACL_UNSAFE,
170             );
171             }
172             }
173             }
174              
175             1;
176              
177             __END__