Allow attaching node metadata

Signed-off-by: fpetkovski <filip.petkovsky@gmail.com>
pull/10080/head
fpetkovski 2021-12-23 10:50:00 +01:00
parent fdb6916baf
commit fa798d3042
No known key found for this signature in database
GPG Key ID: 431B0F2E85E42402
5 changed files with 176 additions and 29 deletions

View File

@ -122,6 +122,7 @@ type SDConfig struct {
HTTPClientConfig config.HTTPClientConfig `yaml:",inline"`
NamespaceDiscovery NamespaceDiscovery `yaml:"namespaces,omitempty"`
Selectors []SelectorConfig `yaml:"selectors,omitempty"`
AttachMetadata AttachMetadataConfig `yaml:"attach_metadata,omitempty"`
}
// Name returns the name of the Config.
@ -158,6 +159,12 @@ type resourceSelector struct {
field string
}
// AttachMetadataConfig is the configuration for attaching additional metadata
// coming from nodes on which the targets are scheduled.
type AttachMetadataConfig struct {
Node bool `yaml:"node"`
}
// UnmarshalYAML implements the yaml.Unmarshaler interface.
func (c *SDConfig) UnmarshalYAML(unmarshal func(interface{}) error) error {
*c = DefaultSDConfig
@ -259,6 +266,7 @@ type Discovery struct {
discoverers []discovery.Discoverer
selectors roleSelector
ownNamespace string
attachMetadata AttachMetadataConfig
}
func (d *Discovery) getNamespaces() []string {
@ -337,6 +345,7 @@ func New(l log.Logger, conf *SDConfig) (*Discovery, error) {
discoverers: make([]discovery.Discoverer, 0),
selectors: mapSelector(conf.Selectors),
ownNamespace: ownNamespace,
attachMetadata: conf.AttachMetadata,
}, nil
}
@ -480,6 +489,12 @@ func (d *Discovery) Run(ctx context.Context, ch chan<- []*targetgroup.Group) {
go eps.podInf.Run(ctx.Done())
}
case RolePod:
var nodeInformer cache.SharedInformer
if d.attachMetadata.Node {
nodeInformer = d.newNodeInformer(ctx)
go nodeInformer.Run(ctx.Done())
}
for _, namespace := range namespaces {
p := d.client.CoreV1().Pods(namespace)
plw := &cache.ListWatch{
@ -497,9 +512,10 @@ func (d *Discovery) Run(ctx context.Context, ch chan<- []*targetgroup.Group) {
pod := NewPod(
log.With(d.logger, "role", "pod"),
cache.NewSharedInformer(plw, &apiv1.Pod{}, resyncPeriod),
nodeInformer,
)
d.discoverers = append(d.discoverers, pod)
go pod.informer.Run(ctx.Done())
go pod.podInf.Run(ctx.Done())
}
case RoleService:
for _, namespace := range namespaces {
@ -581,22 +597,8 @@ func (d *Discovery) Run(ctx context.Context, ch chan<- []*targetgroup.Group) {
go ingress.informer.Run(ctx.Done())
}
case RoleNode:
nlw := &cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
options.FieldSelector = d.selectors.node.field
options.LabelSelector = d.selectors.node.label
return d.client.CoreV1().Nodes().List(ctx, options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
options.FieldSelector = d.selectors.node.field
options.LabelSelector = d.selectors.node.label
return d.client.CoreV1().Nodes().Watch(ctx, options)
},
}
node := NewNode(
log.With(d.logger, "role", "node"),
cache.NewSharedInformer(nlw, &apiv1.Node{}, resyncPeriod),
)
nodeInformer := d.newNodeInformer(ctx)
node := NewNode(log.With(d.logger, "role", "node"), nodeInformer)
d.discoverers = append(d.discoverers, node)
go node.informer.Run(ctx.Done())
default:
@ -661,3 +663,19 @@ func checkNetworkingV1Supported(client kubernetes.Interface) (bool, error) {
// https://github.com/kubernetes/kubernetes/blob/master/CHANGELOG/CHANGELOG-1.19.md
return semVer.Major() >= 1 && semVer.Minor() >= 19, nil
}
func (d *Discovery) newNodeInformer(ctx context.Context) cache.SharedInformer {
nlw := &cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
options.FieldSelector = d.selectors.node.field
options.LabelSelector = d.selectors.node.label
return d.client.CoreV1().Nodes().List(ctx, options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
options.FieldSelector = d.selectors.node.field
options.LabelSelector = d.selectors.node.label
return d.client.CoreV1().Nodes().Watch(ctx, options)
},
}
return cache.NewSharedInformer(nlw, &apiv1.Node{}, resyncPeriod)
}

View File

@ -58,6 +58,13 @@ func makeDiscoveryWithVersion(role Role, nsDiscovery NamespaceDiscovery, k8sVer
}, clientset
}
// makeDiscoveryWithMetadata creates a kubernetes.Discovery instance with the specified metadata config.
func makeDiscoveryWithMetadata(role Role, nsDiscovery NamespaceDiscovery, attachMetadata AttachMetadataConfig, objects ...runtime.Object) (*Discovery, kubernetes.Interface) {
d, k8s := makeDiscovery(role, nsDiscovery, objects...)
d.attachMetadata = attachMetadata
return d, k8s
}
type k8sDiscoveryTest struct {
// discovery is instance of discovery.Discoverer
discovery discovery.Discoverer
@ -215,7 +222,7 @@ func (i *Ingress) hasSynced() bool {
}
func (p *Pod) hasSynced() bool {
return p.informer.HasSynced()
return p.podInf.HasSynced()
}
func (s *Service) hasSynced() bool {

View File

@ -40,24 +40,28 @@ var (
// Pod discovers new pod targets.
type Pod struct {
informer cache.SharedInformer
store cache.Store
logger log.Logger
queue *workqueue.Type
podInf cache.SharedInformer
nodeInf cache.SharedInformer
withNodeMetadata bool
store cache.Store
logger log.Logger
queue *workqueue.Type
}
// NewPod creates a new pod discovery.
func NewPod(l log.Logger, pods cache.SharedInformer) *Pod {
func NewPod(l log.Logger, pods, nodes cache.SharedInformer) *Pod {
if l == nil {
l = log.NewNopLogger()
}
p := &Pod{
informer: pods,
store: pods.GetStore(),
logger: l,
queue: workqueue.NewNamed("pod"),
podInf: pods,
nodeInf: nodes,
withNodeMetadata: nodes != nil,
store: pods.GetStore(),
logger: l,
queue: workqueue.NewNamed("pod"),
}
p.informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
p.podInf.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(o interface{}) {
podAddCount.Inc()
p.enqueue(o)
@ -71,6 +75,24 @@ func NewPod(l log.Logger, pods cache.SharedInformer) *Pod {
p.enqueue(o)
},
})
if p.withNodeMetadata {
p.nodeInf.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(o interface{}) {
node := o.(*apiv1.Node)
p.enqueuePodsForNode(node.Name)
},
UpdateFunc: func(_, o interface{}) {
node := o.(*apiv1.Node)
p.enqueuePodsForNode(node.Name)
},
DeleteFunc: func(o interface{}) {
node := o.(*apiv1.Node)
p.enqueuePodsForNode(node.Name)
},
})
}
return p
}
@ -87,7 +109,12 @@ func (p *Pod) enqueue(obj interface{}) {
func (p *Pod) Run(ctx context.Context, ch chan<- []*targetgroup.Group) {
defer p.queue.ShutDown()
if !cache.WaitForCacheSync(ctx.Done(), p.informer.HasSynced) {
cacheSyncs := []cache.InformerSynced{p.podInf.HasSynced}
if p.withNodeMetadata {
cacheSyncs = append(cacheSyncs, p.nodeInf.HasSynced)
}
if !cache.WaitForCacheSync(ctx.Done(), cacheSyncs...) {
if ctx.Err() != context.Canceled {
level.Error(p.logger).Log("msg", "pod informer unable to sync cache")
}
@ -221,6 +248,9 @@ func (p *Pod) buildPod(pod *apiv1.Pod) *targetgroup.Group {
tg.Labels = podLabels(pod)
tg.Labels[namespaceLabel] = lv(pod.Namespace)
if p.withNodeMetadata {
p.attachNodeMetadata(tg, pod)
}
containers := append(pod.Spec.Containers, pod.Spec.InitContainers...)
for i, c := range containers {
@ -257,6 +287,36 @@ func (p *Pod) buildPod(pod *apiv1.Pod) *targetgroup.Group {
return tg
}
func (p *Pod) attachNodeMetadata(tg *targetgroup.Group, pod *apiv1.Pod) {
tg.Labels[nodeNameLabel] = lv(pod.Spec.NodeName)
obj, exists, err := p.nodeInf.GetStore().GetByKey(pod.Spec.NodeName)
if err != nil {
level.Error(p.logger).Log("msg", "Error getting node", "node", pod.Spec.NodeName, "err", err)
return
}
if !exists {
return
}
node := obj.(*apiv1.Node)
for k, v := range node.GetLabels() {
ln := strutil.SanitizeLabelName(k)
tg.Labels[model.LabelName(nodeLabelPrefix+ln)] = lv(v)
tg.Labels[model.LabelName(nodeLabelPresentPrefix+ln)] = presentValue
}
}
func (p *Pod) enqueuePodsForNode(nodeName string) {
for _, pod := range p.store.List() {
pod := pod.(*apiv1.Pod)
if pod.Spec.NodeName == nodeName {
p.enqueue(pod)
}
}
}
func podSource(pod *apiv1.Pod) string {
return podSourceFromNamespaceAndName(pod.Namespace, pod.Name)
}

View File

@ -190,6 +190,19 @@ func expectedPodTargetGroups(ns string) map[string]*targetgroup.Group {
}
}
func expectedPodTargetGroupsWithNodeMeta(ns, nodeName string, nodeLabels map[string]string) map[string]*targetgroup.Group {
result := expectedPodTargetGroups(ns)
for _, tg := range result {
tg.Labels["__meta_kubernetes_node_name"] = lv(nodeName)
for k, v := range nodeLabels {
tg.Labels[model.LabelName("__meta_kubernetes_node_label_"+k)] = lv(v)
tg.Labels[model.LabelName("__meta_kubernetes_node_labelpresent_"+k)] = lv("true")
}
}
return result
}
func TestPodDiscoveryBeforeRun(t *testing.T) {
n, c := makeDiscovery(RolePod, NamespaceDiscovery{})
@ -407,3 +420,46 @@ func TestPodDiscoveryOwnNamespace(t *testing.T) {
expectedRes: expected,
}.Run(t)
}
func TestPodDiscoveryWithNodeMetadata(t *testing.T) {
attachMetadata := AttachMetadataConfig{Node: true}
n, c := makeDiscoveryWithMetadata(RolePod, NamespaceDiscovery{}, attachMetadata)
nodeLbls := map[string]string{"l1": "v1"}
k8sDiscoveryTest{
discovery: n,
afterStart: func() {
nodes := makeNode("testnode", "", "", nodeLbls, nil)
c.CoreV1().Nodes().Create(context.Background(), nodes, metav1.CreateOptions{})
pods := makePods()
c.CoreV1().Pods(pods.Namespace).Create(context.Background(), pods, metav1.CreateOptions{})
},
expectedMaxItems: 2,
expectedRes: expectedPodTargetGroupsWithNodeMeta("default", "testnode", nodeLbls),
}.Run(t)
}
func TestPodDiscoveryWithNodeMetadataUpdateNode(t *testing.T) {
nodeLbls := map[string]string{"l2": "v2"}
attachMetadata := AttachMetadataConfig{Node: true}
n, c := makeDiscoveryWithMetadata(RolePod, NamespaceDiscovery{}, attachMetadata)
k8sDiscoveryTest{
discovery: n,
beforeRun: func() {
oldNodeLbls := map[string]string{"l1": "v1"}
nodes := makeNode("testnode", "", "", oldNodeLbls, nil)
c.CoreV1().Nodes().Create(context.Background(), nodes, metav1.CreateOptions{})
},
afterStart: func() {
pods := makePods()
c.CoreV1().Pods(pods.Namespace).Create(context.Background(), pods, metav1.CreateOptions{})
nodes := makeNode("testnode", "", "", nodeLbls, nil)
c.CoreV1().Nodes().Update(context.Background(), nodes, metav1.UpdateOptions{})
},
expectedMaxItems: 2,
expectedRes: expectedPodTargetGroupsWithNodeMeta("default", "testnode", nodeLbls),
}.Run(t)
}

View File

@ -1720,6 +1720,12 @@ namespaces:
[ - role: <string>
[ label: <string> ]
[ field: <string> ] ]]
# Optional metadata to attach to discovered targets. If omitted, no additional metadata is attached.
attach_metadata:
# Attaches node metadata to discovered targets. Only valid for role: pod.
# When set to true, Prometheus must have permissions to get Nodes.
[ node: <boolean> | default = false ]
```
See [this example Prometheus configuration file](/documentation/examples/prometheus-kubernetes.yml)