Skip to content

Commit

Permalink
Merge pull request #2309 from marta-lokhova/merge_key_race
Browse files Browse the repository at this point in the history
Fix multiple keys referencing the same merge

Reviewed-by: MonsieurNicolas
  • Loading branch information
latobarita authored Oct 11, 2019
2 parents 0516bed + 7fecb9a commit 80f5998
Show file tree
Hide file tree
Showing 8 changed files with 115 additions and 107 deletions.
3 changes: 1 addition & 2 deletions src/bucket/Bucket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -632,8 +632,7 @@ Bucket::merge(BucketManager& bucketManager, uint32_t maxProtocolVersion,
{
bucketManager.incrMergeCounters(mc);
}
MergeKey mk{maxProtocolVersion, keepDeadEntries, oldBucket, newBucket,
shadows};
MergeKey mk{keepDeadEntries, oldBucket, newBucket, shadows};
return out.getBucket(bucketManager, &mk);
}

Expand Down
4 changes: 4 additions & 0 deletions src/bucket/BucketTests.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@ void for_versions_with_differing_bucket_logic(
stellar::Config const& cfg,
std::function<void(stellar::Config const&)> const& f);

void for_versions_with_differing_initentry_logic(
stellar::Config const& cfg,
std::function<void(stellar::Config const&)> const& f);

