2006-07-19 Tatsuhiro Tsujikawa <tujikawa at rednoah dot com>

* src/SharedHandle.h: New class.

	To wrap Socket, Command, PeerMessage and Peer with SharedHandle:

	* src/HttpResponseCommand.h
	(HttpResponseCommand): Wrapped Socket.
	* src/SocketCore.h
	(operator==): New function.
	(operator!=): New function.
	(operator<): New function.
	(getSockfd): New function.
	(isOpen): New function.
	(writeData): New function.
	* src/SocketCore.cc
	(operator==): New function.
	(operator!=): New function.
	(operator<): New function.
	* src/AbstractCommand.h
	(socket): Changed its type to SocketHandle.
	(setReadCheckSocket): Replaced Socket with SocketHandle.
	(setWriteCheckSocket): Replaced Socket with SocketHandle.
	(disableReadCheckSocket): New function.
	(disableWriteCheckSocket): New function.
	(readCheckTarget): Changed its type to SocketHandle.
	(writeCheckTarget): Changed its type to SocketHandle.
	(AbstractCommand): Replaced Socket with SocketHandle.
	* src/AbstractCommand.cc
	(AbstractCommand): Replaced Socket with SocketHandle.
	(~AbstractCommand): Removed the deallocation for Socket object.
	(disableReadCheckSocket): New function.
	(setReadCheckSocket): Replaced Socket with SocketHandle.
	(disableWriteCheckSocket): New function.
	(setWriteCheckSocket): Replaced Socket with SocketHandle.
	* src/HttpDownloadCommand.cc
	(DownloadCommand): Replaced Socket with SocketHandle.
	* src/PeerAbstractCommand.h
	(socket): Changed its type to SocketHandle.
	(peer): Changed its type to PeerHandle.
	(setReadCheckSocket): Replaced Socket with SocketHandle.
	(setWriteCheckSocket): Replaced Socket with SocketHandle.
	(disableReadCheckSocket): New function.
	(disableWriteCheckSocket): New function.
	(readCheckTarget): Changed its type to SocketHandle.
	(writeCheckTarget): Changed its type to SocketHandle.
	(PeerAbstractCommand): Replaced Socket with SocketHandle.
	Replaced Peer with PeerHandle.
	* src/HttpRequestCommand.cc
	(HttpRequestCommand): Replaced Socket with SocketHandle.
	Use disableReadCheckSocket.
	* src/PeerInitiateConnectionCommand.h
	(PeerInitiateConnectionCommand): Replaced Peer with PeerHandle.
	* src/PeerChokeCommand.cc
	(UploadFaster::operator()): Replaced Peer with PeerHandle.
	(DownloadFaster::operator()): Replaced Peer with PeerHandle.
	(execute): Use PeerHandle.
	* src/PeerConnection.h
	(HandshakeMessage.h): Removed include of HandshakeMessage.h.
	(socket): Changed its type to SocketHandle.
	(PeerConnection): Replaced Socket with SocketHandle.	
	* src/PeerConnection.cc
	(PeerConnection): Replaced Socket with SocketHandle.
	* src/PeerInteractionCommand.h
	(PeerInteractionCommand): Replaced socket with SocketHandle.
	Replaced Peer with PeerHandle.	
	* src/PeerInteractionCommand.cc
	(PeerInteractionCommand): Replaced Socket with SocketHandle.
	Replaced Peer with PeerHandle.
	(executeInternal): Use disableWriteCheckSocket.
	Use HandshakeMessageHandle.
	(receiveMessages): Use PeerMessageHandle.
	(prepareForNextPeer): Use PeerHandle.
	* src/HttpProxyRequestCommand.h
	(HttpProxyRequestCommand): Replaced Socket with SocketHandle.
	* src/HttpResponseCommand.cc
	(HttpResponseCommand): Replaced Socket with SocketHandle.
	* src/TorrentMan.cc
	(nullPeer): Added external reference.
	(~TorrentMan): Removed the deallocation of the elements of 
peers.
	(addPeer): Rewritten.
	(isPeerAvailable): Use nullPeer.
	(deleteOldpeers): Replaced with deleteErrorPeer.
	(deleteErrorPeer): New function.
	(getPeer): Use PeerHandle and nullPeer.
	(hasMissingPiece): Replaced Peer with PeerHandle.
	(getMissingPieceIndex): Replaced Peer with PeerHandle.
	(getMissingFastPieceIndex): Replaced Peer with PeerHandle.
	(getMissingFastPiece): Replaced Peer with PeerHandle.
	(getMissingPiece): Replaced Peer with PeerHandle.
	* src/FtpNegotiateCommand.cc
	(FtpNegotiationCommand): Replaced Peer with PeerHandle.
	(~FtpNegotiationCommand): Removed the deallocation of Sockets.
	(recvGreeting): Use disableWriteCheckSocket.
	(recvPasv): Removed the allocation of Socket.
	Use disableReadCheckSocket.
	(sendRestPasv): Use disableWriteCheckSocket.
	(recvRetr): Changed assertion.
	* src/PeerInteraction.h
	(SharedHandle.h): Included SharedHandle.h.
	(PeerMessageHandle): New type definition.
	(HandshakeMessageHandle): New type definition.
	(MessageQueue): Changed. Now its element is of type 
PeerMessageHandle.
	(peer): Changed its type to PeerHandle.
	(createHandshakeMessage): Replaced HandshakeMessage with
	HandshakeMessageHandle.
	(createPeerMessage): Replaced PeerMessageHandle with 
PeerMessage.
	(PeerInteraction): Replaced Peer with PeerHandle.
	Replaced Socket with SocketHandle.
	(addMessage): Replaced PeerMessage with PeerMessageHandle.
	(receiveMessage): Replaced PeerMessage with PeerMessageHandle.
	(receiveHandshake): Replaced HandshakeMessage with
	HandshakeMessageHandle.
	* src/PeerInteraction.cc
	(PeerInteraction): Replaced Peer with PeerHandle.
	Replaced Socket with SocketHandle.
	(~PeerInteraction): Removed the deallocation of the elements of
	messageQueue.
	(MsgPushBack::operator()): Replaced PeerMessage with 
PeerMessageHandle.
	(isSendingMessageInProgress): Replaced PeerMessage with
	PeerMessageHandle.
	(sendMessages): Use PeerMessageHandle. Removed try-catch block.
	(addMessage): Replaced PeerMessage with PeerMessageHandle.
	(rejectAllPieceMessageInQueue): Use PeerMessageHandle.
	(rejectPieceMessageInQueue): Use PeerMessageHandle.
	(abortPiece): Use PeerMessageHandle.
	(receiveHandshake): Replaced HandshakeMessage with
	HandshakeMessageHandle. Removed try-catch block.
	(createHandshakeMessage): Replaced HandshakeMessage with
	HandshakeMessageHandle.
	(receiveMessage): Replaced PeerMessage with PeerMessageHandle.
	Removed try-catch block.
	(createPeerMessage): Replaced PeerMessage with 
PeerMessageHandle.
	* src/HttpProxyResponseCommand.cc
	(HttpProxyRequestCommand): Replaced Socket with SocketHandle.
	* src/FtpTunnelResponseCommand.h
	(FtpTunnelResponseCommand): Replaced Socket with SocketHandle.
	* src/HttpConnection.cc
	(HttpConnection): Replaced Socket with SocketHandle.
	* src/PeerAbstractCommand.cc
	(PeerAbstractCommand): Replaced Socket with SocketHandle.
	(~PeerAbstractCommand): Removed the deallocation of socket.
	Use disableReadCheckSocket, disableWriteCheckSocket.
	(disableReadCheckSocket): New function.
	(setReadCheckSocket): Replaced Socket with SocketHandle.
	(disableWriteCheckSocket): New function.
	(setWriteCheckSocket): Replaced Socket with SocketHandle.
	* src/InitiateConnectionCommandFactory.h: Corrected indentation.
	* src/FtpTunnelRequestCommand.cc
	(FtpTunnelRequestCommand): Replaced Socket with SocketHandle.
	(~FtpTunnelRequestCommand): Corrected indentation.
	* src/DownloadCommand.h
	(DownloadCommand): Replaced Socket with SocketHandle.
	* src/PeerListenCommand.cc
	(PeerListenCommand): Removed the initialization of socket.
	(~PeerListenCommand): Removed the deallocation of socket.
	(bindPort): Use SocketHandle.
	(execute): Use SocketHandle and PeerHandle.
	* src/FtpDowndloadCommand.cc
	(FtpDownloadCommand): Replaced Socket with SocketHandle.
	(~FtpDownloadCommand): Removed the deallocation of ctrlSocket.
	* src/main.cc
	(main): Corrected indentation.
	* src/HttpInitiateConnectionCommand.cc
	(HttpInitiateConnectionCommand): Replaced Socket with 
SocketHandle.
	(executeInternal): Removed the allocation of socket.
	* src/HttpRequestCommand.h
	(HttpRequestCommand): Replaced Socket with SocketHandle.
	* src/FtpNegotiationCommand.h
	(dataSocket): Changed its type to SocketHandle.
	(serverSocket): Changed its type to SocketHandle.
	(FtpNegotiationCommand): Replaced Socket with SocketHandle.
	* src/TorrentMan.h
	(MAX_PEER_UPDATE): Removed.
	(MAX_PEERS): New definition.
	(Peers): The element is now of type PeerHandle.
	(addPeer): Replaced Peer with PeerHandle. Removed 'duplicate' 
argument.
	(getPeer): Replaced Peer with PeerHandle.
	(deleteOldErrorPeers): Removed.
	(deleteErrorPeer): New function.
	(hasMissingPiece): Replaced Peer with PeerHandle.
	(getMissingPieceIndex): Replaced Peer with PeerHandle.
	(getMissingPiece): Replaced Peer with PeerHandle.
	(getMissingFastPieceIndex): Replaced Peer with PeerHandle.
	(getMissingFastPiece): Replaced Peer with PeerHandle.
	(addActivePeer): Replaced Peer with PeerHandle.
	(deleteActivePeer): Replaced Peer with PeerHandle.
	Added a check for the return value of find.
	* src/FtpTunnelResponseCommand.cc
	(FtpTunnelResponseCommand): Replaced Socket with SocketHandle.
	* src/FtpInitiateConnectionCommand.cc
	(executeInternal): Removed the allocation of socket.
	* src/DownloadEngine.h
	(Sockets): An element is now of type SocketHandle.
	(SockCmdMap): A key is of type SocketHandle, a value is of type 
int.
	(CommandUuids): New type definition.
	(rsockets): Changed its type to SockCmdMap.
	(wsockets): Changed its type to SockCmdMap.
	(addSocket): Rewritten.
	(deleteSocket): Rewritten.
	(addSocketForReadCheck): Rewritten.
	(deleteSocketForReadCheck): Rewritten.
	(addSocketForWriteCheck): Rewritten.
	(deleteSocketForWriteCheck): Rewritten.
	(PairFind): New template class.
	* src/HttpDownloadCommand.h
	(HttpDownloadCommand): Replaced Socket with SocketHandle.
	* src/FtpConnection.cc
	(FtpConnection): Replaced Socket with SocketHandle.
	(sendPort): Removed the allocation of serverSocket. Removed 
try-catch
	block.
	* src/InitiateConnectionCommandFactory.cc
	(DlAbortEx.h): Included DlAbortEx.h.
	(createInitiateConnectionCommand): Throw exception if the 
protocol of
	requested URI is not supported.
	* src/Peer.cc
	(nullPeer): Changed its type to PeerHandle.
	(operator==): New function.
	(operator!=): New function.
	* src/Peer.h
	(SharedHandle.h): Included SharedHandle.h.
	(operator==): New function.
	(operator!=): New function.
	(Peer): Added the default constructor.
	Use resetStatus() to initialize member variables.
	(nullPeer): Removed.
	* src/TrackerUpdateCommand.cc
	(execute): Brushed up using SharedHandle. Replaced 
MAX_PEER_UPDATE
	with MIN_PEERS.
	* src/PeerListenCommand.h
	(socket): Changed its type to SocketHandle.
	* src/Command.h
	(CommandUuid): New type definition.
	(uuid): New variable.
	(uuidGen): New variable.
	(Command): Added the initialization of uuid.
	(getUuid): New function.
	* src/Socket.h
	(Socket): Removed.
	(SocketHandle): New type definition.
	* src/DownloadEngine.h
	(FindCommand): New function object.
	(run): The portion of socket check was rewritten.
	(SetDescriptor): New function object.
	(AccumulateActiveCommandUuid): New function object.
	(waitData): Rewritten.
	(addSocket): Rewritten.
	(deleteSocket): Rewritten.
	(addSocketForReadCheck): Rewritten.
	(addSocketForWriteCheck): Rewritten.
	(deleteSocketForReadCheck): Rewritten.
	(deleteSocketForWriteCheck): Rewritten.
	* src/HttpProxyResponseCommand.h
	(HttpProxyResponseCommand): Replaced Socket with SocketHandle.
	* src/HttpConnection.h
	(socket): Changed its type to SocketHandle.
	(HttpConnection): Replaced Socket with SocketHandle.
	* src/PeerInitiateConnectionCommand.cc
	(PeerInitiateConnectionCommand): Replaced Peer with PeerHandle.
	(executeInternal): Removed the allocation of socket.
	(prepareForNextPeer): Use PeerHandle.
	* src/PeerMessage.h
	(peer): Changed its type to PeerHandle.
	(getPeer): Replaced Peer with PeerHandle.
	(setPeer): Replaced Peer with PeerHandle.
	* src/DownloadCommand.cc
	(DownloadCommand): Replaced Socket with SocketHandle.
	* src/FtpConnection.h
	(socket): Changed its type to SocketHandle.
	(FtpConnection): Replaced Socket with SocketHandle.
	(sendPort); Replaced Socket with SocketHandle.
	* src/FtpDowndloadCommand.h
	(ctrlSocket): Changed its type to SocketHandle.
	(FtpDownloadCommand): Replaced Socket with SocketHandle.
	* src/HttpProxyRequestCommand.cc
	(HttpProxyRequestCommand): Replaced Socket with SocketHandle.
	* src/FtpTunnelRequestCommand.h
	(FtpTunnelRequestCommand): Replaced Socket with SocketHandle.
	
	etc
	
	* src/PeerChokeCommand.h
	(setAllPeerChoked): Removed.
	(setAllPeerResetDelta): Removed.
	* src/PeerChokeCommand.cc
	(setAllPeerChoked): Removed.
	(ChokePeer): New function object.
	(setAllPeerResetDelta): Removed.
	(ResetDelta): New function object.
	(orderByDownloadRate): Fixed a bug: use DowloadFaster, not 
UploadFaster
	(execute): Show download speed when the local node is a seeder.
	setAllPeerChoked and setAllPeerResetDelta were rewritten
	using STL.
	* src/TrackerWatcherCommand.h
	(MIN_PEERS): Removed.
	* src/TorrentMan.cc
	(getPeer): Replaced MAX_PEER_UPDATE with MIN_PEERS.
pull/1/head
Tatsuhiro Tsujikawa 2006-07-19 17:07:45 +00:00
parent f15d41dd63
commit 524d664850
62 changed files with 922 additions and 626 deletions

298
ChangeLog
View File

