2009-06-30 Tatsuhiro Tsujikawa <t-tujikawa@users.sourceforge.net>

Rewritten PeerStat handling. In the previous implementation,
	faster command tries to find slower command. In this new
	implementation, slower command tries to find faster command.
	PeerStat is now created in each HTTP/FTP request to get download
	rate correctly. If the download range is small, the download rate
	tends to small and this will occur at the later stage of download.
	* src/AbstractCommand.cc
	* src/DownloadCommand.cc
	* src/FileEntry.cc
	* src/FileEntry.h
	* src/Request.cc
	* src/Request.h
	* src/RequestGroup.h
	* src/RequestGroupMan.cc
	* src/SegmentMan.cc
	* src/SegmentMan.h
	* test/SegmentManTest.cc
pull/1/head
Tatsuhiro Tsujikawa 2009-06-29 15:18:21 +00:00
parent e82f870fdc
commit c4d79d7160
12 changed files with 174 additions and 189 deletions

View File

@ -1,3 +1,23 @@
2009-06-30 Tatsuhiro Tsujikawa <t-tujikawa@users.sourceforge.net>
Rewritten PeerStat handling. In the previous implementation,
faster command tries to find slower command. In this new
implementation, slower command tries to find faster command.
PeerStat is now created in each HTTP/FTP request to get download
rate correctly. If the download range is small, the download rate
tends to small and this will occur at the later stage of download.
* src/AbstractCommand.cc
* src/DownloadCommand.cc
* src/FileEntry.cc
* src/FileEntry.h
* src/Request.cc
* src/Request.h
* src/RequestGroup.h
* src/RequestGroupMan.cc
* src/SegmentMan.cc
* src/SegmentMan.h
* test/SegmentManTest.cc
2009-06-29 Tatsuhiro Tsujikawa <t-tujikawa@users.sourceforge.net>
Removed _uris from RequestGroup. All functions that refer to _uris

View File

