From dd7ed38c90f05547d698a7984e71c32dfe5fff2a Mon Sep 17 00:00:00 2001 From: Ali MJ Al-Nasrawy Date: Mon, 9 Mar 2020 21:15:09 +0300 Subject: [PATCH 1/2] ChunkedDecodingStreamFilter: be aware of segments See GH-1582 Report the correct bytesProcessed_ taking into account getDelagate()->getBytesProcessed() Also, rewrite to avoid using goto or modifying for-loop variables. --- src/ChunkedDecodingStreamFilter.cc | 43 +++++++++++++++++------------- 1 file changed, 25 insertions(+), 18 deletions(-) diff --git a/src/ChunkedDecodingStreamFilter.cc b/src/ChunkedDecodingStreamFilter.cc index 4e3a0e59..f619f380 100644 --- a/src/ChunkedDecodingStreamFilter.cc +++ b/src/ChunkedDecodingStreamFilter.cc @@ -84,10 +84,31 @@ ChunkedDecodingStreamFilter::transform(const std::shared_ptr& out, const unsigned char* inbuf, size_t inlen) { ssize_t outlen = 0; - size_t i; bytesProcessed_ = 0; - for (i = 0; i < inlen; ++i) { - unsigned char c = inbuf[i]; + while (bytesProcessed_ < inlen) { + if (state_ == CHUNKS_COMPLETE) { + break; + } + if (state_ == CHUNK) { + int64_t readlen = std::min(chunkRemaining_, + static_cast(inlen - bytesProcessed_)); + outlen += getDelegate()->transform(out, segment, inbuf + bytesProcessed_, + readlen); + int64_t processedlen = getDelegate()->getBytesProcessed(); + bytesProcessed_ += processedlen; + chunkRemaining_ -= processedlen; + if (chunkRemaining_ == 0) { + state_ = PREV_CHUNK_CR; + } + if (processedlen < readlen) { + // segment download finished + break; + } + continue; + } + // The following states consume single char + unsigned char c = inbuf[bytesProcessed_]; + bytesProcessed_++; switch (state_) { case PREV_CHUNK_SIZE: if (util::isHexDigit(c)) { @@ -136,17 +157,6 @@ ChunkedDecodingStreamFilter::transform(const std::shared_ptr& out, "missing LF at the end of chunk size"); } break; - case CHUNK: { - int64_t readlen = - std::min(chunkRemaining_, static_cast(inlen - i)); - outlen += getDelegate()->transform(out, segment, inbuf + i, readlen); - chunkRemaining_ -= readlen; - i += readlen - 1; - if (chunkRemaining_ == 0) { - state_ = PREV_CHUNK_CR; - } - break; - } case PREV_CHUNK_CR: if (c == '\r') { state_ = PREV_CHUNK_LF; @@ -203,15 +213,12 @@ ChunkedDecodingStreamFilter::transform(const std::shared_ptr& out, "missing LF at the end of chunks"); } break; - case CHUNKS_COMPLETE: - goto fin; default: // unreachable assert(0); } } -fin: - bytesProcessed_ += i; + return outlen; } From 6d5ab2f124605c43bc868042c9bc4c2dbd4833b3 Mon Sep 17 00:00:00 2001 From: Ali MJ Al-Nasrawy Date: Mon, 9 Mar 2020 21:32:14 +0300 Subject: [PATCH 2/2] GZipDecodingStreamFilter: be aware of segments See GH-1582 Report the correct bytesProcessed_ taking into account getDelegate()->getBytesProcessed(). This makes it necessary to use outbuf_ to store residual data not processed by the downstream filter. --- src/GZipDecodingStreamFilter.cc | 52 ++++++++++++++++++++------------- src/GZipDecodingStreamFilter.h | 5 +++- 2 files changed, 36 insertions(+), 21 deletions(-) diff --git a/src/GZipDecodingStreamFilter.cc b/src/GZipDecodingStreamFilter.cc index d24acc3e..c98847bf 100644 --- a/src/GZipDecodingStreamFilter.cc +++ b/src/GZipDecodingStreamFilter.cc @@ -47,6 +47,7 @@ GZipDecodingStreamFilter::GZipDecodingStreamFilter( std::unique_ptr delegate) : StreamFilter{std::move(delegate)}, strm_{nullptr}, + outbuf_(), finished_{false}, bytesProcessed_{0} { @@ -57,6 +58,8 @@ GZipDecodingStreamFilter::~GZipDecodingStreamFilter() { release(); } void GZipDecodingStreamFilter::init() { finished_ = false; + outbuf_.reserve(OUTBUF_CAPACITY); + outbuf_.resize(0); release(); strm_ = new z_stream(); strm_->zalloc = Z_NULL; @@ -87,42 +90,51 @@ GZipDecodingStreamFilter::transform(const std::shared_ptr& out, { bytesProcessed_ = 0; ssize_t outlen = 0; - if (inlen == 0) { - return outlen; - } strm_->avail_in = inlen; strm_->next_in = const_cast(inbuf); - unsigned char outbuf[OUTBUF_LENGTH]; - while (1) { - strm_->avail_out = OUTBUF_LENGTH; - strm_->next_out = outbuf; + while (bytesProcessed_ < inlen) { + // inflate into outbuf_, if empty! + if (outbuf_.empty()) { + outbuf_.resize(OUTBUF_CAPACITY); + strm_->avail_out = outbuf_.size(); + strm_->next_out = outbuf_.data(); - int ret = ::inflate(strm_, Z_NO_FLUSH); + int ret = ::inflate(strm_, Z_NO_FLUSH); + if (ret == Z_STREAM_END) { + finished_ = true; + } + else if (ret != Z_OK && ret != Z_BUF_ERROR) { + throw DL_ABORT_EX(fmt("libz::inflate() failed. cause:%s", strm_->msg)); + } - if (ret == Z_STREAM_END) { - finished_ = true; - } - else if (ret != Z_OK && ret != Z_BUF_ERROR) { - throw DL_ABORT_EX(fmt("libz::inflate() failed. cause:%s", strm_->msg)); + assert(inlen >= strm_->avail_in); + bytesProcessed_ = strm_->next_in - inbuf; + outbuf_.resize(strm_->next_out - outbuf_.data()); + if (outbuf_.empty()) + break; } - size_t produced = OUTBUF_LENGTH - strm_->avail_out; - - outlen += getDelegate()->transform(out, segment, outbuf, produced); - if (strm_->avail_out > 0) { + // flush outbuf_ + outlen += getDelegate()->transform(out, segment, outbuf_.data(), + outbuf_.size()); + size_t processedlen = getDelegate()->getBytesProcessed(); + if (processedlen == outbuf_.size()) { + outbuf_.clear(); + } + else { + // segment download finished + outbuf_.erase(outbuf_.begin(), outbuf_.begin() + processedlen); break; } } - assert(inlen >= strm_->avail_in); - bytesProcessed_ = inlen - strm_->avail_in; return outlen; } bool GZipDecodingStreamFilter::finished() { - return finished_ && getDelegate()->finished(); + return finished_ && outbuf_.empty() && getDelegate()->finished(); } const std::string& GZipDecodingStreamFilter::getName() const { return NAME; } diff --git a/src/GZipDecodingStreamFilter.h b/src/GZipDecodingStreamFilter.h index d9c3c22e..49a2a7f0 100644 --- a/src/GZipDecodingStreamFilter.h +++ b/src/GZipDecodingStreamFilter.h @@ -37,6 +37,7 @@ #include "StreamFilter.h" #include +#include #include "a2functional.h" @@ -47,11 +48,13 @@ class GZipDecodingStreamFilter : public StreamFilter { private: z_stream* strm_; + std::vector outbuf_; + bool finished_; size_t bytesProcessed_; - static const size_t OUTBUF_LENGTH = 16_k; + static const size_t OUTBUF_CAPACITY = 16_k; public: GZipDecodingStreamFilter(std::unique_ptr delegate = nullptr);