mirror of https://github.com/prometheus/prometheus
Use ChunkMeta in SeriesWriter
parent
c00d17e691
commit
5d7ec06e04
18
head.go
18
head.go
|
@ -269,13 +269,19 @@ func (h *HeadBlock) persist(p string) (int64, error) {
|
|||
}
|
||||
|
||||
iw := newIndexWriter(xf)
|
||||
sw := newSeriesWriter(sf, iw, h.stats.MinTime)
|
||||
sw := newSeriesWriter(sf, iw)
|
||||
|
||||
defer sw.Close()
|
||||
defer iw.Close()
|
||||
|
||||
for ref, cd := range h.descs {
|
||||
if err := sw.WriteSeries(uint32(ref), cd.lset, []*chunkDesc{cd}); err != nil {
|
||||
if err := sw.WriteSeries(uint32(ref), cd.lset, []ChunkMeta{
|
||||
{
|
||||
MinTime: cd.firsTimestamp,
|
||||
MaxTime: cd.lastTimestamp,
|
||||
Chunk: cd.chunk,
|
||||
},
|
||||
}); err != nil {
|
||||
return 0, err
|
||||
}
|
||||
}
|
||||
|
@ -299,6 +305,14 @@ func (h *HeadBlock) persist(p string) (int64, error) {
|
|||
return 0, err
|
||||
}
|
||||
}
|
||||
// Write a postings list containing all series.
|
||||
all := make([]uint32, len(h.descs))
|
||||
for i := range all {
|
||||
all[i] = uint32(i)
|
||||
}
|
||||
if err := iw.WritePostings("", "", newListPostings(all)); err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
// Everything written successfully, we can remove the WAL.
|
||||
if err := h.wal.Close(); err != nil {
|
||||
|
|
48
writer.go
48
writer.go
|
@ -10,6 +10,7 @@ import (
|
|||
"strings"
|
||||
|
||||
"github.com/bradfitz/slice"
|
||||
"github.com/fabxc/tsdb/chunks"
|
||||
"github.com/fabxc/tsdb/labels"
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
@ -27,7 +28,7 @@ type SeriesWriter interface {
|
|||
// WriteSeries writes the time series data chunks for a single series.
|
||||
// The reference is used to resolve the correct series in the written index.
|
||||
// It only has to be valid for the duration of the write.
|
||||
WriteSeries(ref uint32, l labels.Labels, cds []*chunkDesc) error
|
||||
WriteSeries(ref uint32, l labels.Labels, chunks []ChunkMeta) error
|
||||
|
||||
// Size returns the size of the data written so far.
|
||||
Size() int64
|
||||
|
@ -44,16 +45,14 @@ type seriesWriter struct {
|
|||
n int64
|
||||
c int
|
||||
|
||||
baseTimestamp int64
|
||||
index IndexWriter
|
||||
index IndexWriter
|
||||
}
|
||||
|
||||
func newSeriesWriter(w io.Writer, index IndexWriter, base int64) *seriesWriter {
|
||||
func newSeriesWriter(w io.Writer, index IndexWriter) *seriesWriter {
|
||||
return &seriesWriter{
|
||||
w: w,
|
||||
n: 0,
|
||||
index: index,
|
||||
baseTimestamp: base,
|
||||
w: w,
|
||||
n: 0,
|
||||
index: index,
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -72,7 +71,7 @@ func (w *seriesWriter) writeMeta() error {
|
|||
return w.write(w.w, b[:])
|
||||
}
|
||||
|
||||
func (w *seriesWriter) WriteSeries(ref uint32, lset labels.Labels, chks []*chunkDesc) error {
|
||||
func (w *seriesWriter) WriteSeries(ref uint32, lset labels.Labels, chks []ChunkMeta) error {
|
||||
// Initialize with meta data.
|
||||
if w.n == 0 {
|
||||
if err := w.writeMeta(); err != nil {
|
||||
|
@ -96,25 +95,23 @@ func (w *seriesWriter) WriteSeries(ref uint32, lset labels.Labels, chks []*chunk
|
|||
return err
|
||||
}
|
||||
|
||||
metas := make([]ChunkMeta, 0, len(chks))
|
||||
for i := range chks {
|
||||
chk := &chks[i]
|
||||
|
||||
for _, cd := range chks {
|
||||
metas = append(metas, ChunkMeta{
|
||||
MinTime: cd.firsTimestamp,
|
||||
MaxTime: cd.lastTimestamp,
|
||||
Ref: uint32(w.n),
|
||||
})
|
||||
n = binary.PutUvarint(b[:], uint64(len(cd.chunk.Bytes())))
|
||||
chk.Ref = uint32(w.n)
|
||||
|
||||
n = binary.PutUvarint(b[:], uint64(len(chk.Chunk.Bytes())))
|
||||
|
||||
if err := w.write(wr, b[:n]); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := w.write(wr, []byte{byte(cd.chunk.Encoding())}); err != nil {
|
||||
if err := w.write(wr, []byte{byte(chk.Chunk.Encoding())}); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := w.write(wr, cd.chunk.Bytes()); err != nil {
|
||||
if err := w.write(wr, chk.Chunk.Bytes()); err != nil {
|
||||
return err
|
||||
}
|
||||
chk.Chunk = nil
|
||||
}
|
||||
|
||||
if err := w.write(w.w, h.Sum(nil)); err != nil {
|
||||
|
@ -122,7 +119,7 @@ func (w *seriesWriter) WriteSeries(ref uint32, lset labels.Labels, chks []*chunk
|
|||
}
|
||||
|
||||
if w.index != nil {
|
||||
w.index.AddSeries(ref, lset, metas...)
|
||||
w.index.AddSeries(ref, lset, chks...)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
@ -144,10 +141,15 @@ func (w *seriesWriter) Close() error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// ChunkMeta holds information about a chunk of data.
|
||||
type ChunkMeta struct {
|
||||
Ref uint32
|
||||
MinTime int64
|
||||
MaxTime int64
|
||||
// Ref and Chunk hold either a reference that can be used to retrieve
|
||||
// chunk data or the data itself.
|
||||
// Generally, only one of them is set.
|
||||
Ref uint32
|
||||
Chunk chunks.Chunk
|
||||
|
||||
MinTime, MaxTime int64 // time range the data covers
|
||||
}
|
||||
|
||||
// IndexWriter serialized the index for a block of series data.
|
||||
|
|
Loading…
Reference in New Issue