@ -1,3 +1,301 @@
2006-07-19 Tatsuhiro Tsujikawa <tujikawa at rednoah dot com>
* src/SharedHandle.h: New class.
To wrap Socket, Command, PeerMessage and Peer with SharedHandle:
* src/HttpResponseCommand.h
(HttpResponseCommand): Wrapped Socket.
* src/SocketCore.h
(operator==): New function.
(operator!=): New function.
(operator<): New function.
(getSockfd): New function.
(isOpen): New function.
(writeData): New function.
* src/SocketCore.cc
(operator==): New function.
(operator!=): New function.
(operator<): New function.
* src/AbstractCommand.h
(socket): Changed its type to SocketHandle.
(setReadCheckSocket): Replaced Socket with SocketHandle.
(setWriteCheckSocket): Replaced Socket with SocketHandle.
(disableReadCheckSocket): New function.
(disableWriteCheckSocket): New function.
(readCheckTarget): Changed its type to SocketHandle.
(writeCheckTarget): Changed its type to SocketHandle.
(AbstractCommand): Replaced Socket with SocketHandle.
* src/AbstractCommand.cc
(AbstractCommand): Replaced Socket with SocketHandle.
(~AbstractCommand): Removed the deallocation for Socket object.
(disableReadCheckSocket): New function.
(setReadCheckSocket): Replaced Socket with SocketHandle.
(disableWriteCheckSocket): New function.
(setWriteCheckSocket): Replaced Socket with SocketHandle.
* src/HttpDownloadCommand.cc
(DownloadCommand): Replaced Socket with SocketHandle.
* src/PeerAbstractCommand.h
(socket): Changed its type to SocketHandle.
(peer): Changed its type to PeerHandle.
(setReadCheckSocket): Replaced Socket with SocketHandle.
(setWriteCheckSocket): Replaced Socket with SocketHandle.
(disableReadCheckSocket): New function.
(disableWriteCheckSocket): New function.
(readCheckTarget): Changed its type to SocketHandle.
(writeCheckTarget): Changed its type to SocketHandle.
(PeerAbstractCommand): Replaced Socket with SocketHandle.
Replaced Peer with PeerHandle.
* src/HttpRequestCommand.cc
(HttpRequestCommand): Replaced Socket with SocketHandle.
Use disableReadCheckSocket.
* src/PeerInitiateConnectionCommand.h
(PeerInitiateConnectionCommand): Replaced Peer with PeerHandle.
* src/PeerChokeCommand.cc
(UploadFaster::operator()): Replaced Peer with PeerHandle.
(DownloadFaster::operator()): Replaced Peer with PeerHandle.
(execute): Use PeerHandle.
* src/PeerConnection.h
(HandshakeMessage.h): Removed include of HandshakeMessage.h.
(socket): Changed its type to SocketHandle.
(PeerConnection): Replaced Socket with SocketHandle.
* src/PeerConnection.cc
(PeerConnection): Replaced Socket with SocketHandle.
* src/PeerInteractionCommand.h
(PeerInteractionCommand): Replaced socket with SocketHandle.
Replaced Peer with PeerHandle.
* src/PeerInteractionCommand.cc
(PeerInteractionCommand): Replaced Socket with SocketHandle.
Replaced Peer with PeerHandle.
(executeInternal): Use disableWriteCheckSocket.
Use HandshakeMessageHandle.
(receiveMessages): Use PeerMessageHandle.
(prepareForNextPeer): Use PeerHandle.
* src/HttpProxyRequestCommand.h
(HttpProxyRequestCommand): Replaced Socket with SocketHandle.
* src/HttpResponseCommand.cc
(HttpResponseCommand): Replaced Socket with SocketHandle.
* src/TorrentMan.cc
(nullPeer): Added external reference.
(~TorrentMan): Removed the deallocation of the elements of peers.
(addPeer): Rewritten.
(isPeerAvailable): Use nullPeer.
(deleteOldpeers): Replaced with deleteErrorPeer.
(deleteErrorPeer): New function.
(getPeer): Use PeerHandle and nullPeer.
(hasMissingPiece): Replaced Peer with PeerHandle.
(getMissingPieceIndex): Replaced Peer with PeerHandle.
(getMissingFastPieceIndex): Replaced Peer with PeerHandle.
(getMissingFastPiece): Replaced Peer with PeerHandle.
(getMissingPiece): Replaced Peer with PeerHandle.
* src/FtpNegotiateCommand.cc
(FtpNegotiationCommand): Replaced Peer with PeerHandle.
(~FtpNegotiationCommand): Removed the deallocation of Sockets.
(recvGreeting): Use disableWriteCheckSocket.
(recvPasv): Removed the allocation of Socket.
Use disableReadCheckSocket.
(sendRestPasv): Use disableWriteCheckSocket.
(recvRetr): Changed assertion.
* src/PeerInteraction.h
(SharedHandle.h): Included SharedHandle.h.
(PeerMessageHandle): New type definition.
(HandshakeMessageHandle): New type definition.
(MessageQueue): Changed. Now its element is of type PeerMessageHandle.
(peer): Changed its type to PeerHandle.
(createHandshakeMessage): Replaced HandshakeMessage with
HandshakeMessageHandle.
(createPeerMessage): Replaced PeerMessageHandle with PeerMessage.
(PeerInteraction): Replaced Peer with PeerHandle.
Replaced Socket with SocketHandle.
(addMessage): Replaced PeerMessage with PeerMessageHandle.
(receiveMessage): Replaced PeerMessage with PeerMessageHandle.
(receiveHandshake): Replaced HandshakeMessage with
HandshakeMessageHandle.
* src/PeerInteraction.cc
(PeerInteraction): Replaced Peer with PeerHandle.
Replaced Socket with SocketHandle.
(~PeerInteraction): Removed the deallocation of the elements of
messageQueue.
(MsgPushBack::operator()): Replaced PeerMessage with PeerMessageHandle.
(isSendingMessageInProgress): Replaced PeerMessage with
PeerMessageHandle.
(sendMessages): Use PeerMessageHandle. Removed try-catch block.
(addMessage): Replaced PeerMessage with PeerMessageHandle.
(rejectAllPieceMessageInQueue): Use PeerMessageHandle.
(rejectPieceMessageInQueue): Use PeerMessageHandle.
(abortPiece): Use PeerMessageHandle.
(receiveHandshake): Replaced HandshakeMessage with
HandshakeMessageHandle. Removed try-catch block.
(createHandshakeMessage): Replaced HandshakeMessage with
HandshakeMessageHandle.
(receiveMessage): Replaced PeerMessage with PeerMessageHandle.
Removed try-catch block.
(createPeerMessage): Replaced PeerMessage with PeerMessageHandle.
* src/HttpProxyResponseCommand.cc
(HttpProxyRequestCommand): Replaced Socket with SocketHandle.
* src/FtpTunnelResponseCommand.h
(FtpTunnelResponseCommand): Replaced Socket with SocketHandle.
* src/HttpConnection.cc
(HttpConnection): Replaced Socket with SocketHandle.
* src/PeerAbstractCommand.cc
(PeerAbstractCommand): Replaced Socket with SocketHandle.
(~PeerAbstractCommand): Removed the deallocation of socket.
Use disableReadCheckSocket, disableWriteCheckSocket.
(disableReadCheckSocket): New function.
(setReadCheckSocket): Replaced Socket with SocketHandle.
(disableWriteCheckSocket): New function.
(setWriteCheckSocket): Replaced Socket with SocketHandle.
* src/InitiateConnectionCommandFactory.h: Corrected indentation.
* src/FtpTunnelRequestCommand.cc
(FtpTunnelRequestCommand): Replaced Socket with SocketHandle.
(~FtpTunnelRequestCommand): Corrected indentation.
* src/DownloadCommand.h
(DownloadCommand): Replaced Socket with SocketHandle.
* src/PeerListenCommand.cc
(PeerListenCommand): Removed the initialization of socket.
(~PeerListenCommand): Removed the deallocation of socket.
(bindPort): Use SocketHandle.
(execute): Use SocketHandle and PeerHandle.
* src/FtpDowndloadCommand.cc
(FtpDownloadCommand): Replaced Socket with SocketHandle.
(~FtpDownloadCommand): Removed the deallocation of ctrlSocket.
* src/main.cc
(main): Corrected indentation.
* src/HttpInitiateConnectionCommand.cc
(HttpInitiateConnectionCommand): Replaced Socket with SocketHandle.
(executeInternal): Removed the allocation of socket.
* src/HttpRequestCommand.h
(HttpRequestCommand): Replaced Socket with SocketHandle.
* src/FtpNegotiationCommand.h
(dataSocket): Changed its type to SocketHandle.
(serverSocket): Changed its type to SocketHandle.
(FtpNegotiationCommand): Replaced Socket with SocketHandle.
* src/TorrentMan.h
(MAX_PEER_UPDATE): Removed.
(MAX_PEERS): New definition.
(Peers): The element is now of type PeerHandle.
(addPeer): Replaced Peer with PeerHandle. Removed 'duplicate' argument.
(getPeer): Replaced Peer with PeerHandle.
(deleteOldErrorPeers): Removed.
(deleteErrorPeer): New function.
(hasMissingPiece): Replaced Peer with PeerHandle.
(getMissingPieceIndex): Replaced Peer with PeerHandle.
(getMissingPiece): Replaced Peer with PeerHandle.
(getMissingFastPieceIndex): Replaced Peer with PeerHandle.
(getMissingFastPiece): Replaced Peer with PeerHandle.
(addActivePeer): Replaced Peer with PeerHandle.
(deleteActivePeer): Replaced Peer with PeerHandle.
Added a check for the return value of find.
* src/FtpTunnelResponseCommand.cc
(FtpTunnelResponseCommand): Replaced Socket with SocketHandle.
* src/FtpInitiateConnectionCommand.cc
(executeInternal): Removed the allocation of socket.
* src/DownloadEngine.h
(Sockets): An element is now of type SocketHandle.
(SockCmdMap): A key is of type SocketHandle, a value is of type int.
(CommandUuids): New type definition.
(rsockets): Changed its type to SockCmdMap.
(wsockets): Changed its type to SockCmdMap.
(addSocket): Rewritten.
(deleteSocket): Rewritten.
(addSocketForReadCheck): Rewritten.
(deleteSocketForReadCheck): Rewritten.
(addSocketForWriteCheck): Rewritten.
(deleteSocketForWriteCheck): Rewritten.
(PairFind): New template class.
* src/HttpDownloadCommand.h
(HttpDownloadCommand): Replaced Socket with SocketHandle.
* src/FtpConnection.cc
(FtpConnection): Replaced Socket with SocketHandle.
(sendPort): Removed the allocation of serverSocket. Removed try-catch
block.
* src/InitiateConnectionCommandFactory.cc
(DlAbortEx.h): Included DlAbortEx.h.
(createInitiateConnectionCommand): Throw exception if the protocol of
requested URI is not supported.
* src/Peer.cc
(nullPeer): Changed its type to PeerHandle.
(operator==): New function.
(operator!=): New function.
* src/Peer.h
(SharedHandle.h): Included SharedHandle.h.
(operator==): New function.
(operator!=): New function.
(Peer): Added the default constructor.
Use resetStatus() to initialize member variables.
(nullPeer): Removed.
* src/TrackerUpdateCommand.cc
(execute): Brushed up using SharedHandle. Replaced MAX_PEER_UPDATE
with MIN_PEERS.
* src/PeerListenCommand.h
(socket): Changed its type to SocketHandle.
* src/Command.h
(CommandUuid): New type definition.
(uuid): New variable.
(uuidGen): New variable.
(Command): Added the initialization of uuid.
(getUuid): New function.
* src/Socket.h
(Socket): Removed.
(SocketHandle): New type definition.
* src/DownloadEngine.h
(FindCommand): New function object.
(run): The portion of socket check was rewritten.
(SetDescriptor): New function object.
(AccumulateActiveCommandUuid): New function object.
(waitData): Rewritten.
(addSocket): Rewritten.
(deleteSocket): Rewritten.
(addSocketForReadCheck): Rewritten.
(addSocketForWriteCheck): Rewritten.
(deleteSocketForReadCheck): Rewritten.
(deleteSocketForWriteCheck): Rewritten.
* src/HttpProxyResponseCommand.h
(HttpProxyResponseCommand): Replaced Socket with SocketHandle.
* src/HttpConnection.h
(socket): Changed its type to SocketHandle.
(HttpConnection): Replaced Socket with SocketHandle.
* src/PeerInitiateConnectionCommand.cc
(PeerInitiateConnectionCommand): Replaced Peer with PeerHandle.
(executeInternal): Removed the allocation of socket.
(prepareForNextPeer): Use PeerHandle.
* src/PeerMessage.h
(peer): Changed its type to PeerHandle.
(getPeer): Replaced Peer with PeerHandle.
(setPeer): Replaced Peer with PeerHandle.
* src/DownloadCommand.cc
(DownloadCommand): Replaced Socket with SocketHandle.
* src/FtpConnection.h
(socket): Changed its type to SocketHandle.
(FtpConnection): Replaced Socket with SocketHandle.
(sendPort); Replaced Socket with SocketHandle.
* src/FtpDowndloadCommand.h
(ctrlSocket): Changed its type to SocketHandle.
(FtpDownloadCommand): Replaced Socket with SocketHandle.
* src/HttpProxyRequestCommand.cc
(HttpProxyRequestCommand): Replaced Socket with SocketHandle.
* src/FtpTunnelRequestCommand.h
(FtpTunnelRequestCommand): Replaced Socket with SocketHandle.
etc
* src/PeerChokeCommand.h
(setAllPeerChoked): Removed.
(setAllPeerResetDelta): Removed.
* src/PeerChokeCommand.cc
(setAllPeerChoked): Removed.
(ChokePeer): New function object.
(setAllPeerResetDelta): Removed.
(ResetDelta): New function object.
(orderByDownloadRate): Fixed a bug: use DowloadFaster, not UploadFaster
(execute): Show download speed when the local node is a seeder.
setAllPeerChoked and setAllPeerResetDelta were rewritten
using STL.
* src/TrackerWatcherCommand.h
(MIN_PEERS): Removed.
* src/TorrentMan.cc
(getPeer): Replaced MAX_PEER_UPDATE with MIN_PEERS.
2006-07-07 Tatsuhiro Tsujikawa <tujikawa at rednoah dot com>
To fix the bug that .aria2 file is not saved if downloading is stopped

1
TODO
View File

@ -11,3 +11,4 @@
* Refacturing HttpConnection and FtpConnection
* Query resource by location
* Log version
* List available os, version, etc for metalink

View File

