diff --git a/src/BtPieceMessage.cc b/src/BtPieceMessage.cc index 3ddc140a..f2e7dccd 100644 --- a/src/BtPieceMessage.cc +++ b/src/BtPieceMessage.cc @@ -57,6 +57,9 @@ #include "DownloadContext.h" #include "PeerStorage.h" #include "array_fun.h" +#include "WrDiskCache.h" +#include "WrDiskCacheEntry.h" +#include "DownloadFailureException.h" namespace aria2 { @@ -119,8 +122,17 @@ void BtPieceMessage::doReceivedAction() A2_LOG_DEBUG("Already have this block."); return; } - getPieceStorage()->getDiskAdaptor()->writeData - (data_+9, blockLength_, offset); + if(piece->getWrDiskCacheEntry()) { + // Write Disk Cache enabled. Unfortunately, it incurs extra data + // copy. + unsigned char* dataCopy = new unsigned char[blockLength_]; + memcpy(dataCopy, data_+9, blockLength_); + piece->updateWrCache(getPieceStorage()->getWrDiskCache(), + dataCopy, 0, blockLength_, offset); + } else { + getPieceStorage()->getDiskAdaptor()->writeData(data_+9, blockLength_, + offset); + } piece->completeBlock(slot.getBlockIndex()); A2_LOG_DEBUG(fmt(MSG_PIECE_BITFIELD, getCuid(), util::toHex(piece->getBitfield(), @@ -234,16 +246,37 @@ bool BtPieceMessage::checkPieceHash(const SharedHandle& piece) return piece->getDigest() == downloadContext_->getPieceHash(piece->getIndex()); } else { - int64_t offset = static_cast(piece->getIndex()) - *downloadContext_->getPieceLength(); - return message_digest::staticSHA1Digest - (getPieceStorage()->getDiskAdaptor(), offset, piece->getLength()) - == downloadContext_->getPieceHash(piece->getIndex()); + A2_LOG_DEBUG(fmt("Calculating hash index=%lu", + static_cast(piece->getIndex()))); + try { + return piece->getDigestWithWrCache(downloadContext_->getPieceLength(), + getPieceStorage()->getDiskAdaptor()) + == downloadContext_->getPieceHash(piece->getIndex()); + } catch(RecoverableException& e) { + piece->clearAllBlock(); + if(piece->getWrDiskCacheEntry()) { + piece->clearWrCache(getPieceStorage()->getWrDiskCache()); + } + throw; + } } } void BtPieceMessage::onNewPiece(const SharedHandle& piece) { + if(piece->getWrDiskCacheEntry()) { + // We flush cached data whenever an whole piece is retrieved. + piece->flushWrCache(getPieceStorage()->getWrDiskCache()); + if(piece->getWrDiskCacheEntry()->getError() != + WrDiskCacheEntry::CACHE_ERR_SUCCESS) { + piece->clearAllBlock(); + piece->clearWrCache(getPieceStorage()->getWrDiskCache()); + throw DOWNLOAD_FAILURE_EXCEPTION2 + (fmt("Write disk cache flush failure index=%lu", + static_cast(piece->getIndex())), + piece->getWrDiskCacheEntry()->getErrorCode()); + } + } A2_LOG_INFO(fmt(MSG_GOT_NEW_PIECE, getCuid(), static_cast(piece->getIndex()))); @@ -256,29 +289,14 @@ void BtPieceMessage::onWrongPiece(const SharedHandle& piece) A2_LOG_INFO(fmt(MSG_GOT_WRONG_PIECE, getCuid(), static_cast(piece->getIndex()))); - erasePieceOnDisk(piece); + if(piece->getWrDiskCacheEntry()) { + piece->clearWrCache(getPieceStorage()->getWrDiskCache()); + } piece->clearAllBlock(); piece->destroyHashContext(); getBtRequestFactory()->removeTargetPiece(piece); } -void BtPieceMessage::erasePieceOnDisk(const SharedHandle& piece) -{ - size_t BUFSIZE = 4096; - unsigned char buf[BUFSIZE]; - memset(buf, 0, BUFSIZE); - int64_t offset = - static_cast(piece->getIndex())*downloadContext_->getPieceLength(); - div_t res = div(piece->getLength(), BUFSIZE); - for(int i = 0; i < res.quot; ++i) { - getPieceStorage()->getDiskAdaptor()->writeData(buf, BUFSIZE, offset); - offset += BUFSIZE; - } - if(res.rem > 0) { - getPieceStorage()->getDiskAdaptor()->writeData(buf, res.rem, offset); - } -} - void BtPieceMessage::onChokingEvent(const BtChokingEvent& event) { if(!isInvalidate() && diff --git a/src/BtPieceMessage.h b/src/BtPieceMessage.h index 6dde2db4..adcb20ec 100644 --- a/src/BtPieceMessage.h +++ b/src/BtPieceMessage.h @@ -61,8 +61,6 @@ private: void onWrongPiece(const SharedHandle& piece); - void erasePieceOnDisk(const SharedHandle& piece); - void pushPieceData(int64_t offset, int32_t length) const; public: BtPieceMessage(size_t index = 0, int32_t begin = 0, int32_t blockLength = 0); diff --git a/src/DefaultPieceStorage.cc b/src/DefaultPieceStorage.cc index de179bad..63fba262 100644 --- a/src/DefaultPieceStorage.cc +++ b/src/DefaultPieceStorage.cc @@ -65,6 +65,8 @@ #include "bitfield.h" #include "SingletonHolder.h" #include "Notifier.h" +#include "WrDiskCache.h" +#include "RequestGroup.h" #ifdef ENABLE_BITTORRENT # include "bittorrent_helper.h" #endif // ENABLE_BITTORRENT @@ -81,7 +83,8 @@ DefaultPieceStorage::DefaultPieceStorage endGamePieceNum_(END_GAME_PIECE_NUM), option_(option), pieceStatMan_(new PieceStatMan(downloadContext->getNumPieces(), true)), - pieceSelector_(new RarestPieceSelector(pieceStatMan_)) + pieceSelector_(new RarestPieceSelector(pieceStatMan_)), + wrDiskCache_(0) { const std::string& pieceSelectorOpt = option_->get(PREF_STREAM_PIECE_SELECTOR); @@ -116,6 +119,13 @@ SharedHandle DefaultPieceStorage::checkOutPiece addUsedPiece(piece); } piece->addUser(cuid); + RequestGroup* group = downloadContext_->getOwnerRequestGroup(); + if((!group || !group->inMemoryDownload()) && + wrDiskCache_ && !piece->getWrDiskCacheEntry()) { + // So, we rely on the fact that diskAdaptor_ is not reinitialized + // in the session. + piece->initWrCache(wrDiskCache_, diskAdaptor_); + } return piece; } @@ -401,6 +411,7 @@ void DefaultPieceStorage::deleteUsedPiece(const SharedHandle& piece) return; } usedPieces_.erase(piece); + piece->releaseWrCache(wrDiskCache_); } // void DefaultPieceStorage::reduceUsedPieces(size_t upperBound) @@ -661,6 +672,29 @@ SharedHandle DefaultPieceStorage::getDiskAdaptor() { return diskAdaptor_; } +WrDiskCache* DefaultPieceStorage::getWrDiskCache() +{ + return wrDiskCache_; +} + +void DefaultPieceStorage::flushWrDiskCacheEntry() +{ + if(!wrDiskCache_) { + return; + } + // UsedPieceSet is sorted by piece index. It means we can flush + // cache by non-decreasing offset, which is good to reduce disk seek + // unless the file is heavily fragmented. + for(UsedPieceSet::const_iterator i = usedPieces_.begin(), + eoi = usedPieces_.end(); i != eoi; ++i) { + WrDiskCacheEntry* ce = (*i)->getWrDiskCacheEntry(); + if(ce) { + (*i)->flushWrCache(wrDiskCache_); + (*i)->releaseWrCache(wrDiskCache_); + } + } +} + int32_t DefaultPieceStorage::getPieceLength(size_t index) { return bitfieldMan_->getBlockLength(index); diff --git a/src/DefaultPieceStorage.h b/src/DefaultPieceStorage.h index 1dd35c29..328d0876 100644 --- a/src/DefaultPieceStorage.h +++ b/src/DefaultPieceStorage.h @@ -92,6 +92,8 @@ private: SharedHandle pieceSelector_; SharedHandle streamPieceSelector_; + + WrDiskCache* wrDiskCache_; #ifdef ENABLE_BITTORRENT void getMissingPiece (std::vector >& pieces, @@ -242,6 +244,10 @@ public: virtual SharedHandle getDiskAdaptor(); + virtual WrDiskCache* getWrDiskCache(); + + virtual void flushWrDiskCacheEntry(); + virtual int32_t getPieceLength(size_t index); virtual void advertisePiece(cuid_t cuid, size_t index); @@ -304,6 +310,11 @@ public: { return pieceSelector_; } + + void setWrDiskCache(WrDiskCache* wrDiskCache) + { + wrDiskCache_ = wrDiskCache; + } }; } // namespace aria2 diff --git a/src/DownloadCommand.cc b/src/DownloadCommand.cc index 574ed663..09a62fd3 100644 --- a/src/DownloadCommand.cc +++ b/src/DownloadCommand.cc @@ -62,6 +62,9 @@ #include "SinkStreamFilter.h" #include "FileEntry.h" #include "SocketRecvBuffer.h" +#include "Piece.h" +#include "WrDiskCacheEntry.h" +#include "DownloadFailureException.h" #ifdef ENABLE_MESSAGE_DIGEST # include "MessageDigest.h" # include "message_digest_helper.h" @@ -105,7 +108,9 @@ DownloadCommand::DownloadCommand peerStat_->downloadStart(); getSegmentMan()->registerPeerStat(peerStat_); - streamFilter_.reset(new SinkStreamFilter(pieceHashValidationEnabled_)); + WrDiskCache* wrDiskCache = getPieceStorage()->getWrDiskCache(); + streamFilter_.reset(new SinkStreamFilter(wrDiskCache, + pieceHashValidationEnabled_)); streamFilter_->init(); sinkFilterOnly_ = true; checkSocketRecvBuffer(); @@ -116,6 +121,37 @@ DownloadCommand::~DownloadCommand() { getSegmentMan()->updateFastestPeerStat(peerStat_); } +namespace { +void flushWrDiskCacheEntry(WrDiskCache* wrDiskCache, + const SharedHandle& segment) +{ + const SharedHandle& piece = segment->getPiece(); + if(piece && piece->getWrDiskCacheEntry()) { + piece->flushWrCache(wrDiskCache); + if(piece->getWrDiskCacheEntry()->getError() != + WrDiskCacheEntry::CACHE_ERR_SUCCESS) { + segment->clear(); + piece->clearWrCache(wrDiskCache); + throw DOWNLOAD_FAILURE_EXCEPTION2 + (fmt("Write disk cache flush failure index=%lu", + static_cast(piece->getIndex())), + piece->getWrDiskCacheEntry()->getErrorCode()); + } + } +} +} // namespace + +namespace { +void clearWrDiskCacheEntry(WrDiskCache* wrDiskCache, + const SharedHandle& segment) +{ + const SharedHandle& piece = segment->getPiece(); + if(piece && piece->getWrDiskCacheEntry()) { + piece->clearWrCache(wrDiskCache); + } +} +} // namespace + bool DownloadCommand::executeInternal() { if(getDownloadEngine()->getRequestGroupMan()->doesOverallDownloadSpeedExceed() || getRequestGroup()->doesDownloadSpeedExceed()) { @@ -218,6 +254,7 @@ bool DownloadCommand::executeInternal() { // completed. A2_LOG_INFO(fmt(MSG_SEGMENT_DOWNLOAD_COMPLETED, getCuid())); + #ifdef ENABLE_MESSAGE_DIGEST { @@ -226,6 +263,7 @@ bool DownloadCommand::executeInternal() { if(pieceHashValidationEnabled_ && !expectedPieceHash.empty()) { if( #ifdef ENABLE_BITTORRENT + // TODO Is this necessary? (!getPieceStorage()->isEndGame() || !getDownloadContext()->hasAttribute(CTX_ATTR_BT)) && #endif // ENABLE_BITTORRENT @@ -235,22 +273,26 @@ bool DownloadCommand::executeInternal() { validatePieceHash (segment, expectedPieceHash, segment->getDigest()); } else { - messageDigest_->reset(); - validatePieceHash - (segment, expectedPieceHash, - message_digest::digest - (messageDigest_, - getPieceStorage()->getDiskAdaptor(), - segment->getPosition(), - segment->getLength())); + try { + std::string actualHash = + segment->getPiece()->getDigestWithWrCache + (segment->getSegmentLength(), diskAdaptor); + validatePieceHash(segment, expectedPieceHash, actualHash); + } catch(RecoverableException& e) { + segment->clear(); + clearWrDiskCacheEntry(getPieceStorage()->getWrDiskCache(), + segment); + getSegmentMan()->cancelSegment(getCuid()); + throw; + } } } else { - getSegmentMan()->completeSegment(getCuid(), segment); + completeSegment(getCuid(), segment); } } #else // !ENABLE_MESSAGE_DIGEST - getSegmentMan()->completeSegment(getCuid(), segment); + completeSegment(getCuid(), segment); #endif // !ENABLE_MESSAGE_DIGEST } else { // If segment is not canceled here, in the next pipelining @@ -357,7 +399,7 @@ void DownloadCommand::validatePieceHash(const SharedHandle& segment, { if(actualHash == expectedHash) { A2_LOG_INFO(fmt(MSG_GOOD_CHUNK_CHECKSUM, util::toHex(actualHash).c_str())); - getSegmentMan()->completeSegment(getCuid(), segment); + completeSegment(getCuid(), segment); } else { A2_LOG_INFO(fmt(EX_INVALID_CHUNK_CHECKSUM, static_cast(segment->getIndex()), @@ -365,6 +407,7 @@ void DownloadCommand::validatePieceHash(const SharedHandle& segment, util::toHex(expectedHash).c_str(), util::toHex(actualHash).c_str())); segment->clear(); + clearWrDiskCacheEntry(getPieceStorage()->getWrDiskCache(), segment); getSegmentMan()->cancelSegment(getCuid()); throw DL_RETRY_EX (fmt("Invalid checksum index=%lu", @@ -374,6 +417,13 @@ void DownloadCommand::validatePieceHash(const SharedHandle& segment, #endif // ENABLE_MESSAGE_DIGEST +void DownloadCommand::completeSegment(cuid_t cuid, + const SharedHandle& segment) +{ + flushWrDiskCacheEntry(getPieceStorage()->getWrDiskCache(), segment); + getSegmentMan()->completeSegment(cuid, segment); +} + void DownloadCommand::installStreamFilter (const SharedHandle& streamFilter) { diff --git a/src/DownloadCommand.h b/src/DownloadCommand.h index e2bc374f..544da2a6 100644 --- a/src/DownloadCommand.h +++ b/src/DownloadCommand.h @@ -67,6 +67,8 @@ private: void checkLowestDownloadSpeed() const; + void completeSegment(cuid_t cuid, const SharedHandle& segment); + SharedHandle streamFilter_; bool sinkFilterOnly_; diff --git a/src/DownloadEngineFactory.cc b/src/DownloadEngineFactory.cc index 0aba1494..ee328c5e 100644 --- a/src/DownloadEngineFactory.cc +++ b/src/DownloadEngineFactory.cc @@ -137,6 +137,7 @@ DownloadEngineFactory::newDownloadEngine SharedHandle requestGroupMan(new RequestGroupMan(requestGroups, MAX_CONCURRENT_DOWNLOADS, op)); + requestGroupMan->initWrDiskCache(); e->setRequestGroupMan(requestGroupMan); e->setFileAllocationMan (SharedHandle(new FileAllocationMan())); diff --git a/src/Makefile.am b/src/Makefile.am index 8408ad8a..2e9217ef 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -240,7 +240,9 @@ SRCS = SocketCore.cc SocketCore.h\ Notifier.cc Notifier.h\ ValueBaseDiskWriter.h\ AnonDiskWriterFactory.h\ - XmlRpcRequestParserController.cc XmlRpcRequestParserController.h + XmlRpcRequestParserController.cc XmlRpcRequestParserController.h\ + WrDiskCache.cc WrDiskCache.h\ + WrDiskCacheEntry.cc WrDiskCacheEntry.h if MINGW_BUILD SRCS += WinConsoleFile.cc WinConsoleFile.h diff --git a/src/OptionHandlerFactory.cc b/src/OptionHandlerFactory.cc index d599bc0a..79e1952e 100644 --- a/src/OptionHandlerFactory.cc +++ b/src/OptionHandlerFactory.cc @@ -204,6 +204,15 @@ std::vector OptionHandlerFactory::createOptionHandlers() op->addTag(TAG_ADVANCED); handlers.push_back(op); } + { + OptionHandler* op(new UnitNumberOptionHandler + (PREF_DISK_CACHE, + TEXT_DISK_CACHE, + "0", + 0)); + op->addTag(TAG_ADVANCED); + handlers.push_back(op); + } { OptionHandler* op(new BooleanOptionHandler (PREF_DEFERRED_INPUT, diff --git a/src/Piece.cc b/src/Piece.cc index 283a34ee..1f7e4596 100644 --- a/src/Piece.cc +++ b/src/Piece.cc @@ -38,6 +38,11 @@ #include "A2STR.h" #include "util.h" #include "a2functional.h" +#include "WrDiskCache.h" +#include "WrDiskCacheEntry.h" +#include "LogFactory.h" +#include "fmt.h" +#include "DiskAdaptor.h" #ifdef ENABLE_MESSAGE_DIGEST # include "MessageDigest.h" #endif // ENABLE_MESSAGE_DIGEST @@ -45,7 +50,7 @@ namespace aria2 { Piece::Piece():index_(0), length_(0), blockLength_(BLOCK_LENGTH), bitfield_(0), - usedBySegment_(false) + usedBySegment_(false), wrCache_(0) #ifdef ENABLE_MESSAGE_DIGEST , nextBegin_(0) #endif // ENABLE_MESSAGE_DIGEST @@ -56,7 +61,7 @@ Piece::Piece(size_t index, int32_t length, int32_t blockLength) length_(length), blockLength_(blockLength), bitfield_(new BitfieldMan(blockLength_, length)), - usedBySegment_(false) + usedBySegment_(false), wrCache_(0) #ifdef ENABLE_MESSAGE_DIGEST ,nextBegin_(0) #endif // ENABLE_MESSAGE_DIGEST @@ -64,6 +69,7 @@ Piece::Piece(size_t index, int32_t length, int32_t blockLength) Piece::~Piece() { + delete wrCache_; delete bitfield_; } @@ -232,6 +238,56 @@ std::string Piece::getDigest() } } +namespace { +void updateHashWithRead(const SharedHandle& mdctx, + const SharedHandle& adaptor, + int64_t offset, size_t len) +{ + const size_t BUFSIZE = 4096; + unsigned char buf[BUFSIZE]; + ldiv_t res = ldiv(len, BUFSIZE); + for(int j = 0; j < res.quot; ++j) { + ssize_t nread = adaptor->readData(buf, BUFSIZE, offset); + if((size_t)nread != BUFSIZE) { + throw DL_ABORT_EX(fmt(EX_FILE_READ, "n/a", "data is too short")); + } + mdctx->update(buf, nread); + offset += nread; + } + if(res.rem) { + ssize_t nread = adaptor->readData(buf, res.rem, offset); + if(nread != res.rem) { + throw DL_ABORT_EX(fmt(EX_FILE_READ, "n/a", "data is too short")); + } + mdctx->update(buf, nread); + offset += nread; + } +} +} // namespace + +std::string Piece::getDigestWithWrCache +(size_t pieceLength, const SharedHandle& adaptor) +{ + SharedHandle mdctx(MessageDigest::create(hashType_)); + int64_t start = static_cast(index_)*pieceLength; + int64_t goff = start; + if(wrCache_) { + const WrDiskCacheEntry::DataCellSet& dataSet = wrCache_->getDataSet(); + for(WrDiskCacheEntry::DataCellSet::iterator i = dataSet.begin(), + eoi = dataSet.end(); i != eoi; ++i) { + if(goff < (*i)->goff) { + updateHashWithRead(mdctx, adaptor, goff, (*i)->goff - goff); + } + mdctx->update((*i)->data+(*i)->offset, (*i)->len); + goff = (*i)->goff + (*i)->len; + } + updateHashWithRead(mdctx, adaptor, goff, start+length_-goff); + } else { + updateHashWithRead(mdctx, adaptor, goff, length_); + } + return mdctx->digest(); +} + void Piece::destroyHashContext() { mdctx_.reset(); @@ -257,4 +313,55 @@ void Piece::removeUser(cuid_t cuid) users_.erase(std::remove(users_.begin(), users_.end(), cuid), users_.end()); } +void Piece::initWrCache(WrDiskCache* diskCache, + const SharedHandle& diskAdaptor) +{ + assert(wrCache_ == 0); + wrCache_ = new WrDiskCacheEntry(diskAdaptor); + bool rv = diskCache->add(wrCache_); + assert(rv); +} + +void Piece::flushWrCache(WrDiskCache* diskCache) +{ + assert(wrCache_); + ssize_t size = static_cast(wrCache_->getSize()); + diskCache->update(wrCache_, -size); + wrCache_->writeToDisk(); +} + +void Piece::clearWrCache(WrDiskCache* diskCache) +{ + assert(wrCache_); + ssize_t size = static_cast(wrCache_->getSize()); + diskCache->update(wrCache_, -size); + wrCache_->clear(); +} + +void Piece::updateWrCache(WrDiskCache* diskCache, unsigned char* data, + size_t offset, size_t len, int64_t goff) +{ + A2_LOG_DEBUG(fmt("updateWrCache entry=%p", wrCache_)); + assert(wrCache_); + WrDiskCacheEntry::DataCell* cell = new WrDiskCacheEntry::DataCell(); + cell->goff = goff; + cell->data = data; + cell->offset = offset; + cell->len = len; + bool rv; + rv = wrCache_->cacheData(cell); + assert(rv); + rv = diskCache->update(wrCache_, len); + assert(rv); +} + +void Piece::releaseWrCache(WrDiskCache* diskCache) +{ + if(wrCache_) { + diskCache->remove(wrCache_); + delete wrCache_; + wrCache_ = 0; + } +} + } // namespace aria2 diff --git a/src/Piece.h b/src/Piece.h index 65fa4fca..9905d7d7 100644 --- a/src/Piece.h +++ b/src/Piece.h @@ -47,6 +47,9 @@ namespace aria2 { class BitfieldMan; +class WrDiskCache; +class WrDiskCacheEntry; +class DiskAdaptor; #ifdef ENABLE_MESSAGE_DIGEST @@ -62,6 +65,7 @@ private: BitfieldMan* bitfield_; std::vector users_; bool usedBySegment_; + WrDiskCacheEntry* wrCache_; #ifdef ENABLE_MESSAGE_DIGEST int32_t nextBegin_; @@ -172,6 +176,10 @@ public: void destroyHashContext(); + // Returns raw hash value, not hex digest, which is calculated using + // cached data and data on disk. + std::string getDigestWithWrCache(size_t pieceLength, + const SharedHandle& adaptor); #endif // ENABLE_MESSAGE_DIGEST /** @@ -194,6 +202,18 @@ public: { usedBySegment_ = f; } + + void initWrCache(WrDiskCache* diskCache, + const SharedHandle& diskAdaptor); + void flushWrCache(WrDiskCache* diskCache); + void clearWrCache(WrDiskCache* diskCache); + void updateWrCache(WrDiskCache* diskCache, unsigned char* data, + size_t offset, size_t len, int64_t goff); + void releaseWrCache(WrDiskCache* diskCache); + WrDiskCacheEntry* getWrDiskCacheEntry() const + { + return wrCache_; + } }; } // namespace aria2 diff --git a/src/PieceStorage.h b/src/PieceStorage.h index d6d99978..14b7160a 100644 --- a/src/PieceStorage.h +++ b/src/PieceStorage.h @@ -51,6 +51,7 @@ class Piece; class Peer; #endif // ENABLE_BITTORRENT class DiskAdaptor; +class WrDiskCache; class PieceStorage { public: @@ -228,6 +229,11 @@ public: virtual SharedHandle getDiskAdaptor() = 0; + virtual WrDiskCache* getWrDiskCache() = 0; + + // Flushes write disk cache for in-flight piece and evicts them. + virtual void flushWrDiskCacheEntry() = 0; + virtual int32_t getPieceLength(size_t index) = 0; /** diff --git a/src/RequestGroup.cc b/src/RequestGroup.cc index 02154f52..c296c321 100644 --- a/src/RequestGroup.cc +++ b/src/RequestGroup.cc @@ -210,6 +210,7 @@ error_code::Value RequestGroup::downloadResult() const void RequestGroup::closeFile() { if(pieceStorage_) { + pieceStorage_->flushWrDiskCacheEntry(); pieceStorage_->getDiskAdaptor()->closeFile(); } } @@ -621,6 +622,9 @@ void RequestGroup::initPieceStorage() new DefaultPieceStorage(downloadContext_, option_.get()); SharedHandle psHolder(ps); #endif // !ENABLE_BITTORRENT + if(requestGroupMan_) { + ps->setWrDiskCache(requestGroupMan_->getWrDiskCache()); + } if(diskWriterFactory_) { ps->setDiskWriterFactory(diskWriterFactory_); } diff --git a/src/RequestGroupMan.cc b/src/RequestGroupMan.cc index ab54dff5..83914ea9 100644 --- a/src/RequestGroupMan.cc +++ b/src/RequestGroupMan.cc @@ -78,6 +78,7 @@ #include "SingletonHolder.h" #include "Notifier.h" #include "PeerStat.h" +#include "WrDiskCache.h" #ifdef ENABLE_BITTORRENT # include "bittorrent_helper.h" #endif // ENABLE_BITTORRENT @@ -100,12 +101,16 @@ RequestGroupMan::RequestGroupMan queueCheck_(true), removedErrorResult_(0), removedLastErrorResult_(error_code::FINISHED), - maxDownloadResult_(option->getAsInt(PREF_MAX_DOWNLOAD_RESULT)) + maxDownloadResult_(option->getAsInt(PREF_MAX_DOWNLOAD_RESULT)), + wrDiskCache_(0) { addRequestGroupIndex(requestGroups); } -RequestGroupMan::~RequestGroupMan() {} +RequestGroupMan::~RequestGroupMan() +{ + delete wrDiskCache_; +} bool RequestGroupMan::downloadFinished() { @@ -620,8 +625,8 @@ void RequestGroupMan::fillRequestGroupFromReserver(DownloadEngine* e) // reference. groupToAdd->dropPieceStorage(); configureRequestGroup(groupToAdd); - createInitialCommand(groupToAdd, commands, e); groupToAdd->setRequestGroupMan(this); + createInitialCommand(groupToAdd, commands, e); if(commands.empty()) { requestQueueCheck(); } @@ -1087,4 +1092,13 @@ void RequestGroupMan::setUriListParser uriListParser_ = uriListParser; } +void RequestGroupMan::initWrDiskCache() +{ + assert(wrDiskCache_ == 0); + size_t limit = option_->getAsInt(PREF_DISK_CACHE); + if(limit > 0) { + wrDiskCache_ = new WrDiskCache(limit); + } +} + } // namespace aria2 diff --git a/src/RequestGroupMan.h b/src/RequestGroupMan.h index 9eee73fe..91fa8463 100644 --- a/src/RequestGroupMan.h +++ b/src/RequestGroupMan.h @@ -58,6 +58,7 @@ class ServerStat; class Option; class OutputFile; class UriListParser; +class WrDiskCache; class RequestGroupMan { private: @@ -95,6 +96,8 @@ private: // UriListParser for deferred input. SharedHandle uriListParser_; + WrDiskCache* wrDiskCache_; + void formatDownloadResultFull (OutputFile& out, const char* status, @@ -341,6 +344,15 @@ public: { return netStat_; } + + WrDiskCache* getWrDiskCache() const + { + return wrDiskCache_; + } + + // Initializes WrDiskCache according to PREF_DISK_CACHE option. If + // its value is 0, cache storage will not be initialized. + void initWrDiskCache(); }; } // namespace aria2 diff --git a/src/SinkStreamFilter.cc b/src/SinkStreamFilter.cc index 089cf37e..a5e4566b 100644 --- a/src/SinkStreamFilter.cc +++ b/src/SinkStreamFilter.cc @@ -33,14 +33,20 @@ */ /* copyright --> */ #include "SinkStreamFilter.h" + +#include + #include "BinaryStream.h" #include "Segment.h" +#include "WrDiskCache.h" +#include "Piece.h" namespace aria2 { const std::string SinkStreamFilter::NAME("SinkStreamFilter"); -SinkStreamFilter::SinkStreamFilter(bool hashUpdate): +SinkStreamFilter::SinkStreamFilter(WrDiskCache* wrDiskCache, bool hashUpdate): + wrDiskCache_(wrDiskCache), hashUpdate_(hashUpdate), bytesProcessed_(0) {} @@ -60,7 +66,16 @@ ssize_t SinkStreamFilter::transform } else { wlen = inlen; } - out->writeData(inbuf, wlen, segment->getPositionToWrite()); + const SharedHandle& piece = segment->getPiece(); + if(piece && piece->getWrDiskCacheEntry()) { + assert(wrDiskCache_); + unsigned char* dataCopy = new unsigned char[wlen]; + memcpy(dataCopy, inbuf, wlen); + piece->updateWrCache(wrDiskCache_, dataCopy, 0, wlen, + segment->getPositionToWrite()); + } else { + out->writeData(inbuf, wlen, segment->getPositionToWrite()); + } #ifdef ENABLE_MESSAGE_DIGEST if(hashUpdate_) { segment->updateHash(segment->getWrittenLength(), inbuf, wlen); diff --git a/src/SinkStreamFilter.h b/src/SinkStreamFilter.h index c8dcc76f..908489a9 100644 --- a/src/SinkStreamFilter.h +++ b/src/SinkStreamFilter.h @@ -39,13 +39,15 @@ namespace aria2 { +class WrDiskCache; + class SinkStreamFilter:public StreamFilter { private: + WrDiskCache* wrDiskCache_; bool hashUpdate_; - size_t bytesProcessed_; public: - SinkStreamFilter(bool hashUpdate = false); + SinkStreamFilter(WrDiskCache* wrDiskCache = 0, bool hashUpdate = false); virtual void init() {} diff --git a/src/UnknownLengthPieceStorage.h b/src/UnknownLengthPieceStorage.h index f7a64e54..643395a7 100644 --- a/src/UnknownLengthPieceStorage.h +++ b/src/UnknownLengthPieceStorage.h @@ -231,6 +231,10 @@ public: virtual SharedHandle getDiskAdaptor(); + virtual WrDiskCache* getWrDiskCache() { return 0; } + + virtual void flushWrDiskCacheEntry() {} + virtual int32_t getPieceLength(size_t index); /** diff --git a/src/WrDiskCache.cc b/src/WrDiskCache.cc new file mode 100644 index 00000000..79af80e9 --- /dev/null +++ b/src/WrDiskCache.cc @@ -0,0 +1,129 @@ +/* */ +#include "WrDiskCache.h" +#include "WrDiskCacheEntry.h" +#include "LogFactory.h" +#include "fmt.h" + +namespace aria2 { + +WrDiskCache::WrDiskCache(size_t limit) + : limit_(limit), + total_(0), + clock_(0) +{} + +WrDiskCache::~WrDiskCache() +{ + if(total_) { + A2_LOG_WARN(fmt("Write disk cache is not empty size=%lu", + static_cast(total_))); + } +} + +bool WrDiskCache::add(WrDiskCacheEntry* ent) +{ + ent->setSizeKey(ent->getSize()); + ent->setLastUpdate(++clock_); + std::pair rv = set_.insert(ent); + if(rv.second) { + total_ += ent->getSize(); + ensureLimit(); + return true; + } else { + A2_LOG_WARN(fmt("Found duplicate cache entry a.{size=%lu,clock=%"PRId64 + "} b{size=%lu,clock=%"PRId64"}", + static_cast((*rv.first)->getSize()), + (*rv.first)->getLastUpdate(), + static_cast(ent->getSize()), + ent->getLastUpdate())); + return false; + } +} + +bool WrDiskCache::remove(WrDiskCacheEntry* ent) +{ + if(set_.erase(ent)) { + A2_LOG_DEBUG(fmt("Removed cache entry size=%lu, clock=%"PRId64, + static_cast(ent->getSize()), + ent->getLastUpdate())); + total_ -= ent->getSize(); + return true; + } else { + return false; + } +} + +bool WrDiskCache::update(WrDiskCacheEntry* ent, ssize_t delta) +{ + if(!set_.erase(ent)) { + return false; + } + A2_LOG_DEBUG(fmt("Update cache entry size=%lu, delta=%ld, clock=%"PRId64, + static_cast(ent->getSize()), + static_cast(delta), + ent->getLastUpdate())); + + ent->setSizeKey(ent->getSize()); + ent->setLastUpdate(++clock_); + set_.insert(ent); + + if(delta < 0) { + assert(total_ >= static_cast(-delta)); + } + total_ += delta; + ensureLimit(); + return true; +} + +void WrDiskCache::ensureLimit() +{ + while(total_ > limit_) { + EntrySet::iterator i = set_.begin(); + total_ -= (*i)->getSize(); + (*i)->writeToDisk(); + WrDiskCacheEntry* ent = *i; + A2_LOG_DEBUG(fmt("Force flush cache entry size=%lu, clock=%"PRId64, + static_cast(ent->getSizeKey()), + ent->getLastUpdate())); + set_.erase(i); + + ent->setSizeKey(ent->getSize()); + ent->setLastUpdate(++clock_); + set_.insert(ent); + } +} + +} // namespace aria2 diff --git a/src/WrDiskCache.h b/src/WrDiskCache.h new file mode 100644 index 00000000..08fee829 --- /dev/null +++ b/src/WrDiskCache.h @@ -0,0 +1,83 @@ +/* */ +#ifndef D_WR_DISK_CACHE_H +#define D_WR_DISK_CACHE_H + +#include "common.h" + +#include + +#include "a2functional.h" + +namespace aria2 { + +class WrDiskCacheEntry; + +class WrDiskCache { +public: + WrDiskCache(size_t limit); + ~WrDiskCache(); + // Adds the cache entry |ent| to the storage. The size of cached + // data of ent is added to total_. + bool add(WrDiskCacheEntry* ent); + // Removes the cache entry |ent| from the stroage. The size of + // cached data of ent is subtracted from total_. + bool remove(WrDiskCacheEntry* ent); + // Updates the already added entry |ent|. The |delta| means how many + // bytes is increased in this update. If the size is reduced, use + // negative value. + bool update(WrDiskCacheEntry* ent, ssize_t delta); + // Evicts entries from storage so that total size of cache is kept + // under the limit. + void ensureLimit(); + size_t getSize() const + { + return total_; + } +private: + typedef std::set > EntrySet; + // Maximum number of bytes the storage can cache. + size_t limit_; + // Current number of bytes cached. + size_t total_; + EntrySet set_; + int64_t clock_; +}; + +} // namespace aria2 + +#endif // D_WR_DISK_CACHE_H + diff --git a/src/WrDiskCacheEntry.cc b/src/WrDiskCacheEntry.cc new file mode 100644 index 00000000..06713669 --- /dev/null +++ b/src/WrDiskCacheEntry.cc @@ -0,0 +1,110 @@ +/* */ +#include "WrDiskCacheEntry.h" +#include "DiskAdaptor.h" +#include "RecoverableException.h" +#include "DownloadFailureException.h" +#include "LogFactory.h" +#include "fmt.h" + +namespace aria2 { + +WrDiskCacheEntry::WrDiskCacheEntry +(const SharedHandle& diskAdaptor) + : sizeKey_(0), + lastUpdate_(0), + size_(0), + error_(CACHE_ERR_SUCCESS), + errorCode_(error_code::UNDEFINED), + diskAdaptor_(diskAdaptor) +{} + +WrDiskCacheEntry::~WrDiskCacheEntry() +{ + if(!set_.empty()) { + A2_LOG_WARN(fmt("WrDiskCacheEntry is not empty size=%lu", + static_cast(size_))); + } + deleteDataCells(); +} + +void WrDiskCacheEntry::deleteDataCells() +{ + for(DataCellSet::iterator i = set_.begin(), eoi = set_.end(); i != eoi; + ++i) { + delete [] (*i)->data; + delete *i; + } + set_.clear(); + size_ = 0; +} + +void WrDiskCacheEntry::writeToDisk() +{ + DataCellSet::iterator i = set_.begin(), eoi = set_.end(); + try { + for(; i != eoi; ++i) { + A2_LOG_DEBUG(fmt("WrDiskCacheEntry flush goff=%"PRId64", len=%lu", + (*i)->goff, static_cast((*i)->len))); + diskAdaptor_->writeData((*i)->data+(*i)->offset, (*i)->len, + (*i)->goff); + } + } catch(RecoverableException& e) { + A2_LOG_ERROR(fmt("WrDiskCacheEntry flush error goff=%"PRId64", len=%lu", + (*i)->goff, static_cast((*i)->len))); + error_ = CACHE_ERR_ERROR; + errorCode_ = e.getErrorCode(); + } + deleteDataCells(); +} + +void WrDiskCacheEntry::clear() +{ + deleteDataCells(); +} + +bool WrDiskCacheEntry::cacheData(DataCell* dataCell) +{ + A2_LOG_DEBUG(fmt("WrDiskCacheEntry cache goff=%"PRId64", len=%lu", + dataCell->goff, static_cast(dataCell->len))); + if(set_.insert(dataCell).second) { + size_ += dataCell->len; + return true; + } else { + return false; + } +} + +} // namespace aria2 diff --git a/src/WrDiskCacheEntry.h b/src/WrDiskCacheEntry.h new file mode 100644 index 00000000..b3b31f6b --- /dev/null +++ b/src/WrDiskCacheEntry.h @@ -0,0 +1,142 @@ +/* */ +#ifndef D_WR_DISK_CACHE_ENTRY_H +#define D_WR_DISK_CACHE_ENTRY_H + +#include "common.h" + +#include + +#include "SharedHandle.h" +#include "a2functional.h" +#include "error_code.h" + +namespace aria2 { + +class DiskAdaptor; +class WrDiskCache; + +class WrDiskCacheEntry { +public: + struct DataCell { + // Where the data is going to be put in DiskAdaptor + int64_t goff; + // data must be len+offset bytes. Thus, the cached data is + // [data+offset, data+offset+len). + unsigned char *data; + size_t offset; + size_t len; + bool operator<(const DataCell& rhs) const + { + return goff < rhs.goff; + } + }; + + typedef std::set > DataCellSet; + + WrDiskCacheEntry(const SharedHandle& diskAdaptor); + ~WrDiskCacheEntry(); + + // Flushes the cached data to the disk and deletes them. + void writeToDisk(); + // Deletes cached data without flushing to the disk. + void clear(); + + // Caches |dataCell| + bool cacheData(DataCell* dataCell); + size_t getSize() const + { + return size_; + } + void setSizeKey(size_t sizeKey) + { + sizeKey_ = sizeKey; + } + size_t getSizeKey() const + { + return sizeKey_; + } + void setLastUpdate(int64_t clock) + { + lastUpdate_ = clock; + } + int64_t getLastUpdate() const + { + return lastUpdate_; + } + bool operator<(const WrDiskCacheEntry& rhs) const + { + return sizeKey_ > rhs.sizeKey_ || + (sizeKey_ == rhs.sizeKey_ && lastUpdate_ < rhs.lastUpdate_); + } + + enum { + CACHE_ERR_SUCCESS, + CACHE_ERR_ERROR + }; + + int getError() const + { + return error_; + } + error_code::Value getErrorCode() const + { + return errorCode_; + } + + const DataCellSet& getDataSet() const + { + return set_; + } +private: + void deleteDataCells(); + + size_t sizeKey_; + int64_t lastUpdate_; + + size_t size_; + + DataCellSet set_; + + int error_; + error_code::Value errorCode_; + + SharedHandle diskAdaptor_; +}; + +} // namespace aria2 + +#endif // D_WR_DISK_CACHE_ENTRY_H + diff --git a/src/prefs.cc b/src/prefs.cc index ad369015..6675de2e 100644 --- a/src/prefs.cc +++ b/src/prefs.cc @@ -338,6 +338,8 @@ const Pref* PREF_STOP_WITH_PROCESS = makePref("stop-with-process"); const Pref* PREF_ENABLE_MMAP = makePref("enable-mmap"); // value: true | false const Pref* PREF_FORCE_SAVE = makePref("force-save"); +// value: 1*digit +const Pref* PREF_DISK_CACHE = makePref("disk-cache"); /** * FTP related preferences diff --git a/src/prefs.h b/src/prefs.h index f9928c70..01ed3397 100644 --- a/src/prefs.h +++ b/src/prefs.h @@ -281,6 +281,8 @@ extern const Pref* PREF_STOP_WITH_PROCESS; extern const Pref* PREF_ENABLE_MMAP; // value: true | false extern const Pref* PREF_FORCE_SAVE; +// value: 1*digit +extern const Pref* PREF_DISK_CACHE; /** * FTP related preferences diff --git a/src/usage_text.h b/src/usage_text.h index 0d966fd5..7d0d5bcf 100644 --- a/src/usage_text.h +++ b/src/usage_text.h @@ -914,3 +914,15 @@ " if the download is completed or removed. This\n" \ " may be useful to save BitTorrent seeding which\n" \ " is recognized as completed state.") +#define TEXT_DISK_CACHE \ + _(" --disk-cache=SIZE Enable disk cache. If SIZE is 0, the disk cache\n" \ + " is disabled. This feature caches the downloaded\n" \ + " data in memory, which grows to at most SIZE\n" \ + " bytes. The cache storage is created for aria2\n" \ + " instance and shared by all downloads. The one\n" \ + " advantage of the disk cache is reduce the disk\n" \ + " seek time because the data is written in larger\n" \ + " unit and it is reordered by the offset of the\n" \ + " file. If the underlying file is heavily\n" \ + " fragmented it is not the case.\n" \ + " SIZE can include K or M(1K = 1024, 1M = 1024K).") diff --git a/test/Makefile.am b/test/Makefile.am index 711a186d..0a69c791 100644 --- a/test/Makefile.am +++ b/test/Makefile.am @@ -85,7 +85,9 @@ aria2c_SOURCES = AllTest.cc\ ParamedStringTest.cc\ RpcHelperTest.cc\ AbstractCommandTest.cc\ - SinkStreamFilterTest.cc + SinkStreamFilterTest.cc\ + WrDiskCacheTest.cc\ + WrDiskCacheEntryTest.cc if ENABLE_XML_RPC aria2c_SOURCES += XmlRpcRequestParserControllerTest.cc diff --git a/test/MockPieceStorage.h b/test/MockPieceStorage.h index 31e51449..16f480ad 100644 --- a/test/MockPieceStorage.h +++ b/test/MockPieceStorage.h @@ -226,6 +226,12 @@ public: return diskAdaptor; } + virtual WrDiskCache* getWrDiskCache() { + return 0; + } + + virtual void flushWrDiskCacheEntry() {} + void setDiskAdaptor(const SharedHandle& adaptor) { this->diskAdaptor = adaptor; } diff --git a/test/PieceTest.cc b/test/PieceTest.cc index 7a07fa1d..bc594b17 100644 --- a/test/PieceTest.cc +++ b/test/PieceTest.cc @@ -5,6 +5,9 @@ #include #include "util.h" +#include "DirectDiskAdaptor.h" +#include "ByteArrayDiskWriter.h" +#include "WrDiskCache.h" namespace aria2 { @@ -13,24 +16,33 @@ class PieceTest:public CppUnit::TestFixture { CPPUNIT_TEST_SUITE(PieceTest); CPPUNIT_TEST(testCompleteBlock); CPPUNIT_TEST(testGetCompletedLength); - + CPPUNIT_TEST(testFlushWrCache); #ifdef ENABLE_MESSAGE_DIGEST + CPPUNIT_TEST(testGetDigestWithWrCache); CPPUNIT_TEST(testUpdateHash); #endif // ENABLE_MESSAGE_DIGEST CPPUNIT_TEST_SUITE_END(); private: - + SharedHandle adaptor_; + SharedHandle writer_; public: - void setUp() {} + void setUp() + { + adaptor_.reset(new DirectDiskAdaptor()); + writer_.reset(new ByteArrayDiskWriter()); + adaptor_->setDiskWriter(writer_); + } void testCompleteBlock(); void testGetCompletedLength(); + void testFlushWrCache(); #ifdef ENABLE_MESSAGE_DIGEST + void testGetDigestWithWrCache(); void testUpdateHash(); #endif // ENABLE_MESSAGE_DIGEST @@ -62,8 +74,58 @@ void PieceTest::testGetCompletedLength() CPPUNIT_ASSERT_EQUAL(blockLength*3+100, p.getCompletedLength()); } +void PieceTest::testFlushWrCache() +{ + unsigned char* data; + Piece p(0, 1024); + WrDiskCache dc(64); + p.initWrCache(&dc, adaptor_); + data = new unsigned char[3]; + memcpy(data, "foo", 3); + p.updateWrCache(&dc, data, 0, 3, 0); + data = new unsigned char[4]; + memcpy(data, " bar", 4); + p.updateWrCache(&dc, data, 0, 4, 3); + p.flushWrCache(&dc); + + CPPUNIT_ASSERT_EQUAL(std::string("foo bar"), writer_->getString()); + + data = new unsigned char[3]; + memcpy(data, "foo", 3); + p.updateWrCache(&dc, data, 0, 3, 0); + CPPUNIT_ASSERT_EQUAL((size_t)3, dc.getSize()); + p.clearWrCache(&dc); + CPPUNIT_ASSERT_EQUAL((size_t)0, dc.getSize()); + p.releaseWrCache(&dc); + CPPUNIT_ASSERT(!p.getWrDiskCacheEntry()); +} + #ifdef ENABLE_MESSAGE_DIGEST +void PieceTest::testGetDigestWithWrCache() +{ + unsigned char* data; + Piece p(0, 26); + p.setHashType("sha-1"); + WrDiskCache dc(64); + // 012345678901234567890123456 + writer_->setString("abcde...ijklmnopq...uvwx.z"); + p.initWrCache(&dc, adaptor_); + data = new unsigned char[3]; + memcpy(data, "fgh", 3); + p.updateWrCache(&dc, data, 0, 3, 5); + data = new unsigned char[3]; + memcpy(data, "rst", 3); + p.updateWrCache(&dc, data, 0, 3, 17); + data = new unsigned char[1]; + memcpy(data, "y", 1); + p.updateWrCache(&dc, data, 0, 1, 24); + + CPPUNIT_ASSERT_EQUAL + (std::string("32d10c7b8cf96570ca04ce37f2a19d84240d3a89"), + util::toHex(p.getDigestWithWrCache(p.getLength(), adaptor_))); +} + void PieceTest::testUpdateHash() { Piece p(0, 16, 2*1024*1024); diff --git a/test/SinkStreamFilterTest.cc b/test/SinkStreamFilterTest.cc index 91bc6bfc..3b8bf83b 100644 --- a/test/SinkStreamFilterTest.cc +++ b/test/SinkStreamFilterTest.cc @@ -54,9 +54,7 @@ public: void setUp() { writer_.reset(new ByteArrayDiskWriter()); - sinkFilter_.reset(new SinkStreamFilter()); - filter_.reset(new SinkStreamFilter(sinkFilter_)); - sinkFilter_->init(); + filter_.reset(new SinkStreamFilter()); filter_->init(); segment_.reset(new MockSegment2(16)); } diff --git a/test/TestUtil.cc b/test/TestUtil.cc index 6dd5d811..62424c6f 100644 --- a/test/TestUtil.cc +++ b/test/TestUtil.cc @@ -91,4 +91,18 @@ std::string fileHexDigest } #endif // ENABLE_MESSAGE_DIGEST +WrDiskCacheEntry::DataCell* createDataCell(int64_t goff, + const char* data, + size_t offset) +{ + WrDiskCacheEntry::DataCell* cell = new WrDiskCacheEntry::DataCell(); + cell->goff = goff; + size_t len = strlen(data); + cell->data = new unsigned char[len]; + memcpy(cell->data, data, len); + cell->offset = offset; + cell->len = len; + return cell; +} + } // namespace aria2 diff --git a/test/TestUtil.h b/test/TestUtil.h index 9dc66b87..0eaf330c 100644 --- a/test/TestUtil.h +++ b/test/TestUtil.h @@ -4,6 +4,7 @@ #include "SharedHandle.h" #include "Cookie.h" +#include "WrDiskCacheEntry.h" namespace aria2 { @@ -50,4 +51,8 @@ std::string fileHexDigest (const SharedHandle& ctx, const std::string& filename); #endif // ENABLE_MESSAGE_DIGEST +WrDiskCacheEntry::DataCell* createDataCell(int64_t goff, + const char* data, + size_t offset = 0); + } // namespace aria2 diff --git a/test/WrDiskCacheEntryTest.cc b/test/WrDiskCacheEntryTest.cc new file mode 100644 index 00000000..ad1c894b --- /dev/null +++ b/test/WrDiskCacheEntryTest.cc @@ -0,0 +1,55 @@ +#include "WrDiskCacheEntry.h" + +#include + +#include + +#include "TestUtil.h" +#include "DirectDiskAdaptor.h" +#include "ByteArrayDiskWriter.h" + +namespace aria2 { + +class WrDiskCacheEntryTest:public CppUnit::TestFixture { + + CPPUNIT_TEST_SUITE(WrDiskCacheEntryTest); + CPPUNIT_TEST(testWriteToDisk); + CPPUNIT_TEST(testClear); + CPPUNIT_TEST_SUITE_END(); + + SharedHandle adaptor_; + SharedHandle writer_; +public: + void setUp() + { + adaptor_.reset(new DirectDiskAdaptor()); + writer_.reset(new ByteArrayDiskWriter()); + adaptor_->setDiskWriter(writer_); + } + + void testWriteToDisk(); + void testClear(); +}; + +CPPUNIT_TEST_SUITE_REGISTRATION( WrDiskCacheEntryTest ); + +void WrDiskCacheEntryTest::testWriteToDisk() +{ + WrDiskCacheEntry e(adaptor_); + e.cacheData(createDataCell(0, "??01234567", 2)); + e.cacheData(createDataCell(8, "890")); + e.writeToDisk(); + CPPUNIT_ASSERT_EQUAL((size_t)0, e.getSize()); + CPPUNIT_ASSERT_EQUAL(std::string("01234567890"), writer_->getString()); +} + +void WrDiskCacheEntryTest::testClear() +{ + WrDiskCacheEntry e(adaptor_); + e.cacheData(createDataCell(0, "foo")); + e.clear(); + CPPUNIT_ASSERT_EQUAL((size_t)0, e.getSize()); + CPPUNIT_ASSERT_EQUAL(std::string(), writer_->getString()); +} + +} // namespace aria2 diff --git a/test/WrDiskCacheTest.cc b/test/WrDiskCacheTest.cc new file mode 100644 index 00000000..6e0663a7 --- /dev/null +++ b/test/WrDiskCacheTest.cc @@ -0,0 +1,74 @@ +#include "WrDiskCache.h" + +#include + +#include + +#include "TestUtil.h" +#include "DirectDiskAdaptor.h" +#include "ByteArrayDiskWriter.h" + +namespace aria2 { + +class WrDiskCacheTest:public CppUnit::TestFixture { + + CPPUNIT_TEST_SUITE(WrDiskCacheTest); + CPPUNIT_TEST(testAdd); + CPPUNIT_TEST_SUITE_END(); + + SharedHandle adaptor_; + SharedHandle writer_; +public: + void setUp() + { + adaptor_.reset(new DirectDiskAdaptor()); + writer_.reset(new ByteArrayDiskWriter()); + adaptor_->setDiskWriter(writer_); + } + + void testAdd(); +}; + +CPPUNIT_TEST_SUITE_REGISTRATION( WrDiskCacheTest ); + +void WrDiskCacheTest::testAdd() +{ + WrDiskCache dc(20); + CPPUNIT_ASSERT_EQUAL((size_t)0, dc.getSize()); + WrDiskCacheEntry e1(adaptor_); + e1.cacheData(createDataCell(0, "who knows?")); + CPPUNIT_ASSERT(dc.add(&e1)); + CPPUNIT_ASSERT_EQUAL((size_t)10, dc.getSize()); + + WrDiskCacheEntry e2(adaptor_); + e2.cacheData(createDataCell(21, "seconddata")); + CPPUNIT_ASSERT(dc.add(&e2)); + CPPUNIT_ASSERT_EQUAL((size_t)20, dc.getSize()); + + WrDiskCacheEntry e3(adaptor_); + e3.cacheData(createDataCell(10, "hello")); + CPPUNIT_ASSERT(dc.add(&e3)); + CPPUNIT_ASSERT_EQUAL((size_t)15, dc.getSize()); + // e1 is flushed to the disk + CPPUNIT_ASSERT_EQUAL(std::string("who knows?"), writer_->getString()); + CPPUNIT_ASSERT_EQUAL((size_t)0, e1.getSize()); + + e3.cacheData(createDataCell(15, " world")); + CPPUNIT_ASSERT(dc.update(&e3, 6)); + + // e3 is flushed to the disk + CPPUNIT_ASSERT_EQUAL(std::string("who knows?hello world"), + writer_->getString()); + CPPUNIT_ASSERT_EQUAL((size_t)0, e3.getSize()); + CPPUNIT_ASSERT_EQUAL((size_t)10, dc.getSize()); + + e2.cacheData(createDataCell(31, "01234567890")); + CPPUNIT_ASSERT(dc.update(&e2, 11)); + // e2 is flushed to the disk + CPPUNIT_ASSERT_EQUAL(std::string("who knows?hello worldseconddata01234567890"), + writer_->getString()); + CPPUNIT_ASSERT_EQUAL((size_t)0, e2.getSize()); + CPPUNIT_ASSERT_EQUAL((size_t)0, dc.getSize()); +} + +} // namespace aria2