
664 lines
17 KiB
Raw Normal View History

// Copyright 2014 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
package local
import (
clientmodel ""
var (
m1 = clientmodel.Metric{"label": "value1"}
m2 = clientmodel.Metric{"label": "value2"}
m3 = clientmodel.Metric{"label": "value3"}
2015-03-04 12:40:18 +00:00
func newTestPersistence(t *testing.T, chunkType byte) (*persistence, test.Closer) {
dir := test.NewTemporaryDirectory("test_persistence", t)
2015-03-04 12:40:18 +00:00
p, err := newPersistence(dir.Path(), chunkType, false)
if err != nil {
return p, test.NewCallbackCloser(func() {
2015-03-04 12:40:18 +00:00
func buildTestChunks(chunkType byte) map[clientmodel.Fingerprint][]chunk {
fps := clientmodel.Fingerprints{
fpToChunks := map[clientmodel.Fingerprint][]chunk{}
for _, fp := range fps {
fpToChunks[fp] = make([]chunk, 0, 10)
for i := 0; i < 10; i++ {
2015-03-04 12:40:18 +00:00
fpToChunks[fp] = append(fpToChunks[fp], chunkForType(chunkType).add(&metric.SamplePair{
Timestamp: clientmodel.Timestamp(i),
Value: clientmodel.SampleValue(fp),
return fpToChunks
func chunksEqual(c1, c2 chunk) bool {
values2 := c2.values()
for v1 := range c1.values() {
v2 := <-values2
if !v1.Equal(v2) {
return false
return true
2015-03-04 12:40:18 +00:00
func testPersistLoadDropChunks(t *testing.T, chunkType byte) {
p, closer := newTestPersistence(t, chunkType)
defer closer.Close()
2015-03-04 12:40:18 +00:00
fpToChunks := buildTestChunks(chunkType)
for fp, chunks := range fpToChunks {
for i, c := range chunks {
index, err := p.persistChunks(fp, []chunk{c})
if err != nil {
if i != index {
t.Errorf("Want chunk index %d, got %d.", i, index)
for fp, expectedChunks := range fpToChunks {
indexes := make([]int, 0, len(expectedChunks))
for i := range expectedChunks {
indexes = append(indexes, i)
actualChunks, err := p.loadChunks(fp, indexes, 0)
if err != nil {
for _, i := range indexes {
if !chunksEqual(expectedChunks[i], actualChunks[i]) {
t.Errorf("%d. Chunks not equal.", i)
// Load all chunk descs.
actualChunkDescs, err := p.loadChunkDescs(fp, 10)
if len(actualChunkDescs) != 10 {
t.Errorf("Got %d chunkDescs, want %d.", len(actualChunkDescs), 10)
for i, cd := range actualChunkDescs {
if cd.firstTime() != clientmodel.Timestamp(i) || cd.lastTime() != clientmodel.Timestamp(i) {
"Want ts=%v, got firstTime=%v, lastTime=%v.",
i, cd.firstTime(), cd.lastTime(),
// Load chunk descs partially.
actualChunkDescs, err = p.loadChunkDescs(fp, 5)
if len(actualChunkDescs) != 5 {
t.Errorf("Got %d chunkDescs, want %d.", len(actualChunkDescs), 5)
for i, cd := range actualChunkDescs {
if cd.firstTime() != clientmodel.Timestamp(i) || cd.lastTime() != clientmodel.Timestamp(i) {
"Want ts=%v, got firstTime=%v, lastTime=%v.",
i, cd.firstTime(), cd.lastTime(),
// Drop half of the chunks.
for fp, expectedChunks := range fpToChunks {
firstTime, numDropped, allDropped, err := p.dropChunks(fp, 5)
if err != nil {
if firstTime != 5 {
t.Errorf("want first time 5, got %d", firstTime)
if numDropped != 5 {
t.Errorf("want 5 dropped chunks, got %v", numDropped)
if allDropped {
t.Error("all chunks dropped")
indexes := make([]int, 5)
for i := range indexes {
indexes[i] = i
actualChunks, err := p.loadChunks(fp, indexes, 0)
if err != nil {
for _, i := range indexes {
if !chunksEqual(expectedChunks[i+5], actualChunks[i]) {
t.Errorf("%d. Chunks not equal.", i)
// Drop all the chunks.
for fp := range fpToChunks {
firstTime, numDropped, allDropped, err := p.dropChunks(fp, 100)
if firstTime != 0 {
t.Errorf("want first time 0, got %d", firstTime)
if err != nil {
if numDropped != 5 {
t.Errorf("want 5 dropped chunks, got %v", numDropped)
if !allDropped {
t.Error("not all chunks dropped")
2015-03-04 12:40:18 +00:00
func TestPersistLoadDropChunksType0(t *testing.T) {
testPersistLoadDropChunks(t, 0)
func TestPersistLoadDropChunksType1(t *testing.T) {
testPersistLoadDropChunks(t, 1)
func testCheckpointAndLoadSeriesMapAndHeads(t *testing.T, chunkType byte) {
p, closer := newTestPersistence(t, chunkType)
defer closer.Close()
fpLocker := newFingerprintLocker(10)
sm := newSeriesMap()
2015-03-04 12:40:18 +00:00
s1 := newMemorySeries(m1, true, 0, chunkType)
s2 := newMemorySeries(m2, false, 0, chunkType)
s3 := newMemorySeries(m3, false, 0, chunkType)
s1.add(m1.Fingerprint(), &metric.SamplePair{Timestamp: 1, Value: 3.14})
s3.add(m1.Fingerprint(), &metric.SamplePair{Timestamp: 2, Value: 2.7})
s3.headChunkPersisted = true
sm.put(m1.Fingerprint(), s1)
sm.put(m2.Fingerprint(), s2)
sm.put(m3.Fingerprint(), s3)
if err := p.checkpointSeriesMapAndHeads(sm, fpLocker); err != nil {
loadedSM, err := p.loadSeriesMapAndHeads()
if err != nil {
if loadedSM.length() != 2 {
t.Errorf("want 2 series in map, got %d", loadedSM.length())
if loadedS1, ok := loadedSM.get(m1.Fingerprint()); ok {
if !reflect.DeepEqual(loadedS1.metric, m1) {
t.Errorf("want metric %v, got %v", m1, loadedS1.metric)
if !reflect.DeepEqual(loadedS1.head().chunk, s1.head().chunk) {
t.Error("head chunks differ")
if loadedS1.chunkDescsOffset != 0 {
t.Errorf("want chunkDescsOffset 0, got %d", loadedS1.chunkDescsOffset)
if loadedS1.headChunkPersisted {
t.Error("headChunkPersisted is true")
} else {
t.Errorf("couldn't find %v in loaded map", m1)
if loadedS3, ok := loadedSM.get(m3.Fingerprint()); ok {
if !reflect.DeepEqual(loadedS3.metric, m3) {
t.Errorf("want metric %v, got %v", m3, loadedS3.metric)
if loadedS3.head().chunk != nil {
t.Error("head chunk not evicted")
if loadedS3.chunkDescsOffset != -1 {
t.Errorf("want chunkDescsOffset -1, got %d", loadedS3.chunkDescsOffset)
if !loadedS3.headChunkPersisted {
t.Error("headChunkPersisted is false")
} else {
t.Errorf("couldn't find %v in loaded map", m1)
2015-03-04 12:40:18 +00:00
func TestCheckpointAndLoadSeriesMapAndHeadsChunkType0(t *testing.T) {
testCheckpointAndLoadSeriesMapAndHeads(t, 0)
func TestCheckpointAndLoadSeriesMapAndHeadsChunkType1(t *testing.T) {
testCheckpointAndLoadSeriesMapAndHeads(t, 1)
func testGetFingerprintsModifiedBefore(t *testing.T, chunkType byte) {
p, closer := newTestPersistence(t, chunkType)
defer closer.Close()
m1 := clientmodel.Metric{"n1": "v1"}
m2 := clientmodel.Metric{"n2": "v2"}
m3 := clientmodel.Metric{"n1": "v2"}
p.archiveMetric(1, m1, 2, 4)
p.archiveMetric(2, m2, 1, 6)
p.archiveMetric(3, m3, 5, 5)
expectedFPs := map[clientmodel.Timestamp][]clientmodel.Fingerprint{
0: {},
1: {},
2: {2},
3: {1, 2},
4: {1, 2},
5: {1, 2},
6: {1, 2, 3},
for ts, want := range expectedFPs {
got, err := p.getFingerprintsModifiedBefore(ts)
if err != nil {
if !reflect.DeepEqual(want, got) {
t.Errorf("timestamp: %v, want FPs %v, got %v", ts, want, got)
unarchived, firstTime, err := p.unarchiveMetric(1)
if err != nil {
if !unarchived {
t.Fatal("expected actual unarchival")
if firstTime != 2 {
t.Errorf("expected first time 2, got %v", firstTime)
unarchived, firstTime, err = p.unarchiveMetric(1)
if err != nil {
if unarchived {
t.Fatal("expected no unarchival")
expectedFPs = map[clientmodel.Timestamp][]clientmodel.Fingerprint{
0: {},
1: {},
2: {2},
3: {2},
4: {2},
5: {2},
6: {2, 3},
for ts, want := range expectedFPs {
got, err := p.getFingerprintsModifiedBefore(ts)
if err != nil {
if !reflect.DeepEqual(want, got) {
t.Errorf("timestamp: %v, want FPs %v, got %v", ts, want, got)
2015-03-04 12:40:18 +00:00
func TestGetFingerprintsModifiedBeforeChunkType0(t *testing.T) {
testGetFingerprintsModifiedBefore(t, 0)
func TestGetFingerprintsModifiedBeforeChunkType1(t *testing.T) {
testGetFingerprintsModifiedBefore(t, 1)
func testDropArchivedMetric(t *testing.T, chunkType byte) {
p, closer := newTestPersistence(t, chunkType)
defer closer.Close()
m1 := clientmodel.Metric{"n1": "v1"}
m2 := clientmodel.Metric{"n2": "v2"}
p.archiveMetric(1, m1, 2, 4)
p.archiveMetric(2, m2, 1, 6)
p.indexMetric(1, m1)
p.indexMetric(2, m2)
outFPs, err := p.getFingerprintsForLabelPair(metric.LabelPair{Name: "n1", Value: "v1"})
if err != nil {
want := clientmodel.Fingerprints{1}
if !reflect.DeepEqual(outFPs, want) {
t.Errorf("want %#v, got %#v", want, outFPs)
outFPs, err = p.getFingerprintsForLabelPair(metric.LabelPair{Name: "n2", Value: "v2"})
if err != nil {
want = clientmodel.Fingerprints{2}
if !reflect.DeepEqual(outFPs, want) {
t.Errorf("want %#v, got %#v", want, outFPs)
if archived, _, _, err := p.hasArchivedMetric(1); err != nil || !archived {
t.Error("want FP 1 archived")
if archived, _, _, err := p.hasArchivedMetric(2); err != nil || !archived {
t.Error("want FP 2 archived")
if err != p.purgeArchivedMetric(1) {
if err != p.purgeArchivedMetric(3) {
// Purging something that has not beet archived is not an error.
outFPs, err = p.getFingerprintsForLabelPair(metric.LabelPair{Name: "n1", Value: "v1"})
if err != nil {
want = nil
if !reflect.DeepEqual(outFPs, want) {
t.Errorf("want %#v, got %#v", want, outFPs)
outFPs, err = p.getFingerprintsForLabelPair(metric.LabelPair{Name: "n2", Value: "v2"})
if err != nil {
want = clientmodel.Fingerprints{2}
if !reflect.DeepEqual(outFPs, want) {
t.Errorf("want %#v, got %#v", want, outFPs)
if archived, _, _, err := p.hasArchivedMetric(1); err != nil || archived {
t.Error("want FP 1 not archived")
if archived, _, _, err := p.hasArchivedMetric(2); err != nil || !archived {
t.Error("want FP 2 archived")
2015-03-04 12:40:18 +00:00
func TestDropArchivedMetricChunkType0(t *testing.T) {
testDropArchivedMetric(t, 0)
func TestDropArchivedMetricChunkType1(t *testing.T) {
testDropArchivedMetric(t, 1)
type incrementalBatch struct {
fpToMetric index.FingerprintMetricMapping
expectedLnToLvs index.LabelNameLabelValuesMapping
expectedLpToFps index.LabelPairFingerprintsMapping
2015-03-04 12:40:18 +00:00
func testIndexing(t *testing.T, chunkType byte) {
batches := []incrementalBatch{
fpToMetric: index.FingerprintMetricMapping{
0: {
clientmodel.MetricNameLabel: "metric_0",
"label_1": "value_1",
1: {
clientmodel.MetricNameLabel: "metric_0",
"label_2": "value_2",
"label_3": "value_3",
2: {
clientmodel.MetricNameLabel: "metric_1",
"label_1": "value_2",
expectedLnToLvs: index.LabelNameLabelValuesMapping{
clientmodel.MetricNameLabel: codable.LabelValueSet{
"metric_0": struct{}{},
"metric_1": struct{}{},
"label_1": codable.LabelValueSet{
"value_1": struct{}{},
"value_2": struct{}{},
"label_2": codable.LabelValueSet{
"value_2": struct{}{},
"label_3": codable.LabelValueSet{
"value_3": struct{}{},
expectedLpToFps: index.LabelPairFingerprintsMapping{
Name: clientmodel.MetricNameLabel,
Value: "metric_0",
}: codable.FingerprintSet{0: struct{}{}, 1: struct{}{}},
Name: clientmodel.MetricNameLabel,
Value: "metric_1",
}: codable.FingerprintSet{2: struct{}{}},
Name: "label_1",
Value: "value_1",
}: codable.FingerprintSet{0: struct{}{}},
Name: "label_1",
Value: "value_2",
}: codable.FingerprintSet{2: struct{}{}},
Name: "label_2",
Value: "value_2",
}: codable.FingerprintSet{1: struct{}{}},
Name: "label_3",
Value: "value_3",
}: codable.FingerprintSet{1: struct{}{}},
}, {
fpToMetric: index.FingerprintMetricMapping{
3: {
clientmodel.MetricNameLabel: "metric_0",
"label_1": "value_3",
4: {
clientmodel.MetricNameLabel: "metric_2",
"label_2": "value_2",
"label_3": "value_1",
5: {
clientmodel.MetricNameLabel: "metric_1",
"label_1": "value_3",
expectedLnToLvs: index.LabelNameLabelValuesMapping{
clientmodel.MetricNameLabel: codable.LabelValueSet{
"metric_0": struct{}{},
"metric_1": struct{}{},
"metric_2": struct{}{},
"label_1": codable.LabelValueSet{
"value_1": struct{}{},
"value_2": struct{}{},
"value_3": struct{}{},
"label_2": codable.LabelValueSet{
"value_2": struct{}{},
"label_3": codable.LabelValueSet{
"value_1": struct{}{},
"value_3": struct{}{},
expectedLpToFps: index.LabelPairFingerprintsMapping{
Name: clientmodel.MetricNameLabel,
Value: "metric_0",
}: codable.FingerprintSet{0: struct{}{}, 1: struct{}{}, 3: struct{}{}},
Name: clientmodel.MetricNameLabel,
Value: "metric_1",
}: codable.FingerprintSet{2: struct{}{}, 5: struct{}{}},
Name: clientmodel.MetricNameLabel,
Value: "metric_2",
}: codable.FingerprintSet{4: struct{}{}},
Name: "label_1",
Value: "value_1",
}: codable.FingerprintSet{0: struct{}{}},
Name: "label_1",
Value: "value_2",
}: codable.FingerprintSet{2: struct{}{}},
Name: "label_1",
Value: "value_3",
}: codable.FingerprintSet{3: struct{}{}, 5: struct{}{}},
Name: "label_2",
Value: "value_2",
}: codable.FingerprintSet{1: struct{}{}, 4: struct{}{}},
Name: "label_3",
Value: "value_1",
}: codable.FingerprintSet{4: struct{}{}},
Name: "label_3",
Value: "value_3",
}: codable.FingerprintSet{1: struct{}{}},
2015-03-04 12:40:18 +00:00
p, closer := newTestPersistence(t, chunkType)
defer closer.Close()
indexedFpsToMetrics := index.FingerprintMetricMapping{}
for i, b := range batches {
for fp, m := range b.fpToMetric {
p.indexMetric(fp, m)
if err := p.archiveMetric(fp, m, 1, 2); err != nil {
indexedFpsToMetrics[fp] = m
verifyIndexedState(i, t, b, indexedFpsToMetrics, p)
for i := len(batches) - 1; i >= 0; i-- {
b := batches[i]
verifyIndexedState(i, t, batches[i], indexedFpsToMetrics, p)
for fp, m := range b.fpToMetric {
p.unindexMetric(fp, m)
unarchived, firstTime, err := p.unarchiveMetric(fp)
if err != nil {
if !unarchived {
t.Errorf("%d. metric not unarchived", i)
if firstTime != 1 {
t.Errorf("%d. expected firstTime=1, got %v", i, firstTime)
delete(indexedFpsToMetrics, fp)
2015-03-04 12:40:18 +00:00
func TestIndexingChunkType0(t *testing.T) {
testIndexing(t, 0)
2015-03-04 12:40:18 +00:00
func TestIndexingChunkType1(t *testing.T) {
testIndexing(t, 1)
func verifyIndexedState(i int, t *testing.T, b incrementalBatch, indexedFpsToMetrics index.FingerprintMetricMapping, p *persistence) {
for fp, m := range indexedFpsToMetrics {
// Compare archived metrics with input metrics.
mOut, err := p.getArchivedMetric(fp)
if err != nil {
if !mOut.Equal(m) {
t.Errorf("%d. %v: Got: %s; want %s", i, fp, mOut, m)
// Check that archived metrics are in membership index.
has, first, last, err := p.hasArchivedMetric(fp)
if err != nil {
if !has {
t.Errorf("%d. fingerprint %v not found", i, fp)
if first != 1 || last != 2 {
"%d. %v: Got first: %d, last %d; want first: %d, last %d",
i, fp, first, last, 1, 2,
// Compare label name -> label values mappings.
for ln, lvs := range b.expectedLnToLvs {
outLvs, err := p.getLabelValuesForLabelName(ln)
if err != nil {
outSet := codable.LabelValueSet{}
for _, lv := range outLvs {
outSet[lv] = struct{}{}
if !reflect.DeepEqual(lvs, outSet) {
t.Errorf("%d. label values don't match. Got: %v; want %v", i, outSet, lvs)
// Compare label pair -> fingerprints mappings.
for lp, fps := range b.expectedLpToFps {
outFPs, err := p.getFingerprintsForLabelPair(lp)
if err != nil {
outSet := codable.FingerprintSet{}
for _, fp := range outFPs {
outSet[fp] = struct{}{}
if !reflect.DeepEqual(fps, outSet) {
t.Errorf("%d. %v: fingerprints don't match. Got: %v; want %v", i, lp, outSet, fps)