2010-10-02 Tatsuhiro Tsujikawa <t-tujikawa@users.sourceforge.net>

Execute 5 DHT tasks concurrently in each task queue.
	* src/DHTTaskExecutor.cc
	* src/DHTTaskExecutor.h
	* src/DHTTaskQueueImpl.cc
	* src/DHTTaskQueueImpl.h
	* src/Makefile.am
	* test/DHTTaskExecutorTest.cc
	* test/Makefile.am
	* test/MockDHTTask.h
pull/1/head
Tatsuhiro Tsujikawa 2010-10-02 14:38:37 +00:00
parent 7375a778c4
commit 584af68e19
11 changed files with 297 additions and 65 deletions

View File

@ -1,3 +1,15 @@
2010-10-02 Tatsuhiro Tsujikawa <t-tujikawa@users.sourceforge.net>
Execute 5 DHT tasks concurrently in each task queue.
* src/DHTTaskExecutor.cc
* src/DHTTaskExecutor.h
* src/DHTTaskQueueImpl.cc
* src/DHTTaskQueueImpl.h
* src/Makefile.am
* test/DHTTaskExecutorTest.cc
* test/Makefile.am
* test/MockDHTTask.h
2010-10-02 Tatsuhiro Tsujikawa <t-tujikawa@users.sourceforge.net>
Fixed the bug that FtpFinishDownloadCommand does not handle

74
src/DHTTaskExecutor.cc Normal file
View File

@ -0,0 +1,74 @@
/* <!-- copyright */
/*
* aria2 - The high speed download utility
*
* Copyright (C) 2010 Tatsuhiro Tsujikawa
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 2 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
*
* In addition, as a special exception, the copyright holders give
* permission to link the code of portions of this program with the
* OpenSSL library under certain conditions as described in each
* individual source file, and distribute linked combinations
* including the two.
* You must obey the GNU General Public License in all respects
* for all of the code used other than OpenSSL. If you modify
* file(s) with this exception, you may extend this exception to your
* version of the file(s), but you are not obligated to do so. If you
* do not wish to do so, delete this exception statement from your
* version. If you delete this exception statement from all source
* files in the program, then also delete it here.
*/
/* copyright --> */
#include "DHTTaskExecutor.h"
#include <algorithm>
#include "DHTTask.h"
#include "Logger.h"
#include "LogFactory.h"
#include "a2functional.h"
namespace aria2 {
DHTTaskExecutor::DHTTaskExecutor(size_t numConcurrent):
numConcurrent_(numConcurrent),
logger_(LogFactory::getInstance()) {}
DHTTaskExecutor::~DHTTaskExecutor() {}
void DHTTaskExecutor::update()
{
execTasks_.erase(std::remove_if(execTasks_.begin(), execTasks_.end(),
mem_fun_sh(&DHTTask::finished)),
execTasks_.end());
size_t r = numConcurrent_-execTasks_.size();
while(r && !queue_.empty()) {
SharedHandle<DHTTask> task = queue_.front();
queue_.pop_front();
task->startup();
if(!task->finished()) {
execTasks_.push_back(task);
--r;
}
}
if(logger_->debug()) {
logger_->debug("Executing %u Task(s). Queue has %u task(s).",
static_cast<unsigned int>(getExecutingTaskSize()),
static_cast<unsigned int>(getQueueSize()));
}
}
} // namespace aria2

86
src/DHTTaskExecutor.h Normal file
View File

@ -0,0 +1,86 @@
/* <!-- copyright */
/*
* aria2 - The high speed download utility
*
* Copyright (C) 2010 Tatsuhiro Tsujikawa
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 2 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
*
* In addition, as a special exception, the copyright holders give
* permission to link the code of portions of this program with the
* OpenSSL library under certain conditions as described in each
* individual source file, and distribute linked combinations
* including the two.
* You must obey the GNU General Public License in all respects
* for all of the code used other than OpenSSL. If you modify
* file(s) with this exception, you may extend this exception to your
* version of the file(s), but you are not obligated to do so. If you
* do not wish to do so, delete this exception statement from your
* version. If you delete this exception statement from all source
* files in the program, then also delete it here.
*/
/* copyright --> */
#ifndef D_DHT_TASK_EXECUTOR_H
#define D_DHT_TASK_EXECUTOR_H
#include "common.h"
#include <vector>
#include <deque>
#include "SharedHandle.h"
namespace aria2 {
class DHTTask;
class Logger;
class DHTTaskExecutor {
private:
size_t numConcurrent_;
std::vector<SharedHandle<DHTTask> > execTasks_;
std::deque<SharedHandle<DHTTask> > queue_;
Logger* logger_;
public:
DHTTaskExecutor(size_t numConcurrent);
~DHTTaskExecutor();
void update();
void addTask(const SharedHandle<DHTTask>& task)
{
queue_.push_back(task);
}
size_t getExecutingTaskSize() const
{
return execTasks_.size();
}
size_t getNumConcurrent() const
{
return numConcurrent_;
}
size_t getQueueSize() const
{
return queue_.size();
}
};
} // namespace aria2
#endif // D_DHT_TASK_EXECUTOR_H

