*: migrate from model.* to promql.* types

pull/2643/head
Fabian Reinartz 2016-12-25 00:37:46 +01:00
parent 9ea10d5265
commit 5817cb5bde
8 changed files with 343 additions and 264 deletions

View File

@ -34,6 +34,7 @@ import (
"github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/discovery" "github.com/prometheus/prometheus/discovery"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/relabel" "github.com/prometheus/prometheus/relabel"
"github.com/prometheus/prometheus/retrieval" "github.com/prometheus/prometheus/retrieval"
) )
@ -50,10 +51,57 @@ const (
alertmanagerLabel = "alertmanager" alertmanagerLabel = "alertmanager"
) )
// Alert is a generic representation of an alert in the Prometheus eco-system.
type Alert struct {
// Label value pairs for purpose of aggregation, matching, and disposition
// dispatching. This must minimally include an "alertname" label.
Labels labels.Labels `json:"labels"`
// Extra key/value information which does not define alert identity.
Annotations labels.Labels `json:"annotations"`
// The known time range for this alert. Both ends are optional.
StartsAt time.Time `json:"startsAt,omitempty"`
EndsAt time.Time `json:"endsAt,omitempty"`
GeneratorURL string `json:"generatorURL,omitempty"`
}
// Name returns the name of the alert. It is equivalent to the "alertname" label.
func (a *Alert) Name() string {
return a.Labels.Get(labels.AlertName)
}
// Hash returns a hash over the alert. It is equivalent to the alert labels hash.
func (a *Alert) Hash() uint64 {
return a.Labels.Hash()
}
func (a *Alert) String() string {
s := fmt.Sprintf("%s[%s]", a.Name(), fmt.Sprintf("%016x", a.Hash())[:7])
if a.Resolved() {
return s + "[resolved]"
}
return s + "[active]"
}
// Resolved returns true iff the activity interval ended in the past.
func (a *Alert) Resolved() bool {
return a.ResolvedAt(time.Now())
}
// ResolvedAt returns true off the activity interval ended before
// the given timestamp.
func (a *Alert) ResolvedAt(ts time.Time) bool {
if a.EndsAt.IsZero() {
return false
}
return !a.EndsAt.After(ts)
}
// Notifier is responsible for dispatching alert notifications to an // Notifier is responsible for dispatching alert notifications to an
// alert manager service. // alert manager service.
type Notifier struct { type Notifier struct {
queue model.Alerts queue []*Alert
opts *Options opts *Options
more chan struct{} more chan struct{}
@ -84,7 +132,7 @@ func New(o *Options) *Notifier {
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
return &Notifier{ return &Notifier{
queue: make(model.Alerts, 0, o.QueueCapacity), queue: make([]*Alert, 0, o.QueueCapacity),
ctx: ctx, ctx: ctx,
cancel: cancel, cancel: cancel,
more: make(chan struct{}, 1), more: make(chan struct{}, 1),
@ -182,17 +230,17 @@ func (n *Notifier) queueLen() int {
return len(n.queue) return len(n.queue)
} }
func (n *Notifier) nextBatch() []*model.Alert { func (n *Notifier) nextBatch() []*Alert {
n.mtx.Lock() n.mtx.Lock()
defer n.mtx.Unlock() defer n.mtx.Unlock()
var alerts model.Alerts var alerts []*Alert
if len(n.queue) > maxBatchSize { if len(n.queue) > maxBatchSize {
alerts = append(make(model.Alerts, 0, maxBatchSize), n.queue[:maxBatchSize]...) alerts = append(make([]*Alert, 0, maxBatchSize), n.queue[:maxBatchSize]...)
n.queue = n.queue[maxBatchSize:] n.queue = n.queue[maxBatchSize:]
} else { } else {
alerts = append(make(model.Alerts, 0, len(n.queue)), n.queue...) alerts = append(make([]*Alert, 0, len(n.queue)), n.queue...)
n.queue = n.queue[:0] n.queue = n.queue[:0]
} }
@ -221,15 +269,17 @@ func (n *Notifier) Run() {
// Send queues the given notification requests for processing. // Send queues the given notification requests for processing.
// Panics if called on a handler that is not running. // Panics if called on a handler that is not running.
func (n *Notifier) Send(alerts ...*model.Alert) { func (n *Notifier) Send(alerts ...*Alert) {
n.mtx.Lock() n.mtx.Lock()
defer n.mtx.Unlock() defer n.mtx.Unlock()
// Attach external labels before relabelling and sending. // Attach external labels before relabelling and sending.
for _, a := range alerts { for _, a := range alerts {
lb := labels.NewBuilder(a.Labels)
for ln, lv := range n.opts.ExternalLabels { for ln, lv := range n.opts.ExternalLabels {
if _, ok := a.Labels[ln]; !ok { if a.Labels.Get(string(ln)) == "" {
a.Labels[ln] = lv lb.Set(string(ln), string(lv))
} }
} }
} }
@ -259,16 +309,19 @@ func (n *Notifier) Send(alerts ...*model.Alert) {
n.setMore() n.setMore()
} }
func (n *Notifier) relabelAlerts(alerts []*model.Alert) []*model.Alert { func (n *Notifier) relabelAlerts(alerts []*Alert) []*Alert {
var relabeledAlerts []*model.Alert // TODO(fabxc): temporarily disabled.
for _, alert := range alerts { return alerts
labels := relabel.Process(alert.Labels, n.opts.RelabelConfigs...)
if labels != nil { // var relabeledAlerts []*Alert
alert.Labels = labels // for _, alert := range alerts {
relabeledAlerts = append(relabeledAlerts, alert) // labels := relabel.Process(alert.Labels, n.opts.RelabelConfigs...)
} // if labels != nil {
} // alert.Labels = labels
return relabeledAlerts // relabeledAlerts = append(relabeledAlerts, alert)
// }
// }
// return relabeledAlerts
} }
// setMore signals that the alert queue has items. // setMore signals that the alert queue has items.
@ -302,7 +355,7 @@ func (n *Notifier) Alertmanagers() []string {
// sendAll sends the alerts to all configured Alertmanagers concurrently. // sendAll sends the alerts to all configured Alertmanagers concurrently.
// It returns true if the alerts could be sent successfully to at least one Alertmanager. // It returns true if the alerts could be sent successfully to at least one Alertmanager.
func (n *Notifier) sendAll(alerts ...*model.Alert) bool { func (n *Notifier) sendAll(alerts ...*Alert) bool {
begin := time.Now() begin := time.Now()
b, err := json.Marshal(alerts) b, err := json.Marshal(alerts)

View File

@ -61,6 +61,13 @@ func (ls Labels) Hash() uint64 {
return xxhash.Sum64(b) return xxhash.Sum64(b)
} }
// Copy returns a copy of the labels.
func (ls Labels) Copy() Labels {
res := make(Labels, len(ls))
copy(res, ls)
return res
}
// Get returns the value for the label with the given name. // Get returns the value for the label with the given name.
// Returns an empty string if the label doesn't exist. // Returns an empty string if the label doesn't exist.
func (ls Labels) Get(name string) string { func (ls Labels) Get(name string) string {
@ -72,8 +79,8 @@ func (ls Labels) Get(name string) string {
return "" return ""
} }
// Equals returns whether the two label sets are equal. // Equal returns whether the two label sets are equal.
func (ls Labels) Equals(o Labels) bool { func Equal(ls, o Labels) bool {
if len(ls) != len(o) { if len(ls) != len(o) {
return false return false
} }

View File

@ -25,6 +25,7 @@ import (
"github.com/prometheus/common/log" "github.com/prometheus/common/log"
"github.com/prometheus/common/model" "github.com/prometheus/common/model"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/promql" "github.com/prometheus/prometheus/promql"
"github.com/prometheus/prometheus/template" "github.com/prometheus/prometheus/template"
"github.com/prometheus/prometheus/util/strutil" "github.com/prometheus/prometheus/util/strutil"
@ -32,12 +33,12 @@ import (
const ( const (
// AlertMetricName is the metric name for synthetic alert timeseries. // AlertMetricName is the metric name for synthetic alert timeseries.
alertMetricName model.LabelValue = "ALERTS" alertMetricName = "ALERTS"
// AlertNameLabel is the label name indicating the name of an alert. // AlertNameLabel is the label name indicating the name of an alert.
alertNameLabel model.LabelName = "alertname" alertNameLabel = "alertname"
// AlertStateLabel is the label name indicating the state of an alert. // AlertStateLabel is the label name indicating the state of an alert.
alertStateLabel model.LabelName = "alertstate" alertStateLabel = "alertstate"
) )
// AlertState denotes the state of an active alert. // AlertState denotes the state of an active alert.
@ -69,10 +70,12 @@ func (s AlertState) String() string {
// Alert is the user-level representation of a single instance of an alerting rule. // Alert is the user-level representation of a single instance of an alerting rule.
type Alert struct { type Alert struct {
State AlertState State AlertState
Labels model.LabelSet
Annotations model.LabelSet Labels labels.Labels
Annotations labels.Labels
// The value at the last evaluation of the alerting expression. // The value at the last evaluation of the alerting expression.
Value model.SampleValue Value float64
// The interval during which the condition of this alert held true. // The interval during which the condition of this alert held true.
// ResolvedAt will be 0 to indicate a still active alert. // ResolvedAt will be 0 to indicate a still active alert.
ActiveAt, ResolvedAt model.Time ActiveAt, ResolvedAt model.Time
@ -88,26 +91,26 @@ type AlertingRule struct {
// output vector before an alert transitions from Pending to Firing state. // output vector before an alert transitions from Pending to Firing state.
holdDuration time.Duration holdDuration time.Duration
// Extra labels to attach to the resulting alert sample vectors. // Extra labels to attach to the resulting alert sample vectors.
labels model.LabelSet labels labels.Labels
// Non-identifying key/value pairs. // Non-identifying key/value pairs.
annotations model.LabelSet annotations labels.Labels
// Protects the below. // Protects the below.
mtx sync.Mutex mtx sync.Mutex
// A map of alerts which are currently active (Pending or Firing), keyed by // A map of alerts which are currently active (Pending or Firing), keyed by
// the fingerprint of the labelset they correspond to. // the fingerprint of the labelset they correspond to.
active map[model.Fingerprint]*Alert active map[uint64]*Alert
} }
// NewAlertingRule constructs a new AlertingRule. // NewAlertingRule constructs a new AlertingRule.
func NewAlertingRule(name string, vec promql.Expr, hold time.Duration, lbls, anns model.LabelSet) *AlertingRule { func NewAlertingRule(name string, vec promql.Expr, hold time.Duration, lbls, anns labels.Labels) *AlertingRule {
return &AlertingRule{ return &AlertingRule{
name: name, name: name,
vector: vec, vector: vec,
holdDuration: hold, holdDuration: hold,
labels: lbls, labels: lbls,
annotations: anns, annotations: anns,
active: map[model.Fingerprint]*Alert{}, active: map[uint64]*Alert{},
} }
} }
@ -117,27 +120,26 @@ func (r *AlertingRule) Name() string {
} }
func (r *AlertingRule) equal(o *AlertingRule) bool { func (r *AlertingRule) equal(o *AlertingRule) bool {
return r.name == o.name && r.labels.Equal(o.labels) return r.name == o.name && labels.Equal(r.labels, o.labels)
} }
func (r *AlertingRule) sample(alert *Alert, ts model.Time, set bool) *model.Sample { func (r *AlertingRule) sample(alert *Alert, ts model.Time, set bool) promql.Sample {
metric := model.Metric(r.labels.Clone()) lb := labels.NewBuilder(r.labels)
for ln, lv := range alert.Labels { for _, l := range alert.Labels {
metric[ln] = lv lb.Set(l.Name, l.Value)
} }
metric[model.MetricNameLabel] = alertMetricName lb.Set(labels.MetricName, alertMetricName)
metric[model.AlertNameLabel] = model.LabelValue(r.name) lb.Set(labels.AlertName, r.name)
metric[alertStateLabel] = model.LabelValue(alert.State.String()) lb.Set(alertStateLabel, alert.State.String())
s := &model.Sample{ s := promql.Sample{
Metric: metric, Metric: lb.Labels(),
Timestamp: ts, Point: promql.Point{T: int64(ts), V: 0},
Value: 0,
} }
if set { if set {
s.Value = 1 s.V = 1
} }
return s return s
} }
@ -148,8 +150,8 @@ const resolvedRetention = 15 * time.Minute
// Eval evaluates the rule expression and then creates pending alerts and fires // Eval evaluates the rule expression and then creates pending alerts and fires
// or removes previously pending alerts accordingly. // or removes previously pending alerts accordingly.
func (r *AlertingRule) Eval(ctx context.Context, ts model.Time, engine *promql.Engine, externalURLPath string) (model.Vector, error) { func (r *AlertingRule) Eval(ctx context.Context, ts model.Time, engine *promql.Engine, externalURLPath string) (promql.Vector, error) {
query, err := engine.NewInstantQuery(r.vector.String(), ts) query, err := engine.NewInstantQuery(r.vector.String(), ts.Time())
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -163,13 +165,13 @@ func (r *AlertingRule) Eval(ctx context.Context, ts model.Time, engine *promql.E
// Create pending alerts for any new vector elements in the alert expression // Create pending alerts for any new vector elements in the alert expression
// or update the expression value for existing elements. // or update the expression value for existing elements.
resultFPs := map[model.Fingerprint]struct{}{} resultFPs := map[uint64]struct{}{}
for _, smpl := range res { for _, smpl := range res {
// Provide the alert information to the template. // Provide the alert information to the template.
l := make(map[string]string, len(smpl.Metric)) l := make(map[string]string, len(smpl.Metric))
for k, v := range smpl.Metric { for _, lbl := range smpl.Metric {
l[string(k)] = string(v) l[lbl.Name] = lbl.Value
} }
tmplData := struct { tmplData := struct {
@ -177,13 +179,13 @@ func (r *AlertingRule) Eval(ctx context.Context, ts model.Time, engine *promql.E
Value float64 Value float64
}{ }{
Labels: l, Labels: l,
Value: float64(smpl.Value), Value: smpl.V,
} }
// Inject some convenience variables that are easier to remember for users // Inject some convenience variables that are easier to remember for users
// who are not used to Go's templating system. // who are not used to Go's templating system.
defs := "{{$labels := .Labels}}{{$value := .Value}}" defs := "{{$labels := .Labels}}{{$value := .Value}}"
expand := func(text model.LabelValue) model.LabelValue { expand := func(text string) string {
tmpl := template.NewTemplateExpander( tmpl := template.NewTemplateExpander(
ctx, ctx,
defs+string(text), defs+string(text),
@ -198,44 +200,42 @@ func (r *AlertingRule) Eval(ctx context.Context, ts model.Time, engine *promql.E
result = fmt.Sprintf("<error expanding template: %s>", err) result = fmt.Sprintf("<error expanding template: %s>", err)
log.Warnf("Error expanding alert template %v with data '%v': %s", r.Name(), tmplData, err) log.Warnf("Error expanding alert template %v with data '%v': %s", r.Name(), tmplData, err)
} }
return model.LabelValue(result) return result
} }
delete(smpl.Metric, model.MetricNameLabel) lb := labels.NewBuilder(smpl.Metric).Del(labels.MetricName)
labels := make(model.LabelSet, len(smpl.Metric)+len(r.labels)+1)
for ln, lv := range smpl.Metric {
labels[ln] = lv
}
for ln, lv := range r.labels {
labels[ln] = expand(lv)
}
labels[model.AlertNameLabel] = model.LabelValue(r.Name())
annotations := make(model.LabelSet, len(r.annotations)) for _, l := range r.labels {
for an, av := range r.annotations { lb.Set(l.Name, expand(l.Value))
annotations[an] = expand(av)
} }
fp := smpl.Metric.Fingerprint() lb.Set(labels.AlertName, r.Name())
resultFPs[fp] = struct{}{}
annotations := make(labels.Labels, 0, len(r.annotations))
for _, a := range r.annotations {
annotations = append(annotations, labels.Label{Name: a.Name, Value: expand(a.Value)})
}
h := smpl.Metric.Hash()
resultFPs[h] = struct{}{}
// Check whether we already have alerting state for the identifying label set. // Check whether we already have alerting state for the identifying label set.
// Update the last value and annotations if so, create a new alert entry otherwise. // Update the last value and annotations if so, create a new alert entry otherwise.
if alert, ok := r.active[fp]; ok && alert.State != StateInactive { if alert, ok := r.active[h]; ok && alert.State != StateInactive {
alert.Value = smpl.Value alert.Value = smpl.V
alert.Annotations = annotations alert.Annotations = annotations
continue continue
} }
r.active[fp] = &Alert{ r.active[h] = &Alert{
Labels: labels, Labels: lb.Labels(),
Annotations: annotations, Annotations: annotations,
ActiveAt: ts, ActiveAt: ts,
State: StatePending, State: StatePending,
Value: smpl.Value, Value: smpl.V,
} }
} }
var vec model.Vector var vec promql.Vector
// Check if any pending alerts should be removed or fire now. Write out alert timeseries. // Check if any pending alerts should be removed or fire now. Write out alert timeseries.
for fp, a := range r.active { for fp, a := range r.active {
if _, ok := resultFPs[fp]; !ok { if _, ok := resultFPs[fp]; !ok {

View File

@ -20,6 +20,7 @@ import (
"path/filepath" "path/filepath"
"sync" "sync"
"time" "time"
"unsafe"
html_template "html/template" html_template "html/template"
@ -106,7 +107,7 @@ const (
type Rule interface { type Rule interface {
Name() string Name() string
// eval evaluates the rule, including any associated recording or alerting actions. // eval evaluates the rule, including any associated recording or alerting actions.
Eval(context.Context, model.Time, *promql.Engine, string) (model.Vector, error) Eval(context.Context, model.Time, *promql.Engine, string) (promql.Vector, error)
// String returns a human-readable string representation of the rule. // String returns a human-readable string representation of the rule.
String() string String() string
// HTMLSnippet returns a human-readable string representation of the rule, // HTMLSnippet returns a human-readable string representation of the rule,
@ -278,8 +279,16 @@ func (g *Group) Eval() {
numOutOfOrder = 0 numOutOfOrder = 0
numDuplicates = 0 numDuplicates = 0
) )
for _, s := range vector { for _, s := range vector {
if err := g.opts.SampleAppender.Append(s); err != nil { // TODO(fabxc): adjust after reworking appending.
var ms model.Sample
lbls := s.Metric.Map()
ms.Metric = *(*model.Metric)(unsafe.Pointer(&lbls))
ms.Timestamp = model.Time(s.T)
ms.Value = model.SampleValue(s.V)
if err := g.opts.SampleAppender.Append(&ms); err != nil {
switch err { switch err {
case local.ErrOutOfOrderSample: case local.ErrOutOfOrderSample:
numOutOfOrder++ numOutOfOrder++
@ -305,7 +314,7 @@ func (g *Group) Eval() {
// sendAlerts sends alert notifications for the given rule. // sendAlerts sends alert notifications for the given rule.
func (g *Group) sendAlerts(rule *AlertingRule, timestamp model.Time) error { func (g *Group) sendAlerts(rule *AlertingRule, timestamp model.Time) error {
var alerts model.Alerts var alerts []*notifier.Alert
for _, alert := range rule.currentAlerts() { for _, alert := range rule.currentAlerts() {
// Only send actually firing alerts. // Only send actually firing alerts.
@ -313,7 +322,7 @@ func (g *Group) sendAlerts(rule *AlertingRule, timestamp model.Time) error {
continue continue
} }
a := &model.Alert{ a := &notifier.Alert{
StartsAt: alert.ActiveAt.Add(rule.holdDuration).Time(), StartsAt: alert.ActiveAt.Add(rule.holdDuration).Time(),
Labels: alert.Labels, Labels: alert.Labels,
Annotations: alert.Annotations, Annotations: alert.Annotations,

View File

@ -20,6 +20,7 @@ import (
"github.com/prometheus/common/model" "github.com/prometheus/common/model"
"golang.org/x/net/context" "golang.org/x/net/context"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/promql" "github.com/prometheus/prometheus/promql"
"github.com/prometheus/prometheus/util/strutil" "github.com/prometheus/prometheus/util/strutil"
) )
@ -28,15 +29,15 @@ import (
type RecordingRule struct { type RecordingRule struct {
name string name string
vector promql.Expr vector promql.Expr
labels model.LabelSet labels labels.Labels
} }
// NewRecordingRule returns a new recording rule. // NewRecordingRule returns a new recording rule.
func NewRecordingRule(name string, vector promql.Expr, labels model.LabelSet) *RecordingRule { func NewRecordingRule(name string, vector promql.Expr, lset labels.Labels) *RecordingRule {
return &RecordingRule{ return &RecordingRule{
name: name, name: name,
vector: vector, vector: vector,
labels: labels, labels: lset,
} }
} }
@ -46,35 +47,27 @@ func (rule RecordingRule) Name() string {
} }
// Eval evaluates the rule and then overrides the metric names and labels accordingly. // Eval evaluates the rule and then overrides the metric names and labels accordingly.
func (rule RecordingRule) Eval(ctx context.Context, timestamp model.Time, engine *promql.Engine, _ string) (model.Vector, error) { func (rule RecordingRule) Eval(ctx context.Context, timestamp model.Time, engine *promql.Engine, _ string) (promql.Vector, error) {
query, err := engine.NewInstantQuery(rule.vector.String(), timestamp) query, err := engine.NewInstantQuery(rule.vector.String(), timestamp.Time())
if err != nil { if err != nil {
return nil, err return nil, err
} }
var ( var (
result = query.Exec(ctx) result = query.Exec(ctx)
vector model.Vector vector promql.Vector
) )
if result.Err != nil { if result.Err != nil {
return nil, err return nil, err
} }
switch result.Value.(type) { switch v := result.Value.(type) {
case model.Vector: case promql.Vector:
vector, err = result.Vector() vector = v
if err != nil { case promql.Scalar:
return nil, err vector = promql.Vector{promql.Sample{
} Point: promql.Point(v),
case *model.Scalar: Metric: labels.Labels{},
scalar, err := result.Scalar()
if err != nil {
return nil, err
}
vector = model.Vector{&model.Sample{
Value: scalar.Value,
Timestamp: scalar.Timestamp,
Metric: model.Metric{},
}} }}
default: default:
return nil, fmt.Errorf("rule result is not a vector or scalar") return nil, fmt.Errorf("rule result is not a vector or scalar")
@ -82,15 +75,19 @@ func (rule RecordingRule) Eval(ctx context.Context, timestamp model.Time, engine
// Override the metric name and labels. // Override the metric name and labels.
for _, sample := range vector { for _, sample := range vector {
sample.Metric[model.MetricNameLabel] = model.LabelValue(rule.name) lb := labels.NewBuilder(sample.Metric)
for label, value := range rule.labels { lb.Set(labels.MetricName, rule.name)
if value == "" {
delete(sample.Metric, label) for _, l := range rule.labels {
if l.Value == "" {
lb.Del(l.Name)
} else { } else {
sample.Metric[label] = value lb.Set(l.Name, l.Value)
} }
} }
sample.Metric = lb.Labels()
} }
return vector, nil return vector, nil

View File

@ -27,6 +27,7 @@ import (
text_template "text/template" text_template "text/template"
"github.com/prometheus/common/model" "github.com/prometheus/common/model"
"github.com/prometheus/prometheus/pkg/labels"
"golang.org/x/net/context" "golang.org/x/net/context"
"github.com/prometheus/prometheus/promql" "github.com/prometheus/prometheus/promql"
@ -66,22 +67,21 @@ func query(ctx context.Context, q string, ts time.Time, queryEngine *promql.Engi
if res.Err != nil { if res.Err != nil {
return nil, res.Err return nil, res.Err
} }
var vector model.Vector var vector promql.Vector
switch v := res.Value.(type) { switch v := res.Value.(type) {
case model.Matrix: case promql.Matrix:
return nil, errors.New("matrix return values not supported") return nil, errors.New("matrix return values not supported")
case model.Vector: case promql.Vector:
vector = v vector = v
case *model.Scalar: case promql.Scalar:
vector = model.Vector{&model.Sample{ vector = promql.Vector{promql.Sample{
Value: v.Value, Point: promql.Point(v),
Timestamp: v.Timestamp,
}} }}
case *model.String: case promql.String:
vector = model.Vector{&model.Sample{ vector = promql.Vector{promql.Sample{
Metric: model.Metric{"__value__": model.LabelValue(v.Value)}, Metric: labels.FromStrings("__value__", v.V),
Timestamp: v.Timestamp, Point: promql.Point{T: v.T},
}} }}
default: default:
panic("template.query: unhandled result value type") panic("template.query: unhandled result value type")
@ -89,14 +89,12 @@ func query(ctx context.Context, q string, ts time.Time, queryEngine *promql.Engi
// promql.Vector is hard to work with in templates, so convert to // promql.Vector is hard to work with in templates, so convert to
// base data types. // base data types.
// TODO(fabxc): probably not true anymore after type rework.
var result = make(queryResult, len(vector)) var result = make(queryResult, len(vector))
for n, v := range vector { for n, v := range vector {
s := sample{ s := sample{
Value: float64(v.Value), Value: v.V,
Labels: make(map[string]string), Labels: v.Metric.Map(),
}
for label, value := range v.Metric {
s.Labels[string(label)] = string(value)
} }
result[n] = &s result[n] = &s
} }
@ -119,7 +117,7 @@ func NewTemplateExpander(ctx context.Context, text string, name string, data int
data: data, data: data,
funcMap: text_template.FuncMap{ funcMap: text_template.FuncMap{
"query": func(q string) (queryResult, error) { "query": func(q string) (queryResult, error) {
return query(ctx, q, timestamp, queryEngine) return query(ctx, q, timestamp.Time(), queryEngine)
}, },
"first": func(v queryResult) (*sample, error) { "first": func(v queryResult) (*sample, error) {
if len(v) > 0 { if len(v) > 0 {

View File

@ -17,6 +17,7 @@ import (
"encoding/json" "encoding/json"
"errors" "errors"
"fmt" "fmt"
"math"
"net/http" "net/http"
"sort" "sort"
"strconv" "strconv"
@ -30,7 +31,6 @@ import (
"github.com/prometheus/prometheus/promql" "github.com/prometheus/prometheus/promql"
"github.com/prometheus/prometheus/retrieval" "github.com/prometheus/prometheus/retrieval"
"github.com/prometheus/prometheus/storage/local" "github.com/prometheus/prometheus/storage/local"
"github.com/prometheus/prometheus/storage/metric"
"github.com/prometheus/prometheus/util/httputil" "github.com/prometheus/prometheus/util/httputil"
) )
@ -96,7 +96,7 @@ type API struct {
targetRetriever targetRetriever targetRetriever targetRetriever
context func(r *http.Request) context.Context context func(r *http.Request) context.Context
now func() model.Time now func() time.Time
} }
// NewAPI returns an initialized API type. // NewAPI returns an initialized API type.
@ -106,7 +106,7 @@ func NewAPI(qe *promql.Engine, st local.Storage, tr targetRetriever) *API {
Storage: st, Storage: st,
targetRetriever: tr, targetRetriever: tr,
context: route.Context, context: route.Context,
now: model.Now, now: time.Now,
} }
} }
@ -142,8 +142,8 @@ func (api *API) Register(r *route.Router) {
} }
type queryData struct { type queryData struct {
ResultType model.ValueType `json:"resultType"` ResultType promql.ValueType `json:"resultType"`
Result model.Value `json:"result"` Result promql.Value `json:"result"`
} }
func (api *API) options(r *http.Request) (interface{}, *apiError) { func (api *API) options(r *http.Request) (interface{}, *apiError) {
@ -151,7 +151,7 @@ func (api *API) options(r *http.Request) (interface{}, *apiError) {
} }
func (api *API) query(r *http.Request) (interface{}, *apiError) { func (api *API) query(r *http.Request) (interface{}, *apiError) {
var ts model.Time var ts time.Time
if t := r.FormValue("time"); t != "" { if t := r.FormValue("time"); t != "" {
var err error var err error
ts, err = parseTime(t) ts, err = parseTime(t)
@ -262,7 +262,7 @@ func (api *API) series(r *http.Request) (interface{}, *apiError) {
return nil, &apiError{errorBadData, fmt.Errorf("no match[] parameter provided")} return nil, &apiError{errorBadData, fmt.Errorf("no match[] parameter provided")}
} }
var start model.Time var start time.Time
if t := r.FormValue("start"); t != "" { if t := r.FormValue("start"); t != "" {
var err error var err error
start, err = parseTime(t) start, err = parseTime(t)
@ -270,10 +270,10 @@ func (api *API) series(r *http.Request) (interface{}, *apiError) {
return nil, &apiError{errorBadData, err} return nil, &apiError{errorBadData, err}
} }
} else { } else {
start = model.Earliest start = time.Unix(math.MinInt64, 0)
} }
var end model.Time var end time.Time
if t := r.FormValue("end"); t != "" { if t := r.FormValue("end"); t != "" {
var err error var err error
end, err = parseTime(t) end, err = parseTime(t)
@ -281,10 +281,10 @@ func (api *API) series(r *http.Request) (interface{}, *apiError) {
return nil, &apiError{errorBadData, err} return nil, &apiError{errorBadData, err}
} }
} else { } else {
end = model.Latest end = time.Unix(math.MaxInt64, 0)
} }
var matcherSets []metric.LabelMatchers var matcherSets [][]*promql.LabelMatcher
for _, s := range r.Form["match[]"] { for _, s := range r.Form["match[]"] {
matchers, err := promql.ParseMetricSelector(s) matchers, err := promql.ParseMetricSelector(s)
if err != nil { if err != nil {
@ -293,22 +293,26 @@ func (api *API) series(r *http.Request) (interface{}, *apiError) {
matcherSets = append(matcherSets, matchers) matcherSets = append(matcherSets, matchers)
} }
q, err := api.Storage.Querier() // TODO(fabxc): temporarily disbaled.
if err != nil { _, _ = start, end
return nil, &apiError{errorExec, err} panic("disabled")
}
defer q.Close()
res, err := q.MetricsForLabelMatchers(api.context(r), start, end, matcherSets...) // q, err := api.Storage.Querier()
if err != nil { // if err != nil {
return nil, &apiError{errorExec, err} // return nil, &apiError{errorExec, err}
} // }
// defer q.Close()
metrics := make([]model.Metric, 0, len(res)) // res, err := q.MetricsForLabelMatchers(api.context(r), start, end, matcherSets...)
for _, met := range res { // if err != nil {
metrics = append(metrics, met.Metric) // return nil, &apiError{errorExec, err}
} // }
return metrics, nil
// metrics := make([]model.Metric, 0, len(res))
// for _, met := range res {
// metrics = append(metrics, met.Metric)
// }
// return metrics, nil
} }
func (api *API) dropSeries(r *http.Request) (interface{}, *apiError) { func (api *API) dropSeries(r *http.Request) (interface{}, *apiError) {
@ -317,25 +321,28 @@ func (api *API) dropSeries(r *http.Request) (interface{}, *apiError) {
return nil, &apiError{errorBadData, fmt.Errorf("no match[] parameter provided")} return nil, &apiError{errorBadData, fmt.Errorf("no match[] parameter provided")}
} }
numDeleted := 0 // TODO(fabxc): temporarily disabled
for _, s := range r.Form["match[]"] { panic("disabled")
matchers, err := promql.ParseMetricSelector(s)
if err != nil {
return nil, &apiError{errorBadData, err}
}
n, err := api.Storage.DropMetricsForLabelMatchers(context.TODO(), matchers...)
if err != nil {
return nil, &apiError{errorExec, err}
}
numDeleted += n
}
res := struct { // numDeleted := 0
NumDeleted int `json:"numDeleted"` // for _, s := range r.Form["match[]"] {
}{ // matchers, err := promql.ParseMetricSelector(s)
NumDeleted: numDeleted, // if err != nil {
} // return nil, &apiError{errorBadData, err}
return res, nil // }
// n, err := api.Storage.DropMetricsForLabelMatchers(context.TODO(), matchers...)
// if err != nil {
// return nil, &apiError{errorExec, err}
// }
// numDeleted += n
// }
// res := struct {
// NumDeleted int `json:"numDeleted"`
// }{
// NumDeleted: numDeleted,
// }
// return res, nil
} }
type Target struct { type Target struct {
@ -417,15 +424,15 @@ func respondError(w http.ResponseWriter, apiErr *apiError, data interface{}) {
w.Write(b) w.Write(b)
} }
func parseTime(s string) (model.Time, error) { func parseTime(s string) (time.Time, error) {
if t, err := strconv.ParseFloat(s, 64); err == nil { if t, err := strconv.ParseFloat(s, 64); err == nil {
ts := int64(t * float64(time.Second)) s, ns := math.Modf(t)
return model.TimeFromUnixNano(ts), nil return time.Unix(int64(s), int64(ns*float64(time.Second))), nil
} }
if t, err := time.Parse(time.RFC3339Nano, s); err == nil { if t, err := time.Parse(time.RFC3339Nano, s); err == nil {
return model.TimeFromUnixNano(t.UnixNano()), nil return t, nil
} }
return 0, fmt.Errorf("cannot parse %q to a valid timestamp", s) return time.Time{}, fmt.Errorf("cannot parse %q to a valid timestamp", s)
} }
func parseDuration(s string) (time.Duration, error) { func parseDuration(s string) (time.Duration, error) {

View File

@ -15,17 +15,13 @@ package web
import ( import (
"net/http" "net/http"
"sort"
"github.com/golang/protobuf/proto" "github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus"
dto "github.com/prometheus/client_model/go"
"github.com/prometheus/common/expfmt" "github.com/prometheus/common/expfmt"
"github.com/prometheus/common/log"
"github.com/prometheus/common/model" "github.com/prometheus/common/model"
"github.com/prometheus/prometheus/promql" "github.com/prometheus/prometheus/promql"
"github.com/prometheus/prometheus/storage/metric"
) )
var ( var (
@ -41,7 +37,7 @@ func (h *Handler) federation(w http.ResponseWriter, req *http.Request) {
req.ParseForm() req.ParseForm()
var matcherSets []metric.LabelMatchers var matcherSets [][]*promql.LabelMatcher
for _, s := range req.Form["match[]"] { for _, s := range req.Form["match[]"] {
matchers, err := promql.ParseMetricSelector(s) matchers, err := promql.ParseMetricSelector(s)
if err != nil { if err != nil {
@ -52,102 +48,114 @@ func (h *Handler) federation(w http.ResponseWriter, req *http.Request) {
} }
var ( var (
minTimestamp = h.now().Add(-promql.StalenessDelta) // minTimestamp = h.now().Add(-promql.StalenessDelta)
format = expfmt.Negotiate(req.Header) format = expfmt.Negotiate(req.Header)
enc = expfmt.NewEncoder(w, format) // enc = expfmt.NewEncoder(w, format)
) )
w.Header().Set("Content-Type", string(format)) w.Header().Set("Content-Type", string(format))
q, err := h.storage.Querier()
if err != nil {
federationErrors.Inc() federationErrors.Inc()
http.Error(w, err.Error(), http.StatusInternalServerError) http.Error(w, errors.Errorf("federation disabled").Error(), http.StatusInternalServerError)
return return
}
defer q.Close()
vector, err := q.LastSampleForLabelMatchers(h.context, minTimestamp, matcherSets...) // q, err := h.storage.Querier()
if err != nil { // if err != nil {
federationErrors.Inc() // federationErrors.Inc()
http.Error(w, err.Error(), http.StatusInternalServerError) // http.Error(w, err.Error(), http.StatusInternalServerError)
return // return
} // }
sort.Sort(byName(vector)) // defer q.Close()
var ( // TODO(fabxc): support via TSDB storage.
lastMetricName model.LabelValue
protMetricFam *dto.MetricFamily
)
for _, s := range vector {
nameSeen := false
globalUsed := map[model.LabelName]struct{}{}
protMetric := &dto.Metric{
Untyped: &dto.Untyped{},
}
for ln, lv := range s.Metric { // var sets []tsdb.SeriesSet
if lv == "" { // for _, matchers := range matcherSets {
// No value means unset. Never consider those labels. // set, err := q.Select(matchers)
// This is also important to protect against nameless metrics. // sets = append(sets, set)
continue // }
}
if ln == model.MetricNameLabel {
nameSeen = true
if lv == lastMetricName {
// We already have the name in the current MetricFamily,
// and we ignore nameless metrics.
continue
}
// Need to start a new MetricFamily. Ship off the old one (if any) before
// creating the new one.
if protMetricFam != nil {
if err := enc.Encode(protMetricFam); err != nil {
federationErrors.Inc()
log.With("err", err).Error("federation failed")
return
}
}
protMetricFam = &dto.MetricFamily{
Type: dto.MetricType_UNTYPED.Enum(),
Name: proto.String(string(lv)),
}
lastMetricName = lv
continue
}
protMetric.Label = append(protMetric.Label, &dto.LabelPair{
Name: proto.String(string(ln)),
Value: proto.String(string(lv)),
})
if _, ok := h.externalLabels[ln]; ok {
globalUsed[ln] = struct{}{}
}
}
if !nameSeen {
log.With("metric", s.Metric).Warn("Ignoring nameless metric during federation.")
continue
}
// Attach global labels if they do not exist yet.
for ln, lv := range h.externalLabels {
if _, ok := globalUsed[ln]; !ok {
protMetric.Label = append(protMetric.Label, &dto.LabelPair{
Name: proto.String(string(ln)),
Value: proto.String(string(lv)),
})
}
}
protMetric.TimestampMs = proto.Int64(int64(s.Timestamp)) // vector, err := q.LastSampleForLabelMatchers(h.context, minTimestamp, matcherSets...)
protMetric.Untyped.Value = proto.Float64(float64(s.Value)) // if err != nil {
// federationErrors.Inc()
// http.Error(w, err.Error(), http.StatusInternalServerError)
// return
// }
// sort.Sort(byName(vector))
protMetricFam.Metric = append(protMetricFam.Metric, protMetric) // var (
} // lastMetricName model.LabelValue
// Still have to ship off the last MetricFamily, if any. // protMetricFam *dto.MetricFamily
if protMetricFam != nil { // )
if err := enc.Encode(protMetricFam); err != nil { // for _, s := range vector {
federationErrors.Inc() // nameSeen := false
log.With("err", err).Error("federation failed") // globalUsed := map[model.LabelName]struct{}{}
} // protMetric := &dto.Metric{
} // Untyped: &dto.Untyped{},
// }
// for ln, lv := range s.Metric {
// if lv == "" {
// // No value means unset. Never consider those labels.
// // This is also important to protect against nameless metrics.
// continue
// }
// if ln == model.MetricNameLabel {
// nameSeen = true
// if lv == lastMetricName {
// // We already have the name in the current MetricFamily,
// // and we ignore nameless metrics.
// continue
// }
// // Need to start a new MetricFamily. Ship off the old one (if any) before
// // creating the new one.
// if protMetricFam != nil {
// if err := enc.Encode(protMetricFam); err != nil {
// federationErrors.Inc()
// log.With("err", err).Error("federation failed")
// return
// }
// }
// protMetricFam = &dto.MetricFamily{
// Type: dto.MetricType_UNTYPED.Enum(),
// Name: proto.String(string(lv)),
// }
// lastMetricName = lv
// continue
// }
// protMetric.Label = append(protMetric.Label, &dto.LabelPair{
// Name: proto.String(string(ln)),
// Value: proto.String(string(lv)),
// })
// if _, ok := h.externalLabels[ln]; ok {
// globalUsed[ln] = struct{}{}
// }
// }
// if !nameSeen {
// log.With("metric", s.Metric).Warn("Ignoring nameless metric during federation.")
// continue
// }
// // Attach global labels if they do not exist yet.
// for ln, lv := range h.externalLabels {
// if _, ok := globalUsed[ln]; !ok {
// protMetric.Label = append(protMetric.Label, &dto.LabelPair{
// Name: proto.String(string(ln)),
// Value: proto.String(string(lv)),
// })
// }
// }
// protMetric.TimestampMs = proto.Int64(int64(s.Timestamp))
// protMetric.Untyped.Value = proto.Float64(float64(s.Value))
// protMetricFam.Metric = append(protMetricFam.Metric, protMetric)
// }
// // Still have to ship off the last MetricFamily, if any.
// if protMetricFam != nil {
// if err := enc.Encode(protMetricFam); err != nil {
// federationErrors.Inc()
// log.With("err", err).Error("federation failed")
// }
// }
} }
// byName makes a model.Vector sortable by metric name. // byName makes a model.Vector sortable by metric name.