Merge pull request #14599 from machine424/sigsegv

test(tsdb): add a reproducer for https://github.com/prometheus/prometheus/issues/14422
pull/15176/head
Bryan Boreham 2024-10-16 14:16:54 +01:00 committed by GitHub
commit 5dd5aec0e6
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed files with 116 additions and 0 deletions

View File

@ -15,6 +15,7 @@ package tsdb
import ( import (
"bufio" "bufio"
"bytes"
"context" "context"
"encoding/binary" "encoding/binary"
"flag" "flag"
@ -23,6 +24,8 @@ import (
"log/slog" "log/slog"
"math" "math"
"math/rand" "math/rand"
"net/http"
"net/http/httptest"
"os" "os"
"path" "path"
"path/filepath" "path/filepath"
@ -41,6 +44,12 @@ import (
"go.uber.org/atomic" "go.uber.org/atomic"
"go.uber.org/goleak" "go.uber.org/goleak"
"github.com/prometheus/prometheus/prompb"
"github.com/prometheus/prometheus/storage/remote"
"github.com/gogo/protobuf/proto"
"github.com/golang/snappy"
"github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/model/histogram" "github.com/prometheus/prometheus/model/histogram"
"github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/model/labels"
@ -8857,3 +8866,110 @@ func TestGenerateCompactionDelay(t *testing.T) {
assertDelay(db.generateCompactionDelay()) assertDelay(db.generateCompactionDelay())
} }
} }
type blockedResponseRecorder struct {
r *httptest.ResponseRecorder
// writeblocked is used to block writing until the test wants it to resume.
writeBlocked chan struct{}
// writeStarted is closed by blockedResponseRecorder to signal that writing has started.
writeStarted chan struct{}
}
func (br *blockedResponseRecorder) Write(buf []byte) (int, error) {
select {
case <-br.writeStarted:
default:
close(br.writeStarted)
}
<-br.writeBlocked
return br.r.Write(buf)
}
func (br *blockedResponseRecorder) Header() http.Header { return br.r.Header() }
func (br *blockedResponseRecorder) WriteHeader(code int) { br.r.WriteHeader(code) }
func (br *blockedResponseRecorder) Flush() { br.r.Flush() }
// TestBlockClosingBlockedDuringRemoteRead ensures that a TSDB Block is not closed while it is being queried
// through remote read. This is a regression test for https://github.com/prometheus/prometheus/issues/14422.
// TODO: Ideally, this should reside in storage/remote/read_handler_test.go once the necessary TSDB utils are accessible there.
func TestBlockClosingBlockedDuringRemoteRead(t *testing.T) {
dir := t.TempDir()
createBlock(t, dir, genSeries(2, 1, 0, 10))
db, err := Open(dir, nil, nil, nil, nil)
require.NoError(t, err)
// No error checking as manually closing the block is supposed to make this fail.
defer db.Close()
readAPI := remote.NewReadHandler(nil, nil, db, func() config.Config {
return config.Config{}
},
0, 1, 0,
)
matcher, err := labels.NewMatcher(labels.MatchRegexp, "__name__", ".*")
require.NoError(t, err)
query, err := remote.ToQuery(0, 10, []*labels.Matcher{matcher}, nil)
require.NoError(t, err)
req := &prompb.ReadRequest{
Queries: []*prompb.Query{query},
AcceptedResponseTypes: []prompb.ReadRequest_ResponseType{prompb.ReadRequest_STREAMED_XOR_CHUNKS},
}
data, err := proto.Marshal(req)
require.NoError(t, err)
request, err := http.NewRequest(http.MethodPost, "", bytes.NewBuffer(snappy.Encode(nil, data)))
require.NoError(t, err)
blockedRecorder := &blockedResponseRecorder{
r: httptest.NewRecorder(),
writeBlocked: make(chan struct{}),
writeStarted: make(chan struct{}),
}
readDone := make(chan struct{})
go func() {
readAPI.ServeHTTP(blockedRecorder, request)
require.Equal(t, http.StatusOK, blockedRecorder.r.Code)
close(readDone)
}()
// Wait for the read API to start streaming data.
<-blockedRecorder.writeStarted
// Try to close the queried block.
blockClosed := make(chan struct{})
go func() {
for _, block := range db.Blocks() {
block.Close()
}
close(blockClosed)
}()
// Closing the queried block should block.
// Wait a little bit to make sure of that.
select {
case <-time.After(100 * time.Millisecond):
case <-readDone:
require.Fail(t, "read API should still be streaming data.")
case <-blockClosed:
require.Fail(t, "Block shouldn't get closed while being queried.")
}
// Resume the read API data streaming.
close(blockedRecorder.writeBlocked)
<-readDone
// The block should be no longer needed and closing it should end.
select {
case <-time.After(10 * time.Millisecond):
require.Fail(t, "Closing the block timed out.")
case <-blockClosed:
}
}