@ -47,6 +47,7 @@
#include "DlRetryEx.h"
#include "DownloadFailureException.h"
#include "CreateRequestCommand.h"
#include "InitiateConnectionCommandFactory.h"
#include "SleepCommand.h"
#ifdef ENABLE_ASYNC_DNS
#include "AsyncNameResolver.h"
@ -122,16 +123,29 @@ bool AbstractCommand::execute() {
//logger->debug("CUID#%d - finished.", cuid);
return true;
}
PeerStatHandle peerStat;
if(!_requestGroup->getSegmentMan().isNull()) {
peerStat = _requestGroup->getSegmentMan()->getPeerStat(cuid);
}
if(!peerStat.isNull()) {
if(peerStat->getStatus() == PeerStat::REQUEST_IDLE) {
logger->info(MSG_ABORT_REQUESTED, cuid);
onAbort();
req->resetUrl();
tryReserved();
// TODO1.5 it is not needed to check other PeerStats every time.
// Find faster Request when this is the only active Request
if(!req.isNull() && _requestGroup->getNumStreamConnection() == 1 &&
_fileEntry->countPooledRequest() > 0) {
SharedHandle<Request> fasterRequest = _fileEntry->findFasterRequest(req);
if(!fasterRequest.isNull()) {
logger->info("CUID#%d - Use faster Request hostname=%s, port=%u",
cuid,
fasterRequest->getHost().c_str(),
fasterRequest->getPort());
// Cancel current Request object and use faster one.
_fileEntry->removeRequest(req);
Command* command =
InitiateConnectionCommandFactory::createInitiateConnectionCommand
(cuid, fasterRequest, _fileEntry, _requestGroup, e);
// TODO1.5 Here is ServerHost stuff
//ServerHostHandle sv(new ServerHost(command->getCuid(), req->getHost()));
//registerServerHost(sv);
e->setNoWait(true);
e->commands.push_back(command);
return true;
}
}
@ -259,8 +273,6 @@ bool AbstractCommand::prepareForRetry(time_t wait) {
_fileEntry->poolRequest(req);
}
if(!_segments.empty()) {
// TODO1.5 subtract 1 from getPositionToWrite()
SharedHandle<FileEntry> fileEntry = _requestGroup->getDownloadContext()->findFileEntryByOffset(_segments.front()->getPositionToWrite()-1);
logger->debug("CUID#%d - Pooling request URI=%s",
cuid, req->getUrl().c_str());
_requestGroup->getSegmentMan()->recognizeSegmentFor(_fileEntry);

View File

@ -89,9 +89,9 @@ DownloadCommand::DownloadCommand(int cuid,
}
}
#endif // ENABLE_MESSAGE_DIGEST
peerStat = _requestGroup->getSegmentMan()->getPeerStat(cuid);
peerStat = req->getPeerStat();
if(peerStat.isNull()) {
peerStat.reset(new PeerStat(cuid, req->getHost(), req->getProtocol()));
peerStat = req->initPeerStat();
_requestGroup->getSegmentMan()->registerPeerStat(peerStat);
}
peerStat->downloadStart();
@ -100,6 +100,7 @@ DownloadCommand::DownloadCommand(int cuid,
DownloadCommand::~DownloadCommand() {
assert(peerStat.get());
peerStat->downloadStop();
req->purgePeerStat();
}
bool DownloadCommand::executeInternal() {

View File

@ -121,17 +121,71 @@ FileEntry::getRequest(const SharedHandle<URISelector>& selector)
}
}
} else {
req = _requestPool.back();
_requestPool.pop_back();
req = _requestPool.front();
_requestPool.pop_front();
_inFlightRequests.push_back(req);
return req;
}
}
SharedHandle<Request>
FileEntry::findFasterRequest(const SharedHandle<Request>& base)
{
if(_requestPool.empty()) {
return SharedHandle<Request>();
}
const SharedHandle<PeerStat>& fastest = _requestPool.front()->getPeerStat();
if(fastest.isNull()) {
return SharedHandle<Request>();
}
const SharedHandle<PeerStat>& basestat = base->getPeerStat();
// TODO1.5 hard coded value. See PREF_STARTUP_IDLE_TIME
const int startupIdleTime = 10;
if(basestat.isNull() ||
(basestat->getDownloadStartTime().elapsed(startupIdleTime) &&
fastest->getAvgDownloadSpeed()*0.8 > basestat->calculateDownloadSpeed())){
// TODO1.5 we should consider that "fastest" is very slow.
SharedHandle<Request> fastestRequest = _requestPool.front();
_requestPool.pop_front();
return fastestRequest;
}
return SharedHandle<Request>();
}
class RequestFaster {
public:
bool operator()(const SharedHandle<Request>& lhs,
const SharedHandle<Request>& rhs) const
{
if(lhs->getPeerStat().isNull()) {
return false;
}
if(rhs->getPeerStat().isNull()) {
return true;
}
return
lhs->getPeerStat()->getAvgDownloadSpeed() > rhs->getPeerStat()->getAvgDownloadSpeed();
}
};
void FileEntry::storePool(const SharedHandle<Request>& request)
{
const SharedHandle<PeerStat>& peerStat = request->getPeerStat();
if(!peerStat.isNull()) {
// We need to calculate average download speed here in order to
// store Request in the right position in the pool.
peerStat->calculateAvgDownloadSpeed();
}
std::deque<SharedHandle<Request> >::iterator i =
std::lower_bound(_requestPool.begin(), _requestPool.end(), request,
RequestFaster());
_requestPool.insert(i, request);
}
void FileEntry::poolRequest(const SharedHandle<Request>& request)
{
removeRequest(request);
_requestPool.push_back(request);
storePool(request);
}
bool FileEntry::removeRequest(const SharedHandle<Request>& request)

View File

@ -70,6 +70,8 @@ private:
std::deque<URIResult> _uriResults;
Logger* _logger;
void storePool(const SharedHandle<Request>& request);
public:
FileEntry();
@ -158,6 +160,11 @@ public:
// and return the Request object.
SharedHandle<Request> getRequest(const SharedHandle<URISelector>& selector);
// Finds pooled Request object which is faster than passed one,
// comparing their PeerStat objects. If such Request is found, it is
// removed from the pool and returned.
SharedHandle<Request> findFasterRequest(const SharedHandle<Request>& base);
void poolRequest(const SharedHandle<Request>& request);
bool removeRequest(const SharedHandle<Request>& request);
@ -167,6 +174,11 @@ public:
return _inFlightRequests.size();
}
size_t countPooledRequest() const
{
return _requestPool.size();
}
bool operator<(const FileEntry& fileEntry) const;
bool exists() const;

