diff --git a/app/proxyman/mux/frame.go b/app/proxyman/mux/frame.go index ac32d3ca..958aa740 100644 --- a/app/proxyman/mux/frame.go +++ b/app/proxyman/mux/frame.go @@ -15,11 +15,11 @@ const ( SessionStatusKeep SessionStatus = 0x02 SessionStatusEnd SessionStatus = 0x03 SessionStatusKeepAlive SessionStatus = 0x04 - SessionStatusError SessionStatus = 0x05 ) const ( - OptionData bitmask.Byte = 0x01 + OptionData bitmask.Byte = 0x01 + OptionError bitmask.Byte = 0x02 ) type TargetNetwork byte diff --git a/app/proxyman/mux/mux.go b/app/proxyman/mux/mux.go index 93fb1b1c..802eadb5 100644 --- a/app/proxyman/mux/mux.go +++ b/app/proxyman/mux/mux.go @@ -10,8 +10,10 @@ import ( "v2ray.com/core" "v2ray.com/core/app/proxyman" + "v2ray.com/core/common" "v2ray.com/core/common/buf" "v2ray.com/core/common/errors" + "v2ray.com/core/common/log" "v2ray.com/core/common/net" "v2ray.com/core/common/protocol" "v2ray.com/core/common/signal" @@ -131,7 +133,7 @@ func (m *Client) monitor() { case <-timer.C: size := m.sessionManager.Size() if size == 0 && m.sessionManager.CloseIfNoSession() { - m.done.Close() + common.Must(m.done.Close()) return } } @@ -151,10 +153,10 @@ func fetchInput(ctx context.Context, s *Session, output buf.Writer) { newError("dispatching request to ", dest).WithContext(ctx).WriteToLog() if err := buf.Copy(s.input, writer); err != nil { newError("failed to fetch all input").Base(err).WithContext(ctx).WriteToLog() - writer.Error() - } else { - writer.Close() + writer.hasError = true } + + writer.Close() } func (m *Client) Dispatch(ctx context.Context, outboundRay ray.OutboundRay) bool { @@ -208,18 +210,10 @@ func (m *Client) handleStatusKeep(meta *FrameMetadata, reader *buf.BufferedReade func (m *Client) handleStatusEnd(meta *FrameMetadata, reader *buf.BufferedReader) error { if s, found := m.sessionManager.Get(meta.SessionID); found { - s.Close() - } - if meta.Option.Has(OptionData) { - return drain(reader) - } - return nil -} - -func (m *Client) handleStatusError(meta *FrameMetadata, reader *buf.BufferedReader) error { - if s, found := m.sessionManager.Get(meta.SessionID); found { - s.output.CloseError() - s.input.CloseError() + if meta.Option.Has(OptionError) { + s.input.CloseError() + s.output.CloseError() + } s.Close() } if meta.Option.Has(OptionData) { @@ -251,8 +245,6 @@ func (m *Client) fetchOutput() { err = m.handleStatusNew(meta, reader) case SessionStatusKeep: err = m.handleStatusKeep(meta, reader) - case SessionStatusError: - err = m.handleStatusError(meta, reader) default: newError("unknown status: ", meta.SessionStatus).AtError().WriteToLog() return @@ -310,10 +302,10 @@ func handle(ctx context.Context, s *Session, output buf.Writer) { writer := NewResponseWriter(s.ID, output, s.transferType) if err := buf.Copy(s.input, writer); err != nil { newError("session ", s.ID, " ends.").Base(err).WithContext(ctx).WriteToLog() - writer.Error() - } else { - writer.Close() + writer.hasError = true } + + writer.Close() s.Close() } @@ -326,6 +318,17 @@ func (w *ServerWorker) handleStatusKeepAlive(meta *FrameMetadata, reader *buf.Bu func (w *ServerWorker) handleStatusNew(ctx context.Context, meta *FrameMetadata, reader *buf.BufferedReader) error { newError("received request for ", meta.Target).WithContext(ctx).WriteToLog() + { + msg := &log.AccessMessage{ + To: meta.Target, + Status: log.AccessAccepted, + Reason: "", + } + if src, f := proxy.SourceFromContext(ctx); f { + msg.From = src + } + log.Record(msg) + } inboundRay, err := w.dispatcher.Dispatch(ctx, meta.Target) if err != nil { if meta.Option.Has(OptionData) { @@ -363,18 +366,10 @@ func (w *ServerWorker) handleStatusKeep(meta *FrameMetadata, reader *buf.Buffere func (w *ServerWorker) handleStatusEnd(meta *FrameMetadata, reader *buf.BufferedReader) error { if s, found := w.sessionManager.Get(meta.SessionID); found { - s.Close() - } - if meta.Option.Has(OptionData) { - return drain(reader) - } - return nil -} - -func (w *ServerWorker) handleStatusError(meta *FrameMetadata, reader *buf.BufferedReader) error { - if s, found := w.sessionManager.Get(meta.SessionID); found { - s.input.CloseError() - s.output.CloseError() + if meta.Option.Has(OptionError) { + s.input.CloseError() + s.output.CloseError() + } s.Close() } if meta.Option.Has(OptionData) { @@ -398,8 +393,6 @@ func (w *ServerWorker) handleFrame(ctx context.Context, reader *buf.BufferedRead err = w.handleStatusNew(ctx, meta, reader) case SessionStatusKeep: err = w.handleStatusKeep(meta, reader) - case SessionStatusError: - err = w.handleStatusError(meta, reader) default: return newError("unknown status: ", meta.SessionStatus).AtError() } diff --git a/app/proxyman/mux/writer.go b/app/proxyman/mux/writer.go index be2e2d90..ed0f938f 100644 --- a/app/proxyman/mux/writer.go +++ b/app/proxyman/mux/writer.go @@ -13,6 +13,7 @@ type Writer struct { writer buf.Writer id uint16 followup bool + hasError bool transferType protocol.TransferType } @@ -40,6 +41,7 @@ func (w *Writer) getNextFrameMeta() FrameMetadata { SessionID: w.id, Target: w.dest, } + if w.followup { meta.SessionStatus = SessionStatusKeep } else { @@ -105,18 +107,8 @@ func (w *Writer) Close() error { SessionID: w.id, SessionStatus: SessionStatusEnd, } - - frame := buf.New() - common.Must(meta.WriteTo(frame)) - - w.writer.WriteMultiBuffer(buf.NewMultiBufferValue(frame)) - return nil -} - -func (w *Writer) Error() error { - meta := FrameMetadata{ - SessionID: w.id, - SessionStatus: SessionStatusError, + if w.hasError { + meta.Option.Set(OptionError) } frame := buf.New()