/* */
#include "DefaultBtInteractive.h"
#include "prefs.h"
#include "message.h"
#include "BtHandshakeMessage.h"
#include "Util.h"
#include "BtKeepAliveMessage.h"
#include "BtChokeMessage.h"
#include "BtUnchokeMessage.h"
#include "BtRequestMessage.h"
#include "BtPieceMessage.h"
#include "DlAbortEx.h"
#include "BtExtendedMessage.h"
#include "HandshakeExtensionMessage.h"
#include "UTPexExtensionMessage.h"
#include "DefaultExtensionMessageFactory.h"
#include "BtRegistry.h"

// Builds a handshake message from btContext's info hash and peer id, queues
// it on the dispatcher and asks the dispatcher to send its queue.
void DefaultBtInteractive::initiateHandshake() {
  BtHandshakeMessageHandle handshake =
    messageFactory->createHandshakeMessage(btContext->getInfoHash(),
                                           btContext->getPeerId());
  dispatcher->addMessageToQueue(handshake);
  dispatcher->sendMessages();
}

// Asks the message receiver for the remote handshake.
//
// quickReply is forwarded untouched to the receiver.
//
// Returns a null handle when the receiver has no handshake to hand out.
// Otherwise the peer id is copied to `peer`, the fast-extension and
// extended-messaging flags are turned on according to the handshake, the
// handshake is logged and then returned.
BtMessageHandle DefaultBtInteractive::receiveHandshake(bool quickReply) {
  BtHandshakeMessageHandle handshake =
    btMessageReceiver->receiveHandshake(quickReply);
  if(handshake.isNull()) {
    return 0;
  }

  peer->setPeerId(handshake->getPeerId());

  if(handshake->isFastExtensionSupported()) {
    peer->setFastExtensionEnabled(true);
    logger->info(MSG_FAST_EXTENSION_ENABLED, cuid);
  }

  if(handshake->isExtendedMessagingEnabled()) {
    peer->setExtendedMessagingEnabled(true);
    // A fresh extension factory is installed for this peer; "ut_pex" is
    // removed from it when _utPexEnabled is false.
    DefaultExtensionMessageFactoryHandle extFactory =
      new DefaultExtensionMessageFactory(btContext, peer);
    if(!_utPexEnabled) {
      extFactory->removeExtension("ut_pex");
    }
    PEER_OBJECT(btContext, peer)->extensionMessageFactory = extFactory;
    logger->info(MSG_EXTENDED_MESSAGING_ENABLED, cuid);
  }

  logger->info(MSG_RECEIVE_PEER_MESSAGE, cuid,
               peer->ipaddr.c_str(), peer->port,
               handshake->toString().c_str());
  return handshake;
}

// Same as receiveHandshake() with quickReply set to true.
BtMessageHandle DefaultBtInteractive::receiveAndSendHandshake() {
  return receiveHandshake(true);
}

// Resets the periodic check points, then queues the initial messages
// (extended handshake when enabled on the peer, bitfield-style message,
// allowed-fast messages) and sends what is pending.
void DefaultBtInteractive::doPostHandshakeProcessing() {
  // TODO where is the valid place to reset haveCheckTime?
  haveCheckPoint.reset();
  keepAliveCheckPoint.reset();
  floodingCheckPoint.reset();
  // The pex check point is set to time 0 rather than reset().
  _pexCheckPoint.setTimeInSec(0);

  if(peer->isExtendedMessagingEnabled()) {
    addHandshakeExtendedMessageToQueue();
  }
  addBitfieldMessageToQueue();
  addAllowedFastMessageToQueue();
  sendPendingMessage();
}

// Queues an extended message wrapping a HandshakeExtensionMessage that
// carries the client version string, the runtime's listen port and the
// extensions known to this peer's extension message factory.
void DefaultBtInteractive::addHandshakeExtendedMessageToQueue() {
  HandshakeExtensionMessageHandle extHandshake =
    new HandshakeExtensionMessage();
  extHandshake->setClientVersion("aria2");
  extHandshake->setTCPPort(btRuntime->getListenPort());
  extHandshake->setExtensions
    (EXTENSION_MESSAGE_FACTORY(btContext, peer)->getExtensions());

  BtExtendedMessageHandle wrapped =
    messageFactory->createBtExtendedMessage(extHandshake);
  dispatcher->addMessageToQueue(wrapped);
}

