From efbfe4c006cfb6e0bf3ee4ed57d0657c0cd85c19 Mon Sep 17 00:00:00 2001 From: Tatsuhiro Tsujikawa Date: Mon, 6 Sep 2010 14:29:36 +0000 Subject: [PATCH] 2010-09-06 Tatsuhiro Tsujikawa Data from remote server in HTTP/FTP download are now written to the disk(or memory) through StreamFilter. Decoding chunked and gziped streams are done cascading StreamFilter. Removed inefficient 1byte read code. * src/ChunkedDecodingStreamFilter.cc * src/ChunkedDecodingStreamFilter.h * src/DownloadCommand.cc * src/DownloadCommand.h * src/GZipDecodingStreamFilter.cc * src/GZipDecodingStreamFilter.h * src/HttpConnection.cc * src/HttpDownloadCommand.cc * src/HttpResponse.cc * src/HttpResponse.h * src/HttpResponseCommand.cc * src/HttpResponseCommand.h * src/HttpSkipResponseCommand.cc * src/HttpSkipResponseCommand.h * src/Makefile.am * src/NullSinkStreamFilter.cc * src/NullSinkStreamFilter.h * src/RequestGroup.cc * src/SinkStreamFilter.cc * src/SinkStreamFilter.h * src/StreamFilter.cc * src/StreamFilter.h * test/ChunkedDecodingStreamFilterTest.cc * test/GZipDecodingStreamFilterTest.cc * test/HttpResponseTest.cc * test/Makefile.am * test/MockSegment.h --- ChangeLog | 34 ++++ src/ChunkedDecodingStreamFilter.cc | 223 ++++++++++++++++++++++ src/ChunkedDecodingStreamFilter.h | 194 +++++++++++++++++++ src/DownloadCommand.cc | 148 ++++++--------- src/DownloadCommand.h | 19 +- src/GZipDecodingStreamFilter.cc | 131 +++++++++++++ src/GZipDecodingStreamFilter.h | 81 ++++++++ src/HttpConnection.cc | 2 +- src/HttpDownloadCommand.cc | 24 ++- src/HttpResponse.cc | 25 +-- src/HttpResponse.h | 6 +- src/HttpResponseCommand.cc | 120 +++++++----- src/HttpResponseCommand.h | 10 +- src/HttpSkipResponseCommand.cc | 45 +++-- src/HttpSkipResponseCommand.h | 8 +- src/Makefile.am | 7 +- src/Makefile.in | 42 +++-- src/NullSinkStreamFilter.cc | 41 ++++ src/NullSinkStreamFilter.h | 86 +++++++++ src/RequestGroup.cc | 4 +- src/SinkStreamFilter.cc | 65 +++++++ src/SinkStreamFilter.h | 84 +++++++++ src/StreamFilter.cc | 53 ++++++ src/StreamFilter.h | 84 +++++++++ test/ChunkedDecodingStreamFilterTest.cc | 240 ++++++++++++++++++++++++ test/GZipDecodingStreamFilterTest.cc | 81 ++++++++ test/HttpResponseTest.cc | 41 ++-- test/Makefile.am | 4 +- test/Makefile.in | 16 +- test/MockSegment.h | 80 ++++++++ 30 files changed, 1767 insertions(+), 231 deletions(-) create mode 100644 src/ChunkedDecodingStreamFilter.cc create mode 100644 src/ChunkedDecodingStreamFilter.h create mode 100644 src/GZipDecodingStreamFilter.cc create mode 100644 src/GZipDecodingStreamFilter.h create mode 100644 src/NullSinkStreamFilter.cc create mode 100644 src/NullSinkStreamFilter.h create mode 100644 src/SinkStreamFilter.cc create mode 100644 src/SinkStreamFilter.h create mode 100644 src/StreamFilter.cc create mode 100644 src/StreamFilter.h create mode 100644 test/ChunkedDecodingStreamFilterTest.cc create mode 100644 test/GZipDecodingStreamFilterTest.cc create mode 100644 test/MockSegment.h diff --git a/ChangeLog b/ChangeLog index 14f5c51f..9487490c 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,3 +1,37 @@ +2010-09-06 Tatsuhiro Tsujikawa + + Data from remote server in HTTP/FTP download are now written to + the disk(or memory) through StreamFilter. Decoding chunked and + gziped streams are done cascading StreamFilter. + Removed inefficient 1byte read code. + * src/ChunkedDecodingStreamFilter.cc + * src/ChunkedDecodingStreamFilter.h + * src/DownloadCommand.cc + * src/DownloadCommand.h + * src/GZipDecodingStreamFilter.cc + * src/GZipDecodingStreamFilter.h + * src/HttpConnection.cc + * src/HttpDownloadCommand.cc + * src/HttpResponse.cc + * src/HttpResponse.h + * src/HttpResponseCommand.cc + * src/HttpResponseCommand.h + * src/HttpSkipResponseCommand.cc + * src/HttpSkipResponseCommand.h + * src/Makefile.am + * src/NullSinkStreamFilter.cc + * src/NullSinkStreamFilter.h + * src/RequestGroup.cc + * src/SinkStreamFilter.cc + * src/SinkStreamFilter.h + * src/StreamFilter.cc + * src/StreamFilter.h + * test/ChunkedDecodingStreamFilterTest.cc + * test/GZipDecodingStreamFilterTest.cc + * test/HttpResponseTest.cc + * test/Makefile.am + * test/MockSegment.h + 2010-09-01 Tatsuhiro Tsujikawa Release 1.10.2 diff --git a/src/ChunkedDecodingStreamFilter.cc b/src/ChunkedDecodingStreamFilter.cc new file mode 100644 index 00000000..87084707 --- /dev/null +++ b/src/ChunkedDecodingStreamFilter.cc @@ -0,0 +1,223 @@ +/* */ +#include "ChunkedDecodingStreamFilter.h" + +#include + +#include "util.h" +#include "message.h" +#include "DlAbortEx.h" +#include "StringFormat.h" +#include "A2STR.h" + +namespace aria2 { + +const std::string ChunkedDecodingStreamFilter::NAME +("ChunkedDecodingStreamFilter"); + +size_t ChunkedDecodingStreamFilter::MAX_BUF_SIZE = 1024*1024; + +ChunkedDecodingStreamFilter::ReadChunkSizeStateHandler* +ChunkedDecodingStreamFilter::readChunkSizeStateHandler_ = + new ChunkedDecodingStreamFilter::ReadChunkSizeStateHandler(); + +ChunkedDecodingStreamFilter::ReadTrailerStateHandler* +ChunkedDecodingStreamFilter::readTrailerStateHandler_ = + new ChunkedDecodingStreamFilter::ReadTrailerStateHandler(); + +ChunkedDecodingStreamFilter::ReadDataStateHandler* +ChunkedDecodingStreamFilter::readDataStateHandler_ = + new ChunkedDecodingStreamFilter::ReadDataStateHandler(); + +ChunkedDecodingStreamFilter::ReadDataEndStateHandler* +ChunkedDecodingStreamFilter::readDataEndStateHandler_ = + new ChunkedDecodingStreamFilter::ReadDataEndStateHandler(); + +ChunkedDecodingStreamFilter::StreamEndStatehandler* +ChunkedDecodingStreamFilter::streamEndStateHandler_ = + new ChunkedDecodingStreamFilter::StreamEndStatehandler(); + +ChunkedDecodingStreamFilter::ChunkedDecodingStreamFilter +(const SharedHandle& delegate): + StreamFilter(delegate), + state_(readChunkSizeStateHandler_), + chunkSize_(0), + bytesProcessed_(0) {} + +ChunkedDecodingStreamFilter::~ChunkedDecodingStreamFilter() {} + +void ChunkedDecodingStreamFilter::init() {} + +bool ChunkedDecodingStreamFilter::readChunkSize +(size_t& inbufOffset, const unsigned char* inbuf, size_t inlen) +{ + size_t pbufSize = buf_.size(); + buf_.append(&inbuf[inbufOffset], &inbuf[inlen]); + std::string::size_type crlfPos = buf_.find(A2STR::CRLF); + if(crlfPos == std::string::npos) { + if(buf_.size() > MAX_BUF_SIZE) { + throw DL_ABORT_EX("Could not find chunk size before buffer got full."); + } + inbufOffset = inlen; + return false; + } + std::string::size_type extPos = buf_.find(A2STR::SEMICOLON_C); + if(extPos == std::string::npos || crlfPos < extPos) { + extPos = crlfPos; + } + chunkSize_ = util::parseULLInt(buf_.substr(0, extPos), 16); + assert(crlfPos+2 > pbufSize); + inbufOffset += crlfPos+2-pbufSize; + buf_.clear(); + if(chunkSize_ == 0) { + state_ = readTrailerStateHandler_; + } else { + state_ = readDataStateHandler_; + } + return true; +} + +bool ChunkedDecodingStreamFilter::readTrailer +(size_t& inbufOffset, const unsigned char* inbuf, size_t inlen) +{ + size_t pbufSize = buf_.size(); + buf_.append(&inbuf[inbufOffset], &inbuf[inlen]); + std::string::size_type crlfcrlfPos = buf_.find("\r\n\r\n"); + if(crlfcrlfPos != std::string::npos) { + // TODO crlfcrlfPos == 0 case? + inbufOffset += crlfcrlfPos+4-pbufSize; + inbufOffset = inlen; + buf_.clear(); + state_ = streamEndStateHandler_; + return true; + } else { + std::string::size_type crlfPos = buf_.find(A2STR::CRLF); + if(crlfPos == std::string::npos) { + if(buf_.size() > MAX_BUF_SIZE) { + throw DL_ABORT_EX + ("Could not find end of stream before buffer got full."); + } + inbufOffset = inlen; + return false; + } else if(crlfPos == 0) { + inbufOffset += crlfPos+2-pbufSize; + buf_.clear(); + state_ = streamEndStateHandler_; + return true; + } else { + if(buf_.size() > MAX_BUF_SIZE) { + throw DL_ABORT_EX + ("Could not find end of stream before buffer got full."); + } + inbufOffset = inlen; + return false; + } + } +} + +bool ChunkedDecodingStreamFilter::readData +(ssize_t& outlen, + size_t& inbufOffset, + const unsigned char* inbuf, + size_t inlen, + const SharedHandle& out, + const SharedHandle& segment) +{ + uint64_t readlen = + std::min(chunkSize_, static_cast(inlen-inbufOffset)); + outlen += getDelegate()->transform(out, segment, inbuf+inbufOffset, readlen); + chunkSize_ -= readlen; + inbufOffset += readlen; + if(chunkSize_ == 0) { + state_ = readDataEndStateHandler_; + return true; + } else { + return false; + } +} + +bool ChunkedDecodingStreamFilter::readDataEnd +(size_t& inbufOffset, const unsigned char* inbuf, size_t inlen) +{ + size_t pbufSize = buf_.size(); + buf_.append(&inbuf[inbufOffset], &inbuf[inlen]); + if(buf_.size() >= 2) { + if(util::startsWith(buf_, A2STR::CRLF)) { + inbufOffset += 2-pbufSize; + buf_.clear(); + state_ = readChunkSizeStateHandler_; + return true; + } else { + throw DL_ABORT_EX("No CRLF at the end of chunk."); + } + } else { + inbufOffset = inlen; + return false; + } +} + +ssize_t ChunkedDecodingStreamFilter::transform +(const SharedHandle& out, + const SharedHandle& segment, + const unsigned char* inbuf, size_t inlen) +{ + size_t inbufOffset = 0; + ssize_t outlen = 0; + while(inbufOffset < inlen) { + ssize_t olen = 0; + bool r = state_->execute + (this, olen, inbufOffset, inbuf, inlen, out, segment); + outlen += olen; + if(!r) { + break; + } + } + bytesProcessed_ = inbufOffset; + return outlen; +} + +bool ChunkedDecodingStreamFilter::finished() +{ + return state_ == streamEndStateHandler_ && getDelegate()->finished(); +} + +void ChunkedDecodingStreamFilter::release() {} + +const std::string& ChunkedDecodingStreamFilter::getName() const +{ + return NAME; +} + +} // namespace aria2 diff --git a/src/ChunkedDecodingStreamFilter.h b/src/ChunkedDecodingStreamFilter.h new file mode 100644 index 00000000..6909e33e --- /dev/null +++ b/src/ChunkedDecodingStreamFilter.h @@ -0,0 +1,194 @@ +/* */ +#ifndef D_CHUNKED_DECODING_STREAM_FILTER_H +#define D_CHUNKED_DECODING_STREAM_FILTER_H + +#include "StreamFilter.h" + +namespace aria2 { + +class ChunkedDecodingStreamFilter : public StreamFilter { +private: + class StateHandler { + public: + virtual ~StateHandler() {} + + virtual bool execute + (ChunkedDecodingStreamFilter* filter, + ssize_t& outlen, + size_t& inbufOffset, + const unsigned char* inbuf, + size_t inlen, + const SharedHandle& out, + const SharedHandle& segment) const = 0; + }; + + StateHandler* state_; + + std::string buf_; + + uint64_t chunkSize_; + + size_t bytesProcessed_; + + static size_t MAX_BUF_SIZE; + + bool readChunkSize + (size_t& inbufOffset, const unsigned char* inbuf, size_t inlen); + + bool readTrailer + (size_t& inbufOffset, const unsigned char* inbuf, size_t inlen); + + bool readData + (ssize_t& outlen, + size_t& inbufOffset, + const unsigned char* inbuf, + size_t inlen, + const SharedHandle& out, + const SharedHandle& segment); + + bool readDataEnd + (size_t& inbufOffset, const unsigned char* inbuf, size_t inlen); + + class ReadChunkSizeStateHandler:public StateHandler { + public: + virtual bool execute + (ChunkedDecodingStreamFilter* filter, + ssize_t& outlen, + size_t& inbufOffset, + const unsigned char* inbuf, + size_t inlen, + const SharedHandle& out, + const SharedHandle& segment) const + { + return filter->readChunkSize(inbufOffset, inbuf, inlen); + } + }; + + class ReadTrailerStateHandler:public StateHandler { + public: + virtual bool execute + (ChunkedDecodingStreamFilter* filter, + ssize_t& outlen, + size_t& inbufOffset, + const unsigned char* inbuf, + size_t inlen, + const SharedHandle& out, + const SharedHandle& segment) const + { + return filter->readTrailer(inbufOffset, inbuf, inlen); + } + }; + + class ReadDataStateHandler:public StateHandler { + public: + virtual bool execute + (ChunkedDecodingStreamFilter* filter, + ssize_t& outlen, + size_t& inbufOffset, + const unsigned char* inbuf, + size_t inlen, + const SharedHandle& out, + const SharedHandle& segment) const + { + return filter->readData(outlen, inbufOffset, inbuf, inlen, out, segment); + } + }; + + class ReadDataEndStateHandler:public StateHandler { + public: + virtual bool execute + (ChunkedDecodingStreamFilter* filter, + ssize_t& outlen, + size_t& inbufOffset, + const unsigned char* inbuf, + size_t inlen, + const SharedHandle& out, + const SharedHandle& segment) const + { + return filter->readDataEnd(inbufOffset, inbuf, inlen); + } + }; + + class StreamEndStatehandler:public StateHandler { + public: + virtual bool execute + (ChunkedDecodingStreamFilter* filter, + ssize_t& outlen, + size_t& inbufOffset, + const unsigned char* inbuf, + size_t inlen, + const SharedHandle& out, + const SharedHandle& segment) const + { + return false; + } + }; + + static ReadChunkSizeStateHandler* readChunkSizeStateHandler_; + static ReadTrailerStateHandler* readTrailerStateHandler_; + static ReadDataStateHandler* readDataStateHandler_; + static ReadDataEndStateHandler* readDataEndStateHandler_; + static StreamEndStatehandler* streamEndStateHandler_; +public: + ChunkedDecodingStreamFilter + (const SharedHandle& delegate = SharedHandle()); + + virtual ~ChunkedDecodingStreamFilter(); + + virtual void init(); + + virtual ssize_t transform + (const SharedHandle& out, + const SharedHandle& segment, + const unsigned char* inbuf, size_t inlen); + + virtual bool finished(); + + virtual void release(); + + virtual const std::string& getName() const; + + virtual size_t getBytesProcessed() const + { + return bytesProcessed_; + } + + static const std::string NAME; +}; + +} // namespace aria2 + +#endif // D_CHUNKED_DECODING_STREAM_FILTER_H diff --git a/src/DownloadCommand.cc b/src/DownloadCommand.cc index f79a3adc..fb00ef38 100644 --- a/src/DownloadCommand.cc +++ b/src/DownloadCommand.cc @@ -56,11 +56,11 @@ #include "message.h" #include "prefs.h" #include "StringFormat.h" -#include "Decoder.h" #include "RequestGroupMan.h" #include "wallclock.h" #include "ServerStatMan.h" #include "FileAllocationEntry.h" +#include "SinkStreamFilter.h" #ifdef ENABLE_MESSAGE_DIGEST # include "MessageDigestHelper.h" #endif // ENABLE_MESSAGE_DIGEST @@ -104,6 +104,10 @@ DownloadCommand::DownloadCommand(cuid_t cuid, peerStat_ = req->initPeerStat(); peerStat_->downloadStart(); getSegmentMan()->registerPeerStat(peerStat_); + + streamFilter_.reset(new SinkStreamFilter(pieceHashValidationEnabled_)); + streamFilter_->init(); + sinkFilterOnly_ = true; } DownloadCommand::~DownloadCommand() { @@ -120,80 +124,45 @@ bool DownloadCommand::executeInternal() { return false; } setReadCheckSocket(getSocket()); - SharedHandle segment = getSegments().front(); - - size_t bufSize; - if(segment->getLength() > 0) { - if(static_cast(segment->getPosition()+segment->getLength()) <= - static_cast(getFileEntry()->getLastOffset())) { - bufSize = std::min(segment->getLength()-segment->getWrittenLength(), - BUFSIZE); - } else { - bufSize = - std::min - (static_cast - (getFileEntry()->getLastOffset()-segment->getPositionToWrite()), - BUFSIZE); - } - } else { - bufSize = BUFSIZE; - } - // It is possible that segment is completed but we have some bytes - // of stream to read. For example, chunked encoding has "0"+CRLF - // after data. After we read data(at this moment segment is - // completed), we need another 3bytes(or more if it has extension). - if(bufSize == 0 && - ((!transferEncodingDecoder_.isNull() && - !transferEncodingDecoder_->finished()) || - (!contentEncodingDecoder_.isNull() && - !contentEncodingDecoder_->finished()))) { - bufSize = 1; - } - getSocket()->readData(buf_, bufSize); const SharedHandle& diskAdaptor = getPieceStorage()->getDiskAdaptor(); - - const unsigned char* bufFinal; - size_t bufSizeFinal; - - std::string decoded; - if(transferEncodingDecoder_.isNull()) { - bufFinal = buf_; - bufSizeFinal = bufSize; + SharedHandle segment = getSegments().front(); + size_t bufSize; + if(sinkFilterOnly_) { + if(segment->getLength() > 0 ) { + if(static_cast(segment->getPosition()+segment->getLength()) <= + static_cast(getFileEntry()->getLastOffset())) { + bufSize = std::min(segment->getLength()-segment->getWrittenLength(), + BUFSIZE); + } else { + bufSize = + std::min + (static_cast + (getFileEntry()->getLastOffset()-segment->getPositionToWrite()), + BUFSIZE); + } + } else { + bufSize = BUFSIZE; + } + getSocket()->readData(buf_, bufSize); + streamFilter_->transform(diskAdaptor, segment, buf_, bufSize); } else { - decoded = transferEncodingDecoder_->decode(buf_, bufSize); - - bufFinal = reinterpret_cast(decoded.c_str()); - bufSizeFinal = decoded.size(); - } - - if(contentEncodingDecoder_.isNull()) { - diskAdaptor->writeData(bufFinal, bufSizeFinal, - segment->getPositionToWrite()); - } else { - std::string out = contentEncodingDecoder_->decode(bufFinal, bufSizeFinal); - diskAdaptor->writeData(reinterpret_cast(out.data()), - out.size(), - segment->getPositionToWrite()); - bufSizeFinal = out.size(); - } - -#ifdef ENABLE_MESSAGE_DIGEST - - if(pieceHashValidationEnabled_) { - segment->updateHash(segment->getWrittenLength(), bufFinal, bufSizeFinal); - } - -#endif // ENABLE_MESSAGE_DIGEST - if(bufSizeFinal > 0) { - segment->updateWrittenLength(bufSizeFinal); + // It is possible that segment is completed but we have some bytes + // of stream to read. For example, chunked encoding has "0"+CRLF + // after data. After we read data(at this moment segment is + // completed), we need another 3bytes(or more if it has trailers). + bufSize = BUFSIZE; + getSocket()->peekData(buf_, bufSize); + streamFilter_->transform(diskAdaptor, segment, buf_, bufSize); + bufSize = streamFilter_->getBytesProcessed(); + getSocket()->readData(buf_, bufSize); } peerStat_->updateDownloadLength(bufSize); getSegmentMan()->updateDownloadSpeedFor(peerStat_); bool segmentPartComplete = false; // Note that GrowSegment::complete() always returns false. - if(transferEncodingDecoder_.isNull() && contentEncodingDecoder_.isNull()) { + if(sinkFilterOnly_) { if(segment->complete() || segment->getPositionToWrite() == getFileEntry()->getLastOffset()) { segmentPartComplete = true; @@ -203,23 +172,20 @@ bool DownloadCommand::executeInternal() { } } else { off_t loff = getFileEntry()->gtoloff(segment->getPositionToWrite()); - if(!transferEncodingDecoder_.isNull() && - ((loff == getRequestEndOffset() && transferEncodingDecoder_->finished()) + if(getFileEntry()->getLength() > 0 && !sinkFilterOnly_ && + ((loff == getRequestEndOffset() && streamFilter_->finished()) || loff < getRequestEndOffset()) && (segment->complete() || segment->getPositionToWrite() == getFileEntry()->getLastOffset())) { - // In this case, transferEncodingDecoder is used and - // Content-Length is known. We check - // transferEncodingDecoder_->finished() only if the requested - // end offset equals to written position in file local offset; - // in other words, data in the requested ranage is all received. - // If requested end offset is greater than this segment, then - // transferEncodingDecoder_ is not finished in this segment. + // In this case, StreamFilter other than *SinkStreamFilter is + // used and Content-Length is known. We check + // streamFilter_->finished() only if the requested end offset + // equals to written position in file local offset; in other + // words, data in the requested ranage is all received. If + // requested end offset is greater than this segment, then + // streamFilter_ is not finished in this segment. segmentPartComplete = true; - } else if((transferEncodingDecoder_.isNull() || - transferEncodingDecoder_->finished()) && - (contentEncodingDecoder_.isNull() || - contentEncodingDecoder_->finished())) { + } else if(streamFilter_->finished()) { segmentPartComplete = true; } } @@ -392,18 +358,18 @@ void DownloadCommand::validatePieceHash(const SharedHandle& segment, } } +void DownloadCommand::installStreamFilter +(const SharedHandle& streamFilter) +{ + if(streamFilter.isNull()) { + return; + } + streamFilter->installDelegate(streamFilter_); + streamFilter_ = streamFilter; + sinkFilterOnly_ = + util::endsWith(streamFilter_->getName(), SinkStreamFilter::NAME); +} + #endif // ENABLE_MESSAGE_DIGEST -void DownloadCommand::setTransferEncodingDecoder -(const SharedHandle& decoder) -{ - this->transferEncodingDecoder_ = decoder; -} - -void DownloadCommand::setContentEncodingDecoder -(const SharedHandle& decoder) -{ - contentEncodingDecoder_ = decoder; -} - } // namespace aria2 diff --git a/src/DownloadCommand.h b/src/DownloadCommand.h index 128b6bf7..13342c70 100644 --- a/src/DownloadCommand.h +++ b/src/DownloadCommand.h @@ -39,8 +39,8 @@ namespace aria2 { -class Decoder; class PeerStat; +class StreamFilter; #ifdef ENABLE_MESSAGE_DIGEST class MessageDigestContext; #endif // ENABLE_MESSAGE_DIGEST @@ -67,9 +67,9 @@ private: void checkLowestDownloadSpeed() const; - SharedHandle transferEncodingDecoder_; + SharedHandle streamFilter_; - SharedHandle contentEncodingDecoder_; + bool sinkFilterOnly_; protected: virtual bool executeInternal(); @@ -86,19 +86,12 @@ public: const SharedHandle& s); virtual ~DownloadCommand(); - const SharedHandle& getTransferEncodingDecoder() const + const SharedHandle& getStreamFilter() const { - return transferEncodingDecoder_; + return streamFilter_; } - void setTransferEncodingDecoder(const SharedHandle& decoder); - - const SharedHandle& getContentEncodingDecoder() const - { - return contentEncodingDecoder_; - } - - void setContentEncodingDecoder(const SharedHandle& decoder); + void installStreamFilter(const SharedHandle& streamFilter); void setStartupIdleTime(time_t startupIdleTime) { diff --git a/src/GZipDecodingStreamFilter.cc b/src/GZipDecodingStreamFilter.cc new file mode 100644 index 00000000..3fa1d05c --- /dev/null +++ b/src/GZipDecodingStreamFilter.cc @@ -0,0 +1,131 @@ +/* */ +#include "GZipDecodingStreamFilter.h" + +#include + +#include "StringFormat.h" +#include "DlAbortEx.h" + +namespace aria2 { + +const std::string GZipDecodingStreamFilter::NAME("GZipDecodingStreamFilter"); + +GZipDecodingStreamFilter::GZipDecodingStreamFilter +(const SharedHandle& delegate): + StreamFilter(delegate), strm_(0), finished_(false), bytesProcessed_(0) {} + +GZipDecodingStreamFilter::~GZipDecodingStreamFilter() +{ + release(); +} + +void GZipDecodingStreamFilter::init() +{ + finished_ = false; + release(); + strm_ = new z_stream(); + strm_->zalloc = Z_NULL; + strm_->zfree = Z_NULL; + strm_->opaque = Z_NULL; + strm_->avail_in = 0; + strm_->next_in = Z_NULL; + + // initalize z_stream with gzip/zlib format auto detection enabled. + if(Z_OK != inflateInit2(strm_, 47)) { + throw DL_ABORT_EX("Initializing z_stream failed."); + } +} + +void GZipDecodingStreamFilter::release() +{ + if(strm_) { + inflateEnd(strm_); + delete strm_; + strm_ = 0; + } +} + +ssize_t GZipDecodingStreamFilter::transform +(const SharedHandle& out, + const SharedHandle& segment, + const unsigned char* inbuf, size_t inlen) +{ + bytesProcessed_ = 0; + ssize_t outlen = 0; + if(inlen == 0) { + return outlen; + } + + strm_->avail_in = inlen; + strm_->next_in = const_cast(inbuf); + + unsigned char outbuf[OUTBUF_LENGTH]; + while(1) { + strm_->avail_out = OUTBUF_LENGTH; + strm_->next_out = outbuf; + + int ret = ::inflate(strm_, Z_NO_FLUSH); + + if(ret == Z_STREAM_END) { + finished_ = true; + } else if(ret != Z_OK) { + throw DL_ABORT_EX(StringFormat("libz::inflate() failed. cause:%s", + strm_->msg).str()); + } + + size_t produced = OUTBUF_LENGTH-strm_->avail_out; + + outlen += getDelegate()->transform(out, segment, outbuf, produced); + if(strm_->avail_out > 0) { + break; + } + } + assert(inlen >= strm_->avail_in); + bytesProcessed_ = inlen-strm_->avail_in; + return outlen; +} + +bool GZipDecodingStreamFilter::finished() +{ + return finished_ && getDelegate()->finished(); +} + +const std::string& GZipDecodingStreamFilter::getName() const +{ + return NAME; +} + +} // namespace aria2 diff --git a/src/GZipDecodingStreamFilter.h b/src/GZipDecodingStreamFilter.h new file mode 100644 index 00000000..3fce057f --- /dev/null +++ b/src/GZipDecodingStreamFilter.h @@ -0,0 +1,81 @@ +/* */ +#ifndef D_GZIP_STREAM_FILTER_H +#define D_GZIP_STREAM_FILTER_H + +#include "StreamFilter.h" +#include + +namespace aria2 { + +// GZipDecodingStreamFilter can decode both gzip and deflate format. +class GZipDecodingStreamFilter : public StreamFilter { +private: + z_stream* strm_; + + bool finished_; + + size_t bytesProcessed_; + + static const size_t OUTBUF_LENGTH = 16*1024; +public: + GZipDecodingStreamFilter + (const SharedHandle& delegate = SharedHandle()); + + virtual ~GZipDecodingStreamFilter(); + + virtual void init(); + + virtual ssize_t transform(const SharedHandle& out, + const SharedHandle& segment, + const unsigned char* inbuf, size_t inlen); + + virtual bool finished(); + + virtual void release(); + + virtual const std::string& getName() const; + + virtual size_t getBytesProcessed() const + { + return bytesProcessed_; + } + + static const std::string NAME; +}; + +} // namespace aria2 + +#endif // D_GZIP_STREAM_FILTER_H diff --git a/src/HttpConnection.cc b/src/HttpConnection.cc index ff995a08..56710e62 100644 --- a/src/HttpConnection.cc +++ b/src/HttpConnection.cc @@ -134,7 +134,7 @@ SharedHandle HttpConnection::receiveResponse() if(socket_->wantRead() || socket_->wantWrite()) { return SharedHandle(); } else { - throw DL_RETRY_EX(EX_INVALID_RESPONSE); + throw DL_RETRY_EX(EX_GOT_EOF); } } proc->update(buf, size); diff --git a/src/HttpDownloadCommand.cc b/src/HttpDownloadCommand.cc index 52ba1ed7..7a304682 100644 --- a/src/HttpDownloadCommand.cc +++ b/src/HttpDownloadCommand.cc @@ -47,12 +47,15 @@ #include "HttpHeader.h" #include "Range.h" #include "DownloadContext.h" -#include "Decoder.h" #include "RequestGroupMan.h" #include "FileAllocationEntry.h" #include "CheckIntegrityEntry.h" #include "ServerStatMan.h" #include "Logger.h" +#include "StreamFilter.h" +#include "SinkStreamFilter.h" +#include "util.h" + namespace aria2 { HttpDownloadCommand::HttpDownloadCommand @@ -88,18 +91,16 @@ bool HttpDownloadCommand::prepareForNextSegment() { if(getRequest()->isPipeliningEnabled() || (getRequest()->isKeepAliveEnabled() && ( - // Make sure that all decoders are finished to pool socket - ((!getTransferEncodingDecoder().isNull() && - getTransferEncodingDecoder()->finished()) || - (getTransferEncodingDecoder().isNull() && - !getContentEncodingDecoder().isNull() && - getContentEncodingDecoder()->finished())) || + // Make sure that all filters are finished to pool socket + (!util::endsWith(getStreamFilter()->getName(), + SinkStreamFilter::NAME) && + getStreamFilter()->finished()) || getRequestEndOffset() == getFileEntry()->gtoloff(getSegments().front()->getPositionToWrite()) ) ) ) { - // TODO What if server sends EOF when _contentEncodingDecoder is + // TODO What if server sends EOF when non-SinkStreamFilter is // used and server didn't send Connection: close? We end up to // pool terminated socket. In HTTP/1.1, keep-alive is default, // so closing connection without Connection: close header means @@ -133,7 +134,12 @@ bool HttpDownloadCommand::prepareForNextSegment() { off_t HttpDownloadCommand::getRequestEndOffset() const { - return httpResponse_->getHttpHeader()->getRange()->getEndByte()+1; + off_t endByte = httpResponse_->getHttpHeader()->getRange()->getEndByte(); + if(endByte > 0) { + return endByte+1; + } else { + return endByte; + } } } // namespace aria2 diff --git a/src/HttpResponse.cc b/src/HttpResponse.cc index e2b08fb1..fd406b4a 100644 --- a/src/HttpResponse.cc +++ b/src/HttpResponse.cc @@ -46,14 +46,13 @@ #include "DlRetryEx.h" #include "StringFormat.h" #include "A2STR.h" -#include "Decoder.h" -#include "ChunkedDecoder.h" -#ifdef HAVE_LIBZ -# include "GZipDecoder.h" -#endif // HAVE_LIBZ #include "CookieStorage.h" #include "AuthConfigFactory.h" #include "AuthConfig.h" +#include "ChunkedDecodingStreamFilter.h" +#ifdef HAVE_LIBZ +# include "GZipDecodingStreamFilter.h" +#endif // HAVE_LIBZ namespace aria2 { @@ -177,20 +176,21 @@ bool HttpResponse::isTransferEncodingSpecified() const std::string HttpResponse::getTransferEncoding() const { - // TODO See TODO in getTransferEncodingDecoder() + // TODO See TODO in getTransferEncodingStreamFilter() return httpHeader_->getFirst(HttpHeader::TRANSFER_ENCODING); } -SharedHandle HttpResponse::getTransferEncodingDecoder() const +SharedHandle HttpResponse::getTransferEncodingStreamFilter() const { + SharedHandle filter; // TODO Transfer-Encoding header field can contains multiple tokens. We should // parse the field and retrieve each token. if(isTransferEncodingSpecified()) { if(getTransferEncoding() == HttpHeader::CHUNKED) { - return SharedHandle(new ChunkedDecoder()); + filter.reset(new ChunkedDecodingStreamFilter()); } } - return SharedHandle(); + return filter; } bool HttpResponse::isContentEncodingSpecified() const @@ -203,15 +203,16 @@ const std::string& HttpResponse::getContentEncoding() const return httpHeader_->getFirst(HttpHeader::CONTENT_ENCODING); } -SharedHandle HttpResponse::getContentEncodingDecoder() const +SharedHandle HttpResponse::getContentEncodingStreamFilter() const { + SharedHandle filter; #ifdef HAVE_LIBZ if(getContentEncoding() == HttpHeader::GZIP || getContentEncoding() == HttpHeader::DEFLATE) { - return SharedHandle(new GZipDecoder()); + filter.reset(new GZipDecodingStreamFilter()); } #endif // HAVE_LIBZ - return SharedHandle(); + return filter; } uint64_t HttpResponse::getContentLength() const diff --git a/src/HttpResponse.h b/src/HttpResponse.h index d3b96281..965bd431 100644 --- a/src/HttpResponse.h +++ b/src/HttpResponse.h @@ -48,7 +48,7 @@ namespace aria2 { class HttpRequest; class HttpHeader; class Logger; -class Decoder; +class StreamFilter; class HttpResponse { private: @@ -86,13 +86,13 @@ public: std::string getTransferEncoding() const; - SharedHandle getTransferEncodingDecoder() const; + SharedHandle getTransferEncodingStreamFilter() const; bool isContentEncodingSpecified() const; const std::string& getContentEncoding() const; - SharedHandle getContentEncodingDecoder() const; + SharedHandle getContentEncodingStreamFilter() const; uint64_t getContentLength() const; diff --git a/src/HttpResponseCommand.cc b/src/HttpResponseCommand.cc index a1c8c2aa..bfa67430 100644 --- a/src/HttpResponseCommand.cc +++ b/src/HttpResponseCommand.cc @@ -69,14 +69,20 @@ #include "ServerStatMan.h" #include "FileAllocationEntry.h" #include "CheckIntegrityEntry.h" +#include "StreamFilter.h" +#include "SinkStreamFilter.h" +#include "ChunkedDecodingStreamFilter.h" +#include "GZipDecodingStreamFilter.h" namespace aria2 { -static SharedHandle getTransferEncodingDecoder -(const SharedHandle& httpResponse); +static SharedHandle getTransferEncodingStreamFilter +(const SharedHandle& httpResponse, + const SharedHandle& delegate = SharedHandle()); -static SharedHandle getContentEncodingDecoder -(const SharedHandle& httpResponse); +static SharedHandle getContentEncodingStreamFilter +(const SharedHandle& httpResponse, + const SharedHandle& delegate = SharedHandle()); HttpResponseCommand::HttpResponseCommand (cuid_t cuid, @@ -198,12 +204,16 @@ bool HttpResponseCommand::executeInternal() // anyway. getPieceStorage()->getDiskAdaptor()->truncate(0); getDownloadEngine()->addCommand - (createHttpDownloadCommand(httpResponse, - getTransferEncodingDecoder(httpResponse), - getContentEncodingDecoder(httpResponse))); + (createHttpDownloadCommand + (httpResponse, + getTransferEncodingStreamFilter + (httpResponse, + getContentEncodingStreamFilter(httpResponse)))); } else { - getDownloadEngine()->addCommand(createHttpDownloadCommand - (httpResponse, getTransferEncodingDecoder(httpResponse))); + getDownloadEngine()->addCommand + (createHttpDownloadCommand + (httpResponse, + getTransferEncodingStreamFilter(httpResponse))); } return true; } @@ -275,7 +285,8 @@ bool HttpResponseCommand::handleDefaultEncoding !segment.isNull() && segment->getPositionToWrite() == 0 && !getRequest()->isPipeliningEnabled()) { command = createHttpDownloadCommand - (httpResponse, getTransferEncodingDecoder(httpResponse)); + (httpResponse, + getTransferEncodingStreamFilter(httpResponse)); } else { getSegmentMan()->cancelSegment(getCuid()); getFileEntry()->poolRequest(getRequest()); @@ -291,39 +302,49 @@ bool HttpResponseCommand::handleDefaultEncoding return true; } -static SharedHandle getTransferEncodingDecoder -(const SharedHandle& httpResponse) +static SharedHandle getTransferEncodingStreamFilter +(const SharedHandle& httpResponse, + const SharedHandle& delegate) { - SharedHandle decoder; + SharedHandle filter; if(httpResponse->isTransferEncodingSpecified()) { - decoder = httpResponse->getTransferEncodingDecoder(); - if(decoder.isNull()) { + filter = httpResponse->getTransferEncodingStreamFilter(); + if(filter.isNull()) { throw DL_ABORT_EX (StringFormat(EX_TRANSFER_ENCODING_NOT_SUPPORTED, httpResponse->getTransferEncoding().c_str()).str()); } - decoder->init(); + filter->init(); + filter->installDelegate(delegate); } - return decoder; + if(filter.isNull()) { + filter = delegate; + } + return filter; } -static SharedHandle getContentEncodingDecoder -(const SharedHandle& httpResponse) +static SharedHandle getContentEncodingStreamFilter +(const SharedHandle& httpResponse, + const SharedHandle& delegate) { - SharedHandle decoder; + SharedHandle filter; if(httpResponse->isContentEncodingSpecified()) { - decoder = httpResponse->getContentEncodingDecoder(); - if(decoder.isNull()) { + filter = httpResponse->getContentEncodingStreamFilter(); + if(filter.isNull()) { LogFactory::getInstance()->info ("Content-Encoding %s is specified, but the current implementation" "doesn't support it. The decoding process is skipped and the" "downloaded content will be still encoded.", httpResponse->getContentEncoding().c_str()); } else { - decoder->init(); + filter->init(); + filter->installDelegate(delegate); } } - return decoder; + if(filter.isNull()) { + filter = delegate; + } + return filter; } bool HttpResponseCommand::handleOtherEncoding @@ -357,9 +378,20 @@ bool HttpResponseCommand::handleOtherEncoding getRequestGroup()->shouldCancelDownloadForSafety(); getRequestGroup()->initPieceStorage(); getPieceStorage()->getDiskAdaptor()->initAndOpenFile(); + + SharedHandle streamFilter = + getTransferEncodingStreamFilter + (httpResponse, + getContentEncodingStreamFilter(httpResponse)); + // In this context, knowsTotalLength() is true only when the file is // really zero-length. - if(getDownloadContext()->knowsTotalLength()) { + if(getDownloadContext()->knowsTotalLength() && + (streamFilter.isNull() || + streamFilter->getName() != ChunkedDecodingStreamFilter::NAME)) { + // If chunked transfer-encoding is specified, we have to read end + // of chunk markers(0\r\n\r\n, for example), so cannot pool + // connection here. poolConnection(); return true; } @@ -369,16 +401,15 @@ bool HttpResponseCommand::handleOtherEncoding getSegmentMan()->getSegmentWithIndex(getCuid(), 0); getDownloadEngine()->addCommand - (createHttpDownloadCommand(httpResponse, - getTransferEncodingDecoder(httpResponse), - getContentEncodingDecoder(httpResponse))); + (createHttpDownloadCommand(httpResponse, streamFilter)); return true; } bool HttpResponseCommand::skipResponseBody (const SharedHandle& httpResponse) { - SharedHandle decoder = getTransferEncodingDecoder(httpResponse); + SharedHandle filter = + getTransferEncodingStreamFilter(httpResponse); // We don't use Content-Encoding here because this response body is just // thrown away. @@ -386,7 +417,7 @@ bool HttpResponseCommand::skipResponseBody (getCuid(), getRequest(), getFileEntry(), getRequestGroup(), httpConnection_, httpResponse, getDownloadEngine(), getSocket()); - command->setTransferEncodingDecoder(decoder); + command->installStreamFilter(filter); // If request method is HEAD or the response body is zero-length, // set command's status to real time so that avoid read check blocking @@ -403,10 +434,23 @@ bool HttpResponseCommand::skipResponseBody return true; } +static bool decideFileAllocation +(const SharedHandle& filter) +{ + for(SharedHandle f = filter; !f.isNull(); f = f->getDelegate()){ + // Since the compressed file's length are returned in the response header + // and the decompressed file size is unknown at this point, disable file + // allocation here. + if(f->getName() == GZipDecodingStreamFilter::NAME) { + return false; + } + } + return true; +} + HttpDownloadCommand* HttpResponseCommand::createHttpDownloadCommand (const SharedHandle& httpResponse, - const SharedHandle& transferEncodingDecoder, - const SharedHandle& contentEncodingDecoder) + const SharedHandle& filter) { HttpDownloadCommand* command = @@ -417,16 +461,8 @@ HttpDownloadCommand* HttpResponseCommand::createHttpDownloadCommand command->setStartupIdleTime(getOption()->getAsInt(PREF_STARTUP_IDLE_TIME)); command->setLowestDownloadSpeedLimit (getOption()->getAsInt(PREF_LOWEST_SPEED_LIMIT)); - command->setTransferEncodingDecoder(transferEncodingDecoder); - - if(!contentEncodingDecoder.isNull()) { - command->setContentEncodingDecoder(contentEncodingDecoder); - // Since the compressed file's length are returned in the response header - // and the decompressed file size is unknown at this point, disable file - // allocation here. - getRequestGroup()->setFileAllocationEnabled(false); - } - + command->installStreamFilter(filter); + getRequestGroup()->setFileAllocationEnabled(decideFileAllocation(filter)); getRequestGroup()->getURISelector()->tuneDownloadCommand (getFileEntry()->getRemainingUris(), command); diff --git a/src/HttpResponseCommand.h b/src/HttpResponseCommand.h index 79f2618b..85932f68 100644 --- a/src/HttpResponseCommand.h +++ b/src/HttpResponseCommand.h @@ -36,7 +36,6 @@ #define _D_HTTP_RESPONSE_COMMAND_H_ #include "AbstractCommand.h" -#include "Decoder.h" #include "TimeA2.h" namespace aria2 { @@ -45,6 +44,7 @@ class HttpConnection; class HttpDownloadCommand; class HttpResponse; class SocketCore; +class StreamFilter; class HttpResponseCommand : public AbstractCommand { private: @@ -55,11 +55,9 @@ private: bool skipResponseBody(const SharedHandle& httpResponse); HttpDownloadCommand* - createHttpDownloadCommand(const SharedHandle& httpResponse, - const SharedHandle& transferEncodingDecoder - = SharedHandle(), - const SharedHandle& contentEncodingDecoder - = SharedHandle()); + createHttpDownloadCommand + (const SharedHandle& httpResponse, + const SharedHandle& streamFilter); void updateLastModifiedTime(const Time& lastModified); diff --git a/src/HttpSkipResponseCommand.cc b/src/HttpSkipResponseCommand.cc index 43ad841e..6f0a173e 100644 --- a/src/HttpSkipResponseCommand.cc +++ b/src/HttpSkipResponseCommand.cc @@ -37,7 +37,6 @@ #include "HttpResponse.h" #include "message.h" #include "SocketCore.h" -#include "Decoder.h" #include "DlRetryEx.h" #include "Request.h" #include "DownloadEngine.h" @@ -58,6 +57,10 @@ #include "FileAllocationEntry.h" #include "CheckIntegrityEntry.h" #include "ServerStatMan.h" +#include "StreamFilter.h" +#include "BinaryStream.h" +#include "NullSinkStreamFilter.h" +#include "SinkStreamFilter.h" namespace aria2 { @@ -73,22 +76,30 @@ HttpSkipResponseCommand::HttpSkipResponseCommand AbstractCommand(cuid, req, fileEntry, requestGroup, e, s), httpConnection_(httpConnection), httpResponse_(httpResponse), + streamFilter_(new NullSinkStreamFilter()), + sinkFilterOnly_(true), totalLength_(httpResponse_->getEntityLength()), receivedBytes_(0) {} HttpSkipResponseCommand::~HttpSkipResponseCommand() {} -void HttpSkipResponseCommand::setTransferEncodingDecoder -(const SharedHandle& decoder) +void HttpSkipResponseCommand::installStreamFilter +(const SharedHandle& streamFilter) { - transferEncodingDecoder_ = decoder; + if(streamFilter.isNull()) { + return; + } + streamFilter->installDelegate(streamFilter_); + streamFilter_ = streamFilter; + sinkFilterOnly_ = + util::endsWith(streamFilter_->getName(), SinkStreamFilter::NAME); } bool HttpSkipResponseCommand::executeInternal() { if(getRequest()->getMethod() == Request::METHOD_HEAD || - (totalLength_ == 0 && transferEncodingDecoder_.isNull())) { + (totalLength_ == 0 && sinkFilterOnly_)) { // If request method is HEAD or content-length header is present and // it's value is 0, then pool socket for reuse. // If content-length header is not present, then EOF is expected in the end. @@ -101,17 +112,25 @@ bool HttpSkipResponseCommand::executeInternal() } const size_t BUFSIZE = 16*1024; unsigned char buf[BUFSIZE]; - size_t bufSize = BUFSIZE; - + size_t bufSize; + if(sinkFilterOnly_ && totalLength_ > 0) { + bufSize = totalLength_-receivedBytes_; + } else { + bufSize = BUFSIZE; + } try { - getSocket()->readData(buf, bufSize); - - if(transferEncodingDecoder_.isNull()) { + if(sinkFilterOnly_) { + getSocket()->readData(buf, bufSize); receivedBytes_ += bufSize; } else { + getSocket()->peekData(buf, bufSize); // receivedBytes_ is not updated if transferEncoding is set. // The return value is safely ignored here. - transferEncodingDecoder_->decode(buf, bufSize); + streamFilter_->transform(SharedHandle(), + SharedHandle(), + buf, bufSize); + bufSize = streamFilter_->getBytesProcessed(); + getSocket()->readData(buf, bufSize); } if(totalLength_ != 0 && bufSize == 0 && !getSocket()->wantRead() && !getSocket()->wantWrite()) { @@ -125,7 +144,7 @@ bool HttpSkipResponseCommand::executeInternal() } bool finished = false; - if(transferEncodingDecoder_.isNull()) { + if(sinkFilterOnly_) { if(bufSize == 0) { if(!getSocket()->wantRead() && !getSocket()->wantWrite()) { return processResponse(); @@ -134,7 +153,7 @@ bool HttpSkipResponseCommand::executeInternal() finished = (totalLength_ == receivedBytes_); } } else { - finished = transferEncodingDecoder_->finished(); + finished = streamFilter_->finished(); } if(finished) { poolConnection(); diff --git a/src/HttpSkipResponseCommand.h b/src/HttpSkipResponseCommand.h index 2a865cf7..7fb1769d 100644 --- a/src/HttpSkipResponseCommand.h +++ b/src/HttpSkipResponseCommand.h @@ -41,7 +41,7 @@ namespace aria2 { class HttpConnection; class HttpResponse; -class Decoder; +class StreamFilter; class HttpSkipResponseCommand : public AbstractCommand { private: @@ -49,7 +49,9 @@ private: SharedHandle httpResponse_; - SharedHandle transferEncodingDecoder_; + SharedHandle streamFilter_; + + bool sinkFilterOnly_; uint64_t totalLength_; @@ -72,7 +74,7 @@ public: virtual ~HttpSkipResponseCommand(); - void setTransferEncodingDecoder(const SharedHandle& decoder); + void installStreamFilter(const SharedHandle& streamFilter); void disableSocketCheck(); }; diff --git a/src/Makefile.am b/src/Makefile.am index e294b6c4..813c96a1 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -206,7 +206,12 @@ SRCS = Socket.h\ ValueBase.cc ValueBase.h\ ContextAttribute.h\ TorrentAttribute.h\ - AdaptiveFileAllocationIterator.cc AdaptiveFileAllocationIterator.h + AdaptiveFileAllocationIterator.cc AdaptiveFileAllocationIterator.h\ + StreamFilter.cc StreamFilter.h\ + SinkStreamFilter.cc SinkStreamFilter.h\ + GZipDecodingStreamFilter.cc GZipDecodingStreamFilter.h\ + ChunkedDecodingStreamFilter.cc ChunkedDecodingStreamFilter.h\ + NullSinkStreamFilter.cc NullSinkStreamFilter.h if ENABLE_XML_RPC SRCS += XmlRpcRequestParserController.cc XmlRpcRequestParserController.h\ diff --git a/src/Makefile.in b/src/Makefile.in index d38f6659..af897bba 100644 --- a/src/Makefile.in +++ b/src/Makefile.in @@ -442,7 +442,11 @@ am__libaria2c_a_SOURCES_DIST = Socket.h SocketCore.cc SocketCore.h \ MetadataInfo.h SessionSerializer.cc SessionSerializer.h \ Event.h timespec.h ValueBase.cc ValueBase.h ContextAttribute.h \ TorrentAttribute.h AdaptiveFileAllocationIterator.cc \ - AdaptiveFileAllocationIterator.h \ + AdaptiveFileAllocationIterator.h StreamFilter.cc \ + StreamFilter.h SinkStreamFilter.cc SinkStreamFilter.h \ + GZipDecodingStreamFilter.cc GZipDecodingStreamFilter.h \ + ChunkedDecodingStreamFilter.cc ChunkedDecodingStreamFilter.h \ + NullSinkStreamFilter.cc NullSinkStreamFilter.h \ XmlRpcRequestParserController.cc \ XmlRpcRequestParserController.h \ XmlRpcRequestParserStateMachine.cc \ @@ -883,17 +887,20 @@ am__objects_32 = SocketCore.$(OBJEXT) Command.$(OBJEXT) \ CreateRequestCommand.$(OBJEXT) download_helper.$(OBJEXT) \ MetadataInfo.$(OBJEXT) SessionSerializer.$(OBJEXT) \ ValueBase.$(OBJEXT) AdaptiveFileAllocationIterator.$(OBJEXT) \ - $(am__objects_1) $(am__objects_2) $(am__objects_3) \ - $(am__objects_4) $(am__objects_5) $(am__objects_6) \ - $(am__objects_7) $(am__objects_8) $(am__objects_9) \ - $(am__objects_10) $(am__objects_11) $(am__objects_12) \ - $(am__objects_13) $(am__objects_14) $(am__objects_15) \ - $(am__objects_16) $(am__objects_17) $(am__objects_18) \ - $(am__objects_19) $(am__objects_20) $(am__objects_21) \ - $(am__objects_22) $(am__objects_23) $(am__objects_24) \ - $(am__objects_25) $(am__objects_26) $(am__objects_27) \ - $(am__objects_28) $(am__objects_29) $(am__objects_30) \ - $(am__objects_31) + StreamFilter.$(OBJEXT) SinkStreamFilter.$(OBJEXT) \ + GZipDecodingStreamFilter.$(OBJEXT) \ + ChunkedDecodingStreamFilter.$(OBJEXT) \ + NullSinkStreamFilter.$(OBJEXT) $(am__objects_1) \ + $(am__objects_2) $(am__objects_3) $(am__objects_4) \ + $(am__objects_5) $(am__objects_6) $(am__objects_7) \ + $(am__objects_8) $(am__objects_9) $(am__objects_10) \ + $(am__objects_11) $(am__objects_12) $(am__objects_13) \ + $(am__objects_14) $(am__objects_15) $(am__objects_16) \ + $(am__objects_17) $(am__objects_18) $(am__objects_19) \ + $(am__objects_20) $(am__objects_21) $(am__objects_22) \ + $(am__objects_23) $(am__objects_24) $(am__objects_25) \ + $(am__objects_26) $(am__objects_27) $(am__objects_28) \ + $(am__objects_29) $(am__objects_30) $(am__objects_31) am_libaria2c_a_OBJECTS = $(am__objects_32) libaria2c_a_OBJECTS = $(am_libaria2c_a_OBJECTS) am__installdirs = "$(DESTDIR)$(bindir)" @@ -1227,7 +1234,11 @@ SRCS = Socket.h SocketCore.cc SocketCore.h BinaryStream.h Command.cc \ MetadataInfo.h SessionSerializer.cc SessionSerializer.h \ Event.h timespec.h ValueBase.cc ValueBase.h ContextAttribute.h \ TorrentAttribute.h AdaptiveFileAllocationIterator.cc \ - AdaptiveFileAllocationIterator.h $(am__append_1) \ + AdaptiveFileAllocationIterator.h StreamFilter.cc \ + StreamFilter.h SinkStreamFilter.cc SinkStreamFilter.h \ + GZipDecodingStreamFilter.cc GZipDecodingStreamFilter.h \ + ChunkedDecodingStreamFilter.cc ChunkedDecodingStreamFilter.h \ + NullSinkStreamFilter.cc NullSinkStreamFilter.h $(am__append_1) \ $(am__append_2) $(am__append_3) $(am__append_4) \ $(am__append_5) $(am__append_6) $(am__append_7) \ $(am__append_8) $(am__append_9) $(am__append_10) \ @@ -1396,6 +1407,7 @@ distclean-compile: @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/CheckIntegrityEntry.Po@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/ChecksumCheckIntegrityEntry.Po@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/ChunkedDecoder.Po@am__quote@ +@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/ChunkedDecodingStreamFilter.Po@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/Command.Po@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/ConsoleStatCalc.Po@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/ContentTypeRequestGroupCriteria.Po@am__quote@ @@ -1498,6 +1510,7 @@ distclean-compile: @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/FtpTunnelRequestCommand.Po@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/FtpTunnelResponseCommand.Po@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/GZipDecoder.Po@am__quote@ +@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/GZipDecodingStreamFilter.Po@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/GZipEncoder.Po@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/GrowSegment.Po@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/HandshakeExtensionMessage.Po@am__quote@ @@ -1559,6 +1572,7 @@ distclean-compile: @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/Netrc.Po@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/NetrcAuthResolver.Po@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/NsCookieParser.Po@am__quote@ +@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/NullSinkStreamFilter.Po@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/Option.Po@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/OptionHandler.Po@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/OptionHandlerException.Po@am__quote@ @@ -1606,6 +1620,7 @@ distclean-compile: @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/SimpleLogFormatter.Po@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/SimpleRandomizer.Po@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/SingleFileAllocationIterator.Po@am__quote@ +@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/SinkStreamFilter.Po@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/SleepCommand.Po@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/SocketBuffer.Po@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/SocketCore.Po@am__quote@ @@ -1614,6 +1629,7 @@ distclean-compile: @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/Sqlite3CookieParserImpl.Po@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/StreamCheckIntegrityEntry.Po@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/StreamFileAllocationEntry.Po@am__quote@ +@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/StreamFilter.Po@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/StringFormat.Po@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/TimeA2.Po@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/TimeBasedCommand.Po@am__quote@ diff --git a/src/NullSinkStreamFilter.cc b/src/NullSinkStreamFilter.cc new file mode 100644 index 00000000..4572c575 --- /dev/null +++ b/src/NullSinkStreamFilter.cc @@ -0,0 +1,41 @@ +/* */ +#include "NullSinkStreamFilter.h" + +namespace aria2 { + +const std::string NullSinkStreamFilter::NAME("NullSinkStreamFilter"); + +} // namespace aria2 diff --git a/src/NullSinkStreamFilter.h b/src/NullSinkStreamFilter.h new file mode 100644 index 00000000..23d59c86 --- /dev/null +++ b/src/NullSinkStreamFilter.h @@ -0,0 +1,86 @@ +/* */ +#ifndef D_NULL_SINK_STREAM_FILTER_H +#define D_NULL_SINK_STREAM_FILTER_H + +#include "StreamFilter.h" + +namespace aria2 { + +class NullSinkStreamFilter:public StreamFilter { +private: + size_t bytesProcessed_; +public: + NullSinkStreamFilter():bytesProcessed_(0) {} + + virtual void init() {} + + virtual ssize_t transform + (const SharedHandle& out, + const SharedHandle& segment, + const unsigned char* inbuf, size_t inlen) + { + bytesProcessed_ = inlen; + return bytesProcessed_; + } + + virtual bool finished() + { + return true; + } + + virtual void release() {} + + virtual const std::string& getName() const + { + return NAME; + } + + static const std::string NAME; + + virtual size_t getBytesProcessed() const + { + return bytesProcessed_; + } + + virtual bool installDelegate(const SharedHandle& filter) + { + return false; + } +}; + +} // namespace aria2 + +#endif // D_NULL_SINK_STREAM_FILTER_H diff --git a/src/RequestGroup.cc b/src/RequestGroup.cc index 1e79bda6..7835782d 100644 --- a/src/RequestGroup.cc +++ b/src/RequestGroup.cc @@ -525,7 +525,9 @@ void RequestGroup::processCheckIntegrityEntry void RequestGroup::initPieceStorage() { SharedHandle tempPieceStorage; - if(downloadContext_->knowsTotalLength()) { + if(downloadContext_->knowsTotalLength() && + (downloadContext_->getTotalLength() > 0 || + downloadContext_->hasAttribute(bittorrent::BITTORRENT))) { #ifdef ENABLE_BITTORRENT SharedHandle ps (new DefaultPieceStorage(downloadContext_, option_.get())); diff --git a/src/SinkStreamFilter.cc b/src/SinkStreamFilter.cc new file mode 100644 index 00000000..dadec822 --- /dev/null +++ b/src/SinkStreamFilter.cc @@ -0,0 +1,65 @@ +/* */ +#include "SinkStreamFilter.h" +#include "BinaryStream.h" +#include "Segment.h" + +namespace aria2 { + +const std::string SinkStreamFilter::NAME("SinkStreamFilter"); + +SinkStreamFilter::SinkStreamFilter(bool hashUpdate): + hashUpdate_(hashUpdate), + bytesProcessed_(0) {} + +ssize_t SinkStreamFilter::transform +(const SharedHandle& out, + const SharedHandle& segment, + const unsigned char* inbuf, size_t inlen) +{ + if(inlen > 0) { + out->writeData(inbuf, inlen, segment->getPositionToWrite()); +#ifdef ENABLE_MESSAGE_DIGEST + if(hashUpdate_) { + segment->updateHash(segment->getWrittenLength(), inbuf, inlen); + } +#endif // ENABLE_MESSAGE_DIGEST + segment->updateWrittenLength(inlen); + } + bytesProcessed_ = inlen; + return bytesProcessed_; +} + +} // namespace aria2 diff --git a/src/SinkStreamFilter.h b/src/SinkStreamFilter.h new file mode 100644 index 00000000..c8dcc76f --- /dev/null +++ b/src/SinkStreamFilter.h @@ -0,0 +1,84 @@ +/* */ +#ifndef D_SINK_STREAM_FILTER_H +#define D_SINK_STREAM_FILTER_H + +#include "StreamFilter.h" + +namespace aria2 { + +class SinkStreamFilter:public StreamFilter { +private: + bool hashUpdate_; + + size_t bytesProcessed_; +public: + SinkStreamFilter(bool hashUpdate = false); + + virtual void init() {} + + virtual ssize_t transform + (const SharedHandle& out, + const SharedHandle& segment, + const unsigned char* inbuf, size_t inlen); + + virtual bool finished() + { + return true; + } + + virtual void release() {} + + virtual const std::string& getName() const + { + return NAME; + } + + static const std::string NAME; + + virtual size_t getBytesProcessed() const + { + return bytesProcessed_; + } + + virtual bool installDelegate(const SharedHandle& filter) + { + return false; + } +}; + +} // namespace aria2 + +#endif // D_SINK_STREAM_FILTER_H diff --git a/src/StreamFilter.cc b/src/StreamFilter.cc new file mode 100644 index 00000000..e88fe943 --- /dev/null +++ b/src/StreamFilter.cc @@ -0,0 +1,53 @@ +/* */ +#include "StreamFilter.h" + +namespace aria2 { + +StreamFilter::StreamFilter +(const SharedHandle& delegate): + delegate_(delegate) {} + +bool StreamFilter::installDelegate(const SharedHandle& filter) +{ + if(delegate_.isNull()) { + delegate_ = filter; + return true; + } else { + return delegate_->installDelegate(filter); + } +} + +} // namespace aria2 diff --git a/src/StreamFilter.h b/src/StreamFilter.h new file mode 100644 index 00000000..f1588976 --- /dev/null +++ b/src/StreamFilter.h @@ -0,0 +1,84 @@ +/* */ +#ifndef D_STREAM_FILTER_H +#define D_STREAM_FILTER_H + +#include "common.h" +#include +#include "SharedHandle.h" + +namespace aria2 { + +class BinaryStream; +class Segment; + +// Interface for basic decoding functionality. +class StreamFilter { +private: + SharedHandle delegate_; +public: + StreamFilter + (const SharedHandle& delegate = SharedHandle()); + + virtual ~StreamFilter() {} + + // init() must be called before calling decode(). + virtual void init() = 0; + + virtual ssize_t transform(const SharedHandle& out, + const SharedHandle& segment, + const unsigned char* inbuf, size_t inlen) = 0; + + virtual bool finished() = 0; + + // The call of release() will free allocated resources. + // After calling release(), the object can be reused by calling init(). + virtual void release() = 0; + + virtual const std::string& getName() const = 0; + + virtual size_t getBytesProcessed() const = 0; + + virtual bool installDelegate(const SharedHandle& filter); + + SharedHandle getDelegate() const + { + return delegate_; + } +}; + +} // namespace aria2 + +#endif // D_STREAM_FILTER_H diff --git a/test/ChunkedDecodingStreamFilterTest.cc b/test/ChunkedDecodingStreamFilterTest.cc new file mode 100644 index 00000000..8dea7bd1 --- /dev/null +++ b/test/ChunkedDecodingStreamFilterTest.cc @@ -0,0 +1,240 @@ +#include "ChunkedDecodingStreamFilter.h" + +#include +#include +#include + +#include "DlAbortEx.h" +#include "Segment.h" +#include "ByteArrayDiskWriter.h" +#include "SinkStreamFilter.h" +#include "MockSegment.h" + +namespace aria2 { + +class ChunkedDecodingStreamFilterTest:public CppUnit::TestFixture { + + CPPUNIT_TEST_SUITE(ChunkedDecodingStreamFilterTest); + CPPUNIT_TEST(testTransform); + CPPUNIT_TEST(testTransform_withoutTrailer); + CPPUNIT_TEST(testTransform_with2Trailers); + CPPUNIT_TEST(testTransform_largeChunkSize); + CPPUNIT_TEST(testTransform_tooLargeChunkSize); + CPPUNIT_TEST(testTransform_chunkSizeMismatch); + CPPUNIT_TEST(testGetName); + CPPUNIT_TEST_SUITE_END(); + + SharedHandle filter_; + SharedHandle sinkFilter_; + SharedHandle writer_; + SharedHandle segment_; + + void clearWriter() + { + writer_->setString(""); + } +public: + void setUp() + { + writer_.reset(new ByteArrayDiskWriter()); + sinkFilter_.reset(new SinkStreamFilter()); + filter_.reset(new ChunkedDecodingStreamFilter(sinkFilter_)); + sinkFilter_->init(); + filter_->init(); + segment_.reset(new MockSegment()); + } + + void testTransform(); + void testTransform_withoutTrailer(); + void testTransform_with2Trailers(); + void testTransform_largeChunkSize(); + void testTransform_tooLargeChunkSize(); + void testTransform_chunkSizeMismatch(); + void testGetName(); +}; + + +CPPUNIT_TEST_SUITE_REGISTRATION( ChunkedDecodingStreamFilterTest ); + +void ChunkedDecodingStreamFilterTest::testTransform() +{ + try { + { + std::basic_string msg = + reinterpret_cast("a\r\n1234567890\r\n"); + ssize_t r = filter_->transform(writer_, segment_, msg.data(), msg.size()); + CPPUNIT_ASSERT_EQUAL((ssize_t)10, r); + CPPUNIT_ASSERT_EQUAL(std::string("1234567890"), writer_->getString()); + } + clearWriter(); + // Feed extension; see it is ignored. + { + std::basic_string msg = + reinterpret_cast + ("3;extensionIgnored\r\n123\r\n"); + ssize_t r = filter_->transform(writer_, segment_, msg.data(), msg.size()); + CPPUNIT_ASSERT_EQUAL((ssize_t)3, r); + CPPUNIT_ASSERT_EQUAL(std::string("123"), writer_->getString()); + } + clearWriter(); + // Feed 2extensions; see it is ignored. + { + std::basic_string msg = + reinterpret_cast + ("3;extension1;extension2;\r\n123\r\n"); + ssize_t r = filter_->transform(writer_, segment_, msg.data(), msg.size()); + CPPUNIT_ASSERT_EQUAL((ssize_t)3, r); + CPPUNIT_ASSERT_EQUAL(std::string("123"), writer_->getString()); + } + clearWriter(); + // Not all chunk size is available + { + std::basic_string msg = + reinterpret_cast("1"); + ssize_t r = filter_->transform(writer_, segment_, msg.data(), msg.size()); + CPPUNIT_ASSERT_EQUAL((ssize_t)0, r); + } + clearWriter(); + { + std::basic_string msg = + reinterpret_cast("0\r\n1234567890123456\r\n"); + ssize_t r = filter_->transform(writer_, segment_, msg.data(), msg.size()); + CPPUNIT_ASSERT_EQUAL((ssize_t)16, r); + CPPUNIT_ASSERT_EQUAL(std::string("1234567890123456"), + writer_->getString()); + } + clearWriter(); + // Not all chunk data is available + { + std::basic_string msg = + reinterpret_cast("10\r\n1234567890"); + ssize_t r = filter_->transform(writer_, segment_, msg.data(), msg.size()); + CPPUNIT_ASSERT_EQUAL((ssize_t)10, r); + CPPUNIT_ASSERT_EQUAL(std::string("1234567890"), writer_->getString()); + } + clearWriter(); + { + std::basic_string msg = + reinterpret_cast("123456\r\n"); + ssize_t r = filter_->transform(writer_, segment_, msg.data(), msg.size()); + CPPUNIT_ASSERT_EQUAL((ssize_t)6, r); + CPPUNIT_ASSERT_EQUAL(std::string("123456"), writer_->getString()); + } + clearWriter(); + // no trailing CR LF. + { + std::basic_string msg = + reinterpret_cast("10\r\n1234567890123456"); + ssize_t r = filter_->transform(writer_, segment_, msg.data(), msg.size()); + CPPUNIT_ASSERT_EQUAL((ssize_t)16, r); + CPPUNIT_ASSERT_EQUAL(std::string("1234567890123456"), + writer_->getString()); + } + clearWriter(); + // feed only CR + { + std::basic_string msg = + reinterpret_cast("\r"); + ssize_t r = filter_->transform(writer_, segment_, msg.data(), msg.size()); + CPPUNIT_ASSERT_EQUAL((ssize_t)0, r); + } + // feed next LF + { + std::basic_string msg = + reinterpret_cast("\n"); + ssize_t r = filter_->transform(writer_, segment_, msg.data(), msg.size()); + CPPUNIT_ASSERT_EQUAL((ssize_t)0, r); + CPPUNIT_ASSERT_EQUAL(std::string(""), writer_->getString()); + } + // feed 0 CR LF. + { + std::basic_string msg = + reinterpret_cast("0\r\n"); + ssize_t r = filter_->transform(writer_, segment_, msg.data(), msg.size()); + CPPUNIT_ASSERT_EQUAL((ssize_t)0, r); + } + // feed trailer + { + std::basic_string msg = + reinterpret_cast("trailer\r\n"); + ssize_t r = filter_->transform(writer_, segment_, msg.data(), msg.size()); + CPPUNIT_ASSERT_EQUAL((ssize_t)0, r); + } + // feed final CRLF + { + std::basic_string msg = + reinterpret_cast("\r\n"); + ssize_t r = filter_->transform(writer_, segment_, msg.data(), msg.size()); + CPPUNIT_ASSERT_EQUAL((ssize_t)0, r); + } + // input is over + CPPUNIT_ASSERT(filter_->finished()); + } catch(DlAbortEx& e) { + CPPUNIT_FAIL(e.stackTrace()); + } +} + +void ChunkedDecodingStreamFilterTest::testTransform_withoutTrailer() +{ + CPPUNIT_ASSERT_EQUAL + ((ssize_t)0, + filter_->transform + (writer_, segment_, + reinterpret_cast("0\r\n\r\n"), 5)); + CPPUNIT_ASSERT(filter_->finished()); +} + +void ChunkedDecodingStreamFilterTest::testTransform_with2Trailers() +{ + CPPUNIT_ASSERT_EQUAL + ((ssize_t)0, + filter_->transform + (writer_, segment_, + reinterpret_cast("0\r\nt1\r\nt2\r\n\r\n"), 13)); + CPPUNIT_ASSERT(filter_->finished()); +} + +void ChunkedDecodingStreamFilterTest::testTransform_largeChunkSize() +{ + // chunkSize should be under 2^64-1 + { + std::basic_string msg = + reinterpret_cast("ffffffffffffffff\r\n"); + filter_->transform(writer_, segment_, msg.data(), msg.size()); + } +} + +void ChunkedDecodingStreamFilterTest::testTransform_tooLargeChunkSize() +{ + // chunkSize 2^64 causes error + { + std::basic_string msg = + reinterpret_cast("10000000000000000\r\n"); + try { + filter_->transform(writer_, segment_, msg.data(), msg.size()); + CPPUNIT_FAIL("exception must be thrown."); + } catch(DlAbortEx& e) { + // success + } + } +} + +void ChunkedDecodingStreamFilterTest::testTransform_chunkSizeMismatch() +{ + std::basic_string msg = + reinterpret_cast("3\r\n1234\r\n"); + try { + filter_->transform(writer_, segment_, msg.data(), msg.size()); + CPPUNIT_FAIL("exception must be thrown."); + } catch(DlAbortEx& e) { + // success + } +} + +void ChunkedDecodingStreamFilterTest::testGetName() +{ + CPPUNIT_ASSERT_EQUAL + (std::string("ChunkedDecodingStreamFilter"), filter_->getName()); +} + +} // namespace aria2 diff --git a/test/GZipDecodingStreamFilterTest.cc b/test/GZipDecodingStreamFilterTest.cc new file mode 100644 index 00000000..250f410e --- /dev/null +++ b/test/GZipDecodingStreamFilterTest.cc @@ -0,0 +1,81 @@ +#include "GZipDecodingStreamFilter.h" + +#include +#include +#include + +#include + +#include "Exception.h" +#include "util.h" +#include "Segment.h" +#include "ByteArrayDiskWriter.h" +#include "SinkStreamFilter.h" +#include "MockSegment.h" +#ifdef ENABLE_MESSAGE_DIGEST +# include "MessageDigestHelper.h" +#endif // ENABLE_MESSAGE_DIGEST + +namespace aria2 { + +class GZipDecodingStreamFilterTest:public CppUnit::TestFixture { + + CPPUNIT_TEST_SUITE(GZipDecodingStreamFilterTest); + CPPUNIT_TEST(testTransform); + CPPUNIT_TEST_SUITE_END(); + + class MockSegment2:public MockSegment { + private: + off_t positionToWrite_; + public: + MockSegment2():positionToWrite_(0) {} + + virtual void updateWrittenLength(size_t bytes) + { + positionToWrite_ += bytes; + } + + virtual off_t getPositionToWrite() const + { + return positionToWrite_; + } + }; + + SharedHandle filter_; + SharedHandle sinkFilter_; + SharedHandle writer_; + SharedHandle segment_; +public: + void setUp() + { + writer_.reset(new ByteArrayDiskWriter()); + sinkFilter_.reset(new SinkStreamFilter()); + filter_.reset(new GZipDecodingStreamFilter(sinkFilter_)); + sinkFilter_->init(); + filter_->init(); + segment_.reset(new MockSegment2()); + } + + void testTransform(); +}; + + +CPPUNIT_TEST_SUITE_REGISTRATION(GZipDecodingStreamFilterTest); + +void GZipDecodingStreamFilterTest::testTransform() +{ + unsigned char buf[4096]; + std::ifstream in("gzip_decode_test.gz", std::ios::binary); + while(in) { + in.read(reinterpret_cast(buf), sizeof(buf)); + filter_->transform(writer_, segment_, buf, in.gcount()); + } + CPPUNIT_ASSERT(filter_->finished()); +#ifdef ENABLE_MESSAGE_DIGEST + CPPUNIT_ASSERT_EQUAL(std::string("8b577b33c0411b2be9d4fa74c7402d54a8d21f96"), + MessageDigestHelper::digestString + (MessageDigestContext::SHA1, writer_->getString())); +#endif // ENABLE_MESSAGE_DIGEST +} + +} // namespace aria2 diff --git a/test/HttpResponseTest.cc b/test/HttpResponseTest.cc index dd31da09..a530a884 100644 --- a/test/HttpResponseTest.cc +++ b/test/HttpResponseTest.cc @@ -12,11 +12,11 @@ #include "HttpRequest.h" #include "Exception.h" #include "A2STR.h" -#include "Decoder.h" #include "DlRetryEx.h" #include "CookieStorage.h" #include "AuthConfigFactory.h" #include "AuthConfig.h" +#include "StreamFilter.h" namespace aria2 { @@ -36,10 +36,10 @@ class HttpResponseTest : public CppUnit::TestFixture { CPPUNIT_TEST(testIsRedirect); CPPUNIT_TEST(testIsTransferEncodingSpecified); CPPUNIT_TEST(testGetTransferEncoding); - CPPUNIT_TEST(testGetTransferEncodingDecoder); + CPPUNIT_TEST(testGetTransferEncodingStreamFilter); CPPUNIT_TEST(testIsContentEncodingSpecified); CPPUNIT_TEST(testGetContentEncoding); - CPPUNIT_TEST(testGetContentEncodingDecoder); + CPPUNIT_TEST(testGetContentEncodingStreamFilter); CPPUNIT_TEST(testValidateResponse); CPPUNIT_TEST(testValidateResponse_good_range); CPPUNIT_TEST(testValidateResponse_bad_range); @@ -68,10 +68,10 @@ public: void testIsRedirect(); void testIsTransferEncodingSpecified(); void testGetTransferEncoding(); - void testGetTransferEncodingDecoder(); + void testGetTransferEncodingStreamFilter(); void testIsContentEncodingSpecified(); void testGetContentEncoding(); - void testGetContentEncodingDecoder(); + void testGetContentEncodingStreamFilter(); void testValidateResponse(); void testValidateResponse_good_range(); void testValidateResponse_bad_range(); @@ -253,18 +253,18 @@ void HttpResponseTest::testGetTransferEncoding() httpResponse.getTransferEncoding()); } -void HttpResponseTest::testGetTransferEncodingDecoder() +void HttpResponseTest::testGetTransferEncodingStreamFilter() { HttpResponse httpResponse; SharedHandle httpHeader(new HttpHeader()); httpResponse.setHttpHeader(httpHeader); - CPPUNIT_ASSERT(httpResponse.getTransferEncodingDecoder().isNull()); + CPPUNIT_ASSERT(httpResponse.getTransferEncodingStreamFilter().isNull()); httpHeader->put("Transfer-Encoding", "chunked"); - CPPUNIT_ASSERT(!httpResponse.getTransferEncodingDecoder().isNull()); + CPPUNIT_ASSERT(!httpResponse.getTransferEncodingStreamFilter().isNull()); } void HttpResponseTest::testIsContentEncodingSpecified() @@ -295,37 +295,42 @@ void HttpResponseTest::testGetContentEncoding() CPPUNIT_ASSERT_EQUAL(std::string("gzip"), httpResponse.getContentEncoding()); } -void HttpResponseTest::testGetContentEncodingDecoder() +void HttpResponseTest::testGetContentEncodingStreamFilter() { HttpResponse httpResponse; SharedHandle httpHeader(new HttpHeader()); httpResponse.setHttpHeader(httpHeader); - CPPUNIT_ASSERT(httpResponse.getContentEncodingDecoder().isNull()); + CPPUNIT_ASSERT(httpResponse.getContentEncodingStreamFilter().isNull()); #ifdef HAVE_LIBZ httpHeader->put("Content-Encoding", "gzip"); { - SharedHandle decoder = httpResponse.getContentEncodingDecoder(); - CPPUNIT_ASSERT(!decoder.isNull()); - CPPUNIT_ASSERT_EQUAL(std::string("GZipDecoder"), decoder->getName()); + SharedHandle filter = + httpResponse.getContentEncodingStreamFilter(); + CPPUNIT_ASSERT(!filter.isNull()); + CPPUNIT_ASSERT_EQUAL(std::string("GZipDecodingStreamFilter"), + filter->getName()); } httpHeader.reset(new HttpHeader()); httpResponse.setHttpHeader(httpHeader); httpHeader->put("Content-Encoding", "deflate"); { - SharedHandle decoder = httpResponse.getContentEncodingDecoder(); - CPPUNIT_ASSERT(!decoder.isNull()); - CPPUNIT_ASSERT_EQUAL(std::string("GZipDecoder"), decoder->getName()); + SharedHandle filter = + httpResponse.getContentEncodingStreamFilter(); + CPPUNIT_ASSERT(!filter.isNull()); + CPPUNIT_ASSERT_EQUAL(std::string("GZipDecodingStreamFilter"), + filter->getName()); } #endif // HAVE_LIBZ httpHeader.reset(new HttpHeader()); httpResponse.setHttpHeader(httpHeader); httpHeader->put("Content-Encoding", "bzip2"); { - SharedHandle decoder = httpResponse.getContentEncodingDecoder(); - CPPUNIT_ASSERT(decoder.isNull()); + SharedHandle filter = + httpResponse.getContentEncodingStreamFilter(); + CPPUNIT_ASSERT(filter.isNull()); } } diff --git a/test/Makefile.am b/test/Makefile.am index 20fc2778..9871cc21 100644 --- a/test/Makefile.am +++ b/test/Makefile.am @@ -72,7 +72,9 @@ aria2c_SOURCES = AllTest.cc\ bitfieldTest.cc\ DownloadContextTest.cc\ SessionSerializerTest.cc\ - ValueBaseTest.cc + ValueBaseTest.cc\ + ChunkedDecodingStreamFilterTest.cc\ + GZipDecodingStreamFilterTest.cc if ENABLE_XML_RPC aria2c_SOURCES += XmlRpcRequestParserControllerTest.cc\ diff --git a/test/Makefile.in b/test/Makefile.in index c81a278e..c6717271 100644 --- a/test/Makefile.in +++ b/test/Makefile.in @@ -213,6 +213,8 @@ am__aria2c_SOURCES_DIST = AllTest.cc TestUtil.cc TestUtil.h \ InOrderPieceSelector.h LongestSequencePieceSelectorTest.cc \ a2algoTest.cc bitfieldTest.cc DownloadContextTest.cc \ SessionSerializerTest.cc ValueBaseTest.cc \ + ChunkedDecodingStreamFilterTest.cc \ + GZipDecodingStreamFilterTest.cc \ XmlRpcRequestParserControllerTest.cc \ XmlRpcRequestProcessorTest.cc XmlRpcMethodTest.cc \ FallocFileAllocationIteratorTest.cc GZipDecoderTest.cc \ @@ -404,9 +406,11 @@ am_aria2c_OBJECTS = AllTest.$(OBJEXT) TestUtil.$(OBJEXT) \ LongestSequencePieceSelectorTest.$(OBJEXT) \ a2algoTest.$(OBJEXT) bitfieldTest.$(OBJEXT) \ DownloadContextTest.$(OBJEXT) SessionSerializerTest.$(OBJEXT) \ - ValueBaseTest.$(OBJEXT) $(am__objects_1) $(am__objects_2) \ - $(am__objects_3) $(am__objects_4) $(am__objects_5) \ - $(am__objects_6) $(am__objects_7) + ValueBaseTest.$(OBJEXT) \ + ChunkedDecodingStreamFilterTest.$(OBJEXT) \ + GZipDecodingStreamFilterTest.$(OBJEXT) $(am__objects_1) \ + $(am__objects_2) $(am__objects_3) $(am__objects_4) \ + $(am__objects_5) $(am__objects_6) $(am__objects_7) aria2c_OBJECTS = $(am_aria2c_OBJECTS) am__DEPENDENCIES_1 = aria2c_DEPENDENCIES = ../src/libaria2c.a $(am__DEPENDENCIES_1) @@ -639,7 +643,9 @@ aria2c_SOURCES = AllTest.cc TestUtil.cc TestUtil.h SocketCoreTest.cc \ RarestPieceSelectorTest.cc PieceStatManTest.cc \ InOrderPieceSelector.h LongestSequencePieceSelectorTest.cc \ a2algoTest.cc bitfieldTest.cc DownloadContextTest.cc \ - SessionSerializerTest.cc ValueBaseTest.cc $(am__append_1) \ + SessionSerializerTest.cc ValueBaseTest.cc \ + ChunkedDecodingStreamFilterTest.cc \ + GZipDecodingStreamFilterTest.cc $(am__append_1) \ $(am__append_2) $(am__append_3) $(am__append_4) \ $(am__append_5) $(am__append_6) $(am__append_7) @@ -773,6 +779,7 @@ distclean-compile: @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/BtUnchokeMessageTest.Po@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/ByteArrayDiskWriterTest.Po@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/ChunkedDecoderTest.Po@am__quote@ +@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/ChunkedDecodingStreamFilterTest.Po@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/CookieParserTest.Po@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/CookieStorageTest.Po@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/CookieTest.Po@am__quote@ @@ -822,6 +829,7 @@ distclean-compile: @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/FileTest.Po@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/FtpConnectionTest.Po@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/GZipDecoderTest.Po@am__quote@ +@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/GZipDecodingStreamFilterTest.Po@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/GZipEncoderTest.Po@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/GrowSegmentTest.Po@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/HandshakeExtensionMessageTest.Po@am__quote@ diff --git a/test/MockSegment.h b/test/MockSegment.h new file mode 100644 index 00000000..5be67fc8 --- /dev/null +++ b/test/MockSegment.h @@ -0,0 +1,80 @@ +#ifndef D_MOCK_SEGMENT_H +#define D_MOCK_SEGMENT_H + +#include "Segment.h" +#include "Piece.h" +#include "A2STR.h" + +namespace aria2 { + +class MockSegment:public Segment { +public: + virtual bool complete() const + { + return false; + } + + virtual size_t getIndex() const + { + return 0; + } + + virtual off_t getPosition() const + { + return 0; + } + + virtual off_t getPositionToWrite() const + { + return 0; + } + + virtual size_t getLength() const + { + return 0; + } + + virtual size_t getSegmentLength() const + { + return 0; + } + + virtual size_t getWrittenLength() const + { + return 0; + } + + virtual void updateWrittenLength(size_t bytes) {} + +#ifdef ENABLE_MESSAGE_DIGEST + + // `begin' is a offset inside this segment. + virtual bool updateHash + (uint32_t begin, const unsigned char* data, size_t dataLength) + { + return false; + } + + virtual bool isHashCalculated() const + { + return false; + } + + virtual std::string getHashString() + { + return A2STR::NIL; + } + +#endif // ENABLE_MESSAGE_DIGEST + + virtual void clear() {} + + virtual SharedHandle getPiece() const + { + return SharedHandle(); + } +}; + +} // namespace aria2 + +#endif // D_MOCK_SEGMENT_H