View File

@ -34,51 +34,52 @@
/* copyright --> */
#include "DHTTaskQueueImpl.h"
#include "DHTTask.h"
#include "Logger.h"
#include "LogFactory.h"
namespace aria2 {
DHTTaskQueueImpl::DHTTaskQueueImpl() {}
namespace {
const size_t NUM_CONCURRENT_TASK = 5;
}
DHTTaskQueueImpl::DHTTaskQueueImpl():
periodicTaskQueue1_(NUM_CONCURRENT_TASK),
periodicTaskQueue2_(NUM_CONCURRENT_TASK),
immediateTaskQueue_(NUM_CONCURRENT_TASK),
logger_(LogFactory::getInstance()) {}
DHTTaskQueueImpl::~DHTTaskQueueImpl() {}
void DHTTaskQueueImpl::executeTask(SharedHandle<DHTTask>& task,
std::deque<SharedHandle<DHTTask> >& taskQueue)
{
while(1) {
if(task.isNull() || task->finished()) {
task.reset();
if(taskQueue.empty()) {
break;
}
task = taskQueue.front();
taskQueue.erase(taskQueue.begin());
task->startup();
} else {
break;
}
}
}
void DHTTaskQueueImpl::executeTask()
{
executeTask(periodicTask1_, periodicTaskQueue1_);
executeTask(periodicTask2_, periodicTaskQueue2_);
executeTask(immediateTask_, immediateTaskQueue_);
if(logger_->debug()) {
logger_->debug("Updating periodicTaskQueue1");
}
periodicTaskQueue1_.update();
if(logger_->debug()) {
logger_->debug("Updating periodicTaskQueue2");
}
periodicTaskQueue2_.update();
if(logger_->debug()) {
logger_->debug("Updating immediateTaskQueue");
}
immediateTaskQueue_.update();
}
void DHTTaskQueueImpl::addPeriodicTask1(const SharedHandle<DHTTask>& task)
{
periodicTaskQueue1_.push_back(task);
periodicTaskQueue1_.addTask(task);
}
void DHTTaskQueueImpl::addPeriodicTask2(const SharedHandle<DHTTask>& task)
{
periodicTaskQueue2_.push_back(task);
periodicTaskQueue2_.addTask(task);
}
void DHTTaskQueueImpl::addImmediateTask(const SharedHandle<DHTTask>& task)
{
immediateTaskQueue_.push_back(task);
immediateTaskQueue_.addTask(task);
}
} // namespace aria2

View File

