thread pool update + refactor

This commit is contained in:
MihailRis 2024-04-11 15:48:27 +03:00
parent 1549a02731
commit 90d0b54a69
17 changed files with 271 additions and 103 deletions

View File

@ -6,6 +6,8 @@
#include <iostream> #include <iostream>
#include <memory> #include <memory>
#include "../util/ThreadPool.h"
#include "../constants.h" #include "../constants.h"
#include "../data/dynamic.h" #include "../data/dynamic.h"
#include "../debug/Logger.h" #include "../debug/Logger.h"
@ -21,37 +23,42 @@ static debug::Logger logger("assets-loader");
AssetsLoader::AssetsLoader(Assets* assets, const ResPaths* paths) AssetsLoader::AssetsLoader(Assets* assets, const ResPaths* paths)
: assets(assets), paths(paths) : assets(assets), paths(paths)
{ {
addLoader(AssetType::shader, assetload::shader); addLoader(AssetType::shader, assetload::shader);
addLoader(AssetType::texture, assetload::texture); addLoader(AssetType::texture, assetload::texture);
addLoader(AssetType::font, assetload::font); addLoader(AssetType::font, assetload::font);
addLoader(AssetType::atlas, assetload::atlas); addLoader(AssetType::atlas, assetload::atlas);
addLoader(AssetType::layout, assetload::layout); addLoader(AssetType::layout, assetload::layout);
addLoader(AssetType::sound, assetload::sound); addLoader(AssetType::sound, assetload::sound);
} }
void AssetsLoader::addLoader(AssetType tag, aloader_func func) { void AssetsLoader::addLoader(AssetType tag, aloader_func func) {
loaders[tag] = func; loaders[tag] = func;
} }
void AssetsLoader::add(AssetType tag, const std::string filename, const std::string alias, std::shared_ptr<AssetCfg> settings) { void AssetsLoader::add(AssetType tag, const std::string filename, const std::string alias, std::shared_ptr<AssetCfg> settings) {
entries.push(aloader_entry{tag, filename, alias, settings}); entries.push(aloader_entry{tag, filename, alias, settings});
} }
bool AssetsLoader::hasNext() const { bool AssetsLoader::hasNext() const {
return !entries.empty(); return !entries.empty();
}
aloader_func AssetsLoader::getLoader(AssetType tag) {
auto found = loaders.find(tag);
if (found == loaders.end()) {
throw std::runtime_error(
"unknown asset tag "+std::to_string(static_cast<int>(tag))
);
}
return found->second;
} }
bool AssetsLoader::loadNext() { bool AssetsLoader::loadNext() {
const aloader_entry& entry = entries.front(); const aloader_entry& entry = entries.front();
logger.info() << "loading " << entry.filename << " as " << entry.alias; logger.info() << "loading " << entry.filename << " as " << entry.alias;
auto found = loaders.find(entry.tag);
if (found == loaders.end()) {
logger.error() << "unknown asset tag " << static_cast<int>(entry.tag);
return false;
}
aloader_func loader = found->second;
try { try {
auto postfunc = loader(*this, assets, paths, entry.filename, entry.alias, entry.config); aloader_func loader = getLoader(entry.tag);
auto postfunc = loader(this, paths, entry.filename, entry.alias, entry.config);
postfunc(assets); postfunc(assets);
entries.pop(); entries.pop();
return true; return true;
@ -198,5 +205,37 @@ void AssetsLoader::addDefaults(AssetsLoader& loader, const Content* content) {
} }
const ResPaths* AssetsLoader::getPaths() const { const ResPaths* AssetsLoader::getPaths() const {
return paths; return paths;
}
class LoaderWorker : public util::Worker<std::shared_ptr<aloader_entry>, assetload::postfunc> {
AssetsLoader* loader;
public:
LoaderWorker(AssetsLoader* loader) : loader(loader) {
}
assetload::postfunc operator()(const std::shared_ptr<aloader_entry>& entry) override {
aloader_func loadfunc = loader->getLoader(entry->tag);
return loadfunc(loader, loader->getPaths(), entry->filename, entry->alias, entry->config);
}
};
std::shared_ptr<Task> AssetsLoader::startTask(runnable onDone) {
auto pool = std::make_shared<
util::ThreadPool<std::shared_ptr<aloader_entry>, assetload::postfunc>
>(
"assets-loader-pool",
[=](){return std::make_shared<LoaderWorker>(this);},
[=](assetload::postfunc& func) {
func(assets);
}
);
pool->setOnComplete(onDone);
while (!entries.empty()) {
const aloader_entry& entry = entries.front();
auto ptr = std::make_shared<aloader_entry>(entry);
pool->enqueueJob(ptr);
entries.pop();
}
return pool;
} }

View File

@ -2,6 +2,8 @@
#define ASSETS_ASSETS_LOADER_H #define ASSETS_ASSETS_LOADER_H
#include "Assets.h" #include "Assets.h"
#include "../interfaces/Task.h"
#include "../delegates.h"
#include <string> #include <string>
#include <memory> #include <memory>
@ -45,8 +47,7 @@ struct SoundCfg : AssetCfg {
}; };
using aloader_func = std::function<assetload::postfunc( using aloader_func = std::function<assetload::postfunc(
AssetsLoader&, AssetsLoader*,
Assets*,
const ResPaths*, const ResPaths*,
const std::string&, const std::string&,
const std::string&, const std::string&,
@ -96,7 +97,10 @@ public:
/// @param content engine content /// @param content engine content
static void addDefaults(AssetsLoader& loader, const Content* content); static void addDefaults(AssetsLoader& loader, const Content* content);
std::shared_ptr<Task> startTask(runnable onDone);
const ResPaths* getPaths() const; const ResPaths* getPaths() const;
aloader_func getLoader(AssetType tag);
}; };
#endif // ASSETS_ASSETS_LOADER_H #endif // ASSETS_ASSETS_LOADER_H

View File

@ -8,7 +8,7 @@
#include "../audio/audio.h" #include "../audio/audio.h"
#include "../files/files.h" #include "../files/files.h"
#include "../files/engine_paths.h" #include "../files/engine_paths.h"
#include "../coders/png.h" #include "../coders/imageio.h"
#include "../coders/json.h" #include "../coders/json.h"
#include "../coders/GLSLExtension.h" #include "../coders/GLSLExtension.h"
#include "../graphics/core/Shader.h" #include "../graphics/core/Shader.h"
@ -30,15 +30,14 @@ static bool animation(
); );
assetload::postfunc assetload::texture( assetload::postfunc assetload::texture(
AssetsLoader&, AssetsLoader*,
Assets* assets,
const ResPaths* paths, const ResPaths* paths,
const std::string filename, const std::string filename,
const std::string name, const std::string name,
std::shared_ptr<AssetCfg> std::shared_ptr<AssetCfg>
) { ) {
std::shared_ptr<ImageData> image ( std::shared_ptr<ImageData> image (
png::load_image(paths->find(filename+".png").u8string()) imageio::read(paths->find(filename+".png").u8string()).release()
); );
return [name, image](auto assets) { return [name, image](auto assets) {
assets->store(Texture::from(image.get()), name); assets->store(Texture::from(image.get()), name);
@ -46,8 +45,7 @@ assetload::postfunc assetload::texture(
} }
assetload::postfunc assetload::shader( assetload::postfunc assetload::shader(
AssetsLoader&, AssetsLoader*,
Assets* assets,
const ResPaths* paths, const ResPaths* paths,
const std::string filename, const std::string filename,
const std::string name, const std::string name,
@ -72,15 +70,12 @@ assetload::postfunc assetload::shader(
} }
static bool appendAtlas(AtlasBuilder& atlas, const fs::path& file) { static bool appendAtlas(AtlasBuilder& atlas, const fs::path& file) {
// png is only supported format
if (file.extension() != ".png")
return false;
std::string name = file.stem().string(); std::string name = file.stem().string();
// skip duplicates // skip duplicates
if (atlas.has(name)) { if (atlas.has(name)) {
return false; return false;
} }
std::unique_ptr<ImageData> image(png::load_image(file.string())); auto image = imageio::read(file.string());
image->fixAlphaColor(); image->fixAlphaColor();
atlas.add(name, image.release()); atlas.add(name, image.release());
@ -88,8 +83,7 @@ static bool appendAtlas(AtlasBuilder& atlas, const fs::path& file) {
} }
assetload::postfunc assetload::atlas( assetload::postfunc assetload::atlas(
AssetsLoader&, AssetsLoader*,
Assets* assets,
const ResPaths* paths, const ResPaths* paths,
const std::string directory, const std::string directory,
const std::string name, const std::string name,
@ -114,8 +108,7 @@ assetload::postfunc assetload::atlas(
} }
assetload::postfunc assetload::font( assetload::postfunc assetload::font(
AssetsLoader&, AssetsLoader*,
Assets* assets,
const ResPaths* paths, const ResPaths* paths,
const std::string filename, const std::string filename,
const std::string name, const std::string name,
@ -125,8 +118,7 @@ assetload::postfunc assetload::font(
for (size_t i = 0; i <= 4; i++) { for (size_t i = 0; i <= 4; i++) {
std::string name = filename + "_" + std::to_string(i) + ".png"; std::string name = filename + "_" + std::to_string(i) + ".png";
name = paths->find(name).string(); name = paths->find(name).string();
std::unique_ptr<ImageData> image (png::load_image(name)); pages->push_back(std::move(imageio::read(name)));
pages->push_back(std::move(image));
} }
return [=](auto assets) { return [=](auto assets) {
int res = pages->at(0)->getHeight() / 16; int res = pages->at(0)->getHeight() / 16;
@ -139,8 +131,7 @@ assetload::postfunc assetload::font(
} }
assetload::postfunc assetload::layout( assetload::postfunc assetload::layout(
AssetsLoader& loader, AssetsLoader*,
Assets* assets,
const ResPaths* paths, const ResPaths* paths,
const std::string file, const std::string file,
const std::string name, const std::string name,
@ -159,8 +150,7 @@ assetload::postfunc assetload::layout(
}; };
} }
assetload::postfunc assetload::sound( assetload::postfunc assetload::sound(
AssetsLoader& loader, AssetsLoader*,
Assets* assets,
const ResPaths* paths, const ResPaths* paths,
const std::string file, const std::string file,
const std::string name, const std::string name,

View File

@ -15,40 +15,35 @@ struct AssetCfg;
/// @brief see AssetsLoader.h: aloader_func /// @brief see AssetsLoader.h: aloader_func
namespace assetload { namespace assetload {
postfunc texture( postfunc texture(
AssetsLoader&, AssetsLoader*,
Assets*,
const ResPaths* paths, const ResPaths* paths,
const std::string filename, const std::string filename,
const std::string name, const std::string name,
std::shared_ptr<AssetCfg> settings std::shared_ptr<AssetCfg> settings
); );
postfunc shader( postfunc shader(
AssetsLoader&, AssetsLoader*,
Assets*,
const ResPaths* paths, const ResPaths* paths,
const std::string filename, const std::string filename,
const std::string name, const std::string name,
std::shared_ptr<AssetCfg> settings std::shared_ptr<AssetCfg> settings
); );
postfunc atlas( postfunc atlas(
AssetsLoader&, AssetsLoader*,
Assets*,
const ResPaths* paths, const ResPaths* paths,
const std::string directory, const std::string directory,
const std::string name, const std::string name,
std::shared_ptr<AssetCfg> settings std::shared_ptr<AssetCfg> settings
); );
postfunc font( postfunc font(
AssetsLoader&, AssetsLoader*,
Assets*,
const ResPaths* paths, const ResPaths* paths,
const std::string filename, const std::string filename,
const std::string name, const std::string name,
std::shared_ptr<AssetCfg> settings std::shared_ptr<AssetCfg> settings
); );
postfunc layout( postfunc layout(
AssetsLoader&, AssetsLoader*,
Assets*,
const ResPaths* paths, const ResPaths* paths,
const std::string file, const std::string file,
const std::string name, const std::string name,
@ -56,8 +51,7 @@ namespace assetload {
); );
postfunc sound( postfunc sound(
AssetsLoader&, AssetsLoader*,
Assets*,
const ResPaths* paths, const ResPaths* paths,
const std::string file, const std::string file,
const std::string name, const std::string name,

49
src/coders/imageio.cpp Normal file
View File

@ -0,0 +1,49 @@
#include "imageio.h"
#include "png.h"
#include "../graphics/core/ImageData.h"
#include <filesystem>
#include <functional>
#include <unordered_map>
namespace fs = std::filesystem;
using image_reader = std::function<ImageData*(const std::string&)>;
using image_writer = std::function<void(const std::string&, const ImageData*)>;
static std::unordered_map<std::string, image_reader> readers {
{".png", png::load_image},
};
static std::unordered_map<std::string, image_writer> writers {
{".png", png::write_image},
};
bool imageio::is_read_supported(const std::string& extension) {
return readers.find(extension) != readers.end();
}
bool imageio::is_write_supported(const std::string& extension) {
return writers.find(extension) != writers.end();
}
inline std::string extensionOf(const std::string& filename) {
return fs::u8path(filename).extension().u8string();
}
std::unique_ptr<ImageData> imageio::read(const std::string& filename) {
auto found = readers.find(extensionOf(filename));
if (found == readers.end()) {
throw std::runtime_error("file format is not supported (read): "+filename);
}
return std::unique_ptr<ImageData>(found->second(filename));
}
void imageio::write(const std::string& filename, const ImageData* image) {
auto found = writers.find(extensionOf(filename));
if (found == writers.end()) {
throw std::runtime_error("file format is not supported (write): "+filename);
}
return found->second(filename, image);
}

19
src/coders/imageio.h Normal file
View File

@ -0,0 +1,19 @@
#ifndef CODERS_IMAGEIO_H_
#define CODERS_IMAGEIO_H_
#include <string>
#include <memory>
class ImageData;
namespace imageio {
inline const std::string PNG = ".png";
bool is_read_supported(const std::string& extension);
bool is_write_supported(const std::string& extension);
std::unique_ptr<ImageData> read(const std::string& filename);
void write(const std::string& filename, const ImageData* image);
}
#endif // CODERS_IMAGEIO_H_

View File

@ -340,7 +340,7 @@ ImageData* _png_load(const char* file){
} }
#endif #endif
ImageData* png::load_image(std::string filename) { ImageData* png::load_image(const std::string& filename) {
ImageData* image (_png_load(filename.c_str())); ImageData* image (_png_load(filename.c_str()));
if (image == nullptr) { if (image == nullptr) {
throw std::runtime_error("could not load image "+filename); throw std::runtime_error("could not load image "+filename);
@ -348,13 +348,13 @@ ImageData* png::load_image(std::string filename) {
return image; return image;
} }
Texture* png::load_texture(std::string filename) { Texture* png::load_texture(const std::string& filename) {
std::unique_ptr<ImageData> image (load_image(filename)); std::unique_ptr<ImageData> image (load_image(filename));
auto texture = Texture::from(image.get()); auto texture = Texture::from(image.get());
texture->setNearestFilter(); texture->setNearestFilter();
return texture; return texture;
} }
void png::write_image(std::string filename, const ImageData* image) { void png::write_image(const std::string& filename, const ImageData* image) {
_png_write(filename.c_str(), image->getWidth(), image->getHeight(), (const ubyte*)image->getData(), image->getFormat() == ImageFormat::rgba8888); _png_write(filename.c_str(), image->getWidth(), image->getHeight(), (const ubyte*)image->getData(), image->getFormat() == ImageFormat::rgba8888);
} }

View File

@ -8,9 +8,9 @@ class Texture;
class ImageData; class ImageData;
namespace png { namespace png {
extern ImageData* load_image(std::string filename); extern ImageData* load_image(const std::string& filename);
extern void write_image(std::string filename, const ImageData* image); extern void write_image(const std::string& filename, const ImageData* image);
extern Texture* load_texture(std::string filename); extern Texture* load_texture(const std::string& filename);
} }
#endif /* CODERS_PNG_H_ */ #endif /* CODERS_PNG_H_ */

View File

@ -6,7 +6,7 @@
#include "audio/audio.h" #include "audio/audio.h"
#include "coders/GLSLExtension.h" #include "coders/GLSLExtension.h"
#include "coders/json.h" #include "coders/json.h"
#include "coders/png.h" #include "coders/imageio.h"
#include "content/ContentLoader.h" #include "content/ContentLoader.h"
#include "core_defs.h" #include "core_defs.h"
#include "files/files.h" #include "files/files.h"
@ -70,7 +70,6 @@ Engine::Engine(EngineSettings& settings, EnginePaths* paths)
auto resdir = paths->getResources(); auto resdir = paths->getResources();
logger.info() << "loading assets";
std::vector<fs::path> roots {resdir}; std::vector<fs::path> roots {resdir};
resPaths = std::make_unique<ResPaths>(resdir, roots); resPaths = std::make_unique<ResPaths>(resdir, roots);
try { try {
@ -115,17 +114,21 @@ void Engine::updateTimers() {
void Engine::updateHotkeys() { void Engine::updateHotkeys() {
if (Events::jpressed(keycode::F2)) { if (Events::jpressed(keycode::F2)) {
std::unique_ptr<ImageData> image(Window::takeScreenshot()); saveScreenshot();
image->flipY();
fs::path filename = paths->getScreenshotFile("png");
png::write_image(filename.string(), image.get());
std::cout << "saved screenshot as " << filename << std::endl;
} }
if (Events::jpressed(keycode::F11)) { if (Events::jpressed(keycode::F11)) {
Window::toggleFullscreen(); Window::toggleFullscreen();
} }
} }
void Engine::saveScreenshot() {
std::unique_ptr<ImageData> image(Window::takeScreenshot());
image->flipY();
fs::path filename = paths->getScreenshotFile("png");
imageio::write(filename.string(), image.get());
logger.info() << "saved screenshot as "+filename.u8string();
}
void Engine::mainloop() { void Engine::mainloop() {
logger.info() << "starting menu screen"; logger.info() << "starting menu screen";
setScreen(std::make_shared<MenuScreen>(this)); setScreen(std::make_shared<MenuScreen>(this));
@ -203,10 +206,17 @@ void Engine::loadAssets() {
auto new_assets = std::make_unique<Assets>(); auto new_assets = std::make_unique<Assets>();
AssetsLoader loader(new_assets.get(), resPaths.get()); AssetsLoader loader(new_assets.get(), resPaths.get());
AssetsLoader::addDefaults(loader, content.get()); AssetsLoader::addDefaults(loader, content.get());
while (loader.hasNext()) {
if (!loader.loadNext()) { bool threading = false;
new_assets.reset(); if (threading) {
throw std::runtime_error("could not to load assets"); auto task = loader.startTask([=](){});
task->waitForEnd();
} else {
while (loader.hasNext()) {
if (!loader.loadNext()) {
new_assets.reset();
throw std::runtime_error("could not to load assets");
}
} }
} }
if (assets) { if (assets) {
@ -216,7 +226,6 @@ void Engine::loadAssets() {
} }
} }
// TODO: refactor this
void Engine::loadContent() { void Engine::loadContent() {
auto resdir = paths->getResources(); auto resdir = paths->getResources();
ContentBuilder contentBuilder; ContentBuilder contentBuilder;
@ -263,12 +272,7 @@ void Engine::loadWorldContent(const fs::path& folder) {
} }
void Engine::loadAllPacks() { void Engine::loadAllPacks() {
PacksManager manager; PacksManager manager = createPacksManager(paths->getWorldFolder());
manager.setSources({
paths->getWorldFolder()/fs::path("content"),
paths->getUserfiles()/fs::path("content"),
paths->getResources()/fs::path("content")
});
manager.scan(); manager.scan();
auto allnames = manager.getAllNames(); auto allnames = manager.getAllNames();
contentPacks = manager.getAll(manager.assembly(allnames)); contentPacks = manager.getAll(manager.assembly(allnames));

View File

@ -123,6 +123,8 @@ public:
/// @brief Enqueue function call to the end of current frame in draw thread /// @brief Enqueue function call to the end of current frame in draw thread
void postRunnable(runnable callback); void postRunnable(runnable callback);
void saveScreenshot();
PacksManager createPacksManager(const fs::path& worldFolder); PacksManager createPacksManager(const fs::path& worldFolder);
SettingsHandler& getSettingsHandler(); SettingsHandler& getSettingsHandler();

View File

@ -92,6 +92,12 @@ void WorldConverter::write() {
wfile->write(nullptr, content); wfile->write(nullptr, content);
} }
void WorldConverter::waitForEnd() {
while (isActive()) {
update();
}
}
uint WorldConverter::getWorkRemaining() const { uint WorldConverter::getWorkRemaining() const {
return tasks.size(); return tasks.size();
} }

View File

@ -58,6 +58,11 @@ public:
tasks = {}; tasks = {};
} }
bool isActive() const override {
return !tasks.empty();
}
void waitForEnd() override;
void write(); void write();
uint getWorkRemaining() const override; uint getWorkRemaining() const override;

View File

@ -201,6 +201,7 @@ void menus::open_world(std::string name, Engine* engine, bool confirmConvert) {
try { try {
engine->loadWorldContent(folder); engine->loadWorldContent(folder);
} catch (const contentpack_error& error) { } catch (const contentpack_error& error) {
engine->setScreen(std::make_shared<MenuScreen>(engine));
// could not to find or read pack // could not to find or read pack
guiutil::alert( guiutil::alert(
engine->getGUI(), langs::get(L"error.pack-not-found")+L": "+ engine->getGUI(), langs::get(L"error.pack-not-found")+L": "+
@ -208,6 +209,7 @@ void menus::open_world(std::string name, Engine* engine, bool confirmConvert) {
); );
return; return;
} catch (const std::runtime_error& error) { } catch (const std::runtime_error& error) {
engine->setScreen(std::make_shared<MenuScreen>(engine));
guiutil::alert( guiutil::alert(
engine->getGUI(), langs::get(L"Content Error", L"menu")+L": "+ engine->getGUI(), langs::get(L"Content Error", L"menu")+L": "+
util::str2wstr_utf8(error.what()) util::str2wstr_utf8(error.what())

View File

@ -1,11 +1,12 @@
#include "menu.h" #include "menu.h"
#include "menu_commons.h" #include "menu_commons.h"
#include "../../coders/png.h" #include "../../coders/imageio.h"
#include "../../content/PacksManager.h" #include "../../content/PacksManager.h"
#include "../../content/ContentLUT.h" #include "../../content/ContentLUT.h"
#include "../../engine.h" #include "../../engine.h"
#include "../../files/WorldFiles.h" #include "../../files/WorldFiles.h"
#include "../../graphics/core/Texture.h"
#include "../../graphics/ui/gui_util.h" #include "../../graphics/ui/gui_util.h"
#include "../../logic/LevelController.h" #include "../../logic/LevelController.h"
#include "../../util/stringutil.h" #include "../../util/stringutil.h"
@ -52,7 +53,8 @@ std::shared_ptr<Container> create_pack_panel(
if (assets->getTexture(icon) == nullptr) { if (assets->getTexture(icon) == nullptr) {
auto iconfile = pack.folder/fs::path("icon.png"); auto iconfile = pack.folder/fs::path("icon.png");
if (fs::is_regular_file(iconfile)) { if (fs::is_regular_file(iconfile)) {
assets->store(png::load_texture(iconfile.string()), icon); auto image = imageio::read(iconfile.string());
assets->store(Texture::from(image.get()), icon);
} else { } else {
icon = "gui/no_icon"; icon = "gui/no_icon";
} }
@ -183,12 +185,7 @@ void create_content_panel(Engine* engine, LevelController* controller) {
auto mainPanel = menus::create_page(engine, "content", 550, 0.0f, 5); auto mainPanel = menus::create_page(engine, "content", 550, 0.0f, 5);
auto paths = engine->getPaths(); auto paths = engine->getPaths();
PacksManager manager; PacksManager manager = engine->createPacksManager(paths->getWorldFolder());
manager.setSources({
paths->getWorldFolder()/fs::path("content"),
paths->getUserfiles()/fs::path("content"),
paths->getResources()/fs::path("content")
});
manager.scan(); manager.scan();
std::vector<ContentPack> scanned = manager.getAll(manager.getAllNames()); std::vector<ContentPack> scanned = manager.getAll(manager.getAllNames());

View File

@ -46,6 +46,7 @@ ChunksRenderer::ChunksRenderer(
}) })
{ {
threadPool.setStandaloneResults(false); threadPool.setStandaloneResults(false);
threadPool.setStopOnFail(false);
renderer = std::make_unique<BlocksRenderer>( renderer = std::make_unique<BlocksRenderer>(
RENDERER_CAPACITY, level->content, cache, settings RENDERER_CAPACITY, level->content, cache, settings
); );

View File

@ -9,9 +9,11 @@ class Task {
public: public:
virtual ~Task() {} virtual ~Task() {}
virtual bool isActive() const = 0;
virtual uint getWorkRemaining() const = 0; virtual uint getWorkRemaining() const = 0;
virtual uint getWorkDone() const = 0; virtual uint getWorkDone() const = 0;
virtual void update() = 0; virtual void update() = 0;
virtual void waitForEnd() = 0;
virtual void terminate() = 0; virtual void terminate() = 0;
}; };

View File

@ -4,6 +4,7 @@
#include <queue> #include <queue>
#include <atomic> #include <atomic>
#include <thread> #include <thread>
#include <chrono>
#include <iostream> #include <iostream>
#include <functional> #include <functional>
#include <condition_variable> #include <condition_variable>
@ -14,8 +15,9 @@
namespace util { namespace util {
template<class T> template<class J, class T>
struct ThreadPoolResult { struct ThreadPoolResult {
J job;
std::condition_variable& variable; std::condition_variable& variable;
int workerIndex; int workerIndex;
bool& locked; bool& locked;
@ -34,7 +36,7 @@ template<class T, class R>
class ThreadPool : public Task { class ThreadPool : public Task {
debug::Logger logger; debug::Logger logger;
std::queue<T> jobs; std::queue<T> jobs;
std::queue<ThreadPoolResult<R>> results; std::queue<ThreadPoolResult<T, R>> results;
std::mutex resultsMutex; std::mutex resultsMutex;
std::vector<std::thread> threads; std::vector<std::thread> threads;
std::condition_variable jobsMutexCondition; std::condition_variable jobsMutexCondition;
@ -46,7 +48,9 @@ class ThreadPool : public Task {
std::atomic<int> busyWorkers = 0; std::atomic<int> busyWorkers = 0;
std::atomic<uint> jobsDone = 0; std::atomic<uint> jobsDone = 0;
bool working = true; bool working = true;
bool failed = false;
bool standaloneResults = true; bool standaloneResults = true;
bool stopOnFail = true;
void threadLoop(int index, std::shared_ptr<Worker<T, R>> worker) { void threadLoop(int index, std::shared_ptr<Worker<T, R>> worker) {
std::condition_variable variable; std::condition_variable variable;
@ -59,7 +63,7 @@ class ThreadPool : public Task {
jobsMutexCondition.wait(lock, [this] { jobsMutexCondition.wait(lock, [this] {
return !jobs.empty() || !working; return !jobs.empty() || !working;
}); });
if (!working) { if (!working || failed) {
break; break;
} }
job = jobs.front(); job = jobs.front();
@ -71,7 +75,7 @@ class ThreadPool : public Task {
R result = (*worker)(job); R result = (*worker)(job);
{ {
std::lock_guard<std::mutex> lock(resultsMutex); std::lock_guard<std::mutex> lock(resultsMutex);
results.push(ThreadPoolResult<R> {variable, index, locked, result}); results.push(ThreadPoolResult<T, R> {job, variable, index, locked, result});
if (!standaloneResults) { if (!standaloneResults) {
locked = true; locked = true;
} }
@ -88,6 +92,10 @@ class ThreadPool : public Task {
if (onJobFailed) { if (onJobFailed) {
onJobFailed(job); onJobFailed(job);
} }
if (stopOnFail) {
std::lock_guard<std::mutex> lock(jobsMutex);
failed = true;
}
logger.error() << "uncaught exception: " << err.what(); logger.error() << "uncaught exception: " << err.what();
} }
jobsDone++; jobsDone++;
@ -109,6 +117,10 @@ public:
terminate(); terminate();
} }
bool isActive() const override {
return working;
}
void terminate() override { void terminate() override {
if (!working) { if (!working) {
return; return;
@ -120,7 +132,7 @@ public:
{ {
std::lock_guard<std::mutex> lock(resultsMutex); std::lock_guard<std::mutex> lock(resultsMutex);
while (!results.empty()) { while (!results.empty()) {
ThreadPoolResult<R> entry = results.front(); ThreadPoolResult<T,R> entry = results.front();
results.pop(); results.pop();
if (!standaloneResults) { if (!standaloneResults) {
entry.locked = false; entry.locked = false;
@ -136,24 +148,54 @@ public:
} }
void update() override { void update() override {
std::lock_guard<std::mutex> lock(resultsMutex); if (!working) {
while (!results.empty()) { return;
ThreadPoolResult<R> entry = results.front(); }
results.pop(); if (failed) {
throw std::runtime_error("some job failed");
resultConsumer(entry.entry);
if (!standaloneResults) {
entry.locked = false;
entry.variable.notify_all();
}
} }
if (onComplete && busyWorkers == 0) { bool complete = false;
std::lock_guard<std::mutex> lock(jobsMutex); {
if (jobs.empty()) { std::lock_guard<std::mutex> lock(resultsMutex);
onComplete(); while (!results.empty()) {
ThreadPoolResult<T,R> entry = results.front();
results.pop();
try {
resultConsumer(entry.entry);
} catch (std::exception& err) {
logger.error() << err.what();
if (onJobFailed) {
onJobFailed(entry.job);
}
if (stopOnFail) {
std::lock_guard<std::mutex> lock(jobsMutex);
failed = true;
complete = false;
}
break;
}
if (!standaloneResults) {
entry.locked = false;
entry.variable.notify_all();
}
} }
if (onComplete && busyWorkers == 0) {
std::lock_guard<std::mutex> lock(jobsMutex);
if (jobs.empty()) {
onComplete();
complete = true;
}
}
}
if (failed) {
throw std::runtime_error("some job failed");
}
if (complete) {
terminate();
} }
} }
@ -170,6 +212,10 @@ public:
standaloneResults = flag; standaloneResults = flag;
} }
void setStopOnFail(bool flag) {
stopOnFail = flag;
}
/// @brief onJobFailed called on exception thrown in worker thread. /// @brief onJobFailed called on exception thrown in worker thread.
/// Use engine.postRunnable when calling terminate() /// Use engine.postRunnable when calling terminate()
void setOnJobFailed(consumer<T&> callback) { void setOnJobFailed(consumer<T&> callback) {
@ -189,6 +235,14 @@ public:
uint getWorkDone() const override { uint getWorkDone() const override {
return jobsDone; return jobsDone;
} }
virtual void waitForEnd() override {
using namespace std::chrono_literals;
while (working) {
std::this_thread::sleep_for(2ms);
update();
}
}
}; };
} // namespace util } // namespace util