File Coverage

blib/lib/MogileFS/Server.pm
Criterion Covered Total %
statement 156 246 63.4
branch 7 42 16.6
condition 1 14 7.1
subroutine 49 68 72.0
pod 0 2 0.0
total 213 372 57.2


line stmt bran cond sub pod time code
1             package MogileFS::Server;
2 21     21   3016346 use strict;
  21         297  
  21         953  
3 21     21   125 use warnings;
  21         42  
  21         1254  
4 21     21   116 use vars qw($VERSION);
  21         46  
  21         2048  
5             $VERSION = "2.70";
6              
7             =head1 NAME
8              
9             MogileFS::Server - MogileFS (distributed filesystem) server
10              
11             =head1 SYNOPSIS
12              
13             $s = MogileFS::Server->server;
14             $s->run;
15              
16             =cut
17              
18 21     21   30048 use IO::Socket;
  21         774754  
  21         124  
19 21     21   15119 use Symbol;
  21         49  
  21         1323  
20 21     21   21739 use POSIX;
  21         6676547  
  21         264  
21 21     21   110945 use File::Copy ();
  21         884426  
  21         624  
22 21     21   194 use Carp;
  21         124  
  21         3499  
23 21     21   141 use File::Basename ();
  21         36  
  21         440  
24 21     21   307 use File::Path ();
  21         43  
  21         421  
25 21     21   45435 use Sys::Syslog ();
  21         1801820  
  21         709  
26 21     21   33363 use Time::HiRes ();
  21         42998  
  21         841  
27 21     21   23717 use Net::Netmask;
  21         890267  
  21         3621  
28 21     21   302 use List::Util;
  21         45  
  21         2587  
29 21     21   128 use Socket qw(SO_KEEPALIVE IPPROTO_TCP TCP_NODELAY);
  21         45  
  21         2940  
30              
31 21     21   16002 use MogileFS::Util qw(daemonize);
  21         72  
  21         2002  
32 21     21   17641 use MogileFS::Config;
  21         91  
  21         3962  
33              
34 21     21   137 use MogileFS::ProcManager;
  21         169  
  21         572  
35 21     21   201 use MogileFS::Connection::Client;
  21         66  
  21         509  
36 21     21   128 use MogileFS::Connection::Worker;
  21         52  
  21         957  
37              
38 21     21   26257 use MogileFS::Worker::Query;
  21         104  
  21         1181  
39 21     21   19410 use MogileFS::Worker::Delete;
  21         76  
  21         698  
40 21     21   26118 use MogileFS::Worker::Replicate;
  21         101  
  21         1046  
41 21     21   15714 use MogileFS::Worker::Reaper;
  21         74  
  21         632  
42 21     21   15651 use MogileFS::Worker::Monitor;
  21         94  
  21         1083  
43 21     21   19583 use MogileFS::Worker::Fsck;
  21         87  
  21         885  
44 21     21   15341 use MogileFS::Worker::JobMaster;
  21         78  
  21         700  
45              
46 21     21   13922 use MogileFS::Factory::Domain;
  21         76  
  21         591  
47 21     21   11638 use MogileFS::Factory::Class;
  21         83  
  21         815  
48 21     21   13071 use MogileFS::Factory::Host;
  21         94  
  21         737  
49 21     21   17264 use MogileFS::Factory::Device;
  21         79  
  21         1702  
50 21     21   439 use MogileFS::Domain;
  21         46  
  21         493  
51 21     21   129 use MogileFS::Class;
  21         43  
  21         654  
52 21     21   121 use MogileFS::Host;
  21         48  
  21         433  
53 21     21   113 use MogileFS::Device;
  21         48  
  21         368  
54              
55 21     21   134 use MogileFS::HTTPFile;
  21         57  
  21         475  
56 21     21   15696 use MogileFS::FID;
  21         75  
  21         647  
57 21     21   13634 use MogileFS::DevFID;
  21         69  
  21         635  
58              
59 21     21   178 use MogileFS::Store;
  21         93  
  21         708  
60              
61 21     21   17720 use MogileFS::ReplicationPolicy::MultipleHosts;
  21         79  
  21         18000  
