diff --git a/ChangeLog b/ChangeLog index bc108e31..9c8989a3 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,3 +1,14 @@ +2008-02-11 Tatsuhiro Tsujikawa + + Bootstrap through node added by port message. + Currently bootstrap is executed if the number of buckets in routing + table is 1. + * src/BtPortMessage.{h, cc} + * src/DefaultBtMessageFactory.{h, cc} + * src/PeerInteractionCommand.cc + * test/BtPortMessageTest.cc + * test/MockDHTTask.h + 2008-02-10 Tatsuhiro Tsujikawa Extract the Peer class's member variables, which are only needed after diff --git a/src/BtPortMessage.cc b/src/BtPortMessage.cc index a094deae..15a04ce9 100644 --- a/src/BtPortMessage.cc +++ b/src/BtPortMessage.cc @@ -40,6 +40,7 @@ #include "Logger.h" #include "Peer.h" #include "DHTNode.h" +#include "DHTRoutingTable.h" #include "DHTTaskQueue.h" #include "DHTTaskFactory.h" #include "DHTTask.h" @@ -75,8 +76,15 @@ void BtPortMessage::doReceivedAction() SharedHandle node = new DHTNode(); node->setIPAddress(peer->ipaddr); node->setPort(_port); - SharedHandle task = _taskFactory->createPingTask(node); - _taskQueue->addImmediateTask(task); + { + SharedHandle task = _taskFactory->createPingTask(node); + _taskQueue->addImmediateTask(task); + } + if(_routingTable->countBucket() == 1) { + // initiate bootstrap + logger->info("Dispatch node_lookup since too few buckets."); + _taskQueue->addImmediateTask(_taskFactory->createNodeLookupTask(_localNode->getID())); + } } else { logger->info("DHT port message received while localhost didn't declare support it."); } @@ -105,6 +113,16 @@ std::string BtPortMessage::toString() const { return "port port="+Util::uitos(_port); } +void BtPortMessage::setLocalNode(const WeakHandle& localNode) +{ + _localNode = localNode; +} + +void BtPortMessage::setRoutingTable(const WeakHandle& routingTable) +{ + _routingTable = routingTable; +} + void BtPortMessage::setTaskQueue(const WeakHandle& taskQueue) { _taskQueue = taskQueue; diff --git a/src/BtPortMessage.h b/src/BtPortMessage.h index 1c8c075d..99ae4e6f 100644 --- a/src/BtPortMessage.h +++ b/src/BtPortMessage.h @@ -39,6 +39,8 @@ namespace aria2 { +class DHTNode; +class DHTRoutingTable; class DHTTaskQueue; class DHTTaskFactory; @@ -48,6 +50,10 @@ private: unsigned char* _msg; static const size_t MESSAGE_LENGTH = 7; + WeakHandle _localNode; + + WeakHandle _routingTable; + WeakHandle _taskQueue; WeakHandle _taskFactory; @@ -72,6 +78,10 @@ public: virtual std::string toString() const; + void setLocalNode(const WeakHandle& localNode); + + void setRoutingTable(const WeakHandle& routingTable); + void setTaskQueue(const WeakHandle& taskQueue); void setTaskFactory(const WeakHandle& taskFactory); diff --git a/src/DefaultBtMessageFactory.cc b/src/DefaultBtMessageFactory.cc index 7cfec8be..94d92d1e 100644 --- a/src/DefaultBtMessageFactory.cc +++ b/src/DefaultBtMessageFactory.cc @@ -178,6 +178,8 @@ DefaultBtMessageFactory::createBtMessage(const unsigned char* data, int32_t data } case BtPortMessage::ID: { SharedHandle temp = BtPortMessage::create(data, dataLength); + temp->setLocalNode(_localNode); + temp->setRoutingTable(_routingTable); temp->setTaskQueue(_taskQueue); temp->setTaskFactory(_taskFactory); msg = temp; @@ -422,7 +424,17 @@ void DefaultBtMessageFactory::setBtMessageDispatcher(const WeakHandledispatcher = dispatcher; } - + +void DefaultBtMessageFactory::setLocalNode(const WeakHandle& localNode) +{ + _localNode = localNode; +} + +void DefaultBtMessageFactory::setRoutingTable(const WeakHandle& routingTable) +{ + _routingTable = routingTable; +} + void DefaultBtMessageFactory::setBtRequestFactory(const WeakHandle& factory) { this->requestFactory = factory; diff --git a/src/DefaultBtMessageFactory.h b/src/DefaultBtMessageFactory.h index b7ddb9b8..6231f833 100644 --- a/src/DefaultBtMessageFactory.h +++ b/src/DefaultBtMessageFactory.h @@ -46,6 +46,8 @@ class BtMessageDispatcher; class BtRequestFactory; class PeerConnection; class PieceStorage; +class DHTNode; +class DHTRoutingTable; class DHTTaskQueue; class DHTTaskFactory; @@ -64,6 +66,10 @@ private: WeakHandle peerConnection; + WeakHandle _localNode; + + WeakHandle _routingTable; + WeakHandle _taskQueue; WeakHandle _taskFactory; @@ -138,6 +144,10 @@ public: void setBtRequestFactory(const WeakHandle& factory); void setPeerConnection(const WeakHandle& connection); + + void setLocalNode(const WeakHandle& localNode); + + void setRoutingTable(const WeakHandle& routingTable); void setTaskQueue(const WeakHandle& taskQueue); diff --git a/src/PeerInteractionCommand.cc b/src/PeerInteractionCommand.cc index 4152b619..1c241e39 100644 --- a/src/PeerInteractionCommand.cc +++ b/src/PeerInteractionCommand.cc @@ -56,6 +56,7 @@ #include "PeerConnection.h" #include "ExtensionMessageFactory.h" #include "CUIDCounter.h" +#include "DHTRoutingTable.h" #include "DHTTaskQueue.h" #include "DHTTaskFactory.h" #include "DHTNode.h" @@ -90,6 +91,8 @@ PeerInteractionCommand::PeerInteractionCommand(int32_t cuid, factory->setCuid(cuid); factory->setBtContext(btContext); factory->setPeer(peer); + factory->setLocalNode(DHTRegistry::_localNode); + factory->setRoutingTable(DHTRegistry::_routingTable); factory->setTaskQueue(DHTRegistry::_taskQueue); factory->setTaskFactory(DHTRegistry::_taskFactory); diff --git a/test/BtPortMessageTest.cc b/test/BtPortMessageTest.cc index 5438caad..6585151c 100644 --- a/test/BtPortMessageTest.cc +++ b/test/BtPortMessageTest.cc @@ -1,8 +1,10 @@ #include "BtPortMessage.h" #include "PeerMessageUtil.h" #include "Util.h" +#include "array_fun.h" #include "Peer.h" #include "DHTNode.h" +#include "DHTRoutingTable.h" #include "MockDHTTask.h" #include "MockDHTTaskFactory.h" #include "MockDHTTaskQueue.h" @@ -18,6 +20,7 @@ class BtPortMessageTest:public CppUnit::TestFixture { CPPUNIT_TEST(testToString); CPPUNIT_TEST(testGetMessage); CPPUNIT_TEST(testDoReceivedAction); + CPPUNIT_TEST(testDoReceivedAction_bootstrap); CPPUNIT_TEST_SUITE_END(); private: @@ -29,6 +32,7 @@ public: void testToString(); void testGetMessage(); void testDoReceivedAction(); + void testDoReceivedAction_bootstrap(); class MockDHTTaskFactory2:public MockDHTTaskFactory { public: @@ -37,6 +41,14 @@ public: { return new MockDHTTask(remoteNode); } + + virtual SharedHandle + createNodeLookupTask(const unsigned char* targetID) + { + MockDHTTask* task = new MockDHTTask(0); + task->setTargetID(targetID); + return task; + } }; }; @@ -84,10 +96,29 @@ void BtPortMessageTest::testGetMessage() { void BtPortMessageTest::testDoReceivedAction() { + unsigned char nodeID[DHT_ID_LENGTH]; + memset(nodeID, 0, DHT_ID_LENGTH); + SharedHandle localNode = new DHTNode(nodeID); + + // 9 nodes to create at least 2 buckets. + SharedHandle nodes[] = { 0, 0, 0, 0, 0, 0, 0, 0, 0 }; + for(size_t i = 0; i < arrayLength(nodes); ++i) { + memset(nodeID, 0, DHT_ID_LENGTH); + nodeID[DHT_ID_LENGTH-1] = i; + nodes[i] = new DHTNode(nodeID); + } + + DHTRoutingTable routingTable(localNode); + for(size_t i = 0; i < arrayLength(nodes); ++i) { + routingTable.addNode(nodes[i]); + } + SharedHandle peer = new Peer("192.168.0.1", 6881); BtPortMessage msg(6881); MockDHTTaskQueue taskQueue; MockDHTTaskFactory2 taskFactory; + msg.setLocalNode(localNode); + msg.setRoutingTable(&routingTable); msg.setTaskQueue(&taskQueue); msg.setTaskFactory(&taskFactory); msg.setPeer(peer); @@ -100,4 +131,33 @@ void BtPortMessageTest::testDoReceivedAction() CPPUNIT_ASSERT_EQUAL((uint16_t)6881, node->getPort()); } +void BtPortMessageTest::testDoReceivedAction_bootstrap() +{ + unsigned char nodeID[DHT_ID_LENGTH]; + memset(nodeID, 0, DHT_ID_LENGTH); + nodeID[0] = 0xff; + SharedHandle localNode = new DHTNode(nodeID); + DHTRoutingTable routingTable(localNode); // no nodes , 1 bucket. + + SharedHandle peer = new Peer("192.168.0.1", 6881); + BtPortMessage msg(6881); + MockDHTTaskQueue taskQueue; + MockDHTTaskFactory2 taskFactory; + msg.setLocalNode(localNode); + msg.setRoutingTable(&routingTable); + msg.setTaskQueue(&taskQueue); + msg.setTaskFactory(&taskFactory); + msg.setPeer(peer); + + msg.doReceivedAction(); + + CPPUNIT_ASSERT_EQUAL((size_t)2, taskQueue._immediateTaskQueue.size()); + SharedHandle node = SharedHandle(taskQueue._immediateTaskQueue[0])->_remoteNode; + CPPUNIT_ASSERT_EQUAL(std::string("192.168.0.1"), node->getIPAddress()); + CPPUNIT_ASSERT_EQUAL((uint16_t)6881, node->getPort()); + + SharedHandle task2 = taskQueue._immediateTaskQueue[1]; + CPPUNIT_ASSERT(memcmp(nodeID, task2->_targetID, DHT_ID_LENGTH) == 0); +} + } // namespace aria2 diff --git a/test/MockDHTTask.h b/test/MockDHTTask.h index dceac86a..ede1cd82 100644 --- a/test/MockDHTTask.h +++ b/test/MockDHTTask.h @@ -3,6 +3,8 @@ #include "DHTTask.h" #include "DHTNode.h" +#include "DHTConstants.h" +#include namespace aria2 { @@ -10,6 +12,8 @@ class MockDHTTask:public DHTTask { public: SharedHandle _remoteNode; + unsigned char _targetID[DHT_ID_LENGTH]; + MockDHTTask(const SharedHandle& remoteNode):_remoteNode(remoteNode) {} virtual ~MockDHTTask() {} @@ -20,6 +24,11 @@ public: { return false; } + + void setTargetID(const unsigned char* targetID) + { + memcpy(_targetID, targetID, DHT_ID_LENGTH); + } }; } // namespace aria2