package mux

import (
	"context"
	"io"
	"sync"
	"time"

	"v2ray.com/core/common"
	"v2ray.com/core/common/buf"
	"v2ray.com/core/common/errors"
	"v2ray.com/core/common/net"
	"v2ray.com/core/common/protocol"
	"v2ray.com/core/common/session"
	"v2ray.com/core/common/signal/done"
	"v2ray.com/core/common/task"
	"v2ray.com/core/proxy"
	"v2ray.com/core/transport"
	"v2ray.com/core/transport/internet"
	"v2ray.com/core/transport/pipe"
)

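// ClientManager dispatches outbound links onto mux client workers selected by Picker.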
type ClientManager struct {
	Picker WorkerPicker
}

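// Dispatch picks an available worker and hands the link to it, retrying up to
// 16 times before giving up.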
func (m *ClientManager) Dispatch(ctx context.Context, link *transport.Link) error {
	for i := 0; i < 16; i++ {
		worker, err := m.Picker.PickAvailable()
		if err != nil {
			return err
		}
		if worker.Dispatch(ctx, link) {
			return nil
		}
	}

	return newError("unable to find an available mux client").AtWarning()
}

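// WorkerPicker selects a ClientWorker that can take another session.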
type WorkerPicker interface {
	PickAvailable() (*ClientWorker, error)
}

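// IncrementalWorkerPicker reuses existing workers while they have capacity and
// creates new ones through Factory as needed, periodically discarding workers
// that have closed.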
type IncrementalWorkerPicker struct {
	Factory ClientWorkerFactory

	access      sync.Mutex
	workers     []*ClientWorker
	cleanupTask *task.Periodic
}

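// cleanupFunc is the periodic cleanup body: it prunes closed workers and
// reports an error when no workers remain.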
func (p *IncrementalWorkerPicker) cleanupFunc() error {
	p.access.Lock()
	defer p.access.Unlock()

	if len(p.workers) == 0 {
		return newError("no worker")
	}

	p.cleanup()
	return nil
}

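// cleanup removes closed workers from the pool. The caller must hold p.access.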
func (p *IncrementalWorkerPicker) cleanup() {
	var activeWorkers []*ClientWorker
	for _, w := range p.workers {
		if !w.Closed() {
			activeWorkers = append(activeWorkers, w)
		}
	}
	p.workers = activeWorkers
}

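// findAvailable returns the index of the first worker that is not full, or -1
// if every worker is full. The caller must hold p.access.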
func (p *IncrementalWorkerPicker) findAvailable() int {
	for idx, w := range p.workers {
		if !w.IsFull() {
			return idx
		}
	}

	return -1
}

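// pickInternal reuses an existing worker when one is not yet full, otherwise
// it prunes closed workers and creates a new one through Factory. The boolean
// result is true when a new worker was created, signalling the caller to start
// the periodic cleanup task.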
func (p *IncrementalWorkerPicker) pickInternal() (*ClientWorker, error, bool) {
	p.access.Lock()
	defer p.access.Unlock()

	idx := p.findAvailable()
	if idx >= 0 {
		n := len(p.workers)
		if n > 1 && idx != n-1 {
			p.workers[n-1], p.workers[idx] = p.workers[idx], p.workers[n-1]
		}
		return p.workers[idx], nil, false
	}

	p.cleanup()

	worker, err := p.Factory.Create()
	if err != nil {
		return nil, err, false
	}
	p.workers = append(p.workers, worker)

	if p.cleanupTask == nil {
		p.cleanupTask = &task.Periodic{
			Interval: time.Second * 30,
			Execute:  p.cleanupFunc,
		}
	}

	return worker, nil, true
}

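// PickAvailable returns a worker to dispatch on, starting the periodic cleanup
// task whenever a new worker had to be created.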
func (p *IncrementalWorkerPicker) PickAvailable() (*ClientWorker, error) {
	worker, err, start := p.pickInternal()
	if start {
		common.Must(p.cleanupTask.Start())
	}

	return worker, err
}

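// ClientWorkerFactory creates new ClientWorkers.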
type ClientWorkerFactory interface {
	Create() (*ClientWorker, error)
}

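// DialingWorkerFactory builds workers whose mux stream is carried by the
// configured outbound proxy and dialer.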
type DialingWorkerFactory struct {
	Proxy    proxy.Outbound
	Dialer   internet.Dialer
	Strategy ClientStrategy
}

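// Create wires a new ClientWorker to the outbound proxy through a pair of
// pipes and runs the proxy in a background goroutine until it returns.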
func (f *DialingWorkerFactory) Create() (*ClientWorker, error) {
	opts := []pipe.Option{pipe.WithSizeLimit(64 * 1024)}
	uplinkReader, uplinkWriter := pipe.New(opts...)
	downlinkReader, downlinkWriter := pipe.New(opts...)

	c, err := NewClientWorker(transport.Link{
		Reader: downlinkReader,
		Writer: uplinkWriter,
	}, f.Strategy)
	if err != nil {
		return nil, err
	}

	go func(p proxy.Outbound, d internet.Dialer, c common.Closable) {
		ctx := session.ContextWithOutbound(context.Background(), &session.Outbound{
			Target: net.TCPDestination(muxCoolAddress, muxCoolPort),
		})
		ctx, cancel := context.WithCancel(ctx)

		if err := p.Process(ctx, &transport.Link{Reader: uplinkReader, Writer: downlinkWriter}, d); err != nil {
			errors.New("failed to handle mux client connection").Base(err).WriteToLog()
		}
		common.Must(c.Close())
		cancel()
	}(f.Proxy, f.Dialer, c.done)

	return c, nil
}

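// ClientStrategy limits how much a single ClientWorker may carry:
// MaxConcurrency caps the sessions active at once (checked in IsFull), while
// MaxConnection caps the session count tracked by the session manager
// (checked in IsClosing).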
type ClientStrategy struct {
	MaxConcurrency uint32
	MaxConnection  uint32
}

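// ClientWorker multiplexes multiple sessions over a single underlying link.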
type ClientWorker struct {
	sessionManager *SessionManager
	link           transport.Link
	done           *done.Instance
	strategy       ClientStrategy
}

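// muxCoolAddress and muxCoolPort form the destination that mux connections
// are addressed to when handed to the outbound proxy.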
var muxCoolAddress = net.DomainAddress("v1.mux.cool")
var muxCoolPort = net.Port(9527)

// NewClientWorker creates a new mux.Client.
func NewClientWorker(stream transport.Link, s ClientStrategy) (*ClientWorker, error) {
	c := &ClientWorker{
		sessionManager: NewSessionManager(),
		link:           stream,
		done:           done.New(),
		strategy:       s,
	}

	go c.fetchOutput()
	go c.monitor()

	return c, nil
}

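// TotalConnections returns the total number of sessions this worker has
// handled, including closed ones.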
func (m *ClientWorker) TotalConnections() uint32 {
	return uint32(m.sessionManager.Count())
}

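// ActiveConnections returns the number of sessions currently open on this worker.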
func (m *ClientWorker) ActiveConnections() uint32 {
	return uint32(m.sessionManager.Size())
}

// Closed returns true if this Client is closed.
func (m *ClientWorker) Closed() bool {
	return m.done.Done()
}

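// monitor periodically closes the worker once it has no sessions left, and
// shuts down the session manager and the underlying link when the worker is
// marked done.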
func (m *ClientWorker) monitor() {
	timer := time.NewTicker(time.Second * 16)
	defer timer.Stop()

	for {
		select {
		case <-m.done.Wait():
			m.sessionManager.Close()
			common.Close(m.link.Writer)    // nolint: errcheck
			pipe.CloseError(m.link.Reader) // nolint: errcheck
			return
		case <-timer.C:
			size := m.sessionManager.Size()
			if size == 0 && m.sessionManager.CloseIfNoSession() {
				common.Must(m.done.Close())
			}
		}
	}
}

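// writeFirstPayload copies at most one chunk of data from reader, waiting up
// to 100ms. If the reader does not support timeouts or nothing arrives in
// time, an empty buffer is written instead.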
func writeFirstPayload(reader buf.Reader, writer *Writer) error {
	err := buf.CopyOnceTimeout(reader, writer, time.Millisecond*100)
	if err == buf.ErrNotTimeoutReader || err == buf.ErrReadTimeout {
		return writer.WriteMultiBuffer(buf.MultiBuffer{})
	}

	if err != nil {
		return err
	}

	return nil
}

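// fetchInput copies data from the session's input pipe into the shared mux
// output, wrapped in frames addressed to the session's target destination.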
func fetchInput(ctx context.Context, s *Session, output buf.Writer) {
	dest := session.OutboundFromContext(ctx).Target
	transferType := protocol.TransferTypeStream
	if dest.Network == net.Network_UDP {
		transferType = protocol.TransferTypePacket
	}
	s.transferType = transferType
	writer := NewWriter(s.ID, dest, output, transferType)
	defer s.Close()      // nolint: errcheck
	defer writer.Close() // nolint: errcheck

	newError("dispatching request to ", dest).WriteToLog(session.ExportIDToError(ctx))
	if err := writeFirstPayload(s.input, writer); err != nil {
		newError("failed to write first payload").Base(err).WriteToLog(session.ExportIDToError(ctx))
		writer.hasError = true
		pipe.CloseError(s.input)
		return
	}

	if err := buf.Copy(s.input, writer); err != nil {
		newError("failed to fetch all input").Base(err).WriteToLog(session.ExportIDToError(ctx))
		writer.hasError = true
		pipe.CloseError(s.input)
		return
	}
}

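// IsClosing reports whether the worker has used up its MaxConnection budget
// and should not accept new sessions.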
func (m *ClientWorker) IsClosing() bool {
	sm := m.sessionManager
	if m.strategy.MaxConnection > 0 && sm.Count() >= int(m.strategy.MaxConnection) {
		return true
	}
	return false
}

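// IsFull reports whether the worker cannot take another session right now:
// it is closing, already closed, or at its MaxConcurrency limit.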
func (m *ClientWorker) IsFull() bool {
	if m.IsClosing() || m.Closed() {
		return true
	}

	sm := m.sessionManager
	if m.strategy.MaxConcurrency > 0 && sm.Size() >= int(m.strategy.MaxConcurrency) {
		return true
	}
	return false
}

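// Dispatch allocates a session on this worker for the given link and starts
// forwarding its input in a new goroutine. It returns false when the worker
// cannot accept the session.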
func (m *ClientWorker) Dispatch(ctx context.Context, link *transport.Link) bool {
	if m.IsFull() || m.Closed() {
		return false
	}

	sm := m.sessionManager
	s := sm.Allocate()
	if s == nil {
		return false
	}
	s.input = link.Reader
	s.output = link.Writer
	go fetchInput(ctx, s, m.link.Writer)
	return true
}

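// handleStatusKeepAlive drains and discards any payload attached to a
// KeepAlive frame.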
func (m *ClientWorker) handleStatusKeepAlive(meta *FrameMetadata, reader *buf.BufferedReader) error {
	if meta.Option.Has(OptionData) {
		return buf.Copy(NewStreamReader(reader), buf.Discard)
	}
	return nil
}

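// handleStatusNew drains and discards any payload attached to a New frame.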
func (m *ClientWorker) handleStatusNew(meta *FrameMetadata, reader *buf.BufferedReader) error {
	if meta.Option.Has(OptionData) {
		return buf.Copy(NewStreamReader(reader), buf.Discard)
	}
	return nil
}

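// handleStatusKeep copies the payload of a Keep frame to the matching
// session's output. If the session is unknown or the downstream write fails,
// the remote peer is asked to close the session and the remaining payload is
// discarded.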
func (m *ClientWorker) handleStatusKeep(meta *FrameMetadata, reader *buf.BufferedReader) error {
	if !meta.Option.Has(OptionData) {
		return nil
	}

	s, found := m.sessionManager.Get(meta.SessionID)
	if !found {
		// Notify remote peer to close this session.
		closingWriter := NewResponseWriter(meta.SessionID, m.link.Writer, protocol.TransferTypeStream)
		closingWriter.Close()

		return buf.Copy(NewStreamReader(reader), buf.Discard)
	}

	rr := s.NewReader(reader)
	err := buf.Copy(rr, s.output)
	if err != nil && buf.IsWriteError(err) {
		newError("failed to write to downstream. closing session ", s.ID).Base(err).WriteToLog()

		// Notify remote peer to close this session.
		closingWriter := NewResponseWriter(meta.SessionID, m.link.Writer, protocol.TransferTypeStream)
		closingWriter.Close()

		drainErr := buf.Copy(rr, buf.Discard)
		pipe.CloseError(s.input)
		s.Close()
		return drainErr
	}

	return err
}

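// handleStatusEnd closes the session referenced by an End frame, propagating
// an error to its pipes when OptionError is set, and discards any attached
// payload.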
func (m *ClientWorker) handleStatusEnd(meta *FrameMetadata, reader *buf.BufferedReader) error {
	if s, found := m.sessionManager.Get(meta.SessionID); found {
		if meta.Option.Has(OptionError) {
			pipe.CloseError(s.input)
			pipe.CloseError(s.output)
		}
		s.Close()
	}
	if meta.Option.Has(OptionData) {
		return buf.Copy(NewStreamReader(reader), buf.Discard)
	}
	return nil
}

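// fetchOutput reads frames from the underlying link and dispatches each one to
// the handler for its session status. The worker is marked done when the loop
// exits.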
func (m *ClientWorker) fetchOutput() {
	defer func() {
		common.Must(m.done.Close())
	}()

	reader := &buf.BufferedReader{Reader: m.link.Reader}

	var meta FrameMetadata
	for {
		err := meta.Unmarshal(reader)
		if err != nil {
			if errors.Cause(err) != io.EOF {
				newError("failed to read metadata").Base(err).WriteToLog()
			}
			break
		}

		switch meta.SessionStatus {
		case SessionStatusKeepAlive:
			err = m.handleStatusKeepAlive(&meta, reader)
		case SessionStatusEnd:
			err = m.handleStatusEnd(&meta, reader)
		case SessionStatusNew:
			err = m.handleStatusNew(&meta, reader)
		case SessionStatusKeep:
			err = m.handleStatusKeep(&meta, reader)
		default:
			status := meta.SessionStatus
			newError("unknown status: ", status).AtError().WriteToLog()
			return
		}

		if err != nil {
			newError("failed to process data").Base(err).WriteToLog()
			return
		}
	}
}