/* */
#include "SegmentMan.h"

// NOTE(review): the targets of the three system includes below were missing
// from the text under review. <algorithm> is required for std::find_if used
// in this file; <cassert> and <numeric> are presumed -- confirm against
// version control.
#include <cassert>
#include <algorithm>
#include <numeric>

#include "util.h"
#include "message.h"
#include "prefs.h"
#include "PiecedSegment.h"
#include "GrowSegment.h"
#include "LogFactory.h"
#include "Logger.h"
#include "PieceStorage.h"
#include "PeerStat.h"
#include "Option.h"
#include "DownloadContext.h"
#include "Piece.h"
#include "FileEntry.h"
#include "wallclock.h"
#include "fmt.h"
#include "WrDiskCacheEntry.h"
#include "DownloadFailureException.h"

namespace aria2 {

// Binds a segment to the CUID of the command that currently owns it.
SegmentEntry::SegmentEntry(cuid_t cuid,
                           const std::shared_ptr<Segment>& segment)
  : cuid(cuid), segment(segment)
{}

SegmentEntry::~SegmentEntry() {}

// Stores the download context and piece storage and sizes ignoreBitfield_
// from the context's piece length and total length. The bitfield's filter is
// enabled immediately. downloadContext is dereferenced here and therefore
// must not be null.
SegmentMan::SegmentMan
(const std::shared_ptr<DownloadContext>& downloadContext,
 const std::shared_ptr<PieceStorage>& pieceStorage)
  : downloadContext_(downloadContext),
    pieceStorage_(pieceStorage),
    ignoreBitfield_(downloadContext->getPieceLength(),
                    downloadContext->getTotalLength())
{
  ignoreBitfield_.enableFilter();
}

SegmentMan::~SegmentMan() {}

// Returns PieceStorage::downloadFinished(), or false when no piece storage
// is attached.
bool SegmentMan::downloadFinished() const
{
  if(!pieceStorage_) {
    return false;
  }
  return pieceStorage_->downloadFinished();
}

void SegmentMan::init()
{
  // TODO Do we have to do something about DownloadContext and
  // PieceStorage here?
}

// Returns PieceStorage::getTotalLength(), or 0 when no piece storage is
// attached.
int64_t SegmentMan::getTotalLength() const
{
  if(!pieceStorage_) {
    return 0;
  }
  return pieceStorage_->getTotalLength();
}

// Replaces the piece storage used by this object.
void SegmentMan::setPieceStorage
(const std::shared_ptr<PieceStorage>& pieceStorage)
{
  pieceStorage_ = pieceStorage;
}

// Replaces the download context used by this object. Note that
// ignoreBitfield_ is not resized here.
void SegmentMan::setDownloadContext
(const std::shared_ptr<DownloadContext>& downloadContext)
{
  downloadContext_ = downloadContext;
}

namespace {
// Flushes the write cache of |piece|. If the piece's cache entry reports
// anything other than CACHE_ERR_SUCCESS afterwards, all blocks of the piece
// are cleared and a download failure exception carrying the entry's error
// code is thrown. The caller must ensure piece->getWrDiskCacheEntry() is
// non-null.
void flushWrDiskCache(WrDiskCache* wrDiskCache,
                      const std::shared_ptr<Piece>& piece)
{
  piece->flushWrCache(wrDiskCache);
  if(piece->getWrDiskCacheEntry()->getError() !=
     WrDiskCacheEntry::CACHE_ERR_SUCCESS) {
    piece->clearAllBlock(wrDiskCache);
    throw DOWNLOAD_FAILURE_EXCEPTION2
      (fmt("Write disk cache flush failure index=%lu",
           static_cast<unsigned long>(piece->getIndex())),
       piece->getWrDiskCacheEntry()->getErrorCode());
  }
}
} // namespace

// Wraps |piece| in a Segment owned by |cuid| and records it in
// usedSegmentEntries_. Returns nullptr when |piece| is null.
//
// A piece of length 0 yields a GrowSegment; any other piece yields a
// PiecedSegment built with the context's piece length. For the latter, a
// written length memorized by an earlier cancel is restored when it exceeds
// the segment's current written length by less than one block.
//
// May throw (via flushWrDiskCache) if flushing the piece's write cache fails.
std::shared_ptr<Segment> SegmentMan::checkoutSegment
(cuid_t cuid, const std::shared_ptr<Piece>& piece)
{
  if(!piece) {
    return nullptr;
  }
  A2_LOG_DEBUG(fmt("Attach segment#%lu to CUID#%" PRId64 ".",
                   static_cast<unsigned long>(piece->getIndex()),
                   cuid));

  if(piece->getWrDiskCacheEntry()) {
    // Flush cached data here, because the cached data may be overlapped
    // if BT peers are involved.
    A2_LOG_DEBUG(fmt("Flushing cached data, size=%lu",
                     static_cast<unsigned long>(piece->getWrDiskCacheEntry()->
                                                getSize())));
    flushWrDiskCache(pieceStorage_->getWrDiskCache(), piece);
  }

  piece->setUsedBySegment(true);
  std::shared_ptr<Segment> segment;
  if(piece->getLength() == 0) {
    segment.reset(new GrowSegment(piece));
  } else {
    segment.reset(new PiecedSegment(downloadContext_->getPieceLength(), piece));
  }
  std::shared_ptr<SegmentEntry> entry(new SegmentEntry(cuid, segment));
  usedSegmentEntries_.push_back(entry);
  A2_LOG_DEBUG(fmt("index=%lu, length=%d, segmentLength=%d,"
                   " writtenLength=%d",
                   static_cast<unsigned long>(segment->getIndex()),
                   segment->getLength(),
                   segment->getSegmentLength(),
                   segment->getWrittenLength()));

  if(piece->getLength() > 0) {
    auto positr = segmentWrittenLengthMemo_.find(segment->getIndex());
    if(positr != segmentWrittenLengthMemo_.end()) {
      const int32_t writtenLength = (*positr).second;
      A2_LOG_DEBUG(fmt("writtenLength(in memo)=%d, writtenLength=%d",
                       writtenLength, segment->getWrittenLength()));
      // If the difference between cached writtenLength and segment's
      // writtenLength is less than one block, we assume that these
      // missing bytes are already downloaded.
      if(segment->getWrittenLength() < writtenLength &&
         writtenLength-segment->getWrittenLength() < piece->getBlockLength()) {
        segment->updateWrittenLength(writtenLength-segment->getWrittenLength());
      }
    }
  }
  return segment;
}

// Appends to |segments| every checked-out segment currently owned by |cuid|,
// in the order they appear in usedSegmentEntries_.
void SegmentMan::getInFlightSegment
(std::vector<std::shared_ptr<Segment> >& segments, cuid_t cuid)
{
  for(const auto& segmentEntry : usedSegmentEntries_) {
    if(segmentEntry->cuid == cuid) {
      segments.push_back(segmentEntry->segment);
    }
  }
}

// Asks the piece storage for a missing piece, passing ignoreBitfield_'s
// filter bitfield, and checks it out for |cuid|. Returns nullptr when the
// storage offers no piece. pieceStorage_ must be non-null.
std::shared_ptr<Segment> SegmentMan::getSegment(cuid_t cuid,
                                                size_t minSplitSize)
{
  std::shared_ptr<Piece> piece =
    pieceStorage_->getMissingPiece
    (minSplitSize,
     ignoreBitfield_.getFilterBitfield(), ignoreBitfield_.getBitfieldLength(),
     cuid);
  return checkoutSegment(cuid, piece);
}

// Checks out segments for |cuid| until |segments| holds |maxSegments|
// entries or the piece storage offers no further piece.
//
// The lookup uses a copy of ignoreBitfield_ to which the byte range of
// |fileEntry| has been applied with addNotFilter(). A checked-out segment
// whose write position lies outside [fileEntry offset, fileEntry last
// offset) is not returned; it is set aside and cancelled after the loop
// (presumably so that it is not offered again while the loop is still
// running -- confirm against PieceStorage).
void SegmentMan::getSegment
(std::vector<std::shared_ptr<Segment> >& segments,
 cuid_t cuid,
 size_t minSplitSize,
 const std::shared_ptr<FileEntry>& fileEntry,
 size_t maxSegments)
{
  BitfieldMan filter(ignoreBitfield_);
  filter.enableFilter();
  filter.addNotFilter(fileEntry->getOffset(), fileEntry->getLength());
  std::vector<std::shared_ptr<Segment> > pending;
  while(segments.size() < maxSegments) {
    std::shared_ptr<Segment> segment =
      checkoutSegment(cuid,
                      pieceStorage_->getMissingPiece
                      (minSplitSize,
                       filter.getFilterBitfield(), filter.getBitfieldLength(),
                       cuid));
    if(!segment) {
      break;
    }
    if(segment->getPositionToWrite() < fileEntry->getOffset() ||
       fileEntry->getLastOffset() <= segment->getPositionToWrite()) {
      pending.push_back(segment);
    } else {
      segments.push_back(segment);
    }
  }
  for(const auto& segment : pending) {
    cancelSegment(cuid, segment);
  }
}

// Checks out the piece at |index| for |cuid|. Returns nullptr when |index|
// is non-zero and not below the context's piece count, or when the piece
// storage does not hand out that piece. Index 0 is always forwarded to the
// piece storage regardless of the piece count.
std::shared_ptr<Segment> SegmentMan::getSegmentWithIndex
(cuid_t cuid, size_t index)
{
  if(index > 0 && downloadContext_->getNumPieces() <= index) {
    return nullptr;
  }
  return checkoutSegment(cuid, pieceStorage_->getMissingPiece(index, cuid));
}

// Looks for a checked-out segment with the given |index| that has no bytes
// written yet.
//  - If |cuid| already owns it, that segment is returned.
//  - If another CUID owns it and that owner has no registered PeerStat or
//    its status is NetStat::IDLE, all of the owner's segments are cancelled
//    and the piece is checked out again for |cuid|.
//  - Otherwise (bytes already written, busy owner, index out of range, or
//    no checked-out segment with that index) nullptr is returned.
std::shared_ptr<Segment> SegmentMan::getCleanSegmentIfOwnerIsIdle
(cuid_t cuid, size_t index)
{
  if(index > 0 && downloadContext_->getNumPieces() <= index) {
    return nullptr;
  }
  for(const auto& segmentEntry : usedSegmentEntries_) {
    if(segmentEntry->segment->getIndex() != index) {
      continue;
    }
    if(segmentEntry->segment->getWrittenLength() > 0) {
      return nullptr;
    }
    if(segmentEntry->cuid == cuid) {
      return segmentEntry->segment;
    }
    // Copy the owner before cancelSegment() erases the entry that
    // segmentEntry refers to.
    cuid_t owner = segmentEntry->cuid;
    std::shared_ptr<PeerStat> ps = getPeerStat(owner);
    if(!ps || ps->getStatus() == NetStat::IDLE) {
      cancelSegment(owner);
      return getSegmentWithIndex(cuid, index);
    }
    return nullptr;
  }
  return nullptr;
}

// Releases |segment|'s piece back to the piece storage on behalf of |cuid|
// and memorizes the segment's written length under its index. It does not
// touch usedSegmentEntries_; callers erase the entry themselves.
//
// May throw (via flushWrDiskCache) if flushing the piece's write cache
// fails; in that case the piece is not cancelled and nothing is memorized.
void SegmentMan::cancelSegmentInternal
(cuid_t cuid,
 const std::shared_ptr<Segment>& segment)
{
  A2_LOG_DEBUG(fmt("Canceling segment#%lu",
                   static_cast<unsigned long>(segment->getIndex())));
  const std::shared_ptr<Piece>& piece = segment->getPiece();
  // TODO In PieceStorage::cancelPiece(), WrDiskCacheEntry may be
  // released. Flush first.
  if(piece->getWrDiskCacheEntry()) {
    // Flush cached data here, because the cached data may be overlapped
    // if BT peers are involved.
    A2_LOG_DEBUG(fmt("Flushing cached data, size=%lu",
                     static_cast<unsigned long>(piece->getWrDiskCacheEntry()
                                                ->getSize())));
    flushWrDiskCache(pieceStorage_->getWrDiskCache(), piece);
    // TODO Exception may cause some segments (pieces) are not
    // canceled.
  }
  piece->setUsedBySegment(false);
  pieceStorage_->cancelPiece(piece, cuid);
  segmentWrittenLengthMemo_[segment->getIndex()] = segment->getWrittenLength();
  A2_LOG_DEBUG(fmt("Memorized segment index=%lu, writtenLength=%d",
                   static_cast<unsigned long>(segment->getIndex()),
                   segment->getWrittenLength()));
}

// Cancels and removes every checked-out segment owned by |cuid|.
void SegmentMan::cancelSegment(cuid_t cuid)
{
  for(SegmentEntries::iterator itr = usedSegmentEntries_.begin();
      itr != usedSegmentEntries_.end();) {
    if((*itr)->cuid == cuid) {
      cancelSegmentInternal(cuid, (*itr)->segment);
      // erase() returns the iterator following the removed entry.
      itr = usedSegmentEntries_.erase(itr);
    } else {
      ++itr;
    }
  }
}

// Cancels and removes the first checked-out entry that is owned by |cuid|
// and whose segment compares equal to |segment|. Does nothing if there is no
// such entry.
void SegmentMan::cancelSegment
(cuid_t cuid, const std::shared_ptr<Segment>& segment)
{
  for(SegmentEntries::iterator itr = usedSegmentEntries_.begin(),
        eoi = usedSegmentEntries_.end(); itr != eoi; ++itr) {
    if((*itr)->cuid == cuid && *(*itr)->segment == *segment) {
      cancelSegmentInternal(cuid, (*itr)->segment);
      usedSegmentEntries_.erase(itr);
      return;
    }
  }
}

// Cancels every checked-out segment on behalf of its owner and then empties
// usedSegmentEntries_. If a cancel throws, the remaining entries are neither
// cancelled nor removed.
void SegmentMan::cancelAllSegments()
{
  for(const auto& entry : usedSegmentEntries_) {
    cancelSegmentInternal(entry->cuid, entry->segment);
  }
  usedSegmentEntries_.clear();
}

// Forgets all written lengths memorized by cancelSegmentInternal().
void SegmentMan::eraseSegmentWrittenLengthMemo()
{
  segmentWrittenLengthMemo_.clear();
}

namespace {
// Predicate matching a SegmentEntry whose segment has the same index as the
// segment given at construction. The owning CUID is not compared.
class FindSegmentEntry {
private:
  std::shared_ptr<Segment> segment_;
public:
  FindSegmentEntry(const std::shared_ptr<Segment>& segment):segment_(segment) {}

