ThreadPool test

This commit is contained in:
MihailRis 2024-04-04 21:32:38 +03:00
parent 40c3ce0cb1
commit 781ab0e892
3 changed files with 177 additions and 122 deletions

View File

@ -12,89 +12,47 @@
static debug::Logger logger("chunks-render");
ChunksRenderer::ChunksRenderer(Level* level, const ContentGfxCache* cache, const EngineSettings& settings)
: level(level), cache(cache), settings(settings) {
const int MAX_FULL_CUBES = 3000;
renderer = std::make_unique<BlocksRenderer>(
9 * 6 * 6 * MAX_FULL_CUBES, level->content, cache, settings
);
const uint RENDERER_CAPACITY = 9 * 6 * 6 * 3000;
const uint num_threads = std::thread::hardware_concurrency();
for (uint i = 0; i < num_threads; i++) {
threads.emplace_back(&ChunksRenderer::threadLoop, this, i);
workersBlocked.emplace_back();
class RendererWorker : public util::Worker<std::shared_ptr<Chunk>, RendererResult> {
Level* level;
std::shared_ptr<BlocksRenderer> renderer;
public:
RendererWorker(
Level* level,
const ContentGfxCache* cache,
const EngineSettings& settings
) : level(level)
{
renderer = std::make_shared<BlocksRenderer>(
RENDERER_CAPACITY, level->content, cache, settings
);
}
logger.info() << "created " << num_threads << " rendering threads";
RendererResult operator()(const std::shared_ptr<Chunk>& chunk) override {
renderer->build(chunk.get(), level->chunksStorage.get());
return RendererResult {glm::ivec2(chunk->x, chunk->z), renderer};
}
};
ChunksRenderer::ChunksRenderer(
Level* level,
const ContentGfxCache* cache,
const EngineSettings& settings
) : level(level),
threadPool(
[=](){return std::make_shared<RendererWorker>(level, cache, settings);},
[=](RendererResult& mesh){
meshes[mesh.key] = std::shared_ptr<Mesh>(mesh.renderer->createMesh());
inwork.erase(mesh.key);
})
{
renderer = std::make_unique<BlocksRenderer>(
RENDERER_CAPACITY, level->content, cache, settings
);
}
ChunksRenderer::~ChunksRenderer() {
{
std::unique_lock<std::mutex> lock(jobsMutex);
working = false;
}
resultsMutex.lock();
while (!results.empty()) {
Result entry = results.front();
results.pop();
entry.locked = false;
entry.variable.notify_all();
}
resultsMutex.unlock();
jobsMutexCondition.notify_all();
for (auto& thread : threads) {
thread.join();
}
}
void ChunksRenderer::threadLoop(int index) {
const int MAX_FULL_CUBES = 3000;
BlocksRenderer renderer(
9 * 6 * 6 * MAX_FULL_CUBES, level->content, cache, settings
);
std::condition_variable variable;
std::mutex mutex;
bool locked = false;
while (working) {
std::shared_ptr<Chunk> chunk;
{
std::unique_lock<std::mutex> lock(jobsMutex);
jobsMutexCondition.wait(lock, [this] {
return !jobs.empty() || !working;
});
if (!working) {
break;
}
chunk = jobs.front();
jobs.pop();
}
process(chunk, renderer);
{
resultsMutex.lock();
results.push(Result {variable, index, locked, {renderer, glm::ivec2(chunk->x, chunk->z)}});
locked = true;
resultsMutex.unlock();
}
{
std::unique_lock<std::mutex> lock(mutex);
variable.wait(lock, [&] {
return !working || !locked;
});
}
}
}
void ChunksRenderer::process(std::shared_ptr<Chunk> chunk, BlocksRenderer& renderer) {
renderer.build(chunk.get(), level->chunksStorage.get());
}
void ChunksRenderer::enqueueJob(std::shared_ptr<Chunk> job) {
jobsMutex.lock();
jobs.push(job);
jobsMutex.unlock();
jobsMutexCondition.notify_one();
}
std::shared_ptr<Mesh> ChunksRenderer::render(std::shared_ptr<Chunk> chunk, bool important) {
@ -113,7 +71,7 @@ std::shared_ptr<Mesh> ChunksRenderer::render(std::shared_ptr<Chunk> chunk, bool
}
inwork[key] = true;
enqueueJob(chunk);
threadPool.enqueueJob(chunk);
return nullptr;
}
@ -144,15 +102,5 @@ std::shared_ptr<Mesh> ChunksRenderer::get(Chunk* chunk) {
}
void ChunksRenderer::update() {
resultsMutex.lock();
while (!results.empty()) {
Result entry = results.front();
mesh_entry mesh = entry.entry;
results.pop();
meshes[mesh.key] = std::shared_ptr<Mesh>(mesh.renderer.createMesh());
inwork.erase(mesh.key);
entry.locked = false;
entry.variable.notify_all();
}
resultsMutex.unlock();
threadPool.update();
}

View File

@ -2,16 +2,14 @@
#define GRAPHICS_RENDER_CHUNKSRENDERER_H_
#include <queue>
#include <mutex>
#include <thread>
#include <memory>
#include <vector>
#include <unordered_map>
#include <glm/glm.hpp>
#include <condition_variable>
#include "../../voxels/Block.h"
#include "../../voxels/ChunksStorage.h"
#include "../../util/ThreadPool.h"
#include "../../settings.h"
class Mesh;
@ -20,44 +18,24 @@ class Level;
class BlocksRenderer;
class ContentGfxCache;
struct mesh_entry {
BlocksRenderer& renderer;
struct RendererResult {
glm::ivec2 key;
};
struct Result {
std::condition_variable& variable;
int workerIndex;
bool& locked;
mesh_entry entry;
std::shared_ptr<BlocksRenderer> renderer;
};
class ChunksRenderer {
std::unique_ptr<BlocksRenderer> renderer;
Level* level;
std::unique_ptr<BlocksRenderer> renderer;
std::unordered_map<glm::ivec2, std::shared_ptr<Mesh>> meshes;
std::unordered_map<glm::ivec2, bool> inwork;
std::vector<std::thread> threads;
std::queue<Result> results;
std::mutex resultsMutex;
std::queue<std::shared_ptr<Chunk>> jobs;
std::condition_variable jobsMutexCondition;
std::mutex jobsMutex;
bool working = true;
const ContentGfxCache* cache;
const EngineSettings& settings;
std::vector<std::unique_lock<std::mutex>> workersBlocked;
void enqueueJob(std::shared_ptr<Chunk> chunk);
void threadLoop(int index);
void process(std::shared_ptr<Chunk> chunk, BlocksRenderer& renderer);
util::ThreadPool<std::shared_ptr<Chunk>, RendererResult> threadPool;
public:
ChunksRenderer(Level* level,
const ContentGfxCache* cache,
const EngineSettings& settings);
ChunksRenderer(
Level* level,
const ContentGfxCache* cache,
const EngineSettings& settings
);
virtual ~ChunksRenderer();
std::shared_ptr<Mesh> render(std::shared_ptr<Chunk> chunk, bool important);

129
src/util/ThreadPool.h Normal file
View File

@ -0,0 +1,129 @@
#ifndef UTIL_THREAD_POOL_H_
#define UTIL_THREAD_POOL_H_
#include <queue>
#include <thread>
#include <functional>
#include <condition_variable>
#include "../delegates.h"
namespace util {
template<class T>
struct ThreadPoolResult {
std::condition_variable& variable;
int workerIndex;
bool& locked;
T entry;
};
template<class T, class R>
class Worker {
public:
Worker() {}
virtual ~Worker() {}
virtual R operator()(const T&) = 0;
};
template<class T, class R>
class ThreadPool {
std::queue<T> jobs;
std::queue<ThreadPoolResult<R>> results;
std::vector<std::thread> threads;
std::condition_variable jobsMutexCondition;
std::mutex jobsMutex;
bool working = true;
std::vector<std::unique_lock<std::mutex>> workersBlocked;
consumer<R&> resultConsumer;
std::mutex resultsMutex;
void threadLoop(int index, std::shared_ptr<Worker<T, R>> worker) {
std::condition_variable variable;
std::mutex mutex;
bool locked = false;
while (working) {
T job;
{
std::unique_lock<std::mutex> lock(jobsMutex);
jobsMutexCondition.wait(lock, [this] {
return !jobs.empty() || !working;
});
if (!working) {
break;
}
job = jobs.front();
jobs.pop();
}
R result = (*worker)(job);
{
resultsMutex.lock();
results.push(ThreadPoolResult<R> {variable, index, locked, result});
locked = true;
resultsMutex.unlock();
}
{
std::unique_lock<std::mutex> lock(mutex);
variable.wait(lock, [&] {
return !working || !locked;
});
}
}
}
public:
ThreadPool(
supplier<std::shared_ptr<Worker<T, R>>> workersSupplier,
consumer<R&> resultConsumer
) : resultConsumer(resultConsumer) {
const uint num_threads = std::thread::hardware_concurrency();
for (uint i = 0; i < num_threads; i++) {
threads.emplace_back(&ThreadPool<T,R>::threadLoop, this, i, workersSupplier());
workersBlocked.emplace_back();
}
}
~ThreadPool(){
{
std::lock_guard<std::mutex> lock(jobsMutex);
working = false;
}
{
std::lock_guard<std::mutex> lock(resultsMutex);
while (!results.empty()) {
ThreadPoolResult<R> entry = results.front();
results.pop();
entry.locked = false;
entry.variable.notify_all();
}
}
jobsMutexCondition.notify_all();
for (auto& thread : threads) {
thread.join();
}
}
void update() {
std::lock_guard<std::mutex> lock(resultsMutex);
while (!results.empty()) {
ThreadPoolResult<R> entry = results.front();
results.pop();
resultConsumer(entry.entry);
entry.locked = false;
entry.variable.notify_all();
}
}
void enqueueJob(T job) {
{
std::lock_guard<std::mutex> lock(jobsMutex);
jobs.push(job);
}
jobsMutexCondition.notify_one();
}
};
} // namespace util
#endif // UTIL_THREAD_POOL_H_