* src/PeerInteractionCommand.cc (checkInactiveConnection): New

function
	(detectMessageFlooding): Updated.
	(checkLongTimePeerChoking): Updated.
	(getNewPieceAndSendInterest): Added debug log.
	* src/PeerInteractionCommand.h: New function 
checkInactiveConnection()
	
	* src/TorrentMan.cc (deleteOldErrorPeers): Updated.
	(getPeer): Updated.
	* src/TorrentMan.h: Added MAX_PEER_ERROR.
	
	* src/PeerAbstractCommand.cc (onAbort): Increment error counter.

	* src/PeerListenCommand.cc (execute): Close connection if 
incoming peer
	is localhost.

	* src/main.cc (main): Updated PREF_PEER_CONNECTION_TIMEOUT to 
60.

	* src/PendingMessage.cc (processMessage): Not to send piece 
message
	if peer is not interested in the pieces localhost has.

	* src/Peer.cc (shouldChoke): Updated.

	* src/SendMessageQueue.cc (cancelAllRequest): Fixed.
pull/1/head
Tatsuhiro Tsujikawa 2006-03-31 13:58:22 +00:00
parent e1f24adc40
commit e6e0177560
14 changed files with 74 additions and 12 deletions

View File

@ -1,3 +1,29 @@
2006-03-31 Tatsuhiro Tsujikawa <tujikawa at rednoah dot com>
* src/PeerInteractionCommand.cc (checkInactiveConnection): New function
(detectMessageFlooding): Updated.
(checkLongTimePeerChoking): Updated.
(getNewPieceAndSendInterest): Added debug log.
* src/PeerInteractionCommand.h: New function checkInactiveConnection()
* src/TorrentMan.cc (deleteOldErrorPeers): Updated.
(getPeer): Updated.
* src/TorrentMan.h: Added MAX_PEER_ERROR.
* src/PeerAbstractCommand.cc (onAbort): Increment error counter.
* src/PeerListenCommand.cc (execute): Close connection if incoming peer
is localhost.
* src/main.cc (main): Updated PREF_PEER_CONNECTION_TIMEOUT to 60.
* src/PendingMessage.cc (processMessage): Not to send piece message
if peer is not interested in the pieces localhost has.
* src/Peer.cc (shouldChoke): Updated.
* src/SendMessageQueue.cc (cancelAllRequest): Fixed.
2006-03-28 Tatsuhiro Tsujikawa <tujikawa at rednoah dot com> 2006-03-28 Tatsuhiro Tsujikawa <tujikawa at rednoah dot com>
Added new class SendMessageQueue that includes PendingMessages and Added new class SendMessageQueue that includes PendingMessages and

5
TODO
View File

@ -11,6 +11,5 @@
* Add port range command-line option * Add port range command-line option
* Add max peers command-line option * Add max peers command-line option
* Distinguish seeder from leecher * Distinguish seeder from leecher
* time out for connecting peers * file selection in multi-file mode
* ignore incoming connection from localhost. * try to use ftruncate to allocate file.
* do not connect to localhost

View File

@ -87,6 +87,9 @@ public:
void clearAllBit(); void clearAllBit();
void setAllBit(); void setAllBit();
void addFilter(long long int offset, long long int length);
void clearFilter();
}; };
#endif // _D_BITFIELD_MAN_H_ #endif // _D_BITFIELD_MAN_H_

View File

