2010-04-21 Tatsuhiro Tsujikawa <t-tujikawa@users.sourceforge.net>

Added opensolaris port_associate() support.
	* src/DownloadEngineFactory.cc
	* src/Makefile.am
	* src/OptionHandlerFactory.cc
	* src/PortEventPoll.cc
	* src/PortEventPoll.h
	* src/SocketCore.cc
	* src/SocketCore.h
	* src/configure.ac
	* src/main.cc
	* src/prefs.cc
	* src/prefs.h
pull/1/head
Tatsuhiro Tsujikawa 2010-04-21 14:31:44 +00:00
parent c1047561b6
commit 9cf05b7115
15 changed files with 681 additions and 92 deletions

View File

@ -1,3 +1,18 @@
2010-04-21 Tatsuhiro Tsujikawa <t-tujikawa@users.sourceforge.net>
Added opensolaris port_associate() support.
* src/DownloadEngineFactory.cc
* src/Makefile.am
* src/OptionHandlerFactory.cc
* src/PortEventPoll.cc
* src/PortEventPoll.h
* src/SocketCore.cc
* src/SocketCore.h
* src/configure.ac
* src/main.cc
* src/prefs.cc
* src/prefs.h
2010-04-21 Tatsuhiro Tsujikawa <t-tujikawa@users.sourceforge.net>
Fixed the bug that _e is passed where e should be passed.

View File

@ -311,6 +311,12 @@
/* Define to 1 if you have the <poll.h> header file. */
#undef HAVE_POLL_H
/* Define to 1 if you have the `port_associate' function. */
#undef HAVE_PORT_ASSOCIATE
/* Define to 1 if you have the <port.h> header file. */
#undef HAVE_PORT_H
/* Define to 1 if you have the `posix_fallocate' function. */
#undef HAVE_POSIX_FALLOCATE

27
configure vendored
View File

@ -600,6 +600,8 @@ ac_func_list=
ac_subst_vars='am__EXEEXT_FALSE
am__EXEEXT_TRUE
LTLIBOBJS
HAVE_PORT_ASSOCIATE_FALSE
HAVE_PORT_ASSOCIATE_TRUE
HAVE_TIMEGETTIME_FALSE
HAVE_TIMEGETTIME_TRUE
HAVE_POLL_FALSE
@ -7956,6 +7958,7 @@ for ac_header in argz.h \
netdb.h \
netinet/in.h \
poll.h \
port.h \
stddef.h \
stdint.h \
stdio_ext.h \
@ -15148,6 +15151,26 @@ fi
;;
esac
for ac_func in port_associate
do :
ac_fn_cxx_check_func "$LINENO" "port_associate" "ac_cv_func_port_associate"
if test "x$ac_cv_func_port_associate" = x""yes; then :
cat >>confdefs.h <<_ACEOF
#define HAVE_PORT_ASSOCIATE 1
_ACEOF
have_port_associate=yes
fi
done
if test "x$have_port_associate" = "xyes"; then
HAVE_PORT_ASSOCIATE_TRUE=
HAVE_PORT_ASSOCIATE_FALSE='#'
else
HAVE_PORT_ASSOCIATE_TRUE='#'
HAVE_PORT_ASSOCIATE_FALSE=
fi
ac_fn_cxx_check_member "$LINENO" "struct sockaddr_in" "sin_len" "ac_cv_member_struct_sockaddr_in_sin_len" "#include <netinet/in.h>
"
if test "x$ac_cv_member_struct_sockaddr_in_sin_len" = x""yes; then :
@ -15498,6 +15521,10 @@ if test -z "${HAVE_TIMEGETTIME_TRUE}" && test -z "${HAVE_TIMEGETTIME_FALSE}"; th
as_fn_error "conditional \"HAVE_TIMEGETTIME\" was never defined.
Usually this means the macro was only invoked conditionally." "$LINENO" 5
fi
if test -z "${HAVE_PORT_ASSOCIATE_TRUE}" && test -z "${HAVE_PORT_ASSOCIATE_FALSE}"; then
as_fn_error "conditional \"HAVE_PORT_ASSOCIATE\" was never defined.
Usually this means the macro was only invoked conditionally." "$LINENO" 5
fi
: ${CONFIG_STATUS=./config.status}
ac_write_fail=0

View File

