ThreadPool onJobFailed onComplete callbacks

This commit is contained in:
MihailRis 2024-04-05 12:38:03 +03:00
parent dc660edc74
commit 84c7c16860
3 changed files with 57 additions and 17 deletions

View File

@ -9,7 +9,7 @@ using namespace debug;
std::ofstream Logger::file;
std::mutex Logger::mutex;
std::string Logger::utcOffset = "";
unsigned Logger::moduleLen = 16;
unsigned Logger::moduleLen = 20;
LogMessage::~LogMessage() {
logger->log(level, ss.str());

View File

@ -38,9 +38,10 @@ ChunksRenderer::ChunksRenderer(
const EngineSettings& settings
) : level(level),
threadPool(
"chunks-render-pool",
[=](){return std::make_shared<RendererWorker>(level, cache, settings);},
[=](RendererResult& mesh){
meshes[mesh.key] = std::shared_ptr<Mesh>(mesh.renderer->createMesh());
meshes[mesh.key].reset(mesh.renderer->createMesh());
inwork.erase(mesh.key);
})
{

View File

@ -2,11 +2,14 @@
#define UTIL_THREAD_POOL_H_
#include <queue>
#include <atomic>
#include <thread>
#include <iostream>
#include <functional>
#include <condition_variable>
#include "../delegates.h"
#include "../debug/Logger.h"
namespace util {
@ -28,15 +31,19 @@ public:
template<class T, class R>
class ThreadPool {
debug::Logger logger;
std::queue<T> jobs;
std::queue<ThreadPoolResult<R>> results;
std::mutex resultsMutex;
std::vector<std::thread> threads;
std::condition_variable jobsMutexCondition;
std::mutex jobsMutex;
bool working = true;
std::vector<std::unique_lock<std::mutex>> workersBlocked;
consumer<R&> resultConsumer;
std::mutex resultsMutex;
consumer<T&> onJobFailed = nullptr;
runnable onComplete = nullptr;
std::atomic<int> busyWorkers = 0;
bool working = true;
void threadLoop(int index, std::shared_ptr<Worker<T, R>> worker) {
std::condition_variable variable;
@ -54,27 +61,37 @@ class ThreadPool {
}
job = jobs.front();
jobs.pop();
busyWorkers++;
}
R result = (*worker)(job);
{
resultsMutex.lock();
results.push(ThreadPoolResult<R> {variable, index, locked, result});
locked = true;
resultsMutex.unlock();
}
{
std::unique_lock<std::mutex> lock(mutex);
variable.wait(lock, [&] {
return !working || !locked;
});
try {
R result = (*worker)(job);
{
std::lock_guard<std::mutex> lock(resultsMutex);
results.push(ThreadPoolResult<R> {variable, index, locked, result});
locked = true;
busyWorkers--;
}
{
std::unique_lock<std::mutex> lock(mutex);
variable.wait(lock, [&] {
return !working || !locked;
});
}
} catch (std::exception& err) {
if (onJobFailed) {
onJobFailed(job);
}
logger.error() << "uncaught exception: " << err.what();
}
}
}
public:
ThreadPool(
std::string name,
supplier<std::shared_ptr<Worker<T, R>>> workersSupplier,
consumer<R&> resultConsumer
) : resultConsumer(resultConsumer) {
) : logger(name), resultConsumer(resultConsumer) {
const uint num_threads = std::thread::hardware_concurrency();
for (uint i = 0; i < num_threads; i++) {
threads.emplace_back(&ThreadPool<T,R>::threadLoop, this, i, workersSupplier());
@ -82,6 +99,13 @@ public:
}
}
~ThreadPool(){
terminate();
}
void terminate() {
if (!working) {
return;
}
{
std::lock_guard<std::mutex> lock(jobsMutex);
working = false;
@ -113,6 +137,13 @@ public:
entry.locked = false;
entry.variable.notify_all();
}
if (onComplete && busyWorkers == 0) {
std::lock_guard<std::mutex> lock(jobsMutex);
if (jobs.empty()) {
onComplete();
}
}
}
void enqueueJob(T job) {
@ -122,6 +153,14 @@ public:
}
jobsMutexCondition.notify_one();
}
void setOnJobFailed(consumer<T&> callback) {
this->onJobFailed = callback;
}
void setOnComplete(runnable callback) {
this->onComplete = callback;
}
};
} // namespace util