diff --git a/ChangeLog b/ChangeLog index da5a8404..2611d89d 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,3 +1,21 @@ +2008-04-13 Tatsuhiro Tsujikawa + + Rewritten choking algorithm. + * src/PeerChokeCommand.{cc, h} + * src/BtInterestedMessage.cc + * src/PeerSessionResource.{cc, h} + * src/DefaultPeerStorage.{cc, h} + * src/BtNotInterestedMessage.cc + * src/DefaultBtMessageDispatcher.{cc, h} + * src/BtMessageDispatcher.h + * src/Peer.{cc, h} + * src/BtLeecherStateChoke.{cc, h} + * src/BtSetup.cc + * src/BtSeederStateChoke.{cc, h} + * src/PeerStorage.h + * test/MockPeerStorage.h + * test/MockBtMessageDispatcher.h + 2008-04-09 Tatsuhiro Tsujikawa Fixed compilation error on x84-64 platform. diff --git a/src/BtInterestedMessage.cc b/src/BtInterestedMessage.cc index f4292de5..da4f21ef 100644 --- a/src/BtInterestedMessage.cc +++ b/src/BtInterestedMessage.cc @@ -37,6 +37,9 @@ #include "DlAbortEx.h" #include "message.h" #include "Peer.h" +#include "BtRegistry.h" +#include "BtContext.h" +#include "PeerStorage.h" namespace aria2 { @@ -54,6 +57,9 @@ BtInterestedMessageHandle BtInterestedMessage::create(const unsigned char* data, void BtInterestedMessage::doReceivedAction() { peer->peerInterested(true); + if(!peer->amChoking()) { + PEER_STORAGE(btContext)->executeChoke(); + } } bool BtInterestedMessage::sendPredicate() const { diff --git a/src/BtLeecherStateChoke.cc b/src/BtLeecherStateChoke.cc new file mode 100644 index 00000000..f72a4c35 --- /dev/null +++ b/src/BtLeecherStateChoke.cc @@ -0,0 +1,179 @@ +/* */ +#include "BtLeecherStateChoke.h" +#include "Peer.h" +#include "Logger.h" +#include "LogFactory.h" +#include "a2time.h" +#include + +namespace aria2 { + +BtLeecherStateChoke::BtLeecherStateChoke(): + _round(0), + _lastRound(0), + _logger(LogFactory::getInstance()) {} + +BtLeecherStateChoke::~BtLeecherStateChoke() {} + +class PeerFilter { +private: + bool _amChoking; + bool _peerInterested; +public: + PeerFilter(bool amChoking, bool peerInterested): + _amChoking(amChoking), + _peerInterested(peerInterested) {} + + bool operator()(const Peer* peer) const + { + return peer->amChoking() == _amChoking && + peer->peerInterested() == _peerInterested; + } +}; + +class RegularUnchoker { +public: + bool operator()(const Peer* peer) const + { + // peer must be interested to us and sent block in the last 30 seconds + return peer->peerInterested() && !peer->getLastDownloadUpdate().elapsed(30); + } +}; + +class DownloadFaster { +private: + const struct timeval _now; +public: + + DownloadFaster(const struct timeval& now):_now(now) {} + + bool operator() (Peer* left, Peer* right) const + { + return left->calculateDownloadSpeed(_now) > right->calculateDownloadSpeed(_now); + } +}; + +class SnubbedPeer { +public: + bool operator() (const Peer* peer) const + { + return peer->snubbing(); + } +}; + +void BtLeecherStateChoke::plannedOptimisticUnchoke(std::deque& peers) +{ + std::for_each(peers.begin(), peers.end(), + std::bind2nd(std::mem_fun((void (Peer::*)(bool))&Peer::optUnchoking), false)); + + std::deque::iterator i = std::partition(peers.begin(), peers.end(), PeerFilter(true, true)); + if(i != peers.begin()) { + std::random_shuffle(peers.begin(), i); + (*peers.begin())->optUnchoking(true); + _logger->info("POU: %s", (*peers.begin())->ipaddr.c_str()); + } +} + +void BtLeecherStateChoke::regularUnchoke(std::deque& peers) +{ + std::deque::iterator rest = std::partition(peers.begin(), peers.end(), RegularUnchoker()); + + struct timeval now; + gettimeofday(&now, 0); + + std::sort(peers.begin(), rest, DownloadFaster(now)); + + // the number of regular unchokers + int count = 3; + + bool fastOptUnchoker = false; + std::deque::iterator peerIter = peers.begin(); + for(;peerIter != rest && count; ++peerIter, --count) { + (*peerIter)->chokingRequired(false); + _logger->info("RU: %s, dlspd=%u", (*peerIter)->ipaddr.c_str(), (*peerIter)->calculateDownloadSpeed(now)); + if((*peerIter)->optUnchoking()) { + fastOptUnchoker = true; + (*peerIter)->optUnchoking(false); + } + } + if(fastOptUnchoker) { + std::random_shuffle(peerIter, peers.end()); + for(std::deque::iterator i = peerIter; i != peers.end(); ++i) { + if((*i)->peerInterested()) { + (*i)->optUnchoking(true); + _logger->info("OU: %s", (*i)->ipaddr.c_str()); + break; + } else { + (*i)->chokingRequired(false); + _logger->info("OU: %s", (*i)->ipaddr.c_str()); + } + } + } +} + +void +BtLeecherStateChoke::executeChoke(const std::deque >& peerSet) +{ + _logger->info("Leecher state, %d choke round started", _round); + _lastRound.reset(); + + std::deque peers; + std::transform(peerSet.begin(), peerSet.end(), std::back_inserter(peers), + std::mem_fun_ref(&SharedHandle::get)); + + peers.erase(std::remove_if(peers.begin(), peers.end(), SnubbedPeer()), + peers.end()); + + std::for_each(peers.begin(), peers.end(), + std::bind2nd(std::mem_fun((void (Peer::*)(bool))&Peer::chokingRequired), true)); + + // planned optimistic unchoke + if(_round == 0) { + plannedOptimisticUnchoke(peers); + } + regularUnchoke(peers); + + if(++_round == 3) { + _round = 0; + } +} + +const Time& BtLeecherStateChoke::getLastRound() const +{ + return _lastRound; +} + +} // namespace aria2 diff --git a/src/BtLeecherStateChoke.h b/src/BtLeecherStateChoke.h new file mode 100644 index 00000000..d898775b --- /dev/null +++ b/src/BtLeecherStateChoke.h @@ -0,0 +1,72 @@ +/* */ +#ifndef _D_BT_LEECHER_STATE_CHOKE_H_ +#define _D_BT_LEECHER_STATE_CHOKE_H_ + +#include "common.h" +#include "SharedHandle.h" +#include "TimeA2.h" +#include + +namespace aria2 { + +class Peer; +class Logger; + +class BtLeecherStateChoke { +private: + int _round; + + Time _lastRound; + + const Logger* _logger; + + void plannedOptimisticUnchoke(std::deque& peers); + + void regularUnchoke(std::deque& peers); + +public: + BtLeecherStateChoke(); + + ~BtLeecherStateChoke(); + + void executeChoke(const std::deque >& peerSet); + + const Time& getLastRound() const; +}; + +} // namespace aria2 + +#endif // _D_BT_LEECHER_STATE_CHOKE_H_ diff --git a/src/BtMessageDispatcher.h b/src/BtMessageDispatcher.h index b083bf9d..1d6c8cfe 100644 --- a/src/BtMessageDispatcher.h +++ b/src/BtMessageDispatcher.h @@ -81,6 +81,8 @@ public: virtual void removeOutstandingRequest(const RequestSlot& slot) = 0; virtual void addOutstandingRequest(const RequestSlot& slot) = 0; + + virtual size_t countOutstandingUpload() = 0; }; typedef SharedHandle BtMessageDispatcherHandle; diff --git a/src/BtNotInterestedMessage.cc b/src/BtNotInterestedMessage.cc index 1c79d1de..c72556d4 100644 --- a/src/BtNotInterestedMessage.cc +++ b/src/BtNotInterestedMessage.cc @@ -37,6 +37,9 @@ #include "DlAbortEx.h" #include "message.h" #include "Peer.h" +#include "BtRegistry.h" +#include "BtContext.h" +#include "PeerStorage.h" namespace aria2 { @@ -54,6 +57,9 @@ BtNotInterestedMessageHandle BtNotInterestedMessage::create(const unsigned char* void BtNotInterestedMessage::doReceivedAction() { peer->peerInterested(false); + if(!peer->amChoking()) { + PEER_STORAGE(btContext)->executeChoke(); + } } bool BtNotInterestedMessage::sendPredicate() const { diff --git a/src/BtSeederStateChoke.cc b/src/BtSeederStateChoke.cc new file mode 100644 index 00000000..3e45d344 --- /dev/null +++ b/src/BtSeederStateChoke.cc @@ -0,0 +1,151 @@ +/* */ +#include "BtSeederStateChoke.h" +#include "BtContext.h" +#include "Peer.h" +#include "BtRegistry.h" +#include "PeerObject.h" +#include "BtMessageDispatcher.h" +#include "BtMessageFactory.h" +#include "BtRequestFactory.h" +#include "BtMessageReceiver.h" +#include "PeerConnection.h" +#include "ExtensionMessageFactory.h" +#include "Logger.h" +#include "LogFactory.h" +#include "a2time.h" +#include + +namespace aria2 { + +BtSeederStateChoke::BtSeederStateChoke(const SharedHandle& btContext): + _btContext(btContext), + _round(0), + _lastRound(0), + _logger(LogFactory::getInstance()) {} + +BtSeederStateChoke::~BtSeederStateChoke() {} + +class RecentUnchoke { +private: + SharedHandle _btContext; + + const struct timeval _now; +public: + RecentUnchoke(const SharedHandle& btContext, + const struct timeval& now): + _btContext(btContext), _now(now) {} + + bool operator()(Peer* left, Peer* right) const + { + size_t leftUpload = BT_MESSAGE_DISPATCHER(_btContext, left)->countOutstandingRequest(); + size_t rightUpload = BT_MESSAGE_DISPATCHER(_btContext, right)->countOutstandingRequest(); + if(leftUpload && !rightUpload) { + return true; + } else if(!leftUpload && rightUpload) { + return false; + } + const int TIME_FRAME = 20; + if(!left->getLastAmUnchoking().elapsed(TIME_FRAME) && + left->getLastAmUnchoking().isNewer(right->getLastAmUnchoking())) { + return true; + } else if(!right->getLastAmUnchoking().elapsed(TIME_FRAME) && + right->getLastAmUnchoking().isNewer(left->getLastAmUnchoking())) { + return false; + } else { + return left->calculateUploadSpeed(_now) > right->calculateUploadSpeed(_now); + } + } +}; + +class NotInterestedPeer { +public: + bool operator()(const Peer* peer) const + { + return !peer->peerInterested(); + } +}; + +void BtSeederStateChoke::unchoke(std::deque& peers) +{ + int count = (_round == 2) ? 4 : 3; + + struct timeval now; + gettimeofday(&now, 0); + + std::sort(peers.begin(), peers.end(), RecentUnchoke(_btContext, now)); + + std::deque::iterator r = peers.begin(); + for(; r != peers.end() && count; ++r, --count) { + (*r)->chokingRequired(false); + _logger->info("RU: %s, ulspd=%u", (*r)->ipaddr.c_str(), + (*r)->calculateUploadSpeed(now)); + } + if(_round == 2 && r != peers.end()) { + std::random_shuffle(r, peers.end()); + (*r)->optUnchoking(true); + _logger->info("POU: %s", (*r)->ipaddr.c_str()); + } +} + +void +BtSeederStateChoke::executeChoke(const std::deque >& peerSet) +{ + _logger->info("Seeder state, %d choke round started", _round); + _lastRound.reset(); + + std::deque peers; + std::transform(peerSet.begin(), peerSet.end(), std::back_inserter(peers), + std::mem_fun_ref(&SharedHandle::get)); + + std::for_each(peers.begin(), peers.end(), + std::bind2nd(std::mem_fun((void (Peer::*)(bool))&Peer::chokingRequired), true)); + + peers.erase(std::remove_if(peers.begin(), peers.end(), NotInterestedPeer()), + peers.end()); + + unchoke(peers); + + if(++_round == 3) { + _round = 0; + } +} + +const Time& BtSeederStateChoke::getLastRound() const +{ + return _lastRound; +} + +} // namespace aria2 diff --git a/src/BtSeederStateChoke.h b/src/BtSeederStateChoke.h new file mode 100644 index 00000000..8255dcd8 --- /dev/null +++ b/src/BtSeederStateChoke.h @@ -0,0 +1,72 @@ +/* */ +#ifndef _D_BT_SEEDER_STATE_CHOKE_H_ +#define _D_BT_SEEDER_STATE_CHOKE_H_ + +#include "common.h" +#include "SharedHandle.h" +#include "TimeA2.h" +#include + +namespace aria2 { + +class BtContext; +class Peer; +class Logger; + +class BtSeederStateChoke { +private: + SharedHandle _btContext; + + int _round; + + Time _lastRound; + + const Logger* _logger; + + void unchoke(std::deque& peers); +public: + BtSeederStateChoke(const SharedHandle& btContext); + + ~BtSeederStateChoke(); + + void executeChoke(const std::deque >& peerSet); + + const Time& getLastRound() const; +}; + +} // namespace aria2 + +#endif // _D_BT_SEEDER_STATE_CHOKE_H_ diff --git a/src/BtSetup.cc b/src/BtSetup.cc index 3dc7fc1a..f0f4866d 100644 --- a/src/BtSetup.cc +++ b/src/BtSetup.cc @@ -76,10 +76,8 @@ Commands BtSetup::setup(RequestGroup* requestGroup, e, btContext)); commands.push_back(new PeerChokeCommand(CUIDCounterSingletonHolder::instance()->newID(), - requestGroup, e, - btContext, - 10)); + btContext)); commands.push_back(new ActivePeerConnectionCommand(CUIDCounterSingletonHolder::instance()->newID(), requestGroup, e, btContext, 10)); diff --git a/src/DefaultBtMessageDispatcher.cc b/src/DefaultBtMessageDispatcher.cc index 0230dca1..3067c76b 100644 --- a/src/DefaultBtMessageDispatcher.cc +++ b/src/DefaultBtMessageDispatcher.cc @@ -49,6 +49,7 @@ #include "Piece.h" #include "LogFactory.h" #include "Logger.h" +#include "a2functional.h" #include namespace aria2 { @@ -266,6 +267,12 @@ void DefaultBtMessageDispatcher::addOutstandingRequest(const RequestSlot& reques } } +size_t DefaultBtMessageDispatcher::countOutstandingUpload() +{ + return std::count_if(messageQueue.begin(), messageQueue.end(), + mem_fun_sh(&BtMessage::isUploading)); +} + std::deque >& DefaultBtMessageDispatcher::getMessageQueue() { diff --git a/src/DefaultBtMessageDispatcher.h b/src/DefaultBtMessageDispatcher.h index e5a0817e..c4fc9bad 100644 --- a/src/DefaultBtMessageDispatcher.h +++ b/src/DefaultBtMessageDispatcher.h @@ -101,6 +101,8 @@ public: virtual void addOutstandingRequest(const RequestSlot& requestSlot); + virtual size_t countOutstandingUpload(); + std::deque >& getMessageQueue(); RequestSlots& getRequestSlots(); diff --git a/src/DefaultPeerStorage.cc b/src/DefaultPeerStorage.cc index bc6a60af..63d21c72 100644 --- a/src/DefaultPeerStorage.cc +++ b/src/DefaultPeerStorage.cc @@ -41,6 +41,9 @@ #include "Peer.h" #include "BtContext.h" #include "BtRuntime.h" +#include "BtSeederStateChoke.h" +#include "BtLeecherStateChoke.h" +#include "PieceStorage.h" #include namespace aria2 { @@ -52,12 +55,18 @@ DefaultPeerStorage::DefaultPeerStorage(const BtContextHandle& btContext, maxPeerListSize(MAX_PEER_LIST_SIZE), btRuntime(BT_RUNTIME(btContext)), removedPeerSessionDownloadLength(0), - removedPeerSessionUploadLength(0) + removedPeerSessionUploadLength(0), + _seederStateChoke(new BtSeederStateChoke(btContext)), + _leecherStateChoke(new BtLeecherStateChoke()) { logger = LogFactory::getInstance(); } -DefaultPeerStorage::~DefaultPeerStorage() {} +DefaultPeerStorage::~DefaultPeerStorage() +{ + delete _seederStateChoke; + delete _leecherStateChoke; +} class FindIdenticalPeer { private: @@ -246,4 +255,23 @@ void DefaultPeerStorage::returnPeer(const PeerHandle& peer) } } +bool DefaultPeerStorage::chokeRoundIntervalElapsed() +{ + const time_t CHOKE_ROUND_INTERVAL = 10; + if(PIECE_STORAGE(btContext)->downloadFinished()) { + return _seederStateChoke->getLastRound().elapsed(CHOKE_ROUND_INTERVAL); + } else { + return _leecherStateChoke->getLastRound().elapsed(CHOKE_ROUND_INTERVAL); + } +} + +void DefaultPeerStorage::executeChoke() +{ + if(PIECE_STORAGE(btContext)->downloadFinished()) { + return _seederStateChoke->executeChoke(getActivePeers()); + } else { + return _leecherStateChoke->executeChoke(getActivePeers()); + } +} + } // namespace aria2 diff --git a/src/DefaultPeerStorage.h b/src/DefaultPeerStorage.h index b81f825a..de9dc9f1 100644 --- a/src/DefaultPeerStorage.h +++ b/src/DefaultPeerStorage.h @@ -46,6 +46,8 @@ class BtContext; class Option; class Logger; class BtRuntime; +class BtSeederStateChoke; +class BtLeecherStateChoke; class DefaultPeerStorage : public PeerStorage { private: @@ -58,6 +60,9 @@ private: uint64_t removedPeerSessionDownloadLength; uint64_t removedPeerSessionUploadLength; + BtSeederStateChoke* _seederStateChoke; + BtLeecherStateChoke* _leecherStateChoke; + bool isPeerAlreadyAdded(const SharedHandle& peer); public: DefaultPeerStorage(const SharedHandle& btContext, @@ -91,6 +96,10 @@ public: virtual void returnPeer(const SharedHandle& peer); + virtual bool chokeRoundIntervalElapsed(); + + virtual void executeChoke(); + void setMaxPeerListSize(size_t size) { this->maxPeerListSize = size; } size_t getMaxPeerListSize() const { return maxPeerListSize; } diff --git a/src/Makefile.am b/src/Makefile.am index e66f9c35..3d549779 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -367,7 +367,9 @@ SRCS += MetaEntry.h\ LibsslARC4Decryptor.h\ LibsslARC4Encryptor.h\ LibsslDHKeyExchange.h\ - BtConstants.h + BtConstants.h\ + BtLeecherStateChoke.cc BtLeecherStateChoke.h\ + BtSeederStateChoke.cc BtSeederStateChoke.h endif # ENABLE_BITTORRENT if ENABLE_METALINK diff --git a/src/Makefile.in b/src/Makefile.in index bfeaa7e4..2e6c9861 100644 --- a/src/Makefile.in +++ b/src/Makefile.in @@ -221,7 +221,9 @@ bin_PROGRAMS = aria2c$(EXEEXT) @ENABLE_BITTORRENT_TRUE@ LibsslARC4Decryptor.h\ @ENABLE_BITTORRENT_TRUE@ LibsslARC4Encryptor.h\ @ENABLE_BITTORRENT_TRUE@ LibsslDHKeyExchange.h\ -@ENABLE_BITTORRENT_TRUE@ BtConstants.h +@ENABLE_BITTORRENT_TRUE@ BtConstants.h\ +@ENABLE_BITTORRENT_TRUE@ BtLeecherStateChoke.cc BtLeecherStateChoke.h\ +@ENABLE_BITTORRENT_TRUE@ BtSeederStateChoke.cc BtSeederStateChoke.h @ENABLE_METALINK_TRUE@am__append_3 = Metalinker.cc Metalinker.h\ @ENABLE_METALINK_TRUE@ MetalinkEntry.cc MetalinkEntry.h\ @@ -522,7 +524,9 @@ am__libaria2c_a_SOURCES_DIST = Socket.h SocketCore.cc SocketCore.h \ LibgcryptARC4Decryptor.h LibgcryptARC4Encryptor.h \ LibgcryptDHKeyExchange.h LibsslARC4Context.h \ LibsslARC4Decryptor.h LibsslARC4Encryptor.h \ - LibsslDHKeyExchange.h BtConstants.h Metalinker.cc Metalinker.h \ + LibsslDHKeyExchange.h BtConstants.h BtLeecherStateChoke.cc \ + BtLeecherStateChoke.h BtSeederStateChoke.cc \ + BtSeederStateChoke.h Metalinker.cc Metalinker.h \ MetalinkEntry.cc MetalinkEntry.h MetalinkResource.cc \ MetalinkResource.h MetalinkProcessor.h \ MetalinkProcessorFactory.cc MetalinkProcessorFactory.h \ @@ -667,7 +671,9 @@ am__libaria2c_a_SOURCES_DIST = Socket.h SocketCore.cc SocketCore.h \ @ENABLE_BITTORRENT_TRUE@ DHTRegistry.$(OBJEXT) \ @ENABLE_BITTORRENT_TRUE@ InitiatorMSEHandshakeCommand.$(OBJEXT) \ @ENABLE_BITTORRENT_TRUE@ ReceiverMSEHandshakeCommand.$(OBJEXT) \ -@ENABLE_BITTORRENT_TRUE@ MSEHandshake.$(OBJEXT) +@ENABLE_BITTORRENT_TRUE@ MSEHandshake.$(OBJEXT) \ +@ENABLE_BITTORRENT_TRUE@ BtLeecherStateChoke.$(OBJEXT) \ +@ENABLE_BITTORRENT_TRUE@ BtSeederStateChoke.$(OBJEXT) @ENABLE_METALINK_TRUE@am__objects_3 = Metalinker.$(OBJEXT) \ @ENABLE_METALINK_TRUE@ MetalinkEntry.$(OBJEXT) \ @ENABLE_METALINK_TRUE@ MetalinkResource.$(OBJEXT) \ @@ -1234,6 +1240,7 @@ distclean-compile: @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/BtHaveNoneMessage.Po@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/BtInterestedMessage.Po@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/BtKeepAliveMessage.Po@am__quote@ +@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/BtLeecherStateChoke.Po@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/BtNotInterestedMessage.Po@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/BtPieceMessage.Po@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/BtPortMessage.Po@am__quote@ @@ -1241,6 +1248,7 @@ distclean-compile: @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/BtRegistry.Po@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/BtRejectMessage.Po@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/BtRequestMessage.Po@am__quote@ +@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/BtSeederStateChoke.Po@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/BtSetup.Po@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/BtSuggestPieceMessage.Po@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/BtUnchokeMessage.Po@am__quote@ diff --git a/src/Peer.cc b/src/Peer.cc index c89a26b7..e702c67c 100644 --- a/src/Peer.cc +++ b/src/Peer.cc @@ -442,4 +442,16 @@ const Time& Peer::getBadConditionStartTime() const return _badConditionStartTime; } +const Time& Peer::getLastDownloadUpdate() const +{ + assert(_res); + return _res->getLastDownloadUpdate(); +} + +const Time& Peer::getLastAmUnchoking() const +{ + assert(_res); + return _res->getLastAmUnchoking(); +} + } // namespace aria2 diff --git a/src/Peer.h b/src/Peer.h index c228470f..89d61849 100644 --- a/src/Peer.h +++ b/src/Peer.h @@ -231,6 +231,10 @@ public: std::string getExtensionName(uint8_t id) const; void setExtension(const std::string& name, uint8_t id); + + const Time& getLastDownloadUpdate() const; + + const Time& getLastAmUnchoking() const; }; typedef SharedHandle PeerHandle; diff --git a/src/PeerChokeCommand.cc b/src/PeerChokeCommand.cc index a175e79e..d9205805 100644 --- a/src/PeerChokeCommand.cc +++ b/src/PeerChokeCommand.cc @@ -33,141 +33,29 @@ */ /* copyright --> */ #include "PeerChokeCommand.h" -#include "Util.h" -#include "Peer.h" #include "DownloadEngine.h" #include "BtContext.h" #include "BtRuntime.h" -#include "PieceStorage.h" #include "PeerStorage.h" -#include "Logger.h" -#include namespace aria2 { PeerChokeCommand::PeerChokeCommand(int32_t cuid, - RequestGroup* requestGroup, DownloadEngine* e, - const BtContextHandle& btContext, - time_t interval): + const BtContextHandle& btContext): Command(cuid), BtContextAwareCommand(btContext), - RequestGroupAware(requestGroup), - interval(interval), - e(e), - rotate(0) + e(e) {} PeerChokeCommand::~PeerChokeCommand() {} -class ChokePeer { -public: - ChokePeer() {} - void operator()(PeerHandle& peer) { - peer->chokingRequired(true); - } -}; - -void PeerChokeCommand::optUnchokingPeer(Peers& peers) const { - if(peers.empty()) { - return; - } - std::random_shuffle(peers.begin(), peers.end()); - unsigned int optUnchokCount = 1; - for(Peers::iterator itr = peers.begin(); itr != peers.end(); itr++) { - Peers::value_type peer = *itr; - if(optUnchokCount > 0 && !peer->snubbing()) { - optUnchokCount--; - peer->optUnchoking(true); - logger->debug("opt, unchoking %s, download speed=%d", - peer->ipaddr.c_str(), peer->calculateDownloadSpeed()); - } else { - peer->optUnchoking(false); - } - } -} - -class UploadFaster { -public: - bool operator() (const PeerHandle& left, const PeerHandle& right) const { - return left->calculateUploadSpeed() > right->calculateUploadSpeed(); - } -}; - -void PeerChokeCommand::orderByUploadRate(Peers& peers) const { - std::sort(peers.begin(), peers.end(), UploadFaster()); -} - -class DownloadFaster { -public: - bool operator() (const PeerHandle& left, const PeerHandle& right) const { - return left->calculateDownloadSpeed() > right->calculateDownloadSpeed(); - } -}; - -void PeerChokeCommand::orderByDownloadRate(Peers& peers) const { - std::sort(peers.begin(), peers.end(), DownloadFaster()); -} - bool PeerChokeCommand::execute() { if(btRuntime->isHalt()) { return true; } - if(checkPoint.elapsed(interval)) { - checkPoint.reset(); - Peers peers = peerStorage->getActivePeers(); - std::for_each(peers.begin(), peers.end(), ChokePeer()); - if(pieceStorage->downloadFinished()) { - orderByUploadRate(peers); - } else { - orderByDownloadRate(peers); - } - unsigned int unchokingCount = 4;//peers.size() >= 4 ? 4 : peers.size(); - for(Peers::iterator itr = peers.begin(); itr != peers.end() && unchokingCount > 0; ) { - PeerHandle peer = *itr; - if(peer->peerInterested() && !peer->snubbing()) { - unchokingCount--; - peer->chokingRequired(false); - peer->optUnchoking(false); - itr = peers.erase(itr); - if(pieceStorage->downloadFinished()) { - logger->debug("cat01, unchoking %s, upload speed=%d", - peer->ipaddr.c_str(), - peer->calculateUploadSpeed()); - } else { - logger->debug("cat01, unchoking %s, download speed=%d", - peer->ipaddr.c_str(), - peer->calculateDownloadSpeed()); - } - } else { - itr++; - } - } - for(Peers::iterator itr = peers.begin(); itr != peers.end(); ) { - PeerHandle peer = *itr; - if(!peer->peerInterested() && !peer->snubbing()) { - peer->chokingRequired(false); - peer->optUnchoking(false); - itr = peers.erase(itr); - if(pieceStorage->downloadFinished()) { - logger->debug("cat02, unchoking %s, upload speed=%d", - peer->ipaddr.c_str(), - peer->calculateUploadSpeed()); - } else { - logger->debug("cat02, unchoking %s, download speed=%d", - peer->ipaddr.c_str(), - peer->calculateDownloadSpeed()); - } - break; - } else { - itr++; - } - } - if(rotate%3 == 0) { - optUnchokingPeer(peers); - rotate = 0; - } - rotate++; + if(peerStorage->chokeRoundIntervalElapsed()) { + peerStorage->executeChoke(); } e->commands.push_back(this); return false; diff --git a/src/PeerChokeCommand.h b/src/PeerChokeCommand.h index 467f88b8..23bb8764 100644 --- a/src/PeerChokeCommand.h +++ b/src/PeerChokeCommand.h @@ -37,34 +37,21 @@ #include "Command.h" #include "BtContextAwareCommand.h" -#include "RequestGroupAware.h" -#include "TimeA2.h" namespace aria2 { class DownloadEngine; -class Peer; class PeerChokeCommand : public Command, - public BtContextAwareCommand, - public RequestGroupAware + public BtContextAwareCommand { private: - time_t interval; DownloadEngine* e; - unsigned int rotate; - Time checkPoint; - - void orderByUploadRate(std::deque >& peers) const; - void orderByDownloadRate(std::deque >& peers) const; - void optUnchokingPeer(std::deque >& peers) const; public: PeerChokeCommand(int32_t cuid, - RequestGroup* requestGroup, DownloadEngine* e, - const SharedHandle& btContext, - time_t interval); + const SharedHandle& btContext); virtual ~PeerChokeCommand(); diff --git a/src/PeerSessionResource.cc b/src/PeerSessionResource.cc index 2501041e..a87bbe6b 100644 --- a/src/PeerSessionResource.cc +++ b/src/PeerSessionResource.cc @@ -53,7 +53,9 @@ PeerSessionResource::PeerSessionResource(size_t pieceLength, uint64_t totalLengt _dhtEnabled(false), _latency(DEFAULT_LATENCY), _uploadLength(0), - _downloadLength(0) + _downloadLength(0), + _lastDownloadUpdate(0), + _lastAmUnchoking(0) {} PeerSessionResource::~PeerSessionResource() @@ -69,6 +71,9 @@ bool PeerSessionResource::amChoking() const void PeerSessionResource::amChoking(bool b) { _amChoking = b; + if(!b) { + _lastAmUnchoking.reset(); + } } bool PeerSessionResource::amInterested() const @@ -137,6 +142,10 @@ bool PeerSessionResource::snubbing() const void PeerSessionResource::snubbing(bool b) { _snubbing = b; + if(_snubbing) { + chokingRequired(true); + optUnchoking(false); + } } bool PeerSessionResource::hasAllPieces() const @@ -311,6 +320,18 @@ void PeerSessionResource::updateDownloadLength(size_t bytes) { _peerStat.updateDownloadLength(bytes); _downloadLength += bytes; + + _lastDownloadUpdate.reset(); +} + +const Time& PeerSessionResource::getLastDownloadUpdate() const +{ + return _lastDownloadUpdate; +} + +const Time& PeerSessionResource::getLastAmUnchoking() const +{ + return _lastAmUnchoking; } } // namespace aria2 diff --git a/src/PeerSessionResource.h b/src/PeerSessionResource.h index 465aba80..c4897bcd 100644 --- a/src/PeerSessionResource.h +++ b/src/PeerSessionResource.h @@ -38,6 +38,7 @@ #include "common.h" #include "BtConstants.h" #include "PeerStat.h" +#include "TimeA2.h" #include #include @@ -76,6 +77,10 @@ private: uint64_t _uploadLength; uint64_t _downloadLength; + Time _lastDownloadUpdate; + + Time _lastAmUnchoking; + template bool indexIncluded(const std::deque& c, T index) const; public: @@ -179,6 +184,10 @@ public: uint64_t downloadLength() const; void updateDownloadLength(size_t bytes); + + const Time& getLastDownloadUpdate() const; + + const Time& getLastAmUnchoking() const; }; } // namespace aria2 diff --git a/src/PeerStorage.h b/src/PeerStorage.h index e0ef37d3..132a107d 100644 --- a/src/PeerStorage.h +++ b/src/PeerStorage.h @@ -89,6 +89,10 @@ public: * Tells PeerStorage object that peer is no longer used in the session. */ virtual void returnPeer(const SharedHandle& peer) = 0; + + virtual bool chokeRoundIntervalElapsed() = 0; + + virtual void executeChoke() = 0; }; typedef SharedHandle PeerStorageHandle; diff --git a/test/MockBtMessageDispatcher.h b/test/MockBtMessageDispatcher.h index 5cd84689..81aeff5a 100644 --- a/test/MockBtMessageDispatcher.h +++ b/test/MockBtMessageDispatcher.h @@ -57,6 +57,11 @@ public: virtual void removeOutstandingRequest(const RequestSlot& slot) {} virtual void addOutstandingRequest(const RequestSlot& slot) {} + + virtual size_t countOutstandingUpload() + { + return 0; + } }; } // namespace aria2 diff --git a/test/MockPeerStorage.h b/test/MockPeerStorage.h index 7bffbacd..27da9166 100644 --- a/test/MockPeerStorage.h +++ b/test/MockPeerStorage.h @@ -56,6 +56,13 @@ public: virtual void returnPeer(const SharedHandle& peer) { } + + virtual bool chokeRoundIntervalElapsed() + { + return false; + } + + virtual void executeChoke() {} }; #endif // _D_MOCK_PEER_STORAGE_H_