From ac5ab6786251d035c0fa20dd37165e4010f1401c Mon Sep 17 00:00:00 2001 From: Darien Raymond Date: Sat, 28 Oct 2017 21:28:50 +0200 Subject: [PATCH] refactor --- transport/internet/udp/hub.go | 45 ++++++++++++++++------------------- 1 file changed, 20 insertions(+), 25 deletions(-) diff --git a/transport/internet/udp/hub.go b/transport/internet/udp/hub.go index f54c5e70..d5382c15 100644 --- a/transport/internet/udp/hub.go +++ b/transport/internet/udp/hub.go @@ -38,15 +38,16 @@ func NewPayloadQueue(option ListenOption) *PayloadQueue { return queue } -func (v *PayloadQueue) Enqueue(payload Payload) { - size := len(v.queue) +// Enqueue adds the payload to the end of this queue. +func (q *PayloadQueue) Enqueue(payload Payload) { + size := len(q.queue) idx := 0 if size > 1 { idx = dice.Roll(size) } for i := 0; i < size; i++ { select { - case v.queue[idx%size] <- payload: + case q.queue[idx%size] <- payload: return default: idx++ @@ -54,14 +55,14 @@ func (v *PayloadQueue) Enqueue(payload Payload) { } } -func (v *PayloadQueue) Dequeue(queue <-chan Payload) { +func (q *PayloadQueue) Dequeue(queue <-chan Payload) { for payload := range queue { - v.callback(payload.payload, payload.source, payload.originalDest) + q.callback(payload.payload, payload.source, payload.originalDest) } } -func (v *PayloadQueue) Close() { - for _, queue := range v.queue { +func (q *PayloadQueue) Close() { + for _, queue := range q.queue { close(queue) } } @@ -116,19 +117,19 @@ func ListenUDP(address net.Address, port net.Port, option ListenOption) (*Hub, e return hub, nil } -func (v *Hub) Close() { - v.cancel() - v.conn.Close() +func (h *Hub) Close() { + h.cancel() + h.conn.Close() } -func (v *Hub) WriteTo(payload []byte, dest net.Destination) (int, error) { - return v.conn.WriteToUDP(payload, &net.UDPAddr{ +func (h *Hub) WriteTo(payload []byte, dest net.Destination) (int, error) { + return h.conn.WriteToUDP(payload, &net.UDPAddr{ IP: dest.Address.IP(), Port: int(dest.Port), }) } -func (v *Hub) start(ctx context.Context) { +func (h *Hub) start(ctx context.Context) { oobBytes := make([]byte, 256) L: for { @@ -142,7 +143,7 @@ L: var noob int var addr *net.UDPAddr err := buffer.AppendSupplier(func(b []byte) (int, error) { - n, nb, _, a, e := ReadUDPMsg(v.conn, b, oobBytes) + n, nb, _, a, e := ReadUDPMsg(h.conn, b, oobBytes) noob = nb addr = a return n, e @@ -158,20 +159,14 @@ L: payload: buffer, } payload.source = net.UDPDestination(net.IPAddress(addr.IP), net.Port(addr.Port)) - if v.option.ReceiveOriginalDest && noob > 0 { + if h.option.ReceiveOriginalDest && noob > 0 { payload.originalDest = RetrieveOriginalDest(oobBytes[:noob]) } - v.queue.Enqueue(payload) + h.queue.Enqueue(payload) } - v.queue.Close() + h.queue.Close() } -// Connection returns the net.Conn underneath this hub. -// Private: Visible for testing only -func (v *Hub) Connection() net.Conn { - return v.conn -} - -func (v *Hub) Addr() net.Addr { - return v.conn.LocalAddr() +func (h *Hub) Addr() net.Addr { + return h.conn.LocalAddr() }