diff --git a/common/net/system.go b/common/net/system.go index daf1d2e7..19468794 100644 --- a/common/net/system.go +++ b/common/net/system.go @@ -56,5 +56,6 @@ type TCPListener = net.TCPListener type UnixListener = net.UnixListener var ResolveUnixAddr = net.ResolveUnixAddr +var ResolveUDPAddr = net.ResolveUDPAddr type Resolver = net.Resolver diff --git a/transport/internet/quic/config.pb.go b/transport/internet/quic/config.pb.go new file mode 100644 index 00000000..d5f0d1f0 --- /dev/null +++ b/transport/internet/quic/config.pb.go @@ -0,0 +1,93 @@ +package quic + +import ( + fmt "fmt" + proto "github.com/golang/protobuf/proto" + math "math" + serial "v2ray.com/core/common/serial" +) + +// Reference imports to suppress errors if they are not otherwise used. +var _ = proto.Marshal +var _ = fmt.Errorf +var _ = math.Inf + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the proto package it is being compiled against. +// A compilation error at this line likely means your copy of the +// proto package needs to be updated. +const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package + +type Config struct { + Key string `protobuf:"bytes,1,opt,name=key,proto3" json:"key,omitempty"` + Header *serial.TypedMessage `protobuf:"bytes,2,opt,name=header,proto3" json:"header,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *Config) Reset() { *m = Config{} } +func (m *Config) String() string { return proto.CompactTextString(m) } +func (*Config) ProtoMessage() {} +func (*Config) Descriptor() ([]byte, []int) { + return fileDescriptor_462e2eb906061b36, []int{0} +} + +func (m *Config) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_Config.Unmarshal(m, b) +} +func (m *Config) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_Config.Marshal(b, m, deterministic) +} +func (m *Config) XXX_Merge(src proto.Message) { + xxx_messageInfo_Config.Merge(m, src) +} +func (m *Config) XXX_Size() int { + return xxx_messageInfo_Config.Size(m) +} +func (m *Config) XXX_DiscardUnknown() { + xxx_messageInfo_Config.DiscardUnknown(m) +} + +var xxx_messageInfo_Config proto.InternalMessageInfo + +func (m *Config) GetKey() string { + if m != nil { + return m.Key + } + return "" +} + +func (m *Config) GetHeader() *serial.TypedMessage { + if m != nil { + return m.Header + } + return nil +} + +func init() { + proto.RegisterType((*Config)(nil), "v2ray.core.transport.internet.quic.Config") +} + +func init() { + proto.RegisterFile("v2ray.com/core/transport/internet/quic/config.proto", fileDescriptor_462e2eb906061b36) +} + +var fileDescriptor_462e2eb906061b36 = []byte{ + // 226 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x84, 0x8e, 0xb1, 0x4b, 0x03, 0x31, + 0x14, 0x87, 0x49, 0x95, 0x03, 0xe3, 0x22, 0x37, 0x15, 0xa7, 0x72, 0x43, 0xe9, 0xf4, 0x22, 0xd7, + 0xdd, 0xc1, 0x4e, 0x0e, 0x82, 0x1e, 0xd5, 0xa1, 0x8b, 0xc4, 0xf4, 0x59, 0x83, 0x26, 0xef, 0x7c, + 0x49, 0x85, 0xfc, 0x4b, 0xfe, 0x95, 0x92, 0xc6, 0x3b, 0xc4, 0xa5, 0x53, 0x32, 0xfc, 0xbe, 0xef, + 0x7d, 0x72, 0xf9, 0xd5, 0xb2, 0x4e, 0x60, 0xc8, 0x29, 0x43, 0x8c, 0x2a, 0xb2, 0xf6, 0xa1, 0x27, + 0x8e, 0xca, 0xfa, 0x88, 0xec, 0x31, 0xaa, 0xcf, 0xbd, 0x35, 0xca, 0x90, 0x7f, 0xb5, 0x3b, 0xe8, + 0x99, 0x22, 0xd5, 0xcd, 0x00, 0x31, 0xc2, 0x08, 0xc0, 0x00, 0x40, 0x06, 0x2e, 0xaf, 0xfe, 0x89, + 0x0d, 0x39, 0x47, 0x5e, 0x05, 0x64, 0xab, 0x3f, 0x54, 0x4c, 0x3d, 0x6e, 0x9f, 0x1d, 0x86, 0xa0, + 0x77, 0x58, 0xac, 0xcd, 0x46, 0x56, 0xab, 0xc3, 0x95, 0xfa, 0x42, 0x9e, 0xbc, 0x63, 0x9a, 0x8a, + 0x99, 0x58, 0x9c, 0x75, 0xf9, 0x5b, 0x5f, 0xcb, 0xea, 0x0d, 0xf5, 0x16, 0x79, 0x3a, 0x99, 0x89, + 0xc5, 0x79, 0x3b, 0x87, 0x3f, 0x09, 0x45, 0x0d, 0x45, 0x0d, 0xeb, 0xac, 0xbe, 0x2b, 0xe6, 0xee, + 0x97, 0xba, 0x79, 0x94, 0x73, 0x43, 0x0e, 0x8e, 0x77, 0xdf, 0x8b, 0xcd, 0x69, 0x7e, 0xbf, 0x27, + 0xcd, 0x53, 0xdb, 0xe9, 0x04, 0xab, 0x3c, 0x5e, 0x8f, 0xe3, 0xdb, 0x61, 0xfc, 0xb0, 0xb7, 0xe6, + 0xa5, 0x3a, 0x94, 0x2f, 0x7f, 0x02, 0x00, 0x00, 0xff, 0xff, 0x9a, 0x6a, 0xd1, 0x56, 0x46, 0x01, + 0x00, 0x00, +} diff --git a/transport/internet/quic/config.proto b/transport/internet/quic/config.proto new file mode 100644 index 00000000..ec4e251a --- /dev/null +++ b/transport/internet/quic/config.proto @@ -0,0 +1,14 @@ +syntax = "proto3"; + +package v2ray.core.transport.internet.quic; +option csharp_namespace = "V2Ray.Core.Transport.Internet.Quic"; +option go_package = "quic"; +option java_package = "com.v2ray.core.transport.internet.quic"; +option java_multiple_files = true; + +import "v2ray.com/core/common/serial/typed_message.proto"; + +message Config { + string key = 1; + v2ray.core.common.serial.TypedMessage header = 2; +} diff --git a/transport/internet/quic/conn.go b/transport/internet/quic/conn.go new file mode 100644 index 00000000..90bd2b07 --- /dev/null +++ b/transport/internet/quic/conn.go @@ -0,0 +1,110 @@ +package quic + +import ( + "time" + + quic "github.com/lucas-clemente/quic-go" + + "v2ray.com/core/common/net" + "v2ray.com/core/transport/internet" +) + +type sysConn struct { + conn net.PacketConn + header internet.PacketHeader +} + +func (c *sysConn) ReadFrom(p []byte) (int, net.Addr, error) { + if c.header == nil { + return c.conn.ReadFrom(p) + } + + overhead := int(c.header.Size()) + buffer := getBuffer() + defer putBuffer(buffer) + + nBytes, addr, err := c.conn.ReadFrom(buffer[:len(p)+overhead]) + if err != nil { + return 0, nil, err + } + + copy(p, buffer[overhead:nBytes]) + + return nBytes - overhead, addr, nil +} + +func (c *sysConn) WriteTo(p []byte, addr net.Addr) (int, error) { + if c.header == nil { + return c.conn.WriteTo(p, addr) + } + + buffer := getBuffer() + defer putBuffer(buffer) + + overhead := int(c.header.Size()) + c.header.Serialize(buffer) + copy(buffer[overhead:], p) + nBytes, err := c.conn.WriteTo(buffer[:len(p)+overhead], addr) + if err != nil { + return 0, err + } + return nBytes - overhead, nil +} + +func (c *sysConn) Close() error { + return c.conn.Close() +} + +func (c *sysConn) LocalAddr() net.Addr { + return c.conn.LocalAddr() +} + +func (c *sysConn) SetDeadline(t time.Time) error { + return c.conn.SetDeadline(t) +} + +func (c *sysConn) SetReadDeadline(t time.Time) error { + return c.conn.SetReadDeadline(t) +} + +func (c *sysConn) SetWriteDeadline(t time.Time) error { + return c.conn.SetWriteDeadline(t) +} + +type interConn struct { + stream quic.Stream + local net.Addr + remote net.Addr +} + +func (c *interConn) Read(b []byte) (int, error) { + return c.stream.Read(b) +} + +func (c *interConn) Write(b []byte) (int, error) { + return c.stream.Write(b) +} + +func (c *interConn) Close() error { + return c.stream.Close() +} + +func (c *interConn) LocalAddr() net.Addr { + return c.local +} + +func (c *interConn) RemoteAddr() net.Addr { + return c.remote +} + +func (c *interConn) SetDeadline(t time.Time) error { + return c.stream.SetDeadline(t) +} + +func (c *interConn) SetReadDeadline(t time.Time) error { + return c.stream.SetReadDeadline(t) +} + +func (c *interConn) SetWriteDeadline(t time.Time) error { + return c.stream.SetWriteDeadline(t) +} diff --git a/transport/internet/quic/dialer.go b/transport/internet/quic/dialer.go new file mode 100644 index 00000000..2d463d43 --- /dev/null +++ b/transport/internet/quic/dialer.go @@ -0,0 +1,90 @@ +package quic + +import ( + "context" + "sync" + + "v2ray.com/core/transport/internet/tls" + + quic "github.com/lucas-clemente/quic-go" + + "v2ray.com/core/common" + "v2ray.com/core/common/net" + "v2ray.com/core/transport/internet" +) + +type clientSessions struct { + access sync.Mutex + sessions map[net.Destination]quic.Session +} + +func (s *clientSessions) getSession(destAddr net.Addr, tlsConfig *tls.Config, sockopt *internet.SocketConfig) (quic.Session, error) { + s.access.Lock() + defer s.access.Unlock() + + if s.sessions == nil { + s.sessions = make(map[net.Destination]quic.Session) + } + + dest := net.DestinationFromAddr(destAddr) + + if session, found := s.sessions[dest]; found { + return session, nil + } + + conn, err := internet.ListenSystemPacket(context.Background(), &net.UDPAddr{ + IP: []byte{0, 0, 0, 0}, + Port: 0, + }, sockopt) + if err != nil { + return nil, err + } + + config := &quic.Config{ + Versions: []quic.VersionNumber{quic.VersionMilestone0_10_0}, + ConnectionIDLength: 12, + KeepAlive: true, + } + + session, err := quic.DialContext(context.Background(), conn, destAddr, "", tlsConfig.GetTLSConfig(tls.WithDestination(dest)), config) + if err != nil { + return nil, err + } + + s.sessions[dest] = session + return session, nil +} + +var client clientSessions + +func Dial(ctx context.Context, dest net.Destination, streamSettings *internet.MemoryStreamConfig) (internet.Connection, error) { + tlsConfig := tls.ConfigFromStreamSettings(streamSettings) + if tlsConfig == nil { + return nil, newError("TLS not enabled for QUIC") + } + + destAddr, err := net.ResolveUDPAddr("udp", dest.NetAddr()) + if err != nil { + return nil, err + } + + session, err := client.getSession(destAddr, tlsConfig, streamSettings.SocketSettings) + if err != nil { + return nil, err + } + + conn, err := session.OpenStreamSync() + if err != nil { + return nil, err + } + + return &interConn{ + stream: conn, + local: session.LocalAddr(), + remote: destAddr, + }, nil +} + +func init() { + common.Must(internet.RegisterTransportDialer(protocolName, Dial)) +} diff --git a/transport/internet/quic/errors.generated.go b/transport/internet/quic/errors.generated.go new file mode 100644 index 00000000..b143ae17 --- /dev/null +++ b/transport/internet/quic/errors.generated.go @@ -0,0 +1,6 @@ +package quic + +import "v2ray.com/core/common/errors" + +type errPathObjHolder struct {} +func newError(values ...interface{}) *errors.Error { return errors.New(values...).WithPathObj(errPathObjHolder{}) } diff --git a/transport/internet/quic/hub.go b/transport/internet/quic/hub.go new file mode 100644 index 00000000..764425f8 --- /dev/null +++ b/transport/internet/quic/hub.go @@ -0,0 +1,105 @@ +package quic + +import ( + "context" + + quic "github.com/lucas-clemente/quic-go" + "v2ray.com/core/common" + "v2ray.com/core/common/net" + "v2ray.com/core/transport/internet" + "v2ray.com/core/transport/internet/tls" +) + +// Listener is an internet.Listener that listens for TCP connections. +type Listener struct { + listener quic.Listener + addConn internet.ConnHandler +} + +func (l *Listener) acceptStreams(session quic.Session) { + for { + stream, err := session.AcceptStream() + if err != nil { + newError("failed to accept stream").Base(err).WriteToLog() + session.Close() + return + } + + conn := &interConn{ + stream: stream, + local: session.LocalAddr(), + remote: session.RemoteAddr(), + } + + l.addConn(conn) + } + +} + +func (l *Listener) keepAccepting() { + for { + conn, err := l.listener.Accept() + if err != nil { + newError("failed to accept QUIC sessions").Base(err).WriteToLog() + l.listener.Close() + return + } + go l.acceptStreams(conn) + } +} + +// Addr implements internet.Listener.Addr. +func (v *Listener) Addr() net.Addr { + return v.listener.Addr() +} + +// Close implements internet.Listener.Close. +func (v *Listener) Close() error { + return v.listener.Close() +} + +// Listen creates a new Listener based on configurations. +func Listen(ctx context.Context, address net.Address, port net.Port, streamSettings *internet.MemoryStreamConfig, handler internet.ConnHandler) (internet.Listener, error) { + if address.Family().IsDomain() { + return nil, newError("domain address is not allows for listening quic") + } + + tlsConfig := tls.ConfigFromStreamSettings(streamSettings) + if tlsConfig == nil { + return nil, newError("TLS config not enabled for QUIC") + } + + conn, err := internet.ListenSystemPacket(context.Background(), &net.UDPAddr{ + IP: address.IP(), + Port: int(port), + }, streamSettings.SocketSettings) + + if err != nil { + return nil, err + } + + config := &quic.Config{ + Versions: []quic.VersionNumber{quic.VersionMilestone0_10_0}, + ConnectionIDLength: 12, + KeepAlive: true, + AcceptCookie: func(net.Addr, *quic.Cookie) bool { return true }, + } + + qListener, err := quic.Listen(conn, tlsConfig.GetTLSConfig(), config) + if err != nil { + return nil, err + } + + listener := &Listener{ + listener: qListener, + addConn: handler, + } + + go listener.keepAccepting() + + return listener, nil +} + +func init() { + common.Must(internet.RegisterTransportListener(protocolName, Listen)) +} diff --git a/transport/internet/quic/pool.go b/transport/internet/quic/pool.go new file mode 100644 index 00000000..497cb0c2 --- /dev/null +++ b/transport/internet/quic/pool.go @@ -0,0 +1,21 @@ +package quic + +import ( + "sync" + + "v2ray.com/core/common/bytespool" +) + +var pool *sync.Pool + +func init() { + pool = bytespool.GetPool(2048) +} + +func getBuffer() []byte { + return pool.Get().([]byte) +} + +func putBuffer(p []byte) { + pool.Put(p) +} diff --git a/transport/internet/quic/quic.go b/transport/internet/quic/quic.go new file mode 100644 index 00000000..3a9816e0 --- /dev/null +++ b/transport/internet/quic/quic.go @@ -0,0 +1,22 @@ +package quic + +import ( + "v2ray.com/core/common" + "v2ray.com/core/transport/internet" +) + +//go:generate errorgen + +// Here is some modification needs to be done before update quic vendor. +// * use bytespool in buffer_pool.go +// * set MaxReceivePacketSize to 1452 - 32 (16 bytes auth, 16 bytes head) +// +// + +const protocolName = "quic" + +func init() { + common.Must(internet.RegisterProtocolConfigCreatorByName(protocolName, func() interface{} { + return new(Config) + })) +} diff --git a/transport/internet/quic/quic_test.go b/transport/internet/quic/quic_test.go new file mode 100644 index 00000000..3155a83b --- /dev/null +++ b/transport/internet/quic/quic_test.go @@ -0,0 +1,89 @@ +package quic_test + +import ( + "context" + "crypto/rand" + "testing" + "time" + + "v2ray.com/core/common" + "v2ray.com/core/common/buf" + "v2ray.com/core/common/net" + "v2ray.com/core/common/protocol/tls/cert" + "v2ray.com/core/testing/servers/udp" + "v2ray.com/core/transport/internet" + "v2ray.com/core/transport/internet/quic" + "v2ray.com/core/transport/internet/tls" + . "v2ray.com/ext/assert" +) + +func TestQuicConnection(t *testing.T) { + assert := With(t) + + port := udp.PickPort() + + listener, err := quic.Listen(context.Background(), net.LocalHostIP, port, &internet.MemoryStreamConfig{ + ProtocolName: "quic", + ProtocolSettings: &quic.Config{}, + SecurityType: "tls", + SecuritySettings: &tls.Config{ + Certificate: []*tls.Certificate{tls.ParseCertificate(cert.MustGenerate(nil, cert.DNSNames("www.v2ray.com"), cert.CommonName("www.v2ray.com")))}, + }, + }, func(conn internet.Connection) { + go func() { + defer conn.Close() + + b := buf.New() + defer b.Release() + + for { + b.Clear() + if _, err := b.ReadFrom(conn); err != nil { + return + } + nBytes, err := conn.Write(b.Bytes()) + assert(err, IsNil) + assert(int32(nBytes), Equals, b.Len()) + } + }() + }) + assert(err, IsNil) + + defer listener.Close() + + time.Sleep(time.Second) + + dctx := context.Background() + conn, err := quic.Dial(dctx, net.TCPDestination(net.LocalHostIP, port), &internet.MemoryStreamConfig{ + ProtocolName: "quic", + ProtocolSettings: &quic.Config{}, + SecurityType: "tls", + SecuritySettings: &tls.Config{ + ServerName: "www.v2ray.com", + AllowInsecure: true, + }, + }) + assert(err, IsNil) + defer conn.Close() + + const N = 1024 + b1 := make([]byte, N) + common.Must2(rand.Read(b1)) + b2 := buf.New() + + nBytes, err := conn.Write(b1) + assert(nBytes, Equals, N) + assert(err, IsNil) + + b2.Clear() + common.Must2(b2.ReadFullFrom(conn, N)) + assert(b2.Bytes(), Equals, b1) + + nBytes, err = conn.Write(b1) + assert(nBytes, Equals, N) + assert(err, IsNil) + + b2.Clear() + common.Must2(b2.ReadFullFrom(conn, N)) + assert(b2.Bytes(), Equals, b1) +}