2010-08-04 Tatsuhiro Tsujikawa <t-tujikawa@users.sourceforge.net>

Rewritten ut_pex peer selection.
	* src/DefaultBtInteractive.cc
	* src/DefaultPeerStorage.cc
	* src/DefaultPeerStorage.h
	* src/PeerStorage.h
	* src/UTPexExtensionMessage.h
	* test/DefaultPeerStorageTest.cc
	* test/MockPeerStorage.h
	* test/UTPexExtensionMessageTest.cc
pull/1/head
Tatsuhiro Tsujikawa 2010-08-04 12:25:46 +00:00
parent 2bd5020f81
commit 2176b68116
9 changed files with 71 additions and 20 deletions

View File

@ -1,3 +1,15 @@
2010-08-04 Tatsuhiro Tsujikawa <t-tujikawa@users.sourceforge.net>
Rewritten ut_pex peer selection.
* src/DefaultBtInteractive.cc
* src/DefaultPeerStorage.cc
* src/DefaultPeerStorage.h
* src/PeerStorage.h
* src/UTPexExtensionMessage.h
* test/DefaultPeerStorageTest.cc
* test/MockPeerStorage.h
* test/UTPexExtensionMessageTest.cc
2010-08-03 Tatsuhiro Tsujikawa <t-tujikawa@users.sourceforge.net>
Added bittorrent::packcompact() which replaces

View File

