File Coverage

blib/lib/Cluster/Init/Util.pm
Criterion Covered Total %
statement 22 24 91.6
branch n/a
condition n/a
subroutine 8 8 100.0
pod n/a
total 30 32 93.7


line stmt bran cond sub pod time code
1             package Cluster::Init::Util;
2 7     7   45 use strict;
  7         15  
  7         245  
3 7     7   36 use warnings;
  7         16  
  7         227  
4 7     7   35 use Data::Dump qw(dump);
  7         11  
  7         388  
5 7     7   47 use Carp;
  7         23  
  7         507  
6 7     7   40 use Carp::Assert;
  7         13  
  7         45  
7             # use Storable qw(dclone);
8 7     7   5915 use Event qw(loop unloop unloop_all all_watchers sweep);
  7         105251  
  7         60  
9 7     7   1582 use Event;
  7         16  
  7         30  
10 7     7   17390 use Event::Stats;
  0            
  0            
11             use Time::HiRes qw(time);
12             require Exporter;
13             our @ISA = qw(Exporter);
14             our @EXPORT_OK = qw(&debug &run NOOP);
15              
16             $Event::DIED = sub {
17             Event::verbose_exception_handler(@_);
18             Event::unloop_all(0);
19             };
20              
21             use constant NOOP => 0;
22              
23             sub debug
24             {
25             my $debug = $ENV{DEBUG} || 0;
26             return unless $debug;
27             my ($package, $filename, $line, $subroutine, $hasargs, $wantarray, $evaltext, $is_require, $hints, $bitmask) = caller(1);
28             my $subline = (caller(0))[2];
29             my $msg = join(' ',@_);
30             $msg.="\n" unless $msg =~ /\n$/;
31             warn time()." $$ $subroutine,$subline: $msg" if $debug;
32             if ($debug > 1)
33             {
34             warn _stacktrace();
35             }
36             if ($debug > 2)
37             {
38             Event::Stats::collect(1);
39             warn sprintf("%d\n%-35s %3s %10s %4s %4s %4s %4s %7s\n", time,
40             "DESC", "PRI", "CBTIME", "PEND", "CARS", "RAN", "DIED", "ELAPSED");
41             for my $w (reverse all_watchers())
42             {
43             my @pending = $w->pending();
44             my $pending = @pending;
45             my $cars=sprintf("%01d%01d%01d%01d",
46             $w->is_cancelled,$w->is_active,$w->is_running,$w->is_suspended);
47             my ($ran,$died,$elapsed) = $w->stats(60);
48             warn sprintf("%-35s %3d %10d %4d %4s %4d %4d %7.3f\n",
49             $w->desc,
50             $w->prio,
51             $w->cbtime,
52             $pending,
53             $cars,
54             $ran,
55             $died,
56             $elapsed);
57             }
58             }
59             }
60              
61             sub _stacktrace
62             {
63             my $out="";
64             for (my $i=1;;$i++)
65             {
66             my @frame = caller($i);
67             last unless @frame;
68             $out .= "$frame[3] $frame[1] line $frame[2]\n";
69             }
70             return $out;
71             }
72              
73             sub dq
74             {
75             my $self=shift;
76             my $e=shift;
77             unless (ref $e->w)
78             {
79             debug "skipping $e -- no watcher";
80             return 0;
81             }
82             my $data=$e->w->data || {};
83             # warn dump $data;
84             my $event=$data->{_dfa_event};
85             my $desc= $e->w->desc;
86             debug "$desc: isactive: ". $e->w->is_active;
87             $self->killwatcher($e->w) unless $e->w->is_active;
88             # delete $data->{_dfa_event};
89             # $self->history($event,$data);
90             unless ($event)
91             {
92             # my $debug=$ENV{DEBUG};
93             # $ENV{DEBUG}=3;
94             debug "ouch -- somehow there's no _dfa_event in \$data:\n"
95             .(dump $data)."\n"
96             .(dump $self)."\n"
97             .(dump $e)."\n"
98             ;
99             # $ENV{DEBUG}=$debug;
100             return 0;
101             }
102             debug "$desc: calling tick($event,$data)";
103             $self->tick($event,$data);
104             }
105              
106             sub event
107             {
108             my $self=shift;
109             my $event=shift;
110             debug "queue event $event";
111             my $data=shift || {};
112             $self->timer($event,{at=>time},$data);
113             }
114              
115             sub watcher
116             {
117             my $self=shift;
118             my $type=shift;
119             my $event=shift;
120             debug "create $type $event";
121             my $parm=shift || {};
122             my $olddata=shift || {};
123             my $class=ref($self);
124             # make a copy so it doesn't go 'round and 'round
125             my $data = _copy($olddata);
126             # $data = eval(dump($data));
127             my $desc = "$self $type $event";
128             unless ($event)
129             {
130             my $debug=$ENV{DEBUG};
131             $ENV{DEBUG}=3;
132             debug "oooh -- $type has no event".(dump $self);
133             $ENV{DEBUG}=$debug;
134             return 0;
135             }
136             $data->{_dfa_event}=$event;
137             $parm->{desc}=$desc;
138             $parm->{cb}=[$self,'dq'];
139             $parm->{data}=$data;
140             # debug $type, $event, $data;
141             my $w = Event->$type(%$parm);
142             # warn $w;
143             $self->watchers($w);
144             return $w;
145             }
146              
147             # deep copy, but pass blessed and other complex refs through unchanged
148             sub _copy
149             {
150             my $in=shift;
151             my $ref=ref $in;
152             return $in unless $ref;
153             $ref eq "SCALAR" && do {my $out; $$out=$$in; return $out};
154             $ref eq "ARRAY" && do
155             {
156             my @out = map {_copy($_)} @$in;
157             return \@out;
158             };
159             $ref eq "HASH" && do
160             {
161             my %out;
162             while (my ($key,$val) = each %$in)
163             {
164             $out{$key}=_copy($val);
165             }
166             return \%out;
167             };
168             return $in;
169             }
170              
171             sub watchers
172             {
173             my $self=shift;
174             my $w=shift;
175             if ($w)
176             {
177             affirm { ref $w };
178             push @{$self->{watchers}}, $w;
179             }
180             my $out="watchers:\n";
181             for my $x (@{$self->{watchers}})
182             {
183             next unless ref $x;
184             $out.="\t".$x->desc."\n";
185             }
186             # warn $out;
187             return @{$self->{watchers}};
188             }
189              
190             sub killwatcher
191             {
192             my $self=shift;
193             my $w=shift;
194             if (ref $w)
195             {
196             debug "killwatcher ".$w->desc;
197             # let it finish any pending requests -- primarily catching CHLD
198             # sweep() while $w->pending;
199             $w->cancel;
200             my @watchers = grep {$_ && $_!=$w} $self->watchers;
201             $self->{watchers}=\@watchers;
202             }
203             return $self->watchers;
204             }
205              
206             sub idle { shift->watcher('idle', @_) }
207             sub timer { shift->watcher('timer', @_) }
208             sub io { shift->watcher('io', @_) }
209             sub var { shift->watcher('var', @_) }
210             sub sigevent { shift->watcher('signal',@_) }
211              
212             sub fields
213             {
214             my $self=shift;
215             my $class = ref $self;
216             affirm { $class };
217             my @field=@_;
218             for my $field (@field)
219             {
220             next if $self->can($field);
221             my $var = $class."::".$field;
222             debug "$var";
223             no strict 'refs';
224             *$field = sub
225             {
226             my $self=shift;
227             my $val=shift;
228             $self->{$var}=$val if defined $val;
229             return $self->{$var};
230             };
231             }
232             }
233              
234             sub transit
235             {
236             my ($self,$oldstate,$newstate,$action,@arg)=@_;
237             my $class = ref $self;
238             my $tag = $self->{tag} || "";
239             debug "$class: $tag: newstate=>'$newstate', action=>'$action'\n";
240             $self->{status}->newstate($self,$self->{name},$self->{level},$newstate)
241             if $self->{status} && $self->{name} && $self->{level};
242             if ($action)
243             {
244             my $method=lc($action);
245             my $code='$self->'.$method.'(@arg)';
246             unless ($self->can($method))
247             {
248             warn "$code not implemented\n";
249             return undef;
250             }
251             else
252             {
253             my ($event,@res) = eval ($code);
254             unless(defined $event)
255             {
256             die "$class: '$code' died: $@\n";
257             }
258             debug "$class: '$code' returned '$event'\n";
259             $self->event($event,@res) if $event; # =~ /^[A-Z]+[A-Z0-9]+$/;
260             }
261             }
262             # $self->timer("foo",{at=>time});
263             # $DB::single=1 if $newstate eq "DONE";
264             # `strace -o /tmp/t1 -p $$` if $newstate eq "DONE";
265             }
266              
267             sub run
268             {
269             my $seconds=shift;
270             Event->timer(at=>time() + $seconds,cb=>sub{unloop()});
271             loop();
272             }
273              
274             sub destruct
275             {
276             my $self=shift;
277             my $debug="destruct ";
278             $debug.= $self->{tag} || $self;
279             $debug.=" ";
280             $debug.= $self->{name} || " ";
281             $debug.=" ";
282             $debug.= $self->{pid} || " ";
283             debug $debug;
284             if ($self->{pid})
285             {
286             debug "killing ".$self->{pid};
287             kill(-9, $self->{pid});
288             kill(9, $self->{pid});
289             # the following line is dangerous -- could hang on hung umount
290             # requests etc.
291             waitpid($self->{pid},0);
292             }
293             for my $w ($self->watchers)
294             {
295             $self->killwatcher($w);
296             }
297             $self->{status}->remove($self,$self->{name})
298             if $self->{status} && $self->{name};
299             return 1;
300             }
301              
302             sub DESTROY
303             {
304             my $self=shift;
305             $self->destruct;
306             }
307              
308             1;