/* */
#include "SegmentMan.h"

// NOTE(review): the three system header names were lost from this
// file.  <algorithm> and <numeric> are needed for std::find_if and
// std::accumulate, which this file uses; <cassert> is an assumption
// -- TODO confirm against version control.
#include <cassert>
#include <algorithm>
#include <numeric>

#include "util.h"
#include "message.h"
#include "prefs.h"
#include "PiecedSegment.h"
#include "GrowSegment.h"
#include "LogFactory.h"
#include "Logger.h"
#include "PieceStorage.h"
#include "PeerStat.h"
#include "Option.h"
#include "DownloadContext.h"
#include "Piece.h"
#include "FileEntry.h"
#include "wallclock.h"
#include "fmt.h"

namespace aria2 {

// Binds a segment to the CUID that checked it out.
SegmentEntry::SegmentEntry(cuid_t cuid, const SharedHandle<Segment>& segment)
  : cuid(cuid), segment(segment)
{}

SegmentEntry::~SegmentEntry() {}

// Stores the given collaborators and creates the ignore bitfield from
// the piece length and total length of downloadContext.  Filtering on
// the ignore bitfield is enabled immediately.  downloadContext is
// dereferenced here, so it must not be null.
SegmentMan::SegmentMan
(const Option* option,
 const SharedHandle<DownloadContext>& downloadContext,
 const PieceStorageHandle& pieceStorage)
  : option_(option),
    downloadContext_(downloadContext),
    pieceStorage_(pieceStorage),
    lastPeerStatDlspdMapUpdated_(0),
    cachedDlspd_(0),
    ignoreBitfield_(downloadContext->getPieceLength(),
                    downloadContext->getTotalLength())
{
  ignoreBitfield_.enableFilter();
}

SegmentMan::~SegmentMan() {}

// Returns false when no piece storage is set; otherwise forwards to
// PieceStorage::downloadFinished().
bool SegmentMan::downloadFinished() const
{
  if(!pieceStorage_) {
    return false;
  } else {
    return pieceStorage_->downloadFinished();
  }
}

// Currently a no-op.
void SegmentMan::init()
{
  // TODO Do we have to do something about DownloadContext and
  // PieceStorage here?
}

// Returns 0 when no piece storage is set; otherwise forwards to
// PieceStorage::getTotalLength().
uint64_t SegmentMan::getTotalLength() const
{
  if(!pieceStorage_) {
    return 0;
  } else {
    return pieceStorage_->getTotalLength();
  }
}

void SegmentMan::setPieceStorage(const PieceStorageHandle& pieceStorage)
{
  pieceStorage_ = pieceStorage;
}

void SegmentMan::setDownloadContext
(const SharedHandle<DownloadContext>& downloadContext)
{
  downloadContext_ = downloadContext;
}

// Wraps piece in a Segment, records it in usedSegmentEntries_ as
// owned by cuid and returns it.  Returns a null handle if piece is
// null.  A piece whose length is 0 becomes a GrowSegment; any other
// piece becomes a PiecedSegment.  For pieces of non-zero length, a
// written length memorized by cancelSegment() may be restored (see
// the comment in the body).
SharedHandle<Segment> SegmentMan::checkoutSegment
(cuid_t cuid, const SharedHandle<Piece>& piece)
{
  if(!piece) {
    return SharedHandle<Segment>();
  }
  // cuid is cast explicitly so that the argument matches %lld
  // whatever integer type cuid_t is.
  A2_LOG_DEBUG(fmt("Attach segment#%lu to CUID#%lld.",
                   static_cast<unsigned long>(piece->getIndex()),
                   static_cast<long long>(cuid)));
  SharedHandle<Segment> segment;
  if(piece->getLength() == 0) {
    segment.reset(new GrowSegment(piece));
  } else {
    segment.reset(new PiecedSegment(downloadContext_->getPieceLength(), piece));
  }
  SegmentEntryHandle entry(new SegmentEntry(cuid, segment));
  usedSegmentEntries_.push_back(entry);
  A2_LOG_DEBUG(fmt("index=%lu, length=%lu, segmentLength=%lu,"
                   " writtenLength=%lu",
                   static_cast<unsigned long>(segment->getIndex()),
                   static_cast<unsigned long>(segment->getLength()),
                   static_cast<unsigned long>(segment->getSegmentLength()),
                   static_cast<unsigned long>(segment->getWrittenLength())));
  if(piece->getLength() > 0) {
    // NOTE(review): the memo is assumed to be std::map<size_t, size_t>
    // (index -> written length) -- confirm against SegmentMan.h.
    std::map<size_t, size_t>::iterator positr =
      segmentWrittenLengthMemo_.find(segment->getIndex());
    if(positr != segmentWrittenLengthMemo_.end()) {
      const size_t writtenLength = (*positr).second;
      A2_LOG_DEBUG(fmt("writtenLength(in memo)=%lu, writtenLength=%lu",
                       static_cast<unsigned long>(writtenLength),
                       static_cast<unsigned long>(segment->getWrittenLength()))
                   );
      // If the difference between cached writtenLength and segment's
      // writtenLength is less than one block, we assume that these
      // missing bytes are already downloaded.
      if(segment->getWrittenLength() < writtenLength &&
         writtenLength-segment->getWrittenLength() < piece->getBlockLength()) {
        segment->updateWrittenLength(writtenLength-segment->getWrittenLength());
      }
    }
  }
  return segment;
}

// Appends to segments every segment currently checked out by cuid.
void SegmentMan::getInFlightSegment
(std::vector<SharedHandle<Segment> >& segments, cuid_t cuid)
{
  for(SegmentEntries::const_iterator itr = usedSegmentEntries_.begin(),
        eoi = usedSegmentEntries_.end(); itr != eoi; ++itr) {
    const SegmentEntryHandle& segmentEntry = *itr;
    if(segmentEntry->cuid == cuid) {
      segments.push_back(segmentEntry->segment);
    }
  }
}

// Asks the piece storage for a sparse, missing, unused piece, passing
// the ignore bitfield's filter, and checks it out for cuid.  Returns
// a null handle when the storage yields no piece.
SharedHandle<Segment> SegmentMan::getSegment(cuid_t cuid, size_t minSplitSize)
{
  SharedHandle<Piece> piece =
    pieceStorage_->getSparseMissingUnusedPiece
    (minSplitSize,
     ignoreBitfield_.getFilterBitfield(), ignoreBitfield_.getBitfieldLength());
  return checkoutSegment(cuid, piece);
}

// Checks out segments for cuid until segments holds maxSegments
// entries or the storage yields no more pieces.  The filter used is a
// copy of the ignore bitfield with addNotFilter() applied to the
// offset and length of fileEntry.  A checked-out segment whose write
// position lies outside [fileEntry offset, fileEntry last offset) is
// not returned; it is cancelled again after the loop.
void SegmentMan::getSegment
(std::vector<SharedHandle<Segment> >& segments,
 cuid_t cuid,
 size_t minSplitSize,
 const SharedHandle<FileEntry>& fileEntry,
 size_t maxSegments)
{
  BitfieldMan filter(ignoreBitfield_);
  filter.enableFilter();
  filter.addNotFilter(fileEntry->getOffset(), fileEntry->getLength());
  std::vector<SharedHandle<Segment> > pending;
  while(segments.size() < maxSegments) {
    SharedHandle<Segment> segment =
      checkoutSegment(cuid,
                      pieceStorage_->getSparseMissingUnusedPiece
                      (minSplitSize,
                       filter.getFilterBitfield(), filter.getBitfieldLength()));
    if(!segment) {
      break;
    }
    if(segment->getPositionToWrite() < fileEntry->getOffset() ||
       fileEntry->getLastOffset() <= segment->getPositionToWrite()) {
      pending.push_back(segment);
    } else {
      segments.push_back(segment);
    }
  }
  for(std::vector<SharedHandle<Segment> >::const_iterator i = pending.begin(),
        eoi = pending.end(); i != eoi; ++i) {
    cancelSegment(cuid, *i);
  }
}

// Checks out the missing piece at index for cuid.  Returns a null
// handle if index is non-zero and not less than the number of pieces,
// or if the storage returns no piece.  Index 0 is never rejected by
// the range check.
SharedHandle<Segment> SegmentMan::getSegmentWithIndex
(cuid_t cuid, size_t index)
{
  if(index > 0 && downloadContext_->getNumPieces() <= index) {
    return SharedHandle<Segment>();
  }
  return checkoutSegment(cuid, pieceStorage_->getMissingPiece(index));
}

// Looks for a checked-out segment with the given index that has no
// written bytes yet.
//  - Returns a null handle if index is out of range (same check as
//    getSegmentWithIndex()), if no checked-out segment has this
//    index, or if the segment already has written bytes.
//  - Returns the segment itself if cuid already owns it.
//  - If the owner has no PeerStat or its status is PeerStat::IDLE,
//    all segments of the owner are cancelled and the segment is
//    checked out again for cuid; otherwise a null handle is returned.
SharedHandle<Segment> SegmentMan::getCleanSegmentIfOwnerIsIdle
(cuid_t cuid, size_t index)
{
  if(index > 0 && downloadContext_->getNumPieces() <= index) {
    return SharedHandle<Segment>();
  }
  for(SegmentEntries::const_iterator itr = usedSegmentEntries_.begin(),
        eoi = usedSegmentEntries_.end(); itr != eoi; ++itr) {
    const SegmentEntryHandle& segmentEntry = *itr;
    if(segmentEntry->segment->getIndex() == index) {
      if(segmentEntry->segment->getWrittenLength() > 0) {
        return SharedHandle<Segment>();
      }
      if(segmentEntry->cuid == cuid) {
        return segmentEntry->segment;
      }
      cuid_t owner = segmentEntry->cuid;
      SharedHandle<PeerStat> ps = getPeerStat(owner);
      if(!ps || ps->getStatus() == PeerStat::IDLE) {
        // cancelSegment() erases from usedSegmentEntries_, so the
        // loop iterators must not be used after this point; we
        // return immediately.
        cancelSegment(owner);
        return getSegmentWithIndex(cuid, index);
      } else {
        return SharedHandle<Segment>();
      }
    }
  }
  return SharedHandle<Segment>();
}

// Cancels the piece of segment in the piece storage and memorizes the
// segment's written length under its index.  This overload does not
// touch usedSegmentEntries_.
void SegmentMan::cancelSegment(const SharedHandle<Segment>& segment)
{
  A2_LOG_DEBUG(fmt("Canceling segment#%lu",
                   static_cast<unsigned long>(segment->getIndex())));
  pieceStorage_->cancelPiece(segment->getPiece());
  segmentWrittenLengthMemo_[segment->getIndex()] = segment->getWrittenLength();
  A2_LOG_DEBUG(fmt("Memorized segment index=%lu, writtenLength=%lu",
                   static_cast<unsigned long>(segment->getIndex()),
                   static_cast<unsigned long>(segment->getWrittenLength())));
}

// Cancels and removes every segment checked out by cuid.
void SegmentMan::cancelSegment(cuid_t cuid)
{
  for(SegmentEntries::iterator itr = usedSegmentEntries_.begin(),
        eoi = usedSegmentEntries_.end(); itr != eoi;) {
    if((*itr)->cuid == cuid) {
      cancelSegment((*itr)->segment);
      // erase() invalidates the old iterators; refresh both.
      itr = usedSegmentEntries_.erase(itr);
      eoi = usedSegmentEntries_.end();
    } else {
      ++itr;
    }
  }
}

// Cancels and removes the first entry that is owned by cuid and whose
// segment compares equal to *segment.  Does nothing if there is no
// such entry.
void SegmentMan::cancelSegment
(cuid_t cuid, const SharedHandle<Segment>& segment)
{
  for(SegmentEntries::iterator itr = usedSegmentEntries_.begin(),
        eoi = usedSegmentEntries_.end(); itr != eoi; ++itr) {
    if((*itr)->cuid == cuid && *(*itr)->segment == *segment) {
      cancelSegment((*itr)->segment);
      usedSegmentEntries_.erase(itr);
      // Iterators are invalid after erase(); stop at the first match.
      break;
    }
  }
}

// Cancels every checked-out segment and empties usedSegmentEntries_.
void SegmentMan::cancelAllSegments()
{
  for(SegmentEntries::iterator itr = usedSegmentEntries_.begin(),
        eoi = usedSegmentEntries_.end(); itr != eoi; ++itr) {
    cancelSegment((*itr)->segment);
  }
  usedSegmentEntries_.clear();
}
void SegmentMan::eraseSegmentWrittenLengthMemo() { segmentWrittenLengthMemo_.clear(); } namespace { class FindSegmentEntry { private: SharedHandle segment_; public: FindSegmentEntry(const SharedHandle& segment):segment_(segment) {} bool operator()(const SegmentEntryHandle& segmentEntry) const { return segmentEntry->segment->getIndex() == segment_->getIndex(); } }; } // namespace bool SegmentMan::completeSegment (cuid_t cuid, const SharedHandle& segment) { pieceStorage_->completePiece(segment->getPiece()); pieceStorage_->advertisePiece(cuid, segment->getPiece()->getIndex()); SegmentEntries::iterator itr = std::find_if(usedSegmentEntries_.begin(), usedSegmentEntries_.end(), FindSegmentEntry(segment)); if(itr == usedSegmentEntries_.end()) { return false; } else { usedSegmentEntries_.erase(itr); return true; } } bool SegmentMan::hasSegment(size_t index) const { return pieceStorage_->hasPiece(index); } uint64_t SegmentMan::getDownloadLength() const { if(!pieceStorage_) { return 0; } else { return pieceStorage_->getCompletedLength(); } } void SegmentMan::registerPeerStat(const SharedHandle& peerStat) { for(std::vector >::iterator i = peerStats_.begin(), eoi = peerStats_.end(); i != eoi; ++i) { if((*i)->getStatus() == PeerStat::IDLE) { *i = peerStat; return; } } peerStats_.push_back(peerStat); } SharedHandle SegmentMan::getPeerStat(cuid_t cuid) const { for(std::vector >::const_iterator i = peerStats_.begin(), eoi = peerStats_.end(); i != eoi; ++i) { if((*i)->getCuid() == cuid) { return *i; } } return SharedHandle(); } namespace { class PeerStatHostProtoEqual { private: const SharedHandle& peerStat_; public: PeerStatHostProtoEqual(const SharedHandle& peerStat): peerStat_(peerStat) {} bool operator()(const SharedHandle& p) const { return peerStat_->getHostname() == p->getHostname() && peerStat_->getProtocol() == p->getProtocol(); } }; } // namespace void SegmentMan::updateFastestPeerStat(const SharedHandle& peerStat) { std::vector >::iterator i = 
std::find_if(fastestPeerStats_.begin(), fastestPeerStats_.end(), PeerStatHostProtoEqual(peerStat)); if(i == fastestPeerStats_.end()) { fastestPeerStats_.push_back(peerStat); } else if((*i)->getAvgDownloadSpeed() < peerStat->getAvgDownloadSpeed()) { // *i's SessionDownloadLength must be added to peerStat peerStat->addSessionDownloadLength((*i)->getSessionDownloadLength()); *i = peerStat; } else { // peerStat's SessionDownloadLength must be added to *i (*i)->addSessionDownloadLength(peerStat->getSessionDownloadLength()); } } unsigned int SegmentMan::calculateDownloadSpeed() { unsigned int speed = 0; if(lastPeerStatDlspdMapUpdated_.differenceInMillis(global::wallclock) >= 250){ lastPeerStatDlspdMapUpdated_ = global::wallclock; peerStatDlspdMap_.clear(); for(std::vector >::const_iterator i = peerStats_.begin(), eoi = peerStats_.end(); i != eoi; ++i) { if((*i)->getStatus() == PeerStat::ACTIVE) { unsigned int s = (*i)->calculateDownloadSpeed(); peerStatDlspdMap_[(*i)->getCuid()] = s; speed += s; } } cachedDlspd_ = speed; } else { speed = cachedDlspd_; } return speed; } void SegmentMan::updateDownloadSpeedFor(const SharedHandle& pstat) { unsigned int newspd = pstat->calculateDownloadSpeed(); unsigned int oldSpd = peerStatDlspdMap_[pstat->getCuid()]; if(cachedDlspd_ > oldSpd) { cachedDlspd_ -= oldSpd; cachedDlspd_ += newspd; } else { cachedDlspd_ = newspd; } peerStatDlspdMap_[pstat->getCuid()] = newspd; } namespace { class PeerStatDownloadLengthOperator { public: uint64_t operator()(uint64_t total, const SharedHandle& ps) { return ps->getSessionDownloadLength()+total; } }; } // namespace uint64_t SegmentMan::calculateSessionDownloadLength() const { return std::accumulate(fastestPeerStats_.begin(), fastestPeerStats_.end(), 0LL, PeerStatDownloadLengthOperator()); } size_t SegmentMan::countFreePieceFrom(size_t index) const { size_t numPieces = downloadContext_->getNumPieces(); for(size_t i = index; i < numPieces; ++i) { if(pieceStorage_->hasPiece(i) || 
pieceStorage_->isPieceUsed(i)) { return i-index; } } return downloadContext_->getNumPieces()-index; } void SegmentMan::ignoreSegmentFor(const SharedHandle& fileEntry) { A2_LOG_DEBUG(fmt("ignoring segment for path=%s, offset=%s, length=%s", fileEntry->getPath().c_str(), util::itos(fileEntry->getOffset()).c_str(), util::uitos(fileEntry->getLength()).c_str())); ignoreBitfield_.addFilter(fileEntry->getOffset(), fileEntry->getLength()); } void SegmentMan::recognizeSegmentFor(const SharedHandle& fileEntry) { ignoreBitfield_.removeFilter(fileEntry->getOffset(), fileEntry->getLength()); } bool SegmentMan::allSegmentsIgnored() const { return ignoreBitfield_.isAllFilterBitSet(); } } // namespace aria2