Xray-core/transport/internet/splithttp/upload_queue.go

157 lines
3.1 KiB
Go
Raw Normal View History

2024-06-18 05:36:36 +00:00
package splithttp
// upload_queue is a specialized priorityqueue + channel to reorder generic
// packets by a sequence number
import (
"container/heap"
"io"
"runtime"
"sync"
"github.com/xtls/xray-core/common/errors"
2024-06-18 05:36:36 +00:00
)
type Packet struct {
Reader io.ReadCloser
2024-06-18 05:36:36 +00:00
Payload []byte
Seq uint64
}
type uploadQueue struct {
reader io.ReadCloser
pushedPackets chan Packet
writeCloseMutex sync.Mutex
heap uploadHeap
nextSeq uint64
closed bool
maxPackets int
2024-06-18 05:36:36 +00:00
}
func NewUploadQueue(maxPackets int) *uploadQueue {
return &uploadQueue{
2024-06-18 05:36:36 +00:00
pushedPackets: make(chan Packet, maxPackets),
heap: uploadHeap{},
nextSeq: 0,
closed: false,
maxPackets: maxPackets,
}
}
func (h *uploadQueue) Push(p Packet) error {
h.writeCloseMutex.Lock()
defer h.writeCloseMutex.Unlock()
runtime.Gosched()
if h.reader != nil && p.Reader != nil {
p.Reader.Close()
return errors.New("h.reader already exists")
}
2024-06-18 05:36:36 +00:00
if h.closed {
if p.Reader != nil {
p.Reader.Close()
}
return errors.New("packet queue closed")
2024-06-18 05:36:36 +00:00
}
h.pushedPackets <- p
return nil
}
func (h *uploadQueue) Close() error {
h.writeCloseMutex.Lock()
defer h.writeCloseMutex.Unlock()
if !h.closed {
h.closed = true
close(h.pushedPackets)
}
runtime.Gosched()
if h.reader != nil {
return h.reader.Close()
}
2024-06-18 05:36:36 +00:00
return nil
}
func (h *uploadQueue) Read(b []byte) (int, error) {
if h.reader != nil {
return h.reader.Read(b)
}
if h.closed {
2024-06-18 05:36:36 +00:00
return 0, io.EOF
}
if len(h.heap) == 0 {
packet, more := <-h.pushedPackets
if !more {
return 0, io.EOF
}
if packet.Reader != nil {
h.reader = packet.Reader
return h.reader.Read(b)
}
heap.Push(&h.heap, packet)
}
2024-06-18 05:36:36 +00:00
for len(h.heap) > 0 {
2024-06-18 05:36:36 +00:00
packet := heap.Pop(&h.heap).(Packet)
n := 0
if packet.Seq == h.nextSeq {
copy(b, packet.Payload)
n = min(len(b), len(packet.Payload))
if n < len(packet.Payload) {
// partial read
packet.Payload = packet.Payload[n:]
heap.Push(&h.heap, packet)
} else {
h.nextSeq = packet.Seq + 1
}
return n, nil
}
// misordered packet
if packet.Seq > h.nextSeq {
if len(h.heap) > h.maxPackets {
// the "reassembly buffer" is too large, and we want to
// constrain memory usage somehow. let's tear down the
// connection, and hope the application retries.
return 0, errors.New("packet queue is too large")
2024-06-18 05:36:36 +00:00
}
heap.Push(&h.heap, packet)
packet2, more := <-h.pushedPackets
if !more {
return 0, io.EOF
}
heap.Push(&h.heap, packet2)
2024-06-18 05:36:36 +00:00
}
}
return 0, nil
}
// heap code directly taken from https://pkg.go.dev/container/heap
type uploadHeap []Packet
func (h uploadHeap) Len() int { return len(h) }
func (h uploadHeap) Less(i, j int) bool { return h[i].Seq < h[j].Seq }
func (h uploadHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] }
func (h *uploadHeap) Push(x any) {
// Push and Pop use pointer receivers because they modify the slice's length,
// not just its contents.
*h = append(*h, x.(Packet))
}
func (h *uploadHeap) Pop() any {
old := *h
n := len(old)
x := old[n-1]
*h = old[0 : n-1]
return x
}