mirror of https://github.com/v2ray/v2ray-core
				
				
				
			
		
			
				
	
	
		
			110 lines
		
	
	
		
			3.0 KiB
		
	
	
	
		
			Go
		
	
	
			
		
		
	
	
			110 lines
		
	
	
		
			3.0 KiB
		
	
	
	
		
			Go
		
	
	
| package quic
 | |
| 
 | |
| import (
 | |
| 	"sync"
 | |
| 
 | |
| 	"v2ray.com/core/external/github.com/lucas-clemente/quic-go/internal/protocol"
 | |
| 	"v2ray.com/core/external/github.com/lucas-clemente/quic-go/internal/wire"
 | |
| )
 | |
| 
 | |
| type framer interface {
 | |
| 	QueueControlFrame(wire.Frame)
 | |
| 	AppendControlFrames([]wire.Frame, protocol.ByteCount) ([]wire.Frame, protocol.ByteCount)
 | |
| 
 | |
| 	AddActiveStream(protocol.StreamID)
 | |
| 	AppendStreamFrames([]wire.Frame, protocol.ByteCount) []wire.Frame
 | |
| }
 | |
| 
 | |
| type framerI struct {
 | |
| 	mutex sync.Mutex
 | |
| 
 | |
| 	streamGetter streamGetter
 | |
| 	version      protocol.VersionNumber
 | |
| 
 | |
| 	activeStreams map[protocol.StreamID]struct{}
 | |
| 	streamQueue   []protocol.StreamID
 | |
| 
 | |
| 	controlFrameMutex sync.Mutex
 | |
| 	controlFrames     []wire.Frame
 | |
| }
 | |
| 
 | |
| var _ framer = &framerI{}
 | |
| 
 | |
| func newFramer(
 | |
| 	streamGetter streamGetter,
 | |
| 	v protocol.VersionNumber,
 | |
| ) framer {
 | |
| 	return &framerI{
 | |
| 		streamGetter:  streamGetter,
 | |
| 		activeStreams: make(map[protocol.StreamID]struct{}),
 | |
| 		version:       v,
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func (f *framerI) QueueControlFrame(frame wire.Frame) {
 | |
| 	f.controlFrameMutex.Lock()
 | |
| 	f.controlFrames = append(f.controlFrames, frame)
 | |
| 	f.controlFrameMutex.Unlock()
 | |
| }
 | |
| 
 | |
| func (f *framerI) AppendControlFrames(frames []wire.Frame, maxLen protocol.ByteCount) ([]wire.Frame, protocol.ByteCount) {
 | |
| 	var length protocol.ByteCount
 | |
| 	f.controlFrameMutex.Lock()
 | |
| 	for len(f.controlFrames) > 0 {
 | |
| 		frame := f.controlFrames[len(f.controlFrames)-1]
 | |
| 		frameLen := frame.Length(f.version)
 | |
| 		if length+frameLen > maxLen {
 | |
| 			break
 | |
| 		}
 | |
| 		frames = append(frames, frame)
 | |
| 		length += frameLen
 | |
| 		f.controlFrames = f.controlFrames[:len(f.controlFrames)-1]
 | |
| 	}
 | |
| 	f.controlFrameMutex.Unlock()
 | |
| 	return frames, length
 | |
| }
 | |
| 
 | |
| func (f *framerI) AddActiveStream(id protocol.StreamID) {
 | |
| 	f.mutex.Lock()
 | |
| 	if _, ok := f.activeStreams[id]; !ok {
 | |
| 		f.streamQueue = append(f.streamQueue, id)
 | |
| 		f.activeStreams[id] = struct{}{}
 | |
| 	}
 | |
| 	f.mutex.Unlock()
 | |
| }
 | |
| 
 | |
| func (f *framerI) AppendStreamFrames(frames []wire.Frame, maxLen protocol.ByteCount) []wire.Frame {
 | |
| 	var length protocol.ByteCount
 | |
| 	f.mutex.Lock()
 | |
| 	// pop STREAM frames, until less than MinStreamFrameSize bytes are left in the packet
 | |
| 	numActiveStreams := len(f.streamQueue)
 | |
| 	for i := 0; i < numActiveStreams; i++ {
 | |
| 		if maxLen-length < protocol.MinStreamFrameSize {
 | |
| 			break
 | |
| 		}
 | |
| 		id := f.streamQueue[0]
 | |
| 		f.streamQueue = f.streamQueue[1:]
 | |
| 		// This should never return an error. Better check it anyway.
 | |
| 		// The stream will only be in the streamQueue, if it enqueued itself there.
 | |
| 		str, err := f.streamGetter.GetOrOpenSendStream(id)
 | |
| 		// The stream can be nil if it completed after it said it had data.
 | |
| 		if str == nil || err != nil {
 | |
| 			delete(f.activeStreams, id)
 | |
| 			continue
 | |
| 		}
 | |
| 		frame, hasMoreData := str.popStreamFrame(maxLen - length)
 | |
| 		if hasMoreData { // put the stream back in the queue (at the end)
 | |
| 			f.streamQueue = append(f.streamQueue, id)
 | |
| 		} else { // no more data to send. Stream is not active any more
 | |
| 			delete(f.activeStreams, id)
 | |
| 		}
 | |
| 		if frame == nil { // can happen if the receiveStream was canceled after it said it had data
 | |
| 			continue
 | |
| 		}
 | |
| 		frames = append(frames, frame)
 | |
| 		length += frame.Length(f.version)
 | |
| 	}
 | |
| 	f.mutex.Unlock()
 | |
| 	return frames
 | |
| }
 |