/* */
#include "DefaultBtMessageDispatcher.h"

#include <algorithm>

#include "prefs.h"
#include "BtAbortOutstandingRequestEvent.h"
#include "BtCancelSendingPieceEvent.h"
#include "BtChokingEvent.h"
#include "BtMessageFactory.h"
#include "message.h"
#include "DownloadContext.h"
#include "PeerStorage.h"
#include "PieceStorage.h"
#include "BtMessage.h"
#include "Peer.h"
#include "Piece.h"
#include "LogFactory.h"
#include "Logger.h"
#include "a2functional.h"
#include "a2algo.h"
#include "RequestGroupMan.h"
#include "RequestGroup.h"
#include "util.h"
#include "fmt.h"
#include "PeerConnection.h"

namespace aria2 {

// Constructs a dispatcher whose pointer members are all null and whose
// cuid_ and requestTimeout_ are 0.
DefaultBtMessageDispatcher::DefaultBtMessageDispatcher()
    : cuid_(0),
      downloadContext_{nullptr},
      peerStorage_{nullptr},
      pieceStorage_{nullptr},
      peerConnection_{nullptr},
      messageFactory_(nullptr),
      requestGroupMan_(nullptr),
      requestTimeout_(0)
{
}

// Only emits a debug log entry.
DefaultBtMessageDispatcher::~DefaultBtMessageDispatcher()
{
  A2_LOG_DEBUG("DefaultBtMessageDispatcher::deleted");
}

// Calls onQueued() on |btMessage| and then appends it to messageQueue_.
void DefaultBtMessageDispatcher::addMessageToQueue(
    const std::shared_ptr<BtMessage>& btMessage)
{
  btMessage->onQueued();
  messageQueue_.push_back(btMessage);
}

// Queues every message of |btMessages|, in order, through the
// single-message overload.
void DefaultBtMessageDispatcher::addMessageToQueue(
    const std::vector<std::shared_ptr<BtMessage> >& btMessages)
{
  for (const auto& btMessage : btMessages) {
    addMessageToQueue(btMessage);
  }
}

// Drains messageQueue_ from the front and sends each message. Uploading
// messages are held back while either the overall or the owning request
// group's upload speed limit is exceeded; the held-back messages are put
// back at the front of the queue, in their original order, afterwards.
void DefaultBtMessageDispatcher::sendMessagesInternal()
{
  std::vector<std::shared_ptr<BtMessage> > tempQueue;
  while (!messageQueue_.empty()) {
    // The message is removed from the queue before send() is invoked.
    std::shared_ptr<BtMessage> msg = messageQueue_.front();
    messageQueue_.pop_front();
    if (msg->isUploading()) {
      if (requestGroupMan_->doesOverallUploadSpeedExceed() ||
          downloadContext_->getOwnerRequestGroup()->doesUploadSpeedExceed()) {
        tempQueue.push_back(msg);
        continue;
      }
    }
    msg->send();
  }
  if (!tempQueue.empty()) {
    messageQueue_.insert(messageQueue_.begin(), tempQueue.begin(),
                         tempQueue.end());
  }
}

// Processes the message queue only while the connection holds fewer than
// A2_IOV_MAX buffer entries; pending data is sent in either case.
void DefaultBtMessageDispatcher::sendMessages()
{
  if (peerConnection_->getBufferEntrySize() < A2_IOV_MAX) {
    sendMessagesInternal();
  }
  peerConnection_->sendPendingData();
}

// Cancel sending piece message to peer.
void DefaultBtMessageDispatcher::doCancelSendingPieceAction(
    size_t index, int32_t begin, int32_t length)
{
  BtCancelSendingPieceEvent event(index, begin, length);

  // Notify a snapshot of the queue rather than the queue itself
  // (presumably because a handler may alter messageQueue_ -- confirm).
  std::vector<std::shared_ptr<BtMessage> > tempQueue(messageQueue_.begin(),
                                                     messageQueue_.end());
  for (const auto& i : tempQueue) {
    i->onCancelSendingPieceEvent(event);
  }
}

// Cancel sending piece message to peer.
// TODO Is this method really necessary?
void DefaultBtMessageDispatcher::doCancelSendingPieceAction(
    const std::shared_ptr<Piece>& piece)
{
}

namespace {
// Logs the deletion of |slot| and cancels its block on |piece|.
// NOTE(review): the cast targets are assumed to be unsigned long (to match
// %lu conversions in MSG_DELETING_REQUEST_SLOT) -- confirm against message.h.
void abortOutstandingRequest(const RequestSlot& slot,
                             const std::shared_ptr<Piece>& piece, cuid_t cuid)
{
  A2_LOG_DEBUG(fmt(MSG_DELETING_REQUEST_SLOT, cuid,
                   static_cast<unsigned long>(slot.getIndex()),
                   slot.getBegin(),
                   static_cast<unsigned long>(slot.getBlockIndex())));
  piece->cancelBlock(slot.getBlockIndex());
}
} // namespace

namespace {
// Predicate that is true for request slots whose piece index equals |index|.
struct FindRequestSlotByIndex {
  size_t index;
  explicit FindRequestSlotByIndex(size_t index) : index(index) {}
  bool operator()(const RequestSlot& slot) const
  {
    return slot.getIndex() == index;
  }
};
} // namespace

// localhost cancels outstanding download requests to the peer.
void DefaultBtMessageDispatcher::doAbortOutstandingRequestAction (const std::shared_ptr& piece) { for(std::deque::iterator itr = requestSlots_.begin(), eoi = requestSlots_.end(); itr != eoi; ++itr) { if((*itr).getIndex() == piece->getIndex()) { abortOutstandingRequest(*itr, piece, cuid_); } } requestSlots_.erase(std::remove_if(requestSlots_.begin(), requestSlots_.end(), FindRequestSlotByIndex(piece->getIndex())), requestSlots_.end()); BtAbortOutstandingRequestEvent event(piece); std::vector > tempQueue (messageQueue_.begin(), messageQueue_.end()); for(const auto& i : tempQueue) { i->onAbortOutstandingRequestEvent(event); } } namespace { class ProcessChokedRequestSlot { private: cuid_t cuid_; std::shared_ptr peer_; PieceStorage* pieceStorage_; public: ProcessChokedRequestSlot (cuid_t cuid, const std::shared_ptr& peer, PieceStorage* pieceStorage) : cuid_(cuid), peer_(peer), pieceStorage_(pieceStorage) {} void operator()(const RequestSlot& slot) const { if(!peer_->isInPeerAllowedIndexSet(slot.getIndex())) { A2_LOG_DEBUG(fmt(MSG_DELETING_REQUEST_SLOT_CHOKED, cuid_, static_cast(slot.getIndex()), slot.getBegin(), static_cast(slot.getBlockIndex()))); std::shared_ptr piece = pieceStorage_->getPiece(slot.getIndex()); piece->cancelBlock(slot.getBlockIndex()); } } }; } // namespace namespace { class FindChokedRequestSlot { private: std::shared_ptr peer_; public: FindChokedRequestSlot(const std::shared_ptr& peer): peer_(peer) {} bool operator()(const RequestSlot& slot) const { return !peer_->isInPeerAllowedIndexSet(slot.getIndex()); } }; } // namespace // localhost received choke message from the peer. void DefaultBtMessageDispatcher::doChokedAction() { std::for_each(requestSlots_.begin(), requestSlots_.end(), ProcessChokedRequestSlot(cuid_, peer_, pieceStorage_)); requestSlots_.erase(std::remove_if(requestSlots_.begin(), requestSlots_.end(), FindChokedRequestSlot(peer_)), requestSlots_.end()); } // localhost dispatched choke message to the peer. 
void DefaultBtMessageDispatcher::doChokingAction() { BtChokingEvent event; std::vector > tempQueue (messageQueue_.begin(), messageQueue_.end()); for(const auto& i : tempQueue) { i->onChokingEvent(event); } } namespace { class ProcessStaleRequestSlot { private: cuid_t cuid_; std::shared_ptr peer_; PieceStorage* pieceStorage_; BtMessageDispatcher* messageDispatcher_; BtMessageFactory* messageFactory_; time_t requestTimeout_; public: ProcessStaleRequestSlot (cuid_t cuid, const std::shared_ptr& peer, PieceStorage* pieceStorage, BtMessageDispatcher* dispatcher, BtMessageFactory* factory, time_t requestTimeout) : cuid_(cuid), peer_(peer), pieceStorage_(pieceStorage), messageDispatcher_(dispatcher), messageFactory_(factory), requestTimeout_(requestTimeout) {} void operator()(const RequestSlot& slot) { if(slot.isTimeout(requestTimeout_)) { A2_LOG_DEBUG(fmt(MSG_DELETING_REQUEST_SLOT_TIMEOUT, cuid_, static_cast(slot.getIndex()), slot.getBegin(), static_cast(slot.getBlockIndex()))); slot.getPiece()->cancelBlock(slot.getBlockIndex()); peer_->snubbing(true); } else if(slot.getPiece()->hasBlock(slot.getBlockIndex())) { A2_LOG_DEBUG(fmt(MSG_DELETING_REQUEST_SLOT_ACQUIRED, cuid_, static_cast(slot.getIndex()), slot.getBegin(), static_cast(slot.getBlockIndex()))); messageDispatcher_->addMessageToQueue (messageFactory_->createCancelMessage(slot.getIndex(), slot.getBegin(), slot.getLength())); } } }; } // namespace namespace { class FindStaleRequestSlot { private: PieceStorage* pieceStorage_; time_t requestTimeout_; public: FindStaleRequestSlot(PieceStorage* pieceStorage, time_t requestTimeout) : pieceStorage_(pieceStorage), requestTimeout_(requestTimeout) {} bool operator()(const RequestSlot& slot) { if(slot.isTimeout(requestTimeout_)) { return true; } else { if(slot.getPiece()->hasBlock(slot.getBlockIndex())) { return true; } else { return false; } } } }; } // namespace void DefaultBtMessageDispatcher::checkRequestSlotAndDoNecessaryThing() { std::for_each(requestSlots_.begin(), 
requestSlots_.end(), ProcessStaleRequestSlot(cuid_, peer_, pieceStorage_, this, messageFactory_, requestTimeout_)); requestSlots_.erase(std::remove_if(requestSlots_.begin(), requestSlots_.end(), FindStaleRequestSlot(pieceStorage_, requestTimeout_)), requestSlots_.end()); } bool DefaultBtMessageDispatcher::isSendingInProgress() { return peerConnection_->getBufferEntrySize(); } namespace { class BlockIndexLess { public: bool operator()(const RequestSlot& lhs, const RequestSlot& rhs) const { if(lhs.getIndex() == rhs.getIndex()) { return lhs.getBlockIndex() < rhs.getBlockIndex(); } else { return lhs.getIndex() < rhs.getIndex(); } } }; } // namespace bool DefaultBtMessageDispatcher::isOutstandingRequest (size_t index, size_t blockIndex) { for(std::deque::const_iterator itr = requestSlots_.begin(), eoi = requestSlots_.end(); itr != eoi; ++itr) { if((*itr).getIndex() == index && (*itr).getBlockIndex() == blockIndex) { return true; } } return false; } RequestSlot DefaultBtMessageDispatcher::getOutstandingRequest (size_t index, int32_t begin, int32_t length) { for(std::deque::const_iterator itr = requestSlots_.begin(), eoi = requestSlots_.end(); itr != eoi; ++itr) { if((*itr).getIndex() == index && (*itr).getBegin() == begin && (*itr).getLength() == length) { return *itr; } } return RequestSlot::nullSlot; } void DefaultBtMessageDispatcher::removeOutstandingRequest (const RequestSlot& slot) { for(std::deque::iterator itr = requestSlots_.begin(), eoi = requestSlots_.end(); itr != eoi; ++itr) { if(*itr == slot) { abortOutstandingRequest(*itr, slot.getPiece(), cuid_); requestSlots_.erase(itr); break; } } } void DefaultBtMessageDispatcher::addOutstandingRequest (const RequestSlot& slot) { requestSlots_.push_back(slot); } size_t DefaultBtMessageDispatcher::countOutstandingUpload() { return std::count_if(messageQueue_.begin(), messageQueue_.end(), std::mem_fn(&BtMessage::isUploading)); } void DefaultBtMessageDispatcher::setPeer(const std::shared_ptr& peer) { peer_ = peer; } void 
DefaultBtMessageDispatcher::setDownloadContext (DownloadContext* downloadContext) { downloadContext_ = downloadContext; } void DefaultBtMessageDispatcher::setPieceStorage(PieceStorage* pieceStorage) { pieceStorage_ = pieceStorage; } void DefaultBtMessageDispatcher::setPeerStorage(PeerStorage* peerStorage) { peerStorage_ = peerStorage; } void DefaultBtMessageDispatcher::setBtMessageFactory(BtMessageFactory* factory) { messageFactory_ = factory; } void DefaultBtMessageDispatcher::setRequestGroupMan(RequestGroupMan* rgman) { requestGroupMan_ = rgman; } } // namespace aria2