/* */
#include "AbstractCommand.h"

#include <algorithm>
#include <cassert>
#include <deque>
#include <functional>
#include <iterator>
#include <string>
#include <vector>

#include "Request.h"
#include "DownloadEngine.h"
#include "Option.h"
#include "PeerStat.h"
#include "SegmentMan.h"
#include "Logger.h"
#include "Segment.h"
#include "DlAbortEx.h"
#include "DlRetryEx.h"
#include "DownloadFailureException.h"
#include "CreateRequestCommand.h"
#include "InitiateConnectionCommandFactory.h"
#include "SleepCommand.h"
#include "StreamCheckIntegrityEntry.h"
#include "PieceStorage.h"
#include "Socket.h"
#include "message.h"
#include "prefs.h"
#include "fmt.h"
#include "ServerStat.h"
#include "RequestGroupMan.h"
#include "A2STR.h"
#include "util.h"
#include "LogFactory.h"
#include "DownloadContext.h"
#include "wallclock.h"
#include "NameResolver.h"
#include "uri.h"
#include "FileEntry.h"
#ifdef ENABLE_ASYNC_DNS
#include "AsyncNameResolver.h"
#endif // ENABLE_ASYNC_DNS
#ifdef ENABLE_MESSAGE_DIGEST
# include "ChecksumCheckIntegrityEntry.h"
#endif // ENABLE_MESSAGE_DIGEST

