/* */
#include "SegmentMan.h"
#include "DlAbortEx.h"
#include "Util.h"
#include "File.h"
#include "message.h"
#include "prefs.h"
#include "LogFactory.h"
#include "BitfieldManFactory.h"
#ifdef ENABLE_MESSAGE_DIGEST
#include "ChunkChecksumValidator.h"
#endif // ENABLE_MESSAGE_DIGEST
// Standard headers for assert, errno, FILE/fopen/fread/fwrite and strerror,
// all of which are used below.
#include <assert.h>
#include <errno.h>
#include <stdio.h>
#include <string.h>

/**
 * Creates a manager with no bitfield, zero total size, splitting enabled,
 * download not started and "." as the directory.
 *
 * NOTE(review): `option` is read by several methods below but is not
 * initialized here; presumably the owner assigns it before use -- confirm.
 */
SegmentMan::SegmentMan():logger(LogFactory::getInstance()),
			 bitfield(0),
			 totalSize(0),
			 isSplittable(true),
			 downloadStarted(false),
			 dir("."),
			 errors(0),
			 diskWriter(0)
#ifdef ENABLE_MESSAGE_DIGEST
			,
			 chunkHashLength(0),
			 digestAlgo(DIGEST_ALGO_SHA1)
#endif // ENABLE_MESSAGE_DIGEST
{}

/** Releases the bitfield owned by this object. */
SegmentMan::~SegmentMan() {
  delete bitfield;
}

/**
 * Tells whether the segment (control) file exists as a regular file.
 * Always returns false when the download is not splittable. The outcome is
 * logged at info level.
 */
bool SegmentMan::segmentFileExists() const {
  if(!isSplittable) {
    return false;
  }
  string segFilename = getSegmentFilePath();
  File f(segFilename);
  if(f.isFile()) {
    logger->info(MSG_SEGMENT_FILE_EXISTS, segFilename.c_str());
    return true;
  } else {
    logger->info(MSG_SEGMENT_FILE_DOES_NOT_EXIST, segFilename.c_str());
    return false;
  }
}

/**
 * Loads the saved state from the segment file. Does nothing when the
 * download is not splittable.
 *
 * Throws DlAbortEx* if the file cannot be opened or its content cannot be
 * read.
 */
void SegmentMan::load() {
  if(!isSplittable) {
    return;
  }
  string segFilename = getSegmentFilePath();
  logger->info(MSG_LOADING_SEGMENT_FILE, segFilename.c_str());
  FILE* segFile = openSegFile(segFilename, "r+");
  try {
    read(segFile);
    fclose(segFile);
  } catch(const string&) {
    // Remember errno before fclose() gets a chance to overwrite it.
    const int savedErrno = errno;
    fclose(segFile);
    throw new DlAbortEx(EX_SEGMENT_FILE_READ, segFilename.c_str(),
			strerror(savedErrno));
  }
  logger->info(MSG_LOADED_SEGMENT_FILE);
}

/**
 * Writes the current state to the segment file. Does nothing when the
 * download is not splittable, the total size is 0 or there is no bitfield.
 *
 * Layout written, in order: totalSize, segment length, bitfield length,
 * bitfield bytes, number of in-use segments, then each in-use Segment as a
 * raw in-memory image (sizeof(Segment) bytes).
 *
 * Throws DlAbortEx* if the file cannot be opened, a write fails, or the
 * final fclose() (which flushes buffered data) fails.
 */
