close input on error

pull/299/merge
Darien Raymond 2017-04-26 21:43:53 +02:00
parent 784d4ce560
commit 10ce629c02
No known key found for this signature in database
GPG Key ID: 7251FFA14BB18169
1 changed files with 35 additions and 27 deletions

View File

@ -79,12 +79,12 @@ type Client struct {
ctx context.Context ctx context.Context
cancel context.CancelFunc cancel context.CancelFunc
manager *ClientManager manager *ClientManager
session2Remove chan uint16
concurrency uint32 concurrency uint32
} }
var muxCoolDestination = net.TCPDestination(net.DomainAddress("v1.mux.cool"), net.Port(9527)) var muxCoolDestination = net.TCPDestination(net.DomainAddress("v1.mux.cool"), net.Port(9527))
// NewClient creates a new mux.Client.
func NewClient(p proxy.Outbound, dialer proxy.Dialer, m *ClientManager) (*Client, error) { func NewClient(p proxy.Outbound, dialer proxy.Dialer, m *ClientManager) (*Client, error) {
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
ctx = proxy.ContextWithTarget(ctx, muxCoolDestination) ctx = proxy.ContextWithTarget(ctx, muxCoolDestination)
@ -96,7 +96,6 @@ func NewClient(p proxy.Outbound, dialer proxy.Dialer, m *ClientManager) (*Client
ctx: ctx, ctx: ctx,
cancel: cancel, cancel: cancel,
manager: m, manager: m,
session2Remove: make(chan uint16, 16),
concurrency: m.config.Concurrency, concurrency: m.config.Concurrency,
} }
go c.fetchOutput() go c.fetchOutput()
@ -266,6 +265,7 @@ type Server struct {
dispatcher dispatcher.Interface dispatcher dispatcher.Interface
} }
// NewServer creates a new mux.Server.
func NewServer(ctx context.Context) *Server { func NewServer(ctx context.Context) *Server {
s := &Server{} s := &Server{}
space := app.SpaceFromContext(ctx) space := app.SpaceFromContext(ctx)
@ -361,6 +361,31 @@ func (w *ServerWorker) handleStatusEnd(meta *FrameMetadata, reader *Reader) erro
return nil return nil
} }
func (w *ServerWorker) handleFrame(ctx context.Context, reader *Reader) error {
meta, err := reader.ReadMetadata()
if err != nil {
return newError("failed to read metadata").Base(err)
}
switch meta.SessionStatus {
case SessionStatusKeepAlive:
err = w.handleStatusKeepAlive(meta, reader)
case SessionStatusEnd:
err = w.handleStatusEnd(meta, reader)
case SessionStatusNew:
err = w.handleStatusNew(ctx, meta, reader)
case SessionStatusKeep:
err = w.handleStatusKeep(meta, reader)
default:
return newError("unknown status: ", meta.SessionStatus).AtWarning()
}
if err != nil {
return newError("failed to process data").Base(err)
}
return nil
}
func (w *ServerWorker) run(ctx context.Context) { func (w *ServerWorker) run(ctx context.Context) {
input := w.outboundRay.OutboundInput() input := w.outboundRay.OutboundInput()
reader := NewReader(input) reader := NewReader(input)
@ -372,31 +397,14 @@ func (w *ServerWorker) run(ctx context.Context) {
case <-ctx.Done(): case <-ctx.Done():
return return
default: default:
} err := w.handleFrame(ctx, reader)
if err != nil {
meta, err := reader.ReadMetadata() if errors.Cause(err) != io.EOF {
if err != nil { log.Trace(newError("unexpected EOF").Base(err))
log.Trace(newError("failed to read metadata").Base(err)) input.CloseError()
return }
} return
}
switch meta.SessionStatus {
case SessionStatusKeepAlive:
err = w.handleStatusKeepAlive(meta, reader)
case SessionStatusEnd:
err = w.handleStatusEnd(meta, reader)
case SessionStatusNew:
err = w.handleStatusNew(ctx, meta, reader)
case SessionStatusKeep:
err = w.handleStatusKeep(meta, reader)
default:
log.Trace(newError("unknown status: ", meta.SessionStatus).AtWarning())
return
}
if err != nil {
log.Trace(newError("failed to process data").Base(err))
return
} }
} }
} }