/* */
#include "DHTMessageDispatcherImpl.h"
#include "DHTMessage.h"
#include "DHTMessageCallback.h"
#include "DHTMessageEntry.h"
#include "DHTMessageTracker.h"
#include "RecoverableException.h"
#include "LogFactory.h"
#include "Logger.h"
#include "DHTConstants.h"
#include "fmt.h"
#include "DHTNode.h"
#include "a2functional.h"

// NOTE(review): the template arguments in this file (DHTMessageTracker,
// DHTMessage, DHTMessageCallback, DHTMessageEntry, unsigned long) were
// reconstructed from the included headers and from how each value is
// used below. Confirm them against DHTMessageDispatcherImpl.h.

namespace aria2 {

// Stores the tracker used to register sent messages and initializes
// the default per-message timeout to DHT_MESSAGE_TIMEOUT.
DHTMessageDispatcherImpl::DHTMessageDispatcherImpl(
    const std::shared_ptr<DHTMessageTracker>& tracker)
    : tracker_{tracker}, timeout_{DHT_MESSAGE_TIMEOUT}
{
}

// Appends a new queue entry built from |message|, |timeout| and
// |callback| to the end of the message queue. Ownership of |message|
// and |callback| is moved into the entry.
void DHTMessageDispatcherImpl::addMessageToQueue(
    std::unique_ptr<DHTMessage> message, time_t timeout,
    std::unique_ptr<DHTMessageCallback> callback)
{
  messageQueue_.push_back(make_unique<DHTMessageEntry>(
      std::move(message), timeout, std::move(callback)));
}

// Same as the three-argument overload, using the dispatcher's default
// timeout (timeout_).
void DHTMessageDispatcherImpl::addMessageToQueue(
    std::unique_ptr<DHTMessage> message,
    std::unique_ptr<DHTMessageCallback> callback)
{
  addMessageToQueue(std::move(message), timeout_, std::move(callback));
}

// Tries to send the message held by |entry|.
//
// Returns false only when entry->message->send() returns false; in
// that case nothing is handed to the tracker. Returns true otherwise,
// including when a RecoverableException was caught.
//
// On a successful send, a non-reply message is registered with the
// tracker together with the entry's timeout, and entry->callback is
// moved out of |entry|.
bool DHTMessageDispatcherImpl::sendMessage(DHTMessageEntry* entry)
{
  try {
    if (!entry->message->send()) {
      return false;
    }
    if (!entry->message->isReply()) {
      tracker_->addMessage(entry->message.get(), entry->timeout,
                           std::move(entry->callback));
    }
    A2_LOG_INFO(fmt("Message sent: %s", entry->message->toString().c_str()));
  }
  catch (RecoverableException& e) {
    A2_LOG_INFO_EX(fmt("Failed to send message: %s",
                       entry->message->toString().c_str()),
                   e);
    // Add the message to DHTMessageTracker with timeout 0 so that it
    // is treated as timed out. Without this, the message would be
    // untracked and some DHTTasks (such as DHTAbstractNodeLookupTask)
    // would never finish.
    if (!entry->message->isReply()) {
      tracker_->addMessage(entry->message.get(), 0,
                           std::move(entry->callback));
    }
  }
  return true;
}

// Sends queued messages in order, stopping at the first entry for
// which sendMessage() returns false. Every entry before that point is
// removed from the queue; the entry that stopped the loop and all
// entries after it stay queued. Finally logs the number of remaining
// entries.
void DHTMessageDispatcherImpl::sendMessages()
{
  auto itr = std::begin(messageQueue_);
  for (; itr != std::end(messageQueue_); ++itr) {
    if (!sendMessage((*itr).get())) {
      break;
    }
  }
  messageQueue_.erase(std::begin(messageQueue_), itr);
  // The cast matches the %lu conversion in the format string.
  A2_LOG_DEBUG(fmt("%lu dht messages remaining in the queue.",
                   static_cast<unsigned long>(messageQueue_.size())));
}

// Returns the number of entries currently held in the message queue.
size_t DHTMessageDispatcherImpl::countMessageInQueue() const
{
  return messageQueue_.size();
}

} // namespace aria2