Merge pull request #605 from MihailRis/fix-cannot-resume-dead-coroutine-again-http-edition

Fix: "cannot resume dead coroutine" again (http edition)
This commit is contained in:
MihailRis 2025-09-10 12:22:02 +03:00 committed by GitHub
commit 11d634d6ea
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 249 additions and 170 deletions

View File

@ -72,6 +72,38 @@ local _tcp_client_callbacks = {}
local _udp_server_callbacks = {} local _udp_server_callbacks = {}
local _udp_client_datagram_callbacks = {} local _udp_client_datagram_callbacks = {}
local _udp_client_open_callbacks = {} local _udp_client_open_callbacks = {}
local _http_response_callbacks = {}
local _http_error_callbacks = {}
network.get = function(url, callback, errorCallback, headers)
local id = network.__get(url, headers)
if callback then
_http_response_callbacks[id] = callback
end
if errorCallback then
_http_error_callbacks[id] = errorCallback
end
end
network.get_binary = function(url, callback, errorCallback, headers)
local id = network.__get_binary(url, headers)
if callback then
_http_response_callbacks[id] = callback
end
if errorCallback then
_http_error_callbacks[id] = errorCallback
end
end
network.post = function(url, data, callback, errorCallback, headers)
local id = network.__post(url, data, headers)
if callback then
_http_response_callbacks[id] = callback
end
if errorCallback then
_http_error_callbacks[id] = errorCallback
end
end
network.tcp_open = function (port, handler) network.tcp_open = function (port, handler)
local socket = setmetatable({id=network.__open_tcp(port)}, ServerSocket) local socket = setmetatable({id=network.__open_tcp(port)}, ServerSocket)
@ -133,6 +165,7 @@ network.__process_events = function()
local CLIENT_CONNECTED = 1 local CLIENT_CONNECTED = 1
local CONNECTED_TO_SERVER = 2 local CONNECTED_TO_SERVER = 2
local DATAGRAM = 3 local DATAGRAM = 3
local RESPONSE = 4
local ON_SERVER = 1 local ON_SERVER = 1
local ON_CLIENT = 2 local ON_CLIENT = 2
@ -158,6 +191,22 @@ network.__process_events = function()
elseif side == ON_SERVER then elseif side == ON_SERVER then
_udp_server_callbacks[sid](addr, port, data) _udp_server_callbacks[sid](addr, port, data)
end end
elseif etype == RESPONSE then
if event[2] / 100 == 2 then
local callback = _http_response_callbacks[event[3]]
_http_response_callbacks[event[3]] = nil
_http_error_callbacks[event[3]] = nil
if callback then
callback(event[4])
end
else
local callback = _http_error_callbacks[event[3]]
_http_response_callbacks[event[3]] = nil
_http_error_callbacks[event[3]] = nil
if callback then
callback(event[2])
end
end
end end
-- remove dead servers -- remove dead servers

View File

