Speed up incremental collection replication using revisions (arangodb…
jsteemann authored Feb 18, 2022
1 parent 2d75fdb commit 4ea2ef4
Showing 5 changed files with 191 additions and 88 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG
@@ -1,5 +1,10 @@
devel
-----

* Parallelize applying of revision tree changes with fetching of the next
  revision tree range, in incremental collection replication for collections
  created with ArangoDB 3.8 and higher.

* Support JSON schema objects for documenting Foxx endpoints.

* Sorted out various geo problems:
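
The entry above describes a double-buffering scheme: while one chunk of revision tree changes is applied locally, the next chunk is already being fetched from the leader. A minimal sketch of the overlap, assuming hypothetical fetchChunk/applyChunk stand-ins and std::async in place of ArangoDB's scheduler:

#include <cstdint>
#include <future>
#include <string>
#include <vector>

// Hypothetical stand-ins: a chunk carries its payload plus the resume token
// for the next request; kDone marks the last chunk (the real code uses
// RevisionId::max() as the sentinel).
struct Chunk {
  std::vector<std::string> data;
  std::uint64_t resume;
};
constexpr std::uint64_t kDone = UINT64_MAX;

Chunk fetchChunk(std::uint64_t resume) {
  // stand-in for the network round trip to the leader
  return Chunk{{"..."}, resume >= 2 ? kDone : resume + 1};
}

void applyChunk(Chunk const&) {
  // stand-in for applying the revision tree changes locally
}

int main() {
  Chunk current = fetchChunk(0);  // the initial chunk is fetched synchronously
  while (true) {
    std::future<Chunk> next;
    if (current.resume != kDone) {
      // start fetching the next range in the background ...
      next = std::async(std::launch::async, fetchChunk, current.resume);
    }
    applyChunk(current);  // ... while the current range is applied
    if (current.resume == kDone) {
      break;
    }
    current = next.get();  // pay only whatever fetch time was not overlapped
  }
}
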
226 changes: 165 additions & 61 deletions arangod/Replication/DatabaseInitialSyncer.cpp
@@ -90,6 +90,13 @@ std::chrono::milliseconds sleepTimeFromWaitTime(double waitTime) {
return std::chrono::seconds(2);
}

bool isVelocyPack(arangodb::httpclient::SimpleHttpResult const& response) {
bool found = false;
std::string const& cType = response.getHeaderField(
arangodb::StaticStrings::ContentTypeHeader, found);
return found && cType == arangodb::StaticStrings::MimeTypeVPack;
}

std::string const kTypeString = "type";
std::string const kDataString = "data";

@@ -666,6 +673,7 @@ Result DatabaseInitialSyncer::parseCollectionDumpMarker(
Result DatabaseInitialSyncer::parseCollectionDump(
transaction::Methods& trx, LogicalCollection* coll,
httpclient::SimpleHttpResult* response, uint64_t& markersProcessed) {
TRI_ASSERT(response != nullptr);
TRI_ASSERT(!trx.isSingleOperationTransaction());

FormatHint hint = FormatHint::AutoDetect;
@@ -674,18 +682,13 @@ Result DatabaseInitialSyncer::parseCollectionDump(
char const* p = data.begin();
char const* end = p + data.length();

-  bool found = false;
-  std::string const& cType =
-      response->getHeaderField(StaticStrings::ContentTypeHeader, found);
-
-  if (found && cType == StaticStrings::MimeTypeVPack) {
+  if (isVelocyPack(*response)) {
// received a velocypack response from the leader
LOG_TOPIC("b9f4d", DEBUG, Logger::REPLICATION)
<< "using vpack for chunk contents";

// intentional copy
VPackOptions validationOptions =
basics::VelocyPackHelper::strictRequestValidationOptions;

// allow custom types being sent here
validationOptions.disallowCustom = false;

@@ -845,17 +848,20 @@ void DatabaseInitialSyncer::fetchDumpChunk(
_config.flushed = true;
}

+  bool isVPack = false;
auto headers = replutils::createHeaders();
if (_config.leader.version() >= 30800) {
// from 3.8 onwards, it is safe and also faster to retrieve vpack-encoded
// dumps. in previous versions there may be vpack encoding issues for the
// /_api/replication/dump responses.
headers[StaticStrings::Accept] = StaticStrings::MimeTypeVPack;
+    isVPack = true;
}

_config.progress.set(
std::string("fetching leader collection dump for collection '") +
-      coll->name() + "', type: " + typeString + ", id: " + leaderColl +
+      coll->name() + "', type: " + typeString +
+      ", format: " + (isVPack ? "vpack" : "json") + ", id: " + leaderColl +
", batch " + itoa(batch) + ", url: " + url);

double t = TRI_microtime();
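
The version gate above is plain content negotiation: velocypack is requested only from leaders new enough to encode dump responses correctly. The same decision as a free function; the helper name is invented for illustration, while the header name and MIME type follow ArangoDB's StaticStrings values:

#include <map>
#include <string>

// extra request headers for a dump request against a leader with the given
// version (e.g. 30800 for 3.8.0); illustrative only
std::map<std::string, std::string> dumpRequestHeaders(int leaderVersion) {
  std::map<std::string, std::string> headers;
  if (leaderVersion >= 30800) {
    // 3.8+ leaders produce correct velocypack for /_api/replication/dump
    headers["Accept"] = "application/x-velocypack";
  }
  return headers;
}
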
@@ -1431,6 +1437,70 @@ Result DatabaseInitialSyncer::fetchCollectionSyncByKeys(
}
}

/// @brief order a new chunk from the /revisions API
void DatabaseInitialSyncer::fetchRevisionsChunk(
std::shared_ptr<Syncer::JobSynchronizer> sharedStatus,
std::string const& baseUrl, arangodb::LogicalCollection* coll,
std::string const& leaderColl, std::string const& requestPayload,
RevisionId requestResume) {
if (isAborted()) {
sharedStatus->gotResponse(Result(TRI_ERROR_REPLICATION_APPLIER_STOPPED));
return;
}

try {
std::string const typeString =
(coll->type() == TRI_COL_TYPE_EDGE ? "edge" : "document");

if (!_config.isChild()) {
batchExtend();
}

// assemble URL to call
std::string url = baseUrl + "&" + StaticStrings::RevisionTreeResume + "=" +
requestResume.toString();

bool isVPack = false;
auto headers = replutils::createHeaders();
if (_config.leader.version() >= 30900) {
headers[StaticStrings::Accept] = StaticStrings::MimeTypeVPack;
isVPack = true;
}

_config.progress.set(
std::string(
"fetching leader collection revision ranges for collection '") +
coll->name() + "', type: " + typeString + ", format: " +
(isVPack ? "vpack" : "json") + ", id: " + leaderColl + ", url: " + url);

double t = TRI_microtime();

// send request
std::unique_ptr<httpclient::SimpleHttpResult> response;
_config.connection.lease([&](httpclient::SimpleHttpClient* client) {
response.reset(client->retryRequest(rest::RequestType::PUT, url,
requestPayload.data(),
requestPayload.size(), headers));
});

t = TRI_microtime() - t;

if (replutils::hasFailed(response.get())) {
sharedStatus->gotResponse(
replutils::buildHttpError(response.get(), url, _config.connection),
t);
return;
}

// success!
sharedStatus->gotResponse(std::move(response), t);
} catch (basics::Exception const& ex) {
sharedStatus->gotResponse(Result(ex.code(), ex.what()));
} catch (std::exception const& ex) {
sharedStatus->gotResponse(Result(TRI_ERROR_INTERNAL, ex.what()));
}
}
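
Note the division of labor: fetchRevisionsChunk never consumes the response itself, it only posts it (or an error) into the shared JobSynchronizer, from which the main loop below collects it via waitForResponse(). A minimal single-slot analogue of that hand-off, built on a condition variable and illustrative only; the real JobSynchronizer also records timings (time()), waits for outstanding jobs in its destructor, and posts work to ArangoDB's scheduler instead of raw threads:

#include <condition_variable>
#include <mutex>
#include <string>
#include <thread>
#include <utility>

template <typename Response>
class ChunkSynchronizer {
 public:
  // producer side: publish this round's response and wake the consumer
  void gotResponse(Response response) {
    std::lock_guard<std::mutex> guard(mutex_);
    response_ = std::move(response);
    ready_ = true;
    // notify while holding the lock, so the object may be destroyed as
    // soon as waitForResponse() has returned
    cv_.notify_one();
  }

  // consumer side: block until the producer has posted
  Response waitForResponse() {
    std::unique_lock<std::mutex> guard(mutex_);
    cv_.wait(guard, [this] { return ready_; });
    ready_ = false;  // free the slot for the next round
    return std::move(response_);
  }

  // run the next fetch in the background (stand-in for the scheduler post)
  template <typename Fn>
  void request(Fn&& fn) {
    std::thread(std::forward<Fn>(fn)).detach();
  }

 private:
  std::mutex mutex_;
  std::condition_variable cv_;
  Response response_;
  bool ready_ = false;
};

int main() {
  ChunkSynchronizer<std::string> sync;
  sync.request([&sync] { sync.gotResponse("chunk payload"); });
  std::string chunk = sync.waitForResponse();  // blocks until posted
  return chunk.empty() ? 1 : 0;
}
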

/// @brief incrementally fetch data from a collection using revisions as the
/// primary document identifier
Result DatabaseInitialSyncer::fetchCollectionSyncByRevisions(
@@ -1641,99 +1711,133 @@ Result DatabaseInitialSyncer::fetchCollectionSyncByRevisions(
ridBuffer));
}
}
-    std::string request = requestBuilder.slice().toJson();
-
-    std::string url = baseUrl + "/" + RestReplicationHandler::Ranges +
-                      "?collection=" + urlEncode(leaderColl) +
-                      "&serverId=" + _state.localServerIdString +
-                      "&batchId=" + std::to_string(_config.batch.id);
-    auto headers = replutils::createHeaders();
-    std::unique_ptr<httpclient::SimpleHttpResult> response;
-    RevisionId requestResume{ranges[0].first};  // start with beginning
-    RevisionId iterResume = requestResume;
-    std::size_t chunk = 0;
std::unique_ptr<ReplicationIterator> iter =
physical->getReplicationIterator(
ReplicationIterator::Ordering::Revision, *trx);
if (!iter) {
return Result(TRI_ERROR_INTERNAL, "could not get replication iterator");
}

-    std::vector<RevisionId> toFetch;
-    std::vector<RevisionId> toRemove;
-    const uint64_t documentsFound = treeLocal->count();
RevisionReplicationIterator& local =
*static_cast<RevisionReplicationIterator*>(iter.get());

+    uint64_t const documentsFound = treeLocal->count();
+
+    std::vector<RevisionId> toFetch;
+    std::vector<RevisionId> toRemove;
+
+    std::string const requestPayload = requestBuilder.slice().toJson();
+    std::string const url = baseUrl + "/" + RestReplicationHandler::Ranges +
+                            "?collection=" + urlEncode(leaderColl) +
+                            "&serverId=" + _state.localServerIdString +
+                            "&batchId=" + std::to_string(_config.batch.id);
+    RevisionId requestResume{ranges[0].first};  // start with beginning
+    RevisionId iterResume = requestResume;
+    std::size_t chunk = 0;
+
+    // the shared status will wait in its destructor until all posted
+    // requests have been completed/canceled!
+    auto self = shared_from_this();
+    auto sharedStatus = std::make_shared<Syncer::JobSynchronizer>(self);
+
+    // order initial chunk. this will block until the initial response
+    // has arrived
+    fetchRevisionsChunk(sharedStatus, url, coll, leaderColl, requestPayload,
+                        requestResume);
+
+    // Builder will be recycled
+    VPackBuilder responseBuilder;

while (requestResume < RevisionId::max()) {
if (isAborted()) {
return Result(TRI_ERROR_REPLICATION_APPLIER_STOPPED);
}
+      std::unique_ptr<httpclient::SimpleHttpResult> chunkResponse;

-      if (!_config.isChild()) {
-        batchExtend();
-      }
+      // block until we either got a response or were shut down
+      Result res = sharedStatus->waitForResponse(chunkResponse);

-      std::string batchUrl = url + "&" + StaticStrings::RevisionTreeResume +
-                             "=" + requestResume.toString();
-      std::string msg = "fetching collection revision ranges for collection '" +
-                        coll->name() + "' from " + batchUrl;
-      _config.progress.set(msg);
-      double t = TRI_microtime();
-      _config.connection.lease([&](httpclient::SimpleHttpClient* client) {
-        response.reset(client->retryRequest(rest::RequestType::PUT, batchUrl,
-                                            request.data(), request.size(),
-                                            headers));
-      });
-      stats.waitedForKeys += TRI_microtime() - t;
+      // update our statistics
+      ++stats.numKeysRequests;
+      stats.waitedForKeys += sharedStatus->time();

-      if (replutils::hasFailed(response.get())) {
-        ++stats.numFailedConnects;
-        return replutils::buildHttpError(response.get(), batchUrl,
-                                         _config.connection);
+      if (res.fail()) {
+        // no response or error or shutdown
+        return res;
}

-      if (response->hasContentLength()) {
-        stats.numSyncBytesReceived += response->getContentLength();
+      // now we have got a response!
+      TRI_ASSERT(chunkResponse != nullptr);
+
+      if (chunkResponse->hasContentLength()) {
+        stats.numSyncBytesReceived += chunkResponse->getContentLength();
}

-      VPackBuilder responseBuilder;
-      Result r = replutils::parseResponse(responseBuilder, response.get());
-      if (r.fail()) {
-        ++stats.numFailedConnects;
-        return Result(
-            TRI_ERROR_REPLICATION_INVALID_RESPONSE,
-            concatT("got invalid response from leader at ",
-                    _config.leader.endpoint, batchUrl, ": ", r.errorMessage()));
+      VPackSlice slice;
+
+      if (::isVelocyPack(*chunkResponse)) {
+        // velocypack body...
+
+        // intentional copy of options
+        VPackOptions validationOptions =
+            basics::VelocyPackHelper::strictRequestValidationOptions;
+        VPackValidator validator(&validationOptions);
+
+        validator.validate(chunkResponse->getBody().begin(),
+                           chunkResponse->getBody().length(),
+                           /*isSubPart*/ false);
+
+        slice = VPackSlice(
+            reinterpret_cast<uint8_t const*>(chunkResponse->getBody().begin()));
+      } else {
+        // JSON body...
+        // recycle builder
+        responseBuilder.clear();
+        Result r =
+            replutils::parseResponse(responseBuilder, chunkResponse.get());
+        if (r.fail()) {
+          ++stats.numFailedConnects;
+          return Result(
+              TRI_ERROR_REPLICATION_INVALID_RESPONSE,
+              concatT("got invalid response from leader at ",
+                      _config.leader.endpoint, url, ": ", r.errorMessage()));
+        }
+        slice = responseBuilder.slice();
}

-      VPackSlice const slice = responseBuilder.slice();
if (!slice.isObject()) {
++stats.numFailedConnects;
return Result(TRI_ERROR_REPLICATION_INVALID_RESPONSE,
std::string("got invalid response from leader at ") +
-                        _config.leader.endpoint + batchUrl +
+                        _config.leader.endpoint + url +
": response is not an object");
}

-      VPackSlice const resumeSlice = slice.get("resume");
+      VPackSlice resumeSlice = slice.get("resume");
if (!resumeSlice.isNone() && !resumeSlice.isString()) {
++stats.numFailedConnects;
return Result(TRI_ERROR_REPLICATION_INVALID_RESPONSE,
std::string("got invalid response from leader at ") +
-                          _config.leader.endpoint + batchUrl +
+                          _config.leader.endpoint + url +
                          ": response field 'resume' is not a string");
}
requestResume = resumeSlice.isNone() ? RevisionId::max()
: RevisionId::fromSlice(resumeSlice);

-      VPackSlice const rangesSlice = slice.get("ranges");
+      if (requestResume < RevisionId::max() && !isAborted()) {
+        // already fetch next chunk in the background, by posting the
+        // request to the scheduler, which can run it asynchronously
+        sharedStatus->request([this, self, url, sharedStatus, coll, leaderColl,
+                               requestResume, &requestPayload]() {
+          fetchRevisionsChunk(sharedStatus, url, coll, leaderColl,
+                              requestPayload, requestResume);
+        });
+      }
+
+      VPackSlice rangesSlice = slice.get("ranges");
if (!rangesSlice.isArray()) {
++stats.numFailedConnects;
return Result(TRI_ERROR_REPLICATION_INVALID_RESPONSE,
std::string("got invalid response from leader at ") +
-                          _config.leader.endpoint + batchUrl +
+                          _config.leader.endpoint + url +
": response field 'ranges' is not an array");
}

@@ -1743,7 +1847,7 @@ Result DatabaseInitialSyncer::fetchCollectionSyncByRevisions(
return Result(
TRI_ERROR_REPLICATION_INVALID_RESPONSE,
std::string("got invalid response from leader at ") +
-                  _config.leader.endpoint + batchUrl +
+                  _config.leader.endpoint + url +
": response field 'ranges' entry is not a revision range");
}
auto& currentRange = ranges[chunk];
@@ -1811,7 +1915,7 @@ Result DatabaseInitialSyncer::fetchCollectionSyncByRevisions(
}
}

-    Result res = ::removeRevisions(*trx, *coll, toRemove, stats);
+    res = ::removeRevisions(*trx, *coll, toRemove, stats);
if (res.fail()) {
return res;
}
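
For reference, the shape checks in the loop above (object body, optional string "resume", array "ranges") can be exercised in isolation with velocypack's JSON parser; the payload below is made up, and the field names match the diff:

#include <velocypack/Builder.h>
#include <velocypack/Iterator.h>
#include <velocypack/Parser.h>
#include <velocypack/Slice.h>

#include <iostream>

using namespace arangodb::velocypack;

int main() {
  auto parsed = Parser::fromJson(
      R"({"resume":"12345","ranges":[["100","200"]]})");
  Slice slice = parsed->slice();

  if (!slice.isObject()) {
    std::cerr << "response is not an object\n";
    return 1;
  }
  Slice resume = slice.get("resume");  // None when the field is absent
  if (!resume.isNone() && !resume.isString()) {
    std::cerr << "response field 'resume' is not a string\n";
    return 1;
  }
  Slice ranges = slice.get("ranges");
  if (!ranges.isArray()) {
    std::cerr << "response field 'ranges' is not an array\n";
    return 1;
  }
  for (Slice entry : ArrayIterator(ranges)) {
    if (!entry.isArray()) {  // each entry should be a revision range pair
      std::cerr << "'ranges' entry is not a revision range\n";
      return 1;
    }
  }
  std::cout << "response shape ok\n";
}
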
6 changes: 6 additions & 0 deletions arangod/Replication/DatabaseInitialSyncer.h
@@ -229,6 +229,12 @@ class DatabaseInitialSyncer : public InitialSyncer {
std::string const& leaderColl,
TRI_voc_tick_t);

void fetchRevisionsChunk(
std::shared_ptr<Syncer::JobSynchronizer> sharedStatus,
std::string const& baseUrl, arangodb::LogicalCollection* coll,
std::string const& leaderColl, std::string const& requestPayload,
RevisionId requestResume);

/// @brief incrementally fetch data from a collection using revisions as the
/// primary document identifier, not supported by all engines/collections
// TODO worker safety