// Queues the message that tells the peer which pieces we have.
//
// Without the fast extension: a bitfield message, only when the completed
// length is greater than zero.
// With the fast extension: have-all when all downloads are finished,
// otherwise a bitfield when the completed length is greater than zero,
// otherwise have-none.
void DefaultBtInteractive::addBitfieldMessageToQueue() {
  if(!peer->isFastExtensionEnabled()) {
    if(pieceStorage->getCompletedLength() > 0) {
      dispatcher->addMessageToQueue(messageFactory->createBitfieldMessage());
    }
    return;
  }

  if(pieceStorage->allDownloadFinished()) {
    dispatcher->addMessageToQueue(messageFactory->createHaveAllMessage());
  } else if(pieceStorage->getCompletedLength() > 0) {
    dispatcher->addMessageToQueue(messageFactory->createBitfieldMessage());
  } else {
    dispatcher->addMessageToQueue(messageFactory->createHaveNoneMessage());
  }
}

// When the fast extension is enabled on the peer, queues one allowed-fast
// message per index in the fast set computed by btContext from the peer's
// address and allowedFastSetSize. Does nothing otherwise.
void DefaultBtInteractive::addAllowedFastMessageToQueue() {
  if(!peer->isFastExtensionEnabled()) {
    return;
  }
  Integers allowedIndexes =
    btContext->computeFastSet(peer->ipaddr, allowedFastSetSize);
  for(Integers::const_iterator idx = allowedIndexes.begin();
      idx != allowedIndexes.end(); ++idx) {
    dispatcher->addMessageToQueue
      (messageFactory->createAllowedFastMessage(*idx));
  }
}

// Queues a choke or unchoke message when peer->amChoking disagrees with
// what peer->shouldBeChoking() reports; queues nothing when they agree.
void DefaultBtInteractive::decideChoking() {
  const bool wantChoke = peer->shouldBeChoking();
  if(wantChoke && !peer->amChoking) {
    dispatcher->addMessageToQueue(messageFactory->createChokeMessage());
  } else if(!wantChoke && peer->amChoking) {
    dispatcher->addMessageToQueue(messageFactory->createUnchokeMessage());
  }
}

// Fetches the piece indexes advertised since haveCheckPoint, resets that
// check point and announces them to the peer.
//
// Fewer than 20 indexes: one have message per index.
// 20 or more: a single have-all (fast extension enabled and all downloads
// finished) or a single bitfield message instead.
void DefaultBtInteractive::checkHave() {
  Integers advertised =
    pieceStorage->getAdvertisedPieceIndexes(cuid, haveCheckPoint);
  haveCheckPoint.reset();

  if(advertised.size() < 20) {
    for(Integers::iterator idx = advertised.begin();
        idx != advertised.end(); ++idx) {
      dispatcher->addMessageToQueue(messageFactory->createHaveMessage(*idx));
    }
    return;
  }

  if(peer->isFastExtensionEnabled() && pieceStorage->allDownloadFinished()) {
    dispatcher->addMessageToQueue(messageFactory->createHaveAllMessage());
  } else {
    dispatcher->addMessageToQueue(messageFactory->createBitfieldMessage());
  }
}

// Once keepAliveInterval has elapsed on keepAliveCheckPoint, queues a
// keep-alive message, sends the queue and resets the check point.
void DefaultBtInteractive::sendKeepAlive() {
  if(!keepAliveCheckPoint.elapsed(keepAliveInterval)) {
    return;
  }
  dispatcher->addMessageToQueue(messageFactory->createKeepAliveMessage());
  dispatcher->sendMessages();
  keepAliveCheckPoint.reset();
}

// Receives and processes up to 50 messages per call.
//
// The loop stops early when maxDownloadSpeedLimit is positive and lower
// than the download speed reported by peerStorage, or when the receiver
// returns a null message. Every received message is logged, its received
// action is run, and the flooding statistics / inactivity check point are
// updated depending on the message id.
void DefaultBtInteractive::receiveMessages() {
  const int maxPerCall = 50;
  for(int handled = 0; handled < maxPerCall; ++handled) {
    if(maxDownloadSpeedLimit > 0 &&
       maxDownloadSpeedLimit < peerStorage->calculateStat().downloadSpeed) {
      break;
    }

    BtMessageHandle received = btMessageReceiver->receiveMessage();
    if(received.isNull()) {
      break;
    }

    logger->info(MSG_RECEIVE_PEER_MESSAGE, cuid,
                 peer->ipaddr.c_str(), peer->port,
                 received->toString().c_str());
    received->doReceivedAction();

    // NOTE(review): peer->peerChoking is inspected after doReceivedAction()
    // has run; whether that call has already updated the flag is not visible
    // here -- confirm the intended ordering for the choke/unchoke counters.
    switch(received->getId()) {
    case BtKeepAliveMessage::ID:
      floodingStat.incKeepAliveCount();
      break;
    case BtChokeMessage::ID:
      if(!peer->peerChoking) {
        floodingStat.incChokeUnchokeCount();
      }
      break;
    case BtUnchokeMessage::ID:
      if(peer->peerChoking) {
        floodingStat.incChokeUnchokeCount();
      }
      break;
    case BtRequestMessage::ID:
    case BtPieceMessage::ID:
      // Request and piece messages count as activity on the connection.
      inactiveCheckPoint.reset();
      break;
    }
  }
}

