/* */
#include "DownloadCommand.h"
#include "Util.h"
#include "DlRetryEx.h"
#include "DlAbortEx.h"
#include "HttpInitiateConnectionCommand.h"
#include "InitiateConnectionCommandFactory.h"
#include "message.h"
#include "prefs.h"
// NOTE(review): the target of this last #include was missing in the file as
// received. <cassert> is restored because the destructor below calls assert();
// confirm against the original source.
#include <cassert>

/**
 * Creates a download command and attaches it to a PeerStat.
 *
 * The PeerStat registered for cuid in the request group's SegmentMan is
 * reused when one exists; otherwise a new PeerStat is created and registered.
 * In both cases downloadStart() is then called on it.
 */
DownloadCommand::DownloadCommand(int cuid,
                                 const RequestHandle req,
                                 RequestGroup* requestGroup,
                                 DownloadEngine* e,
                                 const SocketHandle& s):
  AbstractCommand(cuid, req, requestGroup, e, s),
  peerStat(0),
  transferDecoder(0)
{
  peerStat = _requestGroup->getSegmentMan()->getPeerStat(cuid);
  if(!peerStat.get()) {
    peerStat = new PeerStat(cuid);
    _requestGroup->getSegmentMan()->registerPeerStat(peerStat);
  }
  peerStat->downloadStart();
}

/**
 * Calls downloadStop() on the PeerStat obtained in the constructor.
 */
DownloadCommand::~DownloadCommand() {
  assert(peerStat.get());
  peerStat->downloadStop();
}

/**
 * Performs one read/write step of the download.
 *
 * Steps, in order:
 *  1. If maxDownloadSpeedLimit is positive and lower than the speed reported
 *     by the SegmentMan, call usleep(1), re-queue this command and return
 *     false without reading.
 *  2. Read from the socket into a 16 KiB buffer and write the data to the
 *     disk writer at the segment's current write position, passing it through
 *     transferDecoder first when one is set. The segment's writtenLength and
 *     the PeerStat download length are advanced by the amount written.
 *  3. Once startupIdleTime has elapsed since the download start, throw
 *     DlAbortEx if lowestDownloadSpeedLimit is positive and the current speed
 *     is at or below it.
 *  4. Throw DlRetryEx(EX_GOT_EOF) if the read length is 0 while the
 *     SegmentMan's totalSize is non-zero.
 *  5. If the decoder has finished, or (without a decoder) the segment is
 *     complete, or the read length is 0: complete the segment and return the
 *     result of prepareForNextSegment(). Otherwise re-queue this command and
 *     return false.
 *
 * Exceptions are thrown as heap-allocated pointers (throw new ...).
 */
bool DownloadCommand::executeInternal() {
  // TODO we need to specify the sum of all segmentMan's download speed here.
  if(maxDownloadSpeedLimit > 0 &&
     maxDownloadSpeedLimit <
     _requestGroup->getSegmentMan()->calculateDownloadSpeed()) {
    usleep(1);
    e->commands.push_back(this);
    return false;
  }

  // The capacity is a compile-time constant so that the buffers are ordinary
  // fixed-size arrays rather than variable-length arrays, which are a compiler
  // extension and not part of ISO C++.
  const int32_t BUFFER_CAPACITY = 16*1024;

  char buf[BUFFER_CAPACITY];
  // The code below uses bufSize as the number of bytes obtained by readData()
  // (it is compared against 0 to detect EOF), so readData() is presumably
  // updating it through a reference parameter; its declaration is elsewhere.
  int32_t bufSize = BUFFER_CAPACITY;
  socket->readData(buf, bufSize);

  if(transferDecoder.isNull()) {
    _requestGroup->getSegmentMan()->diskWriter->writeData(buf, bufSize,
                                                          segment->getPositionToWrite());
    segment->writtenLength += bufSize;
    peerStat->updateDownloadLength(bufSize);
  } else {
    char infbuf[BUFFER_CAPACITY];
    // Used after inflate() as the decoded byte count, in the same way as
    // bufSize above.
    int32_t infbufSize = BUFFER_CAPACITY;
    transferDecoder->inflate(infbuf, infbufSize, buf, bufSize);
    _requestGroup->getSegmentMan()->diskWriter->writeData(infbuf, infbufSize,
                                                          segment->getPositionToWrite());
    segment->writtenLength += infbufSize;
    peerStat->updateDownloadLength(infbufSize);
  }

  // calculate downloading speed
  if(peerStat->getDownloadStartTime().elapsed(startupIdleTime)) {
    int32_t nowSpeed = peerStat->calculateDownloadSpeed();
    if(lowestDownloadSpeedLimit > 0 && nowSpeed <= lowestDownloadSpeedLimit) {
      throw new DlAbortEx("CUID#%d - Too slow Downloading speed: %d <= %d(B/s)",
                          cuid,
                          nowSpeed,
                          lowestDownloadSpeedLimit);
    }
  }

  if(_requestGroup->getSegmentMan()->totalSize != 0 && bufSize == 0) {
    throw new DlRetryEx(EX_GOT_EOF);
  }

  // Explicit grouping; the evaluation is the same as the unparenthesised
  // form, since && binds tighter than ||.
  if((!transferDecoder.isNull() && transferDecoder->finished())
     || (transferDecoder.isNull() && segment->complete())
     || bufSize == 0) {
    if(!transferDecoder.isNull()) {
      transferDecoder->end();
    }
    logger->info(MSG_DOWNLOAD_COMPLETED, cuid);
    _requestGroup->getSegmentMan()->completeSegment(cuid, segment);
#ifdef ENABLE_MESSAGE_DIGEST
    if(e->option->get(PREF_REALTIME_CHUNK_CHECKSUM) == V_TRUE) {
      _requestGroup->getSegmentMan()->tryChunkChecksumValidation(segment);
    }
#endif // ENABLE_MESSAGE_DIGEST
    // this unit is going to download another segment.
    return prepareForNextSegment();
  } else {
    e->commands.push_back(this);
    return false;
  }
}

/**
 * Decides what to do after the current segment has been completed.
 *
 * Returns true when the SegmentMan reports the download as finished.
 * Otherwise it requests the segment whose index follows the current one:
 *  - if none is returned, or the returned segment already has data written,
 *    the result of prepareForRetry(0) is returned;
 *  - otherwise the bytes written beyond the previous segment's length are
 *    credited to the next segment. If that already completes it, the segment
 *    is marked complete and the walk continues with the following index;
 *    if not, it becomes the current segment, this command is re-queued and
 *    false is returned.
 */
bool DownloadCommand::prepareForNextSegment() {
  if(_requestGroup->getSegmentMan()->finished()) {
    return true;
  }
  // Merge segment with next segment, if segment.index+1 == nextSegment.index
  SegmentHandle tempSegment = segment;
  while(1) {
    SegmentHandle nextSegment =
      _requestGroup->getSegmentMan()->getSegment(cuid, tempSegment->index+1);
    if(nextSegment.isNull()) {
      break;
    }
    if(nextSegment->writtenLength > 0) {
      return prepareForRetry(0);
    }
    // Carry the overshoot of the previous segment into the next one.
    nextSegment->writtenLength = tempSegment->writtenLength-tempSegment->length;
    if(nextSegment->complete()) {
      _requestGroup->getSegmentMan()->completeSegment(cuid, nextSegment);
      tempSegment = nextSegment;
    } else {
      segment = nextSegment;
      e->commands.push_back(this);
      return false;
    }
  }
  return prepareForRetry(0);
}