/* */
#include "DefaultBtInteractive.h"
#include "prefs.h"
#include "message.h"
#include "BtHandshakeMessage.h"
#include "Util.h"
#include "BtKeepAliveMessage.h"
#include "BtChokeMessage.h"
#include "BtUnchokeMessage.h"
#include "DlAbortEx.h"

/**
 * Builds a handshake message from the context's info hash and peer id,
 * queues it on the dispatcher and asks the dispatcher to send immediately.
 */
void DefaultBtInteractive::initiateHandshake() {
  BtMessageFactoryHandle factory = BT_MESSAGE_FACTORY(btContext, peer);
  BtMessageHandle handshake =
    factory->createHandshakeMessage(btContext->getInfoHash(),
                                    btContext->getPeerId());
  dispatcher->addMessageToQueue(handshake);
  dispatcher->sendMessages();
}

/**
 * Asks the message receiver for a handshake.
 *
 * @param quickReply forwarded unchanged to the receiver.
 * @return a null handle when the receiver has no handshake yet; otherwise
 *         the received handshake, after its peer id has been stored on the
 *         peer and the message has been logged.
 */
BtMessageHandle DefaultBtInteractive::receiveHandshake(bool quickReply) {
  BtHandshakeMessageHandle handshake =
    btMessageReceiver->receiveHandshake(quickReply);
  if(handshake.isNull()) {
    return 0;
  }
  peer->setPeerId(handshake->getPeerId());
  logger->info(MSG_RECEIVE_PEER_MESSAGE, cuid,
               peer->ipaddr.c_str(), peer->port,
               handshake->toString().c_str());
  return handshake;
}

/**
 * Shorthand for receiveHandshake(true).
 */
BtMessageHandle DefaultBtInteractive::receiveAndSendHandshake() {
  return receiveHandshake(true);
}

/**
 * Resets the have / keep-alive / flooding check points and queues the
 * initial bitfield-style message followed by the allowed-fast messages.
 */
void DefaultBtInteractive::doPostHandshakeProcessing() {
  // TODO: decide where haveCheckPoint should really be reset.
  haveCheckPoint.reset();
  keepAliveCheckPoint.reset();
  floodingCheckPoint.reset();
  addBitfieldMessageToQueue();
  addAllowedFastMessageToQueue();
}

/**
 * Queues the message describing which pieces we hold:
 *  - fast extension enabled and download finished -> "have all"
 *  - some data completed                          -> bitfield
 *  - fast extension enabled and nothing completed -> "have none"
 *  - otherwise                                    -> nothing is queued
 */
void DefaultBtInteractive::addBitfieldMessageToQueue() {
  BtMessageFactoryHandle factory = BT_MESSAGE_FACTORY(btContext, peer);
  const bool fastExtension = peer->isFastExtensionEnabled();

  if(fastExtension && pieceStorage->downloadFinished()) {
    dispatcher->addMessageToQueue(factory->createHaveAllMessage());
    return;
  }
  if(pieceStorage->getCompletedLength() > 0) {
    dispatcher->addMessageToQueue(factory->createBitfieldMessage());
    return;
  }
  if(fastExtension) {
    dispatcher->addMessageToQueue(factory->createHaveNoneMessage());
  }
}

/**
 * When the peer has the fast extension enabled, computes the fast set for
 * the peer's address and queues one allowed-fast message per entry.
 */
void DefaultBtInteractive::addAllowedFastMessageToQueue() {
  if(!peer->isFastExtensionEnabled()) {
    return;
  }
  Integers fastSet = Util::computeFastSet(peer->ipaddr,
                                          btContext->getInfoHash(),
                                          btContext->getNumPieces(),
                                          allowedFastSetSize);
  Integers::const_iterator pos = fastSet.begin();
  while(pos != fastSet.end()) {
    dispatcher->addMessageToQueue(BT_MESSAGE_FACTORY(btContext, peer)->
                                  createAllowedFastMessage(*pos));
    ++pos;
  }
}

/**
 * Queues a choke or unchoke message when the peer's current amChoking flag
 * disagrees with what peer->shouldBeChoking() reports.
 */
void DefaultBtInteractive::decideChoking() {
  const bool wantChoke = peer->shouldBeChoking();
  if(wantChoke && !peer->amChoking) {
    dispatcher->addMessageToQueue(BT_MESSAGE_FACTORY(btContext, peer)->
                                  createChokeMessage());
  } else if(!wantChoke && peer->amChoking) {
    dispatcher->addMessageToQueue(BT_MESSAGE_FACTORY(btContext, peer)->
                                  createUnchokeMessage());
  }
}

