diff --git a/ChangeLog b/ChangeLog index 0b697a29..2fc8f071 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,3 +1,32 @@ +2009-01-16 Tatsuhiro Tsujikawa + + Added --event-poll option to select the method for polling events. + The available methods are "select" and "epoll". "epoll" is + available on Linux only. The earlier release has the compile + option to choose from these methods. + * configure.ac + * src/RequestGroupMan.cc + * src/OptionHandlerFactory.cc + * src/EpollEventPoll.h + * src/StreamFileAllocationEntry.cc + * src/a2io.h + * src/SelectEventPoll.h + * src/option_processing.cc + * src/prefs.h + * src/EpollEventPoll.cc + * src/EventPoll.h + * src/SocketCore.h + * src/Makefile.am + * src/main.cc + * src/DownloadEngine.h + * src/SelectEventPoll.cc + * src/DownloadEngine.cc + * src/SocketCore.cc + * src/DownloadEngineFactory.cc + * src/Makefile.in + * src/prefs.cc + * src/usage_text.h + 2009-01-12 Tatsuhiro Tsujikawa Included RecoverableException.h from main.cc diff --git a/config.h.in b/config.h.in index 305a80e1..31a44953 100644 --- a/config.h.in +++ b/config.h.in @@ -105,6 +105,9 @@ /* Define to 1 if you don't have `vprintf' but do have `_doprnt.' */ #undef HAVE_DOPRNT +/* Define to 1 if epoll is available. */ +#undef HAVE_EPOLL + /* Define to 1 if you have the `epoll_create' function. */ #undef HAVE_EPOLL_CREATE diff --git a/configure b/configure index 7399cf6c..bdaa1791 100755 --- a/configure +++ b/configure @@ -811,6 +811,8 @@ LIBINTL LTLIBINTL POSUB LIBOBJS +HAVE_EPOLL_TRUE +HAVE_EPOLL_FALSE HAVE_ASCTIME_R_TRUE HAVE_ASCTIME_R_FALSE HAVE_BASENAME_TRUE @@ -20968,10 +20970,25 @@ if test `eval echo '${'$as_ac_var'}'` = yes; then cat >>confdefs.h <<_ACEOF #define `echo "HAVE_$ac_func" | $as_tr_cpp` 1 _ACEOF - + have_epoll=yes fi done + if test "x$have_epoll" = "xyes"; then + +cat >>confdefs.h <<\_ACEOF +#define HAVE_EPOLL 1 +_ACEOF + + fi + if test "x$have_epoll" = "xyes"; then + HAVE_EPOLL_TRUE= + HAVE_EPOLL_FALSE='#' +else + HAVE_EPOLL_TRUE='#' + HAVE_EPOLL_FALSE= +fi + fi @@ -22217,6 +22234,13 @@ echo "$as_me: error: conditional \"HAVE_SQLITE3\" was never defined. Usually this means the macro was only invoked conditionally." >&2;} { (exit 1); exit 1; }; } fi +if test -z "${HAVE_EPOLL_TRUE}" && test -z "${HAVE_EPOLL_FALSE}"; then + { { echo "$as_me:$LINENO: error: conditional \"HAVE_EPOLL\" was never defined. +Usually this means the macro was only invoked conditionally." >&5 +echo "$as_me: error: conditional \"HAVE_EPOLL\" was never defined. +Usually this means the macro was only invoked conditionally." >&2;} + { (exit 1); exit 1; }; } +fi if test -z "${HAVE_ASCTIME_R_TRUE}" && test -z "${HAVE_ASCTIME_R_FALSE}"; then { { echo "$as_me:$LINENO: error: conditional \"HAVE_ASCTIME_R\" was never defined. Usually this means the macro was only invoked conditionally." >&5 @@ -23138,7 +23162,7 @@ LIBINTL!$LIBINTL$ac_delim LTLIBINTL!$LTLIBINTL$ac_delim POSUB!$POSUB$ac_delim LIBOBJS!$LIBOBJS$ac_delim -HAVE_ASCTIME_R_TRUE!$HAVE_ASCTIME_R_TRUE$ac_delim +HAVE_EPOLL_TRUE!$HAVE_EPOLL_TRUE$ac_delim _ACEOF if test `sed -n "s/.*$ac_delim\$/X/p" conf$$subs.sed | grep -c X` = 97; then @@ -23180,6 +23204,8 @@ _ACEOF ac_delim='%!_!# ' for ac_last_try in false false false false false :; do cat >conf$$subs.sed <<_ACEOF +HAVE_EPOLL_FALSE!$HAVE_EPOLL_FALSE$ac_delim +HAVE_ASCTIME_R_TRUE!$HAVE_ASCTIME_R_TRUE$ac_delim HAVE_ASCTIME_R_FALSE!$HAVE_ASCTIME_R_FALSE$ac_delim HAVE_BASENAME_TRUE!$HAVE_BASENAME_TRUE$ac_delim HAVE_BASENAME_FALSE!$HAVE_BASENAME_FALSE$ac_delim @@ -23200,7 +23226,7 @@ HAVE_TIMEGM_FALSE!$HAVE_TIMEGM_FALSE$ac_delim LTLIBOBJS!$LTLIBOBJS$ac_delim _ACEOF - if test `sed -n "s/.*$ac_delim\$/X/p" conf$$subs.sed | grep -c X` = 18; then + if test `sed -n "s/.*$ac_delim\$/X/p" conf$$subs.sed | grep -c X` = 20; then break elif $ac_last_try; then { { echo "$as_me:$LINENO: error: could not make $CONFIG_STATUS" >&5 @@ -23929,5 +23955,6 @@ echo "LibXML2: $have_libxml2" echo "LibExpat: $have_libexpat" echo "LibCares: $have_libcares" echo "Libz: $have_libz" +echo "Epoll: $have_epoll" echo "Bittorrent: $enable_bittorrent" echo "Metalink: $enable_metalink" diff --git a/configure.ac b/configure.ac index e5088b9e..a4598b31 100644 --- a/configure.ac +++ b/configure.ac @@ -285,7 +285,11 @@ AC_CHECK_FUNCS([__argz_count \ usleep]) if test "x$enable_epoll" = "xyes"; then - AC_CHECK_FUNCS([epoll_create]) + AC_CHECK_FUNCS([epoll_create], [have_epoll=yes]) + if test "x$have_epoll" = "xyes"; then + AC_DEFINE([HAVE_EPOLL], [1], [Define to 1 if epoll is available.]) + fi + AM_CONDITIONAL([HAVE_EPOLL], [test "x$have_epoll" = "xyes"]) fi AC_CHECK_FUNCS([asctime_r], @@ -351,5 +355,6 @@ echo "LibXML2: $have_libxml2" echo "LibExpat: $have_libexpat" echo "LibCares: $have_libcares" echo "Libz: $have_libz" +echo "Epoll: $have_epoll" echo "Bittorrent: $enable_bittorrent" echo "Metalink: $enable_metalink" diff --git a/src/DownloadEngine.cc b/src/DownloadEngine.cc index 0dfc928f..1cb4c0f5 100644 --- a/src/DownloadEngine.cc +++ b/src/DownloadEngine.cc @@ -41,9 +41,6 @@ #include #include -#ifdef ENABLE_ASYNC_DNS -#include "AsyncNameResolver.h" -#endif // ENABLE_ASYNC_DNS #include "StatCalc.h" #include "RequestGroup.h" #include "RequestGroupMan.h" @@ -66,6 +63,8 @@ #include "AuthConfigFactory.h" #include "AuthConfig.h" #include "Request.h" +#include "EventPoll.h" +#include "Command.h" #include "BtRegistry.h" #include "BtContext.h" @@ -84,371 +83,19 @@ namespace aria2 { // 4 ... 2nd stop signal processed by DownloadEngine volatile sig_atomic_t globalHaltRequested = 0; - -CommandEvent::CommandEvent(Command* command, int events): - _command(command), _events(events) {} - -bool CommandEvent::operator==(const CommandEvent& commandEvent) const -{ - return _command == commandEvent._command; -} - -int CommandEvent::getEvents() const -{ - return _events; -} - -void CommandEvent::addEvents(int events) -{ - _events |= events; -} - -void CommandEvent::removeEvents(int events) -{ - _events &= (~events); -} - -bool CommandEvent::eventsEmpty() const -{ - return _events == 0; -} - -void CommandEvent::processEvents(int events) -{ - if((_events&events) || - ((SocketEntry::EVENT_ERROR|SocketEntry::EVENT_HUP)&events)) { - _command->setStatusActive(); - } - if(SocketEntry::EVENT_READ&events) { - _command->readEventReceived(); - } - if(SocketEntry::EVENT_WRITE&events) { - _command->writeEventReceived(); - } - if(SocketEntry::EVENT_ERROR&events) { - _command->errorEventReceived(); - } - if(SocketEntry::EVENT_HUP&events) { - _command->hupEventReceived(); - } -} - -#if defined HAVE_EPOLL && defined ENABLE_ASYNC_DNS - -ADNSEvent::ADNSEvent(const SharedHandle& resolver, - Command* command, - sock_t socket, int events): - _resolver(resolver), _command(command), _socket(socket), _events(events) {} - -bool ADNSEvent::operator==(const ADNSEvent& event) const -{ - return _resolver == event._resolver; -} - -int ADNSEvent::getEvents() const -{ - return _events; -} - -void ADNSEvent::processEvents(int events) -{ - ares_socket_t readfd; - ares_socket_t writefd; - if(events&(SocketEntry::EVENT_READ|SocketEntry::EVENT_ERROR|SocketEntry::EVENT_HUP)) { - readfd = _socket; - } else { - readfd = ARES_SOCKET_BAD; - } - if(events&(SocketEntry::EVENT_WRITE|SocketEntry::EVENT_ERROR|SocketEntry::EVENT_HUP)) { - writefd = _socket; - } else { - writefd = ARES_SOCKET_BAD; - } - _resolver->process(readfd, writefd); - _command->setStatusActive(); -} - -#endif // HAVE_EPOLL && ENABLE_ASYNC_DNS - -SocketEntry::SocketEntry(sock_t socket):_socket(socket) -{ -#ifdef HAVE_EPOLL - memset(&_epEvent, 0, sizeof(struct epoll_event)); -#endif // HAVE_EPOLL -} - -bool SocketEntry::operator==(const SocketEntry& entry) const -{ - return _socket == entry._socket; -} - -bool SocketEntry::operator<(const SocketEntry& entry) const -{ - return _socket < entry._socket; -} - -void SocketEntry::addCommandEvent(Command* command, int events) -{ - CommandEvent cev(command, events); - std::deque::iterator i = std::find(_commandEvents.begin(), - _commandEvents.end(), - cev); - if(i == _commandEvents.end()) { - _commandEvents.push_back(cev); - } else { - (*i).addEvents(events); - } -} - -void SocketEntry::removeCommandEvent(Command* command, int events) -{ - CommandEvent cev(command, events); - std::deque::iterator i = std::find(_commandEvents.begin(), - _commandEvents.end(), - cev); - if(i == _commandEvents.end()) { - // not found - } else { - (*i).removeEvents(events); - if((*i).eventsEmpty()) { - _commandEvents.erase(i); - } - } -} - -#if defined HAVE_EPOLL && defined ENABLE_ASYNC_DNS - -void SocketEntry::addADNSEvent(const SharedHandle& resolver, - Command* command, int events) -{ - ADNSEvent aev(resolver, command, _socket, events); - std::deque::iterator i = std::find(_adnsEvents.begin(), - _adnsEvents.end(), - aev); - if(i == _adnsEvents.end()) { - _adnsEvents.push_back(aev); - } -} - -void SocketEntry::removeADNSEvent(const SharedHandle& resolver, - Command* command) -{ - ADNSEvent aev(resolver, command, _socket, 0); - std::deque::iterator i = std::find(_adnsEvents.begin(), - _adnsEvents.end(), - aev); - if(i == _adnsEvents.end()) { - // not found - } else { - _adnsEvents.erase(i); - } -} - -#endif // HAVE_EPOLL && ENABLE_ASYNC_DNS - -void SocketEntry::processEvents(int events) -{ - std::for_each(_commandEvents.begin(), _commandEvents.end(), - std::bind2nd(std::mem_fun_ref(&CommandEvent::processEvents), - events)); - -#if defined HAVE_EPOLL && defined ENABLE_ASYNC_DNS - - std::for_each(_adnsEvents.begin(), _adnsEvents.end(), - std::bind2nd(std::mem_fun_ref(&ADNSEvent::processEvents), - events)); - -#endif // HAVE_EPOLL && ENABLE_ASYNC_DNS - -} - -sock_t SocketEntry::getSocket() const -{ - return _socket; -} - -bool SocketEntry::eventEmpty() const -{ - -#if defined HAVE_EPOLL && defined ENABLE_ASYNC_DNS - - return _commandEvents.empty() && _adnsEvents.empty(); - -#else // !(HAVE_EPOLL && ENABLE_ASYNC_DNS) - - return _commandEvents.empty(); - -#endif // !(HAVE_EPOLL && ENABLE_ASYNC_DNS) - -} - -class AccEvent { -public: - int operator()(int events, const CommandEvent& commandEvent) const - { - return events|commandEvent.getEvents(); - } - -#if defined HAVE_EPOLL && defined ENABLE_ASYNC_DNS - - int operator()(int events, const ADNSEvent& adnsEvent) const - { - return events|adnsEvent.getEvents(); - } - -#endif // HAVE_EPOLL && ENABLE_ASYNC_DNS - -}; - -#ifdef HAVE_EPOLL - -struct epoll_event& SocketEntry::getEpEvent() -{ - _epEvent.data.ptr = this; - -#ifdef ENABLE_ASYNC_DNS - - _epEvent.events = - std::accumulate(_adnsEvents.begin(), - _adnsEvents.end(), - std::accumulate(_commandEvents.begin(), - _commandEvents.end(), 0, AccEvent()), - AccEvent()); - -#else // !ENABLE_ASYNC_DNS - - _epEvent.events = - std::accumulate(_commandEvents.begin(), _commandEvents.end(), 0, AccEvent()); - -#endif // !ENABLE_ASYNC_DNS - - return _epEvent; -} - -#else // !HAVE_EPOLL - -int SocketEntry::getEvents() -{ - return - std::accumulate(_commandEvents.begin(), _commandEvents.end(), 0, AccEvent()); -} - -#endif // !HAVE_EPOLL - - -#ifdef ENABLE_ASYNC_DNS - -AsyncNameResolverEntry::AsyncNameResolverEntry -(const SharedHandle& nameResolver, Command* command): - _nameResolver(nameResolver), _command(command) -#ifdef HAVE_EPOLL - , _socketsSize(0) -#endif // HAVE_EPOLL +DownloadEngine::DownloadEngine(const SharedHandle& eventPoll): + _eventPoll(eventPoll), + logger(LogFactory::getInstance()), + _haltRequested(false), + _noWait(false), + _refreshInterval(DEFAULT_REFRESH_INTERVAL), + _cookieStorage(new CookieStorage()), + _btRegistry(new BtRegistry()), + _dnsCache(new SimpleDNSCache()) {} -bool AsyncNameResolverEntry::operator==(const AsyncNameResolverEntry& entry) -{ - return _nameResolver == entry._nameResolver && - _command == entry._command; -} - -#ifdef HAVE_EPOLL - -void AsyncNameResolverEntry::addSocketEvents(DownloadEngine* e) -{ - _socketsSize = 0; - int mask = _nameResolver->getsock(_sockets); - if(mask == 0) { - return; - } - size_t i; - for(i = 0; i < ARES_GETSOCK_MAXNUM; ++i) { - //epoll_event_t* epEventPtr = &_epEvents[_socketsSize]; - int events = 0; - if(ARES_GETSOCK_READABLE(mask, i)) { - events |= EPOLLIN; - } - if(ARES_GETSOCK_WRITABLE(mask, i)) { - events |= EPOLLOUT; - } - if(events == 0) { - // assume no further sockets are returned. - break; - } - e->addSocketEvents(_sockets[i], _command, events, _nameResolver); - } - _socketsSize = i; -} - -void AsyncNameResolverEntry::removeSocketEvents(DownloadEngine* e) -{ - for(size_t i = 0; i < _socketsSize; ++i) { - e->deleteSocketEvents(_sockets[i], _command, 0, _nameResolver); - } -} - -#else // !HAVE_EPOLL - -int AsyncNameResolverEntry::getFds(fd_set* rfdsPtr, fd_set* wfdsPtr) -{ - return _nameResolver->getFds(rfdsPtr, wfdsPtr); -} - -void AsyncNameResolverEntry::process(fd_set* rfdsPtr, fd_set* wfdsPtr) -{ - _nameResolver->process(rfdsPtr, wfdsPtr); - switch(_nameResolver->getStatus()) { - case AsyncNameResolver::STATUS_SUCCESS: - case AsyncNameResolver::STATUS_ERROR: - _command->setStatusActive(); - break; - default: - break; - } -} - -#endif // !HAVE_EPOLL - -#endif // ENABLE_ASYNC_DNS - -DownloadEngine::DownloadEngine():logger(LogFactory::getInstance()), - _haltRequested(false), - _noWait(false), - _refreshInterval(DEFAULT_REFRESH_INTERVAL), - _cookieStorage(new CookieStorage()), - _btRegistry(new BtRegistry()), - _dnsCache(new SimpleDNSCache()) -{ -#ifdef HAVE_EPOLL - - _epfd = epoll_create(EPOLL_EVENTS_MAX); - - _epEvents = new struct epoll_event[EPOLL_EVENTS_MAX]; - -#else // !HAVE_EPOLL - - updateFdSet(); - -#endif // !HAVE_EPOLL -} - DownloadEngine::~DownloadEngine() { cleanQueue(); - -#ifdef HAVE_EPOLL - - if(_epfd != -1) { - int r; - while((r = close(_epfd)) == -1 && errno == EINTR); - if(r == -1) { - logger->error("Error occurred while closing epoll file descriptor %d: %s", - _epfd, strerror(errno)); - } - } - - delete [] _epEvents; - -#endif // HAVE_EPOLL } void DownloadEngine::cleanQueue() { @@ -478,16 +125,8 @@ static void executeCommand(std::deque& commands, } } -void DownloadEngine::run() { - -#ifdef HAVE_EPOLL - - if(_epfd == -1) { - throw DlAbortEx("epoll_init() failed."); - } - -#endif // HAVE_EPOLL - +void DownloadEngine::run() +{ Time cp; cp.setTimeInSec(0); while(!commands.empty() || !_routineCommands.empty()) { @@ -509,295 +148,44 @@ void DownloadEngine::run() { onEndOfRun(); } -void DownloadEngine::shortSleep() const { - struct timeval tv; - tv.tv_sec = 0; - tv.tv_usec = 1000; - fd_set rfds; - FD_ZERO(&rfds); - select(0, &rfds, NULL, NULL, &tv); -} - void DownloadEngine::waitData() { -#ifdef HAVE_EPOLL - - // timeout is millisec - int timeout = _noWait ? 0 : 1000; - - int res; - while((res = epoll_wait(_epfd, _epEvents, EPOLL_EVENTS_MAX, timeout)) == -1 && - errno == EINTR); - - if(res > 0) { - for(int i = 0; i < res; ++i) { - SocketEntry* p = (SocketEntry*)_epEvents[i].data.ptr; - p->processEvents(_epEvents[i].events); - } - } - - // TODO timeout of name resolver is determined in Command(AbstractCommand, - // DHTEntryPoint...Command) - -#else // !HAVE_EPOLL - - fd_set rfds; - fd_set wfds; struct timeval tv; - - memcpy(&rfds, &rfdset, sizeof(fd_set)); - memcpy(&wfds, &wfdset, sizeof(fd_set)); - -#ifdef ENABLE_ASYNC_DNS - - for(std::deque >::iterator itr = - nameResolverEntries.begin(); itr != nameResolverEntries.end(); ++itr) { - SharedHandle& entry = *itr; - int fd = entry->getFds(&rfds, &wfds); - // TODO force error if fd == 0 - if(fdmax < fd) { - fdmax = fd; - } - } - -#endif // ENABLE_ASYNC_DNS - - tv.tv_sec = _noWait ? 0 : 1; - tv.tv_usec = 0; - int retval = select(fdmax+1, &rfds, &wfds, NULL, &tv); - if(retval > 0) { - for(std::deque >::iterator i = - socketEntries.begin(); i != socketEntries.end(); ++i) { - int events = 0; - if(FD_ISSET((*i)->getSocket(), &rfds)) { - events |= SocketEntry::EVENT_READ; - } - if(FD_ISSET((*i)->getSocket(), &wfds)) { - events |= SocketEntry::EVENT_WRITE; - } - (*i)->processEvents(events); - } - } - -#ifdef ENABLE_ASYNC_DNS - - for(std::deque >::iterator i = - nameResolverEntries.begin(); i != nameResolverEntries.end(); ++i) { - (*i)->process(&rfds, &wfds); - } - -#endif // ENABLE_ASYNC_DNS - -#endif // !HAVE_EPOLL -} - -#ifndef HAVE_EPOLL - -void DownloadEngine::updateFdSet() { - fdmax = 0; - FD_ZERO(&rfdset); - FD_ZERO(&wfdset); - for(std::deque >::iterator i = - socketEntries.begin(); i != socketEntries.end(); ++i) { - sock_t fd = (*i)->getSocket(); - int events = (*i)->getEvents(); - if(events&SocketEntry::EVENT_READ) { - FD_SET(fd, &rfdset); - } - if(events&SocketEntry::EVENT_WRITE) { - FD_SET(fd, &wfdset); - } - if(fdmax < fd) { - fdmax = fd; - } - } -} - -#endif // !HAVE_EPOLL - -bool DownloadEngine::addSocketEvents(sock_t socket, Command* command, int events -#if defined HAVE_EPOLL && defined ENABLE_ASYNC_DNS - ,const SharedHandle& rs -#endif // HAVE_EPOLL && ENABLE_ASYNC_DNS - ) -{ - SharedHandle socketEntry(new SocketEntry(socket)); - std::deque >::iterator i = - std::lower_bound(socketEntries.begin(), socketEntries.end(), socketEntry); - int r = 0; - if(i != socketEntries.end() && (*i) == socketEntry) { - -#ifdef HAVE_EPOLL - -#ifdef ENABLE_ASYNC_DNS - - if(rs.isNull()) { - (*i)->addCommandEvent(command, events); - } else { - (*i)->addADNSEvent(rs, command, events); - } - -#else // !ENABLE_ASYNC_DNS - - (*i)->addCommandEvent(command, events); - -#endif // !ENABLE_ASYNC_DNS - - r = epoll_ctl(_epfd, EPOLL_CTL_MOD, (*i)->getSocket(), &(*i)->getEpEvent()); - if(r == -1) { - // try EPOLL_CTL_ADD: There is a chance that previously socket X is - // added to epoll, but it is closed and is not yet removed from - // SocketEntries. In this case, EPOLL_CTL_MOD is failed with ENOENT. - - r = epoll_ctl(_epfd, EPOLL_CTL_ADD, (*i)->getSocket(), &(*i)->getEpEvent()); - } - -#else // !HAVE_EPOLL - - (*i)->addCommandEvent(command, events); - -#endif // !HAVE_EPOLL - + if(_noWait) { + tv.tv_sec = tv.tv_usec = 0; } else { - socketEntries.insert(i, socketEntry); - -#ifdef HAVE_EPOLL - -#ifdef ENABLE_ASYNC_DNS - - if(rs.isNull()) { - socketEntry->addCommandEvent(command, events); - } else { - socketEntry->addADNSEvent(rs, command, events); - } - -#else // !ENABLE_ASYNC_DNS - - socketEntry->addCommandEvent(command, events); - -#endif // !ENABLE_ASYNC_DNS - - r = epoll_ctl(_epfd, EPOLL_CTL_ADD, socketEntry->getSocket(), &socketEntry->getEpEvent()); - -#else // !HAVE_EPOLL - - socketEntry->addCommandEvent(command, events); - -#endif // !HAVE_EPOLL - - } - -#ifndef HAVE_EPOLL - - updateFdSet(); - -#endif // !HAVE_EPOLL - - if(r == -1) { - logger->debug("Failed to add socket event %d:%s", socket, strerror(errno)); - return false; - } else { - return true; - } -} - -bool DownloadEngine::deleteSocketEvents(sock_t socket, Command* command, int events -#if defined HAVE_EPOLL && defined ENABLE_ASYNC_DNS - ,const SharedHandle& rs -#endif // HAVE_EPOLL && ENABLE_ASYNC_DNS - ) -{ - SharedHandle socketEntry(new SocketEntry(socket)); - std::deque >::iterator i = - std::lower_bound(socketEntries.begin(), socketEntries.end(), socketEntry); - if(i != socketEntries.end() && (*i) == socketEntry) { - -#ifdef HAVE_EPOLL - -#ifdef ENABLE_ASYNC_DNS - - if(rs.isNull()) { - (*i)->removeCommandEvent(command, events); - } else { - (*i)->removeADNSEvent(rs, command); - } - -#else // !ENABLE_ASYNC_DNS - - (*i)->removeCommandEvent(command, events); - -#endif // !ENABLE_ASYNC_DNS - -#else // !HAVE_EPOLL - - (*i)->removeCommandEvent(command, events); - -#endif // !HAVE_EPOLL - - int r = 0; - if((*i)->eventEmpty()) { - -#ifdef HAVE_EPOLL - - r = epoll_ctl(_epfd, EPOLL_CTL_DEL, (*i)->getSocket(), 0); - -#endif // HAVE_EPOLL - - socketEntries.erase(i); - } else { - -#ifdef HAVE_EPOLL - - // If socket is closed, then it seems it is automatically removed from - // epoll, so following EPOLL_CTL_MOD may fail. - r = epoll_ctl(_epfd, EPOLL_CTL_MOD, (*i)->getSocket(), &(*i)->getEpEvent()); - if(r == -1) { - logger->debug("Failed to delete socket event, but may be ignored:%s", strerror(errno)); - } -#endif // HAVE_EPOLL - - } - -#ifndef HAVE_EPOLL - - updateFdSet(); - -#endif // !HAVE_EPOLL - - if(r == -1) { - logger->debug("Failed to delete socket event:%s", strerror(errno)); - return false; - } else { - return true; - } - } else { - logger->debug("Socket %d is not found in SocketEntries.", socket); - return false; + tv.tv_sec = 1; + tv.tv_usec = 0; } + _eventPoll->poll(tv); } bool DownloadEngine::addSocketForReadCheck(const SocketHandle& socket, Command* command) { - return addSocketEvents(socket->getSockfd(), command, SocketEntry::EVENT_READ); + return _eventPoll->addEvents(socket->getSockfd(), command, + EventPoll::EVENT_READ); } bool DownloadEngine::deleteSocketForReadCheck(const SocketHandle& socket, Command* command) { - return deleteSocketEvents(socket->getSockfd(), command, SocketEntry::EVENT_READ); + return _eventPoll->deleteEvents(socket->getSockfd(), command, + EventPoll::EVENT_READ); } bool DownloadEngine::addSocketForWriteCheck(const SocketHandle& socket, Command* command) { - return addSocketEvents(socket->getSockfd(), command, SocketEntry::EVENT_WRITE); + return _eventPoll->addEvents(socket->getSockfd(), command, + EventPoll::EVENT_WRITE); } bool DownloadEngine::deleteSocketForWriteCheck(const SocketHandle& socket, Command* command) { - return deleteSocketEvents(socket->getSockfd(), command, SocketEntry::EVENT_WRITE); + return _eventPoll->deleteEvents(socket->getSockfd(), command, + EventPoll::EVENT_WRITE); } void DownloadEngine::calculateStatistics() @@ -858,45 +246,13 @@ void DownloadEngine::addCommand(const Commands& commands) bool DownloadEngine::addNameResolverCheck (const SharedHandle& resolver, Command* command) { - SharedHandle entry - (new AsyncNameResolverEntry(resolver, command)); - std::deque >::iterator itr = - std::find(nameResolverEntries.begin(), nameResolverEntries.end(), entry); - if(itr == nameResolverEntries.end()) { - nameResolverEntries.push_back(entry); - -#ifdef HAVE_EPOLL - - entry->addSocketEvents(this); - -#endif // HAVE_EPOLL - - return true; - } else { - return false; - } + return _eventPoll->addNameResolver(resolver, command); } bool DownloadEngine::deleteNameResolverCheck (const SharedHandle& resolver, Command* command) { - SharedHandle entry - (new AsyncNameResolverEntry(resolver, command)); - std::deque >::iterator itr = - std::find(nameResolverEntries.begin(), nameResolverEntries.end(), entry); - if(itr == nameResolverEntries.end()) { - return false; - } else { - -#ifdef HAVE_EPOLL - - (*itr)->removeSocketEvents(this); - -#endif // HAVE_EPOLL - - nameResolverEntries.erase(itr); - return true; - } + return _eventPoll->deleteNameResolver(resolver, command); } #endif // ENABLE_ASYNC_DNS diff --git a/src/DownloadEngine.h b/src/DownloadEngine.h index 81d6c050..e8ca3452 100644 --- a/src/DownloadEngine.h +++ b/src/DownloadEngine.h @@ -37,16 +37,11 @@ #include "common.h" -#ifdef HAVE_EPOLL_CREATE -# include -#endif // HAVE_EPOLL_CREATE - #include #include #include #include "SharedHandle.h" -#include "Command.h" #include "a2netcompat.h" #include "TimeA2.h" #include "a2io.h" @@ -69,196 +64,14 @@ class BtRegistry; class DNSCache; class AuthConfigFactory; class Request; - -class CommandEvent -{ -private: - Command* _command; - int _events; -public: - CommandEvent(Command* command, int events); - - Command* getCommand() const; - - int getEvents() const; - - void addEvents(int events); - - void removeEvents(int events); - - bool eventsEmpty() const; - - void processEvents(int events); - - bool operator==(const CommandEvent& event) const; -}; - -#if defined HAVE_EPOLL && defined ENABLE_ASYNC_DNS - -class ADNSEvent { -private: - SharedHandle _resolver; - Command* _command; - sock_t _socket; - int _events; -public: - ADNSEvent(const SharedHandle& resolver, Command* command, - sock_t socket, int events); - - void processEvents(int events); - - bool operator==(const ADNSEvent& event) const; - - int getEvents() const; -}; - -#endif // HAVE_EPOLL && ENABLE_ASYNC_DNS - -class SocketEntry { -private: - sock_t _socket; - - std::deque _commandEvents; - -#if defined HAVE_EPOLL && defined ENABLE_ASYNC_DNS - - std::deque _adnsEvents; - -#endif // HAVE_EPOLL && ENABLE_ASYNC_DNS - -#ifdef HAVE_EPOLL - - struct epoll_event _epEvent; - -#endif // HAVE_EPOLL - -public: - -#ifdef HAVE_EPOLL - - enum EventType { - EVENT_READ = EPOLLIN, - EVENT_WRITE = EPOLLOUT, - EVENT_ERROR = EPOLLERR, - EVENT_HUP = EPOLLHUP, - }; - -#else // !HAVE_EPOLL - - enum EventType { - EVENT_READ = 1, - EVENT_WRITE = 1 << 1, - EVENT_ERROR = 1 << 2, - EVENT_HUP = 1 << 3, - }; - -#endif // !HAVE_EPOLL - - SocketEntry(sock_t socket); - - bool operator==(const SocketEntry& entry) const; - - bool operator<(const SocketEntry& entry) const; - - void addCommandEvent(Command* command, int events); - - void removeCommandEvent(Command* command, int events); - -#if defined HAVE_EPOLL && defined ENABLE_ASYNC_DNS - - void addADNSEvent(const SharedHandle& resolver, - Command* command, int events); - - void removeADNSEvent(const SharedHandle& resolver, - Command* command); - -#endif // HAVE_EPOLL && ENABLE_ASYNC_DNS - -#ifdef HAVE_EPOLL - - struct epoll_event& getEpEvent(); - -#else // !HAVE_EPOLL - - int getEvents(); - -#endif // !HAVE_EPOLL - - sock_t getSocket() const; - - bool eventEmpty() const; - - void processEvents(int events); - -}; - -#ifdef ENABLE_ASYNC_DNS - -class DownloadEngine; - -class AsyncNameResolverEntry { -private: - SharedHandle _nameResolver; - - Command* _command; - -#ifdef HAVE_EPOLL - // HAVE_EPOLL assumes c-ares - - size_t _socketsSize; - - sock_t _sockets[ARES_GETSOCK_MAXNUM]; - -#endif // HAVE_EPOLL - -public: - AsyncNameResolverEntry(const SharedHandle& nameResolver, - Command* command); - - bool operator==(const AsyncNameResolverEntry& entry); - -#ifdef HAVE_EPOLL - - void addSocketEvents(DownloadEngine* e); - - void removeSocketEvents(DownloadEngine* e); - -#else // !HAVE_EPOLL - - int getFds(fd_set* rfdsPtr, fd_set* wfdsPtr); - - void process(fd_set* rfdsPtr, fd_set* wfdsPtr); - -#endif // !HAVE_EPOLL - -}; - -#endif // ENABLE_ASYNC_DNS +class EventPoll; +class Command; class DownloadEngine { private: void waitData(); - std::deque > socketEntries; -#ifdef ENABLE_ASYNC_DNS - std::deque > nameResolverEntries; -#endif // ENABLE_ASYNC_DNS -#ifdef HAVE_EPOLL - - int _epfd; - - struct epoll_event* _epEvents; - - static const size_t EPOLL_EVENTS_MAX = 1024; - -#else // !HAVE_EPOLL - // If epoll is not available, then use select system call. - - fd_set rfdset; - fd_set wfdset; - int fdmax; - -#endif // !HAVE_EPOLL + SharedHandle _eventPoll; Logger* logger; @@ -312,8 +125,6 @@ private: SharedHandle _authConfigFactory; - void shortSleep() const; - /** * Delegates to StatCalc */ @@ -336,7 +147,7 @@ public: SharedHandle _checkIntegrityMan; const Option* option; - DownloadEngine(); + DownloadEngine(const SharedHandle& eventPoll); virtual ~DownloadEngine(); @@ -344,12 +155,6 @@ public: void cleanQueue(); -#ifndef HAVE_EPOLL - - void updateFdSet(); - -#endif // !HAVE_EPOLL - bool addSocketForReadCheck(const SharedHandle& socket, Command* command); bool deleteSocketForReadCheck(const SharedHandle& socket, @@ -359,20 +164,6 @@ public: bool deleteSocketForWriteCheck(const SharedHandle& socket, Command* command); - bool addSocketEvents(sock_t socket, Command* command, int events -#if defined HAVE_EPOLL && defined ENABLE_ASYNC_DNS - ,const SharedHandle& rs = - SharedHandle() -#endif // HAVE_EPOLL && ENABLE_ASYNC_DNS - ); - - bool deleteSocketEvents(sock_t socket, Command* command, int events -#if defined HAVE_EPOLL && defined ENABLE_ASYNC_DNS - ,const SharedHandle& rs = - SharedHandle() -#endif // HAVE_EPOLL && ENABLE_ASYNC_DNS - ); - #ifdef ENABLE_ASYNC_DNS bool addNameResolverCheck(const SharedHandle& resolver, @@ -381,7 +172,7 @@ public: Command* command); #endif // ENABLE_ASYNC_DNS - void addCommand(const Commands& commands); + void addCommand(const std::deque& commands); void fillCommand(); diff --git a/src/DownloadEngineFactory.cc b/src/DownloadEngineFactory.cc index da72e7df..13f9ea1d 100644 --- a/src/DownloadEngineFactory.cc +++ b/src/DownloadEngineFactory.cc @@ -54,6 +54,12 @@ #include "TimedHaltCommand.h" #include "DownloadResult.h" #include "ServerStatMan.h" +#include "a2io.h" +#ifdef HAVE_EPOLL +# include "EpollEventPoll.h" +#endif // HAVE_EPOLL +#include "SelectEventPoll.h" +#include "DlAbortEx.h" namespace aria2 { @@ -79,7 +85,24 @@ DownloadEngineFactory::newDownloadEngine(Option* op, workingSet = requestGroups; } - DownloadEngineHandle e(new DownloadEngine()); + SharedHandle eventPoll; +#ifdef HAVE_EPOLL + if(op->get(PREF_EVENT_POLL) == V_EPOLL) { + SharedHandle ep(new EpollEventPoll()); + if(ep->good()) { + eventPoll = ep; + } else { + throw DlAbortEx("Initializing EpollEventPoll failed." + " Try --event-poll=select"); + } + } else +#endif // HAVE_EPLL + if(op->get(PREF_EVENT_POLL) == V_SELECT) { + eventPoll.reset(new SelectEventPoll()); + } else { + abort(); + } + DownloadEngineHandle e(new DownloadEngine(eventPoll)); e->option = op; RequestGroupManHandle diff --git a/src/EpollEventPoll.cc b/src/EpollEventPoll.cc new file mode 100644 index 00000000..bf505550 --- /dev/null +++ b/src/EpollEventPoll.cc @@ -0,0 +1,546 @@ +/* */ +#include "EpollEventPoll.h" + +#include +#include +#include + +#include "Command.h" +#include "LogFactory.h" +#include "Logger.h" + +namespace aria2 { + +EpollEventPoll::CommandEvent::CommandEvent(Command* command, int events): + _command(command), _events(events) {} + +bool EpollEventPoll::CommandEvent::operator== +(const CommandEvent& commandEvent) const +{ + return _command == commandEvent._command; +} + +Command* EpollEventPoll::CommandEvent::getCommand() const +{ + return _command; +} + +int EpollEventPoll::CommandEvent::getEvents() const +{ + return _events; +} + +void EpollEventPoll::CommandEvent::addEvents(int events) +{ + _events |= events; +} + +void EpollEventPoll::CommandEvent::removeEvents(int events) +{ + _events &= (~events); +} + +bool EpollEventPoll::CommandEvent::eventsEmpty() const +{ + return _events == 0; +} + +void EpollEventPoll::CommandEvent::processEvents(int events) +{ + if((_events&events) || + ((EpollEventPoll::EVENT_ERROR|EpollEventPoll::EVENT_HUP)&events)) { + _command->setStatusActive(); + } + if(EpollEventPoll::EVENT_READ&events) { + _command->readEventReceived(); + } + if(EpollEventPoll::EVENT_WRITE&events) { + _command->writeEventReceived(); + } + if(EpollEventPoll::EVENT_ERROR&events) { + _command->errorEventReceived(); + } + if(EpollEventPoll::EVENT_HUP&events) { + _command->hupEventReceived(); + } +} + +void EpollEventPoll::CommandEvent::addSelf +(const SharedHandle& socketEntry) const +{ + socketEntry->addCommandEvent(*this); +} + +void EpollEventPoll::CommandEvent::removeSelf +(const SharedHandle& socketEntry) const +{ + socketEntry->removeCommandEvent(*this); +} + +#ifdef ENABLE_ASYNC_DNS + +EpollEventPoll::ADNSEvent::ADNSEvent +(const SharedHandle& resolver, + Command* command, + sock_t socket, int events): + _resolver(resolver), _command(command), _socket(socket), _events(events) {} + +bool EpollEventPoll::ADNSEvent::operator==(const ADNSEvent& event) const +{ + return _resolver == event._resolver; +} + +int EpollEventPoll::ADNSEvent::getEvents() const +{ + return _events; +} + +void EpollEventPoll::ADNSEvent::processEvents(int events) +{ + ares_socket_t readfd; + ares_socket_t writefd; + if(events&(EpollEventPoll::EVENT_READ|EpollEventPoll::EVENT_ERROR| + EpollEventPoll::EVENT_HUP)) { + readfd = _socket; + } else { + readfd = ARES_SOCKET_BAD; + } + if(events&(EpollEventPoll::EVENT_WRITE|EpollEventPoll::EVENT_ERROR| + EpollEventPoll::EVENT_HUP)) { + writefd = _socket; + } else { + writefd = ARES_SOCKET_BAD; + } + _resolver->process(readfd, writefd); + _command->setStatusActive(); +} + +void EpollEventPoll::ADNSEvent::addSelf +(const SharedHandle& socketEntry) const +{ + socketEntry->addADNSEvent(*this); +} + +void EpollEventPoll::ADNSEvent::removeSelf +(const SharedHandle& socketEntry) const +{ + socketEntry->removeADNSEvent(*this); +} + +#endif // ENABLE_ASYNC_DNS + +EpollEventPoll::SocketEntry::SocketEntry(sock_t socket):_socket(socket) +{ + memset(&_epEvent, 0, sizeof(struct epoll_event)); +} + +bool EpollEventPoll::SocketEntry::operator==(const SocketEntry& entry) const +{ + return _socket == entry._socket; +} + +bool EpollEventPoll::SocketEntry::operator<(const SocketEntry& entry) const +{ + return _socket < entry._socket; +} + +void EpollEventPoll::SocketEntry::addCommandEvent(const CommandEvent& cev) +{ + std::deque::iterator i = std::find(_commandEvents.begin(), + _commandEvents.end(), + cev); + if(i == _commandEvents.end()) { + _commandEvents.push_back(cev); + } else { + (*i).addEvents(cev.getEvents()); + } +} + +void EpollEventPoll::SocketEntry::removeCommandEvent(const CommandEvent& cev) +{ + std::deque::iterator i = std::find(_commandEvents.begin(), + _commandEvents.end(), + cev); + if(i == _commandEvents.end()) { + // not found + } else { + (*i).removeEvents(cev.getEvents()); + if((*i).eventsEmpty()) { + _commandEvents.erase(i); + } + } +} + +#ifdef ENABLE_ASYNC_DNS + +void EpollEventPoll::SocketEntry::addADNSEvent(const ADNSEvent& aev) +{ + std::deque::iterator i = std::find(_adnsEvents.begin(), + _adnsEvents.end(), + aev); + if(i == _adnsEvents.end()) { + _adnsEvents.push_back(aev); + } +} + +void EpollEventPoll::SocketEntry::removeADNSEvent(const ADNSEvent& aev) +{ + std::deque::iterator i = std::find(_adnsEvents.begin(), + _adnsEvents.end(), + aev); + if(i == _adnsEvents.end()) { + // not found + } else { + _adnsEvents.erase(i); + } +} + +#endif // ENABLE_ASYNC_DNS + +void EpollEventPoll::SocketEntry::processEvents(int events) +{ + std::for_each(_commandEvents.begin(), _commandEvents.end(), + std::bind2nd(std::mem_fun_ref + (&EpollEventPoll::CommandEvent::processEvents), + events)); +#ifdef ENABLE_ASYNC_DNS + + std::for_each(_adnsEvents.begin(), _adnsEvents.end(), + std::bind2nd(std::mem_fun_ref + (&EpollEventPoll::ADNSEvent::processEvents), + events)); + +#endif // ENABLE_ASYNC_DNS + +} + +sock_t EpollEventPoll::SocketEntry::getSocket() const +{ + return _socket; +} + +bool EpollEventPoll::SocketEntry::eventEmpty() const +{ + +#ifdef ENABLE_ASYNC_DNS + + return _commandEvents.empty() && _adnsEvents.empty(); + +#else // !ENABLE_ASYNC_DNS + + return _commandEvents.empty(); + +#endif // !ENABLE_ASYNC_DNS) + +} + +int accumulateEvent(int events, const EpollEventPoll::Event& event) +{ + return events|event.getEvents(); +} + +struct epoll_event& EpollEventPoll::SocketEntry::getEpEvent() +{ + _epEvent.data.ptr = this; + +#ifdef ENABLE_ASYNC_DNS + + _epEvent.events = + std::accumulate(_adnsEvents.begin(), + _adnsEvents.end(), + std::accumulate(_commandEvents.begin(), + _commandEvents.end(), 0, accumulateEvent), + accumulateEvent); + +#else // !ENABLE_ASYNC_DNS + + _epEvent.events = + std::accumulate(_commandEvents.begin(), _commandEvents.end(), 0, + accumulateEvent); + +#endif // !ENABLE_ASYNC_DNS + return _epEvent; +} + +#ifdef ENABLE_ASYNC_DNS + +EpollEventPoll::AsyncNameResolverEntry::AsyncNameResolverEntry +(const SharedHandle& nameResolver, Command* command): + _nameResolver(nameResolver), _command(command), _socketsSize(0) + +{} + +bool EpollEventPoll::AsyncNameResolverEntry::operator== +(const AsyncNameResolverEntry& entry) +{ + return _nameResolver == entry._nameResolver && + _command == entry._command; +} + +void EpollEventPoll::AsyncNameResolverEntry::addSocketEvents +(EpollEventPoll* e) +{ + _socketsSize = 0; + int mask = _nameResolver->getsock(_sockets); + if(mask == 0) { + return; + } + size_t i; + for(i = 0; i < ARES_GETSOCK_MAXNUM; ++i) { + //epoll_event_t* epEventPtr = &_epEvents[_socketsSize]; + int events = 0; + if(ARES_GETSOCK_READABLE(mask, i)) { + events |= EPOLLIN; + } + if(ARES_GETSOCK_WRITABLE(mask, i)) { + events |= EPOLLOUT; + } + if(events == 0) { + // assume no further sockets are returned. + break; + } + e->addEvents(_sockets[i], _command, events, _nameResolver); + } + _socketsSize = i; +} + +void EpollEventPoll::AsyncNameResolverEntry::removeSocketEvents +(EpollEventPoll* e) +{ + for(size_t i = 0; i < _socketsSize; ++i) { + e->deleteEvents(_sockets[i], _command, _nameResolver); + } +} + +#endif // ENABLE_ASYNC_DNS + +EpollEventPoll::EpollEventPoll():_logger(LogFactory::getInstance()) +{ + _epfd = epoll_create(EPOLL_EVENTS_MAX); + + _epEvents = new struct epoll_event[EPOLL_EVENTS_MAX]; +} + +EpollEventPoll::~EpollEventPoll() +{ + if(_epfd != -1) { + int r; + while((r = close(_epfd)) == -1 && errno == EINTR); + if(r == -1) { + _logger->error("Error occurred while closing epoll file descriptor" + " %d: %s", + _epfd, strerror(errno)); + } + } + delete [] _epEvents; +} + +bool EpollEventPoll::good() const +{ + return _epfd != -1; +} + +void EpollEventPoll::poll(const struct timeval& tv) +{ + // timeout is millisec + int timeout = tv.tv_sec*1000+tv.tv_usec/1000; + + int res; + while((res = epoll_wait(_epfd, _epEvents, EPOLL_EVENTS_MAX, timeout)) == -1 && + errno == EINTR); + + if(res > 0) { + for(int i = 0; i < res; ++i) { + SocketEntry* p = (SocketEntry*)_epEvents[i].data.ptr; + p->processEvents(_epEvents[i].events); + } + } + + // TODO timeout of name resolver is determined in Command(AbstractCommand, + // DHTEntryPoint...Command) +} + +static int translateEvents(EventPoll::EventType events) +{ + int newEvents = 0; + if(EventPoll::EVENT_READ&events) { + newEvents |= EPOLLIN; + } + if(EventPoll::EVENT_WRITE&events) { + newEvents |= EPOLLOUT; + } + if(EventPoll::EVENT_ERROR&events) { + newEvents |= EPOLLERR; + } + if(EventPoll::EVENT_HUP&events) { + newEvents |= EPOLLHUP; + } + return newEvents; +} + +bool EpollEventPoll::addEvents(sock_t socket, + const EpollEventPoll::Event& event) +{ + SharedHandle socketEntry(new SocketEntry(socket)); + std::deque >::iterator i = + std::lower_bound(_socketEntries.begin(), _socketEntries.end(), socketEntry); + int r = 0; + if(i != _socketEntries.end() && (*i) == socketEntry) { + + event.addSelf(*i); + + r = epoll_ctl(_epfd, EPOLL_CTL_MOD, (*i)->getSocket(), &(*i)->getEpEvent()); + if(r == -1) { + // try EPOLL_CTL_ADD: There is a chance that previously socket X is + // added to epoll, but it is closed and is not yet removed from + // SocketEntries. In this case, EPOLL_CTL_MOD is failed with ENOENT. + + r = epoll_ctl(_epfd, EPOLL_CTL_ADD, (*i)->getSocket(), + &(*i)->getEpEvent()); + } + } else { + _socketEntries.insert(i, socketEntry); + + event.addSelf(socketEntry); + + r = epoll_ctl(_epfd, EPOLL_CTL_ADD, socketEntry->getSocket(), + &socketEntry->getEpEvent()); + } + if(r == -1) { + _logger->debug("Failed to add socket event %d:%s", socket, strerror(errno)); + return false; + } else { + return true; + } +} + +bool EpollEventPoll::addEvents(sock_t socket, Command* command, + EventPoll::EventType events) +{ + int epEvents = translateEvents(events); + return addEvents(socket, CommandEvent(command, epEvents)); +} + +bool EpollEventPoll::addEvents(sock_t socket, Command* command, int events, + const SharedHandle& rs) +{ + return addEvents(socket, ADNSEvent(rs, command, socket, events)); +} + +bool EpollEventPoll::deleteEvents(sock_t socket, + const EpollEventPoll::Event& event) +{ + SharedHandle socketEntry(new SocketEntry(socket)); + std::deque >::iterator i = + std::lower_bound(_socketEntries.begin(), _socketEntries.end(), socketEntry); + if(i != _socketEntries.end() && (*i) == socketEntry) { + + event.removeSelf(*i); + + int r = 0; + if((*i)->eventEmpty()) { + r = epoll_ctl(_epfd, EPOLL_CTL_DEL, (*i)->getSocket(), 0); + _socketEntries.erase(i); + } else { + // If socket is closed, then it seems it is automatically removed from + // epoll, so following EPOLL_CTL_MOD may fail. + r = epoll_ctl(_epfd, EPOLL_CTL_MOD, (*i)->getSocket(), + &(*i)->getEpEvent()); + if(r == -1) { + _logger->debug("Failed to delete socket event, but may be ignored:%s", + strerror(errno)); + } + } + if(r == -1) { + _logger->debug("Failed to delete socket event:%s", strerror(errno)); + return false; + } else { + return true; + } + } else { + _logger->debug("Socket %d is not found in SocketEntries.", socket); + return false; + } +} + +bool EpollEventPoll::deleteEvents(sock_t socket, Command* command, + const SharedHandle& rs) +{ + return deleteEvents(socket, ADNSEvent(rs, command, socket, 0)); +} + +bool EpollEventPoll::deleteEvents(sock_t socket, Command* command, + EventPoll::EventType events) +{ + int epEvents = translateEvents(events); + return deleteEvents(socket, CommandEvent(command, epEvents)); +} + +#ifdef ENABLE_ASYNC_DNS +bool EpollEventPoll::addNameResolver +(const SharedHandle& resolver, Command* command) +{ + SharedHandle entry + (new AsyncNameResolverEntry(resolver, command)); + std::deque >::iterator itr = + std::find(_nameResolverEntries.begin(), _nameResolverEntries.end(), entry); + if(itr == _nameResolverEntries.end()) { + _nameResolverEntries.push_back(entry); + entry->addSocketEvents(this); + return true; + } else { + return false; + } +} + +bool EpollEventPoll::deleteNameResolver +(const SharedHandle& resolver, Command* command) +{ + SharedHandle entry + (new AsyncNameResolverEntry(resolver, command)); + std::deque >::iterator itr = + std::find(_nameResolverEntries.begin(), _nameResolverEntries.end(), entry); + if(itr == _nameResolverEntries.end()) { + return false; + } else { + (*itr)->removeSocketEvents(this); + _nameResolverEntries.erase(itr); + return true; + } +} +#endif // ENABLE_ASYNC_DNS + +} // namespace aria2 diff --git a/src/EpollEventPoll.h b/src/EpollEventPoll.h new file mode 100644 index 00000000..47012c55 --- /dev/null +++ b/src/EpollEventPoll.h @@ -0,0 +1,245 @@ +/* */ +#ifndef _D_EPOLL_EVENT_POLL_H_ +#define _D_EPLLL_EVENT_POLL_H_ + +#include "EventPoll.h" + +# include + +#include + +#ifdef ENABLE_ASYNC_DNS +# include "AsyncNameResolver.h" +#endif // ENABLE_ASYNC_DNS + +namespace aria2 { + +class Logger; + +class EpollEventPoll : public EventPoll { + +private: + + class SocketEntry; + + class Event { + public: + virtual ~Event() {} + + virtual void processEvents(int events) = 0; + + virtual int getEvents() const = 0; + + virtual void addSelf(const SharedHandle& socketEntry) const =0; + + virtual void removeSelf + (const SharedHandle& socketEntry) const =0; + }; + + friend int accumulateEvent(int events, const Event& event); + + class CommandEvent : public Event { + private: + Command* _command; + int _events; + public: + CommandEvent(Command* command, int events); + + Command* getCommand() const; + + void addEvents(int events); + + void removeEvents(int events); + + bool eventsEmpty() const; + + bool operator==(const CommandEvent& event) const; + + virtual int getEvents() const; + + virtual void processEvents(int events); + + virtual void addSelf(const SharedHandle& socketEntry) const; + + virtual void removeSelf(const SharedHandle& socketEntry) const; + }; + + class ADNSEvent : public Event { + private: + SharedHandle _resolver; + Command* _command; + sock_t _socket; + int _events; + public: + ADNSEvent(const SharedHandle& resolver, Command* command, + sock_t socket, int events); + + bool operator==(const ADNSEvent& event) const; + + virtual int getEvents() const; + + virtual void processEvents(int events); + + virtual void addSelf(const SharedHandle& socketEntry) const; + + virtual void removeSelf(const SharedHandle& socketEntry) const; + }; + + + class SocketEntry { + private: + sock_t _socket; + + std::deque _commandEvents; + +#ifdef ENABLE_ASYNC_DNS + + std::deque _adnsEvents; + +#endif // ENABLE_ASYNC_DNS + + struct epoll_event _epEvent; + + public: + SocketEntry(sock_t socket); + + bool operator==(const SocketEntry& entry) const; + + bool operator<(const SocketEntry& entry) const; + + void addCommandEvent(const CommandEvent& cev); + + void removeCommandEvent(const CommandEvent& cev); + +#ifdef ENABLE_ASYNC_DNS + + void addADNSEvent(const ADNSEvent& aev); + + void removeADNSEvent(const ADNSEvent& aev); + +#endif // ENABLE_ASYNC_DNS + + struct epoll_event& getEpEvent(); + + sock_t getSocket() const; + + bool eventEmpty() const; + + void processEvents(int events); + }; + +#ifdef ENABLE_ASYNC_DNS + + class AsyncNameResolverEntry { + private: + SharedHandle _nameResolver; + + Command* _command; + + size_t _socketsSize; + + sock_t _sockets[ARES_GETSOCK_MAXNUM]; + + public: + AsyncNameResolverEntry(const SharedHandle& nameResolver, + Command* command); + + bool operator==(const AsyncNameResolverEntry& entry); + + void addSocketEvents(EpollEventPoll* socketPoll); + + void removeSocketEvents(EpollEventPoll* socketPoll); + }; + +#endif // ENABLE_ASYNC_DNS +private: + enum EventType { + EVENT_READ = EPOLLIN, + EVENT_WRITE = EPOLLOUT, + EVENT_ERROR = EPOLLERR, + EVENT_HUP = EPOLLHUP, + }; + + std::deque > _socketEntries; +#ifdef ENABLE_ASYNC_DNS + std::deque > _nameResolverEntries; +#endif // ENABLE_ASYNC_DNS + + int _epfd; + + struct epoll_event* _epEvents; + + static const size_t EPOLL_EVENTS_MAX = 1024; + + Logger* _logger; + + bool addEvents(sock_t socket, const Event& event); + + bool deleteEvents(sock_t socket, const Event& event); + + bool addEvents(sock_t socket, Command* command, int events, + const SharedHandle& rs); + + bool deleteEvents(sock_t socket, Command* command, + const SharedHandle& rs); + +public: + EpollEventPoll(); + + bool good() const; + + virtual ~EpollEventPoll(); + + virtual void poll(const struct timeval& tv); + + virtual bool addEvents(sock_t socket, + Command* command, EventPoll::EventType events); + + virtual bool deleteEvents(sock_t socket, + Command* command, EventPoll::EventType events); +#ifdef ENABLE_ASYNC_DNS + + virtual bool addNameResolver(const SharedHandle& resolver, + Command* command); + virtual bool deleteNameResolver + (const SharedHandle& resolver, Command* command); +#endif // ENABLE_ASYNC_DNS + +}; + +} // namespace aria2 + +#endif // _D_EVENT_POLL_H_ diff --git a/src/EventPoll.h b/src/EventPoll.h new file mode 100644 index 00000000..b4dcec97 --- /dev/null +++ b/src/EventPoll.h @@ -0,0 +1,79 @@ +/* */ +#ifndef _D_EVENT_POLL_H_ +#define _D_EVENT_POLL_H_ + +#include "common.h" +#include "SharedHandle.h" +#include "TimeA2.h" +#include "a2netcompat.h" + +namespace aria2 { + +class SocketCore; +class Command; +class AsyncNameResolver; + +class EventPoll { + +public: + enum EventType { + EVENT_READ = 1, + EVENT_WRITE = 1 << 1, + EVENT_ERROR = 1 << 2, + EVENT_HUP = 1 << 3, + }; + + virtual ~EventPoll() {} + + virtual void poll(const struct timeval& tv) = 0; + + virtual bool addEvents(sock_t socket, Command* command, EventType events) = 0; + + virtual bool deleteEvents(sock_t socket, Command* command, + EventType events) = 0; +#ifdef ENABLE_ASYNC_DNS + + virtual bool addNameResolver(const SharedHandle& resolver, + Command* command) = 0; + virtual bool deleteNameResolver + (const SharedHandle& resolver, Command* command) = 0; +#endif // ENABLE_ASYNC_DNS + +}; + +} // namespace aria2 + +#endif // _D_EVENT_POLL_H_ diff --git a/src/Makefile.am b/src/Makefile.am index 7739de9a..8bc1a448 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -196,7 +196,13 @@ SRCS = Socket.h\ SocketBuffer.cc SocketBuffer.h\ OptionHandlerException.cc OptionHandlerException.h\ bencode.cc bencode.h\ - URIResult.cc URIResult.h + URIResult.cc URIResult.h\ + EventPoll.h\ + SelectEventPoll.cc SelectEventPoll.h + +if HAVE_EPOLL +SRCS += EpollEventPoll.cc EpollEventPoll.h +endif # HAVE_EPOLL if ENABLE_SSL SRCS += TLSContext.h diff --git a/src/Makefile.in b/src/Makefile.in index 60891f0b..e8a0948f 100644 --- a/src/Makefile.in +++ b/src/Makefile.in @@ -35,13 +35,14 @@ build_triplet = @build@ host_triplet = @host@ target_triplet = @target@ bin_PROGRAMS = aria2c$(EXEEXT) -@ENABLE_SSL_TRUE@am__append_1 = TLSContext.h -@HAVE_LIBGNUTLS_TRUE@am__append_2 = LibgnutlsTLSContext.cc LibgnutlsTLSContext.h -@HAVE_LIBSSL_TRUE@am__append_3 = LibsslTLSContext.cc LibsslTLSContext.h -@HAVE_LIBZ_TRUE@am__append_4 = GZipDecoder.cc GZipDecoder.h -@HAVE_SQLITE3_TRUE@am__append_5 = Sqlite3MozCookieParser.cc Sqlite3MozCookieParser.h -@ENABLE_ASYNC_DNS_TRUE@am__append_6 = AsyncNameResolver.cc AsyncNameResolver.h -@ENABLE_MESSAGE_DIGEST_TRUE@am__append_7 = IteratableChunkChecksumValidator.cc IteratableChunkChecksumValidator.h\ +@HAVE_EPOLL_TRUE@am__append_1 = EpollEventPoll.cc EpollEventPoll.h +@ENABLE_SSL_TRUE@am__append_2 = TLSContext.h +@HAVE_LIBGNUTLS_TRUE@am__append_3 = LibgnutlsTLSContext.cc LibgnutlsTLSContext.h +@HAVE_LIBSSL_TRUE@am__append_4 = LibsslTLSContext.cc LibsslTLSContext.h +@HAVE_LIBZ_TRUE@am__append_5 = GZipDecoder.cc GZipDecoder.h +@HAVE_SQLITE3_TRUE@am__append_6 = Sqlite3MozCookieParser.cc Sqlite3MozCookieParser.h +@ENABLE_ASYNC_DNS_TRUE@am__append_7 = AsyncNameResolver.cc AsyncNameResolver.h +@ENABLE_MESSAGE_DIGEST_TRUE@am__append_8 = IteratableChunkChecksumValidator.cc IteratableChunkChecksumValidator.h\ @ENABLE_MESSAGE_DIGEST_TRUE@ IteratableChecksumValidator.cc IteratableChecksumValidator.h\ @ENABLE_MESSAGE_DIGEST_TRUE@ CheckIntegrityCommand.cc CheckIntegrityCommand.h\ @ENABLE_MESSAGE_DIGEST_TRUE@ ChecksumCheckIntegrityEntry.cc ChecksumCheckIntegrityEntry.h\ @@ -50,7 +51,7 @@ bin_PROGRAMS = aria2c$(EXEEXT) @ENABLE_MESSAGE_DIGEST_TRUE@ Checksum.h\ @ENABLE_MESSAGE_DIGEST_TRUE@ ChunkChecksum.h -@ENABLE_BITTORRENT_TRUE@am__append_8 = PeerMessageUtil.cc PeerMessageUtil.h\ +@ENABLE_BITTORRENT_TRUE@am__append_9 = PeerMessageUtil.cc PeerMessageUtil.h\ @ENABLE_BITTORRENT_TRUE@ PeerAbstractCommand.cc PeerAbstractCommand.h\ @ENABLE_BITTORRENT_TRUE@ PeerInitiateConnectionCommand.cc PeerInitiateConnectionCommand.h\ @ENABLE_BITTORRENT_TRUE@ PeerInteractionCommand.cc PeerInteractionCommand.h\ @@ -218,7 +219,7 @@ bin_PROGRAMS = aria2c$(EXEEXT) @ENABLE_BITTORRENT_TRUE@ BtLeecherStateChoke.cc BtLeecherStateChoke.h\ @ENABLE_BITTORRENT_TRUE@ BtSeederStateChoke.cc BtSeederStateChoke.h -@ENABLE_METALINK_TRUE@am__append_9 = Metalinker.cc Metalinker.h\ +@ENABLE_METALINK_TRUE@am__append_10 = Metalinker.cc Metalinker.h\ @ENABLE_METALINK_TRUE@ MetalinkEntry.cc MetalinkEntry.h\ @ENABLE_METALINK_TRUE@ MetalinkResource.cc MetalinkResource.h\ @ENABLE_METALINK_TRUE@ MetalinkProcessor.h\ @@ -247,17 +248,17 @@ bin_PROGRAMS = aria2c$(EXEEXT) @ENABLE_METALINK_TRUE@ MetalinkPostDownloadHandler.cc MetalinkPostDownloadHandler.h\ @ENABLE_METALINK_TRUE@ MetalinkHelper.cc MetalinkHelper.h -@ENABLE_LIBXML2_TRUE@am__append_10 = XML2SAXMetalinkProcessor.cc XML2SAXMetalinkProcessor.h -@ENABLE_LIBEXPAT_TRUE@am__append_11 = ExpatMetalinkProcessor.cc ExpatMetalinkProcessor.h -@HAVE_ASCTIME_R_FALSE@am__append_12 = asctime_r.c asctime_r.h -@HAVE_BASENAME_FALSE@am__append_13 = libgen.c libgen.h -@HAVE_GETADDRINFO_FALSE@am__append_14 = getaddrinfo.c getaddrinfo.h -@HAVE_GAI_STRERROR_FALSE@am__append_15 = gai_strerror.c gai_strerror.h -@HAVE_GETTIMEOFDAY_FALSE@am__append_16 = gettimeofday.c gettimeofday.h -@HAVE_INET_ATON_FALSE@am__append_17 = inet_aton.c inet_aton.h -@HAVE_LOCALTIME_R_FALSE@am__append_18 = localtime_r.c localtime_r.h -@HAVE_STRPTIME_FALSE@am__append_19 = strptime.c strptime.h -@HAVE_TIMEGM_FALSE@am__append_20 = timegm.c timegm.h +@ENABLE_LIBXML2_TRUE@am__append_11 = XML2SAXMetalinkProcessor.cc XML2SAXMetalinkProcessor.h +@ENABLE_LIBEXPAT_TRUE@am__append_12 = ExpatMetalinkProcessor.cc ExpatMetalinkProcessor.h +@HAVE_ASCTIME_R_FALSE@am__append_13 = asctime_r.c asctime_r.h +@HAVE_BASENAME_FALSE@am__append_14 = libgen.c libgen.h +@HAVE_GETADDRINFO_FALSE@am__append_15 = getaddrinfo.c getaddrinfo.h +@HAVE_GAI_STRERROR_FALSE@am__append_16 = gai_strerror.c gai_strerror.h +@HAVE_GETTIMEOFDAY_FALSE@am__append_17 = gettimeofday.c gettimeofday.h +@HAVE_INET_ATON_FALSE@am__append_18 = inet_aton.c inet_aton.h +@HAVE_LOCALTIME_R_FALSE@am__append_19 = localtime_r.c localtime_r.h +@HAVE_STRPTIME_FALSE@am__append_20 = strptime.c strptime.h +@HAVE_TIMEGM_FALSE@am__append_21 = timegm.c timegm.h subdir = src DIST_COMMON = $(srcdir)/Makefile.am $(srcdir)/Makefile.in alloca.c ACLOCAL_M4 = $(top_srcdir)/aclocal.m4 @@ -412,9 +413,11 @@ am__libaria2c_a_SOURCES_DIST = Socket.h SocketCore.cc SocketCore.h \ CookieStorage.cc CookieStorage.h SocketBuffer.cc \ SocketBuffer.h OptionHandlerException.cc \ OptionHandlerException.h bencode.cc bencode.h URIResult.cc \ - URIResult.h TLSContext.h LibgnutlsTLSContext.cc \ - LibgnutlsTLSContext.h LibsslTLSContext.cc LibsslTLSContext.h \ - GZipDecoder.cc GZipDecoder.h Sqlite3MozCookieParser.cc \ + URIResult.h EventPoll.h SelectEventPoll.cc SelectEventPoll.h \ + EpollEventPoll.cc EpollEventPoll.h TLSContext.h \ + LibgnutlsTLSContext.cc LibgnutlsTLSContext.h \ + LibsslTLSContext.cc LibsslTLSContext.h GZipDecoder.cc \ + GZipDecoder.h Sqlite3MozCookieParser.cc \ Sqlite3MozCookieParser.h AsyncNameResolver.cc \ AsyncNameResolver.h IteratableChunkChecksumValidator.cc \ IteratableChunkChecksumValidator.h \ @@ -569,19 +572,20 @@ am__libaria2c_a_SOURCES_DIST = Socket.h SocketCore.cc SocketCore.h \ gai_strerror.h gettimeofday.c gettimeofday.h inet_aton.c \ inet_aton.h localtime_r.c localtime_r.h strptime.c strptime.h \ timegm.c timegm.h -am__objects_1 = -@HAVE_LIBGNUTLS_TRUE@am__objects_2 = LibgnutlsTLSContext.$(OBJEXT) -@HAVE_LIBSSL_TRUE@am__objects_3 = LibsslTLSContext.$(OBJEXT) -@HAVE_LIBZ_TRUE@am__objects_4 = GZipDecoder.$(OBJEXT) -@HAVE_SQLITE3_TRUE@am__objects_5 = Sqlite3MozCookieParser.$(OBJEXT) -@ENABLE_ASYNC_DNS_TRUE@am__objects_6 = AsyncNameResolver.$(OBJEXT) -@ENABLE_MESSAGE_DIGEST_TRUE@am__objects_7 = IteratableChunkChecksumValidator.$(OBJEXT) \ +@HAVE_EPOLL_TRUE@am__objects_1 = EpollEventPoll.$(OBJEXT) +am__objects_2 = +@HAVE_LIBGNUTLS_TRUE@am__objects_3 = LibgnutlsTLSContext.$(OBJEXT) +@HAVE_LIBSSL_TRUE@am__objects_4 = LibsslTLSContext.$(OBJEXT) +@HAVE_LIBZ_TRUE@am__objects_5 = GZipDecoder.$(OBJEXT) +@HAVE_SQLITE3_TRUE@am__objects_6 = Sqlite3MozCookieParser.$(OBJEXT) +@ENABLE_ASYNC_DNS_TRUE@am__objects_7 = AsyncNameResolver.$(OBJEXT) +@ENABLE_MESSAGE_DIGEST_TRUE@am__objects_8 = IteratableChunkChecksumValidator.$(OBJEXT) \ @ENABLE_MESSAGE_DIGEST_TRUE@ IteratableChecksumValidator.$(OBJEXT) \ @ENABLE_MESSAGE_DIGEST_TRUE@ CheckIntegrityCommand.$(OBJEXT) \ @ENABLE_MESSAGE_DIGEST_TRUE@ ChecksumCheckIntegrityEntry.$(OBJEXT) \ @ENABLE_MESSAGE_DIGEST_TRUE@ messageDigest.$(OBJEXT) \ @ENABLE_MESSAGE_DIGEST_TRUE@ MessageDigestHelper.$(OBJEXT) -@ENABLE_BITTORRENT_TRUE@am__objects_8 = PeerMessageUtil.$(OBJEXT) \ +@ENABLE_BITTORRENT_TRUE@am__objects_9 = PeerMessageUtil.$(OBJEXT) \ @ENABLE_BITTORRENT_TRUE@ PeerAbstractCommand.$(OBJEXT) \ @ENABLE_BITTORRENT_TRUE@ PeerInitiateConnectionCommand.$(OBJEXT) \ @ENABLE_BITTORRENT_TRUE@ PeerInteractionCommand.$(OBJEXT) \ @@ -689,7 +693,7 @@ am__objects_1 = @ENABLE_BITTORRENT_TRUE@ MSEHandshake.$(OBJEXT) \ @ENABLE_BITTORRENT_TRUE@ BtLeecherStateChoke.$(OBJEXT) \ @ENABLE_BITTORRENT_TRUE@ BtSeederStateChoke.$(OBJEXT) -@ENABLE_METALINK_TRUE@am__objects_9 = Metalinker.$(OBJEXT) \ +@ENABLE_METALINK_TRUE@am__objects_10 = Metalinker.$(OBJEXT) \ @ENABLE_METALINK_TRUE@ MetalinkEntry.$(OBJEXT) \ @ENABLE_METALINK_TRUE@ MetalinkResource.$(OBJEXT) \ @ENABLE_METALINK_TRUE@ MetalinkProcessorFactory.$(OBJEXT) \ @@ -715,20 +719,20 @@ am__objects_1 = @ENABLE_METALINK_TRUE@ Metalink2RequestGroup.$(OBJEXT) \ @ENABLE_METALINK_TRUE@ MetalinkPostDownloadHandler.$(OBJEXT) \ @ENABLE_METALINK_TRUE@ MetalinkHelper.$(OBJEXT) -@ENABLE_LIBXML2_TRUE@am__objects_10 = \ +@ENABLE_LIBXML2_TRUE@am__objects_11 = \ @ENABLE_LIBXML2_TRUE@ XML2SAXMetalinkProcessor.$(OBJEXT) -@ENABLE_LIBEXPAT_TRUE@am__objects_11 = \ +@ENABLE_LIBEXPAT_TRUE@am__objects_12 = \ @ENABLE_LIBEXPAT_TRUE@ ExpatMetalinkProcessor.$(OBJEXT) -@HAVE_ASCTIME_R_FALSE@am__objects_12 = asctime_r.$(OBJEXT) -@HAVE_BASENAME_FALSE@am__objects_13 = libgen.$(OBJEXT) -@HAVE_GETADDRINFO_FALSE@am__objects_14 = getaddrinfo.$(OBJEXT) -@HAVE_GAI_STRERROR_FALSE@am__objects_15 = gai_strerror.$(OBJEXT) -@HAVE_GETTIMEOFDAY_FALSE@am__objects_16 = gettimeofday.$(OBJEXT) -@HAVE_INET_ATON_FALSE@am__objects_17 = inet_aton.$(OBJEXT) -@HAVE_LOCALTIME_R_FALSE@am__objects_18 = localtime_r.$(OBJEXT) -@HAVE_STRPTIME_FALSE@am__objects_19 = strptime.$(OBJEXT) -@HAVE_TIMEGM_FALSE@am__objects_20 = timegm.$(OBJEXT) -am__objects_21 = SocketCore.$(OBJEXT) Command.$(OBJEXT) \ +@HAVE_ASCTIME_R_FALSE@am__objects_13 = asctime_r.$(OBJEXT) +@HAVE_BASENAME_FALSE@am__objects_14 = libgen.$(OBJEXT) +@HAVE_GETADDRINFO_FALSE@am__objects_15 = getaddrinfo.$(OBJEXT) +@HAVE_GAI_STRERROR_FALSE@am__objects_16 = gai_strerror.$(OBJEXT) +@HAVE_GETTIMEOFDAY_FALSE@am__objects_17 = gettimeofday.$(OBJEXT) +@HAVE_INET_ATON_FALSE@am__objects_18 = inet_aton.$(OBJEXT) +@HAVE_LOCALTIME_R_FALSE@am__objects_19 = localtime_r.$(OBJEXT) +@HAVE_STRPTIME_FALSE@am__objects_20 = strptime.$(OBJEXT) +@HAVE_TIMEGM_FALSE@am__objects_21 = timegm.$(OBJEXT) +am__objects_22 = SocketCore.$(OBJEXT) Command.$(OBJEXT) \ AbstractCommand.$(OBJEXT) \ InitiateConnectionCommandFactory.$(OBJEXT) \ DownloadCommand.$(OBJEXT) \ @@ -802,15 +806,16 @@ am__objects_21 = SocketCore.$(OBJEXT) Command.$(OBJEXT) \ InOrderURISelector.$(OBJEXT) ServerStatURISelector.$(OBJEXT) \ NsCookieParser.$(OBJEXT) CookieStorage.$(OBJEXT) \ SocketBuffer.$(OBJEXT) OptionHandlerException.$(OBJEXT) \ - bencode.$(OBJEXT) URIResult.$(OBJEXT) $(am__objects_1) \ - $(am__objects_2) $(am__objects_3) $(am__objects_4) \ - $(am__objects_5) $(am__objects_6) $(am__objects_7) \ - $(am__objects_8) $(am__objects_9) $(am__objects_10) \ - $(am__objects_11) $(am__objects_12) $(am__objects_13) \ - $(am__objects_14) $(am__objects_15) $(am__objects_16) \ - $(am__objects_17) $(am__objects_18) $(am__objects_19) \ - $(am__objects_20) -am_libaria2c_a_OBJECTS = $(am__objects_21) + bencode.$(OBJEXT) URIResult.$(OBJEXT) \ + SelectEventPoll.$(OBJEXT) $(am__objects_1) $(am__objects_2) \ + $(am__objects_3) $(am__objects_4) $(am__objects_5) \ + $(am__objects_6) $(am__objects_7) $(am__objects_8) \ + $(am__objects_9) $(am__objects_10) $(am__objects_11) \ + $(am__objects_12) $(am__objects_13) $(am__objects_14) \ + $(am__objects_15) $(am__objects_16) $(am__objects_17) \ + $(am__objects_18) $(am__objects_19) $(am__objects_20) \ + $(am__objects_21) +am_libaria2c_a_OBJECTS = $(am__objects_22) libaria2c_a_OBJECTS = $(am_libaria2c_a_OBJECTS) am__installdirs = "$(DESTDIR)$(bindir)" binPROGRAMS_INSTALL = $(INSTALL_PROGRAM) @@ -1135,13 +1140,14 @@ SRCS = Socket.h SocketCore.cc SocketCore.h BinaryStream.h Command.cc \ CookieStorage.cc CookieStorage.h SocketBuffer.cc \ SocketBuffer.h OptionHandlerException.cc \ OptionHandlerException.h bencode.cc bencode.h URIResult.cc \ - URIResult.h $(am__append_1) $(am__append_2) $(am__append_3) \ + URIResult.h EventPoll.h SelectEventPoll.cc SelectEventPoll.h \ + $(am__append_1) $(am__append_2) $(am__append_3) \ $(am__append_4) $(am__append_5) $(am__append_6) \ $(am__append_7) $(am__append_8) $(am__append_9) \ $(am__append_10) $(am__append_11) $(am__append_12) \ $(am__append_13) $(am__append_14) $(am__append_15) \ $(am__append_16) $(am__append_17) $(am__append_18) \ - $(am__append_19) $(am__append_20) + $(am__append_19) $(am__append_20) $(am__append_21) noinst_LIBRARIES = libaria2c.a libaria2c_a_SOURCES = $(SRCS) aria2c_LDADD = libaria2c.a @LIBINTL@ @ALLOCA@ @LIBGNUTLS_LIBS@\ @@ -1368,6 +1374,7 @@ distclean-compile: @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/DownloadHandler.Po@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/DownloadHandlerConstants.Po@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/DownloadHandlerFactory.Po@am__quote@ +@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/EpollEventPoll.Po@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/Exception.Po@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/ExpatMetalinkProcessor.Po@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/FeatureConfig.Po@am__quote@ @@ -1474,6 +1481,7 @@ distclean-compile: @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/ResourcesMetalinkParserState.Po@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/SeedCheckCommand.Po@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/SegmentMan.Po@am__quote@ +@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/SelectEventPoll.Po@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/ServerHost.Po@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/ServerStat.Po@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/ServerStatMan.Po@am__quote@ diff --git a/src/OptionHandlerFactory.cc b/src/OptionHandlerFactory.cc index 2fc2a760..a7ee3ac7 100644 --- a/src/OptionHandlerFactory.cc +++ b/src/OptionHandlerFactory.cc @@ -150,6 +150,26 @@ OptionHandlers OptionHandlerFactory::createOptionHandlers() op->addTag(TAG_ADVANCED); handlers.push_back(op); } + { + std::string params[] = { +#ifdef HAVE_EPOLL + V_EPOLL, +#endif // HAVE_EPOLL + V_SELECT + }; + SharedHandle op(new ParameterOptionHandler + (PREF_EVENT_POLL, + TEXT_EVENT_POLL, +#ifdef HAVE_EPOLL + V_EPOLL, +#else // !HAVE_EPOLL + V_SELECT, +#endif // !HAVE_EPOLL + std::deque + (¶ms[0],¶ms[arrayLength(params)]))); + op->addTag(TAG_ADVANCED); + handlers.push_back(op); + } { SharedHandle op(new ParameterOptionHandler (PREF_FILE_ALLOCATION, diff --git a/src/RequestGroupMan.cc b/src/RequestGroupMan.cc index e2ec7379..f0d7c393 100644 --- a/src/RequestGroupMan.cc +++ b/src/RequestGroupMan.cc @@ -62,6 +62,7 @@ #include "prefs.h" #include "File.h" #include "Util.h" +#include "Command.h" namespace aria2 { diff --git a/src/SelectEventPoll.cc b/src/SelectEventPoll.cc new file mode 100644 index 00000000..f32b669b --- /dev/null +++ b/src/SelectEventPoll.cc @@ -0,0 +1,361 @@ +/* */ +#include "SelectEventPoll.h" + +#include +#include +#include +#include + +#include "Command.h" +#include "LogFactory.h" +#include "Logger.h" + +namespace aria2 { + +SelectEventPoll::CommandEvent::CommandEvent(Command* command, int events): + _command(command), _events(events) {} + +bool SelectEventPoll::CommandEvent::operator==(const CommandEvent& commandEvent) const +{ + return _command == commandEvent._command; +} + +Command* SelectEventPoll::CommandEvent::getCommand() const +{ + return _command; +} + +int SelectEventPoll::CommandEvent::getEvents() const +{ + return _events; +} + +void SelectEventPoll::CommandEvent::addEvents(int events) +{ + _events |= events; +} + +void SelectEventPoll::CommandEvent::removeEvents(int events) +{ + _events &= (~events); +} + +bool SelectEventPoll::CommandEvent::eventsEmpty() const +{ + return _events == 0; +} + +void SelectEventPoll::CommandEvent::processEvents(int events) +{ + if((_events&events) || + ((EventPoll::EVENT_ERROR|EventPoll::EVENT_HUP)&events)) { + _command->setStatusActive(); + } + if(EventPoll::EVENT_READ&events) { + _command->readEventReceived(); + } + if(EventPoll::EVENT_WRITE&events) { + _command->writeEventReceived(); + } + if(EventPoll::EVENT_ERROR&events) { + _command->errorEventReceived(); + } + if(EventPoll::EVENT_HUP&events) { + _command->hupEventReceived(); + } +} + +SelectEventPoll::SocketEntry::SocketEntry(sock_t socket):_socket(socket) {} + +bool SelectEventPoll::SocketEntry::operator==(const SocketEntry& entry) const +{ + return _socket == entry._socket; +} + +bool SelectEventPoll::SocketEntry::operator<(const SocketEntry& entry) const +{ + return _socket < entry._socket; +} + +void SelectEventPoll::SocketEntry::addCommandEvent +(Command* command, int events) +{ + CommandEvent cev(command, events); + std::deque::iterator i = std::find(_commandEvents.begin(), + _commandEvents.end(), + cev); + if(i == _commandEvents.end()) { + _commandEvents.push_back(cev); + } else { + (*i).addEvents(events); + } +} +void SelectEventPoll::SocketEntry::removeCommandEvent +(Command* command, int events) +{ + CommandEvent cev(command, events); + std::deque::iterator i = std::find(_commandEvents.begin(), + _commandEvents.end(), + cev); + if(i == _commandEvents.end()) { + // not found + } else { + (*i).removeEvents(events); + if((*i).eventsEmpty()) { + _commandEvents.erase(i); + } + } +} +void SelectEventPoll::SocketEntry::processEvents(int events) +{ + std::for_each(_commandEvents.begin(), _commandEvents.end(), + std::bind2nd(std::mem_fun_ref(&CommandEvent::processEvents), + events)); +} + +sock_t SelectEventPoll::SocketEntry::getSocket() const +{ + return _socket; +} + +bool SelectEventPoll::SocketEntry::eventEmpty() const +{ + return _commandEvents.empty(); +} + +int accumulateEvent(int events, const SelectEventPoll::CommandEvent& event) +{ + return events|event.getEvents(); +} + +int SelectEventPoll::SocketEntry::getEvents() +{ + return + std::accumulate(_commandEvents.begin(), _commandEvents.end(), 0, + accumulateEvent); +} + +#ifdef ENABLE_ASYNC_DNS + +SelectEventPoll::AsyncNameResolverEntry::AsyncNameResolverEntry +(const SharedHandle& nameResolver, Command* command): + _nameResolver(nameResolver), _command(command) {} + +bool SelectEventPoll::AsyncNameResolverEntry::operator== +(const AsyncNameResolverEntry& entry) +{ + return _nameResolver == entry._nameResolver && + _command == entry._command; +} +int SelectEventPoll::AsyncNameResolverEntry::getFds +(fd_set* rfdsPtr, fd_set* wfdsPtr) +{ + return _nameResolver->getFds(rfdsPtr, wfdsPtr); +} + +void SelectEventPoll::AsyncNameResolverEntry::process +(fd_set* rfdsPtr, fd_set* wfdsPtr) +{ + _nameResolver->process(rfdsPtr, wfdsPtr); + switch(_nameResolver->getStatus()) { + case AsyncNameResolver::STATUS_SUCCESS: + case AsyncNameResolver::STATUS_ERROR: + _command->setStatusActive(); + break; + default: + break; + } +} + +#endif // ENABLE_ASYNC_DNS + +SelectEventPoll::SelectEventPoll() +{ + updateFdSet(); +} + +SelectEventPoll::~SelectEventPoll() {} + +void SelectEventPoll::poll(const struct timeval& tv) +{ + fd_set rfds; + fd_set wfds; + + memcpy(&rfds, &_rfdset, sizeof(fd_set)); + memcpy(&wfds, &_wfdset, sizeof(fd_set)); +#ifdef ENABLE_ASYNC_DNS + + for(std::deque >::iterator itr = + _nameResolverEntries.begin(); itr != _nameResolverEntries.end(); + ++itr) { + SharedHandle& entry = *itr; + int fd = entry->getFds(&rfds, &wfds); + // TODO force error if fd == 0 + if(_fdmax < fd) { + _fdmax = fd; + } + } + +#endif // ENABLE_ASYNC_DNS + int retval; + do { + struct timeval ttv = tv; + retval = select(_fdmax+1, &rfds, &wfds, NULL, &ttv); + } while(retval == -1 && errno == EINTR); + if(retval > 0) { + for(std::deque >::iterator i = + _socketEntries.begin(); i != _socketEntries.end(); ++i) { + int events = 0; + if(FD_ISSET((*i)->getSocket(), &rfds)) { + events |= EventPoll::EVENT_READ; + } + if(FD_ISSET((*i)->getSocket(), &wfds)) { + events |= EventPoll::EVENT_WRITE; + } + (*i)->processEvents(events); + } + } +#ifdef ENABLE_ASYNC_DNS + + for(std::deque >::iterator i = + _nameResolverEntries.begin(); i != _nameResolverEntries.end(); ++i) { + (*i)->process(&rfds, &wfds); + } + +#endif // ENABLE_ASYNC_DNS +} + +void SelectEventPoll::updateFdSet() +{ + _fdmax = 0; + FD_ZERO(&_rfdset); + FD_ZERO(&_wfdset); + for(std::deque >::iterator i = + _socketEntries.begin(); i != _socketEntries.end(); ++i) { + sock_t fd = (*i)->getSocket(); + int events = (*i)->getEvents(); + if(events&EventPoll::EVENT_READ) { + FD_SET(fd, &_rfdset); + } + if(events&EventPoll::EVENT_WRITE) { + FD_SET(fd, &_wfdset); + } + if(_fdmax < fd) { + _fdmax = fd; + } + } +} + +bool SelectEventPoll::addEvents(sock_t socket, Command* command, + EventPoll::EventType events) +{ + SharedHandle socketEntry(new SocketEntry(socket)); + std::deque >::iterator i = + std::lower_bound(_socketEntries.begin(), _socketEntries.end(), socketEntry); + int r = 0; + if(i != _socketEntries.end() && (*i) == socketEntry) { + (*i)->addCommandEvent(command, events); + } else { + _socketEntries.insert(i, socketEntry); + socketEntry->addCommandEvent(command, events); + } + updateFdSet(); + if(r == -1) { + _logger->debug("Failed to add socket event %d:%s", socket, strerror(errno)); + return false; + } else { + return true; + } +} + +bool SelectEventPoll::deleteEvents(sock_t socket, Command* command, + EventPoll::EventType events) +{ + SharedHandle socketEntry(new SocketEntry(socket)); + std::deque >::iterator i = + std::lower_bound(_socketEntries.begin(), _socketEntries.end(), socketEntry); + if(i != _socketEntries.end() && (*i) == socketEntry) { + (*i)->removeCommandEvent(command, events); + int r = 0; + if((*i)->eventEmpty()) { + _socketEntries.erase(i); + } + updateFdSet(); + if(r == -1) { + _logger->debug("Failed to delete socket event:%s", strerror(errno)); + return false; + } else { + return true; + } + } else { + _logger->debug("Socket %d is not found in SocketEntries.", socket); + return false; + } +} + +#ifdef ENABLE_ASYNC_DNS +bool SelectEventPoll::addNameResolver +(const SharedHandle& resolver, Command* command) +{ + SharedHandle entry + (new AsyncNameResolverEntry(resolver, command)); + std::deque >::iterator itr = + std::find(_nameResolverEntries.begin(), _nameResolverEntries.end(), entry); + if(itr == _nameResolverEntries.end()) { + _nameResolverEntries.push_back(entry); + return true; + } else { + return false; + } +} + +bool SelectEventPoll::deleteNameResolver +(const SharedHandle& resolver, Command* command) +{ + SharedHandle entry + (new AsyncNameResolverEntry(resolver, command)); + std::deque >::iterator itr = + std::find(_nameResolverEntries.begin(), _nameResolverEntries.end(), entry); + if(itr == _nameResolverEntries.end()) { + return false; + } else { + _nameResolverEntries.erase(itr); + return true; + } +} +#endif // ENABLE_ASYNC_DNS + +} // namespace aria2 diff --git a/src/SelectEventPoll.h b/src/SelectEventPoll.h new file mode 100644 index 00000000..c8326d7e --- /dev/null +++ b/src/SelectEventPoll.h @@ -0,0 +1,157 @@ +/* */ +#ifndef _D_SELECT_EVENT_POLL_H_ +#define _D_SELECT_EVENT_POLL_H_ + +#include "EventPoll.h" + +#include + +#ifdef ENABLE_ASYNC_DNS +# include "AsyncNameResolver.h" +#endif // ENABLE_ASYNC_DNS + +namespace aria2 { + +class Logger; + +class SelectEventPoll : public EventPoll { +private: + class CommandEvent { + private: + Command* _command; + int _events; + public: + CommandEvent(Command* command, int events); + + Command* getCommand() const; + + void addEvents(int events); + + void removeEvents(int events); + + bool eventsEmpty() const; + + bool operator==(const CommandEvent& event) const; + + int getEvents() const; + + void processEvents(int events); + }; + + friend int accumulateEvent + (int events, const SelectEventPoll::CommandEvent& event); + + class SocketEntry { + private: + sock_t _socket; + + std::deque _commandEvents; + public: + SocketEntry(sock_t socket); + + bool operator==(const SocketEntry& entry) const; + + bool operator<(const SocketEntry& entry) const; + + void addCommandEvent(Command* command, int events); + + void removeCommandEvent(Command* command, int events); + + int getEvents(); + + sock_t getSocket() const; + + bool eventEmpty() const; + + void processEvents(int events); + }; + +#ifdef ENABLE_ASYNC_DNS + + class AsyncNameResolverEntry { + private: + SharedHandle _nameResolver; + + Command* _command; + + public: + AsyncNameResolverEntry(const SharedHandle& nameResolver, + Command* command); + + bool operator==(const AsyncNameResolverEntry& entry); + + int getFds(fd_set* rfdsPtr, fd_set* wfdsPtr); + + void process(fd_set* rfdsPtr, fd_set* wfdsPtr); + }; +#endif // ENABLE_ASYNC_DNS + + fd_set _rfdset; + fd_set _wfdset; + int _fdmax; + + std::deque > _socketEntries; +#ifdef ENABLE_ASYNC_DNS + std::deque > _nameResolverEntries; +#endif // ENABLE_ASYNC_DNS + + Logger* _logger; + + void updateFdSet(); +public: + SelectEventPoll(); + + virtual ~SelectEventPoll(); + + virtual void poll(const struct timeval& tv); + + virtual bool addEvents(sock_t socket, + Command* command, EventPoll::EventType events); + + virtual bool deleteEvents(sock_t socket, + Command* command, EventPoll::EventType events); +#ifdef ENABLE_ASYNC_DNS + + virtual bool addNameResolver(const SharedHandle& resolver, + Command* command); + virtual bool deleteNameResolver + (const SharedHandle& resolver, Command* command); +#endif // ENABLE_ASYNC_DNS +}; + +} // namespace aria2 + +#endif // _D_SELECT_EVENT_POLL_H_ diff --git a/src/SocketCore.cc b/src/SocketCore.cc index 9224c82a..29361ec8 100644 --- a/src/SocketCore.cc +++ b/src/SocketCore.cc @@ -76,6 +76,12 @@ namespace aria2 { +#ifdef HAVE_EPOLL +SocketCore::PollMethod SocketCore::_pollMethod = SocketCore::POLL_METHOD_EPOLL; +#else // !HAVE_EPOLL +SocketCore::PollMethod SocketCore::_pollMethod = SocketCore::POLL_METHOD_SELECT; +#endif // !HAVE_EPOLL + #ifdef ENABLE_SSL SharedHandle SocketCore::_tlsContext; @@ -364,50 +370,48 @@ void SocketCore::initEPOLL() bool SocketCore::isWritable(time_t timeout) { - #ifdef HAVE_EPOLL - - if(_epfd == -1) { - initEPOLL(); - } - struct epoll_event epEvents[1]; - int r; - while((r = epoll_wait(_epfd, epEvents, 1, 0)) == -1 && errno == EINTR); - - if(r > 0) { - return epEvents[0].events&(EPOLLOUT|EPOLLHUP|EPOLLERR); - } else if(r == 0) { - return false; - } else { - throw DlRetryEx(StringFormat(EX_SOCKET_CHECK_WRITABLE, errorMsg()).str()); - } - -#else // !HAVE_EPOLL - - fd_set fds; - FD_ZERO(&fds); - FD_SET(sockfd, &fds); - - struct timeval tv; - tv.tv_sec = timeout; - tv.tv_usec = 0; - - int r = select(sockfd+1, NULL, &fds, NULL, &tv); - if(r == 1) { - return true; - } else if(r == 0) { - // time out - return false; - } else { - if(SOCKET_ERRNO == A2_EINPROGRESS || SOCKET_ERRNO == EINTR) { + if(_pollMethod == SocketCore::POLL_METHOD_EPOLL) { + if(_epfd == -1) { + initEPOLL(); + } + struct epoll_event epEvents[1]; + int r; + while((r = epoll_wait(_epfd, epEvents, 1, 0)) == -1 && errno == EINTR); + if(r > 0) { + return epEvents[0].events&(EPOLLOUT|EPOLLHUP|EPOLLERR); + } else if(r == 0) { return false; } else { throw DlRetryEx(StringFormat(EX_SOCKET_CHECK_WRITABLE, errorMsg()).str()); } - } + } else +#endif // HAVE_EPOLL + if(_pollMethod == SocketCore::POLL_METHOD_SELECT) { + fd_set fds; + FD_ZERO(&fds); + FD_SET(sockfd, &fds); -#endif // !HAVE_EPOLL + struct timeval tv; + tv.tv_sec = timeout; + tv.tv_usec = 0; + int r = select(sockfd+1, NULL, &fds, NULL, &tv); + if(r == 1) { + return true; + } else if(r == 0) { + // time out + return false; + } else { + if(SOCKET_ERRNO == A2_EINPROGRESS || SOCKET_ERRNO == EINTR) { + return false; + } else { + throw DlRetryEx(StringFormat(EX_SOCKET_CHECK_WRITABLE, errorMsg()).str()); + } + } + } else { + abort(); + } } bool SocketCore::isReadable(time_t timeout) @@ -419,48 +423,48 @@ bool SocketCore::isReadable(time_t timeout) #endif // HAVE_LIBGNUTLS #ifdef HAVE_EPOLL + if(_pollMethod == SocketCore::POLL_METHOD_EPOLL) { + if(_epfd == -1) { + initEPOLL(); + } + struct epoll_event epEvents[1]; + int r; + while((r = epoll_wait(_epfd, epEvents, 1, 0)) == -1 && errno == EINTR); - if(_epfd == -1) { - initEPOLL(); - } - struct epoll_event epEvents[1]; - int r; - while((r = epoll_wait(_epfd, epEvents, 1, 0)) == -1 && errno == EINTR); - - if(r > 0) { - return epEvents[0].events&(EPOLLIN|EPOLLHUP|EPOLLERR); - } else if(r == 0) { - return false; - } else { - throw DlRetryEx(StringFormat(EX_SOCKET_CHECK_READABLE, errorMsg()).str()); - } - -#else // !HAVE_EPOLL - - fd_set fds; - FD_ZERO(&fds); - FD_SET(sockfd, &fds); - - struct timeval tv; - tv.tv_sec = timeout; - tv.tv_usec = 0; - - int r = select(sockfd+1, &fds, NULL, NULL, &tv); - if(r == 1) { - return true; - } else if(r == 0) { - // time out - return false; - } else { - if(SOCKET_ERRNO == A2_EINPROGRESS || SOCKET_ERRNO == EINTR) { + if(r > 0) { + return epEvents[0].events&(EPOLLIN|EPOLLHUP|EPOLLERR); + } else if(r == 0) { return false; } else { throw DlRetryEx(StringFormat(EX_SOCKET_CHECK_READABLE, errorMsg()).str()); } - } + } else +#endif // HAVE_EPOLL + if(_pollMethod == SocketCore::POLL_METHOD_SELECT) { + fd_set fds; + FD_ZERO(&fds); + FD_SET(sockfd, &fds); -#endif // !HAVE_EPOLL + struct timeval tv; + tv.tv_sec = timeout; + tv.tv_usec = 0; + int r = select(sockfd+1, &fds, NULL, NULL, &tv); + if(r == 1) { + return true; + } else if(r == 0) { + // time out + return false; + } else { + if(SOCKET_ERRNO == A2_EINPROGRESS || SOCKET_ERRNO == EINTR) { + return false; + } else { + throw DlRetryEx(StringFormat(EX_SOCKET_CHECK_READABLE, errorMsg()).str()); + } + } + } else { + abort(); + } } #ifdef HAVE_LIBSSL @@ -1053,4 +1057,16 @@ bool SocketCore::wantWrite() const return _wantWrite; } +#ifdef HAVE_EPOLL +void SocketCore::useEpoll() +{ + _pollMethod = SocketCore::POLL_METHOD_EPOLL; +} + +void SocketCore::useSelect() +{ + _pollMethod = SocketCore::POLL_METHOD_SELECT; +} +#endif // HAVE_EPOLL + } // namespace aria2 diff --git a/src/SocketCore.h b/src/SocketCore.h index c385170b..11578616 100644 --- a/src/SocketCore.h +++ b/src/SocketCore.h @@ -37,9 +37,9 @@ #include "common.h" -#ifdef HAVE_EPOLL_CREATE +#ifdef HAVE_EPOLL # include -#endif // HAVE_EPOLL_CREATE +#endif // HAVE_EPOLL #include #include @@ -82,8 +82,15 @@ private: struct epoll_event _epEvent; + enum PollMethod { + POLL_METHOD_EPOLL, POLL_METHOD_SELECT + }; + + static PollMethod _pollMethod; + #endif // HAVE_EPOLL + bool blocking; int secure; @@ -327,6 +334,11 @@ public: */ bool wantWrite() const; +#ifdef HAVE_EPOLL + static void useEpoll(); +#endif // HAVE_EPOLL + static void useSelect(); + #ifdef ENABLE_SSL static void setTLSContext(const SharedHandle& tlsContext); #endif // ENABLE_SSL diff --git a/src/StreamFileAllocationEntry.cc b/src/StreamFileAllocationEntry.cc index c152f6fb..faea2d44 100644 --- a/src/StreamFileAllocationEntry.cc +++ b/src/StreamFileAllocationEntry.cc @@ -43,6 +43,7 @@ #include "RequestGroup.h" #include "InitiateConnectionCommandFactory.h" #include "DownloadContext.h" +#include "Command.h" namespace aria2 { diff --git a/src/a2io.h b/src/a2io.h index 6c3f4ef5..bb66c9fc 100644 --- a/src/a2io.h +++ b/src/a2io.h @@ -135,9 +135,4 @@ #define OPEN_MODE S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH|S_IWOTH #define DIR_OPEN_MODE S_IRWXU|S_IRWXG|S_IRWXO -#if defined HAVE_EPOLL_CREATE && \ - (defined HAVE_LIBCARES || !defined ENABLE_ASYNC_DNS) -# define HAVE_EPOLL 1 -#endif // HAVE_EPOLL_CREATE || (HAVE_LIBCARES || !ENABLE_ASYNC_DNS) - #endif // _D_A2IO_H_ diff --git a/src/main.cc b/src/main.cc index f3fee1f7..2ca2479d 100644 --- a/src/main.cc +++ b/src/main.cc @@ -68,6 +68,7 @@ #include "Exception.h" #include "ProtocolDetector.h" #include "RecoverableException.h" +#include "SocketCore.h" #ifdef ENABLE_METALINK # include "MetalinkHelper.h" # include "MetalinkEntry.h" @@ -176,6 +177,14 @@ DownloadResult::RESULT main(int argc, char* argv[]) if(op->getAsBool(PREF_QUIET)) { LogFactory::setConsoleOutput(false); } +#ifdef HAVE_EPOLL + if(op->get(PREF_EVENT_POLL) == V_EPOLL) { + SocketCore::useEpoll(); + } else +#endif // HAVE_EPOLL + if(op->get(PREF_EVENT_POLL) == V_SELECT) { + SocketCore::useSelect(); + } DownloadResult::RESULT exitStatus = DownloadResult::FINISHED; try { Logger* logger = LogFactory::getInstance(); diff --git a/src/option_processing.cc b/src/option_processing.cc index bc8d33d4..96ada2b0 100644 --- a/src/option_processing.cc +++ b/src/option_processing.cc @@ -191,6 +191,7 @@ Option* option_processing(int argc, char* const argv[]) { PREF_CHECK_CERTIFICATE.c_str(), optional_argument, &lopt, 234 }, { PREF_NO_PROXY.c_str(), required_argument, &lopt, 235 }, { PREF_USE_HEAD.c_str(), optional_argument, &lopt, 236 }, + { PREF_EVENT_POLL.c_str(), required_argument, &lopt, 237 }, #if defined ENABLE_BITTORRENT || defined ENABLE_METALINK { PREF_SHOW_FILES.c_str(), no_argument, NULL, 'S' }, { PREF_SELECT_FILE.c_str(), required_argument, &lopt, 21 }, @@ -475,6 +476,9 @@ Option* option_processing(int argc, char* const argv[]) case 236: cmdstream << PREF_USE_HEAD << "=" << toBoolArg(optarg) << "\n"; break; + case 237: + cmdstream << PREF_EVENT_POLL << "=" << optarg << "\n"; + break; } break; } diff --git a/src/prefs.cc b/src/prefs.cc index 1fdb318b..fcd68cf9 100644 --- a/src/prefs.cc +++ b/src/prefs.cc @@ -146,6 +146,10 @@ const std::string PREF_SERVER_STAT_OF("server-stat-of"); const std::string PREF_REMOTE_TIME("remote-time"); // value: 1*digit const std::string PREF_MAX_FILE_NOT_FOUND("max-file-not-found"); +// value: epoll | select +const std::string PREF_EVENT_POLL("event-poll"); +const std::string V_EPOLL("epoll"); +const std::string V_SELECT("select"); /** * FTP related preferences diff --git a/src/prefs.h b/src/prefs.h index a3ee16e6..b83c95c3 100644 --- a/src/prefs.h +++ b/src/prefs.h @@ -150,6 +150,10 @@ extern const std::string PREF_SERVER_STAT_OF; extern const std::string PREF_REMOTE_TIME; // value: 1*digit extern const std::string PREF_MAX_FILE_NOT_FOUND; +// value: epoll | select +extern const std::string PREF_EVENT_POLL; +extern const std::string V_EPOLL; +extern const std::string V_SELECT; /** * FTP related preferences diff --git a/src/usage_text.h b/src/usage_text.h index a073a4aa..582f3507 100644 --- a/src/usage_text.h +++ b/src/usage_text.h @@ -437,3 +437,5 @@ _(" --no-proxy=DOMAINS Specify comma separated hostnames or domains wh #define TEXT_USE_HEAD \ _(" --use-head[=true|false] Use HEAD method for the first request to the HTTP\n"\ " server.") +#define TEXT_EVENT_POLL \ +_(" --event-poll=POLL Specify the method for polling events.")