From e1f24adc4046dfc24d01fc5eb9797d05420c136f Mon Sep 17 00:00:00 2001 From: Tatsuhiro Tsujikawa Date: Tue, 28 Mar 2006 15:23:51 +0000 Subject: [PATCH] Added new class SendMessageQueue that includes PendingMessages and RequestSlotMan. * src/SendMessageQueue.h: New class. * src/SendMessageQueue.cc: New class. * src/PendingMessage.h: Added new member variable blockIndex and its accessors. (createRequestMessage): Updated. * src/PendingMessage.cc (createRequestMessage): Updated. * src/PeerInteractionCommand.cc (executeInternal): Updated with SendMessageQueue. (checkLongTimePeerChoking): Updated with SendMessageQueue. (receiveMessage): Updated with SendMessageQueue. (deletePendingPieceMessage): Removed. (getNewPieceAndSendInterest): Updated with SendMessageQueue. (sendInterest): Updated with SendMessageQueue. (createRequestPendingMessage): Updated with SendMessageQueue. (sendMessages): Updated with SendMessageQueue. (onAbort): Updated with SendMessageQueue. (keepAlive): Updated with SendMessageQueue. (beforeSocketCheck): Updated SendMessageQueue. * src/PeerInteractionCommand (sendMessages): Shuffle missingBLockIndexes before using it. Added its own timeout for peer connection. * src/PeerAbstractCommand.h: Added member variable timeout and its setter. * src/prefs.h: Added PREF_PEER_CONNECTION_TIMEOUT. * src/PeerInteractionCommand.cc (PeerInteractionCommand): Added setTimeout() call. (executeInternal): Added setTimeout() call. * src/PeerAbstractCommand.cc (PeerAbstractCommand): Added timeout. (isTimeoutDetected): Updated. * src/main.cc (main): Added PREF_PEER_CONNECTION_TIMEOUT entry to option. Added *simple* message flooding checker. * src/PeerInteractionCommand.cc (executeInternal): Added detectMessageFlooding() call. (detectMessageFlooding): New function. (receiveMessage): Count up CHOKE, UNCHOKE, HAVE message. (beforeSocketCheck): Added detectMessageFlooding() call. * src/PeerInteractionCommand.h: Added sendMessageQueue, chokeUnchokeCount, haveCount, detectMessageFlooding(). Removed deletePendingPieceMessage(), getRequestSlot(), deleteRequestSlot(), deleteAllRequestSlot(). * src/PeerInteractionCommand.cc (beforeSocketCheck): Added checkLongTimePeerChoking() call. * src/RequestSlotMan.h: Renamed deleteTimeoutRequestSlot(). * src/TorrentMan.cc (addPeer): Delete at most MAX_PEER_LIST_SIZE peers if duplicate == false. The parameter "uploaded" and "downloaded" in the tracker request are the size since the client sent the "started" event to the tracker. * src/TorrentMan.cc (setup): Assigned saved downloaded Size and uploaded size to preDownloadedSize, preUploadedSize respectively. * src/TorrentMan.h: Added preDownloadedSize, preUploadedSize, getSessionDownloadedSize(), getSessionUploadedSize(). * src/TrackerInitCommand.cc (execute): Use getSessionDownloadedSize(), getSessionUploadedSize() instead of getDownloadedSize(), getUploadedSize(). * src/PendingMessage.cc (processMessage): Do not send request message if the peer is choking the client. * src/TrackerUpdateCommand.cc (execute): Check wtheher minInterval is less than interval. --- ChangeLog | 88 ++++++++++++++++++++-- TODO | 6 +- src/Makefile.am | 3 +- src/Makefile.in | 7 +- src/PeerAbstractCommand.cc | 4 +- src/PeerAbstractCommand.h | 3 +- src/PeerInteractionCommand.cc | 137 +++++++++++++++++----------------- src/PeerInteractionCommand.h | 14 ++-- src/PeerListenCommand.cc | 4 +- src/PendingMessage.cc | 15 ++-- src/PendingMessage.h | 5 +- src/RequestSlotMan.cc | 2 +- src/RequestSlotMan.h | 2 +- src/SendMessageQueue.cc | 122 ++++++++++++++++++++++++++++++ src/SendMessageQueue.h | 57 ++++++++++++++ src/TorrentMan.cc | 10 +-- src/TorrentMan.h | 9 +++ src/TrackerInitCommand.cc | 4 +- src/TrackerUpdateCommand.cc | 3 + src/main.cc | 1 + src/prefs.h | 5 ++ 21 files changed, 392 insertions(+), 109 deletions(-) create mode 100644 src/SendMessageQueue.cc create mode 100644 src/SendMessageQueue.h diff --git a/ChangeLog b/ChangeLog index ad107dd1..0f1643b6 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,3 +1,81 @@ +2006-03-28 Tatsuhiro Tsujikawa + + Added new class SendMessageQueue that includes PendingMessages and + RequestSlotMan. + + * src/SendMessageQueue.h: New class. + * src/SendMessageQueue.cc: New class. + * src/PendingMessage.h: Added new member variable blockIndex and its + accessors. + (createRequestMessage): Updated. + * src/PendingMessage.cc (createRequestMessage): Updated. + * src/PeerInteractionCommand.cc (executeInternal): Updated with + SendMessageQueue. + (checkLongTimePeerChoking): Updated with SendMessageQueue. + (receiveMessage): Updated with SendMessageQueue. + (deletePendingPieceMessage): Removed. + (getNewPieceAndSendInterest): Updated with SendMessageQueue. + (sendInterest): Updated with SendMessageQueue. + (createRequestPendingMessage): Updated with SendMessageQueue. + (sendMessages): Updated with SendMessageQueue. + (onAbort): Updated with SendMessageQueue. + (keepAlive): Updated with SendMessageQueue. + (beforeSocketCheck): Updated SendMessageQueue. + + * src/PeerInteractionCommand (sendMessages): Shuffle + missingBLockIndexes before using it. + + Added its own timeout for peer connection. + + * src/PeerAbstractCommand.h: Added member variable timeout and its + setter. + * src/prefs.h: Added PREF_PEER_CONNECTION_TIMEOUT. + * src/PeerInteractionCommand.cc (PeerInteractionCommand): + Added setTimeout() call. + (executeInternal): Added setTimeout() call. + * src/PeerAbstractCommand.cc (PeerAbstractCommand): + Added timeout. + (isTimeoutDetected): Updated. + * src/main.cc (main): Added PREF_PEER_CONNECTION_TIMEOUT entry to + option. + + Added *simple* message flooding checker. + + * src/PeerInteractionCommand.cc (executeInternal): + Added detectMessageFlooding() call. + (detectMessageFlooding): New function. + (receiveMessage): Count up CHOKE, UNCHOKE, HAVE message. + (beforeSocketCheck): Added detectMessageFlooding() call. + * src/PeerInteractionCommand.h: Added sendMessageQueue, + chokeUnchokeCount, haveCount, detectMessageFlooding(). + Removed deletePendingPieceMessage(), getRequestSlot(), + deleteRequestSlot(), deleteAllRequestSlot(). + + * src/PeerInteractionCommand.cc (beforeSocketCheck): + Added checkLongTimePeerChoking() call. + + * src/RequestSlotMan.h: Renamed deleteTimeoutRequestSlot(). + + * src/TorrentMan.cc (addPeer): Delete at most MAX_PEER_LIST_SIZE peers + if duplicate == false. + + The parameter "uploaded" and "downloaded" in the tracker request are + the size since the client sent the "started" event to the tracker. + + * src/TorrentMan.cc (setup): Assigned saved downloaded Size and + uploaded size to preDownloadedSize, preUploadedSize respectively. + * src/TorrentMan.h: Added preDownloadedSize, preUploadedSize, + getSessionDownloadedSize(), getSessionUploadedSize(). + * src/TrackerInitCommand.cc (execute): Use getSessionDownloadedSize(), + getSessionUploadedSize() instead of getDownloadedSize(), + getUploadedSize(). + + * src/PendingMessage.cc (processMessage): Do not send request message + if the peer is choking the client. + + * src/TrackerUpdateCommand.cc (execute): Check wtheher minInterval is + less than interval. + 2006-03-27 Tatsuhiro Tsujikawa * configure.in: Added gnutls support. Added several CPP macros. @@ -28,24 +106,20 @@ * src/InitiateConnectionCommandFactory.cc: Replaced HAVE_LIBSSL with ENABLE_SSL. * src/Request.cc: Replaced HAVE_LIBSSL with ENABLE_SSL. - * src/RequestSlotMan.cc: - (deleteCompletedRequestSlot) + * src/RequestSlotMan.cc (deleteCompletedRequestSlot): If a piece is already acquired by another command, delete the request slots for the piece. - * src/TrackerUpdateCommand.cc: - (execute) + * src/TrackerUpdateCommand.cc (execute): Changed log level of MSG_TRACKER_WARNING_MESSAGE from info to warn. Added a check whether peer list is null. Fixed the bug that causes sending completed event to the tracker several times. - * src/TrackerInitCommand.cc: - (execute) + * src/TrackerInitCommand.cc (execute): Fixed the bug that causes sending completed event to the tracker several times. * src/AbstractDiskWriter.{h,cc}: Removed direct dependency on OpenSSL by using messageDigest.h. - 2006-03-26 Tatsuhiro Tsujikawa * PeerConnection.cc: Replaced log message "keep-alive" with diff --git a/TODO b/TODO index 671f9b44..2f344b4d 100644 --- a/TODO +++ b/TODO @@ -9,4 +9,8 @@ * Tracker UDP protocol * no-compact peers format * Add port range command-line option -* Add max peers command-line option \ No newline at end of file +* Add max peers command-line option +* Distinguish seeder from leecher +* time out for connecting peers +* ignore incoming connection from localhost. +* do not connect to localhost diff --git a/src/Makefile.am b/src/Makefile.am index 3f5811b8..4161700d 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -77,7 +77,8 @@ SRCS = Socket.cc Socket.h\ TorrentAutoSaveCommand.cc TorrentAutoSaveCommand.h\ Directory.cc Directory.h\ TrackerWatcherCommand.cc TrackerWatcherCommand.h\ - messageDigest.h + messageDigest.h\ + SendMessageQueue.cc SendMessageQueue.h noinst_LIBRARIES = libaria2c.a libaria2c_a_SOURCES = $(SRCS) aria2c_LDADD = libaria2c.a @LIBINTL@ @ALLOCA@ @LIBGNUTLS_LIBS@\ diff --git a/src/Makefile.in b/src/Makefile.in index 20eb7c28..d5fcff57 100644 --- a/src/Makefile.in +++ b/src/Makefile.in @@ -97,7 +97,8 @@ am__objects_1 = Socket.$(OBJEXT) SocketCore.$(OBJEXT) \ PeerListenCommand.$(OBJEXT) PendingMessage.$(OBJEXT) \ PeerMessage.$(OBJEXT) Piece.$(OBJEXT) RequestSlot.$(OBJEXT) \ RequestSlotMan.$(OBJEXT) TorrentAutoSaveCommand.$(OBJEXT) \ - Directory.$(OBJEXT) TrackerWatcherCommand.$(OBJEXT) + Directory.$(OBJEXT) TrackerWatcherCommand.$(OBJEXT) \ + SendMessageQueue.$(OBJEXT) am_libaria2c_a_OBJECTS = $(am__objects_1) libaria2c_a_OBJECTS = $(am_libaria2c_a_OBJECTS) am__installdirs = "$(DESTDIR)$(bindir)" @@ -324,7 +325,8 @@ SRCS = Socket.cc Socket.h\ TorrentAutoSaveCommand.cc TorrentAutoSaveCommand.h\ Directory.cc Directory.h\ TrackerWatcherCommand.cc TrackerWatcherCommand.h\ - messageDigest.h + messageDigest.h\ + SendMessageQueue.cc SendMessageQueue.h noinst_LIBRARIES = libaria2c.a libaria2c_a_SOURCES = $(SRCS) @@ -458,6 +460,7 @@ distclean-compile: @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/RequestSlotMan.Po@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/SegmentMan.Po@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/SegmentSplitter.Po@am__quote@ +@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/SendMessageQueue.Po@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/ShaVisitor.Po@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/SimpleLogger.Po@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/SleepCommand.Po@am__quote@ diff --git a/src/PeerAbstractCommand.cc b/src/PeerAbstractCommand.cc index 92f1ac7a..269d4411 100644 --- a/src/PeerAbstractCommand.cc +++ b/src/PeerAbstractCommand.cc @@ -38,7 +38,7 @@ PeerAbstractCommand::PeerAbstractCommand(int cuid, Peer* peer, TorrentDownloadEn } this->checkPoint.tv_sec = 0; this->checkPoint.tv_usec = 0; - + timeout = e->option->getAsInt(PREF_TIMEOUT); e->torrentMan->connections++; } @@ -63,7 +63,7 @@ bool PeerAbstractCommand::isTimeoutDetected() { return false; } else { long long int elapsed = Util::difftv(now, checkPoint); - if(elapsed >= e->option->getAsLLInt(PREF_TIMEOUT)*1000000) { + if(elapsed >= ((long long int)timeout)*1000000) { return true; } else { return false; diff --git a/src/PeerAbstractCommand.h b/src/PeerAbstractCommand.h index a6250a02..317e0dd7 100644 --- a/src/PeerAbstractCommand.h +++ b/src/PeerAbstractCommand.h @@ -32,11 +32,12 @@ private: void updateCheckPoint(); bool isTimeoutDetected(); struct timeval checkPoint; + int timeout; protected: TorrentDownloadEngine* e; Socket* socket; Peer* peer; - + void setTimeout(int timeout) { this->timeout = timeout; } virtual bool prepareForNextPeer(int wait); virtual bool prepareForRetry(int wait); virtual void onAbort(Exception* ex); diff --git a/src/PeerInteractionCommand.cc b/src/PeerInteractionCommand.cc index 59ff8663..b1021df6 100644 --- a/src/PeerInteractionCommand.cc +++ b/src/PeerInteractionCommand.cc @@ -25,6 +25,8 @@ #include "DlAbortEx.h" #include "Util.h" #include "message.h" +#include "prefs.h" +#include PeerInteractionCommand::PeerInteractionCommand(int cuid, Peer* peer, TorrentDownloadEngine* e, @@ -33,21 +35,26 @@ PeerInteractionCommand::PeerInteractionCommand(int cuid, Peer* peer, if(sequence == INITIATOR_SEND_HANDSHAKE) { setReadCheckSocket(NULL); setWriteCheckSocket(socket); + setTimeout(e->option->getAsInt(PREF_PEER_CONNECTION_TIMEOUT)); } peerConnection = new PeerConnection(cuid, socket, e->option, e->logger, peer, e->torrentMan); - requestSlotMan = new RequestSlotMan(cuid, &pendingMessages, peerConnection, - e->torrentMan, e->logger); + sendMessageQueue = new SendMessageQueue(cuid, peerConnection, e->torrentMan, + e->logger); piece = Piece::nullPiece; keepAliveCheckPoint.tv_sec = 0; keepAliveCheckPoint.tv_usec = 0; chokeCheckPoint.tv_sec = 0; chokeCheckPoint.tv_usec = 0; + freqCheckPoint.tv_sec = 0; + freqCheckPoint.tv_usec = 0; + chokeUnchokeCount = 0; + haveCount = 0; } PeerInteractionCommand::~PeerInteractionCommand() { delete peerConnection; - delete requestSlotMan; + delete sendMessageQueue; e->torrentMan->unadvertisePiece(cuid); } @@ -55,6 +62,7 @@ bool PeerInteractionCommand::executeInternal() { if(sequence == INITIATOR_SEND_HANDSHAKE) { socket->setBlockingMode(); setReadCheckSocket(socket); + setTimeout(e->option->getAsInt(PREF_TIMEOUT)); } setWriteCheckSocket(NULL); @@ -97,6 +105,7 @@ bool PeerInteractionCommand::executeInternal() { break; } case WIRED: + detectMessageFlooding(); checkLongTimePeerChoking(); syncPiece(); decideChoking(); @@ -106,19 +115,38 @@ bool PeerInteractionCommand::executeInternal() { } receiveMessage(); } - requestSlotMan->deleteTimedoutRequestSlot(piece); - requestSlotMan->deleteCompletedRequestSlot(piece); + sendMessageQueue->deleteTimeoutRequestSlot(piece); + sendMessageQueue->deleteCompletedRequestSlot(piece); sendInterest(); sendMessages(); break; } - if(pendingMessages.size() > 0) { + if(sendMessageQueue->countPendingMessage() > 0) { setWriteCheckSocket(socket); } e->commands.push(this); return false; } +void PeerInteractionCommand::detectMessageFlooding() { + struct timeval now; + gettimeofday(&now, NULL); + if(freqCheckPoint.tv_sec == 0 && freqCheckPoint.tv_usec == 0) { + freqCheckPoint = now; + } else { + if(Util::difftv(now, freqCheckPoint) >= 5*1000000) { + if(chokeUnchokeCount*1.0/(Util::difftv(now, freqCheckPoint)/1000000) >= 0.3 + || haveCount*1.0/(Util::difftv(now, freqCheckPoint)/1000000) >= 20.0) { + throw new DlAbortEx("flooding detected."); + } else { + chokeUnchokeCount = 0; + haveCount = 0; + freqCheckPoint = now; + } + } + } +} + void PeerInteractionCommand::checkLongTimePeerChoking() { if(e->torrentMan->downloadComplete()) { return; @@ -152,21 +180,21 @@ void PeerInteractionCommand::decideChoking() { if(e->torrentMan->downloadComplete()) { if(peer->amChocking && peer->peerInterested) { PendingMessage pendingMessage(PeerMessage::UNCHOKE, peerConnection); - pendingMessages.push_back(pendingMessage); + sendMessageQueue->addPendingMessage(pendingMessage); } return; } if(peer->shouldChoke()) { if(!peer->amChocking) { PendingMessage pendingMessage(PeerMessage::CHOKE, peerConnection); - pendingMessages.push_back(pendingMessage); + sendMessageQueue->addPendingMessage(pendingMessage); } } else if(peer->amChocking && peer->peerInterested) { PendingMessage pendingMessage(PeerMessage::UNCHOKE, peerConnection); - pendingMessages.push_back(pendingMessage); + sendMessageQueue->addPendingMessage(pendingMessage); } else if(!peer->peerInterested) { PendingMessage pendingMessage(PeerMessage::CHOKE, peerConnection); - pendingMessages.push_back(pendingMessage); + sendMessageQueue->addPendingMessage(pendingMessage); } } @@ -183,10 +211,15 @@ void PeerInteractionCommand::receiveMessage() { case PeerMessage::KEEP_ALIVE: break; case PeerMessage::CHOKE: + if(!peer->peerChoking) { + chokeUnchokeCount++; + } peer->peerChoking = true; - requestSlotMan->deleteAllRequestSlot(piece); break; case PeerMessage::UNCHOKE: + if(peer->peerChoking) { + chokeUnchokeCount++; + } peer->peerChoking = false; break; case PeerMessage::INTERESTED: @@ -196,6 +229,7 @@ void PeerInteractionCommand::receiveMessage() { peer->peerInterested = false; break; case PeerMessage::HAVE: + haveCount++; peer->updateBitfield(message->getIndex(), 1); break; case PeerMessage::BITFIELD: @@ -209,16 +243,16 @@ void PeerInteractionCommand::receiveMessage() { message->getLength(), e->torrentMan->pieceLength, peerConnection); - pendingMessages.push_back(pendingMessage); + sendMessageQueue->addPendingMessage(pendingMessage); e->torrentMan->addUploadedSize(message->getLength()); e->torrentMan->addDeltaUpload(message->getLength()); } break; case PeerMessage::CANCEL: - deletePendingMessage(message); + sendMessageQueue->deletePendingPieceMessage(message); break; case PeerMessage::PIECE: { - RequestSlot slot = requestSlotMan->getCorrespoindingRequestSlot(message); + RequestSlot slot = sendMessageQueue->getCorrespoindingRequestSlot(message); peer->addPeerUpload(message->getBlockLength()); if(!Piece::isNull(piece) && !RequestSlot::isNull(slot)) { long long int offset = @@ -229,7 +263,7 @@ void PeerInteractionCommand::receiveMessage() { message->getBlockLength(), offset); piece.completeBlock(slot.getBlockIndex()); - requestSlotMan->deleteRequestSlot(slot); + sendMessageQueue->deleteRequestSlot(slot); e->torrentMan->updatePiece(piece); e->logger->debug("CUID#%d - setting piece bit index=%d", cuid, slot.getBlockIndex()); @@ -252,27 +286,6 @@ void PeerInteractionCommand::receiveMessage() { } } -void PeerInteractionCommand::deletePendingMessage(PeerMessage* cancelMessage) { - for(PendingMessages::iterator itr = pendingMessages.begin(); - itr != pendingMessages.end();) { - PendingMessage& pendingMessage = *itr; - if(pendingMessage.getPeerMessageId() == PeerMessage::PIECE && - pendingMessage.getIndex() == cancelMessage->getIndex() && - pendingMessage.getBegin() == cancelMessage->getBegin() && - pendingMessage.getLength() == cancelMessage->getLength() && - !pendingMessage.isInProgress()) { - e->logger->debug("CUID#%d - deleting pending piece message because cancel message received. index=%d, begin=%d, length=%d", - cuid, - pendingMessage.getIndex(), - pendingMessage.getBegin(), - pendingMessage.getLength()); - itr = pendingMessages.erase(itr); - } else { - itr++; - } - } -} - void PeerInteractionCommand::onGotNewPiece() { e->logger->info(MSG_GOT_NEW_PIECE, cuid, piece.getIndex()); e->torrentMan->completePiece(piece); @@ -305,15 +318,16 @@ bool PeerInteractionCommand::prepareForRetry(int wait) { } Piece PeerInteractionCommand::getNewPieceAndSendInterest() { + sendMessageQueue->cancelAllRequest(); Piece piece = e->torrentMan->getMissingPiece(peer); if(Piece::isNull(piece)) { e->logger->debug("CUID#%d - try to send not-interested", cuid); PendingMessage pendingMessage(PeerMessage::NOT_INTERESTED, peerConnection); - pendingMessages.push_back(pendingMessage); + sendMessageQueue->addPendingMessage(pendingMessage); } else { e->logger->debug("CUID#%d - try to send interested", cuid); PendingMessage pendingMessage(PeerMessage::INTERESTED, peerConnection); - pendingMessages.push_back(pendingMessage); + sendMessageQueue->addPendingMessage(pendingMessage); } return piece; } @@ -323,8 +337,7 @@ void PeerInteractionCommand::sendInterest() { // retrive new piece from TorrentMan piece = getNewPieceAndSendInterest(); } else if(peer->peerChoking) { - // TODO separate method is better - requestSlotMan->deleteAllRequestSlot(piece); + sendMessageQueue->cancelAllRequest(piece); e->torrentMan->cancelPiece(piece); piece = Piece::nullPiece; } else if(piece.pieceComplete()) { @@ -334,35 +347,26 @@ void PeerInteractionCommand::sendInterest() { void PeerInteractionCommand::createRequestPendingMessage(int blockIndex) { PendingMessage pendingMessage = - PendingMessage::createRequestMessage(piece.getIndex(), - blockIndex*piece.getBlockLength(), - piece.getBlockLength(blockIndex), - peerConnection); - pendingMessages.push_back(pendingMessage); - RequestSlot requestSlot(piece.getIndex(), - blockIndex*piece.getBlockLength(), - piece.getBlockLength(blockIndex), - blockIndex); - requestSlotMan->addRequestSlot(requestSlot); + PendingMessage::createRequestMessage(piece, blockIndex, peerConnection); + sendMessageQueue->addPendingMessage(pendingMessage); } void PeerInteractionCommand::sendMessages() { if(!Piece::isNull(piece) && !peer->peerChoking) { if(e->torrentMan->isEndGame()) { BlockIndexes missingBlockIndexes = piece.getAllMissingBlockIndexes(); - if(requestSlotMan->isEmpty()) { + if(sendMessageQueue->countRequestSlot() == 0) { + random_shuffle(missingBlockIndexes.begin(), missingBlockIndexes.end()); + int count = 0; for(PieceIndexes::const_iterator itr = missingBlockIndexes.begin(); - itr != missingBlockIndexes.end(); itr++) { + itr != missingBlockIndexes.end() && count < 6; itr++, count++) { createRequestPendingMessage(*itr); } } } else { - for(int i = requestSlotMan->countRequestSlot(); i <= 5; i++) { + for(int i = sendMessageQueue->countRequestSlot(); i < 6; i++) { int blockIndex = piece.getMissingUnusedBlockIndex(); if(blockIndex == -1) { - if(requestSlotMan->isEmpty()) { - piece = Piece::nullPiece; - } break; } e->torrentMan->updatePiece(piece); @@ -371,18 +375,11 @@ void PeerInteractionCommand::sendMessages() { } } - for(PendingMessages::iterator itr = pendingMessages.begin(); itr != pendingMessages.end();) { - if(itr->processMessage()) { - itr = pendingMessages.erase(itr); - } else { - //setWriteCheckSocket(socket); - break; - } - } + sendMessageQueue->send(); } void PeerInteractionCommand::onAbort(Exception* ex) { - requestSlotMan->deleteAllRequestSlot(piece); + sendMessageQueue->cancelAllRequest(piece); e->torrentMan->cancelPiece(piece); PeerAbstractCommand::onAbort(ex); } @@ -394,7 +391,7 @@ void PeerInteractionCommand::keepAlive() { struct timeval now; gettimeofday(&now, NULL); if(Util::difftv(now, keepAliveCheckPoint) >= (long long int)120*1000000) { - if(pendingMessages.empty()) { + if(sendMessageQueue->countPendingMessage() == 0) { peerConnection->sendKeepAlive(); } keepAliveCheckPoint = now; @@ -405,20 +402,22 @@ void PeerInteractionCommand::keepAlive() { void PeerInteractionCommand::beforeSocketCheck() { if(sequence == WIRED) { e->torrentMan->unadvertisePiece(cuid); + detectMessageFlooding(); + checkLongTimePeerChoking(); PieceIndexes indexes = e->torrentMan->getAdvertisedPieceIndexes(cuid); if(indexes.size() >= 20) { PendingMessage pendingMessage(PeerMessage::BITFIELD, peerConnection); - pendingMessages.push_back(pendingMessage); + sendMessageQueue->addPendingMessage(pendingMessage); } else { - if(pendingMessages.size() == 0) { + if(sendMessageQueue->countPendingMessage() == 0) { for(PieceIndexes::iterator itr = indexes.begin(); itr != indexes.end(); itr++) { peerConnection->sendHave(*itr); } } else { for(PieceIndexes::iterator itr = indexes.begin(); itr != indexes.end(); itr++) { PendingMessage pendingMessage = PendingMessage::createHaveMessage(*itr, peerConnection); - pendingMessages.push_back(pendingMessage); + sendMessageQueue->addPendingMessage(pendingMessage); } } } diff --git a/src/PeerInteractionCommand.h b/src/PeerInteractionCommand.h index fc0cd636..48c5246d 100644 --- a/src/PeerInteractionCommand.h +++ b/src/PeerInteractionCommand.h @@ -24,8 +24,7 @@ #include "PeerAbstractCommand.h" #include "PeerConnection.h" -#include "PendingMessage.h" -#include "RequestSlotMan.h" +#include "SendMessageQueue.h" using namespace std; @@ -33,12 +32,15 @@ class PeerInteractionCommand : public PeerAbstractCommand { private: int sequence; PeerConnection* peerConnection; - RequestSlotMan* requestSlotMan; - PendingMessages pendingMessages; + SendMessageQueue* sendMessageQueue; Piece piece; struct timeval keepAliveCheckPoint; struct timeval chokeCheckPoint; + struct timeval freqCheckPoint; + int chokeUnchokeCount; + int haveCount; void receiveMessage(); + void detectMessageFlooding(); void checkLongTimePeerChoking(); void syncPiece(); void detectTimeoutAndDuplicateBlock(); @@ -46,10 +48,6 @@ private: void sendInterest(); void sendMessages(); void createRequestPendingMessage(int blockIndex); - void deletePendingMessage(PeerMessage* cancelMessage); - const RequestSlot& getRequestSlot(int index, int begin, int length) const; - bool deleteRequestSlot(const RequestSlot& slot); - void deleteAllRequestSlot(); bool checkPieceHash(const Piece& piece); void erasePieceOnDisk(const Piece& piece); void keepAlive(); diff --git a/src/PeerListenCommand.cc b/src/PeerListenCommand.cc index 81021e48..79ee2a87 100644 --- a/src/PeerListenCommand.cc +++ b/src/PeerListenCommand.cc @@ -59,9 +59,9 @@ bool PeerListenCommand::execute() { Socket* peerSocket = NULL; try { peerSocket = socket->acceptConnection(); + pair peerInfo; + peerSocket->getPeerInfo(peerInfo); if(e->torrentMan->connections < MAX_PEERS) { - pair peerInfo; - peerSocket->getPeerInfo(peerInfo); Peer* peer = new Peer(peerInfo.first, peerInfo.second, e->torrentMan->pieceLength, e->torrentMan->totalSize); diff --git a/src/PendingMessage.cc b/src/PendingMessage.cc index a88027e8..ba1c5ab9 100644 --- a/src/PendingMessage.cc +++ b/src/PendingMessage.cc @@ -72,7 +72,9 @@ bool PendingMessage::processMessage() { } break; case PeerMessage::REQUEST: - peerConnection->sendRequest(index, begin, length); + if(!peerConnection->getPeer()->peerChoking) { + peerConnection->sendRequest(index, begin, length); + } break; case PeerMessage::CANCEL: peerConnection->sendCancel(index, begin, length); @@ -83,11 +85,14 @@ bool PendingMessage::processMessage() { return retval; } -PendingMessage PendingMessage::createRequestMessage(int index, int begin, int length, PeerConnection* peerConnection) { +PendingMessage PendingMessage::createRequestMessage(const Piece& piece, + int blockIndex, + PeerConnection* peerConnection) { PendingMessage pendingMessage(PeerMessage::REQUEST, peerConnection); - pendingMessage.setIndex(index); - pendingMessage.setBegin(begin); - pendingMessage.setLength(length); + pendingMessage.setIndex(piece.getIndex()); + pendingMessage.setBegin(blockIndex*piece.getBlockLength()); + pendingMessage.setLength(piece.getBlockLength(blockIndex)); + pendingMessage.setBlockIndex(blockIndex); return pendingMessage; } diff --git a/src/PendingMessage.h b/src/PendingMessage.h index 3b542282..7909fb84 100644 --- a/src/PendingMessage.h +++ b/src/PendingMessage.h @@ -32,6 +32,7 @@ private: int index; int begin; int length; + int blockIndex; long long int pieceDataOffset; int leftPieceDataLength; bool inProgress; @@ -58,9 +59,11 @@ public: int getBegin() const { return begin; } void setLength(int length) { this->length = length; } int getLength() const { return length; } + void setBlockIndex(int blockIndex) { this->blockIndex = blockIndex; } + int getBlockIndex() const { return blockIndex; } bool processMessage(); - static PendingMessage createRequestMessage(int index, int begin, int length, PeerConnection* peerConnection); + static PendingMessage createRequestMessage(const Piece& piece, int blockIndex, PeerConnection* peerConnection); static PendingMessage createCancelMessage(int index, int begin, int length, PeerConnection* peerConnection); static PendingMessage createPieceMessage(int index, int begin, int length, int pieceLength, PeerConnection* peerConnection); static PendingMessage createHaveMessage(int index, PeerConnection* peerConnectioin); diff --git a/src/RequestSlotMan.cc b/src/RequestSlotMan.cc index e6d24cd6..42324867 100644 --- a/src/RequestSlotMan.cc +++ b/src/RequestSlotMan.cc @@ -48,7 +48,7 @@ void RequestSlotMan::deleteAllRequestSlot(Piece& piece) { requestSlots.clear(); } -void RequestSlotMan::deleteTimedoutRequestSlot(Piece& piece) { +void RequestSlotMan::deleteTimeoutRequestSlot(Piece& piece) { for(RequestSlots::iterator itr = requestSlots.begin(); itr != requestSlots.end();) { if(itr->isTimeout(timeout)) { diff --git a/src/RequestSlotMan.h b/src/RequestSlotMan.h index 2269651d..9815d7c6 100644 --- a/src/RequestSlotMan.h +++ b/src/RequestSlotMan.h @@ -58,7 +58,7 @@ public: void deleteRequestSlot(const RequestSlot& requestSlot); void deleteAllRequestSlot(Piece& piece); - void deleteTimedoutRequestSlot(Piece& piece); + void deleteTimeoutRequestSlot(Piece& piece); void deleteCompletedRequestSlot(const Piece& piece); RequestSlot getCorrespoindingRequestSlot(const PeerMessage* pieceMessage) const; diff --git a/src/SendMessageQueue.cc b/src/SendMessageQueue.cc new file mode 100644 index 00000000..e03c9571 --- /dev/null +++ b/src/SendMessageQueue.cc @@ -0,0 +1,122 @@ +/* */ +#include "SendMessageQueue.h" + +SendMessageQueue::SendMessageQueue(int cuid, PeerConnection* peerConnection, + TorrentMan* torrentMan, + const Logger* logger) + :cuid(cuid), logger(logger) { + requestSlotMan = new RequestSlotMan(cuid, &pendingMessages, peerConnection, + torrentMan, logger); +} + +SendMessageQueue::~SendMessageQueue() { + delete requestSlotMan; +} + +void SendMessageQueue::send() { + for(PendingMessages::iterator itr = pendingMessages.begin(); + itr != pendingMessages.end();) { + if(itr->processMessage()) { + itr = pendingMessages.erase(itr); + } else { + break; + } + } +} + +void SendMessageQueue::addPendingMessage(const PendingMessage& pendingMessage) { + pendingMessages.push_back(pendingMessage); + if(pendingMessage.getPeerMessageId() == PeerMessage::REQUEST) { + RequestSlot requestSlot(pendingMessage.getIndex(), + pendingMessage.getBegin(), + pendingMessage.getLength(), + pendingMessage.getBlockIndex()); + requestSlotMan->addRequestSlot(requestSlot); + } +} + +void SendMessageQueue::deletePendingPieceMessage(const PeerMessage* cancelMessage) { + for(PendingMessages::iterator itr = pendingMessages.begin(); + itr != pendingMessages.end();) { + PendingMessage& pendingMessage = *itr; + if(pendingMessage.getPeerMessageId() == PeerMessage::PIECE && + pendingMessage.getIndex() == cancelMessage->getIndex() && + pendingMessage.getBegin() == cancelMessage->getBegin() && + pendingMessage.getLength() == cancelMessage->getLength() && + !pendingMessage.isInProgress()) { + logger->debug("CUID#%d - deleting pending piece message because cancel message received. index=%d, begin=%d, length=%d", + cuid, + pendingMessage.getIndex(), + pendingMessage.getBegin(), + pendingMessage.getLength()); + itr = pendingMessages.erase(itr); + } else { + itr++; + } + } +} + +void SendMessageQueue::deletePendingRequestMessage() { + for(PendingMessages::iterator itr = pendingMessages.begin(); + itr != pendingMessages.end();) { + PendingMessage& pendingMessage = *itr; + if(pendingMessage.getPeerMessageId() == PeerMessage::REQUEST) { + itr = pendingMessages.erase(itr); + } else { + itr++; + } + } +} + +void SendMessageQueue::deleteRequestSlot(const RequestSlot& requestSlot) { + requestSlotMan->deleteRequestSlot(requestSlot); +} + +void SendMessageQueue::deleteTimeoutRequestSlot(Piece& piece) { + requestSlotMan->deleteTimeoutRequestSlot(piece); +} + +void SendMessageQueue::deleteCompletedRequestSlot(const Piece& piece) { + requestSlotMan->deleteCompletedRequestSlot(piece); +} + +RequestSlot SendMessageQueue::getCorrespoindingRequestSlot(const PeerMessage* pieceMessage) const { + return requestSlotMan->getCorrespoindingRequestSlot(pieceMessage); +} + +void SendMessageQueue::cancelAllRequest() { + cancelAllRequest(Piece::nullPiece); +} + +void SendMessageQueue::cancelAllRequest(Piece& piece) { + deletePendingRequestMessage(); + requestSlotMan->deleteAllRequestSlot(Piece::nullPiece); +} + +int SendMessageQueue::countPendingMessage() const { + return pendingMessages.size(); +} + +int SendMessageQueue::countRequestSlot() const { + return requestSlotMan->countRequestSlot(); +} diff --git a/src/SendMessageQueue.h b/src/SendMessageQueue.h new file mode 100644 index 00000000..0d0c58c7 --- /dev/null +++ b/src/SendMessageQueue.h @@ -0,0 +1,57 @@ +/* */ +#ifndef _D_SEND_MESSAGE_QUEUE_H_ +#define _D_SEND_MESSAGE_QUEUE_H_ + +#include "common.h" +#include "RequestSlotMan.h" + +class SendMessageQueue { +private: + int cuid; + RequestSlotMan* requestSlotMan; + PendingMessages pendingMessages; + const Logger* logger; +public: + SendMessageQueue(int cuid, PeerConnection* peerConnection, + TorrentMan* torrentMan, const Logger* logger); + ~SendMessageQueue(); + + void send(); + + void addPendingMessage(const PendingMessage& pendingMessage); + void deletePendingPieceMessage(const PeerMessage* cancelMessage); + void deletePendingRequestMessage(); + + void deleteRequestSlot(const RequestSlot& requestSlot); + void deleteTimeoutRequestSlot(Piece& piece); + void deleteCompletedRequestSlot(const Piece& piece); + RequestSlot getCorrespoindingRequestSlot(const PeerMessage* pieceMessage) const; + + void cancelAllRequest(); + void cancelAllRequest(Piece& piece); + + int countPendingMessage() const; + int countRequestSlot() const; +}; + +#endif // _D_SEND_MESSAGE_QUEUE_H_ diff --git a/src/TorrentMan.cc b/src/TorrentMan.cc index bd336c81..1cd89d97 100644 --- a/src/TorrentMan.cc +++ b/src/TorrentMan.cc @@ -36,6 +36,7 @@ TorrentMan::TorrentMan():bitfield(NULL), peerEntryIdCounter(0), cuidCounter(0), downloadedSize(0), uploadedSize(0), + preDownloadedSize(0), preUploadedSize(0), deltaDownload(0), deltaUpload(0), storeDir("."), multiFileTopDir(NULL), @@ -74,12 +75,7 @@ bool TorrentMan::addPeer(Peer* peer, bool duplicate) { } } } else { - if(peers.size() >= MAX_PEER_LIST_SIZE) { - deleteOldErrorPeers(100); - if(peers.size() >= MAX_PEER_LIST_SIZE) { - return false; - } - } + deleteOldErrorPeers(MAX_PEER_LIST_SIZE); for(Peers::iterator itr = peers.begin(); itr != peers.end(); itr++) { Peer* p = *itr; if(p->ipaddr == peer->ipaddr && p->port == peer->port) { @@ -455,6 +451,8 @@ void TorrentMan::read(FILE* file) { if(fread(&uploadedSize, sizeof(uploadedSize), 1, file) < 1) { throw new DlAbortEx(strerror(errno)); } + preDownloadedSize = downloadedSize; + preUploadedSize = uploadedSize; delete [] savedBitfield; } catch(Exception* ex) { delete [] savedBitfield; diff --git a/src/TorrentMan.h b/src/TorrentMan.h index 4e91cca9..4e47525c 100644 --- a/src/TorrentMan.h +++ b/src/TorrentMan.h @@ -74,6 +74,8 @@ private: int cuidCounter; long long int downloadedSize; long long int uploadedSize; + long long int preDownloadedSize; + long long int preUploadedSize; int deltaDownload; int deltaUpload; int fileMode; @@ -186,6 +188,13 @@ public: long long int getUploadedSize() const { return uploadedSize; } void setUploadedSize(long long int size) { uploadedSize = size; } + long long int getSessionDownloadedSize() const { + return downloadedSize-preDownloadedSize; + } + long long int getSessionUploadedSize() const { + return uploadedSize-preUploadedSize; + } + void setFileMode(int mode) { fileMode = mode; } diff --git a/src/TrackerInitCommand.cc b/src/TrackerInitCommand.cc index 077f8268..374a1f66 100644 --- a/src/TrackerInitCommand.cc +++ b/src/TrackerInitCommand.cc @@ -57,8 +57,8 @@ bool TrackerInitCommand::execute() { "info_hash="+Util::urlencode(e->torrentMan->getInfoHash(), 20)+"&"+ "peer_id="+e->torrentMan->peerId+"&"+ "port="+Util::itos(e->torrentMan->getPort())+"&"+ - "uploaded="+Util::llitos(e->torrentMan->getUploadedSize())+"&"+ - "downloaded="+Util::llitos(e->torrentMan->getDownloadedSize())+"&"+ + "uploaded="+Util::llitos(e->torrentMan->getSessionUploadedSize())+"&"+ + "downloaded="+Util::llitos(e->torrentMan->getSessionDownloadedSize())+"&"+ "left="+(e->torrentMan->totalSize-e->torrentMan->getDownloadedSize() <= 0 ? "0" : Util::llitos(e->torrentMan->totalSize-e->torrentMan->getDownloadedSize()))+"&"+ "compact=1"; diff --git a/src/TrackerUpdateCommand.cc b/src/TrackerUpdateCommand.cc index 3fc9cda3..e9c33a0b 100644 --- a/src/TrackerUpdateCommand.cc +++ b/src/TrackerUpdateCommand.cc @@ -62,6 +62,9 @@ bool TrackerUpdateCommand::execute() { e->torrentMan->minInterval = minInterval->toInt(); e->logger->debug("CUID#%d - min interval:%d", cuid, e->torrentMan->minInterval); } + if(e->torrentMan->minInterval > e->torrentMan->interval) { + e->torrentMan->minInterval = e->torrentMan->interval; + } Data* complete = (Data*)response->get("complete"); if(complete != NULL) { e->torrentMan->complete = complete->toInt(); diff --git a/src/main.cc b/src/main.cc index fef22eb7..e3493809 100644 --- a/src/main.cc +++ b/src/main.cc @@ -242,6 +242,7 @@ int main(int argc, char* argv[]) { Option* op = new Option(); op->put(PREF_RETRY_WAIT, "5"); op->put(PREF_TIMEOUT, "60"); + op->put(PREF_PEER_CONNECTION_TIMEOUT, "30"); op->put(PREF_MIN_SEGMENT_SIZE, "1048576");// 1M op->put(PREF_MAX_TRIES, "5"); op->put(PREF_HTTP_PROXY_METHOD, V_TUNNEL); diff --git a/src/prefs.h b/src/prefs.h index 140661c9..bc6525c6 100644 --- a/src/prefs.h +++ b/src/prefs.h @@ -83,5 +83,10 @@ // values: true | false #define PREF_HTTP_PROXY_AUTH_ENABLED "http_proxy_auth_enabled" +/** + * BitTorrent related preferences + */ +// values: 1*digit +#define PREF_PEER_CONNECTION_TIMEOUT "peer_connection_timeout" #endif // _D_PREFS_H_