diff --git a/ChangeLog b/ChangeLog index 7095f6f5..58c17995 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,3 +1,86 @@ +2006-05-20 Tatsuhiro Tsujikawa + + To add the ability to download several pieces in mix in a command and + increase the number of request slots according to request-cancel + latency: + + * src/PeerInteractionCommand.cc + (executeInternal): The number of messages to be received at a time + is increased from 10 to 50. + * src/LogFactory + (getInstance): Added NullLogger. + * src/NullLogger.h: New class. + * src/BitfieldMan.h + (getMissingIndex): New function. + * src/BitfieldMan.cc + (getMissingIndex): New function. + * src/TorrentMan.h + (checkOutPiece): New function. + (getMissingPieceIndex): New function. + (getMissingFastPieceIndex): New function. + (getMissingFastPiece): New function. + * src/TorrentMan.cc + (updatePiece): Rewritten using STL. + (syncPiece): Rewritten using STL. + (getMissingPiece): Rewritten using getMissingPieceIndex() and + checkOutPiece(). + (getMissingPieceIndex): New function. + (getMissingFastPieceIndex): New function. + (getMissingFastPiece): New function. + (checkOutPiece): New function. + * src/PeerInteraction.h + (Pieces): New type definition. + (piece): Removed. + (pieces): New variable. + (getNewPieceAndSendInterest): Added the "piece" argument. + (abortPiece): Added the "piece" argument. + (abortAllPieces): New function. + (isInRequestSlot): New function. + (hasDownloadPiece): Added the "index" argument. + (setDownloadPiece): Removed. + (getDownloadPiece): Added the "index" argument. + (updatePiece): New function. + (createRequestMessage): Added the "index" argument. + * src/PeerInteraction.cc + (onChoked): Rewritten. + (abortPiece): Rewirtten. + (abortAllPieces): New function. + (deleteTimeoutRequestSlot): Rewritten. + Clarified code a little bit. + (deleteCompletedRequestSlot): Rewritten. + (isInRequestSlot): New function. + (syncPiece): Rewritten. + (updatePiece): New function. + (getNewPieceAndSendInterest): Rewritten. + (addRequests): Rewritten. + (getDownloadPiece): Rewritten. + (hasDownloadPiece): Rewritten. + (createRequestMessage): Added the "index" argument. + * src/common.h + (BITFIELD_LEN_FROM_PIECES): Enclosed the variable in parentheses. + (DIV_FLOOR): New definition. + * src/PieceMessage.cc + (receivedAction): Update request-piece latency here. + * src/RequestSlot.h + (getLatencyInMillis): New function. + * src/RequestSlot.cc + (isTimeout): Rewritten using getLatencyInMillis. + (getLatencyInMillis): New function. + * src/Piece.h + (operator==): New function. + (getMissingBlockIndex): New function. + * src/Piece.cc + (operator==): New function. + (getMissingBlockIndex): New function. + * src/Peer.h + (DEFAULT_LATENCY): New definition. + (latency): New variable. + (updateLatency): New function. + (getLatency): New function. + * src/Peer.cc + (resetStatus): Reset latecy. + (updateLatency): New function. + 2006-05-20 Tatsuhiro Tsujikawa * src/SocketCore.cc diff --git a/src/BitfieldMan.cc b/src/BitfieldMan.cc index 9ae3276c..c5c120f9 100644 --- a/src/BitfieldMan.cc +++ b/src/BitfieldMan.cc @@ -192,6 +192,20 @@ int BitfieldMan::getFirstMissingUnusedIndex() const { return -1; } +int BitfieldMan::getMissingIndex() const { + unsigned char* tempBitfield = new unsigned char[bitfieldLength]; + for(int i = 0; i < bitfieldLength; i++) { + tempBitfield[i] = ~bitfield[i]; + if(filterEnabled) { + tempBitfield[i] &= filterBitfield[i]; + } + } + int max = countSetBit(tempBitfield, bitfieldLength); + int index = getMissingIndexRandomly(tempBitfield, bitfieldLength, max); + delete [] tempBitfield; + return index; +} + BlockIndexes BitfieldMan::getAllMissingIndexes() const { BlockIndexes missingIndexes; for(int i = 0; i < bitfieldLength; i++) { diff --git a/src/BitfieldMan.h b/src/BitfieldMan.h index f329a52f..7c9a9522 100644 --- a/src/BitfieldMan.h +++ b/src/BitfieldMan.h @@ -68,6 +68,10 @@ public: * affected by filter */ int getMissingIndex(const unsigned char* bitfield, int len) const; + /** + * affected by filter + */ + int getMissingIndex() const; /** * affected by filter */ diff --git a/src/LogFactory.cc b/src/LogFactory.cc index e45ae653..8bb5780f 100644 --- a/src/LogFactory.cc +++ b/src/LogFactory.cc @@ -21,19 +21,20 @@ /* copyright --> */ #include "LogFactory.h" #include "SimpleLogger.h" +#include "NullLogger.h" string LogFactory::filename; Logger* LogFactory::logger = NULL; Logger* LogFactory::getInstance() { if(logger == NULL) { - SimpleLogger* slogger = new SimpleLogger(); if(filename.empty()) { - slogger->openFile("/dev/null"); + logger = new NullLogger(); } else { + SimpleLogger* slogger = new SimpleLogger(); slogger->openFile(filename); + logger = slogger; } - logger = slogger; } return logger; } diff --git a/src/Makefile.am b/src/Makefile.am index 5ac5d866..55209d41 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -100,7 +100,8 @@ SRCS = Socket.cc Socket.h\ RejectMessage.cc RejectMessage.h\ AllowedFastMessage.cc AllowedFastMessage.h\ SuggestPieceMessage.cc SuggestPieceMessage.h\ - SimplePeerMessage.cc SimplePeerMessage.h + SimplePeerMessage.cc SimplePeerMessage.h\ + NullLogger.h noinst_LIBRARIES = libaria2c.a libaria2c_a_SOURCES = $(SRCS) aria2c_LDADD = libaria2c.a @LIBINTL@ @ALLOCA@ @LIBGNUTLS_LIBS@\ diff --git a/src/Makefile.in b/src/Makefile.in index b61a96e9..a113ed2d 100644 --- a/src/Makefile.in +++ b/src/Makefile.in @@ -359,7 +359,8 @@ SRCS = Socket.cc Socket.h\ RejectMessage.cc RejectMessage.h\ AllowedFastMessage.cc AllowedFastMessage.h\ SuggestPieceMessage.cc SuggestPieceMessage.h\ - SimplePeerMessage.cc SimplePeerMessage.h + SimplePeerMessage.cc SimplePeerMessage.h\ + NullLogger.h noinst_LIBRARIES = libaria2c.a libaria2c_a_SOURCES = $(SRCS) diff --git a/src/NullLogger.h b/src/NullLogger.h new file mode 100644 index 00000000..e5cd69b4 --- /dev/null +++ b/src/NullLogger.h @@ -0,0 +1,43 @@ +/* */ +#ifndef _D_NULL_LOGGER_H_ +#define _D_NULL_LOGGER_H_ + +#include "Logger.h" + +using namespace std; + +class NullLogger : public Logger { +public: + NullLogger() {} + virtual ~NullLogger() {} + virtual void debug(const char* msg, ...) const {} + virtual void debug(const char* msg, Exception* ex, ...) const {} + virtual void info(const char* msg, ...) const {} + virtual void info(const char* msg, Exception* ex, ...) const {} + virtual void warn(const char* msg, ...) const {} + virtual void warn(const char* msg, Exception* ex, ...) const {} + virtual void error(const char* msg, ...) const {} + virtual void error(const char* msg, Exception* ex, ...) const {} +}; + +#endif // _D_NULL_LOGGER_H_ diff --git a/src/Peer.cc b/src/Peer.cc index 9c83321e..28099c45 100644 --- a/src/Peer.cc +++ b/src/Peer.cc @@ -60,6 +60,7 @@ void Peer::resetStatus() { chokingRequired = true; optUnchoking = false; fastExtensionEnabled = false; + latency = DEFAULT_LATENCY; fastSet.clear(); } @@ -76,3 +77,7 @@ void Peer::addFastSetIndex(int index) { void Peer::setAllBitfield() { bitfield->setAllBit(); } + +void Peer::updateLatency(int latency) { + this->latency = (this->latency*80+latency*20)/200; +} diff --git a/src/Peer.h b/src/Peer.h index 8958170f..f87010bb 100644 --- a/src/Peer.h +++ b/src/Peer.h @@ -30,6 +30,7 @@ using namespace std; #define PEER_ID_LENGTH 20 +#define DEFAULT_LATENCY 1000 class Peer { public: @@ -57,6 +58,7 @@ private: long long int totalLength; int deltaUpload; int deltaDownload; + int latency; public: Peer(string ipaddr, int port, int pieceLength, long long int totalLength): entryId(0), ipaddr(ipaddr), port(port), @@ -68,7 +70,8 @@ public: fastExtensionEnabled(false), peerUpload(0), peerDownload(0), pieceLength(pieceLength), totalLength(totalLength), - deltaUpload(0), deltaDownload(0) { + deltaUpload(0), deltaDownload(0), + latency(DEFAULT_LATENCY) { this->bitfield = new BitfieldMan(pieceLength, totalLength); } @@ -140,6 +143,9 @@ public: bool isSeeder() const; + void updateLatency(int latency); + int getLatency() const { return latency; } + static Peer* nullPeer; }; diff --git a/src/PeerInteraction.cc b/src/PeerInteraction.cc index 83dfd180..28fd9989 100644 --- a/src/PeerInteraction.cc +++ b/src/PeerInteraction.cc @@ -35,8 +35,7 @@ PeerInteraction::PeerInteraction(int cuid, :cuid(cuid), uploadLimit(0), torrentMan(torrentMan), - peer(peer), - piece(Piece::nullPiece) { + peer(peer) { peerConnection = new PeerConnection(cuid, socket, op); logger = LogFactory::getInstance(); } @@ -163,34 +162,51 @@ void PeerInteraction::rejectPieceMessageInQueue(int index, int begin, int length } void PeerInteraction::onChoked() { - if(!Piece::isNull(piece) && !peer->isInFastSet(piece.getIndex())) { - abortPiece(); + for(Pieces::iterator itr = pieces.begin(); itr != pieces.end();) { + Piece& piece = *itr; + if(!peer->isInFastSet(piece.getIndex())) { + abortPiece(piece); + itr = pieces.erase(itr); + } else { + itr++; + } } } -void PeerInteraction::abortPiece() { +void PeerInteraction::abortAllPieces() { + for(Pieces::iterator itr = pieces.begin(); itr != pieces.end();) { + abortPiece(*itr); + itr = pieces.erase(itr); + } +} + +void PeerInteraction::abortPiece(Piece& piece) { if(!Piece::isNull(piece)) { for(MessageQueue::iterator itr = messageQueue.begin(); itr != messageQueue.end();) { - if((*itr)->getId() == RequestMessage::ID - && !(*itr)->isInProgress()) { + if((*itr)->getId() == RequestMessage::ID && + !(*itr)->isInProgress() && + ((RequestMessage*)*itr)->getIndex() == piece.getIndex()) { delete *itr; itr = messageQueue.erase(itr); } else { itr++; } } - for(RequestSlots::const_iterator itr = requestSlots.begin(); - itr != requestSlots.end(); itr++) { - logger->debug("CUID#%d - Deleting request slot blockIndex=%d" - " because piece was canceled", - cuid, - itr->getBlockIndex()); - piece.cancelBlock(itr->getBlockIndex()); + for(RequestSlots::iterator itr = requestSlots.begin(); + itr != requestSlots.end();) { + if(itr->getIndex() == piece.getIndex()) { + logger->debug("CUID#%d - Deleting request slot blockIndex=%d" + " because piece was canceled", + cuid, + itr->getBlockIndex()); + piece.cancelBlock(itr->getBlockIndex()); + itr = requestSlots.erase(itr); + } else { + itr++; + } } - requestSlots.clear(); torrentMan->cancelPiece(piece); - piece = Piece::nullPiece; } } @@ -206,31 +222,35 @@ void PeerInteraction::deleteRequestSlot(const RequestSlot& requestSlot) { void PeerInteraction::deleteTimeoutRequestSlot() { for(RequestSlots::iterator itr = requestSlots.begin(); itr != requestSlots.end();) { - if(itr->isTimeout(REQUEST_TIME_OUT)) { + RequestSlot& slot = *itr; + if(slot.isTimeout(REQUEST_TIME_OUT)) { logger->debug("CUID#%d - Deleting request slot blockIndex=%d" " because of time out", cuid, - itr->getBlockIndex()); - if(!Piece::isNull(piece)) { - piece.cancelBlock(itr->getBlockIndex()); - } + slot.getBlockIndex()); + Piece& piece = getDownloadPiece(slot.getIndex()); + piece.cancelBlock(slot.getBlockIndex()); itr = requestSlots.erase(itr); } else { itr++; } } - torrentMan->updatePiece(piece); + updatePiece(); } void PeerInteraction::deleteCompletedRequestSlot() { for(RequestSlots::iterator itr = requestSlots.begin(); itr != requestSlots.end();) { - if(Piece::isNull(piece) || piece.hasBlock(itr->getBlockIndex()) || + RequestSlot& slot = *itr; + Piece piece = getDownloadPiece(slot.getIndex()); + if(piece.hasBlock(slot.getBlockIndex()) || torrentMan->hasPiece(piece.getIndex())) { logger->debug("CUID#%d - Deleting request slot blockIndex=%d because" " the block has been acquired.", cuid, - itr->getBlockIndex()); - addMessage(createCancelMessage(itr->getIndex(), itr->getBegin(), itr->getLength())); + slot.getBlockIndex()); + addMessage(createCancelMessage(slot.getIndex(), + slot.getBegin(), + slot.getLength())); itr = requestSlots.erase(itr); } else { itr++; @@ -238,6 +258,17 @@ void PeerInteraction::deleteCompletedRequestSlot() { } } +bool PeerInteraction::isInRequestSlot(int index, int blockIndex) const { + for(RequestSlots::const_iterator itr = requestSlots.begin(); + itr != requestSlots.end(); itr++) { + const RequestSlot& slot = *itr; + if(slot.getIndex() == index && slot.getBlockIndex() == blockIndex) { + return true; + } + } + return false; +} + RequestSlot PeerInteraction::getCorrespondingRequestSlot(int index, int begin, int length) const { @@ -379,62 +410,112 @@ PeerMessage* PeerInteraction::createPeerMessage(const char* msg, int msgLength) void PeerInteraction::syncPiece() { - if(Piece::isNull(piece)) { - return; + for(Pieces::iterator itr = pieces.begin(); itr != pieces.end(); itr++) { + torrentMan->syncPiece(*itr); } - torrentMan->syncPiece(piece); } -void PeerInteraction::getNewPieceAndSendInterest() { - piece = torrentMan->getMissingPiece(peer); - if(Piece::isNull(piece)) { - logger->debug("CUID#%d - Not interested in the peer", cuid); - addMessage(createNotInterestedMessage()); - } else { - if(peer->peerChoking && !peer->isInFastSet(piece.getIndex())) { - abortPiece(); - } else { - logger->info("CUID#%d - Starting download for piece index=%d (%d/%d completed)", - cuid, piece.getIndex(), piece.countCompleteBlock(), - piece.countBlock()); +void PeerInteraction::updatePiece() { + for(Pieces::iterator itr = pieces.begin(); itr != pieces.end(); itr++) { + torrentMan->updatePiece(*itr); + } +} + +void PeerInteraction::getNewPieceAndSendInterest(int pieceNum) { + int index = torrentMan->getMissingPieceIndex(peer); + if(pieces.empty() && index == -1) { + if(peer->amInterested) { + logger->debug("CUID#%d - Not interested in the peer", cuid); + addMessage(createNotInterestedMessage()); + } + } else { + if(peer->peerChoking) { + onChoked(); + if(peer->isFastExtensionEnabled()) { + while((int)pieces.size() < pieceNum) { + Piece piece = torrentMan->getMissingFastPiece(peer); + if(Piece::isNull(piece)) { + break; + } else { + pieces.push_back(piece); + } + } + } + } else { + while((int)pieces.size() < pieceNum) { + Piece piece = torrentMan->getMissingPiece(peer); + if(Piece::isNull(piece)) { + break; + } else { + pieces.push_back(piece); + } + } + } + if(!peer->amInterested) { + logger->debug("CUID#%d - Interested in the peer", cuid); + addMessage(createInterestedMessage()); } - logger->debug("CUID#%d - Interested in the peer", cuid); - addMessage(createInterestedMessage()); } } void PeerInteraction::addRequests() { - if(Piece::isNull(piece)) { - // retrive new piece from TorrentMan - getNewPieceAndSendInterest(); - } else if(peer->peerChoking && !peer->isInFastSet(piece.getIndex())) { - onChoked(); - } else if(piece.pieceComplete()) { - abortPiece(); - getNewPieceAndSendInterest(); + // Abort downloading of completed piece. + for(Pieces::iterator itr = pieces.begin(); itr != pieces.end();) { + Piece& piece = *itr; + if(piece.pieceComplete()) { + abortPiece(piece); + itr = pieces.erase(itr); + } else { + itr++; + } } - if(!Piece::isNull(piece)) { + int MAX_PENDING_REQUEST; + if(peer->getLatency() < 300) { + MAX_PENDING_REQUEST = 24; + } else if(peer->getLatency() < 600) { + MAX_PENDING_REQUEST = 18; + } else if(peer->getLatency() < 1000) { + MAX_PENDING_REQUEST = 12; + } else { + MAX_PENDING_REQUEST = 6; + } + int pieceNum; + if(torrentMan->isEndGame()) { + pieceNum = 1; + } else { + int blocks = DIV_FLOOR(torrentMan->pieceLength, BLOCK_LENGTH); + pieceNum = DIV_FLOOR(MAX_PENDING_REQUEST, blocks); + } + getNewPieceAndSendInterest(pieceNum); + for(Pieces::iterator itr = pieces.begin(); itr != pieces.end(); itr++) { + Piece& piece = *itr; if(torrentMan->isEndGame()) { BlockIndexes missingBlockIndexes = piece.getAllMissingBlockIndexes(); - if(countRequestSlot() == 0) { - random_shuffle(missingBlockIndexes.begin(), missingBlockIndexes.end()); - int count = 0; - for(BlockIndexes::const_iterator itr = missingBlockIndexes.begin(); - itr != missingBlockIndexes.end() && count < 6; itr++, count++) { - addMessage(createRequestMessage(*itr)); + random_shuffle(missingBlockIndexes.begin(), missingBlockIndexes.end()); + int count = countRequestSlot(); + for(BlockIndexes::const_iterator bitr = missingBlockIndexes.begin(); + bitr != missingBlockIndexes.end() && count < MAX_PENDING_REQUEST; + bitr++) { + int blockIndex = *bitr; + if(!isInRequestSlot(piece.getIndex(), blockIndex)) { + addMessage(createRequestMessage(piece.getIndex(), blockIndex)); + count++; } } } else { - for(int i = countRequestSlot(); i < 6; i++) { + while(countRequestSlot() < MAX_PENDING_REQUEST) { int blockIndex = piece.getMissingUnusedBlockIndex(); if(blockIndex == -1) { break; } - torrentMan->updatePiece(piece); - addMessage(createRequestMessage(blockIndex)); + addMessage(createRequestMessage(piece.getIndex(), blockIndex)); } } + if(countRequestSlot() >= MAX_PENDING_REQUEST) { + break; + } } + updatePiece(); } void PeerInteraction::sendHandshake() { @@ -474,11 +555,22 @@ void PeerInteraction::sendAllowedFast() { } } -Piece& PeerInteraction::getDownloadPiece() { - if(Piece::isNull(piece)) { - throw new DlAbortEx("current piece is null"); +Piece& PeerInteraction::getDownloadPiece(int index) { + for(Pieces::iterator itr = pieces.begin(); itr != pieces.end(); itr++) { + if(itr->getIndex() == index) { + return *itr; + } } - return piece; + throw new DlAbortEx("No such piece index=%d", index); +} + +bool PeerInteraction::hasDownloadPiece(int index) const { + for(Pieces::const_iterator itr = pieces.begin(); itr != pieces.end(); itr++) { + if(itr->getIndex() == index) { + return true; + } + } + return false; } bool PeerInteraction::isInFastSet(int index) const { @@ -497,8 +589,9 @@ void PeerInteraction::setPeerMessageCommonProperty(PeerMessage* peerMessage) { peerMessage->setPeerInteraction(this); } -RequestMessage* PeerInteraction::createRequestMessage(int blockIndex) { +RequestMessage* PeerInteraction::createRequestMessage(int index, int blockIndex) { RequestMessage* msg = new RequestMessage(); + Piece piece = getDownloadPiece(index); msg->setIndex(piece.getIndex()); msg->setBegin(blockIndex*piece.getBlockLength()); msg->setLength(piece.getBlockLength(blockIndex)); diff --git a/src/PeerInteraction.h b/src/PeerInteraction.h index f41c6bce..079a4973 100644 --- a/src/PeerInteraction.h +++ b/src/PeerInteraction.h @@ -48,6 +48,7 @@ typedef deque RequestSlots; typedef deque MessageQueue; +typedef deque Pieces; class PeerInteraction { private: @@ -59,12 +60,12 @@ private: TorrentMan* torrentMan; PeerConnection* peerConnection; Peer* peer; - Piece piece; + Pieces pieces; // allowed fast piece indexes that local client has sent Integers fastSet; const Logger* logger; - void getNewPieceAndSendInterest(); + void getNewPieceAndSendInterest(int pieceNum); PeerMessage* createPeerMessage(const char* msg, int msgLength); HandshakeMessage* createHandshakeMessage(const char* msg, int msgLength); void setPeerMessageCommonProperty(PeerMessage* peerMessage); @@ -81,13 +82,15 @@ public: void rejectPieceMessageInQueue(int index, int begin, int length); void rejectAllPieceMessageInQueue(); void onChoked(); - void abortPiece(); + void abortPiece(Piece& piece); + void abortAllPieces(); bool isSendingMessageInProgress() const; void deleteRequestSlot(const RequestSlot& requestSlot); void deleteTimeoutRequestSlot(); void deleteCompletedRequestSlot(); RequestSlot getCorrespondingRequestSlot(int index, int begin, int length) const; + bool isInRequestSlot(int index, int blockIndex) const; int countMessageInQueue() const; @@ -97,21 +100,17 @@ public: TorrentMan* getTorrentMan() const { return torrentMan; } PeerConnection* getPeerConnection() const { return peerConnection; } // If this object has nullPiece, then return false, otherwise true - bool hasDownloadPiece() const { - return !Piece::isNull(piece); - } + bool hasDownloadPiece(int index) const; // If the piece which this object has is nullPiece, then throws an exception. // So before calling this function, call hasDownloadPiece and make sure // this has valid piece, not nullPiece. - Piece& getDownloadPiece(); - void setDownloadPiece(const Piece& piece) { - this->piece = piece; - } + Piece& getDownloadPiece(int index); bool isInFastSet(int index) const; void addFastSetIndex(int index); void syncPiece(); + void updatePiece(); void addRequests(); void sendMessages(int currentUploadSpeed); void sendHandshake(); @@ -121,7 +120,7 @@ public: PeerMessage* receiveMessage(); HandshakeMessage* receiveHandshake(); - RequestMessage* createRequestMessage(int blockIndex); + RequestMessage* createRequestMessage(int index, int blockIndex); CancelMessage* createCancelMessage(int index, int begin, int length); PieceMessage* createPieceMessage(int index, int begin, int length); HaveMessage* createHaveMessage(int index); diff --git a/src/PeerInteractionCommand.cc b/src/PeerInteractionCommand.cc index 16e32515..e7be9d55 100644 --- a/src/PeerInteractionCommand.cc +++ b/src/PeerInteractionCommand.cc @@ -113,7 +113,7 @@ bool PeerInteractionCommand::executeInternal() { case WIRED: peerInteraction->syncPiece(); decideChoking(); - for(int i = 0; i < 10; i++) { + for(int i = 0; i < 50; i++) { if(!socket->isReadable(0)) { break; } @@ -245,7 +245,7 @@ bool PeerInteractionCommand::prepareForRetry(int wait) { } void PeerInteractionCommand::onAbort(Exception* ex) { - peerInteraction->abortPiece(); + peerInteraction->abortAllPieces(); PeerAbstractCommand::onAbort(ex); } diff --git a/src/Piece.cc b/src/Piece.cc index d4a36972..0c183d34 100644 --- a/src/Piece.cc +++ b/src/Piece.cc @@ -49,6 +49,10 @@ Piece& Piece::operator=(const Piece& piece) { return *this; } +bool Piece::operator==(const Piece& piece) const { + return index == piece.index; +} + void Piece::completeBlock(int blockIndex) { bitfield->setBit(blockIndex); bitfield->unsetUseBit(blockIndex); @@ -79,6 +83,15 @@ int Piece::getMissingUnusedBlockIndex() const { return blockIndex; } +int Piece::getMissingBlockIndex() const { + int blockIndex = bitfield->getMissingIndex(); + if(blockIndex == -1) { + return blockIndex; + } + bitfield->setUseBit(blockIndex); + return blockIndex; +} + BlockIndexes Piece::getAllMissingBlockIndexes() const { return bitfield->getAllMissingIndexes(); } diff --git a/src/Piece.h b/src/Piece.h index 9d77576f..7cf322ce 100644 --- a/src/Piece.h +++ b/src/Piece.h @@ -43,8 +43,10 @@ public: } Piece& operator=(const Piece& piece); + bool operator==(const Piece& piece) const; int getMissingUnusedBlockIndex() const; + int getMissingBlockIndex() const; BlockIndexes getAllMissingBlockIndexes() const; void completeBlock(int blockIndex); void cancelBlock(int blockIndex); diff --git a/src/PieceMessage.cc b/src/PieceMessage.cc index 54de5851..4930204b 100644 --- a/src/PieceMessage.cc +++ b/src/PieceMessage.cc @@ -57,9 +57,11 @@ void PieceMessage::receivedAction() { begin, blockLength); peer->addPeerUpload(blockLength); - if(peerInteraction->hasDownloadPiece() && - !RequestSlot::isNull(slot)) { - Piece& piece = peerInteraction->getDownloadPiece(); + if(!RequestSlot::isNull(slot) && + peerInteraction->hasDownloadPiece(slot.getIndex())) { + //logger->debug("CUID#%d - Latency=%d", cuid, slot.getLatencyInMillis()); + peer->updateLatency(slot.getLatencyInMillis()); + Piece& piece = peerInteraction->getDownloadPiece(slot.getIndex()); long long int offset = ((long long int)index)*torrentMan->pieceLength+begin; logger->debug("CUID#%d - Writing the block length=%d, offset=%lld", @@ -197,7 +199,6 @@ void PieceMessage::onGotNewPiece(Piece& piece) { logger->info(MSG_GOT_NEW_PIECE, cuid, piece.getIndex()); torrentMan->completePiece(piece); torrentMan->advertisePiece(cuid, piece.getIndex()); - piece = Piece::nullPiece; } void PieceMessage::onGotWrongPiece(Piece& piece) { diff --git a/src/RequestSlot.cc b/src/RequestSlot.cc index 7793999e..3cdd7957 100644 --- a/src/RequestSlot.cc +++ b/src/RequestSlot.cc @@ -53,9 +53,13 @@ void RequestSlot::setDispatchedTime() { } bool RequestSlot::isTimeout(int timeoutSec) const { + return getLatencyInMillis() > timeoutSec*1000; +} + +int RequestSlot::getLatencyInMillis() const { struct timeval now; gettimeofday(&now, NULL); - return Util::difftv(now, dispatchedTime) > ((long long int)timeoutSec)*1000000; + return Util::difftv(now, dispatchedTime)/1000; } bool RequestSlot::isNull(const RequestSlot& requestSlot) { diff --git a/src/RequestSlot.h b/src/RequestSlot.h index f4c36d9b..a7e565ea 100644 --- a/src/RequestSlot.h +++ b/src/RequestSlot.h @@ -43,6 +43,7 @@ public: void setDispatchedTime(); bool isTimeout(int timeoutSec) const; + int getLatencyInMillis() const; bool operator==(const RequestSlot& requestSlot) const; diff --git a/src/TorrentMan.cc b/src/TorrentMan.cc index 0cad6e26..c8b1b394 100644 --- a/src/TorrentMan.cc +++ b/src/TorrentMan.cc @@ -145,7 +145,19 @@ bool TorrentMan::isEndGame() const { return bitfield->countMissingBlock() <= END_GAME_PIECE_NUM; } -Piece TorrentMan::getMissingPiece(const Peer* peer) { +int TorrentMan::getMissingPieceIndex(const Peer* peer) const { + int index = -1; + if(isEndGame()) { + index = bitfield->getMissingIndex(peer->getBitfield(), + peer->getBitfieldLength()); + } else { + index = bitfield->getMissingUnusedIndex(peer->getBitfield(), + peer->getBitfieldLength()); + } + return index; +} + +int TorrentMan::getMissingFastPieceIndex(const Peer* peer) const { int index = -1; if(peer->isFastExtensionEnabled() && peer->countFastSet() > 0) { BitfieldMan tempBitfield(pieceLength, totalLength); @@ -163,15 +175,20 @@ Piece TorrentMan::getMissingPiece(const Peer* peer) { tempBitfield.getBitfieldLength()); } } - if(index == -1) { - if(isEndGame()) { - index = bitfield->getMissingIndex(peer->getBitfield(), - peer->getBitfieldLength()); - } else { - index = bitfield->getMissingUnusedIndex(peer->getBitfield(), - peer->getBitfieldLength()); - } - } + return index; +} + +Piece TorrentMan::getMissingFastPiece(const Peer* peer) { + int index = getMissingFastPieceIndex(peer); + return checkOutPiece(index); +} + +Piece TorrentMan::getMissingPiece(const Peer* peer) { + int index = getMissingPieceIndex(peer); + return checkOutPiece(index); +} + +Piece TorrentMan::checkOutPiece(int index) { if(index == -1) { return Piece::nullPiece; } @@ -281,11 +298,10 @@ void TorrentMan::updatePiece(const Piece& piece) { if(Piece::isNull(piece)) { return; } - for(UsedPieces::iterator itr = usedPieces.begin(); itr != usedPieces.end(); itr++) { - if(itr->getIndex() == piece.getIndex()) { - *itr = piece; - break; - } + UsedPieces::iterator itr = find(usedPieces.begin(), usedPieces.end(), + piece); + if(itr != usedPieces.end()) { + *itr = piece; } } @@ -293,17 +309,18 @@ void TorrentMan::syncPiece(Piece& piece) { if(Piece::isNull(piece)) { return; } - for(UsedPieces::iterator itr = usedPieces.begin(); itr != usedPieces.end(); itr++) { - if(itr->getIndex() == piece.getIndex()) { - piece = *itr; - return; + UsedPieces::iterator itr = find(usedPieces.begin(), usedPieces.end(), + piece); + if(itr != usedPieces.end()) { + piece = *itr; + return; + } else { + // hasPiece(piece.getIndex()) is true, then set all bit of + // piece.bitfield to 1 + if(hasPiece(piece.getIndex())) { + piece.setAllBlock(); } } - // hasPiece(piece.getIndex()) is true, then set all bit of - // piece.bitfield to 1 - if(hasPiece(piece.getIndex())) { - piece.setAllBlock(); - } } void TorrentMan::initBitfield() { diff --git a/src/TorrentMan.h b/src/TorrentMan.h index 2a84a45c..8fbdba6a 100644 --- a/src/TorrentMan.h +++ b/src/TorrentMan.h @@ -91,6 +91,7 @@ private: void setFileFilter(const Strings& filePaths); void setupInternal1(const string& metaInfoFile); void setupInternal2(); + Piece checkOutPiece(int index); public: int pieceLength; int pieces; @@ -126,7 +127,10 @@ public: bool isPeerAvailable() const; int deleteOldErrorPeers(int maxNum); + int getMissingPieceIndex(const Peer* peer) const; + int getMissingFastPieceIndex(const Peer* peer) const; Piece getMissingPiece(const Peer* peer); + Piece getMissingFastPiece(const Peer* peer); void completePiece(const Piece& piece); void cancelPiece(const Piece& piece); void updatePiece(const Piece& piece); diff --git a/src/common.h b/src/common.h index 298f136e..81889b82 100644 --- a/src/common.h +++ b/src/common.h @@ -44,7 +44,9 @@ #define USER_AGENT "aria2" -#define BITFIELD_LEN_FROM_PIECES(X) (X/8+(X%8? 1 : 0)) +#define BITFIELD_LEN_FROM_PIECES(X) ((X)/8+((X)%8? 1 : 0)) + +#define DIV_FLOOR(X,Y) ((X)/(Y)+((X)%(Y)? 1:0)) using namespace std;