struct EntryCounts
{
size_t nMeta{0};
Expand Down
3 changes: 1 addition & 2 deletions src/bucket/FutureBucket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -329,8 +329,7 @@ FutureBucket::startMerge(Application& app, uint32_t maxProtocolVersion,
// deserialized. In this case we want to attach to the existing merge, which
// will have left a std::shared_future behind in a shared cache in the
// bucket manager.
MergeKey mk{maxProtocolVersion, BucketList::keepDeadEntries(level), curr,
snap, shadows};
MergeKey mk{BucketList::keepDeadEntries(level), curr, snap, shadows};
auto f = bm.getMergeFuture(mk);
if (f.valid())
{
Expand Down
21 changes: 4 additions & 17 deletions src/bucket/MergeKey.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,11 @@
namespace stellar
{

MergeKey::MergeKey(uint32_t maxProtocolVersion, bool keepDeadEntries,
MergeKey::MergeKey(bool keepDeadEntries,
std::shared_ptr<Bucket> const& inputCurr,
std::shared_ptr<Bucket> const& inputSnap,
std::vector<std::shared_ptr<Bucket>> const& inputShadows)
: mMaxProtocolVersion(maxProtocolVersion)
, mKeepDeadEntries(keepDeadEntries)
: mKeepDeadEntries(keepDeadEntries)
, mInputCurrBucket(inputCurr->getHash())
, mInputSnapBucket(inputSnap->getHash())
{
Expand All @@ -25,22 +24,10 @@ MergeKey::MergeKey(uint32_t maxProtocolVersion, bool keepDeadEntries,
}
}

MergeKey::MergeKey(uint32_t maxProtocolVersion, bool keepDeadEntries,
Hash& inputCurr, Hash& inputSnap,
std::vector<Hash> const& inputShadows)
: mMaxProtocolVersion(maxProtocolVersion)
, mKeepDeadEntries(keepDeadEntries)
, mInputCurrBucket(inputCurr)
, mInputSnapBucket(inputSnap)
, mInputShadowBuckets(inputShadows)
{
}

bool
MergeKey::operator==(MergeKey const& other) const
{
return mMaxProtocolVersion == other.mMaxProtocolVersion &&
mKeepDeadEntries == other.mKeepDeadEntries &&
return mKeepDeadEntries == other.mKeepDeadEntries &&
mInputCurrBucket == other.mInputCurrBucket &&
mInputSnapBucket == other.mInputSnapBucket &&
mInputShadowBuckets == other.mInputShadowBuckets;
Expand Down Expand Up @@ -72,7 +59,7 @@ size_t
hash<stellar::MergeKey>::operator()(stellar::MergeKey const& key) const noexcept
{
std::ostringstream oss;
oss << key.mMaxProtocolVersion << ',' << key.mKeepDeadEntries << ','
oss << key.mKeepDeadEntries << ','
<< stellar::binToHex(key.mInputCurrBucket) << ','
<< stellar::binToHex(key.mInputSnapBucket);
for (auto const& e : key.mInputShadowBuckets)
Expand Down
9 changes: 2 additions & 7 deletions src/bucket/MergeKey.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,19 +14,14 @@ namespace stellar
// Key type for cache of merges-in-progress. These only exist to enable
// re-attaching a deserialized FutureBucket to a std::shared_future, or (if the
// merge is finished and has been promoted to a live bucket) to identify which
// _output_ was produced from a given set of _inptus_ so we can recreate a
// _output_ was produced from a given set of _inputs_ so we can recreate a
// pre-resolved std::shared_future containing that output.
struct MergeKey
{
MergeKey(uint32_t maxProtocolVersion, bool keepDeadEntries,
std::shared_ptr<Bucket> const& inputCurr,
MergeKey(bool keepDeadEntries, std::shared_ptr<Bucket> const& inputCurr,
std::shared_ptr<Bucket> const& inputSnap,
std::vector<std::shared_ptr<Bucket>> const& inputShadows);

MergeKey(uint32_t maxProtocolVersion, bool keepDeadEntries, Hash& inputCurr,
Hash& inputSnap, std::vector<Hash> const& inputShadows);

uint32_t mMaxProtocolVersion;
bool mKeepDeadEntries;
Hash mInputCurrBucket;
Hash mInputSnapBucket;
Expand Down
32 changes: 17 additions & 15 deletions src/bucket/test/BucketManagerTests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -520,7 +520,7 @@ TEST_CASE("bucketmanager reattach HAS from publish queue to finished merge",
stellar::historytestutils::TmpDirHistoryConfigurator tcfg;
cfg = tcfg.configure(cfg, true);

{
for_versions_with_differing_bucket_logic(cfg, [&](Config const& cfg) {
VirtualClock clock;
Application::pointer app = createTestApplication(clock, cfg);
auto vers = getAppLedgerVersion(app);
Expand Down Expand Up @@ -557,22 +557,24 @@ TEST_CASE("bucketmanager reattach HAS from publish queue to finished merge",
REQUIRE(HASs.size() == 5);
for (auto& has : HASs)
{
for (uint32_t level = 0; level < BucketList::kNumLevels; ++level)
{
if (has.currentBuckets[level].next.hasHashes())
{
has.currentBuckets[level].next.makeLive(
*app, vers, BucketList::keepDeadEntries(level));
}
}
has.prepareForPublish(*app);
}

// This is the key check of the test: re-enabling the merges worked
// and caused a bunch of finished-merge reattachments.
auto ra = bm.readMergeCounters().mFinishedMergeReattachments;
REQUIRE(ra != oldReattachments);
CLOG(INFO, "Bucket")
<< "finished-merge reattachments after making-live: " << ra;
if (vers < Bucket::FIRST_PROTOCOL_SHADOWS_REMOVED)
{
// Versions prior to FIRST_PROTOCOL_SHADOWS_REMOVED re-attach to
// finished merges
REQUIRE(ra > oldReattachments);
CLOG(INFO, "Bucket")
<< "finished-merge reattachments after making-live: " << ra;
}
else
{
            // Versions at or after FIRST_PROTOCOL_SHADOWS_REMOVED do not
            // re-attach, because merges are cleared
REQUIRE(ra == oldReattachments);
}

// Un-cork the publication process, nothing should be broken.
hm.setPublicationEnabled(true);
Expand All @@ -584,7 +586,7 @@ TEST_CASE("bucketmanager reattach HAS from publish queue to finished merge",
ExternalQueue ps(*app);
ps.deleteOldEntries(50000);
}
}
});
}

// Running one of these tests involves comparing three timelines with different
Expand Down
137 changes: 79 additions & 58 deletions src/bucket/test/BucketMergeMapTests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,108 +3,129 @@
// of this distribution or at http://www.apache.org/licenses/LICENSE-2.0

#include "bucket/BucketMergeMap.h"
#include "crypto/SecretKey.h"
#include "bucket/BucketTests.h"
#include "ledger/test/LedgerTestUtils.h"
#include "lib/catch.hpp"
#include "main/Application.h"
#include "test/TestUtils.h"
#include "test/test.h"

using namespace stellar;

TEST_CASE("bucket merge map", "[bucket][bucketmergemap]")
{
Hash in1a = HashUtils::random();
Hash in1b = HashUtils::random();
Hash in1c = HashUtils::random();

Hash in2a = HashUtils::random();
Hash in2b = HashUtils::random();
Hash in2c = HashUtils::random();

Hash in3a = HashUtils::random();
Hash in3b = HashUtils::random();
Hash in3c = HashUtils::random();
Hash in3d = HashUtils::random();

Hash in4a = HashUtils::random();
Hash in4b = HashUtils::random();

Hash in5a = HashUtils::random();
Hash in5b = HashUtils::random();

Hash in6a = HashUtils::random();
Hash in6b = HashUtils::random();

Hash out1 = HashUtils::random();
Hash out2 = HashUtils::random();
Hash out4 = HashUtils::random();
Hash out6 = HashUtils::random();
Config cfg(getTestConfig());
VirtualClock clock;
Application::pointer app = createTestApplication(clock, cfg);

auto getValidBucket = [&](int numEntries = 10) {
std::vector<LedgerEntry> live(numEntries);
for (auto& e : live)
{
e = LedgerTestUtils::generateValidLedgerEntry(3);
}
std::shared_ptr<Bucket> b1 =
Bucket::fresh(app->getBucketManager(),
BucketTests::getAppLedgerVersion(app), {}, live, {},
/*countMergeEvents=*/true, /*doFsync=*/true);
return b1;
};

std::shared_ptr<Bucket> in1a = getValidBucket();
std::shared_ptr<Bucket> in1b = getValidBucket();
std::shared_ptr<Bucket> in1c = getValidBucket();

std::shared_ptr<Bucket> in2a = getValidBucket();
std::shared_ptr<Bucket> in2b = getValidBucket();
std::shared_ptr<Bucket> in2c = getValidBucket();

std::shared_ptr<Bucket> in3a = getValidBucket();
std::shared_ptr<Bucket> in3b = getValidBucket();
std::shared_ptr<Bucket> in3c = getValidBucket();
std::shared_ptr<Bucket> in3d = getValidBucket();

std::shared_ptr<Bucket> in4a = getValidBucket();
std::shared_ptr<Bucket> in4b = getValidBucket();

std::shared_ptr<Bucket> in5a = getValidBucket();
std::shared_ptr<Bucket> in5b = getValidBucket();

std::shared_ptr<Bucket> in6a = getValidBucket();
std::shared_ptr<Bucket> in6b = getValidBucket();

std::shared_ptr<Bucket> out1 = getValidBucket();
std::shared_ptr<Bucket> out2 = getValidBucket();
std::shared_ptr<Bucket> out4 = getValidBucket();
std::shared_ptr<Bucket> out6 = getValidBucket();

BucketMergeMap bmm;

MergeKey m1{1, true, in1a, in1b, {in1c}};
MergeKey m2{1, true, in2a, in2b, {in2c}};
MergeKey m3{1, true, in3a, in3b, {in3c, in3d}};
MergeKey m4{1, true, in4a, in4b, {}};
MergeKey m5{1, true, in5a, in5b, {}};
MergeKey m6{1, true, in6a, in6b, {in1a}};
MergeKey m1{true, in1a, in1b, {in1c}};
MergeKey m2{true, in2a, in2b, {in2c}};
MergeKey m3{true, in3a, in3b, {in3c, in3d}};
MergeKey m4{true, in4a, in4b, {}};
MergeKey m5{true, in5a, in5b, {}};
MergeKey m6{true, in6a, in6b, {in1a}};

bmm.recordMerge(m1, out1);
bmm.recordMerge(m2, out2);
bmm.recordMerge(m1, out1->getHash());
bmm.recordMerge(m2, out2->getHash());
// m3 produces same as m2
bmm.recordMerge(m3, out2);
bmm.recordMerge(m4, out4);
bmm.recordMerge(m3, out2->getHash());
bmm.recordMerge(m4, out4->getHash());
// m5 isn't recorded
// m6 reuses an input from m1
bmm.recordMerge(m6, out6);
bmm.recordMerge(m6, out6->getHash());

Hash t;
REQUIRE(bmm.findMergeFor(m1, t));
REQUIRE(t == out1);
REQUIRE(t == out1->getHash());
REQUIRE(bmm.findMergeFor(m2, t));
REQUIRE(t == out2);
REQUIRE(t == out2->getHash());
REQUIRE(bmm.findMergeFor(m3, t));
REQUIRE(t == out2);
REQUIRE(t == out2->getHash());
REQUIRE(bmm.findMergeFor(m4, t));
REQUIRE(t == out4);
REQUIRE(t == out4->getHash());
REQUIRE(!bmm.findMergeFor(m5, t));
REQUIRE(bmm.findMergeFor(m6, t));
REQUIRE(t == out6);
REQUIRE(t == out6->getHash());

std::set<Hash> outs;
bmm.getOutputsUsingInput(in1a, outs);
REQUIRE(outs == std::set<Hash>{out1, out6});
bmm.getOutputsUsingInput(in1a->getHash(), outs);
REQUIRE(outs == std::set<Hash>{out1->getHash(), out6->getHash()});
outs.clear();
bmm.getOutputsUsingInput(in1b, outs);
REQUIRE(outs == std::set<Hash>{out1});
bmm.getOutputsUsingInput(in1b->getHash(), outs);
REQUIRE(outs == std::set<Hash>{out1->getHash()});
outs.clear();
bmm.getOutputsUsingInput(in1c, outs);
REQUIRE(outs == std::set<Hash>{out1});
bmm.getOutputsUsingInput(in1c->getHash(), outs);
REQUIRE(outs == std::set<Hash>{out1->getHash()});

REQUIRE(bmm.forgetAllMergesProducing(out1) ==
REQUIRE(bmm.forgetAllMergesProducing(out1->getHash()) ==
std::unordered_set<MergeKey>{m1});
REQUIRE(!bmm.findMergeFor(m1, t));
outs.clear();
bmm.getOutputsUsingInput(in1a, outs);
REQUIRE(outs == std::set<Hash>{out6});
bmm.getOutputsUsingInput(in1a->getHash(), outs);
REQUIRE(outs == std::set<Hash>{out6->getHash()});

REQUIRE(bmm.forgetAllMergesProducing(out2) ==
REQUIRE(bmm.forgetAllMergesProducing(out2->getHash()) ==
std::unordered_set<MergeKey>{m2, m3});
REQUIRE(!bmm.findMergeFor(m2, t));
REQUIRE(!bmm.findMergeFor(m3, t));

REQUIRE(bmm.forgetAllMergesProducing(out4) ==
REQUIRE(bmm.forgetAllMergesProducing(out4->getHash()) ==
std::unordered_set<MergeKey>{m4});
REQUIRE(!bmm.findMergeFor(m4, t));

REQUIRE(bmm.forgetAllMergesProducing(out6) ==
REQUIRE(bmm.forgetAllMergesProducing(out6->getHash()) ==
std::unordered_set<MergeKey>{m6});
REQUIRE(!bmm.findMergeFor(m6, t));
outs.clear();
bmm.getOutputsUsingInput(in6a, outs);
bmm.getOutputsUsingInput(in6a->getHash(), outs);
REQUIRE(outs == std::set<Hash>{});
outs.clear();
bmm.getOutputsUsingInput(in1a, outs);
bmm.getOutputsUsingInput(in1a->getHash(), outs);
REQUIRE(outs == std::set<Hash>{});

// Second forget produces empty set.
REQUIRE(bmm.forgetAllMergesProducing(out1) ==
REQUIRE(bmm.forgetAllMergesProducing(out1->getHash()) ==
std::unordered_set<MergeKey>{});
}
13 changes: 7 additions & 6 deletions src/herder/test/UpgradesTests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1487,12 +1487,13 @@ TEST_CASE("upgrade to version 11", "[upgrades]")
// Check several subtle characteristics of the post-upgrade
// environment:
// - Old-protocol merges stop happening (there should have
// been 6 before the upgrade, and we stop there.)
// been 6 before the upgrade, but we re-use a merge we did at
// ledger 1 for ledger 2 spill, so the counter is at 5)
// - New-protocol merges start happening.
// - At the upgrade (5), we find 1 INITENTRY in lev[0].curr
// - The next two (6, 7), propagate INITENTRYs to lev[0].snap
// - From 8 on, the INITENTRYs propagate to lev[1].curr
REQUIRE(mc.mPreInitEntryProtocolMerges == 6);
REQUIRE(mc.mPreInitEntryProtocolMerges == 5);
REQUIRE(mc.mPostInitEntryProtocolMerges != 0);
auto& lev0 = bm.getBucketList().getLevel(0);
auto& lev1 = bm.getBucketList().getLevel(1);
Expand Down Expand Up @@ -1601,25 +1602,25 @@ TEST_CASE("upgrade to version 12", "[upgrades]")
// One more old-style merge despite the upgrade
// At ledger 8, level 2 spills, and starts an old-style merge,
// as level 1 snap is still of old version
REQUIRE(mc.mPreShadowRemovalProtocolMerges == 7);
REQUIRE(mc.mPreShadowRemovalProtocolMerges == 6);
break;
case 7:
REQUIRE(getVers(lev0Snap) == newProto);
REQUIRE(getVers(lev1Curr) == oldProto);
REQUIRE(mc.mPostShadowRemovalProtocolMerges == 4);
REQUIRE(mc.mPreShadowRemovalProtocolMerges == 6);
REQUIRE(mc.mPreShadowRemovalProtocolMerges == 5);
break;
case 6:
REQUIRE(getVers(lev0Snap) == newProto);
REQUIRE(getVers(lev1Curr) == oldProto);
REQUIRE(mc.mPostShadowRemovalProtocolMerges == 3);
REQUIRE(mc.mPreShadowRemovalProtocolMerges == 6);
REQUIRE(mc.mPreShadowRemovalProtocolMerges == 5);
break;
case 5:
REQUIRE(getVers(lev0Curr) == newProto);
REQUIRE(getVers(lev0Snap) == oldProto);
REQUIRE(mc.mPostShadowRemovalProtocolMerges == 1);
REQUIRE(mc.mPreShadowRemovalProtocolMerges == 6);
REQUIRE(mc.mPreShadowRemovalProtocolMerges == 5);
break;
default:
break;
Expand Down

0 comments on commit 80f5998

Please sign in to comment.