diff --git a/discovery/kubernetes/endpointslice.go b/discovery/kubernetes/endpointslice.go new file mode 100644 index 000000000..4bb4bd5b6 --- /dev/null +++ b/discovery/kubernetes/endpointslice.go @@ -0,0 +1,407 @@ +// Copyright 2020 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package kubernetes + +import ( + "context" + "net" + "strconv" + + "github.com/go-kit/kit/log" + "github.com/go-kit/kit/log/level" + "github.com/pkg/errors" + "github.com/prometheus/common/model" + apiv1 "k8s.io/api/core/v1" + disv1beta1 "k8s.io/api/discovery/v1beta1" + "k8s.io/client-go/tools/cache" + "k8s.io/client-go/util/workqueue" + + "github.com/prometheus/prometheus/discovery/targetgroup" + "github.com/prometheus/prometheus/util/strutil" +) + +var ( + epslAddCount = eventCount.WithLabelValues("endpointslice", "add") + epslUpdateCount = eventCount.WithLabelValues("endpointslice", "update") + epslDeleteCount = eventCount.WithLabelValues("endpointslice", "delete") +) + +// EndpointSlice discovers new endpoint targets. +type EndpointSlice struct { + logger log.Logger + + endpointSliceInf cache.SharedInformer + serviceInf cache.SharedInformer + podInf cache.SharedInformer + + podStore cache.Store + endpointSliceStore cache.Store + serviceStore cache.Store + + queue *workqueue.Type +} + +// NewEndpointSlice returns a new endpointslice discovery. +func NewEndpointSlice(l log.Logger, svc, eps, pod cache.SharedInformer) *EndpointSlice { + if l == nil { + l = log.NewNopLogger() + } + e := &EndpointSlice{ + logger: l, + endpointSliceInf: eps, + endpointSliceStore: eps.GetStore(), + serviceInf: svc, + serviceStore: svc.GetStore(), + podInf: pod, + podStore: pod.GetStore(), + queue: workqueue.NewNamed("endpointSlice"), + } + + e.endpointSliceInf.AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: func(o interface{}) { + epslAddCount.Inc() + e.enqueue(o) + }, + UpdateFunc: func(_, o interface{}) { + epslUpdateCount.Inc() + e.enqueue(o) + }, + DeleteFunc: func(o interface{}) { + epslDeleteCount.Inc() + e.enqueue(o) + }, + }) + + serviceUpdate := func(o interface{}) { + svc, err := convertToService(o) + if err != nil { + level.Error(e.logger).Log("msg", "converting to Service object failed", "err", err) + return + } + + // TODO(brancz): use cache.Indexer to index endpoints by + // disv1beta1.LabelServiceName so this operation doesn't have to + // iterate over all endpoint objects. + for _, obj := range e.endpointSliceStore.List() { + ep := obj.(*disv1beta1.EndpointSlice) + if lv, exists := ep.Labels[disv1beta1.LabelServiceName]; exists && lv == svc.Name { + e.enqueue(ep) + } + } + } + e.serviceInf.AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: func(o interface{}) { + svcAddCount.Inc() + serviceUpdate(o) + }, + UpdateFunc: func(_, o interface{}) { + svcUpdateCount.Inc() + serviceUpdate(o) + }, + DeleteFunc: func(o interface{}) { + svcDeleteCount.Inc() + serviceUpdate(o) + }, + }) + + return e +} + +func (e *EndpointSlice) enqueue(obj interface{}) { + key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj) + if err != nil { + return + } + + e.queue.Add(key) +} + +// Run implements the Discoverer interface. +func (e *EndpointSlice) Run(ctx context.Context, ch chan<- []*targetgroup.Group) { + defer e.queue.ShutDown() + + if !cache.WaitForCacheSync(ctx.Done(), e.endpointSliceInf.HasSynced, e.serviceInf.HasSynced, e.podInf.HasSynced) { + if ctx.Err() != context.Canceled { + level.Error(e.logger).Log("msg", "endpointslice informer unable to sync cache") + } + return + } + + go func() { + for e.process(ctx, ch) { + } + }() + + // Block until the target provider is explicitly canceled. + <-ctx.Done() +} + +func (e *EndpointSlice) process(ctx context.Context, ch chan<- []*targetgroup.Group) bool { + keyObj, quit := e.queue.Get() + if quit { + return false + } + defer e.queue.Done(keyObj) + key := keyObj.(string) + + namespace, name, err := cache.SplitMetaNamespaceKey(key) + if err != nil { + level.Error(e.logger).Log("msg", "splitting key failed", "key", key) + return true + } + + o, exists, err := e.endpointSliceStore.GetByKey(key) + if err != nil { + level.Error(e.logger).Log("msg", "getting object from store failed", "key", key) + return true + } + if !exists { + send(ctx, ch, &targetgroup.Group{Source: endpointSliceSourceFromNamespaceAndName(namespace, name)}) + return true + } + eps, err := convertToEndpointSlice(o) + if err != nil { + level.Error(e.logger).Log("msg", "converting to EndpointSlice object failed", "err", err) + return true + } + send(ctx, ch, e.buildEndpointSlice(eps)) + return true +} + +func convertToEndpointSlice(o interface{}) (*disv1beta1.EndpointSlice, error) { + endpoints, ok := o.(*disv1beta1.EndpointSlice) + if ok { + return endpoints, nil + } + + return nil, errors.Errorf("received unexpected object: %v", o) +} + +func endpointSliceSource(ep *disv1beta1.EndpointSlice) string { + return endpointSliceSourceFromNamespaceAndName(ep.Namespace, ep.Name) +} + +func endpointSliceSourceFromNamespaceAndName(namespace, name string) string { + return "endpointslice/" + namespace + "/" + name +} + +const ( + endpointSliceNameLabel = metaLabelPrefix + "endpointslice_name" + endpointSliceAddressTypeLabel = metaLabelPrefix + "endpointslice_address_type" + endpointSlicePortNameLabel = metaLabelPrefix + "endpointslice_port_name" + endpointSlicePortProtocolLabel = metaLabelPrefix + "endpointslice_port_protocol" + endpointSlicePortLabel = metaLabelPrefix + "endpointslice_port" + endpointSlicePortAppProtocol = metaLabelPrefix + "endpointslice_port_app_protocol" + endpointSliceEndpointConditionsReadyLabel = metaLabelPrefix + "endpointslice_endpoint_conditions_ready" + endpointSliceEndpointHostnameLabel = metaLabelPrefix + "endpointslice_endpoint_hostname" + endpointSliceAddressTargetKindLabel = metaLabelPrefix + "endpointslice_address_target_kind" + endpointSliceAddressTargetNameLabel = metaLabelPrefix + "endpointslice_address_target_name" + endpointSliceEndpointTopologyLabelPrefix = metaLabelPrefix + "endpointslice_endpoint_topology_" + endpointSliceEndpointTopologyLabelPresentPrefix = metaLabelPrefix + "endpointslice_endpoint_topology_present_" +) + +func (e *EndpointSlice) buildEndpointSlice(eps *disv1beta1.EndpointSlice) *targetgroup.Group { + tg := &targetgroup.Group{ + Source: endpointSliceSource(eps), + } + tg.Labels = model.LabelSet{ + namespaceLabel: lv(eps.Namespace), + endpointSliceNameLabel: lv(eps.Name), + endpointSliceAddressTypeLabel: lv(string(eps.AddressType)), + } + e.addServiceLabels(eps, tg) + + type podEntry struct { + pod *apiv1.Pod + servicePorts []disv1beta1.EndpointPort + } + seenPods := map[string]*podEntry{} + + add := func(addr string, ep disv1beta1.Endpoint, port disv1beta1.EndpointPort) { + a := addr + if port.Port != nil { + a = net.JoinHostPort(addr, strconv.FormatUint(uint64(*port.Port), 10)) + } + + target := model.LabelSet{ + model.AddressLabel: lv(a), + } + + if port.Name != nil { + target[endpointSlicePortNameLabel] = lv(*port.Name) + } + + if port.Protocol != nil { + target[endpointSlicePortProtocolLabel] = lv(string(*port.Protocol)) + } + + if port.Port != nil { + target[endpointSlicePortLabel] = lv(strconv.FormatUint(uint64(*port.Port), 10)) + } + + if port.AppProtocol != nil { + target[endpointSlicePortAppProtocol] = lv(*port.AppProtocol) + } + + if ep.Conditions.Ready != nil { + target[endpointSliceEndpointConditionsReadyLabel] = lv(strconv.FormatBool(*ep.Conditions.Ready)) + } + + if ep.Hostname != nil { + target[endpointSliceEndpointHostnameLabel] = lv(*ep.Hostname) + } + + if ep.TargetRef != nil { + target[model.LabelName(endpointSliceAddressTargetKindLabel)] = lv(ep.TargetRef.Kind) + target[model.LabelName(endpointSliceAddressTargetNameLabel)] = lv(ep.TargetRef.Name) + } + + for k, v := range ep.Topology { + ln := strutil.SanitizeLabelName(k) + target[model.LabelName(endpointSliceEndpointTopologyLabelPrefix+ln)] = lv(v) + target[model.LabelName(endpointSliceEndpointTopologyLabelPresentPrefix+ln)] = presentValue + } + + pod := e.resolvePodRef(ep.TargetRef) + if pod == nil { + // This target is not a Pod, so don't continue with Pod specific logic. + tg.Targets = append(tg.Targets, target) + return + } + s := pod.Namespace + "/" + pod.Name + + sp, ok := seenPods[s] + if !ok { + sp = &podEntry{pod: pod} + seenPods[s] = sp + } + + // Attach standard pod labels. + target = target.Merge(podLabels(pod)) + + // Attach potential container port labels matching the endpoint port. + for _, c := range pod.Spec.Containers { + for _, cport := range c.Ports { + if port.Port == nil { + continue + } + if *port.Port == cport.ContainerPort { + ports := strconv.FormatUint(uint64(*port.Port), 10) + + target[podContainerNameLabel] = lv(c.Name) + target[podContainerPortNameLabel] = lv(cport.Name) + target[podContainerPortNumberLabel] = lv(ports) + target[podContainerPortProtocolLabel] = lv(string(cport.Protocol)) + break + } + } + } + + // Add service port so we know that we have already generated a target + // for it. + sp.servicePorts = append(sp.servicePorts, port) + tg.Targets = append(tg.Targets, target) + } + + for _, ep := range eps.Endpoints { + for _, port := range eps.Ports { + for _, addr := range ep.Addresses { + add(addr, ep, port) + } + } + } + + // For all seen pods, check all container ports. If they were not covered + // by one of the service endpoints, generate targets for them. + for _, pe := range seenPods { + for _, c := range pe.pod.Spec.Containers { + for _, cport := range c.Ports { + hasSeenPort := func() bool { + for _, eport := range pe.servicePorts { + if eport.Port == nil { + continue + } + if cport.ContainerPort == *eport.Port { + return true + } + } + return false + } + if hasSeenPort() { + continue + } + + a := net.JoinHostPort(pe.pod.Status.PodIP, strconv.FormatUint(uint64(cport.ContainerPort), 10)) + ports := strconv.FormatUint(uint64(cport.ContainerPort), 10) + + target := model.LabelSet{ + model.AddressLabel: lv(a), + podContainerNameLabel: lv(c.Name), + podContainerPortNameLabel: lv(cport.Name), + podContainerPortNumberLabel: lv(ports), + podContainerPortProtocolLabel: lv(string(cport.Protocol)), + } + tg.Targets = append(tg.Targets, target.Merge(podLabels(pe.pod))) + } + } + } + + return tg +} + +func (e *EndpointSlice) resolvePodRef(ref *apiv1.ObjectReference) *apiv1.Pod { + if ref == nil || ref.Kind != "Pod" { + return nil + } + p := &apiv1.Pod{} + p.Namespace = ref.Namespace + p.Name = ref.Name + + obj, exists, err := e.podStore.Get(p) + if err != nil { + level.Error(e.logger).Log("msg", "resolving pod ref failed", "err", err) + return nil + } + if !exists { + return nil + } + return obj.(*apiv1.Pod) +} + +func (e *EndpointSlice) addServiceLabels(eps *disv1beta1.EndpointSlice, tg *targetgroup.Group) { + var ( + svc = &apiv1.Service{} + found bool + ) + svc.Namespace = eps.Namespace + + // Every EndpointSlice object has the Service they belong to in the + // kubernetes.io/service-name label. + svc.Name, found = eps.Labels[disv1beta1.LabelServiceName] + if !found { + return + } + + obj, exists, err := e.serviceStore.Get(svc) + if err != nil { + level.Error(e.logger).Log("msg", "retrieving service failed", "err", err) + return + } + if !exists { + return + } + svc = obj.(*apiv1.Service) + + tg.Labels = tg.Labels.Merge(serviceLabels(svc)) +} diff --git a/discovery/kubernetes/endpointslice_test.go b/discovery/kubernetes/endpointslice_test.go new file mode 100644 index 000000000..b6cb57081 --- /dev/null +++ b/discovery/kubernetes/endpointslice_test.go @@ -0,0 +1,631 @@ +// Copyright 2016 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package kubernetes + +import ( + "context" + "testing" + + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/discovery/targetgroup" + v1 "k8s.io/api/core/v1" + disv1beta1 "k8s.io/api/discovery/v1beta1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" +) + +func strptr(str string) *string { + return &str +} + +func boolptr(b bool) *bool { + return &b +} + +func int32ptr(i int32) *int32 { + return &i +} + +func protocolptr(p v1.Protocol) *v1.Protocol { + return &p +} + +func makeEndpointSlice() *disv1beta1.EndpointSlice { + return &disv1beta1.EndpointSlice{ + ObjectMeta: metav1.ObjectMeta{ + Name: "testendpoints", + Namespace: "default", + Labels: map[string]string{ + disv1beta1.LabelServiceName: "testendpoints", + }, + }, + AddressType: disv1beta1.AddressTypeIPv4, + Ports: []disv1beta1.EndpointPort{ + { + Name: strptr("testport"), + Port: int32ptr(9000), + Protocol: protocolptr(v1.ProtocolTCP), + }, + }, + Endpoints: []disv1beta1.Endpoint{ + { + Addresses: []string{"1.2.3.4"}, + Hostname: strptr("testendpoint1"), + }, { + Addresses: []string{"2.3.4.5"}, + Conditions: disv1beta1.EndpointConditions{ + Ready: boolptr(true), + }, + }, { + Addresses: []string{"3.4.5.6"}, + Conditions: disv1beta1.EndpointConditions{ + Ready: boolptr(false), + }, + }, + }, + } +} + +func TestEndpointSliceDiscoveryBeforeRun(t *testing.T) { + n, c := makeDiscovery(RoleEndpointSlice, NamespaceDiscovery{}) + + k8sDiscoveryTest{ + discovery: n, + beforeRun: func() { + obj := makeEndpointSlice() + c.DiscoveryV1beta1().EndpointSlices(obj.Namespace).Create(context.Background(), obj, metav1.CreateOptions{}) + }, + expectedMaxItems: 1, + expectedRes: map[string]*targetgroup.Group{ + "endpointslice/default/testendpoints": { + Targets: []model.LabelSet{ + { + "__address__": "1.2.3.4:9000", + "__meta_kubernetes_endpointslice_endpoint_hostname": "testendpoint1", + "__meta_kubernetes_endpointslice_port": "9000", + "__meta_kubernetes_endpointslice_port_name": "testport", + "__meta_kubernetes_endpointslice_port_protocol": "TCP", + }, + { + "__address__": "2.3.4.5:9000", + "__meta_kubernetes_endpointslice_endpoint_conditions_ready": "true", + "__meta_kubernetes_endpointslice_port": "9000", + "__meta_kubernetes_endpointslice_port_name": "testport", + "__meta_kubernetes_endpointslice_port_protocol": "TCP", + }, + { + "__address__": "3.4.5.6:9000", + "__meta_kubernetes_endpointslice_endpoint_conditions_ready": "false", + "__meta_kubernetes_endpointslice_port": "9000", + "__meta_kubernetes_endpointslice_port_name": "testport", + "__meta_kubernetes_endpointslice_port_protocol": "TCP", + }, + }, + Labels: model.LabelSet{ + "__meta_kubernetes_endpointslice_address_type": "IPv4", + "__meta_kubernetes_namespace": "default", + "__meta_kubernetes_endpointslice_name": "testendpoints", + }, + Source: "endpointslice/default/testendpoints", + }, + }, + }.Run(t) +} + +func TestEndpointSliceDiscoveryAdd(t *testing.T) { + obj := &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "testpod", + Namespace: "default", + UID: types.UID("deadbeef"), + }, + Spec: v1.PodSpec{ + NodeName: "testnode", + Containers: []v1.Container{ + { + Name: "c1", + Ports: []v1.ContainerPort{ + { + Name: "mainport", + ContainerPort: 9000, + Protocol: v1.ProtocolTCP, + }, + }, + }, + { + Name: "c2", + Ports: []v1.ContainerPort{ + { + Name: "sideport", + ContainerPort: 9001, + Protocol: v1.ProtocolTCP, + }, + }, + }, + }, + }, + Status: v1.PodStatus{ + HostIP: "2.3.4.5", + PodIP: "1.2.3.4", + }, + } + n, c := makeDiscovery(RoleEndpointSlice, NamespaceDiscovery{}, obj) + + k8sDiscoveryTest{ + discovery: n, + afterStart: func() { + obj := &disv1beta1.EndpointSlice{ + ObjectMeta: metav1.ObjectMeta{ + Name: "testendpoints", + Namespace: "default", + }, + AddressType: disv1beta1.AddressTypeIPv4, + Ports: []disv1beta1.EndpointPort{ + { + Name: strptr("testport"), + Port: int32ptr(9000), + Protocol: protocolptr(v1.ProtocolTCP), + }, + }, + Endpoints: []disv1beta1.Endpoint{ + { + Addresses: []string{"4.3.2.1"}, + TargetRef: &v1.ObjectReference{ + Kind: "Pod", + Name: "testpod", + Namespace: "default", + }, + Conditions: disv1beta1.EndpointConditions{ + Ready: boolptr(false), + }, + }, + }, + } + c.DiscoveryV1beta1().EndpointSlices(obj.Namespace).Create(context.Background(), obj, metav1.CreateOptions{}) + }, + expectedMaxItems: 1, + expectedRes: map[string]*targetgroup.Group{ + "endpointslice/default/testendpoints": { + Targets: []model.LabelSet{ + { + "__address__": "4.3.2.1:9000", + "__meta_kubernetes_endpointslice_address_target_kind": "Pod", + "__meta_kubernetes_endpointslice_address_target_name": "testpod", + "__meta_kubernetes_endpointslice_endpoint_conditions_ready": "false", + "__meta_kubernetes_endpointslice_port": "9000", + "__meta_kubernetes_endpointslice_port_name": "testport", + "__meta_kubernetes_endpointslice_port_protocol": "TCP", + "__meta_kubernetes_pod_container_name": "c1", + "__meta_kubernetes_pod_container_port_name": "mainport", + "__meta_kubernetes_pod_container_port_number": "9000", + "__meta_kubernetes_pod_container_port_protocol": "TCP", + "__meta_kubernetes_pod_host_ip": "2.3.4.5", + "__meta_kubernetes_pod_ip": "1.2.3.4", + "__meta_kubernetes_pod_name": "testpod", + "__meta_kubernetes_pod_node_name": "testnode", + "__meta_kubernetes_pod_phase": "", + "__meta_kubernetes_pod_ready": "unknown", + "__meta_kubernetes_pod_uid": "deadbeef", + }, + { + "__address__": "1.2.3.4:9001", + "__meta_kubernetes_pod_container_name": "c2", + "__meta_kubernetes_pod_container_port_name": "sideport", + "__meta_kubernetes_pod_container_port_number": "9001", + "__meta_kubernetes_pod_container_port_protocol": "TCP", + "__meta_kubernetes_pod_host_ip": "2.3.4.5", + "__meta_kubernetes_pod_ip": "1.2.3.4", + "__meta_kubernetes_pod_name": "testpod", + "__meta_kubernetes_pod_node_name": "testnode", + "__meta_kubernetes_pod_phase": "", + "__meta_kubernetes_pod_ready": "unknown", + "__meta_kubernetes_pod_uid": "deadbeef", + }, + }, + Labels: model.LabelSet{ + "__meta_kubernetes_endpointslice_address_type": "IPv4", + "__meta_kubernetes_endpointslice_name": "testendpoints", + "__meta_kubernetes_namespace": "default", + }, + Source: "endpointslice/default/testendpoints", + }, + }, + }.Run(t) +} + +func TestEndpointSliceDiscoveryDelete(t *testing.T) { + n, c := makeDiscovery(RoleEndpointSlice, NamespaceDiscovery{}, makeEndpointSlice()) + + k8sDiscoveryTest{ + discovery: n, + afterStart: func() { + obj := makeEndpointSlice() + c.DiscoveryV1beta1().EndpointSlices(obj.Namespace).Delete(context.Background(), obj.Name, metav1.DeleteOptions{}) + }, + expectedMaxItems: 2, + expectedRes: map[string]*targetgroup.Group{ + "endpointslice/default/testendpoints": { + Source: "endpointslice/default/testendpoints", + }, + }, + }.Run(t) +} + +func TestEndpointSliceDiscoveryUpdate(t *testing.T) { + n, c := makeDiscovery(RoleEndpointSlice, NamespaceDiscovery{}, makeEndpointSlice()) + + k8sDiscoveryTest{ + discovery: n, + afterStart: func() { + obj := &disv1beta1.EndpointSlice{ + ObjectMeta: metav1.ObjectMeta{ + Name: "testendpoints", + Namespace: "default", + }, + AddressType: disv1beta1.AddressTypeIPv4, + Ports: []disv1beta1.EndpointPort{ + { + Name: strptr("testport"), + Port: int32ptr(9000), + Protocol: protocolptr(v1.ProtocolTCP), + }, + }, + Endpoints: []disv1beta1.Endpoint{ + { + Addresses: []string{"1.2.3.4"}, + Hostname: strptr("testendpoint1"), + }, { + Addresses: []string{"2.3.4.5"}, + Conditions: disv1beta1.EndpointConditions{ + Ready: boolptr(true), + }, + }, + }, + } + c.DiscoveryV1beta1().EndpointSlices(obj.Namespace).Update(context.Background(), obj, metav1.UpdateOptions{}) + }, + expectedMaxItems: 2, + expectedRes: map[string]*targetgroup.Group{ + "endpointslice/default/testendpoints": { + Targets: []model.LabelSet{ + { + "__address__": "1.2.3.4:9000", + "__meta_kubernetes_endpointslice_endpoint_hostname": "testendpoint1", + "__meta_kubernetes_endpointslice_port": "9000", + "__meta_kubernetes_endpointslice_port_name": "testport", + "__meta_kubernetes_endpointslice_port_protocol": "TCP", + }, + { + "__address__": "2.3.4.5:9000", + "__meta_kubernetes_endpointslice_endpoint_conditions_ready": "true", + "__meta_kubernetes_endpointslice_port": "9000", + "__meta_kubernetes_endpointslice_port_name": "testport", + "__meta_kubernetes_endpointslice_port_protocol": "TCP", + }, + }, + Labels: model.LabelSet{ + "__meta_kubernetes_endpointslice_address_type": "IPv4", + "__meta_kubernetes_endpointslice_name": "testendpoints", + "__meta_kubernetes_namespace": "default", + }, + Source: "endpointslice/default/testendpoints", + }, + }, + }.Run(t) +} + +func TestEndpointSliceDiscoveryEmptyEndpoints(t *testing.T) { + n, c := makeDiscovery(RoleEndpointSlice, NamespaceDiscovery{}, makeEndpointSlice()) + + k8sDiscoveryTest{ + discovery: n, + afterStart: func() { + obj := &disv1beta1.EndpointSlice{ + ObjectMeta: metav1.ObjectMeta{ + Name: "testendpoints", + Namespace: "default", + }, + AddressType: disv1beta1.AddressTypeIPv4, + Ports: []disv1beta1.EndpointPort{ + { + Name: strptr("testport"), + Port: int32ptr(9000), + Protocol: protocolptr(v1.ProtocolTCP), + }, + }, + Endpoints: []disv1beta1.Endpoint{}, + } + c.DiscoveryV1beta1().EndpointSlices(obj.Namespace).Update(context.Background(), obj, metav1.UpdateOptions{}) + }, + expectedMaxItems: 2, + expectedRes: map[string]*targetgroup.Group{ + "endpointslice/default/testendpoints": { + Labels: model.LabelSet{ + "__meta_kubernetes_endpointslice_address_type": "IPv4", + "__meta_kubernetes_endpointslice_name": "testendpoints", + "__meta_kubernetes_namespace": "default", + }, + Source: "endpointslice/default/testendpoints", + }, + }, + }.Run(t) +} + +func TestEndpointSliceDiscoveryWithService(t *testing.T) { + n, c := makeDiscovery(RoleEndpointSlice, NamespaceDiscovery{}, makeEndpointSlice()) + + k8sDiscoveryTest{ + discovery: n, + beforeRun: func() { + obj := &v1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: "testendpoints", + Namespace: "default", + Labels: map[string]string{ + "app/name": "test", + }, + }, + } + c.CoreV1().Services(obj.Namespace).Create(context.Background(), obj, metav1.CreateOptions{}) + }, + expectedMaxItems: 1, + expectedRes: map[string]*targetgroup.Group{ + "endpointslice/default/testendpoints": { + Targets: []model.LabelSet{ + { + "__address__": "1.2.3.4:9000", + "__meta_kubernetes_endpointslice_endpoint_hostname": "testendpoint1", + "__meta_kubernetes_endpointslice_port": "9000", + "__meta_kubernetes_endpointslice_port_name": "testport", + "__meta_kubernetes_endpointslice_port_protocol": "TCP", + }, + { + "__address__": "2.3.4.5:9000", + "__meta_kubernetes_endpointslice_endpoint_conditions_ready": "true", + "__meta_kubernetes_endpointslice_port": "9000", + "__meta_kubernetes_endpointslice_port_name": "testport", + "__meta_kubernetes_endpointslice_port_protocol": "TCP", + }, + { + "__address__": "3.4.5.6:9000", + "__meta_kubernetes_endpointslice_endpoint_conditions_ready": "false", + "__meta_kubernetes_endpointslice_port": "9000", + "__meta_kubernetes_endpointslice_port_name": "testport", + "__meta_kubernetes_endpointslice_port_protocol": "TCP", + }, + }, + Labels: model.LabelSet{ + "__meta_kubernetes_endpointslice_address_type": "IPv4", + "__meta_kubernetes_endpointslice_name": "testendpoints", + "__meta_kubernetes_namespace": "default", + "__meta_kubernetes_service_label_app_name": "test", + "__meta_kubernetes_service_labelpresent_app_name": "true", + "__meta_kubernetes_service_name": "testendpoints", + }, + Source: "endpointslice/default/testendpoints", + }, + }, + }.Run(t) +} + +func TestEndpointSliceDiscoveryWithServiceUpdate(t *testing.T) { + n, c := makeDiscovery(RoleEndpointSlice, NamespaceDiscovery{}, makeEndpointSlice()) + + k8sDiscoveryTest{ + discovery: n, + beforeRun: func() { + obj := &v1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: "testendpoints", + Namespace: "default", + Labels: map[string]string{ + "app/name": "test", + }, + }, + } + c.CoreV1().Services(obj.Namespace).Create(context.Background(), obj, metav1.CreateOptions{}) + }, + afterStart: func() { + obj := &v1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: "testendpoints", + Namespace: "default", + Labels: map[string]string{ + "app/name": "svc", + "component": "testing", + }, + }, + } + c.CoreV1().Services(obj.Namespace).Update(context.Background(), obj, metav1.UpdateOptions{}) + }, + expectedMaxItems: 2, + expectedRes: map[string]*targetgroup.Group{ + "endpointslice/default/testendpoints": { + Targets: []model.LabelSet{ + { + "__address__": "1.2.3.4:9000", + "__meta_kubernetes_endpointslice_endpoint_hostname": "testendpoint1", + "__meta_kubernetes_endpointslice_port": "9000", + "__meta_kubernetes_endpointslice_port_name": "testport", + "__meta_kubernetes_endpointslice_port_protocol": "TCP", + }, + { + "__address__": "2.3.4.5:9000", + "__meta_kubernetes_endpointslice_endpoint_conditions_ready": "true", + "__meta_kubernetes_endpointslice_port": "9000", + "__meta_kubernetes_endpointslice_port_name": "testport", + "__meta_kubernetes_endpointslice_port_protocol": "TCP", + }, + { + "__address__": "3.4.5.6:9000", + "__meta_kubernetes_endpointslice_endpoint_conditions_ready": "false", + "__meta_kubernetes_endpointslice_port": "9000", + "__meta_kubernetes_endpointslice_port_name": "testport", + "__meta_kubernetes_endpointslice_port_protocol": "TCP", + }, + }, + Labels: model.LabelSet{ + "__meta_kubernetes_endpointslice_address_type": "IPv4", + "__meta_kubernetes_endpointslice_name": "testendpoints", + "__meta_kubernetes_namespace": "default", + "__meta_kubernetes_service_label_app_name": "svc", + "__meta_kubernetes_service_label_component": "testing", + "__meta_kubernetes_service_labelpresent_app_name": "true", + "__meta_kubernetes_service_labelpresent_component": "true", + "__meta_kubernetes_service_name": "testendpoints", + }, + Source: "endpointslice/default/testendpoints", + }, + }, + }.Run(t) +} + +func TestEndpointSliceDiscoveryNamespaces(t *testing.T) { + epOne := makeEndpointSlice() + epOne.Namespace = "ns1" + objs := []runtime.Object{ + epOne, + &disv1beta1.EndpointSlice{ + ObjectMeta: metav1.ObjectMeta{ + Name: "testendpoints", + Namespace: "ns2", + }, + AddressType: disv1beta1.AddressTypeIPv4, + Ports: []disv1beta1.EndpointPort{ + { + Name: strptr("testport"), + Port: int32ptr(9000), + Protocol: protocolptr(v1.ProtocolTCP), + }, + }, + Endpoints: []disv1beta1.Endpoint{ + { + Addresses: []string{"4.3.2.1"}, + TargetRef: &v1.ObjectReference{ + Kind: "Pod", + Name: "testpod", + Namespace: "ns2", + }, + }, + }, + }, + &v1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: "testendpoints", + Namespace: "ns1", + Labels: map[string]string{ + "app": "app1", + }, + }, + }, + &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "testpod", + Namespace: "ns2", + UID: types.UID("deadbeef"), + }, + Spec: v1.PodSpec{ + NodeName: "testnode", + Containers: []v1.Container{ + { + Name: "c1", + Ports: []v1.ContainerPort{ + { + Name: "mainport", + ContainerPort: 9000, + Protocol: v1.ProtocolTCP, + }, + }, + }, + }, + }, + Status: v1.PodStatus{ + HostIP: "2.3.4.5", + PodIP: "4.3.2.1", + }, + }, + } + n, _ := makeDiscovery(RoleEndpointSlice, NamespaceDiscovery{Names: []string{"ns1", "ns2"}}, objs...) + + k8sDiscoveryTest{ + discovery: n, + expectedMaxItems: 2, + expectedRes: map[string]*targetgroup.Group{ + "endpointslice/ns1/testendpoints": { + Targets: []model.LabelSet{ + { + "__address__": "1.2.3.4:9000", + "__meta_kubernetes_endpointslice_endpoint_hostname": "testendpoint1", + "__meta_kubernetes_endpointslice_port": "9000", + "__meta_kubernetes_endpointslice_port_name": "testport", + "__meta_kubernetes_endpointslice_port_protocol": "TCP", + }, + { + "__address__": "2.3.4.5:9000", + "__meta_kubernetes_endpointslice_endpoint_conditions_ready": "true", + "__meta_kubernetes_endpointslice_port": "9000", + "__meta_kubernetes_endpointslice_port_name": "testport", + "__meta_kubernetes_endpointslice_port_protocol": "TCP", + }, + { + "__address__": "3.4.5.6:9000", + "__meta_kubernetes_endpointslice_endpoint_conditions_ready": "false", + "__meta_kubernetes_endpointslice_port": "9000", + "__meta_kubernetes_endpointslice_port_name": "testport", + "__meta_kubernetes_endpointslice_port_protocol": "TCP", + }, + }, + Labels: model.LabelSet{ + "__meta_kubernetes_endpointslice_address_type": "IPv4", + "__meta_kubernetes_endpointslice_name": "testendpoints", + "__meta_kubernetes_namespace": "ns1", + "__meta_kubernetes_service_label_app": "app1", + "__meta_kubernetes_service_labelpresent_app": "true", + "__meta_kubernetes_service_name": "testendpoints", + }, + Source: "endpointslice/ns1/testendpoints", + }, + "endpointslice/ns2/testendpoints": { + Targets: []model.LabelSet{ + { + "__address__": "4.3.2.1:9000", + "__meta_kubernetes_endpointslice_address_target_kind": "Pod", + "__meta_kubernetes_endpointslice_address_target_name": "testpod", + "__meta_kubernetes_endpointslice_port": "9000", + "__meta_kubernetes_endpointslice_port_name": "testport", + "__meta_kubernetes_endpointslice_port_protocol": "TCP", + "__meta_kubernetes_pod_container_name": "c1", + "__meta_kubernetes_pod_container_port_name": "mainport", + "__meta_kubernetes_pod_container_port_number": "9000", + "__meta_kubernetes_pod_container_port_protocol": "TCP", + "__meta_kubernetes_pod_host_ip": "2.3.4.5", + "__meta_kubernetes_pod_ip": "4.3.2.1", + "__meta_kubernetes_pod_name": "testpod", + "__meta_kubernetes_pod_node_name": "testnode", + "__meta_kubernetes_pod_phase": "", + "__meta_kubernetes_pod_ready": "unknown", + "__meta_kubernetes_pod_uid": "deadbeef", + }, + }, + Labels: model.LabelSet{ + "__meta_kubernetes_endpointslice_address_type": "IPv4", + "__meta_kubernetes_endpointslice_name": "testendpoints", + "__meta_kubernetes_namespace": "ns2", + }, + Source: "endpointslice/ns2/testendpoints", + }, + }, + }.Run(t) +} diff --git a/discovery/kubernetes/kubernetes.go b/discovery/kubernetes/kubernetes.go index e2279bac4..b7d53e8ba 100644 --- a/discovery/kubernetes/kubernetes.go +++ b/discovery/kubernetes/kubernetes.go @@ -27,6 +27,7 @@ import ( config_util "github.com/prometheus/common/config" "github.com/prometheus/common/model" apiv1 "k8s.io/api/core/v1" + disv1beta1 "k8s.io/api/discovery/v1beta1" "k8s.io/api/networking/v1beta1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/fields" @@ -67,11 +68,12 @@ type Role string // The valid options for Role. const ( - RoleNode Role = "node" - RolePod Role = "pod" - RoleService Role = "service" - RoleEndpoint Role = "endpoints" - RoleIngress Role = "ingress" + RoleNode Role = "node" + RolePod Role = "pod" + RoleService Role = "service" + RoleEndpoint Role = "endpoints" + RoleEndpointSlice Role = "endpointslice" + RoleIngress Role = "ingress" ) // UnmarshalYAML implements the yaml.Unmarshaler interface. @@ -97,11 +99,12 @@ type SDConfig struct { } type roleSelector struct { - node resourceSelector - pod resourceSelector - service resourceSelector - endpoints resourceSelector - ingress resourceSelector + node resourceSelector + pod resourceSelector + service resourceSelector + endpoints resourceSelector + endpointslice resourceSelector + ingress resourceSelector } type SelectorConfig struct { @@ -124,7 +127,7 @@ func (c *SDConfig) UnmarshalYAML(unmarshal func(interface{}) error) error { return err } if c.Role == "" { - return errors.Errorf("role missing (one of: pod, service, endpoints, node, ingress)") + return errors.Errorf("role missing (one of: pod, service, endpoints, endpointslice, node, ingress)") } err = c.HTTPClientConfig.Validate() if err != nil { @@ -136,11 +139,12 @@ func (c *SDConfig) UnmarshalYAML(unmarshal func(interface{}) error) error { foundSelectorRoles := make(map[Role]struct{}) allowedSelectors := map[Role][]string{ - RolePod: {string(RolePod)}, - RoleService: {string(RoleService)}, - RoleEndpoint: {string(RolePod), string(RoleService), string(RoleEndpoint)}, - RoleNode: {string(RoleNode)}, - RoleIngress: {string(RoleIngress)}, + RolePod: {string(RolePod)}, + RoleService: {string(RoleService)}, + RoleEndpointSlice: {string(RolePod), string(RoleService), string(RoleEndpointSlice)}, + RoleEndpoint: {string(RolePod), string(RoleService), string(RoleEndpoint)}, + RoleNode: {string(RoleNode)}, + RoleIngress: {string(RoleIngress)}, } for _, selector := range c.Selectors { @@ -150,7 +154,7 @@ func (c *SDConfig) UnmarshalYAML(unmarshal func(interface{}) error) error { foundSelectorRoles[selector.Role] = struct{}{} if _, ok := allowedSelectors[c.Role]; !ok { - return errors.Errorf("invalid role: %q, expecting one of: pod, service, endpoints, node or ingress", c.Role) + return errors.Errorf("invalid role: %q, expecting one of: pod, service, endpoints, endpointslice, node or ingress", c.Role) } var allowed bool for _, role := range allowedSelectors[c.Role] { @@ -193,7 +197,7 @@ func init() { prometheus.MustRegister(eventCount) // Initialize metric vectors. - for _, role := range []string{"endpoints", "node", "pod", "service", "ingress"} { + for _, role := range []string{"endpointslice", "endpoints", "node", "pod", "service", "ingress"} { for _, evt := range []string{"add", "delete", "update"} { eventCount.WithLabelValues(role, evt) } @@ -282,6 +286,9 @@ func mapSelector(rawSelector []SelectorConfig) roleSelector { rs := roleSelector{} for _, resourceSelectorRaw := range rawSelector { switch resourceSelectorRaw.Role { + case RoleEndpointSlice: + rs.endpointslice.field = resourceSelectorRaw.Field + rs.endpointslice.label = resourceSelectorRaw.Label case RoleEndpoint: rs.endpoints.field = resourceSelectorRaw.Field rs.endpoints.label = resourceSelectorRaw.Label @@ -310,6 +317,58 @@ func (d *Discovery) Run(ctx context.Context, ch chan<- []*targetgroup.Group) { namespaces := d.getNamespaces() switch d.role { + case RoleEndpointSlice: + for _, namespace := range namespaces { + e := d.client.DiscoveryV1beta1().EndpointSlices(namespace) + elw := &cache.ListWatch{ + ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { + options.FieldSelector = d.selectors.endpointslice.field + options.LabelSelector = d.selectors.endpointslice.label + return e.List(ctx, options) + }, + WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { + options.FieldSelector = d.selectors.endpointslice.field + options.LabelSelector = d.selectors.endpointslice.label + return e.Watch(ctx, options) + }, + } + s := d.client.CoreV1().Services(namespace) + slw := &cache.ListWatch{ + ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { + options.FieldSelector = d.selectors.service.field + options.LabelSelector = d.selectors.service.label + return s.List(ctx, options) + }, + WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { + options.FieldSelector = d.selectors.service.field + options.LabelSelector = d.selectors.service.label + return s.Watch(ctx, options) + }, + } + p := d.client.CoreV1().Pods(namespace) + plw := &cache.ListWatch{ + ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { + options.FieldSelector = d.selectors.pod.field + options.LabelSelector = d.selectors.pod.label + return p.List(ctx, options) + }, + WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { + options.FieldSelector = d.selectors.pod.field + options.LabelSelector = d.selectors.pod.label + return p.Watch(ctx, options) + }, + } + eps := NewEndpointSlice( + log.With(d.logger, "role", "endpointslice"), + cache.NewSharedInformer(slw, &apiv1.Service{}, resyncPeriod), + cache.NewSharedInformer(elw, &disv1beta1.EndpointSlice{}, resyncPeriod), + cache.NewSharedInformer(plw, &apiv1.Pod{}, resyncPeriod), + ) + d.discoverers = append(d.discoverers, eps) + go eps.endpointSliceInf.Run(ctx.Done()) + go eps.serviceInf.Run(ctx.Done()) + go eps.podInf.Run(ctx.Done()) + } case RoleEndpoint: for _, namespace := range namespaces { e := d.client.CoreV1().Endpoints(namespace) diff --git a/discovery/kubernetes/kubernetes_test.go b/discovery/kubernetes/kubernetes_test.go index eb1f97fa2..bc30cc09a 100644 --- a/discovery/kubernetes/kubernetes_test.go +++ b/discovery/kubernetes/kubernetes_test.go @@ -158,6 +158,7 @@ type hasSynced interface { var _ hasSynced = &Discovery{} var _ hasSynced = &Node{} var _ hasSynced = &Endpoints{} +var _ hasSynced = &EndpointSlice{} var _ hasSynced = &Ingress{} var _ hasSynced = &Pod{} var _ hasSynced = &Service{} @@ -183,6 +184,10 @@ func (e *Endpoints) hasSynced() bool { return e.endpointsInf.HasSynced() && e.serviceInf.HasSynced() && e.podInf.HasSynced() } +func (e *EndpointSlice) hasSynced() bool { + return e.endpointSliceInf.HasSynced() && e.serviceInf.HasSynced() && e.podInf.HasSynced() +} + func (i *Ingress) hasSynced() bool { return i.informer.HasSynced() }