Added new class SendMessageQueue that includes PendingMessages

and
	RequestSlotMan.

	* src/SendMessageQueue.h: New class.
	* src/SendMessageQueue.cc: New class.
	* src/PendingMessage.h: Added new member variable blockIndex and 
its
	accessors.
	(createRequestMessage): Updated.
	* src/PendingMessage.cc (createRequestMessage): Updated.
	* src/PeerInteractionCommand.cc (executeInternal): Updated with
	SendMessageQueue.
	(checkLongTimePeerChoking): Updated with SendMessageQueue.
	(receiveMessage): Updated with SendMessageQueue.
	(deletePendingPieceMessage): Removed.
	(getNewPieceAndSendInterest): Updated with SendMessageQueue.
	(sendInterest): Updated with SendMessageQueue.
	(createRequestPendingMessage): Updated with SendMessageQueue.
	(sendMessages): Updated with SendMessageQueue.
	(onAbort): Updated with SendMessageQueue.
	(keepAlive): Updated with SendMessageQueue.
	(beforeSocketCheck): Updated SendMessageQueue.

	* src/PeerInteractionCommand (sendMessages): Shuffle
	missingBLockIndexes before using it.
	
	Added its own timeout for peer connection.
	
	* src/PeerAbstractCommand.h: Added member variable timeout and 
its
	setter.
	* src/prefs.h: Added PREF_PEER_CONNECTION_TIMEOUT.
	* src/PeerInteractionCommand.cc (PeerInteractionCommand):
	Added setTimeout() call.
	(executeInternal): Added setTimeout() call.
	* src/PeerAbstractCommand.cc (PeerAbstractCommand):
	Added timeout.
	(isTimeoutDetected): Updated.
	* src/main.cc (main): Added PREF_PEER_CONNECTION_TIMEOUT entry 
to
	option.
	
	Added *simple* message flooding checker.
	
	* src/PeerInteractionCommand.cc (executeInternal):
	Added detectMessageFlooding() call.
	(detectMessageFlooding): New function.
	(receiveMessage): Count up CHOKE, UNCHOKE, HAVE message.
	(beforeSocketCheck): Added detectMessageFlooding() call.
	* src/PeerInteractionCommand.h: Added sendMessageQueue,
	chokeUnchokeCount, haveCount, detectMessageFlooding().
	Removed deletePendingPieceMessage(), getRequestSlot(),
	deleteRequestSlot(), deleteAllRequestSlot().
	
	* src/PeerInteractionCommand.cc (beforeSocketCheck):
	Added checkLongTimePeerChoking() call.

	* src/RequestSlotMan.h: Renamed deleteTimeoutRequestSlot().

	* src/TorrentMan.cc (addPeer): Delete at most MAX_PEER_LIST_SIZE 
peers
	if duplicate == false.

	The parameter "uploaded" and "downloaded" in the tracker request 
are
	the size since the client sent the "started" event to the 
tracker.
	
	* src/TorrentMan.cc (setup): Assigned saved downloaded Size and
	uploaded size to preDownloadedSize, preUploadedSize 
respectively.
	* src/TorrentMan.h: Added preDownloadedSize, preUploadedSize,
	getSessionDownloadedSize(), getSessionUploadedSize().
	* src/TrackerInitCommand.cc (execute): Use 
getSessionDownloadedSize(),
	getSessionUploadedSize() instead of getDownloadedSize(),
	getUploadedSize().
	
	* src/PendingMessage.cc (processMessage): Do not send request 
message
	if the peer is choking the client.

	* src/TrackerUpdateCommand.cc (execute): Check wtheher 
minInterval is
	less than interval.
pull/1/head
Tatsuhiro Tsujikawa 2006-03-28 15:23:51 +00:00
parent 4892b40945
commit e1f24adc40
21 changed files with 392 additions and 109 deletions

View File

