Update bridge.go

pull/5110/head
patterniha 2025-09-09 18:53:02 +03:30
parent 9cca6b916c
commit f0f7130b08
1 changed files with 24 additions and 24 deletions

View File

@ -55,7 +55,7 @@ func (b *Bridge) cleanup() {
activeWorkers = append(activeWorkers, w) activeWorkers = append(activeWorkers, w)
} }
if w.Closed() { if w.Closed() {
w.timer.SetTimeout(0) w.Timer.SetTimeout(0)
} }
} }
@ -98,11 +98,11 @@ func (b *Bridge) Close() error {
} }
type BridgeWorker struct { type BridgeWorker struct {
tag string Tag string
worker *mux.ServerWorker Worker *mux.ServerWorker
dispatcher routing.Dispatcher Dispatcher routing.Dispatcher
state Control_State State Control_State
timer *signal.ActivityTimer Timer *signal.ActivityTimer
} }
func NewBridgeWorker(domain string, tag string, d routing.Dispatcher) (*BridgeWorker, error) { func NewBridgeWorker(domain string, tag string, d routing.Dispatcher) (*BridgeWorker, error) {
@ -120,20 +120,20 @@ func NewBridgeWorker(domain string, tag string, d routing.Dispatcher) (*BridgeWo
} }
w := &BridgeWorker{ w := &BridgeWorker{
dispatcher: d, Dispatcher: d,
tag: tag, Tag: tag,
} }
worker, err := mux.NewServerWorker(context.Background(), w, link) worker, err := mux.NewServerWorker(context.Background(), w, link)
if err != nil { if err != nil {
return nil, err return nil, err
} }
w.worker = worker w.Worker = worker
terminate := func() { terminate := func() {
worker.Close() worker.Close()
} }
w.timer = signal.CancelAfterInactivity(ctx, terminate, 60*time.Second) // prevent leak w.Timer = signal.CancelAfterInactivity(ctx, terminate, 60*time.Second)
return w, nil return w, nil
} }
@ -150,15 +150,15 @@ func (w *BridgeWorker) Close() error {
} }
func (w *BridgeWorker) IsActive() bool { func (w *BridgeWorker) IsActive() bool {
return w.state == Control_ACTIVE && !w.worker.Closed() return w.State == Control_ACTIVE && !w.Worker.Closed()
} }
func (w *BridgeWorker) Closed() bool { func (w *BridgeWorker) Closed() bool {
return w.worker.Closed() return w.Worker.Closed()
} }
func (w *BridgeWorker) Connections() uint32 { func (w *BridgeWorker) Connections() uint32 {
return w.worker.ActiveConnections() return w.Worker.ActiveConnections()
} }
func (w *BridgeWorker) handleInternalConn(link *transport.Link) { func (w *BridgeWorker) handleInternalConn(link *transport.Link) {
@ -167,22 +167,22 @@ func (w *BridgeWorker) handleInternalConn(link *transport.Link) {
mb, err := reader.ReadMultiBuffer() mb, err := reader.ReadMultiBuffer()
if err != nil { if err != nil {
if w.Closed() { if w.Closed() {
w.timer.SetTimeout(0) w.Timer.SetTimeout(0)
} else { } else {
w.timer.SetTimeout(24 * time.Hour) w.Timer.SetTimeout(24 * time.Hour)
} }
return return
} }
w.timer.Update() w.Timer.Update()
for _, b := range mb { for _, b := range mb {
var ctl Control var ctl Control
if err := proto.Unmarshal(b.Bytes(), &ctl); err != nil { if err := proto.Unmarshal(b.Bytes(), &ctl); err != nil {
errors.LogInfoInner(context.Background(), err, "failed to parse proto message") errors.LogInfoInner(context.Background(), err, "failed to parse proto message")
w.timer.SetTimeout(0) w.Timer.SetTimeout(0)
return return
} }
if ctl.State != w.state { if ctl.State != w.State {
w.state = ctl.State w.State = ctl.State
} }
} }
} }
@ -191,9 +191,9 @@ func (w *BridgeWorker) handleInternalConn(link *transport.Link) {
func (w *BridgeWorker) Dispatch(ctx context.Context, dest net.Destination) (*transport.Link, error) { func (w *BridgeWorker) Dispatch(ctx context.Context, dest net.Destination) (*transport.Link, error) {
if !isInternalDomain(dest) { if !isInternalDomain(dest) {
ctx = session.ContextWithInbound(ctx, &session.Inbound{ ctx = session.ContextWithInbound(ctx, &session.Inbound{
Tag: w.tag, Tag: w.Tag,
}) })
return w.dispatcher.Dispatch(ctx, dest) return w.Dispatcher.Dispatch(ctx, dest)
} }
opt := []pipe.Option{pipe.WithSizeLimit(16 * 1024)} opt := []pipe.Option{pipe.WithSizeLimit(16 * 1024)}
@ -214,12 +214,12 @@ func (w *BridgeWorker) Dispatch(ctx context.Context, dest net.Destination) (*tra
func (w *BridgeWorker) DispatchLink(ctx context.Context, dest net.Destination, link *transport.Link) error { func (w *BridgeWorker) DispatchLink(ctx context.Context, dest net.Destination, link *transport.Link) error {
if !isInternalDomain(dest) { if !isInternalDomain(dest) {
ctx = session.ContextWithInbound(ctx, &session.Inbound{ ctx = session.ContextWithInbound(ctx, &session.Inbound{
Tag: w.tag, Tag: w.Tag,
}) })
return w.dispatcher.DispatchLink(ctx, dest, link) return w.Dispatcher.DispatchLink(ctx, dest, link)
} }
link = w.dispatcher.(*dispatcher.DefaultDispatcher).WrapLink(ctx, link) link = w.Dispatcher.(*dispatcher.DefaultDispatcher).WrapLink(ctx, link)
w.handleInternalConn(link) w.handleInternalConn(link)
return nil return nil