From 4e0f63b688891df26b1e138a329c2096051871b8 Mon Sep 17 00:00:00 2001 From: Kalya Subramanian Date: Mon, 20 May 2019 21:55:01 +0000 Subject: [PATCH] Set loadbalancer flags for DSR --- pkg/proxy/winkernel/hnsV1.go | 10 +++---- pkg/proxy/winkernel/hnsV2.go | 28 ++++++++++++++++--- pkg/proxy/winkernel/hns_test.go | 12 ++++----- pkg/proxy/winkernel/proxier.go | 42 ++++++++++++++++++++++------- pkg/proxy/winkernel/proxier_test.go | 2 +- 5 files changed, 68 insertions(+), 26 deletions(-) diff --git a/pkg/proxy/winkernel/hnsV1.go b/pkg/proxy/winkernel/hnsV1.go index 19aa4ecd0b..2926edb004 100644 --- a/pkg/proxy/winkernel/hnsV1.go +++ b/pkg/proxy/winkernel/hnsV1.go @@ -33,7 +33,7 @@ type HostNetworkService interface { getEndpointByIpAddress(ip string, networkName string) (*endpointsInfo, error) createEndpoint(ep *endpointsInfo, networkName string) (*endpointsInfo, error) deleteEndpoint(hnsID string) error - getLoadBalancer(endpoints []endpointsInfo, isILB bool, isDSR bool, sourceVip string, vip string, protocol uint16, internalPort uint16, externalPort uint16) (*loadBalancerInfo, error) + getLoadBalancer(endpoints []endpointsInfo, flags loadBalancerFlags, sourceVip string, vip string, protocol uint16, internalPort uint16, externalPort uint16) (*loadBalancerInfo, error) deleteLoadBalancer(hnsID string) error } @@ -148,13 +148,13 @@ func (hns hnsV1) deleteEndpoint(hnsID string) error { return err } -func (hns hnsV1) getLoadBalancer(endpoints []endpointsInfo, isILB bool, isDSR bool, sourceVip string, vip string, protocol uint16, internalPort uint16, externalPort uint16) (*loadBalancerInfo, error) { +func (hns hnsV1) getLoadBalancer(endpoints []endpointsInfo, flags loadBalancerFlags, sourceVip string, vip string, protocol uint16, internalPort uint16, externalPort uint16) (*loadBalancerInfo, error) { plists, err := hcsshim.HNSListPolicyListRequest() if err != nil { return nil, err } - if isDSR { + if flags.isDSR { klog.V(3).Info("DSR is not supported in V1. Using non DSR instead") } @@ -167,7 +167,7 @@ func (hns hnsV1) getLoadBalancer(endpoints []endpointsInfo, isILB bool, isDSR bo if err = json.Unmarshal(plist.Policies[0], &elbPolicy); err != nil { continue } - if elbPolicy.Protocol == protocol && elbPolicy.InternalPort == internalPort && elbPolicy.ExternalPort == externalPort && elbPolicy.ILB == isILB { + if elbPolicy.Protocol == protocol && elbPolicy.InternalPort == internalPort && elbPolicy.ExternalPort == externalPort && elbPolicy.ILB == flags.isILB { if len(vip) > 0 { if len(elbPolicy.VIPs) == 0 || elbPolicy.VIPs[0] != vip { continue @@ -190,7 +190,7 @@ func (hns hnsV1) getLoadBalancer(endpoints []endpointsInfo, isILB bool, isDSR bo } lb, err := hcsshim.AddLoadBalancer( hnsEndpoints, - isILB, + flags.isILB, sourceVip, vip, protocol, diff --git a/pkg/proxy/winkernel/hnsV2.go b/pkg/proxy/winkernel/hnsV2.go index c666f5164f..fe64549304 100644 --- a/pkg/proxy/winkernel/hnsV2.go +++ b/pkg/proxy/winkernel/hnsV2.go @@ -169,7 +169,7 @@ func (hns hnsV2) deleteEndpoint(hnsID string) error { } return err } -func (hns hnsV2) getLoadBalancer(endpoints []endpointsInfo, isILB bool, isDSR bool, sourceVip string, vip string, protocol uint16, internalPort uint16, externalPort uint16) (*loadBalancerInfo, error) { +func (hns hnsV2) getLoadBalancer(endpoints []endpointsInfo, flags loadBalancerFlags, sourceVip string, vip string, protocol uint16, internalPort uint16, externalPort uint16) (*loadBalancerInfo, error) { plists, err := hcn.ListLoadBalancers() if err != nil { return nil, err @@ -181,7 +181,7 @@ func (hns hnsV2) getLoadBalancer(endpoints []endpointsInfo, isILB bool, isDSR bo } // Validate if input meets any of the policy lists lbPortMapping := plist.PortMappings[0] - if lbPortMapping.Protocol == uint32(protocol) && lbPortMapping.InternalPort == internalPort && lbPortMapping.ExternalPort == externalPort && (lbPortMapping.Flags&1 != 0) == isILB { + if lbPortMapping.Protocol == uint32(protocol) && lbPortMapping.InternalPort == internalPort && lbPortMapping.ExternalPort == externalPort && (lbPortMapping.Flags&1 != 0) == flags.isILB { if len(vip) > 0 { if len(plist.FrontendVIPs) == 0 || plist.FrontendVIPs[0] != vip { continue @@ -207,10 +207,30 @@ func (hns hnsV2) getLoadBalancer(endpoints []endpointsInfo, isILB bool, isDSR bo if len(vip) > 0 { vips = append(vips, vip) } + + lbPortMappingFlags := hcn.LoadBalancerPortMappingFlagsNone + if flags.isILB { + lbPortMappingFlags |= hcn.LoadBalancerPortMappingFlagsILB + } + if flags.useMUX { + lbPortMappingFlags |= hcn.LoadBalancerPortMappingFlagsUseMux + } + if flags.preserveDIP { + lbPortMappingFlags |= hcn.LoadBalancerPortMappingFlagsPreserveDIP + } + if flags.localRoutedVIP { + lbPortMappingFlags |= hcn.LoadBalancerPortMappingFlagsLocalRoutedVIP + } + + lbFlags := hcn.LoadBalancerFlagsNone + if flags.isDSR { + lbFlags |= hcn.LoadBalancerFlagsDSR + } + lb, err := hcn.AddLoadBalancer( hnsEndpoints, - isILB, - isDSR, + lbFlags, + lbPortMappingFlags, sourceVip, vips, protocol, diff --git a/pkg/proxy/winkernel/hns_test.go b/pkg/proxy/winkernel/hns_test.go index 0829291015..fe6b3efe17 100644 --- a/pkg/proxy/winkernel/hns_test.go +++ b/pkg/proxy/winkernel/hns_test.go @@ -354,8 +354,8 @@ func testGetLoadBalancerExisting(t *testing.T, hns HostNetworkService) { Endpoints := []hcn.HostComputeEndpoint{*Endpoint} LoadBalancer, err := hcn.AddLoadBalancer( Endpoints, - false, - false, + hcn.LoadBalancerFlagsNone, + hcn.LoadBalancerPortMappingFlagsNone, sourceVip, []string{serviceVip}, protocol, @@ -371,7 +371,7 @@ func testGetLoadBalancerExisting(t *testing.T, hns HostNetworkService) { hnsID: Endpoint.Id, } endpoints := []endpointsInfo{*endpoint} - lb, err := hns.getLoadBalancer(endpoints, false, false, sourceVip, serviceVip, protocol, internalPort, externalPort) + lb, err := hns.getLoadBalancer(endpoints, loadBalancerFlags{}, sourceVip, serviceVip, protocol, internalPort, externalPort) if err != nil { t.Error(err) } @@ -419,7 +419,7 @@ func testGetLoadBalancerNew(t *testing.T, hns HostNetworkService) { hnsID: Endpoint.Id, } endpoints := []endpointsInfo{*endpoint} - lb, err := hns.getLoadBalancer(endpoints, false, false, sourceVip, serviceVip, protocol, internalPort, externalPort) + lb, err := hns.getLoadBalancer(endpoints, loadBalancerFlags{}, sourceVip, serviceVip, protocol, internalPort, externalPort) if err != nil { t.Error(err) } @@ -469,8 +469,8 @@ func testDeleteLoadBalancer(t *testing.T, hns HostNetworkService) { Endpoints := []hcn.HostComputeEndpoint{*Endpoint} LoadBalancer, err := hcn.AddLoadBalancer( Endpoints, - false, - false, + hcn.LoadBalancerFlagsNone, + hcn.LoadBalancerPortMappingFlagsNone, sourceVip, []string{serviceVip}, protocol, diff --git a/pkg/proxy/winkernel/proxier.go b/pkg/proxy/winkernel/proxier.go index 061b8922c0..ee20a8bffe 100644 --- a/pkg/proxy/winkernel/proxier.go +++ b/pkg/proxy/winkernel/proxier.go @@ -91,6 +91,14 @@ type loadBalancerInfo struct { hnsID string } +type loadBalancerFlags struct { + isILB bool + isDSR bool + localRoutedVIP bool + useMUX bool + preserveDIP bool +} + // internal struct for string service information type serviceInfo struct { clusterIP net.IP @@ -111,6 +119,7 @@ type serviceInfo struct { policyApplied bool remoteEndpoint *endpointsInfo hns HostNetworkService + preserveDIP bool } type hnsNetworkInfo struct { @@ -204,6 +213,14 @@ func newServiceInfo(svcPortName proxy.ServicePortName, port *v1.ServicePort, ser if service.Spec.SessionAffinity == v1.ServiceAffinityClientIP && service.Spec.SessionAffinityConfig != nil { stickyMaxAgeSeconds = int(*service.Spec.SessionAffinityConfig.ClientIP.TimeoutSeconds) } + + klog.Infof("Service %q preserve-destination: %v", svcPortName.NamespacedName.String(), service.Annotations["preserve-destination"]) + + preserveDIP := service.Annotations["preserve-destination"] == "true" + err := hcn.DSRSupported() + if err != nil { + preserveDIP = false + } info := &serviceInfo{ clusterIP: net.ParseIP(service.Spec.ClusterIP), port: int(port.Port), @@ -219,6 +236,7 @@ func newServiceInfo(svcPortName proxy.ServicePortName, port *v1.ServicePort, ser loadBalancerSourceRanges: make([]string, len(service.Spec.LoadBalancerSourceRanges)), onlyNodeLocalEndpoints: onlyNodeLocalEndpoints, hns: hns, + preserveDIP: preserveDIP, } copy(info.loadBalancerSourceRanges, service.Spec.LoadBalancerSourceRanges) @@ -513,7 +531,7 @@ func NewProxier( var hns HostNetworkService hns = hnsV1{} supportedFeatures := hcn.GetSupportedFeatures() - if supportedFeatures.RemoteSubnet { + if supportedFeatures.Api.V2 { hns = hnsV2{} } @@ -999,6 +1017,7 @@ func (proxier *Proxier) syncProxyRules() { } var hnsEndpoints []endpointsInfo + var hnsLocalEndpoints []endpointsInfo klog.V(4).Infof("====Applying Policy for %s====", svcName) // Create Remote endpoints for every endpoint, corresponding to the service containsPublicIP := false @@ -1087,6 +1106,9 @@ func (proxier *Proxier) syncProxyRules() { // Save the hnsId for reference LogJson(newHnsEndpoint, "Hns Endpoint resource", 1) hnsEndpoints = append(hnsEndpoints, *newHnsEndpoint) + if newHnsEndpoint.isLocal { + hnsLocalEndpoints = append(hnsLocalEndpoints, *newHnsEndpoint) + } ep.hnsID = newHnsEndpoint.hnsID ep.refCount++ Log(ep, "Endpoint resource found", 3) @@ -1112,8 +1134,7 @@ func (proxier *Proxier) syncProxyRules() { } hnsLoadBalancer, err := hns.getLoadBalancer( hnsEndpoints, - false, - proxier.isDSR, + loadBalancerFlags{isDSR: proxier.isDSR}, sourceVip, svcInfo.clusterIP.String(), Enum(svcInfo.protocol), @@ -1132,8 +1153,7 @@ func (proxier *Proxier) syncProxyRules() { if svcInfo.nodePort > 0 { hnsLoadBalancer, err := hns.getLoadBalancer( hnsEndpoints, - false, - false, + loadBalancerFlags{localRoutedVIP: true}, sourceVip, "", Enum(svcInfo.protocol), @@ -1154,8 +1174,7 @@ func (proxier *Proxier) syncProxyRules() { // Try loading existing policies, if already available hnsLoadBalancer, err = hns.getLoadBalancer( hnsEndpoints, - false, - false, + loadBalancerFlags{}, sourceVip, externalIP.ip, Enum(svcInfo.protocol), @@ -1172,10 +1191,13 @@ func (proxier *Proxier) syncProxyRules() { // Create a Load Balancer Policy for each loadbalancer ingress for _, lbIngressIP := range svcInfo.loadBalancerIngressIPs { // Try loading existing policies, if already available + lbIngressEndpoints := hnsEndpoints + if svcInfo.preserveDIP { + lbIngressEndpoints = hnsLocalEndpoints + } hnsLoadBalancer, err := hns.getLoadBalancer( - hnsEndpoints, - false, - false, + lbIngressEndpoints, + loadBalancerFlags{isDSR: svcInfo.preserveDIP || proxier.isDSR, useMUX: svcInfo.preserveDIP, preserveDIP: svcInfo.preserveDIP}, sourceVip, lbIngressIP.ip, Enum(svcInfo.protocol), diff --git a/pkg/proxy/winkernel/proxier_test.go b/pkg/proxy/winkernel/proxier_test.go index 3bbdc1fef6..3ed78bd54a 100644 --- a/pkg/proxy/winkernel/proxier_test.go +++ b/pkg/proxy/winkernel/proxier_test.go @@ -110,7 +110,7 @@ func (hns fakeHNS) createEndpoint(ep *endpointsInfo, networkName string) (*endpo func (hns fakeHNS) deleteEndpoint(hnsID string) error { return nil } -func (hns fakeHNS) getLoadBalancer(endpoints []endpointsInfo, isILB bool, isDSR bool, sourceVip string, vip string, protocol uint16, internalPort uint16, externalPort uint16) (*loadBalancerInfo, error) { +func (hns fakeHNS) getLoadBalancer(endpoints []endpointsInfo, flags loadBalancerFlags, sourceVip string, vip string, protocol uint16, internalPort uint16, externalPort uint16) (*loadBalancerInfo, error) { return &loadBalancerInfo{ hnsID: guid, }, nil