From 6ae5882f3cb644369382583f0cea8d6de6dd3da0 Mon Sep 17 00:00:00 2001 From: Tatsuhiro Tsujikawa Date: Mon, 22 Feb 2010 15:58:05 +0000 Subject: [PATCH] 2010-02-23 Tatsuhiro Tsujikawa Added --bt-lpd-interface option to specify the interface to use for Local Peer Discovery. LpdMessageDispatcher object now has its own socket. LpdMessageReceiver's socket is binded to multicast address to only receive multicast packets. * src/BtSetup.cc * src/LpdMessageDispatcher.cc * src/LpdMessageDispatcher.h * src/LpdMessageReceiver.cc * src/LpdMessageReceiver.h * src/LpdReceiveMessageCommand.cc * src/LpdReceiveMessageCommand.h * src/OptionHandlerFactory.cc * src/SocketCore.cc * src/SocketCore.h * src/prefs.cc * src/prefs.h * src/usage_text.h * test/LpdMessageDispatcherTest.cc * test/LpdMessageReceiverTest.cc --- ChangeLog | 22 ++++++++ src/BtSetup.cc | 56 ++++++++++++++++----- src/LpdMessageDispatcher.cc | 29 ++++++++++- src/LpdMessageDispatcher.h | 6 ++- src/LpdMessageReceiver.cc | 14 ++++-- src/LpdMessageReceiver.h | 8 ++- src/LpdReceiveMessageCommand.cc | 5 -- src/LpdReceiveMessageCommand.h | 5 +- src/OptionHandlerFactory.cc | 10 ++++ src/SocketCore.cc | 86 ++++++++++++++++++++++++++++---- src/SocketCore.h | 14 +++++- src/prefs.cc | 2 + src/prefs.h | 2 + src/usage_text.h | 5 ++ test/LpdMessageDispatcherTest.cc | 10 ++-- test/LpdMessageReceiverTest.cc | 7 +-- 16 files changed, 231 insertions(+), 50 deletions(-) diff --git a/ChangeLog b/ChangeLog index aa2a6cef..69520b48 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,3 +1,25 @@ +2010-02-23 Tatsuhiro Tsujikawa + + Added --bt-lpd-interface option to specify the interface to use + for Local Peer Discovery. LpdMessageDispatcher object now has its + own socket. LpdMessageReceiver's socket is binded to multicast + address to only receive multicast packets. + * src/BtSetup.cc + * src/LpdMessageDispatcher.cc + * src/LpdMessageDispatcher.h + * src/LpdMessageReceiver.cc + * src/LpdMessageReceiver.h + * src/LpdReceiveMessageCommand.cc + * src/LpdReceiveMessageCommand.h + * src/OptionHandlerFactory.cc + * src/SocketCore.cc + * src/SocketCore.h + * src/prefs.cc + * src/prefs.h + * src/usage_text.h + * test/LpdMessageDispatcherTest.cc + * test/LpdMessageReceiverTest.cc + 2010-02-22 Tatsuhiro Tsujikawa Put the portion of code to get interface addresses in diff --git a/src/BtSetup.cc b/src/BtSetup.cc index 756f745c..d94148bc 100644 --- a/src/BtSetup.cc +++ b/src/BtSetup.cc @@ -33,6 +33,9 @@ */ /* copyright --> */ #include "BtSetup.h" + +#include + #include "RequestGroup.h" #include "DownloadEngine.h" #include "Option.h" @@ -172,35 +175,62 @@ void BtSetup::setup(std::deque& commands, } if(option->getAsBool(PREF_BT_ENABLE_LPD) && (metadataGetMode || torrentAttrs[bittorrent::PRIVATE].i() == 0)) { + if(LpdReceiveMessageCommand::getNumInstance() == 0) { + _logger->info("Initializing LpdMessageReceiver."); SharedHandle receiver (new LpdMessageReceiver(LPD_MULTICAST_ADDR, LPD_MULTICAST_PORT)); - try { - receiver->init(); - receiver->getSocket()->setMulticastTtl(1); - } catch(RecoverableException& e) { - _logger->info(EX_EXCEPTION_CAUGHT, e); - receiver.reset(); + bool initialized = false; + const std::string& lpdInterface = option->get(PREF_BT_LPD_INTERFACE); + if(lpdInterface.empty()) { + if(receiver->init("")) { + initialized = true; + } + } else { + std::vector > ifAddrs; + getInterfaceAddress(ifAddrs, lpdInterface, AF_INET); + for(std::vector >::const_iterator + i = ifAddrs.begin(); i != ifAddrs.end(); ++i) { + sockaddr_in addr; + memcpy(&addr, &(*i).first, (*i).second); + if(receiver->init(inet_ntoa(addr.sin_addr))) { + initialized = true; + break; + } + } } - if(!receiver.isNull()) { + if(initialized) { + _logger->info("LpdMessageReceiver initialized. multicastAddr=%s:%u," + " localAddr=%s", + LPD_MULTICAST_ADDR, LPD_MULTICAST_PORT, + receiver->getLocalAddress().c_str()); LpdReceiveMessageCommand* cmd = LpdReceiveMessageCommand::getInstance(e, receiver); e->commands.push_back(cmd); + } else { + _logger->info("LpdMessageReceiver not initialized."); } } if(LpdReceiveMessageCommand::getNumInstance()) { const unsigned char* infoHash = bittorrent::getInfoHash(requestGroup->getDownloadContext()); + SharedHandle receiver = + LpdReceiveMessageCommand::getInstance()->getLpdMessageReceiver(); + _logger->info("Initializing LpdMessageDispatcher."); SharedHandle dispatcher (new LpdMessageDispatcher (std::string(&infoHash[0], &infoHash[INFO_HASH_LENGTH]), btRuntime->getListenPort(), - LPD_MULTICAST_ADDR, LPD_MULTICAST_PORT, - LpdReceiveMessageCommand::getInstance()->getReceiverSocket())); - LpdDispatchMessageCommand* cmd = - new LpdDispatchMessageCommand(e->newCUID(), dispatcher, e); - cmd->setBtRuntime(btRuntime); - e->commands.push_back(cmd); + LPD_MULTICAST_ADDR, LPD_MULTICAST_PORT)); + if(dispatcher->init(receiver->getLocalAddress(), /*ttl*/1, /*loop*/0)) { + _logger->info("LpdMessageDispatcher initialized."); + LpdDispatchMessageCommand* cmd = + new LpdDispatchMessageCommand(e->newCUID(), dispatcher, e); + cmd->setBtRuntime(btRuntime); + e->commands.push_back(cmd); + } else { + _logger->info("LpdMessageDispatcher not initialized."); + } } } time_t btStopTimeout = option->getAsInt(PREF_BT_STOP_TIMEOUT); diff --git a/src/LpdMessageDispatcher.cc b/src/LpdMessageDispatcher.cc index 3529c524..e51516a0 100644 --- a/src/LpdMessageDispatcher.cc +++ b/src/LpdMessageDispatcher.cc @@ -44,11 +44,9 @@ namespace aria2 { LpdMessageDispatcher::LpdMessageDispatcher (const std::string& infoHash, uint16_t port, const std::string& multicastAddress, uint16_t multicastPort, - const SharedHandle& socket, time_t interval): _infoHash(infoHash), _port(port), - _socket(socket), _multicastAddress(multicastAddress), _multicastPort(multicastPort), _timer(0), @@ -57,6 +55,33 @@ LpdMessageDispatcher::LpdMessageDispatcher _infoHash, _port)), _logger(LogFactory::getInstance()) {} +bool LpdMessageDispatcher::init(const std::string& localAddr, + unsigned char ttl, unsigned char loop) +{ + try { + _socket.reset(new SocketCore(SOCK_DGRAM)); + _socket->create(AF_INET); + if(_logger->debug()) { + _logger->debug("Setting multicast outgoing interface=%s", + localAddr.c_str()); + } + _socket->setMulticastInterface(localAddr); + if(_logger->debug()) { + _logger->debug("Setting multicast ttl=%u",static_cast(ttl)); + } + _socket->setMulticastTtl(ttl); + if(_logger->debug()) { + _logger->debug("Setting multicast loop=%u", + static_cast(loop)); + } + _socket->setMulticastLoop(loop); + return true; + } catch(RecoverableException& e) { + _logger->error("Failed to initialize LpdMessageDispatcher.", e); + } + return false; +} + bool LpdMessageDispatcher::sendMessage() { return diff --git a/src/LpdMessageDispatcher.h b/src/LpdMessageDispatcher.h index e09c0fa4..bb6bad52 100644 --- a/src/LpdMessageDispatcher.h +++ b/src/LpdMessageDispatcher.h @@ -47,9 +47,9 @@ class Logger; class LpdMessageDispatcher { private: + SharedHandle _socket; std::string _infoHash; uint16_t _port; - SharedHandle _socket; std::string _multicastAddress; uint16_t _multicastPort; Time _timer; @@ -60,9 +60,11 @@ public: LpdMessageDispatcher (const std::string& infoHash, uint16_t port, const std::string& multicastAddr, uint16_t multicastPort, - const SharedHandle& socket, time_t interval = 5*60); + // No throw + bool init(const std::string& localAddr, unsigned char ttl,unsigned char loop); + // Returns true if _timer reached announce interval, which is by // default 5mins. bool isAnnounceReady() const; diff --git a/src/LpdMessageReceiver.cc b/src/LpdMessageReceiver.cc index 3d9567f2..e5bcd65b 100644 --- a/src/LpdMessageReceiver.cc +++ b/src/LpdMessageReceiver.cc @@ -48,15 +48,19 @@ LpdMessageReceiver::LpdMessageReceiver _multicastPort(multicastPort), _logger(LogFactory::getInstance()) {} -bool LpdMessageReceiver::init() +bool LpdMessageReceiver::init(const std::string& localAddr) { try { _socket.reset(new SocketCore(SOCK_DGRAM)); - // SocketCore::bind(port, flags) cannot be used here, because it - // is affected by --interface option. - _socket->bindWithFamily(_multicastPort, AF_INET); - _socket->joinMulticastGroup(_multicastAddress, _multicastPort); + _socket->bind(_multicastAddress, _multicastPort); + if(_logger->debug()) { + _logger->debug("Joining multicast group. %s:%u, localAddr=%s", + _multicastAddress.c_str(), _multicastPort, + localAddr.c_str()); + } + _socket->joinMulticastGroup(_multicastAddress, _multicastPort, localAddr); _socket->setNonBlockingMode(); + _localAddress = localAddr; _logger->info("Listening multicast group (%s:%u) packet", _multicastAddress.c_str(), _multicastPort); return true; diff --git a/src/LpdMessageReceiver.h b/src/LpdMessageReceiver.h index 16d1f895..37baa0eb 100644 --- a/src/LpdMessageReceiver.h +++ b/src/LpdMessageReceiver.h @@ -50,6 +50,7 @@ private: SharedHandle _socket; std::string _multicastAddress; uint16_t _multicastPort; + std::string _localAddress; Logger* _logger; public: // Currently only IPv4 multicastAddresses are supported. @@ -57,7 +58,7 @@ public: (const std::string& multicastAddress, uint16_t multicastPort); // No throw. - bool init(); + bool init(const std::string& localAddr); // Receives LPD message and process it. Returns false if message is // not available. @@ -67,6 +68,11 @@ public: { return _socket; } + + const std::string& getLocalAddress() const + { + return _localAddress; + } }; } // namespace aria2 diff --git a/src/LpdReceiveMessageCommand.cc b/src/LpdReceiveMessageCommand.cc index ba169b8f..29a6b18e 100644 --- a/src/LpdReceiveMessageCommand.cc +++ b/src/LpdReceiveMessageCommand.cc @@ -127,11 +127,6 @@ bool LpdReceiveMessageCommand::execute() return false; } -SharedHandle LpdReceiveMessageCommand::getReceiverSocket() const -{ - return _receiver->getSocket(); -} - LpdReceiveMessageCommand* LpdReceiveMessageCommand::getInstance (DownloadEngine* e, const SharedHandle& receiver) diff --git a/src/LpdReceiveMessageCommand.h b/src/LpdReceiveMessageCommand.h index 251f0cfb..5c2706d3 100644 --- a/src/LpdReceiveMessageCommand.h +++ b/src/LpdReceiveMessageCommand.h @@ -63,7 +63,10 @@ public: virtual bool execute(); - SharedHandle getReceiverSocket() const; + SharedHandle getLpdMessageReceiver() const + { + return _receiver; + } static LpdReceiveMessageCommand* getInstance diff --git a/src/OptionHandlerFactory.cc b/src/OptionHandlerFactory.cc index 764944a1..e4e02647 100644 --- a/src/OptionHandlerFactory.cc +++ b/src/OptionHandlerFactory.cc @@ -978,6 +978,16 @@ OptionHandlers OptionHandlerFactory::createOptionHandlers() op->addTag(TAG_BITTORRENT); handlers.push_back(op); } + { + SharedHandle op(new DefaultOptionHandler + (PREF_BT_LPD_INTERFACE, + TEXT_BT_LPD_INTERFACE, + NO_DEFAULT_VALUE, + "interface, IP address", + OptionHandler::REQ_ARG)); + op->addTag(TAG_BITTORRENT); + handlers.push_back(op); + } { SharedHandle op(new NumberOptionHandler (PREF_BT_MAX_OPEN_FILES, diff --git a/src/SocketCore.cc b/src/SocketCore.cc index 26a39b79..68cfd33f 100644 --- a/src/SocketCore.cc +++ b/src/SocketCore.cc @@ -206,6 +206,24 @@ std::string uitos(T value) return str; } +void SocketCore::create(int family, int protocol) +{ + closeConnection(); + sock_t fd = socket(family, _sockType, protocol); + if(fd == (sock_t) -1) { + throw DL_ABORT_EX + (StringFormat("Failed to create socket. Cause:%s", errorMsg()).str()); + } + int sockopt = 1; + if(setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, + (a2_sockopt_t) &sockopt, sizeof(sockopt)) < 0) { + CLOSE(fd); + throw DL_ABORT_EX + (StringFormat("Failed to create socket. Cause:%s", errorMsg()).str()); + } + sockfd = fd; +} + static sock_t bindInternal(int family, int socktype, int protocol, const struct sockaddr* addr, socklen_t addrlen) { @@ -265,6 +283,19 @@ void SocketCore::bindWithFamily(uint16_t port, int family, int flags) } } +void SocketCore::bind(const std::string& addr, uint16_t port, int flags) +{ + closeConnection(); + std::string error; + sock_t fd = + bindTo(addr.c_str(), port, _protocolFamily, _sockType, flags, error); + if(fd == (sock_t)-1) { + throw DL_ABORT_EX(StringFormat(EX_SOCKET_BIND, error.c_str()).str()); + } else { + sockfd = fd; + } +} + void SocketCore::bind(uint16_t port, int flags) { closeConnection(); @@ -423,17 +454,52 @@ void SocketCore::setSockOpt } } +void SocketCore::setMulticastInterface(const std::string& localAddr) +{ + in_addr addr; + if(localAddr.empty()) { + addr.s_addr = htonl(INADDR_ANY); + } else { + if(inet_aton(localAddr.c_str(), &addr) == 0) { + throw DL_ABORT_EX + (StringFormat("inet_aton failed for %s", localAddr.c_str()).str()); + } + } + setSockOpt(IPPROTO_IP, IP_MULTICAST_IF, &addr, sizeof(addr)); +} + void SocketCore::setMulticastTtl(unsigned char ttl) { setSockOpt(IPPROTO_IP, IP_MULTICAST_TTL, &ttl, sizeof(ttl)); } -void SocketCore::joinMulticastGroup(const std::string& ipaddr, uint16_t port) +void SocketCore::setMulticastLoop(unsigned char loop) { + setSockOpt(IPPROTO_IP, IP_MULTICAST_LOOP, &loop, sizeof(loop)); +} + +void SocketCore::joinMulticastGroup +(const std::string& multicastAddr, uint16_t multicastPort, + const std::string& localAddr) +{ + in_addr multiAddr; + if(inet_aton(multicastAddr.c_str(), &multiAddr) == 0) { + throw DL_ABORT_EX + (StringFormat("inet_aton failed for %s", multicastAddr.c_str()).str()); + } + in_addr ifAddr; + if(localAddr.empty()) { + ifAddr.s_addr = htonl(INADDR_ANY); + } else { + if(inet_aton(localAddr.c_str(), &ifAddr) == 0) { + throw DL_ABORT_EX + (StringFormat("inet_aton failed for %s", localAddr.c_str()).str()); + } + } struct ip_mreq mreq; memset(&mreq, 0, sizeof(mreq)); - mreq.imr_multiaddr.s_addr = inet_addr(ipaddr.c_str()); - mreq.imr_interface.s_addr = htonl(INADDR_ANY); + mreq.imr_multiaddr = multiAddr; + mreq.imr_interface = ifAddr; setSockOpt(IPPROTO_IP, IP_ADD_MEMBERSHIP, &mreq, sizeof(mreq)); } @@ -1203,16 +1269,15 @@ void getInterfaceAddress (std::vector >& ifAddrs, const std::string& iface, int family) { - if(LogFactory::getInstance()->debug()) { - LogFactory::getInstance()->debug("Finding interface %s", iface.c_str()); + Logger* logger = LogFactory::getInstance(); + if(logger->debug()) { + logger->debug("Finding interface %s", iface.c_str()); } #ifdef HAVE_GETIFADDRS // First find interface in interface addresses struct ifaddrs* ifaddr = 0; if(getifaddrs(&ifaddr) == -1) { - throw DL_ABORT_EX - (StringFormat(MSG_INTERFACE_NOT_FOUND, - iface.c_str(), errorMsg()).str()); + logger->info(MSG_INTERFACE_NOT_FOUND, iface.c_str(), errorMsg()); } else { auto_delete ifaddrDeleter(ifaddr, freeifaddrs); for(struct ifaddrs* ifa = ifaddr; ifa; ifa = ifa->ifa_next) { @@ -1252,9 +1317,8 @@ void getInterfaceAddress s = callGetaddrinfo(&res, iface.c_str(), 0, family, SOCK_STREAM, 0, 0); if(s) { - throw DL_ABORT_EX - (StringFormat(MSG_INTERFACE_NOT_FOUND, - iface.c_str(), gai_strerror(s)).str()); + logger->info(MSG_INTERFACE_NOT_FOUND, + iface.c_str(), gai_strerror(s)); } else { WSAAPI_AUTO_DELETE resDeleter(res, freeaddrinfo); struct addrinfo* rp; diff --git a/src/SocketCore.h b/src/SocketCore.h index a07e264e..b1949cb4 100644 --- a/src/SocketCore.h +++ b/src/SocketCore.h @@ -146,12 +146,22 @@ public: bool isOpen() const { return sockfd != (sock_t) -1; } + void setMulticastInterface(const std::string& localAddr); + void setMulticastTtl(unsigned char ttl); - void joinMulticastGroup(const std::string& ipaddr, uint16_t port); + void setMulticastLoop(unsigned char loop); + + void joinMulticastGroup + (const std::string& multicastAddr, uint16_t multicastPort, + const std::string& localAddr); + void create(int family, int protocol = 0); + void bindWithFamily(uint16_t port, int family, int flags = AI_PASSIVE); + void bind(const std::string& addr, uint16_t port, int flags = AI_PASSIVE); + /** * Creates a socket and bind it with locahost's address and port. * flags is set to struct addrinfo's ai_flags. @@ -389,7 +399,7 @@ int callGetaddrinfo // Collects IP addresses of given inteface iface and stores in // ifAddres. iface may be specified as a hostname, IP address or // interface name like eth0. You can limit the family of IP addresses -// to collect using family argument. +// to collect using family argument. No throw. void getInterfaceAddress (std::vector >& ifAddrs, const std::string& iface, int family = AF_UNSPEC); diff --git a/src/prefs.cc b/src/prefs.cc index fa451eb2..d7caf736 100644 --- a/src/prefs.cc +++ b/src/prefs.cc @@ -318,6 +318,8 @@ const std::string PREF_BT_SAVE_METADATA("bt-save-metadata"); const std::string PREF_BT_METADATA_ONLY("bt-metadata-only"); // values: true | false const std::string PREF_BT_ENABLE_LPD("bt-enable-lpd"); +// values: string +const std::string PREF_BT_LPD_INTERFACE("bt-lpd-interface"); /** * Metalink related preferences diff --git a/src/prefs.h b/src/prefs.h index d99124d2..c8bf6e69 100644 --- a/src/prefs.h +++ b/src/prefs.h @@ -322,6 +322,8 @@ extern const std::string PREF_BT_SAVE_METADATA; extern const std::string PREF_BT_METADATA_ONLY; // values: true | false extern const std::string PREF_BT_ENABLE_LPD; +// values: string +extern const std::string PREF_BT_LPD_INTERFACE; /** * Metalink related preferences diff --git a/src/usage_text.h b/src/usage_text.h index 605d8afc..e51e6e8e 100644 --- a/src/usage_text.h +++ b/src/usage_text.h @@ -610,3 +610,8 @@ " (e.g., 1.2Ki, 3.4Mi) in the console readout.") #define TEXT_BT_ENABLE_LPD \ _(" --bt-enable-lpd[=true|false] Enable Local Peer Discovery.") +#define TEXT_BT_LPD_INTERFACE \ + _(" --bt-lpd-interface=INTERFACE Use given interface for Local Peer Discovery. If\n" \ + " this option is not specified, the default\n" \ + " interface is chosen. You can specify interface\n" \ + " name and IP address.") diff --git a/test/LpdMessageDispatcherTest.cc b/test/LpdMessageDispatcherTest.cc index 79272996..d712ef72 100644 --- a/test/LpdMessageDispatcherTest.cc +++ b/test/LpdMessageDispatcherTest.cc @@ -43,12 +43,12 @@ void LpdMessageDispatcherTest::testCreateLpdRequest() void LpdMessageDispatcherTest::testSendMessage() { SharedHandle recvsock(new SocketCore(SOCK_DGRAM)); - recvsock->bind(LPD_MULTICAST_PORT); - recvsock->joinMulticastGroup(LPD_MULTICAST_ADDR, LPD_MULTICAST_PORT); - recvsock->setMulticastTtl(0); + recvsock->bind(LPD_MULTICAST_ADDR, LPD_MULTICAST_PORT); + recvsock->joinMulticastGroup(LPD_MULTICAST_ADDR, LPD_MULTICAST_PORT, ""); + LpdMessageDispatcher d("cd41c7fdddfd034a15a04d7ff881216e01c4ceaf", 6000, - LPD_MULTICAST_ADDR, LPD_MULTICAST_PORT, - recvsock); + LPD_MULTICAST_ADDR, LPD_MULTICAST_PORT); + d.init("", 0, 1); CPPUNIT_ASSERT(d.sendMessage()); diff --git a/test/LpdMessageReceiverTest.cc b/test/LpdMessageReceiverTest.cc index a0c412a7..ae04e1e4 100644 --- a/test/LpdMessageReceiverTest.cc +++ b/test/LpdMessageReceiverTest.cc @@ -29,9 +29,10 @@ CPPUNIT_TEST_SUITE_REGISTRATION(LpdMessageReceiverTest); void LpdMessageReceiverTest::testReceiveMessage() { LpdMessageReceiver rcv(LPD_MULTICAST_ADDR, LPD_MULTICAST_PORT); - CPPUNIT_ASSERT(rcv.init()); + CPPUNIT_ASSERT(rcv.init("")); - SharedHandle sendsock = rcv.getSocket(); + SharedHandle sendsock(new SocketCore(SOCK_DGRAM)); + sendsock->create(AF_INET); sendsock->setMulticastTtl(0); std::string infoHashString = "cd41c7fdddfd034a15a04d7ff881216e01c4ceaf"; @@ -42,7 +43,7 @@ void LpdMessageReceiverTest::testReceiveMessage() 6000); sendsock->writeData(request.c_str(), request.size(), - LPD_MULTICAST_ADDR, LPD_MULTICAST_PORT); + LPD_MULTICAST_ADDR, LPD_MULTICAST_PORT); SharedHandle msg = rcv.receiveMessage(); CPPUNIT_ASSERT(!msg.isNull());