/* */
#include "DefaultBtInteractive.h"

#include <cstring>

#include "prefs.h"
#include "message.h"
#include "BtHandshakeMessage.h"
#include "util.h"
#include "BtKeepAliveMessage.h"
#include "BtChokeMessage.h"
#include "BtUnchokeMessage.h"
#include "BtRequestMessage.h"
#include "BtPieceMessage.h"
#include "DlAbortEx.h"
#include "BtExtendedMessage.h"
#include "HandshakeExtensionMessage.h"
#include "UTPexExtensionMessage.h"
#include "DefaultExtensionMessageFactory.h"
#include "ExtensionMessageRegistry.h"
#include "DHTNode.h"
#include "Peer.h"
#include "Piece.h"
#include "DownloadContext.h"
#include "PieceStorage.h"
#include "PeerStorage.h"
#include "BtRuntime.h"
#include "BtMessageReceiver.h"
#include "BtMessageDispatcher.h"
#include "BtMessageFactory.h"
#include "BtRequestFactory.h"
#include "PeerConnection.h"
#include "Logger.h"
#include "LogFactory.h"
#include "StringFormat.h"
#include "RequestGroup.h"
#include "RequestGroupMan.h"
#include "bittorrent_helper.h"
#include "UTMetadataRequestFactory.h"
#include "UTMetadataRequestTracker.h"

