/* */
#include "DefaultBtMessageDispatcher.h"

#include <algorithm>

#include "prefs.h"
#include "BtAbortOutstandingRequestEvent.h"
#include "BtCancelSendingPieceEvent.h"
#include "BtChokingEvent.h"
#include "BtMessageFactory.h"
#include "message.h"
#include "DownloadContext.h"
#include "PeerStorage.h"
#include "PieceStorage.h"
#include "BtMessage.h"
#include "Peer.h"
#include "Piece.h"
#include "LogFactory.h"
#include "Logger.h"
#include "a2functional.h"
#include "a2algo.h"
#include "RequestGroupMan.h"
#include "RequestGroup.h"
#include "util.h"

namespace aria2 {

// Starts with cuid 0 and a request timeout of 0; both are expected to be
// configured by the owner afterwards (NOTE(review): the setters for these two
// are not in this file -- confirm in the header).
DefaultBtMessageDispatcher::DefaultBtMessageDispatcher():
  cuid(0),
  requestTimeout(0),
  logger(LogFactory::getInstance()) {}

// Only emits a debug trace; members clean up after themselves.
DefaultBtMessageDispatcher::~DefaultBtMessageDispatcher()
{
  if(logger->debug()) {
    logger->debug("DefaultBtMessageDispatcher::deleted");
  }
}

// Appends one message to the back of the outgoing queue.
// onQueued() is invoked on the message before it is stored.
void DefaultBtMessageDispatcher::addMessageToQueue
(const BtMessageHandle& btMessage)
{
  btMessage->onQueued();
  messageQueue.push_back(btMessage);
}

// Appends every message in btMessages, in order, via the single-message
// overload (so onQueued() is called on each of them).
void DefaultBtMessageDispatcher::addMessageToQueue
(const std::vector<SharedHandle<BtMessage> >& btMessages)
{
  for(std::vector<SharedHandle<BtMessage> >::const_iterator itr =
        btMessages.begin(), eoi = btMessages.end(); itr != eoi; ++itr) {
    addMessageToQueue(*itr);
  }
}

// Drains the outgoing queue from the front.
//
// * An uploading message whose transmission has not started yet is set aside
//   (not sent) while either the overall or the owner RequestGroup's upload
//   speed limit is exceeded.
// * Every other message is sent. If it is still in progress after send(), it
//   is put back at the front and draining stops.
// * Messages that were set aside are re-inserted near the front afterwards.
void DefaultBtMessageDispatcher::sendMessages() {
  // Uploading messages deferred because of the upload speed limit.
  std::vector<SharedHandle<BtMessage> > tempQueue;
  while(!messageQueue.empty()) {
    BtMessageHandle msg = messageQueue.front();
    messageQueue.pop_front();
    if(msg->isUploading() && !msg->isSendingInProgress()) {
      if(_requestGroupMan->doesOverallUploadSpeedExceed() ||
         _downloadContext->getOwnerRequestGroup()->doesUploadSpeedExceed()) {
        tempQueue.push_back(msg);
        continue;
      }
    }
    msg->send();
    if(msg->isUploading()) {
      _peerStorage->updateTransferStatFor(peer);
    }
    if(msg->isSendingInProgress()) {
      // Partially sent: keep it at the head so it is resumed first next time.
      messageQueue.push_front(msg);
      break;
    }
  }
  if(!tempQueue.empty()) {
    // Insert pending message to the front, so that message is likely sent in
    // the same order as it is queued.
    if(!messageQueue.empty() && messageQueue.front()->isSendingInProgress()) {
      // Never place anything ahead of a message that is mid-transmission.
      messageQueue.insert(messageQueue.begin()+1,
                          tempQueue.begin(), tempQueue.end());
    } else {
      messageQueue.insert(messageQueue.begin(),
                          tempQueue.begin(), tempQueue.end());
    }
  }
}

// Cancel sending piece message to peer.
// Delivers a BtCancelSendingPieceEvent(index, begin, length) to every message
// currently queued. A copy of the queue is iterated rather than the queue
// itself (presumably because handlers may modify messageQueue -- confirm).
void DefaultBtMessageDispatcher::doCancelSendingPieceAction(size_t index,
                                                            uint32_t begin,
                                                            size_t length)
{
  BtCancelSendingPieceEvent event(index, begin, length);

  std::vector<SharedHandle<BtMessage> > tempQueue
    (messageQueue.begin(), messageQueue.end());

  forEachMemFunSH(tempQueue.begin(), tempQueue.end(),
                  &BtMessage::onCancelSendingPieceEvent, event);
}

// Cancel sending piece message to peer.
// TODO Is this method really necessary?
// Currently a no-op.
void DefaultBtMessageDispatcher::doCancelSendingPieceAction
(const SharedHandle<Piece>& piece)
{
}

// Functor applied to a RequestSlot that is being dropped: logs the slot at
// debug level and cancels the slot's block on the piece given at construction.
class AbortOutstandingRequest {
private:
  SharedHandle<Piece> _piece;
  cuid_t _cuid;
  Logger* _logger;
public:
  AbortOutstandingRequest(const SharedHandle<Piece>& piece, cuid_t cuid):
    _piece(piece),
    _cuid(cuid),
    _logger(LogFactory::getInstance()) {}