/**
 * Announces the piece indexes advertised since the last check point.
 * With 20 or more indexes a single "have all" (fast extension enabled and
 * download finished) or bitfield message is queued instead of one "have"
 * message per index.
 */
void DefaultBtInteractive::checkHave() {
  BtMessageFactoryHandle factory = BT_MESSAGE_FACTORY(btContext, peer);
  Integers advertised =
    pieceStorage->getAdvertisedPieceIndexes(cuid, haveCheckPoint);
  haveCheckPoint.reset();

  if(advertised.size() >= 20) {
    const bool sendHaveAll =
      peer->isFastExtensionEnabled() && pieceStorage->downloadFinished();
    if(sendHaveAll) {
      dispatcher->addMessageToQueue(factory->createHaveAllMessage());
    } else {
      dispatcher->addMessageToQueue(factory->createBitfieldMessage());
    }
    return;
  }

  for(Integers::iterator pos = advertised.begin();
      pos != advertised.end(); ++pos) {
    dispatcher->addMessageToQueue(factory->createHaveMessage(*pos));
  }
}

/**
 * Once the configured keep-alive interval has elapsed, sends a keep-alive
 * message if the dispatcher queue is empty, then resets the check point
 * whether or not a message was sent.
 */
void DefaultBtInteractive::sendKeepAlive() {
  if(!keepAliveCheckPoint.elapsed
     (option->getAsInt(PREF_BT_KEEP_ALIVE_INTERVAL))) {
    return;
  }
  if(dispatcher->countMessageInQueue() == 0) {
    dispatcher->addMessageToQueue(BT_MESSAGE_FACTORY(btContext, peer)->
                                  createKeepAliveMessage());
    dispatcher->sendMessages();
  }
  keepAliveCheckPoint.reset();
}

/**
 * Receives and processes at most 50 messages. The loop stops early when a
 * positive download limit is configured and the current download speed
 * exceeds it, or when the receiver has no further message. Keep-alive and
 * choke/unchoke messages are also counted in floodingStat.
 */
void DefaultBtInteractive::receiveMessages() {
  for(int remaining = 50; remaining > 0; --remaining) {
    const int maxSpeedLimit = option->getAsInt(PREF_MAX_DOWNLOAD_LIMIT);
    if(maxSpeedLimit > 0 &&
       maxSpeedLimit < peerStorage->calculateStat().downloadSpeed) {
      break;
    }

    BtMessageHandle received = btMessageReceiver->receiveMessage();
    if(received.isNull()) {
      break;
    }
    logger->info(MSG_RECEIVE_PEER_MESSAGE, cuid,
                 peer->ipaddr.c_str(), peer->port,
                 received->toString().c_str());
    received->doReceivedAction();

    // Note: the peerChoking checks below run after doReceivedAction().
    switch(received->getId()) {
    case BtKeepAliveMessage::ID:
      floodingStat.incKeepAliveCount();
      break;
    case BtChokeMessage::ID:
      if(!peer->peerChoking) {
        floodingStat.incChokeUnchokeCount();
      }
      break;
    case BtUnchokeMessage::ID:
      if(peer->peerChoking) {
        floodingStat.incChokeUnchokeCount();
      }
      break;
    default:
      break;
    }
  }
}

/**
 * Queues an interested / not-interested message when the amInterested flag
 * no longer matches whether the peer has a piece we are missing.
 */
void DefaultBtInteractive::decideInterest() {
  const bool peerHasWantedPiece = pieceStorage->hasMissingPiece(peer);
  if(!peerHasWantedPiece && peer->amInterested) {
    logger->debug("CUID#%d - Not interested in the peer", cuid);
    dispatcher->addMessageToQueue(BT_MESSAGE_FACTORY(btContext, peer)->
                                  createNotInterestedMessage());
  } else if(peerHasWantedPiece && !peer->amInterested) {
    logger->debug("CUID#%d - Interested in the peer", cuid);
    dispatcher->addMessageToQueue(BT_MESSAGE_FACTORY(btContext, peer)->
                                  createInterestedMessage());
  }
}

