diff --git a/app/proxyman/inbound/always.go b/app/proxyman/inbound/always.go index 38c933f5..e6760357 100644 --- a/app/proxyman/inbound/always.go +++ b/app/proxyman/inbound/always.go @@ -7,9 +7,9 @@ import ( "v2ray.com/core/app/proxyman" "v2ray.com/core/common" "v2ray.com/core/common/dice" + "v2ray.com/core/common/errors" "v2ray.com/core/common/mux" "v2ray.com/core/common/net" - "v2ray.com/core/common/serial" "v2ray.com/core/features/policy" "v2ray.com/core/features/stats" "v2ray.com/core/proxy" @@ -137,17 +137,13 @@ func (h *AlwaysOnInboundHandler) Start() error { // Close implements common.Closable. func (h *AlwaysOnInboundHandler) Close() error { - var errors []interface{} + var errs []error for _, worker := range h.workers { - if err := worker.Close(); err != nil { - errors = append(errors, err) - } - } - if err := h.mux.Close(); err != nil { - errors = append(errors, err) + errs = append(errs, worker.Close()) } - if len(errors) > 0 { - return newError("failed to close all resources").Base(newError(serial.Concat(errors...))) + errs = append(errs, h.mux.Close()) + if err := errors.Combine(errs...); err != nil { + return newError("failed to close all resources").Base(err) } return nil } diff --git a/app/proxyman/outbound/outbound.go b/app/proxyman/outbound/outbound.go index 5cf838c9..ce5cc5c7 100644 --- a/app/proxyman/outbound/outbound.go +++ b/app/proxyman/outbound/outbound.go @@ -10,6 +10,7 @@ import ( "v2ray.com/core" "v2ray.com/core/app/proxyman" "v2ray.com/core/common" + "v2ray.com/core/common/errors" "v2ray.com/core/features/outbound" ) @@ -64,15 +65,16 @@ func (m *Manager) Close() error { m.running = false + var errs []error for _, h := range m.taggedHandler { - h.Close() + errs = append(errs, h.Close()) } for _, h := range m.untaggedHandlers { - h.Close() + errs = append(errs, h.Close()) } - return nil + return errors.Combine(errs...) } // GetDefaultHandler implements outbound.Manager. diff --git a/app/reverse/reverse.go b/app/reverse/reverse.go index ed3b41fb..88f15f79 100644 --- a/app/reverse/reverse.go +++ b/app/reverse/reverse.go @@ -7,6 +7,7 @@ import ( "v2ray.com/core" "v2ray.com/core/common" + "v2ray.com/core/common/errors" "v2ray.com/core/common/net" "v2ray.com/core/features/outbound" "v2ray.com/core/features/routing" @@ -82,17 +83,14 @@ func (r *Reverse) Start() error { } func (r *Reverse) Close() error { + var errs []error for _, b := range r.bridges { - if err := b.Close(); err != nil { - return err - } + errs = append(errs, b.Close()) } for _, p := range r.portals { - if err := p.Close(); err != nil { - return err - } + errs = append(errs, p.Close()) } - return nil + return errors.Combine(errs...) } diff --git a/common/errors/multi_error.go b/common/errors/multi_error.go new file mode 100644 index 00000000..8f19c97a --- /dev/null +++ b/common/errors/multi_error.go @@ -0,0 +1,30 @@ +package errors + +import ( + "strings" +) + +type multiError []error + +func (e multiError) Error() string { + var r strings.Builder + r.WriteString("multierr: ") + for _, err := range e { + r.WriteString(err.Error()) + r.WriteString(" | ") + } + return r.String() +} + +func Combine(maybeError ...error) error { + var errs multiError + for _, err := range maybeError { + if err != nil { + errs = append(errs, err) + } + } + if len(errs) == 0 { + return nil + } + return errs +} diff --git a/common/task/task.go b/common/task/task.go index 26227b85..1b6a5319 100644 --- a/common/task/task.go +++ b/common/task/task.go @@ -2,26 +2,12 @@ package task import ( "context" - "strings" - "v2ray.com/core/common" "v2ray.com/core/common/signal/semaphore" ) type Task func() error -type MultiError []error - -func (e MultiError) Error() string { - var r strings.Builder - common.Must2(r.WriteString("multierr: ")) - for _, err := range e { - common.Must2(r.WriteString(err.Error())) - common.Must2(r.WriteString(" | ")) - } - return r.String() -} - type executionContext struct { ctx context.Context tasks []Task @@ -90,30 +76,6 @@ func Sequential(tasks ...Task) ExecutionOption { } } -func SequentialAll(tasks ...Task) ExecutionOption { - return func(c *executionContext) { - switch len(tasks) { - case 0: - return - case 1: - c.tasks = append(c.tasks, tasks[0]) - default: - c.tasks = append(c.tasks, func() error { - var merr MultiError - for _, task := range tasks { - if err := task(); err != nil { - merr = append(merr, err) - } - } - if len(merr) == 0 { - return nil - } - return merr - }) - } - } -} - func OnSuccess(task Task) ExecutionOption { return func(c *executionContext) { c.onSuccess = task diff --git a/proxy/vmess/inbound/inbound.go b/proxy/vmess/inbound/inbound.go index e9274e1a..5d6a05eb 100644 --- a/proxy/vmess/inbound/inbound.go +++ b/proxy/vmess/inbound/inbound.go @@ -139,9 +139,10 @@ func New(ctx context.Context, config *Config) (*Handler, error) { // Close implements common.Closable. func (h *Handler) Close() error { - return task.Run( - task.SequentialAll( - task.Close(h.clients), task.Close(h.sessionHistory), task.Close(h.usersByEmail)))() + return errors.Combine( + h.clients.Close(), + h.sessionHistory.Close(), + common.Close(h.usersByEmail)) } // Network implements proxy.Inbound.Network(). @@ -290,9 +291,10 @@ func (h *Handler) Process(ctx context.Context, network net.Network, connection i } responseDone := func() error { + defer timer.SetTimeout(sessionPolicy.Timeouts.UplinkOnly) + writer := buf.NewBufferedWriter(buf.NewWriter(connection)) defer writer.Flush() - defer timer.SetTimeout(sessionPolicy.Timeouts.UplinkOnly) response := &protocol.ResponseHeader{ Command: h.generateCommand(ctx, request),