mirror of https://github.com/XTLS/Xray-core
				
				
				
			
		
			
				
	
	
		
			242 lines
		
	
	
		
			4.1 KiB
		
	
	
	
		
			Go
		
	
	
			
		
		
	
	
			242 lines
		
	
	
		
			4.1 KiB
		
	
	
	
		
			Go
		
	
	
| package mux
 | |
| 
 | |
| import (
 | |
| 	"context"
 | |
| 	"io"
 | |
| 	"runtime"
 | |
| 	"sync"
 | |
| 	"time"
 | |
| 
 | |
| 	"github.com/xtls/xray-core/common"
 | |
| 	"github.com/xtls/xray-core/common/buf"
 | |
| 	"github.com/xtls/xray-core/common/errors"
 | |
| 	"github.com/xtls/xray-core/common/net"
 | |
| 	"github.com/xtls/xray-core/common/protocol"
 | |
| 	"github.com/xtls/xray-core/transport/pipe"
 | |
| )
 | |
| 
 | |
| type SessionManager struct {
 | |
| 	sync.RWMutex
 | |
| 	sessions map[uint16]*Session
 | |
| 	count    uint16
 | |
| 	closed   bool
 | |
| }
 | |
| 
 | |
| func NewSessionManager() *SessionManager {
 | |
| 	return &SessionManager{
 | |
| 		count:    0,
 | |
| 		sessions: make(map[uint16]*Session, 16),
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func (m *SessionManager) Closed() bool {
 | |
| 	m.RLock()
 | |
| 	defer m.RUnlock()
 | |
| 
 | |
| 	return m.closed
 | |
| }
 | |
| 
 | |
| func (m *SessionManager) Size() int {
 | |
| 	m.RLock()
 | |
| 	defer m.RUnlock()
 | |
| 
 | |
| 	return len(m.sessions)
 | |
| }
 | |
| 
 | |
| func (m *SessionManager) Count() int {
 | |
| 	m.RLock()
 | |
| 	defer m.RUnlock()
 | |
| 
 | |
| 	return int(m.count)
 | |
| }
 | |
| 
 | |
| func (m *SessionManager) Allocate() *Session {
 | |
| 	m.Lock()
 | |
| 	defer m.Unlock()
 | |
| 
 | |
| 	if m.closed {
 | |
| 		return nil
 | |
| 	}
 | |
| 
 | |
| 	m.count++
 | |
| 	s := &Session{
 | |
| 		ID:     m.count,
 | |
| 		parent: m,
 | |
| 	}
 | |
| 	m.sessions[s.ID] = s
 | |
| 	return s
 | |
| }
 | |
| 
 | |
| func (m *SessionManager) Add(s *Session) bool {
 | |
| 	m.Lock()
 | |
| 	defer m.Unlock()
 | |
| 
 | |
| 	if m.closed {
 | |
| 		return false
 | |
| 	}
 | |
| 
 | |
| 	m.count++
 | |
| 	m.sessions[s.ID] = s
 | |
| 	return true
 | |
| }
 | |
| 
 | |
| func (m *SessionManager) Remove(locked bool, id uint16) {
 | |
| 	if !locked {
 | |
| 		m.Lock()
 | |
| 		defer m.Unlock()
 | |
| 	}
 | |
| 	locked = true
 | |
| 
 | |
| 	if m.closed {
 | |
| 		return
 | |
| 	}
 | |
| 
 | |
| 	delete(m.sessions, id)
 | |
| 
 | |
| 	/*
 | |
| 		if len(m.sessions) == 0 {
 | |
| 			m.sessions = make(map[uint16]*Session, 16)
 | |
| 		}
 | |
| 	*/
 | |
| }
 | |
| 
 | |
| func (m *SessionManager) Get(id uint16) (*Session, bool) {
 | |
| 	m.RLock()
 | |
| 	defer m.RUnlock()
 | |
| 
 | |
| 	if m.closed {
 | |
| 		return nil, false
 | |
| 	}
 | |
| 
 | |
| 	s, found := m.sessions[id]
 | |
| 	return s, found
 | |
| }
 | |
| 
 | |
| func (m *SessionManager) CloseIfNoSession() bool {
 | |
| 	m.Lock()
 | |
| 	defer m.Unlock()
 | |
| 
 | |
| 	if m.closed {
 | |
| 		return true
 | |
| 	}
 | |
| 
 | |
| 	if len(m.sessions) != 0 {
 | |
| 		return false
 | |
| 	}
 | |
| 
 | |
| 	m.closed = true
 | |
| 	return true
 | |
| }
 | |
| 
 | |
| func (m *SessionManager) Close() error {
 | |
| 	m.Lock()
 | |
| 	defer m.Unlock()
 | |
| 
 | |
| 	if m.closed {
 | |
| 		return nil
 | |
| 	}
 | |
| 
 | |
| 	m.closed = true
 | |
| 
 | |
| 	for _, s := range m.sessions {
 | |
| 		s.Close(true)
 | |
| 	}
 | |
| 
 | |
| 	m.sessions = nil
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| // Session represents a client connection in a Mux connection.
 | |
| type Session struct {
 | |
| 	input        buf.Reader
 | |
| 	output       buf.Writer
 | |
| 	parent       *SessionManager
 | |
| 	ID           uint16
 | |
| 	transferType protocol.TransferType
 | |
| 	closed       bool
 | |
| 	XUDP         *XUDP
 | |
| }
 | |
| 
 | |
| // Close closes all resources associated with this session.
 | |
| func (s *Session) Close(locked bool) error {
 | |
| 	if !locked {
 | |
| 		s.parent.Lock()
 | |
| 		defer s.parent.Unlock()
 | |
| 	}
 | |
| 	locked = true
 | |
| 	if s.closed {
 | |
| 		return nil
 | |
| 	}
 | |
| 	s.closed = true
 | |
| 	if s.XUDP == nil {
 | |
| 		common.Interrupt(s.input)
 | |
| 		common.Close(s.output)
 | |
| 	} else {
 | |
| 		// Stop existing handle(), then trigger writer.Close().
 | |
| 		// Note that s.output may be dispatcher.SizeStatWriter.
 | |
| 		s.input.(*pipe.Reader).ReturnAnError(io.EOF)
 | |
| 		runtime.Gosched()
 | |
| 		// If the error set by ReturnAnError still exists, clear it.
 | |
| 		s.input.(*pipe.Reader).Recover()
 | |
| 		XUDPManager.Lock()
 | |
| 		if s.XUDP.Status == Active {
 | |
| 			s.XUDP.Expire = time.Now().Add(time.Minute)
 | |
| 			s.XUDP.Status = Expiring
 | |
| 			errors.LogDebug(context.Background(), "XUDP put ", s.XUDP.GlobalID)
 | |
| 		}
 | |
| 		XUDPManager.Unlock()
 | |
| 	}
 | |
| 	s.parent.Remove(locked, s.ID)
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| // NewReader creates a buf.Reader based on the transfer type of this Session.
 | |
| func (s *Session) NewReader(reader *buf.BufferedReader, dest *net.Destination) buf.Reader {
 | |
| 	if s.transferType == protocol.TransferTypeStream {
 | |
| 		return NewStreamReader(reader)
 | |
| 	}
 | |
| 	return NewPacketReader(reader, dest)
 | |
| }
 | |
| 
 | |
| const (
 | |
| 	Initializing = 0
 | |
| 	Active       = 1
 | |
| 	Expiring     = 2
 | |
| )
 | |
| 
 | |
| type XUDP struct {
 | |
| 	GlobalID [8]byte
 | |
| 	Status   uint64
 | |
| 	Expire   time.Time
 | |
| 	Mux      *Session
 | |
| }
 | |
| 
 | |
| func (x *XUDP) Interrupt() {
 | |
| 	common.Interrupt(x.Mux.input)
 | |
| 	common.Close(x.Mux.output)
 | |
| }
 | |
| 
 | |
| var XUDPManager struct {
 | |
| 	sync.Mutex
 | |
| 	Map map[[8]byte]*XUDP
 | |
| }
 | |
| 
 | |
| func init() {
 | |
| 	XUDPManager.Map = make(map[[8]byte]*XUDP)
 | |
| 	go func() {
 | |
| 		for {
 | |
| 			time.Sleep(time.Minute)
 | |
| 			now := time.Now()
 | |
| 			XUDPManager.Lock()
 | |
| 			for id, x := range XUDPManager.Map {
 | |
| 				if x.Status == Expiring && now.After(x.Expire) {
 | |
| 					x.Interrupt()
 | |
| 					delete(XUDPManager.Map, id)
 | |
| 					errors.LogDebug(context.Background(), "XUDP del ", id)
 | |
| 				}
 | |
| 			}
 | |
| 			XUDPManager.Unlock()
 | |
| 		}
 | |
| 	}()
 | |
| }
 |