mirror of https://github.com/prometheus/prometheus
commit
71af5e29e8
10
CHANGELOG.md
10
CHANGELOG.md
|
@ -1,3 +1,13 @@
|
||||||
|
## 2.3.2 / 2018-07-12
|
||||||
|
|
||||||
|
* [BUGFIX] Fix various tsdb bugs #4369
|
||||||
|
* [BUGFIX] Reorder startup and shutdown to prevent panics. #4321
|
||||||
|
* [BUGFIX] Exit with non-zero code on error #4296
|
||||||
|
* [BUGFIX] discovery/kubernetes/ingress: fix scheme discovery #4329
|
||||||
|
* [BUGFIX] Fix race in zookeeper sd #4355
|
||||||
|
* [BUGFIX] Better timeout handling in promql #4291 #4300
|
||||||
|
* [BUGFIX] Propogate errors when selecting series from the tsdb #4136
|
||||||
|
|
||||||
## 2.3.1 / 2018-06-19
|
## 2.3.1 / 2018-06-19
|
||||||
|
|
||||||
* [BUGFIX] Avoid infinite loop on duplicate NaN values. #4275
|
* [BUGFIX] Avoid infinite loop on duplicate NaN values. #4275
|
||||||
|
|
|
@ -472,7 +472,9 @@ func main() {
|
||||||
|
|
||||||
},
|
},
|
||||||
func(err error) {
|
func(err error) {
|
||||||
close(cancel)
|
// Wait for any in-progress reloads to complete to avoid
|
||||||
|
// reloading things after they have been shutdown.
|
||||||
|
cancel <- struct{}{}
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
@ -506,6 +508,23 @@ func main() {
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
{
|
||||||
|
// Rule manager.
|
||||||
|
// TODO(krasi) refactor ruleManager.Run() to be blocking to avoid using an extra blocking channel.
|
||||||
|
cancel := make(chan struct{})
|
||||||
|
g.Add(
|
||||||
|
func() error {
|
||||||
|
<-reloadReady.C
|
||||||
|
ruleManager.Run()
|
||||||
|
<-cancel
|
||||||
|
return nil
|
||||||
|
},
|
||||||
|
func(err error) {
|
||||||
|
ruleManager.Stop()
|
||||||
|
close(cancel)
|
||||||
|
},
|
||||||
|
)
|
||||||
|
}
|
||||||
{
|
{
|
||||||
// TSDB.
|
// TSDB.
|
||||||
cancel := make(chan struct{})
|
cancel := make(chan struct{})
|
||||||
|
@ -547,30 +566,10 @@ func main() {
|
||||||
return nil
|
return nil
|
||||||
},
|
},
|
||||||
func(err error) {
|
func(err error) {
|
||||||
// Keep this interrupt before the ruleManager.Stop().
|
|
||||||
// Shutting down the query engine before the rule manager will cause pending queries
|
|
||||||
// to be canceled and ensures a quick shutdown of the rule manager.
|
|
||||||
cancelWeb()
|
cancelWeb()
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
{
|
|
||||||
// Rule manager.
|
|
||||||
|
|
||||||
// TODO(krasi) refactor ruleManager.Run() to be blocking to avoid using an extra blocking channel.
|
|
||||||
cancel := make(chan struct{})
|
|
||||||
g.Add(
|
|
||||||
func() error {
|
|
||||||
ruleManager.Run()
|
|
||||||
<-cancel
|
|
||||||
return nil
|
|
||||||
},
|
|
||||||
func(err error) {
|
|
||||||
ruleManager.Stop()
|
|
||||||
close(cancel)
|
|
||||||
},
|
|
||||||
)
|
|
||||||
}
|
|
||||||
{
|
{
|
||||||
// Notifier.
|
// Notifier.
|
||||||
|
|
||||||
|
@ -595,6 +594,7 @@ func main() {
|
||||||
}
|
}
|
||||||
if err := g.Run(); err != nil {
|
if err := g.Run(); err != nil {
|
||||||
level.Error(logger).Log("err", err)
|
level.Error(logger).Log("err", err)
|
||||||
|
os.Exit(1)
|
||||||
}
|
}
|
||||||
level.Info(logger).Log("msg", "See you next time!")
|
level.Info(logger).Log("msg", "See you next time!")
|
||||||
}
|
}
|
||||||
|
@ -626,6 +626,7 @@ func reloadConfig(filename string, logger log.Logger, rls ...func(*config.Config
|
||||||
if failed {
|
if failed {
|
||||||
return fmt.Errorf("one or more errors occurred while applying the new configuration (--config.file=%s)", filename)
|
return fmt.Errorf("one or more errors occurred while applying the new configuration (--config.file=%s)", filename)
|
||||||
}
|
}
|
||||||
|
level.Info(logger).Log("msg", "Completed loading of configuration file", "filename", filename)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -20,6 +20,7 @@ import (
|
||||||
"os"
|
"os"
|
||||||
"os/exec"
|
"os/exec"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
|
"syscall"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
@ -155,3 +156,20 @@ func TestComputeExternalURL(t *testing.T) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Let's provide an invalid configuration file and verify the exit status indicates the error.
|
||||||
|
func TestFailedStartupExitCode(t *testing.T) {
|
||||||
|
fakeInputFile := "fake-input-file"
|
||||||
|
expectedExitStatus := 1
|
||||||
|
|
||||||
|
prom := exec.Command(promPath, "--config.file="+fakeInputFile)
|
||||||
|
err := prom.Run()
|
||||||
|
testutil.NotOk(t, err, "")
|
||||||
|
|
||||||
|
if exitError, ok := err.(*exec.ExitError); ok {
|
||||||
|
status := exitError.Sys().(syscall.WaitStatus)
|
||||||
|
testutil.Equals(t, expectedExitStatus, status.ExitStatus())
|
||||||
|
} else {
|
||||||
|
t.Errorf("unable to retrieve the exit status for prometheus: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -176,13 +176,22 @@ func (s *Ingress) buildIngress(ingress *v1beta1.Ingress) *targetgroup.Group {
|
||||||
}
|
}
|
||||||
tg.Labels = ingressLabels(ingress)
|
tg.Labels = ingressLabels(ingress)
|
||||||
|
|
||||||
schema := "http"
|
tlsHosts := make(map[string]struct{})
|
||||||
if ingress.Spec.TLS != nil {
|
for _, tls := range ingress.Spec.TLS {
|
||||||
schema = "https"
|
for _, host := range tls.Hosts {
|
||||||
|
tlsHosts[host] = struct{}{}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, rule := range ingress.Spec.Rules {
|
for _, rule := range ingress.Spec.Rules {
|
||||||
paths := pathsFromIngressRule(&rule.IngressRuleValue)
|
paths := pathsFromIngressRule(&rule.IngressRuleValue)
|
||||||
|
|
||||||
|
schema := "http"
|
||||||
|
_, isTLS := tlsHosts[rule.Host]
|
||||||
|
if isTLS {
|
||||||
|
schema = "https"
|
||||||
|
}
|
||||||
|
|
||||||
for _, path := range paths {
|
for _, path := range paths {
|
||||||
tg.Targets = append(tg.Targets, model.LabelSet{
|
tg.Targets = append(tg.Targets, model.LabelSet{
|
||||||
model.AddressLabel: lv(rule.Host),
|
model.AddressLabel: lv(rule.Host),
|
||||||
|
|
|
@ -23,8 +23,16 @@ import (
|
||||||
"k8s.io/client-go/pkg/apis/extensions/v1beta1"
|
"k8s.io/client-go/pkg/apis/extensions/v1beta1"
|
||||||
)
|
)
|
||||||
|
|
||||||
func makeIngress(tls []v1beta1.IngressTLS) *v1beta1.Ingress {
|
type TLSMode int
|
||||||
return &v1beta1.Ingress{
|
|
||||||
|
const (
|
||||||
|
TLSNo TLSMode = iota
|
||||||
|
TLSYes
|
||||||
|
TLSMixed
|
||||||
|
)
|
||||||
|
|
||||||
|
func makeIngress(tls TLSMode) *v1beta1.Ingress {
|
||||||
|
ret := &v1beta1.Ingress{
|
||||||
ObjectMeta: metav1.ObjectMeta{
|
ObjectMeta: metav1.ObjectMeta{
|
||||||
Name: "testingress",
|
Name: "testingress",
|
||||||
Namespace: "default",
|
Namespace: "default",
|
||||||
|
@ -32,7 +40,7 @@ func makeIngress(tls []v1beta1.IngressTLS) *v1beta1.Ingress {
|
||||||
Annotations: map[string]string{"testannotation": "testannotationvalue"},
|
Annotations: map[string]string{"testannotation": "testannotationvalue"},
|
||||||
},
|
},
|
||||||
Spec: v1beta1.IngressSpec{
|
Spec: v1beta1.IngressSpec{
|
||||||
TLS: tls,
|
TLS: nil,
|
||||||
Rules: []v1beta1.IngressRule{
|
Rules: []v1beta1.IngressRule{
|
||||||
{
|
{
|
||||||
Host: "example.com",
|
Host: "example.com",
|
||||||
|
@ -63,31 +71,47 @@ func makeIngress(tls []v1beta1.IngressTLS) *v1beta1.Ingress {
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
switch tls {
|
||||||
|
case TLSYes:
|
||||||
|
ret.Spec.TLS = []v1beta1.IngressTLS{{Hosts: []string{"example.com", "test.example.com"}}}
|
||||||
|
case TLSMixed:
|
||||||
|
ret.Spec.TLS = []v1beta1.IngressTLS{{Hosts: []string{"example.com"}}}
|
||||||
|
}
|
||||||
|
|
||||||
|
return ret
|
||||||
}
|
}
|
||||||
|
|
||||||
func expectedTargetGroups(ns string, tls bool) map[string]*targetgroup.Group {
|
func expectedTargetGroups(ns string, tls TLSMode) map[string]*targetgroup.Group {
|
||||||
scheme := "http"
|
scheme1 := "http"
|
||||||
if tls {
|
scheme2 := "http"
|
||||||
scheme = "https"
|
|
||||||
|
switch tls {
|
||||||
|
case TLSYes:
|
||||||
|
scheme1 = "https"
|
||||||
|
scheme2 = "https"
|
||||||
|
case TLSMixed:
|
||||||
|
scheme1 = "https"
|
||||||
}
|
}
|
||||||
|
|
||||||
key := fmt.Sprintf("ingress/%s/testingress", ns)
|
key := fmt.Sprintf("ingress/%s/testingress", ns)
|
||||||
return map[string]*targetgroup.Group{
|
return map[string]*targetgroup.Group{
|
||||||
key: {
|
key: {
|
||||||
Targets: []model.LabelSet{
|
Targets: []model.LabelSet{
|
||||||
{
|
{
|
||||||
"__meta_kubernetes_ingress_scheme": lv(scheme),
|
"__meta_kubernetes_ingress_scheme": lv(scheme1),
|
||||||
"__meta_kubernetes_ingress_host": "example.com",
|
"__meta_kubernetes_ingress_host": "example.com",
|
||||||
"__meta_kubernetes_ingress_path": "/",
|
"__meta_kubernetes_ingress_path": "/",
|
||||||
"__address__": "example.com",
|
"__address__": "example.com",
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
"__meta_kubernetes_ingress_scheme": lv(scheme),
|
"__meta_kubernetes_ingress_scheme": lv(scheme1),
|
||||||
"__meta_kubernetes_ingress_host": "example.com",
|
"__meta_kubernetes_ingress_host": "example.com",
|
||||||
"__meta_kubernetes_ingress_path": "/foo",
|
"__meta_kubernetes_ingress_path": "/foo",
|
||||||
"__address__": "example.com",
|
"__address__": "example.com",
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
"__meta_kubernetes_ingress_scheme": lv(scheme),
|
"__meta_kubernetes_ingress_scheme": lv(scheme2),
|
||||||
"__meta_kubernetes_ingress_host": "test.example.com",
|
"__meta_kubernetes_ingress_host": "test.example.com",
|
||||||
"__address__": "test.example.com",
|
"__address__": "test.example.com",
|
||||||
"__meta_kubernetes_ingress_path": "/",
|
"__meta_kubernetes_ingress_path": "/",
|
||||||
|
@ -110,12 +134,12 @@ func TestIngressDiscoveryAdd(t *testing.T) {
|
||||||
k8sDiscoveryTest{
|
k8sDiscoveryTest{
|
||||||
discovery: n,
|
discovery: n,
|
||||||
afterStart: func() {
|
afterStart: func() {
|
||||||
obj := makeIngress(nil)
|
obj := makeIngress(TLSNo)
|
||||||
c.ExtensionsV1beta1().Ingresses("default").Create(obj)
|
c.ExtensionsV1beta1().Ingresses("default").Create(obj)
|
||||||
w.Ingresses().Add(obj)
|
w.Ingresses().Add(obj)
|
||||||
},
|
},
|
||||||
expectedMaxItems: 1,
|
expectedMaxItems: 1,
|
||||||
expectedRes: expectedTargetGroups("default", false),
|
expectedRes: expectedTargetGroups("default", TLSNo),
|
||||||
}.Run(t)
|
}.Run(t)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -125,27 +149,42 @@ func TestIngressDiscoveryAddTLS(t *testing.T) {
|
||||||
k8sDiscoveryTest{
|
k8sDiscoveryTest{
|
||||||
discovery: n,
|
discovery: n,
|
||||||
afterStart: func() {
|
afterStart: func() {
|
||||||
obj := makeIngress([]v1beta1.IngressTLS{{}})
|
obj := makeIngress(TLSYes)
|
||||||
c.ExtensionsV1beta1().Ingresses("default").Create(obj)
|
c.ExtensionsV1beta1().Ingresses("default").Create(obj)
|
||||||
w.Ingresses().Add(obj)
|
w.Ingresses().Add(obj)
|
||||||
},
|
},
|
||||||
expectedMaxItems: 1,
|
expectedMaxItems: 1,
|
||||||
expectedRes: expectedTargetGroups("default", true),
|
expectedRes: expectedTargetGroups("default", TLSYes),
|
||||||
|
}.Run(t)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestIngressDiscoveryAddMixed(t *testing.T) {
|
||||||
|
n, c, w := makeDiscovery(RoleIngress, NamespaceDiscovery{Names: []string{"default"}})
|
||||||
|
|
||||||
|
k8sDiscoveryTest{
|
||||||
|
discovery: n,
|
||||||
|
afterStart: func() {
|
||||||
|
obj := makeIngress(TLSMixed)
|
||||||
|
c.ExtensionsV1beta1().Ingresses("default").Create(obj)
|
||||||
|
w.Ingresses().Add(obj)
|
||||||
|
},
|
||||||
|
expectedMaxItems: 1,
|
||||||
|
expectedRes: expectedTargetGroups("default", TLSMixed),
|
||||||
}.Run(t)
|
}.Run(t)
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestIngressDiscoveryNamespaces(t *testing.T) {
|
func TestIngressDiscoveryNamespaces(t *testing.T) {
|
||||||
n, c, w := makeDiscovery(RoleIngress, NamespaceDiscovery{Names: []string{"ns1", "ns2"}})
|
n, c, w := makeDiscovery(RoleIngress, NamespaceDiscovery{Names: []string{"ns1", "ns2"}})
|
||||||
|
|
||||||
expected := expectedTargetGroups("ns1", false)
|
expected := expectedTargetGroups("ns1", TLSNo)
|
||||||
for k, v := range expectedTargetGroups("ns2", false) {
|
for k, v := range expectedTargetGroups("ns2", TLSNo) {
|
||||||
expected[k] = v
|
expected[k] = v
|
||||||
}
|
}
|
||||||
k8sDiscoveryTest{
|
k8sDiscoveryTest{
|
||||||
discovery: n,
|
discovery: n,
|
||||||
afterStart: func() {
|
afterStart: func() {
|
||||||
for _, ns := range []string{"ns1", "ns2"} {
|
for _, ns := range []string{"ns1", "ns2"} {
|
||||||
obj := makeIngress(nil)
|
obj := makeIngress(TLSNo)
|
||||||
obj.Namespace = ns
|
obj.Namespace = ns
|
||||||
c.ExtensionsV1beta1().Ingresses(obj.Namespace).Create(obj)
|
c.ExtensionsV1beta1().Ingresses(obj.Namespace).Create(obj)
|
||||||
w.Ingresses().Add(obj)
|
w.Ingresses().Add(obj)
|
||||||
|
|
|
@ -137,8 +137,11 @@ func NewDiscovery(
|
||||||
logger = log.NewNopLogger()
|
logger = log.NewNopLogger()
|
||||||
}
|
}
|
||||||
|
|
||||||
conn, _, err := zk.Connect(srvs, timeout)
|
conn, _, err := zk.Connect(
|
||||||
conn.SetLogger(treecache.NewZookeeperLogger(logger))
|
srvs, timeout,
|
||||||
|
func(c *zk.Conn) {
|
||||||
|
c.SetLogger(treecache.NewZookeeperLogger(logger))
|
||||||
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -237,57 +237,80 @@ type VectorMatching struct {
|
||||||
|
|
||||||
// Visitor allows visiting a Node and its child nodes. The Visit method is
|
// Visitor allows visiting a Node and its child nodes. The Visit method is
|
||||||
// invoked for each node with the path leading to the node provided additionally.
|
// invoked for each node with the path leading to the node provided additionally.
|
||||||
// If the result visitor w is not nil, Walk visits each of the children
|
// If the result visitor w is not nil and no error, Walk visits each of the children
|
||||||
// of node with the visitor w, followed by a call of w.Visit(nil, nil).
|
// of node with the visitor w, followed by a call of w.Visit(nil, nil).
|
||||||
type Visitor interface {
|
type Visitor interface {
|
||||||
Visit(node Node, path []Node) (w Visitor)
|
Visit(node Node, path []Node) (w Visitor, err error)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Walk traverses an AST in depth-first order: It starts by calling
|
// Walk traverses an AST in depth-first order: It starts by calling
|
||||||
// v.Visit(node, path); node must not be nil. If the visitor w returned by
|
// v.Visit(node, path); node must not be nil. If the visitor w returned by
|
||||||
// v.Visit(node, path) is not nil, Walk is invoked recursively with visitor
|
// v.Visit(node, path) is not nil and the visitor returns no error, Walk is
|
||||||
// w for each of the non-nil children of node, followed by a call of
|
// invoked recursively with visitor w for each of the non-nil children of node,
|
||||||
// w.Visit(nil).
|
// followed by a call of w.Visit(nil), returning an error
|
||||||
// As the tree is descended the path of previous nodes is provided.
|
// As the tree is descended the path of previous nodes is provided.
|
||||||
func Walk(v Visitor, node Node, path []Node) {
|
func Walk(v Visitor, node Node, path []Node) error {
|
||||||
if v = v.Visit(node, path); v == nil {
|
var err error
|
||||||
return
|
if v, err = v.Visit(node, path); v == nil || err != nil {
|
||||||
|
return err
|
||||||
}
|
}
|
||||||
path = append(path, node)
|
path = append(path, node)
|
||||||
|
|
||||||
switch n := node.(type) {
|
switch n := node.(type) {
|
||||||
case Statements:
|
case Statements:
|
||||||
for _, s := range n {
|
for _, s := range n {
|
||||||
Walk(v, s, path)
|
if err := Walk(v, s, path); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
}
|
}
|
||||||
case *AlertStmt:
|
case *AlertStmt:
|
||||||
Walk(v, n.Expr, path)
|
if err := Walk(v, n.Expr, path); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
case *EvalStmt:
|
case *EvalStmt:
|
||||||
Walk(v, n.Expr, path)
|
if err := Walk(v, n.Expr, path); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
case *RecordStmt:
|
case *RecordStmt:
|
||||||
Walk(v, n.Expr, path)
|
if err := Walk(v, n.Expr, path); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
case Expressions:
|
case Expressions:
|
||||||
for _, e := range n {
|
for _, e := range n {
|
||||||
Walk(v, e, path)
|
if err := Walk(v, e, path); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
}
|
}
|
||||||
case *AggregateExpr:
|
case *AggregateExpr:
|
||||||
Walk(v, n.Expr, path)
|
if err := Walk(v, n.Expr, path); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
case *BinaryExpr:
|
case *BinaryExpr:
|
||||||
Walk(v, n.LHS, path)
|
if err := Walk(v, n.LHS, path); err != nil {
|
||||||
Walk(v, n.RHS, path)
|
return err
|
||||||
|
}
|
||||||
|
if err := Walk(v, n.RHS, path); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
case *Call:
|
case *Call:
|
||||||
Walk(v, n.Args, path)
|
if err := Walk(v, n.Args, path); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
case *ParenExpr:
|
case *ParenExpr:
|
||||||
Walk(v, n.Expr, path)
|
if err := Walk(v, n.Expr, path); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
case *UnaryExpr:
|
case *UnaryExpr:
|
||||||
Walk(v, n.Expr, path)
|
if err := Walk(v, n.Expr, path); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
case *MatrixSelector, *NumberLiteral, *StringLiteral, *VectorSelector:
|
case *MatrixSelector, *NumberLiteral, *StringLiteral, *VectorSelector:
|
||||||
// nothing to do
|
// nothing to do
|
||||||
|
@ -296,21 +319,23 @@ func Walk(v Visitor, node Node, path []Node) {
|
||||||
panic(fmt.Errorf("promql.Walk: unhandled node type %T", node))
|
panic(fmt.Errorf("promql.Walk: unhandled node type %T", node))
|
||||||
}
|
}
|
||||||
|
|
||||||
v.Visit(nil, nil)
|
_, err = v.Visit(nil, nil)
|
||||||
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
type inspector func(Node, []Node) bool
|
type inspector func(Node, []Node) error
|
||||||
|
|
||||||
func (f inspector) Visit(node Node, path []Node) Visitor {
|
func (f inspector) Visit(node Node, path []Node) (Visitor, error) {
|
||||||
if f(node, path) {
|
if err := f(node, path); err == nil {
|
||||||
return f
|
return f, nil
|
||||||
|
} else {
|
||||||
|
return nil, err
|
||||||
}
|
}
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Inspect traverses an AST in depth-first order: It starts by calling
|
// Inspect traverses an AST in depth-first order: It starts by calling
|
||||||
// f(node, path); node must not be nil. If f returns true, Inspect invokes f
|
// f(node, path); node must not be nil. If f returns a nil error, Inspect invokes f
|
||||||
// for all the non-nil children of node, recursively.
|
// for all the non-nil children of node, recursively.
|
||||||
func Inspect(node Node, f func(Node, []Node) bool) {
|
func Inspect(node Node, f inspector) {
|
||||||
Walk(inspector(f), node, nil)
|
Walk(inspector(f), node, nil)
|
||||||
}
|
}
|
||||||
|
|
|
@ -451,7 +451,7 @@ func (ng *Engine) execEvalStmt(ctx context.Context, query *query, s *EvalStmt) (
|
||||||
|
|
||||||
func (ng *Engine) populateSeries(ctx context.Context, q storage.Queryable, s *EvalStmt) (storage.Querier, error) {
|
func (ng *Engine) populateSeries(ctx context.Context, q storage.Queryable, s *EvalStmt) (storage.Querier, error) {
|
||||||
var maxOffset time.Duration
|
var maxOffset time.Duration
|
||||||
Inspect(s.Expr, func(node Node, _ []Node) bool {
|
Inspect(s.Expr, func(node Node, _ []Node) error {
|
||||||
switch n := node.(type) {
|
switch n := node.(type) {
|
||||||
case *VectorSelector:
|
case *VectorSelector:
|
||||||
if maxOffset < LookbackDelta {
|
if maxOffset < LookbackDelta {
|
||||||
|
@ -468,7 +468,7 @@ func (ng *Engine) populateSeries(ctx context.Context, q storage.Queryable, s *Ev
|
||||||
maxOffset = n.Offset + n.Range
|
maxOffset = n.Offset + n.Range
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return true
|
return nil
|
||||||
})
|
})
|
||||||
|
|
||||||
mint := s.Start.Add(-maxOffset)
|
mint := s.Start.Add(-maxOffset)
|
||||||
|
@ -478,7 +478,7 @@ func (ng *Engine) populateSeries(ctx context.Context, q storage.Queryable, s *Ev
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
Inspect(s.Expr, func(node Node, path []Node) bool {
|
Inspect(s.Expr, func(node Node, path []Node) error {
|
||||||
var set storage.SeriesSet
|
var set storage.SeriesSet
|
||||||
params := &storage.SelectParams{
|
params := &storage.SelectParams{
|
||||||
Step: int64(s.Interval / time.Millisecond),
|
Step: int64(s.Interval / time.Millisecond),
|
||||||
|
@ -491,13 +491,13 @@ func (ng *Engine) populateSeries(ctx context.Context, q storage.Queryable, s *Ev
|
||||||
set, err = querier.Select(params, n.LabelMatchers...)
|
set, err = querier.Select(params, n.LabelMatchers...)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
level.Error(ng.logger).Log("msg", "error selecting series set", "err", err)
|
level.Error(ng.logger).Log("msg", "error selecting series set", "err", err)
|
||||||
return false
|
return err
|
||||||
}
|
}
|
||||||
n.series, err = expandSeriesSet(set)
|
n.series, err = expandSeriesSet(ctx, set)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// TODO(fabxc): use multi-error.
|
// TODO(fabxc): use multi-error.
|
||||||
level.Error(ng.logger).Log("msg", "error expanding series set", "err", err)
|
level.Error(ng.logger).Log("msg", "error expanding series set", "err", err)
|
||||||
return false
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
case *MatrixSelector:
|
case *MatrixSelector:
|
||||||
|
@ -506,15 +506,15 @@ func (ng *Engine) populateSeries(ctx context.Context, q storage.Queryable, s *Ev
|
||||||
set, err = querier.Select(params, n.LabelMatchers...)
|
set, err = querier.Select(params, n.LabelMatchers...)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
level.Error(ng.logger).Log("msg", "error selecting series set", "err", err)
|
level.Error(ng.logger).Log("msg", "error selecting series set", "err", err)
|
||||||
return false
|
return err
|
||||||
}
|
}
|
||||||
n.series, err = expandSeriesSet(set)
|
n.series, err = expandSeriesSet(ctx, set)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
level.Error(ng.logger).Log("msg", "error expanding series set", "err", err)
|
level.Error(ng.logger).Log("msg", "error expanding series set", "err", err)
|
||||||
return false
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return true
|
return nil
|
||||||
})
|
})
|
||||||
return querier, err
|
return querier, err
|
||||||
}
|
}
|
||||||
|
@ -538,8 +538,13 @@ func extractFuncFromPath(p []Node) string {
|
||||||
return extractFuncFromPath(p[:len(p)-1])
|
return extractFuncFromPath(p[:len(p)-1])
|
||||||
}
|
}
|
||||||
|
|
||||||
func expandSeriesSet(it storage.SeriesSet) (res []storage.Series, err error) {
|
func expandSeriesSet(ctx context.Context, it storage.SeriesSet) (res []storage.Series, err error) {
|
||||||
for it.Next() {
|
for it.Next() {
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return nil, ctx.Err()
|
||||||
|
default:
|
||||||
|
}
|
||||||
res = append(res, it.At())
|
res = append(res, it.At())
|
||||||
}
|
}
|
||||||
return res, it.Err()
|
return res, it.Err()
|
||||||
|
@ -1039,6 +1044,9 @@ func (ev *evaluator) matrixSelector(node *MatrixSelector) Matrix {
|
||||||
|
|
||||||
var it *storage.BufferedSeriesIterator
|
var it *storage.BufferedSeriesIterator
|
||||||
for i, s := range node.series {
|
for i, s := range node.series {
|
||||||
|
if err := contextDone(ev.ctx, "expression evaluation"); err != nil {
|
||||||
|
ev.error(err)
|
||||||
|
}
|
||||||
if it == nil {
|
if it == nil {
|
||||||
it = storage.NewBuffer(s.Iterator(), durationMilliseconds(node.Range))
|
it = storage.NewBuffer(s.Iterator(), durationMilliseconds(node.Range))
|
||||||
} else {
|
} else {
|
||||||
|
|
|
@ -164,6 +164,13 @@ type BlockStats struct {
|
||||||
NumTombstones uint64 `json:"numTombstones,omitempty"`
|
NumTombstones uint64 `json:"numTombstones,omitempty"`
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// BlockDesc describes a block by ULID and time range.
|
||||||
|
type BlockDesc struct {
|
||||||
|
ULID ulid.ULID `json:"ulid"`
|
||||||
|
MinTime int64 `json:"minTime"`
|
||||||
|
MaxTime int64 `json:"maxTime"`
|
||||||
|
}
|
||||||
|
|
||||||
// BlockMetaCompaction holds information about compactions a block went through.
|
// BlockMetaCompaction holds information about compactions a block went through.
|
||||||
type BlockMetaCompaction struct {
|
type BlockMetaCompaction struct {
|
||||||
// Maximum number of compaction cycles any source block has
|
// Maximum number of compaction cycles any source block has
|
||||||
|
@ -171,6 +178,9 @@ type BlockMetaCompaction struct {
|
||||||
Level int `json:"level"`
|
Level int `json:"level"`
|
||||||
// ULIDs of all source head blocks that went into the block.
|
// ULIDs of all source head blocks that went into the block.
|
||||||
Sources []ulid.ULID `json:"sources,omitempty"`
|
Sources []ulid.ULID `json:"sources,omitempty"`
|
||||||
|
// Short descriptions of the direct blocks that were used to create
|
||||||
|
// this block.
|
||||||
|
Parents []BlockDesc `json:"parents,omitempty"`
|
||||||
Failed bool `json:"failed,omitempty"`
|
Failed bool `json:"failed,omitempty"`
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -424,7 +434,7 @@ func (pb *Block) Delete(mint, maxt int64, ms ...labels.Matcher) error {
|
||||||
ir := pb.indexr
|
ir := pb.indexr
|
||||||
|
|
||||||
// Choose only valid postings which have chunks in the time-range.
|
// Choose only valid postings which have chunks in the time-range.
|
||||||
stones := memTombstones{}
|
stones := NewMemTombstones()
|
||||||
|
|
||||||
var lset labels.Labels
|
var lset labels.Labels
|
||||||
var chks []chunks.Meta
|
var chks []chunks.Meta
|
||||||
|
@ -437,10 +447,10 @@ Outer:
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, chk := range chks {
|
for _, chk := range chks {
|
||||||
if intervalOverlap(mint, maxt, chk.MinTime, chk.MaxTime) {
|
if chk.OverlapsClosedInterval(mint, maxt) {
|
||||||
// Delete only until the current values and not beyond.
|
// Delete only until the current values and not beyond.
|
||||||
tmin, tmax := clampInterval(mint, maxt, chks[0].MinTime, chks[len(chks)-1].MaxTime)
|
tmin, tmax := clampInterval(mint, maxt, chks[0].MinTime, chks[len(chks)-1].MaxTime)
|
||||||
stones[p.At()] = Intervals{{tmin, tmax}}
|
stones.addInterval(p.At(), Interval{tmin, tmax})
|
||||||
continue Outer
|
continue Outer
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -452,7 +462,7 @@ Outer:
|
||||||
|
|
||||||
err = pb.tombstones.Iter(func(id uint64, ivs Intervals) error {
|
err = pb.tombstones.Iter(func(id uint64, ivs Intervals) error {
|
||||||
for _, iv := range ivs {
|
for _, iv := range ivs {
|
||||||
stones.add(id, iv)
|
stones.addInterval(id, iv)
|
||||||
pb.meta.Stats.NumTombstones++
|
pb.meta.Stats.NumTombstones++
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
|
@ -475,19 +485,17 @@ func (pb *Block) CleanTombstones(dest string, c Compactor) (*ulid.ULID, error) {
|
||||||
|
|
||||||
pb.tombstones.Iter(func(id uint64, ivs Intervals) error {
|
pb.tombstones.Iter(func(id uint64, ivs Intervals) error {
|
||||||
numStones += len(ivs)
|
numStones += len(ivs)
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
|
|
||||||
if numStones == 0 {
|
if numStones == 0 {
|
||||||
return nil, nil
|
return nil, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
uid, err := c.Write(dest, pb, pb.meta.MinTime, pb.meta.MaxTime)
|
meta := pb.Meta()
|
||||||
|
uid, err := c.Write(dest, pb, pb.meta.MinTime, pb.meta.MaxTime, &meta)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
return &uid, nil
|
return &uid, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -531,6 +539,13 @@ func (pb *Block) Snapshot(dir string) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Returns true if the block overlaps [mint, maxt].
|
||||||
|
func (pb *Block) OverlapsClosedInterval(mint, maxt int64) bool {
|
||||||
|
// The block itself is a half-open interval
|
||||||
|
// [pb.meta.MinTime, pb.meta.MaxTime).
|
||||||
|
return pb.meta.MinTime <= maxt && mint < pb.meta.MaxTime
|
||||||
|
}
|
||||||
|
|
||||||
func clampInterval(a, b, mint, maxt int64) (int64, int64) {
|
func clampInterval(a, b, mint, maxt int64) (int64, int64) {
|
||||||
if a < mint {
|
if a < mint {
|
||||||
a = mint
|
a = mint
|
||||||
|
|
|
@ -57,6 +57,12 @@ func (cm *Meta) writeHash(h hash.Hash) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Returns true if the chunk overlaps [mint, maxt].
|
||||||
|
func (cm *Meta) OverlapsClosedInterval(mint, maxt int64) bool {
|
||||||
|
// The chunk itself is a closed interval [cm.MinTime, cm.MaxTime].
|
||||||
|
return cm.MinTime <= maxt && mint <= cm.MaxTime
|
||||||
|
}
|
||||||
|
|
||||||
var (
|
var (
|
||||||
errInvalidSize = fmt.Errorf("invalid size")
|
errInvalidSize = fmt.Errorf("invalid size")
|
||||||
errInvalidFlag = fmt.Errorf("invalid flag")
|
errInvalidFlag = fmt.Errorf("invalid flag")
|
||||||
|
@ -296,7 +302,7 @@ func newReader(bs []ByteSlice, cs []io.Closer, pool chunkenc.Pool) (*Reader, err
|
||||||
}
|
}
|
||||||
// Verify magic number.
|
// Verify magic number.
|
||||||
if m := binary.BigEndian.Uint32(b.Range(0, 4)); m != MagicChunks {
|
if m := binary.BigEndian.Uint32(b.Range(0, 4)); m != MagicChunks {
|
||||||
return nil, fmt.Errorf("invalid magic number %x", m)
|
return nil, errors.Errorf("invalid magic number %x", m)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return &cr, nil
|
return &cr, nil
|
||||||
|
@ -357,8 +363,8 @@ func (s *Reader) Chunk(ref uint64) (chunkenc.Chunk, error) {
|
||||||
r := b.Range(off, off+binary.MaxVarintLen32)
|
r := b.Range(off, off+binary.MaxVarintLen32)
|
||||||
|
|
||||||
l, n := binary.Uvarint(r)
|
l, n := binary.Uvarint(r)
|
||||||
if n < 0 {
|
if n <= 0 {
|
||||||
return nil, fmt.Errorf("reading chunk length failed")
|
return nil, errors.Errorf("reading chunk length failed with %d", n)
|
||||||
}
|
}
|
||||||
r = b.Range(off+n, off+n+int(l))
|
r = b.Range(off+n, off+n+int(l))
|
||||||
|
|
||||||
|
|
|
@ -55,7 +55,7 @@ type Compactor interface {
|
||||||
Plan(dir string) ([]string, error)
|
Plan(dir string) ([]string, error)
|
||||||
|
|
||||||
// Write persists a Block into a directory.
|
// Write persists a Block into a directory.
|
||||||
Write(dest string, b BlockReader, mint, maxt int64) (ulid.ULID, error)
|
Write(dest string, b BlockReader, mint, maxt int64, parent *BlockMeta) (ulid.ULID, error)
|
||||||
|
|
||||||
// Compact runs compaction against the provided directories. Must
|
// Compact runs compaction against the provided directories. Must
|
||||||
// only be called concurrently with results of Plan().
|
// only be called concurrently with results of Plan().
|
||||||
|
@ -297,6 +297,11 @@ func compactBlockMetas(uid ulid.ULID, blocks ...*BlockMeta) *BlockMeta {
|
||||||
for _, s := range b.Compaction.Sources {
|
for _, s := range b.Compaction.Sources {
|
||||||
sources[s] = struct{}{}
|
sources[s] = struct{}{}
|
||||||
}
|
}
|
||||||
|
res.Compaction.Parents = append(res.Compaction.Parents, BlockDesc{
|
||||||
|
ULID: b.ULID,
|
||||||
|
MinTime: b.MinTime,
|
||||||
|
MaxTime: b.MaxTime,
|
||||||
|
})
|
||||||
}
|
}
|
||||||
res.Compaction.Level++
|
res.Compaction.Level++
|
||||||
|
|
||||||
|
@ -367,7 +372,7 @@ func (c *LeveledCompactor) Compact(dest string, dirs ...string) (uid ulid.ULID,
|
||||||
return uid, merr
|
return uid, merr
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *LeveledCompactor) Write(dest string, b BlockReader, mint, maxt int64) (ulid.ULID, error) {
|
func (c *LeveledCompactor) Write(dest string, b BlockReader, mint, maxt int64, parent *BlockMeta) (ulid.ULID, error) {
|
||||||
entropy := rand.New(rand.NewSource(time.Now().UnixNano()))
|
entropy := rand.New(rand.NewSource(time.Now().UnixNano()))
|
||||||
uid := ulid.MustNew(ulid.Now(), entropy)
|
uid := ulid.MustNew(ulid.Now(), entropy)
|
||||||
|
|
||||||
|
@ -379,6 +384,12 @@ func (c *LeveledCompactor) Write(dest string, b BlockReader, mint, maxt int64) (
|
||||||
meta.Compaction.Level = 1
|
meta.Compaction.Level = 1
|
||||||
meta.Compaction.Sources = []ulid.ULID{uid}
|
meta.Compaction.Sources = []ulid.ULID{uid}
|
||||||
|
|
||||||
|
if parent != nil {
|
||||||
|
meta.Compaction.Parents = []BlockDesc{
|
||||||
|
{ULID: parent.ULID, MinTime: parent.MinTime, MaxTime: parent.MaxTime},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
err := c.write(dest, meta, b)
|
err := c.write(dest, meta, b)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return uid, err
|
return uid, err
|
||||||
|
@ -472,7 +483,7 @@ func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockRe
|
||||||
}
|
}
|
||||||
|
|
||||||
// Create an empty tombstones file.
|
// Create an empty tombstones file.
|
||||||
if err := writeTombstoneFile(tmp, EmptyTombstoneReader()); err != nil {
|
if err := writeTombstoneFile(tmp, NewMemTombstones()); err != nil {
|
||||||
return errors.Wrap(err, "write new tombstones file")
|
return errors.Wrap(err, "write new tombstones file")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -581,7 +592,7 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta,
|
||||||
if len(dranges) > 0 {
|
if len(dranges) > 0 {
|
||||||
// Re-encode the chunk to not have deleted values.
|
// Re-encode the chunk to not have deleted values.
|
||||||
for i, chk := range chks {
|
for i, chk := range chks {
|
||||||
if !intervalOverlap(dranges[0].Mint, dranges[len(dranges)-1].Maxt, chk.MinTime, chk.MaxTime) {
|
if !chk.OverlapsClosedInterval(dranges[0].Mint, dranges[len(dranges)-1].Maxt) {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -75,7 +75,7 @@ type Appender interface {
|
||||||
// Returned reference numbers are ephemeral and may be rejected in calls
|
// Returned reference numbers are ephemeral and may be rejected in calls
|
||||||
// to AddFast() at any point. Adding the sample via Add() returns a new
|
// to AddFast() at any point. Adding the sample via Add() returns a new
|
||||||
// reference number.
|
// reference number.
|
||||||
// If the reference is the empty string it must not be used for caching.
|
// If the reference is 0 it must not be used for caching.
|
||||||
Add(l labels.Labels, t int64, v float64) (uint64, error)
|
Add(l labels.Labels, t int64, v float64) (uint64, error)
|
||||||
|
|
||||||
// Add adds a sample pair for the referenced series. It is generally faster
|
// Add adds a sample pair for the referenced series. It is generally faster
|
||||||
|
@ -267,17 +267,9 @@ func (db *DB) run() {
|
||||||
case <-db.compactc:
|
case <-db.compactc:
|
||||||
db.metrics.compactionsTriggered.Inc()
|
db.metrics.compactionsTriggered.Inc()
|
||||||
|
|
||||||
_, err1 := db.retentionCutoff()
|
_, err := db.compact()
|
||||||
if err1 != nil {
|
if err != nil {
|
||||||
level.Error(db.logger).Log("msg", "retention cutoff failed", "err", err1)
|
level.Error(db.logger).Log("msg", "compaction failed", "err", err)
|
||||||
}
|
|
||||||
|
|
||||||
_, err2 := db.compact()
|
|
||||||
if err2 != nil {
|
|
||||||
level.Error(db.logger).Log("msg", "compaction failed", "err", err2)
|
|
||||||
}
|
|
||||||
|
|
||||||
if err1 != nil || err2 != nil {
|
|
||||||
backoff = exponential(backoff, 1*time.Second, 1*time.Minute)
|
backoff = exponential(backoff, 1*time.Second, 1*time.Minute)
|
||||||
} else {
|
} else {
|
||||||
backoff = 0
|
backoff = 0
|
||||||
|
@ -289,19 +281,9 @@ func (db *DB) run() {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (db *DB) retentionCutoff() (b bool, err error) {
|
func (db *DB) beyondRetention(meta *BlockMeta) bool {
|
||||||
defer func() {
|
|
||||||
if !b && err == nil {
|
|
||||||
// no data had to be cut off.
|
|
||||||
return
|
|
||||||
}
|
|
||||||
db.metrics.cutoffs.Inc()
|
|
||||||
if err != nil {
|
|
||||||
db.metrics.cutoffsFailed.Inc()
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
if db.opts.RetentionDuration == 0 {
|
if db.opts.RetentionDuration == 0 {
|
||||||
return false, nil
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
db.mtx.RLock()
|
db.mtx.RLock()
|
||||||
|
@ -309,23 +291,13 @@ func (db *DB) retentionCutoff() (b bool, err error) {
|
||||||
db.mtx.RUnlock()
|
db.mtx.RUnlock()
|
||||||
|
|
||||||
if len(blocks) == 0 {
|
if len(blocks) == 0 {
|
||||||
return false, nil
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
last := blocks[len(db.blocks)-1]
|
last := blocks[len(db.blocks)-1]
|
||||||
|
|
||||||
mint := last.Meta().MaxTime - int64(db.opts.RetentionDuration)
|
mint := last.Meta().MaxTime - int64(db.opts.RetentionDuration)
|
||||||
dirs, err := retentionCutoffDirs(db.dir, mint)
|
|
||||||
if err != nil {
|
|
||||||
return false, err
|
|
||||||
}
|
|
||||||
|
|
||||||
// This will close the dirs and then delete the dirs.
|
return meta.MaxTime < mint
|
||||||
if len(dirs) > 0 {
|
|
||||||
return true, db.reload(dirs...)
|
|
||||||
}
|
|
||||||
|
|
||||||
return false, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Appender opens a new appender against the database.
|
// Appender opens a new appender against the database.
|
||||||
|
@ -354,6 +326,13 @@ func (a dbAppender) Commit() error {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Compact data if possible. After successful compaction blocks are reloaded
|
||||||
|
// which will also trigger blocks to be deleted that fall out of the retention
|
||||||
|
// window.
|
||||||
|
// If no blocks are compacted, the retention window state doesn't change. Thus,
|
||||||
|
// this is sufficient to reliably delete old data.
|
||||||
|
// Old blocks are only deleted on reload based on the new block's parent information.
|
||||||
|
// See DB.reload documentation for further information.
|
||||||
func (db *DB) compact() (changes bool, err error) {
|
func (db *DB) compact() (changes bool, err error) {
|
||||||
db.cmtx.Lock()
|
db.cmtx.Lock()
|
||||||
defer db.cmtx.Unlock()
|
defer db.cmtx.Unlock()
|
||||||
|
@ -381,9 +360,15 @@ func (db *DB) compact() (changes bool, err error) {
|
||||||
head := &rangeHead{
|
head := &rangeHead{
|
||||||
head: db.head,
|
head: db.head,
|
||||||
mint: mint,
|
mint: mint,
|
||||||
maxt: maxt,
|
// We remove 1 millisecond from maxt because block
|
||||||
|
// intervals are half-open: [b.MinTime, b.MaxTime). But
|
||||||
|
// chunk intervals are closed: [c.MinTime, c.MaxTime];
|
||||||
|
// so in order to make sure that overlaps are evaluated
|
||||||
|
// consistently, we explicitly remove the last value
|
||||||
|
// from the block interval here.
|
||||||
|
maxt: maxt - 1,
|
||||||
}
|
}
|
||||||
if _, err = db.compactor.Write(db.dir, head, mint, maxt); err != nil {
|
if _, err = db.compactor.Write(db.dir, head, mint, maxt, nil); err != nil {
|
||||||
return changes, errors.Wrap(err, "persist head block")
|
return changes, errors.Wrap(err, "persist head block")
|
||||||
}
|
}
|
||||||
changes = true
|
changes = true
|
||||||
|
@ -418,7 +403,7 @@ func (db *DB) compact() (changes bool, err error) {
|
||||||
changes = true
|
changes = true
|
||||||
runtime.GC()
|
runtime.GC()
|
||||||
|
|
||||||
if err := db.reload(plan...); err != nil {
|
if err := db.reload(); err != nil {
|
||||||
return changes, errors.Wrap(err, "reload blocks")
|
return changes, errors.Wrap(err, "reload blocks")
|
||||||
}
|
}
|
||||||
runtime.GC()
|
runtime.GC()
|
||||||
|
@ -427,39 +412,6 @@ func (db *DB) compact() (changes bool, err error) {
|
||||||
return changes, nil
|
return changes, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// retentionCutoffDirs returns all directories of blocks in dir that are strictly
|
|
||||||
// before mint.
|
|
||||||
func retentionCutoffDirs(dir string, mint int64) ([]string, error) {
|
|
||||||
df, err := fileutil.OpenDir(dir)
|
|
||||||
if err != nil {
|
|
||||||
return nil, errors.Wrapf(err, "open directory")
|
|
||||||
}
|
|
||||||
defer df.Close()
|
|
||||||
|
|
||||||
dirs, err := blockDirs(dir)
|
|
||||||
if err != nil {
|
|
||||||
return nil, errors.Wrapf(err, "list block dirs %s", dir)
|
|
||||||
}
|
|
||||||
|
|
||||||
delDirs := []string{}
|
|
||||||
|
|
||||||
for _, dir := range dirs {
|
|
||||||
meta, err := readMetaFile(dir)
|
|
||||||
if err != nil {
|
|
||||||
return nil, errors.Wrapf(err, "read block meta %s", dir)
|
|
||||||
}
|
|
||||||
// The first block we encounter marks that we crossed the boundary
|
|
||||||
// of deletable blocks.
|
|
||||||
if meta.MaxTime >= mint {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
|
|
||||||
delDirs = append(delDirs, dir)
|
|
||||||
}
|
|
||||||
|
|
||||||
return delDirs, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (db *DB) getBlock(id ulid.ULID) (*Block, bool) {
|
func (db *DB) getBlock(id ulid.ULID) (*Block, bool) {
|
||||||
for _, b := range db.blocks {
|
for _, b := range db.blocks {
|
||||||
if b.Meta().ULID == id {
|
if b.Meta().ULID == id {
|
||||||
|
@ -469,18 +421,10 @@ func (db *DB) getBlock(id ulid.ULID) (*Block, bool) {
|
||||||
return nil, false
|
return nil, false
|
||||||
}
|
}
|
||||||
|
|
||||||
func stringsContain(set []string, elem string) bool {
|
|
||||||
for _, e := range set {
|
|
||||||
if elem == e {
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
|
|
||||||
// reload on-disk blocks and trigger head truncation if new blocks appeared. It takes
|
// reload on-disk blocks and trigger head truncation if new blocks appeared. It takes
|
||||||
// a list of block directories which should be deleted during reload.
|
// a list of block directories which should be deleted during reload.
|
||||||
func (db *DB) reload(deleteable ...string) (err error) {
|
// Blocks that are obsolete due to replacement or retention will be deleted.
|
||||||
|
func (db *DB) reload() (err error) {
|
||||||
defer func() {
|
defer func() {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
db.metrics.reloadsFailed.Inc()
|
db.metrics.reloadsFailed.Inc()
|
||||||
|
@ -492,21 +436,58 @@ func (db *DB) reload(deleteable ...string) (err error) {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return errors.Wrap(err, "find blocks")
|
return errors.Wrap(err, "find blocks")
|
||||||
}
|
}
|
||||||
|
// We delete old blocks that have been superseded by new ones by gathering all parents
|
||||||
|
// from existing blocks. Those parents all have newer replacements and can be safely deleted
|
||||||
|
// after we loaded the other blocks.
|
||||||
|
// This makes us resilient against the process crashing towards the end of a compaction.
|
||||||
|
// Creation of a new block and deletion of its parents cannot happen atomically. By creating
|
||||||
|
// blocks with their parents, we can pick up the deletion where it left off during a crash.
|
||||||
var (
|
var (
|
||||||
blocks []*Block
|
blocks []*Block
|
||||||
exist = map[ulid.ULID]struct{}{}
|
corrupted = map[ulid.ULID]error{}
|
||||||
|
opened = map[ulid.ULID]struct{}{}
|
||||||
|
deleteable = map[ulid.ULID]struct{}{}
|
||||||
)
|
)
|
||||||
|
for _, dir := range dirs {
|
||||||
|
meta, err := readMetaFile(dir)
|
||||||
|
if err != nil {
|
||||||
|
// The block was potentially in the middle of being deleted during a crash.
|
||||||
|
// Skip it since we may delete it properly further down again.
|
||||||
|
level.Warn(db.logger).Log("msg", "read meta information", "err", err, "dir", dir)
|
||||||
|
|
||||||
|
ulid, err2 := ulid.Parse(filepath.Base(dir))
|
||||||
|
if err2 != nil {
|
||||||
|
level.Error(db.logger).Log("msg", "not a block dir", "dir", dir)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
corrupted[ulid] = err
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if db.beyondRetention(meta) {
|
||||||
|
deleteable[meta.ULID] = struct{}{}
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
for _, b := range meta.Compaction.Parents {
|
||||||
|
deleteable[b.ULID] = struct{}{}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// Blocks we failed to open should all be those we are want to delete anyway.
|
||||||
|
for c, err := range corrupted {
|
||||||
|
if _, ok := deleteable[c]; !ok {
|
||||||
|
return errors.Wrapf(err, "unexpected corrupted block %s", c)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// Load new blocks into memory.
|
||||||
for _, dir := range dirs {
|
for _, dir := range dirs {
|
||||||
meta, err := readMetaFile(dir)
|
meta, err := readMetaFile(dir)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return errors.Wrapf(err, "read meta information %s", dir)
|
return errors.Wrapf(err, "read meta information %s", dir)
|
||||||
}
|
}
|
||||||
// If the block is pending for deletion, don't add it to the new block set.
|
// Don't load blocks that are scheduled for deletion.
|
||||||
if stringsContain(deleteable, dir) {
|
if _, ok := deleteable[meta.ULID]; ok {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
// See if we already have the block in memory or open it otherwise.
|
||||||
b, ok := db.getBlock(meta.ULID)
|
b, ok := db.getBlock(meta.ULID)
|
||||||
if !ok {
|
if !ok {
|
||||||
b, err = OpenBlock(dir, db.chunkPool)
|
b, err = OpenBlock(dir, db.chunkPool)
|
||||||
|
@ -514,9 +495,8 @@ func (db *DB) reload(deleteable ...string) (err error) {
|
||||||
return errors.Wrapf(err, "open block %s", dir)
|
return errors.Wrapf(err, "open block %s", dir)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
blocks = append(blocks, b)
|
blocks = append(blocks, b)
|
||||||
exist[meta.ULID] = struct{}{}
|
opened[meta.ULID] = struct{}{}
|
||||||
}
|
}
|
||||||
sort.Slice(blocks, func(i, j int) bool {
|
sort.Slice(blocks, func(i, j int) bool {
|
||||||
return blocks[i].Meta().MinTime < blocks[j].Meta().MinTime
|
return blocks[i].Meta().MinTime < blocks[j].Meta().MinTime
|
||||||
|
@ -533,15 +513,19 @@ func (db *DB) reload(deleteable ...string) (err error) {
|
||||||
db.blocks = blocks
|
db.blocks = blocks
|
||||||
db.mtx.Unlock()
|
db.mtx.Unlock()
|
||||||
|
|
||||||
|
// Drop old blocks from memory.
|
||||||
for _, b := range oldBlocks {
|
for _, b := range oldBlocks {
|
||||||
if _, ok := exist[b.Meta().ULID]; ok {
|
if _, ok := opened[b.Meta().ULID]; ok {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
if err := b.Close(); err != nil {
|
if err := b.Close(); err != nil {
|
||||||
level.Warn(db.logger).Log("msg", "closing block failed", "err", err)
|
level.Warn(db.logger).Log("msg", "closing block failed", "err", err)
|
||||||
}
|
}
|
||||||
if err := os.RemoveAll(b.Dir()); err != nil {
|
}
|
||||||
level.Warn(db.logger).Log("msg", "deleting block failed", "err", err)
|
// Delete all obsolete blocks. None of them are opened any longer.
|
||||||
|
for ulid := range deleteable {
|
||||||
|
if err := os.RemoveAll(filepath.Join(db.dir, ulid.String())); err != nil {
|
||||||
|
return errors.Wrapf(err, "delete obsolete block %s", ulid)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -765,7 +749,7 @@ func (db *DB) Snapshot(dir string, withHead bool) error {
|
||||||
if !withHead {
|
if !withHead {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
_, err := db.compactor.Write(dir, db.head, db.head.MinTime(), db.head.MaxTime())
|
_, err := db.compactor.Write(dir, db.head, db.head.MinTime(), db.head.MaxTime(), nil)
|
||||||
return errors.Wrap(err, "snapshot head block")
|
return errors.Wrap(err, "snapshot head block")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -778,8 +762,7 @@ func (db *DB) Querier(mint, maxt int64) (Querier, error) {
|
||||||
defer db.mtx.RUnlock()
|
defer db.mtx.RUnlock()
|
||||||
|
|
||||||
for _, b := range db.blocks {
|
for _, b := range db.blocks {
|
||||||
m := b.Meta()
|
if b.OverlapsClosedInterval(mint, maxt) {
|
||||||
if intervalOverlap(mint, maxt, m.MinTime, m.MaxTime) {
|
|
||||||
blocks = append(blocks, b)
|
blocks = append(blocks, b)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -821,8 +804,7 @@ func (db *DB) Delete(mint, maxt int64, ms ...labels.Matcher) error {
|
||||||
defer db.mtx.RUnlock()
|
defer db.mtx.RUnlock()
|
||||||
|
|
||||||
for _, b := range db.blocks {
|
for _, b := range db.blocks {
|
||||||
m := b.Meta()
|
if b.OverlapsClosedInterval(mint, maxt) {
|
||||||
if intervalOverlap(mint, maxt, m.MinTime, m.MaxTime) {
|
|
||||||
g.Go(func(b *Block) func() error {
|
g.Go(func(b *Block) func() error {
|
||||||
return func() error { return b.Delete(mint, maxt, ms...) }
|
return func() error { return b.Delete(mint, maxt, ms...) }
|
||||||
}(b))
|
}(b))
|
||||||
|
@ -859,27 +841,15 @@ func (db *DB) CleanTombstones() (err error) {
|
||||||
blocks := db.blocks[:]
|
blocks := db.blocks[:]
|
||||||
db.mtx.RUnlock()
|
db.mtx.RUnlock()
|
||||||
|
|
||||||
deletable := []string{}
|
|
||||||
for _, b := range blocks {
|
for _, b := range blocks {
|
||||||
if uid, er := b.CleanTombstones(db.Dir(), db.compactor); er != nil {
|
if uid, er := b.CleanTombstones(db.Dir(), db.compactor); er != nil {
|
||||||
err = errors.Wrapf(er, "clean tombstones: %s", b.Dir())
|
err = errors.Wrapf(er, "clean tombstones: %s", b.Dir())
|
||||||
return err
|
return err
|
||||||
} else if uid != nil { // New block was created.
|
} else if uid != nil { // New block was created.
|
||||||
deletable = append(deletable, b.Dir())
|
|
||||||
newUIDs = append(newUIDs, *uid)
|
newUIDs = append(newUIDs, *uid)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
return errors.Wrap(db.reload(), "reload blocks")
|
||||||
if len(deletable) == 0 {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
return errors.Wrap(db.reload(deletable...), "reload blocks")
|
|
||||||
}
|
|
||||||
|
|
||||||
func intervalOverlap(amin, amax, bmin, bmax int64) bool {
|
|
||||||
// Checks Overlap: http://stackoverflow.com/questions/3269434/
|
|
||||||
return amin <= bmax && bmin <= amax
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func isBlockDir(fi os.FileInfo) bool {
|
func isBlockDir(fi os.FileInfo) bool {
|
||||||
|
|
|
@ -69,7 +69,7 @@ type Head struct {
|
||||||
|
|
||||||
postings *index.MemPostings // postings lists for terms
|
postings *index.MemPostings // postings lists for terms
|
||||||
|
|
||||||
tombstones memTombstones
|
tombstones *memTombstones
|
||||||
}
|
}
|
||||||
|
|
||||||
type headMetrics struct {
|
type headMetrics struct {
|
||||||
|
@ -189,7 +189,7 @@ func NewHead(r prometheus.Registerer, l log.Logger, wal WAL, chunkRange int64) (
|
||||||
values: map[string]stringset{},
|
values: map[string]stringset{},
|
||||||
symbols: map[string]struct{}{},
|
symbols: map[string]struct{}{},
|
||||||
postings: index.NewUnorderedMemPostings(),
|
postings: index.NewUnorderedMemPostings(),
|
||||||
tombstones: memTombstones{},
|
tombstones: NewMemTombstones(),
|
||||||
}
|
}
|
||||||
h.metrics = newHeadMetrics(h, r)
|
h.metrics = newHeadMetrics(h, r)
|
||||||
|
|
||||||
|
@ -300,7 +300,7 @@ func (h *Head) ReadWAL() error {
|
||||||
if itv.Maxt < mint {
|
if itv.Maxt < mint {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
h.tombstones.add(s.ref, itv)
|
h.tombstones.addInterval(s.ref, itv)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -521,7 +521,8 @@ func (a *headAppender) AddFast(ref uint64, t int64, v float64) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *headAppender) Commit() error {
|
func (a *headAppender) Commit() error {
|
||||||
defer a.Rollback()
|
defer a.head.metrics.activeAppenders.Dec()
|
||||||
|
defer a.head.putAppendBuffer(a.samples)
|
||||||
|
|
||||||
if err := a.head.wal.LogSeries(a.series); err != nil {
|
if err := a.head.wal.LogSeries(a.series); err != nil {
|
||||||
return err
|
return err
|
||||||
|
@ -565,7 +566,9 @@ func (a *headAppender) Rollback() error {
|
||||||
a.head.metrics.activeAppenders.Dec()
|
a.head.metrics.activeAppenders.Dec()
|
||||||
a.head.putAppendBuffer(a.samples)
|
a.head.putAppendBuffer(a.samples)
|
||||||
|
|
||||||
return nil
|
// Series are created in the head memory regardless of rollback. Thus we have
|
||||||
|
// to log them to the WAL in any case.
|
||||||
|
return a.head.wal.LogSeries(a.series)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Delete all samples in the range of [mint, maxt] for series that satisfy the given
|
// Delete all samples in the range of [mint, maxt] for series that satisfy the given
|
||||||
|
@ -602,7 +605,7 @@ func (h *Head) Delete(mint, maxt int64, ms ...labels.Matcher) error {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
for _, s := range stones {
|
for _, s := range stones {
|
||||||
h.tombstones.add(s.ref, s.intervals[0])
|
h.tombstones.addInterval(s.ref, s.intervals[0])
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -732,19 +735,14 @@ func (h *headChunkReader) Chunk(ref uint64) (chunkenc.Chunk, error) {
|
||||||
s.Lock()
|
s.Lock()
|
||||||
c := s.chunk(int(cid))
|
c := s.chunk(int(cid))
|
||||||
|
|
||||||
// This means that the chunk has been garbage collected.
|
// This means that the chunk has been garbage collected or is outside
|
||||||
if c == nil {
|
// the specified range.
|
||||||
|
if c == nil || !c.OverlapsClosedInterval(h.mint, h.maxt) {
|
||||||
s.Unlock()
|
s.Unlock()
|
||||||
return nil, ErrNotFound
|
return nil, ErrNotFound
|
||||||
}
|
}
|
||||||
|
|
||||||
mint, maxt := c.minTime, c.maxTime
|
|
||||||
s.Unlock()
|
s.Unlock()
|
||||||
|
|
||||||
// Do not expose chunks that are outside of the specified range.
|
|
||||||
if c == nil || !intervalOverlap(mint, maxt, h.mint, h.maxt) {
|
|
||||||
return nil, ErrNotFound
|
|
||||||
}
|
|
||||||
return &safeChunk{
|
return &safeChunk{
|
||||||
Chunk: c.chunk,
|
Chunk: c.chunk,
|
||||||
s: s,
|
s: s,
|
||||||
|
@ -849,7 +847,7 @@ func (h *headIndexReader) Series(ref uint64, lbls *labels.Labels, chks *[]chunks
|
||||||
|
|
||||||
for i, c := range s.chunks {
|
for i, c := range s.chunks {
|
||||||
// Do not expose chunks that are outside of the specified range.
|
// Do not expose chunks that are outside of the specified range.
|
||||||
if !intervalOverlap(c.minTime, c.maxTime, h.mint, h.maxt) {
|
if !c.OverlapsClosedInterval(h.mint, h.maxt) {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
*chks = append(*chks, chunks.Meta{
|
*chks = append(*chks, chunks.Meta{
|
||||||
|
@ -1288,6 +1286,11 @@ type memChunk struct {
|
||||||
minTime, maxTime int64
|
minTime, maxTime int64
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Returns true if the chunk overlaps [mint, maxt].
|
||||||
|
func (mc *memChunk) OverlapsClosedInterval(mint, maxt int64) bool {
|
||||||
|
return mc.minTime <= maxt && mint <= mc.maxTime
|
||||||
|
}
|
||||||
|
|
||||||
type memSafeIterator struct {
|
type memSafeIterator struct {
|
||||||
chunkenc.Iterator
|
chunkenc.Iterator
|
||||||
|
|
||||||
|
|
|
@ -740,8 +740,8 @@ func (r *Reader) decbufUvarintAt(off int) decbuf {
|
||||||
b := r.b.Range(off, off+binary.MaxVarintLen32)
|
b := r.b.Range(off, off+binary.MaxVarintLen32)
|
||||||
|
|
||||||
l, n := binary.Uvarint(b)
|
l, n := binary.Uvarint(b)
|
||||||
if n > binary.MaxVarintLen32 {
|
if n <= 0 || n > binary.MaxVarintLen32 {
|
||||||
return decbuf{e: errors.New("invalid uvarint")}
|
return decbuf{e: errors.Errorf("invalid uvarint %d", n)}
|
||||||
}
|
}
|
||||||
|
|
||||||
if r.b.Len() < off+n+int(l)+4 {
|
if r.b.Len() < off+n+int(l)+4 {
|
||||||
|
|
|
@ -478,7 +478,7 @@ type baseChunkSeries struct {
|
||||||
// over them. It drops chunks based on tombstones in the given reader.
|
// over them. It drops chunks based on tombstones in the given reader.
|
||||||
func LookupChunkSeries(ir IndexReader, tr TombstoneReader, ms ...labels.Matcher) (ChunkSeriesSet, error) {
|
func LookupChunkSeries(ir IndexReader, tr TombstoneReader, ms ...labels.Matcher) (ChunkSeriesSet, error) {
|
||||||
if tr == nil {
|
if tr == nil {
|
||||||
tr = EmptyTombstoneReader()
|
tr = NewMemTombstones()
|
||||||
}
|
}
|
||||||
p, err := PostingsForMatchers(ir, ms...)
|
p, err := PostingsForMatchers(ir, ms...)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
@ -16,12 +16,12 @@ package tsdb
|
||||||
import (
|
import (
|
||||||
"encoding/binary"
|
"encoding/binary"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"github.com/pkg/errors"
|
||||||
"io"
|
"io"
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
"os"
|
"os"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
|
"sync"
|
||||||
"github.com/pkg/errors"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
const tombstoneFilename = "tombstones"
|
const tombstoneFilename = "tombstones"
|
||||||
|
@ -107,10 +107,10 @@ type Stone struct {
|
||||||
intervals Intervals
|
intervals Intervals
|
||||||
}
|
}
|
||||||
|
|
||||||
func readTombstones(dir string) (memTombstones, error) {
|
func readTombstones(dir string) (*memTombstones, error) {
|
||||||
b, err := ioutil.ReadFile(filepath.Join(dir, tombstoneFilename))
|
b, err := ioutil.ReadFile(filepath.Join(dir, tombstoneFilename))
|
||||||
if os.IsNotExist(err) {
|
if os.IsNotExist(err) {
|
||||||
return memTombstones{}, nil
|
return NewMemTombstones(), nil
|
||||||
} else if err != nil {
|
} else if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -140,7 +140,7 @@ func readTombstones(dir string) (memTombstones, error) {
|
||||||
return nil, errors.New("checksum did not match")
|
return nil, errors.New("checksum did not match")
|
||||||
}
|
}
|
||||||
|
|
||||||
stonesMap := memTombstones{}
|
stonesMap := NewMemTombstones()
|
||||||
|
|
||||||
for d.len() > 0 {
|
for d.len() > 0 {
|
||||||
k := d.uvarint64()
|
k := d.uvarint64()
|
||||||
|
@ -150,27 +150,31 @@ func readTombstones(dir string) (memTombstones, error) {
|
||||||
return nil, d.err()
|
return nil, d.err()
|
||||||
}
|
}
|
||||||
|
|
||||||
stonesMap.add(k, Interval{mint, maxt})
|
stonesMap.addInterval(k, Interval{mint, maxt})
|
||||||
}
|
}
|
||||||
|
|
||||||
return stonesMap, nil
|
return stonesMap, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
type memTombstones map[uint64]Intervals
|
type memTombstones struct {
|
||||||
|
intvlGroups map[uint64]Intervals
|
||||||
var emptyTombstoneReader = memTombstones{}
|
mtx sync.RWMutex
|
||||||
|
|
||||||
// EmptyTombstoneReader returns a TombstoneReader that is always empty.
|
|
||||||
func EmptyTombstoneReader() TombstoneReader {
|
|
||||||
return emptyTombstoneReader
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t memTombstones) Get(ref uint64) (Intervals, error) {
|
func NewMemTombstones() *memTombstones {
|
||||||
return t[ref], nil
|
return &memTombstones{intvlGroups: make(map[uint64]Intervals)}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t memTombstones) Iter(f func(uint64, Intervals) error) error {
|
func (t *memTombstones) Get(ref uint64) (Intervals, error) {
|
||||||
for ref, ivs := range t {
|
t.mtx.RLock()
|
||||||
|
defer t.mtx.RUnlock()
|
||||||
|
return t.intvlGroups[ref], nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *memTombstones) Iter(f func(uint64, Intervals) error) error {
|
||||||
|
t.mtx.RLock()
|
||||||
|
defer t.mtx.RUnlock()
|
||||||
|
for ref, ivs := range t.intvlGroups {
|
||||||
if err := f(ref, ivs); err != nil {
|
if err := f(ref, ivs); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -178,8 +182,13 @@ func (t memTombstones) Iter(f func(uint64, Intervals) error) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t memTombstones) add(ref uint64, itv Interval) {
|
// addInterval to an existing memTombstones
|
||||||
t[ref] = t[ref].add(itv)
|
func (t *memTombstones) addInterval(ref uint64, itvs ...Interval) {
|
||||||
|
t.mtx.Lock()
|
||||||
|
defer t.mtx.Unlock()
|
||||||
|
for _, itv := range itvs {
|
||||||
|
t.intvlGroups[ref] = t.intvlGroups[ref].add(itv)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (memTombstones) Close() error {
|
func (memTombstones) Close() error {
|
||||||
|
@ -208,7 +217,7 @@ func (tr Interval) isSubrange(dranges Intervals) bool {
|
||||||
// Intervals represents a set of increasing and non-overlapping time-intervals.
|
// Intervals represents a set of increasing and non-overlapping time-intervals.
|
||||||
type Intervals []Interval
|
type Intervals []Interval
|
||||||
|
|
||||||
// This adds the new time-range to the existing ones.
|
// add the new time-range to the existing ones.
|
||||||
// The existing ones must be sorted.
|
// The existing ones must be sorted.
|
||||||
func (itvs Intervals) add(n Interval) Intervals {
|
func (itvs Intervals) add(n Interval) Intervals {
|
||||||
for i, r := range itvs {
|
for i, r := range itvs {
|
||||||
|
|
|
@ -820,40 +820,40 @@
|
||||||
"revisionTime": "2016-04-11T19:08:41Z"
|
"revisionTime": "2016-04-11T19:08:41Z"
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
"checksumSHA1": "eohOTRwnox/+qrSrgYmnxeJB2yM=",
|
"checksumSHA1": "gzvR+g1v/ILXxAt/NuxzIPWk1x0=",
|
||||||
"path": "github.com/prometheus/tsdb",
|
"path": "github.com/prometheus/tsdb",
|
||||||
"revision": "c848349f07c83bd38d5d19faa5ea71c7fd8923ea",
|
"revision": "99a2c4314ff70f0673c0d07b512e2ea7a715889e",
|
||||||
"revisionTime": "2018-06-05T09:24:13Z"
|
"revisionTime": "2018-07-11T11:21:26Z"
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
"checksumSHA1": "QI0UME2olSr4kH6Z8UkpffM59Mc=",
|
"checksumSHA1": "QI0UME2olSr4kH6Z8UkpffM59Mc=",
|
||||||
"path": "github.com/prometheus/tsdb/chunkenc",
|
"path": "github.com/prometheus/tsdb/chunkenc",
|
||||||
"revision": "c848349f07c83bd38d5d19faa5ea71c7fd8923ea",
|
"revision": "99a2c4314ff70f0673c0d07b512e2ea7a715889e",
|
||||||
"revisionTime": "2018-06-05T09:24:13Z"
|
"revisionTime": "2018-07-11T11:21:26Z"
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
"checksumSHA1": "746Mjy2y6wdsGjY/FcGhc8tI4w8=",
|
"checksumSHA1": "+5bPifRe479zdFeTYhZ+CZRLMgw=",
|
||||||
"path": "github.com/prometheus/tsdb/chunks",
|
"path": "github.com/prometheus/tsdb/chunks",
|
||||||
"revision": "c848349f07c83bd38d5d19faa5ea71c7fd8923ea",
|
"revision": "99a2c4314ff70f0673c0d07b512e2ea7a715889e",
|
||||||
"revisionTime": "2018-06-05T09:24:13Z"
|
"revisionTime": "2018-07-11T11:21:26Z"
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
"checksumSHA1": "dnyelqeik/xHDRCvCmKFv/Op9XQ=",
|
"checksumSHA1": "dnyelqeik/xHDRCvCmKFv/Op9XQ=",
|
||||||
"path": "github.com/prometheus/tsdb/fileutil",
|
"path": "github.com/prometheus/tsdb/fileutil",
|
||||||
"revision": "c848349f07c83bd38d5d19faa5ea71c7fd8923ea",
|
"revision": "99a2c4314ff70f0673c0d07b512e2ea7a715889e",
|
||||||
"revisionTime": "2018-06-05T09:24:13Z"
|
"revisionTime": "2018-07-11T11:21:26Z"
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
"checksumSHA1": "A2uIFwIgeHmXGBzOpna95kM80RY=",
|
"checksumSHA1": "AZGFK4UtJe8/j8pHqGTNQ8wu27g=",
|
||||||
"path": "github.com/prometheus/tsdb/index",
|
"path": "github.com/prometheus/tsdb/index",
|
||||||
"revision": "c848349f07c83bd38d5d19faa5ea71c7fd8923ea",
|
"revision": "99a2c4314ff70f0673c0d07b512e2ea7a715889e",
|
||||||
"revisionTime": "2018-06-05T09:24:13Z"
|
"revisionTime": "2018-07-11T11:21:26Z"
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
"checksumSHA1": "Va8HWvOFTwFeewZFadMAOzNGDps=",
|
"checksumSHA1": "Va8HWvOFTwFeewZFadMAOzNGDps=",
|
||||||
"path": "github.com/prometheus/tsdb/labels",
|
"path": "github.com/prometheus/tsdb/labels",
|
||||||
"revision": "c848349f07c83bd38d5d19faa5ea71c7fd8923ea",
|
"revision": "99a2c4314ff70f0673c0d07b512e2ea7a715889e",
|
||||||
"revisionTime": "2018-06-05T09:24:13Z"
|
"revisionTime": "2018-07-11T11:21:26Z"
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
"checksumSHA1": "5SYLEhADhdBVZAGPVHWggQl7H8k=",
|
"checksumSHA1": "5SYLEhADhdBVZAGPVHWggQl7H8k=",
|
||||||
|
|
Loading…
Reference in New Issue