|
|
|
@ -109,6 +109,8 @@ func (v *Handler) Process(ctx context.Context, outboundRay ray.OutboundRay, dial
|
|
|
|
|
timer := signal.CancelAfterInactivity(ctx, cancel, sessionPolicy.Timeouts.ConnectionIdle) |
|
|
|
|
|
|
|
|
|
requestDone := signal.ExecuteAsync(func() error { |
|
|
|
|
defer timer.SetTimeout(sessionPolicy.Timeouts.DownlinkOnly) |
|
|
|
|
|
|
|
|
|
writer := buf.NewBufferedWriter(buf.NewWriter(conn)) |
|
|
|
|
if err := session.EncodeRequestHeader(request, writer); err != nil { |
|
|
|
|
return newError("failed to encode request").Base(err).AtWarning() |
|
|
|
@ -139,23 +141,23 @@ func (v *Handler) Process(ctx context.Context, outboundRay ray.OutboundRay, dial
|
|
|
|
|
return err |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
timer.SetTimeout(sessionPolicy.Timeouts.DownlinkOnly) |
|
|
|
|
|
|
|
|
|
return nil |
|
|
|
|
}) |
|
|
|
|
|
|
|
|
|
responseDone := signal.ExecuteAsync(func() error { |
|
|
|
|
defer output.Close() |
|
|
|
|
defer timer.SetTimeout(sessionPolicy.Timeouts.UplinkOnly) |
|
|
|
|
|
|
|
|
|
reader := buf.NewBufferedReader(buf.NewReader(conn)) |
|
|
|
|
header, err := session.DecodeResponseHeader(reader) |
|
|
|
|
if err != nil { |
|
|
|
|
return err |
|
|
|
|
return newError("failed to read header").Base(err) |
|
|
|
|
} |
|
|
|
|
v.handleCommand(rec.Destination(), header.Command) |
|
|
|
|
|
|
|
|
|
reader.SetBuffered(false) |
|
|
|
|
bodyReader := session.DecodeResponseBody(request, reader) |
|
|
|
|
|
|
|
|
|
return buf.Copy(bodyReader, output, buf.UpdateActivity(timer)) |
|
|
|
|
}) |
|
|
|
|
|
|
|
|
|