diff --git a/src/io/deflate_istream.hpp b/src/io/deflate_istream.hpp index ecbc738a..0dd8f6de 100644 --- a/src/io/deflate_istream.hpp +++ b/src/io/deflate_istream.hpp @@ -7,95 +7,93 @@ #include #include -class deflate_istream : public std::istream { +class deflate_istreambuf : public std::streambuf { public: - explicit deflate_istream(std::unique_ptr src) - : std::istream(&buf), source(std::move(src)), buf(*source) {} + explicit deflate_istreambuf(std::istream& src) : src(src) { + zstream.zalloc = Z_NULL; + zstream.zfree = Z_NULL; + zstream.opaque = Z_NULL; + zstream.avail_in = 0; + zstream.next_in = Z_NULL; + + int ret = inflateInit2(&zstream, -15); + if (ret != Z_OK) { + throw std::runtime_error("zlib init failed"); + } + } -private: - class deflate_streambuf : public std::streambuf { - public: - explicit deflate_streambuf(std::istream& src) : src(src) { - zstream.zalloc = Z_NULL; - zstream.zfree = Z_NULL; - zstream.opaque = Z_NULL; - zstream.avail_in = 0; - zstream.next_in = Z_NULL; - - int ret = inflateInit2(&zstream, -15); - if (ret != Z_OK) { - throw std::runtime_error("zlib init failed"); + ~deflate_istreambuf() { + inflateEnd(&zstream); + } + + deflate_istreambuf(const deflate_istreambuf&) = delete; + deflate_istreambuf& operator=(const deflate_istreambuf&) = delete; + +protected: + int_type underflow() override { + if (gptr() < egptr()) { + return traits_type::to_int_type(*gptr()); + } + + if (eof) { + return traits_type::eof(); + } + + zstream.next_out = reinterpret_cast(outBuf.data()); + zstream.avail_out = outBuf.size(); + + do { + if (zstream.avail_in == 0) { + src.read(inBuf.data(), inBuf.size()); + zstream.avail_in = static_cast(src.gcount()); + zstream.next_in = reinterpret_cast(inBuf.data()); + + if (src.bad()) { + return traits_type::eof(); + } } - } - ~deflate_streambuf() { - inflateEnd(&zstream); - } + int ret = inflate(&zstream, Z_NO_FLUSH); + if (ret == Z_STREAM_END) { + eof = true; + } else if (ret != Z_OK) { + if (ret == Z_BUF_ERROR && zstream.avail_out == outBuf.size()) { + continue; + } + return traits_type::eof(); + } - deflate_streambuf(const deflate_streambuf&) = delete; - deflate_streambuf& operator=(const deflate_streambuf&) = delete; - - protected: - int_type underflow() override { - if (gptr() < egptr()) { + const auto decompressed = outBuf.size() - zstream.avail_out; + if (decompressed > 0) { + setg(outBuf.data(), + outBuf.data(), + outBuf.data() + decompressed); return traits_type::to_int_type(*gptr()); } if (eof) { return traits_type::eof(); } + } while (zstream.avail_in > 0 || !src.eof()); - zstream.next_out = reinterpret_cast(outBuf.data()); - zstream.avail_out = outBuf.size(); + return traits_type::eof(); + } +private: + static constexpr size_t BUFFER_SIZE = 16384; - do { - if (zstream.avail_in == 0) { - src.read(inBuf.data(), inBuf.size()); - zstream.avail_in = static_cast(src.gcount()); - zstream.next_in = reinterpret_cast(inBuf.data()); - - if (src.bad()) { - return traits_type::eof(); - } - } - - int ret = inflate(&zstream, Z_NO_FLUSH); - if (ret == Z_STREAM_END) { - eof = true; - } else if (ret != Z_OK) { - if (ret == Z_BUF_ERROR && zstream.avail_out == outBuf.size()) { - continue; - } - return traits_type::eof(); - } - - const auto decompressed = outBuf.size() - zstream.avail_out; - if (decompressed > 0) { - setg(outBuf.data(), - outBuf.data(), - outBuf.data() + decompressed); - return traits_type::to_int_type(*gptr()); - } - - if (eof) { - return traits_type::eof(); - } - - } while (zstream.avail_in > 0 || !src.eof()); - - return traits_type::eof(); - } - - private: - static constexpr size_t BUFFER_SIZE = 16384; - - std::istream& src; - z_stream zstream {}; - std::array inBuf {}; - std::array outBuf {}; - bool eof = false; - }; - - std::unique_ptr source; - deflate_streambuf buf; + std::istream& src; + z_stream zstream {}; + std::array inBuf {}; + std::array outBuf {}; + bool eof = false; +}; + +class deflate_istream : public std::istream { +public: + explicit deflate_istream(std::unique_ptr src) + : std::istream(&buffer), source(std::move(src)), buffer(*source) {} + +private: + std::unique_ptr source; + deflate_istreambuf buffer; }; diff --git a/src/io/deflate_ostream.hpp b/src/io/deflate_ostream.hpp new file mode 100644 index 00000000..39678843 --- /dev/null +++ b/src/io/deflate_ostream.hpp @@ -0,0 +1,118 @@ +#include +#include +#include +#include + +class deflate_ostreambuf : public std::streambuf { +public: + deflate_ostreambuf(std::ostream& dest, int level = Z_DEFAULT_COMPRESSION) + : dest(dest) { + zstream.zalloc = Z_NULL; + zstream.zfree = Z_NULL; + zstream.opaque = Z_NULL; + int ret = deflateInit2( + &zstream, level, Z_DEFLATED, -15, 8, Z_DEFAULT_STRATEGY + ); + if (ret != Z_OK) { + throw std::runtime_error("zlib deflate initialization failed"); + } + inBuffer = std::make_unique(bufferSize); + outBuffer = std::make_unique(outBufferSize); + + setp(inBuffer.get(), inBuffer.get() + bufferSize - 1); + } + + ~deflate_ostreambuf() { + try { + close(); + } catch (...) { + std::cerr << "error in zlib output stream finalization" << std::endl; + } + } + + bool close() { + overflow(EOF); + + // Finalize the deflate stream + zstream.avail_in = 0; + zstream.next_in = nullptr; + int ret; + do { + zstream.avail_out = outBufferSize; + zstream.next_out = reinterpret_cast(outBuffer.get()); + ret = deflate(&zstream, Z_FINISH); + if (ret == Z_STREAM_ERROR) { + break; + } + size_t compressed_size = outBufferSize - zstream.avail_out; + dest.write(outBuffer.get(), compressed_size); + } while (ret != Z_STREAM_END); + + deflateEnd(&zstream); + return true; + } + +protected: + int overflow(int c) override { + if (c != EOF) { + *pptr() = static_cast(c); + pbump(1); + } + + if (process_input() == EOF) { + return EOF; + } + + return c != EOF ? 0 : EOF; + } + + int sync() override { + if (process_input(Z_SYNC_FLUSH) == EOF) { + return -1; + } + dest.flush(); + return 0; + } + +private: + static const size_t bufferSize = 512; + static const size_t outBufferSize = bufferSize * 2; + + std::ostream& dest; + z_stream zstream {}; + std::unique_ptr inBuffer; + std::unique_ptr outBuffer; + + int process_input(int flush = Z_NO_FLUSH) { + size_t input_size = pptr() - pbase(); + zstream.avail_in = static_cast(input_size); + zstream.next_in = reinterpret_cast(pbase()); + + int ret; + do { + zstream.avail_out = outBufferSize; + zstream.next_out = reinterpret_cast(outBuffer.get()); + ret = deflate(&zstream, flush); + if (ret == Z_STREAM_ERROR) { + return EOF; + } + size_t compressed_size = outBufferSize - zstream.avail_out; + dest.write(outBuffer.get(), compressed_size); + if (!dest) { + return EOF; + } + } while (zstream.avail_out == 0); + + setp(inBuffer.get(), inBuffer.get() + bufferSize - 1); + return 0; + } +}; + +class deflate_ostream : public std::ostream { +public: + explicit deflate_ostream(std::ostream& dest, int level = Z_DEFAULT_COMPRESSION) + : std::ostream(&buffer), buffer(dest, level) {} + +private: + deflate_ostreambuf buffer; +}; diff --git a/src/io/memory_istream.hpp b/src/io/memory_istream.hpp index 09f6f418..0b2131d6 100644 --- a/src/io/memory_istream.hpp +++ b/src/io/memory_istream.hpp @@ -3,32 +3,32 @@ #include #include "util/Buffer.hpp" +class memory_streambuf : public std::streambuf { +public: + explicit memory_streambuf(util::Buffer buffer) + : buffer(std::move(buffer)) { + char* base = this->buffer.data(); + char* end = base + this->buffer.size(); + setg(base, base, end); + } + + memory_streambuf(const memory_streambuf&) = delete; + memory_streambuf& operator=(const memory_streambuf&) = delete; + +protected: + int_type underflow() override { + return traits_type::eof(); + } + +private: + util::Buffer buffer; +}; + class memory_istream : public std::istream { public: explicit memory_istream(util::Buffer buffer) : std::istream(&buf), buf(std::move(buffer)) {} private: - class memory_streambuf : public std::streambuf { - public: - explicit memory_streambuf(util::Buffer buffer) - : buffer(std::move(buffer)) { - char* base = this->buffer.data(); - char* end = base + this->buffer.size(); - setg(base, base, end); - } - - memory_streambuf(const memory_streambuf&) = delete; - memory_streambuf& operator=(const memory_streambuf&) = delete; - - protected: - int_type underflow() override { - return traits_type::eof(); - } - - private: - util::Buffer buffer; - }; - memory_streambuf buf; }; diff --git a/src/io/memory_ostream.hpp b/src/io/memory_ostream.hpp new file mode 100644 index 00000000..76ad8623 --- /dev/null +++ b/src/io/memory_ostream.hpp @@ -0,0 +1,105 @@ +#include +#include +#include +#include +#include + +#include "util/Buffer.hpp" + +class memory_buffer : public std::streambuf { +public: + explicit memory_buffer(size_t initial_size = 64) + : capacity(initial_size), + buffer(std::make_unique(initial_size)) { + setp(buffer.get(), buffer.get() + initial_size); + } + + std::string_view view() const { + return std::string_view(pbase(), pptr() - pbase()); + } + + util::Buffer release() { + return {std::move(buffer), size()}; + } + + size_t size() const { + return pptr()-pbase(); + } +protected: + int_type overflow(int_type c) override { + if (c == traits_type::eof()) + return traits_type::eof(); + + const size_t data_size = pptr() - pbase(); + const size_t new_capacity = std::max(capacity * 2, data_size + 1); + auto new_buffer = std::make_unique(new_capacity); + + std::memcpy(new_buffer.get(), pbase(), data_size); + + buffer = std::move(new_buffer); + capacity = new_capacity; + + setp(buffer.get(), buffer.get() + new_capacity); + pbump(data_size); + + *pptr() = traits_type::to_char_type(c); + pbump(1); + + return c; + } + + std::streamsize xsputn(const char* s, std::streamsize count) override { + const std::streamsize avail = epptr() - pptr(); + + if (avail >= count) { + std::memcpy(pptr(), s, count); + pbump(count); + return count; + } + + std::streamsize written = 0; + if (avail > 0) { + std::memcpy(pptr(), s, avail); + written += avail; + s += avail; + count -= avail; + pbump(avail); + } + + const size_t data_size = pptr() - pbase(); + const size_t required_capacity = data_size + count; + const size_t new_capacity = std::max(capacity * 2, required_capacity); + auto new_buffer = std::make_unique(new_capacity); + + std::memcpy(new_buffer.get(), pbase(), data_size); + std::memcpy(new_buffer.get() + data_size, s, count); + + buffer = std::move(new_buffer); + capacity = new_capacity; + + setp(buffer.get(), buffer.get() + new_capacity); + pbump(data_size + count); + written += count; + + return written; + } +private: + std::unique_ptr buffer; + size_t capacity; +}; + +class memory_ostream : public std::ostream { +public: + explicit memory_ostream(size_t initialCapacity = 64) + : std::ostream(&buffer), buffer(initialCapacity) {} + + std::string_view view() const { + return buffer.view(); + } + + util::Buffer release() { + return buffer.release(); + } +private: + memory_buffer buffer; +};