/**
 * Adds target pieces to the request factory until it holds maxPieceNum
 * pieces or the piece storage returns a null piece.
 *
 * While the peer is choking us, the dispatcher's choked action runs first
 * and only "fast" pieces are taken, and only if the fast extension is
 * enabled.
 *
 * @param maxPieceNum upper bound on the number of target pieces.
 */
void DefaultBtInteractive::fillPiece(int maxPieceNum) {
  if(!pieceStorage->hasMissingPiece(peer)) {
    return;
  }

  if(peer->peerChoking) {
    dispatcher->doChokedAction();
    if(!peer->isFastExtensionEnabled()) {
      return;
    }
    while(btRequestFactory->countTargetPiece() < maxPieceNum) {
      PieceHandle fastPiece = pieceStorage->getMissingFastPiece(peer);
      if(fastPiece.isNull()) {
        break;
      }
      btRequestFactory->addTargetPiece(fastPiece);
    }
    return;
  }

  while(btRequestFactory->countTargetPiece() < maxPieceNum) {
    PieceHandle missingPiece = pieceStorage->getMissingPiece(peer);
    if(missingPiece.isNull()) {
      break;
    }
    btRequestFactory->addTargetPiece(missingPiece);
  }
}

/**
 * Tops up outstanding block requests. The cap on pending requests is chosen
 * from the peer's latency (24 below 500, 12 below 1500, otherwise 6; units
 * are whatever getLatency() reports - presumably milliseconds, TODO
 * confirm). Target pieces are refilled first, then request messages are
 * created for the remaining slots.
 */
void DefaultBtInteractive::addRequests() {
  uint32_t maxPending = 6;
  if(peer->getLatency() < 500) {
    maxPending = 24;
  } else if(peer->getLatency() < 1500) {
    maxPending = 12;
  }

  uint32_t targetPieces = 1;
  if(!pieceStorage->isEndGame()) {
    uint32_t blocksPerPiece =
      DIV_FLOOR(btContext->getPieceLength(), BLOCK_LENGTH);
    targetPieces = DIV_FLOOR(maxPending, blocksPerPiece);
  }
  fillPiece(targetPieces);

  uint32_t slots = 0;
  if(maxPending > dispatcher->countOutstandingRequest()) {
    slots = maxPending-dispatcher->countOutstandingRequest();
  }
  if(slots == 0) {
    return;
  }

  //logger->debug("CUID#%d - %u requets to go.", cuid, reqNumToCreate);
  BtMessages requests;
  if(pieceStorage->isEndGame()) {
    requests = btRequestFactory->createRequestMessagesOnEndGame(slots);
  } else {
    requests = btRequestFactory->createRequestMessages(slots);
  }
  dispatcher->addMessageToQueue(requests);
}

/**
 * Drops every target piece held by the request factory.
 */
void DefaultBtInteractive::cancelAllPiece() {
  btRequestFactory->removeAllTargetPiece();
}

/**
 * Asks the dispatcher to send its queued messages.
 */
void DefaultBtInteractive::sendPendingMessage() {
  dispatcher->sendMessages();
}

/**
 * Every FLOODING_CHECK_INTERVAL, aborts with a heap-allocated DlAbortEx if
 * the peer sent 2 or more choke/unchoke transitions or 2 or more
 * keep-alives in the window; otherwise clears the counters and restarts
 * the window.
 */
void DefaultBtInteractive::detectMessageFlooding() {
  if(!floodingCheckPoint.elapsed(FLOODING_CHECK_INTERVAL)) {
    return;
  }
  const bool flooding =
    floodingStat.getChokeUnchokeCount() >= 2 ||
    floodingStat.getKeepAliveCount() >= 2;
  if(flooding) {
    throw new DlAbortEx("Flooding detected.");
  }
  floodingStat.reset();
  floodingCheckPoint.reset();
}

/**
 * Runs one round of peer interaction. The steps execute in this fixed
 * order; new requests are only added while the download is unfinished.
 */
void DefaultBtInteractive::doInteractionProcessing() {
  decideChoking();
  detectMessageFlooding();
  dispatcher->checkRequestSlotAndDoNecessaryThing();
  checkHave();
  sendKeepAlive();
  receiveMessages();
  btRequestFactory->removeCompletedPiece();
  decideInterest();

  const bool stillDownloading = !pieceStorage->downloadFinished();
  if(stillDownloading) {
    addRequests();
  }
  sendPendingMessage();
}