Port poll changes to LibuvEventPoll

pull/314/head
Nils Maier 2014-12-12 12:21:13 +01:00
parent 6f9ea555c2
commit 811c0f758d
2 changed files with 32 additions and 25 deletions

View File

@ -229,11 +229,11 @@ void LibuvEventPoll::pollCallback(KPoll* poll, int status, int events)
bool LibuvEventPoll::addEvents(sock_t socket, bool LibuvEventPoll::addEvents(sock_t socket,
const LibuvEventPoll::KEvent& event) const LibuvEventPoll::KEvent& event)
{ {
auto socketEntry = std::make_shared<KSocketEntry>(socket); auto i = socketEntries_.lower_bound(socket);
auto i = socketEntries_.lower_bound(socketEntry);
if (i != socketEntries_.end() && **i == *socketEntry) { if (i != socketEntries_.end() && i->first == socket) {
event.addSelf((*i).get()); auto& socketEntry = i->second;
event.addSelf(&socketEntry);
auto poll = polls_.find(socket); auto poll = polls_.find(socket);
if (poll == polls_.end()) { if (poll == polls_.end()) {
throw std::logic_error("Invalid socket"); throw std::logic_error("Invalid socket");
@ -242,9 +242,10 @@ bool LibuvEventPoll::addEvents(sock_t socket,
return true; return true;
} }
socketEntries_.insert(i, socketEntry); i = socketEntries_.insert(i, std::make_pair(socket, KSocketEntry(socket)));
event.addSelf(socketEntry.get()); auto& socketEntry = i->second;
auto poll = new KPoll(this, socketEntry.get(), socket); event.addSelf(&socketEntry);
auto poll = new KPoll(this, &socketEntry, socket);
polls_[socket] = poll; polls_[socket] = poll;
poll->start(); poll->start();
return true; return true;
@ -267,22 +268,22 @@ bool LibuvEventPoll::addEvents(sock_t socket, Command* command, int events,
bool LibuvEventPoll::deleteEvents(sock_t socket, bool LibuvEventPoll::deleteEvents(sock_t socket,
const LibuvEventPoll::KEvent& event) const LibuvEventPoll::KEvent& event)
{ {
auto socketEntry = std::make_shared<KSocketEntry>(socket); auto i = socketEntries_.find(socket);
auto i = socketEntries_.find(socketEntry);
if (i == socketEntries_.end()) { if (i == socketEntries_.end()) {
A2_LOG_DEBUG(fmt("Socket %d is not found in SocketEntries.", socket)); A2_LOG_DEBUG(fmt("Socket %d is not found in SocketEntries.", socket));
return false; return false;
} }
event.removeSelf((*i).get()); auto& socketEntry = (*i).second;
event.removeSelf(&socketEntry);
auto poll = polls_.find(socket); auto poll = polls_.find(socket);
if (poll == polls_.end()) { if (poll == polls_.end()) {
return false; return false;
} }
if ((*i)->eventEmpty()) { if (socketEntry.eventEmpty()) {
poll->second->close(); poll->second->close();
polls_.erase(poll); polls_.erase(poll);
socketEntries_.erase(i); socketEntries_.erase(i);
@ -312,25 +313,29 @@ bool LibuvEventPoll::deleteEvents(sock_t socket, Command* command,
bool LibuvEventPoll::addNameResolver(const std::shared_ptr<AsyncNameResolver>& resolver, bool LibuvEventPoll::addNameResolver(const std::shared_ptr<AsyncNameResolver>& resolver,
Command* command) Command* command)
{ {
auto entry = std::make_shared<KAsyncNameResolverEntry>(resolver, command); auto key = std::make_pair(resolver.get(), command);
auto itr = nameResolverEntries_.lower_bound(entry); auto itr = nameResolverEntries_.lower_bound(key);
if (itr != nameResolverEntries_.end() && *(*itr) == *entry) {
if(itr != std::end(nameResolverEntries_) && (*itr).first == key) {
return false; return false;
} }
nameResolverEntries_.insert(itr, entry);
entry->addSocketEvents(this); itr = nameResolverEntries_.insert
(itr, std::make_pair(key, KAsyncNameResolverEntry(resolver, command)));
(*itr).second.addSocketEvents(this);
return true; return true;
} }
bool LibuvEventPoll::deleteNameResolver(const std::shared_ptr<AsyncNameResolver>& resolver, bool LibuvEventPoll::deleteNameResolver(const std::shared_ptr<AsyncNameResolver>& resolver,
Command* command) Command* command)
{ {
auto entry = std::make_shared<KAsyncNameResolverEntry>(resolver, command); auto key = std::make_pair(resolver.get(), command);
auto itr = nameResolverEntries_.find(entry); auto itr = nameResolverEntries_.find(key);
if (itr == nameResolverEntries_.end()) { if(itr == std::end(nameResolverEntries_)) {
return false; return false;
} }
(*itr)->removeSocketEvents(this);
(*itr).second.removeSocketEvents(this);
nameResolverEntries_.erase(itr); nameResolverEntries_.erase(itr);
return true; return true;
} }

View File

@ -66,6 +66,10 @@ private:
class KSocketEntry: public SocketEntry<KCommandEvent, KADNSEvent> { class KSocketEntry: public SocketEntry<KCommandEvent, KADNSEvent> {
public: public:
KSocketEntry(sock_t socket); KSocketEntry(sock_t socket);
KSocketEntry(const KSocketEntry&) = delete;
KSocketEntry(KSocketEntry&&) = default;
int getEvents() const; int getEvents() const;
}; };
@ -105,15 +109,13 @@ private:
} }
}; };
typedef std::set<std::shared_ptr<KSocketEntry>, typedef std::map<sock_t, KSocketEntry> KSocketEntrySet;
DerefLess<std::shared_ptr<KSocketEntry> > > KSocketEntrySet;
typedef std::map<sock_t, KPoll*> KPolls; typedef std::map<sock_t, KPoll*> KPolls;
#ifdef ENABLE_ASYNC_DNS #ifdef ENABLE_ASYNC_DNS
typedef std::set<std::shared_ptr<KAsyncNameResolverEntry>, typedef std::map<std::pair<AsyncNameResolver*, Command*>,
DerefLess<std::shared_ptr<KAsyncNameResolverEntry> > > KAsyncNameResolverEntry> KAsyncNameResolverEntrySet;
KAsyncNameResolverEntrySet;
#endif // ENABLE_ASYNC_DNS #endif // ENABLE_ASYNC_DNS
uv_loop_t* loop_; uv_loop_t* loop_;