Eliminated SocketCore::peekData() form HTTP/FTP downloads.

We introduced SocketRecvBuffer which buffers received bytes. Since
HTTP response header and response body are divided with \r\n, we have
to buffer up several bytes to find this delimiter. We use
SocketRecvBuffer to hold these bytes and only consumes header and
passes SocketRecvBuffer, which may contain head of response body, to
next Command.  Since FTPConnection doesn't use SocketCore::peekData(),
we left it as is.
pull/1/head
Tatsuhiro Tsujikawa 2011-01-16 16:55:41 +09:00
parent 602a9d6ba3
commit ea1b4b3ee5
30 changed files with 412 additions and 143 deletions

View File

@ -66,6 +66,7 @@
#include "uri.h"
#include "FileEntry.h"
#include "error_code.h"
#include "SocketRecvBuffer.h"
#ifdef ENABLE_ASYNC_DNS
#include "AsyncNameResolver.h"
#endif // ENABLE_ASYNC_DNS
@ -75,17 +76,20 @@
namespace aria2 {
AbstractCommand::AbstractCommand(cuid_t cuid,
AbstractCommand::AbstractCommand
(cuid_t cuid,
const SharedHandle<Request>& req,
const SharedHandle<FileEntry>& fileEntry,
RequestGroup* requestGroup,
DownloadEngine* e,
const SocketHandle& s,
bool incNumConnection):
Command(cuid), checkPoint_(global::wallclock),
const SharedHandle<SocketRecvBuffer>& socketRecvBuffer,
bool incNumConnection)
: Command(cuid), checkPoint_(global::wallclock),
timeout_(requestGroup->getTimeout()),
requestGroup_(requestGroup),
req_(req), fileEntry_(fileEntry), e_(e), socket_(s),
socketRecvBuffer_(socketRecvBuffer),
checkSocketIsReadable_(false), checkSocketIsWritable_(false),
nameResolverCheck_(false),
incNumConnection_(incNumConnection)
@ -169,7 +173,9 @@ bool AbstractCommand::execute() {
}
}
}
if((checkSocketIsReadable_ && readEventEnabled()) ||
if((checkSocketIsReadable_ &&
(readEventEnabled() ||
(socketRecvBuffer_ && !socketRecvBuffer_->bufferEmpty()))) ||
(checkSocketIsWritable_ && writeEventEnabled()) ||
hupEventEnabled() ||
#ifdef ENABLE_ASYNC_DNS
@ -846,4 +852,12 @@ const SharedHandle<PieceStorage>& AbstractCommand::getPieceStorage() const
return requestGroup_->getPieceStorage();
}
void AbstractCommand::checkSocketRecvBuffer()
{
if(!socketRecvBuffer_->bufferEmpty()) {
setStatus(Command::STATUS_ONESHOT_REALTIME);
e_->setNoWait(true);
}
}
} // namespace aria2

View File

