File Coverage

blib/lib/Schedule/Load.pm
Criterion Covered Total %
statement 108 125 86.4
branch 16 40 40.0
condition 7 18 38.8
subroutine 22 25 88.0
pod n/a
total 153 208 73.5


line stmt bran cond sub pod time code
1             # Load.pm -- Schedule load management
2             # See copyright, etc in below POD section.
3             ######################################################################
4              
5             require 5.005;
6             package Schedule::Load;
7             require Exporter;
8             @ISA = ('Exporter');
9             @EXPORT = qw( );
10             @EXPORT_OK = qw(_min _max _nfreeze _nthaw _pfreeze _pthaw);
11             %EXPORT_TAGS = (_utils => \@EXPORT_OK);
12              
13 4     4   20 use vars qw($VERSION $Debug %Machines %_Default_Params $Default_Port @Default_Hosts);
  4         9  
  4         318  
14              
15 4     4   4683 use IO::Pipe;
  4         4881  
  4         102  
16 4     4   22 use IO::File;
  4         8  
  4         521  
17 4     4   20 use IO::Socket;
  4         5  
  4         27  
18 4     4   2717 use Sys::Hostname;
  4         7  
  4         157  
19 4     4   4098 use Storable qw();
  4         14108  
  4         106  
20 4     4   34 use Socket;
  4         6  
  4         4020  
21             require Exporter;
22 4     4   242 BEGIN { eval 'use Data::Dumper; $Data::Dumper::Indent=1;';} #Ok if doesn't exist: debugging only
  4     4   1202  
  4         13219  
  4         202  
23 4     4   3553 use POSIX qw (EWOULDBLOCK BUFSIZ);
  4         35393  
  4         36  
24 4     4   4837 use strict;
  4         9  
  4         130  
25 4     4   20 use Carp;
  4         6  
  4         713  
26              
27             ######################################################################
28             #### Configuration Section
29              
30             $VERSION = '3.064';
31             $Debug = 0;
32              
33             %_Default_Params = (
34             min_pctcpu=>3,
35             port=>((defined $ENV{SLCHOOSED_PORT}) # Debugging
36             ? $ENV{SLCHOOSED_PORT}
37             : (getservbyname ('slchoosed',"")
38             ? 'slchoosed' : 1752)),
39             dhost=> [(defined $ENV{SLCHOOSED_HOST})
40             ? split ':', $ENV{SLCHOOSED_HOST}
41             : qw(localhost)],
42             req_retries=>3, # Number of tries before we bail out
43             req_retry_delay=>10, # After being closed, delay for failover server to come up
44             );
45              
46             ######################################################################
47             #### Internal utilities
48              
49             sub _subprocesses {
50 4   66 4   6037373 my $parent = shift || $$;
51 4         32 my $pt = shift; # Generally undef, unless happen to have process table already
52             # All pids under the given parent
53             # Used by testing module
54             # Same function in Parallel::Forker::_subprocesses
55 4     4   2783 use Proc::ProcessTable;
  4         35718  
  4         2082  
56 4 100       86 if (!$pt) {
57 1         76 $pt = new Proc::ProcessTable( 'cache_ttys' => 1);
58             }
59 4         791 my %parent_pids;
60 4         7 foreach my $p (@{$pt->table}) {
  4         19382  
61 74         2646 $parent_pids{$p->pid} = $p->ppid;
62             }
63 4         98 my @out;
64 4         14 my @search = ($parent);
65 4         28 while ($#search > -1) {
66 19         29 my $pid = shift @search;
67 19 100       68 push @out, $pid if $pid ne $parent;
68 19         85 foreach (keys %parent_pids) {
69 362 100       721 push @search, $_ if $parent_pids{$_} == $pid;
70             }
71             }
72 4         110 return @out;
73             }
74              
75             sub _min {
76 0 0   0   0 return $_[0] if (!defined $_[1]);
77 0 0       0 return $_[1] if (!defined $_[0]);
78 0 0       0 return $_[0] if ($_[0] <= $_[1]);
79 0         0 return $_[1];
80             }
81             sub _max {
82 0 0   0   0 return $_[0] if (!defined $_[1]);
83 0 0       0 return $_[1] if (!defined $_[0]);
84 0 0       0 return $_[0] if ($_[0] >= $_[1]);
85 0         0 return $_[1];
86             }
87              
88             sub _pfreeze {
89 12     12   59 my $cmd = shift;
90 12         27 my $ref = shift;
91 12         21 my $debug = shift;
92              
93 12         179 my $serialized = $cmd . " " . unpack ("h*", Storable::nfreeze $ref) . "\n";
94 12 50       1146 if ($debug) {
95 0         0 printf "AFREEZE $cmd: %s\n", Data::Dumper::Dumper($ref);
96             }
97 12         77 return $serialized;
98             }
99              
100             sub _pthaw {
101 80     80   210 my $line = shift;
102 80         119 my $debug = shift;
103              
104 80         364 $line =~ /^(\S+)\s*(\S*)/;
105 80         183 my $cmd = $1; my $serialized = $2;
  80         196  
106              
107 80         100 my $ref;
108 80 100       223 if ($serialized) {
109 60         470 eval {
110             # Tolerate storable version mismatches. The error will look like
111             # "Storable binary image v2.7 more recent then I am"
112 60         658 $ref = Storable::thaw(pack ("h*", $serialized));
113             };
114 60 0 33     2147 if (!$ref && $@) {
115 0         0 $cmd = "THAW_ERROR-".$@;
116             }
117             }
118 80 50       193 if ($debug) {
119 0         0 print "$cmd: ", Data::Dumper::Dumper($ref);
120             }
121 80         301 return ($cmd, $ref);
122             }
123              
124             ######################################################################
125             ######################################################################
126             ######################################################################
127             #### Internal socket class, so we can override NEW
128              
129             package Schedule::Load::Socket;
130 4     4   33 use IO::Socket;
  4         8  
  4         36  
