From b485f7b5b425ba6cf18b19d70c29c8f4a17f84fa Mon Sep 17 00:00:00 2001 From: Zihong Zheng Date: Fri, 16 Feb 2018 19:09:33 -0800 Subject: [PATCH] [kube-proxy] Move Service/EndpointInfo common codes to change tracker --- pkg/proxy/endpoints.go | 79 ++++++- pkg/proxy/endpoints_test.go | 334 +++++++++++++---------------- pkg/proxy/iptables/proxier.go | 205 +++++------------- pkg/proxy/iptables/proxier_test.go | 173 +++++++-------- pkg/proxy/ipvs/proxier.go | 256 ++++++---------------- pkg/proxy/ipvs/proxier_test.go | 236 ++++++++++---------- pkg/proxy/service.go | 129 +++++++++-- pkg/proxy/service_test.go | 112 +++------- pkg/proxy/types.go | 21 +- 9 files changed, 701 insertions(+), 844 deletions(-) diff --git a/pkg/proxy/endpoints.go b/pkg/proxy/endpoints.go index 62f3aabbbd..8f1ae4260f 100644 --- a/pkg/proxy/endpoints.go +++ b/pkg/proxy/endpoints.go @@ -17,16 +17,63 @@ limitations under the License. package proxy import ( + "net" "reflect" + "strconv" "sync" "github.com/golang/glog" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/client-go/tools/record" api "k8s.io/kubernetes/pkg/apis/core" + utilproxy "k8s.io/kubernetes/pkg/proxy/util" ) +// EndpointInfoCommon contains common endpoint information. +type EndpointInfoCommon struct { + Endpoint string // TODO: should be an endpointString type + // IsLocal indicates whether the endpoint is running in same host as kube-proxy. + IsLocal bool +} + +var _ Endpoint = &EndpointInfoCommon{} + +// String is part of proxy.Endpoint interface. +func (info *EndpointInfoCommon) String() string { + return info.Endpoint +} + +// IsLocal is part of proxy.Endpoint interface. +func (info *EndpointInfoCommon) GetIsLocal() bool { + return info.IsLocal +} + +// IP returns just the IP part of the endpoint, it's a part of proxy.Endpoint interface. +func (info *EndpointInfoCommon) IP() string { + return utilproxy.IPPart(info.Endpoint) +} + +// Port returns just the Port part of the endpoint. +func (info *EndpointInfoCommon) Port() (int, error) { + return utilproxy.PortPart(info.Endpoint) +} + +// Equal is part of proxy.Endpoint interface. +func (info *EndpointInfoCommon) Equal(other Endpoint) bool { + return info.String() == other.String() && info.GetIsLocal() == other.GetIsLocal() +} + +func newEndpointInfoCommon(IP string, port int, isLocal bool) *EndpointInfoCommon { + return &EndpointInfoCommon{ + Endpoint: net.JoinHostPort(IP, strconv.Itoa(port)), + IsLocal: isLocal, + } +} + +type customizeEndpointInfoFunc func(IP string, port int, isLocal bool, info *EndpointInfoCommon) Endpoint + // EndpointChangeTracker carries state about uncommitted changes to an arbitrary number of // Endpoints, keyed by their namespace and name. type EndpointChangeTracker struct { @@ -36,13 +83,21 @@ type EndpointChangeTracker struct { hostname string // items maps a service to is endpointsChange. items map[types.NamespacedName]*endpointsChange + // customizeEndpointInfo allows proxier to inject customized infomation when processing endpoint. + customizeEndpointInfo customizeEndpointInfoFunc + // isIPv6Mode indicates if change tracker is under IPv6/IPv4 mode. Nil means not applicable. + isIPv6Mode *bool + recorder record.EventRecorder } // NewEndpointChangeTracker initializes an EndpointsChangeMap -func NewEndpointChangeTracker(hostname string) *EndpointChangeTracker { +func NewEndpointChangeTracker(hostname string, customizeEndpointInfo customizeEndpointInfoFunc, isIPv6Mode *bool, recorder record.EventRecorder) *EndpointChangeTracker { return &EndpointChangeTracker{ hostname: hostname, items: make(map[types.NamespacedName]*endpointsChange), + customizeEndpointInfo: customizeEndpointInfo, + isIPv6Mode: isIPv6Mode, + recorder: recorder, } } @@ -54,7 +109,7 @@ func NewEndpointChangeTracker(hostname string) *EndpointChangeTracker { // - pass as the pair. // Delete item // - pass as the pair. -func (ect *EndpointChangeTracker) Update(previous, current *api.Endpoints, makeEndpoints func(IP string, port int, isLocal bool) Endpoint) bool { +func (ect *EndpointChangeTracker) Update(previous, current *api.Endpoints) bool { endpoints := current if endpoints == nil { endpoints = previous @@ -71,10 +126,10 @@ func (ect *EndpointChangeTracker) Update(previous, current *api.Endpoints, makeE change, exists := ect.items[namespacedName] if !exists { change = &endpointsChange{} - change.previous = endpointsToEndpointsMap(previous, ect.hostname, makeEndpoints) + change.previous = ect.endpointsToEndpointsMap(previous) ect.items[namespacedName] = change } - change.current = endpointsToEndpointsMap(current, ect.hostname, makeEndpoints) + change.current = ect.endpointsToEndpointsMap(current) // if change.previous equal to change.current, it means no change if reflect.DeepEqual(change.previous, change.current) { delete(ect.items, namespacedName) @@ -118,14 +173,14 @@ func UpdateEndpointsMap(endpointsMap EndpointsMap, changes *EndpointChangeTracke return result } -// EndpointsMap maps a service to one of its endpoint. +// EndpointsMap maps a service to one of its Endpoint. type EndpointsMap map[ServicePortName][]Endpoint // endpointsToEndpointsMap translates single Endpoints object to EndpointsMap. // This function is used for incremental updated of endpointsMap. // // NOTE: endpoints object should NOT be modified. -func endpointsToEndpointsMap(endpoints *api.Endpoints, hostname string, makeEndpoints func(IP string, port int, isLocal bool) Endpoint) EndpointsMap { +func (ect *EndpointChangeTracker) endpointsToEndpointsMap(endpoints *api.Endpoints) EndpointsMap { if endpoints == nil { return nil } @@ -151,9 +206,13 @@ func endpointsToEndpointsMap(endpoints *api.Endpoints, hostname string, makeEndp glog.Warningf("ignoring invalid endpoint port %s with empty host", port.Name) continue } - isLocal := addr.NodeName != nil && *addr.NodeName == hostname - epInfo := makeEndpoints(addr.IP, int(port.Port), isLocal) - endpointsMap[svcPortName] = append(endpointsMap[svcPortName], epInfo) + isLocal := addr.NodeName != nil && *addr.NodeName == ect.hostname + epInfoCommon := newEndpointInfoCommon(addr.IP, int(port.Port), isLocal) + if ect.customizeEndpointInfo != nil { + endpointsMap[svcPortName] = append(endpointsMap[svcPortName], ect.customizeEndpointInfo(addr.IP, int(port.Port), isLocal, epInfoCommon)) + } else { + endpointsMap[svcPortName] = append(endpointsMap[svcPortName], epInfoCommon) + } } if glog.V(3) { newEPList := []string{} @@ -203,7 +262,7 @@ func GetLocalEndpointIPs(endpointsMap EndpointsMap) map[types.NamespacedName]set localIPs := make(map[types.NamespacedName]sets.String) for svcPortName, epList := range endpointsMap { for _, ep := range epList { - if ep.IsLocal() { + if ep.GetIsLocal() { nsn := svcPortName.NamespacedName if localIPs[nsn] == nil { localIPs[nsn] = sets.NewString() diff --git a/pkg/proxy/endpoints_test.go b/pkg/proxy/endpoints_test.go index 76c68e4791..664a804c64 100644 --- a/pkg/proxy/endpoints_test.go +++ b/pkg/proxy/endpoints_test.go @@ -17,9 +17,7 @@ limitations under the License. package proxy import ( - "net" "reflect" - "strconv" "testing" "github.com/davecgh/go-spew/spew" @@ -30,48 +28,16 @@ import ( api "k8s.io/kubernetes/pkg/apis/core" ) -type fakeEndpointsInfo struct { - endpoint string - isLocal bool -} - -func newFakeEndpointsInfo(IP string, port int, isLocal bool) Endpoint { - return &fakeEndpointsInfo{ - endpoint: net.JoinHostPort(IP, strconv.Itoa(port)), - isLocal: isLocal, - } -} - -func (f *fakeEndpointsInfo) String() string { - return f.endpoint -} - -func (f *fakeEndpointsInfo) IsLocal() bool { - return f.isLocal -} - -func (f *fakeEndpointsInfo) IP() string { - // Must be IP:port - host, _, _ := net.SplitHostPort(f.endpoint) - return host -} - -func (f *fakeEndpointsInfo) Equal(other Endpoint) bool { - return f.String() == other.String() && - f.IsLocal() == other.IsLocal() && - f.IP() == other.IP() -} - func (proxier *FakeProxier) addEndpoints(endpoints *api.Endpoints) { - proxier.endpointsChanges.Update(nil, endpoints, newFakeEndpointsInfo) + proxier.endpointsChanges.Update(nil, endpoints) } func (proxier *FakeProxier) updateEndpoints(oldEndpoints, endpoints *api.Endpoints) { - proxier.endpointsChanges.Update(oldEndpoints, endpoints, newFakeEndpointsInfo) + proxier.endpointsChanges.Update(oldEndpoints, endpoints) } func (proxier *FakeProxier) deleteEndpoints(endpoints *api.Endpoints) { - proxier.endpointsChanges.Update(endpoints, nil, newFakeEndpointsInfo) + proxier.endpointsChanges.Update(endpoints, nil) } func TestGetLocalEndpointIPs(t *testing.T) { @@ -86,7 +52,7 @@ func TestGetLocalEndpointIPs(t *testing.T) { // Case[1]: unnamed port endpointsMap: EndpointsMap{ makeServicePortName("ns1", "ep1", ""): []Endpoint{ - &fakeEndpointsInfo{endpoint: "1.1.1.1:11", isLocal: false}, + &EndpointInfoCommon{Endpoint: "1.1.1.1:11", IsLocal: false}, }, }, expected: map[types.NamespacedName]sets.String{}, @@ -94,7 +60,7 @@ func TestGetLocalEndpointIPs(t *testing.T) { // Case[2]: unnamed port local endpointsMap: EndpointsMap{ makeServicePortName("ns1", "ep1", ""): []Endpoint{ - &fakeEndpointsInfo{endpoint: "1.1.1.1:11", isLocal: true}, + &EndpointInfoCommon{Endpoint: "1.1.1.1:11", IsLocal: true}, }, }, expected: map[types.NamespacedName]sets.String{ @@ -104,12 +70,12 @@ func TestGetLocalEndpointIPs(t *testing.T) { // Case[3]: named local and non-local ports for the same IP. endpointsMap: EndpointsMap{ makeServicePortName("ns1", "ep1", "p11"): []Endpoint{ - &fakeEndpointsInfo{endpoint: "1.1.1.1:11", isLocal: false}, - &fakeEndpointsInfo{endpoint: "1.1.1.2:11", isLocal: true}, + &EndpointInfoCommon{Endpoint: "1.1.1.1:11", IsLocal: false}, + &EndpointInfoCommon{Endpoint: "1.1.1.2:11", IsLocal: true}, }, makeServicePortName("ns1", "ep1", "p12"): []Endpoint{ - &fakeEndpointsInfo{endpoint: "1.1.1.1:12", isLocal: false}, - &fakeEndpointsInfo{endpoint: "1.1.1.2:12", isLocal: true}, + &EndpointInfoCommon{Endpoint: "1.1.1.1:12", IsLocal: false}, + &EndpointInfoCommon{Endpoint: "1.1.1.2:12", IsLocal: true}, }, }, expected: map[types.NamespacedName]sets.String{ @@ -119,21 +85,21 @@ func TestGetLocalEndpointIPs(t *testing.T) { // Case[4]: named local and non-local ports for different IPs. endpointsMap: EndpointsMap{ makeServicePortName("ns1", "ep1", "p11"): []Endpoint{ - &fakeEndpointsInfo{endpoint: "1.1.1.1:11", isLocal: false}, + &EndpointInfoCommon{Endpoint: "1.1.1.1:11", IsLocal: false}, }, makeServicePortName("ns2", "ep2", "p22"): []Endpoint{ - &fakeEndpointsInfo{endpoint: "2.2.2.2:22", isLocal: true}, - &fakeEndpointsInfo{endpoint: "2.2.2.22:22", isLocal: true}, + &EndpointInfoCommon{Endpoint: "2.2.2.2:22", IsLocal: true}, + &EndpointInfoCommon{Endpoint: "2.2.2.22:22", IsLocal: true}, }, makeServicePortName("ns2", "ep2", "p23"): []Endpoint{ - &fakeEndpointsInfo{endpoint: "2.2.2.3:23", isLocal: true}, + &EndpointInfoCommon{Endpoint: "2.2.2.3:23", IsLocal: true}, }, makeServicePortName("ns4", "ep4", "p44"): []Endpoint{ - &fakeEndpointsInfo{endpoint: "4.4.4.4:44", isLocal: true}, - &fakeEndpointsInfo{endpoint: "4.4.4.5:44", isLocal: false}, + &EndpointInfoCommon{Endpoint: "4.4.4.4:44", IsLocal: true}, + &EndpointInfoCommon{Endpoint: "4.4.4.5:44", IsLocal: false}, }, makeServicePortName("ns4", "ep4", "p45"): []Endpoint{ - &fakeEndpointsInfo{endpoint: "4.4.4.6:45", isLocal: true}, + &EndpointInfoCommon{Endpoint: "4.4.4.6:45", IsLocal: true}, }, }, expected: map[types.NamespacedName]sets.String{ @@ -164,14 +130,16 @@ func makeTestEndpoints(namespace, name string, eptFunc func(*api.Endpoints)) *ap } // This is a coarse test, but it offers some modicum of confidence as the code is evolved. -func Test_endpointsToEndpointsMap(t *testing.T) { +func TestEndpointsToEndpointsMap(t *testing.T) { + epTracker := NewEndpointChangeTracker("test-hostname", nil, nil, nil) + testCases := []struct { newEndpoints *api.Endpoints - expected map[ServicePortName][]*fakeEndpointsInfo + expected map[ServicePortName][]*EndpointInfoCommon }{{ // Case[0]: nothing newEndpoints: makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) {}), - expected: map[ServicePortName][]*fakeEndpointsInfo{}, + expected: map[ServicePortName][]*EndpointInfoCommon{}, }, { // Case[1]: no changes, unnamed port newEndpoints: makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) { @@ -187,9 +155,9 @@ func Test_endpointsToEndpointsMap(t *testing.T) { }, } }), - expected: map[ServicePortName][]*fakeEndpointsInfo{ + expected: map[ServicePortName][]*EndpointInfoCommon{ makeServicePortName("ns1", "ep1", ""): { - {endpoint: "1.1.1.1:11", isLocal: false}, + {Endpoint: "1.1.1.1:11", IsLocal: false}, }, }, }, { @@ -207,9 +175,9 @@ func Test_endpointsToEndpointsMap(t *testing.T) { }, } }), - expected: map[ServicePortName][]*fakeEndpointsInfo{ + expected: map[ServicePortName][]*EndpointInfoCommon{ makeServicePortName("ns1", "ep1", "port"): { - {endpoint: "1.1.1.1:11", isLocal: false}, + {Endpoint: "1.1.1.1:11", IsLocal: false}, }, }, }, { @@ -226,15 +194,15 @@ func Test_endpointsToEndpointsMap(t *testing.T) { }, } }), - expected: map[ServicePortName][]*fakeEndpointsInfo{ + expected: map[ServicePortName][]*EndpointInfoCommon{ makeServicePortName("ns1", "ep1", ""): { - {endpoint: "1.1.1.1:11", isLocal: false}, + {Endpoint: "1.1.1.1:11", IsLocal: false}, }, }, }, { // Case[4]: remove port newEndpoints: makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) {}), - expected: map[ServicePortName][]*fakeEndpointsInfo{}, + expected: map[ServicePortName][]*EndpointInfoCommon{}, }, { // Case[5]: new IP and port newEndpoints: makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) { @@ -255,14 +223,14 @@ func Test_endpointsToEndpointsMap(t *testing.T) { }, } }), - expected: map[ServicePortName][]*fakeEndpointsInfo{ + expected: map[ServicePortName][]*EndpointInfoCommon{ makeServicePortName("ns1", "ep1", "p1"): { - {endpoint: "1.1.1.1:11", isLocal: false}, - {endpoint: "2.2.2.2:11", isLocal: false}, + {Endpoint: "1.1.1.1:11", IsLocal: false}, + {Endpoint: "2.2.2.2:11", IsLocal: false}, }, makeServicePortName("ns1", "ep1", "p2"): { - {endpoint: "1.1.1.1:22", isLocal: false}, - {endpoint: "2.2.2.2:22", isLocal: false}, + {Endpoint: "1.1.1.1:22", IsLocal: false}, + {Endpoint: "2.2.2.2:22", IsLocal: false}, }, }, }, { @@ -280,9 +248,9 @@ func Test_endpointsToEndpointsMap(t *testing.T) { }, } }), - expected: map[ServicePortName][]*fakeEndpointsInfo{ + expected: map[ServicePortName][]*EndpointInfoCommon{ makeServicePortName("ns1", "ep1", "p1"): { - {endpoint: "1.1.1.1:11", isLocal: false}, + {Endpoint: "1.1.1.1:11", IsLocal: false}, }, }, }, { @@ -300,9 +268,9 @@ func Test_endpointsToEndpointsMap(t *testing.T) { }, } }), - expected: map[ServicePortName][]*fakeEndpointsInfo{ + expected: map[ServicePortName][]*EndpointInfoCommon{ makeServicePortName("ns1", "ep1", "p2"): { - {endpoint: "1.1.1.1:11", isLocal: false}, + {Endpoint: "1.1.1.1:11", IsLocal: false}, }, }, }, { @@ -320,16 +288,16 @@ func Test_endpointsToEndpointsMap(t *testing.T) { }, } }), - expected: map[ServicePortName][]*fakeEndpointsInfo{ + expected: map[ServicePortName][]*EndpointInfoCommon{ makeServicePortName("ns1", "ep1", "p1"): { - {endpoint: "1.1.1.1:22", isLocal: false}, + {Endpoint: "1.1.1.1:22", IsLocal: false}, }, }, }} for tci, tc := range testCases { // outputs - newEndpoints := endpointsToEndpointsMap(tc.newEndpoints, "host", newFakeEndpointsInfo) + newEndpoints := epTracker.endpointsToEndpointsMap(tc.newEndpoints) if len(newEndpoints) != len(tc.expected) { t.Errorf("[%d] expected %d new, got %d: %v", tci, len(tc.expected), len(newEndpoints), spew.Sdump(newEndpoints)) @@ -339,7 +307,7 @@ func Test_endpointsToEndpointsMap(t *testing.T) { t.Errorf("[%d] expected %d endpoints for %v, got %d", tci, len(tc.expected[x]), x, len(newEndpoints[x])) } else { for i := range newEndpoints[x] { - ep := newEndpoints[x][i].(*fakeEndpointsInfo) + ep := newEndpoints[x][i].(*EndpointInfoCommon) if *ep != *(tc.expected[x][i]) { t.Errorf("[%d] expected new[%v][%d] to be %v, got %v", tci, x, i, tc.expected[x][i], *ep) } @@ -661,15 +629,15 @@ func TestUpdateEndpointsMap(t *testing.T) { // or non-nil) and must be of equal length. previousEndpoints []*api.Endpoints currentEndpoints []*api.Endpoints - oldEndpoints map[ServicePortName][]*fakeEndpointsInfo - expectedResult map[ServicePortName][]*fakeEndpointsInfo + oldEndpoints map[ServicePortName][]*EndpointInfoCommon + expectedResult map[ServicePortName][]*EndpointInfoCommon expectedStaleEndpoints []ServiceEndpoint expectedStaleServiceNames map[ServicePortName]bool expectedHealthchecks map[types.NamespacedName]int }{{ // Case[0]: nothing - oldEndpoints: map[ServicePortName][]*fakeEndpointsInfo{}, - expectedResult: map[ServicePortName][]*fakeEndpointsInfo{}, + oldEndpoints: map[ServicePortName][]*EndpointInfoCommon{}, + expectedResult: map[ServicePortName][]*EndpointInfoCommon{}, expectedStaleEndpoints: []ServiceEndpoint{}, expectedStaleServiceNames: map[ServicePortName]bool{}, expectedHealthchecks: map[types.NamespacedName]int{}, @@ -681,14 +649,14 @@ func TestUpdateEndpointsMap(t *testing.T) { currentEndpoints: []*api.Endpoints{ makeTestEndpoints("ns1", "ep1", unnamedPort), }, - oldEndpoints: map[ServicePortName][]*fakeEndpointsInfo{ + oldEndpoints: map[ServicePortName][]*EndpointInfoCommon{ makeServicePortName("ns1", "ep1", ""): { - {endpoint: "1.1.1.1:11", isLocal: false}, + {Endpoint: "1.1.1.1:11", IsLocal: false}, }, }, - expectedResult: map[ServicePortName][]*fakeEndpointsInfo{ + expectedResult: map[ServicePortName][]*EndpointInfoCommon{ makeServicePortName("ns1", "ep1", ""): { - {endpoint: "1.1.1.1:11", isLocal: false}, + {Endpoint: "1.1.1.1:11", IsLocal: false}, }, }, expectedStaleEndpoints: []ServiceEndpoint{}, @@ -702,14 +670,14 @@ func TestUpdateEndpointsMap(t *testing.T) { currentEndpoints: []*api.Endpoints{ makeTestEndpoints("ns1", "ep1", namedPortLocal), }, - oldEndpoints: map[ServicePortName][]*fakeEndpointsInfo{ + oldEndpoints: map[ServicePortName][]*EndpointInfoCommon{ makeServicePortName("ns1", "ep1", "p11"): { - {endpoint: "1.1.1.1:11", isLocal: true}, + {Endpoint: "1.1.1.1:11", IsLocal: true}, }, }, - expectedResult: map[ServicePortName][]*fakeEndpointsInfo{ + expectedResult: map[ServicePortName][]*EndpointInfoCommon{ makeServicePortName("ns1", "ep1", "p11"): { - {endpoint: "1.1.1.1:11", isLocal: true}, + {Endpoint: "1.1.1.1:11", IsLocal: true}, }, }, expectedStaleEndpoints: []ServiceEndpoint{}, @@ -725,20 +693,20 @@ func TestUpdateEndpointsMap(t *testing.T) { currentEndpoints: []*api.Endpoints{ makeTestEndpoints("ns1", "ep1", multipleSubsets), }, - oldEndpoints: map[ServicePortName][]*fakeEndpointsInfo{ + oldEndpoints: map[ServicePortName][]*EndpointInfoCommon{ makeServicePortName("ns1", "ep1", "p11"): { - {endpoint: "1.1.1.1:11", isLocal: false}, + {Endpoint: "1.1.1.1:11", IsLocal: false}, }, makeServicePortName("ns1", "ep1", "p12"): { - {endpoint: "1.1.1.2:12", isLocal: false}, + {Endpoint: "1.1.1.2:12", IsLocal: false}, }, }, - expectedResult: map[ServicePortName][]*fakeEndpointsInfo{ + expectedResult: map[ServicePortName][]*EndpointInfoCommon{ makeServicePortName("ns1", "ep1", "p11"): { - {endpoint: "1.1.1.1:11", isLocal: false}, + {Endpoint: "1.1.1.1:11", IsLocal: false}, }, makeServicePortName("ns1", "ep1", "p12"): { - {endpoint: "1.1.1.2:12", isLocal: false}, + {Endpoint: "1.1.1.2:12", IsLocal: false}, }, }, expectedStaleEndpoints: []ServiceEndpoint{}, @@ -752,26 +720,26 @@ func TestUpdateEndpointsMap(t *testing.T) { currentEndpoints: []*api.Endpoints{ makeTestEndpoints("ns1", "ep1", multipleSubsetsMultiplePortsLocal), }, - oldEndpoints: map[ServicePortName][]*fakeEndpointsInfo{ + oldEndpoints: map[ServicePortName][]*EndpointInfoCommon{ makeServicePortName("ns1", "ep1", "p11"): { - {endpoint: "1.1.1.1:11", isLocal: true}, + {Endpoint: "1.1.1.1:11", IsLocal: true}, }, makeServicePortName("ns1", "ep1", "p12"): { - {endpoint: "1.1.1.1:12", isLocal: true}, + {Endpoint: "1.1.1.1:12", IsLocal: true}, }, makeServicePortName("ns1", "ep1", "p13"): { - {endpoint: "1.1.1.3:13", isLocal: false}, + {Endpoint: "1.1.1.3:13", IsLocal: false}, }, }, - expectedResult: map[ServicePortName][]*fakeEndpointsInfo{ + expectedResult: map[ServicePortName][]*EndpointInfoCommon{ makeServicePortName("ns1", "ep1", "p11"): { - {endpoint: "1.1.1.1:11", isLocal: true}, + {Endpoint: "1.1.1.1:11", IsLocal: true}, }, makeServicePortName("ns1", "ep1", "p12"): { - {endpoint: "1.1.1.1:12", isLocal: true}, + {Endpoint: "1.1.1.1:12", IsLocal: true}, }, makeServicePortName("ns1", "ep1", "p13"): { - {endpoint: "1.1.1.3:13", isLocal: false}, + {Endpoint: "1.1.1.3:13", IsLocal: false}, }, }, expectedStaleEndpoints: []ServiceEndpoint{}, @@ -789,56 +757,56 @@ func TestUpdateEndpointsMap(t *testing.T) { makeTestEndpoints("ns1", "ep1", multipleSubsetsIPsPorts1), makeTestEndpoints("ns2", "ep2", multipleSubsetsIPsPorts2), }, - oldEndpoints: map[ServicePortName][]*fakeEndpointsInfo{ + oldEndpoints: map[ServicePortName][]*EndpointInfoCommon{ makeServicePortName("ns1", "ep1", "p11"): { - {endpoint: "1.1.1.1:11", isLocal: false}, - {endpoint: "1.1.1.2:11", isLocal: true}, + {Endpoint: "1.1.1.1:11", IsLocal: false}, + {Endpoint: "1.1.1.2:11", IsLocal: true}, }, makeServicePortName("ns1", "ep1", "p12"): { - {endpoint: "1.1.1.1:12", isLocal: false}, - {endpoint: "1.1.1.2:12", isLocal: true}, + {Endpoint: "1.1.1.1:12", IsLocal: false}, + {Endpoint: "1.1.1.2:12", IsLocal: true}, }, makeServicePortName("ns1", "ep1", "p13"): { - {endpoint: "1.1.1.3:13", isLocal: false}, - {endpoint: "1.1.1.4:13", isLocal: true}, + {Endpoint: "1.1.1.3:13", IsLocal: false}, + {Endpoint: "1.1.1.4:13", IsLocal: true}, }, makeServicePortName("ns1", "ep1", "p14"): { - {endpoint: "1.1.1.3:14", isLocal: false}, - {endpoint: "1.1.1.4:14", isLocal: true}, + {Endpoint: "1.1.1.3:14", IsLocal: false}, + {Endpoint: "1.1.1.4:14", IsLocal: true}, }, makeServicePortName("ns2", "ep2", "p21"): { - {endpoint: "2.2.2.1:21", isLocal: false}, - {endpoint: "2.2.2.2:21", isLocal: true}, + {Endpoint: "2.2.2.1:21", IsLocal: false}, + {Endpoint: "2.2.2.2:21", IsLocal: true}, }, makeServicePortName("ns2", "ep2", "p22"): { - {endpoint: "2.2.2.1:22", isLocal: false}, - {endpoint: "2.2.2.2:22", isLocal: true}, + {Endpoint: "2.2.2.1:22", IsLocal: false}, + {Endpoint: "2.2.2.2:22", IsLocal: true}, }, }, - expectedResult: map[ServicePortName][]*fakeEndpointsInfo{ + expectedResult: map[ServicePortName][]*EndpointInfoCommon{ makeServicePortName("ns1", "ep1", "p11"): { - {endpoint: "1.1.1.1:11", isLocal: false}, - {endpoint: "1.1.1.2:11", isLocal: true}, + {Endpoint: "1.1.1.1:11", IsLocal: false}, + {Endpoint: "1.1.1.2:11", IsLocal: true}, }, makeServicePortName("ns1", "ep1", "p12"): { - {endpoint: "1.1.1.1:12", isLocal: false}, - {endpoint: "1.1.1.2:12", isLocal: true}, + {Endpoint: "1.1.1.1:12", IsLocal: false}, + {Endpoint: "1.1.1.2:12", IsLocal: true}, }, makeServicePortName("ns1", "ep1", "p13"): { - {endpoint: "1.1.1.3:13", isLocal: false}, - {endpoint: "1.1.1.4:13", isLocal: true}, + {Endpoint: "1.1.1.3:13", IsLocal: false}, + {Endpoint: "1.1.1.4:13", IsLocal: true}, }, makeServicePortName("ns1", "ep1", "p14"): { - {endpoint: "1.1.1.3:14", isLocal: false}, - {endpoint: "1.1.1.4:14", isLocal: true}, + {Endpoint: "1.1.1.3:14", IsLocal: false}, + {Endpoint: "1.1.1.4:14", IsLocal: true}, }, makeServicePortName("ns2", "ep2", "p21"): { - {endpoint: "2.2.2.1:21", isLocal: false}, - {endpoint: "2.2.2.2:21", isLocal: true}, + {Endpoint: "2.2.2.1:21", IsLocal: false}, + {Endpoint: "2.2.2.2:21", IsLocal: true}, }, makeServicePortName("ns2", "ep2", "p22"): { - {endpoint: "2.2.2.1:22", isLocal: false}, - {endpoint: "2.2.2.2:22", isLocal: true}, + {Endpoint: "2.2.2.1:22", IsLocal: false}, + {Endpoint: "2.2.2.2:22", IsLocal: true}, }, }, expectedStaleEndpoints: []ServiceEndpoint{}, @@ -855,10 +823,10 @@ func TestUpdateEndpointsMap(t *testing.T) { currentEndpoints: []*api.Endpoints{ makeTestEndpoints("ns1", "ep1", unnamedPortLocal), }, - oldEndpoints: map[ServicePortName][]*fakeEndpointsInfo{}, - expectedResult: map[ServicePortName][]*fakeEndpointsInfo{ + oldEndpoints: map[ServicePortName][]*EndpointInfoCommon{}, + expectedResult: map[ServicePortName][]*EndpointInfoCommon{ makeServicePortName("ns1", "ep1", ""): { - {endpoint: "1.1.1.1:11", isLocal: true}, + {Endpoint: "1.1.1.1:11", IsLocal: true}, }, }, expectedStaleEndpoints: []ServiceEndpoint{}, @@ -876,12 +844,12 @@ func TestUpdateEndpointsMap(t *testing.T) { currentEndpoints: []*api.Endpoints{ nil, }, - oldEndpoints: map[ServicePortName][]*fakeEndpointsInfo{ + oldEndpoints: map[ServicePortName][]*EndpointInfoCommon{ makeServicePortName("ns1", "ep1", ""): { - {endpoint: "1.1.1.1:11", isLocal: true}, + {Endpoint: "1.1.1.1:11", IsLocal: true}, }, }, - expectedResult: map[ServicePortName][]*fakeEndpointsInfo{}, + expectedResult: map[ServicePortName][]*EndpointInfoCommon{}, expectedStaleEndpoints: []ServiceEndpoint{{ Endpoint: "1.1.1.1:11", ServicePortName: makeServicePortName("ns1", "ep1", ""), @@ -896,19 +864,19 @@ func TestUpdateEndpointsMap(t *testing.T) { currentEndpoints: []*api.Endpoints{ makeTestEndpoints("ns1", "ep1", namedPortsLocalNoLocal), }, - oldEndpoints: map[ServicePortName][]*fakeEndpointsInfo{ + oldEndpoints: map[ServicePortName][]*EndpointInfoCommon{ makeServicePortName("ns1", "ep1", "p11"): { - {endpoint: "1.1.1.1:11", isLocal: false}, + {Endpoint: "1.1.1.1:11", IsLocal: false}, }, }, - expectedResult: map[ServicePortName][]*fakeEndpointsInfo{ + expectedResult: map[ServicePortName][]*EndpointInfoCommon{ makeServicePortName("ns1", "ep1", "p11"): { - {endpoint: "1.1.1.1:11", isLocal: false}, - {endpoint: "1.1.1.2:11", isLocal: true}, + {Endpoint: "1.1.1.1:11", IsLocal: false}, + {Endpoint: "1.1.1.2:11", IsLocal: true}, }, makeServicePortName("ns1", "ep1", "p12"): { - {endpoint: "1.1.1.1:12", isLocal: false}, - {endpoint: "1.1.1.2:12", isLocal: true}, + {Endpoint: "1.1.1.1:12", IsLocal: false}, + {Endpoint: "1.1.1.2:12", IsLocal: true}, }, }, expectedStaleEndpoints: []ServiceEndpoint{}, @@ -926,19 +894,19 @@ func TestUpdateEndpointsMap(t *testing.T) { currentEndpoints: []*api.Endpoints{ makeTestEndpoints("ns1", "ep1", namedPort), }, - oldEndpoints: map[ServicePortName][]*fakeEndpointsInfo{ + oldEndpoints: map[ServicePortName][]*EndpointInfoCommon{ makeServicePortName("ns1", "ep1", "p11"): { - {endpoint: "1.1.1.1:11", isLocal: false}, - {endpoint: "1.1.1.2:11", isLocal: true}, + {Endpoint: "1.1.1.1:11", IsLocal: false}, + {Endpoint: "1.1.1.2:11", IsLocal: true}, }, makeServicePortName("ns1", "ep1", "p12"): { - {endpoint: "1.1.1.1:12", isLocal: false}, - {endpoint: "1.1.1.2:12", isLocal: true}, + {Endpoint: "1.1.1.1:12", IsLocal: false}, + {Endpoint: "1.1.1.2:12", IsLocal: true}, }, }, - expectedResult: map[ServicePortName][]*fakeEndpointsInfo{ + expectedResult: map[ServicePortName][]*EndpointInfoCommon{ makeServicePortName("ns1", "ep1", "p11"): { - {endpoint: "1.1.1.1:11", isLocal: false}, + {Endpoint: "1.1.1.1:11", IsLocal: false}, }, }, expectedStaleEndpoints: []ServiceEndpoint{{ @@ -961,17 +929,17 @@ func TestUpdateEndpointsMap(t *testing.T) { currentEndpoints: []*api.Endpoints{ makeTestEndpoints("ns1", "ep1", multipleSubsetsWithLocal), }, - oldEndpoints: map[ServicePortName][]*fakeEndpointsInfo{ + oldEndpoints: map[ServicePortName][]*EndpointInfoCommon{ makeServicePortName("ns1", "ep1", "p11"): { - {endpoint: "1.1.1.1:11", isLocal: false}, + {Endpoint: "1.1.1.1:11", IsLocal: false}, }, }, - expectedResult: map[ServicePortName][]*fakeEndpointsInfo{ + expectedResult: map[ServicePortName][]*EndpointInfoCommon{ makeServicePortName("ns1", "ep1", "p11"): { - {endpoint: "1.1.1.1:11", isLocal: false}, + {Endpoint: "1.1.1.1:11", IsLocal: false}, }, makeServicePortName("ns1", "ep1", "p12"): { - {endpoint: "1.1.1.2:12", isLocal: true}, + {Endpoint: "1.1.1.2:12", IsLocal: true}, }, }, expectedStaleEndpoints: []ServiceEndpoint{}, @@ -989,17 +957,17 @@ func TestUpdateEndpointsMap(t *testing.T) { currentEndpoints: []*api.Endpoints{ makeTestEndpoints("ns1", "ep1", namedPort), }, - oldEndpoints: map[ServicePortName][]*fakeEndpointsInfo{ + oldEndpoints: map[ServicePortName][]*EndpointInfoCommon{ makeServicePortName("ns1", "ep1", "p11"): { - {endpoint: "1.1.1.1:11", isLocal: false}, + {Endpoint: "1.1.1.1:11", IsLocal: false}, }, makeServicePortName("ns1", "ep1", "p12"): { - {endpoint: "1.1.1.2:12", isLocal: false}, + {Endpoint: "1.1.1.2:12", IsLocal: false}, }, }, - expectedResult: map[ServicePortName][]*fakeEndpointsInfo{ + expectedResult: map[ServicePortName][]*EndpointInfoCommon{ makeServicePortName("ns1", "ep1", "p11"): { - {endpoint: "1.1.1.1:11", isLocal: false}, + {Endpoint: "1.1.1.1:11", IsLocal: false}, }, }, expectedStaleEndpoints: []ServiceEndpoint{{ @@ -1016,14 +984,14 @@ func TestUpdateEndpointsMap(t *testing.T) { currentEndpoints: []*api.Endpoints{ makeTestEndpoints("ns1", "ep1", namedPortRenamed), }, - oldEndpoints: map[ServicePortName][]*fakeEndpointsInfo{ + oldEndpoints: map[ServicePortName][]*EndpointInfoCommon{ makeServicePortName("ns1", "ep1", "p11"): { - {endpoint: "1.1.1.1:11", isLocal: false}, + {Endpoint: "1.1.1.1:11", IsLocal: false}, }, }, - expectedResult: map[ServicePortName][]*fakeEndpointsInfo{ + expectedResult: map[ServicePortName][]*EndpointInfoCommon{ makeServicePortName("ns1", "ep1", "p11-2"): { - {endpoint: "1.1.1.1:11", isLocal: false}, + {Endpoint: "1.1.1.1:11", IsLocal: false}, }, }, expectedStaleEndpoints: []ServiceEndpoint{{ @@ -1042,14 +1010,14 @@ func TestUpdateEndpointsMap(t *testing.T) { currentEndpoints: []*api.Endpoints{ makeTestEndpoints("ns1", "ep1", namedPortRenumbered), }, - oldEndpoints: map[ServicePortName][]*fakeEndpointsInfo{ + oldEndpoints: map[ServicePortName][]*EndpointInfoCommon{ makeServicePortName("ns1", "ep1", "p11"): { - {endpoint: "1.1.1.1:11", isLocal: false}, + {Endpoint: "1.1.1.1:11", IsLocal: false}, }, }, - expectedResult: map[ServicePortName][]*fakeEndpointsInfo{ + expectedResult: map[ServicePortName][]*EndpointInfoCommon{ makeServicePortName("ns1", "ep1", "p11"): { - {endpoint: "1.1.1.1:22", isLocal: false}, + {Endpoint: "1.1.1.1:22", IsLocal: false}, }, }, expectedStaleEndpoints: []ServiceEndpoint{{ @@ -1072,41 +1040,41 @@ func TestUpdateEndpointsMap(t *testing.T) { makeTestEndpoints("ns3", "ep3", complexAfter3), makeTestEndpoints("ns4", "ep4", complexAfter4), }, - oldEndpoints: map[ServicePortName][]*fakeEndpointsInfo{ + oldEndpoints: map[ServicePortName][]*EndpointInfoCommon{ makeServicePortName("ns1", "ep1", "p11"): { - {endpoint: "1.1.1.1:11", isLocal: false}, + {Endpoint: "1.1.1.1:11", IsLocal: false}, }, makeServicePortName("ns2", "ep2", "p22"): { - {endpoint: "2.2.2.2:22", isLocal: true}, - {endpoint: "2.2.2.22:22", isLocal: true}, + {Endpoint: "2.2.2.2:22", IsLocal: true}, + {Endpoint: "2.2.2.22:22", IsLocal: true}, }, makeServicePortName("ns2", "ep2", "p23"): { - {endpoint: "2.2.2.3:23", isLocal: true}, + {Endpoint: "2.2.2.3:23", IsLocal: true}, }, makeServicePortName("ns4", "ep4", "p44"): { - {endpoint: "4.4.4.4:44", isLocal: true}, - {endpoint: "4.4.4.5:44", isLocal: true}, + {Endpoint: "4.4.4.4:44", IsLocal: true}, + {Endpoint: "4.4.4.5:44", IsLocal: true}, }, makeServicePortName("ns4", "ep4", "p45"): { - {endpoint: "4.4.4.6:45", isLocal: true}, + {Endpoint: "4.4.4.6:45", IsLocal: true}, }, }, - expectedResult: map[ServicePortName][]*fakeEndpointsInfo{ + expectedResult: map[ServicePortName][]*EndpointInfoCommon{ makeServicePortName("ns1", "ep1", "p11"): { - {endpoint: "1.1.1.1:11", isLocal: false}, - {endpoint: "1.1.1.11:11", isLocal: false}, + {Endpoint: "1.1.1.1:11", IsLocal: false}, + {Endpoint: "1.1.1.11:11", IsLocal: false}, }, makeServicePortName("ns1", "ep1", "p12"): { - {endpoint: "1.1.1.2:12", isLocal: false}, + {Endpoint: "1.1.1.2:12", IsLocal: false}, }, makeServicePortName("ns1", "ep1", "p122"): { - {endpoint: "1.1.1.2:122", isLocal: false}, + {Endpoint: "1.1.1.2:122", IsLocal: false}, }, makeServicePortName("ns3", "ep3", "p33"): { - {endpoint: "3.3.3.3:33", isLocal: false}, + {Endpoint: "3.3.3.3:33", IsLocal: false}, }, makeServicePortName("ns4", "ep4", "p44"): { - {endpoint: "4.4.4.4:44", isLocal: true}, + {Endpoint: "4.4.4.4:44", IsLocal: true}, }, }, expectedStaleEndpoints: []ServiceEndpoint{{ @@ -1141,10 +1109,10 @@ func TestUpdateEndpointsMap(t *testing.T) { currentEndpoints: []*api.Endpoints{ makeTestEndpoints("ns1", "ep1", unnamedPort), }, - oldEndpoints: map[ServicePortName][]*fakeEndpointsInfo{}, - expectedResult: map[ServicePortName][]*fakeEndpointsInfo{ + oldEndpoints: map[ServicePortName][]*EndpointInfoCommon{}, + expectedResult: map[ServicePortName][]*EndpointInfoCommon{ makeServicePortName("ns1", "ep1", ""): { - {endpoint: "1.1.1.1:11", isLocal: false}, + {Endpoint: "1.1.1.1:11", IsLocal: false}, }, }, expectedStaleEndpoints: []ServiceEndpoint{}, @@ -1224,7 +1192,7 @@ func TestUpdateEndpointsMap(t *testing.T) { } } -func compareEndpointsMaps(t *testing.T, tci int, newMap EndpointsMap, expected map[ServicePortName][]*fakeEndpointsInfo) { +func compareEndpointsMaps(t *testing.T, tci int, newMap EndpointsMap, expected map[ServicePortName][]*EndpointInfoCommon) { if len(newMap) != len(expected) { t.Errorf("[%d] expected %d results, got %d: %v", tci, len(expected), len(newMap), newMap) } @@ -1233,7 +1201,7 @@ func compareEndpointsMaps(t *testing.T, tci int, newMap EndpointsMap, expected m t.Errorf("[%d] expected %d endpoints for %v, got %d", tci, len(expected[x]), x, len(newMap[x])) } else { for i := range expected[x] { - newEp, ok := newMap[x][i].(*fakeEndpointsInfo) + newEp, ok := newMap[x][i].(*EndpointInfoCommon) if !ok { t.Errorf("Failed to cast endpointsInfo") continue diff --git a/pkg/proxy/iptables/proxier.go b/pkg/proxy/iptables/proxier.go index 3ec67c6e50..80f7f54fde 100644 --- a/pkg/proxy/iptables/proxier.go +++ b/pkg/proxy/iptables/proxier.go @@ -38,9 +38,7 @@ import ( "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/tools/record" - apiservice "k8s.io/kubernetes/pkg/api/service" api "k8s.io/kubernetes/pkg/apis/core" - "k8s.io/kubernetes/pkg/apis/core/helper" "k8s.io/kubernetes/pkg/proxy" "k8s.io/kubernetes/pkg/proxy/healthcheck" "k8s.io/kubernetes/pkg/proxy/metrics" @@ -141,17 +139,7 @@ const sysctlBridgeCallIPTables = "net/bridge/bridge-nf-call-iptables" // internal struct for string service information type serviceInfo struct { - clusterIP net.IP - port int - protocol api.Protocol - nodePort int - loadBalancerStatus api.LoadBalancerStatus - sessionAffinityType api.ServiceAffinity - stickyMaxAgeSeconds int - externalIPs []string - loadBalancerSourceRanges []string - onlyNodeLocalEndpoints bool - healthCheckNodePort int + *proxy.ServiceInfoCommon // The following fields are computed and stored for performance reasons. serviceNameString string servicePortChainName utiliptables.Chain @@ -160,47 +148,13 @@ type serviceInfo struct { } // returns a new proxy.ServicePort which abstracts a serviceInfo -func newServiceInfo(port *api.ServicePort, service *api.Service) proxy.ServicePort { - onlyNodeLocalEndpoints := false - if apiservice.RequestsOnlyLocalTraffic(service) { - onlyNodeLocalEndpoints = true - } - var stickyMaxAgeSeconds int - if service.Spec.SessionAffinity == api.ServiceAffinityClientIP { - // Kube-apiserver side guarantees SessionAffinityConfig won't be nil when session affinity type is ClientIP - stickyMaxAgeSeconds = int(*service.Spec.SessionAffinityConfig.ClientIP.TimeoutSeconds) - } - info := &serviceInfo{ - clusterIP: net.ParseIP(service.Spec.ClusterIP), - port: int(port.Port), - protocol: port.Protocol, - nodePort: int(port.NodePort), - // Deep-copy in case the service instance changes - loadBalancerStatus: *helper.LoadBalancerStatusDeepCopy(&service.Status.LoadBalancer), - sessionAffinityType: service.Spec.SessionAffinity, - stickyMaxAgeSeconds: stickyMaxAgeSeconds, - externalIPs: make([]string, len(service.Spec.ExternalIPs)), - loadBalancerSourceRanges: make([]string, len(service.Spec.LoadBalancerSourceRanges)), - onlyNodeLocalEndpoints: onlyNodeLocalEndpoints, - } - - copy(info.loadBalancerSourceRanges, service.Spec.LoadBalancerSourceRanges) - copy(info.externalIPs, service.Spec.ExternalIPs) - - svcName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name} - svcPortName := proxy.ServicePortName{NamespacedName: svcName, Port: port.Name} - - if apiservice.NeedsHealthCheck(service) { - p := service.Spec.HealthCheckNodePort - if p == 0 { - glog.Errorf("Service %q has no healthcheck nodeport", svcName.String()) - } else { - info.healthCheckNodePort = int(p) - } - } +func customizeServiceInfo(port *api.ServicePort, service *api.Service, infoCommon *proxy.ServiceInfoCommon) proxy.ServicePort { + info := &serviceInfo{ServiceInfoCommon: infoCommon} // Store the following for performance reasons. - protocol := strings.ToLower(string(info.protocol)) + svcName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name} + svcPortName := proxy.ServicePortName{NamespacedName: svcName, Port: port.Name} + protocol := strings.ToLower(string(info.Protocol)) info.serviceNameString = svcPortName.String() info.servicePortChainName = servicePortChainName(info.serviceNameString, protocol) info.serviceFirewallChainName = serviceFirewallChainName(info.serviceNameString, protocol) @@ -209,37 +163,9 @@ func newServiceInfo(port *api.ServicePort, service *api.Service) proxy.ServicePo return info } -// ClusterIP is part of proxy.ServicePort interface. -func (info *serviceInfo) ClusterIP() string { - return info.clusterIP.String() -} - -// Port is part of proxy.ServicePort interface. -func (info *serviceInfo) Port() int { - return info.port -} - -// Protocol is part of proxy.ServicePort interface. -func (info *serviceInfo) Protocol() api.Protocol { - return info.protocol -} - -// String is part of proxy.ServicePort interface. -func (info *serviceInfo) String() string { - return fmt.Sprintf("%s:%d/%s", info.clusterIP, info.port, info.protocol) -} - -// HealthCheckNodePort is part of proxy.ServicePort interface. -func (info *serviceInfo) HealthCheckNodePort() int { - return info.healthCheckNodePort -} - -var _ proxy.ServicePort = &serviceInfo{} - // internal struct for endpoints information type endpointsInfo struct { - endpoint string // TODO: should be an endpointString type - isLocal bool + *proxy.EndpointInfoCommon // The following fields we lazily compute and store here for performance // reasons. If the protocol is the same as you expect it to be, then the // chainName can be reused, otherwise it should be recomputed. @@ -248,52 +174,32 @@ type endpointsInfo struct { } // returns a new proxy.Endpoint which abstracts a endpointsInfo -func newEndpointsInfo(IP string, port int, isLocal bool) proxy.Endpoint { - return &endpointsInfo{ - endpoint: net.JoinHostPort(IP, strconv.Itoa(port)), - isLocal: isLocal, - } +func customizeEndpointInfo(IP string, port int, isLocal bool, infoCommon *proxy.EndpointInfoCommon) proxy.Endpoint { + return &endpointsInfo{EndpointInfoCommon: infoCommon} } -// IsLocal is part of proxy.Endpoint interface. -func (e *endpointsInfo) IsLocal() bool { - return e.isLocal -} - -// IP is part of proxy.Endpoint interface. -func (e *endpointsInfo) IP() string { - return utilproxy.IPPart(e.endpoint) -} - -// Equal is part of proxy.Endpoint interface. +// Equal overrides the Equal() function imlemented by proxy.EndpointInfoCommon. func (e *endpointsInfo) Equal(other proxy.Endpoint) bool { o, ok := other.(*endpointsInfo) if !ok { glog.Errorf("Failed to cast endpointsInfo") return false } - return e.endpoint == o.endpoint && - e.isLocal == o.isLocal && + return e.Endpoint == o.Endpoint && + e.IsLocal == o.IsLocal && e.protocol == o.protocol && e.chainName == o.chainName } -// String is part of proxy.Endpoint interface. -func (e *endpointsInfo) String() string { - return e.endpoint -} - // Returns the endpoint chain name for a given endpointsInfo. func (e *endpointsInfo) endpointChain(svcNameString, protocol string) utiliptables.Chain { if e.protocol != protocol { e.protocol = protocol - e.chainName = servicePortEndpointChainName(svcNameString, protocol, e.endpoint) + e.chainName = servicePortEndpointChainName(svcNameString, protocol, e.Endpoint) } return e.chainName } -var _ proxy.Endpoint = &endpointsInfo{} - // Proxier is an iptables based proxy for connections between a localhost:lport // and services that provide the actual backends. type Proxier struct { @@ -406,12 +312,13 @@ func NewProxier(ipt utiliptables.Interface, healthChecker := healthcheck.NewServer(hostname, recorder, nil, nil) // use default implementations of deps + isIPv6 := ipt.IsIpv6() proxier := &Proxier{ portsMap: make(map[utilproxy.LocalPort]utilproxy.Closeable), serviceMap: make(proxy.ServiceMap), - serviceChanges: proxy.NewServiceChangeTracker(), + serviceChanges: proxy.NewServiceChangeTracker(customizeServiceInfo, &isIPv6, recorder), endpointsMap: make(proxy.EndpointsMap), - endpointsChanges: proxy.NewEndpointChangeTracker(hostname), + endpointsChanges: proxy.NewEndpointChangeTracker(hostname, customizeEndpointInfo, &isIPv6, recorder), iptables: ipt, masqueradeAll: masqueradeAll, masqueradeMark: masqueradeMark, @@ -592,19 +499,19 @@ func (proxier *Proxier) isInitialized() bool { } func (proxier *Proxier) OnServiceAdd(service *api.Service) { - if proxier.serviceChanges.Update(nil, service, newServiceInfo) && proxier.isInitialized() { + if proxier.serviceChanges.Update(nil, service) && proxier.isInitialized() { proxier.syncRunner.Run() } } func (proxier *Proxier) OnServiceUpdate(oldService, service *api.Service) { - if proxier.serviceChanges.Update(oldService, service, newServiceInfo) && proxier.isInitialized() { + if proxier.serviceChanges.Update(oldService, service) && proxier.isInitialized() { proxier.syncRunner.Run() } } func (proxier *Proxier) OnServiceDelete(service *api.Service) { - if proxier.serviceChanges.Update(service, nil, newServiceInfo) && proxier.isInitialized() { + if proxier.serviceChanges.Update(service, nil) && proxier.isInitialized() { proxier.syncRunner.Run() } } @@ -620,19 +527,19 @@ func (proxier *Proxier) OnServiceSynced() { } func (proxier *Proxier) OnEndpointsAdd(endpoints *api.Endpoints) { - if proxier.endpointsChanges.Update(nil, endpoints, newEndpointsInfo) && proxier.isInitialized() { + if proxier.endpointsChanges.Update(nil, endpoints) && proxier.isInitialized() { proxier.syncRunner.Run() } } func (proxier *Proxier) OnEndpointsUpdate(oldEndpoints, endpoints *api.Endpoints) { - if proxier.endpointsChanges.Update(oldEndpoints, endpoints, newEndpointsInfo) && proxier.isInitialized() { + if proxier.endpointsChanges.Update(oldEndpoints, endpoints) && proxier.isInitialized() { proxier.syncRunner.Run() } } func (proxier *Proxier) OnEndpointsDelete(endpoints *api.Endpoints) { - if proxier.endpointsChanges.Update(endpoints, nil, newEndpointsInfo) && proxier.isInitialized() { + if proxier.endpointsChanges.Update(endpoints, nil) && proxier.isInitialized() { proxier.syncRunner.Run() } } @@ -693,9 +600,9 @@ func servicePortEndpointChainName(servicePortName string, protocol string, endpo // TODO: move it to util func (proxier *Proxier) deleteEndpointConnections(connectionMap []proxy.ServiceEndpoint) { for _, epSvcPair := range connectionMap { - if svcInfo, ok := proxier.serviceMap[epSvcPair.ServicePortName]; ok && svcInfo.Protocol() == api.ProtocolUDP { + if svcInfo, ok := proxier.serviceMap[epSvcPair.ServicePortName]; ok && svcInfo.GetProtocol() == api.ProtocolUDP { endpointIP := utilproxy.IPPart(epSvcPair.Endpoint) - err := conntrack.ClearEntriesForNAT(proxier.exec, svcInfo.ClusterIP(), endpointIP, v1.ProtocolUDP) + err := conntrack.ClearEntriesForNAT(proxier.exec, svcInfo.GetClusterIP(), endpointIP, v1.ProtocolUDP) if err != nil { glog.Errorf("Failed to delete %s endpoint connections, error: %v", epSvcPair.ServicePortName.String(), err) } @@ -730,9 +637,9 @@ func (proxier *Proxier) syncProxyRules() { staleServices := serviceUpdateResult.UDPStaleClusterIP // merge stale services gathered from updateEndpointsMap for _, svcPortName := range endpointUpdateResult.StaleServiceNames { - if svcInfo, ok := proxier.serviceMap[svcPortName]; ok && svcInfo != nil && svcInfo.Protocol() == api.ProtocolUDP { - glog.V(2).Infof("Stale udp service %v -> %s", svcPortName, svcInfo.ClusterIP()) - staleServices.Insert(svcInfo.ClusterIP()) + if svcInfo, ok := proxier.serviceMap[svcPortName]; ok && svcInfo != nil && svcInfo.GetProtocol() == api.ProtocolUDP { + glog.V(2).Infof("Stale udp service %v -> %s", svcPortName, svcInfo.GetClusterIP()) + staleServices.Insert(svcInfo.GetClusterIP()) } } @@ -851,8 +758,8 @@ func (proxier *Proxier) syncProxyRules() { glog.Errorf("Failed to cast serviceInfo %q", svcName.String()) continue } - isIPv6 := conntrack.IsIPv6(svcInfo.clusterIP) - protocol := strings.ToLower(string(svcInfo.protocol)) + isIPv6 := conntrack.IsIPv6(svcInfo.ClusterIP) + protocol := strings.ToLower(string(svcInfo.Protocol)) svcNameString := svcInfo.serviceNameString hasEndpoints := len(proxier.endpointsMap[svcName]) > 0 @@ -868,7 +775,7 @@ func (proxier *Proxier) syncProxyRules() { } svcXlbChain := svcInfo.serviceLBChainName - if svcInfo.onlyNodeLocalEndpoints { + if svcInfo.OnlyNodeLocalEndpoints { // Only for services request OnlyLocal traffic // create the per-service LB chain, retaining counters if possible. if lbChain, ok := existingNATChains[svcXlbChain]; ok { @@ -885,8 +792,8 @@ func (proxier *Proxier) syncProxyRules() { "-A", string(kubeServicesChain), "-m", "comment", "--comment", fmt.Sprintf(`"%s cluster IP"`, svcNameString), "-m", protocol, "-p", protocol, - "-d", utilproxy.ToCIDR(svcInfo.clusterIP), - "--dport", strconv.Itoa(svcInfo.port), + "-d", utilproxy.ToCIDR(svcInfo.ClusterIP), + "--dport", strconv.Itoa(svcInfo.Port), ) if proxier.masqueradeAll { writeLine(proxier.natRules, append(args, "-j", string(KubeMarkMasqChain))...) @@ -904,14 +811,14 @@ func (proxier *Proxier) syncProxyRules() { "-A", string(kubeServicesChain), "-m", "comment", "--comment", fmt.Sprintf(`"%s has no endpoints"`, svcNameString), "-m", protocol, "-p", protocol, - "-d", utilproxy.ToCIDR(svcInfo.clusterIP), - "--dport", strconv.Itoa(svcInfo.port), + "-d", utilproxy.ToCIDR(svcInfo.ClusterIP), + "--dport", strconv.Itoa(svcInfo.Port), "-j", "REJECT", ) } // Capture externalIPs. - for _, externalIP := range svcInfo.externalIPs { + for _, externalIP := range svcInfo.ExternalIPs { // If the "external" IP happens to be an IP that is local to this // machine, hold the local port open so no other process can open it // (because the socket might open but it would never work). @@ -921,7 +828,7 @@ func (proxier *Proxier) syncProxyRules() { lp := utilproxy.LocalPort{ Description: "externalIP for " + svcNameString, IP: externalIP, - Port: svcInfo.Port(), + Port: svcInfo.Port, Protocol: protocol, } if proxier.portsMap[lp] != nil { @@ -952,7 +859,7 @@ func (proxier *Proxier) syncProxyRules() { "-m", "comment", "--comment", fmt.Sprintf(`"%s external IP"`, svcNameString), "-m", protocol, "-p", protocol, "-d", utilproxy.ToCIDR(net.ParseIP(externalIP)), - "--dport", strconv.Itoa(svcInfo.port), + "--dport", strconv.Itoa(svcInfo.Port), ) // We have to SNAT packets to external IPs. writeLine(proxier.natRules, append(args, "-j", string(KubeMarkMasqChain))...) @@ -975,7 +882,7 @@ func (proxier *Proxier) syncProxyRules() { "-m", "comment", "--comment", fmt.Sprintf(`"%s has no endpoints"`, svcNameString), "-m", protocol, "-p", protocol, "-d", utilproxy.ToCIDR(net.ParseIP(externalIP)), - "--dport", strconv.Itoa(svcInfo.port), + "--dport", strconv.Itoa(svcInfo.Port), "-j", "REJECT", ) } @@ -984,7 +891,7 @@ func (proxier *Proxier) syncProxyRules() { // Capture load-balancer ingress. if hasEndpoints { fwChain := svcInfo.serviceFirewallChainName - for _, ingress := range svcInfo.loadBalancerStatus.Ingress { + for _, ingress := range svcInfo.LoadBalancerStatus.Ingress { if ingress.IP != "" { // create service firewall chain if chain, ok := existingNATChains[fwChain]; ok { @@ -1002,7 +909,7 @@ func (proxier *Proxier) syncProxyRules() { "-m", "comment", "--comment", fmt.Sprintf(`"%s loadbalancer IP"`, svcNameString), "-m", protocol, "-p", protocol, "-d", utilproxy.ToCIDR(net.ParseIP(ingress.IP)), - "--dport", strconv.Itoa(svcInfo.port), + "--dport", strconv.Itoa(svcInfo.Port), ) // jump to service firewall chain writeLine(proxier.natRules, append(args, "-j", string(fwChain))...) @@ -1016,18 +923,18 @@ func (proxier *Proxier) syncProxyRules() { chosenChain := svcXlbChain // If we are proxying globally, we need to masquerade in case we cross nodes. // If we are proxying only locally, we can retain the source IP. - if !svcInfo.onlyNodeLocalEndpoints { + if !svcInfo.OnlyNodeLocalEndpoints { writeLine(proxier.natRules, append(args, "-j", string(KubeMarkMasqChain))...) chosenChain = svcChain } - if len(svcInfo.loadBalancerSourceRanges) == 0 { + if len(svcInfo.LoadBalancerSourceRanges) == 0 { // allow all sources, so jump directly to the KUBE-SVC or KUBE-XLB chain writeLine(proxier.natRules, append(args, "-j", string(chosenChain))...) } else { // firewall filter based on each source range allowFromNode := false - for _, src := range svcInfo.loadBalancerSourceRanges { + for _, src := range svcInfo.LoadBalancerSourceRanges { writeLine(proxier.natRules, append(args, "-s", src, "-j", string(chosenChain))...) // ignore error because it has been validated _, cidr, _ := net.ParseCIDR(src) @@ -1054,13 +961,13 @@ func (proxier *Proxier) syncProxyRules() { // Capture nodeports. If we had more than 2 rules it might be // worthwhile to make a new per-service chain for nodeport rules, but // with just 2 rules it ends up being a waste and a cognitive burden. - if svcInfo.nodePort != 0 { + if svcInfo.NodePort != 0 { // Hold the local port open so no other process can open it // (because the socket might open but it would never work). lp := utilproxy.LocalPort{ Description: "nodePort for " + svcNameString, IP: "", - Port: svcInfo.nodePort, + Port: svcInfo.NodePort, Protocol: protocol, } if proxier.portsMap[lp] != nil { @@ -1090,9 +997,9 @@ func (proxier *Proxier) syncProxyRules() { "-A", string(kubeNodePortsChain), "-m", "comment", "--comment", svcNameString, "-m", protocol, "-p", protocol, - "--dport", strconv.Itoa(svcInfo.nodePort), + "--dport", strconv.Itoa(svcInfo.NodePort), ) - if !svcInfo.onlyNodeLocalEndpoints { + if !svcInfo.OnlyNodeLocalEndpoints { // Nodeports need SNAT, unless they're local. writeLine(proxier.natRules, append(args, "-j", string(KubeMarkMasqChain))...) // Jump to the service chain. @@ -1115,7 +1022,7 @@ func (proxier *Proxier) syncProxyRules() { "-m", "comment", "--comment", fmt.Sprintf(`"%s has no endpoints"`, svcNameString), "-m", "addrtype", "--dst-type", "LOCAL", "-m", protocol, "-p", protocol, - "--dport", strconv.Itoa(svcInfo.nodePort), + "--dport", strconv.Itoa(svcInfo.NodePort), "-j", "REJECT", ) } @@ -1151,13 +1058,13 @@ func (proxier *Proxier) syncProxyRules() { } // First write session affinity rules, if applicable. - if svcInfo.sessionAffinityType == api.ServiceAffinityClientIP { + if svcInfo.SessionAffinityType == api.ServiceAffinityClientIP { for _, endpointChain := range endpointChains { writeLine(proxier.natRules, "-A", string(svcChain), "-m", "comment", "--comment", svcNameString, "-m", "recent", "--name", string(endpointChain), - "--rcheck", "--seconds", strconv.Itoa(svcInfo.stickyMaxAgeSeconds), "--reap", + "--rcheck", "--seconds", strconv.Itoa(svcInfo.StickyMaxAgeSeconds), "--reap", "-j", string(endpointChain)) } } @@ -1196,16 +1103,16 @@ func (proxier *Proxier) syncProxyRules() { "-s", utilproxy.ToCIDR(net.ParseIP(epIP)), "-j", string(KubeMarkMasqChain))...) // Update client-affinity lists. - if svcInfo.sessionAffinityType == api.ServiceAffinityClientIP { + if svcInfo.SessionAffinityType == api.ServiceAffinityClientIP { args = append(args, "-m", "recent", "--name", string(endpointChain), "--set") } // DNAT to final destination. - args = append(args, "-m", protocol, "-p", protocol, "-j", "DNAT", "--to-destination", endpoints[i].endpoint) + args = append(args, "-m", protocol, "-p", protocol, "-j", "DNAT", "--to-destination", endpoints[i].Endpoint) writeLine(proxier.natRules, args...) } // The logic below this applies only if this service is marked as OnlyLocal - if !svcInfo.onlyNodeLocalEndpoints { + if !svcInfo.OnlyNodeLocalEndpoints { continue } @@ -1214,7 +1121,7 @@ func (proxier *Proxier) syncProxyRules() { localEndpoints := make([]*endpointsInfo, 0) localEndpointChains := make([]utiliptables.Chain, 0) for i := range endpointChains { - if endpoints[i].isLocal { + if endpoints[i].IsLocal { // These slices parallel each other; must be kept in sync localEndpoints = append(localEndpoints, endpoints[i]) localEndpointChains = append(localEndpointChains, endpointChains[i]) @@ -1247,13 +1154,13 @@ func (proxier *Proxier) syncProxyRules() { writeLine(proxier.natRules, args...) } else { // First write session affinity rules only over local endpoints, if applicable. - if svcInfo.sessionAffinityType == api.ServiceAffinityClientIP { + if svcInfo.SessionAffinityType == api.ServiceAffinityClientIP { for _, endpointChain := range localEndpointChains { writeLine(proxier.natRules, "-A", string(svcXlbChain), "-m", "comment", "--comment", svcNameString, "-m", "recent", "--name", string(endpointChain), - "--rcheck", "--seconds", strconv.Itoa(svcInfo.stickyMaxAgeSeconds), "--reap", + "--rcheck", "--seconds", strconv.Itoa(svcInfo.StickyMaxAgeSeconds), "--reap", "-j", string(endpointChain)) } } diff --git a/pkg/proxy/iptables/proxier_test.go b/pkg/proxy/iptables/proxier_test.go index e335df23ab..734e5bde4f 100644 --- a/pkg/proxy/iptables/proxier_test.go +++ b/pkg/proxy/iptables/proxier_test.go @@ -179,12 +179,14 @@ func TestGetChainLinesMultipleTables(t *testing.T) { func newFakeServiceInfo(service proxy.ServicePortName, ip net.IP, port int, protocol api.Protocol, onlyNodeLocalEndpoints bool) *serviceInfo { return &serviceInfo{ - sessionAffinityType: api.ServiceAffinityNone, // default - stickyMaxAgeSeconds: int(api.DefaultClientIPServiceAffinitySeconds), // default - clusterIP: ip, - port: port, - protocol: protocol, - onlyNodeLocalEndpoints: onlyNodeLocalEndpoints, + ServiceInfoCommon: &proxy.ServiceInfoCommon{ + SessionAffinityType: api.ServiceAffinityNone, // default + StickyMaxAgeSeconds: int(api.DefaultClientIPServiceAffinitySeconds), // default + ClusterIP: ip, + Port: port, + Protocol: protocol, + OnlyNodeLocalEndpoints: onlyNodeLocalEndpoints, + }, } } @@ -391,9 +393,9 @@ func NewFakeProxier(ipt utiliptables.Interface) *Proxier { p := &Proxier{ exec: &fakeexec.FakeExec{}, serviceMap: make(proxy.ServiceMap), - serviceChanges: proxy.NewServiceChangeTracker(), + serviceChanges: proxy.NewServiceChangeTracker(customizeServiceInfo, nil, nil), endpointsMap: make(proxy.EndpointsMap), - endpointsChanges: proxy.NewEndpointChangeTracker(testHostname), + endpointsChanges: proxy.NewEndpointChangeTracker(testHostname, customizeEndpointInfo, nil, nil), iptables: ipt, clusterCIDR: "10.0.0.0/24", hostname: testHostname, @@ -821,7 +823,7 @@ func TestExternalIPsReject(t *testing.T) { kubeSvcRules := ipt.GetRules(string(kubeExternalServicesChain)) if !hasJump(kubeSvcRules, iptablestest.Reject, svcExternalIPs, svcPort) { - errorf(fmt.Sprintf("Failed to a %v rule for externalIP %v with no endpoints", iptablestest.Reject, svcPortName), kubeSvcRules, t) + errorf(fmt.Sprintf("Failed to find a %v rule for externalIP %v with no endpoints", iptablestest.Reject, svcPortName), kubeSvcRules, t) } } @@ -1379,7 +1381,10 @@ func compareEndpointsMaps(t *testing.T, tci int, newMap proxy.EndpointsMap, expe t.Errorf("Failed to cast endpointsInfo") continue } - if *newEp != *(expected[x][i]) { + if newEp.Endpoint != expected[x][i].Endpoint || + newEp.IsLocal != expected[x][i].IsLocal || + newEp.protocol != expected[x][i].protocol || + newEp.chainName != expected[x][i].chainName { t.Errorf("[%d] expected new[%v][%d] to be %v, got %v", tci, x, i, expected[x][i], newEp) } } @@ -1721,12 +1726,12 @@ func Test_updateEndpointsMap(t *testing.T) { }, oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{ makeServicePortName("ns1", "ep1", ""): { - {endpoint: "1.1.1.1:11", isLocal: false}, + {EndpointInfoCommon: &proxy.EndpointInfoCommon{Endpoint: "1.1.1.1:11", IsLocal: false}}, }, }, expectedResult: map[proxy.ServicePortName][]*endpointsInfo{ makeServicePortName("ns1", "ep1", ""): { - {endpoint: "1.1.1.1:11", isLocal: false}, + {EndpointInfoCommon: &proxy.EndpointInfoCommon{Endpoint: "1.1.1.1:11", IsLocal: false}}, }, }, expectedStaleEndpoints: []proxy.ServiceEndpoint{}, @@ -1742,12 +1747,12 @@ func Test_updateEndpointsMap(t *testing.T) { }, oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{ makeServicePortName("ns1", "ep1", "p11"): { - {endpoint: "1.1.1.1:11", isLocal: true}, + {EndpointInfoCommon: &proxy.EndpointInfoCommon{Endpoint: "1.1.1.1:11", IsLocal: true}}, }, }, expectedResult: map[proxy.ServicePortName][]*endpointsInfo{ makeServicePortName("ns1", "ep1", "p11"): { - {endpoint: "1.1.1.1:11", isLocal: true}, + {EndpointInfoCommon: &proxy.EndpointInfoCommon{Endpoint: "1.1.1.1:11", IsLocal: true}}, }, }, expectedStaleEndpoints: []proxy.ServiceEndpoint{}, @@ -1765,18 +1770,18 @@ func Test_updateEndpointsMap(t *testing.T) { }, oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{ makeServicePortName("ns1", "ep1", "p11"): { - {endpoint: "1.1.1.1:11", isLocal: false}, + {EndpointInfoCommon: &proxy.EndpointInfoCommon{Endpoint: "1.1.1.1:11", IsLocal: false}}, }, makeServicePortName("ns1", "ep1", "p12"): { - {endpoint: "1.1.1.2:12", isLocal: false}, + {EndpointInfoCommon: &proxy.EndpointInfoCommon{Endpoint: "1.1.1.2:12", IsLocal: false}}, }, }, expectedResult: map[proxy.ServicePortName][]*endpointsInfo{ makeServicePortName("ns1", "ep1", "p11"): { - {endpoint: "1.1.1.1:11", isLocal: false}, + {EndpointInfoCommon: &proxy.EndpointInfoCommon{Endpoint: "1.1.1.1:11", IsLocal: false}}, }, makeServicePortName("ns1", "ep1", "p12"): { - {endpoint: "1.1.1.2:12", isLocal: false}, + {EndpointInfoCommon: &proxy.EndpointInfoCommon{Endpoint: "1.1.1.2:12", IsLocal: false}}, }, }, expectedStaleEndpoints: []proxy.ServiceEndpoint{}, @@ -1792,24 +1797,24 @@ func Test_updateEndpointsMap(t *testing.T) { }, oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{ makeServicePortName("ns1", "ep1", "p11"): { - {endpoint: "1.1.1.1:11", isLocal: true}, + {EndpointInfoCommon: &proxy.EndpointInfoCommon{Endpoint: "1.1.1.1:11", IsLocal: true}}, }, makeServicePortName("ns1", "ep1", "p12"): { - {endpoint: "1.1.1.1:12", isLocal: true}, + {EndpointInfoCommon: &proxy.EndpointInfoCommon{Endpoint: "1.1.1.1:12", IsLocal: true}}, }, makeServicePortName("ns1", "ep1", "p13"): { - {endpoint: "1.1.1.3:13", isLocal: false}, + {EndpointInfoCommon: &proxy.EndpointInfoCommon{Endpoint: "1.1.1.3:13", IsLocal: false}}, }, }, expectedResult: map[proxy.ServicePortName][]*endpointsInfo{ makeServicePortName("ns1", "ep1", "p11"): { - {endpoint: "1.1.1.1:11", isLocal: true}, + {EndpointInfoCommon: &proxy.EndpointInfoCommon{Endpoint: "1.1.1.1:11", IsLocal: true}}, }, makeServicePortName("ns1", "ep1", "p12"): { - {endpoint: "1.1.1.1:12", isLocal: true}, + {EndpointInfoCommon: &proxy.EndpointInfoCommon{Endpoint: "1.1.1.1:12", IsLocal: true}}, }, makeServicePortName("ns1", "ep1", "p13"): { - {endpoint: "1.1.1.3:13", isLocal: false}, + {EndpointInfoCommon: &proxy.EndpointInfoCommon{Endpoint: "1.1.1.3:13", IsLocal: false}}, }, }, expectedStaleEndpoints: []proxy.ServiceEndpoint{}, @@ -1829,54 +1834,54 @@ func Test_updateEndpointsMap(t *testing.T) { }, oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{ makeServicePortName("ns1", "ep1", "p11"): { - {endpoint: "1.1.1.1:11", isLocal: false}, - {endpoint: "1.1.1.2:11", isLocal: true}, + {EndpointInfoCommon: &proxy.EndpointInfoCommon{Endpoint: "1.1.1.1:11", IsLocal: false}}, + {EndpointInfoCommon: &proxy.EndpointInfoCommon{Endpoint: "1.1.1.2:11", IsLocal: true}}, }, makeServicePortName("ns1", "ep1", "p12"): { - {endpoint: "1.1.1.1:12", isLocal: false}, - {endpoint: "1.1.1.2:12", isLocal: true}, + {EndpointInfoCommon: &proxy.EndpointInfoCommon{Endpoint: "1.1.1.1:12", IsLocal: false}}, + {EndpointInfoCommon: &proxy.EndpointInfoCommon{Endpoint: "1.1.1.2:12", IsLocal: true}}, }, makeServicePortName("ns1", "ep1", "p13"): { - {endpoint: "1.1.1.3:13", isLocal: false}, - {endpoint: "1.1.1.4:13", isLocal: true}, + {EndpointInfoCommon: &proxy.EndpointInfoCommon{Endpoint: "1.1.1.3:13", IsLocal: false}}, + {EndpointInfoCommon: &proxy.EndpointInfoCommon{Endpoint: "1.1.1.4:13", IsLocal: true}}, }, makeServicePortName("ns1", "ep1", "p14"): { - {endpoint: "1.1.1.3:14", isLocal: false}, - {endpoint: "1.1.1.4:14", isLocal: true}, + {EndpointInfoCommon: &proxy.EndpointInfoCommon{Endpoint: "1.1.1.3:14", IsLocal: false}}, + {EndpointInfoCommon: &proxy.EndpointInfoCommon{Endpoint: "1.1.1.4:14", IsLocal: true}}, }, makeServicePortName("ns2", "ep2", "p21"): { - {endpoint: "2.2.2.1:21", isLocal: false}, - {endpoint: "2.2.2.2:21", isLocal: true}, + {EndpointInfoCommon: &proxy.EndpointInfoCommon{Endpoint: "2.2.2.1:21", IsLocal: false}}, + {EndpointInfoCommon: &proxy.EndpointInfoCommon{Endpoint: "2.2.2.2:21", IsLocal: true}}, }, makeServicePortName("ns2", "ep2", "p22"): { - {endpoint: "2.2.2.1:22", isLocal: false}, - {endpoint: "2.2.2.2:22", isLocal: true}, + {EndpointInfoCommon: &proxy.EndpointInfoCommon{Endpoint: "2.2.2.1:22", IsLocal: false}}, + {EndpointInfoCommon: &proxy.EndpointInfoCommon{Endpoint: "2.2.2.2:22", IsLocal: true}}, }, }, expectedResult: map[proxy.ServicePortName][]*endpointsInfo{ makeServicePortName("ns1", "ep1", "p11"): { - {endpoint: "1.1.1.1:11", isLocal: false}, - {endpoint: "1.1.1.2:11", isLocal: true}, + {EndpointInfoCommon: &proxy.EndpointInfoCommon{Endpoint: "1.1.1.1:11", IsLocal: false}}, + {EndpointInfoCommon: &proxy.EndpointInfoCommon{Endpoint: "1.1.1.2:11", IsLocal: true}}, }, makeServicePortName("ns1", "ep1", "p12"): { - {endpoint: "1.1.1.1:12", isLocal: false}, - {endpoint: "1.1.1.2:12", isLocal: true}, + {EndpointInfoCommon: &proxy.EndpointInfoCommon{Endpoint: "1.1.1.1:12", IsLocal: false}}, + {EndpointInfoCommon: &proxy.EndpointInfoCommon{Endpoint: "1.1.1.2:12", IsLocal: true}}, }, makeServicePortName("ns1", "ep1", "p13"): { - {endpoint: "1.1.1.3:13", isLocal: false}, - {endpoint: "1.1.1.4:13", isLocal: true}, + {EndpointInfoCommon: &proxy.EndpointInfoCommon{Endpoint: "1.1.1.3:13", IsLocal: false}}, + {EndpointInfoCommon: &proxy.EndpointInfoCommon{Endpoint: "1.1.1.4:13", IsLocal: true}}, }, makeServicePortName("ns1", "ep1", "p14"): { - {endpoint: "1.1.1.3:14", isLocal: false}, - {endpoint: "1.1.1.4:14", isLocal: true}, + {EndpointInfoCommon: &proxy.EndpointInfoCommon{Endpoint: "1.1.1.3:14", IsLocal: false}}, + {EndpointInfoCommon: &proxy.EndpointInfoCommon{Endpoint: "1.1.1.4:14", IsLocal: true}}, }, makeServicePortName("ns2", "ep2", "p21"): { - {endpoint: "2.2.2.1:21", isLocal: false}, - {endpoint: "2.2.2.2:21", isLocal: true}, + {EndpointInfoCommon: &proxy.EndpointInfoCommon{Endpoint: "2.2.2.1:21", IsLocal: false}}, + {EndpointInfoCommon: &proxy.EndpointInfoCommon{Endpoint: "2.2.2.2:21", IsLocal: true}}, }, makeServicePortName("ns2", "ep2", "p22"): { - {endpoint: "2.2.2.1:22", isLocal: false}, - {endpoint: "2.2.2.2:22", isLocal: true}, + {EndpointInfoCommon: &proxy.EndpointInfoCommon{Endpoint: "2.2.2.1:22", IsLocal: false}}, + {EndpointInfoCommon: &proxy.EndpointInfoCommon{Endpoint: "2.2.2.2:22", IsLocal: true}}, }, }, expectedStaleEndpoints: []proxy.ServiceEndpoint{}, @@ -1896,7 +1901,7 @@ func Test_updateEndpointsMap(t *testing.T) { oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{}, expectedResult: map[proxy.ServicePortName][]*endpointsInfo{ makeServicePortName("ns1", "ep1", ""): { - {endpoint: "1.1.1.1:11", isLocal: true}, + {EndpointInfoCommon: &proxy.EndpointInfoCommon{Endpoint: "1.1.1.1:11", IsLocal: true}}, }, }, expectedStaleEndpoints: []proxy.ServiceEndpoint{}, @@ -1916,7 +1921,7 @@ func Test_updateEndpointsMap(t *testing.T) { }, oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{ makeServicePortName("ns1", "ep1", ""): { - {endpoint: "1.1.1.1:11", isLocal: true}, + {EndpointInfoCommon: &proxy.EndpointInfoCommon{Endpoint: "1.1.1.1:11", IsLocal: true}}, }, }, expectedResult: map[proxy.ServicePortName][]*endpointsInfo{}, @@ -1936,17 +1941,17 @@ func Test_updateEndpointsMap(t *testing.T) { }, oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{ makeServicePortName("ns1", "ep1", "p11"): { - {endpoint: "1.1.1.1:11", isLocal: false}, + {EndpointInfoCommon: &proxy.EndpointInfoCommon{Endpoint: "1.1.1.1:11", IsLocal: false}}, }, }, expectedResult: map[proxy.ServicePortName][]*endpointsInfo{ makeServicePortName("ns1", "ep1", "p11"): { - {endpoint: "1.1.1.1:11", isLocal: false}, - {endpoint: "1.1.1.2:11", isLocal: true}, + {EndpointInfoCommon: &proxy.EndpointInfoCommon{Endpoint: "1.1.1.1:11", IsLocal: false}}, + {EndpointInfoCommon: &proxy.EndpointInfoCommon{Endpoint: "1.1.1.2:11", IsLocal: true}}, }, makeServicePortName("ns1", "ep1", "p12"): { - {endpoint: "1.1.1.1:12", isLocal: false}, - {endpoint: "1.1.1.2:12", isLocal: true}, + {EndpointInfoCommon: &proxy.EndpointInfoCommon{Endpoint: "1.1.1.1:12", IsLocal: false}}, + {EndpointInfoCommon: &proxy.EndpointInfoCommon{Endpoint: "1.1.1.2:12", IsLocal: true}}, }, }, expectedStaleEndpoints: []proxy.ServiceEndpoint{}, @@ -1966,17 +1971,17 @@ func Test_updateEndpointsMap(t *testing.T) { }, oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{ makeServicePortName("ns1", "ep1", "p11"): { - {endpoint: "1.1.1.1:11", isLocal: false}, - {endpoint: "1.1.1.2:11", isLocal: true}, + {EndpointInfoCommon: &proxy.EndpointInfoCommon{Endpoint: "1.1.1.1:11", IsLocal: false}}, + {EndpointInfoCommon: &proxy.EndpointInfoCommon{Endpoint: "1.1.1.2:11", IsLocal: true}}, }, makeServicePortName("ns1", "ep1", "p12"): { - {endpoint: "1.1.1.1:12", isLocal: false}, - {endpoint: "1.1.1.2:12", isLocal: true}, + {EndpointInfoCommon: &proxy.EndpointInfoCommon{Endpoint: "1.1.1.1:12", IsLocal: false}}, + {EndpointInfoCommon: &proxy.EndpointInfoCommon{Endpoint: "1.1.1.2:12", IsLocal: true}}, }, }, expectedResult: map[proxy.ServicePortName][]*endpointsInfo{ makeServicePortName("ns1", "ep1", "p11"): { - {endpoint: "1.1.1.1:11", isLocal: false}, + {EndpointInfoCommon: &proxy.EndpointInfoCommon{Endpoint: "1.1.1.1:11", IsLocal: false}}, }, }, expectedStaleEndpoints: []proxy.ServiceEndpoint{{ @@ -2001,15 +2006,15 @@ func Test_updateEndpointsMap(t *testing.T) { }, oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{ makeServicePortName("ns1", "ep1", "p11"): { - {endpoint: "1.1.1.1:11", isLocal: false}, + {EndpointInfoCommon: &proxy.EndpointInfoCommon{Endpoint: "1.1.1.1:11", IsLocal: false}}, }, }, expectedResult: map[proxy.ServicePortName][]*endpointsInfo{ makeServicePortName("ns1", "ep1", "p11"): { - {endpoint: "1.1.1.1:11", isLocal: false}, + {EndpointInfoCommon: &proxy.EndpointInfoCommon{Endpoint: "1.1.1.1:11", IsLocal: false}}, }, makeServicePortName("ns1", "ep1", "p12"): { - {endpoint: "1.1.1.2:12", isLocal: true}, + {EndpointInfoCommon: &proxy.EndpointInfoCommon{Endpoint: "1.1.1.2:12", IsLocal: true}}, }, }, expectedStaleEndpoints: []proxy.ServiceEndpoint{}, @@ -2029,15 +2034,15 @@ func Test_updateEndpointsMap(t *testing.T) { }, oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{ makeServicePortName("ns1", "ep1", "p11"): { - {endpoint: "1.1.1.1:11", isLocal: false}, + {EndpointInfoCommon: &proxy.EndpointInfoCommon{Endpoint: "1.1.1.1:11", IsLocal: false}}, }, makeServicePortName("ns1", "ep1", "p12"): { - {endpoint: "1.1.1.2:12", isLocal: false}, + {EndpointInfoCommon: &proxy.EndpointInfoCommon{Endpoint: "1.1.1.2:12", IsLocal: false}}, }, }, expectedResult: map[proxy.ServicePortName][]*endpointsInfo{ makeServicePortName("ns1", "ep1", "p11"): { - {endpoint: "1.1.1.1:11", isLocal: false}, + {EndpointInfoCommon: &proxy.EndpointInfoCommon{Endpoint: "1.1.1.1:11", IsLocal: false}}, }, }, expectedStaleEndpoints: []proxy.ServiceEndpoint{{ @@ -2056,12 +2061,12 @@ func Test_updateEndpointsMap(t *testing.T) { }, oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{ makeServicePortName("ns1", "ep1", "p11"): { - {endpoint: "1.1.1.1:11", isLocal: false}, + {EndpointInfoCommon: &proxy.EndpointInfoCommon{Endpoint: "1.1.1.1:11", IsLocal: false}}, }, }, expectedResult: map[proxy.ServicePortName][]*endpointsInfo{ makeServicePortName("ns1", "ep1", "p11-2"): { - {endpoint: "1.1.1.1:11", isLocal: false}, + {EndpointInfoCommon: &proxy.EndpointInfoCommon{Endpoint: "1.1.1.1:11", IsLocal: false}}, }, }, expectedStaleEndpoints: []proxy.ServiceEndpoint{{ @@ -2082,12 +2087,12 @@ func Test_updateEndpointsMap(t *testing.T) { }, oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{ makeServicePortName("ns1", "ep1", "p11"): { - {endpoint: "1.1.1.1:11", isLocal: false}, + {EndpointInfoCommon: &proxy.EndpointInfoCommon{Endpoint: "1.1.1.1:11", IsLocal: false}}, }, }, expectedResult: map[proxy.ServicePortName][]*endpointsInfo{ makeServicePortName("ns1", "ep1", "p11"): { - {endpoint: "1.1.1.1:22", isLocal: false}, + {EndpointInfoCommon: &proxy.EndpointInfoCommon{Endpoint: "1.1.1.1:22", IsLocal: false}}, }, }, expectedStaleEndpoints: []proxy.ServiceEndpoint{{ @@ -2112,39 +2117,39 @@ func Test_updateEndpointsMap(t *testing.T) { }, oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{ makeServicePortName("ns1", "ep1", "p11"): { - {endpoint: "1.1.1.1:11", isLocal: false}, + {EndpointInfoCommon: &proxy.EndpointInfoCommon{Endpoint: "1.1.1.1:11", IsLocal: false}}, }, makeServicePortName("ns2", "ep2", "p22"): { - {endpoint: "2.2.2.2:22", isLocal: true}, - {endpoint: "2.2.2.22:22", isLocal: true}, + {EndpointInfoCommon: &proxy.EndpointInfoCommon{Endpoint: "2.2.2.2:22", IsLocal: true}}, + {EndpointInfoCommon: &proxy.EndpointInfoCommon{Endpoint: "2.2.2.22:22", IsLocal: true}}, }, makeServicePortName("ns2", "ep2", "p23"): { - {endpoint: "2.2.2.3:23", isLocal: true}, + {EndpointInfoCommon: &proxy.EndpointInfoCommon{Endpoint: "2.2.2.3:23", IsLocal: true}}, }, makeServicePortName("ns4", "ep4", "p44"): { - {endpoint: "4.4.4.4:44", isLocal: true}, - {endpoint: "4.4.4.5:44", isLocal: true}, + {EndpointInfoCommon: &proxy.EndpointInfoCommon{Endpoint: "4.4.4.4:44", IsLocal: true}}, + {EndpointInfoCommon: &proxy.EndpointInfoCommon{Endpoint: "4.4.4.5:44", IsLocal: true}}, }, makeServicePortName("ns4", "ep4", "p45"): { - {endpoint: "4.4.4.6:45", isLocal: true}, + {EndpointInfoCommon: &proxy.EndpointInfoCommon{Endpoint: "4.4.4.6:45", IsLocal: true}}, }, }, expectedResult: map[proxy.ServicePortName][]*endpointsInfo{ makeServicePortName("ns1", "ep1", "p11"): { - {endpoint: "1.1.1.1:11", isLocal: false}, - {endpoint: "1.1.1.11:11", isLocal: false}, + {EndpointInfoCommon: &proxy.EndpointInfoCommon{Endpoint: "1.1.1.1:11", IsLocal: false}}, + {EndpointInfoCommon: &proxy.EndpointInfoCommon{Endpoint: "1.1.1.11:11", IsLocal: false}}, }, makeServicePortName("ns1", "ep1", "p12"): { - {endpoint: "1.1.1.2:12", isLocal: false}, + {EndpointInfoCommon: &proxy.EndpointInfoCommon{Endpoint: "1.1.1.2:12", IsLocal: false}}, }, makeServicePortName("ns1", "ep1", "p122"): { - {endpoint: "1.1.1.2:122", isLocal: false}, + {EndpointInfoCommon: &proxy.EndpointInfoCommon{Endpoint: "1.1.1.2:122", IsLocal: false}}, }, makeServicePortName("ns3", "ep3", "p33"): { - {endpoint: "3.3.3.3:33", isLocal: false}, + {EndpointInfoCommon: &proxy.EndpointInfoCommon{Endpoint: "3.3.3.3:33", IsLocal: false}}, }, makeServicePortName("ns4", "ep4", "p44"): { - {endpoint: "4.4.4.4:44", isLocal: true}, + {EndpointInfoCommon: &proxy.EndpointInfoCommon{Endpoint: "4.4.4.4:44", IsLocal: true}}, }, }, expectedStaleEndpoints: []proxy.ServiceEndpoint{{ @@ -2182,7 +2187,7 @@ func Test_updateEndpointsMap(t *testing.T) { oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{}, expectedResult: map[proxy.ServicePortName][]*endpointsInfo{ makeServicePortName("ns1", "ep1", ""): { - {endpoint: "1.1.1.1:11", isLocal: false}, + {EndpointInfoCommon: &proxy.EndpointInfoCommon{Endpoint: "1.1.1.1:11", IsLocal: false}}, }, }, expectedStaleEndpoints: []proxy.ServiceEndpoint{}, diff --git a/pkg/proxy/ipvs/proxier.go b/pkg/proxy/ipvs/proxier.go index ad2f74449c..3a0261a1f9 100644 --- a/pkg/proxy/ipvs/proxier.go +++ b/pkg/proxy/ipvs/proxier.go @@ -37,9 +37,7 @@ import ( "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/tools/record" - apiservice "k8s.io/kubernetes/pkg/api/service" api "k8s.io/kubernetes/pkg/apis/core" - "k8s.io/kubernetes/pkg/apis/core/helper" "k8s.io/kubernetes/pkg/proxy" "k8s.io/kubernetes/pkg/proxy/healthcheck" "k8s.io/kubernetes/pkg/proxy/metrics" @@ -291,6 +289,10 @@ func NewProxier(ipt utiliptables.Interface, nodeIP = net.ParseIP("127.0.0.1") } + isIPv6 := conntrack.IsIPv6(nodeIP) + + glog.V(2).Infof("nodeIP: %v, isIPv6: %v", nodeIP, isIPv6) + if len(clusterCIDR) == 0 { glog.Warningf("clusterCIDR not specified, unable to distinguish between internal and external traffic") } @@ -302,16 +304,12 @@ func NewProxier(ipt utiliptables.Interface, healthChecker := healthcheck.NewServer(hostname, recorder, nil, nil) // use default implementations of deps - isIPv6 := conntrack.IsIPv6(nodeIP) - - glog.V(2).Infof("nodeIP: %v, isIPv6: %v", nodeIP, isIPv6) - proxier := &Proxier{ portsMap: make(map[utilproxy.LocalPort]utilproxy.Closeable), serviceMap: make(proxy.ServiceMap), - serviceChanges: proxy.NewServiceChangeTracker(), + serviceChanges: proxy.NewServiceChangeTracker(customizeServiceInfo, &isIPv6, recorder), endpointsMap: make(proxy.EndpointsMap), - endpointsChanges: proxy.NewEndpointChangeTracker(hostname), + endpointsChanges: proxy.NewEndpointChangeTracker(hostname, nil, &isIPv6, recorder), syncPeriod: syncPeriod, minSyncPeriod: minSyncPeriod, iptables: ipt, @@ -353,140 +351,23 @@ func NewProxier(ipt utiliptables.Interface, // internal struct for string service information type serviceInfo struct { - clusterIP net.IP - port int - protocol api.Protocol - nodePort int - loadBalancerStatus api.LoadBalancerStatus - sessionAffinityType api.ServiceAffinity - stickyMaxAgeSeconds int - externalIPs []string - loadBalancerSourceRanges []string - onlyNodeLocalEndpoints bool - healthCheckNodePort int + *proxy.ServiceInfoCommon // The following fields are computed and stored for performance reasons. serviceNameString string } // returns a new proxy.ServicePort which abstracts a serviceInfo -func newServiceInfo(port *api.ServicePort, service *api.Service) proxy.ServicePort { - onlyNodeLocalEndpoints := false - if apiservice.RequestsOnlyLocalTraffic(service) { - onlyNodeLocalEndpoints = true - } - var stickyMaxAgeSeconds int - if service.Spec.SessionAffinity == api.ServiceAffinityClientIP { - stickyMaxAgeSeconds = int(*service.Spec.SessionAffinityConfig.ClientIP.TimeoutSeconds) - } - info := &serviceInfo{ - clusterIP: net.ParseIP(service.Spec.ClusterIP), - port: int(port.Port), - protocol: port.Protocol, - nodePort: int(port.NodePort), - // Deep-copy in case the service instance changes - loadBalancerStatus: *helper.LoadBalancerStatusDeepCopy(&service.Status.LoadBalancer), - sessionAffinityType: service.Spec.SessionAffinity, - stickyMaxAgeSeconds: stickyMaxAgeSeconds, - externalIPs: make([]string, len(service.Spec.ExternalIPs)), - loadBalancerSourceRanges: make([]string, len(service.Spec.LoadBalancerSourceRanges)), - onlyNodeLocalEndpoints: onlyNodeLocalEndpoints, - } - - copy(info.loadBalancerSourceRanges, service.Spec.LoadBalancerSourceRanges) - copy(info.externalIPs, service.Spec.ExternalIPs) - - svcName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name} - svcPortName := proxy.ServicePortName{NamespacedName: svcName, Port: port.Name} - - if apiservice.NeedsHealthCheck(service) { - p := service.Spec.HealthCheckNodePort - if p == 0 { - glog.Errorf("Service %q has no healthcheck nodeport", svcName.String()) - } else { - info.healthCheckNodePort = int(p) - } - } +func customizeServiceInfo(port *api.ServicePort, service *api.Service, infoCommon *proxy.ServiceInfoCommon) proxy.ServicePort { + info := &serviceInfo{ServiceInfoCommon: infoCommon} // Store the following for performance reasons. + svcName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name} + svcPortName := proxy.ServicePortName{NamespacedName: svcName, Port: port.Name} info.serviceNameString = svcPortName.String() return info } -// ClusterIP is part of ServicePort interface. -func (info *serviceInfo) ClusterIP() string { - return info.clusterIP.String() -} - -// Port is part of ServicePort interface. -func (info *serviceInfo) Port() int { - return info.port -} - -// Protocol is part of ServicePort interface. -func (info *serviceInfo) Protocol() api.Protocol { - return info.protocol -} - -// String is part of ServicePort interface. -func (info *serviceInfo) String() string { - return fmt.Sprintf("%s:%d/%s", info.clusterIP, info.port, info.protocol) -} - -// HealthCheckNodePort is part of ServicePort interface. -func (info *serviceInfo) HealthCheckNodePort() int { - return info.healthCheckNodePort -} - -var _ proxy.ServicePort = &serviceInfo{} - -// internal struct for endpoints information -type endpointsInfo struct { - endpoint string // TODO: should be an endpointString type - isLocal bool -} - -// returns a new proxy.Endpoint which abstracts a endpointsInfo -func newEndpointsInfo(IP string, port int, isLocal bool) proxy.Endpoint { - return &endpointsInfo{ - endpoint: net.JoinHostPort(IP, strconv.Itoa(port)), - isLocal: isLocal, - } -} - -// IsLocal is part of proxy.Endpoint interface. -func (e *endpointsInfo) IsLocal() bool { - return e.isLocal -} - -// String is part of proxy.Endpoint interface. -func (e *endpointsInfo) String() string { - return fmt.Sprintf("%v", e.endpoint) -} - -// IP returns just the IP part of the endpoint, it's a part of proxy.Endpoints interface. -func (e *endpointsInfo) IP() string { - return utilproxy.IPPart(e.endpoint) -} - -// PortPart returns just the Port part of the endpoint. -func (e *endpointsInfo) PortPart() (int, error) { - return utilproxy.PortPart(e.endpoint) -} - -// Equal is part of proxy.Endpoint interface. -func (e *endpointsInfo) Equal(other proxy.Endpoint) bool { - o, ok := other.(*endpointsInfo) - if !ok { - glog.Errorf("Failed to cast endpointsInfo") - return false - } - return e.endpoint == o.endpoint && - e.isLocal == o.isLocal -} - -var _ proxy.Endpoint = &endpointsInfo{} - // KernelHandler can handle the current installed kernel modules. type KernelHandler interface { GetModules() ([]string, error) @@ -668,21 +549,21 @@ func (proxier *Proxier) isInitialized() bool { // OnServiceAdd is called whenever creation of new service object is observed. func (proxier *Proxier) OnServiceAdd(service *api.Service) { - if proxier.serviceChanges.Update(nil, service, newServiceInfo) && proxier.isInitialized() { + if proxier.serviceChanges.Update(nil, service) && proxier.isInitialized() { proxier.syncRunner.Run() } } // OnServiceUpdate is called whenever modification of an existing service object is observed. func (proxier *Proxier) OnServiceUpdate(oldService, service *api.Service) { - if proxier.serviceChanges.Update(oldService, service, newServiceInfo) && proxier.isInitialized() { + if proxier.serviceChanges.Update(oldService, service) && proxier.isInitialized() { proxier.syncRunner.Run() } } // OnServiceDelete is called whenever deletion of an existing service object is observed. func (proxier *Proxier) OnServiceDelete(service *api.Service) { - if proxier.serviceChanges.Update(service, nil, newServiceInfo) && proxier.isInitialized() { + if proxier.serviceChanges.Update(service, nil) && proxier.isInitialized() { proxier.syncRunner.Run() } } @@ -700,21 +581,21 @@ func (proxier *Proxier) OnServiceSynced() { // OnEndpointsAdd is called whenever creation of new endpoints object is observed. func (proxier *Proxier) OnEndpointsAdd(endpoints *api.Endpoints) { - if proxier.endpointsChanges.Update(nil, endpoints, newEndpointsInfo) && proxier.isInitialized() { + if proxier.endpointsChanges.Update(nil, endpoints) && proxier.isInitialized() { proxier.syncRunner.Run() } } // OnEndpointsUpdate is called whenever modification of an existing endpoints object is observed. func (proxier *Proxier) OnEndpointsUpdate(oldEndpoints, endpoints *api.Endpoints) { - if proxier.endpointsChanges.Update(oldEndpoints, endpoints, newEndpointsInfo) && proxier.isInitialized() { + if proxier.endpointsChanges.Update(oldEndpoints, endpoints) && proxier.isInitialized() { proxier.syncRunner.Run() } } // OnEndpointsDelete is called whenever deletion of an existing endpoints object is observed. func (proxier *Proxier) OnEndpointsDelete(endpoints *api.Endpoints) { - if proxier.endpointsChanges.Update(endpoints, nil, newEndpointsInfo) && proxier.isInitialized() { + if proxier.endpointsChanges.Update(endpoints, nil) && proxier.isInitialized() { proxier.syncRunner.Run() } } @@ -757,9 +638,9 @@ func (proxier *Proxier) syncProxyRules() { staleServices := serviceUpdateResult.UDPStaleClusterIP // merge stale services gathered from updateEndpointsMap for _, svcPortName := range endpointUpdateResult.StaleServiceNames { - if svcInfo, ok := proxier.serviceMap[svcPortName]; ok && svcInfo != nil && svcInfo.Protocol() == api.ProtocolUDP { - glog.V(2).Infof("Stale udp service %v -> %s", svcPortName, svcInfo.ClusterIP()) - staleServices.Insert(svcInfo.ClusterIP()) + if svcInfo, ok := proxier.serviceMap[svcPortName]; ok && svcInfo != nil && svcInfo.GetProtocol() == api.ProtocolUDP { + glog.V(2).Infof("Stale udp service %v -> %s", svcPortName, svcInfo.GetClusterIP()) + staleServices.Insert(svcInfo.GetClusterIP()) } } @@ -868,20 +749,20 @@ func (proxier *Proxier) syncProxyRules() { glog.Errorf("Failed to cast serviceInfo %q", svcName.String()) continue } - protocol := strings.ToLower(string(svcInfo.protocol)) + protocol := strings.ToLower(string(svcInfo.Protocol)) // Precompute svcNameString; with many services the many calls // to ServicePortName.String() show up in CPU profiles. svcNameString := svcName.String() // Handle traffic that loops back to the originator with SNAT. for _, e := range proxier.endpointsMap[svcName] { - ep, ok := e.(*endpointsInfo) + ep, ok := e.(*proxy.EndpointInfoCommon) if !ok { - glog.Errorf("Failed to cast endpointsInfo %q", e.String()) + glog.Errorf("Failed to cast EndpointInfoCommon %q", e.String()) continue } epIP := ep.IP() - epPort, err := ep.PortPart() + epPort, err := ep.Port() // Error parsing this endpoint has been logged. Skip to next endpoint. if epIP == "" || err != nil { continue @@ -903,8 +784,8 @@ func (proxier *Proxier) syncProxyRules() { // Capture the clusterIP. // ipset call entry := &utilipset.Entry{ - IP: svcInfo.clusterIP.String(), - Port: svcInfo.port, + IP: svcInfo.ClusterIP.String(), + Port: svcInfo.Port, Protocol: protocol, SetType: utilipset.HashIPPort, } @@ -920,15 +801,15 @@ func (proxier *Proxier) syncProxyRules() { } // ipvs call serv := &utilipvs.VirtualServer{ - Address: svcInfo.clusterIP, - Port: uint16(svcInfo.port), - Protocol: string(svcInfo.protocol), + Address: svcInfo.ClusterIP, + Port: uint16(svcInfo.Port), + Protocol: string(svcInfo.Protocol), Scheduler: proxier.ipvsScheduler, } // Set session affinity flag and timeout for IPVS service - if svcInfo.sessionAffinityType == api.ServiceAffinityClientIP { + if svcInfo.SessionAffinityType == api.ServiceAffinityClientIP { serv.Flags |= utilipvs.FlagPersistent - serv.Timeout = uint32(svcInfo.stickyMaxAgeSeconds) + serv.Timeout = uint32(svcInfo.StickyMaxAgeSeconds) } // We need to bind ClusterIP to dummy interface, so set `bindAddr` parameter to `true` in syncService() if err := proxier.syncService(svcNameString, serv, true); err == nil { @@ -943,14 +824,14 @@ func (proxier *Proxier) syncProxyRules() { } // Capture externalIPs. - for _, externalIP := range svcInfo.externalIPs { + for _, externalIP := range svcInfo.ExternalIPs { if local, err := utilproxy.IsLocalIP(externalIP); err != nil { glog.Errorf("can't determine if IP is local, assuming not: %v", err) } else if local { lp := utilproxy.LocalPort{ Description: "externalIP for " + svcNameString, IP: externalIP, - Port: svcInfo.port, + Port: svcInfo.Port, Protocol: protocol, } if proxier.portsMap[lp] != nil { @@ -978,7 +859,7 @@ func (proxier *Proxier) syncProxyRules() { // ipset call entry := &utilipset.Entry{ IP: externalIP, - Port: svcInfo.port, + Port: svcInfo.Port, Protocol: protocol, SetType: utilipset.HashIPPort, } @@ -992,18 +873,18 @@ func (proxier *Proxier) syncProxyRules() { // ipvs call serv := &utilipvs.VirtualServer{ Address: net.ParseIP(externalIP), - Port: uint16(svcInfo.port), - Protocol: string(svcInfo.protocol), + Port: uint16(svcInfo.Port), + Protocol: string(svcInfo.Protocol), Scheduler: proxier.ipvsScheduler, } - if svcInfo.sessionAffinityType == api.ServiceAffinityClientIP { + if svcInfo.SessionAffinityType == api.ServiceAffinityClientIP { serv.Flags |= utilipvs.FlagPersistent - serv.Timeout = uint32(svcInfo.stickyMaxAgeSeconds) + serv.Timeout = uint32(svcInfo.StickyMaxAgeSeconds) } // There is no need to bind externalIP to dummy interface, so set parameter `bindAddr` to `false`. if err := proxier.syncService(svcNameString, serv, false); err == nil { activeIPVSServices[serv.String()] = true - if err := proxier.syncEndpoint(svcName, svcInfo.onlyNodeLocalEndpoints, serv); err != nil { + if err := proxier.syncEndpoint(svcName, svcInfo.OnlyNodeLocalEndpoints, serv); err != nil { glog.Errorf("Failed to sync endpoint for service: %v, err: %v", serv, err) } } else { @@ -1012,12 +893,12 @@ func (proxier *Proxier) syncProxyRules() { } // Capture load-balancer ingress. - for _, ingress := range svcInfo.loadBalancerStatus.Ingress { + for _, ingress := range svcInfo.LoadBalancerStatus.Ingress { if ingress.IP != "" { // ipset call entry = &utilipset.Entry{ IP: ingress.IP, - Port: svcInfo.port, + Port: svcInfo.Port, Protocol: protocol, SetType: utilipset.HashIPPort, } @@ -1025,14 +906,14 @@ func (proxier *Proxier) syncProxyRules() { // proxier.kubeServiceAccessSet.activeEntries.Insert(entry.String()) // If we are proxying globally, we need to masquerade in case we cross nodes. // If we are proxying only locally, we can retain the source IP. - if !svcInfo.onlyNodeLocalEndpoints { + if !svcInfo.OnlyNodeLocalEndpoints { if valid := proxier.lbMasqSet.validateEntry(entry); !valid { glog.Errorf("%s", fmt.Sprintf(EntryInvalidErr, entry, proxier.lbMasqSet.Name)) continue } proxier.lbMasqSet.activeEntries.Insert(entry.String()) } - if len(svcInfo.loadBalancerSourceRanges) != 0 { + if len(svcInfo.LoadBalancerSourceRanges) != 0 { // The service firewall rules are created based on ServiceSpec.loadBalancerSourceRanges field. // This currently works for loadbalancers that preserves source ips. // For loadbalancers which direct traffic to service NodePort, the firewall rules will not apply. @@ -1043,11 +924,11 @@ func (proxier *Proxier) syncProxyRules() { proxier.lbIngressSet.activeEntries.Insert(entry.String()) allowFromNode := false - for _, src := range svcInfo.loadBalancerSourceRanges { + for _, src := range svcInfo.LoadBalancerSourceRanges { // ipset call entry = &utilipset.Entry{ IP: ingress.IP, - Port: svcInfo.port, + Port: svcInfo.Port, Protocol: protocol, Net: src, SetType: utilipset.HashIPPortNet, @@ -1071,7 +952,7 @@ func (proxier *Proxier) syncProxyRules() { if allowFromNode { entry = &utilipset.Entry{ IP: ingress.IP, - Port: svcInfo.port, + Port: svcInfo.Port, Protocol: protocol, IP2: ingress.IP, SetType: utilipset.HashIPPortIP, @@ -1088,18 +969,18 @@ func (proxier *Proxier) syncProxyRules() { // ipvs call serv := &utilipvs.VirtualServer{ Address: net.ParseIP(ingress.IP), - Port: uint16(svcInfo.port), - Protocol: string(svcInfo.protocol), + Port: uint16(svcInfo.Port), + Protocol: string(svcInfo.Protocol), Scheduler: proxier.ipvsScheduler, } - if svcInfo.sessionAffinityType == api.ServiceAffinityClientIP { + if svcInfo.SessionAffinityType == api.ServiceAffinityClientIP { serv.Flags |= utilipvs.FlagPersistent - serv.Timeout = uint32(svcInfo.stickyMaxAgeSeconds) + serv.Timeout = uint32(svcInfo.StickyMaxAgeSeconds) } // There is no need to bind LB ingress.IP to dummy interface, so set parameter `bindAddr` to `false`. if err := proxier.syncService(svcNameString, serv, false); err == nil { activeIPVSServices[serv.String()] = true - if err := proxier.syncEndpoint(svcName, svcInfo.onlyNodeLocalEndpoints, serv); err != nil { + if err := proxier.syncEndpoint(svcName, svcInfo.OnlyNodeLocalEndpoints, serv); err != nil { glog.Errorf("Failed to sync endpoint for service: %v, err: %v", serv, err) } } else { @@ -1108,11 +989,11 @@ func (proxier *Proxier) syncProxyRules() { } } - if svcInfo.nodePort != 0 { + if svcInfo.NodePort != 0 { lp := utilproxy.LocalPort{ Description: "nodePort for " + svcNameString, IP: "", - Port: svcInfo.nodePort, + Port: svcInfo.NodePort, Protocol: protocol, } if proxier.portsMap[lp] != nil { @@ -1125,7 +1006,7 @@ func (proxier *Proxier) syncProxyRules() { continue } if lp.Protocol == "udp" { - isIPv6 := conntrack.IsIPv6(svcInfo.clusterIP) + isIPv6 := conntrack.IsIPv6(svcInfo.ClusterIP) conntrack.ClearEntriesForPort(proxier.exec, lp.Port, isIPv6, clientv1.ProtocolUDP) } replacementPortsMap[lp] = socket @@ -1133,10 +1014,10 @@ func (proxier *Proxier) syncProxyRules() { // Nodeports need SNAT, unless they're local. // ipset call - if !svcInfo.onlyNodeLocalEndpoints { + if !svcInfo.OnlyNodeLocalEndpoints { entry = &utilipset.Entry{ // No need to provide ip info - Port: svcInfo.nodePort, + Port: svcInfo.NodePort, Protocol: protocol, SetType: utilipset.BitmapPort, } @@ -1182,18 +1063,18 @@ func (proxier *Proxier) syncProxyRules() { // ipvs call serv := &utilipvs.VirtualServer{ Address: nodeIP, - Port: uint16(svcInfo.nodePort), - Protocol: string(svcInfo.protocol), + Port: uint16(svcInfo.NodePort), + Protocol: string(svcInfo.Protocol), Scheduler: proxier.ipvsScheduler, } - if svcInfo.sessionAffinityType == api.ServiceAffinityClientIP { + if svcInfo.SessionAffinityType == api.ServiceAffinityClientIP { serv.Flags |= utilipvs.FlagPersistent - serv.Timeout = uint32(svcInfo.stickyMaxAgeSeconds) + serv.Timeout = uint32(svcInfo.StickyMaxAgeSeconds) } // There is no need to bind Node IP to dummy interface, so set parameter `bindAddr` to `false`. if err := proxier.syncService(svcNameString, serv, false); err == nil { activeIPVSServices[serv.String()] = true - if err := proxier.syncEndpoint(svcName, svcInfo.onlyNodeLocalEndpoints, serv); err != nil { + if err := proxier.syncEndpoint(svcName, svcInfo.OnlyNodeLocalEndpoints, serv); err != nil { glog.Errorf("Failed to sync endpoint for service: %v, err: %v", serv, err) } } else { @@ -1383,9 +1264,9 @@ func (proxier *Proxier) syncProxyRules() { // This assumes the proxier mutex is held func (proxier *Proxier) deleteEndpointConnections(connectionMap []proxy.ServiceEndpoint) { for _, epSvcPair := range connectionMap { - if svcInfo, ok := proxier.serviceMap[epSvcPair.ServicePortName]; ok && svcInfo.Protocol() == api.ProtocolUDP { + if svcInfo, ok := proxier.serviceMap[epSvcPair.ServicePortName]; ok && svcInfo.GetProtocol() == api.ProtocolUDP { endpointIP := utilproxy.IPPart(epSvcPair.Endpoint) - err := conntrack.ClearEntriesForNAT(proxier.exec, svcInfo.ClusterIP(), endpointIP, clientv1.ProtocolUDP) + err := conntrack.ClearEntriesForNAT(proxier.exec, svcInfo.GetClusterIP(), endpointIP, clientv1.ProtocolUDP) if err != nil { glog.Errorf("Failed to delete %s endpoint connections, error: %v", epSvcPair.ServicePortName.String(), err) } @@ -1447,14 +1328,9 @@ func (proxier *Proxier) syncEndpoint(svcPortName proxy.ServicePortName, onlyNode curEndpoints.Insert(des.String()) } - for _, eps := range proxier.endpointsMap[svcPortName] { - epInfo, ok := eps.(*endpointsInfo) - if !ok { - glog.Errorf("Failed to cast endpointsInfo") - continue - } - if !onlyNodeLocalEndpoints || onlyNodeLocalEndpoints && epInfo.isLocal { - newEndpoints.Insert(epInfo.endpoint) + for _, epInfo := range proxier.endpointsMap[svcPortName] { + if !onlyNodeLocalEndpoints || onlyNodeLocalEndpoints && epInfo.GetIsLocal() { + newEndpoints.Insert(epInfo.String()) } } diff --git a/pkg/proxy/ipvs/proxier_test.go b/pkg/proxy/ipvs/proxier_test.go index a26e7a38b4..1b5480c21b 100644 --- a/pkg/proxy/ipvs/proxier_test.go +++ b/pkg/proxy/ipvs/proxier_test.go @@ -33,7 +33,7 @@ import ( "k8s.io/apimachinery/pkg/util/sets" netlinktest "k8s.io/kubernetes/pkg/proxy/ipvs/testing" - proxyutil "k8s.io/kubernetes/pkg/proxy/util" + utilproxy "k8s.io/kubernetes/pkg/proxy/util" proxyutiltest "k8s.io/kubernetes/pkg/proxy/util/testing" utilipset "k8s.io/kubernetes/pkg/util/ipset" ipsettest "k8s.io/kubernetes/pkg/util/ipset/testing" @@ -67,12 +67,12 @@ func newFakeHealthChecker() *fakeHealthChecker { // fakePortOpener implements portOpener. type fakePortOpener struct { - openPorts []*proxyutil.LocalPort + openPorts []*utilproxy.LocalPort } // OpenLocalPort fakes out the listen() and bind() used by syncProxyRules // to lock a local port. -func (f *fakePortOpener) OpenLocalPort(lp *proxyutil.LocalPort) (proxyutil.Closeable, error) { +func (f *fakePortOpener) OpenLocalPort(lp *utilproxy.LocalPort) (utilproxy.Closeable, error) { f.openPorts = append(f.openPorts, lp) return nil, nil } @@ -121,16 +121,16 @@ func NewFakeProxier(ipt utiliptables.Interface, ipvs utilipvs.Interface, ipset u return &Proxier{ exec: fexec, serviceMap: make(proxy.ServiceMap), - serviceChanges: proxy.NewServiceChangeTracker(), + serviceChanges: proxy.NewServiceChangeTracker(customizeServiceInfo, nil, nil), endpointsMap: make(proxy.EndpointsMap), - endpointsChanges: proxy.NewEndpointChangeTracker(testHostname), + endpointsChanges: proxy.NewEndpointChangeTracker(testHostname, nil, nil, nil), iptables: ipt, ipvs: ipvs, ipset: ipset, clusterCIDR: "10.0.0.0/24", hostname: testHostname, - portsMap: make(map[proxyutil.LocalPort]proxyutil.Closeable), - portMapper: &fakePortOpener{[]*proxyutil.LocalPort{}}, + portsMap: make(map[utilproxy.LocalPort]utilproxy.Closeable), + portMapper: &fakePortOpener{[]*utilproxy.LocalPort{}}, healthChecker: newFakeHealthChecker(), ipvsScheduler: DefaultScheduler, ipGetter: &fakeIPGetter{nodeIPs: nodeIPs}, @@ -1581,15 +1581,15 @@ func Test_updateEndpointsMap(t *testing.T) { // or non-nil) and must be of equal length. previousEndpoints []*api.Endpoints currentEndpoints []*api.Endpoints - oldEndpoints map[proxy.ServicePortName][]*endpointsInfo - expectedResult map[proxy.ServicePortName][]*endpointsInfo + oldEndpoints map[proxy.ServicePortName][]*proxy.EndpointInfoCommon + expectedResult map[proxy.ServicePortName][]*proxy.EndpointInfoCommon expectedStaleEndpoints []proxy.ServiceEndpoint expectedStaleServiceNames map[proxy.ServicePortName]bool expectedHealthchecks map[types.NamespacedName]int }{{ // Case[0]: nothing - oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{}, - expectedResult: map[proxy.ServicePortName][]*endpointsInfo{}, + oldEndpoints: map[proxy.ServicePortName][]*proxy.EndpointInfoCommon{}, + expectedResult: map[proxy.ServicePortName][]*proxy.EndpointInfoCommon{}, expectedStaleEndpoints: []proxy.ServiceEndpoint{}, expectedStaleServiceNames: map[proxy.ServicePortName]bool{}, expectedHealthchecks: map[types.NamespacedName]int{}, @@ -1601,14 +1601,14 @@ func Test_updateEndpointsMap(t *testing.T) { currentEndpoints: []*api.Endpoints{ makeTestEndpoints("ns1", "ep1", unnamedPort), }, - oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{ + oldEndpoints: map[proxy.ServicePortName][]*proxy.EndpointInfoCommon{ makeServicePortName("ns1", "ep1", ""): { - {endpoint: "1.1.1.1:11", isLocal: false}, + {Endpoint: "1.1.1.1:11", IsLocal: false}, }, }, - expectedResult: map[proxy.ServicePortName][]*endpointsInfo{ + expectedResult: map[proxy.ServicePortName][]*proxy.EndpointInfoCommon{ makeServicePortName("ns1", "ep1", ""): { - {endpoint: "1.1.1.1:11", isLocal: false}, + {Endpoint: "1.1.1.1:11", IsLocal: false}, }, }, expectedStaleEndpoints: []proxy.ServiceEndpoint{}, @@ -1622,14 +1622,14 @@ func Test_updateEndpointsMap(t *testing.T) { currentEndpoints: []*api.Endpoints{ makeTestEndpoints("ns1", "ep1", namedPortLocal), }, - oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{ + oldEndpoints: map[proxy.ServicePortName][]*proxy.EndpointInfoCommon{ makeServicePortName("ns1", "ep1", "p11"): { - {endpoint: "1.1.1.1:11", isLocal: true}, + {Endpoint: "1.1.1.1:11", IsLocal: true}, }, }, - expectedResult: map[proxy.ServicePortName][]*endpointsInfo{ + expectedResult: map[proxy.ServicePortName][]*proxy.EndpointInfoCommon{ makeServicePortName("ns1", "ep1", "p11"): { - {endpoint: "1.1.1.1:11", isLocal: true}, + {Endpoint: "1.1.1.1:11", IsLocal: true}, }, }, expectedStaleEndpoints: []proxy.ServiceEndpoint{}, @@ -1645,20 +1645,20 @@ func Test_updateEndpointsMap(t *testing.T) { currentEndpoints: []*api.Endpoints{ makeTestEndpoints("ns1", "ep1", multipleSubsets), }, - oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{ + oldEndpoints: map[proxy.ServicePortName][]*proxy.EndpointInfoCommon{ makeServicePortName("ns1", "ep1", "p11"): { - {endpoint: "1.1.1.1:11", isLocal: false}, + {Endpoint: "1.1.1.1:11", IsLocal: false}, }, makeServicePortName("ns1", "ep1", "p12"): { - {endpoint: "1.1.1.2:12", isLocal: false}, + {Endpoint: "1.1.1.2:12", IsLocal: false}, }, }, - expectedResult: map[proxy.ServicePortName][]*endpointsInfo{ + expectedResult: map[proxy.ServicePortName][]*proxy.EndpointInfoCommon{ makeServicePortName("ns1", "ep1", "p11"): { - {endpoint: "1.1.1.1:11", isLocal: false}, + {Endpoint: "1.1.1.1:11", IsLocal: false}, }, makeServicePortName("ns1", "ep1", "p12"): { - {endpoint: "1.1.1.2:12", isLocal: false}, + {Endpoint: "1.1.1.2:12", IsLocal: false}, }, }, expectedStaleEndpoints: []proxy.ServiceEndpoint{}, @@ -1672,26 +1672,26 @@ func Test_updateEndpointsMap(t *testing.T) { currentEndpoints: []*api.Endpoints{ makeTestEndpoints("ns1", "ep1", multipleSubsetsMultiplePortsLocal), }, - oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{ + oldEndpoints: map[proxy.ServicePortName][]*proxy.EndpointInfoCommon{ makeServicePortName("ns1", "ep1", "p11"): { - {endpoint: "1.1.1.1:11", isLocal: true}, + {Endpoint: "1.1.1.1:11", IsLocal: true}, }, makeServicePortName("ns1", "ep1", "p12"): { - {endpoint: "1.1.1.1:12", isLocal: true}, + {Endpoint: "1.1.1.1:12", IsLocal: true}, }, makeServicePortName("ns1", "ep1", "p13"): { - {endpoint: "1.1.1.3:13", isLocal: false}, + {Endpoint: "1.1.1.3:13", IsLocal: false}, }, }, - expectedResult: map[proxy.ServicePortName][]*endpointsInfo{ + expectedResult: map[proxy.ServicePortName][]*proxy.EndpointInfoCommon{ makeServicePortName("ns1", "ep1", "p11"): { - {endpoint: "1.1.1.1:11", isLocal: true}, + {Endpoint: "1.1.1.1:11", IsLocal: true}, }, makeServicePortName("ns1", "ep1", "p12"): { - {endpoint: "1.1.1.1:12", isLocal: true}, + {Endpoint: "1.1.1.1:12", IsLocal: true}, }, makeServicePortName("ns1", "ep1", "p13"): { - {endpoint: "1.1.1.3:13", isLocal: false}, + {Endpoint: "1.1.1.3:13", IsLocal: false}, }, }, expectedStaleEndpoints: []proxy.ServiceEndpoint{}, @@ -1709,56 +1709,56 @@ func Test_updateEndpointsMap(t *testing.T) { makeTestEndpoints("ns1", "ep1", multipleSubsetsIPsPorts1), makeTestEndpoints("ns2", "ep2", multipleSubsetsIPsPorts2), }, - oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{ + oldEndpoints: map[proxy.ServicePortName][]*proxy.EndpointInfoCommon{ makeServicePortName("ns1", "ep1", "p11"): { - {endpoint: "1.1.1.1:11", isLocal: false}, - {endpoint: "1.1.1.2:11", isLocal: true}, + {Endpoint: "1.1.1.1:11", IsLocal: false}, + {Endpoint: "1.1.1.2:11", IsLocal: true}, }, makeServicePortName("ns1", "ep1", "p12"): { - {endpoint: "1.1.1.1:12", isLocal: false}, - {endpoint: "1.1.1.2:12", isLocal: true}, + {Endpoint: "1.1.1.1:12", IsLocal: false}, + {Endpoint: "1.1.1.2:12", IsLocal: true}, }, makeServicePortName("ns1", "ep1", "p13"): { - {endpoint: "1.1.1.3:13", isLocal: false}, - {endpoint: "1.1.1.4:13", isLocal: true}, + {Endpoint: "1.1.1.3:13", IsLocal: false}, + {Endpoint: "1.1.1.4:13", IsLocal: true}, }, makeServicePortName("ns1", "ep1", "p14"): { - {endpoint: "1.1.1.3:14", isLocal: false}, - {endpoint: "1.1.1.4:14", isLocal: true}, + {Endpoint: "1.1.1.3:14", IsLocal: false}, + {Endpoint: "1.1.1.4:14", IsLocal: true}, }, makeServicePortName("ns2", "ep2", "p21"): { - {endpoint: "2.2.2.1:21", isLocal: false}, - {endpoint: "2.2.2.2:21", isLocal: true}, + {Endpoint: "2.2.2.1:21", IsLocal: false}, + {Endpoint: "2.2.2.2:21", IsLocal: true}, }, makeServicePortName("ns2", "ep2", "p22"): { - {endpoint: "2.2.2.1:22", isLocal: false}, - {endpoint: "2.2.2.2:22", isLocal: true}, + {Endpoint: "2.2.2.1:22", IsLocal: false}, + {Endpoint: "2.2.2.2:22", IsLocal: true}, }, }, - expectedResult: map[proxy.ServicePortName][]*endpointsInfo{ + expectedResult: map[proxy.ServicePortName][]*proxy.EndpointInfoCommon{ makeServicePortName("ns1", "ep1", "p11"): { - {endpoint: "1.1.1.1:11", isLocal: false}, - {endpoint: "1.1.1.2:11", isLocal: true}, + {Endpoint: "1.1.1.1:11", IsLocal: false}, + {Endpoint: "1.1.1.2:11", IsLocal: true}, }, makeServicePortName("ns1", "ep1", "p12"): { - {endpoint: "1.1.1.1:12", isLocal: false}, - {endpoint: "1.1.1.2:12", isLocal: true}, + {Endpoint: "1.1.1.1:12", IsLocal: false}, + {Endpoint: "1.1.1.2:12", IsLocal: true}, }, makeServicePortName("ns1", "ep1", "p13"): { - {endpoint: "1.1.1.3:13", isLocal: false}, - {endpoint: "1.1.1.4:13", isLocal: true}, + {Endpoint: "1.1.1.3:13", IsLocal: false}, + {Endpoint: "1.1.1.4:13", IsLocal: true}, }, makeServicePortName("ns1", "ep1", "p14"): { - {endpoint: "1.1.1.3:14", isLocal: false}, - {endpoint: "1.1.1.4:14", isLocal: true}, + {Endpoint: "1.1.1.3:14", IsLocal: false}, + {Endpoint: "1.1.1.4:14", IsLocal: true}, }, makeServicePortName("ns2", "ep2", "p21"): { - {endpoint: "2.2.2.1:21", isLocal: false}, - {endpoint: "2.2.2.2:21", isLocal: true}, + {Endpoint: "2.2.2.1:21", IsLocal: false}, + {Endpoint: "2.2.2.2:21", IsLocal: true}, }, makeServicePortName("ns2", "ep2", "p22"): { - {endpoint: "2.2.2.1:22", isLocal: false}, - {endpoint: "2.2.2.2:22", isLocal: true}, + {Endpoint: "2.2.2.1:22", IsLocal: false}, + {Endpoint: "2.2.2.2:22", IsLocal: true}, }, }, expectedStaleEndpoints: []proxy.ServiceEndpoint{}, @@ -1775,10 +1775,10 @@ func Test_updateEndpointsMap(t *testing.T) { currentEndpoints: []*api.Endpoints{ makeTestEndpoints("ns1", "ep1", unnamedPortLocal), }, - oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{}, - expectedResult: map[proxy.ServicePortName][]*endpointsInfo{ + oldEndpoints: map[proxy.ServicePortName][]*proxy.EndpointInfoCommon{}, + expectedResult: map[proxy.ServicePortName][]*proxy.EndpointInfoCommon{ makeServicePortName("ns1", "ep1", ""): { - {endpoint: "1.1.1.1:11", isLocal: true}, + {Endpoint: "1.1.1.1:11", IsLocal: true}, }, }, expectedStaleEndpoints: []proxy.ServiceEndpoint{}, @@ -1796,12 +1796,12 @@ func Test_updateEndpointsMap(t *testing.T) { currentEndpoints: []*api.Endpoints{ nil, }, - oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{ + oldEndpoints: map[proxy.ServicePortName][]*proxy.EndpointInfoCommon{ makeServicePortName("ns1", "ep1", ""): { - {endpoint: "1.1.1.1:11", isLocal: true}, + {Endpoint: "1.1.1.1:11", IsLocal: true}, }, }, - expectedResult: map[proxy.ServicePortName][]*endpointsInfo{}, + expectedResult: map[proxy.ServicePortName][]*proxy.EndpointInfoCommon{}, expectedStaleEndpoints: []proxy.ServiceEndpoint{{ Endpoint: "1.1.1.1:11", ServicePortName: makeServicePortName("ns1", "ep1", ""), @@ -1816,19 +1816,19 @@ func Test_updateEndpointsMap(t *testing.T) { currentEndpoints: []*api.Endpoints{ makeTestEndpoints("ns1", "ep1", namedPortsLocalNoLocal), }, - oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{ + oldEndpoints: map[proxy.ServicePortName][]*proxy.EndpointInfoCommon{ makeServicePortName("ns1", "ep1", "p11"): { - {endpoint: "1.1.1.1:11", isLocal: false}, + {Endpoint: "1.1.1.1:11", IsLocal: false}, }, }, - expectedResult: map[proxy.ServicePortName][]*endpointsInfo{ + expectedResult: map[proxy.ServicePortName][]*proxy.EndpointInfoCommon{ makeServicePortName("ns1", "ep1", "p11"): { - {endpoint: "1.1.1.1:11", isLocal: false}, - {endpoint: "1.1.1.2:11", isLocal: true}, + {Endpoint: "1.1.1.1:11", IsLocal: false}, + {Endpoint: "1.1.1.2:11", IsLocal: true}, }, makeServicePortName("ns1", "ep1", "p12"): { - {endpoint: "1.1.1.1:12", isLocal: false}, - {endpoint: "1.1.1.2:12", isLocal: true}, + {Endpoint: "1.1.1.1:12", IsLocal: false}, + {Endpoint: "1.1.1.2:12", IsLocal: true}, }, }, expectedStaleEndpoints: []proxy.ServiceEndpoint{}, @@ -1846,19 +1846,19 @@ func Test_updateEndpointsMap(t *testing.T) { currentEndpoints: []*api.Endpoints{ makeTestEndpoints("ns1", "ep1", namedPort), }, - oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{ + oldEndpoints: map[proxy.ServicePortName][]*proxy.EndpointInfoCommon{ makeServicePortName("ns1", "ep1", "p11"): { - {endpoint: "1.1.1.1:11", isLocal: false}, - {endpoint: "1.1.1.2:11", isLocal: true}, + {Endpoint: "1.1.1.1:11", IsLocal: false}, + {Endpoint: "1.1.1.2:11", IsLocal: true}, }, makeServicePortName("ns1", "ep1", "p12"): { - {endpoint: "1.1.1.1:12", isLocal: false}, - {endpoint: "1.1.1.2:12", isLocal: true}, + {Endpoint: "1.1.1.1:12", IsLocal: false}, + {Endpoint: "1.1.1.2:12", IsLocal: true}, }, }, - expectedResult: map[proxy.ServicePortName][]*endpointsInfo{ + expectedResult: map[proxy.ServicePortName][]*proxy.EndpointInfoCommon{ makeServicePortName("ns1", "ep1", "p11"): { - {endpoint: "1.1.1.1:11", isLocal: false}, + {Endpoint: "1.1.1.1:11", IsLocal: false}, }, }, expectedStaleEndpoints: []proxy.ServiceEndpoint{{ @@ -1881,17 +1881,17 @@ func Test_updateEndpointsMap(t *testing.T) { currentEndpoints: []*api.Endpoints{ makeTestEndpoints("ns1", "ep1", multipleSubsetsWithLocal), }, - oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{ + oldEndpoints: map[proxy.ServicePortName][]*proxy.EndpointInfoCommon{ makeServicePortName("ns1", "ep1", "p11"): { - {endpoint: "1.1.1.1:11", isLocal: false}, + {Endpoint: "1.1.1.1:11", IsLocal: false}, }, }, - expectedResult: map[proxy.ServicePortName][]*endpointsInfo{ + expectedResult: map[proxy.ServicePortName][]*proxy.EndpointInfoCommon{ makeServicePortName("ns1", "ep1", "p11"): { - {endpoint: "1.1.1.1:11", isLocal: false}, + {Endpoint: "1.1.1.1:11", IsLocal: false}, }, makeServicePortName("ns1", "ep1", "p12"): { - {endpoint: "1.1.1.2:12", isLocal: true}, + {Endpoint: "1.1.1.2:12", IsLocal: true}, }, }, expectedStaleEndpoints: []proxy.ServiceEndpoint{}, @@ -1909,17 +1909,17 @@ func Test_updateEndpointsMap(t *testing.T) { currentEndpoints: []*api.Endpoints{ makeTestEndpoints("ns1", "ep1", namedPort), }, - oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{ + oldEndpoints: map[proxy.ServicePortName][]*proxy.EndpointInfoCommon{ makeServicePortName("ns1", "ep1", "p11"): { - {endpoint: "1.1.1.1:11", isLocal: false}, + {Endpoint: "1.1.1.1:11", IsLocal: false}, }, makeServicePortName("ns1", "ep1", "p12"): { - {endpoint: "1.1.1.2:12", isLocal: false}, + {Endpoint: "1.1.1.2:12", IsLocal: false}, }, }, - expectedResult: map[proxy.ServicePortName][]*endpointsInfo{ + expectedResult: map[proxy.ServicePortName][]*proxy.EndpointInfoCommon{ makeServicePortName("ns1", "ep1", "p11"): { - {endpoint: "1.1.1.1:11", isLocal: false}, + {Endpoint: "1.1.1.1:11", IsLocal: false}, }, }, expectedStaleEndpoints: []proxy.ServiceEndpoint{{ @@ -1936,14 +1936,14 @@ func Test_updateEndpointsMap(t *testing.T) { currentEndpoints: []*api.Endpoints{ makeTestEndpoints("ns1", "ep1", namedPortRenamed), }, - oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{ + oldEndpoints: map[proxy.ServicePortName][]*proxy.EndpointInfoCommon{ makeServicePortName("ns1", "ep1", "p11"): { - {endpoint: "1.1.1.1:11", isLocal: false}, + {Endpoint: "1.1.1.1:11", IsLocal: false}, }, }, - expectedResult: map[proxy.ServicePortName][]*endpointsInfo{ + expectedResult: map[proxy.ServicePortName][]*proxy.EndpointInfoCommon{ makeServicePortName("ns1", "ep1", "p11-2"): { - {endpoint: "1.1.1.1:11", isLocal: false}, + {Endpoint: "1.1.1.1:11", IsLocal: false}, }, }, expectedStaleEndpoints: []proxy.ServiceEndpoint{{ @@ -1962,14 +1962,14 @@ func Test_updateEndpointsMap(t *testing.T) { currentEndpoints: []*api.Endpoints{ makeTestEndpoints("ns1", "ep1", namedPortRenumbered), }, - oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{ + oldEndpoints: map[proxy.ServicePortName][]*proxy.EndpointInfoCommon{ makeServicePortName("ns1", "ep1", "p11"): { - {endpoint: "1.1.1.1:11", isLocal: false}, + {Endpoint: "1.1.1.1:11", IsLocal: false}, }, }, - expectedResult: map[proxy.ServicePortName][]*endpointsInfo{ + expectedResult: map[proxy.ServicePortName][]*proxy.EndpointInfoCommon{ makeServicePortName("ns1", "ep1", "p11"): { - {endpoint: "1.1.1.1:22", isLocal: false}, + {Endpoint: "1.1.1.1:22", IsLocal: false}, }, }, expectedStaleEndpoints: []proxy.ServiceEndpoint{{ @@ -1992,41 +1992,41 @@ func Test_updateEndpointsMap(t *testing.T) { makeTestEndpoints("ns3", "ep3", complexAfter3), makeTestEndpoints("ns4", "ep4", complexAfter4), }, - oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{ + oldEndpoints: map[proxy.ServicePortName][]*proxy.EndpointInfoCommon{ makeServicePortName("ns1", "ep1", "p11"): { - {endpoint: "1.1.1.1:11", isLocal: false}, + {Endpoint: "1.1.1.1:11", IsLocal: false}, }, makeServicePortName("ns2", "ep2", "p22"): { - {endpoint: "2.2.2.2:22", isLocal: true}, - {endpoint: "2.2.2.22:22", isLocal: true}, + {Endpoint: "2.2.2.2:22", IsLocal: true}, + {Endpoint: "2.2.2.22:22", IsLocal: true}, }, makeServicePortName("ns2", "ep2", "p23"): { - {endpoint: "2.2.2.3:23", isLocal: true}, + {Endpoint: "2.2.2.3:23", IsLocal: true}, }, makeServicePortName("ns4", "ep4", "p44"): { - {endpoint: "4.4.4.4:44", isLocal: true}, - {endpoint: "4.4.4.5:44", isLocal: true}, + {Endpoint: "4.4.4.4:44", IsLocal: true}, + {Endpoint: "4.4.4.5:44", IsLocal: true}, }, makeServicePortName("ns4", "ep4", "p45"): { - {endpoint: "4.4.4.6:45", isLocal: true}, + {Endpoint: "4.4.4.6:45", IsLocal: true}, }, }, - expectedResult: map[proxy.ServicePortName][]*endpointsInfo{ + expectedResult: map[proxy.ServicePortName][]*proxy.EndpointInfoCommon{ makeServicePortName("ns1", "ep1", "p11"): { - {endpoint: "1.1.1.1:11", isLocal: false}, - {endpoint: "1.1.1.11:11", isLocal: false}, + {Endpoint: "1.1.1.1:11", IsLocal: false}, + {Endpoint: "1.1.1.11:11", IsLocal: false}, }, makeServicePortName("ns1", "ep1", "p12"): { - {endpoint: "1.1.1.2:12", isLocal: false}, + {Endpoint: "1.1.1.2:12", IsLocal: false}, }, makeServicePortName("ns1", "ep1", "p122"): { - {endpoint: "1.1.1.2:122", isLocal: false}, + {Endpoint: "1.1.1.2:122", IsLocal: false}, }, makeServicePortName("ns3", "ep3", "p33"): { - {endpoint: "3.3.3.3:33", isLocal: false}, + {Endpoint: "3.3.3.3:33", IsLocal: false}, }, makeServicePortName("ns4", "ep4", "p44"): { - {endpoint: "4.4.4.4:44", isLocal: true}, + {Endpoint: "4.4.4.4:44", IsLocal: true}, }, }, expectedStaleEndpoints: []proxy.ServiceEndpoint{{ @@ -2061,10 +2061,10 @@ func Test_updateEndpointsMap(t *testing.T) { currentEndpoints: []*api.Endpoints{ makeTestEndpoints("ns1", "ep1", unnamedPort), }, - oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{}, - expectedResult: map[proxy.ServicePortName][]*endpointsInfo{ + oldEndpoints: map[proxy.ServicePortName][]*proxy.EndpointInfoCommon{}, + expectedResult: map[proxy.ServicePortName][]*proxy.EndpointInfoCommon{ makeServicePortName("ns1", "ep1", ""): { - {endpoint: "1.1.1.1:11", isLocal: false}, + {Endpoint: "1.1.1.1:11", IsLocal: false}, }, }, expectedStaleEndpoints: []proxy.ServiceEndpoint{}, @@ -2148,7 +2148,7 @@ func Test_updateEndpointsMap(t *testing.T) { } } -func compareEndpointsMaps(t *testing.T, tci int, newMap proxy.EndpointsMap, expected map[proxy.ServicePortName][]*endpointsInfo) { +func compareEndpointsMaps(t *testing.T, tci int, newMap proxy.EndpointsMap, expected map[proxy.ServicePortName][]*proxy.EndpointInfoCommon) { if len(newMap) != len(expected) { t.Errorf("[%d] expected %d results, got %d: %v", tci, len(expected), len(newMap), newMap) } @@ -2157,9 +2157,9 @@ func compareEndpointsMaps(t *testing.T, tci int, newMap proxy.EndpointsMap, expe t.Errorf("[%d] expected %d endpoints for %v, got %d", tci, len(expected[x]), x, len(newMap[x])) } else { for i := range expected[x] { - newEp, ok := newMap[x][i].(*endpointsInfo) + newEp, ok := newMap[x][i].(*proxy.EndpointInfoCommon) if !ok { - t.Errorf("Failed to cast endpointsInfo") + t.Errorf("Failed to cast proxy.EndpointInfoCommon") continue } if *newEp != *(expected[x][i]) { diff --git a/pkg/proxy/service.go b/pkg/proxy/service.go index 82407ab197..515bdaa130 100644 --- a/pkg/proxy/service.go +++ b/pkg/proxy/service.go @@ -17,6 +17,8 @@ limitations under the License. package proxy import ( + "fmt" + "net" "reflect" "sync" @@ -24,10 +26,91 @@ import ( "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/client-go/tools/record" + apiservice "k8s.io/kubernetes/pkg/api/service" api "k8s.io/kubernetes/pkg/apis/core" - proxyutil "k8s.io/kubernetes/pkg/proxy/util" + "k8s.io/kubernetes/pkg/apis/core/helper" + utilproxy "k8s.io/kubernetes/pkg/proxy/util" ) +// ServiceInfoCommon contains common service information. +type ServiceInfoCommon struct { + ClusterIP net.IP + Port int + Protocol api.Protocol + NodePort int + LoadBalancerStatus api.LoadBalancerStatus + SessionAffinityType api.ServiceAffinity + StickyMaxAgeSeconds int + ExternalIPs []string + LoadBalancerSourceRanges []string + HealthCheckNodePort int + OnlyNodeLocalEndpoints bool +} + +var _ ServicePort = &ServiceInfoCommon{} + +// String is part of ServicePort interface. +func (info *ServiceInfoCommon) String() string { + return fmt.Sprintf("%s:%d/%s", info.ClusterIP, info.Port, info.Protocol) +} + +// GetClusterIP is part of ServicePort interface. +func (info *ServiceInfoCommon) GetClusterIP() string { + return info.ClusterIP.String() +} + +// GetProtocol is part of ServicePort interface. +func (info *ServiceInfoCommon) GetProtocol() api.Protocol { + return info.Protocol +} + +// GetHealthCheckNodePort is part of ServicePort interface. +func (info *ServiceInfoCommon) GetHealthCheckNodePort() int { + return info.HealthCheckNodePort +} + +func (sct *ServiceChangeTracker) newServiceInfoCommon(port *api.ServicePort, service *api.Service) *ServiceInfoCommon { + onlyNodeLocalEndpoints := false + if apiservice.RequestsOnlyLocalTraffic(service) { + onlyNodeLocalEndpoints = true + } + var stickyMaxAgeSeconds int + if service.Spec.SessionAffinity == api.ServiceAffinityClientIP { + // Kube-apiserver side guarantees SessionAffinityConfig won't be nil when session affinity type is ClientIP + stickyMaxAgeSeconds = int(*service.Spec.SessionAffinityConfig.ClientIP.TimeoutSeconds) + } + info := &ServiceInfoCommon{ + ClusterIP: net.ParseIP(service.Spec.ClusterIP), + Port: int(port.Port), + Protocol: port.Protocol, + NodePort: int(port.NodePort), + // Deep-copy in case the service instance changes + LoadBalancerStatus: *helper.LoadBalancerStatusDeepCopy(&service.Status.LoadBalancer), + SessionAffinityType: service.Spec.SessionAffinity, + StickyMaxAgeSeconds: stickyMaxAgeSeconds, + OnlyNodeLocalEndpoints: onlyNodeLocalEndpoints, + } + + info.ExternalIPs = make([]string, len(service.Spec.ExternalIPs)) + info.LoadBalancerSourceRanges = make([]string, len(service.Spec.LoadBalancerSourceRanges)) + copy(info.LoadBalancerSourceRanges, service.Spec.LoadBalancerSourceRanges) + copy(info.ExternalIPs, service.Spec.ExternalIPs) + + if apiservice.NeedsHealthCheck(service) { + p := service.Spec.HealthCheckNodePort + if p == 0 { + glog.Errorf("Service %s/%s has no healthcheck nodeport", service.Namespace, service.Name) + } else { + info.HealthCheckNodePort = int(p) + } + } + + return info +} + +type customizeServiceInfoFunc func(*api.ServicePort, *api.Service, *ServiceInfoCommon) ServicePort + // serviceChange contains all changes to services that happened since proxy rules were synced. For a single object, // changes are accumulated, i.e. previous is state from before applying the changes, // current is state after applying all of the changes. @@ -43,12 +126,20 @@ type ServiceChangeTracker struct { lock sync.Mutex // items maps a service to its serviceChange. items map[types.NamespacedName]*serviceChange + // customizeServiceInfo allows proxier to inject customized infomation when processing service. + customizeServiceInfo customizeServiceInfoFunc + // isIPv6Mode indicates if change tracker is under IPv6/IPv4 mode. Nil means not applicable. + isIPv6Mode *bool + recorder record.EventRecorder } // NewServiceChangeTracker initializes a ServiceChangeTracker -func NewServiceChangeTracker() *ServiceChangeTracker { +func NewServiceChangeTracker(customizeServiceInfo customizeServiceInfoFunc, isIPv6Mode *bool, recorder record.EventRecorder) *ServiceChangeTracker { return &ServiceChangeTracker{ - items: make(map[types.NamespacedName]*serviceChange), + items: make(map[types.NamespacedName]*serviceChange), + customizeServiceInfo: customizeServiceInfo, + isIPv6Mode: isIPv6Mode, + recorder: recorder, } } @@ -60,10 +151,7 @@ func NewServiceChangeTracker() *ServiceChangeTracker { // - pass as the pair. // Delete item // - pass as the pair. -// -// makeServicePort() return a proxy.ServicePort based on the given Service and its ServicePort. We inject makeServicePort() -// so that giving caller side a chance to initialize proxy.ServicePort interface. -func (sct *ServiceChangeTracker) Update(previous, current *api.Service, makeServicePort func(servicePort *api.ServicePort, service *api.Service) ServicePort) bool { +func (sct *ServiceChangeTracker) Update(previous, current *api.Service) bool { svc := current if svc == nil { svc = previous @@ -80,10 +168,10 @@ func (sct *ServiceChangeTracker) Update(previous, current *api.Service, makeServ change, exists := sct.items[namespacedName] if !exists { change = &serviceChange{} - change.previous = serviceToServiceMap(previous, makeServicePort) + change.previous = sct.serviceToServiceMap(previous) sct.items[namespacedName] = change } - change.current = serviceToServiceMap(current, makeServicePort) + change.current = sct.serviceToServiceMap(current) // if change.previous equal to change.current, it means no change if reflect.DeepEqual(change.previous, change.current) { delete(sct.items, namespacedName) @@ -110,28 +198,26 @@ func UpdateServiceMap(serviceMap ServiceMap, changes *ServiceChangeTracker) (res // computing this incrementally similarly to serviceMap. result.HCServiceNodePorts = make(map[types.NamespacedName]uint16) for svcPortName, info := range serviceMap { - if info.HealthCheckNodePort() != 0 { - result.HCServiceNodePorts[svcPortName.NamespacedName] = uint16(info.HealthCheckNodePort()) + if info.GetHealthCheckNodePort() != 0 { + result.HCServiceNodePorts[svcPortName.NamespacedName] = uint16(info.GetHealthCheckNodePort()) } } return result } -// ServiceMap maps a service to its ServicePort information. +// ServiceMap maps a service to its ServicePort. type ServiceMap map[ServicePortName]ServicePort // serviceToServiceMap translates a single Service object to a ServiceMap. -// makeServicePort() return a proxy.ServicePort based on the given Service and its ServicePort. We inject makeServicePort() -// so that giving caller side a chance to initialize proxy.ServicePort interface. // // NOTE: service object should NOT be modified. -func serviceToServiceMap(service *api.Service, makeServicePort func(servicePort *api.ServicePort, service *api.Service) ServicePort) ServiceMap { +func (sct *ServiceChangeTracker) serviceToServiceMap(service *api.Service) ServiceMap { if service == nil { return nil } svcName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name} - if proxyutil.ShouldSkipService(svcName, service) { + if utilproxy.ShouldSkipService(svcName, service) { return nil } @@ -139,7 +225,12 @@ func serviceToServiceMap(service *api.Service, makeServicePort func(servicePort for i := range service.Spec.Ports { servicePort := &service.Spec.Ports[i] svcPortName := ServicePortName{NamespacedName: svcName, Port: servicePort.Name} - serviceMap[svcPortName] = makeServicePort(servicePort, service) + svcInfoCommon := sct.newServiceInfoCommon(servicePort, service) + if sct.customizeServiceInfo != nil { + serviceMap[svcPortName] = sct.customizeServiceInfo(servicePort, service, svcInfoCommon) + } else { + serviceMap[svcPortName] = svcInfoCommon + } } return serviceMap } @@ -213,8 +304,8 @@ func (sm *ServiceMap) unmerge(other ServiceMap, UDPStaleClusterIP sets.String) { info, exists := (*sm)[svcPortName] if exists { glog.V(1).Infof("Removing service port %q", svcPortName) - if info.Protocol() == api.ProtocolUDP { - UDPStaleClusterIP.Insert(info.ClusterIP()) + if info.GetProtocol() == api.ProtocolUDP { + UDPStaleClusterIP.Insert(info.GetClusterIP()) } delete(*sm, svcPortName) } else { diff --git a/pkg/proxy/service_test.go b/pkg/proxy/service_test.go index ed2ad7d556..0a24b1c0fe 100644 --- a/pkg/proxy/service_test.go +++ b/pkg/proxy/service_test.go @@ -17,9 +17,7 @@ limitations under the License. package proxy import ( - "fmt" "net" - "reflect" "testing" "github.com/davecgh/go-spew/spew" @@ -27,59 +25,19 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/intstr" - apiservice "k8s.io/kubernetes/pkg/api/service" api "k8s.io/kubernetes/pkg/apis/core" ) const testHostname = "test-hostname" -// fake implementation for service info. -type fakeServiceInfo struct { - clusterIP net.IP - port int - protocol api.Protocol - healthCheckNodePort int -} - -func (f *fakeServiceInfo) String() string { - return fmt.Sprintf("%s:%d/%s", f.clusterIP, f.port, f.protocol) -} - -func (f *fakeServiceInfo) ClusterIP() string { - return f.clusterIP.String() -} - -func (f *fakeServiceInfo) Protocol() api.Protocol { - return f.protocol -} - -func (f *fakeServiceInfo) HealthCheckNodePort() int { - return f.healthCheckNodePort -} - -func makeTestServiceInfo(clusterIP string, port int, protocol string, healthcheckNodePort int) *fakeServiceInfo { - info := &fakeServiceInfo{ - clusterIP: net.ParseIP(clusterIP), - port: port, - protocol: api.Protocol(protocol), +func makeTestServiceInfo(clusterIP string, port int, protocol string, healthcheckNodePort int) *ServiceInfoCommon { + info := &ServiceInfoCommon{ + ClusterIP: net.ParseIP(clusterIP), + Port: port, + Protocol: api.Protocol(protocol), } if healthcheckNodePort != 0 { - info.healthCheckNodePort = healthcheckNodePort - } - return info -} - -func newFakeServiceInfo(servicePort *api.ServicePort, service *api.Service) ServicePort { - info := &fakeServiceInfo{ - clusterIP: net.ParseIP(service.Spec.ClusterIP), - port: int(servicePort.Port), - protocol: servicePort.Protocol, - } - if apiservice.NeedsHealthCheck(service) { - p := service.Spec.HealthCheckNodePort - if p != 0 { - info.healthCheckNodePort = int(p) - } + info.HealthCheckNodePort = healthcheckNodePort } return info } @@ -120,15 +78,17 @@ func makeServicePortName(ns, name, port string) ServicePortName { } } -func Test_serviceToServiceMap(t *testing.T) { +func TestServiceToServiceMap(t *testing.T) { + svcTracker := NewServiceChangeTracker(nil, nil, nil) + testCases := []struct { service *api.Service - expected map[ServicePortName]*fakeServiceInfo + expected map[ServicePortName]*ServiceInfoCommon }{ { // Case[0]: nothing service: nil, - expected: map[ServicePortName]*fakeServiceInfo{}, + expected: map[ServicePortName]*ServiceInfoCommon{}, }, { // Case[1]: headless service @@ -137,7 +97,7 @@ func Test_serviceToServiceMap(t *testing.T) { svc.Spec.ClusterIP = api.ClusterIPNone svc.Spec.Ports = addTestPort(svc.Spec.Ports, "rpc", "UDP", 1234, 0, 0) }), - expected: map[ServicePortName]*fakeServiceInfo{}, + expected: map[ServicePortName]*ServiceInfoCommon{}, }, { // Case[2]: headless service without port @@ -145,7 +105,7 @@ func Test_serviceToServiceMap(t *testing.T) { svc.Spec.Type = api.ServiceTypeClusterIP svc.Spec.ClusterIP = api.ClusterIPNone }), - expected: map[ServicePortName]*fakeServiceInfo{}, + expected: map[ServicePortName]*ServiceInfoCommon{}, }, { // Case[3]: cluster ip service @@ -155,7 +115,7 @@ func Test_serviceToServiceMap(t *testing.T) { svc.Spec.Ports = addTestPort(svc.Spec.Ports, "p1", "UDP", 1234, 4321, 0) svc.Spec.Ports = addTestPort(svc.Spec.Ports, "p2", "UDP", 1235, 5321, 0) }), - expected: map[ServicePortName]*fakeServiceInfo{ + expected: map[ServicePortName]*ServiceInfoCommon{ makeServicePortName("ns2", "cluster-ip", "p1"): makeTestServiceInfo("172.16.55.4", 1234, "UDP", 0), makeServicePortName("ns2", "cluster-ip", "p2"): makeTestServiceInfo("172.16.55.4", 1235, "UDP", 0), }, @@ -168,7 +128,7 @@ func Test_serviceToServiceMap(t *testing.T) { svc.Spec.Ports = addTestPort(svc.Spec.Ports, "port1", "UDP", 345, 678, 0) svc.Spec.Ports = addTestPort(svc.Spec.Ports, "port2", "TCP", 344, 677, 0) }), - expected: map[ServicePortName]*fakeServiceInfo{ + expected: map[ServicePortName]*ServiceInfoCommon{ makeServicePortName("ns2", "node-port", "port1"): makeTestServiceInfo("172.16.55.10", 345, "UDP", 0), makeServicePortName("ns2", "node-port", "port2"): makeTestServiceInfo("172.16.55.10", 344, "TCP", 0), }, @@ -187,7 +147,7 @@ func Test_serviceToServiceMap(t *testing.T) { }, } }), - expected: map[ServicePortName]*fakeServiceInfo{ + expected: map[ServicePortName]*ServiceInfoCommon{ makeServicePortName("ns1", "load-balancer", "port3"): makeTestServiceInfo("172.16.55.11", 8675, "UDP", 0), makeServicePortName("ns1", "load-balancer", "port4"): makeTestServiceInfo("172.16.55.11", 8676, "UDP", 0), }, @@ -208,7 +168,7 @@ func Test_serviceToServiceMap(t *testing.T) { svc.Spec.ExternalTrafficPolicy = api.ServiceExternalTrafficPolicyTypeLocal svc.Spec.HealthCheckNodePort = 345 }), - expected: map[ServicePortName]*fakeServiceInfo{ + expected: map[ServicePortName]*ServiceInfoCommon{ makeServicePortName("ns1", "only-local-load-balancer", "portx"): makeTestServiceInfo("172.16.55.12", 8677, "UDP", 345), makeServicePortName("ns1", "only-local-load-balancer", "porty"): makeTestServiceInfo("172.16.55.12", 8678, "UDP", 345), }, @@ -221,21 +181,24 @@ func Test_serviceToServiceMap(t *testing.T) { svc.Spec.ExternalName = "foo2.bar.com" svc.Spec.Ports = addTestPort(svc.Spec.Ports, "portz", "UDP", 1235, 5321, 0) }), - expected: map[ServicePortName]*fakeServiceInfo{}, + expected: map[ServicePortName]*ServiceInfoCommon{}, }, } for tci, tc := range testCases { // outputs - newServices := serviceToServiceMap(tc.service, newFakeServiceInfo) + newServices := svcTracker.serviceToServiceMap(tc.service) if len(newServices) != len(tc.expected) { t.Errorf("[%d] expected %d new, got %d: %v", tci, len(tc.expected), len(newServices), spew.Sdump(newServices)) } for x := range tc.expected { - svc := newServices[x].(*fakeServiceInfo) - if !reflect.DeepEqual(svc, tc.expected[x]) { - t.Errorf("[%d] expected new[%v]to be %v, got %v", tci, x, tc.expected[x], *svc) + svcInfo := newServices[x].(*ServiceInfoCommon) + if !svcInfo.ClusterIP.Equal(tc.expected[x].ClusterIP) || + svcInfo.Port != tc.expected[x].Port || + svcInfo.Protocol != tc.expected[x].Protocol || + svcInfo.HealthCheckNodePort != tc.expected[x].HealthCheckNodePort { + t.Errorf("[%d] expected new[%v]to be %v, got %v", tci, x, tc.expected[x], *svcInfo) } } } @@ -252,9 +215,9 @@ type FakeProxier struct { func newFakeProxier() *FakeProxier { return &FakeProxier{ serviceMap: make(ServiceMap), - serviceChanges: NewServiceChangeTracker(), + serviceChanges: NewServiceChangeTracker(nil, nil, nil), endpointsMap: make(EndpointsMap), - endpointsChanges: NewEndpointChangeTracker(testHostname), + endpointsChanges: NewEndpointChangeTracker(testHostname, nil, nil, nil), } } @@ -265,30 +228,15 @@ func makeServiceMap(fake *FakeProxier, allServices ...*api.Service) { } func (fake *FakeProxier) addService(service *api.Service) { - fake.serviceChanges.Update(nil, service, makeServicePort) + fake.serviceChanges.Update(nil, service) } func (fake *FakeProxier) updateService(oldService *api.Service, service *api.Service) { - fake.serviceChanges.Update(oldService, service, makeServicePort) + fake.serviceChanges.Update(oldService, service) } func (fake *FakeProxier) deleteService(service *api.Service) { - fake.serviceChanges.Update(service, nil, makeServicePort) -} - -func makeServicePort(port *api.ServicePort, service *api.Service) ServicePort { - info := &fakeServiceInfo{ - clusterIP: net.ParseIP(service.Spec.ClusterIP), - port: int(port.Port), - protocol: port.Protocol, - } - if apiservice.NeedsHealthCheck(service) { - p := service.Spec.HealthCheckNodePort - if p != 0 { - info.healthCheckNodePort = int(p) - } - } - return info + fake.serviceChanges.Update(service, nil) } func TestUpdateServiceMapHeadless(t *testing.T) { diff --git a/pkg/proxy/types.go b/pkg/proxy/types.go index 613c570897..438635639d 100644 --- a/pkg/proxy/types.go +++ b/pkg/proxy/types.go @@ -48,23 +48,26 @@ func (spn ServicePortName) String() string { type ServicePort interface { // String returns service string. An example format can be: `IP:Port/Protocol`. String() string - // ClusterIP returns service cluster IP. - ClusterIP() string - // Protocol returns service protocol. - Protocol() api.Protocol - // HealthCheckNodePort returns service health check node port if present. If return 0, it means not present. - HealthCheckNodePort() int + // GetClusterIP returns service cluster IP. + GetClusterIP() string + // GetProtocol returns service protocol. + GetProtocol() api.Protocol + // GetHealthCheckNodePort returns service health check node port if present. If return 0, it means not present. + GetHealthCheckNodePort() int } // Endpoint in an interface which abstracts information about an endpoint. +// TODO: Rename functions to be consistent with ServicePort. type Endpoint interface { // String returns endpoint string. An example format can be: `IP:Port`. // We take the returned value as ServiceEndpoint.Endpoint. String() string - // IsLocal returns true if the endpoint is running in same host as kube-proxy, otherwise returns false. - IsLocal() bool - // IP returns IP part of endpoints. + // GetIsLocal returns true if the endpoint is running in same host as kube-proxy, otherwise returns false. + GetIsLocal() bool + // IP returns IP part of the endpoint. IP() string + // Port returns the Port part of the endpoint. + Port() (int, error) // Equal checks if two endpoints are equal. Equal(Endpoint) bool }