diff --git a/common/protocol/raw/server.go b/common/protocol/raw/server.go index 35d1bb7f..57cfb25d 100644 --- a/common/protocol/raw/server.go +++ b/common/protocol/raw/server.go @@ -30,6 +30,15 @@ func NewServerSession(validator protocol.UserValidator) *ServerSession { } } +func (this *ServerSession) Release() { + this.userValidator = nil + this.requestBodyIV = nil + this.requestBodyKey = nil + this.responseBodyIV = nil + this.responseBodyKey = nil + this.responseWriter = nil +} + func (this *ServerSession) DecodeRequestHeader(reader io.Reader) (*protocol.RequestHeader, error) { buffer := alloc.NewSmallBuffer() defer buffer.Release() diff --git a/common/protocol/user_validator.go b/common/protocol/user_validator.go index 5aae4e10..90d61cbd 100644 --- a/common/protocol/user_validator.go +++ b/common/protocol/user_validator.go @@ -3,6 +3,9 @@ package protocol import ( "sync" "time" + + "github.com/v2ray/v2ray-core/common" + "github.com/v2ray/v2ray-core/common/signal" ) const ( @@ -18,6 +21,8 @@ type idEntry struct { } type UserValidator interface { + common.Releasable + Add(user *User) error Get(timeHash []byte) (*User, Timestamp, bool) } @@ -28,6 +33,7 @@ type TimedUserValidator struct { ids []*idEntry access sync.RWMutex hasher IDHash + cancel *signal.CancelSignal } type indexTimePair struct { @@ -42,11 +48,23 @@ func NewTimedUserValidator(hasher IDHash) UserValidator { access: sync.RWMutex{}, ids: make([]*idEntry, 0, 512), hasher: hasher, + cancel: signal.NewCloseSignal(), } - go tus.updateUserHash(time.Tick(updateIntervalSec * time.Second)) + go tus.updateUserHash(time.Tick(updateIntervalSec*time.Second), tus.cancel) return tus } +func (this *TimedUserValidator) Release() { + this.cancel.Cancel() + this.cancel.WaitForDone() + + this.validUsers = nil + this.userHash = nil + this.ids = nil + this.hasher = nil + this.cancel = nil +} + func (this *TimedUserValidator) generateNewHashes(nowSec Timestamp, idx int, entry *idEntry) { var hashValue [16]byte var hashValueRemoval [16]byte @@ -70,13 +88,20 @@ func (this *TimedUserValidator) generateNewHashes(nowSec Timestamp, idx int, ent } } -func (this *TimedUserValidator) updateUserHash(tick <-chan time.Time) { - for now := range tick { - nowSec := Timestamp(now.Unix() + cacheDurationSec) - for _, entry := range this.ids { - this.generateNewHashes(nowSec, entry.userIdx, entry) +func (this *TimedUserValidator) updateUserHash(tick <-chan time.Time, cancel *signal.CancelSignal) { +L: + for { + select { + case now := <-tick: + nowSec := Timestamp(now.Unix() + cacheDurationSec) + for _, entry := range this.ids { + this.generateNewHashes(nowSec, entry.userIdx, entry) + } + case <-cancel.WaitForCancel(): + break L } } + cancel.Done() } func (this *TimedUserValidator) Add(user *User) error { diff --git a/common/signal/close.go b/common/signal/close.go new file mode 100644 index 00000000..dd28978a --- /dev/null +++ b/common/signal/close.go @@ -0,0 +1,29 @@ +package signal + +type CancelSignal struct { + cancel chan struct{} + done chan struct{} +} + +func NewCloseSignal() *CancelSignal { + return &CancelSignal{ + cancel: make(chan struct{}), + done: make(chan struct{}), + } +} + +func (this *CancelSignal) Cancel() { + close(this.cancel) +} + +func (this *CancelSignal) WaitForCancel() <-chan struct{} { + return this.cancel +} + +func (this *CancelSignal) Done() { + close(this.done) +} + +func (this *CancelSignal) WaitForDone() <-chan struct{} { + return this.done +} diff --git a/proxy/vmess/inbound/inbound.go b/proxy/vmess/inbound/inbound.go index 05ee9ac7..dc2290c2 100644 --- a/proxy/vmess/inbound/inbound.go +++ b/proxy/vmess/inbound/inbound.go @@ -124,6 +124,7 @@ func (this *VMessInboundHandler) HandleConnection(connection *hub.TCPConn) { defer reader.Release() session := raw.NewServerSession(this.clients) + defer session.Release() request, err := session.DecodeRequestHeader(reader) if err != nil {