libstore: turn Worker in a kj event loop user

using a proper event loop basis we no longer have to worry about most of
the intricacies of poll(), or platform-dependent replacements for it. we
may even be able to use the event loop and its promise system for all of
our scheduling in the future. we don't do any real async processing yet,
this is just preparation to separate the first such change from the huge
api design difference with the async framework we chose (kj from capnp):

kj::Promise, unlike std::future, doesn't return exceptions unmangled. it
instead wraps any non-kj exception into a kj exception, erasing all type
information and preserving mostly the what() string in the process. this
makes sense in the capnp rpc use case where unrestricted exception types
can't be transferred, and since it moves error handling styles closer to
a world we'd actually like there's no harm in doing it only here for now

Change-Id: I20f888de74d525fb2db36ca30ebba4bcfe9cc838
This commit is contained in:
eldritch horrors 2024-09-01 01:37:10 +02:00
parent 92eccfbd68
commit f2a49032a6
15 changed files with 225 additions and 135 deletions

View file

@ -229,6 +229,7 @@ configdata += {
}
boost = dependency('boost', required : true, modules : ['container'], include_type : 'system')
kj = dependency('kj-async', required : true, include_type : 'system')
# cpuid only makes sense on x86_64
cpuid_required = is_x64 ? get_option('cpuid') : false

View file

