mirror of https://github.com/aria2/aria2
2006-05-24 Tatsuhiro Tsujikawa <tujikawa at rednoah dot com>
To attempt to handle EINTR: * src/SocketCore.cc (isReadable): Added EINTR handling. (isWritable): Added EINTR handling. To improve the performance: * src/AbstractCommand.cc (isTimeoutDetected): Use Util::difftvsec() instead of Util::difftv(). * src/PeerInteractionCommand.h (receiveMessage): Renamed as receiveMessages(). (receiveMessages): New function. * src/PeerInteractionCommand.cc (executeInternal): receiveMessage loop is moved to new receiveMessages () function. detectMessageFlooding() is moved here. (detectMessageFlooding): Use Util::difftvsec() instead of Util::difftv (). The flooding detection for have message is comment out. (checkLongTimePeerChoking): Use Util::difftvsec() instead of Util::difftv(). (receiveMessage): Renamed as receiveMessages(). (receiveMessages): New function. (keepAlive): Use Util::difftvsec() instead of Util::difftv(). (beforeSocketCheck): Commented out checkLongTimePeerChoking(). * src/SleepCommand.cc (execute): Use Util::difftvsec() instead of Util::difftv(). * src/BitfieldMan.h (getNthBitIndex): New function. (hasMissingPiece): New function. (getAllMissingIndexes): New function. * src/BitfieldMan.cc (countSetBit): Rewritten. (getNthBitIndex): New function. (getMissingIndexRandomly): Rewritten. (hasMissingPiece): New function. (getAllMissingIndexes): New function. * src/TorrentMan.cc (hasMissingPiece): New function. (deleteUsedPiece): Rewritten using STL. * src/PeerInteraction.cc (getNewPieceAndSendInterest): Use TorrentMan::hasMissingPiece(), which is a little bit faster then TorrentMan::getMissingPieceIndex(). (addRequests): Updated the number of pending requests. * src/PeerAbstractCommand.cc (isTimeoutDetected): Use Util::difftvsec() instead of Util::difftv(). (execute): Returns true if TorrentMan::isHalt() is true. Corrected the condition of upload limit checking. * src/Util.h (countBit): New function. * src/Util.cc (nbits): New variable. (countBit): New function. * src/DownloadEngine.h (SockCmdMap): New type definition. (waitData): Added an argument. (addSocket): Added an argument. (addSocketForReadCheck): Added an argument. (addSocketForWriteCheck): Added an argument. * src/DownloadEngine.cc (run): Executes only commands whose sockets are ready to read or write. All commands are executed in every 1 second. (waitData): Calls select() again if it returned EINTR. (addSocket): Saves socket and command object pair to the map. (deleteSocket): Deletes socket and command object pair from the map. (addSocketForReadCheck): Added an argument. (addSocketForWriteCheck): Added an argument. * src/DownloadCommand.cc (executeInternal): Use Util::difftvsec() instead of Util::difftv(). To send "event=stopped" to the tracker when the application finishes: * src/PeerChokeCommand.h (checkPoint): New variable. * src/PeerChokeCommand.cc (PeerChokeCommand): Added the initialization of checkPoint. (execute): The interval check is now done by checkPoint, not SleepCommand. Return true if TorrentMan::isHalt() is true. * src/TorrentMan.h (halt): New function. (hasMissingPiece): New function. (isHalt): New function. (setHalt): New function. * src/TorrentMan.cc (TorrentMan): Added the initialization of halt. * src/TorrentAutoSaveCommand.h (checkPoint): New variable. * src/PeerListenCommand.cc (execute): Returns true if TorrentMan::isHalt() is true. * src/main.cc (setSignalHander): Added flags argument. (torrentHandler): Just calls TorrentMan::setHalt(true) and returns. (main): Set SA_ONESHOT flag of the signal hander of SIGINT and SIGTERM. Removed printDownloadAbortMessage() after torrent downloading loop. * src/TorrentAutoSaveCommand.cc (TorrentAutoSaveCommand): Added the initialization of checkPoint. (execute): Returns true if TorrentMan::isHalt() is true. The interval check is now done by checkPoint, not SleepCommand. * src/TrackerWatcherCommand.cc (execute): If TorrentMan::isHalt(), then create a tracker request with event=stopped. * src/TrackerUpdateCommand.cc (prepareForRetry): Do not use SleepCommand here. (execute): Returns true if TorrentMan::isHalt() is true. Others: * src/TorrentMan.cc (getMissingPieceIndex): Updated log message. * src/PeerInteraction.cc (createPeerMessag): Updated log message.pull/1/head
parent
896ca193e0
commit
0411de9271
118
ChangeLog
118
ChangeLog
|
@ -1,3 +1,117 @@
|
|||
2006-05-24 Tatsuhiro Tsujikawa <tujikawa at rednoah dot com>
|
||||
|
||||
To attempt to handle EINTR:
|
||||
|
||||
* src/SocketCore.cc
|
||||
(isReadable): Added EINTR handling.
|
||||
(isWritable): Added EINTR handling.
|
||||
|
||||
To improve the performance:
|
||||
|
||||
* src/AbstractCommand.cc
|
||||
(isTimeoutDetected): Use Util::difftvsec() instead of Util::difftv().
|
||||
* src/PeerInteractionCommand.h
|
||||
(receiveMessage): Renamed as receiveMessages().
|
||||
(receiveMessages): New function.
|
||||
* src/PeerInteractionCommand.cc
|
||||
(executeInternal): receiveMessage loop is moved to new receiveMessages
|
||||
() function. detectMessageFlooding() is moved here.
|
||||
(detectMessageFlooding): Use Util::difftvsec() instead of Util::difftv
|
||||
(). The flooding detection for have message is comment out.
|
||||
(checkLongTimePeerChoking): Use Util::difftvsec() instead of
|
||||
Util::difftv().
|
||||
(receiveMessage): Renamed as receiveMessages().
|
||||
(receiveMessages): New function.
|
||||
(keepAlive): Use Util::difftvsec() instead of Util::difftv().
|
||||
(beforeSocketCheck): Commented out checkLongTimePeerChoking().
|
||||
* src/SleepCommand.cc
|
||||
(execute): Use Util::difftvsec() instead of Util::difftv().
|
||||
* src/BitfieldMan.h
|
||||
(getNthBitIndex): New function.
|
||||
(hasMissingPiece): New function.
|
||||
(getAllMissingIndexes): New function.
|
||||
* src/BitfieldMan.cc
|
||||
(countSetBit): Rewritten.
|
||||
(getNthBitIndex): New function.
|
||||
(getMissingIndexRandomly): Rewritten.
|
||||
(hasMissingPiece): New function.
|
||||
(getAllMissingIndexes): New function.
|
||||
* src/TorrentMan.cc
|
||||
(hasMissingPiece): New function.
|
||||
(deleteUsedPiece): Rewritten using STL.
|
||||
* src/PeerInteraction.cc
|
||||
(getNewPieceAndSendInterest): Use TorrentMan::hasMissingPiece(), which
|
||||
is a little bit faster then TorrentMan::getMissingPieceIndex().
|
||||
(addRequests): Updated the number of pending requests.
|
||||
* src/PeerAbstractCommand.cc
|
||||
(isTimeoutDetected): Use Util::difftvsec() instead of Util::difftv().
|
||||
(execute): Returns true if TorrentMan::isHalt() is true.
|
||||
Corrected the condition of upload limit checking.
|
||||
* src/Util.h
|
||||
(countBit): New function.
|
||||
* src/Util.cc
|
||||
(nbits): New variable.
|
||||
(countBit): New function.
|
||||
* src/DownloadEngine.h
|
||||
(SockCmdMap): New type definition.
|
||||
(waitData): Added an argument.
|
||||
(addSocket): Added an argument.
|
||||
(addSocketForReadCheck): Added an argument.
|
||||
(addSocketForWriteCheck): Added an argument.
|
||||
* src/DownloadEngine.cc
|
||||
(run): Executes only commands whose sockets are ready to read or write.
|
||||
All commands are executed in every 1 second.
|
||||
(waitData): Calls select() again if it returned
|
||||
EINTR.
|
||||
(addSocket): Saves socket and command object pair to the map.
|
||||
(deleteSocket): Deletes socket and command object pair from the map.
|
||||
(addSocketForReadCheck): Added an argument.
|
||||
(addSocketForWriteCheck): Added an argument.
|
||||
* src/DownloadCommand.cc
|
||||
(executeInternal): Use Util::difftvsec() instead of Util::difftv().
|
||||
|
||||
To send "event=stopped" to the tracker when the application finishes:
|
||||
|
||||
* src/PeerChokeCommand.h
|
||||
(checkPoint): New variable.
|
||||
* src/PeerChokeCommand.cc
|
||||
(PeerChokeCommand): Added the initialization of checkPoint.
|
||||
(execute): The interval check is now done by checkPoint, not
|
||||
SleepCommand. Return true if TorrentMan::isHalt() is true.
|
||||
* src/TorrentMan.h
|
||||
(halt): New function.
|
||||
(hasMissingPiece): New function.
|
||||
(isHalt): New function.
|
||||
(setHalt): New function.
|
||||
* src/TorrentMan.cc
|
||||
(TorrentMan): Added the initialization of halt.
|
||||
* src/TorrentAutoSaveCommand.h
|
||||
(checkPoint): New variable.
|
||||
* src/PeerListenCommand.cc
|
||||
(execute): Returns true if TorrentMan::isHalt() is true.
|
||||
* src/main.cc
|
||||
(setSignalHander): Added flags argument.
|
||||
(torrentHandler): Just calls TorrentMan::setHalt(true) and returns.
|
||||
(main): Set SA_ONESHOT flag of the signal hander of SIGINT and SIGTERM.
|
||||
Removed printDownloadAbortMessage() after torrent downloading loop.
|
||||
* src/TorrentAutoSaveCommand.cc
|
||||
(TorrentAutoSaveCommand): Added the initialization of checkPoint.
|
||||
(execute): Returns true if TorrentMan::isHalt() is true.
|
||||
The interval check is now done by checkPoint, not SleepCommand.
|
||||
* src/TrackerWatcherCommand.cc
|
||||
(execute): If TorrentMan::isHalt(), then create a tracker request with
|
||||
event=stopped.
|
||||
* src/TrackerUpdateCommand.cc
|
||||
(prepareForRetry): Do not use SleepCommand here.
|
||||
(execute): Returns true if TorrentMan::isHalt() is true.
|
||||
|
||||
Others:
|
||||
|
||||
* src/TorrentMan.cc
|
||||
(getMissingPieceIndex): Updated log message.
|
||||
* src/PeerInteraction.cc
|
||||
(createPeerMessag): Updated log message.
|
||||
|
||||
2006-05-20 Tatsuhiro Tsujikawa <tujikawa at rednoah dot com>
|
||||
|
||||
To add the ability to download several pieces in mix in a command and
|
||||
|
@ -1112,7 +1226,7 @@
|
|||
* src/TrackerUpdateCommand.cc: Removed.
|
||||
* src/TrackerWatcherCommand.cc (execute): The construction of request
|
||||
url written in TrackerInitCommand was moved here. Do not create
|
||||
tracker request command if torretnMan->trackers != 0.
|
||||
tracker request command if torrentMan->trackers != 0.
|
||||
* src/CompactTrackerResponseProcessor.h: New class.
|
||||
* src/CompactTrackerResponseProcessor.cc: New class.
|
||||
* src/message.h (MSG_TRACKER_WARNING_MESSAGE): Updated.
|
||||
|
@ -1312,7 +1426,7 @@
|
|||
(finishPartialDownloadingMode): Reset requested flags.
|
||||
(onDownloadComplete): New function.
|
||||
* src/main.cc: Added --direct-file-mapping option.
|
||||
Use TorretMan::setupDiskWriter().
|
||||
Use TorrentMan::setupDiskWriter().
|
||||
* src/TorrentDownloadEngine.cc (afterEachIteration): Use TorrentMan::
|
||||
onDownloadComplete().
|
||||
|
||||
|
|
3
TODO
3
TODO
|
@ -17,5 +17,4 @@
|
|||
* Refacturing HttpConnection and FtpConnection
|
||||
* HTTP/FTP downloading regression test
|
||||
* Fast extension test
|
||||
* send stop to the tracker when download finishes or ctrl-c enters.
|
||||
* fix flooding detection
|
||||
* compile test(OpenSSL, non-ssl, non-torrent)
|
||||
|
|
|
@ -61,8 +61,8 @@ bool AbstractCommand::isTimeoutDetected() {
|
|||
checkPoint = now;
|
||||
return false;
|
||||
} else {
|
||||
long long int elapsed = Util::difftv(now, checkPoint);
|
||||
if(elapsed >= e->option->getAsLLInt(PREF_TIMEOUT)*1000000) {
|
||||
int elapsed = Util::difftvsec(now, checkPoint);
|
||||
if(elapsed >= e->option->getAsInt(PREF_TIMEOUT)) {
|
||||
return true;
|
||||
} else {
|
||||
return false;
|
||||
|
@ -145,11 +145,11 @@ void AbstractCommand::setReadCheckSocket(Socket* socket) {
|
|||
if(checkSocketIsReadable) {
|
||||
if(readCheckTarget != socket) {
|
||||
e->deleteSocketForReadCheck(readCheckTarget);
|
||||
e->addSocketForReadCheck(socket);
|
||||
e->addSocketForReadCheck(socket, this);
|
||||
readCheckTarget = socket;
|
||||
}
|
||||
} else {
|
||||
e->addSocketForReadCheck(socket);
|
||||
e->addSocketForReadCheck(socket, this);
|
||||
checkSocketIsReadable = true;
|
||||
readCheckTarget = socket;
|
||||
}
|
||||
|
@ -167,11 +167,11 @@ void AbstractCommand::setWriteCheckSocket(Socket* socket) {
|
|||
if(checkSocketIsWritable) {
|
||||
if(writeCheckTarget != socket) {
|
||||
e->deleteSocketForWriteCheck(writeCheckTarget);
|
||||
e->addSocketForWriteCheck(socket);
|
||||
e->addSocketForWriteCheck(socket, this);
|
||||
writeCheckTarget = socket;
|
||||
}
|
||||
} else {
|
||||
e->addSocketForWriteCheck(socket);
|
||||
e->addSocketForWriteCheck(socket, this);
|
||||
checkSocketIsWritable = true;
|
||||
writeCheckTarget = socket;
|
||||
}
|
||||
|
|
|
@ -20,6 +20,7 @@
|
|||
*/
|
||||
/* copyright --> */
|
||||
#include "BitfieldMan.h"
|
||||
#include "Util.h"
|
||||
#include <string.h>
|
||||
|
||||
BitfieldMan::BitfieldMan(int blockLength, long long int totalLength)
|
||||
|
@ -92,24 +93,21 @@ BitfieldMan& BitfieldMan::operator=(const BitfieldMan& bitfieldMan) {
|
|||
|
||||
int BitfieldMan::countSetBit(const unsigned char* bitfield, int len) const {
|
||||
int count = 0;
|
||||
for(int i = 0; i < len; i++) {
|
||||
unsigned char bit = bitfield[i];
|
||||
for(int bs = 7; bs >= 0 && i*8+7-bs < blocks; bs--) {
|
||||
unsigned char mask = 1 << bs;
|
||||
if(bit & mask) {
|
||||
count++;
|
||||
}
|
||||
}
|
||||
int size = sizeof(unsigned int);
|
||||
for(int i = 0; i < len/size; i++) {
|
||||
count += Util::countBit(*(unsigned int*)&bitfield[i*size]);
|
||||
}
|
||||
for(int i = len-len%size; i < len; i++) {
|
||||
count += Util::countBit((unsigned int)bitfield[i]);
|
||||
}
|
||||
return count;
|
||||
}
|
||||
|
||||
int BitfieldMan::getMissingIndexRandomly(const unsigned char* bitfield, int len, int randMax) const {
|
||||
int BitfieldMan::getNthBitIndex(const unsigned char* bitfield, int len, int nth) const {
|
||||
int index = -1;
|
||||
int nth = 1+(int)(((double)randMax)*random()/(RAND_MAX+1.0));
|
||||
for(int i = 0; i < len && index == -1; i++) {
|
||||
unsigned char bit = bitfield[i];
|
||||
for(int bs = 7; bs >= 0 && i*8+7-bs < blocks; bs--) {
|
||||
for(int bs = 7; bs >= 0; bs--) {
|
||||
unsigned char mask = 1 << bs;
|
||||
if(bit & mask) {
|
||||
nth--;
|
||||
|
@ -123,6 +121,61 @@ int BitfieldMan::getMissingIndexRandomly(const unsigned char* bitfield, int len,
|
|||
return index;
|
||||
}
|
||||
|
||||
int BitfieldMan::getMissingIndexRandomly(const unsigned char* bitfield, int len, int randMax) const {
|
||||
int nth = 1+(int)(((double)randMax)*random()/(RAND_MAX+1.0));
|
||||
|
||||
int count = 0;
|
||||
int size = sizeof(unsigned int);
|
||||
for(int i = 0; i < len/size; i++) {
|
||||
int temp = Util::countBit(*(unsigned int*)&bitfield[i*size]);
|
||||
if(nth <= count+temp) {
|
||||
int t = i*size*8+getNthBitIndex(&bitfield[i*size], size, nth-count);
|
||||
return t;
|
||||
} else {
|
||||
count += temp;
|
||||
}
|
||||
}
|
||||
for(int i = len-len%size; i < len; i++) {
|
||||
int temp = Util::countBit((unsigned int)bitfield[i]);
|
||||
if(nth <= count+temp) {
|
||||
int t = i*8+getNthBitIndex(&bitfield[i], 1, nth-count);
|
||||
return t;
|
||||
} else {
|
||||
count += temp;
|
||||
}
|
||||
}
|
||||
return -1;
|
||||
}
|
||||
|
||||
bool BitfieldMan::hasMissingPiece(const unsigned char* peerBitfield, int length) const {
|
||||
if(bitfieldLength != length) {
|
||||
return false;
|
||||
}
|
||||
unsigned char* tempBitfield = new unsigned char[bitfieldLength];
|
||||
for(int i = 0; i < bitfieldLength; i++) {
|
||||
tempBitfield[i] = peerBitfield[i] & ~bitfield[i];
|
||||
if(filterEnabled) {
|
||||
tempBitfield[i] &= filterBitfield[i];
|
||||
}
|
||||
}
|
||||
bool retval = false;
|
||||
int size = sizeof(unsigned int);
|
||||
for(int i = 0; i < length/size; i++) {
|
||||
if(Util::countBit(*(unsigned int*)&tempBitfield[i*size]) > 0) {
|
||||
retval = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
for(int i = length-length%size; i < length && retval == false; i++) {
|
||||
if(Util::countBit((unsigned int)tempBitfield[i]) > 0) {
|
||||
retval = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
delete [] tempBitfield;
|
||||
return retval;
|
||||
}
|
||||
|
||||
int BitfieldMan::getMissingIndex(const unsigned char* peerBitfield, int length) const {
|
||||
if(bitfieldLength != length) {
|
||||
return -1;
|
||||
|
@ -223,6 +276,26 @@ BlockIndexes BitfieldMan::getAllMissingIndexes() const {
|
|||
return missingIndexes;
|
||||
}
|
||||
|
||||
BlockIndexes BitfieldMan::getAllMissingIndexes(const unsigned char* peerBitfield, int peerBitfieldLength) const {
|
||||
BlockIndexes missingIndexes;
|
||||
if(bitfieldLength != peerBitfieldLength) {
|
||||
return missingIndexes;
|
||||
}
|
||||
for(int i = 0; i < bitfieldLength; i++) {
|
||||
unsigned char bit = peerBitfield[i] & ~bitfield[i];
|
||||
if(filterEnabled) {
|
||||
bit &= filterBitfield[i];
|
||||
}
|
||||
for(int bs = 7; bs >= 0 && i*8+7-bs < blocks; bs--) {
|
||||
unsigned char mask = 1 << bs;
|
||||
if(bit & mask) {
|
||||
missingIndexes.push_back(i*8+7-bs);
|
||||
}
|
||||
}
|
||||
}
|
||||
return missingIndexes;
|
||||
}
|
||||
|
||||
int BitfieldMan::countMissingBlock() const {
|
||||
if(filterEnabled) {
|
||||
unsigned char* temp = new unsigned char[bitfieldLength];
|
||||
|
|
|
@ -38,6 +38,7 @@ private:
|
|||
int blocks;
|
||||
bool filterEnabled;
|
||||
int countSetBit(const unsigned char* bitfield, int len) const;
|
||||
int getNthBitIndex(const unsigned char* bitfield, int len, int nth) const;
|
||||
int getMissingIndexRandomly(const unsigned char* bitfield, int len, int randMax) const;
|
||||
bool isBitSetInternal(const unsigned char* bitfield, int index) const;
|
||||
bool setBitInternal(unsigned char* bitfield, int index, bool on);
|
||||
|
@ -64,6 +65,10 @@ public:
|
|||
}
|
||||
long long int getTotalLength() const { return totalLength; }
|
||||
|
||||
/**
|
||||
* affected by filter
|
||||
*/
|
||||
bool hasMissingPiece(const unsigned char* bitfield, int len) const;
|
||||
/**
|
||||
* affected by filter
|
||||
*/
|
||||
|
@ -88,6 +93,10 @@ public:
|
|||
* affected by filter
|
||||
*/
|
||||
BlockIndexes getAllMissingIndexes() const;
|
||||
/**
|
||||
* affected by filter
|
||||
*/
|
||||
BlockIndexes getAllMissingIndexes(const unsigned char* bitfield, int len) const;
|
||||
/**
|
||||
* affected by filter
|
||||
*/
|
||||
|
|
|
@ -60,9 +60,9 @@ bool DownloadCommand::executeInternal(Segment seg) {
|
|||
sw = now;
|
||||
lastSize = seg.ds;
|
||||
} else {
|
||||
long long int diff = Util::difftv(now, sw);
|
||||
if(diff >= 1000000) {
|
||||
seg.speed = (int)((seg.ds-lastSize)/(diff/1000000.0));
|
||||
int diff = Util::difftvsec(now, sw);
|
||||
if(diff >= 1) {
|
||||
seg.speed = (int)((seg.ds-lastSize)/(diff*1.0));
|
||||
sw = now;
|
||||
lastSize = seg.ds;
|
||||
}
|
||||
|
|
|
@ -26,6 +26,7 @@
|
|||
#include <sys/types.h>
|
||||
#include <sys/stat.h>
|
||||
#include <fcntl.h>
|
||||
#include <errno.h>
|
||||
#include <algorithm>
|
||||
|
||||
using namespace std;
|
||||
|
@ -47,19 +48,41 @@ void DownloadEngine::cleanQueue() {
|
|||
|
||||
void DownloadEngine::run() {
|
||||
initStatistics();
|
||||
struct timeval cp;
|
||||
cp.tv_sec = 0;
|
||||
cp.tv_usec = 0;
|
||||
Sockets activeSockets;
|
||||
while(!commands.empty()) {
|
||||
int max = commands.size();
|
||||
for(int i = 0; i < max; i++) {
|
||||
Command* com = commands.front();
|
||||
commands.pop_front();
|
||||
if(com->execute()) {
|
||||
delete(com);
|
||||
struct timeval now;
|
||||
gettimeofday(&now, NULL);
|
||||
if(Util::difftvsec(now, cp) >= 1) {
|
||||
cp = now;
|
||||
int max = commands.size();
|
||||
for(int i = 0; i < max; i++) {
|
||||
Command* com = commands.front();
|
||||
commands.pop_front();
|
||||
if(com->execute()) {
|
||||
delete(com);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
for(Sockets::iterator itr = activeSockets.begin();
|
||||
itr != activeSockets.end(); itr++) {
|
||||
Socket* socket = *itr;
|
||||
SockCmdMap::iterator mapItr = sockCmdMap.find(socket);
|
||||
if(mapItr != sockCmdMap.end()) {
|
||||
Command* com = (*mapItr).second;
|
||||
commands.erase(remove(commands.begin(), commands.end(), com));
|
||||
if(com->execute()) {
|
||||
delete(com);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
afterEachIteration();
|
||||
//shortSleep();
|
||||
if(!noWait && !commands.empty()) {
|
||||
waitData();
|
||||
waitData(activeSockets);
|
||||
}
|
||||
noWait = false;
|
||||
calculateStatistics();
|
||||
|
@ -76,39 +99,60 @@ void DownloadEngine::shortSleep() const {
|
|||
select(0, &rfds, NULL, NULL, &tv);
|
||||
}
|
||||
|
||||
void DownloadEngine::waitData() {
|
||||
void DownloadEngine::waitData(Sockets& activeSockets) {
|
||||
activeSockets.clear();
|
||||
fd_set rfds;
|
||||
fd_set wfds;
|
||||
struct timeval tv;
|
||||
int retval;
|
||||
int retval = 0;
|
||||
while(1) {
|
||||
struct timeval tv;
|
||||
|
||||
FD_ZERO(&rfds);
|
||||
FD_ZERO(&wfds);
|
||||
int max = 0;
|
||||
for(Sockets::iterator itr = rsockets.begin(); itr != rsockets.end(); itr++) {
|
||||
FD_SET((*itr)->getSockfd(), &rfds);
|
||||
if(max < (*itr)->getSockfd()) {
|
||||
max = (*itr)->getSockfd();
|
||||
FD_ZERO(&rfds);
|
||||
FD_ZERO(&wfds);
|
||||
int max = 0;
|
||||
for(Sockets::iterator itr = rsockets.begin(); itr != rsockets.end(); itr++) {
|
||||
FD_SET((*itr)->getSockfd(), &rfds);
|
||||
if(max < (*itr)->getSockfd()) {
|
||||
max = (*itr)->getSockfd();
|
||||
}
|
||||
}
|
||||
for(Sockets::iterator itr = wsockets.begin(); itr != wsockets.end(); itr++) {
|
||||
FD_SET((*itr)->getSockfd(), &wfds);
|
||||
if(max < (*itr)->getSockfd()) {
|
||||
max = (*itr)->getSockfd();
|
||||
}
|
||||
}
|
||||
tv.tv_sec = 1;
|
||||
tv.tv_usec = 0;
|
||||
retval = select(max+1, &rfds, &wfds, NULL, &tv);
|
||||
if(retval != -1 || errno != EINTR) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
for(Sockets::iterator itr = wsockets.begin(); itr != wsockets.end(); itr++) {
|
||||
FD_SET((*itr)->getSockfd(), &wfds);
|
||||
if(max < (*itr)->getSockfd()) {
|
||||
max = (*itr)->getSockfd();
|
||||
if(retval > 0) {
|
||||
for(Sockets::iterator itr = rsockets.begin(); itr != rsockets.end(); itr++) {
|
||||
if(FD_ISSET((*itr)->getSockfd(), &rfds)) {
|
||||
activeSockets.push_back(*itr);
|
||||
}
|
||||
}
|
||||
for(Sockets::iterator itr = wsockets.begin(); itr != wsockets.end(); itr++) {
|
||||
if(FD_ISSET((*itr)->getSockfd(), &wfds)) {
|
||||
activeSockets.push_back(*itr);
|
||||
}
|
||||
}
|
||||
sort(activeSockets.begin(), activeSockets.end());
|
||||
activeSockets.erase(unique(activeSockets.begin(), activeSockets.end()), activeSockets.end());
|
||||
}
|
||||
tv.tv_sec = 1;
|
||||
tv.tv_usec = 0;
|
||||
|
||||
retval = select(max+1, &rfds, &wfds, NULL, &tv);
|
||||
}
|
||||
|
||||
bool DownloadEngine::addSocket(Sockets& sockets, Socket* socket) {
|
||||
bool DownloadEngine::addSocket(Sockets& sockets, Socket* socket, Command* command) {
|
||||
Sockets::iterator itr = find(sockets.begin(),
|
||||
sockets.end(),
|
||||
socket);
|
||||
if(itr == sockets.end()) {
|
||||
sockets.push_back(socket);
|
||||
SockCmdMap::value_type vt(socket, command);
|
||||
sockCmdMap.insert(vt);
|
||||
return true;
|
||||
} else {
|
||||
return false;
|
||||
|
@ -121,22 +165,26 @@ bool DownloadEngine::deleteSocket(Sockets& sockets, Socket* socket) {
|
|||
socket);
|
||||
if(itr != sockets.end()) {
|
||||
sockets.erase(itr);
|
||||
SockCmdMap::iterator mapItr = sockCmdMap.find(socket);
|
||||
if(mapItr != sockCmdMap.end()) {
|
||||
sockCmdMap.erase(mapItr);
|
||||
}
|
||||
return true;
|
||||
} else {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
bool DownloadEngine::addSocketForReadCheck(Socket* socket) {
|
||||
return addSocket(rsockets, socket);
|
||||
bool DownloadEngine::addSocketForReadCheck(Socket* socket, Command* command) {
|
||||
return addSocket(rsockets, socket, command);
|
||||
}
|
||||
|
||||
bool DownloadEngine::deleteSocketForReadCheck(Socket* socket) {
|
||||
return deleteSocket(rsockets , socket);
|
||||
}
|
||||
|
||||
bool DownloadEngine::addSocketForWriteCheck(Socket* socket) {
|
||||
return addSocket(wsockets, socket);
|
||||
bool DownloadEngine::addSocketForWriteCheck(Socket* socket, Command* command) {
|
||||
return addSocket(wsockets, socket, command);
|
||||
}
|
||||
|
||||
bool DownloadEngine::deleteSocketForWriteCheck(Socket* socket) {
|
||||
|
|
|
@ -33,15 +33,17 @@
|
|||
|
||||
typedef deque<Socket*> Sockets;
|
||||
typedef deque<Command*> Commands;
|
||||
typedef multimap<Socket*, Command*> SockCmdMap;
|
||||
|
||||
class DownloadEngine {
|
||||
private:
|
||||
void waitData();
|
||||
void waitData(Sockets& activeSockets);
|
||||
Sockets rsockets;
|
||||
Sockets wsockets;
|
||||
SockCmdMap sockCmdMap;
|
||||
|
||||
void shortSleep() const;
|
||||
bool addSocket(Sockets& sockets, Socket* socket);
|
||||
bool addSocket(Sockets& sockets, Socket* socket, Command* command);
|
||||
bool deleteSocket(Sockets& sockets, Socket* socket);
|
||||
protected:
|
||||
const Logger* logger;
|
||||
|
@ -62,9 +64,9 @@ public:
|
|||
|
||||
void cleanQueue();
|
||||
|
||||
bool addSocketForReadCheck(Socket* socket);
|
||||
bool addSocketForReadCheck(Socket* socket, Command* command);
|
||||
bool deleteSocketForReadCheck(Socket* socket);
|
||||
bool addSocketForWriteCheck(Socket* socket);
|
||||
bool addSocketForWriteCheck(Socket* socket, Command* command);
|
||||
bool deleteSocketForWriteCheck(Socket* socket);
|
||||
|
||||
};
|
||||
|
|
|
@ -64,8 +64,8 @@ bool PeerAbstractCommand::isTimeoutDetected() {
|
|||
checkPoint = now;
|
||||
return false;
|
||||
} else {
|
||||
long long int elapsed = Util::difftv(now, checkPoint);
|
||||
if(elapsed >= ((long long int)timeout)*1000000) {
|
||||
int elapsed = Util::difftvsec(now, checkPoint);
|
||||
if(elapsed >= timeout) {
|
||||
return true;
|
||||
} else {
|
||||
return false;
|
||||
|
@ -74,9 +74,13 @@ bool PeerAbstractCommand::isTimeoutDetected() {
|
|||
}
|
||||
|
||||
bool PeerAbstractCommand::execute() {
|
||||
if(e->torrentMan->isHalt()) {
|
||||
return true;
|
||||
}
|
||||
try {
|
||||
beforeSocketCheck();
|
||||
if(uploadLimitCheck && e->getUploadSpeed() <= uploadLimit*1024 ||
|
||||
if(uploadLimitCheck && (uploadLimit == 0 ||
|
||||
e->getUploadSpeed() <= uploadLimit*1024) ||
|
||||
checkSocketIsReadable && readCheckTarget->isReadable(0) ||
|
||||
checkSocketIsWritable && writeCheckTarget->isWritable(0) ||
|
||||
!checkSocketIsReadable && !checkSocketIsWritable) {
|
||||
|
@ -130,11 +134,11 @@ void PeerAbstractCommand::setReadCheckSocket(Socket* socket) {
|
|||
if(checkSocketIsReadable) {
|
||||
if(readCheckTarget != socket) {
|
||||
e->deleteSocketForReadCheck(readCheckTarget);
|
||||
e->addSocketForReadCheck(socket);
|
||||
e->addSocketForReadCheck(socket, this);
|
||||
readCheckTarget = socket;
|
||||
}
|
||||
} else {
|
||||
e->addSocketForReadCheck(socket);
|
||||
e->addSocketForReadCheck(socket, this);
|
||||
checkSocketIsReadable = true;
|
||||
readCheckTarget = socket;
|
||||
}
|
||||
|
@ -152,11 +156,11 @@ void PeerAbstractCommand::setWriteCheckSocket(Socket* socket) {
|
|||
if(checkSocketIsWritable) {
|
||||
if(writeCheckTarget != socket) {
|
||||
e->deleteSocketForWriteCheck(writeCheckTarget);
|
||||
e->addSocketForWriteCheck(socket);
|
||||
e->addSocketForWriteCheck(socket, this);
|
||||
writeCheckTarget = socket;
|
||||
}
|
||||
} else {
|
||||
e->addSocketForWriteCheck(socket);
|
||||
e->addSocketForWriteCheck(socket, this);
|
||||
checkSocketIsWritable = true;
|
||||
writeCheckTarget = socket;
|
||||
}
|
||||
|
|
|
@ -20,10 +20,12 @@
|
|||
*/
|
||||
/* copyright --> */
|
||||
#include "PeerChokeCommand.h"
|
||||
#include "SleepCommand.h"
|
||||
#include "Util.h"
|
||||
|
||||
PeerChokeCommand::PeerChokeCommand(int cuid, int interval, TorrentDownloadEngine* e):Command(cuid), interval(interval), e(e), rotate(0) {}
|
||||
PeerChokeCommand::PeerChokeCommand(int cuid, int interval, TorrentDownloadEngine* e):Command(cuid), interval(interval), e(e), rotate(0) {
|
||||
checkPoint.tv_sec = 0;
|
||||
checkPoint.tv_usec = 0;
|
||||
}
|
||||
|
||||
PeerChokeCommand::~PeerChokeCommand() {}
|
||||
|
||||
|
@ -91,47 +93,52 @@ void PeerChokeCommand::orderByDownloadRate(Peers& peers) const {
|
|||
}
|
||||
|
||||
bool PeerChokeCommand::execute() {
|
||||
Peers peers = e->torrentMan->getActivePeers();
|
||||
setAllPeerChoked(peers);
|
||||
if(e->torrentMan->downloadComplete()) {
|
||||
orderByDownloadRate(peers);
|
||||
} else {
|
||||
orderByUploadRate(peers);
|
||||
if(e->torrentMan->isHalt()) {
|
||||
return true;
|
||||
}
|
||||
int unchokingCount = peers.size() >= 4 ? 4 : peers.size();
|
||||
for(Peers::iterator itr = peers.begin(); unchokingCount > 0 && itr != peers.end(); ) {
|
||||
Peer* peer = *itr;
|
||||
if(peer->peerInterested) {
|
||||
peer->chokingRequired = false;
|
||||
peer->optUnchoking = false;
|
||||
itr = peers.erase(itr);
|
||||
unchokingCount--;
|
||||
logger->debug("cat01, unchoking %s, delta=%d", peer->ipaddr.c_str(), peer->getDeltaUpload());
|
||||
struct timeval now;
|
||||
gettimeofday(&now, NULL);
|
||||
if(Util::difftvsec(now, checkPoint) >= interval) {
|
||||
checkPoint = now;
|
||||
Peers peers = e->torrentMan->getActivePeers();
|
||||
setAllPeerChoked(peers);
|
||||
if(e->torrentMan->downloadComplete()) {
|
||||
orderByDownloadRate(peers);
|
||||
} else {
|
||||
itr++;
|
||||
orderByUploadRate(peers);
|
||||
}
|
||||
}
|
||||
for(Peers::iterator itr = peers.begin(); itr != peers.end(); ) {
|
||||
Peer* peer = *itr;
|
||||
if(!peer->peerInterested) {
|
||||
peer->chokingRequired = false;
|
||||
peer->optUnchoking = false;
|
||||
itr = peers.erase(itr);
|
||||
logger->debug("cat02, unchoking %s, delta=%d", peer->ipaddr.c_str(), peer->getDeltaUpload());
|
||||
break;
|
||||
} else {
|
||||
itr++;
|
||||
int unchokingCount = peers.size() >= 4 ? 4 : peers.size();
|
||||
for(Peers::iterator itr = peers.begin(); unchokingCount > 0 && itr != peers.end(); ) {
|
||||
Peer* peer = *itr;
|
||||
if(peer->peerInterested) {
|
||||
peer->chokingRequired = false;
|
||||
peer->optUnchoking = false;
|
||||
itr = peers.erase(itr);
|
||||
unchokingCount--;
|
||||
logger->debug("cat01, unchoking %s, delta=%d", peer->ipaddr.c_str(), peer->getDeltaUpload());
|
||||
} else {
|
||||
itr++;
|
||||
}
|
||||
}
|
||||
for(Peers::iterator itr = peers.begin(); itr != peers.end(); ) {
|
||||
Peer* peer = *itr;
|
||||
if(!peer->peerInterested) {
|
||||
peer->chokingRequired = false;
|
||||
peer->optUnchoking = false;
|
||||
itr = peers.erase(itr);
|
||||
logger->debug("cat02, unchoking %s, delta=%d", peer->ipaddr.c_str(), peer->getDeltaUpload());
|
||||
break;
|
||||
} else {
|
||||
itr++;
|
||||
}
|
||||
}
|
||||
if(rotate%3 == 0) {
|
||||
optUnchokingPeer(peers);
|
||||
rotate = 0;
|
||||
}
|
||||
rotate++;
|
||||
setAllPeerResetDelta(e->torrentMan->getActivePeers());
|
||||
}
|
||||
if(rotate%3 == 0) {
|
||||
optUnchokingPeer(peers);
|
||||
rotate = 0;
|
||||
}
|
||||
rotate++;
|
||||
setAllPeerResetDelta(e->torrentMan->getActivePeers());
|
||||
|
||||
SleepCommand* command = new SleepCommand(cuid, e, this, interval);
|
||||
e->commands.push_back(command);
|
||||
|
||||
e->commands.push_back(this);
|
||||
return false;
|
||||
}
|
||||
|
|
|
@ -30,6 +30,7 @@ private:
|
|||
int interval;
|
||||
TorrentDownloadEngine* e;
|
||||
int rotate;
|
||||
struct timeval checkPoint;
|
||||
|
||||
void orderByUploadRate(Peers& peers) const;
|
||||
void orderByDownloadRate(Peers& peers) const;
|
||||
|
|
|
@ -401,7 +401,7 @@ PeerMessage* PeerInteraction::createPeerMessage(const char* msg, int msgLength)
|
|||
((AllowedFastMessage*)peerMessage)->setPieces(torrentMan->pieces);
|
||||
break;
|
||||
default:
|
||||
throw new DlAbortEx("invalid message id. id = %d", id);
|
||||
throw new DlAbortEx("Invalid message id. id = %d", id);
|
||||
}
|
||||
}
|
||||
setPeerMessageCommonProperty(peerMessage);
|
||||
|
@ -422,8 +422,7 @@ void PeerInteraction::updatePiece() {
|
|||
}
|
||||
|
||||
void PeerInteraction::getNewPieceAndSendInterest(int pieceNum) {
|
||||
int index = torrentMan->getMissingPieceIndex(peer);
|
||||
if(pieces.empty() && index == -1) {
|
||||
if(pieces.empty() && !torrentMan->hasMissingPiece(peer)) {
|
||||
if(peer->amInterested) {
|
||||
logger->debug("CUID#%d - Not interested in the peer", cuid);
|
||||
addMessage(createNotInterestedMessage());
|
||||
|
@ -470,11 +469,9 @@ void PeerInteraction::addRequests() {
|
|||
}
|
||||
}
|
||||
int MAX_PENDING_REQUEST;
|
||||
if(peer->getLatency() < 300) {
|
||||
if(peer->getLatency() < 900) {
|
||||
MAX_PENDING_REQUEST = 24;
|
||||
} else if(peer->getLatency() < 600) {
|
||||
MAX_PENDING_REQUEST = 18;
|
||||
} else if(peer->getLatency() < 1000) {
|
||||
} else if(peer->getLatency() < 1500) {
|
||||
MAX_PENDING_REQUEST = 12;
|
||||
} else {
|
||||
MAX_PENDING_REQUEST = 6;
|
||||
|
|
|
@ -113,12 +113,10 @@ bool PeerInteractionCommand::executeInternal() {
|
|||
case WIRED:
|
||||
peerInteraction->syncPiece();
|
||||
decideChoking();
|
||||
for(int i = 0; i < 50; i++) {
|
||||
if(!socket->isReadable(0)) {
|
||||
break;
|
||||
}
|
||||
receiveMessage();
|
||||
}
|
||||
receiveMessages();
|
||||
detectMessageFlooding();
|
||||
//checkLongTimePeerChoking();
|
||||
|
||||
peerInteraction->deleteTimeoutRequestSlot();
|
||||
peerInteraction->deleteCompletedRequestSlot();
|
||||
peerInteraction->addRequests();
|
||||
|
@ -142,11 +140,12 @@ void PeerInteractionCommand::detectMessageFlooding() {
|
|||
if(freqCheckPoint.tv_sec == 0 && freqCheckPoint.tv_usec == 0) {
|
||||
freqCheckPoint = now;
|
||||
} else {
|
||||
if(Util::difftv(now, freqCheckPoint) >= 5*1000000) {
|
||||
if(chokeUnchokeCount*1.0/(Util::difftv(now, freqCheckPoint)/1000000) >= 0.4
|
||||
|| haveCount*1.0/(Util::difftv(now, freqCheckPoint)/1000000) >= 20.0
|
||||
|| keepAliveCount*1.0/(Util::difftv(now, freqCheckPoint)/1000000) >= 1.0) {
|
||||
throw new DlAbortEx("flooding detected.");
|
||||
int elapsed = Util::difftvsec(now, freqCheckPoint);
|
||||
if(elapsed >= 5) {
|
||||
if(chokeUnchokeCount*1.0/elapsed >= 0.4
|
||||
//|| haveCount*1.0/elapsed >= 20.0
|
||||
|| keepAliveCount*1.0/elapsed >= 1.0) {
|
||||
throw new DlAbortEx("Flooding detected.");
|
||||
} else {
|
||||
chokeUnchokeCount = 0;
|
||||
haveCount = 0;
|
||||
|
@ -169,8 +168,8 @@ void PeerInteractionCommand::checkLongTimePeerChoking() {
|
|||
}
|
||||
} else {
|
||||
if(peer->amInterested && peer->peerChoking) {
|
||||
if(Util::difftv(now, chokeCheckPoint) >= MAX_PEER_CHOKING_INTERVAL*1000000) {
|
||||
throw new DlAbortEx("too long choking");
|
||||
if(Util::difftvsec(now, chokeCheckPoint) >= MAX_PEER_CHOKING_INTERVAL) {
|
||||
throw new DlAbortEx("Too long choking.");
|
||||
}
|
||||
} else {
|
||||
chokeCheckPoint.tv_sec = 0;
|
||||
|
@ -191,39 +190,41 @@ void PeerInteractionCommand::decideChoking() {
|
|||
}
|
||||
}
|
||||
|
||||
void PeerInteractionCommand::receiveMessage() {
|
||||
PeerMessage* message = peerInteraction->receiveMessage();
|
||||
if(message == NULL) {
|
||||
return;
|
||||
}
|
||||
logger->info(MSG_RECEIVE_PEER_MESSAGE, cuid,
|
||||
peer->ipaddr.c_str(), peer->port,
|
||||
message->toString().c_str());
|
||||
// to detect flooding
|
||||
switch(message->getId()) {
|
||||
case KeepAliveMessage::ID:
|
||||
keepAliveCount++;
|
||||
break;
|
||||
case ChokeMessage::ID:
|
||||
if(!peer->peerChoking) {
|
||||
chokeUnchokeCount++;
|
||||
void PeerInteractionCommand::receiveMessages() {
|
||||
for(int i = 0; i < 50; i++) {
|
||||
PeerMessage* message = peerInteraction->receiveMessage();
|
||||
if(message == NULL) {
|
||||
return;
|
||||
}
|
||||
break;
|
||||
case UnchokeMessage::ID:
|
||||
if(peer->peerChoking) {
|
||||
chokeUnchokeCount++;
|
||||
logger->info(MSG_RECEIVE_PEER_MESSAGE, cuid,
|
||||
peer->ipaddr.c_str(), peer->port,
|
||||
message->toString().c_str());
|
||||
// to detect flooding
|
||||
switch(message->getId()) {
|
||||
case KeepAliveMessage::ID:
|
||||
keepAliveCount++;
|
||||
break;
|
||||
case ChokeMessage::ID:
|
||||
if(!peer->peerChoking) {
|
||||
chokeUnchokeCount++;
|
||||
}
|
||||
break;
|
||||
case UnchokeMessage::ID:
|
||||
if(peer->peerChoking) {
|
||||
chokeUnchokeCount++;
|
||||
}
|
||||
break;
|
||||
case HaveMessage::ID:
|
||||
haveCount++;
|
||||
break;
|
||||
}
|
||||
try {
|
||||
message->receivedAction();
|
||||
delete message;
|
||||
} catch(Exception* ex) {
|
||||
delete message;
|
||||
throw;
|
||||
}
|
||||
break;
|
||||
case HaveMessage::ID:
|
||||
haveCount++;
|
||||
break;
|
||||
}
|
||||
try {
|
||||
message->receivedAction();
|
||||
delete message;
|
||||
} catch(Exception* ex) {
|
||||
delete message;
|
||||
throw;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -255,7 +256,7 @@ void PeerInteractionCommand::keepAlive() {
|
|||
} else {
|
||||
struct timeval now;
|
||||
gettimeofday(&now, NULL);
|
||||
if(Util::difftv(now, keepAliveCheckPoint) >= (long long int)120*1000000) {
|
||||
if(Util::difftvsec(now, keepAliveCheckPoint) >= 120) {
|
||||
if(peerInteraction->countMessageInQueue() == 0) {
|
||||
peerInteraction->addMessage(peerInteraction->createKeepAliveMessage());
|
||||
peerInteraction->sendMessages(e->getUploadSpeed());
|
||||
|
@ -269,8 +270,7 @@ void PeerInteractionCommand::beforeSocketCheck() {
|
|||
if(sequence == WIRED) {
|
||||
e->torrentMan->unadvertisePiece(cuid);
|
||||
detectMessageFlooding();
|
||||
checkLongTimePeerChoking();
|
||||
|
||||
//checkLongTimePeerChoking();
|
||||
PieceIndexes indexes = e->torrentMan->getAdvertisedPieceIndexes(cuid);
|
||||
if(indexes.size() >= 20) {
|
||||
if(peer->isFastExtensionEnabled()) {
|
||||
|
|
|
@ -41,7 +41,7 @@ private:
|
|||
int chokeUnchokeCount;
|
||||
int haveCount;
|
||||
int keepAliveCount;
|
||||
void receiveMessage();
|
||||
void receiveMessages();
|
||||
void detectMessageFlooding();
|
||||
void checkLongTimePeerChoking();
|
||||
void detectTimeoutAndDuplicateBlock();
|
||||
|
|
|
@ -54,39 +54,47 @@ int PeerListenCommand::bindPort(int portRangeStart, int portRangeEnd) {
|
|||
}
|
||||
|
||||
bool PeerListenCommand::execute() {
|
||||
for(int i = 0; i < 3 && socket->isReadable(0); i++) {
|
||||
Socket* peerSocket = NULL;
|
||||
try {
|
||||
peerSocket = socket->acceptConnection();
|
||||
pair<string, int> peerInfo;
|
||||
peerSocket->getPeerInfo(peerInfo);
|
||||
pair<string, int> localInfo;
|
||||
peerSocket->getAddrInfo(localInfo);
|
||||
if(peerInfo.first != localInfo.first &&
|
||||
e->torrentMan->connections < MAX_PEERS) {
|
||||
Peer* peer = new Peer(peerInfo.first, peerInfo.second,
|
||||
e->torrentMan->pieceLength,
|
||||
e->torrentMan->getTotalLength());
|
||||
if(e->torrentMan->addPeer(peer, true)) {
|
||||
int newCuid = e->torrentMan->getNewCuid();
|
||||
peer->cuid = newCuid;
|
||||
PeerInteractionCommand* command =
|
||||
new PeerInteractionCommand(newCuid, peer, e, peerSocket,
|
||||
PeerInteractionCommand::RECEIVER_WAIT_HANDSHAKE);
|
||||
e->commands.push_back(command);
|
||||
logger->debug("CUID#%d - incoming connection, adding new command CUID#%d", cuid, newCuid);
|
||||
} else {
|
||||
delete peer;
|
||||
if(e->torrentMan->isHalt()) {
|
||||
return true;
|
||||
}
|
||||
try {
|
||||
for(int i = 0; i < 3 && socket->isReadable(0); i++) {
|
||||
Socket* peerSocket = NULL;
|
||||
try {
|
||||
peerSocket = socket->acceptConnection();
|
||||
pair<string, int> peerInfo;
|
||||
peerSocket->getPeerInfo(peerInfo);
|
||||
pair<string, int> localInfo;
|
||||
peerSocket->getAddrInfo(localInfo);
|
||||
if(peerInfo.first != localInfo.first &&
|
||||
e->torrentMan->connections < MAX_PEERS) {
|
||||
Peer* peer = new Peer(peerInfo.first, peerInfo.second,
|
||||
e->torrentMan->pieceLength,
|
||||
e->torrentMan->getTotalLength());
|
||||
if(e->torrentMan->addPeer(peer, true)) {
|
||||
int newCuid = e->torrentMan->getNewCuid();
|
||||
peer->cuid = newCuid;
|
||||
PeerInteractionCommand* command =
|
||||
new PeerInteractionCommand(newCuid, peer, e, peerSocket,
|
||||
PeerInteractionCommand::RECEIVER_WAIT_HANDSHAKE);
|
||||
e->commands.push_back(command);
|
||||
logger->debug("CUID#%d - incoming connection, adding new command CUID#%d", cuid, newCuid);
|
||||
} else {
|
||||
delete peer;
|
||||
}
|
||||
}
|
||||
delete peerSocket;
|
||||
} catch(Exception* ex) {
|
||||
logger->error("CUID#%d - error in accepting connection", ex, cuid);
|
||||
delete ex;
|
||||
if(peerSocket != NULL) {
|
||||
delete peerSocket;
|
||||
}
|
||||
}
|
||||
delete peerSocket;
|
||||
} catch(Exception* ex) {
|
||||
logger->error("CUID#%d - error in accepting connection", ex, cuid);
|
||||
delete ex;
|
||||
if(peerSocket != NULL) {
|
||||
delete peerSocket;
|
||||
}
|
||||
}
|
||||
} catch(Exception* e) {
|
||||
logger->error("CUID#%d - Exception occurred.", e, cuid);
|
||||
delete e;
|
||||
}
|
||||
e->commands.push_back(this);
|
||||
return false;
|
||||
|
|
|
@ -36,7 +36,7 @@ SleepCommand::~SleepCommand() {
|
|||
bool SleepCommand::execute() {
|
||||
struct timeval now;
|
||||
gettimeofday(&now, NULL);
|
||||
if(Util::difftv(now, checkPoint) >= ((long long int)wait)*1000000) {
|
||||
if(Util::difftvsec(now, checkPoint) >= wait) {
|
||||
engine->commands.push_back(nextCommand);
|
||||
nextCommand = NULL;
|
||||
return true;
|
||||
|
|
|
@ -225,7 +225,7 @@ bool SocketCore::isWritable(int timeout) const {
|
|||
// time out
|
||||
return false;
|
||||
} else {
|
||||
if(errno == EINPROGRESS) {
|
||||
if(errno == EINPROGRESS || errno == EINTR) {
|
||||
return false;
|
||||
} else {
|
||||
throw new DlRetryEx(EX_SOCKET_CHECK_WRITABLE, strerror(errno));
|
||||
|
@ -254,7 +254,7 @@ bool SocketCore::isReadable(int timeout) const {
|
|||
// time out
|
||||
return false;
|
||||
} else {
|
||||
if(errno == EINPROGRESS) {
|
||||
if(errno == EINPROGRESS || errno == EINTR) {
|
||||
return false;
|
||||
} else {
|
||||
throw new DlRetryEx(EX_SOCKET_CHECK_READABLE, strerror(errno));
|
||||
|
|
|
@ -20,18 +20,28 @@
|
|||
*/
|
||||
/* copyright --> */
|
||||
#include "TorrentAutoSaveCommand.h"
|
||||
#include "SleepCommand.h"
|
||||
#include "Util.h"
|
||||
|
||||
TorrentAutoSaveCommand::TorrentAutoSaveCommand(int cuid, TorrentDownloadEngine* e, int interval):Command(cuid), e(e), interval(interval) {}
|
||||
TorrentAutoSaveCommand::TorrentAutoSaveCommand(int cuid, TorrentDownloadEngine* e, int interval):Command(cuid), e(e), interval(interval) {
|
||||
checkPoint.tv_sec = 0;
|
||||
checkPoint.tv_usec = 0;
|
||||
}
|
||||
|
||||
TorrentAutoSaveCommand::~TorrentAutoSaveCommand() {}
|
||||
|
||||
bool TorrentAutoSaveCommand::execute() {
|
||||
if(e->torrentMan->downloadComplete()) {
|
||||
if(e->torrentMan->downloadComplete() || e->torrentMan->isHalt()) {
|
||||
return true;
|
||||
}
|
||||
e->torrentMan->save();
|
||||
SleepCommand* sleepCommand = new SleepCommand(cuid, e, this, interval);
|
||||
e->commands.push_back(sleepCommand);
|
||||
struct timeval now;
|
||||
gettimeofday(&now, NULL);
|
||||
if(Util::difftvsec(now, checkPoint) >= interval) {
|
||||
checkPoint = now;
|
||||
e->torrentMan->save();
|
||||
if(e->torrentMan->isHalt()) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
e->commands.push_back(this);
|
||||
return false;
|
||||
}
|
||||
|
|
|
@ -29,6 +29,7 @@ class TorrentAutoSaveCommand : public Command {
|
|||
private:
|
||||
TorrentDownloadEngine* e;
|
||||
int interval;
|
||||
struct timeval checkPoint;
|
||||
public:
|
||||
TorrentAutoSaveCommand(int cuid, TorrentDownloadEngine* e, int interval);
|
||||
~TorrentAutoSaveCommand();
|
||||
|
|
|
@ -39,6 +39,7 @@
|
|||
#include <errno.h>
|
||||
#include <libgen.h>
|
||||
#include <string.h>
|
||||
#include <algorithm>
|
||||
|
||||
TorrentMan::TorrentMan():bitfield(NULL),
|
||||
peerEntryIdCounter(0), cuidCounter(0),
|
||||
|
@ -47,6 +48,7 @@ TorrentMan::TorrentMan():bitfield(NULL),
|
|||
deltaDownloadLength(0), deltaUploadLength(0),
|
||||
storeDir("."),
|
||||
setupComplete(false),
|
||||
halt(false),
|
||||
interval(DEFAULT_ANNOUNCE_INTERVAL),
|
||||
minInterval(DEFAULT_ANNOUNCE_MIN_INTERVAL),
|
||||
complete(0), incomplete(0),
|
||||
|
@ -145,6 +147,11 @@ bool TorrentMan::isEndGame() const {
|
|||
return bitfield->countMissingBlock() <= END_GAME_PIECE_NUM;
|
||||
}
|
||||
|
||||
bool TorrentMan::hasMissingPiece(const Peer* peer) const {
|
||||
return bitfield->hasMissingPiece(peer->getBitfield(),
|
||||
peer->getBitfieldLength());
|
||||
}
|
||||
|
||||
int TorrentMan::getMissingPieceIndex(const Peer* peer) const {
|
||||
int index = -1;
|
||||
if(isEndGame()) {
|
||||
|
@ -211,7 +218,7 @@ int TorrentMan::deleteUsedPiecesByFillRate(int fillRate, int toDelete) {
|
|||
Piece& piece = *itr;
|
||||
if(!bitfield->isUseBitSet(piece.getIndex()) &&
|
||||
piece.countCompleteBlock() <= piece.countBlock()*(fillRate/100.0)) {
|
||||
logger->debug("deleting used piece index=%d, fillRate(%%)=%d<=%d",
|
||||
logger->debug("Deleting used piece index=%d, fillRate(%%)=%d<=%d",
|
||||
piece.getIndex(),
|
||||
(piece.countCompleteBlock()*100)/piece.countBlock(),
|
||||
fillRate);
|
||||
|
@ -258,12 +265,7 @@ void TorrentMan::deleteUsedPiece(const Piece& piece) {
|
|||
if(Piece::isNull(piece)) {
|
||||
return;
|
||||
}
|
||||
for(UsedPieces::iterator itr = usedPieces.begin(); itr != usedPieces.end(); itr++) {
|
||||
if(itr->getIndex() == piece.getIndex()) {
|
||||
usedPieces.erase(itr);
|
||||
break;
|
||||
}
|
||||
}
|
||||
usedPieces.erase(std::remove(usedPieces.begin(), usedPieces.end(), piece));
|
||||
}
|
||||
|
||||
void TorrentMan::completePiece(const Piece& piece) {
|
||||
|
|
|
@ -78,6 +78,7 @@ private:
|
|||
bool setupComplete;
|
||||
const Logger* logger;
|
||||
Peers activePeers;
|
||||
bool halt;
|
||||
|
||||
FILE* openSegFile(const string& segFilename, const string& mode) const;
|
||||
void read(FILE* file);
|
||||
|
@ -127,6 +128,7 @@ public:
|
|||
bool isPeerAvailable() const;
|
||||
int deleteOldErrorPeers(int maxNum);
|
||||
|
||||
bool hasMissingPiece(const Peer* peer) const;
|
||||
int getMissingPieceIndex(const Peer* peer) const;
|
||||
int getMissingFastPieceIndex(const Peer* peer) const;
|
||||
Piece getMissingPiece(const Peer* peer);
|
||||
|
@ -260,6 +262,11 @@ public:
|
|||
activePeers.erase(itr);
|
||||
}
|
||||
|
||||
bool isHalt() const { return halt; }
|
||||
void setHalt(bool halt) {
|
||||
this->halt = halt;
|
||||
}
|
||||
|
||||
enum FILE_MODE {
|
||||
SINGLE,
|
||||
MULTI
|
||||
|
|
|
@ -36,8 +36,7 @@ TrackerUpdateCommand::TrackerUpdateCommand(int cuid, TorrentDownloadEngine*e):Co
|
|||
TrackerUpdateCommand::~TrackerUpdateCommand() {}
|
||||
|
||||
bool TrackerUpdateCommand::prepareForRetry() {
|
||||
Command* sleepCommand = new SleepCommand(cuid, e, this, 5);
|
||||
e->commands.push_back(sleepCommand);
|
||||
e->commands.push_back(this);
|
||||
return false;
|
||||
}
|
||||
|
||||
|
@ -174,6 +173,10 @@ bool TrackerUpdateCommand::execute() {
|
|||
|
||||
e->segmentMan->init();
|
||||
|
||||
return prepareForRetry();
|
||||
if(e->torrentMan->isHalt()) {
|
||||
return true;
|
||||
} else {
|
||||
return prepareForRetry();
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -20,7 +20,6 @@
|
|||
*/
|
||||
/* copyright --> */
|
||||
#include "TrackerWatcherCommand.h"
|
||||
#include "SleepCommand.h"
|
||||
#include "InitiateConnectionCommandFactory.h"
|
||||
#include "Util.h"
|
||||
|
||||
|
@ -37,16 +36,17 @@ TrackerWatcherCommand::~TrackerWatcherCommand() {}
|
|||
bool TrackerWatcherCommand::execute() {
|
||||
struct timeval now;
|
||||
gettimeofday(&now, NULL);
|
||||
|
||||
if(e->torrentMan->trackers == 0 &&
|
||||
Util::difftvsec(now, checkPoint) >= interval) {
|
||||
(Util::difftvsec(now, checkPoint) >= interval || e->torrentMan->isHalt())) {
|
||||
checkPoint = now;
|
||||
e->torrentMan->req->resetTryCount();
|
||||
int numWant = 50;
|
||||
if(e->torrentMan->connections >= MIN_PEERS) {
|
||||
if(e->torrentMan->connections >= MIN_PEERS || e->torrentMan->isHalt()) {
|
||||
numWant = 0;
|
||||
}
|
||||
if(e->torrentMan->downloadComplete()) {
|
||||
if(e->torrentMan->isHalt()) {
|
||||
e->torrentMan->req->setTrackerEvent(Request::STOPPED);
|
||||
} else if(e->torrentMan->downloadComplete()) {
|
||||
if(e->torrentMan->req->getTrackerEvent() == Request::COMPLETED) {
|
||||
e->torrentMan->req->setTrackerEvent(Request::AFTER_COMPLETED);
|
||||
} else {
|
||||
|
@ -92,6 +92,9 @@ bool TrackerWatcherCommand::execute() {
|
|||
e->torrentMan->trackers++;
|
||||
logger->info("CUID#%d - creating new tracker request command #%d", cuid,
|
||||
command->getCuid());
|
||||
if(e->torrentMan->isHalt()) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
interval = e->torrentMan->minInterval;
|
||||
e->commands.push_back(this);
|
||||
|
|
46
src/Util.cc
46
src/Util.cc
|
@ -410,3 +410,49 @@ Integers Util::computeFastSet(string ipaddr, const unsigned char* infoHash,
|
|||
return fastSet;
|
||||
}
|
||||
|
||||
/*
|
||||
int Util::countBit(unsigned int n) {
|
||||
int count = 0;
|
||||
while(n > 0) {
|
||||
count++;
|
||||
n &= (n-1);
|
||||
}
|
||||
return count;
|
||||
}
|
||||
*/
|
||||
|
||||
static int nbits[] = {
|
||||
0, 1, 1, 2, 1, 2, 2, 3, 1, 2, 2, 3, 2, 3, 3, 4,
|
||||
1, 2, 2, 3, 2, 3, 3, 4, 2, 3, 3, 4, 3, 4, 4, 5,
|
||||
1, 2, 2, 3, 2, 3, 3, 4, 2, 3, 3, 4, 3, 4, 4, 5,
|
||||
2, 3, 3, 4, 3, 4, 4, 5, 3, 4, 4, 5, 4, 5, 5, 6,
|
||||
1, 2, 2, 3, 2, 3, 3, 4, 2, 3, 3, 4, 3, 4, 4, 5,
|
||||
2, 3, 3, 4, 3, 4, 4, 5, 3, 4, 4, 5, 4, 5, 5, 6,
|
||||
2, 3, 3, 4, 3, 4, 4, 5, 3, 4, 4, 5, 4, 5, 5, 6,
|
||||
3, 4, 4, 5, 4, 5, 5, 6, 4, 5, 5, 6, 5, 6, 6, 7,
|
||||
1, 2, 2, 3, 2, 3, 3, 4, 2, 3, 3, 4, 3, 4, 4, 5,
|
||||
2, 3, 3, 4, 3, 4, 4, 5, 3, 4, 4, 5, 4, 5, 5, 6,
|
||||
2, 3, 3, 4, 3, 4, 4, 5, 3, 4, 4, 5, 4, 5, 5, 6,
|
||||
3, 4, 4, 5, 4, 5, 5, 6, 4, 5, 5, 6, 5, 6, 6, 7,
|
||||
2, 3, 3, 4, 3, 4, 4, 5, 3, 4, 4, 5, 4, 5, 5, 6,
|
||||
3, 4, 4, 5, 4, 5, 5, 6, 4, 5, 5, 6, 5, 6, 6, 7,
|
||||
3, 4, 4, 5, 4, 5, 5, 6, 4, 5, 5, 6, 5, 6, 6, 7,
|
||||
4, 5, 5, 6, 5, 6, 6, 7, 5, 6, 6, 7, 6, 7, 7, 8,
|
||||
};
|
||||
|
||||
int Util::countBit(unsigned int n) {
|
||||
/*
|
||||
return
|
||||
nbits[n & 0xffu]+
|
||||
nbits[(n >> 8) & 0xffu]+
|
||||
nbits[(n >> 16) & 0xffu]+
|
||||
nbits[(n >> 24) & 0xffu];
|
||||
*/
|
||||
int count = 0;
|
||||
int size = sizeof(unsigned int);
|
||||
for(int i = 0; i < size; i++) {
|
||||
count += nbits[(n >> i*8) & 0xffu];
|
||||
}
|
||||
|
||||
return count;
|
||||
}
|
||||
|
|
|
@ -84,6 +84,8 @@ public:
|
|||
|
||||
static Integers computeFastSet(string ipaddr, const unsigned char* infoHash,
|
||||
int pieces, int fastSetSize);
|
||||
|
||||
static int countBit(unsigned int);
|
||||
};
|
||||
|
||||
#endif // _D_UTIL_H_
|
||||
|
|
19
src/main.cc
19
src/main.cc
|
@ -73,10 +73,10 @@ void printDownloadAbortMessage() {
|
|||
printf(_("\nThe download was not complete because of errors. Check the log.\n"));
|
||||
}
|
||||
|
||||
void setSignalHander(int signal, void (*handler)(int)) {
|
||||
void setSignalHander(int signal, void (*handler)(int), int flags) {
|
||||
struct sigaction sigact;
|
||||
sigact.sa_handler = handler;
|
||||
sigact.sa_flags = 0;
|
||||
sigact.sa_flags = flags;
|
||||
sigemptyset(&sigact.sa_mask);
|
||||
sigaction(signal, &sigact, NULL);
|
||||
}
|
||||
|
@ -96,6 +96,7 @@ void handler(int signal) {
|
|||
}
|
||||
|
||||
void torrentHandler(int signal) {
|
||||
/*
|
||||
printf(_("\nstopping application...\n"));
|
||||
fflush(stdout);
|
||||
te->torrentMan->diskAdaptor->closeFile();
|
||||
|
@ -110,6 +111,8 @@ void torrentHandler(int signal) {
|
|||
delete te;
|
||||
printf(_("done\n"));
|
||||
exit(0);
|
||||
*/
|
||||
te->torrentMan->setHalt(true);
|
||||
}
|
||||
|
||||
void addCommand(int cuid, const string& url, string referer, Requests& requests) {
|
||||
|
@ -596,13 +599,13 @@ int main(int argc, char* argv[]) {
|
|||
exit(1);
|
||||
}
|
||||
|
||||
setSignalHander(SIGPIPE, SIG_IGN);
|
||||
setSignalHander(SIGPIPE, SIG_IGN, 0);
|
||||
|
||||
bool readyToTorrentMode = false;
|
||||
string downloadedTorrentFile;
|
||||
if(torrentFile.empty()) {
|
||||
setSignalHander(SIGINT, handler);
|
||||
setSignalHander(SIGTERM, handler);
|
||||
setSignalHander(SIGINT, handler, 0);
|
||||
setSignalHander(SIGTERM, handler, 0);
|
||||
|
||||
e = new ConsoleDownloadEngine();
|
||||
e->option = op;
|
||||
|
@ -644,8 +647,8 @@ int main(int argc, char* argv[]) {
|
|||
if(!torrentFile.empty() || followTorrent && readyToTorrentMode) {
|
||||
try {
|
||||
//op->put(PREF_MAX_TRIES, "0");
|
||||
setSignalHander(SIGINT, torrentHandler);
|
||||
setSignalHander(SIGTERM, torrentHandler);
|
||||
setSignalHander(SIGINT, torrentHandler, SA_ONESHOT);
|
||||
setSignalHander(SIGTERM, torrentHandler, SA_ONESHOT);
|
||||
|
||||
Request* req = new Request();
|
||||
req->isTorrent = true;
|
||||
|
@ -717,8 +720,6 @@ int main(int argc, char* argv[]) {
|
|||
|
||||
if(te->torrentMan->downloadComplete()) {
|
||||
printDownloadCompeleteMessage();
|
||||
} else {
|
||||
printDownloadAbortMessage();
|
||||
}
|
||||
delete req;
|
||||
te->cleanQueue();
|
||||
|
|
Loading…
Reference in New Issue