Refactor peer list management in DefaultPeerStorage

Peer list is now divided into 2: unusedPeers_ and usedPeers_.
Duplicate check is done using std::set by comparing pair of IP address
and port. For this, only IP address and port given to the Peer
constructor are used. In other words, TCP port received from extended
message is not used for this purpose.
pull/43/head
Tatsuhiro Tsujikawa 2013-01-26 17:50:06 +09:00
parent 8524ac3806
commit ba69f5c0c3
19 changed files with 292 additions and 375 deletions

View File

@ -115,13 +115,8 @@ bool ActivePeerConnectionCommand::execute() {
numConnection = numNewConnection_;
}
for(; numConnection > 0; --numConnection) {
SharedHandle<Peer> peer = peerStorage_->getUnusedPeer();
if(!peer) {
break;
}
connectToPeer(peer);
}
makeNewConnections(numConnection);
if(btRuntime_->getConnections() == 0 &&
!pieceStorage_->downloadFinished()) {
btAnnounce_->overrideMinInterval(BtAnnounce::DEFAULT_ANNOUNCE_INTERVAL);
@ -132,19 +127,25 @@ bool ActivePeerConnectionCommand::execute() {
return false;
}
void ActivePeerConnectionCommand::connectToPeer(const SharedHandle<Peer>& peer)
void ActivePeerConnectionCommand::makeNewConnections(int num)
{
peer->usedBy(e_->newCUID());
PeerInitiateConnectionCommand* command =
new PeerInitiateConnectionCommand(peer->usedBy(), requestGroup_, peer, e_,
for(; num && peerStorage_->isPeerAvailable(); --num) {
cuid_t ncuid = e_->newCUID();
SharedHandle<Peer> peer = peerStorage_->checkoutPeer(ncuid);
// sanity check
if(!peer) {
break;
}
PeerInitiateConnectionCommand* command;
command = new PeerInitiateConnectionCommand(ncuid, requestGroup_, peer, e_,
btRuntime_);
command->setPeerStorage(peerStorage_);
command->setPieceStorage(pieceStorage_);
e_->addCommand(command);
A2_LOG_INFO(fmt(MSG_CONNECTING_TO_PEER,
getCuid(),
A2_LOG_INFO(fmt(MSG_CONNECTING_TO_PEER, getCuid(),
peer->getIPAddress().c_str()));
}
}
void ActivePeerConnectionCommand::setBtRuntime
(const SharedHandle<BtRuntime>& btRuntime)

View File

@ -71,7 +71,7 @@ public:
virtual bool execute();
void connectToPeer(const SharedHandle<Peer>& peer);
void makeNewConnections(int num);
void setNumNewConnection(int numNewConnection)
{

View File

@ -112,11 +112,11 @@ bool DHTGetPeersCommand::execute()
lastGetPeerTime_ = global::wallclock();
if(numRetry_ < MAX_RETRIES &&
(btRuntime_->getMaxPeers() == 0 ||
btRuntime_->getMaxPeers() > peerStorage_->countPeer())) {
btRuntime_->getMaxPeers() > peerStorage_->countAllPeer())) {
++numRetry_;
A2_LOG_DEBUG(fmt("Too few peers. peers=%lu, max_peers=%d."
" Try again(%d)",
static_cast<unsigned long>(peerStorage_->countPeer()),
static_cast<unsigned long>(peerStorage_->countAllPeer()),
btRuntime_->getMaxPeers(),
numRetry_));
} else {

View File

@ -53,7 +53,7 @@ namespace aria2 {
namespace {
const size_t MAX_PEER_LIST_SIZE = 1024;
const size_t MAX_PEER_LIST_SIZE = 128;
const size_t MAX_PEER_LIST_UPDATE = 100;
} // namespace
@ -69,30 +69,27 @@ DefaultPeerStorage::~DefaultPeerStorage()
{
delete seederStateChoke_;
delete leecherStateChoke_;
assert(uniqPeers_.size() == unusedPeers_.size() + usedPeers_.size());
}
namespace {
class FindIdenticalPeer {
private:
SharedHandle<Peer> peer_;
public:
FindIdenticalPeer(const SharedHandle<Peer>& peer):peer_(peer) {}
bool operator()(const SharedHandle<Peer>& peer) const {
return (*peer_ == *peer) ||
((peer_->getIPAddress() == peer->getIPAddress()) &&
(peer_->getPort() == peer->getPort()));
size_t DefaultPeerStorage::countAllPeer() const
{
return unusedPeers_.size() + usedPeers_.size();
}
};
} // namespace
bool DefaultPeerStorage::isPeerAlreadyAdded(const SharedHandle<Peer>& peer)
{
return std::find_if(peers_.begin(), peers_.end(),
FindIdenticalPeer(peer)) != peers_.end();
return uniqPeers_.count(std::make_pair(peer->getIPAddress(),
peer->getOrigPort()));
}
bool DefaultPeerStorage::addPeer(const SharedHandle<Peer>& peer) {
void DefaultPeerStorage::addUniqPeer(const SharedHandle<Peer>& peer)
{
uniqPeers_.insert(std::make_pair(peer->getIPAddress(), peer->getOrigPort()));
}
bool DefaultPeerStorage::addPeer(const SharedHandle<Peer>& peer)
{
if(isPeerAlreadyAdded(peer)) {
A2_LOG_DEBUG(fmt("Adding %s:%u is rejected because it has been already"
" added.",
@ -104,13 +101,14 @@ bool DefaultPeerStorage::addPeer(const SharedHandle<Peer>& peer) {
peer->getIPAddress().c_str(), peer->getPort()));
return false;
}
const size_t peerListSize = peers_.size();
const size_t peerListSize = unusedPeers_.size();
if(peerListSize >= maxPeerListSize_) {
deleteUnusedPeer(peerListSize-maxPeerListSize_+1);
}
peers_.push_front(peer);
A2_LOG_DEBUG(fmt("Now peer list contains %lu peers",
static_cast<unsigned long>(peers_.size())));
unusedPeers_.push_front(peer);
addUniqPeer(peer);
A2_LOG_DEBUG(fmt("Now unused peer list contains %lu peers",
static_cast<unsigned long>(unusedPeers_.size())));
return true;
}
@ -134,28 +132,35 @@ void DefaultPeerStorage::addPeer(const std::vector<SharedHandle<Peer> >& peers)
A2_LOG_DEBUG(fmt(MSG_ADDING_PEER,
peer->getIPAddress().c_str(), peer->getPort()));
}
peers_.push_front(peer);
unusedPeers_.push_front(peer);
addUniqPeer(peer);
++added;
}
const size_t peerListSize = peers_.size();
const size_t peerListSize = unusedPeers_.size();
if(peerListSize > maxPeerListSize_) {
deleteUnusedPeer(peerListSize-maxPeerListSize_);
}
A2_LOG_DEBUG(fmt("Now peer list contains %lu peers",
static_cast<unsigned long>(peers_.size())));
A2_LOG_DEBUG(fmt("Now unused peer list contains %lu peers",
static_cast<unsigned long>(unusedPeers_.size())));
}
void DefaultPeerStorage::addDroppedPeer(const SharedHandle<Peer>& peer)
{
// TODO Make unique
droppedPeers_.push_front(peer);
if(droppedPeers_.size() > 50) {
droppedPeers_.pop_back();
}
}
const std::deque<SharedHandle<Peer> >& DefaultPeerStorage::getPeers()
const std::deque<SharedHandle<Peer> >& DefaultPeerStorage::getUnusedPeers()
{
return peers_;
return unusedPeers_;
}
const PeerSet& DefaultPeerStorage::getUsedPeers()
{
return usedPeers_;
}
const std::deque<SharedHandle<Peer> >& DefaultPeerStorage::getDroppedPeers()
@ -163,57 +168,8 @@ const std::deque<SharedHandle<Peer> >& DefaultPeerStorage::getDroppedPeers()
return droppedPeers_;
}
namespace {
class FindFinePeer {
public:
bool operator()(const SharedHandle<Peer>& peer) const {
return peer->unused() && peer->isGood();
}
};
} // namespace
SharedHandle<Peer> DefaultPeerStorage::getUnusedPeer() {
std::deque<SharedHandle<Peer> >::const_iterator itr =
std::find_if(peers_.begin(), peers_.end(), FindFinePeer());
if(itr == peers_.end()) {
return SharedHandle<Peer>();
} else {
return *itr;
}
}
namespace {
class FindPeer {
private:
std::string ipaddr;
uint16_t port;
public:
FindPeer(const std::string& ipaddr, uint16_t port):
ipaddr(ipaddr), port(port) {}
bool operator()(const SharedHandle<Peer>& peer) const {
return ipaddr == peer->getIPAddress() && port == peer->getPort();
}
};
} // namespace
SharedHandle<Peer> DefaultPeerStorage::getPeer(const std::string& ipaddr,
uint16_t port) const {
std::deque<SharedHandle<Peer> >::const_iterator itr =
std::find_if(peers_.begin(), peers_.end(), FindPeer(ipaddr, port));
if(itr == peers_.end()) {
return SharedHandle<Peer>();
} else {
return *itr;
}
}
size_t DefaultPeerStorage::countPeer() const {
return peers_.size();
}
bool DefaultPeerStorage::isPeerAvailable() {
return getUnusedPeer();
return !unusedPeers_.empty();
}
namespace {
@ -236,7 +192,8 @@ public:
void DefaultPeerStorage::getActivePeers
(std::vector<SharedHandle<Peer> >& activePeers)
{
std::for_each(peers_.begin(), peers_.end(), CollectActivePeer(activePeers));
std::for_each(usedPeers_.begin(), usedPeers_.end(),
CollectActivePeer(activePeers));
}
bool DefaultPeerStorage::isBadPeer(const std::string& ipaddr)
@ -274,21 +231,32 @@ void DefaultPeerStorage::addBadPeer(const std::string& ipaddr)
}
void DefaultPeerStorage::deleteUnusedPeer(size_t delSize) {
std::deque<SharedHandle<Peer> > temp;
for(std::deque<SharedHandle<Peer> >::const_reverse_iterator itr =
peers_.rbegin(), eoi = peers_.rend(); itr != eoi; ++itr) {
const SharedHandle<Peer>& p = *itr;
if(p->unused() && delSize > 0) {
onErasingPeer(p);
--delSize;
} else {
temp.push_front(p);
for(; delSize > 0 && !unusedPeers_.empty(); --delSize) {
onErasingPeer(unusedPeers_.back());
unusedPeers_.pop_back();
}
}
peers_.swap(temp);
}
void DefaultPeerStorage::onErasingPeer(const SharedHandle<Peer>& peer) {}
SharedHandle<Peer> DefaultPeerStorage::checkoutPeer(cuid_t cuid)
{
if(!isPeerAvailable()) {
return SharedHandle<Peer>();
}
SharedHandle<Peer> peer = unusedPeers_.front();
unusedPeers_.pop_front();
peer->usedBy(cuid);
usedPeers_.insert(peer);
A2_LOG_DEBUG(fmt("Checkout peer %s:%u to CUID#%"PRId64,
peer->getIPAddress().c_str(), peer->getPort(),
peer->usedBy()));
return peer;
}
void DefaultPeerStorage::onErasingPeer(const SharedHandle<Peer>& peer)
{
uniqPeers_.erase(std::make_pair(peer->getIPAddress(),
peer->getOrigPort()));
}
void DefaultPeerStorage::onReturningPeer(const SharedHandle<Peer>& peer)
{
@ -303,20 +271,20 @@ void DefaultPeerStorage::onReturningPeer(const SharedHandle<Peer>& peer)
executeChoke();
}
}
peer->usedBy(0);
}
void DefaultPeerStorage::returnPeer(const SharedHandle<Peer>& peer)
{
std::deque<SharedHandle<Peer> >::iterator itr =
std::find_if(peers_.begin(), peers_.end(), derefEqual(peer));
if(itr == peers_.end()) {
A2_LOG_DEBUG(fmt("Cannot find peer %s:%u in PeerStorage.",
peer->getIPAddress().c_str(), peer->getPort()));
} else {
peers_.erase(itr);
A2_LOG_DEBUG(fmt("Peer %s:%u returned from CUID#%"PRId64,
peer->getIPAddress().c_str(), peer->getPort(),
peer->usedBy()));
if(usedPeers_.erase(peer)) {
onReturningPeer(peer);
onErasingPeer(peer);
} else {
A2_LOG_DEBUG(fmt("Cannot find peer %s:%u in usedPeers_",
peer->getIPAddress().c_str(), peer->getPort()));
}
}

View File

@ -39,8 +39,10 @@
#include <string>
#include <map>
#include <set>
#include "TimerA2.h"
#include "a2functional.h"
namespace aria2 {
@ -49,12 +51,23 @@ class BtSeederStateChoke;
class BtLeecherStateChoke;
class PieceStorage;
typedef std::set<SharedHandle<Peer>, RefLess<Peer> > PeerSet;
class DefaultPeerStorage : public PeerStorage {
private:
SharedHandle<BtRuntime> btRuntime_;
SharedHandle<PieceStorage> pieceStorage_;
size_t maxPeerListSize_;
std::deque<SharedHandle<Peer> > peers_;
// This contains ip address and port pair and is used to ensure that
// no duplicate peers are stored.
std::set<std::pair<std::string, uint16_t> > uniqPeers_;
// Unused (not connected) peers, sorted by last added.
std::deque<SharedHandle<Peer> > unusedPeers_;
// The set of used peers. Some of them are not connected yet. To
// know it is connected or not, call Peer::isActive().
PeerSet usedPeers_;
std::deque<SharedHandle<Peer> > droppedPeers_;
BtSeederStateChoke* seederStateChoke_;
@ -66,6 +79,7 @@ private:
Timer lastBadPeerCleaned_;
bool isPeerAlreadyAdded(const SharedHandle<Peer>& peer);
void addUniqPeer(const SharedHandle<Peer>& peer);
void addDroppedPeer(const SharedHandle<Peer>& peer);
public:
@ -73,17 +87,18 @@ public:
virtual ~DefaultPeerStorage();
// TODO We need addAndCheckoutPeer for incoming peers
virtual bool addPeer(const SharedHandle<Peer>& peer);
virtual size_t countPeer() const;
virtual SharedHandle<Peer> getUnusedPeer();
virtual size_t countAllPeer() const;
SharedHandle<Peer> getPeer(const std::string& ipaddr, uint16_t port) const;
virtual void addPeer(const std::vector<SharedHandle<Peer> >& peers);
virtual const std::deque<SharedHandle<Peer> >& getPeers();
const std::deque<SharedHandle<Peer> >& getUnusedPeers();
const PeerSet& getUsedPeers();
virtual const std::deque<SharedHandle<Peer> >& getDroppedPeers();
@ -95,6 +110,8 @@ public:
virtual void addBadPeer(const std::string& ipaddr);
virtual SharedHandle<Peer> checkoutPeer(cuid_t cuid);
virtual void returnPeer(const SharedHandle<Peer>& peer);
virtual bool chokeRoundIntervalElapsed();

View File

@ -198,16 +198,20 @@ bool InitiatorMSEHandshakeCommand::executeInternal() {
void InitiatorMSEHandshakeCommand::tryNewPeer()
{
if(peerStorage_->isPeerAvailable() && btRuntime_->lessThanEqMinPeers()) {
SharedHandle<Peer> peer = peerStorage_->getUnusedPeer();
peer->usedBy(getDownloadEngine()->newCUID());
PeerInitiateConnectionCommand* command =
new PeerInitiateConnectionCommand(peer->usedBy(), requestGroup_, peer,
getDownloadEngine(), btRuntime_);
cuid_t ncuid = getDownloadEngine()->newCUID();
SharedHandle<Peer> peer = peerStorage_->checkoutPeer(ncuid);
// sanity check
if(peer) {
PeerInitiateConnectionCommand* command;
command = new PeerInitiateConnectionCommand(ncuid, requestGroup_, peer,
getDownloadEngine(),
btRuntime_);
command->setPeerStorage(peerStorage_);
command->setPieceStorage(pieceStorage_);
getDownloadEngine()->addCommand(command);
}
}
}
bool InitiatorMSEHandshakeCommand::prepareForNextPeer(time_t wait)
{

View File

@ -45,12 +45,11 @@
namespace aria2 {
#define BAD_CONDITION_INTERVAL 10
Peer::Peer(std::string ipaddr, uint16_t port, bool incoming):
ipaddr_(ipaddr),
port_(port),
id_(fmt("%s(%u)", ipaddr_.c_str(), port_)),
origPort_(port),
cuid_(0),
firstContactTime_(global::wallclock()),
badConditionStartTime_(0),
seeder_(false),
@ -60,7 +59,6 @@ Peer::Peer(std::string ipaddr, uint16_t port, bool incoming):
disconnectedGracefully_(false)
{
memset(peerId_, 0, PEER_ID_LENGTH);
resetStatus();
}
Peer::~Peer()
@ -98,10 +96,6 @@ void Peer::setPeerId(const unsigned char* peerId)
memcpy(peerId_, peerId, PEER_ID_LENGTH);
}
void Peer::resetStatus() {
cuid_ = 0;
}
bool Peer::amChoking() const
{
assert(res_);
@ -328,12 +322,6 @@ void Peer::startBadCondition()
badConditionStartTime_ = global::wallclock();
}
bool Peer::isGood() const
{
return badConditionStartTime_.
difference(global::wallclock()) >= BAD_CONDITION_INTERVAL;
}
uint8_t Peer::getExtensionMessageID(int key) const
{
assert(res_);

View File

@ -61,8 +61,9 @@ private:
// true, then this port is not a port the peer is listening to and
// we cannot connect to it.
uint16_t port_;
std::string id_;
// This is the port number passed in the constructor arguments. This
// is used to distinguish peer identity.
uint16_t origPort_;
cuid_t cuid_;
@ -94,18 +95,6 @@ public:
~Peer();
bool operator==(const Peer& p)
{
return id_ == p.id_;
}
bool operator!=(const Peer& p)
{
return !(*this == p);
}
void resetStatus();
const std::string& getIPAddress() const
{
return ipaddr_;
@ -121,6 +110,11 @@ public:
port_ = port;
}
uint16_t getOrigPort() const
{
return origPort_;
}
void usedBy(cuid_t cuid);
cuid_t usedBy() const
@ -151,15 +145,8 @@ public:
return seeder_;
}
const std::string& getID() const
{
return id_;
}
void startBadCondition();
bool isGood() const;
void allocateSessionResource(int32_t pieceLength, int64_t totalLength);
void reconfigureSessionResource(int32_t pieceLength, int64_t totalLength);

View File

@ -105,15 +105,19 @@ bool PeerInitiateConnectionCommand::executeInternal() {
// TODO this method removed when PeerBalancerCommand is implemented
bool PeerInitiateConnectionCommand::prepareForNextPeer(time_t wait) {
if(peerStorage_->isPeerAvailable() && btRuntime_->lessThanEqMinPeers()) {
SharedHandle<Peer> peer = peerStorage_->getUnusedPeer();
peer->usedBy(getDownloadEngine()->newCUID());
PeerInitiateConnectionCommand* command =
new PeerInitiateConnectionCommand(peer->usedBy(), requestGroup_, peer,
getDownloadEngine(), btRuntime_);
cuid_t ncuid = getDownloadEngine()->newCUID();
SharedHandle<Peer> peer = peerStorage_->checkoutPeer(ncuid);
// sanity check
if(peer) {
PeerInitiateConnectionCommand* command;
command = new PeerInitiateConnectionCommand(ncuid, requestGroup_, peer,
getDownloadEngine(),
btRuntime_);
command->setPeerStorage(peerStorage_);
command->setPieceStorage(pieceStorage_);
getDownloadEngine()->addCommand(command);
}
}
return true;
}

View File

@ -391,15 +391,19 @@ bool PeerInteractionCommand::executeInternal() {
// TODO this method removed when PeerBalancerCommand is implemented
bool PeerInteractionCommand::prepareForNextPeer(time_t wait) {
if(peerStorage_->isPeerAvailable() && btRuntime_->lessThanEqMinPeers()) {
SharedHandle<Peer> peer = peerStorage_->getUnusedPeer();
peer->usedBy(getDownloadEngine()->newCUID());
PeerInitiateConnectionCommand* command =
new PeerInitiateConnectionCommand
(peer->usedBy(), requestGroup_, peer, getDownloadEngine(), btRuntime_);
cuid_t ncuid = getDownloadEngine()->newCUID();
SharedHandle<Peer> peer = peerStorage_->checkoutPeer(ncuid);
// sanity check
if(peer) {
PeerInitiateConnectionCommand* command;
command = new PeerInitiateConnectionCommand(ncuid, requestGroup_, peer,
getDownloadEngine(),
btRuntime_);
command->setPeerStorage(peerStorage_);
command->setPieceStorage(pieceStorage_);
getDownloadEngine()->addCommand(command);
}
}
return true;
}

View File

@ -138,8 +138,10 @@ bool PeerReceiveHandshakeCommand::executeInternal()
if((!pieceStorage->downloadFinished() &&
stat.calculateDownloadSpeed() < thresholdSpeed) ||
btRuntime->lessThanMaxPeers()) {
if(peerStorage->addPeer(getPeer())) {
getPeer()->usedBy(getCuid());
// TODO addPeer and checkoutPeer must be "atomic", in a sense
// that the added peer must be checked out.
if(peerStorage->addPeer(getPeer()) &&
peerStorage->checkoutPeer(getCuid())) {
PeerInteractionCommand* command =
new PeerInteractionCommand
(getCuid(),

View File

@ -42,6 +42,7 @@
#include "SharedHandle.h"
#include "TransferStat.h"
#include "Command.h"
namespace aria2 {
@ -63,26 +64,15 @@ public:
virtual void addPeer(const std::vector<SharedHandle<Peer> >& peers) = 0;
/**
* Returns internal peer list.
* Returns the number of peers, including used and unused ones.
*/
virtual const std::deque<SharedHandle<Peer> >& getPeers() = 0;
/**
* Returns the number of peers.
*/
virtual size_t countPeer() const = 0;
virtual size_t countAllPeer() const = 0;
/**
* Returns internal dropped peer list.
*/
virtual const std::deque<SharedHandle<Peer> >& getDroppedPeers() = 0;
/**
* Returns one of the unused peers.
*/
virtual SharedHandle<Peer> getUnusedPeer() = 0;
/**
* Returns true if at least one unused peer exists.
* Otherwise returns false.
@ -105,6 +95,13 @@ public:
*/
virtual void addBadPeer(const std::string& ipaddr) = 0;
/**
* Moves first peer in unused peer list to used peer set and calls
* Peer::usedBy(cuid). If there is no peer available, returns
* SharedHandle<Peer>().
*/
virtual SharedHandle<Peer> checkoutPeer(cuid_t cuid) = 0;
/**
* Tells PeerStorage object that peer is no longer used in the session.
*/

View File

@ -170,14 +170,18 @@ void TrackerWatcherCommand::processTrackerResponse
(reinterpret_cast<const unsigned char*>(trackerResponse.c_str()),
trackerResponse.size());
while(!btRuntime_->isHalt() && btRuntime_->lessThanMinPeers()) {
SharedHandle<Peer> peer = peerStorage_->getUnusedPeer();
if(!peerStorage_->isPeerAvailable()) {
break;
}
cuid_t ncuid = e_->newCUID();
SharedHandle<Peer> peer = peerStorage_->checkoutPeer(ncuid);
// sanity check
if(!peer) {
break;
}
peer->usedBy(e_->newCUID());
PeerInitiateConnectionCommand* command =
new PeerInitiateConnectionCommand
(peer->usedBy(), requestGroup_, peer, e_, btRuntime_);
PeerInitiateConnectionCommand* command;
command = new PeerInitiateConnectionCommand(ncuid, requestGroup_, peer, e_,
btRuntime_);
command->setPeerStorage(peerStorage_);
command->setPieceStorage(pieceStorage_);
e_->addCommand(command);

View File

@ -344,8 +344,11 @@ void DHTMessageFactoryImplTest::testCreateGetPeersReplyMessage()
CPPUNIT_ASSERT(*nodes[0] == *m->getClosestKNodes()[0]);
CPPUNIT_ASSERT(*nodes[7] == *m->getClosestKNodes()[7]);
CPPUNIT_ASSERT_EQUAL((size_t)4, m->getValues().size());
CPPUNIT_ASSERT(*peers[0] == *m->getValues()[0]);
CPPUNIT_ASSERT(*peers[3] == *m->getValues()[3]);
for(int i = 0; i < 4; ++i) {
CPPUNIT_ASSERT_EQUAL(peers[i]->getIPAddress(),
m->getValues()[i]->getIPAddress());
CPPUNIT_ASSERT_EQUAL(peers[i]->getPort(), m->getValues()[i]->getPort());
}
CPPUNIT_ASSERT_EQUAL(util::toHex(transactionID, DHT_TRANSACTION_ID_LENGTH),
util::toHex(m->getTransactionID()));
} catch(Exception& e) {
@ -416,8 +419,11 @@ void DHTMessageFactoryImplTest::testCreateGetPeersReplyMessage6()
CPPUNIT_ASSERT(*nodes[0] == *m->getClosestKNodes()[0]);
CPPUNIT_ASSERT(*nodes[7] == *m->getClosestKNodes()[7]);
CPPUNIT_ASSERT_EQUAL((size_t)4, m->getValues().size());
CPPUNIT_ASSERT(*peers[0] == *m->getValues()[0]);
CPPUNIT_ASSERT(*peers[3] == *m->getValues()[3]);
for(int i = 0; i < 4; ++i) {
CPPUNIT_ASSERT_EQUAL(peers[i]->getIPAddress(),
m->getValues()[i]->getIPAddress());
CPPUNIT_ASSERT_EQUAL(peers[i]->getPort(), m->getValues()[i]->getPort());
}
CPPUNIT_ASSERT_EQUAL(util::toHex(transactionID, DHT_TRANSACTION_ID_LENGTH),
util::toHex(m->getTransactionID()));
} catch(Exception& e) {

View File

@ -403,10 +403,10 @@ void DefaultBtAnnounceTest::testProcessAnnounceResponse()
CPPUNIT_ASSERT_EQUAL((time_t)1800, an.getMinInterval());
CPPUNIT_ASSERT_EQUAL(100, an.getComplete());
CPPUNIT_ASSERT_EQUAL(200, an.getIncomplete());
CPPUNIT_ASSERT_EQUAL((size_t)2, peerStorage_->getPeers().size());
SharedHandle<Peer> peer = peerStorage_->getPeers()[0];
CPPUNIT_ASSERT_EQUAL((size_t)2, peerStorage_->getUnusedPeers().size());
SharedHandle<Peer> peer = peerStorage_->getUnusedPeers()[0];
CPPUNIT_ASSERT_EQUAL(std::string("192.168.0.1"), peer->getIPAddress());
peer = peerStorage_->getPeers()[1];
peer = peerStorage_->getUnusedPeers()[1];
CPPUNIT_ASSERT_EQUAL(std::string("1002:1035:4527:3546:7854:1237:3247:3217"),
peer->getIPAddress());
}

View File

@ -16,13 +16,12 @@ namespace aria2 {
class DefaultPeerStorageTest:public CppUnit::TestFixture {
CPPUNIT_TEST_SUITE(DefaultPeerStorageTest);
CPPUNIT_TEST(testCountPeer);
CPPUNIT_TEST(testCountAllPeer);
CPPUNIT_TEST(testDeleteUnusedPeer);
CPPUNIT_TEST(testAddPeer);
CPPUNIT_TEST(testGetUnusedPeer);
CPPUNIT_TEST(testIsPeerAvailable);
CPPUNIT_TEST(testActivatePeer);
CPPUNIT_TEST(testCalculateStat);
CPPUNIT_TEST(testGetActivePeers);
CPPUNIT_TEST(testCheckoutPeer);
CPPUNIT_TEST(testReturnPeer);
CPPUNIT_TEST(testOnErasingPeer);
CPPUNIT_TEST(testAddBadPeer);
@ -40,13 +39,12 @@ public:
delete option;
}
void testCountPeer();
void testCountAllPeer();
void testDeleteUnusedPeer();
void testAddPeer();
void testGetUnusedPeer();
void testIsPeerAvailable();
void testActivatePeer();
void testCalculateStat();
void testGetActivePeers();
void testCheckoutPeer();
void testReturnPeer();
void testOnErasingPeer();
void testAddBadPeer();
@ -55,154 +53,91 @@ public:
CPPUNIT_TEST_SUITE_REGISTRATION(DefaultPeerStorageTest);
void DefaultPeerStorageTest::testCountPeer() {
void DefaultPeerStorageTest::testCountAllPeer()
{
DefaultPeerStorage ps;
CPPUNIT_ASSERT_EQUAL((size_t)0, ps.countPeer());
SharedHandle<Peer> peer(new Peer("192.168.0.1", 6889));
CPPUNIT_ASSERT_EQUAL((size_t)0, ps.countAllPeer());
for(int i = 0; i < 2; ++i) {
SharedHandle<Peer> peer(new Peer("192.168.0.1", 6889+i));
ps.addPeer(peer);
CPPUNIT_ASSERT_EQUAL((size_t)1, ps.countPeer());
}
CPPUNIT_ASSERT_EQUAL((size_t)2, ps.countAllPeer());
SharedHandle<Peer> peer = ps.checkoutPeer(1);
CPPUNIT_ASSERT(peer);
CPPUNIT_ASSERT_EQUAL((size_t)2, ps.countAllPeer());
ps.returnPeer(peer);
CPPUNIT_ASSERT_EQUAL((size_t)1, ps.countAllPeer());
}
void DefaultPeerStorageTest::testDeleteUnusedPeer() {
void DefaultPeerStorageTest::testDeleteUnusedPeer()
{
DefaultPeerStorage ps;
SharedHandle<Peer> peer1(new Peer("192.168.0.1", 6889));
SharedHandle<Peer> peer2(new Peer("192.168.0.2", 6889));
SharedHandle<Peer> peer3(new Peer("192.168.0.3", 6889));
ps.addPeer(peer1);
ps.addPeer(peer2);
ps.addPeer(peer3);
CPPUNIT_ASSERT(ps.addPeer(peer1));
CPPUNIT_ASSERT(ps.addPeer(peer2));
CPPUNIT_ASSERT(ps.addPeer(peer3));
ps.deleteUnusedPeer(2);
CPPUNIT_ASSERT_EQUAL((size_t)1, ps.countPeer());
CPPUNIT_ASSERT_EQUAL((size_t)1, ps.getUnusedPeers().size());
CPPUNIT_ASSERT_EQUAL(std::string("192.168.0.3"),
ps.getPeer("192.168.0.3", 6889)->getIPAddress());
ps.addPeer(peer1);
ps.addPeer(peer2);
peer2->usedBy(1);
ps.deleteUnusedPeer(3);
// peer2 has been in use, so it did't deleted.
CPPUNIT_ASSERT_EQUAL((size_t)1, ps.countPeer());
CPPUNIT_ASSERT_EQUAL(std::string("192.168.0.2"),
ps.getPeer("192.168.0.2", 6889)->getIPAddress());
ps.getUnusedPeers()[0]->getIPAddress());
ps.deleteUnusedPeer(100);
CPPUNIT_ASSERT(ps.getUnusedPeers().empty());
}
void DefaultPeerStorageTest::testAddPeer() {
void DefaultPeerStorageTest::testAddPeer()
{
DefaultPeerStorage ps;
SharedHandle<BtRuntime> btRuntime(new BtRuntime());
ps.setMaxPeerListSize(3);
ps.setMaxPeerListSize(2);
ps.setBtRuntime(btRuntime);
SharedHandle<Peer> peer1(new Peer("192.168.0.1", 6889));
SharedHandle<Peer> peer2(new Peer("192.168.0.2", 6889));
SharedHandle<Peer> peer3(new Peer("192.168.0.3", 6889));
ps.addPeer(peer1);
ps.addPeer(peer2);
ps.addPeer(peer3);
CPPUNIT_ASSERT(ps.addPeer(peer1));
CPPUNIT_ASSERT(ps.addPeer(peer2));
CPPUNIT_ASSERT(ps.addPeer(peer3));
CPPUNIT_ASSERT_EQUAL((size_t)3, ps.countPeer());
CPPUNIT_ASSERT_EQUAL((size_t)2, ps.getUnusedPeers().size());
CPPUNIT_ASSERT_EQUAL(std::string("192.168.0.3"),
ps.getUnusedPeers()[0]->getIPAddress());
// this returns false, because peer1 is already in the container
CPPUNIT_ASSERT_EQUAL(false, ps.addPeer(peer1));
// the number of peers doesn't change.
CPPUNIT_ASSERT_EQUAL((size_t)3, ps.countPeer());
SharedHandle<Peer> peer4(new Peer("192.168.0.4", 6889));
peer1->usedBy(1);
CPPUNIT_ASSERT(ps.addPeer(peer4));
// peer2 was deleted. While peer1 is oldest, its cuid is not 0.
CPPUNIT_ASSERT_EQUAL((size_t)3, ps.countPeer());
CPPUNIT_ASSERT(std::find_if(ps.getPeers().begin(), ps.getPeers().end(),
derefEqual(peer2)) == ps.getPeers().end());
SharedHandle<Peer> peer5(new Peer("192.168.0.4", 0));
peer5->setPort(6889);
// this returns false because the peer which has same ip and port
// has already added
CPPUNIT_ASSERT_EQUAL(false, ps.addPeer(peer5));
SharedHandle<Peer> pa[] = {
SharedHandle<Peer>(new Peer("192.168.0.4", 6889)),
SharedHandle<Peer>(new Peer("192.168.0.5", 6889)),
SharedHandle<Peer>(new Peer("192.168.0.6", 6889)),
SharedHandle<Peer>(new Peer("192.168.0.7", 6889)),
SharedHandle<Peer>(new Peer("192.168.0.8", 6889))
};
std::vector<SharedHandle<Peer> > peers(vbegin(pa), vend(pa));
ps.addPeer(peers);
// peers[0] is not added because it has been already added.
// peers[1], peers[2] and peers[3] are going to be added. peers[4]
// is not added because DefaultPeerStorage::addPeer() limits the
// number of peers to add. Finally, unused peers are removed from
// back and size 3 vector is made.
CPPUNIT_ASSERT_EQUAL((size_t)3, ps.countPeer());
CPPUNIT_ASSERT(std::find_if(ps.getPeers().begin(), ps.getPeers().end(),
derefEqual(peers[2])) != ps.getPeers().end());
CPPUNIT_ASSERT(std::find_if(ps.getPeers().begin(), ps.getPeers().end(),
derefEqual(peers[3])) != ps.getPeers().end());
}
void DefaultPeerStorageTest::testGetUnusedPeer() {
DefaultPeerStorage ps;
ps.setBtRuntime(btRuntime);
SharedHandle<Peer> peer1(new Peer("192.168.0.1", 6889));
ps.addPeer(peer1);
CPPUNIT_ASSERT(!ps.addPeer(peer2));
CPPUNIT_ASSERT(ps.addPeer(peer1));
CPPUNIT_ASSERT_EQUAL((size_t)2, ps.getUnusedPeers().size());
CPPUNIT_ASSERT_EQUAL(std::string("192.168.0.1"),
ps.getUnusedPeer()->getIPAddress());
ps.getUnusedPeers()[0]->getIPAddress());
peer1->usedBy(1);
CPPUNIT_ASSERT(!ps.getUnusedPeer());
peer1->resetStatus();
peer1->startBadCondition();
CPPUNIT_ASSERT(!ps.getUnusedPeer());
CPPUNIT_ASSERT_EQUAL(peer1->getIPAddress(),
ps.checkoutPeer(1)->getIPAddress());
CPPUNIT_ASSERT(!ps.addPeer(peer1));
}
void DefaultPeerStorageTest::testIsPeerAvailable() {
DefaultPeerStorage ps;
ps.setBtRuntime(btRuntime);
CPPUNIT_ASSERT_EQUAL(false, ps.isPeerAvailable());
SharedHandle<Peer> peer1(new Peer("192.168.0.1", 6889));
CPPUNIT_ASSERT(!ps.isPeerAvailable());
ps.addPeer(peer1);
CPPUNIT_ASSERT_EQUAL(true, ps.isPeerAvailable());
peer1->usedBy(1);
CPPUNIT_ASSERT_EQUAL(false, ps.isPeerAvailable());
peer1->resetStatus();
peer1->startBadCondition();
CPPUNIT_ASSERT_EQUAL(false, ps.isPeerAvailable());
CPPUNIT_ASSERT(ps.isPeerAvailable());
CPPUNIT_ASSERT(ps.checkoutPeer(1));
CPPUNIT_ASSERT(!ps.isPeerAvailable());
}
void DefaultPeerStorageTest::testActivatePeer() {
void DefaultPeerStorageTest::testGetActivePeers()
{
DefaultPeerStorage ps;
{
std::vector<SharedHandle<Peer> > peers;
ps.getActivePeers(peers);
@ -210,15 +145,13 @@ void DefaultPeerStorageTest::testActivatePeer() {
}
SharedHandle<Peer> peer1(new Peer("192.168.0.1", 6889));
ps.addPeer(peer1);
{
std::vector<SharedHandle<Peer> > activePeers;
ps.getActivePeers(activePeers);
CPPUNIT_ASSERT_EQUAL((size_t)0, activePeers.size());
}
CPPUNIT_ASSERT(ps.checkoutPeer(1));
{
peer1->allocateSessionResource(1024*1024, 1024*1024*10);
@ -229,7 +162,23 @@ void DefaultPeerStorageTest::testActivatePeer() {
}
}
void DefaultPeerStorageTest::testCalculateStat() {
void DefaultPeerStorageTest::testCheckoutPeer()
{
DefaultPeerStorage ps;
SharedHandle<Peer> peers[] = {
SharedHandle<Peer>(new Peer("192.168.0.1", 1000)),
SharedHandle<Peer>(new Peer("192.168.0.2", 1000)),
SharedHandle<Peer>(new Peer("192.168.0.3", 1000))
};
int len = A2_ARRAY_LEN(peers);
for(int i = 0; i < len; ++i) {
ps.addPeer(peers[i]);
}
for(int i = 0; i < len; ++i) {
SharedHandle<Peer> peer = ps.checkoutPeer(i+1);
CPPUNIT_ASSERT_EQUAL(peers[len-i-1]->getIPAddress(), peer->getIPAddress());
}
CPPUNIT_ASSERT(!ps.checkoutPeer(len+1));
}
void DefaultPeerStorageTest::testReturnPeer()
@ -245,17 +194,19 @@ void DefaultPeerStorageTest::testReturnPeer()
ps.addPeer(peer1);
ps.addPeer(peer2);
ps.addPeer(peer3);
for(int i = 1; i <= 3; ++i) {
CPPUNIT_ASSERT(ps.checkoutPeer(i));
}
CPPUNIT_ASSERT_EQUAL((size_t)3, ps.getUsedPeers().size());
ps.returnPeer(peer2); // peer2 removed from the container
CPPUNIT_ASSERT_EQUAL((size_t)2, ps.getPeers().size());
CPPUNIT_ASSERT(std::find_if(ps.getPeers().begin(), ps.getPeers().end(),
derefEqual(peer2)) == ps.getPeers().end());
CPPUNIT_ASSERT_EQUAL((size_t)2, ps.getUsedPeers().size());
CPPUNIT_ASSERT(ps.getUsedPeers().count(peer2) == 0);
CPPUNIT_ASSERT_EQUAL((size_t)1, ps.getDroppedPeers().size());
ps.returnPeer(peer1); // peer1 is removed from the container
CPPUNIT_ASSERT_EQUAL((size_t)1, ps.getPeers().size());
CPPUNIT_ASSERT(std::find_if(ps.getPeers().begin(), ps.getPeers().end(),
derefEqual(peer1)) == ps.getPeers().end());
CPPUNIT_ASSERT_EQUAL((size_t)1, ps.getUsedPeers().size());
CPPUNIT_ASSERT(ps.getUsedPeers().count(peer1) == 0);
CPPUNIT_ASSERT_EQUAL((size_t)1, ps.getDroppedPeers().size());
}

View File

@ -11,7 +11,8 @@ namespace aria2 {
class MockPeerStorage : public PeerStorage {
private:
std::deque<SharedHandle<Peer> > peers;
std::deque<SharedHandle<Peer> > unusedPeers;
std::deque<SharedHandle<Peer> > usedPeers;
std::deque<SharedHandle<Peer> > droppedPeers;
std::vector<SharedHandle<Peer> > activePeers;
int numChokeExecuted_;
@ -19,22 +20,24 @@ public:
MockPeerStorage():numChokeExecuted_(0) {}
virtual ~MockPeerStorage() {}
virtual bool addPeer(const SharedHandle<Peer>& peer) {
peers.push_back(peer);
virtual bool addPeer(const SharedHandle<Peer>& peer)
{
unusedPeers.push_back(peer);
return true;
}
virtual void addPeer(const std::vector<SharedHandle<Peer> >& peers) {
std::copy(peers.begin(), peers.end(), back_inserter(this->peers));
unusedPeers.insert(unusedPeers.end(), peers.begin(), peers.end());
}
virtual const std::deque<SharedHandle<Peer> >& getPeers() {
return peers;
}
virtual size_t countPeer() const
const std::deque<SharedHandle<Peer> >& getUnusedPeers()
{
return peers.size();
return unusedPeers;
}
virtual size_t countAllPeer() const
{
return unusedPeers.size() + usedPeers.size();
}
virtual const std::deque<SharedHandle<Peer> >& getDroppedPeers() {
@ -45,10 +48,6 @@ public:
droppedPeers.push_back(peer);
}
virtual SharedHandle<Peer> getUnusedPeer() {
return SharedHandle<Peer>();
}
virtual bool isPeerAvailable() {
return false;
}
@ -71,6 +70,11 @@ public:
{
}
virtual SharedHandle<Peer> checkoutPeer(cuid_t cuid)
{
return SharedHandle<Peer>();
}
virtual void returnPeer(const SharedHandle<Peer>& peer)
{
}

View File

@ -8,8 +8,6 @@ class PeerTest:public CppUnit::TestFixture {
CPPUNIT_TEST_SUITE(PeerTest);
CPPUNIT_TEST(testPeerAllowedIndexSet);
CPPUNIT_TEST(testAmAllowedIndexSet);
CPPUNIT_TEST(testGetId);
CPPUNIT_TEST(testOperatorEqual);
CPPUNIT_TEST(testCountSeeder);
CPPUNIT_TEST_SUITE_END();
private:
@ -22,8 +20,6 @@ public:
void testPeerAllowedIndexSet();
void testAmAllowedIndexSet();
void testGetId();
void testOperatorEqual();
void testCountSeeder();
};
@ -42,22 +38,6 @@ void PeerTest::testAmAllowedIndexSet() {
CPPUNIT_ASSERT(peer->isInAmAllowedIndexSet(0));
}
void PeerTest::testGetId() {
CPPUNIT_ASSERT_EQUAL(std::string("localhost(6969)"), peer->getID());
}
void PeerTest::testOperatorEqual()
{
CPPUNIT_ASSERT(Peer("localhost", 6881) == Peer("localhost", 6881));
{
Peer p1("localhost", 6881);
Peer p2("localhsot", 0);
p2.setPort(6881);
CPPUNIT_ASSERT(p1 != p2);
}
}
void PeerTest::testCountSeeder()
{
std::vector<SharedHandle<Peer> > peers(5);

View File

@ -154,24 +154,24 @@ void UTPexExtensionMessageTest::testDoReceivedAction()
msg.doReceivedAction();
CPPUNIT_ASSERT_EQUAL((size_t)4, peerStorage_->getPeers().size());
CPPUNIT_ASSERT_EQUAL((size_t)4, peerStorage_->getUnusedPeers().size());
{
SharedHandle<Peer> p = peerStorage_->getPeers()[0];
SharedHandle<Peer> p = peerStorage_->getUnusedPeers()[0];
CPPUNIT_ASSERT_EQUAL(std::string("192.168.0.1"), p->getIPAddress());
CPPUNIT_ASSERT_EQUAL((uint16_t)6881, p->getPort());
}
{
SharedHandle<Peer> p = peerStorage_->getPeers()[1];
SharedHandle<Peer> p = peerStorage_->getUnusedPeers()[1];
CPPUNIT_ASSERT_EQUAL(std::string("1002:1035:4527:3546:7854:1237:3247:3217"),
p->getIPAddress());
CPPUNIT_ASSERT_EQUAL((uint16_t)9999, p->getPort());
}
{
SharedHandle<Peer> p = peerStorage_->getPeers()[2];
SharedHandle<Peer> p = peerStorage_->getUnusedPeers()[2];
CPPUNIT_ASSERT_EQUAL(std::string("192.168.0.2"), p->getIPAddress());
}
{
SharedHandle<Peer> p = peerStorage_->getPeers()[3];
SharedHandle<Peer> p = peerStorage_->getUnusedPeers()[3];
CPPUNIT_ASSERT_EQUAL(std::string("2001:db8:bd05:1d2:288a:1fc0:1:10ee"),
p->getIPAddress());
}