File Coverage

lib/Bio/Graphics/Browser2/Render/Slave/AWS_Balancer.pm
Criterion Covered Total %
statement 154 603 25.5
branch 30 228 13.1
condition 8 64 12.5
subroutine 41 99 41.4
pod 0 61 0.0
total 233 1055 22.0


line stmt bran cond sub pod time code
1             package Bio::Graphics::Browser2::Render::Slave::AWS_Balancer;
2              
3             # This module is used to manage GBrowse slaves in an on-demand Amazon EC2
4             # environment.
5              
6 1     1   229395 use strict;
  1         2  
  1         23  
7 1     1   3 use Parse::Apache::ServerStatus;
  1         1  
  1         15  
8 1     1   23 use VM::EC2 1.22;
  1         24  
  1         5  
9 1     1   736 use VM::EC2::Instance::Metadata;
  1         1212  
  1         22  
10 1     1   683 use VM::EC2::Staging::Manager;
  1         24928  
  1         39  
11 1     1   518 use LWP::Simple 'get','head';
  1         13059  
  1         7  
12 1     1   174 use LWP::UserAgent;
  1         1  
  1         16  
13 1     1   4 use Parse::Apache::ServerStatus;
  1         1  
  1         7  
14 1     1   573 use IO::File;
  1         956  
  1         158  
15 1     1   15 use POSIX 'strftime','setsid','setuid';
  1         2  
  1         8  
16 1     1   73 use Carp 'croak';
  1         1  
  1         39  
17 1     1   10 use FindBin '$Bin';
  1         1  
  1         122  
18              
19 1     1   5 use constant CONFIGURE_SLAVES => "$Bin/gbrowse_configure_slaves.pl";
  1         2  
  1         4172  
