simplify udp payload queue

pull/314/head
Darien Raymond 8 years ago
parent 1a1383c2ea
commit 292176c57f
No known key found for this signature in database
GPG Key ID: 7251FFA14BB18169

@ -23,13 +23,11 @@ type UDPPayloadHandler func(*alloc.Buffer, *proxy.SessionInfo)
type UDPPayloadQueue struct { type UDPPayloadQueue struct {
queue []chan UDPPayload queue []chan UDPPayload
callback UDPPayloadHandler callback UDPPayloadHandler
cancel *signal.CancelSignal
} }
func NewUDPPayloadQueue(option ListenOption) *UDPPayloadQueue { func NewUDPPayloadQueue(option ListenOption) *UDPPayloadQueue {
queue := &UDPPayloadQueue{ queue := &UDPPayloadQueue{
callback: option.Callback, callback: option.Callback,
cancel: signal.NewCloseSignal(),
queue: make([]chan UDPPayload, option.Concurrency), queue: make([]chan UDPPayload, option.Concurrency),
} }
for i := range queue.queue { for i := range queue.queue {
@ -56,24 +54,15 @@ func (this *UDPPayloadQueue) Enqueue(payload UDPPayload) {
} }
func (this *UDPPayloadQueue) Dequeue(queue <-chan UDPPayload) { func (this *UDPPayloadQueue) Dequeue(queue <-chan UDPPayload) {
this.cancel.WaitThread() for payload := range queue {
defer this.cancel.FinishThread()
for !this.cancel.Cancelled() {
payload, open := <-queue
if !open {
return
}
this.callback(payload.payload, payload.session) this.callback(payload.payload, payload.session)
} }
} }
func (this *UDPPayloadQueue) Close() { func (this *UDPPayloadQueue) Close() {
this.cancel.Cancel()
for _, queue := range this.queue { for _, queue := range this.queue {
close(queue) close(queue)
} }
this.cancel.WaitForDone()
} }
type ListenOption struct { type ListenOption struct {

Loading…
Cancel
Save