Added --disk-cache option

This option enables disk cache. If SIZE is 0, the disk cache is
disabled. This feature caches the downloaded data in memory, which
grows to at most SIZE bytes. The cache storage is created for aria2
instance and shared by all downloads. The one advantage of the disk
cache is reduce the disk seek time because the data is written in
larger unit and it is reordered by the offset of the file. If the
underlying file is heavily fragmented it is not the case.
pull/36/head
Tatsuhiro Tsujikawa 2012-11-27 22:04:59 +09:00
parent 8ac433a8e9
commit f314719618
33 changed files with 1062 additions and 57 deletions

View File

@ -57,6 +57,9 @@
#include "DownloadContext.h"
#include "PeerStorage.h"
#include "array_fun.h"
#include "WrDiskCache.h"
#include "WrDiskCacheEntry.h"
#include "DownloadFailureException.h"
namespace aria2 {
@ -119,8 +122,17 @@ void BtPieceMessage::doReceivedAction()
A2_LOG_DEBUG("Already have this block.");
return;
}
getPieceStorage()->getDiskAdaptor()->writeData
(data_+9, blockLength_, offset);
if(piece->getWrDiskCacheEntry()) {
// Write Disk Cache enabled. Unfortunately, it incurs extra data
// copy.
unsigned char* dataCopy = new unsigned char[blockLength_];
memcpy(dataCopy, data_+9, blockLength_);
piece->updateWrCache(getPieceStorage()->getWrDiskCache(),
dataCopy, 0, blockLength_, offset);
} else {
getPieceStorage()->getDiskAdaptor()->writeData(data_+9, blockLength_,
offset);
}
piece->completeBlock(slot.getBlockIndex());
A2_LOG_DEBUG(fmt(MSG_PIECE_BITFIELD, getCuid(),
util::toHex(piece->getBitfield(),
@ -234,16 +246,37 @@ bool BtPieceMessage::checkPieceHash(const SharedHandle<Piece>& piece)
return
piece->getDigest() == downloadContext_->getPieceHash(piece->getIndex());
} else {
int64_t offset = static_cast<int64_t>(piece->getIndex())
*downloadContext_->getPieceLength();
return message_digest::staticSHA1Digest
(getPieceStorage()->getDiskAdaptor(), offset, piece->getLength())
== downloadContext_->getPieceHash(piece->getIndex());
A2_LOG_DEBUG(fmt("Calculating hash index=%lu",
static_cast<unsigned long>(piece->getIndex())));
try {
return piece->getDigestWithWrCache(downloadContext_->getPieceLength(),
getPieceStorage()->getDiskAdaptor())
== downloadContext_->getPieceHash(piece->getIndex());
} catch(RecoverableException& e) {
piece->clearAllBlock();
if(piece->getWrDiskCacheEntry()) {
piece->clearWrCache(getPieceStorage()->getWrDiskCache());
}
throw;
}
}
}
void BtPieceMessage::onNewPiece(const SharedHandle<Piece>& piece)
{
if(piece->getWrDiskCacheEntry()) {
// We flush cached data whenever an whole piece is retrieved.
piece->flushWrCache(getPieceStorage()->getWrDiskCache());
if(piece->getWrDiskCacheEntry()->getError() !=
WrDiskCacheEntry::CACHE_ERR_SUCCESS) {
piece->clearAllBlock();
piece->clearWrCache(getPieceStorage()->getWrDiskCache());
throw DOWNLOAD_FAILURE_EXCEPTION2
(fmt("Write disk cache flush failure index=%lu",
static_cast<unsigned long>(piece->getIndex())),
piece->getWrDiskCacheEntry()->getErrorCode());
}
}
A2_LOG_INFO(fmt(MSG_GOT_NEW_PIECE,
getCuid(),
static_cast<unsigned long>(piece->getIndex())));
@ -256,29 +289,14 @@ void BtPieceMessage::onWrongPiece(const SharedHandle<Piece>& piece)
A2_LOG_INFO(fmt(MSG_GOT_WRONG_PIECE,
getCuid(),
static_cast<unsigned long>(piece->getIndex())));
erasePieceOnDisk(piece);
if(piece->getWrDiskCacheEntry()) {
piece->clearWrCache(getPieceStorage()->getWrDiskCache());
}
piece->clearAllBlock();
piece->destroyHashContext();
getBtRequestFactory()->removeTargetPiece(piece);
}
void BtPieceMessage::erasePieceOnDisk(const SharedHandle<Piece>& piece)
{
size_t BUFSIZE = 4096;
unsigned char buf[BUFSIZE];
memset(buf, 0, BUFSIZE);
int64_t offset =
static_cast<int64_t>(piece->getIndex())*downloadContext_->getPieceLength();
div_t res = div(piece->getLength(), BUFSIZE);
for(int i = 0; i < res.quot; ++i) {
getPieceStorage()->getDiskAdaptor()->writeData(buf, BUFSIZE, offset);
offset += BUFSIZE;
}
if(res.rem > 0) {
getPieceStorage()->getDiskAdaptor()->writeData(buf, res.rem, offset);
}
}
void BtPieceMessage::onChokingEvent(const BtChokingEvent& event)
{
if(!isInvalidate() &&

View File

@ -61,8 +61,6 @@ private:
void onWrongPiece(const SharedHandle<Piece>& piece);
void erasePieceOnDisk(const SharedHandle<Piece>& piece);
void pushPieceData(int64_t offset, int32_t length) const;
public:
BtPieceMessage(size_t index = 0, int32_t begin = 0, int32_t blockLength = 0);

View File

@ -65,6 +65,8 @@
#include "bitfield.h"
#include "SingletonHolder.h"
#include "Notifier.h"
#include "WrDiskCache.h"
#include "RequestGroup.h"
#ifdef ENABLE_BITTORRENT
# include "bittorrent_helper.h"
#endif // ENABLE_BITTORRENT
@ -81,7 +83,8 @@ DefaultPieceStorage::DefaultPieceStorage
endGamePieceNum_(END_GAME_PIECE_NUM),
option_(option),
pieceStatMan_(new PieceStatMan(downloadContext->getNumPieces(), true)),
pieceSelector_(new RarestPieceSelector(pieceStatMan_))
pieceSelector_(new RarestPieceSelector(pieceStatMan_)),
wrDiskCache_(0)
{
const std::string& pieceSelectorOpt =
option_->get(PREF_STREAM_PIECE_SELECTOR);
@ -116,6 +119,13 @@ SharedHandle<Piece> DefaultPieceStorage::checkOutPiece
addUsedPiece(piece);
}
piece->addUser(cuid);
RequestGroup* group = downloadContext_->getOwnerRequestGroup();
if((!group || !group->inMemoryDownload()) &&
wrDiskCache_ && !piece->getWrDiskCacheEntry()) {
// So, we rely on the fact that diskAdaptor_ is not reinitialized
// in the session.
piece->initWrCache(wrDiskCache_, diskAdaptor_);
}
return piece;
}
@ -401,6 +411,7 @@ void DefaultPieceStorage::deleteUsedPiece(const SharedHandle<Piece>& piece)
return;
}
usedPieces_.erase(piece);
piece->releaseWrCache(wrDiskCache_);
}
// void DefaultPieceStorage::reduceUsedPieces(size_t upperBound)
@ -661,6 +672,29 @@ SharedHandle<DiskAdaptor> DefaultPieceStorage::getDiskAdaptor() {
return diskAdaptor_;
}
WrDiskCache* DefaultPieceStorage::getWrDiskCache()
{
return wrDiskCache_;
}
void DefaultPieceStorage::flushWrDiskCacheEntry()
{
if(!wrDiskCache_) {
return;
}
// UsedPieceSet is sorted by piece index. It means we can flush
// cache by non-decreasing offset, which is good to reduce disk seek
// unless the file is heavily fragmented.
for(UsedPieceSet::const_iterator i = usedPieces_.begin(),
eoi = usedPieces_.end(); i != eoi; ++i) {
WrDiskCacheEntry* ce = (*i)->getWrDiskCacheEntry();
if(ce) {
(*i)->flushWrCache(wrDiskCache_);
(*i)->releaseWrCache(wrDiskCache_);
}
}
}
int32_t DefaultPieceStorage::getPieceLength(size_t index)
{
return bitfieldMan_->getBlockLength(index);

View File

@ -92,6 +92,8 @@ private:
SharedHandle<PieceSelector> pieceSelector_;
SharedHandle<StreamPieceSelector> streamPieceSelector_;
WrDiskCache* wrDiskCache_;
#ifdef ENABLE_BITTORRENT
void getMissingPiece
(std::vector<SharedHandle<Piece> >& pieces,
@ -242,6 +244,10 @@ public:
virtual SharedHandle<DiskAdaptor> getDiskAdaptor();
virtual WrDiskCache* getWrDiskCache();
virtual void flushWrDiskCacheEntry();
virtual int32_t getPieceLength(size_t index);
virtual void advertisePiece(cuid_t cuid, size_t index);
@ -304,6 +310,11 @@ public:
{
return pieceSelector_;
}
void setWrDiskCache(WrDiskCache* wrDiskCache)
{
wrDiskCache_ = wrDiskCache;
}
};
} // namespace aria2

