Rewritten SocketBuffer::BufEntry and SocketBuffer::send()

pull/1/head
Tatsuhiro Tsujikawa 2011-03-18 17:20:37 +09:00
parent 18d51a3e20
commit 3d2fa5954e
2 changed files with 77 additions and 62 deletions

View File

@ -44,57 +44,77 @@
namespace aria2 { namespace aria2 {
SocketBuffer::ByteArrayBufEntry::ByteArrayBufEntry
(unsigned char* bytes, size_t length)
: bytes_(bytes), length_(length)
{}
SocketBuffer::ByteArrayBufEntry::~ByteArrayBufEntry()
{
delete [] bytes_;
}
ssize_t SocketBuffer::ByteArrayBufEntry::send
(const SharedHandle<SocketCore>& socket, size_t offset)
{
return socket->writeData(bytes_+offset, length_-offset);
}
bool SocketBuffer::ByteArrayBufEntry::final(size_t offset) const
{
return length_ <= offset;
}
SocketBuffer::StringBufEntry::StringBufEntry(const std::string& s)
: str_(s)
{}
ssize_t SocketBuffer::StringBufEntry::send
(const SharedHandle<SocketCore>& socket, size_t offset)
{
return socket->writeData(str_.data()+offset, str_.size()-offset);
}
bool SocketBuffer::StringBufEntry::final(size_t offset) const
{
return str_.size() <= offset;
}
SocketBuffer::SocketBuffer(const SharedHandle<SocketCore>& socket): SocketBuffer::SocketBuffer(const SharedHandle<SocketCore>& socket):
socket_(socket), offset_(0) {} socket_(socket), offset_(0) {}
SocketBuffer::~SocketBuffer() SocketBuffer::~SocketBuffer() {}
{
std::for_each(bufq_.begin(), bufq_.end(),
std::mem_fun_ref(&BufEntry::deleteBuf));
}
void SocketBuffer::pushBytes(unsigned char* bytes, size_t len) void SocketBuffer::pushBytes(unsigned char* bytes, size_t len)
{ {
bufq_.push_back(BufEntry(bytes, len)); if(len > 0) {
bufq_.push_back(SharedHandle<BufEntry>(new ByteArrayBufEntry(bytes, len)));
}
} }
void SocketBuffer::pushStr(const std::string& data) void SocketBuffer::pushStr(const std::string& data)
{ {
bufq_.push_back(BufEntry(data)); if(data.size() > 0) {
bufq_.push_back(SharedHandle<BufEntry>(new StringBufEntry(data)));
}
} }
ssize_t SocketBuffer::send() ssize_t SocketBuffer::send()
{ {
size_t totalslen = 0; size_t totalslen = 0;
while(!bufq_.empty()) { while(!bufq_.empty()) {
BufEntry& buf = bufq_[0]; const SharedHandle<BufEntry>& buf = bufq_[0];
if(buf.size() == 0) { ssize_t slen = buf->send(socket_, offset_);
buf.deleteBuf();
bufq_.pop_front();
continue;
}
const char* data;
ssize_t r;
if(buf.type == TYPE_BYTES) {
data = reinterpret_cast<const char*>(buf.bytes);
r = buf.bytesLen-offset_;
} else {
const std::string& str = *buf.str;
data = str.data();
r = str.size()-offset_;
}
ssize_t slen = socket_->writeData(data+offset_, r);
if(slen == 0 && !socket_->wantRead() && !socket_->wantWrite()) { if(slen == 0 && !socket_->wantRead() && !socket_->wantWrite()) {
throw DL_ABORT_EX(fmt(EX_SOCKET_SEND, "Connection closed.")); throw DL_ABORT_EX(fmt(EX_SOCKET_SEND, "Connection closed."));
} }
totalslen += slen; totalslen += slen;
if(slen < r) { offset_ += slen;
offset_ += slen; if(buf->final(offset_)) {
break;
} else {
offset_ = 0;
buf.deleteBuf();
bufq_.pop_front(); bufq_.pop_front();
offset_ = 0;
} else {
break;
} }
} }
return totalslen; return totalslen;

View File

@ -48,44 +48,39 @@ class SocketCore;
class SocketBuffer { class SocketBuffer {
private: private:
enum BUF_TYPE { class BufEntry {
TYPE_BYTES, public:
TYPE_STR virtual ~BufEntry() {}
virtual ssize_t send
(const SharedHandle<SocketCore>& socket, size_t offset) = 0;
virtual bool final(size_t offset) const = 0;
}; };
struct BufEntry {
BUF_TYPE type;
unsigned char* bytes;
size_t bytesLen;
std::string* str;
void deleteBuf() class ByteArrayBufEntry:public BufEntry {
{ public:
if(type == TYPE_BYTES) { ByteArrayBufEntry(unsigned char* bytes, size_t length);
delete [] bytes; virtual ~ByteArrayBufEntry();
} else if(type == TYPE_STR) { virtual ssize_t send
delete str; (const SharedHandle<SocketCore>& socket, size_t offset);
} virtual bool final(size_t offset) const;
} private:
unsigned char* bytes_;
size_t length_;
};
size_t size() const class StringBufEntry:public BufEntry {
{ public:
if(type == TYPE_BYTES) { StringBufEntry(const std::string& s);
return bytesLen; virtual ssize_t send
} else { (const SharedHandle<SocketCore>& socket, size_t offset);
return str->size(); virtual bool final(size_t offset) const;
} private:
} std::string str_;
BufEntry(unsigned char* bytes, size_t len):
type(TYPE_BYTES), bytes(bytes), bytesLen(len) {}
BufEntry(const std::string& str):
type(TYPE_STR), str(new std::string(str)) {}
}; };
SharedHandle<SocketCore> socket_; SharedHandle<SocketCore> socket_;
std::deque<BufEntry> bufq_; std::deque<SharedHandle<BufEntry> > bufq_;
// Offset of data in bufq_[0]. SocketBuffer tries to send bufq_[0], // Offset of data in bufq_[0]. SocketBuffer tries to send bufq_[0],
// but it cannot always send whole data. In this case, offset points // but it cannot always send whole data. In this case, offset points
@ -100,7 +95,7 @@ public:
SocketBuffer(const SocketBuffer&); SocketBuffer(const SocketBuffer&);
SocketBuffer& operator=(const SocketBuffer&); SocketBuffer& operator=(const SocketBuffer&);
// Feeds data pointered by bytes with length len. into queue. This // Feeds data pointered by bytes with length len into queue. This
// object gets ownership of bytes, so caller must not delete or // object gets ownership of bytes, so caller must not delete or
// later bytes after this call. This function doesn't send data. // later bytes after this call. This function doesn't send data.
void pushBytes(unsigned char* bytes, size_t len); void pushBytes(unsigned char* bytes, size_t len);