diff --git a/src/BtPieceMessage.cc b/src/BtPieceMessage.cc index 108b39ce..92f8193b 100644 --- a/src/BtPieceMessage.cc +++ b/src/BtPieceMessage.cc @@ -128,7 +128,7 @@ void BtPieceMessage::doReceivedAction() unsigned char* dataCopy = new unsigned char[blockLength_]; memcpy(dataCopy, data_+9, blockLength_); piece->updateWrCache(getPieceStorage()->getWrDiskCache(), - dataCopy, 0, blockLength_, offset); + dataCopy, 0, blockLength_, blockLength_, offset); } else { getPieceStorage()->getDiskAdaptor()->writeData(data_+9, blockLength_, offset); diff --git a/src/Piece.cc b/src/Piece.cc index 1a667a0e..d2751da7 100644 --- a/src/Piece.cc +++ b/src/Piece.cc @@ -351,7 +351,8 @@ void Piece::clearWrCache(WrDiskCache* diskCache) } void Piece::updateWrCache(WrDiskCache* diskCache, unsigned char* data, - size_t offset, size_t len, int64_t goff) + size_t offset, size_t len, size_t capacity, + int64_t goff) { if(!diskCache) { return; @@ -363,6 +364,7 @@ void Piece::updateWrCache(WrDiskCache* diskCache, unsigned char* data, cell->data = data; cell->offset = offset; cell->len = len; + cell->capacity = capacity; bool rv; rv = wrCache_->cacheData(cell); assert(rv); @@ -370,6 +372,22 @@ void Piece::updateWrCache(WrDiskCache* diskCache, unsigned char* data, assert(rv); } +size_t Piece::appendWrCache(WrDiskCache* diskCache, int64_t goff, + const unsigned char* data, size_t len) +{ + if(!diskCache) { + return 0; + } + assert(wrCache_); + size_t delta = wrCache_->append(goff, data, len); + bool rv; + if(delta > 0) { + rv = diskCache->update(wrCache_, delta); + assert(rv); + } + return delta; +} + void Piece::releaseWrCache(WrDiskCache* diskCache) { if(diskCache && wrCache_) { diff --git a/src/Piece.h b/src/Piece.h index ed605e9b..ca3573ee 100644 --- a/src/Piece.h +++ b/src/Piece.h @@ -208,7 +208,14 @@ public: void flushWrCache(WrDiskCache* diskCache); void clearWrCache(WrDiskCache* diskCache); void updateWrCache(WrDiskCache* diskCache, unsigned char* data, - size_t offset, size_t len, int64_t goff); + size_t offset, size_t len, size_t capacity, int64_t goff); + void updateWrCache(WrDiskCache* diskCache, unsigned char* data, + size_t offset, size_t len, int64_t goff) + { + updateWrCache(diskCache, data, offset, len, len, goff); + } + size_t appendWrCache(WrDiskCache* diskCache, int64_t goff, + const unsigned char* data, size_t len); void releaseWrCache(WrDiskCache* diskCache); WrDiskCacheEntry* getWrDiskCacheEntry() const { diff --git a/src/SinkStreamFilter.cc b/src/SinkStreamFilter.cc index a5e4566b..e7f296ea 100644 --- a/src/SinkStreamFilter.cc +++ b/src/SinkStreamFilter.cc @@ -67,12 +67,23 @@ ssize_t SinkStreamFilter::transform wlen = inlen; } const SharedHandle& piece = segment->getPiece(); - if(piece && piece->getWrDiskCacheEntry()) { + if(piece->getWrDiskCacheEntry()) { assert(wrDiskCache_); - unsigned char* dataCopy = new unsigned char[wlen]; - memcpy(dataCopy, inbuf, wlen); - piece->updateWrCache(wrDiskCache_, dataCopy, 0, wlen, - segment->getPositionToWrite()); + // If we receive small data (e.g., 1 or 2 bytes), cache entry + // becomes a headache. To mitigate this problem, we allocate + // cache buffer at least 4KiB and append the data to the + // contagious cache data. + size_t alen = piece->appendWrCache(wrDiskCache_, + segment->getPositionToWrite(), + inbuf, wlen); + if(alen < wlen) { + size_t len = wlen - alen; + size_t capacity = std::max(len, static_cast(4096)); + unsigned char* dataCopy = new unsigned char[capacity]; + memcpy(dataCopy, inbuf + alen, len); + piece->updateWrCache(wrDiskCache_, dataCopy, 0, len, capacity, + segment->getPositionToWrite() + alen); + } } else { out->writeData(inbuf, wlen, segment->getPositionToWrite()); } diff --git a/src/WrDiskCacheEntry.cc b/src/WrDiskCacheEntry.cc index 53e30eb1..77b5a2f2 100644 --- a/src/WrDiskCacheEntry.cc +++ b/src/WrDiskCacheEntry.cc @@ -33,6 +33,9 @@ */ /* copyright --> */ #include "WrDiskCacheEntry.h" + +#include + #include "DiskAdaptor.h" #include "RecoverableException.h" #include "DownloadFailureException.h" @@ -101,4 +104,22 @@ bool WrDiskCacheEntry::cacheData(DataCell* dataCell) } } +size_t WrDiskCacheEntry::append(int64_t goff, const unsigned char *data, + size_t len) +{ + if(set_.empty()) { + return 0; + } + DataCellSet::iterator i = set_.end(); + --i; + if(static_cast((*i)->goff + (*i)->len) == goff) { + size_t wlen = std::min((*i)->capacity - (*i)->len, len); + memcpy((*i)->data + (*i)->offset + (*i)->len, data, wlen); + (*i)->len += wlen; + return wlen; + } else { + return 0; + } +} + } // namespace aria2 diff --git a/src/WrDiskCacheEntry.h b/src/WrDiskCacheEntry.h index b3b31f6b..cb0036ed 100644 --- a/src/WrDiskCacheEntry.h +++ b/src/WrDiskCacheEntry.h @@ -58,6 +58,8 @@ public: unsigned char *data; size_t offset; size_t len; + // valid memory range from data+offset + size_t capacity; bool operator<(const DataCell& rhs) const { return goff < rhs.goff; @@ -76,6 +78,11 @@ public: // Caches |dataCell| bool cacheData(DataCell* dataCell); + + // Appends into last dataCell in set_ if the region is + // contagious. Returns the number of copied bytes. + size_t append(int64_t goff, const unsigned char *data, size_t len); + size_t getSize() const { return size_; diff --git a/test/MockSegment.h b/test/MockSegment.h index dc6c1af4..42e29de1 100644 --- a/test/MockSegment.h +++ b/test/MockSegment.h @@ -71,7 +71,7 @@ public: virtual SharedHandle getPiece() const { - return SharedHandle(); + return SharedHandle(new Piece()); } }; diff --git a/test/PieceTest.cc b/test/PieceTest.cc index bc594b17..be8354ba 100644 --- a/test/PieceTest.cc +++ b/test/PieceTest.cc @@ -17,6 +17,7 @@ class PieceTest:public CppUnit::TestFixture { CPPUNIT_TEST(testCompleteBlock); CPPUNIT_TEST(testGetCompletedLength); CPPUNIT_TEST(testFlushWrCache); + CPPUNIT_TEST(testAppendWrCache); #ifdef ENABLE_MESSAGE_DIGEST CPPUNIT_TEST(testGetDigestWithWrCache); @@ -39,6 +40,7 @@ public: void testCompleteBlock(); void testGetCompletedLength(); void testFlushWrCache(); + void testAppendWrCache(); #ifdef ENABLE_MESSAGE_DIGEST @@ -100,6 +102,23 @@ void PieceTest::testFlushWrCache() CPPUNIT_ASSERT(!p.getWrDiskCacheEntry()); } +void PieceTest::testAppendWrCache() +{ + unsigned char* data; + Piece p(0, 1024); + WrDiskCache dc(1024); + p.initWrCache(&dc, adaptor_); + size_t capacity = 6; + data = new unsigned char[capacity]; + memcpy(data, "foo", 3); + p.updateWrCache(&dc, data, 0, 3, capacity, 0); + size_t alen = p.appendWrCache + (&dc, 3, reinterpret_cast("barbaz"), 6); + CPPUNIT_ASSERT_EQUAL((size_t)3, alen); + p.flushWrCache(&dc); + CPPUNIT_ASSERT_EQUAL(std::string("foobar"), writer_->getString()); +} + #ifdef ENABLE_MESSAGE_DIGEST void PieceTest::testGetDigestWithWrCache() diff --git a/test/TestUtil.cc b/test/TestUtil.cc index 829cb525..64621f25 100644 --- a/test/TestUtil.cc +++ b/test/TestUtil.cc @@ -101,7 +101,7 @@ WrDiskCacheEntry::DataCell* createDataCell(int64_t goff, cell->data = new unsigned char[len]; memcpy(cell->data, data, len); cell->offset = offset; - cell->len = len - offset; + cell->len = cell->capacity = len - offset; return cell; } diff --git a/test/WrDiskCacheEntryTest.cc b/test/WrDiskCacheEntryTest.cc index ad1c894b..d58dc1da 100644 --- a/test/WrDiskCacheEntryTest.cc +++ b/test/WrDiskCacheEntryTest.cc @@ -14,6 +14,7 @@ class WrDiskCacheEntryTest:public CppUnit::TestFixture { CPPUNIT_TEST_SUITE(WrDiskCacheEntryTest); CPPUNIT_TEST(testWriteToDisk); + CPPUNIT_TEST(testAppend); CPPUNIT_TEST(testClear); CPPUNIT_TEST_SUITE_END(); @@ -28,6 +29,7 @@ public: } void testWriteToDisk(); + void testAppend(); void testClear(); }; @@ -43,6 +45,27 @@ void WrDiskCacheEntryTest::testWriteToDisk() CPPUNIT_ASSERT_EQUAL(std::string("01234567890"), writer_->getString()); } +void WrDiskCacheEntryTest::testAppend() +{ + WrDiskCacheEntry e(adaptor_); + WrDiskCacheEntry::DataCell* cell = new WrDiskCacheEntry::DataCell(); + cell->goff = 0; + size_t capacity = 6; + size_t offset = 2; + cell->data = new unsigned char[offset+capacity]; + memcpy(cell->data, "??foo", 3); + cell->offset = offset; + cell->len = 3; + cell->capacity = capacity; + e.cacheData(cell); + CPPUNIT_ASSERT_EQUAL((size_t)3, + e.append(3, (const unsigned char*)"barbaz", 6)); + CPPUNIT_ASSERT_EQUAL((size_t)6, cell->len); + + CPPUNIT_ASSERT_EQUAL((size_t)0, + e.append(7, (const unsigned char*)"FOO", 3)); +} + void WrDiskCacheEntryTest::testClear() { WrDiskCacheEntry e(adaptor_);