mirror of https://github.com/v2ray/v2ray-core
simplify reader/writer interface
parent
308c40553c
commit
70f803173a
|
@ -20,15 +20,11 @@ func ReadFrom(reader io.Reader, buffer *alloc.Buffer) (*alloc.Buffer, error) {
|
||||||
|
|
||||||
// Reader extends io.Reader with alloc.Buffer.
|
// Reader extends io.Reader with alloc.Buffer.
|
||||||
type Reader interface {
|
type Reader interface {
|
||||||
|
common.Releasable
|
||||||
// Read reads content from underlying reader, and put it into an alloc.Buffer.
|
// Read reads content from underlying reader, and put it into an alloc.Buffer.
|
||||||
Read() (*alloc.Buffer, error)
|
Read() (*alloc.Buffer, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
type ReleasableReader interface {
|
|
||||||
Reader
|
|
||||||
common.Releasable
|
|
||||||
}
|
|
||||||
|
|
||||||
// AdaptiveReader is a Reader that adjusts its reading speed automatically.
|
// AdaptiveReader is a Reader that adjusts its reading speed automatically.
|
||||||
type AdaptiveReader struct {
|
type AdaptiveReader struct {
|
||||||
reader io.Reader
|
reader io.Reader
|
||||||
|
|
|
@ -3,21 +3,17 @@ package io
|
||||||
import (
|
import (
|
||||||
"io"
|
"io"
|
||||||
|
|
||||||
"github.com/v2ray/v2ray-core/common"
|
"github.com/v2ray/v2ray-core/common"
|
||||||
"github.com/v2ray/v2ray-core/common/alloc"
|
"github.com/v2ray/v2ray-core/common/alloc"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Writer extends io.Writer with alloc.Buffer.
|
// Writer extends io.Writer with alloc.Buffer.
|
||||||
type Writer interface {
|
type Writer interface {
|
||||||
|
common.Releasable
|
||||||
// Write writes an alloc.Buffer into underlying writer.
|
// Write writes an alloc.Buffer into underlying writer.
|
||||||
Write(*alloc.Buffer) error
|
Write(*alloc.Buffer) error
|
||||||
}
|
}
|
||||||
|
|
||||||
type ReleasableWriter interface {
|
|
||||||
Writer
|
|
||||||
common.Releasable
|
|
||||||
}
|
|
||||||
|
|
||||||
// AdaptiveWriter is a Writer that writes alloc.Buffer into underlying writer.
|
// AdaptiveWriter is a Writer that writes alloc.Buffer into underlying writer.
|
||||||
type AdaptiveWriter struct {
|
type AdaptiveWriter struct {
|
||||||
writer io.Writer
|
writer io.Writer
|
||||||
|
|
|
@ -66,6 +66,11 @@ func NewChunkReader(reader io.Reader, auth *Authenticator) *ChunkReader {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (this *ChunkReader) Release() {
|
||||||
|
this.reader = nil
|
||||||
|
this.auth = nil
|
||||||
|
}
|
||||||
|
|
||||||
func (this *ChunkReader) Read() (*alloc.Buffer, error) {
|
func (this *ChunkReader) Read() (*alloc.Buffer, error) {
|
||||||
buffer := alloc.NewLargeBuffer()
|
buffer := alloc.NewLargeBuffer()
|
||||||
if _, err := io.ReadFull(this.reader, buffer.Value[:2]); err != nil {
|
if _, err := io.ReadFull(this.reader, buffer.Value[:2]); err != nil {
|
||||||
|
|
|
@ -234,6 +234,7 @@ func (this *Shadowsocks) handleConnection(conn *hub.TCPConn) {
|
||||||
|
|
||||||
v2io.ReaderToChan(ray.InboundInput(), payloadReader)
|
v2io.ReaderToChan(ray.InboundInput(), payloadReader)
|
||||||
close(ray.InboundInput())
|
close(ray.InboundInput())
|
||||||
|
payloadReader.Release()
|
||||||
|
|
||||||
writeFinish.Lock()
|
writeFinish.Lock()
|
||||||
}
|
}
|
||||||
|
|
|
@ -148,7 +148,7 @@ func (this *VMessInboundHandler) HandleConnection(connection *hub.TCPConn) {
|
||||||
defer close(input)
|
defer close(input)
|
||||||
defer readFinish.Unlock()
|
defer readFinish.Unlock()
|
||||||
bodyReader := session.DecodeRequestBody(reader)
|
bodyReader := session.DecodeRequestBody(reader)
|
||||||
var requestReader v2io.ReleasableReader
|
var requestReader v2io.Reader
|
||||||
if request.Option.IsChunkStream() {
|
if request.Option.IsChunkStream() {
|
||||||
requestReader = vmessio.NewAuthChunkReader(bodyReader)
|
requestReader = vmessio.NewAuthChunkReader(bodyReader)
|
||||||
} else {
|
} else {
|
||||||
|
@ -179,12 +179,12 @@ func (this *VMessInboundHandler) HandleConnection(connection *hub.TCPConn) {
|
||||||
|
|
||||||
writer.SetCached(false)
|
writer.SetCached(false)
|
||||||
go func(finish *sync.Mutex) {
|
go func(finish *sync.Mutex) {
|
||||||
var writer v2io.ReleasableWriter = v2io.NewAdaptiveWriter(bodyWriter)
|
var writer v2io.Writer = v2io.NewAdaptiveWriter(bodyWriter)
|
||||||
if request.Option.IsChunkStream() {
|
if request.Option.IsChunkStream() {
|
||||||
writer = vmessio.NewAuthChunkWriter(writer)
|
writer = vmessio.NewAuthChunkWriter(writer)
|
||||||
}
|
}
|
||||||
v2io.ChanToWriter(writer, output)
|
v2io.ChanToWriter(writer, output)
|
||||||
writer.Release()
|
writer.Release()
|
||||||
finish.Unlock()
|
finish.Unlock()
|
||||||
}(&writeFinish)
|
}(&writeFinish)
|
||||||
writeFinish.Lock()
|
writeFinish.Lock()
|
||||||
|
|
|
@ -9,10 +9,10 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
type AuthChunkWriter struct {
|
type AuthChunkWriter struct {
|
||||||
writer v2io.ReleasableWriter
|
writer v2io.Writer
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewAuthChunkWriter(writer v2io.ReleasableWriter) *AuthChunkWriter {
|
func NewAuthChunkWriter(writer v2io.Writer) *AuthChunkWriter {
|
||||||
return &AuthChunkWriter{
|
return &AuthChunkWriter{
|
||||||
writer: writer,
|
writer: writer,
|
||||||
}
|
}
|
||||||
|
@ -25,7 +25,7 @@ func (this *AuthChunkWriter) Write(buffer *alloc.Buffer) error {
|
||||||
|
|
||||||
func (this *AuthChunkWriter) Release() {
|
func (this *AuthChunkWriter) Release() {
|
||||||
this.writer.Release()
|
this.writer.Release()
|
||||||
this.writer = nil
|
this.writer = nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func Authenticate(buffer *alloc.Buffer) {
|
func Authenticate(buffer *alloc.Buffer) {
|
||||||
|
|
|
@ -116,12 +116,12 @@ func (this *VMessOutboundHandler) handleRequest(session *raw.ClientSession, conn
|
||||||
writer.SetCached(false)
|
writer.SetCached(false)
|
||||||
|
|
||||||
if moreChunks {
|
if moreChunks {
|
||||||
var streamWriter v2io.ReleasableWriter = v2io.NewAdaptiveWriter(bodyWriter)
|
var streamWriter v2io.Writer = v2io.NewAdaptiveWriter(bodyWriter)
|
||||||
if request.Option.IsChunkStream() {
|
if request.Option.IsChunkStream() {
|
||||||
streamWriter = vmessio.NewAuthChunkWriter(streamWriter)
|
streamWriter = vmessio.NewAuthChunkWriter(streamWriter)
|
||||||
}
|
}
|
||||||
v2io.ChanToWriter(streamWriter, input)
|
v2io.ChanToWriter(streamWriter, input)
|
||||||
streamWriter.Release()
|
streamWriter.Release()
|
||||||
}
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
@ -150,6 +150,7 @@ func (this *VMessOutboundHandler) handleResponse(session *raw.ClientSession, con
|
||||||
}
|
}
|
||||||
|
|
||||||
v2io.ReaderToChan(output, bodyReader)
|
v2io.ReaderToChan(output, bodyReader)
|
||||||
|
bodyReader.Release()
|
||||||
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue