2006-05-20 Tatsuhiro Tsujikawa <tujikawa at rednoah dot com>

To add the ability to download several pieces in mix in a 
command and
	increase the number of request slots according to request-cancel
	latency:
	
	* src/PeerInteractionCommand.cc
	(executeInternal): The number of messages to be received at a 
time
	is increased from 10 to 50.
	* src/LogFactory
	(getInstance): Added NullLogger.
	* src/NullLogger.h: New class.
	* src/BitfieldMan.h
	(getMissingIndex): New function.
	* src/BitfieldMan.cc
	(getMissingIndex): New function.
	* src/TorrentMan.h
	(checkOutPiece): New function.
	(getMissingPieceIndex): New function.
	(getMissingFastPieceIndex): New function.
	(getMissingFastPiece): New function.
	* src/TorrentMan.cc
	(updatePiece): Rewritten using STL.
	(syncPiece): Rewritten using STL.
	(getMissingPiece): Rewritten using getMissingPieceIndex() and
	checkOutPiece().
	(getMissingPieceIndex): New function.
	(getMissingFastPieceIndex): New function.
	(getMissingFastPiece): New function.
	(checkOutPiece): New function.
	* src/PeerInteraction.h
	(Pieces): New type definition.
	(piece): Removed.
	(pieces): New variable.
	(getNewPieceAndSendInterest): Added the "piece" argument.
	(abortPiece): Added the "piece" argument.
	(abortAllPieces): New function.
	(isInRequestSlot): New function.
	(hasDownloadPiece): Added the "index" argument.
	(setDownloadPiece): Removed.
	(getDownloadPiece): Added the "index" argument.
	(updatePiece): New function.
	(createRequestMessage): Added the "index" argument.
	* src/PeerInteraction.cc
	(onChoked): Rewritten.
	(abortPiece): Rewirtten.
	(abortAllPieces): New function.
	(deleteTimeoutRequestSlot): Rewritten.
	Clarified code a little bit.
	(deleteCompletedRequestSlot): Rewritten.
	(isInRequestSlot): New function.
	(syncPiece): Rewritten.
	(updatePiece): New function.
	(getNewPieceAndSendInterest): Rewritten.
	(addRequests):  Rewritten.
	(getDownloadPiece): Rewritten.
	(hasDownloadPiece): Rewritten.
	(createRequestMessage): Added the "index" argument.
	* src/common.h
	(BITFIELD_LEN_FROM_PIECES): Enclosed the variable in 
parentheses.
	(DIV_FLOOR): New definition.
	* src/PieceMessage.cc
	(receivedAction): Update request-piece latency here.
	* src/RequestSlot.h
	(getLatencyInMillis): New function.
	* src/RequestSlot.cc
	(isTimeout): Rewritten using getLatencyInMillis.
	(getLatencyInMillis): New function.
	* src/Piece.h
	(operator==): New function.
	(getMissingBlockIndex): New function.
	* src/Piece.cc
	(operator==): New function.
	(getMissingBlockIndex): New function.
	* src/Peer.h
	(DEFAULT_LATENCY): New definition.
	(latency): New variable.
	(updateLatency): New function.
	(getLatency): New function.
	* src/Peer.cc
	(resetStatus): Reset latecy.
	(updateLatency): New function.
pull/1/head
Tatsuhiro Tsujikawa 2006-05-21 16:19:17 +00:00
parent 6497ed63d4
commit 896ca193e0
20 changed files with 408 additions and 114 deletions

View File

@ -1,3 +1,86 @@
2006-05-20 Tatsuhiro Tsujikawa <tujikawa at rednoah dot com>
To add the ability to download several pieces in mix in a command and
increase the number of request slots according to request-cancel
latency:
* src/PeerInteractionCommand.cc
(executeInternal): The number of messages to be received at a time
is increased from 10 to 50.
* src/LogFactory
(getInstance): Added NullLogger.
* src/NullLogger.h: New class.
* src/BitfieldMan.h
(getMissingIndex): New function.
* src/BitfieldMan.cc
(getMissingIndex): New function.
* src/TorrentMan.h
(checkOutPiece): New function.
(getMissingPieceIndex): New function.
(getMissingFastPieceIndex): New function.
(getMissingFastPiece): New function.
* src/TorrentMan.cc
(updatePiece): Rewritten using STL.
(syncPiece): Rewritten using STL.
(getMissingPiece): Rewritten using getMissingPieceIndex() and
checkOutPiece().
(getMissingPieceIndex): New function.
(getMissingFastPieceIndex): New function.
(getMissingFastPiece): New function.
(checkOutPiece): New function.
* src/PeerInteraction.h
(Pieces): New type definition.
(piece): Removed.
(pieces): New variable.
(getNewPieceAndSendInterest): Added the "piece" argument.
(abortPiece): Added the "piece" argument.
(abortAllPieces): New function.
(isInRequestSlot): New function.
(hasDownloadPiece): Added the "index" argument.
(setDownloadPiece): Removed.
(getDownloadPiece): Added the "index" argument.
(updatePiece): New function.
(createRequestMessage): Added the "index" argument.
* src/PeerInteraction.cc
(onChoked): Rewritten.
(abortPiece): Rewirtten.
(abortAllPieces): New function.
(deleteTimeoutRequestSlot): Rewritten.
Clarified code a little bit.
(deleteCompletedRequestSlot): Rewritten.
(isInRequestSlot): New function.
(syncPiece): Rewritten.
(updatePiece): New function.
(getNewPieceAndSendInterest): Rewritten.
(addRequests): Rewritten.
(getDownloadPiece): Rewritten.
(hasDownloadPiece): Rewritten.
(createRequestMessage): Added the "index" argument.
* src/common.h
(BITFIELD_LEN_FROM_PIECES): Enclosed the variable in parentheses.
(DIV_FLOOR): New definition.
* src/PieceMessage.cc
(receivedAction): Update request-piece latency here.
* src/RequestSlot.h
(getLatencyInMillis): New function.
* src/RequestSlot.cc
(isTimeout): Rewritten using getLatencyInMillis.
(getLatencyInMillis): New function.
* src/Piece.h
(operator==): New function.
(getMissingBlockIndex): New function.
* src/Piece.cc
(operator==): New function.
(getMissingBlockIndex): New function.
* src/Peer.h
(DEFAULT_LATENCY): New definition.
(latency): New variable.
(updateLatency): New function.
(getLatency): New function.
* src/Peer.cc
(resetStatus): Reset latecy.
(updateLatency): New function.
2006-05-20 Tatsuhiro Tsujikawa <tujikawa at rednoah dot com> 2006-05-20 Tatsuhiro Tsujikawa <tujikawa at rednoah dot com>
* src/SocketCore.cc * src/SocketCore.cc