  bool operator()(const std::shared_ptr<SegmentEntry>& segmentEntry) const
  {
    return segmentEntry->segment->getIndex() == segment_->getIndex();
  }
};
} // namespace

// Marks |segment|'s piece as complete in the piece storage and advertises
// it for |cuid|; both happen unconditionally. Then removes the first
// checked-out entry with the same segment index. Returns true if such an
// entry was found and removed, false otherwise.
bool SegmentMan::completeSegment
(cuid_t cuid, const std::shared_ptr<Segment>& segment)
{
  pieceStorage_->completePiece(segment->getPiece());
  pieceStorage_->advertisePiece(cuid, segment->getPiece()->getIndex());
  SegmentEntries::iterator itr = std::find_if(usedSegmentEntries_.begin(),
                                              usedSegmentEntries_.end(),
                                              FindSegmentEntry(segment));
  if(itr == usedSegmentEntries_.end()) {
    return false;
  }
  usedSegmentEntries_.erase(itr);
  return true;
}

// Returns PieceStorage::hasPiece(index). pieceStorage_ must be non-null.
bool SegmentMan::hasSegment(size_t index) const
{
  return pieceStorage_->hasPiece(index);
}

// Returns PieceStorage::getCompletedLength(), or 0 when no piece storage is
// attached.
int64_t SegmentMan::getDownloadLength() const
{
  if(!pieceStorage_) {
    return 0;
  }
  return pieceStorage_->getCompletedLength();
}

// Appends |peerStat| to the list searched by getPeerStat(). No duplicate
// check is performed.
void SegmentMan::registerPeerStat(const std::shared_ptr<PeerStat>& peerStat)
{
  peerStats_.push_back(peerStat);
}

// Returns the first registered PeerStat whose CUID equals |cuid|, or nullptr
// if there is none.
std::shared_ptr<PeerStat> SegmentMan::getPeerStat(cuid_t cuid) const
{
  for(const auto& peerStat : peerStats_) {
    if(peerStat->getCuid() == cuid) {
      return peerStat;
    }
  }
  return nullptr;
}

namespace {
// Predicate matching a PeerStat with the same hostname and protocol as the
// one given at construction. It keeps a reference to the caller's
// shared_ptr, so it must not outlive that pointer.
class PeerStatHostProtoEqual {
private:
  const std::shared_ptr<PeerStat>& peerStat_;
public:
  PeerStatHostProtoEqual(const std::shared_ptr<PeerStat>& peerStat):
    peerStat_(peerStat) {}

