From 7141ece4bfee8cb63a82737a74c14d6340da0bb9 Mon Sep 17 00:00:00 2001
From: Matt Matejczyk
Date: Mon, 5 Nov 2018 15:38:50 -0500
Subject: [PATCH] Start exporting the in-cluster network programming latency metric.

---
 pkg/proxy/endpoints.go        |  55 ++++++++++++++--
 pkg/proxy/endpoints_test.go   | 121 +++++++++++++++++++++++++++++++++-
 pkg/proxy/iptables/proxier.go |   5 ++
 pkg/proxy/ipvs/proxier.go     |   5 ++
 pkg/proxy/metrics/metrics.go  |  21 +++++-
 5 files changed, 197 insertions(+), 10 deletions(-)

diff --git a/pkg/proxy/endpoints.go b/pkg/proxy/endpoints.go
index c0992b290f..e42bd3a380 100644
--- a/pkg/proxy/endpoints.go
+++ b/pkg/proxy/endpoints.go
@@ -21,6 +21,7 @@ import (
 	"reflect"
 	"strconv"
 	"sync"
+	"time"
 
 	"k8s.io/klog"
 
@@ -92,16 +93,20 @@ type EndpointChangeTracker struct {
 	// isIPv6Mode indicates if change tracker is under IPv6/IPv4 mode. Nil means not applicable.
 	isIPv6Mode *bool
 	recorder   record.EventRecorder
+	// Map from the Endpoints namespaced-name to the times of the triggers that caused the endpoints
+	// object to change. Used to calculate the network-programming-latency.
+	lastChangeTriggerTimes map[types.NamespacedName][]time.Time
 }
 
 // NewEndpointChangeTracker initializes an EndpointsChangeMap
 func NewEndpointChangeTracker(hostname string, makeEndpointInfo makeEndpointFunc, isIPv6Mode *bool, recorder record.EventRecorder) *EndpointChangeTracker {
 	return &EndpointChangeTracker{
-		hostname:         hostname,
-		items:            make(map[types.NamespacedName]*endpointsChange),
-		makeEndpointInfo: makeEndpointInfo,
-		isIPv6Mode:       isIPv6Mode,
-		recorder:         recorder,
+		hostname:               hostname,
+		items:                  make(map[types.NamespacedName]*endpointsChange),
+		makeEndpointInfo:       makeEndpointInfo,
+		isIPv6Mode:             isIPv6Mode,
+		recorder:               recorder,
+		lastChangeTriggerTimes: make(map[types.NamespacedName][]time.Time),
 	}
 }
 
@@ -133,14 +138,38 @@ func (ect *EndpointChangeTracker) Update(previous, current *v1.Endpoints) bool {
 		change.previous = ect.endpointsToEndpointsMap(previous)
 		ect.items[namespacedName] = change
 	}
+	if t := getLastChangeTriggerTime(endpoints); !t.IsZero() {
+		ect.lastChangeTriggerTimes[namespacedName] =
+			append(ect.lastChangeTriggerTimes[namespacedName], t)
+	}
 	change.current = ect.endpointsToEndpointsMap(current)
 	// if change.previous equal to change.current, it means no change
 	if reflect.DeepEqual(change.previous, change.current) {
 		delete(ect.items, namespacedName)
+		// Reset the lastChangeTriggerTimes for the Endpoints object. Given that the network programming
+		// SLI is defined as the duration between a time of an event and a time when the network was
+		// programmed to incorporate that event, if there are events that happened between two
+		// consecutive syncs and that canceled each other out, e.g. pod A added -> pod A deleted,
+		// there will be no network programming for them and thus no network programming latency metric
+		// should be exported.
+		delete(ect.lastChangeTriggerTimes, namespacedName)
 	}
 	return len(ect.items) > 0
 }
 
+// getLastChangeTriggerTime returns the time.Time value of the EndpointsLastChangeTriggerTime
+// annotation stored in the given endpoints object or the "zero" time if the annotation wasn't set
+// or was set incorrectly.
+func getLastChangeTriggerTime(endpoints *v1.Endpoints) time.Time {
+	val, err := time.Parse(time.RFC3339Nano, endpoints.Annotations[v1.EndpointsLastChangeTriggerTime])
+	if err != nil {
+		klog.Warningf("Error while parsing EndpointsLastChangeTriggerTimeAnnotation: '%s'. Error is %v",
+			endpoints.Annotations[v1.EndpointsLastChangeTriggerTime], err)
+		// In case of error val = time.Zero, which is ignored in the upstream code.
+	}
+	return val
+}
+
 // endpointsChange contains all changes to endpoints that happened since proxy rules were synced. For a single object,
 // changes are accumulated, i.e. previous is state from before applying the changes,
 // current is state after applying the changes.
@@ -157,14 +186,19 @@ type UpdateEndpointMapResult struct {
 	StaleEndpoints []ServiceEndpoint
 	// StaleServiceNames identifies if a service is stale.
 	StaleServiceNames []ServicePortName
+	// List of the trigger times for all endpoints objects that changed. It's used to export the
+	// network programming latency.
+	LastChangeTriggerTimes []time.Time
 }
 
 // UpdateEndpointsMap updates endpointsMap base on the given changes.
 func UpdateEndpointsMap(endpointsMap EndpointsMap, changes *EndpointChangeTracker) (result UpdateEndpointMapResult) {
 	result.StaleEndpoints = make([]ServiceEndpoint, 0)
 	result.StaleServiceNames = make([]ServicePortName, 0)
+	result.LastChangeTriggerTimes = make([]time.Time, 0)
 
-	endpointsMap.apply(changes, &result.StaleEndpoints, &result.StaleServiceNames)
+	endpointsMap.apply(
+		changes, &result.StaleEndpoints, &result.StaleServiceNames, &result.LastChangeTriggerTimes)
 
 	// TODO: If this will appear to be computationally expensive, consider
 	// computing this incrementally similarly to endpointsMap.
@@ -241,7 +275,10 @@ func (ect *EndpointChangeTracker) endpointsToEndpointsMap(endpoints *v1.Endpoint
 // apply the changes to EndpointsMap and updates stale endpoints and service-endpoints pair. The `staleEndpoints` argument
 // is passed in to store the stale udp endpoints and `staleServiceNames` argument is passed in to store the stale udp service.
 // The changes map is cleared after applying them.
-func (endpointsMap EndpointsMap) apply(changes *EndpointChangeTracker, staleEndpoints *[]ServiceEndpoint, staleServiceNames *[]ServicePortName) {
+// In addition it returns (via argument) and resets the lastChangeTriggerTimes for all endpoints
+// that were changed and will result in syncing the proxy rules.
+func (endpointsMap EndpointsMap) apply(changes *EndpointChangeTracker, staleEndpoints *[]ServiceEndpoint,
+	staleServiceNames *[]ServicePortName, lastChangeTriggerTimes *[]time.Time) {
 	if changes == nil {
 		return
 	}
@@ -253,6 +290,10 @@ func (endpointsMap EndpointsMap) apply(changes *EndpointChangeTracker, staleEndp
 		detectStaleConnections(change.previous, change.current, staleEndpoints, staleServiceNames)
 	}
 	changes.items = make(map[types.NamespacedName]*endpointsChange)
+	for _, lastChangeTriggerTime := range changes.lastChangeTriggerTimes {
+		*lastChangeTriggerTimes = append(*lastChangeTriggerTimes, lastChangeTriggerTime...)
+	}
+	changes.lastChangeTriggerTimes = make(map[types.NamespacedName][]time.Time)
 }
 
 // Merge ensures that the current EndpointsMap contains all pairs from the EndpointsMap passed in.
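For context on the round trip above: getLastChangeTriggerTime consumes the v1.EndpointsLastChangeTriggerTime annotation, which is expected to be stamped by the endpoints controller as an RFC3339Nano timestamp (the same format the new test below writes). The following stand-alone sketch shows that write/parse round trip; the stampTriggerTime helper is illustrative only and not part of this patch.

package main

import (
	"fmt"
	"time"

	v1 "k8s.io/api/core/v1"
)

// stampTriggerTime mimics what the producer side (assumed to be the endpoints
// controller) does: record the trigger time as an RFC3339Nano annotation.
func stampTriggerTime(ep *v1.Endpoints, t time.Time) {
	if ep.Annotations == nil {
		ep.Annotations = map[string]string{}
	}
	ep.Annotations[v1.EndpointsLastChangeTriggerTime] = t.Format(time.RFC3339Nano)
}

func main() {
	ep := &v1.Endpoints{}
	stampTriggerTime(ep, time.Now().Add(-100*time.Millisecond))

	// Mirrors getLastChangeTriggerTime: a missing or malformed annotation
	// leaves val as the zero time.Time, which the caller ignores.
	val, err := time.Parse(time.RFC3339Nano, ep.Annotations[v1.EndpointsLastChangeTriggerTime])
	if err != nil {
		fmt.Println("annotation missing or malformed:", err)
		return
	}
	fmt.Println("trigger time:", val, "elapsed since trigger:", time.Since(val))
}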
diff --git a/pkg/proxy/endpoints_test.go b/pkg/proxy/endpoints_test.go
index 24fb61ad96..650aa1da99 100644
--- a/pkg/proxy/endpoints_test.go
+++ b/pkg/proxy/endpoints_test.go
@@ -18,7 +18,9 @@ package proxy
 
 import (
 	"reflect"
+	"sort"
 	"testing"
+	"time"
 
 	"github.com/davecgh/go-spew/spew"
 
@@ -121,8 +123,9 @@ func TestGetLocalEndpointIPs(t *testing.T) {
 func makeTestEndpoints(namespace, name string, eptFunc func(*v1.Endpoints)) *v1.Endpoints {
 	ept := &v1.Endpoints{
 		ObjectMeta: metav1.ObjectMeta{
-			Name:      name,
-			Namespace: namespace,
+			Name:        name,
+			Namespace:   namespace,
+			Annotations: make(map[string]string),
 		},
 	}
 	eptFunc(ept)
@@ -1268,6 +1271,120 @@ func TestUpdateEndpointsMap(t *testing.T) {
 	}
 }
 
+func TestLastChangeTriggerTime(t *testing.T) {
+	t0 := time.Date(2018, 01, 01, 0, 0, 0, 0, time.UTC)
+	t1 := t0.Add(time.Second)
+	t2 := t1.Add(time.Second)
+	t3 := t2.Add(time.Second)
+
+	createEndpoints := func(namespace, name string, triggerTime time.Time) *v1.Endpoints {
+		e := makeTestEndpoints(namespace, name, func(ept *v1.Endpoints) {
+			ept.Subsets = []v1.EndpointSubset{{
+				Addresses: []v1.EndpointAddress{{IP: "1.1.1.1"}},
+				Ports:     []v1.EndpointPort{{Port: 11}},
+			}}
+		})
+		e.Annotations[v1.EndpointsLastChangeTriggerTime] = triggerTime.Format(time.RFC3339Nano)
+		return e
+	}
+
+	modifyEndpoints := func(endpoints *v1.Endpoints, triggerTime time.Time) *v1.Endpoints {
+		e := endpoints.DeepCopy()
+		e.Subsets[0].Ports[0].Port++
+		e.Annotations[v1.EndpointsLastChangeTriggerTime] = triggerTime.Format(time.RFC3339Nano)
+		return e
+	}
+
+	sortTimeSlice := func(data []time.Time) {
+		sort.Slice(data, func(i, j int) bool { return data[i].Before(data[j]) })
+	}
+
+	testCases := []struct {
+		name     string
+		scenario func(fp *FakeProxier)
+		expected []time.Time
+	}{
+		{
+			name: "Single addEndpoints",
+			scenario: func(fp *FakeProxier) {
+				e := createEndpoints("ns", "ep1", t0)
+				fp.addEndpoints(e)
+			},
+			expected: []time.Time{t0},
+		},
+		{
+			name: "addEndpoints then updateEndpoints",
+			scenario: func(fp *FakeProxier) {
+				e := createEndpoints("ns", "ep1", t0)
+				fp.addEndpoints(e)
+
+				e1 := modifyEndpoints(e, t1)
+				fp.updateEndpoints(e, e1)
+			},
+			expected: []time.Time{t0, t1},
+		},
+		{
+			name: "Add two endpoints then modify one",
+			scenario: func(fp *FakeProxier) {
+				e1 := createEndpoints("ns", "ep1", t1)
+				fp.addEndpoints(e1)
+
+				e2 := createEndpoints("ns", "ep2", t2)
+				fp.addEndpoints(e2)
+
+				e11 := modifyEndpoints(e1, t3)
+				fp.updateEndpoints(e1, e11)
+			},
+			expected: []time.Time{t1, t2, t3},
+		},
+		{
+			name: "Endpoints without annotation set",
+			scenario: func(fp *FakeProxier) {
+				e := createEndpoints("ns", "ep1", t1)
+				delete(e.Annotations, v1.EndpointsLastChangeTriggerTime)
+				fp.addEndpoints(e)
+			},
+			expected: []time.Time{},
+		},
+		{
+			name: "addEndpoints then deleteEndpoints",
+			scenario: func(fp *FakeProxier) {
+				e := createEndpoints("ns", "ep1", t1)
+				fp.addEndpoints(e)
+				fp.deleteEndpoints(e)
+			},
+			expected: []time.Time{},
+		},
+		{
+			name: "add then delete then add again",
+			scenario: func(fp *FakeProxier) {
+				e := createEndpoints("ns", "ep1", t1)
+				fp.addEndpoints(e)
+				fp.deleteEndpoints(e)
+				e = modifyEndpoints(e, t2)
+				fp.addEndpoints(e)
+			},
+			expected: []time.Time{t2},
+		},
+	}
+
+	for _, tc := range testCases {
+		fp := newFakeProxier()
+
+		tc.scenario(fp)
+
+		result := UpdateEndpointsMap(fp.endpointsMap, fp.endpointsChanges)
+		got := result.LastChangeTriggerTimes
+		sortTimeSlice(got)
+		sortTimeSlice(tc.expected)
+
+		if !reflect.DeepEqual(got, tc.expected) {
+			t.Errorf("%s: Invalid LastChangeTriggerTimes, expected: %v, got: %v",
+				tc.name, tc.expected, result.LastChangeTriggerTimes)
+		}
+	}
+}
+
 func compareEndpointsMaps(t *testing.T, tci int, newMap EndpointsMap, expected map[ServicePortName][]*BaseEndpointInfo) {
 	if len(newMap) != len(expected) {
 		t.Errorf("[%d] expected %d results, got %d: %v", tci, len(expected), len(newMap), newMap)
diff --git a/pkg/proxy/iptables/proxier.go b/pkg/proxy/iptables/proxier.go
index 7caace0933..2c89eb0b53 100644
--- a/pkg/proxy/iptables/proxier.go
+++ b/pkg/proxy/iptables/proxier.go
@@ -1341,6 +1341,11 @@ func (proxier *Proxier) syncProxyRules() {
 		utilproxy.RevertPorts(replacementPortsMap, proxier.portsMap)
 		return
 	}
+	for _, lastChangeTriggerTime := range endpointUpdateResult.LastChangeTriggerTimes {
+		latency := metrics.SinceInSeconds(lastChangeTriggerTime)
+		metrics.NetworkProgrammingLatency.Observe(latency)
+		klog.V(4).Infof("Network programming took %f seconds", latency)
+	}
 
 	// Close old local ports and save new ones.
 	for k, v := range proxier.portsMap {
diff --git a/pkg/proxy/ipvs/proxier.go b/pkg/proxy/ipvs/proxier.go
index ab20e70ca4..c8cf3c35bf 100644
--- a/pkg/proxy/ipvs/proxier.go
+++ b/pkg/proxy/ipvs/proxier.go
@@ -1200,6 +1200,11 @@ func (proxier *Proxier) syncProxyRules() {
 		utilproxy.RevertPorts(replacementPortsMap, proxier.portsMap)
 		return
 	}
+	for _, lastChangeTriggerTime := range endpointUpdateResult.LastChangeTriggerTimes {
+		latency := metrics.SinceInSeconds(lastChangeTriggerTime)
+		metrics.NetworkProgrammingLatency.Observe(latency)
+		klog.V(4).Infof("Network programming took %f seconds", latency)
+	}
 
 	// Close old local ports and save new ones.
 	for k, v := range proxier.portsMap {
diff --git a/pkg/proxy/metrics/metrics.go b/pkg/proxy/metrics/metrics.go
index c5c1d8feda..d8c5ed1cd2 100644
--- a/pkg/proxy/metrics/metrics.go
+++ b/pkg/proxy/metrics/metrics.go
@@ -45,15 +45,34 @@ var (
 			Buckets: prometheus.ExponentialBuckets(1000, 2, 15),
 		},
 	)
+
+	// NetworkProgrammingLatency is defined as the time it took to program the network - from the time
+	// the service or pod has changed to the time the change was propagated and the proper kube-proxy
+	// rules were synced. Exported for each endpoints object that was part of the rules sync.
+	// See https://github.com/kubernetes/community/blob/master/sig-scalability/slos/network_programming_latency.md
+	// Note that the metric is partially based on the time exported by the endpoints controller on
+	// the master machine. The measurement may be inaccurate if there is a clock drift between the
+	// node and master machine.
+	NetworkProgrammingLatency = prometheus.NewHistogram(
+		prometheus.HistogramOpts{
+			Subsystem: kubeProxySubsystem,
+			Name:      "network_programming_latency_seconds",
+			Help:      "In Cluster Network Programming Latency in seconds",
+			// TODO(mm4tt): Reevaluate buckets before 1.14 release.
+			// The last bucket will be [0.001s*2^20 ~= 17min, +inf)
+			Buckets: prometheus.ExponentialBuckets(0.001, 2, 20),
+		},
+	)
 )
 
 var registerMetricsOnce sync.Once
 
-// RegisterMetrics registers sync proxy rules latency metrics
+// RegisterMetrics registers kube-proxy metrics.
 func RegisterMetrics() {
 	registerMetricsOnce.Do(func() {
 		prometheus.MustRegister(SyncProxyRulesLatency)
 		prometheus.MustRegister(DeprecatedSyncProxyRulesLatency)
+		prometheus.MustRegister(NetworkProgrammingLatency)
 	})
 }
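To see the new histogram end to end, the sketch below recreates it with the same options and records one observation the way syncProxyRules now does. It assumes the existing kubeProxySubsystem constant resolves to "kubeproxy" (which would make the full metric name kubeproxy_network_programming_latency_seconds); the real metric lives in pkg/proxy/metrics and is registered via RegisterMetrics(), so this snippet is only an approximation for illustration.

package main

import (
	"fmt"
	"time"

	"github.com/prometheus/client_golang/prometheus"
	dto "github.com/prometheus/client_model/go"
)

func main() {
	// Local copy of the histogram options added in this patch.
	networkProgrammingLatency := prometheus.NewHistogram(prometheus.HistogramOpts{
		Subsystem: "kubeproxy", // assumed value of kubeProxySubsystem
		Name:      "network_programming_latency_seconds",
		Help:      "In Cluster Network Programming Latency in seconds",
		Buckets:   prometheus.ExponentialBuckets(0.001, 2, 20),
	})
	prometheus.MustRegister(networkProgrammingLatency)

	// What syncProxyRules does for every trigger time it received from
	// UpdateEndpointsMap: observe the seconds elapsed since the trigger.
	triggerTime := time.Now().Add(-150 * time.Millisecond)
	networkProgrammingLatency.Observe(time.Since(triggerTime).Seconds())

	// Read the sample count and sum back to confirm the observation landed.
	m := &dto.Metric{}
	if err := networkProgrammingLatency.Write(m); err != nil {
		panic(err)
	}
	fmt.Printf("count=%d sum=%.3fs\n", m.GetHistogram().GetSampleCount(), m.GetHistogram().GetSampleSum())
}

Once registered, the histogram is exposed on kube-proxy's /metrics endpoint as the usual _bucket, _sum, and _count series, which is what SLI dashboards would aggregate.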