2009-01-06 Tatsuhiro Tsujikawa <t-tujikawa@users.sourceforge.net>

Applied AdaptiveURISelector patch from Aurelien Lefebvre.  This
	patch adds AdaptiveURISelector, which selects one of the bests
	mirrors for first and reserved connections. For supplementary
	ones, it returns mirrors which has not been tested yet, and if
	each of them already tested, returns mirrors which has to be
	tested again. Otherwise, it doesn't return anymore mirrors.
	* src/AdaptiveURISelector.cc
	* src/AdaptiveURISelector.h
	* src/FtpNegotiationCommand.cc
	* src/HttpResponseCommand.cc
	* src/Makefile.am
	* src/Makefile.in
	* src/OptionHandlerFactory.cc
	* src/RequestGroup.cc
	* src/RequestGroup.h
	* src/RequestGroupMan.cc
	* src/ServerStat.cc
	* src/ServerStat.h
	* src/ServerStatMan.cc
	* src/SpeedCalc.cc
	* src/URISelector.h
	* src/prefs.cc
	* src/prefs.h
	* test/RequestGroupManTest.cc
	* test/ServerStatManTest.cc
	* test/ServerStatTest.cc
pull/1/head
Tatsuhiro Tsujikawa 2009-01-06 13:13:42 +00:00
parent 5325ec4155
commit 0a4f43d0ed
21 changed files with 683 additions and 34 deletions

View File

@ -1,3 +1,32 @@
2009-01-06 Tatsuhiro Tsujikawa <t-tujikawa@users.sourceforge.net>
Applied AdaptiveURISelector patch from Aurelien Lefebvre. This
patch adds AdaptiveURISelector, which selects one of the bests
mirrors for first and reserved connections. For supplementary
ones, it returns mirrors which has not been tested yet, and if
each of them already tested, returns mirrors which has to be
tested again. Otherwise, it doesn't return anymore mirrors.
* src/AdaptiveURISelector.cc
* src/AdaptiveURISelector.h
* src/FtpNegotiationCommand.cc
* src/HttpResponseCommand.cc
* src/Makefile.am
* src/Makefile.in
* src/OptionHandlerFactory.cc
* src/RequestGroup.cc
* src/RequestGroup.h
* src/RequestGroupMan.cc
* src/ServerStat.cc
* src/ServerStat.h
* src/ServerStatMan.cc
* src/SpeedCalc.cc
* src/URISelector.h
* src/prefs.cc
* src/prefs.h
* test/RequestGroupManTest.cc
* test/ServerStatManTest.cc
* test/ServerStatTest.cc
2009-01-04 Tatsuhiro Tsujikawa <t-tujikawa@users.sourceforge.net>
Fixed unit test error.

308
src/AdaptiveURISelector.cc Normal file
View File

