Skip to content

Commit

Permalink
[GORDO-1571] historic pregel access (arangodb#18366)
Browse files Browse the repository at this point in the history
* started work on extending api command_pregel. Right now reading a single pid tested by collection and api.

* do NOT forward history requests, jslint

* added pregel read history (optional: pid) javascript modules (shell client(js) and shell sherver(v8))

* add getHistory method and deleteHistoyr method to the JS modules (client http and server v8). handled strange internal api error case

* jslint

* ..... js & v8 hate

* ..... jslint

* special handling also in v8 - gut. added placeholder for further tests.

* implemented addition get all history tests

* truncate handling was incorrect. fixed this. some hopefully last adjustments. tests finalized.

* <WIP> trx.all cannot be used like that <WIP>

* for reads, now uses query instead of a transaction as the solution before is not working in a clustered environment.

* handle returned Slices (inside OperationResult) in case they are none). Will return a TrueSlice in that specific case now. Also adjusted js tests.

* some rework on what we actually write. This is still a partial TODO as the solution does not look like how I thought it would be!

* remove not used builder

* proper error message in v8 removeHistory

* remove not needed builder

* added changelog

* build error fix

* Finished sentence.

---------

Co-authored-by: Aditya Mukhopadhyay <aditya@arangodb.com>
  • Loading branch information
hkernbach and Aditya Mukhopadhyay authored Mar 27, 2023
1 parent 80c9b68 commit 08f82c2
Show file tree
Hide file tree
Showing 13 changed files with 874 additions and 191 deletions.
213 changes: 110 additions & 103 deletions CHANGELOG

Large diffs are not rendered by default.

86 changes: 67 additions & 19 deletions arangod/Pregel/Conductor/Conductor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,7 @@ void Conductor::finishedWorkerStartup(GraphLoaded const& graphLoaded) {
}

_timing.loading.finish();
_graphLoaded = true;
_timing.computation.start();