@ -1,3 +1,81 @@
2006-03-28 Tatsuhiro Tsujikawa <tujikawa at rednoah dot com>
Added new class SendMessageQueue that includes PendingMessages and
RequestSlotMan.
* src/SendMessageQueue.h: New class.
* src/SendMessageQueue.cc: New class.
* src/PendingMessage.h: Added new member variable blockIndex and its
accessors.
(createRequestMessage): Updated.
* src/PendingMessage.cc (createRequestMessage): Updated.
* src/PeerInteractionCommand.cc (executeInternal): Updated with
SendMessageQueue.
(checkLongTimePeerChoking): Updated with SendMessageQueue.
(receiveMessage): Updated with SendMessageQueue.
(deletePendingPieceMessage): Removed.
(getNewPieceAndSendInterest): Updated with SendMessageQueue.
(sendInterest): Updated with SendMessageQueue.
(createRequestPendingMessage): Updated with SendMessageQueue.
(sendMessages): Updated with SendMessageQueue.
(onAbort): Updated with SendMessageQueue.
(keepAlive): Updated with SendMessageQueue.
(beforeSocketCheck): Updated SendMessageQueue.
* src/PeerInteractionCommand (sendMessages): Shuffle
missingBLockIndexes before using it.
Added its own timeout for peer connection.
* src/PeerAbstractCommand.h: Added member variable timeout and its
setter.
* src/prefs.h: Added PREF_PEER_CONNECTION_TIMEOUT.
* src/PeerInteractionCommand.cc (PeerInteractionCommand):
Added setTimeout() call.
(executeInternal): Added setTimeout() call.
* src/PeerAbstractCommand.cc (PeerAbstractCommand):
Added timeout.
(isTimeoutDetected): Updated.
* src/main.cc (main): Added PREF_PEER_CONNECTION_TIMEOUT entry to
option.
Added *simple* message flooding checker.
* src/PeerInteractionCommand.cc (executeInternal):
Added detectMessageFlooding() call.
(detectMessageFlooding): New function.
(receiveMessage): Count up CHOKE, UNCHOKE, HAVE message.
(beforeSocketCheck): Added detectMessageFlooding() call.
* src/PeerInteractionCommand.h: Added sendMessageQueue,
chokeUnchokeCount, haveCount, detectMessageFlooding().
Removed deletePendingPieceMessage(), getRequestSlot(),
deleteRequestSlot(), deleteAllRequestSlot().
* src/PeerInteractionCommand.cc (beforeSocketCheck):
Added checkLongTimePeerChoking() call.
* src/RequestSlotMan.h: Renamed deleteTimeoutRequestSlot().
* src/TorrentMan.cc (addPeer): Delete at most MAX_PEER_LIST_SIZE peers
if duplicate == false.
The parameter "uploaded" and "downloaded" in the tracker request are
the size since the client sent the "started" event to the tracker.
* src/TorrentMan.cc (setup): Assigned saved downloaded Size and
uploaded size to preDownloadedSize, preUploadedSize respectively.
* src/TorrentMan.h: Added preDownloadedSize, preUploadedSize,
getSessionDownloadedSize(), getSessionUploadedSize().
* src/TrackerInitCommand.cc (execute): Use getSessionDownloadedSize(),
getSessionUploadedSize() instead of getDownloadedSize(),
getUploadedSize().
* src/PendingMessage.cc (processMessage): Do not send request message
if the peer is choking the client.
* src/TrackerUpdateCommand.cc (execute): Check wtheher minInterval is
less than interval.
2006-03-27 Tatsuhiro Tsujikawa <tujikawa at rednoah dot com>
* configure.in: Added gnutls support. Added several CPP macros.
@ -28,24 +106,20 @@
* src/InitiateConnectionCommandFactory.cc: Replaced HAVE_LIBSSL with
ENABLE_SSL.
* src/Request.cc: Replaced HAVE_LIBSSL with ENABLE_SSL.
* src/RequestSlotMan.cc:
(deleteCompletedRequestSlot)
* src/RequestSlotMan.cc (deleteCompletedRequestSlot):
If a piece is already acquired by another command, delete the request
slots for the piece.
* src/TrackerUpdateCommand.cc:
(execute)
* src/TrackerUpdateCommand.cc (execute):
Changed log level of MSG_TRACKER_WARNING_MESSAGE from info to warn.
Added a check whether peer list is null.
Fixed the bug that causes sending completed event to the tracker
several times.
* src/TrackerInitCommand.cc:
(execute)
* src/TrackerInitCommand.cc (execute):
Fixed the bug that causes sending completed event to the tracker
several times.
* src/AbstractDiskWriter.{h,cc}: Removed direct dependency on OpenSSL
by using messageDigest.h.
2006-03-26 Tatsuhiro Tsujikawa <tujikawa at rednoah dot com>
* PeerConnection.cc: Replaced log message "keep-alive" with

6
TODO
View File

@ -9,4 +9,8 @@
* Tracker UDP protocol
* no-compact peers format
* Add port range command-line option
* Add max peers command-line option
* Add max peers command-line option
* Distinguish seeder from leecher
* time out for connecting peers
* ignore incoming connection from localhost.
* do not connect to localhost

View File

@ -77,7 +77,8 @@ SRCS = Socket.cc Socket.h\
TorrentAutoSaveCommand.cc TorrentAutoSaveCommand.h\
Directory.cc Directory.h\
TrackerWatcherCommand.cc TrackerWatcherCommand.h\
messageDigest.h
messageDigest.h\
SendMessageQueue.cc SendMessageQueue.h
noinst_LIBRARIES = libaria2c.a
libaria2c_a_SOURCES = $(SRCS)
aria2c_LDADD = libaria2c.a @LIBINTL@ @ALLOCA@ @LIBGNUTLS_LIBS@\

View File

