File Coverage

blib/lib/Helm.pm
Criterion Covered Total %
statement 7 9 77.7
branch n/a
condition n/a
subroutine 3 3 100.0
pod n/a
total 10 12 83.3


line stmt bran cond sub pod time code
1             package Helm;
2 1     1   752 use strict;
  1         1  
  1         36  
3 1     1   5 use warnings;
  1         2  
  1         30  
4 1     1   467 use Moose;
  0            
  0            
5             use Moose::Util::TypeConstraints qw(enum);
6             use URI;
7             use namespace::autoclean;
8             use Try::Tiny;
9             use File::Spec::Functions qw(catdir catfile tmpdir devnull);
10             use File::HomeDir;
11             use Net::OpenSSH;
12             use Fcntl qw(:flock);
13             use File::Basename qw(basename);
14             use Helm::Log;
15             use Helm::Server;
16             use Scalar::Util qw(blessed);
17             use Parallel::ForkManager;
18             use DateTime;
19             use IO::File;
20              
our $VERSION = 0.4;

# State for the class-level debug() logger: $DEBUG switches it on,
# $DEBUG_LOG holds the open handle and $DEBUG_LOG_PID the pid that opened it
# (debug() re-opens the handle when the pid changes, i.e. after a fork).
our $DEBUG = 0;
our $DEBUG_LOG;
our $DEBUG_LOG_PID;

enum LOG_LEVEL => qw(debug info warn error);
enum LOCK_TYPE => qw(none local remote both);

# --- what to run, and as whom ---------------------------------------------
has task       => (is => 'ro', writer => '_task',       required => 1);
has user       => (is => 'ro', writer => '_user',       isa => 'Maybe[Str]');
has config_uri => (is => 'ro', writer => '_config_uri', isa => 'Maybe[Str]');
has config     => (is => 'ro', writer => '_config',     isa => 'Helm::Conf');

# steer() and _release_remote_lock() compare lock_type with "eq", so it must
# never be undef: default to 'none' (no locking) when the caller gives nothing.
has lock_type => (
    is      => 'ro',
    writer  => '_lock_type',
    isa     => 'LOCK_TYPE',
    default => 'none',
);
has sleep => (is => 'ro', writer => '_sleep', isa => 'Maybe[Num]');

# current_server / current_ssh are set by steer() for the server being worked on
has current_server => (is => 'ro', writer => '_current_server', isa => 'Helm::Server');
has current_ssh    => (is => 'ro', writer => '_current_ssh',    isa => 'Net::OpenSSH');
has log            => (is => 'ro', writer => '_log',            isa => 'Helm::Log');
has default_port   => (is => 'ro', writer => '_port',           isa => 'Maybe[Int]');
has timeout        => (is => 'ro', writer => '_timeout',        isa => 'Maybe[Int]');

# --- execution options ----------------------------------------------------
has sudo                 => (is => 'rw', isa => 'Maybe[Str]',      default => '');
has extra_options        => (is => 'ro', isa => 'Maybe[HashRef]',  default => sub { {} });
has extra_args           => (is => 'ro', isa => 'Maybe[ArrayRef]', default => sub { [] });
has parallel             => (is => 'ro', isa => 'Maybe[Bool]',     default => 0);
has parallel_max         => (is => 'ro', isa => 'Maybe[Int]',      default => 100);
has continue_with_errors => (is => 'ro', isa => 'Maybe[Bool]',     default => 0);

# set to true by BUILD when no server, role or exclusion narrowed the target list
has all_configured_servers => (
    is      => 'ro',
    writer  => '_all_configured_servers',
    isa     => 'Maybe[Bool]',
    default => 0,
);

# handle holding the flock()ed local lock file (see _get_local_lock)
has local_lock_handle => (
    is     => 'ro',
    writer => '_local_lock_handle',
    isa    => 'Maybe[FileHandle]',
);

