/* */ #include "DHTSetup.h" #include #include #include "LogFactory.h" #include "Logger.h" #include "util.h" #include "File.h" #include "DHTNode.h" #include "DHTConnectionImpl.h" #include "DHTRoutingTable.h" #include "DHTMessageFactoryImpl.h" #include "DHTMessageTracker.h" #include "DHTMessageDispatcherImpl.h" #include "DHTMessageReceiver.h" #include "DHTTaskQueueImpl.h" #include "DHTTaskFactoryImpl.h" #include "DHTPeerAnnounceStorage.h" #include "DHTTokenTracker.h" #include "DHTInteractionCommand.h" #include "DHTTokenUpdateCommand.h" #include "DHTBucketRefreshCommand.h" #include "DHTPeerAnnounceCommand.h" #include "DHTEntryPointNameResolveCommand.h" #include "DHTAutoSaveCommand.h" #include "DHTTask.h" #include "DHTRoutingTableDeserializer.h" #include "DHTRegistry.h" #include "DHTBucketRefreshTask.h" #include "DHTMessageCallback.h" #include "prefs.h" #include "Option.h" #include "SocketCore.h" #include "DlAbortEx.h" #include "RecoverableException.h" #include "a2functional.h" #include "DownloadEngine.h" #include "wallclock.h" #include "RequestGroupMan.h" #include "FileAllocationEntry.h" #include "CheckIntegrityEntry.h" #include "ServerStatMan.h" #include "FileEntry.h" namespace aria2 { // TODO DownloadEngine should hold this flag. 
// Class-wide flag: set to true at the end of a successful setup() and
// checked at the top of setup() so the initialization runs at most once.
bool DHTSetup::_initialized = false;

// Stores the logger returned by LogFactory::getInstance() in _logger.
DHTSetup::DHTSetup():_logger(LogFactory::getInstance()) {}

DHTSetup::~DHTSetup() {}