void SegmentMan::save() const {
  if(!isSplittable || totalSize == 0 || !bitfield) {
    return;
  }
  string segFilename = getSegmentFilePath();
  logger->info(MSG_SAVING_SEGMENT_FILE, segFilename.c_str());
  FILE* segFile = openSegFile(segFilename, "w");
  try {
    if(fwrite(&totalSize, sizeof(totalSize), 1, segFile) < 1) {
      throw string("writeError");
    }
    int segmentLength = bitfield->getBlockLength();
    if(fwrite(&segmentLength, sizeof(segmentLength), 1, segFile) < 1) {
      throw string("writeError");
    }
    // bitfield is known to be non-null here because of the guard above.
    int bitfieldLength = bitfield->getBitfieldLength();
    if(fwrite(&bitfieldLength, sizeof(bitfieldLength), 1, segFile) < 1) {
      throw string("writeError");
    }
    if(fwrite(bitfield->getBitfield(), bitfield->getBitfieldLength(),
	      1, segFile) < 1) {
      throw string("writeError");
    }
    int usedSegmentCount = usedSegmentEntries.size();
    if(fwrite(&usedSegmentCount, sizeof(usedSegmentCount), 1, segFile) < 1) {
      throw string("writeError");
    }
    for(SegmentEntries::const_iterator itr = usedSegmentEntries.begin();
	itr != usedSegmentEntries.end(); ++itr) {
      if(fwrite((*itr)->segment.get(), sizeof(Segment), 1, segFile) < 1) {
	throw string("writeError");
      }
    }
    // Buffered data is flushed by fclose(); a failure there means the file
    // is incomplete. Clear segFile first so the handler never closes twice.
    FILE* closing = segFile;
    segFile = NULL;
    if(fclose(closing) == EOF) {
      throw string("writeError");
    }
    logger->info(MSG_SAVED_SEGMENT_FILE);
  } catch(const string&) {
    // Remember errno before fclose() gets a chance to overwrite it.
    const int savedErrno = errno;
    if(segFile != NULL) {
      fclose(segFile);
    }
    throw new DlAbortEx(EX_SEGMENT_FILE_WRITE, segFilename.c_str(),
			strerror(savedErrno));
  }
}

/**
 * Opens segFilename with the given fopen() mode.
 *
 * Returns the open stream; the caller is responsible for closing it.
 * Throws DlAbortEx* if fopen() fails.
 */
FILE* SegmentMan::openSegFile(const string& segFilename,
			      const string& mode) const {
  FILE* segFile = fopen(segFilename.c_str(), mode.c_str());
  if(segFile == NULL) {
    throw new DlAbortEx(EX_SEGMENT_FILE_OPEN,
			segFilename.c_str(), strerror(errno));
  }
  return segFile;
}

/**
 * Reads the state written by save() from an already opened stream:
 * totalSize, segment size, bitfield length, bitfield bytes (only when the
 * stored length is positive), segment count and the in-use segments.
 * Loaded segments are appended to usedSegmentEntries with cuid 0.
 *
 * Throws string("readError") on a short read or on values that save()
 * can never have produced.
 */
void SegmentMan::read(FILE* file) {
  assert(file != NULL);
  if(fread(&totalSize, sizeof(totalSize), 1, file) < 1) {
    throw string("readError");
  }
  int segmentSize;
  if(fread(&segmentSize, sizeof(segmentSize), 1, file) < 1) {
    throw string("readError");
  }
  int bitfieldLength;
  if(fread(&bitfieldLength, sizeof(bitfieldLength), 1, file) < 1) {
    throw string("readError");
  }
  if(bitfieldLength > 0) {
    // A stored bitfield with a non-positive segment size or total size can
    // only come from a damaged file; reject it before building a bitfield.
    if(segmentSize <= 0 || totalSize <= 0) {
      throw string("readError");
    }
    initBitfield(segmentSize, totalSize);
    // The number of bytes read is the length the freshly created bitfield
    // reports, not the length stored in the file.
    const int length = bitfield->getBitfieldLength();
    unsigned char* savedBitfield = new unsigned char[length];
    if(fread(savedBitfield, length, 1, file) < 1) {
      delete [] savedBitfield;
      throw string("readError");
    }
    bitfield->setBitfield(savedBitfield, length);
    delete [] savedBitfield;
  }
  int segmentCount;
  if(fread(&segmentCount, sizeof(segmentCount), 1, file) < 1) {
    throw string("readError");
  }
  if(segmentCount < 0) {
    throw string("readError");
  }
  for(int i = 0; i < segmentCount; ++i) {
    // Allocate the target explicitly so fread() always has storage to fill.
    SegmentHandle seg = new Segment();
    if(fread(seg.get(), sizeof(Segment), 1, file) < 1) {
      throw string("readError");
    }
    usedSegmentEntries.push_back(SegmentEntryHandle(new SegmentEntry(0, seg)));
  }
}

/**
 * Deletes the segment file if it exists. Does nothing when the download is
 * not splittable.
 */
void SegmentMan::remove() const {
  if(!isSplittable) {
    return;
  }
  if(segmentFileExists()) {
    File f(getSegmentFilePath());
    f.remove();
  }
}