namespace aria2 {

// Creates a command bound to requestGroup and engine e.
//
// The constructor records the current wall clock as the timeout check
// point, takes the timeout from requestGroup, and increments the
// group's stream-command and command counters.  When incNumConnection
// is true the group's stream-connection counter is incremented as
// well (the destructor undoes all of these).  If s is a non-null,
// open socket it is registered for read checks right away.
AbstractCommand::AbstractCommand(cuid_t cuid,
                                 const SharedHandle<Request>& req,
                                 const SharedHandle<FileEntry>& fileEntry,
                                 RequestGroup* requestGroup,
                                 DownloadEngine* e,
                                 const SocketHandle& s,
                                 bool incNumConnection):
  Command(cuid), checkPoint_(global::wallclock),
  timeout_(requestGroup->getTimeout()),
  requestGroup_(requestGroup),
  req_(req), fileEntry_(fileEntry), e_(e), socket_(s),
  checkSocketIsReadable_(false), checkSocketIsWritable_(false),
  nameResolverCheck_(false),
  incNumConnection_(incNumConnection)
{
  if(socket_ && socket_->isOpen()) {
    setReadCheckSocket(socket_);
  }
  if(incNumConnection_) {
    requestGroup->increaseStreamConnection();
  }
  requestGroup_->increaseStreamCommand();
  requestGroup_->increaseNumCommand();
}

// Unregisters every event check this command installed (read socket,
// write socket and, when async DNS is compiled in, the name resolver)
// and reverts the counter increments made by the constructor.
AbstractCommand::~AbstractCommand() {
  disableReadCheckSocket();
  disableWriteCheckSocket();
#ifdef ENABLE_ASYNC_DNS
  disableNameResolverCheck(asyncNameResolver_);
#endif // ENABLE_ASYNC_DNS
  requestGroup_->decreaseNumCommand();
  requestGroup_->decreaseStreamCommand();
  if(incNumConnection_) {
    requestGroup_->decreaseStreamConnection();
  }
}

// Common driver for all stream commands.
//
// In order, this function:
//  1. returns true immediately if the download finished or a halt was
//     requested;
//  2. re-queues via prepareForRetry(0) if the current URI was asked to
//     be removed, or if previously assigned segments disappeared;
//  3. switches to a faster pooled Request when one is found and no
//     unused missing piece is left;
//  4. if a watched event fired (or nothing is being watched), makes
//     sure segments are assigned and delegates to executeInternal();
//  5. throws DlRetryEx on a socket error event, or on timeout (after
//     marking the server and the connected IP address as bad);
//  6. otherwise re-adds itself to the engine and returns false.
//
// DlAbortEx, DlRetryEx and DownloadFailureException raised by the
// steps above are handled here; see the individual catch blocks.
//
// Returns false only when the command re-added itself to the engine
// in step 6; otherwise returns true or whatever executeInternal() /
// prepareForRetry() returned.
bool AbstractCommand::execute() {
  A2_LOG_DEBUG(fmt("CUID#%lld - socket: read:%d, write:%d, hup:%d, err:%d",
                   getCuid(),
                   readEventEnabled(),
                   writeEventEnabled(),
                   hupEventEnabled(),
                   errorEventEnabled()));
  try {
    if(requestGroup_->downloadFinished() || requestGroup_->isHaltRequested()) {
      return true;
    }
    if(req_ && req_->removalRequested()) {
      A2_LOG_DEBUG(fmt("CUID#%lld - Discard original URI=%s because it is"
                       " requested.",
                       getCuid(), req_->getUri().c_str()));
      return prepareForRetry(0);
    }
    if(getPieceStorage()) {
      segments_.clear();
      getSegmentMan()->getInFlightSegment(segments_, getCuid());
      if(req_ && segments_.empty()) {
        // This command previously has assigned segments, but it is
        // canceled. So discard current request chain.  Plus, if no
        // segment is available when http pipelining is used.
        A2_LOG_DEBUG(fmt("CUID#%lld - It seems previously assigned segments"
                         " are canceled. Restart.",
                         getCuid()));
        // Request::isPipeliningEnabled() == true means aria2
        // accessed the remote server and discovered that the server
        // supports pipelining.
        // (req_ is known to be non-null here from the enclosing test.)
        if(req_->isPipeliningEnabled()) {
          e_->poolSocket(req_, createProxyRequest(), socket_);
        }
        return prepareForRetry(0);
      }
      // TODO it is not needed to check other PeerStats every time.
      // Find faster Request when no segment is available.
      if(req_ && fileEntry_->countPooledRequest() > 0 &&
         !getPieceStorage()->hasMissingUnusedPiece()) {
        SharedHandle<Request> fasterRequest =
          fileEntry_->findFasterRequest(req_);
        if(fasterRequest) {
          A2_LOG_INFO(fmt("CUID#%lld - Use faster Request hostname=%s, port=%u",
                          getCuid(),
                          fasterRequest->getHost().c_str(),
                          fasterRequest->getPort()));
          // Cancel current Request object and use faster one.
          fileEntry_->removeRequest(req_);
          Command* command =
            InitiateConnectionCommandFactory::createInitiateConnectionCommand
            (getCuid(), fasterRequest, fileEntry_, requestGroup_, e_);
          e_->setNoWait(true);
          e_->addCommand(command);
          return true;
        }
      }
    }
    // Proceed when any watched event fired, or when this command is
    // not waiting on any socket / name resolver at all.
    if((checkSocketIsReadable_ && readEventEnabled()) ||
       (checkSocketIsWritable_ && writeEventEnabled()) ||
       hupEventEnabled() ||
#ifdef ENABLE_ASYNC_DNS
       (nameResolverCheck_ && nameResolveFinished()) ||
#endif // ENABLE_ASYNC_DNS
       (!checkSocketIsReadable_ && !checkSocketIsWritable_ &&
        !nameResolverCheck_)) {
      // Activity observed: restart the timeout window.
      checkPoint_ = global::wallclock;
      if(getPieceStorage()) {
        if(!req_ || req_->getMaxPipelinedRequest() == 1 ||
           // Why the following condition is necessary? That's because
           // For single file download, SegmentMan::getSegment(cuid)
           // is more efficient.
           getDownloadContext()->getFileEntries().size() == 1) {
          size_t maxSegments = req_?req_->getMaxPipelinedRequest():1;
          size_t minSplitSize = calculateMinSplitSize();
          while(segments_.size() < maxSegments) {
            SharedHandle<Segment> segment =
              getSegmentMan()->getSegment(getCuid(), minSplitSize);
            if(!segment) {
              break;
            } else {
              segments_.push_back(segment);
            }
          }
          if(segments_.empty()) {
            // TODO socket could be pooled here if pipelining is
            // enabled...  Hmm, I don't think if pipelining is enabled
            // it does not go here.
            A2_LOG_INFO(fmt(MSG_NO_SEGMENT_AVAILABLE, getCuid()));
            // When all segments are ignored in SegmentMan, there are
            // no URIs available, so don't retry.
            if(getSegmentMan()->allSegmentsIgnored()) {
              A2_LOG_DEBUG("All segments are ignored.");
              return true;
            } else {
              return prepareForRetry(1);
            }
          }
        } else {
          // For multi-file downloads
          // (req_ is non-null here: the !req_ case took the branch above.)
          size_t minSplitSize = calculateMinSplitSize();
          size_t maxSegments = req_->getMaxPipelinedRequest();
          if(segments_.size() < maxSegments) {
            getSegmentMan()->getSegment
              (segments_, getCuid(), minSplitSize, fileEntry_, maxSegments);
          }
          if(segments_.empty()) {
            return prepareForRetry(0);
          }
        }
      }
      return executeInternal();
    } else if(errorEventEnabled()) {
      throw DL_RETRY_EX(fmt(MSG_NETWORK_PROBLEM,
                            socket_->getSocketError().c_str()));
    } else {
      if(checkPoint_.difference(global::wallclock) >= timeout_) {
        // NOTE(review): this path dereferences req_ without a null
        // check, although the catch blocks below treat req_ as
        // possibly null.  Presumably only commands owning a Request
        // can time out here -- confirm.
        // timeout triggers ServerStat error state.
        SharedHandle<ServerStat> ss =
          e_->getRequestGroupMan()->getOrCreateServerStat(req_->getHost(),
                                                          req_->getProtocol());
        ss->setError();
        // Purging IP address cache to renew IP address.
        A2_LOG_DEBUG(fmt("CUID#%lld - Marking IP address %s as bad",
                         getCuid(),
                         req_->getConnectedAddr().c_str()));
        e_->markBadIPAddress(req_->getConnectedHostname(),
                             req_->getConnectedAddr(),
                             req_->getConnectedPort());
        if(e_->findCachedIPAddress
           (req_->getConnectedHostname(), req_->getConnectedPort()).empty()) {
          A2_LOG_DEBUG(fmt("CUID#%lld - All IP addresses were marked bad."
                           " Removing Entry.",
                           getCuid()));
          e_->removeCachedIPAddress
            (req_->getConnectedHostname(), req_->getConnectedPort());
        }
        throw DL_RETRY_EX2(EX_TIME_OUT, error_code::TIME_OUT);
      }
      // Nothing to do yet and no timeout: run again later.
      e_->addCommand(this);
      return false;
    }
  } catch(DlAbortEx& err) {
    // Fatal for this URI: record the result, clean up via onAbort()
    // and let tryReserved() start another request if one is available.
    if(!req_) {
      A2_LOG_DEBUG_EX(EX_EXCEPTION_CAUGHT, err);
    } else {
      A2_LOG_ERROR_EX(fmt(MSG_DOWNLOAD_ABORTED,
                          getCuid(),
                          req_->getUri().c_str()),
                      DL_ABORT_EX2(fmt("URI=%s", req_->getCurrentUri().c_str()),
                                   err));
      fileEntry_->addURIResult(req_->getUri(), err.getCode());
      requestGroup_->setLastErrorCode(err.getCode());
      if(err.getCode() == error_code::CANNOT_RESUME) {
        requestGroup_->increaseResumeFailureCount();
      }
    }
    onAbort();
    tryReserved();
    return true;
  } catch(DlRetryEx& err) {
    // Retryable failure: bump the try count and either retry the same
    // Request or, once PREF_MAX_TRIES is reached, treat it as an abort.
    assert(req_);
    A2_LOG_INFO_EX(fmt(MSG_RESTARTING_DOWNLOAD,
                       getCuid(), req_->getUri().c_str()),
                   DL_RETRY_EX2(fmt("URI=%s", req_->getCurrentUri().c_str()),
                                err));
    req_->addTryCount();
    req_->resetRedirectCount();
    req_->resetUri();
    const unsigned int maxTries = getOption()->getAsInt(PREF_MAX_TRIES);
    // maxTries == 0 disables the limit.
    bool isAbort = maxTries != 0 && req_->getTryCount() >= maxTries;
    if(isAbort) {
      A2_LOG_INFO(fmt(MSG_MAX_TRY, getCuid(), req_->getTryCount()));
      A2_LOG_ERROR_EX(fmt(MSG_DOWNLOAD_ABORTED,
                          getCuid(),
                          req_->getUri().c_str()),
                      err);
      fileEntry_->addURIResult(req_->getUri(), err.getCode());
      requestGroup_->setLastErrorCode(err.getCode());
      if(err.getCode() == error_code::CANNOT_RESUME) {
        requestGroup_->increaseResumeFailureCount();
      }
      onAbort();
      tryReserved();
      return true;
    } else {
      return prepareForRetry(0);
    }
  } catch(DownloadFailureException& err) {
    // Failure of the whole download: record the error (when a Request
    // is attached) and ask the request group to halt.
    A2_LOG_ERROR_EX(EX_EXCEPTION_CAUGHT, err);
    if(req_) {
      fileEntry_->addURIResult(req_->getUri(), err.getCode());
      requestGroup_->setLastErrorCode(err.getCode());
    }
    requestGroup_->setHaltRequested(true);
    return true;
  }
}

// Asks the request group to create one follow-up command and queues
// it on the engine.  Nothing is created for a single-file download
// whose length is still 0 and which has no remaining URIs.
void AbstractCommand::tryReserved() {
  if(getDownloadContext()->getFileEntries().size() == 1) {
    const SharedHandle<FileEntry>& entry =
      getDownloadContext()->getFirstFileEntry();
    // Don't create new command if currently file length is unknown
    // and there are no URI left. Because file length is unknown, we
    // can assume that there are no in-flight request object.
    if(entry->getLength() == 0 && entry->getRemainingUris().empty()) {
      A2_LOG_DEBUG(fmt("CUID#%lld - Not trying next request."
                       " No reserved/pooled request is remaining and"
                       " total length is still unknown.",
                       getCuid()));
      return;
    }
  }
  A2_LOG_DEBUG(fmt("CUID#%lld - Trying reserved/pooled request.",
                   getCuid()));
  std::vector<Command*> commands;
  requestGroup_->createNextCommand(commands, e_, 1);
  e_->setNoWait(true);
  e_->addCommand(commands);
}

// Gives up the current attempt and schedules a fresh
// CreateRequestCommand with the same CUID.
//
// Segments held by this CUID are cancelled, and the current Request
// (if any) is returned to the file entry's pool.  With wait == 0 the
// new command is queued immediately; otherwise it is wrapped in a
// SleepCommand that is constructed with wait as its delay argument.
//
// Always returns true.
bool AbstractCommand::prepareForRetry(time_t wait) {
  if(getPieceStorage()) {
    getSegmentMan()->cancelSegment(getCuid());
  }
  if(req_) {
    fileEntry_->poolRequest(req_);
    A2_LOG_DEBUG(fmt("CUID#%lld - Pooling request URI=%s",
                     getCuid(), req_->getUri().c_str()));
    if(getSegmentMan()) {
      getSegmentMan()->recognizeSegmentFor(fileEntry_);
    }
  }

  Command* command = new CreateRequestCommand(getCuid(), requestGroup_, e_);
  if(wait == 0) {
    e_->setNoWait(true);
    e_->addCommand(command);
  } else {
    SleepCommand* scom = new SleepCommand(getCuid(), e_, requestGroup_,
                                          command, wait);
    e_->addCommand(scom);
  }
  return true;
}

// Cleanup performed when the current Request is abandoned.
//
// Removes the Request (and URIs identical to it) from the file entry
// and cancels the segments held by this CUID.  In addition, for a
// single-file, non-P2P download with PREF_ALWAYS_RESUME off and no
// bytes downloaded in this session, it may decide to restart the
// download from scratch: once the resume-failure limit is reached or
// the file entry has no request URI left, the URIs that failed with
// CANNOT_RESUME are put back and all piece progress is reset.
void AbstractCommand::onAbort() {
  if(req_) {
    fileEntry_->removeIdenticalURI(req_->getUri());
    fileEntry_->removeRequest(req_);
  }
  A2_LOG_DEBUG(fmt("CUID#%lld - Aborting download",
                   getCuid()));
  if(getPieceStorage()) {
    getSegmentMan()->cancelSegment(getCuid());
    // Don't do following process if BitTorrent is involved or files
    // in DownloadContext is more than 1. The latter condition is
    // limitation of current implementation.
    if(!getOption()->getAsBool(PREF_ALWAYS_RESUME) &&
       fileEntry_ &&
       getSegmentMan()->calculateSessionDownloadLength() == 0 &&
       !requestGroup_->p2pInvolved() &&
       getDownloadContext()->getFileEntries().size() == 1) {
      const int maxTries =
        getOption()->getAsInt(PREF_MAX_RESUME_FAILURE_TRIES);
      if((maxTries > 0 && requestGroup_->getResumeFailureCount() >= maxTries)||
         fileEntry_->emptyRequestUri()) {
        // Local file exists, but given servers(or at least contacted
        // ones) doesn't support resume. Let's restart download from
        // scratch.
        A2_LOG_NOTICE(fmt("CUID#%lld - Failed to resume download."
                          " Download from scratch.",
                          getCuid()));
        A2_LOG_DEBUG(fmt("CUID#%lld - Gathering URIs that has CANNOT_RESUME"
                         " error",
                         getCuid()));
        // Set PREF_ALWAYS_RESUME to A2_V_TRUE to avoid repeating this
        // process.
        getOption()->put(PREF_ALWAYS_RESUME, A2_V_TRUE);
        std::deque<URIResult> res;
        fileEntry_->extractURIResult(res, error_code::CANNOT_RESUME);
        if(!res.empty()) {
          getSegmentMan()->cancelAllSegments();
          getSegmentMan()->eraseSegmentWrittenLengthMemo();
          getPieceStorage()->markPiecesDone(0);
          std::vector<std::string> uris;
          uris.reserve(res.size());
          std::transform(res.begin(), res.end(), std::back_inserter(uris),
                         std::mem_fun_ref(&URIResult::getURI));
          A2_LOG_DEBUG(fmt("CUID#%lld - %lu URIs found.",
                           getCuid(),
                           static_cast<unsigned long>(uris.size())));
          fileEntry_->addUris(uris.begin(), uris.end());
          getSegmentMan()->recognizeSegmentFor(fileEntry_);
        }
      }
    }
  }
}

// Stops watching the current read-check socket, if any, and drops the
// reference to it.
void AbstractCommand::disableReadCheckSocket() {
  if(checkSocketIsReadable_) {
    e_->deleteSocketForReadCheck(readCheckTarget_, this);
    checkSocketIsReadable_ = false;
    readCheckTarget_.reset();
  }
}

// Makes socket the one watched for readability.
//
// A null or closed socket disables read checking.  If a different
// socket is already watched it is replaced; if the same socket is
// already watched nothing changes.
void AbstractCommand::setReadCheckSocket(const SocketHandle& socket) {
  // Treat a null handle like a closed socket instead of dereferencing it.
  if(!socket || !socket->isOpen()) {
    disableReadCheckSocket();
  } else {
    if(checkSocketIsReadable_) {
      if(*readCheckTarget_ != *socket) {
        e_->deleteSocketForReadCheck(readCheckTarget_, this);
        e_->addSocketForReadCheck(socket, this);
        readCheckTarget_ = socket;
      }
    } else {
      e_->addSocketForReadCheck(socket, this);
      checkSocketIsReadable_ = true;
      readCheckTarget_ = socket;
    }
  }
}

// Enables read checking on socket when pred is true, otherwise
// disables read checking altogether.
void AbstractCommand::setReadCheckSocketIf
(const SharedHandle<SocketCore>& socket, bool pred)
{
  if(pred) {
    setReadCheckSocket(socket);
  } else {
    disableReadCheckSocket();
  }
}

// Stops watching the current write-check socket, if any, and drops
// the reference to it.
void AbstractCommand::disableWriteCheckSocket() {
  if(checkSocketIsWritable_) {
    e_->deleteSocketForWriteCheck(writeCheckTarget_, this);
    checkSocketIsWritable_ = false;
    writeCheckTarget_.reset();
  }
}

// Makes socket the one watched for writability.
//
// A null or closed socket disables write checking.  If a different
// socket is already watched it is replaced; if the same socket is
// already watched nothing changes.
void AbstractCommand::setWriteCheckSocket(const SocketHandle& socket) {
  // Treat a null handle like a closed socket instead of dereferencing it.
  if(!socket || !socket->isOpen()) {
    disableWriteCheckSocket();
  } else {
    if(checkSocketIsWritable_) {
      if(*writeCheckTarget_ != *socket) {
        e_->deleteSocketForWriteCheck(writeCheckTarget_, this);
        e_->addSocketForWriteCheck(socket, this);
        writeCheckTarget_ = socket;
      }
    } else {
      e_->addSocketForWriteCheck(socket, this);
      checkSocketIsWritable_ = true;
      writeCheckTarget_ = socket;
    }
  }
}

// Enables write checking on socket when pred is true, otherwise
// disables write checking altogether.
void AbstractCommand::setWriteCheckSocketIf
(const SharedHandle<SocketCore>& socket, bool pred)
{
  if(pred) {
    setWriteCheckSocket(socket);
  } else {
    disableWriteCheckSocket();
  }
}

namespace {
// Returns proxy option value for the given protocol.
const std::string& getProxyOptionFor
(const std::string& proxyPref, const SharedHandle