diff --git a/retrieval/discovery/kubernetes_v2/endpoint.go b/retrieval/discovery/kubernetes_v2/endpoint.go
index c7a9c9b37..71465b28e 100644
--- a/retrieval/discovery/kubernetes_v2/endpoint.go
+++ b/retrieval/discovery/kubernetes_v2/endpoint.go
@@ -32,11 +32,12 @@ type Endpoints struct {
 	logger log.Logger
 
 	endpointsInf cache.SharedInformer
-	servicesInf  cache.SharedInformer
-	podsInf      cache.SharedInformer
+	serviceInf   cache.SharedInformer
+	podInf       cache.SharedInformer
 
 	podStore       cache.Store
 	endpointsStore cache.Store
+	serviceStore   cache.Store
 }
 
 // NewEndpoints returns a new endpoints discovery.
@@ -45,8 +46,9 @@ func NewEndpoints(l log.Logger, svc, eps, pod cache.SharedInformer) *Endpoints {
 		logger:         l,
 		endpointsInf:   eps,
 		endpointsStore: eps.GetStore(),
-		servicesInf:    svc,
-		podsInf:        pod,
+		serviceInf:     svc,
+		serviceStore:   svc.GetStore(),
+		podInf:         pod,
 		podStore:       pod.GetStore(),
 	}
 
@@ -99,8 +101,11 @@ func (e *Endpoints) Run(ctx context.Context, ch chan<- []*config.TargetGroup) {
 		if exists && err == nil {
 			send(e.buildEndpoints(obj.(*apiv1.Endpoints)))
 		}
+		if err != nil {
+			e.logger.With("err", err).Errorln("retrieving endpoints failed")
+		}
 	}
 
-	e.servicesInf.AddEventHandler(cache.ResourceEventHandlerFuncs{
+	e.serviceInf.AddEventHandler(cache.ResourceEventHandlerFuncs{
 		AddFunc:    func(o interface{}) { serviceUpdate(o.(*apiv1.Service)) },
 		UpdateFunc: func(_, o interface{}) { serviceUpdate(o.(*apiv1.Service)) },
 		DeleteFunc: func(o interface{}) { serviceUpdate(o.(*apiv1.Service)) },
@@ -119,9 +124,10 @@ const (
 	serviceLabelPrefix      = metaLabelPrefix + "service_label_"
 	serviceAnnotationPrefix = metaLabelPrefix + "service_annotation_"
 
-	endpointsNameLabel    = metaLabelPrefix + "endpoints_name"
-	endpointReadyLabel    = metaLabelPrefix + "endpoint_ready"
-	endpointPortNameLabel = metaLabelPrefix + "endpoint_port_name"
+	endpointsNameLabel        = metaLabelPrefix + "endpoints_name"
+	endpointReadyLabel        = metaLabelPrefix + "endpoint_ready"
+	endpointPortNameLabel     = metaLabelPrefix + "endpoint_port_name"
+	endpointPortProtocolLabel = metaLabelPrefix + "endpoint_port_protocol"
 )
 
 func (e *Endpoints) buildEndpoints(eps *apiv1.Endpoints) *config.TargetGroup {
@@ -136,22 +142,56 @@ func (e *Endpoints) buildEndpoints(eps *apiv1.Endpoints) *config.TargetGroup {
 		namespaceLabel:     lv(eps.Namespace),
 		endpointsNameLabel: lv(eps.Name),
 	}
-	e.decorateService(eps.Namespace, eps.Name, tg)
+	e.addServiceLabels(eps.Namespace, eps.Name, tg)
 
-	// type podEntry struct {
-	// 	pod          *apiv1.Pod
-	// 	servicePorts []apiv1.ServicePort
-	// }
-	// seenPods := map[string]podEntry{}
+	type podEntry struct {
+		pod          *apiv1.Pod
+		servicePorts []apiv1.EndpointPort
+	}
+	seenPods := map[string]*podEntry{}
 
 	add := func(addr apiv1.EndpointAddress, port apiv1.EndpointPort, ready string) {
 		a := net.JoinHostPort(addr.IP, strconv.FormatInt(int64(port.Port), 10))
 
-		tg.Targets = append(tg.Targets, model.LabelSet{
-			model.AddressLabel:    lv(a),
-			endpointPortNameLabel: lv(port.Name),
-			endpointReadyLabel:    lv(ready),
-		})
+		target := model.LabelSet{
+			model.AddressLabel:        lv(a),
+			endpointPortNameLabel:     lv(port.Name),
+			endpointPortProtocolLabel: lv(string(port.Protocol)),
+			endpointReadyLabel:        lv(ready),
+		}
+
+		pod := e.resolvePodRef(addr.TargetRef)
+		if pod == nil {
+			tg.Targets = append(tg.Targets, target)
+			return
+		}
+		s := pod.Namespace + "/" + pod.Name
+
+		sp, ok := seenPods[s]
+		if !ok {
+			sp = &podEntry{pod: pod}
+			seenPods[s] = sp
+		}
+
+		// Attach standard pod labels.
+		target = target.Merge(podLabels(pod))
+
+		// Attach potential container port labels matching the endpoint port.
+		for _, c := range pod.Spec.Containers {
+			for _, cport := range c.Ports {
+				if port.Port == cport.ContainerPort {
+					target[podContainerNameLabel] = lv(c.Name)
+					target[podContainerPortNameLabel] = lv(port.Name)
+					target[podContainerPortProtocolLabel] = lv(string(port.Protocol))
+					break
+				}
+			}
+		}
+
+		// Add service port so we know that we have already generated a target
+		// for it.
+		sp.servicePorts = append(sp.servicePorts, port)
+		tg.Targets = append(tg.Targets, target)
 	}
 
 	for _, ss := range eps.Subsets {
@@ -165,29 +205,69 @@
 		}
 	}
+	// For all seen pods, check all container ports. If they were not covered
+	// by one of the service endpoints, generate targets for them.
+	for _, pe := range seenPods {
+		for _, c := range pe.pod.Spec.Containers {
+			for _, cport := range c.Ports {
+				hasSeenPort := func() bool {
+					for _, eport := range pe.servicePorts {
+						if cport.ContainerPort == eport.Port {
+							return true
+						}
+					}
+					return false
+				}
+				if hasSeenPort() {
+					continue
+				}
+
+				a := net.JoinHostPort(pe.pod.Status.PodIP, strconv.FormatInt(int64(cport.ContainerPort), 10))
+
+				target := model.LabelSet{
+					model.AddressLabel:            lv(a),
+					podContainerNameLabel:         lv(c.Name),
+					podContainerPortNameLabel:     lv(cport.Name),
+					podContainerPortProtocolLabel: lv(string(cport.Protocol)),
+				}
+				tg.Targets = append(tg.Targets, target.Merge(podLabels(pe.pod)))
+			}
+		}
+	}
+
 	return tg
 }
 
 func (e *Endpoints) resolvePodRef(ref *apiv1.ObjectReference) *apiv1.Pod {
-	if ref.Kind != "Pod" {
+	if ref == nil || ref.Kind != "Pod" {
 		return nil
 	}
-	p, exists, err := e.podStore.Get(ref)
+	p := &apiv1.Pod{}
+	p.Namespace = ref.Namespace
+	p.Name = ref.Name
+
+	obj, exists, err := e.podStore.Get(p)
 	if err != nil || !exists {
 		return nil
 	}
-	return p.(*apiv1.Pod)
+	if err != nil {
+		e.logger.With("err", err).Errorln("resolving pod ref failed")
+	}
+	return obj.(*apiv1.Pod)
 }
 
-func (e *Endpoints) decorateService(ns, name string, tg *config.TargetGroup) {
+func (e *Endpoints) addServiceLabels(ns, name string, tg *config.TargetGroup) {
 	svc := &apiv1.Service{}
 	svc.Namespace = ns
 	svc.Name = name
 
-	obj, exists, err := e.servicesInf.GetStore().Get(svc)
+	obj, exists, err := e.serviceStore.Get(svc)
 	if !exists || err != nil {
 		return
 	}
+	if err != nil {
+		e.logger.With("err", err).Errorln("retrieving service failed")
+	}
 	svc = obj.(*apiv1.Service)
 
 	tg.Labels[serviceNameLabel] = lv(svc.Name)
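The interesting part of the endpoint.go change is the bookkeeping in buildEndpoints: every endpoint address whose TargetRef resolves to a pod is merged with that pod's labels, the endpoint port is recorded in seenPods, and a second pass then emits extra targets for container ports that no endpoint port covered. A minimal standalone sketch of that second pass, using simplified stand-in types (endpointPort, containerPort, pod and uncoveredPorts are illustrative names, not the real apiv1/model types):

```go
package main

import "fmt"

// endpointPort and containerPort stand in for the real apiv1 types.
type endpointPort struct {
	name string
	port int32
}

type containerPort struct {
	name string
	port int32
}

type pod struct {
	name       string
	containers map[string][]containerPort // container name -> declared ports
}

// uncoveredPorts returns the container ports that no endpoint port points at;
// the discovery code emits additional targets for exactly these ports.
func uncoveredPorts(p pod, covered []endpointPort) []containerPort {
	var out []containerPort
	for _, ports := range p.containers {
		for _, cp := range ports {
			seen := false
			for _, ep := range covered {
				if ep.port == cp.port {
					seen = true
					break
				}
			}
			if !seen {
				out = append(out, cp)
			}
		}
	}
	return out
}

func main() {
	p := pod{
		name: "web-0",
		containers: map[string][]containerPort{
			"app":     {{name: "http", port: 8080}, {name: "metrics", port: 9100}},
			"sidecar": {{name: "admin", port: 15000}},
		},
	}
	// The service endpoints only cover port 8080, so 9100 and 15000 each get
	// a target of their own.
	fmt.Println(uncoveredPorts(p, []endpointPort{{name: "http", port: 8080}}))
}
```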
diff --git a/retrieval/discovery/kubernetes_v2/kubernetes.go b/retrieval/discovery/kubernetes_v2/kubernetes.go
index 1bd780f1a..ddf0dcc10 100644
--- a/retrieval/discovery/kubernetes_v2/kubernetes.go
+++ b/retrieval/discovery/kubernetes_v2/kubernetes.go
@@ -121,16 +121,16 @@ func (k *Kubernetes) Run(ctx context.Context, ch chan<- []*config.TargetGroup) {
 				cache.NewSharedInformer(plw, &apiv1.Pod{}, resyncPeriod),
 			)
 			go eps.endpointsInf.Run(ctx.Done())
-			go eps.servicesInf.Run(ctx.Done())
-			go eps.podsInf.Run(ctx.Done())
+			go eps.serviceInf.Run(ctx.Done())
+			go eps.podInf.Run(ctx.Done())
 
-			for !eps.servicesInf.HasSynced() {
+			for !eps.serviceInf.HasSynced() {
 				time.Sleep(100 * time.Millisecond)
 			}
 			for !eps.endpointsInf.HasSynced() {
 				time.Sleep(100 * time.Millisecond)
 			}
-			for !eps.podsInf.HasSynced() {
+			for !eps.podInf.HasSynced() {
 				time.Sleep(100 * time.Millisecond)
 			}
 			eps.Run(ctx, ch)
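The kubernetes.go change only follows the informer renames; the discovery still polls HasSynced every 100ms before starting. As a side note, a hypothetical, context-bounded variant of that polling loop could look like the sketch below (waitForSync and fakeInformer are illustrative names, not part of the patch or of client-go):

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// hasSynced captures the single method of cache.SharedInformer that the
// polling loops in Run rely on.
type hasSynced interface {
	HasSynced() bool
}

// waitForSync polls each informer until it reports synced or the context
// is cancelled.
func waitForSync(ctx context.Context, infs ...hasSynced) bool {
	for _, inf := range infs {
		for !inf.HasSynced() {
			select {
			case <-ctx.Done():
				return false
			case <-time.After(100 * time.Millisecond):
			}
		}
	}
	return true
}

// fakeInformer reports synced after a fixed point in time; for demonstration only.
type fakeInformer struct{ ready time.Time }

func (f fakeInformer) HasSynced() bool { return time.Now().After(f.ready) }

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()

	ok := waitForSync(ctx, fakeInformer{ready: time.Now().Add(300 * time.Millisecond)})
	fmt.Println("synced:", ok)
}
```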
diff --git a/retrieval/discovery/kubernetes_v2/pod.go b/retrieval/discovery/kubernetes_v2/pod.go
index c235d0f26..8da686811 100644
--- a/retrieval/discovery/kubernetes_v2/pod.go
+++ b/retrieval/discovery/kubernetes_v2/pod.go
@@ -22,13 +22,14 @@ import (
 	"github.com/prometheus/common/log"
 	"github.com/prometheus/common/model"
 	"github.com/prometheus/prometheus/config"
+	"github.com/prometheus/prometheus/util/strutil"
 	"golang.org/x/net/context"
 	"k8s.io/client-go/1.5/pkg/api"
 	apiv1 "k8s.io/client-go/1.5/pkg/api/v1"
 	"k8s.io/client-go/1.5/tools/cache"
 )
 
-// Pods discovers new pod targets.
+// Pod discovers new pod targets.
 type Pod struct {
 	informer cache.SharedInformer
 	store    cache.Store
@@ -86,9 +87,10 @@ func (p *Pod) Run(ctx context.Context, ch chan<- []*config.TargetGroup) {
 const (
 	podNameLabel                  = metaLabelPrefix + "pod_name"
-	podAddressLabel               = metaLabelPrefix + "pod_address"
+	podIPLabel                    = metaLabelPrefix + "pod_ip"
 	podContainerNameLabel         = metaLabelPrefix + "pod_container_name"
 	podContainerPortNameLabel     = metaLabelPrefix + "pod_container_port_name"
+	podContainerPortNumberLabel   = metaLabelPrefix + "pod_container_port_number"
 	podContainerPortProtocolLabel = metaLabelPrefix + "pod_container_port_protocol"
 	podReadyLabel                 = metaLabelPrefix + "pod_ready"
 	podLabelPrefix                = metaLabelPrefix + "pod_label_"
@@ -97,19 +99,40 @@ const (
 	podHostIPLabel                = metaLabelPrefix + "pod_host_ip"
 )
 
-func (p *Pod) buildPod(pod *apiv1.Pod) *config.TargetGroup {
-	tg := &config.TargetGroup{
-		Source: podSource(pod),
-	}
-	tg.Labels = model.LabelSet{
-		namespaceLabel:   lv(pod.Namespace),
+func podLabels(pod *apiv1.Pod) model.LabelSet {
+	ls := model.LabelSet{
 		podNameLabel:     lv(pod.ObjectMeta.Name),
-		podAddressLabel:  lv(pod.Status.PodIP),
+		podIPLabel:       lv(pod.Status.PodIP),
 		podReadyLabel:    podReady(pod),
 		podNodeNameLabel: lv(pod.Spec.NodeName),
 		podHostIPLabel:   lv(pod.Status.HostIP),
 	}
+
+	for k, v := range pod.Labels {
+		ln := strutil.SanitizeLabelName(podLabelPrefix + k)
+		ls[model.LabelName(ln)] = lv(v)
+	}
+
+	for k, v := range pod.Annotations {
+		ln := strutil.SanitizeLabelName(podAnnotationPrefix + k)
+		ls[model.LabelName(ln)] = lv(v)
+	}
+
+	return ls
+}
+
+func (p *Pod) buildPod(pod *apiv1.Pod) *config.TargetGroup {
+	// During startup the pod may not have an IP yet. This does not even allow
+	// for an up metric, so we skip the target.
+	if len(pod.Status.PodIP) == 0 {
+		return nil
+	}
+	tg := &config.TargetGroup{
+		Source: podSource(pod),
+	}
+	tg.Labels = podLabels(pod)
+	tg.Labels[namespaceLabel] = lv(pod.Namespace)
+
 	for _, c := range pod.Spec.Containers {
 		// If no ports are defined for the container, create an anonymous
 		// target per container.
@@ -124,11 +147,13 @@ func (p *Pod) buildPod(pod *apiv1.Pod) *config.TargetGroup {
 		}
 		// Otherwise create one target for each container/port combination.
 		for _, port := range c.Ports {
-			addr := net.JoinHostPort(pod.Status.PodIP, strconv.FormatInt(int64(port.ContainerPort), 10))
+			ports := strconv.FormatInt(int64(port.ContainerPort), 10)
+			addr := net.JoinHostPort(pod.Status.PodIP, ports)
 
 			tg.Targets = append(tg.Targets, model.LabelSet{
 				model.AddressLabel:            lv(addr),
 				podContainerNameLabel:         lv(c.Name),
+				podContainerPortNumberLabel:   lv(ports),
 				podContainerPortNameLabel:     lv(port.Name),
 				podContainerPortProtocolLabel: lv(string(port.Protocol)),
 			})
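The new podLabels helper in pod.go copies every Kubernetes pod label and annotation onto the target, prefixing the key and sanitizing it into a valid Prometheus label name, so a pod label such as app=frontend surfaces as __meta_kubernetes_pod_label_app="frontend". A rough standalone illustration of that mapping (sanitizeLabelName below only approximates strutil.SanitizeLabelName and is not the Prometheus implementation):

```go
package main

import (
	"fmt"
	"regexp"
)

// invalidLabelChars approximates what strutil.SanitizeLabelName filters out;
// every character that is not valid in a Prometheus label name is replaced
// with an underscore.
var invalidLabelChars = regexp.MustCompile(`[^a-zA-Z0-9_]`)

func sanitizeLabelName(name string) string {
	return invalidLabelChars.ReplaceAllString(name, "_")
}

func main() {
	const podLabelPrefix = "__meta_kubernetes_pod_label_"

	// Example pod labels as they might appear on a Kubernetes object.
	podLabels := map[string]string{
		"app":                   "frontend",
		"pod-template-hash":     "12345",
		"example.com/team-name": "observability",
	}
	for k, v := range podLabels {
		fmt.Printf("%s=%q\n", sanitizeLabelName(podLabelPrefix+k), v)
	}
}
```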