gc: refactor the gc server thread out into a class without changing it
This removes a *whole load* of variables from scope and enforces thread boundaries with the type system. There is not much change of significance in here, so the things to watch out for while reviewing it are primarily that the destructor ordering may have changed inadvertently, I think. Change-Id: I3cd87e6d5a08dfcf368637407251db22a8906316
This commit is contained in:
parent
22252825c4
commit
77ff799cc8
4 changed files with 197 additions and 132 deletions
|
@ -22,8 +22,8 @@
|
||||||
namespace nix {
|
namespace nix {
|
||||||
|
|
||||||
|
|
||||||
static std::string gcSocketPath = "/gc-socket/socket";
|
constexpr static const std::string_view gcSocketPath = "/gc-socket/socket";
|
||||||
static std::string gcRootsDir = "gcroots";
|
constexpr static const std::string_view gcRootsDir = "gcroots";
|
||||||
|
|
||||||
|
|
||||||
static void makeSymlink(const Path & link, const Path & target)
|
static void makeSymlink(const Path & link, const Path & target)
|
||||||
|
@ -359,16 +359,34 @@ void LocalStore::findRuntimeRoots(Roots & roots, bool censor)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
struct GCLimitReached { };
|
struct GCLimitReached : std::exception { };
|
||||||
|
|
||||||
|
|
||||||
void LocalStore::collectGarbage(const GCOptions & options, GCResults & results)
|
/**
|
||||||
{
|
* Delegate class to expose just the operations required to perform GC on a store.
|
||||||
bool shouldDelete = options.action == GCOptions::gcDeleteDead || options.action == GCOptions::gcDeleteSpecific;
|
*/
|
||||||
bool gcKeepOutputs = settings.gcKeepOutputs;
|
class GCStoreDelegate {
|
||||||
bool gcKeepDerivations = settings.gcKeepDerivations;
|
LocalStore const & store;
|
||||||
|
|
||||||
StorePathSet roots, dead, alive;
|
public:
|
||||||
|
GCStoreDelegate(LocalStore const & store) : store(store) {}
|
||||||
|
|
||||||
|
std::optional<StorePath> maybeParseStorePath(std::string_view path) const {
|
||||||
|
return store.maybeParseStorePath(path);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Class holding a server to receive new GC roots.
|
||||||
|
*/
|
||||||
|
class GCOperation {
|
||||||
|
const GCStoreDelegate store;
|
||||||
|
|
||||||
|
std::thread serverThread;
|
||||||
|
Pipe shutdownPipe;
|
||||||
|
|
||||||
|
AutoCloseFD fdServer;
|
||||||
|
|
||||||
struct Shared
|
struct Shared
|
||||||
{
|
{
|
||||||
|
@ -381,45 +399,60 @@ void LocalStore::collectGarbage(const GCOptions & options, GCResults & results)
|
||||||
std::optional<std::string> pending;
|
std::optional<std::string> pending;
|
||||||
};
|
};
|
||||||
|
|
||||||
Sync<Shared> _shared;
|
void runServerThread();
|
||||||
|
|
||||||
std::condition_variable wakeup;
|
std::condition_variable wakeup;
|
||||||
|
Sync<Shared> _shared;
|
||||||
|
|
||||||
/* Using `--ignore-liveness' with `--delete' can have unintended
|
public:
|
||||||
consequences if `keep-outputs' or `keep-derivations' are true
|
GCOperation(LocalStore const & store, Path stateDir) : store(store)
|
||||||
(the garbage collector will recurse into deleting the outputs
|
{
|
||||||
or derivers, respectively). So disable them. */
|
|
||||||
if (options.action == GCOptions::gcDeleteSpecific && options.ignoreLiveness) {
|
|
||||||
gcKeepOutputs = false;
|
|
||||||
gcKeepDerivations = false;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (shouldDelete)
|
|
||||||
deletePath(reservedPath);
|
|
||||||
|
|
||||||
/* Acquire the global GC root. Note: we don't use fdGCLock
|
|
||||||
here because then in auto-gc mode, another thread could
|
|
||||||
downgrade our exclusive lock. */
|
|
||||||
auto fdGCLock = openGCLock();
|
|
||||||
FdLock gcLock(fdGCLock.get(), ltWrite, true, "waiting for the big garbage collector lock...");
|
|
||||||
|
|
||||||
/* Synchronisation point to test ENOENT handling in
|
|
||||||
addTempRoot(), see tests/gc-non-blocking.sh. */
|
|
||||||
if (auto p = getEnv("_NIX_TEST_GC_SYNC_1"))
|
|
||||||
readFile(*p);
|
|
||||||
|
|
||||||
/* Start the server for receiving new roots. */
|
/* Start the server for receiving new roots. */
|
||||||
auto socketPath = stateDir.get() + gcSocketPath;
|
|
||||||
createDirs(dirOf(socketPath));
|
|
||||||
auto fdServer = createUnixDomainSocket(socketPath, 0666);
|
|
||||||
|
|
||||||
if (fcntl(fdServer.get(), F_SETFL, fcntl(fdServer.get(), F_GETFL) | O_NONBLOCK) == -1)
|
|
||||||
throw SysError("making socket '%1%' non-blocking", socketPath);
|
|
||||||
|
|
||||||
Pipe shutdownPipe;
|
|
||||||
shutdownPipe.create();
|
shutdownPipe.create();
|
||||||
|
|
||||||
std::thread serverThread([&]() {
|
auto socketPath = stateDir + gcSocketPath;
|
||||||
|
createDirs(dirOf(socketPath));
|
||||||
|
fdServer = createUnixDomainSocket(socketPath, 0666);
|
||||||
|
|
||||||
|
if (fcntl(fdServer.get(), F_SETFL, fcntl(fdServer.get(), F_GETFL) | O_NONBLOCK) == -1) {
|
||||||
|
throw SysError("making socket '%1%' non-blocking", socketPath);
|
||||||
|
}
|
||||||
|
|
||||||
|
serverThread = std::thread([this]() { runServerThread(); });
|
||||||
|
}
|
||||||
|
|
||||||
|
void addTempRoot(std::string rootHashPart)
|
||||||
|
{
|
||||||
|
_shared.lock()->tempRoots.insert(rootHashPart);
|
||||||
|
}
|
||||||
|
|
||||||
|
void releasePending()
|
||||||
|
{
|
||||||
|
auto shared(_shared.lock());
|
||||||
|
shared->pending.reset();
|
||||||
|
wakeup.notify_all();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Marks a path as pending deletion if it is not in tempRoots.
|
||||||
|
*
|
||||||
|
* Returns whether it was marked for deletion.
|
||||||
|
*/
|
||||||
|
bool markPendingIfPresent(std::string const & hashPart)
|
||||||
|
{
|
||||||
|
auto shared(_shared.lock());
|
||||||
|
if (shared->tempRoots.count(hashPart)) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
shared->pending = hashPart;
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
~GCOperation();
|
||||||
|
};
|
||||||
|
|
||||||
|
void GCOperation::runServerThread()
|
||||||
|
{
|
||||||
Sync<std::map<int, std::thread>> connections;
|
Sync<std::map<int, std::thread>> connections;
|
||||||
|
|
||||||
Finally cleanup([&]() {
|
Finally cleanup([&]() {
|
||||||
|
@ -474,7 +507,7 @@ void LocalStore::collectGarbage(const GCOptions & options, GCResults & results)
|
||||||
while (true) {
|
while (true) {
|
||||||
try {
|
try {
|
||||||
auto path = readLine(fdClient.get());
|
auto path = readLine(fdClient.get());
|
||||||
auto storePath = maybeParseStorePath(path);
|
auto storePath = store.maybeParseStorePath(path);
|
||||||
if (storePath) {
|
if (storePath) {
|
||||||
debug("got new GC root '%s'", path);
|
debug("got new GC root '%s'", path);
|
||||||
auto hashPart = std::string(storePath->hashPart());
|
auto hashPart = std::string(storePath->hashPart());
|
||||||
|
@ -505,13 +538,51 @@ void LocalStore::collectGarbage(const GCOptions & options, GCResults & results)
|
||||||
connections.lock()->insert({fdClient_, std::move(clientThread)});
|
connections.lock()->insert({fdClient_, std::move(clientThread)});
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
});
|
}
|
||||||
|
|
||||||
Finally stopServer([&]() {
|
GCOperation::~GCOperation()
|
||||||
|
{
|
||||||
writeFull(shutdownPipe.writeSide.get(), "x", false);
|
writeFull(shutdownPipe.writeSide.get(), "x", false);
|
||||||
|
{
|
||||||
|
auto shared(_shared.lock());
|
||||||
wakeup.notify_all();
|
wakeup.notify_all();
|
||||||
|
}
|
||||||
if (serverThread.joinable()) serverThread.join();
|
if (serverThread.joinable()) serverThread.join();
|
||||||
});
|
}
|
||||||
|
|
||||||
|
|
||||||
|
void LocalStore::collectGarbage(const GCOptions & options, GCResults & results)
|
||||||
|
{
|
||||||
|
bool shouldDelete = options.action == GCOptions::gcDeleteDead || options.action == GCOptions::gcDeleteSpecific;
|
||||||
|
bool gcKeepOutputs = settings.gcKeepOutputs;
|
||||||
|
bool gcKeepDerivations = settings.gcKeepDerivations;
|
||||||
|
|
||||||
|
StorePathSet roots, dead, alive;
|
||||||
|
|
||||||
|
/* Using `--ignore-liveness' with `--delete' can have unintended
|
||||||
|
consequences if `keep-outputs' or `keep-derivations' are true
|
||||||
|
(the garbage collector will recurse into deleting the outputs
|
||||||
|
or derivers, respectively). So disable them. */
|
||||||
|
if (options.action == GCOptions::gcDeleteSpecific && options.ignoreLiveness) {
|
||||||
|
gcKeepOutputs = false;
|
||||||
|
gcKeepDerivations = false;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (shouldDelete)
|
||||||
|
deletePath(reservedSpacePath);
|
||||||
|
|
||||||
|
/* Acquire the global GC root. Note: we don't use fdGCLock
|
||||||
|
here because then in auto-gc mode, another thread could
|
||||||
|
downgrade our exclusive lock. */
|
||||||
|
auto fdGCLock = openGCLock();
|
||||||
|
FdLock gcLock(fdGCLock.get(), ltWrite, true, "waiting for the big garbage collector lock...");
|
||||||
|
|
||||||
|
/* Synchronisation point to test ENOENT handling in
|
||||||
|
addTempRoot(), see tests/gc-non-blocking.sh. */
|
||||||
|
if (auto p = getEnv("_NIX_TEST_GC_SYNC_1"))
|
||||||
|
readFile(*p);
|
||||||
|
|
||||||
|
GCOperation gcServer {*this, stateDir.get()};
|
||||||
|
|
||||||
/* Find the roots. Since we've grabbed the GC lock, the set of
|
/* Find the roots. Since we've grabbed the GC lock, the set of
|
||||||
permanent roots cannot increase now. */
|
permanent roots cannot increase now. */
|
||||||
|
@ -527,7 +598,7 @@ void LocalStore::collectGarbage(const GCOptions & options, GCResults & results)
|
||||||
Roots tempRoots;
|
Roots tempRoots;
|
||||||
findTempRoots(tempRoots, true);
|
findTempRoots(tempRoots, true);
|
||||||
for (auto & root : tempRoots) {
|
for (auto & root : tempRoots) {
|
||||||
_shared.lock()->tempRoots.insert(std::string(root.first.hashPart()));
|
gcServer.addTempRoot(std::string(root.first.hashPart()));
|
||||||
roots.insert(root.first);
|
roots.insert(root.first);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -580,9 +651,7 @@ void LocalStore::collectGarbage(const GCOptions & options, GCResults & results)
|
||||||
/* Wake up any GC client waiting for deletion of the paths in
|
/* Wake up any GC client waiting for deletion of the paths in
|
||||||
'visited' to finish. */
|
'visited' to finish. */
|
||||||
Finally releasePending([&]() {
|
Finally releasePending([&]() {
|
||||||
auto shared(_shared.lock());
|
gcServer.releasePending();
|
||||||
shared->pending.reset();
|
|
||||||
wakeup.notify_all();
|
|
||||||
});
|
});
|
||||||
|
|
||||||
auto enqueue = [&](const StorePath & path) {
|
auto enqueue = [&](const StorePath & path) {
|
||||||
|
@ -629,15 +698,10 @@ void LocalStore::collectGarbage(const GCOptions & options, GCResults & results)
|
||||||
&& !options.pathsToDelete.count(*path))
|
&& !options.pathsToDelete.count(*path))
|
||||||
return;
|
return;
|
||||||
|
|
||||||
{
|
if (!gcServer.markPendingIfPresent(std::string(path->hashPart()))) {
|
||||||
auto hashPart = std::string(path->hashPart());
|
|
||||||
auto shared(_shared.lock());
|
|
||||||
if (shared->tempRoots.count(hashPart)) {
|
|
||||||
debug("cannot delete '%s' because it's a temporary root", printStorePath(*path));
|
debug("cannot delete '%s' because it's a temporary root", printStorePath(*path));
|
||||||
return markAlive();
|
return markAlive();
|
||||||
}
|
}
|
||||||
shared->pending = hashPart;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (isValidPath(*path)) {
|
if (isValidPath(*path)) {
|
||||||
|
|
||||||
|
|
|
@ -181,7 +181,7 @@ LocalStore::LocalStore(const Params & params)
|
||||||
, LocalFSStore(params)
|
, LocalFSStore(params)
|
||||||
, dbDir(stateDir + "/db")
|
, dbDir(stateDir + "/db")
|
||||||
, linksDir(realStoreDir + "/.links")
|
, linksDir(realStoreDir + "/.links")
|
||||||
, reservedPath(dbDir + "/reserved")
|
, reservedSpacePath(dbDir + "/reserved")
|
||||||
, schemaPath(dbDir + "/schema")
|
, schemaPath(dbDir + "/schema")
|
||||||
, tempRootsDir(stateDir + "/temproots")
|
, tempRootsDir(stateDir + "/temproots")
|
||||||
, fnTempRoots(fmt("%s/%d", tempRootsDir, getpid()))
|
, fnTempRoots(fmt("%s/%d", tempRootsDir, getpid()))
|
||||||
|
@ -259,10 +259,10 @@ LocalStore::LocalStore(const Params & params)
|
||||||
before doing a garbage collection. */
|
before doing a garbage collection. */
|
||||||
try {
|
try {
|
||||||
struct stat st;
|
struct stat st;
|
||||||
if (stat(reservedPath.c_str(), &st) == -1 ||
|
if (stat(reservedSpacePath.c_str(), &st) == -1 ||
|
||||||
st.st_size != settings.reservedSize)
|
st.st_size != settings.reservedSize)
|
||||||
{
|
{
|
||||||
AutoCloseFD fd{open(reservedPath.c_str(), O_WRONLY | O_CREAT | O_CLOEXEC, 0600)};
|
AutoCloseFD fd{open(reservedSpacePath.c_str(), O_WRONLY | O_CREAT | O_CLOEXEC, 0600)};
|
||||||
int res = -1;
|
int res = -1;
|
||||||
#if HAVE_POSIX_FALLOCATE
|
#if HAVE_POSIX_FALLOCATE
|
||||||
res = posix_fallocate(fd.get(), 0, settings.reservedSize);
|
res = posix_fallocate(fd.get(), 0, settings.reservedSize);
|
||||||
|
|
|
@ -119,7 +119,8 @@ public:
|
||||||
|
|
||||||
const Path dbDir;
|
const Path dbDir;
|
||||||
const Path linksDir;
|
const Path linksDir;
|
||||||
const Path reservedPath;
|
/** Path kept around to reserve some filesystem space to be able to begin a garbage collection */
|
||||||
|
const Path reservedSpacePath;
|
||||||
const Path schemaPath;
|
const Path schemaPath;
|
||||||
const Path tempRootsDir;
|
const Path tempRootsDir;
|
||||||
const Path fnTempRoots;
|
const Path fnTempRoots;
|
||||||
|
|
|
@ -33,7 +33,7 @@ sleep 2
|
||||||
pid2=$!
|
pid2=$!
|
||||||
|
|
||||||
# Start a build. This should not be blocked by the GC in progress.
|
# Start a build. This should not be blocked by the GC in progress.
|
||||||
outPath=$(nix-build --max-silent-time 60 -o "$TEST_ROOT/result" -E "
|
outPath=$(nix-build --max-silent-time 60 --debug -o "$TEST_ROOT/result" -E "
|
||||||
with import ./config.nix;
|
with import ./config.nix;
|
||||||
mkDerivation {
|
mkDerivation {
|
||||||
name = \"non-blocking\";
|
name = \"non-blocking\";
|
||||||
|
|
Loading…
Reference in a new issue