Feature/rocksdb hashsearch (arangodb#2713)
* Testing hash for documents

* fixing logspam error

* added base support for kHash in the edge index

* Current version of hash enabled edge index

* removed rocksdb key type byte, dynamic prefixes for hash-index

* Fixing issues with key classes

* Fixing merge conflict

* Fixing version with semi-working hash index

* fixing various issues

* Fixing hang in EdgeIndexIterator nextExtra when _cache is disabled. Reenable cache

* Fixing geo index bounds, and fillIndexes error handling

* Adding back failure point

* Fixing RocksDBKey::vertexId

* Added version check on startup

* Fixing version check

* added one more check

* properly detect column families at startup

* Adding additional sanity checking

* Adding missing column family

* some fixes

* Turned off geoindex sync

* fixed error

* added asserts (untested)

* Adding more asserts
graetzer authored and jsteemann committed Jul 5, 2017
1 parent 3387d79 commit d8c57f2
Showing 66 changed files with 2,280 additions and 2,221 deletions.
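Background for the "dynamic prefixes for hash-index" bullet above: RocksDB allows a different prefix extractor per column family, which is what makes a single hand-written extractor (RocksDBPrefixExtractor.cpp, deleted in the CMakeLists.txt diff below) dispensable. Below is a minimal sketch of such per-column-family configuration using stock RocksDB APIs only; the function name and the 8-byte object-id prefix are illustrative assumptions, not ArangoDB's actual key layout.

#include <rocksdb/filter_policy.h>
#include <rocksdb/options.h>
#include <rocksdb/slice_transform.h>
#include <rocksdb/table.h>

// Hypothetical options for an index column family: a fixed-size prefix
// extractor enables prefix bloom filters and prefix seek on this family
// only, instead of one global extractor for the whole database.
rocksdb::ColumnFamilyOptions indexColumnFamilyOptions() {
  rocksdb::ColumnFamilyOptions cf;
  cf.prefix_extractor.reset(
      rocksdb::NewFixedPrefixTransform(8));  // assumed: 8-byte object id
  rocksdb::BlockBasedTableOptions table;
  table.filter_policy.reset(rocksdb::NewBloomFilterPolicy(10, false));
  table.whole_key_filtering = false;  // build filters over prefixes instead
  cf.table_factory.reset(rocksdb::NewBlockBasedTableFactory(table));
  return cf;
}

An iterator opened with ReadOptions::prefix_same_as_start = true then stays within one prefix, which is the access pattern a hash-based edge-index lookup needs.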
5 changes: 2 additions & 3 deletions arangod/Indexes/Index.h
@@ -229,9 +229,8 @@ class Index {
       arangodb::StringRef const* = nullptr) const;
 
   /// @brief whether or not the index is implicitly unique
-  /// this can be the case if the index is not declared as unique, but contains
-  /// a
-  /// unique attribute such as _key
+  /// this can be the case if the index is not declared as unique,
+  /// but contains a unique attribute such as _key
   virtual bool implicitlyUnique() const;
 
   virtual size_t memory() const = 0;
1 change: 0 additions & 1 deletion arangod/RocksDBEngine/CMakeLists.txt
@@ -23,7 +23,6 @@ set(ROCKSDB_SOURCES
   RocksDBEngine/RocksDBKeyBounds.cpp
   RocksDBEngine/RocksDBLogValue.cpp
   RocksDBEngine/RocksDBMethods.cpp
-  RocksDBEngine/RocksDBPrefixExtractor.cpp
   RocksDBEngine/RocksDBPrimaryIndex.cpp
   RocksDBEngine/RocksDBReplicationCommon.cpp
   RocksDBEngine/RocksDBReplicationContext.cpp
11 changes: 6 additions & 5 deletions arangod/RocksDBEngine/RocksDBBackgroundThread.cpp
@@ -74,17 +74,18 @@ void RocksDBBackgroundThread::run() {
           }
         }
       });
-
     }
-    // determine which WAL files can be pruned
 
+    // determine which WAL files can be pruned
     _engine->determinePrunableWalFiles(minTick);
     // and then prune them when they expired
     _engine->pruneWalFiles();
   } catch (std::exception const& ex) {
-    LOG_TOPIC(WARN, Logger::FIXME) << "caught exception in rocksdb background thread: " << ex.what();
+    LOG_TOPIC(WARN, Logger::FIXME)
+        << "caught exception in rocksdb background thread: " << ex.what();
   } catch (...) {
-    LOG_TOPIC(WARN, Logger::FIXME) << "caught unknown exception in rocksdb background";
+    LOG_TOPIC(WARN, Logger::FIXME)
+        << "caught unknown exception in rocksdb background";
   }
 }
 _engine->counterManager()->sync(true);  // final write on shutdown
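determinePrunableWalFiles() and pruneWalFiles() are ArangoDB internals that this diff only touches around. For orientation, here is a hedged sketch of WAL pruning expressed with stock RocksDB calls; the helper name and the cutoff logic are assumptions, not the engine's actual implementation.

#include <rocksdb/db.h>
#include <rocksdb/transaction_log.h>

// Hypothetical helper: delete archived WAL files no consumer (e.g.
// replication) still needs. minSeq is the smallest sequence number a
// consumer may still request; files are returned sorted, oldest first.
rocksdb::Status pruneArchivedWals(rocksdb::DB* db,
                                  rocksdb::SequenceNumber minSeq) {
  rocksdb::VectorLogPtr files;
  rocksdb::Status s = db->GetSortedWalFiles(files);
  if (!s.ok()) return s;
  for (size_t i = 0; i + 1 < files.size(); ++i) {
    // a file is fully obsolete once the *next* file already starts at or
    // below the cutoff, and only archived files are safe to remove
    if (files[i]->Type() == rocksdb::kArchivedLogFile &&
        files[i + 1]->StartSequence() <= minSeq) {
      s = db->DeleteFile(files[i]->PathName());
      if (!s.ok()) break;
    }
  }
  return s;
}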
119 changes: 61 additions & 58 deletions arangod/RocksDBEngine/RocksDBCollection.cpp
@@ -188,8 +188,7 @@ void RocksDBCollection::unload() {
 TRI_voc_rid_t RocksDBCollection::revision() const { return _revisionId; }
 
 TRI_voc_rid_t RocksDBCollection::revision(transaction::Methods* trx) const {
-  RocksDBTransactionState* state = toRocksTransactionState(trx);
-
+  auto state = RocksDBTransactionState::toState(trx);
   auto trxCollection = static_cast<RocksDBTransactionCollection*>(
       state->findCollection(_logicalCollection->cid()));
   TRI_ASSERT(trxCollection != nullptr);
@@ -200,8 +199,7 @@ TRI_voc_rid_t RocksDBCollection::revision(transaction::Methods* trx) const {
 uint64_t RocksDBCollection::numberDocuments() const { return _numberDocuments; }
 
 uint64_t RocksDBCollection::numberDocuments(transaction::Methods* trx) const {
-  RocksDBTransactionState* state = toRocksTransactionState(trx);
-
+  auto state = RocksDBTransactionState::toState(trx);
   auto trxCollection = static_cast<RocksDBTransactionCollection*>(
       state->findCollection(_logicalCollection->cid()));
   TRI_ASSERT(trxCollection != nullptr);
@@ -664,51 +662,49 @@ void RocksDBCollection::truncate(transaction::Methods* trx,
   // TODO FIXME -- improve transaction size
   TRI_ASSERT(_objectId != 0);
   TRI_voc_cid_t cid = _logicalCollection->cid();
-  RocksDBTransactionState* state = rocksutils::toRocksTransactionState(trx);
-  // delete documents
-  RocksDBMethods* mthd;
-  mthd = state->rocksdbMethods();
+  auto state = RocksDBTransactionState::toState(trx);
+  RocksDBMethods* mthd = state->rocksdbMethods();
 
+  // delete documents
   RocksDBKeyBounds documentBounds =
       RocksDBKeyBounds::CollectionDocuments(this->objectId());
-
   rocksdb::Comparator const* cmp =
       RocksDBColumnFamily::documents()->GetComparator();
   rocksdb::ReadOptions ro = mthd->readOptions();
   rocksdb::Slice const end = documentBounds.end();
   ro.iterate_upper_bound = &end;
 
   std::unique_ptr<rocksdb::Iterator> iter =
-      mthd->NewIterator(ro, RocksDBColumnFamily::documents());
+      mthd->NewIterator(ro, documentBounds.columnFamily());
   iter->Seek(documentBounds.start());
 
   while (iter->Valid() && cmp->Compare(iter->key(), end) < 0) {
     TRI_ASSERT(_objectId == RocksDBKey::objectId(iter->key()));
 
-    TRI_voc_rid_t revisionId = RocksDBKey::revisionId(iter->key());
+    TRI_voc_rid_t revId =
+        RocksDBKey::revisionId(RocksDBEntryType::Document, iter->key());
     VPackSlice key =
         VPackSlice(iter->value().data()).get(StaticStrings::KeyString);
     TRI_ASSERT(key.isString());
 
     blackListKey(iter->key().data(), static_cast<uint32_t>(iter->key().size()));
 
     // add possible log statement
-    state->prepareOperation(cid, revisionId, StringRef(key),
+    state->prepareOperation(cid, revId, StringRef(key),
                             TRI_VOC_DOCUMENT_OPERATION_REMOVE);
-    Result r = mthd->Delete(RocksDBColumnFamily::documents(), RocksDBKey(iter->key()));
+    Result r =
+        mthd->Delete(RocksDBColumnFamily::documents(), RocksDBKey(iter->key()));
     if (!r.ok()) {
       THROW_ARANGO_EXCEPTION(r);
    }
     // report size of key
-    RocksDBOperationResult result =
-        state->addOperation(cid, revisionId, TRI_VOC_DOCUMENT_OPERATION_REMOVE,
-                            0, iter->key().size());
+    RocksDBOperationResult result = state->addOperation(
+        cid, revId, TRI_VOC_DOCUMENT_OPERATION_REMOVE, 0, iter->key().size());
 
     // transaction size limit reached -- fail
     if (result.fail()) {
       THROW_ARANGO_EXCEPTION(result);
     }
 
     iter->Next();
   }
 
@@ -826,9 +822,9 @@ Result RocksDBCollection::insert(arangodb::transaction::Methods* trx,
   TRI_voc_rid_t revisionId =
       transaction::helpers::extractRevFromDocument(newSlice);
 
-  RocksDBTransactionState* state = toRocksTransactionState(trx);
-
-  RocksDBSavePoint guard(rocksutils::toRocksMethods(trx),
+  auto state = RocksDBTransactionState::toState(trx);
+  auto mthds = RocksDBTransactionState::toMethods(trx);
+  RocksDBSavePoint guard(mthds,
                          trx->isSingleOperationTransaction(),
                          [&state]() { state->resetLogState(); });

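This hunk and several below route rollback handling through RocksDBSavePoint, an ArangoDB-internal RAII guard around transaction savepoints. The idiom is generic; the following is a sketch of a comparable guard over rocksdb::Transaction, assuming the caller marks success explicitly. It is not ArangoDB's class.

#include <functional>
#include <rocksdb/utilities/transaction.h>

// Sketch of the RAII savepoint idiom: set a savepoint on construction,
// roll back to it on any early exit, and run a cleanup callback.
class SavePointGuard {
 public:
  SavePointGuard(rocksdb::Transaction* trx, std::function<void()> onRollback)
      : _trx(trx), _onRollback(std::move(onRollback)), _committed(false) {
    _trx->SetSavePoint();
  }
  ~SavePointGuard() {
    if (!_committed) {                       // exception or error return
      (void)_trx->RollbackToSavePoint();     // undo writes since savepoint
      if (_onRollback) _onRollback();
    }
  }
  void commit() { _committed = true; }       // call on the success path

 private:
  rocksdb::Transaction* _trx;
  std::function<void()> _onRollback;
  bool _committed;
};

On the success path the caller invokes commit(); any early exit triggers the rollback plus the cleanup callback, mirroring the [&state]() { state->resetLogState(); } lambda in the hunks.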
@@ -915,7 +911,7 @@ Result RocksDBCollection::update(arangodb::transaction::Methods* trx,
       mergeObjectsForUpdate(trx, oldDoc, newSlice, isEdgeCollection,
                             TRI_RidToString(revisionId), options.mergeObjects,
                             options.keepNull, *builder.get());
-  RocksDBTransactionState* state = toRocksTransactionState(trx);
+  auto state = RocksDBTransactionState::toState(trx);
   if (state->isDBServer()) {
     // Need to check that no sharding keys have changed:
     if (arangodb::shardKeysChanged(_logicalCollection->dbName(),
@@ -928,7 +924,7 @@
 
   VPackSlice const newDoc(builder->slice());
 
-  RocksDBSavePoint guard(rocksutils::toRocksMethods(trx),
+  RocksDBSavePoint guard(RocksDBTransactionState::toMethods(trx),
                          trx->isSingleOperationTransaction(),
                          [&state]() { state->resetLogState(); });
 
@@ -1008,7 +1004,7 @@ Result RocksDBCollection::replace(
                                   isEdgeCollection, TRI_RidToString(revisionId),
                                   *builder.get());
 
-  RocksDBTransactionState* state = toRocksTransactionState(trx);
+  auto state = RocksDBTransactionState::toState(trx);
   if (state->isDBServer()) {
     // Need to check that no sharding keys have changed:
     if (arangodb::shardKeysChanged(_logicalCollection->dbName(),
@@ -1019,7 +1015,7 @@
     }
   }
 
-  RocksDBSavePoint guard(rocksutils::toRocksMethods(trx),
+  RocksDBSavePoint guard(RocksDBTransactionState::toMethods(trx),
                          trx->isSingleOperationTransaction(),
                          [&state]() { state->resetLogState(); });
 
@@ -1099,8 +1095,8 @@ Result RocksDBCollection::remove(arangodb::transaction::Methods* trx,
     }
   }
 
-  RocksDBTransactionState* state = toRocksTransactionState(trx);
-  RocksDBSavePoint guard(rocksutils::toRocksMethods(trx),
+  auto state = RocksDBTransactionState::toState(trx);
+  RocksDBSavePoint guard(RocksDBTransactionState::toMethods(trx),
                          trx->isSingleOperationTransaction(),
                          [&state]() { state->resetLogState(); });
 
@@ -1137,7 +1133,11 @@ void RocksDBCollection::figuresSpecific(
   rocksdb::Range r(bounds.start(), bounds.end());
 
   uint64_t out = 0;
-  db->GetApproximateSizes(RocksDBColumnFamily::documents(), &r, 1, &out, static_cast<uint8_t>(rocksdb::DB::SizeApproximationFlags::INCLUDE_MEMTABLES | rocksdb::DB::SizeApproximationFlags::INCLUDE_FILES));
+  db->GetApproximateSizes(
+      RocksDBColumnFamily::documents(), &r, 1, &out,
+      static_cast<uint8_t>(
+          rocksdb::DB::SizeApproximationFlags::INCLUDE_MEMTABLES |
+          rocksdb::DB::SizeApproximationFlags::INCLUDE_FILES));
 
   builder->add("documentsSize", VPackValue(out));
 }
@@ -1235,71 +1235,68 @@ arangodb::Result RocksDBCollection::fillIndexes(
 
   ManagedDocumentResult mmdr;
   RocksDBIndex* ridx = static_cast<RocksDBIndex*>(added.get());
-  RocksDBTransactionState* state = rocksutils::toRocksTransactionState(trx);
+  auto state = RocksDBTransactionState::toState(trx);
   std::unique_ptr<IndexIterator> it(new RocksDBAllIndexIterator(
       _logicalCollection, trx, &mmdr, primaryIndex(), false));
 
   rocksdb::TransactionDB* db = globalRocksDB();
   uint64_t numDocsWritten = 0;
   // write batch will be reset each 5000 documents
-  rocksdb::WriteBatchWithIndex batch(db->DefaultColumnFamily()->GetComparator(),
+  rocksdb::WriteBatchWithIndex batch(ridx->columnFamily()->GetComparator(),
                                      32 * 1024 * 1024);
   RocksDBBatchedMethods batched(state, &batch);
 
-  int res = TRI_ERROR_NO_ERROR;
+  arangodb::Result res;
   auto cb = [&](ManagedDocumentResult const& mdr) {
-    if (res == TRI_ERROR_NO_ERROR) {
-      res = ridx->insertRaw(&batched, mdr.lastRevisionId(),
+    if (res.ok()) {
+      res = ridx->insertInternal(trx, &batched, mdr.lastRevisionId(),
                                  VPackSlice(mdr.vpack()));
-      if (res == TRI_ERROR_NO_ERROR) {
+      if (res.ok()) {
        numDocsWritten++;
      }
    }
  };
 
   rocksdb::WriteOptions writeOpts;
-  Result r;
   bool hasMore = true;
-  while (hasMore) {
+  while (hasMore && res.ok()) {
     hasMore = it->nextDocument(cb, 250);
     if (_logicalCollection->status() == TRI_VOC_COL_STATUS_DELETED ||
         _logicalCollection->deleted()) {
       res = TRI_ERROR_INTERNAL;
     }
-    if (res != TRI_ERROR_NO_ERROR) {
-      r = Result(res);
-      break;
-    }
-    rocksdb::Status s = db->Write(writeOpts, batch.GetWriteBatch());
-    if (!s.ok()) {
-      r = rocksutils::convertStatus(s, rocksutils::StatusHint::index);
-      break;
+    if (res.ok()) {
+      rocksdb::Status s = db->Write(writeOpts, batch.GetWriteBatch());
+      if (!s.ok()) {
+        res = rocksutils::convertStatus(s, rocksutils::StatusHint::index);
+        break;
+      }
     }
     batch.Clear();
   }
 
   // we will need to remove index elements created before an error
   // occured, this needs to happen since we are non transactional
-  if (!r.ok()) {
+  if (!res.ok()) {
     it->reset();
     batch.Clear();
 
-    res = TRI_ERROR_NO_ERROR;
+    arangodb::Result res2;  // do not overwrite original error
     auto removeCb = [&](DocumentIdentifierToken token) {
-      if (res == TRI_ERROR_NO_ERROR && numDocsWritten > 0 &&
+      if (res2.ok() && numDocsWritten > 0 &&
           this->readDocument(trx, token, mmdr)) {
         // we need to remove already inserted documents up to numDocsWritten
-        res = ridx->removeRaw(&batched, mmdr.lastRevisionId(),
+        res2 = ridx->removeInternal(trx, &batched, mmdr.lastRevisionId(),
                                    VPackSlice(mmdr.vpack()));
-        if (res == TRI_ERROR_NO_ERROR) {
+        if (res2.ok()) {
           numDocsWritten--;
         }
       }
     };
 
     hasMore = true;
     while (hasMore && numDocsWritten > 0) {
-      hasMore = it->next(removeCb, 5000);
+      hasMore = it->next(removeCb, 500);
     }
     // TODO: if this fails, do we have any recourse?
     // Simon: Don't think so
@@ -1310,7 +1307,7 @@ arangodb::Result RocksDBCollection::fillIndexes(
     _needToPersistIndexEstimates = true;
   }
 
-  return r;
+  return res;
 }
 
 // @brief return the primary index
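The fillIndexes() hunks above show the batching scheme: index entries accumulate in a WriteBatchWithIndex, are flushed in chunks, and are removed again one by one if a later chunk fails, because index filling runs outside a transaction. A self-contained sketch of the flush-in-chunks part follows; the chunk size and the plain key/value input are stand-ins for the index callbacks.

#include <rocksdb/db.h>
#include <rocksdb/utilities/write_batch_with_index.h>
#include <string>
#include <utility>
#include <vector>

// Sketch: stream entries into a WriteBatchWithIndex and flush in chunks.
// On failure, the caller is responsible for undoing already-flushed
// chunks, just as fillIndexes() walks documents again to remove entries.
rocksdb::Status bulkLoad(
    rocksdb::DB* db, rocksdb::ColumnFamilyHandle* cf,
    std::vector<std::pair<std::string, std::string>> const& kvs) {
  rocksdb::WriteBatchWithIndex batch(cf->GetComparator(), 32 * 1024 * 1024);
  rocksdb::WriteOptions opts;
  size_t inBatch = 0;
  for (auto const& kv : kvs) {
    batch.Put(cf, kv.first, kv.second);
    if (++inBatch == 250) {  // flush a chunk, then reuse the batch
      rocksdb::Status s = db->Write(opts, batch.GetWriteBatch());
      if (!s.ok()) return s;
      batch.Clear();
      inBatch = 0;
    }
  }
  return db->Write(opts, batch.GetWriteBatch());  // final partial chunk
}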
@@ -1351,7 +1348,7 @@ RocksDBOperationResult RocksDBCollection::insertDocument(
 
   blackListKey(key.string().data(), static_cast<uint32_t>(key.string().size()));
 
-  RocksDBMethods* mthd = rocksutils::toRocksMethods(trx);
+  RocksDBMethods* mthd = RocksDBTransactionState::toMethods(trx);
   res = mthd->Put(RocksDBColumnFamily::documents(), key, value.string());
   if (!res.ok()) {
     // set keysize that is passed up to the crud operations
@@ -1414,7 +1411,7 @@ RocksDBOperationResult RocksDBCollection::removeDocument(
   // document store, if the doc is overwritten with PUT
   // Simon: actually we do, because otherwise the counter recovery is broken
   // if (!isUpdate) {
-  RocksDBMethods* mthd = rocksutils::toRocksMethods(trx);
+  RocksDBMethods* mthd = RocksDBTransactionState::toMethods(trx);
   RocksDBOperationResult res =
       mthd->Delete(RocksDBColumnFamily::documents(), key);
   if (!res.ok()) {
@@ -1534,7 +1531,7 @@ arangodb::Result RocksDBCollection::lookupRevisionVPack(
     }
   }
 
-  RocksDBMethods* mthd = rocksutils::toRocksMethods(trx);
+  RocksDBMethods* mthd = RocksDBTransactionState::toMethods(trx);
   Result res = mthd->Get(RocksDBColumnFamily::documents(), key, &value);
   TRI_ASSERT(value.data());
   if (res.ok()) {
@@ -1691,8 +1688,8 @@ uint64_t RocksDBCollection::recalculateCounts() {
 
   // count documents
   auto documentBounds = RocksDBKeyBounds::CollectionDocuments(_objectId);
-  _numberDocuments = rocksutils::countKeyRange(globalRocksDB(), readOptions,
-                                               documentBounds);
+  _numberDocuments =
+      rocksutils::countKeyRange(globalRocksDB(), readOptions, documentBounds);
 
   // update counter manager value
   res = globalRocksEngine()->counterManager()->setAbsoluteCounter(
@@ -1728,7 +1725,11 @@ void RocksDBCollection::estimateSize(velocypack::Builder& builder) {
   RocksDBKeyBounds bounds = RocksDBKeyBounds::CollectionDocuments(_objectId);
   rocksdb::Range r(bounds.start(), bounds.end());
   uint64_t out = 0, total = 0;
-  db->GetApproximateSizes(RocksDBColumnFamily::documents(), &r, 1, &out, static_cast<uint8_t>(rocksdb::DB::SizeApproximationFlags::INCLUDE_MEMTABLES | rocksdb::DB::SizeApproximationFlags::INCLUDE_FILES));
+  db->GetApproximateSizes(
+      RocksDBColumnFamily::documents(), &r, 1, &out,
+      static_cast<uint8_t>(
+          rocksdb::DB::SizeApproximationFlags::INCLUDE_MEMTABLES |
+          rocksdb::DB::SizeApproximationFlags::INCLUDE_FILES));
   total += out;
 
   builder.openObject();
@@ -1765,7 +1766,8 @@ arangodb::Result RocksDBCollection::serializeIndexEstimates(
     if (output.size() > sizeof(uint64_t)) {
       RocksDBKey key = RocksDBKey::IndexEstimateValue(cindex->objectId());
       rocksdb::Slice value(output);
-      rocksdb::Status s = rtrx->Put(key.string(), value);
+      rocksdb::Status s = rtrx->Put(RocksDBColumnFamily::definitions(),
+                                    key.string(), value);
 
       if (!s.ok()) {
         LOG_TOPIC(WARN, Logger::ENGINES) << "writing index estimates failed";
@@ -1840,7 +1842,8 @@ arangodb::Result RocksDBCollection::serializeKeyGenerator(
 
   RocksDBKey key = RocksDBKey::KeyGeneratorValue(_objectId);
   RocksDBValue value = RocksDBValue::KeyGeneratorValue(builder.slice());
-  rocksdb::Status s = rtrx->Put(key.string(), value.string());
+  rocksdb::Status s = rtrx->Put(RocksDBColumnFamily::definitions(),
+                                key.string(), value.string());
 
   if (!s.ok()) {
     LOG_TOPIC(WARN, Logger::ENGINES) << "writing key generator data failed";
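The last two hunks write engine metadata through an explicit definitions column family instead of the default one. The underlying call is plain rocksdb::Transaction::Put with a column-family handle; a minimal sketch, with the lookup of the definitions handle left abstract because it is engine-specific.

#include <rocksdb/utilities/transaction.h>

// Sketch: write a small metadata blob under an explicit column family.
// 'definitionsCF' stands for a handle the engine keeps elsewhere; without
// the handle argument, Put would target the default column family.
rocksdb::Status writeDefinition(rocksdb::Transaction* rtrx,
                                rocksdb::ColumnFamilyHandle* definitionsCF,
                                rocksdb::Slice key, rocksdb::Slice value) {
  return rtrx->Put(definitionsCF, key, value);
}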