2010-05-21 Tatsuhiro Tsujikawa <t-tujikawa@users.sourceforge.net>

Fixed the bug that connection pooling does not take into account
	proxy.  This means that when connection A via proxy X is pooled,
	it will be wrongly reused in the download using proxy Y.
	* src/DownloadEngine.cc
	* src/DownloadEngine.h
	* src/FtpFinishDownloadCommand.cc
	* src/FtpInitiateConnectionCommand.cc
	* src/FtpNegotiationCommand.cc
	* src/HttpDownloadCommand.cc
	* src/HttpInitiateConnectionCommand.cc
	* src/HttpResponseCommand.cc
	* src/HttpSkipResponseCommand.cc
pull/1/head
Tatsuhiro Tsujikawa 2010-05-21 13:54:50 +00:00
parent 47adbe618c
commit 92f84f71f5
10 changed files with 109 additions and 64 deletions

View File

@ -1,3 +1,18 @@
2010-05-21 Tatsuhiro Tsujikawa <t-tujikawa@users.sourceforge.net>
Fixed the bug that connection pooling does not take into account
proxy. This means that when connection A via proxy X is pooled,
it will be wrongly reused in the download using proxy Y.
* src/DownloadEngine.cc
* src/DownloadEngine.h
* src/FtpFinishDownloadCommand.cc
* src/FtpInitiateConnectionCommand.cc
* src/FtpNegotiationCommand.cc
* src/HttpDownloadCommand.cc
* src/HttpInitiateConnectionCommand.cc
* src/HttpResponseCommand.cc
* src/HttpSkipResponseCommand.cc
2010-05-21 Tatsuhiro Tsujikawa <t-tujikawa@users.sourceforge.net> 2010-05-21 Tatsuhiro Tsujikawa <t-tujikawa@users.sourceforge.net>
Fixed the bug that FTP download may fail when control connection Fixed the bug that FTP download may fail when control connection

View File

