refactor mux

pull/432/head
Darien Raymond 8 years ago
parent 0a15bceb55
commit 4d682c01e0
No known key found for this signature in database
GPG Key ID: 7251FFA14BB18169

@ -42,7 +42,7 @@ func (s *session) closeUplink() {
allDone = s.uplinkClosed && s.downlinkClosed allDone = s.uplinkClosed && s.downlinkClosed
s.Unlock() s.Unlock()
if allDone { if allDone {
go s.parent.remove(s.id) s.parent.remove(s.id)
} }
} }
@ -53,7 +53,7 @@ func (s *session) closeDownlink() {
allDone = s.uplinkClosed && s.downlinkClosed allDone = s.uplinkClosed && s.downlinkClosed
s.Unlock() s.Unlock()
if allDone { if allDone {
go s.parent.remove(s.id) s.parent.remove(s.id)
} }
} }
@ -116,6 +116,7 @@ type Client struct {
ctx context.Context ctx context.Context
cancel context.CancelFunc cancel context.CancelFunc
manager *ClientManager manager *ClientManager
session2Remove chan uint16
} }
var muxCoolDestination = net.TCPDestination(net.DomainAddress("v1.mux.cool"), net.Port(9527)) var muxCoolDestination = net.TCPDestination(net.DomainAddress("v1.mux.cool"), net.Port(9527))
@ -132,21 +133,18 @@ func NewClient(p proxy.Outbound, dialer proxy.Dialer, m *ClientManager) (*Client
cancel: cancel, cancel: cancel,
manager: m, manager: m,
count: 0, count: 0,
session2Remove: make(chan uint16, 16),
} }
go c.fetchOutput() go c.fetchOutput()
go c.monitor()
return c, nil return c, nil
} }
func (m *Client) remove(id uint16) { func (m *Client) remove(id uint16) {
m.access.Lock() select {
defer m.access.Unlock() case m.session2Remove <- id:
default:
delete(m.sessions, id) // Probably not gonna happen.
if len(m.sessions) == 0 {
m.cancel()
m.inboundRay.InboundInput().Close()
go m.manager.onClientFinish()
} }
} }
@ -159,6 +157,31 @@ func (m *Client) Closed() bool {
} }
} }
func (m *Client) monitor() {
for {
select {
case <-m.ctx.Done():
m.cleanup()
return
case id := <-m.session2Remove:
m.access.Lock()
delete(m.sessions, id)
m.access.Unlock()
}
}
}
func (m *Client) cleanup() {
m.access.Lock()
defer m.access.Unlock()
for _, s := range m.sessions {
s.closeUplink()
s.closeDownlink()
s.output.CloseError()
}
}
func fetchInput(ctx context.Context, s *session, output buf.Writer) { func fetchInput(ctx context.Context, s *session, output buf.Writer) {
dest, _ := proxy.TargetFromContext(ctx) dest, _ := proxy.TargetFromContext(ctx)
writer := &Writer{ writer := &Writer{
@ -242,6 +265,8 @@ func pipe(reader *Reader, writer buf.Writer) error {
} }
func (m *Client) fetchOutput() { func (m *Client) fetchOutput() {
defer m.cancel()
reader := NewReader(m.inboundRay.InboundOutput()) reader := NewReader(m.inboundRay.InboundOutput())
for { for {
meta, err := reader.ReadMetadata() meta, err := reader.ReadMetadata()
@ -271,15 +296,6 @@ func (m *Client) fetchOutput() {
break break
} }
} }
// Close all downlinks
m.access.RLock()
for _, s := range m.sessions {
s.closeUplink()
s.closeDownlink()
s.output.CloseError()
}
m.access.RUnlock()
} }
type Server struct { type Server struct {

Loading…
Cancel
Save