diff --git a/tsdb/block_test.go b/tsdb/block_test.go index 0ccaa7abd..6c095925d 100644 --- a/tsdb/block_test.go +++ b/tsdb/block_test.go @@ -302,12 +302,9 @@ func TestReadIndexFormatV1(t *testing.T) { // createBlock creates a block with given set of series and returns its dir. func createBlock(tb testing.TB, dir string, series []storage.Series) string { - chunkDir, err := ioutil.TempDir("", "chunk_dir") + blockDir, err := CreateBlock(series, dir, 0, log.NewNopLogger()) testutil.Ok(tb, err) - defer func() { testutil.Ok(tb, os.RemoveAll(chunkDir)) }() - head := createHead(tb, nil, series, chunkDir) - defer func() { testutil.Ok(tb, head.Close()) }() - return createBlockFromHead(tb, dir, head) + return blockDir } func createBlockFromHead(tb testing.TB, dir string, head *Head) string { diff --git a/tsdb/blockwriter.go b/tsdb/blockwriter.go new file mode 100644 index 000000000..bb6184169 --- /dev/null +++ b/tsdb/blockwriter.go @@ -0,0 +1,122 @@ +// Copyright 2020 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package tsdb + +import ( + "context" + "io/ioutil" + "math" + "os" + + "github.com/go-kit/kit/log" + "github.com/go-kit/kit/log/level" + "github.com/oklog/ulid" + "github.com/pkg/errors" + + "github.com/prometheus/prometheus/pkg/timestamp" + "github.com/prometheus/prometheus/storage" + "github.com/prometheus/prometheus/tsdb/chunkenc" +) + +// BlockWriter is a block writer that allows appending and flushing series to disk. +type BlockWriter struct { + logger log.Logger + destinationDir string + + head *Head + blockSize int64 // in ms + chunkDir string +} + +// NewBlockWriter create a new block writer. +// +// The returned writer accumulates all the series in the Head block until `Flush` is called. +// +// Note that the writer will not check if the target directory exists or +// contains anything at all. It is the caller's responsibility to +// ensure that the resulting blocks do not overlap etc. +// Writer ensures the block flush is atomic (via rename). +func NewBlockWriter(logger log.Logger, dir string, blockSize int64) (*BlockWriter, error) { + w := &BlockWriter{ + logger: logger, + destinationDir: dir, + blockSize: blockSize, + } + if err := w.initHead(); err != nil { + return nil, err + } + return w, nil +} + +// initHead creates and initialises a new TSDB head. +func (w *BlockWriter) initHead() error { + chunkDir, err := ioutil.TempDir(os.TempDir(), "head") + if err != nil { + return errors.Wrap(err, "create temp dir") + } + w.chunkDir = chunkDir + + h, err := NewHead(nil, w.logger, nil, w.blockSize, w.chunkDir, nil, DefaultStripeSize, nil) + if err != nil { + return errors.Wrap(err, "tsdb.NewHead") + } + + w.head = h + return w.head.Init(math.MinInt64) +} + +// Appender returns a new appender on the database. +// Appender can't be called concurrently. However, the returned Appender can safely be used concurrently. +func (w *BlockWriter) Appender(ctx context.Context) storage.Appender { + return w.head.Appender(ctx) +} + +// Flush implements the Writer interface. This is where actual block writing +// happens. After flush completes, no writes can be done. +func (w *BlockWriter) Flush(ctx context.Context) (ulid.ULID, error) { + seriesCount := w.head.NumSeries() + if w.head.NumSeries() == 0 { + return ulid.ULID{}, errors.New("no series appended, aborting") + } + + mint := w.head.MinTime() + // Add +1 millisecond to block maxt because block intervals are half-open: [b.MinTime, b.MaxTime). + // Because of this block intervals are always +1 than the total samples it includes. + maxt := w.head.MaxTime() + 1 + level.Info(w.logger).Log("msg", "flushing", "series_count", seriesCount, "mint", timestamp.Time(mint), "maxt", timestamp.Time(maxt)) + + compactor, err := NewLeveledCompactor(ctx, + nil, + w.logger, + []int64{w.blockSize}, + chunkenc.NewPool()) + if err != nil { + return ulid.ULID{}, errors.Wrap(err, "create leveled compactor") + } + id, err := compactor.Write(w.destinationDir, w.head, mint, maxt, nil) + if err != nil { + return ulid.ULID{}, errors.Wrap(err, "compactor write") + } + + return id, nil +} + +func (w *BlockWriter) Close() error { + defer func() { + if err := os.RemoveAll(w.chunkDir); err != nil { + level.Error(w.logger).Log("msg", "error in deleting BlockWriter files", "err", err) + } + }() + return w.head.Close() +} diff --git a/tsdb/blockwriter_test.go b/tsdb/blockwriter_test.go new file mode 100644 index 000000000..eaeed677f --- /dev/null +++ b/tsdb/blockwriter_test.go @@ -0,0 +1,67 @@ +// Copyright 2020 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package tsdb + +import ( + "context" + "io/ioutil" + "math" + "os" + "path/filepath" + "testing" + + "github.com/go-kit/kit/log" + "github.com/pkg/errors" + "github.com/prometheus/prometheus/pkg/labels" + "github.com/prometheus/prometheus/tsdb/tsdbutil" + "github.com/prometheus/prometheus/util/testutil" +) + +func TestBlockWriter(t *testing.T) { + ctx := context.Background() + outputDir, err := ioutil.TempDir(os.TempDir(), "output") + testutil.Ok(t, err) + w, err := NewBlockWriter(log.NewNopLogger(), outputDir, DefaultBlockDuration) + testutil.Ok(t, err) + + // Flush with no series results in error. + _, err = w.Flush(ctx) + testutil.ErrorEqual(t, err, errors.New("no series appended, aborting")) + + // Add some series. + app := w.Appender(ctx) + ts1, v1 := int64(44), float64(7) + _, err = app.Add(labels.Labels{{Name: "a", Value: "b"}}, ts1, v1) + testutil.Ok(t, err) + ts2, v2 := int64(55), float64(12) + _, err = app.Add(labels.Labels{{Name: "c", Value: "d"}}, ts2, v2) + testutil.Ok(t, err) + testutil.Ok(t, app.Commit()) + id, err := w.Flush(ctx) + testutil.Ok(t, err) + + // Confirm the block has the correct data. + blockpath := filepath.Join(outputDir, id.String()) + b, err := OpenBlock(nil, blockpath, nil) + testutil.Ok(t, err) + q, err := NewBlockQuerier(b, math.MinInt64, math.MaxInt64) + testutil.Ok(t, err) + series := query(t, q, labels.MustNewMatcher(labels.MatchRegexp, "", ".*")) + sample1 := []tsdbutil.Sample{sample{t: ts1, v: v1}} + sample2 := []tsdbutil.Sample{sample{t: ts2, v: v2}} + expectedSeries := map[string][]tsdbutil.Sample{"{a=\"b\"}": sample1, "{c=\"d\"}": sample2} + testutil.Equals(t, expectedSeries, series) + + testutil.Ok(t, w.Close()) +} diff --git a/tsdb/tsdbblockutil.go b/tsdb/tsdbblockutil.go index d152594da..15f129d31 100644 --- a/tsdb/tsdbblockutil.go +++ b/tsdb/tsdbblockutil.go @@ -16,72 +16,61 @@ package tsdb import ( "context" "fmt" - "os" "path/filepath" "github.com/go-kit/kit/log" - "github.com/prometheus/prometheus/pkg/labels" + "github.com/prometheus/prometheus/storage" ) var ErrInvalidTimes = fmt.Errorf("max time is lesser than min time") -type MetricSample struct { - TimestampMs int64 - Value float64 - Labels labels.Labels -} - -// CreateHead creates a TSDB writer head to write the sample data to. -func CreateHead(samples []*MetricSample, chunkRange int64, chunkDir string, logger log.Logger) (*Head, error) { - head, err := NewHead(nil, logger, nil, chunkRange, chunkDir, nil, DefaultStripeSize, nil) - - if err != nil { - return nil, err - } - app := head.Appender(context.TODO()) - for _, sample := range samples { - _, err = app.Add(sample.Labels, sample.TimestampMs, sample.Value) - if err != nil { - return nil, err - } - } - err = app.Commit() - if err != nil { - return nil, err - } - return head, nil -} - // CreateBlock creates a chunkrange block from the samples passed to it, and writes it to disk. -func CreateBlock(samples []*MetricSample, dir string, mint, maxt int64, logger log.Logger) (string, error) { - chunkRange := maxt - mint +func CreateBlock(series []storage.Series, dir string, chunkRange int64, logger log.Logger) (string, error) { if chunkRange == 0 { chunkRange = DefaultBlockDuration } if chunkRange < 0 { return "", ErrInvalidTimes } - chunkDir := filepath.Join(dir, "chunks_tmp") - defer func() { - os.RemoveAll(chunkDir) - }() - head, err := CreateHead(samples, chunkRange, chunkDir, logger) + + w, err := NewBlockWriter(logger, dir, chunkRange) if err != nil { return "", err } - defer head.Close() + defer func() { + if err := w.Close(); err != nil { + logger.Log("err closing blockwriter", err.Error()) + } + }() - compactor, err := NewLeveledCompactor(context.Background(), nil, logger, ExponentialBlockRanges(DefaultBlockDuration, 3, 5), nil) - if err != nil { - return "", err + ctx := context.Background() + app := w.Appender(ctx) + + for _, s := range series { + ref := uint64(0) + it := s.Iterator() + for it.Next() { + t, v := it.At() + if ref != 0 { + if err := app.AddFast(ref, t, v); err == nil { + continue + } + } + ref, err = app.Add(s.Labels(), t, v) + if err != nil { + return "", err + } + } + if it.Err() != nil { + return "", it.Err() + } } - err = os.MkdirAll(dir, 0777) - if err != nil { + if err = app.Commit(); err != nil { return "", err } - ulid, err := compactor.Write(dir, head, mint, maxt, nil) + ulid, err := w.Flush(ctx) if err != nil { return "", err }