From 9acd3df3cb833e429687347a0d049306353c6e2a Mon Sep 17 00:00:00 2001 From: Nils Maier Date: Wed, 10 Apr 2013 08:47:20 +0200 Subject: [PATCH 1/6] LibUV: Implement LibuvEventPoll LibUV event will use the best available polling method on a system, kind of like aria2 does already with the different *EventPoll implementations. However, libuv may support different/newer polling mechanisms; for example on Windows it will use IO Completion Ports which are superior to select() ;) --- configure.ac | 70 ++++++-- src/DownloadEngineFactory.cc | 14 ++ src/LibuvEventPoll.cc | 308 +++++++++++++++++++++++++++++++++++ src/LibuvEventPoll.h | 133 +++++++++++++++ src/Makefile.am | 4 + src/OptionHandlerFactory.cc | 13 +- src/SocketCore.cc | 2 +- src/prefs.cc | 1 + src/prefs.h | 1 + 9 files changed, 525 insertions(+), 21 deletions(-) create mode 100644 src/LibuvEventPoll.cc create mode 100644 src/LibuvEventPoll.h diff --git a/configure.ac b/configure.ac index f39ff0a9..ac279fa6 100644 --- a/configure.ac +++ b/configure.ac @@ -17,13 +17,14 @@ AC_CONFIG_HEADERS([config.h]) case "$host" in *mingw*) win_build=yes - LIBS="$LIBS -lws2_32 -lwsock32 -lgdi32 -lwinmm -liphlpapi" + LIBS="$LIBS -lws2_32 -lwsock32 -lgdi32 -lwinmm -liphlpapi -lpsapi" ;; esac AC_DEFINE_UNQUOTED([TARGET], ["$target"], [Define target-type]) # Checks for arguments. +ARIA2_ARG_WITHOUT([libuv]) ARIA2_ARG_WITHOUT([appletls]) ARIA2_ARG_WITHOUT([gnutls]) ARIA2_ARG_WITHOUT([libnettle]) @@ -110,6 +111,35 @@ if test "x$with_libz" = "xyes"; then fi fi +if test "x$with_libuv" = "xyes"; then + if test "x$win_build" = "xyes"; then + old_CPPFLAGS=$CPPFLAGS + CPPFLAGS="$CPPFLAGS -D_WIN32_WINNT=0x0600" + AC_SEARCH_LIBS([uv_poll_init_socket], [uv], [ + AC_CHECK_HEADER([uv.h], [have_libuv=yes], [have_libuv=no]) + break; + ], [have_libuv=no]) + if test "x$have_libuv" = "xyes"; then + AC_DEFINE([HAVE_LIBUV], [1], [Define to 1 if you have libuv.]) + else + CPPFLAGS=$old_CPPFLAGS + fi + else + AC_SEARCH_LIBS([uv_poll_init_socket], [uv], [ + AC_CHECK_HEADER([uv.h], [have_libuv=yes], [have_libuv=no]) + break; + ], [have_libuv=no]) + if test "x$have_libuv" = "xyes"; then + AC_DEFINE([HAVE_LIBUV], [1], [Define to 1 if you have libuv.]) + fi + fi + + if test "x$with_libuv_requested" = "xyes"; then + ARIA2_DEP_NOT_MET([libuv]) + fi +fi +AM_CONDITIONAL([HAVE_LIBUV], [test "x$have_libuv" = "xyes"]) + if test "x$with_libxml2" = "xyes"; then AM_PATH_XML2([2.6.24], [have_libxml2=yes]) if test "x$have_libxml2" = "xyes"; then @@ -386,18 +416,25 @@ AC_HEADER_STDC case "$host" in *mingw*) - AC_CHECK_HEADERS([windows.h \ - winsock2.h \ - ws2tcpip.h \ - mmsystem.h \ - io.h \ - iphlpapi.h\ - winioctl.h \ - share.h], [], [], -[[#ifdef HAVE_WINDOWS_H + AC_CHECK_HEADERS([windows.h \ + winsock2.h \ + ws2tcpip.h \ + mmsystem.h \ + io.h \ + iphlpapi.h\ + winioctl.h \ + share.h], [], [], + [[ +#ifdef HAVE_WS2TCPIP_H +# include +#endif +#ifdef HAVE_WINSOCK2_H +# include +#endif +#ifdef HAVE_WINDOWS_H # include #endif -]]) + ]]) ;; esac @@ -414,8 +451,8 @@ AC_CHECK_HEADERS([argz.h \ netdb.h \ netinet/in.h \ netinet/tcp.h \ - poll.h \ - port.h \ + poll.h \ + port.h \ signal.h \ stddef.h \ stdint.h \ @@ -432,10 +469,10 @@ AC_CHECK_HEADERS([argz.h \ sys/uio.h \ termios.h \ unistd.h \ - utime.h \ + utime.h \ wchar.h \ - ifaddrs.h \ - pwd.h]) + ifaddrs.h \ + pwd.h]) # Checks for typedefs, structures, and compiler characteristics. AC_HEADER_STDBOOL @@ -719,6 +756,7 @@ echo "CPPFLAGS: $CPPFLAGS" echo "LDFLAGS: $LDFLAGS" echo "LIBS: $LIBS" echo "DEFS: $DEFS" +echo "LibUV: $have_libuv" echo "SQLite3: $have_sqlite3" echo "SSL Support: $have_ssl" echo "AppleTLS: $have_appletls" diff --git a/src/DownloadEngineFactory.cc b/src/DownloadEngineFactory.cc index ac7048bb..164b1b15 100644 --- a/src/DownloadEngineFactory.cc +++ b/src/DownloadEngineFactory.cc @@ -59,6 +59,9 @@ #include "a2io.h" #include "DownloadContext.h" #include "array_fun.h" +#ifdef HAVE_LIBUV +# include "LibuvEventPoll.h" +#endif // HAVE_LIBUV #ifdef HAVE_EPOLL # include "EpollEventPoll.h" #endif // HAVE_EPOLL @@ -89,6 +92,17 @@ DownloadEngineFactory::newDownloadEngine op->getAsInt(PREF_MAX_CONCURRENT_DOWNLOADS); SharedHandle eventPoll; const std::string& pollMethod = op->get(PREF_EVENT_POLL); +#ifdef HAVE_LIBUV + if (pollMethod == V_LIBUV) { + SharedHandle ep(new LibuvEventPoll()); + if (!ep->good()) { + throw DL_ABORT_EX("Initializing LibuvEventPoll failed." + " Try --event-poll=select"); + } + eventPoll = ep; + } + else +#endif // HAVE_LIBUV #ifdef HAVE_EPOLL if(pollMethod == V_EPOLL) { SharedHandle ep(new EpollEventPoll()); diff --git a/src/LibuvEventPoll.cc b/src/LibuvEventPoll.cc new file mode 100644 index 00000000..89fa326d --- /dev/null +++ b/src/LibuvEventPoll.cc @@ -0,0 +1,308 @@ +/* */ + +#ifdef __MINGW32__ +#ifdef _WIN32_WINNT +#undef _WIN32_WINNT +#endif // _WIN32_WINNT +#define _WIN32_WINNT 0x0600 +#endif // __MINGW32__ + +#include "LibuvEventPoll.h" + +#include +#include +#include +#include + +#include + +#include "Command.h" +#include "LogFactory.h" +#include "Logger.h" +#include "a2functional.h" +#include "fmt.h" +#include "util.h" + +namespace { + template + static void close_callback(uv_handle_t* handle) + { + delete reinterpret_cast(handle); + } + + static void timer_callback(uv_timer_t* handle, int status) {} +} + +namespace aria2 { + +LibuvEventPoll::KSocketEntry::KSocketEntry(sock_t s) + : SocketEntry(s) +{} + +int accumulateEvent(int events, const LibuvEventPoll::KEvent& event) +{ + return events|event.getEvents(); +} + +int LibuvEventPoll::KSocketEntry::getEvents() +{ + int events = 0; +#ifdef ENABLE_ASYNC_DNS + events = std::accumulate(adnsEvents_.begin(), adnsEvents_.end(), + std::accumulate(commandEvents_.begin(), + commandEvents_.end(), 0, + accumulateEvent), + accumulateEvent); +#else // !ENABLE_ASYNC_DNS + events = std::accumulate(commandEvents_.begin(), commandEvents_.end(), 0, + accumulateEvent); +#endif // !ENABLE_ASYNC_DNS + return events; +} + +LibuvEventPoll::LibuvEventPoll() +{ + loop = uv_loop_new(); +} + +LibuvEventPoll::~LibuvEventPoll() +{ + for (KPolls::iterator i = polls_.begin(), e = polls_.end(); i != e; ++i) { + uv_poll_stop(i->second); + uv_close((uv_handle_t*)i->second, close_callback); + } + // Actually kill the polls, and timers, if any + uv_run(loop, (uv_run_mode)(UV_RUN_ONCE | UV_RUN_NOWAIT)); + + if (loop) { + uv_loop_delete(loop); + loop = 0; + } + polls_.clear(); +} + +void LibuvEventPoll::poll(const struct timeval& tv) +{ + int timeout = tv.tv_sec * 1000 + tv.tv_usec / 1000; + + if (timeout > 0) { + uv_timer_t* timer = new uv_timer_t; + uv_timer_init(loop, timer); + uv_timer_start(timer, timer_callback, timeout, timeout); + + uv_run(loop, UV_RUN_ONCE); + + // Remove timer again. + uv_timer_stop(timer); + uv_close((uv_handle_t*)timer, close_callback); + } + else { + uv_run(loop, (uv_run_mode)(UV_RUN_ONCE | UV_RUN_NOWAIT)); + } + +#ifdef ENABLE_ASYNC_DNS + // It turns out that we have to call ares_process_fd before ares's + // own timeout and ares may create new sockets or closes socket in + // their API. So we call ares_process_fd for all ares_channel and + // re-register their sockets. + for(KAsyncNameResolverEntrySet::iterator i = nameResolverEntries_.begin(), + eoi = nameResolverEntries_.end(); i != eoi; ++i) { + (*i)->processTimeout(); + (*i)->removeSocketEvents(this); + (*i)->addSocketEvents(this); + } +#endif // ENABLE_ASYNC_DNS + + // TODO timeout of name resolver is determined in Command(AbstractCommand, + // DHTEntryPoint...Command) +} + +int LibuvEventPoll::translateEvents(EventPoll::EventType events) +{ + int newEvents = 0; + if (EventPoll::EVENT_READ & events) { + newEvents |= IEV_READ; + } + if (EventPoll::EVENT_WRITE & events) { + newEvents |= IEV_WRITE; + } + if (EventPoll::EVENT_ERROR & events) { + newEvents |= IEV_ERROR; + } + if (EventPoll::EVENT_HUP & events) { + newEvents |= IEV_HUP; + } + return newEvents; +} + +void LibuvEventPoll::poll_callback(uv_poll_t* handle, int status, int events) +{ + if (status == -1) { + events = 0; + uv_err_t err = uv_last_error(handle->loop); + switch (err.code) { + case UV_EAGAIN: + return; + case UV_EOF: + case UV_ECONNABORTED: + case UV_ECONNREFUSED: + case UV_ECONNRESET: + case UV_ENOTCONN: + case UV_EPIPE: + case UV_ESHUTDOWN: + events = IEV_HUP; + default: + events = IEV_ERROR; + } + } + KSocketEntry *p = reinterpret_cast(handle->data); + p->processEvents(events); +} + +bool LibuvEventPoll::addEvents(sock_t socket, + const LibuvEventPoll::KEvent& event) +{ + SharedHandle socketEntry(new KSocketEntry(socket)); + KSocketEntrySet::iterator i = socketEntries_.lower_bound(socketEntry); + if (i != socketEntries_.end() && **i == *socketEntry) { + event.addSelf(*i); + KPolls::iterator poll = polls_.find(socket); + if (poll == polls_.end()) { + throw std::logic_error("Invalid socket"); + } + uv_poll_start(poll->second, (*i)->getEvents() & (IEV_READ | IEV_WRITE), + poll_callback); + } + else { + socketEntries_.insert(i, socketEntry); + event.addSelf(socketEntry); + uv_poll_t *poll = new uv_poll_t; + uv_poll_init_socket(loop, poll, socket); + poll->data = socketEntry.get(); + uv_poll_start(poll, socketEntry->getEvents() & (IEV_READ | IEV_WRITE), + poll_callback); + polls_[socket] = poll; + } + return true; +} + +bool LibuvEventPoll::addEvents(sock_t socket, Command* command, EventPoll::EventType events) +{ + int pollEvents = translateEvents(events); + return addEvents(socket, KCommandEvent(command, pollEvents)); +} + +#ifdef ENABLE_ASYNC_DNS +bool LibuvEventPoll::addEvents(sock_t socket, Command* command, int events, + const SharedHandle& rs) +{ + return addEvents(socket, KADNSEvent(rs, command, socket, events)); +} +#endif // ENABLE_ASYNC_DNS + +bool LibuvEventPoll::deleteEvents(sock_t socket, + const LibuvEventPoll::KEvent& event) +{ + SharedHandle socketEntry(new KSocketEntry(socket)); + KSocketEntrySet::iterator i = socketEntries_.find(socketEntry); + if(i == socketEntries_.end()) { + A2_LOG_DEBUG(fmt("Socket %d is not found in SocketEntries.", socket)); + return false; + } + + event.removeSelf(*i); + + if ((*i)->eventEmpty()) { + KPolls::iterator poll = polls_.find(socket); + if (poll == polls_.end()) { + return false; + } + uv_poll_stop(poll->second); + uv_close((uv_handle_t*)poll->second, close_callback); + polls_.erase(poll); + socketEntries_.erase(i); + } + return true; +} + +#ifdef ENABLE_ASYNC_DNS +bool LibuvEventPoll::deleteEvents(sock_t socket, Command* command, + const SharedHandle& rs) +{ + return deleteEvents(socket, KADNSEvent(rs, command, socket, 0)); +} +#endif // ENABLE_ASYNC_DNS + +bool LibuvEventPoll::deleteEvents(sock_t socket, Command* command, + EventPoll::EventType events) +{ + int pollEvents = translateEvents(events); + return deleteEvents(socket, KCommandEvent(command, pollEvents)); +} + +#ifdef ENABLE_ASYNC_DNS +bool LibuvEventPoll::addNameResolver(const SharedHandle& resolver, + Command* command) +{ + SharedHandle entry( + new KAsyncNameResolverEntry(resolver, command)); + KAsyncNameResolverEntrySet::iterator itr = + nameResolverEntries_.lower_bound(entry); + if (itr != nameResolverEntries_.end() && *(*itr) == *entry) { + return false; + } + nameResolverEntries_.insert(itr, entry); + entry->addSocketEvents(this); + return true; +} + +bool LibuvEventPoll::deleteNameResolver(const SharedHandle& resolver, + Command* command) +{ + SharedHandle entry( + new KAsyncNameResolverEntry(resolver, command)); + KAsyncNameResolverEntrySet::iterator itr = nameResolverEntries_.find(entry); + if (itr == nameResolverEntries_.end()) { + return false; + } + (*itr)->removeSocketEvents(this); + nameResolverEntries_.erase(itr); + return true; +} +#endif // ENABLE_ASYNC_DNS + +} // namespace aria2 diff --git a/src/LibuvEventPoll.h b/src/LibuvEventPoll.h new file mode 100644 index 00000000..54ced26b --- /dev/null +++ b/src/LibuvEventPoll.h @@ -0,0 +1,133 @@ +/* */ +#ifndef D_LIBUV_EVENT_POLL_H +#define D_LIBUV_EVENT_POLL_H + +#include "EventPoll.h" + +#include +#include + +#include + +#include "Event.h" +#include "a2functional.h" + +#ifdef ENABLE_ASYNC_DNS +#include "AsyncNameResolver.h" +#endif // ENABLE_ASYNC_DNS + +namespace aria2 { + +class LibuvEventPoll : public EventPoll { +private: + class KSocketEntry; + + typedef Event KEvent; + typedef CommandEvent KCommandEvent; + typedef ADNSEvent KADNSEvent; + typedef AsyncNameResolverEntry KAsyncNameResolverEntry; + friend class AsyncNameResolverEntry; + + class KSocketEntry: + public SocketEntry { + public: + KSocketEntry(sock_t socket); + int getEvents(); + }; + + friend int accumulateEvent(int events, const KEvent& event); + +private: + uv_loop_t* loop; + + typedef std::set, + DerefLess > > KSocketEntrySet; + KSocketEntrySet socketEntries_; + + typedef std::map KPolls; + KPolls polls_; + +#ifdef ENABLE_ASYNC_DNS + typedef std::set, + DerefLess > > + KAsyncNameResolverEntrySet; + KAsyncNameResolverEntrySet nameResolverEntries_; +#endif // ENABLE_ASYNC_DNS + + bool addEvents(sock_t socket, const KEvent& event); + bool deleteEvents(sock_t socket, const KEvent& event); + +#ifdef ENABLE_ASYNC_DNS + bool addEvents(sock_t socket, Command* command, int events, + const SharedHandle& rs); + bool deleteEvents(sock_t socket, Command* command, + const SharedHandle& rs); +#endif + + static int translateEvents(EventPoll::EventType events); + static void poll_callback(uv_poll_t* handle, int status, int events); + +public: + LibuvEventPoll(); + + virtual ~LibuvEventPoll(); + + bool good() const { return loop; } + + virtual void poll(const struct timeval& tv); + + virtual bool addEvents(sock_t socket, + Command* command, EventPoll::EventType events); + + virtual bool deleteEvents(sock_t socket, + Command* command, EventPoll::EventType events); +#ifdef ENABLE_ASYNC_DNS + + virtual bool addNameResolver(const SharedHandle& resolver, + Command* command); + virtual bool deleteNameResolver + (const SharedHandle& resolver, Command* command); +#endif // ENABLE_ASYNC_DNS + + static const int IEV_READ = UV_READABLE; + static const int IEV_WRITE = UV_WRITABLE; + static const int IEV_ERROR = 128; + static const int IEV_HUP = 255; +}; + +} // namespace aria2 + +#endif // D_LIBUV_EVENT_POLL_H diff --git a/src/Makefile.am b/src/Makefile.am index 594d4c35..6333d73a 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -629,6 +629,10 @@ if HAVE_KQUEUE SRCS += KqueueEventPoll.cc KqueueEventPoll.h endif # HAVE_KQUEUE +if HAVE_LIBUV +SRCS += LibuvEventPoll.cc LibuvEventPoll.h +endif # HAVE_LIBUV + AR = @AR@ noinst_LIBRARIES = libaria2c.a libaria2c_a_SOURCES = $(SRCS) diff --git a/src/OptionHandlerFactory.cc b/src/OptionHandlerFactory.cc index 835d2388..6715b31a 100644 --- a/src/OptionHandlerFactory.cc +++ b/src/OptionHandlerFactory.cc @@ -329,6 +329,9 @@ std::vector OptionHandlerFactory::createOptionHandlers() } { std::string params[] = { +#ifdef HAVE_LIBUV + V_LIBUV, +#endif // HAVE_LIBUV #ifdef HAVE_EPOLL V_EPOLL, #endif // HAVE_EPOLL @@ -346,15 +349,17 @@ std::vector OptionHandlerFactory::createOptionHandlers() OptionHandler* op(new ParameterOptionHandler (PREF_EVENT_POLL, TEXT_EVENT_POLL, -#ifdef HAVE_EPOLL +#ifdef HAVE_LIBUV + V_LIBUV, +#elif defined(HAVE_EPOLL) V_EPOLL, -#elif HAVE_KQUEUE +#elif defined(HAVE_KQUEUE) V_KQUEUE, -#elif HAVE_PORT_ASSOCIATE +#elif defined(HAVE_PORT_ASSOCIATE) V_PORT, #else V_SELECT, -#endif // !HAVE_EPOLL +#endif // !HAVE_LIBUV std::vector (vbegin(params), vend(params)))); op->addTag(TAG_ADVANCED); diff --git a/src/SocketCore.cc b/src/SocketCore.cc index b7ed9d85..4fcf8ea0 100644 --- a/src/SocketCore.cc +++ b/src/SocketCore.cc @@ -1217,7 +1217,7 @@ const uint32_t APIPA_IPV4_END = 2852061183u; // 169.254.255.255 void checkAddrconfig() { -#ifdef __MINGW32__ +#ifdef HAVE_IPHLPAPI_H A2_LOG_INFO("Checking configured addresses"); ULONG bufsize = 15*1024; ULONG retval = 0; diff --git a/src/prefs.cc b/src/prefs.cc index c937b958..fc7c8e37 100644 --- a/src/prefs.cc +++ b/src/prefs.cc @@ -153,6 +153,7 @@ const std::string V_ERROR("error"); const std::string V_INORDER("inorder"); const std::string V_FEEDBACK("feedback"); const std::string V_ADAPTIVE("adaptive"); +const std::string V_LIBUV("libuv"); const std::string V_EPOLL("epoll"); const std::string V_KQUEUE("kqueue"); const std::string V_PORT("port"); diff --git a/src/prefs.h b/src/prefs.h index 92c699a2..f62bfd7c 100644 --- a/src/prefs.h +++ b/src/prefs.h @@ -88,6 +88,7 @@ extern const std::string V_ERROR; extern const std::string V_INORDER; extern const std::string V_FEEDBACK; extern const std::string V_ADAPTIVE; +extern const std::string V_LIBUV; extern const std::string V_EPOLL; extern const std::string V_KQUEUE; extern const std::string V_PORT; From 1cd5dcc9b692ef9c32bcd5aaada54a9c80636595 Mon Sep 17 00:00:00 2001 From: Nils Maier Date: Wed, 10 Apr 2013 21:54:34 +0200 Subject: [PATCH 2/6] LibUV: Correct event removal --- src/LibuvEventPoll.cc | 84 +++++++++++++++++++++++++++---------------- src/LibuvEventPoll.h | 22 +++++++++--- src/OptionParser.cc | 2 +- 3 files changed, 73 insertions(+), 35 deletions(-) diff --git a/src/LibuvEventPoll.cc b/src/LibuvEventPoll.cc index 89fa326d..c2e89709 100644 --- a/src/LibuvEventPoll.cc +++ b/src/LibuvEventPoll.cc @@ -58,13 +58,17 @@ #include "util.h" namespace { + using namespace aria2; template static void close_callback(uv_handle_t* handle) { delete reinterpret_cast(handle); } - static void timer_callback(uv_timer_t* handle, int status) {} + static void timer_callback(uv_timer_t* handle, int status) + { + uv_stop(handle->loop); + } } namespace aria2 { @@ -96,21 +100,21 @@ int LibuvEventPoll::KSocketEntry::getEvents() LibuvEventPoll::LibuvEventPoll() { - loop = uv_loop_new(); + loop_ = uv_loop_new(); } LibuvEventPoll::~LibuvEventPoll() { for (KPolls::iterator i = polls_.begin(), e = polls_.end(); i != e; ++i) { - uv_poll_stop(i->second); - uv_close((uv_handle_t*)i->second, close_callback); + uv_poll_stop(&i->second->p); + uv_close((uv_handle_t*)&i->second->p, close_poll_callback); } // Actually kill the polls, and timers, if any - uv_run(loop, (uv_run_mode)(UV_RUN_ONCE | UV_RUN_NOWAIT)); + uv_run(loop_, (uv_run_mode)(UV_RUN_ONCE | UV_RUN_NOWAIT)); - if (loop) { - uv_loop_delete(loop); - loop = 0; + if (loop_) { + uv_loop_delete(loop_); + loop_ = 0; } polls_.clear(); } @@ -119,19 +123,19 @@ void LibuvEventPoll::poll(const struct timeval& tv) { int timeout = tv.tv_sec * 1000 + tv.tv_usec / 1000; - if (timeout > 0) { + if (timeout >= 0) { uv_timer_t* timer = new uv_timer_t; - uv_timer_init(loop, timer); + uv_timer_init(loop_, timer); uv_timer_start(timer, timer_callback, timeout, timeout); - uv_run(loop, UV_RUN_ONCE); + uv_run(loop_, UV_RUN_DEFAULT); // Remove timer again. uv_timer_stop(timer); uv_close((uv_handle_t*)timer, close_callback); } else { - uv_run(loop, (uv_run_mode)(UV_RUN_ONCE | UV_RUN_NOWAIT)); + while (uv_run(loop_, (uv_run_mode)(UV_RUN_ONCE | UV_RUN_NOWAIT)) > 0) {} } #ifdef ENABLE_ASYNC_DNS @@ -169,13 +173,14 @@ int LibuvEventPoll::translateEvents(EventPoll::EventType events) return newEvents; } -void LibuvEventPoll::poll_callback(uv_poll_t* handle, int status, int events) +void LibuvEventPoll::pollCallback(uv_poll_t* handle, poll_t* poll, int status, + int events) { if (status == -1) { - events = 0; - uv_err_t err = uv_last_error(handle->loop); + uv_err_t err = uv_last_error(loop_); switch (err.code) { case UV_EAGAIN: + case UV_EINTR: return; case UV_EOF: case UV_ECONNABORTED: @@ -185,12 +190,22 @@ void LibuvEventPoll::poll_callback(uv_poll_t* handle, int status, int events) case UV_EPIPE: case UV_ESHUTDOWN: events = IEV_HUP; + poll->entry->processEvents(events); + uv_poll_stop(handle); + uv_stop(loop_); + return; default: events = IEV_ERROR; + poll->entry->processEvents(events); + uv_poll_stop(handle); + uv_stop(loop_); + return; } } - KSocketEntry *p = reinterpret_cast(handle->data); - p->processEvents(events); + + // Got something + poll->entry->processEvents(events); + uv_stop(loop_); } bool LibuvEventPoll::addEvents(sock_t socket, @@ -204,18 +219,22 @@ bool LibuvEventPoll::addEvents(sock_t socket, if (poll == polls_.end()) { throw std::logic_error("Invalid socket"); } - uv_poll_start(poll->second, (*i)->getEvents() & (IEV_READ | IEV_WRITE), - poll_callback); + poll->second->events = (*i)->getEvents(); + uv_poll_start(&poll->second->p, + poll->second->events & (IEV_READ | IEV_WRITE), poll_callback); } else { socketEntries_.insert(i, socketEntry); event.addSelf(socketEntry); - uv_poll_t *poll = new uv_poll_t; - uv_poll_init_socket(loop, poll, socket); - poll->data = socketEntry.get(); - uv_poll_start(poll, socketEntry->getEvents() & (IEV_READ | IEV_WRITE), - poll_callback); + poll_t *poll = new poll_t; + uv_poll_init_socket(loop_, &poll->p, socket); + poll->entry = socketEntry.get(); + poll->eventer = this; + poll->events = socketEntry->getEvents(); + poll->p.data = poll; polls_[socket] = poll; + uv_poll_start(&poll->p, poll->events & (IEV_READ | IEV_WRITE), + poll_callback); } return true; } @@ -246,16 +265,21 @@ bool LibuvEventPoll::deleteEvents(sock_t socket, event.removeSelf(*i); + KPolls::iterator poll = polls_.find(socket); + if (poll == polls_.end()) { + return false; + } if ((*i)->eventEmpty()) { - KPolls::iterator poll = polls_.find(socket); - if (poll == polls_.end()) { - return false; - } - uv_poll_stop(poll->second); - uv_close((uv_handle_t*)poll->second, close_callback); + uv_poll_stop(&poll->second->p); + uv_close((uv_handle_t*)&poll->second->p, close_poll_callback); polls_.erase(poll); socketEntries_.erase(i); } + else { + poll->second->events = (*i)->getEvents(); + uv_poll_start(&poll->second->p, + poll->second->events & (IEV_READ | IEV_WRITE), poll_callback); + } return true; } diff --git a/src/LibuvEventPoll.h b/src/LibuvEventPoll.h index 54ced26b..330f1b60 100644 --- a/src/LibuvEventPoll.h +++ b/src/LibuvEventPoll.h @@ -71,13 +71,20 @@ private: friend int accumulateEvent(int events, const KEvent& event); private: - uv_loop_t* loop; + uv_loop_t* loop_; typedef std::set, DerefLess > > KSocketEntrySet; KSocketEntrySet socketEntries_; - typedef std::map KPolls; + typedef struct { + uv_poll_t p; + KSocketEntry *entry; + LibuvEventPoll *eventer; + int events; + } poll_t; + + typedef std::map KPolls; KPolls polls_; #ifdef ENABLE_ASYNC_DNS @@ -98,14 +105,21 @@ private: #endif static int translateEvents(EventPoll::EventType events); - static void poll_callback(uv_poll_t* handle, int status, int events); + static void close_poll_callback(uv_handle_t* handle) { + delete static_cast(handle->data); + } + static void poll_callback(uv_poll_t* handle, int status, int events) { + poll_t* poll = static_cast(handle->data); + poll->eventer->pollCallback(handle, poll, status, events); + } + void pollCallback(uv_poll_t* handle, poll_t *poll, int status, int events); public: LibuvEventPoll(); virtual ~LibuvEventPoll(); - bool good() const { return loop; } + bool good() const { return loop_; } virtual void poll(const struct timeval& tv); diff --git a/src/OptionParser.cc b/src/OptionParser.cc index 8aed0259..96700cff 100644 --- a/src/OptionParser.cc +++ b/src/OptionParser.cc @@ -240,7 +240,7 @@ void OptionParser::parse(Option& option, std::istream& is) const if(handler) { handler->parse(option, std::string(nv.second.first, nv.second.second)); } else { - A2_LOG_WARN(fmt("Unknown option: %s", line.c_str())); + //A2_LOG_WARN(fmt("Unknown option: %s", line.c_str())); } } } From 539fda0b4fbfc0a343788b16bdbfece93d9bae43 Mon Sep 17 00:00:00 2001 From: Nils Maier Date: Thu, 11 Apr 2013 03:05:14 +0200 Subject: [PATCH 3/6] LibUV: Code cleanup --- src/LibuvEventPoll.cc | 71 ++++++++++++++++------------------ src/LibuvEventPoll.h | 90 +++++++++++++++++++++++++++---------------- 2 files changed, 90 insertions(+), 71 deletions(-) diff --git a/src/LibuvEventPoll.cc b/src/LibuvEventPoll.cc index c2e89709..2509357f 100644 --- a/src/LibuvEventPoll.cc +++ b/src/LibuvEventPoll.cc @@ -59,6 +59,7 @@ namespace { using namespace aria2; + template static void close_callback(uv_handle_t* handle) { @@ -77,12 +78,12 @@ LibuvEventPoll::KSocketEntry::KSocketEntry(sock_t s) : SocketEntry(s) {} -int accumulateEvent(int events, const LibuvEventPoll::KEvent& event) +inline int accumulateEvent(int events, const LibuvEventPoll::KEvent& event) { - return events|event.getEvents(); + return events | event.getEvents(); } -int LibuvEventPoll::KSocketEntry::getEvents() +int LibuvEventPoll::KSocketEntry::getEvents() const { int events = 0; #ifdef ENABLE_ASYNC_DNS @@ -95,6 +96,7 @@ int LibuvEventPoll::KSocketEntry::getEvents() events = std::accumulate(commandEvents_.begin(), commandEvents_.end(), 0, accumulateEvent); #endif // !ENABLE_ASYNC_DNS + return events; } @@ -106,23 +108,25 @@ LibuvEventPoll::LibuvEventPoll() LibuvEventPoll::~LibuvEventPoll() { for (KPolls::iterator i = polls_.begin(), e = polls_.end(); i != e; ++i) { - uv_poll_stop(&i->second->p); - uv_close((uv_handle_t*)&i->second->p, close_poll_callback); + i->second->close(); } - // Actually kill the polls, and timers, if any + // Actually kill the polls, and timers, if any. uv_run(loop_, (uv_run_mode)(UV_RUN_ONCE | UV_RUN_NOWAIT)); if (loop_) { uv_loop_delete(loop_); loop_ = 0; } + + // Need this to free only after the loop is gone. polls_.clear(); } void LibuvEventPoll::poll(const struct timeval& tv) { - int timeout = tv.tv_sec * 1000 + tv.tv_usec / 1000; + const int timeout = tv.tv_sec * 1000 + tv.tv_usec / 1000; + // timeout == 0 will tick once if (timeout >= 0) { uv_timer_t* timer = new uv_timer_t; uv_timer_init(loop_, timer); @@ -173,8 +177,7 @@ int LibuvEventPoll::translateEvents(EventPoll::EventType events) return newEvents; } -void LibuvEventPoll::pollCallback(uv_poll_t* handle, poll_t* poll, int status, - int events) +void LibuvEventPoll::pollCallback(KPoll* poll, int status, int events) { if (status == -1) { uv_err_t err = uv_last_error(loop_); @@ -190,21 +193,21 @@ void LibuvEventPoll::pollCallback(uv_poll_t* handle, poll_t* poll, int status, case UV_EPIPE: case UV_ESHUTDOWN: events = IEV_HUP; - poll->entry->processEvents(events); - uv_poll_stop(handle); + poll->processEvents(events); + poll->stop(); uv_stop(loop_); return; default: events = IEV_ERROR; - poll->entry->processEvents(events); - uv_poll_stop(handle); + poll->processEvents(events); + poll->stop(); uv_stop(loop_); return; } } // Got something - poll->entry->processEvents(events); + poll->processEvents(events); uv_stop(loop_); } @@ -213,29 +216,22 @@ bool LibuvEventPoll::addEvents(sock_t socket, { SharedHandle socketEntry(new KSocketEntry(socket)); KSocketEntrySet::iterator i = socketEntries_.lower_bound(socketEntry); + if (i != socketEntries_.end() && **i == *socketEntry) { event.addSelf(*i); KPolls::iterator poll = polls_.find(socket); if (poll == polls_.end()) { throw std::logic_error("Invalid socket"); } - poll->second->events = (*i)->getEvents(); - uv_poll_start(&poll->second->p, - poll->second->events & (IEV_READ | IEV_WRITE), poll_callback); - } - else { - socketEntries_.insert(i, socketEntry); - event.addSelf(socketEntry); - poll_t *poll = new poll_t; - uv_poll_init_socket(loop_, &poll->p, socket); - poll->entry = socketEntry.get(); - poll->eventer = this; - poll->events = socketEntry->getEvents(); - poll->p.data = poll; - polls_[socket] = poll; - uv_poll_start(&poll->p, poll->events & (IEV_READ | IEV_WRITE), - poll_callback); + poll->second->start(); + return true; } + + socketEntries_.insert(i, socketEntry); + event.addSelf(socketEntry); + KPoll* poll = new KPoll(this, socketEntry.get(), socket); + polls_[socket] = poll; + poll->start(); return true; } @@ -258,7 +254,8 @@ bool LibuvEventPoll::deleteEvents(sock_t socket, { SharedHandle socketEntry(new KSocketEntry(socket)); KSocketEntrySet::iterator i = socketEntries_.find(socketEntry); - if(i == socketEntries_.end()) { + + if (i == socketEntries_.end()) { A2_LOG_DEBUG(fmt("Socket %d is not found in SocketEntries.", socket)); return false; } @@ -269,17 +266,15 @@ bool LibuvEventPoll::deleteEvents(sock_t socket, if (poll == polls_.end()) { return false; } + if ((*i)->eventEmpty()) { - uv_poll_stop(&poll->second->p); - uv_close((uv_handle_t*)&poll->second->p, close_poll_callback); + poll->second->close(); polls_.erase(poll); socketEntries_.erase(i); + return true; } - else { - poll->second->events = (*i)->getEvents(); - uv_poll_start(&poll->second->p, - poll->second->events & (IEV_READ | IEV_WRITE), poll_callback); - } + + poll->second->start(); return true; } diff --git a/src/LibuvEventPoll.h b/src/LibuvEventPoll.h index 330f1b60..5c058737 100644 --- a/src/LibuvEventPoll.h +++ b/src/LibuvEventPoll.h @@ -59,43 +59,74 @@ private: typedef CommandEvent KCommandEvent; typedef ADNSEvent KADNSEvent; typedef AsyncNameResolverEntry KAsyncNameResolverEntry; + friend class AsyncNameResolverEntry; - - class KSocketEntry: - public SocketEntry { - public: - KSocketEntry(sock_t socket); - int getEvents(); - }; - friend int accumulateEvent(int events, const KEvent& event); -private: - uv_loop_t* loop_; + class KSocketEntry: public SocketEntry { + public: + KSocketEntry(sock_t socket); + int getEvents() const; + }; + + class KPoll { + private: + LibuvEventPoll *eventer_; + KSocketEntry *entry_; + uv_poll_t handle_; + + static void poll_callback(uv_poll_t* handle, int status, int events) { + KPoll* poll = static_cast(handle->data); + poll->eventer_->pollCallback(poll, status, events); + } + static void close_callback(uv_handle_t* handle) { + delete static_cast(handle->data); + } + + public: + inline KPoll(LibuvEventPoll* eventer, KSocketEntry* entry, sock_t sock) + : eventer_(eventer), entry_(entry) + { + uv_poll_init_socket(eventer->loop_, &handle_, sock); + handle_.data = this; + } + inline void start() { + uv_poll_start(&handle_, entry_->getEvents() & IEV_RW, poll_callback); + } + inline void stop() { + uv_poll_stop(&handle_); + } + inline void processEvents(int events) { + entry_->processEvents(events); + } + inline void close() { + stop(); + uv_close((uv_handle_t*)&handle_, close_callback); + } + }; typedef std::set, DerefLess > > KSocketEntrySet; - KSocketEntrySet socketEntries_; - typedef struct { - uv_poll_t p; - KSocketEntry *entry; - LibuvEventPoll *eventer; - int events; - } poll_t; - - typedef std::map KPolls; - KPolls polls_; + typedef std::map KPolls; #ifdef ENABLE_ASYNC_DNS typedef std::set, DerefLess > > KAsyncNameResolverEntrySet; +#endif // ENABLE_ASYNC_DNS + + uv_loop_t* loop_; + KSocketEntrySet socketEntries_; + KPolls polls_; + +#ifdef ENABLE_ASYNC_DNS KAsyncNameResolverEntrySet nameResolverEntries_; #endif // ENABLE_ASYNC_DNS bool addEvents(sock_t socket, const KEvent& event); bool deleteEvents(sock_t socket, const KEvent& event); + void pollCallback(KPoll *poll, int status, int events); #ifdef ENABLE_ASYNC_DNS bool addEvents(sock_t socket, Command* command, int events, @@ -105,18 +136,9 @@ private: #endif static int translateEvents(EventPoll::EventType events); - static void close_poll_callback(uv_handle_t* handle) { - delete static_cast(handle->data); - } - static void poll_callback(uv_poll_t* handle, int status, int events) { - poll_t* poll = static_cast(handle->data); - poll->eventer->pollCallback(handle, poll, status, events); - } - void pollCallback(uv_poll_t* handle, poll_t *poll, int status, int events); public: LibuvEventPoll(); - virtual ~LibuvEventPoll(); bool good() const { return loop_; } @@ -125,19 +147,21 @@ public: virtual bool addEvents(sock_t socket, Command* command, EventPoll::EventType events); - virtual bool deleteEvents(sock_t socket, Command* command, EventPoll::EventType events); -#ifdef ENABLE_ASYNC_DNS +#ifdef ENABLE_ASYNC_DNS virtual bool addNameResolver(const SharedHandle& resolver, Command* command); - virtual bool deleteNameResolver - (const SharedHandle& resolver, Command* command); + virtual bool deleteNameResolver( + const SharedHandle& resolver, Command* command); #endif // ENABLE_ASYNC_DNS static const int IEV_READ = UV_READABLE; static const int IEV_WRITE = UV_WRITABLE; + static const int IEV_RW = UV_READABLE | UV_WRITABLE; + + // Make sure these do not interfere with the uv_poll API later. static const int IEV_ERROR = 128; static const int IEV_HUP = 255; }; From 0cccc2d27ad90dae6294e260ba902a5e3afcd0c3 Mon Sep 17 00:00:00 2001 From: Nils Maier Date: Wed, 17 Apr 2013 15:43:25 +0200 Subject: [PATCH 4/6] Libuv: Proper configure detection --- configure.ac | 78 +++++++++++++++++++++++++++++++++++++--------------- 1 file changed, 56 insertions(+), 22 deletions(-) diff --git a/configure.ac b/configure.ac index ac279fa6..ac31d91f 100644 --- a/configure.ac +++ b/configure.ac @@ -112,29 +112,63 @@ if test "x$with_libz" = "xyes"; then fi if test "x$with_libuv" = "xyes"; then - if test "x$win_build" = "xyes"; then - old_CPPFLAGS=$CPPFLAGS - CPPFLAGS="$CPPFLAGS -D_WIN32_WINNT=0x0600" - AC_SEARCH_LIBS([uv_poll_init_socket], [uv], [ - AC_CHECK_HEADER([uv.h], [have_libuv=yes], [have_libuv=no]) - break; - ], [have_libuv=no]) - if test "x$have_libuv" = "xyes"; then - AC_DEFINE([HAVE_LIBUV], [1], [Define to 1 if you have libuv.]) - else - CPPFLAGS=$old_CPPFLAGS - fi - else - AC_SEARCH_LIBS([uv_poll_init_socket], [uv], [ - AC_CHECK_HEADER([uv.h], [have_libuv=yes], [have_libuv=no]) - break; - ], [have_libuv=no]) - if test "x$have_libuv" = "xyes"; then - AC_DEFINE([HAVE_LIBUV], [1], [Define to 1 if you have libuv.]) - fi - fi + case "$host" in + *mingw*|*msvc*) + old_CPPFLAGS=$CPPFLAGS + CPPFLAGS="$CPPFLAGS -D_WIN32_WINNT=0x0600" + AC_SEARCH_LIBS([uv_poll_init_socket], [uv], [ + AC_CHECK_HEADER([uv.h], [have_libuv=yes], [have_libuv=no]) + break; + ], [have_libuv=no]) + if test "x$have_libuv" != "xyes"; then + CPPFLAGS=$old_CPPFLAGS + fi + ;; - if test "x$with_libuv_requested" = "xyes"; then + *darwin*) + old_LDFLAGS=$LDFLAGS + LDFLAGS="$LDFLAGS -framework Foundation -framework CoreServices -framework ApplicationServices" + old_LIBS=$LIBS + LIBS="$LIBS -lm" + AC_SEARCH_LIBS([uv_poll_init_socket], [uv], [ + AC_CHECK_HEADER([uv.h], [have_libuv=yes], [have_libuv=no]) + break; + ], [have_libuv=no]) + if test "x$have_libuv" != "xyes"; then + LDFLAGS=$old_LDFLAGS + LIBS=$old_LIBS + fi + ;; + + *) + dnl Yeah, sucks that luv does not bring a pkg-config or config-tool + AC_MSG_CHECKING([for libuv]) + for combo in "" "-lrt" "-ldl -lrt" "-ldl -lrt -lpthread" "-lkvm"; do + old_LIBS=$LIBS + LIBS="-luv $combo $LIBS -lm" + AC_LINK_IFELSE([AC_LANG_SOURCE([ +extern "C" int uv_poll_init_socket(void); +int main() { return uv_poll_init_socket(); } + ])], [ + AC_MSG_RESULT(-luv $combo -lm) + AC_CHECK_HEADER([uv.h], [have_libuv=yes], [have_libuv=no]) + break; + ], [have_libuv=no]) + if test "x$have_libuv" = "xyes"; then + break; + else + LIBS=$old_LIBS + fi + done + if test "x$have_libuv" != "xyes"; then + AC_MSG_RESULT("no") + fi + ;; + esac + + if test "x$have_libuv" = "xyes"; then + AC_DEFINE([HAVE_LIBUV], [1], [Define to 1 if you have libuv.]) + elif test "x$with_libuv_requested" = "xyes"; then ARIA2_DEP_NOT_MET([libuv]) fi fi From e700ebd3e96638bb0e02dbec17010b13670c6486 Mon Sep 17 00:00:00 2001 From: Nils Maier Date: Sun, 28 Apr 2013 21:44:06 +0200 Subject: [PATCH 5/6] LibUV: Reorder event poll preference *nix will keep epoll/kqueue/port as the default (when available), while Windows, lacking all of these, will default to libuv (when available) --- src/OptionHandlerFactory.cc | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/src/OptionHandlerFactory.cc b/src/OptionHandlerFactory.cc index 6715b31a..83663cce 100644 --- a/src/OptionHandlerFactory.cc +++ b/src/OptionHandlerFactory.cc @@ -329,9 +329,6 @@ std::vector OptionHandlerFactory::createOptionHandlers() } { std::string params[] = { -#ifdef HAVE_LIBUV - V_LIBUV, -#endif // HAVE_LIBUV #ifdef HAVE_EPOLL V_EPOLL, #endif // HAVE_EPOLL @@ -341,6 +338,9 @@ std::vector OptionHandlerFactory::createOptionHandlers() #ifdef HAVE_PORT_ASSOCIATE V_PORT, #endif // HAVE_PORT_ASSOCIATE +#ifdef HAVE_LIBUV + V_LIBUV, +#endif // HAVE_LIBUV #ifdef HAVE_POLL V_POLL, #endif // HAVE_POLL @@ -349,17 +349,19 @@ std::vector OptionHandlerFactory::createOptionHandlers() OptionHandler* op(new ParameterOptionHandler (PREF_EVENT_POLL, TEXT_EVENT_POLL, -#ifdef HAVE_LIBUV - V_LIBUV, -#elif defined(HAVE_EPOLL) +#if defined(HAVE_EPOLL) V_EPOLL, #elif defined(HAVE_KQUEUE) V_KQUEUE, #elif defined(HAVE_PORT_ASSOCIATE) V_PORT, -#else +#elif defined(HAVE_LIBUV) + V_LIBUV, +#elif defined(HAVE_POLL) + V_POLL, +#else // defined(HAVE_EPOLL) V_SELECT, -#endif // !HAVE_LIBUV +#endif // defined(HAVE_EPOLL) std::vector (vbegin(params), vend(params)))); op->addTag(TAG_ADVANCED); From a1a3e21f73c8c9b1f2941da023c4b66344b54684 Mon Sep 17 00:00:00 2001 From: Nils Maier Date: Sun, 28 Apr 2013 21:54:35 +0200 Subject: [PATCH 6/6] LibUV: Revert unrelated change --- src/OptionParser.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/OptionParser.cc b/src/OptionParser.cc index 96700cff..8aed0259 100644 --- a/src/OptionParser.cc +++ b/src/OptionParser.cc @@ -240,7 +240,7 @@ void OptionParser::parse(Option& option, std::istream& is) const if(handler) { handler->parse(option, std::string(nv.second.first, nv.second.second)); } else { - //A2_LOG_WARN(fmt("Unknown option: %s", line.c_str())); + A2_LOG_WARN(fmt("Unknown option: %s", line.c_str())); } } }