2008-02-11 Tatsuhiro Tsujikawa <tujikawa at rednoah dot com>

Bootstrap through node added by port message.
	Currently bootstrap is executed if the number of buckets in routing
	table is 1.
	* src/BtPortMessage.{h, cc}
	* src/DefaultBtMessageFactory.{h, cc}
	* src/PeerInteractionCommand.cc
	* test/BtPortMessageTest.cc
	* test/MockDHTTask.h
pull/1/head
Tatsuhiro Tsujikawa 2008-02-10 15:57:00 +00:00
parent c064a2cd9e
commit 5d6f1c046a
8 changed files with 136 additions and 3 deletions

View File

@ -1,3 +1,14 @@
2008-02-11 Tatsuhiro Tsujikawa <tujikawa at rednoah dot com>
Bootstrap through node added by port message.
Currently bootstrap is executed if the number of buckets in routing
table is 1.
* src/BtPortMessage.{h, cc}
* src/DefaultBtMessageFactory.{h, cc}
* src/PeerInteractionCommand.cc
* test/BtPortMessageTest.cc
* test/MockDHTTask.h
2008-02-10 Tatsuhiro Tsujikawa <tujikawa at rednoah dot com> 2008-02-10 Tatsuhiro Tsujikawa <tujikawa at rednoah dot com>
Extract the Peer class's member variables, which are only needed after Extract the Peer class's member variables, which are only needed after

View File

