Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve merge performance #3548

Merged
merged 39 commits into from
Apr 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
83781ef
Add async pool to destroy executeChunks
morebtcg Mar 21, 2023
68abf06
Baseline scheduler support parallel calculate roots
morebtcg Mar 21, 2023
7cb17cd
Fix tx root calculate error
morebtcg Mar 22, 2023
6250520
Fix warnings
morebtcg Mar 23, 2023
5dd37f5
Update compile options
morebtcg Mar 23, 2023
ea83f1a
Remove unnecessary template arg
morebtcg Mar 23, 2023
5c3ee10
Use task_group to replace old ThreadPool
morebtcg Mar 24, 2023
249b001
Fix some names
morebtcg Mar 24, 2023
81fceea
Add timing for scheduler
morebtcg Mar 24, 2023
5259d6a
Revert StringPool changes
morebtcg Mar 24, 2023
b2375d2
Add ittapi report
morebtcg Mar 24, 2023
e390734
Merge remote-tracking branch 'upstream/feature-3.4.0' into timing
morebtcg Mar 24, 2023
27377f4
Add oneapi tbb header for WsSession
morebtcg Mar 24, 2023
2d0c108
Add missing tbb header
morebtcg Mar 24, 2023
eb12a71
Update tests
morebtcg Mar 24, 2023
c86ffd2
Improve merge performance
morebtcg Mar 25, 2023
e5e341b
Add more itt report
morebtcg Mar 26, 2023
ed36395
Add 1000user tests
morebtcg Mar 27, 2023
4f694f9
Add mutableLock
morebtcg Mar 27, 2023
d28ff80
Add notify itt report
morebtcg Mar 27, 2023
f48ce76
Merge remote-tracking branch 'upstream/feature-3.4.0' into merge
morebtcg Mar 27, 2023
ecdbe8b
Add merge method for rocksdb
morebtcg Mar 28, 2023
503a052
Update merge method
morebtcg Mar 29, 2023
5ad67cf
Temp remove serial execute
morebtcg Mar 29, 2023
62dc4f1
Merge remote-tracking branch 'upstream/feature-3.4.0' into merge
morebtcg Mar 31, 2023
6c0c575
Fix macos compile error
morebtcg Mar 31, 2023
ca623e8
Fix macos <=> compare
morebtcg Mar 31, 2023
98cf528
Update zip_view and iota_view to views::zip and views::iota
morebtcg Mar 31, 2023
eb091b4
Fix type mismatch
morebtcg Mar 31, 2023
e842314
Fix macos compile error
morebtcg Mar 31, 2023
167c792
Fix macos error2
morebtcg Mar 31, 2023
6590d96
Fix macos error3
morebtcg Mar 31, 2023
85af3a7
Fix mac error #4
morebtcg Mar 31, 2023
d1ed357
disable centos COVERAGE
morebtcg Mar 31, 2023
0226265
Fix archive tool
morebtcg Mar 31, 2023
4afd7de
Update linker to gold
morebtcg Mar 31, 2023
a693203
Update ci check air transaction count
morebtcg Apr 3, 2023
9542413
Reduce lock range of asyncSendMessageByNodeID
morebtcg Apr 3, 2023
2018580
Use global task_group in WsService
morebtcg Apr 3, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/workflow-self-hosted-centos.yml
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ jobs:
export LIBCLANG_PATH=/opt/rh/llvm-toolset-7.0/root/lib64/
. /opt/rh/llvm-toolset-7.0/enable
alias cmake='cmake3'
cd build && cmake3 -DALLOCATOR=default -DCMAKE_TOOLCHAIN_FILE=${{ env.VCPKG_ROOT }}/scripts/buildsystems/vcpkg.cmake -DTESTS=ON -DCOVERAGE=ON -DWITH_LIGHTNODE=ON -DWITH_CPPSDK=ON -DWITH_TIKV=OFF -DWITH_TARS_SERVICES=ON .. || cat *.log
cd build && cmake3 -DALLOCATOR=default -DCMAKE_TOOLCHAIN_FILE=${{ env.VCPKG_ROOT }}/scripts/buildsystems/vcpkg.cmake -DTESTS=ON -DLINKER=gold -DCOVERAGE=OFF -DWITH_LIGHTNODE=ON -DWITH_CPPSDK=ON -DWITH_TIKV=OFF -DWITH_TARS_SERVICES=ON .. || cat *.log
make -j8
- name: Test
run:
Expand Down
4 changes: 2 additions & 2 deletions bcos-boostssl/bcos-boostssl/websocket/WsService.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -394,9 +394,9 @@ std::shared_ptr<WsSession> WsService::newSession(
_wsStreamDelegate->setMaxReadMsgSize(m_config->maxMsgSize());

std::string endPoint = _wsStreamDelegate->remoteEndpoint();
auto session = m_sessionFactory->createSession(m_moduleName);
auto session = m_sessionFactory->createSession(m_taskGroup, m_moduleName);

session->setWsStreamDelegate(_wsStreamDelegate);
session->setWsStreamDelegate(std::move(_wsStreamDelegate));
session->setIoc(m_ioservicePool->getIOService());
session->setMessageFactory(messageFactory());
session->setEndPoint(endPoint);
Expand Down
3 changes: 2 additions & 1 deletion bcos-boostssl/bcos-boostssl/websocket/WsService.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
#include <bcos-utilities/Common.h>
#include <bcos-utilities/IOServicePool.h>
#include <bcos-utilities/ThreadPool.h>
#include <oneapi/tbb/task_group.h>
#include <boost/asio/deadline_timer.hpp>
#include <boost/asio/dispatch.hpp>
#include <boost/asio/strand.hpp>
Expand Down Expand Up @@ -188,6 +189,7 @@ class WsService : public std::enable_shared_from_this<WsService>

private:
bool m_running{false};
tbb::task_group m_taskGroup;

int32_t m_waitConnectFinishTimeout = 30000;
std::string m_moduleName;
Expand Down Expand Up @@ -217,7 +219,6 @@ class WsService : public std::enable_shared_from_this<WsService>
// http server
std::shared_ptr<bcos::boostssl::http::HttpServer> m_httpServer;

private:
// mutex for m_sessions
mutable boost::shared_mutex x_mutex;
// all active sessions
Expand Down
11 changes: 6 additions & 5 deletions bcos-boostssl/bcos-boostssl/websocket/WsSession.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ using namespace bcos::boostssl;
using namespace bcos::boostssl::ws;
using namespace bcos::boostssl::http;

WsSession::WsSession(std::string _moduleName) : m_moduleName(_moduleName)
WsSession::WsSession(tbb::task_group& taskGroup, std::string _moduleName)
: m_taskGroup(taskGroup), m_moduleName(std::move(_moduleName))
{
WEBSOCKET_SESSION(INFO) << LOG_KV("[NEWOBJ][WSSESSION]", this);
}
Expand Down Expand Up @@ -81,7 +82,7 @@ void WsSession::drop(uint32_t _reason)
WEBSOCKET_SESSION(TRACE)
<< LOG_DESC("the session has been disconnected") << LOG_KV("seq", cbEntry.first);

m_asyncGroup.run([callback = std::move(callback), error]() {
m_taskGroup.run([callback = std::move(callback), error]() {
callback->respCallBack(error, nullptr, nullptr);
});
}
Expand All @@ -98,7 +99,7 @@ void WsSession::drop(uint32_t _reason)
m_wsStreamDelegate->close();
}

m_asyncGroup.run([self]() {
m_taskGroup.run([self]() {
auto session = self.lock();
if (session)
{
Expand Down Expand Up @@ -181,7 +182,7 @@ void WsSession::onReadPacket(boost::beast::flat_buffer& _buffer)
void WsSession::onMessage(bcos::boostssl::MessageFace::Ptr _message)
{
// task enqueue
m_asyncGroup.run([self = weak_from_this(), _message = std::move(_message)]() {
m_taskGroup.run([self = weak_from_this(), _message = std::move(_message)]() {
auto session = self.lock();
if (!session)
{
Expand Down Expand Up @@ -470,7 +471,7 @@ void WsSession::onRespTimeout(const boost::system::error_code& _error, const std
WEBSOCKET_SESSION(WARNING) << LOG_BADGE("onRespTimeout") << LOG_KV("seq", _seq);

auto error = BCOS_ERROR_PTR(WsError::TimeOut, "waiting for message response timed out");
m_asyncGroup.run([callback = std::move(callback), error = std::move(error)]() {
m_taskGroup.run([callback = std::move(callback), error = std::move(error)]() {
callback->respCallBack(error, nullptr, nullptr);
});
}
14 changes: 9 additions & 5 deletions bcos-boostssl/bcos-boostssl/websocket/WsSession.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,12 @@ class WsSession : public std::enable_shared_from_this<WsSession>,
using Ptrs = std::vector<std::shared_ptr<WsSession>>;

public:
WsSession(std::string _moduleName = "DEFAULT");
WsSession(tbb::task_group& taskGroup, std::string _moduleName = "DEFAULT");

virtual ~WsSession() { WEBSOCKET_SESSION(INFO) << LOG_KV("[DELOBJ][WSSESSION]", this); }
virtual ~WsSession() noexcept
{
WEBSOCKET_SESSION(INFO) << LOG_KV("[DELOBJ][WSSESSION]", this);
}

void drop(uint32_t _reason);

Expand Down Expand Up @@ -177,6 +180,7 @@ class WsSession : public std::enable_shared_from_this<WsSession>,
};

protected:
tbb::task_group& m_taskGroup;
// flag for message that need to check respond packet like p2pmessage
bool m_needCheckRspPacket = false;
//
Expand Down Expand Up @@ -212,7 +216,7 @@ class WsSession : public std::enable_shared_from_this<WsSession>,
std::shared_ptr<MessageFaceFactory> m_messageFactory;
// thread pool
// std::shared_ptr<bcos::ThreadPool> m_threadPool;
tbb::task_group m_asyncGroup;

// ioc
std::shared_ptr<boost::asio::io_context> m_ioc;
// send message queue
Expand All @@ -229,9 +233,9 @@ class WsSessionFactory
virtual ~WsSessionFactory() = default;

public:
virtual WsSession::Ptr createSession(std::string _moduleName)
virtual WsSession::Ptr createSession(tbb::task_group& taskGroup, std::string _moduleName)
{
auto session = std::make_shared<WsSession>(_moduleName);
auto session = std::make_shared<WsSession>(taskGroup, _moduleName);
return session;
}
};
Expand Down
2 changes: 1 addition & 1 deletion bcos-framework/bcos-framework/protocol/Block.h
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ class Block
virtual NonceListPtr nonces() const
{
return std::make_shared<NonceList>(
RANGES::iota_view<uint64_t, uint64_t>(0LU, transactionsSize()) |
RANGES::iota_view<size_t, size_t>(0LU, transactionsSize()) |
RANGES::views::transform([this](uint64_t index) {
auto transaction = this->transaction(index);
return transaction->nonce();
Expand Down
28 changes: 11 additions & 17 deletions bcos-framework/bcos-framework/storage2/MemoryStorage.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include "Storage.h"
#include "bcos-task/Task.h"
#include <bcos-utilities/NullLock.h>
#include <oneapi/tbb/parallel_for_each.h>
#include <boost/container/small_vector.hpp>
#include <boost/multi_index/hashed_index.hpp>
#include <boost/multi_index/identity.hpp>
Expand All @@ -14,6 +15,7 @@
#include <boost/throw_exception.hpp>
#include <functional>
#include <mutex>
#include <range/v3/view/transform.hpp>
#include <thread>
#include <type_traits>
#include <utility>
Expand Down Expand Up @@ -64,10 +66,8 @@ class MemoryStorage
using Mutex = std::mutex;
using Lock = std::conditional_t<withConcurrent, std::unique_lock<Mutex>, utilities::NullLock>;
using BucketMutex = std::conditional_t<withConcurrent, Mutex, Empty>;

using DataValueType =
std::conditional_t<withLogicalDeletion, std::variant<Deleted, ValueType>, ValueType>;

struct Data
{
KeyType key;
Expand Down Expand Up @@ -399,7 +399,7 @@ class MemoryStorage
task::AwaitableValue<void> write(
RANGES::input_range auto&& keys, RANGES::input_range auto&& values)
{
for (auto&& [key, value] : RANGES::zip_view(
for (auto&& [key, value] : RANGES::views::zip(
std::forward<decltype(keys)>(keys), std::forward<decltype(values)>(values)))
{
auto [bucket, lock] = getBucket(key);
Expand Down Expand Up @@ -498,9 +498,9 @@ class MemoryStorage
return {};
}

void merge(MemoryStorage& from, bool overwrite)
task::Task<void> merge(MemoryStorage& from)
{
for (auto bucketPair : RANGES::zip_view(m_buckets, from.m_buckets))
for (auto bucketPair : RANGES::views::zip(m_buckets, from.m_buckets))
{
auto& [bucket, fromBucket] = bucketPair;
Lock toLock(bucket.mutex);
Expand All @@ -509,27 +509,21 @@ class MemoryStorage
auto& index = bucket.container.template get<0>();
auto& fromIndex = fromBucket.container.template get<0>();

if (overwrite)
while (!fromIndex.empty())
{
while (!fromIndex.empty())
auto [it, merged] = index.merge(fromIndex, fromIndex.begin());
if (!merged)
{
auto [it, merged] = index.merge(fromIndex, fromIndex.begin());
if (!merged)
{
index.insert(index.erase(it), fromIndex.extract(fromIndex.begin()));
}
index.insert(index.erase(it), fromIndex.extract(fromIndex.begin()));
}
}
else
{
index.merge(fromIndex);
}
}
co_return;
}

void swap(MemoryStorage& from)
{
for (auto bucketPair : RANGES::zip_view(m_buckets, from.m_buckets))
for (auto bucketPair : RANGES::views::zip(m_buckets, from.m_buckets))
{
auto& [bucket, fromBucket] = bucketPair;
Lock toLock(bucket.mutex);
Expand Down
126 changes: 81 additions & 45 deletions bcos-framework/bcos-framework/storage2/Storage.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,57 +15,54 @@ namespace bcos::storage2
{

template <class IteratorType>
concept ReadIterator = requires(IteratorType&& iterator)
{
requires std::convertible_to < task::AwaitableReturnType<decltype(iterator.next())>,
bool > ;
requires std::same_as < typename task::AwaitableReturnType<decltype(iterator.key())>,
typename IteratorType::Key > ;
requires std::same_as < typename task::AwaitableReturnType<decltype(iterator.value())>,
typename IteratorType::Value > ;
requires std::convertible_to < task::AwaitableReturnType<decltype(iterator.hasValue())>,
bool > ;
};
concept ReadIterator =
requires(IteratorType&& iterator) {
requires std::convertible_to<task::AwaitableReturnType<decltype(iterator.next())>, bool>;
requires std::same_as<typename task::AwaitableReturnType<decltype(iterator.key())>,
typename IteratorType::Key>;
requires std::same_as<typename task::AwaitableReturnType<decltype(iterator.value())>,
typename IteratorType::Value>;
requires std::convertible_to<task::AwaitableReturnType<decltype(iterator.hasValue())>,
bool>;
};

template <class StorageType>
concept ReadableStorage = requires(StorageType&& impl)
{
typename StorageType::Key;
requires ReadIterator<task::AwaitableReturnType<decltype(impl.read(
std::declval<std::vector<typename StorageType::Key>>()))>>;
};
concept ReadableStorage = requires(StorageType&& impl) {
typename StorageType::Key;
requires ReadIterator<task::AwaitableReturnType<decltype(impl.read(
std::declval<std::vector<typename StorageType::Key>>()))>>;
};

template <class StorageType>
concept WriteableStorage = requires(StorageType&& impl)
{
typename StorageType::Key;
typename StorageType::Value;
requires task::IsAwaitable<decltype(impl.write(
std::vector<typename StorageType::Key>(), std::vector<typename StorageType::Value>()))>;
};
concept WriteableStorage =
requires(StorageType&& impl) {
typename StorageType::Key;
typename StorageType::Value;
requires task::IsAwaitable<decltype(impl.write(
std::vector<typename StorageType::Key>(), std::vector<typename StorageType::Value>()))>;
};

struct STORAGE_BEGIN_TYPE
{
};
inline constexpr STORAGE_BEGIN_TYPE STORAGE_BEGIN{};

template <class StorageType>
concept SeekableStorage = requires(StorageType&& impl)
{
typename StorageType::Key;
requires ReadIterator<
task::AwaitableReturnType<decltype(impl.seek(std::declval<typename StorageType::Key>()))>>;
requires ReadIterator<
task::AwaitableReturnType<decltype(impl.seek(std::declval<STORAGE_BEGIN_TYPE>()))>>;
};
concept SeekableStorage =
requires(StorageType&& impl) {
typename StorageType::Key;
requires ReadIterator<task::AwaitableReturnType<decltype(impl.seek(
std::declval<typename StorageType::Key>()))>>;
requires ReadIterator<
task::AwaitableReturnType<decltype(impl.seek(std::declval<STORAGE_BEGIN_TYPE>()))>>;
};

template <class StorageType>
concept ErasableStorage = requires(StorageType&& impl)
{
typename StorageType::Key;
requires task::IsAwaitable<decltype(impl.remove(
std::declval<std::vector<typename StorageType::Key>>()))>;
};
concept ErasableStorage = requires(StorageType&& impl) {
typename StorageType::Key;
requires task::IsAwaitable<decltype(impl.remove(
std::declval<std::vector<typename StorageType::Key>>()))>;
};

template <storage2::ReadableStorage Storage>
struct ReadIteratorTrait
Expand All @@ -86,13 +83,12 @@ template <storage2::SeekableStorage Storage>
using SeekIteratorType = typename SeekIteratorTrait<Storage>::type;

template <class Iterator>
concept RangeableIterator = requires(Iterator&& iterator)
{
// clang-format off
requires storage2::ReadIterator<Iterator>;
{ iterator.range() } -> RANGES::range;
// clang-format on
};
concept RangeableIterator = requires(Iterator&& iterator) {
requires storage2::ReadIterator<Iterator>;
{
iterator.range()
} -> RANGES::range;
};

inline auto singleView(auto&& value)
{
Expand Down Expand Up @@ -153,4 +149,44 @@ inline task::Task<void> removeOne(ErasableStorage auto& storage, auto const& key
co_return;
}

namespace detail
{
template <class FromStorage, class ToStorage>
concept HasMemberMergeMethod =
requires(FromStorage& fromStorage, ToStorage& toStorage) {
requires SeekableStorage<FromStorage>;
requires task::IsAwaitable<decltype(toStorage.merge(fromStorage))>;
};
struct merge
{
template <class ToStorage>
requires WriteableStorage<ToStorage> && ErasableStorage<ToStorage>
task::Task<void> operator()(SeekableStorage auto& fromStorage, ToStorage& toStorage) const
{
if constexpr (HasMemberMergeMethod<std::remove_cvref_t<decltype(fromStorage)>,
std::remove_cvref_t<decltype(toStorage)>>)
{
co_await toStorage.merge(fromStorage);
}
else
{
auto it = co_await fromStorage.seek(storage2::STORAGE_BEGIN);
while (co_await it.next())
{
auto&& key = co_await it.key();
if (co_await it.hasValue())
{
co_await storage2::writeOne(toStorage, key, co_await it.value());
}
else
{
co_await storage2::removeOne(toStorage, key);
}
}
}
}
};
} // namespace detail
constexpr inline detail::merge merge{};

} // namespace bcos::storage2
Loading