From 9cf05b71154ed8f999344856ee07b2549abc6716 Mon Sep 17 00:00:00 2001 From: Tatsuhiro Tsujikawa Date: Wed, 21 Apr 2010 14:31:44 +0000 Subject: [PATCH] 2010-04-21 Tatsuhiro Tsujikawa Added opensolaris port_associate() support. * src/DownloadEngineFactory.cc * src/Makefile.am * src/OptionHandlerFactory.cc * src/PortEventPoll.cc * src/PortEventPoll.h * src/SocketCore.cc * src/SocketCore.h * src/configure.ac * src/main.cc * src/prefs.cc * src/prefs.h --- ChangeLog | 15 ++ config.h.in | 6 + configure | 27 ++++ configure.ac | 4 + src/DownloadEngineFactory.cc | 31 ++-- src/Makefile.am | 4 + src/Makefile.in | 15 +- src/OptionHandlerFactory.cc | 7 +- src/PortEventPoll.cc | 297 +++++++++++++++++++++++++++++++++++ src/PortEventPoll.h | 136 ++++++++++++++++ src/SocketCore.cc | 196 +++++++++++++++-------- src/SocketCore.h | 13 +- src/main.cc | 20 ++- src/prefs.cc | 1 + src/prefs.h | 1 + 15 files changed, 681 insertions(+), 92 deletions(-) create mode 100644 src/PortEventPoll.cc create mode 100644 src/PortEventPoll.h diff --git a/ChangeLog b/ChangeLog index 082eb5a4..ae842c53 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,3 +1,18 @@ +2010-04-21 Tatsuhiro Tsujikawa + + Added opensolaris port_associate() support. + * src/DownloadEngineFactory.cc + * src/Makefile.am + * src/OptionHandlerFactory.cc + * src/PortEventPoll.cc + * src/PortEventPoll.h + * src/SocketCore.cc + * src/SocketCore.h + * src/configure.ac + * src/main.cc + * src/prefs.cc + * src/prefs.h + 2010-04-21 Tatsuhiro Tsujikawa Fixed the bug that _e is passed where e should be passed. diff --git a/config.h.in b/config.h.in index fe6e30ef..82492948 100644 --- a/config.h.in +++ b/config.h.in @@ -311,6 +311,12 @@ /* Define to 1 if you have the header file. */ #undef HAVE_POLL_H +/* Define to 1 if you have the `port_associate' function. */ +#undef HAVE_PORT_ASSOCIATE + +/* Define to 1 if you have the header file. */ +#undef HAVE_PORT_H + /* Define to 1 if you have the `posix_fallocate' function. */ #undef HAVE_POSIX_FALLOCATE diff --git a/configure b/configure index afd2b23c..10b5591b 100755 --- a/configure +++ b/configure @@ -600,6 +600,8 @@ ac_func_list= ac_subst_vars='am__EXEEXT_FALSE am__EXEEXT_TRUE LTLIBOBJS +HAVE_PORT_ASSOCIATE_FALSE +HAVE_PORT_ASSOCIATE_TRUE HAVE_TIMEGETTIME_FALSE HAVE_TIMEGETTIME_TRUE HAVE_POLL_FALSE @@ -7956,6 +7958,7 @@ for ac_header in argz.h \ netdb.h \ netinet/in.h \ poll.h \ + port.h \ stddef.h \ stdint.h \ stdio_ext.h \ @@ -15148,6 +15151,26 @@ fi ;; esac +for ac_func in port_associate +do : + ac_fn_cxx_check_func "$LINENO" "port_associate" "ac_cv_func_port_associate" +if test "x$ac_cv_func_port_associate" = x""yes; then : + cat >>confdefs.h <<_ACEOF +#define HAVE_PORT_ASSOCIATE 1 +_ACEOF + have_port_associate=yes +fi +done + + if test "x$have_port_associate" = "xyes"; then + HAVE_PORT_ASSOCIATE_TRUE= + HAVE_PORT_ASSOCIATE_FALSE='#' +else + HAVE_PORT_ASSOCIATE_TRUE='#' + HAVE_PORT_ASSOCIATE_FALSE= +fi + + ac_fn_cxx_check_member "$LINENO" "struct sockaddr_in" "sin_len" "ac_cv_member_struct_sockaddr_in_sin_len" "#include " if test "x$ac_cv_member_struct_sockaddr_in_sin_len" = x""yes; then : @@ -15498,6 +15521,10 @@ if test -z "${HAVE_TIMEGETTIME_TRUE}" && test -z "${HAVE_TIMEGETTIME_FALSE}"; th as_fn_error "conditional \"HAVE_TIMEGETTIME\" was never defined. Usually this means the macro was only invoked conditionally." "$LINENO" 5 fi +if test -z "${HAVE_PORT_ASSOCIATE_TRUE}" && test -z "${HAVE_PORT_ASSOCIATE_FALSE}"; then + as_fn_error "conditional \"HAVE_PORT_ASSOCIATE\" was never defined. +Usually this means the macro was only invoked conditionally." "$LINENO" 5 +fi : ${CONFIG_STATUS=./config.status} ac_write_fail=0 diff --git a/configure.ac b/configure.ac index c01f4a75..e564100b 100644 --- a/configure.ac +++ b/configure.ac @@ -249,6 +249,7 @@ AC_CHECK_HEADERS([argz.h \ netdb.h \ netinet/in.h \ poll.h \ + port.h \ stddef.h \ stdint.h \ stdio_ext.h \ @@ -413,6 +414,9 @@ case "$target" in ;; esac +AC_CHECK_FUNCS([port_associate], [have_port_associate=yes]) +AM_CONDITIONAL([HAVE_PORT_ASSOCIATE], [test "x$have_port_associate" = "xyes"]) + AC_CHECK_MEMBER([struct sockaddr_in.sin_len], [AC_DEFINE([HAVE_SOCKADDR_IN_SIN_LEN],[1], [Define to 1 if struct sockaddr_in has sin_len member.])], diff --git a/src/DownloadEngineFactory.cc b/src/DownloadEngineFactory.cc index 788cf270..34e7f6b8 100644 --- a/src/DownloadEngineFactory.cc +++ b/src/DownloadEngineFactory.cc @@ -61,6 +61,9 @@ #ifdef HAVE_EPOLL # include "EpollEventPoll.h" #endif // HAVE_EPOLL +#ifdef HAVE_PORT_ASSOCIATE +# include "PortEventPoll.h" +#endif // HAVE_PORT_ASSOCIATE #include "PollEventPoll.h" #include "SelectEventPoll.h" #include "DlAbortEx.h" @@ -81,8 +84,9 @@ DownloadEngineFactory::newDownloadEngine const size_t MAX_CONCURRENT_DOWNLOADS = op->getAsInt(PREF_MAX_CONCURRENT_DOWNLOADS); SharedHandle eventPoll; + const std::string& pollMethod = op->get(PREF_EVENT_POLL); #ifdef HAVE_EPOLL - if(op->get(PREF_EVENT_POLL) == V_EPOLL) { + if(pollMethod == V_EPOLL) { SharedHandle ep(new EpollEventPoll()); if(ep->good()) { eventPoll = ep; @@ -92,16 +96,25 @@ DownloadEngineFactory::newDownloadEngine } } else #endif // HAVE_EPLL -#ifdef HAVE_POLL - if(op->get(PREF_EVENT_POLL) == V_POLL) { - eventPoll.reset(new PollEventPoll()); - } else -#endif // HAVE_POLL - if(op->get(PREF_EVENT_POLL) == V_SELECT) { - eventPoll.reset(new SelectEventPoll()); + if(pollMethod == V_PORT) { + SharedHandle pp(new PortEventPoll()); + if(pp->good()) { + eventPoll = pp; } else { - abort(); + throw DL_ABORT_EX("Initializing PortEventPoll failed." + " Try --event-poll=select"); } + } else +#ifdef HAVE_POLL + if(pollMethod == V_POLL) { + eventPoll.reset(new PollEventPoll()); + } else +#endif // HAVE_POLL + if(pollMethod == V_SELECT) { + eventPoll.reset(new SelectEventPoll()); + } else { + abort(); + } DownloadEngineHandle e(new DownloadEngine(eventPoll)); e->option = op; diff --git a/src/Makefile.am b/src/Makefile.am index 404758b2..7bf45cfb 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -530,6 +530,10 @@ if HAVE_POLL SRCS += PollEventPoll.cc PollEventPoll.h endif # HAVE_POLL +if HAVE_PORT_ASSOCIATE +SRCS += PortEventPoll.cc PortEventPoll.h +endif # HAVE_PORT_ASSOCIATE + noinst_LIBRARIES = libaria2c.a libaria2c_a_SOURCES = $(SRCS) aria2c_LDADD = libaria2c.a @LIBINTL@ @ALLOCA@ @LIBGNUTLS_LIBS@\ diff --git a/src/Makefile.in b/src/Makefile.in index d62acd7f..e4e69150 100644 --- a/src/Makefile.in +++ b/src/Makefile.in @@ -280,6 +280,7 @@ bin_PROGRAMS = aria2c$(EXEEXT) @HAVE_TIMEGETTIME_TRUE@am__append_27 = clock_gettime_mingw.cc clock_gettime_mingw.h @HAVE_MACH_ABSOLUTE_TIME_TRUE@am__append_28 = clock_gettime_osx.cc clock_gettime_osx.h @HAVE_POLL_TRUE@am__append_29 = PollEventPoll.cc PollEventPoll.h +@HAVE_PORT_ASSOCIATE_TRUE@am__append_30 = PortEventPoll.cc PortEventPoll.h subdir = src DIST_COMMON = $(srcdir)/Makefile.am $(srcdir)/Makefile.in alloca.c ACLOCAL_M4 = $(top_srcdir)/aclocal.m4 @@ -607,7 +608,8 @@ am__libaria2c_a_SOURCES_DIST = Socket.h SocketCore.cc SocketCore.h \ inet_aton.h localtime_r.c localtime_r.h strptime.c strptime.h \ timegm.c timegm.h daemon.cc daemon.h clock_gettime_mingw.cc \ clock_gettime_mingw.h clock_gettime_osx.cc clock_gettime_osx.h \ - PollEventPoll.cc PollEventPoll.h + PollEventPoll.cc PollEventPoll.h PortEventPoll.cc \ + PortEventPoll.h @ENABLE_XML_RPC_TRUE@am__objects_1 = \ @ENABLE_XML_RPC_TRUE@ XmlRpcRequestParserController.$(OBJEXT) \ @ENABLE_XML_RPC_TRUE@ XmlRpcRequestParserStateMachine.$(OBJEXT) \ @@ -789,7 +791,8 @@ am__objects_6 = @HAVE_MACH_ABSOLUTE_TIME_TRUE@am__objects_28 = \ @HAVE_MACH_ABSOLUTE_TIME_TRUE@ clock_gettime_osx.$(OBJEXT) @HAVE_POLL_TRUE@am__objects_29 = PollEventPoll.$(OBJEXT) -am__objects_30 = SocketCore.$(OBJEXT) Command.$(OBJEXT) \ +@HAVE_PORT_ASSOCIATE_TRUE@am__objects_30 = PortEventPoll.$(OBJEXT) +am__objects_31 = SocketCore.$(OBJEXT) Command.$(OBJEXT) \ AbstractCommand.$(OBJEXT) \ InitiateConnectionCommandFactory.$(OBJEXT) \ DownloadCommand.$(OBJEXT) \ @@ -876,8 +879,9 @@ am__objects_30 = SocketCore.$(OBJEXT) Command.$(OBJEXT) \ $(am__objects_18) $(am__objects_19) $(am__objects_20) \ $(am__objects_21) $(am__objects_22) $(am__objects_23) \ $(am__objects_24) $(am__objects_25) $(am__objects_26) \ - $(am__objects_27) $(am__objects_28) $(am__objects_29) -am_libaria2c_a_OBJECTS = $(am__objects_30) + $(am__objects_27) $(am__objects_28) $(am__objects_29) \ + $(am__objects_30) +am_libaria2c_a_OBJECTS = $(am__objects_31) libaria2c_a_OBJECTS = $(am_libaria2c_a_OBJECTS) am__installdirs = "$(DESTDIR)$(bindir)" PROGRAMS = $(bin_PROGRAMS) @@ -1213,7 +1217,7 @@ SRCS = Socket.h SocketCore.cc SocketCore.h BinaryStream.h Command.cc \ $(am__append_19) $(am__append_20) $(am__append_21) \ $(am__append_22) $(am__append_23) $(am__append_24) \ $(am__append_25) $(am__append_26) $(am__append_27) \ - $(am__append_28) $(am__append_29) + $(am__append_28) $(am__append_29) $(am__append_30) noinst_LIBRARIES = libaria2c.a libaria2c_a_SOURCES = $(SRCS) aria2c_LDADD = libaria2c.a @LIBINTL@ @ALLOCA@ @LIBGNUTLS_LIBS@\ @@ -1553,6 +1557,7 @@ distclean-compile: @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/PiecedSegment.Po@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/Platform.Po@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/PollEventPoll.Po@am__quote@ +@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/PortEventPoll.Po@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/PriorityPieceSelector.Po@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/ProtocolDetector.Po@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/RangeBtMessage.Po@am__quote@ diff --git a/src/OptionHandlerFactory.cc b/src/OptionHandlerFactory.cc index 94d64798..6187056b 100644 --- a/src/OptionHandlerFactory.cc +++ b/src/OptionHandlerFactory.cc @@ -210,6 +210,9 @@ OptionHandlers OptionHandlerFactory::createOptionHandlers() #ifdef HAVE_EPOLL V_EPOLL, #endif // HAVE_EPOLL +#ifdef HAVE_PORT_ASSOCIATE + V_PORT, +#endif // HAVE_PORT_ASSOCIATE #ifdef HAVE_POLL V_POLL, #endif // HAVE_POLL @@ -220,7 +223,9 @@ OptionHandlers OptionHandlerFactory::createOptionHandlers() TEXT_EVENT_POLL, #ifdef HAVE_EPOLL V_EPOLL, -#else // !HAVE_EPOLL +#elif HAVE_PORT_ASSOCIATE + V_PORT, +#else V_SELECT, #endif // !HAVE_EPOLL std::vector diff --git a/src/PortEventPoll.cc b/src/PortEventPoll.cc new file mode 100644 index 00000000..766ee4ab --- /dev/null +++ b/src/PortEventPoll.cc @@ -0,0 +1,297 @@ +/* */ +#include "PortEventPoll.h" + +#include +#include +#include + +#include "Command.h" +#include "LogFactory.h" +#include "Logger.h" + +namespace aria2 { + +PortEventPoll::KSocketEntry::KSocketEntry(sock_t s): + SocketEntry(s) {} + +int accumulateEvent(int events, const PortEventPoll::KEvent& event) +{ + return events|event.getEvents(); +} + +PortEventPoll::A2PortEvent PortEventPoll::KSocketEntry::getEvents() +{ + A2PortEvent portEvent; + portEvent.socketEntry = this; +#ifdef ENABLE_ASYNC_DNS + portEvent.events = + std::accumulate(_adnsEvents.begin(), + _adnsEvents.end(), + std::accumulate(_commandEvents.begin(), + _commandEvents.end(), 0, accumulateEvent), + accumulateEvent); +#else // !ENABLE_ASYNC_DNS + portEvent.events = + std::accumulate(_commandEvents.begin(), _commandEvents.end(), 0, + accumulateEvent); + +#endif // !ENABLE_ASYNC_DNS + return portEvent; +} + +PortEventPoll::PortEventPoll(): + _portEventsSize(PORT_EVENTS_SIZE), + _portEvents(new port_event_t[_portEventsSize]), + _logger(LogFactory::getInstance()) +{ + _port = port_create(); +} + +PortEventPoll::~PortEventPoll() +{ + if(_port != -1) { + int r; + while((r = close(_port)) == -1 && errno == EINTR); + if(r == -1) { + _logger->error("Error occurred while closing port %d: %s", + _port, strerror(errno)); + } + } + delete [] _portEvents; +} + +bool PortEventPoll::good() const +{ + return _port != -1; +} + +void PortEventPoll::poll(const struct timeval& tv) +{ + struct timespec timeout = { tv.tv_sec, tv.tv_usec*1000 }; + int res; + uint_t nget = 1; + // If port_getn was interrupted by signal, it can consume events but + // not updat nget!. For this very annoying bug, we have to check + // actually event is filled or not. + _portEvents[0].portev_user = (void*)-1; + res = port_getn(_port, _portEvents, _portEventsSize, &nget, &timeout); + if(res == 0 || + (res == -1 && (errno == ETIME || errno == EINTR) && + _portEvents[0].portev_user != (void*)-1)) { + if(_logger->debug()) { + _logger->debug("nget=%u", nget); + } + for(uint_t i = 0; i < nget; ++i) { + const port_event_t& pev = _portEvents[i]; + KSocketEntry* p = reinterpret_cast(pev.portev_user); + p->processEvents(pev.portev_events); + int r = port_associate(_port, PORT_SOURCE_FD, pev.portev_object, + p->getEvents().events, p); + if(r == -1) { + _logger->error("port_associate failed for file descriptor %d: cause %s", + pev.portev_object, strerror(errno)); + } + } + } +#ifdef ENABLE_ASYNC_DNS + // It turns out that we have to call ares_process_fd before ares's + // own timeout and ares may create new sockets or closes socket in + // their API. So we call ares_process_fd for all ares_channel and + // re-register their sockets. + for(std::deque >::iterator i = + _nameResolverEntries.begin(), eoi = _nameResolverEntries.end(); + i != eoi; ++i) { + (*i)->processTimeout(); + (*i)->removeSocketEvents(this); + (*i)->addSocketEvents(this); + } +#endif // ENABLE_ASYNC_DNS + + // TODO timeout of name resolver is determined in Command(AbstractCommand, + // DHTEntryPoint...Command) +} + +static int translateEvents(EventPoll::EventType events) +{ + int newEvents = 0; + if(EventPoll::EVENT_READ&events) { + newEvents |= PortEventPoll::IEV_READ; + } + if(EventPoll::EVENT_WRITE&events) { + newEvents |= PortEventPoll::IEV_WRITE; + } + if(EventPoll::EVENT_ERROR&events) { + newEvents |= PortEventPoll::IEV_ERROR; + } + if(EventPoll::EVENT_HUP&events) { + newEvents |= PortEventPoll::IEV_HUP; + } + return newEvents; +} + +bool PortEventPoll::addEvents(sock_t socket, + const PortEventPoll::KEvent& event) +{ + SharedHandle socketEntry(new KSocketEntry(socket)); + std::deque >::iterator i = + std::lower_bound(_socketEntries.begin(), _socketEntries.end(), socketEntry); + int r = 0; + if(i != _socketEntries.end() && (*i) == socketEntry) { + event.addSelf(*i); + A2PortEvent pv = (*i)->getEvents(); + r = port_associate(_port, PORT_SOURCE_FD, (*i)->getSocket(), + pv.events, pv.socketEntry); + } else { + _socketEntries.insert(i, socketEntry); + if(_socketEntries.size() > _portEventsSize) { + _portEventsSize *= 2; + delete [] _portEvents; + _portEvents = new port_event_t[_portEventsSize]; + } + event.addSelf(socketEntry); + A2PortEvent pv = socketEntry->getEvents(); + r = port_associate(_port, PORT_SOURCE_FD, socketEntry->getSocket(), + pv.events, pv.socketEntry); + } + if(r == -1) { + if(_logger->debug()) { + _logger->debug("Failed to add socket event %d:%s", + socket, strerror(errno)); + } + return false; + } else { + return true; + } +} + +bool PortEventPoll::addEvents(sock_t socket, Command* command, + EventPoll::EventType events) +{ + int portEvents = translateEvents(events); + return addEvents(socket, KCommandEvent(command, portEvents)); +} + +#ifdef ENABLE_ASYNC_DNS +bool PortEventPoll::addEvents(sock_t socket, Command* command, int events, + const SharedHandle& rs) +{ + return addEvents(socket, KADNSEvent(rs, command, socket, events)); +} +#endif // ENABLE_ASYNC_DNS + +bool PortEventPoll::deleteEvents(sock_t socket, + const PortEventPoll::KEvent& event) +{ + SharedHandle socketEntry(new KSocketEntry(socket)); + std::deque >::iterator i = + std::lower_bound(_socketEntries.begin(), _socketEntries.end(), socketEntry); + if(i != _socketEntries.end() && (*i) == socketEntry) { + event.removeSelf(*i); + int r = 0; + if((*i)->eventEmpty()) { + r = port_dissociate(_port, PORT_SOURCE_FD, (*i)->getSocket()); + _socketEntries.erase(i); + } else { + A2PortEvent pv = (*i)->getEvents(); + r = port_associate(_port, PORT_SOURCE_FD, (*i)->getSocket(), + pv.events, pv.socketEntry); + } + if(r == -1) { + if(_logger->debug()) { + _logger->debug("Failed to delete socket event:%s", strerror(errno)); + } + return false; + } else { + return true; + } + } else { + if(_logger->debug()) { + _logger->debug("Socket %d is not found in SocketEntries.", socket); + } + return false; + } +} + +#ifdef ENABLE_ASYNC_DNS +bool PortEventPoll::deleteEvents(sock_t socket, Command* command, + const SharedHandle& rs) +{ + return deleteEvents(socket, KADNSEvent(rs, command, socket, 0)); +} +#endif // ENABLE_ASYNC_DNS + +bool PortEventPoll::deleteEvents(sock_t socket, Command* command, + EventPoll::EventType events) +{ + int portEvents = translateEvents(events); + return deleteEvents(socket, KCommandEvent(command, portEvents)); +} + +#ifdef ENABLE_ASYNC_DNS +bool PortEventPoll::addNameResolver +(const SharedHandle& resolver, Command* command) +{ + SharedHandle entry + (new KAsyncNameResolverEntry(resolver, command)); + std::deque >::iterator itr = + std::find(_nameResolverEntries.begin(), _nameResolverEntries.end(), entry); + if(itr == _nameResolverEntries.end()) { + _nameResolverEntries.push_back(entry); + entry->addSocketEvents(this); + return true; + } else { + return false; + } +} + +bool PortEventPoll::deleteNameResolver +(const SharedHandle& resolver, Command* command) +{ + SharedHandle entry + (new KAsyncNameResolverEntry(resolver, command)); + std::deque >::iterator itr = + std::find(_nameResolverEntries.begin(), _nameResolverEntries.end(), entry); + if(itr == _nameResolverEntries.end()) { + return false; + } else { + (*itr)->removeSocketEvents(this); + _nameResolverEntries.erase(itr); + return true; + } +} +#endif // ENABLE_ASYNC_DNS + +} // namespace aria2 diff --git a/src/PortEventPoll.h b/src/PortEventPoll.h new file mode 100644 index 00000000..1759d44f --- /dev/null +++ b/src/PortEventPoll.h @@ -0,0 +1,136 @@ +/* */ +#ifndef _D_PORT_EVENT_POLL_H_ +#define _D_PORT_EVENT_POLL_H_ + +#include "EventPoll.h" + +#ifdef HAVE_PORT_H +# include +#endif // HAVE_PORT_H + +#include + +#include "Event.h" +#ifdef ENABLE_ASYNC_DNS +# include "AsyncNameResolver.h" +#endif // ENABLE_ASYNC_DNS + +namespace aria2 { + +class Logger; + +class PortEventPoll : public EventPoll { +private: + class KSocketEntry; + + typedef Event KEvent; + typedef CommandEvent KCommandEvent; + typedef ADNSEvent KADNSEvent; + typedef AsyncNameResolverEntry KAsyncNameResolverEntry; + friend class AsyncNameResolverEntry; + + struct A2PortEvent { + int events; + KSocketEntry* socketEntry; + }; + + class KSocketEntry: + public SocketEntry { + public: + KSocketEntry(sock_t socket); + + A2PortEvent getEvents(); + }; + + friend int accumulateEvent(int events, const KEvent& event); + +private: + std::deque > _socketEntries; +#ifdef ENABLE_ASYNC_DNS + std::deque > _nameResolverEntries; +#endif // ENABLE_ASYNC_DNS + + int _port; + + size_t _portEventsSize; + + port_event_t* _portEvents; + + static const size_t PORT_EVENTS_SIZE = 1024; + + Logger* _logger; + + bool addEvents(sock_t socket, const KEvent& event); + + bool deleteEvents(sock_t socket, const KEvent& event); + + bool addEvents(sock_t socket, Command* command, int events, + const SharedHandle& rs); + + bool deleteEvents(sock_t socket, Command* command, + const SharedHandle& rs); + +public: + PortEventPoll(); + + bool good() const; + + virtual ~PortEventPoll(); + + virtual void poll(const struct timeval& tv); + + virtual bool addEvents(sock_t socket, + Command* command, EventPoll::EventType events); + + virtual bool deleteEvents(sock_t socket, + Command* command, EventPoll::EventType events); +#ifdef ENABLE_ASYNC_DNS + + virtual bool addNameResolver(const SharedHandle& resolver, + Command* command); + virtual bool deleteNameResolver + (const SharedHandle& resolver, Command* command); +#endif // ENABLE_ASYNC_DNS + + static const int IEV_READ = POLLIN; + static const int IEV_WRITE = POLLOUT; + static const int IEV_ERROR = POLLERR; + static const int IEV_HUP = POLLHUP; +}; + +} // namespace aria2 + +#endif // _D_PORT_EVENT_POLL_H_ diff --git a/src/SocketCore.cc b/src/SocketCore.cc index 58668cae..299c3fac 100644 --- a/src/SocketCore.cc +++ b/src/SocketCore.cc @@ -38,6 +38,9 @@ #ifdef HAVE_IFADDRS_H # include #endif // HAVE_IFADDRS_H +#ifdef HAVE_PORT_H +# include +#endif // HAVE_PORT_H #include #include @@ -148,12 +151,12 @@ SocketCore::SocketCore(sock_t sockfd, int sockType):_sockType(sockType), sockfd( void SocketCore::init() { - #ifdef HAVE_EPOLL - _epfd = -1; - #endif // HAVE_EPOLL +#ifdef HAVE_PORT_ASSOCIATE + _portfd = -1; +#endif // HAVE_PORT_ASSOCIATE blocking = true; secure = 0; @@ -175,15 +178,16 @@ void SocketCore::init() SocketCore::~SocketCore() { closeConnection(); - #ifdef HAVE_EPOLL - if(_epfd != -1) { CLOSE(_epfd); } - #endif // HAVE_EPOLL - +#ifdef HAVE_PORT_ASSOCIATE + if(_portfd != -1) { + CLOSE(_portfd); + } +#endif // HAVE_PORT_ASSOCIATE #ifdef HAVE_LIBGNUTLS delete [] peekBuf; #endif // HAVE_LIBGNUTLS @@ -587,6 +591,19 @@ void SocketCore::initEPOLL() #endif // HAVE_EPOLL +#ifdef HAVE_PORT_ASSOCIATE +void SocketCore::initPort() +{ + if((_portfd = port_create()) == -1) { + throw DL_RETRY_EX(StringFormat("port_create failed:%s", errorMsg()).str()); + } + if(port_associate(_portfd, PORT_SOURCE_FD, sockfd, POLLIN|POLLOUT, 0) == -1) { + throw DL_RETRY_EX + (StringFormat("port_associate failed:%s", errorMsg()).str()); + } +} +#endif // HAVE_PORT_ASSOCIATE + bool SocketCore::isWritable(time_t timeout) { #ifdef HAVE_EPOLL @@ -607,49 +624,67 @@ bool SocketCore::isWritable(time_t timeout) } } else #endif // HAVE_EPOLL -#ifdef HAVE_POLL - if(_pollMethod == SocketCore::POLL_METHOD_POLL) { - struct pollfd p; - p.fd = sockfd; - p.events = POLLOUT; - int r; - while((r = poll(&p, 1, timeout*1000)) == -1 && errno == EINTR); - if(r > 0) { - return p.revents&(POLLOUT|POLLHUP|POLLERR); - } else if(r == 0) { +#ifdef HAVE_PORT_ASSOCIATE + if(_pollMethod == SocketCore::POLL_METHOD_PORT) { + if(_portfd == -1) { + initPort(); + } + struct timespec ts = { timeout, 0 }; + port_event_t portEvent; + int r = port_get(_portfd, &portEvent, &ts); + if(r == 0) { + return portEvent.portev_events&(POLLOUT|POLLHUP|POLLERR); + } else if(r == -1 && (errno == ETIME || errno == EINTR)) { return false; } else { throw DL_RETRY_EX (StringFormat(EX_SOCKET_CHECK_WRITABLE, errorMsg()).str()); } } else -#endif // HAVE_POLL - if(_pollMethod == SocketCore::POLL_METHOD_SELECT) { - fd_set fds; - FD_ZERO(&fds); - FD_SET(sockfd, &fds); - - struct timeval tv; - tv.tv_sec = timeout; - tv.tv_usec = 0; - - int r = select(sockfd+1, NULL, &fds, NULL, &tv); - if(r == 1) { - return true; +#endif // HAVE_PORT_ASSOCIATE +#ifdef HAVE_POLL + if(_pollMethod == SocketCore::POLL_METHOD_POLL) { + struct pollfd p; + p.fd = sockfd; + p.events = POLLOUT; + int r; + while((r = poll(&p, 1, timeout*1000)) == -1 && errno == EINTR); + if(r > 0) { + return p.revents&(POLLOUT|POLLHUP|POLLERR); } else if(r == 0) { - // time out return false; } else { - if(SOCKET_ERRNO == A2_EINPROGRESS || SOCKET_ERRNO == A2_EINTR) { + throw DL_RETRY_EX + (StringFormat(EX_SOCKET_CHECK_WRITABLE, errorMsg()).str()); + } + } else +#endif // HAVE_POLL + if(_pollMethod == SocketCore::POLL_METHOD_SELECT) { + fd_set fds; + FD_ZERO(&fds); + FD_SET(sockfd, &fds); + + struct timeval tv; + tv.tv_sec = timeout; + tv.tv_usec = 0; + + int r = select(sockfd+1, NULL, &fds, NULL, &tv); + if(r == 1) { + return true; + } else if(r == 0) { + // time out return false; } else { - throw DL_RETRY_EX - (StringFormat(EX_SOCKET_CHECK_WRITABLE, errorMsg()).str()); + if(SOCKET_ERRNO == A2_EINPROGRESS || SOCKET_ERRNO == A2_EINTR) { + return false; + } else { + throw DL_RETRY_EX + (StringFormat(EX_SOCKET_CHECK_WRITABLE, errorMsg()).str()); + } } + } else { + abort(); } - } else { - abort(); - } } bool SocketCore::isReadable(time_t timeout) @@ -679,49 +714,67 @@ bool SocketCore::isReadable(time_t timeout) } } else #endif // HAVE_EPOLL -#ifdef HAVE_POLL - if(_pollMethod == SocketCore::POLL_METHOD_POLL) { - struct pollfd p; - p.fd = sockfd; - p.events = POLLIN; - int r; - while((r = poll(&p, 1, timeout*1000)) == -1 && errno == EINTR); - if(r > 0) { - return p.revents&(POLLIN|POLLHUP|POLLERR); - } else if(r == 0) { +#ifdef HAVE_PORT_ASSOCIATE + if(_pollMethod == SocketCore::POLL_METHOD_PORT) { + if(_portfd == -1) { + initPort(); + } + struct timespec ts = { timeout, 0 }; + port_event_t portEvent; + int r = port_get(_portfd, &portEvent, &ts); + if(r == 0) { + return portEvent.portev_events&(POLLIN|POLLHUP|POLLERR); + } else if(r == -1 && (errno == ETIME || errno == EINTR)) { return false; } else { throw DL_RETRY_EX (StringFormat(EX_SOCKET_CHECK_READABLE, errorMsg()).str()); } - } else -#endif // HAVE_POLL - if(_pollMethod == SocketCore::POLL_METHOD_SELECT) { - fd_set fds; - FD_ZERO(&fds); - FD_SET(sockfd, &fds); - - struct timeval tv; - tv.tv_sec = timeout; - tv.tv_usec = 0; - - int r = select(sockfd+1, &fds, NULL, NULL, &tv); - if(r == 1) { - return true; + } else +#endif // HAVE_PORT_ASSOCIATE +#ifdef HAVE_POLL + if(_pollMethod == SocketCore::POLL_METHOD_POLL) { + struct pollfd p; + p.fd = sockfd; + p.events = POLLIN; + int r; + while((r = poll(&p, 1, timeout*1000)) == -1 && errno == EINTR); + if(r > 0) { + return p.revents&(POLLIN|POLLHUP|POLLERR); } else if(r == 0) { - // time out return false; } else { - if(SOCKET_ERRNO == A2_EINPROGRESS || SOCKET_ERRNO == A2_EINTR) { + throw DL_RETRY_EX + (StringFormat(EX_SOCKET_CHECK_READABLE, errorMsg()).str()); + } + } else +#endif // HAVE_POLL + if(_pollMethod == SocketCore::POLL_METHOD_SELECT) { + fd_set fds; + FD_ZERO(&fds); + FD_SET(sockfd, &fds); + + struct timeval tv; + tv.tv_sec = timeout; + tv.tv_usec = 0; + + int r = select(sockfd+1, &fds, NULL, NULL, &tv); + if(r == 1) { + return true; + } else if(r == 0) { + // time out return false; } else { - throw DL_RETRY_EX - (StringFormat(EX_SOCKET_CHECK_READABLE, errorMsg()).str()); + if(SOCKET_ERRNO == A2_EINPROGRESS || SOCKET_ERRNO == A2_EINTR) { + return false; + } else { + throw DL_RETRY_EX + (StringFormat(EX_SOCKET_CHECK_READABLE, errorMsg()).str()); + } } + } else { + abort(); } - } else { - abort(); - } } #ifdef HAVE_LIBSSL @@ -1274,6 +1327,13 @@ void SocketCore::useEpoll() } #endif // HAVE_EPOLL +#ifdef HAVE_PORT_ASSOCIATE +void SocketCore::usePort() +{ + _pollMethod = SocketCore::POLL_METHOD_PORT; +} +#endif // HAVE_PORT_ASSOCIATE + #ifdef HAVE_POLL void SocketCore::usePoll() { diff --git a/src/SocketCore.h b/src/SocketCore.h index c864204b..dd5fe2e7 100644 --- a/src/SocketCore.h +++ b/src/SocketCore.h @@ -87,8 +87,13 @@ private: #endif // HAVE_EPOLL +#ifdef HAVE_PORT_ASSOCIATE + int _portfd; +#endif // HAVE_PORT_ASSOCIATE + enum PollMethod { POLL_METHOD_EPOLL, + POLL_METHOD_PORT, POLL_METHOD_POLL, POLL_METHOD_SELECT }; @@ -134,10 +139,11 @@ private: void bind(const struct sockaddr* addr, socklen_t addrlen); #ifdef HAVE_EPOLL - void initEPOLL(); - #endif // HAVE_EPOLL +#ifdef HAVE_PORT_ASSOCIATE + void initPort(); +#endif // HAVE_PORT_ASSOCIATE void setSockOpt(int level, int optname, void* optval, socklen_t optlen); @@ -362,6 +368,9 @@ public: #ifdef HAVE_EPOLL static void useEpoll(); #endif // HAVE_EPOLL +#ifdef HAVE_PORT_ASSOCIATE + static void usePort(); +#endif // HAVE_PORT_ASSOCIATE #ifdef HAVE_POLL static void usePoll(); #endif // HAVE_POLL diff --git a/src/main.cc b/src/main.cc index 945c2e4f..63fc4c3e 100644 --- a/src/main.cc +++ b/src/main.cc @@ -186,19 +186,25 @@ downloadresultcode::RESULT main(int argc, char* argv[]) if(op->getAsBool(PREF_QUIET)) { LogFactory::setConsoleOutput(false); } + const std::string& pollMethod = op->get(PREF_EVENT_POLL); #ifdef HAVE_EPOLL - if(op->get(PREF_EVENT_POLL) == V_EPOLL) { + if(pollMethod == V_EPOLL) { SocketCore::useEpoll(); } else #endif // HAVE_EPOLL -#ifdef HAVE_POLL - if(op->get(PREF_EVENT_POLL) == V_POLL) { - SocketCore::usePoll(); +#ifdef HAVE_PORT_ASSOCIATE + if(pollMethod == V_PORT) { + SocketCore::usePort(); } else +#endif // HAVE_PORT_ASSOCIATE +#ifdef HAVE_POLL + if(pollMethod == V_POLL) { + SocketCore::usePoll(); + } else #endif // HAVE_POLL - if(op->get(PREF_EVENT_POLL) == V_SELECT) { - SocketCore::useSelect(); - } + if(pollMethod == V_SELECT) { + SocketCore::useSelect(); + } downloadresultcode::RESULT exitStatus = downloadresultcode::FINISHED; Logger* logger = LogFactory::getInstance(); diff --git a/src/prefs.cc b/src/prefs.cc index cb7f9395..9a71e97f 100644 --- a/src/prefs.cc +++ b/src/prefs.cc @@ -150,6 +150,7 @@ const std::string PREF_MAX_FILE_NOT_FOUND("max-file-not-found"); // value: epoll | select const std::string PREF_EVENT_POLL("event-poll"); const std::string V_EPOLL("epoll"); +const std::string V_PORT("port"); const std::string V_POLL("poll"); const std::string V_SELECT("select"); // value: 1*digit diff --git a/src/prefs.h b/src/prefs.h index ac5d584e..f8cf157a 100644 --- a/src/prefs.h +++ b/src/prefs.h @@ -154,6 +154,7 @@ extern const std::string PREF_MAX_FILE_NOT_FOUND; // value: epoll | select extern const std::string PREF_EVENT_POLL; extern const std::string V_EPOLL; +extern const std::string V_PORT; extern const std::string V_POLL; extern const std::string V_SELECT; // value: 1*digit