/* */ #ifdef __MINGW32__ #ifdef _WIN32_WINNT #undef _WIN32_WINNT #endif // _WIN32_WINNT #define _WIN32_WINNT 0x0600 #endif // __MINGW32__ #include "LibuvEventPoll.h" #include #include #include #include #include #include "Command.h" #include "LogFactory.h" #include "Logger.h" #include "a2functional.h" #include "fmt.h" #include "util.h" namespace { using namespace aria2; template static void close_callback(uv_handle_t* handle) { delete reinterpret_cast(handle); } #if !defined(UV_VERSION_MINOR) || UV_VERSION_MINOR <= 10 static void timer_callback(uv_timer_t* handle, int status) { uv_stop(handle->loop); } #else // !defined(UV_VERSION_MINOR) || UV_VERSION_MINOR <= 10 static void timer_callback(uv_timer_t* handle) { uv_stop(handle->loop); } #endif // !defined(UV_VERSION_MINOR) || UV_VERSION_MINOR <= 10 } namespace aria2 { LibuvEventPoll::KSocketEntry::KSocketEntry(sock_t s) : SocketEntry(s) {} inline int accumulateEvent(int events, const LibuvEventPoll::KEvent& event) { return events | event.getEvents(); } int LibuvEventPoll::KSocketEntry::getEvents() const { int events = 0; #ifdef ENABLE_ASYNC_DNS events = std::accumulate(adnsEvents_.begin(), adnsEvents_.end(), std::accumulate(commandEvents_.begin(), commandEvents_.end(), 0, accumulateEvent), accumulateEvent); #else // !ENABLE_ASYNC_DNS events = std::accumulate(commandEvents_.begin(), commandEvents_.end(), 0, accumulateEvent); #endif // !ENABLE_ASYNC_DNS return events; } LibuvEventPoll::LibuvEventPoll() { loop_ = uv_loop_new(); } LibuvEventPoll::~LibuvEventPoll() { for (auto& p: polls_) { p.second->close(); } // Actually kill the polls, and timers, if any. uv_run(loop_, (uv_run_mode)(UV_RUN_ONCE | UV_RUN_NOWAIT)); if (loop_) { uv_loop_delete(loop_); loop_ = nullptr; } // Need this to free only after the loop is gone. polls_.clear(); } void LibuvEventPoll::poll(const struct timeval& tv) { const int timeout = tv.tv_sec * 1000 + tv.tv_usec / 1000; // timeout == 0 will tick once if (timeout >= 0) { auto timer = new uv_timer_t; uv_timer_init(loop_, timer); uv_timer_start(timer, timer_callback, timeout, timeout); uv_run(loop_, UV_RUN_DEFAULT); // Remove timer again. uv_timer_stop(timer); uv_close((uv_handle_t*)timer, close_callback); } else { while (uv_run(loop_, (uv_run_mode)(UV_RUN_ONCE | UV_RUN_NOWAIT)) > 0) {} } #ifdef ENABLE_ASYNC_DNS // It turns out that we have to call ares_process_fd before ares's // own timeout and ares may create new sockets or closes socket in // their API. So we call ares_process_fd for all ares_channel and // re-register their sockets. for (auto& r: nameResolverEntries_) { r->processTimeout(); r->removeSocketEvents(this); r->addSocketEvents(this); } #endif // ENABLE_ASYNC_DNS // TODO timeout of name resolver is determined in Command(AbstractCommand, // DHTEntryPoint...Command) } int LibuvEventPoll::translateEvents(EventPoll::EventType events) { int newEvents = 0; if (EventPoll::EVENT_READ & events) { newEvents |= IEV_READ; } if (EventPoll::EVENT_WRITE & events) { newEvents |= IEV_WRITE; } if (EventPoll::EVENT_ERROR & events) { newEvents |= IEV_ERROR; } if (EventPoll::EVENT_HUP & events) { newEvents |= IEV_HUP; } return newEvents; } void LibuvEventPoll::pollCallback(KPoll* poll, int status, int events) { #if HAVE_UV_LAST_ERROR if (status == -1) { uv_err_t err = uv_last_error(loop_); switch (err.code) { #else if (status < 0) { switch (status) { #endif case UV_EAGAIN: case UV_EINTR: return; case UV_EOF: case UV_ECONNABORTED: case UV_ECONNREFUSED: case UV_ECONNRESET: case UV_ENOTCONN: case UV_EPIPE: case UV_ESHUTDOWN: events = IEV_HUP; poll->processEvents(events); poll->stop(); uv_stop(loop_); return; default: events = IEV_ERROR; poll->processEvents(events); poll->stop(); uv_stop(loop_); return; } } // Got something poll->processEvents(events); uv_stop(loop_); } bool LibuvEventPoll::addEvents(sock_t socket, const LibuvEventPoll::KEvent& event) { auto socketEntry = std::make_shared(socket); auto i = socketEntries_.lower_bound(socketEntry); if (i != socketEntries_.end() && **i == *socketEntry) { event.addSelf((*i).get()); auto poll = polls_.find(socket); if (poll == polls_.end()) { throw std::logic_error("Invalid socket"); } poll->second->start(); return true; } socketEntries_.insert(i, socketEntry); event.addSelf(socketEntry.get()); auto poll = new KPoll(this, socketEntry.get(), socket); polls_[socket] = poll; poll->start(); return true; } bool LibuvEventPoll::addEvents(sock_t socket, Command* command, EventPoll::EventType events) { int pollEvents = translateEvents(events); return addEvents(socket, KCommandEvent(command, pollEvents)); } #ifdef ENABLE_ASYNC_DNS bool LibuvEventPoll::addEvents(sock_t socket, Command* command, int events, const std::shared_ptr& rs) { return addEvents(socket, KADNSEvent(rs, command, socket, events)); } #endif // ENABLE_ASYNC_DNS bool LibuvEventPoll::deleteEvents(sock_t socket, const LibuvEventPoll::KEvent& event) { auto socketEntry = std::make_shared(socket); auto i = socketEntries_.find(socketEntry); if (i == socketEntries_.end()) { A2_LOG_DEBUG(fmt("Socket %d is not found in SocketEntries.", socket)); return false; } event.removeSelf((*i).get()); auto poll = polls_.find(socket); if (poll == polls_.end()) { return false; } if ((*i)->eventEmpty()) { poll->second->close(); polls_.erase(poll); socketEntries_.erase(i); return true; } poll->second->start(); return true; } #ifdef ENABLE_ASYNC_DNS bool LibuvEventPoll::deleteEvents(sock_t socket, Command* command, const std::shared_ptr& rs) { return deleteEvents(socket, KADNSEvent(rs, command, socket, 0)); } #endif // ENABLE_ASYNC_DNS bool LibuvEventPoll::deleteEvents(sock_t socket, Command* command, EventPoll::EventType events) { int pollEvents = translateEvents(events); return deleteEvents(socket, KCommandEvent(command, pollEvents)); } #ifdef ENABLE_ASYNC_DNS bool LibuvEventPoll::addNameResolver(const std::shared_ptr& resolver, Command* command) { auto entry = std::make_shared(resolver, command); auto itr = nameResolverEntries_.lower_bound(entry); if (itr != nameResolverEntries_.end() && *(*itr) == *entry) { return false; } nameResolverEntries_.insert(itr, entry); entry->addSocketEvents(this); return true; } bool LibuvEventPoll::deleteNameResolver(const std::shared_ptr& resolver, Command* command) { auto entry = std::make_shared(resolver, command); auto itr = nameResolverEntries_.find(entry); if (itr == nameResolverEntries_.end()) { return false; } (*itr)->removeSocketEvents(this); nameResolverEntries_.erase(itr); return true; } #endif // ENABLE_ASYNC_DNS } // namespace aria2