62              
63             my $server; # server singleton
64             sub server {
65 0     0 0 0 my ($pkg) = @_;
66 0   0     0 return $server ||= bless {}, $pkg;
67             }
68              
69             # --------------------------------------------------------------------------
70             # instance methods:
71             # --------------------------------------------------------------------------
72              
73             sub run {
74 0     0 0 0 my $self = shift;
75              
76 0         0 MogileFS::Config->load_config;
77              
78             # don't run as root
79 0 0 0     0 die "mogilefsd cannot be run as root\n"
80             if $< == 0 && MogileFS->config('user') ne "root";
81              
82 0         0 MogileFS::Config->check_database;
83 0 0       0 daemonize() if MogileFS->config("daemonize");
84              
85 0         0 MogileFS::ProcManager->set_min_workers('monitor' => 1);
86              
87             # open up our log
88 0         0 Sys::Syslog::openlog('mogilefsd', 'pid', 'daemon');
89 0         0 Mgd::log('info', 'beginning run');
90              
91 0 0       0 unless (MogileFS::ProcManager->write_pidfile) {
92 0         0 Mgd::log('info', "Couldn't write pidfile, ending run");
93 0         0 Sys::Syslog::closelog();
94 0         0 exit 1;
95             }
96              
97             # Install signal handlers.
98             $SIG{TERM} = sub {
99 0     0   0 my @children = MogileFS::ProcManager->child_pids;
100 0 0       0 print STDERR scalar @children, " children to kill.\n" if $DEBUG;
101 0         0 my $count = kill( 'TERM' => @children );
102 0 0       0 print STDERR "Sent SIGTERM to $count children.\n" if $DEBUG;
103 0         0 MogileFS::ProcManager->remove_pidfile;
104 0         0 Mgd::log('info', 'ending run due to SIGTERM');
105 0         0 Sys::Syslog::closelog();
106              
107 0         0 exit 0;
108 0         0 };
109              
110             $SIG{INT} = sub {
111 0     0   0 my @children = MogileFS::ProcManager->child_pids;
112 0 0       0 print STDERR scalar @children, " children to kill.\n" if $DEBUG;
113 0         0 my $count = kill( 'INT' => @children );
114 0 0       0 print STDERR "Sent SIGINT to $count children.\n" if $DEBUG;
115 0         0 MogileFS::ProcManager->remove_pidfile;
116 0         0 Mgd::log('info', 'ending run due to SIGINT');
117 0         0 exit 0;
118 0         0 };
119 0         0 $SIG{PIPE} = 'IGNORE'; # catch them by hand
120              
121             # setup server sockets to listen for client connections
122 0         0 my @servers;
123 0         0 foreach my $listen (@{ MogileFS->config('listen') }) {
  0         0  
124 0 0       0 my $server = IO::Socket::INET->new(LocalAddr => $listen,
125             Type => SOCK_STREAM,
126             Proto => 'tcp',
127             Blocking => 0,
128             Reuse => 1,
129             Listen => 1024 )
130             or die "Error creating socket: $@\n";
131 0         0 $server->sockopt(SO_KEEPALIVE, 1);
132 0         0 $server->setsockopt(IPPROTO_TCP, TCP_NODELAY, 1);
133              
134             # save sub to accept a client
135 0         0 push @servers, $server;
136             Danga::Socket->AddOtherFds( fileno($server) => sub {
137 0     0   0 while (my $csock = $server->accept) {
138 0         0 MogileFS::Connection::Client->new($csock);
139             }
140 0         0 } );
141             }
142              
143             MogileFS::ProcManager->push_pre_fork_cleanup(sub {
144             # so children don't hold server connection open
145 0     0   0 close($_) foreach @servers;
146 0         0 });
147              
148             # setup the post event loop callback to spawn jobs, and the timeout
149 0         0 Danga::Socket->DebugLevel(3);
150 0         0 Danga::Socket->SetLoopTimeout( 250 ); # 250 milliseconds
151 0         0 Danga::Socket->SetPostLoopCallback(MogileFS::ProcManager->PostEventLoopChecker);
152              
153             # and now, actually start listening for events
154 0         0 eval {
155 0 0       0 print( "Starting event loop for frontend job on pid $$.\n" ) if $DEBUG;
156 0         0 Danga::Socket->EventLoop();
157             };
158              
159 0 0       0 if ($@) {
160 0         0 Mgd::log('err', "crash log: $@");
161 0         0 exit 1;
162             }
163 0         0 Mgd::log('info', 'ending run');
164 0         0 Sys::Syslog::closelog();
165 0         0 exit(0);
166             }
167              
168             # --------------------------------------------------------------------------
169              
170             package MogileFS;
171             # just so MogileFS->config($key) will work:
172 21     21   171 use MogileFS::Config qw(config);
  21         158  
  21         5054  
