/* */
#include "PeerInteractionCommand.h"
#include "DownloadEngine.h"
#include "PeerInitiateConnectionCommand.h"
#include "PeerMessageUtil.h"
#include "DefaultBtInteractive.h"
#include "DlAbortEx.h"
#include "Util.h"
#include "message.h"
#include "prefs.h"
#include "DefaultBtMessageDispatcher.h"
#include "DefaultBtMessageReceiver.h"
#include "DefaultBtRequestFactory.h"
#include "DefaultBtMessageFactory.h"
#include "DefaultBtInteractive.h"
#include "CUIDCounter.h"
// NOTE(review): the header name of this final #include was missing from the
// file as received (the directive ran straight into the constructor).
// <algorithm> is an assumption -- confirm against version control.
#include <algorithm>

/**
 * Wires up the message-handling objects used to talk to one peer and
 * registers them for that peer.
 *
 * The constructor creates a message factory, a message dispatcher, a message
 * receiver, a request factory and a DefaultBtInteractive, connects them to
 * each other, stores them in a PeerObject registered under peer->getId() in
 * PEER_OBJECT_CLUSTER(btContext), activates the peer and increments the
 * runtime connection count. The destructor undoes the last three steps.
 *
 * @param cuid                  command ID handed to every created object
 * @param requestGroup          forwarded to RequestGroupAware
 * @param p                     the remote peer
 * @param e                     download engine; its options supply timeouts
 *                              and speed limits
 * @param btContext             torrent context shared by the created objects
 * @param s                     socket connected (or connecting) to the peer
 * @param sequence              initial state of the handshake state machine
 * @param passedPeerConnection  connection to reuse; when it is null a new
 *                              PeerConnection is created on the socket
 */
PeerInteractionCommand::PeerInteractionCommand(int32_t cuid,
                                               RequestGroup* requestGroup,
                                               const PeerHandle& p,
                                               DownloadEngine* e,
                                               const BtContextHandle& btContext,
                                               const SocketHandle& s,
                                               Seq sequence,
                                               const PeerConnectionHandle& passedPeerConnection)
  :PeerAbstractCommand(cuid, p, e, s),
   BtContextAwareCommand(btContext),
   RequestGroupAware(requestGroup),
   sequence(sequence),
   btInteractive(0),
   maxDownloadSpeedLimit(0)
{
  // TODO move following bunch of processing to separate method, like init()
  if(sequence == INITIATOR_SEND_HANDSHAKE) {
    // We initiated the connection: watch the socket for writability only and
    // apply the connection timeout. executeInternal() sends the handshake
    // once the socket reports writable.
    disableReadCheckSocket();
    setWriteCheckSocket(socket);
    setTimeout(e->option->getAsInt(PREF_PEER_CONNECTION_TIMEOUT));
  }

  DefaultBtMessageFactoryHandle factory = new DefaultBtMessageFactory();
  factory->setCuid(cuid);
  factory->setBtContext(btContext);
  factory->setPeer(peer);

  // Reuse the caller's connection when one was supplied.
  PeerConnectionHandle peerConnection =
    passedPeerConnection.isNull() ?
    new PeerConnection(cuid, socket, e->option) : passedPeerConnection;

  DefaultBtMessageDispatcherHandle dispatcher = new DefaultBtMessageDispatcher();
  dispatcher->setCuid(cuid);
  dispatcher->setPeer(peer);
  dispatcher->setBtContext(btContext);
  dispatcher->setMaxUploadSpeedLimit(e->option->getAsInt(PREF_MAX_UPLOAD_LIMIT));
  dispatcher->setRequestTimeout(e->option->getAsInt(PREF_BT_REQUEST_TIMEOUT));
  dispatcher->setBtMessageFactory(factory);

  DefaultBtMessageReceiverHandle receiver = new DefaultBtMessageReceiver();
  receiver->setCuid(cuid);
  receiver->setPeer(peer);
  receiver->setBtContext(btContext);
  receiver->setPeerConnection(peerConnection);
  receiver->setDispatcher(dispatcher);
  receiver->setBtMessageFactory(factory);

  DefaultBtRequestFactoryHandle reqFactory = new DefaultBtRequestFactory();
  reqFactory->setCuid(cuid);
  reqFactory->setPeer(peer);
  reqFactory->setBtContext(btContext);
  reqFactory->setBtMessageDispatcher(dispatcher);
  reqFactory->setBtMessageFactory(factory);

  // Named `interactive` rather than `btInteractive` so the local does not
  // shadow the member it is assigned to below.
  DefaultBtInteractiveHandle interactive = new DefaultBtInteractive();
  interactive->setCuid(cuid);
  interactive->setPeer(peer);
  interactive->setBtContext(btContext);
  interactive->setBtMessageReceiver(receiver);
  interactive->setDispatcher(dispatcher);
  interactive->setBtRequestFactory(reqFactory);
  interactive->setPeerConnection(peerConnection);
  interactive->setKeepAliveInterval(e->option->getAsInt(PREF_BT_KEEP_ALIVE_INTERVAL));
  interactive->setMaxDownloadSpeedLimit(e->option->getAsInt(PREF_MAX_DOWNLOAD_LIMIT));
  interactive->setBtMessageFactory(factory);
  btInteractive = interactive;

  // reverse depends
  // The factory needs the objects that were themselves built with the
  // factory, so these links can only be set after everything exists.
  factory->setBtMessageDispatcher(dispatcher);
  factory->setBtRequestFactory(reqFactory);
  factory->setPeerConnection(peerConnection);

  PeerObjectHandle peerObject = new PeerObject();
  peerObject->btMessageDispatcher = dispatcher;
  peerObject->btMessageReceiver = receiver;
  peerObject->btMessageFactory = factory;
  peerObject->btRequestFactory = reqFactory;
  peerObject->peerConnection = peerConnection;

  PEER_OBJECT_CLUSTER(btContext)->registerHandle(peer->getId(), peerObject);

  setUploadLimit(e->option->getAsInt(PREF_MAX_UPLOAD_LIMIT));
  peer->activate();

  maxDownloadSpeedLimit = e->option->getAsInt(PREF_MAX_DOWNLOAD_LIMIT);

  btRuntime->increaseConnections();
}

