[Core] Fix data races in bls_worker and use ctpl_stl queue
Change ctpl implementation to use STL queue & mutex.

Use ctpl synchronized queue instead of boost lockfree queue in bls
worker aggregator.
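
The synchronized queue used here is essentially a std::queue guarded by a std::mutex. A minimal sketch of that idea follows (illustrative only, not the actual ctpl_stl.h code; the SyncQueue name is made up):

#include <mutex>
#include <queue>

// Mutex-guarded queue in the spirit of ctpl::detail::Queue (sketch only).
// push/pop never block; pop reports emptiness via its return value.
template <typename T>
class SyncQueue {
public:
    bool push(const T& value)
    {
        std::unique_lock<std::mutex> lock(m);
        q.push(value);
        return true;
    }
    bool pop(T& value)
    {
        std::unique_lock<std::mutex> lock(m);
        if (q.empty()) return false;
        value = q.front();
        q.pop();
        return true;
    }
    bool empty()
    {
        std::unique_lock<std::mutex> lock(m);
        return q.empty();
    }

private:
    std::queue<T> q;
    std::mutex m;
};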

Use smart pointers for memory management of Aggregator and
VectorAggregator. With 'delete this;' the objects are prone to a data
race on the delete operator.

Use smart pointers for memory management of ContributionVerifier.

Pass shared_ptr by value to other threads via worker pool.
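
The lifetime pattern looks roughly like the sketch below (simplified, with made-up names and a plain std::thread standing in for the ctpl pool): the object derives from std::enable_shared_from_this, captures a shared_ptr to itself by value in the job it submits, and is destroyed automatically once the last copy goes away, instead of calling 'delete this;'.

#include <functional>
#include <iostream>
#include <memory>
#include <thread>

// Sketch of the 'delete this;' -> shared_from_this() pattern. 'Job' and its
// members are illustrative names, not code from bls_worker.
struct Job : public std::enable_shared_from_this<Job> {
    std::function<void(int)> doneCallback;

    explicit Job(std::function<void(int)> cb) : doneCallback(std::move(cb)) {}

    void Start(std::thread& worker)
    {
        // Copy a shared_ptr into the worker job so *this stays alive until
        // the job has run; the last copy going out of scope frees the Job.
        auto self(shared_from_this());
        worker = std::thread([self]() { self->doneCallback(42); });
    }
};

int main()
{
    std::thread t;
    std::make_shared<Job>([](int r) { std::cout << "done: " << r << "\n"; })->Start(t);
    t.join();
    return 0;
}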
Fuzzbawls committed Jan 8, 2022
1 parent 55babf4 commit c76e7b6
Showing 5 changed files with 109 additions and 103 deletions.
2 changes: 1 addition & 1 deletion src/Makefile.am
@@ -180,7 +180,7 @@ BITCOIN_CORE_H = \
core_io.h \
cuckoocache.h \
crypter.h \
ctpl.h \
ctpl_stl.h \
cyclingvector.h \
evo/deterministicmns.h \
evo/evodb.h \
94 changes: 45 additions & 49 deletions src/bls/bls_worker.cpp
@@ -130,7 +130,7 @@ bool CBLSWorker::GenerateContributions(int quorumThreshold, const BLSIdVector& i
// The input vector is not copied into the Aggregator but instead a vector of pointers to the original entries from the
// input vector is stored. This means that the input vector must stay alive for the whole lifetime of the Aggregator
template <typename T>
struct Aggregator {
struct Aggregator : public std::enable_shared_from_this<Aggregator<T>> {
typedef T ElementType;

size_t batchSize{16};
@@ -142,7 +142,7 @@ struct Aggregator {
std::mutex m;
// items in the queue are all intermediate aggregation results of finished batches.
// The intermediate results must be deleted by us again (which we do in SyncAggregateAndPushAggQueue)
boost::lockfree::queue<T*> aggQueue;
ctpl::detail::Queue<T*> aggQueue;
std::atomic<size_t> aggQueueSize{0};

typedef std::function<void(const T& agg)> DoneCallback;
@@ -160,7 +160,6 @@ struct Aggregator {
DoneCallback _doneCallback) :
workerPool(_workerPool),
parallel(_parallel),
aggQueue(0),
doneCallback(std::move(_doneCallback))
{
inputVec = std::make_shared<std::vector<const T*> >(count);
@@ -184,19 +183,19 @@ struct Aggregator {
} else {
doneCallback(SyncAggregate(*inputVec, 0, inputVec->size()));
}
delete this;
return;
}

if (batchCount == 1) {
// just a single batch of work, take a shortcut.
PushWork([this](int threadId) {
if (inputVec->size() == 1) {
doneCallback(*(*inputVec)[0]);
} else {
doneCallback(SyncAggregate(*inputVec, 0, inputVec->size()));
}
delete this;
auto self(this->shared_from_this());
PushWork([self](int threadId) {
size_t vecSize = self->inputVec->size();
if (vecSize == 1) {
self->doneCallback(*(*self->inputVec)[0]);
} else {
self->doneCallback(self->SyncAggregate(*self->inputVec, 0, vecSize));
}
});
return;
}
@@ -252,17 +251,18 @@ struct Aggregator {
delete rem[i];
}
doneCallback(r);

delete this;
}

void AsyncAggregateAndPushAggQueue(std::shared_ptr<std::vector<const T*> >& vec, size_t start, size_t count, bool del)
void AsyncAggregateAndPushAggQueue(const std::shared_ptr<std::vector<const T*>>& vec, size_t start, size_t count, bool del)
{
IncWait();
PushWork(std::bind(&Aggregator::SyncAggregateAndPushAggQueue, this, vec, start, count, del));
auto self(this->shared_from_this());
PushWork([self, vec, start, count, del](int threadId){
self->SyncAggregateAndPushAggQueue(vec, start, count, del);
});
}

void SyncAggregateAndPushAggQueue(std::shared_ptr<std::vector<const T*> >& vec, size_t start, size_t count, bool del)
void SyncAggregateAndPushAggQueue(const std::shared_ptr<std::vector<const T*> >& vec, size_t start, size_t count, bool del)
{
// aggregate vec and push the intermediate result onto the work queue
PushAggQueue(SyncAggregate(*vec, start, count));
@@ -333,7 +333,7 @@ struct Aggregator {
// [ a1+a2+a3+a4, b1+b2+b3+b4, c1+c2+c3+c4, d1+d2+d3+d4]
// Same rules for the input vectors apply to the VectorAggregator as for the Aggregator (they must stay alive)
template <typename T>
struct VectorAggregator {
struct VectorAggregator : public std::enable_shared_from_this<VectorAggregator<T>> {
typedef Aggregator<T> AggregatorType;
typedef std::vector<T> VectorType;
typedef std::shared_ptr<VectorType> VectorPtrType;
@@ -372,19 +372,15 @@ struct VectorAggregator {

void Start()
{
std::vector<AggregatorType*> aggregators;
for (size_t i = 0; i < vecSize; i++) {
std::vector<const T*> tmp(count);
for (size_t j = 0; j < count; j++) {
tmp[j] = &(*vecs[start + j])[i];
}

auto aggregator = new AggregatorType(std::move(tmp), 0, count, parallel, workerPool, std::bind(&VectorAggregator::CheckDone, this, std::placeholders::_1, i));
// we can't directly start the aggregator here as it might be so fast that it deletes "this" while we are still in this loop
aggregators.emplace_back(aggregator);
}
for (auto agg : aggregators) {
agg->Start();
auto self(this->shared_from_this());
auto aggregator = std::make_shared<AggregatorType>(std::move(tmp), 0, count, parallel, workerPool, [self, i](const T& agg) {self->CheckDone(agg, i);});
aggregator->Start();
}
}

@@ -393,14 +389,13 @@
(*result)[idx] = agg;
if (++doneCount == vecSize) {
doneCallback(result);
delete this;
}
}
};

// See comment of AsyncVerifyContributionShares for a description on what this does
// Same rules as in Aggregator apply for the inputs
struct ContributionVerifier {
struct ContributionVerifier : public std::enable_shared_from_this<ContributionVerifier> {
struct BatchState {
size_t start;
size_t count;
@@ -493,16 +488,16 @@ struct ContributionVerifier {
}
}
doneCallback(result);
delete this;
}

void AsyncAggregate(size_t batchIdx)
{
auto& batchState = batchStates[batchIdx];

// aggregate vvecs and skShares of batch in parallel
auto vvecAgg = new VectorAggregator<CBLSPublicKey>(vvecs, batchState.start, batchState.count, parallel, workerPool, std::bind(&ContributionVerifier::HandleAggVvecDone, this, batchIdx, std::placeholders::_1));
auto skShareAgg = new Aggregator<CBLSSecretKey>(skShares, batchState.start, batchState.count, parallel, workerPool, std::bind(&ContributionVerifier::HandleAggSkShareDone, this, batchIdx, std::placeholders::_1));
auto self(this->shared_from_this());
auto vvecAgg = std::make_shared<VectorAggregator<CBLSPublicKey>>(vvecs, batchState.start, batchState.count, parallel, workerPool, [self, batchIdx] (const BLSVerificationVectorPtr& vvec) {self->HandleAggVvecDone(batchIdx, vvec);});
auto skShareAgg = std::make_shared<Aggregator<CBLSSecretKey>>(skShares, batchState.start, batchState.count, parallel, workerPool, [self, batchIdx] (const CBLSSecretKey& skShare) {self->HandleAggSkShareDone(batchIdx, skShare);});

vvecAgg->Start();
skShareAgg->Start();
@@ -550,17 +545,18 @@

void AsyncAggregatedVerifyBatch(size_t batchIdx)
{
auto f = [this, batchIdx](int threadId) {
auto& batchState = batchStates[batchIdx];
bool result = Verify(batchState.vvec, batchState.skShare);
if (result) {
// whole batch is valid
batchState.verifyResults.assign(batchState.count, 1);
HandleVerifyDone(batchIdx, batchState.count);
} else {
// at least one entry in the batch is invalid, revert to per-contribution verification (but parallelized)
AsyncVerifyBatchOneByOne(batchIdx);
}
auto self(shared_from_this());
auto f = [self, batchIdx](int threadId) {
auto& batchState = self->batchStates[batchIdx];
bool result = self->Verify(batchState.vvec, batchState.skShare);
if (result) {
// whole batch is valid
batchState.verifyResults.assign(batchState.count, 1);
self->HandleVerifyDone(batchIdx, batchState.count);
} else {
// at least one entry in the batch is invalid, revert to per-contribution verification (but parallelized)
self->AsyncVerifyBatchOneByOne(batchIdx);
}
};
PushOrDoWork(std::move(f));
}
@@ -570,12 +566,12 @@
size_t count = batchStates[batchIdx].count;
batchStates[batchIdx].verifyResults.assign(count, 0);
for (size_t i = 0; i < count; i++) {
auto f = [this, i, batchIdx](int threadId) {
auto& batchState = batchStates[batchIdx];
batchState.verifyResults[i] = Verify(vvecs[batchState.start + i], skShares[batchState.start + i]);
HandleVerifyDone(batchIdx, 1);
};
PushOrDoWork(std::move(f));
auto self(this->shared_from_this());
PushOrDoWork([self, i, batchIdx](int threadId) {
auto& batchState = self->batchStates[batchIdx];
batchState.verifyResults[i] = self->Verify(self->vvecs[batchState.start + i], self->skShares[batchState.start + i]);
self->HandleVerifyDone(batchIdx, 1);
});
}
}

@@ -617,7 +613,7 @@ void CBLSWorker::AsyncBuildQuorumVerificationVector(const std::vector<BLSVerific
return;
}

auto agg = new VectorAggregator<CBLSPublicKey>(vvecs, start, count, parallel, workerPool, std::move(doneCallback));
auto agg = std::make_shared<VectorAggregator<CBLSPublicKey>>(vvecs, start, count, parallel, workerPool, std::move(doneCallback));
agg->Start();
}

@@ -652,7 +648,7 @@ void AsyncAggregateHelper(ctpl::thread_pool& workerPool,
return;
}

auto agg = new Aggregator<T>(vec, start, count, parallel, workerPool, std::move(doneCallback));
auto agg = std::make_shared<Aggregator<T>>(vec, start, count, parallel, workerPool, std::move(doneCallback));
agg->Start();
}

@@ -737,7 +733,7 @@ void CBLSWorker::AsyncVerifyContributionShares(const CBLSId& forId, const std::v
return;
}

auto verifier = new ContributionVerifier(forId, vvecs, skShares, 8, parallel, aggregated, workerPool, std::move(doneCallback));
auto verifier = std::make_shared<ContributionVerifier>(forId, vvecs, skShares, 8, parallel, aggregated, workerPool, std::move(doneCallback));
verifier->Start();
}

4 changes: 1 addition & 3 deletions src/bls/bls_worker.h
@@ -7,13 +7,11 @@
#define PIVX_CRYPTO_BLS_WORKER_H

#include "bls/bls_wrapper.h"
#include "ctpl.h"
#include "ctpl_stl.h"

#include <future>
#include <mutex>

#include <boost/lockfree/queue.hpp>

// Low level BLS/DKG stuff. All very compute intensive and optimized for parallelization
// The worker tries to parallelize as much as possible and utilizes a few properties of BLS aggregation to speed up things
// For example, public key vectors can be aggregated in parallel if they are split into batches and the batched aggregations are
