2010-06-09 Tatsuhiro Tsujikawa <t-tujikawa@users.sourceforge.net>

Made protected member variable private. Added accessor funcs.
	* src/InitiatorMSEHandshakeCommand.cc
	* src/PeerAbstractCommand.cc
	* src/PeerAbstractCommand.h
	* src/PeerInitiateConnectionCommand.cc
	* src/PeerInteractionCommand.cc
	* src/PeerInteractionCommand.h
	* src/PeerReceiveHandshakeCommand.cc
	* src/ReceiverMSEHandshakeCommand.cc
pull/1/head
Tatsuhiro Tsujikawa 2010-06-09 14:02:09 +00:00
parent bc50b5eea2
commit 43460a347f
9 changed files with 213 additions and 173 deletions

View File

@ -1,3 +1,15 @@
2010-06-09 Tatsuhiro Tsujikawa <t-tujikawa@users.sourceforge.net>
Made protected member variable private. Added accessor funcs.
* src/InitiatorMSEHandshakeCommand.cc
* src/PeerAbstractCommand.cc
* src/PeerAbstractCommand.h
* src/PeerInitiateConnectionCommand.cc
* src/PeerInteractionCommand.cc
* src/PeerInteractionCommand.h
* src/PeerReceiveHandshakeCommand.cc
* src/ReceiverMSEHandshakeCommand.cc
2010-06-09 Tatsuhiro Tsujikawa <t-tujikawa@users.sourceforge.net>
Code cleanup

View File

@ -73,10 +73,10 @@ InitiatorMSEHandshakeCommand::InitiatorMSEHandshakeCommand
_requestGroup(requestGroup),
_btRuntime(btRuntime),
_sequence(INITIATOR_SEND_KEY),
_mseHandshake(new MSEHandshake(cuid, socket, getOption().get()))
_mseHandshake(new MSEHandshake(cuid, s, getOption().get()))
{
disableReadCheckSocket();
setWriteCheckSocket(socket);
setWriteCheckSocket(getSocket());
setTimeout(getOption()->getAsInt(PREF_PEER_CONNECTION_TIMEOUT));
_btRuntime->increaseConnections();
@ -94,18 +94,18 @@ InitiatorMSEHandshakeCommand::~InitiatorMSEHandshakeCommand()
bool InitiatorMSEHandshakeCommand::executeInternal() {
switch(_sequence) {
case INITIATOR_SEND_KEY: {
if(!socket->isWritable(0)) {
if(!getSocket()->isWritable(0)) {
break;
}
disableWriteCheckSocket();
setReadCheckSocket(socket);
setReadCheckSocket(getSocket());
//socket->setBlockingMode();
setTimeout(getOption()->getAsInt(PREF_BT_TIMEOUT));
_mseHandshake->initEncryptionFacility(true);
if(_mseHandshake->sendPublicKey()) {
_sequence = INITIATOR_WAIT_KEY;
} else {
setWriteCheckSocket(socket);
setWriteCheckSocket(getSocket());
_sequence = INITIATOR_SEND_KEY_PENDING;
}
break;
@ -123,7 +123,7 @@ bool InitiatorMSEHandshakeCommand::executeInternal() {
if(_mseHandshake->sendInitiatorStep2()) {
_sequence = INITIATOR_FIND_VC_MARKER;
} else {
setWriteCheckSocket(socket);
setWriteCheckSocket(getSocket());
_sequence = INITIATOR_SEND_STEP2_PENDING;
}
}
@ -150,25 +150,26 @@ bool InitiatorMSEHandshakeCommand::executeInternal() {
case INITIATOR_RECEIVE_PAD_D: {
if(_mseHandshake->receivePad()) {
SharedHandle<PeerConnection> peerConnection
(new PeerConnection(getCuid(), socket));
(new PeerConnection(getCuid(), getSocket()));
if(_mseHandshake->getNegotiatedCryptoType() == MSEHandshake::CRYPTO_ARC4){
peerConnection->enableEncryption(_mseHandshake->getEncryptor(),
_mseHandshake->getDecryptor());
}
PeerInteractionCommand* c =
new PeerInteractionCommand
(getCuid(), _requestGroup, peer, e, _btRuntime, _pieceStorage,
(getCuid(), _requestGroup, getPeer(), getDownloadEngine(), _btRuntime,
_pieceStorage,
_peerStorage,
socket,
getSocket(),
PeerInteractionCommand::INITIATOR_SEND_HANDSHAKE,
peerConnection);
e->addCommand(c);
getDownloadEngine()->addCommand(c);
return true;
}
break;
}
}
e->addCommand(this);
getDownloadEngine()->addCommand(this);
return false;
}
@ -183,13 +184,13 @@ bool InitiatorMSEHandshakeCommand::prepareForNextPeer(time_t wait)
}
if(_peerStorage->isPeerAvailable() && _btRuntime->lessThanEqMinPeers()) {
SharedHandle<Peer> peer = _peerStorage->getUnusedPeer();
peer->usedBy(e->newCUID());
peer->usedBy(getDownloadEngine()->newCUID());
PeerInitiateConnectionCommand* command =
new PeerInitiateConnectionCommand(peer->usedBy(), _requestGroup, peer,
e, _btRuntime);
getDownloadEngine(), _btRuntime);
command->setPeerStorage(_peerStorage);
command->setPieceStorage(_pieceStorage);
e->addCommand(command);
getDownloadEngine()->addCommand(command);
}
return true;
} else {
@ -199,11 +200,11 @@ bool InitiatorMSEHandshakeCommand::prepareForNextPeer(time_t wait)
util::itos(getCuid()).c_str());
}
PeerInitiateConnectionCommand* command =
new PeerInitiateConnectionCommand(getCuid(), _requestGroup, peer, e,
_btRuntime, false);
new PeerInitiateConnectionCommand(getCuid(), _requestGroup, getPeer(),
getDownloadEngine(), _btRuntime, false);
command->setPeerStorage(_peerStorage);
command->setPieceStorage(_pieceStorage);
e->addCommand(command);
getDownloadEngine()->addCommand(command);
return true;
}
}
@ -211,7 +212,7 @@ bool InitiatorMSEHandshakeCommand::prepareForNextPeer(time_t wait)
void InitiatorMSEHandshakeCommand::onAbort()
{
if(getOption()->getAsBool(PREF_BT_REQUIRE_CRYPTO)) {
_peerStorage->returnPeer(peer);
_peerStorage->returnPeer(getPeer());
}
}

