/* */ #include "DHTMessageTracker.h" #include "DHTMessage.h" #include "DHTMessageCallback.h" #include "DHTMessageTrackerEntry.h" #include "DHTNode.h" #include "DHTRoutingTable.h" #include "DHTMessageFactory.h" #include "Util.h" #include "LogFactory.h" #include "Dictionary.h" #include "Data.h" #include "DlAbortEx.h" #include "DHTConstants.h" DHTMessageTracker::DHTMessageTracker(): _routingTable(0), _factory(0), _logger(LogFactory::getInstance()) {} DHTMessageTracker::~DHTMessageTracker() {} void DHTMessageTracker::addMessage(const DHTMessageHandle& message, time_t timeout, const DHTMessageCallbackHandle& callback) { _entries.push_back(new DHTMessageTrackerEntry(message, timeout, callback)); } void DHTMessageTracker::addMessage(const DHTMessageHandle& message, const DHTMessageCallbackHandle& callback) { addMessage(message, DHT_MESSAGE_TIMEOUT, callback); } pair DHTMessageTracker::messageArrived(const Dictionary* d, const string& ipaddr, uint16_t port) { const Data* tid = dynamic_cast(d->get("t")); if(!tid) { throw new DlAbortEx("Malformed DHT message. From:%s:%u", ipaddr.c_str(), port); } _logger->debug("Searching tracker entry for TransactionID=%s, Remote=%s:%u", Util::toHex(tid->toString()).c_str(), ipaddr.c_str(), port); for(DHTMessageTrackerEntries::iterator i = _entries.begin(); i != _entries.end(); ++i) { if((*i)->match(tid->toString(), ipaddr, port)) { DHTMessageTrackerEntryHandle entry = *i; _entries.erase(i); _logger->debug("Tracker entry found."); DHTNodeHandle targetNode = entry->getTargetNode(); DHTMessageHandle message = _factory->createResponseMessage(entry->getMessageType(), d, targetNode); int64_t rtt = entry->getElapsedMillis(); _logger->debug("RTT is %s", Util::llitos(rtt).c_str()); targetNode->updateRTT(rtt); DHTMessageCallbackHandle callback = entry->getCallback(); return pair(message, callback); } } _logger->debug("Tracker entry not found."); return pair(0, 0); } void DHTMessageTracker::handleTimeout() { for(DHTMessageTrackerEntries::iterator i = _entries.begin(); i != _entries.end();) { if((*i)->isTimeout()) { DHTMessageTrackerEntryHandle entry = *i; i = _entries.erase(i); DHTNodeHandle node = entry->getTargetNode(); _logger->debug("Message timeout: To:%s:%u", node->getIPAddress().c_str(), node->getPort()); node->updateRTT(entry->getElapsedMillis()); node->timeout(); if(node->isBad()) { _logger->debug("Marked bad: %s:%u", node->getIPAddress().c_str(), node->getPort()); _routingTable->dropNode(node); } DHTMessageCallbackHandle callback = entry->getCallback(); if(!callback.isNull()) { callback->onTimeout(node); } } else { ++i; } } } DHTMessageTrackerEntryHandle DHTMessageTracker::getEntryFor(const DHTMessageHandle& message) const { for(DHTMessageTrackerEntries::const_iterator i = _entries.begin(); i != _entries.end(); ++i) { if((*i)->match(message->getTransactionID(), message->getRemoteNode()->getIPAddress(), message->getRemoteNode()->getPort())) { return *i; } } return 0; } size_t DHTMessageTracker::countEntry() const { return _entries.size(); } void DHTMessageTracker::setRoutingTable(const DHTRoutingTableHandle& routingTable) { _routingTable = routingTable; } void DHTMessageTracker::setMessageFactory(const DHTMessageFactoryHandle& factory) { _factory = factory; }