// Queues an interested / not-interested message when peer->amInterested
// disagrees with whether pieceStorage reports a missing piece for this
// peer; queues nothing when they agree.
void DefaultBtInteractive::decideInterest() {
  const bool peerHasWantedPiece = pieceStorage->hasMissingPiece(peer);
  if(peerHasWantedPiece && !peer->amInterested) {
    logger->debug(MSG_PEER_INTERESTED, cuid);
    dispatcher->addMessageToQueue
      (messageFactory->createInterestedMessage());
  } else if(!peerHasWantedPiece && peer->amInterested) {
    logger->debug(MSG_PEER_NOT_INTERESTED, cuid);
    dispatcher->addMessageToQueue
      (messageFactory->createNotInterestedMessage());
  }
}

// Adds target pieces to btRequestFactory until it holds maxPieceNum of them
// or pieceStorage has no more piece to offer for this peer.
//
// While the peer is choking us, pieces are taken only through
// getMissingFastPiece() and only when the fast extension is enabled;
// otherwise getMissingPiece() is used.
void DefaultBtInteractive::fillPiece(int maxPieceNum) {
  if(!pieceStorage->hasMissingPiece(peer)) {
    return;
  }

  if(peer->peerChoking) {
    if(!peer->isFastExtensionEnabled()) {
      return;
    }
    while(btRequestFactory->countTargetPiece() < maxPieceNum) {
      PieceHandle fastPiece = pieceStorage->getMissingFastPiece(peer);
      if(fastPiece.isNull()) {
        break;
      }
      btRequestFactory->addTargetPiece(fastPiece);
    }
    return;
  }

  while(btRequestFactory->countTargetPiece() < maxPieceNum) {
    PieceHandle missingPiece = pieceStorage->getMissingPiece(peer);
    if(missingPiece.isNull()) {
      break;
    }
    btRequestFactory->addTargetPiece(missingPiece);
  }
}

// Picks target pieces and queues request messages for them.
//
// The cap on pending requests depends on the peer's latency: 24 below 500,
// 12 below 1500, 6 otherwise. The number of target pieces is 1 in end game,
// otherwise it is derived from that cap and the piece/block lengths via
// DIV_FLOOR. Requests are created only for the part of the cap not already
// covered by the dispatcher's outstanding requests.
void DefaultBtInteractive::addRequests() {
  int32_t maxPending;
  if(peer->getLatency() < 500) {
    maxPending = 24;
  } else if(peer->getLatency() < 1500) {
    maxPending = 12;
  } else {
    maxPending = 6;
  }

  int32_t targetPieces = 1;
  if(!pieceStorage->isEndGame()) {
    int32_t blocksPerPiece =
      DIV_FLOOR(btContext->getPieceLength(), Piece::BLOCK_LENGTH);
    targetPieces = DIV_FLOOR(maxPending, blocksPerPiece);
  }
  fillPiece(targetPieces);

  int32_t toCreate = 0;
  if(maxPending > dispatcher->countOutstandingRequest()) {
    toCreate = maxPending-dispatcher->countOutstandingRequest();
  }
  if(toCreate <= 0) {
    return;
  }

  BtMessages newRequests;
  if(pieceStorage->isEndGame()) {
    newRequests = btRequestFactory->createRequestMessagesOnEndGame(toCreate);
  } else {
    newRequests = btRequestFactory->createRequestMessages(toCreate);
  }
  dispatcher->addMessageToQueue(newRequests);
}

// Drops every target piece held by btRequestFactory.
void DefaultBtInteractive::cancelAllPiece() {
  btRequestFactory->removeAllTargetPiece();
}

// Asks the dispatcher to send its queued messages.
void DefaultBtInteractive::sendPendingMessage() {
  dispatcher->sendMessages();
}

