LibUV: Code cleanup

pull/70/head
Nils Maier 2013-04-11 03:05:14 +02:00
parent 1cd5dcc9b6
commit 539fda0b4f
2 changed files with 90 additions and 71 deletions

View File

@ -59,6 +59,7 @@
namespace { namespace {
using namespace aria2; using namespace aria2;
template<typename T> template<typename T>
static void close_callback(uv_handle_t* handle) static void close_callback(uv_handle_t* handle)
{ {
@ -77,12 +78,12 @@ LibuvEventPoll::KSocketEntry::KSocketEntry(sock_t s)
: SocketEntry<KCommandEvent, KADNSEvent>(s) : SocketEntry<KCommandEvent, KADNSEvent>(s)
{} {}
int accumulateEvent(int events, const LibuvEventPoll::KEvent& event) inline int accumulateEvent(int events, const LibuvEventPoll::KEvent& event)
{ {
return events|event.getEvents(); return events | event.getEvents();
} }
int LibuvEventPoll::KSocketEntry::getEvents() int LibuvEventPoll::KSocketEntry::getEvents() const
{ {
int events = 0; int events = 0;
#ifdef ENABLE_ASYNC_DNS #ifdef ENABLE_ASYNC_DNS
@ -95,6 +96,7 @@ int LibuvEventPoll::KSocketEntry::getEvents()
events = std::accumulate(commandEvents_.begin(), commandEvents_.end(), 0, events = std::accumulate(commandEvents_.begin(), commandEvents_.end(), 0,
accumulateEvent); accumulateEvent);
#endif // !ENABLE_ASYNC_DNS #endif // !ENABLE_ASYNC_DNS
return events; return events;
} }
@ -106,23 +108,25 @@ LibuvEventPoll::LibuvEventPoll()
LibuvEventPoll::~LibuvEventPoll() LibuvEventPoll::~LibuvEventPoll()
{ {
for (KPolls::iterator i = polls_.begin(), e = polls_.end(); i != e; ++i) { for (KPolls::iterator i = polls_.begin(), e = polls_.end(); i != e; ++i) {
uv_poll_stop(&i->second->p); i->second->close();
uv_close((uv_handle_t*)&i->second->p, close_poll_callback);
} }
// Actually kill the polls, and timers, if any // Actually kill the polls, and timers, if any.
uv_run(loop_, (uv_run_mode)(UV_RUN_ONCE | UV_RUN_NOWAIT)); uv_run(loop_, (uv_run_mode)(UV_RUN_ONCE | UV_RUN_NOWAIT));
if (loop_) { if (loop_) {
uv_loop_delete(loop_); uv_loop_delete(loop_);
loop_ = 0; loop_ = 0;
} }
// Need this to free only after the loop is gone.
polls_.clear(); polls_.clear();
} }
void LibuvEventPoll::poll(const struct timeval& tv) void LibuvEventPoll::poll(const struct timeval& tv)
{ {
int timeout = tv.tv_sec * 1000 + tv.tv_usec / 1000; const int timeout = tv.tv_sec * 1000 + tv.tv_usec / 1000;
// timeout == 0 will tick once
if (timeout >= 0) { if (timeout >= 0) {
uv_timer_t* timer = new uv_timer_t; uv_timer_t* timer = new uv_timer_t;
uv_timer_init(loop_, timer); uv_timer_init(loop_, timer);
@ -173,8 +177,7 @@ int LibuvEventPoll::translateEvents(EventPoll::EventType events)
return newEvents; return newEvents;
} }
void LibuvEventPoll::pollCallback(uv_poll_t* handle, poll_t* poll, int status, void LibuvEventPoll::pollCallback(KPoll* poll, int status, int events)
int events)
{ {
if (status == -1) { if (status == -1) {
uv_err_t err = uv_last_error(loop_); uv_err_t err = uv_last_error(loop_);
@ -190,21 +193,21 @@ void LibuvEventPoll::pollCallback(uv_poll_t* handle, poll_t* poll, int status,
case UV_EPIPE: case UV_EPIPE:
case UV_ESHUTDOWN: case UV_ESHUTDOWN:
events = IEV_HUP; events = IEV_HUP;
poll->entry->processEvents(events); poll->processEvents(events);
uv_poll_stop(handle); poll->stop();
uv_stop(loop_); uv_stop(loop_);
return; return;
default: default:
events = IEV_ERROR; events = IEV_ERROR;
poll->entry->processEvents(events); poll->processEvents(events);
uv_poll_stop(handle); poll->stop();
uv_stop(loop_); uv_stop(loop_);
return; return;
} }
} }
// Got something // Got something
poll->entry->processEvents(events); poll->processEvents(events);
uv_stop(loop_); uv_stop(loop_);
} }
@ -213,29 +216,22 @@ bool LibuvEventPoll::addEvents(sock_t socket,
{ {
SharedHandle<KSocketEntry> socketEntry(new KSocketEntry(socket)); SharedHandle<KSocketEntry> socketEntry(new KSocketEntry(socket));
KSocketEntrySet::iterator i = socketEntries_.lower_bound(socketEntry); KSocketEntrySet::iterator i = socketEntries_.lower_bound(socketEntry);
if (i != socketEntries_.end() && **i == *socketEntry) { if (i != socketEntries_.end() && **i == *socketEntry) {
event.addSelf(*i); event.addSelf(*i);
KPolls::iterator poll = polls_.find(socket); KPolls::iterator poll = polls_.find(socket);
if (poll == polls_.end()) { if (poll == polls_.end()) {
throw std::logic_error("Invalid socket"); throw std::logic_error("Invalid socket");
} }
poll->second->events = (*i)->getEvents(); poll->second->start();
uv_poll_start(&poll->second->p, return true;
poll->second->events & (IEV_READ | IEV_WRITE), poll_callback);
}
else {
socketEntries_.insert(i, socketEntry);
event.addSelf(socketEntry);
poll_t *poll = new poll_t;
uv_poll_init_socket(loop_, &poll->p, socket);
poll->entry = socketEntry.get();
poll->eventer = this;
poll->events = socketEntry->getEvents();
poll->p.data = poll;
polls_[socket] = poll;
uv_poll_start(&poll->p, poll->events & (IEV_READ | IEV_WRITE),
poll_callback);
} }
socketEntries_.insert(i, socketEntry);
event.addSelf(socketEntry);
KPoll* poll = new KPoll(this, socketEntry.get(), socket);
polls_[socket] = poll;
poll->start();
return true; return true;
} }
@ -258,7 +254,8 @@ bool LibuvEventPoll::deleteEvents(sock_t socket,
{ {
SharedHandle<KSocketEntry> socketEntry(new KSocketEntry(socket)); SharedHandle<KSocketEntry> socketEntry(new KSocketEntry(socket));
KSocketEntrySet::iterator i = socketEntries_.find(socketEntry); KSocketEntrySet::iterator i = socketEntries_.find(socketEntry);
if(i == socketEntries_.end()) {
if (i == socketEntries_.end()) {
A2_LOG_DEBUG(fmt("Socket %d is not found in SocketEntries.", socket)); A2_LOG_DEBUG(fmt("Socket %d is not found in SocketEntries.", socket));
return false; return false;
} }
@ -269,17 +266,15 @@ bool LibuvEventPoll::deleteEvents(sock_t socket,
if (poll == polls_.end()) { if (poll == polls_.end()) {
return false; return false;
} }
if ((*i)->eventEmpty()) { if ((*i)->eventEmpty()) {
uv_poll_stop(&poll->second->p); poll->second->close();
uv_close((uv_handle_t*)&poll->second->p, close_poll_callback);
polls_.erase(poll); polls_.erase(poll);
socketEntries_.erase(i); socketEntries_.erase(i);
return true;
} }
else {
poll->second->events = (*i)->getEvents(); poll->second->start();
uv_poll_start(&poll->second->p,
poll->second->events & (IEV_READ | IEV_WRITE), poll_callback);
}
return true; return true;
} }

View File

@ -59,43 +59,74 @@ private:
typedef CommandEvent<KSocketEntry, LibuvEventPoll> KCommandEvent; typedef CommandEvent<KSocketEntry, LibuvEventPoll> KCommandEvent;
typedef ADNSEvent<KSocketEntry, LibuvEventPoll> KADNSEvent; typedef ADNSEvent<KSocketEntry, LibuvEventPoll> KADNSEvent;
typedef AsyncNameResolverEntry<LibuvEventPoll> KAsyncNameResolverEntry; typedef AsyncNameResolverEntry<LibuvEventPoll> KAsyncNameResolverEntry;
friend class AsyncNameResolverEntry<LibuvEventPoll>; friend class AsyncNameResolverEntry<LibuvEventPoll>;
class KSocketEntry:
public SocketEntry<KCommandEvent, KADNSEvent> {
public:
KSocketEntry(sock_t socket);
int getEvents();
};
friend int accumulateEvent(int events, const KEvent& event); friend int accumulateEvent(int events, const KEvent& event);
private: class KSocketEntry: public SocketEntry<KCommandEvent, KADNSEvent> {
uv_loop_t* loop_; public:
KSocketEntry(sock_t socket);
int getEvents() const;
};
class KPoll {
private:
LibuvEventPoll *eventer_;
KSocketEntry *entry_;
uv_poll_t handle_;
static void poll_callback(uv_poll_t* handle, int status, int events) {
KPoll* poll = static_cast<KPoll*>(handle->data);
poll->eventer_->pollCallback(poll, status, events);
}
static void close_callback(uv_handle_t* handle) {
delete static_cast<KPoll*>(handle->data);
}
public:
inline KPoll(LibuvEventPoll* eventer, KSocketEntry* entry, sock_t sock)
: eventer_(eventer), entry_(entry)
{
uv_poll_init_socket(eventer->loop_, &handle_, sock);
handle_.data = this;
}
inline void start() {
uv_poll_start(&handle_, entry_->getEvents() & IEV_RW, poll_callback);
}
inline void stop() {
uv_poll_stop(&handle_);
}
inline void processEvents(int events) {
entry_->processEvents(events);
}
inline void close() {
stop();
uv_close((uv_handle_t*)&handle_, close_callback);
}
};
typedef std::set<SharedHandle<KSocketEntry>, typedef std::set<SharedHandle<KSocketEntry>,
DerefLess<SharedHandle<KSocketEntry> > > KSocketEntrySet; DerefLess<SharedHandle<KSocketEntry> > > KSocketEntrySet;
KSocketEntrySet socketEntries_;
typedef struct { typedef std::map<sock_t, KPoll*> KPolls;
uv_poll_t p;
KSocketEntry *entry;
LibuvEventPoll *eventer;
int events;
} poll_t;
typedef std::map<sock_t, poll_t*> KPolls;
KPolls polls_;
#ifdef ENABLE_ASYNC_DNS #ifdef ENABLE_ASYNC_DNS
typedef std::set<SharedHandle<KAsyncNameResolverEntry>, typedef std::set<SharedHandle<KAsyncNameResolverEntry>,
DerefLess<SharedHandle<KAsyncNameResolverEntry> > > DerefLess<SharedHandle<KAsyncNameResolverEntry> > >
KAsyncNameResolverEntrySet; KAsyncNameResolverEntrySet;
#endif // ENABLE_ASYNC_DNS
uv_loop_t* loop_;
KSocketEntrySet socketEntries_;
KPolls polls_;
#ifdef ENABLE_ASYNC_DNS
KAsyncNameResolverEntrySet nameResolverEntries_; KAsyncNameResolverEntrySet nameResolverEntries_;
#endif // ENABLE_ASYNC_DNS #endif // ENABLE_ASYNC_DNS
bool addEvents(sock_t socket, const KEvent& event); bool addEvents(sock_t socket, const KEvent& event);
bool deleteEvents(sock_t socket, const KEvent& event); bool deleteEvents(sock_t socket, const KEvent& event);
void pollCallback(KPoll *poll, int status, int events);
#ifdef ENABLE_ASYNC_DNS #ifdef ENABLE_ASYNC_DNS
bool addEvents(sock_t socket, Command* command, int events, bool addEvents(sock_t socket, Command* command, int events,
@ -105,18 +136,9 @@ private:
#endif #endif
static int translateEvents(EventPoll::EventType events); static int translateEvents(EventPoll::EventType events);
static void close_poll_callback(uv_handle_t* handle) {
delete static_cast<poll_t*>(handle->data);
}
static void poll_callback(uv_poll_t* handle, int status, int events) {
poll_t* poll = static_cast<poll_t*>(handle->data);
poll->eventer->pollCallback(handle, poll, status, events);
}
void pollCallback(uv_poll_t* handle, poll_t *poll, int status, int events);
public: public:
LibuvEventPoll(); LibuvEventPoll();
virtual ~LibuvEventPoll(); virtual ~LibuvEventPoll();
bool good() const { return loop_; } bool good() const { return loop_; }
@ -125,19 +147,21 @@ public:
virtual bool addEvents(sock_t socket, virtual bool addEvents(sock_t socket,
Command* command, EventPoll::EventType events); Command* command, EventPoll::EventType events);
virtual bool deleteEvents(sock_t socket, virtual bool deleteEvents(sock_t socket,
Command* command, EventPoll::EventType events); Command* command, EventPoll::EventType events);
#ifdef ENABLE_ASYNC_DNS
#ifdef ENABLE_ASYNC_DNS
virtual bool addNameResolver(const SharedHandle<AsyncNameResolver>& resolver, virtual bool addNameResolver(const SharedHandle<AsyncNameResolver>& resolver,
Command* command); Command* command);
virtual bool deleteNameResolver virtual bool deleteNameResolver(
(const SharedHandle<AsyncNameResolver>& resolver, Command* command); const SharedHandle<AsyncNameResolver>& resolver, Command* command);
#endif // ENABLE_ASYNC_DNS #endif // ENABLE_ASYNC_DNS
static const int IEV_READ = UV_READABLE; static const int IEV_READ = UV_READABLE;
static const int IEV_WRITE = UV_WRITABLE; static const int IEV_WRITE = UV_WRITABLE;
static const int IEV_RW = UV_READABLE | UV_WRITABLE;
// Make sure these do not interfere with the uv_poll API later.
static const int IEV_ERROR = 128; static const int IEV_ERROR = 128;
static const int IEV_HUP = 255; static const int IEV_HUP = 255;
}; };