/**
 * Deactivates the peer, removes its PeerObject from the cluster and
 * decrements the runtime connection count, mirroring the constructor.
 */
PeerInteractionCommand::~PeerInteractionCommand() {
  peer->deactivate();
  PEER_OBJECT_CLUSTER(btContext)->unregisterHandle(peer->getId());
  btRuntime->decreaseConnections();
  //logger->debug("CUID#%d - unregistered message factory using ID:%s",
  //cuid, peer->getId().c_str());
}

/**
 * Runs one step of the peer state machine selected by `sequence`.
 *
 * The states advance INITIATOR_SEND_HANDSHAKE -> INITIATOR_WAIT_HANDSHAKE ->
 * WIRED for connections we opened, and RECEIVER_WAIT_HANDSHAKE -> WIRED
 * otherwise. A state that cannot make progress leaves `sequence` unchanged.
 *
 * @return always false; the command pushes itself back onto e->commands
 *         before returning.
 */
bool PeerInteractionCommand::executeInternal() {
  // Default checks for this step; individual branches below override them.
  setReadCheckSocket(socket);
  disableWriteCheckSocket();
  setUploadLimitCheck(false);
  setNoCheck(false);

  switch(sequence) {
  case INITIATOR_SEND_HANDSHAKE:
    if(socket->isWritable(0)) {
      socket->setBlockingMode();
      setTimeout(e->option->getAsInt(PREF_BT_TIMEOUT));
      btInteractive->initiateHandshake();
      sequence = INITIATOR_WAIT_HANDSHAKE;
    } else {
      // Not writable yet: keep watching for writability only.
      disableReadCheckSocket();
      setWriteCheckSocket(socket);
    }
    break;
  case INITIATOR_WAIT_HANDSHAKE: {
    if(btInteractive->countPendingMessage() > 0) {
      btInteractive->sendPendingMessage();
      if(btInteractive->countPendingMessage() > 0) {
        // Messages are still queued after the send attempt; retry next step
        // before trying to read the peer's handshake.
        break;
      }
    }
    BtMessageHandle handshakeMessage = btInteractive->receiveHandshake();
    if(!handshakeMessage.isNull()) {
      btInteractive->doPostHandshakeProcessing();
      sequence = WIRED;
    }
    break;
  }
  case RECEIVER_WAIT_HANDSHAKE: {
    BtMessageHandle handshakeMessage = btInteractive->receiveAndSendHandshake();
    if(!handshakeMessage.isNull()) {
      btInteractive->doPostHandshakeProcessing();
      sequence = WIRED;
    }
    break;
  }
  case WIRED:
    btInteractive->doInteractionProcessing();
    break;
  }

  // NOTE(review): setNoCheck(true) presumably lets the engine run this
  // command again without waiting on the socket -- confirm in
  // PeerAbstractCommand.
  if(btInteractive->countPendingMessage() > 0) {
    setNoCheck(true);
  }
  // The transfer stat is only computed when a download limit is configured.
  if(maxDownloadSpeedLimit > 0) {
    TransferStat stat = peerStorage->calculateStat();
    if(maxDownloadSpeedLimit < stat.downloadSpeed) {
      // Over the limit: stop watching the socket for reads this step.
      disableReadCheckSocket();
      setNoCheck(true);
    }
  }
  e->commands.push_back(this);
  return false;
}

// TODO this method removed when PeerBalancerCommand is implemented
/**
 * When peerStorage has a peer available and btRuntime->lessThanEqMinPeer()
 * holds, takes an unused peer from peerStorage, assigns it a fresh CUID and
 * queues a PeerInitiateConnectionCommand for it.
 *
 * @param wait  unused by this implementation
 * @return always true
 */
bool PeerInteractionCommand::prepareForNextPeer(int32_t wait) {
  if(peerStorage->isPeerAvailable() && btRuntime->lessThanEqMinPeer()) {
    // Named `unusedPeer` so it does not shadow the member `peer`.
    PeerHandle unusedPeer = peerStorage->getUnusedPeer();
    unusedPeer->cuid = CUIDCounterSingletonHolder::instance()->newID();
    PeerInitiateConnectionCommand* command =
      new PeerInitiateConnectionCommand(unusedPeer->cuid,
                                        _requestGroup,
                                        unusedPeer,
                                        e,
                                        btContext);
    e->commands.push_back(command);
  }
  return true;
}

/**
 * Re-queues this command on the engine.
 *
 * @param wait  unused by this implementation
 * @return always false
 */
bool PeerInteractionCommand::prepareForRetry(int32_t wait) {
  e->commands.push_back(this);
  return false;
}

/**
 * Abort hook: cancels all pieces via btInteractive and returns the peer to
 * peerStorage. The base-class handler is intentionally not called (the call
 * is commented out in the original).
 *
 * @param ex  unused by this implementation
 */
void PeerInteractionCommand::onAbort(Exception* ex) {
  btInteractive->cancelAllPiece();
  peerStorage->returnPeer(peer);
  //PeerAbstractCommand::onAbort(ex);
}

/**
 * @return the result of btRuntime->isHalt().
 */
bool PeerInteractionCommand::exitBeforeExecute()
{
  return btRuntime->isHalt();
}