2008-02-13 Tatsuhiro Tsujikawa <tujikawa at rednoah dot com>

Added the ability to load nodes from torrent file. These nodes are
	added to the routing table when downloading that torrent.
	* src/BtContext.h
	* src/DefaultBtContext.{h, cc}
	* src/DHTSetup.cc
	* src/DHTEntryPointNameResolveCommand.{h, cc}: Now accepts list of
	hostname and port pair, and resolves all of them.
	* src/NameResolver.{h, cc}: Added reset().
	* src/RequestGroup.cc	
	* test/DefaultBtContextTest.cc 
	* test/MockBtContext.h
	
	Removed assert() from DefaultBtContext and throw exception instead.
	* src/DefaultBtContext.cc
pull/1/head
Tatsuhiro Tsujikawa 2008-02-13 15:17:08 +00:00
parent bfcc300670
commit 1302123368
12 changed files with 312 additions and 28 deletions

View File

@ -1,3 +1,20 @@
2008-02-13 Tatsuhiro Tsujikawa <tujikawa at rednoah dot com>
Added the ability to load nodes from torrent file. These nodes are
added to the routing table when downloading that torrent.
* src/BtContext.h
* src/DefaultBtContext.{h, cc}
* src/DHTSetup.cc
* src/DHTEntryPointNameResolveCommand.{h, cc}: Now accepts list of
hostname and port pair, and resolves all of them.
* src/NameResolver.{h, cc}: Added reset().
* src/RequestGroup.cc
* test/DefaultBtContextTest.cc
* test/MockBtContext.h
Removed assert() from DefaultBtContext and throw exception instead.
* src/DefaultBtContext.cc
2008-02-12 Tatsuhiro Tsujikawa <tujikawa at rednoah dot com>
Fixed segmentation fault bug when exiting if dht is disabled.

View File

@ -37,6 +37,8 @@
#include "DownloadContext.h"
#include "BtContextDecl.h"
#include <utility>
#include <deque>
namespace aria2 {
@ -77,6 +79,8 @@ public:
virtual RequestGroup* getOwnerRequestGroup() = 0;
virtual std::deque<std::pair<std::string, uint16_t> >& getNodes() = 0;
};
} // namespace aria2

View File