View File

@ -58,19 +58,19 @@ PeerAbstractCommand::PeerAbstractCommand(cuid_t cuid,
DownloadEngine* e,
const SocketHandle& s):
Command(cuid),
checkPoint(global::wallclock),
e(e),
socket(s),
peer(peer),
checkSocketIsReadable(false),
checkSocketIsWritable(false),
noCheck(false)
{
if(!socket.isNull() && socket->isOpen()) {
setReadCheckSocket(socket);
}
_checkPoint(global::wallclock),
// TODO referring global option
timeout = e->getOption()->getAsInt(PREF_BT_TIMEOUT);
_timeout(e->getOption()->getAsInt(PREF_BT_TIMEOUT)),
_e(e),
_socket(s),
_peer(peer),
_checkSocketIsReadable(false),
_checkSocketIsWritable(false),
_noCheck(false)
{
if(!_socket.isNull() && _socket->isOpen()) {
setReadCheckSocket(_socket);
}
}
PeerAbstractCommand::~PeerAbstractCommand()
@ -87,24 +87,24 @@ bool PeerAbstractCommand::execute()
util::itos(getCuid()).c_str(),
readEventEnabled(), writeEventEnabled(),
hupEventEnabled(), errorEventEnabled(),
noCheck);
_noCheck);
}
if(exitBeforeExecute()) {
onAbort();
return true;
}
try {
if(noCheck ||
(checkSocketIsReadable && readEventEnabled()) ||
(checkSocketIsWritable && writeEventEnabled()) ||
if(_noCheck ||
(_checkSocketIsReadable && readEventEnabled()) ||
(_checkSocketIsWritable && writeEventEnabled()) ||
hupEventEnabled()) {
checkPoint = global::wallclock;
_checkPoint = global::wallclock;
} else if(errorEventEnabled()) {
throw DL_ABORT_EX
(StringFormat(MSG_NETWORK_PROBLEM,
socket->getSocketError().c_str()).str());
_socket->getSocketError().c_str()).str());
}
if(checkPoint.difference(global::wallclock) >= timeout) {
if(_checkPoint.difference(global::wallclock) >= _timeout) {
throw DL_ABORT_EX(EX_TIME_OUT);
}
return executeInternal();
@ -118,8 +118,8 @@ bool PeerAbstractCommand::execute()
getLogger()->debug(MSG_TORRENT_DOWNLOAD_ABORTED, err,
util::itos(getCuid()).c_str());
getLogger()->debug(MSG_PEER_BANNED,
util::itos(getCuid()).c_str(), peer->ipaddr.c_str(),
peer->port);
util::itos(getCuid()).c_str(), _peer->ipaddr.c_str(),
_peer->port);
}
onAbort();
return prepareForNextPeer(0);
@ -134,10 +134,10 @@ bool PeerAbstractCommand::prepareForNextPeer(time_t wait)
void PeerAbstractCommand::disableReadCheckSocket()
{
if(checkSocketIsReadable) {
e->deleteSocketForReadCheck(readCheckTarget, this);
checkSocketIsReadable = false;
readCheckTarget = SocketHandle();
if(_checkSocketIsReadable) {
_e->deleteSocketForReadCheck(_readCheckTarget, this);
_checkSocketIsReadable = false;
_readCheckTarget.reset();
}
}
@ -146,26 +146,26 @@ void PeerAbstractCommand::setReadCheckSocket(const SocketHandle& socket)
if(!socket->isOpen()) {
disableReadCheckSocket();
} else {
if(checkSocketIsReadable) {
if(readCheckTarget != socket) {
e->deleteSocketForReadCheck(readCheckTarget, this);
e->addSocketForReadCheck(socket, this);
readCheckTarget = socket;
if(_checkSocketIsReadable) {
if(_readCheckTarget != socket) {
_e->deleteSocketForReadCheck(_readCheckTarget, this);
_e->addSocketForReadCheck(socket, this);
_readCheckTarget = socket;
}
} else {
e->addSocketForReadCheck(socket, this);
checkSocketIsReadable = true;
readCheckTarget = socket;
_e->addSocketForReadCheck(socket, this);
_checkSocketIsReadable = true;
_readCheckTarget = socket;
}
}
}
void PeerAbstractCommand::disableWriteCheckSocket()
{
if(checkSocketIsWritable) {
e->deleteSocketForWriteCheck(writeCheckTarget, this);
checkSocketIsWritable = false;
writeCheckTarget = SocketHandle();
if(_checkSocketIsWritable) {
_e->deleteSocketForWriteCheck(_writeCheckTarget, this);
_checkSocketIsWritable = false;
_writeCheckTarget.reset();
}
}
@ -174,28 +174,33 @@ void PeerAbstractCommand::setWriteCheckSocket(const SocketHandle& socket)
if(!socket->isOpen()) {
disableWriteCheckSocket();
} else {
if(checkSocketIsWritable) {
if(writeCheckTarget != socket) {
e->deleteSocketForWriteCheck(writeCheckTarget, this);
e->addSocketForWriteCheck(socket, this);
writeCheckTarget = socket;
if(_checkSocketIsWritable) {
if(_writeCheckTarget != socket) {
_e->deleteSocketForWriteCheck(_writeCheckTarget, this);
_e->addSocketForWriteCheck(socket, this);
_writeCheckTarget = socket;
}
} else {
e->addSocketForWriteCheck(socket, this);
checkSocketIsWritable = true;
writeCheckTarget = socket;
_e->addSocketForWriteCheck(socket, this);
_checkSocketIsWritable = true;
_writeCheckTarget = socket;
}
}
}
void PeerAbstractCommand::setNoCheck(bool check)
{
this->noCheck = check;
_noCheck = check;
}
void PeerAbstractCommand::updateKeepAlive()
{
checkPoint = global::wallclock;
_checkPoint = global::wallclock;
}
void PeerAbstractCommand::createSocket()
{
_socket.reset(new SocketCore());
}
} // namespace aria2

View File

@ -48,14 +48,36 @@ class SocketCore;
class PeerAbstractCommand : public Command {
private:
Timer checkPoint;
time_t timeout;
protected:
DownloadEngine* e;
SharedHandle<SocketCore> socket;
SharedHandle<Peer> peer;
Timer _checkPoint;
time_t _timeout;
DownloadEngine* _e;
SharedHandle<SocketCore> _socket;
SharedHandle<Peer> _peer;
void setTimeout(time_t timeout) { this->timeout = timeout; }
bool _checkSocketIsReadable;
bool _checkSocketIsWritable;
SharedHandle<SocketCore> _readCheckTarget;
SharedHandle<SocketCore> _writeCheckTarget;
bool _noCheck;
protected:
DownloadEngine* getDownloadEngine() const
{
return _e;
}
const SharedHandle<SocketCore>& getSocket() const
{
return _socket;
}
void createSocket();
const SharedHandle<Peer>& getPeer() const
{
return _peer;
}
void setTimeout(time_t timeout) { _timeout = timeout; }
virtual bool prepareForNextPeer(time_t wait);
virtual void onAbort() {};
// This function is called when DownloadFailureException is caught right after
@ -69,12 +91,6 @@ protected:
void disableWriteCheckSocket();
void setNoCheck(bool check);
void updateKeepAlive();
private:
bool checkSocketIsReadable;
bool checkSocketIsWritable;
SharedHandle<SocketCore> readCheckTarget;
SharedHandle<SocketCore> writeCheckTarget;
bool noCheck;
public:
PeerAbstractCommand(cuid_t cuid,
const SharedHandle<Peer>& peer,

View File

@ -82,25 +82,26 @@ PeerInitiateConnectionCommand::~PeerInitiateConnectionCommand()
bool PeerInitiateConnectionCommand::executeInternal() {
if(getLogger()->info()) {
getLogger()->info(MSG_CONNECTING_TO_SERVER,
util::itos(getCuid()).c_str(), peer->ipaddr.c_str(),
peer->port);
util::itos(getCuid()).c_str(), getPeer()->ipaddr.c_str(),
getPeer()->port);
}
socket.reset(new SocketCore());
socket->establishConnection(peer->ipaddr, peer->port);
createSocket();
getSocket()->establishConnection(getPeer()->ipaddr, getPeer()->port);
if(_mseHandshakeEnabled) {
InitiatorMSEHandshakeCommand* c =
new InitiatorMSEHandshakeCommand(getCuid(), _requestGroup, peer, e,
_btRuntime, socket);
new InitiatorMSEHandshakeCommand(getCuid(), _requestGroup, getPeer(),
getDownloadEngine(),
_btRuntime, getSocket());
c->setPeerStorage(_peerStorage);
c->setPieceStorage(_pieceStorage);
e->addCommand(c);
getDownloadEngine()->addCommand(c);
} else {
PeerInteractionCommand* command =
new PeerInteractionCommand
(getCuid(), _requestGroup, peer, e, _btRuntime, _pieceStorage,
_peerStorage,
socket, PeerInteractionCommand::INITIATOR_SEND_HANDSHAKE);
e->addCommand(command);
(getCuid(), _requestGroup, getPeer(), getDownloadEngine(),
_btRuntime, _pieceStorage, _peerStorage,
getSocket(), PeerInteractionCommand::INITIATOR_SEND_HANDSHAKE);
getDownloadEngine()->addCommand(command);
}
return true;
}
@ -109,19 +110,19 @@ bool PeerInitiateConnectionCommand::executeInternal() {
bool PeerInitiateConnectionCommand::prepareForNextPeer(time_t wait) {
if(_peerStorage->isPeerAvailable() && _btRuntime->lessThanEqMinPeers()) {
SharedHandle<Peer> peer = _peerStorage->getUnusedPeer();
peer->usedBy(e->newCUID());
peer->usedBy(getDownloadEngine()->newCUID());
PeerInitiateConnectionCommand* command =
new PeerInitiateConnectionCommand(peer->usedBy(), _requestGroup, peer, e,
_btRuntime);
new PeerInitiateConnectionCommand(peer->usedBy(), _requestGroup, peer,
getDownloadEngine(), _btRuntime);
command->setPeerStorage(_peerStorage);
command->setPieceStorage(_pieceStorage);
e->addCommand(command);
getDownloadEngine()->addCommand(command);
}
return true;
}
void PeerInitiateConnectionCommand::onAbort() {
_peerStorage->returnPeer(peer);
_peerStorage->returnPeer(getPeer());
}
bool PeerInitiateConnectionCommand::exitBeforeExecute()

View File

@ -94,12 +94,12 @@ PeerInteractionCommand::PeerInteractionCommand
_btRuntime(btRuntime),
_pieceStorage(pieceStorage),
_peerStorage(peerStorage),
sequence(sequence)
_sequence(sequence)
{
// TODO move following bunch of processing to separate method, like init()
if(sequence == INITIATOR_SEND_HANDSHAKE) {
if(_sequence == INITIATOR_SEND_HANDSHAKE) {
disableReadCheckSocket();
setWriteCheckSocket(socket);
setWriteCheckSocket(getSocket());
setTimeout(getOption()->getAsInt(PREF_PEER_CONNECTION_TIMEOUT));
}
@ -119,7 +119,7 @@ PeerInteractionCommand::PeerInteractionCommand
}
SharedHandle<DefaultExtensionMessageFactory> extensionMessageFactory
(new DefaultExtensionMessageFactory(peer, exMsgRegistry));
(new DefaultExtensionMessageFactory(getPeer(), exMsgRegistry));
extensionMessageFactory->setPeerStorage(peerStorage);
extensionMessageFactory->setDownloadContext
(_requestGroup->getDownloadContext());
@ -133,7 +133,7 @@ PeerInteractionCommand::PeerInteractionCommand
factory->setPieceStorage(pieceStorage);
factory->setPeerStorage(peerStorage);
factory->setExtensionMessageFactory(extensionMessageFactory);
factory->setPeer(peer);
factory->setPeer(getPeer());
factory->setLocalNode(DHTRegistry::_localNode);
factory->setRoutingTable(DHTRegistry::_routingTable);
factory->setTaskQueue(DHTRegistry::_taskQueue);
@ -144,7 +144,7 @@ PeerInteractionCommand::PeerInteractionCommand
PeerConnectionHandle peerConnection;
if(passedPeerConnection.isNull()) {
peerConnection.reset(new PeerConnection(cuid, socket));
peerConnection.reset(new PeerConnection(cuid, getSocket()));
} else {
peerConnection = passedPeerConnection;
}
@ -152,17 +152,17 @@ PeerInteractionCommand::PeerInteractionCommand
SharedHandle<DefaultBtMessageDispatcher> dispatcher
(new DefaultBtMessageDispatcher());
dispatcher->setCuid(cuid);
dispatcher->setPeer(peer);
dispatcher->setPeer(getPeer());
dispatcher->setDownloadContext(_requestGroup->getDownloadContext());
dispatcher->setPieceStorage(pieceStorage);
dispatcher->setPeerStorage(peerStorage);
dispatcher->setRequestTimeout(getOption()->getAsInt(PREF_BT_REQUEST_TIMEOUT));
dispatcher->setBtMessageFactory(factory);
dispatcher->setRequestGroupMan(e->getRequestGroupMan());
dispatcher->setRequestGroupMan(getDownloadEngine()->getRequestGroupMan());
DefaultBtMessageReceiverHandle receiver(new DefaultBtMessageReceiver());
receiver->setCuid(cuid);
receiver->setPeer(peer);
receiver->setPeer(getPeer());
receiver->setDownloadContext(_requestGroup->getDownloadContext());
receiver->setPeerConnection(peerConnection);
receiver->setDispatcher(dispatcher);
@ -171,13 +171,13 @@ PeerInteractionCommand::PeerInteractionCommand
SharedHandle<DefaultBtRequestFactory> reqFactory
(new DefaultBtRequestFactory());
reqFactory->setCuid(cuid);
reqFactory->setPeer(peer);
reqFactory->setPeer(getPeer());
reqFactory->setPieceStorage(pieceStorage);
reqFactory->setBtMessageDispatcher(dispatcher);
reqFactory->setBtMessageFactory(factory);
DefaultBtInteractiveHandle btInteractive
(new DefaultBtInteractive(_requestGroup->getDownloadContext(), peer));
(new DefaultBtInteractive(_requestGroup->getDownloadContext(), getPeer()));
btInteractive->setBtRuntime(_btRuntime);
btInteractive->setPieceStorage(_pieceStorage);
btInteractive->setPeerStorage(peerStorage); // Note: Not a member variable.
@ -190,10 +190,10 @@ PeerInteractionCommand::PeerInteractionCommand
btInteractive->setExtensionMessageRegistry(exMsgRegistry);
btInteractive->setKeepAliveInterval
(getOption()->getAsInt(PREF_BT_KEEP_ALIVE_INTERVAL));
btInteractive->setRequestGroupMan(e->getRequestGroupMan());
btInteractive->setRequestGroupMan(getDownloadEngine()->getRequestGroupMan());
btInteractive->setBtMessageFactory(factory);
if((metadataGetMode || torrentAttrs[bittorrent::PRIVATE].i() == 0) &&
!peer->isLocalPeer()) {
!getPeer()->isLocalPeer()) {
if(getOption()->getAsBool(PREF_ENABLE_PEER_EXCHANGE)) {
btInteractive->setUTPexEnabled(true);
}
@ -209,7 +209,7 @@ PeerInteractionCommand::PeerInteractionCommand
btInteractive->enableMetadataGetMode();
}
this->btInteractive = btInteractive;
_btInteractive = btInteractive;
// reverse depends
factory->setBtMessageDispatcher(dispatcher);
@ -224,26 +224,26 @@ PeerInteractionCommand::PeerInteractionCommand
(_requestGroup->getDownloadContext());
utMetadataRequestFactory->setBtMessageDispatcher(dispatcher);
utMetadataRequestFactory->setBtMessageFactory(factory);
utMetadataRequestFactory->setPeer(peer);
utMetadataRequestFactory->setPeer(getPeer());
utMetadataRequestFactory->setUTMetadataRequestTracker
(utMetadataRequestTracker);
}
peer->allocateSessionResource
getPeer()->allocateSessionResource
(_requestGroup->getDownloadContext()->getPieceLength(),
_requestGroup->getDownloadContext()->getTotalLength());
peer->setBtMessageDispatcher(dispatcher);
getPeer()->setBtMessageDispatcher(dispatcher);
_btRuntime->increaseConnections();
_requestGroup->increaseNumCommand();
}
PeerInteractionCommand::~PeerInteractionCommand() {
if(peer->getCompletedLength() > 0) {
_pieceStorage->subtractPieceStats(peer->getBitfield(),
peer->getBitfieldLength());
if(getPeer()->getCompletedLength() > 0) {
_pieceStorage->subtractPieceStats(getPeer()->getBitfield(),
getPeer()->getBitfieldLength());
}
peer->releaseSessionResource();
getPeer()->releaseSessionResource();
_requestGroup->decreaseNumCommand();
_btRuntime->decreaseConnections();
@ -251,75 +251,76 @@ PeerInteractionCommand::~PeerInteractionCommand() {
bool PeerInteractionCommand::executeInternal() {
setNoCheck(false);
switch(sequence) {
switch(_sequence) {
case INITIATOR_SEND_HANDSHAKE:
if(!socket->isWritable(0)) {
if(!getSocket()->isWritable(0)) {
break;
}
disableWriteCheckSocket();
setReadCheckSocket(socket);
setReadCheckSocket(getSocket());
//socket->setBlockingMode();
setTimeout(getOption()->getAsInt(PREF_BT_TIMEOUT));
btInteractive->initiateHandshake();
sequence = INITIATOR_WAIT_HANDSHAKE;
_btInteractive->initiateHandshake();
_sequence = INITIATOR_WAIT_HANDSHAKE;
break;
case INITIATOR_WAIT_HANDSHAKE: {
if(btInteractive->countPendingMessage() > 0) {
btInteractive->sendPendingMessage();
if(btInteractive->countPendingMessage() > 0) {
if(_btInteractive->countPendingMessage() > 0) {
_btInteractive->sendPendingMessage();
if(_btInteractive->countPendingMessage() > 0) {
break;
}
}
BtMessageHandle handshakeMessage = btInteractive->receiveHandshake();
BtMessageHandle handshakeMessage = _btInteractive->receiveHandshake();
if(handshakeMessage.isNull()) {
break;
}
btInteractive->doPostHandshakeProcessing();
sequence = WIRED;
_btInteractive->doPostHandshakeProcessing();
_sequence = WIRED;
break;
}
case RECEIVER_WAIT_HANDSHAKE: {
BtMessageHandle handshakeMessage = btInteractive->receiveAndSendHandshake();
BtMessageHandle handshakeMessage =_btInteractive->receiveAndSendHandshake();
if(handshakeMessage.isNull()) {
break;
}
btInteractive->doPostHandshakeProcessing();
sequence = WIRED;
_btInteractive->doPostHandshakeProcessing();
_sequence = WIRED;
break;
}
case WIRED:
// See the comment for writable check below.
disableWriteCheckSocket();
btInteractive->doInteractionProcessing();
if(btInteractive->countReceivedMessageInIteration() > 0) {
_btInteractive->doInteractionProcessing();
if(_btInteractive->countReceivedMessageInIteration() > 0) {
updateKeepAlive();
}
if((peer->amInterested() && !peer->peerChoking()) ||
btInteractive->countOutstandingRequest() ||
(peer->peerInterested() && !peer->amChoking())) {
if((getPeer()->amInterested() && !getPeer()->peerChoking()) ||
_btInteractive->countOutstandingRequest() ||
(getPeer()->peerInterested() && !getPeer()->amChoking())) {
// Writable check to avoid slow seeding
if(btInteractive->isSendingMessageInProgress()) {
setWriteCheckSocket(socket);
if(_btInteractive->isSendingMessageInProgress()) {
setWriteCheckSocket(getSocket());
}
if(e->getRequestGroupMan()->doesOverallDownloadSpeedExceed() ||
if(getDownloadEngine()->getRequestGroupMan()->
doesOverallDownloadSpeedExceed() ||
_requestGroup->doesDownloadSpeedExceed()) {
disableReadCheckSocket();
setNoCheck(true);
} else {
setReadCheckSocket(socket);
setReadCheckSocket(getSocket());
}
} else {
disableReadCheckSocket();
}
break;
}
if(btInteractive->countPendingMessage() > 0) {
if(_btInteractive->countPendingMessage() > 0) {
setNoCheck(true);
}
e->addCommand(this);
getDownloadEngine()->addCommand(this);
return false;
}
@ -327,20 +328,20 @@ bool PeerInteractionCommand::executeInternal() {
bool PeerInteractionCommand::prepareForNextPeer(time_t wait) {
if(_peerStorage->isPeerAvailable() && _btRuntime->lessThanEqMinPeers()) {
SharedHandle<Peer> peer = _peerStorage->getUnusedPeer();
peer->usedBy(e->newCUID());
peer->usedBy(getDownloadEngine()->newCUID());
PeerInitiateConnectionCommand* command =
new PeerInitiateConnectionCommand
(peer->usedBy(), _requestGroup, peer, e, _btRuntime);
(peer->usedBy(), _requestGroup, peer, getDownloadEngine(), _btRuntime);
command->setPeerStorage(_peerStorage);
command->setPieceStorage(_pieceStorage);
e->addCommand(command);
getDownloadEngine()->addCommand(command);
}
return true;
}
void PeerInteractionCommand::onAbort() {
btInteractive->cancelAllPiece();
_peerStorage->returnPeer(peer);
_btInteractive->cancelAllPiece();
_peerStorage->returnPeer(getPeer());
}
void PeerInteractionCommand::onFailure()

View File

@ -63,8 +63,8 @@ private:
SharedHandle<PeerStorage> _peerStorage;
Seq sequence;
SharedHandle<BtInteractive> btInteractive;
Seq _sequence;
SharedHandle<BtInteractive> _btInteractive;
const SharedHandle<Option>& getOption() const;
protected:

View File

@ -73,7 +73,7 @@ PeerReceiveHandshakeCommand::PeerReceiveHandshakeCommand
_peerConnection(peerConnection)
{
if(_peerConnection.isNull()) {
_peerConnection.reset(new PeerConnection(cuid, socket));
_peerConnection.reset(new PeerConnection(cuid, getSocket()));
}
}
@ -81,7 +81,8 @@ PeerReceiveHandshakeCommand::~PeerReceiveHandshakeCommand() {}
bool PeerReceiveHandshakeCommand::exitBeforeExecute()
{
return e->isHaltRequested() || e->getRequestGroupMan()->downloadFinished();
return getDownloadEngine()->isHaltRequested() ||
getDownloadEngine()->getRequestGroupMan()->downloadFinished();
}
bool PeerReceiveHandshakeCommand::executeInternal()
@ -97,13 +98,13 @@ bool PeerReceiveHandshakeCommand::executeInternal()
std::string infoHash = std::string(&data[28], &data[28+INFO_HASH_LENGTH]);
SharedHandle<DownloadContext> downloadContext =
e->getBtRegistry()->getDownloadContext(infoHash);
getDownloadEngine()->getBtRegistry()->getDownloadContext(infoHash);
if(downloadContext.isNull()) {
throw DL_ABORT_EX
(StringFormat("Unknown info hash %s",
util::toHex(infoHash).c_str()).str());
}
BtObject btObject = e->getBtRegistry()->get
BtObject btObject = getDownloadEngine()->getBtRegistry()->get
(downloadContext->getOwnerRequestGroup()->getGID());
SharedHandle<BtRuntime> btRuntime = btObject._btRuntime;
SharedHandle<PieceStorage> pieceStorage = btObject._pieceStorage;
@ -128,33 +129,31 @@ bool PeerReceiveHandshakeCommand::executeInternal()
if((!pieceStorage->downloadFinished() &&
tstat.getDownloadSpeed() < thresholdSpeed) ||
btRuntime->lessThanMaxPeers()) {
if(peerStorage->addPeer(peer)) {
peer->usedBy(getCuid());
if(peerStorage->addPeer(getPeer())) {
getPeer()->usedBy(getCuid());
PeerInteractionCommand* command =
new PeerInteractionCommand
(getCuid(),
downloadContext->getOwnerRequestGroup(),
peer,
e,
getPeer(),
getDownloadEngine(),
btRuntime,
pieceStorage,
peerStorage,
socket,
getSocket(),
PeerInteractionCommand::RECEIVER_WAIT_HANDSHAKE,
_peerConnection);
e->addCommand(command);
getDownloadEngine()->addCommand(command);
if(getLogger()->debug()) {
getLogger()->debug(MSG_INCOMING_PEER_CONNECTION,
util::itos(getCuid()).c_str(),
util::itos(peer->usedBy()).c_str());
util::itos(getPeer()->usedBy()).c_str());
}
}
}
return true;
} else {
e->addCommand(this);
getDownloadEngine()->addCommand(this);
return false;
}
}

View File

@ -81,7 +81,8 @@ ReceiverMSEHandshakeCommand::~ReceiverMSEHandshakeCommand()
bool ReceiverMSEHandshakeCommand::exitBeforeExecute()
{
return e->isHaltRequested() || e->getRequestGroupMan()->downloadFinished();
return getDownloadEngine()->isHaltRequested() ||
getDownloadEngine()->getRequestGroupMan()->downloadFinished();
}
bool ReceiverMSEHandshakeCommand::executeInternal()
@ -97,18 +98,21 @@ bool ReceiverMSEHandshakeCommand::executeInternal()
_sequence = RECEIVER_WAIT_KEY;
break;
case MSEHandshake::HANDSHAKE_LEGACY: {
if(e->getOption()->getAsBool(PREF_BT_REQUIRE_CRYPTO)) {
if(getDownloadEngine()->getOption()->getAsBool(PREF_BT_REQUIRE_CRYPTO)) {
throw DL_ABORT_EX
("The legacy BitTorrent handshake is not acceptable by the"
" preference.");
}
SharedHandle<PeerConnection> peerConnection
(new PeerConnection(getCuid(), socket));
(new PeerConnection(getCuid(), getSocket()));
peerConnection->presetBuffer(_mseHandshake->getBuffer(),
_mseHandshake->getBufferLength());
Command* c = new PeerReceiveHandshakeCommand(getCuid(), peer, e, socket,
Command* c = new PeerReceiveHandshakeCommand(getCuid(),
getPeer(),
getDownloadEngine(),
getSocket(),
peerConnection);
e->addCommand(c);
getDownloadEngine()->addCommand(c);
return true;
}
default:
@ -121,7 +125,7 @@ bool ReceiverMSEHandshakeCommand::executeInternal()
if(_mseHandshake->sendPublicKey()) {
_sequence = RECEIVER_FIND_HASH_MARKER;
} else {
setWriteCheckSocket(socket);
setWriteCheckSocket(getSocket());
_sequence = RECEIVER_SEND_KEY_PENDING;
}
}
@ -141,7 +145,7 @@ bool ReceiverMSEHandshakeCommand::executeInternal()
}
case RECEIVER_RECEIVE_PAD_C_LENGTH: {
std::vector<SharedHandle<DownloadContext> > downloadContexts;
e->getBtRegistry()->getAllDownloadContext
getDownloadEngine()->getBtRegistry()->getAllDownloadContext
(std::back_inserter(downloadContexts));
if(_mseHandshake->receiveReceiverHashAndPadCLength(downloadContexts)) {
_sequence = RECEIVER_RECEIVE_PAD_C;
@ -166,7 +170,7 @@ bool ReceiverMSEHandshakeCommand::executeInternal()
createCommand();
return true;
} else {
setWriteCheckSocket(socket);
setWriteCheckSocket(getSocket());
_sequence = RECEIVER_SEND_STEP2_PENDING;
}
}
@ -180,14 +184,14 @@ bool ReceiverMSEHandshakeCommand::executeInternal()
}
break;
}
e->addCommand(this);
getDownloadEngine()->addCommand(this);
return false;
}
void ReceiverMSEHandshakeCommand::createCommand()
{
SharedHandle<PeerConnection> peerConnection
(new PeerConnection(getCuid(), socket));
(new PeerConnection(getCuid(), getSocket()));
if(_mseHandshake->getNegotiatedCryptoType() == MSEHandshake::CRYPTO_ARC4) {
peerConnection->enableEncryption(_mseHandshake->getEncryptor(),
_mseHandshake->getDecryptor());
@ -200,8 +204,9 @@ void ReceiverMSEHandshakeCommand::createCommand()
// as a hint. If this info hash and one in BitTorrent Handshake does not
// match, then drop connection.
Command* c =
new PeerReceiveHandshakeCommand(getCuid(), peer, e, socket, peerConnection);
e->addCommand(c);
new PeerReceiveHandshakeCommand(getCuid(), getPeer(), getDownloadEngine(),
getSocket(), peerConnection);
getDownloadEngine()->addCommand(c);
}
} // namespace aria2