/* */
// NOTE(review): this file had lost its line breaks and the contents of every
// <...> (template arguments and the two system headers). They are restored
// below from how each value is used in this file; confirm them against
// DHTAbstractNodeLookupTask.h, in particular the WeakHandle/SharedHandle
// arguments used for the callback in sendMessage().
#include "DHTAbstractNodeLookupTask.h"
#include "DHTRoutingTable.h"
#include "DHTMessageDispatcher.h"
#include "DHTMessageFactory.h"
#include "DHTMessage.h"
#include "DHTNode.h"
#include "DHTNodeLookupEntry.h"
#include "DHTMessageCallbackImpl.h"
#include "DHTBucket.h"
#include "LogFactory.h"
#include "Logger.h"
#include "Util.h"
#include "DHTIDCloser.h"
#include <cstring>
#include <algorithm>

namespace aria2 {

// Creates a lookup task for the ID stored at targetID.
//
// DHT_ID_LENGTH bytes are copied from targetID into _targetID, so targetID
// must point to at least that many readable bytes. The in-flight message
// counter starts at zero.
DHTAbstractNodeLookupTask::DHTAbstractNodeLookupTask(const unsigned char* targetID):
  _inFlightMessage(0)
{
  memcpy(_targetID, targetID, DHT_ID_LENGTH);
}

// Handles a reply to one of the messages sent by this task.
//
// In order, this:
//  1. decrements the in-flight counter and forwards the message to
//     onReceivedInternal();
//  2. converts the nodes returned by getNodesFromMessage() into lookup
//     entries and prepends them to _entries, skipping any node whose ID
//     equals the local node's ID;
//  3. stable-sorts _entries with DHTIDCloser(_targetID), removes adjacent
//     equal entries and keeps at most the first DHTBucket::K of them;
//  4. sends further messages if needsAdditionalOutgoingMessage() says so;
//  5. marks the task finished (after onFinish() and updateBucket()) once no
//     message is in flight.
void DHTAbstractNodeLookupTask::onReceived(const SharedHandle<DHTMessage>& message)
{
  --_inFlightMessage;
  onReceivedInternal(message);

  std::deque<SharedHandle<DHTNodeLookupEntry> > newEntries =
    toEntries(getNodesFromMessage(message));
  size_t count = 0;
  for(std::deque<SharedHandle<DHTNodeLookupEntry> >::const_iterator i =
        newEntries.begin(); i != newEntries.end(); ++i) {
    // Never add ourselves as a lookup candidate.
    if(memcmp(_localNode->getID(), (*i)->_node->getID(), DHT_ID_LENGTH) != 0) {
      _entries.push_front(*i);
      ++count;
    }
  }
  // debug() is used printf-style in this file; "%u" consumes an unsigned int,
  // so a size_t argument must be converted explicitly (the two types differ
  // in width on LP64 platforms).
  _logger->debug("%u node lookup entries added.",
                 static_cast<unsigned int>(count));

  std::stable_sort(_entries.begin(), _entries.end(), DHTIDCloser(_targetID));
  // std::unique only removes *adjacent* duplicates, hence the sort above.
  _entries.erase(std::unique(_entries.begin(), _entries.end()), _entries.end());
  _logger->debug("%u node lookup entries are unique.",
                 static_cast<unsigned int>(_entries.size()));

  // Keep only the first K entries of the sorted list.
  if(_entries.size() > DHTBucket::K) {
    _entries.erase(_entries.begin()+DHTBucket::K, _entries.end());
  }

  if(needsAdditionalOutgoingMessage()) {
    sendMessage();
  }
  if(_inFlightMessage == 0) {
    _logger->debug("Finished node_lookup for node ID %s",
                   Util::toHex(_targetID, DHT_ID_LENGTH).c_str());
    onFinish();
    updateBucket();
    _finished = true;
  }
}

// Handles the timeout of a message that was sent to node.
//
// Decrements the in-flight counter, removes the first entry in _entries whose
// _node compares equal to node (if any), sends further messages if
// needsAdditionalOutgoingMessage() says so, and marks the task finished
// (after onFinish() and updateBucket()) once no message is in flight.
void DHTAbstractNodeLookupTask::onTimeout(const SharedHandle<DHTNode>& node)
{
  _logger->debug("node lookup message timeout for node ID=%s",
                 Util::toHex(node->getID(), DHT_ID_LENGTH).c_str());
  --_inFlightMessage;

  for(std::deque<SharedHandle<DHTNodeLookupEntry> >::iterator i =
        _entries.begin(); i != _entries.end(); ++i) {
    if((*i)->_node == node) {
      // erase() invalidates i, so stop iterating immediately.
      _entries.erase(i);
      break;
    }
  }

  if(needsAdditionalOutgoingMessage()) {
    sendMessage();
  }
  if(_inFlightMessage == 0) {
    _logger->debug("Finished node_lookup for node ID %s",
                   Util::toHex(_targetID, DHT_ID_LENGTH).c_str());
    onFinish();
    updateBucket();
    _finished = true;
  }
}

// Sends messages to entries that have not been contacted yet.
//
// Walks _entries from the front and, for each entry whose _used flag is
// false, marks it used, builds a message with createMessage() and queues it
// on _dispatcher with a callback that reports back to this task. Stops when
// the end of _entries is reached or ALPHA messages are in flight.
void DHTAbstractNodeLookupTask::sendMessage()
{
  for(std::deque<SharedHandle<DHTNodeLookupEntry> >::iterator i =
        _entries.begin();
      i != _entries.end() && _inFlightMessage < ALPHA; ++i) {
    if((*i)->_used == false) {
      ++_inFlightMessage;
      (*i)->_used = true;
      SharedHandle<DHTMessage> m = createMessage((*i)->_node);
      WeakHandle<DHTMessageCallbackListener> listener(this);
      SharedHandle<DHTMessageCallback> callback
        (new DHTMessageCallbackImpl(listener));
      _dispatcher->addMessageToQueue(m, callback);
    }
  }
}

// Called when the lookup finishes, right after onFinish(). Currently a no-op.
void DHTAbstractNodeLookupTask::updateBucket()
{
  // TODO do we have to do something here?
}

// Starts the lookup.
//
// Seeds _entries from _routingTable->getClosestKNodes(_targetID) and sends
// the first round of messages. The task is marked finished immediately when
// there are no seed entries or when sendMessage() did not send anything.
void DHTAbstractNodeLookupTask::startup()
{
  _entries = toEntries(_routingTable->getClosestKNodes(_targetID));
  if(_entries.empty()) {
    _finished = true;
  } else {
    // TODO use RTT here
    _inFlightMessage = 0;
    sendMessage();
    if(_inFlightMessage == 0) {
      _logger->debug("No message was sent in this lookup stage. Finished.");
      _finished = true;
    }
  }
}

// Wraps each node in nodes in a newly allocated DHTNodeLookupEntry.
//
// Returns the entries in the same order as the input nodes; an empty input
// yields an empty deque.
std::deque<SharedHandle<DHTNodeLookupEntry> >
DHTAbstractNodeLookupTask::toEntries
(const std::deque<SharedHandle<DHTNode> >& nodes) const
{
  std::deque<SharedHandle<DHTNodeLookupEntry> > entries;
  for(std::deque<SharedHandle<DHTNode> >::const_iterator i = nodes.begin();
      i != nodes.end(); ++i) {
    SharedHandle<DHTNodeLookupEntry> e(new DHTNodeLookupEntry(*i));
    entries.push_back(e);
  }
  return entries;
}

} // namespace aria2