mirror of https://github.com/prometheus/prometheus
Browse Source
Part of: https://github.com/prometheus/prometheus/issues/4517 and https://github.com/improbable-eng/thanos/issues/488 Changes: * Extended protobuf for chunked remote read and negotiation. * Added checksummed, chunked Writer/Reader. * Added server-side implementation for chunked streamed remote-read. Signed-off-by: Bartek Plotka <bwplotka@gmail.com> pull/5927/head
Bartek Płotka
5 years ago
committed by
GitHub
12 changed files with 1919 additions and 207 deletions
@ -0,0 +1,154 @@
|
||||
// Copyright 2019 The Prometheus Authors
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
package remote |
||||
|
||||
import ( |
||||
"bufio" |
||||
"encoding/binary" |
||||
"hash" |
||||
"hash/crc32" |
||||
"io" |
||||
"net/http" |
||||
|
||||
"github.com/gogo/protobuf/proto" |
||||
"github.com/pkg/errors" |
||||
) |
||||
|
||||
// DefaultChunkedReadLimit is the default upper bound, in bytes, on the size of
// a single protobuf frame that the client allows.
// The default is 50MB, which is equivalent to ~100k full XOR chunks and average labelset.
const DefaultChunkedReadLimit = 5e7
||||
|
||||
// castagnoliTable is the CRC-32 table for the Castagnoli polynomial.
//
// The crc32 package initializes this table with sync.Once, but doing so lazily
// may still cause a race with any other use of the crc32 package anywhere.
// Thus the table is built eagerly, as part of package variable initialization.
var castagnoliTable = crc32.MakeTable(crc32.Castagnoli)
||||
|
||||
// ChunkedWriter wraps an io.Writer to allow streaming: each write is preceded
// by a uvarint delimiter that carries the length of the corresponding byte array.
type ChunkedWriter struct {
	// writer receives the framed stream.
	writer io.Writer
	// flusher is flushed after every written frame.
	flusher http.Flusher

	// crc32 is reset and reused to checksum the data of each frame.
	crc32 hash.Hash32
}
||||
|
||||
// NewChunkedWriter constructs a ChunkedWriter.
|
||||
func NewChunkedWriter(w io.Writer, f http.Flusher) *ChunkedWriter { |
||||
return &ChunkedWriter{writer: w, flusher: f, crc32: crc32.New(castagnoliTable)} |
||||
} |
||||
|
||||
// Write writes given bytes to the stream and flushes it.
|
||||
// Each frame includes:
|
||||
//
|
||||
// 1. uvarint for the size of the data frame.
|
||||
// 2. big-endian uint32 for the Castagnoli polynomial CRC-32 checksum of the data frame.
|
||||
// 3. the bytes of the given data.
|
||||
//
|
||||
// Write returns number of sent bytes for a given buffer. The number does not include delimiter and checksum bytes.
|
||||
func (w *ChunkedWriter) Write(b []byte) (int, error) { |
||||
if len(b) == 0 { |
||||
return 0, nil |
||||
} |
||||
|
||||
var buf [binary.MaxVarintLen64]byte |
||||
v := binary.PutUvarint(buf[:], uint64(len(b))) |
||||
if _, err := w.writer.Write(buf[:v]); err != nil { |
||||
return 0, err |
||||
} |
||||
|
||||
w.crc32.Reset() |
||||
if _, err := w.crc32.Write(b); err != nil { |
||||
return 0, err |
||||
} |
||||
|
||||
if err := binary.Write(w.writer, binary.BigEndian, w.crc32.Sum32()); err != nil { |
||||
return 0, err |
||||
} |
||||
|
||||
n, err := w.writer.Write(b) |
||||
if err != nil { |
||||
return n, err |
||||
} |
||||
|
||||
w.flusher.Flush() |
||||
return n, nil |
||||
} |
||||
|
||||
// ChunkedReader is a buffered reader that expects a uvarint delimiter and a checksum before each message.
// It will allocate as much as the biggest frame defined by delimiter (on top of bufio.Reader allocations).
type ChunkedReader struct {
	// b buffers the underlying input stream.
	b *bufio.Reader
	// data holds the most recently read frame; it is reused between reads.
	data []byte
	// sizeLimit is the largest frame size, in bytes, that is accepted.
	sizeLimit uint64

	// crc32 is reset and reused to verify the checksum of each frame.
	crc32 hash.Hash32
}
||||
|
||||
// NewChunkedReader constructs a ChunkedReader.
|
||||
// It allows passing data slice for byte slice reuse, which will be increased to needed size if smaller.
|
||||
func NewChunkedReader(r io.Reader, sizeLimit uint64, data []byte) *ChunkedReader { |
||||
return &ChunkedReader{b: bufio.NewReader(r), sizeLimit: sizeLimit, data: data, crc32: crc32.New(castagnoliTable)} |
||||
} |
||||
|
||||
// Next returns the next length-delimited record from the input, or io.EOF if
|
||||
// there are no more records available. Returns io.ErrUnexpectedEOF if a short
|
||||
// record is found, with a length of n but fewer than n bytes of data.
|
||||
// Next also verifies the given checksum with Castagnoli polynomial CRC-32 checksum.
|
||||
//
|
||||
// NOTE: The slice returned is valid only until a subsequent call to Next. It's a caller's responsibility to copy the
|
||||
// returned slice if needed.
|
||||
func (r *ChunkedReader) Next() ([]byte, error) { |
||||
size, err := binary.ReadUvarint(r.b) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
|
||||
if size > r.sizeLimit { |
||||
return nil, errors.Errorf("chunkedReader: message size exceeded the limit %v bytes; got: %v bytes", r.sizeLimit, size) |
||||
} |
||||
|
||||
if cap(r.data) < int(size) { |
||||
r.data = make([]byte, size) |
||||
} else { |
||||
r.data = r.data[:size] |
||||
} |
||||
|
||||
var crc32 uint32 |
||||
if err := binary.Read(r.b, binary.BigEndian, &crc32); err != nil { |
||||
return nil, err |
||||
} |
||||
|
||||
r.crc32.Reset() |
||||
if _, err := io.ReadFull(io.TeeReader(r.b, r.crc32), r.data); err != nil { |
||||
return nil, err |
||||
} |
||||
|
||||
if r.crc32.Sum32() != crc32 { |
||||
return nil, errors.New("chunkedReader: corrupted frame; checksum mismatch") |
||||
} |
||||
return r.data, nil |
||||
} |
||||
|
||||
// NextProto consumes the next available record by calling r.Next, and decodes
|
||||
// it into the protobuf with proto.Unmarshal.
|
||||
func (r *ChunkedReader) NextProto(pb proto.Message) error { |
||||
rec, err := r.Next() |
||||
if err != nil { |
||||
return err |
||||
} |
||||
return proto.Unmarshal(rec, pb) |
||||
} |
@ -0,0 +1,106 @@
|
||||
// Copyright 2019 The Prometheus Authors
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
package remote |
||||
|
||||
import ( |
||||
"bytes" |
||||
"io" |
||||
"testing" |
||||
|
||||
"github.com/prometheus/prometheus/util/testutil" |
||||
) |
||||
|
||||
// mockedFlusher is a test double that counts how many times it was flushed.
type mockedFlusher struct {
	// flushed is the number of Flush calls made so far.
	flushed int
}

