Skip to content

Commit

Permalink
ApplyBucketsWork now calls AssumeState, cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
SirTyson committed Nov 18, 2022
1 parent 83178f8 commit 8ef962f
Show file tree
Hide file tree
Showing 17 changed files with 142 additions and 153 deletions.
2 changes: 0 additions & 2 deletions src/bucket/Bucket.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,6 @@ namespace stellar

class Application;
class BucketManager;
class BucketList;
class Database;

class Bucket : public std::enable_shared_from_this<Bucket>,
public NonMovableOrCopyable
Expand Down
52 changes: 31 additions & 21 deletions src/bucket/BucketIndex.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

#include "bucket/BucketIndex.h"
#include "bucket/Bucket.h"
#include "bucket/BucketManager.h"
#include "bucket/LedgerCmp.h"
#include "ledger/LedgerHashUtils.h"
#include "main/Config.h"
Expand Down Expand Up @@ -32,24 +33,24 @@ getDummyPoolShareTrustlineKey(AccountID const& accountID, uint8_t fill)
}

// Index maps a range of BucketEntry's to the associated offset
// within the bucket file. Index stored as two vectors, one stores
// LedgerKey ranges sorted in the same scheme as LedgerEntryCmp, the other
// stores offsets into the bucket file for a given key/ key range. pageSize
// determines how large, in bytes, each range should be. pageSize == 0 indicates
// that individual index.
// within the bucket file. Index stored as vector of pairs:
// First: LedgerKey/Key ranges sorted in the same scheme as LedgerEntryCmp
// Second: offset into the bucket file for a given key/ key range.
// pageSize determines how large, in bytes, each range should be. pageSize == 0
// indicates individual keys used instead of ranges.
template <class IndexT> class BucketIndexImpl : public BucketIndex
{
IndexT mKeysToOffset{};
std::streamoff const mPageSize{};
std::unique_ptr<bloom_filter> mFilter{};

BucketIndexImpl(std::filesystem::path const& filename,
std::streamoff pageSize, std::atomic_bool& exit);
BucketIndexImpl(BucketManager const& bm,
std::filesystem::path const& filename,
std::streamoff pageSize);

friend std::unique_ptr<BucketIndex const>
BucketIndex::createIndex(Config const& cfg,
std::filesystem::path const& filename,
std::atomic_bool& exit);
BucketIndex::createIndex(BucketManager const& bm,
std::filesystem::path const& filename);

public:
BucketIndexImpl() = default;
Expand Down Expand Up @@ -83,9 +84,9 @@ template <class IndexT> class BucketIndexImpl : public BucketIndex
};

template <class IndexT>
BucketIndexImpl<IndexT>::BucketIndexImpl(std::filesystem::path const& filename,
std::streamoff pageSize,
std::atomic_bool& exit)
BucketIndexImpl<IndexT>::BucketIndexImpl(BucketManager const& bm,
std::filesystem::path const& filename,
std::streamoff pageSize)
: mPageSize(pageSize)
{
ZoneScoped;
Expand Down Expand Up @@ -123,11 +124,18 @@ BucketIndexImpl<IndexT>::BucketIndexImpl(std::filesystem::path const& filename,
std::streamoff pos = 0;
std::streamoff pageUpperBound = 0;
BucketEntry be;
size_t iter = 0;
while (in && in.readOne(be))
{
if (exit)
// peridocially check if bucket manager is exiting to stop indexing
// gracefully
if (++iter >= 1000)
{
return;
iter = 0;
if (bm.isShutdown())
{
return;
}
}

if (be.type() != METAENTRY)
Expand All @@ -142,7 +150,9 @@ BucketIndexImpl<IndexT>::BucketIndexImpl(std::filesystem::path const& filename,
}
else
{
mKeysToOffset.back().first.upperBound = key;
auto& rangeEntry = mKeysToOffset.back().first;
releaseAssert(rangeEntry.upperBound < key);
rangeEntry.upperBound = key;
}

mFilter->insert(std::hash<stellar::LedgerKey>()(key));
Expand Down Expand Up @@ -219,11 +229,11 @@ upper_bound_pred(LedgerKey const& key, IndexEntryT const& indexEntry)
}

std::unique_ptr<BucketIndex const>
BucketIndex::createIndex(Config const& cfg,
std::filesystem::path const& filename,
std::atomic_bool& exit)
BucketIndex::createIndex(BucketManager const& bm,
std::filesystem::path const& filename)
{
ZoneScoped;
auto const& cfg = bm.getConfig();
releaseAssertOrThrow(cfg.EXPERIMENTAL_BUCKETLIST_DB);
releaseAssertOrThrow(!filename.empty());

Expand All @@ -240,7 +250,7 @@ BucketIndex::createIndex(Config const& cfg,
"bucket {}",
filename);
return std::unique_ptr<BucketIndexImpl<IndividualIndex> const>(
new BucketIndexImpl<IndividualIndex>(filename, 0, exit));
new BucketIndexImpl<IndividualIndex>(bm, filename, 0));
}
else
{
Expand All @@ -250,7 +260,7 @@ BucketIndex::createIndex(Config const& cfg,
"{} in bucket {}",
pageSize, filename);
return std::unique_ptr<BucketIndexImpl<RangeIndex> const>(
new BucketIndexImpl<RangeIndex>(filename, pageSize, exit));
new BucketIndexImpl<RangeIndex>(bm, filename, pageSize));
}

