File Coverage

lib/Net/ZooIt.pm
Criterion Covered Total %
statement 22 24 91.6
branch n/a
condition n/a
subroutine 8 8 100.0
pod n/a
total 30 32 93.7


line stmt bran cond sub pod time code
1             package Net::ZooIt;
2              
3 5     5   34086 use strict;
  5         13  
  5         159  
4 5     5   34 use warnings;
  5         12  
  5         269  
5              
6             our $VERSION = '0.20';
7              
8 5     5   2507 use Sys::Hostname qw(hostname);
  5         5784  
  5         332  
9 5     5   42 use Carp qw(croak);
  5         14  
  5         522  
10 5     5   2016 use POSIX qw(strftime);
  5         30011  
  5         38  
11 5     5   11103 use Time::HiRes qw(time);
  5         7446  
  5         25  
12 5     5   988 use feature ':5.10';
  5         15  
  5         644  
13              
14 5     5   5603 use Net::ZooKeeper qw(:all);
  0            
  0            
15              
16             use base qw(Exporter);
17             our @EXPORT = qw(ZOOIT_DIE ZOOIT_ERR ZOOIT_WARN ZOOIT_INFO ZOOIT_DEBUG);
18              
19             # Logging
20             sub ZOOIT_DIE { 0 }
21             sub ZOOIT_ERR { 1 }
22             sub ZOOIT_WARN { 2 }
23             sub ZOOIT_INFO { 3 }
24             sub ZOOIT_DEBUG { 4 }
25             my @log_levels = qw(ZOOIT_DIE ZOOIT_ERR ZOOIT_WARN ZOOIT_INFO ZOOIT_DEBUG);
26             my $log_level = 1;
27              
28             sub set_log_level {
29             my $level = shift;
30             return unless $level =~ /^\d+$/;
31             $log_level = $level;
32             }
33              
34             sub logger {
35             my ($level, $msg) = @_;
36             return unless $level =~ /^\d+$/;
37             return if $level > $log_level;
38             $msg =~ s/\n$//;
39             my $prefix = strftime '%Y-%m-%dT%H:%M:%SZ', gmtime;
40             $prefix .= " $$";
41             $prefix .= " $log_levels[$level]";
42             print STDERR "$prefix $msg\n";
43             croak $msg if $level == 0;
44             }
45              
46             sub zdie { logger ZOOIT_DIE, @_ }
47             sub zerr { logger ZOOIT_ERR, @_ }
48             sub zwarn { logger ZOOIT_WARN, @_ }
49             sub zinfo { logger ZOOIT_INFO, @_ }
50             sub zdebug { logger ZOOIT_DEBUG, @_ }
51              
52             sub zerr2txt {
53             my $err = shift;
54             our %code2name;
55             unless (%code2name) {
56             foreach my $name (@{$Net::ZooKeeper::EXPORT_TAGS{errors}}) {
57             no strict "refs";
58             my $code = &$name;
59             use strict "refs";
60             $code2name{$code} = $name;
61             }
62             }
63             return $code2name{$err};
64             }
65              
66             # Generate and split sequential znode names
67             sub gen_seq_name { hostname . ".PID.$$-" }
68             sub split_seq_name { shift =~ /^(.+-)(\d+)$/; $1, $2 }
69              
70             # ZooKeeper recipe Lock
71             sub new_lock {
72             my $class = shift;
73             zerr "lock will be released immediately, new_lock called in void context"
74             unless defined wantarray;
75             my %p = @_;
76             zdie "Param zk must be a connect Net::ZooKeeper object"
77             unless ref $p{zk};
78             zdie "Param path must be a valid ZooKeeper znode path"
79             unless $p{path} =~ m|^/.+|;
80              
81             my $lockname = gen_seq_name;
82             my $lock = $p{zk}->create(
83             "$p{path}/$lockname" => 1,
84             flags => ZOO_EPHEMERAL|ZOO_SEQUENCE,
85             acl => ZOO_OPEN_ACL_UNSAFE,
86             );
87             unless ($lock) {
88             zerr "Could not create $p{path}/$lockname: " . zerr2txt($p{zk}->get_error);
89             return;
90             }
91             zinfo "Created lock $lock";
92             # Create the blessed object now, for auto-deletion if next operations fail
93             my $res = bless { lock => $lock, zk => $p{zk} }, $class;
94             my $t0 = time;
95              
96             my ($basename, $n) = split_seq_name $res->{lock};
97             while (1) {
98             _gc($p{zk});
99              
100             my @locks = $p{zk}->get_children($p{path});
101             my $err = $p{zk}->get_error;
102             if ($err ne ZOK) {
103             zerr "Could not get lock list: " . zerr2txt($err);
104             return;
105             }
106             zdebug "Get lock list: @locks";
107             # Look for other lock with highest sequence number lower than mine
108             my ($lock_prev, $n_prev);
109             foreach (@locks) {
110             my ($basename_i, $n_i) = split_seq_name $_;
111             next if $n_i >= $n;
112             if (!defined $n_prev || $n_i > $n_prev) {
113             $n_prev = $n_i;
114             $lock_prev = $_;
115             }
116             }
117             # If none found, the lock is mine
118             unless (defined $n_prev) {
119             zinfo "Take lock: $res->{lock}";
120             return $res;
121             }
122             # I can't take lock, abort if timeout reached
123             my $dt;
124             if (defined $p{timeout}) {
125             $dt = $t0 + $p{timeout} - time;
126             if ($dt <= 0) {
127             zinfo "Timeout reached, abort";
128             return;
129             }
130             }
131             # Wait for lock with highest seq number lower than mine to be deleted
132             $dt //= 60;
133             $dt *= 1000;
134             my $w = $p{zk}->watch(timeout => $dt);
135             $w->wait if $p{zk}->exists("$p{path}/$lock_prev", watch => $w);
136             }
137             }
138              
139             # ZooKeeper recipe Queue
140             sub new_queue {
141             my $class = shift;
142             my %p = @_;
143             zdie "Param zk must be a connect Net::ZooKeeper object"
144             unless ref $p{zk};
145             zdie "Param path must be a valid ZooKeeper znode path"
146             unless $p{path} =~ m|^/.+|;
147              
148             return bless { queue => $p{path}, zk => $p{zk} }, $class;
149             }
150              
151             sub put_queue {
152             my ($self, $data) = @_;
153             my $itemname = gen_seq_name;
154             my $item = $self->{zk}->create(
155             "$self->{queue}/$itemname" => $data,
156             flags => ZOO_SEQUENCE,
157             acl => ZOO_OPEN_ACL_UNSAFE,
158             );
159             unless ($item) {
160             zerr "Could not create $self->{queue}/$itemname: " . zerr2txt($self->{zk}->get_error);
161             return;
162             }
163             zinfo "Created queue item $item";
164             return 1;
165             }
166              
167             sub get_queue {
168             my $self = shift;
169             my %p = @_;
170             my $t0 = time;
171             while (1) {
172             _gc($self->{zk});
173              
174             my @items = $self->{zk}->get_children($self->{queue});
175             my $err = $self->{zk}->get_error;
176             if ($err ne ZOK) {
177             zerr "Could not get queue items: " . zerr2txt($err);
178             return;
179             }
180             zdebug "Get queue items: @items";
181             # Look for queue item with lowest seq number
182             my ($item_min, $n_min);
183             foreach (@items) {
184             my ($item_i, $n_i) = split_seq_name $_;
185             if (!defined $n_min || $n_i < $n_min) {
186             $n_min = $n_i;
187             $item_min = $_;
188             }
189             }
190             # If queue empty, wait for get_children, max timeout [s]
191             unless (defined $n_min) {
192             my $dt;
193             if (defined $p{timeout}) {
194             $dt = $t0 + $p{timeout} - time;
195             if ($dt <= 0) {
196             zinfo "Timeout reached, abort";
197             return;
198             }
199             }
200             $dt //= 60;
201             $dt *= 1000;
202             my $w = $self->{zk}->watch(timeout => $dt);
203             $w->wait unless $self->{zk}->get_children("$self->{queue}", watch => $w);
204             next;
205             }
206             # Get data, attempt to delete znode with lowest seq number
207             zinfo "Attempt to get/delete $item_min";
208             my $data = $self->{zk}->get("$self->{queue}/$item_min");
209             $err = $self->{zk}->get_error;
210             if ($err ne ZOK) {
211             zerr "Could not get item data: " . zerr2txt($err);
212             next;
213             }
214             if ($self->{zk}->delete("$self->{queue}/$item_min")) {
215             return $data;
216             }
217             $err = $self->{zk}->get_error;
218             if ($err ne ZNONODE) {
219             zerr "Error deleting queue item: " . zerr2txt($err);
220             return;
221             } else {
222             zinfo "Someone else deleted $item_min";
223             }
224             }
225             }
226              
227             # Automatic deletion of znodes when ZooIt objects go out of scope
228             # Garbage collection for znodes deleted during ZCONNECTIONLOSS
229             my @garbage;
230              
231             sub DESTROY {
232             my $self = shift;
233             if ($self->{lock}) {
234             zinfo "DESTROY deleting lock: $self->{lock}";
235             $self->{zk}->delete($self->{lock});
236             my $err = zerr2txt($self->{zk}->get_error);
237             if ($err ne 'ZOK') {
238             push @garbage, $self->{lock};
239             zerr "Could not delete $self->{lock}: $err";
240             }
241             delete $self->{lock};
242             }
243             }
244              
245             sub _gc {
246             my $zk = shift;
247             while (my $znode = shift @garbage) {
248             zinfo "_gc deleting $znode";
249             $zk->delete($znode);
250             my $err = zerr2txt($zk->get_error);
251             zdebug " $err";
252             if ($err eq 'ZOK' || $err eq 'ZNONODE') {
253             zinfo "$znode deleted by _gc";
254             } else {
255             zerr "$znode could not be deleted by _gc: $err";
256             unshift @garbage, $znode;
257             last;
258             }
259             }
260             }
261              
262             1;
263              
264             __END__