diff --git a/ChangeLog b/ChangeLog index aa2a6cef..69520b48 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,3 +1,25 @@ +2010-02-23 Tatsuhiro Tsujikawa + + Added --bt-lpd-interface option to specify the interface to use + for Local Peer Discovery. LpdMessageDispatcher object now has its + own socket. LpdMessageReceiver's socket is binded to multicast + address to only receive multicast packets. + * src/BtSetup.cc + * src/LpdMessageDispatcher.cc + * src/LpdMessageDispatcher.h + * src/LpdMessageReceiver.cc + * src/LpdMessageReceiver.h + * src/LpdReceiveMessageCommand.cc + * src/LpdReceiveMessageCommand.h + * src/OptionHandlerFactory.cc + * src/SocketCore.cc + * src/SocketCore.h + * src/prefs.cc + * src/prefs.h + * src/usage_text.h + * test/LpdMessageDispatcherTest.cc + * test/LpdMessageReceiverTest.cc + 2010-02-22 Tatsuhiro Tsujikawa Put the portion of code to get interface addresses in diff --git a/src/BtSetup.cc b/src/BtSetup.cc index 756f745c..d94148bc 100644 --- a/src/BtSetup.cc +++ b/src/BtSetup.cc @@ -33,6 +33,9 @@ */ /* copyright --> */ #include "BtSetup.h" + +#include + #include "RequestGroup.h" #include "DownloadEngine.h" #include "Option.h" @@ -172,35 +175,62 @@ void BtSetup::setup(std::deque& commands, } if(option->getAsBool(PREF_BT_ENABLE_LPD) && (metadataGetMode || torrentAttrs[bittorrent::PRIVATE].i() == 0)) { + if(LpdReceiveMessageCommand::getNumInstance() == 0) { + _logger->info("Initializing LpdMessageReceiver."); SharedHandle receiver (new LpdMessageReceiver(LPD_MULTICAST_ADDR, LPD_MULTICAST_PORT)); - try { - receiver->init(); - receiver->getSocket()->setMulticastTtl(1); - } catch(RecoverableException& e) { - _logger->info(EX_EXCEPTION_CAUGHT, e); - receiver.reset(); + bool initialized = false; + const std::string& lpdInterface = option->get(PREF_BT_LPD_INTERFACE); + if(lpdInterface.empty()) { + if(receiver->init("")) { + initialized = true; + } + } else { + std::vector > ifAddrs; + getInterfaceAddress(ifAddrs, lpdInterface, AF_INET); + for(std::vector >::const_iterator + i = ifAddrs.begin(); i != ifAddrs.end(); ++i) { + sockaddr_in addr; + memcpy(&addr, &(*i).first, (*i).second); + if(receiver->init(inet_ntoa(addr.sin_addr))) { + initialized = true; + break; + } + } } - if(!receiver.isNull()) { + if(initialized) { + _logger->info("LpdMessageReceiver initialized. multicastAddr=%s:%u," + " localAddr=%s", + LPD_MULTICAST_ADDR, LPD_MULTICAST_PORT, + receiver->getLocalAddress().c_str()); LpdReceiveMessageCommand* cmd = LpdReceiveMessageCommand::getInstance(e, receiver); e->commands.push_back(cmd); + } else { + _logger->info("LpdMessageReceiver not initialized."); } } if(LpdReceiveMessageCommand::getNumInstance()) { const unsigned char* infoHash = bittorrent::getInfoHash(requestGroup->getDownloadContext()); + SharedHandle receiver = + LpdReceiveMessageCommand::getInstance()->getLpdMessageReceiver(); + _logger->info("Initializing LpdMessageDispatcher."); SharedHandle dispatcher (new LpdMessageDispatcher (std::string(&infoHash[0], &infoHash[INFO_HASH_LENGTH]), btRuntime->getListenPort(), - LPD_MULTICAST_ADDR, LPD_MULTICAST_PORT, - LpdReceiveMessageCommand::getInstance()->getReceiverSocket())); - LpdDispatchMessageCommand* cmd = - new LpdDispatchMessageCommand(e->newCUID(), dispatcher, e); - cmd->setBtRuntime(btRuntime); - e->commands.push_back(cmd); + LPD_MULTICAST_ADDR, LPD_MULTICAST_PORT)); + if(dispatcher->init(receiver->getLocalAddress(), /*ttl*/1, /*loop*/0)) { + _logger->info("LpdMessageDispatcher initialized."); + LpdDispatchMessageCommand* cmd = + new LpdDispatchMessageCommand(e->newCUID(), dispatcher, e); + cmd->setBtRuntime(btRuntime); + e->commands.push_back(cmd); + } else { + _logger->info("LpdMessageDispatcher not initialized."); + } } } time_t btStopTimeout = option->getAsInt(PREF_BT_STOP_TIMEOUT); diff --git a/src/LpdMessageDispatcher.cc b/src/LpdMessageDispatcher.cc index 3529c524..e51516a0 100644 --- a/src/LpdMessageDispatcher.cc +++ b/src/LpdMessageDispatcher.cc @@ -44,11 +44,9 @@ namespace aria2 { LpdMessageDispatcher::LpdMessageDispatcher (const std::string& infoHash, uint16_t port, const std::string& multicastAddress, uint16_t multicastPort, - const SharedHandle& socket, time_t interval): _infoHash(infoHash), _port(port), - _socket(socket), _multicastAddress(multicastAddress), _multicastPort(multicastPort), _timer(0), @@ -57,6 +55,33 @@ LpdMessageDispatcher::LpdMessageDispatcher _infoHash, _port)), _logger(LogFactory::getInstance()) {} +bool LpdMessageDispatcher::init(const std::string& localAddr, + unsigned char ttl, unsigned char loop) +{ + try { + _socket.reset(new SocketCore(SOCK_DGRAM)); + _socket->create(AF_INET); + if(_logger->debug()) { + _logger->debug("Setting multicast outgoing interface=%s", + localAddr.c_str()); + } + _socket->setMulticastInterface(localAddr); + if(_logger->debug()) { + _logger->debug("Setting multicast ttl=%u",static_cast(ttl)); + } + _socket->setMulticastTtl(ttl); + if(_logger->debug()) { + _logger->debug("Setting multicast loop=%u", + static_cast(loop)); + } + _socket->setMulticastLoop(loop); + return true; + } catch(RecoverableException& e) { + _logger->error("Failed to initialize LpdMessageDispatcher.", e); + } + return false; +} + bool LpdMessageDispatcher::sendMessage() { return diff --git a/src/LpdMessageDispatcher.h b/src/LpdMessageDispatcher.h index e09c0fa4..bb6bad52 100644 --- a/src/LpdMessageDispatcher.h +++ b/src/LpdMessageDispatcher.h @@ -47,9 +47,9 @@ class Logger; class LpdMessageDispatcher { private: + SharedHandle _socket; std::string _infoHash; uint16_t _port; - SharedHandle _socket; std::string _multicastAddress; uint16_t _multicastPort; Time _timer; @@ -60,9 +60,11 @@ public: LpdMessageDispatcher (const std::string& infoHash, uint16_t port, const std::string& multicastAddr, uint16_t multicastPort, - const SharedHandle& socket, time_t interval = 5*60); + // No throw + bool init(const std::string& localAddr, unsigned char ttl,unsigned char loop); + // Returns true if _timer reached announce interval, which is by // default 5mins. bool isAnnounceReady() const; diff --git a/src/LpdMessageReceiver.cc b/src/LpdMessageReceiver.cc index 3d9567f2..e5bcd65b 100644 --- a/src/LpdMessageReceiver.cc +++ b/src/LpdMessageReceiver.cc @@ -48,15 +48,19 @@ LpdMessageReceiver::LpdMessageReceiver _multicastPort(multicastPort), _logger(LogFactory::getInstance()) {} -bool LpdMessageReceiver::init() +bool LpdMessageReceiver::init(const std::string& localAddr) { try { _socket.reset(new SocketCore(SOCK_DGRAM)); - // SocketCore::bind(port, flags) cannot be used here, because it - // is affected by --interface option. - _socket->bindWithFamily(_multicastPort, AF_INET); - _socket->joinMulticastGroup(_multicastAddress, _multicastPort); + _socket->bind(_multicastAddress, _multicastPort); + if(_logger->debug()) { + _logger->debug("Joining multicast group. %s:%u, localAddr=%s", + _multicastAddress.c_str(), _multicastPort, + localAddr.c_str()); + } + _socket->joinMulticastGroup(_multicastAddress, _multicastPort, localAddr); _socket->setNonBlockingMode(); + _localAddress = localAddr; _logger->info("Listening multicast group (%s:%u) packet", _multicastAddress.c_str(), _multicastPort); return true; diff --git a/src/LpdMessageReceiver.h b/src/LpdMessageReceiver.h index 16d1f895..37baa0eb 100644 --- a/src/LpdMessageReceiver.h +++ b/src/LpdMessageReceiver.h @@ -50,6 +50,7 @@ private: SharedHandle _socket; std::string _multicastAddress; uint16_t _multicastPort; + std::string _localAddress; Logger* _logger; public: // Currently only IPv4 multicastAddresses are supported. @@ -57,7 +58,7 @@ public: (const std::string& multicastAddress, uint16_t multicastPort); // No throw. - bool init(); + bool init(const std::string& localAddr); // Receives LPD message and process it. Returns false if message is // not available. @@ -67,6 +68,11 @@ public: { return _socket; } + + const std::string& getLocalAddress() const + { + return _localAddress; + } }; } // namespace aria2 diff --git a/src/LpdReceiveMessageCommand.cc b/src/LpdReceiveMessageCommand.cc index ba169b8f..29a6b18e 100644 --- a/src/LpdReceiveMessageCommand.cc +++ b/src/LpdReceiveMessageCommand.cc @@ -127,11 +127,6 @@ bool LpdReceiveMessageCommand::execute() return false; } -SharedHandle LpdReceiveMessageCommand::getReceiverSocket() const -{ - return _receiver->getSocket(); -} - LpdReceiveMessageCommand* LpdReceiveMessageCommand::getInstance (DownloadEngine* e, const SharedHandle& receiver) diff --git a/src/LpdReceiveMessageCommand.h b/src/LpdReceiveMessageCommand.h index 251f0cfb..5c2706d3 100644 --- a/src/LpdReceiveMessageCommand.h +++ b/src/LpdReceiveMessageCommand.h @@ -63,7 +63,10 @@ public: virtual bool execute(); - SharedHandle getReceiverSocket() const; + SharedHandle getLpdMessageReceiver() const + { + return _receiver; + } static LpdReceiveMessageCommand* getInstance diff --git a/src/OptionHandlerFactory.cc b/src/OptionHandlerFactory.cc index 764944a1..e4e02647 100644 --- a/src/OptionHandlerFactory.cc +++ b/src/OptionHandlerFactory.cc @@ -978,6 +978,16 @@ OptionHandlers OptionHandlerFactory::createOptionHandlers() op->addTag(TAG_BITTORRENT); handlers.push_back(op); } + { + SharedHandle op(new DefaultOptionHandler + (PREF_BT_LPD_INTERFACE, + TEXT_BT_LPD_INTERFACE, + NO_DEFAULT_VALUE, + "interface, IP address", + OptionHandler::REQ_ARG)); + op->addTag(TAG_BITTORRENT); + handlers.push_back(op); + } { SharedHandle op(new NumberOptionHandler (PREF_BT_MAX_OPEN_FILES, diff --git a/src/SocketCore.cc b/src/SocketCore.cc index 26a39b79..68cfd33f 100644 --- a/src/SocketCore.cc +++ b/src/SocketCore.cc @@ -206,6 +206,24 @@ std::string uitos(T value) return str; } +void SocketCore::create(int family, int protocol) +{ + closeConnection(); + sock_t fd = socket(family, _sockType, protocol); + if(fd == (sock_t) -1) { + throw DL_ABORT_EX + (StringFormat("Failed to create socket. Cause:%s", errorMsg()).str()); + } + int sockopt = 1; + if(setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, + (a2_sockopt_t) &sockopt, sizeof(sockopt)) < 0) { + CLOSE(fd); + throw DL_ABORT_EX + (StringFormat("Failed to create socket. Cause:%s", errorMsg()).str()); + } + sockfd = fd; +} + static sock_t bindInternal(int family, int socktype, int protocol, const struct sockaddr* addr, socklen_t addrlen) { @@ -265,6 +283,19 @@ void SocketCore::bindWithFamily(uint16_t port, int family, int flags) } } +void SocketCore::bind(const std::string& addr, uint16_t port, int flags) +{ + closeConnection(); + std::string error; + sock_t fd = + bindTo(addr.c_str(), port, _protocolFamily, _sockType, flags, error); + if(fd == (sock_t)-1) { + throw DL_ABORT_EX(StringFormat(EX_SOCKET_BIND, error.c_str()).str()); + } else { + sockfd = fd; + } +} + void SocketCore::bind(uint16_t port, int flags) { closeConnection(); @@ -423,17 +454,52 @@ void SocketCore::setSockOpt } } +void SocketCore::setMulticastInterface(const std::string& localAddr) +{ + in_addr addr; + if(localAddr.empty()) { + addr.s_addr = htonl(INADDR_ANY); + } else { + if(inet_aton(localAddr.c_str(), &addr) == 0) { + throw DL_ABORT_EX + (StringFormat("inet_aton failed for %s", localAddr.c_str()).str()); + } + } + setSockOpt(IPPROTO_IP, IP_MULTICAST_IF, &addr, sizeof(addr)); +} + void SocketCore::setMulticastTtl(unsigned char ttl) { setSockOpt(IPPROTO_IP, IP_MULTICAST_TTL, &ttl, sizeof(ttl)); } -void SocketCore::joinMulticastGroup(const std::string& ipaddr, uint16_t port) +void SocketCore::setMulticastLoop(unsigned char loop) { + setSockOpt(IPPROTO_IP, IP_MULTICAST_LOOP, &loop, sizeof(loop)); +} + +void SocketCore::joinMulticastGroup +(const std::string& multicastAddr, uint16_t multicastPort, + const std::string& localAddr) +{ + in_addr multiAddr; + if(inet_aton(multicastAddr.c_str(), &multiAddr) == 0) { + throw DL_ABORT_EX + (StringFormat("inet_aton failed for %s", multicastAddr.c_str()).str()); + } + in_addr ifAddr; + if(localAddr.empty()) { + ifAddr.s_addr = htonl(INADDR_ANY); + } else { + if(inet_aton(localAddr.c_str(), &ifAddr) == 0) { + throw DL_ABORT_EX + (StringFormat("inet_aton failed for %s", localAddr.c_str()).str()); + } + } struct ip_mreq mreq; memset(&mreq, 0, sizeof(mreq)); - mreq.imr_multiaddr.s_addr = inet_addr(ipaddr.c_str()); - mreq.imr_interface.s_addr = htonl(INADDR_ANY); + mreq.imr_multiaddr = multiAddr; + mreq.imr_interface = ifAddr; setSockOpt(IPPROTO_IP, IP_ADD_MEMBERSHIP, &mreq, sizeof(mreq)); } @@ -1203,16 +1269,15 @@ void getInterfaceAddress (std::vector >& ifAddrs, const std::string& iface, int family) { - if(LogFactory::getInstance()->debug()) { - LogFactory::getInstance()->debug("Finding interface %s", iface.c_str()); + Logger* logger = LogFactory::getInstance(); + if(logger->debug()) { + logger->debug("Finding interface %s", iface.c_str()); } #ifdef HAVE_GETIFADDRS // First find interface in interface addresses struct ifaddrs* ifaddr = 0; if(getifaddrs(&ifaddr) == -1) { - throw DL_ABORT_EX - (StringFormat(MSG_INTERFACE_NOT_FOUND, - iface.c_str(), errorMsg()).str()); + logger->info(MSG_INTERFACE_NOT_FOUND, iface.c_str(), errorMsg()); } else { auto_delete ifaddrDeleter(ifaddr, freeifaddrs); for(struct ifaddrs* ifa = ifaddr; ifa; ifa = ifa->ifa_next) { @@ -1252,9 +1317,8 @@ void getInterfaceAddress s = callGetaddrinfo(&res, iface.c_str(), 0, family, SOCK_STREAM, 0, 0); if(s) { - throw DL_ABORT_EX - (StringFormat(MSG_INTERFACE_NOT_FOUND, - iface.c_str(), gai_strerror(s)).str()); + logger->info(MSG_INTERFACE_NOT_FOUND, + iface.c_str(), gai_strerror(s)); } else { WSAAPI_AUTO_DELETE resDeleter(res, freeaddrinfo); struct addrinfo* rp; diff --git a/src/SocketCore.h b/src/SocketCore.h index a07e264e..b1949cb4 100644 --- a/src/SocketCore.h +++ b/src/SocketCore.h @@ -146,12 +146,22 @@ public: bool isOpen() const { return sockfd != (sock_t) -1; } + void setMulticastInterface(const std::string& localAddr); + void setMulticastTtl(unsigned char ttl); - void joinMulticastGroup(const std::string& ipaddr, uint16_t port); + void setMulticastLoop(unsigned char loop); + + void joinMulticastGroup + (const std::string& multicastAddr, uint16_t multicastPort, + const std::string& localAddr); + void create(int family, int protocol = 0); + void bindWithFamily(uint16_t port, int family, int flags = AI_PASSIVE); + void bind(const std::string& addr, uint16_t port, int flags = AI_PASSIVE); + /** * Creates a socket and bind it with locahost's address and port. * flags is set to struct addrinfo's ai_flags. @@ -389,7 +399,7 @@ int callGetaddrinfo // Collects IP addresses of given inteface iface and stores in // ifAddres. iface may be specified as a hostname, IP address or // interface name like eth0. You can limit the family of IP addresses -// to collect using family argument. +// to collect using family argument. No throw. void getInterfaceAddress (std::vector >& ifAddrs, const std::string& iface, int family = AF_UNSPEC); diff --git a/src/prefs.cc b/src/prefs.cc index fa451eb2..d7caf736 100644 --- a/src/prefs.cc +++ b/src/prefs.cc @@ -318,6 +318,8 @@ const std::string PREF_BT_SAVE_METADATA("bt-save-metadata"); const std::string PREF_BT_METADATA_ONLY("bt-metadata-only"); // values: true | false const std::string PREF_BT_ENABLE_LPD("bt-enable-lpd"); +// values: string +const std::string PREF_BT_LPD_INTERFACE("bt-lpd-interface"); /** * Metalink related preferences diff --git a/src/prefs.h b/src/prefs.h index d99124d2..c8bf6e69 100644 --- a/src/prefs.h +++ b/src/prefs.h @@ -322,6 +322,8 @@ extern const std::string PREF_BT_SAVE_METADATA; extern const std::string PREF_BT_METADATA_ONLY; // values: true | false extern const std::string PREF_BT_ENABLE_LPD; +// values: string +extern const std::string PREF_BT_LPD_INTERFACE; /** * Metalink related preferences diff --git a/src/usage_text.h b/src/usage_text.h index 605d8afc..e51e6e8e 100644 --- a/src/usage_text.h +++ b/src/usage_text.h @@ -610,3 +610,8 @@ " (e.g., 1.2Ki, 3.4Mi) in the console readout.") #define TEXT_BT_ENABLE_LPD \ _(" --bt-enable-lpd[=true|false] Enable Local Peer Discovery.") +#define TEXT_BT_LPD_INTERFACE \ + _(" --bt-lpd-interface=INTERFACE Use given interface for Local Peer Discovery. If\n" \ + " this option is not specified, the default\n" \ + " interface is chosen. You can specify interface\n" \ + " name and IP address.") diff --git a/test/LpdMessageDispatcherTest.cc b/test/LpdMessageDispatcherTest.cc index 79272996..d712ef72 100644 --- a/test/LpdMessageDispatcherTest.cc +++ b/test/LpdMessageDispatcherTest.cc @@ -43,12 +43,12 @@ void LpdMessageDispatcherTest::testCreateLpdRequest() void LpdMessageDispatcherTest::testSendMessage() { SharedHandle recvsock(new SocketCore(SOCK_DGRAM)); - recvsock->bind(LPD_MULTICAST_PORT); - recvsock->joinMulticastGroup(LPD_MULTICAST_ADDR, LPD_MULTICAST_PORT); - recvsock->setMulticastTtl(0); + recvsock->bind(LPD_MULTICAST_ADDR, LPD_MULTICAST_PORT); + recvsock->joinMulticastGroup(LPD_MULTICAST_ADDR, LPD_MULTICAST_PORT, ""); + LpdMessageDispatcher d("cd41c7fdddfd034a15a04d7ff881216e01c4ceaf", 6000, - LPD_MULTICAST_ADDR, LPD_MULTICAST_PORT, - recvsock); + LPD_MULTICAST_ADDR, LPD_MULTICAST_PORT); + d.init("", 0, 1); CPPUNIT_ASSERT(d.sendMessage()); diff --git a/test/LpdMessageReceiverTest.cc b/test/LpdMessageReceiverTest.cc index a0c412a7..ae04e1e4 100644 --- a/test/LpdMessageReceiverTest.cc +++ b/test/LpdMessageReceiverTest.cc @@ -29,9 +29,10 @@ CPPUNIT_TEST_SUITE_REGISTRATION(LpdMessageReceiverTest); void LpdMessageReceiverTest::testReceiveMessage() { LpdMessageReceiver rcv(LPD_MULTICAST_ADDR, LPD_MULTICAST_PORT); - CPPUNIT_ASSERT(rcv.init()); + CPPUNIT_ASSERT(rcv.init("")); - SharedHandle sendsock = rcv.getSocket(); + SharedHandle sendsock(new SocketCore(SOCK_DGRAM)); + sendsock->create(AF_INET); sendsock->setMulticastTtl(0); std::string infoHashString = "cd41c7fdddfd034a15a04d7ff881216e01c4ceaf"; @@ -42,7 +43,7 @@ void LpdMessageReceiverTest::testReceiveMessage() 6000); sendsock->writeData(request.c_str(), request.size(), - LPD_MULTICAST_ADDR, LPD_MULTICAST_PORT); + LPD_MULTICAST_ADDR, LPD_MULTICAST_PORT); SharedHandle msg = rcv.receiveMessage(); CPPUNIT_ASSERT(!msg.isNull());