return {};
Expand Down
5 changes: 2 additions & 3 deletions src/bucket/BucketIndex.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ namespace stellar
* the individual index vs range index.
*/

class Config;
class BucketManager;

// BucketIndex abstract interface
class BucketIndex : public NonMovableOrCopyable
Expand Down Expand Up @@ -63,8 +63,7 @@ class BucketIndex : public NonMovableOrCopyable
// if file size is less than the cutoff, individual key index is used.
// Otherwise range index is used, with the range defined by pageSize.
static std::unique_ptr<BucketIndex const>
createIndex(Config const& cfg, std::filesystem::path const& filename,
std::atomic_bool& exit);
createIndex(BucketManager const& bm, std::filesystem::path const& filename);

virtual ~BucketIndex() = default;

Expand Down
1 change: 0 additions & 1 deletion src/bucket/BucketList.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@

#include "BucketList.h"
#include "bucket/Bucket.h"
#include "bucket/BucketIndex.h"
#include "bucket/BucketInputIterator.h"
#include "bucket/BucketManager.h"
#include "bucket/LedgerCmp.h"
Expand Down
1 change: 1 addition & 0 deletions src/bucket/BucketManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ namespace stellar

class Application;
class BucketList;
class Config;
class TmpDirManager;
struct LedgerHeader;
struct MergeKey;
Expand Down
1 change: 0 additions & 1 deletion src/bucket/BucketManagerImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@

#include "bucket/BucketManagerImpl.h"
#include "bucket/Bucket.h"
#include "bucket/BucketIndex.h"
#include "bucket/BucketInputIterator.h"
#include "bucket/BucketList.h"
#include "bucket/BucketOutputIterator.h"
Expand Down
4 changes: 1 addition & 3 deletions src/bucket/BucketOutputIterator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -151,9 +151,7 @@ BucketOutputIterator::getBucket(BucketManager& bucketManager,
if (auto b = bucketManager.getBucketIfExists(hash);
!b || !b->isIndexed())
{
std::atomic_bool exit{false};
index = BucketIndex::createIndex(bucketManager.getConfig(),
mFilename, exit);
index = BucketIndex::createIndex(bucketManager, mFilename);
}
}

Expand Down
1 change: 0 additions & 1 deletion src/bucket/FutureBucket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
#include "util/asio.h"

#include "bucket/Bucket.h"
#include "bucket/BucketIndex.h"
#include "bucket/BucketList.h"
#include "bucket/BucketManager.h"
#include "bucket/FutureBucket.h"
Expand Down
137 changes: 77 additions & 60 deletions src/catchup/ApplyBucketsWork.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#include "bucket/BucketApplicator.h"
#include "bucket/BucketList.h"
#include "bucket/BucketManager.h"
#include "catchup/AssumeStateWork.h"
#include "catchup/CatchupManager.h"
#include "crypto/Hex.h"
#include "crypto/SecretKey.h"
Expand Down Expand Up @@ -54,7 +55,7 @@ ApplyBucketsWork::ApplyBucketsWork(
std::map<std::string, std::shared_ptr<Bucket>> const& buckets,
HistoryArchiveState const& applyState, uint32_t maxProtocolVersion,
std::function<bool(LedgerEntryType)> onlyApply)
: BasicWork(app, "apply-buckets", BasicWork::RETRY_NEVER)
: Work(app, "apply-buckets", BasicWork::RETRY_NEVER)
, mBuckets(buckets)
, mApplyState(applyState)
, mEntryTypeFilter(onlyApply)
Expand Down Expand Up @@ -93,7 +94,7 @@ ApplyBucketsWork::getBucket(std::string const& hash)
}

void
ApplyBucketsWork::onReset()
ApplyBucketsWork::doReset()
{
ZoneScoped;
CLOG_INFO(History, "Applying buckets");
Expand Down Expand Up @@ -155,6 +156,7 @@ ApplyBucketsWork::onReset()
mLevel = BucketList::kNumLevels - 1;
mApplying = false;
mDelayChecked = false;
mSpawnedAssumeStateWork = false;

mSnapBucket.reset();
mCurrBucket.reset();
Expand Down Expand Up @@ -202,76 +204,86 @@ ApplyBucketsWork::startLevel()
}