@ -97,7 +97,8 @@ am__objects_1 = Socket.$(OBJEXT) SocketCore.$(OBJEXT) \
PeerListenCommand.$(OBJEXT) PendingMessage.$(OBJEXT) \
PeerMessage.$(OBJEXT) Piece.$(OBJEXT) RequestSlot.$(OBJEXT) \
RequestSlotMan.$(OBJEXT) TorrentAutoSaveCommand.$(OBJEXT) \
Directory.$(OBJEXT) TrackerWatcherCommand.$(OBJEXT)
Directory.$(OBJEXT) TrackerWatcherCommand.$(OBJEXT) \
SendMessageQueue.$(OBJEXT)
am_libaria2c_a_OBJECTS = $(am__objects_1)
libaria2c_a_OBJECTS = $(am_libaria2c_a_OBJECTS)
am__installdirs = "$(DESTDIR)$(bindir)"
@ -324,7 +325,8 @@ SRCS = Socket.cc Socket.h\
TorrentAutoSaveCommand.cc TorrentAutoSaveCommand.h\
Directory.cc Directory.h\
TrackerWatcherCommand.cc TrackerWatcherCommand.h\
messageDigest.h
messageDigest.h\
SendMessageQueue.cc SendMessageQueue.h
noinst_LIBRARIES = libaria2c.a
libaria2c_a_SOURCES = $(SRCS)
@ -458,6 +460,7 @@ distclean-compile:
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/RequestSlotMan.Po@am__quote@
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/SegmentMan.Po@am__quote@
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/SegmentSplitter.Po@am__quote@
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/SendMessageQueue.Po@am__quote@
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/ShaVisitor.Po@am__quote@
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/SimpleLogger.Po@am__quote@
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/SleepCommand.Po@am__quote@

View File

