
SplitHTTP: Do not produce too large upload (#3691)

pull/3698/head
mmmray, 3 months ago (committed by GitHub)
parent commit 160316d53c
1. transport/internet/splithttp/dialer.go (46 changes)
2. transport/internet/splithttp/splithttp_test.go (56 changes)
3. transport/pipe/impl.go (8 changes)
4. transport/pipe/writer.go (4 changes)

transport/internet/splithttp/dialer.go (46 changes)

@@ -227,7 +227,11 @@ func Dial(ctx context.Context, dest net.Destination, streamSettings *internet.Me
     httpClient := getHTTPClient(ctx, dest, streamSettings)

-    uploadPipeReader, uploadPipeWriter := pipe.New(pipe.WithSizeLimit(scMaxEachPostBytes.roll()))
+    maxUploadSize := scMaxEachPostBytes.roll()
+    // WithSizeLimit(0) will still allow single bytes to pass, and a lot of
+    // code relies on this behavior. Subtract 1 so that together with
+    // uploadWriter wrapper, exact size limits can be enforced
+    uploadPipeReader, uploadPipeWriter := pipe.New(pipe.WithSizeLimit(maxUploadSize - 1))

     go func() {
         requestsLimiter := semaphore.New(int(scMaxConcurrentPosts.roll()))
@@ -318,12 +322,13 @@ func Dial(ctx context.Context, dest net.Destination, streamSettings *internet.Me
         },
     }

-    // necessary in order to send larger chunks in upload
-    bufferedUploadPipeWriter := buf.NewBufferedWriter(uploadPipeWriter)
-    bufferedUploadPipeWriter.SetBuffered(false)
+    writer := uploadWriter{
+        uploadPipeWriter,
+        maxUploadSize,
+    }

     conn := splitConn{
-        writer:     bufferedUploadPipeWriter,
+        writer:     writer,
         reader:     lazyDownload,
         remoteAddr: remoteAddr,
         localAddr:  localAddr,
@@ -331,3 +336,34 @@ func Dial(ctx context.Context, dest net.Destination, streamSettings *internet.Me

     return stat.Connection(&conn), nil
 }
+
+// A wrapper around pipe that ensures the size limit is exactly honored.
+//
+// The MultiBuffer pipe accepts any single WriteMultiBuffer call even if that
+// single MultiBuffer exceeds the size limit, and then starts blocking on the
+// next WriteMultiBuffer call. This means that ReadMultiBuffer can return more
+// bytes than the size limit. We work around this by splitting a potentially
+// too large write up into multiple.
+type uploadWriter struct {
+    *pipe.Writer
+    maxLen int32
+}
+
+func (w uploadWriter) Write(b []byte) (int, error) {
+    capacity := int(w.maxLen - w.Len())
+    if capacity > 0 && capacity < len(b) {
+        b = b[:capacity]
+    }
+
+    buffer := buf.New()
+    n, err := buffer.Write(b)
+    if err != nil {
+        return 0, err
+    }
+
+    err = w.WriteMultiBuffer([]*buf.Buffer{buffer})
+    if err != nil {
+        return 0, err
+    }
+
+    return n, nil
+}
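To illustrate the effect of the capacity check in uploadWriter.Write, here is a minimal standalone sketch. cappedWriter and the retry loop are hypothetical and not part of this commit; they only model a writer that truncates each call to the remaining capacity and reports a short write, plus a caller that is assumed to retry the remainder. Under those assumptions, a 101-byte payload is split into a 100-byte chunk followed by a 1-byte chunk, so no single chunk exceeds the configured limit.

package main

import "fmt"

// cappedWriter is a toy stand-in for the truncation logic in uploadWriter:
// it accepts at most cap bytes per Write and reports a short write so the
// caller can retry with the remainder. (Illustration only; the real code
// derives the remaining capacity from the pipe via w.maxLen - w.Len().)
type cappedWriter struct {
    cap     int
    batches [][]byte
}

func (w *cappedWriter) Write(b []byte) (int, error) {
    if len(b) > w.cap {
        b = b[:w.cap]
    }
    w.batches = append(w.batches, append([]byte(nil), b...))
    return len(b), nil
}

func main() {
    w := &cappedWriter{cap: 100}
    payload := make([]byte, 101) // mirrors the "slightly too large upload" in Test_maxUpload

    // Assumed caller behavior: keep writing the remainder after a short write.
    for len(payload) > 0 {
        n, err := w.Write(payload)
        if err != nil {
            panic(err)
        }
        payload = payload[n:]
    }

    for i, batch := range w.batches {
        fmt.Printf("batch %d: %d bytes\n", i, len(batch)) // batch 0: 100 bytes, batch 1: 1 bytes
    }
}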

transport/internet/splithttp/splithttp_test.go (56 changes)

@@ -388,7 +388,7 @@ func Test_queryString(t *testing.T) {
     ctx := context.Background()
     streamSettings := &internet.MemoryStreamConfig{
         ProtocolName:     "splithttp",
-        ProtocolSettings: &Config{Path: "sh"},
+        ProtocolSettings: &Config{Path: "sh?ed=2048"},
     }

     conn, err := Dial(ctx, net.TCPDestination(net.DomainAddress("localhost"), listenPort), streamSettings)
@@ -407,3 +407,57 @@ func Test_queryString(t *testing.T) {
     common.Must(conn.Close())
     common.Must(listen.Close())
 }
+
+func Test_maxUpload(t *testing.T) {
+    listenPort := tcp.PickPort()
+    streamSettings := &internet.MemoryStreamConfig{
+        ProtocolName: "splithttp",
+        ProtocolSettings: &Config{
+            Path: "/sh",
+            ScMaxEachPostBytes: &RandRangeConfig{
+                From: 100,
+                To:   100,
+            },
+        },
+    }
+
+    var uploadSize int
+    listen, err := ListenSH(context.Background(), net.LocalHostIP, listenPort, streamSettings, func(conn stat.Connection) {
+        go func(c stat.Connection) {
+            defer c.Close()
+            var b [1024]byte
+            c.SetReadDeadline(time.Now().Add(2 * time.Second))
+            n, err := c.Read(b[:])
+            if err != nil {
+                return
+            }
+            uploadSize = n
+            common.Must2(c.Write([]byte("Response")))
+        }(conn)
+    })
+    common.Must(err)
+    ctx := context.Background()
+
+    conn, err := Dial(ctx, net.TCPDestination(net.DomainAddress("localhost"), listenPort), streamSettings)
+
+    // send a slightly too large upload
+    var upload [101]byte
+    _, err = conn.Write(upload[:])
+    common.Must(err)
+
+    var b [1024]byte
+    n, _ := io.ReadFull(conn, b[:])
+    fmt.Println("string is", n)
+    if string(b[:n]) != "Response" {
+        t.Error("response: ", string(b[:n]))
+    }
+
+    common.Must(conn.Close())
+
+    if uploadSize > 100 || uploadSize == 0 {
+        t.Error("incorrect upload size: ", uploadSize)
+    }
+
+    common.Must(listen.Close())
+}

transport/pipe/impl.go (8 changes)

@@ -46,6 +46,14 @@ var (
     errSlowDown = errors.New("slow down")
 )

+func (p *pipe) Len() int32 {
+    data := p.data
+    if data == nil {
+        return 0
+    }
+    return data.Len()
+}
+
 func (p *pipe) getState(forRead bool) error {
     switch p.state {
     case open:

transport/pipe/writer.go (4 changes)

@@ -19,6 +19,10 @@ func (w *Writer) Close() error {
     return w.pipe.Close()
 }

+func (w *Writer) Len() int32 {
+    return w.pipe.Len()
+}
+
 // Interrupt implements common.Interruptible.
 func (w *Writer) Interrupt() {
     w.pipe.Interrupt()
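Taken together, the two pipe additions expose the buffered byte count outside the pipe package, which is what lets uploadWriter trim each write. A minimal usage sketch follows, assuming Xray-core's module layout for the import paths; the variable names and the 100-byte cap are made up for illustration and do not appear in this commit.

package main

import (
    "fmt"

    "github.com/xtls/xray-core/common/buf"
    "github.com/xtls/xray-core/transport/pipe"
)

func main() {
    const maxBytes int32 = 100

    // As in dialer.go above: WithSizeLimit(maxBytes-1) makes the pipe start
    // blocking once maxBytes-1 bytes are buffered, and Writer.Len() (added
    // in this commit) reports how much is currently buffered.
    reader, writer := pipe.New(pipe.WithSizeLimit(maxBytes - 1))

    payload := make([]byte, 150)
    if remaining := maxBytes - writer.Len(); remaining > 0 && int(remaining) < len(payload) {
        payload = payload[:remaining] // trim to the remaining capacity, like uploadWriter.Write
    }

    b := buf.New()
    b.Write(payload)
    writer.WriteMultiBuffer(buf.MultiBuffer{b})

    mb, _ := reader.ReadMultiBuffer()
    fmt.Println("buffered read:", mb.Len(), "bytes") // at most maxBytes
}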