BasicWork::State
ApplyBucketsWork::onRun()
ApplyBucketsWork::doWork()
{
ZoneScoped;

if (mApp.getLedgerManager().rebuildingInMemoryState() && !mDelayChecked)
// Step 1: apply buckets. Step 2: assume state
if (!mSpawnedAssumeStateWork)
{
mDelayChecked = true;
auto delay =
mApp.getConfig().ARTIFICIALLY_DELAY_BUCKET_APPLICATION_FOR_TESTING;
if (delay != std::chrono::seconds::zero())
if (mApp.getLedgerManager().rebuildingInMemoryState() && !mDelayChecked)
{
CLOG_INFO(History, "Delay bucket application by {} seconds",
delay.count());
setupWaitingCallback(delay);
return State::WORK_WAITING;
mDelayChecked = true;
auto delay = mApp.getConfig()
.ARTIFICIALLY_DELAY_BUCKET_APPLICATION_FOR_TESTING;
if (delay != std::chrono::seconds::zero())
{
CLOG_INFO(History, "Delay bucket application by {} seconds",
delay.count());
setupWaitingCallback(delay);
return State::WORK_WAITING;
}
}
}

// Check if we're at the beginning of the new level
if (isLevelComplete())
{
startLevel();
}
// Check if we're at the beginning of the new level
if (isLevelComplete())
{
startLevel();
}

// The structure of these if statements is motivated by the following:
// 1. mCurrApplicator should never be advanced if mSnapApplicator is
// not false. Otherwise it is possible for curr to modify the
// database when the invariants for snap are checked.
// 2. There is no reason to advance mSnapApplicator or mCurrApplicator
// if there is nothing to be applied.
if (mSnapApplicator)
{
TempLedgerVersionSetter tlvs(mApp, mMaxProtocolVersion);
if (*mSnapApplicator)
// The structure of these if statements is motivated by the following:
// 1. mCurrApplicator should never be advanced if mSnapApplicator is
// not false. Otherwise it is possible for curr to modify the
// database when the invariants for snap are checked.
// 2. There is no reason to advance mSnapApplicator or mCurrApplicator
// if there is nothing to be applied.
if (mSnapApplicator)
{
advance("snap", *mSnapApplicator);
return State::WORK_RUNNING;
TempLedgerVersionSetter tlvs(mApp, mMaxProtocolVersion);
if (*mSnapApplicator)
{
advance("snap", *mSnapApplicator);
return State::WORK_RUNNING;
}
mApp.getInvariantManager().checkOnBucketApply(
mSnapBucket, mApplyState.currentLedger, mLevel, false,
mEntryTypeFilter);
mSnapApplicator.reset();
mSnapBucket.reset();
mApp.getCatchupManager().bucketsApplied();
}
mApp.getInvariantManager().checkOnBucketApply(
mSnapBucket, mApplyState.currentLedger, mLevel, false,
mEntryTypeFilter);
mSnapApplicator.reset();
mSnapBucket.reset();
mApp.getCatchupManager().bucketsApplied();
}
if (mCurrApplicator)
{
TempLedgerVersionSetter tlvs(mApp, mMaxProtocolVersion);
if (*mCurrApplicator)
if (mCurrApplicator)
{
TempLedgerVersionSetter tlvs(mApp, mMaxProtocolVersion);
if (*mCurrApplicator)
{
advance("curr", *mCurrApplicator);
return State::WORK_RUNNING;
}
mApp.getInvariantManager().checkOnBucketApply(
mCurrBucket, mApplyState.currentLedger, mLevel, true,
mEntryTypeFilter);
mCurrApplicator.reset();
mCurrBucket.reset();
mApp.getCatchupManager().bucketsApplied();
}

if (mLevel != 0)
{
advance("curr", *mCurrApplicator);
--mLevel;
CLOG_DEBUG(History, "ApplyBuckets : starting next level: {}",
mLevel);
return State::WORK_RUNNING;
}
mApp.getInvariantManager().checkOnBucketApply(
mCurrBucket, mApplyState.currentLedger, mLevel, true,
mEntryTypeFilter);
mCurrApplicator.reset();
mCurrBucket.reset();
mApp.getCatchupManager().bucketsApplied();
}

if (mLevel != 0)
{
--mLevel;
CLOG_DEBUG(History, "ApplyBuckets : starting next level: {}", mLevel);
return State::WORK_RUNNING;
CLOG_INFO(History, "ApplyBuckets : done, restarting merges");

// After all buckets applied, spawn assumeState work
addWork<AssumeStateWork>(mApplyState, mMaxProtocolVersion);
mSpawnedAssumeStateWork = true;
}

CLOG_INFO(History, "ApplyBuckets : done, restarting merges");
return State::WORK_SUCCESS;
return checkChildrenStatus();
}

void
Expand Down Expand Up @@ -326,9 +338,14 @@ ApplyBucketsWork::isLevelComplete()
std::string
ApplyBucketsWork::getStatus() const
{
auto size = mTotalSize == 0 ? 0 : (100 * mAppliedSize / mTotalSize);
return fmt::format(
FMT_STRING("Applying buckets {:d}%. Currently on level {:d}"), size,
mLevel);
if (!mSpawnedAssumeStateWork)
{
auto size = mTotalSize == 0 ? 0 : (100 * mAppliedSize / mTotalSize);
return fmt::format(
FMT_STRING("Applying buckets {:d}%. Currently on level {:d}"), size,
mLevel);
}

return Work::getStatus();
}
}
Loading

0 comments on commit 8ef962f

Please sign in to comment.