/**
 * Returns true only when the download has started, a bitfield exists and
 * every bit in it is set.
 */
bool SegmentMan::finished() const {
  if(!downloadStarted) {
    return false;
  }
  if(!bitfield) {
    return false;
  }
  return bitfield->isAllBitSet();
}

/** Deletes the segment file when finished() reports true. */
void SegmentMan::removeIfFinished() const {
  if(finished()) {
    remove();
  }
}

/**
 * Resets this object: sizes and counters to 0, flags to false, in-use
 * segments and peer statistics cleared, bitfield destroyed, and the disk
 * writer's file closed.
 *
 * NOTE(review): the constructor sets diskWriter to 0 and this method
 * dereferences it unconditionally -- presumably a writer is always assigned
 * before init() is called; confirm.
 */
void SegmentMan::init() {
  totalSize = 0;
  isSplittable = false;
  downloadStarted = false;
  errors = 0;
  //segments.clear();
  usedSegmentEntries.clear();
  delete bitfield;
  bitfield = 0;
  peerStats.clear();
  diskWriter->closeFile();
}

/**
 * Replaces the current bitfield with a new one created by the
 * BitfieldManFactory for the given segment length and total length.
 */
void SegmentMan::initBitfield(int32_t segmentLength, int64_t totalLength) {
  delete bitfield;
  this->bitfield = BitfieldManFactory::getFactoryInstance()->
    createBitfieldMan(segmentLength, totalLength);
}

/**
 * Marks segment `index` as in use and binds it to `cuid`. An existing entry
 * for that index is reused (its cuid is overwritten); otherwise a new
 * segment and entry are created and appended to usedSegmentEntries.
 *
 * Returns the segment now attached to cuid.
 */
SegmentHandle SegmentMan::checkoutSegment(int32_t cuid, int32_t index) {
  logger->debug("Attach segment#%d to CUID#%d.", index, cuid);
  bitfield->setUseBit(index);
  SegmentEntryHandle segmentEntry = getSegmentEntryByIndex(index);
  SegmentHandle segment(0);
  if(segmentEntry.isNull()) {
    segment = new Segment(index, bitfield->getBlockLength(index),
			  bitfield->getBlockLength());
    SegmentEntryHandle entry = new SegmentEntry(cuid, segment);
    usedSegmentEntries.push_back(entry);
  } else {
    segmentEntry->cuid = cuid;
    segment = segmentEntry->segment;
  }
  logger->debug("index=%d, length=%d, segmentLength=%d, writtenLength=%d",
		segment->index, segment->length, segment->segmentLength,
		segment->writtenLength);
  return segment;
}

/**
 * Segment lookup used while no bitfield exists. When no segment is in use,
 * creates a Segment(0, 0, 0) bound to cuid and returns it; otherwise
 * returns the segment already bound to cuid, or a null handle if there is
 * none.
 */
SegmentHandle SegmentMan::onNullBitfield(int32_t cuid) {
  if(usedSegmentEntries.size() == 0) {
    SegmentHandle segment = new Segment(0, 0, 0);
    usedSegmentEntries.push_back(SegmentEntryHandle(new SegmentEntry(cuid, segment)));
    return segment;
  } else {
    SegmentEntryHandle segmentEntry = getSegmentEntryByCuid(cuid);
    if(segmentEntry.isNull()) {
      return 0;
    } else {
      return segmentEntry->segment;
    }
  }
}

/**
 * Looks for the in-use segment whose owner is downloading slowest, provided
 * it is slower than 80% of peerStat's average download speed.
 *
 * Owners are skipped when their cuid is 0, they have no peer stat, they are
 * peerStat's own cuid, they are not ACTIVE, or PREF_STARTUP_IDLE_TIME has
 * not yet elapsed since their download start time.
 *
 * Returns the matching entry, or a null handle if none qualifies.
 */
