Expose an async interface for queryRealisation
Doesn’t change much so far because everything is still using it synchronously, but should allow the binary cache to fetch stuff in parallel
This commit is contained in:
parent
5b2aa61f1b
commit
96670ed216
12 changed files with 138 additions and 55 deletions
|
@ -437,39 +437,19 @@ StorePath BinaryCacheStore::addTextToStore(const string & name, const string & s
|
|||
})->path;
|
||||
}
|
||||
|
||||
std::optional<const Realisation> BinaryCacheStore::queryRealisation(const DrvOutput & id)
|
||||
void BinaryCacheStore::queryRealisationUncached(const DrvOutput & id,
|
||||
Callback<std::shared_ptr<const Realisation>> callback) noexcept
|
||||
{
|
||||
if (diskCache) {
|
||||
auto [cacheOutcome, maybeCachedRealisation] =
|
||||
diskCache->lookupRealisation(getUri(), id);
|
||||
switch (cacheOutcome) {
|
||||
case NarInfoDiskCache::oValid:
|
||||
debug("Returning a cached realisation for %s", id.to_string());
|
||||
return *maybeCachedRealisation;
|
||||
case NarInfoDiskCache::oInvalid:
|
||||
debug("Returning a cached missing realisation for %s", id.to_string());
|
||||
return {};
|
||||
case NarInfoDiskCache::oUnknown:
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
auto outputInfoFilePath = realisationsPrefix + "/" + id.to_string() + ".doi";
|
||||
auto rawOutputInfo = getFile(outputInfoFilePath);
|
||||
|
||||
if (rawOutputInfo) {
|
||||
auto realisation = Realisation::fromJSON(
|
||||
nlohmann::json::parse(*rawOutputInfo), outputInfoFilePath);
|
||||
|
||||
if (diskCache)
|
||||
diskCache->upsertRealisation(
|
||||
getUri(), realisation);
|
||||
|
||||
return {realisation};
|
||||
callback(std::make_shared<const Realisation>(realisation));
|
||||
return;
|
||||
} else {
|
||||
if (diskCache)
|
||||
diskCache->upsertAbsentRealisation(getUri(), id);
|
||||
return std::nullopt;
|
||||
callback(nullptr);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -108,7 +108,8 @@ public:
|
|||
|
||||
void registerDrvOutput(const Realisation & info) override;
|
||||
|
||||
std::optional<const Realisation> queryRealisation(const DrvOutput &) override;
|
||||
void queryRealisationUncached(const DrvOutput &,
|
||||
Callback<std::shared_ptr<const Realisation>> callback) noexcept override;
|
||||
|
||||
void narFromPath(const StorePath & path, Sink & sink) override;
|
||||
|
||||
|
|
|
@ -20,7 +20,7 @@ private:
|
|||
|
||||
// The realisation corresponding to the given output id.
|
||||
// Will be filled once we can get it.
|
||||
std::optional<Realisation> outputInfo;
|
||||
std::shared_ptr<const Realisation> outputInfo;
|
||||
|
||||
/* The remaining substituters. */
|
||||
std::list<ref<Store>> subs;
|
||||
|
|
|
@ -1224,13 +1224,14 @@ struct RestrictedStore : public virtual RestrictedStoreConfig, public virtual Lo
|
|||
// corresponds to an allowed derivation
|
||||
{ throw Error("registerDrvOutput"); }
|
||||
|
||||
std::optional<const Realisation> queryRealisation(const DrvOutput & id) override
|
||||
void queryRealisationUncached(const DrvOutput & id,
|
||||
Callback<std::shared_ptr<const Realisation>> callback) noexcept override
|
||||
// XXX: This should probably be allowed if the realisation corresponds to
|
||||
// an allowed derivation
|
||||
{
|
||||
if (!goal.isAllowed(id))
|
||||
throw InvalidPath("cannot query an unknown output id '%s' in recursive Nix", id.to_string());
|
||||
return next->queryRealisation(id);
|
||||
callback(nullptr);
|
||||
next->queryRealisation(id, std::move(callback));
|
||||
}
|
||||
|
||||
void buildPaths(const std::vector<DerivedPath> & paths, BuildMode buildMode, std::shared_ptr<Store> evalStore) override
|
||||
|
|
|
@ -50,8 +50,9 @@ struct DummyStore : public virtual DummyStoreConfig, public virtual Store
|
|||
void narFromPath(const StorePath & path, Sink & sink) override
|
||||
{ unsupported("narFromPath"); }
|
||||
|
||||
std::optional<const Realisation> queryRealisation(const DrvOutput&) override
|
||||
{ unsupported("queryRealisation"); }
|
||||
void queryRealisationUncached(const DrvOutput &,
|
||||
Callback<std::shared_ptr<const Realisation>> callback) noexcept override
|
||||
{ callback(nullptr); }
|
||||
};
|
||||
|
||||
static RegisterStoreImplementation<DummyStore, DummyStoreConfig> regDummyStore;
|
||||
|
|
|
@ -367,7 +367,8 @@ public:
|
|||
return conn->remoteVersion;
|
||||
}
|
||||
|
||||
std::optional<const Realisation> queryRealisation(const DrvOutput&) override
|
||||
void queryRealisationUncached(const DrvOutput &,
|
||||
Callback<std::shared_ptr<const Realisation>> callback) noexcept override
|
||||
// TODO: Implement
|
||||
{ unsupported("queryRealisation"); }
|
||||
};
|
||||
|
|
|
@ -1838,13 +1838,24 @@ std::optional<const Realisation> LocalStore::queryRealisation_(
|
|||
return { res };
|
||||
}
|
||||
|
||||
std::optional<const Realisation>
|
||||
LocalStore::queryRealisation(const DrvOutput & id)
|
||||
void LocalStore::queryRealisationUncached(const DrvOutput & id,
|
||||
Callback<std::shared_ptr<const Realisation>> callback) noexcept
|
||||
{
|
||||
return retrySQLite<std::optional<const Realisation>>([&]() {
|
||||
auto state(_state.lock());
|
||||
return queryRealisation_(*state, id);
|
||||
});
|
||||
try {
|
||||
auto maybeRealisation
|
||||
= retrySQLite<std::optional<const Realisation>>([&]() {
|
||||
auto state(_state.lock());
|
||||
return queryRealisation_(*state, id);
|
||||
});
|
||||
if (maybeRealisation)
|
||||
callback(
|
||||
std::make_shared<const Realisation>(maybeRealisation.value()));
|
||||
else
|
||||
callback(nullptr);
|
||||
|
||||
} catch (...) {
|
||||
callback.rethrow();
|
||||
}
|
||||
}
|
||||
|
||||
FixedOutputHash LocalStore::hashCAPath(
|
||||
|
|
|
@ -207,7 +207,8 @@ public:
|
|||
|
||||
std::optional<const Realisation> queryRealisation_(State & state, const DrvOutput & id);
|
||||
std::optional<std::pair<int64_t, Realisation>> queryRealisationCore_(State & state, const DrvOutput & id);
|
||||
std::optional<const Realisation> queryRealisation(const DrvOutput&) override;
|
||||
void queryRealisationUncached(const DrvOutput&,
|
||||
Callback<std::shared_ptr<const Realisation>> callback) noexcept override;
|
||||
|
||||
private:
|
||||
|
||||
|
|
|
@ -677,23 +677,33 @@ void RemoteStore::registerDrvOutput(const Realisation & info)
|
|||
conn.processStderr();
|
||||
}
|
||||
|
||||
std::optional<const Realisation> RemoteStore::queryRealisation(const DrvOutput & id)
|
||||
void RemoteStore::queryRealisationUncached(const DrvOutput & id,
|
||||
Callback<std::shared_ptr<const Realisation>> callback) noexcept
|
||||
{
|
||||
auto conn(getConnection());
|
||||
conn->to << wopQueryRealisation;
|
||||
conn->to << id.to_string();
|
||||
conn.processStderr();
|
||||
if (GET_PROTOCOL_MINOR(conn->daemonVersion) < 31) {
|
||||
auto outPaths = worker_proto::read(*this, conn->from, Phantom<std::set<StorePath>>{});
|
||||
if (outPaths.empty())
|
||||
return std::nullopt;
|
||||
return {Realisation{.id = id, .outPath = *outPaths.begin()}};
|
||||
} else {
|
||||
auto realisations = worker_proto::read(*this, conn->from, Phantom<std::set<Realisation>>{});
|
||||
if (realisations.empty())
|
||||
return std::nullopt;
|
||||
return *realisations.begin();
|
||||
}
|
||||
|
||||
auto real = [&]() -> std::shared_ptr<const Realisation> {
|
||||
if (GET_PROTOCOL_MINOR(conn->daemonVersion) < 31) {
|
||||
auto outPaths = worker_proto::read(
|
||||
*this, conn->from, Phantom<std::set<StorePath>> {});
|
||||
if (outPaths.empty())
|
||||
return nullptr;
|
||||
return std::make_shared<const Realisation>(Realisation { .id = id, .outPath = *outPaths.begin() });
|
||||
} else {
|
||||
auto realisations = worker_proto::read(
|
||||
*this, conn->from, Phantom<std::set<Realisation>> {});
|
||||
if (realisations.empty())
|
||||
return nullptr;
|
||||
return std::make_shared<const Realisation>(*realisations.begin());
|
||||
}
|
||||
}();
|
||||
|
||||
try {
|
||||
callback(std::shared_ptr<const Realisation>(real));
|
||||
} catch (...) { return callback.rethrow(); }
|
||||
}
|
||||
|
||||
static void writeDerivedPaths(RemoteStore & store, ConnectionHandle & conn, const std::vector<DerivedPath> & reqs)
|
||||
|
|
|
@ -88,7 +88,8 @@ public:
|
|||
|
||||
void registerDrvOutput(const Realisation & info) override;
|
||||
|
||||
std::optional<const Realisation> queryRealisation(const DrvOutput &) override;
|
||||
void queryRealisationUncached(const DrvOutput &,
|
||||
Callback<std::shared_ptr<const Realisation>> callback) noexcept override;
|
||||
|
||||
void buildPaths(const std::vector<DerivedPath> & paths, BuildMode buildMode, std::shared_ptr<Store> evalStore) override;
|
||||
|
||||
|
|
|
@ -542,6 +542,74 @@ void Store::queryPathInfo(const StorePath & storePath,
|
|||
}});
|
||||
}
|
||||
|
||||
void Store::queryRealisation(const DrvOutput & id,
|
||||
Callback<std::shared_ptr<const Realisation>> callback) noexcept
|
||||
{
|
||||
|
||||
try {
|
||||
if (diskCache) {
|
||||
auto [cacheOutcome, maybeCachedRealisation]
|
||||
= diskCache->lookupRealisation(getUri(), id);
|
||||
switch (cacheOutcome) {
|
||||
case NarInfoDiskCache::oValid:
|
||||
debug("Returning a cached realisation for %s", id.to_string());
|
||||
callback(maybeCachedRealisation);
|
||||
return;
|
||||
case NarInfoDiskCache::oInvalid:
|
||||
debug(
|
||||
"Returning a cached missing realisation for %s",
|
||||
id.to_string());
|
||||
callback(nullptr);
|
||||
return;
|
||||
case NarInfoDiskCache::oUnknown:
|
||||
break;
|
||||
}
|
||||
}
|
||||
} catch (...) {
|
||||
return callback.rethrow();
|
||||
}
|
||||
|
||||
auto callbackPtr
|
||||
= std::make_shared<decltype(callback)>(std::move(callback));
|
||||
|
||||
queryRealisationUncached(
|
||||
id,
|
||||
{ [this, id, callbackPtr](
|
||||
std::future<std::shared_ptr<const Realisation>> fut) {
|
||||
try {
|
||||
auto info = fut.get();
|
||||
|
||||
if (diskCache) {
|
||||
if (info)
|
||||
diskCache->upsertRealisation(getUri(), *info);
|
||||
else
|
||||
diskCache->upsertAbsentRealisation(getUri(), id);
|
||||
}
|
||||
|
||||
(*callbackPtr)(std::shared_ptr<const Realisation>(info));
|
||||
|
||||
} catch (...) {
|
||||
callbackPtr->rethrow();
|
||||
}
|
||||
} });
|
||||
}
|
||||
|
||||
std::shared_ptr<const Realisation> Store::queryRealisation(const DrvOutput & id)
|
||||
{
|
||||
using RealPtr = std::shared_ptr<const Realisation>;
|
||||
std::promise<RealPtr> promise;
|
||||
|
||||
queryRealisation(id,
|
||||
{[&](std::future<RealPtr> result) {
|
||||
try {
|
||||
promise.set_value(result.get());
|
||||
} catch (...) {
|
||||
promise.set_exception(std::current_exception());
|
||||
}
|
||||
}});
|
||||
|
||||
return promise.get_future().get();
|
||||
}
|
||||
|
||||
void Store::substitutePaths(const StorePathSet & paths)
|
||||
{
|
||||
|
|
|
@ -369,6 +369,14 @@ public:
|
|||
void queryPathInfo(const StorePath & path,
|
||||
Callback<ref<const ValidPathInfo>> callback) noexcept;
|
||||
|
||||
/* Query the information about a realisation. */
|
||||
std::shared_ptr<const Realisation> queryRealisation(const DrvOutput &);
|
||||
|
||||
/* Asynchronous version of queryRealisation(). */
|
||||
void queryRealisation(const DrvOutput &,
|
||||
Callback<std::shared_ptr<const Realisation>> callback) noexcept;
|
||||
|
||||
|
||||
/* Check whether the given valid path info is sufficiently attested, by
|
||||
either being signed by a trusted public key or content-addressed, in
|
||||
order to be included in the given store.
|
||||
|
@ -393,11 +401,11 @@ protected:
|
|||
|
||||
virtual void queryPathInfoUncached(const StorePath & path,
|
||||
Callback<std::shared_ptr<const ValidPathInfo>> callback) noexcept = 0;
|
||||
virtual void queryRealisationUncached(const DrvOutput &,
|
||||
Callback<std::shared_ptr<const Realisation>> callback) noexcept = 0;
|
||||
|
||||
public:
|
||||
|
||||
virtual std::optional<const Realisation> queryRealisation(const DrvOutput &) = 0;
|
||||
|
||||
/* Queries the set of incoming FS references for a store path.
|
||||
The result is not cleared. */
|
||||
virtual void queryReferrers(const StorePath & path, StorePathSet & referrers)
|
||||
|
|
Loading…
Reference in a new issue