diff --git a/ChangeLog b/ChangeLog index 3d773d29..fff4b2a7 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,3 +1,26 @@ +2008-05-17 Tatsuhiro Tsujikawa + + Sort RequestSlot in ascending order and manipulate them using + lower_bound. + * src/DefaultBtMessageDispatcher.cc + * src/DefaultBtMessageDispatcher.h + (getMessageQueue): Added const qualifier. + (getRequestSlots): Added const qualifier. + (sendMessages): Use empty() instead of size(). + (doCancelSendingPieceAction): Use HandleEvent object. + (doAbortOutstandingRequestAction): Rewritten. + (doChokedAction): Rewritten. + (checkRequestSlotAndDoNecessaryThing): Rewritten. + (isOutstandingRequest): Rewritten. + (getOutstandingRequest): Rewritten. + (removeOutstandingRequest): Rewritten. + (addOutstandingRequest): Rewritten. + * src/RequestSlot.cc + * src/RequestSlot.h + (operator=): Rewritten. + (operator!=): New function. + (operator<): New function. + 2008-05-17 Tatsuhiro Tsujikawa * src/DefaultBtRequestFactory.cc diff --git a/src/DefaultBtMessageDispatcher.cc b/src/DefaultBtMessageDispatcher.cc index cfd8032d..35f493a7 100644 --- a/src/DefaultBtMessageDispatcher.cc +++ b/src/DefaultBtMessageDispatcher.cc @@ -81,7 +81,7 @@ void DefaultBtMessageDispatcher::addMessageToQueue(const BtMessages& btMessages) void DefaultBtMessageDispatcher::sendMessages() { BtMessages tempQueue; - while(messageQueue.size() > 0) { + while(!messageQueue.empty()) { BtMessageHandle msg = messageQueue.front(); messageQueue.pop_front(); if(maxUploadSpeedLimit > 0 && @@ -101,6 +101,18 @@ void DefaultBtMessageDispatcher::sendMessages() { std::copy(tempQueue.begin(), tempQueue.end(), std::back_inserter(messageQueue)); } +class HandleEvent { +private: + SharedHandle _event; +public: + HandleEvent(const SharedHandle& event):_event(event) {} + + void operator()(const SharedHandle& msg) const + { + msg->handleEvent(_event); + } +}; + // Cancel sending piece message to peer. void DefaultBtMessageDispatcher::doCancelSendingPieceAction(size_t index, uint32_t begin, size_t length) { @@ -108,9 +120,7 @@ void DefaultBtMessageDispatcher::doCancelSendingPieceAction(size_t index, uint32 (new BtCancelSendingPieceEvent(index, begin, length)); BtMessages tempQueue = messageQueue; - for(BtMessages::iterator itr = tempQueue.begin(); itr != tempQueue.end(); itr++) { - (*itr)->handleEvent(event); - } + std::for_each(tempQueue.begin(), tempQueue.end(), HandleEvent(event)); } // Cancel sending piece message to peer. @@ -119,57 +129,105 @@ void DefaultBtMessageDispatcher::doCancelSendingPieceAction(const PieceHandle& p { } +class AbortOutstandingRequest { +private: + SharedHandle _piece; + int32_t _cuid; + Logger* _logger; +public: + AbortOutstandingRequest(const SharedHandle& piece, int32_t cuid): + _piece(piece), + _cuid(cuid), + _logger(LogFactory::getInstance()) {} + + void operator()(const RequestSlot& slot) const + { + _logger->debug(MSG_DELETING_REQUEST_SLOT, + _cuid, + slot.getIndex(), + slot.getBlockIndex()); + _logger->debug("index=%d, begin=%d", slot.getIndex(), slot.getBegin()); + _piece->cancelBlock(slot.getBlockIndex()); + } +}; + // localhost cancels outstanding download requests to the peer. void DefaultBtMessageDispatcher::doAbortOutstandingRequestAction(const PieceHandle& piece) { - for(RequestSlots::iterator itr = requestSlots.begin(); - itr != requestSlots.end();) { - RequestSlot& slot = *itr; - if(slot.getIndex() == piece->getIndex()) { - logger->debug(MSG_DELETING_REQUEST_SLOT, - cuid, - slot.getIndex(), - slot.getBlockIndex()); - piece->cancelBlock(slot.getBlockIndex()); - itr = requestSlots.erase(itr); - } else { - itr++; - } - } + RequestSlot rs(piece->getIndex(), 0, 0, 0); + std::deque::iterator first = + std::lower_bound(requestSlots.begin(), requestSlots.end(), rs); + + rs.setIndex(piece->getIndex()+1); + std::deque::iterator last = + std::lower_bound(requestSlots.begin(), requestSlots.end(), rs); + + std::for_each(first, last, AbortOutstandingRequest(piece, cuid)); + requestSlots.erase(first, last); BtAbortOutstandingRequestEventHandle event (new BtAbortOutstandingRequestEvent(piece)); BtMessages tempQueue = messageQueue; - for(BtMessages::iterator itr = tempQueue.begin(); itr != tempQueue.end(); ++itr) { - (*itr)->handleEvent(event); - } + std::for_each(tempQueue.begin(), tempQueue.end(), HandleEvent(event)); } +class ProcessChokedRequestSlot { +private: + int32_t _cuid; + SharedHandle _peer; + SharedHandle _pieceStorage; + Logger* _logger; +public: + ProcessChokedRequestSlot(int32_t cuid, + const SharedHandle& peer, + const SharedHandle& pieceStorage): + _cuid(cuid), + _peer(peer), + _pieceStorage(pieceStorage), + _logger(LogFactory::getInstance()) {} + + void operator()(const RequestSlot& slot) const + { + if(!_peer->isInPeerAllowedIndexSet(slot.getIndex())) { + _logger->debug(MSG_DELETING_REQUEST_SLOT_CHOKED, + _cuid, + slot.getIndex(), + slot.getBlockIndex()); + _logger->debug("index=%d, begin=%d", slot.getIndex(), slot.getBegin()); + SharedHandle piece = _pieceStorage->getPiece(slot.getIndex()); + piece->cancelBlock(slot.getBlockIndex()); + } + } + +}; + +class FindChokedRequestSlot { +private: + SharedHandle _peer; +public: + FindChokedRequestSlot(const SharedHandle& peer): + _peer(peer) {} + + bool operator()(const RequestSlot& slot) const + { + return !_peer->isInPeerAllowedIndexSet(slot.getIndex()); + } +}; + // localhost received choke message from the peer. void DefaultBtMessageDispatcher::doChokedAction() { - for(RequestSlots::iterator itr = requestSlots.begin(); - itr != requestSlots.end();) { - RequestSlot& slot = *itr; - if(peer->isInPeerAllowedIndexSet(slot.getIndex())) { - itr++; - } else { - logger->debug(MSG_DELETING_REQUEST_SLOT_CHOKED, - cuid, - slot.getIndex(), - slot.getBlockIndex()); - PieceHandle piece = pieceStorage->getPiece(slot.getIndex()); - piece->cancelBlock(slot.getBlockIndex()); - itr = requestSlots.erase(itr); - } - } + std::for_each(requestSlots.begin(), requestSlots.end(), + ProcessChokedRequestSlot(cuid, peer, pieceStorage)); + + requestSlots.erase(std::remove_if(requestSlots.begin(), requestSlots.end(), + FindChokedRequestSlot(peer)), + requestSlots.end()); BtChokedEventHandle event(new BtChokedEvent()); BtMessages tempQueue = messageQueue; - for(BtMessages::iterator itr = tempQueue.begin(); itr != tempQueue.end(); ++itr) { - (*itr)->handleEvent(event); - } + std::for_each(tempQueue.begin(), tempQueue.end(), HandleEvent(event)); } // localhost dispatched choke message to the peer. @@ -178,36 +236,93 @@ void DefaultBtMessageDispatcher::doChokingAction() BtChokingEventHandle event(new BtChokingEvent()); BtMessages tempQueue = messageQueue; - for(BtMessages::iterator itr = tempQueue.begin(); itr != tempQueue.end(); ++itr) { - (*itr)->handleEvent(event); - } + std::for_each(tempQueue.begin(), tempQueue.end(), HandleEvent(event)); } +class ProcessStaleRequestSlot { +private: + int32_t _cuid; + SharedHandle _peer; + SharedHandle _pieceStorage; + BtMessageDispatcher* _messageDispatcher; + WeakHandle _messageFactory; + time_t _requestTimeout; + Logger* _logger; +public: + ProcessStaleRequestSlot(int32_t cuid, const SharedHandle& peer, + const SharedHandle& pieceStorage, + BtMessageDispatcher* dispatcher, + const WeakHandle& factory, + time_t requestTimeout): + _cuid(cuid), + _peer(peer), + _pieceStorage(pieceStorage), + _messageDispatcher(dispatcher), + _messageFactory(factory), + _requestTimeout(requestTimeout), + _logger(LogFactory::getInstance()) {} + + void operator()(const RequestSlot& slot) + { + SharedHandle piece = _pieceStorage->getPiece(slot.getIndex()); + if(slot.isTimeout(_requestTimeout)) { + _logger->debug(MSG_DELETING_REQUEST_SLOT_TIMEOUT, + _cuid, + slot.getBlockIndex()); + _logger->debug("index=%d, begin=%d", slot.getIndex(), slot.getBegin()); + piece->cancelBlock(slot.getBlockIndex()); + _peer->snubbing(true); + } else if(piece->hasBlock(slot.getBlockIndex())) { + _logger->debug(MSG_DELETING_REQUEST_SLOT_ACQUIRED, + _cuid, + slot.getBlockIndex()); + _logger->debug("index=%d, begin=%d", slot.getIndex(), slot.getBegin()); + _messageDispatcher->addMessageToQueue + (_messageFactory->createCancelMessage(slot.getIndex(), + slot.getBegin(), + slot.getLength())); + } + } +}; + +class FindStaleRequestSlot { +private: + SharedHandle _pieceStorage; + time_t _requestTimeout; +public: + FindStaleRequestSlot(const SharedHandle& pieceStorage, + time_t requestTimeout): + _pieceStorage(pieceStorage), + _requestTimeout(requestTimeout) {} + + bool operator()(const RequestSlot& slot) + { + if(slot.isTimeout(_requestTimeout)) { + return true; + } else { + SharedHandle piece = _pieceStorage->getPiece(slot.getIndex()); + if(piece->hasBlock(slot.getBlockIndex())) { + return true; + } else { + return false; + } + } + } +}; + void DefaultBtMessageDispatcher::checkRequestSlotAndDoNecessaryThing() { - for(RequestSlots::iterator itr = requestSlots.begin(); - itr != requestSlots.end();) { - RequestSlot& slot = *itr; - PieceHandle piece = pieceStorage->getPiece(slot.getIndex()); - if(slot.isTimeout(requestTimeout)) { - logger->debug(MSG_DELETING_REQUEST_SLOT_TIMEOUT, - cuid, - slot.getBlockIndex()); - piece->cancelBlock(slot.getBlockIndex()); - peer->snubbing(true); - itr = requestSlots.erase(itr); - } else if(piece->hasBlock(slot.getBlockIndex())) { - logger->debug(MSG_DELETING_REQUEST_SLOT_ACQUIRED, - cuid, - slot.getBlockIndex()); - addMessageToQueue(messageFactory->createCancelMessage(slot.getIndex(), - slot.getBegin(), - slot.getLength())); - itr = requestSlots.erase(itr); - } else { - itr++; - } - } + std::for_each(requestSlots.begin(), requestSlots.end(), + ProcessStaleRequestSlot(cuid, + peer, + pieceStorage, + this, + messageFactory, + requestTimeout)); + requestSlots.erase(std::remove_if(requestSlots.begin(), requestSlots.end(), + FindStaleRequestSlot(pieceStorage, + requestTimeout)), + requestSlots.end()); } bool DefaultBtMessageDispatcher::isSendingInProgress() @@ -224,42 +339,57 @@ size_t DefaultBtMessageDispatcher::countOutstandingRequest() return requestSlots.size(); } -bool DefaultBtMessageDispatcher::isOutstandingRequest(size_t index, size_t blockIndex) { - for(RequestSlots::const_iterator itr = requestSlots.begin(); - itr != requestSlots.end(); itr++) { - const RequestSlot& slot = *itr; - if(slot.getIndex() == index && slot.getBlockIndex() == blockIndex) { - return true; +class BlockIndexLess { +public: + bool operator()(const RequestSlot& lhs, const RequestSlot& rhs) const + { + if(lhs.getIndex() == rhs.getIndex()) { + return lhs.getBlockIndex() < rhs.getBlockIndex(); + } else { + return lhs.getIndex() < rhs.getIndex(); } } - return false; +}; + +bool DefaultBtMessageDispatcher::isOutstandingRequest(size_t index, size_t blockIndex) { + RequestSlot rs(index, 0, 0, blockIndex); + + std::deque::iterator i = + std::lower_bound(requestSlots.begin(), requestSlots.end(), rs, BlockIndexLess()); + return i != requestSlots.end() && + (*i).getIndex() == index && (*i).getBlockIndex() == blockIndex; } RequestSlot DefaultBtMessageDispatcher::getOutstandingRequest(size_t index, uint32_t begin, size_t length) { - for(RequestSlots::iterator itr = requestSlots.begin(); - itr != requestSlots.end(); itr++) { - if(itr->getIndex() == index && - itr->getBegin() == begin && - itr->getLength() == length) { - return *itr; - } + RequestSlot ret(0, 0, 0, 0); + RequestSlot rs(index, begin, length, 0); + std::deque::iterator i = + std::lower_bound(requestSlots.begin(), requestSlots.end(), rs); + if(i != requestSlots.end() && (*i) == rs) { + ret = *i; + } else { + ret = RequestSlot::nullSlot; } - return RequestSlot::nullSlot; + return ret; } void DefaultBtMessageDispatcher::removeOutstandingRequest(const RequestSlot& slot) { - RequestSlots temp; - std::remove_copy(requestSlots.begin(), requestSlots.end(), std::back_inserter(temp), slot); - requestSlots = temp; + std::deque::iterator i = + std::lower_bound(requestSlots.begin(), requestSlots.end(), slot); + if(i != requestSlots.end() && (*i) == slot) { + requestSlots.erase(i); + } } -void DefaultBtMessageDispatcher::addOutstandingRequest(const RequestSlot& requestSlot) +void DefaultBtMessageDispatcher::addOutstandingRequest(const RequestSlot& slot) { - if(!isOutstandingRequest(requestSlot.getIndex(), requestSlot.getBlockIndex())) { - requestSlots.push_back(requestSlot); + std::deque::iterator i = + std::lower_bound(requestSlots.begin(), requestSlots.end(), slot); + if(i == requestSlots.end() || (*i) != slot) { + requestSlots.insert(i, slot); } } @@ -269,13 +399,13 @@ size_t DefaultBtMessageDispatcher::countOutstandingUpload() mem_fun_sh(&BtMessage::isUploading)); } -std::deque >& +const std::deque >& DefaultBtMessageDispatcher::getMessageQueue() { return messageQueue; } -std::deque& DefaultBtMessageDispatcher::getRequestSlots() +const std::deque& DefaultBtMessageDispatcher::getRequestSlots() { return requestSlots; } diff --git a/src/DefaultBtMessageDispatcher.h b/src/DefaultBtMessageDispatcher.h index 0d63e48c..bfd3925a 100644 --- a/src/DefaultBtMessageDispatcher.h +++ b/src/DefaultBtMessageDispatcher.h @@ -103,9 +103,9 @@ public: virtual size_t countOutstandingUpload(); - std::deque >& getMessageQueue(); + const std::deque >& getMessageQueue(); - RequestSlots& getRequestSlots(); + const RequestSlots& getRequestSlots(); void setPeer(const SharedHandle& peer); diff --git a/src/RequestSlot.cc b/src/RequestSlot.cc index c8d26118..2ceadc67 100644 --- a/src/RequestSlot.cc +++ b/src/RequestSlot.cc @@ -64,9 +64,21 @@ RequestSlot& RequestSlot::operator=(const RequestSlot& requestSlot) bool RequestSlot::operator==(const RequestSlot& requestSlot) const { - return index == requestSlot.index && - begin == requestSlot.begin && - length == requestSlot.length; + return index == requestSlot.index && begin == requestSlot.begin; +} + +bool RequestSlot::operator!=(const RequestSlot& requestSlot) const +{ + return !(*this == requestSlot); +} + +bool RequestSlot::operator<(const RequestSlot& requestSlot) const +{ + if(index == requestSlot.index) { + return begin < requestSlot.begin; + } else { + return index < requestSlot.index; + } } void RequestSlot::setDispatchedTime() { diff --git a/src/RequestSlot.h b/src/RequestSlot.h index 65dd03f1..94ab8f8f 100644 --- a/src/RequestSlot.h +++ b/src/RequestSlot.h @@ -59,6 +59,10 @@ public: bool operator==(const RequestSlot& requestSlot) const; + bool operator!=(const RequestSlot& requestSlot) const; + + bool operator<(const RequestSlot& requestSlot) const; + void setDispatchedTime(); void setDispatchedTime(time_t secFromEpoch); diff --git a/test/DefaultBtMessageDispatcherTest.cc b/test/DefaultBtMessageDispatcherTest.cc index bb8b7398..dc370def 100644 --- a/test/DefaultBtMessageDispatcherTest.cc +++ b/test/DefaultBtMessageDispatcherTest.cc @@ -399,7 +399,7 @@ void DefaultBtMessageDispatcherTest::testGetOutstandingRequest() { CPPUNIT_ASSERT(!RequestSlot::isNull(s2)); RequestSlot s3 = btMessageDispatcher->getOutstandingRequest(1, 1024, 17*1024); - CPPUNIT_ASSERT(RequestSlot::isNull(s3)); + CPPUNIT_ASSERT(!RequestSlot::isNull(s3)); RequestSlot s4 = btMessageDispatcher->getOutstandingRequest(1, 2*1024, 16*1024); CPPUNIT_ASSERT(RequestSlot::isNull(s4));