mirror of https://github.com/XTLS/Xray-core
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
193 lines
5.0 KiB
193 lines
5.0 KiB
package wireguard |
|
|
|
import ( |
|
"context" |
|
goerrors "errors" |
|
"io" |
|
|
|
"github.com/xtls/xray-core/common" |
|
"github.com/xtls/xray-core/common/buf" |
|
"github.com/xtls/xray-core/common/errors" |
|
"github.com/xtls/xray-core/common/log" |
|
"github.com/xtls/xray-core/common/net" |
|
"github.com/xtls/xray-core/common/session" |
|
"github.com/xtls/xray-core/common/signal" |
|
"github.com/xtls/xray-core/common/task" |
|
"github.com/xtls/xray-core/core" |
|
"github.com/xtls/xray-core/features/dns" |
|
"github.com/xtls/xray-core/features/policy" |
|
"github.com/xtls/xray-core/features/routing" |
|
"github.com/xtls/xray-core/transport/internet/stat" |
|
) |
|
|
|
var nullDestination = net.TCPDestination(net.AnyIP, 0) |
|
|
|
type Server struct { |
|
bindServer *netBindServer |
|
|
|
info routingInfo |
|
policyManager policy.Manager |
|
} |
|
|
|
type routingInfo struct { |
|
ctx context.Context |
|
dispatcher routing.Dispatcher |
|
inboundTag *session.Inbound |
|
outboundTag *session.Outbound |
|
contentTag *session.Content |
|
} |
|
|
|
func NewServer(ctx context.Context, conf *DeviceConfig) (*Server, error) { |
|
v := core.MustFromContext(ctx) |
|
|
|
endpoints, hasIPv4, hasIPv6, err := parseEndpoints(conf) |
|
if err != nil { |
|
return nil, err |
|
} |
|
|
|
server := &Server{ |
|
bindServer: &netBindServer{ |
|
netBind: netBind{ |
|
dns: v.GetFeature(dns.ClientType()).(dns.Client), |
|
dnsOption: dns.IPOption{ |
|
IPv4Enable: hasIPv4, |
|
IPv6Enable: hasIPv6, |
|
}, |
|
}, |
|
}, |
|
policyManager: v.GetFeature(policy.ManagerType()).(policy.Manager), |
|
} |
|
|
|
tun, err := conf.createTun()(endpoints, int(conf.Mtu), server.forwardConnection) |
|
if err != nil { |
|
return nil, err |
|
} |
|
|
|
if err = tun.BuildDevice(createIPCRequest(conf), server.bindServer); err != nil { |
|
_ = tun.Close() |
|
return nil, err |
|
} |
|
|
|
return server, nil |
|
} |
|
|
|
// Network implements proxy.Inbound. |
|
func (*Server) Network() []net.Network { |
|
return []net.Network{net.Network_UDP} |
|
} |
|
|
|
// Process implements proxy.Inbound. |
|
func (s *Server) Process(ctx context.Context, network net.Network, conn stat.Connection, dispatcher routing.Dispatcher) error { |
|
inbound := session.InboundFromContext(ctx) |
|
inbound.Name = "wireguard" |
|
inbound.CanSpliceCopy = 3 |
|
outbounds := session.OutboundsFromContext(ctx) |
|
ob := outbounds[len(outbounds)-1] |
|
|
|
s.info = routingInfo{ |
|
ctx: core.ToBackgroundDetachedContext(ctx), |
|
dispatcher: dispatcher, |
|
inboundTag: session.InboundFromContext(ctx), |
|
outboundTag: ob, |
|
contentTag: session.ContentFromContext(ctx), |
|
} |
|
|
|
ep, err := s.bindServer.ParseEndpoint(conn.RemoteAddr().String()) |
|
if err != nil { |
|
return err |
|
} |
|
|
|
nep := ep.(*netEndpoint) |
|
nep.conn = conn |
|
|
|
reader := buf.NewPacketReader(conn) |
|
for { |
|
mpayload, err := reader.ReadMultiBuffer() |
|
if err != nil { |
|
return err |
|
} |
|
|
|
for _, payload := range mpayload { |
|
v, ok := <-s.bindServer.readQueue |
|
if !ok { |
|
return nil |
|
} |
|
i, err := payload.Read(v.buff) |
|
|
|
v.bytes = i |
|
v.endpoint = nep |
|
v.err = err |
|
v.waiter.Done() |
|
if err != nil && goerrors.Is(err, io.EOF) { |
|
nep.conn = nil |
|
return nil |
|
} |
|
} |
|
} |
|
} |
|
|
|
func (s *Server) forwardConnection(dest net.Destination, conn net.Conn) { |
|
if s.info.dispatcher == nil { |
|
errors.LogError(s.info.ctx, "unexpected: dispatcher == nil") |
|
return |
|
} |
|
defer conn.Close() |
|
|
|
ctx, cancel := context.WithCancel(core.ToBackgroundDetachedContext(s.info.ctx)) |
|
plcy := s.policyManager.ForLevel(0) |
|
timer := signal.CancelAfterInactivity(ctx, cancel, plcy.Timeouts.ConnectionIdle) |
|
|
|
ctx = log.ContextWithAccessMessage(ctx, &log.AccessMessage{ |
|
From: nullDestination, |
|
To: dest, |
|
Status: log.AccessAccepted, |
|
Reason: "", |
|
}) |
|
|
|
// what's this? |
|
// Session information should not be shared between different connections |
|
// why reuse them in server level? This will cause incorrect destoverride and unexpected routing behavior. |
|
// Disable it temporarily. Maybe s.info should be removed. |
|
|
|
// if s.info.inboundTag != nil { |
|
// ctx = session.ContextWithInbound(ctx, s.info.inboundTag) |
|
// } |
|
// if s.info.outboundTag != nil { |
|
// ctx = session.ContextWithOutbounds(ctx, []*session.Outbound{s.info.outboundTag}) |
|
// } |
|
// if s.info.contentTag != nil { |
|
// ctx = session.ContextWithContent(ctx, s.info.contentTag) |
|
// } |
|
|
|
link, err := s.info.dispatcher.Dispatch(ctx, dest) |
|
if err != nil { |
|
errors.LogErrorInner(s.info.ctx, err, "dispatch connection") |
|
} |
|
defer cancel() |
|
|
|
requestDone := func() error { |
|
defer timer.SetTimeout(plcy.Timeouts.DownlinkOnly) |
|
if err := buf.Copy(buf.NewReader(conn), link.Writer, buf.UpdateActivity(timer)); err != nil { |
|
return errors.New("failed to transport all TCP request").Base(err) |
|
} |
|
|
|
return nil |
|
} |
|
|
|
responseDone := func() error { |
|
defer timer.SetTimeout(plcy.Timeouts.UplinkOnly) |
|
if err := buf.Copy(link.Reader, buf.NewWriter(conn), buf.UpdateActivity(timer)); err != nil { |
|
return errors.New("failed to transport all TCP response").Base(err) |
|
} |
|
|
|
return nil |
|
} |
|
|
|
requestDonePost := task.OnSuccess(requestDone, task.Close(link.Writer)) |
|
if err := task.Run(ctx, requestDonePost, responseDone); err != nil { |
|
common.Interrupt(link.Reader) |
|
common.Interrupt(link.Writer) |
|
errors.LogDebugInner(s.info.ctx, err, "connection ends") |
|
return |
|
} |
|
}
|
|
|