File Coverage

lib/Net/ZooIt.pm
Criterion Covered Total %
statement 22 24 91.6
branch n/a
condition n/a
subroutine 8 8 100.0
pod n/a
total 30 32 93.7


line stmt bran cond sub pod time code
1             package Net::ZooIt;
2              
3 5     5   28855 use strict;
  5         13  
  5         147  
4 5     5   27 use warnings;
  5         11  
  5         237  
5              
6             our $VERSION = '0.22';
7              
8 5     5   2335 use Sys::Hostname qw(hostname);
  5         5215  
  5         339  
9 5     5   40 use Carp qw(croak);
  5         12  
  5         268  
10 5     5   2164 use POSIX qw(strftime);
  5         29885  
  5         31  
11 5     5   10116 use Time::HiRes qw(time);
  5         7502  
  5         28  
12 5     5   1013 use feature ':5.10';
  5         13  
  5         633  
13              
14 5     5   5187 use Net::ZooKeeper qw(:all);
  0            
  0            
15              
16             use base qw(Exporter);
17             our @EXPORT = qw(ZOOIT_DIE ZOOIT_ERR ZOOIT_WARN ZOOIT_INFO ZOOIT_DEBUG);
18              
19             # Logging
20             sub ZOOIT_DIE { 0 }
21             sub ZOOIT_ERR { 1 }
22             sub ZOOIT_WARN { 2 }
23             sub ZOOIT_INFO { 3 }
24             sub ZOOIT_DEBUG { 4 }
25             my @log_levels = qw(ZOOIT_DIE ZOOIT_ERR ZOOIT_WARN ZOOIT_INFO ZOOIT_DEBUG);
26             my $log_level = 1;
27              
28             sub set_log_level {
29             my $level = shift;
30             return unless $level =~ /^\d+$/;
31             $log_level = $level;
32             }
33              
34             sub logger {
35             my ($level, $msg) = @_;
36             return unless $level =~ /^\d+$/;
37             return if $level > $log_level;
38             $msg =~ s/\n$//;
39             my $prefix = strftime '%Y-%m-%dT%H:%M:%SZ', gmtime;
40             $prefix .= " $$";
41             $prefix .= " $log_levels[$level]";
42             print STDERR "$prefix $msg\n";
43             croak $msg if $level == 0;
44             }
45              
46             sub zdie { logger ZOOIT_DIE, @_ }
47             sub zerr { logger ZOOIT_ERR, @_ }
48             sub zwarn { logger ZOOIT_WARN, @_ }
49             sub zinfo { logger ZOOIT_INFO, @_ }
50             sub zdebug { logger ZOOIT_DEBUG, @_ }
51              
52             sub z2txt {
53             my ($type, $val) = @_;
54             our %tv2txt;
55             unless (%tv2txt) {
56             foreach my $t (qw(errors events node_flags states)) {
57             foreach my $name (@{$Net::ZooKeeper::EXPORT_TAGS{$t}}) {
58             no strict "refs";
59             my $code = &$name();
60             use strict "refs";
61             defined $code and $tv2txt{$t}->{$code} = $name;
62             }
63             }
64             }
65             my $t = {qw(
66             err errors error errors
67             ev events event events
68             st states state states
69             node node_flags flag node_flags
70             )}->{$type} // $type;
71             $tv2txt{$t} or zerr "z2txt: unknown type $type" and return;
72             my $txt = $tv2txt{$t}->{$val} or return;
73             return $txt;
74             }
75              
76             sub zerr2txt {
77             return z2txt 'errors', shift;
78             }
79              
80             # Generate and split sequential znode names
81             sub gen_seq_name { hostname . ".PID.$$-" }
82             sub split_seq_name { shift =~ /^(.+-)(\d+)$/; $1, $2 }
83              
84             # ZooKeeper recipe Lock
85             sub new_lock {
86             my $class = shift;
87             zerr "lock will be released immediately, new_lock called in void context"
88             unless defined wantarray;
89             my %p = @_;
90             zdie "Param zk must be a connect Net::ZooKeeper object"
91             unless ref $p{zk};
92             zdie "Param path must be a valid ZooKeeper znode path"
93             unless $p{path} =~ m|^/.+|;
94              
95             my $lockname = gen_seq_name;
96             my $lock = $p{zk}->create(
97             "$p{path}/$lockname" => 1,
98             flags => ZOO_EPHEMERAL|ZOO_SEQUENCE,
99             acl => ZOO_OPEN_ACL_UNSAFE,
100             );
101             unless ($lock) {
102             zerr "Could not create $p{path}/$lockname: " . zerr2txt($p{zk}->get_error);
103             return;
104             }
105             zinfo "Created lock $lock";
106             # Create the blessed object now, for auto-deletion if next operations fail
107             my $res = bless { lock => $lock, zk => $p{zk} }, $class;
108             my $t0 = time;
109              
110             my ($basename, $n) = split_seq_name $res->{lock};
111             while (1) {
112             _gc($p{zk});
113              
114             my @locks = $p{zk}->get_children($p{path});
115             my $err = $p{zk}->get_error;
116             if ($err ne ZOK) {
117             zerr "Could not get lock list: " . zerr2txt($err);
118             return;
119             }
120             zdebug "Get lock list: @locks";
121             # Look for other lock with highest sequence number lower than mine
122             my ($lock_prev, $n_prev);
123             foreach (@locks) {
124             my ($basename_i, $n_i) = split_seq_name $_;
125             next if $n_i >= $n;
126             if (!defined $n_prev || $n_i > $n_prev) {
127             $n_prev = $n_i;
128             $lock_prev = $_;
129             }
130             }
131             # If none found, the lock is mine
132             unless (defined $n_prev) {
133             zinfo "Take lock: $res->{lock}";
134             return $res;
135             }
136             # I can't take lock, abort if timeout reached
137             my $dt;
138             if (defined $p{timeout}) {
139             $dt = $t0 + $p{timeout} - time;
140             if ($dt <= 0) {
141             zinfo "Timeout reached, abort";
142             return;
143             }
144             }
145             # Wait for lock with highest seq number lower than mine to be deleted
146             $dt //= 60;
147             $dt *= 1000;
148             my $w = $p{zk}->watch(timeout => $dt);
149             if ($p{zk}->exists("$p{path}/$lock_prev", watch => $w)) {
150             zinfo "Wait for delete $p{path}/$lock_prev";
151             $w->wait;
152             my $event = z2txt('ev', $w->{event}) // 'timeout';
153             zinfo "Wait for delete $p{path}/$lock_prev over: $event";
154             }
155             }
156             }
157              
158             # ZooKeeper recipe Queue
159             sub new_queue {
160             my $class = shift;
161             my %p = @_;
162             zdie "Param zk must be a connect Net::ZooKeeper object"
163             unless ref $p{zk};
164             zdie "Param path must be a valid ZooKeeper znode path"
165             unless $p{path} =~ m|^/.+|;
166              
167             return bless { queue => $p{path}, zk => $p{zk} }, $class;
168             }
169              
170             sub put_queue {
171             my ($self, $data) = @_;
172             my $itemname = gen_seq_name;
173             my $item = $self->{zk}->create(
174             "$self->{queue}/$itemname" => $data,
175             flags => ZOO_SEQUENCE,
176             acl => ZOO_OPEN_ACL_UNSAFE,
177             );
178             unless ($item) {
179             zerr "Could not create $self->{queue}/$itemname: " . zerr2txt($self->{zk}->get_error);
180             return;
181             }
182             zinfo "Created queue item $item";
183             return 1;
184             }
185              
186             sub get_queue {
187             my $self = shift;
188             my %p = @_;
189             my $t0 = time;
190             while (1) {
191             _gc($self->{zk});
192              
193             my @items = $self->{zk}->get_children($self->{queue});
194             my $err = $self->{zk}->get_error;
195             if ($err ne ZOK) {
196             zerr "Could not get queue items: " . zerr2txt($err);
197             return;
198             }
199             zdebug "Get queue items: @items";
200             # Look for queue item with lowest seq number
201             my ($item_min, $n_min);
202             foreach (@items) {
203             my ($item_i, $n_i) = split_seq_name $_;
204             if (!defined $n_min || $n_i < $n_min) {
205             $n_min = $n_i;
206             $item_min = $_;
207             }
208             }
209             # If queue empty, wait for get_children, max timeout [s]
210             unless (defined $n_min) {
211             my $dt;
212             if (defined $p{timeout}) {
213             $dt = $t0 + $p{timeout} - time;
214             if ($dt <= 0) {
215             zinfo "Timeout reached, abort";
216             return;
217             }
218             }
219             $dt //= 60;
220             $dt *= 1000;
221             my $w = $self->{zk}->watch(timeout => $dt);
222             unless ($self->{zk}->get_children("$self->{queue}", watch => $w)) {
223             zinfo "Wait for children in $self->{queue}";
224             $w->wait;
225             my $event = z2txt('ev', $w->{event}) // 'timeout';
226             zinfo "Wait for children in $self->{queue} over: $event";
227             }
228             next;
229             }
230             # Get data, attempt to delete znode with lowest seq number
231             zinfo "Attempt to get/delete $item_min";
232             my $data = $self->{zk}->get("$self->{queue}/$item_min");
233             $err = $self->{zk}->get_error;
234             if ($err ne ZOK) {
235             zerr "Could not get item data: " . zerr2txt($err);
236             next;
237             }
238             if ($self->{zk}->delete("$self->{queue}/$item_min")) {
239             return $data;
240             }
241             $err = $self->{zk}->get_error;
242             if ($err ne ZNONODE) {
243             zerr "Error deleting queue item: " . zerr2txt($err);
244             return;
245             } else {
246             zinfo "Someone else deleted $item_min";
247             }
248             }
249             }
250              
251             # Automatic deletion of znodes when ZooIt objects go out of scope
252             # Garbage collection for znodes deleted during ZCONNECTIONLOSS
253             my @garbage;
254              
255             sub DESTROY {
256             my $self = shift;
257             if ($self->{lock}) {
258             zinfo "DESTROY deleting lock: $self->{lock}";
259             $self->{zk}->delete($self->{lock});
260             my $err = zerr2txt($self->{zk}->get_error);
261             if ($err ne 'ZOK') {
262             push @garbage, $self->{lock};
263             zerr "Could not delete $self->{lock}: $err";
264             }
265             delete $self->{lock};
266             }
267             }
268              
269             sub _gc {
270             my $zk = shift;
271             while (my $znode = shift @garbage) {
272             zinfo "_gc deleting $znode";
273             $zk->delete($znode);
274             my $err = zerr2txt($zk->get_error);
275             zdebug " $err";
276             if ($err eq 'ZOK' || $err eq 'ZNONODE') {
277             zinfo "$znode deleted by _gc";
278             } else {
279             zerr "$znode could not be deleted by _gc: $err";
280             unshift @garbage, $znode;
281             last;
282             }
283             }
284             }
285              
286             1;
287              
288             __END__