* Distributed builds and load balancing now seem to work pretty well.

(Though the `build-remote.pl' script has a gigantic race condition).
This commit is contained in:
Eelco Dolstra 2004-05-13 22:52:37 +00:00
parent 2fa3304933
commit 4fc00cbec1
2 changed files with 101 additions and 39 deletions

View file

@ -80,6 +80,21 @@ if (!defined $machine) {
}
}
sub writeLoad {
system "echo A >> /tmp/blaaaa";
open LOAD, "> /home/eelco/nix/distributed/current-load" or die;
system "echo B >> /tmp/blaaaa";
foreach my $cur (keys %machines) {
system "echo $cur $curJobs{$cur} >> /tmp/blaaaa";
print LOAD "$cur $curJobs{$cur}\n";
}
system "echo C >> /tmp/blaaaa";
close LOAD;
}
$curJobs{$machine} = $curJobs{$machine} + 1;
writeLoad;
sendReply "accept";
open IN, "<&4" or die;
my $x = <IN>;
@ -87,30 +102,27 @@ chomp $x;
print "got $x\n";
close IN;
system "echo $x >> /tmp/blaaaa";
system "echo $curJobs{$machine} >> /tmp/blaaaa";
if ($x ne "okay") {
$curJobs{$machine} = $curJobs{$machine} - 1;
system "echo $curJobs{$machine} >> /tmp/blaaaa";
writeLoad;
exit 0;
}
print "BUILDING REMOTE: $storeExpr on $machine\n";
$curJobs{$machine} = $curJobs{$machine} + 1;
sub writeLoad {
open LOAD, "> /home/eelco/nix/distributed/current-load" or die;
foreach my $cur (keys %machines) {
print LOAD "$cur $curJobs{$cur}\n";
}
close LOAD;
}
writeLoad
my $ssh = "ssh -i $sshKeys{$machine} -x";
my $inputs = `cat inputs`;
my $inputs = `cat inputs` or die;
$inputs =~ s/\n/ /g;
my $outputs = `cat outputs`;
my $outputs = `cat outputs` or die;
$outputs =~ s/\n/ /g;
my $successors = `cat successors`;
my $successors = `cat successors` or die;
$successors =~ s/\n/ /g;
system "rsync -a -e '$ssh' $storeExpr $inputs $machine:/nix/store";
@ -136,4 +148,4 @@ foreach my $output (split '\n', $outputs) {
$curJobs{$machine} = $curJobs{$machine} - 1;
writeLoad
writeLoad;

View file

@ -91,6 +91,9 @@ struct Goal
/* The remainder is state held during the build. */
/* Whether it's being built by a hook or by ourselves. */
bool inHook;
/* Locks on the output paths. */
PathLocks outputLocks;
@ -201,6 +204,11 @@ private:
of derivation expressions that have yet to be normalised. */
Goals goals;
/* Finished goals are removed in run() at top-level; they are not
deleted as soon as they are finished, since there may be
references hanging about. */
PathSet deadGoals;
/* The set of `buildable' goals, which are the ones with an empty
list of unfinished inputs. */
PathSet buildable;
@ -237,7 +245,10 @@ private:
void startBuildChild(Goal & goal);
bool tryBuildHook(Goal & goal);
typedef enum {rpAccept, rpDecline, rpPostpone} HookReply;
HookReply tryBuildHook(Goal & goal);
void terminateBuildHook(Goal & goal);
void openLogFile(Goal & goal);
@ -353,6 +364,12 @@ void Normaliser::run()
do {
printMsg(lvlVomit, "waiting for children");
} while (!waitForChildren());
/* Remove finished goals from the graph. */
for (PathSet::iterator i = deadGoals.begin();
i != deadGoals.end(); ++i)
goals.erase(*i);
deadGoals.clear();
}
assert(buildable.empty() && building.empty());
@ -361,7 +378,12 @@ void Normaliser::run()
bool Normaliser::canBuildMore()
{
return building.size() < maxBuildJobs;
/* !!! O(n) - not that it matters */
unsigned int localJobs = 0;
for (Building::iterator i = building.begin();
i != building.end(); ++i)
if (!goals[i->second].inHook) localJobs++;
return localJobs < maxBuildJobs;
}
@ -378,13 +400,19 @@ bool Normaliser::startBuild(Path nePath)
format("starting normalisation of goal `%1%'") % nePath);
/* Is the build hook willing to accept this job? */
if (tryBuildHook(goal)) return true;
switch (tryBuildHook(goal)) {
case rpAccept: return true;
case rpPostpone: return false;
case rpDecline: ;
}
if (!canBuildMore()) {
debug("postponing build");
return false;
}
goal.inHook = false;
/* Prepare the build, i.e., acquire locks and gather necessary
information. */
if (!prepareBuild(goal)) return true;
@ -616,10 +644,17 @@ string readLine(int fd)
}
bool Normaliser::tryBuildHook(Goal & goal)
void writeLine(int fd, string s)
{
s += '\n';
writeFull(fd, (const unsigned char *) s.c_str(), s.size());
}
Normaliser::HookReply Normaliser::tryBuildHook(Goal & goal)
{
Path buildHook = getEnv("NIX_BUILD_HOOK");
if (buildHook == "") return false;
if (buildHook == "") return rpDecline;
buildHook = absPath(buildHook);
/* Create a directory where we will store files used for
@ -676,21 +711,17 @@ bool Normaliser::tryBuildHook(Goal & goal)
if (reply == "decline" || reply == "postpone") {
/* Clean up the child. !!! hacky / should verify */
/* !!! drain stdout of hook, wait for child process */
goal.pid = 0;
goal.fromHook.closeReadSide();
goal.toHook.closeWriteSide();
close(goal.fdLogFile);
goal.fdLogFile = 0;
goal.logPipe.closeReadSide();
building.erase(pid);
return reply == "postpone";
terminateBuildHook(goal);
return reply == "decline" ? rpDecline : rpPostpone;
}
else if (reply == "accept") {
if (!prepareBuild(goal))
throw Error("NOT IMPLEMENTED: hook unnecessary");
if (!prepareBuild(goal)) {
writeLine(goal.toHook.writeSide, "cancel");
terminateBuildHook(goal);
return rpAccept;
}
Path inputListFN = goal.tmpDir + "/inputs";
Path outputListFN = goal.tmpDir + "/outputs";
@ -717,17 +748,35 @@ bool Normaliser::tryBuildHook(Goal & goal)
s += i->first + " " + i->second + "\n";
writeStringToFile(successorsListFN, s);
string okay = "okay\n";
writeFull(goal.toHook.writeSide,
(const unsigned char *) okay.c_str(), okay.size());
writeLine(goal.toHook.writeSide, "okay");
return true;
goal.inHook = true;
return rpAccept;
}
else throw Error(format("bad hook reply `%1%'") % reply);
}
void Normaliser::terminateBuildHook(Goal & goal)
{
/* !!! drain stdout of hook, wait for child process */
debug("terminating build hook");
pid_t pid = goal.pid;
int status;
if (waitpid(goal.pid, &status, 0) != goal.pid)
printMsg(lvlError, format("process `%1%' missing") % goal.pid);
goal.pid = 0;
goal.fromHook.closeReadSide();
goal.toHook.closeWriteSide();
close(goal.fdLogFile);
goal.fdLogFile = 0;
goal.logPipe.closeReadSide();
building.erase(pid);
}
void Normaliser::openLogFile(Goal & goal)
{
/* Create a log file. */
@ -841,7 +890,7 @@ bool Normaliser::waitForChildren()
Goal & goal(goals[i->second]);
int fd = goal.logPipe.readSide;
if (FD_ISSET(fd, &fds)) {
unsigned char buffer[1024];
unsigned char buffer[4096];
ssize_t rd = read(fd, buffer, sizeof(buffer));
if (rd == -1) {
if (errno != EINTR)
@ -1047,9 +1096,10 @@ void Normaliser::removeGoal(Goal & goal)
}
}
/* Remove this goal from the graph. Careful: after this `goal' is
probably no longer valid. */
goals.erase(goal.nePath);
/* Lazily remove the goal from the graph (it will be actually
removed in run(); this is since callers may have references to
`goal'). */
deadGoals.insert(goal.nePath);
}