diff --git a/src/ActivePeerConnectionCommand.cc b/src/ActivePeerConnectionCommand.cc index 47183e4a..4c0dbf65 100644 --- a/src/ActivePeerConnectionCommand.cc +++ b/src/ActivePeerConnectionCommand.cc @@ -115,13 +115,8 @@ bool ActivePeerConnectionCommand::execute() { numConnection = numNewConnection_; } - for(; numConnection > 0; --numConnection) { - SharedHandle peer = peerStorage_->getUnusedPeer(); - if(!peer) { - break; - } - connectToPeer(peer); - } + makeNewConnections(numConnection); + if(btRuntime_->getConnections() == 0 && !pieceStorage_->downloadFinished()) { btAnnounce_->overrideMinInterval(BtAnnounce::DEFAULT_ANNOUNCE_INTERVAL); @@ -132,18 +127,24 @@ bool ActivePeerConnectionCommand::execute() { return false; } -void ActivePeerConnectionCommand::connectToPeer(const SharedHandle& peer) +void ActivePeerConnectionCommand::makeNewConnections(int num) { - peer->usedBy(e_->newCUID()); - PeerInitiateConnectionCommand* command = - new PeerInitiateConnectionCommand(peer->usedBy(), requestGroup_, peer, e_, - btRuntime_); - command->setPeerStorage(peerStorage_); - command->setPieceStorage(pieceStorage_); - e_->addCommand(command); - A2_LOG_INFO(fmt(MSG_CONNECTING_TO_PEER, - getCuid(), - peer->getIPAddress().c_str())); + for(; num && peerStorage_->isPeerAvailable(); --num) { + cuid_t ncuid = e_->newCUID(); + SharedHandle peer = peerStorage_->checkoutPeer(ncuid); + // sanity check + if(!peer) { + break; + } + PeerInitiateConnectionCommand* command; + command = new PeerInitiateConnectionCommand(ncuid, requestGroup_, peer, e_, + btRuntime_); + command->setPeerStorage(peerStorage_); + command->setPieceStorage(pieceStorage_); + e_->addCommand(command); + A2_LOG_INFO(fmt(MSG_CONNECTING_TO_PEER, getCuid(), + peer->getIPAddress().c_str())); + } } void ActivePeerConnectionCommand::setBtRuntime diff --git a/src/ActivePeerConnectionCommand.h b/src/ActivePeerConnectionCommand.h index 929fd16e..872eec58 100644 --- a/src/ActivePeerConnectionCommand.h +++ b/src/ActivePeerConnectionCommand.h @@ -71,7 +71,7 @@ public: virtual bool execute(); - void connectToPeer(const SharedHandle& peer); + void makeNewConnections(int num); void setNumNewConnection(int numNewConnection) { diff --git a/src/DHTGetPeersCommand.cc b/src/DHTGetPeersCommand.cc index 01db7c79..c973437c 100644 --- a/src/DHTGetPeersCommand.cc +++ b/src/DHTGetPeersCommand.cc @@ -112,11 +112,11 @@ bool DHTGetPeersCommand::execute() lastGetPeerTime_ = global::wallclock(); if(numRetry_ < MAX_RETRIES && (btRuntime_->getMaxPeers() == 0 || - btRuntime_->getMaxPeers() > peerStorage_->countPeer())) { + btRuntime_->getMaxPeers() > peerStorage_->countAllPeer())) { ++numRetry_; A2_LOG_DEBUG(fmt("Too few peers. peers=%lu, max_peers=%d." " Try again(%d)", - static_cast(peerStorage_->countPeer()), + static_cast(peerStorage_->countAllPeer()), btRuntime_->getMaxPeers(), numRetry_)); } else { diff --git a/src/DefaultPeerStorage.cc b/src/DefaultPeerStorage.cc index 519368df..555969a3 100644 --- a/src/DefaultPeerStorage.cc +++ b/src/DefaultPeerStorage.cc @@ -53,7 +53,7 @@ namespace aria2 { namespace { -const size_t MAX_PEER_LIST_SIZE = 1024; +const size_t MAX_PEER_LIST_SIZE = 128; const size_t MAX_PEER_LIST_UPDATE = 100; } // namespace @@ -69,30 +69,27 @@ DefaultPeerStorage::~DefaultPeerStorage() { delete seederStateChoke_; delete leecherStateChoke_; + assert(uniqPeers_.size() == unusedPeers_.size() + usedPeers_.size()); } -namespace { -class FindIdenticalPeer { -private: - SharedHandle peer_; -public: - FindIdenticalPeer(const SharedHandle& peer):peer_(peer) {} - - bool operator()(const SharedHandle& peer) const { - return (*peer_ == *peer) || - ((peer_->getIPAddress() == peer->getIPAddress()) && - (peer_->getPort() == peer->getPort())); - } -}; -} // namespace +size_t DefaultPeerStorage::countAllPeer() const +{ + return unusedPeers_.size() + usedPeers_.size(); +} bool DefaultPeerStorage::isPeerAlreadyAdded(const SharedHandle& peer) { - return std::find_if(peers_.begin(), peers_.end(), - FindIdenticalPeer(peer)) != peers_.end(); + return uniqPeers_.count(std::make_pair(peer->getIPAddress(), + peer->getOrigPort())); } -bool DefaultPeerStorage::addPeer(const SharedHandle& peer) { +void DefaultPeerStorage::addUniqPeer(const SharedHandle& peer) +{ + uniqPeers_.insert(std::make_pair(peer->getIPAddress(), peer->getOrigPort())); +} + +bool DefaultPeerStorage::addPeer(const SharedHandle& peer) +{ if(isPeerAlreadyAdded(peer)) { A2_LOG_DEBUG(fmt("Adding %s:%u is rejected because it has been already" " added.", @@ -104,13 +101,14 @@ bool DefaultPeerStorage::addPeer(const SharedHandle& peer) { peer->getIPAddress().c_str(), peer->getPort())); return false; } - const size_t peerListSize = peers_.size(); + const size_t peerListSize = unusedPeers_.size(); if(peerListSize >= maxPeerListSize_) { deleteUnusedPeer(peerListSize-maxPeerListSize_+1); } - peers_.push_front(peer); - A2_LOG_DEBUG(fmt("Now peer list contains %lu peers", - static_cast(peers_.size()))); + unusedPeers_.push_front(peer); + addUniqPeer(peer); + A2_LOG_DEBUG(fmt("Now unused peer list contains %lu peers", + static_cast(unusedPeers_.size()))); return true; } @@ -134,28 +132,35 @@ void DefaultPeerStorage::addPeer(const std::vector >& peers) A2_LOG_DEBUG(fmt(MSG_ADDING_PEER, peer->getIPAddress().c_str(), peer->getPort())); } - peers_.push_front(peer); + unusedPeers_.push_front(peer); + addUniqPeer(peer); ++added; } - const size_t peerListSize = peers_.size(); + const size_t peerListSize = unusedPeers_.size(); if(peerListSize > maxPeerListSize_) { deleteUnusedPeer(peerListSize-maxPeerListSize_); } - A2_LOG_DEBUG(fmt("Now peer list contains %lu peers", - static_cast(peers_.size()))); + A2_LOG_DEBUG(fmt("Now unused peer list contains %lu peers", + static_cast(unusedPeers_.size()))); } void DefaultPeerStorage::addDroppedPeer(const SharedHandle& peer) { + // TODO Make unique droppedPeers_.push_front(peer); if(droppedPeers_.size() > 50) { droppedPeers_.pop_back(); } } -const std::deque >& DefaultPeerStorage::getPeers() +const std::deque >& DefaultPeerStorage::getUnusedPeers() { - return peers_; + return unusedPeers_; +} + +const PeerSet& DefaultPeerStorage::getUsedPeers() +{ + return usedPeers_; } const std::deque >& DefaultPeerStorage::getDroppedPeers() @@ -163,57 +168,8 @@ const std::deque >& DefaultPeerStorage::getDroppedPeers() return droppedPeers_; } -namespace { -class FindFinePeer { -public: - bool operator()(const SharedHandle& peer) const { - return peer->unused() && peer->isGood(); - } -}; -} // namespace - -SharedHandle DefaultPeerStorage::getUnusedPeer() { - std::deque >::const_iterator itr = - std::find_if(peers_.begin(), peers_.end(), FindFinePeer()); - if(itr == peers_.end()) { - return SharedHandle(); - } else { - return *itr; - } -} - -namespace { -class FindPeer { -private: - std::string ipaddr; - uint16_t port; -public: - FindPeer(const std::string& ipaddr, uint16_t port): - ipaddr(ipaddr), port(port) {} - - bool operator()(const SharedHandle& peer) const { - return ipaddr == peer->getIPAddress() && port == peer->getPort(); - } -}; -} // namespace - -SharedHandle DefaultPeerStorage::getPeer(const std::string& ipaddr, - uint16_t port) const { - std::deque >::const_iterator itr = - std::find_if(peers_.begin(), peers_.end(), FindPeer(ipaddr, port)); - if(itr == peers_.end()) { - return SharedHandle(); - } else { - return *itr; - } -} - -size_t DefaultPeerStorage::countPeer() const { - return peers_.size(); -} - bool DefaultPeerStorage::isPeerAvailable() { - return getUnusedPeer(); + return !unusedPeers_.empty(); } namespace { @@ -236,7 +192,8 @@ public: void DefaultPeerStorage::getActivePeers (std::vector >& activePeers) { - std::for_each(peers_.begin(), peers_.end(), CollectActivePeer(activePeers)); + std::for_each(usedPeers_.begin(), usedPeers_.end(), + CollectActivePeer(activePeers)); } bool DefaultPeerStorage::isBadPeer(const std::string& ipaddr) @@ -274,21 +231,32 @@ void DefaultPeerStorage::addBadPeer(const std::string& ipaddr) } void DefaultPeerStorage::deleteUnusedPeer(size_t delSize) { - std::deque > temp; - for(std::deque >::const_reverse_iterator itr = - peers_.rbegin(), eoi = peers_.rend(); itr != eoi; ++itr) { - const SharedHandle& p = *itr; - if(p->unused() && delSize > 0) { - onErasingPeer(p); - --delSize; - } else { - temp.push_front(p); - } + for(; delSize > 0 && !unusedPeers_.empty(); --delSize) { + onErasingPeer(unusedPeers_.back()); + unusedPeers_.pop_back(); } - peers_.swap(temp); } -void DefaultPeerStorage::onErasingPeer(const SharedHandle& peer) {} +SharedHandle DefaultPeerStorage::checkoutPeer(cuid_t cuid) +{ + if(!isPeerAvailable()) { + return SharedHandle(); + } + SharedHandle peer = unusedPeers_.front(); + unusedPeers_.pop_front(); + peer->usedBy(cuid); + usedPeers_.insert(peer); + A2_LOG_DEBUG(fmt("Checkout peer %s:%u to CUID#%"PRId64, + peer->getIPAddress().c_str(), peer->getPort(), + peer->usedBy())); + return peer; +} + +void DefaultPeerStorage::onErasingPeer(const SharedHandle& peer) +{ + uniqPeers_.erase(std::make_pair(peer->getIPAddress(), + peer->getOrigPort())); +} void DefaultPeerStorage::onReturningPeer(const SharedHandle& peer) { @@ -303,20 +271,20 @@ void DefaultPeerStorage::onReturningPeer(const SharedHandle& peer) executeChoke(); } } + peer->usedBy(0); } void DefaultPeerStorage::returnPeer(const SharedHandle& peer) { - std::deque >::iterator itr = - std::find_if(peers_.begin(), peers_.end(), derefEqual(peer)); - if(itr == peers_.end()) { - A2_LOG_DEBUG(fmt("Cannot find peer %s:%u in PeerStorage.", - peer->getIPAddress().c_str(), peer->getPort())); - } else { - peers_.erase(itr); - + A2_LOG_DEBUG(fmt("Peer %s:%u returned from CUID#%"PRId64, + peer->getIPAddress().c_str(), peer->getPort(), + peer->usedBy())); + if(usedPeers_.erase(peer)) { onReturningPeer(peer); onErasingPeer(peer); + } else { + A2_LOG_DEBUG(fmt("Cannot find peer %s:%u in usedPeers_", + peer->getIPAddress().c_str(), peer->getPort())); } } diff --git a/src/DefaultPeerStorage.h b/src/DefaultPeerStorage.h index 3b3dac20..a172dd06 100644 --- a/src/DefaultPeerStorage.h +++ b/src/DefaultPeerStorage.h @@ -39,8 +39,10 @@ #include #include +#include #include "TimerA2.h" +#include "a2functional.h" namespace aria2 { @@ -49,12 +51,23 @@ class BtSeederStateChoke; class BtLeecherStateChoke; class PieceStorage; +typedef std::set, RefLess > PeerSet; + class DefaultPeerStorage : public PeerStorage { private: SharedHandle btRuntime_; SharedHandle pieceStorage_; size_t maxPeerListSize_; - std::deque > peers_; + + // This contains ip address and port pair and is used to ensure that + // no duplicate peers are stored. + std::set > uniqPeers_; + // Unused (not connected) peers, sorted by last added. + std::deque > unusedPeers_; + // The set of used peers. Some of them are not connected yet. To + // know it is connected or not, call Peer::isActive(). + PeerSet usedPeers_; + std::deque > droppedPeers_; BtSeederStateChoke* seederStateChoke_; @@ -66,6 +79,7 @@ private: Timer lastBadPeerCleaned_; bool isPeerAlreadyAdded(const SharedHandle& peer); + void addUniqPeer(const SharedHandle& peer); void addDroppedPeer(const SharedHandle& peer); public: @@ -73,17 +87,18 @@ public: virtual ~DefaultPeerStorage(); + // TODO We need addAndCheckoutPeer for incoming peers virtual bool addPeer(const SharedHandle& peer); - virtual size_t countPeer() const; - - virtual SharedHandle getUnusedPeer(); + virtual size_t countAllPeer() const; SharedHandle getPeer(const std::string& ipaddr, uint16_t port) const; virtual void addPeer(const std::vector >& peers); - virtual const std::deque >& getPeers(); + const std::deque >& getUnusedPeers(); + + const PeerSet& getUsedPeers(); virtual const std::deque >& getDroppedPeers(); @@ -95,6 +110,8 @@ public: virtual void addBadPeer(const std::string& ipaddr); + virtual SharedHandle checkoutPeer(cuid_t cuid); + virtual void returnPeer(const SharedHandle& peer); virtual bool chokeRoundIntervalElapsed(); diff --git a/src/InitiatorMSEHandshakeCommand.cc b/src/InitiatorMSEHandshakeCommand.cc index bc516473..65b6c437 100644 --- a/src/InitiatorMSEHandshakeCommand.cc +++ b/src/InitiatorMSEHandshakeCommand.cc @@ -198,14 +198,18 @@ bool InitiatorMSEHandshakeCommand::executeInternal() { void InitiatorMSEHandshakeCommand::tryNewPeer() { if(peerStorage_->isPeerAvailable() && btRuntime_->lessThanEqMinPeers()) { - SharedHandle peer = peerStorage_->getUnusedPeer(); - peer->usedBy(getDownloadEngine()->newCUID()); - PeerInitiateConnectionCommand* command = - new PeerInitiateConnectionCommand(peer->usedBy(), requestGroup_, peer, - getDownloadEngine(), btRuntime_); - command->setPeerStorage(peerStorage_); - command->setPieceStorage(pieceStorage_); - getDownloadEngine()->addCommand(command); + cuid_t ncuid = getDownloadEngine()->newCUID(); + SharedHandle peer = peerStorage_->checkoutPeer(ncuid); + // sanity check + if(peer) { + PeerInitiateConnectionCommand* command; + command = new PeerInitiateConnectionCommand(ncuid, requestGroup_, peer, + getDownloadEngine(), + btRuntime_); + command->setPeerStorage(peerStorage_); + command->setPieceStorage(pieceStorage_); + getDownloadEngine()->addCommand(command); + } } } diff --git a/src/Peer.cc b/src/Peer.cc index b4fc13ac..1ca02d91 100644 --- a/src/Peer.cc +++ b/src/Peer.cc @@ -45,12 +45,11 @@ namespace aria2 { -#define BAD_CONDITION_INTERVAL 10 - Peer::Peer(std::string ipaddr, uint16_t port, bool incoming): ipaddr_(ipaddr), port_(port), - id_(fmt("%s(%u)", ipaddr_.c_str(), port_)), + origPort_(port), + cuid_(0), firstContactTime_(global::wallclock()), badConditionStartTime_(0), seeder_(false), @@ -60,7 +59,6 @@ Peer::Peer(std::string ipaddr, uint16_t port, bool incoming): disconnectedGracefully_(false) { memset(peerId_, 0, PEER_ID_LENGTH); - resetStatus(); } Peer::~Peer() @@ -98,10 +96,6 @@ void Peer::setPeerId(const unsigned char* peerId) memcpy(peerId_, peerId, PEER_ID_LENGTH); } -void Peer::resetStatus() { - cuid_ = 0; -} - bool Peer::amChoking() const { assert(res_); @@ -328,12 +322,6 @@ void Peer::startBadCondition() badConditionStartTime_ = global::wallclock(); } -bool Peer::isGood() const -{ - return badConditionStartTime_. - difference(global::wallclock()) >= BAD_CONDITION_INTERVAL; -} - uint8_t Peer::getExtensionMessageID(int key) const { assert(res_); diff --git a/src/Peer.h b/src/Peer.h index f85c8f76..9d5e2b70 100644 --- a/src/Peer.h +++ b/src/Peer.h @@ -61,8 +61,9 @@ private: // true, then this port is not a port the peer is listening to and // we cannot connect to it. uint16_t port_; - - std::string id_; + // This is the port number passed in the constructor arguments. This + // is used to distinguish peer identity. + uint16_t origPort_; cuid_t cuid_; @@ -94,18 +95,6 @@ public: ~Peer(); - bool operator==(const Peer& p) - { - return id_ == p.id_; - } - - bool operator!=(const Peer& p) - { - return !(*this == p); - } - - void resetStatus(); - const std::string& getIPAddress() const { return ipaddr_; @@ -121,6 +110,11 @@ public: port_ = port; } + uint16_t getOrigPort() const + { + return origPort_; + } + void usedBy(cuid_t cuid); cuid_t usedBy() const @@ -151,15 +145,8 @@ public: return seeder_; } - const std::string& getID() const - { - return id_; - } - void startBadCondition(); - bool isGood() const; - void allocateSessionResource(int32_t pieceLength, int64_t totalLength); void reconfigureSessionResource(int32_t pieceLength, int64_t totalLength); diff --git a/src/PeerInitiateConnectionCommand.cc b/src/PeerInitiateConnectionCommand.cc index c2f23bb1..cc878c27 100644 --- a/src/PeerInitiateConnectionCommand.cc +++ b/src/PeerInitiateConnectionCommand.cc @@ -105,14 +105,18 @@ bool PeerInitiateConnectionCommand::executeInternal() { // TODO this method removed when PeerBalancerCommand is implemented bool PeerInitiateConnectionCommand::prepareForNextPeer(time_t wait) { if(peerStorage_->isPeerAvailable() && btRuntime_->lessThanEqMinPeers()) { - SharedHandle peer = peerStorage_->getUnusedPeer(); - peer->usedBy(getDownloadEngine()->newCUID()); - PeerInitiateConnectionCommand* command = - new PeerInitiateConnectionCommand(peer->usedBy(), requestGroup_, peer, - getDownloadEngine(), btRuntime_); - command->setPeerStorage(peerStorage_); - command->setPieceStorage(pieceStorage_); - getDownloadEngine()->addCommand(command); + cuid_t ncuid = getDownloadEngine()->newCUID(); + SharedHandle peer = peerStorage_->checkoutPeer(ncuid); + // sanity check + if(peer) { + PeerInitiateConnectionCommand* command; + command = new PeerInitiateConnectionCommand(ncuid, requestGroup_, peer, + getDownloadEngine(), + btRuntime_); + command->setPeerStorage(peerStorage_); + command->setPieceStorage(pieceStorage_); + getDownloadEngine()->addCommand(command); + } } return true; } diff --git a/src/PeerInteractionCommand.cc b/src/PeerInteractionCommand.cc index 5fcc8faf..f07e42b5 100644 --- a/src/PeerInteractionCommand.cc +++ b/src/PeerInteractionCommand.cc @@ -391,14 +391,18 @@ bool PeerInteractionCommand::executeInternal() { // TODO this method removed when PeerBalancerCommand is implemented bool PeerInteractionCommand::prepareForNextPeer(time_t wait) { if(peerStorage_->isPeerAvailable() && btRuntime_->lessThanEqMinPeers()) { - SharedHandle peer = peerStorage_->getUnusedPeer(); - peer->usedBy(getDownloadEngine()->newCUID()); - PeerInitiateConnectionCommand* command = - new PeerInitiateConnectionCommand - (peer->usedBy(), requestGroup_, peer, getDownloadEngine(), btRuntime_); - command->setPeerStorage(peerStorage_); - command->setPieceStorage(pieceStorage_); - getDownloadEngine()->addCommand(command); + cuid_t ncuid = getDownloadEngine()->newCUID(); + SharedHandle peer = peerStorage_->checkoutPeer(ncuid); + // sanity check + if(peer) { + PeerInitiateConnectionCommand* command; + command = new PeerInitiateConnectionCommand(ncuid, requestGroup_, peer, + getDownloadEngine(), + btRuntime_); + command->setPeerStorage(peerStorage_); + command->setPieceStorage(pieceStorage_); + getDownloadEngine()->addCommand(command); + } } return true; } diff --git a/src/PeerReceiveHandshakeCommand.cc b/src/PeerReceiveHandshakeCommand.cc index 48d26b1f..b93e049d 100644 --- a/src/PeerReceiveHandshakeCommand.cc +++ b/src/PeerReceiveHandshakeCommand.cc @@ -138,8 +138,10 @@ bool PeerReceiveHandshakeCommand::executeInternal() if((!pieceStorage->downloadFinished() && stat.calculateDownloadSpeed() < thresholdSpeed) || btRuntime->lessThanMaxPeers()) { - if(peerStorage->addPeer(getPeer())) { - getPeer()->usedBy(getCuid()); + // TODO addPeer and checkoutPeer must be "atomic", in a sense + // that the added peer must be checked out. + if(peerStorage->addPeer(getPeer()) && + peerStorage->checkoutPeer(getCuid())) { PeerInteractionCommand* command = new PeerInteractionCommand (getCuid(), diff --git a/src/PeerStorage.h b/src/PeerStorage.h index ab4cb2e2..fbbe2746 100644 --- a/src/PeerStorage.h +++ b/src/PeerStorage.h @@ -42,6 +42,7 @@ #include "SharedHandle.h" #include "TransferStat.h" +#include "Command.h" namespace aria2 { @@ -63,26 +64,15 @@ public: virtual void addPeer(const std::vector >& peers) = 0; /** - * Returns internal peer list. + * Returns the number of peers, including used and unused ones. */ - virtual const std::deque >& getPeers() = 0; - - - /** - * Returns the number of peers. - */ - virtual size_t countPeer() const = 0; + virtual size_t countAllPeer() const = 0; /** * Returns internal dropped peer list. */ virtual const std::deque >& getDroppedPeers() = 0; - /** - * Returns one of the unused peers. - */ - virtual SharedHandle getUnusedPeer() = 0; - /** * Returns true if at least one unused peer exists. * Otherwise returns false. @@ -105,6 +95,13 @@ public: */ virtual void addBadPeer(const std::string& ipaddr) = 0; + /** + * Moves first peer in unused peer list to used peer set and calls + * Peer::usedBy(cuid). If there is no peer available, returns + * SharedHandle(). + */ + virtual SharedHandle checkoutPeer(cuid_t cuid) = 0; + /** * Tells PeerStorage object that peer is no longer used in the session. */ diff --git a/src/TrackerWatcherCommand.cc b/src/TrackerWatcherCommand.cc index fcd3b6f2..0eba057f 100644 --- a/src/TrackerWatcherCommand.cc +++ b/src/TrackerWatcherCommand.cc @@ -170,14 +170,18 @@ void TrackerWatcherCommand::processTrackerResponse (reinterpret_cast(trackerResponse.c_str()), trackerResponse.size()); while(!btRuntime_->isHalt() && btRuntime_->lessThanMinPeers()) { - SharedHandle peer = peerStorage_->getUnusedPeer(); + if(!peerStorage_->isPeerAvailable()) { + break; + } + cuid_t ncuid = e_->newCUID(); + SharedHandle peer = peerStorage_->checkoutPeer(ncuid); + // sanity check if(!peer) { break; } - peer->usedBy(e_->newCUID()); - PeerInitiateConnectionCommand* command = - new PeerInitiateConnectionCommand - (peer->usedBy(), requestGroup_, peer, e_, btRuntime_); + PeerInitiateConnectionCommand* command; + command = new PeerInitiateConnectionCommand(ncuid, requestGroup_, peer, e_, + btRuntime_); command->setPeerStorage(peerStorage_); command->setPieceStorage(pieceStorage_); e_->addCommand(command); diff --git a/test/DHTMessageFactoryImplTest.cc b/test/DHTMessageFactoryImplTest.cc index 7abec745..67590297 100644 --- a/test/DHTMessageFactoryImplTest.cc +++ b/test/DHTMessageFactoryImplTest.cc @@ -344,8 +344,11 @@ void DHTMessageFactoryImplTest::testCreateGetPeersReplyMessage() CPPUNIT_ASSERT(*nodes[0] == *m->getClosestKNodes()[0]); CPPUNIT_ASSERT(*nodes[7] == *m->getClosestKNodes()[7]); CPPUNIT_ASSERT_EQUAL((size_t)4, m->getValues().size()); - CPPUNIT_ASSERT(*peers[0] == *m->getValues()[0]); - CPPUNIT_ASSERT(*peers[3] == *m->getValues()[3]); + for(int i = 0; i < 4; ++i) { + CPPUNIT_ASSERT_EQUAL(peers[i]->getIPAddress(), + m->getValues()[i]->getIPAddress()); + CPPUNIT_ASSERT_EQUAL(peers[i]->getPort(), m->getValues()[i]->getPort()); + } CPPUNIT_ASSERT_EQUAL(util::toHex(transactionID, DHT_TRANSACTION_ID_LENGTH), util::toHex(m->getTransactionID())); } catch(Exception& e) { @@ -416,8 +419,11 @@ void DHTMessageFactoryImplTest::testCreateGetPeersReplyMessage6() CPPUNIT_ASSERT(*nodes[0] == *m->getClosestKNodes()[0]); CPPUNIT_ASSERT(*nodes[7] == *m->getClosestKNodes()[7]); CPPUNIT_ASSERT_EQUAL((size_t)4, m->getValues().size()); - CPPUNIT_ASSERT(*peers[0] == *m->getValues()[0]); - CPPUNIT_ASSERT(*peers[3] == *m->getValues()[3]); + for(int i = 0; i < 4; ++i) { + CPPUNIT_ASSERT_EQUAL(peers[i]->getIPAddress(), + m->getValues()[i]->getIPAddress()); + CPPUNIT_ASSERT_EQUAL(peers[i]->getPort(), m->getValues()[i]->getPort()); + } CPPUNIT_ASSERT_EQUAL(util::toHex(transactionID, DHT_TRANSACTION_ID_LENGTH), util::toHex(m->getTransactionID())); } catch(Exception& e) { diff --git a/test/DefaultBtAnnounceTest.cc b/test/DefaultBtAnnounceTest.cc index 6a5e49d3..edc4c52c 100644 --- a/test/DefaultBtAnnounceTest.cc +++ b/test/DefaultBtAnnounceTest.cc @@ -403,10 +403,10 @@ void DefaultBtAnnounceTest::testProcessAnnounceResponse() CPPUNIT_ASSERT_EQUAL((time_t)1800, an.getMinInterval()); CPPUNIT_ASSERT_EQUAL(100, an.getComplete()); CPPUNIT_ASSERT_EQUAL(200, an.getIncomplete()); - CPPUNIT_ASSERT_EQUAL((size_t)2, peerStorage_->getPeers().size()); - SharedHandle peer = peerStorage_->getPeers()[0]; + CPPUNIT_ASSERT_EQUAL((size_t)2, peerStorage_->getUnusedPeers().size()); + SharedHandle peer = peerStorage_->getUnusedPeers()[0]; CPPUNIT_ASSERT_EQUAL(std::string("192.168.0.1"), peer->getIPAddress()); - peer = peerStorage_->getPeers()[1]; + peer = peerStorage_->getUnusedPeers()[1]; CPPUNIT_ASSERT_EQUAL(std::string("1002:1035:4527:3546:7854:1237:3247:3217"), peer->getIPAddress()); } diff --git a/test/DefaultPeerStorageTest.cc b/test/DefaultPeerStorageTest.cc index b3a425da..7aad7e9c 100644 --- a/test/DefaultPeerStorageTest.cc +++ b/test/DefaultPeerStorageTest.cc @@ -16,13 +16,12 @@ namespace aria2 { class DefaultPeerStorageTest:public CppUnit::TestFixture { CPPUNIT_TEST_SUITE(DefaultPeerStorageTest); - CPPUNIT_TEST(testCountPeer); + CPPUNIT_TEST(testCountAllPeer); CPPUNIT_TEST(testDeleteUnusedPeer); CPPUNIT_TEST(testAddPeer); - CPPUNIT_TEST(testGetUnusedPeer); CPPUNIT_TEST(testIsPeerAvailable); - CPPUNIT_TEST(testActivatePeer); - CPPUNIT_TEST(testCalculateStat); + CPPUNIT_TEST(testGetActivePeers); + CPPUNIT_TEST(testCheckoutPeer); CPPUNIT_TEST(testReturnPeer); CPPUNIT_TEST(testOnErasingPeer); CPPUNIT_TEST(testAddBadPeer); @@ -40,13 +39,12 @@ public: delete option; } - void testCountPeer(); + void testCountAllPeer(); void testDeleteUnusedPeer(); void testAddPeer(); - void testGetUnusedPeer(); void testIsPeerAvailable(); - void testActivatePeer(); - void testCalculateStat(); + void testGetActivePeers(); + void testCheckoutPeer(); void testReturnPeer(); void testOnErasingPeer(); void testAddBadPeer(); @@ -55,154 +53,91 @@ public: CPPUNIT_TEST_SUITE_REGISTRATION(DefaultPeerStorageTest); -void DefaultPeerStorageTest::testCountPeer() { +void DefaultPeerStorageTest::testCountAllPeer() +{ DefaultPeerStorage ps; - CPPUNIT_ASSERT_EQUAL((size_t)0, ps.countPeer()); - - SharedHandle peer(new Peer("192.168.0.1", 6889)); - - ps.addPeer(peer); - CPPUNIT_ASSERT_EQUAL((size_t)1, ps.countPeer()); + CPPUNIT_ASSERT_EQUAL((size_t)0, ps.countAllPeer()); + for(int i = 0; i < 2; ++i) { + SharedHandle peer(new Peer("192.168.0.1", 6889+i)); + ps.addPeer(peer); + } + CPPUNIT_ASSERT_EQUAL((size_t)2, ps.countAllPeer()); + SharedHandle peer = ps.checkoutPeer(1); + CPPUNIT_ASSERT(peer); + CPPUNIT_ASSERT_EQUAL((size_t)2, ps.countAllPeer()); + ps.returnPeer(peer); + CPPUNIT_ASSERT_EQUAL((size_t)1, ps.countAllPeer()); } -void DefaultPeerStorageTest::testDeleteUnusedPeer() { +void DefaultPeerStorageTest::testDeleteUnusedPeer() +{ DefaultPeerStorage ps; SharedHandle peer1(new Peer("192.168.0.1", 6889)); SharedHandle peer2(new Peer("192.168.0.2", 6889)); SharedHandle peer3(new Peer("192.168.0.3", 6889)); - ps.addPeer(peer1); - ps.addPeer(peer2); - ps.addPeer(peer3); + CPPUNIT_ASSERT(ps.addPeer(peer1)); + CPPUNIT_ASSERT(ps.addPeer(peer2)); + CPPUNIT_ASSERT(ps.addPeer(peer3)); ps.deleteUnusedPeer(2); - CPPUNIT_ASSERT_EQUAL((size_t)1, ps.countPeer()); + CPPUNIT_ASSERT_EQUAL((size_t)1, ps.getUnusedPeers().size()); CPPUNIT_ASSERT_EQUAL(std::string("192.168.0.3"), - ps.getPeer("192.168.0.3", 6889)->getIPAddress()); - - ps.addPeer(peer1); - ps.addPeer(peer2); - - peer2->usedBy(1); - - ps.deleteUnusedPeer(3); - - // peer2 has been in use, so it did't deleted. - CPPUNIT_ASSERT_EQUAL((size_t)1, ps.countPeer()); - CPPUNIT_ASSERT_EQUAL(std::string("192.168.0.2"), - ps.getPeer("192.168.0.2", 6889)->getIPAddress()); + ps.getUnusedPeers()[0]->getIPAddress()); + ps.deleteUnusedPeer(100); + CPPUNIT_ASSERT(ps.getUnusedPeers().empty()); } -void DefaultPeerStorageTest::testAddPeer() { +void DefaultPeerStorageTest::testAddPeer() +{ DefaultPeerStorage ps; SharedHandle btRuntime(new BtRuntime()); - ps.setMaxPeerListSize(3); + ps.setMaxPeerListSize(2); ps.setBtRuntime(btRuntime); SharedHandle peer1(new Peer("192.168.0.1", 6889)); SharedHandle peer2(new Peer("192.168.0.2", 6889)); SharedHandle peer3(new Peer("192.168.0.3", 6889)); - ps.addPeer(peer1); - ps.addPeer(peer2); - ps.addPeer(peer3); + CPPUNIT_ASSERT(ps.addPeer(peer1)); + CPPUNIT_ASSERT(ps.addPeer(peer2)); + CPPUNIT_ASSERT(ps.addPeer(peer3)); - CPPUNIT_ASSERT_EQUAL((size_t)3, ps.countPeer()); + CPPUNIT_ASSERT_EQUAL((size_t)2, ps.getUnusedPeers().size()); + CPPUNIT_ASSERT_EQUAL(std::string("192.168.0.3"), + ps.getUnusedPeers()[0]->getIPAddress()); - // this returns false, because peer1 is already in the container - CPPUNIT_ASSERT_EQUAL(false, ps.addPeer(peer1)); - // the number of peers doesn't change. - CPPUNIT_ASSERT_EQUAL((size_t)3, ps.countPeer()); - - SharedHandle peer4(new Peer("192.168.0.4", 6889)); - - peer1->usedBy(1); - CPPUNIT_ASSERT(ps.addPeer(peer4)); - // peer2 was deleted. While peer1 is oldest, its cuid is not 0. - CPPUNIT_ASSERT_EQUAL((size_t)3, ps.countPeer()); - CPPUNIT_ASSERT(std::find_if(ps.getPeers().begin(), ps.getPeers().end(), - derefEqual(peer2)) == ps.getPeers().end()); - - SharedHandle peer5(new Peer("192.168.0.4", 0)); - - peer5->setPort(6889); - - // this returns false because the peer which has same ip and port - // has already added - CPPUNIT_ASSERT_EQUAL(false, ps.addPeer(peer5)); - - SharedHandle pa[] = { - SharedHandle(new Peer("192.168.0.4", 6889)), - SharedHandle(new Peer("192.168.0.5", 6889)), - SharedHandle(new Peer("192.168.0.6", 6889)), - SharedHandle(new Peer("192.168.0.7", 6889)), - SharedHandle(new Peer("192.168.0.8", 6889)) - }; - std::vector > peers(vbegin(pa), vend(pa)); - ps.addPeer(peers); - // peers[0] is not added because it has been already added. - // peers[1], peers[2] and peers[3] are going to be added. peers[4] - // is not added because DefaultPeerStorage::addPeer() limits the - // number of peers to add. Finally, unused peers are removed from - // back and size 3 vector is made. - CPPUNIT_ASSERT_EQUAL((size_t)3, ps.countPeer()); - CPPUNIT_ASSERT(std::find_if(ps.getPeers().begin(), ps.getPeers().end(), - derefEqual(peers[2])) != ps.getPeers().end()); - CPPUNIT_ASSERT(std::find_if(ps.getPeers().begin(), ps.getPeers().end(), - derefEqual(peers[3])) != ps.getPeers().end()); -} - -void DefaultPeerStorageTest::testGetUnusedPeer() { - DefaultPeerStorage ps; - ps.setBtRuntime(btRuntime); - - SharedHandle peer1(new Peer("192.168.0.1", 6889)); - - ps.addPeer(peer1); + CPPUNIT_ASSERT(!ps.addPeer(peer2)); + CPPUNIT_ASSERT(ps.addPeer(peer1)); + CPPUNIT_ASSERT_EQUAL((size_t)2, ps.getUnusedPeers().size()); CPPUNIT_ASSERT_EQUAL(std::string("192.168.0.1"), - ps.getUnusedPeer()->getIPAddress()); + ps.getUnusedPeers()[0]->getIPAddress()); - peer1->usedBy(1); - - CPPUNIT_ASSERT(!ps.getUnusedPeer()); - - peer1->resetStatus(); - peer1->startBadCondition(); - - CPPUNIT_ASSERT(!ps.getUnusedPeer()); + CPPUNIT_ASSERT_EQUAL(peer1->getIPAddress(), + ps.checkoutPeer(1)->getIPAddress()); + CPPUNIT_ASSERT(!ps.addPeer(peer1)); } void DefaultPeerStorageTest::testIsPeerAvailable() { DefaultPeerStorage ps; ps.setBtRuntime(btRuntime); - - CPPUNIT_ASSERT_EQUAL(false, ps.isPeerAvailable()); - SharedHandle peer1(new Peer("192.168.0.1", 6889)); + CPPUNIT_ASSERT(!ps.isPeerAvailable()); ps.addPeer(peer1); - - CPPUNIT_ASSERT_EQUAL(true, ps.isPeerAvailable()); - - peer1->usedBy(1); - - CPPUNIT_ASSERT_EQUAL(false, ps.isPeerAvailable()); - - peer1->resetStatus(); - - peer1->startBadCondition(); - - CPPUNIT_ASSERT_EQUAL(false, ps.isPeerAvailable()); + CPPUNIT_ASSERT(ps.isPeerAvailable()); + CPPUNIT_ASSERT(ps.checkoutPeer(1)); + CPPUNIT_ASSERT(!ps.isPeerAvailable()); } -void DefaultPeerStorageTest::testActivatePeer() { +void DefaultPeerStorageTest::testGetActivePeers() +{ DefaultPeerStorage ps; - { std::vector > peers; ps.getActivePeers(peers); @@ -210,15 +145,13 @@ void DefaultPeerStorageTest::testActivatePeer() { } SharedHandle peer1(new Peer("192.168.0.1", 6889)); - ps.addPeer(peer1); - { std::vector > activePeers; ps.getActivePeers(activePeers); - CPPUNIT_ASSERT_EQUAL((size_t)0, activePeers.size()); } + CPPUNIT_ASSERT(ps.checkoutPeer(1)); { peer1->allocateSessionResource(1024*1024, 1024*1024*10); @@ -229,7 +162,23 @@ void DefaultPeerStorageTest::testActivatePeer() { } } -void DefaultPeerStorageTest::testCalculateStat() { +void DefaultPeerStorageTest::testCheckoutPeer() +{ + DefaultPeerStorage ps; + SharedHandle peers[] = { + SharedHandle(new Peer("192.168.0.1", 1000)), + SharedHandle(new Peer("192.168.0.2", 1000)), + SharedHandle(new Peer("192.168.0.3", 1000)) + }; + int len = A2_ARRAY_LEN(peers); + for(int i = 0; i < len; ++i) { + ps.addPeer(peers[i]); + } + for(int i = 0; i < len; ++i) { + SharedHandle peer = ps.checkoutPeer(i+1); + CPPUNIT_ASSERT_EQUAL(peers[len-i-1]->getIPAddress(), peer->getIPAddress()); + } + CPPUNIT_ASSERT(!ps.checkoutPeer(len+1)); } void DefaultPeerStorageTest::testReturnPeer() @@ -245,17 +194,19 @@ void DefaultPeerStorageTest::testReturnPeer() ps.addPeer(peer1); ps.addPeer(peer2); ps.addPeer(peer3); + for(int i = 1; i <= 3; ++i) { + CPPUNIT_ASSERT(ps.checkoutPeer(i)); + } + CPPUNIT_ASSERT_EQUAL((size_t)3, ps.getUsedPeers().size()); ps.returnPeer(peer2); // peer2 removed from the container - CPPUNIT_ASSERT_EQUAL((size_t)2, ps.getPeers().size()); - CPPUNIT_ASSERT(std::find_if(ps.getPeers().begin(), ps.getPeers().end(), - derefEqual(peer2)) == ps.getPeers().end()); + CPPUNIT_ASSERT_EQUAL((size_t)2, ps.getUsedPeers().size()); + CPPUNIT_ASSERT(ps.getUsedPeers().count(peer2) == 0); CPPUNIT_ASSERT_EQUAL((size_t)1, ps.getDroppedPeers().size()); ps.returnPeer(peer1); // peer1 is removed from the container - CPPUNIT_ASSERT_EQUAL((size_t)1, ps.getPeers().size()); - CPPUNIT_ASSERT(std::find_if(ps.getPeers().begin(), ps.getPeers().end(), - derefEqual(peer1)) == ps.getPeers().end()); + CPPUNIT_ASSERT_EQUAL((size_t)1, ps.getUsedPeers().size()); + CPPUNIT_ASSERT(ps.getUsedPeers().count(peer1) == 0); CPPUNIT_ASSERT_EQUAL((size_t)1, ps.getDroppedPeers().size()); } diff --git a/test/MockPeerStorage.h b/test/MockPeerStorage.h index 9060346e..682f5ffe 100644 --- a/test/MockPeerStorage.h +++ b/test/MockPeerStorage.h @@ -11,7 +11,8 @@ namespace aria2 { class MockPeerStorage : public PeerStorage { private: - std::deque > peers; + std::deque > unusedPeers; + std::deque > usedPeers; std::deque > droppedPeers; std::vector > activePeers; int numChokeExecuted_; @@ -19,22 +20,24 @@ public: MockPeerStorage():numChokeExecuted_(0) {} virtual ~MockPeerStorage() {} - virtual bool addPeer(const SharedHandle& peer) { - peers.push_back(peer); + virtual bool addPeer(const SharedHandle& peer) + { + unusedPeers.push_back(peer); return true; } virtual void addPeer(const std::vector >& peers) { - std::copy(peers.begin(), peers.end(), back_inserter(this->peers)); + unusedPeers.insert(unusedPeers.end(), peers.begin(), peers.end()); } - virtual const std::deque >& getPeers() { - return peers; - } - - virtual size_t countPeer() const + const std::deque >& getUnusedPeers() { - return peers.size(); + return unusedPeers; + } + + virtual size_t countAllPeer() const + { + return unusedPeers.size() + usedPeers.size(); } virtual const std::deque >& getDroppedPeers() { @@ -45,10 +48,6 @@ public: droppedPeers.push_back(peer); } - virtual SharedHandle getUnusedPeer() { - return SharedHandle(); - } - virtual bool isPeerAvailable() { return false; } @@ -71,6 +70,11 @@ public: { } + virtual SharedHandle checkoutPeer(cuid_t cuid) + { + return SharedHandle(); + } + virtual void returnPeer(const SharedHandle& peer) { } diff --git a/test/PeerTest.cc b/test/PeerTest.cc index 484f0427..c88078e2 100644 --- a/test/PeerTest.cc +++ b/test/PeerTest.cc @@ -8,8 +8,6 @@ class PeerTest:public CppUnit::TestFixture { CPPUNIT_TEST_SUITE(PeerTest); CPPUNIT_TEST(testPeerAllowedIndexSet); CPPUNIT_TEST(testAmAllowedIndexSet); - CPPUNIT_TEST(testGetId); - CPPUNIT_TEST(testOperatorEqual); CPPUNIT_TEST(testCountSeeder); CPPUNIT_TEST_SUITE_END(); private: @@ -22,8 +20,6 @@ public: void testPeerAllowedIndexSet(); void testAmAllowedIndexSet(); - void testGetId(); - void testOperatorEqual(); void testCountSeeder(); }; @@ -42,22 +38,6 @@ void PeerTest::testAmAllowedIndexSet() { CPPUNIT_ASSERT(peer->isInAmAllowedIndexSet(0)); } -void PeerTest::testGetId() { - CPPUNIT_ASSERT_EQUAL(std::string("localhost(6969)"), peer->getID()); -} - -void PeerTest::testOperatorEqual() -{ - CPPUNIT_ASSERT(Peer("localhost", 6881) == Peer("localhost", 6881)); - - { - Peer p1("localhost", 6881); - Peer p2("localhsot", 0); - p2.setPort(6881); - CPPUNIT_ASSERT(p1 != p2); - } -} - void PeerTest::testCountSeeder() { std::vector > peers(5); diff --git a/test/UTPexExtensionMessageTest.cc b/test/UTPexExtensionMessageTest.cc index 422caffb..04412fe7 100644 --- a/test/UTPexExtensionMessageTest.cc +++ b/test/UTPexExtensionMessageTest.cc @@ -154,24 +154,24 @@ void UTPexExtensionMessageTest::testDoReceivedAction() msg.doReceivedAction(); - CPPUNIT_ASSERT_EQUAL((size_t)4, peerStorage_->getPeers().size()); + CPPUNIT_ASSERT_EQUAL((size_t)4, peerStorage_->getUnusedPeers().size()); { - SharedHandle p = peerStorage_->getPeers()[0]; + SharedHandle p = peerStorage_->getUnusedPeers()[0]; CPPUNIT_ASSERT_EQUAL(std::string("192.168.0.1"), p->getIPAddress()); CPPUNIT_ASSERT_EQUAL((uint16_t)6881, p->getPort()); } { - SharedHandle p = peerStorage_->getPeers()[1]; + SharedHandle p = peerStorage_->getUnusedPeers()[1]; CPPUNIT_ASSERT_EQUAL(std::string("1002:1035:4527:3546:7854:1237:3247:3217"), p->getIPAddress()); CPPUNIT_ASSERT_EQUAL((uint16_t)9999, p->getPort()); } { - SharedHandle p = peerStorage_->getPeers()[2]; + SharedHandle p = peerStorage_->getUnusedPeers()[2]; CPPUNIT_ASSERT_EQUAL(std::string("192.168.0.2"), p->getIPAddress()); } { - SharedHandle p = peerStorage_->getPeers()[3]; + SharedHandle p = peerStorage_->getUnusedPeers()[3]; CPPUNIT_ASSERT_EQUAL(std::string("2001:db8:bd05:1d2:288a:1fc0:1:10ee"), p->getIPAddress()); }