From a9cfb66b2849497356b0b3dac78d506d907dc6f9 Mon Sep 17 00:00:00 2001 From: Fabian Reinartz Date: Fri, 7 Oct 2016 13:33:04 +0200 Subject: [PATCH] kubernetes: add node discovery --- .../discovery/kubernetes_v2/kubernetes.go | 15 +- retrieval/discovery/kubernetes_v2/node.go | 161 ++++++++++++++++++ 2 files changed, 175 insertions(+), 1 deletion(-) create mode 100644 retrieval/discovery/kubernetes_v2/node.go diff --git a/retrieval/discovery/kubernetes_v2/kubernetes.go b/retrieval/discovery/kubernetes_v2/kubernetes.go index 975d14159..f756802b4 100644 --- a/retrieval/discovery/kubernetes_v2/kubernetes.go +++ b/retrieval/discovery/kubernetes_v2/kubernetes.go @@ -152,7 +152,7 @@ func (k *Kubernetes) Run(ctx context.Context, ch chan<- []*config.TargetGroup) { slw := cache.NewListWatchFromClient(rclient, "services", api.NamespaceAll, nil) svc := NewService( k.logger.With("kubernetes_sd", "service"), - cache.NewSharedInformer(slw, &apiv1.Service, resyncPeriod), + cache.NewSharedInformer(slw, &apiv1.Service{}, resyncPeriod), ) go svc.informer.Run(ctx.Done()) @@ -161,6 +161,19 @@ func (k *Kubernetes) Run(ctx context.Context, ch chan<- []*config.TargetGroup) { } svc.Run(ctx, ch) + case "node": + nlw := cache.NewListWatchFromClient(rclient, "nodes", api.NamespaceAll, nil) + node := NewNode( + k.logger.With("kubernetes_sd", "node"), + cache.NewSharedInformer(nlw, &apiv1.Node{}, resyncPeriod), + ) + go node.informer.Run(ctx.Done()) + + for !node.informer.HasSynced() { + time.Sleep(100 * time.Millisecond) + } + node.Run(ctx, ch) + default: k.logger.Errorf("unknown Kubernetes discovery kind %q", k.role) } diff --git a/retrieval/discovery/kubernetes_v2/node.go b/retrieval/discovery/kubernetes_v2/node.go new file mode 100644 index 000000000..4e2dbabca --- /dev/null +++ b/retrieval/discovery/kubernetes_v2/node.go @@ -0,0 +1,161 @@ +// Copyright 2016 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package kubernetesv2 + +import ( + "fmt" + "net" + "strconv" + + "github.com/prometheus/common/log" + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/config" + "github.com/prometheus/prometheus/util/strutil" + "golang.org/x/net/context" + "k8s.io/client-go/1.5/pkg/api" + apiv1 "k8s.io/client-go/1.5/pkg/api/v1" + "k8s.io/client-go/1.5/tools/cache" +) + +// Node discovers Kubernetes nodes. +type Node struct { + logger log.Logger + informer cache.SharedInformer + store cache.Store +} + +// NewNode returns a new node discovery. +func NewNode(l log.Logger, inf cache.SharedInformer) *Node { + return &Node{logger: l, informer: inf, store: inf.GetStore()} +} + +// Run implements the TargetProvider interface. +func (n *Node) Run(ctx context.Context, ch chan<- []*config.TargetGroup) { + // Send full initial set of pod targets. + var initial []*config.TargetGroup + for _, o := range n.store.List() { + tg := n.buildNode(o.(*apiv1.Node)) + initial = append(initial, tg) + } + select { + case <-ctx.Done(): + return + case ch <- initial: + } + + // Send target groups for service updates. + send := func(tg *config.TargetGroup) { + select { + case <-ctx.Done(): + case ch <- []*config.TargetGroup{tg}: + } + } + n.informer.AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: func(o interface{}) { + send(n.buildNode(o.(*apiv1.Node))) + }, + DeleteFunc: func(o interface{}) { + send(&config.TargetGroup{Source: nodeSource(o.(*apiv1.Node))}) + }, + UpdateFunc: func(_, o interface{}) { + send(n.buildNode(o.(*apiv1.Node))) + }, + }) + + // Block until the target provider is explicitly canceled. + <-ctx.Done() +} + +func nodeSource(n *apiv1.Node) string { + return "node/" + n.Namespace + "/" + n.Name +} + +const ( + nodeNameLabel = metaLabelPrefix + "node_name" + nodeLabelPrefix = metaLabelPrefix + "node_label_" + nodeAnnotationPrefix = metaLabelPrefix + "node_annotation_" + nodeAddressPrefix = metaLabelPrefix + "node_address_" +) + +func nodeLabels(n *apiv1.Node) model.LabelSet { + ls := make(model.LabelSet, len(n.Labels)+len(n.Annotations)+2) + + ls[nodeNameLabel] = lv(n.Name) + + for k, v := range n.Labels { + ln := strutil.SanitizeLabelName(nodeLabelPrefix + k) + ls[model.LabelName(ln)] = lv(v) + } + + for k, v := range n.Annotations { + ln := strutil.SanitizeLabelName(nodeAnnotationPrefix + k) + ls[model.LabelName(ln)] = lv(v) + } + return ls +} + +func (n *Node) buildNode(node *apiv1.Node) *config.TargetGroup { + tg := &config.TargetGroup{ + Source: nodeSource(node), + } + tg.Labels = nodeLabels(node) + + addr, addrMap, err := nodeAddress(node) + if err != nil { + n.logger.With("err", err).Debugf("No node address found") + return nil + } + addr = net.JoinHostPort(addr, strconv.FormatInt(int64(node.Status.DaemonEndpoints.KubeletEndpoint.Port), 10)) + + t := model.LabelSet{ + model.AddressLabel: lv(addr), + model.InstanceLabel: lv(node.Name), + } + + for ty, a := range addrMap { + ln := strutil.SanitizeLabelName(nodeAddressPrefix + string(ty)) + t[model.LabelName(ln)] = lv(a[0]) + } + tg.Targets = append(tg.Targets, t) + + return tg +} + +// nodeAddresses returns the provided node's address, based on the priority: +// 1. NodeInternalIP +// 2. NodeExternalIP +// 3. NodeLegacyHostIP +// 3. NodeHostName +// +// Derived from k8s.io/kubernetes/pkg/util/node/node.go +func nodeAddress(node *apiv1.Node) (string, map[apiv1.NodeAddressType][]string, error) { + m := map[apiv1.NodeAddressType][]string{} + for _, a := range node.Status.Addresses { + m[a.Type] = append(m[a.Type], a.Address) + } + + if addresses, ok := m[apiv1.NodeInternalIP]; ok { + return addresses[0], m, nil + } + if addresses, ok := m[apiv1.NodeExternalIP]; ok { + return addresses[0], m, nil + } + if addresses, ok := m[apiv1.NodeAddressType(api.NodeLegacyHostIP)]; ok { + return addresses[0], m, nil + } + if addresses, ok := m[apiv1.NodeHostName]; ok { + return addresses[0], m, nil + } + return "", m, fmt.Errorf("host address unknown") +}