# --- server selection -----------------------------------------------------
# "servers" starts out as the list given to new() and is replaced by BUILD
# with a list of Helm::Server objects.
has servers => (
    is      => 'ro',
    writer  => '_servers',
    isa     => 'Maybe[ArrayRef]',
    default => sub { [] },
);
has roles => (
    is      => 'ro',
    writer  => '_roles',
    isa     => 'ArrayRef[Str]',
    default => sub { [] },
);
has exclude_servers => (
    is      => 'ro',
    writer  => '_exclude_servers',
    isa     => 'Maybe[ArrayRef]',
    default => sub { [] },
);
has exclude_roles => (
    is      => 'ro',
    writer  => '_exclude_roles',
    isa     => 'ArrayRef[Str]',
    default => sub { [] },
);

has log_level => (
    is      => 'ro',
    writer  => '_log_level',
    isa     => 'LOG_LEVEL',
    default => 'info',
);

# when true, die() logs and releases locks but does not exit the process
has _dont_exit => (
    is      => 'rw',
    isa     => 'Maybe[Bool]',
    default => 0,
);
92              
# Registry of pluggable implementation classes, keyed first by module type
# (task, log, configuration) and then by the short name used to select one.
# register_module() adds entries to it at runtime.
my %REGISTERED_MODULES = (
    # every built-in task lives in Helm::Task::<name>
    task => +{
        map { ($_ => "Helm::Task::$_") } qw(get patch put rsync_put run unlock)
    },

    # log channels are chosen by URI scheme; "mailto" is served by the email channel
    log => +{
        (map { ($_ => "Helm::Log::Channel::$_") } qw(console file irc)),
        mailto => 'Helm::Log::Channel::email',
    },

    configuration => +{ helm => 'Helm::Conf::Loader::helm' },
);
110              
# Constructor argument pre-processing.
#
# Accepts either a hash or a hash reference of arguments. The "log" argument
# may be:
#   * an array reference of log URIs (or the bare word "console"),
#   * a single log URI string, or
#   * an already constructed Helm::Log object (passed through untouched).
# URIs are turned into a Helm::Log object holding one channel object per URI;
# the channel class is looked up in %REGISTERED_MODULES by URI scheme.
# Dies (CORE::die) on an invalid URI, an unknown scheme or an unloadable
# channel class.
around BUILDARGS => sub {
    my $orig  = shift;
    my $class = shift;
    my %args  = (@_ == 1 && ref $_[0] && ref $_[0] eq 'HASH') ? %{$_[0]} : @_;

    my $log_spec = delete $args{log};
    if (blessed($log_spec) && $log_spec->isa('Helm::Log')) {
        # a ready-made log object needs no conversion
        $args{log} = $log_spec;
    } elsif ($log_spec) {
        my $log =
          Helm::Log->new($args{log_level} ? (log_level => $args{log_level}) : ());

        # Work on a copy: the loop below replaces each entry with a URI
        # object, and iterating the caller's array directly would rewrite
        # the caller's data through the foreach alias.
        my @log_uris = ref $log_spec eq 'ARRAY' ? @$log_spec : ($log_spec);
        foreach my $uri (@log_uris) {
            # console is a special case
            $uri = 'console://blah' if $uri eq 'console';
            $uri = try {
                URI->new($uri);
            } catch {
                CORE::die("Invalid log URI $uri");
            };
            my $scheme = $uri->scheme;
            CORE::die("Unknown log type for $uri") unless $scheme;
            my $log_class = $REGISTERED_MODULES{log}->{$scheme};
            CORE::die("Unknown log type for $uri") unless $log_class;
            eval "require $log_class";

            # keep our own copy of the error so nothing in between can clobber $@
            if (my $load_error = $@) {
                my $log_class_file = $log_class;
                $log_class_file =~ s/::/\//g;
                if ($load_error =~ /Can't locate \Q$log_class_file\E\.pm/) {
                    CORE::die("Can not find module $log_class for log type $scheme");
                } else {
                    CORE::die("Could not load module $log_class for log type $scheme: $load_error");
                }
            }
            Helm->debug("Adding new logging channel for URI $uri using class $log_class");
            $log->add_channel($log_class->new(uri => $uri, task => $args{task}));
        }
        $args{log} = $log;
    }

    return $class->$orig(%args);
};
152              
# Post-construction setup: initializes the log, loads the configuration from
# config_uri (unless a config object was supplied) and resolves the final list
# of target servers into Helm::Server objects, stored back into "servers".
#
# Resolution order:
#   1. explicitly named servers (minus exclude_servers),
#   2. plus servers matching "roles" (minus exclude_roles / exclude_servers),
#   3. if still empty, every configured server that is not excluded.
sub BUILD {
    my $self = shift;

    $self->log->initialize($self);

    # create a config object from the config URI string (if it's not already a config object)
    if ($self->config_uri && !$self->config) {
        Helm->debug("Loading configuration for URI " . $self->config_uri);
        $self->_config($self->load_configuration($self->config_uri));
    }

    # do we have any servers we're excluding?
    # (exclude_servers is Maybe[ArrayRef], so tolerate an explicit undef)
    my %excludes;
    if (my @excludes = @{ $self->exclude_servers || [] }) {
        foreach my $server_name (Helm::Server->expand_server_names(@excludes)) {
            if (my $config = $self->config) {
                if (my $server = $config->get_server_by_abbrev($server_name, $self)) {
                    $server_name = $server->name;
                }
            }
            $excludes{$server_name} = 1;
        }
    }

    # if we have servers let's turn them into Helm::Server objects, let's fully
    # expand their names in case we're using abbreviations
    my @server_names = @{ $self->servers || [] };
    if (@server_names) {
        my @server_objs;
        foreach my $server_name (Helm::Server->expand_server_names(@server_names)) {
            if ($excludes{$server_name}) {
                Helm->debug("Excluding server $server_name");
                next;
            }

            my $server;
            if (ref $server_name && blessed($server_name) && $server_name->isa('Helm::Server')) {
                # if it's already a Helm::Server just keep it
                $server = $server_name;
            } elsif (my $config = $self->config) {
                # with a config file we can find out more about these servers
                $server = $config->get_server_by_abbrev($server_name, $self)
                  || Helm::Server->new(name => $server_name);
            } else {
                $server = Helm::Server->new(name => $server_name);
            }

            # The exclusion table is keyed by resolved names, so a server
            # requested by abbreviation has to be checked again under its
            # resolved name or the exclusion would silently not apply.
            if ($excludes{ $server->name }) {
                Helm->debug("Excluding server " . $server->name);
                next;
            }
            push(@server_objs, $server);
        }
        $self->_servers(\@server_objs);
    }

    # if we have any roles, then get the servers with (or without) those roles
    my @roles         = @{ $self->roles };
    my @exclude_roles = @{ $self->exclude_roles };
    if (@roles) {
        $self->die("Can't specify roles without a config") if !$self->config;
        my @servers = @{ $self->servers || [] };
        push(@servers,
            grep { !$excludes{ $_->name } }
              $self->config->get_servers_by_roles(\@roles, \@exclude_roles));
        if (!@servers) {
            if (@exclude_roles) {
                $self->die("No servers with roles ("
                      . join(', ', @roles)
                      . ") when roles ("
                      . join(', ', @exclude_roles)
                      . ") are excluded");
            } else {
                $self->die("No servers with roles: " . join(', ', @roles));
            }
        }
        $self->_servers(\@servers);
    }

    # if we still don't have any servers, then use 'em all
    my @servers = @{ $self->servers || [] };
    if (!@servers) {
        $self->die("You must specify servers if you don't have a config") if !$self->config;

        # exclude any servers we don't want
        @servers =
          grep { !@exclude_roles || !$_->has_role(@exclude_roles) }
          grep { !$excludes{ $_->name } } @{ $self->config->servers };

        # are we operating on all the servers?
        if (!@exclude_roles && !%excludes) {
            $self->_all_configured_servers(1);
        }

        $self->_servers(\@servers);
    }
}
240              
# Run the configured task against every target server.
#
# Loads and validates the task class, takes the local lock when lock_type is
# 'local' or 'both', runs the task's setup(), then for each server: opens an
# SSH connection, takes the remote lock when lock_type is 'remote' or 'both',
# calls the task's execute() and releases the remote lock. Afterwards runs
# teardown(), releases the local lock and finalizes the log.
#
# With "parallel" set, each server is handled in a child process forked via
# Parallel::ForkManager; the child's SSH stdout/stderr go to a per-server log
# file in the temp directory.
#
# With "continue_with_errors" set, die() returns instead of exiting; a server
# whose connection or remote lock failed is then skipped rather than worked on.
sub steer {
    my $self = shift;
    my $task = $self->task;

    # make sure it's a task we know about and can load
    my $task_class = $REGISTERED_MODULES{task}->{$task};
    $self->die("Unknown task $task") unless $task_class;
    eval "require $task_class";

    if (my $load_error = $@) {
        my $task_class_file = $task_class;
        $task_class_file =~ s/::/\//g;
        if ($load_error =~ /Can't locate \Q$task_class_file\E\.pm/) {
            $self->die("Can not find module $task_class for task $task");
        } else {
            # report the underlying error too, as the log and config loaders do
            $self->die("Could not load module $task_class for task $task: $load_error");
        }
    }

    my $task_obj = $task_class->new(helm => $self);
    $task_obj->validate();

    # lock_type may be unset; treat that as "no locking"
    my $lock_type = $self->lock_type || 'none';

    # make sure have a local lock if we need it
    if ($lock_type eq 'local' || $lock_type eq 'both') {
        Helm->debug("Trying to obtain local helm lock");
        $self->die("Cannot obtain a local helm lock. Is another helm process running?",
            no_release_locks => 1)
          unless $self->_get_local_lock;
    }

    my @servers = @{ $self->servers };

    # getlogin() returns nothing without a controlling terminal (cron, daemons),
    # so fall back to the name of the real uid where the platform supports it
    my $login = getlogin() || eval { scalar getpwuid($<) } || 'unknown';
    $self->log->info("Helm execution started by " . $login);
    if (@servers > 20) {
        $self->log->info(qq(Running task "$task" on ) . scalar(@servers) . " servers");
    } else {
        $self->log->info(qq(Running task "$task" on servers: ) . join(', ', @servers));
    }

    $self->log->debug("Running task setup");
    $task_obj->setup();

    my $forker;
    if ($self->parallel) {
        Helm->debug("Setting up fork manager");
        $forker = Parallel::ForkManager->new($self->parallel_max);
        Helm->debug("Letting loggers know we're going to parallelize things");
        $self->log->parallelize($self);
    }

    # execute the task for each server
    $self->_dont_exit(1) if $self->continue_with_errors;
    foreach my $server (@servers) {
        $self->log->start_server($server);
        $self->_current_server($server);

        my $port     = $server->port || $self->default_port;
        my %ssh_args = (
            ctl_dir     => catdir(File::HomeDir->my_home, '.helm'),
            strict_mode => 0,
        );
        $ssh_args{port}    = $port          if $port;
        $ssh_args{timeout} = $self->timeout if $self->timeout;
        $self->log->debug("Setting up SSH connection to $server" . ($port ? ":$port" : ''));

        # in parallel mode, send all stdout/stderr from each connection to a file
        if ($self->parallel) {
            my $log_file = catfile(tmpdir(), "helm-$server.log");
            open(my $log_fh, '>', $log_file)
              or CORE::die("Could not open file $log_file for logging: $!");
            open(my $devnull, '<', devnull)
              or CORE::die("Could not open /dev/null: $!");
            $ssh_args{default_stdout_fh} = $log_fh;
            $ssh_args{default_stderr_fh} = $log_fh;
            $ssh_args{default_stdin_fh}  = $devnull;
            $self->log->info("Logging output for $server to $log_file");

            my $pid = $forker->start;
            Helm->debug("Letting the loggers know we've actually forked off a child task worker");
            if ($pid) {
                # let the loggers know we're now forked;
                $self->log->forked('parent');
                next;
            } else {
                $self->log->forked('child');
            }
        }

        # Gives up on the current server after a failure that die() did not
        # turn into an exit (continue_with_errors). Keeps start_server and
        # end_server paired as on the normal path and, in parallel mode, ends
        # the child process with a non-zero status.
        my $abandon_server = sub {
            $self->log->end_server($server);
            $forker->finish(1) if $self->parallel;
        };

        my $connection_name = $server->name;
        my $user            = $self->user;
        $connection_name = $user . '@' . $connection_name if $user;

        my $ssh = Net::OpenSSH->new($connection_name, %ssh_args);
        if ($ssh->error) {
            $self->die("Can't ssh to $server" . ($user ? " as user $user" : '') . ": " . $ssh->error);

            # only reached when continuing despite errors: there is no usable
            # connection, so do not try to lock or execute on this server
            $abandon_server->();
            next;
        }
        $self->_current_ssh($ssh);

        # get a lock on the server if we need to
        if ($lock_type eq 'remote' || $lock_type eq 'both') {
            Helm->debug("Trying to obtain remote lock on $server");
            unless ($self->_get_remote_lock($ssh)) {
                $self->die("Cannot obtain remote lock on $server. Is another helm process working there?",
                    no_release_locks => 1);

                # only reached when continuing despite errors: somebody else
                # holds the lock, so neither run the task nor remove their lock
                $abandon_server->();
                next;
            }
        }

        Helm->debug(qq(Executing task "$task" on server "$server"));
        $task_obj->execute(
            ssh    => $ssh,
            server => $server,
        );

        $self->log->end_server($server);
        $self->_release_remote_lock($ssh);
        if (my $secs = $self->sleep) {
            Helm->debug("Sleeping for $secs seconds between servers");
            sleep($secs);
        }
        if ($self->parallel) {
            Helm->debug("Finished work in child task process");
            $forker->finish;
        }
    }

    if ($self->parallel) {
        Helm->debug("Waiting on all child task processes to finish");
        $forker->wait_all_children;
    }

    $self->log->debug("Running task teardown");
    $task_obj->teardown();

    # release the local lock
    $self->_release_local_lock();
    Helm->debug("Finalizing loggers");
    $self->log->finalize($self);
    Helm->debug("Finished with all tasks on all servers");
}
374              
# Load a configuration object from a configuration URI.
#
# The URI scheme selects the loader class from the "configuration" section of
# %REGISTERED_MODULES; the loader's load() is called with the parsed URI and
# this Helm object, and its result is returned. Problems (bad URI, unknown
# scheme, loader module missing or broken) are reported through $self->die.
sub load_configuration {
    my ($self, $uri) = @_;

    $uri = try {
        URI->new($uri)
    } catch {
        $self->die("Invalid configuration URI $uri");
    };

    # try to load the right config module
    my $scheme = $uri->scheme;
    if (!$scheme) {
        $self->die("Unknown config type for $uri");
    }

    my $loader_class = $REGISTERED_MODULES{configuration}->{$scheme};
    if (!$loader_class) {
        $self->die("Unknown config type for $uri");
    }

    eval "require $loader_class";
    if ($@) {
        # tell "module is not installed" apart from "module failed to compile"
        (my $loader_class_file = $loader_class) =~ s/::/\//g;
        my $problem =
          $@ =~ /Can't locate \Q$loader_class_file\E\.pm/
          ? "Can not find module $loader_class for configuration type $scheme"
          : "Could not load module $loader_class for configuration type $scheme: $@";
        $self->die($problem);
    }

    $self->log->debug("Loading configuration for $uri from $loader_class");
    return $loader_class->load(uri => $uri, helm => $self);
}
403              
# Class method: return the help text of a registered task.
#
# Looks the task name up in %REGISTERED_MODULES, loads the task class and
# returns whatever its help() class method returns for that task name.
# Throws a plain Perl exception for an unknown task or an unloadable class.
sub task_help {
    my ($class, $task) = @_;

    # a missing task name is reported as unknown instead of tripping
    # "uninitialized value" warnings on the way there
    $task = '' unless defined $task;

    # make sure it's a task we know about and can load
    my $task_class = $REGISTERED_MODULES{task}->{$task};
    CORE::die(qq(Unknown task "$task")) unless $task_class;
    eval "require $task_class";

    # This package defines its own die() method, so the builtin is spelled
    # CORE::die explicitly here, as it is on the check above.
    CORE::die($@) if $@;

    return $task_class->help($task);
}
414              
# Class method: the names of all registered tasks, in sorted order.
sub known_tasks {
    my ($class) = @_;
    my $task_registry = $REGISTERED_MODULES{task};
    return sort keys %$task_registry;
}
419              
# Try to take the machine-wide local helm lock.
#
# Opens the local lock file for writing and attempts a non-blocking exclusive
# flock() on it. On success the handle is stored in local_lock_handle and 1 is
# returned; when the lock is held elsewhere 0 is returned. A lock file that
# cannot be opened is reported through $self->die.
sub _get_local_lock {
    my $self = shift;
    $self->log->debug("Trying to acquire global local helm lock");

    # lock the file so nothing else can run at the same time
    my $lock_file = $self->_local_lock_file();
    open(my $lock_handle, '>', $lock_file)
      or $self->die("Can't open $lock_file for locking: $!");

    # LOCK_NB: fail straight away instead of waiting for the other process
    return 0 unless flock($lock_handle, LOCK_EX | LOCK_NB);

    $self->_local_lock_handle($lock_handle);
    $self->log->debug("Local helm lock obtained");
    return 1;
}
435              
# Release the local helm lock, if this object holds one, by closing the
# handle stored in local_lock_handle.
sub _release_local_lock {
    my ($self) = @_;
    my $held_handle = $self->local_lock_handle;
    if ($held_handle) {
        $self->log->debug("Releasing global local helm lock");
        close($held_handle);
    }
}
443              
# Path of the local lock file: "helm.lock" inside the system temp directory.
sub _local_lock_file {
    my ($self) = @_;
    my $lock_path = catfile(tmpdir(), 'helm.lock');
    return $lock_path;
}
448              
# Try to take the helm lock on the current server.
#
# Runs a shell test over the given SSH connection to see whether the remote
# lock file exists. Returns 0 when it does (someone else holds the lock);
# otherwise creates the file with "touch" and returns 1.
#
# Parameters: $ssh - the Net::OpenSSH connection to the server.
sub _get_remote_lock {
    my ($self, $ssh) = @_;
    my $server = $self->current_server;
    $self->log->debug("Trying to obtain remote server lock for $server");

    # make sure the lock file on the server doesn't exist
    # (test the very same path that is created below and removed by
    # _release_remote_lock, rather than a separately hard-coded one)
    my $lock_file = $self->_remote_lock_file();
    my $output    = $self->run_remote_command(
        ssh        => $ssh,
        command    => qq(if [ -e "$lock_file" ]; then echo "lock found"; else echo "no lock found"; fi),
        ssh_method => 'capture',
    );

    # strip all trailing whitespace, not just "\n": run_remote_command asks
    # for a tty when sudo is in use, and output read through a tty may end
    # in "\r\n"
    $output = '' unless defined $output;
    $output =~ s/\s+\z//;

    return 0 if $output eq 'lock found';

    # XXX - there's a race condition here, not sure what the right fix is though
    $self->run_remote_command(ssh => $ssh, command => "touch $lock_file");
    $self->log->debug("Remote server lock for $server obtained");
    return 1;
}
471              
# Remove the remote lock file over the given SSH connection.
#
# Does nothing unless lock_type is 'remote' or 'both', and nothing when no
# connection is supplied (die() calls this with current_ssh, which is still
# unset if the failure happened before the first connection was made).
#
# Parameters: $ssh - the Net::OpenSSH connection to the server, may be undef.
sub _release_remote_lock {
    my ($self, $ssh) = @_;

    # lock_type may be unset; treat that as "no locking"
    my $lock_type = $self->lock_type || 'none';
    return unless $lock_type eq 'remote' || $lock_type eq 'both';

    # without a connection there is nothing we could remove
    return unless $ssh;

    $self->log->debug("Releasing remote server lock for " . $self->current_server);
    my $lock_file = $self->_remote_lock_file();
    $self->run_remote_command(ssh => $ssh, command => "rm -f $lock_file");
}
480              
# Path of the lock file used on remote servers: "helm.remote.lock" inside the
# temp directory as reported by tmpdir() on the machine running helm.
# NOTE(review): the path is computed locally but used remotely - confirm the
# local temp directory is also valid on the target servers.
sub _remote_lock_file {
    my ($self) = @_;
    my $lock_path = catfile(tmpdir(), 'helm.remote.lock');
    return $lock_path;
}
485              
# Run a command on a remote server through an existing SSH connection.
#
# Named parameters:
#   ssh         - the Net::OpenSSH connection to use (required)
#   command     - the command line to run (required)
#   ssh_method  - name of the Net::OpenSSH method to call, default 'system'
#   ssh_options - hash reference of options passed to that method
#   server      - server used in log messages, default current_server
#   no_sudo     - when true, do not prefix the command with sudo
#
# When the "sudo" attribute is set (and no_sudo is not), the command is run
# as "sudo -u <sudo> <command>" with a tty requested.
#
# Returns the result of the SSH method, called in scalar context. Failures
# are reported through $self->die.
sub run_remote_command {
    my ($self, %args) = @_;
    my $ssh        = $args{ssh};
    my $cmd        = $args{command};
    my $ssh_method = $args{ssh_method} || 'system';
    my $server     = $args{server} || $self->current_server;
    my $sudo       = $self->sudo;

    # copy the options: the tty flag added below must not leak into a hash
    # the caller may reuse for other commands
    my $ssh_options = { %{ $args{ssh_options} || {} } };

    if ($sudo && !$args{no_sudo}) {
        $cmd = "sudo -u $sudo $cmd";
        $ssh_options->{tty} = 1;
    }

    $self->log->debug("Running remote command ($cmd) on server $server");
    my $result = $ssh->$ssh_method($ssh_options, $cmd);

    # The capture methods return the command's output, which is legitimately
    # empty (false) for a command that prints nothing; for those, success is
    # judged by the connection's error state. Other methods keep the
    # "false return value means failure" rule.
    my $failed = $ssh_method =~ /^capture/ ? $ssh->error : !$result;
    if ($failed) {
        $self->die("Can't execute command ($cmd) on server $server: " . $ssh->error);
    }

    return $result;
}
504              
# Run a command on the local machine, exactly once.
#
# Accepts either a single positional command, or named parameters with
# "command" being a string or an array reference (program plus arguments,
# run without a shell by system()).
#
# A command that cannot be started, is killed by a signal or exits non-zero
# is reported through $self->die.
sub run_local_command {
    my $self = shift;
    my %args = @_ > 1 ? @_ : (command => $_[0]);
    my @cmd  = ref $args{command} ? @{ $args{command} } : ($args{command});

    my $status = system(@cmd);
    if ($status != 0) {
        # $! is only meaningful when the command could not be started at
        # all; otherwise the details are in $?
        my $reason =
            $? == -1 ? "failed to start: $!"
          : $? & 127 ? "killed by signal " . ($? & 127)
          :            "exited with status " . ($? >> 8);
        $self->die("Can't execute local command (" . join(' ', @cmd) . "): $reason");
    }
    return;
}
515              
# Open a new SSH connection to a server.
#
# Named parameters:
#   server - the Helm::Server object to connect to (required)
#
# Uses the server's port (or default_port), the configured timeout and, as
# steer() does for its own connections, the configured user.
# Returns the new Net::OpenSSH object; the caller is expected to inspect its
# error state.
sub ssh_connection {
    my ($self, %args) = @_;
    my $server   = $args{server};
    my %ssh_args = (
        ctl_dir     => catdir(File::HomeDir->my_home, '.helm'),
        strict_mode => 0,
    );
    my $port = $server->port || $self->default_port;
    $ssh_args{port}    = $port          if $port;
    $ssh_args{timeout} = $self->timeout if $self->timeout;
    $self->log->debug("Setting up SSH connection to $server" . ($port ? ":$port" : ''));

    # connect as the configured user, exactly as the main loop in steer() does
    my $connection_name = $server->name;
    my $user            = $self->user;
    $connection_name = $user . '@' . $connection_name if $user;

    return Net::OpenSSH->new($connection_name, %ssh_args);
}
529              
# Report a fatal error: log it, release any held locks and exit with status 1.
#
# Parameters:
#   $msg     - the error message, sent to the log at error level
#   %options - no_release_locks => 1 skips releasing the locks (used when the
#              error is precisely that a lock could not be obtained)
#
# When _dont_exit is true (continue_with_errors) the process is not exited
# and the method returns to its caller.
sub die {
    my ($self, $msg, %options) = @_;
    $self->log->error($msg);

    # Releasing the remote lock runs a remote command, and a failing remote
    # command reports itself through this very method. The flag stops that
    # from recursing without end: a failure while releasing is only logged.
    unless ($options{no_release_locks} || $Helm::RELEASING_LOCKS) {
        local $Helm::RELEASING_LOCKS = 1;

        # current_ssh is unset until the first successful connection
        my $ssh = $self->current_ssh;
        $self->_release_remote_lock($ssh) if $ssh;
        $self->_release_local_lock();
    }

    exit(1) unless $self->_dont_exit;
}
540              
# Class method: register (or replace) the implementation class for a plugin.
#
# Parameters:
#   $type   - module type, one of the top-level keys of %REGISTERED_MODULES
#   $key    - short name the module is selected by
#   $module - name of the implementing class
#
# Throws a plain Perl exception for an unknown module type.
sub register_module {
    my ($class, $type, $key, $module) = @_;

    if (!exists $REGISTERED_MODULES{$type}) {
        CORE::die("Unknown Helm module type '$type'!");
    }

    Helm->debug("Loading module $module for $type plugins with key $key");
    return $REGISTERED_MODULES{$type}{$key} = $module;
}
547              
# Append timestamped messages to the file "debug.log" in the current
# directory. Does nothing unless $DEBUG is true.
#
# This is a class method so that it can be called even before any objects
# have been fully initialized. Because the invocant is normally the class
# name, failures here use the builtin CORE::die and not the object method
# die(), which needs an object with a log.
sub debug {
    return unless $DEBUG;
    my ($self, @msgs) = @_;

    # open the debug log handle if we haven't opened it yet
    # or re-open it if it was opened in another process (i.e. before a fork)
    if (!$DEBUG_LOG || $DEBUG_LOG_PID != $$) {
        $DEBUG_LOG = IO::File->new('debug.log', '>>')
          or CORE::die("Could not open debug.log for appending: $!");
        $DEBUG_LOG->autoflush(1);
        $DEBUG_LOG_PID = $$;
    }

    my $ts = DateTime->now->strftime('%a %b %d %H:%M:%S %Y');
    my ($calling_class) = caller();
    foreach my $msg (@msgs) {
        # one line per message: drop trailing whitespace, add a single newline
        $msg =~ s/\s+$//;
        $DEBUG_LOG->print("[$ts] [$$] [$calling_class] $msg\n");
    }
}
570              
571             __PACKAGE__->meta->make_immutable;
572              
573             1;