// Every FLOODING_CHECK_INTERVAL, inspects the flooding counters.
//
// Throws DlAbortEx (by pointer) with EX_FLOODING_DETECTED when either the
// choke/unchoke count or the keep-alive count has reached 2; otherwise the
// counters and the check point are reset.
void DefaultBtInteractive::detectMessageFlooding() {
  if(!floodingCheckPoint.elapsed(FLOODING_CHECK_INTERVAL)) {
    return;
  }
  if(floodingStat.getChokeUnchokeCount() >= 2 ||
     floodingStat.getKeepAliveCount() >= 2) {
    throw new DlAbortEx(EX_FLOODING_DETECTED);
  }
  floodingStat.reset();
  floodingCheckPoint.reset();
}

// Throws DlAbortEx (by pointer) with EX_DROP_INACTIVE_CONNECTION once
// inactiveCheckPoint reports that the limit below has elapsed.
void DefaultBtInteractive::checkActiveInteraction() {
  // 5*60 -- presumably seconds, i.e. five minutes; confirm against elapsed().
  int32_t inactiveLimit = 5*60;
  if(inactiveCheckPoint.elapsed(inactiveLimit)) {
    throw new DlAbortEx(EX_DROP_INACTIVE_CONNECTION, inactiveLimit);
  }
}

// Once `interval` has elapsed on _pexCheckPoint, queues a "ut_pex" extended
// message and resets the check point.
//
// The message lists up to 30 peers (scanned front to back) whose first
// contact time has not yet passed `interval`, and up to 10 peers (scanned
// back to front) whose bad-condition start time has not yet passed
// `interval`. The remote peer's own address and addresses rejected by
// Util::isNumbersAndDotsNotation() are skipped in both scans.
void DefaultBtInteractive::addPeerExchangeMessage() {
  time_t interval = 60;
  if(!_pexCheckPoint.elapsed(interval)) {
    return;
  }

  UTPexExtensionMessageHandle pexMessage =
    new UTPexExtensionMessage(peer->getExtensionMessageID("ut_pex"));
  const Peers& knownPeers = peerStorage->getPeers();

  // Fresh peers, scanned from the front of the list.
  size_t freshQuota = 30;
  for(Peers::const_iterator it = knownPeers.begin();
      it != knownPeers.end() && freshQuota; ++it) {
    const PeerHandle& candidate = *it;
    if(peer->ipaddr != candidate->ipaddr &&
       !candidate->getFirstContactTime().elapsed(interval) &&
       Util::isNumbersAndDotsNotation(candidate->ipaddr)) {
      pexMessage->addFreshPeer(candidate);
      --freshQuota;
    }
  }

  // Dropped peers, scanned from the back of the list.
  size_t droppedQuota = 10;
  for(Peers::const_reverse_iterator rit = knownPeers.rbegin();
      rit != knownPeers.rend() && droppedQuota; ++rit) {
    const PeerHandle& candidate = *rit;
    if(peer->ipaddr != candidate->ipaddr &&
       !candidate->getBadConditionStartTime().elapsed(interval) &&
       Util::isNumbersAndDotsNotation(candidate->ipaddr)) {
      pexMessage->addDroppedPeer(candidate);
      --droppedQuota;
    }
  }

  BtExtendedMessageHandle wrapped =
    messageFactory->createBtExtendedMessage(pexMessage);
  dispatcher->addMessageToQueue(wrapped);
  _pexCheckPoint.reset();
}

// Runs one round of the regular interaction with the peer. The steps run in
// a fixed order; the two checks that may throw DlAbortEx are
// checkActiveInteraction() and detectMessageFlooding().
void DefaultBtInteractive::doInteractionProcessing() {
  // Connection health and choke state.
  checkActiveInteraction();
  decideChoking();
  detectMessageFlooding();

  // Outgoing bookkeeping before reading from the peer.
  dispatcher->checkRequestSlotAndDoNecessaryThing();
  checkHave();
  sendKeepAlive();

  // Incoming messages, then react to the new state.
  receiveMessages();
  btRequestFactory->removeCompletedPiece();
  decideInterest();
  if(!pieceStorage->downloadFinished()) {
    addRequests();
  }

  // Peer exchange only when the peer has a "ut_pex" message id and the
  // feature is enabled locally.
  if(peer->getExtensionMessageID("ut_pex") && _utPexEnabled) {
    addPeerExchangeMessage();
  }

  sendPendingMessage();
}