diff --git a/ChangeLog b/ChangeLog index bd148e89..b0ce9b43 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,3 +1,15 @@ +2010-10-02 Tatsuhiro Tsujikawa + + Execute 5 DHT tasks concurrently in each task queue. + * src/DHTTaskExecutor.cc + * src/DHTTaskExecutor.h + * src/DHTTaskQueueImpl.cc + * src/DHTTaskQueueImpl.h + * src/Makefile.am + * test/DHTTaskExecutorTest.cc + * test/Makefile.am + * test/MockDHTTask.h + 2010-10-02 Tatsuhiro Tsujikawa Fixed the bug that FtpFinishDownloadCommand does not handle diff --git a/src/DHTTaskExecutor.cc b/src/DHTTaskExecutor.cc new file mode 100644 index 00000000..7227f17f --- /dev/null +++ b/src/DHTTaskExecutor.cc @@ -0,0 +1,74 @@ +/* */ +#include "DHTTaskExecutor.h" + +#include + +#include "DHTTask.h" +#include "Logger.h" +#include "LogFactory.h" +#include "a2functional.h" + +namespace aria2 { + +DHTTaskExecutor::DHTTaskExecutor(size_t numConcurrent): + numConcurrent_(numConcurrent), + logger_(LogFactory::getInstance()) {} + +DHTTaskExecutor::~DHTTaskExecutor() {} + +void DHTTaskExecutor::update() +{ + execTasks_.erase(std::remove_if(execTasks_.begin(), execTasks_.end(), + mem_fun_sh(&DHTTask::finished)), + execTasks_.end()); + size_t r = numConcurrent_-execTasks_.size(); + while(r && !queue_.empty()) { + SharedHandle task = queue_.front(); + queue_.pop_front(); + task->startup(); + if(!task->finished()) { + execTasks_.push_back(task); + --r; + } + } + if(logger_->debug()) { + logger_->debug("Executing %u Task(s). Queue has %u task(s).", + static_cast(getExecutingTaskSize()), + static_cast(getQueueSize())); + } +} + +} // namespace aria2 diff --git a/src/DHTTaskExecutor.h b/src/DHTTaskExecutor.h new file mode 100644 index 00000000..436c8dff --- /dev/null +++ b/src/DHTTaskExecutor.h @@ -0,0 +1,86 @@ +/* */ +#ifndef D_DHT_TASK_EXECUTOR_H +#define D_DHT_TASK_EXECUTOR_H + +#include "common.h" + +#include +#include + +#include "SharedHandle.h" + +namespace aria2 { + +class DHTTask; +class Logger; + +class DHTTaskExecutor { +private: + size_t numConcurrent_; + std::vector > execTasks_; + std::deque > queue_; + Logger* logger_; +public: + DHTTaskExecutor(size_t numConcurrent); + + ~DHTTaskExecutor(); + + void update(); + + void addTask(const SharedHandle& task) + { + queue_.push_back(task); + } + + size_t getExecutingTaskSize() const + { + return execTasks_.size(); + } + + size_t getNumConcurrent() const + { + return numConcurrent_; + } + + size_t getQueueSize() const + { + return queue_.size(); + } +}; + +} // namespace aria2 + +#endif // D_DHT_TASK_EXECUTOR_H diff --git a/src/DHTTaskQueueImpl.cc b/src/DHTTaskQueueImpl.cc index 65cff073..2c46f514 100644 --- a/src/DHTTaskQueueImpl.cc +++ b/src/DHTTaskQueueImpl.cc @@ -34,51 +34,52 @@ /* copyright --> */ #include "DHTTaskQueueImpl.h" #include "DHTTask.h" +#include "Logger.h" +#include "LogFactory.h" namespace aria2 { -DHTTaskQueueImpl::DHTTaskQueueImpl() {} +namespace { +const size_t NUM_CONCURRENT_TASK = 5; +} + +DHTTaskQueueImpl::DHTTaskQueueImpl(): + periodicTaskQueue1_(NUM_CONCURRENT_TASK), + periodicTaskQueue2_(NUM_CONCURRENT_TASK), + immediateTaskQueue_(NUM_CONCURRENT_TASK), + logger_(LogFactory::getInstance()) {} DHTTaskQueueImpl::~DHTTaskQueueImpl() {} -void DHTTaskQueueImpl::executeTask(SharedHandle& task, - std::deque >& taskQueue) -{ - while(1) { - if(task.isNull() || task->finished()) { - task.reset(); - if(taskQueue.empty()) { - break; - } - task = taskQueue.front(); - taskQueue.erase(taskQueue.begin()); - task->startup(); - } else { - break; - } - } -} - void DHTTaskQueueImpl::executeTask() { - executeTask(periodicTask1_, periodicTaskQueue1_); - executeTask(periodicTask2_, periodicTaskQueue2_); - executeTask(immediateTask_, immediateTaskQueue_); + if(logger_->debug()) { + logger_->debug("Updating periodicTaskQueue1"); + } + periodicTaskQueue1_.update(); + if(logger_->debug()) { + logger_->debug("Updating periodicTaskQueue2"); + } + periodicTaskQueue2_.update(); + if(logger_->debug()) { + logger_->debug("Updating immediateTaskQueue"); + } + immediateTaskQueue_.update(); } void DHTTaskQueueImpl::addPeriodicTask1(const SharedHandle& task) { - periodicTaskQueue1_.push_back(task); + periodicTaskQueue1_.addTask(task); } void DHTTaskQueueImpl::addPeriodicTask2(const SharedHandle& task) { - periodicTaskQueue2_.push_back(task); + periodicTaskQueue2_.addTask(task); } void DHTTaskQueueImpl::addImmediateTask(const SharedHandle& task) { - immediateTaskQueue_.push_back(task); + immediateTaskQueue_.addTask(task); } } // namespace aria2 diff --git a/src/DHTTaskQueueImpl.h b/src/DHTTaskQueueImpl.h index 53edd078..d0ec0e95 100644 --- a/src/DHTTaskQueueImpl.h +++ b/src/DHTTaskQueueImpl.h @@ -36,26 +36,21 @@ #define _D_DHT_TASK_QUEUE_IMPL_H_ #include "DHTTaskQueue.h" -#include +#include "DHTTaskExecutor.h" namespace aria2 { +class Logger; + class DHTTaskQueueImpl:public DHTTaskQueue { private: + DHTTaskExecutor periodicTaskQueue1_; - SharedHandle periodicTask1_; + DHTTaskExecutor periodicTaskQueue2_; - SharedHandle periodicTask2_; + DHTTaskExecutor immediateTaskQueue_; - SharedHandle immediateTask_; - - std::deque > periodicTaskQueue1_; - - std::deque > periodicTaskQueue2_; - - std::deque > immediateTaskQueue_; - - void executeTask(SharedHandle& task, std::deque >& taskQueue); + Logger* logger_; public: DHTTaskQueueImpl(); diff --git a/src/Makefile.am b/src/Makefile.am index 553b19e4..eb26512a 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -406,6 +406,7 @@ SRCS += PeerAbstractCommand.cc PeerAbstractCommand.h\ DHTPingTask.cc DHTPingTask.h\ DHTTaskQueue.h\ DHTTaskQueueImpl.cc DHTTaskQueueImpl.h\ + DHTTaskExecutor.cc DHTTaskExecutor.h\ DHTBucketRefreshTask.cc DHTBucketRefreshTask.h\ DHTAbstractNodeLookupTask.h\ DHTPeerLookupTask.cc DHTPeerLookupTask.h\ diff --git a/src/Makefile.in b/src/Makefile.in index 74731720..65151a70 100644 --- a/src/Makefile.in +++ b/src/Makefile.in @@ -196,6 +196,7 @@ bin_PROGRAMS = aria2c$(EXEEXT) @ENABLE_BITTORRENT_TRUE@ DHTPingTask.cc DHTPingTask.h\ @ENABLE_BITTORRENT_TRUE@ DHTTaskQueue.h\ @ENABLE_BITTORRENT_TRUE@ DHTTaskQueueImpl.cc DHTTaskQueueImpl.h\ +@ENABLE_BITTORRENT_TRUE@ DHTTaskExecutor.cc DHTTaskExecutor.h\ @ENABLE_BITTORRENT_TRUE@ DHTBucketRefreshTask.cc DHTBucketRefreshTask.h\ @ENABLE_BITTORRENT_TRUE@ DHTAbstractNodeLookupTask.h\ @ENABLE_BITTORRENT_TRUE@ DHTPeerLookupTask.cc DHTPeerLookupTask.h\ @@ -565,19 +566,20 @@ am__libaria2c_a_SOURCES_DIST = Socket.h SocketCore.cc SocketCore.h \ DHTPeerLookupTaskCallback.h DHTAbstractTask.cc \ DHTAbstractTask.h DHTTask.h DHTPingTask.cc DHTPingTask.h \ DHTTaskQueue.h DHTTaskQueueImpl.cc DHTTaskQueueImpl.h \ - DHTBucketRefreshTask.cc DHTBucketRefreshTask.h \ - DHTAbstractNodeLookupTask.h DHTPeerLookupTask.cc \ - DHTPeerLookupTask.h DHTSetup.cc DHTSetup.h DHTTaskFactory.h \ - DHTTaskFactoryImpl.cc DHTTaskFactoryImpl.h \ - DHTInteractionCommand.cc DHTInteractionCommand.h \ - DHTPeerAnnounceEntry.cc DHTPeerAnnounceEntry.h \ - DHTPeerAnnounceStorage.cc DHTPeerAnnounceStorage.h \ - DHTTokenTracker.cc DHTTokenTracker.h DHTGetPeersCommand.cc \ - DHTGetPeersCommand.h DHTTokenUpdateCommand.cc \ - DHTTokenUpdateCommand.h DHTBucketRefreshCommand.cc \ - DHTBucketRefreshCommand.h DHTPeerAnnounceCommand.cc \ - DHTPeerAnnounceCommand.h DHTReplaceNodeTask.cc \ - DHTReplaceNodeTask.h DHTEntryPointNameResolveCommand.cc \ + DHTTaskExecutor.cc DHTTaskExecutor.h DHTBucketRefreshTask.cc \ + DHTBucketRefreshTask.h DHTAbstractNodeLookupTask.h \ + DHTPeerLookupTask.cc DHTPeerLookupTask.h DHTSetup.cc \ + DHTSetup.h DHTTaskFactory.h DHTTaskFactoryImpl.cc \ + DHTTaskFactoryImpl.h DHTInteractionCommand.cc \ + DHTInteractionCommand.h DHTPeerAnnounceEntry.cc \ + DHTPeerAnnounceEntry.h DHTPeerAnnounceStorage.cc \ + DHTPeerAnnounceStorage.h DHTTokenTracker.cc DHTTokenTracker.h \ + DHTGetPeersCommand.cc DHTGetPeersCommand.h \ + DHTTokenUpdateCommand.cc DHTTokenUpdateCommand.h \ + DHTBucketRefreshCommand.cc DHTBucketRefreshCommand.h \ + DHTPeerAnnounceCommand.cc DHTPeerAnnounceCommand.h \ + DHTReplaceNodeTask.cc DHTReplaceNodeTask.h \ + DHTEntryPointNameResolveCommand.cc \ DHTEntryPointNameResolveCommand.h DHTRoutingTableSerializer.cc \ DHTRoutingTableSerializer.h DHTRoutingTableDeserializer.cc \ DHTRoutingTableDeserializer.h DHTAutoSaveCommand.cc \ @@ -746,6 +748,7 @@ am__objects_6 = @ENABLE_BITTORRENT_TRUE@ DHTAbstractTask.$(OBJEXT) \ @ENABLE_BITTORRENT_TRUE@ DHTPingTask.$(OBJEXT) \ @ENABLE_BITTORRENT_TRUE@ DHTTaskQueueImpl.$(OBJEXT) \ +@ENABLE_BITTORRENT_TRUE@ DHTTaskExecutor.$(OBJEXT) \ @ENABLE_BITTORRENT_TRUE@ DHTBucketRefreshTask.$(OBJEXT) \ @ENABLE_BITTORRENT_TRUE@ DHTPeerLookupTask.$(OBJEXT) \ @ENABLE_BITTORRENT_TRUE@ DHTSetup.$(OBJEXT) \ @@ -1460,6 +1463,7 @@ distclean-compile: @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/DHTRoutingTableDeserializer.Po@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/DHTRoutingTableSerializer.Po@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/DHTSetup.Po@am__quote@ +@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/DHTTaskExecutor.Po@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/DHTTaskFactoryImpl.Po@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/DHTTaskQueueImpl.Po@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/DHTTokenTracker.Po@am__quote@ diff --git a/test/DHTTaskExecutorTest.cc b/test/DHTTaskExecutorTest.cc new file mode 100644 index 00000000..f66695bd --- /dev/null +++ b/test/DHTTaskExecutorTest.cc @@ -0,0 +1,48 @@ +#include "DHTTaskExecutor.h" + +#include + +#include "MockDHTTask.h" +#include "array_fun.h" + +namespace aria2 { + +class DHTTaskExecutorTest:public CppUnit::TestFixture { + + CPPUNIT_TEST_SUITE(DHTTaskExecutorTest); + CPPUNIT_TEST(testUpdate); + CPPUNIT_TEST_SUITE_END(); +public: + void testUpdate(); +}; + +CPPUNIT_TEST_SUITE_REGISTRATION(DHTTaskExecutorTest); + +void DHTTaskExecutorTest::testUpdate() +{ + SharedHandle rn; + DHTTaskExecutor tex(2); + SharedHandle tasks[] = { + SharedHandle(new MockDHTTask(rn)), + SharedHandle(new MockDHTTask(rn)), + SharedHandle(new MockDHTTask(rn)), + SharedHandle(new MockDHTTask(rn)) + }; + tasks[1]->finished_ = true; + for(size_t i = 0; i < A2_ARRAY_LEN(tasks); ++i) { + tex.addTask(tasks[i]); + } + CPPUNIT_ASSERT_EQUAL((size_t)0, tex.getExecutingTaskSize()); + CPPUNIT_ASSERT_EQUAL((size_t)4, tex.getQueueSize()); + tex.update(); + CPPUNIT_ASSERT_EQUAL((size_t)2, tex.getExecutingTaskSize()); + CPPUNIT_ASSERT_EQUAL((size_t)1, tex.getQueueSize()); + tasks[0]->finished_ = true; + tasks[2]->finished_ = true; + tasks[3]->finished_ = true; + tex.update(); + CPPUNIT_ASSERT_EQUAL((size_t)0, tex.getExecutingTaskSize()); + CPPUNIT_ASSERT_EQUAL((size_t)0, tex.getQueueSize()); +} + +} // namespace aria2 diff --git a/test/Makefile.am b/test/Makefile.am index 6c68f0ab..c46cb242 100644 --- a/test/Makefile.am +++ b/test/Makefile.am @@ -175,6 +175,7 @@ aria2c_SOURCES += BtAllowedFastMessageTest.cc\ DHTIDCloserTest.cc\ DHTRoutingTableSerializerTest.cc\ DHTRoutingTableDeserializerTest.cc\ + DHTTaskExecutorTest.cc\ DHKeyExchangeTest.cc\ ARC4Test.cc\ MSEHandshakeTest.cc\ diff --git a/test/Makefile.in b/test/Makefile.in index b1f9638d..bf976038 100644 --- a/test/Makefile.in +++ b/test/Makefile.in @@ -121,6 +121,7 @@ check_PROGRAMS = $(am__EXEEXT_1) @ENABLE_BITTORRENT_TRUE@ DHTIDCloserTest.cc\ @ENABLE_BITTORRENT_TRUE@ DHTRoutingTableSerializerTest.cc\ @ENABLE_BITTORRENT_TRUE@ DHTRoutingTableDeserializerTest.cc\ +@ENABLE_BITTORRENT_TRUE@ DHTTaskExecutorTest.cc\ @ENABLE_BITTORRENT_TRUE@ DHKeyExchangeTest.cc\ @ENABLE_BITTORRENT_TRUE@ ARC4Test.cc\ @ENABLE_BITTORRENT_TRUE@ MSEHandshakeTest.cc\ @@ -260,18 +261,19 @@ am__aria2c_SOURCES_DIST = AllTest.cc TestUtil.cc TestUtil.h \ DHTPeerAnnounceEntryTest.cc DHTPeerAnnounceStorageTest.cc \ DHTTokenTrackerTest.cc XORCloserTest.cc DHTIDCloserTest.cc \ DHTRoutingTableSerializerTest.cc \ - DHTRoutingTableDeserializerTest.cc DHKeyExchangeTest.cc \ - ARC4Test.cc MSEHandshakeTest.cc MockBtAnnounce.h \ - MockBtProgressInfoFile.h MockBtRequestFactory.h \ - MockDHTMessage.h MockDHTMessageCallback.h \ - MockDHTMessageDispatcher.h MockDHTMessageFactory.h \ - MockDHTTask.h MockDHTTaskFactory.h MockDHTTaskQueue.h \ - MockExtensionMessage.h MockExtensionMessageFactory.h \ - MockPieceStorage.h BittorrentHelperTest.cc \ - PriorityPieceSelectorTest.cc MockPieceSelector.h \ - extension_message_test_helper.h LpdMessageDispatcherTest.cc \ - LpdMessageReceiverTest.cc Bencode2Test.cc MetalinkerTest.cc \ - MetalinkEntryTest.cc Metalink2RequestGroupTest.cc \ + DHTRoutingTableDeserializerTest.cc DHTTaskExecutorTest.cc \ + DHKeyExchangeTest.cc ARC4Test.cc MSEHandshakeTest.cc \ + MockBtAnnounce.h MockBtProgressInfoFile.h \ + MockBtRequestFactory.h MockDHTMessage.h \ + MockDHTMessageCallback.h MockDHTMessageDispatcher.h \ + MockDHTMessageFactory.h MockDHTTask.h MockDHTTaskFactory.h \ + MockDHTTaskQueue.h MockExtensionMessage.h \ + MockExtensionMessageFactory.h MockPieceStorage.h \ + BittorrentHelperTest.cc PriorityPieceSelectorTest.cc \ + MockPieceSelector.h extension_message_test_helper.h \ + LpdMessageDispatcherTest.cc LpdMessageReceiverTest.cc \ + Bencode2Test.cc MetalinkerTest.cc MetalinkEntryTest.cc \ + Metalink2RequestGroupTest.cc \ MetalinkPostDownloadHandlerTest.cc MetalinkHelperTest.cc \ MetalinkParserControllerTest.cc MetalinkProcessorTest.cc @ENABLE_XML_RPC_TRUE@am__objects_1 = XmlRpcRequestParserControllerTest.$(OBJEXT) \ @@ -354,6 +356,7 @@ am__aria2c_SOURCES_DIST = AllTest.cc TestUtil.cc TestUtil.h \ @ENABLE_BITTORRENT_TRUE@ DHTIDCloserTest.$(OBJEXT) \ @ENABLE_BITTORRENT_TRUE@ DHTRoutingTableSerializerTest.$(OBJEXT) \ @ENABLE_BITTORRENT_TRUE@ DHTRoutingTableDeserializerTest.$(OBJEXT) \ +@ENABLE_BITTORRENT_TRUE@ DHTTaskExecutorTest.$(OBJEXT) \ @ENABLE_BITTORRENT_TRUE@ DHKeyExchangeTest.$(OBJEXT) \ @ENABLE_BITTORRENT_TRUE@ ARC4Test.$(OBJEXT) \ @ENABLE_BITTORRENT_TRUE@ MSEHandshakeTest.$(OBJEXT) \ @@ -804,6 +807,7 @@ distclean-compile: @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/DHTRoutingTableDeserializerTest.Po@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/DHTRoutingTableSerializerTest.Po@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/DHTRoutingTableTest.Po@am__quote@ +@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/DHTTaskExecutorTest.Po@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/DHTTokenTrackerTest.Po@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/DHTUnknownMessageTest.Po@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/DNSCacheTest.Po@am__quote@ diff --git a/test/MockDHTTask.h b/test/MockDHTTask.h index cba8047b..9dced596 100644 --- a/test/MockDHTTask.h +++ b/test/MockDHTTask.h @@ -2,9 +2,11 @@ #define _D_MOCK_DHT_TASK_H_ #include "DHTTask.h" + +#include + #include "DHTNode.h" #include "DHTConstants.h" -#include namespace aria2 { @@ -14,7 +16,11 @@ public: unsigned char targetID_[DHT_ID_LENGTH]; - MockDHTTask(const SharedHandle& remoteNode):remoteNode_(remoteNode) {} + bool finished_; + + MockDHTTask(const SharedHandle& remoteNode): + remoteNode_(remoteNode), + finished_(false) {} virtual ~MockDHTTask() {} @@ -22,7 +28,7 @@ public: virtual bool finished() { - return false; + return finished_; } void setTargetID(const unsigned char* targetID)