/* */ #include "SinkStreamFilter.h" #include #include #include "BinaryStream.h" #include "Segment.h" #include "WrDiskCache.h" #include "Piece.h" namespace aria2 { const std::string SinkStreamFilter::NAME("SinkStreamFilter"); SinkStreamFilter::SinkStreamFilter(WrDiskCache* wrDiskCache, bool hashUpdate): wrDiskCache_(wrDiskCache), hashUpdate_(hashUpdate), bytesProcessed_(0) {} ssize_t SinkStreamFilter::transform (const std::shared_ptr& out, const std::shared_ptr& segment, const unsigned char* inbuf, size_t inlen) { size_t wlen; if(inlen > 0) { if(segment->getLength() > 0) { // We must not write data larger than available space in // segment. assert(segment->getLength() >= segment->getWrittenLength()); size_t lenAvail = segment->getLength()-segment->getWrittenLength(); wlen = std::min(inlen, lenAvail); } else { wlen = inlen; } const std::shared_ptr& piece = segment->getPiece(); if(piece->getWrDiskCacheEntry()) { assert(wrDiskCache_); // If we receive small data (e.g., 1 or 2 bytes), cache entry // becomes a headache. To mitigate this problem, we allocate // cache buffer at least 4KiB and append the data to the // contagious cache data. size_t alen = piece->appendWrCache(wrDiskCache_, segment->getPositionToWrite(), inbuf, wlen); if(alen < wlen) { size_t len = wlen - alen; size_t capacity = std::max(len, static_cast(4096)); auto dataCopy = new unsigned char[capacity]; memcpy(dataCopy, inbuf + alen, len); piece->updateWrCache(wrDiskCache_, dataCopy, 0, len, capacity, segment->getPositionToWrite() + alen); } } else { out->writeData(inbuf, wlen, segment->getPositionToWrite()); } #ifdef ENABLE_MESSAGE_DIGEST if(hashUpdate_) { segment->updateHash(segment->getWrittenLength(), inbuf, wlen); } #endif // ENABLE_MESSAGE_DIGEST segment->updateWrittenLength(wlen); } else { wlen = 0; } bytesProcessed_ = wlen; return bytesProcessed_; } } // namespace aria2