mirror of https://github.com/XTLS/Xray-core
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
249 lines
7.5 KiB
249 lines
7.5 KiB
package outbound |
|
|
|
import ( |
|
"context" |
|
"crypto/hmac" |
|
"crypto/sha256" |
|
"hash/crc64" |
|
"time" |
|
|
|
"github.com/xtls/xray-core/common" |
|
"github.com/xtls/xray-core/common/buf" |
|
"github.com/xtls/xray-core/common/errors" |
|
"github.com/xtls/xray-core/common/net" |
|
"github.com/xtls/xray-core/common/platform" |
|
"github.com/xtls/xray-core/common/protocol" |
|
"github.com/xtls/xray-core/common/retry" |
|
"github.com/xtls/xray-core/common/session" |
|
"github.com/xtls/xray-core/common/signal" |
|
"github.com/xtls/xray-core/common/task" |
|
"github.com/xtls/xray-core/common/xudp" |
|
core "github.com/xtls/xray-core/core" |
|
"github.com/xtls/xray-core/features/policy" |
|
"github.com/xtls/xray-core/proxy/vmess" |
|
"github.com/xtls/xray-core/proxy/vmess/encoding" |
|
"github.com/xtls/xray-core/transport" |
|
"github.com/xtls/xray-core/transport/internet" |
|
"github.com/xtls/xray-core/transport/internet/stat" |
|
) |
|
|
|
// Handler is an outbound connection handler for VMess protocol. |
|
type Handler struct { |
|
serverList *protocol.ServerList |
|
serverPicker protocol.ServerPicker |
|
policyManager policy.Manager |
|
cone bool |
|
} |
|
|
|
// New creates a new VMess outbound handler. |
|
func New(ctx context.Context, config *Config) (*Handler, error) { |
|
serverList := protocol.NewServerList() |
|
for _, rec := range config.Receiver { |
|
s, err := protocol.NewServerSpecFromPB(rec) |
|
if err != nil { |
|
return nil, errors.New("failed to parse server spec").Base(err) |
|
} |
|
serverList.AddServer(s) |
|
} |
|
|
|
v := core.MustFromContext(ctx) |
|
handler := &Handler{ |
|
serverList: serverList, |
|
serverPicker: protocol.NewRoundRobinServerPicker(serverList), |
|
policyManager: v.GetFeature(policy.ManagerType()).(policy.Manager), |
|
cone: ctx.Value("cone").(bool), |
|
} |
|
|
|
return handler, nil |
|
} |
|
|
|
// Process implements proxy.Outbound.Process(). |
|
func (h *Handler) Process(ctx context.Context, link *transport.Link, dialer internet.Dialer) error { |
|
outbounds := session.OutboundsFromContext(ctx) |
|
ob := outbounds[len(outbounds)-1] |
|
if !ob.Target.IsValid() { |
|
return errors.New("target not specified").AtError() |
|
} |
|
ob.Name = "vmess" |
|
ob.CanSpliceCopy = 3 |
|
|
|
var rec *protocol.ServerSpec |
|
var conn stat.Connection |
|
err := retry.ExponentialBackoff(5, 200).On(func() error { |
|
rec = h.serverPicker.PickServer() |
|
rawConn, err := dialer.Dial(ctx, rec.Destination()) |
|
if err != nil { |
|
return err |
|
} |
|
conn = rawConn |
|
|
|
return nil |
|
}) |
|
if err != nil { |
|
return errors.New("failed to find an available destination").Base(err).AtWarning() |
|
} |
|
defer conn.Close() |
|
|
|
target := ob.Target |
|
errors.LogInfo(ctx, "tunneling request to ", target, " via ", rec.Destination().NetAddr()) |
|
|
|
command := protocol.RequestCommandTCP |
|
if target.Network == net.Network_UDP { |
|
command = protocol.RequestCommandUDP |
|
} |
|
if target.Address.Family().IsDomain() && target.Address.Domain() == "v1.mux.cool" { |
|
command = protocol.RequestCommandMux |
|
} |
|
|
|
user := rec.PickUser() |
|
request := &protocol.RequestHeader{ |
|
Version: encoding.Version, |
|
User: user, |
|
Command: command, |
|
Address: target.Address, |
|
Port: target.Port, |
|
Option: protocol.RequestOptionChunkStream, |
|
} |
|
|
|
account := request.User.Account.(*vmess.MemoryAccount) |
|
request.Security = account.Security |
|
|
|
if request.Security == protocol.SecurityType_AES128_GCM || request.Security == protocol.SecurityType_NONE || request.Security == protocol.SecurityType_CHACHA20_POLY1305 { |
|
request.Option.Set(protocol.RequestOptionChunkMasking) |
|
} |
|
|
|
if shouldEnablePadding(request.Security) && request.Option.Has(protocol.RequestOptionChunkMasking) { |
|
request.Option.Set(protocol.RequestOptionGlobalPadding) |
|
} |
|
|
|
if request.Security == protocol.SecurityType_ZERO { |
|
request.Security = protocol.SecurityType_NONE |
|
request.Option.Clear(protocol.RequestOptionChunkStream) |
|
request.Option.Clear(protocol.RequestOptionChunkMasking) |
|
} |
|
|
|
if account.AuthenticatedLengthExperiment { |
|
request.Option.Set(protocol.RequestOptionAuthenticatedLength) |
|
} |
|
|
|
input := link.Reader |
|
output := link.Writer |
|
|
|
hashkdf := hmac.New(sha256.New, []byte("VMessBF")) |
|
hashkdf.Write(account.ID.Bytes()) |
|
|
|
behaviorSeed := crc64.Checksum(hashkdf.Sum(nil), crc64.MakeTable(crc64.ISO)) |
|
|
|
var newCtx context.Context |
|
var newCancel context.CancelFunc |
|
if session.TimeoutOnlyFromContext(ctx) { |
|
newCtx, newCancel = context.WithCancel(context.Background()) |
|
} |
|
|
|
session := encoding.NewClientSession(ctx, int64(behaviorSeed)) |
|
sessionPolicy := h.policyManager.ForLevel(request.User.Level) |
|
|
|
ctx, cancel := context.WithCancel(ctx) |
|
timer := signal.CancelAfterInactivity(ctx, func() { |
|
cancel() |
|
if newCancel != nil { |
|
newCancel() |
|
} |
|
}, sessionPolicy.Timeouts.ConnectionIdle) |
|
|
|
if request.Command == protocol.RequestCommandUDP && h.cone && request.Port != 53 && request.Port != 443 { |
|
request.Command = protocol.RequestCommandMux |
|
request.Address = net.DomainAddress("v1.mux.cool") |
|
request.Port = net.Port(666) |
|
} |
|
|
|
requestDone := func() error { |
|
defer timer.SetTimeout(sessionPolicy.Timeouts.DownlinkOnly) |
|
|
|
writer := buf.NewBufferedWriter(buf.NewWriter(conn)) |
|
if err := session.EncodeRequestHeader(request, writer); err != nil { |
|
return errors.New("failed to encode request").Base(err).AtWarning() |
|
} |
|
|
|
bodyWriter, err := session.EncodeRequestBody(request, writer) |
|
if err != nil { |
|
return errors.New("failed to start encoding").Base(err) |
|
} |
|
bodyWriter2 := bodyWriter |
|
if request.Command == protocol.RequestCommandMux && request.Port == 666 { |
|
bodyWriter = xudp.NewPacketWriter(bodyWriter, target, xudp.GetGlobalID(ctx)) |
|
} |
|
if err := buf.CopyOnceTimeout(input, bodyWriter, time.Millisecond*100); err != nil && err != buf.ErrNotTimeoutReader && err != buf.ErrReadTimeout { |
|
return errors.New("failed to write first payload").Base(err) |
|
} |
|
|
|
if err := writer.SetBuffered(false); err != nil { |
|
return err |
|
} |
|
|
|
if err := buf.Copy(input, bodyWriter, buf.UpdateActivity(timer)); err != nil { |
|
return err |
|
} |
|
|
|
if request.Option.Has(protocol.RequestOptionChunkStream) && !account.NoTerminationSignal { |
|
if err := bodyWriter2.WriteMultiBuffer(buf.MultiBuffer{}); err != nil { |
|
return err |
|
} |
|
} |
|
|
|
return nil |
|
} |
|
|
|
responseDone := func() error { |
|
defer timer.SetTimeout(sessionPolicy.Timeouts.UplinkOnly) |
|
|
|
reader := &buf.BufferedReader{Reader: buf.NewReader(conn)} |
|
header, err := session.DecodeResponseHeader(reader) |
|
if err != nil { |
|
return errors.New("failed to read header").Base(err) |
|
} |
|
h.handleCommand(rec.Destination(), header.Command) |
|
|
|
bodyReader, err := session.DecodeResponseBody(request, reader) |
|
if err != nil { |
|
return errors.New("failed to start encoding response").Base(err) |
|
} |
|
if request.Command == protocol.RequestCommandMux && request.Port == 666 { |
|
bodyReader = xudp.NewPacketReader(&buf.BufferedReader{Reader: bodyReader}) |
|
} |
|
|
|
return buf.Copy(bodyReader, output, buf.UpdateActivity(timer)) |
|
} |
|
|
|
if newCtx != nil { |
|
ctx = newCtx |
|
} |
|
|
|
responseDonePost := task.OnSuccess(responseDone, task.Close(output)) |
|
if err := task.Run(ctx, requestDone, responseDonePost); err != nil { |
|
return errors.New("connection ends").Base(err) |
|
} |
|
|
|
return nil |
|
} |
|
|
|
// enablePadding, when true, makes shouldEnablePadding report true regardless
// of the security type. It starts out false and is switched on by init when
// the platform.UseVmessPadding environment flag is defined.
var enablePadding bool
|
|
|
func shouldEnablePadding(s protocol.SecurityType) bool { |
|
return enablePadding || s == protocol.SecurityType_AES128_GCM || s == protocol.SecurityType_CHACHA20_POLY1305 || s == protocol.SecurityType_AUTO |
|
} |
|
|
|
func init() { |
|
common.Must(common.RegisterConfig((*Config)(nil), func(ctx context.Context, config interface{}) (interface{}, error) { |
|
return New(ctx, config.(*Config)) |
|
})) |
|
|
|
const defaultFlagValue = "NOT_DEFINED_AT_ALL" |
|
|
|
paddingValue := platform.NewEnvFlag(platform.UseVmessPadding).GetValue(func() string { return defaultFlagValue }) |
|
if paddingValue != defaultFlagValue { |
|
enablePadding = true |
|
} |
|
}
|
|
|