@ -40,6 +40,10 @@ bool Peer::shouldChoke() const {
if(peerDownload <= pieceLength*10) { if(peerDownload <= pieceLength*10) {
return false; return false;
} }
// we are always optimistic.
if(amInterested && peerInterested) {
return false;
}
if(amChocking) { if(amChocking) {
return !(peerDownload+pieceLength*5 < peerUpload); return !(peerDownload+pieceLength*5 < peerUpload);
} else { } else {
@ -50,3 +54,7 @@ bool Peer::shouldChoke() const {
bool Peer::hasPiece(int index) const { bool Peer::hasPiece(int index) const {
return bitfield->isBitSet(index); return bitfield->isBitSet(index);
} }
bool Peer::isSeeder() const {
return bitfield->isAllBitSet();
}

View File

@ -111,6 +111,8 @@ public:
bool hasPiece(int index) const; bool hasPiece(int index) const;
bool isSeeder() const;
static Peer* nullPeer; static Peer* nullPeer;
}; };

View File

@ -125,7 +125,11 @@ bool PeerAbstractCommand::prepareForRetry(int wait) {
} }
void PeerAbstractCommand::onAbort(Exception* ex) { void PeerAbstractCommand::onAbort(Exception* ex) {
peer->error = 1; if(peer->isSeeder()) {
peer->error++;
} else {
peer->error += MAX_PEER_ERROR;
}
peer->tryCount = 0; peer->tryCount = 0;
peer->cuid = 0; peer->cuid = 0;
peer->amChocking = true; peer->amChocking = true;

View File

@ -107,6 +107,7 @@ bool PeerInteractionCommand::executeInternal() {
case WIRED: case WIRED:
detectMessageFlooding(); detectMessageFlooding();
checkLongTimePeerChoking(); checkLongTimePeerChoking();
checkInactiveConnection();
syncPiece(); syncPiece();
decideChoking(); decideChoking();
for(int i = 0; i < 10; i++) { for(int i = 0; i < 10; i++) {
@ -128,6 +129,16 @@ bool PeerInteractionCommand::executeInternal() {
return false; return false;
} }
void PeerInteractionCommand::checkInactiveConnection() {
if((!peer->amInterested && !peer->peerInterested &&
e->torrentMan->connections >= MAX_PEER_LIST_SIZE) ||
(!peer->amInterested && e->torrentMan->connections >= MAX_PEER_LIST_SIZE &&
e->torrentMan->isEndGame())) {
throw new DlAbortEx("marked as inactive connection.");
}
}
void PeerInteractionCommand::detectMessageFlooding() { void PeerInteractionCommand::detectMessageFlooding() {
struct timeval now; struct timeval now;
gettimeofday(&now, NULL); gettimeofday(&now, NULL);
@ -135,7 +146,7 @@ void PeerInteractionCommand::detectMessageFlooding() {
freqCheckPoint = now; freqCheckPoint = now;
} else { } else {
if(Util::difftv(now, freqCheckPoint) >= 5*1000000) { if(Util::difftv(now, freqCheckPoint) >= 5*1000000) {
if(chokeUnchokeCount*1.0/(Util::difftv(now, freqCheckPoint)/1000000) >= 0.3 if(chokeUnchokeCount*1.0/(Util::difftv(now, freqCheckPoint)/1000000) >= 0.4
|| haveCount*1.0/(Util::difftv(now, freqCheckPoint)/1000000) >= 20.0) { || haveCount*1.0/(Util::difftv(now, freqCheckPoint)/1000000) >= 20.0) {
throw new DlAbortEx("flooding detected."); throw new DlAbortEx("flooding detected.");
} else { } else {
@ -159,7 +170,7 @@ void PeerInteractionCommand::checkLongTimePeerChoking() {
} }
} else { } else {
if(peer->amInterested && peer->peerChoking) { if(peer->amInterested && peer->peerChoking) {
if(Util::difftv(now, chokeCheckPoint) >= 5*60*1000000) { if(Util::difftv(now, chokeCheckPoint) >= 3*60*1000000) {
throw new DlAbortEx("too long choking"); throw new DlAbortEx("too long choking");
} }
} else { } else {
@ -325,6 +336,7 @@ Piece PeerInteractionCommand::getNewPieceAndSendInterest() {
PendingMessage pendingMessage(PeerMessage::NOT_INTERESTED, peerConnection); PendingMessage pendingMessage(PeerMessage::NOT_INTERESTED, peerConnection);
sendMessageQueue->addPendingMessage(pendingMessage); sendMessageQueue->addPendingMessage(pendingMessage);
} else { } else {
e->logger->debug("CUID#%d - starting download for piece #%d", cuid, piece.getIndex());
e->logger->debug("CUID#%d - try to send interested", cuid); e->logger->debug("CUID#%d - try to send interested", cuid);
PendingMessage pendingMessage(PeerMessage::INTERESTED, peerConnection); PendingMessage pendingMessage(PeerMessage::INTERESTED, peerConnection);
sendMessageQueue->addPendingMessage(pendingMessage); sendMessageQueue->addPendingMessage(pendingMessage);
@ -404,6 +416,7 @@ void PeerInteractionCommand::beforeSocketCheck() {
e->torrentMan->unadvertisePiece(cuid); e->torrentMan->unadvertisePiece(cuid);
detectMessageFlooding(); detectMessageFlooding();
checkLongTimePeerChoking(); checkLongTimePeerChoking();
checkInactiveConnection();
PieceIndexes indexes = e->torrentMan->getAdvertisedPieceIndexes(cuid); PieceIndexes indexes = e->torrentMan->getAdvertisedPieceIndexes(cuid);
if(indexes.size() >= 20) { if(indexes.size() >= 20) {

View File

@ -42,6 +42,7 @@ private:
void receiveMessage(); void receiveMessage();
void detectMessageFlooding(); void detectMessageFlooding();
void checkLongTimePeerChoking(); void checkLongTimePeerChoking();
void checkInactiveConnection();
void syncPiece(); void syncPiece();
void detectTimeoutAndDuplicateBlock(); void detectTimeoutAndDuplicateBlock();
void decideChoking(); void decideChoking();

View File

@ -61,7 +61,10 @@ bool PeerListenCommand::execute() {
peerSocket = socket->acceptConnection(); peerSocket = socket->acceptConnection();
pair<string, int> peerInfo; pair<string, int> peerInfo;
peerSocket->getPeerInfo(peerInfo); peerSocket->getPeerInfo(peerInfo);
if(e->torrentMan->connections < MAX_PEERS) { pair<string, int> localInfo;
peerSocket->getAddrInfo(localInfo);
if(peerInfo.first != localInfo.first &&
e->torrentMan->connections < MAX_PEERS) {
Peer* peer = new Peer(peerInfo.first, peerInfo.second, Peer* peer = new Peer(peerInfo.first, peerInfo.second,
e->torrentMan->pieceLength, e->torrentMan->pieceLength,
e->torrentMan->totalSize); e->torrentMan->totalSize);

View File

@ -57,11 +57,13 @@ bool PendingMessage::processMessage() {
} }
break; break;
case PeerMessage::PIECE: case PeerMessage::PIECE:
if(!peerConnection->getPeer()->amChocking) { if((!peerConnection->getPeer()->amChocking &&
peerConnection->getPeer()->peerInterested) || inProgress) {
if(!inProgress) { if(!inProgress) {
peerConnection->sendPieceHeader(index, begin, length); peerConnection->sendPieceHeader(index, begin, length);
peerConnection->getPeer()->addPeerDownload(length); peerConnection->getPeer()->addPeerDownload(length);
} }
inProgress = false;
int writtenLength = peerConnection->sendPieceData(pieceDataOffset, leftPieceDataLength); int writtenLength = peerConnection->sendPieceData(pieceDataOffset, leftPieceDataLength);
if(writtenLength != leftPieceDataLength) { if(writtenLength != leftPieceDataLength) {
inProgress = true; inProgress = true;

View File

@ -110,7 +110,7 @@ void SendMessageQueue::cancelAllRequest() {
void SendMessageQueue::cancelAllRequest(Piece& piece) { void SendMessageQueue::cancelAllRequest(Piece& piece) {
deletePendingRequestMessage(); deletePendingRequestMessage();
requestSlotMan->deleteAllRequestSlot(Piece::nullPiece); requestSlotMan->deleteAllRequestSlot(piece);
} }
int SendMessageQueue::countPendingMessage() const { int SendMessageQueue::countPendingMessage() const {

View File

@ -109,7 +109,7 @@ int TorrentMan::deleteOldErrorPeers(int maxNum) {
int counter = 0; int counter = 0;
for(Peers::iterator itr = peers.begin(); itr != peers.end();) { for(Peers::iterator itr = peers.begin(); itr != peers.end();) {
Peer* p = *itr; Peer* p = *itr;
if(p->error != 0 && p->cuid == 0) { if(p->error >= MAX_PEER_ERROR && p->cuid == 0) {
delete p; delete p;
itr = peers.erase(itr); itr = peers.erase(itr);
counter++; counter++;
@ -126,7 +126,7 @@ int TorrentMan::deleteOldErrorPeers(int maxNum) {
Peer* TorrentMan::getPeer() const { Peer* TorrentMan::getPeer() const {
for(Peers::const_iterator itr = peers.begin(); itr != peers.end(); itr++) { for(Peers::const_iterator itr = peers.begin(); itr != peers.end(); itr++) {
Peer* p = *itr; Peer* p = *itr;
if(p->cuid == 0 && p->error == 0) { if(p->cuid == 0 && p->error < MAX_PEER_ERROR) {
return p; return p;
} }
} }

View File

@ -49,6 +49,7 @@ using namespace std;
#define MAX_PEER_LIST_SIZE 250 #define MAX_PEER_LIST_SIZE 250
#define END_GAME_PIECE_NUM 20 #define END_GAME_PIECE_NUM 20
#define MAX_PEER_ERROR 5
class FileEntry { class FileEntry {
public: public:

View File

@ -242,7 +242,7 @@ int main(int argc, char* argv[]) {
Option* op = new Option(); Option* op = new Option();
op->put(PREF_RETRY_WAIT, "5"); op->put(PREF_RETRY_WAIT, "5");
op->put(PREF_TIMEOUT, "60"); op->put(PREF_TIMEOUT, "60");
op->put(PREF_PEER_CONNECTION_TIMEOUT, "30"); op->put(PREF_PEER_CONNECTION_TIMEOUT, "60");
op->put(PREF_MIN_SEGMENT_SIZE, "1048576");// 1M op->put(PREF_MIN_SEGMENT_SIZE, "1048576");// 1M
op->put(PREF_MAX_TRIES, "5"); op->put(PREF_MAX_TRIES, "5");
op->put(PREF_HTTP_PROXY_METHOD, V_TUNNEL); op->put(PREF_HTTP_PROXY_METHOD, V_TUNNEL);