2008-09-25 Tatsuhiro Tsujikawa <tujikawa at rednoah dot com>

Issue PWD command first and get working directory and use it as 
a prefix
	for CWD command.
	* src/DownloadEngine.cc
	* src/DownloadEngine.h
	* src/FtpConnection.cc
	* src/FtpConnection.h
	* src/FtpFinishDownloadCommand.cc
	* src/FtpInitiateConnectionCommand.cc
	* src/FtpNegotiationCommand.cc
	* src/FtpNegotiationCommand.h
	* test/FtpConnectionTest.cc
pull/1/head
Tatsuhiro Tsujikawa 2008-09-24 17:01:57 +00:00
parent d717ffb1d0
commit 6bc233f414
10 changed files with 340 additions and 30 deletions

View File

@ -1,3 +1,17 @@
2008-09-25 Tatsuhiro Tsujikawa <tujikawa at rednoah dot com>
Issue PWD command first and get working directory and use it as a prefix
for CWD command.
* src/DownloadEngine.cc
* src/DownloadEngine.h
* src/FtpConnection.cc
* src/FtpConnection.h
* src/FtpFinishDownloadCommand.cc
* src/FtpInitiateConnectionCommand.cc
* src/FtpNegotiationCommand.cc
* src/FtpNegotiationCommand.h
* test/FtpConnectionTest.cc
2008-09-25 Tatsuhiro Tsujikawa <tujikawa at rednoah dot com>
Removed default user/pass for FTP user/pass, since it should not have

View File

