2008-05-31 Tatsuhiro Tsujikawa <tujikawa at rednoah dot com>

Added epoll support. Use epoll if it is available. If not, use 
select.
	When async dns is enabled in compile time, epoll support is 
enabled
	only when aria2 is built with c-ares. This condition statement 
is
	defined in a2io.h.
	Currently aria2 supports both c-ares and ares, but ares is 
maintained
	for quite a while and debian already has c-ares package, ares 
support
	will be dropped in the future version.
	* configure.ac
	* src/AbstractCommand.cc
	* src/AsyncNameResolver.cc
	* src/AsyncNameResolver.h
	* src/Command.cc
	* src/Command.h
	* src/DownloadEngine.cc
	* src/DownloadEngine.h
	* src/PeerAbstractCommand.cc
	* src/SocketCore.cc
	* src/SocketCore.h
	* src/a2io.h
pull/1/head
Tatsuhiro Tsujikawa 2008-05-31 10:31:07 +00:00
parent 242267028d
commit 54ee6c4627
15 changed files with 1085 additions and 116 deletions

View File

@ -1,3 +1,25 @@
2008-05-31 Tatsuhiro Tsujikawa <tujikawa at rednoah dot com>
Added epoll support. Use epoll if it is available. If not, use select.
When async dns is enabled in compile time, epoll support is enabled
only when aria2 is built with c-ares. This condition statement is
defined in a2io.h.
Currently aria2 supports both c-ares and ares, but ares is maintained
for quite a while and debian already has c-ares package, ares support
will be dropped in the future version.
* configure.ac
* src/AbstractCommand.cc
* src/AsyncNameResolver.cc
* src/AsyncNameResolver.h
* src/Command.cc
* src/Command.h
* src/DownloadEngine.cc
* src/DownloadEngine.h
* src/PeerAbstractCommand.cc
* src/SocketCore.cc
* src/SocketCore.h
* src/a2io.h
2008-05-31 Tatsuhiro Tsujikawa <tujikawa at rednoah dot com>
* test/MetalinkEntryTest.cc: Added missing `defined' keyword to #if

View File

@ -102,6 +102,9 @@
/* Define to 1 if you don't have `vprintf' but do have `_doprnt.' */
#undef HAVE_DOPRNT
/* Define to 1 if you have the `epoll_create' function. */
#undef HAVE_EPOLL_CREATE
/* Define to 1 if you have the `EVP_DigestInit_ex' function. */
#undef HAVE_EVP_DIGESTINIT_EX

95
configure vendored
View File

@ -20003,6 +20003,101 @@ done
for ac_func in epoll_create
do
as_ac_var=`echo "ac_cv_func_$ac_func" | $as_tr_sh`
{ echo "$as_me:$LINENO: checking for $ac_func" >&5
echo $ECHO_N "checking for $ac_func... $ECHO_C" >&6; }
if { as_var=$as_ac_var; eval "test \"\${$as_var+set}\" = set"; }; then
echo $ECHO_N "(cached) $ECHO_C" >&6
else
cat >conftest.$ac_ext <<_ACEOF
/* confdefs.h. */
_ACEOF
cat confdefs.h >>conftest.$ac_ext
cat >>conftest.$ac_ext <<_ACEOF
/* end confdefs.h. */
/* Define $ac_func to an innocuous variant, in case <limits.h> declares $ac_func.
For example, HP-UX 11i <limits.h> declares gettimeofday. */
#define $ac_func innocuous_$ac_func
/* System header to define __stub macros and hopefully few prototypes,
which can conflict with char $ac_func (); below.
Prefer <limits.h> to <assert.h> if __STDC__ is defined, since
<limits.h> exists even on freestanding compilers. */
#ifdef __STDC__
# include <limits.h>
#else
# include <assert.h>
#endif
#undef $ac_func
/* Override any GCC internal prototype to avoid an error.
Use char because int might match the return type of a GCC
builtin and then its argument prototype would still apply. */
#ifdef __cplusplus
extern "C"
#endif
char $ac_func ();
/* The GNU C library defines this for functions which it implements
to always fail with ENOSYS. Some functions are actually named
something starting with __ and the normal name is an alias. */
#if defined __stub_$ac_func || defined __stub___$ac_func
choke me
#endif
int
main ()
{
return $ac_func ();
;
return 0;
}
_ACEOF
rm -f conftest.$ac_objext conftest$ac_exeext
if { (ac_try="$ac_link"
case "(($ac_try" in
*\"* | *\`* | *\\*) ac_try_echo=\$ac_try;;
*) ac_try_echo=$ac_try;;
esac
eval "echo \"\$as_me:$LINENO: $ac_try_echo\"") >&5
(eval "$ac_link") 2>conftest.er1
ac_status=$?
grep -v '^ *+' conftest.er1 >conftest.err
rm -f conftest.er1
cat conftest.err >&5
echo "$as_me:$LINENO: \$? = $ac_status" >&5
(exit $ac_status); } && {
test -z "$ac_cxx_werror_flag" ||
test ! -s conftest.err
} && test -s conftest$ac_exeext &&
$as_test_x conftest$ac_exeext; then
eval "$as_ac_var=yes"
else
echo "$as_me: failed program was:" >&5
sed 's/^/| /' conftest.$ac_ext >&5
eval "$as_ac_var=no"
fi
rm -f core conftest.err conftest.$ac_objext conftest_ipa8_conftest.oo \
conftest$ac_exeext conftest.$ac_ext
fi
ac_res=`eval echo '${'$as_ac_var'}'`
{ echo "$as_me:$LINENO: result: $ac_res" >&5
echo "${ECHO_T}$ac_res" >&6; }
if test `eval echo '${'$as_ac_var'}'` = yes; then
cat >>confdefs.h <<_ACEOF
#define `echo "HAVE_$ac_func" | $as_tr_cpp` 1
_ACEOF
fi
done
for ac_func in basename
do
as_ac_var=`echo "ac_cv_func_$ac_func" | $as_tr_sh`

View File