@ -1,12 +1,72 @@
#include <utility>
#include "api_lua.hpp" #include "api_lua.hpp"
#include "coders/json.hpp" #include "coders/json.hpp"
#include "engine/Engine.hpp" #include "engine/Engine.hpp"
#include "network/Network.hpp" #include "network/Network.hpp"
#include <variant>
#include <utility>
using namespace scripting; using namespace scripting;
enum NetworkEventType {
CLIENT_CONNECTED = 1,
CONNECTED_TO_SERVER,
DATAGRAM,
RESPONSE,
};
struct ConnectionEventDto {
u64id_t server;
u64id_t client;
};
struct ResponseEventDto {
int status;
bool binary;
int requestId;
std::vector<char> bytes;
};
enum NetworkDatagramSide {
ON_SERVER = 1,
ON_CLIENT
};
struct NetworkDatagramEventDto {
NetworkDatagramSide side;
u64id_t server;
u64id_t client;
std::string addr;
int port;
std::vector<char> buffer;
};
struct NetworkEvent {
using Payload = std::variant<
ConnectionEventDto,
ResponseEventDto,
NetworkDatagramEventDto
>;
NetworkEventType type;
Payload payload;
NetworkEvent(
NetworkEventType type,
Payload payload
) : type(type), payload(std::move(payload)) {}
virtual ~NetworkEvent() = default;
};
static std::vector<NetworkEvent> events_queue {};
static std::mutex events_queue_mutex;
static void push_event(NetworkEvent&& event) {
std::lock_guard lock(events_queue_mutex);
events_queue.push_back(std::move(event));
}
static std::vector<std::string> read_headers(lua::State* L, int index) { static std::vector<std::string> read_headers(lua::State* L, int index) {
std::vector<std::string> headers; std::vector<std::string> headers;
if (lua::istable(L, index)) { if (lua::istable(L, index)) {
@ -20,76 +80,50 @@ static std::vector<std::string> read_headers(lua::State* L, int index) {
return headers; return headers;
} }
static int l_get(lua::State* L, network::Network& network) { static int request_id = 1;
static int perform_get(lua::State* L, network::Network& network, bool binary) {
std::string url(lua::require_lstring(L, 1)); std::string url(lua::require_lstring(L, 1));
auto headers = read_headers(L, 2);
lua::pushvalue(L, 2); int currentRequestId = request_id;
auto onResponse = lua::create_lambda_nothrow(L);
network::OnReject onReject = nullptr; network.get(url, [currentRequestId, binary](std::vector<char> bytes) {
if (!lua::isnoneornil(L, 3)) { push_event(NetworkEvent(
lua::pushvalue(L, 3); RESPONSE,
auto callback = lua::create_lambda_nothrow(L); ResponseEventDto {
onReject = [callback](int code) { 200,
callback({code}); binary,
}; currentRequestId,
} std::move(bytes)
}
));
}, [currentRequestId](int code) {
push_event(NetworkEvent(
RESPONSE,
ResponseEventDto {
code,
false,
currentRequestId,
{}
}
));
}, std::move(headers));
return lua::pushinteger(L, request_id++);
}
auto headers = read_headers(L, 4); static int l_get(lua::State* L, network::Network& network) {
return perform_get(L, network, false);
network.get(url, [onResponse](std::vector<char> bytes) {
engine->postRunnable([=]() {
onResponse({std::string(bytes.data(), bytes.size())});
});
}, std::move(onReject), std::move(headers));
return 0;
} }
static int l_get_binary(lua::State* L, network::Network& network) { static int l_get_binary(lua::State* L, network::Network& network) {
std::string url(lua::require_lstring(L, 1)); return perform_get(L, network, true);
lua::pushvalue(L, 2);
auto onResponse = lua::create_lambda_nothrow(L);
network::OnReject onReject = nullptr;
if (!lua::isnoneornil(L, 3)) {
lua::pushvalue(L, 3);
auto callback = lua::create_lambda_nothrow(L);
onReject = [callback](int code) {
callback({code});
};
}
auto headers = read_headers(L, 4);
network.get(url, [onResponse](std::vector<char> bytes) {
auto buffer = std::make_shared<util::Buffer<ubyte>>(
reinterpret_cast<const ubyte*>(bytes.data()), bytes.size()
);
engine->postRunnable([=]() {
onResponse({buffer});
});
}, std::move(onReject), std::move(headers));
return 0;
} }
static int l_post(lua::State* L, network::Network& network) { static int l_post(lua::State* L, network::Network& network) {
std::string url(lua::require_lstring(L, 1)); std::string url(lua::require_lstring(L, 1));
auto data = lua::tovalue(L, 2); auto data = lua::tovalue(L, 2);
lua::pushvalue(L, 3);
auto onResponse = lua::create_lambda_nothrow(L);
network::OnReject onReject = nullptr;
if (!lua::isnoneornil(L, 4)) {
lua::pushvalue(L, 4);
auto callback = lua::create_lambda_nothrow(L);
onReject = [callback](int code) {
callback({code});
};
}
std::string string; std::string string;
if (data.isString()) { if (data.isString()) {
string = data.asString(); string = data.asString();
@ -97,16 +131,32 @@ static int l_post(lua::State* L, network::Network& network) {
string = json::stringify(data, false); string = json::stringify(data, false);
} }
auto headers = read_headers(L, 5); auto headers = read_headers(L, 3);
int currentRequestId = request_id;
engine->getNetwork().post(url, string, [onResponse](std::vector<char> bytes) { engine->getNetwork().post(
auto buffer = std::make_shared<util::Buffer<ubyte>>( url,
reinterpret_cast<const ubyte*>(bytes.data()), bytes.size() string,
); [currentRequestId](std::vector<char> bytes) {
engine->postRunnable([=]() { auto buffer = std::make_shared<util::Buffer<ubyte>>(
onResponse({std::string(bytes.data(), bytes.size())}); reinterpret_cast<const ubyte*>(bytes.data()), bytes.size()
}); );
}, std::move(onReject), std::move(headers)); push_event(NetworkEvent(
RESPONSE,
ResponseEventDto {
200,
false,
currentRequestId,
std::vector<char>(buffer->begin(), buffer->end())}
));
},
[currentRequestId](int code) {
push_event(NetworkEvent(
RESPONSE, ResponseEventDto {code, false, currentRequestId, {}}
));
},
std::move(headers)
);
return 0; return 0;
} }
@ -232,68 +282,14 @@ static int l_available(lua::State* L, network::Network& network) {
return 0; return 0;
} }
enum NetworkEventType {
CLIENT_CONNECTED = 1,
CONNECTED_TO_SERVER,
DATAGRAM
};
struct NetworkEvent {
NetworkEventType type;
u64id_t server;
u64id_t client;
NetworkEvent(
NetworkEventType type,
u64id_t server,
u64id_t client
) {
this->type = type;
this->server = server;
this->client = client;
}
virtual ~NetworkEvent() = default;
};
enum NetworkDatagramSide {
ON_SERVER = 1,
ON_CLIENT
};
struct NetworkDatagramEvent : NetworkEvent {
NetworkDatagramSide side;
std::string addr;
int port;
std::vector<char> buffer;
NetworkDatagramEvent(
NetworkEventType datagram,
u64id_t sid,
u64id_t cid,
NetworkDatagramSide side,
const std::string& addr,
int port,
std::vector<char> buffer
) : NetworkEvent(DATAGRAM, sid, cid) {
this->side = side;
this->addr = addr;
this->port = port;
this->buffer = std::move(buffer);
}
};
static std::vector<std::unique_ptr<NetworkEvent>> events_queue {};
static std::mutex events_queue_mutex;
static int l_connect_tcp(lua::State* L, network::Network& network) { static int l_connect_tcp(lua::State* L, network::Network& network) {
std::string address = lua::require_string(L, 1); std::string address = lua::require_string(L, 1);
int port = lua::tointeger(L, 2); int port = lua::tointeger(L, 2);
u64id_t id = network.connectTcp(address, port, [](u64id_t cid) { u64id_t id = network.connectTcp(address, port, [](u64id_t cid) {
std::lock_guard lock(events_queue_mutex); push_event(NetworkEvent(
CONNECTED_TO_SERVER,
events_queue.push_back(std::make_unique<NetworkEvent>(CONNECTED_TO_SERVER, 0, cid)); ConnectionEventDto {0, cid}
));
}); });
return lua::pushinteger(L, id); return lua::pushinteger(L, id);
} }
@ -301,9 +297,10 @@ static int l_connect_tcp(lua::State* L, network::Network& network) {
static int l_open_tcp(lua::State* L, network::Network& network) { static int l_open_tcp(lua::State* L, network::Network& network) {
int port = lua::tointeger(L, 1); int port = lua::tointeger(L, 1);
u64id_t id = network.openTcpServer(port, [](u64id_t sid, u64id_t id) { u64id_t id = network.openTcpServer(port, [](u64id_t sid, u64id_t id) {
std::lock_guard lock(events_queue_mutex); push_event(NetworkEvent(
CLIENT_CONNECTED,
events_queue.push_back(std::make_unique<NetworkEvent>(CLIENT_CONNECTED, sid, id)); ConnectionEventDto {sid, id}
));
}); });
return lua::pushinteger(L, id); return lua::pushinteger(L, id);
} }
@ -312,22 +309,22 @@ static int l_connect_udp(lua::State* L, network::Network& network) {
std::string address = lua::require_string(L, 1); std::string address = lua::require_string(L, 1);
int port = lua::tointeger(L, 2); int port = lua::tointeger(L, 2);
u64id_t id = network.connectUdp(address, port, [](u64id_t cid) { u64id_t id = network.connectUdp(address, port, [](u64id_t cid) {
std::lock_guard lock(events_queue_mutex); push_event(NetworkEvent(
CONNECTED_TO_SERVER,
events_queue.push_back(std::make_unique<NetworkEvent>(CONNECTED_TO_SERVER, 0, cid)); ConnectionEventDto {0, cid}
));
}, [address, port]( }, [address, port](
u64id_t cid, u64id_t cid,
const char* buffer, const char* buffer,
size_t length size_t length
) { ) {
std::lock_guard lock(events_queue_mutex); push_event(NetworkEvent(
DATAGRAM,
events_queue.push_back( NetworkDatagramEventDto {
std::make_unique<NetworkDatagramEvent>( ON_CLIENT, 0, cid,
DATAGRAM, 0, cid, ON_CLIENT, address, port, std::vector<char>(buffer, buffer + length)
address, port, std::vector(buffer, buffer + length) }
) ));
);
}); });
return lua::pushinteger(L, id); return lua::pushinteger(L, id);
} }
@ -340,12 +337,13 @@ static int l_open_udp(lua::State* L, network::Network& network) {
int port, int port,
const char* buffer, const char* buffer,
size_t length) { size_t length) {
std::lock_guard lock(events_queue_mutex); push_event(
NetworkEvent(
events_queue.push_back( DATAGRAM,
std::make_unique<NetworkDatagramEvent>( NetworkDatagramEventDto {
DATAGRAM, sid, 0, ON_SERVER, ON_SERVER, sid, 0,
addr, port, std::vector(buffer, buffer + length) addr, port, std::vector<char>(buffer, buffer + length)
}
) )
); );
}); });
@ -412,8 +410,7 @@ static int l_get_total_download(lua::State* L, network::Network& network) {
} }
static int l_pull_events(lua::State* L, network::Network& network) { static int l_pull_events(lua::State* L, network::Network& network) {
std::vector<std::unique_ptr<NetworkEvent>> local_queue; std::vector<NetworkEvent> local_queue;
{ {
std::lock_guard lock(events_queue_mutex); std::lock_guard lock(events_queue_mutex);
local_queue.swap(events_queue); local_queue.swap(events_queue);
@ -422,36 +419,69 @@ static int l_pull_events(lua::State* L, network::Network& network) {
lua::createtable(L, local_queue.size(), 0); lua::createtable(L, local_queue.size(), 0);
for (size_t i = 0; i < local_queue.size(); i++) { for (size_t i = 0; i < local_queue.size(); i++) {
const auto* datagramEvent = dynamic_cast<NetworkDatagramEvent*>(local_queue[i].get()); lua::createtable(L, 7, 0);
lua::createtable(L, datagramEvent ? 7 : 3, 0); const auto& event = local_queue[i];
switch (event.type) {
case CLIENT_CONNECTED:
case CONNECTED_TO_SERVER: {
const auto& dto = std::get<ConnectionEventDto>(event.payload);
lua::pushinteger(L, event.type);
lua::rawseti(L, 1);
lua::pushinteger(L, local_queue[i]->type); lua::pushinteger(L, dto.server);
lua::rawseti(L, 1); lua::rawseti(L, 2);
lua::pushinteger(L, local_queue[i]->server); lua::pushinteger(L, dto.client);
lua::rawseti(L, 2); lua::rawseti(L, 3);
break;
}
case DATAGRAM: {
const auto& dto = std::get<NetworkDatagramEventDto>(event.payload);
lua::pushinteger(L, event.type);
lua::rawseti(L, 1);
lua::pushinteger(L, local_queue[i]->client); lua::pushinteger(L, dto.server);
lua::rawseti(L, 3); lua::rawseti(L, 2);
if (datagramEvent) { lua::pushinteger(L, dto.client);
lua::pushstring(L, datagramEvent->addr); lua::rawseti(L, 3);
lua::rawseti(L, 4);
lua::pushinteger(L, datagramEvent->port); lua::pushstring(L, dto.addr);
lua::rawseti(L, 5); lua::rawseti(L, 4);
lua::pushinteger(L, datagramEvent->side); lua::pushinteger(L, dto.port);
lua::rawseti(L, 6); lua::rawseti(L, 5);
lua::create_bytearray(L, datagramEvent->buffer.data(), datagramEvent->buffer.size()); lua::pushinteger(L, dto.side);
lua::rawseti(L, 7); lua::rawseti(L, 6);
lua::create_bytearray(L, dto.buffer.data(), dto.buffer.size());
lua::rawseti(L, 7);
break;
}
case RESPONSE: {
const auto& dto = std::get<ResponseEventDto>(event.payload);
lua::pushinteger(L, event.type);
lua::rawseti(L, 1);
lua::pushinteger(L, dto.status);
lua::rawseti(L, 2);
lua::pushinteger(L, dto.requestId);
lua::rawseti(L, 3);
if (dto.binary) {
lua::create_bytearray(L, dto.bytes.data(), dto.bytes.size());
} else {
lua::pushlstring(L, std::string_view(dto.bytes.data(), dto.bytes.size()));
}
lua::rawseti(L, 4);
break;
}
} }
lua::rawseti(L, i + 1); lua::rawseti(L, i + 1);
} }
return 1; return 1;
} }
@ -473,9 +503,9 @@ int wrap(lua_State* L) {
} }
const luaL_Reg networklib[] = { const luaL_Reg networklib[] = {
{"get", wrap<l_get>}, {"__get", wrap<l_get>},
{"get_binary", wrap<l_get_binary>}, {"__get_binary", wrap<l_get_binary>},
{"post", wrap<l_post>}, {"__post", wrap<l_post>},
{"get_total_upload", wrap<l_get_total_upload>}, {"get_total_upload", wrap<l_get_total_upload>},
{"get_total_download", wrap<l_get_total_download>}, {"get_total_download", wrap<l_get_total_download>},
{"__pull_events", wrap<l_pull_events>}, {"__pull_events", wrap<l_pull_events>},