@ -53,6 +53,7 @@
#include "DlAbortEx.h"
#include "ServerStatMan.h"
#include "CookieStorage.h"
#include "A2STR.h"
#include <signal.h>
#include <cstring>
#include <algorithm>
@ -891,33 +892,106 @@ SharedHandle<CookieStorage> DownloadEngine::getCookieStorage() const
return _cookieStorage;
}
void DownloadEngine::poolSocket(const std::string& ipaddr, uint16_t port,
const SharedHandle<SocketCore>& sock,
time_t timeout)
void DownloadEngine::poolSocket(const std::string& ipaddr,
uint16_t port,
const SocketPoolEntry& entry)
{
std::string addr = ipaddr+":"+Util::uitos(port);
logger->info("Pool socket for %s", addr.c_str());
SocketPoolEntry e(sock, timeout);
std::multimap<std::string, SocketPoolEntry>::value_type p(addr, e);
std::multimap<std::string, SocketPoolEntry>::value_type p(addr, entry);
_socketPool.insert(p);
if(_lastSocketPoolScan.elapsed(60)) {
std::multimap<std::string, SocketPoolEntry> newPool;
logger->debug("Scaning SocketPool and erasing timed out entry.");
_lastSocketPoolScan.reset();
for(std::multimap<std::string, SocketPoolEntry>::iterator i =
_socketPool.begin(); i != _socketPool.end(); ++i) {
if(!(*i).second.isTimeout()) {
newPool.insert(*i);
}
}
logger->debug("%zu entries removed.", _socketPool.size()-newPool.size());
_socketPool = newPool;
}
}
void DownloadEngine::poolSocket
(const std::string& ipaddr,
uint16_t port,
const SharedHandle<SocketCore>& sock,
const std::map<std::string, std::string>& options,
time_t timeout)
{
SocketPoolEntry e(sock, options, timeout);
poolSocket(ipaddr, port, e);
}
void DownloadEngine::poolSocket
(const std::string& ipaddr,
uint16_t port,
const SharedHandle<SocketCore>& sock,
time_t timeout)
{
SocketPoolEntry e(sock, std::map<std::string, std::string>(), timeout);
poolSocket(ipaddr, port, e);
}
std::multimap<std::string, DownloadEngine::SocketPoolEntry>::iterator
DownloadEngine::findSocketPoolEntry(const std::string& ipaddr, uint16_t port)
{
std::string addr = ipaddr+":"+Util::uitos(port);
std::pair<std::multimap<std::string, SocketPoolEntry>::iterator,
std::multimap<std::string, SocketPoolEntry>::iterator> range =
_socketPool.equal_range(addr);
for(std::multimap<std::string, SocketPoolEntry>::iterator i = range.first;
i != range.second; ++i) {
const SocketPoolEntry& e = (*i).second;
if(!e.isTimeout()) {
logger->info("Found socket for %s", addr.c_str());
return i;
}
}
return _socketPool.end();
}
SharedHandle<SocketCore>
DownloadEngine::popPooledSocket(const std::string& ipaddr, uint16_t port)
{
SharedHandle<SocketCore> s;
std::string addr = ipaddr+":"+Util::uitos(port);
std::multimap<std::string, SocketPoolEntry>::iterator i =
findSocketPoolEntry(ipaddr, port);
if(i != _socketPool.end()) {
s = (*i).second.getSocket();
_socketPool.erase(i);
}
return s;
}
std::multimap<std::string, SocketPoolEntry>::iterator first = _socketPool.find(addr);
for(std::multimap<std::string, SocketPoolEntry>::iterator i = first;
i != _socketPool.end() && (*i).first == addr; ++i) {
const SocketPoolEntry& e = (*i).second;
if(!e.isTimeout()) {
logger->info("Reuse socket for %s", addr.c_str());
s = e.getSocket();
_socketPool.erase(first, ++i);
SharedHandle<SocketCore>
DownloadEngine::popPooledSocket(std::map<std::string, std::string>& options,
const std::string& ipaddr, uint16_t port)
{
SharedHandle<SocketCore> s;
std::multimap<std::string, SocketPoolEntry>::iterator i =
findSocketPoolEntry(ipaddr, port);
if(i != _socketPool.end()) {
s = (*i).second.getSocket();
options = (*i).second.getOptions();
_socketPool.erase(i);
}
return s;
}
SharedHandle<SocketCore>
DownloadEngine::popPooledSocket
(const std::deque<std::string>& ipaddrs, uint16_t port)
{
SharedHandle<SocketCore> s;
for(std::deque<std::string>::const_iterator i = ipaddrs.begin();
i != ipaddrs.end(); ++i) {
s = popPooledSocket(*i, port);
if(!s.isNull()) {
break;
}
}
@ -926,22 +1000,26 @@ DownloadEngine::popPooledSocket(const std::string& ipaddr, uint16_t port)
SharedHandle<SocketCore>
DownloadEngine::popPooledSocket
(const std::deque<std::string>& ipaddrs, uint16_t port)
(std::map<std::string, std::string>& options,
const std::deque<std::string>& ipaddrs, uint16_t port)
{
SharedHandle<SocketCore> s;
for(std::deque<std::string>::const_iterator i = ipaddrs.begin();
i != ipaddrs.end(); ++i) {
SharedHandle<SocketCore> s = popPooledSocket(*i, port);
s = popPooledSocket(options, *i, port);
if(!s.isNull()) {
return s;
break;
}
}
return SharedHandle<SocketCore>();
return s;
}
DownloadEngine::SocketPoolEntry::SocketPoolEntry
(const SharedHandle<SocketCore>& socket,
const std::map<std::string, std::string>& options,
time_t timeout):
_socket(socket),
_options(options),
_timeout(timeout) {}
DownloadEngine::SocketPoolEntry::~SocketPoolEntry() {}
@ -956,4 +1034,10 @@ SharedHandle<SocketCore> DownloadEngine::SocketPoolEntry::getSocket() const
return _socket;
}
const std::map<std::string, std::string>&
DownloadEngine::SocketPoolEntry::getOptions() const
{
return _options;
}
} // namespace aria2

View File

@ -44,6 +44,7 @@
#ifdef ENABLE_ASYNC_DNS
# include "AsyncNameResolver.h"
#endif // ENABLE_ASYNC_DNS
#include <string>
#include <deque>
#include <map>
#ifdef HAVE_EPOLL
@ -261,11 +262,14 @@ private:
private:
SharedHandle<SocketCore> _socket;
std::map<std::string, std::string> _options;
time_t _timeout;
Time _registeredTime;
public:
SocketPoolEntry(const SharedHandle<SocketCore>& socket,
const std::map<std::string, std::string>& option,
time_t timeout);
~SocketPoolEntry();
@ -273,11 +277,15 @@ private:
bool isTimeout() const;
SharedHandle<SocketCore> getSocket() const;
const std::map<std::string, std::string>& getOptions() const;
};
// key = IP address:port, value = SocketPoolEntry
std::multimap<std::string, SocketPoolEntry> _socketPool;
Time _lastSocketPoolScan;
bool _noWait;
std::deque<Command*> _routineCommands;
@ -294,6 +302,13 @@ private:
void onEndOfRun();
void afterEachIteration();
void poolSocket(const std::string& ipaddr,
uint16_t port,
const SocketPoolEntry& entry);
std::multimap<std::string, SocketPoolEntry>::iterator
findSocketPoolEntry(const std::string& ipaddr, uint16_t port);
public:
std::deque<Command*> commands;
SharedHandle<RequestGroupMan> _requestGroupMan;
@ -364,15 +379,31 @@ public:
void addRoutineCommand(Command* command);
void poolSocket(const std::string& ipaddr, uint16_t port,
const SharedHandle<SocketCore>& sock, time_t timeout = 15);
const SharedHandle<SocketCore>& sock,
const std::map<std::string, std::string>& options,
time_t timeout = 15);
void poolSocket(const std::string& ipaddr, uint16_t port,
const SharedHandle<SocketCore>& sock,
time_t timeout = 15);
SharedHandle<SocketCore> popPooledSocket(const std::string& ipaddr,
uint16_t port);
SharedHandle<SocketCore> popPooledSocket
(std::map<std::string, std::string>& options,
const std::string& ipaddr,
uint16_t port);
SharedHandle<SocketCore>
popPooledSocket(const std::deque<std::string>& ipaddrs, uint16_t port);
SharedHandle<SocketCore>
popPooledSocket
(std::map<std::string, std::string>& options,
const std::deque<std::string>& ipaddrs,
uint16_t port);
SharedHandle<CookieStorage> getCookieStorage() const;
};

