From babdcb2c7d249b19ad38a07158e829f52a08e75a Mon Sep 17 00:00:00 2001 From: Tatsuhiro Tsujikawa Date: Sun, 10 Jul 2016 22:42:49 +0900 Subject: [PATCH] Change have entry indexing method Now use increasing sequence of integer rather than timer value. --- src/BtPieceMessage.cc | 3 +- src/DefaultBtInteractive.cc | 8 ++--- src/DefaultBtInteractive.h | 4 ++- src/DefaultPieceStorage.cc | 58 +++++++++++++++++++-------------- src/DefaultPieceStorage.h | 38 ++++++++++----------- src/HaveEraseCommand.cc | 13 ++++++-- src/PieceStorage.h | 19 ++++++----- src/RequestGroup.cc | 2 +- src/SegmentMan.cc | 3 +- src/UnknownLengthPieceStorage.h | 26 +++++---------- test/DefaultPieceStorageTest.cc | 56 +++++++++++++++++++++++++++++++ test/MockPieceStorage.h | 14 ++++---- 12 files changed, 156 insertions(+), 88 deletions(-) diff --git a/src/BtPieceMessage.cc b/src/BtPieceMessage.cc index 8a8c5607..cb0c2eb5 100644 --- a/src/BtPieceMessage.cc +++ b/src/BtPieceMessage.cc @@ -277,7 +277,8 @@ void BtPieceMessage::onNewPiece(const std::shared_ptr& piece) A2_LOG_INFO(fmt(MSG_GOT_NEW_PIECE, getCuid(), static_cast(piece->getIndex()))); getPieceStorage()->completePiece(piece); - getPieceStorage()->advertisePiece(getCuid(), piece->getIndex()); + getPieceStorage()->advertisePiece(getCuid(), piece->getIndex(), + global::wallclock()); } void BtPieceMessage::onWrongPiece(const std::shared_ptr& piece) diff --git a/src/DefaultBtInteractive.cc b/src/DefaultBtInteractive.cc index 10f3b956..6c125e12 100644 --- a/src/DefaultBtInteractive.cc +++ b/src/DefaultBtInteractive.cc @@ -92,8 +92,8 @@ DefaultBtInteractive::DefaultBtInteractive( peer_(peer), metadataGetMode_(false), localNode_(nullptr), + lastHaveIndex_(0), allowedFastSetSize_(10), - haveTimer_(global::wallclock()), keepAliveTimer_(global::wallclock()), floodingTimer_(global::wallclock()), inactiveTimer_(global::wallclock()), @@ -172,7 +172,6 @@ DefaultBtInteractive::receiveAndSendHandshake() void DefaultBtInteractive::doPostHandshakeProcessing() { // Set time 0 to haveTimer to cache http/ftp download piece completion - haveTimer_ = Timer::zero(); keepAliveTimer_ = global::wallclock(); floodingTimer_ = global::wallclock(); pexTimer_ = Timer::zero(); @@ -267,8 +266,9 @@ void DefaultBtInteractive::checkHave() { std::vector haveIndexes; - pieceStorage_->getAdvertisedPieceIndexes(haveIndexes, cuid_, haveTimer_); - haveTimer_ = global::wallclock(); + lastHaveIndex_ = pieceStorage_->getAdvertisedPieceIndexes(haveIndexes, cuid_, + lastHaveIndex_); + // Use bitfield message if it is equal to or less than the total // size of have messages. if (5 + pieceStorage_->getBitfieldLength() <= haveIndexes.size() * 9) { diff --git a/src/DefaultBtInteractive.h b/src/DefaultBtInteractive.h index 765201f7..73a684a8 100644 --- a/src/DefaultBtInteractive.h +++ b/src/DefaultBtInteractive.h @@ -124,8 +124,10 @@ private: DHTNode* localNode_; + // The last haveIndex we have advertised to the peer. + uint64_t lastHaveIndex_; + size_t allowedFastSetSize_; - Timer haveTimer_; Timer keepAliveTimer_; Timer floodingTimer_; FloodingStat floodingStat_; diff --git a/src/DefaultPieceStorage.cc b/src/DefaultPieceStorage.cc index a13dc19f..91b9aef5 100644 --- a/src/DefaultPieceStorage.cc +++ b/src/DefaultPieceStorage.cc @@ -85,6 +85,10 @@ DefaultPieceStorage::DefaultPieceStorage( endGame_(false), endGamePieceNum_(END_GAME_PIECE_NUM), option_(option), + // The DefaultBtInteractive has the default value of + // lastHaveIndex of 0, so we need to make nextHaveIndex_ more + // than that. + nextHaveIndex_(1), pieceStatMan_(std::make_shared( downloadContext->getNumPieces(), true)), pieceSelector_(make_unique(pieceStatMan_)), @@ -698,40 +702,44 @@ int32_t DefaultPieceStorage::getPieceLength(size_t index) return bitfieldMan_->getBlockLength(index); } -void DefaultPieceStorage::advertisePiece(cuid_t cuid, size_t index) +void DefaultPieceStorage::advertisePiece(cuid_t cuid, size_t index, + Timer registeredTime) { - HaveEntry entry(cuid, index, global::wallclock()); - haves_.push_front(entry); + haves_.emplace_back(nextHaveIndex_++, cuid, index, std::move(registeredTime)); } -void DefaultPieceStorage::getAdvertisedPieceIndexes( - std::vector& indexes, cuid_t myCuid, const Timer& lastCheckTime) +uint64_t DefaultPieceStorage::getAdvertisedPieceIndexes( + std::vector& indexes, cuid_t myCuid, uint64_t lastHaveIndex) { - for (std::deque::const_iterator itr = haves_.begin(), - eoi = haves_.end(); - itr != eoi; ++itr) { - const HaveEntry& have = *itr; - if (lastCheckTime > have.getRegisteredTime()) { - break; - } - indexes.push_back(have.getIndex()); + auto it = + std::upper_bound(std::begin(haves_), std::end(haves_), lastHaveIndex, + [](uint64_t lastHaveIndex, const HaveEntry& have) { + return lastHaveIndex < have.haveIndex; + }); + + if (it == std::end(haves_)) { + return lastHaveIndex; } + + for (; it != std::end(haves_); ++it) { + indexes.push_back((*it).index); + } + + return (*(std::end(haves_) - 1)).haveIndex; } -void DefaultPieceStorage::removeAdvertisedPiece( - const std::chrono::seconds& elapsed) +void DefaultPieceStorage::removeAdvertisedPiece(const Timer& expiry) { - auto itr = std::find_if( - std::begin(haves_), std::end(haves_), [&elapsed](const HaveEntry& have) { - return have.getRegisteredTime().difference(global::wallclock()) >= - elapsed; - }); + auto it = std::upper_bound(std::begin(haves_), std::end(haves_), expiry, + [](const Timer& expiry, const HaveEntry& have) { + return expiry < have.registeredTime; + }); - if (itr != std::end(haves_)) { - A2_LOG_DEBUG(fmt(MSG_REMOVED_HAVE_ENTRY, - static_cast(std::end(haves_) - itr))); - haves_.erase(itr, std::end(haves_)); - } + A2_LOG_DEBUG( + fmt(MSG_REMOVED_HAVE_ENTRY, + static_cast(std::distance(std::begin(haves_), it)))); + + haves_.erase(std::begin(haves_), it); } void DefaultPieceStorage::markAllPiecesDone() { bitfieldMan_->setAllBit(); } diff --git a/src/DefaultPieceStorage.h b/src/DefaultPieceStorage.h index ca95513f..e41736ab 100644 --- a/src/DefaultPieceStorage.h +++ b/src/DefaultPieceStorage.h @@ -55,23 +55,19 @@ class StreamPieceSelector; #define END_GAME_PIECE_NUM 20 -class HaveEntry { -private: - cuid_t cuid_; - size_t index_; - Timer registeredTime_; - -public: - HaveEntry(cuid_t cuid, size_t index, const Timer& registeredTime) - : cuid_(cuid), index_(index), registeredTime_(registeredTime) +struct HaveEntry { + HaveEntry(uint64_t haveIndex, cuid_t cuid, size_t index, Timer registeredTime) + : haveIndex(haveIndex), + cuid(cuid), + index(index), + registeredTime(std::move(registeredTime)) { } - cuid_t getCuid() const { return cuid_; } - - size_t getIndex() const { return index_; } - - const Timer& getRegisteredTime() const { return registeredTime_; } + uint64_t haveIndex; + cuid_t cuid; + size_t index; + Timer registeredTime; }; class DefaultPieceStorage : public PieceStorage { @@ -87,6 +83,10 @@ private: bool endGame_; size_t endGamePieceNum_; const Option* option_; + + // The next unique index on HaveEntry, which is ever strictly + // increasing sequence of integer. + uint64_t nextHaveIndex_; std::deque haves_; std::shared_ptr pieceStatMan_; @@ -238,14 +238,14 @@ public: virtual int32_t getPieceLength(size_t index) CXX11_OVERRIDE; - virtual void advertisePiece(cuid_t cuid, size_t index) CXX11_OVERRIDE; + virtual void advertisePiece(cuid_t cuid, size_t index, + Timer registeredTime) CXX11_OVERRIDE; - virtual void + virtual uint64_t getAdvertisedPieceIndexes(std::vector& indexes, cuid_t myCuid, - const Timer& lastCheckTime) CXX11_OVERRIDE; + uint64_t lastHaveIndex) CXX11_OVERRIDE; - virtual void - removeAdvertisedPiece(const std::chrono::seconds& elapsed) CXX11_OVERRIDE; + virtual void removeAdvertisedPiece(const Timer& expiry) CXX11_OVERRIDE; virtual void markAllPiecesDone() CXX11_OVERRIDE; diff --git a/src/HaveEraseCommand.cc b/src/HaveEraseCommand.cc index 30bacfc4..51ef9a53 100644 --- a/src/HaveEraseCommand.cc +++ b/src/HaveEraseCommand.cc @@ -37,6 +37,7 @@ #include "RequestGroupMan.h" #include "PieceStorage.h" #include "RequestGroup.h" +#include "wallclock.h" namespace aria2 { @@ -58,13 +59,19 @@ void HaveEraseCommand::preProcess() void HaveEraseCommand::process() { - const RequestGroupList& groups = + // we are making a copy of current wallclock. + auto expiry = global::wallclock(); + expiry.advance(5_s); + + const auto& groups = getDownloadEngine()->getRequestGroupMan()->getRequestGroups(); for (auto& group : groups) { const auto& ps = group->getPieceStorage(); - if (ps) { - ps->removeAdvertisedPiece(5_s); + if (!ps) { + continue; } + + ps->removeAdvertisedPiece(expiry); } } diff --git a/src/PieceStorage.h b/src/PieceStorage.h index a27b336c..6e7066de 100644 --- a/src/PieceStorage.h +++ b/src/PieceStorage.h @@ -235,21 +235,22 @@ public: * Adds piece index to advertise to other commands. They send have message * based on this information. */ - virtual void advertisePiece(cuid_t cuid, size_t index) = 0; + virtual void advertisePiece(cuid_t cuid, size_t index, + Timer registerdTime) = 0; /** - * indexes is filled with piece index which is not advertised by the caller - * command and newer than lastCheckTime. + * indexes is filled with piece index which is not advertised by the + * caller command and newer than lastHaveIndex. */ - virtual void getAdvertisedPieceIndexes(std::vector& indexes, - cuid_t myCuid, - const Timer& lastCheckTime) = 0; + virtual uint64_t getAdvertisedPieceIndexes(std::vector& indexes, + cuid_t myCuid, + uint64_t lastHaveIndex) = 0; /** - * Removes have entry if specified seconds have elapsed since its - * registration. + * Removes have entry if its registeredTime is at least as old as + * expiry. */ - virtual void removeAdvertisedPiece(const std::chrono::seconds& elapsed) = 0; + virtual void removeAdvertisedPiece(const Timer& expiry) = 0; /** * Sets all bits in bitfield to 1. diff --git a/src/RequestGroup.cc b/src/RequestGroup.cc index 43394d47..c89dacbf 100644 --- a/src/RequestGroup.cc +++ b/src/RequestGroup.cc @@ -997,7 +997,7 @@ void RequestGroup::releaseRuntimeResource(DownloadEngine* e) peerStorage_ = nullptr; #endif // ENABLE_BITTORRENT if (pieceStorage_) { - pieceStorage_->removeAdvertisedPiece(0_s); + pieceStorage_->removeAdvertisedPiece(Timer::zero()); } // Don't reset segmentMan_ and pieceStorage_ here to provide // progress information via RPC diff --git a/src/SegmentMan.cc b/src/SegmentMan.cc index c3167285..65748e1c 100644 --- a/src/SegmentMan.cc +++ b/src/SegmentMan.cc @@ -374,7 +374,8 @@ bool SegmentMan::completeSegment(cuid_t cuid, const std::shared_ptr& segment) { pieceStorage_->completePiece(segment->getPiece()); - pieceStorage_->advertisePiece(cuid, segment->getPiece()->getIndex()); + pieceStorage_->advertisePiece(cuid, segment->getPiece()->getIndex(), + global::wallclock()); auto itr = std::find_if(usedSegmentEntries_.begin(), usedSegmentEntries_.end(), FindSegmentEntry(segment)); if (itr == usedSegmentEntries_.end()) { diff --git a/src/UnknownLengthPieceStorage.h b/src/UnknownLengthPieceStorage.h index 00d79a45..3263870f 100644 --- a/src/UnknownLengthPieceStorage.h +++ b/src/UnknownLengthPieceStorage.h @@ -223,30 +223,22 @@ public: virtual int32_t getPieceLength(size_t index) CXX11_OVERRIDE; - /** - * Adds piece index to advertise to other commands. They send have message - * based on this information. - */ - virtual void advertisePiece(cuid_t cuid, size_t index) CXX11_OVERRIDE {} + virtual void advertisePiece(cuid_t cuid, size_t index, + Timer registeredTime) CXX11_OVERRIDE + { + } /** - * Returns piece index which is not advertised by the caller command and - * newer than lastCheckTime. + * indexes is filled with piece index which is not advertised by the + * caller command and newer than lastHaveIndex. */ - virtual void + virtual uint64_t getAdvertisedPieceIndexes(std::vector& indexes, cuid_t myCuid, - const Timer& lastCheckTime) CXX11_OVERRIDE + uint64_t lastHaveIndex) CXX11_OVERRIDE { } - /** - * Removes have entry if specified seconds have elapsed since its - * registration. - */ - virtual void - removeAdvertisedPiece(const std::chrono::seconds& elapsed) CXX11_OVERRIDE - { - } + virtual void removeAdvertisedPiece(const Timer& expiry) CXX11_OVERRIDE {} /** * Sets all bits in bitfield to 1. diff --git a/test/DefaultPieceStorageTest.cc b/test/DefaultPieceStorageTest.cc index 10162921..cbb3b4bf 100644 --- a/test/DefaultPieceStorageTest.cc +++ b/test/DefaultPieceStorageTest.cc @@ -39,6 +39,7 @@ class DefaultPieceStorageTest : public CppUnit::TestFixture { CPPUNIT_TEST(testGetCompletedLength); CPPUNIT_TEST(testGetFilteredCompletedLength); CPPUNIT_TEST(testGetNextUsedIndex); + CPPUNIT_TEST(testAdvertisePiece); CPPUNIT_TEST_SUITE_END(); private: @@ -77,6 +78,7 @@ public: void testGetCompletedLength(); void testGetFilteredCompletedLength(); void testGetNextUsedIndex(); + void testAdvertisePiece(); }; CPPUNIT_TEST_SUITE_REGISTRATION(DefaultPieceStorageTest); @@ -401,4 +403,58 @@ void DefaultPieceStorageTest::testGetNextUsedIndex() CPPUNIT_ASSERT_EQUAL((size_t)2, pss.getNextUsedIndex(0)); } +void DefaultPieceStorageTest::testAdvertisePiece() +{ + DefaultPieceStorage ps(dctx_, option_.get()); + + ps.advertisePiece(1, 100, Timer(10_s)); + ps.advertisePiece(2, 101, Timer(11_s)); + ps.advertisePiece(3, 102, Timer(11_s)); + ps.advertisePiece(1, 103, Timer(12_s)); + ps.advertisePiece(2, 104, Timer(100_s)); + + std::vector res, ans; + uint64_t lastHaveIndex; + + lastHaveIndex = ps.getAdvertisedPieceIndexes(res, 1, 0); + ans = std::vector{100, 101, 102, 103, 104}; + + CPPUNIT_ASSERT_EQUAL((uint64_t)5, lastHaveIndex); + CPPUNIT_ASSERT(ans == res); + + res.clear(); + lastHaveIndex = ps.getAdvertisedPieceIndexes(res, 1, 3); + ans = std::vector{103, 104}; + + CPPUNIT_ASSERT_EQUAL((uint64_t)5, lastHaveIndex); + CPPUNIT_ASSERT_EQUAL((size_t)2, res.size()); + CPPUNIT_ASSERT(ans == res); + + res.clear(); + lastHaveIndex = ps.getAdvertisedPieceIndexes(res, 1, 5); + + CPPUNIT_ASSERT_EQUAL((uint64_t)5, lastHaveIndex); + CPPUNIT_ASSERT_EQUAL((size_t)0, res.size()); + + // remove haves + + ps.removeAdvertisedPiece(Timer(11_s)); + + res.clear(); + lastHaveIndex = ps.getAdvertisedPieceIndexes(res, 1, 0); + ans = std::vector{103, 104}; + + CPPUNIT_ASSERT_EQUAL((uint64_t)5, lastHaveIndex); + CPPUNIT_ASSERT_EQUAL((size_t)2, res.size()); + CPPUNIT_ASSERT(ans == res); + + ps.removeAdvertisedPiece(Timer(300_s)); + + res.clear(); + lastHaveIndex = ps.getAdvertisedPieceIndexes(res, 1, 0); + + CPPUNIT_ASSERT_EQUAL((uint64_t)0, lastHaveIndex); + CPPUNIT_ASSERT_EQUAL((size_t)0, res.size()); +} + } // namespace aria2 diff --git a/test/MockPieceStorage.h b/test/MockPieceStorage.h index 9350e204..89297363 100644 --- a/test/MockPieceStorage.h +++ b/test/MockPieceStorage.h @@ -241,18 +241,18 @@ public: void addPieceLengthList(int32_t length) { pieceLengthList.push_back(length); } - virtual void advertisePiece(cuid_t cuid, size_t index) CXX11_OVERRIDE {} + virtual void advertisePiece(cuid_t cuid, size_t index, + Timer registeredTime) CXX11_OVERRIDE + { + } - virtual void + virtual uint64_t getAdvertisedPieceIndexes(std::vector& indexes, cuid_t myCuid, - const Timer& lastCheckTime) CXX11_OVERRIDE + uint64_t lastHaveIndex) CXX11_OVERRIDE { } - virtual void - removeAdvertisedPiece(const std::chrono::seconds& elapsed) CXX11_OVERRIDE - { - } + virtual void removeAdvertisedPiece(const Timer& expiry) CXX11_OVERRIDE {} virtual void markAllPiecesDone() CXX11_OVERRIDE {}