  bool operator()(const std::shared_ptr<PeerStat>& p) const
  {
    return peerStat_->getHostname() == p->getHostname() &&
      peerStat_->getProtocol() == p->getProtocol();
  }
};
} // namespace

// Keeps at most one entry per (hostname, protocol) pair in
// fastestPeerStats_. If no entry exists for |peerStat|'s pair, it is added.
// Otherwise the one with the higher average download speed is kept (the
// existing entry wins ties) and the session download length of the other is
// added to it.
void SegmentMan::updateFastestPeerStat
(const std::shared_ptr<PeerStat>& peerStat)
{
  auto i = std::find_if(fastestPeerStats_.begin(), fastestPeerStats_.end(),
                        PeerStatHostProtoEqual(peerStat));
  if(i == fastestPeerStats_.end()) {
    fastestPeerStats_.push_back(peerStat);
  } else if((*i)->getAvgDownloadSpeed() < peerStat->getAvgDownloadSpeed()) {
    // *i's SessionDownloadLength must be added to peerStat
    peerStat->addSessionDownloadLength((*i)->getSessionDownloadLength());
    *i = peerStat;
  } else {
    // peerStat's SessionDownloadLength must be added to *i
    (*i)->addSessionDownloadLength(peerStat->getSessionDownloadLength());
  }
}

// Counts the consecutive pieces, starting at |index|, that the piece storage
// neither has nor reports as in use. Returns 0 when |index| is at or beyond
// the context's piece count.
size_t SegmentMan::countFreePieceFrom(size_t index) const
{
  const size_t numPieces = downloadContext_->getNumPieces();
  if(index >= numPieces) {
    // There is no piece to count here; returning early also keeps the
    // unsigned subtraction below from wrapping around.
    return 0;
  }
  for(size_t i = index; i < numPieces; ++i) {
    if(pieceStorage_->hasPiece(i) || pieceStorage_->isPieceUsed(i)) {
      return i-index;
    }
  }
  return numPieces-index;
}

// Adds the byte range of |fileEntry| to ignoreBitfield_'s filter.
void SegmentMan::ignoreSegmentFor(const std::shared_ptr<FileEntry>& fileEntry)
{
  A2_LOG_DEBUG(fmt("ignoring segment for path=%s, offset=%" PRId64
                   ", length=%" PRId64 "",
                   fileEntry->getPath().c_str(),
                   fileEntry->getOffset(),
                   fileEntry->getLength()));
  ignoreBitfield_.addFilter(fileEntry->getOffset(), fileEntry->getLength());
}

// Removes the byte range of |fileEntry| from ignoreBitfield_'s filter.
void SegmentMan::recognizeSegmentFor
(const std::shared_ptr<FileEntry>& fileEntry)
{
  ignoreBitfield_.removeFilter(fileEntry->getOffset(), fileEntry->getLength());
}

// Returns ignoreBitfield_.isAllFilterBitSet().
bool SegmentMan::allSegmentsIgnored() const
{
  return ignoreBitfield_.isAllFilterBitSet();
}

} // namespace aria2