From 852315bf1110e053a3082847fa5351ece436db48 Mon Sep 17 00:00:00 2001 From: Tatsuhiro Tsujikawa Date: Tue, 11 Mar 2008 14:19:10 +0000 Subject: [PATCH] 2008-03-11 Tatsuhiro Tsujikawa Fixed high memory footprint when DHT is enabled. This is not a memory leak, but DHTReplaceNodeTask is more frequently queued than it is processed and the queue is getting longer. As a consequence, momory usage is increased. As for a fix, instead of issuing DHTReplaceNodeTask, I've implemented replacement cache in DHTBucket which is described in Kademlia paper. * src/DHTRoutingTable.cc (addNode): Removed the issuing of DHTReplaceNodeTask. * src/DHTBucket.{h, cc} (cacheNode): New function. (getCachedNodes): New function. (dropNode): Push back cached node to _nodes. * test/DHTBucketTest.cc (testCacheNode): New test (testDropNode): New test --- ChangeLog | 18 ++++++++++++ src/DHTBucket.cc | 46 +++++++++++++---------------- src/DHTBucket.h | 8 ++++- src/DHTRoutingTable.cc | 6 ++-- test/DHTBucketTest.cc | 67 ++++++++++++++++++++++++++++++++++++++++++ 5 files changed, 115 insertions(+), 30 deletions(-) diff --git a/ChangeLog b/ChangeLog index cbdf8cb2..d8f43db4 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,3 +1,21 @@ +2008-03-11 Tatsuhiro Tsujikawa + + Fixed high memory footprint when DHT is enabled. + This is not a memory leak, but DHTReplaceNodeTask is more frequently + queued than it is processed and the queue is getting longer. As a + consequence, momory usage is increased. + As for a fix, instead of issuing DHTReplaceNodeTask, I've implemented + replacement cache in DHTBucket which is described in Kademlia paper. + * src/DHTRoutingTable.cc (addNode): Removed the issuing of + DHTReplaceNodeTask. + * src/DHTBucket.{h, cc} + (cacheNode): New function. + (getCachedNodes): New function. + (dropNode): Push back cached node to _nodes. + * test/DHTBucketTest.cc + (testCacheNode): New test + (testDropNode): New test + 2008-03-11 Tatsuhiro Tsujikawa Allocate memory for peekBuf in initiateSecureConnection() to avoid diff --git a/src/DHTBucket.cc b/src/DHTBucket.cc index a5045baf..720f478d 100644 --- a/src/DHTBucket.cc +++ b/src/DHTBucket.cc @@ -119,23 +119,6 @@ bool DHTBucket::addNode(const SharedHandle& node) } else { return false; } - /* - } else if(splitAllowed()) { - return false; - } else { - std::deque >::iterator ci = find(_cachedNodes.begin(), _cachedNodes.end(), node); - if(ci == _cachedNodes.end()) { - _cachedNodes.push_back(node); - if(_cachedNodes.size() > CACHE_SIZE) { - _cachedNodes.erase(_cachedNodes.begin(), _cachedNodes().begin()+CACHE_SIZE-_cachedNodes.size()); - } - } else { - _cachedNodes.erase(ci); - _cachedNodes.push_back(node); - } - return true; - } - */ } } else { _nodes.erase(itr); @@ -144,19 +127,25 @@ bool DHTBucket::addNode(const SharedHandle& node) } } +void DHTBucket::cacheNode(const SharedHandle& node) +{ + // _cachedNodes are sorted by last time seen + _cachedNodes.push_front(node); + if(_cachedNodes.size() > CACHE_SIZE) { + _cachedNodes.resize(CACHE_SIZE, 0); + } +} + void DHTBucket::dropNode(const SharedHandle& node) { - return; - /* - std::deque >::iterator itr = find(_nodes.begin(), _nodes.end(), node); - if(itr != _nodes.end()) { - _nodes.erase(itr); - if(_cachedNodes.size()) { - _nodes.push_back(_cachedNodes.back()); - _cachedNodes.erase(_cachedNodes.begin()+_cachedNodes.size()-1); + if(_cachedNodes.size()) { + std::deque >::iterator itr = find(_nodes.begin(), _nodes.end(), node); + if(itr != _nodes.end()) { + _nodes.erase(itr); + _nodes.push_back(_cachedNodes.front()); + _cachedNodes.erase(_cachedNodes.begin()); } } - */ } void DHTBucket::moveToHead(const SharedHandle& node) @@ -291,4 +280,9 @@ SharedHandle DHTBucket::getLRUQuestionableNode() const } } +const std::deque >& DHTBucket::getCachedNodes() const +{ + return _cachedNodes; +} + } // namespace aria2 diff --git a/src/DHTBucket.h b/src/DHTBucket.h index 5c10eeb1..adbcee8f 100644 --- a/src/DHTBucket.h +++ b/src/DHTBucket.h @@ -61,7 +61,9 @@ private: // sorted in ascending order std::deque > _nodes; - //std::deque > _cachedNodes; + // a replacement cache. The maximum size is specified by CACHE_SIZE. + // This is sorted by last time seen. + std::deque > _cachedNodes; Time _lastUpdated; @@ -89,6 +91,8 @@ public: bool addNode(const SharedHandle& node); + void cacheNode(const SharedHandle& node); + bool splitAllowed() const; size_t getPrefixLength() const @@ -131,6 +135,8 @@ public: bool containsQuestionableNode() const; SharedHandle getLRUQuestionableNode() const; + + const std::deque >& getCachedNodes() const; }; } // namespace aria2 diff --git a/src/DHTRoutingTable.cc b/src/DHTRoutingTable.cc index 82386f4e..c811c805 100644 --- a/src/DHTRoutingTable.cc +++ b/src/DHTRoutingTable.cc @@ -98,9 +98,9 @@ bool DHTRoutingTable::addNode(const SharedHandle& node, bool good) bnode = lbnode; } } else { - if(good && bucket->containsQuestionableNode()) { - _logger->debug("Issuing ReplaceNodeTask: new node=%s", node->toString().c_str()); - _taskQueue->addImmediateTask(_taskFactory->createReplaceNodeTask(bucket, node)); + if(good) { + bucket->cacheNode(node); + _logger->debug("Cached node=%s", node->toString().c_str()); } return false; } diff --git a/test/DHTBucketTest.cc b/test/DHTBucketTest.cc index c6c3b8f8..f97eb17a 100644 --- a/test/DHTBucketTest.cc +++ b/test/DHTBucketTest.cc @@ -17,6 +17,8 @@ class DHTBucketTest:public CppUnit::TestFixture { CPPUNIT_TEST(testMoveToHead); CPPUNIT_TEST(testMoveToTail); CPPUNIT_TEST(testGetGoodNodes); + CPPUNIT_TEST(testCacheNode); + CPPUNIT_TEST(testDropNode); CPPUNIT_TEST_SUITE_END(); public: void setUp() {} @@ -31,6 +33,8 @@ public: void testMoveToHead(); void testMoveToTail(); void testGetGoodNodes(); + void testCacheNode(); + void testDropNode(); }; @@ -337,4 +341,67 @@ void DHTBucketTest::testGetGoodNodes() CPPUNIT_ASSERT_EQUAL((uint16_t)6888, goodNodes[5]->getPort()); } +void DHTBucketTest::testCacheNode() +{ + unsigned char localNodeID[DHT_ID_LENGTH]; + memset(localNodeID, 0, DHT_ID_LENGTH); + SharedHandle localNode = new DHTNode(localNodeID); + DHTBucket bucket(localNode); + + SharedHandle n1 = new DHTNode(); + SharedHandle n2 = new DHTNode(); + SharedHandle n3 = new DHTNode(); + + bucket.cacheNode(n1); + bucket.cacheNode(n2); + CPPUNIT_ASSERT_EQUAL((size_t)2, bucket.getCachedNodes().size()); + CPPUNIT_ASSERT(n2 == bucket.getCachedNodes()[0]); + + bucket.cacheNode(n3); + CPPUNIT_ASSERT_EQUAL((size_t)2, bucket.getCachedNodes().size()); + CPPUNIT_ASSERT(n3 == bucket.getCachedNodes()[0]); + CPPUNIT_ASSERT(n2 == bucket.getCachedNodes()[1]); +} + +void DHTBucketTest::testDropNode() +{ + unsigned char localNodeID[DHT_ID_LENGTH]; + memset(localNodeID, 0, DHT_ID_LENGTH); + SharedHandle localNode = new DHTNode(localNodeID); + DHTBucket bucket(localNode); + + unsigned char id[DHT_ID_LENGTH]; + SharedHandle nodes[] = { 0, 0, 0, 0, 0, 0, 0, 0 }; + for(size_t i = 0; i < DHTBucket::K; ++i) { + createID(id, 0xf0, i); + nodes[i] = new DHTNode(id); + nodes[i]->setPort(6881+i); + CPPUNIT_ASSERT(bucket.addNode(nodes[i])); + } + + SharedHandle cachedNode1 = new DHTNode(); + SharedHandle cachedNode2 = new DHTNode(); + + bucket.dropNode(nodes[3]); + // nothing happens because the replacement cache is empty. + { + std::deque > tnodes = bucket.getNodes(); + CPPUNIT_ASSERT_EQUAL((size_t)8, tnodes.size()); + CPPUNIT_ASSERT(nodes[3] == tnodes[3]); + } + + bucket.cacheNode(cachedNode1); + bucket.cacheNode(cachedNode2); + + bucket.dropNode(nodes[3]); + { + std::deque > tnodes = bucket.getNodes(); + CPPUNIT_ASSERT_EQUAL((size_t)8, tnodes.size()); + CPPUNIT_ASSERT(tnodes.end() == std::find(tnodes.begin(), tnodes.end(), nodes[3])); + CPPUNIT_ASSERT(cachedNode2 == tnodes[7]); + } + CPPUNIT_ASSERT_EQUAL((size_t)1, bucket.getCachedNodes().size()); + CPPUNIT_ASSERT(cachedNode1 == bucket.getCachedNodes()[0]); +} + } // namespace aria2