@ -183,6 +183,8 @@ AC_FUNC_STRFTIME
AC_FUNC_VPRINTF
AC_CHECK_FUNCS([__argz_count __argz_next __argz_stringify atexit daemon ftruncate getcwd gethostbyaddr gethostbyname getpagesize inet_ntoa memchr memmove mempcpy memset mkdir munmap nl_langinfo posix_memalign putenv rmdir select setlocale sleep socket stpcpy strcasecmp strchr strcspn strdup strerror strncasecmp strstr strtol strtoul strtoull tzset unsetenv usleep])
AC_CHECK_FUNCS([epoll_create])
AC_CHECK_FUNCS([basename],
[AM_CONDITIONAL([HAVE_BASENAME], true)],
[AM_CONDITIONAL([HAVE_BASENAME], false)])

View File

@ -103,8 +103,8 @@ bool AbstractCommand::execute() {
return true;
}
}
if((checkSocketIsReadable && readCheckTarget->isReadable(0)) ||
(checkSocketIsWritable && writeCheckTarget->isWritable(0)) ||
if((checkSocketIsReadable && _readEvent) ||
(checkSocketIsWritable && _writeEvent) ||
#ifdef ENABLE_ASYNC_DNS
(nameResolverCheck && nameResolveFinished()) ||
#endif // ENABLE_ASYNC_DNS

View File

@ -115,6 +115,20 @@ void AsyncNameResolver::process(fd_set* rfdsPtr, fd_set* wfdsPtr)
ares_process(channel, rfdsPtr, wfdsPtr);
}
#ifdef HAVE_LIBCARES
int AsyncNameResolver::getsock(int* sockets) const
{
return ares_getsock(channel, sockets, ARES_GETSOCK_MAXNUM);
}
void AsyncNameResolver::process(ares_socket_t readfd, ares_socket_t writefd)
{
ares_process_fd(channel, readfd, writefd);
}
#endif // HAVE_LIBCARES
bool AsyncNameResolver::operator==(const AsyncNameResolver& resolver) const
{
return this == &resolver;

View File

@ -89,6 +89,14 @@ public:
void process(fd_set* rfdsPtr, fd_set* wfdsPtr);
#ifdef HAVE_LIBCARES
int getsock(int* sockets) const;
void process(ares_socket_t readfd, ares_socket_t writefd);
#endif // HAVE_LIBCARES
bool operator==(const AsyncNameResolver& resolver) const;
void setAddr(const std::string& addrString);

View File

@ -43,7 +43,10 @@ int32_t Command::uuidGen = 0;
Command::Command(int32_t cuid):uuid(uuidGen++),
status(STATUS_INACTIVE),
cuid(cuid),
logger(LogFactory::getInstance()) {}
logger(LogFactory::getInstance()),
_readEvent(false),
_writeEvent(false),
_errorEvent(false) {}
void Command::transitStatus()
{
@ -60,4 +63,26 @@ void Command::setStatus(STATUS status)
this->status = status;
}
void Command::readEventReceived()
{
_readEvent = true;
}
void Command::writeEventReceived()
{
_writeEvent = true;
}
void Command::errorEventRecieved()
{
_errorEvent = true;
}
void Command::clearIOEvents()
{
_readEvent = false;
_writeEvent = false;
_errorEvent = false;
}
} // namespace aria2

View File

@ -61,6 +61,10 @@ private:
protected:
int32_t cuid;
Logger* logger;
bool _readEvent;
bool _writeEvent;
bool _errorEvent;
public:
Command(int32_t cuid);
@ -86,6 +90,14 @@ public:
}
void transitStatus();
void readEventReceived();
void writeEventReceived();
void errorEventRecieved();
void clearIOEvents();
};
typedef std::deque<Command*> Commands;

View File

