make httpGet non-blocking

This commit is contained in:
MihailRis 2024-11-22 07:28:18 +03:00
parent 0fec17a8b6
commit 186078a8d5
2 changed files with 128 additions and 25 deletions

View File

@ -4,6 +4,7 @@
#include <curl/curl.h>
#include <stdexcept>
#include <queue>
#include "debug/Logger.hpp"
@ -21,44 +22,127 @@ static size_t write_callback(
return size * nmemb;
}
struct Request {
std::string url;
OnResponse onResponse;
OnReject onReject;
long maxSize;
};
class CurlHttp : public Http {
CURLM* multiHandle;
CURL* curl;
size_t totalUpload = 0;
size_t totalDownload = 0;
OnResponse onResponse;
OnReject onReject;
std::vector<char> buffer;
std::string url;
std::queue<Request> requests;
public:
CurlHttp(CURL* curl) : curl(curl) {
CurlHttp(CURLM* multiHandle, CURL* curl)
: multiHandle(multiHandle), curl(curl) {
}
virtual ~CurlHttp() {
curl_easy_cleanup(curl);
curl_multi_remove_handle(multiHandle, curl);
curl_multi_cleanup(multiHandle);
}
void get(const std::string& url, OnResponse onResponse, OnReject onReject)
override {
std::vector<char> buffer;
void get(
const std::string& url,
OnResponse onResponse,
OnReject onReject,
long maxSize
) override {
Request request {url, onResponse, onReject, maxSize};
if (url.empty()) {
processRequest(request);
} else {
requests.push(request);
}
}
void processRequest(const Request& request) {
onResponse = request.onResponse;
onReject = request.onReject;
url = request.url;
buffer.clear();
curl_easy_setopt(curl, CURLOPT_URL, url.c_str());
curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, write_callback);
curl_easy_setopt(curl, CURLOPT_WRITEDATA, &buffer);
CURLcode res = curl_easy_perform(curl);
if (res == CURLE_OK) {
long size;
if (!curl_easy_getinfo(curl, CURLINFO_REQUEST_SIZE, &size)) {
totalUpload += size;
}
if (!curl_easy_getinfo(curl, CURLINFO_HEADER_SIZE, &size)) {
totalDownload += size;
}
totalDownload += buffer.size();
if (onResponse) {
onResponse(std::move(buffer));
}
curl_easy_setopt(curl, CURLOPT_FOLLOWLOCATION, true);
if (request.maxSize == 0) {
curl_easy_setopt(
curl, CURLOPT_MAXFILESIZE, std::numeric_limits<long>::max()
);
} else {
auto message = curl_easy_strerror(res);
curl_easy_setopt(curl, CURLOPT_MAXFILESIZE, request.maxSize);
}
curl_multi_add_handle(multiHandle, curl);
int running;
CURLMcode res = curl_multi_perform(multiHandle, &running);
if (res != CURLM_OK) {
auto message = curl_multi_strerror(res);
logger.error() << message << " (" << url << ")";
if (onReject) {
onReject(message);
}
url = "";
}
}
void update() override {
int messagesLeft;
int running;
CURLMsg* msg;
CURLMcode res = curl_multi_perform(multiHandle, &running);
if (res != CURLM_OK) {
auto message = curl_multi_strerror(res);
logger.error() << message << " (" << url << ")";
if (onReject) {
onReject(message);
}
curl_multi_remove_handle(multiHandle, curl);
url = "";
return;
}
if ((msg = curl_multi_info_read(multiHandle, &messagesLeft)) != NULL) {
if(msg->msg == CURLMSG_DONE) {
curl_multi_remove_handle(multiHandle, curl);
}
int response;
curl_easy_getinfo(msg->easy_handle, CURLINFO_RESPONSE_CODE, &response);
if (response == 200) {
long size;
if (!curl_easy_getinfo(curl, CURLINFO_REQUEST_SIZE, &size)) {
totalUpload += size;
}
if (!curl_easy_getinfo(curl, CURLINFO_HEADER_SIZE, &size)) {
totalDownload += size;
}
totalDownload += buffer.size();
if (onResponse) {
onResponse(std::move(buffer));
}
} else {
logger.error() << "response code " << response << " (" << url << ")";
if (onReject) {
onReject(std::to_string(response).c_str());
}
}
url = "";
}
if (url.empty() && !requests.empty()) {
auto request = std::move(requests.front());
requests.pop();
processRequest(request);
}
}
@ -71,10 +155,16 @@ public:
}
static std::unique_ptr<CurlHttp> create() {
if (auto curl = curl_easy_init()) {
return std::make_unique<CurlHttp>(curl);
auto curl = curl_easy_init();
if (curl == nullptr) {
throw std::runtime_error("could not initialzie cURL");
}
throw std::runtime_error("could not initialzie cURL");
auto multiHandle = curl_multi_init();
if (multiHandle == nullptr) {
curl_easy_cleanup(curl);
throw std::runtime_error("could not initialzie cURL-multi");
}
return std::make_unique<CurlHttp>(multiHandle, curl);
}
};
@ -249,9 +339,12 @@ Network::Network(std::unique_ptr<Http> http, std::unique_ptr<Tcp> tcp)
Network::~Network() = default;
void Network::httpGet(
const std::string& url, OnResponse onResponse, OnReject onReject
const std::string& url,
OnResponse onResponse,
OnReject onReject,
long maxSize
) {
http->get(url, onResponse, onReject);
http->get(url, onResponse, onReject, maxSize);
}
std::shared_ptr<Socket> Network::connect(const std::string& address, int port) {
@ -276,6 +369,10 @@ size_t Network::getTotalDownload() const {
return http->getTotalDownload() + totalDownload;
}
void Network::update() {
http->update();
}
std::unique_ptr<Network> Network::create(const NetworkSettings& settings) {
auto http = CurlHttp::create();
auto tcp = std::make_unique<SocketTcp>();

View File

@ -18,10 +18,13 @@ namespace network {
virtual void get(
const std::string& url,
OnResponse onResponse,
OnReject onReject=nullptr
OnReject onReject=nullptr,
long maxSize=0
) = 0;
virtual size_t getTotalUpload() const = 0;
virtual size_t getTotalDownload() const = 0;
virtual void update() = 0;
};
class Socket {
@ -55,7 +58,8 @@ namespace network {
void httpGet(
const std::string& url,
OnResponse onResponse,
OnReject onReject = nullptr
OnReject onReject = nullptr,
long maxSize=0
);
std::shared_ptr<Socket> connect(const std::string& address, int port);
@ -63,6 +67,8 @@ namespace network {
size_t getTotalUpload() const;
size_t getTotalDownload() const;
void update();
static std::unique_ptr<Network> create(const NetworkSettings& settings);
};
}