Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Boost usage reduction #714

Closed
wants to merge 8 commits into from
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Remove Boost mutex
  • Loading branch information
Bushstar committed Sep 8, 2021
commit b20068954ad373735e11a3e379f316d4a814daef
11 changes: 3 additions & 8 deletions src/bench/checkqueue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,9 @@
#include <util/system.h>
#include <checkqueue.h>
#include <prevector.h>
#include <vector>
#include <boost/thread/thread.hpp>
#include <random.h>

#include <vector>

static const int MIN_CORES = 2;
static const size_t BATCHES = 101;
Expand All @@ -36,10 +35,7 @@ static void CCheckQueueSpeedPrevectorJob(benchmark::State& state)
void swap(PrevectorJob& x){p.swap(x.p);};
};
CCheckQueue<PrevectorJob> queue {QUEUE_BATCH_SIZE};
boost::thread_group tg;
for (auto x = 0; x < std::max(MIN_CORES, GetNumCores()); ++x) {
tg.create_thread([&]{queue.Thread();});
}
queue.StartWorkerThreads(GetNumCores() - 1);
while (state.KeepRunning()) {
// Make insecure_rand here so that each iteration is identical.
FastRandomContext insecure_rand(true);
Expand All @@ -55,7 +51,6 @@ static void CCheckQueueSpeedPrevectorJob(benchmark::State& state)
// it is done explicitly here for clarity
control.Wait();
}
tg.interrupt_all();
tg.join_all();
queue.StopWorkerThreads();
}
BENCHMARK(CCheckQueueSpeedPrevectorJob, 1400);
88 changes: 59 additions & 29 deletions src/checkqueue.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,12 @@
#define DEFI_CHECKQUEUE_H

#include <sync.h>
#include <tinyformat.h>
#include <util/threadnames.h>

#include <algorithm>
#include <vector>

#include <boost/thread/condition_variable.hpp>
#include <boost/thread/mutex.hpp>

template <typename T>
class CCheckQueueControl;

