diff --git a/config/config.go b/config/config.go index 473c9fd2d..81882b063 100644 --- a/config/config.go +++ b/config/config.go @@ -119,6 +119,11 @@ var ( Timeout: Duration(10 * time.Second), } + // DefaultNerveSDConfig is the default Nerve SD configuration. + DefaultNerveSDConfig = NerveSDConfig{ + Timeout: Duration(10 * time.Second), + } + // DefaultMarathonSDConfig is the default Marathon SD configuration. DefaultMarathonSDConfig = MarathonSDConfig{ RefreshInterval: Duration(30 * time.Second), @@ -367,6 +372,8 @@ type ScrapeConfig struct { ConsulSDConfigs []*ConsulSDConfig `yaml:"consul_sd_configs,omitempty"` // List of Serverset service discovery configurations. ServersetSDConfigs []*ServersetSDConfig `yaml:"serverset_sd_configs,omitempty"` + // NerveSDConfigs is a list of Nerve service discovery configurations. + NerveSDConfigs []*NerveSDConfig `yaml:"nerve_sd_configs,omitempty"` // MarathonSDConfigs is a list of Marathon service discovery configurations. MarathonSDConfigs []*MarathonSDConfig `yaml:"marathon_sd_configs,omitempty"` // List of Kubernetes service discovery configurations. @@ -647,6 +654,38 @@ func (c *ServersetSDConfig) UnmarshalYAML(unmarshal func(interface{}) error) err return checkOverflow(c.XXX, "serverset_sd_config") } +// NerveSDConfig is the configuration for AirBnB's Nerve in Zookeeper based discovery. +type NerveSDConfig struct { + Servers []string `yaml:"servers"` + Paths []string `yaml:"paths"` + Timeout Duration `yaml:"timeout,omitempty"` + + // Catches all undefined fields and must be empty after parsing. + XXX map[string]interface{} `yaml:",inline"` +} + +// UnmarshalYAML implements the yaml.Unmarshaler interface. +func (c *NerveSDConfig) UnmarshalYAML(unmarshal func(interface{}) error) error { + *c = DefaultNerveSDConfig + type plain NerveSDConfig + err := unmarshal((*plain)(c)) + if err != nil { + return err + } + if len(c.Servers) == 0 { + return fmt.Errorf("nerve SD config must contain at least one Zookeeper server") + } + if len(c.Paths) == 0 { + return fmt.Errorf("nerve SD config must contain at least one path") + } + for _, path := range c.Paths { + if !strings.HasPrefix(path, "/") { + return fmt.Errorf("nerve SD config paths must begin with '/': %s", path) + } + } + return checkOverflow(c.XXX, "nerve_sd_config") +} + // MarathonSDConfig is the configuration for services running on Marathon. type MarathonSDConfig struct { Servers []string `yaml:"servers,omitempty"` diff --git a/config/config_test.go b/config/config_test.go index e35adb683..8c27f1b54 100644 --- a/config/config_test.go +++ b/config/config_test.go @@ -270,6 +270,23 @@ var expectedConf = &Config{ }, }, }, + { + JobName: "service-nerve", + + ScrapeInterval: Duration(15 * time.Second), + ScrapeTimeout: DefaultGlobalConfig.ScrapeTimeout, + + MetricsPath: DefaultScrapeConfig.MetricsPath, + Scheme: DefaultScrapeConfig.Scheme, + + NerveSDConfigs: []*NerveSDConfig{ + { + Servers: []string{"localhost"}, + Paths: []string{"/monitoring"}, + Timeout: Duration(10 * time.Second), + }, + }, + }, }, original: "", } diff --git a/config/testdata/conf.good.yml b/config/testdata/conf.good.yml index fd4126113..b26f4ca78 100644 --- a/config/testdata/conf.good.yml +++ b/config/testdata/conf.good.yml @@ -126,3 +126,10 @@ scrape_configs: - region: us-east-1 access_key: access secret_key: secret + +- job_name: service-nerve + nerve_sd_configs: + - servers: + - localhost + paths: + - /monitoring diff --git a/retrieval/discovery/nerve.go b/retrieval/discovery/nerve.go new file mode 100644 index 000000000..d23416062 --- /dev/null +++ b/retrieval/discovery/nerve.go @@ -0,0 +1,150 @@ +// Copyright 2015 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package discovery + +import ( + "encoding/json" + "fmt" + "sync" + "time" + + "github.com/prometheus/common/model" + "github.com/samuel/go-zookeeper/zk" + + "github.com/prometheus/prometheus/config" + "github.com/prometheus/prometheus/util/treecache" +) + +const ( + nerveNodePrefix = "member_" + + nerveLabelPrefix = model.MetaLabelPrefix + "nerve_" + nervePathLabel = nerveLabelPrefix + "path" + nerveEndpointLabelPrefix = nerveLabelPrefix + "endpoint" +) + +type nerveMember struct { + Host string `json:"host"` + Port int `json:"port"` + Name string `json:"name"` +} + +// NerveDiscovery retrieves target information from a Nerve server +// and updates them via watches. +type NerveDiscovery struct { + conf *config.NerveSDConfig + conn *zk.Conn + mu sync.RWMutex + sources map[string]*config.TargetGroup + sdUpdates *chan<- config.TargetGroup + updates chan treecache.ZookeeperTreeCacheEvent + treeCaches []*treecache.ZookeeperTreeCache +} + +// NewNerveDiscovery returns a new NerveDiscovery for the given config. +func NewNerveDiscovery(conf *config.NerveSDConfig) *NerveDiscovery { + conn, _, err := zk.Connect(conf.Servers, time.Duration(conf.Timeout)) + conn.SetLogger(treecache.ZookeeperLogger{}) + if err != nil { + return nil + } + updates := make(chan treecache.ZookeeperTreeCacheEvent) + sd := &NerveDiscovery{ + conf: conf, + conn: conn, + updates: updates, + sources: map[string]*config.TargetGroup{}, + } + go sd.processUpdates() + for _, path := range conf.Paths { + sd.treeCaches = append(sd.treeCaches, treecache.NewZookeeperTreeCache(conn, path, updates)) + } + return sd +} + +// Sources implements the TargetProvider interface. +func (sd *NerveDiscovery) Sources() []string { + sd.mu.RLock() + defer sd.mu.RUnlock() + srcs := []string{} + for t := range sd.sources { + srcs = append(srcs, t) + } + return srcs +} + +func (sd *NerveDiscovery) processUpdates() { + defer sd.conn.Close() + for event := range sd.updates { + tg := &config.TargetGroup{ + Source: event.Path, + } + sd.mu.Lock() + if event.Data != nil { + labelSet, err := parseNerveMember(*event.Data, event.Path) + if err == nil { + tg.Targets = []model.LabelSet{*labelSet} + sd.sources[event.Path] = tg + } else { + delete(sd.sources, event.Path) + } + } else { + delete(sd.sources, event.Path) + } + sd.mu.Unlock() + if sd.sdUpdates != nil { + *sd.sdUpdates <- *tg + } + } + + if sd.sdUpdates != nil { + close(*sd.sdUpdates) + } +} + +// Run implements the TargetProvider interface. +func (sd *NerveDiscovery) Run(ch chan<- config.TargetGroup, done <-chan struct{}) { + // Send on everything we have seen so far. + sd.mu.Lock() + for _, targetGroup := range sd.sources { + ch <- *targetGroup + } + // Tell processUpdates to send future updates. + sd.sdUpdates = &ch + sd.mu.Unlock() + + <-done + for _, tc := range sd.treeCaches { + tc.Stop() + } +} + +func parseNerveMember(data []byte, path string) (*model.LabelSet, error) { + member := nerveMember{} + err := json.Unmarshal(data, &member) + if err != nil { + return nil, fmt.Errorf("error unmarshaling nerve member %q: %s", path, err) + } + + labels := model.LabelSet{} + labels[nervePathLabel] = model.LabelValue(path) + labels[model.AddressLabel] = model.LabelValue( + fmt.Sprintf("%s:%d", member.Host, member.Port)) + + labels[nerveEndpointLabelPrefix+"_host"] = model.LabelValue(member.Host) + labels[nerveEndpointLabelPrefix+"_port"] = model.LabelValue(fmt.Sprintf("%d", member.Port)) + labels[nerveEndpointLabelPrefix+"_name"] = model.LabelValue(member.Name) + + return &labels, nil +} diff --git a/retrieval/discovery/serverset.go b/retrieval/discovery/serverset.go index 6dc8f5162..d1172b243 100644 --- a/retrieval/discovery/serverset.go +++ b/retrieval/discovery/serverset.go @@ -14,20 +14,18 @@ package discovery import ( - "bytes" "encoding/json" "fmt" "strconv" - "strings" "sync" "time" - "github.com/prometheus/common/log" "github.com/prometheus/common/model" "github.com/samuel/go-zookeeper/zk" "github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/util/strutil" + "github.com/prometheus/prometheus/util/treecache" ) const ( @@ -52,14 +50,6 @@ type serversetEndpoint struct { Port int } -type zookeeperLogger struct { -} - -// Implements zk.Logger -func (zl zookeeperLogger) Printf(s string, i ...interface{}) { - log.Infof(s, i...) -} - // ServersetDiscovery retrieves target information from a Serverset server // and updates them via watches. type ServersetDiscovery struct { @@ -68,18 +58,18 @@ type ServersetDiscovery struct { mu sync.RWMutex sources map[string]*config.TargetGroup sdUpdates *chan<- config.TargetGroup - updates chan zookeeperTreeCacheEvent - treeCaches []*zookeeperTreeCache + updates chan treecache.ZookeeperTreeCacheEvent + treeCaches []*treecache.ZookeeperTreeCache } // NewServersetDiscovery returns a new ServersetDiscovery for the given config. func NewServersetDiscovery(conf *config.ServersetSDConfig) *ServersetDiscovery { conn, _, err := zk.Connect(conf.Servers, time.Duration(conf.Timeout)) - conn.SetLogger(zookeeperLogger{}) + conn.SetLogger(treecache.ZookeeperLogger{}) if err != nil { return nil } - updates := make(chan zookeeperTreeCacheEvent) + updates := make(chan treecache.ZookeeperTreeCacheEvent) sd := &ServersetDiscovery{ conf: conf, conn: conn, @@ -88,7 +78,7 @@ func NewServersetDiscovery(conf *config.ServersetSDConfig) *ServersetDiscovery { } go sd.processUpdates() for _, path := range conf.Paths { - sd.treeCaches = append(sd.treeCaches, newZookeeperTreeCache(conn, path, updates)) + sd.treeCaches = append(sd.treeCaches, treecache.NewZookeeperTreeCache(conn, path, updates)) } return sd } @@ -179,197 +169,3 @@ func parseServersetMember(data []byte, path string) (*model.LabelSet, error) { return &labels, nil } - -type zookeeperTreeCache struct { - conn *zk.Conn - prefix string - events chan zookeeperTreeCacheEvent - zkEvents chan zk.Event - stop chan struct{} - head *zookeeperTreeCacheNode -} - -type zookeeperTreeCacheEvent struct { - Path string - Data *[]byte -} - -type zookeeperTreeCacheNode struct { - data *[]byte - events chan zk.Event - done chan struct{} - stopped bool - children map[string]*zookeeperTreeCacheNode -} - -func newZookeeperTreeCache(conn *zk.Conn, path string, events chan zookeeperTreeCacheEvent) *zookeeperTreeCache { - tc := &zookeeperTreeCache{ - conn: conn, - prefix: path, - events: events, - stop: make(chan struct{}), - } - tc.head = &zookeeperTreeCacheNode{ - events: make(chan zk.Event), - children: map[string]*zookeeperTreeCacheNode{}, - stopped: true, - } - err := tc.recursiveNodeUpdate(path, tc.head) - if err != nil { - log.Errorf("Error during initial read of Zookeeper: %s", err) - } - go tc.loop(err != nil) - return tc -} - -func (tc *zookeeperTreeCache) Stop() { - tc.stop <- struct{}{} -} - -func (tc *zookeeperTreeCache) loop(failureMode bool) { - retryChan := make(chan struct{}) - - failure := func() { - failureMode = true - time.AfterFunc(time.Second*10, func() { - retryChan <- struct{}{} - }) - } - if failureMode { - failure() - } - - for { - select { - case ev := <-tc.head.events: - log.Debugf("Received Zookeeper event: %s", ev) - if failureMode { - continue - } - - if ev.Type == zk.EventNotWatching { - log.Infof("Lost connection to Zookeeper.") - failure() - } else { - path := strings.TrimPrefix(ev.Path, tc.prefix) - parts := strings.Split(path, "/") - node := tc.head - for _, part := range parts[1:] { - childNode := node.children[part] - if childNode == nil { - childNode = &zookeeperTreeCacheNode{ - events: tc.head.events, - children: map[string]*zookeeperTreeCacheNode{}, - done: make(chan struct{}, 1), - } - node.children[part] = childNode - } - node = childNode - } - - err := tc.recursiveNodeUpdate(ev.Path, node) - if err != nil { - log.Errorf("Error during processing of Zookeeper event: %s", err) - failure() - } else if tc.head.data == nil { - log.Errorf("Error during processing of Zookeeper event: path %s no longer exists", tc.prefix) - failure() - } - } - case <-retryChan: - log.Infof("Attempting to resync state with Zookeeper") - // Reset root child nodes before traversing the Zookeeper path. - tc.head.children = make(map[string]*zookeeperTreeCacheNode) - err := tc.recursiveNodeUpdate(tc.prefix, tc.head) - if err != nil { - log.Errorf("Error during Zookeeper resync: %s", err) - failure() - } else { - log.Infof("Zookeeper resync successful") - failureMode = false - } - case <-tc.stop: - close(tc.events) - return - } - } -} - -func (tc *zookeeperTreeCache) recursiveNodeUpdate(path string, node *zookeeperTreeCacheNode) error { - data, _, dataWatcher, err := tc.conn.GetW(path) - if err == zk.ErrNoNode { - tc.recursiveDelete(path, node) - if node == tc.head { - return fmt.Errorf("path %s does not exist", path) - } - return nil - } else if err != nil { - return err - } - - if node.data == nil || !bytes.Equal(*node.data, data) { - node.data = &data - tc.events <- zookeeperTreeCacheEvent{Path: path, Data: node.data} - } - - children, _, childWatcher, err := tc.conn.ChildrenW(path) - if err == zk.ErrNoNode { - tc.recursiveDelete(path, node) - return nil - } else if err != nil { - return err - } - - currentChildren := map[string]struct{}{} - for _, child := range children { - currentChildren[child] = struct{}{} - childNode := node.children[child] - // Does not already exists or we previous had a watch that - // triggered. - if childNode == nil || childNode.stopped { - node.children[child] = &zookeeperTreeCacheNode{ - events: node.events, - children: map[string]*zookeeperTreeCacheNode{}, - done: make(chan struct{}, 1), - } - err = tc.recursiveNodeUpdate(path+"/"+child, node.children[child]) - if err != nil { - return err - } - } - } - - // Remove nodes that no longer exist - for name, childNode := range node.children { - if _, ok := currentChildren[name]; !ok || node.data == nil { - tc.recursiveDelete(path+"/"+name, childNode) - delete(node.children, name) - } - } - - go func() { - // Pass up zookeeper events, until the node is deleted. - select { - case event := <-dataWatcher: - node.events <- event - case event := <-childWatcher: - node.events <- event - case <-node.done: - } - }() - return nil -} - -func (tc *zookeeperTreeCache) recursiveDelete(path string, node *zookeeperTreeCacheNode) { - if !node.stopped { - node.done <- struct{}{} - node.stopped = true - } - if node.data != nil { - tc.events <- zookeeperTreeCacheEvent{Path: path, Data: nil} - node.data = nil - } - for name, childNode := range node.children { - tc.recursiveDelete(path+"/"+name, childNode) - } -} diff --git a/retrieval/targetmanager.go b/retrieval/targetmanager.go index 2538f51ef..8a453e70a 100644 --- a/retrieval/targetmanager.go +++ b/retrieval/targetmanager.go @@ -434,6 +434,9 @@ func providersFromConfig(cfg *config.ScrapeConfig) []TargetProvider { for i, c := range cfg.ServersetSDConfigs { app("serverset", i, discovery.NewServersetDiscovery(c)) } + for i, c := range cfg.NerveSDConfigs { + app("nerve", i, discovery.NewNerveDiscovery(c)) + } for i, c := range cfg.EC2SDConfigs { app("ec2", i, discovery.NewEC2Discovery(c)) } diff --git a/util/treecache/treecache.go b/util/treecache/treecache.go new file mode 100644 index 000000000..c23766f3c --- /dev/null +++ b/util/treecache/treecache.go @@ -0,0 +1,213 @@ +package treecache + +import ( + "bytes" + "fmt" + "strings" + "time" + + "github.com/prometheus/common/log" + "github.com/samuel/go-zookeeper/zk" +) + +type ZookeeperLogger struct { +} + +// Implements zk.Logger +func (zl ZookeeperLogger) Printf(s string, i ...interface{}) { + log.Infof(s, i...) +} + +type ZookeeperTreeCache struct { + conn *zk.Conn + prefix string + events chan ZookeeperTreeCacheEvent + zkEvents chan zk.Event + stop chan struct{} + head *zookeeperTreeCacheNode +} + +type ZookeeperTreeCacheEvent struct { + Path string + Data *[]byte +} + +type zookeeperTreeCacheNode struct { + data *[]byte + events chan zk.Event + done chan struct{} + stopped bool + children map[string]*zookeeperTreeCacheNode +} + +func NewZookeeperTreeCache(conn *zk.Conn, path string, events chan ZookeeperTreeCacheEvent) *ZookeeperTreeCache { + tc := &ZookeeperTreeCache{ + conn: conn, + prefix: path, + events: events, + stop: make(chan struct{}), + } + tc.head = &zookeeperTreeCacheNode{ + events: make(chan zk.Event), + children: map[string]*zookeeperTreeCacheNode{}, + stopped: true, + } + err := tc.recursiveNodeUpdate(path, tc.head) + if err != nil { + log.Errorf("Error during initial read of Zookeeper: %s", err) + } + go tc.loop(err != nil) + return tc +} + +func (tc *ZookeeperTreeCache) Stop() { + tc.stop <- struct{}{} +} + +func (tc *ZookeeperTreeCache) loop(failureMode bool) { + retryChan := make(chan struct{}) + + failure := func() { + failureMode = true + time.AfterFunc(time.Second*10, func() { + retryChan <- struct{}{} + }) + } + if failureMode { + failure() + } + + for { + select { + case ev := <-tc.head.events: + log.Debugf("Received Zookeeper event: %s", ev) + if failureMode { + continue + } + + if ev.Type == zk.EventNotWatching { + log.Infof("Lost connection to Zookeeper.") + failure() + } else { + path := strings.TrimPrefix(ev.Path, tc.prefix) + parts := strings.Split(path, "/") + node := tc.head + for _, part := range parts[1:] { + childNode := node.children[part] + if childNode == nil { + childNode = &zookeeperTreeCacheNode{ + events: tc.head.events, + children: map[string]*zookeeperTreeCacheNode{}, + done: make(chan struct{}, 1), + } + node.children[part] = childNode + } + node = childNode + } + + err := tc.recursiveNodeUpdate(ev.Path, node) + if err != nil { + log.Errorf("Error during processing of Zookeeper event: %s", err) + failure() + } else if tc.head.data == nil { + log.Errorf("Error during processing of Zookeeper event: path %s no longer exists", tc.prefix) + failure() + } + } + case <-retryChan: + log.Infof("Attempting to resync state with Zookeeper") + // Reset root child nodes before traversing the Zookeeper path. + tc.head.children = make(map[string]*zookeeperTreeCacheNode) + err := tc.recursiveNodeUpdate(tc.prefix, tc.head) + if err != nil { + log.Errorf("Error during Zookeeper resync: %s", err) + failure() + } else { + log.Infof("Zookeeper resync successful") + failureMode = false + } + case <-tc.stop: + close(tc.events) + return + } + } +} + +func (tc *ZookeeperTreeCache) recursiveNodeUpdate(path string, node *zookeeperTreeCacheNode) error { + data, _, dataWatcher, err := tc.conn.GetW(path) + if err == zk.ErrNoNode { + tc.recursiveDelete(path, node) + if node == tc.head { + return fmt.Errorf("path %s does not exist", path) + } + return nil + } else if err != nil { + return err + } + + if node.data == nil || !bytes.Equal(*node.data, data) { + node.data = &data + tc.events <- ZookeeperTreeCacheEvent{Path: path, Data: node.data} + } + + children, _, childWatcher, err := tc.conn.ChildrenW(path) + if err == zk.ErrNoNode { + tc.recursiveDelete(path, node) + return nil + } else if err != nil { + return err + } + + currentChildren := map[string]struct{}{} + for _, child := range children { + currentChildren[child] = struct{}{} + childNode := node.children[child] + // Does not already exists or we previous had a watch that + // triggered. + if childNode == nil || childNode.stopped { + node.children[child] = &zookeeperTreeCacheNode{ + events: node.events, + children: map[string]*zookeeperTreeCacheNode{}, + done: make(chan struct{}, 1), + } + err = tc.recursiveNodeUpdate(path+"/"+child, node.children[child]) + if err != nil { + return err + } + } + } + + // Remove nodes that no longer exist + for name, childNode := range node.children { + if _, ok := currentChildren[name]; !ok || node.data == nil { + tc.recursiveDelete(path+"/"+name, childNode) + delete(node.children, name) + } + } + + go func() { + // Pass up zookeeper events, until the node is deleted. + select { + case event := <-dataWatcher: + node.events <- event + case event := <-childWatcher: + node.events <- event + case <-node.done: + } + }() + return nil +} + +func (tc *ZookeeperTreeCache) recursiveDelete(path string, node *zookeeperTreeCacheNode) { + if !node.stopped { + node.done <- struct{}{} + node.stopped = true + } + if node.data != nil { + tc.events <- ZookeeperTreeCacheEvent{Path: path, Data: nil} + node.data = nil + } + for name, childNode := range node.children { + tc.recursiveDelete(path+"/"+name, childNode) + } +}