diff --git a/src/AbstractCommand.cc b/src/AbstractCommand.cc index 8c928274..5a15e86a 100644 --- a/src/AbstractCommand.cc +++ b/src/AbstractCommand.cc @@ -66,6 +66,7 @@ #include "uri.h" #include "FileEntry.h" #include "error_code.h" +#include "SocketRecvBuffer.h" #ifdef ENABLE_ASYNC_DNS #include "AsyncNameResolver.h" #endif // ENABLE_ASYNC_DNS @@ -75,20 +76,23 @@ namespace aria2 { -AbstractCommand::AbstractCommand(cuid_t cuid, - const SharedHandle& req, - const SharedHandle& fileEntry, - RequestGroup* requestGroup, - DownloadEngine* e, - const SocketHandle& s, - bool incNumConnection): - Command(cuid), checkPoint_(global::wallclock), - timeout_(requestGroup->getTimeout()), - requestGroup_(requestGroup), - req_(req), fileEntry_(fileEntry), e_(e), socket_(s), - checkSocketIsReadable_(false), checkSocketIsWritable_(false), - nameResolverCheck_(false), - incNumConnection_(incNumConnection) +AbstractCommand::AbstractCommand +(cuid_t cuid, + const SharedHandle& req, + const SharedHandle& fileEntry, + RequestGroup* requestGroup, + DownloadEngine* e, + const SocketHandle& s, + const SharedHandle& socketRecvBuffer, + bool incNumConnection) + : Command(cuid), checkPoint_(global::wallclock), + timeout_(requestGroup->getTimeout()), + requestGroup_(requestGroup), + req_(req), fileEntry_(fileEntry), e_(e), socket_(s), + socketRecvBuffer_(socketRecvBuffer), + checkSocketIsReadable_(false), checkSocketIsWritable_(false), + nameResolverCheck_(false), + incNumConnection_(incNumConnection) { if(socket_ && socket_->isOpen()) { setReadCheckSocket(socket_); @@ -169,7 +173,9 @@ bool AbstractCommand::execute() { } } } - if((checkSocketIsReadable_ && readEventEnabled()) || + if((checkSocketIsReadable_ && + (readEventEnabled() || + (socketRecvBuffer_ && !socketRecvBuffer_->bufferEmpty()))) || (checkSocketIsWritable_ && writeEventEnabled()) || hupEventEnabled() || #ifdef ENABLE_ASYNC_DNS @@ -846,4 +852,12 @@ const SharedHandle& AbstractCommand::getPieceStorage() const return requestGroup_->getPieceStorage(); } +void AbstractCommand::checkSocketRecvBuffer() +{ + if(!socketRecvBuffer_->bufferEmpty()) { + setStatus(Command::STATUS_ONESHOT_REALTIME); + e_->setNoWait(true); + } +} + } // namespace aria2 diff --git a/src/AbstractCommand.h b/src/AbstractCommand.h index 687738b9..d612fada 100644 --- a/src/AbstractCommand.h +++ b/src/AbstractCommand.h @@ -56,6 +56,7 @@ class DownloadEngine; class Segment; class SocketCore; class Option; +class SocketRecvBuffer; #ifdef ENABLE_ASYNC_DNS class AsyncNameResolver; #endif // ENABLE_ASYNC_DNS @@ -70,6 +71,7 @@ private: SharedHandle fileEntry_; DownloadEngine* e_; SharedHandle socket_; + SharedHandle socketRecvBuffer_; std::vector > segments_; #ifdef ENABLE_ASYNC_DNS @@ -128,6 +130,11 @@ protected: void createSocket(); + const SharedHandle& getSocketRecvBuffer() const + { + return socketRecvBuffer_; + } + const std::vector >& getSegments() const { return segments_; @@ -219,12 +226,16 @@ protected: { return checkPoint_; } + + void checkSocketRecvBuffer(); public: AbstractCommand (cuid_t cuid, const SharedHandle& req, const SharedHandle& fileEntry, RequestGroup* requestGroup, DownloadEngine* e, const SharedHandle& s = SharedHandle(), + const SharedHandle& socketRecvBuffer + = SharedHandle(), bool incNumConnection = true); virtual ~AbstractCommand(); diff --git a/src/AbstractProxyRequestCommand.cc b/src/AbstractProxyRequestCommand.cc index c43e32c8..3ee6b74b 100644 --- a/src/AbstractProxyRequestCommand.cc +++ b/src/AbstractProxyRequestCommand.cc @@ -43,6 +43,7 @@ #include "prefs.h" #include "Socket.h" #include "DownloadContext.h" +#include "SocketRecvBuffer.h" namespace aria2 { @@ -57,7 +58,9 @@ AbstractProxyRequestCommand::AbstractProxyRequestCommand : AbstractCommand(cuid, req, fileEntry, requestGroup, e, s), proxyRequest_(proxyRequest), - httpConnection_(new HttpConnection(cuid, s)) + httpConnection_ + (new HttpConnection + (cuid, s, SharedHandle(new SocketRecvBuffer(s)))) { setTimeout(getOption()->getAsInt(PREF_CONNECT_TIMEOUT)); disableReadCheckSocket(); diff --git a/src/AbstractProxyResponseCommand.cc b/src/AbstractProxyResponseCommand.cc index a0e65415..94224514 100644 --- a/src/AbstractProxyResponseCommand.cc +++ b/src/AbstractProxyResponseCommand.cc @@ -46,6 +46,7 @@ #include "message.h" #include "HttpHeader.h" #include "DownloadContext.h" +#include "SocketRecvBuffer.h" namespace aria2 { diff --git a/src/AdaptiveURISelector.cc b/src/AdaptiveURISelector.cc index efe28b80..02a598ff 100644 --- a/src/AdaptiveURISelector.cc +++ b/src/AdaptiveURISelector.cc @@ -54,6 +54,7 @@ #include "FileEntry.h" #include "uri.h" #include "fmt.h" +#include "SocketRecvBuffer.h" namespace aria2 { diff --git a/src/CreateRequestCommand.cc b/src/CreateRequestCommand.cc index d6e84f20..3130b741 100644 --- a/src/CreateRequestCommand.cc +++ b/src/CreateRequestCommand.cc @@ -45,6 +45,7 @@ #include "Option.h" #include "RequestGroupMan.h" #include "FileEntry.h" +#include "SocketRecvBuffer.h" namespace aria2 { @@ -53,7 +54,9 @@ CreateRequestCommand::CreateRequestCommand(cuid_t cuid, DownloadEngine* e): AbstractCommand (cuid, SharedHandle(), SharedHandle(), requestGroup, e, - SharedHandle(), false) + SharedHandle(), + SharedHandle(), + false) { setStatus(Command::STATUS_ONESHOT_REALTIME); disableReadCheckSocket(); diff --git a/src/DownloadCommand.cc b/src/DownloadCommand.cc index 580facdd..99a0f85d 100644 --- a/src/DownloadCommand.cc +++ b/src/DownloadCommand.cc @@ -61,6 +61,7 @@ #include "wallclock.h" #include "SinkStreamFilter.h" #include "FileEntry.h" +#include "SocketRecvBuffer.h" #ifdef ENABLE_MESSAGE_DIGEST # include "MessageDigest.h" # include "MessageDigestHelper.h" @@ -75,16 +76,18 @@ namespace { const size_t BUFSIZE = 16*1024; } // namespace -DownloadCommand::DownloadCommand(cuid_t cuid, - const SharedHandle& req, - const SharedHandle& fileEntry, - RequestGroup* requestGroup, - DownloadEngine* e, - const SocketHandle& s): - AbstractCommand(cuid, req, fileEntry, requestGroup, e, s), - startupIdleTime_(10), - lowestDownloadSpeedLimit_(0), - pieceHashValidationEnabled_(false) +DownloadCommand::DownloadCommand +(cuid_t cuid, + const SharedHandle& req, + const SharedHandle& fileEntry, + RequestGroup* requestGroup, + DownloadEngine* e, + const SocketHandle& s, + const SharedHandle& socketRecvBuffer) + : AbstractCommand(cuid, req, fileEntry, requestGroup, e, s, socketRecvBuffer), + startupIdleTime_(10), + lowestDownloadSpeedLimit_(0), + pieceHashValidationEnabled_(false) { #ifdef ENABLE_MESSAGE_DIGEST { @@ -105,6 +108,7 @@ DownloadCommand::DownloadCommand(cuid_t cuid, streamFilter_.reset(new SinkStreamFilter(pieceHashValidationEnabled_)); streamFilter_->init(); sinkFilterOnly_ = true; + checkSocketRecvBuffer(); } DownloadCommand::~DownloadCommand() { @@ -124,38 +128,53 @@ bool DownloadCommand::executeInternal() { const SharedHandle& diskAdaptor = getPieceStorage()->getDiskAdaptor(); SharedHandle segment = getSegments().front(); - size_t bufSize; - unsigned char buf[BUFSIZE]; - if(sinkFilterOnly_) { - if(segment->getLength() > 0 ) { - if(static_cast(segment->getPosition()+segment->getLength()) <= - static_cast(getFileEntry()->getLastOffset())) { - bufSize = std::min(segment->getLength()-segment->getWrittenLength(), - BUFSIZE); - } else { - bufSize = - std::min - (static_cast - (getFileEntry()->getLastOffset()-segment->getPositionToWrite()), - BUFSIZE); - } - } else { - bufSize = BUFSIZE; - } - getSocket()->readData(buf, bufSize); - streamFilter_->transform(diskAdaptor, segment, buf, bufSize); - } else { - // It is possible that segment is completed but we have some bytes - // of stream to read. For example, chunked encoding has "0"+CRLF - // after data. After we read data(at this moment segment is - // completed), we need another 3bytes(or more if it has trailers). - bufSize = BUFSIZE; - getSocket()->peekData(buf, bufSize); - streamFilter_->transform(diskAdaptor, segment, buf, bufSize); - bufSize = streamFilter_->getBytesProcessed(); - getSocket()->readData(buf, bufSize); + bool eof = false; + if(getSocketRecvBuffer()->bufferEmpty()) { + // Only read from socket when buffer is empty. Imagine that When + // segment length is *short* and we are using HTTP pilelining. We + // issued 2 requests in pipeline. When reading first response + // header, we may read its response body and 2nd response header + // and 2nd response body in buffer if they are small enough to fit + // in buffer. And then server may sends EOF. In this case, we + // read data from socket here, we will get EOF and leaves 2nd + // response unprocessed. To prevent this, we don't read from + // socket when buffer is not empty. + eof = getSocketRecvBuffer()->recv() == 0 && + !getSocket()->wantRead() && !getSocket()->wantWrite(); + } + if(!eof) { + size_t bufSize; + if(sinkFilterOnly_) { + if(segment->getLength() > 0) { + if(static_cast(segment->getPosition()+segment->getLength()) <= + static_cast(getFileEntry()->getLastOffset())) { + bufSize = std::min(segment->getLength()-segment->getWrittenLength(), + getSocketRecvBuffer()->getBufferLength()); + } else { + bufSize = + std::min + (static_cast + (getFileEntry()->getLastOffset()-segment->getPositionToWrite()), + getSocketRecvBuffer()->getBufferLength()); + } + } else { + bufSize = getSocketRecvBuffer()->getBufferLength(); + } + streamFilter_->transform(diskAdaptor, segment, + getSocketRecvBuffer()->getBuffer(), bufSize); + } else { + // It is possible that segment is completed but we have some bytes + // of stream to read. For example, chunked encoding has "0"+CRLF + // after data. After we read data(at this moment segment is + // completed), we need another 3bytes(or more if it has trailers). + streamFilter_->transform(diskAdaptor, segment, + getSocketRecvBuffer()->getBuffer(), + getSocketRecvBuffer()->getBufferLength()); + bufSize = streamFilter_->getBytesProcessed(); + } + getSocketRecvBuffer()->shiftBuffer(bufSize); + peerStat_->updateDownloadLength(bufSize); } - peerStat_->updateDownloadLength(bufSize); getSegmentMan()->updateDownloadSpeedFor(peerStat_); bool segmentPartComplete = false; // Note that GrowSegment::complete() always returns false. @@ -163,8 +182,7 @@ bool DownloadCommand::executeInternal() { if(segment->complete() || segment->getPositionToWrite() == getFileEntry()->getLastOffset()) { segmentPartComplete = true; - } else if(segment->getLength() == 0 && bufSize == 0 && - !getSocket()->wantRead() && !getSocket()->wantWrite()) { + } else if(segment->getLength() == 0 && eof) { segmentPartComplete = true; } } else { @@ -187,8 +205,7 @@ bool DownloadCommand::executeInternal() { } } - if(!segmentPartComplete && bufSize == 0 && - !getSocket()->wantRead() && !getSocket()->wantWrite()) { + if(!segmentPartComplete && eof) { throw DL_RETRY_EX(EX_GOT_EOF); } @@ -245,6 +262,7 @@ bool DownloadCommand::executeInternal() { } else { checkLowestDownloadSpeed(); setWriteCheckSocketIf(getSocket(), getSocket()->wantWrite()); + checkSocketRecvBuffer(); getDownloadEngine()->addCommand(this); return false; } @@ -288,8 +306,6 @@ bool DownloadCommand::prepareForNextSegment() { getDownloadEngine()->getCheckIntegrityMan()->pushEntry(entry); } } - // Following 2lines are needed for DownloadEngine to detect - // completed RequestGroups without 1sec delay. getDownloadEngine()->setNoWait(true); getDownloadEngine()->setRefreshInterval(0); #endif // ENABLE_MESSAGE_DIGEST @@ -319,6 +335,7 @@ bool DownloadCommand::prepareForNextSegment() { // nextSegment->getWrittenLength() corrupts file. return prepareForRetry(0); } else { + checkSocketRecvBuffer(); getDownloadEngine()->addCommand(this); return false; } diff --git a/src/DownloadCommand.h b/src/DownloadCommand.h index 5210d5ae..85dcc8f5 100644 --- a/src/DownloadCommand.h +++ b/src/DownloadCommand.h @@ -83,7 +83,8 @@ public: const SharedHandle& fileEntry, RequestGroup* requestGroup, DownloadEngine* e, - const SharedHandle& s); + const SharedHandle& s, + const SharedHandle& socketRecvBuffer); virtual ~DownloadCommand(); const SharedHandle& getStreamFilter() const diff --git a/src/FtpDownloadCommand.cc b/src/FtpDownloadCommand.cc index 8abd826b..fc55391f 100644 --- a/src/FtpDownloadCommand.cc +++ b/src/FtpDownloadCommand.cc @@ -44,6 +44,7 @@ #include "FtpConnection.h" #include "Logger.h" #include "FileEntry.h" +#include "SocketRecvBuffer.h" namespace aria2 { @@ -56,13 +57,14 @@ FtpDownloadCommand::FtpDownloadCommand DownloadEngine* e, const SocketHandle& dataSocket, const SocketHandle& ctrlSocket) - :DownloadCommand(cuid, req, fileEntry, requestGroup, e, dataSocket), + :DownloadCommand(cuid, req, fileEntry, requestGroup, e, dataSocket, + SharedHandle + (new SocketRecvBuffer(dataSocket))), ftpConnection_(ftpConnection), ctrlSocket_(ctrlSocket) {} FtpDownloadCommand::~FtpDownloadCommand() {} - bool FtpDownloadCommand::prepareForNextSegment() { if(getOption()->getAsBool(PREF_FTP_REUSE_CONNECTION) && diff --git a/src/FtpFinishDownloadCommand.cc b/src/FtpFinishDownloadCommand.cc index ce63a73f..af7f5ad2 100644 --- a/src/FtpFinishDownloadCommand.cc +++ b/src/FtpFinishDownloadCommand.cc @@ -50,6 +50,7 @@ #include "LogFactory.h" #include "util.h" #include "wallclock.h" +#include "SocketRecvBuffer.h" namespace aria2 { diff --git a/src/FtpInitiateConnectionCommand.cc b/src/FtpInitiateConnectionCommand.cc index b47bb534..0a5951da 100644 --- a/src/FtpInitiateConnectionCommand.cc +++ b/src/FtpInitiateConnectionCommand.cc @@ -55,6 +55,7 @@ #include "AuthConfigFactory.h" #include "AuthConfig.h" #include "fmt.h" +#include "SocketRecvBuffer.h" namespace aria2 { @@ -99,8 +100,10 @@ Command* FtpInitiateConnectionCommand::createNextCommand if(proxyMethod == V_GET) { // Use GET for FTP via HTTP proxy. getRequest()->setMethod(Request::METHOD_GET); + SharedHandle socketRecvBuffer + (new SocketRecvBuffer(getSocket())); SharedHandle hc - (new HttpConnection(getCuid(), getSocket())); + (new HttpConnection(getCuid(), getSocket(), socketRecvBuffer)); HttpRequestCommand* c = new HttpRequestCommand(getCuid(), getRequest(), getFileEntry(), @@ -130,8 +133,10 @@ Command* FtpInitiateConnectionCommand::createNextCommand } else if(proxyMethod == V_GET) { // Use GET for FTP via HTTP proxy. getRequest()->setMethod(Request::METHOD_GET); + SharedHandle socketRecvBuffer + (new SocketRecvBuffer(pooledSocket)); SharedHandle hc - (new HttpConnection(getCuid(), pooledSocket)); + (new HttpConnection(getCuid(), pooledSocket, socketRecvBuffer)); HttpRequestCommand* c = new HttpRequestCommand(getCuid(), getRequest(), getFileEntry(), diff --git a/src/FtpNegotiationCommand.cc b/src/FtpNegotiationCommand.cc index 7ad09340..2a99f0ac 100644 --- a/src/FtpNegotiationCommand.cc +++ b/src/FtpNegotiationCommand.cc @@ -73,6 +73,7 @@ #include "DlRetryEx.h" #include "CheckIntegrityEntry.h" #include "error_code.h" +#include "SocketRecvBuffer.h" namespace aria2 { @@ -677,7 +678,9 @@ bool FtpNegotiationCommand::resolveProxy() dataSocket_->establishConnection(proxyAddr_, proxyReq->getPort()); disableReadCheckSocket(); setWriteCheckSocket(dataSocket_); - http_.reset(new HttpConnection(getCuid(), dataSocket_)); + SharedHandle socketRecvBuffer + (new SocketRecvBuffer(dataSocket_)); + http_.reset(new HttpConnection(getCuid(), dataSocket_, socketRecvBuffer)); sequence_ = SEQ_SEND_TUNNEL_REQUEST; return false; } diff --git a/src/FtpTunnelRequestCommand.cc b/src/FtpTunnelRequestCommand.cc index c6ff69fe..8ea81617 100644 --- a/src/FtpTunnelRequestCommand.cc +++ b/src/FtpTunnelRequestCommand.cc @@ -37,6 +37,7 @@ #include "Request.h" #include "Socket.h" #include "DownloadContext.h" +#include "SocketRecvBuffer.h" namespace aria2 { diff --git a/src/FtpTunnelResponseCommand.cc b/src/FtpTunnelResponseCommand.cc index 354a6a7f..a17eeda3 100644 --- a/src/FtpTunnelResponseCommand.cc +++ b/src/FtpTunnelResponseCommand.cc @@ -39,6 +39,7 @@ #include "HttpRequest.h" #include "Segment.h" #include "Socket.h" +#include "SocketRecvBuffer.h" namespace aria2 { diff --git a/src/HttpConnection.cc b/src/HttpConnection.cc index effb4d27..9433d3bd 100644 --- a/src/HttpConnection.cc +++ b/src/HttpConnection.cc @@ -56,6 +56,7 @@ #include "AuthConfig.h" #include "a2functional.h" #include "fmt.h" +#include "SocketRecvBuffer.h" namespace aria2 { @@ -67,9 +68,13 @@ HttpRequestEntry::HttpRequestEntry HttpRequestEntry::~HttpRequestEntry() {} -HttpConnection::HttpConnection(cuid_t cuid, const SocketHandle& socket) +HttpConnection::HttpConnection +(cuid_t cuid, + const SocketHandle& socket, + const SharedHandle& socketRecvBuffer) : cuid_(cuid), socket_(socket), + socketRecvBuffer_(socketRecvBuffer), socketBuffer_(socket) {} @@ -126,36 +131,34 @@ SharedHandle HttpConnection::receiveResponse() } HttpRequestEntryHandle entry = outstandingHttpRequests_.front(); HttpHeaderProcessorHandle proc = entry->getHttpHeaderProcessor(); - - unsigned char buf[512]; - size_t size = sizeof(buf); - socket_->peekData(buf, size); - if(size == 0) { - if(socket_->wantRead() || socket_->wantWrite()) { - return SharedHandle(); - } else { + if(socketRecvBuffer_->bufferEmpty()) { + if(socketRecvBuffer_->recv() == 0 && + !socket_->wantRead() && !socket_->wantWrite()) { throw DL_RETRY_EX(EX_GOT_EOF); } } - proc->update(buf, size); - if(!proc->eoh()) { - socket_->readData(buf, size); - return SharedHandle(); + proc->update(socketRecvBuffer_->getBuffer(), + socketRecvBuffer_->getBufferLength()); + SharedHandle httpResponse; + size_t shiftBufferLength; + if(proc->eoh()) { + SharedHandle httpHeader = proc->getHttpResponseHeader(); + size_t putbackDataLength = proc->getPutBackDataLength(); + A2_LOG_INFO(fmt(MSG_RECEIVE_RESPONSE, + cuid_, + proc->getHeaderString().c_str())); + assert(socketRecvBuffer_->getBufferLength() >= putbackDataLength); + shiftBufferLength = + socketRecvBuffer_->getBufferLength()-putbackDataLength; + httpResponse.reset(new HttpResponse()); + httpResponse->setCuid(cuid_); + httpResponse->setHttpHeader(httpHeader); + httpResponse->setHttpRequest(entry->getHttpRequest()); + outstandingHttpRequests_.pop_front(); + } else { + shiftBufferLength = socketRecvBuffer_->getBufferLength(); } - size_t putbackDataLength = proc->getPutBackDataLength(); - size -= putbackDataLength; - socket_->readData(buf, size); - A2_LOG_INFO(fmt(MSG_RECEIVE_RESPONSE, - cuid_, - proc->getHeaderString().c_str())); - SharedHandle httpHeader = proc->getHttpResponseHeader(); - SharedHandle httpResponse(new HttpResponse()); - httpResponse->setCuid(cuid_); - httpResponse->setHttpHeader(httpHeader); - httpResponse->setHttpRequest(entry->getHttpRequest()); - - outstandingHttpRequests_.pop_front(); - + socketRecvBuffer_->shiftBuffer(shiftBufferLength); return httpResponse; } diff --git a/src/HttpConnection.h b/src/HttpConnection.h index 9c7facea..4467a4d2 100644 --- a/src/HttpConnection.h +++ b/src/HttpConnection.h @@ -52,6 +52,7 @@ class HttpHeaderProcessor; class Option; class Segment; class SocketCore; +class SocketRecvBuffer; class HttpRequestEntry { private: @@ -80,6 +81,7 @@ class HttpConnection { private: cuid_t cuid_; SharedHandle socket_; + SharedHandle socketRecvBuffer_; SocketBuffer socketBuffer_; const Option* option_; @@ -87,7 +89,10 @@ private: std::string eraseConfidentialInfo(const std::string& request); public: - HttpConnection(cuid_t cuid, const SharedHandle& socket); + HttpConnection + (cuid_t cuid, + const SharedHandle& socket, + const SharedHandle& socketRecvBuffer); ~HttpConnection(); /** @@ -124,6 +129,11 @@ public: bool sendBufferIsEmpty() const; void sendPendingData(); + + const SharedHandle& getSocketRecvBuffer() const + { + return socketRecvBuffer_; + } }; typedef SharedHandle HttpConnectionHandle; diff --git a/src/HttpDownloadCommand.cc b/src/HttpDownloadCommand.cc index 7503ee16..66fcc069 100644 --- a/src/HttpDownloadCommand.cc +++ b/src/HttpDownloadCommand.cc @@ -51,6 +51,7 @@ #include "StreamFilter.h" #include "SinkStreamFilter.h" #include "util.h" +#include "SocketRecvBuffer.h" namespace aria2 { @@ -63,9 +64,11 @@ HttpDownloadCommand::HttpDownloadCommand const HttpConnectionHandle& httpConnection, DownloadEngine* e, const SocketHandle& socket) - :DownloadCommand(cuid, req, fileEntry, requestGroup, e, socket), - httpResponse_(httpResponse), - httpConnection_(httpConnection) {} + : DownloadCommand(cuid, req, fileEntry, requestGroup, e, socket, + httpConnection->getSocketRecvBuffer()), + httpResponse_(httpResponse), + httpConnection_(httpConnection) +{} HttpDownloadCommand::~HttpDownloadCommand() {} diff --git a/src/HttpInitiateConnectionCommand.cc b/src/HttpInitiateConnectionCommand.cc index b86493f9..5b7f1ced 100644 --- a/src/HttpInitiateConnectionCommand.cc +++ b/src/HttpInitiateConnectionCommand.cc @@ -50,6 +50,7 @@ #include "A2STR.h" #include "util.h" #include "fmt.h" +#include "SocketRecvBuffer.h" namespace aria2 { @@ -94,8 +95,10 @@ Command* HttpInitiateConnectionCommand::createNextCommand getSocket()); command = c; } else if(proxyMethod == V_GET) { + SharedHandle socketRecvBuffer + (new SocketRecvBuffer(getSocket())); SharedHandle httpConnection - (new HttpConnection(getCuid(), getSocket())); + (new HttpConnection(getCuid(), getSocket(), socketRecvBuffer)); HttpRequestCommand* c = new HttpRequestCommand(getCuid(), getRequest(), getFileEntry(), @@ -111,8 +114,10 @@ Command* HttpInitiateConnectionCommand::createNextCommand } } else { setConnectedAddrInfo(getRequest(), hostname, pooledSocket); + SharedHandle socketRecvBuffer + (new SocketRecvBuffer(pooledSocket)); SharedHandle httpConnection - (new HttpConnection(getCuid(), pooledSocket)); + (new HttpConnection(getCuid(), pooledSocket, socketRecvBuffer)); HttpRequestCommand* c = new HttpRequestCommand(getCuid(), getRequest(), getFileEntry(), @@ -139,8 +144,10 @@ Command* HttpInitiateConnectionCommand::createNextCommand setSocket(pooledSocket); setConnectedAddrInfo(getRequest(), hostname, pooledSocket); } + SharedHandle socketRecvBuffer + (new SocketRecvBuffer(getSocket())); SharedHandle httpConnection - (new HttpConnection(getCuid(), getSocket())); + (new HttpConnection(getCuid(), getSocket(), socketRecvBuffer)); HttpRequestCommand* c = new HttpRequestCommand(getCuid(), getRequest(), getFileEntry(), getRequestGroup(), diff --git a/src/HttpProxyRequestCommand.cc b/src/HttpProxyRequestCommand.cc index a315e079..5764f42b 100644 --- a/src/HttpProxyRequestCommand.cc +++ b/src/HttpProxyRequestCommand.cc @@ -36,6 +36,7 @@ #include "HttpProxyResponseCommand.h" #include "Request.h" #include "Socket.h" +#include "SocketRecvBuffer.h" namespace aria2 { diff --git a/src/HttpProxyResponseCommand.cc b/src/HttpProxyResponseCommand.cc index b41952ea..d3e819a8 100644 --- a/src/HttpProxyResponseCommand.cc +++ b/src/HttpProxyResponseCommand.cc @@ -39,6 +39,7 @@ #include "HttpRequest.h" #include "Segment.h" #include "Socket.h" +#include "SocketRecvBuffer.h" namespace aria2 { diff --git a/src/HttpRequestCommand.cc b/src/HttpRequestCommand.cc index a16e7691..b9146760 100644 --- a/src/HttpRequestCommand.cc +++ b/src/HttpRequestCommand.cc @@ -58,6 +58,7 @@ #include "Logger.h" #include "LogFactory.h" #include "fmt.h" +#include "SocketRecvBuffer.h" namespace aria2 { @@ -69,7 +70,8 @@ HttpRequestCommand::HttpRequestCommand const HttpConnectionHandle& httpConnection, DownloadEngine* e, const SocketHandle& s) - : AbstractCommand(cuid, req, fileEntry, requestGroup, e, s), + : AbstractCommand(cuid, req, fileEntry, requestGroup, e, s, + httpConnection->getSocketRecvBuffer()), httpConnection_(httpConnection) { setTimeout(getOption()->getAsInt(PREF_CONNECT_TIMEOUT)); diff --git a/src/HttpResponseCommand.cc b/src/HttpResponseCommand.cc index 47cab4b1..f5072dc6 100644 --- a/src/HttpResponseCommand.cc +++ b/src/HttpResponseCommand.cc @@ -71,6 +71,7 @@ #include "SinkStreamFilter.h" #include "ChunkedDecodingStreamFilter.h" #include "uri.h" +#include "SocketRecvBuffer.h" #ifdef HAVE_LIBZ # include "GZipDecodingStreamFilter.h" #endif // HAVE_LIBZ @@ -134,9 +135,12 @@ HttpResponseCommand::HttpResponseCommand const HttpConnectionHandle& httpConnection, DownloadEngine* e, const SocketHandle& s) - : AbstractCommand(cuid, req, fileEntry, requestGroup, e, s), + : AbstractCommand(cuid, req, fileEntry, requestGroup, e, s, + httpConnection->getSocketRecvBuffer()), httpConnection_(httpConnection) -{} +{ + checkSocketRecvBuffer(); +} HttpResponseCommand::~HttpResponseCommand() {} diff --git a/src/HttpSkipResponseCommand.cc b/src/HttpSkipResponseCommand.cc index 05002fad..3dacab79 100644 --- a/src/HttpSkipResponseCommand.cc +++ b/src/HttpSkipResponseCommand.cc @@ -59,6 +59,7 @@ #include "NullSinkStreamFilter.h" #include "SinkStreamFilter.h" #include "error_code.h" +#include "SocketRecvBuffer.h" namespace aria2 { @@ -71,14 +72,17 @@ HttpSkipResponseCommand::HttpSkipResponseCommand const SharedHandle& httpResponse, DownloadEngine* e, const SharedHandle& s) - : AbstractCommand(cuid, req, fileEntry, requestGroup, e, s), + : AbstractCommand(cuid, req, fileEntry, requestGroup, e, s, + httpConnection->getSocketRecvBuffer()), httpConnection_(httpConnection), httpResponse_(httpResponse), streamFilter_(new NullSinkStreamFilter()), sinkFilterOnly_(true), totalLength_(httpResponse_->getEntityLength()), receivedBytes_(0) -{} +{ + checkSocketRecvBuffer(); +} HttpSkipResponseCommand::~HttpSkipResponseCommand() {} @@ -108,30 +112,34 @@ bool HttpSkipResponseCommand::executeInternal() } return processResponse(); } - const size_t BUFSIZE = 16*1024; - unsigned char buf[BUFSIZE]; - size_t bufSize; - if(sinkFilterOnly_ && totalLength_ > 0) { - bufSize = totalLength_-receivedBytes_; - } else { - bufSize = BUFSIZE; - } + bool eof = false; try { - if(sinkFilterOnly_) { - getSocket()->readData(buf, bufSize); - receivedBytes_ += bufSize; - } else { - getSocket()->peekData(buf, bufSize); - // receivedBytes_ is not updated if transferEncoding is set. - // The return value is safely ignored here. - streamFilter_->transform(SharedHandle(), - SharedHandle(), - buf, bufSize); - bufSize = streamFilter_->getBytesProcessed(); - getSocket()->readData(buf, bufSize); + size_t bufSize; + if(getSocketRecvBuffer()->bufferEmpty()) { + eof = getSocketRecvBuffer()->recv() == 0 && + !getSocket()->wantRead() && !getSocket()->wantWrite(); } - if(totalLength_ != 0 && bufSize == 0 && - !getSocket()->wantRead() && !getSocket()->wantWrite()) { + if(!eof) { + if(sinkFilterOnly_) { + if(totalLength_ > 0) { + bufSize = std::min(totalLength_-receivedBytes_, + getSocketRecvBuffer()->getBufferLength()); + } else { + bufSize = getSocketRecvBuffer()->getBufferLength(); + } + receivedBytes_ += bufSize; + } else { + // receivedBytes_ is not updated if transferEncoding is set. + // The return value is safely ignored here. + streamFilter_->transform(SharedHandle(), + SharedHandle(), + getSocketRecvBuffer()->getBuffer(), + getSocketRecvBuffer()->getBufferLength()); + bufSize = streamFilter_->getBytesProcessed(); + } + getSocketRecvBuffer()->shiftBuffer(bufSize); + } + if(totalLength_ != 0 && eof) { throw DL_RETRY_EX(EX_GOT_EOF); } } catch(RecoverableException& e) { @@ -141,10 +149,8 @@ bool HttpSkipResponseCommand::executeInternal() bool finished = false; if(sinkFilterOnly_) { - if(bufSize == 0) { - if(!getSocket()->wantRead() && !getSocket()->wantWrite()) { - return processResponse(); - } + if(eof) { + return processResponse(); } else { finished = (totalLength_ == receivedBytes_); } diff --git a/src/HttpSkipResponseCommand.h b/src/HttpSkipResponseCommand.h index df44c364..4a04b43c 100644 --- a/src/HttpSkipResponseCommand.h +++ b/src/HttpSkipResponseCommand.h @@ -63,14 +63,15 @@ private: protected: virtual bool executeInternal(); public: - HttpSkipResponseCommand(cuid_t cuid, - const SharedHandle& req, - const SharedHandle& fileEntry, - RequestGroup* requestGroup, - const SharedHandle& httpConnection, - const SharedHandle& httpResponse, - DownloadEngine* e, - const SharedHandle& s); + HttpSkipResponseCommand + (cuid_t cuid, + const SharedHandle& req, + const SharedHandle& fileEntry, + RequestGroup* requestGroup, + const SharedHandle& httpConnection, + const SharedHandle& httpResponse, + DownloadEngine* e, + const SharedHandle& s); virtual ~HttpSkipResponseCommand(); diff --git a/src/InitiateConnectionCommand.cc b/src/InitiateConnectionCommand.cc index 2ac9dd4a..baee1f74 100644 --- a/src/InitiateConnectionCommand.cc +++ b/src/InitiateConnectionCommand.cc @@ -50,6 +50,7 @@ #include "util.h" #include "RecoverableException.h" #include "fmt.h" +#include "SocketRecvBuffer.h" namespace aria2 { diff --git a/src/InitiateConnectionCommandFactory.cc b/src/InitiateConnectionCommandFactory.cc index 50541420..8b310a3e 100644 --- a/src/InitiateConnectionCommandFactory.cc +++ b/src/InitiateConnectionCommandFactory.cc @@ -43,6 +43,7 @@ #include "Option.h" #include "prefs.h" #include "SocketCore.h" +#include "SocketRecvBuffer.h" namespace aria2 { diff --git a/src/Makefile.am b/src/Makefile.am index b2954835..fe943f3e 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -182,6 +182,7 @@ SRCS = Socket.h\ NsCookieParser.cc NsCookieParser.h\ CookieStorage.cc CookieStorage.h\ SocketBuffer.cc SocketBuffer.h\ + SocketRecvBuffer.cc SocketRecvBuffer.h\ OptionHandlerException.cc OptionHandlerException.h\ URIResult.cc URIResult.h\ EventPoll.h\ diff --git a/src/RequestGroup.cc b/src/RequestGroup.cc index d1cedc1a..a42f76ed 100644 --- a/src/RequestGroup.cc +++ b/src/RequestGroup.cc @@ -77,6 +77,7 @@ #include "SocketCore.h" #include "SimpleRandomizer.h" #include "Segment.h" +#include "SocketRecvBuffer.h" #ifdef ENABLE_MESSAGE_DIGEST # include "CheckIntegrityCommand.h" # include "ChecksumCheckIntegrityEntry.h" diff --git a/src/SocketRecvBuffer.cc b/src/SocketRecvBuffer.cc new file mode 100644 index 00000000..2c5a1cdf --- /dev/null +++ b/src/SocketRecvBuffer.cc @@ -0,0 +1,77 @@ +/* */ +#include "SocketRecvBuffer.h" + +#include + +#include "SocketCore.h" +#include "LogFactory.h" + +namespace aria2 { + +SocketRecvBuffer::SocketRecvBuffer +(const SharedHandle& socket, + size_t capacity) + : socket_(socket), + capacity_(capacity), + buf_(new unsigned char[capacity_]), + bufLen_(0) +{} + +SocketRecvBuffer::~SocketRecvBuffer() +{ + delete [] buf_; +} + +ssize_t SocketRecvBuffer::recv() +{ + size_t len = capacity_-bufLen_; + if(len > 0) { + socket_->readData(buf_+bufLen_, len); + bufLen_ += len; + } else { + A2_LOG_DEBUG("Buffer full"); + } + return len; +} + +void SocketRecvBuffer::shiftBuffer(size_t offset) +{ + assert(offset <= bufLen_); + memmove(buf_, buf_+offset, bufLen_-offset); + bufLen_ -= offset; +} + +} // namespace aria2 diff --git a/src/SocketRecvBuffer.h b/src/SocketRecvBuffer.h new file mode 100644 index 00000000..0a562edc --- /dev/null +++ b/src/SocketRecvBuffer.h @@ -0,0 +1,86 @@ +/* */ +#ifndef D_SOCKET_RECV_BUFFER_H +#define D_SOCKET_RECV_BUFFER_H + +#include "common.h" +#include "SharedHandle.h" + +namespace aria2 { + +class SocketCore; + +class SocketRecvBuffer { +public: + SocketRecvBuffer + (const SharedHandle& socket, + size_t capacity = 16*1024); + ~SocketRecvBuffer(); + // Reads data from socket as much as capacity allows. Returns the + // number of bytes read. + ssize_t recv(); + // Shifts buffer by offset bytes. offset must satisfy offset <= + // getBufferLength(). + void shiftBuffer(size_t offset); + + const SharedHandle& getSocket() const + { + return socket_; + } + + const unsigned char* getBuffer() const + { + return buf_; + } + + size_t getBufferLength() const + { + return bufLen_; + } + + bool bufferEmpty() const + { + return bufLen_ == 0; + } +private: + SharedHandle socket_; + size_t capacity_; + unsigned char* buf_; + size_t bufLen_; +}; + +} // namespace aria2 + +#endif // D_SOCKET_RECV_BUFFER_H