View File

@ -62,6 +62,9 @@
#include "SinkStreamFilter.h"
#include "FileEntry.h"
#include "SocketRecvBuffer.h"
#include "Piece.h"
#include "WrDiskCacheEntry.h"
#include "DownloadFailureException.h"
#ifdef ENABLE_MESSAGE_DIGEST
# include "MessageDigest.h"
# include "message_digest_helper.h"
@ -105,7 +108,9 @@ DownloadCommand::DownloadCommand
peerStat_->downloadStart();
getSegmentMan()->registerPeerStat(peerStat_);
streamFilter_.reset(new SinkStreamFilter(pieceHashValidationEnabled_));
WrDiskCache* wrDiskCache = getPieceStorage()->getWrDiskCache();
streamFilter_.reset(new SinkStreamFilter(wrDiskCache,
pieceHashValidationEnabled_));
streamFilter_->init();
sinkFilterOnly_ = true;
checkSocketRecvBuffer();
@ -116,6 +121,37 @@ DownloadCommand::~DownloadCommand() {
getSegmentMan()->updateFastestPeerStat(peerStat_);
}
namespace {
void flushWrDiskCacheEntry(WrDiskCache* wrDiskCache,
const SharedHandle<Segment>& segment)
{
const SharedHandle<Piece>& piece = segment->getPiece();
if(piece && piece->getWrDiskCacheEntry()) {
piece->flushWrCache(wrDiskCache);
if(piece->getWrDiskCacheEntry()->getError() !=
WrDiskCacheEntry::CACHE_ERR_SUCCESS) {
segment->clear();
piece->clearWrCache(wrDiskCache);
throw DOWNLOAD_FAILURE_EXCEPTION2
(fmt("Write disk cache flush failure index=%lu",
static_cast<unsigned long>(piece->getIndex())),
piece->getWrDiskCacheEntry()->getErrorCode());
}
}
}
} // namespace
namespace {
void clearWrDiskCacheEntry(WrDiskCache* wrDiskCache,
const SharedHandle<Segment>& segment)
{
const SharedHandle<Piece>& piece = segment->getPiece();
if(piece && piece->getWrDiskCacheEntry()) {
piece->clearWrCache(wrDiskCache);
}
}
} // namespace
bool DownloadCommand::executeInternal() {
if(getDownloadEngine()->getRequestGroupMan()->doesOverallDownloadSpeedExceed()
|| getRequestGroup()->doesDownloadSpeedExceed()) {
@ -218,6 +254,7 @@ bool DownloadCommand::executeInternal() {
// completed.
A2_LOG_INFO(fmt(MSG_SEGMENT_DOWNLOAD_COMPLETED,
getCuid()));
#ifdef ENABLE_MESSAGE_DIGEST
{
@ -226,6 +263,7 @@ bool DownloadCommand::executeInternal() {
if(pieceHashValidationEnabled_ && !expectedPieceHash.empty()) {
if(
#ifdef ENABLE_BITTORRENT
// TODO Is this necessary?
(!getPieceStorage()->isEndGame() ||
!getDownloadContext()->hasAttribute(CTX_ATTR_BT)) &&
#endif // ENABLE_BITTORRENT
@ -235,22 +273,26 @@ bool DownloadCommand::executeInternal() {
validatePieceHash
(segment, expectedPieceHash, segment->getDigest());
} else {
messageDigest_->reset();
validatePieceHash
(segment, expectedPieceHash,
message_digest::digest
(messageDigest_,
getPieceStorage()->getDiskAdaptor(),
segment->getPosition(),
segment->getLength()));
try {
std::string actualHash =
segment->getPiece()->getDigestWithWrCache
(segment->getSegmentLength(), diskAdaptor);
validatePieceHash(segment, expectedPieceHash, actualHash);
} catch(RecoverableException& e) {
segment->clear();
clearWrDiskCacheEntry(getPieceStorage()->getWrDiskCache(),
segment);
getSegmentMan()->cancelSegment(getCuid());
throw;
}
}
} else {
getSegmentMan()->completeSegment(getCuid(), segment);
completeSegment(getCuid(), segment);
}
}
#else // !ENABLE_MESSAGE_DIGEST
getSegmentMan()->completeSegment(getCuid(), segment);
completeSegment(getCuid(), segment);
#endif // !ENABLE_MESSAGE_DIGEST
} else {
// If segment is not canceled here, in the next pipelining
@ -357,7 +399,7 @@ void DownloadCommand::validatePieceHash(const SharedHandle<Segment>& segment,
{
if(actualHash == expectedHash) {
A2_LOG_INFO(fmt(MSG_GOOD_CHUNK_CHECKSUM, util::toHex(actualHash).c_str()));
getSegmentMan()->completeSegment(getCuid(), segment);
completeSegment(getCuid(), segment);
} else {
A2_LOG_INFO(fmt(EX_INVALID_CHUNK_CHECKSUM,
static_cast<unsigned long>(segment->getIndex()),
@ -365,6 +407,7 @@ void DownloadCommand::validatePieceHash(const SharedHandle<Segment>& segment,
util::toHex(expectedHash).c_str(),
util::toHex(actualHash).c_str()));
segment->clear();
clearWrDiskCacheEntry(getPieceStorage()->getWrDiskCache(), segment);
getSegmentMan()->cancelSegment(getCuid());
throw DL_RETRY_EX
(fmt("Invalid checksum index=%lu",
@ -374,6 +417,13 @@ void DownloadCommand::validatePieceHash(const SharedHandle<Segment>& segment,
#endif // ENABLE_MESSAGE_DIGEST
void DownloadCommand::completeSegment(cuid_t cuid,
const SharedHandle<Segment>& segment)
{
flushWrDiskCacheEntry(getPieceStorage()->getWrDiskCache(), segment);
getSegmentMan()->completeSegment(cuid, segment);
}
void DownloadCommand::installStreamFilter
(const SharedHandle<StreamFilter>& streamFilter)
{

View File

@ -67,6 +67,8 @@ private:
void checkLowestDownloadSpeed() const;
void completeSegment(cuid_t cuid, const SharedHandle<Segment>& segment);
SharedHandle<StreamFilter> streamFilter_;
bool sinkFilterOnly_;

View File

@ -137,6 +137,7 @@ DownloadEngineFactory::newDownloadEngine
SharedHandle<RequestGroupMan>
requestGroupMan(new RequestGroupMan(requestGroups, MAX_CONCURRENT_DOWNLOADS,
op));
requestGroupMan->initWrDiskCache();
e->setRequestGroupMan(requestGroupMan);
e->setFileAllocationMan
(SharedHandle<FileAllocationMan>(new FileAllocationMan()));

View File

@ -240,7 +240,9 @@ SRCS = SocketCore.cc SocketCore.h\
Notifier.cc Notifier.h\
ValueBaseDiskWriter.h\
AnonDiskWriterFactory.h\
XmlRpcRequestParserController.cc XmlRpcRequestParserController.h
XmlRpcRequestParserController.cc XmlRpcRequestParserController.h\
WrDiskCache.cc WrDiskCache.h\
WrDiskCacheEntry.cc WrDiskCacheEntry.h
if MINGW_BUILD
SRCS += WinConsoleFile.cc WinConsoleFile.h

View File

@ -204,6 +204,15 @@ std::vector<OptionHandler*> OptionHandlerFactory::createOptionHandlers()
op->addTag(TAG_ADVANCED);
handlers.push_back(op);
}
{
OptionHandler* op(new UnitNumberOptionHandler
(PREF_DISK_CACHE,
TEXT_DISK_CACHE,
"0",
0));
op->addTag(TAG_ADVANCED);
handlers.push_back(op);
}
{
OptionHandler* op(new BooleanOptionHandler
(PREF_DEFERRED_INPUT,

View File

@ -38,6 +38,11 @@
#include "A2STR.h"
#include "util.h"
#include "a2functional.h"
#include "WrDiskCache.h"
#include "WrDiskCacheEntry.h"
#include "LogFactory.h"
#include "fmt.h"
#include "DiskAdaptor.h"
#ifdef ENABLE_MESSAGE_DIGEST
# include "MessageDigest.h"
#endif // ENABLE_MESSAGE_DIGEST
@ -45,7 +50,7 @@
namespace aria2 {
Piece::Piece():index_(0), length_(0), blockLength_(BLOCK_LENGTH), bitfield_(0),
usedBySegment_(false)
usedBySegment_(false), wrCache_(0)
#ifdef ENABLE_MESSAGE_DIGEST
, nextBegin_(0)
#endif // ENABLE_MESSAGE_DIGEST
@ -56,7 +61,7 @@ Piece::Piece(size_t index, int32_t length, int32_t blockLength)
length_(length),
blockLength_(blockLength),
bitfield_(new BitfieldMan(blockLength_, length)),
usedBySegment_(false)
usedBySegment_(false), wrCache_(0)
#ifdef ENABLE_MESSAGE_DIGEST
,nextBegin_(0)
#endif // ENABLE_MESSAGE_DIGEST
@ -64,6 +69,7 @@ Piece::Piece(size_t index, int32_t length, int32_t blockLength)
Piece::~Piece()
{
delete wrCache_;
delete bitfield_;
}
@ -232,6 +238,56 @@ std::string Piece::getDigest()
}
}
namespace {
void updateHashWithRead(const SharedHandle<MessageDigest>& mdctx,
const SharedHandle<DiskAdaptor>& adaptor,
int64_t offset, size_t len)
{
const size_t BUFSIZE = 4096;
unsigned char buf[BUFSIZE];
ldiv_t res = ldiv(len, BUFSIZE);
for(int j = 0; j < res.quot; ++j) {
ssize_t nread = adaptor->readData(buf, BUFSIZE, offset);
if((size_t)nread != BUFSIZE) {
throw DL_ABORT_EX(fmt(EX_FILE_READ, "n/a", "data is too short"));
}
mdctx->update(buf, nread);
offset += nread;
}
if(res.rem) {
ssize_t nread = adaptor->readData(buf, res.rem, offset);
if(nread != res.rem) {
throw DL_ABORT_EX(fmt(EX_FILE_READ, "n/a", "data is too short"));
}
mdctx->update(buf, nread);
offset += nread;
}
}
} // namespace
std::string Piece::getDigestWithWrCache
(size_t pieceLength, const SharedHandle<DiskAdaptor>& adaptor)
{
SharedHandle<MessageDigest> mdctx(MessageDigest::create(hashType_));
int64_t start = static_cast<int64_t>(index_)*pieceLength;
int64_t goff = start;
if(wrCache_) {
const WrDiskCacheEntry::DataCellSet& dataSet = wrCache_->getDataSet();
for(WrDiskCacheEntry::DataCellSet::iterator i = dataSet.begin(),
eoi = dataSet.end(); i != eoi; ++i) {
if(goff < (*i)->goff) {
updateHashWithRead(mdctx, adaptor, goff, (*i)->goff - goff);
}
mdctx->update((*i)->data+(*i)->offset, (*i)->len);
goff = (*i)->goff + (*i)->len;
}
updateHashWithRead(mdctx, adaptor, goff, start+length_-goff);
} else {
updateHashWithRead(mdctx, adaptor, goff, length_);
}
return mdctx->digest();
}
void Piece::destroyHashContext()
{
mdctx_.reset();
@ -257,4 +313,55 @@ void Piece::removeUser(cuid_t cuid)
users_.erase(std::remove(users_.begin(), users_.end(), cuid), users_.end());
}
void Piece::initWrCache(WrDiskCache* diskCache,
const SharedHandle<DiskAdaptor>& diskAdaptor)
{
assert(wrCache_ == 0);
wrCache_ = new WrDiskCacheEntry(diskAdaptor);
bool rv = diskCache->add(wrCache_);
assert(rv);
}
void Piece::flushWrCache(WrDiskCache* diskCache)
{
assert(wrCache_);
ssize_t size = static_cast<ssize_t>(wrCache_->getSize());
diskCache->update(wrCache_, -size);
wrCache_->writeToDisk();
}
void Piece::clearWrCache(WrDiskCache* diskCache)
{
assert(wrCache_);
ssize_t size = static_cast<ssize_t>(wrCache_->getSize());
diskCache->update(wrCache_, -size);
wrCache_->clear();
}
void Piece::updateWrCache(WrDiskCache* diskCache, unsigned char* data,
size_t offset, size_t len, int64_t goff)
{
A2_LOG_DEBUG(fmt("updateWrCache entry=%p", wrCache_));
assert(wrCache_);
WrDiskCacheEntry::DataCell* cell = new WrDiskCacheEntry::DataCell();
cell->goff = goff;
cell->data = data;
cell->offset = offset;
cell->len = len;
bool rv;
rv = wrCache_->cacheData(cell);
assert(rv);
rv = diskCache->update(wrCache_, len);
assert(rv);
}
void Piece::releaseWrCache(WrDiskCache* diskCache)
{
if(wrCache_) {
diskCache->remove(wrCache_);
delete wrCache_;
wrCache_ = 0;
}
}
} // namespace aria2

View File

@ -47,6 +47,9 @@
namespace aria2 {
class BitfieldMan;
class WrDiskCache;
class WrDiskCacheEntry;
class DiskAdaptor;
#ifdef ENABLE_MESSAGE_DIGEST
@ -62,6 +65,7 @@ private:
BitfieldMan* bitfield_;
std::vector<cuid_t> users_;
bool usedBySegment_;
WrDiskCacheEntry* wrCache_;
#ifdef ENABLE_MESSAGE_DIGEST
int32_t nextBegin_;
@ -172,6 +176,10 @@ public:
void destroyHashContext();
// Returns raw hash value, not hex digest, which is calculated using
// cached data and data on disk.
std::string getDigestWithWrCache(size_t pieceLength,
const SharedHandle<DiskAdaptor>& adaptor);
#endif // ENABLE_MESSAGE_DIGEST
/**
@ -194,6 +202,18 @@ public:
{
usedBySegment_ = f;
}
void initWrCache(WrDiskCache* diskCache,
const SharedHandle<DiskAdaptor>& diskAdaptor);
void flushWrCache(WrDiskCache* diskCache);
void clearWrCache(WrDiskCache* diskCache);
void updateWrCache(WrDiskCache* diskCache, unsigned char* data,
size_t offset, size_t len, int64_t goff);
void releaseWrCache(WrDiskCache* diskCache);
WrDiskCacheEntry* getWrDiskCacheEntry() const
{
return wrCache_;
}
};
} // namespace aria2

View File

@ -51,6 +51,7 @@ class Piece;
class Peer;
#endif // ENABLE_BITTORRENT
class DiskAdaptor;
class WrDiskCache;
class PieceStorage {
public:
@ -228,6 +229,11 @@ public:
virtual SharedHandle<DiskAdaptor> getDiskAdaptor() = 0;
virtual WrDiskCache* getWrDiskCache() = 0;
// Flushes write disk cache for in-flight piece and evicts them.
virtual void flushWrDiskCacheEntry() = 0;
virtual int32_t getPieceLength(size_t index) = 0;
/**

View File

@ -210,6 +210,7 @@ error_code::Value RequestGroup::downloadResult() const
void RequestGroup::closeFile()
{
if(pieceStorage_) {
pieceStorage_->flushWrDiskCacheEntry();
pieceStorage_->getDiskAdaptor()->closeFile();
}
}
@ -621,6 +622,9 @@ void RequestGroup::initPieceStorage()
new DefaultPieceStorage(downloadContext_, option_.get());
SharedHandle<PieceStorage> psHolder(ps);
#endif // !ENABLE_BITTORRENT
if(requestGroupMan_) {
ps->setWrDiskCache(requestGroupMan_->getWrDiskCache());
}
if(diskWriterFactory_) {
ps->setDiskWriterFactory(diskWriterFactory_);
}

View File

@ -78,6 +78,7 @@
#include "SingletonHolder.h"
#include "Notifier.h"
#include "PeerStat.h"
#include "WrDiskCache.h"
#ifdef ENABLE_BITTORRENT
# include "bittorrent_helper.h"
#endif // ENABLE_BITTORRENT
@ -100,12 +101,16 @@ RequestGroupMan::RequestGroupMan
queueCheck_(true),
removedErrorResult_(0),
removedLastErrorResult_(error_code::FINISHED),
maxDownloadResult_(option->getAsInt(PREF_MAX_DOWNLOAD_RESULT))
maxDownloadResult_(option->getAsInt(PREF_MAX_DOWNLOAD_RESULT)),
wrDiskCache_(0)
{
addRequestGroupIndex(requestGroups);
}
RequestGroupMan::~RequestGroupMan() {}
RequestGroupMan::~RequestGroupMan()
{
delete wrDiskCache_;
}
bool RequestGroupMan::downloadFinished()
{
@ -620,8 +625,8 @@ void RequestGroupMan::fillRequestGroupFromReserver(DownloadEngine* e)
// reference.
groupToAdd->dropPieceStorage();
configureRequestGroup(groupToAdd);
createInitialCommand(groupToAdd, commands, e);
groupToAdd->setRequestGroupMan(this);
createInitialCommand(groupToAdd, commands, e);
if(commands.empty()) {
requestQueueCheck();
}
@ -1087,4 +1092,13 @@ void RequestGroupMan::setUriListParser
uriListParser_ = uriListParser;
}
void RequestGroupMan::initWrDiskCache()
{
assert(wrDiskCache_ == 0);
size_t limit = option_->getAsInt(PREF_DISK_CACHE);
if(limit > 0) {
wrDiskCache_ = new WrDiskCache(limit);
}
}
} // namespace aria2

View File

@ -58,6 +58,7 @@ class ServerStat;
class Option;
class OutputFile;
class UriListParser;
class WrDiskCache;
class RequestGroupMan {
private:
@ -95,6 +96,8 @@ private:
// UriListParser for deferred input.
SharedHandle<UriListParser> uriListParser_;
WrDiskCache* wrDiskCache_;
void formatDownloadResultFull
(OutputFile& out,
const char* status,
@ -341,6 +344,15 @@ public:
{
return netStat_;
}
WrDiskCache* getWrDiskCache() const
{
return wrDiskCache_;
}
// Initializes WrDiskCache according to PREF_DISK_CACHE option. If
// its value is 0, cache storage will not be initialized.
void initWrDiskCache();
};
} // namespace aria2

View File

@ -33,14 +33,20 @@
*/
/* copyright --> */
#include "SinkStreamFilter.h"
#include <cstring>
#include "BinaryStream.h"
#include "Segment.h"
#include "WrDiskCache.h"
#include "Piece.h"
namespace aria2 {
const std::string SinkStreamFilter::NAME("SinkStreamFilter");
SinkStreamFilter::SinkStreamFilter(bool hashUpdate):
SinkStreamFilter::SinkStreamFilter(WrDiskCache* wrDiskCache, bool hashUpdate):
wrDiskCache_(wrDiskCache),
hashUpdate_(hashUpdate),
bytesProcessed_(0) {}
@ -60,7 +66,16 @@ ssize_t SinkStreamFilter::transform
} else {
wlen = inlen;
}
out->writeData(inbuf, wlen, segment->getPositionToWrite());
const SharedHandle<Piece>& piece = segment->getPiece();
if(piece && piece->getWrDiskCacheEntry()) {
assert(wrDiskCache_);
unsigned char* dataCopy = new unsigned char[wlen];
memcpy(dataCopy, inbuf, wlen);
piece->updateWrCache(wrDiskCache_, dataCopy, 0, wlen,
segment->getPositionToWrite());
} else {
out->writeData(inbuf, wlen, segment->getPositionToWrite());
}
#ifdef ENABLE_MESSAGE_DIGEST
if(hashUpdate_) {
segment->updateHash(segment->getWrittenLength(), inbuf, wlen);

View File

@ -39,13 +39,15 @@
namespace aria2 {
class WrDiskCache;
class SinkStreamFilter:public StreamFilter {
private:
WrDiskCache* wrDiskCache_;
bool hashUpdate_;
size_t bytesProcessed_;
public:
SinkStreamFilter(bool hashUpdate = false);
SinkStreamFilter(WrDiskCache* wrDiskCache = 0, bool hashUpdate = false);
virtual void init() {}

View File

@ -231,6 +231,10 @@ public:
virtual SharedHandle<DiskAdaptor> getDiskAdaptor();
virtual WrDiskCache* getWrDiskCache() { return 0; }
virtual void flushWrDiskCacheEntry() {}
virtual int32_t getPieceLength(size_t index);
/**

129
src/WrDiskCache.cc Normal file
View File

@ -0,0 +1,129 @@
/* <!-- copyright */
/*
* aria2 - The high speed download utility
*
* Copyright (C) 2012 Tatsuhiro Tsujikawa
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 2 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
*
* In addition, as a special exception, the copyright holders give
* permission to link the code of portions of this program with the
* OpenSSL library under certain conditions as described in each
* individual source file, and distribute linked combinations
* including the two.
* You must obey the GNU General Public License in all respects
* for all of the code used other than OpenSSL. If you modify
* file(s) with this exception, you may extend this exception to your
* version of the file(s), but you are not obligated to do so. If you
* do not wish to do so, delete this exception statement from your
* version. If you delete this exception statement from all source
* files in the program, then also delete it here.
*/
/* copyright --> */
#include "WrDiskCache.h"
#include "WrDiskCacheEntry.h"
#include "LogFactory.h"
#include "fmt.h"
namespace aria2 {
WrDiskCache::WrDiskCache(size_t limit)
: limit_(limit),
total_(0),
clock_(0)
{}
WrDiskCache::~WrDiskCache()
{
if(total_) {
A2_LOG_WARN(fmt("Write disk cache is not empty size=%lu",
static_cast<unsigned long>(total_)));
}
}
bool WrDiskCache::add(WrDiskCacheEntry* ent)
{
ent->setSizeKey(ent->getSize());
ent->setLastUpdate(++clock_);
std::pair<EntrySet::iterator, bool> rv = set_.insert(ent);
if(rv.second) {
total_ += ent->getSize();
ensureLimit();
return true;
} else {
A2_LOG_WARN(fmt("Found duplicate cache entry a.{size=%lu,clock=%"PRId64
"} b{size=%lu,clock=%"PRId64"}",
static_cast<unsigned long>((*rv.first)->getSize()),
(*rv.first)->getLastUpdate(),
static_cast<unsigned long>(ent->getSize()),
ent->getLastUpdate()));
return false;
}
}
bool WrDiskCache::remove(WrDiskCacheEntry* ent)
{
if(set_.erase(ent)) {
A2_LOG_DEBUG(fmt("Removed cache entry size=%lu, clock=%"PRId64,
static_cast<unsigned long>(ent->getSize()),
ent->getLastUpdate()));
total_ -= ent->getSize();
return true;
} else {
return false;
}
}
bool WrDiskCache::update(WrDiskCacheEntry* ent, ssize_t delta)
{
if(!set_.erase(ent)) {
return false;
}
A2_LOG_DEBUG(fmt("Update cache entry size=%lu, delta=%ld, clock=%"PRId64,
static_cast<unsigned long>(ent->getSize()),
static_cast<long>(delta),
ent->getLastUpdate()));
ent->setSizeKey(ent->getSize());
ent->setLastUpdate(++clock_);
set_.insert(ent);
if(delta < 0) {
assert(total_ >= static_cast<size_t>(-delta));
}
total_ += delta;
ensureLimit();
return true;
}
void WrDiskCache::ensureLimit()
{
while(total_ > limit_) {
EntrySet::iterator i = set_.begin();
total_ -= (*i)->getSize();
(*i)->writeToDisk();
WrDiskCacheEntry* ent = *i;
A2_LOG_DEBUG(fmt("Force flush cache entry size=%lu, clock=%"PRId64,
static_cast<unsigned long>(ent->getSizeKey()),
ent->getLastUpdate()));
set_.erase(i);
ent->setSizeKey(ent->getSize());
ent->setLastUpdate(++clock_);
set_.insert(ent);
}
}
} // namespace aria2

83
src/WrDiskCache.h Normal file
View File

@ -0,0 +1,83 @@
/* <!-- copyright */
/*
* aria2 - The high speed download utility
*
* Copyright (C) 2012 Tatsuhiro Tsujikawa
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 2 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
*
* In addition, as a special exception, the copyright holders give
* permission to link the code of portions of this program with the
* OpenSSL library under certain conditions as described in each
* individual source file, and distribute linked combinations
* including the two.
* You must obey the GNU General Public License in all respects
* for all of the code used other than OpenSSL. If you modify
* file(s) with this exception, you may extend this exception to your
* version of the file(s), but you are not obligated to do so. If you
* do not wish to do so, delete this exception statement from your
* version. If you delete this exception statement from all source
* files in the program, then also delete it here.
*/
/* copyright --> */
#ifndef D_WR_DISK_CACHE_H
#define D_WR_DISK_CACHE_H
#include "common.h"
#include <set>
#include "a2functional.h"
namespace aria2 {
class WrDiskCacheEntry;
class WrDiskCache {
public:
WrDiskCache(size_t limit);
~WrDiskCache();
// Adds the cache entry |ent| to the storage. The size of cached
// data of ent is added to total_.
bool add(WrDiskCacheEntry* ent);
// Removes the cache entry |ent| from the stroage. The size of
// cached data of ent is subtracted from total_.
bool remove(WrDiskCacheEntry* ent);
// Updates the already added entry |ent|. The |delta| means how many
// bytes is increased in this update. If the size is reduced, use
// negative value.
bool update(WrDiskCacheEntry* ent, ssize_t delta);
// Evicts entries from storage so that total size of cache is kept
// under the limit.
void ensureLimit();
size_t getSize() const
{
return total_;
}
private:
typedef std::set<WrDiskCacheEntry*,
DerefLess<WrDiskCacheEntry*> > EntrySet;
// Maximum number of bytes the storage can cache.
size_t limit_;
// Current number of bytes cached.
size_t total_;
EntrySet set_;
int64_t clock_;
};
} // namespace aria2
#endif // D_WR_DISK_CACHE_H

110
src/WrDiskCacheEntry.cc Normal file
View File

@ -0,0 +1,110 @@
/* <!-- copyright */
/*
* aria2 - The high speed download utility
*
* Copyright (C) 2012 Tatsuhiro Tsujikawa
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 2 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
*
* In addition, as a special exception, the copyright holders give
* permission to link the code of portions of this program with the
* OpenSSL library under certain conditions as described in each
* individual source file, and distribute linked combinations
* including the two.
* You must obey the GNU General Public License in all respects
* for all of the code used other than OpenSSL. If you modify
* file(s) with this exception, you may extend this exception to your
* version of the file(s), but you are not obligated to do so. If you
* do not wish to do so, delete this exception statement from your
* version. If you delete this exception statement from all source
* files in the program, then also delete it here.
*/
/* copyright --> */
#include "WrDiskCacheEntry.h"
#include "DiskAdaptor.h"
#include "RecoverableException.h"
#include "DownloadFailureException.h"
#include "LogFactory.h"
#include "fmt.h"
namespace aria2 {
WrDiskCacheEntry::WrDiskCacheEntry
(const SharedHandle<DiskAdaptor>& diskAdaptor)
: sizeKey_(0),
lastUpdate_(0),
size_(0),
error_(CACHE_ERR_SUCCESS),
errorCode_(error_code::UNDEFINED),
diskAdaptor_(diskAdaptor)
{}
WrDiskCacheEntry::~WrDiskCacheEntry()
{
if(!set_.empty()) {
A2_LOG_WARN(fmt("WrDiskCacheEntry is not empty size=%lu",
static_cast<unsigned long>(size_)));
}
deleteDataCells();
}
void WrDiskCacheEntry::deleteDataCells()
{
for(DataCellSet::iterator i = set_.begin(), eoi = set_.end(); i != eoi;
++i) {
delete [] (*i)->data;
delete *i;
}
set_.clear();
size_ = 0;
}
void WrDiskCacheEntry::writeToDisk()
{
DataCellSet::iterator i = set_.begin(), eoi = set_.end();
try {
for(; i != eoi; ++i) {
A2_LOG_DEBUG(fmt("WrDiskCacheEntry flush goff=%"PRId64", len=%lu",
(*i)->goff, static_cast<unsigned long>((*i)->len)));
diskAdaptor_->writeData((*i)->data+(*i)->offset, (*i)->len,
(*i)->goff);
}
} catch(RecoverableException& e) {
A2_LOG_ERROR(fmt("WrDiskCacheEntry flush error goff=%"PRId64", len=%lu",
(*i)->goff, static_cast<unsigned long>((*i)->len)));
error_ = CACHE_ERR_ERROR;
errorCode_ = e.getErrorCode();
}
deleteDataCells();
}
void WrDiskCacheEntry::clear()
{
deleteDataCells();
}
bool WrDiskCacheEntry::cacheData(DataCell* dataCell)
{
A2_LOG_DEBUG(fmt("WrDiskCacheEntry cache goff=%"PRId64", len=%lu",
dataCell->goff, static_cast<unsigned long>(dataCell->len)));
if(set_.insert(dataCell).second) {
size_ += dataCell->len;
return true;
} else {
return false;
}
}
} // namespace aria2

142
src/WrDiskCacheEntry.h Normal file
View File

@ -0,0 +1,142 @@
/* <!-- copyright */
/*
* aria2 - The high speed download utility
*
* Copyright (C) 2012 Tatsuhiro Tsujikawa
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 2 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
*
* In addition, as a special exception, the copyright holders give
* permission to link the code of portions of this program with the
* OpenSSL library under certain conditions as described in each
* individual source file, and distribute linked combinations
* including the two.
* You must obey the GNU General Public License in all respects
* for all of the code used other than OpenSSL. If you modify
* file(s) with this exception, you may extend this exception to your
* version of the file(s), but you are not obligated to do so. If you
* do not wish to do so, delete this exception statement from your
* version. If you delete this exception statement from all source
* files in the program, then also delete it here.
*/
/* copyright --> */
#ifndef D_WR_DISK_CACHE_ENTRY_H
#define D_WR_DISK_CACHE_ENTRY_H
#include "common.h"
#include <set>
#include "SharedHandle.h"
#include "a2functional.h"
#include "error_code.h"
namespace aria2 {
class DiskAdaptor;
class WrDiskCache;
class WrDiskCacheEntry {
public:
struct DataCell {
// Where the data is going to be put in DiskAdaptor
int64_t goff;
// data must be len+offset bytes. Thus, the cached data is
// [data+offset, data+offset+len).
unsigned char *data;
size_t offset;
size_t len;
bool operator<(const DataCell& rhs) const
{
return goff < rhs.goff;
}
};
typedef std::set<DataCell*, DerefLess<DataCell*> > DataCellSet;
WrDiskCacheEntry(const SharedHandle<DiskAdaptor>& diskAdaptor);
~WrDiskCacheEntry();
// Flushes the cached data to the disk and deletes them.
void writeToDisk();
// Deletes cached data without flushing to the disk.
void clear();
// Caches |dataCell|
bool cacheData(DataCell* dataCell);
size_t getSize() const
{
return size_;
}
void setSizeKey(size_t sizeKey)
{
sizeKey_ = sizeKey;
}
size_t getSizeKey() const
{
return sizeKey_;
}
void setLastUpdate(int64_t clock)
{
lastUpdate_ = clock;
}
int64_t getLastUpdate() const
{
return lastUpdate_;
}
bool operator<(const WrDiskCacheEntry& rhs) const
{
return sizeKey_ > rhs.sizeKey_ ||
(sizeKey_ == rhs.sizeKey_ && lastUpdate_ < rhs.lastUpdate_);
}
enum {
CACHE_ERR_SUCCESS,
CACHE_ERR_ERROR
};
int getError() const
{
return error_;
}
error_code::Value getErrorCode() const
{
return errorCode_;
}
const DataCellSet& getDataSet() const
{
return set_;
}
private:
void deleteDataCells();
size_t sizeKey_;
int64_t lastUpdate_;
size_t size_;
DataCellSet set_;
int error_;
error_code::Value errorCode_;
SharedHandle<DiskAdaptor> diskAdaptor_;
};
} // namespace aria2
#endif // D_WR_DISK_CACHE_ENTRY_H

View File

@ -338,6 +338,8 @@ const Pref* PREF_STOP_WITH_PROCESS = makePref("stop-with-process");
const Pref* PREF_ENABLE_MMAP = makePref("enable-mmap");
// value: true | false
const Pref* PREF_FORCE_SAVE = makePref("force-save");
// value: 1*digit
const Pref* PREF_DISK_CACHE = makePref("disk-cache");
/**
* FTP related preferences

View File

@ -281,6 +281,8 @@ extern const Pref* PREF_STOP_WITH_PROCESS;
extern const Pref* PREF_ENABLE_MMAP;
// value: true | false
extern const Pref* PREF_FORCE_SAVE;
// value: 1*digit
extern const Pref* PREF_DISK_CACHE;
/**
* FTP related preferences

View File

@ -914,3 +914,15 @@
" if the download is completed or removed. This\n" \
" may be useful to save BitTorrent seeding which\n" \
" is recognized as completed state.")
#define TEXT_DISK_CACHE \
_(" --disk-cache=SIZE Enable disk cache. If SIZE is 0, the disk cache\n" \
" is disabled. This feature caches the downloaded\n" \
" data in memory, which grows to at most SIZE\n" \
" bytes. The cache storage is created for aria2\n" \
" instance and shared by all downloads. The one\n" \
" advantage of the disk cache is reduce the disk\n" \
" seek time because the data is written in larger\n" \
" unit and it is reordered by the offset of the\n" \
" file. If the underlying file is heavily\n" \
" fragmented it is not the case.\n" \
" SIZE can include K or M(1K = 1024, 1M = 1024K).")

View File

@ -85,7 +85,9 @@ aria2c_SOURCES = AllTest.cc\
ParamedStringTest.cc\
RpcHelperTest.cc\
AbstractCommandTest.cc\
SinkStreamFilterTest.cc
SinkStreamFilterTest.cc\
WrDiskCacheTest.cc\
WrDiskCacheEntryTest.cc
if ENABLE_XML_RPC
aria2c_SOURCES += XmlRpcRequestParserControllerTest.cc

View File

@ -226,6 +226,12 @@ public:
return diskAdaptor;
}
virtual WrDiskCache* getWrDiskCache() {
return 0;
}
virtual void flushWrDiskCacheEntry() {}
void setDiskAdaptor(const SharedHandle<DiskAdaptor>& adaptor) {
this->diskAdaptor = adaptor;
}

View File

@ -5,6 +5,9 @@
#include <cppunit/extensions/HelperMacros.h>
#include "util.h"
#include "DirectDiskAdaptor.h"
#include "ByteArrayDiskWriter.h"
#include "WrDiskCache.h"
namespace aria2 {
@ -13,24 +16,33 @@ class PieceTest:public CppUnit::TestFixture {
CPPUNIT_TEST_SUITE(PieceTest);
CPPUNIT_TEST(testCompleteBlock);
CPPUNIT_TEST(testGetCompletedLength);
CPPUNIT_TEST(testFlushWrCache);
#ifdef ENABLE_MESSAGE_DIGEST
CPPUNIT_TEST(testGetDigestWithWrCache);
CPPUNIT_TEST(testUpdateHash);
#endif // ENABLE_MESSAGE_DIGEST
CPPUNIT_TEST_SUITE_END();
private:
SharedHandle<DirectDiskAdaptor> adaptor_;
SharedHandle<ByteArrayDiskWriter> writer_;
public:
void setUp() {}
void setUp()
{
adaptor_.reset(new DirectDiskAdaptor());
writer_.reset(new ByteArrayDiskWriter());
adaptor_->setDiskWriter(writer_);
}
void testCompleteBlock();
void testGetCompletedLength();
void testFlushWrCache();
#ifdef ENABLE_MESSAGE_DIGEST
void testGetDigestWithWrCache();
void testUpdateHash();
#endif // ENABLE_MESSAGE_DIGEST
@ -62,8 +74,58 @@ void PieceTest::testGetCompletedLength()
CPPUNIT_ASSERT_EQUAL(blockLength*3+100, p.getCompletedLength());
}
void PieceTest::testFlushWrCache()
{
unsigned char* data;
Piece p(0, 1024);
WrDiskCache dc(64);
p.initWrCache(&dc, adaptor_);
data = new unsigned char[3];
memcpy(data, "foo", 3);
p.updateWrCache(&dc, data, 0, 3, 0);
data = new unsigned char[4];
memcpy(data, " bar", 4);
p.updateWrCache(&dc, data, 0, 4, 3);
p.flushWrCache(&dc);
CPPUNIT_ASSERT_EQUAL(std::string("foo bar"), writer_->getString());
data = new unsigned char[3];
memcpy(data, "foo", 3);
p.updateWrCache(&dc, data, 0, 3, 0);
CPPUNIT_ASSERT_EQUAL((size_t)3, dc.getSize());
p.clearWrCache(&dc);
CPPUNIT_ASSERT_EQUAL((size_t)0, dc.getSize());
p.releaseWrCache(&dc);
CPPUNIT_ASSERT(!p.getWrDiskCacheEntry());
}
#ifdef ENABLE_MESSAGE_DIGEST
void PieceTest::testGetDigestWithWrCache()
{
unsigned char* data;
Piece p(0, 26);
p.setHashType("sha-1");
WrDiskCache dc(64);
// 012345678901234567890123456
writer_->setString("abcde...ijklmnopq...uvwx.z");
p.initWrCache(&dc, adaptor_);
data = new unsigned char[3];
memcpy(data, "fgh", 3);
p.updateWrCache(&dc, data, 0, 3, 5);
data = new unsigned char[3];
memcpy(data, "rst", 3);
p.updateWrCache(&dc, data, 0, 3, 17);
data = new unsigned char[1];
memcpy(data, "y", 1);
p.updateWrCache(&dc, data, 0, 1, 24);
CPPUNIT_ASSERT_EQUAL
(std::string("32d10c7b8cf96570ca04ce37f2a19d84240d3a89"),
util::toHex(p.getDigestWithWrCache(p.getLength(), adaptor_)));
}
void PieceTest::testUpdateHash()
{
Piece p(0, 16, 2*1024*1024);

View File

@ -54,9 +54,7 @@ public:
void setUp()
{
writer_.reset(new ByteArrayDiskWriter());
sinkFilter_.reset(new SinkStreamFilter());
filter_.reset(new SinkStreamFilter(sinkFilter_));
sinkFilter_->init();
filter_.reset(new SinkStreamFilter());
filter_->init();
segment_.reset(new MockSegment2(16));
}

View File

@ -91,4 +91,18 @@ std::string fileHexDigest
}
#endif // ENABLE_MESSAGE_DIGEST
WrDiskCacheEntry::DataCell* createDataCell(int64_t goff,
const char* data,
size_t offset)
{
WrDiskCacheEntry::DataCell* cell = new WrDiskCacheEntry::DataCell();
cell->goff = goff;
size_t len = strlen(data);
cell->data = new unsigned char[len];
memcpy(cell->data, data, len);
cell->offset = offset;
cell->len = len;
return cell;
}
} // namespace aria2

View File

@ -4,6 +4,7 @@
#include "SharedHandle.h"
#include "Cookie.h"
#include "WrDiskCacheEntry.h"
namespace aria2 {
@ -50,4 +51,8 @@ std::string fileHexDigest
(const SharedHandle<MessageDigest>& ctx, const std::string& filename);
#endif // ENABLE_MESSAGE_DIGEST
WrDiskCacheEntry::DataCell* createDataCell(int64_t goff,
const char* data,
size_t offset = 0);
} // namespace aria2

View File

@ -0,0 +1,55 @@
#include "WrDiskCacheEntry.h"
#include <cstring>
#include <cppunit/extensions/HelperMacros.h>
#include "TestUtil.h"
#include "DirectDiskAdaptor.h"
#include "ByteArrayDiskWriter.h"
namespace aria2 {
class WrDiskCacheEntryTest:public CppUnit::TestFixture {
CPPUNIT_TEST_SUITE(WrDiskCacheEntryTest);
CPPUNIT_TEST(testWriteToDisk);
CPPUNIT_TEST(testClear);
CPPUNIT_TEST_SUITE_END();
SharedHandle<DirectDiskAdaptor> adaptor_;
SharedHandle<ByteArrayDiskWriter> writer_;
public:
void setUp()
{
adaptor_.reset(new DirectDiskAdaptor());
writer_.reset(new ByteArrayDiskWriter());
adaptor_->setDiskWriter(writer_);
}
void testWriteToDisk();
void testClear();
};
CPPUNIT_TEST_SUITE_REGISTRATION( WrDiskCacheEntryTest );
void WrDiskCacheEntryTest::testWriteToDisk()
{
WrDiskCacheEntry e(adaptor_);
e.cacheData(createDataCell(0, "??01234567", 2));
e.cacheData(createDataCell(8, "890"));
e.writeToDisk();
CPPUNIT_ASSERT_EQUAL((size_t)0, e.getSize());
CPPUNIT_ASSERT_EQUAL(std::string("01234567890"), writer_->getString());
}
void WrDiskCacheEntryTest::testClear()
{
WrDiskCacheEntry e(adaptor_);
e.cacheData(createDataCell(0, "foo"));
e.clear();
CPPUNIT_ASSERT_EQUAL((size_t)0, e.getSize());
CPPUNIT_ASSERT_EQUAL(std::string(), writer_->getString());
}
} // namespace aria2

74
test/WrDiskCacheTest.cc Normal file
View File

@ -0,0 +1,74 @@
#include "WrDiskCache.h"
#include <cstring>
#include <cppunit/extensions/HelperMacros.h>
#include "TestUtil.h"
#include "DirectDiskAdaptor.h"
#include "ByteArrayDiskWriter.h"
namespace aria2 {
class WrDiskCacheTest:public CppUnit::TestFixture {
CPPUNIT_TEST_SUITE(WrDiskCacheTest);
CPPUNIT_TEST(testAdd);
CPPUNIT_TEST_SUITE_END();
SharedHandle<DirectDiskAdaptor> adaptor_;
SharedHandle<ByteArrayDiskWriter> writer_;
public:
void setUp()
{
adaptor_.reset(new DirectDiskAdaptor());
writer_.reset(new ByteArrayDiskWriter());
adaptor_->setDiskWriter(writer_);
}
void testAdd();
};
CPPUNIT_TEST_SUITE_REGISTRATION( WrDiskCacheTest );
void WrDiskCacheTest::testAdd()
{
WrDiskCache dc(20);
CPPUNIT_ASSERT_EQUAL((size_t)0, dc.getSize());
WrDiskCacheEntry e1(adaptor_);
e1.cacheData(createDataCell(0, "who knows?"));
CPPUNIT_ASSERT(dc.add(&e1));
CPPUNIT_ASSERT_EQUAL((size_t)10, dc.getSize());
WrDiskCacheEntry e2(adaptor_);
e2.cacheData(createDataCell(21, "seconddata"));
CPPUNIT_ASSERT(dc.add(&e2));
CPPUNIT_ASSERT_EQUAL((size_t)20, dc.getSize());
WrDiskCacheEntry e3(adaptor_);
e3.cacheData(createDataCell(10, "hello"));
CPPUNIT_ASSERT(dc.add(&e3));
CPPUNIT_ASSERT_EQUAL((size_t)15, dc.getSize());
// e1 is flushed to the disk
CPPUNIT_ASSERT_EQUAL(std::string("who knows?"), writer_->getString());
CPPUNIT_ASSERT_EQUAL((size_t)0, e1.getSize());
e3.cacheData(createDataCell(15, " world"));
CPPUNIT_ASSERT(dc.update(&e3, 6));
// e3 is flushed to the disk
CPPUNIT_ASSERT_EQUAL(std::string("who knows?hello world"),
writer_->getString());
CPPUNIT_ASSERT_EQUAL((size_t)0, e3.getSize());
CPPUNIT_ASSERT_EQUAL((size_t)10, dc.getSize());
e2.cacheData(createDataCell(31, "01234567890"));
CPPUNIT_ASSERT(dc.update(&e2, 11));
// e2 is flushed to the disk
CPPUNIT_ASSERT_EQUAL(std::string("who knows?hello worldseconddata01234567890"),
writer_->getString());
CPPUNIT_ASSERT_EQUAL((size_t)0, e2.getSize());
CPPUNIT_ASSERT_EQUAL((size_t)0, dc.getSize());
}
} // namespace aria2