2010-02-23 Tatsuhiro Tsujikawa <t-tujikawa@users.sourceforge.net>

Added --bt-lpd-interface option to specify the interface to use
	for Local Peer Discovery. LpdMessageDispatcher object now has its
	own socket. LpdMessageReceiver's socket is binded to multicast
	address to only receive multicast packets.
	* src/BtSetup.cc
	* src/LpdMessageDispatcher.cc
	* src/LpdMessageDispatcher.h
	* src/LpdMessageReceiver.cc
	* src/LpdMessageReceiver.h
	* src/LpdReceiveMessageCommand.cc
	* src/LpdReceiveMessageCommand.h
	* src/OptionHandlerFactory.cc
	* src/SocketCore.cc
	* src/SocketCore.h
	* src/prefs.cc
	* src/prefs.h
	* src/usage_text.h
	* test/LpdMessageDispatcherTest.cc
	* test/LpdMessageReceiverTest.cc
pull/1/head
Tatsuhiro Tsujikawa 2010-02-22 15:58:05 +00:00
parent 6f0fff2a5f
commit 6ae5882f3c
16 changed files with 231 additions and 50 deletions

View File

@ -1,3 +1,25 @@
2010-02-23 Tatsuhiro Tsujikawa <t-tujikawa@users.sourceforge.net>
Added --bt-lpd-interface option to specify the interface to use
for Local Peer Discovery. LpdMessageDispatcher object now has its
own socket. LpdMessageReceiver's socket is binded to multicast
address to only receive multicast packets.
* src/BtSetup.cc
* src/LpdMessageDispatcher.cc
* src/LpdMessageDispatcher.h
* src/LpdMessageReceiver.cc
* src/LpdMessageReceiver.h
* src/LpdReceiveMessageCommand.cc
* src/LpdReceiveMessageCommand.h
* src/OptionHandlerFactory.cc
* src/SocketCore.cc
* src/SocketCore.h
* src/prefs.cc
* src/prefs.h
* src/usage_text.h
* test/LpdMessageDispatcherTest.cc
* test/LpdMessageReceiverTest.cc
2010-02-22 Tatsuhiro Tsujikawa <t-tujikawa@users.sourceforge.net>
Put the portion of code to get interface addresses in

View File

@ -33,6 +33,9 @@
*/
/* copyright --> */
#include "BtSetup.h"
#include <cstring>
#include "RequestGroup.h"
#include "DownloadEngine.h"
#include "Option.h"
@ -172,35 +175,62 @@ void BtSetup::setup(std::deque<Command*>& commands,
}
if(option->getAsBool(PREF_BT_ENABLE_LPD) &&
(metadataGetMode || torrentAttrs[bittorrent::PRIVATE].i() == 0)) {
if(LpdReceiveMessageCommand::getNumInstance() == 0) {
_logger->info("Initializing LpdMessageReceiver.");
SharedHandle<LpdMessageReceiver> receiver
(new LpdMessageReceiver(LPD_MULTICAST_ADDR, LPD_MULTICAST_PORT));
try {
receiver->init();
receiver->getSocket()->setMulticastTtl(1);
} catch(RecoverableException& e) {
_logger->info(EX_EXCEPTION_CAUGHT, e);
receiver.reset();
bool initialized = false;
const std::string& lpdInterface = option->get(PREF_BT_LPD_INTERFACE);
if(lpdInterface.empty()) {
if(receiver->init("")) {
initialized = true;
}
} else {
std::vector<std::pair<sockaddr_storage, socklen_t> > ifAddrs;
getInterfaceAddress(ifAddrs, lpdInterface, AF_INET);
for(std::vector<std::pair<sockaddr_storage, socklen_t> >::const_iterator
i = ifAddrs.begin(); i != ifAddrs.end(); ++i) {
sockaddr_in addr;
memcpy(&addr, &(*i).first, (*i).second);
if(receiver->init(inet_ntoa(addr.sin_addr))) {
initialized = true;
break;
}
}
}
if(!receiver.isNull()) {
if(initialized) {
_logger->info("LpdMessageReceiver initialized. multicastAddr=%s:%u,"
" localAddr=%s",
LPD_MULTICAST_ADDR, LPD_MULTICAST_PORT,
receiver->getLocalAddress().c_str());
LpdReceiveMessageCommand* cmd =
LpdReceiveMessageCommand::getInstance(e, receiver);
e->commands.push_back(cmd);
} else {
_logger->info("LpdMessageReceiver not initialized.");
}
}
if(LpdReceiveMessageCommand::getNumInstance()) {
const unsigned char* infoHash =
bittorrent::getInfoHash(requestGroup->getDownloadContext());
SharedHandle<LpdMessageReceiver> receiver =
LpdReceiveMessageCommand::getInstance()->getLpdMessageReceiver();
_logger->info("Initializing LpdMessageDispatcher.");
SharedHandle<LpdMessageDispatcher> dispatcher
(new LpdMessageDispatcher
(std::string(&infoHash[0], &infoHash[INFO_HASH_LENGTH]),
btRuntime->getListenPort(),
LPD_MULTICAST_ADDR, LPD_MULTICAST_PORT,
LpdReceiveMessageCommand::getInstance()->getReceiverSocket()));
LpdDispatchMessageCommand* cmd =
new LpdDispatchMessageCommand(e->newCUID(), dispatcher, e);
cmd->setBtRuntime(btRuntime);
e->commands.push_back(cmd);
LPD_MULTICAST_ADDR, LPD_MULTICAST_PORT));
if(dispatcher->init(receiver->getLocalAddress(), /*ttl*/1, /*loop*/0)) {
_logger->info("LpdMessageDispatcher initialized.");
LpdDispatchMessageCommand* cmd =
new LpdDispatchMessageCommand(e->newCUID(), dispatcher, e);
cmd->setBtRuntime(btRuntime);
e->commands.push_back(cmd);
} else {
_logger->info("LpdMessageDispatcher not initialized.");
}
}
}
time_t btStopTimeout = option->getAsInt(PREF_BT_STOP_TIMEOUT);

