diff --git a/configure.ac b/configure.ac index 5ec8a902..36bd6a99 100644 --- a/configure.ac +++ b/configure.ac @@ -17,13 +17,14 @@ AC_CONFIG_HEADERS([config.h]) case "$host" in *mingw*) win_build=yes - LIBS="$LIBS -lws2_32 -lwsock32 -lgdi32 -lwinmm -liphlpapi" + LIBS="$LIBS -lws2_32 -lwsock32 -lgdi32 -lwinmm -liphlpapi -lpsapi" ;; esac AC_DEFINE_UNQUOTED([TARGET], ["$target"], [Define target-type]) # Checks for arguments. +ARIA2_ARG_WITHOUT([libuv]) ARIA2_ARG_WITHOUT([appletls]) ARIA2_ARG_WITHOUT([gnutls]) ARIA2_ARG_WITHOUT([libnettle]) @@ -112,6 +113,69 @@ if test "x$with_libz" = "xyes"; then fi fi +if test "x$with_libuv" = "xyes"; then + case "$host" in + *mingw*|*msvc*) + old_CPPFLAGS=$CPPFLAGS + CPPFLAGS="$CPPFLAGS -D_WIN32_WINNT=0x0600" + AC_SEARCH_LIBS([uv_poll_init_socket], [uv], [ + AC_CHECK_HEADER([uv.h], [have_libuv=yes], [have_libuv=no]) + break; + ], [have_libuv=no]) + if test "x$have_libuv" != "xyes"; then + CPPFLAGS=$old_CPPFLAGS + fi + ;; + + *darwin*) + old_LDFLAGS=$LDFLAGS + LDFLAGS="$LDFLAGS -framework Foundation -framework CoreServices -framework ApplicationServices" + old_LIBS=$LIBS + LIBS="$LIBS -lm" + AC_SEARCH_LIBS([uv_poll_init_socket], [uv], [ + AC_CHECK_HEADER([uv.h], [have_libuv=yes], [have_libuv=no]) + break; + ], [have_libuv=no]) + if test "x$have_libuv" != "xyes"; then + LDFLAGS=$old_LDFLAGS + LIBS=$old_LIBS + fi + ;; + + *) + dnl Yeah, sucks that luv does not bring a pkg-config or config-tool + AC_MSG_CHECKING([for libuv]) + for combo in "" "-lrt" "-ldl -lrt" "-ldl -lrt -lpthread" "-lkvm"; do + old_LIBS=$LIBS + LIBS="-luv $combo $LIBS -lm" + AC_LINK_IFELSE([AC_LANG_SOURCE([ +extern "C" int uv_poll_init_socket(void); +int main() { return uv_poll_init_socket(); } + ])], [ + AC_MSG_RESULT(-luv $combo -lm) + AC_CHECK_HEADER([uv.h], [have_libuv=yes], [have_libuv=no]) + break; + ], [have_libuv=no]) + if test "x$have_libuv" = "xyes"; then + break; + else + LIBS=$old_LIBS + fi + done + if test "x$have_libuv" != "xyes"; then + AC_MSG_RESULT("no") + fi + ;; + esac + + if test "x$have_libuv" = "xyes"; then + AC_DEFINE([HAVE_LIBUV], [1], [Define to 1 if you have libuv.]) + elif test "x$with_libuv_requested" = "xyes"; then + ARIA2_DEP_NOT_MET([libuv]) + fi +fi +AM_CONDITIONAL([HAVE_LIBUV], [test "x$have_libuv" = "xyes"]) + if test "x$with_libxml2" = "xyes"; then AM_PATH_XML2([2.6.24], [have_libxml2=yes]) if test "x$have_libxml2" = "xyes"; then @@ -388,18 +452,25 @@ AC_HEADER_STDC case "$host" in *mingw*) - AC_CHECK_HEADERS([windows.h \ - winsock2.h \ - ws2tcpip.h \ - mmsystem.h \ - io.h \ - iphlpapi.h\ - winioctl.h \ - share.h], [], [], -[[#ifdef HAVE_WINDOWS_H + AC_CHECK_HEADERS([windows.h \ + winsock2.h \ + ws2tcpip.h \ + mmsystem.h \ + io.h \ + iphlpapi.h\ + winioctl.h \ + share.h], [], [], + [[ +#ifdef HAVE_WS2TCPIP_H +# include +#endif +#ifdef HAVE_WINSOCK2_H +# include +#endif +#ifdef HAVE_WINDOWS_H # include #endif -]]) + ]]) ;; esac @@ -416,8 +487,8 @@ AC_CHECK_HEADERS([argz.h \ netdb.h \ netinet/in.h \ netinet/tcp.h \ - poll.h \ - port.h \ + poll.h \ + port.h \ signal.h \ stddef.h \ stdint.h \ @@ -434,10 +505,10 @@ AC_CHECK_HEADERS([argz.h \ sys/uio.h \ termios.h \ unistd.h \ - utime.h \ + utime.h \ wchar.h \ - ifaddrs.h \ - pwd.h]) + ifaddrs.h \ + pwd.h]) # Checks for typedefs, structures, and compiler characteristics. AC_HEADER_STDBOOL @@ -722,6 +793,7 @@ echo "CPPFLAGS: $CPPFLAGS" echo "LDFLAGS: $LDFLAGS" echo "LIBS: $LIBS" echo "DEFS: $DEFS" +echo "LibUV: $have_libuv" echo "SQLite3: $have_sqlite3" echo "SSL Support: $have_ssl" echo "AppleTLS: $have_appletls" diff --git a/src/DownloadEngineFactory.cc b/src/DownloadEngineFactory.cc index ac7048bb..164b1b15 100644 --- a/src/DownloadEngineFactory.cc +++ b/src/DownloadEngineFactory.cc @@ -59,6 +59,9 @@ #include "a2io.h" #include "DownloadContext.h" #include "array_fun.h" +#ifdef HAVE_LIBUV +# include "LibuvEventPoll.h" +#endif // HAVE_LIBUV #ifdef HAVE_EPOLL # include "EpollEventPoll.h" #endif // HAVE_EPOLL @@ -89,6 +92,17 @@ DownloadEngineFactory::newDownloadEngine op->getAsInt(PREF_MAX_CONCURRENT_DOWNLOADS); SharedHandle eventPoll; const std::string& pollMethod = op->get(PREF_EVENT_POLL); +#ifdef HAVE_LIBUV + if (pollMethod == V_LIBUV) { + SharedHandle ep(new LibuvEventPoll()); + if (!ep->good()) { + throw DL_ABORT_EX("Initializing LibuvEventPoll failed." + " Try --event-poll=select"); + } + eventPoll = ep; + } + else +#endif // HAVE_LIBUV #ifdef HAVE_EPOLL if(pollMethod == V_EPOLL) { SharedHandle ep(new EpollEventPoll()); diff --git a/src/LibuvEventPoll.cc b/src/LibuvEventPoll.cc new file mode 100644 index 00000000..2509357f --- /dev/null +++ b/src/LibuvEventPoll.cc @@ -0,0 +1,327 @@ +/* */ + +#ifdef __MINGW32__ +#ifdef _WIN32_WINNT +#undef _WIN32_WINNT +#endif // _WIN32_WINNT +#define _WIN32_WINNT 0x0600 +#endif // __MINGW32__ + +#include "LibuvEventPoll.h" + +#include +#include +#include +#include + +#include + +#include "Command.h" +#include "LogFactory.h" +#include "Logger.h" +#include "a2functional.h" +#include "fmt.h" +#include "util.h" + +namespace { + using namespace aria2; + + template + static void close_callback(uv_handle_t* handle) + { + delete reinterpret_cast(handle); + } + + static void timer_callback(uv_timer_t* handle, int status) + { + uv_stop(handle->loop); + } +} + +namespace aria2 { + +LibuvEventPoll::KSocketEntry::KSocketEntry(sock_t s) + : SocketEntry(s) +{} + +inline int accumulateEvent(int events, const LibuvEventPoll::KEvent& event) +{ + return events | event.getEvents(); +} + +int LibuvEventPoll::KSocketEntry::getEvents() const +{ + int events = 0; +#ifdef ENABLE_ASYNC_DNS + events = std::accumulate(adnsEvents_.begin(), adnsEvents_.end(), + std::accumulate(commandEvents_.begin(), + commandEvents_.end(), 0, + accumulateEvent), + accumulateEvent); +#else // !ENABLE_ASYNC_DNS + events = std::accumulate(commandEvents_.begin(), commandEvents_.end(), 0, + accumulateEvent); +#endif // !ENABLE_ASYNC_DNS + + return events; +} + +LibuvEventPoll::LibuvEventPoll() +{ + loop_ = uv_loop_new(); +} + +LibuvEventPoll::~LibuvEventPoll() +{ + for (KPolls::iterator i = polls_.begin(), e = polls_.end(); i != e; ++i) { + i->second->close(); + } + // Actually kill the polls, and timers, if any. + uv_run(loop_, (uv_run_mode)(UV_RUN_ONCE | UV_RUN_NOWAIT)); + + if (loop_) { + uv_loop_delete(loop_); + loop_ = 0; + } + + // Need this to free only after the loop is gone. + polls_.clear(); +} + +void LibuvEventPoll::poll(const struct timeval& tv) +{ + const int timeout = tv.tv_sec * 1000 + tv.tv_usec / 1000; + + // timeout == 0 will tick once + if (timeout >= 0) { + uv_timer_t* timer = new uv_timer_t; + uv_timer_init(loop_, timer); + uv_timer_start(timer, timer_callback, timeout, timeout); + + uv_run(loop_, UV_RUN_DEFAULT); + + // Remove timer again. + uv_timer_stop(timer); + uv_close((uv_handle_t*)timer, close_callback); + } + else { + while (uv_run(loop_, (uv_run_mode)(UV_RUN_ONCE | UV_RUN_NOWAIT)) > 0) {} + } + +#ifdef ENABLE_ASYNC_DNS + // It turns out that we have to call ares_process_fd before ares's + // own timeout and ares may create new sockets or closes socket in + // their API. So we call ares_process_fd for all ares_channel and + // re-register their sockets. + for(KAsyncNameResolverEntrySet::iterator i = nameResolverEntries_.begin(), + eoi = nameResolverEntries_.end(); i != eoi; ++i) { + (*i)->processTimeout(); + (*i)->removeSocketEvents(this); + (*i)->addSocketEvents(this); + } +#endif // ENABLE_ASYNC_DNS + + // TODO timeout of name resolver is determined in Command(AbstractCommand, + // DHTEntryPoint...Command) +} + +int LibuvEventPoll::translateEvents(EventPoll::EventType events) +{ + int newEvents = 0; + if (EventPoll::EVENT_READ & events) { + newEvents |= IEV_READ; + } + if (EventPoll::EVENT_WRITE & events) { + newEvents |= IEV_WRITE; + } + if (EventPoll::EVENT_ERROR & events) { + newEvents |= IEV_ERROR; + } + if (EventPoll::EVENT_HUP & events) { + newEvents |= IEV_HUP; + } + return newEvents; +} + +void LibuvEventPoll::pollCallback(KPoll* poll, int status, int events) +{ + if (status == -1) { + uv_err_t err = uv_last_error(loop_); + switch (err.code) { + case UV_EAGAIN: + case UV_EINTR: + return; + case UV_EOF: + case UV_ECONNABORTED: + case UV_ECONNREFUSED: + case UV_ECONNRESET: + case UV_ENOTCONN: + case UV_EPIPE: + case UV_ESHUTDOWN: + events = IEV_HUP; + poll->processEvents(events); + poll->stop(); + uv_stop(loop_); + return; + default: + events = IEV_ERROR; + poll->processEvents(events); + poll->stop(); + uv_stop(loop_); + return; + } + } + + // Got something + poll->processEvents(events); + uv_stop(loop_); +} + +bool LibuvEventPoll::addEvents(sock_t socket, + const LibuvEventPoll::KEvent& event) +{ + SharedHandle socketEntry(new KSocketEntry(socket)); + KSocketEntrySet::iterator i = socketEntries_.lower_bound(socketEntry); + + if (i != socketEntries_.end() && **i == *socketEntry) { + event.addSelf(*i); + KPolls::iterator poll = polls_.find(socket); + if (poll == polls_.end()) { + throw std::logic_error("Invalid socket"); + } + poll->second->start(); + return true; + } + + socketEntries_.insert(i, socketEntry); + event.addSelf(socketEntry); + KPoll* poll = new KPoll(this, socketEntry.get(), socket); + polls_[socket] = poll; + poll->start(); + return true; +} + +bool LibuvEventPoll::addEvents(sock_t socket, Command* command, EventPoll::EventType events) +{ + int pollEvents = translateEvents(events); + return addEvents(socket, KCommandEvent(command, pollEvents)); +} + +#ifdef ENABLE_ASYNC_DNS +bool LibuvEventPoll::addEvents(sock_t socket, Command* command, int events, + const SharedHandle& rs) +{ + return addEvents(socket, KADNSEvent(rs, command, socket, events)); +} +#endif // ENABLE_ASYNC_DNS + +bool LibuvEventPoll::deleteEvents(sock_t socket, + const LibuvEventPoll::KEvent& event) +{ + SharedHandle socketEntry(new KSocketEntry(socket)); + KSocketEntrySet::iterator i = socketEntries_.find(socketEntry); + + if (i == socketEntries_.end()) { + A2_LOG_DEBUG(fmt("Socket %d is not found in SocketEntries.", socket)); + return false; + } + + event.removeSelf(*i); + + KPolls::iterator poll = polls_.find(socket); + if (poll == polls_.end()) { + return false; + } + + if ((*i)->eventEmpty()) { + poll->second->close(); + polls_.erase(poll); + socketEntries_.erase(i); + return true; + } + + poll->second->start(); + return true; +} + +#ifdef ENABLE_ASYNC_DNS +bool LibuvEventPoll::deleteEvents(sock_t socket, Command* command, + const SharedHandle& rs) +{ + return deleteEvents(socket, KADNSEvent(rs, command, socket, 0)); +} +#endif // ENABLE_ASYNC_DNS + +bool LibuvEventPoll::deleteEvents(sock_t socket, Command* command, + EventPoll::EventType events) +{ + int pollEvents = translateEvents(events); + return deleteEvents(socket, KCommandEvent(command, pollEvents)); +} + +#ifdef ENABLE_ASYNC_DNS +bool LibuvEventPoll::addNameResolver(const SharedHandle& resolver, + Command* command) +{ + SharedHandle entry( + new KAsyncNameResolverEntry(resolver, command)); + KAsyncNameResolverEntrySet::iterator itr = + nameResolverEntries_.lower_bound(entry); + if (itr != nameResolverEntries_.end() && *(*itr) == *entry) { + return false; + } + nameResolverEntries_.insert(itr, entry); + entry->addSocketEvents(this); + return true; +} + +bool LibuvEventPoll::deleteNameResolver(const SharedHandle& resolver, + Command* command) +{ + SharedHandle entry( + new KAsyncNameResolverEntry(resolver, command)); + KAsyncNameResolverEntrySet::iterator itr = nameResolverEntries_.find(entry); + if (itr == nameResolverEntries_.end()) { + return false; + } + (*itr)->removeSocketEvents(this); + nameResolverEntries_.erase(itr); + return true; +} +#endif // ENABLE_ASYNC_DNS + +} // namespace aria2 diff --git a/src/LibuvEventPoll.h b/src/LibuvEventPoll.h new file mode 100644 index 00000000..5c058737 --- /dev/null +++ b/src/LibuvEventPoll.h @@ -0,0 +1,171 @@ +/* */ +#ifndef D_LIBUV_EVENT_POLL_H +#define D_LIBUV_EVENT_POLL_H + +#include "EventPoll.h" + +#include +#include + +#include + +#include "Event.h" +#include "a2functional.h" + +#ifdef ENABLE_ASYNC_DNS +#include "AsyncNameResolver.h" +#endif // ENABLE_ASYNC_DNS + +namespace aria2 { + +class LibuvEventPoll : public EventPoll { +private: + class KSocketEntry; + + typedef Event KEvent; + typedef CommandEvent KCommandEvent; + typedef ADNSEvent KADNSEvent; + typedef AsyncNameResolverEntry KAsyncNameResolverEntry; + + friend class AsyncNameResolverEntry; + friend int accumulateEvent(int events, const KEvent& event); + + class KSocketEntry: public SocketEntry { + public: + KSocketEntry(sock_t socket); + int getEvents() const; + }; + + class KPoll { + private: + LibuvEventPoll *eventer_; + KSocketEntry *entry_; + uv_poll_t handle_; + + static void poll_callback(uv_poll_t* handle, int status, int events) { + KPoll* poll = static_cast(handle->data); + poll->eventer_->pollCallback(poll, status, events); + } + static void close_callback(uv_handle_t* handle) { + delete static_cast(handle->data); + } + + public: + inline KPoll(LibuvEventPoll* eventer, KSocketEntry* entry, sock_t sock) + : eventer_(eventer), entry_(entry) + { + uv_poll_init_socket(eventer->loop_, &handle_, sock); + handle_.data = this; + } + inline void start() { + uv_poll_start(&handle_, entry_->getEvents() & IEV_RW, poll_callback); + } + inline void stop() { + uv_poll_stop(&handle_); + } + inline void processEvents(int events) { + entry_->processEvents(events); + } + inline void close() { + stop(); + uv_close((uv_handle_t*)&handle_, close_callback); + } + }; + + typedef std::set, + DerefLess > > KSocketEntrySet; + + typedef std::map KPolls; + +#ifdef ENABLE_ASYNC_DNS + typedef std::set, + DerefLess > > + KAsyncNameResolverEntrySet; +#endif // ENABLE_ASYNC_DNS + + uv_loop_t* loop_; + KSocketEntrySet socketEntries_; + KPolls polls_; + +#ifdef ENABLE_ASYNC_DNS + KAsyncNameResolverEntrySet nameResolverEntries_; +#endif // ENABLE_ASYNC_DNS + + bool addEvents(sock_t socket, const KEvent& event); + bool deleteEvents(sock_t socket, const KEvent& event); + void pollCallback(KPoll *poll, int status, int events); + +#ifdef ENABLE_ASYNC_DNS + bool addEvents(sock_t socket, Command* command, int events, + const SharedHandle& rs); + bool deleteEvents(sock_t socket, Command* command, + const SharedHandle& rs); +#endif + + static int translateEvents(EventPoll::EventType events); + +public: + LibuvEventPoll(); + virtual ~LibuvEventPoll(); + + bool good() const { return loop_; } + + virtual void poll(const struct timeval& tv); + + virtual bool addEvents(sock_t socket, + Command* command, EventPoll::EventType events); + virtual bool deleteEvents(sock_t socket, + Command* command, EventPoll::EventType events); + +#ifdef ENABLE_ASYNC_DNS + virtual bool addNameResolver(const SharedHandle& resolver, + Command* command); + virtual bool deleteNameResolver( + const SharedHandle& resolver, Command* command); +#endif // ENABLE_ASYNC_DNS + + static const int IEV_READ = UV_READABLE; + static const int IEV_WRITE = UV_WRITABLE; + static const int IEV_RW = UV_READABLE | UV_WRITABLE; + + // Make sure these do not interfere with the uv_poll API later. + static const int IEV_ERROR = 128; + static const int IEV_HUP = 255; +}; + +} // namespace aria2 + +#endif // D_LIBUV_EVENT_POLL_H diff --git a/src/Makefile.am b/src/Makefile.am index 594d4c35..6333d73a 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -629,6 +629,10 @@ if HAVE_KQUEUE SRCS += KqueueEventPoll.cc KqueueEventPoll.h endif # HAVE_KQUEUE +if HAVE_LIBUV +SRCS += LibuvEventPoll.cc LibuvEventPoll.h +endif # HAVE_LIBUV + AR = @AR@ noinst_LIBRARIES = libaria2c.a libaria2c_a_SOURCES = $(SRCS) diff --git a/src/OptionHandlerFactory.cc b/src/OptionHandlerFactory.cc index 835d2388..83663cce 100644 --- a/src/OptionHandlerFactory.cc +++ b/src/OptionHandlerFactory.cc @@ -338,6 +338,9 @@ std::vector OptionHandlerFactory::createOptionHandlers() #ifdef HAVE_PORT_ASSOCIATE V_PORT, #endif // HAVE_PORT_ASSOCIATE +#ifdef HAVE_LIBUV + V_LIBUV, +#endif // HAVE_LIBUV #ifdef HAVE_POLL V_POLL, #endif // HAVE_POLL @@ -346,15 +349,19 @@ std::vector OptionHandlerFactory::createOptionHandlers() OptionHandler* op(new ParameterOptionHandler (PREF_EVENT_POLL, TEXT_EVENT_POLL, -#ifdef HAVE_EPOLL +#if defined(HAVE_EPOLL) V_EPOLL, -#elif HAVE_KQUEUE +#elif defined(HAVE_KQUEUE) V_KQUEUE, -#elif HAVE_PORT_ASSOCIATE +#elif defined(HAVE_PORT_ASSOCIATE) V_PORT, -#else +#elif defined(HAVE_LIBUV) + V_LIBUV, +#elif defined(HAVE_POLL) + V_POLL, +#else // defined(HAVE_EPOLL) V_SELECT, -#endif // !HAVE_EPOLL +#endif // defined(HAVE_EPOLL) std::vector (vbegin(params), vend(params)))); op->addTag(TAG_ADVANCED); diff --git a/src/SocketCore.cc b/src/SocketCore.cc index 02659fc9..be8be55f 100644 --- a/src/SocketCore.cc +++ b/src/SocketCore.cc @@ -1223,7 +1223,7 @@ const uint32_t APIPA_IPV4_END = 2852061183u; // 169.254.255.255 void checkAddrconfig() { -#ifdef __MINGW32__ +#ifdef HAVE_IPHLPAPI_H A2_LOG_INFO("Checking configured addresses"); ULONG bufsize = 15*1024; ULONG retval = 0; diff --git a/src/prefs.cc b/src/prefs.cc index c937b958..fc7c8e37 100644 --- a/src/prefs.cc +++ b/src/prefs.cc @@ -153,6 +153,7 @@ const std::string V_ERROR("error"); const std::string V_INORDER("inorder"); const std::string V_FEEDBACK("feedback"); const std::string V_ADAPTIVE("adaptive"); +const std::string V_LIBUV("libuv"); const std::string V_EPOLL("epoll"); const std::string V_KQUEUE("kqueue"); const std::string V_PORT("port"); diff --git a/src/prefs.h b/src/prefs.h index 92c699a2..f62bfd7c 100644 --- a/src/prefs.h +++ b/src/prefs.h @@ -88,6 +88,7 @@ extern const std::string V_ERROR; extern const std::string V_INORDER; extern const std::string V_FEEDBACK; extern const std::string V_ADAPTIVE; +extern const std::string V_LIBUV; extern const std::string V_EPOLL; extern const std::string V_KQUEUE; extern const std::string V_PORT;