Expand All @@ -31,74 +30,80 @@ class CCheckQueue
{
private:
//! Mutex to protect the inner state
boost::mutex mutex;
Mutex m_mutex;

//! Worker threads block on this when out of work
boost::condition_variable condWorker;
std::condition_variable m_worker_cv;

//! Master thread blocks on this when out of work
boost::condition_variable condMaster;
std::condition_variable m_master_cv;

//! The queue of elements to be processed.
//! As the order of booleans doesn't matter, it is used as a LIFO (stack)
std::vector<T> queue;
std::vector<T> queue GUARDED_BY(m_mutex);

//! The number of workers (including the master) that are idle.
int nIdle;
int nIdle GUARDED_BY(m_mutex){0};

//! The total number of workers (including the master).
int nTotal;
int nTotal GUARDED_BY(m_mutex){0};

//! The temporary evaluation result.
bool fAllOk;
bool fAllOk GUARDED_BY(m_mutex){true};

/**
* Number of verifications that haven't completed yet.
* This includes elements that are no longer queued, but still in the
* worker's own batches.
*/
unsigned int nTodo;
unsigned int nTodo GUARDED_BY(m_mutex){0};

//! The maximum number of elements to be processed in one batch
unsigned int nBatchSize;
const unsigned int nBatchSize;

std::vector<std::thread> m_worker_threads;
bool m_request_stop GUARDED_BY(m_mutex){false};

/** Internal function that does bulk of the verification work. */
bool Loop(bool fMaster = false)
bool Loop(bool fMaster)
{
boost::condition_variable& cond = fMaster ? condMaster : condWorker;
std::condition_variable& cond = fMaster ? m_master_cv : m_worker_cv;
std::vector<T> vChecks;
vChecks.reserve(nBatchSize);
unsigned int nNow = 0;
bool fOk = true;
do {
{
boost::unique_lock<boost::mutex> lock(mutex);
WAIT_LOCK(m_mutex, lock);
// first do the clean-up of the previous loop run (allowing us to do it in the same critsect)
if (nNow) {
fAllOk &= fOk;
nTodo -= nNow;
if (nTodo == 0 && !fMaster)
// We processed the last element; inform the master it can exit and return the result
condMaster.notify_one();
m_master_cv.notify_one();
} else {
// first iteration
nTotal++;
}
// logically, the do loop starts here
while (queue.empty()) {
while (queue.empty() && !m_request_stop) {
if (fMaster && nTodo == 0) {
nTotal--;
bool fRet = fAllOk;
// reset the status for new work later
if (fMaster)
fAllOk = true;
fAllOk = true;
// return the current status
return fRet;
}
nIdle++;
cond.wait(lock); // wait
nIdle--;
}
if (m_request_stop) {
return false;
}

// Decide how many work units to process now.
// * Do not try to do everything at once, but aim for increasingly smaller batches so
// all workers finish approximately simultaneously.
Expand All @@ -125,15 +130,27 @@ class CCheckQueue

public:
//! Mutex to ensure only one concurrent CCheckQueueControl
boost::mutex ControlMutex;
Mutex m_control_mutex;

//! Create a new check queue
explicit CCheckQueue(unsigned int nBatchSizeIn) : nIdle(0), nTotal(0), fAllOk(true), nTodo(0), nBatchSize(nBatchSizeIn) {}
explicit CCheckQueue(unsigned int nBatchSizeIn) : nBatchSize(nBatchSizeIn) {}

//! Worker thread
void Thread()
//! Create a pool of new worker threads.
void StartWorkerThreads(const int threads_num)
{
Loop();
{
LOCK(m_mutex);
nIdle = 0;
nTotal = 0;
fAllOk = true;
}
assert(m_worker_threads.empty());
for (int n = 0; n < threads_num; ++n) {
m_worker_threads.emplace_back([this, n]() {
util::ThreadRename(strprintf("scriptch.%i", n));
Loop(false /* worker thread */);
});
}
}

//! Wait until execution finishes, and return whether all evaluations were successful.
Expand All @@ -145,20 +162,33 @@ class CCheckQueue
//! Add a batch of checks to the queue
void Add(std::vector<T>& vChecks)
{
boost::unique_lock<boost::mutex> lock(mutex);
LOCK(m_mutex);
for (T& check : vChecks) {
queue.push_back(T());
check.swap(queue.back());
}
nTodo += vChecks.size();
if (vChecks.size() == 1)
condWorker.notify_one();
m_worker_cv.notify_one();
else if (vChecks.size() > 1)
condWorker.notify_all();
m_worker_cv.notify_all();
}

//! Stop all of the worker threads.
void StopWorkerThreads()
{
WITH_LOCK(m_mutex, m_request_stop = true);
m_worker_cv.notify_all();
for (std::thread& t : m_worker_threads) {
t.join();
}
m_worker_threads.clear();
WITH_LOCK(m_mutex, m_request_stop = false);
}

~CCheckQueue()
{
assert(m_worker_threads.empty());
}

};
Expand All @@ -182,7 +212,7 @@ class CCheckQueueControl
{
// passed queue is supposed to be unused, or nullptr
if (pqueue != nullptr) {
ENTER_CRITICAL_SECTION(pqueue->ControlMutex);
ENTER_CRITICAL_SECTION(pqueue->m_control_mutex);
}
}

Expand All @@ -206,7 +236,7 @@ class CCheckQueueControl
if (!fDone)
Wait();
if (pqueue != nullptr) {
LEAVE_CRITICAL_SECTION(pqueue->ControlMutex);
LEAVE_CRITICAL_SECTION(pqueue->m_control_mutex);
}
}
};
Expand Down
31 changes: 18 additions & 13 deletions src/init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,7 @@ void Shutdown(InitInterfaces& interfaces)
scheduler.stop();
threadGroup.interrupt_all();
threadGroup.join_all();
StopScriptCheckWorkerThreads();

// After the threads that potentially access these pointers have been stopped,
// destruct and reset all to nullptr.
Expand Down Expand Up @@ -1108,15 +1109,6 @@ bool AppInitParameterInteraction()
incrementalRelayFee = CFeeRate(n);
}

// -par=0 means autodetect, but nScriptCheckThreads==0 means no concurrency
nScriptCheckThreads = gArgs.GetArg("-par", DEFAULT_SCRIPTCHECK_THREADS);
if (nScriptCheckThreads <= 0)
nScriptCheckThreads += GetNumCores();
if (nScriptCheckThreads <= 1)
nScriptCheckThreads = 0;
else if (nScriptCheckThreads > MAX_SCRIPTCHECK_THREADS)
nScriptCheckThreads = MAX_SCRIPTCHECK_THREADS;

// block pruning; get the amount of disk space (in MiB) to allot for block & undo files
int64_t nPruneArg = gArgs.GetArg("-prune", 0);
if (nPruneArg < 0) {
Expand Down Expand Up @@ -1326,10 +1318,23 @@ bool AppInitMain(InitInterfaces& interfaces)
InitSignatureCache();
InitScriptExecutionCache();

LogPrintf("Using %u threads for script verification\n", nScriptCheckThreads);
if (nScriptCheckThreads) {
for (int i=0; i<nScriptCheckThreads-1; i++)
threadGroup.create_thread([i]() { return ThreadScriptCheck(i); });
int script_threads = gArgs.GetArg("-par", DEFAULT_SCRIPTCHECK_THREADS);
if (script_threads <= 0) {
// -par=0 means autodetect (number of cores - 1 script threads)
// -par=-n means "leave n cores free" (number of cores - n - 1 script threads)
script_threads += GetNumCores();
}

// Subtract 1 because the main thread counts towards the par threads
script_threads = std::max(script_threads - 1, 0);

// Number of script-checking threads <= MAX_SCRIPTCHECK_THREADS
script_threads = std::min(script_threads, MAX_SCRIPTCHECK_THREADS);

LogPrintf("Script verification uses %d additional threads\n", script_threads);
if (script_threads >= 1) {
g_parallel_script_checks = true;
StartScriptCheckWorkerThreads(script_threads);
}

// Start the lightweight task scheduler thread
Expand Down
Loading