diff --git a/main.go b/main.go index 3a56e040d..753bf916e 100644 --- a/main.go +++ b/main.go @@ -131,14 +131,12 @@ func main() { select { case scrapeResult := <-scrapeResults: if scrapeResult.Err == nil { - ts.AppendSample(scrapeResult.Sample) + ts.AppendSamples(scrapeResult.Samples) } case ruleResult := <-ruleResults: if ruleResult.Err == nil { - for _, sample := range ruleResult.Samples { - ts.AppendSample(sample) - } + ts.AppendSamples(ruleResult.Samples) } } } diff --git a/retrieval/format/processor0_0_1.go b/retrieval/format/processor0_0_1.go index 043736909..59e847521 100644 --- a/retrieval/format/processor0_0_1.go +++ b/retrieval/format/processor0_0_1.go @@ -75,6 +75,7 @@ func (p *processor001) Process(stream io.ReadCloser, timestamp time.Time, baseLa } // TODO(matt): This outer loop is a great basis for parallelization. + pendingSamples := model.Samples{} for _, entity := range entities { for _, value := range entity.Metric.Value { metric := model.Metric{} @@ -95,19 +96,15 @@ func (p *processor001) Process(stream io.ReadCloser, timestamp time.Time, baseLa sampleValue, ok := value.Value.(float64) if !ok { err = fmt.Errorf("Could not convert value from %s %s to float64.", entity, value) + results <- Result{Err: err} continue } - sample := model.Sample{ + pendingSamples = append(pendingSamples, model.Sample{ Metric: metric, Timestamp: timestamp, Value: model.SampleValue(sampleValue), - } - - results <- Result{ - Err: err, - Sample: sample, - } + }) break @@ -115,6 +112,7 @@ func (p *processor001) Process(stream io.ReadCloser, timestamp time.Time, baseLa sampleValue, ok := value.Value.(map[string]interface{}) if !ok { err = fmt.Errorf("Could not convert value from %q to a map[string]interface{}.", value.Value) + results <- Result{Err: err} continue } @@ -122,6 +120,7 @@ func (p *processor001) Process(stream io.ReadCloser, timestamp time.Time, baseLa individualValue, ok := percentileValue.(float64) if !ok { err = fmt.Errorf("Could not convert value from %q to a float64.", percentileValue) + results <- Result{Err: err} continue } @@ -133,16 +132,11 @@ func (p *processor001) Process(stream io.ReadCloser, timestamp time.Time, baseLa childMetric[model.LabelName(percentile001)] = model.LabelValue(percentile) - sample := model.Sample{ + pendingSamples = append(pendingSamples, model.Sample{ Metric: childMetric, Timestamp: timestamp, Value: model.SampleValue(individualValue), - } - - results <- Result{ - Err: err, - Sample: sample, - } + }) } break @@ -150,6 +144,9 @@ func (p *processor001) Process(stream io.ReadCloser, timestamp time.Time, baseLa } } } + if len(pendingSamples) > 0 { + results <- Result{Samples: pendingSamples} + } return } diff --git a/retrieval/format/processor0_0_1_test.go b/retrieval/format/processor0_0_1_test.go index 3bf3d16ba..914bc39ec 100644 --- a/retrieval/format/processor0_0_1_test.go +++ b/retrieval/format/processor0_0_1_test.go @@ -27,138 +27,91 @@ import ( func testProcessor001Process(t test.Tester) { var scenarios = []struct { in string - out []Result + out model.Samples err error }{ { err: fmt.Errorf("unexpected end of JSON input"), }, { - in: "[{\"baseLabels\":{\"name\":\"rpc_calls_total\"},\"docstring\":\"RPC calls.\",\"metric\":{\"type\":\"counter\",\"value\":[{\"labels\":{\"service\":\"zed\"},\"value\":25},{\"labels\":{\"service\":\"bar\"},\"value\":25},{\"labels\":{\"service\":\"foo\"},\"value\":25}]}},{\"baseLabels\":{\"name\":\"rpc_latency_microseconds\"},\"docstring\":\"RPC latency.\",\"metric\":{\"type\":\"histogram\",\"value\":[{\"labels\":{\"service\":\"foo\"},\"value\":{\"0.010000\":15.890724674774395,\"0.050000\":15.890724674774395,\"0.500000\":84.63044031436561,\"0.900000\":160.21100853053224,\"0.990000\":172.49828748957728}},{\"labels\":{\"service\":\"zed\"},\"value\":{\"0.010000\":0.0459814091918713,\"0.050000\":0.0459814091918713,\"0.500000\":0.6120456642749681,\"0.900000\":1.355915069887731,\"0.990000\":1.772733213161236}},{\"labels\":{\"service\":\"bar\"},\"value\":{\"0.010000\":78.48563317257356,\"0.050000\":78.48563317257356,\"0.500000\":97.31798360385088,\"0.900000\":109.89202084295582,\"0.990000\":109.99626121011262}}]}}]", - out: []Result{ - { - Sample: model.Sample{ - Metric: model.Metric{"service": "zed", model.MetricNameLabel: "rpc_calls_total"}, - Value: 25, - }, + in: `[{"baseLabels":{"name":"rpc_calls_total"},"docstring":"RPC calls.","metric":{"type":"counter","value":[{"labels":{"service":"zed"},"value":25},{"labels":{"service":"bar"},"value":25},{"labels":{"service":"foo"},"value":25}]}},{"baseLabels":{"name":"rpc_latency_microseconds"},"docstring":"RPC latency.","metric":{"type":"histogram","value":[{"labels":{"service":"foo"},"value":{"0.010000":15.890724674774395,"0.050000":15.890724674774395,"0.500000":84.63044031436561,"0.900000":160.21100853053224,"0.990000":172.49828748957728}},{"labels":{"service":"zed"},"value":{"0.010000":0.0459814091918713,"0.050000":0.0459814091918713,"0.500000":0.6120456642749681,"0.900000":1.355915069887731,"0.990000":1.772733213161236}},{"labels":{"service":"bar"},"value":{"0.010000":78.48563317257356,"0.050000":78.48563317257356,"0.500000":97.31798360385088,"0.900000":109.89202084295582,"0.990000":109.99626121011262}}]}}]`, + out: model.Samples{ + model.Sample{ + Metric: model.Metric{"service": "zed", model.MetricNameLabel: "rpc_calls_total"}, + Value: 25, }, - { - Sample: model.Sample{ - Metric: model.Metric{"service": "bar", model.MetricNameLabel: "rpc_calls_total"}, - Value: 25, - }, + model.Sample{ + Metric: model.Metric{"service": "bar", model.MetricNameLabel: "rpc_calls_total"}, + Value: 25, }, - { - Sample: model.Sample{ - - Metric: model.Metric{"service": "foo", model.MetricNameLabel: "rpc_calls_total"}, - Value: 25, - }, + model.Sample{ + Metric: model.Metric{"service": "foo", model.MetricNameLabel: "rpc_calls_total"}, + Value: 25, }, - { - Sample: model.Sample{ - - Metric: model.Metric{"percentile": "0.010000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "zed"}, - Value: 0.0459814091918713, - }, + model.Sample{ + Metric: model.Metric{"percentile": "0.010000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "zed"}, + Value: 0.0459814091918713, }, - { - Sample: model.Sample{ - - Metric: model.Metric{"percentile": "0.010000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "bar"}, - Value: 78.48563317257356, - }, + model.Sample{ + Metric: model.Metric{"percentile": "0.010000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "bar"}, + Value: 78.48563317257356, }, - { - Sample: model.Sample{ - - Metric: model.Metric{"percentile": "0.010000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "foo"}, - Value: 15.890724674774395, - }, + model.Sample{ + Metric: model.Metric{"percentile": "0.010000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "foo"}, + Value: 15.890724674774395, }, - { - Sample: model.Sample{ + model.Sample{ - Metric: model.Metric{"percentile": "0.050000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "zed"}, - Value: 0.0459814091918713, - }, + Metric: model.Metric{"percentile": "0.050000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "zed"}, + Value: 0.0459814091918713, }, - { - Sample: model.Sample{ - - Metric: model.Metric{"percentile": "0.050000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "bar"}, - Value: 78.48563317257356, - }, + model.Sample{ + Metric: model.Metric{"percentile": "0.050000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "bar"}, + Value: 78.48563317257356, }, - { - Sample: model.Sample{ + model.Sample{ - Metric: model.Metric{"percentile": "0.050000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "foo"}, - Value: 15.890724674774395, - }, + Metric: model.Metric{"percentile": "0.050000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "foo"}, + Value: 15.890724674774395, }, - { - Sample: model.Sample{ - - Metric: model.Metric{"percentile": "0.500000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "zed"}, - Value: 0.6120456642749681, - }, + model.Sample{ + Metric: model.Metric{"percentile": "0.500000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "zed"}, + Value: 0.6120456642749681, }, - { - Sample: model.Sample{ + model.Sample{ - Metric: model.Metric{"percentile": "0.500000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "bar"}, - Value: 97.31798360385088, - }, + Metric: model.Metric{"percentile": "0.500000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "bar"}, + Value: 97.31798360385088, }, - { - Sample: model.Sample{ - - Metric: model.Metric{"percentile": "0.500000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "foo"}, - Value: 84.63044031436561, - }, + model.Sample{ + Metric: model.Metric{"percentile": "0.500000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "foo"}, + Value: 84.63044031436561, }, - { - Sample: model.Sample{ + model.Sample{ - Metric: model.Metric{"percentile": "0.900000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "zed"}, - Value: 1.355915069887731, - }, + Metric: model.Metric{"percentile": "0.900000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "zed"}, + Value: 1.355915069887731, }, - { - Sample: model.Sample{ - - Metric: model.Metric{"percentile": "0.900000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "bar"}, - Value: 109.89202084295582, - }, + model.Sample{ + Metric: model.Metric{"percentile": "0.900000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "bar"}, + Value: 109.89202084295582, }, - { - Sample: model.Sample{ - - Metric: model.Metric{"percentile": "0.900000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "foo"}, - Value: 160.21100853053224, - }, + model.Sample{ + Metric: model.Metric{"percentile": "0.900000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "foo"}, + Value: 160.21100853053224, }, - { - Sample: model.Sample{ - - Metric: model.Metric{"percentile": "0.990000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "zed"}, - Value: 1.772733213161236, - }, + model.Sample{ + Metric: model.Metric{"percentile": "0.990000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "zed"}, + Value: 1.772733213161236, }, - { - Sample: model.Sample{ + model.Sample{ - Metric: model.Metric{"percentile": "0.990000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "bar"}, - Value: 109.99626121011262, - }, + Metric: model.Metric{"percentile": "0.990000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "bar"}, + Value: 109.99626121011262, }, - { - Sample: model.Sample{ - - Metric: model.Metric{"percentile": "0.990000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "foo"}, - Value: 172.49828748957728, - }, + model.Sample{ + Metric: model.Metric{"percentile": "0.990000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "foo"}, + Value: 172.49828748957728, }, }, }, @@ -179,18 +132,14 @@ func testProcessor001Process(t test.Tester) { continue } - if scenario.err != nil && err != nil { - if scenario.err.Error() != err.Error() { - t.Errorf("%d. expected err of %s, got %s", i, scenario.err, err) - } - } else if scenario.err != err { - t.Errorf("%d. expected err of %s, got %s", i, scenario.err, err) - } - - delivered := make([]Result, 0) + delivered := model.Samples{} for len(inputChannel) != 0 { - delivered = append(delivered, <-inputChannel) + result := <-inputChannel + if result.Err != nil { + t.Fatalf("%d. expected no error, got: %s", i, result.Err) + } + delivered = append(delivered, result.Samples...) } if len(delivered) != len(scenario.out) { @@ -209,24 +158,20 @@ func testProcessor001Process(t test.Tester) { found := false for element := expectedElements.Front(); element != nil && found == false; element = element.Next() { - candidate := element.Value.(Result) - - if !test.ErrorEqual(candidate.Err, actual.Err) { - continue - } + candidate := element.Value.(model.Sample) - if candidate.Sample.Value != actual.Sample.Value { + if candidate.Value != actual.Value { continue } - if len(candidate.Sample.Metric) != len(actual.Sample.Metric) { + if len(candidate.Metric) != len(actual.Metric) { continue } labelsMatch := false - for key, value := range candidate.Sample.Metric { - actualValue, ok := actual.Sample.Metric[key] + for key, value := range candidate.Metric { + actualValue, ok := actual.Metric[key] if !ok { break } @@ -246,7 +191,7 @@ func testProcessor001Process(t test.Tester) { } if !found { - t.Errorf("%d.%d. expected to find %s among candidate, absent", i, j, actual.Sample) + t.Errorf("%d.%d. expected to find %s among candidate, absent", i, j, actual) } } } diff --git a/retrieval/format/processor0_0_2.go b/retrieval/format/processor0_0_2.go index 207063f61..c8e279099 100644 --- a/retrieval/format/processor0_0_2.go +++ b/retrieval/format/processor0_0_2.go @@ -51,6 +51,7 @@ var Processor002 ProcessorFunc = func(stream io.ReadCloser, timestamp time.Time, return err } + pendingSamples := model.Samples{} for _, entity := range entities { entityLabels := baseLabels.Merge(LabelSet(entity.BaseLabels)) @@ -68,13 +69,11 @@ var Processor002 ProcessorFunc = func(stream io.ReadCloser, timestamp time.Time, for _, counter := range values { labels := entityLabels.Merge(LabelSet(counter.Labels)) - results <- Result{ - Sample: model.Sample{ - Metric: model.Metric(labels), - Timestamp: timestamp, - Value: counter.Value, - }, - } + pendingSamples = append(pendingSamples, model.Sample{ + Metric: model.Metric(labels), + Timestamp: timestamp, + Value: counter.Value, + }) } case "histogram": @@ -92,13 +91,11 @@ var Processor002 ProcessorFunc = func(stream io.ReadCloser, timestamp time.Time, labels := entityLabels.Merge(LabelSet(histogram.Labels)) labels[model.LabelName("percentile")] = model.LabelValue(percentile) - results <- Result{ - Sample: model.Sample{ - Metric: model.Metric(labels), - Timestamp: timestamp, - Value: value, - }, - } + pendingSamples = append(pendingSamples, model.Sample{ + Metric: model.Metric(labels), + Timestamp: timestamp, + Value: value, + }) } } @@ -109,5 +106,9 @@ var Processor002 ProcessorFunc = func(stream io.ReadCloser, timestamp time.Time, } } + if len(pendingSamples) > 0 { + results <- Result{Samples: pendingSamples} + } + return nil } diff --git a/retrieval/format/processor0_0_2_test.go b/retrieval/format/processor0_0_2_test.go index ece77c7f0..7d6a2d532 100644 --- a/retrieval/format/processor0_0_2_test.go +++ b/retrieval/format/processor0_0_2_test.go @@ -27,7 +27,7 @@ import ( func testProcessor002Process(t test.Tester) { var scenarios = []struct { in string - out []Result + out model.Samples err error }{ { @@ -35,130 +35,83 @@ func testProcessor002Process(t test.Tester) { }, { in: `[{"baseLabels":{"name":"rpc_calls_total"},"docstring":"RPC calls.","metric":{"type":"counter","value":[{"labels":{"service":"zed"},"value":25},{"labels":{"service":"bar"},"value":25},{"labels":{"service":"foo"},"value":25}]}},{"baseLabels":{"name":"rpc_latency_microseconds"},"docstring":"RPC latency.","metric":{"type":"histogram","value":[{"labels":{"service":"foo"},"value":{"0.010000":15.890724674774395,"0.050000":15.890724674774395,"0.500000":84.63044031436561,"0.900000":160.21100853053224,"0.990000":172.49828748957728}},{"labels":{"service":"zed"},"value":{"0.010000":0.0459814091918713,"0.050000":0.0459814091918713,"0.500000":0.6120456642749681,"0.900000":1.355915069887731,"0.990000":1.772733213161236}},{"labels":{"service":"bar"},"value":{"0.010000":78.48563317257356,"0.050000":78.48563317257356,"0.500000":97.31798360385088,"0.900000":109.89202084295582,"0.990000":109.99626121011262}}]}}]`, - out: []Result{ - { - Sample: model.Sample{ - Metric: model.Metric{"service": "zed", model.MetricNameLabel: "rpc_calls_total"}, - Value: 25, - }, + out: model.Samples{ + model.Sample{ + Metric: model.Metric{"service": "zed", model.MetricNameLabel: "rpc_calls_total"}, + Value: 25, }, - { - Sample: model.Sample{ - Metric: model.Metric{"service": "bar", model.MetricNameLabel: "rpc_calls_total"}, - Value: 25, - }, + model.Sample{ + Metric: model.Metric{"service": "bar", model.MetricNameLabel: "rpc_calls_total"}, + Value: 25, }, - { - Sample: model.Sample{ - - Metric: model.Metric{"service": "foo", model.MetricNameLabel: "rpc_calls_total"}, - Value: 25, - }, + model.Sample{ + Metric: model.Metric{"service": "foo", model.MetricNameLabel: "rpc_calls_total"}, + Value: 25, }, - { - Sample: model.Sample{ - - Metric: model.Metric{"percentile": "0.010000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "zed"}, - Value: 0.0459814091918713, - }, + model.Sample{ + Metric: model.Metric{"percentile": "0.010000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "zed"}, + Value: 0.0459814091918713, }, - { - Sample: model.Sample{ - - Metric: model.Metric{"percentile": "0.010000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "bar"}, - Value: 78.48563317257356, - }, + model.Sample{ + Metric: model.Metric{"percentile": "0.010000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "bar"}, + Value: 78.48563317257356, }, - { - Sample: model.Sample{ - - Metric: model.Metric{"percentile": "0.010000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "foo"}, - Value: 15.890724674774395, - }, + model.Sample{ + Metric: model.Metric{"percentile": "0.010000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "foo"}, + Value: 15.890724674774395, }, - { - Sample: model.Sample{ + model.Sample{ - Metric: model.Metric{"percentile": "0.050000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "zed"}, - Value: 0.0459814091918713, - }, + Metric: model.Metric{"percentile": "0.050000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "zed"}, + Value: 0.0459814091918713, }, - { - Sample: model.Sample{ - - Metric: model.Metric{"percentile": "0.050000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "bar"}, - Value: 78.48563317257356, - }, + model.Sample{ + Metric: model.Metric{"percentile": "0.050000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "bar"}, + Value: 78.48563317257356, }, - { - Sample: model.Sample{ + model.Sample{ - Metric: model.Metric{"percentile": "0.050000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "foo"}, - Value: 15.890724674774395, - }, + Metric: model.Metric{"percentile": "0.050000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "foo"}, + Value: 15.890724674774395, }, - { - Sample: model.Sample{ - - Metric: model.Metric{"percentile": "0.500000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "zed"}, - Value: 0.6120456642749681, - }, + model.Sample{ + Metric: model.Metric{"percentile": "0.500000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "zed"}, + Value: 0.6120456642749681, }, - { - Sample: model.Sample{ + model.Sample{ - Metric: model.Metric{"percentile": "0.500000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "bar"}, - Value: 97.31798360385088, - }, + Metric: model.Metric{"percentile": "0.500000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "bar"}, + Value: 97.31798360385088, }, - { - Sample: model.Sample{ - - Metric: model.Metric{"percentile": "0.500000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "foo"}, - Value: 84.63044031436561, - }, + model.Sample{ + Metric: model.Metric{"percentile": "0.500000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "foo"}, + Value: 84.63044031436561, }, - { - Sample: model.Sample{ + model.Sample{ - Metric: model.Metric{"percentile": "0.900000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "zed"}, - Value: 1.355915069887731, - }, + Metric: model.Metric{"percentile": "0.900000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "zed"}, + Value: 1.355915069887731, }, - { - Sample: model.Sample{ - - Metric: model.Metric{"percentile": "0.900000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "bar"}, - Value: 109.89202084295582, - }, + model.Sample{ + Metric: model.Metric{"percentile": "0.900000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "bar"}, + Value: 109.89202084295582, }, - { - Sample: model.Sample{ - - Metric: model.Metric{"percentile": "0.900000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "foo"}, - Value: 160.21100853053224, - }, + model.Sample{ + Metric: model.Metric{"percentile": "0.900000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "foo"}, + Value: 160.21100853053224, }, - { - Sample: model.Sample{ - - Metric: model.Metric{"percentile": "0.990000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "zed"}, - Value: 1.772733213161236, - }, + model.Sample{ + Metric: model.Metric{"percentile": "0.990000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "zed"}, + Value: 1.772733213161236, }, - { - Sample: model.Sample{ + model.Sample{ - Metric: model.Metric{"percentile": "0.990000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "bar"}, - Value: 109.99626121011262, - }, + Metric: model.Metric{"percentile": "0.990000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "bar"}, + Value: 109.99626121011262, }, - { - Sample: model.Sample{ - - Metric: model.Metric{"percentile": "0.990000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "foo"}, - Value: 172.49828748957728, - }, + model.Sample{ + Metric: model.Metric{"percentile": "0.990000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "foo"}, + Value: 172.49828748957728, }, }, }, @@ -179,10 +132,14 @@ func testProcessor002Process(t test.Tester) { continue } - delivered := make([]Result, 0) + delivered := model.Samples{} for len(inputChannel) != 0 { - delivered = append(delivered, <-inputChannel) + result := <-inputChannel + if result.Err != nil { + t.Fatalf("%d. expected no error, got: %s", i, result.Err) + } + delivered = append(delivered, result.Samples...) } if len(delivered) != len(scenario.out) { @@ -201,24 +158,20 @@ func testProcessor002Process(t test.Tester) { found := false for element := expectedElements.Front(); element != nil && found == false; element = element.Next() { - candidate := element.Value.(Result) - - if !test.ErrorEqual(candidate.Err, actual.Err) { - continue - } + candidate := element.Value.(model.Sample) - if candidate.Sample.Value != actual.Sample.Value { + if candidate.Value != actual.Value { continue } - if len(candidate.Sample.Metric) != len(actual.Sample.Metric) { + if len(candidate.Metric) != len(actual.Metric) { continue } labelsMatch := false - for key, value := range candidate.Sample.Metric { - actualValue, ok := actual.Sample.Metric[key] + for key, value := range candidate.Metric { + actualValue, ok := actual.Metric[key] if !ok { break } @@ -238,7 +191,7 @@ func testProcessor002Process(t test.Tester) { } if !found { - t.Errorf("%d.%d. expected to find %s among candidate, absent", i, j, actual.Sample) + t.Errorf("%d.%d. expected to find %s among candidate, absent", i, j, actual) } } } diff --git a/retrieval/format/result.go b/retrieval/format/result.go index c7ee6dcfd..6e2a08755 100644 --- a/retrieval/format/result.go +++ b/retrieval/format/result.go @@ -17,9 +17,8 @@ import ( "github.com/prometheus/prometheus/model" ) -// Result encapsulates the outcome from processing a given sample from a -// source. +// Result encapsulates the outcome from processing samples from a source. type Result struct { - Err error - Sample model.Sample + Err error + Samples model.Samples } diff --git a/retrieval/target.go b/retrieval/target.go index 47d864c35..31ad6a1bf 100644 --- a/retrieval/target.go +++ b/retrieval/target.go @@ -157,8 +157,8 @@ func (t *target) recordScrapeHealth(results chan format.Result, timestamp time.T } results <- format.Result{ - Err: nil, - Sample: sample, + Err: nil, + Samples: model.Samples{sample}, } } diff --git a/retrieval/target_test.go b/retrieval/target_test.go index 9fabfac56..fb353aed6 100644 --- a/retrieval/target_test.go +++ b/retrieval/target_test.go @@ -46,7 +46,12 @@ func TestTargetRecordScrapeHealth(t *testing.T) { go testTarget.recordScrapeHealth(results, now, true) result := <-results - actual := result.Sample + + if len(result.Samples) != 1 { + t.Fatalf("Expected one sample, got %d", len(result.Samples)) + } + + actual := result.Samples[0] expected := model.Sample{ Metric: model.Metric{ model.MetricNameLabel: model.ScrapeHealthMetricName, diff --git a/rules/manager.go b/rules/manager.go index d9bcda390..9bb5065b4 100644 --- a/rules/manager.go +++ b/rules/manager.go @@ -15,7 +15,7 @@ package rules import ( "github.com/prometheus/prometheus/config" - "github.com/prometheus/prometheus/rules/ast" + "github.com/prometheus/prometheus/model" "log" "sync" "time" @@ -23,7 +23,7 @@ import ( type Result struct { Err error // TODO propagate errors from rule evaluation. - Samples ast.Vector + Samples model.Samples } type RuleManager interface { @@ -73,8 +73,12 @@ func (m *ruleManager) runIteration(results chan *Result) { wg.Add(1) go func(rule Rule) { vector, err := rule.Eval(now) + samples := model.Samples{} + for _, sample := range vector { + samples = append(samples, sample) + } m.results <- &Result{ - Samples: vector, + Samples: samples, Err: err, } wg.Done() diff --git a/rules/testdata.go b/rules/testdata.go index ae449ba63..689e28fa5 100644 --- a/rules/testdata.go +++ b/rules/testdata.go @@ -51,20 +51,19 @@ func getTestVectorFromTestMatrix(matrix ast.Matrix) ast.Vector { return vector } -func storeMatrix(storage metric.Storage, matrix ast.Matrix) error { +func storeMatrix(storage metric.Storage, matrix ast.Matrix) (err error) { + pendingSamples := model.Samples{} for _, sampleSet := range matrix { for _, sample := range sampleSet.Values { - err := storage.AppendSample(model.Sample{ + pendingSamples = append(pendingSamples, model.Sample{ Metric: sampleSet.Metric, Value: sample.Value, Timestamp: sample.Timestamp, }) - if err != nil { - return err - } } } - return nil + err = storage.AppendSamples(pendingSamples) + return } var testMatrix = ast.Matrix{ diff --git a/storage/metric/tiered.go b/storage/metric/tiered.go index aa45a2eaa..569b78cd7 100644 --- a/storage/metric/tiered.go +++ b/storage/metric/tiered.go @@ -30,8 +30,8 @@ import ( // tieredStorage both persists samples and generates materialized views for // queries. type tieredStorage struct { - appendToDiskQueue chan model.Sample - appendToMemoryQueue chan model.Sample + appendToDiskQueue chan model.Samples + appendToMemoryQueue chan model.Samples diskFrontier *diskFrontier diskStorage *LevelDBMetricPersistence draining chan chan bool @@ -54,8 +54,8 @@ type viewJob struct { // Provides a unified means for batch appending values into the datastore along // with querying for values in an efficient way. type Storage interface { - // Enqueues a Sample for storage. - AppendSample(model.Sample) error + // Enqueues Samples for storage. + AppendSamples(model.Samples) error // Enqueus a ViewRequestBuilder for materialization, subject to a timeout. MakeView(request ViewRequestBuilder, timeout time.Duration) (View, error) // Starts serving requests. @@ -81,8 +81,8 @@ func NewTieredStorage(appendToMemoryQueueDepth, appendToDiskQueueDepth, viewQueu } storage = &tieredStorage{ - appendToDiskQueue: make(chan model.Sample, appendToDiskQueueDepth), - appendToMemoryQueue: make(chan model.Sample, appendToMemoryQueueDepth), + appendToDiskQueue: make(chan model.Samples, appendToDiskQueueDepth), + appendToMemoryQueue: make(chan model.Samples, appendToMemoryQueueDepth), diskStorage: diskStorage, draining: make(chan chan bool), flushMemoryInterval: flushMemoryInterval, @@ -94,7 +94,7 @@ func NewTieredStorage(appendToMemoryQueueDepth, appendToDiskQueueDepth, viewQueu return } -func (t tieredStorage) AppendSample(s model.Sample) (err error) { +func (t tieredStorage) AppendSamples(s model.Samples) (err error) { if len(t.draining) > 0 { return fmt.Errorf("Storage is in the process of draining.") } @@ -218,7 +218,7 @@ func (t *tieredStorage) writeMemory() { pendingLength := len(t.appendToMemoryQueue) for i := 0; i < pendingLength; i++ { - t.memoryArena.AppendSample(<-t.appendToMemoryQueue) + t.memoryArena.AppendSamples(<-t.appendToMemoryQueue) } } @@ -248,7 +248,7 @@ func (t tieredStorage) flush() (err error) { } type memoryToDiskFlusher struct { - toDiskQueue chan model.Sample + toDiskQueue chan model.Samples disk MetricPersistence olderThan time.Time valuesAccepted int @@ -294,10 +294,12 @@ func (f memoryToDiskFlusherVisitor) Operate(key, value interface{}) (err *storag f.flusher.Flush() } - f.flusher.toDiskQueue <- model.Sample{ - Metric: f.stream.metric, - Timestamp: recordTime, - Value: recordValue, + f.flusher.toDiskQueue <- model.Samples{ + model.Sample{ + Metric: f.stream.metric, + Timestamp: recordTime, + Value: recordValue, + }, } f.stream.values.Delete(skipListTime(recordTime)) @@ -318,7 +320,7 @@ func (f *memoryToDiskFlusher) Flush() { length := len(f.toDiskQueue) samples := model.Samples{} for i := 0; i < length; i++ { - samples = append(samples, <-f.toDiskQueue) + samples = append(samples, <-f.toDiskQueue...) } f.disk.AppendSamples(samples) } diff --git a/storage/metric/tiered_test.go b/storage/metric/tiered_test.go index 77f274364..03789d036 100644 --- a/storage/metric/tiered_test.go +++ b/storage/metric/tiered_test.go @@ -340,11 +340,9 @@ func testMakeView(t test.Tester, flushToDisk bool) { for i, scenario := range scenarios { tiered, closer := NewTestTieredStorage(t) - for j, datum := range scenario.data { - err := tiered.AppendSample(datum) - if err != nil { - t.Fatalf("%d.%d. failed to add fixture data: %s", i, j, err) - } + err := tiered.AppendSamples(scenario.data) + if err != nil { + t.Fatalf("%d. failed to add fixture data: %s", i, err) } if flushToDisk {