mirror of https://github.com/v2ray/v2ray-core
simplify metadata reader
parent
a1cf299848
commit
27c099dd37
|
@ -235,10 +235,9 @@ func (m *Client) fetchOutput() {
|
||||||
defer m.cancel()
|
defer m.cancel()
|
||||||
|
|
||||||
reader := buf.ToBytesReader(m.inboundRay.InboundOutput())
|
reader := buf.ToBytesReader(m.inboundRay.InboundOutput())
|
||||||
metaReader := NewMetadataReader(reader)
|
|
||||||
|
|
||||||
for {
|
for {
|
||||||
meta, err := metaReader.Read()
|
meta, err := ReadMetadata(reader)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if errors.Cause(err) != io.EOF {
|
if errors.Cause(err) != io.EOF {
|
||||||
log.Trace(newError("failed to read metadata").Base(err))
|
log.Trace(newError("failed to read metadata").Base(err))
|
||||||
|
@ -370,8 +369,8 @@ func (w *ServerWorker) handleStatusEnd(meta *FrameMetadata, reader io.Reader) er
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (w *ServerWorker) handleFrame(ctx context.Context, reader io.Reader, metaReader *MetadataReader) error {
|
func (w *ServerWorker) handleFrame(ctx context.Context, reader io.Reader) error {
|
||||||
meta, err := metaReader.Read()
|
meta, err := ReadMetadata(reader)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return newError("failed to read metadata").Base(err)
|
return newError("failed to read metadata").Base(err)
|
||||||
}
|
}
|
||||||
|
@ -398,7 +397,6 @@ func (w *ServerWorker) handleFrame(ctx context.Context, reader io.Reader, metaRe
|
||||||
func (w *ServerWorker) run(ctx context.Context) {
|
func (w *ServerWorker) run(ctx context.Context) {
|
||||||
input := w.outboundRay.OutboundInput()
|
input := w.outboundRay.OutboundInput()
|
||||||
reader := buf.ToBytesReader(input)
|
reader := buf.ToBytesReader(input)
|
||||||
metaReader := NewMetadataReader(reader)
|
|
||||||
|
|
||||||
defer w.sessionManager.Close()
|
defer w.sessionManager.Close()
|
||||||
|
|
||||||
|
@ -407,7 +405,7 @@ func (w *ServerWorker) run(ctx context.Context) {
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
return
|
return
|
||||||
default:
|
default:
|
||||||
err := w.handleFrame(ctx, reader, metaReader)
|
err := w.handleFrame(ctx, reader)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if errors.Cause(err) != io.EOF {
|
if errors.Cause(err) != io.EOF {
|
||||||
log.Trace(newError("unexpected EOF").Base(err))
|
log.Trace(newError("unexpected EOF").Base(err))
|
||||||
|
|
|
@ -61,10 +61,9 @@ func TestReaderWriter(t *testing.T) {
|
||||||
writer2.Close()
|
writer2.Close()
|
||||||
|
|
||||||
bytesReader := buf.ToBytesReader(stream)
|
bytesReader := buf.ToBytesReader(stream)
|
||||||
metaReader := NewMetadataReader(bytesReader)
|
|
||||||
streamReader := NewStreamReader(bytesReader)
|
streamReader := NewStreamReader(bytesReader)
|
||||||
|
|
||||||
meta, err := metaReader.Read()
|
meta, err := ReadMetadata(bytesReader)
|
||||||
assert(err, IsNil)
|
assert(err, IsNil)
|
||||||
assert(meta.SessionID, Equals, uint16(1))
|
assert(meta.SessionID, Equals, uint16(1))
|
||||||
assert(byte(meta.SessionStatus), Equals, byte(SessionStatusNew))
|
assert(byte(meta.SessionStatus), Equals, byte(SessionStatusNew))
|
||||||
|
@ -76,14 +75,14 @@ func TestReaderWriter(t *testing.T) {
|
||||||
assert(len(data), Equals, 1)
|
assert(len(data), Equals, 1)
|
||||||
assert(data[0].String(), Equals, "abcd")
|
assert(data[0].String(), Equals, "abcd")
|
||||||
|
|
||||||
meta, err = metaReader.Read()
|
meta, err = ReadMetadata(bytesReader)
|
||||||
assert(err, IsNil)
|
assert(err, IsNil)
|
||||||
assert(byte(meta.SessionStatus), Equals, byte(SessionStatusNew))
|
assert(byte(meta.SessionStatus), Equals, byte(SessionStatusNew))
|
||||||
assert(meta.SessionID, Equals, uint16(2))
|
assert(meta.SessionID, Equals, uint16(2))
|
||||||
assert(byte(meta.Option), Equals, byte(0))
|
assert(byte(meta.Option), Equals, byte(0))
|
||||||
assert(meta.Target, Equals, dest2)
|
assert(meta.Target, Equals, dest2)
|
||||||
|
|
||||||
meta, err = metaReader.Read()
|
meta, err = ReadMetadata(bytesReader)
|
||||||
assert(err, IsNil)
|
assert(err, IsNil)
|
||||||
assert(byte(meta.SessionStatus), Equals, byte(SessionStatusKeep))
|
assert(byte(meta.SessionStatus), Equals, byte(SessionStatusKeep))
|
||||||
assert(meta.SessionID, Equals, uint16(1))
|
assert(meta.SessionID, Equals, uint16(1))
|
||||||
|
@ -94,7 +93,7 @@ func TestReaderWriter(t *testing.T) {
|
||||||
assert(len(data), Equals, 1)
|
assert(len(data), Equals, 1)
|
||||||
assert(data[0].String(), Equals, "efgh")
|
assert(data[0].String(), Equals, "efgh")
|
||||||
|
|
||||||
meta, err = metaReader.Read()
|
meta, err = ReadMetadata(bytesReader)
|
||||||
assert(err, IsNil)
|
assert(err, IsNil)
|
||||||
assert(byte(meta.SessionStatus), Equals, byte(SessionStatusNew))
|
assert(byte(meta.SessionStatus), Equals, byte(SessionStatusNew))
|
||||||
assert(meta.SessionID, Equals, uint16(3))
|
assert(meta.SessionID, Equals, uint16(3))
|
||||||
|
@ -106,19 +105,19 @@ func TestReaderWriter(t *testing.T) {
|
||||||
assert(len(data), Equals, 1)
|
assert(len(data), Equals, 1)
|
||||||
assert(data[0].String(), Equals, "x")
|
assert(data[0].String(), Equals, "x")
|
||||||
|
|
||||||
meta, err = metaReader.Read()
|
meta, err = ReadMetadata(bytesReader)
|
||||||
assert(err, IsNil)
|
assert(err, IsNil)
|
||||||
assert(byte(meta.SessionStatus), Equals, byte(SessionStatusEnd))
|
assert(byte(meta.SessionStatus), Equals, byte(SessionStatusEnd))
|
||||||
assert(meta.SessionID, Equals, uint16(1))
|
assert(meta.SessionID, Equals, uint16(1))
|
||||||
assert(byte(meta.Option), Equals, byte(0))
|
assert(byte(meta.Option), Equals, byte(0))
|
||||||
|
|
||||||
meta, err = metaReader.Read()
|
meta, err = ReadMetadata(bytesReader)
|
||||||
assert(err, IsNil)
|
assert(err, IsNil)
|
||||||
assert(byte(meta.SessionStatus), Equals, byte(SessionStatusEnd))
|
assert(byte(meta.SessionStatus), Equals, byte(SessionStatusEnd))
|
||||||
assert(meta.SessionID, Equals, uint16(3))
|
assert(meta.SessionID, Equals, uint16(3))
|
||||||
assert(byte(meta.Option), Equals, byte(0))
|
assert(byte(meta.Option), Equals, byte(0))
|
||||||
|
|
||||||
meta, err = metaReader.Read()
|
meta, err = ReadMetadata(bytesReader)
|
||||||
assert(err, IsNil)
|
assert(err, IsNil)
|
||||||
assert(byte(meta.SessionStatus), Equals, byte(SessionStatusKeep))
|
assert(byte(meta.SessionStatus), Equals, byte(SessionStatusKeep))
|
||||||
assert(meta.SessionID, Equals, uint16(2))
|
assert(meta.SessionID, Equals, uint16(2))
|
||||||
|
@ -129,7 +128,7 @@ func TestReaderWriter(t *testing.T) {
|
||||||
assert(len(data), Equals, 1)
|
assert(len(data), Equals, 1)
|
||||||
assert(data[0].String(), Equals, "y")
|
assert(data[0].String(), Equals, "y")
|
||||||
|
|
||||||
meta, err = metaReader.Read()
|
meta, err = ReadMetadata(bytesReader)
|
||||||
assert(err, IsNil)
|
assert(err, IsNil)
|
||||||
assert(byte(meta.SessionStatus), Equals, byte(SessionStatusEnd))
|
assert(byte(meta.SessionStatus), Equals, byte(SessionStatusEnd))
|
||||||
assert(meta.SessionID, Equals, uint16(2))
|
assert(meta.SessionID, Equals, uint16(2))
|
||||||
|
@ -137,7 +136,7 @@ func TestReaderWriter(t *testing.T) {
|
||||||
|
|
||||||
stream.Close()
|
stream.Close()
|
||||||
|
|
||||||
meta, err = metaReader.Read()
|
meta, err = ReadMetadata(bytesReader)
|
||||||
assert(err, IsNotNil)
|
assert(err, IsNotNil)
|
||||||
assert(meta, IsNil)
|
assert(meta, IsNil)
|
||||||
}
|
}
|
||||||
|
|
|
@ -7,20 +7,8 @@ import (
|
||||||
"v2ray.com/core/common/serial"
|
"v2ray.com/core/common/serial"
|
||||||
)
|
)
|
||||||
|
|
||||||
type MetadataReader struct {
|
func ReadMetadata(reader io.Reader) (*FrameMetadata, error) {
|
||||||
reader io.Reader
|
metaLen, err := serial.ReadUint16(reader)
|
||||||
buffer []byte
|
|
||||||
}
|
|
||||||
|
|
||||||
func NewMetadataReader(reader io.Reader) *MetadataReader {
|
|
||||||
return &MetadataReader{
|
|
||||||
reader: reader,
|
|
||||||
buffer: make([]byte, 1024),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (r *MetadataReader) Read() (*FrameMetadata, error) {
|
|
||||||
metaLen, err := serial.ReadUint16(r.reader)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -28,10 +16,13 @@ func (r *MetadataReader) Read() (*FrameMetadata, error) {
|
||||||
return nil, newError("invalid metalen ", metaLen).AtWarning()
|
return nil, newError("invalid metalen ", metaLen).AtWarning()
|
||||||
}
|
}
|
||||||
|
|
||||||
if _, err := io.ReadFull(r.reader, r.buffer[:metaLen]); err != nil {
|
b := buf.New()
|
||||||
|
defer b.Release()
|
||||||
|
|
||||||
|
if err := b.Reset(buf.ReadFullFrom(reader, int(metaLen))); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
return ReadFrameFrom(r.buffer)
|
return ReadFrameFrom(b.Bytes())
|
||||||
}
|
}
|
||||||
|
|
||||||
type PacketReader struct {
|
type PacketReader struct {
|
||||||
|
|
Loading…
Reference in New Issue