@ -0,0 +1,308 @@
/* <!-- copyright */
/*
* aria2 - The high speed download utility
*
* Copyright (C) 2006 Tatsuhiro Tsujikawa
* Copyright (C) 2008 Aurelien Lefebvre, Mandriva
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 2 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
*
* In addition, as a special exception, the copyright holders give
* permission to link the code of portions of this program with the
* OpenSSL library under certain conditions as described in each
* individual source file, and distribute linked combinations
* including the two.
* You must obey the GNU General Public License in all respects
* for all of the code used other than OpenSSL. If you modify
* file(s) with this exception, you may extend this exception to your
* version of the file(s), but you are not obligated to do so. If you
* do not wish to do so, delete this exception statement from your
* version. If you delete this exception statement from all source
* files in the program, then also delete it here.
*/
/* copyright --> */
#include "AdaptiveURISelector.h"
#include <cstdlib>
#include <cmath>
#include <algorithm>
#include "DownloadCommand.h"
#include "DownloadContext.h"
#include "ServerStatMan.h"
#include "ServerStat.h"
#include "RequestGroup.h"
#include "LogFactory.h"
#include "A2STR.h"
#include "prefs.h"
#include "Option.h"
#include "SimpleRandomizer.h"
#include "SocketCore.h"
namespace aria2 {
/* In that URI Selector, select method returns one of the bests
* mirrors for first and reserved connections. For supplementary
* ones, it returns mirrors which has not been tested yet, and
* if each of them already tested, returns mirrors which has to
* be tested again. Otherwise, it doesn't return anymore mirrors.
*/
AdaptiveURISelector::AdaptiveURISelector
(const SharedHandle<ServerStatMan>& serverStatMan,
const SharedHandle<RequestGroup>& requestGroup):
_serverStatMan(serverStatMan),
_requestGroup(requestGroup),
_nbConnections(1),
_logger(LogFactory::getInstance())
{
const Option* op = _requestGroup->getOption();
_nbServerToEvaluate = op->getAsInt(PREF_METALINK_SERVERS) - 1;
}
AdaptiveURISelector::~AdaptiveURISelector() {}
std::string AdaptiveURISelector::select(std::deque<std::string>& uris)
{
std::string selected = selectOne(uris);
if(selected != A2STR::NIL)
uris.erase(std::find(uris.begin(), uris.end(), selected));
return selected;
}
std::string AdaptiveURISelector::selectOne(const std::deque<std::string>& uris)
{
if(uris.empty()) {
return A2STR::NIL;
} else {
const unsigned int numPieces =
_requestGroup->getDownloadContext()->getNumPieces();
bool reservedContext = numPieces > 0 &&
_nbConnections > std::min(numPieces,
_requestGroup->getNumConcurrentCommand());
bool selectBest = numPieces == 0 || reservedContext;
if(numPieces > 0)
_nbConnections++;
/* At least, 3 mirrors must be tested */
if(getNbTestedServers(uris) < 3) {
std::string notTested = getFirstNotTestedUri(uris);
if(notTested != A2STR::NIL) {
_logger->debug("AdaptiveURISelector: choosing the first non tested"
" mirror: %s", notTested.c_str());
--_nbServerToEvaluate;
return notTested;
}
}
if(!selectBest && _nbConnections > 1 && _nbServerToEvaluate > 0) {
_nbServerToEvaluate--;
std::string notTested = getFirstNotTestedUri(uris);
if(notTested != A2STR::NIL) {
/* Here we return the first untested mirror */
_logger->debug("AdaptiveURISelector: choosing non tested mirror %s for"
" connection #%d", notTested.c_str(), _nbConnections);
return notTested;
} else {
/* Here we return a mirror which need to be tested again */
std::string toReTest = getFirstToTestUri(uris);
_logger->debug("AdaptiveURISelector: choosing mirror %s which has not"
" been tested recently for connection #%d",
toReTest.c_str(), _nbConnections);
return toReTest;
}
}
else {
/* Here we return one of the bests mirrors */
unsigned int max = getMaxDownloadSpeed(uris);
unsigned int min = max-(int)(max*0.25);
std::deque<std::string> bests = getUrisBySpeed(uris, min);
if (bests.size() < 2) {
std::string uri = getMaxDownloadSpeedUri(uris);
_logger->debug("AdaptiveURISelector: choosing the best mirror :"
" %.2fKB/s %s (other mirrors are at least 25%% slower)",
(float) max/1024, uri.c_str());
return uri;
} else {
std::string uri = selectRandomUri(bests);
_logger->debug("AdaptiveURISelector: choosing randomly one of the best"
" mirrors (range [%.2fKB/s, %.2fKB/s]): %s",
(float) min/1024, (float) max/1024, uri.c_str());
return uri;
}
}
}
}
void AdaptiveURISelector::resetCounters()
{
const Option* op = _requestGroup->getOption();
_nbConnections = 1;
_nbServerToEvaluate = op->getAsInt(PREF_METALINK_SERVERS) - 1;
}
void AdaptiveURISelector::tuneDownloadCommand
(const std::deque<std::string>& uris, DownloadCommand* command)
{
adjustLowestSpeedLimit(uris, command);
}
void AdaptiveURISelector::adjustLowestSpeedLimit
(const std::deque<std::string>& uris, DownloadCommand* command) const
{
const Option* op = _requestGroup->getOption();
unsigned int lowest = op->getAsInt(PREF_LOWEST_SPEED_LIMIT);
if (lowest > 0) {
unsigned int low_lowest = 4 * 1024;
unsigned int max = getMaxDownloadSpeed(uris);
if (max > 0 && lowest > max / 4) {
_logger->notice("Lowering lowest-speed-limit since known max speed is too"
" near (new:%d was:%d max:%d)", max / 4, lowest, max);
command->setLowestDownloadSpeedLimit(max / 4);
} else if (max == 0 && lowest > low_lowest) {
_logger->notice("Lowering lowest-speed-limit since we have no clue about"
" available speed (now:%d was:%d)", low_lowest, lowest);
command->setLowestDownloadSpeedLimit(low_lowest);
}
}
}
static unsigned int getUriMaxSpeed(SharedHandle<ServerStat> ss)
{
return std::max(ss->getSingleConnectionAvgSpeed(),
ss->getMultiConnectionAvgSpeed());
}
unsigned int AdaptiveURISelector::getMaxDownloadSpeed
(const std::deque<std::string>& uris) const
{
std::string uri = getMaxDownloadSpeedUri(uris);
if(uri == A2STR::NIL)
return 0;
return getUriMaxSpeed(getServerStats(uri));
}
std::string AdaptiveURISelector::getMaxDownloadSpeedUri
(const std::deque<std::string>& uris) const
{
int max = -1;
std::string uri = A2STR::NIL;
for(std::deque<std::string>::const_iterator i = uris.begin();
i != uris.end(); ++i) {
SharedHandle<ServerStat> ss = getServerStats(*i);
if(ss.isNull())
continue;
if((int)ss->getSingleConnectionAvgSpeed() > max) {
max = ss->getSingleConnectionAvgSpeed();
uri = (*i);
}
if((int)ss->getMultiConnectionAvgSpeed() > max) {
max = ss->getMultiConnectionAvgSpeed();
uri = (*i);
}
}
return uri;
}
std::deque<std::string> AdaptiveURISelector::getUrisBySpeed
(const std::deque<std::string>& uris, unsigned int min) const
{
std::deque<std::string> bests;
for(std::deque<std::string>::const_iterator i = uris.begin();
i != uris.end(); ++i) {
SharedHandle<ServerStat> ss = getServerStats(*i);
if(ss.isNull())
continue;
if(ss->getSingleConnectionAvgSpeed() > min ||
ss->getMultiConnectionAvgSpeed() > min) {
bests.push_back(*i);
}
}
return bests;
}
std::string AdaptiveURISelector::selectRandomUri
(const std::deque<std::string>& uris) const
{
int pos = SimpleRandomizer::getInstance()->getRandomNumber(uris.size());
std::deque<std::string>::const_iterator i = uris.begin();
i = i+pos;
return *i;
}
std::string AdaptiveURISelector::getFirstNotTestedUri
(const std::deque<std::string>& uris) const
{
for(std::deque<std::string>::const_iterator i = uris.begin();
i != uris.end(); ++i) {
SharedHandle<ServerStat> ss = getServerStats(*i);
if(ss.isNull())
return *i;
}
return A2STR::NIL;
}
std::string AdaptiveURISelector::getFirstToTestUri
(const std::deque<std::string>& uris) const
{
unsigned int counter;
int power;
for(std::deque<std::string>::const_iterator i = uris.begin();
i != uris.end(); ++i) {
SharedHandle<ServerStat> ss = getServerStats(*i);
if(ss.isNull())
continue;
counter = ss->getCounter();
if(counter > 8)
continue;
power = (int)pow(2.0, (float)counter);
/* We test the mirror another time if it has not been
* tested since 2^counter days */
if(ss->getLastUpdated().difference() > power*24*60*60) {
return *i;
}
}
return A2STR::NIL;
}
SharedHandle<ServerStat> AdaptiveURISelector::getServerStats
(const std::string& uri) const
{
Request r;
r.setUrl(uri);
return _serverStatMan->find(r.getHost(), r.getProtocol());
}
unsigned int AdaptiveURISelector::getNbTestedServers
(const std::deque<std::string>& uris) const
{
unsigned int counter = 0;
for(std::deque<std::string>::const_iterator i = uris.begin();
i != uris.end(); ++i) {
SharedHandle<ServerStat> ss = getServerStats(*i);
if(ss.isNull())
counter++;
}
return uris.size() - counter;
}
} // namespace aria2

