diff --git a/Common/Common.vcxproj b/Common/Common.vcxproj
index 7e61f8977655..4ea9625d7810 100644
--- a/Common/Common.vcxproj
+++ b/Common/Common.vcxproj
@@ -529,8 +529,10 @@
+
+
diff --git a/Common/Common.vcxproj.filters b/Common/Common.vcxproj.filters
index 65b6eaccabd4..5e8ec1347756 100644
--- a/Common/Common.vcxproj.filters
+++ b/Common/Common.vcxproj.filters
@@ -415,6 +415,12 @@
 GPU\Vulkan
+
+ Thread
+
+
+ Thread
+
diff --git a/Common/Thread/Barrier.h b/Common/Thread/Barrier.h
new file mode 100644
index 000000000000..11e55cdb00ae
--- /dev/null
+++ b/Common/Thread/Barrier.h
@@ -0,0 +1,38 @@
+#pragma once
+
+#include <condition_variable>
+#include <mutex>
+
+// Similar to C++20's std::barrier: each Arrive() call blocks until
+// threadCount_ calls have been made, then all of them return and the
+// barrier starts over, so the same object can be used again.
+class CountingBarrier {
+public:
+	CountingBarrier(size_t count) : threadCount_(count) {}
+
+	// Blocks the caller until threadCount_ threads (the caller included)
+	// have arrived in the current round.
+	void Arrive() {
+		std::unique_lock<std::mutex> lk(m);
+		// Wait on the round number rather than on the raw count: a thread that
+		// is released and immediately arrives again must not see the previous
+		// round's count and fall straight through.
+		const size_t round = generation_;
+		counter++;
+		if (counter >= threadCount_) {
+			// Last arrival: reset so it can be re-used, then wake every waiter.
+			counter = 0;
+			generation_++;
+			cv.notify_all();
+			return;
+		}
+		cv.wait(lk, [&] { return generation_ != round; });
+	}
+
+private:
+	std::mutex m;
+	std::condition_variable cv;
+	size_t counter = 0;      // Arrivals so far in the current round.
+	size_t generation_ = 0;  // Number of completed rounds.
+	size_t threadCount_;
+};
diff --git a/Common/Thread/Event.h b/Common/Thread/Event.h
index 3baf46537308..3cc773da72f7 100644
--- a/Common/Thread/Event.h
+++ b/Common/Thread/Event.h
@@ -7,12 +7,17 @@
 
 struct Event : public Waitable {
 public:
+	Event() {
+		triggered_ = false;
+	}
+
 	void Wait() override {
-		if (triggered_) {
-			return;
-		}
-		std::unique_lock<std::mutex> lock;
-		cond_.wait(lock, [&] { return !triggered_; });
+		// The lock has to own mutex_: condition_variable::wait() requires the
+		// calling thread to hold the lock it is given.
+		std::unique_lock<std::mutex> lock(mutex_);
+		if (!triggered_) {
+			cond_.wait(lock, [&] { return triggered_.load(); });
+		}
 	}
 
 	void Notify() {
@@ -24,5 +29,5 @@ struct Event : public Waitable {
 private:
 	std::condition_variable cond_;
 	std::mutex mutex_;
-	bool triggered_ = false;
+	std::atomic<bool> triggered_;
 };
diff --git a/Common/Thread/Waitable.h b/Common/Thread/Waitable.h
new file mode 100644
index 000000000000..9008b2fa216a
--- /dev/null
+++ b/Common/Thread/Waitable.h
@@ -0,0 +1,42 @@
+#pragma once
+
+#include <condition_variable>
+#include <mutex>
+
+#include "Common/Thread/ThreadManager.h"
+
+class LimitedWaitable : public Waitable { // One-shot signal that can also be waited on with a time limit.
+public:
+	LimitedWaitable() {
+		triggered_ = false;
+	}
+
+	void Wait() override { // Blocks until Notify() has been called.
+		std::unique_lock<std::mutex> lock(mutex_);
+		if (!triggered_) {
+			cond_.wait(lock, [&] { return triggered_.load(); });
+		}
+	}
+
+	bool WaitFor(double budget) { // Waits at most `budget` seconds; returns whether Notify() has been called.
+		uint32_t us = budget > 0 ? (budget < 4294.0 ? (uint32_t)(budget * 1000000.0) : 0xFFFFFFFFu) : 0; // Whole microseconds, saturated so a huge budget cannot overflow the cast.
+		std::unique_lock<std::mutex> lock(mutex_);
+		if (!triggered_) {
+			if (us == 0)
+				return false; // No time to spend: report "not yet" without blocking.
+			cond_.wait_for(lock, std::chrono::microseconds(us), [&] { return triggered_.load(); });
+		}
+		return triggered_;
+	}
+
+	void Notify() { // Marks the waitable as triggered and wakes every waiter.
+		std::unique_lock<std::mutex> lock(mutex_);
+		triggered_ = true;
+		cond_.notify_all();
+	}
+
+private:
+	std::condition_variable cond_;
+	std::mutex mutex_;
+	std::atomic<bool> triggered_;
+};
diff --git a/Common/TimeUtil.cpp b/Common/TimeUtil.cpp
index 50718df9792b..99d3288d9914 100644
--- a/Common/TimeUtil.cpp
+++ b/Common/TimeUtil.cpp
@@ -66,7 +66,7 @@ void sleep_ms(int ms) {
 
 // Return the current time formatted as Minutes:Seconds:Milliseconds
 // in the form 00:00:000.
-void GetTimeFormatted(char formattedTime[11]) {
+void GetTimeFormatted(char formattedTime[13]) {
 	time_t sysTime;
 	time(&sysTime);
 
diff --git a/Common/TimeUtil.h b/Common/TimeUtil.h
index 8d7273c466e2..0faf6dac2316 100644
--- a/Common/TimeUtil.h
+++ b/Common/TimeUtil.h
@@ -7,3 +7,17 @@ double time_now_d();
 void sleep_ms(int ms);
 
 void GetTimeFormatted(char formattedTime[13]);
+
+// Rust-style Instant for clear and easy timing.
+class Instant { // Holds one time_now_d() reading; create with Now(), query with Elapsed().
+public:
+	static Instant Now() {
+		return Instant(time_now_d());
+	}
+	double Elapsed() const { // time_now_d() now minus the stored reading; units are those of time_now_d() (presumably seconds — confirm).
+		return time_now_d() - instantTime_;
+	}
+private:
+	explicit Instant(double initTime) : instantTime_(initTime) {} // Private: Now() is the only visible way to create one.
+	double instantTime_; // The time_now_d() value captured by Now().
+};
diff --git a/Core/TextureReplacer.cpp b/Core/TextureReplacer.cpp
index f4a2e7282822..0bd466c87eae 100644
--- a/Core/TextureReplacer.cpp
+++ b/Core/TextureReplacer.cpp
@@ -32,6 +32,7 @@
 #include "Common/File/FileUtil.h"
 #include "Common/StringUtils.h"
 #include "Common/Thread/ParallelLoop.h"
+#include "Common/Thread/Waitable.h"
 #include "Common/TimeUtil.h"
 #include "Core/Config.h"
 #include "Core/Host.h"
@@ -744,46 +745,9 @@ float TextureReplacer::LookupReduceHashRange(int& w, int& h) {
 	}
 }
 
-class LimitedWaitable : public Waitable {
-public:
-	LimitedWaitable() {
-		triggered_ = false;
-	}
-
-	void Wait() override {
-		if (!triggered_) {
-			std::unique_lock lock(mutex_);
-			cond_.wait(lock, [&] { return !triggered_; });
-		}
-	}
-
-	bool WaitFor(double budget) {
-		uint32_t us = budget > 0 ? (uint32_t)(budget * 1000000.0) : 0;
-		if (!triggered_) {
-			if (us == 0)
-				return false;
-			std::unique_lock lock(mutex_);
-			cond_.wait_for(lock, std::chrono::microseconds(us), [&] { return !triggered_; });
-		}
-		return triggered_;
-	}
-
-	void Notify() {
-		std::unique_lock lock(mutex_);
-		triggered_ = true;
-		cond_.notify_all();
-	}
-
-private:
-	std::condition_variable cond_;
-	std::mutex mutex_;
-	std::atomic triggered_;
-};
-
 class ReplacedTextureTask : public Task {
 public:
-	ReplacedTextureTask(ReplacedTexture &tex, LimitedWaitable *w) : tex_(tex), waitable_(w) {
-	}
+	ReplacedTextureTask(ReplacedTexture &tex, LimitedWaitable *w) : tex_(tex), waitable_(w) {}
 
 	TaskType Type() const override {
 		return TaskType::IO_BLOCKING;
diff --git a/unittest/TestThreadManager.cpp b/unittest/TestThreadManager.cpp
index 3445a33083f1..38dd6b208331 100644
--- a/unittest/TestThreadManager.cpp
+++ b/unittest/TestThreadManager.cpp
@@ -1,10 +1,16 @@
+#include // NOTE(review): the header name looks lost in extraction (presumably <thread>, given the std::thread use this diff adds) — confirm.
+
 #include "Common/Log.h"
 #include "Common/TimeUtil.h"
+#include "Common/Thread/Barrier.h"
 #include "Common/Thread/ThreadManager.h"
 #include "Common/Thread/Channel.h"
 #include "Common/Thread/Promise.h"
 #include "Common/Thread/ParallelLoop.h"
 #include "Common/Thread/ThreadUtil.h"
+#include "Common/Thread/Waitable.h"
+
+#include "UnitTest.h"
 
 struct ResultObject {
 	bool ok;
@@ -56,16 +62,79 @@ bool TestParallelLoop(ThreadManager *threadMan) {
 	return true;
 }
 
+// This is some ugly stuff but realistic.
+const size_t THREAD_COUNT = 6; // Number of worker threads TestMultithreadedScheduling starts.
+const size_t ITERATIONS = 40000; // Tasks enqueued by each worker thread.
+
+static std::atomic<int> g_atomicCounter; // Incremented once per IncrementTask that runs.
+static ThreadManager *g_threadMan; // Set by TestThreadManager before the stress test runs.
+static CountingBarrier g_barrier(THREAD_COUNT + 1); // The workers plus the test's own thread.
+
+// Task that bumps the shared counter and then notifies the waitable it was given.
+class IncrementTask : public Task {
+public:
+	IncrementTask(TaskType type, LimitedWaitable *waitable) : type_(type), waitable_(waitable) {}
+	TaskType Type() const override { return type_; }
+	void Run() override {
+		g_atomicCounter++;
+		waitable_->Notify();
+	}
+private:
+	TaskType type_;
+	LimitedWaitable *waitable_;
+};
+
+// Worker body: enqueues ITERATIONS tasks one by one, then arrives at g_barrier.
+void ThreadFunc() {
+	for (size_t i = 0; i < ITERATIONS; i++) {
+		// Odd iterations use CPU_COMPUTE, even ones IO_BLOCKING.
+		auto threadWaitable = new LimitedWaitable();
+		g_threadMan->EnqueueTask(new IncrementTask((i & 1) ? TaskType::CPU_COMPUTE : TaskType::IO_BLOCKING, threadWaitable));
+		threadWaitable->WaitAndRelease();
+	}
+	g_barrier.Arrive();
+}
+
+// Stress test: THREAD_COUNT threads enqueue tasks on g_threadMan concurrently.
+bool TestMultithreadedScheduling() {
+	g_atomicCounter = 0;
+
+	auto start = Instant::Now();
+
+	std::thread threads[THREAD_COUNT];
+	for (auto &thread : threads) {
+		thread = std::thread(ThreadFunc);
+	}
+
+	// Just testing the barrier
+	g_barrier.Arrive();
+	// OK, all are done.
+
+	// Join before checking: EXPECT_EQ_INT returns on failure, and destroying a
+	// std::thread that is still joinable calls std::terminate.
+	for (auto &thread : threads) {
+		thread.join();
+	}
+
+	EXPECT_EQ_INT(g_atomicCounter, THREAD_COUNT * ITERATIONS);
+
+	printf("Stress test elapsed: %0.2f\n", start.Elapsed());
+
+	return true;
+}
+
 bool TestThreadManager() {
 	ThreadManager manager;
 	manager.Init(8, 1);
 
+	g_threadMan = &manager;
+
 	Promise<ResultObject> *object(Promise<ResultObject>::Spawn(&manager, &ResultProducer, TaskType::IO_BLOCKING));
 
 	if (!TestParallelLoop(&manager)) {
 		return false;
 	}
-	sleep_ms(1000);
+	sleep_ms(100);
 
 	ResultObject *result = object->BlockUntilReady();
 	if (result) {
@@ -78,5 +147,9 @@ bool TestThreadManager() {
 		return false;
 	}
 
+	if (!TestMultithreadedScheduling()) {
+		return false;
+	}
+
 	return true;
 }
diff --git a/unittest/UnitTest.h b/unittest/UnitTest.h
index e953925d39a1..2267a7a87fc8 100644
--- a/unittest/UnitTest.h
+++ b/unittest/UnitTest.h
@@ -2,7 +2,7 @@
 
 #define EXPECT_TRUE(a) if (!(a)) { printf("%s:%i: Test Fail\n", __FUNCTION__, __LINE__); return false; }
 #define EXPECT_FALSE(a) if ((a)) { printf("%s:%i: Test Fail\n", __FUNCTION__, __LINE__); return false; }
-#define EXPECT_EQ_INT(a, b) if ((a) != (b)) { printf("%s:%i: Test Fail\n%d\nvs\n%d\n", __FUNCTION__, __LINE__, a, b); return false; }
+#define EXPECT_EQ_INT(a, b) if ((a) != (b)) { printf("%s:%i: Test Fail\n%d\nvs\n%d\n", __FUNCTION__, __LINE__, (int)(a), (int)(b)); return false; }
 #define EXPECT_EQ_HEX(a, b) if ((a) != (b)) { printf("%s:%i: Test Fail\n%x\nvs\n%x\n", __FUNCTION__, __LINE__, a, b); return false; }
 #define EXPECT_EQ_FLOAT(a, b) if ((a) != (b)) { printf("%s:%i: Test Fail\n%f\nvs\n%f\n", __FUNCTION__, __LINE__, a, b); return false; }
 #define EXPECT_APPROX_EQ_FLOAT(a, b) if (fabsf((a)-(b))>0.00001f) { printf("%s:%i: Test Fail\n%f\nvs\n%f\n", __FUNCTION__, __LINE__, a, b); /*return false;*/ }