diff --git a/ChangeLog b/ChangeLog index 98fda3bc..30eef80f 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,3 +1,9 @@ +2010-04-20 Tatsuhiro Tsujikawa + + Use Event in EpollEventPoll + * src/EpollEventPoll.cc + * src/EpollEventPoll.h + 2010-04-20 Tatsuhiro Tsujikawa Externalized Event, CommandEvent, ADNSEvent, SocketEntry and diff --git a/src/EpollEventPoll.cc b/src/EpollEventPoll.cc index f2f96f4e..e700c467 100644 --- a/src/EpollEventPoll.cc +++ b/src/EpollEventPoll.cc @@ -44,195 +44,23 @@ namespace aria2 { -EpollEventPoll::CommandEvent::CommandEvent(Command* command, int events): - _command(command), _events(events) {} +EpollEventPoll::KSocketEntry::KSocketEntry(sock_t s): + SocketEntry(s) {} -int EpollEventPoll::CommandEvent::getEvents() const -{ - return _events; -} - -void EpollEventPoll::CommandEvent::processEvents(int events) -{ - if((_events&events) || - ((EpollEventPoll::EVENT_ERROR|EpollEventPoll::EVENT_HUP)&events)) { - _command->setStatusActive(); - } - if(EpollEventPoll::EVENT_READ&events) { - _command->readEventReceived(); - } - if(EpollEventPoll::EVENT_WRITE&events) { - _command->writeEventReceived(); - } - if(EpollEventPoll::EVENT_ERROR&events) { - _command->errorEventReceived(); - } - if(EpollEventPoll::EVENT_HUP&events) { - _command->hupEventReceived(); - } -} - -void EpollEventPoll::CommandEvent::addSelf -(const SharedHandle& socketEntry) const -{ - socketEntry->addCommandEvent(*this); -} - -void EpollEventPoll::CommandEvent::removeSelf -(const SharedHandle& socketEntry) const -{ - socketEntry->removeCommandEvent(*this); -} - -#ifdef ENABLE_ASYNC_DNS - -EpollEventPoll::ADNSEvent::ADNSEvent -(const SharedHandle& resolver, - Command* command, - sock_t socket, int events): - _resolver(resolver), _command(command), _socket(socket), _events(events) {} - -int EpollEventPoll::ADNSEvent::getEvents() const -{ - return _events; -} - -void EpollEventPoll::ADNSEvent::processEvents(int events) -{ - ares_socket_t readfd; - ares_socket_t writefd; - if(events&(EpollEventPoll::EVENT_READ|EpollEventPoll::EVENT_ERROR| - EpollEventPoll::EVENT_HUP)) { - readfd = _socket; - } else { - readfd = ARES_SOCKET_BAD; - } - if(events&(EpollEventPoll::EVENT_WRITE|EpollEventPoll::EVENT_ERROR| - EpollEventPoll::EVENT_HUP)) { - writefd = _socket; - } else { - writefd = ARES_SOCKET_BAD; - } - _resolver->process(readfd, writefd); - _command->setStatusActive(); -} - -void EpollEventPoll::ADNSEvent::addSelf -(const SharedHandle& socketEntry) const -{ - socketEntry->addADNSEvent(*this); -} - -void EpollEventPoll::ADNSEvent::removeSelf -(const SharedHandle& socketEntry) const -{ - socketEntry->removeADNSEvent(*this); -} - -#endif // ENABLE_ASYNC_DNS - -EpollEventPoll::SocketEntry::SocketEntry(sock_t socket):_socket(socket) -{ - memset(&_epEvent, 0, sizeof(struct epoll_event)); -} - -void EpollEventPoll::SocketEntry::addCommandEvent(const CommandEvent& cev) -{ - std::deque::iterator i = std::find(_commandEvents.begin(), - _commandEvents.end(), - cev); - if(i == _commandEvents.end()) { - _commandEvents.push_back(cev); - } else { - (*i).addEvents(cev.getEvents()); - } -} - -void EpollEventPoll::SocketEntry::removeCommandEvent(const CommandEvent& cev) -{ - std::deque::iterator i = std::find(_commandEvents.begin(), - _commandEvents.end(), - cev); - if(i == _commandEvents.end()) { - // not found - } else { - (*i).removeEvents(cev.getEvents()); - if((*i).eventsEmpty()) { - _commandEvents.erase(i); - } - } -} - -#ifdef ENABLE_ASYNC_DNS - -void EpollEventPoll::SocketEntry::addADNSEvent(const ADNSEvent& aev) -{ - std::deque::iterator i = std::find(_adnsEvents.begin(), - _adnsEvents.end(), - aev); - if(i == _adnsEvents.end()) { - _adnsEvents.push_back(aev); - } -} - -void EpollEventPoll::SocketEntry::removeADNSEvent(const ADNSEvent& aev) -{ - std::deque::iterator i = std::find(_adnsEvents.begin(), - _adnsEvents.end(), - aev); - if(i == _adnsEvents.end()) { - // not found - } else { - _adnsEvents.erase(i); - } -} - -#endif // ENABLE_ASYNC_DNS - -void EpollEventPoll::SocketEntry::processEvents(int events) -{ - std::for_each(_commandEvents.begin(), _commandEvents.end(), - std::bind2nd(std::mem_fun_ref - (&EpollEventPoll::CommandEvent::processEvents), - events)); -#ifdef ENABLE_ASYNC_DNS - - std::for_each(_adnsEvents.begin(), _adnsEvents.end(), - std::bind2nd(std::mem_fun_ref - (&EpollEventPoll::ADNSEvent::processEvents), - events)); - -#endif // ENABLE_ASYNC_DNS - -} - -bool EpollEventPoll::SocketEntry::eventEmpty() const -{ - -#ifdef ENABLE_ASYNC_DNS - - return _commandEvents.empty() && _adnsEvents.empty(); - -#else // !ENABLE_ASYNC_DNS - - return _commandEvents.empty(); - -#endif // !ENABLE_ASYNC_DNS) - -} - -int accumulateEvent(int events, const EpollEventPoll::Event& event) +int accumulateEvent(int events, const EpollEventPoll::KEvent& event) { return events|event.getEvents(); } -struct epoll_event& EpollEventPoll::SocketEntry::getEpEvent() +struct epoll_event EpollEventPoll::KSocketEntry::getEvents() { - _epEvent.data.ptr = this; + struct epoll_event epEvent; + memset(&epEvent, 0, sizeof(struct epoll_event)); + epEvent.data.ptr = this; #ifdef ENABLE_ASYNC_DNS - _epEvent.events = + epEvent.events = std::accumulate(_adnsEvents.begin(), _adnsEvents.end(), std::accumulate(_commandEvents.begin(), @@ -241,64 +69,14 @@ struct epoll_event& EpollEventPoll::SocketEntry::getEpEvent() #else // !ENABLE_ASYNC_DNS - _epEvent.events = + epEvent.events = std::accumulate(_commandEvents.begin(), _commandEvents.end(), 0, accumulateEvent); #endif // !ENABLE_ASYNC_DNS - return _epEvent; + return epEvent; } -#ifdef ENABLE_ASYNC_DNS - -EpollEventPoll::AsyncNameResolverEntry::AsyncNameResolverEntry -(const SharedHandle& nameResolver, Command* command): - _nameResolver(nameResolver), _command(command), _socketsSize(0) - -{} - -void EpollEventPoll::AsyncNameResolverEntry::addSocketEvents -(EpollEventPoll* e) -{ - _socketsSize = 0; - int mask = _nameResolver->getsock(_sockets); - if(mask == 0) { - return; - } - size_t i; - for(i = 0; i < ARES_GETSOCK_MAXNUM; ++i) { - //epoll_event_t* epEventPtr = &_epEvents[_socketsSize]; - int events = 0; - if(ARES_GETSOCK_READABLE(mask, i)) { - events |= EPOLLIN; - } - if(ARES_GETSOCK_WRITABLE(mask, i)) { - events |= EPOLLOUT; - } - if(events == 0) { - // assume no further sockets are returned. - break; - } - e->addEvents(_sockets[i], _command, events, _nameResolver); - } - _socketsSize = i; -} - -void EpollEventPoll::AsyncNameResolverEntry::removeSocketEvents -(EpollEventPoll* e) -{ - for(size_t i = 0; i < _socketsSize; ++i) { - e->deleteEvents(_sockets[i], _command, _nameResolver); - } -} - -void EpollEventPoll::AsyncNameResolverEntry::processTimeout() -{ - _nameResolver->process(ARES_SOCKET_BAD, ARES_SOCKET_BAD); -} - -#endif // ENABLE_ASYNC_DNS - EpollEventPoll::EpollEventPoll():_logger(LogFactory::getInstance()) { _epfd = epoll_create(EPOLL_EVENTS_MAX); @@ -336,7 +114,7 @@ void EpollEventPoll::poll(const struct timeval& tv) if(res > 0) { for(int i = 0; i < res; ++i) { - SocketEntry* p = reinterpret_cast(_epEvents[i].data.ptr); + KSocketEntry* p = reinterpret_cast(_epEvents[i].data.ptr); p->processEvents(_epEvents[i].events); } } @@ -346,7 +124,7 @@ void EpollEventPoll::poll(const struct timeval& tv) // own timeout and ares may create new sockets or closes socket in // their API. So we call ares_process_fd for all ares_channel and // re-register their sockets. - for(std::deque >::iterator i = + for(std::deque >::iterator i = _nameResolverEntries.begin(), eoi = _nameResolverEntries.end(); i != eoi; ++i) { (*i)->processTimeout(); @@ -378,32 +156,33 @@ static int translateEvents(EventPoll::EventType events) } bool EpollEventPoll::addEvents(sock_t socket, - const EpollEventPoll::Event& event) + const EpollEventPoll::KEvent& event) { - SharedHandle socketEntry(new SocketEntry(socket)); - std::deque >::iterator i = + SharedHandle socketEntry(new KSocketEntry(socket)); + std::deque >::iterator i = std::lower_bound(_socketEntries.begin(), _socketEntries.end(), socketEntry); int r = 0; if(i != _socketEntries.end() && (*i) == socketEntry) { event.addSelf(*i); - r = epoll_ctl(_epfd, EPOLL_CTL_MOD, (*i)->getSocket(), &(*i)->getEpEvent()); + struct epoll_event epEvent = (*i)->getEvents(); + r = epoll_ctl(_epfd, EPOLL_CTL_MOD, (*i)->getSocket(), &epEvent); if(r == -1) { // try EPOLL_CTL_ADD: There is a chance that previously socket X is // added to epoll, but it is closed and is not yet removed from // SocketEntries. In this case, EPOLL_CTL_MOD is failed with ENOENT. r = epoll_ctl(_epfd, EPOLL_CTL_ADD, (*i)->getSocket(), - &(*i)->getEpEvent()); + &epEvent); } } else { _socketEntries.insert(i, socketEntry); event.addSelf(socketEntry); - r = epoll_ctl(_epfd, EPOLL_CTL_ADD, socketEntry->getSocket(), - &socketEntry->getEpEvent()); + struct epoll_event epEvent = socketEntry->getEvents(); + r = epoll_ctl(_epfd, EPOLL_CTL_ADD, socketEntry->getSocket(), &epEvent); } if(r == -1) { if(_logger->debug()) { @@ -420,22 +199,22 @@ bool EpollEventPoll::addEvents(sock_t socket, Command* command, EventPoll::EventType events) { int epEvents = translateEvents(events); - return addEvents(socket, CommandEvent(command, epEvents)); + return addEvents(socket, KCommandEvent(command, epEvents)); } #ifdef ENABLE_ASYNC_DNS bool EpollEventPoll::addEvents(sock_t socket, Command* command, int events, const SharedHandle& rs) { - return addEvents(socket, ADNSEvent(rs, command, socket, events)); + return addEvents(socket, KADNSEvent(rs, command, socket, events)); } #endif // ENABLE_ASYNC_DNS bool EpollEventPoll::deleteEvents(sock_t socket, - const EpollEventPoll::Event& event) + const EpollEventPoll::KEvent& event) { - SharedHandle socketEntry(new SocketEntry(socket)); - std::deque >::iterator i = + SharedHandle socketEntry(new KSocketEntry(socket)); + std::deque >::iterator i = std::lower_bound(_socketEntries.begin(), _socketEntries.end(), socketEntry); if(i != _socketEntries.end() && (*i) == socketEntry) { @@ -451,8 +230,8 @@ bool EpollEventPoll::deleteEvents(sock_t socket, } else { // If socket is closed, then it seems it is automatically removed from // epoll, so following EPOLL_CTL_MOD may fail. - r = epoll_ctl(_epfd, EPOLL_CTL_MOD, (*i)->getSocket(), - &(*i)->getEpEvent()); + struct epoll_event epEvent = (*i)->getEvents(); + r = epoll_ctl(_epfd, EPOLL_CTL_MOD, (*i)->getSocket(), &epEvent); if(r == -1) { if(_logger->debug()) { _logger->debug("Failed to delete socket event, but may be ignored:%s", @@ -480,7 +259,7 @@ bool EpollEventPoll::deleteEvents(sock_t socket, bool EpollEventPoll::deleteEvents(sock_t socket, Command* command, const SharedHandle& rs) { - return deleteEvents(socket, ADNSEvent(rs, command, socket, 0)); + return deleteEvents(socket, KADNSEvent(rs, command, socket, 0)); } #endif // ENABLE_ASYNC_DNS @@ -488,16 +267,16 @@ bool EpollEventPoll::deleteEvents(sock_t socket, Command* command, EventPoll::EventType events) { int epEvents = translateEvents(events); - return deleteEvents(socket, CommandEvent(command, epEvents)); + return deleteEvents(socket, KCommandEvent(command, epEvents)); } #ifdef ENABLE_ASYNC_DNS bool EpollEventPoll::addNameResolver (const SharedHandle& resolver, Command* command) { - SharedHandle entry - (new AsyncNameResolverEntry(resolver, command)); - std::deque >::iterator itr = + SharedHandle entry + (new KAsyncNameResolverEntry(resolver, command)); + std::deque >::iterator itr = std::find(_nameResolverEntries.begin(), _nameResolverEntries.end(), entry); if(itr == _nameResolverEntries.end()) { _nameResolverEntries.push_back(entry); @@ -511,9 +290,9 @@ bool EpollEventPoll::addNameResolver bool EpollEventPoll::deleteNameResolver (const SharedHandle& resolver, Command* command) { - SharedHandle entry - (new AsyncNameResolverEntry(resolver, command)); - std::deque >::iterator itr = + SharedHandle entry + (new KAsyncNameResolverEntry(resolver, command)); + std::deque >::iterator itr = std::find(_nameResolverEntries.begin(), _nameResolverEntries.end(), entry); if(itr == _nameResolverEntries.end()) { return false; diff --git a/src/EpollEventPoll.h b/src/EpollEventPoll.h index 5f807304..98460102 100644 --- a/src/EpollEventPoll.h +++ b/src/EpollEventPoll.h @@ -41,6 +41,7 @@ #include +#include "Event.h" #ifdef ENABLE_ASYNC_DNS # include "AsyncNameResolver.h" #endif // ENABLE_ASYNC_DNS @@ -50,190 +51,29 @@ namespace aria2 { class Logger; class EpollEventPoll : public EventPoll { +private: + class KSocketEntry; + + typedef Event KEvent; + typedef CommandEvent KCommandEvent; + typedef ADNSEvent KADNSEvent; + typedef AsyncNameResolverEntry KAsyncNameResolverEntry; + friend class AsyncNameResolverEntry; + + class KSocketEntry: + public SocketEntry { + public: + KSocketEntry(sock_t socket); + + virtual struct epoll_event getEvents(); + }; + + friend int accumulateEvent(int events, const KEvent& event); private: - - class SocketEntry; - - class Event { - public: - virtual ~Event() {} - - virtual void processEvents(int events) = 0; - - virtual int getEvents() const = 0; - - virtual void addSelf(const SharedHandle& socketEntry) const =0; - - virtual void removeSelf - (const SharedHandle& socketEntry) const =0; - }; - - friend int accumulateEvent(int events, const Event& event); - - class CommandEvent : public Event { - private: - Command* _command; - int _events; - public: - CommandEvent(Command* command, int events); - - Command* getCommand() const - { - return _command; - } - - void addEvents(int events) - { - _events |= events; - } - - void removeEvents(int events) - { - _events &= (~events); - } - - bool eventsEmpty() const - { - return _events == 0; - } - - bool operator==(const CommandEvent& commandEvent) const - { - return _command == commandEvent._command; - } - - virtual int getEvents() const; - - virtual void processEvents(int events); - - virtual void addSelf(const SharedHandle& socketEntry) const; - - virtual void removeSelf(const SharedHandle& socketEntry) const; - }; - + std::deque > _socketEntries; #ifdef ENABLE_ASYNC_DNS - - class ADNSEvent : public Event { - private: - SharedHandle _resolver; - Command* _command; - sock_t _socket; - int _events; - public: - ADNSEvent(const SharedHandle& resolver, Command* command, - sock_t socket, int events); - - bool operator==(const ADNSEvent& event) const - { - return _resolver == event._resolver; - } - - virtual int getEvents() const; - - virtual void processEvents(int events); - - virtual void addSelf(const SharedHandle& socketEntry) const; - - virtual void removeSelf(const SharedHandle& socketEntry) const; - }; - -#endif // ENABLE_ASYNC_DNS - - class SocketEntry { - private: - sock_t _socket; - - std::deque _commandEvents; - -#ifdef ENABLE_ASYNC_DNS - - std::deque _adnsEvents; - -#endif // ENABLE_ASYNC_DNS - - struct epoll_event _epEvent; - - public: - SocketEntry(sock_t socket); - - bool operator==(const SocketEntry& entry) const - { - return _socket == entry._socket; - } - - bool operator<(const SocketEntry& entry) const - { - return _socket < entry._socket; - } - - void addCommandEvent(const CommandEvent& cev); - - void removeCommandEvent(const CommandEvent& cev); - -#ifdef ENABLE_ASYNC_DNS - - void addADNSEvent(const ADNSEvent& aev); - - void removeADNSEvent(const ADNSEvent& aev); - -#endif // ENABLE_ASYNC_DNS - - struct epoll_event& getEpEvent(); - - sock_t getSocket() const - { - return _socket; - } - - bool eventEmpty() const; - - void processEvents(int events); - }; - -#ifdef ENABLE_ASYNC_DNS - - class AsyncNameResolverEntry { - private: - SharedHandle _nameResolver; - - Command* _command; - - size_t _socketsSize; - - sock_t _sockets[ARES_GETSOCK_MAXNUM]; - - public: - AsyncNameResolverEntry(const SharedHandle& nameResolver, - Command* command); - - bool operator==(const AsyncNameResolverEntry& entry) - { - return _nameResolver == entry._nameResolver && - _command == entry._command; - } - - void addSocketEvents(EpollEventPoll* socketPoll); - - void removeSocketEvents(EpollEventPoll* socketPoll); - - // Calls AsyncNameResolver::process(ARES_SOCKET_BAD, - // ARES_SOCKET_BAD). - void processTimeout(); - }; - -#endif // ENABLE_ASYNC_DNS -private: - enum EventType { - EVENT_READ = EPOLLIN, - EVENT_WRITE = EPOLLOUT, - EVENT_ERROR = EPOLLERR, - EVENT_HUP = EPOLLHUP, - }; - - std::deque > _socketEntries; -#ifdef ENABLE_ASYNC_DNS - std::deque > _nameResolverEntries; + std::deque > _nameResolverEntries; #endif // ENABLE_ASYNC_DNS int _epfd; @@ -244,9 +84,9 @@ private: Logger* _logger; - bool addEvents(sock_t socket, const Event& event); + bool addEvents(sock_t socket, const KEvent& event); - bool deleteEvents(sock_t socket, const Event& event); + bool deleteEvents(sock_t socket, const KEvent& event); bool addEvents(sock_t socket, Command* command, int events, const SharedHandle& rs); @@ -276,6 +116,10 @@ public: (const SharedHandle& resolver, Command* command); #endif // ENABLE_ASYNC_DNS + static const int IEV_READ = EPOLLIN; + static const int IEV_WRITE = EPOLLOUT; + static const int IEV_ERROR = EPOLLERR; + static const int IEV_HUP = EPOLLHUP; }; } // namespace aria2