GZipDecodingStreamFilter: be aware of segments

See GH-1582
Report the correct bytesProcessed_ taking into account
getDelegate()->getBytesProcessed().
This makes it necessary to use outbuf_ to store residual data not
processed by the downstream filter.
pull/1583/head
Ali MJ Al-Nasrawy 2020-03-09 21:32:14 +03:00
parent dd7ed38c90
commit 6d5ab2f124
2 changed files with 36 additions and 21 deletions

View File

@ -47,6 +47,7 @@ GZipDecodingStreamFilter::GZipDecodingStreamFilter(
std::unique_ptr<StreamFilter> delegate) std::unique_ptr<StreamFilter> delegate)
: StreamFilter{std::move(delegate)}, : StreamFilter{std::move(delegate)},
strm_{nullptr}, strm_{nullptr},
outbuf_(),
finished_{false}, finished_{false},
bytesProcessed_{0} bytesProcessed_{0}
{ {
@ -57,6 +58,8 @@ GZipDecodingStreamFilter::~GZipDecodingStreamFilter() { release(); }
void GZipDecodingStreamFilter::init() void GZipDecodingStreamFilter::init()
{ {
finished_ = false; finished_ = false;
outbuf_.reserve(OUTBUF_CAPACITY);
outbuf_.resize(0);
release(); release();
strm_ = new z_stream(); strm_ = new z_stream();
strm_->zalloc = Z_NULL; strm_->zalloc = Z_NULL;
@ -87,42 +90,51 @@ GZipDecodingStreamFilter::transform(const std::shared_ptr<BinaryStream>& out,
{ {
bytesProcessed_ = 0; bytesProcessed_ = 0;
ssize_t outlen = 0; ssize_t outlen = 0;
if (inlen == 0) {
return outlen;
}
strm_->avail_in = inlen; strm_->avail_in = inlen;
strm_->next_in = const_cast<unsigned char*>(inbuf); strm_->next_in = const_cast<unsigned char*>(inbuf);
unsigned char outbuf[OUTBUF_LENGTH]; while (bytesProcessed_ < inlen) {
while (1) { // inflate into outbuf_, if empty!
strm_->avail_out = OUTBUF_LENGTH; if (outbuf_.empty()) {
strm_->next_out = outbuf; outbuf_.resize(OUTBUF_CAPACITY);
strm_->avail_out = outbuf_.size();
strm_->next_out = outbuf_.data();
int ret = ::inflate(strm_, Z_NO_FLUSH); int ret = ::inflate(strm_, Z_NO_FLUSH);
if (ret == Z_STREAM_END) {
finished_ = true;
}
else if (ret != Z_OK && ret != Z_BUF_ERROR) {
throw DL_ABORT_EX(fmt("libz::inflate() failed. cause:%s", strm_->msg));
}
if (ret == Z_STREAM_END) { assert(inlen >= strm_->avail_in);
finished_ = true; bytesProcessed_ = strm_->next_in - inbuf;
} outbuf_.resize(strm_->next_out - outbuf_.data());
else if (ret != Z_OK && ret != Z_BUF_ERROR) { if (outbuf_.empty())
throw DL_ABORT_EX(fmt("libz::inflate() failed. cause:%s", strm_->msg)); break;
} }
size_t produced = OUTBUF_LENGTH - strm_->avail_out; // flush outbuf_
outlen += getDelegate()->transform(out, segment, outbuf_.data(),
outlen += getDelegate()->transform(out, segment, outbuf, produced); outbuf_.size());
if (strm_->avail_out > 0) { size_t processedlen = getDelegate()->getBytesProcessed();
if (processedlen == outbuf_.size()) {
outbuf_.clear();
}
else {
// segment download finished
outbuf_.erase(outbuf_.begin(), outbuf_.begin() + processedlen);
break; break;
} }
} }
assert(inlen >= strm_->avail_in);
bytesProcessed_ = inlen - strm_->avail_in;
return outlen; return outlen;
} }
bool GZipDecodingStreamFilter::finished() bool GZipDecodingStreamFilter::finished()
{ {
return finished_ && getDelegate()->finished(); return finished_ && outbuf_.empty() && getDelegate()->finished();
} }
const std::string& GZipDecodingStreamFilter::getName() const { return NAME; } const std::string& GZipDecodingStreamFilter::getName() const { return NAME; }

View File

@ -37,6 +37,7 @@
#include "StreamFilter.h" #include "StreamFilter.h"
#include <zlib.h> #include <zlib.h>
#include <vector>
#include "a2functional.h" #include "a2functional.h"
@ -47,11 +48,13 @@ class GZipDecodingStreamFilter : public StreamFilter {
private: private:
z_stream* strm_; z_stream* strm_;
std::vector<unsigned char> outbuf_;
bool finished_; bool finished_;
size_t bytesProcessed_; size_t bytesProcessed_;
static const size_t OUTBUF_LENGTH = 16_k; static const size_t OUTBUF_CAPACITY = 16_k;
public: public:
GZipDecodingStreamFilter(std::unique_ptr<StreamFilter> delegate = nullptr); GZipDecodingStreamFilter(std::unique_ptr<StreamFilter> delegate = nullptr);