libstore: remove WriteConn::sink fields
we no longer need these since we're no longer using sinks to serialize things. Change-Id: Iffb1a3eab33c83f611c88fa4e8beaa8d5ffa079b
This commit is contained in:
parent
a5d1f69841
commit
6b4d46e9e0
15 changed files with 34 additions and 41 deletions
|
@ -1204,11 +1204,9 @@ HookReply DerivationGoal::tryBuildHook()
|
||||||
throw;
|
throw;
|
||||||
}
|
}
|
||||||
|
|
||||||
CommonProto::WriteConn conn { hook->sink };
|
|
||||||
|
|
||||||
/* Tell the hook all the inputs that have to be copied to the
|
/* Tell the hook all the inputs that have to be copied to the
|
||||||
remote system. */
|
remote system. */
|
||||||
conn.to << CommonProto::write(worker.store, conn, inputPaths);
|
hook->sink << CommonProto::write(worker.store, {}, inputPaths);
|
||||||
|
|
||||||
/* Tell the hooks the missing outputs that have to be copied back
|
/* Tell the hooks the missing outputs that have to be copied back
|
||||||
from the remote system. */
|
from the remote system. */
|
||||||
|
@ -1219,7 +1217,7 @@ HookReply DerivationGoal::tryBuildHook()
|
||||||
if (buildMode != bmCheck && status.known && status.known->isValid()) continue;
|
if (buildMode != bmCheck && status.known && status.known->isValid()) continue;
|
||||||
missingOutputs.insert(outputName);
|
missingOutputs.insert(outputName);
|
||||||
}
|
}
|
||||||
conn.to << CommonProto::write(worker.store, conn, missingOutputs);
|
hook->sink << CommonProto::write(worker.store, {}, missingOutputs);
|
||||||
}
|
}
|
||||||
|
|
||||||
hook->sink = FdSink();
|
hook->sink = FdSink();
|
||||||
|
|
|
@ -37,7 +37,6 @@ struct CommonProto
|
||||||
* canonical serializers below.
|
* canonical serializers below.
|
||||||
*/
|
*/
|
||||||
struct WriteConn {
|
struct WriteConn {
|
||||||
Sink & to;
|
|
||||||
};
|
};
|
||||||
|
|
||||||
template<typename T>
|
template<typename T>
|
||||||
|
|
|
@ -264,7 +264,7 @@ static void performOp(TunnelLogger * logger, ref<Store> store,
|
||||||
Source & from, BufferedSink & to, WorkerProto::Op op)
|
Source & from, BufferedSink & to, WorkerProto::Op op)
|
||||||
{
|
{
|
||||||
WorkerProto::ReadConn rconn{from, clientVersion};
|
WorkerProto::ReadConn rconn{from, clientVersion};
|
||||||
WorkerProto::WriteConn wconn{to, clientVersion};
|
WorkerProto::WriteConn wconn{clientVersion};
|
||||||
|
|
||||||
switch (op) {
|
switch (op) {
|
||||||
|
|
||||||
|
@ -291,7 +291,7 @@ static void performOp(TunnelLogger * logger, ref<Store> store,
|
||||||
}
|
}
|
||||||
auto res = store->queryValidPaths(paths, substitute);
|
auto res = store->queryValidPaths(paths, substitute);
|
||||||
logger->stopWork();
|
logger->stopWork();
|
||||||
wconn.to << WorkerProto::write(*store, wconn, res);
|
to << WorkerProto::write(*store, wconn, res);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -300,7 +300,7 @@ static void performOp(TunnelLogger * logger, ref<Store> store,
|
||||||
logger->startWork();
|
logger->startWork();
|
||||||
auto res = store->querySubstitutablePaths(paths);
|
auto res = store->querySubstitutablePaths(paths);
|
||||||
logger->stopWork();
|
logger->stopWork();
|
||||||
wconn.to << WorkerProto::write(*store, wconn, res);
|
to << WorkerProto::write(*store, wconn, res);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -365,7 +365,7 @@ static void performOp(TunnelLogger * logger, ref<Store> store,
|
||||||
#pragma GCC diagnostic pop
|
#pragma GCC diagnostic pop
|
||||||
|
|
||||||
logger->stopWork();
|
logger->stopWork();
|
||||||
wconn.to << WorkerProto::write(*store, wconn, paths);
|
to << WorkerProto::write(*store, wconn, paths);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -385,7 +385,7 @@ static void performOp(TunnelLogger * logger, ref<Store> store,
|
||||||
logger->startWork();
|
logger->startWork();
|
||||||
auto outputs = store->queryPartialDerivationOutputMap(path);
|
auto outputs = store->queryPartialDerivationOutputMap(path);
|
||||||
logger->stopWork();
|
logger->stopWork();
|
||||||
wconn.to << WorkerProto::write(*store, wconn, outputs);
|
to << WorkerProto::write(*store, wconn, outputs);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -432,7 +432,7 @@ static void performOp(TunnelLogger * logger, ref<Store> store,
|
||||||
}();
|
}();
|
||||||
logger->stopWork();
|
logger->stopWork();
|
||||||
|
|
||||||
wconn.to << WorkerProto::Serialise<ValidPathInfo>::write(*store, wconn, *pathInfo);
|
to << WorkerProto::Serialise<ValidPathInfo>::write(*store, wconn, *pathInfo);
|
||||||
} else {
|
} else {
|
||||||
HashType hashAlgo;
|
HashType hashAlgo;
|
||||||
std::string baseName;
|
std::string baseName;
|
||||||
|
@ -565,7 +565,7 @@ static void performOp(TunnelLogger * logger, ref<Store> store,
|
||||||
auto results = store->buildPathsWithResults(drvs, mode);
|
auto results = store->buildPathsWithResults(drvs, mode);
|
||||||
logger->stopWork();
|
logger->stopWork();
|
||||||
|
|
||||||
wconn.to << WorkerProto::write(*store, wconn, results);
|
to << WorkerProto::write(*store, wconn, results);
|
||||||
|
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
@ -643,7 +643,7 @@ static void performOp(TunnelLogger * logger, ref<Store> store,
|
||||||
|
|
||||||
auto res = store->buildDerivation(drvPath, drv, buildMode);
|
auto res = store->buildDerivation(drvPath, drv, buildMode);
|
||||||
logger->stopWork();
|
logger->stopWork();
|
||||||
wconn.to << WorkerProto::write(*store, wconn, res);
|
to << WorkerProto::write(*store, wconn, res);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -777,7 +777,7 @@ static void performOp(TunnelLogger * logger, ref<Store> store,
|
||||||
else {
|
else {
|
||||||
to << 1
|
to << 1
|
||||||
<< (i->second.deriver ? store->printStorePath(*i->second.deriver) : "");
|
<< (i->second.deriver ? store->printStorePath(*i->second.deriver) : "");
|
||||||
wconn.to << WorkerProto::write(*store, wconn, i->second.references);
|
to << WorkerProto::write(*store, wconn, i->second.references);
|
||||||
to << i->second.downloadSize
|
to << i->second.downloadSize
|
||||||
<< i->second.narSize;
|
<< i->second.narSize;
|
||||||
}
|
}
|
||||||
|
@ -800,7 +800,7 @@ static void performOp(TunnelLogger * logger, ref<Store> store,
|
||||||
for (auto & i : infos) {
|
for (auto & i : infos) {
|
||||||
to << store->printStorePath(i.first)
|
to << store->printStorePath(i.first)
|
||||||
<< (i.second.deriver ? store->printStorePath(*i.second.deriver) : "");
|
<< (i.second.deriver ? store->printStorePath(*i.second.deriver) : "");
|
||||||
wconn.to << WorkerProto::write(*store, wconn, i.second.references);
|
to << WorkerProto::write(*store, wconn, i.second.references);
|
||||||
to << i.second.downloadSize << i.second.narSize;
|
to << i.second.downloadSize << i.second.narSize;
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
|
@ -810,7 +810,7 @@ static void performOp(TunnelLogger * logger, ref<Store> store,
|
||||||
logger->startWork();
|
logger->startWork();
|
||||||
auto paths = store->queryAllValidPaths();
|
auto paths = store->queryAllValidPaths();
|
||||||
logger->stopWork();
|
logger->stopWork();
|
||||||
wconn.to << WorkerProto::write(*store, wconn, paths);
|
to << WorkerProto::write(*store, wconn, paths);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -827,7 +827,7 @@ static void performOp(TunnelLogger * logger, ref<Store> store,
|
||||||
logger->stopWork();
|
logger->stopWork();
|
||||||
if (info) {
|
if (info) {
|
||||||
to << 1;
|
to << 1;
|
||||||
wconn.to << WorkerProto::write(*store, wconn, static_cast<const UnkeyedValidPathInfo &>(*info));
|
to << WorkerProto::write(*store, wconn, static_cast<const UnkeyedValidPathInfo &>(*info));
|
||||||
} else {
|
} else {
|
||||||
to << 0;
|
to << 0;
|
||||||
}
|
}
|
||||||
|
@ -922,9 +922,9 @@ static void performOp(TunnelLogger * logger, ref<Store> store,
|
||||||
uint64_t downloadSize, narSize;
|
uint64_t downloadSize, narSize;
|
||||||
store->queryMissing(targets, willBuild, willSubstitute, unknown, downloadSize, narSize);
|
store->queryMissing(targets, willBuild, willSubstitute, unknown, downloadSize, narSize);
|
||||||
logger->stopWork();
|
logger->stopWork();
|
||||||
wconn.to << WorkerProto::write(*store, wconn, willBuild);
|
to << WorkerProto::write(*store, wconn, willBuild);
|
||||||
wconn.to << WorkerProto::write(*store, wconn, willSubstitute);
|
to << WorkerProto::write(*store, wconn, willSubstitute);
|
||||||
wconn.to << WorkerProto::write(*store, wconn, unknown);
|
to << WorkerProto::write(*store, wconn, unknown);
|
||||||
to << downloadSize << narSize;
|
to << downloadSize << narSize;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
@ -952,11 +952,11 @@ static void performOp(TunnelLogger * logger, ref<Store> store,
|
||||||
if (GET_PROTOCOL_MINOR(clientVersion) < 31) {
|
if (GET_PROTOCOL_MINOR(clientVersion) < 31) {
|
||||||
std::set<StorePath> outPaths;
|
std::set<StorePath> outPaths;
|
||||||
if (info) outPaths.insert(info->outPath);
|
if (info) outPaths.insert(info->outPath);
|
||||||
wconn.to << WorkerProto::write(*store, wconn, outPaths);
|
to << WorkerProto::write(*store, wconn, outPaths);
|
||||||
} else {
|
} else {
|
||||||
std::set<Realisation> realisations;
|
std::set<Realisation> realisations;
|
||||||
if (info) realisations.insert(*info);
|
if (info) realisations.insert(*info);
|
||||||
wconn.to << WorkerProto::write(*store, wconn, realisations);
|
to << WorkerProto::write(*store, wconn, realisations);
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
@ -1036,8 +1036,8 @@ void processConnection(
|
||||||
auto temp = trusted
|
auto temp = trusted
|
||||||
? store->isTrustedClient()
|
? store->isTrustedClient()
|
||||||
: std::optional { NotTrusted };
|
: std::optional { NotTrusted };
|
||||||
WorkerProto::WriteConn wconn {to, clientVersion};
|
WorkerProto::WriteConn wconn {clientVersion};
|
||||||
wconn.to << WorkerProto::write(*store, wconn, temp);
|
to << WorkerProto::write(*store, wconn, temp);
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Send startup error messages to the client. */
|
/* Send startup error messages to the client. */
|
||||||
|
|
|
@ -995,7 +995,7 @@ void writeDerivation(Sink & out, const Store & store, const BasicDerivation & dr
|
||||||
}, i.second.raw);
|
}, i.second.raw);
|
||||||
}
|
}
|
||||||
out << CommonProto::write(store,
|
out << CommonProto::write(store,
|
||||||
CommonProto::WriteConn { .to = out },
|
CommonProto::WriteConn {},
|
||||||
drv.inputSrcs);
|
drv.inputSrcs);
|
||||||
out << drv.platform << drv.builder << drv.args;
|
out << drv.platform << drv.builder << drv.args;
|
||||||
out << drv.env.size();
|
out << drv.env.size();
|
||||||
|
|
|
@ -47,7 +47,7 @@ void Store::exportPath(const StorePath & path, Sink & sink)
|
||||||
<< exportMagic
|
<< exportMagic
|
||||||
<< printStorePath(path);
|
<< printStorePath(path);
|
||||||
teeSink << CommonProto::write(*this,
|
teeSink << CommonProto::write(*this,
|
||||||
CommonProto::WriteConn { .to = teeSink },
|
CommonProto::WriteConn {},
|
||||||
info->references);
|
info->references);
|
||||||
teeSink
|
teeSink
|
||||||
<< (info->deriver ? printStorePath(*info->deriver) : "")
|
<< (info->deriver ? printStorePath(*info->deriver) : "")
|
||||||
|
|
|
@ -74,7 +74,6 @@ struct LegacySSHStore : public virtual LegacySSHStoreConfig, public virtual Stor
|
||||||
operator ServeProto::WriteConn ()
|
operator ServeProto::WriteConn ()
|
||||||
{
|
{
|
||||||
return ServeProto::WriteConn {
|
return ServeProto::WriteConn {
|
||||||
.to = to,
|
|
||||||
.version = remoteVersion,
|
.version = remoteVersion,
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
|
@ -81,7 +81,7 @@ struct RemoteStore::Connection
|
||||||
*/
|
*/
|
||||||
operator WorkerProto::WriteConn ()
|
operator WorkerProto::WriteConn ()
|
||||||
{
|
{
|
||||||
return WorkerProto::WriteConn {to, daemonVersion};
|
return WorkerProto::WriteConn {daemonVersion};
|
||||||
}
|
}
|
||||||
|
|
||||||
virtual ~Connection();
|
virtual ~Connection();
|
||||||
|
|
|
@ -489,7 +489,7 @@ void RemoteStore::addMultipleToStore(
|
||||||
sink << pathsToCopy.size();
|
sink << pathsToCopy.size();
|
||||||
for (auto & [pathInfo, pathSource] : pathsToCopy) {
|
for (auto & [pathInfo, pathSource] : pathsToCopy) {
|
||||||
sink << WorkerProto::Serialise<ValidPathInfo>::write(*this,
|
sink << WorkerProto::Serialise<ValidPathInfo>::write(*this,
|
||||||
WorkerProto::WriteConn {sink, remoteVersion},
|
WorkerProto::WriteConn {remoteVersion},
|
||||||
pathInfo);
|
pathInfo);
|
||||||
pathSource->drainInto(sink);
|
pathSource->drainInto(sink);
|
||||||
}
|
}
|
||||||
|
|
|
@ -50,7 +50,7 @@ struct ServeProto::Serialise
|
||||||
static WireFormatGenerator write(const Store & store, ServeProto::WriteConn conn, const T & t)
|
static WireFormatGenerator write(const Store & store, ServeProto::WriteConn conn, const T & t)
|
||||||
{
|
{
|
||||||
return CommonProto::Serialise<T>::write(store,
|
return CommonProto::Serialise<T>::write(store,
|
||||||
CommonProto::WriteConn { .to = conn.to },
|
CommonProto::WriteConn {},
|
||||||
t);
|
t);
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
|
@ -60,7 +60,6 @@ struct ServeProto
|
||||||
* canonical serializers below.
|
* canonical serializers below.
|
||||||
*/
|
*/
|
||||||
struct WriteConn {
|
struct WriteConn {
|
||||||
Sink & to;
|
|
||||||
Version version;
|
Version version;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
|
@ -50,7 +50,7 @@ struct WorkerProto::Serialise
|
||||||
static WireFormatGenerator write(const Store & store, WorkerProto::WriteConn conn, const T & t)
|
static WireFormatGenerator write(const Store & store, WorkerProto::WriteConn conn, const T & t)
|
||||||
{
|
{
|
||||||
return CommonProto::Serialise<T>::write(store,
|
return CommonProto::Serialise<T>::write(store,
|
||||||
CommonProto::WriteConn { .to = conn.to },
|
CommonProto::WriteConn {},
|
||||||
t);
|
t);
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
|
@ -87,10 +87,9 @@ struct WorkerProto
|
||||||
* canonical serializers below.
|
* canonical serializers below.
|
||||||
*/
|
*/
|
||||||
struct WriteConn {
|
struct WriteConn {
|
||||||
Sink & to;
|
|
||||||
Version version;
|
Version version;
|
||||||
|
|
||||||
WriteConn(Sink & to, Version version) : to(to), version(version) {
|
explicit WriteConn(Version version) : version(version) {
|
||||||
assert(version >= MIN_SUPPORTED_WORKER_PROTO_VERSION);
|
assert(version >= MIN_SUPPORTED_WORKER_PROTO_VERSION);
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
|
@ -824,7 +824,6 @@ static void opServe(Strings opFlags, Strings opArgs)
|
||||||
.version = clientVersion,
|
.version = clientVersion,
|
||||||
};
|
};
|
||||||
ServeProto::WriteConn wconn {
|
ServeProto::WriteConn wconn {
|
||||||
.to = out,
|
|
||||||
.version = clientVersion,
|
.version = clientVersion,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -881,7 +880,7 @@ static void opServe(Strings opFlags, Strings opArgs)
|
||||||
}
|
}
|
||||||
|
|
||||||
auto valid = store->queryValidPaths(paths);
|
auto valid = store->queryValidPaths(paths);
|
||||||
wconn.to << ServeProto::write(*store, wconn, valid);
|
out << ServeProto::write(*store, wconn, valid);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -892,7 +891,7 @@ static void opServe(Strings opFlags, Strings opArgs)
|
||||||
try {
|
try {
|
||||||
auto info = store->queryPathInfo(i);
|
auto info = store->queryPathInfo(i);
|
||||||
out << store->printStorePath(info->path);
|
out << store->printStorePath(info->path);
|
||||||
wconn.to << ServeProto::write(*store, wconn, static_cast<const UnkeyedValidPathInfo &>(*info));
|
out << ServeProto::write(*store, wconn, static_cast<const UnkeyedValidPathInfo &>(*info));
|
||||||
} catch (InvalidPath &) {
|
} catch (InvalidPath &) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -951,7 +950,7 @@ static void opServe(Strings opFlags, Strings opArgs)
|
||||||
MonitorFdHup monitor(in.fd);
|
MonitorFdHup monitor(in.fd);
|
||||||
auto status = store->buildDerivation(drvPath, drv);
|
auto status = store->buildDerivation(drvPath, drv);
|
||||||
|
|
||||||
wconn.to << ServeProto::write(*store, wconn, status);
|
out << ServeProto::write(*store, wconn, status);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -960,7 +959,7 @@ static void opServe(Strings opFlags, Strings opArgs)
|
||||||
StorePathSet closure;
|
StorePathSet closure;
|
||||||
store->computeFSClosure(ServeProto::Serialise<StorePathSet>::read(*store, rconn),
|
store->computeFSClosure(ServeProto::Serialise<StorePathSet>::read(*store, rconn),
|
||||||
closure, false, includeOutputs);
|
closure, false, includeOutputs);
|
||||||
wconn.to << ServeProto::write(*store, wconn, closure);
|
out << ServeProto::write(*store, wconn, closure);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -52,7 +52,7 @@ public:
|
||||||
StringSink to;
|
StringSink to;
|
||||||
to << CommonProto::write(
|
to << CommonProto::write(
|
||||||
*store,
|
*store,
|
||||||
CommonProto::WriteConn { .to = to },
|
CommonProto::WriteConn {},
|
||||||
value);
|
value);
|
||||||
|
|
||||||
if (testAccept())
|
if (testAccept())
|
||||||
|
|
|
@ -58,7 +58,7 @@ public:
|
||||||
StringSink to;
|
StringSink to;
|
||||||
to << Proto::write(
|
to << Proto::write(
|
||||||
*LibStoreTest::store,
|
*LibStoreTest::store,
|
||||||
typename Proto::WriteConn {to, version},
|
typename Proto::WriteConn {version},
|
||||||
value);
|
value);
|
||||||
|
|
||||||
if (testAccept())
|
if (testAccept())
|
||||||
|
|
Loading…
Reference in a new issue