View File

@ -61,7 +61,8 @@ FtpConnection::FtpConnection(int32_t cuid, const SocketHandle& socket,
const RequestHandle& req, const Option* op):
cuid(cuid), socket(socket), req(req), option(op),
logger(LogFactory::getInstance()),
_socketBuffer(socket) {}
_socketBuffer(socket),
_baseWorkingDir("/") {}
FtpConnection::~FtpConnection() {}
@ -108,10 +109,25 @@ bool FtpConnection::sendType()
return _socketBuffer.sendBufferIsEmpty();
}
bool FtpConnection::sendPwd()
{
if(_socketBuffer.sendBufferIsEmpty()) {
std::string request = "PWD\r\n";
logger->info(MSG_SENDING_REQUEST, cuid, request.c_str());
_socketBuffer.feedSendBuffer(request);
}
_socketBuffer.send();
return _socketBuffer.sendBufferIsEmpty();
}
bool FtpConnection::sendCwd()
{
if(_socketBuffer.sendBufferIsEmpty()) {
std::string request = "CWD "+Util::urldecode(req->getDir())+"\r\n";
logger->info("CUID#%d - Using base working directory '%s'",
cuid, _baseWorkingDir.c_str());
std::string request = "CWD "+
(_baseWorkingDir == "/" ? "" : _baseWorkingDir)+
Util::urldecode(req->getDir())+"\r\n";
logger->info(MSG_SENDING_REQUEST, cuid, request.c_str());
_socketBuffer.feedSendBuffer(request);
}
@ -380,4 +396,35 @@ unsigned int FtpConnection::receivePasvResponse(std::pair<std::string, uint16_t>
}
}
unsigned int FtpConnection::receivePwdResponse(std::string& pwd)
{
std::pair<unsigned int, std::string> response;
if(bulkReceiveResponse(response)) {
if(response.first == 257) {
std::string::size_type first;
std::string::size_type last;
if((first = response.second.find("\"")) != std::string::npos &&
(last = response.second.find("\"", ++first)) != std::string::npos) {
pwd = response.second.substr(first, last-first);
} else {
throw DlAbortEx(EX_INVALID_RESPONSE);
}
}
return response.first;
} else {
return 0;
}
}
void FtpConnection::setBaseWorkingDir(const std::string& baseWorkingDir)
{
_baseWorkingDir = baseWorkingDir;
}
const std::string& FtpConnection::getBaseWorkingDir() const
{
return _baseWorkingDir;
}
} // namespace aria2

