/* */ #include "SocketBuffer.h" #include #include #include "SocketCore.h" #include "DlAbortEx.h" #include "message.h" #include "fmt.h" #include "LogFactory.h" #include "a2functional.h" namespace aria2 { SocketBuffer::ByteArrayBufEntry::ByteArrayBufEntry( unsigned char* bytes, size_t length, std::unique_ptr progressUpdate) : BufEntry(std::move(progressUpdate)), bytes_(bytes), length_(length) { } SocketBuffer::ByteArrayBufEntry::~ByteArrayBufEntry() { delete[] bytes_; } ssize_t SocketBuffer::ByteArrayBufEntry::send(const std::shared_ptr& socket, size_t offset) { return socket->writeData(bytes_ + offset, length_ - offset); } bool SocketBuffer::ByteArrayBufEntry::final(size_t offset) const { return length_ <= offset; } size_t SocketBuffer::ByteArrayBufEntry::getLength() const { return length_; } const unsigned char* SocketBuffer::ByteArrayBufEntry::getData() const { return bytes_; } SocketBuffer::StringBufEntry::StringBufEntry( std::string s, std::unique_ptr progressUpdate) : BufEntry(std::move(progressUpdate)), str_(std::move(s)) { } ssize_t SocketBuffer::StringBufEntry::send(const std::shared_ptr& socket, size_t offset) { return socket->writeData(str_.data() + offset, str_.size() - offset); } bool SocketBuffer::StringBufEntry::final(size_t offset) const { return str_.size() <= offset; } size_t SocketBuffer::StringBufEntry::getLength() const { return str_.size(); } const unsigned char* SocketBuffer::StringBufEntry::getData() const { return reinterpret_cast(str_.c_str()); } SocketBuffer::SocketBuffer(std::shared_ptr socket) : socket_(std::move(socket)), offset_(0) { } SocketBuffer::~SocketBuffer() {} void SocketBuffer::pushBytes(unsigned char* bytes, size_t len, std::unique_ptr progressUpdate) { if (len > 0) { bufq_.push_back( make_unique(bytes, len, std::move(progressUpdate))); } } void SocketBuffer::pushStr(std::string data, std::unique_ptr progressUpdate) { if (!data.empty()) { bufq_.push_back(make_unique(std::move(data), std::move(progressUpdate))); } } ssize_t SocketBuffer::send() { a2iovec iov[A2_IOV_MAX]; size_t totalslen = 0; while (!bufq_.empty()) { size_t num; size_t bufqlen = bufq_.size(); ssize_t amount = 24_k; ssize_t firstlen = bufq_.front()->getLength() - offset_; amount -= firstlen; iov[0].A2IOVEC_BASE = reinterpret_cast( const_cast(bufq_.front()->getData() + offset_)); iov[0].A2IOVEC_LEN = firstlen; num = 1; for (auto i = std::begin(bufq_) + 1, eoi = std::end(bufq_); i != eoi && num < A2_IOV_MAX && num < bufqlen && amount > 0; ++i, ++num) { ssize_t len = (*i)->getLength(); if (amount < len) { break; } amount -= len; iov[num].A2IOVEC_BASE = reinterpret_cast(const_cast((*i)->getData())); iov[num].A2IOVEC_LEN = len; } ssize_t slen = socket_->writeVector(iov, num); if (slen == 0 && !socket_->wantRead() && !socket_->wantWrite()) { throw DL_ABORT_EX(fmt(EX_SOCKET_SEND, "Connection closed.")); } // A2_LOG_NOTICE(fmt("num=%zu, amount=%d, bufq.size()=%zu, SEND=%d", // num, amount, bufq_.size(), slen)); totalslen += slen; if (firstlen > slen) { offset_ += slen; bufq_.front()->progressUpdate(slen, false); if (socket_->wantRead() || socket_->wantWrite()) { goto fin; } continue; } slen -= firstlen; bufq_.front()->progressUpdate(firstlen, true); bufq_.pop_front(); offset_ = 0; for (size_t i = 1; i < num; ++i) { auto& buf = bufq_.front(); ssize_t len = buf->getLength(); if (len > slen) { offset_ = slen; bufq_.front()->progressUpdate(slen, false); goto fin; } slen -= len; bufq_.front()->progressUpdate(len, true); bufq_.pop_front(); } } fin: return totalslen; } bool SocketBuffer::sendBufferIsEmpty() const { return bufq_.empty(); } } // namespace aria2