From 9acd3df3cb833e429687347a0d049306353c6e2a Mon Sep 17 00:00:00 2001 From: Nils Maier Date: Wed, 10 Apr 2013 08:47:20 +0200 Subject: [PATCH] LibUV: Implement LibuvEventPoll LibUV event will use the best available polling method on a system, kind of like aria2 does already with the different *EventPoll implementations. However, libuv may support different/newer polling mechanisms; for example on Windows it will use IO Completion Ports which are superior to select() ;) --- configure.ac | 70 ++++++-- src/DownloadEngineFactory.cc | 14 ++ src/LibuvEventPoll.cc | 308 +++++++++++++++++++++++++++++++++++ src/LibuvEventPoll.h | 133 +++++++++++++++ src/Makefile.am | 4 + src/OptionHandlerFactory.cc | 13 +- src/SocketCore.cc | 2 +- src/prefs.cc | 1 + src/prefs.h | 1 + 9 files changed, 525 insertions(+), 21 deletions(-) create mode 100644 src/LibuvEventPoll.cc create mode 100644 src/LibuvEventPoll.h diff --git a/configure.ac b/configure.ac index f39ff0a9..ac279fa6 100644 --- a/configure.ac +++ b/configure.ac @@ -17,13 +17,14 @@ AC_CONFIG_HEADERS([config.h]) case "$host" in *mingw*) win_build=yes - LIBS="$LIBS -lws2_32 -lwsock32 -lgdi32 -lwinmm -liphlpapi" + LIBS="$LIBS -lws2_32 -lwsock32 -lgdi32 -lwinmm -liphlpapi -lpsapi" ;; esac AC_DEFINE_UNQUOTED([TARGET], ["$target"], [Define target-type]) # Checks for arguments. +ARIA2_ARG_WITHOUT([libuv]) ARIA2_ARG_WITHOUT([appletls]) ARIA2_ARG_WITHOUT([gnutls]) ARIA2_ARG_WITHOUT([libnettle]) @@ -110,6 +111,35 @@ if test "x$with_libz" = "xyes"; then fi fi +if test "x$with_libuv" = "xyes"; then + if test "x$win_build" = "xyes"; then + old_CPPFLAGS=$CPPFLAGS + CPPFLAGS="$CPPFLAGS -D_WIN32_WINNT=0x0600" + AC_SEARCH_LIBS([uv_poll_init_socket], [uv], [ + AC_CHECK_HEADER([uv.h], [have_libuv=yes], [have_libuv=no]) + break; + ], [have_libuv=no]) + if test "x$have_libuv" = "xyes"; then + AC_DEFINE([HAVE_LIBUV], [1], [Define to 1 if you have libuv.]) + else + CPPFLAGS=$old_CPPFLAGS + fi + else + AC_SEARCH_LIBS([uv_poll_init_socket], [uv], [ + AC_CHECK_HEADER([uv.h], [have_libuv=yes], [have_libuv=no]) + break; + ], [have_libuv=no]) + if test "x$have_libuv" = "xyes"; then + AC_DEFINE([HAVE_LIBUV], [1], [Define to 1 if you have libuv.]) + fi + fi + + if test "x$with_libuv_requested" = "xyes"; then + ARIA2_DEP_NOT_MET([libuv]) + fi +fi +AM_CONDITIONAL([HAVE_LIBUV], [test "x$have_libuv" = "xyes"]) + if test "x$with_libxml2" = "xyes"; then AM_PATH_XML2([2.6.24], [have_libxml2=yes]) if test "x$have_libxml2" = "xyes"; then @@ -386,18 +416,25 @@ AC_HEADER_STDC case "$host" in *mingw*) - AC_CHECK_HEADERS([windows.h \ - winsock2.h \ - ws2tcpip.h \ - mmsystem.h \ - io.h \ - iphlpapi.h\ - winioctl.h \ - share.h], [], [], -[[#ifdef HAVE_WINDOWS_H + AC_CHECK_HEADERS([windows.h \ + winsock2.h \ + ws2tcpip.h \ + mmsystem.h \ + io.h \ + iphlpapi.h\ + winioctl.h \ + share.h], [], [], + [[ +#ifdef HAVE_WS2TCPIP_H +# include +#endif +#ifdef HAVE_WINSOCK2_H +# include +#endif +#ifdef HAVE_WINDOWS_H # include #endif -]]) + ]]) ;; esac @@ -414,8 +451,8 @@ AC_CHECK_HEADERS([argz.h \ netdb.h \ netinet/in.h \ netinet/tcp.h \ - poll.h \ - port.h \ + poll.h \ + port.h \ signal.h \ stddef.h \ stdint.h \ @@ -432,10 +469,10 @@ AC_CHECK_HEADERS([argz.h \ sys/uio.h \ termios.h \ unistd.h \ - utime.h \ + utime.h \ wchar.h \ - ifaddrs.h \ - pwd.h]) + ifaddrs.h \ + pwd.h]) # Checks for typedefs, structures, and compiler characteristics. AC_HEADER_STDBOOL @@ -719,6 +756,7 @@ echo "CPPFLAGS: $CPPFLAGS" echo "LDFLAGS: $LDFLAGS" echo "LIBS: $LIBS" echo "DEFS: $DEFS" +echo "LibUV: $have_libuv" echo "SQLite3: $have_sqlite3" echo "SSL Support: $have_ssl" echo "AppleTLS: $have_appletls" diff --git a/src/DownloadEngineFactory.cc b/src/DownloadEngineFactory.cc index ac7048bb..164b1b15 100644 --- a/src/DownloadEngineFactory.cc +++ b/src/DownloadEngineFactory.cc @@ -59,6 +59,9 @@ #include "a2io.h" #include "DownloadContext.h" #include "array_fun.h" +#ifdef HAVE_LIBUV +# include "LibuvEventPoll.h" +#endif // HAVE_LIBUV #ifdef HAVE_EPOLL # include "EpollEventPoll.h" #endif // HAVE_EPOLL @@ -89,6 +92,17 @@ DownloadEngineFactory::newDownloadEngine op->getAsInt(PREF_MAX_CONCURRENT_DOWNLOADS); SharedHandle eventPoll; const std::string& pollMethod = op->get(PREF_EVENT_POLL); +#ifdef HAVE_LIBUV + if (pollMethod == V_LIBUV) { + SharedHandle ep(new LibuvEventPoll()); + if (!ep->good()) { + throw DL_ABORT_EX("Initializing LibuvEventPoll failed." + " Try --event-poll=select"); + } + eventPoll = ep; + } + else +#endif // HAVE_LIBUV #ifdef HAVE_EPOLL if(pollMethod == V_EPOLL) { SharedHandle ep(new EpollEventPoll()); diff --git a/src/LibuvEventPoll.cc b/src/LibuvEventPoll.cc new file mode 100644 index 00000000..89fa326d --- /dev/null +++ b/src/LibuvEventPoll.cc @@ -0,0 +1,308 @@ +/* */ + +#ifdef __MINGW32__ +#ifdef _WIN32_WINNT +#undef _WIN32_WINNT +#endif // _WIN32_WINNT +#define _WIN32_WINNT 0x0600 +#endif // __MINGW32__ + +#include "LibuvEventPoll.h" + +#include +#include +#include +#include + +#include + +#include "Command.h" +#include "LogFactory.h" +#include "Logger.h" +#include "a2functional.h" +#include "fmt.h" +#include "util.h" + +namespace { + template + static void close_callback(uv_handle_t* handle) + { + delete reinterpret_cast(handle); + } + + static void timer_callback(uv_timer_t* handle, int status) {} +} + +namespace aria2 { + +LibuvEventPoll::KSocketEntry::KSocketEntry(sock_t s) + : SocketEntry(s) +{} + +int accumulateEvent(int events, const LibuvEventPoll::KEvent& event) +{ + return events|event.getEvents(); +} + +int LibuvEventPoll::KSocketEntry::getEvents() +{ + int events = 0; +#ifdef ENABLE_ASYNC_DNS + events = std::accumulate(adnsEvents_.begin(), adnsEvents_.end(), + std::accumulate(commandEvents_.begin(), + commandEvents_.end(), 0, + accumulateEvent), + accumulateEvent); +#else // !ENABLE_ASYNC_DNS + events = std::accumulate(commandEvents_.begin(), commandEvents_.end(), 0, + accumulateEvent); +#endif // !ENABLE_ASYNC_DNS + return events; +} + +LibuvEventPoll::LibuvEventPoll() +{ + loop = uv_loop_new(); +} + +LibuvEventPoll::~LibuvEventPoll() +{ + for (KPolls::iterator i = polls_.begin(), e = polls_.end(); i != e; ++i) { + uv_poll_stop(i->second); + uv_close((uv_handle_t*)i->second, close_callback); + } + // Actually kill the polls, and timers, if any + uv_run(loop, (uv_run_mode)(UV_RUN_ONCE | UV_RUN_NOWAIT)); + + if (loop) { + uv_loop_delete(loop); + loop = 0; + } + polls_.clear(); +} + +void LibuvEventPoll::poll(const struct timeval& tv) +{ + int timeout = tv.tv_sec * 1000 + tv.tv_usec / 1000; + + if (timeout > 0) { + uv_timer_t* timer = new uv_timer_t; + uv_timer_init(loop, timer); + uv_timer_start(timer, timer_callback, timeout, timeout); + + uv_run(loop, UV_RUN_ONCE); + + // Remove timer again. + uv_timer_stop(timer); + uv_close((uv_handle_t*)timer, close_callback); + } + else { + uv_run(loop, (uv_run_mode)(UV_RUN_ONCE | UV_RUN_NOWAIT)); + } + +#ifdef ENABLE_ASYNC_DNS + // It turns out that we have to call ares_process_fd before ares's + // own timeout and ares may create new sockets or closes socket in + // their API. So we call ares_process_fd for all ares_channel and + // re-register their sockets. + for(KAsyncNameResolverEntrySet::iterator i = nameResolverEntries_.begin(), + eoi = nameResolverEntries_.end(); i != eoi; ++i) { + (*i)->processTimeout(); + (*i)->removeSocketEvents(this); + (*i)->addSocketEvents(this); + } +#endif // ENABLE_ASYNC_DNS + + // TODO timeout of name resolver is determined in Command(AbstractCommand, + // DHTEntryPoint...Command) +} + +int LibuvEventPoll::translateEvents(EventPoll::EventType events) +{ + int newEvents = 0; + if (EventPoll::EVENT_READ & events) { + newEvents |= IEV_READ; + } + if (EventPoll::EVENT_WRITE & events) { + newEvents |= IEV_WRITE; + } + if (EventPoll::EVENT_ERROR & events) { + newEvents |= IEV_ERROR; + } + if (EventPoll::EVENT_HUP & events) { + newEvents |= IEV_HUP; + } + return newEvents; +} + +void LibuvEventPoll::poll_callback(uv_poll_t* handle, int status, int events) +{ + if (status == -1) { + events = 0; + uv_err_t err = uv_last_error(handle->loop); + switch (err.code) { + case UV_EAGAIN: + return; + case UV_EOF: + case UV_ECONNABORTED: + case UV_ECONNREFUSED: + case UV_ECONNRESET: + case UV_ENOTCONN: + case UV_EPIPE: + case UV_ESHUTDOWN: + events = IEV_HUP; + default: + events = IEV_ERROR; + } + } + KSocketEntry *p = reinterpret_cast(handle->data); + p->processEvents(events); +} + +bool LibuvEventPoll::addEvents(sock_t socket, + const LibuvEventPoll::KEvent& event) +{ + SharedHandle socketEntry(new KSocketEntry(socket)); + KSocketEntrySet::iterator i = socketEntries_.lower_bound(socketEntry); + if (i != socketEntries_.end() && **i == *socketEntry) { + event.addSelf(*i); + KPolls::iterator poll = polls_.find(socket); + if (poll == polls_.end()) { + throw std::logic_error("Invalid socket"); + } + uv_poll_start(poll->second, (*i)->getEvents() & (IEV_READ | IEV_WRITE), + poll_callback); + } + else { + socketEntries_.insert(i, socketEntry); + event.addSelf(socketEntry); + uv_poll_t *poll = new uv_poll_t; + uv_poll_init_socket(loop, poll, socket); + poll->data = socketEntry.get(); + uv_poll_start(poll, socketEntry->getEvents() & (IEV_READ | IEV_WRITE), + poll_callback); + polls_[socket] = poll; + } + return true; +} + +bool LibuvEventPoll::addEvents(sock_t socket, Command* command, EventPoll::EventType events) +{ + int pollEvents = translateEvents(events); + return addEvents(socket, KCommandEvent(command, pollEvents)); +} + +#ifdef ENABLE_ASYNC_DNS +bool LibuvEventPoll::addEvents(sock_t socket, Command* command, int events, + const SharedHandle& rs) +{ + return addEvents(socket, KADNSEvent(rs, command, socket, events)); +} +#endif // ENABLE_ASYNC_DNS + +bool LibuvEventPoll::deleteEvents(sock_t socket, + const LibuvEventPoll::KEvent& event) +{ + SharedHandle socketEntry(new KSocketEntry(socket)); + KSocketEntrySet::iterator i = socketEntries_.find(socketEntry); + if(i == socketEntries_.end()) { + A2_LOG_DEBUG(fmt("Socket %d is not found in SocketEntries.", socket)); + return false; + } + + event.removeSelf(*i); + + if ((*i)->eventEmpty()) { + KPolls::iterator poll = polls_.find(socket); + if (poll == polls_.end()) { + return false; + } + uv_poll_stop(poll->second); + uv_close((uv_handle_t*)poll->second, close_callback); + polls_.erase(poll); + socketEntries_.erase(i); + } + return true; +} + +#ifdef ENABLE_ASYNC_DNS +bool LibuvEventPoll::deleteEvents(sock_t socket, Command* command, + const SharedHandle& rs) +{ + return deleteEvents(socket, KADNSEvent(rs, command, socket, 0)); +} +#endif // ENABLE_ASYNC_DNS + +bool LibuvEventPoll::deleteEvents(sock_t socket, Command* command, + EventPoll::EventType events) +{ + int pollEvents = translateEvents(events); + return deleteEvents(socket, KCommandEvent(command, pollEvents)); +} + +#ifdef ENABLE_ASYNC_DNS +bool LibuvEventPoll::addNameResolver(const SharedHandle& resolver, + Command* command) +{ + SharedHandle entry( + new KAsyncNameResolverEntry(resolver, command)); + KAsyncNameResolverEntrySet::iterator itr = + nameResolverEntries_.lower_bound(entry); + if (itr != nameResolverEntries_.end() && *(*itr) == *entry) { + return false; + } + nameResolverEntries_.insert(itr, entry); + entry->addSocketEvents(this); + return true; +} + +bool LibuvEventPoll::deleteNameResolver(const SharedHandle& resolver, + Command* command) +{ + SharedHandle entry( + new KAsyncNameResolverEntry(resolver, command)); + KAsyncNameResolverEntrySet::iterator itr = nameResolverEntries_.find(entry); + if (itr == nameResolverEntries_.end()) { + return false; + } + (*itr)->removeSocketEvents(this); + nameResolverEntries_.erase(itr); + return true; +} +#endif // ENABLE_ASYNC_DNS + +} // namespace aria2 diff --git a/src/LibuvEventPoll.h b/src/LibuvEventPoll.h new file mode 100644 index 00000000..54ced26b --- /dev/null +++ b/src/LibuvEventPoll.h @@ -0,0 +1,133 @@ +/* */ +#ifndef D_LIBUV_EVENT_POLL_H +#define D_LIBUV_EVENT_POLL_H + +#include "EventPoll.h" + +#include +#include + +#include + +#include "Event.h" +#include "a2functional.h" + +#ifdef ENABLE_ASYNC_DNS +#include "AsyncNameResolver.h" +#endif // ENABLE_ASYNC_DNS + +namespace aria2 { + +class LibuvEventPoll : public EventPoll { +private: + class KSocketEntry; + + typedef Event KEvent; + typedef CommandEvent KCommandEvent; + typedef ADNSEvent KADNSEvent; + typedef AsyncNameResolverEntry KAsyncNameResolverEntry; + friend class AsyncNameResolverEntry; + + class KSocketEntry: + public SocketEntry { + public: + KSocketEntry(sock_t socket); + int getEvents(); + }; + + friend int accumulateEvent(int events, const KEvent& event); + +private: + uv_loop_t* loop; + + typedef std::set, + DerefLess > > KSocketEntrySet; + KSocketEntrySet socketEntries_; + + typedef std::map KPolls; + KPolls polls_; + +#ifdef ENABLE_ASYNC_DNS + typedef std::set, + DerefLess > > + KAsyncNameResolverEntrySet; + KAsyncNameResolverEntrySet nameResolverEntries_; +#endif // ENABLE_ASYNC_DNS + + bool addEvents(sock_t socket, const KEvent& event); + bool deleteEvents(sock_t socket, const KEvent& event); + +#ifdef ENABLE_ASYNC_DNS + bool addEvents(sock_t socket, Command* command, int events, + const SharedHandle& rs); + bool deleteEvents(sock_t socket, Command* command, + const SharedHandle& rs); +#endif + + static int translateEvents(EventPoll::EventType events); + static void poll_callback(uv_poll_t* handle, int status, int events); + +public: + LibuvEventPoll(); + + virtual ~LibuvEventPoll(); + + bool good() const { return loop; } + + virtual void poll(const struct timeval& tv); + + virtual bool addEvents(sock_t socket, + Command* command, EventPoll::EventType events); + + virtual bool deleteEvents(sock_t socket, + Command* command, EventPoll::EventType events); +#ifdef ENABLE_ASYNC_DNS + + virtual bool addNameResolver(const SharedHandle& resolver, + Command* command); + virtual bool deleteNameResolver + (const SharedHandle& resolver, Command* command); +#endif // ENABLE_ASYNC_DNS + + static const int IEV_READ = UV_READABLE; + static const int IEV_WRITE = UV_WRITABLE; + static const int IEV_ERROR = 128; + static const int IEV_HUP = 255; +}; + +} // namespace aria2 + +#endif // D_LIBUV_EVENT_POLL_H diff --git a/src/Makefile.am b/src/Makefile.am index 594d4c35..6333d73a 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -629,6 +629,10 @@ if HAVE_KQUEUE SRCS += KqueueEventPoll.cc KqueueEventPoll.h endif # HAVE_KQUEUE +if HAVE_LIBUV +SRCS += LibuvEventPoll.cc LibuvEventPoll.h +endif # HAVE_LIBUV + AR = @AR@ noinst_LIBRARIES = libaria2c.a libaria2c_a_SOURCES = $(SRCS) diff --git a/src/OptionHandlerFactory.cc b/src/OptionHandlerFactory.cc index 835d2388..6715b31a 100644 --- a/src/OptionHandlerFactory.cc +++ b/src/OptionHandlerFactory.cc @@ -329,6 +329,9 @@ std::vector OptionHandlerFactory::createOptionHandlers() } { std::string params[] = { +#ifdef HAVE_LIBUV + V_LIBUV, +#endif // HAVE_LIBUV #ifdef HAVE_EPOLL V_EPOLL, #endif // HAVE_EPOLL @@ -346,15 +349,17 @@ std::vector OptionHandlerFactory::createOptionHandlers() OptionHandler* op(new ParameterOptionHandler (PREF_EVENT_POLL, TEXT_EVENT_POLL, -#ifdef HAVE_EPOLL +#ifdef HAVE_LIBUV + V_LIBUV, +#elif defined(HAVE_EPOLL) V_EPOLL, -#elif HAVE_KQUEUE +#elif defined(HAVE_KQUEUE) V_KQUEUE, -#elif HAVE_PORT_ASSOCIATE +#elif defined(HAVE_PORT_ASSOCIATE) V_PORT, #else V_SELECT, -#endif // !HAVE_EPOLL +#endif // !HAVE_LIBUV std::vector (vbegin(params), vend(params)))); op->addTag(TAG_ADVANCED); diff --git a/src/SocketCore.cc b/src/SocketCore.cc index b7ed9d85..4fcf8ea0 100644 --- a/src/SocketCore.cc +++ b/src/SocketCore.cc @@ -1217,7 +1217,7 @@ const uint32_t APIPA_IPV4_END = 2852061183u; // 169.254.255.255 void checkAddrconfig() { -#ifdef __MINGW32__ +#ifdef HAVE_IPHLPAPI_H A2_LOG_INFO("Checking configured addresses"); ULONG bufsize = 15*1024; ULONG retval = 0; diff --git a/src/prefs.cc b/src/prefs.cc index c937b958..fc7c8e37 100644 --- a/src/prefs.cc +++ b/src/prefs.cc @@ -153,6 +153,7 @@ const std::string V_ERROR("error"); const std::string V_INORDER("inorder"); const std::string V_FEEDBACK("feedback"); const std::string V_ADAPTIVE("adaptive"); +const std::string V_LIBUV("libuv"); const std::string V_EPOLL("epoll"); const std::string V_KQUEUE("kqueue"); const std::string V_PORT("port"); diff --git a/src/prefs.h b/src/prefs.h index 92c699a2..f62bfd7c 100644 --- a/src/prefs.h +++ b/src/prefs.h @@ -88,6 +88,7 @@ extern const std::string V_ERROR; extern const std::string V_INORDER; extern const std::string V_FEEDBACK; extern const std::string V_ADAPTIVE; +extern const std::string V_LIBUV; extern const std::string V_EPOLL; extern const std::string V_KQUEUE; extern const std::string V_PORT;