View File

@ -62,6 +62,8 @@ private:
SocketBuffer _socketBuffer;
std::string _baseWorkingDir;
unsigned int getStatus(const std::string& response) const;
std::string::size_type findEndOfResponse(unsigned int status,
const std::string& buf) const;
@ -79,6 +81,7 @@ public:
bool sendUser();
bool sendPass();
bool sendType();
bool sendPwd();
bool sendCwd();
bool sendMdtm();
bool sendSize();
@ -98,6 +101,11 @@ public:
// If reply is not received yet, returns 0.
unsigned int receiveMdtmResponse(Time& time);
unsigned int receivePasvResponse(std::pair<std::string, uint16_t>& dest);
unsigned int receivePwdResponse(std::string& pwd);
void setBaseWorkingDir(const std::string& baseWorkingDir);
const std::string& getBaseWorkingDir() const;
};
} // namespace aria2

View File

@ -44,6 +44,7 @@
#include "SocketCore.h"
#include "RequestGroup.h"
#include "Logger.h"
#include <map>
namespace aria2 {
@ -85,7 +86,9 @@ bool FtpFinishDownloadCommand::execute()
e->option->getAsBool(PREF_FTP_REUSE_CONNECTION)) {
std::pair<std::string, uint16_t> peerInfo;
socket->getPeerInfo(peerInfo);
e->poolSocket(peerInfo.first, peerInfo.second, socket);
std::map<std::string, std::string> options;
options["baseWorkingDir"] = _ftpConnection->getBaseWorkingDir();
e->poolSocket(peerInfo.first, peerInfo.second, socket, options);
}
} catch(RecoverableException& e) {
logger->info(EX_EXCEPTION_CAUGHT, e);

View File

@ -47,6 +47,7 @@
#include "prefs.h"
#include "HttpConnection.h"
#include "Socket.h"
#include <map>
namespace aria2 {
@ -81,17 +82,20 @@ Command* FtpInitiateConnectionCommand::createNextCommand
throw DlAbortEx("ERROR");
}
} else {
std::map<std::string, std::string> options;
SharedHandle<SocketCore> pooledSocket =
e->popPooledSocket(resolvedAddresses, req->getPort());
e->popPooledSocket(options, resolvedAddresses, req->getPort());
if(pooledSocket.isNull()) {
logger->info(MSG_CONNECTING_TO_SERVER, cuid, req->getHost().c_str(),
req->getPort());
socket.reset(new SocketCore());
socket->establishConnection(resolvedAddresses.front(), req->getPort());
command = new FtpNegotiationCommand(cuid, req, _requestGroup, e, socket);
} else {
command = new FtpNegotiationCommand(cuid, req, _requestGroup, e, pooledSocket, FtpNegotiationCommand::SEQ_SEND_CWD);
command =
new FtpNegotiationCommand(cuid, req, _requestGroup, e, pooledSocket,
FtpNegotiationCommand::SEQ_SEND_CWD,
options["baseWorkingDir"]);
}
}
return command;