View File

@ -44,11 +44,9 @@ namespace aria2 {
LpdMessageDispatcher::LpdMessageDispatcher
(const std::string& infoHash, uint16_t port,
const std::string& multicastAddress, uint16_t multicastPort,
const SharedHandle<SocketCore>& socket,
time_t interval):
_infoHash(infoHash),
_port(port),
_socket(socket),
_multicastAddress(multicastAddress),
_multicastPort(multicastPort),
_timer(0),
@ -57,6 +55,33 @@ LpdMessageDispatcher::LpdMessageDispatcher
_infoHash, _port)),
_logger(LogFactory::getInstance()) {}
bool LpdMessageDispatcher::init(const std::string& localAddr,
unsigned char ttl, unsigned char loop)
{
try {
_socket.reset(new SocketCore(SOCK_DGRAM));
_socket->create(AF_INET);
if(_logger->debug()) {
_logger->debug("Setting multicast outgoing interface=%s",
localAddr.c_str());
}
_socket->setMulticastInterface(localAddr);
if(_logger->debug()) {
_logger->debug("Setting multicast ttl=%u",static_cast<unsigned int>(ttl));
}
_socket->setMulticastTtl(ttl);
if(_logger->debug()) {
_logger->debug("Setting multicast loop=%u",
static_cast<unsigned int>(loop));
}
_socket->setMulticastLoop(loop);
return true;
} catch(RecoverableException& e) {
_logger->error("Failed to initialize LpdMessageDispatcher.", e);
}
return false;
}
bool LpdMessageDispatcher::sendMessage()
{
return

View File

@ -47,9 +47,9 @@ class Logger;
class LpdMessageDispatcher {
private:
SharedHandle<SocketCore> _socket;
std::string _infoHash;
uint16_t _port;
SharedHandle<SocketCore> _socket;
std::string _multicastAddress;
uint16_t _multicastPort;
Time _timer;
@ -60,9 +60,11 @@ public:
LpdMessageDispatcher
(const std::string& infoHash, uint16_t port,
const std::string& multicastAddr, uint16_t multicastPort,
const SharedHandle<SocketCore>& socket,
time_t interval = 5*60);
// No throw
bool init(const std::string& localAddr, unsigned char ttl,unsigned char loop);
// Returns true if _timer reached announce interval, which is by
// default 5mins.
bool isAnnounceReady() const;

View File

@ -48,15 +48,19 @@ LpdMessageReceiver::LpdMessageReceiver
_multicastPort(multicastPort),
_logger(LogFactory::getInstance()) {}
bool LpdMessageReceiver::init()
bool LpdMessageReceiver::init(const std::string& localAddr)
{
try {
_socket.reset(new SocketCore(SOCK_DGRAM));
// SocketCore::bind(port, flags) cannot be used here, because it
// is affected by --interface option.
_socket->bindWithFamily(_multicastPort, AF_INET);
_socket->joinMulticastGroup(_multicastAddress, _multicastPort);
_socket->bind(_multicastAddress, _multicastPort);
if(_logger->debug()) {
_logger->debug("Joining multicast group. %s:%u, localAddr=%s",
_multicastAddress.c_str(), _multicastPort,
localAddr.c_str());
}
_socket->joinMulticastGroup(_multicastAddress, _multicastPort, localAddr);
_socket->setNonBlockingMode();
_localAddress = localAddr;
_logger->info("Listening multicast group (%s:%u) packet",
_multicastAddress.c_str(), _multicastPort);
return true;

View File

@ -50,6 +50,7 @@ private:
SharedHandle<SocketCore> _socket;
std::string _multicastAddress;
uint16_t _multicastPort;
std::string _localAddress;
Logger* _logger;
public:
// Currently only IPv4 multicastAddresses are supported.
@ -57,7 +58,7 @@ public:
(const std::string& multicastAddress, uint16_t multicastPort);
// No throw.
bool init();
bool init(const std::string& localAddr);
// Receives LPD message and process it. Returns false if message is
// not available.
@ -67,6 +68,11 @@ public:
{
return _socket;
}
const std::string& getLocalAddress() const
{
return _localAddress;
}
};
} // namespace aria2

View File

@ -127,11 +127,6 @@ bool LpdReceiveMessageCommand::execute()
return false;
}
SharedHandle<SocketCore> LpdReceiveMessageCommand::getReceiverSocket() const
{
return _receiver->getSocket();
}
LpdReceiveMessageCommand*
LpdReceiveMessageCommand::getInstance
(DownloadEngine* e, const SharedHandle<LpdMessageReceiver>& receiver)

View File

@ -63,7 +63,10 @@ public:
virtual bool execute();
SharedHandle<SocketCore> getReceiverSocket() const;
SharedHandle<LpdMessageReceiver> getLpdMessageReceiver() const
{
return _receiver;
}
static LpdReceiveMessageCommand*
getInstance

View File

@ -978,6 +978,16 @@ OptionHandlers OptionHandlerFactory::createOptionHandlers()
op->addTag(TAG_BITTORRENT);
handlers.push_back(op);
}
{
SharedHandle<OptionHandler> op(new DefaultOptionHandler
(PREF_BT_LPD_INTERFACE,
TEXT_BT_LPD_INTERFACE,
NO_DEFAULT_VALUE,
"interface, IP address",
OptionHandler::REQ_ARG));
op->addTag(TAG_BITTORRENT);
handlers.push_back(op);
}
{
SharedHandle<OptionHandler> op(new NumberOptionHandler
(PREF_BT_MAX_OPEN_FILES,

View File

@ -206,6 +206,24 @@ std::string uitos(T value)
return str;
}
void SocketCore::create(int family, int protocol)
{
closeConnection();
sock_t fd = socket(family, _sockType, protocol);
if(fd == (sock_t) -1) {
throw DL_ABORT_EX
(StringFormat("Failed to create socket. Cause:%s", errorMsg()).str());
}
int sockopt = 1;
if(setsockopt(fd, SOL_SOCKET, SO_REUSEADDR,
(a2_sockopt_t) &sockopt, sizeof(sockopt)) < 0) {
CLOSE(fd);
throw DL_ABORT_EX
(StringFormat("Failed to create socket. Cause:%s", errorMsg()).str());
}
sockfd = fd;
}
static sock_t bindInternal(int family, int socktype, int protocol,
const struct sockaddr* addr, socklen_t addrlen)
{
@ -265,6 +283,19 @@ void SocketCore::bindWithFamily(uint16_t port, int family, int flags)
}
}
void SocketCore::bind(const std::string& addr, uint16_t port, int flags)
{
closeConnection();
std::string error;
sock_t fd =
bindTo(addr.c_str(), port, _protocolFamily, _sockType, flags, error);
if(fd == (sock_t)-1) {
throw DL_ABORT_EX(StringFormat(EX_SOCKET_BIND, error.c_str()).str());
} else {
sockfd = fd;
}
}
void SocketCore::bind(uint16_t port, int flags)
{
closeConnection();
@ -423,17 +454,52 @@ void SocketCore::setSockOpt
}
}
void SocketCore::setMulticastInterface(const std::string& localAddr)
{
in_addr addr;
if(localAddr.empty()) {
addr.s_addr = htonl(INADDR_ANY);
} else {
if(inet_aton(localAddr.c_str(), &addr) == 0) {
throw DL_ABORT_EX
(StringFormat("inet_aton failed for %s", localAddr.c_str()).str());
}
}
setSockOpt(IPPROTO_IP, IP_MULTICAST_IF, &addr, sizeof(addr));
}
void SocketCore::setMulticastTtl(unsigned char ttl)
{
setSockOpt(IPPROTO_IP, IP_MULTICAST_TTL, &ttl, sizeof(ttl));
}
void SocketCore::joinMulticastGroup(const std::string& ipaddr, uint16_t port)
void SocketCore::setMulticastLoop(unsigned char loop)
{
setSockOpt(IPPROTO_IP, IP_MULTICAST_LOOP, &loop, sizeof(loop));
}
void SocketCore::joinMulticastGroup
(const std::string& multicastAddr, uint16_t multicastPort,
const std::string& localAddr)
{
in_addr multiAddr;
if(inet_aton(multicastAddr.c_str(), &multiAddr) == 0) {
throw DL_ABORT_EX
(StringFormat("inet_aton failed for %s", multicastAddr.c_str()).str());
}
in_addr ifAddr;
if(localAddr.empty()) {
ifAddr.s_addr = htonl(INADDR_ANY);
} else {
if(inet_aton(localAddr.c_str(), &ifAddr) == 0) {
throw DL_ABORT_EX
(StringFormat("inet_aton failed for %s", localAddr.c_str()).str());
}
}
struct ip_mreq mreq;
memset(&mreq, 0, sizeof(mreq));
mreq.imr_multiaddr.s_addr = inet_addr(ipaddr.c_str());
mreq.imr_interface.s_addr = htonl(INADDR_ANY);
mreq.imr_multiaddr = multiAddr;
mreq.imr_interface = ifAddr;
setSockOpt(IPPROTO_IP, IP_ADD_MEMBERSHIP, &mreq, sizeof(mreq));
}
@ -1203,16 +1269,15 @@ void getInterfaceAddress
(std::vector<std::pair<struct sockaddr_storage, socklen_t> >& ifAddrs,
const std::string& iface, int family)
{
if(LogFactory::getInstance()->debug()) {
LogFactory::getInstance()->debug("Finding interface %s", iface.c_str());
Logger* logger = LogFactory::getInstance();
if(logger->debug()) {
logger->debug("Finding interface %s", iface.c_str());
}
#ifdef HAVE_GETIFADDRS
// First find interface in interface addresses
struct ifaddrs* ifaddr = 0;
if(getifaddrs(&ifaddr) == -1) {
throw DL_ABORT_EX
(StringFormat(MSG_INTERFACE_NOT_FOUND,
iface.c_str(), errorMsg()).str());
logger->info(MSG_INTERFACE_NOT_FOUND, iface.c_str(), errorMsg());
} else {
auto_delete<struct ifaddrs*> ifaddrDeleter(ifaddr, freeifaddrs);
for(struct ifaddrs* ifa = ifaddr; ifa; ifa = ifa->ifa_next) {
@ -1252,9 +1317,8 @@ void getInterfaceAddress
s = callGetaddrinfo(&res, iface.c_str(), 0, family,
SOCK_STREAM, 0, 0);
if(s) {
throw DL_ABORT_EX
(StringFormat(MSG_INTERFACE_NOT_FOUND,
iface.c_str(), gai_strerror(s)).str());
logger->info(MSG_INTERFACE_NOT_FOUND,
iface.c_str(), gai_strerror(s));
} else {
WSAAPI_AUTO_DELETE<struct addrinfo*> resDeleter(res, freeaddrinfo);
struct addrinfo* rp;

View File

@ -146,12 +146,22 @@ public:
bool isOpen() const { return sockfd != (sock_t) -1; }
void setMulticastInterface(const std::string& localAddr);
void setMulticastTtl(unsigned char ttl);
void joinMulticastGroup(const std::string& ipaddr, uint16_t port);
void setMulticastLoop(unsigned char loop);
void joinMulticastGroup
(const std::string& multicastAddr, uint16_t multicastPort,
const std::string& localAddr);
void create(int family, int protocol = 0);
void bindWithFamily(uint16_t port, int family, int flags = AI_PASSIVE);
void bind(const std::string& addr, uint16_t port, int flags = AI_PASSIVE);
/**
* Creates a socket and bind it with locahost's address and port.
* flags is set to struct addrinfo's ai_flags.
@ -389,7 +399,7 @@ int callGetaddrinfo
// Collects IP addresses of given inteface iface and stores in
// ifAddres. iface may be specified as a hostname, IP address or
// interface name like eth0. You can limit the family of IP addresses
// to collect using family argument.
// to collect using family argument. No throw.
void getInterfaceAddress
(std::vector<std::pair<struct sockaddr_storage, socklen_t> >& ifAddrs,
const std::string& iface, int family = AF_UNSPEC);

View File

@ -318,6 +318,8 @@ const std::string PREF_BT_SAVE_METADATA("bt-save-metadata");
const std::string PREF_BT_METADATA_ONLY("bt-metadata-only");
// values: true | false
const std::string PREF_BT_ENABLE_LPD("bt-enable-lpd");
// values: string
const std::string PREF_BT_LPD_INTERFACE("bt-lpd-interface");
/**
* Metalink related preferences

View File

@ -322,6 +322,8 @@ extern const std::string PREF_BT_SAVE_METADATA;
extern const std::string PREF_BT_METADATA_ONLY;
// values: true | false
extern const std::string PREF_BT_ENABLE_LPD;
// values: string
extern const std::string PREF_BT_LPD_INTERFACE;
/**
* Metalink related preferences

View File

@ -610,3 +610,8 @@
" (e.g., 1.2Ki, 3.4Mi) in the console readout.")
#define TEXT_BT_ENABLE_LPD \
_(" --bt-enable-lpd[=true|false] Enable Local Peer Discovery.")
#define TEXT_BT_LPD_INTERFACE \
_(" --bt-lpd-interface=INTERFACE Use given interface for Local Peer Discovery. If\n" \
" this option is not specified, the default\n" \
" interface is chosen. You can specify interface\n" \
" name and IP address.")

View File

@ -43,12 +43,12 @@ void LpdMessageDispatcherTest::testCreateLpdRequest()
void LpdMessageDispatcherTest::testSendMessage()
{
SharedHandle<SocketCore> recvsock(new SocketCore(SOCK_DGRAM));
recvsock->bind(LPD_MULTICAST_PORT);
recvsock->joinMulticastGroup(LPD_MULTICAST_ADDR, LPD_MULTICAST_PORT);
recvsock->setMulticastTtl(0);
recvsock->bind(LPD_MULTICAST_ADDR, LPD_MULTICAST_PORT);
recvsock->joinMulticastGroup(LPD_MULTICAST_ADDR, LPD_MULTICAST_PORT, "");
LpdMessageDispatcher d("cd41c7fdddfd034a15a04d7ff881216e01c4ceaf", 6000,
LPD_MULTICAST_ADDR, LPD_MULTICAST_PORT,
recvsock);
LPD_MULTICAST_ADDR, LPD_MULTICAST_PORT);
d.init("", 0, 1);
CPPUNIT_ASSERT(d.sendMessage());

View File

@ -29,9 +29,10 @@ CPPUNIT_TEST_SUITE_REGISTRATION(LpdMessageReceiverTest);
void LpdMessageReceiverTest::testReceiveMessage()
{
LpdMessageReceiver rcv(LPD_MULTICAST_ADDR, LPD_MULTICAST_PORT);
CPPUNIT_ASSERT(rcv.init());
CPPUNIT_ASSERT(rcv.init(""));
SharedHandle<SocketCore> sendsock = rcv.getSocket();
SharedHandle<SocketCore> sendsock(new SocketCore(SOCK_DGRAM));
sendsock->create(AF_INET);
sendsock->setMulticastTtl(0);
std::string infoHashString = "cd41c7fdddfd034a15a04d7ff881216e01c4ceaf";
@ -42,7 +43,7 @@ void LpdMessageReceiverTest::testReceiveMessage()
6000);
sendsock->writeData(request.c_str(), request.size(),
LPD_MULTICAST_ADDR, LPD_MULTICAST_PORT);
LPD_MULTICAST_ADDR, LPD_MULTICAST_PORT);
SharedHandle<LpdMessage> msg = rcv.receiveMessage();
CPPUNIT_ASSERT(!msg.isNull());