@ -475,26 +475,27 @@ void DefaultBtInteractive::addPeerExchangeMessage()
difference(global::wallclock) >= UTPexExtensionMessage::DEFAULT_INTERVAL) {
UTPexExtensionMessageHandle m
(new UTPexExtensionMessage(peer_->getExtensionMessageID("ut_pex")));
const std::deque<SharedHandle<Peer> >& peers = peerStorage_->getPeers();
{
for(std::deque<SharedHandle<Peer> >::const_iterator i =
peers.begin(), eoi = peers.end();
i != eoi && !m->freshPeersAreFull(); ++i) {
if(peer_->getIPAddress() != (*i)->getIPAddress()) {
m->addFreshPeer(*i);
}
std::vector<SharedHandle<Peer> > activePeers;
peerStorage_->getActivePeers(activePeers);
for(std::vector<SharedHandle<Peer> >::const_iterator i =
activePeers.begin(), eoi = activePeers.end();
i != eoi && !m->freshPeersAreFull(); ++i) {
if(peer_->getIPAddress() != (*i)->getIPAddress()) {
m->addFreshPeer(*i);
}
}
{
for(std::deque<SharedHandle<Peer> >::const_reverse_iterator i =
peers.rbegin(), eoi = peers.rend();
i != eoi && !m->droppedPeersAreFull();
++i) {
if(peer_->getIPAddress() != (*i)->getIPAddress()) {
m->addDroppedPeer(*i);
}
const std::deque<SharedHandle<Peer> >& droppedPeers =
peerStorage_->getDroppedPeers();
for(std::deque<SharedHandle<Peer> >::const_iterator i =
droppedPeers.begin(), eoi = droppedPeers.end();
i != eoi && !m->droppedPeersAreFull();
++i) {
if(peer_->getIPAddress() != (*i)->getIPAddress()) {
m->addDroppedPeer(*i);
}
}
BtMessageHandle msg = messageFactory_->createBtExtendedMessage(m);
dispatcher_->addMessageToQueue(msg);
pexTimer_ = global::wallclock;

View File

@ -124,11 +124,24 @@ void DefaultPeerStorage::addPeer(const std::vector<SharedHandle<Peer> >& peers)
}
}
void DefaultPeerStorage::addDroppedPeer(const SharedHandle<Peer>& peer)
{
droppedPeers_.push_front(peer);
if(droppedPeers_.size() > 50) {
droppedPeers_.pop_back();
}
}
const std::deque<SharedHandle<Peer> >& DefaultPeerStorage::getPeers()
{
return peers_;
}
const std::deque<SharedHandle<Peer> >& DefaultPeerStorage::getDroppedPeers()
{
return droppedPeers_;
}
class FindFinePeer {
public:
bool operator()(const SharedHandle<Peer>& peer) const {
@ -295,6 +308,10 @@ void DefaultPeerStorage::onReturningPeer(const SharedHandle<Peer>& peer)
removedPeerSessionUploadLength_ += removedStat.getSessionUploadLength();
cachedTransferStat_ -= removedStat;
if(!peer->isIncomingPeer()) {
peer->startBadCondition();
addDroppedPeer(peer);
}
// Execute choking algorithm if unchoked and interested peer is
// disconnected.
if(!peer->amChoking() && peer->peerInterested()) {

View File

@ -54,6 +54,7 @@ private:
SharedHandle<BtRuntime> btRuntime_;
SharedHandle<PieceStorage> pieceStorage_;
std::deque<SharedHandle<Peer> > peers_;
std::deque<SharedHandle<Peer> > droppedPeers_;
Logger* logger_;
uint64_t removedPeerSessionDownloadLength_;
uint64_t removedPeerSessionUploadLength_;
@ -68,6 +69,8 @@ private:
TransferStat cachedTransferStat_;
bool isPeerAlreadyAdded(const SharedHandle<Peer>& peer);
void addDroppedPeer(const SharedHandle<Peer>& peer);
public:
DefaultPeerStorage();
@ -85,6 +88,8 @@ public:
virtual const std::deque<SharedHandle<Peer> >& getPeers();
virtual const std::deque<SharedHandle<Peer> >& getDroppedPeers();
virtual bool isPeerAvailable();
virtual void getActivePeers(std::vector<SharedHandle<Peer> >& peers);

View File

@ -67,6 +67,11 @@ public:
*/
virtual const std::deque<SharedHandle<Peer> >& getPeers() = 0;
/**
* Returns internal dropped peer list.
*/
virtual const std::deque<SharedHandle<Peer> >& getDroppedPeers() = 0;
/**
* Returns one of the unused peers.
*/

View File

@ -130,9 +130,9 @@ public:
static const time_t DEFAULT_INTERVAL = 60;
static const size_t DEFAULT_MAX_FRESH_PEER = 30;
static const size_t DEFAULT_MAX_FRESH_PEER = 50;
static const size_t DEFAULT_MAX_DROPPED_PEER = 10;
static const size_t DEFAULT_MAX_DROPPED_PEER = 50;
};
typedef SharedHandle<UTPexExtensionMessage> UTPexExtensionMessageHandle;

View File

@ -229,6 +229,8 @@ void DefaultPeerStorageTest::testReturnPeer()
ps.returnPeer(peer1); // peer1 is removed from the container
CPPUNIT_ASSERT_EQUAL((size_t)1, ps.getPeers().size());
CPPUNIT_ASSERT(std::find(ps.getPeers().begin(), ps.getPeers().end(), peer1) == ps.getPeers().end());
CPPUNIT_ASSERT_EQUAL((size_t)2, ps.getDroppedPeers().size());
}
void DefaultPeerStorageTest::testOnErasingPeer()

View File

@ -13,6 +13,7 @@ class MockPeerStorage : public PeerStorage {
private:
TransferStat stat;
std::deque<SharedHandle<Peer> > peers;
std::deque<SharedHandle<Peer> > droppedPeers;
std::vector<SharedHandle<Peer> > activePeers;
int numChokeExecuted_;
public:
@ -32,6 +33,14 @@ public:
return peers;
}
virtual const std::deque<SharedHandle<Peer> >& getDroppedPeers() {
return droppedPeers;
}
void addDroppedPeer(const SharedHandle<Peer>& peer) {
droppedPeers.push_back(peer);
}
virtual SharedHandle<Peer> getUnusedPeer() {
return SharedHandle<Peer>();
}

View File

@ -228,7 +228,7 @@ void UTPexExtensionMessageTest::testAddDroppedPeer()
void UTPexExtensionMessageTest::testFreshPeersAreFull()
{
UTPexExtensionMessage msg(1);
CPPUNIT_ASSERT_EQUAL((size_t)30, msg.getMaxFreshPeer());
CPPUNIT_ASSERT_EQUAL((size_t)50, msg.getMaxFreshPeer());
msg.setMaxFreshPeer(2);
SharedHandle<Peer> p1(new Peer("192.168.0.1", 6881));
CPPUNIT_ASSERT(msg.addFreshPeer(p1));
@ -244,7 +244,7 @@ void UTPexExtensionMessageTest::testFreshPeersAreFull()
void UTPexExtensionMessageTest::testDroppedPeersAreFull()
{
UTPexExtensionMessage msg(1);
CPPUNIT_ASSERT_EQUAL((size_t)10, msg.getMaxDroppedPeer());
CPPUNIT_ASSERT_EQUAL((size_t)50, msg.getMaxDroppedPeer());
msg.setMaxDroppedPeer(2);
SharedHandle<Peer> p1(new Peer("192.168.0.1", 6881));
p1->startBadCondition();