83
src/AdaptiveURISelector.h Normal file
View File

@ -0,0 +1,83 @@
/* <!-- copyright */
/*
* aria2 - The high speed download utility
*
* Copyright (C) 2006 Tatsuhiro Tsujikawa
* Copyright (C) 2008 Aurelien Lefebvre, Mandriva
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 2 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
*
* In addition, as a special exception, the copyright holders give
* permission to link the code of portions of this program with the
* OpenSSL library under certain conditions as described in each
* individual source file, and distribute linked combinations
* including the two.
* You must obey the GNU General Public License in all respects
* for all of the code used other than OpenSSL. If you modify
* file(s) with this exception, you may extend this exception to your
* version of the file(s), but you are not obligated to do so. If you
* do not wish to do so, delete this exception statement from your
* version. If you delete this exception statement from all source
* files in the program, then also delete it here.
*/
/* copyright --> */
#ifndef _D_ADAPTIVE_URI_SELECTOR_H_
#define _D_ADAPTIVE_URI_SELECTOR_H_
#include "URISelector.h"
#include "SharedHandle.h"
namespace aria2 {
class ServerStatMan;
class RequestGroup;
class ServerStat;
class Logger;
class AdaptiveURISelector:public URISelector {
private:
SharedHandle<ServerStatMan> _serverStatMan;
SharedHandle<RequestGroup> _requestGroup;
unsigned int _nbServerToEvaluate;
unsigned int _nbConnections;
Logger* _logger;
std::string selectOne(const std::deque<std::string>& uris);
void adjustLowestSpeedLimit(const std::deque<std::string>& uris,
DownloadCommand* command) const;
unsigned int getMaxDownloadSpeed(const std::deque<std::string>& uris) const;
std::string getMaxDownloadSpeedUri(const std::deque<std::string>& uris) const;
std::deque<std::string> getUrisBySpeed(const std::deque<std::string>& uris,
unsigned int min) const;
std::string selectRandomUri(const std::deque<std::string>& uris) const;
std::string getFirstNotTestedUri(const std::deque<std::string>& uris) const;
std::string getFirstToTestUri(const std::deque<std::string>& uris) const;
SharedHandle<ServerStat> getServerStats(const std::string& uri) const;
unsigned int getNbTestedServers(const std::deque<std::string>& uris) const;
public:
AdaptiveURISelector(const SharedHandle<ServerStatMan>& serverStatMan,
const SharedHandle<RequestGroup>& requestGroup);
virtual ~AdaptiveURISelector();
virtual std::string select(std::deque<std::string>& uris);
virtual void tuneDownloadCommand(const std::deque<std::string>& uris,
DownloadCommand* command);
virtual void resetCounters();
};
} // namespace aria2
#endif // _D_ADAPTIVE_URI_SELECTOR_H_

View File

