better locking in udp server

pull/362/head
Darien Raymond 2017-01-06 11:40:59 +01:00
parent f2fcc90de2
commit 43cc81e5a8
No known key found for this signature in database
GPG Key ID: 7251FFA14BB18169
1 changed files with 20 additions and 15 deletions

View File

@ -130,29 +130,34 @@ func (v *Server) locateExistingAndDispatch(name string, payload *buf.Buffer) boo
return false return false
} }
func (v *Server) getInboundRay(dest string, session *proxy.SessionInfo) (*TimedInboundRay, bool) {
v.Lock()
defer v.Unlock()
if entry, found := v.conns[dest]; found {
return entry, true
}
log.Info("UDP|Server: establishing new connection for ", dest)
inboundRay := v.packetDispatcher.DispatchToOutbound(session)
return NewTimedInboundRay(dest, inboundRay, v), false
}
func (v *Server) Dispatch(session *proxy.SessionInfo, payload *buf.Buffer, callback ResponseCallback) { func (v *Server) Dispatch(session *proxy.SessionInfo, payload *buf.Buffer, callback ResponseCallback) {
source := session.Source source := session.Source
destination := session.Destination destination := session.Destination
// TODO: Add user to destString // TODO: Add user to destString
destString := source.String() + "-" + destination.String() destString := source.String() + "-" + destination.String()
log.Debug("UDP Server: Dispatch request: ", destString) log.Debug("UDP|Server: Dispatch request: ", destString)
if v.locateExistingAndDispatch(destString, payload) { inboundRay, existing := v.getInboundRay(destString, session)
return outputStream := inboundRay.InboundInput()
}
log.Info("UDP Server: establishing new connection for ", destString)
inboundRay := v.packetDispatcher.DispatchToOutbound(session)
timedInboundRay := NewTimedInboundRay(destString, inboundRay, v)
outputStream := timedInboundRay.InboundInput()
if outputStream != nil { if outputStream != nil {
outputStream.Write(payload) outputStream.Write(payload)
} }
if !existing {
v.Lock() go v.handleConnection(inboundRay, source, callback)
v.conns[destString] = timedInboundRay }
v.Unlock()
go v.handleConnection(timedInboundRay, source, callback)
} }
func (v *Server) handleConnection(inboundRay *TimedInboundRay, source v2net.Destination, callback ResponseCallback) { func (v *Server) handleConnection(inboundRay *TimedInboundRay, source v2net.Destination, callback ResponseCallback) {
@ -161,7 +166,7 @@ func (v *Server) handleConnection(inboundRay *TimedInboundRay, source v2net.Dest
if inputStream == nil { if inputStream == nil {
break break
} }
data, err := inboundRay.InboundOutput().Read() data, err := inputStream.Read()
if err != nil { if err != nil {
break break
} }