mirror of https://github.com/k3s-io/k3s
Set loadbalancer flags for DSR
parent
c32c26000d
commit
4e0f63b688
|
@ -33,7 +33,7 @@ type HostNetworkService interface {
|
||||||
getEndpointByIpAddress(ip string, networkName string) (*endpointsInfo, error)
|
getEndpointByIpAddress(ip string, networkName string) (*endpointsInfo, error)
|
||||||
createEndpoint(ep *endpointsInfo, networkName string) (*endpointsInfo, error)
|
createEndpoint(ep *endpointsInfo, networkName string) (*endpointsInfo, error)
|
||||||
deleteEndpoint(hnsID string) error
|
deleteEndpoint(hnsID string) error
|
||||||
getLoadBalancer(endpoints []endpointsInfo, isILB bool, isDSR bool, sourceVip string, vip string, protocol uint16, internalPort uint16, externalPort uint16) (*loadBalancerInfo, error)
|
getLoadBalancer(endpoints []endpointsInfo, flags loadBalancerFlags, sourceVip string, vip string, protocol uint16, internalPort uint16, externalPort uint16) (*loadBalancerInfo, error)
|
||||||
deleteLoadBalancer(hnsID string) error
|
deleteLoadBalancer(hnsID string) error
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -148,13 +148,13 @@ func (hns hnsV1) deleteEndpoint(hnsID string) error {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (hns hnsV1) getLoadBalancer(endpoints []endpointsInfo, isILB bool, isDSR bool, sourceVip string, vip string, protocol uint16, internalPort uint16, externalPort uint16) (*loadBalancerInfo, error) {
|
func (hns hnsV1) getLoadBalancer(endpoints []endpointsInfo, flags loadBalancerFlags, sourceVip string, vip string, protocol uint16, internalPort uint16, externalPort uint16) (*loadBalancerInfo, error) {
|
||||||
plists, err := hcsshim.HNSListPolicyListRequest()
|
plists, err := hcsshim.HNSListPolicyListRequest()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
if isDSR {
|
if flags.isDSR {
|
||||||
klog.V(3).Info("DSR is not supported in V1. Using non DSR instead")
|
klog.V(3).Info("DSR is not supported in V1. Using non DSR instead")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -167,7 +167,7 @@ func (hns hnsV1) getLoadBalancer(endpoints []endpointsInfo, isILB bool, isDSR bo
|
||||||
if err = json.Unmarshal(plist.Policies[0], &elbPolicy); err != nil {
|
if err = json.Unmarshal(plist.Policies[0], &elbPolicy); err != nil {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
if elbPolicy.Protocol == protocol && elbPolicy.InternalPort == internalPort && elbPolicy.ExternalPort == externalPort && elbPolicy.ILB == isILB {
|
if elbPolicy.Protocol == protocol && elbPolicy.InternalPort == internalPort && elbPolicy.ExternalPort == externalPort && elbPolicy.ILB == flags.isILB {
|
||||||
if len(vip) > 0 {
|
if len(vip) > 0 {
|
||||||
if len(elbPolicy.VIPs) == 0 || elbPolicy.VIPs[0] != vip {
|
if len(elbPolicy.VIPs) == 0 || elbPolicy.VIPs[0] != vip {
|
||||||
continue
|
continue
|
||||||
|
@ -190,7 +190,7 @@ func (hns hnsV1) getLoadBalancer(endpoints []endpointsInfo, isILB bool, isDSR bo
|
||||||
}
|
}
|
||||||
lb, err := hcsshim.AddLoadBalancer(
|
lb, err := hcsshim.AddLoadBalancer(
|
||||||
hnsEndpoints,
|
hnsEndpoints,
|
||||||
isILB,
|
flags.isILB,
|
||||||
sourceVip,
|
sourceVip,
|
||||||
vip,
|
vip,
|
||||||
protocol,
|
protocol,
|
||||||
|
|
|
@ -169,7 +169,7 @@ func (hns hnsV2) deleteEndpoint(hnsID string) error {
|
||||||
}
|
}
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
func (hns hnsV2) getLoadBalancer(endpoints []endpointsInfo, isILB bool, isDSR bool, sourceVip string, vip string, protocol uint16, internalPort uint16, externalPort uint16) (*loadBalancerInfo, error) {
|
func (hns hnsV2) getLoadBalancer(endpoints []endpointsInfo, flags loadBalancerFlags, sourceVip string, vip string, protocol uint16, internalPort uint16, externalPort uint16) (*loadBalancerInfo, error) {
|
||||||
plists, err := hcn.ListLoadBalancers()
|
plists, err := hcn.ListLoadBalancers()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
@ -181,7 +181,7 @@ func (hns hnsV2) getLoadBalancer(endpoints []endpointsInfo, isILB bool, isDSR bo
|
||||||
}
|
}
|
||||||
// Validate if input meets any of the policy lists
|
// Validate if input meets any of the policy lists
|
||||||
lbPortMapping := plist.PortMappings[0]
|
lbPortMapping := plist.PortMappings[0]
|
||||||
if lbPortMapping.Protocol == uint32(protocol) && lbPortMapping.InternalPort == internalPort && lbPortMapping.ExternalPort == externalPort && (lbPortMapping.Flags&1 != 0) == isILB {
|
if lbPortMapping.Protocol == uint32(protocol) && lbPortMapping.InternalPort == internalPort && lbPortMapping.ExternalPort == externalPort && (lbPortMapping.Flags&1 != 0) == flags.isILB {
|
||||||
if len(vip) > 0 {
|
if len(vip) > 0 {
|
||||||
if len(plist.FrontendVIPs) == 0 || plist.FrontendVIPs[0] != vip {
|
if len(plist.FrontendVIPs) == 0 || plist.FrontendVIPs[0] != vip {
|
||||||
continue
|
continue
|
||||||
|
@ -207,10 +207,30 @@ func (hns hnsV2) getLoadBalancer(endpoints []endpointsInfo, isILB bool, isDSR bo
|
||||||
if len(vip) > 0 {
|
if len(vip) > 0 {
|
||||||
vips = append(vips, vip)
|
vips = append(vips, vip)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
lbPortMappingFlags := hcn.LoadBalancerPortMappingFlagsNone
|
||||||
|
if flags.isILB {
|
||||||
|
lbPortMappingFlags |= hcn.LoadBalancerPortMappingFlagsILB
|
||||||
|
}
|
||||||
|
if flags.useMUX {
|
||||||
|
lbPortMappingFlags |= hcn.LoadBalancerPortMappingFlagsUseMux
|
||||||
|
}
|
||||||
|
if flags.preserveDIP {
|
||||||
|
lbPortMappingFlags |= hcn.LoadBalancerPortMappingFlagsPreserveDIP
|
||||||
|
}
|
||||||
|
if flags.localRoutedVIP {
|
||||||
|
lbPortMappingFlags |= hcn.LoadBalancerPortMappingFlagsLocalRoutedVIP
|
||||||
|
}
|
||||||
|
|
||||||
|
lbFlags := hcn.LoadBalancerFlagsNone
|
||||||
|
if flags.isDSR {
|
||||||
|
lbFlags |= hcn.LoadBalancerFlagsDSR
|
||||||
|
}
|
||||||
|
|
||||||
lb, err := hcn.AddLoadBalancer(
|
lb, err := hcn.AddLoadBalancer(
|
||||||
hnsEndpoints,
|
hnsEndpoints,
|
||||||
isILB,
|
lbFlags,
|
||||||
isDSR,
|
lbPortMappingFlags,
|
||||||
sourceVip,
|
sourceVip,
|
||||||
vips,
|
vips,
|
||||||
protocol,
|
protocol,
|
||||||
|
|
|
@ -354,8 +354,8 @@ func testGetLoadBalancerExisting(t *testing.T, hns HostNetworkService) {
|
||||||
Endpoints := []hcn.HostComputeEndpoint{*Endpoint}
|
Endpoints := []hcn.HostComputeEndpoint{*Endpoint}
|
||||||
LoadBalancer, err := hcn.AddLoadBalancer(
|
LoadBalancer, err := hcn.AddLoadBalancer(
|
||||||
Endpoints,
|
Endpoints,
|
||||||
false,
|
hcn.LoadBalancerFlagsNone,
|
||||||
false,
|
hcn.LoadBalancerPortMappingFlagsNone,
|
||||||
sourceVip,
|
sourceVip,
|
||||||
[]string{serviceVip},
|
[]string{serviceVip},
|
||||||
protocol,
|
protocol,
|
||||||
|
@ -371,7 +371,7 @@ func testGetLoadBalancerExisting(t *testing.T, hns HostNetworkService) {
|
||||||
hnsID: Endpoint.Id,
|
hnsID: Endpoint.Id,
|
||||||
}
|
}
|
||||||
endpoints := []endpointsInfo{*endpoint}
|
endpoints := []endpointsInfo{*endpoint}
|
||||||
lb, err := hns.getLoadBalancer(endpoints, false, false, sourceVip, serviceVip, protocol, internalPort, externalPort)
|
lb, err := hns.getLoadBalancer(endpoints, loadBalancerFlags{}, sourceVip, serviceVip, protocol, internalPort, externalPort)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Error(err)
|
t.Error(err)
|
||||||
}
|
}
|
||||||
|
@ -419,7 +419,7 @@ func testGetLoadBalancerNew(t *testing.T, hns HostNetworkService) {
|
||||||
hnsID: Endpoint.Id,
|
hnsID: Endpoint.Id,
|
||||||
}
|
}
|
||||||
endpoints := []endpointsInfo{*endpoint}
|
endpoints := []endpointsInfo{*endpoint}
|
||||||
lb, err := hns.getLoadBalancer(endpoints, false, false, sourceVip, serviceVip, protocol, internalPort, externalPort)
|
lb, err := hns.getLoadBalancer(endpoints, loadBalancerFlags{}, sourceVip, serviceVip, protocol, internalPort, externalPort)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Error(err)
|
t.Error(err)
|
||||||
}
|
}
|
||||||
|
@ -469,8 +469,8 @@ func testDeleteLoadBalancer(t *testing.T, hns HostNetworkService) {
|
||||||
Endpoints := []hcn.HostComputeEndpoint{*Endpoint}
|
Endpoints := []hcn.HostComputeEndpoint{*Endpoint}
|
||||||
LoadBalancer, err := hcn.AddLoadBalancer(
|
LoadBalancer, err := hcn.AddLoadBalancer(
|
||||||
Endpoints,
|
Endpoints,
|
||||||
false,
|
hcn.LoadBalancerFlagsNone,
|
||||||
false,
|
hcn.LoadBalancerPortMappingFlagsNone,
|
||||||
sourceVip,
|
sourceVip,
|
||||||
[]string{serviceVip},
|
[]string{serviceVip},
|
||||||
protocol,
|
protocol,
|
||||||
|
|
|
@ -91,6 +91,14 @@ type loadBalancerInfo struct {
|
||||||
hnsID string
|
hnsID string
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type loadBalancerFlags struct {
|
||||||
|
isILB bool
|
||||||
|
isDSR bool
|
||||||
|
localRoutedVIP bool
|
||||||
|
useMUX bool
|
||||||
|
preserveDIP bool
|
||||||
|
}
|
||||||
|
|
||||||
// internal struct for string service information
|
// internal struct for string service information
|
||||||
type serviceInfo struct {
|
type serviceInfo struct {
|
||||||
clusterIP net.IP
|
clusterIP net.IP
|
||||||
|
@ -111,6 +119,7 @@ type serviceInfo struct {
|
||||||
policyApplied bool
|
policyApplied bool
|
||||||
remoteEndpoint *endpointsInfo
|
remoteEndpoint *endpointsInfo
|
||||||
hns HostNetworkService
|
hns HostNetworkService
|
||||||
|
preserveDIP bool
|
||||||
}
|
}
|
||||||
|
|
||||||
type hnsNetworkInfo struct {
|
type hnsNetworkInfo struct {
|
||||||
|
@ -204,6 +213,14 @@ func newServiceInfo(svcPortName proxy.ServicePortName, port *v1.ServicePort, ser
|
||||||
if service.Spec.SessionAffinity == v1.ServiceAffinityClientIP && service.Spec.SessionAffinityConfig != nil {
|
if service.Spec.SessionAffinity == v1.ServiceAffinityClientIP && service.Spec.SessionAffinityConfig != nil {
|
||||||
stickyMaxAgeSeconds = int(*service.Spec.SessionAffinityConfig.ClientIP.TimeoutSeconds)
|
stickyMaxAgeSeconds = int(*service.Spec.SessionAffinityConfig.ClientIP.TimeoutSeconds)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
klog.Infof("Service %q preserve-destination: %v", svcPortName.NamespacedName.String(), service.Annotations["preserve-destination"])
|
||||||
|
|
||||||
|
preserveDIP := service.Annotations["preserve-destination"] == "true"
|
||||||
|
err := hcn.DSRSupported()
|
||||||
|
if err != nil {
|
||||||
|
preserveDIP = false
|
||||||
|
}
|
||||||
info := &serviceInfo{
|
info := &serviceInfo{
|
||||||
clusterIP: net.ParseIP(service.Spec.ClusterIP),
|
clusterIP: net.ParseIP(service.Spec.ClusterIP),
|
||||||
port: int(port.Port),
|
port: int(port.Port),
|
||||||
|
@ -219,6 +236,7 @@ func newServiceInfo(svcPortName proxy.ServicePortName, port *v1.ServicePort, ser
|
||||||
loadBalancerSourceRanges: make([]string, len(service.Spec.LoadBalancerSourceRanges)),
|
loadBalancerSourceRanges: make([]string, len(service.Spec.LoadBalancerSourceRanges)),
|
||||||
onlyNodeLocalEndpoints: onlyNodeLocalEndpoints,
|
onlyNodeLocalEndpoints: onlyNodeLocalEndpoints,
|
||||||
hns: hns,
|
hns: hns,
|
||||||
|
preserveDIP: preserveDIP,
|
||||||
}
|
}
|
||||||
|
|
||||||
copy(info.loadBalancerSourceRanges, service.Spec.LoadBalancerSourceRanges)
|
copy(info.loadBalancerSourceRanges, service.Spec.LoadBalancerSourceRanges)
|
||||||
|
@ -513,7 +531,7 @@ func NewProxier(
|
||||||
var hns HostNetworkService
|
var hns HostNetworkService
|
||||||
hns = hnsV1{}
|
hns = hnsV1{}
|
||||||
supportedFeatures := hcn.GetSupportedFeatures()
|
supportedFeatures := hcn.GetSupportedFeatures()
|
||||||
if supportedFeatures.RemoteSubnet {
|
if supportedFeatures.Api.V2 {
|
||||||
hns = hnsV2{}
|
hns = hnsV2{}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -999,6 +1017,7 @@ func (proxier *Proxier) syncProxyRules() {
|
||||||
}
|
}
|
||||||
|
|
||||||
var hnsEndpoints []endpointsInfo
|
var hnsEndpoints []endpointsInfo
|
||||||
|
var hnsLocalEndpoints []endpointsInfo
|
||||||
klog.V(4).Infof("====Applying Policy for %s====", svcName)
|
klog.V(4).Infof("====Applying Policy for %s====", svcName)
|
||||||
// Create Remote endpoints for every endpoint, corresponding to the service
|
// Create Remote endpoints for every endpoint, corresponding to the service
|
||||||
containsPublicIP := false
|
containsPublicIP := false
|
||||||
|
@ -1087,6 +1106,9 @@ func (proxier *Proxier) syncProxyRules() {
|
||||||
// Save the hnsId for reference
|
// Save the hnsId for reference
|
||||||
LogJson(newHnsEndpoint, "Hns Endpoint resource", 1)
|
LogJson(newHnsEndpoint, "Hns Endpoint resource", 1)
|
||||||
hnsEndpoints = append(hnsEndpoints, *newHnsEndpoint)
|
hnsEndpoints = append(hnsEndpoints, *newHnsEndpoint)
|
||||||
|
if newHnsEndpoint.isLocal {
|
||||||
|
hnsLocalEndpoints = append(hnsLocalEndpoints, *newHnsEndpoint)
|
||||||
|
}
|
||||||
ep.hnsID = newHnsEndpoint.hnsID
|
ep.hnsID = newHnsEndpoint.hnsID
|
||||||
ep.refCount++
|
ep.refCount++
|
||||||
Log(ep, "Endpoint resource found", 3)
|
Log(ep, "Endpoint resource found", 3)
|
||||||
|
@ -1112,8 +1134,7 @@ func (proxier *Proxier) syncProxyRules() {
|
||||||
}
|
}
|
||||||
hnsLoadBalancer, err := hns.getLoadBalancer(
|
hnsLoadBalancer, err := hns.getLoadBalancer(
|
||||||
hnsEndpoints,
|
hnsEndpoints,
|
||||||
false,
|
loadBalancerFlags{isDSR: proxier.isDSR},
|
||||||
proxier.isDSR,
|
|
||||||
sourceVip,
|
sourceVip,
|
||||||
svcInfo.clusterIP.String(),
|
svcInfo.clusterIP.String(),
|
||||||
Enum(svcInfo.protocol),
|
Enum(svcInfo.protocol),
|
||||||
|
@ -1132,8 +1153,7 @@ func (proxier *Proxier) syncProxyRules() {
|
||||||
if svcInfo.nodePort > 0 {
|
if svcInfo.nodePort > 0 {
|
||||||
hnsLoadBalancer, err := hns.getLoadBalancer(
|
hnsLoadBalancer, err := hns.getLoadBalancer(
|
||||||
hnsEndpoints,
|
hnsEndpoints,
|
||||||
false,
|
loadBalancerFlags{localRoutedVIP: true},
|
||||||
false,
|
|
||||||
sourceVip,
|
sourceVip,
|
||||||
"",
|
"",
|
||||||
Enum(svcInfo.protocol),
|
Enum(svcInfo.protocol),
|
||||||
|
@ -1154,8 +1174,7 @@ func (proxier *Proxier) syncProxyRules() {
|
||||||
// Try loading existing policies, if already available
|
// Try loading existing policies, if already available
|
||||||
hnsLoadBalancer, err = hns.getLoadBalancer(
|
hnsLoadBalancer, err = hns.getLoadBalancer(
|
||||||
hnsEndpoints,
|
hnsEndpoints,
|
||||||
false,
|
loadBalancerFlags{},
|
||||||
false,
|
|
||||||
sourceVip,
|
sourceVip,
|
||||||
externalIP.ip,
|
externalIP.ip,
|
||||||
Enum(svcInfo.protocol),
|
Enum(svcInfo.protocol),
|
||||||
|
@ -1172,10 +1191,13 @@ func (proxier *Proxier) syncProxyRules() {
|
||||||
// Create a Load Balancer Policy for each loadbalancer ingress
|
// Create a Load Balancer Policy for each loadbalancer ingress
|
||||||
for _, lbIngressIP := range svcInfo.loadBalancerIngressIPs {
|
for _, lbIngressIP := range svcInfo.loadBalancerIngressIPs {
|
||||||
// Try loading existing policies, if already available
|
// Try loading existing policies, if already available
|
||||||
|
lbIngressEndpoints := hnsEndpoints
|
||||||
|
if svcInfo.preserveDIP {
|
||||||
|
lbIngressEndpoints = hnsLocalEndpoints
|
||||||
|
}
|
||||||
hnsLoadBalancer, err := hns.getLoadBalancer(
|
hnsLoadBalancer, err := hns.getLoadBalancer(
|
||||||
hnsEndpoints,
|
lbIngressEndpoints,
|
||||||
false,
|
loadBalancerFlags{isDSR: svcInfo.preserveDIP || proxier.isDSR, useMUX: svcInfo.preserveDIP, preserveDIP: svcInfo.preserveDIP},
|
||||||
false,
|
|
||||||
sourceVip,
|
sourceVip,
|
||||||
lbIngressIP.ip,
|
lbIngressIP.ip,
|
||||||
Enum(svcInfo.protocol),
|
Enum(svcInfo.protocol),
|
||||||
|
|
|
@ -110,7 +110,7 @@ func (hns fakeHNS) createEndpoint(ep *endpointsInfo, networkName string) (*endpo
|
||||||
func (hns fakeHNS) deleteEndpoint(hnsID string) error {
|
func (hns fakeHNS) deleteEndpoint(hnsID string) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
func (hns fakeHNS) getLoadBalancer(endpoints []endpointsInfo, isILB bool, isDSR bool, sourceVip string, vip string, protocol uint16, internalPort uint16, externalPort uint16) (*loadBalancerInfo, error) {
|
func (hns fakeHNS) getLoadBalancer(endpoints []endpointsInfo, flags loadBalancerFlags, sourceVip string, vip string, protocol uint16, internalPort uint16, externalPort uint16) (*loadBalancerInfo, error) {
|
||||||
return &loadBalancerInfo{
|
return &loadBalancerInfo{
|
||||||
hnsID: guid,
|
hnsID: guid,
|
||||||
}, nil
|
}, nil
|
||||||
|
|
Loading…
Reference in New Issue