SegmentEntryHandle SegmentMan::findSlowerSegmentEntry(const PeerStatHandle& peerStat) const {
  int speed = (int)(peerStat->getAvgDownloadSpeed()*0.8);
  SegmentEntryHandle slowSegmentEntry(0);
  for(SegmentEntries::const_iterator itr = usedSegmentEntries.begin();
      itr != usedSegmentEntries.end(); ++itr) {
    const SegmentEntryHandle& segmentEntry = *itr;
    if(segmentEntry->cuid == 0) {
      continue;
    }
    PeerStatHandle p = getPeerStat(segmentEntry->cuid);
    if(!p.get() || p->getCuid() == peerStat->getCuid() ||
       p->getStatus() != PeerStat::ACTIVE ||
       !p->getDownloadStartTime().elapsed(option->getAsInt(PREF_STARTUP_IDLE_TIME))) {
      continue;
    }
    int pSpeed = p->calculateDownloadSpeed();
    if(pSpeed < speed) {
      // Lower the threshold so that the slowest candidate wins.
      speed = pSpeed;
      slowSegmentEntry = segmentEntry;
    }
  }
  return slowSegmentEntry;
}

/**
 * Returns a segment for cuid to work on.
 *
 * Order of preference: the segment already bound to cuid; a missing, unused
 * segment chosen by the bitfield; a segment taken over from a slower owner
 * (that owner is asked to go idle and its segment is cancelled first).
 * Returns a null handle when none of these is available.
 */
SegmentHandle SegmentMan::getSegment(int32_t cuid) {
  if(!bitfield) {
    return onNullBitfield(cuid);
  }
  SegmentEntryHandle segmentEntry = getSegmentEntryByCuid(cuid);
  if(!segmentEntry.isNull()) {
    return segmentEntry->segment;
  }
  int index = bitfield->getSparseMissingUnusedIndex();
  if(index == -1) {
    PeerStatHandle myPeerStat = getPeerStat(cuid);
    if(!myPeerStat.get()) {
      return 0;
    }
    SegmentEntryHandle slowSegmentEntry = findSlowerSegmentEntry(myPeerStat);
    if(slowSegmentEntry.get()) {
      logger->info(MSG_SEGMENT_FORWARDING,
		   slowSegmentEntry->cuid,
		   slowSegmentEntry->segment->index,
		   cuid);
      PeerStatHandle slowPeerStat = getPeerStat(slowSegmentEntry->cuid);
      slowPeerStat->requestIdle();
      cancelSegment(slowSegmentEntry->cuid);
      return checkoutSegment(cuid, slowSegmentEntry->segment->index);
    } else {
      return 0;
    }
  } else {
    return checkoutSegment(cuid, index);
  }
}

/**
 * Returns the segment at `index` for cuid, or a null handle when the index
 * is out of range, or the segment is already completed or in use.
 * Without a bitfield this defers to onNullBitfield().
 */
SegmentHandle SegmentMan::getSegment(int32_t cuid, int32_t index) {
  if(!bitfield) {
    return onNullBitfield(cuid);
  }
  if(index < 0 || (int32_t)bitfield->countBlock() <= index) {
    return 0;
  }
  if(bitfield->isBitSet(index) || bitfield->isUseBitSet(index)) {
    return 0;
  } else {
    return checkoutSegment(cuid, index);
  }
}
/*
bool SegmentMan::updateSegment(int cuid, const Segment& segment) {
  if(segment.isNull()) {
    return false;
  }
  SegmentEntryHandle segmentEntry = getSegmentEntryByCuid(cuid);
  if(segmentEntry.isNull()) {
    return false;
  } else {
    segmentEntry->segment = segment;
    return true;
  }
}
*/

/**
 * Detaches cuid from its segment: the segment's use bit is cleared and the
 * entry's cuid is set to 0; the entry itself stays in usedSegmentEntries.
 * Without a bitfield, all in-use entries are dropped instead.
 */
void SegmentMan::cancelSegment(int32_t cuid) {
  if(bitfield) {
    for(SegmentEntries::iterator itr = usedSegmentEntries.begin();
	itr != usedSegmentEntries.end(); ++itr) {
      if((*itr)->cuid == cuid) {
	bitfield->unsetUseBit((*itr)->segment->index);
	(*itr)->cuid = 0;
	break;
      }
    }
  } else {
    usedSegmentEntries.clear();
  }
}

/**
 * Records `segment` as completed and removes cuid's entry from
 * usedSegmentEntries.
 *
 * Without a bitfield, one is created from PREF_SEGMENT_SIZE and the
 * segment's written length, and all of its bits are set.
 *
 * Returns false when the segment is null or cuid has no entry; true
 * otherwise. The bitfield is updated even when no entry is found.
 */
