/* */ #include "DHTSetup.h" #include #include #include "LogFactory.h" #include "Logger.h" #include "util.h" #include "DHTNode.h" #include "DHTConnectionImpl.h" #include "DHTRoutingTable.h" #include "DHTMessageFactoryImpl.h" #include "DHTMessageTracker.h" #include "DHTMessageDispatcherImpl.h" #include "DHTMessageReceiver.h" #include "DHTTaskQueueImpl.h" #include "DHTTaskFactoryImpl.h" #include "DHTPeerAnnounceStorage.h" #include "DHTTokenTracker.h" #include "DHTInteractionCommand.h" #include "DHTTokenUpdateCommand.h" #include "DHTBucketRefreshCommand.h" #include "DHTPeerAnnounceCommand.h" #include "DHTEntryPointNameResolveCommand.h" #include "DHTAutoSaveCommand.h" #include "DHTTask.h" #include "DHTRoutingTableDeserializer.h" #include "DHTRegistry.h" #include "DHTBucketRefreshTask.h" #include "DHTMessageCallback.h" #include "prefs.h" #include "Option.h" #include "SocketCore.h" #include "DlAbortEx.h" #include "RecoverableException.h" #include "a2functional.h" #include "DownloadEngine.h" #include "fmt.h" namespace aria2 { DHTSetup::DHTSetup() {} DHTSetup::~DHTSetup() {} void DHTSetup::setup (std::vector& commands, DownloadEngine* e, int family) { if(family != AF_INET && family != AF_INET6) { return; } if((family == AF_INET && DHTRegistry::isInitialized()) || (family == AF_INET6 && DHTRegistry::isInitialized6())) { return; } try { std::vector* tempCommands = new std::vector(); auto_delete_container > commandsDel(tempCommands); // load routing table and localnode id here SharedHandle localNode; DHTRoutingTableDeserializer deserializer(family); const std::string& dhtFile = e->getOption()->get(family == AF_INET?PREF_DHT_FILE_PATH: PREF_DHT_FILE_PATH6); try { std::ifstream in(dhtFile.c_str(), std::ios::binary); if(!in) { throw DL_ABORT_EX("Could not open file"); } deserializer.deserialize(in); localNode = deserializer.getLocalNode(); } catch(RecoverableException& e) { A2_LOG_ERROR_EX (fmt("Exception caught while loading DHT routing table from %s", dhtFile.c_str()), e); } if(!localNode) { localNode.reset(new DHTNode()); } SharedHandle connection(new DHTConnectionImpl(family)); { IntSequence seq = util::parseIntRange(e->getOption()->get(PREF_DHT_LISTEN_PORT)); uint16_t port; const std::string& addr = e->getOption()->get(family == AF_INET?PREF_DHT_LISTEN_ADDR: PREF_DHT_LISTEN_ADDR6); if(!connection->bind(port, addr, seq)) { throw DL_ABORT_EX("Error occurred while binding port for DHT"); } localNode->setPort(port); } A2_LOG_DEBUG(fmt("Initialized local node ID=%s", util::toHex(localNode->getID(), DHT_ID_LENGTH).c_str())); SharedHandle routingTable(new DHTRoutingTable(localNode)); SharedHandle factory (new DHTMessageFactoryImpl(family)); SharedHandle tracker(new DHTMessageTracker()); SharedHandle dispatcher(new DHTMessageDispatcherImpl(tracker)); SharedHandle receiver(new DHTMessageReceiver(tracker)); SharedHandle taskQueue(new DHTTaskQueueImpl()); SharedHandle taskFactory(new DHTTaskFactoryImpl()); SharedHandle peerAnnounceStorage(new DHTPeerAnnounceStorage()); SharedHandle tokenTracker(new DHTTokenTracker()); const time_t messageTimeout = e->getOption()->getAsInt(PREF_DHT_MESSAGE_TIMEOUT); // wiring up tracker->setRoutingTable(routingTable); tracker->setMessageFactory(factory); dispatcher->setTimeout(messageTimeout); receiver->setConnection(connection); receiver->setMessageFactory(factory); receiver->setRoutingTable(routingTable); taskFactory->setLocalNode(localNode); taskFactory->setRoutingTable(routingTable.get()); taskFactory->setMessageDispatcher(dispatcher.get()); taskFactory->setMessageFactory(factory.get()); taskFactory->setTaskQueue(taskQueue.get()); taskFactory->setTimeout(messageTimeout); routingTable->setTaskQueue(taskQueue); routingTable->setTaskFactory(taskFactory); peerAnnounceStorage->setTaskQueue(taskQueue); peerAnnounceStorage->setTaskFactory(taskFactory); factory->setRoutingTable(routingTable.get()); factory->setConnection(connection.get()); factory->setMessageDispatcher(dispatcher.get()); factory->setPeerAnnounceStorage(peerAnnounceStorage.get()); factory->setTokenTracker(tokenTracker.get()); factory->setLocalNode(localNode); // assign them into DHTRegistry if(family == AF_INET) { DHTRegistry::getMutableData().localNode = localNode; DHTRegistry::getMutableData().routingTable = routingTable; DHTRegistry::getMutableData().taskQueue = taskQueue; DHTRegistry::getMutableData().taskFactory = taskFactory; DHTRegistry::getMutableData().peerAnnounceStorage = peerAnnounceStorage; DHTRegistry::getMutableData().tokenTracker = tokenTracker; DHTRegistry::getMutableData().messageDispatcher = dispatcher; DHTRegistry::getMutableData().messageReceiver = receiver; DHTRegistry::getMutableData().messageFactory = factory; } else { DHTRegistry::getMutableData6().localNode = localNode; DHTRegistry::getMutableData6().routingTable = routingTable; DHTRegistry::getMutableData6().taskQueue = taskQueue; DHTRegistry::getMutableData6().taskFactory = taskFactory; DHTRegistry::getMutableData6().peerAnnounceStorage = peerAnnounceStorage; DHTRegistry::getMutableData6().tokenTracker = tokenTracker; DHTRegistry::getMutableData6().messageDispatcher = dispatcher; DHTRegistry::getMutableData6().messageReceiver = receiver; DHTRegistry::getMutableData6().messageFactory = factory; } // add deserialized nodes to routing table const std::vector >& desnodes = deserializer.getNodes(); for(std::vector >::const_iterator i = desnodes.begin(), eoi = desnodes.end(); i != eoi; ++i) { routingTable->addNode(*i); } if(!desnodes.empty()) { SharedHandle task (static_pointer_cast (taskFactory->createBucketRefreshTask())); task->setForceRefresh(true); taskQueue->addPeriodicTask1(task); } const std::string& prefEntryPointHost = family == AF_INET?PREF_DHT_ENTRY_POINT_HOST:PREF_DHT_ENTRY_POINT_HOST6; if(!e->getOption()->get(prefEntryPointHost).empty()) { { const std::string& prefEntryPointPort = family == AF_INET?PREF_DHT_ENTRY_POINT_PORT: PREF_DHT_ENTRY_POINT_PORT6; std::pair addr (e->getOption()->get(prefEntryPointHost), e->getOption()->getAsInt(prefEntryPointPort)); std::vector > entryPoints; entryPoints.push_back(addr); DHTEntryPointNameResolveCommand* command = new DHTEntryPointNameResolveCommand(e->newCUID(), e, entryPoints); command->setBootstrapEnabled(true); command->setTaskQueue(taskQueue); command->setTaskFactory(taskFactory); command->setRoutingTable(routingTable); command->setLocalNode(localNode); tempCommands->push_back(command); } } else { A2_LOG_INFO("No DHT entry point specified."); } { DHTInteractionCommand* command = new DHTInteractionCommand(e->newCUID(), e); command->setMessageDispatcher(dispatcher); command->setMessageReceiver(receiver); command->setTaskQueue(taskQueue); command->setReadCheckSocket(connection->getSocket()); tempCommands->push_back(command); } { DHTTokenUpdateCommand* command = new DHTTokenUpdateCommand(e->newCUID(), e, DHT_TOKEN_UPDATE_INTERVAL); command->setTokenTracker(tokenTracker); tempCommands->push_back(command); } { DHTBucketRefreshCommand* command = new DHTBucketRefreshCommand(e->newCUID(), e, DHT_BUCKET_REFRESH_CHECK_INTERVAL); command->setTaskQueue(taskQueue); command->setRoutingTable(routingTable); command->setTaskFactory(taskFactory); tempCommands->push_back(command); } { DHTPeerAnnounceCommand* command = new DHTPeerAnnounceCommand(e->newCUID(), e, DHT_PEER_ANNOUNCE_CHECK_INTERVAL); command->setPeerAnnounceStorage(peerAnnounceStorage); tempCommands->push_back(command); } { DHTAutoSaveCommand* command = new DHTAutoSaveCommand(e->newCUID(), e, family, 30*60); command->setLocalNode(localNode); command->setRoutingTable(routingTable); tempCommands->push_back(command); } if(family == AF_INET) { DHTRegistry::setInitialized(true); } else { DHTRegistry::setInitialized6(true); } commands.insert(commands.end(), tempCommands->begin(), tempCommands->end()); tempCommands->clear(); } catch(RecoverableException& e) { A2_LOG_ERROR_EX(fmt("Exception caught while initializing DHT functionality." " DHT is disabled."), e); if(family == AF_INET) { DHTRegistry::clearData(); } else { DHTRegistry::clearData6(); } } } } // namespace aria2