View File

@ -59,6 +59,7 @@
#include <stdint.h>
#include <cassert>
#include <utility>
#include <map>
namespace aria2 {
@ -67,10 +68,12 @@ FtpNegotiationCommand::FtpNegotiationCommand(int32_t cuid,
RequestGroup* requestGroup,
DownloadEngine* e,
const SocketHandle& s,
Seq seq):
Seq seq,
const std::string& baseWorkingDir):
AbstractCommand(cuid, req, requestGroup, e, s), sequence(seq),
ftp(new FtpConnection(cuid, socket, req, e->option))
{
ftp->setBaseWorkingDir(baseWorkingDir);
if(seq == SEQ_RECV_GREETING) {
setTimeout(e->option->getAsInt(PREF_CONNECT_TIMEOUT));
}
@ -200,6 +203,33 @@ bool FtpNegotiationCommand::recvType() {
if(status != 200) {
throw DlAbortEx(StringFormat(EX_BAD_STATUS, status).str());
}
sequence = SEQ_SEND_PWD;
return true;
}
bool FtpNegotiationCommand::sendPwd()
{
if(ftp->sendPwd()) {
disableWriteCheckSocket();
sequence = SEQ_RECV_PWD;
} else {
setWriteCheckSocket(socket);
}
return false;
}
bool FtpNegotiationCommand::recvPwd()
{
std::string pwd;
unsigned int status = ftp->receivePwdResponse(pwd);
if(status == 0) {
return false;
}
if(status != 257) {
throw DlAbortEx(StringFormat(EX_BAD_STATUS, status).str());
}
ftp->setBaseWorkingDir(pwd);
logger->info("CUID#%d - base working directory is '%s'", cuid, pwd.c_str());
sequence = SEQ_SEND_CWD;
return true;
}
@ -536,6 +566,10 @@ bool FtpNegotiationCommand::processSequence(const SegmentHandle& segment) {
return sendType();
case SEQ_RECV_TYPE:
return recvType();
case SEQ_SEND_PWD:
return sendPwd();
case SEQ_RECV_PWD:
return recvPwd();
case SEQ_SEND_CWD:
return sendCwd();
case SEQ_RECV_CWD:
@ -582,7 +616,9 @@ void FtpNegotiationCommand::poolConnection() const
e->option->getAsBool(PREF_FTP_REUSE_CONNECTION)) {
std::pair<std::string, uint16_t> peerInfo;
socket->getPeerInfo(peerInfo);
e->poolSocket(peerInfo.first, peerInfo.second, socket);
std::map<std::string, std::string> options;
options["baseWorkingDir"] = ftp->getBaseWorkingDir();
e->poolSocket(peerInfo.first, peerInfo.second, socket, options);
}
}

View File

@ -52,6 +52,8 @@ public:
SEQ_RECV_PASS,
SEQ_SEND_TYPE,
SEQ_RECV_TYPE,
SEQ_SEND_PWD,
SEQ_RECV_PWD,
SEQ_SEND_CWD,
SEQ_RECV_CWD,
SEQ_SEND_MDTM,
@ -83,6 +85,8 @@ private:
bool recvPass();
bool sendType();
bool recvType();
bool sendPwd();
bool recvPwd();
bool sendCwd();
bool recvCwd();
bool sendMdtm();
@ -120,7 +124,8 @@ public:
RequestGroup* requestGroup,
DownloadEngine* e,
const SharedHandle<SocketCore>& s,
Seq seq = SEQ_RECV_GREETING);
Seq seq = SEQ_RECV_GREETING,
const std::string& baseWorkingDir = "/");
virtual ~FtpNegotiationCommand();
};

View File

