Merge branch 'oliviercommelarbre-optimize-concurrent-downloads'

pull/596/head
Tatsuhiro Tsujikawa 2016-03-16 21:26:15 +09:00
commit 1d9a4a8aec
15 changed files with 252 additions and 10 deletions

View File

@ -42,6 +42,7 @@
#include "LogFactory.h" #include "LogFactory.h"
#include "DownloadContext.h" #include "DownloadContext.h"
#include "fmt.h" #include "fmt.h"
#include "wallclock.h"
namespace aria2 { namespace aria2 {
@ -80,6 +81,17 @@ bool FillRequestGroupCommand::execute()
} }
} }
e_->addRoutineCommand(std::unique_ptr<Command>(this)); e_->addRoutineCommand(std::unique_ptr<Command>(this));
// let's make sure we come back here every second or so
// if we use the optimize-concurrent-download option
if (rgman->getOptimizeConcurrentDownloads()) {
const auto& now = global::wallclock();
if (std::chrono::duration_cast<std::chrono::seconds>(lastExecTime.difference(now)) >= 1_s) {
lastExecTime = now;
rgman->requestQueueCheck();
}
}
return false; return false;
} }

View File

@ -37,6 +37,7 @@
#include "Command.h" #include "Command.h"
#include "a2time.h" #include "a2time.h"
#include "TimerA2.h"
namespace aria2 { namespace aria2 {
@ -46,6 +47,7 @@ class DownloadEngine;
class FillRequestGroupCommand : public Command { class FillRequestGroupCommand : public Command {
private: private:
DownloadEngine* e_; DownloadEngine* e_;
Timer lastExecTime;
public: public:
FillRequestGroupCommand(cuid_t cuid, DownloadEngine* e); FillRequestGroupCommand(cuid_t cuid, DownloadEngine* e);

View File

@ -56,6 +56,11 @@ int NetStat::calculateDownloadSpeed()
return downloadSpeed_.calculateSpeed(); return downloadSpeed_.calculateSpeed();
} }
int NetStat::calculateNewestDownloadSpeed(int seconds)
{
return downloadSpeed_.calculateNewestSpeed(seconds);
}
int NetStat::calculateAvgDownloadSpeed() int NetStat::calculateAvgDownloadSpeed()
{ {
return avgDownloadSpeed_ = downloadSpeed_.calculateAvgSpeed(); return avgDownloadSpeed_ = downloadSpeed_.calculateAvgSpeed();
@ -63,6 +68,11 @@ int NetStat::calculateAvgDownloadSpeed()
int NetStat::calculateUploadSpeed() { return uploadSpeed_.calculateSpeed(); } int NetStat::calculateUploadSpeed() { return uploadSpeed_.calculateSpeed(); }
int NetStat::calculateNewestUploadSpeed(int seconds)
{
return uploadSpeed_.calculateNewestSpeed(seconds);
}
int NetStat::calculateAvgUploadSpeed() int NetStat::calculateAvgUploadSpeed()
{ {
return avgUploadSpeed_ = uploadSpeed_.calculateAvgSpeed(); return avgUploadSpeed_ = uploadSpeed_.calculateAvgSpeed();

View File

@ -61,10 +61,14 @@ public:
*/ */
int calculateDownloadSpeed(); int calculateDownloadSpeed();
int calculateNewestDownloadSpeed(int seconds);
int calculateAvgDownloadSpeed(); int calculateAvgDownloadSpeed();
int calculateUploadSpeed(); int calculateUploadSpeed();
int calculateNewestUploadSpeed(int seconds);
int calculateAvgUploadSpeed(); int calculateAvgUploadSpeed();
void updateDownload(size_t bytes); void updateDownload(size_t bytes);

View File

@ -416,6 +416,16 @@ std::vector<OptionHandler*> OptionHandlerFactory::createOptionHandlers()
op->setChangeGlobalOption(true); op->setChangeGlobalOption(true);
handlers.push_back(op); handlers.push_back(op);
} }
{
OptionHandler* op(new OptimizeConcurrentDownloadsOptionHandler
(PREF_OPTIMIZE_CONCURRENT_DOWNLOADS,
TEXT_OPTIMIZE_CONCURRENT_DOWNLOADS,
A2_V_FALSE,
OptionHandler::OPT_ARG));
op->addTag(TAG_BASIC);
op->setChangeGlobalOption(true);
handlers.push_back(op);
}
{ {
OptionHandler* op(new NumberOptionHandler(PREF_MAX_CONNECTION_PER_SERVER, OptionHandler* op(new NumberOptionHandler(PREF_MAX_CONNECTION_PER_SERVER,
TEXT_MAX_CONNECTION_PER_SERVER, TEXT_MAX_CONNECTION_PER_SERVER,

View File

@ -586,6 +586,61 @@ std::string PrioritizePieceOptionHandler::createPossibleValuesString() const
return "head[=SIZE], tail[=SIZE]"; return "head[=SIZE], tail[=SIZE]";
} }
OptimizeConcurrentDownloadsOptionHandler::OptimizeConcurrentDownloadsOptionHandler(
PrefPtr pref, const char* description, const std::string& defaultValue, char shortName)
: AbstractOptionHandler(pref, description, defaultValue,
OptionHandler::OPT_ARG, shortName)
{
}
void OptimizeConcurrentDownloadsOptionHandler::parseArg(Option& option,
const std::string& optarg) const
{
if (optarg == "true" || optarg.empty()) {
option.put(pref_, A2_V_TRUE);
}
else if (optarg == "false") {
option.put(pref_, A2_V_FALSE);
}
else {
auto p = util::divide(std::begin(optarg), std::end(optarg), ':');
std::string coeff_b(p.second.first,p.second.second);
if(coeff_b.empty()) {
std::string msg = pref_->k;
msg += " ";
msg += _("must be either 'true', 'false' or a pair numeric coefficients A and B under the form 'A:B'.");
throw DL_ABORT_EX(msg);
}
std::string coeff_a(p.first.first,p.first.second);
PrefPtr pref=PREF_OPTIMIZE_CONCURRENT_DOWNLOADS_COEFFA;
std::string *sptr = &coeff_a;
for(;;) {
try {
double dbl = std::stod(*sptr);
} catch(std::invalid_argument & ex) {
throw DL_ABORT_EX(fmt("Bad number '%s'", sptr->c_str()));
}
option.put(pref,*sptr);
if(pref == PREF_OPTIMIZE_CONCURRENT_DOWNLOADS_COEFFB) {
break;
}
pref = PREF_OPTIMIZE_CONCURRENT_DOWNLOADS_COEFFB;
sptr = &coeff_b;
}
option.put(pref_, A2_V_TRUE);
}
}
std::string OptimizeConcurrentDownloadsOptionHandler::createPossibleValuesString() const
{
return "true, false, A:B";
}
DeprecatedOptionHandler::DeprecatedOptionHandler( DeprecatedOptionHandler::DeprecatedOptionHandler(
OptionHandler* depOptHandler, const OptionHandler* repOptHandler, OptionHandler* depOptHandler, const OptionHandler* repOptHandler,
bool stillWork, std::string additionalMessage) bool stillWork, std::string additionalMessage)

View File

@ -254,6 +254,17 @@ public:
virtual std::string createPossibleValuesString() const CXX11_OVERRIDE; virtual std::string createPossibleValuesString() const CXX11_OVERRIDE;
}; };
class OptimizeConcurrentDownloadsOptionHandler : public AbstractOptionHandler {
public:
OptimizeConcurrentDownloadsOptionHandler(
PrefPtr pref, const char* description = NO_DESCRIPTION,
const std::string& defaultValue = NO_DEFAULT_VALUE, char shortName = 0);
virtual void parseArg(Option& option,
const std::string& optarg) const CXX11_OVERRIDE;
virtual std::string createPossibleValuesString() const CXX11_OVERRIDE;
};
// This class is used to deprecate option and optionally handle its // This class is used to deprecate option and optionally handle its
// option value using replacing option. // option value using replacing option.
class DeprecatedOptionHandler : public OptionHandler { class DeprecatedOptionHandler : public OptionHandler {

View File

@ -83,6 +83,7 @@
#include "SimpleRandomizer.h" #include "SimpleRandomizer.h"
#include "array_fun.h" #include "array_fun.h"
#include "OpenedFileCounter.h" #include "OpenedFileCounter.h"
#include "wallclock.h"
#ifdef ENABLE_BITTORRENT #ifdef ENABLE_BITTORRENT
#include "bittorrent_helper.h" #include "bittorrent_helper.h"
#endif // ENABLE_BITTORRENT #endif // ENABLE_BITTORRENT
@ -102,8 +103,12 @@ void appendReservedGroup(RequestGroupList& list, InputIterator first,
RequestGroupMan::RequestGroupMan( RequestGroupMan::RequestGroupMan(
std::vector<std::shared_ptr<RequestGroup>> requestGroups, std::vector<std::shared_ptr<RequestGroup>> requestGroups,
int maxSimultaneousDownloads, const Option* option) int maxConcurrentDownloads, const Option* option)
: maxSimultaneousDownloads_(maxSimultaneousDownloads), : maxConcurrentDownloads_(maxConcurrentDownloads),
optimizeConcurrentDownloads_(false),
optimizeConcurrentDownloadsCoeffA_(5.),
optimizeConcurrentDownloadsCoeffB_(25.),
optimizationSpeed_(0),
numActive_(0), numActive_(0),
option_(option), option_(option),
serverStatMan_(std::make_shared<ServerStatMan>()), serverStatMan_(std::make_shared<ServerStatMan>()),
@ -120,12 +125,25 @@ RequestGroupMan::RequestGroupMan(
this, option->getAsInt(PREF_BT_MAX_OPEN_FILES))), this, option->getAsInt(PREF_BT_MAX_OPEN_FILES))),
numStoppedTotal_(0) numStoppedTotal_(0)
{ {
setupOptimizeConcurrentDownloads();
appendReservedGroup(reservedGroups_, requestGroups.begin(), appendReservedGroup(reservedGroups_, requestGroups.begin(),
requestGroups.end()); requestGroups.end());
} }
RequestGroupMan::~RequestGroupMan() { openedFileCounter_->deactivate(); } RequestGroupMan::~RequestGroupMan() { openedFileCounter_->deactivate(); }
bool RequestGroupMan::setupOptimizeConcurrentDownloads(void)
{
optimizeConcurrentDownloads_ = option_->getAsBool(PREF_OPTIMIZE_CONCURRENT_DOWNLOADS);
if (optimizeConcurrentDownloads_) {
if (option_->defined(PREF_OPTIMIZE_CONCURRENT_DOWNLOADS_COEFFA)) {
optimizeConcurrentDownloadsCoeffA_ = std::stod(option_->get(PREF_OPTIMIZE_CONCURRENT_DOWNLOADS_COEFFA));
optimizeConcurrentDownloadsCoeffB_ = std::stod(option_->get(PREF_OPTIMIZE_CONCURRENT_DOWNLOADS_COEFFB));
}
}
return optimizeConcurrentDownloads_;
}
bool RequestGroupMan::downloadFinished() bool RequestGroupMan::downloadFinished()
{ {
if (keepRunning_) { if (keepRunning_) {
@ -474,11 +492,14 @@ createInitialCommand(const std::shared_ptr<RequestGroup>& requestGroup,
void RequestGroupMan::fillRequestGroupFromReserver(DownloadEngine* e) void RequestGroupMan::fillRequestGroupFromReserver(DownloadEngine* e)
{ {
removeStoppedGroup(e); removeStoppedGroup(e);
if (static_cast<size_t>(maxSimultaneousDownloads_) <= numActive_) {
int maxConcurrentDownloads = optimizeConcurrentDownloads_ ? optimizeConcurrentDownloads() : maxConcurrentDownloads_;
if (static_cast<size_t>(maxConcurrentDownloads) <= numActive_) {
return; return;
} }
int count = 0; int count = 0;
int num = maxSimultaneousDownloads_ - numActive_; int num = maxConcurrentDownloads - numActive_;
std::vector<std::shared_ptr<RequestGroup>> pending; std::vector<std::shared_ptr<RequestGroup>> pending;
while (count < num && (uriListParser_ || !reservedGroups_.empty())) { while (count < num && (uriListParser_ || !reservedGroups_.empty())) {
@ -1005,4 +1026,49 @@ void RequestGroupMan::decreaseNumActive()
--numActive_; --numActive_;
} }
int RequestGroupMan::optimizeConcurrentDownloads()
{
// gauge the current speed
int currentSpeed = getNetStat().calculateDownloadSpeed();
const auto& now = global::wallclock();
if (currentSpeed >= optimizationSpeed_) {
optimizationSpeed_ = currentSpeed;
optimizationSpeedTimer_ = now;
} else if (std::chrono::duration_cast<std::chrono::seconds>(optimizationSpeedTimer_.difference(now)) >= 5_s) {
// we keep using the reference speed for minimum 5 seconds so reset the timer
optimizationSpeedTimer_ = now;
// keep the reference speed as long as the speed tends to augment or to maintain itself within 10%
if (currentSpeed >= 1.1 * getNetStat().calculateNewestDownloadSpeed(5)) {
// else assume a possible congestion and record a new optimization speed by dichotomy
optimizationSpeed_ = (optimizationSpeed_ + currentSpeed)/2.;
}
}
if (optimizationSpeed_ <= 0) {
return 1;
}
// apply the rule
if ((maxOverallDownloadSpeedLimit_ > 0) && (optimizationSpeed_ > maxOverallDownloadSpeedLimit_)) {
optimizationSpeed_ = maxOverallDownloadSpeedLimit_;
}
int maxConcurrentDownloads = ceil(
optimizeConcurrentDownloadsCoeffA_
+ optimizeConcurrentDownloadsCoeffB_ * log10(optimizationSpeed_ * 8. / 1000000.)
);
// bring the value in bound between 1 and the defined maximum
maxConcurrentDownloads = std::min(std::max(1, maxConcurrentDownloads), maxConcurrentDownloads_);
A2_LOG_DEBUG
(fmt("Max concurrent downloads optimized at %d (%lu currently active) "
"[optimization speed %sB/s, current speed %sB/s]",
maxConcurrentDownloads, numActive_, util::abbrevSize(optimizationSpeed_).c_str(),
util::abbrevSize(currentSpeed).c_str()));
return maxConcurrentDownloads;
}
} // namespace aria2 } // namespace aria2

View File

@ -72,7 +72,13 @@ private:
RequestGroupList reservedGroups_; RequestGroupList reservedGroups_;
DownloadResultList downloadResults_; DownloadResultList downloadResults_;
int maxSimultaneousDownloads_; int maxConcurrentDownloads_;
bool optimizeConcurrentDownloads_;
double optimizeConcurrentDownloadsCoeffA_;
double optimizeConcurrentDownloadsCoeffB_;
int optimizationSpeed_;
Timer optimizationSpeedTimer_;
// The number of simultaneous active downloads, excluding seed only // The number of simultaneous active downloads, excluding seed only
// item if PREF_BT_DETACH_SEED_ONLY is true. We rely on this // item if PREF_BT_DETACH_SEED_ONLY is true. We rely on this
@ -135,9 +141,11 @@ private:
void addRequestGroupIndex( void addRequestGroupIndex(
const std::vector<std::shared_ptr<RequestGroup>>& groups); const std::vector<std::shared_ptr<RequestGroup>>& groups);
int optimizeConcurrentDownloads();
public: public:
RequestGroupMan(std::vector<std::shared_ptr<RequestGroup>> requestGroups, RequestGroupMan(std::vector<std::shared_ptr<RequestGroup>> requestGroups,
int maxSimultaneousDownloads, const Option* option); int maxConcurrentDownloads, const Option* option);
~RequestGroupMan(); ~RequestGroupMan();
@ -195,6 +203,13 @@ public:
bool removeReservedGroup(a2_gid_t gid); bool removeReservedGroup(a2_gid_t gid);
bool getOptimizeConcurrentDownloads() const
{
return optimizeConcurrentDownloads_;
}
bool setupOptimizeConcurrentDownloads();
void showDownloadResults(OutputFile& o, bool full) const; void showDownloadResults(OutputFile& o, bool full) const;
bool isSameFileBeingDownloaded(RequestGroup* requestGroup) const; bool isSameFileBeingDownloaded(RequestGroup* requestGroup) const;
@ -291,7 +306,7 @@ public:
return maxOverallUploadSpeedLimit_; return maxOverallUploadSpeedLimit_;
} }
void setMaxSimultaneousDownloads(int max) { maxSimultaneousDownloads_ = max; } void setMaxConcurrentDownloads(int max) { maxConcurrentDownloads_ = max; }
// Call this function if requestGroups_ queue should be maintained. // Call this function if requestGroups_ queue should be maintained.
// This function is added to reduce the call of maintenance, but at // This function is added to reduce the call of maintenance, but at

View File

@ -1592,10 +1592,14 @@ void changeGlobalOption(const Option& option, DownloadEngine* e)
option.getAsInt(PREF_MAX_OVERALL_UPLOAD_LIMIT)); option.getAsInt(PREF_MAX_OVERALL_UPLOAD_LIMIT));
} }
if (option.defined(PREF_MAX_CONCURRENT_DOWNLOADS)) { if (option.defined(PREF_MAX_CONCURRENT_DOWNLOADS)) {
e->getRequestGroupMan()->setMaxSimultaneousDownloads( e->getRequestGroupMan()->setMaxConcurrentDownloads(
option.getAsInt(PREF_MAX_CONCURRENT_DOWNLOADS)); option.getAsInt(PREF_MAX_CONCURRENT_DOWNLOADS));
e->getRequestGroupMan()->requestQueueCheck(); e->getRequestGroupMan()->requestQueueCheck();
} }
if(option.defined(PREF_OPTIMIZE_CONCURRENT_DOWNLOADS)) {
e->getRequestGroupMan()->setupOptimizeConcurrentDownloads();
e->getRequestGroupMan()->requestQueueCheck();
}
if (option.defined(PREF_MAX_DOWNLOAD_RESULT)) { if (option.defined(PREF_MAX_DOWNLOAD_RESULT)) {
e->getRequestGroupMan()->setMaxDownloadResult( e->getRequestGroupMan()->setMaxDownloadResult(
option.getAsInt(PREF_MAX_DOWNLOAD_RESULT)); option.getAsInt(PREF_MAX_DOWNLOAD_RESULT));

View File

@ -42,7 +42,7 @@
namespace aria2 { namespace aria2 {
namespace { namespace {
constexpr auto WINDOW_TIME = 15_s; constexpr auto WINDOW_TIME = 10_s;
} // namespace } // namespace
SpeedCalc::SpeedCalc() : accumulatedLength_(0), bytesWindow_(0), maxSpeed_(0) {} SpeedCalc::SpeedCalc() : accumulatedLength_(0), bytesWindow_(0), maxSpeed_(0) {}
@ -84,6 +84,31 @@ int SpeedCalc::calculateSpeed()
return speed; return speed;
} }
int SpeedCalc::calculateNewestSpeed(int seconds)
{
const auto& now = global::wallclock();
removeStaleTimeSlot(now);
int64_t bytesCount(0);
auto it = timeSlots_.rbegin();
while (it != timeSlots_.rend()) {
if (it->first.difference(now) > seconds * 1_s) {
break;
}
bytesCount += (*it++).second;
}
if (it == timeSlots_.rbegin()) {
return 0;
}
auto elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(
(*--it).first.difference(now)).count();
if (elapsed <= 0) {
elapsed = 1;
}
return bytesCount * (1000. / elapsed);
}
void SpeedCalc::update(size_t bytes) void SpeedCalc::update(size_t bytes)
{ {
const auto& now = global::wallclock(); const auto& now = global::wallclock();

View File

@ -61,6 +61,8 @@ public:
*/ */
int calculateSpeed(); int calculateSpeed();
int calculateNewestSpeed(int seconds);
int getMaxSpeed() const { return maxSpeed_; } int getMaxSpeed() const { return maxSpeed_; }
int calculateAvgSpeed() const; int calculateAvgSpeed() const;

View File

@ -216,6 +216,12 @@ PrefPtr PREF_INPUT_FILE = makePref("input-file");
PrefPtr PREF_DEFERRED_INPUT = makePref("deferred-input"); PrefPtr PREF_DEFERRED_INPUT = makePref("deferred-input");
// value: 1*digit // value: 1*digit
PrefPtr PREF_MAX_CONCURRENT_DOWNLOADS = makePref("max-concurrent-downloads"); PrefPtr PREF_MAX_CONCURRENT_DOWNLOADS = makePref("max-concurrent-downloads");
// value: true | false | A:B
PrefPtr PREF_OPTIMIZE_CONCURRENT_DOWNLOADS = makePref("optimize-concurrent-downloads");
// values: 1*digit ['.' [ 1*digit ] ]
PrefPtr PREF_OPTIMIZE_CONCURRENT_DOWNLOADS_COEFFA = makePref("optimize-concurrent-downloads-coeffA");
// values: 1*digit ['.' [ 1*digit ] ]
PrefPtr PREF_OPTIMIZE_CONCURRENT_DOWNLOADS_COEFFB = makePref("optimize-concurrent-downloads-coeffB");
// value: true | false // value: true | false
PrefPtr PREF_FORCE_SEQUENTIAL = makePref("force-sequential"); PrefPtr PREF_FORCE_SEQUENTIAL = makePref("force-sequential");
// value: true | false // value: true | false

View File

@ -173,6 +173,12 @@ extern PrefPtr PREF_DEFERRED_INPUT;
// value: 1*digit // value: 1*digit
extern PrefPtr PREF_MAX_CONCURRENT_DOWNLOADS; extern PrefPtr PREF_MAX_CONCURRENT_DOWNLOADS;
// value: true | false // value: true | false
extern PrefPtr PREF_OPTIMIZE_CONCURRENT_DOWNLOADS;
// value: 1*digit ['.' [ 1*digit ] ]
extern PrefPtr PREF_OPTIMIZE_CONCURRENT_DOWNLOADS_COEFFA;
// value: 1*digit ['.' [ 1*digit ] ]
extern PrefPtr PREF_OPTIMIZE_CONCURRENT_DOWNLOADS_COEFFB;
// value: true | false
extern PrefPtr PREF_FORCE_SEQUENTIAL; extern PrefPtr PREF_FORCE_SEQUENTIAL;
// value: true | false // value: true | false
extern PrefPtr PREF_AUTO_FILE_RENAMING; extern PrefPtr PREF_AUTO_FILE_RENAMING;

View File

@ -242,7 +242,21 @@
#define TEXT_MAX_CONCURRENT_DOWNLOADS \ #define TEXT_MAX_CONCURRENT_DOWNLOADS \
_(" -j, --max-concurrent-downloads=N Set maximum number of parallel downloads for\n" \ _(" -j, --max-concurrent-downloads=N Set maximum number of parallel downloads for\n" \
" every static (HTTP/FTP) URL, torrent and metalink.\n" \ " every static (HTTP/FTP) URL, torrent and metalink.\n" \
" See also --split option.") " See also --split and --optimize-concurrent-downloads options.")
#define TEXT_OPTIMIZE_CONCURRENT_DOWNLOADS\
_(" --optimize-concurrent-downloads[=true|false|A:B] Optimizes the number of\n" \
" concurrent downloads according to the bandwidth\n" \
" available. aria2 uses the download speed observed\n" \
" in the previous downloads to adapt the number of\n" \
" downloads launched in parallel according to the rule\n" \
" N = A + B Log10(speed in Mbps). The coefficients\n" \
" A and B can be customized in the option arguments\n" \
" with A and B separated by a colon. The default values\n" \
" (A=5,B=25) lead to using typically 5 parallel\n" \
" downloads on 1Mbps networks and above 50 on 100Mbps\n" \
" networks. The number of parallel downloads remains\n" \
" constrained under the maximum defined by the\n" \
" max-concurrent-downloads parameter.")
#define TEXT_LOAD_COOKIES \ #define TEXT_LOAD_COOKIES \
_(" --load-cookies=FILE Load Cookies from FILE using the Firefox3 format\n" \ _(" --load-cookies=FILE Load Cookies from FILE using the Firefox3 format\n" \
" and Mozilla/Firefox(1.x/2.x)/Netscape format.") " and Mozilla/Firefox(1.x/2.x)/Netscape format.")