@ -249,6 +249,7 @@ AC_CHECK_HEADERS([argz.h \
netdb.h \
netinet/in.h \
poll.h \
port.h \
stddef.h \
stdint.h \
stdio_ext.h \
@ -413,6 +414,9 @@ case "$target" in
;;
esac
AC_CHECK_FUNCS([port_associate], [have_port_associate=yes])
AM_CONDITIONAL([HAVE_PORT_ASSOCIATE], [test "x$have_port_associate" = "xyes"])
AC_CHECK_MEMBER([struct sockaddr_in.sin_len],
[AC_DEFINE([HAVE_SOCKADDR_IN_SIN_LEN],[1],
[Define to 1 if struct sockaddr_in has sin_len member.])],

View File

@ -61,6 +61,9 @@
#ifdef HAVE_EPOLL
# include "EpollEventPoll.h"
#endif // HAVE_EPOLL
#ifdef HAVE_PORT_ASSOCIATE
# include "PortEventPoll.h"
#endif // HAVE_PORT_ASSOCIATE
#include "PollEventPoll.h"
#include "SelectEventPoll.h"
#include "DlAbortEx.h"
@ -81,8 +84,9 @@ DownloadEngineFactory::newDownloadEngine
const size_t MAX_CONCURRENT_DOWNLOADS =
op->getAsInt(PREF_MAX_CONCURRENT_DOWNLOADS);
SharedHandle<EventPoll> eventPoll;
const std::string& pollMethod = op->get(PREF_EVENT_POLL);
#ifdef HAVE_EPOLL
if(op->get(PREF_EVENT_POLL) == V_EPOLL) {
if(pollMethod == V_EPOLL) {
SharedHandle<EpollEventPoll> ep(new EpollEventPoll());
if(ep->good()) {
eventPoll = ep;
@ -92,16 +96,25 @@ DownloadEngineFactory::newDownloadEngine
}
} else
#endif // HAVE_EPLL
#ifdef HAVE_POLL
if(op->get(PREF_EVENT_POLL) == V_POLL) {
eventPoll.reset(new PollEventPoll());
} else
#endif // HAVE_POLL
if(op->get(PREF_EVENT_POLL) == V_SELECT) {
eventPoll.reset(new SelectEventPoll());
if(pollMethod == V_PORT) {
SharedHandle<PortEventPoll> pp(new PortEventPoll());
if(pp->good()) {
eventPoll = pp;
} else {
abort();
throw DL_ABORT_EX("Initializing PortEventPoll failed."
" Try --event-poll=select");
}
} else
#ifdef HAVE_POLL
if(pollMethod == V_POLL) {
eventPoll.reset(new PollEventPoll());
} else
#endif // HAVE_POLL
if(pollMethod == V_SELECT) {
eventPoll.reset(new SelectEventPoll());
} else {
abort();
}
DownloadEngineHandle e(new DownloadEngine(eventPoll));
e->option = op;

View File

@ -530,6 +530,10 @@ if HAVE_POLL
SRCS += PollEventPoll.cc PollEventPoll.h
endif # HAVE_POLL
if HAVE_PORT_ASSOCIATE
SRCS += PortEventPoll.cc PortEventPoll.h
endif # HAVE_PORT_ASSOCIATE
noinst_LIBRARIES = libaria2c.a
libaria2c_a_SOURCES = $(SRCS)
aria2c_LDADD = libaria2c.a @LIBINTL@ @ALLOCA@ @LIBGNUTLS_LIBS@\

View File

@ -280,6 +280,7 @@ bin_PROGRAMS = aria2c$(EXEEXT)
@HAVE_TIMEGETTIME_TRUE@am__append_27 = clock_gettime_mingw.cc clock_gettime_mingw.h
@HAVE_MACH_ABSOLUTE_TIME_TRUE@am__append_28 = clock_gettime_osx.cc clock_gettime_osx.h
@HAVE_POLL_TRUE@am__append_29 = PollEventPoll.cc PollEventPoll.h
@HAVE_PORT_ASSOCIATE_TRUE@am__append_30 = PortEventPoll.cc PortEventPoll.h
subdir = src
DIST_COMMON = $(srcdir)/Makefile.am $(srcdir)/Makefile.in alloca.c
ACLOCAL_M4 = $(top_srcdir)/aclocal.m4
@ -607,7 +608,8 @@ am__libaria2c_a_SOURCES_DIST = Socket.h SocketCore.cc SocketCore.h \
inet_aton.h localtime_r.c localtime_r.h strptime.c strptime.h \
timegm.c timegm.h daemon.cc daemon.h clock_gettime_mingw.cc \
clock_gettime_mingw.h clock_gettime_osx.cc clock_gettime_osx.h \
PollEventPoll.cc PollEventPoll.h
PollEventPoll.cc PollEventPoll.h PortEventPoll.cc \
PortEventPoll.h
@ENABLE_XML_RPC_TRUE@am__objects_1 = \
@ENABLE_XML_RPC_TRUE@ XmlRpcRequestParserController.$(OBJEXT) \
@ENABLE_XML_RPC_TRUE@ XmlRpcRequestParserStateMachine.$(OBJEXT) \
@ -789,7 +791,8 @@ am__objects_6 =
@HAVE_MACH_ABSOLUTE_TIME_TRUE@am__objects_28 = \
@HAVE_MACH_ABSOLUTE_TIME_TRUE@ clock_gettime_osx.$(OBJEXT)
@HAVE_POLL_TRUE@am__objects_29 = PollEventPoll.$(OBJEXT)
am__objects_30 = SocketCore.$(OBJEXT) Command.$(OBJEXT) \
@HAVE_PORT_ASSOCIATE_TRUE@am__objects_30 = PortEventPoll.$(OBJEXT)
am__objects_31 = SocketCore.$(OBJEXT) Command.$(OBJEXT) \
AbstractCommand.$(OBJEXT) \
InitiateConnectionCommandFactory.$(OBJEXT) \
DownloadCommand.$(OBJEXT) \
@ -876,8 +879,9 @@ am__objects_30 = SocketCore.$(OBJEXT) Command.$(OBJEXT) \
$(am__objects_18) $(am__objects_19) $(am__objects_20) \
$(am__objects_21) $(am__objects_22) $(am__objects_23) \
$(am__objects_24) $(am__objects_25) $(am__objects_26) \
$(am__objects_27) $(am__objects_28) $(am__objects_29)
am_libaria2c_a_OBJECTS = $(am__objects_30)
$(am__objects_27) $(am__objects_28) $(am__objects_29) \
$(am__objects_30)
am_libaria2c_a_OBJECTS = $(am__objects_31)
libaria2c_a_OBJECTS = $(am_libaria2c_a_OBJECTS)
am__installdirs = "$(DESTDIR)$(bindir)"
PROGRAMS = $(bin_PROGRAMS)
@ -1213,7 +1217,7 @@ SRCS = Socket.h SocketCore.cc SocketCore.h BinaryStream.h Command.cc \
$(am__append_19) $(am__append_20) $(am__append_21) \
$(am__append_22) $(am__append_23) $(am__append_24) \
$(am__append_25) $(am__append_26) $(am__append_27) \
$(am__append_28) $(am__append_29)
$(am__append_28) $(am__append_29) $(am__append_30)
noinst_LIBRARIES = libaria2c.a
libaria2c_a_SOURCES = $(SRCS)
aria2c_LDADD = libaria2c.a @LIBINTL@ @ALLOCA@ @LIBGNUTLS_LIBS@\
@ -1553,6 +1557,7 @@ distclean-compile:
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/PiecedSegment.Po@am__quote@
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/Platform.Po@am__quote@
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/PollEventPoll.Po@am__quote@
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/PortEventPoll.Po@am__quote@
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/PriorityPieceSelector.Po@am__quote@
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/ProtocolDetector.Po@am__quote@
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/RangeBtMessage.Po@am__quote@

View File

@ -210,6 +210,9 @@ OptionHandlers OptionHandlerFactory::createOptionHandlers()
#ifdef HAVE_EPOLL
V_EPOLL,
#endif // HAVE_EPOLL
#ifdef HAVE_PORT_ASSOCIATE
V_PORT,
#endif // HAVE_PORT_ASSOCIATE
#ifdef HAVE_POLL
V_POLL,
#endif // HAVE_POLL
@ -220,7 +223,9 @@ OptionHandlers OptionHandlerFactory::createOptionHandlers()
TEXT_EVENT_POLL,
#ifdef HAVE_EPOLL
V_EPOLL,
#else // !HAVE_EPOLL
#elif HAVE_PORT_ASSOCIATE
V_PORT,
#else
V_SELECT,
#endif // !HAVE_EPOLL
std::vector<std::string>

297
src/PortEventPoll.cc Normal file
View File

@ -0,0 +1,297 @@
/* <!-- copyright */
/*
* aria2 - The high speed download utility
*
* Copyright (C) 2010 Tatsuhiro Tsujikawa
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 2 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
*
* In addition, as a special exception, the copyright holders give
* permission to link the code of portions of this program with the
* OpenSSL library under certain conditions as described in each
* individual source file, and distribute linked combinations
* including the two.
* You must obey the GNU General Public License in all respects
* for all of the code used other than OpenSSL. If you modify
* file(s) with this exception, you may extend this exception to your
* version of the file(s), but you are not obligated to do so. If you
* do not wish to do so, delete this exception statement from your
* version. If you delete this exception statement from all source
* files in the program, then also delete it here.
*/
/* copyright --> */
#include "PortEventPoll.h"
#include <cstring>
#include <algorithm>
#include <numeric>
#include "Command.h"
#include "LogFactory.h"
#include "Logger.h"
namespace aria2 {
PortEventPoll::KSocketEntry::KSocketEntry(sock_t s):
SocketEntry<KCommandEvent, KADNSEvent>(s) {}
int accumulateEvent(int events, const PortEventPoll::KEvent& event)
{
return events|event.getEvents();
}
PortEventPoll::A2PortEvent PortEventPoll::KSocketEntry::getEvents()
{
A2PortEvent portEvent;
portEvent.socketEntry = this;
#ifdef ENABLE_ASYNC_DNS
portEvent.events =
std::accumulate(_adnsEvents.begin(),
_adnsEvents.end(),
std::accumulate(_commandEvents.begin(),
_commandEvents.end(), 0, accumulateEvent),
accumulateEvent);
#else // !ENABLE_ASYNC_DNS
portEvent.events =
std::accumulate(_commandEvents.begin(), _commandEvents.end(), 0,
accumulateEvent);
#endif // !ENABLE_ASYNC_DNS
return portEvent;
}
PortEventPoll::PortEventPoll():
_portEventsSize(PORT_EVENTS_SIZE),
_portEvents(new port_event_t[_portEventsSize]),
_logger(LogFactory::getInstance())
{
_port = port_create();
}
PortEventPoll::~PortEventPoll()
{
if(_port != -1) {
int r;
while((r = close(_port)) == -1 && errno == EINTR);
if(r == -1) {
_logger->error("Error occurred while closing port %d: %s",
_port, strerror(errno));
}
}
delete [] _portEvents;
}
bool PortEventPoll::good() const
{
return _port != -1;
}
void PortEventPoll::poll(const struct timeval& tv)
{
struct timespec timeout = { tv.tv_sec, tv.tv_usec*1000 };
int res;
uint_t nget = 1;
// If port_getn was interrupted by signal, it can consume events but
// not updat nget!. For this very annoying bug, we have to check
// actually event is filled or not.
_portEvents[0].portev_user = (void*)-1;
res = port_getn(_port, _portEvents, _portEventsSize, &nget, &timeout);
if(res == 0 ||
(res == -1 && (errno == ETIME || errno == EINTR) &&
_portEvents[0].portev_user != (void*)-1)) {
if(_logger->debug()) {
_logger->debug("nget=%u", nget);
}
for(uint_t i = 0; i < nget; ++i) {
const port_event_t& pev = _portEvents[i];
KSocketEntry* p = reinterpret_cast<KSocketEntry*>(pev.portev_user);
p->processEvents(pev.portev_events);
int r = port_associate(_port, PORT_SOURCE_FD, pev.portev_object,
p->getEvents().events, p);
if(r == -1) {
_logger->error("port_associate failed for file descriptor %d: cause %s",
pev.portev_object, strerror(errno));
}
}
}
#ifdef ENABLE_ASYNC_DNS
// It turns out that we have to call ares_process_fd before ares's
// own timeout and ares may create new sockets or closes socket in
// their API. So we call ares_process_fd for all ares_channel and
// re-register their sockets.
for(std::deque<SharedHandle<KAsyncNameResolverEntry> >::iterator i =
_nameResolverEntries.begin(), eoi = _nameResolverEntries.end();
i != eoi; ++i) {
(*i)->processTimeout();
(*i)->removeSocketEvents(this);
(*i)->addSocketEvents(this);
}
#endif // ENABLE_ASYNC_DNS
// TODO timeout of name resolver is determined in Command(AbstractCommand,
// DHTEntryPoint...Command)
}
static int translateEvents(EventPoll::EventType events)
{
int newEvents = 0;
if(EventPoll::EVENT_READ&events) {
newEvents |= PortEventPoll::IEV_READ;
}
if(EventPoll::EVENT_WRITE&events) {
newEvents |= PortEventPoll::IEV_WRITE;
}
if(EventPoll::EVENT_ERROR&events) {
newEvents |= PortEventPoll::IEV_ERROR;
}
if(EventPoll::EVENT_HUP&events) {
newEvents |= PortEventPoll::IEV_HUP;
}
return newEvents;
}
bool PortEventPoll::addEvents(sock_t socket,
const PortEventPoll::KEvent& event)
{
SharedHandle<KSocketEntry> socketEntry(new KSocketEntry(socket));
std::deque<SharedHandle<KSocketEntry> >::iterator i =
std::lower_bound(_socketEntries.begin(), _socketEntries.end(), socketEntry);
int r = 0;
if(i != _socketEntries.end() && (*i) == socketEntry) {
event.addSelf(*i);
A2PortEvent pv = (*i)->getEvents();
r = port_associate(_port, PORT_SOURCE_FD, (*i)->getSocket(),
pv.events, pv.socketEntry);
} else {
_socketEntries.insert(i, socketEntry);
if(_socketEntries.size() > _portEventsSize) {
_portEventsSize *= 2;
delete [] _portEvents;
_portEvents = new port_event_t[_portEventsSize];
}
event.addSelf(socketEntry);
A2PortEvent pv = socketEntry->getEvents();
r = port_associate(_port, PORT_SOURCE_FD, socketEntry->getSocket(),
pv.events, pv.socketEntry);
}
if(r == -1) {
if(_logger->debug()) {
_logger->debug("Failed to add socket event %d:%s",
socket, strerror(errno));
}
return false;
} else {
return true;
}
}
bool PortEventPoll::addEvents(sock_t socket, Command* command,
EventPoll::EventType events)
{
int portEvents = translateEvents(events);
return addEvents(socket, KCommandEvent(command, portEvents));
}
#ifdef ENABLE_ASYNC_DNS
bool PortEventPoll::addEvents(sock_t socket, Command* command, int events,
const SharedHandle<AsyncNameResolver>& rs)
{
return addEvents(socket, KADNSEvent(rs, command, socket, events));
}
#endif // ENABLE_ASYNC_DNS
bool PortEventPoll::deleteEvents(sock_t socket,
const PortEventPoll::KEvent& event)
{
SharedHandle<KSocketEntry> socketEntry(new KSocketEntry(socket));
std::deque<SharedHandle<KSocketEntry> >::iterator i =
std::lower_bound(_socketEntries.begin(), _socketEntries.end(), socketEntry);
if(i != _socketEntries.end() && (*i) == socketEntry) {
event.removeSelf(*i);
int r = 0;
if((*i)->eventEmpty()) {
r = port_dissociate(_port, PORT_SOURCE_FD, (*i)->getSocket());
_socketEntries.erase(i);
} else {
A2PortEvent pv = (*i)->getEvents();
r = port_associate(_port, PORT_SOURCE_FD, (*i)->getSocket(),
pv.events, pv.socketEntry);
}
if(r == -1) {
if(_logger->debug()) {
_logger->debug("Failed to delete socket event:%s", strerror(errno));
}
return false;
} else {
return true;
}
} else {
if(_logger->debug()) {
_logger->debug("Socket %d is not found in SocketEntries.", socket);
}
return false;
}
}
#ifdef ENABLE_ASYNC_DNS
bool PortEventPoll::deleteEvents(sock_t socket, Command* command,
const SharedHandle<AsyncNameResolver>& rs)
{
return deleteEvents(socket, KADNSEvent(rs, command, socket, 0));
}
#endif // ENABLE_ASYNC_DNS
bool PortEventPoll::deleteEvents(sock_t socket, Command* command,
EventPoll::EventType events)
{
int portEvents = translateEvents(events);
return deleteEvents(socket, KCommandEvent(command, portEvents));
}
#ifdef ENABLE_ASYNC_DNS
bool PortEventPoll::addNameResolver
(const SharedHandle<AsyncNameResolver>& resolver, Command* command)
{
SharedHandle<KAsyncNameResolverEntry> entry
(new KAsyncNameResolverEntry(resolver, command));
std::deque<SharedHandle<KAsyncNameResolverEntry> >::iterator itr =
std::find(_nameResolverEntries.begin(), _nameResolverEntries.end(), entry);
if(itr == _nameResolverEntries.end()) {
_nameResolverEntries.push_back(entry);
entry->addSocketEvents(this);
return true;
} else {
return false;
}
}
bool PortEventPoll::deleteNameResolver
(const SharedHandle<AsyncNameResolver>& resolver, Command* command)
{
SharedHandle<KAsyncNameResolverEntry> entry
(new KAsyncNameResolverEntry(resolver, command));
std::deque<SharedHandle<KAsyncNameResolverEntry> >::iterator itr =
std::find(_nameResolverEntries.begin(), _nameResolverEntries.end(), entry);
if(itr == _nameResolverEntries.end()) {
return false;
} else {
(*itr)->removeSocketEvents(this);
_nameResolverEntries.erase(itr);
return true;
}
}
#endif // ENABLE_ASYNC_DNS
} // namespace aria2

136
src/PortEventPoll.h Normal file
View File

@ -0,0 +1,136 @@
/* <!-- copyright */
/*
* aria2 - The high speed download utility
*
* Copyright (C) 2010 Tatsuhiro Tsujikawa
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 2 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
*
* In addition, as a special exception, the copyright holders give
* permission to link the code of portions of this program with the
* OpenSSL library under certain conditions as described in each
* individual source file, and distribute linked combinations
* including the two.
* You must obey the GNU General Public License in all respects
* for all of the code used other than OpenSSL. If you modify
* file(s) with this exception, you may extend this exception to your
* version of the file(s), but you are not obligated to do so. If you
* do not wish to do so, delete this exception statement from your
* version. If you delete this exception statement from all source
* files in the program, then also delete it here.
*/
/* copyright --> */
#ifndef _D_PORT_EVENT_POLL_H_
#define _D_PORT_EVENT_POLL_H_
#include "EventPoll.h"
#ifdef HAVE_PORT_H
# include <port.h>
#endif // HAVE_PORT_H
#include <deque>
#include "Event.h"
#ifdef ENABLE_ASYNC_DNS
# include "AsyncNameResolver.h"
#endif // ENABLE_ASYNC_DNS
namespace aria2 {
class Logger;
class PortEventPoll : public EventPoll {
private:
class KSocketEntry;
typedef Event<KSocketEntry> KEvent;
typedef CommandEvent<KSocketEntry, PortEventPoll> KCommandEvent;
typedef ADNSEvent<KSocketEntry, PortEventPoll> KADNSEvent;
typedef AsyncNameResolverEntry<PortEventPoll> KAsyncNameResolverEntry;
friend class AsyncNameResolverEntry<PortEventPoll>;
struct A2PortEvent {
int events;
KSocketEntry* socketEntry;
};
class KSocketEntry:
public SocketEntry<KCommandEvent, KADNSEvent> {
public:
KSocketEntry(sock_t socket);
A2PortEvent getEvents();
};
friend int accumulateEvent(int events, const KEvent& event);
private:
std::deque<SharedHandle<KSocketEntry> > _socketEntries;
#ifdef ENABLE_ASYNC_DNS
std::deque<SharedHandle<KAsyncNameResolverEntry> > _nameResolverEntries;
#endif // ENABLE_ASYNC_DNS
int _port;
size_t _portEventsSize;
port_event_t* _portEvents;
static const size_t PORT_EVENTS_SIZE = 1024;
Logger* _logger;
bool addEvents(sock_t socket, const KEvent& event);
bool deleteEvents(sock_t socket, const KEvent& event);
bool addEvents(sock_t socket, Command* command, int events,
const SharedHandle<AsyncNameResolver>& rs);
bool deleteEvents(sock_t socket, Command* command,
const SharedHandle<AsyncNameResolver>& rs);
public:
PortEventPoll();
bool good() const;
virtual ~PortEventPoll();
virtual void poll(const struct timeval& tv);
virtual bool addEvents(sock_t socket,
Command* command, EventPoll::EventType events);
virtual bool deleteEvents(sock_t socket,
Command* command, EventPoll::EventType events);
#ifdef ENABLE_ASYNC_DNS
virtual bool addNameResolver(const SharedHandle<AsyncNameResolver>& resolver,
Command* command);
virtual bool deleteNameResolver
(const SharedHandle<AsyncNameResolver>& resolver, Command* command);
#endif // ENABLE_ASYNC_DNS
static const int IEV_READ = POLLIN;
static const int IEV_WRITE = POLLOUT;
static const int IEV_ERROR = POLLERR;
static const int IEV_HUP = POLLHUP;
};
} // namespace aria2
#endif // _D_PORT_EVENT_POLL_H_

View File

@ -38,6 +38,9 @@
#ifdef HAVE_IFADDRS_H
# include <ifaddrs.h>
#endif // HAVE_IFADDRS_H
#ifdef HAVE_PORT_H
# include <port.h>
#endif // HAVE_PORT_H
#include <cerrno>
#include <cstring>
@ -148,12 +151,12 @@ SocketCore::SocketCore(sock_t sockfd, int sockType):_sockType(sockType), sockfd(
void SocketCore::init()
{
#ifdef HAVE_EPOLL
_epfd = -1;
#endif // HAVE_EPOLL
#ifdef HAVE_PORT_ASSOCIATE
_portfd = -1;
#endif // HAVE_PORT_ASSOCIATE
blocking = true;
secure = 0;
@ -175,15 +178,16 @@ void SocketCore::init()
SocketCore::~SocketCore() {
closeConnection();
#ifdef HAVE_EPOLL
if(_epfd != -1) {
CLOSE(_epfd);
}
#endif // HAVE_EPOLL
#ifdef HAVE_PORT_ASSOCIATE
if(_portfd != -1) {
CLOSE(_portfd);
}
#endif // HAVE_PORT_ASSOCIATE
#ifdef HAVE_LIBGNUTLS
delete [] peekBuf;
#endif // HAVE_LIBGNUTLS
@ -587,6 +591,19 @@ void SocketCore::initEPOLL()
#endif // HAVE_EPOLL
#ifdef HAVE_PORT_ASSOCIATE
void SocketCore::initPort()
{
if((_portfd = port_create()) == -1) {
throw DL_RETRY_EX(StringFormat("port_create failed:%s", errorMsg()).str());
}
if(port_associate(_portfd, PORT_SOURCE_FD, sockfd, POLLIN|POLLOUT, 0) == -1) {
throw DL_RETRY_EX
(StringFormat("port_associate failed:%s", errorMsg()).str());
}
}
#endif // HAVE_PORT_ASSOCIATE
bool SocketCore::isWritable(time_t timeout)
{
#ifdef HAVE_EPOLL
@ -607,49 +624,67 @@ bool SocketCore::isWritable(time_t timeout)
}
} else
#endif // HAVE_EPOLL
#ifdef HAVE_POLL
if(_pollMethod == SocketCore::POLL_METHOD_POLL) {
struct pollfd p;
p.fd = sockfd;
p.events = POLLOUT;
int r;
while((r = poll(&p, 1, timeout*1000)) == -1 && errno == EINTR);
if(r > 0) {
return p.revents&(POLLOUT|POLLHUP|POLLERR);
} else if(r == 0) {
#ifdef HAVE_PORT_ASSOCIATE
if(_pollMethod == SocketCore::POLL_METHOD_PORT) {
if(_portfd == -1) {
initPort();
}
struct timespec ts = { timeout, 0 };
port_event_t portEvent;
int r = port_get(_portfd, &portEvent, &ts);
if(r == 0) {
return portEvent.portev_events&(POLLOUT|POLLHUP|POLLERR);
} else if(r == -1 && (errno == ETIME || errno == EINTR)) {
return false;
} else {
throw DL_RETRY_EX
(StringFormat(EX_SOCKET_CHECK_WRITABLE, errorMsg()).str());
}
} else
#endif // HAVE_POLL
if(_pollMethod == SocketCore::POLL_METHOD_SELECT) {
fd_set fds;
FD_ZERO(&fds);
FD_SET(sockfd, &fds);
struct timeval tv;
tv.tv_sec = timeout;
tv.tv_usec = 0;
int r = select(sockfd+1, NULL, &fds, NULL, &tv);
if(r == 1) {
return true;
#endif // HAVE_PORT_ASSOCIATE
#ifdef HAVE_POLL
if(_pollMethod == SocketCore::POLL_METHOD_POLL) {
struct pollfd p;
p.fd = sockfd;
p.events = POLLOUT;
int r;
while((r = poll(&p, 1, timeout*1000)) == -1 && errno == EINTR);
if(r > 0) {
return p.revents&(POLLOUT|POLLHUP|POLLERR);
} else if(r == 0) {
// time out
return false;
} else {
if(SOCKET_ERRNO == A2_EINPROGRESS || SOCKET_ERRNO == A2_EINTR) {
throw DL_RETRY_EX
(StringFormat(EX_SOCKET_CHECK_WRITABLE, errorMsg()).str());
}
} else
#endif // HAVE_POLL
if(_pollMethod == SocketCore::POLL_METHOD_SELECT) {
fd_set fds;
FD_ZERO(&fds);
FD_SET(sockfd, &fds);
struct timeval tv;
tv.tv_sec = timeout;
tv.tv_usec = 0;
int r = select(sockfd+1, NULL, &fds, NULL, &tv);
if(r == 1) {
return true;
} else if(r == 0) {
// time out
return false;
} else {
throw DL_RETRY_EX
(StringFormat(EX_SOCKET_CHECK_WRITABLE, errorMsg()).str());
if(SOCKET_ERRNO == A2_EINPROGRESS || SOCKET_ERRNO == A2_EINTR) {
return false;
} else {
throw DL_RETRY_EX
(StringFormat(EX_SOCKET_CHECK_WRITABLE, errorMsg()).str());
}
}
} else {
abort();
}
} else {
abort();
}
}
bool SocketCore::isReadable(time_t timeout)
@ -679,49 +714,67 @@ bool SocketCore::isReadable(time_t timeout)
}
} else
#endif // HAVE_EPOLL
#ifdef HAVE_POLL
if(_pollMethod == SocketCore::POLL_METHOD_POLL) {
struct pollfd p;
p.fd = sockfd;
p.events = POLLIN;
int r;
while((r = poll(&p, 1, timeout*1000)) == -1 && errno == EINTR);
if(r > 0) {
return p.revents&(POLLIN|POLLHUP|POLLERR);
} else if(r == 0) {
#ifdef HAVE_PORT_ASSOCIATE
if(_pollMethod == SocketCore::POLL_METHOD_PORT) {
if(_portfd == -1) {
initPort();
}
struct timespec ts = { timeout, 0 };
port_event_t portEvent;
int r = port_get(_portfd, &portEvent, &ts);
if(r == 0) {
return portEvent.portev_events&(POLLIN|POLLHUP|POLLERR);
} else if(r == -1 && (errno == ETIME || errno == EINTR)) {
return false;
} else {
throw DL_RETRY_EX
(StringFormat(EX_SOCKET_CHECK_READABLE, errorMsg()).str());
}
} else
#endif // HAVE_POLL
if(_pollMethod == SocketCore::POLL_METHOD_SELECT) {
fd_set fds;
FD_ZERO(&fds);
FD_SET(sockfd, &fds);
struct timeval tv;
tv.tv_sec = timeout;
tv.tv_usec = 0;
int r = select(sockfd+1, &fds, NULL, NULL, &tv);
if(r == 1) {
return true;
#endif // HAVE_PORT_ASSOCIATE
#ifdef HAVE_POLL
if(_pollMethod == SocketCore::POLL_METHOD_POLL) {
struct pollfd p;
p.fd = sockfd;
p.events = POLLIN;
int r;
while((r = poll(&p, 1, timeout*1000)) == -1 && errno == EINTR);
if(r > 0) {
return p.revents&(POLLIN|POLLHUP|POLLERR);
} else if(r == 0) {
// time out
return false;
} else {
if(SOCKET_ERRNO == A2_EINPROGRESS || SOCKET_ERRNO == A2_EINTR) {
throw DL_RETRY_EX
(StringFormat(EX_SOCKET_CHECK_READABLE, errorMsg()).str());
}
} else
#endif // HAVE_POLL
if(_pollMethod == SocketCore::POLL_METHOD_SELECT) {
fd_set fds;
FD_ZERO(&fds);
FD_SET(sockfd, &fds);
struct timeval tv;
tv.tv_sec = timeout;
tv.tv_usec = 0;
int r = select(sockfd+1, &fds, NULL, NULL, &tv);
if(r == 1) {
return true;
} else if(r == 0) {
// time out
return false;
} else {
throw DL_RETRY_EX
(StringFormat(EX_SOCKET_CHECK_READABLE, errorMsg()).str());
if(SOCKET_ERRNO == A2_EINPROGRESS || SOCKET_ERRNO == A2_EINTR) {
return false;
} else {
throw DL_RETRY_EX
(StringFormat(EX_SOCKET_CHECK_READABLE, errorMsg()).str());
}
}
} else {
abort();
}
} else {
abort();
}
}
#ifdef HAVE_LIBSSL
@ -1274,6 +1327,13 @@ void SocketCore::useEpoll()
}
#endif // HAVE_EPOLL
#ifdef HAVE_PORT_ASSOCIATE
void SocketCore::usePort()
{
_pollMethod = SocketCore::POLL_METHOD_PORT;
}
#endif // HAVE_PORT_ASSOCIATE
#ifdef HAVE_POLL
void SocketCore::usePoll()
{

View File

@ -87,8 +87,13 @@ private:
#endif // HAVE_EPOLL
#ifdef HAVE_PORT_ASSOCIATE
int _portfd;
#endif // HAVE_PORT_ASSOCIATE
enum PollMethod {
POLL_METHOD_EPOLL,
POLL_METHOD_PORT,
POLL_METHOD_POLL,
POLL_METHOD_SELECT
};
@ -134,10 +139,11 @@ private:
void bind(const struct sockaddr* addr, socklen_t addrlen);
#ifdef HAVE_EPOLL
void initEPOLL();
#endif // HAVE_EPOLL
#ifdef HAVE_PORT_ASSOCIATE
void initPort();
#endif // HAVE_PORT_ASSOCIATE
void setSockOpt(int level, int optname, void* optval, socklen_t optlen);
@ -362,6 +368,9 @@ public:
#ifdef HAVE_EPOLL
static void useEpoll();
#endif // HAVE_EPOLL
#ifdef HAVE_PORT_ASSOCIATE
static void usePort();
#endif // HAVE_PORT_ASSOCIATE
#ifdef HAVE_POLL
static void usePoll();
#endif // HAVE_POLL

View File

@ -186,19 +186,25 @@ downloadresultcode::RESULT main(int argc, char* argv[])
if(op->getAsBool(PREF_QUIET)) {
LogFactory::setConsoleOutput(false);
}
const std::string& pollMethod = op->get(PREF_EVENT_POLL);
#ifdef HAVE_EPOLL
if(op->get(PREF_EVENT_POLL) == V_EPOLL) {
if(pollMethod == V_EPOLL) {
SocketCore::useEpoll();
} else
#endif // HAVE_EPOLL
#ifdef HAVE_POLL
if(op->get(PREF_EVENT_POLL) == V_POLL) {
SocketCore::usePoll();
#ifdef HAVE_PORT_ASSOCIATE
if(pollMethod == V_PORT) {
SocketCore::usePort();
} else
#endif // HAVE_PORT_ASSOCIATE
#ifdef HAVE_POLL
if(pollMethod == V_POLL) {
SocketCore::usePoll();
} else
#endif // HAVE_POLL
if(op->get(PREF_EVENT_POLL) == V_SELECT) {
SocketCore::useSelect();
}
if(pollMethod == V_SELECT) {
SocketCore::useSelect();
}
downloadresultcode::RESULT exitStatus = downloadresultcode::FINISHED;
Logger* logger = LogFactory::getInstance();

View File

@ -150,6 +150,7 @@ const std::string PREF_MAX_FILE_NOT_FOUND("max-file-not-found");
// value: epoll | select
const std::string PREF_EVENT_POLL("event-poll");
const std::string V_EPOLL("epoll");
const std::string V_PORT("port");
const std::string V_POLL("poll");
const std::string V_SELECT("select");
// value: 1*digit

View File

@ -154,6 +154,7 @@ extern const std::string PREF_MAX_FILE_NOT_FOUND;
// value: epoll | select
extern const std::string PREF_EVENT_POLL;
extern const std::string V_EPOLL;
extern const std::string V_PORT;
extern const std::string V_POLL;
extern const std::string V_SELECT;
// value: 1*digit