View File

@ -244,4 +244,10 @@ void Request::setMaxPipelinedRequest(unsigned int num)
_maxPipelinedRequest = num;
}
const SharedHandle<PeerStat>& Request::initPeerStat()
{
_peerStat.reset(new PeerStat(0, host, protocol));
return _peerStat;
}
} // namespace aria2

View File

@ -40,6 +40,7 @@
#include <deque>
#include "SharedHandle.h"
#include "PeerStat.h"
namespace aria2 {
@ -81,6 +82,8 @@ private:
std::string _password;
SharedHandle<PeerStat> _peerStat;
bool parseUrl(const std::string& url);
public:
Request();
@ -178,6 +181,12 @@ public:
return method;
}
const SharedHandle<PeerStat>& getPeerStat() const { return _peerStat; }
const SharedHandle<PeerStat>& initPeerStat();
void purgePeerStat() { _peerStat.reset(); }
static const std::string METHOD_GET;
static const std::string METHOD_HEAD;

View File

@ -275,6 +275,9 @@ public:
void decreaseStreamConnection();
// Returns the number of connections used in HTTP(S)/FTP.
unsigned int getNumStreamConnection() { return _numStreamConnection; }
unsigned int getNumConnection() const;
void increaseNumCommand();

View File

@ -329,6 +329,34 @@ public:
}
};
class PeerStatFaster {
public:
bool operator()(const SharedHandle<PeerStat>& lhs,
const SharedHandle<PeerStat>& rhs) const
{
int r;
r = lhs->getHostname().compare(rhs->getHostname());
if(r != 0) {
return r < 0;
}
r = lhs->getProtocol().compare(rhs->getProtocol());
if(r != 0) {
return r < 0;
}
return lhs->getAvgDownloadSpeed() > rhs->getAvgDownloadSpeed();
}
};
class PeerStatHostProtoEqual {
public:
bool operator()(const SharedHandle<PeerStat>& lhs,
const SharedHandle<PeerStat>& rhs) const
{
return lhs->getHostname() == rhs->getHostname() &&
lhs->getProtocol() == rhs->getProtocol();
}
};
class CollectServerStat {
private:
RequestGroupMan* _requestGroupMan;
@ -342,11 +370,16 @@ public:
// Collect statistics during download in PeerStats and update/register
// ServerStatMan
if(!group->getSegmentMan().isNull()) {
const std::deque<SharedHandle<PeerStat> >& peerStats =
group->getSegmentMan()->getPeerStats();
std::deque<SharedHandle<PeerStat> > peerStats =
group->getSegmentMan()->getPeerStats();
std::sort(peerStats.begin(), peerStats.end(), PeerStatFaster());
// Use fastest PeerStat for each hostname/protocol pair.
for(std::deque<SharedHandle<PeerStat> >::const_iterator i =
peerStats.begin(); i != peerStats.end(); ++i) {
peerStats.begin(),
last = std::unique(peerStats.begin(), peerStats.end(),
PeerStatHostProtoEqual());
i != last; ++i) {
if((*i)->getHostname().empty() || (*i)->getProtocol().empty()) {
continue;
}

View File

@ -151,42 +151,6 @@ SegmentHandle SegmentMan::checkoutSegment(cuid_t cuid,
return segment;
}
SegmentEntryHandle SegmentMan::findSlowerSegmentEntry
(const PeerStatHandle& peerStat)
{
unsigned int speed =
static_cast<unsigned int>(peerStat->getAvgDownloadSpeed()*0.8);
SegmentEntryHandle slowSegmentEntry;
int startupIdleTime = _option->getAsInt(PREF_STARTUP_IDLE_TIME);
for(std::deque<SharedHandle<SegmentEntry> >::const_iterator itr =
usedSegmentEntries.begin(); itr != usedSegmentEntries.end(); ++itr) {
const SharedHandle<SegmentEntry>& segmentEntry = *itr;
if(segmentEntry->cuid == 0) {
continue;
}
SharedHandle<PeerStat> p = getPeerStat(segmentEntry->cuid);
if(p.isNull()) {
// "p is null" means that it hasn't used DownloadCommand yet, i.e. waiting
// response from HTTP server after sending HTTP request.
p.reset(new PeerStat(segmentEntry->cuid));
registerPeerStat(p);
slowSegmentEntry = segmentEntry;
} else {
if(p->getCuid() == peerStat->getCuid() ||
(p->getStatus() == PeerStat::ACTIVE &&
!p->getDownloadStartTime().elapsed(startupIdleTime))) {
continue;
}
unsigned int pSpeed = p->calculateDownloadSpeed();
if(pSpeed < speed) {
speed = pSpeed;
slowSegmentEntry = segmentEntry;
}
}
}
return slowSegmentEntry;
}
void SegmentMan::getInFlightSegment(std::deque<SharedHandle<Segment> >& segments,
cuid_t cuid)
{
@ -203,32 +167,7 @@ SegmentHandle SegmentMan::getSegment(cuid_t cuid) {
PieceHandle piece =
_pieceStorage->getSparseMissingUnusedPiece
(_ignoreBitfield.getFilterBitfield(),_ignoreBitfield.getBitfieldLength());
if(piece.isNull()) {
PeerStatHandle myPeerStat = getPeerStat(cuid);
if(myPeerStat.isNull()) {
return SharedHandle<Segment>();
}
SegmentEntryHandle slowSegmentEntry = findSlowerSegmentEntry(myPeerStat);
if(slowSegmentEntry.get()) {
logger->info(MSG_SEGMENT_FORWARDING,
slowSegmentEntry->cuid,
slowSegmentEntry->segment->getIndex(),
cuid);
PeerStatHandle slowPeerStat = getPeerStat(slowSegmentEntry->cuid);
slowPeerStat->requestIdle();
cancelSegment(slowSegmentEntry->cuid);
SharedHandle<Piece> piece =
_pieceStorage->getMissingPiece(slowSegmentEntry->segment->getIndex());
assert(!piece.isNull());
return checkoutSegment(cuid, piece);
} else {
return SharedHandle<Segment>();
}
} else {
return checkoutSegment(cuid, piece);
}
return checkoutSegment(cuid, piece);
}
SegmentHandle SegmentMan::getSegment(cuid_t cuid, size_t index) {
@ -293,36 +232,9 @@ uint64_t SegmentMan::getDownloadLength() const {
}
}
class FindPeerStat {
public:
bool operator()(const SharedHandle<PeerStat>& peerStat, cuid_t cuid) const
{
return peerStat->getCuid() < cuid;
}
};
bool SegmentMan::registerPeerStat(const SharedHandle<PeerStat>& peerStat)
void SegmentMan::registerPeerStat(const SharedHandle<PeerStat>& peerStat)
{
std::deque<SharedHandle<PeerStat> >::iterator i =
std::lower_bound(peerStats.begin(), peerStats.end(),peerStat->getCuid(),
FindPeerStat());
if(i == peerStats.end() || (*i)->getCuid() != peerStat->getCuid()) {
peerStats.insert(i, peerStat);
return true ;
} else {
return false;
}
}
PeerStatHandle SegmentMan::getPeerStat(cuid_t cuid) const
{
std::deque<SharedHandle<PeerStat> >::const_iterator i =
std::lower_bound(peerStats.begin(), peerStats.end(), cuid, FindPeerStat());
if(i != peerStats.end() && (*i)->getCuid() == cuid) {
return *i;
} else {
return SharedHandle<PeerStat>();
}
peerStats.push_back(peerStat);
}
unsigned int SegmentMan::calculateDownloadSpeed()

View File

@ -101,9 +101,6 @@ private:
SharedHandle<Segment> checkoutSegment(cuid_t cuid,
const SharedHandle<Piece>& piece);
SharedHandle<SegmentEntry> findSlowerSegmentEntry
(const SharedHandle<PeerStat>& peerStat);
public:
SegmentMan(const Option* option,
const SharedHandle<DownloadContext>& downloadContext,
@ -183,13 +180,7 @@ public:
* Registers given peerStat if it has not been registerd and returns true.
* Otherwise does nothing and returns false.
*/
bool registerPeerStat(const SharedHandle<PeerStat>& peerStat);
/**
* Returns peerStat whose cuid is given cuid. If it is not found, returns
* 0.
*/
SharedHandle<PeerStat> getPeerStat(cuid_t cuid) const;
void registerPeerStat(const SharedHandle<PeerStat>& peerStat);
const std::deque<SharedHandle<PeerStat> >& getPeerStats() const
{

View File

@ -2,17 +2,11 @@
#include <cppunit/extensions/HelperMacros.h>
#include "File.h"
#include "prefs.h"
#include "Util.h"
#include "DownloadContext.h"
#include "UnknownLengthPieceStorage.h"
#include "DefaultPieceStorage.h"
#include "Segment.h"
#include "Option.h"
#include "FileEntry.h"
#include "MockPieceStorage.h"
#include "PeerStat.h"
#include "PieceSelector.h"
namespace aria2 {
@ -22,8 +16,6 @@ class SegmentManTest:public CppUnit::TestFixture {
CPPUNIT_TEST_SUITE(SegmentManTest);
CPPUNIT_TEST(testNullBitfield);
CPPUNIT_TEST(testCompleteSegment);
CPPUNIT_TEST(testGetPeerStat);
CPPUNIT_TEST(testGetSegment_segmentForward);
CPPUNIT_TEST_SUITE_END();
private:
@ -89,64 +81,4 @@ void SegmentManTest::testCompleteSegment()
CPPUNIT_ASSERT_EQUAL((size_t)2, segments[1]->getIndex());
}
void SegmentManTest::testGetPeerStat()
{
Option op;
size_t pieceLength = 1;
uint64_t totalLength = 1;
SharedHandle<DownloadContext> dctx
(new DownloadContext(pieceLength, totalLength, "aria2.tar.bz2"));
SharedHandle<PieceStorage> ps(new MockPieceStorage());
SegmentMan segmentMan(&op, dctx, ps);
CPPUNIT_ASSERT(segmentMan.getPeerStat(1).isNull());
SharedHandle<PeerStat> ps1(new PeerStat(1));
CPPUNIT_ASSERT(segmentMan.registerPeerStat(ps1));
{
SharedHandle<PeerStat> ps = segmentMan.getPeerStat(1);
CPPUNIT_ASSERT(!ps.isNull());
CPPUNIT_ASSERT_EQUAL(1, ps->getCuid());
}
// Duplicate registering is not allowed.
SharedHandle<PeerStat> ps1d(new PeerStat(1));
CPPUNIT_ASSERT(!segmentMan.registerPeerStat(ps1d));
// See whether it can work on several entries.
SharedHandle<PeerStat> ps2(new PeerStat(2));
SharedHandle<PeerStat> ps3(new PeerStat(3));
CPPUNIT_ASSERT(segmentMan.registerPeerStat(ps3));
CPPUNIT_ASSERT(segmentMan.registerPeerStat(ps2));
{
SharedHandle<PeerStat> ps = segmentMan.getPeerStat(2);
CPPUNIT_ASSERT(!ps.isNull());
CPPUNIT_ASSERT_EQUAL(2, ps->getCuid());
}
}
void SegmentManTest::testGetSegment_segmentForward()
{
Option op;
size_t pieceLength = 1;
uint64_t totalLength = 1;
SharedHandle<DownloadContext> dctx
(new DownloadContext(pieceLength, totalLength, "aria2.tar.bz2"));
SharedHandle<PieceStorage> ps(new DefaultPieceStorage(dctx, &op));
SegmentMan segmentMan(&op, dctx, ps);
SharedHandle<Segment> segment = segmentMan.getSegment(1);
CPPUNIT_ASSERT(!segment.isNull());
CPPUNIT_ASSERT_EQUAL((size_t)0, segment->getIndex());
SharedHandle<PeerStat> cuid2_ps(new PeerStat(2));
CPPUNIT_ASSERT(segmentMan.registerPeerStat(cuid2_ps));
SharedHandle<Segment> segment_forwarded = segmentMan.getSegment(2);
CPPUNIT_ASSERT(!segment_forwarded.isNull());
CPPUNIT_ASSERT_EQUAL((size_t)0, segment_forwarded->getIndex());
// SegmentMan::getSegmetn(3) returns null because CUID#3's PeerStat is not
// registered and all segment(total 1 in this case) are used.
CPPUNIT_ASSERT(segmentMan.getSegment(3).isNull());
}
} // namespace aria2