diff --git a/Common/Common.vcxproj b/Common/Common.vcxproj
index 7e61f8977655..4ea9625d7810 100644
--- a/Common/Common.vcxproj
+++ b/Common/Common.vcxproj
@@ -529,8 +529,10 @@
+
+
diff --git a/Common/Common.vcxproj.filters b/Common/Common.vcxproj.filters
index 65b6eaccabd4..5e8ec1347756 100644
--- a/Common/Common.vcxproj.filters
+++ b/Common/Common.vcxproj.filters
@@ -415,6 +415,12 @@
GPU\Vulkan
+
+ Thread
+
+
+ Thread
+
diff --git a/Common/Thread/Barrier.h b/Common/Thread/Barrier.h
new file mode 100644
index 000000000000..11e55cdb00ae
--- /dev/null
+++ b/Common/Thread/Barrier.h
@@ -0,0 +1,31 @@
+#pragma once
+
+#include
+#include
+
+// Similar to C++20's std::barrier
+class CountingBarrier {
+public:
+ CountingBarrier(size_t count) : threadCount_(count) {}
+
+ void Arrive() {
+ std::unique_lock lk(m);
+ counter++;
+ waiting++;
+ cv.wait(lk, [&] {return counter >= threadCount_; });
+ cv.notify_one();
+ waiting--;
+ if (waiting == 0) {
+ // Reset so it can be re-used.
+ counter = 0;
+ }
+ lk.unlock();
+ }
+
+private:
+ std::mutex m;
+ std::condition_variable cv;
+ size_t counter = 0;
+ size_t waiting = 0;
+ size_t threadCount_;
+};
diff --git a/Common/Thread/Event.h b/Common/Thread/Event.h
index 3baf46537308..3cc773da72f7 100644
--- a/Common/Thread/Event.h
+++ b/Common/Thread/Event.h
@@ -7,12 +7,15 @@
struct Event : public Waitable {
public:
+ Event() {
+ triggered_ = false;
+ }
+
void Wait() override {
- if (triggered_) {
- return;
- }
std::unique_lock lock;
- cond_.wait(lock, [&] { return !triggered_; });
+ if (!triggered_) {
+ cond_.wait(lock, [&] { return triggered_.load(); });
+ }
}
void Notify() {
@@ -24,5 +27,5 @@ struct Event : public Waitable {
private:
std::condition_variable cond_;
std::mutex mutex_;
- bool triggered_ = false;
+ std::atomic triggered_;
};
diff --git a/Common/Thread/Waitable.h b/Common/Thread/Waitable.h
new file mode 100644
index 000000000000..9008b2fa216a
--- /dev/null
+++ b/Common/Thread/Waitable.h
@@ -0,0 +1,42 @@
+#pragma once
+
+#include
+#include
+
+#include "Common/Thread/ThreadManager.h"
+
+class LimitedWaitable : public Waitable {
+public:
+ LimitedWaitable() {
+ triggered_ = false;
+ }
+
+ void Wait() override {
+ std::unique_lock lock(mutex_);
+ if (!triggered_) {
+ cond_.wait(lock, [&] { return triggered_.load(); });
+ }
+ }
+
+ bool WaitFor(double budget) {
+ uint32_t us = budget > 0 ? (uint32_t)(budget * 1000000.0) : 0;
+ std::unique_lock lock(mutex_);
+ if (!triggered_) {
+ if (us == 0)
+ return false;
+ cond_.wait_for(lock, std::chrono::microseconds(us), [&] { return triggered_.load(); });
+ }
+ return triggered_;
+ }
+
+ void Notify() {
+ std::unique_lock lock(mutex_);
+ triggered_ = true;
+ cond_.notify_all();
+ }
+
+private:
+ std::condition_variable cond_;
+ std::mutex mutex_;
+ std::atomic triggered_;
+};
diff --git a/Common/TimeUtil.cpp b/Common/TimeUtil.cpp
index 50718df9792b..99d3288d9914 100644
--- a/Common/TimeUtil.cpp
+++ b/Common/TimeUtil.cpp
@@ -66,7 +66,7 @@ void sleep_ms(int ms) {
// Return the current time formatted as Minutes:Seconds:Milliseconds
// in the form 00:00:000.
-void GetTimeFormatted(char formattedTime[11]) {
+void GetTimeFormatted(char formattedTime[13]) {
time_t sysTime;
time(&sysTime);
diff --git a/Common/TimeUtil.h b/Common/TimeUtil.h
index 8d7273c466e2..0faf6dac2316 100644
--- a/Common/TimeUtil.h
+++ b/Common/TimeUtil.h
@@ -7,3 +7,17 @@ double time_now_d();
void sleep_ms(int ms);
void GetTimeFormatted(char formattedTime[13]);
+
+// Rust-style Instant for clear and easy timing.
+class Instant {
+public:
+ static Instant Now() {
+ return Instant(time_now_d());
+ }
+ double Elapsed() const {
+ return time_now_d() - instantTime_;
+ }
+private:
+ explicit Instant(double initTime) : instantTime_(initTime) {}
+ double instantTime_;
+};
diff --git a/Core/TextureReplacer.cpp b/Core/TextureReplacer.cpp
index f4a2e7282822..0bd466c87eae 100644
--- a/Core/TextureReplacer.cpp
+++ b/Core/TextureReplacer.cpp
@@ -32,6 +32,7 @@
#include "Common/File/FileUtil.h"
#include "Common/StringUtils.h"
#include "Common/Thread/ParallelLoop.h"
+#include "Common/Thread/Waitable.h"
#include "Common/TimeUtil.h"
#include "Core/Config.h"
#include "Core/Host.h"
@@ -744,46 +745,9 @@ float TextureReplacer::LookupReduceHashRange(int& w, int& h) {
}
}
-class LimitedWaitable : public Waitable {
-public:
- LimitedWaitable() {
- triggered_ = false;
- }
-
- void Wait() override {
- if (!triggered_) {
- std::unique_lock lock(mutex_);
- cond_.wait(lock, [&] { return !triggered_; });
- }
- }
-
- bool WaitFor(double budget) {
- uint32_t us = budget > 0 ? (uint32_t)(budget * 1000000.0) : 0;
- if (!triggered_) {
- if (us == 0)
- return false;
- std::unique_lock lock(mutex_);
- cond_.wait_for(lock, std::chrono::microseconds(us), [&] { return !triggered_; });
- }
- return triggered_;
- }
-
- void Notify() {
- std::unique_lock lock(mutex_);
- triggered_ = true;
- cond_.notify_all();
- }
-
-private:
- std::condition_variable cond_;
- std::mutex mutex_;
- std::atomic triggered_;
-};
-
class ReplacedTextureTask : public Task {
public:
- ReplacedTextureTask(ReplacedTexture &tex, LimitedWaitable *w) : tex_(tex), waitable_(w) {
- }
+ ReplacedTextureTask(ReplacedTexture &tex, LimitedWaitable *w) : tex_(tex), waitable_(w) {}
TaskType Type() const override {
return TaskType::IO_BLOCKING;
diff --git a/unittest/TestThreadManager.cpp b/unittest/TestThreadManager.cpp
index 3445a33083f1..38dd6b208331 100644
--- a/unittest/TestThreadManager.cpp
+++ b/unittest/TestThreadManager.cpp
@@ -1,10 +1,16 @@
+#include
+
#include "Common/Log.h"
#include "Common/TimeUtil.h"
+#include "Common/Thread/Barrier.h"
#include "Common/Thread/ThreadManager.h"
#include "Common/Thread/Channel.h"
#include "Common/Thread/Promise.h"
#include "Common/Thread/ParallelLoop.h"
#include "Common/Thread/ThreadUtil.h"
+#include "Common/Thread/Waitable.h"
+
+#include "UnitTest.h"
struct ResultObject {
bool ok;
@@ -56,16 +62,79 @@ bool TestParallelLoop(ThreadManager *threadMan) {
return true;
}
+// This is some ugly stuff but realistic.
+const size_t THREAD_COUNT = 6; // Must match the number of threads in TestMultithreadedScheduling
+const size_t ITERATIONS = 40000;
+
+static std::atomic g_atomicCounter;
+static ThreadManager *g_threadMan;
+static CountingBarrier g_barrier(THREAD_COUNT + 1);
+
+class IncrementTask : public Task {
+public:
+ IncrementTask(TaskType type, LimitedWaitable *waitable) : type_(type), waitable_(waitable) {}
+ ~IncrementTask() {}
+ virtual TaskType Type() const { return type_; }
+ virtual void Run() {
+ g_atomicCounter++;
+ waitable_->Notify();
+ }
+private:
+ TaskType type_;
+ LimitedWaitable *waitable_;
+};
+
+void ThreadFunc() {
+ for (int i = 0; i < ITERATIONS; i++) {
+ auto threadWaitable = new LimitedWaitable();
+ g_threadMan->EnqueueTask(new IncrementTask((i & 1) ? TaskType::CPU_COMPUTE : TaskType::IO_BLOCKING, threadWaitable));
+ threadWaitable->WaitAndRelease();
+ }
+ g_barrier.Arrive();
+}
+
+bool TestMultithreadedScheduling() {
+ g_atomicCounter = 0;
+
+ auto start = Instant::Now();
+
+ std::thread thread1(ThreadFunc);
+ std::thread thread2(ThreadFunc);
+ std::thread thread3(ThreadFunc);
+ std::thread thread4(ThreadFunc);
+ std::thread thread5(ThreadFunc);
+ std::thread thread6(ThreadFunc);
+
+ // Just testing the barrier
+ g_barrier.Arrive();
+ // OK, all are done.
+
+ EXPECT_EQ_INT(g_atomicCounter, THREAD_COUNT * ITERATIONS);
+
+ thread1.join();
+ thread2.join();
+ thread3.join();
+ thread4.join();
+ thread5.join();
+ thread6.join();
+
+ printf("Stress test elapsed: %0.2f", start.Elapsed());
+
+ return true;
+}
+
bool TestThreadManager() {
ThreadManager manager;
manager.Init(8, 1);
+ g_threadMan = &manager;
+
Promise *object(Promise::Spawn(&manager, &ResultProducer, TaskType::IO_BLOCKING));
if (!TestParallelLoop(&manager)) {
return false;
}
- sleep_ms(1000);
+ sleep_ms(100);
ResultObject *result = object->BlockUntilReady();
if (result) {
@@ -78,5 +147,9 @@ bool TestThreadManager() {
return false;
}
+ if (!TestMultithreadedScheduling()) {
+ return false;
+ }
+
return true;
}
diff --git a/unittest/UnitTest.h b/unittest/UnitTest.h
index e953925d39a1..2267a7a87fc8 100644
--- a/unittest/UnitTest.h
+++ b/unittest/UnitTest.h
@@ -2,7 +2,7 @@
#define EXPECT_TRUE(a) if (!(a)) { printf("%s:%i: Test Fail\n", __FUNCTION__, __LINE__); return false; }
#define EXPECT_FALSE(a) if ((a)) { printf("%s:%i: Test Fail\n", __FUNCTION__, __LINE__); return false; }
-#define EXPECT_EQ_INT(a, b) if ((a) != (b)) { printf("%s:%i: Test Fail\n%d\nvs\n%d\n", __FUNCTION__, __LINE__, a, b); return false; }
+#define EXPECT_EQ_INT(a, b) if ((a) != (b)) { printf("%s:%i: Test Fail\n%d\nvs\n%d\n", __FUNCTION__, __LINE__, (int)(a), (int)(b)); return false; }
#define EXPECT_EQ_HEX(a, b) if ((a) != (b)) { printf("%s:%i: Test Fail\n%x\nvs\n%x\n", __FUNCTION__, __LINE__, a, b); return false; }
#define EXPECT_EQ_FLOAT(a, b) if ((a) != (b)) { printf("%s:%i: Test Fail\n%f\nvs\n%f\n", __FUNCTION__, __LINE__, a, b); return false; }
#define EXPECT_APPROX_EQ_FLOAT(a, b) if (fabsf((a)-(b))>0.00001f) { printf("%s:%i: Test Fail\n%f\nvs\n%f\n", __FUNCTION__, __LINE__, a, b); /*return false;*/ }