diff --git a/app/proxyman/mux/mux.go b/app/proxyman/mux/mux.go index 89b12552..10ff3ef0 100644 --- a/app/proxyman/mux/mux.go +++ b/app/proxyman/mux/mux.go @@ -204,20 +204,20 @@ func (m *Client) Dispatch(ctx context.Context, link *core.Link) bool { return true } -func drain(reader *buf.BufferedReader) error { - return buf.Copy(NewStreamReader(reader), buf.Discard) +func drain(reader buf.Reader) error { + return buf.Copy(reader, buf.Discard) } func (m *Client) handleStatueKeepAlive(meta *FrameMetadata, reader *buf.BufferedReader) error { if meta.Option.Has(OptionData) { - return drain(reader) + return drain(NewStreamReader(reader)) } return nil } func (m *Client) handleStatusNew(meta *FrameMetadata, reader *buf.BufferedReader) error { if meta.Option.Has(OptionData) { - return drain(reader) + return drain(NewStreamReader(reader)) } return nil } @@ -228,14 +228,15 @@ func (m *Client) handleStatusKeep(meta *FrameMetadata, reader *buf.BufferedReade } if s, found := m.sessionManager.Get(meta.SessionID); found { - if err := buf.Copy(s.NewReader(reader), s.output); err != nil { - drain(reader) + rr := s.NewReader(reader) + if err := buf.Copy(rr, s.output); err != nil { + drain(rr) pipe.CloseError(s.input) return s.Close() } return nil } - return drain(reader) + return drain(NewStreamReader(reader)) } func (m *Client) handleStatusEnd(meta *FrameMetadata, reader *buf.BufferedReader) error { @@ -247,7 +248,7 @@ func (m *Client) handleStatusEnd(meta *FrameMetadata, reader *buf.BufferedReader s.Close() } if meta.Option.Has(OptionData) { - return drain(reader) + return drain(NewStreamReader(reader)) } return nil } @@ -346,7 +347,7 @@ func handle(ctx context.Context, s *Session, output buf.Writer) { func (w *ServerWorker) handleStatusKeepAlive(meta *FrameMetadata, reader *buf.BufferedReader) error { if meta.Option.Has(OptionData) { - return drain(reader) + return drain(NewStreamReader(reader)) } return nil } @@ -367,7 +368,7 @@ func (w *ServerWorker) handleStatusNew(ctx context.Context, meta *FrameMetadata, link, err := w.dispatcher.Dispatch(ctx, meta.Target) if err != nil { if meta.Option.Has(OptionData) { - drain(reader) + drain(NewStreamReader(reader)) } return newError("failed to dispatch request.").Base(err) } @@ -383,8 +384,15 @@ func (w *ServerWorker) handleStatusNew(ctx context.Context, meta *FrameMetadata, } w.sessionManager.Add(s) go handle(ctx, s, w.link.Writer) - if meta.Option.Has(OptionData) { - return buf.Copy(s.NewReader(reader), s.output, buf.IgnoreWriterError()) + if !meta.Option.Has(OptionData) { + return nil + } + + rr := s.NewReader(reader) + if err := buf.Copy(rr, s.output); err != nil { + drain(rr) + pipe.CloseError(s.input) + return s.Close() } return nil } @@ -394,14 +402,15 @@ func (w *ServerWorker) handleStatusKeep(meta *FrameMetadata, reader *buf.Buffere return nil } if s, found := w.sessionManager.Get(meta.SessionID); found { - if err := buf.Copy(s.NewReader(reader), s.output); err != nil { - drain(reader) + rr := s.NewReader(reader) + if err := buf.Copy(rr, s.output); err != nil { + drain(rr) pipe.CloseError(s.input) return s.Close() } return nil } - return drain(reader) + return drain(NewStreamReader(reader)) } func (w *ServerWorker) handleStatusEnd(meta *FrameMetadata, reader *buf.BufferedReader) error { @@ -413,7 +422,7 @@ func (w *ServerWorker) handleStatusEnd(meta *FrameMetadata, reader *buf.Buffered s.Close() } if meta.Option.Has(OptionData) { - return drain(reader) + return drain(NewStreamReader(reader)) } return nil }