2016-09-30 09:33:23 +00:00
// Copyright 2016 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
2016-10-07 12:53:11 +00:00
package kubernetes
2016-09-30 08:42:07 +00:00
import (
2017-10-25 04:21:42 +00:00
"context"
2016-09-30 08:42:07 +00:00
"net"
"strconv"
2021-06-11 16:17:59 +00:00
"github.com/go-kit/log"
"github.com/go-kit/log/level"
2019-03-25 23:01:12 +00:00
"github.com/pkg/errors"
2016-09-30 08:42:07 +00:00
"github.com/prometheus/common/model"
2018-07-03 07:37:22 +00:00
apiv1 "k8s.io/api/core/v1"
2017-05-11 08:29:10 +00:00
"k8s.io/client-go/tools/cache"
2018-04-09 16:35:14 +00:00
"k8s.io/client-go/util/workqueue"
2019-03-25 23:01:12 +00:00
"github.com/prometheus/prometheus/discovery/targetgroup"
2021-10-22 08:19:38 +00:00
"github.com/prometheus/prometheus/util/strutil"
2016-09-30 08:42:07 +00:00
)
2020-02-06 15:52:58 +00:00
// Per-event-type children of eventCount for the "endpoints" role. Each one is
// incremented by the matching endpoints informer handler registered in
// NewEndpoints (add, update and delete respectively).
// NOTE(review): eventCount is declared elsewhere in the package; presumably a
// counter vector labelled by role and event type — confirm.
var (
epAddCount = eventCount . WithLabelValues ( "endpoints" , "add" )
epUpdateCount = eventCount . WithLabelValues ( "endpoints" , "update" )
epDeleteCount = eventCount . WithLabelValues ( "endpoints" , "delete" )
)
2016-09-30 08:42:07 +00:00
// Endpoints discovers new endpoint targets.
type Endpoints struct {
logger log . Logger
endpointsInf cache . SharedInformer
2016-09-30 12:18:49 +00:00
serviceInf cache . SharedInformer
podInf cache . SharedInformer
2016-09-30 08:42:07 +00:00
podStore cache . Store
endpointsStore cache . Store
2016-09-30 12:18:49 +00:00
serviceStore cache . Store
2018-04-09 16:35:14 +00:00
queue * workqueue . Type
2016-09-30 08:42:07 +00:00
}
// NewEndpoints returns a new endpoints discovery.
func NewEndpoints ( l log . Logger , svc , eps , pod cache . SharedInformer ) * Endpoints {
2017-08-11 18:45:52 +00:00
if l == nil {
l = log . NewNopLogger ( )
}
2018-04-09 16:35:14 +00:00
e := & Endpoints {
2016-09-30 08:42:07 +00:00
logger : l ,
endpointsInf : eps ,
endpointsStore : eps . GetStore ( ) ,
2016-09-30 12:18:49 +00:00
serviceInf : svc ,
serviceStore : svc . GetStore ( ) ,
podInf : pod ,
2016-09-30 08:42:07 +00:00
podStore : pod . GetStore ( ) ,
2018-04-09 16:35:14 +00:00
queue : workqueue . NewNamed ( "endpoints" ) ,
2016-09-30 08:42:07 +00:00
}
e . endpointsInf . AddEventHandler ( cache . ResourceEventHandlerFuncs {
AddFunc : func ( o interface { } ) {
2020-02-06 15:52:58 +00:00
epAddCount . Inc ( )
2018-04-09 16:35:14 +00:00
e . enqueue ( o )
2016-09-30 08:42:07 +00:00
} ,
UpdateFunc : func ( _ , o interface { } ) {
2020-02-06 15:52:58 +00:00
epUpdateCount . Inc ( )
2018-04-09 16:35:14 +00:00
e . enqueue ( o )
2016-09-30 08:42:07 +00:00
} ,
DeleteFunc : func ( o interface { } ) {
2020-02-06 15:52:58 +00:00
epDeleteCount . Inc ( )
2018-04-09 16:35:14 +00:00
e . enqueue ( o )
2016-09-30 08:42:07 +00:00
} ,
} )
2016-11-14 15:21:38 +00:00
serviceUpdate := func ( o interface { } ) {
svc , err := convertToService ( o )
if err != nil {
2017-08-11 18:45:52 +00:00
level . Error ( e . logger ) . Log ( "msg" , "converting to Service object failed" , "err" , err )
2016-11-14 15:21:38 +00:00
return
}
2016-09-30 08:42:07 +00:00
ep := & apiv1 . Endpoints { }
ep . Namespace = svc . Namespace
ep . Name = svc . Name
obj , exists , err := e . endpointsStore . Get ( ep )
2018-04-09 16:35:14 +00:00
if exists && err == nil {
e . enqueue ( obj . ( * apiv1 . Endpoints ) )
2016-09-30 08:42:07 +00:00
}
2018-04-09 16:35:14 +00:00
2016-09-30 12:18:49 +00:00
if err != nil {
2017-08-11 18:45:52 +00:00
level . Error ( e . logger ) . Log ( "msg" , "retrieving endpoints failed" , "err" , err )
2016-09-30 12:18:49 +00:00
}
2016-09-30 08:42:07 +00:00
}
2016-09-30 12:18:49 +00:00
e . serviceInf . AddEventHandler ( cache . ResourceEventHandlerFuncs {
2016-10-17 09:05:13 +00:00
// TODO(fabxc): potentially remove add and delete event handlers. Those should
// be triggered via the endpoint handlers already.
2016-10-23 11:33:54 +00:00
AddFunc : func ( o interface { } ) {
2020-02-06 15:52:58 +00:00
svcAddCount . Inc ( )
2016-11-21 10:44:48 +00:00
serviceUpdate ( o )
2016-10-23 11:33:54 +00:00
} ,
UpdateFunc : func ( _ , o interface { } ) {
2020-02-06 15:52:58 +00:00
svcUpdateCount . Inc ( )
2016-11-21 10:44:48 +00:00
serviceUpdate ( o )
2016-10-23 11:33:54 +00:00
} ,
DeleteFunc : func ( o interface { } ) {
2020-02-06 15:52:58 +00:00
svcDeleteCount . Inc ( )
2016-11-21 10:44:48 +00:00
serviceUpdate ( o )
2016-10-23 11:33:54 +00:00
} ,
2016-09-30 08:42:07 +00:00
} )
2018-04-09 16:35:14 +00:00
return e
}
// enqueue adds the store key of obj to the work queue.
//
// The key is computed with cache.DeletionHandlingMetaNamespaceKeyFunc. If no
// key can be derived the object is dropped, because process could not look it
// up again; the failure is logged instead of being discarded silently.
func (e *Endpoints) enqueue(obj interface{}) {
	key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
	if err != nil {
		level.Error(e.logger).Log("msg", "computing key for object failed", "err", err)
		return
	}
	e.queue.Add(key)
}
// Run implements the Discoverer interface.
func ( e * Endpoints ) Run ( ctx context . Context , ch chan <- [ ] * targetgroup . Group ) {
defer e . queue . ShutDown ( )
2018-04-10 08:53:00 +00:00
if ! cache . WaitForCacheSync ( ctx . Done ( ) , e . endpointsInf . HasSynced , e . serviceInf . HasSynced , e . podInf . HasSynced ) {
2019-10-09 09:51:38 +00:00
if ctx . Err ( ) != context . Canceled {
level . Error ( e . logger ) . Log ( "msg" , "endpoints informer unable to sync cache" )
}
2018-04-09 16:35:14 +00:00
return
}
go func ( ) {
2018-04-10 08:53:00 +00:00
for e . process ( ctx , ch ) {
2018-04-09 16:35:14 +00:00
}
} ( )
2016-09-30 08:42:07 +00:00
// Block until the target provider is explicitly canceled.
<- ctx . Done ( )
}
2018-04-10 08:53:00 +00:00
func ( e * Endpoints ) process ( ctx context . Context , ch chan <- [ ] * targetgroup . Group ) bool {
2018-04-09 16:35:14 +00:00
keyObj , quit := e . queue . Get ( )
if quit {
return false
}
defer e . queue . Done ( keyObj )
key := keyObj . ( string )
namespace , name , err := cache . SplitMetaNamespaceKey ( key )
if err != nil {
2018-11-27 16:44:29 +00:00
level . Error ( e . logger ) . Log ( "msg" , "splitting key failed" , "key" , key )
2018-04-09 16:35:14 +00:00
return true
}
o , exists , err := e . endpointsStore . GetByKey ( key )
if err != nil {
level . Error ( e . logger ) . Log ( "msg" , "getting object from store failed" , "key" , key )
return true
}
if ! exists {
2020-02-18 16:36:57 +00:00
send ( ctx , ch , & targetgroup . Group { Source : endpointsSourceFromNamespaceAndName ( namespace , name ) } )
2018-04-09 16:35:14 +00:00
return true
}
eps , err := convertToEndpoints ( o )
if err != nil {
level . Error ( e . logger ) . Log ( "msg" , "converting to Endpoints object failed" , "err" , err )
return true
}
2020-02-18 16:36:57 +00:00
send ( ctx , ch , e . buildEndpoints ( eps ) )
2018-04-09 16:35:14 +00:00
return true
}
2016-11-14 15:21:38 +00:00
func convertToEndpoints ( o interface { } ) ( * apiv1 . Endpoints , error ) {
2017-09-04 11:10:44 +00:00
endpoints , ok := o . ( * apiv1 . Endpoints )
if ok {
return endpoints , nil
2016-11-14 15:21:38 +00:00
}
2019-03-25 23:01:12 +00:00
return nil , errors . Errorf ( "received unexpected object: %v" , o )
2016-11-14 15:21:38 +00:00
}
func endpointsSource ( ep * apiv1 . Endpoints ) string {
2018-04-25 16:36:22 +00:00
return endpointsSourceFromNamespaceAndName ( ep . Namespace , ep . Name )
2016-09-30 08:42:07 +00:00
}
2018-04-09 16:35:14 +00:00
// endpointsSourceFromNamespaceAndName builds the target group source
// identifier "endpoints/<namespace>/<name>".
func endpointsSourceFromNamespaceAndName(namespace, name string) string {
	const kind = "endpoints"
	return kind + "/" + namespace + "/" + name
}
// Names (and name prefixes) of the meta labels set by the endpoints role.
// All of them are built on metaLabelPrefix.
const (
2020-12-10 02:30:30 +00:00
// Prefixes for labels derived from the Endpoints object's own labels: one
// carrying the label value and one marking that the label is present.
endpointsLabelPrefix = metaLabelPrefix + "endpoints_label_"
endpointsLabelPresentPrefix = metaLabelPrefix + "endpoints_labelpresent_"
2018-03-09 10:07:00 +00:00
// Name of the Endpoints object; set on the whole target group.
endpointsNameLabel = metaLabelPrefix + "endpoints_name"
2019-05-16 07:49:00 +00:00
// Node name and hostname of an endpoint address; set only when present.
endpointNodeName = metaLabelPrefix + "endpoint_node_name"
endpointHostname = metaLabelPrefix + "endpoint_hostname"
2018-03-09 10:07:00 +00:00
// "true" for addresses listed as ready, "false" for not-ready addresses.
endpointReadyLabel = metaLabelPrefix + "endpoint_ready"
// Name and protocol of the endpoint port a target was generated for.
endpointPortNameLabel = metaLabelPrefix + "endpoint_port_name"
endpointPortProtocolLabel = metaLabelPrefix + "endpoint_port_protocol"
// Kind and name of the object referenced by the address's TargetRef.
endpointAddressTargetKindLabel = metaLabelPrefix + "endpoint_address_target_kind"
endpointAddressTargetNameLabel = metaLabelPrefix + "endpoint_address_target_name"
2016-09-30 08:42:07 +00:00
)
Refactor SD configuration to remove `config` dependency (#3629)
* refactor: move targetGroup struct and CheckOverflow() to their own package
* refactor: move auth and security related structs to a utility package, fix import error in utility package
* refactor: Azure SD, remove SD struct from config
* refactor: DNS SD, remove SD struct from config into dns package
* refactor: ec2 SD, move SD struct from config into the ec2 package
* refactor: file SD, move SD struct from config to file discovery package
* refactor: gce, move SD struct from config to gce discovery package
* refactor: move HTTPClientConfig and URL into util/config, fix import error in httputil
* refactor: consul, move SD struct from config into consul discovery package
* refactor: marathon, move SD struct from config into marathon discovery package
* refactor: triton, move SD struct from config to triton discovery package, fix test
* refactor: zookeeper, move SD structs from config to zookeeper discovery package
* refactor: openstack, remove SD struct from config, move into openstack discovery package
* refactor: kubernetes, move SD struct from config into kubernetes discovery package
* refactor: notifier, use targetgroup package instead of config
* refactor: tests for file, marathon, triton SD - use targetgroup package instead of config.TargetGroup
* refactor: retrieval, use targetgroup package instead of config.TargetGroup
* refactor: storage, use config util package
* refactor: discovery manager, use targetgroup package instead of config.TargetGroup
* refactor: use HTTPClient and TLS config from configUtil instead of config
* refactor: tests, use targetgroup package instead of config.TargetGroup
* refactor: fix targetgroup.Group pointers that were removed by mistake
* refactor: openstack, kubernetes: drop prefixes
* refactor: remove import aliases forced due to vscode bug
* refactor: move main SD struct out of config into discovery/config
* refactor: rename configUtil to config_util
* refactor: rename yamlUtil to yaml_config
* refactor: kubernetes, remove prefixes
* refactor: move the TargetGroup package to discovery/
* refactor: fix order of imports
2017-12-29 20:01:34 +00:00
func ( e * Endpoints ) buildEndpoints ( eps * apiv1 . Endpoints ) * targetgroup . Group {
tg := & targetgroup . Group {
2016-11-14 15:21:38 +00:00
Source : endpointsSource ( eps ) ,
2016-09-30 08:42:07 +00:00
}
tg . Labels = model . LabelSet {
namespaceLabel : lv ( eps . Namespace ) ,
endpointsNameLabel : lv ( eps . Name ) ,
}
2016-09-30 12:18:49 +00:00
e . addServiceLabels ( eps . Namespace , eps . Name , tg )
2021-05-31 22:49:29 +00:00
// Add endpoints labels metadata.
2020-12-10 02:30:30 +00:00
for k , v := range eps . Labels {
ln := strutil . SanitizeLabelName ( k )
tg . Labels [ model . LabelName ( endpointsLabelPrefix + ln ) ] = lv ( v )
tg . Labels [ model . LabelName ( endpointsLabelPresentPrefix + ln ) ] = presentValue
}
2016-09-30 08:42:07 +00:00
2016-09-30 12:18:49 +00:00
type podEntry struct {
pod * apiv1 . Pod
servicePorts [ ] apiv1 . EndpointPort
}
seenPods := map [ string ] * podEntry { }
2016-09-30 08:42:07 +00:00
add := func ( addr apiv1 . EndpointAddress , port apiv1 . EndpointPort , ready string ) {
2016-10-17 09:05:13 +00:00
a := net . JoinHostPort ( addr . IP , strconv . FormatUint ( uint64 ( port . Port ) , 10 ) )
2016-09-30 08:42:07 +00:00
2016-09-30 12:18:49 +00:00
target := model . LabelSet {
model . AddressLabel : lv ( a ) ,
endpointPortNameLabel : lv ( port . Name ) ,
endpointPortProtocolLabel : lv ( string ( port . Protocol ) ) ,
endpointReadyLabel : lv ( ready ) ,
}
2018-03-09 10:07:00 +00:00
if addr . TargetRef != nil {
target [ model . LabelName ( endpointAddressTargetKindLabel ) ] = lv ( addr . TargetRef . Kind )
target [ model . LabelName ( endpointAddressTargetNameLabel ) ] = lv ( addr . TargetRef . Name )
}
2019-05-16 07:49:00 +00:00
if addr . NodeName != nil {
target [ model . LabelName ( endpointNodeName ) ] = lv ( * addr . NodeName )
}
if addr . Hostname != "" {
target [ model . LabelName ( endpointHostname ) ] = lv ( addr . Hostname )
}
2016-09-30 12:18:49 +00:00
pod := e . resolvePodRef ( addr . TargetRef )
if pod == nil {
2016-10-14 15:16:06 +00:00
// This target is not a Pod, so don't continue with Pod specific logic.
2016-09-30 12:18:49 +00:00
tg . Targets = append ( tg . Targets , target )
return
}
s := pod . Namespace + "/" + pod . Name
sp , ok := seenPods [ s ]
if ! ok {
sp = & podEntry { pod : pod }
seenPods [ s ] = sp
}
// Attach standard pod labels.
target = target . Merge ( podLabels ( pod ) )
// Attach potential container port labels matching the endpoint port.
for _ , c := range pod . Spec . Containers {
for _ , cport := range c . Ports {
if port . Port == cport . ContainerPort {
2016-10-17 09:05:13 +00:00
ports := strconv . FormatUint ( uint64 ( port . Port ) , 10 )
2016-09-30 12:18:49 +00:00
target [ podContainerNameLabel ] = lv ( c . Name )
2016-10-17 09:05:13 +00:00
target [ podContainerPortNameLabel ] = lv ( cport . Name )
target [ podContainerPortNumberLabel ] = lv ( ports )
2016-09-30 12:18:49 +00:00
target [ podContainerPortProtocolLabel ] = lv ( string ( port . Protocol ) )
break
}
}
}
// Add service port so we know that we have already generated a target
// for it.
sp . servicePorts = append ( sp . servicePorts , port )
tg . Targets = append ( tg . Targets , target )
2016-09-30 08:42:07 +00:00
}
for _ , ss := range eps . Subsets {
for _ , port := range ss . Ports {
for _ , addr := range ss . Addresses {
add ( addr , port , "true" )
}
2016-10-14 15:16:06 +00:00
// Although this generates the same target again, as it was generated in
// the loop above, it causes the ready meta label to be overridden.
2016-09-30 08:42:07 +00:00
for _ , addr := range ss . NotReadyAddresses {
add ( addr , port , "false" )
}
}
}
2021-10-08 20:17:04 +00:00
v := eps . Labels [ apiv1 . EndpointsOverCapacity ]
if v == "truncated" {
level . Warn ( e . logger ) . Log ( "msg" , "Number of endpoints in one Endpoints object exceeds 1000 and has been truncated, please use \"role: endpointslice\" instead" , "endpoint" , eps . Name )
}
if v == "warning" {
level . Warn ( e . logger ) . Log ( "msg" , "Number of endpoints in one Endpoints object exceeds 1000, please use \"role: endpointslice\" instead" , "endpoint" , eps . Name )
}
2016-09-30 12:18:49 +00:00
// For all seen pods, check all container ports. If they were not covered
// by one of the service endpoints, generate targets for them.
for _ , pe := range seenPods {
for _ , c := range pe . pod . Spec . Containers {
for _ , cport := range c . Ports {
hasSeenPort := func ( ) bool {
for _ , eport := range pe . servicePorts {
if cport . ContainerPort == eport . Port {
return true
}
}
return false
}
if hasSeenPort ( ) {
continue
}
2016-10-17 09:05:13 +00:00
a := net . JoinHostPort ( pe . pod . Status . PodIP , strconv . FormatUint ( uint64 ( cport . ContainerPort ) , 10 ) )
ports := strconv . FormatUint ( uint64 ( cport . ContainerPort ) , 10 )
2016-09-30 12:18:49 +00:00
target := model . LabelSet {
model . AddressLabel : lv ( a ) ,
podContainerNameLabel : lv ( c . Name ) ,
podContainerPortNameLabel : lv ( cport . Name ) ,
2016-10-17 09:05:13 +00:00
podContainerPortNumberLabel : lv ( ports ) ,
2016-09-30 12:18:49 +00:00
podContainerPortProtocolLabel : lv ( string ( cport . Protocol ) ) ,
}
tg . Targets = append ( tg . Targets , target . Merge ( podLabels ( pe . pod ) ) )
}
}
}
2016-09-30 08:42:07 +00:00
return tg
}
func ( e * Endpoints ) resolvePodRef ( ref * apiv1 . ObjectReference ) * apiv1 . Pod {
2016-09-30 12:18:49 +00:00
if ref == nil || ref . Kind != "Pod" {
2016-09-30 08:42:07 +00:00
return nil
}
2016-09-30 12:18:49 +00:00
p := & apiv1 . Pod { }
p . Namespace = ref . Namespace
p . Name = ref . Name
obj , exists , err := e . podStore . Get ( p )
if err != nil {
2017-08-11 18:45:52 +00:00
level . Error ( e . logger ) . Log ( "msg" , "resolving pod ref failed" , "err" , err )
2019-05-03 13:11:28 +00:00
return nil
}
if ! exists {
return nil
2016-09-30 12:18:49 +00:00
}
return obj . ( * apiv1 . Pod )
2016-09-30 08:42:07 +00:00
}
Refactor SD configuration to remove `config` dependency (#3629)
* refactor: move targetGroup struct and CheckOverflow() to their own package
* refactor: move auth and security related structs to a utility package, fix import error in utility package
* refactor: Azure SD, remove SD struct from config
* refactor: DNS SD, remove SD struct from config into dns package
* refactor: ec2 SD, move SD struct from config into the ec2 package
* refactor: file SD, move SD struct from config to file discovery package
* refactor: gce, move SD struct from config to gce discovery package
* refactor: move HTTPClientConfig and URL into util/config, fix import error in httputil
* refactor: consul, move SD struct from config into consul discovery package
* refactor: marathon, move SD struct from config into marathon discovery package
* refactor: triton, move SD struct from config to triton discovery package, fix test
* refactor: zookeeper, move SD structs from config to zookeeper discovery package
* refactor: openstack, remove SD struct from config, move into openstack discovery package
* refactor: kubernetes, move SD struct from config into kubernetes discovery package
* refactor: notifier, use targetgroup package instead of config
* refactor: tests for file, marathon, triton SD - use targetgroup package instead of config.TargetGroup
* refactor: retrieval, use targetgroup package instead of config.TargetGroup
* refactor: storage, use config util package
* refactor: discovery manager, use targetgroup package instead of config.TargetGroup
* refactor: use HTTPClient and TLS config from configUtil instead of config
* refactor: tests, use targetgroup package instead of config.TargetGroup
* refactor: fix targetgroup.Group pointers that were removed by mistake
* refactor: openstack, kubernetes: drop prefixes
* refactor: remove import aliases forced due to vscode bug
* refactor: move main SD struct out of config into discovery/config
* refactor: rename configUtil to config_util
* refactor: rename yamlUtil to yaml_config
* refactor: kubernetes, remove prefixes
* refactor: move the TargetGroup package to discovery/
* refactor: fix order of imports
2017-12-29 20:01:34 +00:00
func ( e * Endpoints ) addServiceLabels ( ns , name string , tg * targetgroup . Group ) {
2016-09-30 08:42:07 +00:00
svc := & apiv1 . Service { }
svc . Namespace = ns
svc . Name = name
2016-09-30 12:18:49 +00:00
obj , exists , err := e . serviceStore . Get ( svc )
if err != nil {
2017-08-11 18:45:52 +00:00
level . Error ( e . logger ) . Log ( "msg" , "retrieving service failed" , "err" , err )
2019-05-03 13:11:28 +00:00
return
}
if ! exists {
return
2016-09-30 12:18:49 +00:00
}
2016-09-30 08:42:07 +00:00
svc = obj . ( * apiv1 . Service )
2016-10-06 14:28:59 +00:00
tg . Labels = tg . Labels . Merge ( serviceLabels ( svc ) )
2016-09-30 08:42:07 +00:00
}