bool SegmentMan::completeSegment(int32_t cuid, const SegmentHandle& segment) {
  if(segment->isNull()) {
    return false;
  }
  if(bitfield) {
    bitfield->unsetUseBit(segment->index);
    bitfield->setBit(segment->index);
  } else {
    initBitfield(option->getAsInt(PREF_SEGMENT_SIZE), segment->writtenLength);
    bitfield->setAllBit();
  }
  SegmentEntries::iterator itr = getSegmentEntryIteratorByCuid(cuid);
  if(itr == usedSegmentEntries.end()) {
    return false;
  } else {
    usedSegmentEntries.erase(itr);
    return true;
  }
}

/**
 * Returns true when a bitfield exists and the bit for `index` is set.
 */
bool SegmentMan::hasSegment(int32_t index) const {
  if(bitfield) {
    return bitfield->isBitSet(index);
  } else {
    return false;
  }
}

/**
 * Returns the bitfield's completed length (0 without a bitfield) plus the
 * written length of every in-use segment.
 */
int64_t SegmentMan::getDownloadLength() const {
  int64_t dlLength = 0;
  if(bitfield) {
    dlLength += bitfield->getCompletedLength();
  }
  for(SegmentEntries::const_iterator itr = usedSegmentEntries.begin();
      itr != usedSegmentEntries.end(); ++itr) {
    dlLength += (*itr)->segment->writtenLength;
  }
  return dlLength;
}

/**
 * Adds peerStat to peerStats unless a stat with the same cuid is already
 * registered.
 */
void SegmentMan::registerPeerStat(const PeerStatHandle& peerStat) {
  PeerStatHandle temp = getPeerStat(peerStat->getCuid());
  if(!temp.get()) {
    peerStats.push_back(peerStat);
  }
}

/**
 * Returns the sum of calculateDownloadSpeed() over all registered peer
 * stats whose status is ACTIVE.
 */
int32_t SegmentMan::calculateDownloadSpeed() const {
  int speed = 0;
  for(PeerStats::const_iterator itr = peerStats.begin();
      itr != peerStats.end(); ++itr) {
    const PeerStatHandle& peerStat = *itr;
    if(peerStat->getStatus() == PeerStat::ACTIVE) {
      speed += peerStat->calculateDownloadSpeed();
    }
  }
  return speed;
}

/** Returns true when the path given by getFilePath() exists. */
bool SegmentMan::fileExists() const {
  return File(getFilePath()).exists();
}

/**
 * Returns true when the target file exists, there is no segment file for
 * it, and PREF_ALLOW_OVERWRITE is not V_TRUE.
 */
bool SegmentMan::shouldCancelDownloadForSafety() const {
  return fileExists() && !segmentFileExists() &&
    option->get(PREF_ALLOW_OVERWRITE) != V_TRUE;
}

/** Sets every bit of the bitfield; no-op without a bitfield. */
void SegmentMan::markAllPiecesDone() {
  if(bitfield) {
    bitfield->setAllBit();
  }
}

/**
 * Marks the first `length` bytes as downloaded; no-op without a bitfield.
 *
 * When length equals the bitfield's total length, all bits are set.
 * Otherwise all bits are cleared, every whole segment within `length` is
 * set, and a trailing partial segment (if any) is appended to
 * usedSegmentEntries with cuid 0 and writtenLength set to the remainder.
 */
void SegmentMan::markPieceDone(int64_t length) {
  if(bitfield) {
    if(length == bitfield->getTotalLength()) {
      bitfield->setAllBit();
    } else {
      bitfield->clearAllBit();
      int32_t numSegment = length/bitfield->getBlockLength();
      int32_t remainingLength = length%bitfield->getBlockLength();
      // With fewer bytes than one segment there is no whole segment to
      // mark; avoid asking for the range [0, -1].
      if(numSegment > 0) {
	bitfield->setBitRange(0, numSegment-1);
      }
      if(remainingLength > 0) {
	SegmentHandle segment = new Segment();
	segment->index = numSegment;
	segment->length = bitfield->getBlockLength(numSegment);
	segment->segmentLength = bitfield->getBlockLength();
	segment->writtenLength = remainingLength;
	usedSegmentEntries.push_back(new SegmentEntry(0, segment));
      }
    }
  }
}