@ -106,6 +106,7 @@ bool FtpNegotiationCommand::executeInternal() {
_requestGroup->removeURIWhoseHostnameIs(sv->getHostname());
}
}
_requestGroup->tuneDownloadCommand(command);
e->commands.push_back(command);
return true;
} else if(sequence == SEQ_HEAD_OK || sequence == SEQ_DOWNLOAD_ALREADY_COMPLETED) {

View File

@ -348,6 +348,8 @@ HttpDownloadCommand* HttpResponseCommand::createHttpDownloadCommand
_requestGroup->setFileAllocationEnabled(false);
}
_requestGroup->tuneDownloadCommand(command);
return command;
}

View File

@ -188,6 +188,7 @@ SRCS = Socket.h\
ServerStat.cc ServerStat.h\
ServerStatMan.cc ServerStatMan.h\
URISelector.h\
AdaptiveURISelector.cc AdaptiveURISelector.h\
InOrderURISelector.cc InOrderURISelector.h\
ServerStatURISelector.cc ServerStatURISelector.h\
NsCookieParser.cc NsCookieParser.h\

View File

@ -405,7 +405,8 @@ am__libaria2c_a_SOURCES_DIST = Socket.h SocketCore.cc SocketCore.h \
A2STR.cc A2STR.h RarestPieceSelector.cc RarestPieceSelector.h \
Decoder.h ChunkedDecoder.cc ChunkedDecoder.h Signature.cc \
Signature.h ServerStat.cc ServerStat.h ServerStatMan.cc \
ServerStatMan.h URISelector.h InOrderURISelector.cc \
ServerStatMan.h URISelector.h AdaptiveURISelector.cc \
AdaptiveURISelector.h InOrderURISelector.cc \
InOrderURISelector.h ServerStatURISelector.cc \
ServerStatURISelector.h NsCookieParser.cc NsCookieParser.h \
CookieStorage.cc CookieStorage.h SocketBuffer.cc \
@ -797,17 +798,17 @@ am__objects_21 = SocketCore.$(OBJEXT) Command.$(OBJEXT) \
FtpFinishDownloadCommand.$(OBJEXT) A2STR.$(OBJEXT) \
RarestPieceSelector.$(OBJEXT) ChunkedDecoder.$(OBJEXT) \
Signature.$(OBJEXT) ServerStat.$(OBJEXT) \
ServerStatMan.$(OBJEXT) InOrderURISelector.$(OBJEXT) \
ServerStatURISelector.$(OBJEXT) NsCookieParser.$(OBJEXT) \
CookieStorage.$(OBJEXT) SocketBuffer.$(OBJEXT) \
OptionHandlerException.$(OBJEXT) bencode.$(OBJEXT) \
$(am__objects_1) $(am__objects_2) $(am__objects_3) \
$(am__objects_4) $(am__objects_5) $(am__objects_6) \
$(am__objects_7) $(am__objects_8) $(am__objects_9) \
$(am__objects_10) $(am__objects_11) $(am__objects_12) \
$(am__objects_13) $(am__objects_14) $(am__objects_15) \
$(am__objects_16) $(am__objects_17) $(am__objects_18) \
$(am__objects_19) $(am__objects_20)
ServerStatMan.$(OBJEXT) AdaptiveURISelector.$(OBJEXT) \
InOrderURISelector.$(OBJEXT) ServerStatURISelector.$(OBJEXT) \
NsCookieParser.$(OBJEXT) CookieStorage.$(OBJEXT) \
SocketBuffer.$(OBJEXT) OptionHandlerException.$(OBJEXT) \
bencode.$(OBJEXT) $(am__objects_1) $(am__objects_2) \
$(am__objects_3) $(am__objects_4) $(am__objects_5) \
$(am__objects_6) $(am__objects_7) $(am__objects_8) \
$(am__objects_9) $(am__objects_10) $(am__objects_11) \
$(am__objects_12) $(am__objects_13) $(am__objects_14) \
$(am__objects_15) $(am__objects_16) $(am__objects_17) \
$(am__objects_18) $(am__objects_19) $(am__objects_20)
am_libaria2c_a_OBJECTS = $(am__objects_21)
libaria2c_a_OBJECTS = $(am_libaria2c_a_OBJECTS)
am__installdirs = "$(DESTDIR)$(bindir)"
@ -1126,7 +1127,8 @@ SRCS = Socket.h SocketCore.cc SocketCore.h BinaryStream.h Command.cc \
A2STR.cc A2STR.h RarestPieceSelector.cc RarestPieceSelector.h \
Decoder.h ChunkedDecoder.cc ChunkedDecoder.h Signature.cc \
Signature.h ServerStat.cc ServerStat.h ServerStatMan.cc \
ServerStatMan.h URISelector.h InOrderURISelector.cc \
ServerStatMan.h URISelector.h AdaptiveURISelector.cc \
AdaptiveURISelector.h InOrderURISelector.cc \
InOrderURISelector.h ServerStatURISelector.cc \
ServerStatURISelector.h NsCookieParser.cc NsCookieParser.h \
CookieStorage.cc CookieStorage.h SocketBuffer.cc \
@ -1237,6 +1239,7 @@ distclean-compile:
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/AbstractProxyResponseCommand.Po@am__quote@
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/AbstractSingleDiskAdaptor.Po@am__quote@
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/ActivePeerConnectionCommand.Po@am__quote@
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/AdaptiveURISelector.Po@am__quote@
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/AnnounceList.Po@am__quote@
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/AsyncNameResolver.Po@am__quote@
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/AuthConfig.Po@am__quote@