@ -50,9 +50,12 @@
#include "Socket.h"
#include "Util.h"
#include "a2functional.h"
#include "DlAbortEx.h"
#include <signal.h>
#include <cstring>
#include <algorithm>
#include <numeric>
#include <cerrno>
namespace aria2 {
@ -63,40 +66,361 @@ namespace aria2 {
// 4 ... 2nd stop signal processed by DownloadEngine
volatile sig_atomic_t globalHaltRequested = 0;
SocketEntry::SocketEntry(const SocketHandle& socket,
Command* command,
TYPE type):
socket(socket), command(command), type(type) {}
bool SocketEntry::operator==(const SocketEntry& entry)
CommandEvent::CommandEvent(Command* command, int events):
_command(command), _events(events) {}
bool CommandEvent::operator==(const CommandEvent& commandEvent) const
{
return socket == entry.socket &&
command == entry.command &&
type == entry.type;
return _command == commandEvent._command;
}
int CommandEvent::getEvents() const
{
return _events;
}
void CommandEvent::addEvents(int events)
{
_events |= events;
}
void CommandEvent::removeEvents(int events)
{
_events &= (~events);
}
bool CommandEvent::eventsEmpty() const
{
return _events == 0;
}
void CommandEvent::processEvents(int events)
{
if((_events&events) ||
((SocketEntry::EVENT_ERROR|SocketEntry::EVENT_HUP)&events)) {
_command->setStatusActive();
}
if(SocketEntry::EVENT_READ&events) {
_command->readEventReceived();
}
if(SocketEntry::EVENT_WRITE&events) {
_command->writeEventReceived();
}
if((SocketEntry::EVENT_ERROR|SocketEntry::EVENT_HUP)&events) {
_command->errorEventRecieved();
}
}
#if defined HAVE_EPOLL && defined ENABLE_ASYNC_DNS
ADNSEvent::ADNSEvent(const SharedHandle<AsyncNameResolver>& resolver,
Command* command,
int socket, int events):
_resolver(resolver), _command(command), _socket(socket), _events(events) {}
bool ADNSEvent::operator==(const ADNSEvent& event) const
{
return _resolver == event._resolver;
}
int ADNSEvent::getEvents() const
{
return _events;
}
void ADNSEvent::processEvents(int events)
{
ares_socket_t readfd;
ares_socket_t writefd;
if(events&EPOLLIN) {
readfd = _socket;
} else {
readfd = ARES_SOCKET_BAD;
}
if(events&EPOLLOUT) {
writefd = _socket;
} else {
writefd = ARES_SOCKET_BAD;
}
_resolver->process(readfd, writefd);
_command->setStatusActive();
}
#endif // HAVE_EPOLL && ENABLE_ASYNC_DNS
SocketEntry::SocketEntry(int socket):_socket(socket)
{
#ifdef HAVE_EPOLL
memset(&_epEvent, 0, sizeof(struct epoll_event));
#endif // HAVE_EPOLL
}
bool SocketEntry::operator==(const SocketEntry& entry) const
{
return _socket == entry._socket;
}
bool SocketEntry::operator<(const SocketEntry& entry) const
{
return _socket < entry._socket;
}
void SocketEntry::addCommandEvent(Command* command, int events)
{
CommandEvent cev(command, events);
std::deque<CommandEvent>::iterator i = std::find(_commandEvents.begin(),
_commandEvents.end(),
cev);
if(i == _commandEvents.end()) {
_commandEvents.push_back(cev);
} else {
(*i).addEvents(events);
}
}
void SocketEntry::removeCommandEvent(Command* command, int events)
{
CommandEvent cev(command, events);
std::deque<CommandEvent>::iterator i = std::find(_commandEvents.begin(),
_commandEvents.end(),
cev);
if(i == _commandEvents.end()) {
// not found
} else {
(*i).removeEvents(events);
if((*i).eventsEmpty()) {
_commandEvents.erase(i);
}
}
}
#if defined HAVE_EPOLL && defined ENABLE_ASYNC_DNS
void SocketEntry::addADNSEvent(const SharedHandle<AsyncNameResolver>& resolver,
Command* command, int events)
{
ADNSEvent aev(resolver, command, _socket, events);
std::deque<ADNSEvent>::iterator i = std::find(_adnsEvents.begin(),
_adnsEvents.end(),
aev);
if(i == _adnsEvents.end()) {
_adnsEvents.push_back(aev);
}
}
void SocketEntry::removeADNSEvent(const SharedHandle<AsyncNameResolver>& resolver,
Command* command)
{
ADNSEvent aev(resolver, command, _socket, 0);
std::deque<ADNSEvent>::iterator i = std::find(_adnsEvents.begin(),
_adnsEvents.end(),
aev);
if(i == _adnsEvents.end()) {
// not found
} else {
_adnsEvents.erase(i);
}
}
#endif // HAVE_EPOLL && ENABLE_ASYNC_DNS
void SocketEntry::processEvents(int events)
{
std::for_each(_commandEvents.begin(), _commandEvents.end(),
std::bind2nd(std::mem_fun_ref(&CommandEvent::processEvents),
events));
#if defined HAVE_EPOLL && defined ENABLE_ASYNC_DNS
std::for_each(_adnsEvents.begin(), _adnsEvents.end(),
std::bind2nd(std::mem_fun_ref(&ADNSEvent::processEvents),
events));
#endif // HAVE_EPOLL && ENABLE_ASYNC_DNS
}
int SocketEntry::getSocket() const
{
return _socket;
}
bool SocketEntry::eventEmpty() const
{
#if defined HAVE_EPOLL && defined ENABLE_ASYNC_DNS
return _commandEvents.empty() && _adnsEvents.empty();
#else // !(HAVE_EPOLL && ENABLE_ASYNC_DNS)
return _commandEvents.empty();
#endif // !(HAVE_EPOLL && ENABLE_ASYNC_DNS)
}
class AccEvent {
public:
int operator()(int events, const CommandEvent& commandEvent) const
{
return events|commandEvent.getEvents();
}
#if defined HAVE_EPOLL && defined ENABLE_ASYNC_DNS
int operator()(int events, const ADNSEvent& adnsEvent) const
{
return events|adnsEvent.getEvents();
}
#endif // HAVE_EPOLL && ENABLE_ASYNC_DNS
};
#ifdef HAVE_EPOLL
struct epoll_event& SocketEntry::getEpEvent()
{
_epEvent.data.ptr = this;
#ifdef ENABLE_ASYNC_DNS
_epEvent.events =
std::accumulate(_adnsEvents.begin(),
_adnsEvents.end(),
std::accumulate(_commandEvents.begin(),
_commandEvents.end(), 0, AccEvent()),
AccEvent());
#else // !ENABLE_ASYNC_DNS
_epEvent.events =
std::accumulate(_commandEvents.begin(), _commandEvents.end(), 0, AccEvent());
#endif // !ENABLE_ASYNC_DNS
return _epEvent;
}
#else // !HAVE_EPOLL
int SocketEntry::getEvents()
{
return
std::accumulate(_commandEvents.begin(), _commandEvents.end(), 0, AccEvent());
}
#endif // !HAVE_EPOLL
#ifdef ENABLE_ASYNC_DNS
AsyncNameResolverEntry::AsyncNameResolverEntry
(const SharedHandle<AsyncNameResolver>& nameResolver,
Command* command):
nameResolver(nameResolver), command(command) {}
(const SharedHandle<AsyncNameResolver>& nameResolver, Command* command):
_nameResolver(nameResolver), _command(command)
#ifdef HAVE_EPOLL
, _socketsSize(0)
#endif // HAVE_EPOLL
{}
bool AsyncNameResolverEntry::operator==(const AsyncNameResolverEntry& entry)
{
return nameResolver == entry.nameResolver &&
command == entry.command;
return _nameResolver == entry._nameResolver &&
_command == entry._command;
}
#ifdef HAVE_EPOLL
void AsyncNameResolverEntry::addSocketEvents(DownloadEngine* e)
{
_socketsSize = 0;
int mask = _nameResolver->getsock(_sockets);
if(mask == 0) {
return;
}
size_t i;
for(i = 0; i < ARES_GETSOCK_MAXNUM; ++i) {
//epoll_event_t* epEventPtr = &_epEvents[_socketsSize];
int events = 0;
if(ARES_GETSOCK_READABLE(mask, i)) {
events |= EPOLLIN;
}
if(ARES_GETSOCK_WRITABLE(mask, i)) {
events |= EPOLLOUT;
}
if(events == 0) {
// assume no further sockets are returned.
break;
}
e->addSocketEvents(_sockets[i], _command, events, _nameResolver);
}
_socketsSize = i;
}
void AsyncNameResolverEntry::removeSocketEvents(DownloadEngine* e)
{
for(size_t i = 0; i < _socketsSize; ++i) {
e->deleteSocketEvents(_sockets[i], _command, 0, _nameResolver);
}
}
#else // !HAVE_EPOLL
int AsyncNameResolverEntry::getFds(fd_set* rfdsPtr, fd_set* wfdsPtr)
{
return _nameResolver->getFds(rfdsPtr, wfdsPtr);
}
void AsyncNameResolverEntry::process(fd_set* rfdsPtr, fd_set* wfdsPtr)
{
_nameResolver->process(rfdsPtr, wfdsPtr);
switch(_nameResolver->getStatus()) {
case AsyncNameResolver::STATUS_SUCCESS:
case AsyncNameResolver::STATUS_ERROR:
_command->setStatusActive();
break;
default:
break;
}
}
#endif // !HAVE_EPOLL
#endif // ENABLE_ASYNC_DNS
DownloadEngine::DownloadEngine():logger(LogFactory::getInstance()),
_haltRequested(false),
_noWait(false)
{
#ifdef HAVE_EPOLL
_epfd = epoll_create(EPOLL_EVENTS_MAX);
#else // !HAVE_EPOLL
updateFdSet();
#endif // !HAVE_EPOLL
}
DownloadEngine::~DownloadEngine() {
cleanQueue();
#ifdef HAVE_EPOLL
if(_epfd != -1) {
int r;
while((r = close(_epfd)) == -1 && errno == EINTR);
if(r == -1) {
logger->error("Error occurred while closing epoll file descriptor %d: %s",
_epfd, strerror(errno));
}
}
#endif // HAVE_EPOLL
}
void DownloadEngine::cleanQueue() {
@ -115,14 +439,27 @@ static void executeCommand(std::deque<Command*>& commands,
com->transitStatus();
if(com->execute()) {
delete com;
com = 0;
}
} else {
commands.push_back(com);
}
if(com) {
com->clearIOEvents();
}
}
}
void DownloadEngine::run() {
#ifdef HAVE_EPOLL
if(_epfd == -1) {
throw DlAbortEx("epoll_init() failed.");
}
#endif // HAVE_EPOLL
Time cp;
cp.setTimeInSec(0);
while(!commands.empty() || !_routineCommands.empty()) {
@ -152,7 +489,33 @@ void DownloadEngine::shortSleep() const {
select(0, &rfds, NULL, NULL, &tv);
}
void DownloadEngine::waitData() {
void DownloadEngine::waitData()
{
#ifdef HAVE_EPOLL
// timeout is millisec
int timeout = _noWait ? 0 : 1000;
// TODO make member variable
const size_t _maxEpEvents = EPOLL_EVENTS_MAX;
struct epoll_event _epEvents[_maxEpEvents];
int res;
while((res = epoll_wait(_epfd, _epEvents, _maxEpEvents, timeout)) == -1 &&
errno == EINTR);
if(res > 0) {
for(int i = 0; i < res; ++i) {
SocketEntry* p = (SocketEntry*)_epEvents[i].data.ptr;
p->processEvents(_epEvents[i].events);
}
}
// TODO timeout of name resolver is determined in Command(AbstractCommand,
// DHTEntryPoint...Command)
#else // !HAVE_EPOLL
fd_set rfds;
fd_set wfds;
struct timeval tv;
@ -161,62 +524,63 @@ void DownloadEngine::waitData() {
memcpy(&wfds, &wfdset, sizeof(fd_set));
#ifdef ENABLE_ASYNC_DNS
for(AsyncNameResolverEntries::iterator itr = nameResolverEntries.begin();
itr != nameResolverEntries.end(); ++itr) {
AsyncNameResolverEntry& entry = *itr;
int fd = entry.nameResolver->getFds(&rfds, &wfds);
for(std::deque<SharedHandle<AsyncNameResolverEntry> >::iterator itr =
nameResolverEntries.begin(); itr != nameResolverEntries.end(); ++itr) {
SharedHandle<AsyncNameResolverEntry>& entry = *itr;
int fd = entry->getFds(&rfds, &wfds);
// TODO force error if fd == 0
if(fdmax < fd) {
fdmax = fd;
}
}
#endif // ENABLE_ASYNC_DNS
tv.tv_sec = _noWait ? 0 : 1;
tv.tv_usec = 0;
int retval = select(fdmax+1, &rfds, &wfds, NULL, &tv);
if(retval > 0) {
for(SocketEntries::iterator itr = socketEntries.begin();
itr != socketEntries.end(); ++itr) {
SocketEntry& entry = *itr;
if(FD_ISSET(entry.socket->getSockfd(), &rfds) ||
FD_ISSET(entry.socket->getSockfd(), &wfds)) {
entry.command->setStatusActive();
for(std::deque<SharedHandle<SocketEntry> >::iterator i =
socketEntries.begin(); i != socketEntries.end(); ++i) {
int events = 0;
if(FD_ISSET((*i)->getSocket(), &rfds)) {
events |= SocketEntry::EVENT_READ;
}
if(FD_ISSET((*i)->getSocket(), &wfds)) {
events |= SocketEntry::EVENT_WRITE;
}
(*i)->processEvents(events);
}
}
#ifdef ENABLE_ASYNC_DNS
for(AsyncNameResolverEntries::iterator itr = nameResolverEntries.begin();
itr != nameResolverEntries.end(); ++itr) {
AsyncNameResolverEntry& entry = *itr;
entry.nameResolver->process(&rfds, &wfds);
switch(entry.nameResolver->getStatus()) {
case AsyncNameResolver::STATUS_SUCCESS:
case AsyncNameResolver::STATUS_ERROR:
entry.command->setStatusActive();
break;
default:
break;
}
for(std::deque<SharedHandle<AsyncNameResolverEntry> >::iterator i =
nameResolverEntries.begin(); i != nameResolverEntries.end(); ++i) {
(*i)->process(&rfds, &wfds);
}
#endif // ENABLE_ASYNC_DNS
#endif // !HAVE_EPOLL
}
#ifndef HAVE_EPOLL
void DownloadEngine::updateFdSet() {
fdmax = 0;
FD_ZERO(&rfdset);
FD_ZERO(&wfdset);
for(SocketEntries::iterator itr = socketEntries.begin();
itr != socketEntries.end(); ++itr) {
SocketEntry& entry = *itr;
int fd = entry.socket->getSockfd();
switch(entry.type) {
case SocketEntry::TYPE_RD:
for(std::deque<SharedHandle<SocketEntry> >::iterator i =
socketEntries.begin(); i != socketEntries.end(); ++i) {
int fd = (*i)->getSocket();
int events = (*i)->getEvents();
if(events&SocketEntry::EVENT_READ) {
FD_SET(fd, &rfdset);
break;
case SocketEntry::TYPE_WR:
}
if(events&SocketEntry::EVENT_WRITE) {
FD_SET(fd, &wfdset);
break;
}
if(fdmax < fd) {
fdmax = fd;
@ -224,52 +588,191 @@ void DownloadEngine::updateFdSet() {
}
}
bool DownloadEngine::addSocket(const SocketEntry& entry) {
SocketEntries::iterator itr =
std::find(socketEntries.begin(), socketEntries.end(), entry);
if(itr == socketEntries.end()) {
socketEntries.push_back(entry);
updateFdSet();
return true;
#endif // !HAVE_EPOLL
bool DownloadEngine::addSocketEvents(int socket, Command* command, int events
#if defined HAVE_EPOLL && defined ENABLE_ASYNC_DNS
,const SharedHandle<AsyncNameResolver>& rs
#endif // HAVE_EPOLL && ENABLE_ASYNC_DNS
)
{
SharedHandle<SocketEntry> socketEntry(new SocketEntry(socket));
std::deque<SharedHandle<SocketEntry> >::iterator i =
std::lower_bound(socketEntries.begin(), socketEntries.end(), socketEntry);
int r = 0;
if(i != socketEntries.end() && (*i) == socketEntry) {
#ifdef HAVE_EPOLL
#ifdef ENABLE_ASYNC_DNS
if(rs.isNull()) {
(*i)->addCommandEvent(command, events);
} else {
(*i)->addADNSEvent(rs, command, events);
}
#else // !ENABLE_ASYNC_DNS
(*i)->addCommandEvent(command, events);
#endif // !ENABLE_ASYNC_DNS
r = epoll_ctl(_epfd, EPOLL_CTL_MOD, (*i)->getSocket(), &(*i)->getEpEvent());
if(r == -1) {
// try EPOLL_CTL_ADD: There is a chance that previously socket X is
// added to epoll, but it is closed and is not yet removed from
// SocketEntries. In this case, EPOLL_CTL_MOD is failed with ENOENT.
r = epoll_ctl(_epfd, EPOLL_CTL_ADD, (*i)->getSocket(), &(*i)->getEpEvent());
}
#else // !HAVE_EPOLL
(*i)->addCommandEvent(command, events);
#endif // !HAVE_EPOLL
} else {
socketEntries.insert(i, socketEntry);
#ifdef HAVE_EPOLL
#ifdef ENABLE_ASYNC_DNS
if(rs.isNull()) {
socketEntry->addCommandEvent(command, events);
} else {
socketEntry->addADNSEvent(rs, command, events);
}
#else // !ENABLE_ASYNC_DNS
socketEntry->addCommandEvent(command, events);
#endif // !ENABLE_ASYNC_DNS
r = epoll_ctl(_epfd, EPOLL_CTL_ADD, socketEntry->getSocket(), &socketEntry->getEpEvent());
#else // !HAVE_EPOLL
socketEntry->addCommandEvent(command, events);
#endif // !HAVE_EPOLL
}
#ifndef HAVE_EPOLL
updateFdSet();
#endif // !HAVE_EPOLL
if(r == -1) {
logger->debug("Failed to add socket event %d:%s", socket, strerror(errno));
return false;
} else {
return true;
}
}
bool DownloadEngine::deleteSocket(const SocketEntry& entry) {
SocketEntries::iterator itr =
std::find(socketEntries.begin(), socketEntries.end(), entry);
if(itr == socketEntries.end()) {
return false;
} else {
socketEntries.erase(itr);
bool DownloadEngine::deleteSocketEvents(int socket, Command* command, int events
#if defined HAVE_EPOLL && defined ENABLE_ASYNC_DNS
,const SharedHandle<AsyncNameResolver>& rs
#endif // HAVE_EPOLL && ENABLE_ASYNC_DNS
)
{
SharedHandle<SocketEntry> socketEntry(new SocketEntry(socket));
std::deque<SharedHandle<SocketEntry> >::iterator i =
std::lower_bound(socketEntries.begin(), socketEntries.end(), socketEntry);
if(i != socketEntries.end() && (*i) == socketEntry) {
#ifdef HAVE_EPOLL
#ifdef ENABLE_ASYNC_DNS
if(rs.isNull()) {
(*i)->removeCommandEvent(command, events);
} else {
(*i)->removeADNSEvent(rs, command);
}
#else // !ENABLE_ASYNC_DNS
(*i)->removeCommandEvent(command, events);
#endif // !ENABLE_ASYNC_DNS
#else // !HAVE_EPOLL
(*i)->removeCommandEvent(command, events);
#endif // !HAVE_EPOLL
int r = 0;
if((*i)->eventEmpty()) {
#ifdef HAVE_EPOLL
r = epoll_ctl(_epfd, EPOLL_CTL_DEL, (*i)->getSocket(), 0);
#endif // HAVE_EPOLL
socketEntries.erase(i);
} else {
#ifdef HAVE_EPOLL
// If socket is closed, then it seems it is automatically removed from
// epoll, so following EPOLL_CTL_MOD may fail.
r = epoll_ctl(_epfd, EPOLL_CTL_MOD, (*i)->getSocket(), &(*i)->getEpEvent());
if(r == -1) {
logger->debug("Failed to delete socket event, but may be ignored:%s", strerror(errno));
}
#endif // HAVE_EPOLL
}
#ifndef HAVE_EPOLL
updateFdSet();
return true;
#endif // !HAVE_EPOLL
if(r == -1) {
logger->debug("Failed to delete socket event:%s", strerror(errno));
return false;
} else {
return true;
}
} else {
logger->debug("Socket %d is not found in SocketEntries.", socket);
return false;
}
}
bool DownloadEngine::addSocketForReadCheck(const SocketHandle& socket,
Command* command) {
SocketEntry entry(socket, command, SocketEntry::TYPE_RD);
return addSocket(entry);
Command* command)
{
return addSocketEvents(socket->getSockfd(), command, SocketEntry::EVENT_READ);
}
bool DownloadEngine::deleteSocketForReadCheck(const SocketHandle& socket,
Command* command) {
SocketEntry entry(socket, command, SocketEntry::TYPE_RD);
return deleteSocket(entry);
Command* command)
{
return deleteSocketEvents(socket->getSockfd(), command, SocketEntry::EVENT_READ);
}
bool DownloadEngine::addSocketForWriteCheck(const SocketHandle& socket,
Command* command) {
SocketEntry entry(socket, command, SocketEntry::TYPE_WR);
return addSocket(entry);
Command* command)
{
return addSocketEvents(socket->getSockfd(), command, SocketEntry::EVENT_WRITE);
}
bool DownloadEngine::deleteSocketForWriteCheck(const SocketHandle& socket,
Command* command) {
SocketEntry entry(socket, command, SocketEntry::TYPE_WR);
return deleteSocket(entry);
Command* command)
{
return deleteSocketEvents(socket->getSockfd(), command, SocketEntry::EVENT_WRITE);
}
void DownloadEngine::calculateStatistics()
@ -323,14 +826,21 @@ void DownloadEngine::addCommand(const Commands& commands)
#ifdef ENABLE_ASYNC_DNS
bool DownloadEngine::addNameResolverCheck
(const SharedHandle<AsyncNameResolver>& resolver,
Command* command)
(const SharedHandle<AsyncNameResolver>& resolver, Command* command)
{
AsyncNameResolverEntry entry(resolver, command);
AsyncNameResolverEntries::iterator itr =
SharedHandle<AsyncNameResolverEntry> entry
(new AsyncNameResolverEntry(resolver, command));
std::deque<SharedHandle<AsyncNameResolverEntry> >::iterator itr =
std::find(nameResolverEntries.begin(), nameResolverEntries.end(), entry);
if(itr == nameResolverEntries.end()) {
nameResolverEntries.push_back(entry);
#ifdef HAVE_EPOLL
entry->addSocketEvents(this);
#endif // HAVE_EPOLL
return true;
} else {
return false;
@ -338,15 +848,22 @@ bool DownloadEngine::addNameResolverCheck
}
bool DownloadEngine::deleteNameResolverCheck
(const SharedHandle<AsyncNameResolver>& resolver,
Command* command)
(const SharedHandle<AsyncNameResolver>& resolver, Command* command)
{
AsyncNameResolverEntry entry(resolver, command);
AsyncNameResolverEntries::iterator itr =
SharedHandle<AsyncNameResolverEntry> entry
(new AsyncNameResolverEntry(resolver, command));
std::deque<SharedHandle<AsyncNameResolverEntry> >::iterator itr =
std::find(nameResolverEntries.begin(), nameResolverEntries.end(), entry);
if(itr == nameResolverEntries.end()) {
return false;
} else {
#ifdef HAVE_EPOLL
(*itr)->removeSocketEvents(this);
#endif // HAVE_EPOLL
nameResolverEntries.erase(itr);
return true;
}

View File

@ -40,68 +40,214 @@
#include "Command.h"
#include "a2netcompat.h"
#include "TimeA2.h"
#include "a2io.h"
#ifdef ENABLE_ASYNC_DNS
# include "AsyncNameResolver.h"
#endif // ENABLE_ASYNC_DNS
#include <deque>
#include <map>
#ifdef HAVE_EPOLL
# include <sys/epoll.h>
#endif // HAVE_EPOLL
namespace aria2 {
class Logger;
class Option;
#ifdef ENABLE_ASYNC_DNS
class AsyncNameResolver;
#endif // ENABLE_ASYNC_DNS
class RequestGroupMan;
class FileAllocationMan;
class StatCalc;
class CheckIntegrityMan;
class SocketCore;
class SocketEntry {
class CommandEvent
{
private:
Command* _command;
int _events;
public:
enum TYPE {
TYPE_RD,
TYPE_WR,
};
CommandEvent(Command* command, int events);
SharedHandle<SocketCore> socket;
Command* command;
TYPE type;
public:
SocketEntry(const SharedHandle<SocketCore>& socket,
Command* command,
TYPE type);
Command* getCommand() const;
bool operator==(const SocketEntry& entry);
int getEvents() const;
void addEvents(int events);
void removeEvents(int events);
bool eventsEmpty() const;
void processEvents(int events);
bool operator==(const CommandEvent& event) const;
};
typedef std::deque<SocketEntry> SocketEntries;
#if defined HAVE_EPOLL && defined ENABLE_ASYNC_DNS
class ADNSEvent {
private:
SharedHandle<AsyncNameResolver> _resolver;
Command* _command;
int _socket;
int _events;
public:
ADNSEvent(const SharedHandle<AsyncNameResolver>& resolver, Command* command,
int socket, int events);
void processEvents(int events);
bool operator==(const ADNSEvent& event) const;
int getEvents() const;
};
#endif // HAVE_EPOLL && ENABLE_ASYNC_DNS
class SocketEntry {
private:
int _socket;
std::deque<CommandEvent> _commandEvents;
#if defined HAVE_EPOLL && defined ENABLE_ASYNC_DNS
std::deque<ADNSEvent> _adnsEvents;
#endif // HAVE_EPOLL && ENABLE_ASYNC_DNS
#ifdef HAVE_EPOLL
struct epoll_event _epEvent;
#endif // HAVE_EPOLL
public:
#ifdef HAVE_EPOLL
enum EventType {
EVENT_READ = EPOLLIN,
EVENT_WRITE = EPOLLOUT,
EVENT_ERROR = EPOLLERR,
EVENT_HUP = EPOLLHUP,
};
#else // !HAVE_EPOLL
enum EventType {
EVENT_READ = 1,
EVENT_WRITE = 1 << 1,
EVENT_ERROR = 1 << 2,
EVENT_HUP = 1 << 3,
};
#endif // !HAVE_EPOLL
SocketEntry(int socket);
bool operator==(const SocketEntry& entry) const;
bool operator<(const SocketEntry& entry) const;
void addCommandEvent(Command* command, int events);
void removeCommandEvent(Command* command, int events);
#if defined HAVE_EPOLL && defined ENABLE_ASYNC_DNS
void addADNSEvent(const SharedHandle<AsyncNameResolver>& resolver,
Command* command, int events);
void removeADNSEvent(const SharedHandle<AsyncNameResolver>& resolver,
Command* command);
#endif // HAVE_EPOLL && ENABLE_ASYNC_DNS
#ifdef HAVE_EPOLL
struct epoll_event& getEpEvent();
#else // !HAVE_EPOLL
int getEvents();
#endif // !HAVE_EPOLL
int getSocket() const;
bool eventEmpty() const;
void processEvents(int events);
};
#ifdef ENABLE_ASYNC_DNS
class DownloadEngine;
class AsyncNameResolverEntry {
public:
SharedHandle<AsyncNameResolver> nameResolver;
Command* command;
private:
SharedHandle<AsyncNameResolver> _nameResolver;
Command* _command;
#ifdef HAVE_EPOLL
// HAVE_EPOLL assumes c-ares
size_t _socketsSize;
int _sockets[ARES_GETSOCK_MAXNUM];
#endif // HAVE_EPOLL
public:
AsyncNameResolverEntry(const SharedHandle<AsyncNameResolver>& nameResolver,
Command* command);
bool operator==(const AsyncNameResolverEntry& entry);
#ifdef HAVE_EPOLL
void addSocketEvents(DownloadEngine* e);
void removeSocketEvents(DownloadEngine* e);
#else // !HAVE_EPOLL
int getFds(fd_set* rfdsPtr, fd_set* wfdsPtr);
void process(fd_set* rfdsPtr, fd_set* wfdsPtr);
#endif // !HAVE_EPOLL
};
typedef std::deque<AsyncNameResolverEntry> AsyncNameResolverEntries;
#endif // ENABLE_ASYNC_DNS
class DownloadEngine {
private:
void waitData();
SocketEntries socketEntries;
std::deque<SharedHandle<SocketEntry> > socketEntries;
#ifdef ENABLE_ASYNC_DNS
AsyncNameResolverEntries nameResolverEntries;
std::deque<SharedHandle<AsyncNameResolverEntry> > nameResolverEntries;
#endif // ENABLE_ASYNC_DNS
#ifdef HAVE_EPOLL
int _epfd;
static const size_t EPOLL_EVENTS_MAX = 1024;
#else // !HAVE_EPOLL
// If epoll is not available, then use select system call.
fd_set rfdset;
fd_set wfdset;
int fdmax;
#endif // !HAVE_EPOLL
Logger* logger;
SharedHandle<StatCalc> _statCalc;
@ -130,8 +276,6 @@ private:
std::multimap<std::string, SocketPoolEntry> _socketPool;
void shortSleep() const;
bool addSocket(const SocketEntry& socketEntry);
bool deleteSocket(const SocketEntry& socketEntry);
/**
* Delegates to StatCalc
@ -162,8 +306,12 @@ public:
void cleanQueue();
#ifndef HAVE_EPOLL
void updateFdSet();
#endif // !HAVE_EPOLL
bool addSocketForReadCheck(const SharedHandle<SocketCore>& socket,
Command* command);
bool deleteSocketForReadCheck(const SharedHandle<SocketCore>& socket,
@ -172,7 +320,23 @@ public:
Command* command);
bool deleteSocketForWriteCheck(const SharedHandle<SocketCore>& socket,
Command* command);
bool addSocketEvents(int socket, Command* command, int events
#if defined HAVE_EPOLL && defined ENABLE_ASYNC_DNS
,const SharedHandle<AsyncNameResolver>& rs =
SharedHandle<AsyncNameResolver>()
#endif // HAVE_EPOLL && ENABLE_ASYNC_DNS
);
bool deleteSocketEvents(int socket, Command* command, int events
#if defined HAVE_EPOLL && defined ENABLE_ASYNC_DNS
,const SharedHandle<AsyncNameResolver>& rs =
SharedHandle<AsyncNameResolver>()
#endif // HAVE_EPOLL && ENABLE_ASYNC_DNS
);
#ifdef ENABLE_ASYNC_DNS
bool addNameResolverCheck(const SharedHandle<AsyncNameResolver>& resolver,
Command* command);
bool deleteNameResolverCheck(const SharedHandle<AsyncNameResolver>& resolver,

View File

@ -74,8 +74,8 @@ bool PeerAbstractCommand::execute() {
uploadLimitCheck && (uploadLimit == 0 ||
e->getUploadSpeed() <= uploadLimit*1024) ||
*/
(checkSocketIsReadable && readCheckTarget->isReadable(0)) ||
(checkSocketIsWritable && writeCheckTarget->isWritable(0))) {
(checkSocketIsReadable && _readEvent) ||
(checkSocketIsWritable && _writeEvent)) {
checkPoint.reset();
}
if(checkPoint.elapsed(timeout)) {

View File

@ -62,7 +62,7 @@
namespace aria2 {
SocketCore::SocketCore(int sockType):_sockType(sockType), sockfd(-1) {
SocketCore::SocketCore(int sockType):_sockType(sockType), sockfd(-1) {
init();
}
@ -72,6 +72,13 @@ SocketCore::SocketCore(int sockfd, int sockType):_sockType(sockType), sockfd(soc
void SocketCore::init()
{
#ifdef HAVE_EPOLL
_epfd = -1;
#endif // HAVE_EPOLL
blocking = true;
secure = false;
#ifdef HAVE_LIBSSL
@ -90,6 +97,15 @@ void SocketCore::init()
SocketCore::~SocketCore() {
closeConnection();
#ifdef HAVE_EPOLL
if(_epfd != -1) {
CLOSE(_epfd);
}
#endif // HAVE_EPOLL
#ifdef HAVE_LIBGNUTLS
delete [] peekBuf;
#endif // HAVE_LIBGNUTLS
@ -304,8 +320,47 @@ void SocketCore::closeConnection()
#endif // HAVE_LIBGNUTLS
}
bool SocketCore::isWritable(time_t timeout) const
#ifdef HAVE_EPOLL
void SocketCore::initEPOLL()
{
if((_epfd = epoll_create(1)) == -1) {
throw new DlRetryEx(StringFormat("epoll_create failed:%s", errorMsg()).str());
}
memset(&_epEvent, 0, sizeof(struct epoll_event));
_epEvent.events = EPOLLIN|EPOLLOUT;
_epEvent.data.fd = sockfd;
if(epoll_ctl(_epfd, EPOLL_CTL_ADD, sockfd, &_epEvent) == -1) {
throw DlRetryEx(StringFormat("epoll_ctl failed:%s", errorMsg()).str());
}
}
#endif // HAVE_EPOLL
bool SocketCore::isWritable(time_t timeout)
{
#ifdef HAVE_EPOLL
if(_epfd == -1) {
initEPOLL();
}
struct epoll_event epEvents[1];
int r;
while((r = epoll_wait(_epfd, epEvents, 1, 0)) == -1 && errno == EINTR);
if(r > 0) {
return epEvents[0].events&(EPOLLOUT|EPOLLHUP|EPOLLERR);
} else if(r == 0) {
return false;
} else {
throw DlRetryEx(StringFormat(EX_SOCKET_CHECK_WRITABLE, errorMsg()).str());
}
#else // !HAVE_EPOLL
fd_set fds;
FD_ZERO(&fds);
FD_SET(sockfd, &fds);
@ -327,15 +382,38 @@ bool SocketCore::isWritable(time_t timeout) const
throw DlRetryEx(StringFormat(EX_SOCKET_CHECK_WRITABLE, errorMsg()).str());
}
}
#endif // !HAVE_EPOLL
}
bool SocketCore::isReadable(time_t timeout) const
bool SocketCore::isReadable(time_t timeout)
{
#ifdef HAVE_LIBGNUTLS
if(secure && peekBufLength > 0) {
return true;
}
#endif // HAVE_LIBGNUTLS
#ifdef HAVE_EPOLL
if(_epfd == -1) {
initEPOLL();
}
struct epoll_event epEvents[1];
int r;
while((r = epoll_wait(_epfd, epEvents, 1, 0)) == -1 && errno == EINTR);
if(r > 0) {
return epEvents[0].events&(EPOLLIN|EPOLLHUP|EPOLLERR);
} else if(r == 0) {
return false;
} else {
throw DlRetryEx(StringFormat(EX_SOCKET_CHECK_READABLE, errorMsg()).str());
}
#else // !HAVE_EPOLL
fd_set fds;
FD_ZERO(&fds);
FD_SET(sockfd, &fds);
@ -357,6 +435,9 @@ bool SocketCore::isReadable(time_t timeout) const
throw DlRetryEx(StringFormat(EX_SOCKET_CHECK_READABLE, errorMsg()).str());
}
}
#endif // !HAVE_EPOLL
}
void SocketCore::writeData(const char* data, size_t len)

View File

@ -36,6 +36,7 @@
#define _D_SOCKET_CORE_H_
#include "common.h"
#include "a2io.h"
#include "a2netcompat.h"
#include "a2time.h"
#include <cstdlib>
@ -50,6 +51,9 @@
#ifdef HAVE_LIBGNUTLS
# include <gnutls/gnutls.h>
#endif // HAVE_LIBGNUTLS
#ifdef HAVE_EPOLL
# include <sys/epoll.h>
#endif // HAVE_EPOLL
namespace aria2 {
@ -62,6 +66,16 @@ private:
int _sockType;
// socket endpoint descriptor
int sockfd;
#ifdef HAVE_EPOLL
// file descriptor used for epoll
int _epfd;
struct epoll_event _epEvent;
#endif // HAVE_EPOLL
bool blocking;
bool secure;
#ifdef HAVE_LIBSSL
@ -83,6 +97,13 @@ private:
#endif // HAVE_LIBGNUTLS
void init();
#ifdef HAVE_EPOLL
void initEPOLL();
#endif // HAVE_EPOLL
SocketCore(int sockfd, int sockType);
static int error();
static const char *errorMsg();
@ -156,7 +177,7 @@ public:
* @return true if the socket is available for writing,
* otherwise returns false.
*/
bool isWritable(time_t timeout) const;
bool isWritable(time_t timeout);
/**
* Checks whether this socket is available for reading.
@ -165,7 +186,7 @@ public:
* @return true if the socket is available for reading,
* otherwise returns false.
*/
bool isReadable(time_t timeout) const;
bool isReadable(time_t timeout);
/**
* Writes characters into this socket. data is a pointer pointing the first

View File

@ -116,11 +116,16 @@
# define a2mkdir(path, openMode) mkdir(path, openMode)
#endif // __MINGW32__
#if defined HAVE_POSIX_MEMALIGN && O_DIRECT
#if defined HAVE_POSIX_MEMALIGN && defined O_DIRECT
# define ENABLE_DIRECT_IO 1
#endif // HAVE_POSIX_MEMALIGN && O_DIRECT
#define OPEN_MODE S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH|S_IWOTH
#define DIR_OPEN_MODE S_IRWXU|S_IRWXG|S_IRWXO
#if defined HAVE_EPOLL_CREATE && \
(defined HAVE_LIBCARES || !defined ENABLE_ASYNC_DNS)
# define HAVE_EPOLL 1
#endif // HAVE_EPOLL_CREATE || (HAVE_LIBCARES || !ENABLE_ASYNC_DNS)
#endif // _D_A2IO_H_