From 0c66016d5f6635db87791c20f02e6cd06ff472c6 Mon Sep 17 00:00:00 2001 From: Darien Raymond Date: Wed, 18 Apr 2018 22:13:31 +0200 Subject: [PATCH] fix a regression that mux doesn't handle passive connection. fixes #1061 --- app/proxyman/mux/mux.go | 25 +++++++++++++++++++++++-- 1 file changed, 23 insertions(+), 2 deletions(-) diff --git a/app/proxyman/mux/mux.go b/app/proxyman/mux/mux.go index f10a3c2a..6d6fabe6 100644 --- a/app/proxyman/mux/mux.go +++ b/app/proxyman/mux/mux.go @@ -144,6 +144,19 @@ func (m *Client) monitor() { } } +func copyFirstPayload(reader *pipe.Reader, writer *Writer) error { + data, err := reader.ReadMultiBufferWithTimeout(time.Millisecond * 200) + if err == buf.ErrReadTimeout { + return writer.writeMetaOnly() + } + + if err != nil { + return err + } + + return writer.WriteMultiBuffer(data) +} + func fetchInput(ctx context.Context, s *Session, output buf.Writer) { dest, _ := proxy.TargetFromContext(ctx) transferType := protocol.TransferTypeStream @@ -153,14 +166,22 @@ func fetchInput(ctx context.Context, s *Session, output buf.Writer) { s.transferType = transferType writer := NewWriter(s.ID, dest, output, transferType) defer s.Close() + defer writer.Close() newError("dispatching request to ", dest).WithContext(ctx).WriteToLog() + if pReader, ok := s.input.(*pipe.Reader); ok { + if err := copyFirstPayload(pReader, writer); err != nil { + newError("failed to fetch first payload").Base(err).WithContext(ctx).WriteToLog() + writer.hasError = true + return + } + } + if err := buf.Copy(s.input, writer); err != nil { newError("failed to fetch all input").Base(err).WithContext(ctx).WriteToLog() writer.hasError = true + return } - - writer.Close() } func (m *Client) Dispatch(ctx context.Context, link *core.Link) bool {