/* */
#include "FileEntry.h"

#include <cassert>
#include <cstring>
#include <algorithm>
#include <functional>
#include <iterator>
#include <utility>

#include "util.h"
#include "URISelector.h"
#include "Logger.h"
#include "LogFactory.h"
#include "wallclock.h"
#include "a2algo.h"
#include "uri.h"
#include "PeerStat.h"
#include "fmt.h"
#include "ServerStatMan.h"
#include "ServerStat.h"

// NOTE(review): this file was restored from a whitespace-collapsed copy in
// which template argument lists and system include names were missing. They
// were reconstructed from how the values are used below; confirm the
// signatures (in particular the element type of usedHosts) against
// FileEntry.h.

namespace aria2 {

// Orders requests by average download speed, fastest first. A request
// without a PeerStat never sorts before another request; a request with a
// PeerStat always sorts before one without. Equal speeds are ordered by
// object address.
//
// NOTE(review): two distinct requests that both lack a PeerStat compare
// equivalent. If this comparator keys a unique ordered container, only one
// of them can be stored at a time -- confirm this is intended.
bool FileEntry::RequestFaster::
operator()(const std::shared_ptr<Request>& lhs,
           const std::shared_ptr<Request>& rhs) const
{
  if (!lhs->getPeerStat()) {
    return false;
  }
  if (!rhs->getPeerStat()) {
    return true;
  }
  int lspd = lhs->getPeerStat()->getAvgDownloadSpeed();
  int rspd = rhs->getPeerStat()->getAvgDownloadSpeed();
  return lspd > rspd || (lspd == rspd && lhs.get() < rhs.get());
}

// Creates an entry for |path| with the given |length| and |offset|. |uris|
// are copied as-is into the list of unused URIs (no validation or
// percent-encoding is applied here, unlike addUri()).
FileEntry::FileEntry(std::string path, int64_t length, int64_t offset,
                     const std::vector<std::string>& uris)
    : length_(length),
      offset_(offset),
      uris_(uris.begin(), uris.end()),
      path_(std::move(path)),
      lastFasterReplace_(Timer::zero()),
      maxConnectionPerServer_(1),
      requested_(true),
      uniqueProtocol_(false)
{
}

// Creates an empty entry that is not requested.
FileEntry::FileEntry()
    : length_(0),
      offset_(0),
      maxConnectionPerServer_(1),
      requested_(false),
      uniqueProtocol_(false)
{
}

FileEntry::~FileEntry() = default;

// Copies only path, length, offset and the requested flag.
// NOTE(review): URIs, requests and the remaining members are left untouched;
// presumably intentional -- confirm with callers.
FileEntry& FileEntry::operator=(const FileEntry& entry)
{
  if (this != &entry) {
    path_ = entry.path_;
    length_ = entry.length_;
    offset_ = entry.offset_;
    requested_ = entry.requested_;
  }
  return *this;
}

// Entries are ordered by their offset.
bool FileEntry::operator<(const FileEntry& fileEntry) const
{
  return offset_ < fileEntry.offset_;
}

// Returns true if File(getPath()) reports that it exists.
bool FileEntry::exists() const { return File(getPath()).exists(); }

// Converts |goff| into an offset relative to this entry's offset_.
// |goff| must not be smaller than offset_ (checked with assert).
int64_t FileEntry::gtoloff(int64_t goff) const
{
  assert(offset_ <= goff);
  return goff - offset_;
}

// Returns all URIs known to this entry: spent URIs first, followed by the
// URIs that have not been used yet.
std::vector<std::string> FileEntry::getUris() const
{
  std::vector<std::string> uris(std::begin(spentUris_), std::end(spentUris_));
  uris.insert(std::end(uris), std::begin(uris_), std::end(uris_));
  return uris;
}

namespace {
// Writes the host component of every request URI in [first, last) to |out|.
// Requests whose URI cannot be split are skipped. Returns the advanced
// output iterator.
template <typename InputIterator, typename OutputIterator>
OutputIterator enumerateInFlightHosts(InputIterator first, InputIterator last,
                                      OutputIterator out)
{
  for (; first != last; ++first) {
    uri_split_result us;
    if (uri_split(&us, (*first)->getUri().c_str()) == 0) {
      *out++ = uri::getFieldString(us, USR_HOST, (*first)->getUri().c_str());
    }
  }
  return out;
}
} // namespace

// Returns a request to work on and registers it as in-flight.
//
// If the request pool is empty, URIs are taken from |selector| until one
// yields a valid Request whose host is used by fewer than
// maxConnectionPerServer_ in-flight requests. URIs rejected for that reason
// are put back at the front of uris_. If nothing was found, no other unused
// URI remains and |uriReuse| is true, reusable spent URIs are re-queued once
// (see reuseUri()) and selection is retried. The returned pointer is null
// when no URI could be used.
//
// If the pool is not empty, a pooled request is returned instead; see the
// comment in that branch.
//
// |referer| equal to "*" makes the request use its own URI as referer;
// otherwise |referer| is percent-encoded and used. |method| is passed to
// Request::setMethod().
std::shared_ptr<Request> FileEntry::getRequest(
    URISelector* selector, bool uriReuse,
    const std::vector<std::pair<size_t, std::string>>& usedHosts,
    const std::string& referer, const std::string& method)
{
  std::shared_ptr<Request> req;
  if (requestPool_.empty()) {
    std::vector<std::string> inFlightHosts;
    enumerateInFlightHosts(inFlightRequests_.begin(), inFlightRequests_.end(),
                           std::back_inserter(inFlightHosts));
    // At most two passes: the second one runs only after reuseUri()
    // re-queued spent URIs.
    for (int g = 0; g < 2; ++g) {
      std::vector<std::string> pending;
      std::vector<std::string> ignoreHost;
      while (true) {
        std::string uri = selector->select(this, usedHosts);
        if (uri.empty()) {
          break;
        }
        req = std::make_shared<Request>();
        if (req->setUri(uri)) {
          if (std::count(inFlightHosts.begin(), inFlightHosts.end(),
                         req->getHost()) >= maxConnectionPerServer_) {
            // Host is saturated: keep the URI for later and remember the
            // host so that reuseUri() does not offer it again.
            pending.push_back(uri);
            ignoreHost.push_back(req->getHost());
            req.reset();
            continue;
          }
          if (referer == "*") {
            // Assuming uri has already been percent-encoded.
            req->setReferer(uri);
          }
          else {
            req->setReferer(util::percentEncodeMini(referer));
          }
          req->setMethod(method);
          spentUris_.push_back(uri);
          inFlightRequests_.insert(req);
          break;
        }
        else {
          req.reset();
        }
      }
      uris_.insert(uris_.begin(), pending.begin(), pending.end());
      if (g == 0 && uriReuse && !req && uris_.size() == pending.size()) {
        // Reuse URIs other than ones in pending
        reuseUri(ignoreHost);
      }
      else {
        break;
      }
    }
  }
  else {
    // Skip Request objects that are still sleeping
    // (Request::getWakeTime() > global::wallclock()). If all pooled
    // objects are sleeping, return the first one. Caller should inspect
    // the returned object's getWakeTime().
    auto i = requestPool_.begin();
    auto eoi = requestPool_.end();
    for (; i != eoi; ++i) {
      if ((*i)->getWakeTime() <= global::wallclock()) {
        break;
      }
    }
    if (i == eoi) {
      i = requestPool_.begin();
    }
    req = *i;
    requestPool_.erase(i);
    inFlightRequests_.insert(req);
    A2_LOG_DEBUG(fmt("Picked up from pool: %s", req->getUri().c_str()));
  }
  return req;
}

namespace {
constexpr auto startupIdleTime = 10_s;
} // namespace

// Tries to replace |base| with the first request of the pool. Returns that
// request (moved from the pool to the in-flight set) or nullptr.
//
// Nothing is returned when the pool is empty, when the previous replacement
// happened less than startupIdleTime ago, or when the first pooled request
// has no PeerStat. Otherwise the pooled request is taken if |base| has no
// PeerStat, or if |base| started at least startupIdleTime ago and 80% of
// the pooled request's average speed still exceeds |base|'s current speed.
std::shared_ptr<Request>
FileEntry::findFasterRequest(const std::shared_ptr<Request>& base)
{
  if (requestPool_.empty() ||
      lastFasterReplace_.difference(global::wallclock()) < startupIdleTime) {
    return nullptr;
  }
  const std::shared_ptr<PeerStat>& fastest =
      (*requestPool_.begin())->getPeerStat();
  if (!fastest) {
    return nullptr;
  }
  const std::shared_ptr<PeerStat>& basestat = base->getPeerStat();
  // TODO hard coded value. See PREF_STARTUP_IDLE_TIME
  if (!basestat ||
      (basestat->getDownloadStartTime().difference(global::wallclock()) >=
           startupIdleTime &&
       fastest->getAvgDownloadSpeed() * 0.8 >
           basestat->calculateDownloadSpeed())) {
    // TODO we should consider that "fastest" is very slow.
    std::shared_ptr<Request> fastestRequest = *requestPool_.begin();
    requestPool_.erase(requestPool_.begin());
    inFlightRequests_.insert(fastestRequest);
    lastFasterReplace_ = global::wallclock();
    return fastestRequest;
  }
  return nullptr;
}

// Tries to find an unused URI whose server is known (via |serverStatMan|)
// to be faster than |base|. Returns a new in-flight request for it, or
// nullptr.
//
// A URI is skipped when it cannot be split, when its host already has
// maxConnectionPerServer_ in-flight requests, or when its host appears in
// |usedHosts|. A server qualifies when its ServerStat is OK and its speed
// exceeds 1.5 times |base|'s current speed, or SPEED_THRESHOLD if |base| has
// no PeerStat. The chosen URI is moved from uris_ to spentUris_ and the new
// request inherits |base|'s referer.
std::shared_ptr<Request> FileEntry::findFasterRequest(
    const std::shared_ptr<Request>& base,
    const std::vector<std::pair<size_t, std::string>>& usedHosts,
    const std::shared_ptr<ServerStatMan>& serverStatMan)
{
  constexpr int SPEED_THRESHOLD = 20_k;
  if (lastFasterReplace_.difference(global::wallclock()) < startupIdleTime) {
    return nullptr;
  }
  std::vector<std::string> inFlightHosts;
  enumerateInFlightHosts(inFlightRequests_.begin(), inFlightRequests_.end(),
                         std::back_inserter(inFlightHosts));
  const std::shared_ptr<PeerStat>& basestat = base->getPeerStat();
  A2_LOG_DEBUG("Search faster server using ServerStat.");
  // Use first 10 good URIs to introduce some randomness.
  const size_t NUM_URI = 10;
  std::vector<std::pair<std::shared_ptr<ServerStat>, std::string>> fastCands;
  for (const auto& candidate : uris_) {
    if (fastCands.size() >= NUM_URI) {
      break;
    }
    uri_split_result us;
    if (uri_split(&us, candidate.c_str()) == -1) {
      continue;
    }
    std::string host = uri::getFieldString(us, USR_HOST, candidate.c_str());
    std::string protocol =
        uri::getFieldString(us, USR_SCHEME, candidate.c_str());
    if (std::count(inFlightHosts.begin(), inFlightHosts.end(), host) >=
        maxConnectionPerServer_) {
      A2_LOG_DEBUG(fmt("%s has already used %d times, not considered.",
                       candidate.c_str(), maxConnectionPerServer_));
      continue;
    }
    if (findSecond(usedHosts.begin(), usedHosts.end(), host) !=
        usedHosts.end()) {
      A2_LOG_DEBUG(
          fmt("%s is in usedHosts, not considered", candidate.c_str()));
      continue;
    }
    std::shared_ptr<ServerStat> ss = serverStatMan->find(host, protocol);
    if (ss && ss->isOK()) {
      if ((basestat &&
           ss->getDownloadSpeed() > basestat->calculateDownloadSpeed() * 1.5) ||
          (!basestat && ss->getDownloadSpeed() > SPEED_THRESHOLD)) {
        fastCands.emplace_back(ss, candidate);
      }
    }
  }
  if (!fastCands.empty()) {
    std::sort(fastCands.begin(), fastCands.end(), ServerStatFaster());
    auto fastestRequest = std::make_shared<Request>();
    const std::string& uri = fastCands.front().second;
    A2_LOG_DEBUG(fmt("Selected %s from fastCands", uri.c_str()));
    // Candidate URIs were already parsed when populating fastCands.
    (void)fastestRequest->setUri(uri);
    fastestRequest->setReferer(base->getReferer());
    // |uri| was copied out of uris_ above, so it is always found here.
    uris_.erase(std::find(uris_.begin(), uris_.end(), uri));
    spentUris_.push_back(uri);
    inFlightRequests_.insert(fastestRequest);
    lastFasterReplace_ = global::wallclock();
    return fastestRequest;
  }
  A2_LOG_DEBUG("No faster server found.");
  return nullptr;
}

// Inserts |request| into the request pool.
void FileEntry::storePool(const std::shared_ptr<Request>& request)
{
  const std::shared_ptr<PeerStat>& peerStat = request->getPeerStat();
  if (peerStat) {
    // We need to calculate average download speed here in order to
    // store Request in the right position in the pool.
    peerStat->calculateAvgDownloadSpeed();
  }
  requestPool_.insert(request);
}

// Removes |request| from the in-flight set and, unless its removal has been
// requested, stores it in the pool.
void FileEntry::poolRequest(const std::shared_ptr<Request>& request)
{
  removeRequest(request);
  if (!request->removalRequested()) {
    storePool(request);
  }
}

// Removes |request| from the in-flight set. Returns true if it was there.
bool FileEntry::removeRequest(const std::shared_ptr<Request>& request)
{
  return inFlightRequests_.erase(request) == 1;
}

// Removes every unused URI whose host component equals |hostname|.
// NOTE(review): URIs that cannot be split are dropped as well and are
// counted in the "Removed ..." debug message -- confirm this is intended.
void FileEntry::removeURIWhoseHostnameIs(const std::string& hostname)
{
  std::deque<std::string> newURIs;
  for (const auto& candidate : uris_) {
    uri_split_result us;
    if (uri_split(&us, candidate.c_str()) == -1) {
      continue;
    }
    if (us.fields[USR_HOST].len != hostname.size() ||
        std::memcmp(candidate.c_str() + us.fields[USR_HOST].off,
                    hostname.c_str(), hostname.size()) != 0) {
      newURIs.push_back(candidate);
    }
  }
  A2_LOG_DEBUG(fmt("Removed %lu duplicate hostname URIs for path=%s",
                   static_cast<unsigned long>(uris_.size() - newURIs.size()),
                   getPath().c_str()));
  uris_.swap(newURIs);
}

// Removes every unused URI equal to |uri|.
void FileEntry::removeIdenticalURI(const std::string& uri)
{
  uris_.erase(std::remove(uris_.begin(), uris_.end(), uri), uris_.end());
}

// Records |result| for |uri|.
void FileEntry::addURIResult(std::string uri, error_code::Value result)
{
  // |uri| is our own copy; move it instead of copying it a second time.
  uriResults_.push_back(URIResult(std::move(uri), result));
}

namespace {
// Predicate matching URIResult objects whose result equals a given value.
class FindURIResultByResult {
private:
  error_code::Value r_;

public:
  explicit FindURIResultByResult(error_code::Value r) : r_(r) {}