@ -40,6 +40,7 @@
#include "Logger.h" #include "Logger.h"
#include "Peer.h" #include "Peer.h"
#include "DHTNode.h" #include "DHTNode.h"
#include "DHTRoutingTable.h"
#include "DHTTaskQueue.h" #include "DHTTaskQueue.h"
#include "DHTTaskFactory.h" #include "DHTTaskFactory.h"
#include "DHTTask.h" #include "DHTTask.h"
@ -75,8 +76,15 @@ void BtPortMessage::doReceivedAction()
SharedHandle<DHTNode> node = new DHTNode(); SharedHandle<DHTNode> node = new DHTNode();
node->setIPAddress(peer->ipaddr); node->setIPAddress(peer->ipaddr);
node->setPort(_port); node->setPort(_port);
{
SharedHandle<DHTTask> task = _taskFactory->createPingTask(node); SharedHandle<DHTTask> task = _taskFactory->createPingTask(node);
_taskQueue->addImmediateTask(task); _taskQueue->addImmediateTask(task);
}
if(_routingTable->countBucket() == 1) {
// initiate bootstrap
logger->info("Dispatch node_lookup since too few buckets.");
_taskQueue->addImmediateTask(_taskFactory->createNodeLookupTask(_localNode->getID()));
}
} else { } else {
logger->info("DHT port message received while localhost didn't declare support it."); logger->info("DHT port message received while localhost didn't declare support it.");
} }
@ -105,6 +113,16 @@ std::string BtPortMessage::toString() const {
return "port port="+Util::uitos(_port); return "port port="+Util::uitos(_port);
} }
void BtPortMessage::setLocalNode(const WeakHandle<DHTNode>& localNode)
{
_localNode = localNode;
}
void BtPortMessage::setRoutingTable(const WeakHandle<DHTRoutingTable>& routingTable)
{
_routingTable = routingTable;
}
void BtPortMessage::setTaskQueue(const WeakHandle<DHTTaskQueue>& taskQueue) void BtPortMessage::setTaskQueue(const WeakHandle<DHTTaskQueue>& taskQueue)
{ {
_taskQueue = taskQueue; _taskQueue = taskQueue;

View File

@ -39,6 +39,8 @@
namespace aria2 { namespace aria2 {
class DHTNode;
class DHTRoutingTable;
class DHTTaskQueue; class DHTTaskQueue;
class DHTTaskFactory; class DHTTaskFactory;
@ -48,6 +50,10 @@ private:
unsigned char* _msg; unsigned char* _msg;
static const size_t MESSAGE_LENGTH = 7; static const size_t MESSAGE_LENGTH = 7;
WeakHandle<DHTNode> _localNode;
WeakHandle<DHTRoutingTable> _routingTable;
WeakHandle<DHTTaskQueue> _taskQueue; WeakHandle<DHTTaskQueue> _taskQueue;
WeakHandle<DHTTaskFactory> _taskFactory; WeakHandle<DHTTaskFactory> _taskFactory;
@ -72,6 +78,10 @@ public:
virtual std::string toString() const; virtual std::string toString() const;
void setLocalNode(const WeakHandle<DHTNode>& localNode);
void setRoutingTable(const WeakHandle<DHTRoutingTable>& routingTable);
void setTaskQueue(const WeakHandle<DHTTaskQueue>& taskQueue); void setTaskQueue(const WeakHandle<DHTTaskQueue>& taskQueue);
void setTaskFactory(const WeakHandle<DHTTaskFactory>& taskFactory); void setTaskFactory(const WeakHandle<DHTTaskFactory>& taskFactory);

View File

@ -178,6 +178,8 @@ DefaultBtMessageFactory::createBtMessage(const unsigned char* data, int32_t data
} }
case BtPortMessage::ID: { case BtPortMessage::ID: {
SharedHandle<BtPortMessage> temp = BtPortMessage::create(data, dataLength); SharedHandle<BtPortMessage> temp = BtPortMessage::create(data, dataLength);
temp->setLocalNode(_localNode);
temp->setRoutingTable(_routingTable);
temp->setTaskQueue(_taskQueue); temp->setTaskQueue(_taskQueue);
temp->setTaskFactory(_taskFactory); temp->setTaskFactory(_taskFactory);
msg = temp; msg = temp;
@ -423,6 +425,16 @@ void DefaultBtMessageFactory::setBtMessageDispatcher(const WeakHandle<BtMessageD
this->dispatcher = dispatcher; this->dispatcher = dispatcher;
} }
void DefaultBtMessageFactory::setLocalNode(const WeakHandle<DHTNode>& localNode)
{
_localNode = localNode;
}
void DefaultBtMessageFactory::setRoutingTable(const WeakHandle<DHTRoutingTable>& routingTable)
{
_routingTable = routingTable;
}
void DefaultBtMessageFactory::setBtRequestFactory(const WeakHandle<BtRequestFactory>& factory) void DefaultBtMessageFactory::setBtRequestFactory(const WeakHandle<BtRequestFactory>& factory)
{ {
this->requestFactory = factory; this->requestFactory = factory;

View File

@ -46,6 +46,8 @@ class BtMessageDispatcher;
class BtRequestFactory; class BtRequestFactory;
class PeerConnection; class PeerConnection;
class PieceStorage; class PieceStorage;
class DHTNode;
class DHTRoutingTable;
class DHTTaskQueue; class DHTTaskQueue;
class DHTTaskFactory; class DHTTaskFactory;
@ -64,6 +66,10 @@ private:
WeakHandle<PeerConnection> peerConnection; WeakHandle<PeerConnection> peerConnection;
WeakHandle<DHTNode> _localNode;
WeakHandle<DHTRoutingTable> _routingTable;
WeakHandle<DHTTaskQueue> _taskQueue; WeakHandle<DHTTaskQueue> _taskQueue;
WeakHandle<DHTTaskFactory> _taskFactory; WeakHandle<DHTTaskFactory> _taskFactory;
@ -139,6 +145,10 @@ public:
void setPeerConnection(const WeakHandle<PeerConnection>& connection); void setPeerConnection(const WeakHandle<PeerConnection>& connection);
void setLocalNode(const WeakHandle<DHTNode>& localNode);
void setRoutingTable(const WeakHandle<DHTRoutingTable>& routingTable);
void setTaskQueue(const WeakHandle<DHTTaskQueue>& taskQueue); void setTaskQueue(const WeakHandle<DHTTaskQueue>& taskQueue);
void setTaskFactory(const WeakHandle<DHTTaskFactory>& taskFactory); void setTaskFactory(const WeakHandle<DHTTaskFactory>& taskFactory);

View File

@ -56,6 +56,7 @@
#include "PeerConnection.h" #include "PeerConnection.h"
#include "ExtensionMessageFactory.h" #include "ExtensionMessageFactory.h"
#include "CUIDCounter.h" #include "CUIDCounter.h"
#include "DHTRoutingTable.h"
#include "DHTTaskQueue.h" #include "DHTTaskQueue.h"
#include "DHTTaskFactory.h" #include "DHTTaskFactory.h"
#include "DHTNode.h" #include "DHTNode.h"
@ -90,6 +91,8 @@ PeerInteractionCommand::PeerInteractionCommand(int32_t cuid,
factory->setCuid(cuid); factory->setCuid(cuid);
factory->setBtContext(btContext); factory->setBtContext(btContext);
factory->setPeer(peer); factory->setPeer(peer);
factory->setLocalNode(DHTRegistry::_localNode);
factory->setRoutingTable(DHTRegistry::_routingTable);
factory->setTaskQueue(DHTRegistry::_taskQueue); factory->setTaskQueue(DHTRegistry::_taskQueue);
factory->setTaskFactory(DHTRegistry::_taskFactory); factory->setTaskFactory(DHTRegistry::_taskFactory);

View File

@ -1,8 +1,10 @@
#include "BtPortMessage.h" #include "BtPortMessage.h"
#include "PeerMessageUtil.h" #include "PeerMessageUtil.h"
#include "Util.h" #include "Util.h"
#include "array_fun.h"
#include "Peer.h" #include "Peer.h"
#include "DHTNode.h" #include "DHTNode.h"
#include "DHTRoutingTable.h"
#include "MockDHTTask.h" #include "MockDHTTask.h"
#include "MockDHTTaskFactory.h" #include "MockDHTTaskFactory.h"
#include "MockDHTTaskQueue.h" #include "MockDHTTaskQueue.h"
@ -18,6 +20,7 @@ class BtPortMessageTest:public CppUnit::TestFixture {
CPPUNIT_TEST(testToString); CPPUNIT_TEST(testToString);
CPPUNIT_TEST(testGetMessage); CPPUNIT_TEST(testGetMessage);
CPPUNIT_TEST(testDoReceivedAction); CPPUNIT_TEST(testDoReceivedAction);
CPPUNIT_TEST(testDoReceivedAction_bootstrap);
CPPUNIT_TEST_SUITE_END(); CPPUNIT_TEST_SUITE_END();
private: private:
@ -29,6 +32,7 @@ public:
void testToString(); void testToString();
void testGetMessage(); void testGetMessage();
void testDoReceivedAction(); void testDoReceivedAction();
void testDoReceivedAction_bootstrap();
class MockDHTTaskFactory2:public MockDHTTaskFactory { class MockDHTTaskFactory2:public MockDHTTaskFactory {
public: public:
@ -37,6 +41,14 @@ public:
{ {
return new MockDHTTask(remoteNode); return new MockDHTTask(remoteNode);
} }
virtual SharedHandle<DHTTask>
createNodeLookupTask(const unsigned char* targetID)
{
MockDHTTask* task = new MockDHTTask(0);
task->setTargetID(targetID);
return task;
}
}; };
}; };
@ -84,10 +96,29 @@ void BtPortMessageTest::testGetMessage() {
void BtPortMessageTest::testDoReceivedAction() void BtPortMessageTest::testDoReceivedAction()
{ {
unsigned char nodeID[DHT_ID_LENGTH];
memset(nodeID, 0, DHT_ID_LENGTH);
SharedHandle<DHTNode> localNode = new DHTNode(nodeID);
// 9 nodes to create at least 2 buckets.
SharedHandle<DHTNode> nodes[] = { 0, 0, 0, 0, 0, 0, 0, 0, 0 };
for(size_t i = 0; i < arrayLength(nodes); ++i) {
memset(nodeID, 0, DHT_ID_LENGTH);
nodeID[DHT_ID_LENGTH-1] = i;
nodes[i] = new DHTNode(nodeID);
}
DHTRoutingTable routingTable(localNode);
for(size_t i = 0; i < arrayLength(nodes); ++i) {
routingTable.addNode(nodes[i]);
}
SharedHandle<Peer> peer = new Peer("192.168.0.1", 6881); SharedHandle<Peer> peer = new Peer("192.168.0.1", 6881);
BtPortMessage msg(6881); BtPortMessage msg(6881);
MockDHTTaskQueue taskQueue; MockDHTTaskQueue taskQueue;
MockDHTTaskFactory2 taskFactory; MockDHTTaskFactory2 taskFactory;
msg.setLocalNode(localNode);
msg.setRoutingTable(&routingTable);
msg.setTaskQueue(&taskQueue); msg.setTaskQueue(&taskQueue);
msg.setTaskFactory(&taskFactory); msg.setTaskFactory(&taskFactory);
msg.setPeer(peer); msg.setPeer(peer);
@ -100,4 +131,33 @@ void BtPortMessageTest::testDoReceivedAction()
CPPUNIT_ASSERT_EQUAL((uint16_t)6881, node->getPort()); CPPUNIT_ASSERT_EQUAL((uint16_t)6881, node->getPort());
} }
void BtPortMessageTest::testDoReceivedAction_bootstrap()
{
unsigned char nodeID[DHT_ID_LENGTH];
memset(nodeID, 0, DHT_ID_LENGTH);
nodeID[0] = 0xff;
SharedHandle<DHTNode> localNode = new DHTNode(nodeID);
DHTRoutingTable routingTable(localNode); // no nodes , 1 bucket.
SharedHandle<Peer> peer = new Peer("192.168.0.1", 6881);
BtPortMessage msg(6881);
MockDHTTaskQueue taskQueue;
MockDHTTaskFactory2 taskFactory;
msg.setLocalNode(localNode);
msg.setRoutingTable(&routingTable);
msg.setTaskQueue(&taskQueue);
msg.setTaskFactory(&taskFactory);
msg.setPeer(peer);
msg.doReceivedAction();
CPPUNIT_ASSERT_EQUAL((size_t)2, taskQueue._immediateTaskQueue.size());
SharedHandle<DHTNode> node = SharedHandle<MockDHTTask>(taskQueue._immediateTaskQueue[0])->_remoteNode;
CPPUNIT_ASSERT_EQUAL(std::string("192.168.0.1"), node->getIPAddress());
CPPUNIT_ASSERT_EQUAL((uint16_t)6881, node->getPort());
SharedHandle<MockDHTTask> task2 = taskQueue._immediateTaskQueue[1];
CPPUNIT_ASSERT(memcmp(nodeID, task2->_targetID, DHT_ID_LENGTH) == 0);
}
} // namespace aria2 } // namespace aria2

View File

@ -3,6 +3,8 @@
#include "DHTTask.h" #include "DHTTask.h"
#include "DHTNode.h" #include "DHTNode.h"
#include "DHTConstants.h"
#include <cstring>
namespace aria2 { namespace aria2 {
@ -10,6 +12,8 @@ class MockDHTTask:public DHTTask {
public: public:
SharedHandle<DHTNode> _remoteNode; SharedHandle<DHTNode> _remoteNode;
unsigned char _targetID[DHT_ID_LENGTH];
MockDHTTask(const SharedHandle<DHTNode>& remoteNode):_remoteNode(remoteNode) {} MockDHTTask(const SharedHandle<DHTNode>& remoteNode):_remoteNode(remoteNode) {}
virtual ~MockDHTTask() {} virtual ~MockDHTTask() {}
@ -20,6 +24,11 @@ public:
{ {
return false; return false;
} }
void setTargetID(const unsigned char* targetID)
{
memcpy(_targetID, targetID, DHT_ID_LENGTH);
}
}; };
} // namespace aria2 } // namespace aria2