Agency Node Refactor #19312

Merged
76 commits merged on Jul 17, 2023
Changes from all commits
c662b38
Remove ttl and observe support.
May 26, 2023
4cd4a1d
Fixing tests.
May 30, 2023
38b6503
Rename operator= to loadFromVelocyPack. Fixing assertion.
May 30, 2023
918c66b
Remove unused code.
May 30, 2023
aea3559
Update CHANGELOG.
May 30, 2023
ea11e65
Merge remote-tracking branch 'origin/devel' into feature/remove-ttl-a…
May 30, 2023
0943319
Merge remote-tracking branch 'origin/devel' into feature/remove-ttl-a…
May 30, 2023
e1946bc
Remove agent* from Store.
May 30, 2023
79a8fbc
Fixing more tests.
May 30, 2023
cd1fffc
Fixing unit tests.
May 30, 2023
310ab16
Fixing windows compile.
May 30, 2023
ccd68eb
Merge remote-tracking branch 'origin/devel' into feature/remove-ttl-a…
May 30, 2023
fb4481a
Merge remote-tracking branch 'origin/devel' into feature/remove-ttl-a…
Jun 14, 2023
6a38fdc
Code cleanup.
Jun 16, 2023
e3e0454
Remove callback bin.
Jun 17, 2023
5c8fb4b
Make Node.h less widely included.
Jun 17, 2023
d8d00a9
Initial refactoring. still broken.
Jun 17, 2023
c4b53df
Fixing nodes.
Jun 19, 2023
2ad3571
Make constructors private.
Jun 19, 2023
7d54a07
Fixing some node tests.
Jun 20, 2023
d1e631e
More fixed tests.
Jun 21, 2023
9826729
Fixing even more tests.
Jun 21, 2023
663c73b
More tests.
Jun 21, 2023
60a55af
And even more test fixes.
Jun 21, 2023
9f2c614
More tests fixed.
Jun 23, 2023
2578fb3
Fixed yet another test.
Jun 23, 2023
5d79f49
More tests.
Jun 23, 2023
25d3489
More fixed tests.
Jun 23, 2023
a07a6e5
Second to last test fixed.
Jun 23, 2023
213a089
Fix final test.
Jun 23, 2023
88c3420
Merge remote-tracking branch 'origin/devel' into feature/agency-node-…
Jun 23, 2023
40a2c43
Supervision cleanup.
Jun 26, 2023
58655d7
Merge remote-tracking branch 'origin/devel' into feature/agency-node-…
Jun 26, 2023
c6e7930
Merge remote-tracking branch 'origin/devel' into feature/remove-ttl-a…
Jun 26, 2023
178d03c
Merge remote-tracking branch 'origin/feature/remove-ttl-and-observe' …
Jun 26, 2023
c7cc644
Make explicit copy in store.
Jun 26, 2023
7e819be
Merge remote-tracking branch 'origin/feature/remove-ttl-and-observe' …
Jun 26, 2023
3361011
Fix compilation.
Jun 26, 2023
6175d59
Fixing ResultT constructor.
Jun 26, 2023
8e5fb8f
Fixing push-queue.
Jun 26, 2023
6ec4e06
Reuse nodes for readDB.
Jun 27, 2023
a48f99c
Fixing more jobs.
Jun 27, 2023
54b1c3f
Optimize operations.
Jun 27, 2023
18d2f7f
Revert "Reuse nodes for readDB."
Jun 27, 2023
2e65eaf
Fixing msvc warnings.
Jun 27, 2023
bebadf6
Merge remote-tracking branch 'origin/devel' into feature/remove-ttl-a…
Jun 27, 2023
285ef97
Remove unnecessary include.
Jun 27, 2023
d46f300
Fixing another test.
Jun 28, 2023
e2a94ac
Merge remote-tracking branch 'origin/feature/remove-ttl-and-observe' …
Jun 28, 2023
72aafc3
Revert change.
Jun 28, 2023
40d7faf
Fixing more bugs.
Jun 28, 2023
ae08423
Update velocypack.
Jun 28, 2023
f44a146
Fixing asan issue.
Jun 29, 2023
a34b38a
Merge remote-tracking branch 'origin/devel' into feature/remove-ttl-a…
Jul 5, 2023
05b61f3
Merge branch 'feature/remove-ttl-and-observe' into feature/agency-nod…
Jul 5, 2023
8fac4bf
Remove hasAsNode.
Jul 11, 2023
c82bb5a
Remove hasAsSlice and slice().
Jul 11, 2023
f598f16
Code cleanup.
Jul 11, 2023
48aab14
Adding nullNode.
Jul 11, 2023
6cec768
Merge remote-tracking branch 'origin/devel' into feature/remove-ttl-a…
Jul 11, 2023
5a45db3
Merge remote-tracking branch 'origin/feature/remove-ttl-and-observe' …
Jul 11, 2023
4cb5684
Merge remote-tracking branch 'origin/devel' into feature/agency-node-…
Jul 11, 2023
1230885
Remove support for slashes in keys.
Jul 11, 2023
5592cc7
Remove unused var.
Jul 11, 2023
951d1de
Merge remote-tracking branch 'origin/devel' into feature/remove-ttl-a…
Jul 11, 2023
7cf9b85
Merge branch 'feature/remove-ttl-and-observe' into feature/agency-nod…
Jul 11, 2023
9ffeb71
Clean up.
Jul 11, 2023
9e81543
Fixing more tests.
Jul 12, 2023
4b5de4f
Fix triggers in value-only special case.
Jul 13, 2023
5391ac1
Update arangod/Agency/NodeLoadInspector.h
Jul 13, 2023
48652d1
Fixing tests again.
Jul 13, 2023
9a70b84
Merge remote-tracking branch 'origin/devel' into feature/remove-ttl-a…
Jul 17, 2023
3c346b2
Merge remote-tracking branch 'origin/feature/remove-ttl-and-observe' …
Jul 17, 2023
6b6c769
Merge remote-tracking branch 'origin/devel' into feature/remove-ttl-a…
Jul 17, 2023
358cc93
Merge remote-tracking branch 'origin/feature/remove-ttl-and-observe' …
Jul 17, 2023
6c2d376
Update velocypack.
Jul 17, 2023
6 changes: 3 additions & 3 deletions arangod/Agency/ActiveFailoverJob.cpp
@@ -154,8 +154,8 @@ bool ActiveFailoverJob::start(bool&) {
return finish(_server, "", true, reason); // move to /Target/Finished
}

auto leader = _snapshot.hasAsSlice(asyncReplLeader);
if (!leader || leader->compareString(_server) != 0) {
auto leader = _snapshot.hasAsStringView(asyncReplLeader);
if (!leader || leader != _server) {
std::string reason =
"Server " + _server + " is not the current replication leader";
LOG_TOPIC("d468e", INFO, Logger::SUPERVISION) << reason;
@@ -222,7 +222,7 @@ bool ActiveFailoverJob::start(bool&) {
// Destination server should not be blocked by another job
addPreconditionServerNotBlocked(pending, newLeader);
// AsyncReplication leader must be the failed server
addPreconditionUnchanged(pending, asyncReplLeader, leader.value());
addPreconditionUnchanged(pending, asyncReplLeader, VPackValue(*leader));
} // precondition done

} // array for transaction done
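The ActiveFailoverJob.cpp hunk swaps the Slice-based lookup (`hasAsSlice` plus `compareString`) for a typed `hasAsStringView` lookup, so the caller can compare with plain `!=`. A minimal sketch of that pattern, using a hypothetical free function in place of the real `Node` API:

```cpp
#include <cassert>
#include <optional>
#include <string>
#include <string_view>

// Hypothetical stand-in for Node::hasAsStringView: yields the stored
// string as a view when the key holds a string value, nullopt otherwise.
std::optional<std::string_view> hasAsStringView(std::string const& stored) {
  if (stored.empty()) {
    return std::nullopt;  // key missing or not a string
  }
  return std::string_view{stored};
}

// Mirrors the new check `!leader || leader != _server` from the diff:
// a missing key and a mismatched leader collapse into one condition.
bool leaderMismatch(std::string const& snapshotValue,
                    std::string const& server) {
  auto leader = hasAsStringView(snapshotValue);
  return !leader || *leader != server;
}
```

With the old Slice API the same check needed `leader->compareString(_server) != 0`; the optional-of-string_view shape keeps the null test and the value comparison in one readable expression.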
10 changes: 5 additions & 5 deletions arangod/Agency/AddFollower.cpp
@@ -147,7 +147,7 @@ bool AddFollower::start(bool&) {
return false;
}
Node const& collection =
*_snapshot.hasAsNode(planColPrefix + _database + "/" + _collection);
*_snapshot.get(planColPrefix + _database + "/" + _collection);
if (collection.has("distributeShardsLike")) {
finish("", "", false,
"collection must not have 'distributeShardsLike' attribute");
@@ -158,8 +158,8 @@ bool AddFollower::start(bool&) {
std::string planPath =
planColPrefix + _database + "/" + _collection + "/shards/" + _shard;

Slice planned = _snapshot.hasAsSlice(planPath).value();

auto plannedBuilder = _snapshot.get(planPath)->toBuilder();
auto planned = plannedBuilder.slice();
TRI_ASSERT(planned.isArray());

// First check that we still have too few followers for the current
@@ -350,9 +350,9 @@ bool AddFollower::start(bool&) {
// "failoverCandidates":
std::string foCandsPath = curPath.substr(0, curPath.size() - 7);
foCandsPath += StaticStrings::FailoverCandidates;
auto foCands = this->_snapshot.hasAsSlice(foCandsPath);
auto foCands = this->_snapshot.hasAsBuilder(foCandsPath);
if (foCands) {
addPreconditionUnchanged(trx, foCandsPath, foCands.value());
addPreconditionUnchanged(trx, foCandsPath, foCands->slice());
}
});
addPreconditionShardNotBlocked(trx, _shard);
4 changes: 2 additions & 2 deletions arangod/Agency/Agent.cpp
@@ -2061,13 +2061,13 @@ Store const& Agent::transient() const { return _transient; }
/// Rebuild from persisted state
void Agent::setPersistedState(VPackSlice compaction) {
// Catch up with compacted state, this is only called at startup
_spearhead.loadFromVelocyPack(compaction);
_spearhead.setNodeValue(compaction);

// Catch up with commit
try {
WRITE_LOCKER(oLocker, _outputLock);
std::lock_guard guard{_waitForCV.mutex};
_readDB.loadFromVelocyPack(compaction);
_readDB.setNodeValue(compaction);
_commitIndex = arangodb::basics::StringUtils::uint64(
compaction.get(StaticStrings::KeyString).copyString());
_local_index = _commitIndex.load(std::memory_order_relaxed);
16 changes: 8 additions & 8 deletions arangod/Agency/CleanOutServer.cpp
@@ -233,7 +233,7 @@ bool CleanOutServer::start(bool& aborts) {

// Check that _to is not in `Target/CleanedServers`:
VPackBuilder cleanedServersBuilder;
auto const& cleanedServersNode = _snapshot.hasAsNode(cleanedPrefix);
auto const& cleanedServersNode = _snapshot.get(cleanedPrefix);
if (cleanedServersNode) {
cleanedServersNode->toBuilder(cleanedServersBuilder);
} else {
@@ -253,10 +253,10 @@ bool CleanOutServer::start(bool& aborts) {

// Check that _to is not in `Target/FailedServers`:
// (this node is expected to NOT exists, so make test before processing
// so that hasAsNode does not generate a warning log message)
// so that get does not generate a warning log message)
VPackBuilder failedServersBuilder;
if (_snapshot.has(failedServersPrefix)) {
auto const& failedServersNode = _snapshot.hasAsNode(failedServersPrefix);
auto const& failedServersNode = _snapshot.get(failedServersPrefix);
if (failedServersNode) {
failedServersNode->toBuilder(failedServersBuilder);
} else {
@@ -354,7 +354,7 @@ bool CleanOutServer::start(bool& aborts) {
addPreconditionUnchanged(*pending, failedServersPrefix, failedServers);
addPreconditionUnchanged(*pending, cleanedPrefix, cleanedServers);
addPreconditionUnchanged(*pending, planVersion,
_snapshot.get(planVersion)->slice());
_snapshot.get(planVersion)->toBuilder().slice());
}
} // array for transaction done

@@ -383,7 +383,7 @@ void CleanOutServer::scheduleJobsR2(std::shared_ptr<Builder>& trx,
auto logs = logsChild;

for (auto const& [logIdString, logNode] : *logs) {
auto logTarget = deserialize<replication2::agency::LogTarget>(*logNode);
auto logTarget = deserialize<replication2::agency::LogTarget>(logNode);
bool removeServer = logTarget.participants.contains(_server);

if (removeServer) {
@@ -436,8 +436,9 @@ bool CleanOutServer::scheduleMoveShards(std::shared_ptr<Builder>& trx) {
for (auto const& shard : *collection.hasAsChildren("shards")) {
// Only shards, which are affected
int found = -1;

int count = 0;
for (VPackSlice dbserver : VPackArrayIterator(shard.second->slice())) {
for (VPackSlice dbserver : *shard.second->getArray()) {
if (dbserver.stringView() == _server) {
found = count;
break;
@@ -481,8 +482,7 @@ bool CleanOutServer::scheduleMoveShards(std::shared_ptr<Builder>& trx) {
decltype(servers) serversCopy(servers); // a copy

// Only destinations, which are not already holding this shard
for (VPackSlice dbserver :
VPackArrayIterator(shard.second->slice())) {
for (VPackSlice dbserver : *shard.second->getArray()) {
serversCopy.erase(
std::remove(serversCopy.begin(), serversCopy.end(),
dbserver.copyString()),
11 changes: 6 additions & 5 deletions arangod/Agency/FailedFollower.cpp
@@ -153,13 +153,14 @@ bool FailedFollower::start(bool& aborts) {
// Planned servers vector
std::string planPath =
planColPrefix + _database + "/" + _collection + "/shards/" + _shard;
auto plannedPair = _snapshot.hasAsSlice(planPath);
auto plannedPair = _snapshot.get(planPath);
if (!plannedPair) {
finish("", _shard, true,
"Plan entry for collection " + _collection + " gone");
return false;
}
Slice const& planned = plannedPair.value();
auto builder = plannedPair->toBuilder();
Slice planned = builder.slice();

// Now check if _server is still in this plan, note that it could have
// been removed by RemoveFollower already, in which case we simply stop:
@@ -230,7 +231,7 @@ bool FailedFollower::start(bool& aborts) {
{
VPackArrayBuilder a(&todo);
if (_jb == nullptr) {
auto const& jobIdNode = _snapshot.hasAsNode(toDoPrefix + _jobId);
auto const& jobIdNode = _snapshot.get(toDoPrefix + _jobId);
if (jobIdNode) {
jobIdNode->toBuilder(todo);
} else {
@@ -310,9 +311,9 @@ bool FailedFollower::start(bool& aborts) {
// "failoverCandidates":
std::string foCandsPath = curPath.substr(0, curPath.size() - 7);
foCandsPath += StaticStrings::FailoverCandidates;
auto foCands = this->_snapshot.hasAsSlice(foCandsPath);
auto foCands = this->_snapshot.hasAsBuilder(foCandsPath);
if (foCands) {
addPreconditionUnchanged(job, foCandsPath, foCands.value());
addPreconditionUnchanged(job, foCandsPath, foCands->slice());
}
});
// toServer not blocked
28 changes: 16 additions & 12 deletions arangod/Agency/FailedLeader.cpp
@@ -99,7 +99,8 @@ void FailedLeader::rollback() {
// Create new plan servers (exchange _to and _from)
std::string planPath =
planColPrefix + _database + "/" + _collection + "/shards/" + _shard;
auto planned = _snapshot.hasAsSlice(planPath).value();
auto plannedBuilder = _snapshot.hasAsBuilder(planPath).value();
auto planned = plannedBuilder.slice();
std::shared_ptr<Builder> payload = nullptr;

if (_status == PENDING) { // Only payload if pending. Otherwise just fail.
@@ -218,19 +219,21 @@ bool FailedLeader::start(bool& aborts) {
// Current servers vector
std::string curPath =
curColPrefix + _database + "/" + _collection + "/" + _shard;
auto current = _snapshot.hasAsSlice(curPath + "/servers").value();
auto currentBuilder = _snapshot.get(curPath + "/servers")->toBuilder();
auto current = currentBuilder.slice();

// Planned servers vector
std::string planPath =
planColPrefix + _database + "/" + _collection + "/shards/" + _shard;
auto planned = _snapshot.hasAsSlice(planPath).value();
auto plannedBuilder = _snapshot.get(planPath)->toBuilder();
auto planned = plannedBuilder.slice();

// Get todo entry
Builder todo;
{
VPackArrayBuilder t(&todo);
if (_jb == nullptr) {
auto const& jobIdNode = _snapshot.hasAsNode(toDoPrefix + _jobId);
auto const& jobIdNode = _snapshot.get(toDoPrefix + _jobId);
if (jobIdNode) {
jobIdNode->toBuilder(todo);
} else {
@@ -371,9 +374,10 @@ bool FailedLeader::start(bool& aborts) {
// "failoverCandidates":
std::string foCandsPath = curPath.substr(0, curPath.size() - 7);
foCandsPath += StaticStrings::FailoverCandidates;
auto foCands = this->_snapshot.hasAsSlice(foCandsPath);
auto foCands = this->_snapshot.hasAsBuilder(foCandsPath);
if (foCands) {
addPreconditionUnchanged(pending, foCandsPath, *foCands);
addPreconditionUnchanged(pending, foCandsPath,
foCands->slice());
}
});
// Destination server should not be blocked by another job
@@ -498,7 +502,7 @@ JOB_STATUS FailedLeader::status() {
}

std::string database, shard;
auto const& job = _snapshot.hasAsNode(pendingPrefix + _jobId);
auto const& job = _snapshot.get(pendingPrefix + _jobId);
if (job) {
auto tmp_database = job->hasAsString("database");
auto tmp_shard = job->hasAsString("shard");
@@ -516,12 +520,12 @@ JOB_STATUS FailedLeader::status() {
for (auto const& clone : clones(_snapshot, _database, _collection, _shard)) {
auto sub = database + "/" + clone.collection;
auto plan_slice =
_snapshot.hasAsSlice(planColPrefix + sub + "/shards/" + clone.shard);
auto cur_slice = _snapshot.hasAsSlice(curColPrefix + sub + "/" +
clone.shard + "/servers");
_snapshot.hasAsBuilder(planColPrefix + sub + "/shards/" + clone.shard);
auto cur_slice = _snapshot.hasAsBuilder(curColPrefix + sub + "/" +
clone.shard + "/servers");
if (plan_slice && cur_slice &&
!basics::VelocyPackHelper::equal(plan_slice.value()[0],
cur_slice.value()[0], false)) {
!basics::VelocyPackHelper::equal(plan_slice->slice()[0],
cur_slice->slice()[0], false)) {
LOG_TOPIC("0d8ca", DEBUG, Logger::SUPERVISION)
<< "FailedLeader waiting for " << sub << "/" << shard;
break;
33 changes: 12 additions & 21 deletions arangod/Agency/FailedServer.cpp
@@ -112,9 +112,9 @@ bool FailedServer::start(bool& aborts) {
return true;
};

auto dbserverLock = _snapshot.hasAsSlice(blockedServersPrefix + _server);
auto dbserverLock = _snapshot.hasAsBuilder(blockedServersPrefix + _server);
if (dbserverLock) {
auto s = *dbserverLock;
auto s = dbserverLock->slice();
if (s.isArray()) {
for (auto const& m : VPackArrayIterator(s)) {
if (m.isString()) {
@@ -147,7 +147,7 @@ bool FailedServer::start(bool& aborts) {
{
VPackArrayBuilder t(&todo);
if (_jb == nullptr) {
auto const& toDoJob = _snapshot.hasAsNode(toDoPrefix + _jobId);
auto const& toDoJob = _snapshot.get(toDoPrefix + _jobId);
if (toDoJob) {
toDoJob->toBuilder(todo);
} else {
@@ -184,28 +184,19 @@ bool FailedServer::start(bool& aborts) {
for (auto const& collptr : database.second->children()) {
auto const& collection = *(collptr.second);

auto const& replicationFactorPair =
collection.hasAsNode(StaticStrings::ReplicationFactor);
if (replicationFactorPair) {
VPackSlice const replicationFactor = replicationFactorPair->slice();
auto const& replicationFactor =
collection.get(StaticStrings::ReplicationFactor);
if (replicationFactor) {
uint64_t number = 1;
bool isSatellite = false;

if (replicationFactor.isString() &&
replicationFactor.compareString(StaticStrings::Satellite) ==
0) {
if (replicationFactor->getStringView() ==
StaticStrings::Satellite) {
isSatellite = true; // do nothing - number =
// Job::availableServers(_snapshot).size();
} else if (replicationFactor.isNumber()) {
try {
number = replicationFactor.getNumber<uint64_t>();
} catch (...) {
LOG_TOPIC("f5290", ERR, Logger::SUPERVISION)
<< "Failed to read replicationFactor. job: " << _jobId
<< " " << collection.hasAsString("id").value();
continue;
}

} else if (auto maybeNumber = replicationFactor->getUInt();
maybeNumber) {
number = *maybeNumber;
if (number == 1) {
continue;
}
Expand All @@ -220,7 +211,7 @@ bool FailedServer::start(bool& aborts) {
for (auto const& shard : *collection.hasAsChildren("shards")) {
size_t pos = 0;

for (VPackSlice it : VPackArrayIterator(shard.second->slice())) {
for (auto it : *shard.second->getArray()) {
auto dbs = it.copyString();

if (dbs == _server || dbs == "_" + _server) {
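The FailedServer.cpp hunk above replaces a throwing `getNumber<uint64_t>()` inside a try/catch with optional-returning accessors (`getStringView`, `getUInt`) that the caller branches on. A toy model of that control flow, with invented types standing in for the real agency `Node`:

```cpp
#include <cassert>
#include <cctype>
#include <cstdint>
#include <optional>
#include <string>
#include <string_view>

// Toy model (NOT the real agency Node class): value accessors report
// absence via std::optional instead of throwing on a type mismatch.
struct ToyNode {
  std::string raw;  // either "satellite" or a decimal number

  std::optional<std::string_view> getStringView() const {
    if (!raw.empty() && !std::isdigit(static_cast<unsigned char>(raw[0]))) {
      return std::string_view{raw};
    }
    return std::nullopt;  // numeric or empty, so not a string value
  }

  std::optional<uint64_t> getUInt() const {
    if (raw.empty() || !std::isdigit(static_cast<unsigned char>(raw[0]))) {
      return std::nullopt;  // not an unsigned number
    }
    return std::stoull(raw);
  }
};

// Mirrors the refactored flow: satellite check first, then the numeric
// path via `if (auto maybeNumber = ...getUInt(); maybeNumber)`.
uint64_t effectiveReplicationFactor(ToyNode const& node) {
  if (auto s = node.getStringView(); s && *s == "satellite") {
    return 0;  // satellite collections are special-cased by the job
  }
  if (auto maybeNumber = node.getUInt(); maybeNumber) {
    return *maybeNumber;
  }
  return 1;  // fallback when the attribute is absent or unreadable
}
```

The optional-based shape also removes the catch-all error logging that the old code needed when `getNumber` threw on a malformed value.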
4 changes: 2 additions & 2 deletions arangod/Agency/Helpers.cpp
@@ -31,13 +31,13 @@ namespace arangodb::consensus {
bool isReplicationTwoDB(Node::Children const& databases,
std::string const& dbName) {
auto it = databases.find(dbName);
if (it == databases.end()) {
if (it == nullptr) {
// this should actually never happen, but if it does we simply claim that
// this is an old replication 1 DB.
return false;
}

if (auto v = it->second->hasAsString(StaticStrings::ReplicationVersion); v) {
if (auto v = (*it)->hasAsString(StaticStrings::ReplicationVersion); v) {
auto res = replication::parseVersion(v.value());
return res.ok() && res.get() == replication::Version::TWO;
}
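The Helpers.cpp hunk suggests the refactored `Children::find` now yields a pointer-like result — `nullptr` when the key is absent, dereferenced as `(*it)->` on a hit — rather than a map iterator compared against `end()`. A sketch of that lookup shape, with invented names:

```cpp
#include <cassert>
#include <map>
#include <memory>
#include <string>

// Invented stand-in for a Node with one attribute of interest.
struct ToyNode {
  std::string replicationVersion;
};
using Children = std::map<std::string, std::shared_ptr<ToyNode>>;

// Hypothetical pointer-returning find(), mimicking the diff: a pointer
// to the mapped value on a hit, nullptr when the key is missing.
std::shared_ptr<ToyNode> const* findChild(Children const& children,
                                          std::string const& name) {
  auto it = children.find(name);
  return it == children.end() ? nullptr : &it->second;
}
```

Callers then write `if (it == nullptr)` for the miss case and `(*it)->field` for access, matching the two changed lines in `isReplicationTwoDB`.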