namespace aria2 {

// Creates the interaction object for one peer of one download.
// Collaborators (dispatcher, message factory, storages, ...) are not set
// here; they are injected afterwards through the set*() methods below.
// Defaults: PEX and DHT disabled, metadata-get mode off, allowed-fast set
// size 10, keep-alive interval 120 (passed to Time::elapsed(); presumably
// seconds).
DefaultBtInteractive::DefaultBtInteractive
(const SharedHandle<DownloadContext>& downloadContext,
 const SharedHandle<Peer>& peer)
  :
  _downloadContext(downloadContext),
  peer(peer),
  _metadataGetMode(false),
  logger(LogFactory::getInstance()),
  allowedFastSetSize(10),
  keepAliveInterval(120),
  _utPexEnabled(false),
  _dhtEnabled(false),
  _numReceivedMessage(0),
  _maxOutstandingRequest(DEFAULT_MAX_OUTSTANDING_REQUEST)
{}

DefaultBtInteractive::~DefaultBtInteractive() {}

// Queues our handshake (info hash + our static peer ID) and flushes the
// dispatcher queue immediately.
void DefaultBtInteractive::initiateHandshake()
{
  dispatcher->addMessageToQueue
    (messageFactory->createHandshakeMessage
     (bittorrent::getInfoHash(_downloadContext),
      bittorrent::getStaticPeerId()));
  dispatcher->sendMessages();
}

// Tries to receive the peer's handshake.
// Returns a null handle when the receiver has no handshake to hand out yet.
// Otherwise records the peer ID and the capabilities announced in the
// handshake (fast extension, extended messaging, DHT) on `peer` and returns
// the handshake message.
// Throws DlAbortEx when the remote peer ID equals our own static peer ID.
BtMessageHandle DefaultBtInteractive::receiveHandshake(bool quickReply)
{
  SharedHandle<BtHandshakeMessage> message =
    btMessageReceiver->receiveHandshake(quickReply);
  if(message.isNull()) {
    return BtMessageHandle();
  }
  if(memcmp(message->getPeerId(), bittorrent::getStaticPeerId(),
            PEER_ID_LENGTH) == 0) {
    throw DL_ABORT_EX
      (StringFormat
       ("CUID#%d - Drop connection from the same Peer ID", cuid).str());
  }

  peer->setPeerId(message->getPeerId());

  if(message->isFastExtensionSupported()) {
    peer->setFastExtensionEnabled(true);
    logger->info(MSG_FAST_EXTENSION_ENABLED, cuid);
  }
  if(message->isExtendedMessagingEnabled()) {
    peer->setExtendedMessagingEnabled(true);
    // Do not advertise ut_pex to this peer when PEX is turned off locally.
    if(!_utPexEnabled) {
      _extensionMessageRegistry->removeExtension("ut_pex");
    }
    logger->info(MSG_EXTENDED_MESSAGING_ENABLED, cuid);
  }
  if(message->isDHTEnabled()) {
    peer->setDHTEnabled(true);
    logger->info(MSG_DHT_ENABLED_PEER, cuid);
  }
  logger->info(MSG_RECEIVE_PEER_MESSAGE, cuid,
               peer->ipaddr.c_str(), peer->port,
               message->toString().c_str());
  return message;
}

// Same as receiveHandshake() with quickReply enabled.
BtMessageHandle DefaultBtInteractive::receiveAndSendHandshake()
{
  return receiveHandshake(true);
}

// Resets the per-connection timers and queues the messages that follow a
// completed handshake: extended handshake, bitfield, DHT port and
// allowed-fast, each only when applicable. Bitfield and allowed-fast are
// skipped in metadata-get mode. Finally flushes the queue.
void DefaultBtInteractive::doPostHandshakeProcessing()
{
  // Set time 0 to haveCheckPoint to cache http/ftp download piece completion
  haveCheckPoint.setTimeInSec(0);
  keepAliveCheckPoint.reset();
  floodingCheckPoint.reset();
  _pexCheckPoint.setTimeInSec(0);
  if(peer->isExtendedMessagingEnabled()) {
    addHandshakeExtendedMessageToQueue();
  }
  if(!_metadataGetMode) {
    addBitfieldMessageToQueue();
  }
  if(peer->isDHTEnabled() && _dhtEnabled) {
    addPortMessageToQueue();
  }
  if(!_metadataGetMode) {
    addAllowedFastMessageToQueue();
  }
  sendPendingMessage();
}

// Queues a port message carrying the port of the local DHT node.
void DefaultBtInteractive::addPortMessageToQueue()
{
  dispatcher->addMessageToQueue
    (messageFactory->createPortMessage(_localNode->getPort()));
}

// Queues the extended-messaging handshake: client version, listen port,
// the registered extensions and, when available, the metadata size.
void DefaultBtInteractive::addHandshakeExtendedMessageToQueue()
{
  // The space before PACKAGE_VERSION is required: without it C++11 and
  // later lex the macro name as a user-defined-literal suffix.
  static const std::string CLIENT_ARIA2("aria2/" PACKAGE_VERSION);
  HandshakeExtensionMessageHandle m(new HandshakeExtensionMessage());
  m->setClientVersion(CLIENT_ARIA2);
  m->setTCPPort(_btRuntime->getListenPort());
  m->setExtensions(_extensionMessageRegistry->getExtensions());
  const BDE& attrs = _downloadContext->getAttribute(bittorrent::BITTORRENT);
  // NOTE(review): presence is tested on METADATA but the value is read from
  // METADATA_SIZE; presumably both keys are always set together -- confirm.
  if(attrs.containsKey(bittorrent::METADATA)) {
    m->setMetadataSize(attrs[bittorrent::METADATA_SIZE].i());
  }
  BtMessageHandle msg = messageFactory->createBtExtendedMessage(m);
  dispatcher->addMessageToQueue(msg);
}

// Announces what we already have.
// With the fast extension: have-all when everything is finished, a bitfield
// when something has been completed, have-none otherwise.
// Without it: a bitfield only when something has been completed.
void DefaultBtInteractive::addBitfieldMessageToQueue()
{
  if(peer->isFastExtensionEnabled()) {
    if(_pieceStorage->allDownloadFinished()) {
      dispatcher->addMessageToQueue(messageFactory->createHaveAllMessage());
    } else if(_pieceStorage->getCompletedLength() > 0) {
      dispatcher->addMessageToQueue(messageFactory->createBitfieldMessage());
    } else {
      dispatcher->addMessageToQueue(messageFactory->createHaveNoneMessage());
    }
  } else {
    if(_pieceStorage->getCompletedLength() > 0) {
      dispatcher->addMessageToQueue(messageFactory->createBitfieldMessage());
    }
  }
}

// For fast-extension peers, computes the allowed-fast set for the peer's
// address and queues one allowed-fast message per index in it.
void DefaultBtInteractive::addAllowedFastMessageToQueue()
{
  if(!peer->isFastExtensionEnabled()) {
    return;
  }
  std::vector<size_t> fastSet;
  bittorrent::computeFastSet(fastSet, peer->ipaddr,
                             _downloadContext->getNumPieces(),
                             bittorrent::getInfoHash(_downloadContext),
                             allowedFastSetSize);
  for(std::vector<size_t>::const_iterator itr = fastSet.begin();
      itr != fastSet.end(); ++itr) {
    dispatcher->addMessageToQueue
      (messageFactory->createAllowedFastMessage(*itr));
  }
}

// Queues a choke or unchoke message when the current choking state differs
// from what peer->shouldBeChoking() asks for.
void DefaultBtInteractive::decideChoking()
{
  if(peer->shouldBeChoking()) {
    if(!peer->amChoking()) {
      dispatcher->addMessageToQueue(messageFactory->createChokeMessage());
    }
  } else {
    if(peer->amChoking()) {
      dispatcher->addMessageToQueue(messageFactory->createUnchokeMessage());
    }
  }
}

// Advertises pieces reported by the piece storage since haveCheckPoint.
// With 20 or more indexes a single have-all (fast extension and download
// finished) or bitfield message is queued instead of individual have
// messages.
void DefaultBtInteractive::checkHave()
{
  std::deque<size_t> indexes;
  _pieceStorage->getAdvertisedPieceIndexes(indexes, cuid, haveCheckPoint);
  haveCheckPoint.reset();
  if(indexes.size() >= 20) {
    if(peer->isFastExtensionEnabled() &&
       _pieceStorage->allDownloadFinished()) {
      dispatcher->addMessageToQueue(messageFactory->createHaveAllMessage());
    } else {
      dispatcher->addMessageToQueue(messageFactory->createBitfieldMessage());
    }
  } else {
    for(std::deque<size_t>::iterator itr = indexes.begin();
        itr != indexes.end(); ++itr) {
      dispatcher->addMessageToQueue(messageFactory->createHaveMessage(*itr));
    }
  }
}

// Sends a keep-alive message right away once keepAliveInterval has elapsed
// since the last one, then restarts the timer.
void DefaultBtInteractive::sendKeepAlive()
{
  if(keepAliveCheckPoint.elapsed(keepAliveInterval)) {
    dispatcher->addMessageToQueue(messageFactory->createKeepAliveMessage());
    dispatcher->sendMessages();
    keepAliveCheckPoint.reset();
  }
}

// Receives and processes up to 50 messages, stopping early when a download
// speed limit is exceeded or when no further message is available.
// Updates the flooding statistics and the inactivity timer according to the
// message type. Returns the number of messages processed.
size_t DefaultBtInteractive::receiveMessages()
{
  const size_t countOldOutstandingRequest =
    dispatcher->countOutstandingRequest();
  size_t msgcount = 0;
  for(int i = 0; i < 50; ++i) {
    if(_requestGroupMan->doesOverallDownloadSpeedExceed() ||
       _downloadContext->getOwnerRequestGroup()->doesDownloadSpeedExceed()) {
      break;
    }
    BtMessageHandle message = btMessageReceiver->receiveMessage();
    if(message.isNull()) {
      break;
    }
    ++msgcount;
    logger->info(MSG_RECEIVE_PEER_MESSAGE, cuid,
                 peer->ipaddr.c_str(), peer->port,
                 message->toString().c_str());
    message->doReceivedAction();

    switch(message->getId()) {
    case BtKeepAliveMessage::ID:
      floodingStat.incKeepAliveCount();
      break;
    case BtChokeMessage::ID:
      if(!peer->peerChoking()) {
        floodingStat.incChokeUnchokeCount();
      }
      break;
    case BtUnchokeMessage::ID:
      if(peer->peerChoking()) {
        floodingStat.incChokeUnchokeCount();
      }
      break;
    case BtPieceMessage::ID:
      _peerStorage->updateTransferStatFor(peer);
      // pass through
    case BtRequestMessage::ID:
      inactiveCheckPoint.reset();
      break;
    }
  }
  // All requests that were outstanding before this call are gone now:
  // double the request window, capped at UB_MAX_OUTSTANDING_REQUEST.
  if(countOldOutstandingRequest > 0 &&
     dispatcher->countOutstandingRequest() == 0) {
    _maxOutstandingRequest =
      std::min(static_cast<size_t>(UB_MAX_OUTSTANDING_REQUEST),
               _maxOutstandingRequest*2);
  }
  return msgcount;
}

// Queues an interested / not-interested message when our interest state
// no longer matches whether the peer has a piece we are missing.
void DefaultBtInteractive::decideInterest()
{
  if(_pieceStorage->hasMissingPiece(peer)) {
    if(!peer->amInterested()) {
      logger->debug(MSG_PEER_INTERESTED, cuid);
      dispatcher->
        addMessageToQueue(messageFactory->createInterestedMessage());
    }
  } else {
    if(peer->amInterested()) {
      logger->debug(MSG_PEER_NOT_INTERESTED, cuid);
      dispatcher->
        addMessageToQueue(messageFactory->createNotInterestedMessage());
    }
  }
}

// Adds target pieces to the request factory until it holds at least
// maxMissingBlock missing blocks or the storage has nothing more to offer.
// While the peer is choking us, pieces are taken with
// getMissingFastPiece() and only if the fast extension is enabled;
// otherwise getMissingPiece() is used.
void DefaultBtInteractive::fillPiece(size_t maxMissingBlock)
{
  if(!_pieceStorage->hasMissingPiece(peer)) {
    return;
  }
  size_t numMissingBlock = btRequestFactory->countMissingBlock();
  const bool choked = peer->peerChoking();
  if(choked && !peer->isFastExtensionEnabled()) {
    // Choked and no fast extension: nothing can be requested.
    return;
  }
  // Pieces already targeted must not be handed out a second time.
  std::deque<size_t> excludedIndexes;
  btRequestFactory->getTargetPieceIndexes(excludedIndexes);
  while(numMissingBlock < maxMissingBlock) {
    SharedHandle<Piece> piece;
    if(choked) {
      piece = _pieceStorage->getMissingFastPiece(peer, excludedIndexes);
    } else {
      piece = _pieceStorage->getMissingPiece(peer, excludedIndexes);
    }
    if(piece.isNull()) {
      break;
    }
    btRequestFactory->addTargetPiece(piece);
    numMissingBlock += piece->countMissingBlock();
    excludedIndexes.push_back(piece->getIndex());
  }
}

// Tops up the target pieces and queues as many request messages as the
// current request window (_maxOutstandingRequest) still allows.
void DefaultBtInteractive::addRequests()
{
  fillPiece(_maxOutstandingRequest);
  const size_t outstanding = dispatcher->countOutstandingRequest();
  // Guard the unsigned subtraction against underflow.
  const size_t reqNumToCreate =
    _maxOutstandingRequest <= outstanding ?
    0 : _maxOutstandingRequest-outstanding;
  if(reqNumToCreate > 0) {
    BtMessages requests;
    if(_pieceStorage->isEndGame()) {
      btRequestFactory->createRequestMessagesOnEndGame(requests,
                                                       reqNumToCreate);
    } else {
      btRequestFactory->createRequestMessages(requests, reqNumToCreate);
    }
    dispatcher->addMessageToQueue(requests);
  }
}

// Drops every target piece. In metadata-get mode with a known total length,
// also cancels the pieces behind all tracked metadata requests.
void DefaultBtInteractive::cancelAllPiece()
{
  btRequestFactory->removeAllTargetPiece();
  if(_metadataGetMode && _downloadContext->getTotalLength() > 0) {
    std::vector<size_t> metadataRequests =
      _utMetadataRequestTracker->getAllTrackedIndex();
    for(std::vector<size_t>::const_iterator i = metadataRequests.begin();
        i != metadataRequests.end(); ++i) {
      logger->debug("Cancel metadata: piece=%lu",
                    static_cast<unsigned long>(*i));
      _pieceStorage->cancelPiece(_pieceStorage->getPiece(*i));
    }
  }
}

// Asks the dispatcher to send whatever is queued.
void DefaultBtInteractive::sendPendingMessage()
{
  dispatcher->sendMessages();
}

// Once per FLOODING_CHECK_INTERVAL, throws DlAbortEx if the peer sent two or
// more choke/unchoke flips or two or more keep-alives during the interval;
// otherwise clears the counters.
void DefaultBtInteractive::detectMessageFlooding()
{
  if(floodingCheckPoint.elapsed(FLOODING_CHECK_INTERVAL)) {
    if(floodingStat.getChokeUnchokeCount() >= 2 ||
       floodingStat.getKeepAliveCount() >= 2) {
      throw DL_ABORT_EX(EX_FLOODING_DETECTED);
    } else {
      floodingStat.reset();
    }
    floodingCheckPoint.reset();
  }
}

// Throws DlAbortEx to drop peers that are not contributing to the download.
void DefaultBtInteractive::checkActiveInteraction()
{
  // To allow aria2 to accept mutually interested peer, disconnect
  // uninterested peer.
  {
    const time_t interval = 30;
    if(!peer->amInterested() && !peer->peerInterested() &&
       inactiveCheckPoint.elapsed(interval)) {
      // TODO change the message
      // The argument is cast so that it matches the %u conversion; time_t
      // may be wider than unsigned int (StringFormat is presumably
      // printf-style).
      throw DL_ABORT_EX
        (StringFormat("Disconnect peer because we are not interested each other"
                      " after %u second(s).",
                      static_cast<unsigned int>(interval)).str());
    }
  }
  // Since the peers which are *just* connected and do nothing to improve
  // mutual download progress are completely waste of resources, those peers
  // are disconnected in a certain time period.
  {
    const time_t interval = 60;
    if(inactiveCheckPoint.elapsed(interval)) {
      // NOTE(review): the conversion specifier inside
      // EX_DROP_INACTIVE_CONNECTION is not visible here; confirm that it
      // matches a time_t argument.
      throw DL_ABORT_EX
        (StringFormat(EX_DROP_INACTIVE_CONNECTION, interval).str());
    }
  }
}

// Once per UTPexExtensionMessage::DEFAULT_INTERVAL, queues a ut_pex message.
// Fresh peers are taken from the front of the peer list and dropped peers
// from the back, each until the message reports that section as full; the
// remote peer's own address is skipped.
void DefaultBtInteractive::addPeerExchangeMessage()
{
  if(!_pexCheckPoint.elapsed(UTPexExtensionMessage::DEFAULT_INTERVAL)) {
    return;
  }
  UTPexExtensionMessageHandle m
    (new UTPexExtensionMessage(peer->getExtensionMessageID("ut_pex")));
  const Peers& peers = _peerStorage->getPeers();
  for(Peers::const_iterator i = peers.begin();
      i != peers.end() && !m->freshPeersAreFull(); ++i) {
    if(peer->ipaddr != (*i)->ipaddr) {
      m->addFreshPeer(*i);
    }
  }
  for(Peers::const_reverse_iterator i = peers.rbegin();
      i != peers.rend() && !m->droppedPeersAreFull(); ++i) {
    if(peer->ipaddr != (*i)->ipaddr) {
      m->addDroppedPeer(*i);
    }
  }
  BtMessageHandle msg = messageFactory->createBtExtendedMessage(m);
  dispatcher->addMessageToQueue(msg);
  _pexCheckPoint.reset();
}

// Runs one iteration of the peer interaction.
// Metadata-get mode: keep-alive, receive, then request metadata pieces via
// ut_metadata and expire timed-out requests once per second; requests a
// halt of the owning RequestGroup when the metadata download is finished.
// Normal mode: liveness / choking / flooding checks, have advertisement,
// keep-alive, receive, interest decision and new piece requests.
// In both modes a ut_pex message may be queued and the queue is flushed.
void DefaultBtInteractive::doInteractionProcessing()
{
  if(_metadataGetMode) {
    sendKeepAlive();
    _numReceivedMessage = receiveMessages();
    // PieceStorage is re-initialized with metadata_size in
    // HandshakeExtensionMessage::doReceivedAction().
    _pieceStorage =
      _downloadContext->getOwnerRequestGroup()->getPieceStorage();
    if(peer->getExtensionMessageID("ut_metadata") &&
       _downloadContext->getTotalLength() > 0) {
      size_t num = _utMetadataRequestTracker->avail();
      if(num > 0) {
        std::deque<SharedHandle<BtMessage> > requests;
        _utMetadataRequestFactory->create(requests, num, _pieceStorage);
        dispatcher->addMessageToQueue(requests);
      }
      if(_perSecCheckPoint.elapsed(1)) {
        _perSecCheckPoint.reset();
        // Drop timeout request after queuing message to give a chance
        // to other connection to request piece.
        std::vector<size_t> indexes =
          _utMetadataRequestTracker->removeTimeoutEntry();
        for(std::vector<size_t>::const_iterator i = indexes.begin();
            i != indexes.end(); ++i) {
          _pieceStorage->cancelPiece(_pieceStorage->getPiece(*i));
        }
      }
      if(_pieceStorage->downloadFinished()) {
        _downloadContext->getOwnerRequestGroup()->setForceHaltRequested
          (true, RequestGroup::NONE);
      }
    }
  } else {
    checkActiveInteraction();
    decideChoking();
    detectMessageFlooding();
    if(_perSecCheckPoint.elapsed(1)) {
      _perSecCheckPoint.reset();
      dispatcher->checkRequestSlotAndDoNecessaryThing();
    }
    checkHave();
    sendKeepAlive();
    _numReceivedMessage = receiveMessages();
    btRequestFactory->removeCompletedPiece();
    decideInterest();
    if(!_pieceStorage->downloadFinished()) {
      addRequests();
    }
  }
  if(peer->getExtensionMessageID("ut_pex") && _utPexEnabled) {
    addPeerExchangeMessage();
  }
  sendPendingMessage();
}

// Sets the local DHT node whose port is announced in port messages.
void DefaultBtInteractive::setLocalNode(const WeakHandle<DHTNode>& node)
{
  _localNode = node;
}

// Number of messages currently waiting in the dispatcher queue.
size_t DefaultBtInteractive::countPendingMessage()
{
  return dispatcher->countMessageInQueue();
}

// Whether the dispatcher reports a send in progress.
bool DefaultBtInteractive::isSendingMessageInProgress()
{
  return dispatcher->isSendingInProgress();
}

// Number of messages handled by the most recent doInteractionProcessing().
size_t DefaultBtInteractive::countReceivedMessageInIteration() const
{
  return _numReceivedMessage;
}

// Outstanding request count: tracked metadata requests in metadata-get
// mode, the dispatcher's outstanding piece requests otherwise.
size_t DefaultBtInteractive::countOutstandingRequest()
{
  if(_metadataGetMode) {
    return _utMetadataRequestTracker->count();
  } else {
    return dispatcher->countOutstandingRequest();
  }
}

// ---- Collaborator setters -------------------------------------------------

void DefaultBtInteractive::setBtRuntime
(const SharedHandle<BtRuntime>& btRuntime)
{
  _btRuntime = btRuntime;
}

void DefaultBtInteractive::setPieceStorage
(const SharedHandle<PieceStorage>& pieceStorage)
{
  _pieceStorage = pieceStorage;
}

void DefaultBtInteractive::setPeerStorage
(const SharedHandle<PeerStorage>& peerStorage)
{
  _peerStorage = peerStorage;
}

void DefaultBtInteractive::setPeer(const SharedHandle<Peer>& peer)
{
  this->peer = peer;
}

void DefaultBtInteractive::setBtMessageReceiver
(const SharedHandle<BtMessageReceiver>& receiver)
{
  this->btMessageReceiver = receiver;
}

void DefaultBtInteractive::setDispatcher
(const SharedHandle<BtMessageDispatcher>& dispatcher)
{
  this->dispatcher = dispatcher;
}

void DefaultBtInteractive::setBtRequestFactory
(const SharedHandle<BtRequestFactory>& factory)
{
  this->btRequestFactory = factory;
}

void DefaultBtInteractive::setPeerConnection
(const SharedHandle<PeerConnection>& peerConnection)
{
  this->peerConnection = peerConnection;
}

void DefaultBtInteractive::setExtensionMessageFactory
(const SharedHandle<ExtensionMessageFactory>& factory)
{
  _extensionMessageFactory = factory;
}

void DefaultBtInteractive::setBtMessageFactory
(const SharedHandle<BtMessageFactory>& factory)
{
  this->messageFactory = factory;
}

void DefaultBtInteractive::setRequestGroupMan
(const WeakHandle<RequestGroupMan>& rgman)
{
  _requestGroupMan = rgman;
}

} // namespace aria2