@ -15,6 +15,8 @@
brotli,
bzip2,
callPackage,
capnproto-lix ? __forDefaults.capnproto-lix,
capnproto,
cmake,
curl,
doxygen,
@ -83,6 +85,9 @@
});
build-release-notes = callPackage ./maintainers/build-release-notes.nix { };
# needs explicit c++20 to enable coroutine support
capnproto-lix = capnproto.overrideAttrs { CXXFLAGS = "-std=c++20"; };
},
}:
let
@ -220,6 +225,7 @@ stdenv.mkDerivation (finalAttrs: {
ninja
cmake
rustc
capnproto-lix
]
++ [
(lib.getBin lowdown)
@ -260,6 +266,7 @@ stdenv.mkDerivation (finalAttrs: {
libsodium
toml11
pegtl
capnproto-lix
]
++ lib.optionals hostPlatform.isLinux [
libseccomp

View file

@ -131,7 +131,7 @@ Goal::Finished DerivationGoal::timedOut(Error && ex)
}
Goal::WorkResult DerivationGoal::work(bool inBuildSlot)
kj::Promise<Result<Goal::WorkResult>> DerivationGoal::work(bool inBuildSlot) noexcept
{
return (this->*state)(inBuildSlot);
}
@ -157,8 +157,8 @@ void DerivationGoal::addWantedOutputs(const OutputsSpec & outputs)
}
Goal::WorkResult DerivationGoal::getDerivation(bool inBuildSlot)
{
kj::Promise<Result<Goal::WorkResult>> DerivationGoal::getDerivation(bool inBuildSlot) noexcept
try {
trace("init");
/* The first thing to do is to make sure that the derivation
@ -170,16 +170,22 @@ Goal::WorkResult DerivationGoal::getDerivation(bool inBuildSlot)
state = &DerivationGoal::loadDerivation;
return WaitForGoals{{worker.goalFactory().makePathSubstitutionGoal(drvPath)}};
return {WaitForGoals{{worker.goalFactory().makePathSubstitutionGoal(drvPath)}}};
} catch (...) {
return {std::current_exception()};
}
Goal::WorkResult DerivationGoal::loadDerivation(bool inBuildSlot)
{
kj::Promise<Result<Goal::WorkResult>> DerivationGoal::loadDerivation(bool inBuildSlot) noexcept
try {
trace("loading derivation");
if (nrFailed != 0) {
return done(BuildResult::MiscFailure, {}, Error("cannot build missing derivation '%s'", worker.store.printStorePath(drvPath)));
return {done(
BuildResult::MiscFailure,
{},
Error("cannot build missing derivation '%s'", worker.store.printStorePath(drvPath))
)};
}
/* `drvPath' should already be a root, but let's be on the safe
@ -202,11 +208,13 @@ Goal::WorkResult DerivationGoal::loadDerivation(bool inBuildSlot)
assert(drv);
return haveDerivation(inBuildSlot);
} catch (...) {
return {std::current_exception()};
}
Goal::WorkResult DerivationGoal::haveDerivation(bool inBuildSlot)
{
kj::Promise<Result<Goal::WorkResult>> DerivationGoal::haveDerivation(bool inBuildSlot) noexcept
try {
trace("have derivation");
parsedDrv = std::make_unique<ParsedDerivation>(drvPath, *drv);
@ -255,7 +263,7 @@ Goal::WorkResult DerivationGoal::haveDerivation(bool inBuildSlot)
/* If they are all valid, then we're done. */
if (allValid && buildMode == bmNormal) {
return done(BuildResult::AlreadyValid, std::move(validOutputs));
return {done(BuildResult::AlreadyValid, std::move(validOutputs))};
}
/* We are first going to try to create the invalid output paths
@ -290,20 +298,29 @@ Goal::WorkResult DerivationGoal::haveDerivation(bool inBuildSlot)
return outputsSubstitutionTried(inBuildSlot);
} else {
state = &DerivationGoal::outputsSubstitutionTried;
return result;
return {std::move(result)};
}
} catch (...) {
return {std::current_exception()};
}
Goal::WorkResult DerivationGoal::outputsSubstitutionTried(bool inBuildSlot)
{
kj::Promise<Result<Goal::WorkResult>> DerivationGoal::outputsSubstitutionTried(bool inBuildSlot) noexcept
try {
trace("all outputs substituted (maybe)");
assert(drv->type().isPure());
if (nrFailed > 0 && nrFailed > nrNoSubstituters + nrIncompleteClosure && !settings.tryFallback) {
return done(BuildResult::TransientFailure, {},
Error("some substitutes for the outputs of derivation '%s' failed (usually happens due to networking issues); try '--fallback' to build derivation from source ",
worker.store.printStorePath(drvPath)));
if (nrFailed > 0 && nrFailed > nrNoSubstituters + nrIncompleteClosure && !settings.tryFallback)
{
return {done(
BuildResult::TransientFailure,
{},
Error(
"some substitutes for the outputs of derivation '%s' failed (usually happens due "
"to networking issues); try '--fallback' to build derivation from source ",
worker.store.printStorePath(drvPath)
)
)};
}
/* If the substitutes form an incomplete closure, then we should
@ -343,7 +360,7 @@ Goal::WorkResult DerivationGoal::outputsSubstitutionTried(bool inBuildSlot)
auto [allValid, validOutputs] = checkPathValidity();
if (buildMode == bmNormal && allValid) {
return done(BuildResult::Substituted, std::move(validOutputs));
return {done(BuildResult::Substituted, std::move(validOutputs))};
}
if (buildMode == bmRepair && allValid) {
return repairClosure();
@ -354,13 +371,15 @@ Goal::WorkResult DerivationGoal::outputsSubstitutionTried(bool inBuildSlot)
/* Nothing to wait for; tail call */
return gaveUpOnSubstitution(inBuildSlot);
} catch (...) {
return {std::current_exception()};
}
/* At least one of the output paths could not be
produced using a substitute. So we have to build instead. */
Goal::WorkResult DerivationGoal::gaveUpOnSubstitution(bool inBuildSlot)
{
kj::Promise<Result<Goal::WorkResult>> DerivationGoal::gaveUpOnSubstitution(bool inBuildSlot) noexcept
try {
WaitForGoals result;
/* At this point we are building all outputs, so if more are wanted there
@ -426,13 +445,15 @@ Goal::WorkResult DerivationGoal::gaveUpOnSubstitution(bool inBuildSlot)
return inputsRealised(inBuildSlot);
} else {
state = &DerivationGoal::inputsRealised;
return result;
return {result};
}
} catch (...) {
return {std::current_exception()};
}
Goal::WorkResult DerivationGoal::repairClosure()
{
kj::Promise<Result<Goal::WorkResult>> DerivationGoal::repairClosure() noexcept
try {
assert(drv->type().isPure());
/* If we're repairing, we now know that our own outputs are valid.
@ -486,34 +507,44 @@ Goal::WorkResult DerivationGoal::repairClosure()
}
if (result.goals.empty()) {
return done(BuildResult::AlreadyValid, assertPathValidity());
return {done(BuildResult::AlreadyValid, assertPathValidity())};
}
state = &DerivationGoal::closureRepaired;
return result;
return {result};
} catch (...) {
return {std::current_exception()};
}
Goal::WorkResult DerivationGoal::closureRepaired(bool inBuildSlot)
{
kj::Promise<Result<Goal::WorkResult>> DerivationGoal::closureRepaired(bool inBuildSlot) noexcept
try {
trace("closure repaired");
if (nrFailed > 0)
throw Error("some paths in the output closure of derivation '%s' could not be repaired",
worker.store.printStorePath(drvPath));
return done(BuildResult::AlreadyValid, assertPathValidity());
return {done(BuildResult::AlreadyValid, assertPathValidity())};
} catch (...) {
return {std::current_exception()};
}
Goal::WorkResult DerivationGoal::inputsRealised(bool inBuildSlot)
{
kj::Promise<Result<Goal::WorkResult>> DerivationGoal::inputsRealised(bool inBuildSlot) noexcept
try {
trace("all inputs realised");
if (nrFailed != 0) {
if (!useDerivation)
throw Error("some dependencies of '%s' are missing", worker.store.printStorePath(drvPath));
return done(BuildResult::DependencyFailed, {}, Error(
return {done(
BuildResult::DependencyFailed,
{},
Error(
"%s dependencies of derivation '%s' failed to build",
nrFailed, worker.store.printStorePath(drvPath)));
nrFailed,
worker.store.printStorePath(drvPath)
)
)};
}
if (retrySubstitution == RetrySubstitution::YesNeed) {
@ -584,7 +615,7 @@ Goal::WorkResult DerivationGoal::inputsRealised(bool inBuildSlot)
pathResolved, wantedOutputs, buildMode);
state = &DerivationGoal::resolvedFinished;
return WaitForGoals{{resolvedDrvGoal}};
return {WaitForGoals{{resolvedDrvGoal}}};
}
std::function<void(const StorePath &, const DerivedPathMap<StringSet>::ChildNode &)> accumInputPaths;
@ -650,6 +681,8 @@ Goal::WorkResult DerivationGoal::inputsRealised(bool inBuildSlot)
build hook. */
state = &DerivationGoal::tryToBuild;
return tryToBuild(inBuildSlot);
} catch (...) {
return {std::current_exception()};
}
void DerivationGoal::started()
@ -665,8 +698,8 @@ void DerivationGoal::started()
mcRunningBuilds = worker.runningBuilds.addTemporarily(1);
}
Goal::WorkResult DerivationGoal::tryToBuild(bool inBuildSlot)
{
kj::Promise<Result<Goal::WorkResult>> DerivationGoal::tryToBuild(bool inBuildSlot) noexcept
try {
trace("trying to build");
/* Obtain locks on all output paths, if the paths are known a priori.
@ -700,7 +733,7 @@ Goal::WorkResult DerivationGoal::tryToBuild(bool inBuildSlot)
if (!actLock)
actLock = std::make_unique<Activity>(*logger, lvlWarn, actBuildWaiting,
fmt("waiting for lock on %s", Magenta(showPaths(lockFiles))));
return WaitForAWhile{};
return {WaitForAWhile{}};
}
actLock.reset();
@ -717,7 +750,7 @@ Goal::WorkResult DerivationGoal::tryToBuild(bool inBuildSlot)
if (buildMode != bmCheck && allValid) {
debug("skipping build of derivation '%s', someone beat us to it", worker.store.printStorePath(drvPath));
outputLocks.setDeletion(true);
return done(BuildResult::AlreadyValid, std::move(validOutputs));
return {done(BuildResult::AlreadyValid, std::move(validOutputs))};
}
/* If any of the outputs already exist but are not valid, delete
@ -765,7 +798,7 @@ Goal::WorkResult DerivationGoal::tryToBuild(bool inBuildSlot)
},
hookReply);
if (result) {
return std::move(*result);
return {std::move(*result)};
}
}
@ -773,13 +806,18 @@ Goal::WorkResult DerivationGoal::tryToBuild(bool inBuildSlot)
state = &DerivationGoal::tryLocalBuild;
return tryLocalBuild(inBuildSlot);
} catch (...) {
return {std::current_exception()};
}
Goal::WorkResult DerivationGoal::tryLocalBuild(bool inBuildSlot) {
kj::Promise<Result<Goal::WorkResult>> DerivationGoal::tryLocalBuild(bool inBuildSlot) noexcept
try {
throw Error(
"unable to build with a primary store that isn't a local store; "
"either pass a different '--store' or enable remote builds."
"\nhttps://docs.lix.systems/manual/lix/stable/advanced-topics/distributed-builds.html");
} catch (...) {
return {std::current_exception()};
}
@ -935,8 +973,8 @@ void runPostBuildHook(
proc.getStdout()->drainInto(sink);
}
Goal::WorkResult DerivationGoal::buildDone(bool inBuildSlot)
{
kj::Promise<Result<Goal::WorkResult>> DerivationGoal::buildDone(bool inBuildSlot) noexcept
try {
trace("build done");
Finally releaseBuildUser([&](){ this->cleanupHookFinally(); });
@ -1030,7 +1068,7 @@ Goal::WorkResult DerivationGoal::buildDone(bool inBuildSlot)
outputLocks.setDeletion(true);
outputLocks.unlock();
return done(BuildResult::Built, std::move(builtOutputs));
return {done(BuildResult::Built, std::move(builtOutputs))};
} catch (BuildError & e) {
outputLocks.unlock();
@ -1051,12 +1089,14 @@ Goal::WorkResult DerivationGoal::buildDone(bool inBuildSlot)
BuildResult::PermanentFailure;
}
return done(st, {}, std::move(e));
return {done(st, {}, std::move(e))};
}
} catch (...) {
return {std::current_exception()};
}
Goal::WorkResult DerivationGoal::resolvedFinished(bool inBuildSlot)
{
kj::Promise<Result<Goal::WorkResult>> DerivationGoal::resolvedFinished(bool inBuildSlot) noexcept
try {
trace("resolved derivation finished");
assert(resolvedDrvGoal);
@ -1123,7 +1163,9 @@ Goal::WorkResult DerivationGoal::resolvedFinished(bool inBuildSlot)
if (status == BuildResult::AlreadyValid)
status = BuildResult::ResolvesToAlreadyValid;
return done(status, std::move(builtOutputs));
return {done(status, std::move(builtOutputs))};
} catch (...) {
return {std::current_exception()};
}
HookReply DerivationGoal::tryBuildHook(bool inBuildSlot)

View file

@ -213,7 +213,7 @@ struct DerivationGoal : public Goal
*/
std::optional<DerivationType> derivationType;
typedef WorkResult (DerivationGoal::*GoalState)(bool inBuildSlot);
typedef kj::Promise<Result<WorkResult>> (DerivationGoal::*GoalState)(bool inBuildSlot) noexcept;
GoalState state;
BuildMode buildMode;
@ -246,7 +246,7 @@ struct DerivationGoal : public Goal
std::string key() override;
WorkResult work(bool inBuildSlot) override;
kj::Promise<Result<WorkResult>> work(bool inBuildSlot) noexcept override;
/**
* Add wanted outputs to an already existing derivation goal.
@ -256,18 +256,18 @@ struct DerivationGoal : public Goal
/**
* The states.
*/
WorkResult getDerivation(bool inBuildSlot);
WorkResult loadDerivation(bool inBuildSlot);
WorkResult haveDerivation(bool inBuildSlot);
WorkResult outputsSubstitutionTried(bool inBuildSlot);
WorkResult gaveUpOnSubstitution(bool inBuildSlot);
WorkResult closureRepaired(bool inBuildSlot);
WorkResult inputsRealised(bool inBuildSlot);
WorkResult tryToBuild(bool inBuildSlot);
virtual WorkResult tryLocalBuild(bool inBuildSlot);
WorkResult buildDone(bool inBuildSlot);
kj::Promise<Result<WorkResult>> getDerivation(bool inBuildSlot) noexcept;
kj::Promise<Result<WorkResult>> loadDerivation(bool inBuildSlot) noexcept;
kj::Promise<Result<WorkResult>> haveDerivation(bool inBuildSlot) noexcept;
kj::Promise<Result<WorkResult>> outputsSubstitutionTried(bool inBuildSlot) noexcept;
kj::Promise<Result<WorkResult>> gaveUpOnSubstitution(bool inBuildSlot) noexcept;
kj::Promise<Result<WorkResult>> closureRepaired(bool inBuildSlot) noexcept;
kj::Promise<Result<WorkResult>> inputsRealised(bool inBuildSlot) noexcept;
kj::Promise<Result<WorkResult>> tryToBuild(bool inBuildSlot) noexcept;
virtual kj::Promise<Result<WorkResult>> tryLocalBuild(bool inBuildSlot) noexcept;
kj::Promise<Result<WorkResult>> buildDone(bool inBuildSlot) noexcept;
WorkResult resolvedFinished(bool inBuildSlot);
kj::Promise<Result<WorkResult>> resolvedFinished(bool inBuildSlot) noexcept;
/**
* Is the build hook willing to perform the build?
@ -346,7 +346,7 @@ struct DerivationGoal : public Goal
*/
virtual void killChild();
WorkResult repairClosure();
kj::Promise<Result<WorkResult>> repairClosure() noexcept;
void started();

View file

@ -22,25 +22,27 @@ DrvOutputSubstitutionGoal::DrvOutputSubstitutionGoal(
}
Goal::WorkResult DrvOutputSubstitutionGoal::init(bool inBuildSlot)
{
kj::Promise<Result<Goal::WorkResult>> DrvOutputSubstitutionGoal::init(bool inBuildSlot) noexcept
try {
trace("init");
/* If the derivation already exists, were done */
if (worker.store.queryRealisation(id)) {
return Finished{ecSuccess, std::move(buildResult)};
return {Finished{ecSuccess, std::move(buildResult)}};
}
subs = settings.useSubstitutes ? getDefaultSubstituters() : std::list<ref<Store>>();
return tryNext(inBuildSlot);
} catch (...) {
return {std::current_exception()};
}
Goal::WorkResult DrvOutputSubstitutionGoal::tryNext(bool inBuildSlot)
{
kj::Promise<Result<Goal::WorkResult>> DrvOutputSubstitutionGoal::tryNext(bool inBuildSlot) noexcept
try {
trace("trying next substituter");
if (!inBuildSlot) {
return WaitForSlot{};
return {WaitForSlot{}};
}
maintainRunningSubstitutions = worker.runningSubstitutions.addTemporarily(1);
@ -57,7 +59,7 @@ Goal::WorkResult DrvOutputSubstitutionGoal::tryNext(bool inBuildSlot)
/* Hack: don't indicate failure if there were no substituters.
In that case the calling derivation should just do a
build. */
return Finished{substituterFailed ? ecFailed : ecNoSubstituters, std::move(buildResult)};
return {Finished{substituterFailed ? ecFailed : ecNoSubstituters, std::move(buildResult)}};
}
sub = subs.front();
@ -77,11 +79,13 @@ Goal::WorkResult DrvOutputSubstitutionGoal::tryNext(bool inBuildSlot)
});
state = &DrvOutputSubstitutionGoal::realisationFetched;
return WaitForWorld{{downloadState->outPipe.readSide.get()}, true};
return {WaitForWorld{{downloadState->outPipe.readSide.get()}, true}};
} catch (...) {
return {std::current_exception()};
}
Goal::WorkResult DrvOutputSubstitutionGoal::realisationFetched(bool inBuildSlot)
{
kj::Promise<Result<Goal::WorkResult>> DrvOutputSubstitutionGoal::realisationFetched(bool inBuildSlot) noexcept
try {
worker.childTerminated(this);
maintainRunningSubstitutions.reset();
@ -122,31 +126,37 @@ Goal::WorkResult DrvOutputSubstitutionGoal::realisationFetched(bool inBuildSlot)
return outPathValid(inBuildSlot);
} else {
state = &DrvOutputSubstitutionGoal::outPathValid;
return result;
return {std::move(result)};
}
} catch (...) {
return {std::current_exception()};
}
Goal::WorkResult DrvOutputSubstitutionGoal::outPathValid(bool inBuildSlot)
{
kj::Promise<Result<Goal::WorkResult>> DrvOutputSubstitutionGoal::outPathValid(bool inBuildSlot) noexcept
try {
assert(outputInfo);
trace("output path substituted");
if (nrFailed > 0) {
debug("The output path of the derivation output '%s' could not be substituted", id.to_string());
return Finished{
return {Finished{
nrNoSubstituters > 0 || nrIncompleteClosure > 0 ? ecIncompleteClosure : ecFailed,
std::move(buildResult),
};
}};
}
worker.store.registerDrvOutput(*outputInfo);
return finished();
} catch (...) {
return {std::current_exception()};
}
Goal::WorkResult DrvOutputSubstitutionGoal::finished()
{
kj::Promise<Result<Goal::WorkResult>> DrvOutputSubstitutionGoal::finished() noexcept
try {
trace("finished");
return Finished{ecSuccess, std::move(buildResult)};
return {Finished{ecSuccess, std::move(buildResult)}};
} catch (...) {
return {std::current_exception()};
}
std::string DrvOutputSubstitutionGoal::key()
@ -156,7 +166,7 @@ std::string DrvOutputSubstitutionGoal::key()
return "a$" + std::string(id.to_string());
}
Goal::WorkResult DrvOutputSubstitutionGoal::work(bool inBuildSlot)
kj::Promise<Result<Goal::WorkResult>> DrvOutputSubstitutionGoal::work(bool inBuildSlot) noexcept
{
return (this->*state)(inBuildSlot);
}

View file

@ -65,20 +65,20 @@ public:
std::optional<ContentAddress> ca = std::nullopt
);
typedef WorkResult (DrvOutputSubstitutionGoal::*GoalState)(bool inBuildSlot);
typedef kj::Promise<Result<WorkResult>> (DrvOutputSubstitutionGoal::*GoalState)(bool inBuildSlot) noexcept;
GoalState state;
WorkResult init(bool inBuildSlot);
WorkResult tryNext(bool inBuildSlot);
WorkResult realisationFetched(bool inBuildSlot);
WorkResult outPathValid(bool inBuildSlot);
WorkResult finished();
kj::Promise<Result<WorkResult>> init(bool inBuildSlot) noexcept;
kj::Promise<Result<WorkResult>> tryNext(bool inBuildSlot) noexcept;
kj::Promise<Result<WorkResult>> realisationFetched(bool inBuildSlot) noexcept;
kj::Promise<Result<WorkResult>> outPathValid(bool inBuildSlot) noexcept;
kj::Promise<Result<WorkResult>> finished() noexcept;
Finished timedOut(Error && ex) override { abort(); };
std::string key() override;
WorkResult work(bool inBuildSlot) override;
kj::Promise<Result<WorkResult>> work(bool inBuildSlot) noexcept override;
JobCategory jobCategory() const override {
return JobCategory::Substitution;

View file

@ -6,11 +6,17 @@
namespace nix {
static auto runWorker(Worker & worker, auto mkGoals)
{
return worker.run(mkGoals);
}
void Store::buildPaths(const std::vector<DerivedPath> & reqs, BuildMode buildMode, std::shared_ptr<Store> evalStore)
{
Worker worker(*this, evalStore ? *evalStore : *this);
auto aio = kj::setupAsyncIo();
Worker worker(*this, evalStore ? *evalStore : *this, aio);
auto goals = worker.run([&](GoalFactory & gf) {
auto goals = runWorker(worker, [&](GoalFactory & gf) {
Goals goals;
for (auto & br : reqs)
goals.insert(gf.makeGoal(br, buildMode));
@ -48,10 +54,12 @@ std::vector<KeyedBuildResult> Store::buildPathsWithResults(
BuildMode buildMode,
std::shared_ptr<Store> evalStore)
{
Worker worker(*this, evalStore ? *evalStore : *this);
auto aio = kj::setupAsyncIo();
Worker worker(*this, evalStore ? *evalStore : *this, aio);
std::vector<std::pair<const DerivedPath &, GoalPtr>> state;
auto goals = worker.run([&](GoalFactory & gf) {
auto goals = runWorker(worker, [&](GoalFactory & gf) {
Goals goals;
for (const auto & req : reqs) {
auto goal = gf.makeGoal(req, buildMode);
@ -72,10 +80,11 @@ std::vector<KeyedBuildResult> Store::buildPathsWithResults(
BuildResult Store::buildDerivation(const StorePath & drvPath, const BasicDerivation & drv,
BuildMode buildMode)
{
Worker worker(*this, *this);
auto aio = kj::setupAsyncIo();
Worker worker(*this, *this, aio);
try {
auto goals = worker.run([&](GoalFactory & gf) -> Goals {
auto goals = runWorker(worker, [&](GoalFactory & gf) -> Goals {
return Goals{gf.makeBasicDerivationGoal(drvPath, drv, OutputsSpec::All{}, buildMode)};
});
auto goal = *goals.begin();
@ -97,10 +106,12 @@ void Store::ensurePath(const StorePath & path)
/* If the path is already valid, we're done. */
if (isValidPath(path)) return;
Worker worker(*this, *this);
auto aio = kj::setupAsyncIo();
Worker worker(*this, *this, aio);
auto goals =
worker.run([&](GoalFactory & gf) { return Goals{gf.makePathSubstitutionGoal(path)}; });
auto goals = runWorker(worker, [&](GoalFactory & gf) {
return Goals{gf.makePathSubstitutionGoal(path)};
});
auto goal = *goals.begin();
if (goal->exitCode != Goal::ecSuccess) {
@ -115,9 +126,10 @@ void Store::ensurePath(const StorePath & path)
void Store::repairPath(const StorePath & path)
{
Worker worker(*this, *this);
auto aio = kj::setupAsyncIo();
Worker worker(*this, *this, aio);
auto goals = worker.run([&](GoalFactory & gf) {
auto goals = runWorker(worker, [&](GoalFactory & gf) {
return Goals{gf.makePathSubstitutionGoal(path, Repair)};
});
auto goal = *goals.begin();

View file

@ -1,9 +1,11 @@
#pragma once
///@file
#include "result.hh"
#include "types.hh"
#include "store-api.hh"
#include "build-result.hh"
#include <kj/async.h>
namespace nix {
@ -161,7 +163,7 @@ public:
trace("goal destroyed");
}
virtual WorkResult work(bool inBuildSlot) = 0;
virtual kj::Promise<Result<WorkResult>> work(bool inBuildSlot) noexcept = 0;
virtual void waiteeDone(GoalPtr waitee) { }

View file

@ -149,8 +149,8 @@ void LocalDerivationGoal::killSandbox(bool getStats)
}
Goal::WorkResult LocalDerivationGoal::tryLocalBuild(bool inBuildSlot)
{
kj::Promise<Result<Goal::WorkResult>> LocalDerivationGoal::tryLocalBuild(bool inBuildSlot) noexcept
try {
#if __APPLE__
additionalSandboxProfile = parsedDrv->getStringAttr("__sandboxProfile").value_or("");
#endif
@ -159,7 +159,7 @@ Goal::WorkResult LocalDerivationGoal::tryLocalBuild(bool inBuildSlot)
state = &DerivationGoal::tryToBuild;
outputLocks.unlock();
if (0U != settings.maxBuildJobs) {
return WaitForSlot{};
return {WaitForSlot{}};
}
if (getMachines().empty()) {
throw Error(
@ -214,7 +214,7 @@ Goal::WorkResult LocalDerivationGoal::tryLocalBuild(bool inBuildSlot)
if (!actLock)
actLock = std::make_unique<Activity>(*logger, lvlWarn, actBuildWaiting,
fmt("waiting for a free build user ID for '%s'", Magenta(worker.store.printStorePath(drvPath))));
return WaitForAWhile{};
return {WaitForAWhile{}};
}
}
@ -250,15 +250,17 @@ Goal::WorkResult LocalDerivationGoal::tryLocalBuild(bool inBuildSlot)
state = &DerivationGoal::buildDone;
started();
return WaitForWorld{std::move(fds), true};
return {WaitForWorld{std::move(fds), true}};
} catch (BuildError & e) {
outputLocks.unlock();
buildUser.reset();
auto report = done(BuildResult::InputRejected, {}, std::move(e));
report.permanentFailure = true;
return report;
return {std::move(report)};
}
} catch (...) {
return {std::current_exception()};
}

View file

@ -213,7 +213,7 @@ struct LocalDerivationGoal : public DerivationGoal
/**
* The additional states.
*/
WorkResult tryLocalBuild(bool inBuildSlot) override;
kj::Promise<Result<WorkResult>> tryLocalBuild(bool inBuildSlot) noexcept override;
/**
* Start building a derivation.

View file

@ -45,21 +45,21 @@ Goal::Finished PathSubstitutionGoal::done(
}
Goal::WorkResult PathSubstitutionGoal::work(bool inBuildSlot)
kj::Promise<Result<Goal::WorkResult>> PathSubstitutionGoal::work(bool inBuildSlot) noexcept
{
return (this->*state)(inBuildSlot);
}
Goal::WorkResult PathSubstitutionGoal::init(bool inBuildSlot)
{
kj::Promise<Result<Goal::WorkResult>> PathSubstitutionGoal::init(bool inBuildSlot) noexcept
try {
trace("init");
worker.store.addTempRoot(storePath);
/* If the path already exists we're done. */
if (!repair && worker.store.isValidPath(storePath)) {
return done(ecSuccess, BuildResult::AlreadyValid);
return {done(ecSuccess, BuildResult::AlreadyValid)};
}
if (settings.readOnlyMode)
@ -68,11 +68,13 @@ Goal::WorkResult PathSubstitutionGoal::init(bool inBuildSlot)
subs = settings.useSubstitutes ? getDefaultSubstituters() : std::list<ref<Store>>();
return tryNext(inBuildSlot);
} catch (...) {
return {std::current_exception()};
}
Goal::WorkResult PathSubstitutionGoal::tryNext(bool inBuildSlot)
{
kj::Promise<Result<Goal::WorkResult>> PathSubstitutionGoal::tryNext(bool inBuildSlot) noexcept
try {
trace("trying next substituter");
cleanup();
@ -87,10 +89,10 @@ Goal::WorkResult PathSubstitutionGoal::tryNext(bool inBuildSlot)
/* Hack: don't indicate failure if there were no substituters.
In that case the calling derivation should just do a
build. */
return done(
return {done(
substituterFailed ? ecFailed : ecNoSubstituters,
BuildResult::NoSubstituters,
fmt("path '%s' is required, but there is no substituter that can build it", worker.store.printStorePath(storePath)));
fmt("path '%s' is required, but there is no substituter that can build it", worker.store.printStorePath(storePath)))};
}
sub = subs.front();
@ -167,20 +169,22 @@ Goal::WorkResult PathSubstitutionGoal::tryNext(bool inBuildSlot)
return referencesValid(inBuildSlot);
} else {
state = &PathSubstitutionGoal::referencesValid;
return result;
return {std::move(result)};
}
} catch (...) {
return {std::current_exception()};
}
Goal::WorkResult PathSubstitutionGoal::referencesValid(bool inBuildSlot)
{
kj::Promise<Result<Goal::WorkResult>> PathSubstitutionGoal::referencesValid(bool inBuildSlot) noexcept
try {
trace("all references realised");
if (nrFailed > 0) {
return done(
return {done(
nrNoSubstituters > 0 || nrIncompleteClosure > 0 ? ecIncompleteClosure : ecFailed,
BuildResult::DependencyFailed,
fmt("some references of path '%s' could not be realised", worker.store.printStorePath(storePath)));
fmt("some references of path '%s' could not be realised", worker.store.printStorePath(storePath)))};
}
for (auto & i : info->references)
@ -189,15 +193,17 @@ Goal::WorkResult PathSubstitutionGoal::referencesValid(bool inBuildSlot)
state = &PathSubstitutionGoal::tryToRun;
return tryToRun(inBuildSlot);
} catch (...) {
return {std::current_exception()};
}
Goal::WorkResult PathSubstitutionGoal::tryToRun(bool inBuildSlot)
{
kj::Promise<Result<Goal::WorkResult>> PathSubstitutionGoal::tryToRun(bool inBuildSlot) noexcept
try {
trace("trying to run");
if (!inBuildSlot) {
return WaitForSlot{};
return {WaitForSlot{}};
}
maintainRunningSubstitutions = worker.runningSubstitutions.addTemporarily(1);
@ -228,12 +234,14 @@ Goal::WorkResult PathSubstitutionGoal::tryToRun(bool inBuildSlot)
});
state = &PathSubstitutionGoal::finished;
return WaitForWorld{{outPipe.readSide.get()}, true};
return {WaitForWorld{{outPipe.readSide.get()}, true}};
} catch (...) {
return {std::current_exception()};
}
Goal::WorkResult PathSubstitutionGoal::finished(bool inBuildSlot)
{
kj::Promise<Result<Goal::WorkResult>> PathSubstitutionGoal::finished(bool inBuildSlot) noexcept
try {
trace("substitute finished");
worker.childTerminated(this);
@ -274,7 +282,9 @@ Goal::WorkResult PathSubstitutionGoal::finished(bool inBuildSlot)
worker.doneNarSize += maintainExpectedNar.delta();
maintainExpectedNar.reset();
return done(ecSuccess, BuildResult::Substituted);
return {done(ecSuccess, BuildResult::Substituted)};
} catch (...) {
return {std::current_exception()};
}

View file

@ -67,7 +67,7 @@ struct PathSubstitutionGoal : public Goal
NotifyingCounter<uint64_t>::Bump maintainExpectedSubstitutions,
maintainRunningSubstitutions, maintainExpectedNar, maintainExpectedDownload;
typedef WorkResult (PathSubstitutionGoal::*GoalState)(bool inBuildSlot);
typedef kj::Promise<Result<WorkResult>> (PathSubstitutionGoal::*GoalState)(bool inBuildSlot) noexcept;
GoalState state;
/**
@ -101,16 +101,16 @@ public:
return "a$" + std::string(storePath.name()) + "$" + worker.store.printStorePath(storePath);
}
WorkResult work(bool inBuildSlot) override;
kj::Promise<Result<WorkResult>> work(bool inBuildSlot) noexcept override;
/**
* The states.
*/
WorkResult init(bool inBuildSlot);
WorkResult tryNext(bool inBuildSlot);
WorkResult referencesValid(bool inBuildSlot);
WorkResult tryToRun(bool inBuildSlot);
WorkResult finished(bool inBuildSlot);
kj::Promise<Result<WorkResult>> init(bool inBuildSlot) noexcept;
kj::Promise<Result<WorkResult>> tryNext(bool inBuildSlot) noexcept;
kj::Promise<Result<WorkResult>> referencesValid(bool inBuildSlot) noexcept;
kj::Promise<Result<WorkResult>> tryToRun(bool inBuildSlot) noexcept;
kj::Promise<Result<WorkResult>> finished(bool inBuildSlot) noexcept;
/**
* Callback used by the worker to write to the log.

View file

@ -11,12 +11,13 @@
namespace nix {
Worker::Worker(Store & store, Store & evalStore)
Worker::Worker(Store & store, Store & evalStore, kj::AsyncIoContext & aio)
: act(*logger, actRealise)
, actDerivations(*logger, actBuilds)
, actSubstitutions(*logger, actCopyPaths)
, store(store)
, evalStore(evalStore)
, aio(aio)
{
/* Debugging: prevent recursive workers. */
nrLocalBuilds = 0;
@ -379,7 +380,7 @@ Goals Worker::run(std::function<Goals (GoalFactory &)> req)
const bool inSlot = goal->jobCategory() == JobCategory::Substitution
? nrSubstitutions < std::max(1U, (unsigned int) settings.maxSubstitutionJobs)
: nrLocalBuilds < settings.maxBuildJobs;
handleWorkResult(goal, goal->work(inSlot));
handleWorkResult(goal, goal->work(inSlot).wait(aio.waitScope).value());
updateStatistics();
if (topGoals.empty()) break; // stuff may have been cancelled

View file

@ -9,6 +9,7 @@
#include "realisation.hh"
#include <future>
#include <kj/async-io.h>
#include <thread>
namespace nix {
@ -237,6 +238,7 @@ public:
Store & store;
Store & evalStore;
kj::AsyncIoContext & aio;
struct HookState {
std::unique_ptr<HookInstance> instance;
@ -264,7 +266,7 @@ public:
NotifyingCounter<uint64_t> expectedNarSize{[this] { updateStatisticsLater(); }};
NotifyingCounter<uint64_t> doneNarSize{[this] { updateStatisticsLater(); }};
Worker(Store & store, Store & evalStore);
Worker(Store & store, Store & evalStore, kj::AsyncIoContext & aio);
~Worker();
/**

View file

@ -221,6 +221,7 @@ dependencies = [
aws_s3,
aws_sdk_transfer,
nlohmann_json,
kj,
]
if host_machine.system() == 'freebsd'