View File

@ -417,7 +417,7 @@ OptionHandlers OptionHandlerFactory::createOptionHandlers()
handlers.push_back(op);
}
{
const std::string params[] = { V_INORDER, V_FEEDBACK };
const std::string params[] = { V_INORDER, V_FEEDBACK, V_ADAPTIVE };
SharedHandle<OptionHandler> op(new ParameterOptionHandler
(PREF_URI_SELECTOR,
TEXT_URI_SELECTOR,

View File

@ -521,6 +521,8 @@ void RequestGroup::createNextCommand(std::deque<Command*>& commands,
std::deque<std::string> pendingURIs;
for(; !_uris.empty() && numCommand--; ) {
std::string uri = _uriSelector->select(_uris);
if(uri.empty())
continue;
RequestHandle req(new Request());
if(req->setUrl(uri)) {
ServerHostHandle sv;
@ -987,6 +989,7 @@ void RequestGroup::reportDownloadFinished()
{
_logger->notice(MSG_FILE_DOWNLOAD_COMPLETED,
getFilePath().c_str());
_uriSelector->resetCounters();
#ifdef ENABLE_BITTORRENT
SharedHandle<BtContext> ctx = dynamic_pointer_cast<BtContext>(_downloadContext);
if(!ctx.isNull()) {
@ -1089,4 +1092,9 @@ bool RequestGroup::inMemoryDownload() const
return _inMemoryDownload;
}
void RequestGroup::tuneDownloadCommand(DownloadCommand* command)
{
_uriSelector->tuneDownloadCommand(_uris, command);
}
} // namespace aria2

View File

@ -51,6 +51,7 @@ class DownloadEngine;
class SegmentMan;
class SegmentManFactory;
class Command;
class DownloadCommand;
class DownloadContext;
class PieceStorage;
class BtProgressInfoFile;
@ -405,6 +406,8 @@ public:
// Returns inMemoryDownload flag.
bool inMemoryDownload() const;
void tuneDownloadCommand(DownloadCommand* command);
};
typedef SharedHandle<RequestGroup> RequestGroupHandle;

View File

@ -57,6 +57,7 @@
#include "SegmentMan.h"
#include "ServerStatURISelector.h"
#include "InOrderURISelector.h"
#include "AdaptiveURISelector.h"
#include "Option.h"
#include "prefs.h"
#include "File.h"
@ -205,15 +206,26 @@ public:
if(!group->getSegmentMan().isNull()) {
const std::deque<SharedHandle<PeerStat> >& peerStats =
group->getSegmentMan()->getPeerStats();
for(std::deque<SharedHandle<PeerStat> >::const_iterator i =
peerStats.begin(); i != peerStats.end(); ++i) {
if((*i)->getHostname().empty() || (*i)->getProtocol().empty()) {
continue;
}
int speed = (*i)->getAvgDownloadSpeed();
if (speed == 0) continue;
SharedHandle<ServerStat> ss =
_requestGroupMan->getOrCreateServerStat((*i)->getHostname(),
(*i)->getProtocol());
ss->updateDownloadSpeed((*i)->getAvgDownloadSpeed());
ss->increaseCounter();
ss->updateDownloadSpeed(speed);
if(peerStats.size() == 1) {
ss->updateSingleConnectionAvgSpeed(speed);
}
else {
ss->updateMultiConnectionAvgSpeed(speed);
}
}
}
}
@ -265,6 +277,10 @@ void RequestGroupMan::configureRequestGroup
} else if(uriSelectorValue == V_INORDER) {
requestGroup->setURISelector
(SharedHandle<URISelector>(new InOrderURISelector()));
} else if(uriSelectorValue == V_ADAPTIVE) {
requestGroup->setURISelector
(SharedHandle<URISelector>(new AdaptiveURISelector(_serverStatMan,
requestGroup)));
}
}

View File

@ -33,10 +33,13 @@
*/
/* copyright --> */
#include "ServerStat.h"
#include "array_fun.h"
#include <ostream>
#include <algorithm>
#include "array_fun.h"
#include "LogFactory.h"
namespace aria2 {
const std::string ServerStat::STATUS_STRING[] = {
@ -49,7 +52,12 @@ ServerStat::ServerStat(const std::string& hostname, const std::string& protocol)
_hostname(hostname),
_protocol(protocol),
_downloadSpeed(0),
_status(OK) {}
_singleConnectionAvgSpeed(0),
_multiConnectionAvgSpeed(0),
_counter(0),
_status(OK),
_logger(LogFactory::getInstance())
{}
ServerStat::~ServerStat() {}
@ -92,6 +100,94 @@ void ServerStat::updateDownloadSpeed(unsigned int downloadSpeed)
_lastUpdated.reset();
}
unsigned int ServerStat::getSingleConnectionAvgSpeed() const
{
return _singleConnectionAvgSpeed;
}
void ServerStat::setSingleConnectionAvgSpeed
(unsigned int singleConnectionAvgSpeed)
{
_singleConnectionAvgSpeed = singleConnectionAvgSpeed;
}
void ServerStat::updateSingleConnectionAvgSpeed(unsigned int downloadSpeed)
{
float avgDownloadSpeed;
if(_counter == 0)
return;
if(_counter < 5) {
avgDownloadSpeed =
((((float)_counter-1)/(float)_counter)*(float)_singleConnectionAvgSpeed)+
((1.0/(float)_counter)*(float)downloadSpeed);
}
else {
avgDownloadSpeed = ((4.0/5.0)*(float)_singleConnectionAvgSpeed) +
((1.0/5.0)*(float)downloadSpeed);
}
if(avgDownloadSpeed < (int)(0.80*_singleConnectionAvgSpeed)) {
_logger->debug("ServerStat:%s: resetting counter since single connection"
" speed dropped", getHostname().c_str());
_counter = 0;
}
_logger->debug("ServerStat:%s: _singleConnectionAvgSpeed old:%.2fKB/s"
" new:%.2fKB/s last:%.2fKB/s",
getHostname().c_str(),
(float) _singleConnectionAvgSpeed/1024,
(float) avgDownloadSpeed/1024,
(float) downloadSpeed / 1024);
_singleConnectionAvgSpeed = (int)avgDownloadSpeed;
}
unsigned int ServerStat::getMultiConnectionAvgSpeed() const
{
return _multiConnectionAvgSpeed;
}
void ServerStat::setMultiConnectionAvgSpeed
(unsigned int multiConnectionAvgSpeed)
{
_multiConnectionAvgSpeed = multiConnectionAvgSpeed;
}
void ServerStat::updateMultiConnectionAvgSpeed(unsigned int downloadSpeed)
{
float avgDownloadSpeed;
if(_counter == 0)
return;
if(_counter < 5) {
avgDownloadSpeed =
((((float)_counter-1)/(float)_counter)*(float)_multiConnectionAvgSpeed) +
((1.0/(float)_counter)*(float)downloadSpeed);
}
else {
avgDownloadSpeed = ((4.0/5.0)*(float)_multiConnectionAvgSpeed) +
((1.0/5.0)*(float)downloadSpeed);
}
_logger->debug("ServerStat:%s: _multiConnectionAvgSpeed old:%.2fKB/s"
" new:%.2fKB/s last:%.2fKB/s",
getHostname().c_str(),
(float) _multiConnectionAvgSpeed/1024,
(float) avgDownloadSpeed/1024,
(float) downloadSpeed / 1024);
_multiConnectionAvgSpeed = (int)avgDownloadSpeed;
}
unsigned int ServerStat::getCounter() const
{
return _counter;
}
void ServerStat::increaseCounter()
{
_counter++;
}
void ServerStat::setCounter(unsigned int value)
{
_counter = value;
}
void ServerStat::setStatus(STATUS status)
{
_status = status;
@ -160,7 +256,10 @@ std::ostream& operator<<(std::ostream& o, const ServerStat& serverStat)
o << "host=" << serverStat.getHostname() << ", "
<< "protocol=" << serverStat.getProtocol() << ", "
<< "dl_speed=" << serverStat.getDownloadSpeed() << ", "
<< "sc_avg_speed=" << serverStat.getSingleConnectionAvgSpeed() << ", "
<< "mc_avg_speed=" << serverStat.getMultiConnectionAvgSpeed() << ", "
<< "last_updated=" << serverStat.getLastUpdated().getTime() << ", "
<< "counter=" << serverStat.getCounter() << ", "
<< "status=" << ServerStat::STATUS_STRING[serverStat.getStatus()];
return o;
}

View File

@ -35,12 +35,16 @@
#ifndef _D_SERVER_STAT_H_
#define _D_SERVER_STAT_H_
#include "common.h"
#include "TimeA2.h"
#include <string>
#include <iosfwd>
#include "TimeA2.h"
namespace aria2 {
class Logger;
// ServerStatMan: has many ServerStat
// URISelector: interface
// ServerStatURISelector: Has a reference of ServerStatMan
@ -75,6 +79,18 @@ public:
// set download speed. This method doesn't update _lastUpdate.
void setDownloadSpeed(unsigned int downloadSpeed);
unsigned int getSingleConnectionAvgSpeed() const;
void updateSingleConnectionAvgSpeed(unsigned int downloadSpeed);
void setSingleConnectionAvgSpeed(unsigned int singleConnectionAvgSpeed);
unsigned int getMultiConnectionAvgSpeed() const;
void updateMultiConnectionAvgSpeed(unsigned int downloadSpeed);
void setMultiConnectionAvgSpeed(unsigned int singleConnectionAvgSpeed);
unsigned int getCounter() const;
void increaseCounter();
void setCounter(unsigned int value);
// This method doesn't update _lastUpdate.
void setStatus(STATUS status);
@ -104,6 +120,14 @@ private:
std::string _protocol;
unsigned int _downloadSpeed;
unsigned int _singleConnectionAvgSpeed;
unsigned int _multiConnectionAvgSpeed;
unsigned int _counter;
Logger* _logger;
STATUS _status;

View File

@ -86,7 +86,10 @@ bool ServerStatMan::load(std::istream& in)
static const std::string S_HOST = "host";
static const std::string S_PROTOCOL = "protocol";
static const std::string S_DL_SPEED = "dl_speed";
static const std::string S_SC_AVG_SPEED = "sc_avg_speed";
static const std::string S_MC_AVG_SPEED = "mc_avg_speed";
static const std::string S_LAST_UPDATED = "last_updated";
static const std::string S_COUNTER = "counter";
static const std::string S_STATUS = "status";
std::string line;
@ -111,6 +114,18 @@ bool ServerStatMan::load(std::istream& in)
SharedHandle<ServerStat> sstat(new ServerStat(m[S_HOST], m[S_PROTOCOL]));
try {
sstat->setDownloadSpeed(Util::parseUInt(m[S_DL_SPEED]));
// Old serverstat file doesn't contains SC_AVG_SPEED
if(m.find(S_SC_AVG_SPEED) != m.end()) {
sstat->setSingleConnectionAvgSpeed(Util::parseUInt(m[S_SC_AVG_SPEED]));
}
// Old serverstat file doesn't contains MC_AVG_SPEED
if(m.find(S_MC_AVG_SPEED) != m.end()) {
sstat->setMultiConnectionAvgSpeed(Util::parseUInt(m[S_MC_AVG_SPEED]));
}
// Old serverstat file doesn't contains COUNTER_SPEED
if(m.find(S_COUNTER) != m.end()) {
sstat->setCounter(Util::parseUInt(m[S_COUNTER]));
}
sstat->setLastUpdated(Time(Util::parseInt(m[S_LAST_UPDATED])));
sstat->setStatus(m[S_STATUS]);
add(sstat);

View File

@ -115,7 +115,9 @@ void SpeedCalc::changeSw() {
unsigned int SpeedCalc::calculateAvgSpeed() const {
uint64_t milliElapsed = start.differenceInMillis();
if(milliElapsed) {
// if milliElapsed is too small, the average speed is rubish, better return 0
if(milliElapsed > 4) {
unsigned int speed = accumulatedLength*1000/milliElapsed;
return speed;
} else {

View File

@ -40,11 +40,18 @@
namespace aria2 {
class DownloadCommand;
class URISelector {
public:
virtual ~URISelector() {}
virtual std::string select(std::deque<std::string>& uris) = 0;
virtual void tuneDownloadCommand(std::deque<std::string>& uris,
DownloadCommand* command) {};
virtual void resetCounters() { return; };
};
} // namespace aria2

View File

@ -131,10 +131,11 @@ const std::string V_INFO("info");
const std::string V_NOTICE("notice");
const std::string V_WARN("warn");
const std::string V_ERROR("error");
// value: inorder | feedback
// value: inorder | feedback | adaptive
const std::string PREF_URI_SELECTOR("uri-selector");
const std::string V_INORDER("inorder");
const std::string V_FEEDBACK("feedback");
const std::string V_ADAPTIVE("adaptive");
// value: 1*digit
const std::string PREF_SERVER_STAT_TIMEOUT("server-stat-timeout");
// value: string that your file system recognizes as a file name.

View File

@ -135,10 +135,11 @@ extern const std::string V_INFO;
extern const std::string V_NOTICE;
extern const std::string V_WARN;
extern const std::string V_ERROR;
// value: inorder | feedback
// value: inorder | feedback | adaptive
extern const std::string PREF_URI_SELECTOR;
extern const std::string V_INORDER;
extern const std::string V_FEEDBACK;
extern const std::string V_ADAPTIVE;
// value: 1*digit
extern const std::string PREF_SERVER_STAT_TIMEOUT;
// value: string that your file system recognizes as a file name.

View File

@ -1,4 +1,9 @@
#include "RequestGroupMan.h"
#include <fstream>
#include <cppunit/extensions/HelperMacros.h>
#include "prefs.h"
#include "SingleFileDownloadContext.h"
#include "RequestGroup.h"
@ -8,8 +13,6 @@
#include "ServerStatMan.h"
#include "ServerStat.h"
#include "File.h"
#include <fstream>
#include <cppunit/extensions/HelperMacros.h>
namespace aria2 {
@ -97,6 +100,7 @@ void RequestGroupManTest::testLoadServerStat()
Option option;
RequestGroupMan rm(std::deque<SharedHandle<RequestGroup> >(), 0, &option);
std::cerr << "testLoadServerStat" << std::endl;
CPPUNIT_ASSERT(rm.loadServerStat(f.getPath()));
SharedHandle<ServerStat> ss_localhost = rm.findServerStat("localhost",
"http");

View File

@ -1,10 +1,13 @@
#include "ServerStatMan.h"
#include <iostream>
#include <sstream>
#include <cppunit/extensions/HelperMacros.h>
#include "ServerStat.h"
#include "Exception.h"
#include "Util.h"
#include <iostream>
#include <sstream>
#include <cppunit/extensions/HelperMacros.h>
namespace aria2 {
@ -58,6 +61,9 @@ void ServerStatManTest::testSave()
{
SharedHandle<ServerStat> localhost_http(new ServerStat("localhost", "http"));
localhost_http->setDownloadSpeed(25000);
localhost_http->setSingleConnectionAvgSpeed(100);
localhost_http->setMultiConnectionAvgSpeed(101);
localhost_http->setCounter(5);
localhost_http->setLastUpdated(Time(1210000000));
SharedHandle<ServerStat> localhost_ftp(new ServerStat("localhost", "ftp"));
localhost_ftp->setDownloadSpeed(30000);
@ -77,9 +83,29 @@ void ServerStatManTest::testSave()
std::string out = ss.str();
CPPUNIT_ASSERT_EQUAL
(std::string
("host=localhost, protocol=ftp, dl_speed=30000, last_updated=1210000001, status=OK\n"
"host=localhost, protocol=http, dl_speed=25000, last_updated=1210000000, status=OK\n"
"host=mirror, protocol=http, dl_speed=0, last_updated=1210000002, status=ERROR\n"),
("host=localhost, protocol=ftp,"
" dl_speed=30000,"
" sc_avg_speed=0,"
" mc_avg_speed=0,"
" last_updated=1210000001,"
" counter=0,"
" status=OK\n"
"host=localhost, protocol=http,"
" dl_speed=25000,"
" sc_avg_speed=100,"
" mc_avg_speed=101,"
" last_updated=1210000000,"
" counter=5,"
" status=OK\n"
"host=mirror, protocol=http,"
" dl_speed=0,"
" sc_avg_speed=0,"
" mc_avg_speed=0,"
" last_updated=1210000002,"
" counter=0,"
" status=ERROR\n"),
out);
}
@ -87,7 +113,7 @@ void ServerStatManTest::testLoad()
{
std::string in =
"host=localhost, protocol=ftp, dl_speed=30000, last_updated=1210000001, status=OK\n"
"host=localhost, protocol=http, dl_speed=25000, last_updated=1210000000, status=OK\n"
"host=localhost, protocol=http, dl_speed=25000, sc_avg_speed=101, mc_avg_speed=102, last_updated=1210000000, counter=6, status=OK\n"
"host=mirror, protocol=http, dl_speed=0, last_updated=1210000002, status=ERROR\n";
std::stringstream ss(in);
@ -101,6 +127,12 @@ void ServerStatManTest::testLoad()
CPPUNIT_ASSERT_EQUAL(std::string("http"), localhost_http->getProtocol());
CPPUNIT_ASSERT_EQUAL(static_cast<unsigned int>(25000),
localhost_http->getDownloadSpeed());
CPPUNIT_ASSERT_EQUAL(static_cast<unsigned int>(101),
localhost_http->getSingleConnectionAvgSpeed());
CPPUNIT_ASSERT_EQUAL(static_cast<unsigned int>(102),
localhost_http->getMultiConnectionAvgSpeed());
CPPUNIT_ASSERT_EQUAL(static_cast<unsigned int>(6),
localhost_http->getCounter());
CPPUNIT_ASSERT_EQUAL(static_cast<time_t>(1210000000),
localhost_http->getLastUpdated().getTime());
CPPUNIT_ASSERT_EQUAL(ServerStat::OK, localhost_http->getStatus());

View File

@ -1,10 +1,13 @@
#include "ServerStat.h"
#include "Exception.h"
#include "Util.h"
#include <iostream>
#include <sstream>
#include <cppunit/extensions/HelperMacros.h>
#include "Exception.h"
#include "Util.h"
namespace aria2 {
class ServerStatTest:public CppUnit::TestFixture {
@ -46,6 +49,9 @@ void ServerStatTest::testOperatorOstream()
ServerStat localhost_http("localhost", "http");
localhost_http.setDownloadSpeed(90000);
localhost_http.setLastUpdated(Time(1000));
localhost_http.setSingleConnectionAvgSpeed(101);
localhost_http.setMultiConnectionAvgSpeed(102);
localhost_http.setCounter(5);
std::stringstream ss;
@ -53,7 +59,9 @@ void ServerStatTest::testOperatorOstream()
CPPUNIT_ASSERT_EQUAL
(std::string
("host=localhost, protocol=http, dl_speed=90000, last_updated=1000, status=OK"),
("host=localhost, protocol=http, dl_speed=90000,"
" sc_avg_speed=101, mc_avg_speed=102,"
" last_updated=1000, counter=5, status=OK"),
ss.str());
ss.str("");
@ -67,7 +75,9 @@ void ServerStatTest::testOperatorOstream()
CPPUNIT_ASSERT_EQUAL
(std::string
("host=localhost, protocol=ftp, dl_speed=10000, last_updated=1210000000, status=ERROR"),
("host=localhost, protocol=ftp, dl_speed=10000,"
" sc_avg_speed=0, mc_avg_speed=0,"
" last_updated=1210000000, counter=0, status=ERROR"),
ss.str());
}