@ -44,19 +44,23 @@
#include "DHTNode.h"
#include "DHTTaskQueue.h"
#include "DHTTaskFactory.h"
#include "DHTRoutingTable.h"
#include "DHTTask.h"
#include "RequestGroupMan.h"
#include "Logger.h"
namespace aria2 {
DHTEntryPointNameResolveCommand::DHTEntryPointNameResolveCommand(int32_t cuid, DownloadEngine* e):
DHTEntryPointNameResolveCommand::DHTEntryPointNameResolveCommand(int32_t cuid, DownloadEngine* e, const std::deque<std::pair<std::string, uint16_t> >& entryPoints):
Command(cuid),
_e(e),
_resolver(new NameResolver()),
_taskQueue(0),
_taskFactory(0),
_localNode(0)
_routingTable(0),
_localNode(0),
_entryPoints(entryPoints),
_bootstrapEnabled(false)
{}
DHTEntryPointNameResolveCommand::~DHTEntryPointNameResolveCommand()
@ -71,25 +75,42 @@ bool DHTEntryPointNameResolveCommand::execute()
if(_e->_requestGroupMan->downloadFinished() || _e->isHaltRequested()) {
return true;
}
try {
std::string hostname = _e->option->get(PREF_DHT_ENTRY_POINT_HOST);
if(!Util::isNumbersAndDotsNotation(hostname)) {
if(resolveHostname(hostname, _resolver)) {
hostname = _resolver->getAddrString();
} else {
_e->commands.push_back(this);
return false;
while(_entryPoints.size()) {
std::string hostname = _entryPoints.front().first;
try {
if(Util::isNumbersAndDotsNotation(hostname)) {
std::pair<std::string, uint16_t> p(hostname,
_entryPoints.front().second);
_resolvedEntryPoints.push_back(p);
_entryPoints.erase(_entryPoints.begin());
addPingTask(p);
} else {
if(resolveHostname(hostname, _resolver)) {
hostname = _resolver->getAddrString();
_resolver->reset();
std::pair<std::string, uint16_t> p(hostname,
_entryPoints.front().second);
_resolvedEntryPoints.push_back(p);
_entryPoints.erase(_entryPoints.begin());
addPingTask(p);
} else {
_e->commands.push_back(this);
return false;
}
}
} catch(RecoverableException* e) {
logger->error(EX_EXCEPTION_CAUGHT, e);
delete e;
_entryPoints.erase(_entryPoints.begin());
_resolver->reset();
}
}
SharedHandle<DHTNode> entryNode = new DHTNode();
entryNode->setIPAddress(hostname);
entryNode->setPort(_e->option->getAsInt(PREF_DHT_ENTRY_POINT_PORT));
_taskQueue->addPeriodicTask1(_taskFactory->createPingTask(entryNode, 10));
_taskQueue->addPeriodicTask1(_taskFactory->createNodeLookupTask(_localNode->getID()));
_taskQueue->addPeriodicTask1(_taskFactory->createBucketRefreshTask());
if(_bootstrapEnabled && _resolvedEntryPoints.size()) {
_taskQueue->addPeriodicTask1(_taskFactory->createNodeLookupTask(_localNode->getID()));
_taskQueue->addPeriodicTask1(_taskFactory->createBucketRefreshTask());
}
} catch(RecoverableException* e) {
logger->error(EX_EXCEPTION_CAUGHT, e);
delete e;
@ -97,6 +118,15 @@ bool DHTEntryPointNameResolveCommand::execute()
return true;
}
void DHTEntryPointNameResolveCommand::addPingTask(const std::pair<std::string, uint16_t>& addr)
{
SharedHandle<DHTNode> entryNode = new DHTNode();
entryNode->setIPAddress(addr.first);
entryNode->setPort(addr.second);
_taskQueue->addPeriodicTask1(_taskFactory->createPingTask(entryNode, 10));
}
bool DHTEntryPointNameResolveCommand::resolveHostname(const std::string& hostname,
const NameResolverHandle& resolver)
{
@ -148,6 +178,11 @@ void DHTEntryPointNameResolveCommand::disableNameResolverCheck(const SharedHandl
}
#endif // ENABLE_ASYNC_DNS
void DHTEntryPointNameResolveCommand::setBootstrapEnabled(bool f)
{
_bootstrapEnabled = f;
}
void DHTEntryPointNameResolveCommand::setTaskQueue(const SharedHandle<DHTTaskQueue>& taskQueue)
{
_taskQueue = taskQueue;
@ -158,6 +193,11 @@ void DHTEntryPointNameResolveCommand::setTaskFactory(const SharedHandle<DHTTaskF
_taskFactory = taskFactory;
}
void DHTEntryPointNameResolveCommand::setRoutingTable(const SharedHandle<DHTRoutingTable>& routingTable)
{
_routingTable = routingTable;
}
void DHTEntryPointNameResolveCommand::setLocalNode(const SharedHandle<DHTNode>& localNode)
{
_localNode = localNode;

View File

@ -37,11 +37,14 @@
#include "Command.h"
#include "SharedHandle.h"
#include <utility>
#include <deque>
namespace aria2 {
class DHTTaskQueue;
class DHTTaskFactory;
class DHTRoutingTable;
class DHTNode;
class DownloadEngine;
class NameResolver;
@ -56,8 +59,18 @@ private:
SharedHandle<DHTTaskFactory> _taskFactory;
SharedHandle<DHTRoutingTable> _routingTable;
SharedHandle<DHTNode> _localNode;
std::deque<std::pair<std::string, uint16_t> > _entryPoints;
std::deque<std::pair<std::string, uint16_t> > _resolvedEntryPoints;
bool _bootstrapEnabled;
void addPingTask(const std::pair<std::string, uint16_t>& addr);
bool resolveHostname(const std::string& hostname,
const SharedHandle<NameResolver>& resolver);
@ -66,16 +79,21 @@ private:
void disableNameResolverCheck(const SharedHandle<NameResolver>& resolver);
public:
DHTEntryPointNameResolveCommand(int32_t cuid, DownloadEngine* e);
DHTEntryPointNameResolveCommand(int32_t cuid, DownloadEngine* e,
const std::deque<std::pair<std:: string, uint16_t> >& entryPoints);
virtual ~DHTEntryPointNameResolveCommand();
virtual bool execute();
void setBootstrapEnabled(bool f);
void setTaskQueue(const SharedHandle<DHTTaskQueue>& taskQueue);
void setTaskFactory(const SharedHandle<DHTTaskFactory>& taskFactory);
void setRoutingTable(const SharedHandle<DHTRoutingTable>& routingTable);
void setLocalNode(const SharedHandle<DHTNode>& localNode);
};

View File

@ -184,9 +184,15 @@ Commands DHTSetup::setup(DownloadEngine* e, const Option* option)
Commands commands;
if(!option->get(PREF_DHT_ENTRY_POINT_HOST).empty()) {
{
DHTEntryPointNameResolveCommand* command = new DHTEntryPointNameResolveCommand(CUIDCounterSingletonHolder::instance()->newID(), e);
std::pair<std::string, uint16_t> addr(option->get(PREF_DHT_ENTRY_POINT_HOST),
option->getAsInt(PREF_DHT_ENTRY_POINT_PORT));
std::deque<std::pair<std::string, uint16_t> > entryPoints;
entryPoints.push_back(addr);
DHTEntryPointNameResolveCommand* command = new DHTEntryPointNameResolveCommand(CUIDCounterSingletonHolder::instance()->newID(), e, entryPoints);
command->setBootstrapEnabled(true);
command->setTaskQueue(taskQueue);
command->setTaskFactory(taskFactory);
command->setRoutingTable(routingTable);
command->setLocalNode(localNode);
commands.push_back(command);
}

View File

@ -48,7 +48,6 @@
#include "Logger.h"
#include "FileEntry.h"
#include "message.h"
#include <cassert>
#include <cstring>
#include <ostream>
#include <functional>
@ -101,11 +100,7 @@ void DefaultBtContext::clear() {
void DefaultBtContext::extractPieceHash(const unsigned char* hashData,
int32_t hashDataLength,
int32_t hashLength) {
assert(hashDataLength > 0);
assert(hashLength > 0);
int32_t numPieces = hashDataLength/hashLength;
assert(numPieces > 0);
for(int32_t i = 0; i < numPieces; i++) {
pieceHashes.push_back(Util::toHex(&hashData[i*hashLength],
hashLength));
@ -233,6 +228,35 @@ std::deque<std::string> DefaultBtContext::extractUrlList(const MetaEntry* obj)
return uris;
}
void DefaultBtContext::extractNodes(const List* nodes)
{
for(std::deque<MetaEntry*>::const_iterator i = nodes->getList().begin();
i != nodes->getList().end(); ++i) {
const List* addrPair = dynamic_cast<const List*>(*i);
if(!addrPair || addrPair->getList().size() != 2) {
continue;
}
const Data* hostname = dynamic_cast<const Data*>(addrPair->getList()[0]);
if(!hostname) {
continue;
}
std::string h = hostname->toString();
if(Util::trim(h).empty()) {
continue;
}
const Data* port = dynamic_cast<const Data*>(addrPair->getList()[1]);
if(!port) {
continue;
}
uint16_t p = port->toInt();
if(p == 0) {
continue;
}
_nodes.push_back(std::pair<std::string, uint16_t>(h, p));
}
}
void DefaultBtContext::loadFromMemory(const char* content, int32_t length, const std::string& defaultName)
{
SharedHandle<MetaEntry> rootEntry = MetaFileUtil::bdecoding(content, length);
@ -271,7 +295,13 @@ void DefaultBtContext::processRootDictionary(const Dictionary* rootDic, const st
if(!pieceHashData) {
throw new DlAbortEx(MSG_SOMETHING_MISSING_IN_TORRENT, "pieces");
}
if(pieceHashData->getLen() == 0) {
throw new DlAbortEx("The length of piece hash is 0.");
}
numPieces = pieceHashData->getLen()/PIECE_HASH_LENGTH;
if(numPieces == 0) {
throw new DlAbortEx("The number of pieces is 0.");
}
// retrieve piece length
const Data* pieceLengthData = dynamic_cast<const Data*>(infoDic->get("piece length"));
if(!pieceLengthData) {
@ -305,8 +335,10 @@ void DefaultBtContext::processRootDictionary(const Dictionary* rootDic, const st
} else if(announceData) {
extractAnnounce(announceData);
}
if(!announceTiers.size()) {
throw new DlAbortEx("No announce URL found.");
// retrieve nodes
const List* nodes = dynamic_cast<const List*>(rootDic->get("nodes"));
if(nodes) {
extractNodes(nodes);
}
}
@ -415,4 +447,10 @@ void DefaultBtContext::setRandomizer(const RandomizerHandle& randomizer)
_randomizer = randomizer;
}
std::deque<std::pair<std::string, uint16_t> >&
DefaultBtContext::getNodes()
{
return _nodes;
}
} // namespace aria2

View File

@ -64,6 +64,7 @@ private:
std::string peerId;
std::string _peerIdPrefix;
std::deque<SharedHandle<AnnounceTier> > announceTiers;
std::deque<std::pair<std::string, uint16_t> > _nodes;
SharedHandle<Randomizer> _randomizer;
RequestGroup* _ownerRequestGroup;
@ -82,6 +83,8 @@ private:
std::deque<std::string> extractUrlList(const MetaEntry* obj);
void extractNodes(const List* nodes);
void processRootDictionary(const Dictionary* rootDic, const std::string& defaultName);
public:
@ -140,6 +143,8 @@ private:
return _ownerRequestGroup;
}
virtual std::deque<std::pair<std::string, uint16_t> >& getNodes();
std::string generatePeerId() const;
void setPeerIdPrefix(const std::string& peerIdPrefix)

View File

@ -84,6 +84,14 @@ void NameResolver::setAddr(const std::string& addrString)
inet_aton(addrString.c_str(), &addr);
}
void NameResolver::reset()
{
status = STATUS_READY;
ares_destroy(channel);
// TODO evaluate return value
ares_init(&channel);
}
#else // ENABLE_ASYNC_DNS
#include "DlAbortEx.h"
@ -118,6 +126,8 @@ void NameResolver::setAddr(const std::string& addrString)
inet_aton(addrString.c_str(), &_addr);
}
void NameResolver::reset() {}
#endif // ENABLE_ASYNC_DNS
} // namespace aria2

View File

@ -81,6 +81,7 @@ public:
NameResolver():
status(STATUS_READY)
{
// TODO evaluate return value
ares_init(&channel);
}
@ -117,6 +118,8 @@ public:
}
void setAddr(const std::string& addrString);
void reset();
};
#else // ENABLE_ASYNC_DNS
@ -130,6 +133,8 @@ public:
std::string getAddrString() const;
void setAddr(const std::string& addrString);
void reset();
};
#endif // ENABLE_ASYNC_DNS

View File

@ -91,6 +91,7 @@
# include "PeerConnection.h"
# include "ExtensionMessageFactory.h"
# include "DHTPeerAnnounceStorage.h"
# include "DHTEntryPointNameResolveCommand.h"
#endif // ENABLE_BITTORRENT
#ifdef ENABLE_METALINK
# include "MetalinkPostDownloadHandler.h"
@ -243,6 +244,15 @@ Commands RequestGroup::createInitialCommand(DownloadEngine* e)
if(!btContext->isPrivate() && _option->getAsBool(PREF_ENABLE_DHT)) {
e->addCommand(DHTSetup().setup(e, _option));
if(btContext->getNodes().size() && DHTSetup::initialized()) {
DHTEntryPointNameResolveCommand* command =
new DHTEntryPointNameResolveCommand(CUIDCounterSingletonHolder::instance()->newID(), e, btContext->getNodes());
command->setTaskQueue(DHTRegistry::_taskQueue);
command->setTaskFactory(DHTRegistry::_taskFactory);
command->setRoutingTable(DHTRegistry::_routingTable);
command->setLocalNode(DHTRegistry::_localNode);
e->commands.push_back(command);
}
}
CheckIntegrityEntryHandle entry = new BtCheckIntegrityEntry(this);

View File

@ -1,6 +1,6 @@
#include "DefaultBtContext.h"
#include "Util.h"
#include "Exception.h"
#include "RecoverableException.h"
#include "AnnounceTier.h"
#include "FixedNumberRandomizer.h"
#include "FileEntry.h"
@ -31,6 +31,7 @@ class DefaultBtContextTest:public CppUnit::TestFixture {
CPPUNIT_TEST(testGetFileEntries_singleFileUrlList);
CPPUNIT_TEST(testLoadFromMemory);
CPPUNIT_TEST(testLoadFromMemory_somethingMissing);
CPPUNIT_TEST(testGetNodes);
CPPUNIT_TEST_SUITE_END();
public:
void setUp() {
@ -56,6 +57,7 @@ public:
void testGetFileEntries_singleFileUrlList();
void testLoadFromMemory();
void testLoadFromMemory_somethingMissing();
void testGetNodes();
};
@ -343,4 +345,121 @@ void DefaultBtContextTest::testLoadFromMemory_somethingMissing()
}
}
void DefaultBtContextTest::testGetNodes()
{
{
std::string memory =
"d5:nodesl"
"l11:192.168.0.1i6881ee"
"l11:192.168.0.24:6882e"
"e4:infod4:name13:aria2.tar.bz26:lengthi262144e"
"12:piece lengthi262144e"
"6:pieces20:AAAAAAAAAAAAAAAAAAAA"
"ee";
DefaultBtContext btContext;
btContext.loadFromMemory(memory.c_str(), memory.size(), "default");
const std::deque<std::pair<std::string, uint16_t> >& nodes =
btContext.getNodes();
CPPUNIT_ASSERT_EQUAL((size_t)2, nodes.size());
CPPUNIT_ASSERT_EQUAL(std::string("192.168.0.1"), nodes[0].first);
CPPUNIT_ASSERT_EQUAL((uint16_t)6881, nodes[0].second);
CPPUNIT_ASSERT_EQUAL(std::string("192.168.0.2"), nodes[1].first);
CPPUNIT_ASSERT_EQUAL((uint16_t)6882, nodes[1].second);
}
{
// empty hostname
std::string memory =
"d5:nodesl"
"l1: i6881ee"
"l11:192.168.0.24:6882e"
"e4:infod4:name13:aria2.tar.bz26:lengthi262144e"
"12:piece lengthi262144e"
"6:pieces20:AAAAAAAAAAAAAAAAAAAA"
"ee";
DefaultBtContext btContext;
btContext.loadFromMemory(memory.c_str(), memory.size(), "default");
const std::deque<std::pair<std::string, uint16_t> >& nodes =
btContext.getNodes();
CPPUNIT_ASSERT_EQUAL((size_t)1, nodes.size());
CPPUNIT_ASSERT_EQUAL(std::string("192.168.0.2"), nodes[0].first);
CPPUNIT_ASSERT_EQUAL((uint16_t)6882, nodes[0].second);
}
{
// bad port
std::string memory =
"d5:nodesl"
"l11:192.168.0.11:xe"
"l11:192.168.0.24:6882e"
"e4:infod4:name13:aria2.tar.bz26:lengthi262144e"
"12:piece lengthi262144e"
"6:pieces20:AAAAAAAAAAAAAAAAAAAA"
"ee";
DefaultBtContext btContext;
btContext.loadFromMemory(memory.c_str(), memory.size(), "default");
const std::deque<std::pair<std::string, uint16_t> >& nodes =
btContext.getNodes();
CPPUNIT_ASSERT_EQUAL((size_t)1, nodes.size());
CPPUNIT_ASSERT_EQUAL(std::string("192.168.0.2"), nodes[0].first);
CPPUNIT_ASSERT_EQUAL((uint16_t)6882, nodes[0].second);
}
{
// port missing
std::string memory =
"d5:nodesl"
"l11:192.168.0.1e"
"l11:192.168.0.24:6882e"
"e4:infod4:name13:aria2.tar.bz26:lengthi262144e"
"12:piece lengthi262144e"
"6:pieces20:AAAAAAAAAAAAAAAAAAAA"
"ee";
DefaultBtContext btContext;
btContext.loadFromMemory(memory.c_str(), memory.size(), "default");
const std::deque<std::pair<std::string, uint16_t> >& nodes =
btContext.getNodes();
CPPUNIT_ASSERT_EQUAL((size_t)1, nodes.size());
CPPUNIT_ASSERT_EQUAL(std::string("192.168.0.2"), nodes[0].first);
CPPUNIT_ASSERT_EQUAL((uint16_t)6882, nodes[0].second);
}
{
// nodes is not a list
std::string memory =
"d5:nodes"
"l11:192.168.0.1e"
"4:infod4:name13:aria2.tar.bz26:lengthi262144e"
"12:piece lengthi262144e"
"6:pieces20:AAAAAAAAAAAAAAAAAAAA"
"ee";
DefaultBtContext btContext;
btContext.loadFromMemory(memory.c_str(), memory.size(), "default");
const std::deque<std::pair<std::string, uint16_t> >& nodes =
btContext.getNodes();
CPPUNIT_ASSERT_EQUAL((size_t)0, nodes.size());
}
{
// the element of node is not Data
std::string memory =
"d5:nodesl"
"ll11:192.168.0.1i6881eee"
"l11:192.168.0.24:6882e"
"e4:infod4:name13:aria2.tar.bz26:lengthi262144e"
"12:piece lengthi262144e"
"6:pieces20:AAAAAAAAAAAAAAAAAAAA"
"ee";
DefaultBtContext btContext;
btContext.loadFromMemory(memory.c_str(), memory.size(), "default");
const std::deque<std::pair<std::string, uint16_t> >& nodes =
btContext.getNodes();
CPPUNIT_ASSERT_EQUAL((size_t)1, nodes.size());
CPPUNIT_ASSERT_EQUAL(std::string("192.168.0.2"), nodes[0].first);
CPPUNIT_ASSERT_EQUAL((uint16_t)6882, nodes[0].second);
}
}
} // namespace aria2

View File

@ -19,6 +19,7 @@ private:
unsigned char peerId[20];
std::deque<SharedHandle<FileEntry> > fileEntries;
std::deque<SharedHandle<AnnounceTier> > announceTiers;
std::deque<std::pair<std::string, uint16_t> > _nodes;
std::deque<int32_t> fastSet;
public:
MockBtContext():totalLength(0),
@ -145,6 +146,17 @@ public:
{
return 0;
}
virtual std::deque<std::pair<std::string, uint16_t> >& getNodes()
{
return _nodes;
}
void setNodes(const std::deque<std::pair<std::string, uint16_t> >& nodes)
{
_nodes = nodes;
}
};
} // namespace aria2