Try to use available URI if all pooled requests are sleeping

pull/704/head
Tatsuhiro Tsujikawa 2016-07-07 23:51:51 +09:00
parent b7e19eede7
commit 360ca57231
2 changed files with 91 additions and 57 deletions

View File

@ -136,6 +136,58 @@ OutputIterator enumerateInFlightHosts(InputIterator first, InputIterator last,
}
} // namespace
std::shared_ptr<Request> FileEntry::getRequestWithInFlightHosts(
URISelector* selector, bool uriReuse,
const std::vector<std::pair<size_t, std::string>>& usedHosts,
const std::string& referer, const std::string& method,
const std::vector<std::string>& inFlightHosts)
{
std::shared_ptr<Request> req;
for (int g = 0; g < 2; ++g) {
std::vector<std::string> pending;
std::vector<std::string> ignoreHost;
while (1) {
std::string uri = selector->select(this, usedHosts);
if (uri.empty()) {
break;
}
req = std::make_shared<Request>();
if (req->setUri(uri)) {
if (std::count(std::begin(inFlightHosts), std::end(inFlightHosts),
req->getHost()) >= maxConnectionPerServer_) {
pending.push_back(uri);
ignoreHost.push_back(req->getHost());
req.reset();
continue;
}
if (referer == "*") {
// Assuming uri has already been percent-encoded.
req->setReferer(uri);
}
else {
req->setReferer(util::percentEncodeMini(referer));
}
req->setMethod(method);
spentUris_.push_back(uri);
inFlightRequests_.insert(req);
break;
}
else {
req.reset();
}
}
uris_.insert(std::begin(uris_), std::begin(pending), std::end(pending));
if (g == 0 && uriReuse && !req && uris_.size() == pending.size()) {
// Reuse URIs other than ones in pending
reuseUri(ignoreHost);
continue;
}
return req;
}
}
std::shared_ptr<Request> FileEntry::getRequest(
URISelector* selector, bool uriReuse,
const std::vector<std::pair<size_t, std::string>>& usedHosts,
@ -144,71 +196,47 @@ std::shared_ptr<Request> FileEntry::getRequest(
std::shared_ptr<Request> req;
if (requestPool_.empty()) {
std::vector<std::string> inFlightHosts;
enumerateInFlightHosts(inFlightRequests_.begin(), inFlightRequests_.end(),
enumerateInFlightHosts(std::begin(inFlightRequests_),
std::end(inFlightRequests_),
std::back_inserter(inFlightHosts));
for (int g = 0; g < 2; ++g) {
std::vector<std::string> pending;
std::vector<std::string> ignoreHost;
while (1) {
std::string uri = selector->select(this, usedHosts);
if (uri.empty()) {
break;
}
req = std::make_shared<Request>();
if (req->setUri(uri)) {
if (std::count(inFlightHosts.begin(), inFlightHosts.end(),
req->getHost()) >= maxConnectionPerServer_) {
pending.push_back(uri);
ignoreHost.push_back(req->getHost());
req.reset();
continue;
}
if (referer == "*") {
// Assuming uri has already been percent-encoded.
req->setReferer(uri);
}
else {
req->setReferer(util::percentEncodeMini(referer));
}
req->setMethod(method);
spentUris_.push_back(uri);
inFlightRequests_.insert(req);
break;
}
else {
req.reset();
}
}
uris_.insert(uris_.begin(), pending.begin(), pending.end());
if (g == 0 && uriReuse && !req && uris_.size() == pending.size()) {
// Reuse URIs other than ones in pending
reuseUri(ignoreHost);
}
else {
break;
}
return getRequestWithInFlightHosts(selector, uriReuse, usedHosts, referer,
method, inFlightHosts);
}
// Skip Request object if it is still
// sleeping(Request::getWakeTime() < global::wallclock()). If all
// pooled objects are sleeping, we may return first one. Caller
// should inspect returned object's getWakeTime().
auto i = std::begin(requestPool_);
for (; i != std::end(requestPool_); ++i) {
if ((*i)->getWakeTime() <= global::wallclock()) {
break;
}
}
else {
// Skip Request object if it is still
// sleeping(Request::getWakeTime() < global::wallclock()). If all
// pooled objects are sleeping, return first one. Caller should
// inspect returned object's getWakeTime().
auto i = requestPool_.begin();
auto eoi = requestPool_.end();
for (; i != eoi; ++i) {
if ((*i)->getWakeTime() <= global::wallclock()) {
break;
}
}
if (i == eoi) {
i = requestPool_.begin();
if (i == std::end(requestPool_)) {
// all requests are sleeping; try to another URI
std::vector<std::string> inFlightHosts;
enumerateInFlightHosts(std::begin(inFlightRequests_),
std::end(inFlightRequests_),
std::back_inserter(inFlightHosts));
enumerateInFlightHosts(std::begin(requestPool_), std::end(requestPool_),
std::back_inserter(inFlightHosts));
req = getRequestWithInFlightHosts(selector, uriReuse, usedHosts, referer,
method, inFlightHosts);
if (!req || req->getUri() == (*std::begin(requestPool_))->getUri()) {
i = std::begin(requestPool_);
}
}
if (i != std::end(requestPool_)) {
req = *i;
requestPool_.erase(i);
inFlightRequests_.insert(req);
A2_LOG_DEBUG(fmt("Picked up from pool: %s", req->getUri().c_str()));
}
inFlightRequests_.insert(req);
return req;
}

View File

@ -97,6 +97,12 @@ private:
void storePool(const std::shared_ptr<Request>& request);
std::shared_ptr<Request> getRequestWithInFlightHosts(
URISelector* selector, bool uriReuse,
const std::vector<std::pair<size_t, std::string>>& usedHosts,
const std::string& referer, const std::string& method,
const std::vector<std::string>& inFlightHosts);
public:
FileEntry();