read directly from underlying buffered reader in mux

pull/787/head
Darien Raymond 2017-11-25 01:05:30 +01:00
parent 74327ea8ae
commit 6de4ef014a
No known key found for this signature in database
GPG Key ID: 7251FFA14BB18169
5 changed files with 35 additions and 35 deletions

View File

@ -192,25 +192,25 @@ func (m *Client) Dispatch(ctx context.Context, outboundRay ray.OutboundRay) bool
return true return true
} }
func drain(reader io.Reader) error { func drain(reader *buf.BufferedReader) error {
return buf.Copy(NewStreamReader(reader), buf.Discard) return buf.Copy(NewStreamReader(reader), buf.Discard)
} }
func (m *Client) handleStatueKeepAlive(meta *FrameMetadata, reader io.Reader) error { func (m *Client) handleStatueKeepAlive(meta *FrameMetadata, reader *buf.BufferedReader) error {
if meta.Option.Has(OptionData) { if meta.Option.Has(OptionData) {
return drain(reader) return drain(reader)
} }
return nil return nil
} }
func (m *Client) handleStatusNew(meta *FrameMetadata, reader io.Reader) error { func (m *Client) handleStatusNew(meta *FrameMetadata, reader *buf.BufferedReader) error {
if meta.Option.Has(OptionData) { if meta.Option.Has(OptionData) {
return drain(reader) return drain(reader)
} }
return nil return nil
} }
func (m *Client) handleStatusKeep(meta *FrameMetadata, reader io.Reader) error { func (m *Client) handleStatusKeep(meta *FrameMetadata, reader *buf.BufferedReader) error {
if !meta.Option.Has(OptionData) { if !meta.Option.Has(OptionData) {
return nil return nil
} }
@ -221,7 +221,7 @@ func (m *Client) handleStatusKeep(meta *FrameMetadata, reader io.Reader) error {
return drain(reader) return drain(reader)
} }
func (m *Client) handleStatusEnd(meta *FrameMetadata, reader io.Reader) error { func (m *Client) handleStatusEnd(meta *FrameMetadata, reader *buf.BufferedReader) error {
if s, found := m.sessionManager.Get(meta.SessionID); found { if s, found := m.sessionManager.Get(meta.SessionID); found {
s.Close() s.Close()
} }
@ -315,14 +315,14 @@ func handle(ctx context.Context, s *Session, output buf.Writer) {
s.Close() s.Close()
} }
func (w *ServerWorker) handleStatusKeepAlive(meta *FrameMetadata, reader io.Reader) error { func (w *ServerWorker) handleStatusKeepAlive(meta *FrameMetadata, reader *buf.BufferedReader) error {
if meta.Option.Has(OptionData) { if meta.Option.Has(OptionData) {
return drain(reader) return drain(reader)
} }
return nil return nil
} }
func (w *ServerWorker) handleStatusNew(ctx context.Context, meta *FrameMetadata, reader io.Reader) error { func (w *ServerWorker) handleStatusNew(ctx context.Context, meta *FrameMetadata, reader *buf.BufferedReader) error {
log.Trace(newError("received request for ", meta.Target)) log.Trace(newError("received request for ", meta.Target))
inboundRay, err := w.dispatcher.Dispatch(ctx, meta.Target) inboundRay, err := w.dispatcher.Dispatch(ctx, meta.Target)
if err != nil { if err != nil {
@ -349,7 +349,7 @@ func (w *ServerWorker) handleStatusNew(ctx context.Context, meta *FrameMetadata,
return nil return nil
} }
func (w *ServerWorker) handleStatusKeep(meta *FrameMetadata, reader io.Reader) error { func (w *ServerWorker) handleStatusKeep(meta *FrameMetadata, reader *buf.BufferedReader) error {
if !meta.Option.Has(OptionData) { if !meta.Option.Has(OptionData) {
return nil return nil
} }
@ -359,7 +359,7 @@ func (w *ServerWorker) handleStatusKeep(meta *FrameMetadata, reader io.Reader) e
return drain(reader) return drain(reader)
} }
func (w *ServerWorker) handleStatusEnd(meta *FrameMetadata, reader io.Reader) error { func (w *ServerWorker) handleStatusEnd(meta *FrameMetadata, reader *buf.BufferedReader) error {
if s, found := w.sessionManager.Get(meta.SessionID); found { if s, found := w.sessionManager.Get(meta.SessionID); found {
s.Close() s.Close()
} }
@ -369,7 +369,7 @@ func (w *ServerWorker) handleStatusEnd(meta *FrameMetadata, reader io.Reader) er
return nil return nil
} }
func (w *ServerWorker) handleFrame(ctx context.Context, reader io.Reader) error { func (w *ServerWorker) handleFrame(ctx context.Context, reader *buf.BufferedReader) error {
meta, err := ReadMetadata(reader) meta, err := ReadMetadata(reader)
if err != nil { if err != nil {
return newError("failed to read metadata").Base(err) return newError("failed to read metadata").Base(err)

View File

@ -67,12 +67,12 @@ func (r *PacketReader) ReadMultiBuffer() (buf.MultiBuffer, error) {
// StreamReader reads Mux frame as a stream. // StreamReader reads Mux frame as a stream.
type StreamReader struct { type StreamReader struct {
reader io.Reader reader *buf.BufferedReader
leftOver int leftOver int
} }
// NewStreamReader creates a new StreamReader. // NewStreamReader creates a new StreamReader.
func NewStreamReader(reader io.Reader) *StreamReader { func NewStreamReader(reader *buf.BufferedReader) *StreamReader {
return &StreamReader{ return &StreamReader{
reader: reader, reader: reader,
leftOver: -1, leftOver: -1,
@ -94,25 +94,7 @@ func (r *StreamReader) ReadMultiBuffer() (buf.MultiBuffer, error) {
r.leftOver = int(size) r.leftOver = int(size)
} }
mb := buf.NewMultiBufferCap(32) mb, err := r.reader.ReadAtMost(r.leftOver)
for r.leftOver > 0 { r.leftOver -= mb.Len()
readLen := buf.Size return mb, err
if r.leftOver < readLen {
readLen = r.leftOver
}
b := buf.New()
if err := b.AppendSupplier(func(bb []byte) (int, error) {
return r.reader.Read(bb[:readLen])
}); err != nil {
b.Release()
mb.Release()
return nil, err
}
r.leftOver -= b.Len()
mb.Append(b)
if b.Len() < readLen {
break
}
}
return mb, nil
} }

View File

@ -1,7 +1,6 @@
package mux package mux
import ( import (
"io"
"sync" "sync"
"v2ray.com/core/common/buf" "v2ray.com/core/common/buf"
@ -139,7 +138,7 @@ func (s *Session) Close() {
} }
// NewReader creates a buf.Reader based on the transfer type of this Session. // NewReader creates a buf.Reader based on the transfer type of this Session.
func (s *Session) NewReader(reader io.Reader) buf.Reader { func (s *Session) NewReader(reader *buf.BufferedReader) buf.Reader {
if s.transferType == protocol.TransferTypeStream { if s.transferType == protocol.TransferTypeStream {
return NewStreamReader(reader) return NewStreamReader(reader)
} }

View File

@ -166,6 +166,11 @@ func (mb *MultiBuffer) SliceBySize(size int) MultiBuffer {
(*mb)[i] = nil (*mb)[i] = nil
} }
*mb = (*mb)[endIndex:] *mb = (*mb)[endIndex:]
if endIndex == 0 && len(*mb) > 0 {
b := New()
common.Must(b.Reset(ReadFullFrom((*mb)[0], size)))
return NewMultiBufferValue(b)
}
return slice return slice
} }

View File

@ -124,6 +124,20 @@ func (r *BufferedReader) ReadMultiBuffer() (MultiBuffer, error) {
return r.stream.ReadMultiBuffer() return r.stream.ReadMultiBuffer()
} }
// ReadAtMost returns a MultiBuffer with at most size.
func (r *BufferedReader) ReadAtMost(size int) (mb MultiBuffer, err error) {
if r.leftOver == nil {
r.leftOver, err = r.stream.ReadMultiBuffer()
}
if r.leftOver != nil {
mb = r.leftOver.SliceBySize(size)
if r.leftOver.IsEmpty() {
r.leftOver = nil
}
}
return
}
func (r *BufferedReader) writeToInternal(writer io.Writer) (int64, error) { func (r *BufferedReader) writeToInternal(writer io.Writer) (int64, error) {
mbWriter := NewWriter(writer) mbWriter := NewWriter(writer)
totalBytes := int64(0) totalBytes := int64(0)