mirror of https://github.com/XTLS/Xray-core
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
156 lines
3.1 KiB
156 lines
3.1 KiB
package splithttp |
|
|
|
// upload_queue is a specialized priorityqueue + channel to reorder generic |
|
// packets by a sequence number |
|
|
|
import ( |
|
"container/heap" |
|
"io" |
|
"runtime" |
|
"sync" |
|
|
|
"github.com/xtls/xray-core/common/errors" |
|
) |
|
|
|
type Packet struct { |
|
Reader io.ReadCloser |
|
Payload []byte |
|
Seq uint64 |
|
} |
|
|
|
type uploadQueue struct { |
|
reader io.ReadCloser |
|
pushedPackets chan Packet |
|
writeCloseMutex sync.Mutex |
|
heap uploadHeap |
|
nextSeq uint64 |
|
closed bool |
|
maxPackets int |
|
} |
|
|
|
func NewUploadQueue(maxPackets int) *uploadQueue { |
|
return &uploadQueue{ |
|
pushedPackets: make(chan Packet, maxPackets), |
|
heap: uploadHeap{}, |
|
nextSeq: 0, |
|
closed: false, |
|
maxPackets: maxPackets, |
|
} |
|
} |
|
|
|
func (h *uploadQueue) Push(p Packet) error { |
|
h.writeCloseMutex.Lock() |
|
defer h.writeCloseMutex.Unlock() |
|
|
|
runtime.Gosched() |
|
if h.reader != nil && p.Reader != nil { |
|
p.Reader.Close() |
|
return errors.New("h.reader already exists") |
|
} |
|
|
|
if h.closed { |
|
if p.Reader != nil { |
|
p.Reader.Close() |
|
} |
|
return errors.New("splithttp packet queue closed") |
|
} |
|
|
|
h.pushedPackets <- p |
|
return nil |
|
} |
|
|
|
func (h *uploadQueue) Close() error { |
|
h.writeCloseMutex.Lock() |
|
defer h.writeCloseMutex.Unlock() |
|
|
|
if !h.closed { |
|
h.closed = true |
|
close(h.pushedPackets) |
|
} |
|
runtime.Gosched() |
|
if h.reader != nil { |
|
return h.reader.Close() |
|
} |
|
return nil |
|
} |
|
|
|
func (h *uploadQueue) Read(b []byte) (int, error) { |
|
if h.reader != nil { |
|
return h.reader.Read(b) |
|
} |
|
|
|
if h.closed { |
|
return 0, io.EOF |
|
} |
|
|
|
if len(h.heap) == 0 { |
|
packet, more := <-h.pushedPackets |
|
if !more { |
|
return 0, io.EOF |
|
} |
|
if packet.Reader != nil { |
|
h.reader = packet.Reader |
|
return h.reader.Read(b) |
|
} |
|
heap.Push(&h.heap, packet) |
|
} |
|
|
|
for len(h.heap) > 0 { |
|
packet := heap.Pop(&h.heap).(Packet) |
|
n := 0 |
|
|
|
if packet.Seq == h.nextSeq { |
|
copy(b, packet.Payload) |
|
n = min(len(b), len(packet.Payload)) |
|
|
|
if n < len(packet.Payload) { |
|
// partial read |
|
packet.Payload = packet.Payload[n:] |
|
heap.Push(&h.heap, packet) |
|
} else { |
|
h.nextSeq = packet.Seq + 1 |
|
} |
|
|
|
return n, nil |
|
} |
|
|
|
// misordered packet |
|
if packet.Seq > h.nextSeq { |
|
if len(h.heap) > h.maxPackets { |
|
// the "reassembly buffer" is too large, and we want to |
|
// constrain memory usage somehow. let's tear down the |
|
// connection, and hope the application retries. |
|
return 0, errors.New("packet queue is too large") |
|
} |
|
heap.Push(&h.heap, packet) |
|
packet2, more := <-h.pushedPackets |
|
if !more { |
|
return 0, io.EOF |
|
} |
|
heap.Push(&h.heap, packet2) |
|
} |
|
} |
|
|
|
return 0, nil |
|
} |
|
|
|
// heap code directly taken from https://pkg.go.dev/container/heap |
|
type uploadHeap []Packet |
|
|
|
func (h uploadHeap) Len() int { return len(h) } |
|
func (h uploadHeap) Less(i, j int) bool { return h[i].Seq < h[j].Seq } |
|
func (h uploadHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] } |
|
|
|
func (h *uploadHeap) Push(x any) { |
|
// Push and Pop use pointer receivers because they modify the slice's length, |
|
// not just its contents. |
|
*h = append(*h, x.(Packet)) |
|
} |
|
|
|
func (h *uploadHeap) Pop() any { |
|
old := *h |
|
n := len(old) |
|
x := old[n-1] |
|
*h = old[0 : n-1] |
|
return x |
|
}
|
|
|