@ -5,6 +5,7 @@
#include "Request.h"
#include "Option.h"
#include "DlRetryEx.h"
#include "DlAbortEx.h"
#include <iostream>
#include <cstring>
#include <cppunit/extensions/HelperMacros.h>
@ -18,6 +19,12 @@ class FtpConnectionTest:public CppUnit::TestFixture {
CPPUNIT_TEST(testReceiveResponse_overflow);
CPPUNIT_TEST(testSendMdtm);
CPPUNIT_TEST(testReceiveMdtmResponse);
CPPUNIT_TEST(testSendPwd);
CPPUNIT_TEST(testReceivePwdResponse);
CPPUNIT_TEST(testReceivePwdResponse_unquotedResponse);
CPPUNIT_TEST(testReceivePwdResponse_badStatus);
CPPUNIT_TEST(testSendCwd);
CPPUNIT_TEST(testSendCwd_baseWorkingDir);
CPPUNIT_TEST_SUITE_END();
private:
SharedHandle<SocketCore> _serverSocket;
@ -54,6 +61,12 @@ public:
void testReceiveMdtmResponse();
void testReceiveResponse();
void testReceiveResponse_overflow();
void testSendPwd();
void testReceivePwdResponse();
void testReceivePwdResponse_unquotedResponse();
void testReceivePwdResponse_badStatus();
void testSendCwd();
void testSendCwd_baseWorkingDir();
};
@ -159,4 +172,69 @@ void FtpConnectionTest::testReceiveResponse_overflow()
}
}
void FtpConnectionTest::testSendPwd()
{
_ftp->sendPwd();
char data[32];
size_t len = sizeof(data);
_serverSocket->readData(data, len);
CPPUNIT_ASSERT_EQUAL((size_t)5, len);
data[len] = '\0';
CPPUNIT_ASSERT_EQUAL(std::string("PWD\r\n"), std::string(data));
}
void FtpConnectionTest::testReceivePwdResponse()
{
std::string pwd;
_serverSocket->writeData("257 ");
CPPUNIT_ASSERT_EQUAL((unsigned int)0, _ftp->receivePwdResponse(pwd));
CPPUNIT_ASSERT(pwd.empty());
_serverSocket->writeData("\"/dir/to\" is your directory.\r\n");
CPPUNIT_ASSERT_EQUAL((unsigned int)257, _ftp->receivePwdResponse(pwd));
CPPUNIT_ASSERT_EQUAL(std::string("/dir/to"), pwd);
}
void FtpConnectionTest::testReceivePwdResponse_unquotedResponse()
{
std::string pwd;
_serverSocket->writeData("257 /dir/to\r\n");
try {
_ftp->receivePwdResponse(pwd);
CPPUNIT_FAIL("exception must be thrown.");
} catch(DlAbortEx& e) {
// success
}
}
void FtpConnectionTest::testReceivePwdResponse_badStatus()
{
std::string pwd;
_serverSocket->writeData("500 failed\r\n");
CPPUNIT_ASSERT_EQUAL((unsigned int)500, _ftp->receivePwdResponse(pwd));
CPPUNIT_ASSERT(pwd.empty());
}
void FtpConnectionTest::testSendCwd()
{
_ftp->sendCwd();
char data[32];
size_t len = sizeof(data);
_serverSocket->readData(data, len);
CPPUNIT_ASSERT_EQUAL((size_t)10, len);
data[len] = '\0';
CPPUNIT_ASSERT_EQUAL(std::string("CWD /dir\r\n"), std::string(data));
}
void FtpConnectionTest::testSendCwd_baseWorkingDir()
{
_ftp->setBaseWorkingDir("/base");
_ftp->sendCwd();
char data[32];
size_t len = sizeof(data);
_serverSocket->readData(data, len);
CPPUNIT_ASSERT_EQUAL((size_t)15, len);
data[len] = '\0';
CPPUNIT_ASSERT_EQUAL(std::string("CWD /base/dir\r\n"), std::string(data));
}
} // namespace aria2