  void operator()(const RequestSlot& slot) const
  {
    if(_logger->debug()) {
      _logger->debug(MSG_DELETING_REQUEST_SLOT,
                     util::itos(_cuid).c_str(),
                     slot.getIndex(),
                     slot.getBlockIndex());
      _logger->debug("index=%d, begin=%d", slot.getIndex(), slot.getBegin());
    }
    _piece->cancelBlock(slot.getBlockIndex());
  }
};

// localhost cancels outstanding download requests to the peer.
void DefaultBtMessageDispatcher::doAbortOutstandingRequestAction (const SharedHandle& piece) { RequestSlot rs(piece->getIndex(), 0, 0, 0); std::deque::iterator first = std::lower_bound(requestSlots.begin(), requestSlots.end(), rs); rs.setIndex(piece->getIndex()+1); std::deque::iterator last = std::lower_bound(requestSlots.begin(), requestSlots.end(), rs); std::for_each(first, last, AbortOutstandingRequest(piece, cuid)); requestSlots.erase(first, last); BtAbortOutstandingRequestEvent event(piece); std::vector > tempQueue (messageQueue.begin(), messageQueue.end()); forEachMemFunSH(tempQueue.begin(), tempQueue.end(), &BtMessage::onAbortOutstandingRequestEvent, event); } class ProcessChokedRequestSlot { private: cuid_t _cuid; SharedHandle _peer; SharedHandle _pieceStorage; Logger* _logger; public: ProcessChokedRequestSlot(cuid_t cuid, const SharedHandle& peer, const SharedHandle& pieceStorage): _cuid(cuid), _peer(peer), _pieceStorage(pieceStorage), _logger(LogFactory::getInstance()) {} void operator()(const RequestSlot& slot) const { if(!_peer->isInPeerAllowedIndexSet(slot.getIndex())) { if(_logger->debug()) { _logger->debug(MSG_DELETING_REQUEST_SLOT_CHOKED, util::itos(_cuid).c_str(), slot.getIndex(), slot.getBlockIndex()); _logger->debug("index=%d, begin=%d", slot.getIndex(), slot.getBegin()); } SharedHandle piece = _pieceStorage->getPiece(slot.getIndex()); piece->cancelBlock(slot.getBlockIndex()); } } }; class FindChokedRequestSlot { private: SharedHandle _peer; public: FindChokedRequestSlot(const SharedHandle& peer): _peer(peer) {} bool operator()(const RequestSlot& slot) const { return !_peer->isInPeerAllowedIndexSet(slot.getIndex()); } }; // localhost received choke message from the peer. 
void DefaultBtMessageDispatcher::doChokedAction() { std::for_each(requestSlots.begin(), requestSlots.end(), ProcessChokedRequestSlot(cuid, peer, _pieceStorage)); requestSlots.erase(std::remove_if(requestSlots.begin(), requestSlots.end(), FindChokedRequestSlot(peer)), requestSlots.end()); } // localhost dispatched choke message to the peer. void DefaultBtMessageDispatcher::doChokingAction() { BtChokingEvent event; std::vector > tempQueue (messageQueue.begin(), messageQueue.end()); forEachMemFunSH(tempQueue.begin(), tempQueue.end(), &BtMessage::onChokingEvent, event); } class ProcessStaleRequestSlot { private: cuid_t _cuid; SharedHandle _peer; SharedHandle _pieceStorage; BtMessageDispatcher* _messageDispatcher; WeakHandle _messageFactory; time_t _requestTimeout; Logger* _logger; public: ProcessStaleRequestSlot(cuid_t cuid, const SharedHandle& peer, const SharedHandle& pieceStorage, BtMessageDispatcher* dispatcher, const WeakHandle& factory, time_t requestTimeout): _cuid(cuid), _peer(peer), _pieceStorage(pieceStorage), _messageDispatcher(dispatcher), _messageFactory(factory), _requestTimeout(requestTimeout), _logger(LogFactory::getInstance()) {} void operator()(const RequestSlot& slot) { if(slot.isTimeout(_requestTimeout)) { if(_logger->debug()) { _logger->debug(MSG_DELETING_REQUEST_SLOT_TIMEOUT, util::itos(_cuid).c_str(), slot.getBlockIndex()); _logger->debug("index=%d, begin=%d", slot.getIndex(), slot.getBegin()); } slot.getPiece()->cancelBlock(slot.getBlockIndex()); _peer->snubbing(true); } else if(slot.getPiece()->hasBlock(slot.getBlockIndex())) { if(_logger->debug()) { _logger->debug(MSG_DELETING_REQUEST_SLOT_ACQUIRED, util::itos(_cuid).c_str(), slot.getBlockIndex()); _logger->debug("index=%d, begin=%d", slot.getIndex(), slot.getBegin()); } _messageDispatcher->addMessageToQueue (_messageFactory->createCancelMessage(slot.getIndex(), slot.getBegin(), slot.getLength())); } } }; class FindStaleRequestSlot { private: SharedHandle _pieceStorage; time_t _requestTimeout; 
public: FindStaleRequestSlot(const SharedHandle& pieceStorage, time_t requestTimeout): _pieceStorage(pieceStorage), _requestTimeout(requestTimeout) {} bool operator()(const RequestSlot& slot) { if(slot.isTimeout(_requestTimeout)) { return true; } else { if(slot.getPiece()->hasBlock(slot.getBlockIndex())) { return true; } else { return false; } } } }; void DefaultBtMessageDispatcher::checkRequestSlotAndDoNecessaryThing() { std::for_each(requestSlots.begin(), requestSlots.end(), ProcessStaleRequestSlot(cuid, peer, _pieceStorage, this, messageFactory, requestTimeout)); requestSlots.erase(std::remove_if(requestSlots.begin(), requestSlots.end(), FindStaleRequestSlot(_pieceStorage, requestTimeout)), requestSlots.end()); } bool DefaultBtMessageDispatcher::isSendingInProgress() { if(messageQueue.size() > 0) { return messageQueue.front()->isSendingInProgress(); } else { return false; } } class BlockIndexLess { public: bool operator()(const RequestSlot& lhs, const RequestSlot& rhs) const { if(lhs.getIndex() == rhs.getIndex()) { return lhs.getBlockIndex() < rhs.getBlockIndex(); } else { return lhs.getIndex() < rhs.getIndex(); } } }; bool DefaultBtMessageDispatcher::isOutstandingRequest(size_t index, size_t blockIndex) { RequestSlot rs(index, 0, 0, blockIndex); std::deque::iterator i = std::lower_bound(requestSlots.begin(), requestSlots.end(), rs, BlockIndexLess()); return i != requestSlots.end() && (*i).getIndex() == index && (*i).getBlockIndex() == blockIndex; } RequestSlot DefaultBtMessageDispatcher::getOutstandingRequest(size_t index, uint32_t begin, size_t length) { RequestSlot ret; RequestSlot rs(index, begin, length, 0); std::deque::iterator i = std::lower_bound(requestSlots.begin(), requestSlots.end(), rs); if(i != requestSlots.end() && (*i) == rs) { ret = *i; } else { ret = RequestSlot::nullSlot; } return ret; } void DefaultBtMessageDispatcher::removeOutstandingRequest(const RequestSlot& slot) { std::deque::iterator i = std::lower_bound(requestSlots.begin(), 
requestSlots.end(), slot); if(i != requestSlots.end() && (*i) == slot) { AbortOutstandingRequest(slot.getPiece(), cuid)(*i); requestSlots.erase(i); } } void DefaultBtMessageDispatcher::addOutstandingRequest(const RequestSlot& slot) { std::deque::iterator i = std::lower_bound(requestSlots.begin(), requestSlots.end(), slot); if(i == requestSlots.end() || (*i) != slot) { requestSlots.insert(i, slot); } } size_t DefaultBtMessageDispatcher::countOutstandingUpload() { return std::count_if(messageQueue.begin(), messageQueue.end(), mem_fun_sh(&BtMessage::isUploading)); } void DefaultBtMessageDispatcher::setPeer(const SharedHandle& peer) { this->peer = peer; } void DefaultBtMessageDispatcher::setDownloadContext (const SharedHandle& downloadContext) { _downloadContext = downloadContext; } void DefaultBtMessageDispatcher::setPieceStorage (const SharedHandle& pieceStorage) { _pieceStorage = pieceStorage; } void DefaultBtMessageDispatcher::setPeerStorage (const SharedHandle& peerStorage) { _peerStorage = peerStorage; } void DefaultBtMessageDispatcher::setBtMessageFactory(const WeakHandle& factory) { this->messageFactory = factory; } void DefaultBtMessageDispatcher::setRequestGroupMan (const WeakHandle& rgman) { _requestGroupMan = rgman; } } // namespace aria2