131 4     4   9140 use Time::HiRes qw(usleep gettimeofday);
  4         6560  
  4         18  
132              
133 4     4   829 use strict;
  4         8  
  4         114  
134 4     4   26 use vars qw(@ISA);
  4         10  
  4         1766  
135             @ISA = qw(IO::Socket::INET);
136              
137             sub new {
138 1     1   10 my $class = shift;
139              
140 1         22 my %params = (@_);
141             # There is a bug in the socket that it requires untainted peer address
142             # it will just silently fail if you give it a tainted host name
143 1 50       6 if ($params{PeerAddr}) {
144 1         18 $params{PeerAddr} =~ /([a-z0-9A-Z._-]*)/; $params{PeerAddr}=$1; # Untaint
  1         27  
145             }
146 1 50       3 if ($params{PeerPort}) {
147 1         13 $params{PeerPort} =~ /([a-z0-9A-Z._-]*)/; $params{PeerPort}=$1; # Untaint
  1         4  
148             }
149              
150 1         2 my $fh;
151 1         11 $? = 0;
152             {
153 1         2 local $SIG{__WARN__} = sub {
154 0 0   0   0 return if $_[0] =~ /Connection refused/;
155 0         0 warn @_;
156 1         28 };
157 1         69 $fh = $class->SUPER::new(
158             Proto => 'tcp',
159             %params);
160             }
161 1 50       1448 $fh = undef if $?;
162 1         6 return $fh;
163             }
164              
165             sub send_and_check {
166 20     20   34 my $fh = shift;
167 20         65 my $out = join "", @_;
168             # Send any arguments to the filehandle
169             # Returns 0 if failed, else 1
170 20         63 while ($out ne "") {
171 20 50 33     162 if (!$fh || !$fh->connected()) {
172 0         0 return 0;
173             }
174 20         267 my $rv = eval { return $fh->syswrite($out); };
  20         167  
175             # Node ->connected call does a system getpeeraddr() call
176 20 50 33     32193 if (!$fh || !$fh->connected() || ($! && $! != POSIX::EWOULDBLOCK)) {
      33        
      33        
177 0         0 return 0;
178             }
179 20 50       567 if (!defined $rv) { usleep 1000; next; } # Couldn't write: very rare
  0         0  
  0         0  
180             # Truncate what did get out
181 20         130 $out = substr ($out, $rv);
182             }
183 20         55 return 1;
184             }
185              
186             package Schedule::Load;
187              
188             ######################################################################
189             ######################################################################
190             ######################################################################
191             #### Package return
192             1;
193              
194             ######################################################################
195             __END__