Bug fix/traversal cleanup (arangodb#11149)
jsteemann authored Mar 4, 2020
1 parent 34f8aa6 commit ad38472
Showing 136 changed files with 3,119 additions and 2,717 deletions.
6 changes: 6 additions & 0 deletions 3rdParty/velocypack/include/velocypack/Iterator.h
@@ -41,8 +41,14 @@ class ArrayIterator : public std::iterator<std::forward_iterator_tag, Slice> {
public:
using iterator_category = std::forward_iterator_tag;

struct Empty {};

ArrayIterator() = delete;

// optimization for an empty array
explicit ArrayIterator(Empty) noexcept
: _slice(Slice::emptyArraySlice()), _size(0), _position(0), _current(nullptr), _first(nullptr) {}

explicit ArrayIterator(Slice slice)
: _slice(slice), _size(0), _position(0), _current(nullptr), _first(nullptr) {

49 changes: 49 additions & 0 deletions CHANGELOG
@@ -1,6 +1,52 @@
devel
-----

* Improved graph traversal performance via internal code refactoring:

  - Traversal cursors are reused instead of recreated from scratch, if possible.
    This can save many calls to the memory management subsystem.
  - Unnecessary checks have been removed from the cursors by establishing some
    invariants.
  - Each vertex lookup needs to perform slightly less work.

  The traversal speedups observed from these changes alone were around 8 to 10%
  for single-server traversals and traversals in OneShard setups. Cluster
  traversals will also benefit, but to a lesser extent, because network
  roundtrips account for a larger share of total query execution time there.

* Traversal performance can also be improved by not fetching the visited
  vertices from the storage engine in case the traversal query does not refer
  to them. For example, in the query

    FOR v, e, p IN 1..3 OUTBOUND 'collection/startVertex' edges
      RETURN e

  the vertex variable (`v`) is never accessed, making it unnecessary to fetch
  the vertices from storage. If this optimization is applied, the traversal
  node will be marked with `/* vertex optimized away */` in the query's
  execution plan output.

* The existing optimizer rule "move-calculations-down" is now able to also move
  unrelated subqueries beyond SORT and LIMIT instructions, which can help avoid
  the execution of subqueries whose results are later discarded.
  For example, in the query

    FOR doc IN collection1
      LET sub1 = FIRST(FOR sub IN collection2 FILTER sub.ref == doc._key RETURN sub)
      LET sub2 = FIRST(FOR sub IN collection3 FILTER sub.ref == doc._key RETURN sub)
      SORT sub1
      LIMIT 10
      RETURN { doc, sub1, sub2 }

  the execution of the `sub2` subquery can be delayed until after the SORT and
  LIMIT, turning the query into

    FOR doc IN collection1
      LET sub1 = FIRST(FOR sub IN collection2 FILTER sub.ref == doc._key RETURN sub)
      SORT sub1
      LIMIT 10
      LET sub2 = FIRST(FOR sub IN collection3 FILTER sub.ref == doc._key RETURN sub)
      RETURN { doc, sub1, sub2 }

* Added JSON-Schema (draft-4) document validation. The validation can be
specified by providing the new `validation` collection property when creating a
new collection or when updating the properties of an existing collection:
@@ -48,6 +94,9 @@ devel
from the named graph are needed, the `edgeCollections` option can be a handy
performance optimization.

* Made arangobench return a proper error message when its initial attempt to
  create the test collection fails.

* In some cases, a COLLECT combined with a small LIMIT made more calls to its
  upstream executors than the same COLLECT without a LIMIT would, in order to
  produce the same result. We improved this situation and made sure that LIMIT does
11 changes: 7 additions & 4 deletions arangod/Aql/AqlItemBlock.cpp
@@ -111,13 +111,14 @@ void AqlItemBlock::initFromSlice(VPackSlice const slice) {
VPackSlice data = slice.get("data");
VPackSlice raw = slice.get("raw");

VPackArrayIterator rawIterator(raw);

std::vector<AqlValue> madeHere;
madeHere.reserve(static_cast<size_t>(raw.length()));
madeHere.reserve(static_cast<size_t>(rawIterator.size()));
madeHere.emplace_back(); // an empty AqlValue
madeHere.emplace_back(); // another empty AqlValue, indices start w. 2

VPackArrayIterator dataIterator(data);
VPackArrayIterator rawIterator(raw);

auto storeSingleValue = [this](size_t row, RegisterId column, VPackArrayIterator& it,
std::vector<AqlValue>& madeHere) {
@@ -302,7 +303,8 @@ void AqlItemBlock::destroy() noexcept {
return;
}

for (size_t i = 0; i < numEntries(); i++) {
size_t const n = numEntries();
for (size_t i = 0; i < n; i++) {
auto& it = _data[i];
if (it.requiresDestruction()) {
auto it2 = _valueCount.find(it);
@@ -354,7 +356,8 @@ void AqlItemBlock::shrink(size_t nrItems) {
// adjust the size of the block
_nrItems = nrItems;

for (size_t i = numEntries(); i < _data.size(); ++i) {
size_t const n = numEntries();
for (size_t i = n; i < _data.size(); ++i) {
AqlValue& a = _data[i];
if (a.requiresDestruction()) {
auto it = _valueCount.find(a);
4 changes: 2 additions & 2 deletions arangod/Aql/BindParameters.cpp
@@ -107,9 +107,9 @@ void BindParameters::stripCollectionNames(VPackSlice const& keys,
}

BindParameters::BindParameters()
: _builder(nullptr), _parameters(), _processed(false) {}
: _parameters(), _processed(false) {}

BindParameters::BindParameters(std::shared_ptr<arangodb::velocypack::Builder> builder)
BindParameters::BindParameters(std::shared_ptr<arangodb::velocypack::Builder> builder)
: _builder(std::move(builder)), _parameters(), _processed(false) {}

BindParametersType& BindParameters::get() {
4 changes: 2 additions & 2 deletions arangod/Aql/ExecutionPlan.cpp
@@ -197,7 +197,7 @@ std::unique_ptr<graph::BaseOptions> createTraversalOptions(aql::Query* query,
THROW_ARANGO_EXCEPTION_MESSAGE(
TRI_ERROR_BAD_PARAMETER,
"uniqueEdges: 'global' is not supported, "
"due to unpredictable results. Use 'path' "
"due to otherwise unpredictable results. Use 'path' "
"or 'none' instead");
}
} else if (name == "edgeCollections") {
@@ -214,7 +214,7 @@ std::unique_ptr<graph::BaseOptions> createTraversalOptions(aql::Query* query,
THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_BAD_PARAMETER,
"uniqueVertices: 'global' is only "
"supported, with bfs: true due to "
"unpredictable results.");
"otherwise unpredictable results.");
}

return options;
15 changes: 8 additions & 7 deletions arangod/Aql/IndexExecutor.cpp
@@ -423,19 +423,20 @@ void IndexExecutor::CursorReader::reset() {
if (iterator != nullptr && iterator->canRearm()) {
bool didRearm =
iterator->rearm(_condition, _infos.getOutVariable(), _infos.getOptions());
if (!didRearm) {
if (didRearm) {
_cursor->reset();
} else {
// iterator does not support the condition
// It will not create any results
_cursor->rearm(std::make_unique<EmptyIndexIterator>(iterator->collection(),
_infos.getTrxPtr()));
}
_cursor->reset();
return;
} else {
// We need to build a fresh search and cannot go the rearm shortcut
_cursor->rearm(_infos.getTrxPtr()->indexScanForCondition(_index, _condition,
_infos.getOutVariable(),
_infos.getOptions()));
}
// We need to build a fresh search and cannot go the rearm shortcut
_cursor->rearm(_infos.getTrxPtr()->indexScanForCondition(_index, _condition,
_infos.getOutVariable(),
_infos.getOptions()));
}

IndexExecutor::IndexExecutor(Fetcher& fetcher, Infos& infos)
20 changes: 19 additions & 1 deletion arangod/Aql/OptimizerRules.cpp
@@ -5776,6 +5776,7 @@ void arangodb::aql::optimizeTraversalsRule(Optimizer* opt,
// variables from them
for (auto const& n : tNodes) {
TraversalNode* traversal = ExecutionNode::castTo<TraversalNode*>(n);
auto* options = static_cast<arangodb::traverser::TraverserOptions*>(traversal->options());

std::vector<Variable const*> pruneVars;
traversal->getPruneVariables(pruneVars);
@@ -5797,7 +5798,7 @@ void arangodb::aql::optimizeTraversalsRule(Optimizer* opt,
(!n->isVarUsedLater(outVariable) &&
std::find(pruneVars.begin(), pruneVars.end(), outVariable) == pruneVars.end())) {
// both traversal vertex and path outVariables not used later
traversal->options()->setProduceVertices(false);
options->setProduceVertices(false);
modified = true;
}
}
@@ -5817,6 +5818,23 @@
traversal->setPathOutput(nullptr);
modified = true;
}

// check if we can make use of the optimized neighbors enumerator
if (!ServerState::instance()->isCoordinator()) {
if (traversal->vertexOutVariable() != nullptr &&
traversal->edgeOutVariable() == nullptr &&
traversal->pathOutVariable() == nullptr &&
options->useBreadthFirst &&
options->uniqueVertices == arangodb::traverser::TraverserOptions::GLOBAL &&
!options->usesPrune() &&
!options->hasDepthLookupInfo()) {
// this is possible in case *only* vertices are produced (no edges, no path),
// the traversal is breadth-first, the vertex uniqueness level is set to "global",
// there is no pruning and there are no depth-specific filters
options->useNeighbors = true;
modified = true;
}
}
}

if (!tNodes.empty()) {
2 changes: 1 addition & 1 deletion arangod/Aql/SimpleModifier.cpp
@@ -200,7 +200,7 @@ VPackArrayIterator SimpleModifier<ModifierCompletion, Enable>::getResultsIterato
TRI_ASSERT(_results.hasSlice() && _results.slice().isArray());
return VPackArrayIterator{_results.slice()};
}
return VPackArrayIterator{VPackSlice::emptyArraySlice()};
return VPackArrayIterator(VPackArrayIterator::Empty{});
}

template class ::arangodb::aql::SimpleModifier<InsertModifierCompletion>;
2 changes: 1 addition & 1 deletion arangod/Aql/SimpleModifier.h
@@ -104,7 +104,7 @@ class SimpleModifier {
: _infos(infos),
_completion(infos),
_accumulator(nullptr),
_resultsIterator(VPackSlice::emptyArraySlice()),
_resultsIterator(VPackArrayIterator::Empty{}),
_batchSize(ExecutionBlock::DefaultBatchSize) {}
~SimpleModifier() = default;

7 changes: 3 additions & 4 deletions arangod/Aql/SortedCollectExecutor.cpp
@@ -45,14 +45,14 @@ static const AqlValue EmptyValue;
SortedCollectExecutor::CollectGroup::CollectGroup(bool count, Infos& infos)
: groupLength(0),
count(count),
_shouldDeleteBuilderBuffer(true),
infos(infos),
_lastInputRow(InputAqlItemRow{CreateInvalidInputRowHint{}}),
_shouldDeleteBuilderBuffer(true) {
_lastInputRow(InputAqlItemRow{CreateInvalidInputRowHint{}}) {
for (auto const& aggName : infos.getAggregateTypes()) {
aggregators.emplace_back(Aggregator::fromTypeString(infos.getTransaction(), aggName));
}
TRI_ASSERT(infos.getAggregatedRegisters().size() == aggregators.size());
};
}

SortedCollectExecutor::CollectGroup::~CollectGroup() {
for (auto& it : groupValues) {
@@ -301,7 +301,6 @@ std::pair<ExecutionState, NoStats> SortedCollectExecutor::produceRows(OutputAqlI
InputAqlItemRow input{CreateInvalidInputRowHint{}};
_currentGroup.reset(input);
TRI_ASSERT(!_currentGroup.isValid());
return {ExecutionState::DONE, {}};
}
return {ExecutionState::DONE, {}};
}
2 changes: 1 addition & 1 deletion arangod/Aql/SortedCollectExecutor.h
@@ -139,10 +139,10 @@ class SortedCollectExecutor {
AggregateValuesType aggregators;
size_t groupLength;
bool const count;
bool _shouldDeleteBuilderBuffer;
Infos& infos;
InputAqlItemRow _lastInputRow;
arangodb::velocypack::Builder _builder;
bool _shouldDeleteBuilderBuffer;

CollectGroup() = delete;
CollectGroup(CollectGroup&&) = default;
4 changes: 2 additions & 2 deletions arangod/Aql/UpsertModifier.cpp
@@ -205,14 +205,14 @@ VPackArrayIterator UpsertModifier::getUpdateResultsIterator() const {
if (_updateResults.hasSlice() && _updateResults.slice().isArray()) {
return VPackArrayIterator(_updateResults.slice());
}
return VPackArrayIterator(VPackSlice::emptyArraySlice());
return VPackArrayIterator(VPackArrayIterator::Empty{});
}

VPackArrayIterator UpsertModifier::getInsertResultsIterator() const {
if (_insertResults.hasSlice() && _insertResults.slice().isArray()) {
return VPackArrayIterator(_insertResults.slice());
}
return VPackArrayIterator(VPackSlice::emptyArraySlice());
return VPackArrayIterator(VPackArrayIterator::Empty{});
}

Result UpsertModifier::accumulate(InputAqlItemRow& row) {
36 changes: 18 additions & 18 deletions arangod/Cache/BucketState.cpp
@@ -18,19 +18,18 @@
///
/// Copyright holder is ArangoDB GmbH, Cologne, Germany
///
/// @author Daniel H. Larkin
/// @author Dan Larkin-York
////////////////////////////////////////////////////////////////////////////////

#include <atomic>
#include <cstdint>

#include "Cache/BucketState.h"
#include "Basics/Common.h"
#include "Basics/cpu-relax.h"
#include "Basics/debugging.h"

#include <stdint.h>
#include <atomic>

using namespace arangodb::basics;
using namespace arangodb::cache;
namespace arangodb::cache {

BucketState::BucketState() : _state(0) {}

@@ -46,17 +45,17 @@ BucketState& BucketState::operator=(BucketState const& other) {
}

bool BucketState::isLocked() const {
return ((_state.load() & static_cast<uint32_t>(Flag::locked)) > 0);
return ((_state.load() & static_cast<std::uint32_t>(Flag::locked)) > 0);
}

bool BucketState::lock(uint64_t maxTries, BucketState::CallbackType cb) {
bool BucketState::lock(std::uint64_t maxTries, BucketState::CallbackType cb) {
uint64_t attempt = 0;
while (attempt < maxTries) {
// expect unlocked, but need to preserve migrating status
uint32_t current = _state.load(std::memory_order_relaxed);
uint32_t expected = current & (~static_cast<uint32_t>(Flag::locked));
std::uint32_t current = _state.load(std::memory_order_relaxed);
std::uint32_t expected = current & (~static_cast<std::uint32_t>(Flag::locked));
if (current == expected) {
uint32_t desired = expected | static_cast<uint32_t>(Flag::locked);
uint32_t desired = expected | static_cast<std::uint32_t>(Flag::locked);
// try to lock
bool success = _state.compare_exchange_strong(expected, desired, std::memory_order_acq_rel,
std::memory_order_relaxed);
@@ -66,7 +65,7 @@ bool BucketState::lock(uint64_t maxTries, BucketState::CallbackType cb) {
}
}
attempt++;
cpu_relax();
basics::cpu_relax();
// TODO: exponential back-off for failure?
}

@@ -75,26 +74,27 @@

void BucketState::unlock() {
TRI_ASSERT(isLocked());
_state.fetch_and(~static_cast<uint32_t>(Flag::locked), std::memory_order_release);
_state.fetch_and(~static_cast<std::uint32_t>(Flag::locked), std::memory_order_release);
}

bool BucketState::isSet(BucketState::Flag flag) const {
TRI_ASSERT(isLocked());
return ((_state.load() & static_cast<uint32_t>(flag)) > 0);
return ((_state.load() & static_cast<std::uint32_t>(flag)) > 0);
}

bool BucketState::isSet(BucketState::Flag flag1, BucketState::Flag flag2) const {
TRI_ASSERT(isLocked());
return ((_state.load() &
(static_cast<uint32_t>(flag1) | static_cast<uint32_t>(flag2))) > 0);
return ((_state.load() & (static_cast<std::uint32_t>(flag1) |
static_cast<std::uint32_t>(flag2))) > 0);
}

void BucketState::toggleFlag(BucketState::Flag flag) {
TRI_ASSERT(isLocked());
_state ^= static_cast<uint32_t>(flag);
_state ^= static_cast<std::uint32_t>(flag);
}

void BucketState::clear() {
TRI_ASSERT(isLocked());
_state = static_cast<uint32_t>(Flag::locked);
_state = static_cast<std::uint32_t>(Flag::locked);
}
}
(remaining changed files not shown)
