LibUV: Correct event removal

pull/70/head
Nils Maier 2013-04-10 21:54:34 +02:00
parent 9acd3df3cb
commit 1cd5dcc9b6
3 changed files with 73 additions and 35 deletions

View File

@ -58,13 +58,17 @@
#include "util.h"
namespace {
using namespace aria2;
template<typename T>
static void close_callback(uv_handle_t* handle)
{
delete reinterpret_cast<T*>(handle);
}
static void timer_callback(uv_timer_t* handle, int status) {}
static void timer_callback(uv_timer_t* handle, int status)
{
uv_stop(handle->loop);
}
}
namespace aria2 {
@ -96,21 +100,21 @@ int LibuvEventPoll::KSocketEntry::getEvents()
LibuvEventPoll::LibuvEventPoll()
{
loop = uv_loop_new();
loop_ = uv_loop_new();
}
LibuvEventPoll::~LibuvEventPoll()
{
for (KPolls::iterator i = polls_.begin(), e = polls_.end(); i != e; ++i) {
uv_poll_stop(i->second);
uv_close((uv_handle_t*)i->second, close_callback<uv_poll_t>);
uv_poll_stop(&i->second->p);
uv_close((uv_handle_t*)&i->second->p, close_poll_callback);
}
// Actually kill the polls, and timers, if any
uv_run(loop, (uv_run_mode)(UV_RUN_ONCE | UV_RUN_NOWAIT));
uv_run(loop_, (uv_run_mode)(UV_RUN_ONCE | UV_RUN_NOWAIT));
if (loop) {
uv_loop_delete(loop);
loop = 0;
if (loop_) {
uv_loop_delete(loop_);
loop_ = 0;
}
polls_.clear();
}
@ -119,19 +123,19 @@ void LibuvEventPoll::poll(const struct timeval& tv)
{
int timeout = tv.tv_sec * 1000 + tv.tv_usec / 1000;
if (timeout > 0) {
if (timeout >= 0) {
uv_timer_t* timer = new uv_timer_t;
uv_timer_init(loop, timer);
uv_timer_init(loop_, timer);
uv_timer_start(timer, timer_callback, timeout, timeout);
uv_run(loop, UV_RUN_ONCE);
uv_run(loop_, UV_RUN_DEFAULT);
// Remove timer again.
uv_timer_stop(timer);
uv_close((uv_handle_t*)timer, close_callback<uv_timer_t>);
}
else {
uv_run(loop, (uv_run_mode)(UV_RUN_ONCE | UV_RUN_NOWAIT));
while (uv_run(loop_, (uv_run_mode)(UV_RUN_ONCE | UV_RUN_NOWAIT)) > 0) {}
}
#ifdef ENABLE_ASYNC_DNS
@ -169,13 +173,14 @@ int LibuvEventPoll::translateEvents(EventPoll::EventType events)
return newEvents;
}
void LibuvEventPoll::poll_callback(uv_poll_t* handle, int status, int events)
void LibuvEventPoll::pollCallback(uv_poll_t* handle, poll_t* poll, int status,
int events)
{
if (status == -1) {
events = 0;
uv_err_t err = uv_last_error(handle->loop);
uv_err_t err = uv_last_error(loop_);
switch (err.code) {
case UV_EAGAIN:
case UV_EINTR:
return;
case UV_EOF:
case UV_ECONNABORTED:
@ -185,12 +190,22 @@ void LibuvEventPoll::poll_callback(uv_poll_t* handle, int status, int events)
case UV_EPIPE:
case UV_ESHUTDOWN:
events = IEV_HUP;
poll->entry->processEvents(events);
uv_poll_stop(handle);
uv_stop(loop_);
return;
default:
events = IEV_ERROR;
poll->entry->processEvents(events);
uv_poll_stop(handle);
uv_stop(loop_);
return;
}
}
KSocketEntry *p = reinterpret_cast<KSocketEntry*>(handle->data);
p->processEvents(events);
// Got something
poll->entry->processEvents(events);
uv_stop(loop_);
}
bool LibuvEventPoll::addEvents(sock_t socket,
@ -204,18 +219,22 @@ bool LibuvEventPoll::addEvents(sock_t socket,
if (poll == polls_.end()) {
throw std::logic_error("Invalid socket");
}
uv_poll_start(poll->second, (*i)->getEvents() & (IEV_READ | IEV_WRITE),
poll_callback);
poll->second->events = (*i)->getEvents();
uv_poll_start(&poll->second->p,
poll->second->events & (IEV_READ | IEV_WRITE), poll_callback);
}
else {
socketEntries_.insert(i, socketEntry);
event.addSelf(socketEntry);
uv_poll_t *poll = new uv_poll_t;
uv_poll_init_socket(loop, poll, socket);
poll->data = socketEntry.get();
uv_poll_start(poll, socketEntry->getEvents() & (IEV_READ | IEV_WRITE),
poll_callback);
poll_t *poll = new poll_t;
uv_poll_init_socket(loop_, &poll->p, socket);
poll->entry = socketEntry.get();
poll->eventer = this;
poll->events = socketEntry->getEvents();
poll->p.data = poll;
polls_[socket] = poll;
uv_poll_start(&poll->p, poll->events & (IEV_READ | IEV_WRITE),
poll_callback);
}
return true;
}
@ -246,16 +265,21 @@ bool LibuvEventPoll::deleteEvents(sock_t socket,
event.removeSelf(*i);
KPolls::iterator poll = polls_.find(socket);
if (poll == polls_.end()) {
return false;
}
if ((*i)->eventEmpty()) {
KPolls::iterator poll = polls_.find(socket);
if (poll == polls_.end()) {
return false;
}
uv_poll_stop(poll->second);
uv_close((uv_handle_t*)poll->second, close_callback<uv_poll_t>);
uv_poll_stop(&poll->second->p);
uv_close((uv_handle_t*)&poll->second->p, close_poll_callback);
polls_.erase(poll);
socketEntries_.erase(i);
}
else {
poll->second->events = (*i)->getEvents();
uv_poll_start(&poll->second->p,
poll->second->events & (IEV_READ | IEV_WRITE), poll_callback);
}
return true;
}

View File

@ -71,13 +71,20 @@ private:
friend int accumulateEvent(int events, const KEvent& event);
private:
uv_loop_t* loop;
uv_loop_t* loop_;
typedef std::set<SharedHandle<KSocketEntry>,
DerefLess<SharedHandle<KSocketEntry> > > KSocketEntrySet;
KSocketEntrySet socketEntries_;
typedef std::map<sock_t, uv_poll_t*> KPolls;
typedef struct {
uv_poll_t p;
KSocketEntry *entry;
LibuvEventPoll *eventer;
int events;
} poll_t;
typedef std::map<sock_t, poll_t*> KPolls;
KPolls polls_;
#ifdef ENABLE_ASYNC_DNS
@ -98,14 +105,21 @@ private:
#endif
static int translateEvents(EventPoll::EventType events);
static void poll_callback(uv_poll_t* handle, int status, int events);
static void close_poll_callback(uv_handle_t* handle) {
delete static_cast<poll_t*>(handle->data);
}
static void poll_callback(uv_poll_t* handle, int status, int events) {
poll_t* poll = static_cast<poll_t*>(handle->data);
poll->eventer->pollCallback(handle, poll, status, events);
}
void pollCallback(uv_poll_t* handle, poll_t *poll, int status, int events);
public:
LibuvEventPoll();
virtual ~LibuvEventPoll();
bool good() const { return loop; }
bool good() const { return loop_; }
virtual void poll(const struct timeval& tv);

View File

@ -240,7 +240,7 @@ void OptionParser::parse(Option& option, std::istream& is) const
if(handler) {
handler->parse(option, std::string(nv.second.first, nv.second.second));
} else {
A2_LOG_WARN(fmt("Unknown option: %s", line.c_str()));
//A2_LOG_WARN(fmt("Unknown option: %s", line.c_str()));
}
}
}