Compare commits

..

50 Commits

Author SHA1 Message Date
v2ray
18d75cb7b4 bug fixes to dns server 2016-05-16 11:53:18 -07:00
v2ray
49c8c31e26 test case for dns config 2016-05-16 09:19:55 -07:00
v2ray
10d1d9288b default value of dns config 2016-05-16 09:05:01 -07:00
v2ray
dac1339d6e Use dns in router 2016-05-16 00:25:34 -07:00
v2ray
3b545abe02 dns client implementation 2016-05-15 23:09:28 -07:00
v2ray
ac4d92a186 fix udp connection mapping 2016-05-13 19:35:59 -07:00
v2ray
634c4964cc Massive fixes 2016-05-12 17:20:07 -07:00
v2ray
3c02805186 realtime logger 2016-05-12 17:18:07 -07:00
v2ray
9a3c7a03c9 rename shadowsocks server 2016-05-12 10:42:08 -07:00
v2ray
0cedf8ad7d generator for china ip 2016-05-11 23:45:35 -07:00
v2ray
7e481fe943 rename retry_test package 2016-05-11 23:27:54 -07:00
v2ray
abdcda0a2f reorg common/log 2016-05-11 23:24:41 -07:00
v2ray
a37819c330 rename RegisterApp to Register 2016-05-11 12:05:58 -07:00
v2ray
c224f67666 actually return nil 2016-05-11 11:54:55 -07:00
v2ray
7fd94e1116 clear logic in v2io.Pipe 2016-05-11 11:54:29 -07:00
v2ray
e9ae553f78 test cases for v2io 2016-05-11 10:54:20 -07:00
v2ray
61ce85435f respond to system signals 2016-05-10 15:51:38 -07:00
v2ray
8fbca309df better management on udp connections 2016-05-10 15:43:02 -07:00
v2ray
1bd893501c reduce time of waiting for channel empty 2016-05-09 08:23:46 -07:00
v2ray
ff210aa67f agressively close stream 2016-05-09 08:23:31 -07:00
v2ray
9b511fb071 Fix a typo 2016-05-09 08:04:55 -07:00
v2ray
2739cf2f4a disable access log if log level = none 2016-05-08 23:01:27 -07:00
v2ray
2f9b8a955c remove unnecessary access logger init 2016-05-08 23:01:14 -07:00
v2ray
663b9c9bbc busy wait on writing 2016-05-08 19:04:51 -07:00
v2ray
b5f43031d4 refactor alter id generation 2016-05-07 21:07:46 +02:00
v2ray
8a07534586 remove unnecessary package alias 2016-05-07 20:26:29 +02:00
v2ray
73e84b1e06 account 2016-05-07 20:26:03 +02:00
v2ray
ef200c3c5e http client 2016-05-07 14:08:27 +02:00
v2ray
f2c656843e allow tls connection in http proxy 2016-05-07 10:36:36 +02:00
v2ray
b65017d28d Fix build break 2016-05-07 10:06:56 +02:00
v2ray
301b0ccff7 refine cert config in http 2016-05-07 10:06:12 +02:00
v2ray
67db5830be agreesively close rays 2016-05-07 09:53:15 +02:00
v2ray
8d1f06ebaf clean up on error 2016-05-07 00:19:06 +02:00
v2ray
0bc846f016 Fix ending in vmess reader. 2016-05-05 00:24:18 +02:00
v2ray
a7f61af79b fix auth reader 2016-05-04 23:41:24 +02:00
v2ray
03d6c648ab explictly invoke GC 2016-05-04 23:41:09 +02:00
v2ray
f5efd3f997 mips64 2016-05-03 17:13:58 +02:00
v2ray
34ea1538dd golang 1.6.2 2016-05-03 17:13:10 +02:00
v2ray
4a38882779 Merge branch 'master' of https://github.com/v2ray/v2ray-core 2016-05-03 14:57:16 +02:00
v2ray
c20c44526c Fix buffer overrun in vmess 2016-05-03 14:57:09 +02:00
Darien Raymond
ed8f5b732e Merge pull request #131 from wtlusvm/master
Fix #127
2016-05-03 11:44:20 +02:00
wtlusvm
5ab9d5476e Fix #127 2016-05-03 06:06:07 +00:00
v2ray
7d43952690 tcphub now takes tls config as construtor parameter 2016-05-03 00:16:07 +02:00
v2ray
f594f5b606 simplify tcp connection 2016-05-02 23:53:16 +02:00
v2ray
406360ed3e fix test servers 2016-05-02 23:02:40 +02:00
v2ray
7cb784b678 initial check in for mobile 2016-05-02 22:59:05 +02:00
v2ray
31d03a893a test case for port range 2016-05-02 18:56:58 +02:00
v2ray
18069d377c Release stream when conn ends 2016-05-02 14:03:58 +02:00
v2ray
36bf645199 Fix a dead lock issue in VMess 2016-05-02 14:03:31 +02:00
v2ray
0442000964 fix nil reference issue in shadowsocks 2016-05-02 09:38:50 +02:00
92 changed files with 8313 additions and 7286 deletions

View File

@@ -1,7 +1,7 @@
language: go
go:
- 1.6
- 1.6.2
before_install:
- go get golang.org/x/tools/cmd/cover
@@ -30,6 +30,7 @@ deploy:
- "$GOPATH/bin/v2ray-linux-32.zip"
- "$GOPATH/bin/v2ray-linux-arm.zip"
- "$GOPATH/bin/v2ray-linux-arm64.zip"
- "$GOPATH/bin/v2ray-linux-mips64.zip"
- "$GOPATH/bin/metadata.txt"
skip_cleanup: true
on:

View File

