diff --git a/common/buf/io.go b/common/buf/io.go index fc739040..21daa6f9 100644 --- a/common/buf/io.go +++ b/common/buf/io.go @@ -49,7 +49,7 @@ func Pipe(timer *signal.ActivityTimer, reader Reader, writer Writer) error { return err } - timer.UpdateActivity() + timer.Update() if buffer.IsEmpty() { buffer.Release() diff --git a/common/signal/timer.go b/common/signal/timer.go index dfcbd849..9d74168a 100644 --- a/common/signal/timer.go +++ b/common/signal/timer.go @@ -12,7 +12,7 @@ type ActivityTimer struct { cancel context.CancelFunc } -func (t *ActivityTimer) UpdateActivity() { +func (t *ActivityTimer) Update() { select { case t.updated <- true: default: @@ -37,7 +37,8 @@ func (t *ActivityTimer) run() { } } -func CancelAfterInactivity(ctx context.Context, cancel context.CancelFunc, timeout time.Duration) *ActivityTimer { +func CancelAfterInactivity(ctx context.Context, timeout time.Duration) (context.Context, *ActivityTimer) { + ctx, cancel := context.WithCancel(ctx) timer := &ActivityTimer{ ctx: ctx, cancel: cancel, @@ -45,5 +46,5 @@ func CancelAfterInactivity(ctx context.Context, cancel context.CancelFunc, timeo updated: make(chan bool, 1), } go timer.run() - return timer + return ctx, timer } diff --git a/proxy/dokodemo/dokodemo.go b/proxy/dokodemo/dokodemo.go index 998abbd2..fa2cc3ea 100644 --- a/proxy/dokodemo/dokodemo.go +++ b/proxy/dokodemo/dokodemo.go @@ -60,14 +60,12 @@ func (d *DokodemoDoor) Process(ctx context.Context, network net.Network, conn in log.Info("Dokodemo: Invalid destination. Discarding...") return errors.New("Dokodemo: Unable to get destination.") } - ctx, cancel := context.WithCancel(ctx) - defer cancel() timeout := time.Second * time.Duration(d.config.Timeout) if timeout == 0 { timeout = time.Minute * 2 } - timer := signal.CancelAfterInactivity(ctx, cancel, timeout) + ctx, timer := signal.CancelAfterInactivity(ctx, timeout) inboundRay, err := dispatcher.Dispatch(ctx, dest) if err != nil { diff --git a/proxy/freedom/freedom.go b/proxy/freedom/freedom.go index da034118..345588d9 100644 --- a/proxy/freedom/freedom.go +++ b/proxy/freedom/freedom.go @@ -108,15 +108,12 @@ func (v *Handler) Process(ctx context.Context, outboundRay ray.OutboundRay, dial conn.SetReusable(false) - ctx, cancel := context.WithCancel(ctx) - defer cancel() - timeout := time.Second * time.Duration(v.timeout) if timeout == 0 { timeout = time.Minute * 5 } log.Debug("Freedom: Cancel after ", timeout) - timer := signal.CancelAfterInactivity(ctx, cancel, timeout) + ctx, timer := signal.CancelAfterInactivity(ctx, timeout) requestDone := signal.ExecuteAsync(func() error { v2writer := buf.NewWriter(conn) @@ -137,10 +134,9 @@ func (v *Handler) Process(ctx context.Context, outboundRay ray.OutboundRay, dial }) if err := signal.ErrorOrFinish2(ctx, requestDone, responseDone); err != nil { - log.Info("Freedom: Connection ending with ", err) input.CloseError() output.CloseError() - return err + return errors.Base(err).Message("Freedom: Connection ends.") } runtime.KeepAlive(timer) diff --git a/proxy/http/server.go b/proxy/http/server.go index f74341ad..4ddb9b7e 100644 --- a/proxy/http/server.go +++ b/proxy/http/server.go @@ -121,14 +121,11 @@ func (s *Server) handleConnect(ctx context.Context, request *http.Request, reade return errors.Base(err).Message("HTTP|Server: Failed to write back OK response.") } - ctx, cancel := context.WithCancel(ctx) - defer cancel() - timeout := time.Second * time.Duration(s.config.Timeout) if timeout == 0 { timeout = time.Minute * 2 } - timer := signal.CancelAfterInactivity(ctx, cancel, timeout) + ctx, timer := signal.CancelAfterInactivity(ctx, timeout) ray, err := dispatcher.Dispatch(ctx, dest) if err != nil { return err diff --git a/proxy/shadowsocks/client.go b/proxy/shadowsocks/client.go index 089963bd..6a752874 100644 --- a/proxy/shadowsocks/client.go +++ b/proxy/shadowsocks/client.go @@ -90,17 +90,13 @@ func (v *Client) Process(ctx context.Context, outboundRay ray.OutboundRay, diale request.Option |= RequestOptionOneTimeAuth } - ctx, cancel := context.WithCancel(ctx) - defer cancel() - - timer := signal.CancelAfterInactivity(ctx, cancel, time.Minute*2) + ctx, timer := signal.CancelAfterInactivity(ctx, time.Minute*2) if request.Command == protocol.RequestCommandTCP { bufferedWriter := buf.NewBufferedWriter(conn) bodyWriter, err := WriteTCPRequest(request, bufferedWriter) if err != nil { - log.Info("Shadowsocks|Client: Failed to write request: ", err) - return err + return errors.Base(err).Message("Shadowsocks|Client: Failed to write request") } if err := bufferedWriter.SetBuffered(false); err != nil { @@ -131,8 +127,7 @@ func (v *Client) Process(ctx context.Context, outboundRay ray.OutboundRay, diale }) if err := signal.ErrorOrFinish2(ctx, requestDone, responseDone); err != nil { - log.Info("Shadowsocks|Client: Connection ends with ", err) - return err + return errors.Base(err).Message("Shadowsocks|Client: Connection ends.") } return nil @@ -147,8 +142,7 @@ func (v *Client) Process(ctx context.Context, outboundRay ray.OutboundRay, diale requestDone := signal.ExecuteAsync(func() error { if err := buf.PipeUntilEOF(timer, outboundRay.OutboundInput(), writer); err != nil { - log.Info("Shadowsocks|Client: Failed to transport all UDP request: ", err) - return err + return errors.Base(err).Message("Shadowsocks|Client: Failed to transport all UDP request") } return nil }) @@ -162,15 +156,13 @@ func (v *Client) Process(ctx context.Context, outboundRay ray.OutboundRay, diale } if err := buf.PipeUntilEOF(timer, reader, outboundRay.OutboundOutput()); err != nil { - log.Info("Shadowsocks|Client: Failed to transport all UDP response: ", err) - return err + return errors.Base(err).Message("Shadowsocks|Client: Failed to transport all UDP response.") } return nil }) if err := signal.ErrorOrFinish2(ctx, requestDone, responseDone); err != nil { - log.Info("Shadowsocks|Client: Connection ends with ", err) - return err + return errors.Base(err).Message("Shadowsocks|Client: Connection ends.") } return nil diff --git a/proxy/shadowsocks/server.go b/proxy/shadowsocks/server.go index 1af0bc2c..9dd3f8b9 100644 --- a/proxy/shadowsocks/server.go +++ b/proxy/shadowsocks/server.go @@ -134,8 +134,7 @@ func (s *Server) handleConnection(ctx context.Context, conn internet.Connection, request, bodyReader, err := ReadTCPSession(s.user, bufferedReader) if err != nil { log.Access(conn.RemoteAddr(), "", log.AccessRejected, err) - log.Info("Shadowsocks|Server: Failed to create request from: ", conn.RemoteAddr(), ": ", err) - return err + return errors.Base(err).Message("Shadowsocks|Server: Failed to create request from: ", conn.RemoteAddr()) } conn.SetReadDeadline(time.Time{}) @@ -147,9 +146,8 @@ func (s *Server) handleConnection(ctx context.Context, conn internet.Connection, ctx = protocol.ContextWithUser(ctx, request.User) - ctx, cancel := context.WithCancel(ctx) userSettings := s.user.GetSettings() - timer := signal.CancelAfterInactivity(ctx, cancel, userSettings.PayloadTimeout) + ctx, timer := signal.CancelAfterInactivity(ctx, userSettings.PayloadTimeout) ray, err := dispatcher.Dispatch(ctx, dest) if err != nil { return err @@ -159,8 +157,7 @@ func (s *Server) handleConnection(ctx context.Context, conn internet.Connection, bufferedWriter := buf.NewBufferedWriter(conn) responseWriter, err := WriteTCPResponse(request, bufferedWriter) if err != nil { - log.Warning("Shadowsocks|Server: Failed to write response: ", err) - return err + return errors.Base(err).Message("Shadowsocks|Server: Failed to write response.") } payload, err := ray.InboundOutput().Read() @@ -177,8 +174,7 @@ func (s *Server) handleConnection(ctx context.Context, conn internet.Connection, } if err := buf.PipeUntilEOF(timer, ray.InboundOutput(), responseWriter); err != nil { - log.Info("Shadowsocks|Server: Failed to transport all TCP response: ", err) - return err + return errors.Base(err).Message("Shadowsocks|Server: Failed to transport all TCP response.") } return nil @@ -188,18 +184,15 @@ func (s *Server) handleConnection(ctx context.Context, conn internet.Connection, defer ray.InboundInput().Close() if err := buf.PipeUntilEOF(timer, bodyReader, ray.InboundInput()); err != nil { - log.Info("Shadowsocks|Server: Failed to transport all TCP request: ", err) - return err + return errors.Base(err).Message("Shadowsocks|Server: Failed to transport all TCP request.") } return nil }) if err := signal.ErrorOrFinish2(ctx, requestDone, responseDone); err != nil { - log.Info("Shadowsocks|Server: Connection ends with ", err) - cancel() ray.InboundInput().CloseError() ray.InboundOutput().CloseError() - return err + return errors.Base(err).Message("Shadowsocks|Server: Connection ends.") } runtime.KeepAlive(timer) diff --git a/proxy/socks/client.go b/proxy/socks/client.go index f3fde012..118e2a59 100644 --- a/proxy/socks/client.go +++ b/proxy/socks/client.go @@ -81,10 +81,7 @@ func (c *Client) Process(ctx context.Context, ray ray.OutboundRay, dialer proxy. return errors.Base(err).RequireUserAction().Message("Socks|Client: Failed to establish connection to server.") } - ctx, cancel := context.WithCancel(ctx) - defer cancel() - - timer := signal.CancelAfterInactivity(ctx, cancel, time.Minute*2) + ctx, timer := signal.CancelAfterInactivity(ctx, time.Minute*2) var requestFunc func() error var responseFunc func() error diff --git a/proxy/socks/server.go b/proxy/socks/server.go index df553444..b602f90b 100644 --- a/proxy/socks/server.go +++ b/proxy/socks/server.go @@ -110,14 +110,11 @@ func (*Server) handleUDP() error { } func (v *Server) transport(ctx context.Context, reader io.Reader, writer io.Writer, dest net.Destination, dispatcher dispatcher.Interface) error { - ctx, cancel := context.WithCancel(ctx) - defer cancel() - timeout := time.Second * time.Duration(v.config.Timeout) if timeout == 0 { timeout = time.Minute * 2 } - timer := signal.CancelAfterInactivity(ctx, cancel, timeout) + ctx, timer := signal.CancelAfterInactivity(ctx, timeout) ray, err := dispatcher.Dispatch(ctx, dest) if err != nil { diff --git a/proxy/vmess/inbound/inbound.go b/proxy/vmess/inbound/inbound.go index 6fbb0ad3..1585697d 100644 --- a/proxy/vmess/inbound/inbound.go +++ b/proxy/vmess/inbound/inbound.go @@ -193,10 +193,8 @@ func (v *Handler) Process(ctx context.Context, network net.Network, connection i userSettings := request.User.GetSettings() ctx = protocol.ContextWithUser(ctx, request.User) - ctx, cancel := context.WithCancel(ctx) - defer cancel() - timer := signal.CancelAfterInactivity(ctx, cancel, userSettings.PayloadTimeout) + ctx, timer := signal.CancelAfterInactivity(ctx, userSettings.PayloadTimeout) ray, err := dispatcher.Dispatch(ctx, request.Destination()) if err != nil { return err diff --git a/proxy/vmess/outbound/outbound.go b/proxy/vmess/outbound/outbound.go index 81c30a82..860c8273 100644 --- a/proxy/vmess/outbound/outbound.go +++ b/proxy/vmess/outbound/outbound.go @@ -105,10 +105,7 @@ func (v *Handler) Process(ctx context.Context, outboundRay ray.OutboundRay, dial session := encoding.NewClientSession(protocol.DefaultIDHash) - ctx, cancel := context.WithCancel(ctx) - defer cancel() - - timer := signal.CancelAfterInactivity(ctx, cancel, time.Minute*2) + ctx, timer := signal.CancelAfterInactivity(ctx, time.Minute*2) requestDone := signal.ExecuteAsync(func() error { writer := buf.NewBufferedWriter(conn)