@ -279,13 +279,11 @@ void DownloadEngine::addRoutineCommand(Command* command)
_routineCommands.push_back(command); _routineCommands.push_back(command);
} }
void DownloadEngine::poolSocket(const std::string& ipaddr, void DownloadEngine::poolSocket(const std::string& key,
uint16_t port,
const SocketPoolEntry& entry) const SocketPoolEntry& entry)
{ {
std::string addr = strconcat(ipaddr, ":", util::uitos(port)); logger->info("Pool socket for %s", key.c_str());
logger->info("Pool socket for %s", addr.c_str()); std::multimap<std::string, SocketPoolEntry>::value_type p(key, entry);
std::multimap<std::string, SocketPoolEntry>::value_type p(addr, entry);
_socketPool.insert(p); _socketPool.insert(p);
if(_lastSocketPoolScan.difference(global::wallclock) >= 60) { if(_lastSocketPoolScan.difference(global::wallclock) >= 60) {
@ -310,12 +308,24 @@ void DownloadEngine::poolSocket(const std::string& ipaddr,
} }
static std::string createSockPoolKey static std::string createSockPoolKey
(const std::string& host, const std::string& username) (const std::string& host, uint16_t port,
const std::string& username,
const std::string& proxyhost, uint16_t proxyport)
{ {
std::string key; std::string key;
if(!username.empty()) {
key += util::percentEncode(username); key += util::percentEncode(username);
key += '@'; key += '@';
}
key += host; key += host;
key += A2STR::COLON_C;
key += util::uitos(port);
if(!proxyhost.empty()) {
key += A2STR::SLASH_C;
key += proxyhost;
key += A2STR::COLON_C;
key += util::uitos(proxyport);
}
return key; return key;
} }
@ -323,72 +333,78 @@ void DownloadEngine::poolSocket
(const std::string& ipaddr, (const std::string& ipaddr,
uint16_t port, uint16_t port,
const std::string& username, const std::string& username,
const std::string& proxyhost,
uint16_t proxyport,
const SharedHandle<SocketCore>& sock, const SharedHandle<SocketCore>& sock,
const std::map<std::string, std::string>& options, const std::map<std::string, std::string>& options,
time_t timeout) time_t timeout)
{ {
SocketPoolEntry e(sock, options, timeout); SocketPoolEntry e(sock, options, timeout);
poolSocket(createSockPoolKey(ipaddr, username), port, e); poolSocket(createSockPoolKey(ipaddr, port, username, proxyhost, proxyport),e);
} }
void DownloadEngine::poolSocket void DownloadEngine::poolSocket
(const std::string& ipaddr, (const std::string& ipaddr,
uint16_t port, uint16_t port,
const std::string& proxyhost,
uint16_t proxyport,
const SharedHandle<SocketCore>& sock, const SharedHandle<SocketCore>& sock,
time_t timeout) time_t timeout)
{ {
SocketPoolEntry e(sock, timeout); SocketPoolEntry e(sock, timeout);
poolSocket(ipaddr, port, e); poolSocket(createSockPoolKey(ipaddr, port, A2STR::NIL,proxyhost,proxyport),e);
} }
void DownloadEngine::poolSocket(const SharedHandle<Request>& request, void DownloadEngine::poolSocket(const SharedHandle<Request>& request,
bool proxyDefined, const SharedHandle<Request>& proxyRequest,
const SharedHandle<SocketCore>& socket, const SharedHandle<SocketCore>& socket,
time_t timeout) time_t timeout)
{ {
if(proxyDefined) { if(proxyRequest.isNull()) {
// If proxy is defined, then pool socket with its hostname.
poolSocket(request->getHost(), request->getPort(), socket, timeout);
} else {
std::pair<std::string, uint16_t> peerInfo; std::pair<std::string, uint16_t> peerInfo;
socket->getPeerInfo(peerInfo); socket->getPeerInfo(peerInfo);
poolSocket(peerInfo.first, peerInfo.second, socket, timeout); poolSocket(peerInfo.first, peerInfo.second,
A2STR::NIL, 0, socket, timeout);
} else {
// If proxy is defined, then pool socket with its hostname.
poolSocket(request->getHost(), request->getPort(),
proxyRequest->getHost(), proxyRequest->getPort(),
socket, timeout);
} }
} }
void DownloadEngine::poolSocket void DownloadEngine::poolSocket
(const SharedHandle<Request>& request, (const SharedHandle<Request>& request,
bool proxyDefined,
const std::string& username, const std::string& username,
const SharedHandle<Request>& proxyRequest,
const SharedHandle<SocketCore>& socket, const SharedHandle<SocketCore>& socket,
const std::map<std::string, std::string>& options, const std::map<std::string, std::string>& options,
time_t timeout) time_t timeout)
{ {
if(proxyDefined) { if(proxyRequest.isNull()) {
// If proxy is defined, then pool socket with its hostname.
poolSocket(request->getHost(), request->getPort(), username,
socket, options, timeout);
} else {
std::pair<std::string, uint16_t> peerInfo; std::pair<std::string, uint16_t> peerInfo;
socket->getPeerInfo(peerInfo); socket->getPeerInfo(peerInfo);
poolSocket(peerInfo.first, peerInfo.second, username, poolSocket(peerInfo.first, peerInfo.second, username,
A2STR::NIL, 0, socket, options, timeout);
} else {
// If proxy is defined, then pool socket with its hostname.
poolSocket(request->getHost(), request->getPort(), username,
proxyRequest->getHost(), proxyRequest->getPort(),
socket, options, timeout); socket, options, timeout);
} }
} }
std::multimap<std::string, DownloadEngine::SocketPoolEntry>::iterator std::multimap<std::string, DownloadEngine::SocketPoolEntry>::iterator
DownloadEngine::findSocketPoolEntry(const std::string& ipaddr, uint16_t port) DownloadEngine::findSocketPoolEntry(const std::string& key)
{ {
std::string addr = ipaddr;
strappend(addr, ":", util::uitos(port));
std::pair<std::multimap<std::string, SocketPoolEntry>::iterator, std::pair<std::multimap<std::string, SocketPoolEntry>::iterator,
std::multimap<std::string, SocketPoolEntry>::iterator> range = std::multimap<std::string, SocketPoolEntry>::iterator> range =
_socketPool.equal_range(addr); _socketPool.equal_range(key);
for(std::multimap<std::string, SocketPoolEntry>::iterator i = for(std::multimap<std::string, SocketPoolEntry>::iterator i =
range.first, eoi = range.second; i != eoi; ++i) { range.first, eoi = range.second; i != eoi; ++i) {
const SocketPoolEntry& e = (*i).second; const SocketPoolEntry& e = (*i).second;
if(!e.isTimeout()) { if(!e.isTimeout()) {
logger->info("Found socket for %s", addr.c_str()); logger->info("Found socket for %s", key.c_str());
return i; return i;
} }
} }
@ -396,11 +412,14 @@ DownloadEngine::findSocketPoolEntry(const std::string& ipaddr, uint16_t port)
} }
SharedHandle<SocketCore> SharedHandle<SocketCore>
DownloadEngine::popPooledSocket(const std::string& ipaddr, uint16_t port) DownloadEngine::popPooledSocket
(const std::string& ipaddr, uint16_t port,
const std::string& proxyhost, uint16_t proxyport)
{ {
SharedHandle<SocketCore> s; SharedHandle<SocketCore> s;
std::multimap<std::string, SocketPoolEntry>::iterator i = std::multimap<std::string, SocketPoolEntry>::iterator i =
findSocketPoolEntry(ipaddr, port); findSocketPoolEntry
(createSockPoolKey(ipaddr, port, A2STR::NIL, proxyhost, proxyport));
if(i != _socketPool.end()) { if(i != _socketPool.end()) {
s = (*i).second.getSocket(); s = (*i).second.getSocket();
_socketPool.erase(i); _socketPool.erase(i);
@ -409,13 +428,16 @@ DownloadEngine::popPooledSocket(const std::string& ipaddr, uint16_t port)
} }
SharedHandle<SocketCore> SharedHandle<SocketCore>
DownloadEngine::popPooledSocket(std::map<std::string, std::string>& options, DownloadEngine::popPooledSocket
(std::map<std::string, std::string>& options,
const std::string& ipaddr, uint16_t port, const std::string& ipaddr, uint16_t port,
const std::string& username) const std::string& username,
const std::string& proxyhost, uint16_t proxyport)
{ {
SharedHandle<SocketCore> s; SharedHandle<SocketCore> s;
std::multimap<std::string, SocketPoolEntry>::iterator i = std::multimap<std::string, SocketPoolEntry>::iterator i =
findSocketPoolEntry(createSockPoolKey(ipaddr, username), port); findSocketPoolEntry
(createSockPoolKey(ipaddr, port, username, proxyhost, proxyport));
if(i != _socketPool.end()) { if(i != _socketPool.end()) {
s = (*i).second.getSocket(); s = (*i).second.getSocket();
options = (*i).second.getOptions(); options = (*i).second.getOptions();
@ -431,7 +453,7 @@ DownloadEngine::popPooledSocket
SharedHandle<SocketCore> s; SharedHandle<SocketCore> s;
for(std::vector<std::string>::const_iterator i = ipaddrs.begin(), for(std::vector<std::string>::const_iterator i = ipaddrs.begin(),
eoi = ipaddrs.end(); i != eoi; ++i) { eoi = ipaddrs.end(); i != eoi; ++i) {
s = popPooledSocket(*i, port); s = popPooledSocket(*i, port, A2STR::NIL, 0);
if(!s.isNull()) { if(!s.isNull()) {
break; break;
} }
@ -448,7 +470,7 @@ DownloadEngine::popPooledSocket
SharedHandle<SocketCore> s; SharedHandle<SocketCore> s;
for(std::vector<std::string>::const_iterator i = ipaddrs.begin(), for(std::vector<std::string>::const_iterator i = ipaddrs.begin(),
eoi = ipaddrs.end(); i != eoi; ++i) { eoi = ipaddrs.end(); i != eoi; ++i) {
s = popPooledSocket(options, *i, port, username); s = popPooledSocket(options, *i, port, username, A2STR::NIL, 0);
if(!s.isNull()) { if(!s.isNull()) {
break; break;
} }

View File

@ -150,22 +150,10 @@ private:
void afterEachIteration(); void afterEachIteration();
void poolSocket(const std::string& ipaddr, void poolSocket(const std::string& key, const SocketPoolEntry& entry);
uint16_t port,
const SocketPoolEntry& entry);
void poolSocket(const std::string& ipaddr, uint16_t port,
const std::string& username,
const SharedHandle<SocketCore>& sock,
const std::map<std::string, std::string>& options,
time_t timeout);
void poolSocket(const std::string& ipaddr, uint16_t port,
const SharedHandle<SocketCore>& sock,
time_t timeout);
std::multimap<std::string, SocketPoolEntry>::iterator std::multimap<std::string, SocketPoolEntry>::iterator
findSocketPoolEntry(const std::string& ipaddr, uint16_t port); findSocketPoolEntry(const std::string& key);
public: public:
std::deque<Command*> commands; std::deque<Command*> commands;
SharedHandle<RequestGroupMan> _requestGroupMan; SharedHandle<RequestGroupMan> _requestGroupMan;
@ -215,29 +203,45 @@ public:
void addRoutineCommand(Command* command); void addRoutineCommand(Command* command);
void poolSocket(const SharedHandle<Request>& request, void poolSocket(const std::string& ipaddr, uint16_t port,
bool proxyDefined,
const std::string& username, const std::string& username,
const std::string& proxyhost, uint16_t proxyport,
const SharedHandle<SocketCore>& sock,
const std::map<std::string, std::string>& options,
time_t timeout);
void poolSocket(const SharedHandle<Request>& request,
const std::string& username,
const SharedHandle<Request>& proxyRequest,
const SharedHandle<SocketCore>& socket, const SharedHandle<SocketCore>& socket,
const std::map<std::string, std::string>& options, const std::map<std::string, std::string>& options,
time_t timeout = 15); time_t timeout = 15);
void poolSocket(const std::string& ipaddr, uint16_t port,
const std::string& proxyhost, uint16_t proxyport,
const SharedHandle<SocketCore>& sock,
time_t timeout);
void poolSocket(const SharedHandle<Request>& request, void poolSocket(const SharedHandle<Request>& request,
bool proxyDefined, const SharedHandle<Request>& proxyRequest,
const SharedHandle<SocketCore>& socket, const SharedHandle<SocketCore>& socket,
time_t timeout = 15); time_t timeout = 15);
SharedHandle<SocketCore> popPooledSocket(const std::string& ipaddr, SharedHandle<SocketCore> popPooledSocket
uint16_t port); (const std::string& ipaddr,
uint16_t port,
const std::string& proxyhost, uint16_t proxyport);
SharedHandle<SocketCore> popPooledSocket SharedHandle<SocketCore> popPooledSocket
(std::map<std::string, std::string>& options, (std::map<std::string, std::string>& options,
const std::string& ipaddr, const std::string& ipaddr,
uint16_t port, uint16_t port,
const std::string& username); const std::string& username,
const std::string& proxyhost, uint16_t proxyport);
SharedHandle<SocketCore> SharedHandle<SocketCore>
popPooledSocket(const std::vector<std::string>& ipaddrs, uint16_t port); popPooledSocket
(const std::vector<std::string>& ipaddrs, uint16_t port);
SharedHandle<SocketCore> SharedHandle<SocketCore>
popPooledSocket popPooledSocket

View File

@ -82,7 +82,7 @@ bool FtpFinishDownloadCommand::execute()
if(getOption()->getAsBool(PREF_FTP_REUSE_CONNECTION)) { if(getOption()->getAsBool(PREF_FTP_REUSE_CONNECTION)) {
std::map<std::string, std::string> options; std::map<std::string, std::string> options;
options["baseWorkingDir"] = _ftpConnection->getBaseWorkingDir(); options["baseWorkingDir"] = _ftpConnection->getBaseWorkingDir();
e->poolSocket(req, isProxyDefined(), _ftpConnection->getUser(), e->poolSocket(req, _ftpConnection->getUser(), createProxyRequest(),
socket, options); socket, options);
} }
} catch(RecoverableException& e) { } catch(RecoverableException& e) {

View File

@ -78,12 +78,15 @@ Command* FtpInitiateConnectionCommand::createNextCommand
SharedHandle<SocketCore> pooledSocket; SharedHandle<SocketCore> pooledSocket;
std::string proxyMethod = resolveProxyMethod(req->getProtocol()); std::string proxyMethod = resolveProxyMethod(req->getProtocol());
if(proxyMethod == V_GET) { if(proxyMethod == V_GET) {
pooledSocket = e->popPooledSocket(req->getHost(), req->getPort()); pooledSocket = e->popPooledSocket
(req->getHost(), req->getPort(),
proxyRequest->getHost(), proxyRequest->getPort());
} else { } else {
pooledSocket = e->popPooledSocket pooledSocket = e->popPooledSocket
(options, req->getHost(), req->getPort(), (options, req->getHost(), req->getPort(),
e->getAuthConfigFactory()->createAuthConfig e->getAuthConfigFactory()->createAuthConfig
(req, getOption().get())->getUser()); (req, getOption().get())->getUser(),
proxyRequest->getHost(), proxyRequest->getPort());
} }
if(pooledSocket.isNull()) { if(pooledSocket.isNull()) {
if(logger->info()) { if(logger->info()) {

View File

@ -806,7 +806,7 @@ void FtpNegotiationCommand::poolConnection() const
if(getOption()->getAsBool(PREF_FTP_REUSE_CONNECTION)) { if(getOption()->getAsBool(PREF_FTP_REUSE_CONNECTION)) {
std::map<std::string, std::string> options; std::map<std::string, std::string> options;
options["baseWorkingDir"] = ftp->getBaseWorkingDir(); options["baseWorkingDir"] = ftp->getBaseWorkingDir();
e->poolSocket(req, isProxyDefined(), ftp->getUser(), socket, options); e->poolSocket(req, ftp->getUser(), createProxyRequest(), socket, options);
} }
} }

View File

@ -98,7 +98,7 @@ bool HttpDownloadCommand::prepareForNextSegment() {
// pool terminated socket. In HTTP/1.1, keep-alive is default, // pool terminated socket. In HTTP/1.1, keep-alive is default,
// so closing connection without Connection: close header means // so closing connection without Connection: close header means
// that server is broken or not configured properly. // that server is broken or not configured properly.
e->poolSocket(req, isProxyDefined(), socket); e->poolSocket(req, createProxyRequest(), socket);
} }
// The request was sent assuming that server supported pipelining, but // The request was sent assuming that server supported pipelining, but
// it turned out that server didn't support it. // it turned out that server didn't support it.

View File

@ -70,7 +70,8 @@ Command* HttpInitiateConnectionCommand::createNextCommand
Command* command; Command* command;
if(!proxyRequest.isNull()) { if(!proxyRequest.isNull()) {
SharedHandle<SocketCore> pooledSocket = SharedHandle<SocketCore> pooledSocket =
e->popPooledSocket(req->getHost(), req->getPort()); e->popPooledSocket(req->getHost(), req->getPort(),
proxyRequest->getHost(), proxyRequest->getPort());
std::string proxyMethod = resolveProxyMethod(req->getProtocol()); std::string proxyMethod = resolveProxyMethod(req->getProtocol());
if(pooledSocket.isNull()) { if(pooledSocket.isNull()) {
if(logger->info()) { if(logger->info()) {

View File

@ -415,7 +415,7 @@ HttpDownloadCommand* HttpResponseCommand::createHttpDownloadCommand
void HttpResponseCommand::poolConnection() void HttpResponseCommand::poolConnection()
{ {
if(req->supportsPersistentConnection()) { if(req->supportsPersistentConnection()) {
e->poolSocket(req, isProxyDefined(), socket); e->poolSocket(req, createProxyRequest(), socket);
} }
} }

View File

@ -145,7 +145,7 @@ bool HttpSkipResponseCommand::executeInternal()
void HttpSkipResponseCommand::poolConnection() const void HttpSkipResponseCommand::poolConnection() const
{ {
if(req->supportsPersistentConnection()) { if(req->supportsPersistentConnection()) {
e->poolSocket(req, isProxyDefined(), socket); e->poolSocket(req, createProxyRequest(), socket);
} }
} }