/* */
#include "KqueueEventPoll.h"

#include <cerrno>
#include <cstring>
#include <algorithm>
#include <numeric>

#include "Command.h"
#include "LogFactory.h"
#include "Logger.h"
#include "util.h"
#include "fmt.h"

// The udata member of struct kevent is an integer on some platforms and a
// pointer on others; PTR_TO_UDATA converts a pointer to whichever is needed.
#ifdef KEVENT_UDATA_INTPTR_T
#  define PTR_TO_UDATA(X) (reinterpret_cast<intptr_t>(X))
#else // !KEVENT_UDATA_INTPTR_T
#  define PTR_TO_UDATA(X) (X)
#endif // !KEVENT_UDATA_INTPTR_T

namespace aria2 {

// Creates an entry for socket |s| with no events registered yet.
KqueueEventPoll::KSocketEntry::KSocketEntry(sock_t s) : SocketEntry(s) {}

// Binary operation for std::accumulate: ORs the event mask of |event|
// into the running mask |events|.
int accumulateEvent(int events, const KqueueEventPoll::KEvent& event)
{
  return events | event.getEvents();
}

// Builds the kevent change records describing this socket's current
// interest set.
//
// The masks of all registered command events (and, when built with
// ENABLE_ASYNC_DNS, all async DNS events) are OR-ed together.
// eventlist[0] receives the EVFILT_READ record and eventlist[1] the
// EVFILT_WRITE record; each is EV_ADD combined with EV_ENABLE when the
// corresponding IEV_* bit is set and EV_DISABLE otherwise. udata of both
// records points to this entry so poll() can find it again.
//
// |eventlist| must have room for at least 2 elements. Returns the
// number of records written, which is always 2.
size_t KqueueEventPoll::KSocketEntry::getEvents(struct kevent* eventlist)
{
  int events =
      std::accumulate(commandEvents_.begin(), commandEvents_.end(), 0,
                      accumulateEvent);
#ifdef ENABLE_ASYNC_DNS
  events = std::accumulate(adnsEvents_.begin(), adnsEvents_.end(), events,
                           accumulateEvent);
#endif // ENABLE_ASYNC_DNS

  const bool wantRead = (events & KqueueEventPoll::IEV_READ) != 0;
  const bool wantWrite = (events & KqueueEventPoll::IEV_WRITE) != 0;

  EV_SET(&eventlist[0], socket_, EVFILT_READ,
         EV_ADD | (wantRead ? EV_ENABLE : EV_DISABLE), 0, 0,
         PTR_TO_UDATA(this));
  EV_SET(&eventlist[1], socket_, EVFILT_WRITE,
         EV_ADD | (wantWrite ? EV_ENABLE : EV_DISABLE), 0, 0,
         PTR_TO_UDATA(this));
  return 2;
}

// Allocates the initial result buffer (KQUEUE_EVENTS_MAX elements) and
// opens the kqueue descriptor. The result of kqueue() is stored as is;
// callers check good() to find out whether it succeeded.
KqueueEventPoll::KqueueEventPoll()
    : kqEventsSize_(KQUEUE_EVENTS_MAX),
      kqEvents_(new struct kevent[kqEventsSize_])
{
  kqfd_ = kqueue();
}

// Closes the kqueue descriptor (logging, but otherwise ignoring, a
// failure to do so) and releases the result buffer.
KqueueEventPoll::~KqueueEventPoll()
{
  if (kqfd_ != -1) {
    int r = close(kqfd_);
    // Capture errno right away, before any other call can change it.
    int errNum = errno;
    if (r == -1) {
      A2_LOG_ERROR(fmt("Error occurred while closing kqueue file descriptor"
                       " %d: %s",
                       kqfd_, util::safeStrerror(errNum).c_str()));
    }
  }
  delete[] kqEvents_;
}

// Returns true if the kqueue descriptor is not -1.
bool KqueueEventPoll::good() const { return kqfd_ != -1; }

// Waits up to |tv| for events on the registered sockets and dispatches
// each one to its KSocketEntry.
//
// kevent() is retried for as long as it fails with EINTR. Any other
// failure is logged at INFO level and otherwise ignored.
void KqueueEventPoll::poll(const struct timeval& tv)
{
  // struct timespec counts nanoseconds where struct timeval counts
  // microseconds.
  struct timespec timeout = {tv.tv_sec, tv.tv_usec * 1000};
  int res;
  while ((res = kevent(kqfd_, kqEvents_, 0, kqEvents_, kqEventsSize_,
                       &timeout)) == -1 &&
         errno == EINTR)
    ;

  if (res > 0) {
    // NOTE(review): udata holds a raw KSocketEntry pointer. If
    // processEvents() could cause an entry to be erased while this loop
    // is running, a later element might refer to a destroyed entry --
    // confirm that it cannot.
    for (int i = 0; i < res; ++i) {
      // udata was set to the owning KSocketEntry by getEvents().
      KSocketEntry* p = reinterpret_cast<KSocketEntry*>(kqEvents_[i].udata);
      int events = 0;
      switch (kqEvents_[i].filter) {
      case EVFILT_READ:
        events = KqueueEventPoll::IEV_READ;
        break;
      case EVFILT_WRITE:
        events = KqueueEventPoll::IEV_WRITE;
        break;
      default:
        // Any other filter is reported with an empty event mask.
        break;
      }
      p->processEvents(events);
    }
  }
  else if (res == -1) {
    int errNum = errno;
    A2_LOG_INFO(fmt("kevent error: %s", util::safeStrerror(errNum).c_str()));
  }

#ifdef ENABLE_ASYNC_DNS
  // It turns out that we have to call ares_process_fd before ares's
  // own timeout and ares may create new sockets or closes socket in
  // their API. So we call ares_process_fd for all ares_channel and
  // re-register their sockets.
  for (KAsyncNameResolverEntrySet::iterator i = nameResolverEntries_.begin(),
                                            eoi = nameResolverEntries_.end();
       i != eoi; ++i) {
    (*i)->processTimeout();
    (*i)->removeSocketEvents(this);
    (*i)->addSocketEvents(this);
  }
#endif // ENABLE_ASYNC_DNS

  // TODO timeout of name resolver is determined in Command(AbstractCommand,
  // DHTEntryPoint...Command)
}

namespace {
// Converts the EventPoll::EVENT_READ / EVENT_WRITE bits of |events| into
// the corresponding KqueueEventPoll::IEV_READ / IEV_WRITE bits. All
// other bits are dropped.
int translateEvents(EventPoll::EventType events)
{
  int newEvents = 0;
  if (EventPoll::EVENT_READ & events) {
    newEvents |= KqueueEventPoll::IEV_READ;
  }
  if (EventPoll::EVENT_WRITE & events) {
    newEvents |= KqueueEventPoll::IEV_WRITE;
  }
  return newEvents;
}
} // namespace

// Registers |event| for |socket| and pushes the socket's updated
// interest set to the kernel.
//
// If |socket| is not tracked yet a new KSocketEntry is inserted, and the
// result buffer used by poll() is doubled when the number of tracked
// sockets exceeds its size.
//
// Returns true if kevent() accepted the change, false otherwise (the
// failure is logged at DEBUG level).
bool KqueueEventPoll::addEvents(sock_t socket,
                                const KqueueEventPoll::KEvent& event)
{
  std::shared_ptr<KSocketEntry> socketEntry(new KSocketEntry(socket));
  KSocketEntrySet::iterator i = socketEntries_.lower_bound(socketEntry);
  struct timespec zeroTimeout = {0, 0};
  struct kevent changelist[2];
  size_t n;

  if (i != socketEntries_.end() && *(*i) == *socketEntry) {
    // The socket is already tracked: extend the existing entry.
    event.addSelf(*i);
    n = (*i)->getEvents(changelist);
  }
  else {
    // |i| is the lower bound, so it doubles as the insertion hint.
    socketEntries_.insert(i, socketEntry);
    if (socketEntries_.size() > kqEventsSize_) {
      // Allocate the larger buffer before releasing the old one, and
      // commit pointer and size only afterwards. If the allocation
      // throws, kqEvents_ and kqEventsSize_ still describe a live
      // buffer instead of leaving a dangling pointer for the destructor
      // to free a second time.
      const size_t newSize = kqEventsSize_ * 2;
      struct kevent* newEvents = new struct kevent[newSize];
      delete[] kqEvents_;
      kqEvents_ = newEvents;
      kqEventsSize_ = newSize;
    }
    event.addSelf(socketEntry);
    n = socketEntry->getEvents(changelist);
  }

  // nevents is 0, so this call only applies the changes.
  int r = kevent(kqfd_, changelist, n, changelist, 0, &zeroTimeout);
  int errNum = errno;
  if (r == -1) {
    A2_LOG_DEBUG(fmt("Failed to add socket event %d:%s", socket,
                     util::safeStrerror(errNum).c_str()));
    return false;
  }
  return true;
}

// Registers |command| for the EVENT_READ / EVENT_WRITE bits of |events|
// on |socket|. See addEvents(sock_t, const KEvent&) for the return
// value.
bool KqueueEventPoll::addEvents(sock_t socket, Command* command,
                                EventPoll::EventType events)
{
  int kqEvents = translateEvents(events);
  return addEvents(socket, KCommandEvent(command, kqEvents));
}

#ifdef ENABLE_ASYNC_DNS
// Registers an async DNS event for |socket| on behalf of resolver |rs|
// and |command|. |events| is passed to KADNSEvent unchanged (it is not
// run through translateEvents()).
bool KqueueEventPoll::addEvents(sock_t socket, Command* command, int events,
                                const std::shared_ptr<AsyncNameResolver>& rs)
{
  return addEvents(socket, KADNSEvent(rs, command, socket, events));
}
#endif // ENABLE_ASYNC_DNS

// Removes |event| from |socket| and pushes the socket's updated interest
// set to the kernel. The socket entry itself is dropped once it has no
// events left.
//
// Returns false if |socket| is not tracked or if kevent() fails (both
// cases are logged at DEBUG level), true otherwise.
bool KqueueEventPoll::deleteEvents(sock_t socket,
                                   const KqueueEventPoll::KEvent& event)
{
  std::shared_ptr<KSocketEntry> socketEntry(new KSocketEntry(socket));
  KSocketEntrySet::iterator i = socketEntries_.find(socketEntry);
  if (i == socketEntries_.end()) {
    A2_LOG_DEBUG(fmt("Socket %d is not found in SocketEntries.", socket));
    return false;
  }

  event.removeSelf(*i);

  struct timespec zeroTimeout = {0, 0};
  struct kevent changelist[2];
  size_t n = (*i)->getEvents(changelist);
  int r = kevent(kqfd_, changelist, n, changelist, 0, &zeroTimeout);
  // Save errno before the erase below has a chance to overwrite it.
  int errNum = errno;

  // The entry is dropped even if kevent() failed.
  if ((*i)->eventEmpty()) {
    socketEntries_.erase(i);
  }

  if (r == -1) {
    A2_LOG_DEBUG(fmt("Failed to delete socket event:%s",
                     util::safeStrerror(errNum).c_str()));
    return false;
  }
  return true;
}

#ifdef ENABLE_ASYNC_DNS
// Removes the async DNS event registered for |socket| by resolver |rs|
// and |command|.
bool KqueueEventPoll::deleteEvents(
    sock_t socket, Command* command,
    const std::shared_ptr<AsyncNameResolver>& rs)
{
  return deleteEvents(socket, KADNSEvent(rs, command, socket, 0));
}
#endif // ENABLE_ASYNC_DNS

// Removes the command event registered for |socket| by |command| with
// the EVENT_READ / EVENT_WRITE bits of |events|.
bool KqueueEventPoll::deleteEvents(sock_t socket, Command* command,
                                   EventPoll::EventType events)
{
  int kqEvents = translateEvents(events);
  return deleteEvents(socket, KCommandEvent(command, kqEvents));
}

#ifdef ENABLE_ASYNC_DNS
// Starts tracking the (|resolver|, |command|) pair and registers the
// resolver's sockets with this poller.
//
// Returns false, changing nothing, if an equal entry is already tracked.
bool KqueueEventPoll::addNameResolver(
    const std::shared_ptr<AsyncNameResolver>& resolver, Command* command)
{
  std::shared_ptr<KAsyncNameResolverEntry> entry(
      new KAsyncNameResolverEntry(resolver, command));
  KAsyncNameResolverEntrySet::iterator itr = nameResolverEntries_.find(entry);
  if (itr != nameResolverEntries_.end()) {
    return false;
  }
  nameResolverEntries_.insert(entry);
  entry->addSocketEvents(this);
  return true;
}

// Stops tracking the (|resolver|, |command|) pair and unregisters the
// resolver's sockets from this poller.
//
// Returns false if no such entry is tracked.
bool KqueueEventPoll::deleteNameResolver(
    const std::shared_ptr<AsyncNameResolver>& resolver, Command* command)
{
  std::shared_ptr<KAsyncNameResolverEntry> entry(
      new KAsyncNameResolverEntry(resolver, command));
  KAsyncNameResolverEntrySet::iterator itr = nameResolverEntries_.find(entry);
  if (itr == nameResolverEntries_.end()) {
    return false;
  }
  (*itr)->removeSocketEvents(this);
  nameResolverEntries_.erase(itr);
  return true;
}
#endif // ENABLE_ASYNC_DNS

} // namespace aria2