From ba69f5c0c318132bb95c561f89488c754341ee96 Mon Sep 17 00:00:00 2001 From: Tatsuhiro Tsujikawa Date: Sat, 26 Jan 2013 17:50:06 +0900 Subject: [PATCH] Refactor peer list management in DefaultPeerStorage Peer list is now divided into 2: unusedPeers_ and usedPeers_. Duplicate check is done using std::set by comparing pair of IP address and port. For this, only IP address and port given to the Peer constructor are used. In other words, TCP port received from extended message is not used for this purpose. --- src/ActivePeerConnectionCommand.cc | 37 ++--- src/ActivePeerConnectionCommand.h | 2 +- src/DHTGetPeersCommand.cc | 4 +- src/DefaultPeerStorage.cc | 166 +++++++++------------- src/DefaultPeerStorage.h | 27 +++- src/InitiatorMSEHandshakeCommand.cc | 20 +-- src/Peer.cc | 16 +-- src/Peer.h | 29 ++-- src/PeerInitiateConnectionCommand.cc | 20 +-- src/PeerInteractionCommand.cc | 20 +-- src/PeerReceiveHandshakeCommand.cc | 6 +- src/PeerStorage.h | 23 ++- src/TrackerWatcherCommand.cc | 14 +- test/DHTMessageFactoryImplTest.cc | 14 +- test/DefaultBtAnnounceTest.cc | 6 +- test/DefaultPeerStorageTest.cc | 201 ++++++++++----------------- test/MockPeerStorage.h | 32 +++-- test/PeerTest.cc | 20 --- test/UTPexExtensionMessageTest.cc | 10 +- 19 files changed, 292 insertions(+), 375 deletions(-) diff --git a/src/ActivePeerConnectionCommand.cc b/src/ActivePeerConnectionCommand.cc index 47183e4a..4c0dbf65 100644 --- a/src/ActivePeerConnectionCommand.cc +++ b/src/ActivePeerConnectionCommand.cc @@ -115,13 +115,8 @@ bool ActivePeerConnectionCommand::execute() { numConnection = numNewConnection_; } - for(; numConnection > 0; --numConnection) { - SharedHandle peer = peerStorage_->getUnusedPeer(); - if(!peer) { - break; - } - connectToPeer(peer); - } + makeNewConnections(numConnection); + if(btRuntime_->getConnections() == 0 && !pieceStorage_->downloadFinished()) { btAnnounce_->overrideMinInterval(BtAnnounce::DEFAULT_ANNOUNCE_INTERVAL); @@ -132,18 +127,24 @@ bool ActivePeerConnectionCommand::execute() { return false; } -void ActivePeerConnectionCommand::connectToPeer(const SharedHandle& peer) +void ActivePeerConnectionCommand::makeNewConnections(int num) { - peer->usedBy(e_->newCUID()); - PeerInitiateConnectionCommand* command = - new PeerInitiateConnectionCommand(peer->usedBy(), requestGroup_, peer, e_, - btRuntime_); - command->setPeerStorage(peerStorage_); - command->setPieceStorage(pieceStorage_); - e_->addCommand(command); - A2_LOG_INFO(fmt(MSG_CONNECTING_TO_PEER, - getCuid(), - peer->getIPAddress().c_str())); + for(; num && peerStorage_->isPeerAvailable(); --num) { + cuid_t ncuid = e_->newCUID(); + SharedHandle peer = peerStorage_->checkoutPeer(ncuid); + // sanity check + if(!peer) { + break; + } + PeerInitiateConnectionCommand* command; + command = new PeerInitiateConnectionCommand(ncuid, requestGroup_, peer, e_, + btRuntime_); + command->setPeerStorage(peerStorage_); + command->setPieceStorage(pieceStorage_); + e_->addCommand(command); + A2_LOG_INFO(fmt(MSG_CONNECTING_TO_PEER, getCuid(), + peer->getIPAddress().c_str())); + } } void ActivePeerConnectionCommand::setBtRuntime diff --git a/src/ActivePeerConnectionCommand.h b/src/ActivePeerConnectionCommand.h index 929fd16e..872eec58 100644 --- a/src/ActivePeerConnectionCommand.h +++ b/src/ActivePeerConnectionCommand.h @@ -71,7 +71,7 @@ public: virtual bool execute(); - void connectToPeer(const SharedHandle& peer); + void makeNewConnections(int num); void setNumNewConnection(int numNewConnection) { diff --git a/src/DHTGetPeersCommand.cc b/src/DHTGetPeersCommand.cc index 01db7c79..c973437c 100644 --- a/src/DHTGetPeersCommand.cc +++ b/src/DHTGetPeersCommand.cc @@ -112,11 +112,11 @@ bool DHTGetPeersCommand::execute() lastGetPeerTime_ = global::wallclock(); if(numRetry_ < MAX_RETRIES && (btRuntime_->getMaxPeers() == 0 || - btRuntime_->getMaxPeers() > peerStorage_->countPeer())) { + btRuntime_->getMaxPeers() > peerStorage_->countAllPeer())) { ++numRetry_; A2_LOG_DEBUG(fmt("Too few peers. peers=%lu, max_peers=%d." " Try again(%d)", - static_cast(peerStorage_->countPeer()), + static_cast(peerStorage_->countAllPeer()), btRuntime_->getMaxPeers(), numRetry_)); } else { diff --git a/src/DefaultPeerStorage.cc b/src/DefaultPeerStorage.cc index 519368df..555969a3 100644 --- a/src/DefaultPeerStorage.cc +++ b/src/DefaultPeerStorage.cc @@ -53,7 +53,7 @@ namespace aria2 { namespace { -const size_t MAX_PEER_LIST_SIZE = 1024; +const size_t MAX_PEER_LIST_SIZE = 128; const size_t MAX_PEER_LIST_UPDATE = 100; } // namespace @@ -69,30 +69,27 @@ DefaultPeerStorage::~DefaultPeerStorage() { delete seederStateChoke_; delete leecherStateChoke_; + assert(uniqPeers_.size() == unusedPeers_.size() + usedPeers_.size()); } -namespace { -class FindIdenticalPeer { -private: - SharedHandle peer_; -public: - FindIdenticalPeer(const SharedHandle& peer):peer_(peer) {} - - bool operator()(const SharedHandle& peer) const { - return (*peer_ == *peer) || - ((peer_->getIPAddress() == peer->getIPAddress()) && - (peer_->getPort() == peer->getPort())); - } -}; -} // namespace +size_t DefaultPeerStorage::countAllPeer() const +{ + return unusedPeers_.size() + usedPeers_.size(); +} bool DefaultPeerStorage::isPeerAlreadyAdded(const SharedHandle& peer) { - return std::find_if(peers_.begin(), peers_.end(), - FindIdenticalPeer(peer)) != peers_.end(); + return uniqPeers_.count(std::make_pair(peer->getIPAddress(), + peer->getOrigPort())); } -bool DefaultPeerStorage::addPeer(const SharedHandle& peer) { +void DefaultPeerStorage::addUniqPeer(const SharedHandle& peer) +{ + uniqPeers_.insert(std::make_pair(peer->getIPAddress(), peer->getOrigPort())); +} + +bool DefaultPeerStorage::addPeer(const SharedHandle& peer) +{ if(isPeerAlreadyAdded(peer)) { A2_LOG_DEBUG(fmt("Adding %s:%u is rejected because it has been already" " added.", @@ -104,13 +101,14 @@ bool DefaultPeerStorage::addPeer(const SharedHandle& peer) { peer->getIPAddress().c_str(), peer->getPort())); return false; } - const size_t peerListSize = peers_.size(); + const size_t peerListSize = unusedPeers_.size(); if(peerListSize >= maxPeerListSize_) { deleteUnusedPeer(peerListSize-maxPeerListSize_+1); } - peers_.push_front(peer); - A2_LOG_DEBUG(fmt("Now peer list contains %lu peers", - static_cast(peers_.size()))); + unusedPeers_.push_front(peer); + addUniqPeer(peer); + A2_LOG_DEBUG(fmt("Now unused peer list contains %lu peers", + static_cast(unusedPeers_.size()))); return true; } @@ -134,28 +132,35 @@ void DefaultPeerStorage::addPeer(const std::vector >& peers) A2_LOG_DEBUG(fmt(MSG_ADDING_PEER, peer->getIPAddress().c_str(), peer->getPort())); } - peers_.push_front(peer); + unusedPeers_.push_front(peer); + addUniqPeer(peer); ++added; } - const size_t peerListSize = peers_.size(); + const size_t peerListSize = unusedPeers_.size(); if(peerListSize > maxPeerListSize_) { deleteUnusedPeer(peerListSize-maxPeerListSize_); } - A2_LOG_DEBUG(fmt("Now peer list contains %lu peers", - static_cast(peers_.size()))); + A2_LOG_DEBUG(fmt("Now unused peer list contains %lu peers", + static_cast(unusedPeers_.size()))); } void DefaultPeerStorage::addDroppedPeer(const SharedHandle& peer) { + // TODO Make unique droppedPeers_.push_front(peer); if(droppedPeers_.size() > 50) { droppedPeers_.pop_back(); } } -const std::deque >& DefaultPeerStorage::getPeers() +const std::deque >& DefaultPeerStorage::getUnusedPeers() { - return peers_; + return unusedPeers_; +} + +const PeerSet& DefaultPeerStorage::getUsedPeers() +{ + return usedPeers_; } const std::deque >& DefaultPeerStorage::getDroppedPeers() @@ -163,57 +168,8 @@ const std::deque >& DefaultPeerStorage::getDroppedPeers() return droppedPeers_; } -namespace { -class FindFinePeer { -public: - bool operator()(const SharedHandle& peer) const { - return peer->unused() && peer->isGood(); - } -}; -} // namespace - -SharedHandle DefaultPeerStorage::getUnusedPeer() { - std::deque >::const_iterator itr = - std::find_if(peers_.begin(), peers_.end(), FindFinePeer()); - if(itr == peers_.end()) { - return SharedHandle(); - } else { - return *itr; - } -} - -namespace { -class FindPeer { -private: - std::string ipaddr; - uint16_t port; -public: - FindPeer(const std::string& ipaddr, uint16_t port): - ipaddr(ipaddr), port(port) {} - - bool operator()(const SharedHandle& peer) const { - return ipaddr == peer->getIPAddress() && port == peer->getPort(); - } -}; -} // namespace - -SharedHandle DefaultPeerStorage::getPeer(const std::string& ipaddr, - uint16_t port) const { - std::deque >::const_iterator itr = - std::find_if(peers_.begin(), peers_.end(), FindPeer(ipaddr, port)); - if(itr == peers_.end()) { - return SharedHandle(); - } else { - return *itr; - } -} - -size_t DefaultPeerStorage::countPeer() const { - return peers_.size(); -} - bool DefaultPeerStorage::isPeerAvailable() { - return getUnusedPeer(); + return !unusedPeers_.empty(); } namespace { @@ -236,7 +192,8 @@ public: void DefaultPeerStorage::getActivePeers (std::vector >& activePeers) { - std::for_each(peers_.begin(), peers_.end(), CollectActivePeer(activePeers)); + std::for_each(usedPeers_.begin(), usedPeers_.end(), + CollectActivePeer(activePeers)); } bool DefaultPeerStorage::isBadPeer(const std::string& ipaddr) @@ -274,21 +231,32 @@ void DefaultPeerStorage::addBadPeer(const std::string& ipaddr) } void DefaultPeerStorage::deleteUnusedPeer(size_t delSize) { - std::deque > temp; - for(std::deque >::const_reverse_iterator itr = - peers_.rbegin(), eoi = peers_.rend(); itr != eoi; ++itr) { - const SharedHandle& p = *itr; - if(p->unused() && delSize > 0) { - onErasingPeer(p); - --delSize; - } else { - temp.push_front(p); - } + for(; delSize > 0 && !unusedPeers_.empty(); --delSize) { + onErasingPeer(unusedPeers_.back()); + unusedPeers_.pop_back(); } - peers_.swap(temp); } -void DefaultPeerStorage::onErasingPeer(const SharedHandle& peer) {} +SharedHandle DefaultPeerStorage::checkoutPeer(cuid_t cuid) +{ + if(!isPeerAvailable()) { + return SharedHandle(); + } + SharedHandle peer = unusedPeers_.front(); + unusedPeers_.pop_front(); + peer->usedBy(cuid); + usedPeers_.insert(peer); + A2_LOG_DEBUG(fmt("Checkout peer %s:%u to CUID#%"PRId64, + peer->getIPAddress().c_str(), peer->getPort(), + peer->usedBy())); + return peer; +} + +void DefaultPeerStorage::onErasingPeer(const SharedHandle& peer) +{ + uniqPeers_.erase(std::make_pair(peer->getIPAddress(), + peer->getOrigPort())); +} void DefaultPeerStorage::onReturningPeer(const SharedHandle& peer) { @@ -303,20 +271,20 @@ void DefaultPeerStorage::onReturningPeer(const SharedHandle& peer) executeChoke(); } } + peer->usedBy(0); } void DefaultPeerStorage::returnPeer(const SharedHandle& peer) { - std::deque >::iterator itr = - std::find_if(peers_.begin(), peers_.end(), derefEqual(peer)); - if(itr == peers_.end()) { - A2_LOG_DEBUG(fmt("Cannot find peer %s:%u in PeerStorage.", - peer->getIPAddress().c_str(), peer->getPort())); - } else { - peers_.erase(itr); - + A2_LOG_DEBUG(fmt("Peer %s:%u returned from CUID#%"PRId64, + peer->getIPAddress().c_str(), peer->getPort(), + peer->usedBy())); + if(usedPeers_.erase(peer)) { onReturningPeer(peer); onErasingPeer(peer); + } else { + A2_LOG_DEBUG(fmt("Cannot find peer %s:%u in usedPeers_", + peer->getIPAddress().c_str(), peer->getPort())); } } diff --git a/src/DefaultPeerStorage.h b/src/DefaultPeerStorage.h index 3b3dac20..a172dd06 100644 --- a/src/DefaultPeerStorage.h +++ b/src/DefaultPeerStorage.h @@ -39,8 +39,10 @@ #include #include +#include #include "TimerA2.h" +#include "a2functional.h" namespace aria2 { @@ -49,12 +51,23 @@ class BtSeederStateChoke; class BtLeecherStateChoke; class PieceStorage; +typedef std::set, RefLess > PeerSet; + class DefaultPeerStorage : public PeerStorage { private: SharedHandle btRuntime_; SharedHandle pieceStorage_; size_t maxPeerListSize_; - std::deque > peers_; + + // This contains ip address and port pair and is used to ensure that + // no duplicate peers are stored. + std::set > uniqPeers_; + // Unused (not connected) peers, sorted by last added. + std::deque > unusedPeers_; + // The set of used peers. Some of them are not connected yet. To + // know it is connected or not, call Peer::isActive(). + PeerSet usedPeers_; + std::deque > droppedPeers_; BtSeederStateChoke* seederStateChoke_; @@ -66,6 +79,7 @@ private: Timer lastBadPeerCleaned_; bool isPeerAlreadyAdded(const SharedHandle& peer); + void addUniqPeer(const SharedHandle& peer); void addDroppedPeer(const SharedHandle& peer); public: @@ -73,17 +87,18 @@ public: virtual ~DefaultPeerStorage(); + // TODO We need addAndCheckoutPeer for incoming peers virtual bool addPeer(const SharedHandle& peer); - virtual size_t countPeer() const; - - virtual SharedHandle getUnusedPeer(); + virtual size_t countAllPeer() const; SharedHandle getPeer(const std::string& ipaddr, uint16_t port) const; virtual void addPeer(const std::vector >& peers); - virtual const std::deque >& getPeers(); + const std::deque >& getUnusedPeers(); + + const PeerSet& getUsedPeers(); virtual const std::deque >& getDroppedPeers(); @@ -95,6 +110,8 @@ public: virtual void addBadPeer(const std::string& ipaddr); + virtual SharedHandle checkoutPeer(cuid_t cuid); + virtual void returnPeer(const SharedHandle& peer); virtual bool chokeRoundIntervalElapsed(); diff --git a/src/InitiatorMSEHandshakeCommand.cc b/src/InitiatorMSEHandshakeCommand.cc index bc516473..65b6c437 100644 --- a/src/InitiatorMSEHandshakeCommand.cc +++ b/src/InitiatorMSEHandshakeCommand.cc @@ -198,14 +198,18 @@ bool InitiatorMSEHandshakeCommand::executeInternal() { void InitiatorMSEHandshakeCommand::tryNewPeer() { if(peerStorage_->isPeerAvailable() && btRuntime_->lessThanEqMinPeers()) { - SharedHandle peer = peerStorage_->getUnusedPeer(); - peer->usedBy(getDownloadEngine()->newCUID()); - PeerInitiateConnectionCommand* command = - new PeerInitiateConnectionCommand(peer->usedBy(), requestGroup_, peer, - getDownloadEngine(), btRuntime_); - command->setPeerStorage(peerStorage_); - command->setPieceStorage(pieceStorage_); - getDownloadEngine()->addCommand(command); + cuid_t ncuid = getDownloadEngine()->newCUID(); + SharedHandle peer = peerStorage_->checkoutPeer(ncuid); + // sanity check + if(peer) { + PeerInitiateConnectionCommand* command; + command = new PeerInitiateConnectionCommand(ncuid, requestGroup_, peer, + getDownloadEngine(), + btRuntime_); + command->setPeerStorage(peerStorage_); + command->setPieceStorage(pieceStorage_); + getDownloadEngine()->addCommand(command); + } } } diff --git a/src/Peer.cc b/src/Peer.cc index b4fc13ac..1ca02d91 100644 --- a/src/Peer.cc +++ b/src/Peer.cc @@ -45,12 +45,11 @@ namespace aria2 { -#define BAD_CONDITION_INTERVAL 10 - Peer::Peer(std::string ipaddr, uint16_t port, bool incoming): ipaddr_(ipaddr), port_(port), - id_(fmt("%s(%u)", ipaddr_.c_str(), port_)), + origPort_(port), + cuid_(0), firstContactTime_(global::wallclock()), badConditionStartTime_(0), seeder_(false), @@ -60,7 +59,6 @@ Peer::Peer(std::string ipaddr, uint16_t port, bool incoming): disconnectedGracefully_(false) { memset(peerId_, 0, PEER_ID_LENGTH); - resetStatus(); } Peer::~Peer() @@ -98,10 +96,6 @@ void Peer::setPeerId(const unsigned char* peerId) memcpy(peerId_, peerId, PEER_ID_LENGTH); } -void Peer::resetStatus() { - cuid_ = 0; -} - bool Peer::amChoking() const { assert(res_); @@ -328,12 +322,6 @@ void Peer::startBadCondition() badConditionStartTime_ = global::wallclock(); } -bool Peer::isGood() const -{ - return badConditionStartTime_. - difference(global::wallclock()) >= BAD_CONDITION_INTERVAL; -} - uint8_t Peer::getExtensionMessageID(int key) const { assert(res_); diff --git a/src/Peer.h b/src/Peer.h index f85c8f76..9d5e2b70 100644 --- a/src/Peer.h +++ b/src/Peer.h @@ -61,8 +61,9 @@ private: // true, then this port is not a port the peer is listening to and // we cannot connect to it. uint16_t port_; - - std::string id_; + // This is the port number passed in the constructor arguments. This + // is used to distinguish peer identity. + uint16_t origPort_; cuid_t cuid_; @@ -94,18 +95,6 @@ public: ~Peer(); - bool operator==(const Peer& p) - { - return id_ == p.id_; - } - - bool operator!=(const Peer& p) - { - return !(*this == p); - } - - void resetStatus(); - const std::string& getIPAddress() const { return ipaddr_; @@ -121,6 +110,11 @@ public: port_ = port; } + uint16_t getOrigPort() const + { + return origPort_; + } + void usedBy(cuid_t cuid); cuid_t usedBy() const @@ -151,15 +145,8 @@ public: return seeder_; } - const std::string& getID() const - { - return id_; - } - void startBadCondition(); - bool isGood() const; - void allocateSessionResource(int32_t pieceLength, int64_t totalLength); void reconfigureSessionResource(int32_t pieceLength, int64_t totalLength); diff --git a/src/PeerInitiateConnectionCommand.cc b/src/PeerInitiateConnectionCommand.cc index c2f23bb1..cc878c27 100644 --- a/src/PeerInitiateConnectionCommand.cc +++ b/src/PeerInitiateConnectionCommand.cc @@ -105,14 +105,18 @@ bool PeerInitiateConnectionCommand::executeInternal() { // TODO this method removed when PeerBalancerCommand is implemented bool PeerInitiateConnectionCommand::prepareForNextPeer(time_t wait) { if(peerStorage_->isPeerAvailable() && btRuntime_->lessThanEqMinPeers()) { - SharedHandle peer = peerStorage_->getUnusedPeer(); - peer->usedBy(getDownloadEngine()->newCUID()); - PeerInitiateConnectionCommand* command = - new PeerInitiateConnectionCommand(peer->usedBy(), requestGroup_, peer, - getDownloadEngine(), btRuntime_); - command->setPeerStorage(peerStorage_); - command->setPieceStorage(pieceStorage_); - getDownloadEngine()->addCommand(command); + cuid_t ncuid = getDownloadEngine()->newCUID(); + SharedHandle peer = peerStorage_->checkoutPeer(ncuid); + // sanity check + if(peer) { + PeerInitiateConnectionCommand* command; + command = new PeerInitiateConnectionCommand(ncuid, requestGroup_, peer, + getDownloadEngine(), + btRuntime_); + command->setPeerStorage(peerStorage_); + command->setPieceStorage(pieceStorage_); + getDownloadEngine()->addCommand(command); + } } return true; } diff --git a/src/PeerInteractionCommand.cc b/src/PeerInteractionCommand.cc index 5fcc8faf..f07e42b5 100644 --- a/src/PeerInteractionCommand.cc +++ b/src/PeerInteractionCommand.cc @@ -391,14 +391,18 @@ bool PeerInteractionCommand::executeInternal() { // TODO this method removed when PeerBalancerCommand is implemented bool PeerInteractionCommand::prepareForNextPeer(time_t wait) { if(peerStorage_->isPeerAvailable() && btRuntime_->lessThanEqMinPeers()) { - SharedHandle peer = peerStorage_->getUnusedPeer(); - peer->usedBy(getDownloadEngine()->newCUID()); - PeerInitiateConnectionCommand* command = - new PeerInitiateConnectionCommand - (peer->usedBy(), requestGroup_, peer, getDownloadEngine(), btRuntime_); - command->setPeerStorage(peerStorage_); - command->setPieceStorage(pieceStorage_); - getDownloadEngine()->addCommand(command); + cuid_t ncuid = getDownloadEngine()->newCUID(); + SharedHandle peer = peerStorage_->checkoutPeer(ncuid); + // sanity check + if(peer) { + PeerInitiateConnectionCommand* command; + command = new PeerInitiateConnectionCommand(ncuid, requestGroup_, peer, + getDownloadEngine(), + btRuntime_); + command->setPeerStorage(peerStorage_); + command->setPieceStorage(pieceStorage_); + getDownloadEngine()->addCommand(command); + } } return true; } diff --git a/src/PeerReceiveHandshakeCommand.cc b/src/PeerReceiveHandshakeCommand.cc index 48d26b1f..b93e049d 100644 --- a/src/PeerReceiveHandshakeCommand.cc +++ b/src/PeerReceiveHandshakeCommand.cc @@ -138,8 +138,10 @@ bool PeerReceiveHandshakeCommand::executeInternal() if((!pieceStorage->downloadFinished() && stat.calculateDownloadSpeed() < thresholdSpeed) || btRuntime->lessThanMaxPeers()) { - if(peerStorage->addPeer(getPeer())) { - getPeer()->usedBy(getCuid()); + // TODO addPeer and checkoutPeer must be "atomic", in a sense + // that the added peer must be checked out. + if(peerStorage->addPeer(getPeer()) && + peerStorage->checkoutPeer(getCuid())) { PeerInteractionCommand* command = new PeerInteractionCommand (getCuid(), diff --git a/src/PeerStorage.h b/src/PeerStorage.h index ab4cb2e2..fbbe2746 100644 --- a/src/PeerStorage.h +++ b/src/PeerStorage.h @@ -42,6 +42,7 @@ #include "SharedHandle.h" #include "TransferStat.h" +#include "Command.h" namespace aria2 { @@ -63,26 +64,15 @@ public: virtual void addPeer(const std::vector >& peers) = 0; /** - * Returns internal peer list. + * Returns the number of peers, including used and unused ones. */ - virtual const std::deque >& getPeers() = 0; - - - /** - * Returns the number of peers. - */ - virtual size_t countPeer() const = 0; + virtual size_t countAllPeer() const = 0; /** * Returns internal dropped peer list. */ virtual const std::deque >& getDroppedPeers() = 0; - /** - * Returns one of the unused peers. - */ - virtual SharedHandle getUnusedPeer() = 0; - /** * Returns true if at least one unused peer exists. * Otherwise returns false. @@ -105,6 +95,13 @@ public: */ virtual void addBadPeer(const std::string& ipaddr) = 0; + /** + * Moves first peer in unused peer list to used peer set and calls + * Peer::usedBy(cuid). If there is no peer available, returns + * SharedHandle(). + */ + virtual SharedHandle checkoutPeer(cuid_t cuid) = 0; + /** * Tells PeerStorage object that peer is no longer used in the session. */ diff --git a/src/TrackerWatcherCommand.cc b/src/TrackerWatcherCommand.cc index fcd3b6f2..0eba057f 100644 --- a/src/TrackerWatcherCommand.cc +++ b/src/TrackerWatcherCommand.cc @@ -170,14 +170,18 @@ void TrackerWatcherCommand::processTrackerResponse (reinterpret_cast(trackerResponse.c_str()), trackerResponse.size()); while(!btRuntime_->isHalt() && btRuntime_->lessThanMinPeers()) { - SharedHandle peer = peerStorage_->getUnusedPeer(); + if(!peerStorage_->isPeerAvailable()) { + break; + } + cuid_t ncuid = e_->newCUID(); + SharedHandle peer = peerStorage_->checkoutPeer(ncuid); + // sanity check if(!peer) { break; } - peer->usedBy(e_->newCUID()); - PeerInitiateConnectionCommand* command = - new PeerInitiateConnectionCommand - (peer->usedBy(), requestGroup_, peer, e_, btRuntime_); + PeerInitiateConnectionCommand* command; + command = new PeerInitiateConnectionCommand(ncuid, requestGroup_, peer, e_, + btRuntime_); command->setPeerStorage(peerStorage_); command->setPieceStorage(pieceStorage_); e_->addCommand(command); diff --git a/test/DHTMessageFactoryImplTest.cc b/test/DHTMessageFactoryImplTest.cc index 7abec745..67590297 100644 --- a/test/DHTMessageFactoryImplTest.cc +++ b/test/DHTMessageFactoryImplTest.cc @@ -344,8 +344,11 @@ void DHTMessageFactoryImplTest::testCreateGetPeersReplyMessage() CPPUNIT_ASSERT(*nodes[0] == *m->getClosestKNodes()[0]); CPPUNIT_ASSERT(*nodes[7] == *m->getClosestKNodes()[7]); CPPUNIT_ASSERT_EQUAL((size_t)4, m->getValues().size()); - CPPUNIT_ASSERT(*peers[0] == *m->getValues()[0]); - CPPUNIT_ASSERT(*peers[3] == *m->getValues()[3]); + for(int i = 0; i < 4; ++i) { + CPPUNIT_ASSERT_EQUAL(peers[i]->getIPAddress(), + m->getValues()[i]->getIPAddress()); + CPPUNIT_ASSERT_EQUAL(peers[i]->getPort(), m->getValues()[i]->getPort()); + } CPPUNIT_ASSERT_EQUAL(util::toHex(transactionID, DHT_TRANSACTION_ID_LENGTH), util::toHex(m->getTransactionID())); } catch(Exception& e) { @@ -416,8 +419,11 @@ void DHTMessageFactoryImplTest::testCreateGetPeersReplyMessage6() CPPUNIT_ASSERT(*nodes[0] == *m->getClosestKNodes()[0]); CPPUNIT_ASSERT(*nodes[7] == *m->getClosestKNodes()[7]); CPPUNIT_ASSERT_EQUAL((size_t)4, m->getValues().size()); - CPPUNIT_ASSERT(*peers[0] == *m->getValues()[0]); - CPPUNIT_ASSERT(*peers[3] == *m->getValues()[3]); + for(int i = 0; i < 4; ++i) { + CPPUNIT_ASSERT_EQUAL(peers[i]->getIPAddress(), + m->getValues()[i]->getIPAddress()); + CPPUNIT_ASSERT_EQUAL(peers[i]->getPort(), m->getValues()[i]->getPort()); + } CPPUNIT_ASSERT_EQUAL(util::toHex(transactionID, DHT_TRANSACTION_ID_LENGTH), util::toHex(m->getTransactionID())); } catch(Exception& e) { diff --git a/test/DefaultBtAnnounceTest.cc b/test/DefaultBtAnnounceTest.cc index 6a5e49d3..edc4c52c 100644 --- a/test/DefaultBtAnnounceTest.cc +++ b/test/DefaultBtAnnounceTest.cc @@ -403,10 +403,10 @@ void DefaultBtAnnounceTest::testProcessAnnounceResponse() CPPUNIT_ASSERT_EQUAL((time_t)1800, an.getMinInterval()); CPPUNIT_ASSERT_EQUAL(100, an.getComplete()); CPPUNIT_ASSERT_EQUAL(200, an.getIncomplete()); - CPPUNIT_ASSERT_EQUAL((size_t)2, peerStorage_->getPeers().size()); - SharedHandle peer = peerStorage_->getPeers()[0]; + CPPUNIT_ASSERT_EQUAL((size_t)2, peerStorage_->getUnusedPeers().size()); + SharedHandle peer = peerStorage_->getUnusedPeers()[0]; CPPUNIT_ASSERT_EQUAL(std::string("192.168.0.1"), peer->getIPAddress()); - peer = peerStorage_->getPeers()[1]; + peer = peerStorage_->getUnusedPeers()[1]; CPPUNIT_ASSERT_EQUAL(std::string("1002:1035:4527:3546:7854:1237:3247:3217"), peer->getIPAddress()); } diff --git a/test/DefaultPeerStorageTest.cc b/test/DefaultPeerStorageTest.cc index b3a425da..7aad7e9c 100644 --- a/test/DefaultPeerStorageTest.cc +++ b/test/DefaultPeerStorageTest.cc @@ -16,13 +16,12 @@ namespace aria2 { class DefaultPeerStorageTest:public CppUnit::TestFixture { CPPUNIT_TEST_SUITE(DefaultPeerStorageTest); - CPPUNIT_TEST(testCountPeer); + CPPUNIT_TEST(testCountAllPeer); CPPUNIT_TEST(testDeleteUnusedPeer); CPPUNIT_TEST(testAddPeer); - CPPUNIT_TEST(testGetUnusedPeer); CPPUNIT_TEST(testIsPeerAvailable); - CPPUNIT_TEST(testActivatePeer); - CPPUNIT_TEST(testCalculateStat); + CPPUNIT_TEST(testGetActivePeers); + CPPUNIT_TEST(testCheckoutPeer); CPPUNIT_TEST(testReturnPeer); CPPUNIT_TEST(testOnErasingPeer); CPPUNIT_TEST(testAddBadPeer); @@ -40,13 +39,12 @@ public: delete option; } - void testCountPeer(); + void testCountAllPeer(); void testDeleteUnusedPeer(); void testAddPeer(); - void testGetUnusedPeer(); void testIsPeerAvailable(); - void testActivatePeer(); - void testCalculateStat(); + void testGetActivePeers(); + void testCheckoutPeer(); void testReturnPeer(); void testOnErasingPeer(); void testAddBadPeer(); @@ -55,154 +53,91 @@ public: CPPUNIT_TEST_SUITE_REGISTRATION(DefaultPeerStorageTest); -void DefaultPeerStorageTest::testCountPeer() { +void DefaultPeerStorageTest::testCountAllPeer() +{ DefaultPeerStorage ps; - CPPUNIT_ASSERT_EQUAL((size_t)0, ps.countPeer()); - - SharedHandle peer(new Peer("192.168.0.1", 6889)); - - ps.addPeer(peer); - CPPUNIT_ASSERT_EQUAL((size_t)1, ps.countPeer()); + CPPUNIT_ASSERT_EQUAL((size_t)0, ps.countAllPeer()); + for(int i = 0; i < 2; ++i) { + SharedHandle peer(new Peer("192.168.0.1", 6889+i)); + ps.addPeer(peer); + } + CPPUNIT_ASSERT_EQUAL((size_t)2, ps.countAllPeer()); + SharedHandle peer = ps.checkoutPeer(1); + CPPUNIT_ASSERT(peer); + CPPUNIT_ASSERT_EQUAL((size_t)2, ps.countAllPeer()); + ps.returnPeer(peer); + CPPUNIT_ASSERT_EQUAL((size_t)1, ps.countAllPeer()); } -void DefaultPeerStorageTest::testDeleteUnusedPeer() { +void DefaultPeerStorageTest::testDeleteUnusedPeer() +{ DefaultPeerStorage ps; SharedHandle peer1(new Peer("192.168.0.1", 6889)); SharedHandle peer2(new Peer("192.168.0.2", 6889)); SharedHandle peer3(new Peer("192.168.0.3", 6889)); - ps.addPeer(peer1); - ps.addPeer(peer2); - ps.addPeer(peer3); + CPPUNIT_ASSERT(ps.addPeer(peer1)); + CPPUNIT_ASSERT(ps.addPeer(peer2)); + CPPUNIT_ASSERT(ps.addPeer(peer3)); ps.deleteUnusedPeer(2); - CPPUNIT_ASSERT_EQUAL((size_t)1, ps.countPeer()); + CPPUNIT_ASSERT_EQUAL((size_t)1, ps.getUnusedPeers().size()); CPPUNIT_ASSERT_EQUAL(std::string("192.168.0.3"), - ps.getPeer("192.168.0.3", 6889)->getIPAddress()); - - ps.addPeer(peer1); - ps.addPeer(peer2); - - peer2->usedBy(1); - - ps.deleteUnusedPeer(3); - - // peer2 has been in use, so it did't deleted. - CPPUNIT_ASSERT_EQUAL((size_t)1, ps.countPeer()); - CPPUNIT_ASSERT_EQUAL(std::string("192.168.0.2"), - ps.getPeer("192.168.0.2", 6889)->getIPAddress()); + ps.getUnusedPeers()[0]->getIPAddress()); + ps.deleteUnusedPeer(100); + CPPUNIT_ASSERT(ps.getUnusedPeers().empty()); } -void DefaultPeerStorageTest::testAddPeer() { +void DefaultPeerStorageTest::testAddPeer() +{ DefaultPeerStorage ps; SharedHandle btRuntime(new BtRuntime()); - ps.setMaxPeerListSize(3); + ps.setMaxPeerListSize(2); ps.setBtRuntime(btRuntime); SharedHandle peer1(new Peer("192.168.0.1", 6889)); SharedHandle peer2(new Peer("192.168.0.2", 6889)); SharedHandle peer3(new Peer("192.168.0.3", 6889)); - ps.addPeer(peer1); - ps.addPeer(peer2); - ps.addPeer(peer3); + CPPUNIT_ASSERT(ps.addPeer(peer1)); + CPPUNIT_ASSERT(ps.addPeer(peer2)); + CPPUNIT_ASSERT(ps.addPeer(peer3)); - CPPUNIT_ASSERT_EQUAL((size_t)3, ps.countPeer()); + CPPUNIT_ASSERT_EQUAL((size_t)2, ps.getUnusedPeers().size()); + CPPUNIT_ASSERT_EQUAL(std::string("192.168.0.3"), + ps.getUnusedPeers()[0]->getIPAddress()); - // this returns false, because peer1 is already in the container - CPPUNIT_ASSERT_EQUAL(false, ps.addPeer(peer1)); - // the number of peers doesn't change. - CPPUNIT_ASSERT_EQUAL((size_t)3, ps.countPeer()); - - SharedHandle peer4(new Peer("192.168.0.4", 6889)); - - peer1->usedBy(1); - CPPUNIT_ASSERT(ps.addPeer(peer4)); - // peer2 was deleted. While peer1 is oldest, its cuid is not 0. - CPPUNIT_ASSERT_EQUAL((size_t)3, ps.countPeer()); - CPPUNIT_ASSERT(std::find_if(ps.getPeers().begin(), ps.getPeers().end(), - derefEqual(peer2)) == ps.getPeers().end()); - - SharedHandle peer5(new Peer("192.168.0.4", 0)); - - peer5->setPort(6889); - - // this returns false because the peer which has same ip and port - // has already added - CPPUNIT_ASSERT_EQUAL(false, ps.addPeer(peer5)); - - SharedHandle pa[] = { - SharedHandle(new Peer("192.168.0.4", 6889)), - SharedHandle(new Peer("192.168.0.5", 6889)), - SharedHandle(new Peer("192.168.0.6", 6889)), - SharedHandle(new Peer("192.168.0.7", 6889)), - SharedHandle(new Peer("192.168.0.8", 6889)) - }; - std::vector > peers(vbegin(pa), vend(pa)); - ps.addPeer(peers); - // peers[0] is not added because it has been already added. - // peers[1], peers[2] and peers[3] are going to be added. peers[4] - // is not added because DefaultPeerStorage::addPeer() limits the - // number of peers to add. Finally, unused peers are removed from - // back and size 3 vector is made. - CPPUNIT_ASSERT_EQUAL((size_t)3, ps.countPeer()); - CPPUNIT_ASSERT(std::find_if(ps.getPeers().begin(), ps.getPeers().end(), - derefEqual(peers[2])) != ps.getPeers().end()); - CPPUNIT_ASSERT(std::find_if(ps.getPeers().begin(), ps.getPeers().end(), - derefEqual(peers[3])) != ps.getPeers().end()); -} - -void DefaultPeerStorageTest::testGetUnusedPeer() { - DefaultPeerStorage ps; - ps.setBtRuntime(btRuntime); - - SharedHandle peer1(new Peer("192.168.0.1", 6889)); - - ps.addPeer(peer1); + CPPUNIT_ASSERT(!ps.addPeer(peer2)); + CPPUNIT_ASSERT(ps.addPeer(peer1)); + CPPUNIT_ASSERT_EQUAL((size_t)2, ps.getUnusedPeers().size()); CPPUNIT_ASSERT_EQUAL(std::string("192.168.0.1"), - ps.getUnusedPeer()->getIPAddress()); + ps.getUnusedPeers()[0]->getIPAddress()); - peer1->usedBy(1); - - CPPUNIT_ASSERT(!ps.getUnusedPeer()); - - peer1->resetStatus(); - peer1->startBadCondition(); - - CPPUNIT_ASSERT(!ps.getUnusedPeer()); + CPPUNIT_ASSERT_EQUAL(peer1->getIPAddress(), + ps.checkoutPeer(1)->getIPAddress()); + CPPUNIT_ASSERT(!ps.addPeer(peer1)); } void DefaultPeerStorageTest::testIsPeerAvailable() { DefaultPeerStorage ps; ps.setBtRuntime(btRuntime); - - CPPUNIT_ASSERT_EQUAL(false, ps.isPeerAvailable()); - SharedHandle peer1(new Peer("192.168.0.1", 6889)); + CPPUNIT_ASSERT(!ps.isPeerAvailable()); ps.addPeer(peer1); - - CPPUNIT_ASSERT_EQUAL(true, ps.isPeerAvailable()); - - peer1->usedBy(1); - - CPPUNIT_ASSERT_EQUAL(false, ps.isPeerAvailable()); - - peer1->resetStatus(); - - peer1->startBadCondition(); - - CPPUNIT_ASSERT_EQUAL(false, ps.isPeerAvailable()); + CPPUNIT_ASSERT(ps.isPeerAvailable()); + CPPUNIT_ASSERT(ps.checkoutPeer(1)); + CPPUNIT_ASSERT(!ps.isPeerAvailable()); } -void DefaultPeerStorageTest::testActivatePeer() { +void DefaultPeerStorageTest::testGetActivePeers() +{ DefaultPeerStorage ps; - { std::vector > peers; ps.getActivePeers(peers); @@ -210,15 +145,13 @@ void DefaultPeerStorageTest::testActivatePeer() { } SharedHandle peer1(new Peer("192.168.0.1", 6889)); - ps.addPeer(peer1); - { std::vector > activePeers; ps.getActivePeers(activePeers); - CPPUNIT_ASSERT_EQUAL((size_t)0, activePeers.size()); } + CPPUNIT_ASSERT(ps.checkoutPeer(1)); { peer1->allocateSessionResource(1024*1024, 1024*1024*10); @@ -229,7 +162,23 @@ void DefaultPeerStorageTest::testActivatePeer() { } } -void DefaultPeerStorageTest::testCalculateStat() { +void DefaultPeerStorageTest::testCheckoutPeer() +{ + DefaultPeerStorage ps; + SharedHandle peers[] = { + SharedHandle(new Peer("192.168.0.1", 1000)), + SharedHandle(new Peer("192.168.0.2", 1000)), + SharedHandle(new Peer("192.168.0.3", 1000)) + }; + int len = A2_ARRAY_LEN(peers); + for(int i = 0; i < len; ++i) { + ps.addPeer(peers[i]); + } + for(int i = 0; i < len; ++i) { + SharedHandle peer = ps.checkoutPeer(i+1); + CPPUNIT_ASSERT_EQUAL(peers[len-i-1]->getIPAddress(), peer->getIPAddress()); + } + CPPUNIT_ASSERT(!ps.checkoutPeer(len+1)); } void DefaultPeerStorageTest::testReturnPeer() @@ -245,17 +194,19 @@ void DefaultPeerStorageTest::testReturnPeer() ps.addPeer(peer1); ps.addPeer(peer2); ps.addPeer(peer3); + for(int i = 1; i <= 3; ++i) { + CPPUNIT_ASSERT(ps.checkoutPeer(i)); + } + CPPUNIT_ASSERT_EQUAL((size_t)3, ps.getUsedPeers().size()); ps.returnPeer(peer2); // peer2 removed from the container - CPPUNIT_ASSERT_EQUAL((size_t)2, ps.getPeers().size()); - CPPUNIT_ASSERT(std::find_if(ps.getPeers().begin(), ps.getPeers().end(), - derefEqual(peer2)) == ps.getPeers().end()); + CPPUNIT_ASSERT_EQUAL((size_t)2, ps.getUsedPeers().size()); + CPPUNIT_ASSERT(ps.getUsedPeers().count(peer2) == 0); CPPUNIT_ASSERT_EQUAL((size_t)1, ps.getDroppedPeers().size()); ps.returnPeer(peer1); // peer1 is removed from the container - CPPUNIT_ASSERT_EQUAL((size_t)1, ps.getPeers().size()); - CPPUNIT_ASSERT(std::find_if(ps.getPeers().begin(), ps.getPeers().end(), - derefEqual(peer1)) == ps.getPeers().end()); + CPPUNIT_ASSERT_EQUAL((size_t)1, ps.getUsedPeers().size()); + CPPUNIT_ASSERT(ps.getUsedPeers().count(peer1) == 0); CPPUNIT_ASSERT_EQUAL((size_t)1, ps.getDroppedPeers().size()); } diff --git a/test/MockPeerStorage.h b/test/MockPeerStorage.h index 9060346e..682f5ffe 100644 --- a/test/MockPeerStorage.h +++ b/test/MockPeerStorage.h @@ -11,7 +11,8 @@ namespace aria2 { class MockPeerStorage : public PeerStorage { private: - std::deque > peers; + std::deque > unusedPeers; + std::deque > usedPeers; std::deque > droppedPeers; std::vector > activePeers; int numChokeExecuted_; @@ -19,22 +20,24 @@ public: MockPeerStorage():numChokeExecuted_(0) {} virtual ~MockPeerStorage() {} - virtual bool addPeer(const SharedHandle& peer) { - peers.push_back(peer); + virtual bool addPeer(const SharedHandle& peer) + { + unusedPeers.push_back(peer); return true; } virtual void addPeer(const std::vector >& peers) { - std::copy(peers.begin(), peers.end(), back_inserter(this->peers)); + unusedPeers.insert(unusedPeers.end(), peers.begin(), peers.end()); } - virtual const std::deque >& getPeers() { - return peers; - } - - virtual size_t countPeer() const + const std::deque >& getUnusedPeers() { - return peers.size(); + return unusedPeers; + } + + virtual size_t countAllPeer() const + { + return unusedPeers.size() + usedPeers.size(); } virtual const std::deque >& getDroppedPeers() { @@ -45,10 +48,6 @@ public: droppedPeers.push_back(peer); } - virtual SharedHandle getUnusedPeer() { - return SharedHandle(); - } - virtual bool isPeerAvailable() { return false; } @@ -71,6 +70,11 @@ public: { } + virtual SharedHandle checkoutPeer(cuid_t cuid) + { + return SharedHandle(); + } + virtual void returnPeer(const SharedHandle& peer) { } diff --git a/test/PeerTest.cc b/test/PeerTest.cc index 484f0427..c88078e2 100644 --- a/test/PeerTest.cc +++ b/test/PeerTest.cc @@ -8,8 +8,6 @@ class PeerTest:public CppUnit::TestFixture { CPPUNIT_TEST_SUITE(PeerTest); CPPUNIT_TEST(testPeerAllowedIndexSet); CPPUNIT_TEST(testAmAllowedIndexSet); - CPPUNIT_TEST(testGetId); - CPPUNIT_TEST(testOperatorEqual); CPPUNIT_TEST(testCountSeeder); CPPUNIT_TEST_SUITE_END(); private: @@ -22,8 +20,6 @@ public: void testPeerAllowedIndexSet(); void testAmAllowedIndexSet(); - void testGetId(); - void testOperatorEqual(); void testCountSeeder(); }; @@ -42,22 +38,6 @@ void PeerTest::testAmAllowedIndexSet() { CPPUNIT_ASSERT(peer->isInAmAllowedIndexSet(0)); } -void PeerTest::testGetId() { - CPPUNIT_ASSERT_EQUAL(std::string("localhost(6969)"), peer->getID()); -} - -void PeerTest::testOperatorEqual() -{ - CPPUNIT_ASSERT(Peer("localhost", 6881) == Peer("localhost", 6881)); - - { - Peer p1("localhost", 6881); - Peer p2("localhsot", 0); - p2.setPort(6881); - CPPUNIT_ASSERT(p1 != p2); - } -} - void PeerTest::testCountSeeder() { std::vector > peers(5); diff --git a/test/UTPexExtensionMessageTest.cc b/test/UTPexExtensionMessageTest.cc index 422caffb..04412fe7 100644 --- a/test/UTPexExtensionMessageTest.cc +++ b/test/UTPexExtensionMessageTest.cc @@ -154,24 +154,24 @@ void UTPexExtensionMessageTest::testDoReceivedAction() msg.doReceivedAction(); - CPPUNIT_ASSERT_EQUAL((size_t)4, peerStorage_->getPeers().size()); + CPPUNIT_ASSERT_EQUAL((size_t)4, peerStorage_->getUnusedPeers().size()); { - SharedHandle p = peerStorage_->getPeers()[0]; + SharedHandle p = peerStorage_->getUnusedPeers()[0]; CPPUNIT_ASSERT_EQUAL(std::string("192.168.0.1"), p->getIPAddress()); CPPUNIT_ASSERT_EQUAL((uint16_t)6881, p->getPort()); } { - SharedHandle p = peerStorage_->getPeers()[1]; + SharedHandle p = peerStorage_->getUnusedPeers()[1]; CPPUNIT_ASSERT_EQUAL(std::string("1002:1035:4527:3546:7854:1237:3247:3217"), p->getIPAddress()); CPPUNIT_ASSERT_EQUAL((uint16_t)9999, p->getPort()); } { - SharedHandle p = peerStorage_->getPeers()[2]; + SharedHandle p = peerStorage_->getUnusedPeers()[2]; CPPUNIT_ASSERT_EQUAL(std::string("192.168.0.2"), p->getIPAddress()); } { - SharedHandle p = peerStorage_->getPeers()[3]; + SharedHandle p = peerStorage_->getUnusedPeers()[3]; CPPUNIT_ASSERT_EQUAL(std::string("2001:db8:bd05:1d2:288a:1fc0:1:10ee"), p->getIPAddress()); }