close timer faster

pull/298/merge
Darien Raymond 2017-05-09 00:01:15 +02:00
parent aea71c2aa8
commit 5829b45bbe
No known key found for this signature in database
GPG Key ID: 7251FFA14BB18169
12 changed files with 96 additions and 90 deletions

View File

@ -141,11 +141,14 @@ func (h *DynamicInboundHandler) refresh() error {
}
func (h *DynamicInboundHandler) monitor() {
timer := time.NewTicker(time.Minute * time.Duration(h.receiverConfig.AllocationStrategy.GetRefreshValue()))
defer timer.Stop()
for {
select {
case <-h.ctx.Done():
return
case <-time.After(time.Minute * time.Duration(h.receiverConfig.AllocationStrategy.GetRefreshValue())):
case <-timer.C:
h.refresh()
}
}

View File

@ -278,11 +278,14 @@ func (w *udpWorker) Close() {
}
func (w *udpWorker) monitor() {
timer := time.NewTicker(time.Second * 16)
defer timer.Stop()
for {
select {
case <-w.ctx.Done():
return
case <-time.After(time.Second * 16):
case <-timer.C:
nowSec := time.Now().Unix()
w.Lock()
for addr, conn := range w.activeConn {

View File

@ -115,6 +115,9 @@ func (m *Client) Closed() bool {
func (m *Client) monitor() {
defer m.manager.onClientFinish()
timer := time.NewTicker(time.Second * 16)
defer timer.Stop()
for {
select {
case <-m.ctx.Done():
@ -122,7 +125,7 @@ func (m *Client) monitor() {
m.inboundRay.InboundInput().Close()
m.inboundRay.InboundOutput().CloseError()
return
case <-time.After(time.Second * 16):
case <-timer.C:
size := m.sessionManager.Size()
if size == 0 && m.sessionManager.CloseIfNoSession() {
m.cancel()

View File

@ -35,7 +35,7 @@ func (r *retryer) On(method func() error) error {
accumulatedError = append(accumulatedError, err)
}
delay := r.nextDelay()
<-time.After(time.Duration(delay) * time.Millisecond)
time.Sleep(time.Duration(delay) * time.Millisecond)
attempt++
}
return newError(accumulatedError).Base(ErrRetryFailed)

View File

@ -100,7 +100,7 @@ func (*Server) handleUDP() error {
// The TCP connection closes after v method returns. We need to wait until
// the client closes it.
// TODO: get notified from UDP part
<-time.After(5 * time.Minute)
time.Sleep(5 * time.Minute)
return nil
}

View File

@ -34,7 +34,7 @@ func TestBuildAndRun(t *testing.T) {
cmd.Stderr = errBuffer
cmd.Start()
<-time.After(1 * time.Second)
time.Sleep(1 * time.Second)
cmd.Process.Kill()
outStr := string(outBuffer.Bytes())

View File

@ -18,30 +18,26 @@ import (
func TestDialAndListen(t *testing.T) {
assert := assert.On(t)
conns := make(chan internet.Connection, 16)
listerner, err := NewListener(internet.ContextWithTransportSettings(context.Background(), &Config{}), v2net.LocalHostIP, v2net.Port(0), conns)
listerner, err := NewListener(internet.ContextWithTransportSettings(context.Background(), &Config{}), v2net.LocalHostIP, v2net.Port(0), func(ctx context.Context, conn internet.Connection) bool {
go func(c internet.Connection) {
payload := make([]byte, 4096)
for {
nBytes, err := c.Read(payload)
if err != nil {
break
}
for idx, b := range payload[:nBytes] {
payload[idx] = b ^ 'c'
}
c.Write(payload[:nBytes])
}
c.Close()
}(conn)
return true
})
assert.Error(err).IsNil()
port := v2net.Port(listerner.Addr().(*net.UDPAddr).Port)
go func() {
for conn := range conns {
go func(c internet.Connection) {
payload := make([]byte, 4096)
for {
nBytes, err := c.Read(payload)
if err != nil {
break
}
for idx, b := range payload[:nBytes] {
payload[idx] = b ^ 'c'
}
c.Write(payload[:nBytes])
}
c.Close()
}(conn)
}
}()
ctx := internet.ContextWithTransportSettings(context.Background(), &Config{})
wg := new(sync.WaitGroup)
for i := 0; i < 10; i++ {
@ -76,5 +72,4 @@ func TestDialAndListen(t *testing.T) {
assert.Int(listerner.ActiveConnections()).Equals(0)
listerner.Close()
close(conns)
}

View File

@ -81,10 +81,10 @@ type Listener struct {
reader PacketReader
header internet.PacketHeader
security cipher.AEAD
conns chan<- internet.Connection
addConn internet.AddConnection
}
func NewListener(ctx context.Context, address v2net.Address, port v2net.Port, conns chan<- internet.Connection) (*Listener, error) {
func NewListener(ctx context.Context, address v2net.Address, port v2net.Port, addConn internet.AddConnection) (*Listener, error) {
networkSettings := internet.TransportSettingsFromContext(ctx)
kcpSettings := networkSettings.(*Config)
@ -106,7 +106,7 @@ func NewListener(ctx context.Context, address v2net.Address, port v2net.Port, co
sessions: make(map[ConnectionID]*Connection),
ctx: ctx,
config: kcpSettings,
conns: conns,
addConn: addConn,
}
securitySettings := internet.SecuritySettingsFromContext(ctx)
if securitySettings != nil {
@ -190,10 +190,7 @@ func (v *Listener) OnReceive(payload *buf.Buffer, src v2net.Destination, origina
netConn = tlsConn
}
select {
case v.conns <- netConn:
case <-time.After(time.Second * 5):
conn.Close()
if !v.addConn(context.Background(), netConn) {
return
}
v.sessions[id] = conn
@ -254,8 +251,8 @@ func (v *Writer) Close() error {
return nil
}
func ListenKCP(ctx context.Context, address v2net.Address, port v2net.Port, conns chan<- internet.Connection) (internet.Listener, error) {
return NewListener(ctx, address, port, conns)
func ListenKCP(ctx context.Context, address v2net.Address, port v2net.Port, addConn internet.AddConnection) (internet.Listener, error) {
return NewListener(ctx, address, port, addConn)
}
func init() {

View File

@ -4,7 +4,6 @@ import (
"context"
gotls "crypto/tls"
"net"
"time"
"v2ray.com/core/app/log"
"v2ray.com/core/common"
@ -20,10 +19,10 @@ type TCPListener struct {
tlsConfig *gotls.Config
authConfig internet.ConnectionAuthenticator
config *Config
conns chan<- internet.Connection
addConn internet.AddConnection
}
func ListenTCP(ctx context.Context, address v2net.Address, port v2net.Port, conns chan<- internet.Connection) (internet.Listener, error) {
func ListenTCP(ctx context.Context, address v2net.Address, port v2net.Port, addConn internet.AddConnection) (internet.Listener, error) {
listener, err := net.ListenTCP("tcp", &net.TCPAddr{
IP: address.IP(),
Port: int(port),
@ -39,7 +38,7 @@ func ListenTCP(ctx context.Context, address v2net.Address, port v2net.Port, conn
ctx: ctx,
listener: listener,
config: tcpSettings,
conns: conns,
addConn: addConn,
}
if securitySettings := internet.SecuritySettingsFromContext(ctx); securitySettings != nil {
tlsConfig, ok := securitySettings.(*tls.Config)
@ -90,11 +89,7 @@ func (v *TCPListener) KeepAccepting() {
conn = v.authConfig.Server(conn)
}
select {
case v.conns <- internet.Connection(conn):
case <-time.After(time.Second * 5):
conn.Close()
}
v.addConn(context.Background(), internet.Connection(conn))
}
}

View File

@ -3,6 +3,7 @@ package internet
import (
"context"
"net"
"time"
v2net "v2ray.com/core/common/net"
)
@ -19,7 +20,9 @@ func RegisterTransportListener(protocol TransportProtocol, listener ListenFunc)
return nil
}
type ListenFunc func(ctx context.Context, address v2net.Address, port v2net.Port, conns chan<- Connection) (Listener, error)
type AddConnection func(context.Context, Connection) bool
type ListenFunc func(ctx context.Context, address v2net.Address, port v2net.Port, addConn AddConnection) (Listener, error)
type Listener interface {
Close() error
@ -45,7 +48,26 @@ func ListenTCP(ctx context.Context, address v2net.Address, port v2net.Port, conn
if listenFunc == nil {
return nil, newError(protocol, " listener not registered.").AtError()
}
listener, err := listenFunc(ctx, address, port, conns)
listener, err := listenFunc(ctx, address, port, func(ctx context.Context, conn Connection) bool {
select {
case <-ctx.Done():
conn.Close()
return false
case conns <- conn:
return true
default:
select {
case <-ctx.Done():
conn.Close()
return false
case conns <- conn:
return true
case <-time.After(time.Second * 5):
conn.Close()
return false
}
}
})
if err != nil {
return nil, newError("failed to listen on address: ", address, ":", port).Base(err)
}

View File

@ -7,7 +7,6 @@ import (
"net/http"
"strconv"
"sync"
"time"
"github.com/gorilla/websocket"
"v2ray.com/core/app/log"
@ -33,13 +32,7 @@ func (h *requestHandler) ServeHTTP(writer http.ResponseWriter, request *http.Req
return
}
select {
case <-h.ln.ctx.Done():
conn.Close()
case h.ln.conns <- internet.Connection(conn):
case <-time.After(time.Second * 5):
conn.Close()
}
h.ln.addConn(h.ln.ctx, internet.Connection(conn))
}
type Listener struct {
@ -48,17 +41,17 @@ type Listener struct {
listener net.Listener
tlsConfig *tls.Config
config *Config
conns chan<- internet.Connection
addConn internet.AddConnection
}
func ListenWS(ctx context.Context, address v2net.Address, port v2net.Port, conns chan<- internet.Connection) (internet.Listener, error) {
func ListenWS(ctx context.Context, address v2net.Address, port v2net.Port, addConn internet.AddConnection) (internet.Listener, error) {
networkSettings := internet.TransportSettingsFromContext(ctx)
wsSettings := networkSettings.(*Config)
l := &Listener{
ctx: ctx,
config: wsSettings,
conns: conns,
ctx: ctx,
config: wsSettings,
addConn: addConn,
}
if securitySettings := internet.SecuritySettingsFromContext(ctx); securitySettings != nil {
tlsConfig, ok := securitySettings.(*v2tls.Config)

View File

@ -16,29 +16,26 @@ import (
func Test_listenWSAndDial(t *testing.T) {
assert := assert.On(t)
conns := make(chan internet.Connection, 16)
listen, err := ListenWS(internet.ContextWithTransportSettings(context.Background(), &Config{
Path: "ws",
}), v2net.DomainAddress("localhost"), 13146, conns)
}), v2net.DomainAddress("localhost"), 13146, func(ctx context.Context, conn internet.Connection) bool {
go func(c internet.Connection) {
defer c.Close()
var b [1024]byte
n, err := c.Read(b[:])
//assert.Error(err).IsNil()
if err != nil {
return
}
assert.Bool(bytes.HasPrefix(b[:n], []byte("Test connection"))).IsTrue()
_, err = c.Write([]byte("Response"))
assert.Error(err).IsNil()
}(conn)
return true
})
assert.Error(err).IsNil()
go func() {
for conn := range conns {
go func(c internet.Connection) {
defer c.Close()
var b [1024]byte
n, err := c.Read(b[:])
//assert.Error(err).IsNil()
if err != nil {
return
}
assert.Bool(bytes.HasPrefix(b[:n], []byte("Test connection"))).IsTrue()
_, err = c.Write([]byte("Response"))
assert.Error(err).IsNil()
}(conn)
}
}()
ctx := internet.ContextWithTransportSettings(context.Background(), &Config{Path: "ws"})
conn, err := Dial(ctx, v2net.TCPDestination(v2net.DomainAddress("localhost"), 13146))
@ -73,8 +70,6 @@ func Test_listenWSAndDial(t *testing.T) {
assert.Error(conn.Close()).IsNil()
assert.Error(listen.Close()).IsNil()
close(conns)
}
func Test_listenWSAndDial_TLS(t *testing.T) {
@ -91,14 +86,14 @@ func Test_listenWSAndDial_TLS(t *testing.T) {
AllowInsecure: true,
Certificate: []*v2tls.Certificate{tlsgen.GenerateCertificateForTest()},
})
conns := make(chan internet.Connection, 16)
listen, err := ListenWS(ctx, v2net.DomainAddress("localhost"), 13143, conns)
listen, err := ListenWS(ctx, v2net.DomainAddress("localhost"), 13143, func(ctx context.Context, conn internet.Connection) bool {
go func() {
conn.Close()
}()
return true
})
assert.Error(err).IsNil()
go func() {
conn := <-conns
conn.Close()
listen.Close()
}()
defer listen.Close()
conn, err := Dial(ctx, v2net.TCPDestination(v2net.DomainAddress("localhost"), 13143))
assert.Error(err).IsNil()