_feature.metrics()->pregelConductorsLoadingNumber->fetch_sub(1);
Expand Down Expand Up @@ -747,31 +748,78 @@ void Conductor::toVelocyPack(VPackBuilder& result) const {

void Conductor::persistPregelState(ExecutionState state) {
// Persist current pregel state into historic pregel system collection.

statuswriter::CollectionStatusWriter cWriter{_vocbaseGuard.database(),
_specifications.executionNumber};
// Replace this later
// TODO: ongoing <WIP>
// Note: I wanted to just use the toVelocyPack method. This fails because of
// the mutex there. Therefore temporarly, I'll write data that works for now.
// VPackBuilder b;
// toVelocyPack(b);

VPackBuilder debugOut;
debugOut.openObject();
debugOut.add("state", VPackValue(pregel::ExecutionStateNames[_state]));
debugOut.add("stats", VPackValue(VPackValueType::Object));
_statistics.serializeValues(debugOut);
debugOut.close();
_masterContext->_aggregators->serializeValues(debugOut);
debugOut.close();
// Replace this later
VPackBuilder stateBuilder;

auto addMinimalOutputToBuilder = [&](VPackBuilder& stateBuilder) -> void {
TRI_ASSERT(stateBuilder.isOpenObject());
stateBuilder.add(
"id",
VPackValue(std::to_string(_specifications.executionNumber.value)));
stateBuilder.add("database", VPackValue(_vocbaseGuard.database().name()));
if (_algorithm != nullptr) {
stateBuilder.add("algorithm", VPackValue(_algorithm->name()));
}
stateBuilder.add("created", VPackValue(timepointToString(_created)));
stateBuilder.add("state", VPackValue(pregel::ExecutionStateNames[_state]));
stateBuilder.add("graphLoaded", VPackValue(_graphLoaded));
stateBuilder.add("gss", VPackValue(_globalSuperstep));
};
auto addAdditionalOutputToBuilder = [&](VPackBuilder& builder) -> void {
TRI_ASSERT(builder.isOpenObject());
if (_timing.total.hasStarted()) {
builder.add("totalRuntime",
VPackValue(_timing.total.elapsedSeconds().count()));
}
if (_timing.loading.hasStarted()) {
builder.add("startupTime",
VPackValue(_timing.loading.elapsedSeconds().count()));
}
if (_timing.computation.hasStarted()) {
builder.add("computationTime",
VPackValue(_timing.computation.elapsedSeconds().count()));
}
if (_timing.storing.hasStarted()) {
builder.add("storageTime",
VPackValue(_timing.storing.elapsedSeconds().count()));
}
{
builder.add(VPackValue("gssTimes"));
VPackArrayBuilder array(&builder);
for (auto const& gssTime : _timing.gss) {
builder.add(VPackValue(gssTime.elapsedSeconds().count()));
}
}
_statistics.serializeValues(builder);
};

if (_state == ExecutionState::DONE) {
// TODO: What I wanted to do here is to just use the already available
// toVelocyPack() method. This fails currently because of the lock:
// "[void arangodb::Mutex::lock()]: _holder != Thread::currentThreadId()"
// Therefore, for now - do it manually. Let's clean this up ASAP.
// this->toVelocyPack(stateBuilder);
// After this works, we can remove all of that code below (same scope).
// Including those lambda helper methods.

stateBuilder.openObject(); // opens main builder
addMinimalOutputToBuilder(stateBuilder);
addAdditionalOutputToBuilder(stateBuilder);
stateBuilder.close(); // closes main builder
} else {
// minimalistic update during runs or errors (cancel, fatal)
// TODO: We should introduce an inspector here as well.
stateBuilder.openObject(); // opens main builder
addMinimalOutputToBuilder(stateBuilder);
stateBuilder.close(); // closes main builder
}

TRI_ASSERT(state != ExecutionState::DEFAULT);
if (state == ExecutionState::LOADING) {
// During state LOADING we need to initially create the document in the
// collection
auto storeResult = cWriter.createResult(debugOut.slice());
auto storeResult = cWriter.createResult(stateBuilder.slice());
if (storeResult.ok()) {
LOG_PREGEL("063x1", INFO)
<< "Stored result into: \"" << StaticStrings::PregelCollection
Expand All @@ -785,7 +833,7 @@ void Conductor::persistPregelState(ExecutionState state) {
} else {
// During all other states, we will just simply update the already created
// document
auto updateResult = cWriter.updateResult(debugOut.slice());
auto updateResult = cWriter.updateResult(stateBuilder.slice());
if (updateResult.ok()) {
LOG_PREGEL("063x3", INFO)
<< "Updated state into: \"" << StaticStrings::PregelCollection
Expand Down
10 changes: 6 additions & 4 deletions arangod/Pregel/Conductor/Conductor.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@ enum ExecutionState {
LOADING, // load graph into memory
RUNNING, // during normal operation
STORING, // store results
DONE, // after everyting is done
CANCELED, // after an terminal error or manual canceling
DONE, // after everything is done
CANCELED, // after a terminal error or manual canceling
FATAL_ERROR, // execution can not continue because of errors
};
extern const char* ExecutionStateNames[9];
Expand Down Expand Up @@ -93,11 +93,13 @@ class Conductor : public std::enable_shared_from_this<Conductor> {
std::chrono::system_clock::time_point _created;
std::chrono::system_clock::time_point _expires;
ExecutionTimings _timing;
/// Variable to identify whether the graph has been fully loaded.
bool _graphLoaded{false};
/// Variable to identify whether ArangoDB is in shutdown mode.
bool _shutdown{false};
// Work in Progress: Move data incrementally into this
// struct; sort it into categories and make it (de)serialisable
// with the Inspecotr framework
// struct; sort it into categories and make it (de)serializable
// with the Inspector framework
ConductorStatus _status;

bool _startGlobalStep();
Expand Down
37 changes: 37 additions & 0 deletions arangod/Pregel/PregelFeature.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,11 @@
#include "Pregel/PregelOptions.h"
#include "Pregel/ResultActor.h"
#include "Pregel/SpawnActor.h"
#include "Pregel/StatusWriter/CollectionStatusWriter.h"
#include "Pregel/Utils.h"
#include "Pregel/Worker/Messages.h"
#include "Pregel/Worker/Worker.h"
#include "Rest/CommonDefines.h"
#include "RestServer/DatabasePathFeature.h"
#include "Scheduler/Scheduler.h"
#include "Scheduler/SchedulerFeature.h"
Expand Down Expand Up @@ -998,6 +1000,41 @@ void PregelFeature::handleWorkerRequest(TRI_vocbase_t& vocbase,
}
}

ResultT<OperationResult> PregelFeature::handleHistoryRequest(
TRI_vocbase_t& vocbase, arangodb::rest::RequestType requestType,
std::optional<ExecutionNumber> executionNumber) {
if (isStopping()) {
// shutdown ongoing
return {Result(TRI_ERROR_SHUTTING_DOWN)};
}

if (requestType == rest::RequestType::GET) {
if (executionNumber.has_value()) {
// read a single result
statuswriter::CollectionStatusWriter cWriter{vocbase,
executionNumber.value()};
return cWriter.readResult();
} else {
// read all results
statuswriter::CollectionStatusWriter cWriter{vocbase};
return cWriter.readAllResults();
}
} else if (requestType == rest::RequestType::DELETE_REQ) {
if (executionNumber.has_value()) {
// delete a single result
statuswriter::CollectionStatusWriter cWriter{vocbase,
executionNumber.value()};
return cWriter.deleteResult();
} else {
// delete all results
statuswriter::CollectionStatusWriter cWriter{vocbase};
return cWriter.deleteAllResults();
}
}

return {Result(TRI_ERROR_HTTP_METHOD_NOT_ALLOWED)};
}

uint64_t PregelFeature::numberOfActiveConductors() const {
MUTEX_LOCKER(guard, _mutex);
uint64_t nr{0};
Expand Down
10 changes: 10 additions & 0 deletions arangod/Pregel/PregelFeature.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,13 @@

struct TRI_vocbase_t;

namespace arangodb {
struct OperationResult;
namespace rest {
enum class RequestType;
}
} // namespace arangodb

namespace arangodb::pregel {

struct PregelScheduler {
Expand Down Expand Up @@ -101,6 +108,9 @@ class PregelFeature final : public ArangodFeature {
VPackBuilder& outResponse);
void handleWorkerRequest(TRI_vocbase_t& vocbase, std::string const& path,
VPackSlice const& body, VPackBuilder& outBuilder);
ResultT<OperationResult> handleHistoryRequest(
TRI_vocbase_t& vocbase, arangodb::rest::RequestType requestType,
std::optional<ExecutionNumber> executionNumber);

uint64_t numberOfActiveConductors() const;

Expand Down
112 changes: 94 additions & 18 deletions arangod/Pregel/REST/RestControlPregelHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,11 +59,11 @@ RestStatus RestControlPregelHandler::execute() {
break;
}
case rest::RequestType::GET: {
getExecutionStatus();
handleGetRequest();
break;
}
case rest::RequestType::DELETE_REQ: {
cancelExecution();
handleDeleteRequest();
break;
}
default: {
Expand Down Expand Up @@ -93,6 +93,13 @@ RestControlPregelHandler::forwardingTarget() {
return {std::make_pair(StaticStrings::Empty, false)};
}

// Do NOT forward requests to any other arangod instance in case we're
// requesting the history API. Any coordinator is able to handle this
// request.
if (suffixes.size() >= 1 && suffixes.at(0) == "history") {
return {std::make_pair(StaticStrings::Empty, false)};
}

uint64_t tick = arangodb::basics::StringUtils::uint64(suffixes[0]);
uint32_t sourceServer = TRI_ExtractServerIdFromTick(tick);

Expand Down Expand Up @@ -138,7 +145,7 @@ void RestControlPregelHandler::startExecution() {
generateResult(rest::ResponseCode::OK, builder.slice());
}

void RestControlPregelHandler::getExecutionStatus() {
void RestControlPregelHandler::handleGetRequest() {
std::vector<std::string> const& suffixes = _request->decodedSuffixes();

if (suffixes.empty()) {
Expand All @@ -152,33 +159,102 @@ void RestControlPregelHandler::getExecutionStatus() {
return;
}

if (suffixes.size() != 1 || suffixes[0].empty()) {
generateError(
rest::ResponseCode::BAD, TRI_ERROR_HTTP_SUPERFLUOUS_SUFFICES,
"superfluous parameter, expecting /_api/control_pregel[/<id>]");
if (suffixes.size() == 1 && suffixes.at(0) != "history") {
if (suffixes[0].empty()) {
generateError(
rest::ResponseCode::BAD, TRI_ERROR_HTTP_SUPERFLUOUS_SUFFICES,
"superfluous parameter, expecting /_api/control_pregel[/<id>]");
return;
}
auto executionNumber = arangodb::pregel::ExecutionNumber{
arangodb::basics::StringUtils::uint64(suffixes[0])};
auto c = _pregel.conductor(executionNumber);

if (nullptr == c) {
generateError(rest::ResponseCode::NOT_FOUND, TRI_ERROR_CURSOR_NOT_FOUND,
"Execution number is invalid");
return;
}

VPackBuilder builder;
c->toVelocyPack(builder);
generateResult(rest::ResponseCode::OK, builder.slice());
return;
} else if ((suffixes.size() >= 1 || suffixes.size() <= 2) &&
suffixes.at(0) == "history") {
if (suffixes.size() == 1) {
return handlePregelHistoryResult(_pregel.handleHistoryRequest(
_vocbase, _request->requestType(), std::nullopt));
} else {
auto executionNumber = arangodb::pregel::ExecutionNumber{
arangodb::basics::StringUtils::uint64(suffixes.at(1))};
return handlePregelHistoryResult(_pregel.handleHistoryRequest(
_vocbase, _request->requestType(), executionNumber));
}
}

auto executionNumber = arangodb::pregel::ExecutionNumber{
arangodb::basics::StringUtils::uint64(suffixes[0])};
auto c = _pregel.conductor(executionNumber);
generateError(rest::ResponseCode::NOT_FOUND, TRI_ERROR_HTTP_NOT_FOUND,
"expecting one of the resources /_api/control_pregel[/<id>] or "
"/_api/control_pregel/history[/<id>]");
}

if (nullptr == c) {
generateError(rest::ResponseCode::NOT_FOUND, TRI_ERROR_CURSOR_NOT_FOUND,
"Execution number is invalid");
void RestControlPregelHandler::handlePregelHistoryResult(
ResultT<OperationResult> result) {
if (result.fail()) {
// check outer ResultT result
generateError(rest::ResponseCode::BAD, result.errorNumber(),
result.errorMessage());
return;
}
if (result.get().fail()) {
// check inner OperationResult
std::string message = std::string{result.get().errorMessage()};
if (result.get().errorNumber() == TRI_ERROR_ARANGO_DOCUMENT_NOT_FOUND) {
// For reasons, not all OperationResults deliver the expected message.
// Therefore, we need set up the message properly and manually here.
message = Result(TRI_ERROR_ARANGO_DOCUMENT_NOT_FOUND).errorMessage();
}

VPackBuilder builder;
c->toVelocyPack(builder);
generateResult(rest::ResponseCode::OK, builder.slice());
ResponseCode code =
GeneralResponse::responseCode(result.get().errorNumber());
generateError(code, result.get().errorNumber(), message);
return;
}

if (result->hasSlice()) {
if (result->slice().isNone()) {
// Truncate does not deliver a proper slice in a Cluster.
generateResult(rest::ResponseCode::OK, VPackSlice::trueSlice());
} else {
generateResult(rest::ResponseCode::OK, result.get().slice());
}
} else {
// Should always have a Slice, doing this check to be sure.
// (e.g. a truncate might not return a Slice in SingleServer)
generateResult(rest::ResponseCode::OK, VPackSlice::trueSlice());
}
}

void RestControlPregelHandler::cancelExecution() {
void RestControlPregelHandler::handleDeleteRequest() {
std::vector<std::string> const& suffixes = _request->decodedSuffixes();

if ((suffixes.size() >= 1 || suffixes.size() <= 2) &&
suffixes.at(0) == "history") {
if (suffixes.size() == 1) {
return handlePregelHistoryResult(_pregel.handleHistoryRequest(
_vocbase, _request->requestType(), std::nullopt));
} else {
auto executionNumber = arangodb::pregel::ExecutionNumber{
arangodb::basics::StringUtils::uint64(suffixes.at(1))};
return handlePregelHistoryResult(_pregel.handleHistoryRequest(
_vocbase, _request->requestType(), executionNumber));
}
}

if ((suffixes.size() != 1) || suffixes[0].empty()) {
generateError(rest::ResponseCode::BAD, TRI_ERROR_HTTP_SUPERFLUOUS_SUFFICES,
"bad parameter, expecting /_api/control_pregel/<id>");
"bad parameter, expecting /_api/control_pregel/<id> or "
"/_api/control_pregel/history[/<id>]");
return;
}

Expand Down
14 changes: 12 additions & 2 deletions arangod/Pregel/REST/RestControlPregelHandler.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,18 @@ class RestControlPregelHandler : public arangodb::RestVocbaseBaseHandler {

private:
void startExecution();
void getExecutionStatus();
void cancelExecution();

// Handled GET requests for APIs:
// - /_api/control_pregel[/<id>]
// - /_api/control_pregel/history[/<id>]
void handleGetRequest();

// Handled DELETE requests for APIs:
// - /_api/control_pregel[/<id>]
// - /_api/control_pregel/history[/<id>]
void handleDeleteRequest();

void handlePregelHistoryResult(ResultT<OperationResult> opResult);

pregel::PregelFeature& _pregel;
};
Expand Down
Loading

0 comments on commit 08f82c2

Please sign in to comment.