173              
174             my %hooks;
175              
176             sub register_worker_command {
177             # just pass this through to the Worker class
178 0     0   0 return MogileFS::Worker::Query::register_command(@_);
179             }
180              
181             sub register_global_hook {
182 0     0   0 $hooks{$_[0]} = $_[1];
183 0         0 return 1;
184             }
185              
186             sub unregister_global_hook {
187 0     0   0 delete $hooks{$_[0]};
188 0         0 return 1;
189             }
190              
191             sub run_global_hook {
192 0     0   0 my $hookname = shift;
193 0         0 my $ref = $hooks{$hookname};
194 0 0       0 return $ref->(@_) if defined $ref;
195 0         0 return undef;
196             }
197              
198             # --------------------------------------------------------------------------
199              
200             package Mgd; # conveniently short name
201 21     21   132 use strict;
  21         46  
  21         834  
202 21     21   126 use warnings;
  21         48  
  21         820  
203 21     21   118 use MogileFS::Config;
  21         225  
  21         3469  
204 21     21   129 use MogileFS::Util qw(error fatal debug); # for others calling Mgd::foo()
  21         46  
  21         1281  
205 21     21   135 use Socket qw(SOL_SOCKET SO_RCVBUF AF_UNIX SOCK_STREAM PF_UNSPEC);
  21         43  
  21         3981  
206             BEGIN {
207             # detect the receive buffer size for Unix domain stream sockets,
208             # we assume the size is identical across all Unix domain sockets.
209 21 50   21   1798 socketpair(my $s1, my $s2, AF_UNIX, SOCK_STREAM, PF_UNSPEC)
210             or die( "socketpair failed: $!" );
211              
212 21         244 my $r = getsockopt($s1, SOL_SOCKET, SO_RCVBUF);
213 21 50       137 defined $r or die "getsockopt: $!";
214 21 50       182 $r = unpack('i', $r) if defined $r;
215 21 50 33     211 $r = (defined $r && $r > 0) ? $r : 8192;
216 21         357 close $s1;
217 21         246 close $s2;
218 21     21   1660 eval 'use constant UNIX_RCVBUF_SIZE => $r';
  21         3267  
  21         51  
  21         11939  
219             }
220              
221             sub server {
222 0     0   0 return MogileFS::Server->server;
223             }
224              
225             # database checking/connecting
226             sub validate_dbh {
227 0     0   0 my $sto = Mgd::get_store();
228 0         0 my $had_dbh = $sto->have_dbh;
229 0         0 $sto->recheck_dbh();
230 0         0 my $dbh;
231 0         0 eval { $dbh = $sto->dbh };
  0         0  
232             # Doesn't matter what the failure was; workers should retry later.
233 0 0 0     0 error("Error validating master DB: $@") if $@ && $had_dbh;
234 0         0 return $dbh;
235             }
236              
237             # the eventual replacement for callers asking for a dbh directly:
238             # they'll ask for the current store, which is a database abstraction
239             # layer.
240             my ($store, $store_pid);
241             sub get_store {
242 0 0 0 0   0 return $store if $store && $store_pid == $$;
243 0         0 $store_pid = $$;
244 0         0 return $store = MogileFS::Store->new;
245             }
246              
247             sub close_store {
248 0 0   0   0 if ($store) {
249 0         0 $store->dbh->disconnect();
250 0         0 $store = undef;
251 0         0 return 1;
252             }
253 0         0 return 0;
254             }
255              
256             # only for t/ scripts to explicitly set a store, without loading in a config
257             sub set_store {
258 0     0   0 my ($s) = @_;
259 0         0 $store = $s;
260 0         0 $store_pid = $$;
261             }
262              
263             sub domain_factory {
264 0     0   0 return MogileFS::Factory::Domain->get_factory;
265             }
266              
267             sub class_factory {
268 0     0   0 return MogileFS::Factory::Class->get_factory;
269             }
270              
271             sub host_factory {
272 0     0   0 return MogileFS::Factory::Host->get_factory;
273             }
274              
275             sub device_factory {
276 0     0   0 return MogileFS::Factory::Device->get_factory;
277             }
278              
279             # log stuff to syslog or the screen
280             sub log {
281             # simple logging functionality
282 4 50   4   15 if (! $MogileFS::Config::daemonize) {
283 4         42 $| = 1;
284             # syslog acts like printf so we have to use printf and append a \n
285 4         10 shift; # ignore the first parameter (info, warn, critical, etc)
286 4         13 my $mask = shift; # format string
287 4 50       46 $mask .= "\n" unless $mask =~ /\n$/;
288 4 50       42 my $message = @_ ? sprintf($mask, @_) : $mask;
289 4         1744 print '[', scalar localtime(), '] ', $message;
290             } else {
291             # just pass the parameters to syslog
292 0           Sys::Syslog::syslog(@_);
293             }
294             }
295              
296             1;
297             __END__