// Builds the DHT object graph, publishes it through DHTRegistry and appends
// the DHT commands to `commands`.
//
// commands: output vector; the newly created commands are appended to its
//           end, and only on the success path (the insert is the last step).
// e:        engine used to read options (getOption()) and to obtain a CUID
//           (newCUID()) for every command; it is also passed to each
//           command's constructor.
//
// Returns immediately if _initialized is already true. A
// RecoverableException thrown during initialization is caught here: it is
// logged, DHTRegistry::clearData() is called, `commands` is left untouched
// and _initialized stays false.
void DHTSetup::setup(std::vector& commands, DownloadEngine* e)
{
  if(_initialized) {
    return;
  }
  try {
    // Commands are collected in a temporary vector first and copied into
    // `commands` only after every step below has succeeded.
    // NOTE(review): auto_delete_container presumably deletes the collected
    // commands and the vector itself when this scope is left early --
    // confirm against a2functional.h.
    std::vector* tempCommands = new std::vector();
    auto_delete_container > commandsDel(tempCommands);
    // load routing table and localnode id here
    SharedHandle localNode;
    DHTRoutingTableDeserializer deserializer;
    std::string dhtFile = e->getOption()->get(PREF_DHT_FILE_PATH);
    try {
      std::ifstream in(dhtFile.c_str(), std::ios::binary);
      if(!in) {
        throw DL_ABORT_EX("Could not open file");
      }
      deserializer.deserialize(in);
      localNode = deserializer.getLocalNode();
    } catch(RecoverableException& e) {
      // Inside this handler `e` names the exception and shadows the
      // DownloadEngine* parameter. A load failure is only logged; setup
      // continues below with a fresh local node.
      _logger->error("Exception caught while loading DHT routing table from %s",
                     e, dhtFile.c_str());
    }
    // No local node was obtained from the file: create a new one.
    if(localNode.isNull()) {
      localNode.reset(new DHTNode());
    }
    SharedHandle connection(new DHTConnectionImpl());
    {
      // Candidate ports are parsed from the PREF_DHT_LISTEN_PORT option.
      IntSequence seq =
        util::parseIntRange(e->getOption()->get(PREF_DHT_LISTEN_PORT));
      // `port` is passed to bind() uninitialized and read afterwards;
      // presumably bind() stores the port it actually bound in it --
      // TODO confirm against DHTConnectionImpl::bind.
      uint16_t port;
      if(!connection->bind(port, seq)) {
        throw DL_ABORT_EX("Error occurred while binding port for DHT");
      }
      localNode->setPort(port);
    }
    if(_logger->debug()) {
      _logger->debug("Initialized local node ID=%s",
                     util::toHex(localNode->getID(), DHT_ID_LENGTH).c_str());
    }
    // Create the DHT components. The dispatcher and the receiver are both
    // constructed with the same message tracker.
    SharedHandle routingTable(new DHTRoutingTable(localNode));
    SharedHandle factory(new DHTMessageFactoryImpl());
    SharedHandle tracker(new DHTMessageTracker());
    SharedHandle dispatcher(new DHTMessageDispatcherImpl(tracker));
    SharedHandle receiver(new DHTMessageReceiver(tracker));
    SharedHandle taskQueue(new DHTTaskQueueImpl());
    SharedHandle taskFactory(new DHTTaskFactoryImpl());
    SharedHandle peerAnnounceStorage(new DHTPeerAnnounceStorage());
    SharedHandle tokenTracker(new DHTTokenTracker());
    // The same timeout value is handed to the dispatcher and the task
    // factory below.
    const time_t messageTimeout =
      e->getOption()->getAsInt(PREF_DHT_MESSAGE_TIMEOUT);
    // wiring up
    tracker->setRoutingTable(routingTable);
    tracker->setMessageFactory(factory);

    dispatcher->setTimeout(messageTimeout);

    receiver->setConnection(connection);
    receiver->setMessageFactory(factory);
    receiver->setRoutingTable(routingTable);

    taskFactory->setLocalNode(localNode);
    taskFactory->setRoutingTable(routingTable);
    taskFactory->setMessageDispatcher(dispatcher);
    taskFactory->setMessageFactory(factory);
    taskFactory->setTaskQueue(taskQueue);
    taskFactory->setTimeout(messageTimeout);

    routingTable->setTaskQueue(taskQueue);
    routingTable->setTaskFactory(taskFactory);

    peerAnnounceStorage->setTaskQueue(taskQueue);
    peerAnnounceStorage->setTaskFactory(taskFactory);

    factory->setRoutingTable(routingTable);
    factory->setConnection(connection);
    factory->setMessageDispatcher(dispatcher);
    factory->setPeerAnnounceStorage(peerAnnounceStorage);
    factory->setTokenTracker(tokenTracker);
    factory->setLocalNode(localNode);

    // assign them into DHTRegistry
    // (the outer catch handler calls DHTRegistry::clearData() if a
    // RecoverableException is thrown after this point)
    DHTRegistry::getMutableData().localNode = localNode;
    DHTRegistry::getMutableData().routingTable = routingTable;
    DHTRegistry::getMutableData().taskQueue = taskQueue;
    DHTRegistry::getMutableData().taskFactory = taskFactory;
    DHTRegistry::getMutableData().peerAnnounceStorage = peerAnnounceStorage;
    DHTRegistry::getMutableData().tokenTracker = tokenTracker;
    DHTRegistry::getMutableData().messageDispatcher = dispatcher;
    DHTRegistry::getMutableData().messageReceiver = receiver;
    DHTRegistry::getMutableData().messageFactory = factory;

    // add deserialized nodes to routing table
    const std::vector >& desnodes = deserializer.getNodes();
    for(std::vector >::const_iterator i = desnodes.begin(),
          eoi = desnodes.end(); i != eoi; ++i) {
      routingTable->addNode(*i);
    }
    // If nodes were loaded and getSerializedTime().difference() is at least
    // DHT_BUCKET_REFRESH_INTERVAL, queue a bucket refresh task with
    // force-refresh enabled. difference() presumably yields the time elapsed
    // since the table was saved -- TODO confirm.
    if(!desnodes.empty() && deserializer.getSerializedTime().
       difference() >= DHT_BUCKET_REFRESH_INTERVAL) {
      SharedHandle task
        (static_pointer_cast
         (taskFactory->createBucketRefreshTask()));
      task->setForceRefresh(true);
      taskQueue->addPeriodicTask1(task);
    }
    // When an entry point host is configured, add a command that is given
    // the (host, port) pair from the options, with bootstrap enabled.
    // Otherwise only an info message is logged.
    if(!e->getOption()->get(PREF_DHT_ENTRY_POINT_HOST).empty()) {
      {
        std::pair addr
          (e->getOption()->get(PREF_DHT_ENTRY_POINT_HOST),
           e->getOption()->getAsInt(PREF_DHT_ENTRY_POINT_PORT));
        std::vector > entryPoints;
        entryPoints.push_back(addr);
        DHTEntryPointNameResolveCommand* command =
          new DHTEntryPointNameResolveCommand(e->newCUID(), e, entryPoints);
        command->setBootstrapEnabled(true);
        command->setTaskQueue(taskQueue);
        command->setTaskFactory(taskFactory);
        command->setRoutingTable(routingTable);
        command->setLocalNode(localNode);
        tempCommands->push_back(command);
      }
    } else {
      _logger->info("No DHT entry point specified.");
    }
    // The remaining commands are always created. Each one takes a CUID from
    // e->newCUID(), so the order of these blocks determines which CUID each
    // command receives.
    {
      // Gets the dispatcher, receiver and task queue, and is told to
      // read-check the DHT connection's socket.
      DHTInteractionCommand* command =
        new DHTInteractionCommand(e->newCUID(), e);
      command->setMessageDispatcher(dispatcher);
      command->setMessageReceiver(receiver);
      command->setTaskQueue(taskQueue);
      command->setReadCheckSocket(connection->getSocket());
      tempCommands->push_back(command);
    }
    {
      // Constructed with DHT_TOKEN_UPDATE_INTERVAL; operates on tokenTracker.
      DHTTokenUpdateCommand* command =
        new DHTTokenUpdateCommand(e->newCUID(), e, DHT_TOKEN_UPDATE_INTERVAL);
      command->setTokenTracker(tokenTracker);
      tempCommands->push_back(command);
    }
    {
      // Constructed with DHT_BUCKET_REFRESH_CHECK_INTERVAL.
      DHTBucketRefreshCommand* command =
        new DHTBucketRefreshCommand(e->newCUID(), e,
                                    DHT_BUCKET_REFRESH_CHECK_INTERVAL);
      command->setTaskQueue(taskQueue);
      command->setRoutingTable(routingTable);
      command->setTaskFactory(taskFactory);
      tempCommands->push_back(command);
    }
    {
      // Constructed with DHT_PEER_ANNOUNCE_CHECK_INTERVAL.
      DHTPeerAnnounceCommand* command =
        new DHTPeerAnnounceCommand(e->newCUID(), e,
                                   DHT_PEER_ANNOUNCE_CHECK_INTERVAL);
      command->setPeerAnnounceStorage(peerAnnounceStorage);
      tempCommands->push_back(command);
    }
    {
      // NOTE(review): 30*60 is presumably an interval in seconds
      // (30 minutes) -- confirm against DHTAutoSaveCommand.
      DHTAutoSaveCommand* command =
        new DHTAutoSaveCommand(e->newCUID(), e, 30*60);
      command->setLocalNode(localNode);
      command->setRoutingTable(routingTable);
      tempCommands->push_back(command);
    }
    _initialized = true;
    // Hand the collected command pointers over to the caller, then empty the
    // temporary vector; presumably this keeps commandsDel from deleting the
    // commands that `commands` now refers to -- TODO confirm.
    commands.insert(commands.end(), tempCommands->begin(), tempCommands->end());
    tempCommands->clear();
  } catch(RecoverableException& e) {
    // Initialization failed: log the exception and drop whatever was already
    // stored in DHTRegistry. _initialized is still false on this path.
    _logger->error("Exception caught while initializing DHT functionality."
                   " DHT is disabled.", e);
    DHTRegistry::clearData();
  }
}

// Returns the current value of the _initialized flag.
bool DHTSetup::initialized()
{
  return _initialized;
}

} // namespace aria2