@ -38,7 +38,7 @@ PeerAbstractCommand::PeerAbstractCommand(int cuid, Peer* peer, TorrentDownloadEn
}
this->checkPoint.tv_sec = 0;
this->checkPoint.tv_usec = 0;
timeout = e->option->getAsInt(PREF_TIMEOUT);
e->torrentMan->connections++;
}
@ -63,7 +63,7 @@ bool PeerAbstractCommand::isTimeoutDetected() {
return false;
} else {
long long int elapsed = Util::difftv(now, checkPoint);
if(elapsed >= e->option->getAsLLInt(PREF_TIMEOUT)*1000000) {
if(elapsed >= ((long long int)timeout)*1000000) {
return true;
} else {
return false;

View File

@ -32,11 +32,12 @@ private:
void updateCheckPoint();
bool isTimeoutDetected();
struct timeval checkPoint;
int timeout;
protected:
TorrentDownloadEngine* e;
Socket* socket;
Peer* peer;
void setTimeout(int timeout) { this->timeout = timeout; }
virtual bool prepareForNextPeer(int wait);
virtual bool prepareForRetry(int wait);
virtual void onAbort(Exception* ex);

View File

@ -25,6 +25,8 @@
#include "DlAbortEx.h"
#include "Util.h"
#include "message.h"
#include "prefs.h"
#include <algorithm>
PeerInteractionCommand::PeerInteractionCommand(int cuid, Peer* peer,
TorrentDownloadEngine* e,
@ -33,21 +35,26 @@ PeerInteractionCommand::PeerInteractionCommand(int cuid, Peer* peer,
if(sequence == INITIATOR_SEND_HANDSHAKE) {
setReadCheckSocket(NULL);
setWriteCheckSocket(socket);
setTimeout(e->option->getAsInt(PREF_PEER_CONNECTION_TIMEOUT));
}
peerConnection = new PeerConnection(cuid, socket, e->option, e->logger,
peer, e->torrentMan);
requestSlotMan = new RequestSlotMan(cuid, &pendingMessages, peerConnection,
e->torrentMan, e->logger);
sendMessageQueue = new SendMessageQueue(cuid, peerConnection, e->torrentMan,
e->logger);
piece = Piece::nullPiece;
keepAliveCheckPoint.tv_sec = 0;
keepAliveCheckPoint.tv_usec = 0;
chokeCheckPoint.tv_sec = 0;
chokeCheckPoint.tv_usec = 0;
freqCheckPoint.tv_sec = 0;
freqCheckPoint.tv_usec = 0;
chokeUnchokeCount = 0;
haveCount = 0;
}
PeerInteractionCommand::~PeerInteractionCommand() {
delete peerConnection;
delete requestSlotMan;
delete sendMessageQueue;
e->torrentMan->unadvertisePiece(cuid);
}
@ -55,6 +62,7 @@ bool PeerInteractionCommand::executeInternal() {
if(sequence == INITIATOR_SEND_HANDSHAKE) {
socket->setBlockingMode();
setReadCheckSocket(socket);
setTimeout(e->option->getAsInt(PREF_TIMEOUT));
}
setWriteCheckSocket(NULL);
@ -97,6 +105,7 @@ bool PeerInteractionCommand::executeInternal() {
break;
}
case WIRED:
detectMessageFlooding();
checkLongTimePeerChoking();
syncPiece();
decideChoking();
@ -106,19 +115,38 @@ bool PeerInteractionCommand::executeInternal() {
}
receiveMessage();
}
requestSlotMan->deleteTimedoutRequestSlot(piece);
requestSlotMan->deleteCompletedRequestSlot(piece);
sendMessageQueue->deleteTimeoutRequestSlot(piece);
sendMessageQueue->deleteCompletedRequestSlot(piece);
sendInterest();
sendMessages();
break;
}
if(pendingMessages.size() > 0) {
if(sendMessageQueue->countPendingMessage() > 0) {
setWriteCheckSocket(socket);
}
e->commands.push(this);
return false;
}
void PeerInteractionCommand::detectMessageFlooding() {
struct timeval now;
gettimeofday(&now, NULL);
if(freqCheckPoint.tv_sec == 0 && freqCheckPoint.tv_usec == 0) {
freqCheckPoint = now;
} else {
if(Util::difftv(now, freqCheckPoint) >= 5*1000000) {
if(chokeUnchokeCount*1.0/(Util::difftv(now, freqCheckPoint)/1000000) >= 0.3
|| haveCount*1.0/(Util::difftv(now, freqCheckPoint)/1000000) >= 20.0) {
throw new DlAbortEx("flooding detected.");
} else {
chokeUnchokeCount = 0;
haveCount = 0;
freqCheckPoint = now;
}
}
}
}
void PeerInteractionCommand::checkLongTimePeerChoking() {
if(e->torrentMan->downloadComplete()) {
return;
@ -152,21 +180,21 @@ void PeerInteractionCommand::decideChoking() {
if(e->torrentMan->downloadComplete()) {
if(peer->amChocking && peer->peerInterested) {
PendingMessage pendingMessage(PeerMessage::UNCHOKE, peerConnection);
pendingMessages.push_back(pendingMessage);
sendMessageQueue->addPendingMessage(pendingMessage);
}
return;
}
if(peer->shouldChoke()) {
if(!peer->amChocking) {
PendingMessage pendingMessage(PeerMessage::CHOKE, peerConnection);
pendingMessages.push_back(pendingMessage);
sendMessageQueue->addPendingMessage(pendingMessage);
}
} else if(peer->amChocking && peer->peerInterested) {
PendingMessage pendingMessage(PeerMessage::UNCHOKE, peerConnection);
pendingMessages.push_back(pendingMessage);
sendMessageQueue->addPendingMessage(pendingMessage);
} else if(!peer->peerInterested) {
PendingMessage pendingMessage(PeerMessage::CHOKE, peerConnection);
pendingMessages.push_back(pendingMessage);
sendMessageQueue->addPendingMessage(pendingMessage);
}
}
@ -183,10 +211,15 @@ void PeerInteractionCommand::receiveMessage() {
case PeerMessage::KEEP_ALIVE:
break;
case PeerMessage::CHOKE:
if(!peer->peerChoking) {
chokeUnchokeCount++;
}
peer->peerChoking = true;
requestSlotMan->deleteAllRequestSlot(piece);
break;
case PeerMessage::UNCHOKE:
if(peer->peerChoking) {
chokeUnchokeCount++;
}
peer->peerChoking = false;
break;
case PeerMessage::INTERESTED:
@ -196,6 +229,7 @@ void PeerInteractionCommand::receiveMessage() {
peer->peerInterested = false;
break;
case PeerMessage::HAVE:
haveCount++;
peer->updateBitfield(message->getIndex(), 1);
break;
case PeerMessage::BITFIELD:
@ -209,16 +243,16 @@ void PeerInteractionCommand::receiveMessage() {
message->getLength(),
e->torrentMan->pieceLength,
peerConnection);
pendingMessages.push_back(pendingMessage);
sendMessageQueue->addPendingMessage(pendingMessage);
e->torrentMan->addUploadedSize(message->getLength());
e->torrentMan->addDeltaUpload(message->getLength());
}
break;
case PeerMessage::CANCEL:
deletePendingMessage(message);
sendMessageQueue->deletePendingPieceMessage(message);
break;
case PeerMessage::PIECE: {
RequestSlot slot = requestSlotMan->getCorrespoindingRequestSlot(message);
RequestSlot slot = sendMessageQueue->getCorrespoindingRequestSlot(message);
peer->addPeerUpload(message->getBlockLength());
if(!Piece::isNull(piece) && !RequestSlot::isNull(slot)) {
long long int offset =
@ -229,7 +263,7 @@ void PeerInteractionCommand::receiveMessage() {
message->getBlockLength(),
offset);
piece.completeBlock(slot.getBlockIndex());
requestSlotMan->deleteRequestSlot(slot);
sendMessageQueue->deleteRequestSlot(slot);
e->torrentMan->updatePiece(piece);
e->logger->debug("CUID#%d - setting piece bit index=%d", cuid,
slot.getBlockIndex());
@ -252,27 +286,6 @@ void PeerInteractionCommand::receiveMessage() {
}
}
void PeerInteractionCommand::deletePendingMessage(PeerMessage* cancelMessage) {
for(PendingMessages::iterator itr = pendingMessages.begin();
itr != pendingMessages.end();) {
PendingMessage& pendingMessage = *itr;
if(pendingMessage.getPeerMessageId() == PeerMessage::PIECE &&
pendingMessage.getIndex() == cancelMessage->getIndex() &&
pendingMessage.getBegin() == cancelMessage->getBegin() &&
pendingMessage.getLength() == cancelMessage->getLength() &&
!pendingMessage.isInProgress()) {
e->logger->debug("CUID#%d - deleting pending piece message because cancel message received. index=%d, begin=%d, length=%d",
cuid,
pendingMessage.getIndex(),
pendingMessage.getBegin(),
pendingMessage.getLength());
itr = pendingMessages.erase(itr);
} else {
itr++;
}
}
}
void PeerInteractionCommand::onGotNewPiece() {
e->logger->info(MSG_GOT_NEW_PIECE, cuid, piece.getIndex());
e->torrentMan->completePiece(piece);
@ -305,15 +318,16 @@ bool PeerInteractionCommand::prepareForRetry(int wait) {
}
Piece PeerInteractionCommand::getNewPieceAndSendInterest() {
sendMessageQueue->cancelAllRequest();
Piece piece = e->torrentMan->getMissingPiece(peer);
if(Piece::isNull(piece)) {
e->logger->debug("CUID#%d - try to send not-interested", cuid);
PendingMessage pendingMessage(PeerMessage::NOT_INTERESTED, peerConnection);
pendingMessages.push_back(pendingMessage);
sendMessageQueue->addPendingMessage(pendingMessage);
} else {
e->logger->debug("CUID#%d - try to send interested", cuid);
PendingMessage pendingMessage(PeerMessage::INTERESTED, peerConnection);
pendingMessages.push_back(pendingMessage);
sendMessageQueue->addPendingMessage(pendingMessage);
}
return piece;
}
@ -323,8 +337,7 @@ void PeerInteractionCommand::sendInterest() {
// retrive new piece from TorrentMan
piece = getNewPieceAndSendInterest();
} else if(peer->peerChoking) {
// TODO separate method is better
requestSlotMan->deleteAllRequestSlot(piece);
sendMessageQueue->cancelAllRequest(piece);
e->torrentMan->cancelPiece(piece);
piece = Piece::nullPiece;
} else if(piece.pieceComplete()) {
@ -334,35 +347,26 @@ void PeerInteractionCommand::sendInterest() {
void PeerInteractionCommand::createRequestPendingMessage(int blockIndex) {
PendingMessage pendingMessage =
PendingMessage::createRequestMessage(piece.getIndex(),
blockIndex*piece.getBlockLength(),
piece.getBlockLength(blockIndex),
peerConnection);
pendingMessages.push_back(pendingMessage);
RequestSlot requestSlot(piece.getIndex(),
blockIndex*piece.getBlockLength(),
piece.getBlockLength(blockIndex),
blockIndex);
requestSlotMan->addRequestSlot(requestSlot);
PendingMessage::createRequestMessage(piece, blockIndex, peerConnection);
sendMessageQueue->addPendingMessage(pendingMessage);
}
void PeerInteractionCommand::sendMessages() {
if(!Piece::isNull(piece) && !peer->peerChoking) {
if(e->torrentMan->isEndGame()) {
BlockIndexes missingBlockIndexes = piece.getAllMissingBlockIndexes();
if(requestSlotMan->isEmpty()) {
if(sendMessageQueue->countRequestSlot() == 0) {
random_shuffle(missingBlockIndexes.begin(), missingBlockIndexes.end());
int count = 0;
for(PieceIndexes::const_iterator itr = missingBlockIndexes.begin();
itr != missingBlockIndexes.end(); itr++) {
itr != missingBlockIndexes.end() && count < 6; itr++, count++) {
createRequestPendingMessage(*itr);
}
}
} else {
for(int i = requestSlotMan->countRequestSlot(); i <= 5; i++) {
for(int i = sendMessageQueue->countRequestSlot(); i < 6; i++) {
int blockIndex = piece.getMissingUnusedBlockIndex();
if(blockIndex == -1) {
if(requestSlotMan->isEmpty()) {
piece = Piece::nullPiece;
}
break;
}
e->torrentMan->updatePiece(piece);
@ -371,18 +375,11 @@ void PeerInteractionCommand::sendMessages() {
}
}
for(PendingMessages::iterator itr = pendingMessages.begin(); itr != pendingMessages.end();) {
if(itr->processMessage()) {
itr = pendingMessages.erase(itr);
} else {
//setWriteCheckSocket(socket);
break;
}
}
sendMessageQueue->send();
}
void PeerInteractionCommand::onAbort(Exception* ex) {
requestSlotMan->deleteAllRequestSlot(piece);
sendMessageQueue->cancelAllRequest(piece);
e->torrentMan->cancelPiece(piece);
PeerAbstractCommand::onAbort(ex);
}
@ -394,7 +391,7 @@ void PeerInteractionCommand::keepAlive() {
struct timeval now;
gettimeofday(&now, NULL);
if(Util::difftv(now, keepAliveCheckPoint) >= (long long int)120*1000000) {
if(pendingMessages.empty()) {
if(sendMessageQueue->countPendingMessage() == 0) {
peerConnection->sendKeepAlive();
}
keepAliveCheckPoint = now;
@ -405,20 +402,22 @@ void PeerInteractionCommand::keepAlive() {
void PeerInteractionCommand::beforeSocketCheck() {
if(sequence == WIRED) {
e->torrentMan->unadvertisePiece(cuid);
detectMessageFlooding();
checkLongTimePeerChoking();
PieceIndexes indexes = e->torrentMan->getAdvertisedPieceIndexes(cuid);
if(indexes.size() >= 20) {
PendingMessage pendingMessage(PeerMessage::BITFIELD, peerConnection);
pendingMessages.push_back(pendingMessage);
sendMessageQueue->addPendingMessage(pendingMessage);
} else {
if(pendingMessages.size() == 0) {
if(sendMessageQueue->countPendingMessage() == 0) {
for(PieceIndexes::iterator itr = indexes.begin(); itr != indexes.end(); itr++) {
peerConnection->sendHave(*itr);
}
} else {
for(PieceIndexes::iterator itr = indexes.begin(); itr != indexes.end(); itr++) {
PendingMessage pendingMessage = PendingMessage::createHaveMessage(*itr, peerConnection);
pendingMessages.push_back(pendingMessage);
sendMessageQueue->addPendingMessage(pendingMessage);
}
}
}

View File

@ -24,8 +24,7 @@
#include "PeerAbstractCommand.h"
#include "PeerConnection.h"
#include "PendingMessage.h"
#include "RequestSlotMan.h"
#include "SendMessageQueue.h"
using namespace std;
@ -33,12 +32,15 @@ class PeerInteractionCommand : public PeerAbstractCommand {
private:
int sequence;
PeerConnection* peerConnection;
RequestSlotMan* requestSlotMan;
PendingMessages pendingMessages;
SendMessageQueue* sendMessageQueue;
Piece piece;
struct timeval keepAliveCheckPoint;
struct timeval chokeCheckPoint;
struct timeval freqCheckPoint;
int chokeUnchokeCount;
int haveCount;
void receiveMessage();
void detectMessageFlooding();
void checkLongTimePeerChoking();
void syncPiece();
void detectTimeoutAndDuplicateBlock();
@ -46,10 +48,6 @@ private:
void sendInterest();
void sendMessages();
void createRequestPendingMessage(int blockIndex);
void deletePendingMessage(PeerMessage* cancelMessage);
const RequestSlot& getRequestSlot(int index, int begin, int length) const;
bool deleteRequestSlot(const RequestSlot& slot);
void deleteAllRequestSlot();
bool checkPieceHash(const Piece& piece);
void erasePieceOnDisk(const Piece& piece);
void keepAlive();

View File

@ -59,9 +59,9 @@ bool PeerListenCommand::execute() {
Socket* peerSocket = NULL;
try {
peerSocket = socket->acceptConnection();
pair<string, int> peerInfo;
peerSocket->getPeerInfo(peerInfo);
if(e->torrentMan->connections < MAX_PEERS) {
pair<string, int> peerInfo;
peerSocket->getPeerInfo(peerInfo);
Peer* peer = new Peer(peerInfo.first, peerInfo.second,
e->torrentMan->pieceLength,
e->torrentMan->totalSize);

View File

@ -72,7 +72,9 @@ bool PendingMessage::processMessage() {
}
break;
case PeerMessage::REQUEST:
peerConnection->sendRequest(index, begin, length);
if(!peerConnection->getPeer()->peerChoking) {
peerConnection->sendRequest(index, begin, length);
}
break;
case PeerMessage::CANCEL:
peerConnection->sendCancel(index, begin, length);
@ -83,11 +85,14 @@ bool PendingMessage::processMessage() {
return retval;
}
PendingMessage PendingMessage::createRequestMessage(int index, int begin, int length, PeerConnection* peerConnection) {
PendingMessage PendingMessage::createRequestMessage(const Piece& piece,
int blockIndex,
PeerConnection* peerConnection) {
PendingMessage pendingMessage(PeerMessage::REQUEST, peerConnection);
pendingMessage.setIndex(index);
pendingMessage.setBegin(begin);
pendingMessage.setLength(length);
pendingMessage.setIndex(piece.getIndex());
pendingMessage.setBegin(blockIndex*piece.getBlockLength());
pendingMessage.setLength(piece.getBlockLength(blockIndex));
pendingMessage.setBlockIndex(blockIndex);
return pendingMessage;
}

View File

@ -32,6 +32,7 @@ private:
int index;
int begin;
int length;
int blockIndex;
long long int pieceDataOffset;
int leftPieceDataLength;
bool inProgress;
@ -58,9 +59,11 @@ public:
int getBegin() const { return begin; }
void setLength(int length) { this->length = length; }
int getLength() const { return length; }
void setBlockIndex(int blockIndex) { this->blockIndex = blockIndex; }
int getBlockIndex() const { return blockIndex; }
bool processMessage();
static PendingMessage createRequestMessage(int index, int begin, int length, PeerConnection* peerConnection);
static PendingMessage createRequestMessage(const Piece& piece, int blockIndex, PeerConnection* peerConnection);
static PendingMessage createCancelMessage(int index, int begin, int length, PeerConnection* peerConnection);
static PendingMessage createPieceMessage(int index, int begin, int length, int pieceLength, PeerConnection* peerConnection);
static PendingMessage createHaveMessage(int index, PeerConnection* peerConnectioin);

View File

@ -48,7 +48,7 @@ void RequestSlotMan::deleteAllRequestSlot(Piece& piece) {
requestSlots.clear();
}
void RequestSlotMan::deleteTimedoutRequestSlot(Piece& piece) {
void RequestSlotMan::deleteTimeoutRequestSlot(Piece& piece) {
for(RequestSlots::iterator itr = requestSlots.begin();
itr != requestSlots.end();) {
if(itr->isTimeout(timeout)) {

View File

@ -58,7 +58,7 @@ public:
void deleteRequestSlot(const RequestSlot& requestSlot);
void deleteAllRequestSlot(Piece& piece);
void deleteTimedoutRequestSlot(Piece& piece);
void deleteTimeoutRequestSlot(Piece& piece);
void deleteCompletedRequestSlot(const Piece& piece);
RequestSlot getCorrespoindingRequestSlot(const PeerMessage* pieceMessage) const;

122
src/SendMessageQueue.cc Normal file
View File

@ -0,0 +1,122 @@
/* <!-- copyright */
/*
* aria2 - a simple utility for downloading files faster
*
* Copyright (C) 2006 Tatsuhiro Tsujikawa
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 2 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*/
/* copyright --> */
#include "SendMessageQueue.h"
SendMessageQueue::SendMessageQueue(int cuid, PeerConnection* peerConnection,
TorrentMan* torrentMan,
const Logger* logger)
:cuid(cuid), logger(logger) {
requestSlotMan = new RequestSlotMan(cuid, &pendingMessages, peerConnection,
torrentMan, logger);
}
SendMessageQueue::~SendMessageQueue() {
delete requestSlotMan;
}
void SendMessageQueue::send() {
for(PendingMessages::iterator itr = pendingMessages.begin();
itr != pendingMessages.end();) {
if(itr->processMessage()) {
itr = pendingMessages.erase(itr);
} else {
break;
}
}
}
void SendMessageQueue::addPendingMessage(const PendingMessage& pendingMessage) {
pendingMessages.push_back(pendingMessage);
if(pendingMessage.getPeerMessageId() == PeerMessage::REQUEST) {
RequestSlot requestSlot(pendingMessage.getIndex(),
pendingMessage.getBegin(),
pendingMessage.getLength(),
pendingMessage.getBlockIndex());
requestSlotMan->addRequestSlot(requestSlot);
}
}
void SendMessageQueue::deletePendingPieceMessage(const PeerMessage* cancelMessage) {
for(PendingMessages::iterator itr = pendingMessages.begin();
itr != pendingMessages.end();) {
PendingMessage& pendingMessage = *itr;
if(pendingMessage.getPeerMessageId() == PeerMessage::PIECE &&
pendingMessage.getIndex() == cancelMessage->getIndex() &&
pendingMessage.getBegin() == cancelMessage->getBegin() &&
pendingMessage.getLength() == cancelMessage->getLength() &&
!pendingMessage.isInProgress()) {
logger->debug("CUID#%d - deleting pending piece message because cancel message received. index=%d, begin=%d, length=%d",
cuid,
pendingMessage.getIndex(),
pendingMessage.getBegin(),
pendingMessage.getLength());
itr = pendingMessages.erase(itr);
} else {
itr++;
}
}
}
void SendMessageQueue::deletePendingRequestMessage() {
for(PendingMessages::iterator itr = pendingMessages.begin();
itr != pendingMessages.end();) {
PendingMessage& pendingMessage = *itr;
if(pendingMessage.getPeerMessageId() == PeerMessage::REQUEST) {
itr = pendingMessages.erase(itr);
} else {
itr++;
}
}
}
void SendMessageQueue::deleteRequestSlot(const RequestSlot& requestSlot) {
requestSlotMan->deleteRequestSlot(requestSlot);
}
void SendMessageQueue::deleteTimeoutRequestSlot(Piece& piece) {
requestSlotMan->deleteTimeoutRequestSlot(piece);
}
void SendMessageQueue::deleteCompletedRequestSlot(const Piece& piece) {
requestSlotMan->deleteCompletedRequestSlot(piece);
}
RequestSlot SendMessageQueue::getCorrespoindingRequestSlot(const PeerMessage* pieceMessage) const {
return requestSlotMan->getCorrespoindingRequestSlot(pieceMessage);
}
void SendMessageQueue::cancelAllRequest() {
cancelAllRequest(Piece::nullPiece);
}
void SendMessageQueue::cancelAllRequest(Piece& piece) {
deletePendingRequestMessage();
requestSlotMan->deleteAllRequestSlot(Piece::nullPiece);
}
int SendMessageQueue::countPendingMessage() const {
return pendingMessages.size();
}
int SendMessageQueue::countRequestSlot() const {
return requestSlotMan->countRequestSlot();
}

57
src/SendMessageQueue.h Normal file
View File

@ -0,0 +1,57 @@
/* <!-- copyright */
/*
* aria2 - a simple utility for downloading files faster
*
* Copyright (C) 2006 Tatsuhiro Tsujikawa
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 2 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*/
/* copyright --> */
#ifndef _D_SEND_MESSAGE_QUEUE_H_
#define _D_SEND_MESSAGE_QUEUE_H_
#include "common.h"
#include "RequestSlotMan.h"
class SendMessageQueue {
private:
int cuid;
RequestSlotMan* requestSlotMan;
PendingMessages pendingMessages;
const Logger* logger;
public:
SendMessageQueue(int cuid, PeerConnection* peerConnection,
TorrentMan* torrentMan, const Logger* logger);
~SendMessageQueue();
void send();
void addPendingMessage(const PendingMessage& pendingMessage);
void deletePendingPieceMessage(const PeerMessage* cancelMessage);
void deletePendingRequestMessage();
void deleteRequestSlot(const RequestSlot& requestSlot);
void deleteTimeoutRequestSlot(Piece& piece);
void deleteCompletedRequestSlot(const Piece& piece);
RequestSlot getCorrespoindingRequestSlot(const PeerMessage* pieceMessage) const;
void cancelAllRequest();
void cancelAllRequest(Piece& piece);
int countPendingMessage() const;
int countRequestSlot() const;
};
#endif // _D_SEND_MESSAGE_QUEUE_H_

View File

@ -36,6 +36,7 @@
TorrentMan::TorrentMan():bitfield(NULL),
peerEntryIdCounter(0), cuidCounter(0),
downloadedSize(0), uploadedSize(0),
preDownloadedSize(0), preUploadedSize(0),
deltaDownload(0), deltaUpload(0),
storeDir("."),
multiFileTopDir(NULL),
@ -74,12 +75,7 @@ bool TorrentMan::addPeer(Peer* peer, bool duplicate) {
}
}
} else {
if(peers.size() >= MAX_PEER_LIST_SIZE) {
deleteOldErrorPeers(100);
if(peers.size() >= MAX_PEER_LIST_SIZE) {
return false;
}
}
deleteOldErrorPeers(MAX_PEER_LIST_SIZE);
for(Peers::iterator itr = peers.begin(); itr != peers.end(); itr++) {
Peer* p = *itr;
if(p->ipaddr == peer->ipaddr && p->port == peer->port) {
@ -455,6 +451,8 @@ void TorrentMan::read(FILE* file) {
if(fread(&uploadedSize, sizeof(uploadedSize), 1, file) < 1) {
throw new DlAbortEx(strerror(errno));
}
preDownloadedSize = downloadedSize;
preUploadedSize = uploadedSize;
delete [] savedBitfield;
} catch(Exception* ex) {
delete [] savedBitfield;

View File

@ -74,6 +74,8 @@ private:
int cuidCounter;
long long int downloadedSize;
long long int uploadedSize;
long long int preDownloadedSize;
long long int preUploadedSize;
int deltaDownload;
int deltaUpload;
int fileMode;
@ -186,6 +188,13 @@ public:
long long int getUploadedSize() const { return uploadedSize; }
void setUploadedSize(long long int size) { uploadedSize = size; }
long long int getSessionDownloadedSize() const {
return downloadedSize-preDownloadedSize;
}
long long int getSessionUploadedSize() const {
return uploadedSize-preUploadedSize;
}
void setFileMode(int mode) {
fileMode = mode;
}

View File

@ -57,8 +57,8 @@ bool TrackerInitCommand::execute() {
"info_hash="+Util::urlencode(e->torrentMan->getInfoHash(), 20)+"&"+
"peer_id="+e->torrentMan->peerId+"&"+
"port="+Util::itos(e->torrentMan->getPort())+"&"+
"uploaded="+Util::llitos(e->torrentMan->getUploadedSize())+"&"+
"downloaded="+Util::llitos(e->torrentMan->getDownloadedSize())+"&"+
"uploaded="+Util::llitos(e->torrentMan->getSessionUploadedSize())+"&"+
"downloaded="+Util::llitos(e->torrentMan->getSessionDownloadedSize())+"&"+
"left="+(e->torrentMan->totalSize-e->torrentMan->getDownloadedSize() <= 0
? "0" : Util::llitos(e->torrentMan->totalSize-e->torrentMan->getDownloadedSize()))+"&"+
"compact=1";

View File

@ -62,6 +62,9 @@ bool TrackerUpdateCommand::execute() {
e->torrentMan->minInterval = minInterval->toInt();
e->logger->debug("CUID#%d - min interval:%d", cuid, e->torrentMan->minInterval);
}
if(e->torrentMan->minInterval > e->torrentMan->interval) {
e->torrentMan->minInterval = e->torrentMan->interval;
}
Data* complete = (Data*)response->get("complete");
if(complete != NULL) {
e->torrentMan->complete = complete->toInt();

View File

@ -242,6 +242,7 @@ int main(int argc, char* argv[]) {
Option* op = new Option();
op->put(PREF_RETRY_WAIT, "5");
op->put(PREF_TIMEOUT, "60");
op->put(PREF_PEER_CONNECTION_TIMEOUT, "30");
op->put(PREF_MIN_SEGMENT_SIZE, "1048576");// 1M
op->put(PREF_MAX_TRIES, "5");
op->put(PREF_HTTP_PROXY_METHOD, V_TUNNEL);

View File

@ -83,5 +83,10 @@
// values: true | false
#define PREF_HTTP_PROXY_AUTH_ENABLED "http_proxy_auth_enabled"
/**
* BitTorrent related preferences
*/
// values: 1*digit
#define PREF_PEER_CONNECTION_TIMEOUT "peer_connection_timeout"
#endif // _D_PREFS_H_