Browse Source

prototype of quic transport

pull/1435/head
Darien Raymond 6 years ago
parent
commit
096bbd2c51
No known key found for this signature in database
GPG Key ID: 7251FFA14BB18169
  1. 1
      common/net/system.go
  2. 93
      transport/internet/quic/config.pb.go
  3. 14
      transport/internet/quic/config.proto
  4. 110
      transport/internet/quic/conn.go
  5. 90
      transport/internet/quic/dialer.go
  6. 6
      transport/internet/quic/errors.generated.go
  7. 105
      transport/internet/quic/hub.go
  8. 21
      transport/internet/quic/pool.go
  9. 22
      transport/internet/quic/quic.go
  10. 89
      transport/internet/quic/quic_test.go

1
common/net/system.go

@ -56,5 +56,6 @@ type TCPListener = net.TCPListener
type UnixListener = net.UnixListener
var ResolveUnixAddr = net.ResolveUnixAddr
var ResolveUDPAddr = net.ResolveUDPAddr
type Resolver = net.Resolver

93
transport/internet/quic/config.pb.go

@ -0,0 +1,93 @@
package quic
import (
fmt "fmt"
proto "github.com/golang/protobuf/proto"
math "math"
serial "v2ray.com/core/common/serial"
)
// Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal
var _ = fmt.Errorf
var _ = math.Inf
// This is a compile-time assertion to ensure that this generated file
// is compatible with the proto package it is being compiled against.
// A compilation error at this line likely means your copy of the
// proto package needs to be updated.
const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package
type Config struct {
Key string `protobuf:"bytes,1,opt,name=key,proto3" json:"key,omitempty"`
Header *serial.TypedMessage `protobuf:"bytes,2,opt,name=header,proto3" json:"header,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *Config) Reset() { *m = Config{} }
func (m *Config) String() string { return proto.CompactTextString(m) }
func (*Config) ProtoMessage() {}
func (*Config) Descriptor() ([]byte, []int) {
return fileDescriptor_462e2eb906061b36, []int{0}
}
func (m *Config) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_Config.Unmarshal(m, b)
}
func (m *Config) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_Config.Marshal(b, m, deterministic)
}
func (m *Config) XXX_Merge(src proto.Message) {
xxx_messageInfo_Config.Merge(m, src)
}
func (m *Config) XXX_Size() int {
return xxx_messageInfo_Config.Size(m)
}
func (m *Config) XXX_DiscardUnknown() {
xxx_messageInfo_Config.DiscardUnknown(m)
}
var xxx_messageInfo_Config proto.InternalMessageInfo
func (m *Config) GetKey() string {
if m != nil {
return m.Key
}
return ""
}
func (m *Config) GetHeader() *serial.TypedMessage {
if m != nil {
return m.Header
}
return nil
}
func init() {
proto.RegisterType((*Config)(nil), "v2ray.core.transport.internet.quic.Config")
}
func init() {
proto.RegisterFile("v2ray.com/core/transport/internet/quic/config.proto", fileDescriptor_462e2eb906061b36)
}
var fileDescriptor_462e2eb906061b36 = []byte{
// 226 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x84, 0x8e, 0xb1, 0x4b, 0x03, 0x31,
0x14, 0x87, 0x49, 0x95, 0x03, 0xe3, 0x22, 0x37, 0x15, 0xa7, 0x72, 0x43, 0xe9, 0xf4, 0x22, 0xd7,
0xdd, 0xc1, 0x4e, 0x0e, 0x82, 0x1e, 0xd5, 0xa1, 0x8b, 0xc4, 0xf4, 0x59, 0x83, 0x26, 0xef, 0x7c,
0x49, 0x85, 0xfc, 0x4b, 0xfe, 0x95, 0x92, 0xc6, 0x3b, 0xc4, 0xa5, 0x53, 0x32, 0xfc, 0xbe, 0xef,
0x7d, 0x72, 0xf9, 0xd5, 0xb2, 0x4e, 0x60, 0xc8, 0x29, 0x43, 0x8c, 0x2a, 0xb2, 0xf6, 0xa1, 0x27,
0x8e, 0xca, 0xfa, 0x88, 0xec, 0x31, 0xaa, 0xcf, 0xbd, 0x35, 0xca, 0x90, 0x7f, 0xb5, 0x3b, 0xe8,
0x99, 0x22, 0xd5, 0xcd, 0x00, 0x31, 0xc2, 0x08, 0xc0, 0x00, 0x40, 0x06, 0x2e, 0xaf, 0xfe, 0x89,
0x0d, 0x39, 0x47, 0x5e, 0x05, 0x64, 0xab, 0x3f, 0x54, 0x4c, 0x3d, 0x6e, 0x9f, 0x1d, 0x86, 0xa0,
0x77, 0x58, 0xac, 0xcd, 0x46, 0x56, 0xab, 0xc3, 0x95, 0xfa, 0x42, 0x9e, 0xbc, 0x63, 0x9a, 0x8a,
0x99, 0x58, 0x9c, 0x75, 0xf9, 0x5b, 0x5f, 0xcb, 0xea, 0x0d, 0xf5, 0x16, 0x79, 0x3a, 0x99, 0x89,
0xc5, 0x79, 0x3b, 0x87, 0x3f, 0x09, 0x45, 0x0d, 0x45, 0x0d, 0xeb, 0xac, 0xbe, 0x2b, 0xe6, 0xee,
0x97, 0xba, 0x79, 0x94, 0x73, 0x43, 0x0e, 0x8e, 0x77, 0xdf, 0x8b, 0xcd, 0x69, 0x7e, 0xbf, 0x27,
0xcd, 0x53, 0xdb, 0xe9, 0x04, 0xab, 0x3c, 0x5e, 0x8f, 0xe3, 0xdb, 0x61, 0xfc, 0xb0, 0xb7, 0xe6,
0xa5, 0x3a, 0x94, 0x2f, 0x7f, 0x02, 0x00, 0x00, 0xff, 0xff, 0x9a, 0x6a, 0xd1, 0x56, 0x46, 0x01,
0x00, 0x00,
}

14
transport/internet/quic/config.proto

@ -0,0 +1,14 @@
syntax = "proto3";
package v2ray.core.transport.internet.quic;
option csharp_namespace = "V2Ray.Core.Transport.Internet.Quic";
option go_package = "quic";
option java_package = "com.v2ray.core.transport.internet.quic";
option java_multiple_files = true;
import "v2ray.com/core/common/serial/typed_message.proto";
message Config {
string key = 1;
v2ray.core.common.serial.TypedMessage header = 2;
}

110
transport/internet/quic/conn.go

@ -0,0 +1,110 @@
package quic
import (
"time"
quic "github.com/lucas-clemente/quic-go"
"v2ray.com/core/common/net"
"v2ray.com/core/transport/internet"
)
type sysConn struct {
conn net.PacketConn
header internet.PacketHeader
}
func (c *sysConn) ReadFrom(p []byte) (int, net.Addr, error) {
if c.header == nil {
return c.conn.ReadFrom(p)
}
overhead := int(c.header.Size())
buffer := getBuffer()
defer putBuffer(buffer)
nBytes, addr, err := c.conn.ReadFrom(buffer[:len(p)+overhead])
if err != nil {
return 0, nil, err
}
copy(p, buffer[overhead:nBytes])
return nBytes - overhead, addr, nil
}
func (c *sysConn) WriteTo(p []byte, addr net.Addr) (int, error) {
if c.header == nil {
return c.conn.WriteTo(p, addr)
}
buffer := getBuffer()
defer putBuffer(buffer)
overhead := int(c.header.Size())
c.header.Serialize(buffer)
copy(buffer[overhead:], p)
nBytes, err := c.conn.WriteTo(buffer[:len(p)+overhead], addr)
if err != nil {
return 0, err
}
return nBytes - overhead, nil
}
func (c *sysConn) Close() error {
return c.conn.Close()
}
func (c *sysConn) LocalAddr() net.Addr {
return c.conn.LocalAddr()
}
func (c *sysConn) SetDeadline(t time.Time) error {
return c.conn.SetDeadline(t)
}
func (c *sysConn) SetReadDeadline(t time.Time) error {
return c.conn.SetReadDeadline(t)
}
func (c *sysConn) SetWriteDeadline(t time.Time) error {
return c.conn.SetWriteDeadline(t)
}
type interConn struct {
stream quic.Stream
local net.Addr
remote net.Addr
}
func (c *interConn) Read(b []byte) (int, error) {
return c.stream.Read(b)
}
func (c *interConn) Write(b []byte) (int, error) {
return c.stream.Write(b)
}
func (c *interConn) Close() error {
return c.stream.Close()
}
func (c *interConn) LocalAddr() net.Addr {
return c.local
}
func (c *interConn) RemoteAddr() net.Addr {
return c.remote
}
func (c *interConn) SetDeadline(t time.Time) error {
return c.stream.SetDeadline(t)
}
func (c *interConn) SetReadDeadline(t time.Time) error {
return c.stream.SetReadDeadline(t)
}
func (c *interConn) SetWriteDeadline(t time.Time) error {
return c.stream.SetWriteDeadline(t)
}

90
transport/internet/quic/dialer.go

@ -0,0 +1,90 @@
package quic
import (
"context"
"sync"
"v2ray.com/core/transport/internet/tls"
quic "github.com/lucas-clemente/quic-go"
"v2ray.com/core/common"
"v2ray.com/core/common/net"
"v2ray.com/core/transport/internet"
)
type clientSessions struct {
access sync.Mutex
sessions map[net.Destination]quic.Session
}
func (s *clientSessions) getSession(destAddr net.Addr, tlsConfig *tls.Config, sockopt *internet.SocketConfig) (quic.Session, error) {
s.access.Lock()
defer s.access.Unlock()
if s.sessions == nil {
s.sessions = make(map[net.Destination]quic.Session)
}
dest := net.DestinationFromAddr(destAddr)
if session, found := s.sessions[dest]; found {
return session, nil
}
conn, err := internet.ListenSystemPacket(context.Background(), &net.UDPAddr{
IP: []byte{0, 0, 0, 0},
Port: 0,
}, sockopt)
if err != nil {
return nil, err
}
config := &quic.Config{
Versions: []quic.VersionNumber{quic.VersionMilestone0_10_0},
ConnectionIDLength: 12,
KeepAlive: true,
}
session, err := quic.DialContext(context.Background(), conn, destAddr, "", tlsConfig.GetTLSConfig(tls.WithDestination(dest)), config)
if err != nil {
return nil, err
}
s.sessions[dest] = session
return session, nil
}
var client clientSessions
func Dial(ctx context.Context, dest net.Destination, streamSettings *internet.MemoryStreamConfig) (internet.Connection, error) {
tlsConfig := tls.ConfigFromStreamSettings(streamSettings)
if tlsConfig == nil {
return nil, newError("TLS not enabled for QUIC")
}
destAddr, err := net.ResolveUDPAddr("udp", dest.NetAddr())
if err != nil {
return nil, err
}
session, err := client.getSession(destAddr, tlsConfig, streamSettings.SocketSettings)
if err != nil {
return nil, err
}
conn, err := session.OpenStreamSync()
if err != nil {
return nil, err
}
return &interConn{
stream: conn,
local: session.LocalAddr(),
remote: destAddr,
}, nil
}
func init() {
common.Must(internet.RegisterTransportDialer(protocolName, Dial))
}

6
transport/internet/quic/errors.generated.go

@ -0,0 +1,6 @@
package quic
import "v2ray.com/core/common/errors"
type errPathObjHolder struct {}
func newError(values ...interface{}) *errors.Error { return errors.New(values...).WithPathObj(errPathObjHolder{}) }

105
transport/internet/quic/hub.go

@ -0,0 +1,105 @@
package quic
import (
"context"
quic "github.com/lucas-clemente/quic-go"
"v2ray.com/core/common"
"v2ray.com/core/common/net"
"v2ray.com/core/transport/internet"
"v2ray.com/core/transport/internet/tls"
)
// Listener is an internet.Listener that listens for TCP connections.
type Listener struct {
listener quic.Listener
addConn internet.ConnHandler
}
func (l *Listener) acceptStreams(session quic.Session) {
for {
stream, err := session.AcceptStream()
if err != nil {
newError("failed to accept stream").Base(err).WriteToLog()
session.Close()
return
}
conn := &interConn{
stream: stream,
local: session.LocalAddr(),
remote: session.RemoteAddr(),
}
l.addConn(conn)
}
}
func (l *Listener) keepAccepting() {
for {
conn, err := l.listener.Accept()
if err != nil {
newError("failed to accept QUIC sessions").Base(err).WriteToLog()
l.listener.Close()
return
}
go l.acceptStreams(conn)
}
}
// Addr implements internet.Listener.Addr.
func (v *Listener) Addr() net.Addr {
return v.listener.Addr()
}
// Close implements internet.Listener.Close.
func (v *Listener) Close() error {
return v.listener.Close()
}
// Listen creates a new Listener based on configurations.
func Listen(ctx context.Context, address net.Address, port net.Port, streamSettings *internet.MemoryStreamConfig, handler internet.ConnHandler) (internet.Listener, error) {
if address.Family().IsDomain() {
return nil, newError("domain address is not allows for listening quic")
}
tlsConfig := tls.ConfigFromStreamSettings(streamSettings)
if tlsConfig == nil {
return nil, newError("TLS config not enabled for QUIC")
}
conn, err := internet.ListenSystemPacket(context.Background(), &net.UDPAddr{
IP: address.IP(),
Port: int(port),
}, streamSettings.SocketSettings)
if err != nil {
return nil, err
}
config := &quic.Config{
Versions: []quic.VersionNumber{quic.VersionMilestone0_10_0},
ConnectionIDLength: 12,
KeepAlive: true,
AcceptCookie: func(net.Addr, *quic.Cookie) bool { return true },
}
qListener, err := quic.Listen(conn, tlsConfig.GetTLSConfig(), config)
if err != nil {
return nil, err
}
listener := &Listener{
listener: qListener,
addConn: handler,
}
go listener.keepAccepting()
return listener, nil
}
func init() {
common.Must(internet.RegisterTransportListener(protocolName, Listen))
}

21
transport/internet/quic/pool.go

@ -0,0 +1,21 @@
package quic
import (
"sync"
"v2ray.com/core/common/bytespool"
)
var pool *sync.Pool
func init() {
pool = bytespool.GetPool(2048)
}
func getBuffer() []byte {
return pool.Get().([]byte)
}
func putBuffer(p []byte) {
pool.Put(p)
}

22
transport/internet/quic/quic.go

@ -0,0 +1,22 @@
package quic
import (
"v2ray.com/core/common"
"v2ray.com/core/transport/internet"
)
//go:generate errorgen
// Here is some modification needs to be done before update quic vendor.
// * use bytespool in buffer_pool.go
// * set MaxReceivePacketSize to 1452 - 32 (16 bytes auth, 16 bytes head)
//
//
const protocolName = "quic"
func init() {
common.Must(internet.RegisterProtocolConfigCreatorByName(protocolName, func() interface{} {
return new(Config)
}))
}

89
transport/internet/quic/quic_test.go

@ -0,0 +1,89 @@
package quic_test
import (
"context"
"crypto/rand"
"testing"
"time"
"v2ray.com/core/common"
"v2ray.com/core/common/buf"
"v2ray.com/core/common/net"
"v2ray.com/core/common/protocol/tls/cert"
"v2ray.com/core/testing/servers/udp"
"v2ray.com/core/transport/internet"
"v2ray.com/core/transport/internet/quic"
"v2ray.com/core/transport/internet/tls"
. "v2ray.com/ext/assert"
)
func TestQuicConnection(t *testing.T) {
assert := With(t)
port := udp.PickPort()
listener, err := quic.Listen(context.Background(), net.LocalHostIP, port, &internet.MemoryStreamConfig{
ProtocolName: "quic",
ProtocolSettings: &quic.Config{},
SecurityType: "tls",
SecuritySettings: &tls.Config{
Certificate: []*tls.Certificate{tls.ParseCertificate(cert.MustGenerate(nil, cert.DNSNames("www.v2ray.com"), cert.CommonName("www.v2ray.com")))},
},
}, func(conn internet.Connection) {
go func() {
defer conn.Close()
b := buf.New()
defer b.Release()
for {
b.Clear()
if _, err := b.ReadFrom(conn); err != nil {
return
}
nBytes, err := conn.Write(b.Bytes())
assert(err, IsNil)
assert(int32(nBytes), Equals, b.Len())
}
}()
})
assert(err, IsNil)
defer listener.Close()
time.Sleep(time.Second)
dctx := context.Background()
conn, err := quic.Dial(dctx, net.TCPDestination(net.LocalHostIP, port), &internet.MemoryStreamConfig{
ProtocolName: "quic",
ProtocolSettings: &quic.Config{},
SecurityType: "tls",
SecuritySettings: &tls.Config{
ServerName: "www.v2ray.com",
AllowInsecure: true,
},
})
assert(err, IsNil)
defer conn.Close()
const N = 1024
b1 := make([]byte, N)
common.Must2(rand.Read(b1))
b2 := buf.New()
nBytes, err := conn.Write(b1)
assert(nBytes, Equals, N)
assert(err, IsNil)
b2.Clear()
common.Must2(b2.ReadFullFrom(conn, N))
assert(b2.Bytes(), Equals, b1)
nBytes, err = conn.Write(b1)
assert(nBytes, Equals, N)
assert(err, IsNil)
b2.Clear()
common.Must2(b2.ReadFullFrom(conn, N))
assert(b2.Bytes(), Equals, b1)
}
Loading…
Cancel
Save