pull/1583/merge
Ali MJ Al-Nasrawy 2024-08-24 10:39:27 -07:00 committed by GitHub
commit cb4f777cb5
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 61 additions and 39 deletions

View File

@ -84,10 +84,31 @@ ChunkedDecodingStreamFilter::transform(const std::shared_ptr<BinaryStream>& out,
const unsigned char* inbuf, size_t inlen)
{
ssize_t outlen = 0;
size_t i;
bytesProcessed_ = 0;
for (i = 0; i < inlen; ++i) {
unsigned char c = inbuf[i];
while (bytesProcessed_ < inlen) {
if (state_ == CHUNKS_COMPLETE) {
break;
}
if (state_ == CHUNK) {
int64_t readlen = std::min(chunkRemaining_,
static_cast<int64_t>(inlen - bytesProcessed_));
outlen += getDelegate()->transform(out, segment, inbuf + bytesProcessed_,
readlen);
int64_t processedlen = getDelegate()->getBytesProcessed();
bytesProcessed_ += processedlen;
chunkRemaining_ -= processedlen;
if (chunkRemaining_ == 0) {
state_ = PREV_CHUNK_CR;
}
if (processedlen < readlen) {
// segment download finished
break;
}
continue;
}
// The following states consume single char
unsigned char c = inbuf[bytesProcessed_];
bytesProcessed_++;
switch (state_) {
case PREV_CHUNK_SIZE:
if (util::isHexDigit(c)) {
@ -136,17 +157,6 @@ ChunkedDecodingStreamFilter::transform(const std::shared_ptr<BinaryStream>& out,
"missing LF at the end of chunk size");
}
break;
case CHUNK: {
int64_t readlen =
std::min(chunkRemaining_, static_cast<int64_t>(inlen - i));
outlen += getDelegate()->transform(out, segment, inbuf + i, readlen);
chunkRemaining_ -= readlen;
i += readlen - 1;
if (chunkRemaining_ == 0) {
state_ = PREV_CHUNK_CR;
}
break;
}
case PREV_CHUNK_CR:
if (c == '\r') {
state_ = PREV_CHUNK_LF;
@ -203,15 +213,12 @@ ChunkedDecodingStreamFilter::transform(const std::shared_ptr<BinaryStream>& out,
"missing LF at the end of chunks");
}
break;
case CHUNKS_COMPLETE:
goto fin;
default:
// unreachable
assert(0);
}
}
fin:
bytesProcessed_ += i;
return outlen;
}

View File

@ -47,6 +47,7 @@ GZipDecodingStreamFilter::GZipDecodingStreamFilter(
std::unique_ptr<StreamFilter> delegate)
: StreamFilter{std::move(delegate)},
strm_{nullptr},
outbuf_(),
finished_{false},
bytesProcessed_{0}
{
@ -57,6 +58,8 @@ GZipDecodingStreamFilter::~GZipDecodingStreamFilter() { release(); }
void GZipDecodingStreamFilter::init()
{
finished_ = false;
outbuf_.reserve(OUTBUF_CAPACITY);
outbuf_.resize(0);
release();
strm_ = new z_stream();
strm_->zalloc = Z_NULL;
@ -87,42 +90,51 @@ GZipDecodingStreamFilter::transform(const std::shared_ptr<BinaryStream>& out,
{
bytesProcessed_ = 0;
ssize_t outlen = 0;
if (inlen == 0) {
return outlen;
}
strm_->avail_in = inlen;
strm_->next_in = const_cast<unsigned char*>(inbuf);
unsigned char outbuf[OUTBUF_LENGTH];
while (1) {
strm_->avail_out = OUTBUF_LENGTH;
strm_->next_out = outbuf;
while (bytesProcessed_ < inlen) {
// inflate into outbuf_, if empty!
if (outbuf_.empty()) {
outbuf_.resize(OUTBUF_CAPACITY);
strm_->avail_out = outbuf_.size();
strm_->next_out = outbuf_.data();
int ret = ::inflate(strm_, Z_NO_FLUSH);
int ret = ::inflate(strm_, Z_NO_FLUSH);
if (ret == Z_STREAM_END) {
finished_ = true;
}
else if (ret != Z_OK && ret != Z_BUF_ERROR) {
throw DL_ABORT_EX(fmt("libz::inflate() failed. cause:%s", strm_->msg));
}
if (ret == Z_STREAM_END) {
finished_ = true;
}
else if (ret != Z_OK && ret != Z_BUF_ERROR) {
throw DL_ABORT_EX(fmt("libz::inflate() failed. cause:%s", strm_->msg));
assert(inlen >= strm_->avail_in);
bytesProcessed_ = strm_->next_in - inbuf;
outbuf_.resize(strm_->next_out - outbuf_.data());
if (outbuf_.empty())
break;
}
size_t produced = OUTBUF_LENGTH - strm_->avail_out;
outlen += getDelegate()->transform(out, segment, outbuf, produced);
if (strm_->avail_out > 0) {
// flush outbuf_
outlen += getDelegate()->transform(out, segment, outbuf_.data(),
outbuf_.size());
size_t processedlen = getDelegate()->getBytesProcessed();
if (processedlen == outbuf_.size()) {
outbuf_.clear();
}
else {
// segment download finished
outbuf_.erase(outbuf_.begin(), outbuf_.begin() + processedlen);
break;
}
}
assert(inlen >= strm_->avail_in);
bytesProcessed_ = inlen - strm_->avail_in;
return outlen;
}
bool GZipDecodingStreamFilter::finished()
{
return finished_ && getDelegate()->finished();
return finished_ && outbuf_.empty() && getDelegate()->finished();
}
const std::string& GZipDecodingStreamFilter::getName() const { return NAME; }

View File

@ -37,6 +37,7 @@
#include "StreamFilter.h"
#include <zlib.h>
#include <vector>
#include "a2functional.h"
@ -47,11 +48,13 @@ class GZipDecodingStreamFilter : public StreamFilter {
private:
z_stream* strm_;
std::vector<unsigned char> outbuf_;
bool finished_;
size_t bytesProcessed_;
static const size_t OUTBUF_LENGTH = 16_k;
static const size_t OUTBUF_CAPACITY = 16_k;
public:
GZipDecodingStreamFilter(std::unique_ptr<StreamFilter> delegate = nullptr);