diff --git a/app/proxyman/mux/mux.go b/app/proxyman/mux/mux.go index 2f458d07..f3cbf635 100644 --- a/app/proxyman/mux/mux.go +++ b/app/proxyman/mux/mux.go @@ -177,8 +177,7 @@ func fetchInput(ctx context.Context, s *session, output buf.Writer) { return } } - _, timer := signal.CancelAfterInactivity(ctx, time.Minute*30) - if err := buf.PipeUntilEOF(timer, s.input, writer); err != nil { + if err := buf.PipeUntilEOF(signal.BackgroundTimer(), s.input, writer); err != nil { log.Info("Proxyman|Mux|Client: Failed to fetch all input: ", err) } } @@ -324,8 +323,7 @@ func handle(ctx context.Context, s *session, output buf.Writer) { writer := NewResponseWriter(s.id, output) defer writer.Close() - _, timer := signal.CancelAfterInactivity(ctx, time.Minute*30) - if err := buf.PipeUntilEOF(timer, s.input, writer); err != nil { + if err := buf.PipeUntilEOF(signal.BackgroundTimer(), s.input, writer); err != nil { log.Info("Proxyman|Mux|ServerWorker: Session ", s.id, " ends: ", err) } } diff --git a/common/buf/io.go b/common/buf/io.go index 21daa6f9..60032e61 100644 --- a/common/buf/io.go +++ b/common/buf/io.go @@ -42,7 +42,7 @@ func ReadFullFrom(reader io.Reader, size int) Supplier { // Pipe dumps all payload from reader to writer, until an error occurs. // ActivityTimer gets updated as soon as there is a payload. -func Pipe(timer *signal.ActivityTimer, reader Reader, writer Writer) error { +func Pipe(timer signal.ActivityTimer, reader Reader, writer Writer) error { for { buffer, err := reader.Read() if err != nil { @@ -65,7 +65,7 @@ func Pipe(timer *signal.ActivityTimer, reader Reader, writer Writer) error { } // PipeUntilEOF behaves the same as Pipe(). The only difference is PipeUntilEOF returns nil on EOF. -func PipeUntilEOF(timer *signal.ActivityTimer, reader Reader, writer Writer) error { +func PipeUntilEOF(timer signal.ActivityTimer, reader Reader, writer Writer) error { err := Pipe(timer, reader, writer) if err != nil && errors.Cause(err) != io.EOF { return err diff --git a/common/signal/timer.go b/common/signal/timer.go index 9d74168a..deb06016 100644 --- a/common/signal/timer.go +++ b/common/signal/timer.go @@ -5,21 +5,25 @@ import ( "time" ) -type ActivityTimer struct { +type ActivityTimer interface { + Update() +} + +type realActivityTimer struct { updated chan bool timeout time.Duration ctx context.Context cancel context.CancelFunc } -func (t *ActivityTimer) Update() { +func (t *realActivityTimer) Update() { select { case t.updated <- true: default: } } -func (t *ActivityTimer) run() { +func (t *realActivityTimer) run() { for { select { case <-time.After(t.timeout): @@ -37,9 +41,9 @@ func (t *ActivityTimer) run() { } } -func CancelAfterInactivity(ctx context.Context, timeout time.Duration) (context.Context, *ActivityTimer) { +func CancelAfterInactivity(ctx context.Context, timeout time.Duration) (context.Context, ActivityTimer) { ctx, cancel := context.WithCancel(ctx) - timer := &ActivityTimer{ + timer := &realActivityTimer{ ctx: ctx, cancel: cancel, timeout: timeout, @@ -48,3 +52,11 @@ func CancelAfterInactivity(ctx context.Context, timeout time.Duration) (context. go timer.run() return ctx, timer } + +type noOpActivityTimer struct{} + +func (noOpActivityTimer) Update() {} + +func BackgroundTimer() ActivityTimer { + return noOpActivityTimer{} +} diff --git a/proxy/vmess/inbound/inbound.go b/proxy/vmess/inbound/inbound.go index 1585697d..93b2bdd7 100644 --- a/proxy/vmess/inbound/inbound.go +++ b/proxy/vmess/inbound/inbound.go @@ -123,7 +123,7 @@ func (v *Handler) GetUser(email string) *protocol.User { return user } -func transferRequest(timer *signal.ActivityTimer, session *encoding.ServerSession, request *protocol.RequestHeader, input io.Reader, output ray.OutputStream) error { +func transferRequest(timer signal.ActivityTimer, session *encoding.ServerSession, request *protocol.RequestHeader, input io.Reader, output ray.OutputStream) error { defer output.Close() bodyReader := session.DecodeRequestBody(request, input) @@ -133,7 +133,7 @@ func transferRequest(timer *signal.ActivityTimer, session *encoding.ServerSessio return nil } -func transferResponse(timer *signal.ActivityTimer, session *encoding.ServerSession, request *protocol.RequestHeader, response *protocol.ResponseHeader, input ray.InputStream, output io.Writer) error { +func transferResponse(timer signal.ActivityTimer, session *encoding.ServerSession, request *protocol.RequestHeader, response *protocol.ResponseHeader, input ray.InputStream, output io.Writer) error { session.EncodeResponseHeader(response, output) bodyWriter := session.EncodeResponseBody(request, output)