rename cached to buffered

pull/255/merge
Darien Raymond 2016-12-27 21:41:44 +01:00
parent 7d1426ff7f
commit c347e50c28
No known key found for this signature in database
GPG Key ID: 7251FFA14BB18169
9 changed files with 43 additions and 40 deletions

View File

@ -9,17 +9,17 @@ import (
// BufferedReader is a reader with internal cache. // BufferedReader is a reader with internal cache.
type BufferedReader struct { type BufferedReader struct {
reader io.Reader reader io.Reader
buffer *buf.Buffer buffer *buf.Buffer
cached bool buffered bool
} }
// NewReader creates a new BufferedReader based on an io.Reader. // NewReader creates a new BufferedReader based on an io.Reader.
func NewReader(rawReader io.Reader) *BufferedReader { func NewReader(rawReader io.Reader) *BufferedReader {
return &BufferedReader{ return &BufferedReader{
reader: rawReader, reader: rawReader,
buffer: buf.New(), buffer: buf.New(),
cached: true, buffered: true,
} }
} }
@ -33,20 +33,20 @@ func (v *BufferedReader) Release() {
common.Release(v.reader) common.Release(v.reader)
} }
// Cached returns true if the internal cache is effective. // IsBuffered returns true if the internal cache is effective.
func (v *BufferedReader) Cached() bool { func (v *BufferedReader) IsBuffered() bool {
return v.cached return v.buffered
} }
// SetCached is to enable or disable internal cache. If cache is disabled, // SetBuffered is to enable or disable internal cache. If cache is disabled,
// Read() and Write() calls will be delegated to the underlying io.Reader directly. // Read() calls will be delegated to the underlying io.Reader directly.
func (v *BufferedReader) SetCached(cached bool) { func (v *BufferedReader) SetBuffered(cached bool) {
v.cached = cached v.buffered = cached
} }
// Read implements io.Reader.Read(). // Read implements io.Reader.Read().
func (v *BufferedReader) Read(b []byte) (int, error) { func (v *BufferedReader) Read(b []byte) (int, error) {
if !v.cached || v.buffer == nil { if !v.buffered || v.buffer == nil {
if !v.buffer.IsEmpty() { if !v.buffer.IsEmpty() {
return v.buffer.Read(b) return v.buffer.Read(b)
} }

View File

@ -18,7 +18,7 @@ func TestBufferedReader(t *testing.T) {
len := content.Len() len := content.Len()
reader := NewReader(content) reader := NewReader(content)
assert.Bool(reader.Cached()).IsTrue() assert.Bool(reader.IsBuffered()).IsTrue()
payload := make([]byte, 16) payload := make([]byte, 16)

View File

@ -8,17 +8,20 @@ import (
"v2ray.com/core/common/errors" "v2ray.com/core/common/errors"
) )
// BufferedWriter is an io.Writer with internal buffer. It writes to underlying writer when buffer is full or on demand.
// This type is not thread safe.
type BufferedWriter struct { type BufferedWriter struct {
writer io.Writer writer io.Writer
buffer *buf.Buffer buffer *buf.Buffer
cached bool buffered bool
} }
// NewWriter creates a new BufferedWriter.
func NewWriter(rawWriter io.Writer) *BufferedWriter { func NewWriter(rawWriter io.Writer) *BufferedWriter {
return &BufferedWriter{ return &BufferedWriter{
writer: rawWriter, writer: rawWriter,
buffer: buf.NewSmall(), buffer: buf.NewSmall(),
cached: true, buffered: true,
} }
} }
@ -41,7 +44,7 @@ func (v *BufferedWriter) ReadFrom(reader io.Reader) (int64, error) {
} }
func (v *BufferedWriter) Write(b []byte) (int, error) { func (v *BufferedWriter) Write(b []byte) (int, error) {
if !v.cached || v.buffer == nil { if !v.buffered || v.buffer == nil {
return v.writer.Write(b) return v.writer.Write(b)
} }
nBytes, err := v.buffer.Write(b) nBytes, err := v.buffer.Write(b)
@ -74,12 +77,12 @@ func (v *BufferedWriter) Flush() error {
return nil return nil
} }
func (v *BufferedWriter) Cached() bool { func (v *BufferedWriter) Buffered() bool {
return v.cached return v.buffered
} }
func (v *BufferedWriter) SetCached(cached bool) { func (v *BufferedWriter) SetBuffered(cached bool) {
v.cached = cached v.buffered = cached
if !cached && !v.buffer.IsEmpty() { if !cached && !v.buffer.IsEmpty() {
v.Flush() v.Flush()
} }

View File

@ -15,7 +15,7 @@ func TestBufferedWriter(t *testing.T) {
content := buf.New() content := buf.New()
writer := NewWriter(content) writer := NewWriter(content)
assert.Bool(writer.Cached()).IsTrue() assert.Bool(writer.Buffered()).IsTrue()
payload := make([]byte, 16) payload := make([]byte, 16)
@ -25,7 +25,7 @@ func TestBufferedWriter(t *testing.T) {
assert.Bool(content.IsEmpty()).IsTrue() assert.Bool(content.IsEmpty()).IsTrue()
writer.SetCached(false) writer.SetBuffered(false)
assert.Int(content.Len()).Equals(16) assert.Int(content.Len()).Equals(16)
} }
@ -35,7 +35,7 @@ func TestBufferedWriterLargePayload(t *testing.T) {
content := buf.NewLocal(128 * 1024) content := buf.NewLocal(128 * 1024)
writer := NewWriter(content) writer := NewWriter(content)
assert.Bool(writer.Cached()).IsTrue() assert.Bool(writer.Buffered()).IsTrue()
payload := make([]byte, 64*1024) payload := make([]byte, 64*1024)
rand.Read(payload) rand.Read(payload)

View File

@ -126,7 +126,7 @@ func (v *Client) Dispatch(destination v2net.Destination, payload *buf.Buffer, ra
} }
}() }()
bufferedWriter.SetCached(false) bufferedWriter.SetBuffered(false)
if err := buf.PipeUntilEOF(ray.OutboundInput(), bodyWriter); err != nil { if err := buf.PipeUntilEOF(ray.OutboundInput(), bodyWriter); err != nil {
log.Info("Shadowsocks|Client: Failed to trasnport all TCP request: ", err) log.Info("Shadowsocks|Client: Failed to trasnport all TCP request: ", err)
} }

View File

@ -161,7 +161,7 @@ func (v *Server) handleConnection(conn internet.Connection) {
} }
defer bodyReader.Release() defer bodyReader.Release()
bufferedReader.SetCached(false) bufferedReader.SetBuffered(false)
userSettings := v.user.GetSettings() userSettings := v.user.GetSettings()
timedReader.SetTimeOut(userSettings.PayloadReadTimeout) timedReader.SetTimeOut(userSettings.PayloadReadTimeout)
@ -195,7 +195,7 @@ func (v *Server) handleConnection(conn internet.Connection) {
if payload, err := ray.InboundOutput().Read(); err == nil { if payload, err := ray.InboundOutput().Read(); err == nil {
responseWriter.Write(payload) responseWriter.Write(payload)
bufferedWriter.SetCached(false) bufferedWriter.SetBuffered(false)
if err := buf.PipeUntilEOF(ray.InboundOutput(), responseWriter); err != nil { if err := buf.PipeUntilEOF(ray.InboundOutput(), responseWriter); err != nil {
log.Info("Shadowsocks|Server: Failed to transport all TCP response: ", err) log.Info("Shadowsocks|Server: Failed to transport all TCP response: ", err)

View File

@ -216,8 +216,8 @@ func (v *Server) handleSocks5(clientAddr v2net.Destination, reader *bufio.Buffer
return err return err
} }
reader.SetCached(false) reader.SetBuffered(false)
writer.SetCached(false) writer.SetBuffered(false)
dest := request.Destination() dest := request.Destination()
session := &proxy.SessionInfo{ session := &proxy.SessionInfo{
@ -279,8 +279,8 @@ func (v *Server) handleSocks4(clientAddr v2net.Destination, reader *bufio.Buffer
return ErrUnsupportedSocksCommand return ErrUnsupportedSocksCommand
} }
reader.SetCached(false) reader.SetBuffered(false)
writer.SetCached(false) writer.SetBuffered(false)
dest := v2net.TCPDestination(v2net.IPAddress(auth.IP[:]), auth.Port) dest := v2net.TCPDestination(v2net.IPAddress(auth.IP[:]), auth.Port)
session := &proxy.SessionInfo{ session := &proxy.SessionInfo{

View File

@ -181,7 +181,7 @@ func (v *VMessInboundHandler) HandleConnection(connection internet.Connection) {
userSettings := request.User.GetSettings() userSettings := request.User.GetSettings()
connReader.SetTimeOut(userSettings.PayloadReadTimeout) connReader.SetTimeOut(userSettings.PayloadReadTimeout)
reader.SetCached(false) reader.SetBuffered(false)
go func() { go func() {
bodyReader := session.DecodeRequestBody(request, reader) bodyReader := session.DecodeRequestBody(request, reader)
@ -216,7 +216,7 @@ func (v *VMessInboundHandler) HandleConnection(connection internet.Connection) {
connection.SetReusable(false) connection.SetReusable(false)
} }
writer.SetCached(false) writer.SetBuffered(false)
if err := buf.PipeUntilEOF(output, bodyWriter); err != nil { if err := buf.PipeUntilEOF(output, bodyWriter); err != nil {
log.Debug("VMess|Inbound: Error when sending data to downstream: ", err) log.Debug("VMess|Inbound: Error when sending data to downstream: ", err)

View File

@ -110,7 +110,7 @@ func (v *VMessOutboundHandler) handleRequest(session *encoding.ClientSession, co
} }
payload.Release() payload.Release()
} }
writer.SetCached(false) writer.SetBuffered(false)
if err := buf.PipeUntilEOF(input, bodyWriter); err != nil { if err := buf.PipeUntilEOF(input, bodyWriter); err != nil {
conn.SetReusable(false) conn.SetReusable(false)
@ -141,7 +141,7 @@ func (v *VMessOutboundHandler) handleResponse(session *encoding.ClientSession, c
conn.SetReusable(header.Option.Has(protocol.ResponseOptionConnectionReuse)) conn.SetReusable(header.Option.Has(protocol.ResponseOptionConnectionReuse))
reader.SetCached(false) reader.SetBuffered(false)
bodyReader := session.DecodeResponseBody(request, reader) bodyReader := session.DecodeResponseBody(request, reader)
defer bodyReader.Release() defer bodyReader.Release()