@ -28,24 +28,18 @@
#include "SleepCommand.h"
#include "prefs.h"
AbstractCommand::AbstractCommand(int cuid, Request* req, DownloadEngine* e, const Socket* s):
Command(cuid), req(req), e(e), checkSocketIsReadable(false), checkSocketIsWritable(false) {
if(s != NULL) {
socket = new Socket(*s);
setReadCheckSocket(socket);
} else {
socket = NULL;
}
AbstractCommand::AbstractCommand(int cuid, Request* req, DownloadEngine* e,
const SocketHandle& s):
Command(cuid), req(req), e(e), socket(s),
checkSocketIsReadable(false), checkSocketIsWritable(false) {
setReadCheckSocket(socket);
timeout = this->e->option->getAsInt(PREF_TIMEOUT);
}
AbstractCommand::~AbstractCommand() {
setReadCheckSocket(NULL);
setWriteCheckSocket(NULL);
if(socket != NULL) {
delete(socket);
}
disableReadCheckSocket();
disableWriteCheckSocket();
}
bool AbstractCommand::execute() {
@ -124,44 +118,52 @@ void AbstractCommand::onAbort(Exception* ex) {
e->segmentMan->unregisterId(cuid);
}
void AbstractCommand::setReadCheckSocket(Socket* socket) {
if(socket == NULL) {
if(checkSocketIsReadable) {
e->deleteSocketForReadCheck(readCheckTarget);
checkSocketIsReadable = false;
readCheckTarget = NULL;
}
void AbstractCommand::disableReadCheckSocket() {
if(checkSocketIsReadable) {
e->deleteSocketForReadCheck(readCheckTarget, getUuid());
checkSocketIsReadable = false;
readCheckTarget = SocketHandle();
}
}
void AbstractCommand::setReadCheckSocket(const SocketHandle& socket) {
if(!socket->isOpen()) {
disableReadCheckSocket();
} else {
if(checkSocketIsReadable) {
if(readCheckTarget != socket) {
e->deleteSocketForReadCheck(readCheckTarget);
e->addSocketForReadCheck(socket, this);
e->deleteSocketForReadCheck(readCheckTarget, getUuid());
e->addSocketForReadCheck(socket, getUuid());
readCheckTarget = socket;
}
} else {
e->addSocketForReadCheck(socket, this);
e->addSocketForReadCheck(socket, getUuid());
checkSocketIsReadable = true;
readCheckTarget = socket;
}
}
}
void AbstractCommand::setWriteCheckSocket(Socket* socket) {
if(socket == NULL) {
if(checkSocketIsWritable) {
e->deleteSocketForWriteCheck(writeCheckTarget);
checkSocketIsWritable = false;
writeCheckTarget = NULL;
}
void AbstractCommand::disableWriteCheckSocket() {
if(checkSocketIsWritable) {
e->deleteSocketForWriteCheck(writeCheckTarget, getUuid());
checkSocketIsWritable = false;
writeCheckTarget = SocketHandle();
}
}
void AbstractCommand::setWriteCheckSocket(const SocketHandle& socket) {
if(!socket->isOpen()) {
disableWriteCheckSocket();
} else {
if(checkSocketIsWritable) {
if(writeCheckTarget != socket) {
e->deleteSocketForWriteCheck(writeCheckTarget);
e->addSocketForWriteCheck(socket, this);
e->deleteSocketForWriteCheck(writeCheckTarget, getUuid());
e->addSocketForWriteCheck(socket, getUuid());
writeCheckTarget = socket;
}
} else {
e->addSocketForWriteCheck(socket, this);
e->addSocketForWriteCheck(socket, getUuid());
checkSocketIsWritable = true;
writeCheckTarget = socket;
}

View File

@ -35,23 +35,25 @@ private:
protected:
Request* req;
DownloadEngine* e;
Socket* socket;
SocketHandle socket;
void tryReserved();
virtual bool prepareForRetry(int wait);
virtual void onAbort(Exception* ex);
virtual bool executeInternal(Segment segment) = 0;
void setReadCheckSocket(Socket* socket);
void setWriteCheckSocket(Socket* socket);
void setReadCheckSocket(const SocketHandle& socket);
void setWriteCheckSocket(const SocketHandle& socket);
void disableReadCheckSocket();
void disableWriteCheckSocket();
void setTimeout(int timeout) { this->timeout = timeout; }
private:
bool checkSocketIsReadable;
bool checkSocketIsWritable;
Socket* readCheckTarget;
Socket* writeCheckTarget;
SocketHandle readCheckTarget;
SocketHandle writeCheckTarget;
public:
AbstractCommand(int cuid, Request* req, DownloadEngine* e, const Socket* s= NULL);
AbstractCommand(int cuid, Request* req, DownloadEngine* e, const SocketHandle& s = SocketHandle());
virtual ~AbstractCommand();
bool execute();
};

View File

@ -25,18 +25,24 @@
#include "common.h"
#include "LogFactory.h"
typedef int CommandUuid;
class Command {
private:
CommandUuid uuid;
static int uuidGen;
protected:
int cuid;
const Logger* logger;
public:
Command(int cuid):cuid(cuid) {
Command(int cuid):uuid(uuidGen++), cuid(cuid) {
logger = LogFactory::getInstance();
}
virtual ~Command() {}
virtual bool execute() = 0;
int getCuid() const { return cuid; }
const CommandUuid& getUuid() const { return uuid; }
};
#endif // _D_COMMAND_H_

View File

@ -27,7 +27,10 @@
#include "InitiateConnectionCommandFactory.h"
#include "message.h"
DownloadCommand::DownloadCommand(int cuid, Request* req, DownloadEngine* e, const Socket* s):AbstractCommand(cuid, req, e, s), lastSize(0) {}
DownloadCommand::DownloadCommand(int cuid, Request* req, DownloadEngine* e,
const SocketHandle& s):
AbstractCommand(cuid, req, e, s), lastSize(0) {
}
DownloadCommand::~DownloadCommand() {}

View File

@ -38,7 +38,8 @@ protected:
bool prepareForRetry(int wait);
bool prepareForNextSegment();
public:
DownloadCommand(int cuid, Request* req, DownloadEngine* e, const Socket* s);
DownloadCommand(int cuid, Request* req, DownloadEngine* e,
const SocketHandle& s);
virtual ~DownloadCommand();
virtual TransferEncoding* getTransferEncoding(const string& transferEncoding) = 0;

View File

@ -47,10 +47,26 @@ void DownloadEngine::cleanQueue() {
commands.clear();
}
class FindCommand {
private:
CommandUuid uuid;
public:
FindCommand(const CommandUuid& uuid):uuid(uuid) {}
bool operator()(const Command* command) {
if(command->getUuid() == uuid) {
return true;
} else {
return false;
}
}
};
void DownloadEngine::run() {
initStatistics();
Time cp;
Sockets activeSockets;
CommandUuids activeCommandUuids;
while(!commands.empty()) {
if(cp.elapsed(1)) {
cp.reset();
@ -59,27 +75,26 @@ void DownloadEngine::run() {
Command* com = commands.front();
commands.pop_front();
if(com->execute()) {
delete(com);
delete com;
}
}
} else {
for(Sockets::iterator itr = activeSockets.begin();
itr != activeSockets.end(); itr++) {
Socket* socket = *itr;
SockCmdMap::iterator mapItr = sockCmdMap.find(socket);
if(mapItr != sockCmdMap.end()) {
Command* com = (*mapItr).second;
commands.erase(remove(commands.begin(), commands.end(), com));
if(com->execute()) {
delete(com);
}
for(CommandUuids::iterator itr = activeCommandUuids.begin();
itr != activeCommandUuids.end(); itr++) {
Commands::iterator comItr = find_if(commands.begin(), commands.end(),
FindCommand(*itr));
assert(comItr != commands.end());
Command* com = *comItr;
commands.erase(comItr);
if(com->execute()) {
delete com;
}
}
}
afterEachIteration();
activeSockets.clear();
activeCommandUuids.clear();
if(!noWait && !commands.empty()) {
waitData(activeSockets);
waitData(activeCommandUuids);
}
noWait = false;
calculateStatistics();
@ -96,7 +111,40 @@ void DownloadEngine::shortSleep() const {
select(0, &rfds, NULL, NULL, &tv);
}
void DownloadEngine::waitData(Sockets& activeSockets) {
class SetDescriptor {
private:
fd_set* fds_ptr;
int* max_ptr;
public:
SetDescriptor(int* max_ptr, fd_set* fds_ptr)
:fds_ptr(fds_ptr), max_ptr(max_ptr) {}
void operator()(const pair<SocketHandle, CommandUuid>& pa) {
int fd = pa.first->getSockfd();
FD_SET(fd, fds_ptr);
if(*max_ptr < fd) {
*max_ptr = fd;
}
}
};
class AccumulateActiveCommandUuid {
private:
CommandUuids* activeCommandUuids_ptr;
fd_set* fds_ptr;
public:
AccumulateActiveCommandUuid(CommandUuids* activeCommandUuids_ptr,
fd_set* fds_ptr)
:activeCommandUuids_ptr(activeCommandUuids_ptr), fds_ptr(fds_ptr) {}
void operator()(const pair<SocketHandle, CommandUuid>& pa) {
if(FD_ISSET(pa.first->getSockfd(), fds_ptr)) {
activeCommandUuids_ptr->push_back(pa.second);
}
}
};
void DownloadEngine::waitData(CommandUuids& activeCommandUuids) {
fd_set rfds;
fd_set wfds;
int retval = 0;
@ -106,18 +154,9 @@ void DownloadEngine::waitData(Sockets& activeSockets) {
FD_ZERO(&rfds);
FD_ZERO(&wfds);
int max = 0;
for(Sockets::iterator itr = rsockets.begin(); itr != rsockets.end(); itr++) {
FD_SET((*itr)->getSockfd(), &rfds);
if(max < (*itr)->getSockfd()) {
max = (*itr)->getSockfd();
}
}
for(Sockets::iterator itr = wsockets.begin(); itr != wsockets.end(); itr++) {
FD_SET((*itr)->getSockfd(), &wfds);
if(max < (*itr)->getSockfd()) {
max = (*itr)->getSockfd();
}
}
for_each(rsockmap.begin(), rsockmap.end(), SetDescriptor(&max, &rfds));
for_each(wsockmap.begin(), wsockmap.end(), SetDescriptor(&max, &wfds));
tv.tv_sec = 1;
tv.tv_usec = 0;
retval = select(max+1, &rfds, &wfds, NULL, &tv);
@ -126,64 +165,60 @@ void DownloadEngine::waitData(Sockets& activeSockets) {
}
}
if(retval > 0) {
for(Sockets::iterator itr = rsockets.begin(); itr != rsockets.end(); itr++) {
if(FD_ISSET((*itr)->getSockfd(), &rfds)) {
activeSockets.push_back(*itr);
}
}
for(Sockets::iterator itr = wsockets.begin(); itr != wsockets.end(); itr++) {
if(FD_ISSET((*itr)->getSockfd(), &wfds)) {
activeSockets.push_back(*itr);
}
}
sort(activeSockets.begin(), activeSockets.end());
activeSockets.erase(unique(activeSockets.begin(), activeSockets.end()), activeSockets.end());
for_each(rsockmap.begin(), rsockmap.end(),
AccumulateActiveCommandUuid(&activeCommandUuids, &rfds));
for_each(wsockmap.begin(), wsockmap.end(),
AccumulateActiveCommandUuid(&activeCommandUuids, &wfds));
sort(activeCommandUuids.begin(), activeCommandUuids.end());
activeCommandUuids.erase(unique(activeCommandUuids.begin(),
activeCommandUuids.end()),
activeCommandUuids.end());
}
}
bool DownloadEngine::addSocket(Sockets& sockets, Socket* socket, Command* command) {
Sockets::iterator itr = find(sockets.begin(),
sockets.end(),
socket);
if(itr == sockets.end()) {
sockets.push_back(socket);
SockCmdMap::value_type vt(socket, command);
sockCmdMap.insert(vt);
bool DownloadEngine::addSocket(SockCmdMap& sockmap,
const SocketHandle& socket,
const CommandUuid& commandUuid) {
SockCmdMap::iterator itr = find_if(sockmap.begin(), sockmap.end(),
PairFind<SocketHandle, CommandUuid>(socket, commandUuid));
if(itr == sockmap.end()) {
SockCmdMap::value_type vt(socket, commandUuid);
sockmap.insert(vt);
return true;
} else {
return false;
}
}
bool DownloadEngine::deleteSocket(Sockets& sockets, Socket* socket) {
Sockets::iterator itr = find(sockets.begin(),
sockets.end(),
socket);
if(itr != sockets.end()) {
sockets.erase(itr);
SockCmdMap::iterator mapItr = sockCmdMap.find(socket);
if(mapItr != sockCmdMap.end()) {
sockCmdMap.erase(mapItr);
}
return true;
} else {
bool DownloadEngine::deleteSocket(SockCmdMap& sockmap,
const SocketHandle& socket,
const CommandUuid& commandUuid) {
SockCmdMap::iterator itr = find_if(sockmap.begin(), sockmap.end(),
PairFind<SocketHandle, CommandUuid>(socket, commandUuid));
if(itr == sockmap.end()) {
return false;
} else {
sockmap.erase(itr);
return true;
}
}
bool DownloadEngine::addSocketForReadCheck(Socket* socket, Command* command) {
return addSocket(rsockets, socket, command);
bool DownloadEngine::addSocketForReadCheck(const SocketHandle& socket,
const CommandUuid& commandUuid) {
return addSocket(rsockmap, socket, commandUuid);
}
bool DownloadEngine::deleteSocketForReadCheck(Socket* socket) {
return deleteSocket(rsockets , socket);
bool DownloadEngine::deleteSocketForReadCheck(const SocketHandle& socket,
const CommandUuid& commandUuid) {
return deleteSocket(rsockmap, socket, commandUuid);
}
bool DownloadEngine::addSocketForWriteCheck(Socket* socket, Command* command) {
return addSocket(wsockets, socket, command);
bool DownloadEngine::addSocketForWriteCheck(const SocketHandle& socket,
const CommandUuid& commandUuid) {
return addSocket(wsockmap, socket, commandUuid);
}
bool DownloadEngine::deleteSocketForWriteCheck(Socket* socket) {
return deleteSocket(wsockets, socket);
bool DownloadEngine::deleteSocketForWriteCheck(const SocketHandle& socket,
const CommandUuid& commandUuid) {
return deleteSocket(wsockmap, socket, commandUuid);
}

View File

@ -29,20 +29,22 @@
#include "Logger.h"
#include "Option.h"
typedef deque<Socket*> Sockets;
typedef deque<SocketHandle> Sockets;
typedef deque<Command*> Commands;
typedef multimap<Socket*, Command*> SockCmdMap;
typedef deque<CommandUuid> CommandUuids;
typedef multimap<SocketHandle, int> SockCmdMap;
class DownloadEngine {
private:
void waitData(Sockets& activeSockets);
Sockets rsockets;
Sockets wsockets;
SockCmdMap sockCmdMap;
void waitData(CommandUuids& activeCommandUuids);
SockCmdMap rsockmap;
SockCmdMap wsockmap;
void shortSleep() const;
bool addSocket(Sockets& sockets, Socket* socket, Command* command);
bool deleteSocket(Sockets& sockets, Socket* socket);
bool addSocket(SockCmdMap& sockmap, const SocketHandle& socket,
const CommandUuid& commandUuid);
bool deleteSocket(SockCmdMap& sockmap, const SocketHandle& socket,
const CommandUuid& commandUuid);
protected:
const Logger* logger;
virtual void initStatistics() = 0;
@ -62,12 +64,33 @@ public:
void cleanQueue();
bool addSocketForReadCheck(Socket* socket, Command* command);
bool deleteSocketForReadCheck(Socket* socket);
bool addSocketForWriteCheck(Socket* socket, Command* command);
bool deleteSocketForWriteCheck(Socket* socket);
bool addSocketForReadCheck(const SocketHandle& socket,
const CommandUuid& commandUuid);
bool deleteSocketForReadCheck(const SocketHandle& socket,
const CommandUuid& commandUuid);
bool addSocketForWriteCheck(const SocketHandle& socket,
const CommandUuid& commandUuid);
bool deleteSocketForWriteCheck(const SocketHandle& socket,
const CommandUuid& command);
};
template<class T1, class T2>
class PairFind {
private:
T1 first;
T2 second;
public:
PairFind(T1 t1, T2 t2):first(t1), second(t2) {}
bool operator()(const pair<T1, T2>& pa) {
if(pa.first == first && pa.second == second) {
return true;
} else {
return false;
}
}
};
#endif // _D_DOWNLOAD_ENGINE_H_

View File

@ -27,7 +27,9 @@
#include "prefs.h"
#include "LogFactory.h"
FtpConnection::FtpConnection(int cuid, const Socket* socket, const Request* req, const Option* op):cuid(cuid), socket(socket), req(req), option(op) {
FtpConnection::FtpConnection(int cuid, const SocketHandle& socket,
const Request* req, const Option* op)
:cuid(cuid), socket(socket), req(req), option(op) {
logger = LogFactory::getInstance();
}
@ -75,27 +77,22 @@ void FtpConnection::sendPasv() const {
socket->writeData(request);
}
Socket* FtpConnection::sendPort() const {
Socket* serverSocket = new Socket();
try {
serverSocket->beginListen();
pair<string, int> addrinfo;
socket->getAddrInfo(addrinfo);
int ipaddr[4];
sscanf(addrinfo.first.c_str(), "%d.%d.%d.%d",
&ipaddr[0], &ipaddr[1], &ipaddr[2], &ipaddr[3]);
serverSocket->getAddrInfo(addrinfo);
string request = "PORT "+
Util::itos(ipaddr[0])+","+Util::itos(ipaddr[1])+","+
Util::itos(ipaddr[2])+","+Util::itos(ipaddr[3])+","+
Util::itos(addrinfo.second/256)+","+Util::itos(addrinfo.second%256)+"\r\n";
logger->info(MSG_SENDING_REQUEST, cuid, request.c_str());
socket->writeData(request);
} catch (Exception* ex) {
delete serverSocket;
throw;
}
SocketHandle FtpConnection::sendPort() const {
SocketHandle serverSocket;
serverSocket->beginListen();
pair<string, int> addrinfo;
socket->getAddrInfo(addrinfo);
int ipaddr[4];
sscanf(addrinfo.first.c_str(), "%d.%d.%d.%d",
&ipaddr[0], &ipaddr[1], &ipaddr[2], &ipaddr[3]);
serverSocket->getAddrInfo(addrinfo);
string request = "PORT "+
Util::itos(ipaddr[0])+","+Util::itos(ipaddr[1])+","+
Util::itos(ipaddr[2])+","+Util::itos(ipaddr[3])+","+
Util::itos(addrinfo.second/256)+","+Util::itos(addrinfo.second%256)+"\r\n";
logger->info(MSG_SENDING_REQUEST, cuid, request.c_str());
socket->writeData(request);
return serverSocket;
}

View File

@ -35,7 +35,7 @@ using namespace std;
class FtpConnection {
private:
int cuid;
const Socket* socket;
SocketHandle socket;
const Request* req;
const Option* option;
const Logger* logger;
@ -46,7 +46,8 @@ private:
bool isEndOfResponse(int status, const string& response) const;
bool bulkReceiveResponse(pair<int, string>& response);
public:
FtpConnection(int cuid, const Socket* socket, const Request* req, const Option* op);
FtpConnection(int cuid, const SocketHandle& socket,
const Request* req, const Option* op);
~FtpConnection();
void sendUser() const;
void sendPass() const;
@ -54,7 +55,7 @@ public:
void sendCwd() const;
void sendSize() const;
void sendPasv() const;
Socket* sendPort() const;
SocketHandle sendPort() const;
void sendRest(const Segment& segment) const;
void sendRetr() const;

View File

@ -21,17 +21,13 @@
/* copyright --> */
#include "FtpDownloadCommand.h"
FtpDownloadCommand::FtpDownloadCommand(int cuid, Request* req, DownloadEngine* e, const Socket* dataSocket, const Socket* ctrlSocket):
DownloadCommand(cuid, req, e, dataSocket)
{
this->ctrlSocket = new Socket(*ctrlSocket);
}
FtpDownloadCommand::FtpDownloadCommand(int cuid, Request* req,
DownloadEngine* e,
const SocketHandle& dataSocket,
const SocketHandle& ctrlSocket)
:DownloadCommand(cuid, req, e, dataSocket), ctrlSocket(ctrlSocket) {}
FtpDownloadCommand::~FtpDownloadCommand() {
if(ctrlSocket != NULL) {
delete ctrlSocket;
}
}
FtpDownloadCommand::~FtpDownloadCommand() {}
TransferEncoding* FtpDownloadCommand::getTransferEncoding(const string& name) {
return NULL;

View File

@ -26,9 +26,11 @@
class FtpDownloadCommand : public DownloadCommand {
private:
Socket* ctrlSocket;
SocketHandle ctrlSocket;
public:
FtpDownloadCommand(int cuid, Request* req, DownloadEngine* e, const Socket* dataSocket, const Socket* ctrlSocket);
FtpDownloadCommand(int cuid, Request* req, DownloadEngine* e,
const SocketHandle& dataSocket,
const SocketHandle& ctrlSocket);
~FtpDownloadCommand();
TransferEncoding* getTransferEncoding(const string& name);

View File

@ -44,7 +44,6 @@ bool FtpInitiateConnectionCommand::executeInternal(Segment segment) {
}
}
socket = new Socket();
Command* command;
if(useHttpProxy()) {
logger->info(MSG_CONNECTING_TO_SERVER, cuid,

View File

@ -26,22 +26,17 @@
#include "message.h"
#include "prefs.h"
FtpNegotiationCommand::FtpNegotiationCommand(int cuid, Request* req, DownloadEngine* e, const Socket* s):
AbstractCommand(cuid, req, e, s),
dataSocket(NULL), serverSocket(NULL), sequence(SEQ_RECV_GREETING)
FtpNegotiationCommand::FtpNegotiationCommand(int cuid, Request* req,
DownloadEngine* e,
const SocketHandle& s):
AbstractCommand(cuid, req, e, s), sequence(SEQ_RECV_GREETING)
{
ftp = new FtpConnection(cuid, socket, req, e->option);
setReadCheckSocket(NULL);
disableReadCheckSocket();
setWriteCheckSocket(socket);
}
FtpNegotiationCommand::~FtpNegotiationCommand() {
if(dataSocket != NULL) {
delete dataSocket;
}
if(serverSocket != NULL) {
delete serverSocket;
}
delete ftp;
}
@ -50,7 +45,8 @@ bool FtpNegotiationCommand::executeInternal(Segment segment) {
if(sequence == SEQ_RETRY) {
return prepareForRetry(0);
} else if(sequence == SEQ_NEGOTIATION_COMPLETED) {
FtpDownloadCommand* command = new FtpDownloadCommand(cuid, req, e, dataSocket, socket);
FtpDownloadCommand* command =
new FtpDownloadCommand(cuid, req, e, dataSocket, socket);
e->commands.push_back(command);
return true;
} else {
@ -71,7 +67,7 @@ bool FtpNegotiationCommand::recvGreeting() {
sequence = SEQ_SEND_USER;
setReadCheckSocket(socket);
setWriteCheckSocket(NULL);
disableWriteCheckSocket();
return true;
}
@ -219,14 +215,12 @@ bool FtpNegotiationCommand::recvPasv() {
throw new DlRetryEx(EX_BAD_STATUS, status);
}
// make a data connection to the server.
dataSocket = new Socket();
logger->info(MSG_CONNECTING_TO_SERVER, cuid,
dest.first.c_str(),
dest.second);
dataSocket->establishConnection(dest.first, dest.second);
setReadCheckSocket(NULL);
disableReadCheckSocket();
setWriteCheckSocket(dataSocket);
sequence = SEQ_SEND_REST_PASV;
@ -236,7 +230,7 @@ bool FtpNegotiationCommand::recvPasv() {
bool FtpNegotiationCommand::sendRestPasv(const Segment& segment) {
dataSocket->setBlockingMode();
setReadCheckSocket(socket);
setWriteCheckSocket(NULL);
disableWriteCheckSocket();
return sendRest(segment);
}
@ -274,7 +268,7 @@ bool FtpNegotiationCommand::recvRetr() {
throw new DlRetryEx(EX_BAD_STATUS, status);
}
if(e->option->get(PREF_FTP_PASV_ENABLED) != V_TRUE) {
assert(serverSocket);
assert(serverSocket->getSockfd() != -1);
dataSocket = serverSocket->acceptConnection();
}
sequence = SEQ_NEGOTIATION_COMPLETED;

View File

@ -73,14 +73,15 @@ private:
bool recvRetr();
bool processSequence(const Segment& segment);
Socket* dataSocket;
Socket* serverSocket;
SocketHandle dataSocket;
SocketHandle serverSocket;
int sequence;
FtpConnection* ftp;
protected:
bool executeInternal(Segment segment);
public:
FtpNegotiationCommand(int cuid, Request* req, DownloadEngine* e, const Socket* s);
FtpNegotiationCommand(int cuid, Request* req, DownloadEngine* e,
const SocketHandle& s);
~FtpNegotiationCommand();
};

View File

@ -23,9 +23,12 @@
#include "FtpTunnelResponseCommand.h"
#include "HttpConnection.h"
FtpTunnelRequestCommand::FtpTunnelRequestCommand(int cuid, Request* req, DownloadEngine* e, const Socket* s):AbstractCommand(cuid, req, e, s) {
setReadCheckSocket(NULL);
setWriteCheckSocket(NULL);
FtpTunnelRequestCommand::FtpTunnelRequestCommand(int cuid, Request* req,
DownloadEngine* e,
const SocketHandle& s)
:AbstractCommand(cuid, req, e, s) {
disableReadCheckSocket();
disableWriteCheckSocket();
}
FtpTunnelRequestCommand::~FtpTunnelRequestCommand() {}
@ -35,7 +38,8 @@ bool FtpTunnelRequestCommand::executeInternal(Segment segment) {
HttpConnection httpConnection(cuid, socket, req, e->option);
httpConnection.sendProxyRequest();
FtpTunnelResponseCommand* command = new FtpTunnelResponseCommand(cuid, req, e, socket);
FtpTunnelResponseCommand* command
= new FtpTunnelResponseCommand(cuid, req, e, socket);
e->commands.push_back(command);
return true;
}

View File

@ -28,7 +28,8 @@ class FtpTunnelRequestCommand : public AbstractCommand {
protected:
bool executeInternal(Segment segment);
public:
FtpTunnelRequestCommand(int cuid, Request* req, DownloadEngine* e, const Socket* s);
FtpTunnelRequestCommand(int cuid, Request* req, DownloadEngine* e,
const SocketHandle& s);
~FtpTunnelRequestCommand();
};

View File

@ -24,7 +24,10 @@
#include "DlRetryEx.h"
#include "message.h"
FtpTunnelResponseCommand::FtpTunnelResponseCommand(int cuid, Request* req, DownloadEngine* e, const Socket* s):AbstractCommand(cuid, req, e, s) {
FtpTunnelResponseCommand::FtpTunnelResponseCommand(int cuid, Request* req,
DownloadEngine* e,
const SocketHandle& s)
:AbstractCommand(cuid, req, e, s) {
http = new HttpConnection(cuid, socket, req, e->option);
}
@ -43,7 +46,8 @@ bool FtpTunnelResponseCommand::executeInternal(Segment segment) {
if(status != 200) {
throw new DlRetryEx(EX_PROXY_CONNECTION_FAILED);
}
FtpNegotiationCommand* command = new FtpNegotiationCommand(cuid, req, e, socket);
FtpNegotiationCommand* command
= new FtpNegotiationCommand(cuid, req, e, socket);
e->commands.push_back(command);
return true;
}

View File

@ -31,7 +31,8 @@ private:
protected:
bool executeInternal(Segment segment);
public:
FtpTunnelResponseCommand(int cuid, Request* req, DownloadEngine* e, const Socket* s);
FtpTunnelResponseCommand(int cuid, Request* req, DownloadEngine* e,
const SocketHandle& s);
~FtpTunnelResponseCommand();
};

View File

@ -27,7 +27,8 @@
#include "prefs.h"
#include "LogFactory.h"
HttpConnection::HttpConnection(int cuid, const Socket* socket, const Request* req, const Option* op):
HttpConnection::HttpConnection(int cuid, const SocketHandle& socket,
const Request* req, const Option* op):
cuid(cuid), socket(socket), req(req), option(op), headerBufLength(0) {
logger = LogFactory::getInstance();
}

View File

@ -46,14 +46,15 @@ private:
bool useProxyGet() const;
string getProxyAuthString() const;
int cuid;
const Socket* socket;
SocketHandle socket;
const Request* req;
const Option* option;
const Logger* logger;
char headerBuf[HEADERBUF_SIZE+1];
int headerBufLength;
public:
HttpConnection(int cuid, const Socket* socket, const Request* req, const Option* op);
HttpConnection(int cuid, const SocketHandle& socket, const Request* req,
const Option* op);
/**
* Sends Http request.

View File

@ -33,8 +33,8 @@ using namespace std;
HttpDownloadCommand::HttpDownloadCommand(int cuid, Request* req,
DownloadEngine* e,
const Socket* socket):
DownloadCommand(cuid, req, e, socket)
const SocketHandle& socket)
:DownloadCommand(cuid, req, e, socket)
{
ChunkedEncoding* ce = new ChunkedEncoding();
transferEncodings["chunked"] = ce;

View File

@ -37,7 +37,8 @@ class HttpDownloadCommand:public DownloadCommand {
private:
map<string, TransferEncoding*> transferEncodings;
public:
HttpDownloadCommand(int cuid, Request* req, DownloadEngine* e, const Socket* s);
HttpDownloadCommand(int cuid, Request* req, DownloadEngine* e,
const SocketHandle& s);
~HttpDownloadCommand();
TransferEncoding* getTransferEncoding(const string& transferEncoding);

View File

@ -27,12 +27,13 @@
#include "message.h"
#include "prefs.h"
HttpInitiateConnectionCommand::HttpInitiateConnectionCommand(int cuid, Request* req, DownloadEngine* e):AbstractCommand(cuid, req, e) {}
HttpInitiateConnectionCommand::HttpInitiateConnectionCommand(int cuid,
Request* req,
DownloadEngine* e):AbstractCommand(cuid, req, e) {}
HttpInitiateConnectionCommand::~HttpInitiateConnectionCommand() {}
bool HttpInitiateConnectionCommand::executeInternal(Segment segment) {
socket = new Socket();
// socket->establishConnection(...);
Command* command;
if(useProxy()) {

View File

@ -23,8 +23,11 @@
#include "HttpConnection.h"
#include "HttpProxyResponseCommand.h"
HttpProxyRequestCommand::HttpProxyRequestCommand(int cuid, Request* req, DownloadEngine* e, const Socket* s):AbstractCommand(cuid, req, e, s) {
setReadCheckSocket(NULL);
HttpProxyRequestCommand::HttpProxyRequestCommand(int cuid, Request* req,
DownloadEngine* e,
const SocketHandle& s)
:AbstractCommand(cuid, req, e, s) {
disableReadCheckSocket();
setWriteCheckSocket(socket);
}

View File

@ -28,7 +28,8 @@ class HttpProxyRequestCommand : public AbstractCommand {
protected:
bool executeInternal(Segment segment);
public:
HttpProxyRequestCommand(int cuid, Request* req, DownloadEngine* e, const Socket* s);
HttpProxyRequestCommand(int cuid, Request* req, DownloadEngine* e,
const SocketHandle& s);
~HttpProxyRequestCommand();
};

View File

@ -24,7 +24,10 @@
#include "DlRetryEx.h"
#include "message.h"
HttpProxyResponseCommand::HttpProxyResponseCommand(int cuid, Request* req, DownloadEngine* e, const Socket* s):AbstractCommand(cuid, req, e, s) {
HttpProxyResponseCommand::HttpProxyResponseCommand(int cuid, Request* req,
DownloadEngine* e,
const SocketHandle& s)
:AbstractCommand(cuid, req, e, s) {
http = new HttpConnection(cuid, socket, req, e->option);
}

View File

@ -31,7 +31,8 @@ private:
protected:
bool executeInternal(Segment segment);
public:
HttpProxyResponseCommand(int cuid, Request* req, DownloadEngine* e, const Socket* s);
HttpProxyResponseCommand(int cuid, Request* req, DownloadEngine* e,
const SocketHandle& s);
~HttpProxyResponseCommand();
};

View File

@ -23,8 +23,11 @@
#include "HttpResponseCommand.h"
#include "HttpConnection.h"
HttpRequestCommand::HttpRequestCommand(int cuid, Request* req, DownloadEngine* e, const Socket* s):AbstractCommand(cuid, req, e, s) {
setReadCheckSocket(NULL);
HttpRequestCommand::HttpRequestCommand(int cuid, Request* req,
DownloadEngine* e,
const SocketHandle& s)
:AbstractCommand(cuid, req, e, s) {
disableReadCheckSocket();
setWriteCheckSocket(socket);
}

View File

@ -29,7 +29,8 @@ protected:
bool executeInternal(Segment segment);
Command* getNextCommand() const;
public:
HttpRequestCommand(int cuid, Request* req, DownloadEngine* e, const Socket* s);
HttpRequestCommand(int cuid, Request* req, DownloadEngine* e,
const SocketHandle& s);
~HttpRequestCommand();
};

View File

@ -29,8 +29,10 @@
#include <sys/types.h>
#include <unistd.h>
HttpResponseCommand::HttpResponseCommand(int cuid, Request* req, DownloadEngine* e, const Socket* s):
AbstractCommand(cuid, req, e, s) {
HttpResponseCommand::HttpResponseCommand(int cuid, Request* req,
DownloadEngine* e,
const SocketHandle& s)
:AbstractCommand(cuid, req, e, s) {
http = new HttpConnection(cuid, socket, req, e->option);
}
@ -154,7 +156,6 @@ bool HttpResponseCommand::handleOtherEncoding(const string& transferEncoding, co
}
void HttpResponseCommand::createHttpDownloadCommand(const string& transferEncoding) {
HttpDownloadCommand* command = new HttpDownloadCommand(cuid, req, e, socket);
TransferEncoding* enc = NULL;
if(transferEncoding.size() && (enc = command->getTransferEncoding(transferEncoding)) == NULL) {

View File

@ -38,7 +38,8 @@ private:
protected:
bool executeInternal(Segment segment);
public:
HttpResponseCommand(int cuid, Request* req, DownloadEngine* e, const Socket* s);
HttpResponseCommand(int cuid, Request* req, DownloadEngine* e,
const SocketHandle& s);
~HttpResponseCommand();
};

View File

@ -22,6 +22,7 @@
#include "InitiateConnectionCommandFactory.h"
#include "HttpInitiateConnectionCommand.h"
#include "FtpInitiateConnectionCommand.h"
#include "DlAbortEx.h"
Command* InitiateConnectionCommandFactory::createInitiateConnectionCommand(int cuid, Request* req, DownloadEngine* e) {
if(req->getProtocol() == "http"
@ -35,6 +36,6 @@ Command* InitiateConnectionCommandFactory::createInitiateConnectionCommand(int c
return new FtpInitiateConnectionCommand(cuid, req, e);
} else {
// these protocols are not supported yet
return NULL;
throw new DlAbortEx("%s is not supported yet.", req->getProtocol().c_str());
}
}

View File

@ -28,7 +28,8 @@
class InitiateConnectionCommandFactory {
public:
static Command* createInitiateConnectionCommand(int cuid, Request* req, DownloadEngine* e);
static Command* createInitiateConnectionCommand(int cuid, Request* req,
DownloadEngine* e);
};
#endif // _D_INITIATE_CONNECTION_COMMAND_FACTORY_H_

View File

@ -1,8 +1,8 @@
bin_PROGRAMS = aria2c
aria2c_SOURCES = main.cc
SRCS = Socket.cc Socket.h\
SRCS = Socket.h\
SocketCore.cc SocketCore.h\
Command.h\
Command.cc Command.h\
AbstractCommand.cc AbstractCommand.h\
InitiateConnectionCommandFactory.cc InitiateConnectionCommandFactory.h\
DownloadCommand.cc DownloadCommand.h\
@ -104,7 +104,8 @@ SRCS += MetaEntry.h\
RejectMessage.cc RejectMessage.h\
AllowedFastMessage.cc AllowedFastMessage.h\
SuggestPieceMessage.cc SuggestPieceMessage.h\
SimplePeerMessage.cc SimplePeerMessage.h
SimplePeerMessage.cc SimplePeerMessage.h\
SharedHandle.h
endif # ENABLE_BITTORRENT
if ENABLE_METALINK

View File

@ -89,7 +89,8 @@ bin_PROGRAMS = aria2c$(EXEEXT)
@ENABLE_BITTORRENT_TRUE@ RejectMessage.cc RejectMessage.h\
@ENABLE_BITTORRENT_TRUE@ AllowedFastMessage.cc AllowedFastMessage.h\
@ENABLE_BITTORRENT_TRUE@ SuggestPieceMessage.cc SuggestPieceMessage.h\
@ENABLE_BITTORRENT_TRUE@ SimplePeerMessage.cc SimplePeerMessage.h
@ENABLE_BITTORRENT_TRUE@ SimplePeerMessage.cc SimplePeerMessage.h\
@ENABLE_BITTORRENT_TRUE@ SharedHandle.h
@ENABLE_METALINK_TRUE@am__append_2 = Metalinker.cc Metalinker.h\
@ENABLE_METALINK_TRUE@ MetalinkEntry.cc MetalinkEntry.h\
@ -121,8 +122,8 @@ AR = ar
ARFLAGS = cru
libaria2c_a_AR = $(AR) $(ARFLAGS)
libaria2c_a_LIBADD =
am__libaria2c_a_SOURCES_DIST = Socket.cc Socket.h SocketCore.cc \
SocketCore.h Command.h AbstractCommand.cc AbstractCommand.h \
am__libaria2c_a_SOURCES_DIST = Socket.h SocketCore.cc SocketCore.h \
Command.cc Command.h AbstractCommand.cc AbstractCommand.h \
InitiateConnectionCommandFactory.cc \
InitiateConnectionCommandFactory.h DownloadCommand.cc \
DownloadCommand.h HttpInitiateConnectionCommand.cc \
@ -186,8 +187,8 @@ am__libaria2c_a_SOURCES_DIST = Socket.cc Socket.h SocketCore.cc \
HaveNoneMessage.cc HaveNoneMessage.h RejectMessage.cc \
RejectMessage.h AllowedFastMessage.cc AllowedFastMessage.h \
SuggestPieceMessage.cc SuggestPieceMessage.h \
SimplePeerMessage.cc SimplePeerMessage.h Metalinker.cc \
Metalinker.h MetalinkEntry.cc MetalinkEntry.h \
SimplePeerMessage.cc SimplePeerMessage.h SharedHandle.h \
Metalinker.cc Metalinker.h MetalinkEntry.cc MetalinkEntry.h \
MetalinkResource.cc MetalinkResource.h MetalinkProcessor.h \
Xml2MetalinkProcessor.cc Xml2MetalinkProcessor.h
@ENABLE_BITTORRENT_TRUE@am__objects_1 = Data.$(OBJEXT) \
@ -240,7 +241,7 @@ am__libaria2c_a_SOURCES_DIST = Socket.cc Socket.h SocketCore.cc \
@ENABLE_METALINK_TRUE@ MetalinkEntry.$(OBJEXT) \
@ENABLE_METALINK_TRUE@ MetalinkResource.$(OBJEXT) \
@ENABLE_METALINK_TRUE@ Xml2MetalinkProcessor.$(OBJEXT)
am__objects_3 = Socket.$(OBJEXT) SocketCore.$(OBJEXT) \
am__objects_3 = SocketCore.$(OBJEXT) Command.$(OBJEXT) \
AbstractCommand.$(OBJEXT) \
InitiateConnectionCommandFactory.$(OBJEXT) \
DownloadCommand.$(OBJEXT) \
@ -420,7 +421,7 @@ sharedstatedir = @sharedstatedir@
sysconfdir = @sysconfdir@
target_alias = @target_alias@
aria2c_SOURCES = main.cc
SRCS = Socket.cc Socket.h SocketCore.cc SocketCore.h Command.h \
SRCS = Socket.h SocketCore.cc SocketCore.h Command.cc Command.h \
AbstractCommand.cc AbstractCommand.h \
InitiateConnectionCommandFactory.cc \
InitiateConnectionCommandFactory.h DownloadCommand.cc \
@ -547,6 +548,7 @@ distclean-compile:
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/CancelMessage.Po@am__quote@
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/ChokeMessage.Po@am__quote@
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/ChunkedEncoding.Po@am__quote@
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/Command.Po@am__quote@
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/ConsoleDownloadEngine.Po@am__quote@
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/CookieBox.Po@am__quote@
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/CopyDiskAdaptor.Po@am__quote@
@ -614,7 +616,6 @@ distclean-compile:
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/SimpleLogger.Po@am__quote@
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/SimplePeerMessage.Po@am__quote@
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/SleepCommand.Po@am__quote@
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/Socket.Po@am__quote@
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/SocketCore.Po@am__quote@
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/SplitFirstSegmentSplitter.Po@am__quote@
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/SplitSlowestSegmentSplitter.Po@am__quote@

View File

@ -21,7 +21,7 @@
/* copyright --> */
#include "Peer.h"
Peer* Peer::nullPeer = new Peer("", 0, 0, 0);
PeerHandle nullPeer = PeerHandle(new Peer("", 0, 0, 0));
void Peer::updateBitfield(int index, int operation) {
if(operation == 1) {
@ -82,3 +82,11 @@ void Peer::setAllBitfield() {
void Peer::updateLatency(int latency) {
this->latency = (this->latency*20+latency*80)/200;
}
bool operator==(const Peer& p1, const Peer& p2) {
return p1.ipaddr == p2.ipaddr && p1.port == p2.port;
}
bool operator!=(const Peer& p1, const Peer& p2) {
return !(p1 == p2);
}

View File

@ -24,6 +24,7 @@
#include "common.h"
#include "BitfieldMan.h"
#include "SharedHandle.h"
#include <string.h>
#include <string>
@ -33,6 +34,8 @@ using namespace std;
#define DEFAULT_LATENCY 1500
class Peer {
friend bool operator==(const Peer& p1, const Peer& p2);
friend bool operator!=(const Peer& p1, const Peer& p2);
public:
int entryId;
string ipaddr;
@ -60,22 +63,20 @@ private:
int deltaDownload;
int latency;
public:
Peer(string ipaddr, int port, int pieceLength, long long int totalLength):
entryId(0), ipaddr(ipaddr), port(port),
amChoking(true), amInterested(false),
peerChoking(true), peerInterested(false),
tryCount(0), error(0), cuid(0),
chokingRequired(true), optUnchoking(false),
snubbing(false),
bitfield(NULL),
fastExtensionEnabled(false),
peerUpload(0), peerDownload(0),
pieceLength(pieceLength),
deltaUpload(0), deltaDownload(0),
latency(DEFAULT_LATENCY) {
Peer(string ipaddr, int port, int pieceLength, long long int totalLength)
:entryId(0), ipaddr(ipaddr), port(port), error(0),
peerUpload(0), peerDownload(0), pieceLength(pieceLength)
{
resetStatus();
this->bitfield = new BitfieldMan(pieceLength, totalLength);
}
Peer():entryId(0), ipaddr(""), port(0), bitfield(0),
peerUpload(0), peerDownload(0), pieceLength(0)
{
resetStatus();
}
~Peer() {
if(bitfield != NULL) {
delete bitfield;
@ -146,8 +147,11 @@ public:
void updateLatency(int latency);
int getLatency() const { return latency; }
static Peer* nullPeer;
};
bool operator==(const Peer& p1, const Peer& p2);
bool operator!=(const Peer& p1, const Peer& p2);
typedef SharedHandle<Peer> PeerHandle;
#endif // _D_PEER_H_

View File

@ -26,27 +26,20 @@
#include "message.h"
#include "prefs.h"
PeerAbstractCommand::PeerAbstractCommand(int cuid, Peer* peer, TorrentDownloadEngine* e, const Socket* s):
Command(cuid), e(e), peer(peer),
PeerAbstractCommand::PeerAbstractCommand(int cuid, const PeerHandle& peer,
TorrentDownloadEngine* e,
const SocketHandle& s)
:Command(cuid), e(e), socket(s), peer(peer),
checkSocketIsReadable(false), checkSocketIsWritable(false),
uploadLimitCheck(false), uploadLimit(0) {
if(s != NULL) {
socket = new Socket(*s);
setReadCheckSocket(socket);
} else {
socket = NULL;
}
setReadCheckSocket(socket);
timeout = e->option->getAsInt(PREF_TIMEOUT);
e->torrentMan->connections++;
}
PeerAbstractCommand::~PeerAbstractCommand() {
setReadCheckSocket(NULL);
setWriteCheckSocket(NULL);
if(socket != NULL) {
delete(socket);
}
disableReadCheckSocket();
disableWriteCheckSocket();
e->torrentMan->connections--;
}
@ -92,44 +85,52 @@ void PeerAbstractCommand::onAbort(Exception* ex) {
logger->debug("CUID#%d - Peer %s:%d banned.", cuid, peer->ipaddr.c_str(), peer->port);
}
void PeerAbstractCommand::setReadCheckSocket(Socket* socket) {
if(socket == NULL) {
if(checkSocketIsReadable) {
e->deleteSocketForReadCheck(readCheckTarget);
checkSocketIsReadable = false;
readCheckTarget = NULL;
}
void PeerAbstractCommand::disableReadCheckSocket() {
if(checkSocketIsReadable) {
e->deleteSocketForReadCheck(readCheckTarget, getUuid());
checkSocketIsReadable = false;
readCheckTarget = SocketHandle();
}
}
void PeerAbstractCommand::setReadCheckSocket(const SocketHandle& socket) {
if(!socket->isOpen()) {
disableReadCheckSocket();
} else {
if(checkSocketIsReadable) {
if(readCheckTarget != socket) {
e->deleteSocketForReadCheck(readCheckTarget);
e->addSocketForReadCheck(socket, this);
e->deleteSocketForReadCheck(readCheckTarget, getUuid());
e->addSocketForReadCheck(socket, getUuid());
readCheckTarget = socket;
}
} else {
e->addSocketForReadCheck(socket, this);
e->addSocketForReadCheck(socket, getUuid());
checkSocketIsReadable = true;
readCheckTarget = socket;
}
}
}
void PeerAbstractCommand::setWriteCheckSocket(Socket* socket) {
if(socket == NULL) {
if(checkSocketIsWritable) {
e->deleteSocketForWriteCheck(writeCheckTarget);
checkSocketIsWritable = false;
writeCheckTarget = NULL;
}
void PeerAbstractCommand::disableWriteCheckSocket() {
if(checkSocketIsWritable) {
e->deleteSocketForWriteCheck(writeCheckTarget, getUuid());
checkSocketIsWritable = false;
writeCheckTarget = SocketHandle();
}
}
void PeerAbstractCommand::setWriteCheckSocket(const SocketHandle& socket) {
if(!socket->isOpen()) {
disableWriteCheckSocket();
} else {
if(checkSocketIsWritable) {
if(writeCheckTarget != socket) {
e->deleteSocketForWriteCheck(writeCheckTarget);
e->addSocketForWriteCheck(socket, this);
e->deleteSocketForWriteCheck(writeCheckTarget, getUuid());
e->addSocketForWriteCheck(socket, getUuid());
writeCheckTarget = socket;
}
} else {
e->addSocketForWriteCheck(socket, this);
e->addSocketForWriteCheck(socket, getUuid());
checkSocketIsWritable = true;
writeCheckTarget = socket;
}

View File

@ -33,26 +33,30 @@ private:
int timeout;
protected:
TorrentDownloadEngine* e;
Socket* socket;
Peer* peer;
SocketHandle socket;
PeerHandle peer;
void setTimeout(int timeout) { this->timeout = timeout; }
virtual bool prepareForNextPeer(int wait);
virtual bool prepareForRetry(int wait);
virtual void onAbort(Exception* ex);
virtual bool executeInternal() = 0;
void setReadCheckSocket(Socket* socket);
void setWriteCheckSocket(Socket* socket);
void setReadCheckSocket(const SocketHandle& socket);
void setWriteCheckSocket(const SocketHandle& socket);
void disableReadCheckSocket();
void disableWriteCheckSocket();
void setUploadLimit(int uploadLimit);
void setUploadLimitCheck(bool check);
private:
bool checkSocketIsReadable;
bool checkSocketIsWritable;
Socket* readCheckTarget;
Socket* writeCheckTarget;
SocketHandle readCheckTarget;
SocketHandle writeCheckTarget;
bool uploadLimitCheck;
int uploadLimit;
public:
PeerAbstractCommand(int cuid, Peer* peer, TorrentDownloadEngine* e, const Socket* s = NULL);
PeerAbstractCommand(int cuid, const PeerHandle& peer,
TorrentDownloadEngine* e,
const SocketHandle& s = SocketHandle());
virtual ~PeerAbstractCommand();
bool execute();
};

View File

@ -26,12 +26,13 @@ PeerChokeCommand::PeerChokeCommand(int cuid, int interval, TorrentDownloadEngine
PeerChokeCommand::~PeerChokeCommand() {}
void PeerChokeCommand::setAllPeerChoked(Peers& peers) const {
for(Peers::iterator itr = peers.begin(); itr != peers.end(); itr++) {
Peer* peer = *itr;
class ChokePeer {
public:
ChokePeer() {}
void operator()(PeerHandle& peer) {
peer->chokingRequired = true;
}
}
};
void PeerChokeCommand::optUnchokingPeer(Peers& peers) const {
if(peers.empty()) {
@ -52,17 +53,18 @@ void PeerChokeCommand::optUnchokingPeer(Peers& peers) const {
}
}
void PeerChokeCommand::setAllPeerResetDelta(Peers& peers) const {
for(Peers::iterator itr = peers.begin(); itr != peers.end(); itr++) {
Peer* peer = *itr;
class ResetDelta {
public:
ResetDelta() {}
void operator()(PeerHandle& peer) {
peer->resetDeltaUpload();
peer->resetDeltaDownload();
}
}
};
class UploadFaster {
public:
bool operator() (const Peer* left, const Peer* right) const {
bool operator() (const PeerHandle& left, const PeerHandle& right) const {
return left->getDeltaUpload() > right->getDeltaUpload();
}
};
@ -73,13 +75,13 @@ void PeerChokeCommand::orderByUploadRate(Peers& peers) const {
class DownloadFaster {
public:
bool operator() (const Peer* left, const Peer* right) const {
bool operator() (const PeerHandle& left, const PeerHandle& right) const {
return left->getDeltaDownload() > right->getDeltaDownload();
}
};
void PeerChokeCommand::orderByDownloadRate(Peers& peers) const {
sort(peers.begin(), peers.end(), UploadFaster());
sort(peers.begin(), peers.end(), DownloadFaster());
}
bool PeerChokeCommand::execute() {
@ -89,7 +91,7 @@ bool PeerChokeCommand::execute() {
if(checkPoint.elapsed(interval)) {
checkPoint.reset();
Peers peers = e->torrentMan->getActivePeers();
setAllPeerChoked(peers);
for_each(peers.begin(), peers.end(), ChokePeer());
if(e->torrentMan->downloadComplete()) {
orderByDownloadRate(peers);
} else {
@ -97,24 +99,30 @@ bool PeerChokeCommand::execute() {
}
int unchokingCount = 4;//peers.size() >= 4 ? 4 : peers.size();
for(Peers::iterator itr = peers.begin(); itr != peers.end() && unchokingCount > 0; ) {
Peer* peer = *itr;
PeerHandle peer = *itr;
if(peer->peerInterested && !peer->snubbing) {
unchokingCount--;
peer->chokingRequired = false;
peer->optUnchoking = false;
itr = peers.erase(itr);
logger->debug("cat01, unchoking %s, delta=%d", peer->ipaddr.c_str(), peer->getDeltaUpload());
logger->debug("cat01, unchoking %s, delta=%d",
peer->ipaddr.c_str(),
e->torrentMan->downloadComplete() ?
peer->getDeltaDownload() : peer->getDeltaUpload());
} else {
itr++;
}
}
for(Peers::iterator itr = peers.begin(); itr != peers.end(); ) {
Peer* peer = *itr;
PeerHandle peer = *itr;
if(!peer->peerInterested && !peer->snubbing) {
peer->chokingRequired = false;
peer->optUnchoking = false;
itr = peers.erase(itr);
logger->debug("cat02, unchoking %s, delta=%d", peer->ipaddr.c_str(), peer->getDeltaUpload());
logger->debug("cat02, unchoking %s, delta=%d",
peer->ipaddr.c_str(),
e->torrentMan->downloadComplete() ?
peer->getDeltaDownload() : peer->getDeltaUpload());
break;
} else {
itr++;
@ -125,7 +133,9 @@ bool PeerChokeCommand::execute() {
rotate = 0;
}
rotate++;
setAllPeerResetDelta(e->torrentMan->getActivePeers());
for_each(e->torrentMan->getActivePeers().begin(),
e->torrentMan->getActivePeers().end(),
ResetDelta());
}
e->commands.push_back(this);
return false;

View File

@ -35,8 +35,6 @@ private:
void orderByUploadRate(Peers& peers) const;
void orderByDownloadRate(Peers& peers) const;
void setAllPeerChoked(Peers& peers) const;
void setAllPeerResetDelta(Peers& peers) const;
void optUnchokingPeer(Peers& peers) const;
public:

View File

@ -28,7 +28,7 @@
#include <netinet/in.h>
PeerConnection::PeerConnection(int cuid,
const Socket* socket,
const SocketHandle& socket,
const Option* op)
:cuid(cuid),
socket(socket),

View File

@ -27,7 +27,6 @@
#include "Logger.h"
#include "TorrentMan.h"
#include "PeerMessage.h"
#include "HandshakeMessage.h"
#include "common.h"
// we assume maximum length of incoming message is "piece" message with 16KB
@ -37,7 +36,7 @@
class PeerConnection {
private:
int cuid;
const Socket* socket;
SocketHandle socket;
const Option* option;
const Logger* logger;
@ -48,7 +47,7 @@ private:
int lenbufLength;
public:
PeerConnection(int cuid, const Socket* socket, const Option* op);
PeerConnection(int cuid, const SocketHandle& socket, const Option* op);
~PeerConnection();
// Returns the number of bytes written

View File

@ -27,16 +27,13 @@
#include "prefs.h"
PeerInitiateConnectionCommand::PeerInitiateConnectionCommand(int cuid,
Peer* peer,
const PeerHandle& peer,
TorrentDownloadEngine* e)
:PeerAbstractCommand(cuid, peer, e) {}
PeerInitiateConnectionCommand::~PeerInitiateConnectionCommand() {}
bool PeerInitiateConnectionCommand::executeInternal() {
socket = new Socket();
// socket->establishConnection(...);
Command* command;
logger->info(MSG_CONNECTING_TO_SERVER, cuid, peer->ipaddr.c_str(),
peer->port);
@ -50,10 +47,11 @@ bool PeerInitiateConnectionCommand::executeInternal() {
// TODO this method removed when PeerBalancerCommand is implemented
bool PeerInitiateConnectionCommand::prepareForNextPeer(int wait) {
if(e->torrentMan->isPeerAvailable()) {
Peer* peer = e->torrentMan->getPeer();
PeerHandle peer = e->torrentMan->getPeer();
int newCuid = e->torrentMan->getNewCuid();
peer->cuid = newCuid;
PeerInitiateConnectionCommand* command = new PeerInitiateConnectionCommand(newCuid, peer, e);
PeerInitiateConnectionCommand* command =
new PeerInitiateConnectionCommand(newCuid, peer, e);
e->commands.push_back(command);
}
return true;

View File

@ -30,7 +30,8 @@ protected:
bool prepareForRetry(int wait);
bool prepareForNextPeer(int wait);
public:
PeerInitiateConnectionCommand(int cuid, Peer* peer, TorrentDownloadEngine* e);
PeerInitiateConnectionCommand(int cuid, const PeerHandle& peer,
TorrentDownloadEngine* e);
~PeerInitiateConnectionCommand();
};

View File

@ -28,10 +28,10 @@
#include <netinet/in.h>
PeerInteraction::PeerInteraction(int cuid,
const Socket* socket,
const PeerHandle& peer,
const SocketHandle& socket,
const Option* op,
TorrentMan* torrentMan,
Peer* peer)
TorrentMan* torrentMan)
:cuid(cuid),
uploadLimit(0),
torrentMan(torrentMan),
@ -43,7 +43,6 @@ PeerInteraction::PeerInteraction(int cuid,
PeerInteraction::~PeerInteraction() {
delete peerConnection;
for_each(messageQueue.begin(), messageQueue.end(), Deleter());
}
class MsgPushBack {
@ -52,14 +51,14 @@ private:
public:
MsgPushBack(MessageQueue* messageQueue):messageQueue(messageQueue) {}
void operator()(PeerMessage* msg) {
void operator()(const PeerMessageHandle& msg) {
messageQueue->push_back(msg);
}
};
bool PeerInteraction::isSendingMessageInProgress() const {
if(messageQueue.size() > 0) {
PeerMessage* peerMessage = messageQueue.front();
const PeerMessageHandle& peerMessage = messageQueue.front();
if(peerMessage->isInProgress()) {
return true;
}
@ -70,7 +69,7 @@ bool PeerInteraction::isSendingMessageInProgress() const {
void PeerInteraction::sendMessages(int uploadSpeed) {
MessageQueue tempQueue;
while(messageQueue.size() > 0) {
PeerMessage* msg = messageQueue.front();
PeerMessageHandle msg = messageQueue.front();
messageQueue.pop_front();
if(uploadLimit != 0 && uploadLimit*1024 <= uploadSpeed &&
msg->getId() == PieceMessage::ID && !msg->isInProgress()) {
@ -78,27 +77,20 @@ void PeerInteraction::sendMessages(int uploadSpeed) {
//((PieceMessage*)msg)->incrementPendingCount();
tempQueue.push_back(msg);
} else {
try {
msg->send();
} catch(Exception* ex) {
delete msg;
throw;
}
msg->send();
if(msg->isInProgress()) {
messageQueue.push_front(msg);
break;
} else {
delete msg;
}
}
}
for_each(tempQueue.begin(), tempQueue.end(), MsgPushBack(&messageQueue));
}
void PeerInteraction::addMessage(PeerMessage* peerMessage) {
void PeerInteraction::addMessage(const PeerMessageHandle& peerMessage) {
messageQueue.push_back(peerMessage);
if(peerMessage->getId() == RequestMessage::ID) {
RequestMessage* requestMessage = (RequestMessage*)peerMessage;
RequestMessage* requestMessage = (RequestMessage*)peerMessage.get();
RequestSlot requestSlot(requestMessage->getIndex(),
requestMessage->getBegin(),
requestMessage->getLength(),
@ -113,8 +105,8 @@ void PeerInteraction::rejectAllPieceMessageInQueue() {
itr != messageQueue.end();) {
// Don't delete piece message which is in the allowed fast set.
if((*itr)->getId() == PieceMessage::ID && !(*itr)->isInProgress()
&& !isInFastSet(((PieceMessage*)*itr)->getIndex())) {
PieceMessage* pieceMessage = (PieceMessage*)*itr;
&& !isInFastSet(((PieceMessage*)(*itr).get())->getIndex())) {
PieceMessage* pieceMessage = (PieceMessage*)(*itr).get();
logger->debug("CUID#%d - Reject piece message in queue because"
" peer has been choked. index=%d, begin=%d, length=%d",
cuid,
@ -126,7 +118,6 @@ void PeerInteraction::rejectAllPieceMessageInQueue() {
pieceMessage->getBegin(),
pieceMessage->getBlockLength()));
}
delete pieceMessage;
itr = messageQueue.erase(itr);
} else {
itr++;
@ -140,14 +131,13 @@ void PeerInteraction::rejectPieceMessageInQueue(int index, int begin, int length
for(MessageQueue::iterator itr = messageQueue.begin();
itr != messageQueue.end();) {
if((*itr)->getId() == PieceMessage::ID && !(*itr)->isInProgress()) {
PieceMessage* pieceMessage = (PieceMessage*)*itr;
PieceMessage* pieceMessage = (PieceMessage*)(*itr).get();
if(pieceMessage->getIndex() == index &&
pieceMessage->getBegin() == begin &&
pieceMessage->getBlockLength() == length) {
logger->debug("CUID#%d - Reject piece message in queue because cancel"
" message received. index=%d, begin=%d, length=%d",
cuid, index, begin, length);
delete pieceMessage;
itr = messageQueue.erase(itr);
if(peer->isFastExtensionEnabled()) {
tempQueue.push_back(createRejectMessage(index, begin, length));
@ -187,8 +177,7 @@ void PeerInteraction::abortPiece(Piece& piece) {
itr != messageQueue.end();) {
if((*itr)->getId() == RequestMessage::ID &&
!(*itr)->isInProgress() &&
((RequestMessage*)*itr)->getIndex() == piece.getIndex()) {
delete *itr;
((RequestMessage*)(*itr).get())->getIndex() == piece.getIndex()) {
itr = messageQueue.erase(itr);
} else {
itr++;
@ -286,7 +275,7 @@ int PeerInteraction::countRequestSlot() const {
return requestSlots.size();
}
HandshakeMessage* PeerInteraction::receiveHandshake(bool quickReply) {
HandshakeMessageHandle PeerInteraction::receiveHandshake(bool quickReply) {
char msg[HANDSHAKE_MESSAGE_LENGTH];
int msgLength = HANDSHAKE_MESSAGE_LENGTH;
bool retval = peerConnection->receiveHandshake(msg, msgLength);
@ -301,13 +290,8 @@ HandshakeMessage* PeerInteraction::receiveHandshake(bool quickReply) {
if(!retval) {
return NULL;
}
HandshakeMessage* handshakeMessage = createHandshakeMessage(msg, msgLength);
try {
handshakeMessage->check();
} catch(Exception* e) {
delete handshakeMessage;
throw;
}
HandshakeMessageHandle handshakeMessage(createHandshakeMessage(msg, msgLength));
handshakeMessage->check();
if(handshakeMessage->isFastExtensionSupported()) {
peer->setFastExtensionEnabled(true);
logger->info("CUID#%d - Fast extension enabled.", cuid);
@ -315,30 +299,25 @@ HandshakeMessage* PeerInteraction::receiveHandshake(bool quickReply) {
return handshakeMessage;
}
HandshakeMessage* PeerInteraction::createHandshakeMessage(const char* msg, int msgLength) {
HandshakeMessageHandle PeerInteraction::createHandshakeMessage(const char* msg, int msgLength) {
HandshakeMessage* message = HandshakeMessage::create(msg, msgLength);
setPeerMessageCommonProperty(message);
return message;
}
PeerMessage* PeerInteraction::receiveMessage() {
PeerMessageHandle PeerInteraction::receiveMessage() {
char msg[MAX_PAYLOAD_LEN];
int msgLength = 0;
if(!peerConnection->receiveMessage(msg, msgLength)) {
return NULL;
}
PeerMessage* peerMessage = createPeerMessage(msg, msgLength);
try {
peerMessage->check();
} catch(Exception* e) {
delete peerMessage;
throw;
}
PeerMessageHandle peerMessage(createPeerMessage(msg, msgLength));
peerMessage->check();
return peerMessage;
}
PeerMessage* PeerInteraction::createPeerMessage(const char* msg, int msgLength) {
PeerMessageHandle PeerInteraction::createPeerMessage(const char* msg, int msgLength) {
PeerMessage* peerMessage;
if(msgLength == 0) {
// keep-alive

View File

@ -42,12 +42,15 @@
#include "AllowedFastMessage.h"
#include "SuggestPieceMessage.h"
#include "RequestSlot.h"
#include "SharedHandle.h"
#define REQUEST_TIME_OUT 60
#define ALLOWED_FAST_SET_SIZE 10
typedef SharedHandle<PeerMessage> PeerMessageHandle;
typedef SharedHandle<HandshakeMessage> HandshakeMessageHandle;
typedef deque<RequestSlot> RequestSlots;
typedef deque<PeerMessage*> MessageQueue;
typedef deque<PeerMessageHandle> MessageQueue;
class PeerInteraction {
private:
@ -58,7 +61,7 @@ private:
int uploadLimit;
TorrentMan* torrentMan;
PeerConnection* peerConnection;
Peer* peer;
PeerHandle peer;
Pieces pieces;
// allowed fast piece indexes that local client has sent
Integers fastSet;
@ -66,19 +69,19 @@ private:
const Logger* logger;
void getNewPieceAndSendInterest(int pieceNum);
PeerMessage* createPeerMessage(const char* msg, int msgLength);
HandshakeMessage* createHandshakeMessage(const char* msg, int msgLength);
PeerMessageHandle createPeerMessage(const char* msg, int msgLength);
HandshakeMessageHandle createHandshakeMessage(const char* msg, int msgLength);
void setPeerMessageCommonProperty(PeerMessage* peerMessage);
int countRequestSlot() const;
public:
PeerInteraction(int cuid,
const Socket* socket,
const PeerHandle& peer,
const SocketHandle& socket,
const Option* op,
TorrentMan* torrentMan,
Peer* peer);
TorrentMan* torrentMan);
~PeerInteraction();
void addMessage(PeerMessage* peerMessage);
void addMessage(const PeerMessageHandle& peerMessage);
void rejectPieceMessageInQueue(int index, int begin, int length);
void rejectAllPieceMessageInQueue();
void onChoked();
@ -116,8 +119,8 @@ public:
void sendBitfield();
void sendAllowedFast();
PeerMessage* receiveMessage();
HandshakeMessage* receiveHandshake(bool quickReply = false);
PeerMessageHandle receiveMessage();
HandshakeMessageHandle receiveHandshake(bool quickReply = false);
RequestMessage* createRequestMessage(int index, int blockIndex);
CancelMessage* createCancelMessage(int index, int begin, int length);

View File

@ -28,28 +28,30 @@
#include "prefs.h"
#include <algorithm>
PeerInteractionCommand::PeerInteractionCommand(int cuid, Peer* peer,
PeerInteractionCommand::PeerInteractionCommand(int cuid,
const PeerHandle& p,
TorrentDownloadEngine* e,
const Socket* s, int sequence)
:PeerAbstractCommand(cuid, peer, e, s), sequence(sequence) {
const SocketHandle& s,
int sequence)
:PeerAbstractCommand(cuid, p, e, s), sequence(sequence) {
if(sequence == INITIATOR_SEND_HANDSHAKE) {
setReadCheckSocket(NULL);
disableReadCheckSocket();
setWriteCheckSocket(socket);
setTimeout(e->option->getAsInt(PREF_PEER_CONNECTION_TIMEOUT));
}
peerInteraction = new PeerInteraction(cuid, socket, e->option,
e->torrentMan, this->peer);
peerInteraction = new PeerInteraction(cuid, peer, socket, e->option,
e->torrentMan);
peerInteraction->setUploadLimit(e->option->getAsInt(PREF_UPLOAD_LIMIT));
setUploadLimit(e->option->getAsInt(PREF_UPLOAD_LIMIT));
chokeUnchokeCount = 0;
haveCount = 0;
keepAliveCount = 0;
e->torrentMan->addActivePeer(this->peer);
e->torrentMan->addActivePeer(peer);
}
PeerInteractionCommand::~PeerInteractionCommand() {
delete peerInteraction;
e->torrentMan->deleteActivePeer(this->peer);
e->torrentMan->deleteActivePeer(peer);
}
bool PeerInteractionCommand::executeInternal() {
@ -58,7 +60,7 @@ bool PeerInteractionCommand::executeInternal() {
setReadCheckSocket(socket);
setTimeout(e->option->getAsInt(PREF_TIMEOUT));
}
setWriteCheckSocket(NULL);
disableWriteCheckSocket();
setUploadLimitCheck(false);
switch(sequence) {
@ -73,15 +75,15 @@ bool PeerInteractionCommand::executeInternal() {
break;
}
}
HandshakeMessage* handshakeMessage = peerInteraction->receiveHandshake();
if(handshakeMessage == NULL) {
HandshakeMessageHandle handshakeMessage =
peerInteraction->receiveHandshake();
if(handshakeMessage.get() == NULL) {
break;
}
peer->setPeerId(handshakeMessage->peerId);
logger->info(MSG_RECEIVE_PEER_MESSAGE, cuid,
peer->ipaddr.c_str(), peer->port,
handshakeMessage->toString().c_str());
delete handshakeMessage;
haveCheckTime.reset();
peerInteraction->sendBitfield();
peerInteraction->sendAllowedFast();
@ -89,16 +91,15 @@ bool PeerInteractionCommand::executeInternal() {
break;
}
case RECEIVER_WAIT_HANDSHAKE: {
HandshakeMessage* handshakeMessage =
HandshakeMessageHandle handshakeMessage =
peerInteraction->receiveHandshake(true);
if(handshakeMessage == NULL) {
if(handshakeMessage.get() == NULL) {
break;
}
peer->setPeerId(handshakeMessage->peerId);
logger->info(MSG_RECEIVE_PEER_MESSAGE, cuid,
peer->ipaddr.c_str(), peer->port,
handshakeMessage->toString().c_str());
delete handshakeMessage;
haveCheckTime.reset();
peerInteraction->sendBitfield();
peerInteraction->sendAllowedFast();
@ -174,8 +175,8 @@ void PeerInteractionCommand::decideChoking() {
void PeerInteractionCommand::receiveMessages() {
for(int i = 0; i < 50; i++) {
PeerMessage* message = peerInteraction->receiveMessage();
if(message == NULL) {
PeerMessageHandle message = peerInteraction->receiveMessage();
if(message.get() == NULL) {
return;
}
logger->info(MSG_RECEIVE_PEER_MESSAGE, cuid,
@ -200,23 +201,18 @@ void PeerInteractionCommand::receiveMessages() {
haveCount++;
break;
}
try {
message->receivedAction();
delete message;
} catch(Exception* ex) {
delete message;
throw;
}
message->receivedAction();
}
}
// TODO this method removed when PeerBalancerCommand is implemented
bool PeerInteractionCommand::prepareForNextPeer(int wait) {
if(e->torrentMan->isPeerAvailable()) {
Peer* peer = e->torrentMan->getPeer();
PeerHandle peer = e->torrentMan->getPeer();
int newCuid = e->torrentMan->getNewCuid();
peer->cuid = newCuid;
PeerInitiateConnectionCommand* command = new PeerInitiateConnectionCommand(newCuid, peer, e);
PeerInitiateConnectionCommand* command =
new PeerInitiateConnectionCommand(newCuid, peer, e);
e->commands.push_back(command);
}
return true;

View File

@ -57,7 +57,9 @@ protected:
bool prepareForNextPeer(int wait);
void onAbort(Exception* ex);
public:
PeerInteractionCommand(int cuid, Peer* peer, TorrentDownloadEngine* e, const Socket* s, int sequence);
PeerInteractionCommand(int cuid, const PeerHandle& peer,
TorrentDownloadEngine* e,
const SocketHandle& s, int sequence);
~PeerInteractionCommand();
enum Seq {

View File

@ -23,13 +23,9 @@
#include "PeerInteractionCommand.h"
PeerListenCommand::PeerListenCommand(int cuid, TorrentDownloadEngine* e)
:Command(cuid), e(e), socket(NULL) {}
:Command(cuid), e(e) {}
PeerListenCommand::~PeerListenCommand() {
if(socket != NULL) {
delete socket;
}
}
PeerListenCommand::~PeerListenCommand() {}
int PeerListenCommand::bindPort(int portRangeStart, int portRangeEnd) {
if(portRangeStart > portRangeEnd) {
@ -37,7 +33,6 @@ int PeerListenCommand::bindPort(int portRangeStart, int portRangeEnd) {
}
for(int port = portRangeStart; port <= portRangeEnd; port++) {
try {
socket = new Socket();
socket->beginListen(port);
logger->info("CUID#%d - using port %d for accepting new connections",
cuid, port);
@ -45,9 +40,8 @@ int PeerListenCommand::bindPort(int portRangeStart, int portRangeEnd) {
} catch(Exception* ex) {
logger->error("CUID#%d - an error occurred while binding port=%d",
ex, cuid, port);
socket->closeConnection();
delete ex;
delete socket;
socket = NULL;
}
}
return -1;
@ -57,44 +51,33 @@ bool PeerListenCommand::execute() {
if(e->torrentMan->isHalt()) {
return true;
}
try {
for(int i = 0; i < 3 && socket->isReadable(0); i++) {
Socket* peerSocket = NULL;
try {
peerSocket = socket->acceptConnection();
pair<string, int> peerInfo;
peerSocket->getPeerInfo(peerInfo);
pair<string, int> localInfo;
peerSocket->getAddrInfo(localInfo);
if(peerInfo.first != localInfo.first &&
e->torrentMan->connections < MAX_PEERS) {
Peer* peer = new Peer(peerInfo.first, peerInfo.second,
e->torrentMan->pieceLength,
e->torrentMan->getTotalLength());
if(e->torrentMan->addPeer(peer, true)) {
int newCuid = e->torrentMan->getNewCuid();
peer->cuid = newCuid;
PeerInteractionCommand* command =
new PeerInteractionCommand(newCuid, peer, e, peerSocket,
PeerInteractionCommand::RECEIVER_WAIT_HANDSHAKE);
e->commands.push_back(command);
logger->debug("CUID#%d - incoming connection, adding new command CUID#%d", cuid, newCuid);
} else {
delete peer;
}
for(int i = 0; i < 3 && socket->isReadable(0); i++) {
SocketHandle peerSocket;
try {
peerSocket = socket->acceptConnection();
pair<string, int> peerInfo;
peerSocket->getPeerInfo(peerInfo);
pair<string, int> localInfo;
peerSocket->getAddrInfo(localInfo);
if(peerInfo.first != localInfo.first &&
e->torrentMan->connections < MAX_PEERS) {
PeerHandle peer = PeerHandle(new Peer(peerInfo.first, peerInfo.second,
e->torrentMan->pieceLength,
e->torrentMan->getTotalLength()));
if(e->torrentMan->addPeer(peer)) {
int newCuid = e->torrentMan->getNewCuid();
peer->cuid = newCuid;
PeerInteractionCommand* command =
new PeerInteractionCommand(newCuid, peer, e, peerSocket,
PeerInteractionCommand::RECEIVER_WAIT_HANDSHAKE);
e->commands.push_back(command);
logger->debug("CUID#%d - incoming connection, adding new command CUID#%d", cuid, newCuid);
}
delete peerSocket;
} catch(Exception* ex) {
logger->error("CUID#%d - error in accepting connection", ex, cuid);
delete ex;
if(peerSocket != NULL) {
delete peerSocket;
}
}
}
} catch(Exception* e) {
logger->error("CUID#%d - Exception occurred.", e, cuid);
delete e;
}
} catch(Exception* ex) {
logger->error("CUID#%d - error in accepting connection", ex, cuid);
delete ex;
}
}
e->commands.push_back(this);
return false;

View File

@ -28,7 +28,7 @@
class PeerListenCommand : public Command {
private:
TorrentDownloadEngine* e;
Socket* socket;
SocketHandle socket;
public:
PeerListenCommand(int cuid, TorrentDownloadEngine* e);
~PeerListenCommand();

View File

@ -33,7 +33,7 @@ class PeerMessage {
protected:
bool inProgress;
int cuid;
Peer* peer;
PeerHandle peer;
PeerInteraction* peerInteraction;
const Logger* logger;
public:
@ -47,8 +47,8 @@ public:
void setCuid(int cuid) {
this->cuid = cuid;
}
Peer* getPeer() const { return this->peer; }
void setPeer(Peer* peer) {
PeerHandle getPeer() const { return this->peer; }
void setPeer(const PeerHandle& peer) {
this->peer = peer;
}
PeerInteraction* getPeerInteraction() const { return peerInteraction; }

View File

@ -22,101 +22,9 @@
#ifndef _D_SOCKET_H_
#define _D_SOCKET_H_
#include <string>
#include "SocketCore.h"
#include "common.h"
#include "SharedHandle.h"
using namespace std;
class Socket {
private:
SocketCore* core;
/**
* This method doesn't increment the use count of core.
*/
Socket(SocketCore* core);
public:
Socket();
Socket(const Socket& s);
~Socket();
Socket& operator=(const Socket& s);
/**
* Returns socket descriptor of this socket.
* @returns socket descriptor of this socket.
*/
int getSockfd() const { return core->sockfd; }
/**
* @see SocketCore::beginListen()
*/
void beginListen(int port = 0) const;
/**
* @see SocketCore::getAddrInfo()
*/
void getAddrInfo(pair<string, int>& addrinfo) const;
/**
* @see SocketCore::getPeerInfo();
*/
void getPeerInfo(pair<string, int>& peerinfo) const;
/**
* @see SocketCore::acceptConnection()
*/
Socket* acceptConnection() const;
/**
* @see SocketCore::establishConnection()
*/
void establishConnection(const string& host, int port) const;
/**
* @see SocketCore::setBlockingMode()
*/
void setBlockingMode() const;
/**
* @see SocketCore::closeConnection()
*/
void closeConnection() const;
/**
* @see SocketCore::isWritable()
*/
bool isWritable(int timeout) const;
/**
* @see SocketCore::isReadable()
*/
bool isReadable(int timeout) const;
/**
* @see SocketCore::writeData()
*/
void writeData(const char* data, int len) const;
/**
* A covenient function that can take string class parameter and
* internally calls SocketCore::writeData().
*/
void writeData(const string& str) const;
/**
* @see SocketCore::readData()
*/
void readData(char* data, int& len) const;
/**
* @see SocketCore::peekData()
*/
void peekData(char* data, int& len) const;
/**
* @see SocketCore::initiateSecureConnection()
*/
void initiateSecureConnection() const;
};
typedef SharedHandle<SocketCore> SocketHandle;
#endif // _D_SOCKET_H_

View File

@ -456,3 +456,14 @@ void SocketCore::initiateSecureConnection() {
#endif // HAVE_LIBGNUTLS
}
bool operator==(const SocketCore& s1, const SocketCore& s2) {
return s1.sockfd == s2.sockfd;
}
bool operator!=(const SocketCore& s1, const SocketCore& s2) {
return s1.sockfd != s2.sockfd;
}
bool operator<(const SocketCore& s1, const SocketCore& s2) {
return s1.sockfd < s2.sockfd;
}

View File

@ -22,9 +22,9 @@
#ifndef _D_SOCKET_CORE_H_
#define _D_SOCKET_CORE_H_
#include "common.h"
#include <string>
#include <utility>
#include "common.h"
#ifdef HAVE_LIBSSL
// for SSL
@ -37,7 +37,9 @@
using namespace std;
class SocketCore {
friend class Socket;
friend bool operator==(const SocketCore& s1, const SocketCore& s2);
friend bool operator!=(const SocketCore& s1, const SocketCore& s2);
friend bool operator<(const SocketCore& s1, const SocketCore& s2);
private:
// socket endpoint descriptor
int sockfd;
@ -68,6 +70,10 @@ public:
SocketCore();
~SocketCore();
int getSockfd() const { return sockfd; }
bool isOpen() const { return sockfd != -1; }
/**
* Creates a socket and listens form connection on it.
* @param port port to listen. If 0 is specified, os automaticaly
@ -143,6 +149,7 @@ public:
* @param len length of data
*/
void writeData(const char* data, int len);
void writeData(const string& msg) { writeData(msg.c_str(), msg.size()); }
/**
* Reads up to len bytes from this socket.
@ -176,5 +183,4 @@ public:
*/
void initiateSecureConnection() ;
};
#endif // _D_SOCKET_CORE_H_

View File

@ -41,18 +41,28 @@
#include <string.h>
#include <algorithm>
extern PeerHandle nullPeer;
TorrentMan::TorrentMan():bitfield(NULL),
peerEntryIdCounter(0), cuidCounter(0),
downloadLength(0), uploadLength(0),
preDownloadLength(0), preUploadLength(0),
deltaDownloadLength(0), deltaUploadLength(0),
peerEntryIdCounter(0),
cuidCounter(0),
downloadLength(0),
uploadLength(0),
preDownloadLength(0),
preUploadLength(0),
deltaDownloadLength(0),
deltaUploadLength(0),
storeDir("."),
setupComplete(false),
halt(false),
interval(DEFAULT_ANNOUNCE_INTERVAL),
minInterval(DEFAULT_ANNOUNCE_MIN_INTERVAL),
complete(0), incomplete(0),
connections(0), trackers(0), diskAdaptor(NULL) {
complete(0),
incomplete(0),
connections(0),
trackers(0),
diskAdaptor(NULL)
{
logger = LogFactory::getInstance();
}
@ -60,9 +70,6 @@ TorrentMan::~TorrentMan() {
if(bitfield != NULL) {
delete bitfield;
}
for(Peers::iterator itr = peers.begin(); itr != peers.end(); itr++) {
delete *itr;
}
if(diskAdaptor != NULL) {
delete diskAdaptor;
}
@ -73,32 +80,25 @@ void TorrentMan::updatePeers(const Peers& peers) {
this->peers = peers;
}
bool TorrentMan::addPeer(Peer* peer, bool duplicate) {
bool TorrentMan::addPeer(const PeerHandle& peer) {
if(peers.size() >= MAX_PEER_LIST_SIZE) {
deleteOldErrorPeers();
deleteErrorPeer();
}
if(duplicate) {
for(Peers::iterator itr = peers.begin(); itr != peers.end(); itr++) {
Peer* p = *itr;
if(p->ipaddr == peer->ipaddr && p->port == peer->port && p->error > 0) {
return false;
}
}
Peers::iterator itr = find(peers.begin(), peers.end(), peer);
if(itr == peers.end()) {
++peerEntryIdCounter;
peer->entryId = peerEntryIdCounter;
peers.push_back(peer);
return true;
} else {
if(peers.size() >= MAX_PEER_LIST_SIZE) {
const PeerHandle& peer = *itr;
if(peer->error >= MAX_PEER_ERROR || peer->cuid != 0) {
return false;
}
for(Peers::iterator itr = peers.begin(); itr != peers.end(); itr++) {
Peer* p = *itr;
if(p->ipaddr == peer->ipaddr && p->port == peer->port) {
return false;
}
}
} else {
*itr = peer;
return true;
}
}
++peerEntryIdCounter;
peer->entryId = peerEntryIdCounter;
peers.push_back(peer);
return true;
}
/*
@ -114,14 +114,13 @@ void TorrentMan::updatePeer(const Peer& peer) {
*/
bool TorrentMan::isPeerAvailable() const {
return getPeer() != Peer::nullPeer;
return getPeer() != nullPeer;
}
void TorrentMan::deleteOldErrorPeers() {
void TorrentMan::deleteErrorPeer() {
for(Peers::iterator itr = peers.begin(); itr != peers.end();) {
Peer* p = *itr;
if(p->error >= MAX_PEER_ERROR && p->cuid == 0) {
delete p;
const PeerHandle& p = *itr;
if(p->error > 0 && p->cuid == 0) {
itr = peers.erase(itr);
} else {
itr++;
@ -129,29 +128,29 @@ void TorrentMan::deleteOldErrorPeers() {
}
}
Peer* TorrentMan::getPeer() const {
if(connections > MAX_PEER_UPDATE) {
return Peer::nullPeer;
PeerHandle TorrentMan::getPeer() const {
if(connections > MIN_PEERS) {
return nullPeer;
}
for(Peers::const_iterator itr = peers.begin(); itr != peers.end(); itr++) {
Peer* p = *itr;
const PeerHandle& p = *itr;
if(p->cuid == 0 && p->error < MAX_PEER_ERROR) {
return p;
}
}
return Peer::nullPeer;
return nullPeer;
}
bool TorrentMan::isEndGame() const {
return bitfield->countMissingBlock() <= END_GAME_PIECE_NUM;
}
bool TorrentMan::hasMissingPiece(const Peer* peer) const {
bool TorrentMan::hasMissingPiece(const PeerHandle& peer) const {
return bitfield->hasMissingPiece(peer->getBitfield(),
peer->getBitfieldLength());
}
int TorrentMan::getMissingPieceIndex(const Peer* peer) const {
int TorrentMan::getMissingPieceIndex(const PeerHandle& peer) const {
int index = -1;
if(isEndGame()) {
index = bitfield->getMissingIndex(peer->getBitfield(),
@ -163,7 +162,7 @@ int TorrentMan::getMissingPieceIndex(const Peer* peer) const {
return index;
}
int TorrentMan::getMissingFastPieceIndex(const Peer* peer) const {
int TorrentMan::getMissingFastPieceIndex(const PeerHandle& peer) const {
int index = -1;
if(peer->isFastExtensionEnabled() && peer->countFastSet() > 0) {
BitfieldMan tempBitfield(pieceLength, totalLength);
@ -184,12 +183,12 @@ int TorrentMan::getMissingFastPieceIndex(const Peer* peer) const {
return index;
}
Piece TorrentMan::getMissingFastPiece(const Peer* peer) {
Piece TorrentMan::getMissingFastPiece(const PeerHandle& peer) {
int index = getMissingFastPieceIndex(peer);
return checkOutPiece(index);
}
Piece TorrentMan::getMissingPiece(const Peer* peer) {
Piece TorrentMan::getMissingPiece(const PeerHandle& peer) {
int index = getMissingPieceIndex(peer);
return checkOutPiece(index);
}

View File

@ -46,7 +46,7 @@ using namespace std;
#define DEFAULT_ANNOUNCE_INTERVAL 1800
#define DEFAULT_ANNOUNCE_MIN_INTERVAL 1800
#define MAX_PEERS 55
#define MAX_PEER_UPDATE 15
#define MIN_PEERS 15
#define MAX_PEER_LIST_SIZE 250
#define END_GAME_PIECE_NUM 20
#define MAX_PEER_ERROR 5
@ -61,7 +61,7 @@ public:
index(index) {}
};
typedef deque<Peer*> Peers;
typedef deque<PeerHandle> Peers;
typedef deque<HaveEntry> Haves;
typedef deque<int> PieceIndexes;
typedef deque<Piece> Pieces;
@ -133,18 +133,18 @@ public:
// TODO do not use this method
void updatePeers(const Peers& peers);
bool addPeer(Peer* peer, bool duplicate = false);
bool addPeer(const PeerHandle& peer);
//void updatePeer(const Peer* peer);
const Peers& getPeers() const { return peers; }
Peer* getPeer() const;
PeerHandle getPeer() const;
bool isPeerAvailable() const;
void deleteOldErrorPeers();
void deleteErrorPeer();
bool hasMissingPiece(const Peer* peer) const;
int getMissingPieceIndex(const Peer* peer) const;
int getMissingFastPieceIndex(const Peer* peer) const;
Piece getMissingPiece(const Peer* peer);
Piece getMissingFastPiece(const Peer* peer);
bool hasMissingPiece(const PeerHandle& peer) const;
int getMissingPieceIndex(const PeerHandle& peer) const;
int getMissingFastPieceIndex(const PeerHandle& peer) const;
Piece getMissingPiece(const PeerHandle& peer);
Piece getMissingFastPiece(const PeerHandle& peer);
void completePiece(const Piece& piece);
void cancelPiece(const Piece& piece);
void updatePiece(const Piece& piece);
@ -246,15 +246,17 @@ public:
void onDownloadComplete();
void addActivePeer(Peer* peer) {
void addActivePeer(const PeerHandle& peer) {
activePeers.push_back(peer);
}
Peers& getActivePeers() { return this->activePeers; }
void deleteActivePeer(Peer* peer) {
void deleteActivePeer(const PeerHandle& peer) {
Peers::iterator itr = find(activePeers.begin(), activePeers.end(), peer);
activePeers.erase(itr);
if(itr != activePeers.end()) {
activePeers.erase(itr);
}
}
bool isHalt() const { return halt; }

View File

@ -69,24 +69,14 @@ bool TrackerUpdateCommand::execute() {
if(!e->segmentMan->finished()) {
return prepareForRetry();
}
MetaEntry* entry = NULL;
char* trackerResponse = NULL;
int trackerResponseLength = 0;
try {
try {
trackerResponse = getTrackerResponse(trackerResponseLength);
entry = MetaFileUtil::bdecoding(trackerResponse,
trackerResponseLength);
if(trackerResponse != NULL) {
delete [] trackerResponse;
}
} catch(Exception* e) {
if(trackerResponse != NULL) {
delete [] trackerResponse;
}
throw;
}
Dictionary* response = (Dictionary*)entry;
trackerResponse = getTrackerResponse(trackerResponseLength);
SharedHandle<MetaEntry> entry(MetaFileUtil::bdecoding(trackerResponse,
trackerResponseLength));
Dictionary* response = (Dictionary*)entry.get();
Data* failureReason = (Data*)response->get("failure reason");
if(failureReason != NULL) {
throw new DlAbortEx("Tracker returned failure reason: %s", failureReason->toString().c_str());
@ -126,7 +116,9 @@ bool TrackerUpdateCommand::execute() {
logger->debug("CUID#%d - Incomplete:%d",
cuid, e->torrentMan->incomplete);
}
if(dynamic_cast<const Data*>(response->get("peers"))) {
if(!e->torrentMan->isHalt() &&
e->torrentMan->connections < MIN_PEERS &&
dynamic_cast<const Data*>(response->get("peers"))) {
Data* peers = (Data*)response->get("peers");
if(peers != NULL && peers->getLen() > 0) {
for(int i = 0; i < peers->getLen(); i += 6) {
@ -139,24 +131,24 @@ bool TrackerUpdateCommand::execute() {
snprintf(ipaddr, sizeof(ipaddr), "%d.%d.%d.%d",
ipaddr1, ipaddr2, ipaddr3, ipaddr4);
Peer* peer = new Peer(ipaddr, port, e->torrentMan->pieceLength,
e->torrentMan->getTotalLength());
PeerHandle peer =
PeerHandle(new Peer(ipaddr, port, e->torrentMan->pieceLength,
e->torrentMan->getTotalLength()));
if(e->torrentMan->addPeer(peer)) {
logger->debug("CUID#%d - Adding peer %s:%d",
cuid, peer->ipaddr.c_str(), peer->port);
} else {
delete peer;
}
}
} else {
logger->info("CUID#%d - No peer list received.", cuid);
}
while(e->torrentMan->isPeerAvailable() &&
e->torrentMan->connections < MAX_PEER_UPDATE) {
Peer* peer = e->torrentMan->getPeer();
e->torrentMan->connections < MIN_PEERS) {
PeerHandle peer = e->torrentMan->getPeer();
int newCuid = e->torrentMan->getNewCuid();
peer->cuid = newCuid;
PeerInitiateConnectionCommand* command = new PeerInitiateConnectionCommand(newCuid, peer, e);
PeerInitiateConnectionCommand* command =
new PeerInitiateConnectionCommand(newCuid, peer, e);
e->commands.push_back(command);
logger->debug("CUID#%d - Adding new command CUID#%d", cuid, newCuid);
}
@ -166,10 +158,10 @@ bool TrackerUpdateCommand::execute() {
}
} catch(Exception* err) {
logger->error("CUID#%d - Error occurred while processing tracker response.", cuid, err);
delete(err);
delete err;
}
if(entry != NULL) {
delete entry;
if(trackerResponse != NULL) {
delete [] trackerResponse;
}
e->torrentMan->trackers = 0;

View File

@ -26,8 +26,6 @@
#include "TorrentDownloadEngine.h"
#include "Time.h"
#define MIN_PEERS 15
class TrackerWatcherCommand : public Command {
private:
TorrentDownloadEngine* e;

View File

@ -870,16 +870,17 @@ int main(int argc, char* argv[]) {
}
te->torrentMan->setPort(port);
te->commands.push_back(listenCommand);
te->commands.push_back(new TrackerWatcherCommand(te->torrentMan->getNewCuid(),
te,
te->torrentMan->minInterval));
te->commands.push_back(new TrackerUpdateCommand(te->torrentMan->getNewCuid(),
te));
te));
te->commands.push_back(new TorrentAutoSaveCommand(te->torrentMan->getNewCuid(),
te,
op->getAsInt(PREF_AUTO_SAVE_INTERVAL)));
te,
op->getAsInt(PREF_AUTO_SAVE_INTERVAL)));
te->commands.push_back(new PeerChokeCommand(te->torrentMan->getNewCuid(),
10, te));
10, te));
te->run();
if(te->torrentMan->downloadComplete()) {