  bool operator()(const URIResult& uriResult) const
  {
    return uriResult.getResult() == r_;
  }
};
} // namespace

// Moves all recorded results equal to |r| to the back of |res|, keeping
// their relative order, and removes them from this entry.
void FileEntry::extractURIResult(std::deque<URIResult>& res,
                                 error_code::Value r)
{
  auto i = std::stable_partition(uriResults_.begin(), uriResults_.end(),
                                 FindURIResultByResult(r));
  std::copy(uriResults_.begin(), i, std::back_inserter(res));
  uriResults_.erase(uriResults_.begin(), i);
}

// Appends to uris_ every spent URI that has no recorded result, can be
// split, and whose host is not listed in |ignore|. Each such URI is appended
// once, in sorted order.
void FileEntry::reuseUri(const std::vector<std::string>& ignore)
{
  if (A2_LOG_DEBUG_ENABLED) {
    for (const auto& i : ignore) {
      A2_LOG_DEBUG(fmt("ignore host=%s", i.c_str()));
    }
  }
  // Sorted, de-duplicated copy of the spent URIs.
  std::deque<std::string> uris = spentUris_;
  std::sort(uris.begin(), uris.end());
  uris.erase(std::unique(uris.begin(), uris.end()), uris.end());

  // Sorted, de-duplicated list of URIs that have a recorded result.
  std::vector<std::string> errorUris(uriResults_.size());
  std::transform(uriResults_.begin(), uriResults_.end(), errorUris.begin(),
                 std::mem_fn(&URIResult::getURI));
  std::sort(errorUris.begin(), errorUris.end());
  errorUris.erase(std::unique(errorUris.begin(), errorUris.end()),
                  errorUris.end());
  if (A2_LOG_DEBUG_ENABLED) {
    for (const auto& errorUri : errorUris) {
      A2_LOG_DEBUG(fmt("error URI=%s", errorUri.c_str()));
    }
  }

  std::vector<std::string> reusableURIs;
  std::set_difference(uris.begin(), uris.end(), errorUris.begin(),
                      errorUris.end(), std::back_inserter(reusableURIs));
  // Drop URIs that cannot be split or whose host is to be ignored.
  reusableURIs.erase(
      std::remove_if(
          reusableURIs.begin(), reusableURIs.end(),
          [&ignore](const std::string& candidate) -> bool {
            uri_split_result us;
            if (uri_split(&us, candidate.c_str()) != 0) {
              return true;
            }
            return std::find(ignore.begin(), ignore.end(),
                             uri::getFieldString(us, USR_HOST,
                                                 candidate.c_str())) !=
                   ignore.end();
          }),
      reusableURIs.end());
  size_t ininum = reusableURIs.size();
  if (A2_LOG_DEBUG_ENABLED) {
    A2_LOG_DEBUG(
        fmt("Found %u reusable URIs", static_cast<unsigned int>(ininum)));
    for (const auto& reusable : reusableURIs) {
      A2_LOG_DEBUG(fmt("URI=%s", reusable.c_str()));
    }
  }
  uris_.insert(uris_.end(), reusableURIs.begin(), reusableURIs.end());
}

// Drops all pooled and in-flight requests.
void FileEntry::releaseRuntimeResource()
{
  requestPool_.clear();
  inFlightRequests_.clear();
}

namespace {
// Pushes the URI of every request in [first, last) to the front of |uris|.
template <typename InputIterator>
void putBackUri(std::deque<std::string>& uris, InputIterator first,
                InputIterator last)
{
  for (; first != last; ++first) {
    uris.push_front((*first)->getUri());
  }
}
} // namespace

// Puts the URIs of all pooled and in-flight requests back to the front of
// uris_. The requests themselves are not removed here.
void FileEntry::putBackRequest()
{
  putBackUri(uris_, requestPool_.begin(), requestPool_.end());
  putBackUri(uris_, inFlightRequests_.begin(), inFlightRequests_.end());
}

namespace {
// Returns the first request in [first, last) whose URI equals |uri| and
// whose removal has not been requested, or |last| if there is none.
template <typename InputIterator, typename T>
InputIterator findRequestByUri(InputIterator first, InputIterator last,
                               const T& uri)
{
  for (; first != last; ++first) {
    if (!(*first)->removalRequested() && (*first)->getUri() == uri) {
      return first;
    }
  }
  return last;
}
} // namespace

// Removes one occurrence of |uri| from this entry. Returns false if |uri|
// is neither a spent nor an unused URI.
//
// If |uri| is a spent URI, the first matching request (in-flight requests
// are searched before pooled ones) is asked to remove itself; a pooled
// request is additionally taken out of the pool.
bool FileEntry::removeUri(const std::string& uri)
{
  auto spent = std::find(spentUris_.begin(), spentUris_.end(), uri);
  if (spent == spentUris_.end()) {
    auto unused = std::find(uris_.begin(), uris_.end(), uri);
    if (unused == uris_.end()) {
      return false;
    }
    uris_.erase(unused);
    return true;
  }
  spentUris_.erase(spent);

  auto inFlight = findRequestByUri(inFlightRequests_.begin(),
                                   inFlightRequests_.end(), uri);
  if (inFlight != inFlightRequests_.end()) {
    (*inFlight)->requestRemoval();
    return true;
  }

  auto pooled =
      findRequestByUri(requestPool_.begin(), requestPool_.end(), uri);
  if (pooled != requestPool_.end()) {
    // Keep the request alive while it is erased from the pool.
    std::shared_ptr<Request> req = *pooled;
    requestPool_.erase(pooled);
    req->requestRemoval();
  }
  return true;
}

// Returns File(path_).getBasename().
std::string FileEntry::getBasename() const
{
  return File(path_).getBasename();
}

// Returns File(path_).getDirname().
std::string FileEntry::getDirname() const { return File(path_).getDirname(); }

// Replaces the unused URIs with |uris| and returns what addUris() returns
// for them.
size_t FileEntry::setUris(const std::vector<std::string>& uris)
{
  uris_.clear();
  return addUris(uris.begin(), uris.end());
}

// Percent-encodes |uri| and appends it to the unused URIs. Returns false
// (and adds nothing) if the encoded URI cannot be split.
bool FileEntry::addUri(const std::string& uri)
{
  std::string peUri = util::percentEncodeMini(uri);
  if (uri_split(nullptr, peUri.c_str()) == 0) {
    uris_.push_back(peUri);
    return true;
  }
  else {
    return false;
  }
}

// Percent-encodes |uri| and inserts it at index |pos| of the unused URIs;
// |pos| is clamped to the current size. Returns false (and inserts nothing)
// if the encoded URI cannot be split.
bool FileEntry::insertUri(const std::string& uri, size_t pos)
{
  std::string peUri = util::percentEncodeMini(uri);
  if (uri_split(nullptr, peUri.c_str()) != 0) {
    return false;
  }
  pos = std::min(pos, uris_.size());
  uris_.insert(uris_.begin() + pos, peUri);
  return true;
}

void FileEntry::setPath(std::string path) { path_ = std::move(path); }

void FileEntry::setContentType(std::string contentType)
{
  contentType_ = std::move(contentType);
}

// Number of requests currently in flight.
size_t FileEntry::countInFlightRequest() const
{
  return inFlightRequests_.size();
}

// Number of requests currently in the pool.
size_t FileEntry::countPooledRequest() const { return requestPool_.size(); }

void FileEntry::setOriginalName(std::string originalName)
{
  originalName_ = std::move(originalName);
}

void FileEntry::setSuffixPath(std::string suffixPath)
{
  suffixPath_ = std::move(suffixPath);
}

// Returns true if there is no unused URI, no in-flight request and no
// pooled request.
bool FileEntry::emptyRequestUri() const
{
  return uris_.empty() && inFlightRequests_.empty() && requestPool_.empty();
}

// Writes a printable name for |entry| to |o|: its path, or, when the path
// is empty, its first URI ("n/a" if it has none). With |memory| set, the
// basename of the path is written with a "[MEMORY]" prefix instead of the
// full path.
void writeFilePath(std::ostream& o, const std::shared_ptr<FileEntry>& entry,
                   bool memory)
{
  if (entry->getPath().empty()) {
    auto uris = entry->getUris();
    if (uris.empty()) {
      o << "n/a";
    }
    else {
      o << uris.front();
    }
    return;
  }
  if (memory) {
    o << "[MEMORY]" << File(entry->getPath()).getBasename();
  }
  else {
    o << entry->getPath();
  }
}

} // namespace aria2