File Coverage

blib/lib/App/Switchman.pm
Criterion Covered Total %
statement 31 33 93.9
branch n/a
condition n/a
subroutine 11 11 100.0
pod n/a
total 42 44 95.4


line stmt bran cond sub pod time code
1             package App::Switchman;
2              
3             our $VERSION = '1.15';
4              
5             =head1 NAME
6              
7             App::Switchman
8              
9             =head1 DESCRIPTION
10              
11             switchman's internals
12              
13             =cut
14              
15 3     3   1252 use strict;
  3         5  
  3         79  
16 3     3   8 use warnings;
  3         3  
  3         68  
17              
18 3     3   10 use File::Basename qw(basename);
  3         4  
  3         161  
19 3     3   1771 use Getopt::Long qw(GetOptionsFromArray);
  3         22750  
  3         11  
20 3     3   908 use JSON;
  3         7743  
  3         11  
21 3     3   1348 use Linux::MemInfo;
  3         6330  
  3         134  
22 3     3   1322 use List::MoreUtils qw(part uniq);
  3         22303  
  3         14  
23 3     3   1317 use List::Util qw(max);
  3         3  
  3         198  
24 3     3   1277 use Log::Dispatch;
  3         24574  
  3         63  
25 3     3   451 use Moo;
  3         7367  
  3         16  
26 3     3   2703 use Net::ZooKeeper qw(:acls :errors :events :node_flags);
  0            
  0            
