/* */
#include "DHTReplaceNodeTask.h"
#include "DHTBucket.h"
#include "DHTNode.h"
#include "DHTPingReplyMessage.h"
#include "DHTMessageFactory.h"
#include "DHTMessageDispatcher.h"
#include "Logger.h"
#include "LogFactory.h"
#include "DHTPingReplyMessageCallback.h"
#include "DHTQueryMessage.h"
#include "fmt.h"

namespace aria2 {

// NOTE(review): SharedHandle (and DHTPingReplyMessageCallback below) are
// written without template arguments throughout this file. If they are class
// templates in the project headers, the arguments were probably lost before
// this file reached review -- confirm against DHTReplaceNodeTask.h.

// Builds a task that pings a node obtained from `bucket` and, if that node
// repeatedly fails to answer, adds `newNode` to the bucket.
//
// bucket  - bucket that is queried for the node to ping and that receives
//           newNode when the ping ultimately fails.
// newNode - candidate node handed to bucket_->addNode() on failure.
//
// The retry counter starts at 0 and the per-ping timeout is
// DHT_MESSAGE_TIMEOUT.
DHTReplaceNodeTask::DHTReplaceNodeTask
(const SharedHandle& bucket, const SharedHandle& newNode)
  : bucket_(bucket),
    newNode_(newNode),
    numRetry_(0),
    timeout_(DHT_MESSAGE_TIMEOUT)
{}

DHTReplaceNodeTask::~DHTReplaceNodeTask() {}

// Entry point of the task: sends the first ping.
void DHTReplaceNodeTask::startup()
{
  sendMessage();
}

// Pings the node currently returned by bucket_->getLRUQuestionableNode().
//
// If the bucket returns an empty handle the task is marked finished without
// sending anything. Otherwise a ping message for that node is created by the
// message factory and queued on the dispatcher with timeout_ and a
// DHTPingReplyMessageCallback constructed from this task.
//
// The bucket is queried on every call, so a retry pings whatever node the
// bucket returns at that moment.
void DHTReplaceNodeTask::sendMessage()
{
  SharedHandle questionableNode = bucket_->getLRUQuestionableNode();
  if(!questionableNode) {
    setFinished(true);
  } else {
    SharedHandle m =
      getMessageFactory()->createPingMessage(questionableNode);
    SharedHandle callback
      (new DHTPingReplyMessageCallback(this));
    getMessageDispatcher()->addMessageToQueue(m, timeout_, callback);
  }
}

// Handles a ping reply: logs the replying node and marks the task finished.
// The bucket is left untouched on this path; newNode_ is not added.
void DHTReplaceNodeTask::onReceived(const DHTPingReplyMessage* message)
{
  A2_LOG_INFO(fmt("ReplaceNode: Ping reply received from %s.",
                  message->getRemoteNode()->toString().c_str()));
  setFinished(true);
}

namespace {
// Number of ping timeouts after which the pinged node is marked bad.
const int MAX_RETRY = 2;
} // namespace

// Handles a ping timeout for `node`.
//
// Every call increments numRetry_. Once numRetry_ reaches MAX_RETRY, `node`
// is marked bad, newNode_ is added to the bucket and the task finishes.
// Before that point another ping is sent via sendMessage().
void DHTReplaceNodeTask::onTimeout(const SharedHandle& node)
{
  ++numRetry_;
  if(numRetry_ >= MAX_RETRY) {
    A2_LOG_INFO(fmt("ReplaceNode: Ping failed %d times. Replace %s with %s.",
                    numRetry_,
                    node->toString().c_str(),
                    newNode_->toString().c_str()));
    node->markBad();
    bucket_->addNode(newNode_);
    setFinished(true);
  } else {
    A2_LOG_INFO(fmt("ReplaceNode: Ping reply timeout from %s. Try once more.",
                    node->toString().c_str()));
    sendMessage();
  }
}

} // namespace aria2