@@ -29,7 +29,7 @@ func (this *contextedPacketDispatcher) DispatchToOutbound(destination v2net.Dest
}
func init() {
app.RegisterApp(APP_ID, func(context app.Context, obj interface{}) interface{} {
app.Register(APP_ID, func(context app.Context, obj interface{}) interface{} {
packetDispatcher := obj.(packetDispatcherWithContext)
return &contextedPacketDispatcher{
context: context,

9
app/dns/config.go Normal file
View File

@@ -0,0 +1,9 @@
package dns
import (
v2net "github.com/v2ray/v2ray-core/common/net"
)
type Config struct {
NameServers []v2net.Destination
}

25
app/dns/config_json.go Normal file
View File

@@ -0,0 +1,25 @@
// +build json
package dns
import (
"encoding/json"
v2net "github.com/v2ray/v2ray-core/common/net"
)
func (this *Config) UnmarshalJSON(data []byte) error {
type JsonConfig struct {
Servers []v2net.AddressJson `json:"servers"`
}
jsonConfig := new(JsonConfig)
if err := json.Unmarshal(data, jsonConfig); err != nil {
return err
}
this.NameServers = make([]v2net.Destination, len(jsonConfig.Servers))
for idx, server := range jsonConfig.Servers {
this.NameServers[idx] = v2net.UDPDestination(server.Address, v2net.Port(53))
}
return nil
}

View File

@@ -0,0 +1,30 @@
// +build json
package dns_test
import (
"encoding/json"
"testing"
. "github.com/v2ray/v2ray-core/app/dns"
v2net "github.com/v2ray/v2ray-core/common/net"
netassert "github.com/v2ray/v2ray-core/common/net/testing/assert"
v2testing "github.com/v2ray/v2ray-core/testing"
"github.com/v2ray/v2ray-core/testing/assert"
)
func TestConfigParsing(t *testing.T) {
v2testing.Current(t)
rawJson := `{
"servers": ["8.8.8.8"]
}`
config := new(Config)
err := json.Unmarshal([]byte(rawJson), config)
assert.Error(err).IsNil()
assert.Int(len(config.NameServers)).Equals(1)
netassert.Destination(config.NameServers[0]).IsUDP()
netassert.Address(config.NameServers[0].Address()).Equals(v2net.IPAddress([]byte{8, 8, 8, 8}))
netassert.Port(config.NameServers[0].Port()).Equals(v2net.Port(53))
}

View File

@@ -11,33 +11,27 @@ const (
)
// A DnsCache is an internal cache of DNS resolutions.
type DnsCache interface {
Get(domain string) net.IP
Add(domain string, ip net.IP)
type Server interface {
Get(domain string) []net.IP
}
type dnsCacheWithContext interface {
Get(context app.Context, domain string) net.IP
Add(contaxt app.Context, domain string, ip net.IP)
type dnsServerWithContext interface {
Get(context app.Context, domain string) []net.IP
}
type contextedDnsCache struct {
type contextedDnsServer struct {
context app.Context
dnsCache dnsCacheWithContext
dnsCache dnsServerWithContext
}
func (this *contextedDnsCache) Get(domain string) net.IP {
func (this *contextedDnsServer) Get(domain string) []net.IP {
return this.dnsCache.Get(this.context, domain)
}
func (this *contextedDnsCache) Add(domain string, ip net.IP) {
this.dnsCache.Add(this.context, domain, ip)
}
func init() {
app.RegisterApp(APP_ID, func(context app.Context, obj interface{}) interface{} {
dcContext := obj.(dnsCacheWithContext)
return &contextedDnsCache{
app.Register(APP_ID, func(context app.Context, obj interface{}) interface{} {
dcContext := obj.(dnsServerWithContext)
return &contextedDnsServer{
context: context,
dnsCache: dcContext,
}

View File

@@ -1,14 +0,0 @@
package internal
import (
"github.com/v2ray/v2ray-core/common/serial"
)
type CacheConfig struct {
TrustedTags map[serial.StringLiteral]bool
}
func (this *CacheConfig) IsTrustedSource(tag serial.StringLiteral) bool {
_, found := this.TrustedTags[tag]
return found
}

View File

@@ -1,23 +0,0 @@
// +build json
package internal
import (
"encoding/json"
"github.com/v2ray/v2ray-core/common/serial"
)
func (this *CacheConfig) UnmarshalJSON(data []byte) error {
var strlist serial.StringLiteralList
if err := json.Unmarshal(data, strlist); err != nil {
return err
}
config := &CacheConfig{
TrustedTags: make(map[serial.StringLiteral]bool, strlist.Len()),
}
for _, str := range strlist {
config.TrustedTags[str.TrimSpace()] = true
}
return nil
}

View File

@@ -1,62 +0,0 @@
package internal
import (
"net"
"time"
"github.com/v2ray/v2ray-core/app"
"github.com/v2ray/v2ray-core/common/collect"
"github.com/v2ray/v2ray-core/common/serial"
)
type entry struct {
domain string
ip net.IP
validUntil time.Time
}
func newEntry(domain string, ip net.IP) *entry {
this := &entry{
domain: domain,
ip: ip,
}
this.Extend()
return this
}
func (this *entry) IsValid() bool {
return this.validUntil.After(time.Now())
}
func (this *entry) Extend() {
this.validUntil = time.Now().Add(time.Hour)
}
type DnsCache struct {
cache *collect.ValidityMap
config *CacheConfig
}
func NewCache(config *CacheConfig) *DnsCache {
cache := &DnsCache{
cache: collect.NewValidityMap(3600),
config: config,
}
return cache
}
func (this *DnsCache) Add(context app.Context, domain string, ip net.IP) {
callerTag := context.CallerTag()
if !this.config.IsTrustedSource(serial.StringLiteral(callerTag)) {
return
}
this.cache.Set(serial.StringLiteral(domain), newEntry(domain, ip))
}
func (this *DnsCache) Get(context app.Context, domain string) net.IP {
if value := this.cache.Get(serial.StringLiteral(domain)); value != nil {
return value.(*entry).ip
}
return nil
}

View File

@@ -1,33 +0,0 @@
package internal_test
import (
"net"
"testing"
. "github.com/v2ray/v2ray-core/app/dns/internal"
apptesting "github.com/v2ray/v2ray-core/app/testing"
netassert "github.com/v2ray/v2ray-core/common/net/testing/assert"
"github.com/v2ray/v2ray-core/common/serial"
v2testing "github.com/v2ray/v2ray-core/testing"
)
func TestDnsAdd(t *testing.T) {
v2testing.Current(t)
domain := "v2ray.com"
cache := NewCache(&CacheConfig{
TrustedTags: map[serial.StringLiteral]bool{
serial.StringLiteral("testtag"): true,
},
})
ip := cache.Get(&apptesting.Context{}, domain)
netassert.IP(ip).IsNil()
cache.Add(&apptesting.Context{CallerTagValue: "notvalidtag"}, domain, []byte{1, 2, 3, 4})
ip = cache.Get(&apptesting.Context{}, domain)
netassert.IP(ip).IsNil()
cache.Add(&apptesting.Context{CallerTagValue: "testtag"}, domain, []byte{1, 2, 3, 4})
ip = cache.Get(&apptesting.Context{}, domain)
netassert.IP(ip).Equals(net.IP([]byte{1, 2, 3, 4}))
}

183
app/dns/nameserver.go Normal file
View File

@@ -0,0 +1,183 @@
package dns
import (
"math/rand"
"net"
"sync"
"time"
"github.com/v2ray/v2ray-core/app/dispatcher"
"github.com/v2ray/v2ray-core/common/alloc"
"github.com/v2ray/v2ray-core/common/log"
v2net "github.com/v2ray/v2ray-core/common/net"
"github.com/v2ray/v2ray-core/transport/hub"
"github.com/miekg/dns"
)
const (
DefaultTTL = uint32(3600)
)
type ARecord struct {
IPs []net.IP
Expire time.Time
}
type NameServer interface {
QueryA(domain string) <-chan *ARecord
}
type PendingRequest struct {
expire time.Time
response chan<- *ARecord
}
type UDPNameServer struct {
sync.Mutex
address v2net.Destination
requests map[uint16]*PendingRequest
udpServer *hub.UDPServer
}
func NewUDPNameServer(address v2net.Destination, dispatcher dispatcher.PacketDispatcher) *UDPNameServer {
s := &UDPNameServer{
address: address,
requests: make(map[uint16]*PendingRequest),
udpServer: hub.NewUDPServer(dispatcher),
}
go s.Cleanup()
return s
}
// @Private
func (this *UDPNameServer) Cleanup() {
for {
time.Sleep(time.Second * 60)
expiredRequests := make([]uint16, 0, 16)
now := time.Now()
this.Lock()
for id, r := range this.requests {
if r.expire.Before(now) {
expiredRequests = append(expiredRequests, id)
close(r.response)
}
}
for _, id := range expiredRequests {
delete(this.requests, id)
}
this.Unlock()
expiredRequests = nil
}
}
// @Private
func (this *UDPNameServer) AssignUnusedID(response chan<- *ARecord) uint16 {
var id uint16
this.Lock()
for {
id = uint16(rand.Intn(65536))
if _, found := this.requests[id]; found {
continue
}
log.Debug("DNS: Add pending request id ", id)
this.requests[id] = &PendingRequest{
expire: time.Now().Add(time.Second * 16),
response: response,
}
break
}
this.Unlock()
return id
}
// @Private
func (this *UDPNameServer) HandleResponse(dest v2net.Destination, payload *alloc.Buffer) {
msg := new(dns.Msg)
err := msg.Unpack(payload.Value)
if err != nil {
log.Warning("DNS: Failed to parse DNS response: ", err)
return
}
record := &ARecord{
IPs: make([]net.IP, 0, 16),
}
id := msg.Id
ttl := DefaultTTL
log.Debug("DNS: Handling response for id ", id, " content: ", msg.String())
this.Lock()
request, found := this.requests[id]
if !found {
this.Unlock()
return
}
delete(this.requests, id)
this.Unlock()
for _, rr := range msg.Answer {
switch rr := rr.(type) {
case *dns.A:
record.IPs = append(record.IPs, rr.A)
if rr.Hdr.Ttl < ttl {
ttl = rr.Hdr.Ttl
}
case *dns.AAAA:
record.IPs = append(record.IPs, rr.AAAA)
if rr.Hdr.Ttl < ttl {
ttl = rr.Hdr.Ttl
}
}
}
record.Expire = time.Now().Add(time.Second * time.Duration(ttl))
request.response <- record
close(request.response)
}
func (this *UDPNameServer) QueryA(domain string) <-chan *ARecord {
response := make(chan *ARecord, 1)
buffer := alloc.NewBuffer()
msg := new(dns.Msg)
msg.Id = this.AssignUnusedID(response)
msg.RecursionDesired = true
msg.Question = []dns.Question{
dns.Question{
Name: dns.Fqdn(domain),
Qtype: dns.TypeA,
Qclass: dns.ClassINET,
}}
writtenBuffer, _ := msg.PackBuffer(buffer.Value)
buffer.Slice(0, len(writtenBuffer))
fakeDestination := v2net.UDPDestination(v2net.LocalHostIP, v2net.Port(53))
this.udpServer.Dispatch(fakeDestination, this.address, buffer, this.HandleResponse)
return response
}
type LocalNameServer struct {
}
func (this *LocalNameServer) QueryA(domain string) <-chan *ARecord {
response := make(chan *ARecord, 1)
go func() {
defer close(response)
ips, err := net.LookupIP(domain)
if err != nil {
log.Info("DNS: Failed to lookup IPs for domain ", domain)
return
}
response <- &ARecord{
IPs: ips,
Expire: time.Now().Add(time.Second * time.Duration(DefaultTTL)),
}
}()
return response
}

83
app/dns/server.go Normal file
View File

@@ -0,0 +1,83 @@
package dns
import (
"net"
"sync"
"time"
"github.com/v2ray/v2ray-core/app"
"github.com/v2ray/v2ray-core/app/dispatcher"
"github.com/v2ray/v2ray-core/common/log"
"github.com/miekg/dns"
)
const (
QueryTimeout = time.Second * 8
)
type DomainRecord struct {
A *ARecord
}
type CacheServer struct {
sync.RWMutex
records map[string]*DomainRecord
servers []NameServer
}
func NewCacheServer(space app.Space, config *Config) *CacheServer {
server := &CacheServer{
records: make(map[string]*DomainRecord),
servers: make([]NameServer, len(config.NameServers)),
}
dispatcher := space.GetApp(dispatcher.APP_ID).(dispatcher.PacketDispatcher)
for idx, ns := range config.NameServers {
if ns.Address().IsDomain() && ns.Address().Domain() == "localhost" {
server.servers[idx] = &LocalNameServer{}
} else {
server.servers[idx] = NewUDPNameServer(ns, dispatcher)
}
}
return server
}
//@Private
func (this *CacheServer) GetCached(domain string) []net.IP {
this.RLock()
defer this.RUnlock()
if record, found := this.records[domain]; found && record.A.Expire.After(time.Now()) {
return record.A.IPs
}
return nil
}
func (this *CacheServer) Get(context app.Context, domain string) []net.IP {
domain = dns.Fqdn(domain)
ips := this.GetCached(domain)
if ips != nil {
return ips
}
for _, server := range this.servers {
response := server.QueryA(domain)
select {
case a, open := <-response:
if !open || a == nil {
continue
}
this.Lock()
this.records[domain] = &DomainRecord{
A: a,
}
this.Unlock()
log.Debug("DNS: Returning ", len(a.IPs), " IPs for domain ", domain)
return a.IPs
case <-time.After(QueryTimeout):
}
}
log.Debug("DNS: Returning nil for domain ", domain)
return nil
}

59
app/dns/server_test.go Normal file
View File

@@ -0,0 +1,59 @@
package dns_test
import (
"net"
"testing"
"github.com/v2ray/v2ray-core/app"
"github.com/v2ray/v2ray-core/app/dispatcher"
. "github.com/v2ray/v2ray-core/app/dns"
apptesting "github.com/v2ray/v2ray-core/app/testing"
v2net "github.com/v2ray/v2ray-core/common/net"
netassert "github.com/v2ray/v2ray-core/common/net/testing/assert"
"github.com/v2ray/v2ray-core/proxy/freedom"
v2testing "github.com/v2ray/v2ray-core/testing"
"github.com/v2ray/v2ray-core/testing/assert"
"github.com/v2ray/v2ray-core/transport/ray"
)
type TestDispatcher struct {
freedom *freedom.FreedomConnection
}
func (this *TestDispatcher) DispatchToOutbound(context app.Context, dest v2net.Destination) ray.InboundRay {
direct := ray.NewRay()
go func() {
payload, err := direct.OutboundInput().Read()
if err != nil {
direct.OutboundInput().Release()
direct.OutboundOutput().Release()
return
}
this.freedom.Dispatch(dest, payload, direct)
}()
return direct
}
func TestDnsAdd(t *testing.T) {
v2testing.Current(t)
d := &TestDispatcher{
freedom: &freedom.FreedomConnection{},
}
spaceController := app.NewController()
spaceController.Bind(dispatcher.APP_ID, d)
space := spaceController.ForContext("test")
domain := "local.v2ray.com"
server := NewCacheServer(space, &Config{
NameServers: []v2net.Destination{
v2net.UDPDestination(v2net.IPAddress([]byte{8, 8, 8, 8}), v2net.Port(53)),
},
})
ips := server.Get(&apptesting.Context{
CallerTagValue: "a",
}, domain)
assert.Int(len(ips)).Equals(1)
netassert.IP(ips[0].To4()).Equals(net.IP([]byte{127, 0, 0, 1}))
}

View File

@@ -27,7 +27,7 @@ func (this *inboundHandlerManagerWithContextImpl) GetHandler(tag string) (proxy.
}
func init() {
app.RegisterApp(APP_ID_INBOUND_MANAGER, func(context app.Context, obj interface{}) interface{} {
app.Register(APP_ID_INBOUND_MANAGER, func(context app.Context, obj interface{}) interface{} {
manager := obj.(inboundHandlerManagerWithContext)
return &inboundHandlerManagerWithContextImpl{
context: context,

View File

@@ -1,6 +1,7 @@
package router
import (
"github.com/v2ray/v2ray-core/app"
v2net "github.com/v2ray/v2ray-core/common/net"
)
@@ -9,7 +10,7 @@ type Router interface {
}
type RouterFactory interface {
Create(rawConfig interface{}) (Router, error)
Create(rawConfig interface{}, space app.Space) (Router, error)
}
var (
@@ -22,9 +23,9 @@ func RegisterRouter(name string, factory RouterFactory) error {
return nil
}
func CreateRouter(name string, rawConfig interface{}) (Router, error) {
func CreateRouter(name string, rawConfig interface{}, space app.Space) (Router, error) {
if factory, found := routerCache[name]; found {
return factory.Create(rawConfig)
return factory.Create(rawConfig, space)
}
return nil, ErrorRouterNotFound
}

View File

@@ -21,7 +21,7 @@ func TestRouter(t *testing.T) {
pointConfig, err := point.LoadConfig(filepath.Join(baseDir, "vpoint_socks_vmess.json"))
assert.Error(err).IsNil()
router, err := CreateRouter(pointConfig.RouterConfig.Strategy, pointConfig.RouterConfig.Settings)
router, err := CreateRouter(pointConfig.RouterConfig.Strategy, pointConfig.RouterConfig.Settings, nil)
assert.Error(err).IsNil()
dest := v2net.TCPDestination(v2net.IPAddress(net.ParseIP("120.135.126.1")), 80)

File diff suppressed because it is too large Load Diff

View File

@@ -1,11 +1,15 @@
// +build generate
package main
import (
"bufio"
"fmt"
"log"
"math"
"net"
"net/http"
"os"
"strconv"
"strings"
@@ -52,9 +56,28 @@ func main() {
ipNet.Add(t)
}
dump := ipNet.Serialize()
fmt.Println("map[uint32]byte {")
for i := 0; i < len(dump); i += 2 {
fmt.Println(dump[i], ": ", dump[i+1], ",")
file, err := os.OpenFile("chinaip_init.go", os.O_WRONLY|os.O_TRUNC|os.O_CREATE, 0644)
if err != nil {
log.Fatalf("Failed to generate chinaip_init.go: %v", err)
}
fmt.Println("}")
defer file.Close()
fmt.Fprintln(file, "package rules")
fmt.Fprintln(file, "import (")
fmt.Fprintln(file, "v2net \"github.com/v2ray/v2ray-core/common/net\"")
fmt.Fprintln(file, ")")
fmt.Fprintln(file, "var (")
fmt.Fprintln(file, "chinaIPNet *v2net.IPNet")
fmt.Fprintln(file, ")")
fmt.Fprintln(file, "func init() {")
fmt.Fprintln(file, "chinaIPNet = v2net.NewIPNetInitialValue(map[uint32]byte {")
for i := 0; i < len(dump); i += 2 {
fmt.Fprintln(file, dump[i], ": ", dump[i+1], ",")
}
fmt.Fprintln(file, "})")
fmt.Fprintln(file, "}")
}

File diff suppressed because it is too large Load Diff

View File

@@ -22,6 +22,7 @@ func TestChinaIP(t *testing.T) {
assert.Bool(rule.Apply(makeDestination("101.226.103.106"))).IsTrue() // qq.com
assert.Bool(rule.Apply(makeDestination("115.239.210.36"))).IsTrue() // image.baidu.com
assert.Bool(rule.Apply(makeDestination("120.135.126.1"))).IsTrue()
assert.Bool(rule.Apply(makeDestination("101.201.173.126"))).IsTrue()
assert.Bool(rule.Apply(makeDestination("8.8.8.8"))).IsFalse()
}

View File

@@ -13,7 +13,15 @@ func (this *Rule) Apply(dest v2net.Destination) bool {
return this.Condition.Apply(dest)
}
type DomainStrategy int
var (
DomainAsIs = DomainStrategy(0)
AlwaysUseIP = DomainStrategy(1)
UseIPIfNonMatch = DomainStrategy(2)
)
type RouterRuleConfig struct {
Rules []*Rule
ResolveDomain bool
Rules []*Rule
DomainStrategy DomainStrategy
}

View File

@@ -117,16 +117,22 @@ func ParseRule(msg json.RawMessage) *Rule {
func init() {
router.RegisterRouterConfig("rules", func(data []byte) (interface{}, error) {
type JsonConfig struct {
RuleList []json.RawMessage `json:"rules"`
ResolveDomain bool `json:"resolveDomain"`
RuleList []json.RawMessage `json:"rules"`
DomainStrategy string `json:"domainStrategy"`
}
jsonConfig := new(JsonConfig)
if err := json.Unmarshal(data, jsonConfig); err != nil {
return nil, err
}
config := &RouterRuleConfig{
Rules: make([]*Rule, len(jsonConfig.RuleList)),
ResolveDomain: jsonConfig.ResolveDomain,
Rules: make([]*Rule, len(jsonConfig.RuleList)),
DomainStrategy: DomainAsIs,
}
domainStrategy := serial.StringLiteral(jsonConfig.DomainStrategy).ToLower()
if domainStrategy.String() == "alwaysip" {
config.DomainStrategy = AlwaysUseIP
} else if domainStrategy.String() == "ipifnonmatch" {
config.DomainStrategy = UseIPIfNonMatch
}
for idx, rawRule := range jsonConfig.RuleList {
rule := ParseRule(rawRule)

View File

@@ -4,8 +4,11 @@ import (
"errors"
"time"
"github.com/v2ray/v2ray-core/app"
"github.com/v2ray/v2ray-core/app/dns"
"github.com/v2ray/v2ray-core/app/router"
"github.com/v2ray/v2ray-core/common/collect"
"github.com/v2ray/v2ray-core/common/log"
v2net "github.com/v2ray/v2ray-core/common/net"
)
@@ -40,21 +43,56 @@ func (this *cacheEntry) Extend() {
type Router struct {
config *RouterRuleConfig
cache *collect.ValidityMap
space app.Space
}
func NewRouter(config *RouterRuleConfig) *Router {
func NewRouter(config *RouterRuleConfig, space app.Space) *Router {
return &Router{
config: config,
cache: collect.NewValidityMap(3600),
space: space,
}
}
// @Private
func (this *Router) ResolveIP(dest v2net.Destination) []v2net.Destination {
dnsServer := this.space.GetApp(dns.APP_ID).(dns.Server)
ips := dnsServer.Get(dest.Address().Domain())
if len(ips) == 0 {
return nil
}
dests := make([]v2net.Destination, len(ips))
for idx, ip := range ips {
if dest.IsTCP() {
dests[idx] = v2net.TCPDestination(v2net.IPAddress(ip), dest.Port())
} else {
dests[idx] = v2net.UDPDestination(v2net.IPAddress(ip), dest.Port())
}
}
return dests
}
func (this *Router) takeDetourWithoutCache(dest v2net.Destination) (string, error) {
for _, rule := range this.config.Rules {
if rule.Apply(dest) {
return rule.Tag, nil
}
}
if this.config.DomainStrategy == UseIPIfNonMatch && dest.Address().IsDomain() {
log.Info("Router: Looking up IP for ", dest)
ipDests := this.ResolveIP(dest)
if ipDests != nil {
for _, ipDest := range ipDests {
log.Info("Router: Trying IP ", ipDest)
for _, rule := range this.config.Rules {
if rule.Apply(ipDest) {
return rule.Tag, nil
}
}
}
}
}
return "", ErrorNoRuleApplicable
}
@@ -72,8 +110,8 @@ func (this *Router) TakeDetour(dest v2net.Destination) (string, error) {
type RouterFactory struct {
}
func (this *RouterFactory) Create(rawConfig interface{}) (router.Router, error) {
return NewRouter(rawConfig.(*RouterRuleConfig)), nil
func (this *RouterFactory) Create(rawConfig interface{}, space app.Space) (router.Router, error) {
return NewRouter(rawConfig.(*RouterRuleConfig), space), nil
}
func init() {

View File

@@ -21,7 +21,7 @@ func TestSimpleRouter(t *testing.T) {
},
}
router := NewRouter(config)
router := NewRouter(config, nil)
tag, err := router.TakeDetour(v2net.TCPDestination(v2net.DomainAddress("v2ray.com"), 80))
assert.Error(err).IsNil()

View File

@@ -20,7 +20,7 @@ var (
metadataCache = make(map[ID]ForContextCreator)
)
func RegisterApp(id ID, creator ForContextCreator) {
func Register(id ID, creator ForContextCreator) {
// TODO: check id
metadataCache[id] = creator
}

View File

@@ -69,12 +69,14 @@ func (b *Buffer) Bytes() []byte {
// Slice cuts the buffer at the given position.
func (b *Buffer) Slice(from, to int) *Buffer {
b.offset += from
b.Value = b.Value[from:to]
return b
}
// SliceFrom cuts the buffer at the given position.
func (b *Buffer) SliceFrom(from int) *Buffer {
b.offset += from
b.Value = b.Value[from:]
return b
}
@@ -121,9 +123,10 @@ func (b *Buffer) Read(data []byte) (int, error) {
}
nBytes := copy(data, b.Value)
if nBytes == b.Len() {
b.Value = b.Value[:0]
b.Clear()
} else {
b.Value = b.Value[nBytes:]
b.offset += nBytes
}
return nBytes, nil
}
@@ -132,7 +135,9 @@ func (b *Buffer) FillFrom(reader io.Reader) (int, error) {
begin := b.Len()
b.Value = b.Value[:cap(b.Value)]
nBytes, err := reader.Read(b.Value[begin:])
b.Value = b.Value[:begin+nBytes]
if err == nil {
b.Value = b.Value[:begin+nBytes]
}
return nBytes, err
}

View File

@@ -49,6 +49,11 @@ func (p *BufferPool) Free(buffer *Buffer) {
}
}
const (
BufferSize = 8*1024 - defaultOffset
LargeBufferSize = 64*1024 - defaultOffset
)
var smallPool = NewBufferPool(1024, 64)
var mediumPool = NewBufferPool(8*1024, 128)
var largePool = NewBufferPool(64*1024, 64)

View File

@@ -1,8 +1,9 @@
package alloc
package alloc_test
import (
"testing"
. "github.com/v2ray/v2ray-core/common/alloc"
v2testing "github.com/v2ray/v2ray-core/testing"
"github.com/v2ray/v2ray-core/testing/assert"
)

View File

@@ -35,16 +35,15 @@ func (this *BufferedWriter) Write(b []byte) (int, error) {
}
func (this *BufferedWriter) Flush() error {
nBytes, err := this.writer.Write(this.buffer.Value)
this.buffer.SliceFrom(nBytes)
if !this.buffer.IsEmpty() {
nBytes, err = this.writer.Write(this.buffer.Value)
defer this.buffer.Clear()
for !this.buffer.IsEmpty() {
nBytes, err := this.writer.Write(this.buffer.Value)
if err != nil {
return err
}
this.buffer.SliceFrom(nBytes)
}
if this.buffer.IsEmpty() {
this.buffer.Clear()
}
return err
return nil
}
func (this *BufferedWriter) Cached() bool {
@@ -59,6 +58,7 @@ func (this *BufferedWriter) SetCached(cached bool) {
}
func (this *BufferedWriter) Release() {
this.Flush()
this.buffer.Release()
this.buffer = nil
this.writer = nil

View File

@@ -29,7 +29,6 @@ type Reader interface {
type AdaptiveReader struct {
reader io.Reader
allocate func() *alloc.Buffer
isLarge bool
}
// NewAdaptiveReader creates a new AdaptiveReader.
@@ -38,7 +37,6 @@ func NewAdaptiveReader(reader io.Reader) *AdaptiveReader {
return &AdaptiveReader{
reader: reader,
allocate: alloc.NewBuffer,
isLarge: false,
}
}
@@ -46,12 +44,10 @@ func NewAdaptiveReader(reader io.Reader) *AdaptiveReader {
func (this *AdaptiveReader) Read() (*alloc.Buffer, error) {
buffer, err := ReadFrom(this.reader, this.allocate())
if buffer.IsFull() && !this.isLarge {
if buffer.Len() >= alloc.BufferSize {
this.allocate = alloc.NewLargeBuffer
this.isLarge = true
} else if !buffer.IsFull() {
} else {
this.allocate = alloc.NewBuffer
this.isLarge = false
}
if err != nil {

28
common/io/reader_test.go Normal file
View File

@@ -0,0 +1,28 @@
package io_test
import (
"bytes"
"testing"
"github.com/v2ray/v2ray-core/common/alloc"
. "github.com/v2ray/v2ray-core/common/io"
v2testing "github.com/v2ray/v2ray-core/testing"
"github.com/v2ray/v2ray-core/testing/assert"
)
func TestAdaptiveReader(t *testing.T) {
v2testing.Current(t)
rawContent := make([]byte, 1024*1024)
reader := NewAdaptiveReader(bytes.NewBuffer(rawContent))
b1, err := reader.Read()
assert.Error(err).IsNil()
assert.Bool(b1.IsFull()).IsTrue()
assert.Int(b1.Len()).Equals(alloc.BufferSize)
b2, err := reader.Read()
assert.Error(err).IsNil()
assert.Bool(b2.IsFull()).IsTrue()
assert.Int(b2.Len()).Equals(alloc.LargeBufferSize)
}

View File

@@ -1,16 +1,26 @@
package io
import (
"github.com/v2ray/v2ray-core/common/log"
)
func Pipe(reader Reader, writer Writer) error {
for {
buffer, err := reader.Read()
if buffer.Len() > 0 {
err = writer.Write(buffer)
} else {
buffer.Release()
if err != nil {
log.Debug("IO: Pipe exits as ", err)
return err
}
if buffer.IsEmpty() {
buffer.Release()
continue
}
err = writer.Write(buffer)
if err != nil {
return nil
log.Debug("IO: Pipe exits as ", err)
return err
}
}
}

View File

@@ -28,12 +28,15 @@ func NewAdaptiveWriter(writer io.Writer) *AdaptiveWriter {
// Write implements Writer.Write(). Write() takes ownership of the given buffer.
func (this *AdaptiveWriter) Write(buffer *alloc.Buffer) error {
nBytes, err := this.writer.Write(buffer.Value)
if nBytes < buffer.Len() {
_, err = this.writer.Write(buffer.Value[nBytes:])
defer buffer.Release()
for !buffer.IsEmpty() {
nBytes, err := this.writer.Write(buffer.Value)
if err != nil {
return err
}
buffer.SliceFrom(nBytes)
}
buffer.Release()
return err
return nil
}
func (this *AdaptiveWriter) Release() {

26
common/io/writer_test.go Normal file
View File

@@ -0,0 +1,26 @@
package io_test
import (
"bytes"
"crypto/rand"
"testing"
"github.com/v2ray/v2ray-core/common/alloc"
. "github.com/v2ray/v2ray-core/common/io"
v2testing "github.com/v2ray/v2ray-core/testing"
"github.com/v2ray/v2ray-core/testing/assert"
)
func TestAdaptiveWriter(t *testing.T) {
v2testing.Current(t)
lb := alloc.NewLargeBuffer()
rand.Read(lb.Value)
writeBuffer := make([]byte, 0, 1024*1024)
writer := NewAdaptiveWriter(NewBufferedWriter(bytes.NewBuffer(writeBuffer)))
err := writer.Write(lb)
assert.Error(err).IsNil()
assert.Bytes(lb.Bytes()).Equals(writeBuffer)
}

View File

@@ -1,7 +1,7 @@
package log
import (
"github.com/v2ray/v2ray-core/common/alloc"
"github.com/v2ray/v2ray-core/common/log/internal"
"github.com/v2ray/v2ray-core/common/serial"
)
@@ -14,32 +14,12 @@ const (
)
var (
accessLoggerInstance logWriter = &noOpLogWriter{}
accessLoggerInstance internal.LogWriter = new(internal.NoOpLogWriter)
)
type accessLog struct {
From serial.String
To serial.String
Status AccessStatus
Reason serial.String
}
func (this *accessLog) Release() {
this.From = nil
this.To = nil
this.Reason = nil
}
func (this *accessLog) String() string {
b := alloc.NewSmallBuffer().Clear()
defer b.Release()
return b.AppendString(this.From.String()).AppendString(" ").AppendString(string(this.Status)).AppendString(" ").AppendString(this.To.String()).AppendString(" ").AppendString(this.Reason.String()).String()
}
// InitAccessLogger initializes the access logger to write into the give file.
func InitAccessLogger(file string) error {
logger, err := newFileLogWriter(file)
logger, err := internal.NewFileLogWriter(file)
if err != nil {
Error("Failed to create access logger on file (", file, "): ", file, err)
return err
@@ -50,10 +30,10 @@ func InitAccessLogger(file string) error {
// Access writes an access log.
func Access(from, to serial.String, status AccessStatus, reason serial.String) {
accessLoggerInstance.Log(&accessLog{
accessLoggerInstance.Log(&internal.AccessLog{
From: from,
To: to,
Status: status,
Status: string(status),
Reason: reason,
})
}

View File

@@ -1,36 +0,0 @@
package log
import (
"io/ioutil"
"os"
"testing"
"time"
"github.com/v2ray/v2ray-core/common/serial"
v2testing "github.com/v2ray/v2ray-core/testing"
"github.com/v2ray/v2ray-core/testing/assert"
)
func TestAccessLog(t *testing.T) {
v2testing.Current(t)
filename := "/tmp/test_access_log.log"
InitAccessLogger(filename)
_, err := os.Stat(filename)
assert.Error(err).IsNil()
Access(serial.StringLiteral("test_from"), serial.StringLiteral("test_to"), AccessAccepted, serial.StringLiteral("test_reason"))
<-time.After(2 * time.Second)
accessLoggerInstance.(*fileLogWriter).close()
accessLoggerInstance = &noOpLogWriter{}
content, err := ioutil.ReadFile(filename)
assert.Error(err).IsNil()
contentStr := serial.StringLiteral(content)
assert.String(contentStr).Contains(serial.StringLiteral("test_from"))
assert.String(contentStr).Contains(serial.StringLiteral("test_to"))
assert.String(contentStr).Contains(serial.StringLiteral("test_reason"))
assert.String(contentStr).Contains(serial.StringLiteral("accepted"))
}

View File

@@ -0,0 +1,69 @@
package internal
import (
"fmt"
"github.com/v2ray/v2ray-core/common"
"github.com/v2ray/v2ray-core/common/alloc"
"github.com/v2ray/v2ray-core/common/serial"
)
type LogEntry interface {
common.Releasable
serial.String
}
type ErrorLog struct {
Prefix string
Values []interface{}
}
func (this *ErrorLog) Release() {
for index := range this.Values {
this.Values[index] = nil
}
this.Values = nil
}
func (this *ErrorLog) String() string {
b := alloc.NewSmallBuffer().Clear()
defer b.Release()
b.AppendString(this.Prefix)
for _, value := range this.Values {
switch typedVal := value.(type) {
case string:
b.AppendString(typedVal)
case *string:
b.AppendString(*typedVal)
case serial.String:
b.AppendString(typedVal.String())
case error:
b.AppendString(typedVal.Error())
default:
b.AppendString(fmt.Sprint(value))
}
}
return b.String()
}
type AccessLog struct {
From serial.String
To serial.String
Status string
Reason serial.String
}
func (this *AccessLog) Release() {
this.From = nil
this.To = nil
this.Reason = nil
}
func (this *AccessLog) String() string {
b := alloc.NewSmallBuffer().Clear()
defer b.Release()
return b.AppendString(this.From.String()).AppendString(" ").AppendString(this.Status).AppendString(" ").AppendString(this.To.String()).AppendString(" ").AppendString(this.Reason.String()).String()
}

View File

@@ -0,0 +1,27 @@
package internal_test
import (
"testing"
. "github.com/v2ray/v2ray-core/common/log/internal"
"github.com/v2ray/v2ray-core/common/serial"
v2testing "github.com/v2ray/v2ray-core/testing"
"github.com/v2ray/v2ray-core/testing/assert"
)
func TestAccessLog(t *testing.T) {
v2testing.Current(t)
entry := &AccessLog{
From: serial.StringLiteral("test_from"),
To: serial.StringLiteral("test_to"),
Status: "Accepted",
Reason: serial.StringLiteral("test_reason"),
}
entryStr := entry.String()
assert.StringLiteral(entryStr).Contains(serial.StringLiteral("test_from"))
assert.StringLiteral(entryStr).Contains(serial.StringLiteral("test_to"))
assert.StringLiteral(entryStr).Contains(serial.StringLiteral("test_reason"))
assert.StringLiteral(entryStr).Contains(serial.StringLiteral("Accepted"))
}

View File

@@ -0,0 +1,77 @@
package internal
import (
"log"
"os"
"github.com/v2ray/v2ray-core/common/platform"
)
type LogWriter interface {
Log(LogEntry)
}
type NoOpLogWriter struct {
}
func (this *NoOpLogWriter) Log(entry LogEntry) {
entry.Release()
}
type StdOutLogWriter struct {
logger *log.Logger
}
func NewStdOutLogWriter() LogWriter {
return &StdOutLogWriter{
logger: log.New(os.Stdout, "", log.Ldate|log.Ltime),
}
}
func (this *StdOutLogWriter) Log(log LogEntry) {
this.logger.Print(log.String() + platform.LineSeparator())
log.Release()
}
type FileLogWriter struct {
queue chan string
logger *log.Logger
file *os.File
}
func (this *FileLogWriter) Log(log LogEntry) {
select {
case this.queue <- log.String():
default:
// We don't expect this to happen, but don't want to block main thread as well.
}
log.Release()
}
func (this *FileLogWriter) run() {
for {
entry, open := <-this.queue
if !open {
break
}
this.logger.Print(entry + platform.LineSeparator())
}
}
func (this *FileLogWriter) Close() {
this.file.Close()
}
func NewFileLogWriter(path string) (*FileLogWriter, error) {
file, err := os.OpenFile(path, os.O_APPEND|os.O_WRONLY|os.O_CREATE, 0600)
if err != nil {
return nil, err
}
logger := &FileLogWriter{
queue: make(chan string, 16),
logger: log.New(file, "", log.Ldate|log.Ltime),
file: file,
}
go logger.run()
return logger, nil
}

View File

@@ -0,0 +1 @@
package internal_test

View File

@@ -1,13 +1,11 @@
package log
import (
"fmt"
"github.com/v2ray/v2ray-core/common"
"github.com/v2ray/v2ray-core/common/alloc"
"github.com/v2ray/v2ray-core/common/serial"
"github.com/v2ray/v2ray-core/common/log/internal"
)
type LogLevel int
const (
DebugLevel = LogLevel(0)
InfoLevel = LogLevel(1)
@@ -16,82 +14,43 @@ const (
NoneLevel = LogLevel(999)
)
type LogEntry interface {
common.Releasable
serial.String
}
type errorLog struct {
prefix string
values []interface{}
}
func (this *errorLog) Release() {
for index := range this.values {
this.values[index] = nil
}
this.values = nil
}
func (this *errorLog) String() string {
b := alloc.NewSmallBuffer().Clear()
defer b.Release()
b.AppendString(this.prefix)
for _, value := range this.values {
switch typedVal := value.(type) {
case string:
b.AppendString(typedVal)
case *string:
b.AppendString(*typedVal)
case serial.String:
b.AppendString(typedVal.String())
case error:
b.AppendString(typedVal.Error())
default:
b.AppendString(fmt.Sprint(value))
}
}
return b.String()
}
var (
noOpLoggerInstance logWriter = &noOpLogWriter{}
streamLoggerInstance logWriter = newStdOutLogWriter()
streamLoggerInstance internal.LogWriter = internal.NewStdOutLogWriter()
debugLogger = noOpLoggerInstance
infoLogger = noOpLoggerInstance
warningLogger = streamLoggerInstance
errorLogger = streamLoggerInstance
debugLogger internal.LogWriter = streamLoggerInstance
infoLogger internal.LogWriter = streamLoggerInstance
warningLogger internal.LogWriter = streamLoggerInstance
errorLogger internal.LogWriter = streamLoggerInstance
)
type LogLevel int
func SetLogLevel(level LogLevel) {
debugLogger = noOpLoggerInstance
debugLogger = new(internal.NoOpLogWriter)
if level <= DebugLevel {
debugLogger = streamLoggerInstance
}
infoLogger = noOpLoggerInstance
infoLogger = new(internal.NoOpLogWriter)
if level <= InfoLevel {
infoLogger = streamLoggerInstance
}
warningLogger = noOpLoggerInstance
warningLogger = new(internal.NoOpLogWriter)
if level <= WarningLevel {
warningLogger = streamLoggerInstance
}
errorLogger = noOpLoggerInstance
errorLogger = new(internal.NoOpLogWriter)
if level <= ErrorLevel {
errorLogger = streamLoggerInstance
}
if level == NoneLevel {
accessLoggerInstance = new(internal.NoOpLogWriter)
}
}
func InitErrorLogger(file string) error {
logger, err := newFileLogWriter(file)
logger, err := internal.NewFileLogWriter(file)
if err != nil {
Error("Failed to create error logger on file (", file, "): ", err)
return err
@@ -102,32 +61,32 @@ func InitErrorLogger(file string) error {
// Debug outputs a debug log with given format and optional arguments.
func Debug(v ...interface{}) {
debugLogger.Log(&errorLog{
prefix: "[Debug]",
values: v,
debugLogger.Log(&internal.ErrorLog{
Prefix: "[Debug]",
Values: v,
})
}
// Info outputs an info log with given format and optional arguments.
func Info(v ...interface{}) {
infoLogger.Log(&errorLog{
prefix: "[Info]",
values: v,
infoLogger.Log(&internal.ErrorLog{
Prefix: "[Info]",
Values: v,
})
}
// Warning outputs a warning log with given format and optional arguments.
func Warning(v ...interface{}) {
warningLogger.Log(&errorLog{
prefix: "[Warning]",
values: v,
warningLogger.Log(&internal.ErrorLog{
Prefix: "[Warning]",
Values: v,
})
}
// Error outputs an error log with given format and optional arguments.
func Error(v ...interface{}) {
errorLogger.Log(&errorLog{
prefix: "[Error]",
values: v,
errorLogger.Log(&internal.ErrorLog{
Prefix: "[Error]",
Values: v,
})
}

View File

@@ -1,40 +0,0 @@
package log
import (
"bytes"
"log"
"testing"
"github.com/v2ray/v2ray-core/common/platform"
"github.com/v2ray/v2ray-core/common/serial"
v2testing "github.com/v2ray/v2ray-core/testing"
"github.com/v2ray/v2ray-core/testing/assert"
)
func TestLogLevelSetting(t *testing.T) {
v2testing.Current(t)
assert.Pointer(debugLogger).Equals(noOpLoggerInstance)
SetLogLevel(DebugLevel)
assert.Pointer(debugLogger).Equals(streamLoggerInstance)
SetLogLevel(InfoLevel)
assert.Pointer(debugLogger).Equals(noOpLoggerInstance)
assert.Pointer(infoLogger).Equals(streamLoggerInstance)
}
func TestStreamLogger(t *testing.T) {
v2testing.Current(t)
buffer := bytes.NewBuffer(make([]byte, 0, 1024))
infoLogger = &stdOutLogWriter{
logger: log.New(buffer, "", 0),
}
Info("Test ", "Stream Logger", " Format")
assert.StringLiteral(string(buffer.Bytes())).Equals("[Info]Test Stream Logger Format" + platform.LineSeparator())
buffer.Reset()
errorLogger = infoLogger
Error("Test ", serial.StringLiteral("literal"), " Format")
assert.StringLiteral(string(buffer.Bytes())).Equals("[Error]Test literal Format" + platform.LineSeparator())
}

View File

@@ -1,84 +0,0 @@
package log
import (
"io"
"log"
"os"
"github.com/v2ray/v2ray-core/common/platform"
)
func createLogger(writer io.Writer) *log.Logger {
return log.New(writer, "", log.Ldate|log.Ltime)
}
type logWriter interface {
Log(LogEntry)
}
type noOpLogWriter struct {
}
func (this *noOpLogWriter) Log(entry LogEntry) {
entry.Release()
}
type stdOutLogWriter struct {
logger *log.Logger
}
func newStdOutLogWriter() logWriter {
return &stdOutLogWriter{
logger: createLogger(os.Stdout),
}
}
func (this *stdOutLogWriter) Log(log LogEntry) {
this.logger.Print(log.String() + platform.LineSeparator())
log.Release()
}
type fileLogWriter struct {
queue chan LogEntry
logger *log.Logger
file *os.File
}
func (this *fileLogWriter) Log(log LogEntry) {
select {
case this.queue <- log:
default:
log.Release()
// We don't expect this to happen, but don't want to block main thread as well.
}
}
func (this *fileLogWriter) run() {
for {
entry, open := <-this.queue
if !open {
break
}
this.logger.Print(entry.String() + platform.LineSeparator())
entry.Release()
entry = nil
}
}
func (this *fileLogWriter) close() {
this.file.Close()
}
func newFileLogWriter(path string) (*fileLogWriter, error) {
file, err := os.OpenFile(path, os.O_APPEND|os.O_WRONLY|os.O_CREATE, 0600)
if err != nil {
return nil, err
}
logger := &fileLogWriter{
queue: make(chan LogEntry, 16),
logger: log.New(file, "", log.Ldate|log.Ltime),
file: file,
}
go logger.run()
return logger, nil
}

View File

@@ -7,6 +7,10 @@ import (
"github.com/v2ray/v2ray-core/common/serial"
)
var (
LocalHostIP = IPAddress([]byte{127, 0, 0, 1})
)
// Address represents a network address to be communicated with. It may be an IP address or domain
// address, not both. This interface doesn't resolve IP address for a given domain.
type Address interface {

19
common/net/port_test.go Normal file
View File

@@ -0,0 +1,19 @@
package net_test
import (
"testing"
. "github.com/v2ray/v2ray-core/common/net"
v2testing "github.com/v2ray/v2ray-core/testing"
"github.com/v2ray/v2ray-core/testing/assert"
)
func TestPortRangeContains(t *testing.T) {
v2testing.Current(t)
portRange := &PortRange{
From: Port(53),
To: Port(53),
}
assert.Bool(portRange.Contains(Port(53))).IsTrue()
}

View File

@@ -0,0 +1,4 @@
package protocol
type Account interface {
}

View File

@@ -57,3 +57,15 @@ func NewID(uuid *uuid.UUID) *ID {
md5hash.Sum(id.cmdKey[:0])
return id
}
func NewAlterIDs(primary *ID, alterIDCount uint16) []*ID {
alterIDs := make([]*ID, alterIDCount)
prevID := primary.UUID()
for idx := range alterIDs {
newid := prevID.Next()
// TODO: check duplicates
alterIDs[idx] = NewID(newid)
prevID = newid
}
return alterIDs
}

View File

@@ -18,8 +18,8 @@ func TestRequestSerialization(t *testing.T) {
user := protocol.NewUser(
protocol.NewID(uuid.New()),
nil,
protocol.UserLevelUntrusted,
0,
"test@v2ray.com")
expectedRequest := &protocol.RequestHeader{

View File

@@ -18,23 +18,13 @@ type User struct {
Email string
}
func NewUser(id *ID, level UserLevel, alterIdCount uint16, email string) *User {
u := &User{
ID: id,
Level: level,
Email: email,
func NewUser(primary *ID, secondary []*ID, level UserLevel, email string) *User {
return &User{
ID: primary,
AlterIDs: secondary,
Level: level,
Email: email,
}
if alterIdCount > 0 {
u.AlterIDs = make([]*ID, alterIdCount)
prevId := id.UUID()
for idx := range u.AlterIDs {
newid := prevId.Next()
// TODO: check duplicate
u.AlterIDs[idx] = NewID(newid)
prevId = newid
}
}
return u
}
func (this *User) AnyValidID() *ID {
@@ -57,7 +47,3 @@ func GetUserSettings(level UserLevel) UserSettings {
}
return settings
}
type Account interface {
CryptionKey() []byte
}

View File

@@ -23,7 +23,9 @@ func (u *User) UnmarshalJSON(data []byte) error {
if err != nil {
return err
}
*u = *NewUser(NewID(id), UserLevel(rawUserValue.LevelByte), rawUserValue.AlterIdCount, rawUserValue.EmailString)
primaryID := NewID(id)
alterIDs := NewAlterIDs(primaryID, rawUserValue.AlterIdCount)
*u = *NewUser(primaryID, alterIDs, UserLevel(rawUserValue.LevelByte), rawUserValue.EmailString)
return nil
}

View File

@@ -6,7 +6,7 @@ import (
)
var (
errorRetryFailed = errors.New("All retry attempts failed.")
ErrorRetryFailed = errors.New("All retry attempts failed.")
)
// Strategy is a way to retry on a specific function.
@@ -29,7 +29,7 @@ func (r *retryer) On(method func() error) error {
}
delay := r.NextDelay(attempt)
if delay < 0 {
return errorRetryFailed
return ErrorRetryFailed
}
<-time.After(time.Duration(delay) * time.Millisecond)
attempt++

View File

@@ -1,10 +1,11 @@
package retry
package retry_test
import (
"errors"
"testing"
"time"
. "github.com/v2ray/v2ray-core/common/retry"
v2testing "github.com/v2ray/v2ray-core/testing"
"github.com/v2ray/v2ray-core/testing/assert"
)
@@ -76,6 +77,6 @@ func TestRetryExhausted(t *testing.T) {
})
duration := time.Since(startTime)
assert.Error(err).Equals(errorRetryFailed)
assert.Error(err).Equals(ErrorRetryFailed)
assert.Int64(int64(duration / time.Millisecond)).AtLeast(1900)
}

View File

@@ -95,19 +95,21 @@ func (this *DokodemoDoor) ListenUDP(port v2net.Port) error {
}
func (this *DokodemoDoor) handleUDPPackets(payload *alloc.Buffer, dest v2net.Destination) {
this.udpServer.Dispatch(dest, v2net.UDPDestination(this.address, this.port), payload, func(destination v2net.Destination, payload *alloc.Buffer) {
defer payload.Release()
this.udpMutex.RLock()
defer this.udpMutex.RUnlock()
if !this.accepting {
return
}
this.udpHub.WriteTo(payload.Value, destination)
})
this.udpServer.Dispatch(dest, v2net.UDPDestination(this.address, this.port), payload, this.handleUDPResponse)
}
func (this *DokodemoDoor) handleUDPResponse(dest v2net.Destination, payload *alloc.Buffer) {
defer payload.Release()
this.udpMutex.RLock()
defer this.udpMutex.RUnlock()
if !this.accepting {
return
}
this.udpHub.WriteTo(payload.Value, dest)
}
func (this *DokodemoDoor) ListenTCP(port v2net.Port) error {
tcpListener, err := hub.ListenTCP(port, this.HandleTCPConnection)
tcpListener, err := hub.ListenTCP(port, this.HandleTCPConnection, nil)
if err != nil {
log.Error("Dokodemo: Failed to listen on port ", port, ": ", err)
return err
@@ -118,7 +120,7 @@ func (this *DokodemoDoor) ListenTCP(port v2net.Port) error {
return nil
}
func (this *DokodemoDoor) HandleTCPConnection(conn hub.Connection) {
func (this *DokodemoDoor) HandleTCPConnection(conn *hub.Connection) {
defer conn.Close()
ray := this.packetDispatcher.DispatchToOutbound(v2net.TCPDestination(this.address, this.port))
@@ -129,6 +131,8 @@ func (this *DokodemoDoor) HandleTCPConnection(conn hub.Connection) {
outputFinish.Lock()
reader := v2net.NewTimeOutReader(this.config.Timeout, conn)
defer reader.Release()
go func() {
v2reader := v2io.NewAdaptiveReader(reader)
defer v2reader.Release()
@@ -147,4 +151,5 @@ func (this *DokodemoDoor) HandleTCPConnection(conn hub.Connection) {
}()
outputFinish.Lock()
inputFinish.Lock()
}

View File

@@ -22,6 +22,7 @@ func (this *FreedomConnection) Dispatch(destination v2net.Destination, payload *
defer payload.Release()
defer ray.OutboundInput().Release()
defer ray.OutboundOutput().Close()
var conn net.Conn
err := retry.Timed(5, 100).On(func() error {
@@ -56,7 +57,6 @@ func (this *FreedomConnection) Dispatch(destination v2net.Destination, payload *
go func() {
defer readMutex.Unlock()
defer output.Close()
var reader io.Reader = conn
@@ -68,6 +68,7 @@ func (this *FreedomConnection) Dispatch(destination v2net.Destination, payload *
defer v2reader.Release()
v2io.Pipe(v2reader, output)
ray.OutboundOutput().Close()
}()
writeMutex.Lock()

View File

@@ -34,7 +34,7 @@ func TestSinglePacket(t *testing.T) {
data2Send := "Data to be sent to remote"
payload := alloc.NewSmallBuffer().Clear().Append([]byte(data2Send))
go freedom.Dispatch(v2net.TCPDestination(v2net.IPAddress([]byte{127, 0, 0, 1}), port), payload, traffic)
go freedom.Dispatch(v2net.TCPDestination(v2net.LocalHostIP, port), payload, traffic)
traffic.InboundInput().Close()
respPayload, err := traffic.InboundOutput().Read()

1
proxy/http/client.go Normal file
View File

@@ -0,0 +1 @@
package http

View File

@@ -1,13 +1,38 @@
package http
import (
"crypto/tls"
v2net "github.com/v2ray/v2ray-core/common/net"
)
type CertificateConfig struct {
Domain string
Certificate tls.Certificate
}
type TlsConfig struct {
Enabled bool
CertFile string
KeyFile string
Enabled bool
Certs []*CertificateConfig
}
func (this *TlsConfig) GetConfig() *tls.Config {
if !this.Enabled {
return nil
}
config := &tls.Config{
InsecureSkipVerify: false,
}
config.Certificates = make([]tls.Certificate, len(this.Certs))
for index, cert := range this.Certs {
config.Certificates[index] = cert.Certificate
}
config.BuildNameToCertificate()
return config
}
type Config struct {

View File

@@ -3,17 +3,37 @@
package http
import (
"crypto/tls"
"encoding/json"
v2net "github.com/v2ray/v2ray-core/common/net"
"github.com/v2ray/v2ray-core/proxy/internal/config"
)
func (this *CertificateConfig) UnmarshalJSON(data []byte) error {
type JsonConfig struct {
Domain string `json:"domain"`
CertFile string `json:"cert"`
KeyFile string `json:"key"`
}
jsonConfig := new(JsonConfig)
if err := json.Unmarshal(data, jsonConfig); err != nil {
return err
}
cert, err := tls.LoadX509KeyPair(jsonConfig.CertFile, jsonConfig.KeyFile)
if err != nil {
return err
}
this.Domain = jsonConfig.Domain
this.Certificate = cert
return nil
}
func (this *TlsConfig) UnmarshalJSON(data []byte) error {
type JsonConfig struct {
Enabled bool
CertFile string
KeyFile string
Enabled bool `json:"enable"`
Certs []*CertificateConfig `json:"certs"`
}
jsonConfig := new(JsonConfig)
if err := json.Unmarshal(data, jsonConfig); err != nil {
@@ -21,8 +41,7 @@ func (this *TlsConfig) UnmarshalJSON(data []byte) error {
}
this.Enabled = jsonConfig.Enabled
this.CertFile = jsonConfig.CertFile
this.KeyFile = jsonConfig.KeyFile
this.Certs = jsonConfig.Certs
return nil
}

View File

@@ -1,20 +0,0 @@
package http
import (
"github.com/v2ray/v2ray-core/app"
"github.com/v2ray/v2ray-core/app/dispatcher"
"github.com/v2ray/v2ray-core/proxy"
"github.com/v2ray/v2ray-core/proxy/internal"
)
func init() {
internal.MustRegisterInboundHandlerCreator("http",
func(space app.Space, rawConfig interface{}) (proxy.InboundHandler, error) {
if !space.HasApp(dispatcher.APP_ID) {
return nil, internal.ErrorBadConfiguration
}
return NewHttpProxyServer(
rawConfig.(*Config),
space.GetApp(dispatcher.APP_ID).(dispatcher.PacketDispatcher)), nil
})
}

View File

@@ -2,6 +2,7 @@ package http
import (
"bufio"
"crypto/tls"
"io"
"net"
"net/http"
@@ -9,6 +10,7 @@ import (
"strings"
"sync"
"github.com/v2ray/v2ray-core/app"
"github.com/v2ray/v2ray-core/app/dispatcher"
"github.com/v2ray/v2ray-core/common/alloc"
v2io "github.com/v2ray/v2ray-core/common/io"
@@ -16,6 +18,7 @@ import (
v2net "github.com/v2ray/v2ray-core/common/net"
"github.com/v2ray/v2ray-core/common/serial"
"github.com/v2ray/v2ray-core/proxy"
"github.com/v2ray/v2ray-core/proxy/internal"
"github.com/v2ray/v2ray-core/transport/hub"
"github.com/v2ray/v2ray-core/transport/ray"
)
@@ -60,7 +63,11 @@ func (this *HttpProxyServer) Listen(port v2net.Port) error {
}
this.listeningPort = port
tcpListener, err := hub.ListenTCP(port, this.handleConnection)
var tlsConfig *tls.Config = nil
if this.config.TlsConfig != nil {
tlsConfig = this.config.TlsConfig.GetConfig()
}
tcpListener, err := hub.ListenTCP(port, this.handleConnection, tlsConfig)
if err != nil {
log.Error("Http: Failed listen on port ", port, ": ", err)
return err
@@ -95,7 +102,7 @@ func parseHost(rawHost string, defaultPort v2net.Port) (v2net.Destination, error
return v2net.TCPDestination(v2net.DomainAddress(host), port), nil
}
func (this *HttpProxyServer) handleConnection(conn hub.Connection) {
func (this *HttpProxyServer) handleConnection(conn *hub.Connection) {
defer conn.Close()
reader := bufio.NewReader(conn)
@@ -247,3 +254,15 @@ func (this *HttpProxyServer) handlePlainHTTP(request *http.Request, dest v2net.D
}()
wg.Wait()
}
func init() {
internal.MustRegisterInboundHandlerCreator("http",
func(space app.Space, rawConfig interface{}) (proxy.InboundHandler, error) {
if !space.HasApp(dispatcher.APP_ID) {
return nil, internal.ErrorBadConfiguration
}
return NewHttpProxyServer(
rawConfig.(*Config),
space.GetApp(dispatcher.APP_ID).(dispatcher.PacketDispatcher)), nil
})
}

View File

@@ -21,7 +21,7 @@ func TestNormalRequestParsing(t *testing.T) {
request, err := ReadRequest(buffer, nil, false)
assert.Error(err).IsNil()
netassert.Address(request.Address).Equals(v2net.IPAddress([]byte{127, 0, 0, 1}))
netassert.Address(request.Address).Equals(v2net.LocalHostIP)
netassert.Port(request.Port).Equals(v2net.Port(80))
assert.Bool(request.OTA).IsFalse()
}
@@ -110,7 +110,7 @@ func TestUDPRequestParsing(t *testing.T) {
request, err := ReadRequest(buffer, nil, true)
assert.Error(err).IsNil()
netassert.Address(request.Address).Equals(v2net.IPAddress([]byte{127, 0, 0, 1}))
netassert.Address(request.Address).Equals(v2net.LocalHostIP)
netassert.Port(request.Port).Equals(v2net.Port(80))
assert.Bool(request.OTA).IsFalse()
assert.Bytes(request.UDPPayload.Value).Equals([]byte{1, 2, 3, 4, 5, 6})

View File

@@ -20,7 +20,7 @@ import (
"github.com/v2ray/v2ray-core/transport/hub"
)
type Shadowsocks struct {
type Server struct {
packetDispatcher dispatcher.PacketDispatcher
config *Config
port v2net.Port
@@ -30,18 +30,18 @@ type Shadowsocks struct {
udpServer *hub.UDPServer
}
func NewShadowsocks(config *Config, packetDispatcher dispatcher.PacketDispatcher) *Shadowsocks {
return &Shadowsocks{
func NewServer(config *Config, packetDispatcher dispatcher.PacketDispatcher) *Server {
return &Server{
config: config,
packetDispatcher: packetDispatcher,
}
}
func (this *Shadowsocks) Port() v2net.Port {
func (this *Server) Port() v2net.Port {
return this.port
}
func (this *Shadowsocks) Close() {
func (this *Server) Close() {
this.accepting = false
// TODO: synchronization
if this.tcpHub != nil {
@@ -56,7 +56,7 @@ func (this *Shadowsocks) Close() {
}
func (this *Shadowsocks) Listen(port v2net.Port) error {
func (this *Server) Listen(port v2net.Port) error {
if this.accepting {
if this.port == port {
return nil
@@ -65,7 +65,7 @@ func (this *Shadowsocks) Listen(port v2net.Port) error {
}
}
tcpHub, err := hub.ListenTCP(port, this.handleConnection)
tcpHub, err := hub.ListenTCP(port, this.handleConnection, nil)
if err != nil {
log.Error("Shadowsocks: Failed to listen TCP on port ", port, ": ", err)
return err
@@ -88,7 +88,7 @@ func (this *Shadowsocks) Listen(port v2net.Port) error {
return nil
}
func (this *Shadowsocks) handlerUDPPayload(payload *alloc.Buffer, source v2net.Destination) {
func (this *Server) handlerUDPPayload(payload *alloc.Buffer, source v2net.Destination) {
defer payload.Release()
ivLen := this.config.Cipher.IVSize()
@@ -110,7 +110,7 @@ func (this *Shadowsocks) handlerUDPPayload(payload *alloc.Buffer, source v2net.D
log.Warning("Shadowsocks: Invalid request from ", source, ": ", err)
return
}
defer request.Release()
//defer request.Release()
dest := v2net.UDPDestination(request.Address, request.Port)
log.Access(source, dest, log.AccessAccepted, serial.StringLiteral(""))
@@ -157,7 +157,7 @@ func (this *Shadowsocks) handlerUDPPayload(payload *alloc.Buffer, source v2net.D
})
}
func (this *Shadowsocks) handleConnection(conn hub.Connection) {
func (this *Server) handleConnection(conn *hub.Connection) {
defer conn.Close()
buffer := alloc.NewSmallBuffer()
@@ -205,6 +205,7 @@ func (this *Shadowsocks) handleConnection(conn hub.Connection) {
log.Info("Shadowsocks: Tunnelling request to ", dest)
ray := this.packetDispatcher.DispatchToOutbound(dest)
defer ray.InboundOutput().Release()
var writeFinish sync.Mutex
writeFinish.Lock()
@@ -227,7 +228,6 @@ func (this *Shadowsocks) handleConnection(conn hub.Connection) {
v2writer := v2io.NewAdaptiveWriter(writer)
v2io.Pipe(ray.InboundOutput(), v2writer)
ray.InboundOutput().Release()
writer.Release()
v2writer.Release()
}
@@ -255,7 +255,7 @@ func init() {
if !space.HasApp(dispatcher.APP_ID) {
return nil, internal.ErrorBadConfiguration
}
return NewShadowsocks(
return NewServer(
rawConfig.(*Config),
space.GetApp(dispatcher.APP_ID).(dispatcher.PacketDispatcher)), nil
})

View File

@@ -56,7 +56,7 @@ func init() {
if rawConfig.Host != nil {
socksConfig.Address = rawConfig.Host.Address
} else {
socksConfig.Address = v2net.IPAddress([]byte{127, 0, 0, 1})
socksConfig.Address = v2net.LocalHostIP
}
return socksConfig, nil
})

View File

@@ -77,7 +77,7 @@ func (this *Server) Listen(port v2net.Port) error {
}
this.listeningPort = port
listener, err := hub.ListenTCP(port, this.handleConnection)
listener, err := hub.ListenTCP(port, this.handleConnection, nil)
if err != nil {
log.Error("Socks: failed to listen on port ", port, ": ", err)
return err
@@ -92,7 +92,7 @@ func (this *Server) Listen(port v2net.Port) error {
return nil
}
func (this *Server) handleConnection(connection hub.Connection) {
func (this *Server) handleConnection(connection *hub.Connection) {
defer connection.Close()
timedReader := v2net.NewTimeOutReader(120, connection)

View File

@@ -1,7 +1,7 @@
package inbound
import (
proto "github.com/v2ray/v2ray-core/common/protocol"
"github.com/v2ray/v2ray-core/common/protocol"
)
type DetourConfig struct {
@@ -14,11 +14,11 @@ type FeaturesConfig struct {
type DefaultConfig struct {
AlterIDs uint16
Level proto.UserLevel
Level protocol.UserLevel
}
type Config struct {
AllowedUsers []*proto.User
AllowedUsers []*protocol.User
Features *FeaturesConfig
Defaults *DefaultConfig
DetourConfig *DetourConfig

View File

@@ -5,7 +5,7 @@ package inbound
import (
"encoding/json"
proto "github.com/v2ray/v2ray-core/common/protocol"
"github.com/v2ray/v2ray-core/common/protocol"
"github.com/v2ray/v2ray-core/proxy/internal/config"
)
@@ -46,16 +46,16 @@ func (this *DefaultConfig) UnmarshalJSON(data []byte) error {
if this.AlterIDs == 0 {
this.AlterIDs = 32
}
this.Level = proto.UserLevel(jsonConfig.Level)
this.Level = protocol.UserLevel(jsonConfig.Level)
return nil
}
func (this *Config) UnmarshalJSON(data []byte) error {
type JsonConfig struct {
Users []*proto.User `json:"clients"`
Features *FeaturesConfig `json:"features"`
Defaults *DefaultConfig `json:"default"`
DetourConfig *DetourConfig `json:"detour"`
Users []*protocol.User `json:"clients"`
Features *FeaturesConfig `json:"features"`
Defaults *DefaultConfig `json:"default"`
DetourConfig *DetourConfig `json:"detour"`
}
jsonConfig := new(JsonConfig)
if err := json.Unmarshal(data, jsonConfig); err != nil {
@@ -66,7 +66,7 @@ func (this *Config) UnmarshalJSON(data []byte) error {
this.Defaults = jsonConfig.Defaults
if this.Defaults == nil {
this.Defaults = &DefaultConfig{
Level: proto.UserLevel(0),
Level: protocol.UserLevel(0),
AlterIDs: 32,
}
}

View File

@@ -10,8 +10,8 @@ import (
v2io "github.com/v2ray/v2ray-core/common/io"
"github.com/v2ray/v2ray-core/common/log"
v2net "github.com/v2ray/v2ray-core/common/net"
proto "github.com/v2ray/v2ray-core/common/protocol"
raw "github.com/v2ray/v2ray-core/common/protocol/raw"
"github.com/v2ray/v2ray-core/common/protocol"
"github.com/v2ray/v2ray-core/common/protocol/raw"
"github.com/v2ray/v2ray-core/common/serial"
"github.com/v2ray/v2ray-core/common/uuid"
"github.com/v2ray/v2ray-core/proxy"
@@ -22,13 +22,13 @@ import (
type userByEmail struct {
sync.RWMutex
cache map[string]*proto.User
defaultLevel proto.UserLevel
cache map[string]*protocol.User
defaultLevel protocol.UserLevel
defaultAlterIDs uint16
}
func NewUserByEmail(users []*proto.User, config *DefaultConfig) *userByEmail {
cache := make(map[string]*proto.User)
func NewUserByEmail(users []*protocol.User, config *DefaultConfig) *userByEmail {
cache := make(map[string]*protocol.User)
for _, user := range users {
cache[user.Email] = user
}
@@ -39,8 +39,8 @@ func NewUserByEmail(users []*proto.User, config *DefaultConfig) *userByEmail {
}
}
func (this *userByEmail) Get(email string) (*proto.User, bool) {
var user *proto.User
func (this *userByEmail) Get(email string) (*protocol.User, bool) {
var user *protocol.User
var found bool
this.RLock()
user, found = this.cache[email]
@@ -49,8 +49,9 @@ func (this *userByEmail) Get(email string) (*proto.User, bool) {
this.Lock()
user, found = this.cache[email]
if !found {
id := proto.NewID(uuid.New())
user = proto.NewUser(id, this.defaultLevel, this.defaultAlterIDs, email)
id := protocol.NewID(uuid.New())
alterIDs := protocol.NewAlterIDs(id, this.defaultAlterIDs)
user = protocol.NewUser(id, alterIDs, this.defaultLevel, email)
this.cache[email] = user
}
this.Unlock()
@@ -63,7 +64,7 @@ type VMessInboundHandler struct {
sync.Mutex
packetDispatcher dispatcher.PacketDispatcher
inboundHandlerManager proxyman.InboundHandlerManager
clients proto.UserValidator
clients protocol.UserValidator
usersByEmail *userByEmail
accepting bool
listener *hub.TCPHub
@@ -85,7 +86,7 @@ func (this *VMessInboundHandler) Close() {
}
}
func (this *VMessInboundHandler) GetUser(email string) *proto.User {
func (this *VMessInboundHandler) GetUser(email string) *protocol.User {
user, existing := this.usersByEmail.Get(email)
if !existing {
this.clients.Add(user)
@@ -103,7 +104,7 @@ func (this *VMessInboundHandler) Listen(port v2net.Port) error {
}
this.listeningPort = port
tcpListener, err := hub.ListenTCP(port, this.HandleConnection)
tcpListener, err := hub.ListenTCP(port, this.HandleConnection, nil)
if err != nil {
log.Error("Unable to listen tcp port ", port, ": ", err)
return err
@@ -115,7 +116,7 @@ func (this *VMessInboundHandler) Listen(port v2net.Port) error {
return nil
}
func (this *VMessInboundHandler) HandleConnection(connection hub.Connection) {
func (this *VMessInboundHandler) HandleConnection(connection *hub.Connection) {
defer connection.Close()
connReader := v2net.NewTimeOutReader(16, connection)
@@ -139,16 +140,16 @@ func (this *VMessInboundHandler) HandleConnection(connection hub.Connection) {
ray := this.packetDispatcher.DispatchToOutbound(request.Destination())
input := ray.InboundInput()
output := ray.InboundOutput()
var readFinish, writeFinish sync.Mutex
readFinish.Lock()
writeFinish.Lock()
defer input.Close()
defer output.Release()
userSettings := proto.GetUserSettings(request.User.Level)
var readFinish sync.Mutex
readFinish.Lock()
userSettings := protocol.GetUserSettings(request.User.Level)
connReader.SetTimeOut(userSettings.PayloadReadTimeout)
reader.SetCached(false)
go func() {
defer input.Close()
defer readFinish.Unlock()
bodyReader := session.DecodeRequestBody(reader)
var requestReader v2io.Reader
if request.Option.IsChunkStream() {
@@ -158,12 +159,14 @@ func (this *VMessInboundHandler) HandleConnection(connection hub.Connection) {
}
v2io.Pipe(requestReader, input)
requestReader.Release()
input.Close()
readFinish.Unlock()
}()
writer := v2io.NewBufferedWriter(connection)
defer writer.Release()
response := &proto.ResponseHeader{
response := &protocol.ResponseHeader{
Command: this.generateCommand(request),
}
@@ -173,31 +176,24 @@ func (this *VMessInboundHandler) HandleConnection(connection hub.Connection) {
// Optimize for small response packet
if data, err := output.Read(); err == nil {
var v2writer v2io.Writer = v2io.NewAdaptiveWriter(bodyWriter)
if request.Option.IsChunkStream() {
vmessio.Authenticate(data)
v2writer = vmessio.NewAuthChunkWriter(v2writer)
}
bodyWriter.Write(data.Value)
data.Release()
v2writer.Write(data)
writer.SetCached(false)
go func(finish *sync.Mutex) {
var writer v2io.Writer = v2io.NewAdaptiveWriter(bodyWriter)
if request.Option.IsChunkStream() {
writer = vmessio.NewAuthChunkWriter(writer)
}
v2io.Pipe(output, writer)
if request.Option.IsChunkStream() {
writer.Write(alloc.NewSmallBuffer().Clear())
}
output.Release()
writer.Release()
finish.Unlock()
}(&writeFinish)
writeFinish.Lock()
v2io.Pipe(output, v2writer)
output.Release()
if request.Option.IsChunkStream() {
v2writer.Write(alloc.NewSmallBuffer().Clear())
}
v2writer.Release()
}
readFinish.Lock()
writeFinish.Lock()
}
func init() {
@@ -208,7 +204,7 @@ func init() {
}
config := rawConfig.(*Config)
allowedClients := proto.NewTimedUserValidator(proto.DefaultIDHash)
allowedClients := protocol.NewTimedUserValidator(protocol.DefaultIDHash)
for _, user := range config.AllowedUsers {
allowedClients.Add(user)
}

109
proxy/vmess/io/io_test.go Normal file
View File

@@ -0,0 +1,109 @@
package io_test
import (
"bytes"
"crypto/rand"
"io"
"testing"
"github.com/v2ray/v2ray-core/common/alloc"
v2io "github.com/v2ray/v2ray-core/common/io"
. "github.com/v2ray/v2ray-core/proxy/vmess/io"
v2testing "github.com/v2ray/v2ray-core/testing"
"github.com/v2ray/v2ray-core/testing/assert"
)
func TestAuthenticate(t *testing.T) {
v2testing.Current(t)
buffer := alloc.NewBuffer().Clear()
buffer.AppendBytes(1, 2, 3, 4)
Authenticate(buffer)
assert.Bytes(buffer.Value).Equals([]byte{0, 8, 87, 52, 168, 125, 1, 2, 3, 4})
b2, err := NewAuthChunkReader(buffer).Read()
assert.Error(err).IsNil()
assert.Bytes(b2.Value).Equals([]byte{1, 2, 3, 4})
}
func TestSingleIO(t *testing.T) {
v2testing.Current(t)
content := bytes.NewBuffer(make([]byte, 0, 1024*1024))
writer := NewAuthChunkWriter(v2io.NewAdaptiveWriter(content))
writer.Write(alloc.NewBuffer().Clear().AppendString("abcd"))
writer.Release()
reader := NewAuthChunkReader(content)
buffer, err := reader.Read()
assert.Error(err).IsNil()
assert.Bytes(buffer.Value).Equals([]byte("abcd"))
}
func TestLargeIO(t *testing.T) {
v2testing.Current(t)
content := make([]byte, 1024*1024)
rand.Read(content)
chunckContent := bytes.NewBuffer(make([]byte, 0, len(content)*2))
writer := NewAuthChunkWriter(v2io.NewAdaptiveWriter(chunckContent))
writeSize := 0
for {
chunkSize := 7 * 1024
if chunkSize+writeSize > len(content) {
chunkSize = len(content) - writeSize
}
writer.Write(alloc.NewBuffer().Clear().Append(content[writeSize : writeSize+chunkSize]))
writeSize += chunkSize
if writeSize == len(content) {
break
}
chunkSize = 8 * 1024
if chunkSize+writeSize > len(content) {
chunkSize = len(content) - writeSize
}
writer.Write(alloc.NewLargeBuffer().Clear().Append(content[writeSize : writeSize+chunkSize]))
writeSize += chunkSize
if writeSize == len(content) {
break
}
chunkSize = 63 * 1024
if chunkSize+writeSize > len(content) {
chunkSize = len(content) - writeSize
}
writer.Write(alloc.NewLargeBuffer().Clear().Append(content[writeSize : writeSize+chunkSize]))
writeSize += chunkSize
if writeSize == len(content) {
break
}
chunkSize = 64*1024 - 16
if chunkSize+writeSize > len(content) {
chunkSize = len(content) - writeSize
}
writer.Write(alloc.NewLargeBuffer().Clear().Append(content[writeSize : writeSize+chunkSize]))
writeSize += chunkSize
if writeSize == len(content) {
break
}
}
writer.Release()
actualContent := make([]byte, 0, len(content))
reader := NewAuthChunkReader(chunckContent)
for {
buffer, err := reader.Read()
if err == io.EOF {
break
}
assert.Error(err).IsNil()
actualContent = append(actualContent, buffer.Value...)
}
assert.Int(len(actualContent)).Equals(len(content))
assert.Bytes(actualContent).Equals(content)
}

View File

@@ -1,58 +1,121 @@
package io
import (
"hash"
"hash/fnv"
"io"
"github.com/v2ray/v2ray-core/common/alloc"
"github.com/v2ray/v2ray-core/common/log"
"github.com/v2ray/v2ray-core/common/serial"
"github.com/v2ray/v2ray-core/transport"
)
// @Private
func AllocBuffer(size int) *alloc.Buffer {
if size < 8*1024-16 {
return alloc.NewBuffer()
}
return alloc.NewLargeBuffer()
}
// @Private
type Validator struct {
actualAuth hash.Hash32
expectedAuth uint32
}
func NewValidator(expectedAuth uint32) *Validator {
return &Validator{
actualAuth: fnv.New32a(),
expectedAuth: expectedAuth,
}
}
func (this *Validator) Consume(b []byte) {
this.actualAuth.Write(b)
}
func (this *Validator) Validate() bool {
log.Debug("VMess Reader: Expected auth ", this.expectedAuth, " actual auth: ", this.actualAuth.Sum32())
return this.actualAuth.Sum32() == this.expectedAuth
}
type AuthChunkReader struct {
reader io.Reader
reader io.Reader
last *alloc.Buffer
chunkLength int
validator *Validator
}
func NewAuthChunkReader(reader io.Reader) *AuthChunkReader {
return &AuthChunkReader{
reader: reader,
reader: reader,
chunkLength: -1,
}
}
func (this *AuthChunkReader) Read() (*alloc.Buffer, error) {
buffer := alloc.NewBuffer()
if _, err := io.ReadFull(this.reader, buffer.Value[:2]); err != nil {
buffer.Release()
return nil, err
var buffer *alloc.Buffer
if this.last != nil {
buffer = this.last
this.last = nil
} else {
buffer = AllocBuffer(this.chunkLength).Clear()
}
length := serial.BytesLiteral(buffer.Value[:2]).Uint16Value()
if length <= 4 { // Length of authentication bytes.
if this.chunkLength == -1 {
for buffer.Len() < 6 {
_, err := buffer.FillFrom(this.reader)
if err != nil {
buffer.Release()
return nil, err
}
}
log.Debug("VMess Reader: raw buffer: ", buffer.Value)
length := serial.BytesLiteral(buffer.Value[:2]).Uint16Value()
this.chunkLength = int(length) - 4
this.validator = NewValidator(serial.BytesLiteral(buffer.Value[2:6]).Uint32Value())
buffer.SliceFrom(6)
} else if buffer.Len() < this.chunkLength {
_, err := buffer.FillFrom(this.reader)
if err != nil {
buffer.Release()
return nil, err
}
}
if this.chunkLength == 0 {
buffer.Release()
return nil, io.EOF
}
if length > 8*1024 {
buffer.Release()
buffer = alloc.NewLargeBuffer()
}
buffer.SliceBack(16)
if _, err := io.ReadFull(this.reader, buffer.Value[:length]); err != nil {
buffer.Release()
return nil, err
}
buffer.Slice(0, int(length))
fnvHash := fnv.New32a()
fnvHash.Write(buffer.Value[4:])
expAuth := serial.BytesLiteral(fnvHash.Sum(nil))
actualAuth := serial.BytesLiteral(buffer.Value[:4])
if !actualAuth.Equals(expAuth) {
buffer.Release()
return nil, transport.ErrorCorruptedPacket
if buffer.Len() < this.chunkLength {
this.validator.Consume(buffer.Value)
this.chunkLength -= buffer.Len()
} else {
this.validator.Consume(buffer.Value[:this.chunkLength])
if !this.validator.Validate() {
buffer.Release()
return nil, transport.ErrorCorruptedPacket
}
leftLength := buffer.Len() - this.chunkLength
if leftLength > 0 {
this.last = AllocBuffer(leftLength).Clear()
this.last.Append(buffer.Value[this.chunkLength:])
buffer.Slice(0, this.chunkLength)
}
this.chunkLength = -1
this.validator = nil
}
buffer.SliceFrom(4)
return buffer, nil
}
func (this *AuthChunkReader) Release() {
this.reader = nil
this.last.Release()
this.last = nil
this.validator = nil
}

View File

@@ -1,23 +0,0 @@
package io_test
import (
"testing"
"github.com/v2ray/v2ray-core/common/alloc"
. "github.com/v2ray/v2ray-core/proxy/vmess/io"
v2testing "github.com/v2ray/v2ray-core/testing"
"github.com/v2ray/v2ray-core/testing/assert"
)
func TestAuthenticate(t *testing.T) {
v2testing.Current(t)
buffer := alloc.NewBuffer().Clear()
buffer.AppendBytes(1, 2, 3, 4)
Authenticate(buffer)
assert.Bytes(buffer.Value).Equals([]byte{0, 8, 87, 52, 168, 125, 1, 2, 3, 4})
b2, err := NewAuthChunkReader(buffer).Read()
assert.Error(err).IsNil()
assert.Bytes(b2.Value).Equals([]byte{1, 2, 3, 4})
}

View File

@@ -3,11 +3,12 @@ package outbound
import (
v2net "github.com/v2ray/v2ray-core/common/net"
"github.com/v2ray/v2ray-core/common/protocol"
proto "github.com/v2ray/v2ray-core/common/protocol"
)
func (this *VMessOutboundHandler) handleSwitchAccount(cmd *protocol.CommandSwitchAccount) {
user := proto.NewUser(proto.NewID(cmd.ID), cmd.Level, cmd.AlterIds.Value(), "")
primary := protocol.NewID(cmd.ID)
alters := protocol.NewAlterIDs(primary, cmd.AlterIds.Value())
user := protocol.NewUser(primary, alters, cmd.Level, "")
dest := v2net.TCPDestination(cmd.Host, cmd.Port)
this.receiverManager.AddDetour(NewReceiver(dest, user), cmd.ValidMin)
}

View File

@@ -9,8 +9,8 @@ import (
v2io "github.com/v2ray/v2ray-core/common/io"
"github.com/v2ray/v2ray-core/common/log"
v2net "github.com/v2ray/v2ray-core/common/net"
proto "github.com/v2ray/v2ray-core/common/protocol"
raw "github.com/v2ray/v2ray-core/common/protocol/raw"
"github.com/v2ray/v2ray-core/common/protocol"
"github.com/v2ray/v2ray-core/common/protocol/raw"
"github.com/v2ray/v2ray-core/proxy"
"github.com/v2ray/v2ray-core/proxy/internal"
vmessio "github.com/v2ray/v2ray-core/proxy/vmess/io"
@@ -23,27 +23,27 @@ type VMessOutboundHandler struct {
}
func (this *VMessOutboundHandler) Dispatch(target v2net.Destination, payload *alloc.Buffer, ray ray.OutboundRay) error {
defer ray.OutboundInput().Release()
defer ray.OutboundOutput().Close()
destination, vNextUser := this.receiverManager.PickReceiver()
command := proto.RequestCommandTCP
command := protocol.RequestCommandTCP
if target.IsUDP() {
command = proto.RequestCommandUDP
command = protocol.RequestCommandUDP
}
request := &proto.RequestHeader{
request := &protocol.RequestHeader{
Version: raw.Version,
User: vNextUser,
Command: command,
Address: target.Address(),
Port: target.Port(),
Option: proto.RequestOptionChunkStream,
Option: protocol.RequestOptionChunkStream,
}
conn, err := dialer.Dial(destination)
if err != nil {
log.Error("Failed to open ", destination, ": ", err)
if ray != nil {
ray.OutboundOutput().Close()
}
return err
}
log.Info("VMessOut: Tunneling request to ", request.Address, " via ", destination)
@@ -57,39 +57,31 @@ func (this *VMessOutboundHandler) Dispatch(target v2net.Destination, payload *al
requestFinish.Lock()
responseFinish.Lock()
session := raw.NewClientSession(proto.DefaultIDHash)
session := raw.NewClientSession(protocol.DefaultIDHash)
go this.handleRequest(session, conn, request, payload, input, &requestFinish)
go this.handleResponse(session, conn, request, destination, output, &responseFinish)
requestFinish.Lock()
responseFinish.Lock()
output.Close()
input.Release()
return nil
}
func (this *VMessOutboundHandler) handleRequest(session *raw.ClientSession, conn net.Conn, request *proto.RequestHeader, payload *alloc.Buffer, input v2io.Reader, finish *sync.Mutex) {
func (this *VMessOutboundHandler) handleRequest(session *raw.ClientSession, conn net.Conn, request *protocol.RequestHeader, payload *alloc.Buffer, input v2io.Reader, finish *sync.Mutex) {
defer finish.Unlock()
defer payload.Release()
writer := v2io.NewBufferedWriter(conn)
defer writer.Release()
session.EncodeRequestHeader(request, writer)
if request.Option.IsChunkStream() {
vmessio.Authenticate(payload)
}
bodyWriter := session.EncodeRequestBody(writer)
bodyWriter.Write(payload.Value)
writer.SetCached(false)
var streamWriter v2io.Writer = v2io.NewAdaptiveWriter(bodyWriter)
if request.Option.IsChunkStream() {
streamWriter = vmessio.NewAuthChunkWriter(streamWriter)
}
streamWriter.Write(payload)
writer.SetCached(false)
v2io.Pipe(input, streamWriter)
if request.Option.IsChunkStream() {
streamWriter.Write(alloc.NewSmallBuffer().Clear())
@@ -98,7 +90,7 @@ func (this *VMessOutboundHandler) handleRequest(session *raw.ClientSession, conn
return
}
func (this *VMessOutboundHandler) handleResponse(session *raw.ClientSession, conn net.Conn, request *proto.RequestHeader, dest v2net.Destination, output v2io.Writer, finish *sync.Mutex) {
func (this *VMessOutboundHandler) handleResponse(session *raw.ClientSession, conn net.Conn, request *protocol.RequestHeader, dest v2net.Destination, output v2io.Writer, finish *sync.Mutex) {
defer finish.Unlock()
reader := v2io.NewBufferedReader(conn)
@@ -112,7 +104,7 @@ func (this *VMessOutboundHandler) handleResponse(session *raw.ClientSession, con
go this.handleCommand(dest, header.Command)
reader.SetCached(false)
decryptReader := session.DecodeResponseBody(conn)
decryptReader := session.DecodeResponseBody(reader)
var bodyReader v2io.Reader
if request.Option.IsChunkStream() {

View File

@@ -6,23 +6,23 @@ import (
"github.com/v2ray/v2ray-core/common/dice"
v2net "github.com/v2ray/v2ray-core/common/net"
proto "github.com/v2ray/v2ray-core/common/protocol"
"github.com/v2ray/v2ray-core/common/protocol"
)
type Receiver struct {
sync.RWMutex
Destination v2net.Destination
Accounts []*proto.User
Accounts []*protocol.User
}
func NewReceiver(dest v2net.Destination, users ...*proto.User) *Receiver {
func NewReceiver(dest v2net.Destination, users ...*protocol.User) *Receiver {
return &Receiver{
Destination: dest,
Accounts: users,
}
}
func (this *Receiver) HasUser(user *proto.User) bool {
func (this *Receiver) HasUser(user *protocol.User) bool {
this.RLock()
defer this.RUnlock()
for _, u := range this.Accounts {
@@ -34,7 +34,7 @@ func (this *Receiver) HasUser(user *proto.User) bool {
return false
}
func (this *Receiver) AddUser(user *proto.User) {
func (this *Receiver) AddUser(user *protocol.User) {
if this.HasUser(user) {
return
}
@@ -43,7 +43,7 @@ func (this *Receiver) AddUser(user *proto.User) {
this.Unlock()
}
func (this *Receiver) PickUser() *proto.User {
func (this *Receiver) PickUser() *protocol.User {
return this.Accounts[dice.Roll(len(this.Accounts))]
}
@@ -125,7 +125,7 @@ func (this *ReceiverManager) pickStdReceiver() *Receiver {
return this.receivers[dice.Roll(len(this.receivers))]
}
func (this *ReceiverManager) PickReceiver() (v2net.Destination, *proto.User) {
func (this *ReceiverManager) PickReceiver() (v2net.Destination, *protocol.User) {
rec := this.pickDetour()
if rec == nil {
rec = this.pickStdReceiver()

View File

@@ -7,7 +7,7 @@ import (
"github.com/v2ray/v2ray-core/common/log"
v2net "github.com/v2ray/v2ray-core/common/net"
proto "github.com/v2ray/v2ray-core/common/protocol"
"github.com/v2ray/v2ray-core/common/protocol"
"github.com/v2ray/v2ray-core/proxy/internal"
)
@@ -15,7 +15,7 @@ func (this *Receiver) UnmarshalJSON(data []byte) error {
type RawConfigTarget struct {
Address *v2net.AddressJson `json:"address"`
Port v2net.Port `json:"port"`
Users []*proto.User `json:"users"`
Users []*protocol.User `json:"users"`
}
var rawConfig RawConfigTarget
if err := json.Unmarshal(data, &rawConfig); err != nil {

View File

@@ -4,7 +4,7 @@ import (
"testing"
v2net "github.com/v2ray/v2ray-core/common/net"
proto "github.com/v2ray/v2ray-core/common/protocol"
"github.com/v2ray/v2ray-core/common/protocol"
"github.com/v2ray/v2ray-core/common/uuid"
. "github.com/v2ray/v2ray-core/proxy/vmess/outbound"
v2testing "github.com/v2ray/v2ray-core/testing"
@@ -14,14 +14,16 @@ import (
func TestReceiverUser(t *testing.T) {
v2testing.Current(t)
id := proto.NewID(uuid.New())
user := proto.NewUser(id, proto.UserLevel(0), 100, "")
id := protocol.NewID(uuid.New())
alters := protocol.NewAlterIDs(id, 100)
user := protocol.NewUser(id, alters, protocol.UserLevel(0), "")
rec := NewReceiver(v2net.TCPDestination(v2net.DomainAddress("v2ray.com"), 80), user)
assert.Bool(rec.HasUser(user)).IsTrue()
assert.Int(len(rec.Accounts)).Equals(1)
id2 := proto.NewID(uuid.New())
user2 := proto.NewUser(id2, proto.UserLevel(0), 100, "")
id2 := protocol.NewID(uuid.New())
alters2 := protocol.NewAlterIDs(id2, 100)
user2 := protocol.NewUser(id2, alters2, protocol.UserLevel(0), "")
assert.Bool(rec.HasUser(user2)).IsFalse()
rec.AddUser(user2)

View File

@@ -8,7 +8,7 @@ import (
"github.com/v2ray/v2ray-core/app/dispatcher"
v2net "github.com/v2ray/v2ray-core/common/net"
v2nettesting "github.com/v2ray/v2ray-core/common/net/testing"
proto "github.com/v2ray/v2ray-core/common/protocol"
"github.com/v2ray/v2ray-core/common/protocol"
"github.com/v2ray/v2ray-core/common/uuid"
"github.com/v2ray/v2ray-core/proxy"
proxytesting "github.com/v2ray/v2ray-core/proxy/testing"
@@ -26,7 +26,7 @@ func TestVMessInAndOut(t *testing.T) {
id, err := uuid.ParseString("ad937d9d-6e23-4a5a-ba23-bce5092a7c51")
assert.Error(err).IsNil()
testAccount := proto.NewID(id)
testAccount := protocol.NewID(id)
portA := v2nettesting.PickPort()
portB := v2nettesting.PickPort()

View File

@@ -69,7 +69,7 @@ VDIS="64"
if [[ "$ARCH" == "i686" ]] || [[ "$ARCH" == "i386" ]]; then
VDIS="32"
elif [[ "$ARCH" == *"armv7"* ]]; then
elif [[ "$ARCH" == *"armv7"* ]] || [[ "$ARCH" == "armv6l" ]]; then
VDIS="arm"
elif [[ "$ARCH" == *"armv8"* ]]; then
VDIS="arm64"

93
release/mobile/main.go Normal file
View File

@@ -0,0 +1,93 @@
package main
import (
"flag"
"fmt"
"os"
"path/filepath"
"github.com/v2ray/v2ray-core"
_ "github.com/v2ray/v2ray-core/app/router/rules"
"github.com/v2ray/v2ray-core/common/log"
"github.com/v2ray/v2ray-core/shell/point"
// The following are necessary as they register handlers in their init functions.
_ "github.com/v2ray/v2ray-core/proxy/blackhole"
_ "github.com/v2ray/v2ray-core/proxy/dokodemo"
_ "github.com/v2ray/v2ray-core/proxy/freedom"
_ "github.com/v2ray/v2ray-core/proxy/http"
_ "github.com/v2ray/v2ray-core/proxy/shadowsocks"
_ "github.com/v2ray/v2ray-core/proxy/socks"
_ "github.com/v2ray/v2ray-core/proxy/vmess/inbound"
_ "github.com/v2ray/v2ray-core/proxy/vmess/outbound"
)
var (
configFile string
logLevel = flag.String("loglevel", "warning", "Level of log info to be printed to console, available value: debug, info, warning, error")
version = flag.Bool("version", false, "Show current version of V2Ray.")
test = flag.Bool("test", false, "Test config file only, without launching V2Ray server.")
)
func init() {
defaultConfigFile := ""
workingDir, err := filepath.Abs(filepath.Dir(os.Args[0]))
if err == nil {
defaultConfigFile = filepath.Join(workingDir, "config.json")
}
flag.StringVar(&configFile, "config", defaultConfigFile, "Config file for this Point server.")
}
func main() {
flag.Parse()
core.PrintVersion()
if *version {
return
}
switch *logLevel {
case "debug":
log.SetLogLevel(log.DebugLevel)
case "info":
log.SetLogLevel(log.InfoLevel)
case "warning":
log.SetLogLevel(log.WarningLevel)
case "error":
log.SetLogLevel(log.ErrorLevel)
default:
fmt.Println("Unknown log level: " + *logLevel)
return
}
if len(configFile) == 0 {
log.Error("Config file is not set.")
return
}
config, err := point.LoadConfig(configFile)
if err != nil {
log.Error("Failed to read config file (", configFile, "): ", configFile, err)
return
}
vPoint, err := point.NewPoint(config)
if err != nil {
log.Error("Failed to create Point server: ", err)
return
}
if *test {
fmt.Println("Configuration OK.")
return
}
err = vPoint.Start()
if err != nil {
log.Error("Error starting Point server: ", err)
return
}
finish := make(chan bool)
<-finish
}

View File

@@ -4,6 +4,7 @@ import (
"flag"
"fmt"
"os"
"os/signal"
"path/filepath"
"github.com/v2ray/v2ray-core"
@@ -92,6 +93,9 @@ func main() {
return
}
finish := make(chan bool)
<-finish
osSignals := make(chan os.Signal, 1)
signal.Notify(osSignals, os.Interrupt, os.Kill)
<-osSignals
vPoint.Close()
}

View File

@@ -1,6 +1,7 @@
package point
import (
"github.com/v2ray/v2ray-core/app/dns"
"github.com/v2ray/v2ray-core/app/router"
"github.com/v2ray/v2ray-core/common/log"
v2net "github.com/v2ray/v2ray-core/common/net"
@@ -47,6 +48,7 @@ type Config struct {
Port v2net.Port
LogConfig *LogConfig
RouterConfig *router.Config
DNSConfig *dns.Config
InboundConfig *ConnectionConfig
OutboundConfig *ConnectionConfig
InboundDetours []*InboundDetourConfig

View File

@@ -8,6 +8,7 @@ import (
"os"
"strings"
"github.com/v2ray/v2ray-core/app/dns"
"github.com/v2ray/v2ray-core/app/router"
"github.com/v2ray/v2ray-core/common/log"
v2net "github.com/v2ray/v2ray-core/common/net"
@@ -22,6 +23,7 @@ func (this *Config) UnmarshalJSON(data []byte) error {
Port v2net.Port `json:"port"` // Port of this Point server.
LogConfig *LogConfig `json:"log"`
RouterConfig *router.Config `json:"routing"`
DNSConfig *dns.Config `json:"dns"`
InboundConfig *ConnectionConfig `json:"inbound"`
OutboundConfig *ConnectionConfig `json:"outbound"`
InboundDetours []*InboundDetourConfig `json:"inboundDetour"`
@@ -38,6 +40,14 @@ func (this *Config) UnmarshalJSON(data []byte) error {
this.OutboundConfig = jsonConfig.OutboundConfig
this.InboundDetours = jsonConfig.InboundDetours
this.OutboundDetours = jsonConfig.OutboundDetours
if jsonConfig.DNSConfig == nil {
jsonConfig.DNSConfig = &dns.Config{
NameServers: []v2net.Destination{
v2net.UDPDestination(v2net.DomainAddress("localhost"), v2net.Port(53)),
},
}
}
this.DNSConfig = jsonConfig.DNSConfig
return nil
}

View File

@@ -7,6 +7,7 @@ package point
import (
"github.com/v2ray/v2ray-core/app"
"github.com/v2ray/v2ray-core/app/dispatcher"
"github.com/v2ray/v2ray-core/app/dns"
"github.com/v2ray/v2ray-core/app/proxyman"
"github.com/v2ray/v2ray-core/app/router"
"github.com/v2ray/v2ray-core/common/log"
@@ -120,9 +121,15 @@ func NewPoint(pConfig *Config) (*Point, error) {
}
}
dnsConfig := pConfig.DNSConfig
if dnsConfig != nil {
dnsServer := dns.NewCacheServer(vpoint.space.ForContext("system.dns"), dnsConfig)
vpoint.space.Bind(dns.APP_ID, dnsServer)
}
routerConfig := pConfig.RouterConfig
if routerConfig != nil {
r, err := router.CreateRouter(routerConfig.Strategy, routerConfig.Settings)
r, err := router.CreateRouter(routerConfig.Strategy, routerConfig.Settings, vpoint.space.ForContext("system.router"))
if err != nil {
log.Error("Failed to create router: ", err)
return nil, ErrorBadConfiguration
@@ -196,7 +203,7 @@ func (this *Point) FilterPacketAndDispatch(destination v2net.Destination, link r
payload, err := link.OutboundInput().Read()
if err != nil {
log.Info("Point: No payload to dispatch, stopping dispatching now.")
link.OutboundOutput().Close()
link.OutboundOutput().Release()
link.OutboundInput().Release()
return
}

View File

@@ -1,6 +1,8 @@
package scenarios
import (
"bytes"
"io"
"net"
"testing"
@@ -42,10 +44,10 @@ func TestDynamicVMess(t *testing.T) {
conn.CloseWrite()
response := make([]byte, 1024)
nBytes, err = conn.Read(response)
response := bytes.NewBuffer(nil)
_, err = io.Copy(response, conn)
assert.Error(err).IsNil()
assert.StringLiteral("Processed: " + payload).Equals(string(response[:nBytes]))
assert.StringLiteral("Processed: " + payload).Equals(string(response.Bytes()))
conn.Close()
}

View File

@@ -192,7 +192,7 @@ func TestUDPAssociate(t *testing.T) {
for i := 0; i < 100; i++ {
udpPayload := "UDP request to udp server."
udpRequest := socks5UDPRequest(v2net.UDPDestination(v2net.IPAddress([]byte{127, 0, 0, 1}), targetPort), []byte(udpPayload))
udpRequest := socks5UDPRequest(v2net.UDPDestination(v2net.LocalHostIP, targetPort), []byte(udpPayload))
nBytes, err = udpConn.Write(udpRequest)
assert.Int(nBytes).Equals(len(udpRequest))
@@ -202,7 +202,7 @@ func TestUDPAssociate(t *testing.T) {
nBytes, err = udpConn.Read(udpResponse)
assert.Error(err).IsNil()
assert.Bytes(udpResponse[:nBytes]).Equals(
socks5UDPRequest(v2net.UDPDestination(v2net.IPAddress([]byte{127, 0, 0, 1}), targetPort), []byte("Processed: UDP request to udp server.")))
socks5UDPRequest(v2net.UDPDestination(v2net.LocalHostIP, targetPort), []byte("Processed: UDP request to udp server.")))
}
udpConn.Close()

View File

@@ -28,7 +28,7 @@ func (server *Server) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
func (server *Server) Start() (v2net.Destination, error) {
go http.ListenAndServe(":"+server.Port.String(), server)
return v2net.TCPDestination(v2net.IPAddress([]byte{127, 0, 0, 1}), v2net.Port(server.Port)), nil
return v2net.TCPDestination(v2net.LocalHostIP, v2net.Port(server.Port)), nil
}
func (this *Server) Close() {

View File

@@ -32,7 +32,6 @@ func (server *Server) Start() (v2net.Destination, error) {
func (server *Server) acceptConnections(listener *net.TCPListener) {
server.accepting = true
defer listener.Close()
for server.accepting {
conn, err := listener.Accept()
if err != nil {

View File

@@ -31,8 +31,7 @@ func (server *Server) Start() (v2net.Destination, error) {
func (server *Server) handleConnection(conn *net.UDPConn) {
server.accepting = true
defer conn.Close()
for {
for server.accepting {
buffer := make([]byte, 2*1024)
nBytes, addr, err := conn.ReadFromUDP(buffer)
if err != nil {

View File

@@ -1,36 +1,18 @@
package hub
import (
"crypto/tls"
"net"
"time"
"github.com/v2ray/v2ray-core/common"
)
type ConnectionHandler func(Connection)
type ConnectionHandler func(*Connection)
type Connection interface {
common.Releasable
Read([]byte) (int, error)
Write([]byte) (int, error)
Close() error
LocalAddr() net.Addr
RemoteAddr() net.Addr
SetDeadline(t time.Time) error
SetReadDeadline(t time.Time) error
SetWriteDeadline(t time.Time) error
CloseRead() error
CloseWrite() error
}
type TCPConnection struct {
conn *net.TCPConn
type Connection struct {
conn net.Conn
listener *TCPHub
}
func (this *TCPConnection) Read(b []byte) (int, error) {
func (this *Connection) Read(b []byte) (int, error) {
if this == nil || this.conn == nil {
return 0, ErrorClosedConnection
}
@@ -38,14 +20,14 @@ func (this *TCPConnection) Read(b []byte) (int, error) {
return this.conn.Read(b)
}
func (this *TCPConnection) Write(b []byte) (int, error) {
func (this *Connection) Write(b []byte) (int, error) {
if this == nil || this.conn == nil {
return 0, ErrorClosedConnection
}
return this.conn.Write(b)
}
func (this *TCPConnection) Close() error {
func (this *Connection) Close() error {
if this == nil || this.conn == nil {
return ErrorClosedConnection
}
@@ -53,7 +35,7 @@ func (this *TCPConnection) Close() error {
return err
}
func (this *TCPConnection) Release() {
func (this *Connection) Release() {
if this == nil || this.listener == nil {
return
}
@@ -63,101 +45,22 @@ func (this *TCPConnection) Release() {
this.listener = nil
}
func (this *TCPConnection) LocalAddr() net.Addr {
func (this *Connection) LocalAddr() net.Addr {
return this.conn.LocalAddr()
}
func (this *TCPConnection) RemoteAddr() net.Addr {
func (this *Connection) RemoteAddr() net.Addr {
return this.conn.RemoteAddr()
}
func (this *TCPConnection) SetDeadline(t time.Time) error {
func (this *Connection) SetDeadline(t time.Time) error {
return this.conn.SetDeadline(t)
}
func (this *TCPConnection) SetReadDeadline(t time.Time) error {
func (this *Connection) SetReadDeadline(t time.Time) error {
return this.conn.SetReadDeadline(t)
}
func (this *TCPConnection) SetWriteDeadline(t time.Time) error {
func (this *Connection) SetWriteDeadline(t time.Time) error {
return this.conn.SetWriteDeadline(t)
}
func (this *TCPConnection) CloseRead() error {
if this == nil || this.conn == nil {
return nil
}
return this.conn.CloseRead()
}
func (this *TCPConnection) CloseWrite() error {
if this == nil || this.conn == nil {
return nil
}
return this.conn.CloseWrite()
}
type TLSConnection struct {
conn *tls.Conn
listener *TCPHub
}
func (this *TLSConnection) Read(b []byte) (int, error) {
if this == nil || this.conn == nil {
return 0, ErrorClosedConnection
}
return this.conn.Read(b)
}
func (this *TLSConnection) Write(b []byte) (int, error) {
if this == nil || this.conn == nil {
return this.conn.Write(b)
}
return this.conn.Write(b)
}
func (this *TLSConnection) Close() error {
if this == nil || this.conn == nil {
return ErrorClosedConnection
}
err := this.conn.Close()
return err
}
func (this *TLSConnection) Release() {
if this == nil || this.listener == nil {
return
}
this.Close()
this.conn = nil
this.listener = nil
}
func (this *TLSConnection) LocalAddr() net.Addr {
return this.conn.LocalAddr()
}
func (this *TLSConnection) RemoteAddr() net.Addr {
return this.conn.RemoteAddr()
}
func (this *TLSConnection) SetDeadline(t time.Time) error {
return this.conn.SetDeadline(t)
}
func (this *TLSConnection) SetReadDeadline(t time.Time) error {
return this.conn.SetReadDeadline(t)
}
func (this *TLSConnection) SetWriteDeadline(t time.Time) error {
return this.conn.SetWriteDeadline(t)
}
func (this *TLSConnection) CloseRead() error {
return nil
}
func (this *TLSConnection) CloseWrite() error {
return nil
}

View File

@@ -1,6 +1,7 @@
package hub
import (
"crypto/tls"
"errors"
"net"
@@ -13,12 +14,12 @@ var (
)
type TCPHub struct {
listener *net.TCPListener
listener net.Listener
connCallback ConnectionHandler
accepting bool
}
func ListenTCP(port v2net.Port, callback ConnectionHandler) (*TCPHub, error) {
func ListenTCP(port v2net.Port, callback ConnectionHandler, tlsConfig *tls.Config) (*TCPHub, error) {
listener, err := net.ListenTCP("tcp", &net.TCPAddr{
IP: []byte{0, 0, 0, 0},
Port: int(port),
@@ -27,12 +28,22 @@ func ListenTCP(port v2net.Port, callback ConnectionHandler) (*TCPHub, error) {
if err != nil {
return nil, err
}
tcpListener := &TCPHub{
listener: listener,
connCallback: callback,
var hub *TCPHub
if tlsConfig != nil {
tlsListener := tls.NewListener(listener, tlsConfig)
hub = &TCPHub{
listener: tlsListener,
connCallback: callback,
}
} else {
hub = &TCPHub{
listener: listener,
connCallback: callback,
}
}
go tcpListener.start()
return tcpListener, nil
go hub.start()
return hub, nil
}
func (this *TCPHub) Close() {
@@ -44,14 +55,14 @@ func (this *TCPHub) Close() {
func (this *TCPHub) start() {
this.accepting = true
for this.accepting {
conn, err := this.listener.AcceptTCP()
conn, err := this.listener.Accept()
if err != nil {
if this.accepting {
log.Warning("Listener: Failed to accept new TCP connection: ", err)
}
continue
}
go this.connCallback(&TCPConnection{
go this.connCallback(&Connection{
conn: conn,
listener: this,
})

View File

@@ -2,70 +2,165 @@ package hub
import (
"sync"
"time"
"github.com/v2ray/v2ray-core/app/dispatcher"
"github.com/v2ray/v2ray-core/common/alloc"
"github.com/v2ray/v2ray-core/common/log"
v2net "github.com/v2ray/v2ray-core/common/net"
"github.com/v2ray/v2ray-core/transport/ray"
)
type UDPResponseCallback func(destination v2net.Destination, payload *alloc.Buffer)
type connEntry struct {
type TimedInboundRay struct {
name string
inboundRay ray.InboundRay
callback UDPResponseCallback
accessed chan bool
server *UDPServer
sync.RWMutex
}
func NewTimedInboundRay(name string, inboundRay ray.InboundRay, server *UDPServer) *TimedInboundRay {
r := &TimedInboundRay{
name: name,
inboundRay: inboundRay,
accessed: make(chan bool, 1),
server: server,
}
go r.Monitor()
return r
}
func (this *TimedInboundRay) Monitor() {
for {
time.Sleep(time.Second * 16)
select {
case <-this.accessed:
default:
// Ray not accessed for a while, assuming communication is dead.
this.RLock()
if this.server == nil {
this.RUnlock()
return
}
this.server.RemoveRay(this.name)
this.RUnlock()
this.Release()
return
}
}
}
func (this *TimedInboundRay) InboundInput() ray.OutputStream {
this.RLock()
defer this.RUnlock()
if this.inboundRay == nil {
return nil
}
select {
case this.accessed <- true:
default:
}
return this.inboundRay.InboundInput()
}
func (this *TimedInboundRay) InboundOutput() ray.InputStream {
this.RLock()
defer this.RUnlock()
if this.inboundRay == nil {
return nil
}
select {
case this.accessed <- true:
default:
}
return this.inboundRay.InboundOutput()
}
func (this *TimedInboundRay) Release() {
log.Debug("UDP Server: Releasing TimedInboundRay: ", this.name)
this.Lock()
defer this.Unlock()
if this.server == nil {
return
}
this.server = nil
this.inboundRay.InboundInput().Close()
this.inboundRay.InboundOutput().Release()
this.inboundRay = nil
}
type UDPServer struct {
sync.RWMutex
conns map[string]*connEntry
conns map[string]*TimedInboundRay
packetDispatcher dispatcher.PacketDispatcher
}
func NewUDPServer(packetDispatcher dispatcher.PacketDispatcher) *UDPServer {
return &UDPServer{
conns: make(map[string]*connEntry),
conns: make(map[string]*TimedInboundRay),
packetDispatcher: packetDispatcher,
}
}
func (this *UDPServer) locateExistingAndDispatch(dest string, payload *alloc.Buffer) bool {
func (this *UDPServer) RemoveRay(name string) {
this.Lock()
defer this.Unlock()
delete(this.conns, name)
}
func (this *UDPServer) locateExistingAndDispatch(name string, payload *alloc.Buffer) bool {
log.Debug("UDP Server: Locating existing connection for ", name)
this.RLock()
defer this.RUnlock()
if entry, found := this.conns[dest]; found {
entry.inboundRay.InboundInput().Write(payload)
if entry, found := this.conns[name]; found {
outputStream := entry.InboundInput()
if outputStream == nil {
return false
}
err := outputStream.Write(payload)
if err != nil {
go entry.Release()
return false
}
return true
}
return false
}
func (this *UDPServer) Dispatch(source v2net.Destination, destination v2net.Destination, payload *alloc.Buffer, callback UDPResponseCallback) {
destString := source.String() + "-" + destination.NetAddr()
destString := source.NetAddr() + "-" + destination.NetAddr()
log.Debug("UDP Server: Dispatch request: ", destString)
if this.locateExistingAndDispatch(destString, payload) {
return
}
this.Lock()
log.Info("UDP Server: establishing new connection for ", destString)
inboundRay := this.packetDispatcher.DispatchToOutbound(destination)
inboundRay.InboundInput().Write(payload)
this.conns[destString] = &connEntry{
inboundRay: inboundRay,
callback: callback,
timedInboundRay := NewTimedInboundRay(destString, inboundRay, this)
outputStream := timedInboundRay.InboundInput()
if outputStream != nil {
outputStream.Write(payload)
}
this.Lock()
this.conns[destString] = timedInboundRay
this.Unlock()
go this.handleConnection(destString, inboundRay, source, callback)
go this.handleConnection(timedInboundRay, source, callback)
}
func (this *UDPServer) handleConnection(destString string, inboundRay ray.InboundRay, source v2net.Destination, callback UDPResponseCallback) {
func (this *UDPServer) handleConnection(inboundRay *TimedInboundRay, source v2net.Destination, callback UDPResponseCallback) {
for {
inputStream := inboundRay.InboundOutput()
if inputStream == nil {
break
}
data, err := inboundRay.InboundOutput().Read()
if err != nil {
break
}
callback(source, data)
}
this.Lock()
delete(this.conns, destString)
this.Unlock()
inboundRay.Release()
}

View File

@@ -1,8 +1,10 @@
package ray
import (
"errors"
"io"
"sync"
"time"
"github.com/v2ray/v2ray-core/common/alloc"
)
@@ -11,6 +13,10 @@ const (
bufferSize = 128
)
var (
ErrorIOTimeout = errors.New("IO Timeout")
)
// NewRay creates a new Ray for direct traffic transport.
func NewRay() Ray {
return &directRay{
@@ -74,13 +80,26 @@ func (this *Stream) Write(data *alloc.Buffer) error {
if this.closed {
return io.EOF
}
for {
err := this.TryWriteOnce(data)
if err != ErrorIOTimeout {
return err
}
}
}
func (this *Stream) TryWriteOnce(data *alloc.Buffer) error {
this.access.RLock()
defer this.access.RUnlock()
if this.closed {
return io.EOF
}
this.buffer <- data
return nil
select {
case this.buffer <- data:
return nil
case <-time.After(2 * time.Second):
return ErrorIOTimeout
}
}
func (this *Stream) Close() {