27             use Net::ZooKeeper::Semaphore;
28             use Pod::Usage;
29             use POSIX qw(strftime);
30             use Scalar::Util qw(blessed);
31             use Sys::CPU;
32             use Sys::Hostname::FQDN qw(fqdn);
33             use Sys::SigAction qw(set_sig_handler);
34              
35              
36             our $DEFAULT_CONFIG_PATH ||= "/etc/switchman.conf";
37             our $LOCKS_PATH ||= 'locks';
38             our $QUEUES_PATH ||= 'queues';
39             our $SEMAPHORES_PATH ||= 'semaphores';
40              
41              
42             has command => (is => 'ro', required => 1);
43             has data_read_len => (
44             is => 'ro',
45             isa => sub {die "bad data_read_len: $_[0]" if defined $_[0] && $_[0] !~ m{^[0-9]+$}},
46             default => sub {65535},
47             );
48             has do_get_lock => (is => 'ro', default => 1);
49             has group => (is => 'ro');
50             has leases => (is => 'ro');
51             has lock_path => (
52             is => 'ro',
53             lazy => 1,
54             builder => sub {join '/', $_[0]->prefix, $LOCKS_PATH, $_[0]->lockname},
55             );
56             has lock_watch => (
57             is => 'ro',
58             lazy => 1,
59             builder => sub {$_[0]->zkh->watch},
60             );
61             has lockname => (
62             is => 'ro',
63             isa => sub {
64             die "lockname is too long: $_[0]" if length($_[0]) > 512;
65             die "lockname must not contain '/'" if index($_[0], '/') != -1;
66             },
67             required => 1,
68             );
69             has log => (is => 'ro', lazy => 1, builder => 1);
70             has logfile => (is => 'ro');
71             has loglevel => (is => 'ro');
72             has prefix => (
73             is => 'ro',
74             isa => sub {die "bad prefix: $_[0]" unless $_[0] =~ m{^(?:/[^/]+)+$}},
75             required => 1,
76             );
77             has prefix_data => (is => 'rw');
78             has prefix_data_watch => (
79             is => 'ro',
80             lazy => 1,
81             builder => sub {$_[0]->zkh->watch},
82             );
83             has queue_positions => (
84             is => 'ro',
85             default => sub {+{}},
86             );
87             has resources_wait_timeout => (
88             is => 'ro',
89             isa => sub {die "bad resources_wait_timeout: $_[0]" if defined $_[0] && $_[0] !~ m{^[0-9]+$}},
90             default => sub {0},
91             );
92             has termination_timeout => (
93             is => 'ro',
94             isa => sub {die "bad termination_timeout: $_[0]" if defined $_[0] && $_[0] !~ m{^\d+$}},
95             default => sub {10},
96             );
97             has zkh => (
98             is => 'rw',
99             lazy => 1,
100             builder => sub {Net::ZooKeeper->new($_[0]->zkhosts)},
101             );
102             has zkhosts => (is => 'ro', required => 1);
103              
104              
105             sub BUILDARGS
106             {
107             my $class = shift;
108             my $arguments = shift;
109              
110             return $arguments if ref $arguments eq 'HASH';
111             die "Bad constructor arguments: hashref or arrayref expected" unless ref $arguments eq 'ARRAY';
112              
113             my %options = (do_get_lock => 1);
114             my $config_path;
115             my $leases = {};
116             GetOptionsFromArray(
117             $arguments,
118             'c|config=s' => \$config_path,
119             'g|group=s' => \$options{group},
120             'h|help' => \&usage,
121             'lease=s' => $leases,
122             'lockname=s' => \$options{lockname},
123             'v|version' => \&version,
124             'lock!' => \$options{do_get_lock},
125             ) or die "Couldn't parse options, see $0 -h for help\n";
126              
127             die "No command provided" unless @$arguments;
128             $options{lockname} ||= basename($arguments->[0]);
129             $options{command} = [@$arguments];
130              
131             $options{leases} = {};
132             for my $resource (keys %$leases) {
133             my ($count, $total) = split /:/, _process_resource_macro($leases->{$resource}), 2;
134             $options{leases}->{_process_resource_macro($resource)} = {
135             count => eval $count,
136             total => eval $total,
137             };
138             }
139              
140             if (!$config_path && -f $DEFAULT_CONFIG_PATH) {
141             $config_path = $DEFAULT_CONFIG_PATH;
142             }
143             die "$DEFAULT_CONFIG_PATH is absent and --config is missing, see $0 -h for help\n" unless $config_path;
144             my $config = _get_and_check_config($config_path);
145             for my $key (qw/data_read_len logfile loglevel prefix resources_wait_timeout termination_timeout zkhosts/) {
146             next unless exists $config->{$key};
147             $options{$key} = $config->{$key};
148             }
149              
150             return \%options;
151             }
152              
153              
154             sub _build_log
155             {
156             my $self = shift;
157              
158             return Log::Dispatch->new(
159             outputs => [
160             [
161             'Screen',
162             min_level => $ENV{DEBUG} ? 'debug' : 'warning',
163             stderr => 1,
164             newline => 1,
165             ],
166             $self->logfile ? [
167             'File',
168             min_level => $self->loglevel || 'info',
169             filename => $self->logfile,
170             mode => '>>',
171             newline => 1,
172             binmode => ':encoding(UTF-8)',
173             format => '[%d] [%p] %m at %F line %L%n',
174             ] : (),
175             ],
176             callbacks => sub {my %p = @_; return join "\t", strftime("%Y-%m-%d %H:%M:%S", localtime(time)), "[$$]", $p{message};},
177             );
178             }
179              
180              
181             =head1 METHODS
182              
183             =head2 acquire_semaphore
184              
185             Acquires semaphore for a given resource
186              
187             =cut
188              
189             sub acquire_semaphore
190             {
191             my $self = shift;
192             my $resource = shift;
193              
194             $self->log->debug(sprintf "Trying to acquire semaphore %s", $resource);
195              
196             return Net::ZooKeeper::Semaphore->new(
197             count => $self->leases->{$resource}->{count},
198             data => _node_data(),
199             path => $self->prefix."/$SEMAPHORES_PATH/$resource",
200             total => $self->leases->{$resource}->{total},
201             zkh => $self->zkh,
202             );
203             }
204              
205              
206             =head2 get_group_hosts
207              
208             Returns an arrayref of hosts included int the given group
209              
210             =cut
211              
212             sub get_group_hosts
213             {
214             my $self = shift;
215             my $groups = shift;
216             my $group = shift;
217             my $seen = shift || {$group => 1};
218              
219             my $items = $groups->{$group} or $self->_error(sprintf "Group <%s> is not described", $group);
220             $items = [$items] unless ref $items eq 'ARRAY';
221             my ($subgroups, $hosts) = part {exists $groups->{$_} ? 0 : 1} @$items;
222             for my $subgroup (@$subgroups) {
223             next if $seen->{$subgroup};
224             $seen->{$subgroup} = 1;
225             push @$hosts, @{$self->get_group_hosts($groups, $subgroup, $seen)};
226             }
227             return [uniq @$hosts];
228             }
229              
230              
231             =head2 get_lock
232              
233             Creates a named lock in ZooKeeper
234             Returns undef is lock already exists, otherwise returns true and sets lock_watch
235              
236             =cut
237              
238             sub get_lock
239             {
240             my $self = shift;
241              
242             my $lock_path = $self->zkh->create($self->lock_path, _node_data(),
243             acl => ZOO_OPEN_ACL_UNSAFE,
244             flags => ZOO_EPHEMERAL,
245             );
246             if (my $error = $self->zkh->get_error) {
247             if ($error == ZNODEEXISTS) {
248             return undef;
249             } else {
250             $self->_error(sprintf("Could not acquire lock %s: %s", $self->lockname, $self->zkh->str_error));
251             }
252             }
253             $self->log->debug(sprintf "Lock <%s> taken", $self->lock_path);
254             return $self->zkh->exists($lock_path, watch => $self->lock_watch);
255             }
256              
257              
258             =head2 get_queue_path
259              
260             Returns queue path for a given resource
261              
262             =cut
263              
264             sub get_queue_path
265             {
266             my $self = shift;
267             my $resource = shift;
268              
269             return $self->prefix."/$QUEUES_PATH/$resource";
270             }
271              
272              
273             =head2 get_resources
274              
275             Returns resource names listed in ZooKeeper
276             Macros are processed
277              
278             =cut
279              
280             sub get_resources
281             {
282             my $self = shift;
283              
284             $self->load_prefix_data;
285             return map {_process_resource_macro($_)} @{$self->prefix_data->{resources}};
286             }
287              
288              
289             =head2 is_group_serviced
290              
291             Determines if execution is allowed on the current host
292              
293             =cut
294              
295             sub is_group_serviced
296             {
297             my $self = shift;
298              
299             $self->load_prefix_data;
300             my $hosts = $self->get_group_hosts($self->prefix_data->{groups}, $self->group);
301             my $fqdn = fqdn();
302             my $is_serviced = grep {$fqdn eq $_} @$hosts;
303             return $is_serviced;
304             }
305              
306              
307             =head2 is_task_in_queue
308              
309             Checks if task is already queue up for a given resource
310              
311             =cut
312              
313             sub is_task_in_queue
314             {
315             my $self = shift;
316             my $resource = shift;
317              
318             my $re = quotemeta($self->lockname).'-\d+';
319             my $is_in_queue = scalar grep {$_ =~ /^$re$/} $self->zkh->get_children($self->get_queue_path($resource));
320             if ($self->zkh->get_error && $self->zkh->get_error != ZNONODE) {
321             $self->_error("Could not check queue for <$resource>: ".$self->zkh->str_error);
322             }
323             $self->log->debug(sprintf "Check if task <%s> already queued up for resource <%s>: %s", $self->lockname, $resource, ($is_in_queue ? 'true' : 'false'));
324             return $is_in_queue;
325             }
326              
327              
328             =head2 leave_queues
329              
330             Leaves all resource queues
331              
332             =cut
333              
334             sub leave_queues
335             {
336             my $self = shift;
337              
338             for my $resource (keys %{$self->queue_positions}) {
339             my $position = $self->queue_positions->{$resource};
340             $self->log->debug(sprintf "Delete from queue %s", $position);
341             $self->zkh->delete($position);
342             if ($self->zkh->get_error) {
343             $self->_error("Could not delete <$position>: ".$self->zkh->str_error);
344             }
345             delete $self->queue_positions->{$resource};
346             }
347             }
348              
349              
350             =head2 load_prefix_data
351              
352             Loads data from prefix znode
353             Sets prefix_data_watch
354              
355             =cut
356              
357             sub load_prefix_data
358             {
359             my $self = shift;
360              
361             my $json_data = $self->zkh->get($self->prefix, watch => $self->prefix_data_watch);
362             if ($self->zkh->get_error) {
363             $self->_error("Could not get data: ".$self->zkh->str_error);
364             }
365              
366             my (%data, $prefix_data);
367             if ($json_data) {
368             $prefix_data = eval {from_json($json_data)};
369             if (!$prefix_data || $@) {
370             $self->_error("Could not decode data: $@");
371             } elsif (ref $prefix_data ne 'HASH') {
372             $self->_error("Bad prefix data: hashref expected");
373             }
374             if ($prefix_data->{resources} && ref $prefix_data->{resources} ne 'ARRAY') {
375             $self->_error("Bad prefix data: resources should be an array");
376             }
377             if ($prefix_data->{groups} && ref $prefix_data->{groups} ne 'HASH') {
378             $self->_error("Bad prefix data: groups should be a hash");
379             }
380             }
381              
382             $data{resources} = $prefix_data->{resources} || [];
383             $data{groups} = $prefix_data->{groups} || {};
384              
385             $self->prefix_data(\%data);
386             }
387              
388              
389             =head2 prepare_zknodes
390              
391             Ensures existence of subnodes we use
392              
393             =cut
394              
395             sub prepare_zknodes
396             {
397             my $self = shift;
398             my $nodes = shift;
399              
400             for my $path (@$nodes) {
401             unless ($self->zkh->exists($path)) {
402             my $error = $self->zkh->get_error;
403             if ($error && $error != ZNONODE) {
404             $self->_error("Failed to check $path existence: ".$self->zkh->str_error);
405             }
406             $self->log->debug("Create $path");
407             $self->zkh->create($path, _node_data(),
408             acl => ZOO_OPEN_ACL_UNSAFE,
409             ) or $self->_error("Failed to prepare $path: ".$self->zkh->str_error);
410             }
411             }
412             }
413              
414              
415             =head2 queue_up
416              
417             Puts task in queue for resource
418             Returns queue item path
419              
420             =cut
421              
422             sub queue_up
423             {
424             my $self = shift;
425             my $resource = shift;
426              
427             my $queue_path = $self->get_queue_path($resource);
428             $self->prepare_zknodes([$queue_path]);
429             my $item_path = $self->zkh->create(sprintf("%s/%s-", $queue_path, $self->lockname), _node_data(),
430             acl => ZOO_OPEN_ACL_UNSAFE,
431             flags => (ZOO_EPHEMERAL | ZOO_SEQUENCE),
432             );
433             if ($self->zkh->get_error) {
434             $self->_error(sprintf("Could not push task <%s> in queue for <%s>: %s", $self->lockname, $resource, $self->zkh->str_error));
435             } else {
436             $self->log->debug(sprintf "Added task in queue for <%s>: <%s>", $resource, $item_path);
437             }
438             $self->queue_positions->{$resource} = $item_path;
439             return $item_path;
440             }
441              
442              
443             =head2 zk_connect
444              
445             Connect to zookeeper cluster
446              
447             =cut
448              
449             sub zk_connect
450             {
451             my $self = shift;
452              
453             # check connection and try and reconnect in case of a failure
454             for (1 .. 10) {
455             if (!$self->zkh) {
456             $self->log->debug("NetZooKeeper initialization failed");
457             } else {
458             $self->zkh->exists($self->prefix);
459             if (!$self->zkh->get_error || $self->zkh->get_error == ZNONODE) {
460             last;
461             }
462             }
463             $self->log->debug("Trying to reconnect");
464             $self->zkh(Net::ZooKeeper->new($self->zkhosts));
465             }
466              
467             if (!$self->zkh) {
468             $self->_error("Failed to connect to ZooKeeper");
469             }
470              
471             $self->zkh->{data_read_len} = $self->data_read_len;
472             }
473              
474              
475             =head2 run
476              
477             Application loop
478             Never returns
479              
480             =cut
481              
482             sub run
483             {
484             my $self = shift;
485             $self->zk_connect();
486              
487             $self->prepare_zknodes([$self->prefix, map {$self->prefix."/$_"} ($LOCKS_PATH, $QUEUES_PATH, $SEMAPHORES_PATH)]);
488              
489             if ($self->group && !$self->is_group_serviced) {
490             $self->log->debug(sprintf "Group <%s> is not serviced at the moment", $self->group);
491             exit;
492             }
493              
494             if ($self->do_get_lock && $self->zkh->exists($self->lock_path, watch => $self->lock_watch)) {
495             $self->log->info(sprintf "Lock %s already exists", $self->lock_path);
496             exit;
497             }
498              
499             my %known_resources = map {$_ => 1} $self->get_resources;
500             if (my @unknown_resources = grep {!exists $known_resources{$_}} keys %{$self->leases}) {
501             $self->_error("Unknown resources: ".join(', ', @unknown_resources));
502             }
503              
504             my $alarm_handler_guard;
505             if ($self->resources_wait_timeout) {
506             $alarm_handler_guard = set_sig_handler('ALRM', sub {
507             local *__ANON__ = 'timed_out_resources_waiting_handler';
508             $self->_error("Reached timeout while waiting for resources");
509             }, {safe => 0});
510             alarm($self->resources_wait_timeout);
511             }
512              
513             my @resources = grep {exists $self->leases->{$_}} $self->get_resources;
514             for my $resource (@resources) {
515             if ($self->is_task_in_queue($resource)) {
516             exit;
517             } else {
518             $self->queue_up($resource);
519             }
520             }
521              
522             my @semaphores = ();
523              
524             for my $resource (@resources) {
525             $self->wait_in_queue($resource);
526             # try to acquire a semaphore until success
527             while (1) {
528             if ($self->lock_watch->{state}) {
529             $self->log->info(sprintf "Lock watch received %s while waiting for $resource semaphore, we exit", $self->lock_watch->{event});
530             exit;
531             }
532             my $semaphore = $self->acquire_semaphore($resource);
533             if ($semaphore) {
534             $self->log->debug(sprintf "Semaphore <%s> acquired", $resource);
535             push @semaphores, $semaphore;
536             last;
537             }
538             sleep 1;
539             }
540             }
541              
542             $self->log->debug("All resources acquired");
543              
544             $self->leave_queues;
545             alarm(0);
546              
547             if ($self->do_get_lock && !$self->get_lock) {
548             $self->log->info(sprintf "Lock %s already exists", $self->lockname);
549             exit;
550             }
551              
552             # We want to exit right after our child dies
553             $SIG{CHLD} = sub {
554             my $pid = wait;
555             my $exit_code = $? >> 8;
556             $self->log->warn("Child $pid exited with $exit_code") if $exit_code;
557             # THE exit
558             exit $exit_code;
559             };
560              
561             my $CHILD;
562              
563             # If we suddenly die, we won't leave our child alone
564             # Otherwise the process will be active and not holding the lock
565             $SIG{__DIE__} = sub {
566             my $msg = shift;
567             chomp $msg;
568             if ($CHILD && kill 0 => $CHILD) {
569             $self->log->warn("Parent is terminating abnormally ($msg), killing child $CHILD");
570             kill 9 => $CHILD or $self->log->warn("Failed to KILL $CHILD");
571             }
572             };
573             $SIG{TERM} = $SIG{INT} = sub {
574             my $signame = shift;
575             warn "Parent received SIG$signame, terminating child $CHILD\n";
576             if (kill $signame => $CHILD) {
577             warn "Sent SIG$signame to $CHILD\n";
578             sleep 1; # wait for process cleanup
579             }
580             if (kill 0 => $CHILD) {
581             warn "Failed to $signame $CHILD\n";
582             } else {
583             exit;
584             }
585             };
586              
587             $CHILD = $self->run_command_in_background;
588              
589             while (1) {
590             if ($self->lock_watch->{state}) {
591             $self->log->warn("It's not secure to proceed, lock watch received ".$self->lock_watch->{event});
592             $self->_stop_child($CHILD);
593             last;
594             }
595             if ($self->group && $self->prefix_data_watch->{state}) {
596             unless ($self->is_group_serviced) {
597             $self->log->info(sprintf "Group <%s> is not serviced by the current host anymore", $self->group);
598             $self->_stop_child($CHILD);
599             last;
600             }
601             }
602             sleep 1;
603             }
604             }
605              
606              
607             =head2 run_command_in_background
608              
609             Execs command
610              
611             =cut
612              
613             sub run_command_in_background
614             {
615             my $self = shift;
616              
617             my $command = join(' ', @{$self->command});
618             $self->log->info("Executing <$command>");
619              
620             my $child = fork();
621             if (!defined $child) {
622             $self->_error("Could not fork");
623             } elsif (!$child) {
624             exec(@{$self->command}) or $self->_error("Failed to exec <$command>: $!");
625             } else {
626             return $child
627             }
628             }
629              
630              
631             =head2 usage
632              
633             Shows help and exits
634              
635             =cut
636              
637             sub usage
638             {
639             pod2usage(-exitval => 1, -verbose => 99, -sections => [qw(USAGE DESCRIPTION EXAMPLES), 'SEE ALSO', 'COPYRIGHT AND LICENSE']);
640             }
641              
642              
643             =head2 version
644              
645             Shows version info and exits
646              
647             =cut
648              
649             sub version
650             {
651             print "switchman $VERSION\n";
652             pod2usage(-exitval => 1, -verbose => 99, -sections => ['COPYRIGHT AND LICENSE']);
653             }
654              
655              
656             =head2 wait_in_queue
657              
658             Waits in queue for a given resource
659              
660             =cut
661              
662             sub wait_in_queue
663             {
664             my $self = shift;
665             my $resource = shift;
666              
667             my $queue_path = $self->prefix."/$QUEUES_PATH/$resource";
668             my $queue_position = $self->queue_positions->{$resource} or $self->_error("queue position for <$resource> is not initialized");
669             my ($position) = $queue_position =~ /-(\d+)$/;
670              
671             while (1) {
672             $self->log->debug(sprintf "Wait in queue cycle for %s", $queue_position);
673             my @items = $self->zkh->get_children($queue_path);
674             if ($self->zkh->get_error) {
675             $self->_error("Could not get items in queue $queue_path: ".$self->zkh->str_error);
676             }
677             my %positions;
678             for my $item (@items) {
679             if ($item =~ /-(\d+)$/) {
680             $positions{$1} = $item;
681             } else {
682             $self->_error("Unexpected item <$item> in queue $queue_path");
683             }
684             }
685              
686             if (!exists $positions{$position}) {
687             $self->log->debug(sprintf "Our position <%s> does not exists in queue. Queue items: %s.", $position, join(', ', @items));
688             $self->_error("Lost position <$position> in queue $queue_path");
689             }
690              
691             my @prior_pos = grep {$_ < $position} keys %positions;
692             last if !@prior_pos;
693              
694             my $neighbour = max @prior_pos;
695             my $neighbour_watch = $self->zkh->watch();
696             my $neighbour_exists = $self->zkh->exists("$queue_path/$positions{$neighbour}", watch => $neighbour_watch);
697             if (($self->zkh->get_error) && $self->zkh->get_error != ZNONODE) {
698             $self->_error("Could not check $positions{$neighbour} existence: ".$self->zkh->str_error);
699             }
700             if ($neighbour_exists) {
701             $self->log->debug(sprintf 'Wait for changing %s state (%d items before us)', $positions{$neighbour}, scalar(@prior_pos));
702             $neighbour_watch->wait;
703             }
704             }
705             $self->log->debug(sprintf "Waited %s", $queue_position);
706             }
707              
708              
709             sub _error
710             {
711             my $self = shift;
712             my $message = shift;
713              
714             @_ = ($self->log, level => 'critical', message => $message);
715             my $class = blessed $self->log;
716             no strict 'refs';
717             goto &{"${class}::log_and_croak"};
718             }
719              
720              
721             sub _get_and_check_config
722             {
723             my $config_path = shift;
724              
725             open my $config_file, '<:encoding(UTF-8)', $config_path or die "Failed to open <$config_path>";
726             my $config_json = do {local $/; <$config_file>};
727             close $config_file;
728             $config_json =~ s/(?:^\s*|\s*$)//gm;
729             my $config = from_json($config_json);
730             die "zkhosts is not defined in $config_path\n" unless $config->{zkhosts};
731             die "zk chroot is not supported in older versions, use prefix in $config_path\n" if $config->{zkhosts} =~ m!/\w+!;
732             die "prefix is not defined in $config_path\n" unless $config->{prefix};
733              
734             return $config;
735             }
736              
737              
738             sub _node_data
739             {
740             return fqdn()." $$";
741             }
742              
743              
744             sub _process_resource_macro
745             {
746             my $string = shift;
747              
748             my %mem_info = Linux::MemInfo::get_mem_info();
749             my %expand = (
750             CPU => Sys::CPU::cpu_count(),
751             FQDN => fqdn(),
752             MEMMB => int($mem_info{MemTotal} / 1024),
753             );
754             my $re = join '|', keys %expand;
755             $string =~ s/($re)/$expand{$1}/eg;
756             return $string;
757             }
758              
759              
760             sub _stop_child
761             {
762             my $self = shift;
763             my $pid = shift;
764              
765             kill TERM => $pid or die "Failed to TERM $pid";
766             # give some time to terminate gracefully
767             for (1 .. $self->termination_timeout) {
768             return unless kill 0 => $pid;
769             sleep 1;
770             }
771             # ran out of patience
772             kill KILL => $pid or die "Failed to KILL $pid";
773             }
774              
775             1;
776              
777             __END__