@ -56,6 +56,7 @@ class DownloadEngine;
class Segment;
class SocketCore;
class Option;
class SocketRecvBuffer;
#ifdef ENABLE_ASYNC_DNS
class AsyncNameResolver;
#endif // ENABLE_ASYNC_DNS
@ -70,6 +71,7 @@ private:
SharedHandle<FileEntry> fileEntry_;
DownloadEngine* e_;
SharedHandle<SocketCore> socket_;
SharedHandle<SocketRecvBuffer> socketRecvBuffer_;
std::vector<SharedHandle<Segment> > segments_;
#ifdef ENABLE_ASYNC_DNS
@ -128,6 +130,11 @@ protected:
void createSocket();
const SharedHandle<SocketRecvBuffer>& getSocketRecvBuffer() const
{
return socketRecvBuffer_;
}
const std::vector<SharedHandle<Segment> >& getSegments() const
{
return segments_;
@ -219,12 +226,16 @@ protected:
{
return checkPoint_;
}
void checkSocketRecvBuffer();
public:
AbstractCommand
(cuid_t cuid, const SharedHandle<Request>& req,
const SharedHandle<FileEntry>& fileEntry,
RequestGroup* requestGroup, DownloadEngine* e,
const SharedHandle<SocketCore>& s = SharedHandle<SocketCore>(),
const SharedHandle<SocketRecvBuffer>& socketRecvBuffer
= SharedHandle<SocketRecvBuffer>(),
bool incNumConnection = true);
virtual ~AbstractCommand();

View File

@ -43,6 +43,7 @@
#include "prefs.h"
#include "Socket.h"
#include "DownloadContext.h"
#include "SocketRecvBuffer.h"
namespace aria2 {
@ -57,7 +58,9 @@ AbstractProxyRequestCommand::AbstractProxyRequestCommand
:
AbstractCommand(cuid, req, fileEntry, requestGroup, e, s),
proxyRequest_(proxyRequest),
httpConnection_(new HttpConnection(cuid, s))
httpConnection_
(new HttpConnection
(cuid, s, SharedHandle<SocketRecvBuffer>(new SocketRecvBuffer(s))))
{
setTimeout(getOption()->getAsInt(PREF_CONNECT_TIMEOUT));
disableReadCheckSocket();

View File

@ -46,6 +46,7 @@
#include "message.h"
#include "HttpHeader.h"
#include "DownloadContext.h"
#include "SocketRecvBuffer.h"
namespace aria2 {

View File

@ -54,6 +54,7 @@
#include "FileEntry.h"
#include "uri.h"
#include "fmt.h"
#include "SocketRecvBuffer.h"
namespace aria2 {

View File

@ -45,6 +45,7 @@
#include "Option.h"
#include "RequestGroupMan.h"
#include "FileEntry.h"
#include "SocketRecvBuffer.h"
namespace aria2 {
@ -53,7 +54,9 @@ CreateRequestCommand::CreateRequestCommand(cuid_t cuid,
DownloadEngine* e):
AbstractCommand
(cuid, SharedHandle<Request>(), SharedHandle<FileEntry>(), requestGroup, e,
SharedHandle<SocketCore>(), false)
SharedHandle<SocketCore>(),
SharedHandle<SocketRecvBuffer>(),
false)
{
setStatus(Command::STATUS_ONESHOT_REALTIME);
disableReadCheckSocket();

View File

@ -61,6 +61,7 @@
#include "wallclock.h"
#include "SinkStreamFilter.h"
#include "FileEntry.h"
#include "SocketRecvBuffer.h"
#ifdef ENABLE_MESSAGE_DIGEST
# include "MessageDigest.h"
# include "MessageDigestHelper.h"
@ -75,13 +76,15 @@ namespace {
const size_t BUFSIZE = 16*1024;
} // namespace
DownloadCommand::DownloadCommand(cuid_t cuid,
DownloadCommand::DownloadCommand
(cuid_t cuid,
const SharedHandle<Request>& req,
const SharedHandle<FileEntry>& fileEntry,
RequestGroup* requestGroup,
DownloadEngine* e,
const SocketHandle& s):
AbstractCommand(cuid, req, fileEntry, requestGroup, e, s),
const SocketHandle& s,
const SharedHandle<SocketRecvBuffer>& socketRecvBuffer)
: AbstractCommand(cuid, req, fileEntry, requestGroup, e, s, socketRecvBuffer),
startupIdleTime_(10),
lowestDownloadSpeedLimit_(0),
pieceHashValidationEnabled_(false)
@ -105,6 +108,7 @@ DownloadCommand::DownloadCommand(cuid_t cuid,
streamFilter_.reset(new SinkStreamFilter(pieceHashValidationEnabled_));
streamFilter_->init();
sinkFilterOnly_ = true;
checkSocketRecvBuffer();
}
DownloadCommand::~DownloadCommand() {
@ -124,38 +128,53 @@ bool DownloadCommand::executeInternal() {
const SharedHandle<DiskAdaptor>& diskAdaptor =
getPieceStorage()->getDiskAdaptor();
SharedHandle<Segment> segment = getSegments().front();
bool eof = false;
if(getSocketRecvBuffer()->bufferEmpty()) {
// Only read from socket when buffer is empty. Imagine that When
// segment length is *short* and we are using HTTP pilelining. We
// issued 2 requests in pipeline. When reading first response
// header, we may read its response body and 2nd response header
// and 2nd response body in buffer if they are small enough to fit
// in buffer. And then server may sends EOF. In this case, we
// read data from socket here, we will get EOF and leaves 2nd
// response unprocessed. To prevent this, we don't read from
// socket when buffer is not empty.
eof = getSocketRecvBuffer()->recv() == 0 &&
!getSocket()->wantRead() && !getSocket()->wantWrite();
}
if(!eof) {
size_t bufSize;
unsigned char buf[BUFSIZE];
if(sinkFilterOnly_) {
if(segment->getLength() > 0 ) {
if(segment->getLength() > 0) {
if(static_cast<uint64_t>(segment->getPosition()+segment->getLength()) <=
static_cast<uint64_t>(getFileEntry()->getLastOffset())) {
bufSize = std::min(segment->getLength()-segment->getWrittenLength(),
BUFSIZE);
getSocketRecvBuffer()->getBufferLength());
} else {
bufSize =
std::min
(static_cast<size_t>
(getFileEntry()->getLastOffset()-segment->getPositionToWrite()),
BUFSIZE);
getSocketRecvBuffer()->getBufferLength());
}
} else {
bufSize = BUFSIZE;
bufSize = getSocketRecvBuffer()->getBufferLength();
}
getSocket()->readData(buf, bufSize);
streamFilter_->transform(diskAdaptor, segment, buf, bufSize);
streamFilter_->transform(diskAdaptor, segment,
getSocketRecvBuffer()->getBuffer(), bufSize);
} else {
// It is possible that segment is completed but we have some bytes
// of stream to read. For example, chunked encoding has "0"+CRLF
// after data. After we read data(at this moment segment is
// completed), we need another 3bytes(or more if it has trailers).
bufSize = BUFSIZE;
getSocket()->peekData(buf, bufSize);
streamFilter_->transform(diskAdaptor, segment, buf, bufSize);
streamFilter_->transform(diskAdaptor, segment,
getSocketRecvBuffer()->getBuffer(),
getSocketRecvBuffer()->getBufferLength());
bufSize = streamFilter_->getBytesProcessed();
getSocket()->readData(buf, bufSize);
}
getSocketRecvBuffer()->shiftBuffer(bufSize);
peerStat_->updateDownloadLength(bufSize);
}
getSegmentMan()->updateDownloadSpeedFor(peerStat_);
bool segmentPartComplete = false;
// Note that GrowSegment::complete() always returns false.
@ -163,8 +182,7 @@ bool DownloadCommand::executeInternal() {
if(segment->complete() ||
segment->getPositionToWrite() == getFileEntry()->getLastOffset()) {
segmentPartComplete = true;
} else if(segment->getLength() == 0 && bufSize == 0 &&
!getSocket()->wantRead() && !getSocket()->wantWrite()) {
} else if(segment->getLength() == 0 && eof) {
segmentPartComplete = true;
}
} else {
@ -187,8 +205,7 @@ bool DownloadCommand::executeInternal() {
}
}
if(!segmentPartComplete && bufSize == 0 &&
!getSocket()->wantRead() && !getSocket()->wantWrite()) {
if(!segmentPartComplete && eof) {
throw DL_RETRY_EX(EX_GOT_EOF);
}
@ -245,6 +262,7 @@ bool DownloadCommand::executeInternal() {
} else {
checkLowestDownloadSpeed();
setWriteCheckSocketIf(getSocket(), getSocket()->wantWrite());
checkSocketRecvBuffer();
getDownloadEngine()->addCommand(this);
return false;
}
@ -288,8 +306,6 @@ bool DownloadCommand::prepareForNextSegment() {
getDownloadEngine()->getCheckIntegrityMan()->pushEntry(entry);
}
}
// Following 2lines are needed for DownloadEngine to detect
// completed RequestGroups without 1sec delay.
getDownloadEngine()->setNoWait(true);
getDownloadEngine()->setRefreshInterval(0);
#endif // ENABLE_MESSAGE_DIGEST
@ -319,6 +335,7 @@ bool DownloadCommand::prepareForNextSegment() {
// nextSegment->getWrittenLength() corrupts file.
return prepareForRetry(0);
} else {
checkSocketRecvBuffer();
getDownloadEngine()->addCommand(this);
return false;
}

View File

@ -83,7 +83,8 @@ public:
const SharedHandle<FileEntry>& fileEntry,
RequestGroup* requestGroup,
DownloadEngine* e,
const SharedHandle<SocketCore>& s);
const SharedHandle<SocketCore>& s,
const SharedHandle<SocketRecvBuffer>& socketRecvBuffer);
virtual ~DownloadCommand();
const SharedHandle<StreamFilter>& getStreamFilter() const

View File

@ -44,6 +44,7 @@
#include "FtpConnection.h"
#include "Logger.h"
#include "FileEntry.h"
#include "SocketRecvBuffer.h"
namespace aria2 {
@ -56,13 +57,14 @@ FtpDownloadCommand::FtpDownloadCommand
DownloadEngine* e,
const SocketHandle& dataSocket,
const SocketHandle& ctrlSocket)
:DownloadCommand(cuid, req, fileEntry, requestGroup, e, dataSocket),
:DownloadCommand(cuid, req, fileEntry, requestGroup, e, dataSocket,
SharedHandle<SocketRecvBuffer>
(new SocketRecvBuffer(dataSocket))),
ftpConnection_(ftpConnection),
ctrlSocket_(ctrlSocket) {}
FtpDownloadCommand::~FtpDownloadCommand() {}
bool FtpDownloadCommand::prepareForNextSegment()
{
if(getOption()->getAsBool(PREF_FTP_REUSE_CONNECTION) &&

View File

@ -50,6 +50,7 @@
#include "LogFactory.h"
#include "util.h"
#include "wallclock.h"
#include "SocketRecvBuffer.h"
namespace aria2 {

View File

@ -55,6 +55,7 @@
#include "AuthConfigFactory.h"
#include "AuthConfig.h"
#include "fmt.h"
#include "SocketRecvBuffer.h"
namespace aria2 {
@ -99,8 +100,10 @@ Command* FtpInitiateConnectionCommand::createNextCommand
if(proxyMethod == V_GET) {
// Use GET for FTP via HTTP proxy.
getRequest()->setMethod(Request::METHOD_GET);
SharedHandle<SocketRecvBuffer> socketRecvBuffer
(new SocketRecvBuffer(getSocket()));
SharedHandle<HttpConnection> hc
(new HttpConnection(getCuid(), getSocket()));
(new HttpConnection(getCuid(), getSocket(), socketRecvBuffer));
HttpRequestCommand* c =
new HttpRequestCommand(getCuid(), getRequest(), getFileEntry(),
@ -130,8 +133,10 @@ Command* FtpInitiateConnectionCommand::createNextCommand
} else if(proxyMethod == V_GET) {
// Use GET for FTP via HTTP proxy.
getRequest()->setMethod(Request::METHOD_GET);
SharedHandle<SocketRecvBuffer> socketRecvBuffer
(new SocketRecvBuffer(pooledSocket));
SharedHandle<HttpConnection> hc
(new HttpConnection(getCuid(), pooledSocket));
(new HttpConnection(getCuid(), pooledSocket, socketRecvBuffer));
HttpRequestCommand* c =
new HttpRequestCommand(getCuid(), getRequest(), getFileEntry(),

View File

@ -73,6 +73,7 @@
#include "DlRetryEx.h"
#include "CheckIntegrityEntry.h"
#include "error_code.h"
#include "SocketRecvBuffer.h"
namespace aria2 {
@ -677,7 +678,9 @@ bool FtpNegotiationCommand::resolveProxy()
dataSocket_->establishConnection(proxyAddr_, proxyReq->getPort());
disableReadCheckSocket();
setWriteCheckSocket(dataSocket_);
http_.reset(new HttpConnection(getCuid(), dataSocket_));
SharedHandle<SocketRecvBuffer> socketRecvBuffer
(new SocketRecvBuffer(dataSocket_));
http_.reset(new HttpConnection(getCuid(), dataSocket_, socketRecvBuffer));
sequence_ = SEQ_SEND_TUNNEL_REQUEST;
return false;
}

View File

@ -37,6 +37,7 @@
#include "Request.h"
#include "Socket.h"
#include "DownloadContext.h"
#include "SocketRecvBuffer.h"
namespace aria2 {

View File

@ -39,6 +39,7 @@
#include "HttpRequest.h"
#include "Segment.h"
#include "Socket.h"
#include "SocketRecvBuffer.h"
namespace aria2 {

View File

@ -56,6 +56,7 @@
#include "AuthConfig.h"
#include "a2functional.h"
#include "fmt.h"
#include "SocketRecvBuffer.h"
namespace aria2 {
@ -67,9 +68,13 @@ HttpRequestEntry::HttpRequestEntry
HttpRequestEntry::~HttpRequestEntry() {}
HttpConnection::HttpConnection(cuid_t cuid, const SocketHandle& socket)
HttpConnection::HttpConnection
(cuid_t cuid,
const SocketHandle& socket,
const SharedHandle<SocketRecvBuffer>& socketRecvBuffer)
: cuid_(cuid),
socket_(socket),
socketRecvBuffer_(socketRecvBuffer),
socketBuffer_(socket)
{}
@ -126,36 +131,34 @@ SharedHandle<HttpResponse> HttpConnection::receiveResponse()
}
HttpRequestEntryHandle entry = outstandingHttpRequests_.front();
HttpHeaderProcessorHandle proc = entry->getHttpHeaderProcessor();
unsigned char buf[512];
size_t size = sizeof(buf);
socket_->peekData(buf, size);
if(size == 0) {
if(socket_->wantRead() || socket_->wantWrite()) {
return SharedHandle<HttpResponse>();
} else {
if(socketRecvBuffer_->bufferEmpty()) {
if(socketRecvBuffer_->recv() == 0 &&
!socket_->wantRead() && !socket_->wantWrite()) {
throw DL_RETRY_EX(EX_GOT_EOF);
}
}
proc->update(buf, size);
if(!proc->eoh()) {
socket_->readData(buf, size);
return SharedHandle<HttpResponse>();
}
proc->update(socketRecvBuffer_->getBuffer(),
socketRecvBuffer_->getBufferLength());
SharedHandle<HttpResponse> httpResponse;
size_t shiftBufferLength;
if(proc->eoh()) {
SharedHandle<HttpHeader> httpHeader = proc->getHttpResponseHeader();
size_t putbackDataLength = proc->getPutBackDataLength();
size -= putbackDataLength;
socket_->readData(buf, size);
A2_LOG_INFO(fmt(MSG_RECEIVE_RESPONSE,
cuid_,
proc->getHeaderString().c_str()));
SharedHandle<HttpHeader> httpHeader = proc->getHttpResponseHeader();
SharedHandle<HttpResponse> httpResponse(new HttpResponse());
assert(socketRecvBuffer_->getBufferLength() >= putbackDataLength);
shiftBufferLength =
socketRecvBuffer_->getBufferLength()-putbackDataLength;
httpResponse.reset(new HttpResponse());
httpResponse->setCuid(cuid_);
httpResponse->setHttpHeader(httpHeader);
httpResponse->setHttpRequest(entry->getHttpRequest());
outstandingHttpRequests_.pop_front();
} else {
shiftBufferLength = socketRecvBuffer_->getBufferLength();
}
socketRecvBuffer_->shiftBuffer(shiftBufferLength);
return httpResponse;
}

View File

@ -52,6 +52,7 @@ class HttpHeaderProcessor;
class Option;
class Segment;
class SocketCore;
class SocketRecvBuffer;
class HttpRequestEntry {
private:
@ -80,6 +81,7 @@ class HttpConnection {
private:
cuid_t cuid_;
SharedHandle<SocketCore> socket_;
SharedHandle<SocketRecvBuffer> socketRecvBuffer_;
SocketBuffer socketBuffer_;
const Option* option_;
@ -87,7 +89,10 @@ private:
std::string eraseConfidentialInfo(const std::string& request);
public:
HttpConnection(cuid_t cuid, const SharedHandle<SocketCore>& socket);
HttpConnection
(cuid_t cuid,
const SharedHandle<SocketCore>& socket,
const SharedHandle<SocketRecvBuffer>& socketRecvBuffer);
~HttpConnection();
/**
@ -124,6 +129,11 @@ public:
bool sendBufferIsEmpty() const;
void sendPendingData();
const SharedHandle<SocketRecvBuffer>& getSocketRecvBuffer() const
{
return socketRecvBuffer_;
}
};
typedef SharedHandle<HttpConnection> HttpConnectionHandle;

View File

@ -51,6 +51,7 @@
#include "StreamFilter.h"
#include "SinkStreamFilter.h"
#include "util.h"
#include "SocketRecvBuffer.h"
namespace aria2 {
@ -63,9 +64,11 @@ HttpDownloadCommand::HttpDownloadCommand
const HttpConnectionHandle& httpConnection,
DownloadEngine* e,
const SocketHandle& socket)
:DownloadCommand(cuid, req, fileEntry, requestGroup, e, socket),
: DownloadCommand(cuid, req, fileEntry, requestGroup, e, socket,
httpConnection->getSocketRecvBuffer()),
httpResponse_(httpResponse),
httpConnection_(httpConnection) {}
httpConnection_(httpConnection)
{}
HttpDownloadCommand::~HttpDownloadCommand() {}

View File

@ -50,6 +50,7 @@
#include "A2STR.h"
#include "util.h"
#include "fmt.h"
#include "SocketRecvBuffer.h"
namespace aria2 {
@ -94,8 +95,10 @@ Command* HttpInitiateConnectionCommand::createNextCommand
getSocket());
command = c;
} else if(proxyMethod == V_GET) {
SharedHandle<SocketRecvBuffer> socketRecvBuffer
(new SocketRecvBuffer(getSocket()));
SharedHandle<HttpConnection> httpConnection
(new HttpConnection(getCuid(), getSocket()));
(new HttpConnection(getCuid(), getSocket(), socketRecvBuffer));
HttpRequestCommand* c = new HttpRequestCommand(getCuid(),
getRequest(),
getFileEntry(),
@ -111,8 +114,10 @@ Command* HttpInitiateConnectionCommand::createNextCommand
}
} else {
setConnectedAddrInfo(getRequest(), hostname, pooledSocket);
SharedHandle<SocketRecvBuffer> socketRecvBuffer
(new SocketRecvBuffer(pooledSocket));
SharedHandle<HttpConnection> httpConnection
(new HttpConnection(getCuid(), pooledSocket));
(new HttpConnection(getCuid(), pooledSocket, socketRecvBuffer));
HttpRequestCommand* c = new HttpRequestCommand(getCuid(),
getRequest(),
getFileEntry(),
@ -139,8 +144,10 @@ Command* HttpInitiateConnectionCommand::createNextCommand
setSocket(pooledSocket);
setConnectedAddrInfo(getRequest(), hostname, pooledSocket);
}
SharedHandle<SocketRecvBuffer> socketRecvBuffer
(new SocketRecvBuffer(getSocket()));
SharedHandle<HttpConnection> httpConnection
(new HttpConnection(getCuid(), getSocket()));
(new HttpConnection(getCuid(), getSocket(), socketRecvBuffer));
HttpRequestCommand* c =
new HttpRequestCommand(getCuid(), getRequest(), getFileEntry(),
getRequestGroup(),

View File

@ -36,6 +36,7 @@
#include "HttpProxyResponseCommand.h"
#include "Request.h"
#include "Socket.h"
#include "SocketRecvBuffer.h"
namespace aria2 {

View File

@ -39,6 +39,7 @@
#include "HttpRequest.h"
#include "Segment.h"
#include "Socket.h"
#include "SocketRecvBuffer.h"
namespace aria2 {

View File

@ -58,6 +58,7 @@
#include "Logger.h"
#include "LogFactory.h"
#include "fmt.h"
#include "SocketRecvBuffer.h"
namespace aria2 {
@ -69,7 +70,8 @@ HttpRequestCommand::HttpRequestCommand
const HttpConnectionHandle& httpConnection,
DownloadEngine* e,
const SocketHandle& s)
: AbstractCommand(cuid, req, fileEntry, requestGroup, e, s),
: AbstractCommand(cuid, req, fileEntry, requestGroup, e, s,
httpConnection->getSocketRecvBuffer()),
httpConnection_(httpConnection)
{
setTimeout(getOption()->getAsInt(PREF_CONNECT_TIMEOUT));

View File

@ -71,6 +71,7 @@
#include "SinkStreamFilter.h"
#include "ChunkedDecodingStreamFilter.h"
#include "uri.h"
#include "SocketRecvBuffer.h"
#ifdef HAVE_LIBZ
# include "GZipDecodingStreamFilter.h"
#endif // HAVE_LIBZ
@ -134,9 +135,12 @@ HttpResponseCommand::HttpResponseCommand
const HttpConnectionHandle& httpConnection,
DownloadEngine* e,
const SocketHandle& s)
: AbstractCommand(cuid, req, fileEntry, requestGroup, e, s),
: AbstractCommand(cuid, req, fileEntry, requestGroup, e, s,
httpConnection->getSocketRecvBuffer()),
httpConnection_(httpConnection)
{}
{
checkSocketRecvBuffer();
}
HttpResponseCommand::~HttpResponseCommand() {}

View File

@ -59,6 +59,7 @@
#include "NullSinkStreamFilter.h"
#include "SinkStreamFilter.h"
#include "error_code.h"
#include "SocketRecvBuffer.h"
namespace aria2 {
@ -71,14 +72,17 @@ HttpSkipResponseCommand::HttpSkipResponseCommand
const SharedHandle<HttpResponse>& httpResponse,
DownloadEngine* e,
const SharedHandle<SocketCore>& s)
: AbstractCommand(cuid, req, fileEntry, requestGroup, e, s),
: AbstractCommand(cuid, req, fileEntry, requestGroup, e, s,
httpConnection->getSocketRecvBuffer()),
httpConnection_(httpConnection),
httpResponse_(httpResponse),
streamFilter_(new NullSinkStreamFilter()),
sinkFilterOnly_(true),
totalLength_(httpResponse_->getEntityLength()),
receivedBytes_(0)
{}
{
checkSocketRecvBuffer();
}
HttpSkipResponseCommand::~HttpSkipResponseCommand() {}
@ -108,30 +112,34 @@ bool HttpSkipResponseCommand::executeInternal()
}
return processResponse();
}
const size_t BUFSIZE = 16*1024;
unsigned char buf[BUFSIZE];
size_t bufSize;
if(sinkFilterOnly_ && totalLength_ > 0) {
bufSize = totalLength_-receivedBytes_;
} else {
bufSize = BUFSIZE;
}
bool eof = false;
try {
size_t bufSize;
if(getSocketRecvBuffer()->bufferEmpty()) {
eof = getSocketRecvBuffer()->recv() == 0 &&
!getSocket()->wantRead() && !getSocket()->wantWrite();
}
if(!eof) {
if(sinkFilterOnly_) {
getSocket()->readData(buf, bufSize);
if(totalLength_ > 0) {
bufSize = std::min(totalLength_-receivedBytes_,
getSocketRecvBuffer()->getBufferLength());
} else {
bufSize = getSocketRecvBuffer()->getBufferLength();
}
receivedBytes_ += bufSize;
} else {
getSocket()->peekData(buf, bufSize);
// receivedBytes_ is not updated if transferEncoding is set.
// The return value is safely ignored here.
streamFilter_->transform(SharedHandle<BinaryStream>(),
SharedHandle<Segment>(),
buf, bufSize);
getSocketRecvBuffer()->getBuffer(),
getSocketRecvBuffer()->getBufferLength());
bufSize = streamFilter_->getBytesProcessed();
getSocket()->readData(buf, bufSize);
}
if(totalLength_ != 0 && bufSize == 0 &&
!getSocket()->wantRead() && !getSocket()->wantWrite()) {
getSocketRecvBuffer()->shiftBuffer(bufSize);
}
if(totalLength_ != 0 && eof) {
throw DL_RETRY_EX(EX_GOT_EOF);
}
} catch(RecoverableException& e) {
@ -141,10 +149,8 @@ bool HttpSkipResponseCommand::executeInternal()
bool finished = false;
if(sinkFilterOnly_) {
if(bufSize == 0) {
if(!getSocket()->wantRead() && !getSocket()->wantWrite()) {
if(eof) {
return processResponse();
}
} else {
finished = (totalLength_ == receivedBytes_);
}

View File

@ -63,7 +63,8 @@ private:
protected:
virtual bool executeInternal();
public:
HttpSkipResponseCommand(cuid_t cuid,
HttpSkipResponseCommand
(cuid_t cuid,
const SharedHandle<Request>& req,
const SharedHandle<FileEntry>& fileEntry,
RequestGroup* requestGroup,

View File

@ -50,6 +50,7 @@
#include "util.h"
#include "RecoverableException.h"
#include "fmt.h"
#include "SocketRecvBuffer.h"
namespace aria2 {

View File

@ -43,6 +43,7 @@
#include "Option.h"
#include "prefs.h"
#include "SocketCore.h"
#include "SocketRecvBuffer.h"
namespace aria2 {

View File

@ -182,6 +182,7 @@ SRCS = Socket.h\
NsCookieParser.cc NsCookieParser.h\
CookieStorage.cc CookieStorage.h\
SocketBuffer.cc SocketBuffer.h\
SocketRecvBuffer.cc SocketRecvBuffer.h\
OptionHandlerException.cc OptionHandlerException.h\
URIResult.cc URIResult.h\
EventPoll.h\

View File

@ -77,6 +77,7 @@
#include "SocketCore.h"
#include "SimpleRandomizer.h"
#include "Segment.h"
#include "SocketRecvBuffer.h"
#ifdef ENABLE_MESSAGE_DIGEST
# include "CheckIntegrityCommand.h"
# include "ChecksumCheckIntegrityEntry.h"

77
src/SocketRecvBuffer.cc Normal file
View File

@ -0,0 +1,77 @@
/* <!-- copyright */
/*
* aria2 - The high speed download utility
*
* Copyright (C) 2011 Tatsuhiro Tsujikawa
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 2 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
*
* In addition, as a special exception, the copyright holders give
* permission to link the code of portions of this program with the
* OpenSSL library under certain conditions as described in each
* individual source file, and distribute linked combinations
* including the two.
* You must obey the GNU General Public License in all respects
* for all of the code used other than OpenSSL. If you modify
* file(s) with this exception, you may extend this exception to your
* version of the file(s), but you are not obligated to do so. If you
* do not wish to do so, delete this exception statement from your
* version. If you delete this exception statement from all source
* files in the program, then also delete it here.
*/
/* copyright --> */
#include "SocketRecvBuffer.h"
#include <cstring>
#include "SocketCore.h"
#include "LogFactory.h"
namespace aria2 {
SocketRecvBuffer::SocketRecvBuffer
(const SharedHandle<SocketCore>& socket,
size_t capacity)
: socket_(socket),
capacity_(capacity),
buf_(new unsigned char[capacity_]),
bufLen_(0)
{}
SocketRecvBuffer::~SocketRecvBuffer()
{
delete [] buf_;
}
ssize_t SocketRecvBuffer::recv()
{
size_t len = capacity_-bufLen_;
if(len > 0) {
socket_->readData(buf_+bufLen_, len);
bufLen_ += len;
} else {
A2_LOG_DEBUG("Buffer full");
}
return len;
}
void SocketRecvBuffer::shiftBuffer(size_t offset)
{
assert(offset <= bufLen_);
memmove(buf_, buf_+offset, bufLen_-offset);
bufLen_ -= offset;
}
} // namespace aria2

86
src/SocketRecvBuffer.h Normal file
View File

@ -0,0 +1,86 @@
/* <!-- copyright */
/*
* aria2 - The high speed download utility
*
* Copyright (C) 2011 Tatsuhiro Tsujikawa
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 2 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
*
* In addition, as a special exception, the copyright holders give
* permission to link the code of portions of this program with the
* OpenSSL library under certain conditions as described in each
* individual source file, and distribute linked combinations
* including the two.
* You must obey the GNU General Public License in all respects
* for all of the code used other than OpenSSL. If you modify
* file(s) with this exception, you may extend this exception to your
* version of the file(s), but you are not obligated to do so. If you
* do not wish to do so, delete this exception statement from your
* version. If you delete this exception statement from all source
* files in the program, then also delete it here.
*/
/* copyright --> */
#ifndef D_SOCKET_RECV_BUFFER_H
#define D_SOCKET_RECV_BUFFER_H
#include "common.h"
#include "SharedHandle.h"
namespace aria2 {
class SocketCore;
class SocketRecvBuffer {
public:
SocketRecvBuffer
(const SharedHandle<SocketCore>& socket,
size_t capacity = 16*1024);
~SocketRecvBuffer();
// Reads data from socket as much as capacity allows. Returns the
// number of bytes read.
ssize_t recv();
// Shifts buffer by offset bytes. offset must satisfy offset <=
// getBufferLength().
void shiftBuffer(size_t offset);
const SharedHandle<SocketCore>& getSocket() const
{
return socket_;
}
const unsigned char* getBuffer() const
{
return buf_;
}
size_t getBufferLength() const
{
return bufLen_;
}
bool bufferEmpty() const
{
return bufLen_ == 0;
}
private:
SharedHandle<SocketCore> socket_;
size_t capacity_;
unsigned char* buf_;
size_t bufLen_;
};
} // namespace aria2
#endif // D_SOCKET_RECV_BUFFER_H