#ifdef ENABLE_MESSAGE_DIGEST
/**
 * Validates the file through a ChunkChecksumValidator configured with this
 * object's digest algorithm, disk writer, a new file allocation monitor,
 * the bitfield, the piece hashes and the chunk hash length.
 */
void SegmentMan::checkIntegrity()
{
  logger->notice(MSG_VALIDATING_FILE,
		 getFilePath().c_str());
  ChunkChecksumValidator v;
  v.setDigestAlgo(digestAlgo);
  v.setDiskWriter(diskWriter);
  v.setFileAllocationMonitor(FileAllocationMonitorFactory::getFactory()->createNewMonitor());
  v.validate(bitfield, pieceHashes, chunkHashLength);
}
#endif // ENABLE_MESSAGE_DIGEST

#ifdef ENABLE_MESSAGE_DIGEST
/**
 * Returns true when a bitfield exists, totalSize is positive and there are
 * enough piece hashes (pieceHashes.size() * chunkHashLength) to cover
 * totalSize.
 */
bool SegmentMan::isChunkChecksumValidationReady() const {
  return bitfield &&
    totalSize > 0 &&
    ((int64_t)pieceHashes.size())*chunkHashLength >= totalSize;
}
#endif // ENABLE_MESSAGE_DIGEST

#ifdef ENABLE_MESSAGE_DIGEST
/**
 * Verifies the checksum chunks touched by `segment`'s written data, as far
 * as those chunks are completely downloaded.
 *
 * Chunks at either end of the range that are not fully set in the bitfield
 * are dropped from the range. If the remaining chunks' segments are all
 * set, each chunk is digested and compared with its expected hash; on the
 * first mismatch the bits of all covered segments are unset and checking
 * stops. Does nothing when isChunkChecksumValidationReady() is false.
 */
void SegmentMan::tryChunkChecksumValidation(const SegmentHandle& segment)
{
  if(!isChunkChecksumValidationReady()) {
    return;
  }
  int32_t hashStartIndex;
  int32_t hashEndIndex;
  Util::indexRange(hashStartIndex, hashEndIndex,
		   segment->getPosition(),
		   segment->writtenLength,
		   chunkHashLength);
  if(!bitfield->isBitSetOffsetRange((int64_t)hashStartIndex*chunkHashLength,
				    chunkHashLength)) {
    ++hashStartIndex;
  }
  if(!bitfield->isBitSetOffsetRange((int64_t)hashEndIndex*chunkHashLength,
				    chunkHashLength)) {
    --hashEndIndex;
  }
  logger->debug("hashStartIndex=%d, hashEndIndex=%d",
		hashStartIndex, hashEndIndex);
  if(hashStartIndex > hashEndIndex) {
    logger->debug(MSG_NO_CHUNK_CHECKSUM);
    return;
  }
  int64_t hashOffset = ((int64_t)hashStartIndex)*chunkHashLength;
  int32_t startIndex;
  int32_t endIndex;
  Util::indexRange(startIndex, endIndex,
		   hashOffset,
		   (hashEndIndex-hashStartIndex+1)*chunkHashLength,
		   bitfield->getBlockLength());
  logger->debug("startIndex=%d, endIndex=%d", startIndex, endIndex);
  if(bitfield->isBitRangeSet(startIndex, endIndex)) {
    for(int32_t index = hashStartIndex; index <= hashEndIndex; ++index) {
      int64_t offset = ((int64_t)index)*chunkHashLength;
      // The last chunk of the file may be shorter than chunkHashLength.
      int32_t dataLength =
	offset+chunkHashLength <= totalSize ? chunkHashLength : totalSize-offset;
      string actualChecksum = diskWriter->messageDigest(offset, dataLength, digestAlgo);
      const string& expectedChecksum = pieceHashes[index];
      if(expectedChecksum == actualChecksum) {
	logger->info(MSG_GOOD_CHUNK_CHECKSUM);
      } else {
	logger->info(EX_INVALID_CHUNK_CHECKSUM,
		     index, offset,
		     expectedChecksum.c_str(), actualChecksum.c_str());
	logger->debug("Unset bit from %d to %d(inclusive)", startIndex, endIndex);
	bitfield->unsetBitRange(startIndex, endIndex);
	break;
      }
    }
  }
}
#endif // ENABLE_MESSAGE_DIGEST