View File

@ -192,6 +192,20 @@ int BitfieldMan::getFirstMissingUnusedIndex() const {
return -1; return -1;
} }
int BitfieldMan::getMissingIndex() const {
unsigned char* tempBitfield = new unsigned char[bitfieldLength];
for(int i = 0; i < bitfieldLength; i++) {
tempBitfield[i] = ~bitfield[i];
if(filterEnabled) {
tempBitfield[i] &= filterBitfield[i];
}
}
int max = countSetBit(tempBitfield, bitfieldLength);
int index = getMissingIndexRandomly(tempBitfield, bitfieldLength, max);
delete [] tempBitfield;
return index;
}
BlockIndexes BitfieldMan::getAllMissingIndexes() const { BlockIndexes BitfieldMan::getAllMissingIndexes() const {
BlockIndexes missingIndexes; BlockIndexes missingIndexes;
for(int i = 0; i < bitfieldLength; i++) { for(int i = 0; i < bitfieldLength; i++) {

View File

@ -68,6 +68,10 @@ public:
* affected by filter * affected by filter
*/ */
int getMissingIndex(const unsigned char* bitfield, int len) const; int getMissingIndex(const unsigned char* bitfield, int len) const;
/**
* affected by filter
*/
int getMissingIndex() const;
/** /**
* affected by filter * affected by filter
*/ */

View File

@ -21,20 +21,21 @@
/* copyright --> */ /* copyright --> */
#include "LogFactory.h" #include "LogFactory.h"
#include "SimpleLogger.h" #include "SimpleLogger.h"
#include "NullLogger.h"
string LogFactory::filename; string LogFactory::filename;
Logger* LogFactory::logger = NULL; Logger* LogFactory::logger = NULL;
Logger* LogFactory::getInstance() { Logger* LogFactory::getInstance() {
if(logger == NULL) { if(logger == NULL) {
SimpleLogger* slogger = new SimpleLogger();
if(filename.empty()) { if(filename.empty()) {
slogger->openFile("/dev/null"); logger = new NullLogger();
} else { } else {
SimpleLogger* slogger = new SimpleLogger();
slogger->openFile(filename); slogger->openFile(filename);
}
logger = slogger; logger = slogger;
} }
}
return logger; return logger;
} }

View File

@ -100,7 +100,8 @@ SRCS = Socket.cc Socket.h\
RejectMessage.cc RejectMessage.h\ RejectMessage.cc RejectMessage.h\
AllowedFastMessage.cc AllowedFastMessage.h\ AllowedFastMessage.cc AllowedFastMessage.h\
SuggestPieceMessage.cc SuggestPieceMessage.h\ SuggestPieceMessage.cc SuggestPieceMessage.h\
SimplePeerMessage.cc SimplePeerMessage.h SimplePeerMessage.cc SimplePeerMessage.h\
NullLogger.h
noinst_LIBRARIES = libaria2c.a noinst_LIBRARIES = libaria2c.a
libaria2c_a_SOURCES = $(SRCS) libaria2c_a_SOURCES = $(SRCS)
aria2c_LDADD = libaria2c.a @LIBINTL@ @ALLOCA@ @LIBGNUTLS_LIBS@\ aria2c_LDADD = libaria2c.a @LIBINTL@ @ALLOCA@ @LIBGNUTLS_LIBS@\

View File

@ -359,7 +359,8 @@ SRCS = Socket.cc Socket.h\
RejectMessage.cc RejectMessage.h\ RejectMessage.cc RejectMessage.h\
AllowedFastMessage.cc AllowedFastMessage.h\ AllowedFastMessage.cc AllowedFastMessage.h\
SuggestPieceMessage.cc SuggestPieceMessage.h\ SuggestPieceMessage.cc SuggestPieceMessage.h\
SimplePeerMessage.cc SimplePeerMessage.h SimplePeerMessage.cc SimplePeerMessage.h\
NullLogger.h
noinst_LIBRARIES = libaria2c.a noinst_LIBRARIES = libaria2c.a
libaria2c_a_SOURCES = $(SRCS) libaria2c_a_SOURCES = $(SRCS)

43
src/NullLogger.h Normal file
View File

@ -0,0 +1,43 @@
/* <!-- copyright */
/*
* aria2 - a simple utility for downloading files faster
*
* Copyright (C) 2006 Tatsuhiro Tsujikawa
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 2 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*/
/* copyright --> */
#ifndef _D_NULL_LOGGER_H_
#define _D_NULL_LOGGER_H_
#include "Logger.h"
using namespace std;
class NullLogger : public Logger {
public:
NullLogger() {}
virtual ~NullLogger() {}
virtual void debug(const char* msg, ...) const {}
virtual void debug(const char* msg, Exception* ex, ...) const {}
virtual void info(const char* msg, ...) const {}
virtual void info(const char* msg, Exception* ex, ...) const {}
virtual void warn(const char* msg, ...) const {}
virtual void warn(const char* msg, Exception* ex, ...) const {}
virtual void error(const char* msg, ...) const {}
virtual void error(const char* msg, Exception* ex, ...) const {}
};
#endif // _D_NULL_LOGGER_H_

View File

@ -60,6 +60,7 @@ void Peer::resetStatus() {
chokingRequired = true; chokingRequired = true;
optUnchoking = false; optUnchoking = false;
fastExtensionEnabled = false; fastExtensionEnabled = false;
latency = DEFAULT_LATENCY;
fastSet.clear(); fastSet.clear();
} }
@ -76,3 +77,7 @@ void Peer::addFastSetIndex(int index) {
void Peer::setAllBitfield() { void Peer::setAllBitfield() {
bitfield->setAllBit(); bitfield->setAllBit();
} }
void Peer::updateLatency(int latency) {
this->latency = (this->latency*80+latency*20)/200;
}

View File

@ -30,6 +30,7 @@
using namespace std; using namespace std;
#define PEER_ID_LENGTH 20 #define PEER_ID_LENGTH 20
#define DEFAULT_LATENCY 1000
class Peer { class Peer {
public: public:
@ -57,6 +58,7 @@ private:
long long int totalLength; long long int totalLength;
int deltaUpload; int deltaUpload;
int deltaDownload; int deltaDownload;
int latency;
public: public:
Peer(string ipaddr, int port, int pieceLength, long long int totalLength): Peer(string ipaddr, int port, int pieceLength, long long int totalLength):
entryId(0), ipaddr(ipaddr), port(port), entryId(0), ipaddr(ipaddr), port(port),
@ -68,7 +70,8 @@ public:
fastExtensionEnabled(false), fastExtensionEnabled(false),
peerUpload(0), peerDownload(0), peerUpload(0), peerDownload(0),
pieceLength(pieceLength), totalLength(totalLength), pieceLength(pieceLength), totalLength(totalLength),
deltaUpload(0), deltaDownload(0) { deltaUpload(0), deltaDownload(0),
latency(DEFAULT_LATENCY) {
this->bitfield = new BitfieldMan(pieceLength, totalLength); this->bitfield = new BitfieldMan(pieceLength, totalLength);
} }
@ -140,6 +143,9 @@ public:
bool isSeeder() const; bool isSeeder() const;
void updateLatency(int latency);
int getLatency() const { return latency; }
static Peer* nullPeer; static Peer* nullPeer;
}; };

View File

@ -35,8 +35,7 @@ PeerInteraction::PeerInteraction(int cuid,
:cuid(cuid), :cuid(cuid),
uploadLimit(0), uploadLimit(0),
torrentMan(torrentMan), torrentMan(torrentMan),
peer(peer), peer(peer) {
piece(Piece::nullPiece) {
peerConnection = new PeerConnection(cuid, socket, op); peerConnection = new PeerConnection(cuid, socket, op);
logger = LogFactory::getInstance(); logger = LogFactory::getInstance();
} }
@ -163,34 +162,51 @@ void PeerInteraction::rejectPieceMessageInQueue(int index, int begin, int length
} }
void PeerInteraction::onChoked() { void PeerInteraction::onChoked() {
if(!Piece::isNull(piece) && !peer->isInFastSet(piece.getIndex())) { for(Pieces::iterator itr = pieces.begin(); itr != pieces.end();) {
abortPiece(); Piece& piece = *itr;
if(!peer->isInFastSet(piece.getIndex())) {
abortPiece(piece);
itr = pieces.erase(itr);
} else {
itr++;
}
} }
} }
void PeerInteraction::abortPiece() { void PeerInteraction::abortAllPieces() {
for(Pieces::iterator itr = pieces.begin(); itr != pieces.end();) {
abortPiece(*itr);
itr = pieces.erase(itr);
}
}
void PeerInteraction::abortPiece(Piece& piece) {
if(!Piece::isNull(piece)) { if(!Piece::isNull(piece)) {
for(MessageQueue::iterator itr = messageQueue.begin(); for(MessageQueue::iterator itr = messageQueue.begin();
itr != messageQueue.end();) { itr != messageQueue.end();) {
if((*itr)->getId() == RequestMessage::ID if((*itr)->getId() == RequestMessage::ID &&
&& !(*itr)->isInProgress()) { !(*itr)->isInProgress() &&
((RequestMessage*)*itr)->getIndex() == piece.getIndex()) {
delete *itr; delete *itr;
itr = messageQueue.erase(itr); itr = messageQueue.erase(itr);
} else { } else {
itr++; itr++;
} }
} }
for(RequestSlots::const_iterator itr = requestSlots.begin(); for(RequestSlots::iterator itr = requestSlots.begin();
itr != requestSlots.end(); itr++) { itr != requestSlots.end();) {
if(itr->getIndex() == piece.getIndex()) {
logger->debug("CUID#%d - Deleting request slot blockIndex=%d" logger->debug("CUID#%d - Deleting request slot blockIndex=%d"
" because piece was canceled", " because piece was canceled",
cuid, cuid,
itr->getBlockIndex()); itr->getBlockIndex());
piece.cancelBlock(itr->getBlockIndex()); piece.cancelBlock(itr->getBlockIndex());
itr = requestSlots.erase(itr);
} else {
itr++;
}
} }
requestSlots.clear();
torrentMan->cancelPiece(piece); torrentMan->cancelPiece(piece);
piece = Piece::nullPiece;
} }
} }
@ -206,31 +222,35 @@ void PeerInteraction::deleteRequestSlot(const RequestSlot& requestSlot) {
void PeerInteraction::deleteTimeoutRequestSlot() { void PeerInteraction::deleteTimeoutRequestSlot() {
for(RequestSlots::iterator itr = requestSlots.begin(); for(RequestSlots::iterator itr = requestSlots.begin();
itr != requestSlots.end();) { itr != requestSlots.end();) {
if(itr->isTimeout(REQUEST_TIME_OUT)) { RequestSlot& slot = *itr;
if(slot.isTimeout(REQUEST_TIME_OUT)) {
logger->debug("CUID#%d - Deleting request slot blockIndex=%d" logger->debug("CUID#%d - Deleting request slot blockIndex=%d"
" because of time out", " because of time out",
cuid, cuid,
itr->getBlockIndex()); slot.getBlockIndex());
if(!Piece::isNull(piece)) { Piece& piece = getDownloadPiece(slot.getIndex());
piece.cancelBlock(itr->getBlockIndex()); piece.cancelBlock(slot.getBlockIndex());
}
itr = requestSlots.erase(itr); itr = requestSlots.erase(itr);
} else { } else {
itr++; itr++;
} }
} }
torrentMan->updatePiece(piece); updatePiece();
} }
void PeerInteraction::deleteCompletedRequestSlot() { void PeerInteraction::deleteCompletedRequestSlot() {
for(RequestSlots::iterator itr = requestSlots.begin(); for(RequestSlots::iterator itr = requestSlots.begin();
itr != requestSlots.end();) { itr != requestSlots.end();) {
if(Piece::isNull(piece) || piece.hasBlock(itr->getBlockIndex()) || RequestSlot& slot = *itr;
Piece piece = getDownloadPiece(slot.getIndex());
if(piece.hasBlock(slot.getBlockIndex()) ||
torrentMan->hasPiece(piece.getIndex())) { torrentMan->hasPiece(piece.getIndex())) {
logger->debug("CUID#%d - Deleting request slot blockIndex=%d because" logger->debug("CUID#%d - Deleting request slot blockIndex=%d because"
" the block has been acquired.", cuid, " the block has been acquired.", cuid,
itr->getBlockIndex()); slot.getBlockIndex());
addMessage(createCancelMessage(itr->getIndex(), itr->getBegin(), itr->getLength())); addMessage(createCancelMessage(slot.getIndex(),
slot.getBegin(),
slot.getLength()));
itr = requestSlots.erase(itr); itr = requestSlots.erase(itr);
} else { } else {
itr++; itr++;
@ -238,6 +258,17 @@ void PeerInteraction::deleteCompletedRequestSlot() {
} }
} }
bool PeerInteraction::isInRequestSlot(int index, int blockIndex) const {
for(RequestSlots::const_iterator itr = requestSlots.begin();
itr != requestSlots.end(); itr++) {
const RequestSlot& slot = *itr;
if(slot.getIndex() == index && slot.getBlockIndex() == blockIndex) {
return true;
}
}
return false;
}
RequestSlot PeerInteraction::getCorrespondingRequestSlot(int index, RequestSlot PeerInteraction::getCorrespondingRequestSlot(int index,
int begin, int begin,
int length) const { int length) const {
@ -379,62 +410,112 @@ PeerMessage* PeerInteraction::createPeerMessage(const char* msg, int msgLength)
void PeerInteraction::syncPiece() { void PeerInteraction::syncPiece() {
if(Piece::isNull(piece)) { for(Pieces::iterator itr = pieces.begin(); itr != pieces.end(); itr++) {
return; torrentMan->syncPiece(*itr);
} }
torrentMan->syncPiece(piece);
} }
void PeerInteraction::getNewPieceAndSendInterest() { void PeerInteraction::updatePiece() {
piece = torrentMan->getMissingPiece(peer); for(Pieces::iterator itr = pieces.begin(); itr != pieces.end(); itr++) {
if(Piece::isNull(piece)) { torrentMan->updatePiece(*itr);
}
}
void PeerInteraction::getNewPieceAndSendInterest(int pieceNum) {
int index = torrentMan->getMissingPieceIndex(peer);
if(pieces.empty() && index == -1) {
if(peer->amInterested) {
logger->debug("CUID#%d - Not interested in the peer", cuid); logger->debug("CUID#%d - Not interested in the peer", cuid);
addMessage(createNotInterestedMessage()); addMessage(createNotInterestedMessage());
} else {
if(peer->peerChoking && !peer->isInFastSet(piece.getIndex())) {
abortPiece();
} else {
logger->info("CUID#%d - Starting download for piece index=%d (%d/%d completed)",
cuid, piece.getIndex(), piece.countCompleteBlock(),
piece.countBlock());
} }
} else {
if(peer->peerChoking) {
onChoked();
if(peer->isFastExtensionEnabled()) {
while((int)pieces.size() < pieceNum) {
Piece piece = torrentMan->getMissingFastPiece(peer);
if(Piece::isNull(piece)) {
break;
} else {
pieces.push_back(piece);
}
}
}
} else {
while((int)pieces.size() < pieceNum) {
Piece piece = torrentMan->getMissingPiece(peer);
if(Piece::isNull(piece)) {
break;
} else {
pieces.push_back(piece);
}
}
}
if(!peer->amInterested) {
logger->debug("CUID#%d - Interested in the peer", cuid); logger->debug("CUID#%d - Interested in the peer", cuid);
addMessage(createInterestedMessage()); addMessage(createInterestedMessage());
} }
}
} }
void PeerInteraction::addRequests() { void PeerInteraction::addRequests() {
if(Piece::isNull(piece)) { // Abort downloading of completed piece.
// retrive new piece from TorrentMan for(Pieces::iterator itr = pieces.begin(); itr != pieces.end();) {
getNewPieceAndSendInterest(); Piece& piece = *itr;
} else if(peer->peerChoking && !peer->isInFastSet(piece.getIndex())) { if(piece.pieceComplete()) {
onChoked(); abortPiece(piece);
} else if(piece.pieceComplete()) { itr = pieces.erase(itr);
abortPiece(); } else {
getNewPieceAndSendInterest(); itr++;
} }
if(!Piece::isNull(piece)) { }
int MAX_PENDING_REQUEST;
if(peer->getLatency() < 300) {
MAX_PENDING_REQUEST = 24;
} else if(peer->getLatency() < 600) {
MAX_PENDING_REQUEST = 18;
} else if(peer->getLatency() < 1000) {
MAX_PENDING_REQUEST = 12;
} else {
MAX_PENDING_REQUEST = 6;
}
int pieceNum;
if(torrentMan->isEndGame()) {
pieceNum = 1;
} else {
int blocks = DIV_FLOOR(torrentMan->pieceLength, BLOCK_LENGTH);
pieceNum = DIV_FLOOR(MAX_PENDING_REQUEST, blocks);
}
getNewPieceAndSendInterest(pieceNum);
for(Pieces::iterator itr = pieces.begin(); itr != pieces.end(); itr++) {
Piece& piece = *itr;
if(torrentMan->isEndGame()) { if(torrentMan->isEndGame()) {
BlockIndexes missingBlockIndexes = piece.getAllMissingBlockIndexes(); BlockIndexes missingBlockIndexes = piece.getAllMissingBlockIndexes();
if(countRequestSlot() == 0) {
random_shuffle(missingBlockIndexes.begin(), missingBlockIndexes.end()); random_shuffle(missingBlockIndexes.begin(), missingBlockIndexes.end());
int count = 0; int count = countRequestSlot();
for(BlockIndexes::const_iterator itr = missingBlockIndexes.begin(); for(BlockIndexes::const_iterator bitr = missingBlockIndexes.begin();
itr != missingBlockIndexes.end() && count < 6; itr++, count++) { bitr != missingBlockIndexes.end() && count < MAX_PENDING_REQUEST;
addMessage(createRequestMessage(*itr)); bitr++) {
int blockIndex = *bitr;
if(!isInRequestSlot(piece.getIndex(), blockIndex)) {
addMessage(createRequestMessage(piece.getIndex(), blockIndex));
count++;
} }
} }
} else { } else {
for(int i = countRequestSlot(); i < 6; i++) { while(countRequestSlot() < MAX_PENDING_REQUEST) {
int blockIndex = piece.getMissingUnusedBlockIndex(); int blockIndex = piece.getMissingUnusedBlockIndex();
if(blockIndex == -1) { if(blockIndex == -1) {
break; break;
} }
torrentMan->updatePiece(piece); addMessage(createRequestMessage(piece.getIndex(), blockIndex));
addMessage(createRequestMessage(blockIndex));
} }
} }
if(countRequestSlot() >= MAX_PENDING_REQUEST) {
break;
} }
}
updatePiece();
} }
void PeerInteraction::sendHandshake() { void PeerInteraction::sendHandshake() {
@ -474,11 +555,22 @@ void PeerInteraction::sendAllowedFast() {
} }
} }
Piece& PeerInteraction::getDownloadPiece() { Piece& PeerInteraction::getDownloadPiece(int index) {
if(Piece::isNull(piece)) { for(Pieces::iterator itr = pieces.begin(); itr != pieces.end(); itr++) {
throw new DlAbortEx("current piece is null"); if(itr->getIndex() == index) {
return *itr;
} }
return piece; }
throw new DlAbortEx("No such piece index=%d", index);
}
bool PeerInteraction::hasDownloadPiece(int index) const {
for(Pieces::const_iterator itr = pieces.begin(); itr != pieces.end(); itr++) {
if(itr->getIndex() == index) {
return true;
}
}
return false;
} }
bool PeerInteraction::isInFastSet(int index) const { bool PeerInteraction::isInFastSet(int index) const {
@ -497,8 +589,9 @@ void PeerInteraction::setPeerMessageCommonProperty(PeerMessage* peerMessage) {
peerMessage->setPeerInteraction(this); peerMessage->setPeerInteraction(this);
} }
RequestMessage* PeerInteraction::createRequestMessage(int blockIndex) { RequestMessage* PeerInteraction::createRequestMessage(int index, int blockIndex) {
RequestMessage* msg = new RequestMessage(); RequestMessage* msg = new RequestMessage();
Piece piece = getDownloadPiece(index);
msg->setIndex(piece.getIndex()); msg->setIndex(piece.getIndex());
msg->setBegin(blockIndex*piece.getBlockLength()); msg->setBegin(blockIndex*piece.getBlockLength());
msg->setLength(piece.getBlockLength(blockIndex)); msg->setLength(piece.getBlockLength(blockIndex));

View File

@ -48,6 +48,7 @@
typedef deque<RequestSlot> RequestSlots; typedef deque<RequestSlot> RequestSlots;
typedef deque<PeerMessage*> MessageQueue; typedef deque<PeerMessage*> MessageQueue;
typedef deque<Piece> Pieces;
class PeerInteraction { class PeerInteraction {
private: private:
@ -59,12 +60,12 @@ private:
TorrentMan* torrentMan; TorrentMan* torrentMan;
PeerConnection* peerConnection; PeerConnection* peerConnection;
Peer* peer; Peer* peer;
Piece piece; Pieces pieces;
// allowed fast piece indexes that local client has sent // allowed fast piece indexes that local client has sent
Integers fastSet; Integers fastSet;
const Logger* logger; const Logger* logger;
void getNewPieceAndSendInterest(); void getNewPieceAndSendInterest(int pieceNum);
PeerMessage* createPeerMessage(const char* msg, int msgLength); PeerMessage* createPeerMessage(const char* msg, int msgLength);
HandshakeMessage* createHandshakeMessage(const char* msg, int msgLength); HandshakeMessage* createHandshakeMessage(const char* msg, int msgLength);
void setPeerMessageCommonProperty(PeerMessage* peerMessage); void setPeerMessageCommonProperty(PeerMessage* peerMessage);
@ -81,13 +82,15 @@ public:
void rejectPieceMessageInQueue(int index, int begin, int length); void rejectPieceMessageInQueue(int index, int begin, int length);
void rejectAllPieceMessageInQueue(); void rejectAllPieceMessageInQueue();
void onChoked(); void onChoked();
void abortPiece(); void abortPiece(Piece& piece);
void abortAllPieces();
bool isSendingMessageInProgress() const; bool isSendingMessageInProgress() const;
void deleteRequestSlot(const RequestSlot& requestSlot); void deleteRequestSlot(const RequestSlot& requestSlot);
void deleteTimeoutRequestSlot(); void deleteTimeoutRequestSlot();
void deleteCompletedRequestSlot(); void deleteCompletedRequestSlot();
RequestSlot getCorrespondingRequestSlot(int index, int begin, int length) const; RequestSlot getCorrespondingRequestSlot(int index, int begin, int length) const;
bool isInRequestSlot(int index, int blockIndex) const;
int countMessageInQueue() const; int countMessageInQueue() const;
@ -97,21 +100,17 @@ public:
TorrentMan* getTorrentMan() const { return torrentMan; } TorrentMan* getTorrentMan() const { return torrentMan; }
PeerConnection* getPeerConnection() const { return peerConnection; } PeerConnection* getPeerConnection() const { return peerConnection; }
// If this object has nullPiece, then return false, otherwise true // If this object has nullPiece, then return false, otherwise true
bool hasDownloadPiece() const { bool hasDownloadPiece(int index) const;
return !Piece::isNull(piece);
}
// If the piece which this object has is nullPiece, then throws an exception. // If the piece which this object has is nullPiece, then throws an exception.
// So before calling this function, call hasDownloadPiece and make sure // So before calling this function, call hasDownloadPiece and make sure
// this has valid piece, not nullPiece. // this has valid piece, not nullPiece.
Piece& getDownloadPiece(); Piece& getDownloadPiece(int index);
void setDownloadPiece(const Piece& piece) {
this->piece = piece;
}
bool isInFastSet(int index) const; bool isInFastSet(int index) const;
void addFastSetIndex(int index); void addFastSetIndex(int index);
void syncPiece(); void syncPiece();
void updatePiece();
void addRequests(); void addRequests();
void sendMessages(int currentUploadSpeed); void sendMessages(int currentUploadSpeed);
void sendHandshake(); void sendHandshake();
@ -121,7 +120,7 @@ public:
PeerMessage* receiveMessage(); PeerMessage* receiveMessage();
HandshakeMessage* receiveHandshake(); HandshakeMessage* receiveHandshake();
RequestMessage* createRequestMessage(int blockIndex); RequestMessage* createRequestMessage(int index, int blockIndex);
CancelMessage* createCancelMessage(int index, int begin, int length); CancelMessage* createCancelMessage(int index, int begin, int length);
PieceMessage* createPieceMessage(int index, int begin, int length); PieceMessage* createPieceMessage(int index, int begin, int length);
HaveMessage* createHaveMessage(int index); HaveMessage* createHaveMessage(int index);

View File

@ -113,7 +113,7 @@ bool PeerInteractionCommand::executeInternal() {
case WIRED: case WIRED:
peerInteraction->syncPiece(); peerInteraction->syncPiece();
decideChoking(); decideChoking();
for(int i = 0; i < 10; i++) { for(int i = 0; i < 50; i++) {
if(!socket->isReadable(0)) { if(!socket->isReadable(0)) {
break; break;
} }
@ -245,7 +245,7 @@ bool PeerInteractionCommand::prepareForRetry(int wait) {
} }
void PeerInteractionCommand::onAbort(Exception* ex) { void PeerInteractionCommand::onAbort(Exception* ex) {
peerInteraction->abortPiece(); peerInteraction->abortAllPieces();
PeerAbstractCommand::onAbort(ex); PeerAbstractCommand::onAbort(ex);
} }

View File

@ -49,6 +49,10 @@ Piece& Piece::operator=(const Piece& piece) {
return *this; return *this;
} }
bool Piece::operator==(const Piece& piece) const {
return index == piece.index;
}
void Piece::completeBlock(int blockIndex) { void Piece::completeBlock(int blockIndex) {
bitfield->setBit(blockIndex); bitfield->setBit(blockIndex);
bitfield->unsetUseBit(blockIndex); bitfield->unsetUseBit(blockIndex);
@ -79,6 +83,15 @@ int Piece::getMissingUnusedBlockIndex() const {
return blockIndex; return blockIndex;
} }
int Piece::getMissingBlockIndex() const {
int blockIndex = bitfield->getMissingIndex();
if(blockIndex == -1) {
return blockIndex;
}
bitfield->setUseBit(blockIndex);
return blockIndex;
}
BlockIndexes Piece::getAllMissingBlockIndexes() const { BlockIndexes Piece::getAllMissingBlockIndexes() const {
return bitfield->getAllMissingIndexes(); return bitfield->getAllMissingIndexes();
} }

View File

@ -43,8 +43,10 @@ public:
} }
Piece& operator=(const Piece& piece); Piece& operator=(const Piece& piece);
bool operator==(const Piece& piece) const;
int getMissingUnusedBlockIndex() const; int getMissingUnusedBlockIndex() const;
int getMissingBlockIndex() const;
BlockIndexes getAllMissingBlockIndexes() const; BlockIndexes getAllMissingBlockIndexes() const;
void completeBlock(int blockIndex); void completeBlock(int blockIndex);
void cancelBlock(int blockIndex); void cancelBlock(int blockIndex);

View File

@ -57,9 +57,11 @@ void PieceMessage::receivedAction() {
begin, begin,
blockLength); blockLength);
peer->addPeerUpload(blockLength); peer->addPeerUpload(blockLength);
if(peerInteraction->hasDownloadPiece() && if(!RequestSlot::isNull(slot) &&
!RequestSlot::isNull(slot)) { peerInteraction->hasDownloadPiece(slot.getIndex())) {
Piece& piece = peerInteraction->getDownloadPiece(); //logger->debug("CUID#%d - Latency=%d", cuid, slot.getLatencyInMillis());
peer->updateLatency(slot.getLatencyInMillis());
Piece& piece = peerInteraction->getDownloadPiece(slot.getIndex());
long long int offset = long long int offset =
((long long int)index)*torrentMan->pieceLength+begin; ((long long int)index)*torrentMan->pieceLength+begin;
logger->debug("CUID#%d - Writing the block length=%d, offset=%lld", logger->debug("CUID#%d - Writing the block length=%d, offset=%lld",
@ -197,7 +199,6 @@ void PieceMessage::onGotNewPiece(Piece& piece) {
logger->info(MSG_GOT_NEW_PIECE, cuid, piece.getIndex()); logger->info(MSG_GOT_NEW_PIECE, cuid, piece.getIndex());
torrentMan->completePiece(piece); torrentMan->completePiece(piece);
torrentMan->advertisePiece(cuid, piece.getIndex()); torrentMan->advertisePiece(cuid, piece.getIndex());
piece = Piece::nullPiece;
} }
void PieceMessage::onGotWrongPiece(Piece& piece) { void PieceMessage::onGotWrongPiece(Piece& piece) {

View File

@ -53,9 +53,13 @@ void RequestSlot::setDispatchedTime() {
} }
bool RequestSlot::isTimeout(int timeoutSec) const { bool RequestSlot::isTimeout(int timeoutSec) const {
return getLatencyInMillis() > timeoutSec*1000;
}
int RequestSlot::getLatencyInMillis() const {
struct timeval now; struct timeval now;
gettimeofday(&now, NULL); gettimeofday(&now, NULL);
return Util::difftv(now, dispatchedTime) > ((long long int)timeoutSec)*1000000; return Util::difftv(now, dispatchedTime)/1000;
} }
bool RequestSlot::isNull(const RequestSlot& requestSlot) { bool RequestSlot::isNull(const RequestSlot& requestSlot) {

View File

@ -43,6 +43,7 @@ public:
void setDispatchedTime(); void setDispatchedTime();
bool isTimeout(int timeoutSec) const; bool isTimeout(int timeoutSec) const;
int getLatencyInMillis() const;
bool operator==(const RequestSlot& requestSlot) const; bool operator==(const RequestSlot& requestSlot) const;

View File

@ -145,7 +145,19 @@ bool TorrentMan::isEndGame() const {
return bitfield->countMissingBlock() <= END_GAME_PIECE_NUM; return bitfield->countMissingBlock() <= END_GAME_PIECE_NUM;
} }
Piece TorrentMan::getMissingPiece(const Peer* peer) { int TorrentMan::getMissingPieceIndex(const Peer* peer) const {
int index = -1;
if(isEndGame()) {
index = bitfield->getMissingIndex(peer->getBitfield(),
peer->getBitfieldLength());
} else {
index = bitfield->getMissingUnusedIndex(peer->getBitfield(),
peer->getBitfieldLength());
}
return index;
}
int TorrentMan::getMissingFastPieceIndex(const Peer* peer) const {
int index = -1; int index = -1;
if(peer->isFastExtensionEnabled() && peer->countFastSet() > 0) { if(peer->isFastExtensionEnabled() && peer->countFastSet() > 0) {
BitfieldMan tempBitfield(pieceLength, totalLength); BitfieldMan tempBitfield(pieceLength, totalLength);
@ -163,15 +175,20 @@ Piece TorrentMan::getMissingPiece(const Peer* peer) {
tempBitfield.getBitfieldLength()); tempBitfield.getBitfieldLength());
} }
} }
if(index == -1) { return index;
if(isEndGame()) { }
index = bitfield->getMissingIndex(peer->getBitfield(),
peer->getBitfieldLength()); Piece TorrentMan::getMissingFastPiece(const Peer* peer) {
} else { int index = getMissingFastPieceIndex(peer);
index = bitfield->getMissingUnusedIndex(peer->getBitfield(), return checkOutPiece(index);
peer->getBitfieldLength()); }
}
} Piece TorrentMan::getMissingPiece(const Peer* peer) {
int index = getMissingPieceIndex(peer);
return checkOutPiece(index);
}
Piece TorrentMan::checkOutPiece(int index) {
if(index == -1) { if(index == -1) {
return Piece::nullPiece; return Piece::nullPiece;
} }
@ -281,11 +298,10 @@ void TorrentMan::updatePiece(const Piece& piece) {
if(Piece::isNull(piece)) { if(Piece::isNull(piece)) {
return; return;
} }
for(UsedPieces::iterator itr = usedPieces.begin(); itr != usedPieces.end(); itr++) { UsedPieces::iterator itr = find(usedPieces.begin(), usedPieces.end(),
if(itr->getIndex() == piece.getIndex()) { piece);
if(itr != usedPieces.end()) {
*itr = piece; *itr = piece;
break;
}
} }
} }
@ -293,17 +309,18 @@ void TorrentMan::syncPiece(Piece& piece) {
if(Piece::isNull(piece)) { if(Piece::isNull(piece)) {
return; return;
} }
for(UsedPieces::iterator itr = usedPieces.begin(); itr != usedPieces.end(); itr++) { UsedPieces::iterator itr = find(usedPieces.begin(), usedPieces.end(),
if(itr->getIndex() == piece.getIndex()) { piece);
if(itr != usedPieces.end()) {
piece = *itr; piece = *itr;
return; return;
} } else {
}
// hasPiece(piece.getIndex()) is true, then set all bit of // hasPiece(piece.getIndex()) is true, then set all bit of
// piece.bitfield to 1 // piece.bitfield to 1
if(hasPiece(piece.getIndex())) { if(hasPiece(piece.getIndex())) {
piece.setAllBlock(); piece.setAllBlock();
} }
}
} }
void TorrentMan::initBitfield() { void TorrentMan::initBitfield() {

View File

@ -91,6 +91,7 @@ private:
void setFileFilter(const Strings& filePaths); void setFileFilter(const Strings& filePaths);
void setupInternal1(const string& metaInfoFile); void setupInternal1(const string& metaInfoFile);
void setupInternal2(); void setupInternal2();
Piece checkOutPiece(int index);
public: public:
int pieceLength; int pieceLength;
int pieces; int pieces;
@ -126,7 +127,10 @@ public:
bool isPeerAvailable() const; bool isPeerAvailable() const;
int deleteOldErrorPeers(int maxNum); int deleteOldErrorPeers(int maxNum);
int getMissingPieceIndex(const Peer* peer) const;
int getMissingFastPieceIndex(const Peer* peer) const;
Piece getMissingPiece(const Peer* peer); Piece getMissingPiece(const Peer* peer);
Piece getMissingFastPiece(const Peer* peer);
void completePiece(const Piece& piece); void completePiece(const Piece& piece);
void cancelPiece(const Piece& piece); void cancelPiece(const Piece& piece);
void updatePiece(const Piece& piece); void updatePiece(const Piece& piece);

View File

@ -44,7 +44,9 @@
#define USER_AGENT "aria2" #define USER_AGENT "aria2"
#define BITFIELD_LEN_FROM_PIECES(X) (X/8+(X%8? 1 : 0)) #define BITFIELD_LEN_FROM_PIECES(X) ((X)/8+((X)%8? 1 : 0))
#define DIV_FLOOR(X,Y) ((X)/(Y)+((X)%(Y)? 1:0))
using namespace std; using namespace std;