@ -36,26 +36,21 @@
#define _D_DHT_TASK_QUEUE_IMPL_H_
#include "DHTTaskQueue.h"
#include <deque>
#include "DHTTaskExecutor.h"
namespace aria2 {
class Logger;
class DHTTaskQueueImpl:public DHTTaskQueue {
private:
DHTTaskExecutor periodicTaskQueue1_;
SharedHandle<DHTTask> periodicTask1_;
DHTTaskExecutor periodicTaskQueue2_;
SharedHandle<DHTTask> periodicTask2_;
DHTTaskExecutor immediateTaskQueue_;
SharedHandle<DHTTask> immediateTask_;
std::deque<SharedHandle<DHTTask> > periodicTaskQueue1_;
std::deque<SharedHandle<DHTTask> > periodicTaskQueue2_;
std::deque<SharedHandle<DHTTask> > immediateTaskQueue_;
void executeTask(SharedHandle<DHTTask>& task, std::deque<SharedHandle<DHTTask> >& taskQueue);
Logger* logger_;
public:
DHTTaskQueueImpl();

View File

@ -406,6 +406,7 @@ SRCS += PeerAbstractCommand.cc PeerAbstractCommand.h\
DHTPingTask.cc DHTPingTask.h\
DHTTaskQueue.h\
DHTTaskQueueImpl.cc DHTTaskQueueImpl.h\
DHTTaskExecutor.cc DHTTaskExecutor.h\
DHTBucketRefreshTask.cc DHTBucketRefreshTask.h\
DHTAbstractNodeLookupTask.h\
DHTPeerLookupTask.cc DHTPeerLookupTask.h\

View File

@ -196,6 +196,7 @@ bin_PROGRAMS = aria2c$(EXEEXT)
@ENABLE_BITTORRENT_TRUE@ DHTPingTask.cc DHTPingTask.h\
@ENABLE_BITTORRENT_TRUE@ DHTTaskQueue.h\
@ENABLE_BITTORRENT_TRUE@ DHTTaskQueueImpl.cc DHTTaskQueueImpl.h\
@ENABLE_BITTORRENT_TRUE@ DHTTaskExecutor.cc DHTTaskExecutor.h\
@ENABLE_BITTORRENT_TRUE@ DHTBucketRefreshTask.cc DHTBucketRefreshTask.h\
@ENABLE_BITTORRENT_TRUE@ DHTAbstractNodeLookupTask.h\
@ENABLE_BITTORRENT_TRUE@ DHTPeerLookupTask.cc DHTPeerLookupTask.h\
@ -565,19 +566,20 @@ am__libaria2c_a_SOURCES_DIST = Socket.h SocketCore.cc SocketCore.h \
DHTPeerLookupTaskCallback.h DHTAbstractTask.cc \
DHTAbstractTask.h DHTTask.h DHTPingTask.cc DHTPingTask.h \
DHTTaskQueue.h DHTTaskQueueImpl.cc DHTTaskQueueImpl.h \
DHTBucketRefreshTask.cc DHTBucketRefreshTask.h \
DHTAbstractNodeLookupTask.h DHTPeerLookupTask.cc \
DHTPeerLookupTask.h DHTSetup.cc DHTSetup.h DHTTaskFactory.h \
DHTTaskFactoryImpl.cc DHTTaskFactoryImpl.h \
DHTInteractionCommand.cc DHTInteractionCommand.h \
DHTPeerAnnounceEntry.cc DHTPeerAnnounceEntry.h \
DHTPeerAnnounceStorage.cc DHTPeerAnnounceStorage.h \
DHTTokenTracker.cc DHTTokenTracker.h DHTGetPeersCommand.cc \
DHTGetPeersCommand.h DHTTokenUpdateCommand.cc \
DHTTokenUpdateCommand.h DHTBucketRefreshCommand.cc \
DHTBucketRefreshCommand.h DHTPeerAnnounceCommand.cc \
DHTPeerAnnounceCommand.h DHTReplaceNodeTask.cc \
DHTReplaceNodeTask.h DHTEntryPointNameResolveCommand.cc \
DHTTaskExecutor.cc DHTTaskExecutor.h DHTBucketRefreshTask.cc \
DHTBucketRefreshTask.h DHTAbstractNodeLookupTask.h \
DHTPeerLookupTask.cc DHTPeerLookupTask.h DHTSetup.cc \
DHTSetup.h DHTTaskFactory.h DHTTaskFactoryImpl.cc \
DHTTaskFactoryImpl.h DHTInteractionCommand.cc \
DHTInteractionCommand.h DHTPeerAnnounceEntry.cc \
DHTPeerAnnounceEntry.h DHTPeerAnnounceStorage.cc \
DHTPeerAnnounceStorage.h DHTTokenTracker.cc DHTTokenTracker.h \
DHTGetPeersCommand.cc DHTGetPeersCommand.h \
DHTTokenUpdateCommand.cc DHTTokenUpdateCommand.h \
DHTBucketRefreshCommand.cc DHTBucketRefreshCommand.h \
DHTPeerAnnounceCommand.cc DHTPeerAnnounceCommand.h \
DHTReplaceNodeTask.cc DHTReplaceNodeTask.h \
DHTEntryPointNameResolveCommand.cc \
DHTEntryPointNameResolveCommand.h DHTRoutingTableSerializer.cc \
DHTRoutingTableSerializer.h DHTRoutingTableDeserializer.cc \
DHTRoutingTableDeserializer.h DHTAutoSaveCommand.cc \
@ -746,6 +748,7 @@ am__objects_6 =
@ENABLE_BITTORRENT_TRUE@ DHTAbstractTask.$(OBJEXT) \
@ENABLE_BITTORRENT_TRUE@ DHTPingTask.$(OBJEXT) \
@ENABLE_BITTORRENT_TRUE@ DHTTaskQueueImpl.$(OBJEXT) \
@ENABLE_BITTORRENT_TRUE@ DHTTaskExecutor.$(OBJEXT) \
@ENABLE_BITTORRENT_TRUE@ DHTBucketRefreshTask.$(OBJEXT) \
@ENABLE_BITTORRENT_TRUE@ DHTPeerLookupTask.$(OBJEXT) \
@ENABLE_BITTORRENT_TRUE@ DHTSetup.$(OBJEXT) \
@ -1460,6 +1463,7 @@ distclean-compile:
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/DHTRoutingTableDeserializer.Po@am__quote@
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/DHTRoutingTableSerializer.Po@am__quote@
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/DHTSetup.Po@am__quote@
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/DHTTaskExecutor.Po@am__quote@
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/DHTTaskFactoryImpl.Po@am__quote@
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/DHTTaskQueueImpl.Po@am__quote@
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/DHTTokenTracker.Po@am__quote@

View File

@ -0,0 +1,48 @@
#include "DHTTaskExecutor.h"
#include <cppunit/extensions/HelperMacros.h>
#include "MockDHTTask.h"
#include "array_fun.h"
namespace aria2 {
class DHTTaskExecutorTest:public CppUnit::TestFixture {
CPPUNIT_TEST_SUITE(DHTTaskExecutorTest);
CPPUNIT_TEST(testUpdate);
CPPUNIT_TEST_SUITE_END();
public:
void testUpdate();
};
CPPUNIT_TEST_SUITE_REGISTRATION(DHTTaskExecutorTest);
void DHTTaskExecutorTest::testUpdate()
{
SharedHandle<DHTNode> rn;
DHTTaskExecutor tex(2);
SharedHandle<MockDHTTask> tasks[] = {
SharedHandle<MockDHTTask>(new MockDHTTask(rn)),
SharedHandle<MockDHTTask>(new MockDHTTask(rn)),
SharedHandle<MockDHTTask>(new MockDHTTask(rn)),
SharedHandle<MockDHTTask>(new MockDHTTask(rn))
};
tasks[1]->finished_ = true;
for(size_t i = 0; i < A2_ARRAY_LEN(tasks); ++i) {
tex.addTask(tasks[i]);
}
CPPUNIT_ASSERT_EQUAL((size_t)0, tex.getExecutingTaskSize());
CPPUNIT_ASSERT_EQUAL((size_t)4, tex.getQueueSize());
tex.update();
CPPUNIT_ASSERT_EQUAL((size_t)2, tex.getExecutingTaskSize());
CPPUNIT_ASSERT_EQUAL((size_t)1, tex.getQueueSize());
tasks[0]->finished_ = true;
tasks[2]->finished_ = true;
tasks[3]->finished_ = true;
tex.update();
CPPUNIT_ASSERT_EQUAL((size_t)0, tex.getExecutingTaskSize());
CPPUNIT_ASSERT_EQUAL((size_t)0, tex.getQueueSize());
}
} // namespace aria2

View File

@ -175,6 +175,7 @@ aria2c_SOURCES += BtAllowedFastMessageTest.cc\
DHTIDCloserTest.cc\
DHTRoutingTableSerializerTest.cc\
DHTRoutingTableDeserializerTest.cc\
DHTTaskExecutorTest.cc\
DHKeyExchangeTest.cc\
ARC4Test.cc\
MSEHandshakeTest.cc\

View File

@ -121,6 +121,7 @@ check_PROGRAMS = $(am__EXEEXT_1)
@ENABLE_BITTORRENT_TRUE@ DHTIDCloserTest.cc\
@ENABLE_BITTORRENT_TRUE@ DHTRoutingTableSerializerTest.cc\
@ENABLE_BITTORRENT_TRUE@ DHTRoutingTableDeserializerTest.cc\
@ENABLE_BITTORRENT_TRUE@ DHTTaskExecutorTest.cc\
@ENABLE_BITTORRENT_TRUE@ DHKeyExchangeTest.cc\
@ENABLE_BITTORRENT_TRUE@ ARC4Test.cc\
@ENABLE_BITTORRENT_TRUE@ MSEHandshakeTest.cc\
@ -260,18 +261,19 @@ am__aria2c_SOURCES_DIST = AllTest.cc TestUtil.cc TestUtil.h \
DHTPeerAnnounceEntryTest.cc DHTPeerAnnounceStorageTest.cc \
DHTTokenTrackerTest.cc XORCloserTest.cc DHTIDCloserTest.cc \
DHTRoutingTableSerializerTest.cc \
DHTRoutingTableDeserializerTest.cc DHKeyExchangeTest.cc \
ARC4Test.cc MSEHandshakeTest.cc MockBtAnnounce.h \
MockBtProgressInfoFile.h MockBtRequestFactory.h \
MockDHTMessage.h MockDHTMessageCallback.h \
MockDHTMessageDispatcher.h MockDHTMessageFactory.h \
MockDHTTask.h MockDHTTaskFactory.h MockDHTTaskQueue.h \
MockExtensionMessage.h MockExtensionMessageFactory.h \
MockPieceStorage.h BittorrentHelperTest.cc \
PriorityPieceSelectorTest.cc MockPieceSelector.h \
extension_message_test_helper.h LpdMessageDispatcherTest.cc \
LpdMessageReceiverTest.cc Bencode2Test.cc MetalinkerTest.cc \
MetalinkEntryTest.cc Metalink2RequestGroupTest.cc \
DHTRoutingTableDeserializerTest.cc DHTTaskExecutorTest.cc \
DHKeyExchangeTest.cc ARC4Test.cc MSEHandshakeTest.cc \
MockBtAnnounce.h MockBtProgressInfoFile.h \
MockBtRequestFactory.h MockDHTMessage.h \
MockDHTMessageCallback.h MockDHTMessageDispatcher.h \
MockDHTMessageFactory.h MockDHTTask.h MockDHTTaskFactory.h \
MockDHTTaskQueue.h MockExtensionMessage.h \
MockExtensionMessageFactory.h MockPieceStorage.h \
BittorrentHelperTest.cc PriorityPieceSelectorTest.cc \
MockPieceSelector.h extension_message_test_helper.h \
LpdMessageDispatcherTest.cc LpdMessageReceiverTest.cc \
Bencode2Test.cc MetalinkerTest.cc MetalinkEntryTest.cc \
Metalink2RequestGroupTest.cc \
MetalinkPostDownloadHandlerTest.cc MetalinkHelperTest.cc \
MetalinkParserControllerTest.cc MetalinkProcessorTest.cc
@ENABLE_XML_RPC_TRUE@am__objects_1 = XmlRpcRequestParserControllerTest.$(OBJEXT) \
@ -354,6 +356,7 @@ am__aria2c_SOURCES_DIST = AllTest.cc TestUtil.cc TestUtil.h \
@ENABLE_BITTORRENT_TRUE@ DHTIDCloserTest.$(OBJEXT) \
@ENABLE_BITTORRENT_TRUE@ DHTRoutingTableSerializerTest.$(OBJEXT) \
@ENABLE_BITTORRENT_TRUE@ DHTRoutingTableDeserializerTest.$(OBJEXT) \
@ENABLE_BITTORRENT_TRUE@ DHTTaskExecutorTest.$(OBJEXT) \
@ENABLE_BITTORRENT_TRUE@ DHKeyExchangeTest.$(OBJEXT) \
@ENABLE_BITTORRENT_TRUE@ ARC4Test.$(OBJEXT) \
@ENABLE_BITTORRENT_TRUE@ MSEHandshakeTest.$(OBJEXT) \
@ -804,6 +807,7 @@ distclean-compile:
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/DHTRoutingTableDeserializerTest.Po@am__quote@
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/DHTRoutingTableSerializerTest.Po@am__quote@
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/DHTRoutingTableTest.Po@am__quote@
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/DHTTaskExecutorTest.Po@am__quote@
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/DHTTokenTrackerTest.Po@am__quote@
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/DHTUnknownMessageTest.Po@am__quote@
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/DNSCacheTest.Po@am__quote@

View File

@ -2,9 +2,11 @@
#define _D_MOCK_DHT_TASK_H_
#include "DHTTask.h"
#include <cstring>
#include "DHTNode.h"
#include "DHTConstants.h"
#include <cstring>
namespace aria2 {
@ -14,7 +16,11 @@ public:
unsigned char targetID_[DHT_ID_LENGTH];
MockDHTTask(const SharedHandle<DHTNode>& remoteNode):remoteNode_(remoteNode) {}
bool finished_;
MockDHTTask(const SharedHandle<DHTNode>& remoteNode):
remoteNode_(remoteNode),
finished_(false) {}
virtual ~MockDHTTask() {}
@ -22,7 +28,7 @@ public:
virtual bool finished()
{
return false;
return finished_;
}
void setTargetID(const unsigned char* targetID)