// Flush records one more flush.
func (f *mockedFlusher) Flush() {
	f.flushed++
}
||||
|
||||
func TestChunkedReaderCanReadFromChunkedWriter(t *testing.T) { |
||||
b := &bytes.Buffer{} |
||||
f := &mockedFlusher{} |
||||
w := NewChunkedWriter(b, f) |
||||
r := NewChunkedReader(b, 20, nil) |
||||
|
||||
msgs := [][]byte{ |
||||
[]byte("test1"), |
||||
[]byte("test2"), |
||||
[]byte("test3"), |
||||
[]byte("test4"), |
||||
[]byte{}, // This is ignored by writer.
|
||||
[]byte("test5-after-empty"), |
||||
} |
||||
|
||||
for _, msg := range msgs { |
||||
n, err := w.Write(msg) |
||||
testutil.Ok(t, err) |
||||
testutil.Equals(t, len(msg), n) |
||||
} |
||||
|
||||
i := 0 |
||||
for ; i < 4; i++ { |
||||
msg, err := r.Next() |
||||
testutil.Ok(t, err) |
||||
testutil.Assert(t, i < len(msgs), "more messages then expected") |
||||
testutil.Equals(t, msgs[i], msg) |
||||
} |
||||
|
||||
// Empty byte slice is skipped.
|
||||
i++ |
||||
|
||||
msg, err := r.Next() |
||||
testutil.Ok(t, err) |
||||
testutil.Assert(t, i < len(msgs), "more messages then expected") |
||||
testutil.Equals(t, msgs[i], msg) |
||||
|
||||
_, err = r.Next() |
||||
testutil.NotOk(t, err, "expected io.EOF") |
||||
testutil.Equals(t, io.EOF, err) |
||||
|
||||
testutil.Equals(t, 5, f.flushed) |
||||
} |
||||
|
||||
func TestChunkedReader_Overflow(t *testing.T) { |
||||
b := &bytes.Buffer{} |
||||
_, err := NewChunkedWriter(b, &mockedFlusher{}).Write([]byte("twelve bytes")) |
||||
testutil.Ok(t, err) |
||||
|
||||
b2 := make([]byte, 12) |
||||
copy(b2, b.Bytes()) |
||||
|
||||
ret, err := NewChunkedReader(b, 12, nil).Next() |
||||
testutil.Ok(t, err) |
||||
testutil.Equals(t, "twelve bytes", string(ret)) |
||||
|
||||
_, err = NewChunkedReader(bytes.NewReader(b2), 11, nil).Next() |
||||
testutil.NotOk(t, err, "expect exceed limit error") |
||||
testutil.Equals(t, "chunkedReader: message size exceeded the limit 11 bytes; got: 12 bytes", err.Error()) |
||||
} |
||||
|
||||
func TestChunkedReader_CorruptedFrame(t *testing.T) { |
||||
b := &bytes.Buffer{} |
||||
w := NewChunkedWriter(b, &mockedFlusher{}) |
||||
|
||||
n, err := w.Write([]byte("test1")) |
||||
testutil.Ok(t, err) |
||||
testutil.Equals(t, 5, n) |
||||
|
||||
bs := b.Bytes() |
||||
bs[9] = 1 // Malform the frame by changing one byte.
|
||||
|
||||
_, err = NewChunkedReader(bytes.NewReader(bs), 20, nil).Next() |
||||
testutil.NotOk(t, err, "expected malformed frame") |
||||
testutil.Equals(t, "chunkedReader: corrupted frame; checksum mismatch", err.Error()) |
||||
} |
Loading…
Reference in new issue