20              
21             # arguments:
22             # ( -conf => $config_path,
23             # -access_key => $aws_access_key,
24             # -secret_key => $aws_secret_key,
25             # -logfile => $path_to_logfile,
26             # -pidfile => $path_to_pidfile,
27             # -user => $user_name_to_run_under,# (root only)
28             # -daemon => $daemon_mode,
29             # -ssh_key => $ssh_login_key (optional)
30             # )
31              
32             sub new {
33 1     1 0 88 my $class = shift;
34 1         3 my %args = @_;
35 1 50       4 $args{-conf} or croak "-conf argument required";
36 1 50       23 -e $args{-conf} or croak "$args{-conf} not found";
37              
38             #setup EC2 environment
39 1   33     9 $args{-access_key} ||= $ENV{EC2_ACCESS_KEY};
40 1   33     5 $args{-secret_key} ||= $ENV{EC2_SECRET_KEY};
41              
42             my $self = bless {
43             access_key => $args{-access_key},
44             secret_key => $args{-secret_key},
45             logfile => $args{-logfile},
46             pidfile => $args{-pidfile},
47             user => $args{-user},
48             conf_file => $args{-conf},
49             daemon => $args{-daemon},
50             ssh_key => $args{-ssh_key},
51 1   33     15 verbosity => 2,
52             },ref $class || $class;
53 1         16 $self->initialize();
54 1         12 return $self;
55             }
56              
57 0     0 0 0 sub logfile {shift->{logfile}}
58 0     0 0 0 sub pidfile {shift->{pidfile}}
59 0     0 0 0 sub pid {shift->{pid}}
60 0     0 0 0 sub user {shift->{user}}
61 0     0 0 0 sub daemon {shift->{daemon}}
62 0     0 0 0 sub ssh_key {shift->{ssh_key}}
63             sub ec2_credentials {
64 0     0 0 0 my $self = shift;
65 0 0       0 if ($self->running_as_instance) {
66 0         0 my $credentials = $self->{instance_metadata}->iam_credentials;
67 0 0       0 return (-security_token => $credentials) if $credentials;
68 0         0 $self->log_debug('No instance security credentials. Does this instance have an IAM role?');
69             }
70 0   0     0 $self->{access_key} ||= $self->_prompt('Enter your EC2 access key:');
71 0   0     0 $self->{secret_key} ||= $self->_prompt('Enter your EC2 secret key:');
72             return (-access_key => $self->{access_key},
73             -secret_key => $self->{secret_key})
74 0         0 }
75             sub logfh {
76 0     0 0 0 my $self = shift;
77 0         0 my $d = $self->{logfh};
78 0 0       0 $self->{logfh} = shift if @_;
79 0         0 $d;
80             }
81              
82             sub verbosity {
83 2     2 0 8 my $self = shift;
84 2         5 my $d = $self->{verbosity};
85 2 100       8 $self->{verbosity} = shift if @_;
86 2         5 $d;
87             }
88              
89             sub initialize {
90 1     1 0 2 my $self = shift;
91 1         3 $self->_parse_conf_file;
92 1         2 $self->_parse_instance_metadata;
93             }
94              
95             sub DESTROY {
96 1     1   3 my $self = shift;
97 1         5 $self->cleanup;
98             }
99              
100             sub run {
101 0     0 0 0 my $self = shift;
102 0 0 0     0 $self->become_daemon && return
103             if $self->daemon;
104              
105 0         0 my $killed;
106 0     0   0 local $SIG{INT} = local $SIG{TERM} = sub {$self->log_info('Termination signal received');
107 0         0 $killed++; };
  0         0  
108 0         0 $self->{pid} = $$;
109              
110 0         0 my $poll = $self->master_poll;
111 0 0       0 eval {$self->ec2} or croak $@;
  0         0  
112 0         0 $self->log_info("Monitoring load at intervals of $poll sec\n");
113 0         0 while (sleep $poll) {
114 0 0       0 last if $killed;
115 0         0 my $load = $self->get_load();
116 0         0 $self->log_debug("Current load: $load req/s\n");
117 0         0 $self->adjust_instances($load);
118 0         0 $self->update_requests();
119             }
120              
121 0         0 $self->log_info('Normal termination');
122             }
123              
124             sub stop_daemon {
125 0     0 0 0 my $self = shift;
126 0         0 my $pid = $self->pid;
127 0 0 0     0 if (!$pid && (my $pidfile = $self->pidfile)) {
128 0 0       0 my $fh = IO::File->new($pidfile) or croak "No PID file; is daemon runnning?";
129 0         0 $pid = $fh->getline;
130 0         0 chomp($pid);
131 0         0 $fh->close;
132             }
133 0 0       0 unlink $self->pidfile if -e $self->pidfile;
134 0 0       0 kill TERM=>$pid if defined $pid;
135             }
136              
137             #######################
138             # configuration
139             ######################
140              
141 1     1 0 28 sub conf_file {shift->{conf_file}}
142              
143             sub load_table {
144 7     7 0 16 return shift->{options}{'LOAD TABLE'};
145             }
146              
147             sub option {
148 4     4 0 4 my $self = shift;
149 4         4 my ($stanza,$option) = @_;
150 4         18 return $self->{options}{uc $stanza}{$option};
151             }
152              
153             # given load, returns two element list of min_instances, max_instances
154             sub slaves_wanted {
155 7     7 0 12 my $self = shift;
156 7         5 my $load = shift;
157              
158 7 50       10 my $lt = $self->load_table or croak 'no load table!';
159 7         9 my ($min,$max) = (0,0);
160 7         23 for my $l (sort {$a<=>$b} keys %$lt) {
  42         58  
161 35 100       58 ($min,$max) = @{$lt->{$l}} if $load >= $l;
  17         31  
162             }
163 7         33 return ($min,$max);
164             }
165              
166 1 50   1 0 4 sub slave_instance_type { shift->option('SLAVE','instance_type') || 'm1.large' }
167 1 50   1 0 3 sub slave_spot_bid { shift->option('SLAVE','spot_bid') || 0.08 }
168 0     0 0 0 sub slave_ports { my $p = shift->option('SLAVE','ports');
169 0         0 my @p = split /\s+/,$p;
170 0 0       0 return @p ? @p : (8101); }
171             sub slave_endpoint {
172 0     0 0 0 my $self = shift;
173 0 0       0 if ($self->running_as_instance) {
174 0         0 my $zone = $self->{instance_metadata}->endpoint;
175 0         0 return $zone;
176             } else {
177 0   0     0 my $region = $self->option('SLAVE','region') || 'us-east-1';
178 0         0 return "http://ec2.$region.amazonaws.com";
179             }
180             }
181              
182             sub slave_zone {
183 0     0 0 0 my $self = shift;
184 0 0       0 if ($self->running_as_instance) {
185 0         0 return $self->{instance_metadata}->availabilityZone;
186             } else {
187 0         0 $self->option('SLAVE','availability_zone');
188             }
189             }
190              
191             sub slave_image_id {
192 0     0 0 0 my $self = shift;
193 0 0       0 if ($self->running_as_instance) {
194 0         0 return $self->{instance_metadata}->imageId;
195             } else {
196 0         0 $self->option('SLAVE','image_id');
197             }
198             }
199              
200             sub slave_data_snapshots {
201 0     0 0 0 my $self = shift;
202 0         0 return split /\s+/,$self->option('SLAVE','data_snapshots');
203             }
204              
205             sub slave_block_device_mapping {
206 0     0 0 0 my $self = shift;
207 0 0       0 my $image = $self->ec2->describe_images($self->slave_image_id)
208             or die "Could not find image ",$self->slave_image_id;
209 0         0 my $root = $image->rootDeviceName;
210 0         0 my @root = grep {$_->deviceName eq $root} $image->blockDeviceMapping;
  0         0  
211            
212 0         0 my @snaps = $self->slave_data_snapshots;
213 0         0 my @bdm;
214             DEVICE:
215 0         0 for my $major ('g'..'z') {
216 0         0 for my $minor (1..15) {
217 0 0       0 my $snap = shift @snaps or last DEVICE;
218 0         0 push @bdm,"/dev/sd${major}${minor}=${snap}::true";
219             }
220             }
221 0         0 return [@root,@bdm];
222             }
223              
224             sub slave_subnet {
225 0     0 0 0 my $self = shift;
226 0 0       0 if ($self->running_as_instance) {
227 0         0 return eval {(values %{$self->{instance_metadata}->interfaces})[0]{subnetId}};
  0         0  
  0         0  
228             } else {
229 0         0 $self->option('SLAVE','subnet');
230             }
231             }
232              
233             sub slave_ssh_key {
234 0     0 0 0 my $self = shift;
235 0         0 my $key = $self->ssh_key;
236 0   0     0 $key ||= $self->option('SLAVE','ssh_key');
237 0         0 return $key;
238             }
239              
240             sub slave_security_group {
241 0     0 0 0 my $self = shift;
242 0         0 my $sg = $self->{slave_security_group};
243 0 0       0 return $sg if $sg;
244 0         0 my $ec2 = $self->ec2;
245 0         0 $sg = eval {$ec2->describe_security_groups(-name => "GBROWSE_SLAVE_$$")};
  0         0  
246 0   0     0 $sg ||= $ec2->create_security_group(-name => "GBROWSE_SLAVE_$$",
247             -description => 'Temporary security group for slave communications');
248 0 0       0 my $ip = $self->running_as_instance ? $self->internal_ip : $self->master_ip;
249            
250             $self->log_debug(
251             $sg->authorize_incoming(-protocol => 'tcp',
252             -port => $_,
253             -source_ip => "$ip/32")
254 0         0 ) foreach $self->slave_ports;
255 0 0       0 $self->log_debug(
256             $sg->authorize_incoming(-protocol => 'tcp',
257             -port => 22,
258             -source_ip=> "$ip/32"))
259             if $self->slave_ssh_key;
260            
261 0 0       0 $sg->update or croak $ec2->error_str;
262 0         0 return $self->{slave_security_group} = $sg;
263             }
264              
265             sub ec2 {
266 0     0 0 0 my $self = shift;
267             # create a new ec2 each time because security credentials may expire
268 0         0 my @credentials = $self->ec2_credentials;
269 0         0 return $self->{ec2} = VM::EC2->new(-endpoint => $self->slave_endpoint,
270             -raise_error => 1,
271             @credentials);
272             }
273              
274             sub internal_ip {
275 0     0 0 0 my $self = shift;
276 0 0       0 return unless $self->running_as_instance;
277 0         0 return $self->{instance_metadata}->privateIpAddress;
278             }
279              
280             sub master_security_group {
281 0     0 0 0 my $self = shift;
282 0 0       0 return unless $self->running_as_instance;
283 0         0 my $sg = ($self->{instance_metadata}->securityGroups)[0];
284 0         0 $sg =~ s/%([0-9A-Fa-f]{2})/chr(hex($1))/eg;
  0         0  
285 0         0 return $sg;
286             }
287              
288             sub master_ip {
289 1     1 0 1 my $self = shift;
290 1         3 my $ip = $self->option('MASTER','external_ip');
291 1   33     6 $ip ||= $self->_get_external_ip;
292 1         9 return $ip;
293             }
294              
295             # poll interval in seconds
296             sub master_poll {
297 1     1 0 2 my $self = shift;
298 1         2 my $pi = $self->option('MASTER','poll_interval');
299 1         5 return $pi * 60;
300             }
301              
302             sub master_server_status_url {
303 0     0 0 0 my $self = shift;
304 0   0     0 return $self->option('MASTER','server_status_url')
305             || 'http://localhost/server-status';
306             }
307              
308             sub running_as_instance {
309 1     1 0 4 my $self = shift;
310 1   33     17 return -e '/var/lib/cloud/data/previous-instance-id'
311             && head('http://169.254.169.254');
312             }
313              
314              
315             # update conf file with new snapshot images
316             sub update_data_snapshots {
317 0     0 0 0 my $self = shift;
318 0         0 my @snapshot_ids = @_;
319 0         0 my $timestamp = 'synchronized with local filesystem on '.localtime;
320 0         0 my $conf_file = $self->conf_file;
321 0         0 my ($user,$group) = (stat($conf_file))[4,5];
322 0 0       0 open my $in,'<',$conf_file or die "Couldn't open $conf_file: $!";
323 0 0       0 open my $out,'>',"$conf_file.new" or die "Couldn't open $conf_file: $!";
324 0         0 while (<$in>) {
325 0         0 chomp;
326 0         0 s/^(data_snapshots\s*=).*/$1 @snapshot_ids # $timestamp/;
327 0         0 print $out "$_\n";
328             }
329 0         0 close $in;
330 0         0 close $out;
331 0 0       0 rename "$conf_file","$conf_file.bak" or die "Can't rename $conf_file: $!";
332 0 0       0 rename "$conf_file.new","$conf_file" or die "Can't rename $conf_file.new: $!";
333 0         0 chown $user,$group,$conf_file;
334             }
335              
336             #######################
337             # status
338             ######################
339              
340             # return true if slave is listening on at least one of the designated ports
341             sub ping_slave {
342 0     0 0 0 my $self = shift;
343 0         0 my $instance = shift;
344 0 0       0 my $ip = $self->running_as_instance?$instance->privateIpAddress:$instance->ipAddress;
345 0         0 my ($port) = $self->slave_ports;
346 0         0 my $ua = LWP::UserAgent->new;
347 0         0 my $req = HTTP::Request->new(HEAD => "http://$ip:$port");
348 0         0 my $res = $ua->request($req);
349 0         0 return $res->code == 403;
350             }
351              
352             # returns list of slave instances as VM::EC2::Instance objects
353             sub running_slaves {
354 0     0 0 0 my $self = shift;
355 0   0     0 $self->{running_slaves} ||= {};
356 0         0 return values %{$self->{running_slaves}};
  0         0  
357             }
358              
359             sub add_slave {
360 0     0 0 0 my $self = shift;
361 0         0 my $instance = shift;
362 0         0 $self->{running_slaves}{$instance}=$instance;
363             }
364              
365             sub remove_slave {
366 0     0 0 0 my $self = shift;
367 0         0 my $instance = shift;
368 0         0 delete $self->{running_slaves}{$instance};
369             }
370              
371             # given an instance ID, returns the slave VM::EC2::Instance object
372             sub id2slave {
373 0     0 0 0 my $self = shift;
374 0         0 my $id = shift;
375 0         0 return $self->{running_slaves}{$id};
376             }
377              
378             # spot requests - only tracks pending requests
379             sub pending_spot_requests {
380 0     0 0 0 my $self = shift;
381 0   0     0 $self->{pending_requests} ||= {};
382 0         0 return values %{$self->{pending_requests}};
  0         0  
383             }
384              
385             sub add_spot_request {
386 0     0 0 0 my $self = shift;
387 0         0 my $sr = shift;
388 0         0 $self->{pending_requests}{$sr} = $sr;
389             }
390              
391             sub remove_spot_request {
392 0     0 0 0 my $self = shift;
393 0         0 my $sr = shift;
394 0         0 delete $self->{pending_requests}{$sr};
395             }
396              
397             sub id2_spot_request {
398 0     0 0 0 my $self = shift;
399 0         0 my $id = shift;
400 0         0 return $self->{pending_requests}{$id};
401             }
402              
403             sub get_load {
404 0     0 0 0 my $self = shift;
405 0   0     0 $self->{pr} ||= Parse::Apache::ServerStatus->new(url=>$self->master_server_status_url);
406 0 0       0 if (-e '/tmp/gbrowse_load') {
407 0         0 open my $fh,'/tmp/gbrowse_load';
408 0         0 chomp (my $load = <$fh>);
409 0         0 return $load;
410             }
411 0 0       0 my $stats = $self->{pr}->get or $self->fatal("couldn't fetch load from Apache status: ",$self->{pr}->errstr);
412 0         0 return $stats->{rs};
413             }
414              
415              
416             ###########################################
417             # state change
418             ###########################################
419              
420             # this is called to update the number of live and pending slaves
421             # according to the load
422             sub adjust_instances {
423 0     0 0 0 my $self = shift;
424 0         0 my $load = shift;
425 0         0 my ($min,$max) = $self->slaves_wanted($load);
426 0         0 my $current = $self->pending_spot_requests + $self->running_slaves;
427            
428 0 0       0 if ($current < $min) {
    0          
429 0         0 $self->log_debug("Need to add more slave spot instances (have $current, wanted $min)\n");
430 0         0 $self->request_spot_instance while $current++ < $min;
431             }
432              
433             elsif ($current > $max) {
434 0         0 $self->log_debug("Need to delete some slave spot instances (have $current, wanted $max)\n");
435 0         0 my $reconfigure;
436 0         0 my $ec2 = $self->ec2;
437 0         0 my @candidates = ($self->pending_spot_requests,$self->running_slaves);
438 0         0 while ($current-- > $max) {
439 0         0 my $c = shift @candidates;
440 0 0       0 if ($c->isa('VM::EC2::Spot::InstanceRequest')) {
    0          
441 0         0 $self->log_debug("Cancelling spot instance request $c\n");
442 0         0 $ec2->cancel_spot_instance_requests($c);
443 0         0 $self->remove_spot_request($c);
444 0         0 $reconfigure++;
445             } elsif ($c->isa('VM::EC2::Instance')) {
446 0         0 $self->log_debug("Terminating slave instance $c\n");
447 0         0 $ec2->terminate_instances($c);
448 0         0 $self->remove_slave($c);
449 0         0 $reconfigure++;
450             }
451             }
452             # we reconfigure master immediately to avoid calling instance that were terminated
453 0 0       0 $self->reconfigure_master() if $reconfigure;
454             }
455             }
456              
457             # this is called to act on state changes in spot requests and instances
458             sub update_requests {
459 0     0 0 0 my $self = shift;
460 0         0 my @requests = $self->pending_spot_requests;
461 0         0 for my $sr (@requests) {
462 0         0 my $state = $sr->current_status;
463 0         0 $self->log_debug("Status of $sr is $state");
464 0         0 my $instance = $sr->instance;
465 0 0 0     0 if ($state eq 'fulfilled' && $instance && $instance->instanceState eq 'running') {
    0 0        
466 0         0 $instance->add_tag(Name => 'GBrowse Slave');
467 0         0 $self->log_debug("New instance $instance; testing readiness");
468 0 0       0 next unless $self->ping_slave($instance); # not ready - try again on next poll
469 0         0 $self->log_debug("New slave instance is ready");
470 0         0 $self->add_slave($instance);
471 0         0 $self->remove_spot_request($sr); # we will never check this request again
472 0         0 $self->reconfigure_master();
473             } elsif ($sr->current_state =~ /cancelled|failed/ ) {
474 0         0 $self->remove_spot_request($sr);
475             }
476             }
477             }
478              
479             # launch a spot instance request
480             sub request_spot_instance {
481 0     0 0 0 my $self = shift;
482 0         0 my $ec2 = $self->ec2;
483              
484 0         0 my $subnet = $self->slave_subnet;
485 0         0 my @ports = $self->slave_ports;
486 0         0 my $key = $self->slave_ssh_key;
487              
488 0 0       0 my @options = (
    0          
489             -image_id => $self->slave_image_id,
490             -instance_type => $self->slave_instance_type,
491             -instance_count => 1,
492             -security_group_id => $self->slave_security_group,
493             -spot_price => $self->slave_spot_bid,
494             -block_device_mapping => $self->slave_block_device_mapping,
495             -user_data => "#!/bin/sh\nexec /opt/gbrowse/etc/init.d/gbrowse-slave start @ports",
496             $subnet? (-subnet_id => $subnet) : (),
497             $key ? (-key_name => $key) : (),
498             );
499              
500 0         0 my @debug_options;
501 0         0 for (my $i = 0;$i<@options;$i+=2) {
502 0         0 my $a = $options[$i];
503 0         0 my $v = $options[$i+1];
504 0 0 0     0 if (ref $v && ref $v eq 'ARRAY') {
505 0         0 push @debug_options,($a=>$_) foreach @$v;
506             } else {
507 0         0 push @debug_options,($a=>$v);
508             }
509             }
510            
511              
512 0         0 my $debug_options = "@debug_options";
513 0         0 $debug_options =~ tr/\n/ /;
514              
515 0         0 $self->log_debug("Launching a spot request with options: $debug_options\n");
516 0         0 my @requests = $ec2->request_spot_instances(@options);
517 0 0       0 @requests or croak $ec2->error_str;
518              
519 0         0 $_->add_tag(Requestor=>'GBrowse AWS Balancer') foreach @requests;
520 0         0 $self->add_spot_request($_) foreach @requests;
521             }
522              
523             sub kill_slave {
524 0     0 0 0 my $self = shift;
525 0         0 my $instance = shift;
526 0         0 $self->remove_slave($instance);
527 0         0 $self->reconfigure_master();
528 0         0 $instance->terminate();
529             }
530              
531             sub reconfigure_master {
532 0     0 0 0 my $self = shift;
533 0         0 my @slaves = $self->running_slaves;
534 0 0       0 my @ips = map {$self->running_as_instance?$_->privateIpAddress:$_->ipAddress} @slaves;
  0         0  
535 0         0 my @a;
536 0         0 for my $i (@ips) {
537 0         0 for my $p ($self->slave_ports) {
538 0         0 push @a,"http://$i:$p";
539             }
540             }
541 0 0       0 if (@a) {
542 0         0 system 'sudo',CONFIGURE_SLAVES,(map {('--set'=>$_)} @a);
  0         0  
543             } else {
544 0         0 system 'sudo',CONFIGURE_SLAVES,'--set','';
545             }
546            
547             }
548              
549 0     0 0 0 sub log_debug {shift->_log(3,@_)}
550 1     1 0 5 sub log_info {shift->_log(2,@_)}
551 0     0 0 0 sub log_warn {shift->_log(1,@_)}
552 0     0 0 0 sub log_crit {shift->_log(0,@_)}
553             sub fatal {
554 0     0 0 0 my $self = shift;
555 0         0 my @msg = shift;
556 0         0 $self->log_crit(@msg);
557 0         0 die;
558             }
559              
560             sub _log {
561 1     1   1 my $self = shift;
562 1         3 my ($level,@msg) = @_;
563 1 50       4 return unless $level <= $self->verbosity;
564 0         0 my $ts = strftime('%d/%b/%Y:%H:%M:%S %z',localtime);
565 0         0 my $msg = ucfirst "@msg";
566 0         0 chomp($msg);
567 0         0 print STDERR "[$ts] $msg\n";
568             }
569              
570             sub cleanup {
571 1     1 0 2 my $self = shift;
572 1 50 33     68 return if !$self->{pid} || $self->{pid} != $$;
573              
574 0 0       0 my $ec2 = eval{$self->ec2} or return;
  0         0  
575 0         0 $self->log_debug('Running cleanup routine');
576              
577 0         0 my @requests = $self->pending_spot_requests;
578 0         0 my @instances = ($self->running_slaves,grep {$_} map {$_->instance} @requests);
  0         0  
  0         0  
579              
580 0 0       0 if (@instances) {
581 0         0 $self->log_debug("terminating spot instances @instances\n");
582 0         0 delete $self->{running_slaves};
583 0         0 $self->reconfigure_master();
584 0         0 $ec2->terminate_instances(@instances);
585             }
586              
587 0 0       0 if (my @requests = grep {$_->current_state eq 'open'} $self->pending_spot_requests) {
  0         0  
588 0         0 $self->log_debug("cancelling spot instance requests @requests\n");
589 0         0 $ec2->cancel_spot_instance_requests(@requests);
590 0         0 delete $self->{pending_requests};
591             }
592              
593 0 0       0 if (my $sg = $self->{slave_security_group}) {
594 0 0       0 if (@instances) {
595 0         0 $self->log_debug('waiting for running instances to terminate');
596 0         0 $ec2->wait_for_instances(@instances);
597             }
598 0         0 $self->ec2->delete_security_group($sg);
599 0         0 delete $self->{slave_security_group};
600 0         0 $self->log_debug("deleting security group $sg\n");
601             }
602              
603 0 0       0 unlink $self->pidfile if $self->pidfile;
604             }
605              
606              
607              
608             #######################
609             # Synchronization
610             #######################
611              
612             sub launch_staging_server {
613 0     0 0 0 my $self = shift;
614 0         0 my $ec2 = $self->ec2;
615 0   0     0 my $staging = $self->{staging} ||= $ec2->staging_manager(-on_exit=>'run',
616             -verbose=>3);
617 0         0 $self->log_debug("Requesting block device mapping: ",join ',',@{$self->slave_block_device_mapping});
  0         0  
618 0         0 my $server = $staging->get_server(-name => 'slave_staging_server',
619             -username => 'admin',
620             -instance_type => $self->slave_instance_type,
621             -image_name => $self->slave_image_id,
622             -block_devices => $self->slave_block_device_mapping,
623             -server_class => 'Bio::Graphics::Browser2::Render::Slave::StagingServer', # this is defined at the bottom of this .pm file
624             -architecture => undef
625             );
626 0         0 $server->{manager} = $staging; # avoid global destruction issues
627 0         0 return $server;
628             }
629              
630             #######################
631             # Daemon stuff
632             #######################
633              
634             # BUG - redundant code cut-and-paste from Slave.pm
635             sub become_daemon {
636 0     0 0 0 my $self = shift;
637              
638 0         0 my $child = fork();
639 0 0       0 croak "Couldn't fork: $!" unless defined $child;
640 0 0       0 return $child if $child; # return child PID in parent process
641              
642 0         0 umask(0);
643 0         0 $ENV{PATH} = '/bin:/sbin:/usr/bin:/usr/sbin';
644              
645 0         0 setsid(); # become process leader
646              
647             # write out PID file if requested
648 0 0       0 if (my $l = $self->pidfile) {
649 0 0       0 my $fh = IO::File->new($l,">")
650             or $self->log_crit("Could not open pidfile $l: $!");
651 0 0       0 $fh->print($$)
652             or $self->log_crit("Could not write to pidfile $l: $!");
653 0         0 $fh->close();
654             }
655 0         0 $self->open_log;
656 0 0       0 open STDERR,">&",$self->logfh if $self->logfh;
657              
658 0         0 chdir '/'; # don't hold open working directories
659 0         0 open STDIN, "</dev/null";
660 0         0 open STDOUT,">/dev/null";
661              
662 0         0 $self->set_user;
663 0         0 return;
664             }
665              
666             # change user if requested
667             sub set_user {
668 0     0 0 0 my $self = shift;
669 0 0       0 my $u = $self->user or return;
670 0         0 my $uid = getpwnam($u);
671 0 0       0 defined $uid or $self->log_crit("Cannot change uid to $u: unknown user");
672 0 0       0 setuid($uid) or $self->log_crit("Cannot change uid to $u: $!");
673             }
674              
675             # open log file if requested
676             sub open_log {
677 0     0 0 0 my $self = shift;
678 0 0       0 my $l = $self->logfile or return;
679 0 0       0 my $fh = IO::File->new($l,">>") # append
680             or $self->Fatal("Could not open logfile $l: $!");
681 0         0 $fh->autoflush(1);
682 0         0 $self->logfh($fh);
683             }
684              
685              
686             #######################
687             # internal routines
688             ######################
689              
690             sub _get_external_ip {
691 1     1   1 my $self = shift;
692 1         5 my $ip= get('http://icanhazip.com');
693 1         266775 chomp($ip);
694 1         7 $self->log_info("Found external IP address $ip");
695 1         7 return $ip;
696             }
697              
698             sub _parse_conf_file {
699 1     1   1 my $self = shift;
700 1 50       6 return if exists $self->{options}{'LOAD TABLE'};
701 1 50       6 open my $f,$self->conf_file or croak "Could not open ",$self->conf_file,": $!";
702 1         3 $self->{pushback} = [];
703 1         3 while (defined(my $line = $self->_getline($f))) {
704 3 50       13 $self->_parse_stanza($1,$f) if $line =~ /^\[([^]]+)\]/;
705             }
706 1         6 close $f;
707             croak "invalid config file; must contain [LOAD TABLE] and [SLAVE] stanzas"
708 1 50 33     8 unless exists $self->{options}{'LOAD TABLE'} and exists $self->{options}{'SLAVE'};
709             }
710              
711             sub _parse_stanza {
712 3     3   4 my $self = shift;
713 3         5 my ($stanza,$fh) = @_;
714 3 100       6 if (uc $stanza eq 'LOAD TABLE') {
715 1         3 $self->_parse_load_table($fh);
716             } else {
717 2         3 $self->_parse_regular_stanza($stanza,$fh);
718             }
719             }
720              
721             sub _parse_load_table {
722 1     1   0 my $self = shift;
723 1         2 my $fh = shift;
724 1         3 while (my $line = $self->_get_stanza_line($fh)) {
725 5         15 my @tokens = split /\s+/,$line;
726 5 50       7 @tokens == 3 or croak "invalid load table line: $line";
727 5         6 my ($load,$min,$max) = @tokens;
728 5         15 $self->{options}{'LOAD TABLE'}{$load} = [$min,$max];
729             }
730             }
731              
732             sub _parse_regular_stanza {
733 2     2   2 my $self = shift;
734 2         2 my ($stanza,$fh) = @_;
735 2         3 while (my $line = $self->_get_stanza_line($fh)) {
736 10 100       27 my ($option,$value) = $line =~ /^(\S+)\s*=\s*(.+)/ or next;
737 6         17 $self->{options}{uc $stanza}{$option} = $value;
738             }
739             }
740              
# Return the next content line belonging to the current stanza.
#
# Returns nothing at end of file.  Also returns nothing when the next line
# opens a new stanza ("[..."); in that case the header line is pushed onto
# $self->{pushback} so that the next _getline() call returns it again.
sub _get_stanza_line {
    my $self = shift;
    my $fh   = shift;
    my $line = $self->_getline($fh);
    # Handle end of file explicitly rather than switching warnings off
    # around the pattern match below.
    defined $line or return;
    if ($line =~ /^\[/) {
        push @{$self->{pushback}},$line;
        return;
    }
    return $line;
}
752              
# Return the next meaningful line of the configuration file, or nothing at
# end of file.
#
# A line waiting on $self->{pushback} is returned first.  Otherwise lines
# are read from $fh: the line terminator is removed, lines whose first
# non-blank character is '#' are skipped, a trailing comment (whitespace
# followed by '#' up to the end of the line) is stripped, and lines with
# nothing but whitespace left are skipped.
sub _getline {
    my ($self, $fh) = @_;

    my $pending = $self->{pushback};
    return pop @$pending if @$pending;

    while (defined(my $text = <$fh>)) {
        chomp $text;
        next if $text =~ /^\s*#/;
        $text =~ s/\s+#.*$//;
        return $text if $text =~ /\S/;
    }
    return;
}
770              
# Return the VM::EC2::Instance::Metadata object for this balancer, creating
# it on first use and caching it in $self->{instance_metadata}.
sub _parse_instance_metadata {
    my ($self) = @_;
    return $self->{instance_metadata} if $self->{instance_metadata};
    return $self->{instance_metadata} = VM::EC2::Instance::Metadata->new();
}
775              
# Print $msg on STDERR and read one line of reply from STDIN.
#
# Returns the reply without its line terminator.  Returns nothing when STDIN
# is not a terminal (no prompt is printed in that case) or when STDIN reaches
# end of file before a reply is entered.
sub _prompt {
    my $self = shift;
    my $msg  = shift;
    -t \*STDIN or return;
    print STDERR $msg;
    my $result = <STDIN>;
    defined $result or return;    # end of file, e.g. Ctrl-D at the prompt
    chomp $result;
    return $result;
}
785              
786             ##############################################################################################################
787             # descendent of VM::EC2::Staging::Server that adds a few tricks
788             ##############################################################################################################
789              
790             package Bio::Graphics::Browser2::Render::Slave::StagingServer;
791 1     1   5 use base 'VM::EC2::Staging::Server';
  1         1  
  1         620  
792 1     1   5110 use Sys::Hostname;
  1         757  
  1         44  
793              
794 1     1   4 use constant GB => 1_073_741_824;
  1         2  
  1         45  
795 1     1   3 use constant TB => 1_099_511_627_776;
  1         1  
  1         1325  
796              
# Return the size of the /opt/gbrowse volume in units of GB (2**30 bytes),
# rounded to the nearest whole number.  Returns 0 if the df output cannot be
# parsed.
sub volume_size {
    my $self = shift;
    my $df   = $self->scmd('df -B 1 /opt/gbrowse');
    # The total is the first of three consecutive whitespace-separated
    # numbers.  Requiring whitespace in front of it keeps digits at the end
    # of a device name (e.g. /dev/xvdf1) from being taken as the total.
    my ($total) = defined $df ? $df =~ /\s(\d+)\s+\d+\s+\d+/ : ();
    defined $total or return 0;
    return int(0.5 + $total/GB);
}
805              
# Grow the /opt/gbrowse filesystem so that its logical volume
# (/dev/volumes/gbrowse) is at least $gig_wanted gigabytes.
#
# Returns nothing without touching the server when less than one whole
# additional gigabyte is needed.  Otherwise the filesystem is unmounted, the
# 'volumes' volume group is enlarged (by resizing one of its physical volumes
# or, failing that, by adding a new EBS volume), the logical volume and
# filesystem are extended, and the filesystem is remounted; returns 1.
# Dies if the current size cannot be determined or any remote step fails.
# NOTE: when a step fails after the unmount, the filesystem is left unmounted.
sub grow_volume {
    my $self       = shift;
    my $gig_wanted = shift;

    # get the current size of the /dev/volumes/gbrowse lv (fourth lvs column)
    my $lvs        = $self->scmd('sudo lvs /dev/volumes/gbrowse --noheadings --units g --nosuffix --separator ,');
    my $size_field = (split /,/, defined $lvs ? $lvs : '')[3];
    # Pull out the number itself so that stray decoration around it (such as
    # a leading "<") does not make the size evaluate to 0.
    my ($size) = defined $size_field ? $size_field =~ /(\d+(?:\.\d+)?)/ : ();
    defined $size or die "Couldn't determine the size of /dev/volumes/gbrowse";

    my $needed = int($gig_wanted-$size);
    return if $needed <= 0;

    $self->info("Resizing /opt/gbrowse to $gig_wanted...\n");

    # get information about the physical volumes that belong to this group
    # NOTE(review): the two summed columns are named "used" and "free" here;
    # confirm that this matches the column layout of pvs on the server.
    my %volumes;
    my $fh = $self->scmd_read('sudo pvs --noheadings --units g --nosuffix --separator ,');
    while (my $row = <$fh>) {
        chomp $row;
        $row =~ s/^\s+//;
        my ($pv,$vg,undef,undef,$used,$free) = split /,/, $row;
        next unless defined $vg && $vg eq 'volumes';
        $volumes{$pv} = $used+$free;
    }
    close $fh;

    # select a volume to resize: the first one, in device-name order, that
    # stays below 1000 GB after growing (presumably a per-volume EBS size
    # limit -- confirm).  The keys are device paths, so they are compared as
    # strings; a numeric sort would leave the order arbitrary.
    my $to_resize;
    for my $pv (sort keys %volumes) {
        if ($volumes{$pv} + $needed < 1000) {
            $to_resize = $pv;
            last;
        }
    }

    # If we found a pv that we can resize sufficiently, then go ahead and do that.
    # Otherwise, we add a new EBS volume to the volume group.
    $self->info("Unmounting /opt/gbrowse filesystem...\n");
    $self->ssh('sudo umount /opt/gbrowse') or die "Couldn't umount";

    if ($to_resize) {
        $self->_resize_pv($to_resize,int($volumes{$to_resize}+$needed));
    } else {
        $self->_extend_vg('volumes',int($needed));
    }

    # If we get here, the volume group has been extended, so we can
    # resize the logical volume and the filesystem
    $self->info("Resizing logical volume...\n");
    $self->ssh('sudo lvextend -l +100%FREE /dev/volumes/gbrowse') or die "Couldn't lvresize";

    $self->info("Checking filesystem prior to resizing...\n");
    $self->ssh('sudo e2fsck -f -p /dev/volumes/gbrowse') or die "e2fsck failed";

    $self->info("Resizing filesystem...\n");
    $self->ssh('sudo resize2fs -p /dev/volumes/gbrowse') or die "Couldn't resize2fs";

    $self->info("Remounting filesystem...\n");
    $self->ssh('sudo mount /opt/gbrowse') or die "Couldn't mount";

    1;
}
866              
# Terminate this server's EC2 instance.  If the server has a manager, the
# server is unregistered from it first.  Returns whatever
# terminate_instances() returns.
sub terminate {
    my ($self) = @_;
    if ($self->manager) {
        $self->manager->unregister_server($self);
    }
    return $self->ec2->terminate_instances($self);
}
872              
# Start the services on the server by delegating to
# _start_stop_services('start').
sub start_services {
    my ($self) = @_;
    return $self->_start_stop_services('start');
}
877              
# Stop the services on the server by delegating to
# _start_stop_services('stop').
sub stop_services {
    my ($self) = @_;
    return $self->_start_stop_services('stop');
}
882              
# Run "sudo service <name> <action>" on the server for apache2, mysql and
# postgresql, in that order.
#   $action - 'start' or 'stop'; dies with a usage message if it is missing.
# The result of each individual service command is not checked.
sub _start_stop_services {
    my $self   = shift;
    my $action = shift or die "usage: _start_stop_services(start|stop)";
    $self->info($action eq 'stop' ? "Stopping services...\n":"Starting services...\n");
    # A lexical loop variable keeps the global $_ (which would be a read-only
    # alias to the literals here) out of reach of whatever ssh() does.
    for my $service ('apache2','mysql','postgresql') {
        $self->ssh("sudo service $service $action");
    }
}
891              
# Snapshot every EBS volume that backs the 'volumes' LVM volume group.
#
# The physical volumes of the group are read from pvs on the server, their
# /dev/xvd* names are translated to /dev/sd* and matched against the
# instance's block device mapping.  /opt/gbrowse is unmounted while the
# snapshots are requested and tagged, and is remounted afterwards whether or
# not that succeeded.  The snapshot description and Name tag carry the name
# of the host this code runs on and the local time.
#
# Returns the list of snapshot objects.  Dies if no matching EBS volume is
# found, if the unmount or remount fails, or if a snapshot cannot be created.
sub snapshot_data_volumes {
    my $self = shift;

    my %volumes;
    my $fh = $self->scmd_read('sudo pvs --noheadings --units g --nosuffix --separator ,');
    while (my $row = <$fh>) {
        chomp $row;
        $row =~ s/^\s+//;
        my ($pv,$vg) = split /,/, $row;
        next unless defined $vg && $vg eq 'volumes';
        $pv =~ s!/dev/xvd!/dev/sd!;
        $volumes{$pv}++;
    }
    close $fh;

    my $hostname  = hostname();
    my $timestamp = localtime();

    # get the EBS volumes for this device
    my @vols = map {$_->volume} grep {$volumes{$_->deviceName}} $self->blockDeviceMapping;
    @vols or die "Could not find the EBS volumes to snapshot";

    $self->info("Unmounting filesystem...\n");
    $self->ssh('sudo umount /opt/gbrowse') or die "Couldn't umount";

    # Trap failures so that the filesystem is remounted before the error is
    # passed on to the caller.
    $self->info("Creating snapshots...\n");
    my @snapshots = eval {
        my @created;
        for my $vol (@vols) {
            my $snapshot = $vol->create_snapshot("GBrowse data volume synchronized with $hostname on $timestamp")
                or die "Couldn't snapshot $vol: ",$self->ec2->error_str;
            push @created,$snapshot;
        }
        $_->add_tag(Name => "GBrowse data from ${hostname}\@${timestamp}") foreach @created;
        @created;
    };
    my $error = $@;

    $self->info("Remounting filesystem...\n");
    my $remounted = $self->ssh('sudo mount /opt/gbrowse');
    die $error if $error;
    $remounted or die "Couldn't mount";

    return @snapshots;
}
926              
# Enlarge an LVM volume group on the server by adding a new EBS volume.
#   $vg   - name of the volume group to extend
#   $size - size of the new EBS volume in gigabytes
#
# The volume is created in the server's placement zone, attached to an unused
# block device, flagged for deletion on termination, initialised with
# pvcreate and added to $vg with vgextend.  Returns 1; dies on any failure.
sub _extend_vg {
    my $self = shift;
    my ($vg,$size) = @_;

    my ($ebs_device,$local_device) = $self->unused_block_device();
    # Fail before a volume is created if there is nowhere to attach it.
    defined $ebs_device && defined $local_device
        or die "Couldn't find an unused block device";

    $self->info("Creating ${size}G EBS volume...\n");
    my $vol = $self->ec2->create_volume(-availability_zone => $self->placement,
                                        -size              => $size) or die "Couldn't create EBS volume: ",$self->ec2->error_str;
    $self->ec2->wait_for_volumes($vol);
    $vol->current_status eq 'available' or die "EBS volume creation failed: ",$self->ec2->error_str;

    # Named $attachment rather than $a so that sort's package variable is
    # not shadowed.
    my $attachment = $vol->attach($self => $ebs_device) or die "EBS volume attachment failed: ",$self->ec2->error_str;
    $self->ec2->wait_for_attachments($attachment);
    $attachment->current_status eq 'attached' or die "Volume attachment failed: ",$self->ec2->error_str;

    $attachment->deleteOnTermination(1);

    $self->info("Creating LVM2 physical device...\n");
    $self->ssh("sudo pvcreate $local_device") or die "pvcreate failed";

    # Use the group the caller asked for instead of a hard-coded name.
    $self->info("Extending '$vg' volume group...\n");
    $self->ssh("sudo vgextend $vg $local_device") or die "vgextend failed";

    1;
}
952              
# Replace the EBS volume behind an LVM physical volume with a larger copy.
#   $device   - physical volume device on the server (a /dev/xvd* name is
#               translated to /dev/sd* to find its EBS mapping)
#   $new_size - size of the replacement volume in gigabytes
#
# The 'volumes' volume group is deactivated, the old volume is detached and
# snapshotted, a new volume of $new_size is created from the snapshot and
# attached at the same device, pvresize is run and the group is reactivated.
# The old volume and the intermediate snapshot are then deleted.
# Returns 1; dies on failure.
# NOTE: a failure part-way through leaves the volume group deactivated.
sub _resize_pv {
    my $self = shift;
    my ($device,$new_size) = @_;

    # get the EBS volume for this device; compare device names exactly so
    # that e.g. /dev/sdf cannot select the mapping for /dev/sdf1
    my @mapping = $self->blockDeviceMapping;
    (my $ebs_device = $device) =~ s!/dev/xvd!/dev/sd!;
    my ($mapping) = grep { $_->deviceName eq $ebs_device } @mapping;
    $mapping or die "Couldn't find an EBS mapping for $device";

    my $volume = $mapping->volume;

    # disable the volume group
    $self->ssh('sudo vgchange -an volumes') or die "Couldn't vgchange";

    # detach the underlying device
    $self->info("Detaching volume $volume...\n");
    my $detachment = $volume->detach or die "Couldn't detach";
    $self->ec2->wait_for_attachments($detachment);

    # snapshot it
    $self->info("Snapshotting volume $volume...\n");
    my $snapshot = $volume->create_snapshot('created by '.__PACKAGE__) or die "Couldn't snapshot: ",$self->ec2->error_str;
    $self->ec2->wait_for_snapshots($snapshot);
    $snapshot->current_status eq 'completed' or die "Snapshot errored: ",$self->ec2->error_str;

    # create a new volume of the appropriate size
    $self->info("Creating new volume from snapshot...\n");
    my $zone = $volume->availabilityZone;

    my $new_volume = $self->ec2->create_volume(-availability_zone => $zone,
                                               -size              => $new_size,
                                               -snapshot_id       => $snapshot) or die "Couldn't create volume: ",$self->ec2->error_str;
    $self->ec2->wait_for_volumes($new_volume);
    $new_volume->current_status eq 'available' or die "Volume error: ",$self->ec2->error_str;

    $self->info("Attaching new volume...\n");
    my $attachment = $self->attach_volume($new_volume => $ebs_device)
        or die "Couldn't attach new volume: ",$self->ec2->error_str;
    $self->ec2->wait_for_attachments($attachment);
    $new_volume->deleteOnTermination(1);

    # activate
    $self->info("Resizing physical volume...\n");
    $self->ssh("sudo pvresize $device") or die "Couldn't pvresize";
    $self->ssh('sudo vgchange -ay volumes') or die "Couldn't vgchange";

    # get rid of the old volume and the new snapshot (which we no longer need)
    $self->ec2->delete_volume($volume);
    $self->ec2->delete_snapshot($snapshot);

    1;
}
1006              
1007             1;
1008              
1009             =head1 AUTHOR
1010              
1011             Lincoln Stein E<lt>lincoln.stein@gmail.comE<gt>.
1012              
1013             Copyright (c) 2012 Ontario Institute for Cancer Research
1014              
1015             This package and its accompanying libraries is free software; you can
1016             redistribute it and/or modify it under the terms of the GPL (either
1017             version 1, or at your option, any later version) or the Artistic
1018             License 2.0. Refer to LICENSE for the full license text. In addition,
1019             please see DISCLAIMER.txt for disclaimers of warranty.
1020              
1021             =cut
1022