/* */ #include "DHTReplaceNodeTask.h" #include "DHTBucket.h" #include "DHTNode.h" #include "DHTMessage.h" #include "DHTMessageFactory.h" #include "DHTMessageDispatcher.h" #include "DHTMessageCallbackImpl.h" #include "Logger.h" namespace aria2 { DHTReplaceNodeTask::DHTReplaceNodeTask(const SharedHandle& bucket, const SharedHandle& newNode): _bucket(bucket), _newNode(newNode), _numRetry(0), _timeout(DHT_MESSAGE_TIMEOUT) {} DHTReplaceNodeTask::~DHTReplaceNodeTask() {} void DHTReplaceNodeTask::startup() { sendMessage(); } void DHTReplaceNodeTask::sendMessage() { SharedHandle questionableNode = _bucket->getLRUQuestionableNode(); if(questionableNode.isNull()) { setFinished(true); } else { SharedHandle m = getMessageFactory()->createPingMessage(questionableNode); WeakHandle listener(this); SharedHandle callback (new DHTMessageCallbackImpl(listener)); getMessageDispatcher()->addMessageToQueue(m, _timeout, callback); } } void DHTReplaceNodeTask::onReceived(const SharedHandle& message) { getLogger()->info("ReplaceNode: Ping reply received from %s.", message->getRemoteNode()->toString().c_str()); setFinished(true); } void DHTReplaceNodeTask::onTimeout(const SharedHandle& node) { ++_numRetry; if(_numRetry >= MAX_RETRY) { getLogger()->info("ReplaceNode: Ping failed %u times. Replace %s with %s.", _numRetry, node->toString().c_str(), _newNode->toString().c_str()); node->markBad(); _bucket->addNode(_newNode); setFinished(true); } else { getLogger()->info("ReplaceNode: Ping reply timeout from %s. Try once more.", node->toString().c_str()); sendMessage(); } } } // namespace aria2