mirror of https://github.com/k3s-io/k3s
Merge pull request #71999 from mm4tt/kube-proxy
Start exporting the in-cluster network programming latency metric.
commit 41d2445f8e
pkg/proxy/endpoints.go
@@ -21,6 +21,7 @@ import (
     "reflect"
     "strconv"
     "sync"
+    "time"
 
     "k8s.io/klog"
@@ -92,16 +93,20 @@ type EndpointChangeTracker struct {
     // isIPv6Mode indicates if change tracker is under IPv6/IPv4 mode. Nil means not applicable.
     isIPv6Mode *bool
     recorder   record.EventRecorder
+    // Map from the Endpoints namespaced-name to the times of the triggers that caused the endpoints
+    // object to change. Used to calculate the network-programming-latency.
+    lastChangeTriggerTimes map[types.NamespacedName][]time.Time
 }
 
 // NewEndpointChangeTracker initializes an EndpointsChangeMap
 func NewEndpointChangeTracker(hostname string, makeEndpointInfo makeEndpointFunc, isIPv6Mode *bool, recorder record.EventRecorder) *EndpointChangeTracker {
     return &EndpointChangeTracker{
-        hostname:         hostname,
-        items:            make(map[types.NamespacedName]*endpointsChange),
-        makeEndpointInfo: makeEndpointInfo,
-        isIPv6Mode:       isIPv6Mode,
-        recorder:         recorder,
+        hostname:               hostname,
+        items:                  make(map[types.NamespacedName]*endpointsChange),
+        makeEndpointInfo:       makeEndpointInfo,
+        isIPv6Mode:             isIPv6Mode,
+        recorder:               recorder,
+        lastChangeTriggerTimes: make(map[types.NamespacedName][]time.Time),
     }
 }
@@ -133,14 +138,38 @@ func (ect *EndpointChangeTracker) Update(previous, current *v1.Endpoints) bool {
         change.previous = ect.endpointsToEndpointsMap(previous)
         ect.items[namespacedName] = change
     }
+    if t := getLastChangeTriggerTime(endpoints); !t.IsZero() {
+        ect.lastChangeTriggerTimes[namespacedName] =
+            append(ect.lastChangeTriggerTimes[namespacedName], t)
+    }
     change.current = ect.endpointsToEndpointsMap(current)
     // if change.previous is equal to change.current, it means no change
     if reflect.DeepEqual(change.previous, change.current) {
         delete(ect.items, namespacedName)
+        // Reset the lastChangeTriggerTimes for the Endpoints object. Given that the network programming
+        // SLI is defined as the duration between the time of an event and the time when the network was
+        // programmed to incorporate that event, if there are events that happened between two
+        // consecutive syncs and that canceled each other out, e.g. pod A added -> pod A deleted,
+        // there will be no network programming for them and thus no network programming latency metric
+        // should be exported.
+        delete(ect.lastChangeTriggerTimes, namespacedName)
     }
     return len(ect.items) > 0
 }
 
+// getLastChangeTriggerTime returns the time.Time value of the EndpointsLastChangeTriggerTime
+// annotation stored in the given endpoints object, or the "zero" time if the annotation wasn't set
+// or was set incorrectly.
+func getLastChangeTriggerTime(endpoints *v1.Endpoints) time.Time {
+    val, err := time.Parse(time.RFC3339Nano, endpoints.Annotations[v1.EndpointsLastChangeTriggerTime])
+    if err != nil {
+        klog.Warningf("Error while parsing EndpointsLastChangeTriggerTimeAnnotation: '%s'. Error is %v",
+            endpoints.Annotations[v1.EndpointsLastChangeTriggerTime], err)
+        // In case of error, val is the zero time, which the callers ignore.
+    }
+    return val
+}
+
 // endpointsChange contains all changes to endpoints that happened since proxy rules were synced. For a single object,
 // changes are accumulated, i.e. previous is state from before applying the changes,
 // current is state after applying the changes.
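Note: the helper above round-trips the trigger time through the EndpointsLastChangeTriggerTime annotation as an RFC3339Nano string. Here is a standalone sketch of that round-trip; the annotation key is inlined as a string literal (matching the upstream constant's value as of this commit) so the snippet has no Kubernetes dependency, and the malformed-value case shows why Update() filters with !t.IsZero():

```go
package main

import (
	"fmt"
	"time"
)

// Literal value of v1.EndpointsLastChangeTriggerTime, inlined for the demo.
const lastChangeTriggerTimeAnnotation = "endpoints.kubernetes.io/last-change-trigger-time"

func main() {
	annotations := map[string]string{
		lastChangeTriggerTimeAnnotation: time.Now().UTC().Format(time.RFC3339Nano),
	}

	// Well-formed value: parses back to the trigger time.
	if t, err := time.Parse(time.RFC3339Nano, annotations[lastChangeTriggerTimeAnnotation]); err == nil {
		fmt.Println("trigger time:", t)
	}

	// Missing or malformed value: time.Parse fails and returns the zero time,
	// which is exactly what the !t.IsZero() check in Update() skips.
	t, err := time.Parse(time.RFC3339Nano, "not-a-timestamp")
	fmt.Println("parse failed:", err != nil, "zero time:", t.IsZero()) // true, true
}
```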
@@ -157,14 +186,19 @@ type UpdateEndpointMapResult struct {
     StaleEndpoints []ServiceEndpoint
     // StaleServiceNames identifies if a service is stale.
     StaleServiceNames []ServicePortName
+    // List of the trigger times for all endpoints objects that changed. It's used to export the
+    // network programming latency.
+    LastChangeTriggerTimes []time.Time
 }
 
 // UpdateEndpointsMap updates endpointsMap based on the given changes.
 func UpdateEndpointsMap(endpointsMap EndpointsMap, changes *EndpointChangeTracker) (result UpdateEndpointMapResult) {
     result.StaleEndpoints = make([]ServiceEndpoint, 0)
     result.StaleServiceNames = make([]ServicePortName, 0)
+    result.LastChangeTriggerTimes = make([]time.Time, 0)
 
-    endpointsMap.apply(changes, &result.StaleEndpoints, &result.StaleServiceNames)
+    endpointsMap.apply(
+        changes, &result.StaleEndpoints, &result.StaleServiceNames, &result.LastChangeTriggerTimes)
 
     // TODO: If this will appear to be computationally expensive, consider
     // computing this incrementally similarly to endpointsMap.
@@ -241,7 +275,10 @@ func (ect *EndpointChangeTracker) endpointsToEndpointsMap(endpoints *v1.Endpoint
 // apply applies the changes to EndpointsMap and updates stale endpoints and service-endpoints pairs. The `staleEndpoints` argument
 // is passed in to store the stale udp endpoints and the `staleServiceNames` argument is passed in to store the stale udp services.
 // The changes map is cleared after applying them.
-func (endpointsMap EndpointsMap) apply(changes *EndpointChangeTracker, staleEndpoints *[]ServiceEndpoint, staleServiceNames *[]ServicePortName) {
+// In addition it returns (via argument) and resets the lastChangeTriggerTimes for all endpoints
+// that were changed and will result in syncing the proxy rules.
+func (endpointsMap EndpointsMap) apply(changes *EndpointChangeTracker, staleEndpoints *[]ServiceEndpoint,
+    staleServiceNames *[]ServicePortName, lastChangeTriggerTimes *[]time.Time) {
     if changes == nil {
         return
     }
@@ -253,6 +290,10 @@ func (endpointsMap EndpointsMap) apply(changes *EndpointChangeTracker, staleEndp
         detectStaleConnections(change.previous, change.current, staleEndpoints, staleServiceNames)
     }
     changes.items = make(map[types.NamespacedName]*endpointsChange)
+    for _, lastChangeTriggerTime := range changes.lastChangeTriggerTimes {
+        *lastChangeTriggerTimes = append(*lastChangeTriggerTimes, lastChangeTriggerTime...)
+    }
+    changes.lastChangeTriggerTimes = make(map[types.NamespacedName][]time.Time)
 }
 
 // Merge ensures that the current EndpointsMap contains all <service, endpoints> pairs from the EndpointsMap passed in.
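Note: this flatten-and-reset step is the hand-off point between the change tracker and the sync loop. A minimal sketch of the pattern, using plain string keys in place of types.NamespacedName:

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	// Stand-in for ect.lastChangeTriggerTimes: one slice of trigger times per
	// endpoints object, accumulated between two proxy-rule syncs.
	pending := map[string][]time.Time{
		"ns/ep1": {time.Now().Add(-2 * time.Second)},
		"ns/ep2": {time.Now().Add(-1 * time.Second), time.Now()},
	}

	// apply() flattens every per-object slice into the caller-provided slice...
	var flattened []time.Time
	for _, times := range pending {
		flattened = append(flattened, times...)
	}
	// ...then resets the map, so each trigger time is exported exactly once.
	pending = make(map[string][]time.Time)

	fmt.Printf("handed %d trigger times to the sync loop, %d left pending\n",
		len(flattened), len(pending))
}
```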
pkg/proxy/endpoints_test.go
@@ -18,7 +18,9 @@ package proxy
 
 import (
     "reflect"
+    "sort"
     "testing"
+    "time"
 
     "github.com/davecgh/go-spew/spew"
@@ -121,8 +123,9 @@ func TestGetLocalEndpointIPs(t *testing.T) {
 func makeTestEndpoints(namespace, name string, eptFunc func(*v1.Endpoints)) *v1.Endpoints {
     ept := &v1.Endpoints{
         ObjectMeta: metav1.ObjectMeta{
-            Name:      name,
-            Namespace: namespace,
+            Name:        name,
+            Namespace:   namespace,
+            Annotations: make(map[string]string),
         },
     }
     eptFunc(ept)
@@ -1268,6 +1271,120 @@ func TestUpdateEndpointsMap(t *testing.T) {
     }
 }
 
+func TestLastChangeTriggerTime(t *testing.T) {
+    t0 := time.Date(2018, 01, 01, 0, 0, 0, 0, time.UTC)
+    t1 := t0.Add(time.Second)
+    t2 := t1.Add(time.Second)
+    t3 := t2.Add(time.Second)
+
+    createEndpoints := func(namespace, name string, triggerTime time.Time) *v1.Endpoints {
+        e := makeTestEndpoints(namespace, name, func(ept *v1.Endpoints) {
+            ept.Subsets = []v1.EndpointSubset{{
+                Addresses: []v1.EndpointAddress{{IP: "1.1.1.1"}},
+                Ports:     []v1.EndpointPort{{Port: 11}},
+            }}
+        })
+        e.Annotations[v1.EndpointsLastChangeTriggerTime] = triggerTime.Format(time.RFC3339Nano)
+        return e
+    }
+
+    modifyEndpoints := func(endpoints *v1.Endpoints, triggerTime time.Time) *v1.Endpoints {
+        e := endpoints.DeepCopy()
+        e.Subsets[0].Ports[0].Port++
+        e.Annotations[v1.EndpointsLastChangeTriggerTime] = triggerTime.Format(time.RFC3339Nano)
+        return e
+    }
+
+    sortTimeSlice := func(data []time.Time) {
+        sort.Slice(data, func(i, j int) bool { return data[i].Before(data[j]) })
+    }
+
+    testCases := []struct {
+        name     string
+        scenario func(fp *FakeProxier)
+        expected []time.Time
+    }{
+        {
+            name: "Single addEndpoints",
+            scenario: func(fp *FakeProxier) {
+                e := createEndpoints("ns", "ep1", t0)
+                fp.addEndpoints(e)
+            },
+            expected: []time.Time{t0},
+        },
+        {
+            name: "addEndpoints then updatedEndpoints",
+            scenario: func(fp *FakeProxier) {
+                e := createEndpoints("ns", "ep1", t0)
+                fp.addEndpoints(e)
+
+                e1 := modifyEndpoints(e, t1)
+                fp.updateEndpoints(e, e1)
+            },
+            expected: []time.Time{t0, t1},
+        },
+        {
+            name: "Add two endpoints then modify one",
+            scenario: func(fp *FakeProxier) {
+                e1 := createEndpoints("ns", "ep1", t1)
+                fp.addEndpoints(e1)
+
+                e2 := createEndpoints("ns", "ep2", t2)
+                fp.addEndpoints(e2)
+
+                e11 := modifyEndpoints(e1, t3)
+                fp.updateEndpoints(e1, e11)
+            },
+            expected: []time.Time{t1, t2, t3},
+        },
+        {
+            name: "Endpoints without annotation set",
+            scenario: func(fp *FakeProxier) {
+                e := createEndpoints("ns", "ep1", t1)
+                delete(e.Annotations, v1.EndpointsLastChangeTriggerTime)
+                fp.addEndpoints(e)
+            },
+            expected: []time.Time{},
+        },
+        {
+            name: "addEndpoints then deleteEndpoints",
+            scenario: func(fp *FakeProxier) {
+                e := createEndpoints("ns", "ep1", t1)
+                fp.addEndpoints(e)
+                fp.deleteEndpoints(e)
+            },
+            expected: []time.Time{},
+        },
+        {
+            name: "add then delete then add again",
+            scenario: func(fp *FakeProxier) {
+                e := createEndpoints("ns", "ep1", t1)
+                fp.addEndpoints(e)
+                fp.deleteEndpoints(e)
+                e = modifyEndpoints(e, t2)
+                fp.addEndpoints(e)
+            },
+            expected: []time.Time{t2},
+        },
+    }
+
+    for _, tc := range testCases {
+        fp := newFakeProxier()
+
+        tc.scenario(fp)
+
+        result := UpdateEndpointsMap(fp.endpointsMap, fp.endpointsChanges)
+        got := result.LastChangeTriggerTimes
+        sortTimeSlice(got)
+        sortTimeSlice(tc.expected)
+
+        if !reflect.DeepEqual(got, tc.expected) {
+            t.Errorf("%s: Invalid LastChangeTriggerTimes, expected: %v, got: %v",
+                tc.name, tc.expected, result.LastChangeTriggerTimes)
+        }
+    }
+}
+
 func compareEndpointsMaps(t *testing.T, tci int, newMap EndpointsMap, expected map[ServicePortName][]*BaseEndpointInfo) {
     if len(newMap) != len(expected) {
         t.Errorf("[%d] expected %d results, got %d: %v", tci, len(expected), len(newMap), newMap)
pkg/proxy/iptables/proxier.go
@@ -1341,6 +1341,11 @@ func (proxier *Proxier) syncProxyRules() {
         utilproxy.RevertPorts(replacementPortsMap, proxier.portsMap)
         return
     }
+    for _, lastChangeTriggerTime := range endpointUpdateResult.LastChangeTriggerTimes {
+        latency := metrics.SinceInSeconds(lastChangeTriggerTime)
+        metrics.NetworkProgrammingLatency.Observe(latency)
+        klog.V(4).Infof("Network programming took %f seconds", latency)
+    }
 
     // Close old local ports and save new ones.
     for k, v := range proxier.portsMap {
pkg/proxy/ipvs/proxier.go
@@ -1200,6 +1200,11 @@ func (proxier *Proxier) syncProxyRules() {
         utilproxy.RevertPorts(replacementPortsMap, proxier.portsMap)
         return
     }
+    for _, lastChangeTriggerTime := range endpointUpdateResult.LastChangeTriggerTimes {
+        latency := metrics.SinceInSeconds(lastChangeTriggerTime)
+        metrics.NetworkProgrammingLatency.Observe(latency)
+        klog.V(4).Infof("Network programming took %f seconds", latency)
+    }
 
     // Close old local ports and save new ones.
     for k, v := range proxier.portsMap {
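Note: both the iptables and ipvs proxiers observe the metric the same way. Assuming metrics.SinceInSeconds is the usual elapsed-time helper (wall-clock seconds since the given instant), each observation reduces to this sketch:

```go
package main

import (
	"fmt"
	"time"
)

// sinceInSeconds mirrors what metrics.SinceInSeconds is assumed to compute:
// wall-clock seconds elapsed since t.
func sinceInSeconds(t time.Time) float64 {
	return time.Since(t).Seconds()
}

func main() {
	// Pretend the endpoints controller stamped the trigger 1.5s before this sync.
	trigger := time.Now().Add(-1500 * time.Millisecond)
	latency := sinceInSeconds(trigger)
	fmt.Printf("Network programming took %f seconds\n", latency) // ~1.5
}
```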
pkg/proxy/metrics/metrics.go
@@ -45,15 +45,34 @@ var (
             Buckets: prometheus.ExponentialBuckets(1000, 2, 15),
         },
     )
+
+    // NetworkProgrammingLatency is defined as the time it took to program the network - from the time
+    // the service or pod has changed to the time the change was propagated and the proper kube-proxy
+    // rules were synced. Exported for each endpoints object that was part of the rules sync.
+    // See https://github.com/kubernetes/community/blob/master/sig-scalability/slos/network_programming_latency.md
+    // Note that the metric is partially based on the time exported by the endpoints controller on
+    // the master machine. The measurement may be inaccurate if there is a clock drift between the
+    // node and master machines.
+    NetworkProgrammingLatency = prometheus.NewHistogram(
+        prometheus.HistogramOpts{
+            Subsystem: kubeProxySubsystem,
+            Name:      "network_programming_latency_seconds",
+            Help:      "In Cluster Network Programming Latency in seconds",
+            // TODO(mm4tt): Reevaluate buckets before 1.14 release.
+            // The last bucket will be [0.001s*2^20 ~= 17min, +inf)
+            Buckets: prometheus.ExponentialBuckets(0.001, 2, 20),
+        },
+    )
 )
 
 var registerMetricsOnce sync.Once
 
-// RegisterMetrics registers sync proxy rules latency metrics
+// RegisterMetrics registers kube-proxy metrics.
 func RegisterMetrics() {
     registerMetricsOnce.Do(func() {
         prometheus.MustRegister(SyncProxyRulesLatency)
         prometheus.MustRegister(DeprecatedSyncProxyRulesLatency)
+        prometheus.MustRegister(NetworkProgrammingLatency)
     })
 }
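Note: the bucket layout chosen above can be inspected directly with the Prometheus client library. A small demo with a throwaway histogram (the metric name below is hypothetical):

```go
package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

func main() {
	// Same layout as NetworkProgrammingLatency: 20 buckets starting at 1ms,
	// each upper bound double the previous one.
	buckets := prometheus.ExponentialBuckets(0.001, 2, 20)
	fmt.Printf("first bucket: <=%gs, last finite bucket: <=%gs\n",
		buckets[0], buckets[len(buckets)-1]) // <=0.001s ... <=524.288s

	h := prometheus.NewHistogram(prometheus.HistogramOpts{
		Name:    "demo_network_programming_latency_seconds", // hypothetical name
		Help:    "Demo histogram using kube-proxy's bucket layout.",
		Buckets: buckets,
	})
	prometheus.MustRegister(h)

	// A 250ms observation is counted in every cumulative bucket whose upper
	// bound is >= 0.256s.
	h.Observe(0.250)
}
```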