diff --git a/ChangeLog b/ChangeLog index 0502e01a..7c734833 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,3 +1,22 @@ +2010-04-19 Tatsuhiro Tsujikawa + + Supported poll() for socket event notification. --event-poll can + take value "poll". Fixed the bug that timeout for + SocketCore::isReadable()/isWritable() is ignored when epoll is + used. + * src/OptionHandlerFactory.cc + * src/a2io.h + * src/prefs.h + * src/SocketCore.h + * src/Makefile.am + * src/main.cc + * src/PollEventPoll.h + * src/SocketCore.cc + * src/DownloadEngineFactory.cc + * src/PollEventPoll.cc + * src/prefs.cc + * configure.ac + 2010-04-19 Tatsuhiro Tsujikawa Reverted previous change to DownloadCommand.cc with additional diff --git a/config.h.in b/config.h.in index ff5564ad..fe6e30ef 100644 --- a/config.h.in +++ b/config.h.in @@ -305,6 +305,12 @@ /* Define 1 if struct option.name is const char* */ #undef HAVE_OPTION_CONST_NAME +/* Define to 1 if you have the `poll' function. */ +#undef HAVE_POLL + +/* Define to 1 if you have the header file. */ +#undef HAVE_POLL_H + /* Define to 1 if you have the `posix_fallocate' function. */ #undef HAVE_POSIX_FALLOCATE diff --git a/configure b/configure index 2c572d7e..afd2b23c 100755 --- a/configure +++ b/configure @@ -602,6 +602,8 @@ am__EXEEXT_TRUE LTLIBOBJS HAVE_TIMEGETTIME_FALSE HAVE_TIMEGETTIME_TRUE +HAVE_POLL_FALSE +HAVE_POLL_TRUE HAVE_MACH_ABSOLUTE_TIME_FALSE HAVE_MACH_ABSOLUTE_TIME_TRUE HAVE_DAEMON_FALSE @@ -7953,6 +7955,7 @@ for ac_header in argz.h \ malloc.h \ netdb.h \ netinet/in.h \ + poll.h \ stddef.h \ stdint.h \ stdio_ext.h \ @@ -15084,6 +15087,26 @@ else fi +for ac_func in poll +do : + ac_fn_cxx_check_func "$LINENO" "poll" "ac_cv_func_poll" +if test "x$ac_cv_func_poll" = x""yes; then : + cat >>confdefs.h <<_ACEOF +#define HAVE_POLL 1 +_ACEOF + have_poll=yes +fi +done + + if test "x$have_poll" = "xyes"; then + HAVE_POLL_TRUE= + HAVE_POLL_FALSE='#' +else + HAVE_POLL_TRUE='#' + HAVE_POLL_FALSE= +fi + + case "$target" in *mingw*) if true; then @@ -15455,6 +15478,10 @@ if test -z "${HAVE_MACH_ABSOLUTE_TIME_TRUE}" && test -z "${HAVE_MACH_ABSOLUTE_TI as_fn_error "conditional \"HAVE_MACH_ABSOLUTE_TIME\" was never defined. Usually this means the macro was only invoked conditionally." "$LINENO" 5 fi +if test -z "${HAVE_POLL_TRUE}" && test -z "${HAVE_POLL_FALSE}"; then + as_fn_error "conditional \"HAVE_POLL\" was never defined. +Usually this means the macro was only invoked conditionally." "$LINENO" 5 +fi if test -z "${HAVE_GETADDRINFO_TRUE}" && test -z "${HAVE_GETADDRINFO_FALSE}"; then as_fn_error "conditional \"HAVE_GETADDRINFO\" was never defined. Usually this means the macro was only invoked conditionally." "$LINENO" 5 diff --git a/configure.ac b/configure.ac index 2fd17abf..c01f4a75 100644 --- a/configure.ac +++ b/configure.ac @@ -248,6 +248,7 @@ AC_CHECK_HEADERS([argz.h \ malloc.h \ netdb.h \ netinet/in.h \ + poll.h \ stddef.h \ stdint.h \ stdio_ext.h \ @@ -394,6 +395,9 @@ fi AM_CONDITIONAL([HAVE_MACH_ABSOLUTE_TIME], [test "x$have_mach_absolute_time" = "xyes"]) +AC_CHECK_FUNCS([poll], [have_poll=yes]) +AM_CONDITIONAL([HAVE_POLL], [test "x$have_poll" = "xyes"]) + case "$target" in *mingw*) dnl defined in ws2tcpip.h, but only if _WIN32_WINNT >= 0x0501 diff --git a/src/DownloadEngineFactory.cc b/src/DownloadEngineFactory.cc index e3f93646..788cf270 100644 --- a/src/DownloadEngineFactory.cc +++ b/src/DownloadEngineFactory.cc @@ -61,6 +61,7 @@ #ifdef HAVE_EPOLL # include "EpollEventPoll.h" #endif // HAVE_EPOLL +#include "PollEventPoll.h" #include "SelectEventPoll.h" #include "DlAbortEx.h" #include "FileAllocationEntry.h" @@ -91,11 +92,16 @@ DownloadEngineFactory::newDownloadEngine } } else #endif // HAVE_EPLL - if(op->get(PREF_EVENT_POLL) == V_SELECT) { - eventPoll.reset(new SelectEventPoll()); - } else { - abort(); - } +#ifdef HAVE_POLL + if(op->get(PREF_EVENT_POLL) == V_POLL) { + eventPoll.reset(new PollEventPoll()); + } else +#endif // HAVE_POLL + if(op->get(PREF_EVENT_POLL) == V_SELECT) { + eventPoll.reset(new SelectEventPoll()); + } else { + abort(); + } DownloadEngineHandle e(new DownloadEngine(eventPoll)); e->option = op; diff --git a/src/Makefile.am b/src/Makefile.am index c33dccd1..7b2b8043 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -525,6 +525,10 @@ if HAVE_MACH_ABSOLUTE_TIME SRCS += clock_gettime_osx.cc clock_gettime_osx.h endif # HAVE_MACH_ABSOLUTE_TIME +if HAVE_POLL +SRCS += PollEventPoll.cc PollEventPoll.h +endif # HAVE_POLL + noinst_LIBRARIES = libaria2c.a libaria2c_a_SOURCES = $(SRCS) aria2c_LDADD = libaria2c.a @LIBINTL@ @ALLOCA@ @LIBGNUTLS_LIBS@\ diff --git a/src/Makefile.in b/src/Makefile.in index 34882b66..99e31744 100644 --- a/src/Makefile.in +++ b/src/Makefile.in @@ -279,6 +279,7 @@ bin_PROGRAMS = aria2c$(EXEEXT) @HAVE_DAEMON_FALSE@am__append_26 = daemon.cc daemon.h @HAVE_TIMEGETTIME_TRUE@am__append_27 = clock_gettime_mingw.cc clock_gettime_mingw.h @HAVE_MACH_ABSOLUTE_TIME_TRUE@am__append_28 = clock_gettime_osx.cc clock_gettime_osx.h +@HAVE_POLL_TRUE@am__append_29 = PollEventPoll.cc PollEventPoll.h subdir = src DIST_COMMON = $(srcdir)/Makefile.am $(srcdir)/Makefile.in alloca.c ACLOCAL_M4 = $(top_srcdir)/aclocal.m4 @@ -605,7 +606,8 @@ am__libaria2c_a_SOURCES_DIST = Socket.h SocketCore.cc SocketCore.h \ gai_strerror.h gettimeofday.c gettimeofday.h inet_aton.c \ inet_aton.h localtime_r.c localtime_r.h strptime.c strptime.h \ timegm.c timegm.h daemon.cc daemon.h clock_gettime_mingw.cc \ - clock_gettime_mingw.h clock_gettime_osx.cc clock_gettime_osx.h + clock_gettime_mingw.h clock_gettime_osx.cc clock_gettime_osx.h \ + PollEventPoll.cc PollEventPoll.h @ENABLE_XML_RPC_TRUE@am__objects_1 = \ @ENABLE_XML_RPC_TRUE@ XmlRpcRequestParserController.$(OBJEXT) \ @ENABLE_XML_RPC_TRUE@ XmlRpcRequestParserStateMachine.$(OBJEXT) \ @@ -786,7 +788,8 @@ am__objects_6 = @HAVE_TIMEGETTIME_TRUE@am__objects_27 = clock_gettime_mingw.$(OBJEXT) @HAVE_MACH_ABSOLUTE_TIME_TRUE@am__objects_28 = \ @HAVE_MACH_ABSOLUTE_TIME_TRUE@ clock_gettime_osx.$(OBJEXT) -am__objects_29 = SocketCore.$(OBJEXT) Command.$(OBJEXT) \ +@HAVE_POLL_TRUE@am__objects_29 = PollEventPoll.$(OBJEXT) +am__objects_30 = SocketCore.$(OBJEXT) Command.$(OBJEXT) \ AbstractCommand.$(OBJEXT) \ InitiateConnectionCommandFactory.$(OBJEXT) \ DownloadCommand.$(OBJEXT) \ @@ -873,8 +876,8 @@ am__objects_29 = SocketCore.$(OBJEXT) Command.$(OBJEXT) \ $(am__objects_18) $(am__objects_19) $(am__objects_20) \ $(am__objects_21) $(am__objects_22) $(am__objects_23) \ $(am__objects_24) $(am__objects_25) $(am__objects_26) \ - $(am__objects_27) $(am__objects_28) -am_libaria2c_a_OBJECTS = $(am__objects_29) + $(am__objects_27) $(am__objects_28) $(am__objects_29) +am_libaria2c_a_OBJECTS = $(am__objects_30) libaria2c_a_OBJECTS = $(am_libaria2c_a_OBJECTS) am__installdirs = "$(DESTDIR)$(bindir)" PROGRAMS = $(bin_PROGRAMS) @@ -1210,7 +1213,7 @@ SRCS = Socket.h SocketCore.cc SocketCore.h BinaryStream.h Command.cc \ $(am__append_19) $(am__append_20) $(am__append_21) \ $(am__append_22) $(am__append_23) $(am__append_24) \ $(am__append_25) $(am__append_26) $(am__append_27) \ - $(am__append_28) + $(am__append_28) $(am__append_29) noinst_LIBRARIES = libaria2c.a libaria2c_a_SOURCES = $(SRCS) aria2c_LDADD = libaria2c.a @LIBINTL@ @ALLOCA@ @LIBGNUTLS_LIBS@\ @@ -1549,6 +1552,7 @@ distclean-compile: @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/PieceStatMan.Po@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/PiecedSegment.Po@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/Platform.Po@am__quote@ +@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/PollEventPoll.Po@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/PriorityPieceSelector.Po@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/ProtocolDetector.Po@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/RangeBtMessage.Po@am__quote@ diff --git a/src/OptionHandlerFactory.cc b/src/OptionHandlerFactory.cc index 66257738..94d64798 100644 --- a/src/OptionHandlerFactory.cc +++ b/src/OptionHandlerFactory.cc @@ -210,6 +210,9 @@ OptionHandlers OptionHandlerFactory::createOptionHandlers() #ifdef HAVE_EPOLL V_EPOLL, #endif // HAVE_EPOLL +#ifdef HAVE_POLL + V_POLL, +#endif // HAVE_POLL V_SELECT }; SharedHandle op(new ParameterOptionHandler diff --git a/src/PollEventPoll.cc b/src/PollEventPoll.cc new file mode 100644 index 00000000..98f1de3e --- /dev/null +++ b/src/PollEventPoll.cc @@ -0,0 +1,496 @@ +/* */ +#include "PollEventPoll.h" + +#include +#include +#include + +#include "Command.h" +#include "LogFactory.h" +#include "Logger.h" + +namespace aria2 { + +PollEventPoll::CommandEvent::CommandEvent(Command* command, int events): + _command(command), _events(events) {} + +int PollEventPoll::CommandEvent::getEvents() const +{ + return _events; +} + +void PollEventPoll::CommandEvent::processEvents(int events) +{ + if((_events&events) || + ((PollEventPoll::EVENT_ERROR|PollEventPoll::EVENT_HUP)&events)) { + _command->setStatusActive(); + } + if(PollEventPoll::EVENT_READ&events) { + _command->readEventReceived(); + } + if(PollEventPoll::EVENT_WRITE&events) { + _command->writeEventReceived(); + } + if(PollEventPoll::EVENT_ERROR&events) { + _command->errorEventReceived(); + } + if(PollEventPoll::EVENT_HUP&events) { + _command->hupEventReceived(); + } +} + +void PollEventPoll::CommandEvent::addSelf +(const SharedHandle& socketEntry) const +{ + socketEntry->addCommandEvent(*this); +} + +void PollEventPoll::CommandEvent::removeSelf +(const SharedHandle& socketEntry) const +{ + socketEntry->removeCommandEvent(*this); +} + +#ifdef ENABLE_ASYNC_DNS + +PollEventPoll::ADNSEvent::ADNSEvent +(const SharedHandle& resolver, + Command* command, + sock_t socket, int events): + _resolver(resolver), _command(command), _socket(socket), _events(events) {} + +int PollEventPoll::ADNSEvent::getEvents() const +{ + return _events; +} + +void PollEventPoll::ADNSEvent::processEvents(int events) +{ + ares_socket_t readfd; + ares_socket_t writefd; + if(events&(PollEventPoll::EVENT_READ|PollEventPoll::EVENT_ERROR| + PollEventPoll::EVENT_HUP)) { + readfd = _socket; + } else { + readfd = ARES_SOCKET_BAD; + } + if(events&(PollEventPoll::EVENT_WRITE|PollEventPoll::EVENT_ERROR| + PollEventPoll::EVENT_HUP)) { + writefd = _socket; + } else { + writefd = ARES_SOCKET_BAD; + } + _resolver->process(readfd, writefd); + _command->setStatusActive(); +} + +void PollEventPoll::ADNSEvent::addSelf +(const SharedHandle& socketEntry) const +{ + socketEntry->addADNSEvent(*this); +} + +void PollEventPoll::ADNSEvent::removeSelf +(const SharedHandle& socketEntry) const +{ + socketEntry->removeADNSEvent(*this); +} + +#endif // ENABLE_ASYNC_DNS + +PollEventPoll::SocketEntry::SocketEntry(sock_t socket):_socket(socket) +{ + memset(&_pollEvent, 0, sizeof(struct pollfd)); +} + +void PollEventPoll::SocketEntry::addCommandEvent(const CommandEvent& cev) +{ + std::deque::iterator i = std::find(_commandEvents.begin(), + _commandEvents.end(), + cev); + if(i == _commandEvents.end()) { + _commandEvents.push_back(cev); + } else { + (*i).addEvents(cev.getEvents()); + } +} + +void PollEventPoll::SocketEntry::removeCommandEvent(const CommandEvent& cev) +{ + std::deque::iterator i = std::find(_commandEvents.begin(), + _commandEvents.end(), + cev); + if(i == _commandEvents.end()) { + // not found + } else { + (*i).removeEvents(cev.getEvents()); + if((*i).eventsEmpty()) { + _commandEvents.erase(i); + } + } +} + +#ifdef ENABLE_ASYNC_DNS + +void PollEventPoll::SocketEntry::addADNSEvent(const ADNSEvent& aev) +{ + std::deque::iterator i = std::find(_adnsEvents.begin(), + _adnsEvents.end(), + aev); + if(i == _adnsEvents.end()) { + _adnsEvents.push_back(aev); + } +} + +void PollEventPoll::SocketEntry::removeADNSEvent(const ADNSEvent& aev) +{ + std::deque::iterator i = std::find(_adnsEvents.begin(), + _adnsEvents.end(), + aev); + if(i == _adnsEvents.end()) { + // not found + } else { + _adnsEvents.erase(i); + } +} + +#endif // ENABLE_ASYNC_DNS + +void PollEventPoll::SocketEntry::processEvents(int events) +{ + std::for_each(_commandEvents.begin(), _commandEvents.end(), + std::bind2nd(std::mem_fun_ref + (&PollEventPoll::CommandEvent::processEvents), + events)); +#ifdef ENABLE_ASYNC_DNS + + std::for_each(_adnsEvents.begin(), _adnsEvents.end(), + std::bind2nd(std::mem_fun_ref + (&PollEventPoll::ADNSEvent::processEvents), + events)); + +#endif // ENABLE_ASYNC_DNS + +} + +bool PollEventPoll::SocketEntry::eventEmpty() const +{ + +#ifdef ENABLE_ASYNC_DNS + + return _commandEvents.empty() && _adnsEvents.empty(); + +#else // !ENABLE_ASYNC_DNS + + return _commandEvents.empty(); + +#endif // !ENABLE_ASYNC_DNS) + +} + +int accumulateEvent(int events, const PollEventPoll::Event& event) +{ + return events|event.getEvents(); +} + +struct pollfd& PollEventPoll::SocketEntry::getPollEvent() +{ + _pollEvent.fd = _socket; +#ifdef ENABLE_ASYNC_DNS + _pollEvent.events = + std::accumulate(_adnsEvents.begin(), + _adnsEvents.end(), + std::accumulate(_commandEvents.begin(), + _commandEvents.end(), 0, accumulateEvent), + accumulateEvent); +#else // !ENABLE_ASYNC_DNS + _pollEvent.events = + std::accumulate(_commandEvents.begin(), _commandEvents.end(), 0, + accumulateEvent); +#endif // !ENABLE_ASYNC_DNS + return _pollEvent; +} + +#ifdef ENABLE_ASYNC_DNS + +PollEventPoll::AsyncNameResolverEntry::AsyncNameResolverEntry +(const SharedHandle& nameResolver, Command* command): + _nameResolver(nameResolver), _command(command), _socketsSize(0) + +{} + +void PollEventPoll::AsyncNameResolverEntry::addSocketEvents(PollEventPoll* e) +{ + _socketsSize = 0; + int mask = _nameResolver->getsock(_sockets); + if(mask == 0) { + return; + } + size_t i; + for(i = 0; i < ARES_GETSOCK_MAXNUM; ++i) { + int events = 0; + if(ARES_GETSOCK_READABLE(mask, i)) { + events |= PollEventPoll::EVENT_READ; + } + if(ARES_GETSOCK_WRITABLE(mask, i)) { + events |= PollEventPoll::EVENT_WRITE; + } + if(events == 0) { + // assume no further sockets are returned. + break; + } + LogFactory::getInstance()->debug("addSocketEvents, %d", _sockets[i]); + e->addEvents(_sockets[i], _command, events, _nameResolver); + } + _socketsSize = i; +} + +void PollEventPoll::AsyncNameResolverEntry::removeSocketEvents(PollEventPoll* e) +{ + for(size_t i = 0; i < _socketsSize; ++i) { + e->deleteEvents(_sockets[i], _command, _nameResolver); + } +} + +void PollEventPoll::AsyncNameResolverEntry::processTimeout() +{ + _nameResolver->process(ARES_SOCKET_BAD, ARES_SOCKET_BAD); +} + +#endif // ENABLE_ASYNC_DNS + +PollEventPoll::PollEventPoll(): + _pollfdCapacity(1024), _pollfdNum(0), _logger(LogFactory::getInstance()) +{ + _pollfds = new struct pollfd[_pollfdCapacity]; +} + +PollEventPoll::~PollEventPoll() +{ + delete [] _pollfds; +} + +void PollEventPoll::poll(const struct timeval& tv) +{ + // timeout is millisec + int timeout = tv.tv_sec*1000+tv.tv_usec/1000; + int res; + while((res = ::poll(_pollfds, _pollfdNum, timeout)) == -1 && + errno == EINTR); + if(res > 0) { + SharedHandle se(new SocketEntry(0)); + for(struct pollfd* first = _pollfds, *last = _pollfds+_pollfdNum; + first != last; ++first) { + if(first->revents) { + se->setSocket(first->fd); + std::deque >::iterator itr = + std::lower_bound(_socketEntries.begin(), _socketEntries.end(), se); + if(itr != _socketEntries.end() && (*itr) == se) { + (*itr)->processEvents(first->revents); + } else { + if(_logger->debug()) { + _logger->debug("Socket %d is not found in SocketEntries.", + first->fd); + } + } + } + } + } +#ifdef ENABLE_ASYNC_DNS + // It turns out that we have to call ares_process_fd before ares's + // own timeout and ares may create new sockets or closes socket in + // their API. So we call ares_process_fd for all ares_channel and + // re-register their sockets. + for(std::deque >::iterator i = + _nameResolverEntries.begin(), eoi = _nameResolverEntries.end(); + i != eoi; ++i) { + (*i)->processTimeout(); + (*i)->removeSocketEvents(this); + (*i)->addSocketEvents(this); + } +#endif // ENABLE_ASYNC_DNS + + // TODO timeout of name resolver is determined in Command(AbstractCommand, + // DHTEntryPoint...Command) +} + +int PollEventPoll::translateEvents(EventPoll::EventType events) +{ + int newEvents = 0; + if(EventPoll::EVENT_READ&events) { + newEvents |= PollEventPoll::EVENT_READ; + } + if(EventPoll::EVENT_WRITE&events) { + newEvents |= PollEventPoll::EVENT_WRITE; + } + if(EventPoll::EVENT_ERROR&events) { + newEvents |= PollEventPoll::EVENT_ERROR; + } + if(EventPoll::EVENT_HUP&events) { + newEvents |= PollEventPoll::EVENT_HUP; + } + return newEvents; +} + +bool PollEventPoll::addEvents +(sock_t socket, const PollEventPoll::Event& event) +{ + SharedHandle socketEntry(new SocketEntry(socket)); + std::deque >::iterator i = + std::lower_bound(_socketEntries.begin(), _socketEntries.end(), socketEntry); + if(i != _socketEntries.end() && (*i) == socketEntry) { + event.addSelf(*i); + for(struct pollfd* first = _pollfds, *last = _pollfds+_pollfdNum; + first != last; ++first) { + if(first->fd == socket) { + *first = (*i)->getPollEvent(); + break; + } + } + } else { + _socketEntries.insert(i, socketEntry); + event.addSelf(socketEntry); + if(_pollfdCapacity == _pollfdNum) { + _pollfdCapacity *= 2; + struct pollfd* newPollfds = new struct pollfd[_pollfdCapacity]; + memcpy(newPollfds, _pollfds, _pollfdNum*sizeof(struct pollfd)); + delete [] _pollfds; + _pollfds = newPollfds; + } + _pollfds[_pollfdNum] = socketEntry->getPollEvent(); + ++_pollfdNum; + } + return true; +} + +bool PollEventPoll::addEvents +(sock_t socket, Command* command, EventPoll::EventType events) +{ + int pollEvents = translateEvents(events); + return addEvents(socket, CommandEvent(command, pollEvents)); +} + +#ifdef ENABLE_ASYNC_DNS +bool PollEventPoll::addEvents +(sock_t socket, Command* command, int events, + const SharedHandle& rs) +{ + return addEvents(socket, ADNSEvent(rs, command, socket, events)); +} +#endif // ENABLE_ASYNC_DNS + +bool PollEventPoll::deleteEvents +(sock_t socket, const PollEventPoll::Event& event) +{ + SharedHandle socketEntry(new SocketEntry(socket)); + std::deque >::iterator i = + std::lower_bound(_socketEntries.begin(), _socketEntries.end(), socketEntry); + if(i != _socketEntries.end() && (*i) == socketEntry) { + event.removeSelf(*i); + for(struct pollfd* first = _pollfds, *last = _pollfds+_pollfdNum; + first != last; ++first) { + if(first->fd == socket) { + if((*i)->eventEmpty()) { + if(_pollfdNum >= 2) { + *first = *(last-1); + } + --_pollfdNum; + _socketEntries.erase(i); + } else { + *first = (*i)->getPollEvent(); + } + break; + } + } + return true; + } else { + if(_logger->debug()) { + _logger->debug("Socket %d is not found in SocketEntries.", socket); + } + return false; + } +} + +#ifdef ENABLE_ASYNC_DNS +bool PollEventPoll::deleteEvents +(sock_t socket, Command* command, const SharedHandle& rs) +{ + return deleteEvents(socket, ADNSEvent(rs, command, socket, 0)); +} +#endif // ENABLE_ASYNC_DNS + +bool PollEventPoll::deleteEvents +(sock_t socket, Command* command, EventPoll::EventType events) +{ + int pollEvents = translateEvents(events); + return deleteEvents(socket, CommandEvent(command, pollEvents)); +} + +#ifdef ENABLE_ASYNC_DNS +bool PollEventPoll::addNameResolver +(const SharedHandle& resolver, Command* command) +{ + SharedHandle entry + (new AsyncNameResolverEntry(resolver, command)); + std::deque >::iterator itr = + std::find(_nameResolverEntries.begin(), _nameResolverEntries.end(), entry); + if(itr == _nameResolverEntries.end()) { + _nameResolverEntries.push_back(entry); + entry->addSocketEvents(this); + return true; + } else { + return false; + } +} + +bool PollEventPoll::deleteNameResolver +(const SharedHandle& resolver, Command* command) +{ + SharedHandle entry + (new AsyncNameResolverEntry(resolver, command)); + std::deque >::iterator itr = + std::find(_nameResolverEntries.begin(), _nameResolverEntries.end(), entry); + if(itr == _nameResolverEntries.end()) { + return false; + } else { + (*itr)->removeSocketEvents(this); + _nameResolverEntries.erase(itr); + return true; + } +} +#endif // ENABLE_ASYNC_DNS + +} // namespace aria2 diff --git a/src/PollEventPoll.h b/src/PollEventPoll.h new file mode 100644 index 00000000..653d0510 --- /dev/null +++ b/src/PollEventPoll.h @@ -0,0 +1,289 @@ +/* */ +#ifndef _D_POLL_EVENT_POLL_H_ +#define _D_POLL_EVENT_POLL_H_ + +#include "EventPoll.h" + +# include + +#include + +#ifdef ENABLE_ASYNC_DNS +# include "AsyncNameResolver.h" +#endif // ENABLE_ASYNC_DNS + +namespace aria2 { + +class Logger; + +class PollEventPoll : public EventPoll { + +private: + + class SocketEntry; + + class Event { + public: + virtual ~Event() {} + + virtual void processEvents(int events) = 0; + + virtual int getEvents() const = 0; + + virtual void addSelf(const SharedHandle& socketEntry) const =0; + + virtual void removeSelf + (const SharedHandle& socketEntry) const =0; + }; + + friend int accumulateEvent(int events, const Event& event); + + class CommandEvent : public Event { + private: + Command* _command; + int _events; + public: + CommandEvent(Command* command, int events); + + Command* getCommand() const + { + return _command; + } + + void addEvents(int events) + { + _events |= events; + } + + void removeEvents(int events) + { + _events &= (~events); + } + + bool eventsEmpty() const + { + return _events == 0; + } + + bool operator==(const CommandEvent& commandEvent) const + { + return _command == commandEvent._command; + } + + virtual int getEvents() const; + + virtual void processEvents(int events); + + virtual void addSelf(const SharedHandle& socketEntry) const; + + virtual void removeSelf(const SharedHandle& socketEntry) const; + }; + +#ifdef ENABLE_ASYNC_DNS + + class ADNSEvent : public Event { + private: + SharedHandle _resolver; + Command* _command; + sock_t _socket; + int _events; + public: + ADNSEvent(const SharedHandle& resolver, Command* command, + sock_t socket, int events); + + bool operator==(const ADNSEvent& event) const + { + return _resolver == event._resolver; + } + + virtual int getEvents() const; + + virtual void processEvents(int events); + + virtual void addSelf(const SharedHandle& socketEntry) const; + + virtual void removeSelf(const SharedHandle& socketEntry) const; + }; + +#endif // ENABLE_ASYNC_DNS + + class SocketEntry { + private: + sock_t _socket; + + std::deque _commandEvents; + +#ifdef ENABLE_ASYNC_DNS + + std::deque _adnsEvents; + +#endif // ENABLE_ASYNC_DNS + + struct pollfd _pollEvent; + + public: + SocketEntry(sock_t socket); + + bool operator==(const SocketEntry& entry) const + { + return _socket == entry._socket; + } + + bool operator<(const SocketEntry& entry) const + { + return _socket < entry._socket; + } + + void addCommandEvent(const CommandEvent& cev); + + void removeCommandEvent(const CommandEvent& cev); + +#ifdef ENABLE_ASYNC_DNS + + void addADNSEvent(const ADNSEvent& aev); + + void removeADNSEvent(const ADNSEvent& aev); + +#endif // ENABLE_ASYNC_DNS + + struct pollfd& getPollEvent(); + + sock_t getSocket() const + { + return _socket; + } + + void setSocket(sock_t socket) + { + _socket = socket; + } + + bool eventEmpty() const; + + void processEvents(int events); + }; + +#ifdef ENABLE_ASYNC_DNS + + class AsyncNameResolverEntry { + private: + SharedHandle _nameResolver; + + Command* _command; + + size_t _socketsSize; + + sock_t _sockets[ARES_GETSOCK_MAXNUM]; + + public: + AsyncNameResolverEntry(const SharedHandle& nameResolver, + Command* command); + + bool operator==(const AsyncNameResolverEntry& entry) + { + return _nameResolver == entry._nameResolver && + _command == entry._command; + } + + void addSocketEvents(PollEventPoll* socketPoll); + + void removeSocketEvents(PollEventPoll* socketPoll); + + // Calls AsyncNameResolver::process(ARES_SOCKET_BAD, + // ARES_SOCKET_BAD). + void processTimeout(); + }; + +#endif // ENABLE_ASYNC_DNS +private: + enum PollEventType { + EVENT_READ = POLLIN, + EVENT_WRITE = POLLOUT, + EVENT_ERROR = POLLERR, + EVENT_HUP = POLLHUP, + }; + + std::deque > _socketEntries; +#ifdef ENABLE_ASYNC_DNS + std::deque > _nameResolverEntries; +#endif // ENABLE_ASYNC_DNS + + // Allocated the number of struct pollfd in _pollfds. + int _pollfdCapacity; + + // The number of valid struct pollfd in _pollfds. + int _pollfdNum; + + struct pollfd* _pollfds; + + Logger* _logger; + + bool addEvents(sock_t socket, const Event& event); + + bool deleteEvents(sock_t socket, const Event& event); + + bool addEvents(sock_t socket, Command* command, int events, + const SharedHandle& rs); + + bool deleteEvents(sock_t socket, Command* command, + const SharedHandle& rs); + + static int translateEvents(EventPoll::EventType events); +public: + PollEventPoll(); + + virtual ~PollEventPoll(); + + virtual void poll(const struct timeval& tv); + + virtual bool addEvents(sock_t socket, + Command* command, EventPoll::EventType events); + + virtual bool deleteEvents(sock_t socket, + Command* command, EventPoll::EventType events); +#ifdef ENABLE_ASYNC_DNS + + virtual bool addNameResolver(const SharedHandle& resolver, + Command* command); + virtual bool deleteNameResolver + (const SharedHandle& resolver, Command* command); +#endif // ENABLE_ASYNC_DNS + +}; + +} // namespace aria2 + +#endif // _D_POLL_EVENT_POLL_H_ diff --git a/src/SocketCore.cc b/src/SocketCore.cc index 0347c39d..58668cae 100644 --- a/src/SocketCore.cc +++ b/src/SocketCore.cc @@ -596,7 +596,8 @@ bool SocketCore::isWritable(time_t timeout) } struct epoll_event epEvents[1]; int r; - while((r = epoll_wait(_epfd, epEvents, 1, 0)) == -1 && errno == EINTR); + while((r = epoll_wait(_epfd, epEvents, 1, timeout*1000)) == -1 && + errno == EINTR); if(r > 0) { return epEvents[0].events&(EPOLLOUT|EPOLLHUP|EPOLLERR); } else if(r == 0) { @@ -606,31 +607,49 @@ bool SocketCore::isWritable(time_t timeout) } } else #endif // HAVE_EPOLL - if(_pollMethod == SocketCore::POLL_METHOD_SELECT) { - fd_set fds; - FD_ZERO(&fds); - FD_SET(sockfd, &fds); - - struct timeval tv; - tv.tv_sec = timeout; - tv.tv_usec = 0; - - int r = select(sockfd+1, NULL, &fds, NULL, &tv); - if(r == 1) { - return true; +#ifdef HAVE_POLL + if(_pollMethod == SocketCore::POLL_METHOD_POLL) { + struct pollfd p; + p.fd = sockfd; + p.events = POLLOUT; + int r; + while((r = poll(&p, 1, timeout*1000)) == -1 && errno == EINTR); + if(r > 0) { + return p.revents&(POLLOUT|POLLHUP|POLLERR); } else if(r == 0) { - // time out return false; } else { - if(SOCKET_ERRNO == A2_EINPROGRESS || SOCKET_ERRNO == A2_EINTR) { + throw DL_RETRY_EX + (StringFormat(EX_SOCKET_CHECK_WRITABLE, errorMsg()).str()); + } + } else +#endif // HAVE_POLL + if(_pollMethod == SocketCore::POLL_METHOD_SELECT) { + fd_set fds; + FD_ZERO(&fds); + FD_SET(sockfd, &fds); + + struct timeval tv; + tv.tv_sec = timeout; + tv.tv_usec = 0; + + int r = select(sockfd+1, NULL, &fds, NULL, &tv); + if(r == 1) { + return true; + } else if(r == 0) { + // time out return false; } else { - throw DL_RETRY_EX(StringFormat(EX_SOCKET_CHECK_WRITABLE, errorMsg()).str()); + if(SOCKET_ERRNO == A2_EINPROGRESS || SOCKET_ERRNO == A2_EINTR) { + return false; + } else { + throw DL_RETRY_EX + (StringFormat(EX_SOCKET_CHECK_WRITABLE, errorMsg()).str()); + } } + } else { + abort(); } - } else { - abort(); - } } bool SocketCore::isReadable(time_t timeout) @@ -648,7 +667,8 @@ bool SocketCore::isReadable(time_t timeout) } struct epoll_event epEvents[1]; int r; - while((r = epoll_wait(_epfd, epEvents, 1, 0)) == -1 && errno == EINTR); + while((r = epoll_wait(_epfd, epEvents, 1, timeout*1000)) == -1 && + errno == EINTR); if(r > 0) { return epEvents[0].events&(EPOLLIN|EPOLLHUP|EPOLLERR); @@ -659,31 +679,49 @@ bool SocketCore::isReadable(time_t timeout) } } else #endif // HAVE_EPOLL - if(_pollMethod == SocketCore::POLL_METHOD_SELECT) { - fd_set fds; - FD_ZERO(&fds); - FD_SET(sockfd, &fds); - - struct timeval tv; - tv.tv_sec = timeout; - tv.tv_usec = 0; - - int r = select(sockfd+1, &fds, NULL, NULL, &tv); - if(r == 1) { - return true; +#ifdef HAVE_POLL + if(_pollMethod == SocketCore::POLL_METHOD_POLL) { + struct pollfd p; + p.fd = sockfd; + p.events = POLLIN; + int r; + while((r = poll(&p, 1, timeout*1000)) == -1 && errno == EINTR); + if(r > 0) { + return p.revents&(POLLIN|POLLHUP|POLLERR); } else if(r == 0) { - // time out return false; } else { - if(SOCKET_ERRNO == A2_EINPROGRESS || SOCKET_ERRNO == A2_EINTR) { + throw DL_RETRY_EX + (StringFormat(EX_SOCKET_CHECK_READABLE, errorMsg()).str()); + } + } else +#endif // HAVE_POLL + if(_pollMethod == SocketCore::POLL_METHOD_SELECT) { + fd_set fds; + FD_ZERO(&fds); + FD_SET(sockfd, &fds); + + struct timeval tv; + tv.tv_sec = timeout; + tv.tv_usec = 0; + + int r = select(sockfd+1, &fds, NULL, NULL, &tv); + if(r == 1) { + return true; + } else if(r == 0) { + // time out return false; } else { - throw DL_RETRY_EX(StringFormat(EX_SOCKET_CHECK_READABLE, errorMsg()).str()); + if(SOCKET_ERRNO == A2_EINPROGRESS || SOCKET_ERRNO == A2_EINTR) { + return false; + } else { + throw DL_RETRY_EX + (StringFormat(EX_SOCKET_CHECK_READABLE, errorMsg()).str()); + } } + } else { + abort(); } - } else { - abort(); - } } #ifdef HAVE_LIBSSL @@ -1236,6 +1274,13 @@ void SocketCore::useEpoll() } #endif // HAVE_EPOLL +#ifdef HAVE_POLL +void SocketCore::usePoll() +{ + _pollMethod = SocketCore::POLL_METHOD_POLL; +} +#endif // HAVE_POLL + void SocketCore::useSelect() { _pollMethod = SocketCore::POLL_METHOD_SELECT; diff --git a/src/SocketCore.h b/src/SocketCore.h index dc183e8c..c864204b 100644 --- a/src/SocketCore.h +++ b/src/SocketCore.h @@ -88,7 +88,9 @@ private: #endif // HAVE_EPOLL enum PollMethod { - POLL_METHOD_EPOLL, POLL_METHOD_SELECT + POLL_METHOD_EPOLL, + POLL_METHOD_POLL, + POLL_METHOD_SELECT }; static PollMethod _pollMethod; @@ -360,6 +362,9 @@ public: #ifdef HAVE_EPOLL static void useEpoll(); #endif // HAVE_EPOLL +#ifdef HAVE_POLL + static void usePoll(); +#endif // HAVE_POLL static void useSelect(); #ifdef ENABLE_SSL diff --git a/src/a2io.h b/src/a2io.h index 11b2398c..ac0ca4d9 100644 --- a/src/a2io.h +++ b/src/a2io.h @@ -40,7 +40,9 @@ #include #include #include - +#ifdef HAVE_POLL_H +# include +#endif // HAVE_POLL_H #ifdef HAVE_IO_H # include #endif // HAVE_IO_H diff --git a/src/main.cc b/src/main.cc index 242dcaa5..945c2e4f 100644 --- a/src/main.cc +++ b/src/main.cc @@ -191,9 +191,14 @@ downloadresultcode::RESULT main(int argc, char* argv[]) SocketCore::useEpoll(); } else #endif // HAVE_EPOLL - if(op->get(PREF_EVENT_POLL) == V_SELECT) { - SocketCore::useSelect(); - } +#ifdef HAVE_POLL + if(op->get(PREF_EVENT_POLL) == V_POLL) { + SocketCore::usePoll(); + } else +#endif // HAVE_POLL + if(op->get(PREF_EVENT_POLL) == V_SELECT) { + SocketCore::useSelect(); + } downloadresultcode::RESULT exitStatus = downloadresultcode::FINISHED; Logger* logger = LogFactory::getInstance(); diff --git a/src/prefs.cc b/src/prefs.cc index a0723de5..cb7f9395 100644 --- a/src/prefs.cc +++ b/src/prefs.cc @@ -150,6 +150,7 @@ const std::string PREF_MAX_FILE_NOT_FOUND("max-file-not-found"); // value: epoll | select const std::string PREF_EVENT_POLL("event-poll"); const std::string V_EPOLL("epoll"); +const std::string V_POLL("poll"); const std::string V_SELECT("select"); // value: 1*digit const std::string PREF_XML_RPC_LISTEN_PORT("xml-rpc-listen-port"); diff --git a/src/prefs.h b/src/prefs.h index 748d8b0a..ac5d584e 100644 --- a/src/prefs.h +++ b/src/prefs.h @@ -154,6 +154,7 @@ extern const std::string PREF_MAX_FILE_NOT_FOUND; // value: epoll | select extern const std::string PREF_EVENT_POLL; extern const std::string V_EPOLL; +extern const std::string V_POLL; extern const std::string V_SELECT; // value: 1*digit extern const std::string PREF_XML_RPC_LISTEN_PORT;