From 6c89940e655079b1912b86e8df4f10a7861052bc Mon Sep 17 00:00:00 2001 From: Darien Raymond Date: Sat, 27 Oct 2018 09:08:32 +0200 Subject: [PATCH] fix #1338 --- common/mux/server.go | 27 ++++++++++++++++++--------- 1 file changed, 18 insertions(+), 9 deletions(-) diff --git a/common/mux/server.go b/common/mux/server.go index 5f963d0a..921b19d6 100644 --- a/common/mux/server.go +++ b/common/mux/server.go @@ -44,15 +44,14 @@ func (s *Server) Dispatch(ctx context.Context, dest net.Destination) (*vio.Link, uplinkReader, uplinkWriter := pipe.New(opts...) downlinkReader, downlinkWriter := pipe.New(opts...) - worker := &ServerWorker{ - dispatcher: s.dispatcher, - link: &vio.Link{ - Reader: uplinkReader, - Writer: downlinkWriter, - }, - sessionManager: NewSessionManager(), + _, err := NewServerWorker(ctx, s.dispatcher, &vio.Link{ + Reader: uplinkReader, + Writer: downlinkWriter, + }) + if err != nil { + return nil, err } - go worker.run(ctx) + return &vio.Link{Reader: downlinkReader, Writer: uplinkWriter}, nil } @@ -72,6 +71,16 @@ type ServerWorker struct { sessionManager *SessionManager } +func NewServerWorker(ctx context.Context, d routing.Dispatcher, link *vio.Link) (*ServerWorker, error) { + worker := &ServerWorker{ + dispatcher: d, + link: link, + sessionManager: NewSessionManager(), + } + go worker.run(ctx) + return worker, nil +} + func handle(ctx context.Context, s *Session, output buf.Writer) { writer := NewResponseWriter(s.ID, output, s.transferType) if err := buf.Copy(s.input, writer); err != nil { @@ -142,7 +151,7 @@ func (w *ServerWorker) handleStatusKeep(meta *FrameMetadata, reader *buf.Buffere s, found := w.sessionManager.Get(meta.SessionID) if !found { - buf.Copy(NewStreamReader(reader), buf.Discard) + return buf.Copy(NewStreamReader(reader), buf.Discard) } rr := s.NewReader(reader)